nmxptool.c 35.4 KB
Newer Older
Matteo Quintiliani's avatar
Matteo Quintiliani committed
1
2
3
4
5
6
7
8
9
/*! \file
 *
 * \brief Nanometrics Protocol Tool
 *
 * Author:
 * 	Matteo Quintiliani
 * 	Istituto Nazionale di Geofisica e Vulcanologia - Italy
 *	quintiliani@ingv.it
 *
10
 * $Id: nmxptool.c,v 1.110 2008-01-17 11:00:33 mtheo Exp $
Matteo Quintiliani's avatar
Matteo Quintiliani committed
11
 *
Matteo Quintiliani's avatar
Matteo Quintiliani committed
12
13
 */

Matteo Quintiliani's avatar
Matteo Quintiliani committed
14
15
#include "config.h"

16
17
18
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
Matteo Quintiliani's avatar
Matteo Quintiliani committed
19
#include <errno.h>
20
21

#include <nmxp.h>
Matteo Quintiliani's avatar
Matteo Quintiliani committed
22

Matteo Quintiliani's avatar
Matteo Quintiliani committed
23
#ifndef HAVE_WINDOWS_H
Matteo Quintiliani's avatar
Matteo Quintiliani committed
24
25
26
#include <signal.h>
#endif

Matteo Quintiliani's avatar
Matteo Quintiliani committed
27
28
29
30
#ifdef HAVE_WINDOWS_H
#include <windows.h>
#endif

31
#include "nmxptool_getoptlong.h"
32
33

#ifdef HAVE_EARTHWORMOBJS
34
#include "nmxptool_ew.h"
35
#endif
36

37
38
39
#ifdef HAVE_LIBMSEED
#include <libmseed.h>
#endif
40

41
42
#ifdef HAVE___SRC_SEEDLINK_PLUGIN_H
#include "seedlink_plugin.h"
43
44
#endif

45
46
#define DAP_CONDITION(params_struct) ( params_struct.start_time != 0.0 || params_struct.delay > 0 )

47
48
49
#define CURRENT_NETWORK ( (params.network)? params.network : DEFAULT_NETWORK )
#define NETCODE_OR_CURRENT_NETWORK ( (network_code[0] != 0)? network_code : CURRENT_NETWORK )

Matteo Quintiliani's avatar
Matteo Quintiliani committed
50
#define GAP_TOLLERANCE 0.001
51

52
53
54
typedef struct {
    int significant;
    double last_time;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
55
    time_t last_time_call_raw_stream;
56
    int32_t x_1;
57
    double after_start_time;
58
    NMXP_RAW_STREAM_DATA raw_stream_buffer;
59
60
} NMXPTOOL_CHAN_SEQ;

Matteo Quintiliani's avatar
Matteo Quintiliani committed
61
62

#ifndef HAVE_WINDOWS_H
Matteo Quintiliani's avatar
Matteo Quintiliani committed
63
64
static void clientShutdown(int sig);
static void clientDummyHandler(int sig);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
65
#endif
Matteo Quintiliani's avatar
Matteo Quintiliani committed
66

Matteo Quintiliani's avatar
Matteo Quintiliani committed
67
static void save_channel_states();
68
void load_channel_states(NMXP_CHAN_LIST_NET *chan_list, NMXPTOOL_CHAN_SEQ *chan_list_seq);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
69
70
static void flushing_raw_data_stream();

71
#ifdef HAVE_LIBMSEED
Matteo Quintiliani's avatar
Matteo Quintiliani committed
72
int nmxptool_write_miniseed(NMXP_DATA_PROCESS *pd);
73
74
#endif

Matteo Quintiliani's avatar
Matteo Quintiliani committed
75
#ifdef HAVE___SRC_SEEDLINK_PLUGIN_C
Matteo Quintiliani's avatar
Matteo Quintiliani committed
76
int nmxptool_send_raw_depoch(NMXP_DATA_PROCESS *pd);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
77
78
#endif

79
int nmxptool_print_seq_no(NMXP_DATA_PROCESS *pd);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
80
81

int nmxptool_check_and_log_gap(double time1, double time2, const double gap_tollerance, const char *station, const char *channel);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
82
void nmxptool_str_time_to_filename(char *str_time);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
83
84


85
86
87
88
89
/* Global variable for main program and handling terminitation program */
NMXPTOOL_PARAMS params;
int naqssock = 0;
FILE *outfile = NULL;
NMXP_CHAN_LIST *channelList = NULL;
90
NMXP_CHAN_LIST_NET *channelList_subset = NULL;
91
NMXPTOOL_CHAN_SEQ *channelListSeq = NULL;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
92
93
94
int n_func_pd = 0;
int (*p_func_pd[NMXP_MAX_FUNC_PD]) (NMXP_DATA_PROCESS *);

95
96
97
98
99
100
101
102

#ifdef HAVE_LIBMSEED
/* Mini-SEED variables */
NMXP_DATA_SEED data_seed;
MSRecord *msr_list_chan[MAX_N_CHAN];
#endif


103
int main (int argc, char **argv) {
104
    int32_t connection_time;
105
    int request_SOCKET_OK;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
106
    int i_chan, cur_chan = 0;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
107
    int to_cur_chan = 0;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
108
    int exitpdscondition;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
109
    int exitdapcondition;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
110
    time_t timeout_for_channel;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
111

Matteo Quintiliani's avatar
Matteo Quintiliani committed
112
    int span_interval = 10;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
113
114
    int time_to_sleep = 0;

115
116
    char str_start_time[200];
    char str_end_time[200];
Matteo Quintiliani's avatar
Matteo Quintiliani committed
117
    char str_pd_time[200];
118
119
120

    NMXP_MSG_SERVER type;
    void *buffer;
121
    int32_t length;
122
123
    int ret;

Matteo Quintiliani's avatar
Matteo Quintiliani committed
124
125
    int recv_errno = 0;

126
    char filename[500];
127
    char station_code[20], channel_code[20], network_code[20];
128

Matteo Quintiliani's avatar
Matteo Quintiliani committed
129
    char cur_after_start_time_str[1024];
130
131
    double cur_after_start_time = DEFAULT_BUFFERED_TIME;
    int skip_current_packet = 0;
132

133
134
    int times_flow = 0;
    double default_start_time = 0.0;
135
    char start_time_str[30], end_time_str[30], default_start_time_str[30];
136

137
138
139
140
141
142
143
    NMXP_DATA_PROCESS *pd;

#ifdef HAVE_LIBMSEED
    /* Init mini-SEED variables */
    nmxp_data_seed_init(&data_seed);
#endif

Matteo Quintiliani's avatar
Matteo Quintiliani committed
144
#ifndef HAVE_WINDOWS_H
Matteo Quintiliani's avatar
Matteo Quintiliani committed
145
146
147
148
149
150
151
    /* Signal handling, use POSIX calls with standardized semantics */
    struct sigaction sa;

    sa.sa_handler = clientDummyHandler;
    sa.sa_flags = SA_RESTART;
    sigemptyset(&sa.sa_mask);
    sigaction(SIGALRM, &sa, NULL);
152

Matteo Quintiliani's avatar
Matteo Quintiliani committed
153
154
155
156
157
158
159
160
161
162
163
    sa.sa_handler = clientShutdown;
    sigaction(SIGINT, &sa, NULL);
    sigaction(SIGQUIT, &sa, NULL); 
    sigaction(SIGTERM, &sa, NULL);

    sa.sa_handler = SIG_IGN;
    sigaction(SIGHUP, &sa, NULL);
    sigaction(SIGPIPE, &sa, NULL); 
#endif

    /* Default is normal output */
164
    nmxp_log(NMXP_LOG_SET, NMXP_LOG_D_NULL);
165
166
167
168
169
170

    /* Initialize params from argument values */
    if(nmxptool_getopt_long(argc, argv, &params) != 0) {
	return 1;
    }

171
    if(params.ew_configuration_file) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
172
173

#ifdef HAVE_EARTHWORMOBJS
174
175
	nmxp_log_init(nmxptool_ew_logit_msg, nmxptool_ew_logit_err);

Matteo Quintiliani's avatar
Matteo Quintiliani committed
176
	nmxptool_ew_configure(argv, &params);
177

Matteo Quintiliani's avatar
Matteo Quintiliani committed
178
179
180
181
182
	/* Check consistency of params */
	if(nmxptool_check_params(&params) != 0) {
	    return 1;
	}
#endif
183

184
    } else {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
185

Matteo Quintiliani's avatar
Matteo Quintiliani committed
186
187
	nmxp_log_init(nmxp_log_stdout, nmxp_log_stderr);

188
189
190
191
192
	/* Check consistency of params */
	if(nmxptool_check_params(&params) != 0) {
	    return 1;
	}

193
194
	/* List available channels on server */
	if(params.flag_listchannels) {
195

196
197
	    nmxp_meta_chan_print(nmxp_getMetaChannelList(params.hostname, params.portnumberdap, NMXP_DATA_TIMESERIES, params.flag_request_channelinfo, params.datas_username, params.datas_password, &channelList));

198
	    return 1;
199

200
201
202
203
	} else if(params.flag_listchannelsnaqs) {

	    channelList = nmxp_getAvailableChannelList(params.hostname, params.portnumberpds, NMXP_DATA_TIMESERIES);
	    nmxp_chan_print_channelList(channelList);
204
	    return 1;
205

206
207
	}
    }
208

209
210
211
    nmxp_log(NMXP_LOG_SET, params.verbose_level);
    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_ANY, "verbose_level %d\n", params.verbose_level);

212
213
    nmxptool_log_params(&params);

214
    /* Get list of available channels and get a subset list of params.channels */
215
    if( DAP_CONDITION(params) ) {
216
217
218
219
220
221
222
	/* From DataServer */
	nmxp_getMetaChannelList(params.hostname, params.portnumberdap, NMXP_DATA_TIMESERIES, params.flag_request_channelinfo, params.datas_username, params.datas_password, &channelList);
    } else {
	/* From NaqsServer */
	channelList = nmxp_getAvailableChannelList(params.hostname, params.portnumberpds, NMXP_DATA_TIMESERIES);
    }

223
    channelList_subset = nmxp_chan_subset(channelList, NMXP_DATA_TIMESERIES, params.channels, CURRENT_NETWORK);
224
225
226

    /* Check if some channel already exists */
    if(channelList_subset->number <= 0) {
227
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CHANNEL, "Channels not found!\n");
228
	return 1;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
229
    } else {
230
	nmxp_chan_print_netchannelList(channelList_subset);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
231

232
	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CHANNEL, "Init channelListSeq.\n");
233
234
235
236
237
238

	/* init channelListSeq */
	channelListSeq = (NMXPTOOL_CHAN_SEQ *) malloc(sizeof(NMXPTOOL_CHAN_SEQ) * channelList_subset->number);
	for(i_chan = 0; i_chan < channelList_subset->number; i_chan++) {
	    channelListSeq[i_chan].significant = 0;
	    channelListSeq[i_chan].last_time = 0.0;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
239
	    channelListSeq[i_chan].last_time_call_raw_stream = 0;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
240
	    channelListSeq[i_chan].x_1 = 0;
241
	    channelListSeq[i_chan].after_start_time = DEFAULT_BUFFERED_TIME;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
242
	    nmxp_raw_stream_init(&(channelListSeq[i_chan].raw_stream_buffer), params.max_tolerable_latency, params.timeoutrecv);
243
244
	}

245
246
247
248
	if(params.statefile) {
	    load_channel_states(channelList_subset, channelListSeq);
	}

Matteo Quintiliani's avatar
Matteo Quintiliani committed
249
#ifdef HAVE_LIBMSEED
250
	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "Init mini-SEED record list.\n");
251
252

	/* Init mini-SEED record list */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
253
	for(i_chan = 0; i_chan < channelList_subset->number; i_chan++) {
254

255
	    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "Init mini-SEED record for %s\n", channelList_subset->channel[i_chan].name);
256
257
258
259

	    msr_list_chan[i_chan] = msr_init(NULL);

	    /* Separate station_code and channel_code */
260
	    if(nmxp_chan_cpy_sta_chan(channelList_subset->channel[i_chan].name, station_code, channel_code, network_code)) {
261

262
		nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "%s.%s.%s\n", NETCODE_OR_CURRENT_NETWORK, station_code, channel_code);
263

264
		strcpy(msr_list_chan[i_chan]->network, NETCODE_OR_CURRENT_NETWORK);
265
266
267
268
269
270
271
		strcpy(msr_list_chan[i_chan]->station, station_code);
		strcpy(msr_list_chan[i_chan]->channel, channel_code);

		msr_list_chan[i_chan]->reclen = 512;         /* byte record length */
		msr_list_chan[i_chan]->encoding = DE_STEIM1;  /* Steim 1 compression */

	    } else {
272
		nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CHANNEL, "Channels %s error in format!\n");
273
274
275
		return 1;
	    }

Matteo Quintiliani's avatar
Matteo Quintiliani committed
276
277
	}
#endif
278

279
280
281
282
283
284
285
286
    }

    /* Free the complete channel list */
    if(channelList) {
	free(channelList);
	channelList = NULL;
    }

287
    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "Starting comunication.\n");
Matteo Quintiliani's avatar
Matteo Quintiliani committed
288

289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
    times_flow = 0;

    while(times_flow < 2) {

	if(times_flow == 0) {
	    if(params.statefile) {
		params.interval = DEFAULT_INTERVAL_INFINITE;
	    }
	} else if(times_flow == 1) {
	    params.start_time = 0.0;
	    params.end_time = 0.0;
	    params.interval = DEFAULT_INTERVAL_NO_VALUE;

	    if(params.statefile) {
		load_channel_states(channelList_subset, channelListSeq);
	    }

	}

308
    /* TODO condition starting DAP or PDS */
309
310
311
    if( DAP_CONDITION(params) || (times_flow == 0  &&  params.statefile  && params.interval == DEFAULT_INTERVAL_INFINITE) ) {

	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "Begin DAP Flow.\n");
312

313
314
315
316
317
318
319
	if(params.interval > 0  ||  params.interval == DEFAULT_INTERVAL_INFINITE) {
	    if(params.interval > 0) {
		params.end_time = params.start_time + params.interval;
	    } else {
		params.end_time = nmxp_data_gmtime_now();
	    }
	} else if(params.delay > 0) {
320
	    params.start_time = ((double) (time(NULL) - params.delay - span_interval) / 10.0) * 10.0;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
321
322
323
	    params.end_time = params.start_time + span_interval;
	}

324

325
326
327
	/* ************************************************************** */
	/* Start subscription protocol "DATA ACCESS PROTOCOL" version 1.0 */
	/* ************************************************************** */
328

329
330
	/* DAP Step 1: Open a socket */
	if( (naqssock = nmxp_openSocket(params.hostname, params.portnumberdap)) == NMXP_SOCKET_ERROR) {
331
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error opening socket!\n");
332
333
	    return 1;
	}
334

335
336
	/* DAP Step 2: Read connection time */
	if(nmxp_readConnectionTime(naqssock, &connection_time) != NMXP_SOCKET_OK) {
337
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error reading connection time from server!\n");
338
339
	    return 1;
	}
340

341
342
	/* DAP Step 3: Send a ConnectRequest */
	if(nmxp_sendConnectRequest(naqssock, params.datas_username, params.datas_password, connection_time) != NMXP_SOCKET_OK) {
343
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error sending connect request!\n");
344
345
346
347
348
	    return 1;
	}

	/* DAP Step 4: Wait for a Ready message */
	if(nmxp_waitReady(naqssock) != NMXP_SOCKET_OK) {
349
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error waiting Ready message!\n");
350
351
	    return 1;
	}
352

353
354
	exitdapcondition = 1;

355
	default_start_time = (params.start_time > 0.0)? params.start_time : nmxp_data_gmtime_now() - 10.0;
356

357
	while(exitdapcondition) {
358

359
360
361
	    /* Start loop for sending requests */
	    i_chan=0;
	    request_SOCKET_OK = NMXP_SOCKET_OK;
362

363
	    while(request_SOCKET_OK == NMXP_SOCKET_OK  &&  i_chan < channelList_subset->number) {
364

365
366
367
		if(params.statefile) {
		    if(channelListSeq[i_chan].after_start_time > 0) {
			params.start_time = channelListSeq[i_chan].after_start_time;
368
369
370
371
372
373
			if(params.end_time - params.start_time > params.max_time_to_retrieve) {
			    nmxp_data_to_str(start_time_str, params.start_time);
			    nmxp_data_to_str(default_start_time_str, params.end_time - params.max_time_to_retrieve);
			    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_ANY, "start_time changed from %s to %s\n", start_time_str, default_start_time_str);
			    params.start_time = params.end_time - params.max_time_to_retrieve;
			}
374
375
376
377
378
379
380
381
382
383
		    } else {
			params.start_time = default_start_time;
		    }
		}

		nmxp_data_to_str(start_time_str, params.start_time);
		nmxp_data_to_str(end_time_str, params.end_time);
		nmxp_data_to_str(default_start_time_str, default_start_time);
		nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "start_time = %s - end_time = %s - (default_start_time = %s)\n", start_time_str, end_time_str, default_start_time_str);

384
385
		/* DAP Step 5: Send Data Request */
		request_SOCKET_OK = nmxp_sendDataRequest(naqssock, channelList_subset->channel[i_chan].key, (int32_t) params.start_time, (int32_t) (params.end_time + 1.0));
386

387
		if(request_SOCKET_OK == NMXP_SOCKET_OK) {
388

389
390
391
392
		    nmxp_data_to_str(str_start_time, params.start_time);
		    nmxp_data_to_str(str_end_time, params.end_time);
		    nmxptool_str_time_to_filename(str_start_time);
		    nmxptool_str_time_to_filename(str_end_time);
393

394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
		    if(params.flag_writefile) {
			/* Open output file */
			if(nmxp_chan_cpy_sta_chan(channelList_subset->channel[i_chan].name, station_code, channel_code, network_code)) {
			    sprintf(filename, "%s.%s.%s_%s_%s.nmx",
				    NETCODE_OR_CURRENT_NETWORK,
				    station_code,
				    channel_code,
				    str_start_time,
				    str_end_time);
			} else {
			    sprintf(filename, "%s_%s_%s.nmx",
				    channelList_subset->channel[i_chan].name,
				    str_start_time,
				    str_end_time);
			}
409

410
411
412
413
			outfile = fopen(filename, "w");
			if(!outfile) {
			    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_EXTRA, "Can not to open file %s!", filename);
			}
414
		    }
415
416

#ifdef HAVE_LIBMSEED
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
		    if(params.flag_writeseed) {
			/* Open output Mini-SEED file */
			if(nmxp_chan_cpy_sta_chan(channelList_subset->channel[i_chan].name, station_code, channel_code, network_code)) {
			    sprintf(data_seed.filename_mseed, "%s.%s.%s_%s_%s.miniseed",
				    NETCODE_OR_CURRENT_NETWORK,
				    station_code,
				    channel_code,
				    str_start_time,
				    str_end_time);
			} else {
			    sprintf(filename, "%s_%s_%s.miniseed",
				    channelList_subset->channel[i_chan].name,
				    str_start_time,
				    str_end_time);
			}
432

433
434
435
436
			data_seed.outfile_mseed = fopen(data_seed.filename_mseed, "w");
			if(!data_seed.outfile_mseed) {
			    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_EXTRA, "Can not to open file %s!", data_seed.filename_mseed);
			}
437
438
		    }
#endif
439

440
441
442
443
444
445
446
447
448
449
450
451
		    if(params.flag_writefile  &&  outfile) {
			/* Compute SNCL line */

			/* Separate station_code_old_way and channel_code_old_way */
			if(nmxp_chan_cpy_sta_chan(channelList_subset->channel[i_chan].name, station_code, channel_code, network_code)) {
			    /* Write SNCL line */
			    fprintf(outfile, "%s.%s.%s.%s\n",
				    station_code,
				    NETCODE_OR_CURRENT_NETWORK,
				    channel_code,
				    (params.location)? params.location : "");
			}
452

453
		    }
454

455
456
457
		    /* DAP Step 6: Receive Data until receiving a Ready message */
		    ret = nmxp_receiveMessage(naqssock, &type, &buffer, &length, 0, &recv_errno);
		    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "ret = %d, type = %d\n", ret, type);
458

459
		    while(ret == NMXP_SOCKET_OK   &&    type != NMXP_MSG_READY) {
460

461
462
463
			/* Process a packet and return value in NMXP_DATA_PROCESS structure */
			pd = nmxp_processCompressedData(buffer, length, channelList_subset, NETCODE_OR_CURRENT_NETWORK);
			nmxp_data_trim(pd, params.start_time, params.end_time, 0);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
464

465
466
467
468
			/* Log contents of last packet */
			if(params.flag_logdata) {
			    nmxp_data_log(pd);
			}
469

470
471
			/* Set cur_chan */
			cur_chan = nmxp_chan_lookupKeyIndex(pd->key, channelList_subset);
472

473
474
475
476
477
478
479
480
481
482
			/* Management of gaps */
			if(!channelListSeq[cur_chan].significant && pd->nSamp > 0) {
			    channelListSeq[cur_chan].significant = 1;
			} else {
			    if(channelListSeq[cur_chan].significant && pd->nSamp > 0) {
				if(nmxptool_check_and_log_gap(pd->time, channelListSeq[cur_chan].last_time, GAP_TOLLERANCE, pd->station, pd->channel)) {
				    channelListSeq[cur_chan].x_1 = 0;
				    nmxp_data_to_str(str_pd_time, pd->time);
				    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_EXTRA, "%s.%s x0 set to zero at %s!\n", pd->station, pd->channel, str_pd_time);
				}
483
			    }
Matteo Quintiliani's avatar
Matteo Quintiliani committed
484
			}
485
486
487
			if(channelListSeq[cur_chan].significant && pd->nSamp > 0) {
			    channelListSeq[cur_chan].last_time = pd->time + ((double) pd->nSamp / (double) pd->sampRate);
			}
488

489
#ifdef HAVE_LIBMSEED
490
491
492
493
			/* Write Mini-SEED record */
			if(params.flag_writeseed) {
			    nmxptool_write_miniseed(pd);
			}
494
#endif
495

496
#ifdef HAVE___SRC_SEEDLINK_PLUGIN_C
497
498
499
500
			/* Send data to SeedLink Server */
			if(params.flag_slink) {
			    nmxptool_send_raw_depoch(pd);
			}
501
502
#endif

503
504
505
506
507
508
509
510
			if(params.flag_writefile  &&  outfile) {
			    /* Write buffer to the output file */
			    if(outfile && buffer && length > 0) {
				int32_t length_int = length;
				nmxp_data_swap_4b((int32_t *) &length_int);
				fwrite(&length_int, sizeof(length_int), 1, outfile);
				fwrite(buffer, length, 1, outfile);
			    }
511
			}
512

513
514
515
516
517
518
519
520
521
			/* Store x_1 */
			if(pd->nSamp > 0) {
			    channelListSeq[cur_chan].x_1 = pd->pDataPtr[pd->nSamp-1];
			}
			/* Free pd->buffer */
			if(pd->buffer) {
			    free(pd->buffer);
			    pd->buffer = NULL;
			}
522

523
524
525
526
			/* Receive Data */
			ret = nmxp_receiveMessage(naqssock, &type, &buffer, &length, 0, &recv_errno);
			nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "ret = %d, type = %d\n", ret, type);
		    }
527

528
529
530
531
532
		    if(params.flag_writefile  &&  outfile) {
			/* Close output file */
			fclose(outfile);
			outfile = NULL;
		    }
533

534
#ifdef HAVE_LIBMSEED
535
536
537
538
539
		    if(params.flag_writeseed  &&  data_seed.outfile_mseed) {
			/* Close output Mini-SEED file */
			fclose(data_seed.outfile_mseed);
			data_seed.outfile_mseed = NULL;
		    }
540
#endif
541

542
543
		}
		i_chan++;
544
	    }
545
	    /* DAP Step 7: Repeat steps 5 and 6 for each data request */
546

547
548
549
550
551
552
553
554
555
556
	    if(params.delay > 0) {
		time_to_sleep = (params.end_time - params.start_time) - (time(NULL) - (params.start_time + params.delay + span_interval));
		if(time_to_sleep >= 0) {
		    nmxp_sleep(time_to_sleep);
		} else {
		    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "time to sleep %d sec.\n", time_to_sleep);
		    nmxp_sleep(3);
		}
		params.start_time = params.end_time;
		params.end_time = params.start_time + span_interval;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
557
	    } else {
558
		exitdapcondition = 0;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
559
	    }
Matteo Quintiliani's avatar
Matteo Quintiliani committed
560

561

562
	} /* END while(exitdapcondition) */
563

564
565
566
567
568
569
570
571
572
573
	/* DAP Step 8: Send a Terminate message (optional) */
	nmxp_sendTerminateSubscription(naqssock, NMXP_SHUTDOWN_NORMAL, "Bye!");

	/* DAP Step 9: Close the socket */
	nmxp_closeSocket(naqssock);

	/* ************************************************************ */
	/* End subscription protocol "DATA ACCESS PROTOCOL" version 1.0 */
	/* ************************************************************ */

574
	save_channel_states();
575

576
	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "End DAP Flow.\n");
577

Matteo Quintiliani's avatar
Matteo Quintiliani committed
578
    } else {
579

580
581
	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "Begin PDS Flow.\n");

582
583
	if(params.stc == -1) {

584

585
586
587
588
	    if(params.flag_logdata) {
		p_func_pd[n_func_pd++] = nmxptool_print_seq_no;
	    }

589
590
591
592
593
594
595
596
597
598
599
600
601
#ifdef HAVE_LIBMSEED
	    /* Write Mini-SEED record */
	    if(params.flag_writeseed) {
		p_func_pd[n_func_pd++] = nmxptool_write_miniseed;
	    }
#endif

#ifdef HAVE___SRC_SEEDLINK_PLUGIN_C
	    /* Send data to SeedLink Server */
	    if(params.flag_slink) {
		p_func_pd[n_func_pd++] = nmxptool_send_raw_depoch;
	    }
#endif
Matteo Quintiliani's avatar
Matteo Quintiliani committed
602
603
604
605
606
607
608

#ifdef HAVE_EARTHWORMOBJS
	    if(params.ew_configuration_file) {
		p_func_pd[n_func_pd++] = nmxptool_ew_nmx2ew;
	    }
#endif

609
	}
610

611
612
613
614
615
616
617
618
619
	/* ************************************************************* */
	/* Start subscription protocol "PRIVATE DATA STREAM" version 1.4 */
	/* ************************************************************* */

	/* PDS Step 1: Open a socket */
	naqssock = nmxp_openSocket(params.hostname, params.portnumberpds);

	if(naqssock == NMXP_SOCKET_ERROR) {
	    return 1;
620
621
	}

622
623
624
625
626
627
628
629
630
631
632
633
634
	/* PDS Step 2: Send a Connect */
	if(nmxp_sendConnect(naqssock) != NMXP_SOCKET_OK) {
	    printf("Error on sendConnect()\n");
	    return 1;
	}

	/* PDS Step 3: Receive ChannelList */
	if(nmxp_receiveChannelList(naqssock, &channelList) != NMXP_SOCKET_OK) {
	    printf("Error on receiveChannelList()\n");
	    return 1;
	}

	/* Get a subset of channel from arguments */
635
	channelList_subset = nmxp_chan_subset(channelList, NMXP_DATA_TIMESERIES, params.channels, CURRENT_NETWORK);
636
637
638


	/* PDS Step 4: Send a Request Pending (optional) */
639
640


641
642
643
644
645
646
	/* PDS Step 5: Send AddChannels */
	/* Request Data */
	nmxp_sendAddTimeSeriesChannel(naqssock, channelList_subset, params.stc, params.rate, (params.flag_buffered)? NMXP_BUFFER_YES : NMXP_BUFFER_NO);

	/* PDS Step 6: Repeat until finished: receive and handle packets */

Matteo Quintiliani's avatar
Matteo Quintiliani committed
647
648
649
650
#ifdef HAVE_LIBMSEED
	if(params.flag_writeseed) {
	    /* Open output Mini-SEED file */
	    sprintf(data_seed.filename_mseed, "%s.realtime.miniseed",
651
		    CURRENT_NETWORK);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
652
653
654

	    data_seed.outfile_mseed = fopen(data_seed.filename_mseed, "w");
	    if(!data_seed.outfile_mseed) {
655
		nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_EXTRA, "Can not to open file %s!", data_seed.filename_mseed);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
656
	    } else {
657
		nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "Opened file %s!\n", data_seed.filename_mseed);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
658
659
660
661
	    }
	}
#endif

Matteo Quintiliani's avatar
Matteo Quintiliani committed
662
663
664
	// TODO
	exitpdscondition = 1;

Matteo Quintiliani's avatar
Matteo Quintiliani committed
665
666
667
668
669
670
#ifdef HAVE_EARTHWORMOBJS
	if(params.ew_configuration_file) {
	    nmxptool_ew_attach();
	}
#endif

671
	skip_current_packet = 0;
672

Matteo Quintiliani's avatar
Matteo Quintiliani committed
673
	while(exitpdscondition) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
674

675
	    /* Process Compressed or Decompressed Data */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
676
	    pd = nmxp_receiveData(naqssock, channelList_subset, NETCODE_OR_CURRENT_NETWORK, params.timeoutrecv, &recv_errno);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
677

678
	    if(recv_errno == 0) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
679
680
681
		// TODO
		exitpdscondition = 1;
	    } else {
682
		nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error receiving data. pd=%p recv_errno=%d\n", pd, recv_errno);
683
684
685
686
687

#ifdef HAVE_EARTHWORMOBJS
		nmxptool_ew_send_error(NMXPTOOL_EW_ERR_RECVDATA);
#endif

Matteo Quintiliani's avatar
Matteo Quintiliani committed
688
689
		exitpdscondition = 0;
	    }
690

691
692
693
694
695
	    if(pd) {
		/* Set cur_chan */
		cur_chan = nmxp_chan_lookupKeyIndex(pd->key, channelList_subset);
	    }

Matteo Quintiliani's avatar
Matteo Quintiliani committed
696
	    /* Log contents of last packet */
697
698
699
	    if(params.flag_logdata) {
		nmxp_data_log(pd);
	    }
Matteo Quintiliani's avatar
Matteo Quintiliani committed
700

701
	    skip_current_packet = 0;
702
	    if(pd &&
703
		    (params.statefile  ||  params.buffered_time)
704
705
706
707
708
709
710
711
712
713
714
715
716
	      )	{
		if(params.statefile && channelListSeq[cur_chan].after_start_time > 0.0) {
		    cur_after_start_time = channelListSeq[cur_chan].after_start_time;
		} else if(params.buffered_time) {
		    cur_after_start_time = params.buffered_time;
		} else {
		    cur_after_start_time = DEFAULT_BUFFERED_TIME;
		}
		nmxp_data_to_str(cur_after_start_time_str, cur_after_start_time);
		nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_PACKETMAN, "cur_chan %d, cur_after_start_time %f, cur_after_start_time_str %s\n", cur_chan, cur_after_start_time, cur_after_start_time_str);
		if(pd->time + ((double) pd->nSamp / (double) pd->sampRate) >= cur_after_start_time) {
		    if(pd->time < cur_after_start_time) {
			int first_nsample_to_remove = (cur_after_start_time - pd->time) * (double) pd->sampRate;
717
718
719
720
			/* Remove the first sample in order avoiding overlap  */
			first_nsample_to_remove++;
			if(pd->nSamp > first_nsample_to_remove) {
			    pd->nSamp -= first_nsample_to_remove;
721
			    pd->time = cur_after_start_time;
722
723
724
725
726
727
728
729
730
731
732
733
734
			    pd->pDataPtr += first_nsample_to_remove;
			    pd->x0 = pd->pDataPtr[0];
			} else {
			    skip_current_packet = 1;
			}
		    }
		} else {
		    skip_current_packet = 1;
		}
	    }

	    if(!skip_current_packet) {

735
736
		/* Manage Raw Stream */
		if(params.stc == -1) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
737

738
739
740
741
742
		    /* cur_char is computed only for pd != NULL */
		    if(pd) {
			nmxp_raw_stream_manage(&(channelListSeq[cur_chan].raw_stream_buffer), pd, p_func_pd, n_func_pd);
			channelListSeq[cur_chan].last_time_call_raw_stream = nmxp_data_gmtime_now();
		    }
Matteo Quintiliani's avatar
Matteo Quintiliani committed
743

744
745
746
747
748
749
750
751
752
753
754
755
756
757
		    /* Check timeout for other channels */
		    if(params.timeoutrecv > 0) {
			exitpdscondition = 1;
			to_cur_chan = 0;
			while(to_cur_chan < channelList_subset->number) {
			    timeout_for_channel = nmxp_data_gmtime_now() - channelListSeq[to_cur_chan].last_time_call_raw_stream;
			    if(channelListSeq[to_cur_chan].last_time_call_raw_stream != 0
				    && timeout_for_channel >= params.timeoutrecv) {
				nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_DOD, "Timeout for channel %s (%d sec.)\n",
					channelList_subset->channel[to_cur_chan].name, timeout_for_channel);
				nmxp_raw_stream_manage(&(channelListSeq[to_cur_chan].raw_stream_buffer), NULL, p_func_pd, n_func_pd);
				channelListSeq[to_cur_chan].last_time_call_raw_stream = nmxp_data_gmtime_now();
			    }
			    to_cur_chan++;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
758
759
			}
		    }
760
761

		} else {
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777

		    if(pd) {
			/* Management of gaps */
			if(!channelListSeq[cur_chan].significant && pd->nSamp > 0) {
			    channelListSeq[cur_chan].significant = 1;
			} else {
			    if(channelListSeq[cur_chan].significant && pd->nSamp > 0) {
				if(nmxptool_check_and_log_gap(pd->time, channelListSeq[cur_chan].last_time, GAP_TOLLERANCE, pd->station, pd->channel)) {
				    channelListSeq[cur_chan].x_1 = 0;
				    nmxp_data_to_str(str_pd_time, pd->time);
				    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_EXTRA, "%s.%s x0 set to zero at %s!\n", pd->station, pd->channel, str_pd_time);
				}
			    }
			}
			if(channelListSeq[cur_chan].significant && pd->nSamp > 0) {
			    channelListSeq[cur_chan].last_time = pd->time + ((double) pd->nSamp / (double) pd->sampRate);
778
			}
779

Matteo Quintiliani's avatar
Matteo Quintiliani committed
780
781

#ifdef HAVE_LIBMSEED
782
783
784
785
			/* Write Mini-SEED record */
			if(params.flag_writeseed) {
			    nmxptool_write_miniseed(pd);
			}
Matteo Quintiliani's avatar
Matteo Quintiliani committed
786
787
#endif

788
#ifdef HAVE___SRC_SEEDLINK_PLUGIN_C
789
790
791
792
			/* Send data to SeedLink Server */
			if(params.flag_slink) {
			    nmxptool_send_raw_depoch(pd);
			}
793
#endif
794
795
		    }
		}
796
	    } /* End skip_current_packet condition */
797

Matteo Quintiliani's avatar
Matteo Quintiliani committed
798
	    if(pd) {
799
800
801
802
803
804
805
806
807
		/* Store x_1 */
		if(pd->nSamp > 0) {
		    channelListSeq[cur_chan].x_1 = pd->pDataPtr[pd->nSamp-1];
		}
		/* Free pd->buffer */
		if(pd->buffer) {
		    free(pd->buffer);
		    pd->buffer = NULL;
		}
Matteo Quintiliani's avatar
Matteo Quintiliani committed
808
	    }
Matteo Quintiliani's avatar
Matteo Quintiliani committed
809

Matteo Quintiliani's avatar
Matteo Quintiliani committed
810
811
812
813
814
815
#ifdef HAVE_EARTHWORMOBJS
	    if(params.ew_configuration_file) {

		/* Check if we are being asked to terminate */
		if( nmxptool_ew_check_flag_terminate() ) {
		    logit ("t", "nmxptool terminating on request\n");
816
		    nmxptool_ew_send_error(NMXPTOOL_EW_ERR_TERMREQ);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
817
818
819
820
821
822
823
824
825
		    exitpdscondition = 0;
		}

		/* Check if we need to send heartbeat message */
		nmxptool_ew_send_heartbeat_if_needed();

	    }
#endif

826
	} /* End main PDS loop */
827

Matteo Quintiliani's avatar
Matteo Quintiliani committed
828
829
	/* Flush raw data stream for each channel */
	flushing_raw_data_stream();
830

Matteo Quintiliani's avatar
Matteo Quintiliani committed
831
	save_channel_states();
Matteo Quintiliani's avatar
Matteo Quintiliani committed
832

Matteo Quintiliani's avatar
Matteo Quintiliani committed
833
834
835
836
837
838
#ifdef HAVE_EARTHWORMOBJS
	if(params.ew_configuration_file) {
	    nmxptool_ew_detach();
	}
#endif

Matteo Quintiliani's avatar
Matteo Quintiliani committed
839
840
841
842
843
844
845
#ifdef HAVE_LIBMSEED
	if(params.flag_writeseed  &&  data_seed.outfile_mseed) {
	    /* Close output Mini-SEED file */
	    fclose(data_seed.outfile_mseed);
	}
#endif

846
847
848
849
850
851
852
853
854
855
856

	/* PDS Step 7: Send Terminate Subscription */
	nmxp_sendTerminateSubscription(naqssock, NMXP_SHUTDOWN_NORMAL, "Good Bye!");

	/* PDS Step 8: Close the socket */
	nmxp_closeSocket(naqssock);

	/* *********************************************************** */
	/* End subscription protocol "PRIVATE DATA STREAM" version 1.4 */
	/* *********************************************************** */

857
858
859
	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "End PDS Flow.\n");

    }
860

861
862
863
864
865
    if(params.interval == DEFAULT_INTERVAL_INFINITE) {
	times_flow++;
    } else {
	times_flow = 100;
    }
866

867
    }
868

869
#ifdef HAVE_LIBMSEED
870
871
872
873
    if(*msr_list_chan) {
	for(i_chan = 0; i_chan < channelList_subset->number; i_chan++) {
	    if(msr_list_chan[i_chan]) {
		msr_free(&(msr_list_chan[i_chan]));
874
875
	    }
	}
876
    }
877
878
#endif

879
880
881
    for(i_chan = 0; i_chan < channelList_subset->number; i_chan++) {
	nmxp_raw_stream_free(&(channelListSeq[i_chan].raw_stream_buffer));
    }
882

883
884
885
    if(channelListSeq) {
	free(channelListSeq);
    }
886

887
888
889
890
    /* This has to be tha last */
    if(channelList_subset) {
	free(channelList_subset);
    }
891

892
    return 0;
893
} /* End MAIN */
894
895


896
897
898
#define MAX_LEN_FILENAME 4096
#define NMXP_STR_STATE_EXT ".nmxpstate"

Matteo Quintiliani's avatar
Matteo Quintiliani committed
899
static void save_channel_states() {
900
901
902
903
    int to_cur_chan;
    char last_time_str[30];
    char raw_last_sample_time_str[30];
    char state_line_str[1000];
904
    FILE *fstatefile = NULL;
905
    char statefilefilename[MAX_LEN_FILENAME] = "";
906

907
    if(params.statefile) {
908
909
910
	strncpy(statefilefilename, params.statefile, MAX_LEN_FILENAME);
	strncat(statefilefilename, NMXP_STR_STATE_EXT, MAX_LEN_FILENAME);
	fstatefile = fopen(statefilefilename, "w");
911
	if(fstatefile == NULL) {
912
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "Unable to write channel states into %s!\n", statefilefilename);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
913
	} else {
914
	    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_ANY, "Writing channel states into %s!\n", statefilefilename);
915
	}
916

917
918
919
920
921
922
	/* Save state for each channel */
	// if(params.stc == -1)
	to_cur_chan = 0;
	while(to_cur_chan < channelList_subset->number) {
	    nmxp_data_to_str(last_time_str, channelListSeq[to_cur_chan].last_time);
	    nmxp_data_to_str(raw_last_sample_time_str, channelListSeq[to_cur_chan].raw_stream_buffer.last_sample_time);
923
	    sprintf(state_line_str, "%s %s %s",
924
		    channelList_subset->channel[to_cur_chan].name,
925
926
		    last_time_str,
		    raw_last_sample_time_str
927
928
929
		   );
	    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CHANNEL, "%s\n", state_line_str);
	    if(fstatefile) {
930
		fprintf(fstatefile, "%s\n", state_line_str);
931
932
		if( (channelListSeq[to_cur_chan].last_time != 0) || (channelListSeq[to_cur_chan].raw_stream_buffer.last_sample_time != -1.0) ) {
		    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_ANY, "%s\n", state_line_str);
933
934
		} else {
		    /* Do nothing */
935
		}
936
937
938
	    }
	    to_cur_chan++;
	}
939
	if(fstatefile) {
940
	    fclose(fstatefile);
941
942
943
944
	}
    }
}

945
946
void load_channel_states(NMXP_CHAN_LIST_NET *chan_list, NMXPTOOL_CHAN_SEQ *chan_list_seq) {
    FILE *fstatefile = NULL;
947
    FILE *fstatefileINPUT = NULL;
948
949
950
951
952
953
954
955
956
#define MAXSIZE_LINE 2048
    char line[MAXSIZE_LINE];
    char s_chan[128];
    char s_noraw_time_s[128];
    char s_rawtime_s[128];
    double s_noraw_time_f_calc, s_rawtime_f_calc;
    int cur_chan;
    int n_scanf;
    NMXP_TM_T tmp_tmt;
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
    char statefilefilename[MAX_LEN_FILENAME] = "";

    if(params.statefile) {
	strncpy(statefilefilename, params.statefile, MAX_LEN_FILENAME);
	strncat(statefilefilename, NMXP_STR_STATE_EXT, MAX_LEN_FILENAME);
	fstatefile = fopen(statefilefilename, "r");
	if(fstatefile == NULL) {
	    fstatefileINPUT = fopen(params.statefile, "r");
	    if(fstatefileINPUT == NULL) {
		nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "Unable to read channel states from %s!\n", params.statefile);
	    } else {
		fstatefile = fopen(statefilefilename, "w");
		if(fstatefile == NULL) {
		    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "Unable to write channel states into %s!\n", statefilefilename);
		} else {
		    while(fgets(line, MAXSIZE_LINE, fstatefileINPUT) != NULL) {
			fputs(line, fstatefile);
		    }
		    fclose(fstatefile);
		}
		fclose(fstatefileINPUT);
	    }
	}
    }
981
982

    if(params.statefile) {
983
984
985
	strncpy(statefilefilename, params.statefile, MAX_LEN_FILENAME);
	strncat(statefilefilename, NMXP_STR_STATE_EXT, MAX_LEN_FILENAME);
	fstatefile = fopen(statefilefilename, "r");
986
	if(fstatefile == NULL) {
987
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "Unable to read channel states from %s!\n", statefilefilename);
988
	} else {
989
	    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_ANY, "Loading channel states from %s!\n", statefilefilename);
990
991
992
993
994
995
996
	    while(fgets(line, MAXSIZE_LINE, fstatefile) != NULL) {
		s_chan[0] = 0;
		s_noraw_time_s[0] = 0;
		s_rawtime_s[0] = 0;
		n_scanf = sscanf(line, "%s %s %s", s_chan, s_noraw_time_s, s_rawtime_s); 

		s_noraw_time_f_calc = DEFAULT_BUFFERED_TIME;
997
		s_rawtime_f_calc = DEFAULT_BUFFERED_TIME;
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
		if(n_scanf == 3) {
		    if(nmxp_data_parse_date(s_noraw_time_s, &tmp_tmt) == -1) {
			nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "Error parsing %s\n", s_noraw_time_s); 
		    } else {
			s_noraw_time_f_calc = nmxp_data_tm_to_time(&tmp_tmt);
		    }
		    if(nmxp_data_parse_date(s_rawtime_s, &tmp_tmt) == -1) {
			nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "Error parsing %s\n", s_rawtime_s); 
		    } else {
			s_rawtime_f_calc = nmxp_data_tm_to_time(&tmp_tmt);
		    }
		}
		nmxp_log(NMXP_LOG_NORM_NO, NMXP_LOG_D_PACKETMAN, "%d %-14s %16.4f %s %16.4f %s\n", n_scanf, s_chan, s_noraw_time_f_calc, s_noraw_time_s, s_rawtime_f_calc, s_rawtime_s); 
		cur_chan = 0;
		while(cur_chan < chan_list->number  &&  strcasecmp(s_chan, chan_list->channel[cur_chan].name) != 0) {
		    cur_chan++;
		}
		if(cur_chan < chan_list->number) {
		    if( s_rawtime_f_calc != DEFAULT_BUFFERED_TIME  && s_rawtime_f_calc != 0.0 ) {
			chan_list_seq[cur_chan].after_start_time = s_rawtime_f_calc;
			nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_ANY, "For channel %s starting from %s.\n", s_chan, s_rawtime_s); 
		    } else if( s_noraw_time_f_calc != DEFAULT_BUFFERED_TIME ) {
			chan_list_seq[cur_chan].after_start_time = s_noraw_time_f_calc;
			nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_ANY, "For channel %s starting from %s.\n", s_chan, s_noraw_time_s); 
		    } else {
			nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_ANY, "For channel %s there is not valid start_time.\n", s_chan); 
		    }
		}
	    }
	    fclose(fstatefile);
	}
    }
1030
    errno = 0;
1031
1032
}

1033

Matteo Quintiliani's avatar
Matteo Quintiliani committed
1034
1035
1036
1037
1038
1039
1040
static void flushing_raw_data_stream() {
    int to_cur_chan;

    /* Flush raw data stream for each channel */
    if(params.stc == -1) {
	to_cur_chan = 0;
	while(to_cur_chan < channelList_subset->number) {
1041
	    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_ANY, "Flushing data for channel %s\n",
Matteo Quintiliani's avatar
Matteo Quintiliani committed
1042
1043
1044
1045
1046
1047
		    channelList_subset->channel[to_cur_chan].name);
	    nmxp_raw_stream_manage(&(channelListSeq[to_cur_chan].raw_stream_buffer), NULL, p_func_pd, n_func_pd);
	    to_cur_chan++;
	}
    }
}
1048

Matteo Quintiliani's avatar
Matteo Quintiliani committed
1049
#ifndef HAVE_WINDOWS_H
Matteo Quintiliani's avatar
Matteo Quintiliani committed
1050
1051
/* Do any needed cleanup and exit */
static void clientShutdown(int sig) {
1052

1053
    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_ANY, "Program interrupted!\n");
Matteo Quintiliani's avatar
Matteo Quintiliani committed
1054

Matteo Quintiliani's avatar
Matteo Quintiliani committed
1055
    flushing_raw_data_stream();
Matteo Quintiliani's avatar
Matteo Quintiliani committed
1056
    save_channel_states();
Matteo Quintiliani's avatar
Matteo Quintiliani committed
1057

Matteo Quintiliani's avatar
Matteo Quintiliani committed
1058
1059
1060
1061
1062
    if(params.flag_writefile  &&  outfile) {
	/* Close output file */
	fclose(outfile);
    }

1063
#ifdef HAVE_EARTHWORMOBJS
1064
1065
1066
    if(params.ew_configuration_file) {
	nmxptool_ew_detach();
    }
1067
1068
#endif

Matteo Quintiliani's avatar
Matteo Quintiliani committed
1069
1070
1071
1072
1073
1074
1075
#ifdef HAVE_LIBMSEED
    if(params.flag_writeseed  &&  data_seed.outfile_mseed) {
	/* Close output Mini-SEED file */
	fclose(data_seed.outfile_mseed);
    }
#endif

1076

Matteo Quintiliani's avatar
Matteo Quintiliani committed
1077
1078
1079
1080
1081
1082
    /* PDS Step 7: Send Terminate Subscription */
    nmxp_sendTerminateSubscription(naqssock, NMXP_SHUTDOWN_NORMAL, "Good Bye!");

    /* PDS Step 8: Close the socket */
    nmxp_closeSocket(naqssock);

1083

Matteo Quintiliani's avatar
Matteo Quintiliani committed
1084
1085