/*! \file
 *
 * \brief Nanometrics Protocol Tool
 *
 * Author:
 * 	Matteo Quintiliani
 * 	Istituto Nazionale di Geofisica e Vulcanologia - Italy
 *	quintiliani@ingv.it
 *
 * $Id: nmxptool.c,v 1.107 2008-01-16 10:14:59 mtheo Exp $
 *
 */

14 15
#include "config.h"

16 17 18
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
19
#include <errno.h>
20 21

#include <nmxp.h>
Matteo Quintiliani's avatar
Matteo Quintiliani committed
22

23
#ifndef HAVE_WINDOWS_H
Matteo Quintiliani's avatar
Matteo Quintiliani committed
24 25 26
#include <signal.h>
#endif

27 28 29 30
#ifdef HAVE_WINDOWS_H
#include <windows.h>
#endif

31
#include "nmxptool_getoptlong.h"
32 33

#ifdef HAVE_EARTHWORMOBJS
34
#include "nmxptool_ew.h"
35
#endif
36

37 38 39
#ifdef HAVE_LIBMSEED
#include <libmseed.h>
#endif
40

41 42
#ifdef HAVE___SRC_SEEDLINK_PLUGIN_H
#include "seedlink_plugin.h"
43 44
#endif

45 46 47
/* Network code to use: the one given in params, or DEFAULT_NETWORK when unset.
 * Expands to an expression that reads the global `params`. */
#define CURRENT_NETWORK ( (params.network)? params.network : DEFAULT_NETWORK )

/* Network code parsed from the channel name if non-empty, otherwise CURRENT_NETWORK.
 * Requires a char array named `network_code` to be in scope at the point of use. */
#define NETCODE_OR_CURRENT_NETWORK ( (network_code[0] != 0)? network_code : CURRENT_NETWORK )

/* Tolerance passed to nmxptool_check_and_log_gap() as gap_tollerance */
#define GAP_TOLLERANCE 0.001
49

50 51 52
typedef struct {
    int significant;
    double last_time;
53
    time_t last_time_call_raw_stream;
54
    int32_t x_1;
55
    double after_start_time;
56
    NMXP_RAW_STREAM_DATA raw_stream_buffer;
57 58
} NMXPTOOL_CHAN_SEQ;

59 60

#ifndef HAVE_WINDOWS_H
Matteo Quintiliani's avatar
Matteo Quintiliani committed
61 62
static void clientShutdown(int sig);
static void clientDummyHandler(int sig);
63
#endif
Matteo Quintiliani's avatar
Matteo Quintiliani committed
64

Matteo Quintiliani's avatar
Matteo Quintiliani committed
65
static void save_channel_states();
66
void load_channel_states(NMXP_CHAN_LIST_NET *chan_list, NMXPTOOL_CHAN_SEQ *chan_list_seq);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
67 68
static void flushing_raw_data_stream();

69
#ifdef HAVE_LIBMSEED
Matteo Quintiliani's avatar
Matteo Quintiliani committed
70
int nmxptool_write_miniseed(NMXP_DATA_PROCESS *pd);
71 72
#endif

73
#ifdef HAVE___SRC_SEEDLINK_PLUGIN_C
Matteo Quintiliani's avatar
Matteo Quintiliani committed
74
int nmxptool_send_raw_depoch(NMXP_DATA_PROCESS *pd);
75 76
#endif

77
int nmxptool_print_seq_no(NMXP_DATA_PROCESS *pd);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
78 79

int nmxptool_check_and_log_gap(double time1, double time2, const double gap_tollerance, const char *station, const char *channel);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
80
void nmxptool_str_time_to_filename(char *str_time);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
81 82


83 84 85 86 87
/* Global variable for main program and handling terminitation program */
NMXPTOOL_PARAMS params;
int naqssock = 0;
FILE *outfile = NULL;
NMXP_CHAN_LIST *channelList = NULL;
88
NMXP_CHAN_LIST_NET *channelList_subset = NULL;
89
NMXPTOOL_CHAN_SEQ *channelListSeq = NULL;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
90 91 92
int n_func_pd = 0;
int (*p_func_pd[NMXP_MAX_FUNC_PD]) (NMXP_DATA_PROCESS *);

93 94 95 96 97 98 99 100

#ifdef HAVE_LIBMSEED
/* Mini-SEED variables */
NMXP_DATA_SEED data_seed;
MSRecord *msr_list_chan[MAX_N_CHAN];
#endif


101
int main (int argc, char **argv) {
102
    int32_t connection_time;
103
    int request_SOCKET_OK;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
104
    int i_chan, cur_chan = 0;
105
    int to_cur_chan = 0;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
106
    int exitpdscondition;
107
    int exitdapcondition;
108
    time_t timeout_for_channel;
109

Matteo Quintiliani's avatar
Matteo Quintiliani committed
110
    int span_interval = 10;
111 112
    int time_to_sleep = 0;

113 114
    char str_start_time[200];
    char str_end_time[200];
115
    char str_pd_time[200];
116 117 118

    NMXP_MSG_SERVER type;
    void *buffer;
119
    int32_t length;
120 121
    int ret;

122 123
    int recv_errno = 0;

124
    char filename[500];
125
    char station_code[20], channel_code[20], network_code[20];
126

Matteo Quintiliani's avatar
Matteo Quintiliani committed
127
    char cur_after_start_time_str[1024];
128 129
    double cur_after_start_time = DEFAULT_BUFFERED_TIME;
    int skip_current_packet = 0;
130

131

132 133 134 135 136 137 138
    NMXP_DATA_PROCESS *pd;

#ifdef HAVE_LIBMSEED
    /* Init mini-SEED variables */
    nmxp_data_seed_init(&data_seed);
#endif

139
#ifndef HAVE_WINDOWS_H
Matteo Quintiliani's avatar
Matteo Quintiliani committed
140 141 142 143 144 145 146
    /* Signal handling, use POSIX calls with standardized semantics */
    struct sigaction sa;

    sa.sa_handler = clientDummyHandler;
    sa.sa_flags = SA_RESTART;
    sigemptyset(&sa.sa_mask);
    sigaction(SIGALRM, &sa, NULL);
147

Matteo Quintiliani's avatar
Matteo Quintiliani committed
148 149 150 151 152 153 154 155 156 157 158
    sa.sa_handler = clientShutdown;
    sigaction(SIGINT, &sa, NULL);
    sigaction(SIGQUIT, &sa, NULL); 
    sigaction(SIGTERM, &sa, NULL);

    sa.sa_handler = SIG_IGN;
    sigaction(SIGHUP, &sa, NULL);
    sigaction(SIGPIPE, &sa, NULL); 
#endif

    /* Default is normal output */
159
    nmxp_log(NMXP_LOG_SET, NMXP_LOG_D_NULL);
160 161 162 163 164 165

    /* Initialize params from argument values */
    if(nmxptool_getopt_long(argc, argv, &params) != 0) {
	return 1;
    }

166
    if(params.ew_configuration_file) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
167 168

#ifdef HAVE_EARTHWORMOBJS
169 170
	nmxp_log_init(nmxptool_ew_logit_msg, nmxptool_ew_logit_err);

Matteo Quintiliani's avatar
Matteo Quintiliani committed
171
	nmxptool_ew_configure(argv, &params);
172

Matteo Quintiliani's avatar
Matteo Quintiliani committed
173 174 175 176 177
	/* Check consistency of params */
	if(nmxptool_check_params(&params) != 0) {
	    return 1;
	}
#endif
178

179
    } else {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
180

Matteo Quintiliani's avatar
Matteo Quintiliani committed
181 182
	nmxp_log_init(nmxp_log_stdout, nmxp_log_stderr);

183 184 185 186 187
	/* Check consistency of params */
	if(nmxptool_check_params(&params) != 0) {
	    return 1;
	}

188 189
	/* List available channels on server */
	if(params.flag_listchannels) {
190

191 192
	    nmxp_meta_chan_print(nmxp_getMetaChannelList(params.hostname, params.portnumberdap, NMXP_DATA_TIMESERIES, params.flag_request_channelinfo, params.datas_username, params.datas_password, &channelList));

193
	    return 1;
194

195 196 197 198
	} else if(params.flag_listchannelsnaqs) {

	    channelList = nmxp_getAvailableChannelList(params.hostname, params.portnumberpds, NMXP_DATA_TIMESERIES);
	    nmxp_chan_print_channelList(channelList);
199
	    return 1;
200

201 202
	}
    }
203

204 205 206
    nmxp_log(NMXP_LOG_SET, params.verbose_level);
    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_ANY, "verbose_level %d\n", params.verbose_level);

207 208
    nmxptool_log_params(&params);

209
    /* Get list of available channels and get a subset list of params.channels */
210 211 212 213 214 215 216 217
    if(params.start_time != 0.0  &&  params.end_time != 0.0) {
	/* From DataServer */
	nmxp_getMetaChannelList(params.hostname, params.portnumberdap, NMXP_DATA_TIMESERIES, params.flag_request_channelinfo, params.datas_username, params.datas_password, &channelList);
    } else {
	/* From NaqsServer */
	channelList = nmxp_getAvailableChannelList(params.hostname, params.portnumberpds, NMXP_DATA_TIMESERIES);
    }

218
    channelList_subset = nmxp_chan_subset(channelList, NMXP_DATA_TIMESERIES, params.channels, CURRENT_NETWORK);
219 220 221

    /* Check if some channel already exists */
    if(channelList_subset->number <= 0) {
222
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CHANNEL, "Channels not found!\n");
223
	return 1;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
224
    } else {
225
	nmxp_chan_print_netchannelList(channelList_subset);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
226

227
	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CHANNEL, "Init channelListSeq.\n");
228 229 230 231 232 233

	/* init channelListSeq */
	channelListSeq = (NMXPTOOL_CHAN_SEQ *) malloc(sizeof(NMXPTOOL_CHAN_SEQ) * channelList_subset->number);
	for(i_chan = 0; i_chan < channelList_subset->number; i_chan++) {
	    channelListSeq[i_chan].significant = 0;
	    channelListSeq[i_chan].last_time = 0.0;
234
	    channelListSeq[i_chan].last_time_call_raw_stream = 0;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
235
	    channelListSeq[i_chan].x_1 = 0;
236
	    channelListSeq[i_chan].after_start_time = DEFAULT_BUFFERED_TIME;
237
	    nmxp_raw_stream_init(&(channelListSeq[i_chan].raw_stream_buffer), params.max_tolerable_latency, params.timeoutrecv);
238 239
	}

240 241 242 243
	if(params.statefile) {
	    load_channel_states(channelList_subset, channelListSeq);
	}

Matteo Quintiliani's avatar
Matteo Quintiliani committed
244
#ifdef HAVE_LIBMSEED
245
	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "Init mini-SEED record list.\n");
246 247

	/* Init mini-SEED record list */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
248
	for(i_chan = 0; i_chan < channelList_subset->number; i_chan++) {
249

250
	    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "Init mini-SEED record for %s\n", channelList_subset->channel[i_chan].name);
251 252 253 254

	    msr_list_chan[i_chan] = msr_init(NULL);

	    /* Separate station_code and channel_code */
255
	    if(nmxp_chan_cpy_sta_chan(channelList_subset->channel[i_chan].name, station_code, channel_code, network_code)) {
256

257
		nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "%s.%s.%s\n", NETCODE_OR_CURRENT_NETWORK, station_code, channel_code);
258

259
		strcpy(msr_list_chan[i_chan]->network, NETCODE_OR_CURRENT_NETWORK);
260 261 262 263 264 265 266
		strcpy(msr_list_chan[i_chan]->station, station_code);
		strcpy(msr_list_chan[i_chan]->channel, channel_code);

		msr_list_chan[i_chan]->reclen = 512;         /* byte record length */
		msr_list_chan[i_chan]->encoding = DE_STEIM1;  /* Steim 1 compression */

	    } else {
267
		nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CHANNEL, "Channels %s error in format!\n");
268 269 270
		return 1;
	    }

Matteo Quintiliani's avatar
Matteo Quintiliani committed
271 272
	}
#endif
273

274 275 276 277 278 279 280 281
    }

    /* Free the complete channel list */
    if(channelList) {
	free(channelList);
	channelList = NULL;
    }

282
    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "Starting comunication.\n");
Matteo Quintiliani's avatar
Matteo Quintiliani committed
283

284
    /* TODO condition starting DAP or PDS */
285
    if( (params.start_time != 0.0   &&   params.end_time != 0.0)
286
	    || params.delay > 0
287
      ) {
288

289
	if(params.delay > 0) {
290
	    params.start_time = ((double) (time(NULL) - params.delay - span_interval) / 10.0) * 10.0;
291 292 293
	    params.end_time = params.start_time + span_interval;
	}

294

295 296 297
	/* ************************************************************** */
	/* Start subscription protocol "DATA ACCESS PROTOCOL" version 1.0 */
	/* ************************************************************** */
298

299 300
	/* DAP Step 1: Open a socket */
	if( (naqssock = nmxp_openSocket(params.hostname, params.portnumberdap)) == NMXP_SOCKET_ERROR) {
301
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error opening socket!\n");
302 303
	    return 1;
	}
304

305 306
	/* DAP Step 2: Read connection time */
	if(nmxp_readConnectionTime(naqssock, &connection_time) != NMXP_SOCKET_OK) {
307
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error reading connection time from server!\n");
308 309
	    return 1;
	}
310

311 312
	/* DAP Step 3: Send a ConnectRequest */
	if(nmxp_sendConnectRequest(naqssock, params.datas_username, params.datas_password, connection_time) != NMXP_SOCKET_OK) {
313
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error sending connect request!\n");
314 315 316 317 318
	    return 1;
	}

	/* DAP Step 4: Wait for a Ready message */
	if(nmxp_waitReady(naqssock) != NMXP_SOCKET_OK) {
319
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error waiting Ready message!\n");
320 321
	    return 1;
	}
322

323 324 325 326
	exitdapcondition = 1;

	while(exitdapcondition) {

327
	    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "start_time = %.4f - end_time = %.4f\n", params.start_time, params.end_time);
328

329 330 331
	    /* Start loop for sending requests */
	    i_chan=0;
	    request_SOCKET_OK = NMXP_SOCKET_OK;
332

333
	    while(request_SOCKET_OK == NMXP_SOCKET_OK  &&  i_chan < channelList_subset->number) {
334

335 336
		/* DAP Step 5: Send Data Request */
		request_SOCKET_OK = nmxp_sendDataRequest(naqssock, channelList_subset->channel[i_chan].key, (int32_t) params.start_time, (int32_t) (params.end_time + 1.0));
337

338
		if(request_SOCKET_OK == NMXP_SOCKET_OK) {
339

340 341 342 343
		    nmxp_data_to_str(str_start_time, params.start_time);
		    nmxp_data_to_str(str_end_time, params.end_time);
		    nmxptool_str_time_to_filename(str_start_time);
		    nmxptool_str_time_to_filename(str_end_time);
344

345 346 347 348 349 350 351 352 353 354 355 356 357 358 359
		    if(params.flag_writefile) {
			/* Open output file */
			if(nmxp_chan_cpy_sta_chan(channelList_subset->channel[i_chan].name, station_code, channel_code, network_code)) {
			    sprintf(filename, "%s.%s.%s_%s_%s.nmx",
				    NETCODE_OR_CURRENT_NETWORK,
				    station_code,
				    channel_code,
				    str_start_time,
				    str_end_time);
			} else {
			    sprintf(filename, "%s_%s_%s.nmx",
				    channelList_subset->channel[i_chan].name,
				    str_start_time,
				    str_end_time);
			}
360

361 362 363 364
			outfile = fopen(filename, "w");
			if(!outfile) {
			    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_EXTRA, "Can not to open file %s!", filename);
			}
365
		    }
366 367

#ifdef HAVE_LIBMSEED
368 369 370 371 372 373 374 375 376 377 378 379 380 381 382
		    if(params.flag_writeseed) {
			/* Open output Mini-SEED file */
			if(nmxp_chan_cpy_sta_chan(channelList_subset->channel[i_chan].name, station_code, channel_code, network_code)) {
			    sprintf(data_seed.filename_mseed, "%s.%s.%s_%s_%s.miniseed",
				    NETCODE_OR_CURRENT_NETWORK,
				    station_code,
				    channel_code,
				    str_start_time,
				    str_end_time);
			} else {
			    sprintf(filename, "%s_%s_%s.miniseed",
				    channelList_subset->channel[i_chan].name,
				    str_start_time,
				    str_end_time);
			}
383

384 385 386 387
			data_seed.outfile_mseed = fopen(data_seed.filename_mseed, "w");
			if(!data_seed.outfile_mseed) {
			    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_EXTRA, "Can not to open file %s!", data_seed.filename_mseed);
			}
388 389
		    }
#endif
390

391 392 393 394 395 396 397 398 399 400 401 402
		    if(params.flag_writefile  &&  outfile) {
			/* Compute SNCL line */

			/* Separate station_code_old_way and channel_code_old_way */
			if(nmxp_chan_cpy_sta_chan(channelList_subset->channel[i_chan].name, station_code, channel_code, network_code)) {
			    /* Write SNCL line */
			    fprintf(outfile, "%s.%s.%s.%s\n",
				    station_code,
				    NETCODE_OR_CURRENT_NETWORK,
				    channel_code,
				    (params.location)? params.location : "");
			}
403

404
		    }
405

406 407 408
		    /* DAP Step 6: Receive Data until receiving a Ready message */
		    ret = nmxp_receiveMessage(naqssock, &type, &buffer, &length, 0, &recv_errno);
		    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "ret = %d, type = %d\n", ret, type);
409

410
		    while(ret == NMXP_SOCKET_OK   &&    type != NMXP_MSG_READY) {
411

412 413 414
			/* Process a packet and return value in NMXP_DATA_PROCESS structure */
			pd = nmxp_processCompressedData(buffer, length, channelList_subset, NETCODE_OR_CURRENT_NETWORK);
			nmxp_data_trim(pd, params.start_time, params.end_time, 0);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
415

416 417 418 419
			/* Log contents of last packet */
			if(params.flag_logdata) {
			    nmxp_data_log(pd);
			}
420

421 422
			/* Set cur_chan */
			cur_chan = nmxp_chan_lookupKeyIndex(pd->key, channelList_subset);
423

424 425 426 427 428 429 430 431 432 433
			/* Management of gaps */
			if(!channelListSeq[cur_chan].significant && pd->nSamp > 0) {
			    channelListSeq[cur_chan].significant = 1;
			} else {
			    if(channelListSeq[cur_chan].significant && pd->nSamp > 0) {
				if(nmxptool_check_and_log_gap(pd->time, channelListSeq[cur_chan].last_time, GAP_TOLLERANCE, pd->station, pd->channel)) {
				    channelListSeq[cur_chan].x_1 = 0;
				    nmxp_data_to_str(str_pd_time, pd->time);
				    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_EXTRA, "%s.%s x0 set to zero at %s!\n", pd->station, pd->channel, str_pd_time);
				}
434
			    }
435
			}
436 437 438
			if(channelListSeq[cur_chan].significant && pd->nSamp > 0) {
			    channelListSeq[cur_chan].last_time = pd->time + ((double) pd->nSamp / (double) pd->sampRate);
			}
439

440
#ifdef HAVE_LIBMSEED
441 442 443 444
			/* Write Mini-SEED record */
			if(params.flag_writeseed) {
			    nmxptool_write_miniseed(pd);
			}
445
#endif
446

447
#ifdef HAVE___SRC_SEEDLINK_PLUGIN_C
448 449 450 451
			/* Send data to SeedLink Server */
			if(params.flag_slink) {
			    nmxptool_send_raw_depoch(pd);
			}
452 453
#endif

454 455 456 457 458 459 460 461
			if(params.flag_writefile  &&  outfile) {
			    /* Write buffer to the output file */
			    if(outfile && buffer && length > 0) {
				int32_t length_int = length;
				nmxp_data_swap_4b((int32_t *) &length_int);
				fwrite(&length_int, sizeof(length_int), 1, outfile);
				fwrite(buffer, length, 1, outfile);
			    }
462
			}
463

464 465 466 467 468 469 470 471 472
			/* Store x_1 */
			if(pd->nSamp > 0) {
			    channelListSeq[cur_chan].x_1 = pd->pDataPtr[pd->nSamp-1];
			}
			/* Free pd->buffer */
			if(pd->buffer) {
			    free(pd->buffer);
			    pd->buffer = NULL;
			}
473

474 475 476 477
			/* Receive Data */
			ret = nmxp_receiveMessage(naqssock, &type, &buffer, &length, 0, &recv_errno);
			nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "ret = %d, type = %d\n", ret, type);
		    }
478

479 480 481 482 483
		    if(params.flag_writefile  &&  outfile) {
			/* Close output file */
			fclose(outfile);
			outfile = NULL;
		    }
484

485
#ifdef HAVE_LIBMSEED
486 487 488 489 490
		    if(params.flag_writeseed  &&  data_seed.outfile_mseed) {
			/* Close output Mini-SEED file */
			fclose(data_seed.outfile_mseed);
			data_seed.outfile_mseed = NULL;
		    }
491
#endif
492

493 494
		}
		i_chan++;
495
	    }
496
	    /* DAP Step 7: Repeat steps 5 and 6 for each data request */
497

498 499 500 501 502 503 504 505 506 507
	    if(params.delay > 0) {
		time_to_sleep = (params.end_time - params.start_time) - (time(NULL) - (params.start_time + params.delay + span_interval));
		if(time_to_sleep >= 0) {
		    nmxp_sleep(time_to_sleep);
		} else {
		    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "time to sleep %d sec.\n", time_to_sleep);
		    nmxp_sleep(3);
		}
		params.start_time = params.end_time;
		params.end_time = params.start_time + span_interval;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
508
	    } else {
509
		exitdapcondition = 0;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
510
	    }
511

512

513
	} /* END while(exitdapcondition) */
514

515 516 517 518 519 520 521 522 523 524 525
	/* DAP Step 8: Send a Terminate message (optional) */
	nmxp_sendTerminateSubscription(naqssock, NMXP_SHUTDOWN_NORMAL, "Bye!");

	/* DAP Step 9: Close the socket */
	nmxp_closeSocket(naqssock);

	/* ************************************************************ */
	/* End subscription protocol "DATA ACCESS PROTOCOL" version 1.0 */
	/* ************************************************************ */


526

Matteo Quintiliani's avatar
Matteo Quintiliani committed
527
    } else {
528

529 530
	if(params.stc == -1) {

531

532 533 534 535
	    if(params.flag_logdata) {
		p_func_pd[n_func_pd++] = nmxptool_print_seq_no;
	    }

536 537 538 539 540 541 542 543 544 545 546 547 548
#ifdef HAVE_LIBMSEED
	    /* Write Mini-SEED record */
	    if(params.flag_writeseed) {
		p_func_pd[n_func_pd++] = nmxptool_write_miniseed;
	    }
#endif

#ifdef HAVE___SRC_SEEDLINK_PLUGIN_C
	    /* Send data to SeedLink Server */
	    if(params.flag_slink) {
		p_func_pd[n_func_pd++] = nmxptool_send_raw_depoch;
	    }
#endif
Matteo Quintiliani's avatar
Matteo Quintiliani committed
549 550 551 552 553 554 555

#ifdef HAVE_EARTHWORMOBJS
	    if(params.ew_configuration_file) {
		p_func_pd[n_func_pd++] = nmxptool_ew_nmx2ew;
	    }
#endif

556
	}
557

558 559 560 561 562 563 564 565 566
	/* ************************************************************* */
	/* Start subscription protocol "PRIVATE DATA STREAM" version 1.4 */
	/* ************************************************************* */

	/* PDS Step 1: Open a socket */
	naqssock = nmxp_openSocket(params.hostname, params.portnumberpds);

	if(naqssock == NMXP_SOCKET_ERROR) {
	    return 1;
567 568
	}

569 570 571 572 573 574 575 576 577 578 579 580 581
	/* PDS Step 2: Send a Connect */
	if(nmxp_sendConnect(naqssock) != NMXP_SOCKET_OK) {
	    printf("Error on sendConnect()\n");
	    return 1;
	}

	/* PDS Step 3: Receive ChannelList */
	if(nmxp_receiveChannelList(naqssock, &channelList) != NMXP_SOCKET_OK) {
	    printf("Error on receiveChannelList()\n");
	    return 1;
	}

	/* Get a subset of channel from arguments */
582
	channelList_subset = nmxp_chan_subset(channelList, NMXP_DATA_TIMESERIES, params.channels, CURRENT_NETWORK);
583 584 585


	/* PDS Step 4: Send a Request Pending (optional) */
586 587


588 589 590 591 592 593
	/* PDS Step 5: Send AddChannels */
	/* Request Data */
	nmxp_sendAddTimeSeriesChannel(naqssock, channelList_subset, params.stc, params.rate, (params.flag_buffered)? NMXP_BUFFER_YES : NMXP_BUFFER_NO);

	/* PDS Step 6: Repeat until finished: receive and handle packets */

Matteo Quintiliani's avatar
Matteo Quintiliani committed
594 595 596 597
#ifdef HAVE_LIBMSEED
	if(params.flag_writeseed) {
	    /* Open output Mini-SEED file */
	    sprintf(data_seed.filename_mseed, "%s.realtime.miniseed",
598
		    CURRENT_NETWORK);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
599 600 601

	    data_seed.outfile_mseed = fopen(data_seed.filename_mseed, "w");
	    if(!data_seed.outfile_mseed) {
602
		nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_EXTRA, "Can not to open file %s!", data_seed.filename_mseed);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
603
	    } else {
604
		nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "Opened file %s!\n", data_seed.filename_mseed);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
605 606 607 608
	    }
	}
#endif

Matteo Quintiliani's avatar
Matteo Quintiliani committed
609 610 611
	// TODO
	exitpdscondition = 1;

Matteo Quintiliani's avatar
Matteo Quintiliani committed
612 613 614 615 616 617
#ifdef HAVE_EARTHWORMOBJS
	if(params.ew_configuration_file) {
	    nmxptool_ew_attach();
	}
#endif

618
	skip_current_packet = 0;
619

Matteo Quintiliani's avatar
Matteo Quintiliani committed
620
	while(exitpdscondition) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
621

622
	    /* Process Compressed or Decompressed Data */
623
	    pd = nmxp_receiveData(naqssock, channelList_subset, NETCODE_OR_CURRENT_NETWORK, params.timeoutrecv, &recv_errno);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
624

625
	    if(recv_errno == 0) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
626 627 628
		// TODO
		exitpdscondition = 1;
	    } else {
629
		nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error receiving data. pd=%p recv_errno=%d\n", pd, recv_errno);
630 631 632 633 634

#ifdef HAVE_EARTHWORMOBJS
		nmxptool_ew_send_error(NMXPTOOL_EW_ERR_RECVDATA);
#endif

Matteo Quintiliani's avatar
Matteo Quintiliani committed
635 636
		exitpdscondition = 0;
	    }
637

638 639 640 641 642
	    if(pd) {
		/* Set cur_chan */
		cur_chan = nmxp_chan_lookupKeyIndex(pd->key, channelList_subset);
	    }

Matteo Quintiliani's avatar
Matteo Quintiliani committed
643
	    /* Log contents of last packet */
644 645 646
	    if(params.flag_logdata) {
		nmxp_data_log(pd);
	    }
Matteo Quintiliani's avatar
Matteo Quintiliani committed
647

648
	    skip_current_packet = 0;
649
	    if(pd &&
650
		    (params.statefile  ||  params.buffered_time)
651 652 653 654 655 656 657 658 659 660 661 662 663
	      )	{
		if(params.statefile && channelListSeq[cur_chan].after_start_time > 0.0) {
		    cur_after_start_time = channelListSeq[cur_chan].after_start_time;
		} else if(params.buffered_time) {
		    cur_after_start_time = params.buffered_time;
		} else {
		    cur_after_start_time = DEFAULT_BUFFERED_TIME;
		}
		nmxp_data_to_str(cur_after_start_time_str, cur_after_start_time);
		nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_PACKETMAN, "cur_chan %d, cur_after_start_time %f, cur_after_start_time_str %s\n", cur_chan, cur_after_start_time, cur_after_start_time_str);
		if(pd->time + ((double) pd->nSamp / (double) pd->sampRate) >= cur_after_start_time) {
		    if(pd->time < cur_after_start_time) {
			int first_nsample_to_remove = (cur_after_start_time - pd->time) * (double) pd->sampRate;
664 665 666 667
			/* Remove the first sample in order avoiding overlap  */
			first_nsample_to_remove++;
			if(pd->nSamp > first_nsample_to_remove) {
			    pd->nSamp -= first_nsample_to_remove;
668
			    pd->time = cur_after_start_time;
669 670 671 672 673 674 675 676 677 678 679 680 681
			    pd->pDataPtr += first_nsample_to_remove;
			    pd->x0 = pd->pDataPtr[0];
			} else {
			    skip_current_packet = 1;
			}
		    }
		} else {
		    skip_current_packet = 1;
		}
	    }

	    if(!skip_current_packet) {

682 683
		/* Manage Raw Stream */
		if(params.stc == -1) {
684

685 686 687 688 689
		    /* cur_char is computed only for pd != NULL */
		    if(pd) {
			nmxp_raw_stream_manage(&(channelListSeq[cur_chan].raw_stream_buffer), pd, p_func_pd, n_func_pd);
			channelListSeq[cur_chan].last_time_call_raw_stream = nmxp_data_gmtime_now();
		    }
690

691 692 693 694 695 696 697 698 699 700 701 702 703 704
		    /* Check timeout for other channels */
		    if(params.timeoutrecv > 0) {
			exitpdscondition = 1;
			to_cur_chan = 0;
			while(to_cur_chan < channelList_subset->number) {
			    timeout_for_channel = nmxp_data_gmtime_now() - channelListSeq[to_cur_chan].last_time_call_raw_stream;
			    if(channelListSeq[to_cur_chan].last_time_call_raw_stream != 0
				    && timeout_for_channel >= params.timeoutrecv) {
				nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_DOD, "Timeout for channel %s (%d sec.)\n",
					channelList_subset->channel[to_cur_chan].name, timeout_for_channel);
				nmxp_raw_stream_manage(&(channelListSeq[to_cur_chan].raw_stream_buffer), NULL, p_func_pd, n_func_pd);
				channelListSeq[to_cur_chan].last_time_call_raw_stream = nmxp_data_gmtime_now();
			    }
			    to_cur_chan++;
705 706
			}
		    }
707 708

		} else {
709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724

		    if(pd) {
			/* Management of gaps */
			if(!channelListSeq[cur_chan].significant && pd->nSamp > 0) {
			    channelListSeq[cur_chan].significant = 1;
			} else {
			    if(channelListSeq[cur_chan].significant && pd->nSamp > 0) {
				if(nmxptool_check_and_log_gap(pd->time, channelListSeq[cur_chan].last_time, GAP_TOLLERANCE, pd->station, pd->channel)) {
				    channelListSeq[cur_chan].x_1 = 0;
				    nmxp_data_to_str(str_pd_time, pd->time);
				    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_EXTRA, "%s.%s x0 set to zero at %s!\n", pd->station, pd->channel, str_pd_time);
				}
			    }
			}
			if(channelListSeq[cur_chan].significant && pd->nSamp > 0) {
			    channelListSeq[cur_chan].last_time = pd->time + ((double) pd->nSamp / (double) pd->sampRate);
725
			}
726

Matteo Quintiliani's avatar
Matteo Quintiliani committed
727 728

#ifdef HAVE_LIBMSEED
729 730 731 732
			/* Write Mini-SEED record */
			if(params.flag_writeseed) {
			    nmxptool_write_miniseed(pd);
			}
Matteo Quintiliani's avatar
Matteo Quintiliani committed
733 734
#endif

735
#ifdef HAVE___SRC_SEEDLINK_PLUGIN_C
736 737 738 739
			/* Send data to SeedLink Server */
			if(params.flag_slink) {
			    nmxptool_send_raw_depoch(pd);
			}
740
#endif
741 742
		    }
		}
743
	    } /* End skip_current_packet condition */
744

745
	    if(pd) {
746 747 748 749 750 751 752 753 754
		/* Store x_1 */
		if(pd->nSamp > 0) {
		    channelListSeq[cur_chan].x_1 = pd->pDataPtr[pd->nSamp-1];
		}
		/* Free pd->buffer */
		if(pd->buffer) {
		    free(pd->buffer);
		    pd->buffer = NULL;
		}
755
	    }
Matteo Quintiliani's avatar
Matteo Quintiliani committed
756

Matteo Quintiliani's avatar
Matteo Quintiliani committed
757 758 759 760 761 762
#ifdef HAVE_EARTHWORMOBJS
	    if(params.ew_configuration_file) {

		/* Check if we are being asked to terminate */
		if( nmxptool_ew_check_flag_terminate() ) {
		    logit ("t", "nmxptool terminating on request\n");
763
		    nmxptool_ew_send_error(NMXPTOOL_EW_ERR_TERMREQ);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
764 765 766 767 768 769 770 771 772
		    exitpdscondition = 0;
		}

		/* Check if we need to send heartbeat message */
		nmxptool_ew_send_heartbeat_if_needed();

	    }
#endif

773
	} /* End main PDS loop */
774

Matteo Quintiliani's avatar
Matteo Quintiliani committed
775 776
	/* Flush raw data stream for each channel */
	flushing_raw_data_stream();
Matteo Quintiliani's avatar
Matteo Quintiliani committed
777
	save_channel_states();
Matteo Quintiliani's avatar
Matteo Quintiliani committed
778

Matteo Quintiliani's avatar
Matteo Quintiliani committed
779 780 781 782 783 784
#ifdef HAVE_EARTHWORMOBJS
	if(params.ew_configuration_file) {
	    nmxptool_ew_detach();
	}
#endif

Matteo Quintiliani's avatar
Matteo Quintiliani committed
785 786 787 788 789 790 791
#ifdef HAVE_LIBMSEED
	if(params.flag_writeseed  &&  data_seed.outfile_mseed) {
	    /* Close output Mini-SEED file */
	    fclose(data_seed.outfile_mseed);
	}
#endif

792 793 794 795 796 797 798 799 800 801 802

	/* PDS Step 7: Send Terminate Subscription */
	nmxp_sendTerminateSubscription(naqssock, NMXP_SHUTDOWN_NORMAL, "Good Bye!");

	/* PDS Step 8: Close the socket */
	nmxp_closeSocket(naqssock);

	/* *********************************************************** */
	/* End subscription protocol "PRIVATE DATA STREAM" version 1.4 */
	/* *********************************************************** */

803 804


805
    }
806

807
#ifdef HAVE_LIBMSEED
808 809 810 811
    if(*msr_list_chan) {
	for(i_chan = 0; i_chan < channelList_subset->number; i_chan++) {
	    if(msr_list_chan[i_chan]) {
		msr_free(&(msr_list_chan[i_chan]));
812 813
	    }
	}
814
    }
815 816
#endif

817 818 819
    for(i_chan = 0; i_chan < channelList_subset->number; i_chan++) {
	nmxp_raw_stream_free(&(channelListSeq[i_chan].raw_stream_buffer));
    }
820

821 822 823
    if(channelListSeq) {
	free(channelListSeq);
    }
824

825 826 827 828
    /* This has to be tha last */
    if(channelList_subset) {
	free(channelList_subset);
    }
829

830
    return 0;
831
} /* End MAIN */
832 833


Matteo Quintiliani's avatar
Matteo Quintiliani committed
834
static void save_channel_states() {
835 836 837 838
    int to_cur_chan;
    char last_time_str[30];
    char raw_last_sample_time_str[30];
    char state_line_str[1000];
839
    FILE *fstatefile = NULL;
840

841
    if(params.statefile) {
842
	fstatefile = fopen(params.statefile, "w");
843
	if(fstatefile == NULL) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
844 845 846
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "Unable to write channel states into %s!\n", params.statefile);
	} else {
	    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_ANY, "Writing channel states into %s!\n", params.statefile);
847
	}
848

849 850 851 852 853 854
	/* Save state for each channel */
	// if(params.stc == -1)
	to_cur_chan = 0;
	while(to_cur_chan < channelList_subset->number) {
	    nmxp_data_to_str(last_time_str, channelListSeq[to_cur_chan].last_time);
	    nmxp_data_to_str(raw_last_sample_time_str, channelListSeq[to_cur_chan].raw_stream_buffer.last_sample_time);
855
	    sprintf(state_line_str, "%s %s %s",
856
		    channelList_subset->channel[to_cur_chan].name,
857 858
		    last_time_str,
		    raw_last_sample_time_str
859 860 861
		   );
	    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CHANNEL, "%s\n", state_line_str);
	    if(fstatefile) {
862
		fprintf(fstatefile, "%s\n", state_line_str);
863 864
		if( (channelListSeq[to_cur_chan].last_time != 0) || (channelListSeq[to_cur_chan].raw_stream_buffer.last_sample_time != -1.0) ) {
		    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_ANY, "%s\n", state_line_str);
865 866
		} else {
		    /* Do nothing */
867
		}
868 869 870
	    }
	    to_cur_chan++;
	}
871
	if(fstatefile) {
872
	    fclose(fstatefile);
873 874 875 876
	}
    }
}

877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901
/*
 * Load the start time of each channel from params.statefile.
 *
 * Nothing is done when params.statefile is not set or cannot be opened for
 * reading (the latter is logged). Each line of the file is expected to hold
 * three whitespace separated tokens:
 *
 *     <channel name> <last time> <raw stream last sample time>
 *
 * When exactly three tokens are read, both times are parsed with
 * nmxp_data_parse_date(); a time that fails to parse keeps the value
 * DEFAULT_BUFFERED_TIME. The channel name is searched case-insensitively in
 * chan_list and, if found, after_start_time of the matching entry of
 * chan_list_seq is set to the raw time (when it is neither
 * DEFAULT_BUFFERED_TIME nor 0.0) or else to the other time (when it is not
 * DEFAULT_BUFFERED_TIME).
 *
 * chan_list      channels to look the names up in
 * chan_list_seq  per-channel data, indexed like chan_list->channel
 */
void load_channel_states(NMXP_CHAN_LIST_NET *chan_list, NMXPTOOL_CHAN_SEQ *chan_list_seq) {
    FILE *fstatefile = NULL;
#define MAXSIZE_LINE 2048
    char line[MAXSIZE_LINE];
    /* The three buffers below are 128 bytes long: the field widths used in
     * the sscanf() format (127) must be kept in sync with this size. */
    char s_chan[128];
    char s_noraw_time_s[128];
    char s_rawtime_s[128];
    double s_noraw_time_f_calc, s_rawtime_f_calc;
    int cur_chan;
    int n_scanf;
    NMXP_TM_T tmp_tmt;

    if(!params.statefile) {
	return;
    }

    fstatefile = fopen(params.statefile, "r");
    if(fstatefile == NULL) {
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "Unable to read channel states from %s!\n", params.statefile);
	return;
    }

    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_ANY, "Loading channel states from %s!\n", params.statefile);

    while(fgets(line, sizeof(line), fstatefile) != NULL) {
	s_chan[0] = 0;
	s_noraw_time_s[0] = 0;
	s_rawtime_s[0] = 0;
	/* Bounded conversions: a line can be much longer than a token buffer */
	n_scanf = sscanf(line, "%127s %127s %127s", s_chan, s_noraw_time_s, s_rawtime_s);

	s_noraw_time_f_calc = DEFAULT_BUFFERED_TIME;
	s_rawtime_f_calc = DEFAULT_BUFFERED_TIME;
	if(n_scanf == 3) {
	    if(nmxp_data_parse_date(s_noraw_time_s, &tmp_tmt) == -1) {
		nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "Error parsing %s\n", s_noraw_time_s);
	    } else {
		s_noraw_time_f_calc = nmxp_data_tm_to_time(&tmp_tmt);
	    }
	    if(nmxp_data_parse_date(s_rawtime_s, &tmp_tmt) == -1) {
		nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "Error parsing %s\n", s_rawtime_s);
	    } else {
		s_rawtime_f_calc = nmxp_data_tm_to_time(&tmp_tmt);
	    }
	}
	nmxp_log(NMXP_LOG_NORM_NO, NMXP_LOG_D_PACKETMAN, "%d %-14s %16.4f %s %16.4f %s\n", n_scanf, s_chan, s_noraw_time_f_calc, s_noraw_time_s, s_rawtime_f_calc, s_rawtime_s);

	/* Look for the channel named in this line (case-insensitive) */
	cur_chan = 0;
	while(cur_chan < chan_list->number  &&  strcasecmp(s_chan, chan_list->channel[cur_chan].name) != 0) {
	    cur_chan++;
	}
	if(cur_chan >= chan_list->number) {
	    /* Channel not in the list: ignore the line */
	    continue;
	}

	/* The raw stream time has priority over the other one */
	if( s_rawtime_f_calc != DEFAULT_BUFFERED_TIME  && s_rawtime_f_calc != 0.0 ) {
	    chan_list_seq[cur_chan].after_start_time = s_rawtime_f_calc;
	    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_ANY, "For channel %s starting from %s.\n", s_chan, s_rawtime_s);
	} else if( s_noraw_time_f_calc != DEFAULT_BUFFERED_TIME ) {
	    chan_list_seq[cur_chan].after_start_time = s_noraw_time_f_calc;
	    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_ANY, "For channel %s starting from %s.\n", s_chan, s_noraw_time_s);
	} else {
	    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_ANY, "For channel %s there is not valid start_time.\n", s_chan);
	}
    }

    fclose(fstatefile);
}

937

Matteo Quintiliani's avatar
Matteo Quintiliani committed
938 939 940 941 942 943 944
static void flushing_raw_data_stream() {
    int to_cur_chan;

    /* Flush raw data stream for each channel */
    if(params.stc == -1) {
	to_cur_chan = 0;
	while(to_cur_chan < channelList_subset->number) {
945
	    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_ANY, "Flushing data for channel %s\n",
Matteo Quintiliani's avatar
Matteo Quintiliani committed
946 947 948 949 950 951
		    channelList_subset->channel[to_cur_chan].name);
	    nmxp_raw_stream_manage(&(channelListSeq[to_cur_chan].raw_stream_buffer), NULL, p_func_pd, n_func_pd);
	    to_cur_chan++;
	}
    }
}
952

953
#ifndef HAVE_WINDOWS_H
Matteo Quintiliani's avatar
Matteo Quintiliani committed
954 955
/* Do any needed cleanup and exit */
static void clientShutdown(int sig) {
956

957
    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_ANY, "Program interrupted!\n");
Matteo Quintiliani's avatar
Matteo Quintiliani committed
958

Matteo Quintiliani's avatar
Matteo Quintiliani committed
959
    flushing_raw_data_stream();
Matteo Quintiliani's avatar
Matteo Quintiliani committed
960
    save_channel_states();
Matteo Quintiliani's avatar
Matteo Quintiliani committed
961

Matteo Quintiliani's avatar
Matteo Quintiliani committed
962 963 964 965 966
    if(params.flag_writefile  &&  outfile) {
	/* Close output file */
	fclose(outfile);
    }

967
#ifdef HAVE_EARTHWORMOBJS
968 969 970
    if(params.ew_configuration_file) {
	nmxptool_ew_detach();
    }
971 972
#endif

Matteo Quintiliani's avatar
Matteo Quintiliani committed
973 974 975 976 977 978 979
#ifdef HAVE_LIBMSEED
    if(params.flag_writeseed  &&  data_seed.outfile_mseed) {
	/* Close output Mini-SEED file */
	fclose(data_seed.outfile_mseed);
    }
#endif

980

Matteo Quintiliani's avatar
Matteo Quintiliani committed
981 982 983 984 985 986
    /* PDS Step 7: Send Terminate Subscription */
    nmxp_sendTerminateSubscription(naqssock, NMXP_SHUTDOWN_NORMAL, "Good Bye!");

    /* PDS Step 8: Close the socket */
    nmxp_closeSocket(naqssock);

987

Matteo Quintiliani's avatar
Matteo Quintiliani committed
988 989 990

    /* Free the complete channel list */
    if(channelList) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
991
	free(channelList);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
992
	channelList = NULL;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
993
    }
994

995 996
    int i_chan = 0;

997 998 999 1000 1001 1002 1003 1004 1005 1006
#ifdef HAVE_LIBMSEED
    if(*msr_list_chan) {
	for(i_chan = 0; i_chan < channelList_subset->number; i_chan++) {
	    if(msr_list_chan[i_chan]) {
		msr_free(&(msr_list_chan[i_chan]));
	    }
	}
    }
#endif

1007
    for(i_chan = 0; i_chan < channelList_subset->number; i_chan++) {
1008
	nmxp_raw_stream_free(&(channelListSeq[i_chan].raw_stream_buffer));
1009 1010
    }

1011 1012 1013 1014 1015
    if(channelListSeq) {
	free(channelListSeq);
    }

    /* This has to be the last */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
1016 1017 1018 1019 1020 1021
    if(channelList_subset == NULL) {
	free(channelList_subset);
    }

    exit( sig );
} /* End of clientShutdown() */
1022

Matteo Quintiliani's avatar
Matteo Quintiliani committed
1023 1024 1025 1026

/* Signal handler routine that does nothing and simply returns */
static void clientDummyHandler(int sig) {
    (void) sig; /* parameter required by the handler prototype, not used */
}
Matteo Quintiliani's avatar
Matteo Quintiliani committed
1027

1028 1029
#endif /* HAVE_WINDOWS_H */

Matteo Quintiliani's avatar
Matteo Quintiliani committed
1030 1031 1032



1033
#ifdef HAVE_LIBMSEED
Matteo Quintiliani's avatar
Matteo Quintiliani committed
1034 1035 1036 1037 1038
/*
 * Look up pd->key in channelList_subset and pass pd, data_seed and the
 * matching entry of msr_list_chan to nmxp_data_msr_pack().
 *
 * Returns the value returned by nmxp_data_msr_pack(), or 0 after logging an
 * error when the key is not found in channelList_subset.
 */
int nmxptool_write_miniseed(NMXP_DATA_PROCESS *pd) {
    const int chan_idx = nmxp_chan_lookupKeyIndex(pd->key, channelList_subset);

    if(chan_idx == -1) {
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CHANNEL, "Key %d not found in channelList_subset!\n", pd->key);
	return 0;
    }

    return nmxp_data_msr_pack(pd, &data_seed, msr_list_chan[chan_idx]);
}
1046
#endif
Matteo Quintiliani's avatar
Matteo Quintiliani committed
1047

1048 1049
/*
 * Log one line describing the packet pd: network, station, channel, packet
 * type, sequence number, oldest sequence number, time (formatted by
 * nmxp_data_to_str()), number of samples and the value of
 * nmxp_data_latency(pd).
 *
 * Always returns 0.
 */
int nmxptool_print_seq_no(NMXP_DATA_PROCESS *pd) {
    char time_buf[200];

    nmxp_data_to_str(time_buf, pd->time);

    nmxp_log(NMXP_LOG_NORM_NO, NMXP_LOG_D_ANY,
	    "Process %s.%s.%s %2d %d %d %s %dpts lat. %.1fs\n",
	    pd->network, pd->station, pd->channel,
	    pd->packet_type, pd->seq_no, pd->oldest_seq_no,
	    time_buf, pd->nSamp,
	    nmxp_data_latency(pd));

    return 0;
}

1068

1069
#ifdef HAVE___SRC_SEEDLINK_PLUGIN_C
Matteo Quintiliani's avatar
Matteo Quintiliani committed
1070 1071 1072 1073 1074 1075 1076 1077
/*
 * Forward the samples of pd to send_raw_depoch() with the station, channel
 * and time of pd, and return its result.
 */
int nmxptool_send_raw_depoch(NMXP_DATA_PROCESS *pd) {
    /* TODO Set values: both are fixed for now */
    enum {
	USEC_CORRECTION = 0,
	TIMING_QUALITY = 100
    };

    return send_raw_depoch(pd->station, pd->channel, pd->time,
	    USEC_CORRECTION, TIMING_QUALITY,
	    pd->pDataPtr, pd->nSamp);
}
1078
#endif
Matteo Quintiliani's avatar
Matteo Quintiliani committed
1079 1080 1081 1082



int nmxptool_check_and_log_gap(double time1, double time2, const double gap_tollerance, const char *station, const char *channel) {
1083 1084
    char str_time1[200];
    char str_time2[200];
Matteo Quintiliani's avatar
Matteo Quintiliani committed
1085 1086
    int ret = 0;
    double gap = time1 - time2 ;
1087 1088
    nmxp_data_to_str(str_time1, time1);
    nmxp_data_to_str(str_time2, time2);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
1089
    if(gap > gap_tollerance) {
1090
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_GAP, "Gap %.2f sec. for %s.%s from %s to %s!\n", gap, station, channel, str_time2, str_time1);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
1091 1092
	ret = 1;
    } else if (gap < -gap_tollerance) {
1093
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_GAP, "Overlap %.2f sec. for %s.%s from %s to %s!\n", gap, station, channel, str_time1, str_time2);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
1094 1095 1096 1097
	ret = 1;
    }
    return ret;
}
1098

Matteo Quintiliani's avatar
Matteo Quintiliani committed
1099
/*
 * Replace, in place, every character of str_time that is not an ASCII
 * letter, an ASCII digit or '_' with '.'.
 *
 * str_time  NUL-terminated string, modified in place; NULL is ignored
 */
void nmxptool_str_time_to_filename(char *str_time) {
    char *p;

    if(str_time == NULL) {
	return;
    }

    /* Walk up to the terminator: no strlen() call at every iteration */
    for(p = str_time; *p != '\0'; p++) {
	const char c = *p;
	const int keep = (c >= 'A'  &&  c <= 'Z')
	    || (c >= 'a'  &&  c <= 'z')
	    || (c >= '0'  &&  c <= '9')
	    || (c == '_');

	if(!keep) {
	    *p = '.';
	}
    }
}