Commit 424d7715 authored by Matteo Quintiliani's avatar Matteo Quintiliani
Browse files

Added management for infinite flow


git-svn-id: file:///home/quintiliani/svncopy/nmxptool/trunk@646 3cd66e75-5955-46cb-a940-c26e5fc5497d
parent c7da4317
Loading
Loading
Loading
Loading
+64 −8
Original line number Original line Diff line number Diff line
@@ -7,7 +7,7 @@
 * 	Istituto Nazionale di Geofisica e Vulcanologia - Italy
 * 	Istituto Nazionale di Geofisica e Vulcanologia - Italy
 *	quintiliani@ingv.it
 *	quintiliani@ingv.it
 *
 *
 * $Id: nmxptool.c,v 1.108 2008-01-16 10:51:44 mtheo Exp $
 * $Id: nmxptool.c,v 1.109 2008-01-17 08:15:00 mtheo Exp $
 *
 *
 */
 */


@@ -42,6 +42,8 @@
#include "seedlink_plugin.h"
#include "seedlink_plugin.h"
#endif
#endif


#define DAP_CONDITION(params_struct) ( params_struct.start_time != 0.0 || params_struct.delay > 0 )

#define CURRENT_NETWORK ( (params.network)? params.network : DEFAULT_NETWORK )
#define CURRENT_NETWORK ( (params.network)? params.network : DEFAULT_NETWORK )
#define NETCODE_OR_CURRENT_NETWORK ( (network_code[0] != 0)? network_code : CURRENT_NETWORK )
#define NETCODE_OR_CURRENT_NETWORK ( (network_code[0] != 0)? network_code : CURRENT_NETWORK )


@@ -128,6 +130,8 @@ int main (int argc, char **argv) {
    double cur_after_start_time = DEFAULT_BUFFERED_TIME;
    double cur_after_start_time = DEFAULT_BUFFERED_TIME;
    int skip_current_packet = 0;
    int skip_current_packet = 0;


    int times_flow = 0;
    double default_start_time = 0.0;


    NMXP_DATA_PROCESS *pd;
    NMXP_DATA_PROCESS *pd;


@@ -207,7 +211,7 @@ int main (int argc, char **argv) {
    nmxptool_log_params(&params);
    nmxptool_log_params(&params);


    /* Get list of available channels and get a subset list of params.channels */
    /* Get list of available channels and get a subset list of params.channels */
    if(params.start_time != 0.0  &&  params.end_time != 0.0) {
    if( DAP_CONDITION(params) ) {
	/* From DataServer */
	/* From DataServer */
	nmxp_getMetaChannelList(params.hostname, params.portnumberdap, NMXP_DATA_TIMESERIES, params.flag_request_channelinfo, params.datas_username, params.datas_password, &channelList);
	nmxp_getMetaChannelList(params.hostname, params.portnumberdap, NMXP_DATA_TIMESERIES, params.flag_request_channelinfo, params.datas_username, params.datas_password, &channelList);
    } else {
    } else {
@@ -281,12 +285,37 @@ int main (int argc, char **argv) {


    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "Starting comunication.\n");
    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "Starting comunication.\n");


    times_flow = 0;

    while(times_flow < 2) {

	if(times_flow == 0) {
	    if(params.statefile) {
		params.interval = DEFAULT_INTERVAL_INFINITE;
	    }
	} else if(times_flow == 1) {
	    params.start_time = 0.0;
	    params.end_time = 0.0;
	    params.interval = DEFAULT_INTERVAL_NO_VALUE;

	    if(params.statefile) {
		load_channel_states(channelList_subset, channelListSeq);
	    }

	}

    /* TODO condition starting DAP or PDS */
    /* TODO condition starting DAP or PDS */
    if( (params.start_time != 0.0   &&   params.end_time != 0.0)
    if( DAP_CONDITION(params) || (times_flow == 0  &&  params.statefile  && params.interval == DEFAULT_INTERVAL_INFINITE) ) {
	    || params.delay > 0
      ) {


	if(params.delay > 0) {
	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "Begin DAP Flow.\n");

	if(params.interval > 0  ||  params.interval == DEFAULT_INTERVAL_INFINITE) {
	    if(params.interval > 0) {
		params.end_time = params.start_time + params.interval;
	    } else {
		params.end_time = nmxp_data_gmtime_now();
	    }
	} else if(params.delay > 0) {
	    params.start_time = ((double) (time(NULL) - params.delay - span_interval) / 10.0) * 10.0;
	    params.start_time = ((double) (time(NULL) - params.delay - span_interval) / 10.0) * 10.0;
	    params.end_time = params.start_time + span_interval;
	    params.end_time = params.start_time + span_interval;
	}
	}
@@ -322,9 +351,9 @@ int main (int argc, char **argv) {


	exitdapcondition = 1;
	exitdapcondition = 1;


	while(exitdapcondition) {
	default_start_time = (params.start_time > 0.0)? params.start_time : nmxp_data_gmtime_now() - 10.0;


	    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "start_time = %.4f - end_time = %.4f\n", params.start_time, params.end_time);
	while(exitdapcondition) {


	    /* Start loop for sending requests */
	    /* Start loop for sending requests */
	    i_chan=0;
	    i_chan=0;
@@ -332,6 +361,20 @@ int main (int argc, char **argv) {


	    while(request_SOCKET_OK == NMXP_SOCKET_OK  &&  i_chan < channelList_subset->number) {
	    while(request_SOCKET_OK == NMXP_SOCKET_OK  &&  i_chan < channelList_subset->number) {


		if(params.statefile) {
		    if(channelListSeq[i_chan].after_start_time > 0) {
			params.start_time = channelListSeq[i_chan].after_start_time;
		    } else {
			params.start_time = default_start_time;
		    }
		}

		char start_time_str[30], end_time_str[30], default_start_time_str[30];
		nmxp_data_to_str(start_time_str, params.start_time);
		nmxp_data_to_str(end_time_str, params.end_time);
		nmxp_data_to_str(default_start_time_str, default_start_time);
		nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "start_time = %s - end_time = %s - (default_start_time = %s)\n", start_time_str, end_time_str, default_start_time_str);

		/* DAP Step 5: Send Data Request */
		/* DAP Step 5: Send Data Request */
		request_SOCKET_OK = nmxp_sendDataRequest(naqssock, channelList_subset->channel[i_chan].key, (int32_t) params.start_time, (int32_t) (params.end_time + 1.0));
		request_SOCKET_OK = nmxp_sendDataRequest(naqssock, channelList_subset->channel[i_chan].key, (int32_t) params.start_time, (int32_t) (params.end_time + 1.0));


@@ -522,10 +565,14 @@ int main (int argc, char **argv) {
	/* End subscription protocol "DATA ACCESS PROTOCOL" version 1.0 */
	/* End subscription protocol "DATA ACCESS PROTOCOL" version 1.0 */
	/* ************************************************************ */
	/* ************************************************************ */


	save_channel_states();


	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "End DAP Flow.\n");


    } else {
    } else {


	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "Begin PDS Flow.\n");

	if(params.stc == -1) {
	if(params.stc == -1) {




@@ -774,6 +821,7 @@ int main (int argc, char **argv) {


	/* Flush raw data stream for each channel */
	/* Flush raw data stream for each channel */
	flushing_raw_data_stream();
	flushing_raw_data_stream();

	save_channel_states();
	save_channel_states();


#ifdef HAVE_EARTHWORMOBJS
#ifdef HAVE_EARTHWORMOBJS
@@ -800,7 +848,15 @@ int main (int argc, char **argv) {
	/* End subscription protocol "PRIVATE DATA STREAM" version 1.4 */
	/* End subscription protocol "PRIVATE DATA STREAM" version 1.4 */
	/* *********************************************************** */
	/* *********************************************************** */


	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "End PDS Flow.\n");


    }

    if(params.interval == DEFAULT_INTERVAL_INFINITE) {
	times_flow++;
    } else {
	times_flow = 100;
    }


    }
    }


+5 −13
Original line number Original line Diff line number Diff line
@@ -7,7 +7,7 @@
 * 	Istituto Nazionale di Geofisica e Vulcanologia - Italy
 * 	Istituto Nazionale di Geofisica e Vulcanologia - Italy
 *	quintiliani@ingv.it
 *	quintiliani@ingv.it
 *
 *
 * $Id: nmxptool_getoptlong.c,v 1.53 2008-01-09 17:16:30 mtheo Exp $
 * $Id: nmxptool_getoptlong.c,v 1.54 2008-01-17 08:14:44 mtheo Exp $
 *
 *
 */
 */


@@ -31,7 +31,7 @@ const NMXPTOOL_PARAMS NMXPTOOL_PARAMS_DEFAULT =
    NULL,
    NULL,
    0.0,
    0.0,
    0.0,
    0.0,
    DEFAULT_INTERVAL,
    DEFAULT_INTERVAL_NO_VALUE,
    NULL,
    NULL,
    NULL,
    NULL,
    DEFAULT_STC,
    DEFAULT_STC,
@@ -700,11 +700,6 @@ void nmxptool_log_params(NMXPTOOL_PARAMS *params) {
int nmxptool_check_params(NMXPTOOL_PARAMS *params) {
int nmxptool_check_params(NMXPTOOL_PARAMS *params) {
    int ret = 0;
    int ret = 0;


    if(params->start_time != 0.0 && params->interval != 0   &&   params->end_time == 0.0) {
	params->end_time = params->start_time + params->interval;
	params->interval = 0;
    }

    if(params->ew_configuration_file != NULL) {
    if(params->ew_configuration_file != NULL) {
	/* Do nothing */
	/* Do nothing */
    } else if(params->hostname == NULL) {
    } else if(params->hostname == NULL) {
@@ -726,12 +721,9 @@ int nmxptool_check_params(NMXPTOOL_PARAMS *params) {
    } else if(params->start_time == 0.0 &&  params->end_time != 0.0) {
    } else if(params->start_time == 0.0 &&  params->end_time != 0.0) {
	ret = -1;
	ret = -1;
	nmxp_log(NMXP_LOG_NORM_NO, NMXP_LOG_D_ANY, "<end_time> is required when declaring <start_time>!\n");
	nmxp_log(NMXP_LOG_NORM_NO, NMXP_LOG_D_ANY, "<end_time> is required when declaring <start_time>!\n");
    } else if(params->start_time != 0.0 &&  params->end_time == 0.0) {
    } else if(params->start_time != 0.0  &&  params->end_time != 0.0  && params->interval != DEFAULT_INTERVAL_NO_VALUE) {
	ret = -1;
	nmxp_log(NMXP_LOG_NORM_NO, NMXP_LOG_D_ANY, "<start_time> has to be used with <end_time> or <interval>!\n");
    } else if(params->start_time != 0.0 && params->interval != 0   &&   params->end_time != 0.0) {
	ret = -1;
	ret = -1;
	nmxp_log(NMXP_LOG_NORM_NO, NMXP_LOG_D_ANY, "<end_time> and <interval> can not be used together!\n");
	nmxp_log(NMXP_LOG_NORM_NO, NMXP_LOG_D_ANY, "<start_time> has to be used with either <end_time> or <interval>!\n");
    } else if(params->start_time != 0.0   &&   params->end_time != 0.0
    } else if(params->start_time != 0.0   &&   params->end_time != 0.0
	    && params->start_time >= params->end_time) {
	    && params->start_time >= params->end_time) {
	ret = -1;
	ret = -1;
@@ -764,7 +756,7 @@ int nmxptool_check_params(NMXPTOOL_PARAMS *params) {
	nmxp_log(NMXP_LOG_NORM_NO, NMXP_LOG_D_ANY, "<rate> can not be used with options <start_time> and <end_time>.\n");
	nmxp_log(NMXP_LOG_NORM_NO, NMXP_LOG_D_ANY, "<rate> can not be used with options <start_time> and <end_time>.\n");
    } else if(params->flag_buffered != 0 && params->start_time != 0.0   &&   params->end_time != 0.0) {
    } else if(params->flag_buffered != 0 && params->start_time != 0.0   &&   params->end_time != 0.0) {
	ret = -1;
	ret = -1;
	nmxp_log(NMXP_LOG_NORM_NO, NMXP_LOG_D_ANY, "<buffered> can not be used with options <start_time> and <end_time>.\n");
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "<buffered> can not be used with options <start_time> and <end_time>.\n");
    } else if( params->stc == -1
    } else if( params->stc == -1
	    && (params->max_tolerable_latency < DEFAULT_MAX_TOLERABLE_LATENCY_MINIMUM  ||
	    && (params->max_tolerable_latency < DEFAULT_MAX_TOLERABLE_LATENCY_MINIMUM  ||
		params->max_tolerable_latency > DEFAULT_MAX_TOLERABLE_LATENCY_MAXIMUM)) {
		params->max_tolerable_latency > DEFAULT_MAX_TOLERABLE_LATENCY_MAXIMUM)) {
+3 −3
Original line number Original line Diff line number Diff line
@@ -7,7 +7,7 @@
 * 	Istituto Nazionale di Geofisica e Vulcanologia - Italy
 * 	Istituto Nazionale di Geofisica e Vulcanologia - Italy
 *	quintiliani@ingv.it
 *	quintiliani@ingv.it
 *
 *
 * $Id: nmxptool_getoptlong.h,v 1.52 2008-01-14 08:06:42 mtheo Exp $
 * $Id: nmxptool_getoptlong.h,v 1.53 2008-01-17 08:14:44 mtheo Exp $
 *
 *
 */
 */


@@ -50,8 +50,8 @@


#define DEFAULT_BUFFERED_TIME			-1.0
#define DEFAULT_BUFFERED_TIME			-1.0


#define DEFAULT_INTERVAL_INFINITE		-1
#define DEFAULT_INTERVAL_NO_VALUE		-1
#define DEFAULT_INTERVAL			0
#define DEFAULT_INTERVAL_INFINITE		0


/*! \brief Struct that stores information about parameter of the program */
/*! \brief Struct that stores information about parameter of the program */
typedef struct {
typedef struct {