nmxptool.c 21.7 KB
Newer Older
1
2
3
4
5
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <nmxp.h>
Matteo Quintiliani's avatar
Matteo Quintiliani committed
6
7
8
9
10

#ifndef WIN32
#include <signal.h>
#endif

11
#include "config.h"
12
13
#include "nmxptool_getoptlong.h"

14
15
16
#ifdef HAVE_LIBMSEED
#include <libmseed.h>
#endif
17

18
19
#ifdef HAVE___SRC_SEEDLINK_PLUGIN_H
#include "seedlink_plugin.h"
20
21
#endif

22

23
24
#define CURRENT_NETWORK (params.network)? params.network : DEFAULT_NETWORK

25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59

/* Max number of packet I can tollerate to wait.
 * It should be better to express it by time, for example 30 sec., 1 min., ecc....
 */
#define NMXPTOOL_MAX_DIFF_SEQ_NO 40
/* I can suppose in the worst case I can have the double number of packets I can tollerate to wait */
#define NMXPTOOL_MAX_PDLIST_ITEMS NMXPTOOL_MAX_DIFF_SEQ_NO*2

typedef struct {
    int32_t last_seq_no_sent;
    int32_t n_pdlist;
    NMXP_DATA_PROCESS *pdlist[NMXPTOOL_MAX_PDLIST_ITEMS];
} NMXPTOOL_PD_RAW_STREAM;

int seq_no_compare(const void *a, const void *b)
{       
    int ret = 0;
    NMXP_DATA_PROCESS **ppa = (NMXP_DATA_PROCESS **) a;
    NMXP_DATA_PROCESS **ppb = (NMXP_DATA_PROCESS **) b;
    NMXP_DATA_PROCESS *pa = *ppa;
    NMXP_DATA_PROCESS *pb = *ppb;

    if(pa && pb) {
	if(pa->seq_no > pb->seq_no) {
	    ret = 1;
	} else if (pa->seq_no < pb->seq_no) {
	    ret = -1;
	}
    } else {
	printf("Error pa and/or pb are NULL!\n");
    }

    return ret;
}

Matteo Quintiliani's avatar
Matteo Quintiliani committed
60
int nmxptool_add_and_do_ordered(NMXPTOOL_PD_RAW_STREAM *p, NMXP_DATA_PROCESS *a_pd, int func_pd(NMXP_DATA_PROCESS *)) {
61
62
63
64
    int ret = 0;
    int send_again = 1;
    int seq_no_diff;
    int j=0, k=0;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
65
66
    NMXP_DATA_PROCESS *pd = NULL;

67
    /* Allocate memory for pd and copy a_pd */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
68
69
70
    pd = (NMXP_DATA_PROCESS *) malloc (sizeof(NMXP_DATA_PROCESS));
    memcpy(pd, a_pd, sizeof(NMXP_DATA_PROCESS));
    pd->buffer = NULL;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
71
72
73
74
75
76
    if(a_pd->nSamp *  sizeof(int) > 0) {
	pd->pDataPtr = (int *) malloc(a_pd->nSamp * sizeof(int));
	memcpy(pd->pDataPtr, a_pd->pDataPtr, a_pd->nSamp * sizeof(int));
    } else {
	pd->pDataPtr = NULL;
    }
77

78
    /* First time */
79
80
81
82
83
    if(p->last_seq_no_sent == -1) {
	p->last_seq_no_sent = pd->seq_no - 1;
	nmxp_log(0, 0, "First time.\n");
    }

84
    /* Add pd and sort array */
85
86
87
88
89
90
91
92
93
94
95
96
97
    if(p->n_pdlist + 1 >= NMXPTOOL_MAX_PDLIST_ITEMS) {
	/* Supposing p->pdlist is ordered,
	 * handle the first item and over write it.
	 */
	nmxp_log(LOG_WARN, 0, "Force handling packet %d!\n", p->pdlist[0]->seq_no);
	func_pd(p->pdlist[0]);
	p->last_seq_no_sent = (p->pdlist[0]->seq_no);
	p->pdlist[0] = pd;
    } else {
	p->pdlist[p->n_pdlist++] = pd;
    }
    qsort(p->pdlist, p->n_pdlist, sizeof(NMXP_DATA_PROCESS *), seq_no_compare);

98
    /* Print array, only for debugging */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
99
100
101
102
103
104
105
    if(p->n_pdlist > 1) {
	int y = 0;
	for(y=0; y < p->n_pdlist; y++) {
	    nmxp_log(0, 0, "%02d pkt %d\n", y, p->pdlist[y]->seq_no);
	}
    }

106
    /* Manage array and execute func_pd() */
107
108
109
110
111
    j=0;
    send_again = 1;
    while(send_again  &&  j < p->n_pdlist) {
	send_again = 0;
	seq_no_diff = p->pdlist[j]->seq_no - p->last_seq_no_sent;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
112
	nmxp_log(0, 0, "seq_no_diff=%d  j=%d  p->n_pdlist=%d (%d-%d)\n", seq_no_diff, j, p->n_pdlist, p->pdlist[j]->seq_no, p->last_seq_no_sent);
113
114
	if(seq_no_diff <= 0) {
	    // Duplicated packets: Discarded
Matteo Quintiliani's avatar
Matteo Quintiliani committed
115
	    nmxp_log(0, 0, "Packets %d discarded\n", p->pdlist[j]->seq_no);
116
117
118
119
	    send_again = 1;
	    j++;
	} else if(seq_no_diff == 1) {
	    func_pd(p->pdlist[j]);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
120
	    p->last_seq_no_sent = p->pdlist[j]->seq_no;
121
122
123
124
125
126
127
128
129
130
	    send_again = 1;
	    j++;
	} else if(seq_no_diff >= NMXPTOOL_MAX_DIFF_SEQ_NO) {
	    // I have to drop packet with sequence number p->last_seq_no_sent+1
	    nmxp_log(LOG_WARN, 0, "Give up to wait packet %d!\n", p->last_seq_no_sent+1);
	    p->last_seq_no_sent++;
	    send_again = 1;
	}
    }

131
    /* Shift and free handled elements */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
132
133
134
    if(j > 0) {
	for(k=0; k < p->n_pdlist; k++) {
	    if(k + j < p->n_pdlist) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
135
136
137
138
139
140
141
		if(p->pdlist[k]->pDataPtr) {
		    free(p->pdlist[k]->pDataPtr);
		    p->pdlist[k]->pDataPtr = NULL;
		}
		if(p->pdlist[k]) {
		    free(p->pdlist[k]);
		    p->pdlist[k] = NULL;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
142
143
144
145
		}
		p->pdlist[k] = p->pdlist[k+j];
	    } else {
		p->pdlist[k] = NULL;
146
147
	    }
	}
Matteo Quintiliani's avatar
Matteo Quintiliani committed
148
	p->n_pdlist = p->n_pdlist - j;
149
150
    }

Matteo Quintiliani's avatar
Matteo Quintiliani committed
151
    nmxp_log(0, 0, "j=%d  p->n_pdlist=%d FINAL\n", j, p->n_pdlist);
152
153
154
155

    return ret;
}

Matteo Quintiliani's avatar
Matteo Quintiliani committed
156
157
158
typedef struct {
    int significant;
    double last_time;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
159
    int32_t x_1;
160
    NMXPTOOL_PD_RAW_STREAM raw_stream_buffer;
161
162
} NMXPTOOL_CHAN_SEQ;

Matteo Quintiliani's avatar
Matteo Quintiliani committed
163

164
165
#define GAP_TOLLERANCE 0.001

166
167
int nmxptool_check_and_log_gap(double time1, double time2, const double gap_tollerance, const char *station, const char *channel) {
    int ret = 0;
168
169
    double gap = time1 - time2 ;
    if(gap > gap_tollerance) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
170
	nmxp_log(1, 0, "Gap %.2f sec. for %s.%s from %.2f to %.2f!\n", gap, station, channel, time1, time2);
171
	ret = 1;
172
    } else if (gap < -gap_tollerance) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
173
	nmxp_log(1, 0, "Overlap %.2f sec. for %s.%s from %.2f to %.2f!\n", gap, station, channel, time2, time1);
174
	ret = 1;
175
    }
176
    return ret;
177
}
Matteo Quintiliani's avatar
Matteo Quintiliani committed
178
179
180
181
182
183
184
185
186
187
188

static void clientShutdown(int sig);
static void clientDummyHandler(int sig);


/* Global variable for main program and handling terminitation program */
NMXPTOOL_PARAMS params;
int naqssock = 0;
FILE *outfile = NULL;
NMXP_CHAN_LIST *channelList = NULL;
NMXP_CHAN_LIST *channelList_subset = NULL;
189
190
NMXPTOOL_CHAN_SEQ *channelListSeq = NULL;

Matteo Quintiliani's avatar
Matteo Quintiliani committed
191
192
193
#ifdef HAVE_LIBMSEED
/* Mini-SEED variables */
NMXP_DATA_SEED data_seed;
194
MSRecord *msr_list_chan[MAX_N_CHAN];
Matteo Quintiliani's avatar
Matteo Quintiliani committed
195
196
#endif

197
198

int main (int argc, char **argv) {
199
    int32_t connection_time;
200
    int request_SOCKET_OK;
201
    int i_chan, cur_chan;
202
    int j;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
203
    int exitpdscondition;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
204
205
    int exitdapcondition;

Matteo Quintiliani's avatar
Matteo Quintiliani committed
206
    int span_interval = 10;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
207
208
    int time_to_sleep = 0;

209
210
211

    NMXP_MSG_SERVER type;
    void *buffer;
212
    int32_t length;
213
214
215
    int ret;

    char filename[500];
216
    char station_code[20], channel_code[20];
217
218
219
220
221
222
223
224

    NMXP_DATA_PROCESS *pd;

#ifdef HAVE_LIBMSEED
    /* Init mini-SEED variables */
    nmxp_data_seed_init(&data_seed);
#endif

Matteo Quintiliani's avatar
Matteo Quintiliani committed
225
226
227
228
229
230
231
232
#ifndef WIN32
    /* Signal handling, use POSIX calls with standardized semantics */
    struct sigaction sa;

    sa.sa_handler = clientDummyHandler;
    sa.sa_flags = SA_RESTART;
    sigemptyset(&sa.sa_mask);
    sigaction(SIGALRM, &sa, NULL);
233

Matteo Quintiliani's avatar
Matteo Quintiliani committed
234
235
236
237
238
239
240
241
242
243
244
245
    sa.sa_handler = clientShutdown;
    sigaction(SIGINT, &sa, NULL);
    sigaction(SIGQUIT, &sa, NULL); 
    sigaction(SIGTERM, &sa, NULL);

    sa.sa_handler = SIG_IGN;
    sigaction(SIGHUP, &sa, NULL);
    sigaction(SIGPIPE, &sa, NULL); 
#endif


    /* Default is normal output */
246
247
248
249
250
251
252
253
254
255
256
257
    nmxp_log(-1, 0);

    /* Initialize params from argument values */
    if(nmxptool_getopt_long(argc, argv, &params) != 0) {
	return 1;
    }

    /* Check consistency of params */
    if(nmxptool_check_params(&params) != 0) {
	return 1;
    }

258
259
260
261
    if(params.flag_verbose) {
	nmxp_log(-1, 2);
    }

262
263
264
    /* List available channels on server */
    if(params.flag_listchannels) {

Matteo Quintiliani's avatar
Matteo Quintiliani committed
265
266
267
268
	// TOREMOVE
	// channelList = nmxp_getAvailableChannelList(params.hostname, params.portnumberpds, NMXP_DATA_TIMESERIES);
	// TOREMOVE
	// nmxp_chan_print_channelList(channelList);
269

270
	nmxp_getMetaChannelList(params.hostname, params.portnumberdap, NMXP_DATA_TIMESERIES, params.flag_request_channelinfo);
271

272
273
274
275
276
277
278
279
280
281
282
	return 1;
    }

    /* Get list of available channels and get a subset list of params.channels */
    channelList = nmxp_getAvailableChannelList(params.hostname, params.portnumberpds, NMXP_DATA_TIMESERIES);
    channelList_subset = nmxp_chan_subset(channelList, NMXP_DATA_TIMESERIES, params.channels);

    /* Check if some channel already exists */
    if(channelList_subset->number <= 0) {
	nmxp_log(1, 0, "Channels not found!\n");
	return 1;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
283
284
    } else {
	nmxp_chan_print_channelList(channelList_subset);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
285

286
287
288
289
290
291
292
	nmxp_log(0, 1, "Init channelListSeq.\n");

	/* init channelListSeq */
	channelListSeq = (NMXPTOOL_CHAN_SEQ *) malloc(sizeof(NMXPTOOL_CHAN_SEQ) * channelList_subset->number);
	for(i_chan = 0; i_chan < channelList_subset->number; i_chan++) {
	    channelListSeq[i_chan].significant = 0;
	    channelListSeq[i_chan].last_time = 0.0;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
293
	    channelListSeq[i_chan].x_1 = 0;
294
295
296
297
298
	    channelListSeq[i_chan].raw_stream_buffer.last_seq_no_sent = -1;
	    channelListSeq[i_chan].raw_stream_buffer.n_pdlist = 0;
	    for(j=0; j<NMXPTOOL_MAX_PDLIST_ITEMS; j++) {
		channelListSeq[i_chan].raw_stream_buffer.pdlist[j] = NULL;
	    }
299
300
	}

Matteo Quintiliani's avatar
Matteo Quintiliani committed
301
#ifdef HAVE_LIBMSEED
302
303
304
	nmxp_log(0, 1, "Init mini-SEED record list.\n");

	/* Init mini-SEED record list */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
305
	for(i_chan = 0; i_chan < channelList_subset->number; i_chan++) {
306
307
308
309
310
311
312
313

	    nmxp_log(0, 1, "Init mini-SEED record for %s\n", channelList_subset->channel[i_chan].name);

	    msr_list_chan[i_chan] = msr_init(NULL);

	    /* Separate station_code and channel_code */
	    if(nmxp_chan_cpy_sta_chan(channelList_subset->channel[i_chan].name, station_code, channel_code)) {

314
		nmxp_log(0, 1, "%s.%s.%s\n", CURRENT_NETWORK, station_code, channel_code);
315

316
		strcpy(msr_list_chan[i_chan]->network, CURRENT_NETWORK);
317
318
319
320
321
322
323
324
325
326
327
		strcpy(msr_list_chan[i_chan]->station, station_code);
		strcpy(msr_list_chan[i_chan]->channel, channel_code);

		msr_list_chan[i_chan]->reclen = 512;         /* byte record length */
		msr_list_chan[i_chan]->encoding = DE_STEIM1;  /* Steim 1 compression */

	    } else {
		nmxp_log(1, 0, "Channels %s error in format!\n");
		return 1;
	    }

Matteo Quintiliani's avatar
Matteo Quintiliani committed
328
329
	}
#endif
330

331
332
333
334
335
336
337
338
    }

    /* Free the complete channel list */
    if(channelList) {
	free(channelList);
	channelList = NULL;
    }

Matteo Quintiliani's avatar
Matteo Quintiliani committed
339
340
    nmxp_log(0, 1, "Starting comunication.\n");

341
    /* TODO condition starting DAP or PDS */
342
343
344
    if( (params.start_time != 0   &&   params.end_time != 0)
	    || params.delay > 0
	    ) {
345

Matteo Quintiliani's avatar
Matteo Quintiliani committed
346
	if(params.delay > 0) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
347
	    params.start_time = ((time(NULL) - params.delay - span_interval) / 10) * 10;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
348
349
350
	    params.end_time = params.start_time + span_interval;
	}

351

352
353
354
	/* ************************************************************** */
	/* Start subscription protocol "DATA ACCESS PROTOCOL" version 1.0 */
	/* ************************************************************** */
355

356
357
358
359
360
	/* DAP Step 1: Open a socket */
	if( (naqssock = nmxp_openSocket(params.hostname, params.portnumberdap)) == NMXP_SOCKET_ERROR) {
	    nmxp_log(1, 0, "Error opening socket!\n");
	    return 1;
	}
361

362
363
364
365
366
	/* DAP Step 2: Read connection time */
	if(nmxp_readConnectionTime(naqssock, &connection_time) != NMXP_SOCKET_OK) {
	    nmxp_log(1, 0, "Error reading connection time from server!\n");
	    return 1;
	}
367

368
369
370
371
372
373
374
375
376
377
378
	/* DAP Step 3: Send a ConnectRequest */
	if(nmxp_sendConnectRequest(naqssock, params.datas_username, params.datas_password, connection_time) != NMXP_SOCKET_OK) {
	    nmxp_log(1, 0, "Error sending connect request!\n");
	    return 1;
	}

	/* DAP Step 4: Wait for a Ready message */
	if(nmxp_waitReady(naqssock) != NMXP_SOCKET_OK) {
	    nmxp_log(1, 0, "Error waiting Ready message!\n");
	    return 1;
	}
379

380
381
382
383
384
385
	exitdapcondition = 1;

	while(exitdapcondition) {

	nmxp_log(0, 1, "start_time = %d - end_time = %d\n", params.start_time, params.end_time);

386
387
388
	/* Start loop for sending requests */
	i_chan=0;
	request_SOCKET_OK = NMXP_SOCKET_OK;
389

390
	while(request_SOCKET_OK == NMXP_SOCKET_OK  &&  i_chan < channelList_subset->number) {
391

392
	    /* DAP Step 5: Send Data Request */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
393
	    request_SOCKET_OK = nmxp_sendDataRequest(naqssock, channelList_subset->channel[i_chan].key, (double) params.start_time, (double) params.end_time);
394

395
	    if(request_SOCKET_OK == NMXP_SOCKET_OK) {
396

397
398
		if(params.flag_writefile) {
		    /* Open output file */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
399
		    sprintf(filename, "%s.%s.%d.%d.%d.nmx",
400
			    CURRENT_NETWORK,
Matteo Quintiliani's avatar
Matteo Quintiliani committed
401
402
403
			    channelList_subset->channel[i_chan].name,
			    channelList_subset->channel[i_chan].key,
			    params.start_time, params.end_time);
404

405
406
407
408
		    outfile = fopen(filename, "w");
		    if(!outfile) {
			nmxp_log(1, 0, "Can not to open file %s!", filename);
		    }
409
410
411
		}

#ifdef HAVE_LIBMSEED
412
413
		if(params.flag_writeseed) {
		    /* Open output Mini-SEED file */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
414
		    sprintf(data_seed.filename_mseed, "%s.%s.%d.%d.%d.miniseed",
415
			    CURRENT_NETWORK,
Matteo Quintiliani's avatar
Matteo Quintiliani committed
416
417
418
419
			    channelList_subset->channel[i_chan].name,
			    channelList_subset->channel[i_chan].key,
			    params.start_time,
			    params.end_time);
420

421
422
423
424
425
426
		    data_seed.outfile_mseed = fopen(data_seed.filename_mseed, "w");
		    if(!data_seed.outfile_mseed) {
			nmxp_log(1, 0, "Can not to open file %s!", data_seed.filename_mseed);
		    }
		}
#endif
427

428
429
430
		if(params.flag_writefile  &&  outfile) {
		    /* Compute SNCL line */

431
432
433
434
435
		    /* Separate station_code_old_way and channel_code_old_way */
		    if(nmxp_chan_cpy_sta_chan(channelList_subset->channel[i_chan].name, station_code, channel_code)) {
			/* Write SNCL line */
			fprintf(outfile, "%s.%s.%s.%s\n",
				station_code,
436
				CURRENT_NETWORK,
437
438
				channel_code,
				(params.location)? params.location : "");
439
		    }
440
441
442

		}

443
444
445
		/* DAP Step 6: Receive Data until receiving a Ready message */
		ret = nmxp_receiveMessage(naqssock, &type, &buffer, &length);
		nmxp_log(0, 1, "ret = %d, type = %d\n", ret, type);
446

447
		while(ret == NMXP_SOCKET_OK   &&    type != NMXP_MSG_READY) {
448

449
		    /* Process a packet and return value in NMXP_DATA_PROCESS structure */
450
		    pd = nmxp_processCompressedData(buffer, length, channelList_subset, CURRENT_NETWORK);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
451
		    nmxp_data_trim(pd, params.start_time, params.end_time, 0);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
452
453

		    /* Log contents of last packet */
454
455
456
		    if(params.flag_logdata) {
			nmxp_data_log(pd);
		    }
457

458
459
		    /* Management of gaps */
		    cur_chan = nmxp_chan_lookupKeyIndex(pd->key, channelList_subset);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
460
		    if(!channelListSeq[cur_chan].significant && pd->nSamp > 0) {
461
462
			channelListSeq[cur_chan].significant = 1;
		    } else {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
463
			if(channelListSeq[cur_chan].significant) {
464
465
466
467
			    if(nmxptool_check_and_log_gap(pd->time, channelListSeq[cur_chan].last_time, GAP_TOLLERANCE, pd->station, pd->channel)) {
				channelListSeq[cur_chan].x_1 = 0;
				nmxp_log(0, 0, "Warning: x0 set to zero!\n");
			    }
Matteo Quintiliani's avatar
Matteo Quintiliani committed
468
469
			}
		    }
470
		    if(channelListSeq[cur_chan].significant && pd->nSamp > 0) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
471
			channelListSeq[cur_chan].last_time = pd->time + ((double) pd->nSamp / (double) pd->sampRate);
472
473
		    }

474
#ifdef HAVE_LIBMSEED
475
476
		    /* Write Mini-SEED record */
		    if(params.flag_writeseed) {
477
478
			if( (cur_chan = nmxp_chan_lookupKeyIndex(pd->key, channelList_subset)) != -1) {

Matteo Quintiliani's avatar
Matteo Quintiliani committed
479
			    nmxp_data_msr_pack(pd, &data_seed, msr_list_chan[cur_chan], channelListSeq[cur_chan].x_1);
480
481
482
483

			} else {
			    nmxp_log(1, 0, "Key %d not found in channelList_subset!\n", pd->key);
			}
484
		    }
485
#endif
Matteo Quintiliani's avatar
Matteo Quintiliani committed
486
487
488
		    if(pd->nSamp > 0) {
			channelListSeq[cur_chan].x_1 = pd->pDataPtr[pd->nSamp-1];
		    }
489

490
#ifdef HAVE___SRC_SEEDLINK_PLUGIN_C
491
		    /* Send data to SeedLink Server */
492
		    if(params.flag_slink) {
493
494
495
			/* TODO Set values */
			const int usec_correction = 0;
			const int timing_quality = 100;
496

497
498
499
			send_raw_depoch(pd->station, pd->channel, pd->time, usec_correction, timing_quality,
				pd->pDataPtr, pd->nSamp);
		    }
500
501
#endif

502
503
504
		    if(params.flag_writefile  &&  outfile) {
			/* Write buffer to the output file */
			if(outfile && buffer && length > 0) {
505
			    int32_t length_int = length;
506
507
508
509
510
			    nmxp_data_swap_4b((int32_t *) &length_int);
			    fwrite(&length_int, sizeof(length_int), 1, outfile);
			    fwrite(buffer, length, 1, outfile);
			}
		    }
511

Matteo Quintiliani's avatar
Matteo Quintiliani committed
512
513
514
		    if(pd->buffer) {
			free(pd->buffer);
			pd->buffer = NULL;
515
		    }
516
517
518
519

		    /* Receive Data */
		    ret = nmxp_receiveMessage(naqssock, &type, &buffer, &length);
		    nmxp_log(0, 1, "ret = %d, type = %d\n", ret, type);
520
521
		}

522
523
524
		if(params.flag_writefile  &&  outfile) {
		    /* Close output file */
		    fclose(outfile);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
525
		    outfile = NULL;
526
527
		}

528
529
530
531
#ifdef HAVE_LIBMSEED
		if(params.flag_writeseed  &&  data_seed.outfile_mseed) {
		    /* Close output Mini-SEED file */
		    fclose(data_seed.outfile_mseed);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
532
		    data_seed.outfile_mseed = NULL;
533
534
		}
#endif
535
536

	    }
537
538
539
	    i_chan++;
	}
	/* DAP Step 7: Repeat steps 5 and 6 for each data request */
540

Matteo Quintiliani's avatar
Matteo Quintiliani committed
541
	if(params.delay > 0) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
542
543
544
545
546
547
548
	    time_to_sleep = (params.end_time - params.start_time) - (time(NULL) - (params.start_time + params.delay + span_interval));
	    if(time_to_sleep >= 0) {
		sleep(time_to_sleep);
	    } else {
		nmxp_log(1, 0, "time to sleep %dsec.\n", time_to_sleep);
		sleep(3);
	    }
Matteo Quintiliani's avatar
Matteo Quintiliani committed
549
550
551
552
553
554
	    params.start_time = params.end_time;
	    params.end_time = params.start_time + span_interval;
	} else {
	    exitdapcondition = 0;
	}

555

Matteo Quintiliani's avatar
Matteo Quintiliani committed
556
    } /* END while(exitdapcondition) */
557

558
559
560
561
562
563
564
565
566
567
568
	/* DAP Step 8: Send a Terminate message (optional) */
	nmxp_sendTerminateSubscription(naqssock, NMXP_SHUTDOWN_NORMAL, "Bye!");

	/* DAP Step 9: Close the socket */
	nmxp_closeSocket(naqssock);

	/* ************************************************************ */
	/* End subscription protocol "DATA ACCESS PROTOCOL" version 1.0 */
	/* ************************************************************ */


569
    } else {
570

571
572
573
574




575
576
577
578
579
580
581
582
583
	/* ************************************************************* */
	/* Start subscription protocol "PRIVATE DATA STREAM" version 1.4 */
	/* ************************************************************* */

	/* PDS Step 1: Open a socket */
	naqssock = nmxp_openSocket(params.hostname, params.portnumberpds);

	if(naqssock == NMXP_SOCKET_ERROR) {
	    return 1;
584
585
	}

586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
	/* PDS Step 2: Send a Connect */
	if(nmxp_sendConnect(naqssock) != NMXP_SOCKET_OK) {
	    printf("Error on sendConnect()\n");
	    return 1;
	}

	/* PDS Step 3: Receive ChannelList */
	if(nmxp_receiveChannelList(naqssock, &channelList) != NMXP_SOCKET_OK) {
	    printf("Error on receiveChannelList()\n");
	    return 1;
	}

	/* Get a subset of channel from arguments */
	channelList_subset = nmxp_chan_subset(channelList, NMXP_DATA_TIMESERIES, params.channels);


	/* PDS Step 4: Send a Request Pending (optional) */
603
604


605
606
607
608
609
610
	/* PDS Step 5: Send AddChannels */
	/* Request Data */
	nmxp_sendAddTimeSeriesChannel(naqssock, channelList_subset, params.stc, params.rate, (params.flag_buffered)? NMXP_BUFFER_YES : NMXP_BUFFER_NO);

	/* PDS Step 6: Repeat until finished: receive and handle packets */

Matteo Quintiliani's avatar
Matteo Quintiliani committed
611
612
613
614
#ifdef HAVE_LIBMSEED
	if(params.flag_writeseed) {
	    /* Open output Mini-SEED file */
	    sprintf(data_seed.filename_mseed, "%s.realtime.miniseed",
615
		    CURRENT_NETWORK);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
616
617
618
619
620
621
622
623
624
625

	    data_seed.outfile_mseed = fopen(data_seed.filename_mseed, "w");
	    if(!data_seed.outfile_mseed) {
		nmxp_log(1, 0, "Can not to open file %s!", data_seed.filename_mseed);
	    } else {
		nmxp_log(0, 1, "Opened file %s!\n", data_seed.filename_mseed);
	    }
	}
#endif

Matteo Quintiliani's avatar
Matteo Quintiliani committed
626
627
628
629
	// TODO
	exitpdscondition = 1;

	while(exitpdscondition) {
630
	    /* Process Compressed or Decompressed Data */
631
	    pd = nmxp_receiveData(naqssock, channelList_subset, CURRENT_NETWORK);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
632
633

	    /* Log contents of last packet */
634
635
636
	    if(params.flag_logdata) {
		nmxp_data_log(pd);
	    }
Matteo Quintiliani's avatar
Matteo Quintiliani committed
637

638
639
	    /* Management of gaps */
	    cur_chan = nmxp_chan_lookupKeyIndex(pd->key, channelList_subset);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
640
	    if(!channelListSeq[cur_chan].significant && pd->nSamp > 0) {
641
642
		channelListSeq[cur_chan].significant = 1;
	    } else {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
643
		if(channelListSeq[cur_chan].significant) {
644
645
646
647
		    if(nmxptool_check_and_log_gap(pd->time, channelListSeq[cur_chan].last_time, GAP_TOLLERANCE, pd->station, pd->channel)) {
			channelListSeq[cur_chan].x_1 = 0;
			nmxp_log(0, 0, "Warning: x0 set to zero!\n");
		    }
Matteo Quintiliani's avatar
Matteo Quintiliani committed
648
649
		}
	    }
650
	    if(channelListSeq[cur_chan].significant && pd->nSamp > 0) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
651
		channelListSeq[cur_chan].last_time = pd->time + ((double) pd->nSamp / (double) pd->sampRate);
652
653
	    }

654
	    nmxptool_add_and_do_ordered(&(channelListSeq[cur_chan].raw_stream_buffer), pd, nmxp_data_log);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
655
656
657
658

#ifdef HAVE_LIBMSEED
	    /* Write Mini-SEED record */
	    if(params.flag_writeseed) {
659
		if( (cur_chan = nmxp_chan_lookupKeyIndex(pd->key, channelList_subset)) != -1) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
660

Matteo Quintiliani's avatar
Matteo Quintiliani committed
661
		    nmxp_data_msr_pack(pd, &data_seed, msr_list_chan[cur_chan], channelListSeq[cur_chan].x_1);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
662
663
664
665
666
667

		} else {
		    nmxp_log(1, 0, "Key %d not found in channelList_subset!\n", pd->key);
		}
	    }
#endif
Matteo Quintiliani's avatar
Matteo Quintiliani committed
668
669
670
	    if(pd->nSamp > 0) {
		channelListSeq[cur_chan].x_1 = pd->pDataPtr[pd->nSamp-1];
	    }
Matteo Quintiliani's avatar
Matteo Quintiliani committed
671

672
673
#ifdef HAVE___SRC_SEEDLINK_PLUGIN_C
		    /* Send data to SeedLink Server */
674
		    if(params.flag_slink) {
675
676
677
678
679
680
681
682
683
			/* TODO Set values */
			const int usec_correction = 0;
			const int timing_quality = 100;

			send_raw_depoch(pd->station, pd->channel, pd->time, usec_correction, timing_quality,
				pd->pDataPtr, pd->nSamp);
		    }
#endif

Matteo Quintiliani's avatar
Matteo Quintiliani committed
684
685
686
687
688
	    if(pd->buffer) {
		free(pd->buffer);
		pd->buffer = NULL;
	    }

Matteo Quintiliani's avatar
Matteo Quintiliani committed
689
690
	    // TODO
	    exitpdscondition = 1;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
691
692
693
694
695
696
697
698
699
	}

#ifdef HAVE_LIBMSEED
	if(params.flag_writeseed  &&  data_seed.outfile_mseed) {
	    /* Close output Mini-SEED file */
	    fclose(data_seed.outfile_mseed);
	}
#endif

700
701
702
703
704
705
706
707
708
709
710

	/* PDS Step 7: Send Terminate Subscription */
	nmxp_sendTerminateSubscription(naqssock, NMXP_SHUTDOWN_NORMAL, "Good Bye!");

	/* PDS Step 8: Close the socket */
	nmxp_closeSocket(naqssock);

	/* *********************************************************** */
	/* End subscription protocol "PRIVATE DATA STREAM" version 1.4 */
	/* *********************************************************** */

711
712
713



714
    }
715

716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
#ifdef HAVE_LIBMSEED
	if(*msr_list_chan) {
	    for(i_chan = 0; i_chan < channelList_subset->number; i_chan++) {
		if(msr_list_chan[i_chan]) {
		    msr_free(&(msr_list_chan[i_chan]));
		}
	    }
	}
#endif

	if(channelListSeq) {
	    free(channelListSeq);
	}

	/* This has to be tha last */
	if(channelList_subset) {
	    free(channelList_subset);
	}

735
736

    return 0;
737
} /* End MAIN */
738
739
740
741
742





Matteo Quintiliani's avatar
Matteo Quintiliani committed
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
/* Do any needed cleanup and exit */
static void clientShutdown(int sig) {
    nmxp_log(0, 0, "Program interrupted!\n");

    if(params.flag_writefile  &&  outfile) {
	/* Close output file */
	fclose(outfile);
    }

#ifdef HAVE_LIBMSEED
    if(params.flag_writeseed  &&  data_seed.outfile_mseed) {
	/* Close output Mini-SEED file */
	fclose(data_seed.outfile_mseed);
    }
#endif

759

Matteo Quintiliani's avatar
Matteo Quintiliani committed
760
761
762
763
764
765
    /* PDS Step 7: Send Terminate Subscription */
    nmxp_sendTerminateSubscription(naqssock, NMXP_SHUTDOWN_NORMAL, "Good Bye!");

    /* PDS Step 8: Close the socket */
    nmxp_closeSocket(naqssock);

766

Matteo Quintiliani's avatar
Matteo Quintiliani committed
767
768
769
    if(channelList == NULL) {
	free(channelList);
    }
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786

#ifdef HAVE_LIBMSEED
    int i_chan;
    if(*msr_list_chan) {
	for(i_chan = 0; i_chan < channelList_subset->number; i_chan++) {
	    if(msr_list_chan[i_chan]) {
		msr_free(&(msr_list_chan[i_chan]));
	    }
	}
    }
#endif

    if(channelListSeq) {
	free(channelListSeq);
    }

    /* This has to be the last */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
787
788
789
790
791
792
    if(channelList_subset == NULL) {
	free(channelList_subset);
    }

    exit( sig );
} /* End of clientShutdown() */
793

Matteo Quintiliani's avatar
Matteo Quintiliani committed
794
795
796
797

/* Empty signal handler routine */
static void clientDummyHandler(int sig) {
}