Commit 9b0a676b authored by Matteo Quintiliani's avatar Matteo Quintiliani
Browse files

Major change for infinite flow management


git-svn-id: file:///home/quintiliani/svncopy/nmxptool/trunk@663 3cd66e75-5955-46cb-a940-c26e5fc5497d
parent cf0d8e46
Loading
Loading
Loading
Loading
+80 −73
Original line number Diff line number Diff line
@@ -7,7 +7,7 @@
 * 	Istituto Nazionale di Geofisica e Vulcanologia - Italy
 *	quintiliani@ingv.it
 *
 * $Id: nmxptool.c,v 1.115 2008-01-17 13:58:37 mtheo Exp $
 * $Id: nmxptool.c,v 1.116 2008-01-17 16:23:31 mtheo Exp $
 *
 */

@@ -66,7 +66,7 @@ static void clientShutdown(int sig);
static void clientDummyHandler(int sig);
#endif

static void save_channel_states();
static void save_channel_states(NMXP_CHAN_LIST_NET *chan_list, NMXPTOOL_CHAN_SEQ *chan_list_seq);
void load_channel_states(NMXP_CHAN_LIST_NET *chan_list, NMXPTOOL_CHAN_SEQ *chan_list_seq);
static void flushing_raw_data_stream();

@@ -90,7 +90,7 @@ int naqssock = 0;
FILE *outfile = NULL;
NMXP_CHAN_LIST *channelList = NULL;
NMXP_CHAN_LIST_NET *channelList_subset = NULL;
NMXPTOOL_CHAN_SEQ *channelListSeq = NULL;
NMXPTOOL_CHAN_SEQ *channelList_Seq = NULL;
int n_func_pd = 0;
int (*p_func_pd[NMXP_MAX_FUNC_PD]) (NMXP_DATA_PROCESS *);

@@ -233,21 +233,17 @@ int main (int argc, char **argv) {
    } else {
	nmxp_chan_print_netchannelList(channelList_subset);

	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CHANNEL, "Init channelListSeq.\n");
	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CHANNEL, "Init channelList_Seq.\n");

	/* init channelListSeq */
	channelListSeq = (NMXPTOOL_CHAN_SEQ *) malloc(sizeof(NMXPTOOL_CHAN_SEQ) * channelList_subset->number);
	/* init channelList_Seq */
	channelList_Seq = (NMXPTOOL_CHAN_SEQ *) malloc(sizeof(NMXPTOOL_CHAN_SEQ) * channelList_subset->number);
	for(i_chan = 0; i_chan < channelList_subset->number; i_chan++) {
	    channelListSeq[i_chan].significant = 0;
	    channelListSeq[i_chan].last_time = 0.0;
	    channelListSeq[i_chan].last_time_call_raw_stream = 0;
	    channelListSeq[i_chan].x_1 = 0;
	    channelListSeq[i_chan].after_start_time = DEFAULT_BUFFERED_TIME;
	    nmxp_raw_stream_init(&(channelListSeq[i_chan].raw_stream_buffer), params.max_tolerable_latency, params.timeoutrecv);
	}

	if(params.statefile) {
	    load_channel_states(channelList_subset, channelListSeq);
	    channelList_Seq[i_chan].significant = 0;
	    channelList_Seq[i_chan].last_time = 0.0;
	    channelList_Seq[i_chan].last_time_call_raw_stream = 0;
	    channelList_Seq[i_chan].x_1 = 0;
	    channelList_Seq[i_chan].after_start_time = DEFAULT_BUFFERED_TIME;
	    nmxp_raw_stream_init(&(channelList_Seq[i_chan].raw_stream_buffer), params.max_tolerable_latency, params.timeoutrecv);
	}

#ifdef HAVE_LIBMSEED
@@ -300,6 +296,10 @@ int main (int argc, char **argv) {

    while(times_flow < 2) {

	if(params.statefile) {
	    load_channel_states(channelList_subset, channelList_Seq);
	}

	if(times_flow == 0) {
	    if(params.statefile) {
		params.interval = DEFAULT_INTERVAL_INFINITE;
@@ -309,10 +309,6 @@ int main (int argc, char **argv) {
	    params.end_time = 0.0;
	    params.interval = DEFAULT_INTERVAL_NO_VALUE;

	    if(params.statefile) {
		load_channel_states(channelList_subset, channelListSeq);
	    }

	}

    /* TODO condition starting DAP or PDS */
@@ -373,8 +369,8 @@ int main (int argc, char **argv) {
	    while(request_SOCKET_OK == NMXP_SOCKET_OK  &&  i_chan < channelList_subset->number) {

		if(params.statefile) {
		    if(channelListSeq[i_chan].after_start_time > 0) {
			params.start_time = channelListSeq[i_chan].after_start_time;
		    if(channelList_Seq[i_chan].after_start_time > 0) {
			params.start_time = channelList_Seq[i_chan].after_start_time;
			if(params.end_time - params.start_time > params.max_data_to_retrieve) {
			    nmxp_data_to_str(start_time_str, params.start_time);
			    nmxp_data_to_str(default_start_time_str, params.end_time - params.max_data_to_retrieve);
@@ -385,9 +381,10 @@ int main (int argc, char **argv) {
		    } else {
			params.start_time = default_start_time;
		    }
		}
		    channelList_Seq[i_chan].last_time = params.start_time;
		    channelList_Seq[i_chan].significant = 1;

		channelListSeq[i_chan].last_time = params.start_time;
		}

		nmxp_data_to_str(start_time_str, params.start_time);
		nmxp_data_to_str(end_time_str, params.end_time);
@@ -483,21 +480,24 @@ int main (int argc, char **argv) {

			/* Set cur_chan */
			cur_chan = nmxp_chan_lookupKeyIndex(pd->key, channelList_subset);
			if(i_chan != cur_chan) {
			    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "i_chan != cur_chan  %d != %d!\n", i_chan, cur_chan);
			}

			/* Management of gaps */
			if(!channelListSeq[cur_chan].significant && pd->nSamp > 0) {
			    channelListSeq[cur_chan].significant = 1;
			if(!channelList_Seq[cur_chan].significant && pd->nSamp > 0) {
			    channelList_Seq[cur_chan].significant = 1;
			} else {
			    if(channelListSeq[cur_chan].significant && pd->nSamp > 0) {
				if(nmxptool_check_and_log_gap(pd->time, channelListSeq[cur_chan].last_time, GAP_TOLLERANCE, pd->station, pd->channel)) {
				    channelListSeq[cur_chan].x_1 = 0;
			    if(channelList_Seq[cur_chan].significant && pd->nSamp > 0) {
				if(nmxptool_check_and_log_gap(pd->time, channelList_Seq[cur_chan].last_time, GAP_TOLLERANCE, pd->station, pd->channel)) {
				    channelList_Seq[cur_chan].x_1 = 0;
				    nmxp_data_to_str(str_pd_time, pd->time);
				    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_EXTRA, "%s.%s x0 set to zero at %s!\n", pd->station, pd->channel, str_pd_time);
				}
			    }
			}
			if(channelListSeq[cur_chan].significant && pd->nSamp > 0) {
			    channelListSeq[cur_chan].last_time = pd->time + ((double) pd->nSamp / (double) pd->sampRate);
			if(channelList_Seq[cur_chan].significant && pd->nSamp > 0) {
			    channelList_Seq[cur_chan].last_time = pd->time + ((double) pd->nSamp / (double) pd->sampRate);
			}

#ifdef HAVE_LIBMSEED
@@ -532,7 +532,7 @@ int main (int argc, char **argv) {

			/* Store x_1 */
			if(pd->nSamp > 0) {
			    channelListSeq[cur_chan].x_1 = pd->pDataPtr[pd->nSamp-1];
			    channelList_Seq[cur_chan].x_1 = pd->pDataPtr[pd->nSamp-1];
			}
			/* Free pd->buffer */
			if(pd->buffer) {
@@ -608,8 +608,6 @@ int main (int argc, char **argv) {
	/* End subscription protocol "DATA ACCESS PROTOCOL" version 1.0 */
	/* ************************************************************ */

	save_channel_states();

	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "End DAP Flow.\n");

    } else {
@@ -733,15 +731,15 @@ int main (int argc, char **argv) {
	    if(pd &&
		    (params.statefile  ||  params.buffered_time)
	      )	{
		if(params.statefile && channelListSeq[cur_chan].after_start_time > 0.0) {
		    cur_after_start_time = channelListSeq[cur_chan].after_start_time;
		if(params.statefile && channelList_Seq[cur_chan].after_start_time > 0.0) {
		    cur_after_start_time = channelList_Seq[cur_chan].after_start_time;
		} else if(params.buffered_time) {
		    cur_after_start_time = params.buffered_time;
		} else {
		    cur_after_start_time = DEFAULT_BUFFERED_TIME;
		}
		nmxp_data_to_str(cur_after_start_time_str, cur_after_start_time);
		nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_PACKETMAN, "cur_chan %d, cur_after_start_time %f, cur_after_start_time_str %s\n", cur_chan, cur_after_start_time, cur_after_start_time_str);
		nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_EXTRA, "cur_chan %d, cur_after_start_time %f, cur_after_start_time_str %s\n", cur_chan, cur_after_start_time, cur_after_start_time_str);
		if(pd->time + ((double) pd->nSamp / (double) pd->sampRate) >= cur_after_start_time) {
		    if(pd->time < cur_after_start_time) {
			int first_nsample_to_remove = (cur_after_start_time - pd->time) * (double) pd->sampRate;
@@ -768,8 +766,8 @@ int main (int argc, char **argv) {

		    /* cur_char is computed only for pd != NULL */
		    if(pd) {
			nmxp_raw_stream_manage(&(channelListSeq[cur_chan].raw_stream_buffer), pd, p_func_pd, n_func_pd);
			channelListSeq[cur_chan].last_time_call_raw_stream = nmxp_data_gmtime_now();
			nmxp_raw_stream_manage(&(channelList_Seq[cur_chan].raw_stream_buffer), pd, p_func_pd, n_func_pd);
			channelList_Seq[cur_chan].last_time_call_raw_stream = nmxp_data_gmtime_now();
		    }

		    /* Check timeout for other channels */
@@ -777,13 +775,13 @@ int main (int argc, char **argv) {
			exitpdscondition = 1;
			to_cur_chan = 0;
			while(to_cur_chan < channelList_subset->number) {
			    timeout_for_channel = nmxp_data_gmtime_now() - channelListSeq[to_cur_chan].last_time_call_raw_stream;
			    if(channelListSeq[to_cur_chan].last_time_call_raw_stream != 0
			    timeout_for_channel = nmxp_data_gmtime_now() - channelList_Seq[to_cur_chan].last_time_call_raw_stream;
			    if(channelList_Seq[to_cur_chan].last_time_call_raw_stream != 0
				    && timeout_for_channel >= params.timeoutrecv) {
				nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_DOD, "Timeout for channel %s (%d sec.)\n",
					channelList_subset->channel[to_cur_chan].name, timeout_for_channel);
				nmxp_raw_stream_manage(&(channelListSeq[to_cur_chan].raw_stream_buffer), NULL, p_func_pd, n_func_pd);
				channelListSeq[to_cur_chan].last_time_call_raw_stream = nmxp_data_gmtime_now();
				nmxp_raw_stream_manage(&(channelList_Seq[to_cur_chan].raw_stream_buffer), NULL, p_func_pd, n_func_pd);
				channelList_Seq[to_cur_chan].last_time_call_raw_stream = nmxp_data_gmtime_now();
			    }
			    to_cur_chan++;
			}
@@ -793,19 +791,19 @@ int main (int argc, char **argv) {

		    if(pd) {
			/* Management of gaps */
			if(!channelListSeq[cur_chan].significant && pd->nSamp > 0) {
			    channelListSeq[cur_chan].significant = 1;
			if(!channelList_Seq[cur_chan].significant && pd->nSamp > 0) {
			    channelList_Seq[cur_chan].significant = 1;
			} else {
			    if(channelListSeq[cur_chan].significant && pd->nSamp > 0) {
				if(nmxptool_check_and_log_gap(pd->time, channelListSeq[cur_chan].last_time, GAP_TOLLERANCE, pd->station, pd->channel)) {
				    channelListSeq[cur_chan].x_1 = 0;
			    if(channelList_Seq[cur_chan].significant && pd->nSamp > 0) {
				if(nmxptool_check_and_log_gap(pd->time, channelList_Seq[cur_chan].last_time, GAP_TOLLERANCE, pd->station, pd->channel)) {
				    channelList_Seq[cur_chan].x_1 = 0;
				    nmxp_data_to_str(str_pd_time, pd->time);
				    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_EXTRA, "%s.%s x0 set to zero at %s!\n", pd->station, pd->channel, str_pd_time);
				}
			    }
			}
			if(channelListSeq[cur_chan].significant && pd->nSamp > 0) {
			    channelListSeq[cur_chan].last_time = pd->time + ((double) pd->nSamp / (double) pd->sampRate);
			if(channelList_Seq[cur_chan].significant && pd->nSamp > 0) {
			    channelList_Seq[cur_chan].last_time = pd->time + ((double) pd->nSamp / (double) pd->sampRate);
			}


@@ -829,7 +827,7 @@ int main (int argc, char **argv) {
	    if(pd) {
		/* Store x_1 */
		if(pd->nSamp > 0) {
		    channelListSeq[cur_chan].x_1 = pd->pDataPtr[pd->nSamp-1];
		    channelList_Seq[cur_chan].x_1 = pd->pDataPtr[pd->nSamp-1];
		}
		/* Free pd->buffer */
		if(pd->buffer) {
@@ -859,8 +857,6 @@ int main (int argc, char **argv) {
	/* Flush raw data stream for each channel */
	flushing_raw_data_stream();

	save_channel_states();

#ifdef HAVE_LIBMSEED
	if(params.flag_writeseed  &&  data_seed.outfile_mseed) {
	    /* Close output Mini-SEED file */
@@ -889,6 +885,10 @@ int main (int argc, char **argv) {
	times_flow = TIMES_FLOW_EXIT;
    }

    if(params.statefile) {
	save_channel_states(channelList_subset, channelList_Seq);
    }

    }

#ifdef HAVE_EARTHWORMOBJS
@@ -908,11 +908,11 @@ int main (int argc, char **argv) {
#endif

    for(i_chan = 0; i_chan < channelList_subset->number; i_chan++) {
	nmxp_raw_stream_free(&(channelListSeq[i_chan].raw_stream_buffer));
	nmxp_raw_stream_free(&(channelList_Seq[i_chan].raw_stream_buffer));
    }

    if(channelListSeq) {
	free(channelListSeq);
    if(channelList_Seq) {
	free(channelList_Seq);
    }

    /* This has to be tha last */
@@ -927,7 +927,7 @@ int main (int argc, char **argv) {
#define MAX_LEN_FILENAME 4096
#define NMXP_STR_STATE_EXT ".nmxpstate"

static void save_channel_states() {
static void save_channel_states(NMXP_CHAN_LIST_NET *chan_list, NMXPTOOL_CHAN_SEQ *chan_list_seq) {
    int to_cur_chan;
    char last_time_str[30];
    char raw_last_sample_time_str[30];
@@ -948,19 +948,20 @@ static void save_channel_states() {
	/* Save state for each channel */
	// if(params.stc == -1)
	to_cur_chan = 0;
	while(to_cur_chan < channelList_subset->number) {
	    nmxp_data_to_str(last_time_str, channelListSeq[to_cur_chan].last_time);
	    nmxp_data_to_str(raw_last_sample_time_str, channelListSeq[to_cur_chan].raw_stream_buffer.last_sample_time);
	while(to_cur_chan < chan_list->number) {
	    nmxp_data_to_str(last_time_str, chan_list_seq[to_cur_chan].last_time);
	    nmxp_data_to_str(raw_last_sample_time_str, chan_list_seq[to_cur_chan].raw_stream_buffer.last_sample_time);
	    sprintf(state_line_str, "%s %s %s",
		    channelList_subset->channel[to_cur_chan].name,
		    chan_list->channel[to_cur_chan].name,
		    last_time_str,
		    raw_last_sample_time_str
		   );
	    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CHANNEL, "%s\n", state_line_str);
	    if(fstatefile) {
		fprintf(fstatefile, "%s\n", state_line_str);
		if( (channelListSeq[to_cur_chan].last_time != 0) || (channelListSeq[to_cur_chan].raw_stream_buffer.last_sample_time != -1.0) ) {
		    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_ANY, "%s\n", state_line_str);
		if( (chan_list_seq[to_cur_chan].last_time != 0) || (chan_list_seq[to_cur_chan].raw_stream_buffer.last_sample_time != -1.0) ) {
		    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_ANY, "%s %d %d %f %f\n", state_line_str, to_cur_chan, chan_list->channel[to_cur_chan].key,
			    chan_list_seq[to_cur_chan].last_time, chan_list_seq[to_cur_chan].raw_stream_buffer.last_sample_time);
		} else {
		    /* Do nothing */
		}
@@ -1028,17 +1029,19 @@ void load_channel_states(NMXP_CHAN_LIST_NET *chan_list, NMXPTOOL_CHAN_SEQ *chan_
		s_rawtime_f_calc = DEFAULT_BUFFERED_TIME;
		if(n_scanf == 3) {
		    if(nmxp_data_parse_date(s_noraw_time_s, &tmp_tmt) == -1) {
			nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "Error parsing %s\n", s_noraw_time_s); 
			nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "Parsing time '%s'\n", s_noraw_time_s); 
		    } else {
			s_noraw_time_f_calc = nmxp_data_tm_to_time(&tmp_tmt);
		    }
		    if(nmxp_data_parse_date(s_rawtime_s, &tmp_tmt) == -1) {
			nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "Error parsing %s\n", s_rawtime_s); 
			nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "Parsing time '%s'\n", s_rawtime_s); 
		    } else {
			s_rawtime_f_calc = nmxp_data_tm_to_time(&tmp_tmt);
		    }
		}
		nmxp_log(NMXP_LOG_NORM_NO, NMXP_LOG_D_PACKETMAN, "%d %-14s %16.4f %s %16.4f %s\n", n_scanf, s_chan, s_noraw_time_f_calc, s_noraw_time_s, s_rawtime_f_calc, s_rawtime_s); 

		nmxp_log(NMXP_LOG_NORM_NO, NMXP_LOG_D_EXTRA, "%d %-14s %16.4f %s %16.4f %s\n", n_scanf, s_chan, s_noraw_time_f_calc, s_noraw_time_s, s_rawtime_f_calc, s_rawtime_s); 

		cur_chan = 0;
		while(cur_chan < chan_list->number  &&  strcasecmp(s_chan, chan_list->channel[cur_chan].name) != 0) {
		    cur_chan++;
@@ -1046,13 +1049,15 @@ void load_channel_states(NMXP_CHAN_LIST_NET *chan_list, NMXPTOOL_CHAN_SEQ *chan_
		if(cur_chan < chan_list->number) {
		    if( s_rawtime_f_calc != DEFAULT_BUFFERED_TIME  && s_rawtime_f_calc != 0.0 ) {
			chan_list_seq[cur_chan].after_start_time = s_rawtime_f_calc;
			nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_ANY, "For channel %s starting from %s.\n", s_chan, s_rawtime_s); 
			nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_ANY, "For channel %s (%d %d) starting from %s. %f.\n", s_chan, cur_chan, chan_list->channel[cur_chan].key, s_rawtime_s, s_rawtime_f_calc); 
		    } else if( s_noraw_time_f_calc != DEFAULT_BUFFERED_TIME ) {
			chan_list_seq[cur_chan].after_start_time = s_noraw_time_f_calc;
			nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_ANY, "For channel %s starting from %s.\n", s_chan, s_noraw_time_s); 
			nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_ANY, "For channel %s (%d %d) starting from %s. %f.\n", s_chan, cur_chan, chan_list->channel[cur_chan].key, s_noraw_time_s, s_noraw_time_f_calc); 
		    } else {
			nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_ANY, "For channel %s there is not valid start_time.\n", s_chan); 
		    }
		} else {
		    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "Channel %s not found!\n", s_chan); 
		}
	    }
	    fclose(fstatefile);
@@ -1071,7 +1076,7 @@ static void flushing_raw_data_stream() {
	while(to_cur_chan < channelList_subset->number) {
	    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_ANY, "Flushing data for channel %s\n",
		    channelList_subset->channel[to_cur_chan].name);
	    nmxp_raw_stream_manage(&(channelListSeq[to_cur_chan].raw_stream_buffer), NULL, p_func_pd, n_func_pd);
	    nmxp_raw_stream_manage(&(channelList_Seq[to_cur_chan].raw_stream_buffer), NULL, p_func_pd, n_func_pd);
	    to_cur_chan++;
	}
    }
@@ -1085,7 +1090,9 @@ static void clientShutdown(int sig) {

    flushing_raw_data_stream();

    save_channel_states();
    if(params.statefile) {
	save_channel_states(channelList_subset, channelList_Seq);
    }

    if(params.flag_writefile  &&  outfile) {
	/* Close output file */
@@ -1133,11 +1140,11 @@ static void clientShutdown(int sig) {
#endif

    for(i_chan = 0; i_chan < channelList_subset->number; i_chan++) {
	nmxp_raw_stream_free(&(channelListSeq[i_chan].raw_stream_buffer));
	nmxp_raw_stream_free(&(channelList_Seq[i_chan].raw_stream_buffer));
    }

    if(channelListSeq) {
	free(channelListSeq);
    if(channelList_Seq) {
	free(channelList_Seq);
    }

    /* This has to be the last */