nmxptool.c 22.5 KB
Newer Older
1
2
3
4
5
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <nmxp.h>
Matteo Quintiliani's avatar
Matteo Quintiliani committed
6
7
8
9
10

#ifndef WIN32
#include <signal.h>
#endif

11
#include "config.h"
12
13
#include "nmxptool_getoptlong.h"

14
15
16
#ifdef HAVE_LIBMSEED
#include <libmseed.h>
#endif
17

18
19
#ifdef HAVE___SRC_SEEDLINK_PLUGIN_H
#include "seedlink_plugin.h"
20
21
#endif

Matteo Quintiliani's avatar
Matteo Quintiliani committed
22
23
24
#define CURRENT_NETWORK (params.network)? params.network : DEFAULT_NETWORK
#define GAP_TOLLERANCE 0.001
#define NMXPTOOL_MAX_FUNC_PD 10
25
26
27
28
29
30
31
32
33
34
35
36
37
38

/* Max number of packet I can tollerate to wait.
 * It should be better to express it by time, for example 30 sec., 1 min., ecc....
 */
#define NMXPTOOL_MAX_DIFF_SEQ_NO 40
/* I can suppose in the worst case I can have the double number of packets I can tollerate to wait */
#define NMXPTOOL_MAX_PDLIST_ITEMS NMXPTOOL_MAX_DIFF_SEQ_NO*2

typedef struct {
    int32_t last_seq_no_sent;
    int32_t n_pdlist;
    NMXP_DATA_PROCESS *pdlist[NMXPTOOL_MAX_PDLIST_ITEMS];
} NMXPTOOL_PD_RAW_STREAM;

39
40
41
42
43
44
45
typedef struct {
    int significant;
    double last_time;
    int32_t x_1;
    NMXPTOOL_PD_RAW_STREAM raw_stream_buffer;
} NMXPTOOL_CHAN_SEQ;

Matteo Quintiliani's avatar
Matteo Quintiliani committed
46
47
48
49
50
51
52
53
54
55
56
57
58
static void clientShutdown(int sig);
static void clientDummyHandler(int sig);

int nmxptool_write_miniseed(NMXP_DATA_PROCESS *pd);
int nmxptool_send_raw_depoch(NMXP_DATA_PROCESS *pd);

int seq_no_compare(const void *a, const void *b);
// TODO func_pd has to become an array of functions
int nmxptool_add_and_do_ordered(NMXPTOOL_PD_RAW_STREAM *p, NMXP_DATA_PROCESS *a_pd, int func_pd(NMXP_DATA_PROCESS *));

int nmxptool_check_and_log_gap(double time1, double time2, const double gap_tollerance, const char *station, const char *channel);


59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
/* Global variable for main program and handling terminitation program */
NMXPTOOL_PARAMS params;
int naqssock = 0;
FILE *outfile = NULL;
NMXP_CHAN_LIST *channelList = NULL;
NMXP_CHAN_LIST *channelList_subset = NULL;
NMXPTOOL_CHAN_SEQ *channelListSeq = NULL;

#ifdef HAVE_LIBMSEED
/* Mini-SEED variables */
NMXP_DATA_SEED data_seed;
MSRecord *msr_list_chan[MAX_N_CHAN];
#endif


74
int main (int argc, char **argv) {
75
    int32_t connection_time;
76
    int request_SOCKET_OK;
77
    int i_chan, cur_chan;
78
    int j;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
79
    int exitpdscondition;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
80
81
    int exitdapcondition;

Matteo Quintiliani's avatar
Matteo Quintiliani committed
82
    int span_interval = 10;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
83
84
    int time_to_sleep = 0;

85
86
87

    NMXP_MSG_SERVER type;
    void *buffer;
88
    int32_t length;
89
90
91
    int ret;

    char filename[500];
92
    char station_code[20], channel_code[20];
93
94
95
96
97
98
99
100

    NMXP_DATA_PROCESS *pd;

#ifdef HAVE_LIBMSEED
    /* Init mini-SEED variables */
    nmxp_data_seed_init(&data_seed);
#endif

Matteo Quintiliani's avatar
Matteo Quintiliani committed
101
102
103
104
105
106
107
108
#ifndef WIN32
    /* Signal handling, use POSIX calls with standardized semantics */
    struct sigaction sa;

    sa.sa_handler = clientDummyHandler;
    sa.sa_flags = SA_RESTART;
    sigemptyset(&sa.sa_mask);
    sigaction(SIGALRM, &sa, NULL);
109

Matteo Quintiliani's avatar
Matteo Quintiliani committed
110
111
112
113
114
115
116
117
118
119
120
121
    sa.sa_handler = clientShutdown;
    sigaction(SIGINT, &sa, NULL);
    sigaction(SIGQUIT, &sa, NULL); 
    sigaction(SIGTERM, &sa, NULL);

    sa.sa_handler = SIG_IGN;
    sigaction(SIGHUP, &sa, NULL);
    sigaction(SIGPIPE, &sa, NULL); 
#endif


    /* Default is normal output */
122
123
124
125
126
127
128
129
130
131
132
133
    nmxp_log(-1, 0);

    /* Initialize params from argument values */
    if(nmxptool_getopt_long(argc, argv, &params) != 0) {
	return 1;
    }

    /* Check consistency of params */
    if(nmxptool_check_params(&params) != 0) {
	return 1;
    }

134
135
136
137
    if(params.flag_verbose) {
	nmxp_log(-1, 2);
    }

138
139
140
    /* List available channels on server */
    if(params.flag_listchannels) {

Matteo Quintiliani's avatar
Matteo Quintiliani committed
141
142
143
144
	// TOREMOVE
	// channelList = nmxp_getAvailableChannelList(params.hostname, params.portnumberpds, NMXP_DATA_TIMESERIES);
	// TOREMOVE
	// nmxp_chan_print_channelList(channelList);
145

146
	nmxp_getMetaChannelList(params.hostname, params.portnumberdap, NMXP_DATA_TIMESERIES, params.flag_request_channelinfo);
147

148
149
150
151
152
153
154
155
156
157
158
	return 1;
    }

    /* Get list of available channels and get a subset list of params.channels */
    channelList = nmxp_getAvailableChannelList(params.hostname, params.portnumberpds, NMXP_DATA_TIMESERIES);
    channelList_subset = nmxp_chan_subset(channelList, NMXP_DATA_TIMESERIES, params.channels);

    /* Check if some channel already exists */
    if(channelList_subset->number <= 0) {
	nmxp_log(1, 0, "Channels not found!\n");
	return 1;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
159
160
    } else {
	nmxp_chan_print_channelList(channelList_subset);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
161

162
163
164
165
166
167
168
	nmxp_log(0, 1, "Init channelListSeq.\n");

	/* init channelListSeq */
	channelListSeq = (NMXPTOOL_CHAN_SEQ *) malloc(sizeof(NMXPTOOL_CHAN_SEQ) * channelList_subset->number);
	for(i_chan = 0; i_chan < channelList_subset->number; i_chan++) {
	    channelListSeq[i_chan].significant = 0;
	    channelListSeq[i_chan].last_time = 0.0;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
169
	    channelListSeq[i_chan].x_1 = 0;
170
171
172
173
174
	    channelListSeq[i_chan].raw_stream_buffer.last_seq_no_sent = -1;
	    channelListSeq[i_chan].raw_stream_buffer.n_pdlist = 0;
	    for(j=0; j<NMXPTOOL_MAX_PDLIST_ITEMS; j++) {
		channelListSeq[i_chan].raw_stream_buffer.pdlist[j] = NULL;
	    }
175
176
	}

Matteo Quintiliani's avatar
Matteo Quintiliani committed
177
#ifdef HAVE_LIBMSEED
178
179
180
	nmxp_log(0, 1, "Init mini-SEED record list.\n");

	/* Init mini-SEED record list */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
181
	for(i_chan = 0; i_chan < channelList_subset->number; i_chan++) {
182
183
184
185
186
187
188
189

	    nmxp_log(0, 1, "Init mini-SEED record for %s\n", channelList_subset->channel[i_chan].name);

	    msr_list_chan[i_chan] = msr_init(NULL);

	    /* Separate station_code and channel_code */
	    if(nmxp_chan_cpy_sta_chan(channelList_subset->channel[i_chan].name, station_code, channel_code)) {

190
		nmxp_log(0, 1, "%s.%s.%s\n", CURRENT_NETWORK, station_code, channel_code);
191

192
		strcpy(msr_list_chan[i_chan]->network, CURRENT_NETWORK);
193
194
195
196
197
198
199
200
201
202
203
		strcpy(msr_list_chan[i_chan]->station, station_code);
		strcpy(msr_list_chan[i_chan]->channel, channel_code);

		msr_list_chan[i_chan]->reclen = 512;         /* byte record length */
		msr_list_chan[i_chan]->encoding = DE_STEIM1;  /* Steim 1 compression */

	    } else {
		nmxp_log(1, 0, "Channels %s error in format!\n");
		return 1;
	    }

Matteo Quintiliani's avatar
Matteo Quintiliani committed
204
205
	}
#endif
206

207
208
209
210
211
212
213
214
    }

    /* Free the complete channel list */
    if(channelList) {
	free(channelList);
	channelList = NULL;
    }

Matteo Quintiliani's avatar
Matteo Quintiliani committed
215
216
    nmxp_log(0, 1, "Starting comunication.\n");

217
    /* TODO condition starting DAP or PDS */
218
219
220
    if( (params.start_time != 0   &&   params.end_time != 0)
	    || params.delay > 0
	    ) {
221

Matteo Quintiliani's avatar
Matteo Quintiliani committed
222
	if(params.delay > 0) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
223
	    params.start_time = ((time(NULL) - params.delay - span_interval) / 10) * 10;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
224
225
226
	    params.end_time = params.start_time + span_interval;
	}

227

228
229
230
	/* ************************************************************** */
	/* Start subscription protocol "DATA ACCESS PROTOCOL" version 1.0 */
	/* ************************************************************** */
231

232
233
234
235
236
	/* DAP Step 1: Open a socket */
	if( (naqssock = nmxp_openSocket(params.hostname, params.portnumberdap)) == NMXP_SOCKET_ERROR) {
	    nmxp_log(1, 0, "Error opening socket!\n");
	    return 1;
	}
237

238
239
240
241
242
	/* DAP Step 2: Read connection time */
	if(nmxp_readConnectionTime(naqssock, &connection_time) != NMXP_SOCKET_OK) {
	    nmxp_log(1, 0, "Error reading connection time from server!\n");
	    return 1;
	}
243

244
245
246
247
248
249
250
251
252
253
254
	/* DAP Step 3: Send a ConnectRequest */
	if(nmxp_sendConnectRequest(naqssock, params.datas_username, params.datas_password, connection_time) != NMXP_SOCKET_OK) {
	    nmxp_log(1, 0, "Error sending connect request!\n");
	    return 1;
	}

	/* DAP Step 4: Wait for a Ready message */
	if(nmxp_waitReady(naqssock) != NMXP_SOCKET_OK) {
	    nmxp_log(1, 0, "Error waiting Ready message!\n");
	    return 1;
	}
255

256
257
258
259
260
261
	exitdapcondition = 1;

	while(exitdapcondition) {

	nmxp_log(0, 1, "start_time = %d - end_time = %d\n", params.start_time, params.end_time);

262
263
264
	/* Start loop for sending requests */
	i_chan=0;
	request_SOCKET_OK = NMXP_SOCKET_OK;
265

266
	while(request_SOCKET_OK == NMXP_SOCKET_OK  &&  i_chan < channelList_subset->number) {
267

268
	    /* DAP Step 5: Send Data Request */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
269
	    request_SOCKET_OK = nmxp_sendDataRequest(naqssock, channelList_subset->channel[i_chan].key, (double) params.start_time, (double) params.end_time);
270

271
	    if(request_SOCKET_OK == NMXP_SOCKET_OK) {
272

273
274
		if(params.flag_writefile) {
		    /* Open output file */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
275
		    sprintf(filename, "%s.%s.%d.%d.%d.nmx",
276
			    CURRENT_NETWORK,
Matteo Quintiliani's avatar
Matteo Quintiliani committed
277
278
279
			    channelList_subset->channel[i_chan].name,
			    channelList_subset->channel[i_chan].key,
			    params.start_time, params.end_time);
280

281
282
283
284
		    outfile = fopen(filename, "w");
		    if(!outfile) {
			nmxp_log(1, 0, "Can not to open file %s!", filename);
		    }
285
286
287
		}

#ifdef HAVE_LIBMSEED
288
289
		if(params.flag_writeseed) {
		    /* Open output Mini-SEED file */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
290
		    sprintf(data_seed.filename_mseed, "%s.%s.%d.%d.%d.miniseed",
291
			    CURRENT_NETWORK,
Matteo Quintiliani's avatar
Matteo Quintiliani committed
292
293
294
295
			    channelList_subset->channel[i_chan].name,
			    channelList_subset->channel[i_chan].key,
			    params.start_time,
			    params.end_time);
296

297
298
299
300
301
302
		    data_seed.outfile_mseed = fopen(data_seed.filename_mseed, "w");
		    if(!data_seed.outfile_mseed) {
			nmxp_log(1, 0, "Can not to open file %s!", data_seed.filename_mseed);
		    }
		}
#endif
303

304
305
306
		if(params.flag_writefile  &&  outfile) {
		    /* Compute SNCL line */

307
308
309
310
311
		    /* Separate station_code_old_way and channel_code_old_way */
		    if(nmxp_chan_cpy_sta_chan(channelList_subset->channel[i_chan].name, station_code, channel_code)) {
			/* Write SNCL line */
			fprintf(outfile, "%s.%s.%s.%s\n",
				station_code,
312
				CURRENT_NETWORK,
313
314
				channel_code,
				(params.location)? params.location : "");
315
		    }
316
317
318

		}

319
320
321
		/* DAP Step 6: Receive Data until receiving a Ready message */
		ret = nmxp_receiveMessage(naqssock, &type, &buffer, &length);
		nmxp_log(0, 1, "ret = %d, type = %d\n", ret, type);
322

323
		while(ret == NMXP_SOCKET_OK   &&    type != NMXP_MSG_READY) {
324

325
		    /* Process a packet and return value in NMXP_DATA_PROCESS structure */
326
		    pd = nmxp_processCompressedData(buffer, length, channelList_subset, CURRENT_NETWORK);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
327
		    nmxp_data_trim(pd, params.start_time, params.end_time, 0);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
328
329

		    /* Log contents of last packet */
330
331
332
		    if(params.flag_logdata) {
			nmxp_data_log(pd);
		    }
333

334
		    /* Set cur_chan */
335
		    cur_chan = nmxp_chan_lookupKeyIndex(pd->key, channelList_subset);
336
337

		    /* Management of gaps */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
338
		    if(!channelListSeq[cur_chan].significant && pd->nSamp > 0) {
339
340
			channelListSeq[cur_chan].significant = 1;
		    } else {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
341
			if(channelListSeq[cur_chan].significant) {
342
343
344
345
			    if(nmxptool_check_and_log_gap(pd->time, channelListSeq[cur_chan].last_time, GAP_TOLLERANCE, pd->station, pd->channel)) {
				channelListSeq[cur_chan].x_1 = 0;
				nmxp_log(0, 0, "Warning: x0 set to zero!\n");
			    }
Matteo Quintiliani's avatar
Matteo Quintiliani committed
346
347
			}
		    }
348
		    if(channelListSeq[cur_chan].significant && pd->nSamp > 0) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
349
			channelListSeq[cur_chan].last_time = pd->time + ((double) pd->nSamp / (double) pd->sampRate);
350
351
		    }

352
#ifdef HAVE_LIBMSEED
353
354
		    /* Write Mini-SEED record */
		    if(params.flag_writeseed) {
355
			nmxptool_write_miniseed(pd);
356
		    }
357
#endif
358

359
#ifdef HAVE___SRC_SEEDLINK_PLUGIN_C
360
		    /* Send data to SeedLink Server */
361
		    if(params.flag_slink) {
362
			nmxptool_send_raw_depoch(pd);
363
		    }
364
365
#endif

366
367
368
		    if(params.flag_writefile  &&  outfile) {
			/* Write buffer to the output file */
			if(outfile && buffer && length > 0) {
369
			    int32_t length_int = length;
370
371
372
373
374
			    nmxp_data_swap_4b((int32_t *) &length_int);
			    fwrite(&length_int, sizeof(length_int), 1, outfile);
			    fwrite(buffer, length, 1, outfile);
			}
		    }
375

Matteo Quintiliani's avatar
Matteo Quintiliani committed
376
377
378
379
380
		    /* Store x_1 */
		    if(pd->nSamp > 0) {
			channelListSeq[cur_chan].x_1 = pd->pDataPtr[pd->nSamp-1];
		    }
		    /* Free pd->buffer */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
381
382
383
		    if(pd->buffer) {
			free(pd->buffer);
			pd->buffer = NULL;
384
		    }
385
386
387
388

		    /* Receive Data */
		    ret = nmxp_receiveMessage(naqssock, &type, &buffer, &length);
		    nmxp_log(0, 1, "ret = %d, type = %d\n", ret, type);
389
390
		}

391
392
393
		if(params.flag_writefile  &&  outfile) {
		    /* Close output file */
		    fclose(outfile);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
394
		    outfile = NULL;
395
396
		}

397
398
399
400
#ifdef HAVE_LIBMSEED
		if(params.flag_writeseed  &&  data_seed.outfile_mseed) {
		    /* Close output Mini-SEED file */
		    fclose(data_seed.outfile_mseed);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
401
		    data_seed.outfile_mseed = NULL;
402
403
		}
#endif
404
405

	    }
406
407
408
	    i_chan++;
	}
	/* DAP Step 7: Repeat steps 5 and 6 for each data request */
409

Matteo Quintiliani's avatar
Matteo Quintiliani committed
410
	if(params.delay > 0) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
411
412
413
414
415
416
417
	    time_to_sleep = (params.end_time - params.start_time) - (time(NULL) - (params.start_time + params.delay + span_interval));
	    if(time_to_sleep >= 0) {
		sleep(time_to_sleep);
	    } else {
		nmxp_log(1, 0, "time to sleep %dsec.\n", time_to_sleep);
		sleep(3);
	    }
Matteo Quintiliani's avatar
Matteo Quintiliani committed
418
419
420
421
422
423
	    params.start_time = params.end_time;
	    params.end_time = params.start_time + span_interval;
	} else {
	    exitdapcondition = 0;
	}

424

Matteo Quintiliani's avatar
Matteo Quintiliani committed
425
    } /* END while(exitdapcondition) */
426

427
428
429
430
431
432
433
434
435
436
437
	/* DAP Step 8: Send a Terminate message (optional) */
	nmxp_sendTerminateSubscription(naqssock, NMXP_SHUTDOWN_NORMAL, "Bye!");

	/* DAP Step 9: Close the socket */
	nmxp_closeSocket(naqssock);

	/* ************************************************************ */
	/* End subscription protocol "DATA ACCESS PROTOCOL" version 1.0 */
	/* ************************************************************ */


438

Matteo Quintiliani's avatar
Matteo Quintiliani committed
439
    } else {
440

Matteo Quintiliani's avatar
Matteo Quintiliani committed
441
442
443
	int (*p_func_pd[NMXPTOOL_MAX_FUNC_PD]) (NMXP_DATA_PROCESS *);
	p_func_pd[0] = nmxptool_send_raw_depoch;
	p_func_pd[1] = nmxptool_write_miniseed;
444

445
446
447
448
449
450
451
452
453
	/* ************************************************************* */
	/* Start subscription protocol "PRIVATE DATA STREAM" version 1.4 */
	/* ************************************************************* */

	/* PDS Step 1: Open a socket */
	naqssock = nmxp_openSocket(params.hostname, params.portnumberpds);

	if(naqssock == NMXP_SOCKET_ERROR) {
	    return 1;
454
455
	}

456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
	/* PDS Step 2: Send a Connect */
	if(nmxp_sendConnect(naqssock) != NMXP_SOCKET_OK) {
	    printf("Error on sendConnect()\n");
	    return 1;
	}

	/* PDS Step 3: Receive ChannelList */
	if(nmxp_receiveChannelList(naqssock, &channelList) != NMXP_SOCKET_OK) {
	    printf("Error on receiveChannelList()\n");
	    return 1;
	}

	/* Get a subset of channel from arguments */
	channelList_subset = nmxp_chan_subset(channelList, NMXP_DATA_TIMESERIES, params.channels);


	/* PDS Step 4: Send a Request Pending (optional) */
473
474


475
476
477
478
479
480
	/* PDS Step 5: Send AddChannels */
	/* Request Data */
	nmxp_sendAddTimeSeriesChannel(naqssock, channelList_subset, params.stc, params.rate, (params.flag_buffered)? NMXP_BUFFER_YES : NMXP_BUFFER_NO);

	/* PDS Step 6: Repeat until finished: receive and handle packets */

Matteo Quintiliani's avatar
Matteo Quintiliani committed
481
482
483
484
#ifdef HAVE_LIBMSEED
	if(params.flag_writeseed) {
	    /* Open output Mini-SEED file */
	    sprintf(data_seed.filename_mseed, "%s.realtime.miniseed",
485
		    CURRENT_NETWORK);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
486
487
488
489
490
491
492
493
494
495

	    data_seed.outfile_mseed = fopen(data_seed.filename_mseed, "w");
	    if(!data_seed.outfile_mseed) {
		nmxp_log(1, 0, "Can not to open file %s!", data_seed.filename_mseed);
	    } else {
		nmxp_log(0, 1, "Opened file %s!\n", data_seed.filename_mseed);
	    }
	}
#endif

Matteo Quintiliani's avatar
Matteo Quintiliani committed
496
497
498
499
	// TODO
	exitpdscondition = 1;

	while(exitpdscondition) {
500
	    /* Process Compressed or Decompressed Data */
501
	    pd = nmxp_receiveData(naqssock, channelList_subset, CURRENT_NETWORK);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
502
503

	    /* Log contents of last packet */
504
505
506
	    if(params.flag_logdata) {
		nmxp_data_log(pd);
	    }
Matteo Quintiliani's avatar
Matteo Quintiliani committed
507

508
	    /* Set cur_chan */
509
	    cur_chan = nmxp_chan_lookupKeyIndex(pd->key, channelList_subset);
510
511

	    /* Management of gaps */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
512
	    if(!channelListSeq[cur_chan].significant && pd->nSamp > 0) {
513
514
		channelListSeq[cur_chan].significant = 1;
	    } else {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
515
		if(channelListSeq[cur_chan].significant) {
516
517
518
519
		    if(nmxptool_check_and_log_gap(pd->time, channelListSeq[cur_chan].last_time, GAP_TOLLERANCE, pd->station, pd->channel)) {
			channelListSeq[cur_chan].x_1 = 0;
			nmxp_log(0, 0, "Warning: x0 set to zero!\n");
		    }
Matteo Quintiliani's avatar
Matteo Quintiliani committed
520
521
		}
	    }
522
	    if(channelListSeq[cur_chan].significant && pd->nSamp > 0) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
523
		channelListSeq[cur_chan].last_time = pd->time + ((double) pd->nSamp / (double) pd->sampRate);
524
525
	    }

526
	    nmxptool_add_and_do_ordered(&(channelListSeq[cur_chan].raw_stream_buffer), pd, nmxp_data_log);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
527
528
529
530

#ifdef HAVE_LIBMSEED
	    /* Write Mini-SEED record */
	    if(params.flag_writeseed) {
531
		nmxptool_write_miniseed(pd);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
532
533
534
	    }
#endif

535
#ifdef HAVE___SRC_SEEDLINK_PLUGIN_C
536
537
538
539
	    /* Send data to SeedLink Server */
	    if(params.flag_slink) {
		nmxptool_send_raw_depoch(pd);
	    }
540
541
#endif

Matteo Quintiliani's avatar
Matteo Quintiliani committed
542
543
544
545
546
	    /* Store x_1 */
	    if(pd->nSamp > 0) {
		channelListSeq[cur_chan].x_1 = pd->pDataPtr[pd->nSamp-1];
	    }
	    /* Free pd->buffer */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
547
548
549
550
551
	    if(pd->buffer) {
		free(pd->buffer);
		pd->buffer = NULL;
	    }

Matteo Quintiliani's avatar
Matteo Quintiliani committed
552
553
	    // TODO
	    exitpdscondition = 1;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
554
555
556
557
558
559
560
561
562
	}

#ifdef HAVE_LIBMSEED
	if(params.flag_writeseed  &&  data_seed.outfile_mseed) {
	    /* Close output Mini-SEED file */
	    fclose(data_seed.outfile_mseed);
	}
#endif

563
564
565
566
567
568
569
570
571
572
573

	/* PDS Step 7: Send Terminate Subscription */
	nmxp_sendTerminateSubscription(naqssock, NMXP_SHUTDOWN_NORMAL, "Good Bye!");

	/* PDS Step 8: Close the socket */
	nmxp_closeSocket(naqssock);

	/* *********************************************************** */
	/* End subscription protocol "PRIVATE DATA STREAM" version 1.4 */
	/* *********************************************************** */

574
575
576



577
    }
578

579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
#ifdef HAVE_LIBMSEED
	if(*msr_list_chan) {
	    for(i_chan = 0; i_chan < channelList_subset->number; i_chan++) {
		if(msr_list_chan[i_chan]) {
		    msr_free(&(msr_list_chan[i_chan]));
		}
	    }
	}
#endif

	if(channelListSeq) {
	    free(channelListSeq);
	}

	/* This has to be tha last */
	if(channelList_subset) {
	    free(channelList_subset);
	}

598
599

    return 0;
600
} /* End MAIN */
601
602
603
604
605





Matteo Quintiliani's avatar
Matteo Quintiliani committed
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
/* Do any needed cleanup and exit */
static void clientShutdown(int sig) {
    nmxp_log(0, 0, "Program interrupted!\n");

    if(params.flag_writefile  &&  outfile) {
	/* Close output file */
	fclose(outfile);
    }

#ifdef HAVE_LIBMSEED
    if(params.flag_writeseed  &&  data_seed.outfile_mseed) {
	/* Close output Mini-SEED file */
	fclose(data_seed.outfile_mseed);
    }
#endif

622

Matteo Quintiliani's avatar
Matteo Quintiliani committed
623
624
625
626
627
628
    /* PDS Step 7: Send Terminate Subscription */
    nmxp_sendTerminateSubscription(naqssock, NMXP_SHUTDOWN_NORMAL, "Good Bye!");

    /* PDS Step 8: Close the socket */
    nmxp_closeSocket(naqssock);

629

Matteo Quintiliani's avatar
Matteo Quintiliani committed
630
631
632
    if(channelList == NULL) {
	free(channelList);
    }
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649

#ifdef HAVE_LIBMSEED
    int i_chan;
    if(*msr_list_chan) {
	for(i_chan = 0; i_chan < channelList_subset->number; i_chan++) {
	    if(msr_list_chan[i_chan]) {
		msr_free(&(msr_list_chan[i_chan]));
	    }
	}
    }
#endif

    if(channelListSeq) {
	free(channelListSeq);
    }

    /* This has to be the last */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
650
651
652
653
654
655
    if(channelList_subset == NULL) {
	free(channelList_subset);
    }

    exit( sig );
} /* End of clientShutdown() */
656

Matteo Quintiliani's avatar
Matteo Quintiliani committed
657
658
659
660

/* Empty signal handler routine */
static void clientDummyHandler(int sig) {
}
Matteo Quintiliani's avatar
Matteo Quintiliani committed
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826



int seq_no_compare(const void *a, const void *b)
{       
    int ret = 0;
    NMXP_DATA_PROCESS **ppa = (NMXP_DATA_PROCESS **) a;
    NMXP_DATA_PROCESS **ppb = (NMXP_DATA_PROCESS **) b;
    NMXP_DATA_PROCESS *pa = *ppa;
    NMXP_DATA_PROCESS *pb = *ppb;

    if(pa && pb) {
	if(pa->seq_no > pb->seq_no) {
	    ret = 1;
	} else if (pa->seq_no < pb->seq_no) {
	    ret = -1;
	}
    } else {
	printf("Error pa and/or pb are NULL!\n");
    }

    return ret;
}


int nmxptool_write_miniseed(NMXP_DATA_PROCESS *pd) {
    int cur_chan;
    int ret = 0;
    if( (cur_chan = nmxp_chan_lookupKeyIndex(pd->key, channelList_subset)) != -1) {

	ret = nmxp_data_msr_pack(pd, &data_seed, msr_list_chan[cur_chan], channelListSeq[cur_chan].x_1);

    } else {
	nmxp_log(1, 0, "Key %d not found in channelList_subset!\n", pd->key);
    }
    return ret;
}

int nmxptool_send_raw_depoch(NMXP_DATA_PROCESS *pd) {
    /* TODO Set values */
    const int usec_correction = 0;
    const int timing_quality = 100;

    return send_raw_depoch(pd->station, pd->channel, pd->time, usec_correction, timing_quality,
	    pd->pDataPtr, pd->nSamp);
}


int nmxptool_add_and_do_ordered(NMXPTOOL_PD_RAW_STREAM *p, NMXP_DATA_PROCESS *a_pd, int func_pd(NMXP_DATA_PROCESS *)) {
    int ret = 0;
    int send_again = 1;
    int seq_no_diff;
    int j=0, k=0;
    NMXP_DATA_PROCESS *pd = NULL;

    /* Allocate memory for pd and copy a_pd */
    pd = (NMXP_DATA_PROCESS *) malloc (sizeof(NMXP_DATA_PROCESS));
    memcpy(pd, a_pd, sizeof(NMXP_DATA_PROCESS));
    if(a_pd->length > 0) {
	pd->buffer = malloc(pd->length);
	memcpy(pd->buffer, a_pd->buffer, a_pd->length);
    } else {
	pd->buffer = NULL;
    }
    if(a_pd->nSamp *  sizeof(int) > 0) {
	pd->pDataPtr = (int *) malloc(a_pd->nSamp * sizeof(int));
	memcpy(pd->pDataPtr, a_pd->pDataPtr, a_pd->nSamp * sizeof(int));
    } else {
	pd->pDataPtr = NULL;
    }

    /* First time */
    if(p->last_seq_no_sent == -1) {
	p->last_seq_no_sent = pd->seq_no - 1;
	nmxp_log(0, 0, "First time.\n");
    }

    /* Add pd and sort array */
    if(p->n_pdlist + 1 >= NMXPTOOL_MAX_PDLIST_ITEMS) {
	/* Supposing p->pdlist is ordered,
	 * handle the first item and over write it.
	 */
	nmxp_log(LOG_WARN, 0, "Force handling packet %d!\n", p->pdlist[0]->seq_no);
	func_pd(p->pdlist[0]);
	p->last_seq_no_sent = (p->pdlist[0]->seq_no);
	p->pdlist[0] = pd;
    } else {
	p->pdlist[p->n_pdlist++] = pd;
    }
    qsort(p->pdlist, p->n_pdlist, sizeof(NMXP_DATA_PROCESS *), seq_no_compare);

    /* Print array, only for debugging */
    if(p->n_pdlist > 1) {
	int y = 0;
	for(y=0; y < p->n_pdlist; y++) {
	    nmxp_log(0, 0, "%02d pkt %d\n", y, p->pdlist[y]->seq_no);
	}
    }

    /* Manage array and execute func_pd() */
    j=0;
    send_again = 1;
    while(send_again  &&  j < p->n_pdlist) {
	send_again = 0;
	seq_no_diff = p->pdlist[j]->seq_no - p->last_seq_no_sent;
	nmxp_log(0, 0, "seq_no_diff=%d  j=%d  p->n_pdlist=%d (%d-%d)\n", seq_no_diff, j, p->n_pdlist, p->pdlist[j]->seq_no, p->last_seq_no_sent);
	if(seq_no_diff <= 0) {
	    // Duplicated packets: Discarded
	    nmxp_log(0, 0, "Packets %d discarded\n", p->pdlist[j]->seq_no);
	    send_again = 1;
	    j++;
	} else if(seq_no_diff == 1) {
	    func_pd(p->pdlist[j]);
	    p->last_seq_no_sent = p->pdlist[j]->seq_no;
	    send_again = 1;
	    j++;
	} else if(seq_no_diff >= NMXPTOOL_MAX_DIFF_SEQ_NO) {
	    // I have to drop packet with sequence number p->last_seq_no_sent+1
	    nmxp_log(LOG_WARN, 0, "Give up to wait packet %d!\n", p->last_seq_no_sent+1);
	    p->last_seq_no_sent++;
	    send_again = 1;
	}
    }

    /* Shift and free handled elements */
    if(j > 0) {
	for(k=0; k < p->n_pdlist; k++) {
	    if(k + j < p->n_pdlist) {
		if(p->pdlist[k]->buffer) {
		    free(p->pdlist[k]->buffer);
		    p->pdlist[k]->buffer = NULL;
		}
		if(p->pdlist[k]->pDataPtr) {
		    free(p->pdlist[k]->pDataPtr);
		    p->pdlist[k]->pDataPtr = NULL;
		}
		if(p->pdlist[k]) {
		    free(p->pdlist[k]);
		    p->pdlist[k] = NULL;
		}
		p->pdlist[k] = p->pdlist[k+j];
	    } else {
		p->pdlist[k] = NULL;
	    }
	}
	p->n_pdlist = p->n_pdlist - j;
    }

    nmxp_log(0, 0, "j=%d  p->n_pdlist=%d FINAL\n", j, p->n_pdlist);

    return ret;
}


int nmxptool_check_and_log_gap(double time1, double time2, const double gap_tollerance, const char *station, const char *channel) {
    int ret = 0;
    double gap = time1 - time2 ;
    if(gap > gap_tollerance) {
	nmxp_log(1, 0, "Gap %.2f sec. for %s.%s from %.2f to %.2f!\n", gap, station, channel, time1, time2);
	ret = 1;
    } else if (gap < -gap_tollerance) {
	nmxp_log(1, 0, "Overlap %.2f sec. for %s.%s from %.2f to %.2f!\n", gap, station, channel, time2, time1);
	ret = 1;
    }
    return ret;
}