nmxptool.c 35.5 KB
Newer Older
Matteo Quintiliani's avatar
Matteo Quintiliani committed
1
2
3
4
5
6
7
8
9
/*! \file
 *
 * \brief Nanometrics Protocol Tool
 *
 * Author:
 * 	Matteo Quintiliani
 * 	Istituto Nazionale di Geofisica e Vulcanologia - Italy
 *	quintiliani@ingv.it
 *
Matteo Quintiliani's avatar
Matteo Quintiliani committed
10
 * $Id: nmxptool.c,v 1.113 2008-01-17 12:45:09 mtheo Exp $
Matteo Quintiliani's avatar
Matteo Quintiliani committed
11
 *
Matteo Quintiliani's avatar
Matteo Quintiliani committed
12
13
 */

Matteo Quintiliani's avatar
Matteo Quintiliani committed
14
15
#include "config.h"

16
17
18
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
Matteo Quintiliani's avatar
Matteo Quintiliani committed
19
#include <errno.h>
20
21

#include <nmxp.h>
Matteo Quintiliani's avatar
Matteo Quintiliani committed
22

Matteo Quintiliani's avatar
Matteo Quintiliani committed
23
#ifndef HAVE_WINDOWS_H
Matteo Quintiliani's avatar
Matteo Quintiliani committed
24
25
26
#include <signal.h>
#endif

Matteo Quintiliani's avatar
Matteo Quintiliani committed
27
28
29
30
#ifdef HAVE_WINDOWS_H
#include <windows.h>
#endif

31
#include "nmxptool_getoptlong.h"
32
33

#ifdef HAVE_EARTHWORMOBJS
34
#include "nmxptool_ew.h"
35
#endif
36

37
38
39
#ifdef HAVE_LIBMSEED
#include <libmseed.h>
#endif
40

41
42
#ifdef HAVE___SRC_SEEDLINK_PLUGIN_H
#include "seedlink_plugin.h"
43
44
#endif

45
46
/*
 * DAP_CONDITION(params_struct)
 *
 * Evaluates to non-zero when the given parameter structure has either a
 * non-zero start_time or a positive delay. In this file it is used to choose
 * the DataServer (DAP) path instead of the NaqsServer (PDS) path.
 *
 * params_struct: any structure expression exposing the members `start_time`
 *                and `delay`. The argument is parenthesized so that
 *                expressions such as `*ptr` expand correctly.
 *                Note: the argument is evaluated twice.
 */
#define DAP_CONDITION(params_struct) ( (params_struct).start_time != 0.0 || (params_struct).delay > 0 )

47
48
49
/*
 * CURRENT_NETWORK: params.network when it is non-NULL, otherwise
 * DEFAULT_NETWORK. Reads the global `params` directly.
 */
#define CURRENT_NETWORK ( (params.network)? params.network : DEFAULT_NETWORK )
/*
 * NETCODE_OR_CURRENT_NETWORK: `network_code` when its first character is not
 * zero, otherwise CURRENT_NETWORK. This macro takes no argument: it
 * requires a character buffer named `network_code` to be in scope at the
 * point of expansion.
 */
#define NETCODE_OR_CURRENT_NETWORK ( (network_code[0] != 0)? network_code : CURRENT_NETWORK )

Matteo Quintiliani's avatar
Matteo Quintiliani committed
50
/*
 * Tolerance passed as the `gap_tollerance` argument of
 * nmxptool_check_and_log_gap() when a packet's start time is compared with
 * the end time recorded for the previous packet of the same channel.
 * NOTE(review): presumably expressed in seconds - confirm against
 * nmxptool_check_and_log_gap().
 */
#define GAP_TOLLERANCE 0.001
51

52
53
54
/*
 * Per-channel bookkeeping kept by nmxptool; one element is allocated for
 * each channel of channelList_subset.
 *
 *  significant               - 0 until the first packet with nSamp > 0 has
 *                              been seen for the channel, then 1.
 *  last_time                 - pd->time + nSamp / sampRate of the most recent
 *                              packet, i.e. the time just after its last
 *                              sample; compared with the next packet's time
 *                              to detect gaps.
 *  last_time_call_raw_stream - time of the last call to
 *                              nmxp_raw_stream_manage() for this channel
 *                              (0 when it has never been called).
 *  x_1                       - value of the last sample of the most recent
 *                              packet; reset to 0 when a gap is detected.
 *  after_start_time          - initialised to DEFAULT_BUFFERED_TIME; when a
 *                              state file is in use and this value is > 0 it
 *                              is taken as the start time for the channel.
 *  raw_stream_buffer         - raw-stream state for the channel, initialised
 *                              with nmxp_raw_stream_init() and handed to
 *                              nmxp_raw_stream_manage().
 */
typedef struct {
    int significant;
    double last_time;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
55
    time_t last_time_call_raw_stream;
56
    int32_t x_1;
57
    double after_start_time;
58
    NMXP_RAW_STREAM_DATA raw_stream_buffer;
59
60
} NMXPTOOL_CHAN_SEQ;

Matteo Quintiliani's avatar
Matteo Quintiliani committed
61
62

#ifndef HAVE_WINDOWS_H
Matteo Quintiliani's avatar
Matteo Quintiliani committed
63
64
static void clientShutdown(int sig);
static void clientDummyHandler(int sig);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
65
#endif
Matteo Quintiliani's avatar
Matteo Quintiliani committed
66

Matteo Quintiliani's avatar
Matteo Quintiliani committed
67
static void save_channel_states();
68
void load_channel_states(NMXP_CHAN_LIST_NET *chan_list, NMXPTOOL_CHAN_SEQ *chan_list_seq);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
69
70
static void flushing_raw_data_stream();

71
#ifdef HAVE_LIBMSEED
Matteo Quintiliani's avatar
Matteo Quintiliani committed
72
int nmxptool_write_miniseed(NMXP_DATA_PROCESS *pd);
73
74
#endif

Matteo Quintiliani's avatar
Matteo Quintiliani committed
75
#ifdef HAVE___SRC_SEEDLINK_PLUGIN_C
Matteo Quintiliani's avatar
Matteo Quintiliani committed
76
int nmxptool_send_raw_depoch(NMXP_DATA_PROCESS *pd);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
77
78
#endif

79
int nmxptool_print_seq_no(NMXP_DATA_PROCESS *pd);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
80
81

int nmxptool_check_and_log_gap(double time1, double time2, const double gap_tollerance, const char *station, const char *channel);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
82
void nmxptool_str_time_to_filename(char *str_time);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
83
84


85
86
87
88
89
/* Global variables for the main program and for handling program termination */
NMXPTOOL_PARAMS params;
int naqssock = 0;
FILE *outfile = NULL;
NMXP_CHAN_LIST *channelList = NULL;
90
NMXP_CHAN_LIST_NET *channelList_subset = NULL;
91
NMXPTOOL_CHAN_SEQ *channelListSeq = NULL;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
92
93
94
int n_func_pd = 0;
int (*p_func_pd[NMXP_MAX_FUNC_PD]) (NMXP_DATA_PROCESS *);

95
96
97
98
99
100
101
102

#ifdef HAVE_LIBMSEED
/* Mini-SEED variables */
NMXP_DATA_SEED data_seed;
MSRecord *msr_list_chan[MAX_N_CHAN];
#endif


103
int main (int argc, char **argv) {
104
    int32_t connection_time;
105
    int request_SOCKET_OK;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
106
    int i_chan, cur_chan = 0;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
107
    int to_cur_chan = 0;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
108
    int exitpdscondition;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
109
    int exitdapcondition;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
110
    time_t timeout_for_channel;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
111

Matteo Quintiliani's avatar
Matteo Quintiliani committed
112
    int span_interval = 10;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
113
114
    int time_to_sleep = 0;

115
116
    char str_start_time[200];
    char str_end_time[200];
Matteo Quintiliani's avatar
Matteo Quintiliani committed
117
    char str_pd_time[200];
118
119
120

    NMXP_MSG_SERVER type;
    void *buffer;
121
    int32_t length;
122
123
    int ret;

Matteo Quintiliani's avatar
Matteo Quintiliani committed
124
125
    int recv_errno = 0;

126
    char filename[500];
127
    char station_code[20], channel_code[20], network_code[20];
128

Matteo Quintiliani's avatar
Matteo Quintiliani committed
129
    char cur_after_start_time_str[1024];
130
131
    double cur_after_start_time = DEFAULT_BUFFERED_TIME;
    int skip_current_packet = 0;
132

133
134
    int times_flow = 0;
    double default_start_time = 0.0;
135
    char start_time_str[30], end_time_str[30], default_start_time_str[30];
136

137
138
139
140
141
142
143
    NMXP_DATA_PROCESS *pd;

#ifdef HAVE_LIBMSEED
    /* Init mini-SEED variables */
    nmxp_data_seed_init(&data_seed);
#endif

Matteo Quintiliani's avatar
Matteo Quintiliani committed
144
#ifndef HAVE_WINDOWS_H
Matteo Quintiliani's avatar
Matteo Quintiliani committed
145
146
147
148
149
150
151
    /* Signal handling, use POSIX calls with standardized semantics */
    struct sigaction sa;

    sa.sa_handler = clientDummyHandler;
    sa.sa_flags = SA_RESTART;
    sigemptyset(&sa.sa_mask);
    sigaction(SIGALRM, &sa, NULL);
152

Matteo Quintiliani's avatar
Matteo Quintiliani committed
153
154
155
156
157
158
159
160
161
162
163
    sa.sa_handler = clientShutdown;
    sigaction(SIGINT, &sa, NULL);
    sigaction(SIGQUIT, &sa, NULL); 
    sigaction(SIGTERM, &sa, NULL);

    sa.sa_handler = SIG_IGN;
    sigaction(SIGHUP, &sa, NULL);
    sigaction(SIGPIPE, &sa, NULL); 
#endif

    /* Default is normal output */
164
    nmxp_log(NMXP_LOG_SET, NMXP_LOG_D_NULL);
165
166
167
168
169
170

    /* Initialize params from argument values */
    if(nmxptool_getopt_long(argc, argv, &params) != 0) {
	return 1;
    }

171
    if(params.ew_configuration_file) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
172
173

#ifdef HAVE_EARTHWORMOBJS
174
175
	nmxp_log_init(nmxptool_ew_logit_msg, nmxptool_ew_logit_err);

Matteo Quintiliani's avatar
Matteo Quintiliani committed
176
	nmxptool_ew_configure(argv, &params);
177

Matteo Quintiliani's avatar
Matteo Quintiliani committed
178
179
180
181
182
	/* Check consistency of params */
	if(nmxptool_check_params(&params) != 0) {
	    return 1;
	}
#endif
183

184
    } else {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
185

Matteo Quintiliani's avatar
Matteo Quintiliani committed
186
187
	nmxp_log_init(nmxp_log_stdout, nmxp_log_stderr);

188
189
190
191
192
	/* Check consistency of params */
	if(nmxptool_check_params(&params) != 0) {
	    return 1;
	}

193
194
	/* List available channels on server */
	if(params.flag_listchannels) {
195

196
197
	    nmxp_meta_chan_print(nmxp_getMetaChannelList(params.hostname, params.portnumberdap, NMXP_DATA_TIMESERIES, params.flag_request_channelinfo, params.datas_username, params.datas_password, &channelList));

198
	    return 1;
199

200
201
202
203
	} else if(params.flag_listchannelsnaqs) {

	    channelList = nmxp_getAvailableChannelList(params.hostname, params.portnumberpds, NMXP_DATA_TIMESERIES);
	    nmxp_chan_print_channelList(channelList);
204
	    return 1;
205

206
207
	}
    }
208

209
210
211
    nmxp_log(NMXP_LOG_SET, params.verbose_level);
    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_ANY, "verbose_level %d\n", params.verbose_level);

212
213
    nmxptool_log_params(&params);

214
    /* Get list of available channels and get a subset list of params.channels */
215
    if( DAP_CONDITION(params) ) {
216
217
218
219
220
221
222
	/* From DataServer */
	nmxp_getMetaChannelList(params.hostname, params.portnumberdap, NMXP_DATA_TIMESERIES, params.flag_request_channelinfo, params.datas_username, params.datas_password, &channelList);
    } else {
	/* From NaqsServer */
	channelList = nmxp_getAvailableChannelList(params.hostname, params.portnumberpds, NMXP_DATA_TIMESERIES);
    }

223
    channelList_subset = nmxp_chan_subset(channelList, NMXP_DATA_TIMESERIES, params.channels, CURRENT_NETWORK);
224
225
226

    /* Check if some channel already exists */
    if(channelList_subset->number <= 0) {
227
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CHANNEL, "Channels not found!\n");
228
	return 1;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
229
    } else {
230
	nmxp_chan_print_netchannelList(channelList_subset);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
231

232
	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CHANNEL, "Init channelListSeq.\n");
233
234
235
236
237
238

	/* init channelListSeq */
	channelListSeq = (NMXPTOOL_CHAN_SEQ *) malloc(sizeof(NMXPTOOL_CHAN_SEQ) * channelList_subset->number);
	for(i_chan = 0; i_chan < channelList_subset->number; i_chan++) {
	    channelListSeq[i_chan].significant = 0;
	    channelListSeq[i_chan].last_time = 0.0;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
239
	    channelListSeq[i_chan].last_time_call_raw_stream = 0;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
240
	    channelListSeq[i_chan].x_1 = 0;
241
	    channelListSeq[i_chan].after_start_time = DEFAULT_BUFFERED_TIME;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
242
	    nmxp_raw_stream_init(&(channelListSeq[i_chan].raw_stream_buffer), params.max_tolerable_latency, params.timeoutrecv);
243
244
	}

245
246
247
248
	if(params.statefile) {
	    load_channel_states(channelList_subset, channelListSeq);
	}

Matteo Quintiliani's avatar
Matteo Quintiliani committed
249
#ifdef HAVE_LIBMSEED
250
	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "Init mini-SEED record list.\n");
251
252

	/* Init mini-SEED record list */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
253
	for(i_chan = 0; i_chan < channelList_subset->number; i_chan++) {
254

255
	    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "Init mini-SEED record for %s\n", channelList_subset->channel[i_chan].name);
256
257
258
259

	    msr_list_chan[i_chan] = msr_init(NULL);

	    /* Separate station_code and channel_code */
260
	    if(nmxp_chan_cpy_sta_chan(channelList_subset->channel[i_chan].name, station_code, channel_code, network_code)) {
261

262
		nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "%s.%s.%s\n", NETCODE_OR_CURRENT_NETWORK, station_code, channel_code);
263

264
		strcpy(msr_list_chan[i_chan]->network, NETCODE_OR_CURRENT_NETWORK);
265
266
267
268
269
270
271
		strcpy(msr_list_chan[i_chan]->station, station_code);
		strcpy(msr_list_chan[i_chan]->channel, channel_code);

		msr_list_chan[i_chan]->reclen = 512;         /* byte record length */
		msr_list_chan[i_chan]->encoding = DE_STEIM1;  /* Steim 1 compression */

	    } else {
272
		nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CHANNEL, "Channels %s error in format!\n");
273
274
275
		return 1;
	    }

Matteo Quintiliani's avatar
Matteo Quintiliani committed
276
277
	}
#endif
278

279
280
281
282
283
284
285
286
    }

    /* Free the complete channel list */
    if(channelList) {
	free(channelList);
	channelList = NULL;
    }

287
    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "Starting comunication.\n");
Matteo Quintiliani's avatar
Matteo Quintiliani committed
288

289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
    times_flow = 0;

    while(times_flow < 2) {

	if(times_flow == 0) {
	    if(params.statefile) {
		params.interval = DEFAULT_INTERVAL_INFINITE;
	    }
	} else if(times_flow == 1) {
	    params.start_time = 0.0;
	    params.end_time = 0.0;
	    params.interval = DEFAULT_INTERVAL_NO_VALUE;

	    if(params.statefile) {
		load_channel_states(channelList_subset, channelListSeq);
	    }

	}

308
    /* TODO condition starting DAP or PDS */
309
310
311
    if( DAP_CONDITION(params) || (times_flow == 0  &&  params.statefile  && params.interval == DEFAULT_INTERVAL_INFINITE) ) {

	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "Begin DAP Flow.\n");
312

313
314
315
316
317
318
319
	if(params.interval > 0  ||  params.interval == DEFAULT_INTERVAL_INFINITE) {
	    if(params.interval > 0) {
		params.end_time = params.start_time + params.interval;
	    } else {
		params.end_time = nmxp_data_gmtime_now();
	    }
	} else if(params.delay > 0) {
320
	    params.start_time = ((double) (time(NULL) - params.delay - span_interval) / 10.0) * 10.0;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
321
322
323
	    params.end_time = params.start_time + span_interval;
	}

324

325
326
327
	/* ************************************************************** */
	/* Start subscription protocol "DATA ACCESS PROTOCOL" version 1.0 */
	/* ************************************************************** */
328

329
330
	/* DAP Step 1: Open a socket */
	if( (naqssock = nmxp_openSocket(params.hostname, params.portnumberdap)) == NMXP_SOCKET_ERROR) {
331
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error opening socket!\n");
332
333
	    return 1;
	}
334

335
336
	/* DAP Step 2: Read connection time */
	if(nmxp_readConnectionTime(naqssock, &connection_time) != NMXP_SOCKET_OK) {
337
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error reading connection time from server!\n");
338
339
	    return 1;
	}
340

341
342
	/* DAP Step 3: Send a ConnectRequest */
	if(nmxp_sendConnectRequest(naqssock, params.datas_username, params.datas_password, connection_time) != NMXP_SOCKET_OK) {
343
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error sending connect request!\n");
344
345
346
347
348
	    return 1;
	}

	/* DAP Step 4: Wait for a Ready message */
	if(nmxp_waitReady(naqssock) != NMXP_SOCKET_OK) {
349
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error waiting Ready message!\n");
350
351
	    return 1;
	}
352

353
354
	exitdapcondition = 1;

355
	default_start_time = (params.start_time > 0.0)? params.start_time : nmxp_data_gmtime_now() - 10.0;
356

357
	while(exitdapcondition) {
358

359
360
361
	    /* Start loop for sending requests */
	    i_chan=0;
	    request_SOCKET_OK = NMXP_SOCKET_OK;
362

363
	    while(request_SOCKET_OK == NMXP_SOCKET_OK  &&  i_chan < channelList_subset->number) {
364

365
366
367
		if(params.statefile) {
		    if(channelListSeq[i_chan].after_start_time > 0) {
			params.start_time = channelListSeq[i_chan].after_start_time;
368
			if(params.end_time - params.start_time > params.max_data_to_retrieve) {
369
			    nmxp_data_to_str(start_time_str, params.start_time);
370
			    nmxp_data_to_str(default_start_time_str, params.end_time - params.max_data_to_retrieve);
371
			    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_ANY, "start_time changed from %s to %s\n", start_time_str, default_start_time_str);
372
			    params.start_time = params.end_time - params.max_data_to_retrieve;
373
			}
374
375
376
377
378
		    } else {
			params.start_time = default_start_time;
		    }
		}

379
380
		channelListSeq[i_chan].last_time = params.start_time;

381
382
383
384
385
		nmxp_data_to_str(start_time_str, params.start_time);
		nmxp_data_to_str(end_time_str, params.end_time);
		nmxp_data_to_str(default_start_time_str, default_start_time);
		nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "start_time = %s - end_time = %s - (default_start_time = %s)\n", start_time_str, end_time_str, default_start_time_str);

386
387
		/* DAP Step 5: Send Data Request */
		request_SOCKET_OK = nmxp_sendDataRequest(naqssock, channelList_subset->channel[i_chan].key, (int32_t) params.start_time, (int32_t) (params.end_time + 1.0));
388

389
		if(request_SOCKET_OK == NMXP_SOCKET_OK) {
390

391
392
393
394
		    nmxp_data_to_str(str_start_time, params.start_time);
		    nmxp_data_to_str(str_end_time, params.end_time);
		    nmxptool_str_time_to_filename(str_start_time);
		    nmxptool_str_time_to_filename(str_end_time);
395

396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
		    if(params.flag_writefile) {
			/* Open output file */
			if(nmxp_chan_cpy_sta_chan(channelList_subset->channel[i_chan].name, station_code, channel_code, network_code)) {
			    sprintf(filename, "%s.%s.%s_%s_%s.nmx",
				    NETCODE_OR_CURRENT_NETWORK,
				    station_code,
				    channel_code,
				    str_start_time,
				    str_end_time);
			} else {
			    sprintf(filename, "%s_%s_%s.nmx",
				    channelList_subset->channel[i_chan].name,
				    str_start_time,
				    str_end_time);
			}
411

412
413
414
415
			outfile = fopen(filename, "w");
			if(!outfile) {
			    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_EXTRA, "Can not to open file %s!", filename);
			}
416
		    }
417
418

#ifdef HAVE_LIBMSEED
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
		    if(params.flag_writeseed) {
			/* Open output Mini-SEED file */
			if(nmxp_chan_cpy_sta_chan(channelList_subset->channel[i_chan].name, station_code, channel_code, network_code)) {
			    sprintf(data_seed.filename_mseed, "%s.%s.%s_%s_%s.miniseed",
				    NETCODE_OR_CURRENT_NETWORK,
				    station_code,
				    channel_code,
				    str_start_time,
				    str_end_time);
			} else {
			    sprintf(filename, "%s_%s_%s.miniseed",
				    channelList_subset->channel[i_chan].name,
				    str_start_time,
				    str_end_time);
			}
434

435
436
437
438
			data_seed.outfile_mseed = fopen(data_seed.filename_mseed, "w");
			if(!data_seed.outfile_mseed) {
			    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_EXTRA, "Can not to open file %s!", data_seed.filename_mseed);
			}
439
440
		    }
#endif
441

442
443
444
445
446
447
448
449
450
451
452
453
		    if(params.flag_writefile  &&  outfile) {
			/* Compute SNCL line */

			/* Separate station_code and channel_code */
			if(nmxp_chan_cpy_sta_chan(channelList_subset->channel[i_chan].name, station_code, channel_code, network_code)) {
			    /* Write SNCL line */
			    fprintf(outfile, "%s.%s.%s.%s\n",
				    station_code,
				    NETCODE_OR_CURRENT_NETWORK,
				    channel_code,
				    (params.location)? params.location : "");
			}
454

455
		    }
456

457
458
		    /* DAP Step 6: Receive Data until receiving a Ready message */
		    ret = nmxp_receiveMessage(naqssock, &type, &buffer, &length, 0, &recv_errno);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
459
		    /* nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "ret = %d, type = %d\n", ret, type); */
460

461
		    while(ret == NMXP_SOCKET_OK   &&    type != NMXP_MSG_READY) {
462

463
464
465
			/* Process a packet and return value in NMXP_DATA_PROCESS structure */
			pd = nmxp_processCompressedData(buffer, length, channelList_subset, NETCODE_OR_CURRENT_NETWORK);
			nmxp_data_trim(pd, params.start_time, params.end_time, 0);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
466

467
468
469
470
			/* Log contents of last packet */
			if(params.flag_logdata) {
			    nmxp_data_log(pd);
			}
471

472
473
			/* Set cur_chan */
			cur_chan = nmxp_chan_lookupKeyIndex(pd->key, channelList_subset);
474

475
476
477
478
479
480
481
482
483
484
			/* Management of gaps */
			if(!channelListSeq[cur_chan].significant && pd->nSamp > 0) {
			    channelListSeq[cur_chan].significant = 1;
			} else {
			    if(channelListSeq[cur_chan].significant && pd->nSamp > 0) {
				if(nmxptool_check_and_log_gap(pd->time, channelListSeq[cur_chan].last_time, GAP_TOLLERANCE, pd->station, pd->channel)) {
				    channelListSeq[cur_chan].x_1 = 0;
				    nmxp_data_to_str(str_pd_time, pd->time);
				    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_EXTRA, "%s.%s x0 set to zero at %s!\n", pd->station, pd->channel, str_pd_time);
				}
485
			    }
Matteo Quintiliani's avatar
Matteo Quintiliani committed
486
			}
487
488
489
			if(channelListSeq[cur_chan].significant && pd->nSamp > 0) {
			    channelListSeq[cur_chan].last_time = pd->time + ((double) pd->nSamp / (double) pd->sampRate);
			}
490

491
#ifdef HAVE_LIBMSEED
492
493
494
495
			/* Write Mini-SEED record */
			if(params.flag_writeseed) {
			    nmxptool_write_miniseed(pd);
			}
496
#endif
497

498
#ifdef HAVE___SRC_SEEDLINK_PLUGIN_C
499
500
501
502
			/* Send data to SeedLink Server */
			if(params.flag_slink) {
			    nmxptool_send_raw_depoch(pd);
			}
503
504
#endif

505
506
507
508
509
510
511
512
			if(params.flag_writefile  &&  outfile) {
			    /* Write buffer to the output file */
			    if(outfile && buffer && length > 0) {
				int32_t length_int = length;
				nmxp_data_swap_4b((int32_t *) &length_int);
				fwrite(&length_int, sizeof(length_int), 1, outfile);
				fwrite(buffer, length, 1, outfile);
			    }
513
			}
514

515
516
517
518
519
520
521
522
523
			/* Store x_1 */
			if(pd->nSamp > 0) {
			    channelListSeq[cur_chan].x_1 = pd->pDataPtr[pd->nSamp-1];
			}
			/* Free pd->buffer */
			if(pd->buffer) {
			    free(pd->buffer);
			    pd->buffer = NULL;
			}
524

525
526
			/* Receive Data */
			ret = nmxp_receiveMessage(naqssock, &type, &buffer, &length, 0, &recv_errno);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
527
			/* nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "ret = %d, type = %d\n", ret, type); */
528
		    }
529

530
531
532
533
534
		    if(params.flag_writefile  &&  outfile) {
			/* Close output file */
			fclose(outfile);
			outfile = NULL;
		    }
535

536
#ifdef HAVE_LIBMSEED
537
538
539
540
541
		    if(params.flag_writeseed  &&  data_seed.outfile_mseed) {
			/* Close output Mini-SEED file */
			fclose(data_seed.outfile_mseed);
			data_seed.outfile_mseed = NULL;
		    }
542
#endif
543

544
545
		}
		i_chan++;
546
	    }
547
	    /* DAP Step 7: Repeat steps 5 and 6 for each data request */
548

549
550
551
552
553
554
555
556
557
558
	    if(params.delay > 0) {
		time_to_sleep = (params.end_time - params.start_time) - (time(NULL) - (params.start_time + params.delay + span_interval));
		if(time_to_sleep >= 0) {
		    nmxp_sleep(time_to_sleep);
		} else {
		    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "time to sleep %d sec.\n", time_to_sleep);
		    nmxp_sleep(3);
		}
		params.start_time = params.end_time;
		params.end_time = params.start_time + span_interval;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
559
	    } else {
560
		exitdapcondition = 0;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
561
	    }
Matteo Quintiliani's avatar
Matteo Quintiliani committed
562

563

564
	} /* END while(exitdapcondition) */
565

566
567
568
569
570
571
572
573
574
575
	/* DAP Step 8: Send a Terminate message (optional) */
	nmxp_sendTerminateSubscription(naqssock, NMXP_SHUTDOWN_NORMAL, "Bye!");

	/* DAP Step 9: Close the socket */
	nmxp_closeSocket(naqssock);

	/* ************************************************************ */
	/* End subscription protocol "DATA ACCESS PROTOCOL" version 1.0 */
	/* ************************************************************ */

576
	save_channel_states();
577

578
	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "End DAP Flow.\n");
579

Matteo Quintiliani's avatar
Matteo Quintiliani committed
580
    } else {
581

582
583
	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "Begin PDS Flow.\n");

584
585
	if(params.stc == -1) {

586

587
588
589
590
	    if(params.flag_logdata) {
		p_func_pd[n_func_pd++] = nmxptool_print_seq_no;
	    }

591
592
593
594
595
596
597
598
599
600
601
602
603
#ifdef HAVE_LIBMSEED
	    /* Write Mini-SEED record */
	    if(params.flag_writeseed) {
		p_func_pd[n_func_pd++] = nmxptool_write_miniseed;
	    }
#endif

#ifdef HAVE___SRC_SEEDLINK_PLUGIN_C
	    /* Send data to SeedLink Server */
	    if(params.flag_slink) {
		p_func_pd[n_func_pd++] = nmxptool_send_raw_depoch;
	    }
#endif
Matteo Quintiliani's avatar
Matteo Quintiliani committed
604
605
606
607
608
609
610

#ifdef HAVE_EARTHWORMOBJS
	    if(params.ew_configuration_file) {
		p_func_pd[n_func_pd++] = nmxptool_ew_nmx2ew;
	    }
#endif

611
	}
612

613
614
615
616
617
618
619
620
621
	/* ************************************************************* */
	/* Start subscription protocol "PRIVATE DATA STREAM" version 1.4 */
	/* ************************************************************* */

	/* PDS Step 1: Open a socket */
	naqssock = nmxp_openSocket(params.hostname, params.portnumberpds);

	if(naqssock == NMXP_SOCKET_ERROR) {
	    return 1;
622
623
	}

624
625
626
627
628
629
630
631
632
633
634
635
636
	/* PDS Step 2: Send a Connect */
	if(nmxp_sendConnect(naqssock) != NMXP_SOCKET_OK) {
	    printf("Error on sendConnect()\n");
	    return 1;
	}

	/* PDS Step 3: Receive ChannelList */
	if(nmxp_receiveChannelList(naqssock, &channelList) != NMXP_SOCKET_OK) {
	    printf("Error on receiveChannelList()\n");
	    return 1;
	}

	/* Get a subset of channel from arguments */
637
	channelList_subset = nmxp_chan_subset(channelList, NMXP_DATA_TIMESERIES, params.channels, CURRENT_NETWORK);
638
639
640


	/* PDS Step 4: Send a Request Pending (optional) */
641
642


643
644
645
646
647
648
	/* PDS Step 5: Send AddChannels */
	/* Request Data */
	nmxp_sendAddTimeSeriesChannel(naqssock, channelList_subset, params.stc, params.rate, (params.flag_buffered)? NMXP_BUFFER_YES : NMXP_BUFFER_NO);

	/* PDS Step 6: Repeat until finished: receive and handle packets */

Matteo Quintiliani's avatar
Matteo Quintiliani committed
649
650
651
652
#ifdef HAVE_LIBMSEED
	if(params.flag_writeseed) {
	    /* Open output Mini-SEED file */
	    sprintf(data_seed.filename_mseed, "%s.realtime.miniseed",
653
		    CURRENT_NETWORK);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
654
655
656

	    data_seed.outfile_mseed = fopen(data_seed.filename_mseed, "w");
	    if(!data_seed.outfile_mseed) {
657
		nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_EXTRA, "Can not to open file %s!", data_seed.filename_mseed);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
658
	    } else {
659
		nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "Opened file %s!\n", data_seed.filename_mseed);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
660
661
662
663
	    }
	}
#endif

Matteo Quintiliani's avatar
Matteo Quintiliani committed
664
665
666
	// TODO
	exitpdscondition = 1;

Matteo Quintiliani's avatar
Matteo Quintiliani committed
667
668
669
670
671
672
#ifdef HAVE_EARTHWORMOBJS
	if(params.ew_configuration_file) {
	    nmxptool_ew_attach();
	}
#endif

673
	skip_current_packet = 0;
674

Matteo Quintiliani's avatar
Matteo Quintiliani committed
675
	while(exitpdscondition) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
676

677
	    /* Process Compressed or Decompressed Data */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
678
	    pd = nmxp_receiveData(naqssock, channelList_subset, NETCODE_OR_CURRENT_NETWORK, params.timeoutrecv, &recv_errno);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
679

680
	    if(recv_errno == 0) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
681
682
683
		// TODO
		exitpdscondition = 1;
	    } else {
684
		nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error receiving data. pd=%p recv_errno=%d\n", pd, recv_errno);
685
686
687
688
689

#ifdef HAVE_EARTHWORMOBJS
		nmxptool_ew_send_error(NMXPTOOL_EW_ERR_RECVDATA);
#endif

Matteo Quintiliani's avatar
Matteo Quintiliani committed
690
691
		exitpdscondition = 0;
	    }
692

693
694
695
696
697
	    if(pd) {
		/* Set cur_chan */
		cur_chan = nmxp_chan_lookupKeyIndex(pd->key, channelList_subset);
	    }

Matteo Quintiliani's avatar
Matteo Quintiliani committed
698
	    /* Log contents of last packet */
699
700
701
	    if(params.flag_logdata) {
		nmxp_data_log(pd);
	    }
Matteo Quintiliani's avatar
Matteo Quintiliani committed
702

703
	    skip_current_packet = 0;
704
	    if(pd &&
705
		    (params.statefile  ||  params.buffered_time)
706
707
708
709
710
711
712
713
714
715
716
717
718
	      )	{
		if(params.statefile && channelListSeq[cur_chan].after_start_time > 0.0) {
		    cur_after_start_time = channelListSeq[cur_chan].after_start_time;
		} else if(params.buffered_time) {
		    cur_after_start_time = params.buffered_time;
		} else {
		    cur_after_start_time = DEFAULT_BUFFERED_TIME;
		}
		nmxp_data_to_str(cur_after_start_time_str, cur_after_start_time);
		nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_PACKETMAN, "cur_chan %d, cur_after_start_time %f, cur_after_start_time_str %s\n", cur_chan, cur_after_start_time, cur_after_start_time_str);
		if(pd->time + ((double) pd->nSamp / (double) pd->sampRate) >= cur_after_start_time) {
		    if(pd->time < cur_after_start_time) {
			int first_nsample_to_remove = (cur_after_start_time - pd->time) * (double) pd->sampRate;
719
720
721
722
			/* Remove the first sample in order to avoid overlap */
			first_nsample_to_remove++;
			if(pd->nSamp > first_nsample_to_remove) {
			    pd->nSamp -= first_nsample_to_remove;
723
			    pd->time = cur_after_start_time;
724
725
726
727
728
729
730
731
732
733
734
735
736
			    pd->pDataPtr += first_nsample_to_remove;
			    pd->x0 = pd->pDataPtr[0];
			} else {
			    skip_current_packet = 1;
			}
		    }
		} else {
		    skip_current_packet = 1;
		}
	    }

	    if(!skip_current_packet) {

737
738
		/* Manage Raw Stream */
		if(params.stc == -1) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
739

740
741
742
743
744
		    /* cur_chan is computed only for pd != NULL */
		    if(pd) {
			nmxp_raw_stream_manage(&(channelListSeq[cur_chan].raw_stream_buffer), pd, p_func_pd, n_func_pd);
			channelListSeq[cur_chan].last_time_call_raw_stream = nmxp_data_gmtime_now();
		    }
Matteo Quintiliani's avatar
Matteo Quintiliani committed
745

746
747
748
749
750
751
752
753
754
755
756
757
758
759
		    /* Check timeout for other channels */
		    if(params.timeoutrecv > 0) {
			exitpdscondition = 1;
			to_cur_chan = 0;
			while(to_cur_chan < channelList_subset->number) {
			    timeout_for_channel = nmxp_data_gmtime_now() - channelListSeq[to_cur_chan].last_time_call_raw_stream;
			    if(channelListSeq[to_cur_chan].last_time_call_raw_stream != 0
				    && timeout_for_channel >= params.timeoutrecv) {
				nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_DOD, "Timeout for channel %s (%d sec.)\n",
					channelList_subset->channel[to_cur_chan].name, timeout_for_channel);
				nmxp_raw_stream_manage(&(channelListSeq[to_cur_chan].raw_stream_buffer), NULL, p_func_pd, n_func_pd);
				channelListSeq[to_cur_chan].last_time_call_raw_stream = nmxp_data_gmtime_now();
			    }
			    to_cur_chan++;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
760
761
			}
		    }
762
763

		} else {
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779

		    if(pd) {
			/* Management of gaps */
			if(!channelListSeq[cur_chan].significant && pd->nSamp > 0) {
			    channelListSeq[cur_chan].significant = 1;
			} else {
			    if(channelListSeq[cur_chan].significant && pd->nSamp > 0) {
				if(nmxptool_check_and_log_gap(pd->time, channelListSeq[cur_chan].last_time, GAP_TOLLERANCE, pd->station, pd->channel)) {
				    channelListSeq[cur_chan].x_1 = 0;
				    nmxp_data_to_str(str_pd_time, pd->time);
				    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_EXTRA, "%s.%s x0 set to zero at %s!\n", pd->station, pd->channel, str_pd_time);
				}
			    }
			}
			if(channelListSeq[cur_chan].significant && pd->nSamp > 0) {
			    channelListSeq[cur_chan].last_time = pd->time + ((double) pd->nSamp / (double) pd->sampRate);
780
			}
781

Matteo Quintiliani's avatar
Matteo Quintiliani committed
782
783

#ifdef HAVE_LIBMSEED
784
785
786
787
			/* Write Mini-SEED record */
			if(params.flag_writeseed) {
			    nmxptool_write_miniseed(pd);
			}
Matteo Quintiliani's avatar
Matteo Quintiliani committed
788
789
#endif

790
#ifdef HAVE___SRC_SEEDLINK_PLUGIN_C
791
792
793
794
			/* Send data to SeedLink Server */
			if(params.flag_slink) {
			    nmxptool_send_raw_depoch(pd);
			}
795
#endif
796
797
		    }
		}
798
	    } /* End skip_current_packet condition */
799

Matteo Quintiliani's avatar
Matteo Quintiliani committed
800
	    if(pd) {
801
802
803
804
805
806
807
808
809
		/* Store x_1 */
		if(pd->nSamp > 0) {
		    channelListSeq[cur_chan].x_1 = pd->pDataPtr[pd->nSamp-1];
		}
		/* Free pd->buffer */
		if(pd->buffer) {
		    free(pd->buffer);
		    pd->buffer = NULL;
		}
Matteo Quintiliani's avatar
Matteo Quintiliani committed
810
	    }
Matteo Quintiliani's avatar
Matteo Quintiliani committed
811

Matteo Quintiliani's avatar
Matteo Quintiliani committed
812
813
814
815
816
817
#ifdef HAVE_EARTHWORMOBJS
	    if(params.ew_configuration_file) {

		/* Check if we are being asked to terminate */
		if( nmxptool_ew_check_flag_terminate() ) {
		    logit ("t", "nmxptool terminating on request\n");
818
		    nmxptool_ew_send_error(NMXPTOOL_EW_ERR_TERMREQ);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
819
820
821
822
823
824
825
826
827
		    exitpdscondition = 0;
		}

		/* Check if we need to send heartbeat message */
		nmxptool_ew_send_heartbeat_if_needed();

	    }
#endif

828
	} /* End main PDS loop */
829

Matteo Quintiliani's avatar
Matteo Quintiliani committed
830
831
	/* Flush raw data stream for each channel */
	flushing_raw_data_stream();
832

Matteo Quintiliani's avatar
Matteo Quintiliani committed
833
	save_channel_states();
Matteo Quintiliani's avatar
Matteo Quintiliani committed
834

Matteo Quintiliani's avatar
Matteo Quintiliani committed
835
836
837
838
839
840
#ifdef HAVE_EARTHWORMOBJS
	if(params.ew_configuration_file) {
	    nmxptool_ew_detach();
	}
#endif

Matteo Quintiliani's avatar
Matteo Quintiliani committed
841
842
843
844
845
846
847
#ifdef HAVE_LIBMSEED
	if(params.flag_writeseed  &&  data_seed.outfile_mseed) {
	    /* Close output Mini-SEED file */
	    fclose(data_seed.outfile_mseed);
	}
#endif

848
849
850
851
852
853
854
855
856
857
858

	/* PDS Step 7: Send Terminate Subscription */
	nmxp_sendTerminateSubscription(naqssock, NMXP_SHUTDOWN_NORMAL, "Good Bye!");

	/* PDS Step 8: Close the socket */
	nmxp_closeSocket(naqssock);

	/* *********************************************************** */
	/* End subscription protocol "PRIVATE DATA STREAM" version 1.4 */
	/* *********************************************************** */

859
860
861
	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "End PDS Flow.\n");

    }
862

863
864
865
866
867
    if(params.interval == DEFAULT_INTERVAL_INFINITE) {
	times_flow++;
    } else {
	times_flow = 100;
    }
868

869
    }
870

871
#ifdef HAVE_LIBMSEED
872
873
874
875
    if(*msr_list_chan) {
	for(i_chan = 0; i_chan < channelList_subset->number; i_chan++) {
	    if(msr_list_chan[i_chan]) {
		msr_free(&(msr_list_chan[i_chan]));
876
877
	    }
	}
878
    }
879
880
#endif

881
882
883
    for(i_chan = 0; i_chan < channelList_subset->number; i_chan++) {
	nmxp_raw_stream_free(&(channelListSeq[i_chan].raw_stream_buffer));
    }
884

885
886
887
    if(channelListSeq) {
	free(channelListSeq);
    }
888

889
890
891
892
    /* This has to be the last */
    if(channelList_subset) {
	free(channelList_subset);
    }
893

894
    return 0;
895
} /* End MAIN */
896
897


898
899
900
/* Size in bytes of the buffers that hold the state file name */
#define MAX_LEN_FILENAME 4096
/* Extension appended to params.statefile to build the state file name */
#define NMXP_STR_STATE_EXT ".nmxpstate"

Matteo Quintiliani's avatar
Matteo Quintiliani committed
901
static void save_channel_states() {
902
903
904
905
    int to_cur_chan;
    char last_time_str[30];
    char raw_last_sample_time_str[30];
    char state_line_str[1000];
906
    FILE *fstatefile = NULL;
907
    char statefilefilename[MAX_LEN_FILENAME] = "";
908

909
    if(params.statefile) {
910
911
912
	strncpy(statefilefilename, params.statefile, MAX_LEN_FILENAME);
	strncat(statefilefilename, NMXP_STR_STATE_EXT, MAX_LEN_FILENAME);
	fstatefile = fopen(statefilefilename, "w");
913
	if(fstatefile == NULL) {
914
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "Unable to write channel states into %s!\n", statefilefilename);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
915
	} else {
916
	    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_ANY, "Writing channel states into %s!\n", statefilefilename);
917
	}
918

919
920
921
922
923
924
	/* Save state for each channel */
	// if(params.stc == -1)
	to_cur_chan = 0;
	while(to_cur_chan < channelList_subset->number) {
	    nmxp_data_to_str(last_time_str, channelListSeq[to_cur_chan].last_time);
	    nmxp_data_to_str(raw_last_sample_time_str, channelListSeq[to_cur_chan].raw_stream_buffer.last_sample_time);
925
	    sprintf(state_line_str, "%s %s %s",
926
		    channelList_subset->channel[to_cur_chan].name,
927
928
		    last_time_str,
		    raw_last_sample_time_str
929
930
931
		   );
	    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CHANNEL, "%s\n", state_line_str);
	    if(fstatefile) {
932
		fprintf(fstatefile, "%s\n", state_line_str);
933
934
		if( (channelListSeq[to_cur_chan].last_time != 0) || (channelListSeq[to_cur_chan].raw_stream_buffer.last_sample_time != -1.0) ) {
		    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_ANY, "%s\n", state_line_str);
935
936
		} else {
		    /* Do nothing */
937
		}
938
939
940
	    }
	    to_cur_chan++;
	}
941
	if(fstatefile) {
942
	    fclose(fstatefile);
943
944
945
946
	}
    }
}

947
948
/*
 * Load per-channel start times from the state file.
 *
 * The state file name is params.statefile + NMXP_STR_STATE_EXT. When that
 * file can not be opened for reading, it is first created as a copy of
 * params.statefile. Each line is expected to be
 * "<channel> <last_time> <raw last_sample_time>".
 *
 * For every line whose channel name matches (case-insensitively) an item of
 * chan_list, after_start_time of the item with the same index in
 * chan_list_seq is set to the raw time when it has been parsed and it is not
 * zero, otherwise to the other parsed time.
 *
 * chan_list      list of channels used to look for the channel name
 * chan_list_seq  array, indexed like chan_list->channel, that is updated
 *
 * Nothing is done when params.statefile is not set. errno is always reset to
 * zero before returning.
 */
void load_channel_states(NMXP_CHAN_LIST_NET *chan_list, NMXPTOOL_CHAN_SEQ *chan_list_seq) {
    FILE *fstatefile = NULL;
    FILE *fstatefileINPUT = NULL;
#define MAXSIZE_LINE 2048
    char line[MAXSIZE_LINE];
    /* Field widths in the sscanf() format below have to be the size of these buffers minus one */
    char s_chan[128];
    char s_noraw_time_s[128];
    char s_rawtime_s[128];
    double s_noraw_time_f_calc, s_rawtime_f_calc;
    int cur_chan;
    int n_scanf;
    int name_len;
    NMXP_TM_T tmp_tmt;
    char statefilefilename[MAX_LEN_FILENAME] = "";

    if(params.statefile) {
	/* Build the state file name, checking that it fits into the buffer */
	name_len = snprintf(statefilefilename, sizeof(statefilefilename), "%s%s", params.statefile, NMXP_STR_STATE_EXT);
	if(name_len < 0  ||  (size_t) name_len >= sizeof(statefilefilename)) {
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "State file name %s%s is too long!\n", params.statefile, NMXP_STR_STATE_EXT);
	} else {
	    fstatefile = fopen(statefilefilename, "r");
	    if(fstatefile == NULL) {
		/* State file not readable: create it as a copy of params.statefile */
		fstatefileINPUT = fopen(params.statefile, "r");
		if(fstatefileINPUT == NULL) {
		    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "Unable to read channel states from %s!\n", params.statefile);
		} else {
		    fstatefile = fopen(statefilefilename, "w");
		    if(fstatefile == NULL) {
			nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "Unable to write channel states into %s!\n", statefilefilename);
		    } else {
			while(fgets(line, MAXSIZE_LINE, fstatefileINPUT) != NULL) {
			    fputs(line, fstatefile);
			}
			fclose(fstatefile);
		    }
		    fclose(fstatefileINPUT);
		}
		/* Try again, the state file could have been just created */
		fstatefile = fopen(statefilefilename, "r");
	    }

	    if(fstatefile == NULL) {
		nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "Unable to read channel states from %s!\n", statefilefilename);
	    } else {
		nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_ANY, "Loading channel states from %s!\n", statefilefilename);
		while(fgets(line, MAXSIZE_LINE, fstatefile) != NULL) {
		    s_chan[0] = 0;
		    s_noraw_time_s[0] = 0;
		    s_rawtime_s[0] = 0;
		    /* Bounded conversions: a token longer than 127 characters can not overflow the buffers */
		    n_scanf = sscanf(line, "%127s %127s %127s", s_chan, s_noraw_time_s, s_rawtime_s);

		    s_noraw_time_f_calc = DEFAULT_BUFFERED_TIME;
		    s_rawtime_f_calc = DEFAULT_BUFFERED_TIME;
		    if(n_scanf == 3) {
			if(nmxp_data_parse_date(s_noraw_time_s, &tmp_tmt) == -1) {
			    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "Error parsing %s\n", s_noraw_time_s);
			} else {
			    s_noraw_time_f_calc = nmxp_data_tm_to_time(&tmp_tmt);
			}
			if(nmxp_data_parse_date(s_rawtime_s, &tmp_tmt) == -1) {
			    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "Error parsing %s\n", s_rawtime_s);
			} else {
			    s_rawtime_f_calc = nmxp_data_tm_to_time(&tmp_tmt);
			}
		    }
		    nmxp_log(NMXP_LOG_NORM_NO, NMXP_LOG_D_PACKETMAN, "%d %-14s %16.4f %s %16.4f %s\n", n_scanf, s_chan, s_noraw_time_f_calc, s_noraw_time_s, s_rawtime_f_calc, s_rawtime_s);

		    /* Look for the channel, ignoring case */
		    cur_chan = 0;
		    while(cur_chan < chan_list->number  &&  strcasecmp(s_chan, chan_list->channel[cur_chan].name) != 0) {
			cur_chan++;
		    }
		    if(cur_chan < chan_list->number) {
			/* The raw time is preferred when it has been parsed and it is not zero */
			if( s_rawtime_f_calc != DEFAULT_BUFFERED_TIME  && s_rawtime_f_calc != 0.0 ) {
			    chan_list_seq[cur_chan].after_start_time = s_rawtime_f_calc;
			    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_ANY, "For channel %s starting from %s.\n", s_chan, s_rawtime_s);
			} else if( s_noraw_time_f_calc != DEFAULT_BUFFERED_TIME ) {
			    chan_list_seq[cur_chan].after_start_time = s_noraw_time_f_calc;
			    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_ANY, "For channel %s starting from %s.\n", s_chan, s_noraw_time_s);
			} else {
			    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_ANY, "For channel %s there is not valid start_time.\n", s_chan);
			}
		    }
		}
		fclose(fstatefile);
	    }
	}
    }
    errno = 0;
}

1035

Matteo Quintiliani's avatar
Matteo Quintiliani committed
1036
1037
1038
1039
1040
1041
1042
static void flushing_raw_data_stream() {
    int to_cur_chan;

    /* Flush raw data stream for each channel */
    if(params.stc == -1) {
	to_cur_chan = 0;
	while(to_cur_chan < channelList_subset->number) {
1043
	    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_ANY, "Flushing data for channel %s\n",
Matteo Quintiliani's avatar
Matteo Quintiliani committed
1044
1045
1046
1047
1048
1049
		    channelList_subset->channel[to_cur_chan].name);
	    nmxp_raw_stream_manage(&(channelListSeq[to_cur_chan].raw_stream_buffer), NULL, p_func_pd, n_func_pd);
	    to_cur_chan++;
	}
    }
}
1050

Matteo Quintiliani's avatar
Matteo Quintiliani committed
1051
#ifndef HAVE_WINDOWS_H
Matteo Quintiliani's avatar
Matteo Quintiliani committed
1052
1053
/* Do any needed cleanup and exit */
static void clientShutdown(int sig) {
1054

1055
    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_ANY, "Program interrupted!\n");
Matteo Quintiliani's avatar
Matteo Quintiliani committed
1056

Matteo Quintiliani's avatar
Matteo Quintiliani committed
1057
    flushing_raw_data_stream();
Matteo Quintiliani's avatar
Matteo Quintiliani committed
1058
    save_channel_states();
Matteo Quintiliani's avatar
Matteo Quintiliani committed
1059

Matteo Quintiliani's avatar
Matteo Quintiliani committed
1060
1061
1062
1063
1064
    if(params.flag_writefile  &&  outfile) {
	/* Close output file */
	fclose(outfile);
    }

1065
#ifdef HAVE_EARTHWORMOBJS