nmxptool.c 33.3 KB
Newer Older
Matteo Quintiliani's avatar
Matteo Quintiliani committed
1
2
3
4
5
6
7
8
9
/*! \file
 *
 * \brief Nanometrics Protocol Tool
 *
 * Author:
 * 	Matteo Quintiliani
 * 	Istituto Nazionale di Geofisica e Vulcanologia - Italy
 *	quintiliani@ingv.it
 *
10
 * $Id: nmxptool.c,v 1.108 2008-01-16 10:51:44 mtheo Exp $
Matteo Quintiliani's avatar
Matteo Quintiliani committed
11
 *
Matteo Quintiliani's avatar
Matteo Quintiliani committed
12
13
 */

Matteo Quintiliani's avatar
Matteo Quintiliani committed
14
15
#include "config.h"

16
17
18
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
Matteo Quintiliani's avatar
Matteo Quintiliani committed
19
#include <errno.h>
20
21

#include <nmxp.h>
Matteo Quintiliani's avatar
Matteo Quintiliani committed
22

Matteo Quintiliani's avatar
Matteo Quintiliani committed
23
#ifndef HAVE_WINDOWS_H
Matteo Quintiliani's avatar
Matteo Quintiliani committed
24
25
26
#include <signal.h>
#endif

Matteo Quintiliani's avatar
Matteo Quintiliani committed
27
28
29
30
#ifdef HAVE_WINDOWS_H
#include <windows.h>
#endif

31
#include "nmxptool_getoptlong.h"
32
33

#ifdef HAVE_EARTHWORMOBJS
34
#include "nmxptool_ew.h"
35
#endif
36

37
38
39
#ifdef HAVE_LIBMSEED
#include <libmseed.h>
#endif
40

41
42
#ifdef HAVE___SRC_SEEDLINK_PLUGIN_H
#include "seedlink_plugin.h"
43
44
#endif

45
46
47
#define CURRENT_NETWORK ( (params.network)? params.network : DEFAULT_NETWORK )
#define NETCODE_OR_CURRENT_NETWORK ( (network_code[0] != 0)? network_code : CURRENT_NETWORK )

Matteo Quintiliani's avatar
Matteo Quintiliani committed
48
#define GAP_TOLLERANCE 0.001
49

50
51
52
typedef struct {
    int significant;
    double last_time;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
53
    time_t last_time_call_raw_stream;
54
    int32_t x_1;
55
    double after_start_time;
56
    NMXP_RAW_STREAM_DATA raw_stream_buffer;
57
58
} NMXPTOOL_CHAN_SEQ;

Matteo Quintiliani's avatar
Matteo Quintiliani committed
59
60

#ifndef HAVE_WINDOWS_H
Matteo Quintiliani's avatar
Matteo Quintiliani committed
61
62
static void clientShutdown(int sig);
static void clientDummyHandler(int sig);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
63
#endif
Matteo Quintiliani's avatar
Matteo Quintiliani committed
64

Matteo Quintiliani's avatar
Matteo Quintiliani committed
65
static void save_channel_states();
66
void load_channel_states(NMXP_CHAN_LIST_NET *chan_list, NMXPTOOL_CHAN_SEQ *chan_list_seq);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
67
68
static void flushing_raw_data_stream();

69
#ifdef HAVE_LIBMSEED
Matteo Quintiliani's avatar
Matteo Quintiliani committed
70
int nmxptool_write_miniseed(NMXP_DATA_PROCESS *pd);
71
72
#endif

Matteo Quintiliani's avatar
Matteo Quintiliani committed
73
#ifdef HAVE___SRC_SEEDLINK_PLUGIN_C
Matteo Quintiliani's avatar
Matteo Quintiliani committed
74
int nmxptool_send_raw_depoch(NMXP_DATA_PROCESS *pd);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
75
76
#endif

77
int nmxptool_print_seq_no(NMXP_DATA_PROCESS *pd);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
78
79

int nmxptool_check_and_log_gap(double time1, double time2, const double gap_tollerance, const char *station, const char *channel);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
80
void nmxptool_str_time_to_filename(char *str_time);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
81
82


83
84
85
86
87
/* Global variable for main program and handling terminitation program */
NMXPTOOL_PARAMS params;
int naqssock = 0;
FILE *outfile = NULL;
NMXP_CHAN_LIST *channelList = NULL;
88
NMXP_CHAN_LIST_NET *channelList_subset = NULL;
89
NMXPTOOL_CHAN_SEQ *channelListSeq = NULL;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
90
91
92
int n_func_pd = 0;
int (*p_func_pd[NMXP_MAX_FUNC_PD]) (NMXP_DATA_PROCESS *);

93
94
95
96
97
98
99
100

#ifdef HAVE_LIBMSEED
/* Mini-SEED variables */
NMXP_DATA_SEED data_seed;
MSRecord *msr_list_chan[MAX_N_CHAN];
#endif


101
int main (int argc, char **argv) {
102
    int32_t connection_time;
103
    int request_SOCKET_OK;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
104
    int i_chan, cur_chan = 0;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
105
    int to_cur_chan = 0;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
106
    int exitpdscondition;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
107
    int exitdapcondition;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
108
    time_t timeout_for_channel;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
109

Matteo Quintiliani's avatar
Matteo Quintiliani committed
110
    int span_interval = 10;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
111
112
    int time_to_sleep = 0;

113
114
    char str_start_time[200];
    char str_end_time[200];
Matteo Quintiliani's avatar
Matteo Quintiliani committed
115
    char str_pd_time[200];
116
117
118

    NMXP_MSG_SERVER type;
    void *buffer;
119
    int32_t length;
120
121
    int ret;

Matteo Quintiliani's avatar
Matteo Quintiliani committed
122
123
    int recv_errno = 0;

124
    char filename[500];
125
    char station_code[20], channel_code[20], network_code[20];
126

Matteo Quintiliani's avatar
Matteo Quintiliani committed
127
    char cur_after_start_time_str[1024];
128
129
    double cur_after_start_time = DEFAULT_BUFFERED_TIME;
    int skip_current_packet = 0;
130

131

132
133
134
135
136
137
138
    NMXP_DATA_PROCESS *pd;

#ifdef HAVE_LIBMSEED
    /* Init mini-SEED variables */
    nmxp_data_seed_init(&data_seed);
#endif

Matteo Quintiliani's avatar
Matteo Quintiliani committed
139
#ifndef HAVE_WINDOWS_H
Matteo Quintiliani's avatar
Matteo Quintiliani committed
140
141
142
143
144
145
146
    /* Signal handling, use POSIX calls with standardized semantics */
    struct sigaction sa;

    sa.sa_handler = clientDummyHandler;
    sa.sa_flags = SA_RESTART;
    sigemptyset(&sa.sa_mask);
    sigaction(SIGALRM, &sa, NULL);
147

Matteo Quintiliani's avatar
Matteo Quintiliani committed
148
149
150
151
152
153
154
155
156
157
158
    sa.sa_handler = clientShutdown;
    sigaction(SIGINT, &sa, NULL);
    sigaction(SIGQUIT, &sa, NULL); 
    sigaction(SIGTERM, &sa, NULL);

    sa.sa_handler = SIG_IGN;
    sigaction(SIGHUP, &sa, NULL);
    sigaction(SIGPIPE, &sa, NULL); 
#endif

    /* Default is normal output */
159
    nmxp_log(NMXP_LOG_SET, NMXP_LOG_D_NULL);
160
161
162
163
164
165

    /* Initialize params from argument values */
    if(nmxptool_getopt_long(argc, argv, &params) != 0) {
	return 1;
    }

166
    if(params.ew_configuration_file) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
167
168

#ifdef HAVE_EARTHWORMOBJS
169
170
	nmxp_log_init(nmxptool_ew_logit_msg, nmxptool_ew_logit_err);

Matteo Quintiliani's avatar
Matteo Quintiliani committed
171
	nmxptool_ew_configure(argv, &params);
172

Matteo Quintiliani's avatar
Matteo Quintiliani committed
173
174
175
176
177
	/* Check consistency of params */
	if(nmxptool_check_params(&params) != 0) {
	    return 1;
	}
#endif
178

179
    } else {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
180

Matteo Quintiliani's avatar
Matteo Quintiliani committed
181
182
	nmxp_log_init(nmxp_log_stdout, nmxp_log_stderr);

183
184
185
186
187
	/* Check consistency of params */
	if(nmxptool_check_params(&params) != 0) {
	    return 1;
	}

188
189
	/* List available channels on server */
	if(params.flag_listchannels) {
190

191
192
	    nmxp_meta_chan_print(nmxp_getMetaChannelList(params.hostname, params.portnumberdap, NMXP_DATA_TIMESERIES, params.flag_request_channelinfo, params.datas_username, params.datas_password, &channelList));

193
	    return 1;
194

195
196
197
198
	} else if(params.flag_listchannelsnaqs) {

	    channelList = nmxp_getAvailableChannelList(params.hostname, params.portnumberpds, NMXP_DATA_TIMESERIES);
	    nmxp_chan_print_channelList(channelList);
199
	    return 1;
200

201
202
	}
    }
203

204
205
206
    nmxp_log(NMXP_LOG_SET, params.verbose_level);
    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_ANY, "verbose_level %d\n", params.verbose_level);

207
208
    nmxptool_log_params(&params);

209
    /* Get list of available channels and get a subset list of params.channels */
210
211
212
213
214
215
216
217
    if(params.start_time != 0.0  &&  params.end_time != 0.0) {
	/* From DataServer */
	nmxp_getMetaChannelList(params.hostname, params.portnumberdap, NMXP_DATA_TIMESERIES, params.flag_request_channelinfo, params.datas_username, params.datas_password, &channelList);
    } else {
	/* From NaqsServer */
	channelList = nmxp_getAvailableChannelList(params.hostname, params.portnumberpds, NMXP_DATA_TIMESERIES);
    }

218
    channelList_subset = nmxp_chan_subset(channelList, NMXP_DATA_TIMESERIES, params.channels, CURRENT_NETWORK);
219
220
221

    /* Check if some channel already exists */
    if(channelList_subset->number <= 0) {
222
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CHANNEL, "Channels not found!\n");
223
	return 1;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
224
    } else {
225
	nmxp_chan_print_netchannelList(channelList_subset);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
226

227
	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CHANNEL, "Init channelListSeq.\n");
228
229
230
231
232
233

	/* init channelListSeq */
	channelListSeq = (NMXPTOOL_CHAN_SEQ *) malloc(sizeof(NMXPTOOL_CHAN_SEQ) * channelList_subset->number);
	for(i_chan = 0; i_chan < channelList_subset->number; i_chan++) {
	    channelListSeq[i_chan].significant = 0;
	    channelListSeq[i_chan].last_time = 0.0;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
234
	    channelListSeq[i_chan].last_time_call_raw_stream = 0;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
235
	    channelListSeq[i_chan].x_1 = 0;
236
	    channelListSeq[i_chan].after_start_time = DEFAULT_BUFFERED_TIME;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
237
	    nmxp_raw_stream_init(&(channelListSeq[i_chan].raw_stream_buffer), params.max_tolerable_latency, params.timeoutrecv);
238
239
	}

240
241
242
243
	if(params.statefile) {
	    load_channel_states(channelList_subset, channelListSeq);
	}

Matteo Quintiliani's avatar
Matteo Quintiliani committed
244
#ifdef HAVE_LIBMSEED
245
	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "Init mini-SEED record list.\n");
246
247

	/* Init mini-SEED record list */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
248
	for(i_chan = 0; i_chan < channelList_subset->number; i_chan++) {
249

250
	    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "Init mini-SEED record for %s\n", channelList_subset->channel[i_chan].name);
251
252
253
254

	    msr_list_chan[i_chan] = msr_init(NULL);

	    /* Separate station_code and channel_code */
255
	    if(nmxp_chan_cpy_sta_chan(channelList_subset->channel[i_chan].name, station_code, channel_code, network_code)) {
256

257
		nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "%s.%s.%s\n", NETCODE_OR_CURRENT_NETWORK, station_code, channel_code);
258

259
		strcpy(msr_list_chan[i_chan]->network, NETCODE_OR_CURRENT_NETWORK);
260
261
262
263
264
265
266
		strcpy(msr_list_chan[i_chan]->station, station_code);
		strcpy(msr_list_chan[i_chan]->channel, channel_code);

		msr_list_chan[i_chan]->reclen = 512;         /* byte record length */
		msr_list_chan[i_chan]->encoding = DE_STEIM1;  /* Steim 1 compression */

	    } else {
267
		nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CHANNEL, "Channels %s error in format!\n");
268
269
270
		return 1;
	    }

Matteo Quintiliani's avatar
Matteo Quintiliani committed
271
272
	}
#endif
273

274
275
276
277
278
279
280
281
    }

    /* Free the complete channel list */
    if(channelList) {
	free(channelList);
	channelList = NULL;
    }

282
    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "Starting comunication.\n");
Matteo Quintiliani's avatar
Matteo Quintiliani committed
283

284
    /* TODO condition starting DAP or PDS */
285
    if( (params.start_time != 0.0   &&   params.end_time != 0.0)
286
	    || params.delay > 0
287
      ) {
288

Matteo Quintiliani's avatar
Matteo Quintiliani committed
289
	if(params.delay > 0) {
290
	    params.start_time = ((double) (time(NULL) - params.delay - span_interval) / 10.0) * 10.0;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
291
292
293
	    params.end_time = params.start_time + span_interval;
	}

294

295
296
297
	/* ************************************************************** */
	/* Start subscription protocol "DATA ACCESS PROTOCOL" version 1.0 */
	/* ************************************************************** */
298

299
300
	/* DAP Step 1: Open a socket */
	if( (naqssock = nmxp_openSocket(params.hostname, params.portnumberdap)) == NMXP_SOCKET_ERROR) {
301
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error opening socket!\n");
302
303
	    return 1;
	}
304

305
306
	/* DAP Step 2: Read connection time */
	if(nmxp_readConnectionTime(naqssock, &connection_time) != NMXP_SOCKET_OK) {
307
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error reading connection time from server!\n");
308
309
	    return 1;
	}
310

311
312
	/* DAP Step 3: Send a ConnectRequest */
	if(nmxp_sendConnectRequest(naqssock, params.datas_username, params.datas_password, connection_time) != NMXP_SOCKET_OK) {
313
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error sending connect request!\n");
314
315
316
317
318
	    return 1;
	}

	/* DAP Step 4: Wait for a Ready message */
	if(nmxp_waitReady(naqssock) != NMXP_SOCKET_OK) {
319
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error waiting Ready message!\n");
320
321
	    return 1;
	}
322

323
324
325
326
	exitdapcondition = 1;

	while(exitdapcondition) {

327
	    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "start_time = %.4f - end_time = %.4f\n", params.start_time, params.end_time);
328

329
330
331
	    /* Start loop for sending requests */
	    i_chan=0;
	    request_SOCKET_OK = NMXP_SOCKET_OK;
332

333
	    while(request_SOCKET_OK == NMXP_SOCKET_OK  &&  i_chan < channelList_subset->number) {
334

335
336
		/* DAP Step 5: Send Data Request */
		request_SOCKET_OK = nmxp_sendDataRequest(naqssock, channelList_subset->channel[i_chan].key, (int32_t) params.start_time, (int32_t) (params.end_time + 1.0));
337

338
		if(request_SOCKET_OK == NMXP_SOCKET_OK) {
339

340
341
342
343
		    nmxp_data_to_str(str_start_time, params.start_time);
		    nmxp_data_to_str(str_end_time, params.end_time);
		    nmxptool_str_time_to_filename(str_start_time);
		    nmxptool_str_time_to_filename(str_end_time);
344

345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
		    if(params.flag_writefile) {
			/* Open output file */
			if(nmxp_chan_cpy_sta_chan(channelList_subset->channel[i_chan].name, station_code, channel_code, network_code)) {
			    sprintf(filename, "%s.%s.%s_%s_%s.nmx",
				    NETCODE_OR_CURRENT_NETWORK,
				    station_code,
				    channel_code,
				    str_start_time,
				    str_end_time);
			} else {
			    sprintf(filename, "%s_%s_%s.nmx",
				    channelList_subset->channel[i_chan].name,
				    str_start_time,
				    str_end_time);
			}
360

361
362
363
364
			outfile = fopen(filename, "w");
			if(!outfile) {
			    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_EXTRA, "Can not to open file %s!", filename);
			}
365
		    }
366
367

#ifdef HAVE_LIBMSEED
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
		    if(params.flag_writeseed) {
			/* Open output Mini-SEED file */
			if(nmxp_chan_cpy_sta_chan(channelList_subset->channel[i_chan].name, station_code, channel_code, network_code)) {
			    sprintf(data_seed.filename_mseed, "%s.%s.%s_%s_%s.miniseed",
				    NETCODE_OR_CURRENT_NETWORK,
				    station_code,
				    channel_code,
				    str_start_time,
				    str_end_time);
			} else {
			    sprintf(filename, "%s_%s_%s.miniseed",
				    channelList_subset->channel[i_chan].name,
				    str_start_time,
				    str_end_time);
			}
383

384
385
386
387
			data_seed.outfile_mseed = fopen(data_seed.filename_mseed, "w");
			if(!data_seed.outfile_mseed) {
			    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_EXTRA, "Can not to open file %s!", data_seed.filename_mseed);
			}
388
389
		    }
#endif
390

391
392
393
394
395
396
397
398
399
400
401
402
		    if(params.flag_writefile  &&  outfile) {
			/* Compute SNCL line */

			/* Separate station_code_old_way and channel_code_old_way */
			if(nmxp_chan_cpy_sta_chan(channelList_subset->channel[i_chan].name, station_code, channel_code, network_code)) {
			    /* Write SNCL line */
			    fprintf(outfile, "%s.%s.%s.%s\n",
				    station_code,
				    NETCODE_OR_CURRENT_NETWORK,
				    channel_code,
				    (params.location)? params.location : "");
			}
403

404
		    }
405

406
407
408
		    /* DAP Step 6: Receive Data until receiving a Ready message */
		    ret = nmxp_receiveMessage(naqssock, &type, &buffer, &length, 0, &recv_errno);
		    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "ret = %d, type = %d\n", ret, type);
409

410
		    while(ret == NMXP_SOCKET_OK   &&    type != NMXP_MSG_READY) {
411

412
413
414
			/* Process a packet and return value in NMXP_DATA_PROCESS structure */
			pd = nmxp_processCompressedData(buffer, length, channelList_subset, NETCODE_OR_CURRENT_NETWORK);
			nmxp_data_trim(pd, params.start_time, params.end_time, 0);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
415

416
417
418
419
			/* Log contents of last packet */
			if(params.flag_logdata) {
			    nmxp_data_log(pd);
			}
420

421
422
			/* Set cur_chan */
			cur_chan = nmxp_chan_lookupKeyIndex(pd->key, channelList_subset);
423

424
425
426
427
428
429
430
431
432
433
			/* Management of gaps */
			if(!channelListSeq[cur_chan].significant && pd->nSamp > 0) {
			    channelListSeq[cur_chan].significant = 1;
			} else {
			    if(channelListSeq[cur_chan].significant && pd->nSamp > 0) {
				if(nmxptool_check_and_log_gap(pd->time, channelListSeq[cur_chan].last_time, GAP_TOLLERANCE, pd->station, pd->channel)) {
				    channelListSeq[cur_chan].x_1 = 0;
				    nmxp_data_to_str(str_pd_time, pd->time);
				    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_EXTRA, "%s.%s x0 set to zero at %s!\n", pd->station, pd->channel, str_pd_time);
				}
434
			    }
Matteo Quintiliani's avatar
Matteo Quintiliani committed
435
			}
436
437
438
			if(channelListSeq[cur_chan].significant && pd->nSamp > 0) {
			    channelListSeq[cur_chan].last_time = pd->time + ((double) pd->nSamp / (double) pd->sampRate);
			}
439

440
#ifdef HAVE_LIBMSEED
441
442
443
444
			/* Write Mini-SEED record */
			if(params.flag_writeseed) {
			    nmxptool_write_miniseed(pd);
			}
445
#endif
446

447
#ifdef HAVE___SRC_SEEDLINK_PLUGIN_C
448
449
450
451
			/* Send data to SeedLink Server */
			if(params.flag_slink) {
			    nmxptool_send_raw_depoch(pd);
			}
452
453
#endif

454
455
456
457
458
459
460
461
			if(params.flag_writefile  &&  outfile) {
			    /* Write buffer to the output file */
			    if(outfile && buffer && length > 0) {
				int32_t length_int = length;
				nmxp_data_swap_4b((int32_t *) &length_int);
				fwrite(&length_int, sizeof(length_int), 1, outfile);
				fwrite(buffer, length, 1, outfile);
			    }
462
			}
463

464
465
466
467
468
469
470
471
472
			/* Store x_1 */
			if(pd->nSamp > 0) {
			    channelListSeq[cur_chan].x_1 = pd->pDataPtr[pd->nSamp-1];
			}
			/* Free pd->buffer */
			if(pd->buffer) {
			    free(pd->buffer);
			    pd->buffer = NULL;
			}
473

474
475
476
477
			/* Receive Data */
			ret = nmxp_receiveMessage(naqssock, &type, &buffer, &length, 0, &recv_errno);
			nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "ret = %d, type = %d\n", ret, type);
		    }
478

479
480
481
482
483
		    if(params.flag_writefile  &&  outfile) {
			/* Close output file */
			fclose(outfile);
			outfile = NULL;
		    }
484

485
#ifdef HAVE_LIBMSEED
486
487
488
489
490
		    if(params.flag_writeseed  &&  data_seed.outfile_mseed) {
			/* Close output Mini-SEED file */
			fclose(data_seed.outfile_mseed);
			data_seed.outfile_mseed = NULL;
		    }
491
#endif
492

493
494
		}
		i_chan++;
495
	    }
496
	    /* DAP Step 7: Repeat steps 5 and 6 for each data request */
497

498
499
500
501
502
503
504
505
506
507
	    if(params.delay > 0) {
		time_to_sleep = (params.end_time - params.start_time) - (time(NULL) - (params.start_time + params.delay + span_interval));
		if(time_to_sleep >= 0) {
		    nmxp_sleep(time_to_sleep);
		} else {
		    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "time to sleep %d sec.\n", time_to_sleep);
		    nmxp_sleep(3);
		}
		params.start_time = params.end_time;
		params.end_time = params.start_time + span_interval;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
508
	    } else {
509
		exitdapcondition = 0;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
510
	    }
Matteo Quintiliani's avatar
Matteo Quintiliani committed
511

512

513
	} /* END while(exitdapcondition) */
514

515
516
517
518
519
520
521
522
523
524
525
	/* DAP Step 8: Send a Terminate message (optional) */
	nmxp_sendTerminateSubscription(naqssock, NMXP_SHUTDOWN_NORMAL, "Bye!");

	/* DAP Step 9: Close the socket */
	nmxp_closeSocket(naqssock);

	/* ************************************************************ */
	/* End subscription protocol "DATA ACCESS PROTOCOL" version 1.0 */
	/* ************************************************************ */


526

Matteo Quintiliani's avatar
Matteo Quintiliani committed
527
    } else {
528

529
530
	if(params.stc == -1) {

531

532
533
534
535
	    if(params.flag_logdata) {
		p_func_pd[n_func_pd++] = nmxptool_print_seq_no;
	    }

536
537
538
539
540
541
542
543
544
545
546
547
548
#ifdef HAVE_LIBMSEED
	    /* Write Mini-SEED record */
	    if(params.flag_writeseed) {
		p_func_pd[n_func_pd++] = nmxptool_write_miniseed;
	    }
#endif

#ifdef HAVE___SRC_SEEDLINK_PLUGIN_C
	    /* Send data to SeedLink Server */
	    if(params.flag_slink) {
		p_func_pd[n_func_pd++] = nmxptool_send_raw_depoch;
	    }
#endif
Matteo Quintiliani's avatar
Matteo Quintiliani committed
549
550
551
552
553
554
555

#ifdef HAVE_EARTHWORMOBJS
	    if(params.ew_configuration_file) {
		p_func_pd[n_func_pd++] = nmxptool_ew_nmx2ew;
	    }
#endif

556
	}
557

558
559
560
561
562
563
564
565
566
	/* ************************************************************* */
	/* Start subscription protocol "PRIVATE DATA STREAM" version 1.4 */
	/* ************************************************************* */

	/* PDS Step 1: Open a socket */
	naqssock = nmxp_openSocket(params.hostname, params.portnumberpds);

	if(naqssock == NMXP_SOCKET_ERROR) {
	    return 1;
567
568
	}

569
570
571
572
573
574
575
576
577
578
579
580
581
	/* PDS Step 2: Send a Connect */
	if(nmxp_sendConnect(naqssock) != NMXP_SOCKET_OK) {
	    printf("Error on sendConnect()\n");
	    return 1;
	}

	/* PDS Step 3: Receive ChannelList */
	if(nmxp_receiveChannelList(naqssock, &channelList) != NMXP_SOCKET_OK) {
	    printf("Error on receiveChannelList()\n");
	    return 1;
	}

	/* Get a subset of channel from arguments */
582
	channelList_subset = nmxp_chan_subset(channelList, NMXP_DATA_TIMESERIES, params.channels, CURRENT_NETWORK);
583
584
585


	/* PDS Step 4: Send a Request Pending (optional) */
586
587


588
589
590
591
592
593
	/* PDS Step 5: Send AddChannels */
	/* Request Data */
	nmxp_sendAddTimeSeriesChannel(naqssock, channelList_subset, params.stc, params.rate, (params.flag_buffered)? NMXP_BUFFER_YES : NMXP_BUFFER_NO);

	/* PDS Step 6: Repeat until finished: receive and handle packets */

Matteo Quintiliani's avatar
Matteo Quintiliani committed
594
595
596
597
#ifdef HAVE_LIBMSEED
	if(params.flag_writeseed) {
	    /* Open output Mini-SEED file */
	    sprintf(data_seed.filename_mseed, "%s.realtime.miniseed",
598
		    CURRENT_NETWORK);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
599
600
601

	    data_seed.outfile_mseed = fopen(data_seed.filename_mseed, "w");
	    if(!data_seed.outfile_mseed) {
602
		nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_EXTRA, "Can not to open file %s!", data_seed.filename_mseed);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
603
	    } else {
604
		nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "Opened file %s!\n", data_seed.filename_mseed);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
605
606
607
608
	    }
	}
#endif

Matteo Quintiliani's avatar
Matteo Quintiliani committed
609
610
611
	// TODO
	exitpdscondition = 1;

Matteo Quintiliani's avatar
Matteo Quintiliani committed
612
613
614
615
616
617
#ifdef HAVE_EARTHWORMOBJS
	if(params.ew_configuration_file) {
	    nmxptool_ew_attach();
	}
#endif

618
	skip_current_packet = 0;
619

Matteo Quintiliani's avatar
Matteo Quintiliani committed
620
	while(exitpdscondition) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
621

622
	    /* Process Compressed or Decompressed Data */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
623
	    pd = nmxp_receiveData(naqssock, channelList_subset, NETCODE_OR_CURRENT_NETWORK, params.timeoutrecv, &recv_errno);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
624

625
	    if(recv_errno == 0) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
626
627
628
		// TODO
		exitpdscondition = 1;
	    } else {
629
		nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error receiving data. pd=%p recv_errno=%d\n", pd, recv_errno);
630
631
632
633
634

#ifdef HAVE_EARTHWORMOBJS
		nmxptool_ew_send_error(NMXPTOOL_EW_ERR_RECVDATA);
#endif

Matteo Quintiliani's avatar
Matteo Quintiliani committed
635
636
		exitpdscondition = 0;
	    }
637

638
639
640
641
642
	    if(pd) {
		/* Set cur_chan */
		cur_chan = nmxp_chan_lookupKeyIndex(pd->key, channelList_subset);
	    }

Matteo Quintiliani's avatar
Matteo Quintiliani committed
643
	    /* Log contents of last packet */
644
645
646
	    if(params.flag_logdata) {
		nmxp_data_log(pd);
	    }
Matteo Quintiliani's avatar
Matteo Quintiliani committed
647

648
	    skip_current_packet = 0;
649
	    if(pd &&
650
		    (params.statefile  ||  params.buffered_time)
651
652
653
654
655
656
657
658
659
660
661
662
663
	      )	{
		if(params.statefile && channelListSeq[cur_chan].after_start_time > 0.0) {
		    cur_after_start_time = channelListSeq[cur_chan].after_start_time;
		} else if(params.buffered_time) {
		    cur_after_start_time = params.buffered_time;
		} else {
		    cur_after_start_time = DEFAULT_BUFFERED_TIME;
		}
		nmxp_data_to_str(cur_after_start_time_str, cur_after_start_time);
		nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_PACKETMAN, "cur_chan %d, cur_after_start_time %f, cur_after_start_time_str %s\n", cur_chan, cur_after_start_time, cur_after_start_time_str);
		if(pd->time + ((double) pd->nSamp / (double) pd->sampRate) >= cur_after_start_time) {
		    if(pd->time < cur_after_start_time) {
			int first_nsample_to_remove = (cur_after_start_time - pd->time) * (double) pd->sampRate;
664
665
666
667
			/* Remove the first sample in order avoiding overlap  */
			first_nsample_to_remove++;
			if(pd->nSamp > first_nsample_to_remove) {
			    pd->nSamp -= first_nsample_to_remove;
668
			    pd->time = cur_after_start_time;
669
670
671
672
673
674
675
676
677
678
679
680
681
			    pd->pDataPtr += first_nsample_to_remove;
			    pd->x0 = pd->pDataPtr[0];
			} else {
			    skip_current_packet = 1;
			}
		    }
		} else {
		    skip_current_packet = 1;
		}
	    }

	    if(!skip_current_packet) {

682
683
		/* Manage Raw Stream */
		if(params.stc == -1) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
684

685
686
687
688
689
		    /* cur_char is computed only for pd != NULL */
		    if(pd) {
			nmxp_raw_stream_manage(&(channelListSeq[cur_chan].raw_stream_buffer), pd, p_func_pd, n_func_pd);
			channelListSeq[cur_chan].last_time_call_raw_stream = nmxp_data_gmtime_now();
		    }
Matteo Quintiliani's avatar
Matteo Quintiliani committed
690

691
692
693
694
695
696
697
698
699
700
701
702
703
704
		    /* Check timeout for other channels */
		    if(params.timeoutrecv > 0) {
			exitpdscondition = 1;
			to_cur_chan = 0;
			while(to_cur_chan < channelList_subset->number) {
			    timeout_for_channel = nmxp_data_gmtime_now() - channelListSeq[to_cur_chan].last_time_call_raw_stream;
			    if(channelListSeq[to_cur_chan].last_time_call_raw_stream != 0
				    && timeout_for_channel >= params.timeoutrecv) {
				nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_DOD, "Timeout for channel %s (%d sec.)\n",
					channelList_subset->channel[to_cur_chan].name, timeout_for_channel);
				nmxp_raw_stream_manage(&(channelListSeq[to_cur_chan].raw_stream_buffer), NULL, p_func_pd, n_func_pd);
				channelListSeq[to_cur_chan].last_time_call_raw_stream = nmxp_data_gmtime_now();
			    }
			    to_cur_chan++;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
705
706
			}
		    }
707
708

		} else {
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724

		    if(pd) {
			/* Management of gaps */
			if(!channelListSeq[cur_chan].significant && pd->nSamp > 0) {
			    channelListSeq[cur_chan].significant = 1;
			} else {
			    if(channelListSeq[cur_chan].significant && pd->nSamp > 0) {
				if(nmxptool_check_and_log_gap(pd->time, channelListSeq[cur_chan].last_time, GAP_TOLLERANCE, pd->station, pd->channel)) {
				    channelListSeq[cur_chan].x_1 = 0;
				    nmxp_data_to_str(str_pd_time, pd->time);
				    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_EXTRA, "%s.%s x0 set to zero at %s!\n", pd->station, pd->channel, str_pd_time);
				}
			    }
			}
			if(channelListSeq[cur_chan].significant && pd->nSamp > 0) {
			    channelListSeq[cur_chan].last_time = pd->time + ((double) pd->nSamp / (double) pd->sampRate);
725
			}
726

Matteo Quintiliani's avatar
Matteo Quintiliani committed
727
728

#ifdef HAVE_LIBMSEED
729
730
731
732
			/* Write Mini-SEED record */
			if(params.flag_writeseed) {
			    nmxptool_write_miniseed(pd);
			}
Matteo Quintiliani's avatar
Matteo Quintiliani committed
733
734
#endif

735
#ifdef HAVE___SRC_SEEDLINK_PLUGIN_C
736
737
738
739
			/* Send data to SeedLink Server */
			if(params.flag_slink) {
			    nmxptool_send_raw_depoch(pd);
			}
740
#endif
741
742
		    }
		}
743
	    } /* End skip_current_packet condition */
744

Matteo Quintiliani's avatar
Matteo Quintiliani committed
745
	    if(pd) {
746
747
748
749
750
751
752
753
754
		/* Store x_1 */
		if(pd->nSamp > 0) {
		    channelListSeq[cur_chan].x_1 = pd->pDataPtr[pd->nSamp-1];
		}
		/* Free pd->buffer */
		if(pd->buffer) {
		    free(pd->buffer);
		    pd->buffer = NULL;
		}
Matteo Quintiliani's avatar
Matteo Quintiliani committed
755
	    }
Matteo Quintiliani's avatar
Matteo Quintiliani committed
756

Matteo Quintiliani's avatar
Matteo Quintiliani committed
757
758
759
760
761
762
#ifdef HAVE_EARTHWORMOBJS
	    if(params.ew_configuration_file) {

		/* Check if we are being asked to terminate */
		if( nmxptool_ew_check_flag_terminate() ) {
		    logit ("t", "nmxptool terminating on request\n");
763
		    nmxptool_ew_send_error(NMXPTOOL_EW_ERR_TERMREQ);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
764
765
766
767
768
769
770
771
772
		    exitpdscondition = 0;
		}

		/* Check if we need to send heartbeat message */
		nmxptool_ew_send_heartbeat_if_needed();

	    }
#endif

773
	} /* End main PDS loop */
774

Matteo Quintiliani's avatar
Matteo Quintiliani committed
775
776
	/* Flush raw data stream for each channel */
	flushing_raw_data_stream();
Matteo Quintiliani's avatar
Matteo Quintiliani committed
777
	save_channel_states();
Matteo Quintiliani's avatar
Matteo Quintiliani committed
778

Matteo Quintiliani's avatar
Matteo Quintiliani committed
779
780
781
782
783
784
#ifdef HAVE_EARTHWORMOBJS
	if(params.ew_configuration_file) {
	    nmxptool_ew_detach();
	}
#endif

Matteo Quintiliani's avatar
Matteo Quintiliani committed
785
786
787
788
789
790
791
#ifdef HAVE_LIBMSEED
	if(params.flag_writeseed  &&  data_seed.outfile_mseed) {
	    /* Close output Mini-SEED file */
	    fclose(data_seed.outfile_mseed);
	}
#endif

792
793
794
795
796
797
798
799
800
801
802

	/* PDS Step 7: Send Terminate Subscription */
	nmxp_sendTerminateSubscription(naqssock, NMXP_SHUTDOWN_NORMAL, "Good Bye!");

	/* PDS Step 8: Close the socket */
	nmxp_closeSocket(naqssock);

	/* *********************************************************** */
	/* End subscription protocol "PRIVATE DATA STREAM" version 1.4 */
	/* *********************************************************** */

803
804


805
    }
806

807
#ifdef HAVE_LIBMSEED
808
809
810
811
    if(*msr_list_chan) {
	for(i_chan = 0; i_chan < channelList_subset->number; i_chan++) {
	    if(msr_list_chan[i_chan]) {
		msr_free(&(msr_list_chan[i_chan]));
812
813
	    }
	}
814
    }
815
816
#endif

817
818
819
    for(i_chan = 0; i_chan < channelList_subset->number; i_chan++) {
	nmxp_raw_stream_free(&(channelListSeq[i_chan].raw_stream_buffer));
    }
820

821
822
823
    if(channelListSeq) {
	free(channelListSeq);
    }
824

825
826
827
828
    /* This has to be tha last */
    if(channelList_subset) {
	free(channelList_subset);
    }
829

830
    return 0;
831
} /* End MAIN */
832
833


834
835
836
#define MAX_LEN_FILENAME 4096
#define NMXP_STR_STATE_EXT ".nmxpstate"

Matteo Quintiliani's avatar
Matteo Quintiliani committed
837
static void save_channel_states() {
838
839
840
841
    int to_cur_chan;
    char last_time_str[30];
    char raw_last_sample_time_str[30];
    char state_line_str[1000];
842
    FILE *fstatefile = NULL;
843
    char statefilefilename[MAX_LEN_FILENAME] = "";
844

845
    if(params.statefile) {
846
847
848
	strncpy(statefilefilename, params.statefile, MAX_LEN_FILENAME);
	strncat(statefilefilename, NMXP_STR_STATE_EXT, MAX_LEN_FILENAME);
	fstatefile = fopen(statefilefilename, "w");
849
	if(fstatefile == NULL) {
850
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "Unable to write channel states into %s!\n", statefilefilename);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
851
	} else {
852
	    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_ANY, "Writing channel states into %s!\n", statefilefilename);
853
	}
854

855
856
857
858
859
860
	/* Save state for each channel */
	// if(params.stc == -1)
	to_cur_chan = 0;
	while(to_cur_chan < channelList_subset->number) {
	    nmxp_data_to_str(last_time_str, channelListSeq[to_cur_chan].last_time);
	    nmxp_data_to_str(raw_last_sample_time_str, channelListSeq[to_cur_chan].raw_stream_buffer.last_sample_time);
861
	    sprintf(state_line_str, "%s %s %s",
862
		    channelList_subset->channel[to_cur_chan].name,
863
864
		    last_time_str,
		    raw_last_sample_time_str
865
866
867
		   );
	    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CHANNEL, "%s\n", state_line_str);
	    if(fstatefile) {
868
		fprintf(fstatefile, "%s\n", state_line_str);
869
870
		if( (channelListSeq[to_cur_chan].last_time != 0) || (channelListSeq[to_cur_chan].raw_stream_buffer.last_sample_time != -1.0) ) {
		    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_ANY, "%s\n", state_line_str);
871
872
		} else {
		    /* Do nothing */
873
		}
874
875
876
	    }
	    to_cur_chan++;
	}
877
	if(fstatefile) {
878
	    fclose(fstatefile);
879
880
881
882
	}
    }
}

883
884
void load_channel_states(NMXP_CHAN_LIST_NET *chan_list, NMXPTOOL_CHAN_SEQ *chan_list_seq) {
    FILE *fstatefile = NULL;
885
    FILE *fstatefileINPUT = NULL;
886
887
888
889
890
891
892
893
894
#define MAXSIZE_LINE 2048
    char line[MAXSIZE_LINE];
    char s_chan[128];
    char s_noraw_time_s[128];
    char s_rawtime_s[128];
    double s_noraw_time_f_calc, s_rawtime_f_calc;
    int cur_chan;
    int n_scanf;
    NMXP_TM_T tmp_tmt;
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
    char statefilefilename[MAX_LEN_FILENAME] = "";

    if(params.statefile) {
	strncpy(statefilefilename, params.statefile, MAX_LEN_FILENAME);
	strncat(statefilefilename, NMXP_STR_STATE_EXT, MAX_LEN_FILENAME);
	fstatefile = fopen(statefilefilename, "r");
	if(fstatefile == NULL) {
	    fstatefileINPUT = fopen(params.statefile, "r");
	    if(fstatefileINPUT == NULL) {
		nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "Unable to read channel states from %s!\n", params.statefile);
	    } else {
		fstatefile = fopen(statefilefilename, "w");
		if(fstatefile == NULL) {
		    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "Unable to write channel states into %s!\n", statefilefilename);
		} else {
		    while(fgets(line, MAXSIZE_LINE, fstatefileINPUT) != NULL) {
			fputs(line, fstatefile);
		    }
		    fclose(fstatefile);
		}
		fclose(fstatefileINPUT);
	    }
	}
    }
919
920

    if(params.statefile) {
921
922
923
	strncpy(statefilefilename, params.statefile, MAX_LEN_FILENAME);
	strncat(statefilefilename, NMXP_STR_STATE_EXT, MAX_LEN_FILENAME);
	fstatefile = fopen(statefilefilename, "r");
924
	if(fstatefile == NULL) {
925
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "Unable to read channel states from %s!\n", statefilefilename);
926
	} else {
927
	    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_ANY, "Loading channel states from %s!\n", statefilefilename);
928
929
930
931
932
933
934
	    while(fgets(line, MAXSIZE_LINE, fstatefile) != NULL) {
		s_chan[0] = 0;
		s_noraw_time_s[0] = 0;
		s_rawtime_s[0] = 0;
		n_scanf = sscanf(line, "%s %s %s", s_chan, s_noraw_time_s, s_rawtime_s); 

		s_noraw_time_f_calc = DEFAULT_BUFFERED_TIME;
935
		s_rawtime_f_calc = DEFAULT_BUFFERED_TIME;
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
		if(n_scanf == 3) {
		    if(nmxp_data_parse_date(s_noraw_time_s, &tmp_tmt) == -1) {
			nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "Error parsing %s\n", s_noraw_time_s); 
		    } else {
			s_noraw_time_f_calc = nmxp_data_tm_to_time(&tmp_tmt);
		    }
		    if(nmxp_data_parse_date(s_rawtime_s, &tmp_tmt) == -1) {
			nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "Error parsing %s\n", s_rawtime_s); 
		    } else {
			s_rawtime_f_calc = nmxp_data_tm_to_time(&tmp_tmt);
		    }
		}
		nmxp_log(NMXP_LOG_NORM_NO, NMXP_LOG_D_PACKETMAN, "%d %-14s %16.4f %s %16.4f %s\n", n_scanf, s_chan, s_noraw_time_f_calc, s_noraw_time_s, s_rawtime_f_calc, s_rawtime_s); 
		cur_chan = 0;
		while(cur_chan < chan_list->number  &&  strcasecmp(s_chan, chan_list->channel[cur_chan].name) != 0) {
		    cur_chan++;
		}
		if(cur_chan < chan_list->number) {
		    if( s_rawtime_f_calc != DEFAULT_BUFFERED_TIME  && s_rawtime_f_calc != 0.0 ) {
			chan_list_seq[cur_chan].after_start_time = s_rawtime_f_calc;
			nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_ANY, "For channel %s starting from %s.\n", s_chan, s_rawtime_s); 
		    } else if( s_noraw_time_f_calc != DEFAULT_BUFFERED_TIME ) {
			chan_list_seq[cur_chan].after_start_time = s_noraw_time_f_calc;
			nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_ANY, "For channel %s starting from %s.\n", s_chan, s_noraw_time_s); 
		    } else {
			nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_ANY, "For channel %s there is not valid start_time.\n", s_chan); 
		    }
		}
	    }
	    fclose(fstatefile);
	}
    }
968
    errno = 0;
969
970
}

971

Matteo Quintiliani's avatar
Matteo Quintiliani committed
972
973
974
975
976
977
978
static void flushing_raw_data_stream() {
    int to_cur_chan;

    /* Flush raw data stream for each channel */
    if(params.stc == -1) {
	to_cur_chan = 0;
	while(to_cur_chan < channelList_subset->number) {
979
	    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_ANY, "Flushing data for channel %s\n",
Matteo Quintiliani's avatar
Matteo Quintiliani committed
980
981
982
983
984
985
		    channelList_subset->channel[to_cur_chan].name);
	    nmxp_raw_stream_manage(&(channelListSeq[to_cur_chan].raw_stream_buffer), NULL, p_func_pd, n_func_pd);
	    to_cur_chan++;
	}
    }
}
986

Matteo Quintiliani's avatar
Matteo Quintiliani committed
987
#ifndef HAVE_WINDOWS_H
Matteo Quintiliani's avatar
Matteo Quintiliani committed
988
989
/* Do any needed cleanup and exit */
static void clientShutdown(int sig) {
990

991
    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_ANY, "Program interrupted!\n");
Matteo Quintiliani's avatar
Matteo Quintiliani committed
992

Matteo Quintiliani's avatar
Matteo Quintiliani committed
993
    flushing_raw_data_stream();
Matteo Quintiliani's avatar
Matteo Quintiliani committed
994
    save_channel_states();
Matteo Quintiliani's avatar
Matteo Quintiliani committed
995

Matteo Quintiliani's avatar
Matteo Quintiliani committed
996
997
998
999
1000
    if(params.flag_writefile  &&  outfile) {
	/* Close output file */
	fclose(outfile);
    }

1001
#ifdef HAVE_EARTHWORMOBJS
1002
1003
1004
    if(params.ew_configuration_file) {
	nmxptool_ew_detach();
    }
1005
1006
#endif

Matteo Quintiliani's avatar
Matteo Quintiliani committed
1007
1008
1009
1010
1011
1012
1013
#ifdef HAVE_LIBMSEED
    if(params.flag_writeseed  &&  data_seed.outfile_mseed) {
	/* Close output Mini-SEED file */
	fclose(data_seed.outfile_mseed);
    }
#endif

1014

Matteo Quintiliani's avatar
Matteo Quintiliani committed
1015
1016
1017
1018
1019
1020
    /* PDS Step 7: Send Terminate Subscription */
    nmxp_sendTerminateSubscription(naqssock, NMXP_SHUTDOWN_NORMAL, "Good Bye!");

    /* PDS Step 8: Close the socket */
    nmxp_closeSocket(naqssock);

1021

Matteo Quintiliani's avatar
Matteo Quintiliani committed
1022
1023
1024

    /* Free the complete channel list */
    if(channelList) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
1025
	free(channelList);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
1026
	channelList = NULL;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
1027
    }
1028

1029
1030
    int i_chan = 0;

1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
#ifdef HAVE_LIBMSEED
    if(*msr_list_chan) {
	for(i_chan = 0; i_chan < channelList_subset->number; i_chan++) {
	    if(msr_list_chan[i_chan]) {
		msr_free(&(msr_list_chan[i_chan]));
	    }
	}
    }
#endif

1041
    for(i_chan = 0; i_chan < channelList_subset->number; i_chan++) {
1042
	nmxp_raw_stream_free(&(channelListSeq[i_chan].raw_stream_buffer));
1043
1044
    }

1045
1046
1047
1048
1049
    if(channelListSeq) {
	free(channelListSeq);
    }

    /* This has to be the last */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
1050
1051
1052
1053
1054
1055
    if(channelList_subset == NULL) {
	free(channelList_subset);
    }

    exit( sig );
} /* End of clientShutdown() */
1056

Matteo Quintiliani's avatar
Matteo Quintiliani committed
1057
1058
1059
1060

/* Empty signal handler routine */
static void clientDummyHandler(int sig) {
}
Matteo Quintiliani's avatar
Matteo Quintiliani committed
1061

Matteo Quintiliani's avatar
Matteo Quintiliani committed
1062
1063
#endif /* HAVE_WINDOWS_H */

Matteo Quintiliani's avatar
Matteo Quintiliani committed
1064
1065
1066



1067
#ifdef HAVE_LIBMSEED
Matteo Quintiliani's avatar
Matteo Quintiliani committed
1068
1069
1070
1071
1072
int nmxptool_write_miniseed(NMXP_DATA_PROCESS *pd) {
    int cur_chan;
    int ret = 0;
    if( (cur_chan = nmxp_chan_lookupKeyIndex(pd->key, channelList_subset)) != -1) {

1073
	ret = nmxp_data_msr_pack(pd, &data_seed, msr_list_chan[cur_chan]);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
1074
1075

    } else {
1076
	nmxp_log(