nmxptool.c 15.6 KB
Newer Older
1
2
3
4
5
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <nmxp.h>
6
#include "config.h"
7
8
#include "nmxptool_getoptlong.h"

9
10
11
#ifdef HAVE_LIBMSEED
#include <libmseed.h>
#endif
12

13
14
#ifdef HAVE___SRC_SEEDLINK_PLUGIN_H
#include "seedlink_plugin.h"
15
16
#endif

17

Matteo Quintiliani's avatar
Matteo Quintiliani committed
18
19
20
typedef struct {
    int significant;
    double last_time;
21
22
} NMXPTOOL_CHAN_SEQ;

Matteo Quintiliani's avatar
Matteo Quintiliani committed
23

24
25
26
27
28
29
30
31
32
33
#define GAP_TOLLERANCE 0.001

void nmxptool_check_and_log_gap(double time1, double time2, const double gap_tollerance, const char *station, const char *channel) {
    double gap = time1 - time2 ;
    if(gap > gap_tollerance) {
	nmxp_log(1, 0, "Gap %.2f sec. for %s.%s from %d to %d!\n", gap, station, channel, time1, time2);
    } else if (gap < -gap_tollerance) {
	nmxp_log(1, 0, "Overlap %.2f sec. for %s.%s from %d to %d!\n", gap, station, channel, time2, time1);
    }
}
Matteo Quintiliani's avatar
Matteo Quintiliani committed
34
35
36
37
38
39
40
41
42
43
44

static void clientShutdown(int sig);
static void clientDummyHandler(int sig);


/* Global variable for main program and handling terminitation program */
NMXPTOOL_PARAMS params;
int naqssock = 0;
FILE *outfile = NULL;
NMXP_CHAN_LIST *channelList = NULL;
NMXP_CHAN_LIST *channelList_subset = NULL;
45
46
NMXPTOOL_CHAN_SEQ *channelListSeq = NULL;

Matteo Quintiliani's avatar
Matteo Quintiliani committed
47
48
49
#ifdef HAVE_LIBMSEED
/* Mini-SEED variables */
NMXP_DATA_SEED data_seed;
50
MSRecord *msr_list_chan[MAX_N_CHAN];
Matteo Quintiliani's avatar
Matteo Quintiliani committed
51
52
#endif

53
54
55
56

int main (int argc, char **argv) {
    uint32_t connection_time;
    int request_SOCKET_OK;
57
    int i_chan, cur_chan;
58
59
60
61
62
63
64

    NMXP_MSG_SERVER type;
    void *buffer;
    uint32_t length;
    int ret;

    char filename[500];
65
    char station_code[20], channel_code[20];
66
67
68
69
70
71
72
73

    NMXP_DATA_PROCESS *pd;

#ifdef HAVE_LIBMSEED
    /* Init mini-SEED variables */
    nmxp_data_seed_init(&data_seed);
#endif

Matteo Quintiliani's avatar
Matteo Quintiliani committed
74
75
76
77
78
79
80
81
#ifndef WIN32
    /* Signal handling, use POSIX calls with standardized semantics */
    struct sigaction sa;

    sa.sa_handler = clientDummyHandler;
    sa.sa_flags = SA_RESTART;
    sigemptyset(&sa.sa_mask);
    sigaction(SIGALRM, &sa, NULL);
82

Matteo Quintiliani's avatar
Matteo Quintiliani committed
83
84
85
86
87
88
89
90
91
92
93
94
    sa.sa_handler = clientShutdown;
    sigaction(SIGINT, &sa, NULL);
    sigaction(SIGQUIT, &sa, NULL); 
    sigaction(SIGTERM, &sa, NULL);

    sa.sa_handler = SIG_IGN;
    sigaction(SIGHUP, &sa, NULL);
    sigaction(SIGPIPE, &sa, NULL); 
#endif


    /* Default is normal output */
95
96
97
98
99
100
101
102
103
104
105
106
    nmxp_log(-1, 0);

    /* Initialize params from argument values */
    if(nmxptool_getopt_long(argc, argv, &params) != 0) {
	return 1;
    }

    /* Check consistency of params */
    if(nmxptool_check_params(&params) != 0) {
	return 1;
    }

107
108
109
110
    if(params.flag_verbose) {
	nmxp_log(-1, 2);
    }

111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
    /* List available channels on server */
    if(params.flag_listchannels) {

	channelList = nmxp_getAvailableChannelList(params.hostname, params.portnumberpds, NMXP_DATA_TIMESERIES);
	nmxp_chan_print_channelList(channelList);

	return 1;
    }

    /* Get list of available channels and get a subset list of params.channels */
    channelList = nmxp_getAvailableChannelList(params.hostname, params.portnumberpds, NMXP_DATA_TIMESERIES);
    channelList_subset = nmxp_chan_subset(channelList, NMXP_DATA_TIMESERIES, params.channels);

    /* Check if some channel already exists */
    if(channelList_subset->number <= 0) {
	nmxp_log(1, 0, "Channels not found!\n");
	return 1;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
128
129
    } else {
	nmxp_chan_print_channelList(channelList_subset);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
130

131
132
133
134
135
136
137
138
139
	nmxp_log(0, 1, "Init channelListSeq.\n");

	/* init channelListSeq */
	channelListSeq = (NMXPTOOL_CHAN_SEQ *) malloc(sizeof(NMXPTOOL_CHAN_SEQ) * channelList_subset->number);
	for(i_chan = 0; i_chan < channelList_subset->number; i_chan++) {
	    channelListSeq[i_chan].significant = 0;
	    channelListSeq[i_chan].last_time = 0.0;
	}

Matteo Quintiliani's avatar
Matteo Quintiliani committed
140
#ifdef HAVE_LIBMSEED
141
142
143
	nmxp_log(0, 1, "Init mini-SEED record list.\n");

	/* Init mini-SEED record list */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
144
	for(i_chan = 0; i_chan < channelList_subset->number; i_chan++) {
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166

	    nmxp_log(0, 1, "Init mini-SEED record for %s\n", channelList_subset->channel[i_chan].name);

	    msr_list_chan[i_chan] = msr_init(NULL);

	    /* Separate station_code and channel_code */
	    if(nmxp_chan_cpy_sta_chan(channelList_subset->channel[i_chan].name, station_code, channel_code)) {

		nmxp_log(0, 1, "%s.%s.%s\n", (params.network)? params.network : DEFAULT_NETWORK, station_code, channel_code);

		strcpy(msr_list_chan[i_chan]->network, (params.network)? params.network : DEFAULT_NETWORK);
		strcpy(msr_list_chan[i_chan]->station, station_code);
		strcpy(msr_list_chan[i_chan]->channel, channel_code);

		msr_list_chan[i_chan]->reclen = 512;         /* byte record length */
		msr_list_chan[i_chan]->encoding = DE_STEIM1;  /* Steim 1 compression */

	    } else {
		nmxp_log(1, 0, "Channels %s error in format!\n");
		return 1;
	    }

Matteo Quintiliani's avatar
Matteo Quintiliani committed
167
168
	}
#endif
169

170
171
172
173
174
175
176
177
    }

    /* Free the complete channel list */
    if(channelList) {
	free(channelList);
	channelList = NULL;
    }

Matteo Quintiliani's avatar
Matteo Quintiliani committed
178
179
    nmxp_log(0, 1, "Starting comunication.\n");

180

181
    /* TODO condition starting DAP or PDS */
182
    if(params.start_time != 0   &&   params.end_time != 0) {
183

184
185
186



187
188
189
	/* ************************************************************** */
	/* Start subscription protocol "DATA ACCESS PROTOCOL" version 1.0 */
	/* ************************************************************** */
190

191
192
193
194
195
	/* DAP Step 1: Open a socket */
	if( (naqssock = nmxp_openSocket(params.hostname, params.portnumberdap)) == NMXP_SOCKET_ERROR) {
	    nmxp_log(1, 0, "Error opening socket!\n");
	    return 1;
	}
196

197
198
199
200
201
	/* DAP Step 2: Read connection time */
	if(nmxp_readConnectionTime(naqssock, &connection_time) != NMXP_SOCKET_OK) {
	    nmxp_log(1, 0, "Error reading connection time from server!\n");
	    return 1;
	}
202

203
204
205
206
207
208
209
210
211
212
213
	/* DAP Step 3: Send a ConnectRequest */
	if(nmxp_sendConnectRequest(naqssock, params.datas_username, params.datas_password, connection_time) != NMXP_SOCKET_OK) {
	    nmxp_log(1, 0, "Error sending connect request!\n");
	    return 1;
	}

	/* DAP Step 4: Wait for a Ready message */
	if(nmxp_waitReady(naqssock) != NMXP_SOCKET_OK) {
	    nmxp_log(1, 0, "Error waiting Ready message!\n");
	    return 1;
	}
214

215
216
217
	/* Start loop for sending requests */
	i_chan=0;
	request_SOCKET_OK = NMXP_SOCKET_OK;
218

219
	while(request_SOCKET_OK == NMXP_SOCKET_OK  &&  i_chan < channelList_subset->number) {
220

221
222
	    /* DAP Step 5: Send Data Request */
	    request_SOCKET_OK = nmxp_sendDataRequest(naqssock, channelList_subset->channel[i_chan].key, params.start_time, params.end_time);
223

224
	    if(request_SOCKET_OK == NMXP_SOCKET_OK) {
225

226
227
		if(params.flag_writefile) {
		    /* Open output file */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
228
229
230
231
232
		    sprintf(filename, "%s.%s.%d.%d.%d.nmx",
			    (params.network)? params.network : DEFAULT_NETWORK,
			    channelList_subset->channel[i_chan].name,
			    channelList_subset->channel[i_chan].key,
			    params.start_time, params.end_time);
233

234
235
236
237
		    outfile = fopen(filename, "w");
		    if(!outfile) {
			nmxp_log(1, 0, "Can not to open file %s!", filename);
		    }
238
239
240
		}

#ifdef HAVE_LIBMSEED
241
242
		if(params.flag_writeseed) {
		    /* Open output Mini-SEED file */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
243
244
245
246
247
248
		    sprintf(data_seed.filename_mseed, "%s.%s.%d.%d.%d.miniseed",
			    (params.network)? params.network : DEFAULT_NETWORK,
			    channelList_subset->channel[i_chan].name,
			    channelList_subset->channel[i_chan].key,
			    params.start_time,
			    params.end_time);
249

250
251
252
253
254
255
		    data_seed.outfile_mseed = fopen(data_seed.filename_mseed, "w");
		    if(!data_seed.outfile_mseed) {
			nmxp_log(1, 0, "Can not to open file %s!", data_seed.filename_mseed);
		    }
		}
#endif
256

257
258
259
		if(params.flag_writefile  &&  outfile) {
		    /* Compute SNCL line */

260
261
262
263
264
265
266
267
		    /* Separate station_code_old_way and channel_code_old_way */
		    if(nmxp_chan_cpy_sta_chan(channelList_subset->channel[i_chan].name, station_code, channel_code)) {
			/* Write SNCL line */
			fprintf(outfile, "%s.%s.%s.%s\n",
				station_code,
				(params.network)? params.network : DEFAULT_NETWORK,
				channel_code,
				(params.location)? params.location : "");
268
		    }
269
270
271

		}

272
273
274
		/* DAP Step 6: Receive Data until receiving a Ready message */
		ret = nmxp_receiveMessage(naqssock, &type, &buffer, &length);
		nmxp_log(0, 1, "ret = %d, type = %d\n", ret, type);
275

276
		while(ret == NMXP_SOCKET_OK   &&    type != NMXP_MSG_READY) {
277

278
		    /* Process a packet and return value in NMXP_DATA_PROCESS structure */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
279
280
281
		    pd = nmxp_processCompressedData(buffer, length, channelList_subset);

		    /* Log contents of last packet */
282
283
284
		    if(params.flag_logdata) {
			nmxp_data_log(pd);
		    }
285

286
287
288
289
290
291
292
293
294
		    /* Management of gaps */
		    cur_chan = nmxp_chan_lookupKeyIndex(pd->key, channelList_subset);
		    if(!channelListSeq[cur_chan].significant) {
			channelListSeq[cur_chan].significant = 1;
		    } else {
			nmxptool_check_and_log_gap(pd->time, channelListSeq[cur_chan].last_time, GAP_TOLLERANCE, pd->station, pd->channel);
		    }
		    channelListSeq[cur_chan].last_time = pd->time + ((double) pd->nSamp / (double) pd->sampRate);

295
#ifdef HAVE_LIBMSEED
296
297
		    /* Write Mini-SEED record */
		    if(params.flag_writeseed) {
298
299
300
301
302
303
304
			if( (cur_chan = nmxp_chan_lookupKeyIndex(pd->key, channelList_subset)) != -1) {

			    nmxp_data_msr_pack(pd, &data_seed, msr_list_chan[cur_chan]);

			} else {
			    nmxp_log(1, 0, "Key %d not found in channelList_subset!\n", pd->key);
			}
305
		    }
306
307
#endif

308
#ifdef HAVE___SRC_SEEDLINK_PLUGIN_C
309
310
311
312
313
		    /* Send data to SeedLink Server */
		    if(params.flag_writeseedlink) {
			/* TODO Set values */
			const int usec_correction = 0;
			const int timing_quality = 100;
314

315
316
317
			send_raw_depoch(pd->station, pd->channel, pd->time, usec_correction, timing_quality,
				pd->pDataPtr, pd->nSamp);
		    }
318
319
#endif

320
321
322
323
324
325
326
327
328
		    if(params.flag_writefile  &&  outfile) {
			/* Write buffer to the output file */
			if(outfile && buffer && length > 0) {
			    int length_int = length;
			    nmxp_data_swap_4b((int32_t *) &length_int);
			    fwrite(&length_int, sizeof(length_int), 1, outfile);
			    fwrite(buffer, length, 1, outfile);
			}
		    }
329

Matteo Quintiliani's avatar
Matteo Quintiliani committed
330
331
332
		    if(pd->buffer) {
			free(pd->buffer);
			pd->buffer = NULL;
333
		    }
334
335
336
337

		    /* Receive Data */
		    ret = nmxp_receiveMessage(naqssock, &type, &buffer, &length);
		    nmxp_log(0, 1, "ret = %d, type = %d\n", ret, type);
338
339
		}

340
341
342
		if(params.flag_writefile  &&  outfile) {
		    /* Close output file */
		    fclose(outfile);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
343
		    outfile = NULL;
344
345
		}

346
347
348
349
#ifdef HAVE_LIBMSEED
		if(params.flag_writeseed  &&  data_seed.outfile_mseed) {
		    /* Close output Mini-SEED file */
		    fclose(data_seed.outfile_mseed);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
350
		    data_seed.outfile_mseed = NULL;
351
352
		}
#endif
353
354

	    }
355
356
357
	    i_chan++;
	}
	/* DAP Step 7: Repeat steps 5 and 6 for each data request */
358

359
360
361
362
363
364
365
366
367
368
	/* DAP Step 8: Send a Terminate message (optional) */
	nmxp_sendTerminateSubscription(naqssock, NMXP_SHUTDOWN_NORMAL, "Bye!");

	/* DAP Step 9: Close the socket */
	nmxp_closeSocket(naqssock);

	/* ************************************************************ */
	/* End subscription protocol "DATA ACCESS PROTOCOL" version 1.0 */
	/* ************************************************************ */

369
370
371



372
    } else {
373

374
375
376
377




378
379
380
381
382
383
384
385
386
	/* ************************************************************* */
	/* Start subscription protocol "PRIVATE DATA STREAM" version 1.4 */
	/* ************************************************************* */

	/* PDS Step 1: Open a socket */
	naqssock = nmxp_openSocket(params.hostname, params.portnumberpds);

	if(naqssock == NMXP_SOCKET_ERROR) {
	    return 1;
387
388
	}

389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
	/* PDS Step 2: Send a Connect */
	if(nmxp_sendConnect(naqssock) != NMXP_SOCKET_OK) {
	    printf("Error on sendConnect()\n");
	    return 1;
	}

	/* PDS Step 3: Receive ChannelList */
	if(nmxp_receiveChannelList(naqssock, &channelList) != NMXP_SOCKET_OK) {
	    printf("Error on receiveChannelList()\n");
	    return 1;
	}

	/* Get a subset of channel from arguments */
	channelList_subset = nmxp_chan_subset(channelList, NMXP_DATA_TIMESERIES, params.channels);


	/* PDS Step 4: Send a Request Pending (optional) */
406
407


408
409
410
411
412
413
	/* PDS Step 5: Send AddChannels */
	/* Request Data */
	nmxp_sendAddTimeSeriesChannel(naqssock, channelList_subset, params.stc, params.rate, (params.flag_buffered)? NMXP_BUFFER_YES : NMXP_BUFFER_NO);

	/* PDS Step 6: Repeat until finished: receive and handle packets */

Matteo Quintiliani's avatar
Matteo Quintiliani committed
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
#ifdef HAVE_LIBMSEED
	if(params.flag_writeseed) {
	    /* Open output Mini-SEED file */
	    sprintf(data_seed.filename_mseed, "%s.realtime.miniseed",
		    (params.network)? params.network : DEFAULT_NETWORK);

	    data_seed.outfile_mseed = fopen(data_seed.filename_mseed, "w");
	    if(!data_seed.outfile_mseed) {
		nmxp_log(1, 0, "Can not to open file %s!", data_seed.filename_mseed);
	    } else {
		nmxp_log(0, 1, "Opened file %s!\n", data_seed.filename_mseed);
	    }
	}
#endif

429
430
	while(1) {
	    /* Process Compressed or Decompressed Data */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
431
432
433
	    pd = nmxp_receiveData(naqssock, channelList_subset);

	    /* Log contents of last packet */
434
435
436
	    if(params.flag_logdata) {
		nmxp_data_log(pd);
	    }
Matteo Quintiliani's avatar
Matteo Quintiliani committed
437

438
439
440
441
442
443
444
445
446
	    /* Management of gaps */
	    cur_chan = nmxp_chan_lookupKeyIndex(pd->key, channelList_subset);
	    if(!channelListSeq[cur_chan].significant) {
		channelListSeq[cur_chan].significant = 1;
	    } else {
		nmxptool_check_and_log_gap(pd->time, channelListSeq[cur_chan].last_time, GAP_TOLLERANCE, pd->station, pd->channel);
	    }
	    channelListSeq[cur_chan].last_time = pd->time + ((double) pd->nSamp / (double) pd->sampRate);

Matteo Quintiliani's avatar
Matteo Quintiliani committed
447
448
449
450

#ifdef HAVE_LIBMSEED
	    /* Write Mini-SEED record */
	    if(params.flag_writeseed) {
451
		if( (cur_chan = nmxp_chan_lookupKeyIndex(pd->key, channelList_subset)) != -1) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
452

453
		    nmxp_data_msr_pack(pd, &data_seed, msr_list_chan[cur_chan]);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
454
455
456
457
458
459
460

		} else {
		    nmxp_log(1, 0, "Key %d not found in channelList_subset!\n", pd->key);
		}
	    }
#endif

461
462
463
464
465
466
467
468
469
470
471
472
#ifdef HAVE___SRC_SEEDLINK_PLUGIN_C
		    /* Send data to SeedLink Server */
		    if(params.flag_writeseedlink) {
			/* TODO Set values */
			const int usec_correction = 0;
			const int timing_quality = 100;

			send_raw_depoch(pd->station, pd->channel, pd->time, usec_correction, timing_quality,
				pd->pDataPtr, pd->nSamp);
		    }
#endif

Matteo Quintiliani's avatar
Matteo Quintiliani committed
473
474
475
476
477
478
479
480
481
482
483
484
485
486
	    if(pd->buffer) {
		free(pd->buffer);
		pd->buffer = NULL;
	    }

	}

#ifdef HAVE_LIBMSEED
	if(params.flag_writeseed  &&  data_seed.outfile_mseed) {
	    /* Close output Mini-SEED file */
	    fclose(data_seed.outfile_mseed);
	}
#endif

487
488
489
490
491
492
493
494
495
496
497

	/* PDS Step 7: Send Terminate Subscription */
	nmxp_sendTerminateSubscription(naqssock, NMXP_SHUTDOWN_NORMAL, "Good Bye!");

	/* PDS Step 8: Close the socket */
	nmxp_closeSocket(naqssock);

	/* *********************************************************** */
	/* End subscription protocol "PRIVATE DATA STREAM" version 1.4 */
	/* *********************************************************** */

498
499
500



501
    }
502

503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
#ifdef HAVE_LIBMSEED
	if(*msr_list_chan) {
	    for(i_chan = 0; i_chan < channelList_subset->number; i_chan++) {
		if(msr_list_chan[i_chan]) {
		    msr_free(&(msr_list_chan[i_chan]));
		}
	    }
	}
#endif

	if(channelListSeq) {
	    free(channelListSeq);
	}

	/* This has to be tha last */
	if(channelList_subset) {
	    free(channelList_subset);
	}

522
523

    return 0;
524
} /* End MAIN */
525
526
527
528
529





Matteo Quintiliani's avatar
Matteo Quintiliani committed
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
/* Do any needed cleanup and exit */
static void clientShutdown(int sig) {
    nmxp_log(0, 0, "Program interrupted!\n");

    if(params.flag_writefile  &&  outfile) {
	/* Close output file */
	fclose(outfile);
    }

#ifdef HAVE_LIBMSEED
    if(params.flag_writeseed  &&  data_seed.outfile_mseed) {
	/* Close output Mini-SEED file */
	fclose(data_seed.outfile_mseed);
    }
#endif

546

Matteo Quintiliani's avatar
Matteo Quintiliani committed
547
548
549
550
551
552
    /* PDS Step 7: Send Terminate Subscription */
    nmxp_sendTerminateSubscription(naqssock, NMXP_SHUTDOWN_NORMAL, "Good Bye!");

    /* PDS Step 8: Close the socket */
    nmxp_closeSocket(naqssock);

553

Matteo Quintiliani's avatar
Matteo Quintiliani committed
554
555
556
    if(channelList == NULL) {
	free(channelList);
    }
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573

#ifdef HAVE_LIBMSEED
    int i_chan;
    if(*msr_list_chan) {
	for(i_chan = 0; i_chan < channelList_subset->number; i_chan++) {
	    if(msr_list_chan[i_chan]) {
		msr_free(&(msr_list_chan[i_chan]));
	    }
	}
    }
#endif

    if(channelListSeq) {
	free(channelListSeq);
    }

    /* This has to be the last */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
574
575
576
577
578
579
    if(channelList_subset == NULL) {
	free(channelList_subset);
    }

    exit( sig );
} /* End of clientShutdown() */
580

Matteo Quintiliani's avatar
Matteo Quintiliani committed
581
582
583
584

/* Empty signal handler routine */
static void clientDummyHandler(int sig) {
}