nmxptool.c 35 KB
Newer Older
Matteo Quintiliani's avatar
Matteo Quintiliani committed
1
2
3
4
5
6
7
8
9
/*! \file
 *
 * \brief Nanometrics Protocol Tool
 *
 * Author:
 * 	Matteo Quintiliani
 * 	Istituto Nazionale di Geofisica e Vulcanologia - Italy
 *	quintiliani@ingv.it
 *
 * $Id: nmxptool.c,v 1.109 2008-01-17 08:15:00 mtheo Exp $
 *
 */

Matteo Quintiliani's avatar
Matteo Quintiliani committed
14
15
#include "config.h"

16
17
18
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
Matteo Quintiliani's avatar
Matteo Quintiliani committed
19
#include <errno.h>
20
21

#include <nmxp.h>
Matteo Quintiliani's avatar
Matteo Quintiliani committed
22

Matteo Quintiliani's avatar
Matteo Quintiliani committed
23
#ifndef HAVE_WINDOWS_H
Matteo Quintiliani's avatar
Matteo Quintiliani committed
24
25
26
#include <signal.h>
#endif

Matteo Quintiliani's avatar
Matteo Quintiliani committed
27
28
29
30
#ifdef HAVE_WINDOWS_H
#include <windows.h>
#endif

31
#include "nmxptool_getoptlong.h"
32
33

#ifdef HAVE_EARTHWORMOBJS
34
#include "nmxptool_ew.h"
35
#endif
36

37
38
39
#ifdef HAVE_LIBMSEED
#include <libmseed.h>
#endif
40

41
42
#ifdef HAVE___SRC_SEEDLINK_PLUGIN_H
#include "seedlink_plugin.h"
43
44
#endif

45
46
/* Non-zero when the given params structure asks for a start time or a delay,
 * i.e. when main() takes the DataServer (DAP) path instead of the
 * NaqsServer one.
 * params_struct is a structure (not a pointer) with `start_time` and `delay`
 * members. It is parenthesized so that any expression (e.g. *ptr) can be
 * passed; note that it is evaluated up to two times. */
#define DAP_CONDITION(params_struct) ( (params_struct).start_time != 0.0 || (params_struct).delay > 0 )

47
48
49
/* Network code in effect: the one stored in the global `params`, or
 * DEFAULT_NETWORK when params.network is not set. */
#define CURRENT_NETWORK ( params.network ? params.network : DEFAULT_NETWORK )
/* Network code to use for the channel being handled: the current network
 * when the `network_code` buffer visible at the point of use is empty,
 * otherwise the contents of that buffer. */
#define NETCODE_OR_CURRENT_NETWORK ( (network_code[0] == 0) ? CURRENT_NETWORK : network_code )

Matteo Quintiliani's avatar
Matteo Quintiliani committed
50
/* Tolerance passed to nmxptool_check_and_log_gap() when the time of a new
 * packet is compared with the end time computed from the previous packet.
 * NOTE(review): presumably expressed in seconds - confirm against
 * nmxptool_check_and_log_gap(). */
#define GAP_TOLLERANCE 0.001
51

52
53
54
typedef struct {
    int significant;
    double last_time;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
55
    time_t last_time_call_raw_stream;
56
    int32_t x_1;
57
    double after_start_time;
58
    NMXP_RAW_STREAM_DATA raw_stream_buffer;
59
60
} NMXPTOOL_CHAN_SEQ;

Matteo Quintiliani's avatar
Matteo Quintiliani committed
61
62

#ifndef HAVE_WINDOWS_H
Matteo Quintiliani's avatar
Matteo Quintiliani committed
63
64
static void clientShutdown(int sig);
static void clientDummyHandler(int sig);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
65
#endif
Matteo Quintiliani's avatar
Matteo Quintiliani committed
66

Matteo Quintiliani's avatar
Matteo Quintiliani committed
67
static void save_channel_states();
68
void load_channel_states(NMXP_CHAN_LIST_NET *chan_list, NMXPTOOL_CHAN_SEQ *chan_list_seq);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
69
70
static void flushing_raw_data_stream();

71
#ifdef HAVE_LIBMSEED
Matteo Quintiliani's avatar
Matteo Quintiliani committed
72
int nmxptool_write_miniseed(NMXP_DATA_PROCESS *pd);
73
74
#endif

Matteo Quintiliani's avatar
Matteo Quintiliani committed
75
#ifdef HAVE___SRC_SEEDLINK_PLUGIN_C
Matteo Quintiliani's avatar
Matteo Quintiliani committed
76
int nmxptool_send_raw_depoch(NMXP_DATA_PROCESS *pd);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
77
78
#endif

79
int nmxptool_print_seq_no(NMXP_DATA_PROCESS *pd);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
80
81

int nmxptool_check_and_log_gap(double time1, double time2, const double gap_tollerance, const char *station, const char *channel);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
82
void nmxptool_str_time_to_filename(char *str_time);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
83
84


85
86
87
88
89
/* Global variable for main program and handling terminitation program */
NMXPTOOL_PARAMS params;
int naqssock = 0;
FILE *outfile = NULL;
NMXP_CHAN_LIST *channelList = NULL;
90
NMXP_CHAN_LIST_NET *channelList_subset = NULL;
91
NMXPTOOL_CHAN_SEQ *channelListSeq = NULL;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
92
93
94
int n_func_pd = 0;
int (*p_func_pd[NMXP_MAX_FUNC_PD]) (NMXP_DATA_PROCESS *);

95
96
97
98
99
100
101
102

#ifdef HAVE_LIBMSEED
/* Mini-SEED variables */
/* Mini-SEED output state: main() initialises it with nmxp_data_seed_init()
 * and uses its filename_mseed / outfile_mseed members for the output file. */
NMXP_DATA_SEED data_seed;
/* One Mini-SEED record per requested channel, created with msr_init() in
 * main() and indexed like channelList_subset. */
MSRecord *msr_list_chan[MAX_N_CHAN];
#endif


103
int main (int argc, char **argv) {
104
    int32_t connection_time;
105
    int request_SOCKET_OK;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
106
    int i_chan, cur_chan = 0;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
107
    int to_cur_chan = 0;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
108
    int exitpdscondition;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
109
    int exitdapcondition;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
110
    time_t timeout_for_channel;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
111

Matteo Quintiliani's avatar
Matteo Quintiliani committed
112
    int span_interval = 10;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
113
114
    int time_to_sleep = 0;

115
116
    char str_start_time[200];
    char str_end_time[200];
Matteo Quintiliani's avatar
Matteo Quintiliani committed
117
    char str_pd_time[200];
118
119
120

    NMXP_MSG_SERVER type;
    void *buffer;
121
    int32_t length;
122
123
    int ret;

Matteo Quintiliani's avatar
Matteo Quintiliani committed
124
125
    int recv_errno = 0;

126
    char filename[500];
127
    char station_code[20], channel_code[20], network_code[20];
128

Matteo Quintiliani's avatar
Matteo Quintiliani committed
129
    char cur_after_start_time_str[1024];
130
131
    double cur_after_start_time = DEFAULT_BUFFERED_TIME;
    int skip_current_packet = 0;
132

133
134
    int times_flow = 0;
    double default_start_time = 0.0;
135

136
137
138
139
140
141
142
    NMXP_DATA_PROCESS *pd;

#ifdef HAVE_LIBMSEED
    /* Init mini-SEED variables */
    nmxp_data_seed_init(&data_seed);
#endif

Matteo Quintiliani's avatar
Matteo Quintiliani committed
143
#ifndef HAVE_WINDOWS_H
Matteo Quintiliani's avatar
Matteo Quintiliani committed
144
145
146
147
148
149
150
    /* Signal handling, use POSIX calls with standardized semantics */
    struct sigaction sa;

    sa.sa_handler = clientDummyHandler;
    sa.sa_flags = SA_RESTART;
    sigemptyset(&sa.sa_mask);
    sigaction(SIGALRM, &sa, NULL);
151

Matteo Quintiliani's avatar
Matteo Quintiliani committed
152
153
154
155
156
157
158
159
160
161
162
    sa.sa_handler = clientShutdown;
    sigaction(SIGINT, &sa, NULL);
    sigaction(SIGQUIT, &sa, NULL); 
    sigaction(SIGTERM, &sa, NULL);

    sa.sa_handler = SIG_IGN;
    sigaction(SIGHUP, &sa, NULL);
    sigaction(SIGPIPE, &sa, NULL); 
#endif

    /* Default is normal output */
163
    nmxp_log(NMXP_LOG_SET, NMXP_LOG_D_NULL);
164
165
166
167
168
169

    /* Initialize params from argument values */
    if(nmxptool_getopt_long(argc, argv, &params) != 0) {
	return 1;
    }

170
    if(params.ew_configuration_file) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
171
172

#ifdef HAVE_EARTHWORMOBJS
173
174
	nmxp_log_init(nmxptool_ew_logit_msg, nmxptool_ew_logit_err);

Matteo Quintiliani's avatar
Matteo Quintiliani committed
175
	nmxptool_ew_configure(argv, &params);
176

Matteo Quintiliani's avatar
Matteo Quintiliani committed
177
178
179
180
181
	/* Check consistency of params */
	if(nmxptool_check_params(&params) != 0) {
	    return 1;
	}
#endif
182

183
    } else {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
184

Matteo Quintiliani's avatar
Matteo Quintiliani committed
185
186
	nmxp_log_init(nmxp_log_stdout, nmxp_log_stderr);

187
188
189
190
191
	/* Check consistency of params */
	if(nmxptool_check_params(&params) != 0) {
	    return 1;
	}

192
193
	/* List available channels on server */
	if(params.flag_listchannels) {
194

195
196
	    nmxp_meta_chan_print(nmxp_getMetaChannelList(params.hostname, params.portnumberdap, NMXP_DATA_TIMESERIES, params.flag_request_channelinfo, params.datas_username, params.datas_password, &channelList));

197
	    return 1;
198

199
200
201
202
	} else if(params.flag_listchannelsnaqs) {

	    channelList = nmxp_getAvailableChannelList(params.hostname, params.portnumberpds, NMXP_DATA_TIMESERIES);
	    nmxp_chan_print_channelList(channelList);
203
	    return 1;
204

205
206
	}
    }
207

208
209
210
    nmxp_log(NMXP_LOG_SET, params.verbose_level);
    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_ANY, "verbose_level %d\n", params.verbose_level);

211
212
    nmxptool_log_params(&params);

213
    /* Get list of available channels and get a subset list of params.channels */
214
    if( DAP_CONDITION(params) ) {
215
216
217
218
219
220
221
	/* From DataServer */
	nmxp_getMetaChannelList(params.hostname, params.portnumberdap, NMXP_DATA_TIMESERIES, params.flag_request_channelinfo, params.datas_username, params.datas_password, &channelList);
    } else {
	/* From NaqsServer */
	channelList = nmxp_getAvailableChannelList(params.hostname, params.portnumberpds, NMXP_DATA_TIMESERIES);
    }

222
    channelList_subset = nmxp_chan_subset(channelList, NMXP_DATA_TIMESERIES, params.channels, CURRENT_NETWORK);
223
224
225

    /* Check if some channel already exists */
    if(channelList_subset->number <= 0) {
226
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CHANNEL, "Channels not found!\n");
227
	return 1;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
228
    } else {
229
	nmxp_chan_print_netchannelList(channelList_subset);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
230

231
	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CHANNEL, "Init channelListSeq.\n");
232
233
234
235
236
237

	/* init channelListSeq */
	channelListSeq = (NMXPTOOL_CHAN_SEQ *) malloc(sizeof(NMXPTOOL_CHAN_SEQ) * channelList_subset->number);
	for(i_chan = 0; i_chan < channelList_subset->number; i_chan++) {
	    channelListSeq[i_chan].significant = 0;
	    channelListSeq[i_chan].last_time = 0.0;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
238
	    channelListSeq[i_chan].last_time_call_raw_stream = 0;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
239
	    channelListSeq[i_chan].x_1 = 0;
240
	    channelListSeq[i_chan].after_start_time = DEFAULT_BUFFERED_TIME;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
241
	    nmxp_raw_stream_init(&(channelListSeq[i_chan].raw_stream_buffer), params.max_tolerable_latency, params.timeoutrecv);
242
243
	}

244
245
246
247
	if(params.statefile) {
	    load_channel_states(channelList_subset, channelListSeq);
	}

Matteo Quintiliani's avatar
Matteo Quintiliani committed
248
#ifdef HAVE_LIBMSEED
249
	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "Init mini-SEED record list.\n");
250
251

	/* Init mini-SEED record list */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
252
	for(i_chan = 0; i_chan < channelList_subset->number; i_chan++) {
253

254
	    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "Init mini-SEED record for %s\n", channelList_subset->channel[i_chan].name);
255
256
257
258

	    msr_list_chan[i_chan] = msr_init(NULL);

	    /* Separate station_code and channel_code */
259
	    if(nmxp_chan_cpy_sta_chan(channelList_subset->channel[i_chan].name, station_code, channel_code, network_code)) {
260

261
		nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "%s.%s.%s\n", NETCODE_OR_CURRENT_NETWORK, station_code, channel_code);
262

263
		strcpy(msr_list_chan[i_chan]->network, NETCODE_OR_CURRENT_NETWORK);
264
265
266
267
268
269
270
		strcpy(msr_list_chan[i_chan]->station, station_code);
		strcpy(msr_list_chan[i_chan]->channel, channel_code);

		msr_list_chan[i_chan]->reclen = 512;         /* byte record length */
		msr_list_chan[i_chan]->encoding = DE_STEIM1;  /* Steim 1 compression */

	    } else {
271
		nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CHANNEL, "Channels %s error in format!\n");
272
273
274
		return 1;
	    }

Matteo Quintiliani's avatar
Matteo Quintiliani committed
275
276
	}
#endif
277

278
279
280
281
282
283
284
285
    }

    /* Free the complete channel list */
    if(channelList) {
	free(channelList);
	channelList = NULL;
    }

286
    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "Starting comunication.\n");
Matteo Quintiliani's avatar
Matteo Quintiliani committed
287

288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
    times_flow = 0;

    while(times_flow < 2) {

	if(times_flow == 0) {
	    if(params.statefile) {
		params.interval = DEFAULT_INTERVAL_INFINITE;
	    }
	} else if(times_flow == 1) {
	    params.start_time = 0.0;
	    params.end_time = 0.0;
	    params.interval = DEFAULT_INTERVAL_NO_VALUE;

	    if(params.statefile) {
		load_channel_states(channelList_subset, channelListSeq);
	    }

	}

307
    /* TODO condition starting DAP or PDS */
308
309
310
    if( DAP_CONDITION(params) || (times_flow == 0  &&  params.statefile  && params.interval == DEFAULT_INTERVAL_INFINITE) ) {

	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "Begin DAP Flow.\n");
311

312
313
314
315
316
317
318
	if(params.interval > 0  ||  params.interval == DEFAULT_INTERVAL_INFINITE) {
	    if(params.interval > 0) {
		params.end_time = params.start_time + params.interval;
	    } else {
		params.end_time = nmxp_data_gmtime_now();
	    }
	} else if(params.delay > 0) {
319
	    params.start_time = ((double) (time(NULL) - params.delay - span_interval) / 10.0) * 10.0;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
320
321
322
	    params.end_time = params.start_time + span_interval;
	}

323

324
325
326
	/* ************************************************************** */
	/* Start subscription protocol "DATA ACCESS PROTOCOL" version 1.0 */
	/* ************************************************************** */
327

328
329
	/* DAP Step 1: Open a socket */
	if( (naqssock = nmxp_openSocket(params.hostname, params.portnumberdap)) == NMXP_SOCKET_ERROR) {
330
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error opening socket!\n");
331
332
	    return 1;
	}
333

334
335
	/* DAP Step 2: Read connection time */
	if(nmxp_readConnectionTime(naqssock, &connection_time) != NMXP_SOCKET_OK) {
336
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error reading connection time from server!\n");
337
338
	    return 1;
	}
339

340
341
	/* DAP Step 3: Send a ConnectRequest */
	if(nmxp_sendConnectRequest(naqssock, params.datas_username, params.datas_password, connection_time) != NMXP_SOCKET_OK) {
342
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error sending connect request!\n");
343
344
345
346
347
	    return 1;
	}

	/* DAP Step 4: Wait for a Ready message */
	if(nmxp_waitReady(naqssock) != NMXP_SOCKET_OK) {
348
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error waiting Ready message!\n");
349
350
	    return 1;
	}
351

352
353
	exitdapcondition = 1;

354
	default_start_time = (params.start_time > 0.0)? params.start_time : nmxp_data_gmtime_now() - 10.0;
355

356
	while(exitdapcondition) {
357

358
359
360
	    /* Start loop for sending requests */
	    i_chan=0;
	    request_SOCKET_OK = NMXP_SOCKET_OK;
361

362
	    while(request_SOCKET_OK == NMXP_SOCKET_OK  &&  i_chan < channelList_subset->number) {
363

364
365
366
367
368
369
370
371
372
373
374
375
376
377
		if(params.statefile) {
		    if(channelListSeq[i_chan].after_start_time > 0) {
			params.start_time = channelListSeq[i_chan].after_start_time;
		    } else {
			params.start_time = default_start_time;
		    }
		}

		char start_time_str[30], end_time_str[30], default_start_time_str[30];
		nmxp_data_to_str(start_time_str, params.start_time);
		nmxp_data_to_str(end_time_str, params.end_time);
		nmxp_data_to_str(default_start_time_str, default_start_time);
		nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "start_time = %s - end_time = %s - (default_start_time = %s)\n", start_time_str, end_time_str, default_start_time_str);

378
379
		/* DAP Step 5: Send Data Request */
		request_SOCKET_OK = nmxp_sendDataRequest(naqssock, channelList_subset->channel[i_chan].key, (int32_t) params.start_time, (int32_t) (params.end_time + 1.0));
380

381
		if(request_SOCKET_OK == NMXP_SOCKET_OK) {
382

383
384
385
386
		    nmxp_data_to_str(str_start_time, params.start_time);
		    nmxp_data_to_str(str_end_time, params.end_time);
		    nmxptool_str_time_to_filename(str_start_time);
		    nmxptool_str_time_to_filename(str_end_time);
387

388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
		    if(params.flag_writefile) {
			/* Open output file */
			if(nmxp_chan_cpy_sta_chan(channelList_subset->channel[i_chan].name, station_code, channel_code, network_code)) {
			    sprintf(filename, "%s.%s.%s_%s_%s.nmx",
				    NETCODE_OR_CURRENT_NETWORK,
				    station_code,
				    channel_code,
				    str_start_time,
				    str_end_time);
			} else {
			    sprintf(filename, "%s_%s_%s.nmx",
				    channelList_subset->channel[i_chan].name,
				    str_start_time,
				    str_end_time);
			}
403

404
405
406
407
			outfile = fopen(filename, "w");
			if(!outfile) {
			    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_EXTRA, "Can not to open file %s!", filename);
			}
408
		    }
409
410

#ifdef HAVE_LIBMSEED
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
		    if(params.flag_writeseed) {
			/* Open output Mini-SEED file */
			if(nmxp_chan_cpy_sta_chan(channelList_subset->channel[i_chan].name, station_code, channel_code, network_code)) {
			    sprintf(data_seed.filename_mseed, "%s.%s.%s_%s_%s.miniseed",
				    NETCODE_OR_CURRENT_NETWORK,
				    station_code,
				    channel_code,
				    str_start_time,
				    str_end_time);
			} else {
			    sprintf(filename, "%s_%s_%s.miniseed",
				    channelList_subset->channel[i_chan].name,
				    str_start_time,
				    str_end_time);
			}
426

427
428
429
430
			data_seed.outfile_mseed = fopen(data_seed.filename_mseed, "w");
			if(!data_seed.outfile_mseed) {
			    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_EXTRA, "Can not to open file %s!", data_seed.filename_mseed);
			}
431
432
		    }
#endif
433

434
435
436
437
438
439
440
441
442
443
444
445
		    if(params.flag_writefile  &&  outfile) {
			/* Compute SNCL line */

			/* Separate station_code_old_way and channel_code_old_way */
			if(nmxp_chan_cpy_sta_chan(channelList_subset->channel[i_chan].name, station_code, channel_code, network_code)) {
			    /* Write SNCL line */
			    fprintf(outfile, "%s.%s.%s.%s\n",
				    station_code,
				    NETCODE_OR_CURRENT_NETWORK,
				    channel_code,
				    (params.location)? params.location : "");
			}
446

447
		    }
448

449
450
451
		    /* DAP Step 6: Receive Data until receiving a Ready message */
		    ret = nmxp_receiveMessage(naqssock, &type, &buffer, &length, 0, &recv_errno);
		    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "ret = %d, type = %d\n", ret, type);
452

453
		    while(ret == NMXP_SOCKET_OK   &&    type != NMXP_MSG_READY) {
454

455
456
457
			/* Process a packet and return value in NMXP_DATA_PROCESS structure */
			pd = nmxp_processCompressedData(buffer, length, channelList_subset, NETCODE_OR_CURRENT_NETWORK);
			nmxp_data_trim(pd, params.start_time, params.end_time, 0);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
458

459
460
461
462
			/* Log contents of last packet */
			if(params.flag_logdata) {
			    nmxp_data_log(pd);
			}
463

464
465
			/* Set cur_chan */
			cur_chan = nmxp_chan_lookupKeyIndex(pd->key, channelList_subset);
466

467
468
469
470
471
472
473
474
475
476
			/* Management of gaps */
			if(!channelListSeq[cur_chan].significant && pd->nSamp > 0) {
			    channelListSeq[cur_chan].significant = 1;
			} else {
			    if(channelListSeq[cur_chan].significant && pd->nSamp > 0) {
				if(nmxptool_check_and_log_gap(pd->time, channelListSeq[cur_chan].last_time, GAP_TOLLERANCE, pd->station, pd->channel)) {
				    channelListSeq[cur_chan].x_1 = 0;
				    nmxp_data_to_str(str_pd_time, pd->time);
				    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_EXTRA, "%s.%s x0 set to zero at %s!\n", pd->station, pd->channel, str_pd_time);
				}
477
			    }
Matteo Quintiliani's avatar
Matteo Quintiliani committed
478
			}
479
480
481
			if(channelListSeq[cur_chan].significant && pd->nSamp > 0) {
			    channelListSeq[cur_chan].last_time = pd->time + ((double) pd->nSamp / (double) pd->sampRate);
			}
482

483
#ifdef HAVE_LIBMSEED
484
485
486
487
			/* Write Mini-SEED record */
			if(params.flag_writeseed) {
			    nmxptool_write_miniseed(pd);
			}
488
#endif
489

490
#ifdef HAVE___SRC_SEEDLINK_PLUGIN_C
491
492
493
494
			/* Send data to SeedLink Server */
			if(params.flag_slink) {
			    nmxptool_send_raw_depoch(pd);
			}
495
496
#endif

497
498
499
500
501
502
503
504
			if(params.flag_writefile  &&  outfile) {
			    /* Write buffer to the output file */
			    if(outfile && buffer && length > 0) {
				int32_t length_int = length;
				nmxp_data_swap_4b((int32_t *) &length_int);
				fwrite(&length_int, sizeof(length_int), 1, outfile);
				fwrite(buffer, length, 1, outfile);
			    }
505
			}
506

507
508
509
510
511
512
513
514
515
			/* Store x_1 */
			if(pd->nSamp > 0) {
			    channelListSeq[cur_chan].x_1 = pd->pDataPtr[pd->nSamp-1];
			}
			/* Free pd->buffer */
			if(pd->buffer) {
			    free(pd->buffer);
			    pd->buffer = NULL;
			}
516

517
518
519
520
			/* Receive Data */
			ret = nmxp_receiveMessage(naqssock, &type, &buffer, &length, 0, &recv_errno);
			nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "ret = %d, type = %d\n", ret, type);
		    }
521

522
523
524
525
526
		    if(params.flag_writefile  &&  outfile) {
			/* Close output file */
			fclose(outfile);
			outfile = NULL;
		    }
527

528
#ifdef HAVE_LIBMSEED
529
530
531
532
533
		    if(params.flag_writeseed  &&  data_seed.outfile_mseed) {
			/* Close output Mini-SEED file */
			fclose(data_seed.outfile_mseed);
			data_seed.outfile_mseed = NULL;
		    }
534
#endif
535

536
537
		}
		i_chan++;
538
	    }
539
	    /* DAP Step 7: Repeat steps 5 and 6 for each data request */
540

541
542
543
544
545
546
547
548
549
550
	    if(params.delay > 0) {
		time_to_sleep = (params.end_time - params.start_time) - (time(NULL) - (params.start_time + params.delay + span_interval));
		if(time_to_sleep >= 0) {
		    nmxp_sleep(time_to_sleep);
		} else {
		    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "time to sleep %d sec.\n", time_to_sleep);
		    nmxp_sleep(3);
		}
		params.start_time = params.end_time;
		params.end_time = params.start_time + span_interval;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
551
	    } else {
552
		exitdapcondition = 0;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
553
	    }
Matteo Quintiliani's avatar
Matteo Quintiliani committed
554

555

556
	} /* END while(exitdapcondition) */
557

558
559
560
561
562
563
564
565
566
567
	/* DAP Step 8: Send a Terminate message (optional) */
	nmxp_sendTerminateSubscription(naqssock, NMXP_SHUTDOWN_NORMAL, "Bye!");

	/* DAP Step 9: Close the socket */
	nmxp_closeSocket(naqssock);

	/* ************************************************************ */
	/* End subscription protocol "DATA ACCESS PROTOCOL" version 1.0 */
	/* ************************************************************ */

568
	save_channel_states();
569

570
	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "End DAP Flow.\n");
571

Matteo Quintiliani's avatar
Matteo Quintiliani committed
572
    } else {
573

574
575
	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "Begin PDS Flow.\n");

576
577
	if(params.stc == -1) {

578

579
580
581
582
	    if(params.flag_logdata) {
		p_func_pd[n_func_pd++] = nmxptool_print_seq_no;
	    }

583
584
585
586
587
588
589
590
591
592
593
594
595
#ifdef HAVE_LIBMSEED
	    /* Write Mini-SEED record */
	    if(params.flag_writeseed) {
		p_func_pd[n_func_pd++] = nmxptool_write_miniseed;
	    }
#endif

#ifdef HAVE___SRC_SEEDLINK_PLUGIN_C
	    /* Send data to SeedLink Server */
	    if(params.flag_slink) {
		p_func_pd[n_func_pd++] = nmxptool_send_raw_depoch;
	    }
#endif
Matteo Quintiliani's avatar
Matteo Quintiliani committed
596
597
598
599
600
601
602

#ifdef HAVE_EARTHWORMOBJS
	    if(params.ew_configuration_file) {
		p_func_pd[n_func_pd++] = nmxptool_ew_nmx2ew;
	    }
#endif

603
	}
604

605
606
607
608
609
610
611
612
613
	/* ************************************************************* */
	/* Start subscription protocol "PRIVATE DATA STREAM" version 1.4 */
	/* ************************************************************* */

	/* PDS Step 1: Open a socket */
	naqssock = nmxp_openSocket(params.hostname, params.portnumberpds);

	if(naqssock == NMXP_SOCKET_ERROR) {
	    return 1;
614
615
	}

616
617
618
619
620
621
622
623
624
625
626
627
628
	/* PDS Step 2: Send a Connect */
	if(nmxp_sendConnect(naqssock) != NMXP_SOCKET_OK) {
	    printf("Error on sendConnect()\n");
	    return 1;
	}

	/* PDS Step 3: Receive ChannelList */
	if(nmxp_receiveChannelList(naqssock, &channelList) != NMXP_SOCKET_OK) {
	    printf("Error on receiveChannelList()\n");
	    return 1;
	}

	/* Get a subset of channel from arguments */
629
	channelList_subset = nmxp_chan_subset(channelList, NMXP_DATA_TIMESERIES, params.channels, CURRENT_NETWORK);
630
631
632


	/* PDS Step 4: Send a Request Pending (optional) */
633
634


635
636
637
638
639
640
	/* PDS Step 5: Send AddChannels */
	/* Request Data */
	nmxp_sendAddTimeSeriesChannel(naqssock, channelList_subset, params.stc, params.rate, (params.flag_buffered)? NMXP_BUFFER_YES : NMXP_BUFFER_NO);

	/* PDS Step 6: Repeat until finished: receive and handle packets */

Matteo Quintiliani's avatar
Matteo Quintiliani committed
641
642
643
644
#ifdef HAVE_LIBMSEED
	if(params.flag_writeseed) {
	    /* Open output Mini-SEED file */
	    sprintf(data_seed.filename_mseed, "%s.realtime.miniseed",
645
		    CURRENT_NETWORK);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
646
647
648

	    data_seed.outfile_mseed = fopen(data_seed.filename_mseed, "w");
	    if(!data_seed.outfile_mseed) {
649
		nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_EXTRA, "Can not to open file %s!", data_seed.filename_mseed);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
650
	    } else {
651
		nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "Opened file %s!\n", data_seed.filename_mseed);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
652
653
654
655
	    }
	}
#endif

Matteo Quintiliani's avatar
Matteo Quintiliani committed
656
657
658
	// TODO
	exitpdscondition = 1;

Matteo Quintiliani's avatar
Matteo Quintiliani committed
659
660
661
662
663
664
#ifdef HAVE_EARTHWORMOBJS
	if(params.ew_configuration_file) {
	    nmxptool_ew_attach();
	}
#endif

665
	skip_current_packet = 0;
666

Matteo Quintiliani's avatar
Matteo Quintiliani committed
667
	while(exitpdscondition) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
668

669
	    /* Process Compressed or Decompressed Data */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
670
	    pd = nmxp_receiveData(naqssock, channelList_subset, NETCODE_OR_CURRENT_NETWORK, params.timeoutrecv, &recv_errno);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
671

672
	    if(recv_errno == 0) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
673
674
675
		// TODO
		exitpdscondition = 1;
	    } else {
676
		nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error receiving data. pd=%p recv_errno=%d\n", pd, recv_errno);
677
678
679
680
681

#ifdef HAVE_EARTHWORMOBJS
		nmxptool_ew_send_error(NMXPTOOL_EW_ERR_RECVDATA);
#endif

Matteo Quintiliani's avatar
Matteo Quintiliani committed
682
683
		exitpdscondition = 0;
	    }
684

685
686
687
688
689
	    if(pd) {
		/* Set cur_chan */
		cur_chan = nmxp_chan_lookupKeyIndex(pd->key, channelList_subset);
	    }

Matteo Quintiliani's avatar
Matteo Quintiliani committed
690
	    /* Log contents of last packet */
691
692
693
	    if(params.flag_logdata) {
		nmxp_data_log(pd);
	    }
Matteo Quintiliani's avatar
Matteo Quintiliani committed
694

695
	    skip_current_packet = 0;
696
	    if(pd &&
697
		    (params.statefile  ||  params.buffered_time)
698
699
700
701
702
703
704
705
706
707
708
709
710
	      )	{
		if(params.statefile && channelListSeq[cur_chan].after_start_time > 0.0) {
		    cur_after_start_time = channelListSeq[cur_chan].after_start_time;
		} else if(params.buffered_time) {
		    cur_after_start_time = params.buffered_time;
		} else {
		    cur_after_start_time = DEFAULT_BUFFERED_TIME;
		}
		nmxp_data_to_str(cur_after_start_time_str, cur_after_start_time);
		nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_PACKETMAN, "cur_chan %d, cur_after_start_time %f, cur_after_start_time_str %s\n", cur_chan, cur_after_start_time, cur_after_start_time_str);
		if(pd->time + ((double) pd->nSamp / (double) pd->sampRate) >= cur_after_start_time) {
		    if(pd->time < cur_after_start_time) {
			int first_nsample_to_remove = (cur_after_start_time - pd->time) * (double) pd->sampRate;
711
712
713
714
			/* Remove the first sample in order avoiding overlap  */
			first_nsample_to_remove++;
			if(pd->nSamp > first_nsample_to_remove) {
			    pd->nSamp -= first_nsample_to_remove;
715
			    pd->time = cur_after_start_time;
716
717
718
719
720
721
722
723
724
725
726
727
728
			    pd->pDataPtr += first_nsample_to_remove;
			    pd->x0 = pd->pDataPtr[0];
			} else {
			    skip_current_packet = 1;
			}
		    }
		} else {
		    skip_current_packet = 1;
		}
	    }

	    if(!skip_current_packet) {

729
730
		/* Manage Raw Stream */
		if(params.stc == -1) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
731

732
733
734
735
736
		    /* cur_char is computed only for pd != NULL */
		    if(pd) {
			nmxp_raw_stream_manage(&(channelListSeq[cur_chan].raw_stream_buffer), pd, p_func_pd, n_func_pd);
			channelListSeq[cur_chan].last_time_call_raw_stream = nmxp_data_gmtime_now();
		    }
Matteo Quintiliani's avatar
Matteo Quintiliani committed
737

738
739
740
741
742
743
744
745
746
747
748
749
750
751
		    /* Check timeout for other channels */
		    if(params.timeoutrecv > 0) {
			exitpdscondition = 1;
			to_cur_chan = 0;
			while(to_cur_chan < channelList_subset->number) {
			    timeout_for_channel = nmxp_data_gmtime_now() - channelListSeq[to_cur_chan].last_time_call_raw_stream;
			    if(channelListSeq[to_cur_chan].last_time_call_raw_stream != 0
				    && timeout_for_channel >= params.timeoutrecv) {
				nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_DOD, "Timeout for channel %s (%d sec.)\n",
					channelList_subset->channel[to_cur_chan].name, timeout_for_channel);
				nmxp_raw_stream_manage(&(channelListSeq[to_cur_chan].raw_stream_buffer), NULL, p_func_pd, n_func_pd);
				channelListSeq[to_cur_chan].last_time_call_raw_stream = nmxp_data_gmtime_now();
			    }
			    to_cur_chan++;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
752
753
			}
		    }
754
755

		} else {
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771

		    if(pd) {
			/* Management of gaps */
			if(!channelListSeq[cur_chan].significant && pd->nSamp > 0) {
			    channelListSeq[cur_chan].significant = 1;
			} else {
			    if(channelListSeq[cur_chan].significant && pd->nSamp > 0) {
				if(nmxptool_check_and_log_gap(pd->time, channelListSeq[cur_chan].last_time, GAP_TOLLERANCE, pd->station, pd->channel)) {
				    channelListSeq[cur_chan].x_1 = 0;
				    nmxp_data_to_str(str_pd_time, pd->time);
				    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_EXTRA, "%s.%s x0 set to zero at %s!\n", pd->station, pd->channel, str_pd_time);
				}
			    }
			}
			if(channelListSeq[cur_chan].significant && pd->nSamp > 0) {
			    channelListSeq[cur_chan].last_time = pd->time + ((double) pd->nSamp / (double) pd->sampRate);
772
			}
773

Matteo Quintiliani's avatar
Matteo Quintiliani committed
774
775

#ifdef HAVE_LIBMSEED
776
777
778
779
			/* Write Mini-SEED record */
			if(params.flag_writeseed) {
			    nmxptool_write_miniseed(pd);
			}
Matteo Quintiliani's avatar
Matteo Quintiliani committed
780
781
#endif

782
#ifdef HAVE___SRC_SEEDLINK_PLUGIN_C
783
784
785
786
			/* Send data to SeedLink Server */
			if(params.flag_slink) {
			    nmxptool_send_raw_depoch(pd);
			}
787
#endif
788
789
		    }
		}
790
	    } /* End skip_current_packet condition */
791

Matteo Quintiliani's avatar
Matteo Quintiliani committed
792
	    if(pd) {
793
794
795
796
797
798
799
800
801
		/* Store x_1 */
		if(pd->nSamp > 0) {
		    channelListSeq[cur_chan].x_1 = pd->pDataPtr[pd->nSamp-1];
		}
		/* Free pd->buffer */
		if(pd->buffer) {
		    free(pd->buffer);
		    pd->buffer = NULL;
		}
Matteo Quintiliani's avatar
Matteo Quintiliani committed
802
	    }
Matteo Quintiliani's avatar
Matteo Quintiliani committed
803

Matteo Quintiliani's avatar
Matteo Quintiliani committed
804
805
806
807
808
809
#ifdef HAVE_EARTHWORMOBJS
	    if(params.ew_configuration_file) {

		/* Check if we are being asked to terminate */
		if( nmxptool_ew_check_flag_terminate() ) {
		    logit ("t", "nmxptool terminating on request\n");
810
		    nmxptool_ew_send_error(NMXPTOOL_EW_ERR_TERMREQ);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
811
812
813
814
815
816
817
818
819
		    exitpdscondition = 0;
		}

		/* Check if we need to send heartbeat message */
		nmxptool_ew_send_heartbeat_if_needed();

	    }
#endif

820
	} /* End main PDS loop */
821

Matteo Quintiliani's avatar
Matteo Quintiliani committed
822
823
	/* Flush raw data stream for each channel */
	flushing_raw_data_stream();
824

Matteo Quintiliani's avatar
Matteo Quintiliani committed
825
	save_channel_states();
Matteo Quintiliani's avatar
Matteo Quintiliani committed
826

Matteo Quintiliani's avatar
Matteo Quintiliani committed
827
828
829
830
831
832
#ifdef HAVE_EARTHWORMOBJS
	if(params.ew_configuration_file) {
	    nmxptool_ew_detach();
	}
#endif

Matteo Quintiliani's avatar
Matteo Quintiliani committed
833
834
835
836
837
838
839
#ifdef HAVE_LIBMSEED
	if(params.flag_writeseed  &&  data_seed.outfile_mseed) {
	    /* Close output Mini-SEED file */
	    fclose(data_seed.outfile_mseed);
	}
#endif

840
841
842
843
844
845
846
847
848
849
850

	/* PDS Step 7: Send Terminate Subscription */
	nmxp_sendTerminateSubscription(naqssock, NMXP_SHUTDOWN_NORMAL, "Good Bye!");

	/* PDS Step 8: Close the socket */
	nmxp_closeSocket(naqssock);

	/* *********************************************************** */
	/* End subscription protocol "PRIVATE DATA STREAM" version 1.4 */
	/* *********************************************************** */

851
852
853
	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "End PDS Flow.\n");

    }
854

855
856
857
858
859
    if(params.interval == DEFAULT_INTERVAL_INFINITE) {
	times_flow++;
    } else {
	times_flow = 100;
    }
860

861
    }
862

863
#ifdef HAVE_LIBMSEED
864
865
866
867
    if(*msr_list_chan) {
	for(i_chan = 0; i_chan < channelList_subset->number; i_chan++) {
	    if(msr_list_chan[i_chan]) {
		msr_free(&(msr_list_chan[i_chan]));
868
869
	    }
	}
870
    }
871
872
#endif

873
874
875
    for(i_chan = 0; i_chan < channelList_subset->number; i_chan++) {
	nmxp_raw_stream_free(&(channelListSeq[i_chan].raw_stream_buffer));
    }
876

877
878
879
    if(channelListSeq) {
	free(channelListSeq);
    }
880

881
882
883
884
    /* This has to be the last */
    if(channelList_subset) {
	free(channelList_subset);
    }
885

886
    return 0;
887
} /* End MAIN */
888
889


890
891
892
#define MAX_LEN_FILENAME 4096
#define NMXP_STR_STATE_EXT ".nmxpstate"

Matteo Quintiliani's avatar
Matteo Quintiliani committed
893
static void save_channel_states() {
894
895
896
897
    int to_cur_chan;
    char last_time_str[30];
    char raw_last_sample_time_str[30];
    char state_line_str[1000];
898
    FILE *fstatefile = NULL;
899
    char statefilefilename[MAX_LEN_FILENAME] = "";
900

901
    if(params.statefile) {
902
903
904
	strncpy(statefilefilename, params.statefile, MAX_LEN_FILENAME);
	strncat(statefilefilename, NMXP_STR_STATE_EXT, MAX_LEN_FILENAME);
	fstatefile = fopen(statefilefilename, "w");
905
	if(fstatefile == NULL) {
906
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "Unable to write channel states into %s!\n", statefilefilename);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
907
	} else {
908
	    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_ANY, "Writing channel states into %s!\n", statefilefilename);
909
	}
910

911
912
913
914
915
916
	/* Save state for each channel */
	// if(params.stc == -1)
	to_cur_chan = 0;
	while(to_cur_chan < channelList_subset->number) {
	    nmxp_data_to_str(last_time_str, channelListSeq[to_cur_chan].last_time);
	    nmxp_data_to_str(raw_last_sample_time_str, channelListSeq[to_cur_chan].raw_stream_buffer.last_sample_time);
917
	    sprintf(state_line_str, "%s %s %s",
918
		    channelList_subset->channel[to_cur_chan].name,
919
920
		    last_time_str,
		    raw_last_sample_time_str
921
922
923
		   );
	    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CHANNEL, "%s\n", state_line_str);
	    if(fstatefile) {
924
		fprintf(fstatefile, "%s\n", state_line_str);
925
926
		if( (channelListSeq[to_cur_chan].last_time != 0) || (channelListSeq[to_cur_chan].raw_stream_buffer.last_sample_time != -1.0) ) {
		    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_ANY, "%s\n", state_line_str);
927
928
		} else {
		    /* Do nothing */
929
		}
930
931
932
	    }
	    to_cur_chan++;
	}
933
	if(fstatefile) {
934
	    fclose(fstatefile);
935
936
937
938
	}
    }
}

939
940
/*
 * Load the per-channel start times from the state file.
 *
 * Does nothing (apart from clearing errno) when params.statefile is not set.
 * The state file name is params.statefile followed by NMXP_STR_STATE_EXT.
 * If that file cannot be opened for reading, it is first created as a
 * line-by-line copy of the file named params.statefile.
 *
 * Each line is expected to hold three whitespace separated fields:
 *     <channel name> <no-raw time> <raw time>
 * For every line whose channel name matches (case-insensitively) an entry of
 * chan_list, after_start_time of the entry with the same index in
 * chan_list_seq is set to the raw time if it is valid and different from 0.0,
 * otherwise to the no-raw time if that one is valid. DEFAULT_BUFFERED_TIME
 * marks a time that is missing or could not be parsed.
 *
 * \param chan_list      Channels to look up by name.
 * \param chan_list_seq  Array indexed like chan_list->channel; receives
 *                       after_start_time.
 *
 * Errors are logged and are not fatal. errno is reset to 0 before returning.
 */
void load_channel_states(NMXP_CHAN_LIST_NET *chan_list, NMXPTOOL_CHAN_SEQ *chan_list_seq) {
    FILE *fstatefile = NULL;
    FILE *fstatefileINPUT = NULL;
#define MAXSIZE_LINE 2048
    char line[MAXSIZE_LINE];
    char s_chan[128];
    char s_noraw_time_s[128];
    char s_rawtime_s[128];
    double s_noraw_time_f_calc, s_rawtime_f_calc;
    int cur_chan;
    int n_scanf;
    int name_len;
    NMXP_TM_T tmp_tmt;
    char statefilefilename[MAX_LEN_FILENAME] = "";

    if(params.statefile) {
	/* Bounded construction of the file name: snprintf() always terminates
	 * the buffer and its return value reveals truncation. */
	name_len = snprintf(statefilefilename, sizeof(statefilefilename), "%s%s", params.statefile, NMXP_STR_STATE_EXT);
	if(name_len < 0 || (size_t) name_len >= sizeof(statefilefilename)) {
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "State file name %s%s is too long!\n", params.statefile, NMXP_STR_STATE_EXT);
	} else {
	    fstatefile = fopen(statefilefilename, "r");
	    if(fstatefile == NULL) {
		/* No state file yet: create it as a copy of params.statefile */
		fstatefileINPUT = fopen(params.statefile, "r");
		if(fstatefileINPUT == NULL) {
		    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "Unable to read channel states from %s!\n", params.statefile);
		} else {
		    fstatefile = fopen(statefilefilename, "w");
		    if(fstatefile == NULL) {
			nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "Unable to write channel states into %s!\n", statefilefilename);
		    } else {
			while(fgets(line, MAXSIZE_LINE, fstatefileINPUT) != NULL) {
			    fputs(line, fstatefile);
			}
			fclose(fstatefile);
		    }
		    fclose(fstatefileINPUT);
		}
		/* Open the (possibly just created) state file for reading */
		fstatefile = fopen(statefilefilename, "r");
	    }
	    /* When the first fopen() succeeded its handle is reused here, so it
	     * is closed exactly once below instead of being leaked. */

	    if(fstatefile == NULL) {
		nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "Unable to read channel states from %s!\n", statefilefilename);
	    } else {
		nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_ANY, "Loading channel states from %s!\n", statefilefilename);
		while(fgets(line, MAXSIZE_LINE, fstatefile) != NULL) {
		    s_chan[0] = 0;
		    s_noraw_time_s[0] = 0;
		    s_rawtime_s[0] = 0;
		    /* Field widths keep each token inside its 128-byte buffer */
		    n_scanf = sscanf(line, "%127s %127s %127s", s_chan, s_noraw_time_s, s_rawtime_s);

		    s_noraw_time_f_calc = DEFAULT_BUFFERED_TIME;
		    s_rawtime_f_calc = DEFAULT_BUFFERED_TIME;
		    if(n_scanf == 3) {
			if(nmxp_data_parse_date(s_noraw_time_s, &tmp_tmt) == -1) {
			    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "Error parsing %s\n", s_noraw_time_s);
			} else {
			    s_noraw_time_f_calc = nmxp_data_tm_to_time(&tmp_tmt);
			}
			if(nmxp_data_parse_date(s_rawtime_s, &tmp_tmt) == -1) {
			    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "Error parsing %s\n", s_rawtime_s);
			} else {
			    s_rawtime_f_calc = nmxp_data_tm_to_time(&tmp_tmt);
			}
		    }
		    nmxp_log(NMXP_LOG_NORM_NO, NMXP_LOG_D_PACKETMAN, "%d %-14s %16.4f %s %16.4f %s\n", n_scanf, s_chan, s_noraw_time_f_calc, s_noraw_time_s, s_rawtime_f_calc, s_rawtime_s);

		    /* Look up the channel by name (case-insensitive) */
		    cur_chan = 0;
		    while(cur_chan < chan_list->number  &&  strcasecmp(s_chan, chan_list->channel[cur_chan].name) != 0) {
			cur_chan++;
		    }
		    if(cur_chan < chan_list->number) {
			/* The raw time wins over the no-raw time when usable */
			if( s_rawtime_f_calc != DEFAULT_BUFFERED_TIME  && s_rawtime_f_calc != 0.0 ) {
			    chan_list_seq[cur_chan].after_start_time = s_rawtime_f_calc;
			    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_ANY, "For channel %s starting from %s.\n", s_chan, s_rawtime_s);
			} else if( s_noraw_time_f_calc != DEFAULT_BUFFERED_TIME ) {
			    chan_list_seq[cur_chan].after_start_time = s_noraw_time_f_calc;
			    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_ANY, "For channel %s starting from %s.\n", s_chan, s_noraw_time_s);
			} else {
			    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_ANY, "For channel %s there is not valid start_time.\n", s_chan);
			}
		    }
		}
		fclose(fstatefile);
	    }
	}
    }
    /* Failed fopen() attempts above may have set errno; callers get it cleared */
    errno = 0;
}

1027

Matteo Quintiliani's avatar
Matteo Quintiliani committed
1028
1029
1030
1031
1032
1033
1034
static void flushing_raw_data_stream() {
    int to_cur_chan;

    /* Flush raw data stream for each channel */
    if(params.stc == -1) {
	to_cur_chan = 0;
	while(to_cur_chan < channelList_subset->number) {
1035
	    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_ANY, "Flushing data for channel %s\n",
Matteo Quintiliani's avatar
Matteo Quintiliani committed
1036
1037
1038
1039
1040
1041
		    channelList_subset->channel[to_cur_chan].name);
	    nmxp_raw_stream_manage(&(channelListSeq[to_cur_chan].raw_stream_buffer), NULL, p_func_pd, n_func_pd);
	    to_cur_chan++;
	}
    }
}
1042

Matteo Quintiliani's avatar
Matteo Quintiliani committed
1043
#ifndef HAVE_WINDOWS_H
Matteo Quintiliani's avatar
Matteo Quintiliani committed
1044
1045
/* Do any needed cleanup and exit */
static void clientShutdown(int sig) {
1046

1047
    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_ANY, "Program interrupted!\n");
Matteo Quintiliani's avatar
Matteo Quintiliani committed
1048

Matteo Quintiliani's avatar
Matteo Quintiliani committed
1049
    flushing_raw_data_stream();
Matteo Quintiliani's avatar
Matteo Quintiliani committed
1050
    save_channel_states();
Matteo Quintiliani's avatar
Matteo Quintiliani committed
1051

Matteo Quintiliani's avatar
Matteo Quintiliani committed
1052
1053
1054
1055
1056
    if(params.flag_writefile  &&  outfile) {
	/* Close output file */
	fclose(outfile);
    }

1057
#ifdef HAVE_EARTHWORMOBJS
1058
1059
1060
    if(params.ew_configuration_file) {
	nmxptool_ew_detach();
    }
1061
1062
#endif

Matteo Quintiliani's avatar
Matteo Quintiliani committed
1063
1064
1065
1066
1067
1068
1069
#ifdef HAVE_LIBMSEED
    if(params.flag_writeseed  &&  data_seed.outfile_mseed) {
	/* Close output Mini-SEED file */
	fclose(data_seed.outfile_mseed);
    }
#endif

1070

Matteo Quintiliani's avatar
Matteo Quintiliani committed
1071
1072
1073
1074
1075
1076
    /* PDS Step 7: Send Terminate Subscription */
    nmxp_sendTerminateSubscription(naqssock, NMXP_SHUTDOWN_NORMAL, "Good Bye!");

    /* PDS Step 8: Close the socket */
    nmxp_closeSocket(naqssock);

1077

Matteo Quintiliani's avatar
Matteo Quintiliani committed
1078
1079
1080

    /* Free the complete channel list */
    if(channelList) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
1081
	free(channelList);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
1082
	channelList = NULL;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
1083
    }
1084