nmxptool.c 24.3 KB
Newer Older
Matteo Quintiliani's avatar
Matteo Quintiliani committed
1
2
3
4
5
6
7
8
9
/*! \file
 *
 * \brief Nanometrics Protocol Tool
 *
 * Author:
 * 	Matteo Quintiliani
 * 	Istituto Nazionale di Geofisica e Vulcanologia - Italy
 *	quintiliani@ingv.it
 *
Matteo Quintiliani's avatar
Matteo Quintiliani committed
10
11
 * $Id: nmxptool.c,v 1.45 2007-09-07 06:58:07 mtheo Exp $
 *
Matteo Quintiliani's avatar
Matteo Quintiliani committed
12
 * $Log: not supported by cvs2svn $
Matteo Quintiliani's avatar
Matteo Quintiliani committed
13
14
15
 * Revision 1.44  2007/09/07 06:57:08  mtheo
 * test on cvs keywords
 *
Matteo Quintiliani's avatar
Matteo Quintiliani committed
16
17
18
 * 
 */

19
20
21
22
23
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <nmxp.h>
Matteo Quintiliani's avatar
Matteo Quintiliani committed
24
25
26
27
28

#ifndef WIN32
#include <signal.h>
#endif

29
#include "config.h"
30
31
#include "nmxptool_getoptlong.h"

32
33
34
#ifdef HAVE_LIBMSEED
#include <libmseed.h>
#endif
35

36
37
#ifdef HAVE___SRC_SEEDLINK_PLUGIN_H
#include "seedlink_plugin.h"
38
39
#endif

Matteo Quintiliani's avatar
Matteo Quintiliani committed
40
41
42
/* Network code to use: the one stored in params.network when it is set,
 * otherwise DEFAULT_NETWORK. The whole expansion is parenthesized so the
 * macro behaves as a single operand wherever it is used. */
#define CURRENT_NETWORK ((params.network) ? params.network : DEFAULT_NETWORK)
/* Tolerance, in seconds, passed to nmxptool_check_and_log_gap() */
#define GAP_TOLLERANCE 0.001
/* Capacity of the array of packet-handler function pointers */
#define NMXPTOOL_MAX_FUNC_PD 10

/* Capacity of the per-channel list of buffered raw-stream packets */
#define NMXPTOOL_MAX_PDLIST_ITEMS 40
45
46
47
48
49
50
51

/* Per-channel buffer used by nmxptool_manage_raw_stream() to hand packets
 * to the handlers in sequence-number order. */
typedef struct {
    /* seq_no of the last packet passed to the handlers; -1 until the first packet arrives */
    int32_t last_seq_no_sent;
    /* Number of entries currently used in pdlist */
    int32_t n_pdlist;
    /* Buffered packet copies, re-sorted by seq_no after every insertion */
    NMXP_DATA_PROCESS *pdlist[NMXPTOOL_MAX_PDLIST_ITEMS];
} NMXPTOOL_PD_RAW_STREAM;

52
53
54
55
56
57
58
/* Per-channel state kept by main(), one element for each channel of channelList_subset. */
typedef struct {
    /* 0 until a packet with nSamp > 0 has been received for the channel, then 1 */
    int significant;
    /* pd->time + nSamp / sampRate of the last packet with samples; compared with the
     * time of the next packet by nmxptool_check_and_log_gap() */
    double last_time;
    /* Last sample of the most recent packet with samples; reset to 0 when a gap or
     * overlap is detected; passed to nmxp_data_msr_pack() */
    int32_t x_1;
    /* Buffer of pending packets, used when params.stc == -1 */
    NMXPTOOL_PD_RAW_STREAM raw_stream_buffer;
} NMXPTOOL_CHAN_SEQ;

Matteo Quintiliani's avatar
Matteo Quintiliani committed
59
60
61
62
63
static void clientShutdown(int sig);
static void clientDummyHandler(int sig);

int nmxptool_write_miniseed(NMXP_DATA_PROCESS *pd);
int nmxptool_send_raw_depoch(NMXP_DATA_PROCESS *pd);
64
int nmxptool_print_seq_no(NMXP_DATA_PROCESS *pd);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
65
66

int seq_no_compare(const void *a, const void *b);
67
int nmxptool_manage_raw_stream(NMXPTOOL_PD_RAW_STREAM *p, NMXP_DATA_PROCESS *a_pd, int (*p_func_pd[NMXPTOOL_MAX_FUNC_PD]) (NMXP_DATA_PROCESS *), int n_func_pd);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
68
69
70
71

int nmxptool_check_and_log_gap(double time1, double time2, const double gap_tollerance, const char *station, const char *channel);


72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
/* Global variables shared by the main program and by the handler that
 * manages program termination (clientShutdown) */
NMXPTOOL_PARAMS params;                     /* filled by nmxptool_getopt_long() */
int naqssock = 0;                           /* socket returned by nmxp_openSocket() */
FILE *outfile = NULL;                       /* output file of the raw packets, when params.flag_writefile is set */
NMXP_CHAN_LIST *channelList = NULL;         /* complete channel list received from the server */
NMXP_CHAN_LIST *channelList_subset = NULL;  /* subset of channelList selected by params.channels */
NMXPTOOL_CHAN_SEQ *channelListSeq = NULL;   /* per-channel state, indexed like channelList_subset */

#ifdef HAVE_LIBMSEED
/* Mini-SEED variables */
NMXP_DATA_SEED data_seed;                   /* holds the Mini-SEED output file name and handle */
MSRecord *msr_list_chan[MAX_N_CHAN];        /* one record per channel, indexed like channelList_subset */
#endif


87
int main (int argc, char **argv) {
88
    int32_t connection_time;
89
    int request_SOCKET_OK;
90
    int i_chan, cur_chan;
91
    int j;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
92
    int exitpdscondition;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
93
94
    int exitdapcondition;

Matteo Quintiliani's avatar
Matteo Quintiliani committed
95
    int span_interval = 10;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
96
97
    int time_to_sleep = 0;

98
99
100

    NMXP_MSG_SERVER type;
    void *buffer;
101
    int32_t length;
102
103
104
    int ret;

    char filename[500];
105
    char station_code[20], channel_code[20];
106
107
108
109
110
111
112
113

    NMXP_DATA_PROCESS *pd;

#ifdef HAVE_LIBMSEED
    /* Init mini-SEED variables */
    nmxp_data_seed_init(&data_seed);
#endif

Matteo Quintiliani's avatar
Matteo Quintiliani committed
114
115
116
117
118
119
120
121
#ifndef WIN32
    /* Signal handling, use POSIX calls with standardized semantics */
    struct sigaction sa;

    sa.sa_handler = clientDummyHandler;
    sa.sa_flags = SA_RESTART;
    sigemptyset(&sa.sa_mask);
    sigaction(SIGALRM, &sa, NULL);
122

Matteo Quintiliani's avatar
Matteo Quintiliani committed
123
124
125
126
127
128
129
130
131
132
133
134
    sa.sa_handler = clientShutdown;
    sigaction(SIGINT, &sa, NULL);
    sigaction(SIGQUIT, &sa, NULL); 
    sigaction(SIGTERM, &sa, NULL);

    sa.sa_handler = SIG_IGN;
    sigaction(SIGHUP, &sa, NULL);
    sigaction(SIGPIPE, &sa, NULL); 
#endif


    /* Default is normal output */
135
136
137
138
139
140
141
142
143
144
145
146
    nmxp_log(-1, 0);

    /* Initialize params from argument values */
    if(nmxptool_getopt_long(argc, argv, &params) != 0) {
	return 1;
    }

    /* Check consistency of params */
    if(nmxptool_check_params(&params) != 0) {
	return 1;
    }

147
148
149
150
    if(params.flag_verbose) {
	nmxp_log(-1, 2);
    }

151
152
153
    /* List available channels on server */
    if(params.flag_listchannels) {

Matteo Quintiliani's avatar
Matteo Quintiliani committed
154
155
156
157
	// TOREMOVE
	// channelList = nmxp_getAvailableChannelList(params.hostname, params.portnumberpds, NMXP_DATA_TIMESERIES);
	// TOREMOVE
	// nmxp_chan_print_channelList(channelList);
158

159
	nmxp_getMetaChannelList(params.hostname, params.portnumberdap, NMXP_DATA_TIMESERIES, params.flag_request_channelinfo);
160

161
162
163
164
165
166
167
168
169
170
171
	return 1;
    }

    /* Get list of available channels and get a subset list of params.channels */
    channelList = nmxp_getAvailableChannelList(params.hostname, params.portnumberpds, NMXP_DATA_TIMESERIES);
    channelList_subset = nmxp_chan_subset(channelList, NMXP_DATA_TIMESERIES, params.channels);

    /* Check if some channel already exists */
    if(channelList_subset->number <= 0) {
	nmxp_log(1, 0, "Channels not found!\n");
	return 1;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
172
173
    } else {
	nmxp_chan_print_channelList(channelList_subset);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
174

175
176
177
178
179
180
181
	nmxp_log(0, 1, "Init channelListSeq.\n");

	/* init channelListSeq */
	channelListSeq = (NMXPTOOL_CHAN_SEQ *) malloc(sizeof(NMXPTOOL_CHAN_SEQ) * channelList_subset->number);
	for(i_chan = 0; i_chan < channelList_subset->number; i_chan++) {
	    channelListSeq[i_chan].significant = 0;
	    channelListSeq[i_chan].last_time = 0.0;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
182
	    channelListSeq[i_chan].x_1 = 0;
183
184
185
186
187
	    channelListSeq[i_chan].raw_stream_buffer.last_seq_no_sent = -1;
	    channelListSeq[i_chan].raw_stream_buffer.n_pdlist = 0;
	    for(j=0; j<NMXPTOOL_MAX_PDLIST_ITEMS; j++) {
		channelListSeq[i_chan].raw_stream_buffer.pdlist[j] = NULL;
	    }
188
189
	}

Matteo Quintiliani's avatar
Matteo Quintiliani committed
190
#ifdef HAVE_LIBMSEED
191
192
193
	nmxp_log(0, 1, "Init mini-SEED record list.\n");

	/* Init mini-SEED record list */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
194
	for(i_chan = 0; i_chan < channelList_subset->number; i_chan++) {
195
196
197
198
199
200
201
202

	    nmxp_log(0, 1, "Init mini-SEED record for %s\n", channelList_subset->channel[i_chan].name);

	    msr_list_chan[i_chan] = msr_init(NULL);

	    /* Separate station_code and channel_code */
	    if(nmxp_chan_cpy_sta_chan(channelList_subset->channel[i_chan].name, station_code, channel_code)) {

203
		nmxp_log(0, 1, "%s.%s.%s\n", CURRENT_NETWORK, station_code, channel_code);
204

205
		strcpy(msr_list_chan[i_chan]->network, CURRENT_NETWORK);
206
207
208
209
210
211
212
213
214
215
216
		strcpy(msr_list_chan[i_chan]->station, station_code);
		strcpy(msr_list_chan[i_chan]->channel, channel_code);

		msr_list_chan[i_chan]->reclen = 512;         /* byte record length */
		msr_list_chan[i_chan]->encoding = DE_STEIM1;  /* Steim 1 compression */

	    } else {
		nmxp_log(1, 0, "Channels %s error in format!\n");
		return 1;
	    }

Matteo Quintiliani's avatar
Matteo Quintiliani committed
217
218
	}
#endif
219

220
221
222
223
224
225
226
227
    }

    /* Free the complete channel list */
    if(channelList) {
	free(channelList);
	channelList = NULL;
    }

Matteo Quintiliani's avatar
Matteo Quintiliani committed
228
229
    nmxp_log(0, 1, "Starting comunication.\n");

230
    /* TODO condition starting DAP or PDS */
231
232
233
    if( (params.start_time != 0   &&   params.end_time != 0)
	    || params.delay > 0
	    ) {
234

Matteo Quintiliani's avatar
Matteo Quintiliani committed
235
	if(params.delay > 0) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
236
	    params.start_time = ((time(NULL) - params.delay - span_interval) / 10) * 10;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
237
238
239
	    params.end_time = params.start_time + span_interval;
	}

240

241
242
243
	/* ************************************************************** */
	/* Start subscription protocol "DATA ACCESS PROTOCOL" version 1.0 */
	/* ************************************************************** */
244

245
246
247
248
249
	/* DAP Step 1: Open a socket */
	if( (naqssock = nmxp_openSocket(params.hostname, params.portnumberdap)) == NMXP_SOCKET_ERROR) {
	    nmxp_log(1, 0, "Error opening socket!\n");
	    return 1;
	}
250

251
252
253
254
255
	/* DAP Step 2: Read connection time */
	if(nmxp_readConnectionTime(naqssock, &connection_time) != NMXP_SOCKET_OK) {
	    nmxp_log(1, 0, "Error reading connection time from server!\n");
	    return 1;
	}
256

257
258
259
260
261
262
263
264
265
266
267
	/* DAP Step 3: Send a ConnectRequest */
	if(nmxp_sendConnectRequest(naqssock, params.datas_username, params.datas_password, connection_time) != NMXP_SOCKET_OK) {
	    nmxp_log(1, 0, "Error sending connect request!\n");
	    return 1;
	}

	/* DAP Step 4: Wait for a Ready message */
	if(nmxp_waitReady(naqssock) != NMXP_SOCKET_OK) {
	    nmxp_log(1, 0, "Error waiting Ready message!\n");
	    return 1;
	}
268

269
270
271
272
273
274
	exitdapcondition = 1;

	while(exitdapcondition) {

	nmxp_log(0, 1, "start_time = %d - end_time = %d\n", params.start_time, params.end_time);

275
276
277
	/* Start loop for sending requests */
	i_chan=0;
	request_SOCKET_OK = NMXP_SOCKET_OK;
278

279
	while(request_SOCKET_OK == NMXP_SOCKET_OK  &&  i_chan < channelList_subset->number) {
280

281
	    /* DAP Step 5: Send Data Request */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
282
	    request_SOCKET_OK = nmxp_sendDataRequest(naqssock, channelList_subset->channel[i_chan].key, (double) params.start_time, (double) params.end_time);
283

284
	    if(request_SOCKET_OK == NMXP_SOCKET_OK) {
285

286
287
		if(params.flag_writefile) {
		    /* Open output file */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
288
		    sprintf(filename, "%s.%s.%d.%d.%d.nmx",
289
			    CURRENT_NETWORK,
Matteo Quintiliani's avatar
Matteo Quintiliani committed
290
291
292
			    channelList_subset->channel[i_chan].name,
			    channelList_subset->channel[i_chan].key,
			    params.start_time, params.end_time);
293

294
295
296
297
		    outfile = fopen(filename, "w");
		    if(!outfile) {
			nmxp_log(1, 0, "Can not to open file %s!", filename);
		    }
298
299
300
		}

#ifdef HAVE_LIBMSEED
301
302
		if(params.flag_writeseed) {
		    /* Open output Mini-SEED file */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
303
		    sprintf(data_seed.filename_mseed, "%s.%s.%d.%d.%d.miniseed",
304
			    CURRENT_NETWORK,
Matteo Quintiliani's avatar
Matteo Quintiliani committed
305
306
307
308
			    channelList_subset->channel[i_chan].name,
			    channelList_subset->channel[i_chan].key,
			    params.start_time,
			    params.end_time);
309

310
311
312
313
314
315
		    data_seed.outfile_mseed = fopen(data_seed.filename_mseed, "w");
		    if(!data_seed.outfile_mseed) {
			nmxp_log(1, 0, "Can not to open file %s!", data_seed.filename_mseed);
		    }
		}
#endif
316

317
318
319
		if(params.flag_writefile  &&  outfile) {
		    /* Compute SNCL line */

320
321
322
323
324
		    /* Separate station_code_old_way and channel_code_old_way */
		    if(nmxp_chan_cpy_sta_chan(channelList_subset->channel[i_chan].name, station_code, channel_code)) {
			/* Write SNCL line */
			fprintf(outfile, "%s.%s.%s.%s\n",
				station_code,
325
				CURRENT_NETWORK,
326
327
				channel_code,
				(params.location)? params.location : "");
328
		    }
329
330
331

		}

332
333
334
		/* DAP Step 6: Receive Data until receiving a Ready message */
		ret = nmxp_receiveMessage(naqssock, &type, &buffer, &length);
		nmxp_log(0, 1, "ret = %d, type = %d\n", ret, type);
335

336
		while(ret == NMXP_SOCKET_OK   &&    type != NMXP_MSG_READY) {
337

338
		    /* Process a packet and return value in NMXP_DATA_PROCESS structure */
339
		    pd = nmxp_processCompressedData(buffer, length, channelList_subset, CURRENT_NETWORK);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
340
		    nmxp_data_trim(pd, params.start_time, params.end_time, 0);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
341
342

		    /* Log contents of last packet */
343
344
345
		    if(params.flag_logdata) {
			nmxp_data_log(pd);
		    }
346

347
		    /* Set cur_chan */
348
		    cur_chan = nmxp_chan_lookupKeyIndex(pd->key, channelList_subset);
349
350

		    /* Management of gaps */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
351
		    if(!channelListSeq[cur_chan].significant && pd->nSamp > 0) {
352
353
			channelListSeq[cur_chan].significant = 1;
		    } else {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
354
			if(channelListSeq[cur_chan].significant) {
355
356
			    if(nmxptool_check_and_log_gap(pd->time, channelListSeq[cur_chan].last_time, GAP_TOLLERANCE, pd->station, pd->channel)) {
				channelListSeq[cur_chan].x_1 = 0;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
357
				nmxp_log(NMXP_LOG_WARN, 0, "x0 set to zero!\n");
358
			    }
Matteo Quintiliani's avatar
Matteo Quintiliani committed
359
360
			}
		    }
361
		    if(channelListSeq[cur_chan].significant && pd->nSamp > 0) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
362
			channelListSeq[cur_chan].last_time = pd->time + ((double) pd->nSamp / (double) pd->sampRate);
363
364
		    }

365
#ifdef HAVE_LIBMSEED
366
367
		    /* Write Mini-SEED record */
		    if(params.flag_writeseed) {
368
			nmxptool_write_miniseed(pd);
369
		    }
370
#endif
371

372
#ifdef HAVE___SRC_SEEDLINK_PLUGIN_C
373
		    /* Send data to SeedLink Server */
374
		    if(params.flag_slink) {
375
			nmxptool_send_raw_depoch(pd);
376
		    }
377
378
#endif

379
380
381
		    if(params.flag_writefile  &&  outfile) {
			/* Write buffer to the output file */
			if(outfile && buffer && length > 0) {
382
			    int32_t length_int = length;
383
384
385
386
387
			    nmxp_data_swap_4b((int32_t *) &length_int);
			    fwrite(&length_int, sizeof(length_int), 1, outfile);
			    fwrite(buffer, length, 1, outfile);
			}
		    }
388

Matteo Quintiliani's avatar
Matteo Quintiliani committed
389
390
391
392
393
		    /* Store x_1 */
		    if(pd->nSamp > 0) {
			channelListSeq[cur_chan].x_1 = pd->pDataPtr[pd->nSamp-1];
		    }
		    /* Free pd->buffer */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
394
395
396
		    if(pd->buffer) {
			free(pd->buffer);
			pd->buffer = NULL;
397
		    }
398
399
400
401

		    /* Receive Data */
		    ret = nmxp_receiveMessage(naqssock, &type, &buffer, &length);
		    nmxp_log(0, 1, "ret = %d, type = %d\n", ret, type);
402
403
		}

404
405
406
		if(params.flag_writefile  &&  outfile) {
		    /* Close output file */
		    fclose(outfile);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
407
		    outfile = NULL;
408
409
		}

410
411
412
413
#ifdef HAVE_LIBMSEED
		if(params.flag_writeseed  &&  data_seed.outfile_mseed) {
		    /* Close output Mini-SEED file */
		    fclose(data_seed.outfile_mseed);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
414
		    data_seed.outfile_mseed = NULL;
415
416
		}
#endif
417
418

	    }
419
420
421
	    i_chan++;
	}
	/* DAP Step 7: Repeat steps 5 and 6 for each data request */
422

Matteo Quintiliani's avatar
Matteo Quintiliani committed
423
	if(params.delay > 0) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
424
425
426
427
428
429
430
	    time_to_sleep = (params.end_time - params.start_time) - (time(NULL) - (params.start_time + params.delay + span_interval));
	    if(time_to_sleep >= 0) {
		sleep(time_to_sleep);
	    } else {
		nmxp_log(1, 0, "time to sleep %dsec.\n", time_to_sleep);
		sleep(3);
	    }
Matteo Quintiliani's avatar
Matteo Quintiliani committed
431
432
433
434
435
436
	    params.start_time = params.end_time;
	    params.end_time = params.start_time + span_interval;
	} else {
	    exitdapcondition = 0;
	}

437

Matteo Quintiliani's avatar
Matteo Quintiliani committed
438
    } /* END while(exitdapcondition) */
439

440
441
442
443
444
445
446
447
448
449
450
	/* DAP Step 8: Send a Terminate message (optional) */
	nmxp_sendTerminateSubscription(naqssock, NMXP_SHUTDOWN_NORMAL, "Bye!");

	/* DAP Step 9: Close the socket */
	nmxp_closeSocket(naqssock);

	/* ************************************************************ */
	/* End subscription protocol "DATA ACCESS PROTOCOL" version 1.0 */
	/* ************************************************************ */


451

Matteo Quintiliani's avatar
Matteo Quintiliani committed
452
    } else {
453

454
	int n_func_pd = 0;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
455
	int (*p_func_pd[NMXPTOOL_MAX_FUNC_PD]) (NMXP_DATA_PROCESS *);
456
457
458

	if(params.stc == -1) {

459
460
461
462
	    if(params.flag_logdata) {
		p_func_pd[n_func_pd++] = nmxptool_print_seq_no;
	    }

463
464
465
466
467
468
469
470
471
472
473
474
475
476
#ifdef HAVE_LIBMSEED
	    /* Write Mini-SEED record */
	    if(params.flag_writeseed) {
		p_func_pd[n_func_pd++] = nmxptool_write_miniseed;
	    }
#endif

#ifdef HAVE___SRC_SEEDLINK_PLUGIN_C
	    /* Send data to SeedLink Server */
	    if(params.flag_slink) {
		p_func_pd[n_func_pd++] = nmxptool_send_raw_depoch;
	    }
#endif
	}
477

478
479
480
481
482
483
484
485
486
	/* ************************************************************* */
	/* Start subscription protocol "PRIVATE DATA STREAM" version 1.4 */
	/* ************************************************************* */

	/* PDS Step 1: Open a socket */
	naqssock = nmxp_openSocket(params.hostname, params.portnumberpds);

	if(naqssock == NMXP_SOCKET_ERROR) {
	    return 1;
487
488
	}

489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
	/* PDS Step 2: Send a Connect */
	if(nmxp_sendConnect(naqssock) != NMXP_SOCKET_OK) {
	    printf("Error on sendConnect()\n");
	    return 1;
	}

	/* PDS Step 3: Receive ChannelList */
	if(nmxp_receiveChannelList(naqssock, &channelList) != NMXP_SOCKET_OK) {
	    printf("Error on receiveChannelList()\n");
	    return 1;
	}

	/* Get a subset of channel from arguments */
	channelList_subset = nmxp_chan_subset(channelList, NMXP_DATA_TIMESERIES, params.channels);


	/* PDS Step 4: Send a Request Pending (optional) */
506
507


508
509
510
511
512
513
	/* PDS Step 5: Send AddChannels */
	/* Request Data */
	nmxp_sendAddTimeSeriesChannel(naqssock, channelList_subset, params.stc, params.rate, (params.flag_buffered)? NMXP_BUFFER_YES : NMXP_BUFFER_NO);

	/* PDS Step 6: Repeat until finished: receive and handle packets */

Matteo Quintiliani's avatar
Matteo Quintiliani committed
514
515
516
517
#ifdef HAVE_LIBMSEED
	if(params.flag_writeseed) {
	    /* Open output Mini-SEED file */
	    sprintf(data_seed.filename_mseed, "%s.realtime.miniseed",
518
		    CURRENT_NETWORK);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
519
520
521
522
523
524
525
526
527
528

	    data_seed.outfile_mseed = fopen(data_seed.filename_mseed, "w");
	    if(!data_seed.outfile_mseed) {
		nmxp_log(1, 0, "Can not to open file %s!", data_seed.filename_mseed);
	    } else {
		nmxp_log(0, 1, "Opened file %s!\n", data_seed.filename_mseed);
	    }
	}
#endif

Matteo Quintiliani's avatar
Matteo Quintiliani committed
529
530
531
532
	// TODO
	exitpdscondition = 1;

	while(exitpdscondition) {
533
	    /* Process Compressed or Decompressed Data */
534
	    pd = nmxp_receiveData(naqssock, channelList_subset, CURRENT_NETWORK);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
535
536

	    /* Log contents of last packet */
537
538
539
	    if(params.flag_logdata) {
		nmxp_data_log(pd);
	    }
Matteo Quintiliani's avatar
Matteo Quintiliani committed
540

541
	    /* Set cur_chan */
542
	    cur_chan = nmxp_chan_lookupKeyIndex(pd->key, channelList_subset);
543

544
545
546
	    /* Manage Raw Stream */
	    if(params.stc == -1) {
		nmxptool_manage_raw_stream(&(channelListSeq[cur_chan].raw_stream_buffer), pd, p_func_pd, n_func_pd);
547
	    } else {
548
549
550
551
552
553
554
555

		/* Management of gaps */
		if(!channelListSeq[cur_chan].significant && pd->nSamp > 0) {
		    channelListSeq[cur_chan].significant = 1;
		} else {
		    if(channelListSeq[cur_chan].significant) {
			if(nmxptool_check_and_log_gap(pd->time, channelListSeq[cur_chan].last_time, GAP_TOLLERANCE, pd->station, pd->channel)) {
			    channelListSeq[cur_chan].x_1 = 0;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
556
			    nmxp_log(NMXP_LOG_WARN, 0, "x0 set to zero!\n");
557
			}
558
		    }
Matteo Quintiliani's avatar
Matteo Quintiliani committed
559
		}
560
561
562
		if(channelListSeq[cur_chan].significant && pd->nSamp > 0) {
		    channelListSeq[cur_chan].last_time = pd->time + ((double) pd->nSamp / (double) pd->sampRate);
		}
563

Matteo Quintiliani's avatar
Matteo Quintiliani committed
564
565

#ifdef HAVE_LIBMSEED
566
567
568
569
		/* Write Mini-SEED record */
		if(params.flag_writeseed) {
		    nmxptool_write_miniseed(pd);
		}
Matteo Quintiliani's avatar
Matteo Quintiliani committed
570
571
#endif

572
#ifdef HAVE___SRC_SEEDLINK_PLUGIN_C
573
574
575
576
		/* Send data to SeedLink Server */
		if(params.flag_slink) {
		    nmxptool_send_raw_depoch(pd);
		}
577
#endif
578
	    }
579

Matteo Quintiliani's avatar
Matteo Quintiliani committed
580
581
582
583
584
	    /* Store x_1 */
	    if(pd->nSamp > 0) {
		channelListSeq[cur_chan].x_1 = pd->pDataPtr[pd->nSamp-1];
	    }
	    /* Free pd->buffer */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
585
586
587
588
589
	    if(pd->buffer) {
		free(pd->buffer);
		pd->buffer = NULL;
	    }

Matteo Quintiliani's avatar
Matteo Quintiliani committed
590
591
	    // TODO
	    exitpdscondition = 1;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
592
593
594
595
596
597
598
599
600
	}

#ifdef HAVE_LIBMSEED
	if(params.flag_writeseed  &&  data_seed.outfile_mseed) {
	    /* Close output Mini-SEED file */
	    fclose(data_seed.outfile_mseed);
	}
#endif

601
602
603
604
605
606
607
608
609
610
611

	/* PDS Step 7: Send Terminate Subscription */
	nmxp_sendTerminateSubscription(naqssock, NMXP_SHUTDOWN_NORMAL, "Good Bye!");

	/* PDS Step 8: Close the socket */
	nmxp_closeSocket(naqssock);

	/* *********************************************************** */
	/* End subscription protocol "PRIVATE DATA STREAM" version 1.4 */
	/* *********************************************************** */

612
613
614



615
    }
616

617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
#ifdef HAVE_LIBMSEED
	if(*msr_list_chan) {
	    for(i_chan = 0; i_chan < channelList_subset->number; i_chan++) {
		if(msr_list_chan[i_chan]) {
		    msr_free(&(msr_list_chan[i_chan]));
		}
	    }
	}
#endif

	if(channelListSeq) {
	    free(channelListSeq);
	}

	/* This has to be tha last */
	if(channelList_subset) {
	    free(channelList_subset);
	}

636
637

    return 0;
638
} /* End MAIN */
639
640
641
642
643





Matteo Quintiliani's avatar
Matteo Quintiliani committed
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
/* Do any needed cleanup and exit */
static void clientShutdown(int sig) {
    nmxp_log(0, 0, "Program interrupted!\n");

    if(params.flag_writefile  &&  outfile) {
	/* Close output file */
	fclose(outfile);
    }

#ifdef HAVE_LIBMSEED
    if(params.flag_writeseed  &&  data_seed.outfile_mseed) {
	/* Close output Mini-SEED file */
	fclose(data_seed.outfile_mseed);
    }
#endif

660

Matteo Quintiliani's avatar
Matteo Quintiliani committed
661
662
663
664
665
666
    /* PDS Step 7: Send Terminate Subscription */
    nmxp_sendTerminateSubscription(naqssock, NMXP_SHUTDOWN_NORMAL, "Good Bye!");

    /* PDS Step 8: Close the socket */
    nmxp_closeSocket(naqssock);

667

Matteo Quintiliani's avatar
Matteo Quintiliani committed
668
669
670
    if(channelList == NULL) {
	free(channelList);
    }
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687

#ifdef HAVE_LIBMSEED
    int i_chan;
    if(*msr_list_chan) {
	for(i_chan = 0; i_chan < channelList_subset->number; i_chan++) {
	    if(msr_list_chan[i_chan]) {
		msr_free(&(msr_list_chan[i_chan]));
	    }
	}
    }
#endif

    if(channelListSeq) {
	free(channelListSeq);
    }

    /* This has to be the last */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
688
689
690
691
692
693
    if(channelList_subset == NULL) {
	free(channelList_subset);
    }

    exit( sig );
} /* End of clientShutdown() */
694

Matteo Quintiliani's avatar
Matteo Quintiliani committed
695
696
697
698

/* Signal handler that intentionally does nothing; main() installs it for SIGALRM */
static void clientDummyHandler(int sig)
{
    (void) sig;
    return;
}
Matteo Quintiliani's avatar
Matteo Quintiliani committed
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736



int seq_no_compare(const void *a, const void *b)
{       
    int ret = 0;
    NMXP_DATA_PROCESS **ppa = (NMXP_DATA_PROCESS **) a;
    NMXP_DATA_PROCESS **ppb = (NMXP_DATA_PROCESS **) b;
    NMXP_DATA_PROCESS *pa = *ppa;
    NMXP_DATA_PROCESS *pb = *ppb;

    if(pa && pb) {
	if(pa->seq_no > pb->seq_no) {
	    ret = 1;
	} else if (pa->seq_no < pb->seq_no) {
	    ret = -1;
	}
    } else {
	printf("Error pa and/or pb are NULL!\n");
    }

    return ret;
}


/*
 * Packs the samples of a packet into the Mini-SEED record of its channel.
 *
 * pd: packet to write; its key is looked up in channelList_subset.
 *
 * Returns the value of nmxp_data_msr_pack(), or 0 when the key is not found
 * (an error is logged) or when the program is built without libmseed.
 *
 * The body is compiled only with HAVE_LIBMSEED because data_seed and
 * msr_list_chan are defined only in that configuration.
 */
int nmxptool_write_miniseed(NMXP_DATA_PROCESS *pd) {
    int ret = 0;
#ifdef HAVE_LIBMSEED
    int cur_chan = nmxp_chan_lookupKeyIndex(pd->key, channelList_subset);

    if(cur_chan != -1) {
        /* x_1 is the last sample stored for this channel by the caller */
        ret = nmxp_data_msr_pack(pd, &data_seed, msr_list_chan[cur_chan], channelListSeq[cur_chan].x_1);
    } else {
        nmxp_log(1, 0, "Key %d not found in channelList_subset!\n", pd->key);
    }
#else
    (void) pd;
#endif
    return ret;
}

737
738
739
740
741
742
743
744
745
746
747
748
/*
 * Logs packet type, sequence number and oldest sequence number of a packet.
 *
 * pd: packet to report.
 *
 * Always returns 0.
 */
int nmxptool_print_seq_no(NMXP_DATA_PROCESS *pd) {
    nmxp_log(NMXP_LOG_NORM_NO, 0, "%2d %d %d\n", pd->packet_type, pd->seq_no, pd->oldest_seq_no);
    return 0;
}

Matteo Quintiliani's avatar
Matteo Quintiliani committed
749
750
751
752
753
754
755
756
757
758
/*
 * Forwards the samples of a packet to send_raw_depoch(), with fixed values
 * for the microsecond correction (0) and the timing quality (100).
 *
 * pd: packet to send.
 *
 * Returns the value of send_raw_depoch().
 */
int nmxptool_send_raw_depoch(NMXP_DATA_PROCESS *pd) {
    /* TODO Set values */
    const int usec_corr = 0;
    const int quality = 100;
    int rc;

    rc = send_raw_depoch(pd->station, pd->channel, pd->time,
            usec_corr, quality,
            pd->pDataPtr, pd->nSamp);

    return rc;
}


759
/* Releases a buffered packet copy together with its buffers and clears the slot. */
static void nmxptool_raw_stream_free_pd(NMXP_DATA_PROCESS **ppd) {
    if(ppd  &&  *ppd) {
        free((*ppd)->buffer);
        free((*ppd)->pDataPtr);
        free(*ppd);
        *ppd = NULL;
    }
}

/*
 * Buffers a copy of a packet and hands the buffered packets to the handlers
 * in sequence-number order.
 *
 * p:         buffer of the channel the packet belongs to.
 * a_pd:      packet just received; it is copied, the caller keeps ownership.
 * p_func_pd: functions called on every packet ready to be handled.
 * n_func_pd: number of valid items in p_func_pd.
 *
 * A packet is handled when its seq_no follows p->last_seq_no_sent by one.
 * Packets with a seq_no not greater than p->last_seq_no_sent are discarded.
 * When the buffer is full the first item is handled (or discarded) anyway
 * to make room for the new packet.
 *
 * Returns 0, or -1 when the copy of the packet can not be allocated.
 */
int nmxptool_manage_raw_stream(NMXPTOOL_PD_RAW_STREAM *p, NMXP_DATA_PROCESS *a_pd, int (*p_func_pd[NMXPTOOL_MAX_FUNC_PD]) (NMXP_DATA_PROCESS *), int n_func_pd) {
    int ret = 0;
    int send_again = 1;
    int seq_no_diff;
    int j=0, k=0;
    int i_func_pd;
    char str_time[200];
    NMXP_DATA_PROCESS *pd = NULL;

    /* Allocate memory for pd and copy a_pd */
    pd = malloc(sizeof(NMXP_DATA_PROCESS));
    if(pd == NULL) {
        nmxp_log(1, 0, "Error allocating memory for packet %d!\n", a_pd->seq_no);
        return -1;
    }
    memcpy(pd, a_pd, sizeof(NMXP_DATA_PROCESS));
    /* The pointers copied by memcpy() belong to a_pd */
    pd->buffer = NULL;
    pd->pDataPtr = NULL;

    if(a_pd->length > 0  &&  a_pd->buffer) {
        pd->buffer = malloc(a_pd->length);
        if(pd->buffer == NULL) {
            nmxp_log(1, 0, "Error allocating memory for packet %d!\n", a_pd->seq_no);
            nmxptool_raw_stream_free_pd(&pd);
            return -1;
        }
        memcpy(pd->buffer, a_pd->buffer, a_pd->length);
    }
    /* Test nSamp itself: multiplied by sizeof(int) a negative value becomes a huge unsigned one */
    if(a_pd->nSamp > 0  &&  a_pd->pDataPtr) {
        pd->pDataPtr = malloc(a_pd->nSamp * sizeof(int));
        if(pd->pDataPtr == NULL) {
            nmxp_log(1, 0, "Error allocating memory for packet %d!\n", a_pd->seq_no);
            nmxptool_raw_stream_free_pd(&pd);
            return -1;
        }
        memcpy(pd->pDataPtr, a_pd->pDataPtr, a_pd->nSamp * sizeof(int));
    }

    /* First time */
    if(p->last_seq_no_sent == -1) {
        p->last_seq_no_sent = pd->seq_no - 1;
        nmxp_log(0, 1, "First time nmxptool_manage_raw_stream().\n");
    }

    // TODO Condition for max tollerable latency

    /* Add pd and sort array */
    if(p->n_pdlist >= NMXPTOOL_MAX_PDLIST_ITEMS) {
        /* Supposing p->pdlist is ordered,
         * handle the first item and over write it.
         */
        seq_no_diff = p->pdlist[0]->seq_no - p->last_seq_no_sent;
        if( seq_no_diff > 0) {
            nmxp_data_to_str(str_time, p->pdlist[0]->time);
            nmxp_log(NMXP_LOG_WARN, 0, "Force handling packet %d (%s)!\n", p->pdlist[0]->seq_no, str_time);
            for(i_func_pd=0; i_func_pd<n_func_pd; i_func_pd++) {
                (*p_func_pd[i_func_pd])(p->pdlist[0]);
            }
            p->last_seq_no_sent = (p->pdlist[0]->seq_no);
        } else {
            /* It should not occur */
            nmxp_data_to_str(str_time, p->pdlist[0]->time);
            nmxp_log(NMXP_LOG_WARN, 0, "NOT OCCUR! Packets %d (%s) discarded, seq_no_diff=%d.\n", p->pdlist[0]->seq_no, str_time, seq_no_diff);
        }

        /* Free handled packet and reuse its slot */
        nmxptool_raw_stream_free_pd(&(p->pdlist[0]));
        p->pdlist[0] = pd;
    } else {
        p->pdlist[p->n_pdlist++] = pd;
    }
    qsort(p->pdlist, p->n_pdlist, sizeof(NMXP_DATA_PROCESS *), seq_no_compare);

    // TODO Check for packet duplication in pd->pdlist

    /* Print array, only for debugging */
    if(p->n_pdlist > 1) {
        int y = 0;
        for(y=0; y < p->n_pdlist; y++) {
            nmxp_log(0, 1, "%02d pkt %d\n", y, p->pdlist[y]->seq_no);
        }
    }

    /* Manage array and execute func_pd() */
    j=0;
    send_again = 1;
    while(send_again  &&  j < p->n_pdlist) {
        send_again = 0;
        seq_no_diff = p->pdlist[j]->seq_no - p->last_seq_no_sent;
        if(seq_no_diff <= 0) {
            // Duplicated packets: Discarded
            nmxp_data_to_str(str_time, p->pdlist[j]->time);
            nmxp_log(NMXP_LOG_WARN, 0, "Packets %d (%s) discarded, seq_no_diff=%d.\n", p->pdlist[j]->seq_no, str_time, seq_no_diff);
            send_again = 1;
            j++;
        } else if(seq_no_diff == 1) {
            for(i_func_pd=0; i_func_pd<n_func_pd; i_func_pd++) {
                (*p_func_pd[i_func_pd])(p->pdlist[j]);
            }
            p->last_seq_no_sent = p->pdlist[j]->seq_no;
            send_again = 1;
            j++;
        } else {
            /* Missing packets before this one: wait for them.
             * Arguments follow the order of the labels in the format. */
            nmxp_data_to_str(str_time, p->pdlist[j]->time);
            nmxp_log(NMXP_LOG_WARN, 0, "seq_no_diff=%d (%d-%d)  j=%2d  p->n_pdlist=%2d (%s)\n",
                    seq_no_diff, p->pdlist[j]->seq_no, p->last_seq_no_sent, j, p->n_pdlist, str_time);
        }
    }

    /* Shift and free j handled elements */
    if(j > 0) {
        for(k=0; k < p->n_pdlist; k++) {
            if(k < j) {
                nmxptool_raw_stream_free_pd(&(p->pdlist[k]));
            }
            if(k + j < p->n_pdlist) {
                p->pdlist[k] = p->pdlist[k+j];
            } else {
                p->pdlist[k] = NULL;
            }
        }
        p->n_pdlist = p->n_pdlist - j;
    }

    nmxp_log(0, 1, "j=%d  p->n_pdlist=%d FINAL\n", j, p->n_pdlist);

    return ret;
}


/*
 * Compares two times and logs when they differ more than a tolerance.
 *
 * time1, time2:   times to compare; the difference is time1 - time2.
 * gap_tollerance: maximum absolute difference accepted.
 * station, channel: names used in the log message.
 *
 * Returns 1 when a gap (positive difference) or an overlap (negative
 * difference) bigger than the tolerance is found, 0 otherwise.
 */
int nmxptool_check_and_log_gap(double time1, double time2, const double gap_tollerance, const char *station, const char *channel) {
    const double gap = time1 - time2;

    if(gap > gap_tollerance) {
        nmxp_log(1, 0, "Gap %.2f sec. for %s.%s from %.2f to %.2f!\n", gap, station, channel, time1, time2);
        return 1;
    }

    if(gap < -gap_tollerance) {
        nmxp_log(1, 0, "Overlap %.2f sec. for %s.%s from %.2f to %.2f!\n", gap, station, channel, time2, time1);
        return 1;
    }

    return 0;
}