Commit 8e81d33d authored by Matteo Quintiliani's avatar Matteo Quintiliani
Browse files

Added function load_channel_states

Added management of 'after_start_time' for each channel


git-svn-id: file:///home/quintiliani/svncopy/nmxptool/trunk@576 3cd66e75-5955-46cb-a940-c26e5fc5497d
parent 370f5a31
Loading
Loading
Loading
Loading
+97 −15
Original line number Original line Diff line number Diff line
@@ -7,7 +7,7 @@
 * 	Istituto Nazionale di Geofisica e Vulcanologia - Italy
 * 	Istituto Nazionale di Geofisica e Vulcanologia - Italy
 *	quintiliani@ingv.it
 *	quintiliani@ingv.it
 *
 *
 * $Id: nmxptool.c,v 1.100 2007-12-17 11:05:32 mtheo Exp $
 * $Id: nmxptool.c,v 1.101 2007-12-19 14:13:59 mtheo Exp $
 *
 *
 */
 */


@@ -63,6 +63,7 @@ static void clientDummyHandler(int sig);
#endif
#endif


static void save_channel_states();
static void save_channel_states();
void load_channel_states(NMXP_CHAN_LIST_NET *chan_list, NMXPTOOL_CHAN_SEQ *chan_list_seq);
static void flushing_raw_data_stream();
static void flushing_raw_data_stream();


#ifdef HAVE_LIBMSEED
#ifdef HAVE_LIBMSEED
@@ -123,6 +124,10 @@ int main (int argc, char **argv) {
    char filename[500];
    char filename[500];
    char station_code[20], channel_code[20], network_code[20];
    char station_code[20], channel_code[20], network_code[20];


    double cur_after_start_time = DEFAULT_BUFFERED_TIME;
    int skip_current_packet = 0;
	

    NMXP_DATA_PROCESS *pd;
    NMXP_DATA_PROCESS *pd;


#ifdef HAVE_LIBMSEED
#ifdef HAVE_LIBMSEED
@@ -217,10 +222,14 @@ int main (int argc, char **argv) {
	    channelListSeq[i_chan].last_time = 0.0;
	    channelListSeq[i_chan].last_time = 0.0;
	    channelListSeq[i_chan].last_time_call_raw_stream = 0;
	    channelListSeq[i_chan].last_time_call_raw_stream = 0;
	    channelListSeq[i_chan].x_1 = 0;
	    channelListSeq[i_chan].x_1 = 0;
	    channelListSeq[i_chan].after_start_time = 0.0;
	    channelListSeq[i_chan].after_start_time = DEFAULT_BUFFERED_TIME;
	    nmxp_raw_stream_init(&(channelListSeq[i_chan].raw_stream_buffer), params.max_tolerable_latency, params.timeoutrecv);
	    nmxp_raw_stream_init(&(channelListSeq[i_chan].raw_stream_buffer), params.max_tolerable_latency, params.timeoutrecv);
	}
	}


	if(params.statefile) {
	    load_channel_states(channelList_subset, channelListSeq);
	}

#ifdef HAVE_LIBMSEED
#ifdef HAVE_LIBMSEED
	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "Init mini-SEED record list.\n");
	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "Init mini-SEED record list.\n");


@@ -595,8 +604,7 @@ int main (int argc, char **argv) {
	}
	}
#endif
#endif


	double after_start_time = params.buffered_time;
	skip_current_packet = 0;
	int skip_current_packet = 0;
	
	
	while(exitpdscondition) {
	while(exitpdscondition) {


@@ -622,15 +630,27 @@ int main (int argc, char **argv) {
	    }
	    }


	    skip_current_packet = 0;
	    skip_current_packet = 0;
	    if(pd) {
	    if(pd &&
		if(pd->time + ((double) pd->nSamp / (double) pd->sampRate) >= after_start_time) {
		(params.statefile  ||  params.buffered_time)
		    if(pd->time < after_start_time) {
	      )	{
			int first_nsample_to_remove = (after_start_time - pd->time) * (double) pd->sampRate;
		if(params.statefile && channelListSeq[cur_chan].after_start_time > 0.0) {
		    cur_after_start_time = channelListSeq[cur_chan].after_start_time;
		} else if(params.buffered_time) {
		    cur_after_start_time = params.buffered_time;
		} else {
		    cur_after_start_time = DEFAULT_BUFFERED_TIME;
		}
		char cur_after_start_time_str[1024];
		nmxp_data_to_str(cur_after_start_time_str, cur_after_start_time);
		nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_PACKETMAN, "cur_chan %d, cur_after_start_time %f, cur_after_start_time_str %s\n", cur_chan, cur_after_start_time, cur_after_start_time_str);
		if(pd->time + ((double) pd->nSamp / (double) pd->sampRate) >= cur_after_start_time) {
		    if(pd->time < cur_after_start_time) {
			int first_nsample_to_remove = (cur_after_start_time - pd->time) * (double) pd->sampRate;
			/* Remove the first sample in order avoiding overlap  */
			/* Remove the first sample in order avoiding overlap  */
			first_nsample_to_remove++;
			first_nsample_to_remove++;
			if(pd->nSamp > first_nsample_to_remove) {
			if(pd->nSamp > first_nsample_to_remove) {
			    pd->nSamp -= first_nsample_to_remove;
			    pd->nSamp -= first_nsample_to_remove;
			    pd->time = after_start_time;
			    pd->time = cur_after_start_time;
			    pd->pDataPtr += first_nsample_to_remove;
			    pd->pDataPtr += first_nsample_to_remove;
			    pd->x0 = pd->pDataPtr[0];
			    pd->x0 = pd->pDataPtr[0];
			} else {
			} else {
@@ -800,9 +820,10 @@ static void save_channel_states() {
    char last_time_str[30];
    char last_time_str[30];
    char raw_last_sample_time_str[30];
    char raw_last_sample_time_str[30];
    char state_line_str[1000];
    char state_line_str[1000];
    FILE *fstatefile = NULL;


    if(params.statefile) {
    if(params.statefile) {
	FILE *fstatefile = fopen(params.statefile, "w");
	fstatefile = fopen(params.statefile, "w");
	if(fstatefile == NULL) {
	if(fstatefile == NULL) {
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "Unable to write channel states into %s!\n", params.statefile);
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "Unable to write channel states into %s!\n", params.statefile);
	} else {
	} else {
@@ -815,17 +836,18 @@ static void save_channel_states() {
	while(to_cur_chan < channelList_subset->number) {
	while(to_cur_chan < channelList_subset->number) {
	    nmxp_data_to_str(last_time_str, channelListSeq[to_cur_chan].last_time);
	    nmxp_data_to_str(last_time_str, channelListSeq[to_cur_chan].last_time);
	    nmxp_data_to_str(raw_last_sample_time_str, channelListSeq[to_cur_chan].raw_stream_buffer.last_sample_time);
	    nmxp_data_to_str(raw_last_sample_time_str, channelListSeq[to_cur_chan].raw_stream_buffer.last_sample_time);
	    sprintf(state_line_str, "%-14s %16.4f %s %16.4f %s",
	    sprintf(state_line_str, "%s %s %s",
		    channelList_subset->channel[to_cur_chan].name,
		    channelList_subset->channel[to_cur_chan].name,
		    channelListSeq[to_cur_chan].last_time, last_time_str,
		    last_time_str,
		    channelListSeq[to_cur_chan].raw_stream_buffer.last_sample_time, raw_last_sample_time_str
		    raw_last_sample_time_str
		   );
		   );
	    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CHANNEL, "%s\n", state_line_str);
	    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CHANNEL, "%s\n", state_line_str);
	    if(fstatefile) {
	    if(fstatefile) {
		if( (channelListSeq[to_cur_chan].last_time != 0) || (channelListSeq[to_cur_chan].raw_stream_buffer.last_sample_time != -1.0) ) {
		fprintf(fstatefile, "%s\n", state_line_str);
		fprintf(fstatefile, "%s\n", state_line_str);
		} else {
		if( (channelListSeq[to_cur_chan].last_time != 0) || (channelListSeq[to_cur_chan].raw_stream_buffer.last_sample_time != -1.0) ) {
		    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_ANY, "%s\n", state_line_str);
		    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_ANY, "%s\n", state_line_str);
		} else {
		    /* Do nothing */
		}
		}
	    }
	    }
	    to_cur_chan++;
	    to_cur_chan++;
@@ -836,6 +858,66 @@ static void save_channel_states() {
    }
    }
}
}


void load_channel_states(NMXP_CHAN_LIST_NET *chan_list, NMXPTOOL_CHAN_SEQ *chan_list_seq) {
    FILE *fstatefile = NULL;
#define MAXSIZE_LINE 2048
    char line[MAXSIZE_LINE];
    char s_chan[128];
    char s_noraw_time_s[128];
    char s_rawtime_s[128];
    double s_noraw_time_f_calc, s_rawtime_f_calc;
    int cur_chan;
    int n_scanf;
    NMXP_TM_T tmp_tmt;

    if(params.statefile) {
	fstatefile = fopen(params.statefile, "r");
	if(fstatefile == NULL) {
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "Unable to write channel states into %s!\n", params.statefile);
	} else {
	    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_ANY, "Loading channel states from %s!\n", params.statefile);
	    while(fgets(line, MAXSIZE_LINE, fstatefile) != NULL) {
		s_chan[0] = 0;
		s_noraw_time_s[0] = 0;
		s_rawtime_s[0] = 0;
		n_scanf = sscanf(line, "%s %s %s", s_chan, s_noraw_time_s, s_rawtime_s); 

		s_noraw_time_f_calc = DEFAULT_BUFFERED_TIME;
	       	s_rawtime_f_calc = DEFAULT_BUFFERED_TIME;
		if(n_scanf == 3) {
		    if(nmxp_data_parse_date(s_noraw_time_s, &tmp_tmt) == -1) {
			nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "Error parsing %s\n", s_noraw_time_s); 
		    } else {
			s_noraw_time_f_calc = nmxp_data_tm_to_time(&tmp_tmt);
		    }
		    if(nmxp_data_parse_date(s_rawtime_s, &tmp_tmt) == -1) {
			nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "Error parsing %s\n", s_rawtime_s); 
		    } else {
			s_rawtime_f_calc = nmxp_data_tm_to_time(&tmp_tmt);
		    }
		}
		nmxp_log(NMXP_LOG_NORM_NO, NMXP_LOG_D_PACKETMAN, "%d %-14s %16.4f %s %16.4f %s\n", n_scanf, s_chan, s_noraw_time_f_calc, s_noraw_time_s, s_rawtime_f_calc, s_rawtime_s); 
		cur_chan = 0;
		while(cur_chan < chan_list->number  &&  strcasecmp(s_chan, chan_list->channel[cur_chan].name) != 0) {
		    cur_chan++;
		}
		if(cur_chan < chan_list->number) {
		    if( s_rawtime_f_calc != DEFAULT_BUFFERED_TIME  && s_rawtime_f_calc != 0.0 ) {
			chan_list_seq[cur_chan].after_start_time = s_rawtime_f_calc;
			nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_ANY, "For channel %s starting from %s.\n", s_chan, s_rawtime_s); 
		    } else if( s_noraw_time_f_calc != DEFAULT_BUFFERED_TIME ) {
			chan_list_seq[cur_chan].after_start_time = s_noraw_time_f_calc;
			nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_ANY, "For channel %s starting from %s.\n", s_chan, s_noraw_time_s); 
		    } else {
			nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_ANY, "For channel %s there is not valid start_time.\n", s_chan); 
		    }
		}
	    }
	    fclose(fstatefile);
	}
    }
}



static void flushing_raw_data_stream() {
static void flushing_raw_data_stream() {
    int to_cur_chan;
    int to_cur_chan;