nmxptool.c 44 KB
Newer Older
Matteo Quintiliani's avatar
Matteo Quintiliani committed
1
2
3
4
5
6
7
8
9
/*! \file
 *
 * \brief Nanometrics Protocol Tool
 *
 * Author:
 * 	Matteo Quintiliani
 * 	Istituto Nazionale di Geofisica e Vulcanologia - Italy
 *	quintiliani@ingv.it
 *
10
 * $Id: nmxptool.c,v 1.216 2009-08-17 08:19:46 mtheo Exp $
Matteo Quintiliani's avatar
Matteo Quintiliani committed
11
 *
Matteo Quintiliani's avatar
Matteo Quintiliani committed
12
13
 */

Matteo Quintiliani's avatar
Matteo Quintiliani committed
14
15
#include "config.h"

16
17
18
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
Matteo Quintiliani's avatar
Matteo Quintiliani committed
19
#include <errno.h>
20
#include <signal.h>
21

22
23
24
#include <sys/stat.h>
#include <unistd.h>

25
26
27
28
#include <nmxp.h>
#include "nmxptool_getoptlong.h"
#include "nmxptool_chanseq.h"
#include "nmxptool_sigcondition.h"
29
#include <nmxptool_listen.h>
30

31
32
33
#ifdef HAVE_PTHREAD_H
#include <pthread.h>
#else
34
#warning Requests of channels could not be efficient because they do not use a separate thread. 
35
36
#endif

Matteo Quintiliani's avatar
Matteo Quintiliani committed
37
38
39
40
#ifdef HAVE_WINDOWS_H
#include <windows.h>
#endif

41
#ifdef HAVE_EARTHWORMOBJS
42
#include "nmxptool_ew.h"
43
#endif
44

45
46
47
#ifdef HAVE_LIBMSEED
#include <libmseed.h>
#endif
48

49
50
#ifdef HAVE___SRC_SEEDLINK_PLUGIN_H
#include "seedlink_plugin.h"
51
52
#endif

53
54
#define TIMES_FLOW_EXIT 100

55
56
int if_dap_condition_only_one_time = 0;

57
58
59
60
61
62
63
64
65
/*
 * DAP_CONDITION(params_struct)
 *
 * Evaluates to non-zero when the given parameter structure asks for a
 * time-bounded request: either a non-zero start_time or a positive delay.
 * main() uses it to decide whether to run the DAP flow instead of the PDS flow.
 *
 * params_struct: any expression yielding a structure with the members
 *                start_time and delay. The argument is parenthesized so
 *                that expressions such as *ptr or array[i] expand
 *                correctly; it is evaluated at most twice.
 */
#define DAP_CONDITION(params_struct) ((params_struct).start_time != 0.0 || (params_struct).delay > 0)

/*
 * Guards for the outer loop of main().
 *
 * Despite the "EXIT" in their names, both macros are non-zero while the
 * program should KEEP running (the loop is written while(EXIT_CONDITION)).
 *
 * EXIT_CONDITION_PRELIM is true when no termination signal has been
 * recorded, Earthworm has not requested termination, and the one-shot DAP
 * request has not been executed yet.
 * EXIT_CONDITION additionally requires, when libmseed support is compiled
 * in, that no general mini-SEED error has been flagged in data_seed.
 */
#define EXIT_CONDITION_PRELIM \
    ( !nmxptool_sigcondition_read() \
      &&  !ew_check_flag_terminate \
      &&  !if_dap_condition_only_one_time )

#ifndef HAVE_LIBMSEED
#define EXIT_CONDITION ( EXIT_CONDITION_PRELIM )
#else
#define EXIT_CONDITION ( EXIT_CONDITION_PRELIM  &&  data_seed.err_general == 0 )
#endif
		    
66

67
68
69
/*
 * Network code selection helpers.
 *
 * CURRENT_NETWORK reads the global params: it yields params.network when
 * that is set, otherwise DEFAULT_NETWORK.
 *
 * NETCODE_OR_CURRENT_NETWORK expects a character buffer named network_code
 * to be visible where the macro is expanded; it yields that buffer when it
 * is not empty, otherwise CURRENT_NETWORK.
 */
#define CURRENT_NETWORK ( params.network ? params.network : DEFAULT_NETWORK )
#define NETCODE_OR_CURRENT_NETWORK ( network_code[0] != '\0' ? network_code : CURRENT_NETWORK )

70
71
static void ShutdownHandler(int sig);
static void AlarmHandler(int sig);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
72

Matteo Quintiliani's avatar
Matteo Quintiliani committed
73
74
75
76
77
void flushing_raw_data_stream();

void *nmxptool_print_info_raw_stream(void *arg);
int nmxptool_print_seq_no(NMXP_DATA_PROCESS *pd);
void nmxptool_str_time_to_filename(char *str_time);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
78

79
#ifdef HAVE_LIBMSEED
Matteo Quintiliani's avatar
Matteo Quintiliani committed
80
int nmxptool_write_miniseed(NMXP_DATA_PROCESS *pd);
81
82
int nmxptool_log_miniseed(const char *s);
int nmxptool_logerr_miniseed(const char *s);
83
84
#endif

Matteo Quintiliani's avatar
Matteo Quintiliani committed
85
#ifdef HAVE___SRC_SEEDLINK_PLUGIN_C
Matteo Quintiliani's avatar
Matteo Quintiliani committed
86
int nmxptool_send_raw_depoch(NMXP_DATA_PROCESS *pd);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
87
88
#endif

89
90
91
92
93
94
95
#ifdef HAVE_LIBMSEED
#ifdef HAVE___SRC_SEEDLINK_PLUGIN_C
void nmxptool_msr_send_mseed_handler (char *record, int reclen, void *handlerdata);
int nmxptool_msr_send_mseed(NMXP_DATA_PROCESS *pd);
#endif
#endif

Matteo Quintiliani's avatar
Matteo Quintiliani committed
96
97
#ifdef HAVE_PTHREAD_H
pthread_t thread_request_channels;
98
pthread_attr_t attr_request_channels;
99
void *status_thread;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
100
101
void *p_nmxp_sendAddTimeSeriesChannel(void *arg);
#endif
Matteo Quintiliani's avatar
Matteo Quintiliani committed
102

103
104
105
106
#ifdef HAVE_PTHREAD_H
pthread_t thread_socket_listen;
pthread_attr_t attr_socket_listen;
void *status_thread_socket_listen;
107
int already_listen = 0;
108
109
#endif

Matteo Quintiliani's avatar
Matteo Quintiliani committed
110

111
112
113
114
115
/* Global variables shared by the main program and the program-termination handling */
/* Run-time parameters; main() fills them in through nmxptool_getopt_long() */
NMXPTOOL_PARAMS params;
/* Socket returned by nmxp_openSocket(); main() sets it back to 0 after closing the DAP connection */
int naqssock = 0;
/* Output file written when params.flag_writefile is set; NULL whenever no file is open */
FILE *outfile = NULL;
/* Complete channel list received from the server; main() frees it and resets it to NULL once the subset has been extracted */
NMXP_CHAN_LIST *channelList = NULL;
116
NMXP_CHAN_LIST_NET *channelList_subset = NULL;
117
NMXP_CHAN_LIST_NET *channelList_subset_waste = NULL;
118
NMXPTOOL_CHAN_SEQ *channelList_Seq = NULL;
119
NMXP_META_CHAN_LIST *meta_channelList = NULL;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
120
121
122
int n_func_pd = 0;
int (*p_func_pd[NMXP_MAX_FUNC_PD]) (NMXP_DATA_PROCESS *);

123
124
125
126
127
128
129

#ifdef HAVE_LIBMSEED
/* Mini-SEED variables */
NMXP_DATA_SEED data_seed;
MSRecord *msr_list_chan[MAX_N_CHAN];
#endif

130
int ew_check_flag_terminate = 0;
131

132
int main (int argc, char **argv) {
133
    int32_t connection_time;
134
    int request_SOCKET_OK;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
135
    int i_chan, cur_chan = 0;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
136
    int to_cur_chan = 0;
137
    int request_chan;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
138
    int exitpdscondition;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
139
    int exitdapcondition;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
140
    time_t timeout_for_channel;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
141

Matteo Quintiliani's avatar
Matteo Quintiliani committed
142
    int span_interval = 10;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
143
144
    int time_to_sleep = 0;

145
146
    char str_start_time[200] = "";
    char str_end_time[200] = "";
147
148

    NMXP_MSG_SERVER type;
149
    void *buffer = NULL;
150
    int32_t length;
151
    int ret;
152
    int main_ret = 0;
153

154
    int pd_null_count = 0;
155
    int timeoutrecv_warning = 300; /* 5 minutes */
156

157
158
    int times_flow = 0;

Matteo Quintiliani's avatar
Matteo Quintiliani committed
159
160
    int recv_errno = 0;

161
162
    char filename[500] = "";
    char station_code[20] = "", channel_code[20] = "", network_code[20] = "";
163

Matteo Quintiliani's avatar
Matteo Quintiliani committed
164
    char cur_after_start_time_str[1024];
165
166
    double cur_after_start_time = DEFAULT_BUFFERED_TIME;
    int skip_current_packet = 0;
167

168
    double default_start_time = 0.0;
169
    char start_time_str[30], end_time_str[30], default_start_time_str[30];
170

171
    NMXP_DATA_PROCESS *pd = NULL;
172

Matteo Quintiliani's avatar
Matteo Quintiliani committed
173
#ifndef HAVE_WINDOWS_H
Matteo Quintiliani's avatar
Matteo Quintiliani committed
174
175
176
    /* Signal handling, use POSIX calls with standardized semantics */
    struct sigaction sa;

177
    sa.sa_handler = AlarmHandler;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
178
179
180
    sa.sa_flags = SA_RESTART;
    sigemptyset(&sa.sa_mask);
    sigaction(SIGALRM, &sa, NULL);
181

182
    sa.sa_handler = ShutdownHandler;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
183
184
185
186
187
188
189
    sigaction(SIGINT, &sa, NULL);
    sigaction(SIGQUIT, &sa, NULL); 
    sigaction(SIGTERM, &sa, NULL);

    sa.sa_handler = SIG_IGN;
    sigaction(SIGHUP, &sa, NULL);
    sigaction(SIGPIPE, &sa, NULL); 
190
#else
Matteo Quintiliani's avatar
Matteo Quintiliani committed
191
192
    /* Signal handling, use function signal() */

193
    /*
194
    signal(SIGALRM, AlarmHandler);
195
    */
196
197

    signal(SIGINT, ShutdownHandler);
198
    /*
199
    signal(SIGQUIT, ShutdownHandler);
200
    */
201
202
    signal(SIGTERM, ShutdownHandler);

203
    /*
204
205
    signal(SIGHUP, SIG_IGN);
    signal(SIGPIPE, SIG_IGN);
206
    */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
207
208
#endif

209
    nmxptool_sigcondition_init();
210

Matteo Quintiliani's avatar
Matteo Quintiliani committed
211
    /* Default is normal output */
212
    nmxp_log(NMXP_LOG_SET, NMXP_LOG_D_NULL);
213
214
215
216
217
218

    /* Initialize params from argument values */
    if(nmxptool_getopt_long(argc, argv, &params) != 0) {
	return 1;
    }

219
    if(params.ew_configuration_file) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
220
221

#ifdef HAVE_EARTHWORMOBJS
222

223
224
	nmxp_log_init(nmxptool_ew_logit_msg, nmxptool_ew_logit_err);

Matteo Quintiliani's avatar
Matteo Quintiliani committed
225
	nmxptool_ew_configure(argv, &params);
226

Matteo Quintiliani's avatar
Matteo Quintiliani committed
227
228
229
230
	/* Check consistency of params */
	if(nmxptool_check_params(&params) != 0) {
	    return 1;
	}
231

232
233
234
#else
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "Earthworm feature has not been enabled in compilation!\n");
	return 1;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
235
#endif
236

237
    } else {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
238

Matteo Quintiliani's avatar
Matteo Quintiliani committed
239
240
	nmxp_log_init(nmxp_log_stdout, nmxp_log_stderr);

241
242
243
244
245
	/* Check consistency of params */
	if(nmxptool_check_params(&params) != 0) {
	    return 1;
	}

246
247
	/* List available channels on server */
	if(params.flag_listchannels) {
248

249
250
251
252
	    meta_channelList = nmxp_getMetaChannelList(params.hostname, params.portnumberdap, NMXP_DATA_TIMESERIES, params.flag_request_channelinfo, params.datas_username, params.datas_password, &channelList, nmxptool_sigcondition_read);

	    /* nmxp_meta_chan_print(meta_channelList); */
	    nmxp_meta_chan_print_with_match(meta_channelList, params.channels);
253

254
	    return 1;
255

256
257
	} else if(params.flag_listchannelsnaqs) {

258
	    channelList = nmxp_getAvailableChannelList(params.hostname, params.portnumberpds, NMXP_DATA_TIMESERIES, nmxptool_sigcondition_read);
259
260
261
262

	    /* nmxp_chan_print_channelList(channelList); */
	    nmxp_chan_print_channelList_with_match(channelList, params.channels);

263
	    return 1;
264

265
266
	}
    }
267

268
    nmxp_log(NMXP_LOG_SET, params.verbose_level);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
269
270
271
    if(params.verbose_level != DEFAULT_VERBOSE_LEVEL) {
	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_ANY, "verbose_level %d\n", params.verbose_level);
    }
272

273
#ifdef HAVE_LIBMSEED
274
    data_seed.err_general = 0;
275
    if(params.type_writeseed) {
276
	ms_loginit((void*)&nmxptool_log_miniseed, NULL, (void*)&nmxptool_logerr_miniseed, "error: ");
277
	/* Init mini-SEED variables */
278
279
280
	nmxp_data_seed_init(&data_seed, params.outdirseed,
		CURRENT_NETWORK,
		(params.type_writeseed == TYPE_WRITESEED_BUD)? NMXP_TYPE_WRITESEED_BUD : NMXP_TYPE_WRITESEED_SDS);
281
282
283
    }
#endif

284
285
286
287
    nmxptool_log_params(&params);

    if(params.stc == -1) {

288
#ifndef HAVE_WINDOWS_H
289
	p_func_pd[n_func_pd++] = nmxptool_listen_print_seq_no;
290
#endif
291

292
293
294
295
296
297
	if(params.flag_logdata) {
	    p_func_pd[n_func_pd++] = nmxptool_print_seq_no;
	}

#ifdef HAVE_LIBMSEED
	/* Write Mini-SEED record */
298
	if(params.type_writeseed) {
299
300
301
302
303
304
305
306
307
308
309
	    p_func_pd[n_func_pd++] = nmxptool_write_miniseed;
	}
#endif

#ifdef HAVE___SRC_SEEDLINK_PLUGIN_C
	/* Send data to SeedLink Server */
	if(params.flag_slink) {
	    p_func_pd[n_func_pd++] = nmxptool_send_raw_depoch;
	}
#endif

310
311
312
313
314
315
316
317
318
#ifdef HAVE_LIBMSEED
#ifdef HAVE___SRC_SEEDLINK_PLUGIN_C
	/* Send data to SeedLink Server */
	if(params.flag_slinkms) {
	    p_func_pd[n_func_pd++] = nmxptool_msr_send_mseed;
	}
#endif
#endif

319
320
321
322
323
324
325
326
327
328
329
330
331
332
#ifdef HAVE_EARTHWORMOBJS
	if(params.ew_configuration_file) {
	    p_func_pd[n_func_pd++] = nmxptool_ew_nmx2ew;
	}
#endif

    }

#ifdef HAVE_EARTHWORMOBJS
    if(params.ew_configuration_file) {
	nmxptool_ew_attach();
    }
#endif

333

334
    /* Exit only on request */
335
    while(EXIT_CONDITION) {
336

337
    NMXP_MEM_PRINT_PTR(0, 1);
338

339
    /* Get list of available channels and get a subset list of params.channels */
340
    if( DAP_CONDITION(params) ) {
341
	if_dap_condition_only_one_time = 1;
342
	/* From DataServer */
343
	if(!nmxp_getMetaChannelList(params.hostname, params.portnumberdap, NMXP_DATA_TIMESERIES,
344
		    params.flag_request_channelinfo, params.datas_username, params.datas_password, &channelList, nmxptool_sigcondition_read)) {
345
346
	    return -1;
	}
347
348
    } else {
	/* From NaqsServer */
349
	channelList = nmxp_getAvailableChannelList(params.hostname, params.portnumberpds, NMXP_DATA_TIMESERIES, nmxptool_sigcondition_read);
350
351
    }

352
353
354
355
356
    if(!channelList) {
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CHANNEL, "Channel list has not been received!\n");
	return 1;
    }

357
    channelList_subset = nmxp_chan_subset(channelList, NMXP_DATA_TIMESERIES, params.channels, CURRENT_NETWORK);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
358
359
360
    
    /* Free the complete channel list */
    if(channelList) {
361
	NMXP_MEM_FREE(channelList);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
362
363
	channelList = NULL;
    }
364
365
366

    /* Check if some channel already exists */
    if(channelList_subset->number <= 0) {
367
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CHANNEL, "Channels not found!\n");
368
	return 1;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
369
    } else {
370
	nmxp_chan_print_netchannelList(channelList_subset);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
371

372
	nmxptool_chanseq_init(&channelList_Seq, channelList_subset->number, DEFAULT_BUFFERED_TIME, params.max_tolerable_latency, params.timeoutrecv);
373

Matteo Quintiliani's avatar
Matteo Quintiliani committed
374
#ifdef HAVE_LIBMSEED
375
	if(params.type_writeseed  ||  params.flag_slinkms) {
376
	    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "Init mini-SEED record list.\n");
377

378
379
	    /* Init mini-SEED record list */
	    for(i_chan = 0; i_chan < channelList_subset->number; i_chan++) {
380

381
382
		nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA,
			"Init mini-SEED record for %s\n", NMXP_LOG_STR(channelList_subset->channel[i_chan].name));
383

384
		msr_list_chan[i_chan] = msr_init(NULL);
385

386
387
		/* Separate station_code and channel_code */
		if(nmxp_chan_cpy_sta_chan(channelList_subset->channel[i_chan].name, station_code, channel_code, network_code)) {
388

389
390
		    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "%s.%s.%s\n",
			    NMXP_LOG_STR(NETCODE_OR_CURRENT_NETWORK), NMXP_LOG_STR(station_code), NMXP_LOG_STR(channel_code));
391

392
393
394
		    strncpy(msr_list_chan[i_chan]->network, NETCODE_OR_CURRENT_NETWORK, 11);
		    strncpy(msr_list_chan[i_chan]->station, station_code, 11);
		    strncpy(msr_list_chan[i_chan]->channel, channel_code, 11);
395

396
397
		    msr_list_chan[i_chan]->reclen = 512;         /* byte record length */
		    msr_list_chan[i_chan]->encoding = DE_STEIM1;  /* Steim 1 compression */
398

399
400
401
402
403
		} else {
		    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CHANNEL,
			    "Channels %s error in format!\n", NMXP_LOG_STR(channelList_subset->channel[i_chan].name));
		    return 1;
		}
404

405
	    }
Matteo Quintiliani's avatar
Matteo Quintiliani committed
406
407
	}
#endif
408

409
410
    }

Matteo Quintiliani's avatar
Matteo Quintiliani committed
411
    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "Begin communication.\n");
Matteo Quintiliani's avatar
Matteo Quintiliani committed
412

413
    times_flow = 0;
414
    recv_errno = 0;
415

416
417
418
419
420
    while(times_flow < 2  &&  recv_errno == 0 && !nmxptool_sigcondition_read()
#ifdef HAVE_LIBMSEED
	    &&  data_seed.err_general==0
#endif
	    ) {
421

422
	if(params.statefile) {
423
	    nmxptool_chanseq_load_states(channelList_subset, channelList_Seq, params.statefile);
424
425
	}

426
427
428
429
430
431
432
433
434
435
436
	if(times_flow == 0) {
	    if(params.statefile) {
		params.interval = DEFAULT_INTERVAL_INFINITE;
	    }
	} else if(times_flow == 1) {
	    params.start_time = 0.0;
	    params.end_time = 0.0;
	    params.interval = DEFAULT_INTERVAL_NO_VALUE;

	}

Matteo Quintiliani's avatar
Matteo Quintiliani committed
437
    /* Condition for starting DAP or PDS */
438
439
    if( DAP_CONDITION(params) ||
	    (times_flow == 0  &&  params.statefile && params.max_data_to_retrieve > 0 && params.interval == DEFAULT_INTERVAL_INFINITE) ) {
440
441

	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "Begin DAP Flow.\n");
442

443
444
445
446
447
448
449
	if(params.interval > 0  ||  params.interval == DEFAULT_INTERVAL_INFINITE) {
	    if(params.interval > 0) {
		params.end_time = params.start_time + params.interval;
	    } else {
		params.end_time = nmxp_data_gmtime_now();
	    }
	} else if(params.delay > 0) {
450
	    params.start_time = ((double) (time(NULL) - params.delay - span_interval) / 10.0) * 10.0;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
451
452
453
	    params.end_time = params.start_time + span_interval;
	}

454

455
456
457
	/* ************************************************************** */
	/* Start subscription protocol "DATA ACCESS PROTOCOL" version 1.0 */
	/* ************************************************************** */
458

459
	/* DAP Step 1: Open a socket */
460
	if( (naqssock = nmxp_openSocket(params.hostname, params.portnumberdap, nmxptool_sigcondition_read)) == NMXP_SOCKET_ERROR) {
461
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error opening socket!\n");
462
463
	    return 1;
	}
464

465
466
	/* DAP Step 2: Read connection time */
	if(nmxp_readConnectionTime(naqssock, &connection_time) != NMXP_SOCKET_OK) {
467
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error reading connection time from server!\n");
468
469
	    return 1;
	}
470

471
472
	/* DAP Step 3: Send a ConnectRequest */
	if(nmxp_sendConnectRequest(naqssock, params.datas_username, params.datas_password, connection_time) != NMXP_SOCKET_OK) {
473
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error sending connect request!\n");
474
475
476
477
478
	    return 1;
	}

	/* DAP Step 4: Wait for a Ready message */
	if(nmxp_waitReady(naqssock) != NMXP_SOCKET_OK) {
479
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error waiting Ready message!\n");
480
481
	    return 1;
	}
482

483
484
	exitdapcondition = 1;

485
	default_start_time = (params.start_time > 0.0)? params.start_time : nmxp_data_gmtime_now() - params.max_data_to_retrieve;
486

487
488
489
490
491
	while(exitdapcondition  &&  !nmxptool_sigcondition_read()
#ifdef HAVE_LIBMSEED
		&&  data_seed.err_general==0
#endif
	     ) {
492

493
	    /* Start loop for sending requests */
494
	    request_chan=0;
495
	    request_SOCKET_OK = NMXP_SOCKET_OK;
496

497
	    /* For each channel */
498
499
500
501
502
	    while(request_SOCKET_OK == NMXP_SOCKET_OK  &&  request_chan < channelList_subset->number  &&  exitdapcondition && !nmxptool_sigcondition_read()
#ifdef HAVE_LIBMSEED
		    &&  data_seed.err_general==0
#endif
		    ) {
503

504
		if(params.statefile) {
505
506
		    if(channelList_Seq[request_chan].after_start_time > 0) {
			params.start_time = channelList_Seq[request_chan].after_start_time;
507
			if(params.end_time - params.start_time > params.max_data_to_retrieve) {
508
			    nmxp_data_to_str(start_time_str, params.start_time);
509
			    nmxp_data_to_str(default_start_time_str, params.end_time - params.max_data_to_retrieve);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
510
			    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_ANY, "%s start_time changed from %s to %s\n",
511
512
513
				    NMXP_LOG_STR(channelList_subset->channel[request_chan].name),
				    NMXP_LOG_STR(start_time_str),
				    NMXP_LOG_STR(default_start_time_str));
514
			    params.start_time = params.end_time - params.max_data_to_retrieve;
515
			}
516
517
518
		    } else {
			params.start_time = default_start_time;
		    }
519
520
		    channelList_Seq[request_chan].last_time = params.start_time;
		    channelList_Seq[request_chan].significant = 1;
521

522
		}
523

524
525
		nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "nmxp_sendDataRequest %d %s (%d)\n",
			channelList_subset->channel[request_chan].key,
526
527
			NMXP_LOG_STR(channelList_subset->channel[request_chan].name),
			request_chan);
528

529
530
531
		nmxp_data_to_str(start_time_str, params.start_time);
		nmxp_data_to_str(end_time_str, params.end_time);
		nmxp_data_to_str(default_start_time_str, default_start_time);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
532
		nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "%s start_time = %s - end_time = %s - (default_start_time = %s)\n",
533
534
535
536
			NMXP_LOG_STR(channelList_subset->channel[request_chan].name),
			NMXP_LOG_STR(start_time_str),
			NMXP_LOG_STR(end_time_str),
			NMXP_LOG_STR(default_start_time_str));
537

538
		/* DAP Step 5: Send Data Request */
539
		request_SOCKET_OK = nmxp_sendDataRequest(naqssock, channelList_subset->channel[request_chan].key, (int32_t) params.start_time, (int32_t) (params.end_time + 1.0));
540

541
		if(request_SOCKET_OK == NMXP_SOCKET_OK) {
542

543
544
545
546
		    nmxp_data_to_str(str_start_time, params.start_time);
		    nmxp_data_to_str(str_end_time, params.end_time);
		    nmxptool_str_time_to_filename(str_start_time);
		    nmxptool_str_time_to_filename(str_end_time);
547

548
549
		    if(params.flag_writefile) {
			/* Open output file */
550
			if(nmxp_chan_cpy_sta_chan(channelList_subset->channel[request_chan].name, station_code, channel_code, network_code)) {
551
552
553
554
555
556
557
558
			    sprintf(filename, "%s.%s.%s_%s_%s.nmx",
				    NETCODE_OR_CURRENT_NETWORK,
				    station_code,
				    channel_code,
				    str_start_time,
				    str_end_time);
			} else {
			    sprintf(filename, "%s_%s_%s.nmx",
559
				    channelList_subset->channel[request_chan].name,
560
561
562
				    str_start_time,
				    str_end_time);
			}
563

564
565
			outfile = fopen(filename, "w");
			if(!outfile) {
566
			    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_EXTRA, "Can not open file %s!\n",
567
				    NMXP_LOG_STR(filename));
568
			}
569
		    }
570

571
572
573
574
		    if(params.flag_writefile  &&  outfile) {
			/* Compute SNCL line */

			/* Separate station_code_old_way and channel_code_old_way */
575
			if(nmxp_chan_cpy_sta_chan(channelList_subset->channel[request_chan].name, station_code, channel_code, network_code)) {
576
577
578
579
580
581
582
			    /* Write SNCL line */
			    fprintf(outfile, "%s.%s.%s.%s\n",
				    station_code,
				    NETCODE_OR_CURRENT_NETWORK,
				    channel_code,
				    (params.location)? params.location : "");
			}
583

584
		    }
585

586
587
		    /* DAP Step 6: Receive Data until receiving a Ready message */
		    ret = nmxp_receiveMessage(naqssock, &type, &buffer, &length, 0, &recv_errno);
588

589
590
		    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "ret = %d, type = %d, length = %d, recv_errno = %d\n",
			    ret, type, length, recv_errno);
591

592
593
594
595
596
		    while(ret == NMXP_SOCKET_OK   &&    type != NMXP_MSG_READY  && !nmxptool_sigcondition_read()
#ifdef HAVE_LIBMSEED
			    &&  data_seed.err_general==0
#endif
			 ) {
597

598
599
600
			/* Process a packet and return value in NMXP_DATA_PROCESS structure */
			pd = nmxp_processCompressedData(buffer, length, channelList_subset, NETCODE_OR_CURRENT_NETWORK);
			nmxp_data_trim(pd, params.start_time, params.end_time, 0);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
601

602
603
604
			/* To prevent to manage a packet with zero sample after nmxp_data_trim() */
			if(pd->nSamp > 0) {

605
606
			/* Log contents of last packet */
			if(params.flag_logdata) {
607
			    nmxp_data_log(pd, params.flag_logsample);
608
			}
609

610
611
			/* Set cur_chan */
			cur_chan = nmxp_chan_lookupKeyIndex(pd->key, channelList_subset);
612
613
614
615
616

			/* It is not the channel I have requested or error from nmxp_chan_lookupKeyIndex() */
			if(request_chan != cur_chan  &&  cur_chan != -1) {
			    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "request_chan != cur_chan  %d != %d! (%d, %s) (%d, %s.%s.%s)\n",
				    request_chan, cur_chan,
617
618
619
				    channelList_subset->channel[request_chan].key,
				    NMXP_LOG_STR(channelList_subset->channel[request_chan].name),
				    pd->key, NMXP_LOG_STR(pd->network), NMXP_LOG_STR(pd->station), NMXP_LOG_STR(pd->channel));
620
			} else {
621

622
			/* Management of gaps */
623
			nmxptool_chanseq_gap(&(channelList_Seq[cur_chan]), pd);
624

625
#ifdef HAVE_LIBMSEED
626
			/* Write Mini-SEED record */
627
			if(params.type_writeseed) {
628
629
			    nmxptool_write_miniseed(pd);
			}
630
#endif
631

632
#ifdef HAVE___SRC_SEEDLINK_PLUGIN_C
633
634
635
636
			/* Send data to SeedLink Server */
			if(params.flag_slink) {
			    nmxptool_send_raw_depoch(pd);
			}
637
638
#endif

639
640
641
642
643
644
645
646
647
#ifdef HAVE_LIBMSEED
#ifdef HAVE___SRC_SEEDLINK_PLUGIN_C
			/* Send data to SeedLink Server */
			if(params.flag_slinkms) {
			    nmxptool_msr_send_mseed(pd);
			}
#endif
#endif

648
649
650
651
652
653
#ifdef HAVE_EARTHWORMOBJS
			if(params.ew_configuration_file) {
			    nmxptool_ew_nmx2ew(pd);
			}
#endif

654
655
656
657
658
659
660
661
			if(params.flag_writefile  &&  outfile) {
			    /* Write buffer to the output file */
			    if(outfile && buffer && length > 0) {
				int32_t length_int = length;
				nmxp_data_swap_4b((int32_t *) &length_int);
				fwrite(&length_int, sizeof(length_int), 1, outfile);
				fwrite(buffer, length, 1, outfile);
			    }
662
			}
663

664
665
666
			/* Store x_1 */
			channelList_Seq[cur_chan].x_1 = pd->pDataPtr[pd->nSamp-1];

667
668
			}

669
670
			} else {
			    /* TODO: nSamp <= 0 */
671
			}
672

673

674
675
			/* Receive Data */
			ret = nmxp_receiveMessage(naqssock, &type, &buffer, &length, 0, &recv_errno);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
676
			/* nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "ret = %d, type = %d\n", ret, type); */
677
		    }
678

679
680
681
682
683
		    if(params.flag_writefile  &&  outfile) {
			/* Close output file */
			fclose(outfile);
			outfile = NULL;
		    }
684

685
686
		} else {
		    /* TODO: error message */
687
		}
688
		request_chan++;
689
690
691
692
693

#ifdef HAVE_EARTHWORMOBJS
		if(params.ew_configuration_file) {

		    /* Check if we are being asked to terminate */
694
		    if( (ew_check_flag_terminate = nmxptool_ew_check_flag_terminate()) ) {
695
			logit ("t", "nmxptool terminating on request\n");
696
			nmxptool_ew_send_error(NMXPTOOL_EW_ERR_TERMREQ, NULL, params.hostname);
697
698
699
700
701
702
703
704
705
706
			exitdapcondition = 0;
			times_flow = TIMES_FLOW_EXIT;
		    }

		    /* Check if we need to send heartbeat message */
		    nmxptool_ew_send_heartbeat_if_needed();

		}
#endif

707
	    }
708
	    /* DAP Step 7: Repeat steps 5 and 6 for each data request */
709

710
711
712
713
714
715
716
717
718
719
	    if(params.delay > 0) {
		time_to_sleep = (params.end_time - params.start_time) - (time(NULL) - (params.start_time + params.delay + span_interval));
		if(time_to_sleep >= 0) {
		    nmxp_sleep(time_to_sleep);
		} else {
		    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "time to sleep %d sec.\n", time_to_sleep);
		    nmxp_sleep(3);
		}
		params.start_time = params.end_time;
		params.end_time = params.start_time + span_interval;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
720
	    } else {
721
		exitdapcondition = 0;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
722
	    }
Matteo Quintiliani's avatar
Matteo Quintiliani committed
723

724
	} /* END while(exitdapcondition) */
725

726
727
728
729
730
731
#ifdef HAVE_LIBMSEED
	if(params.type_writeseed) {
	    nmxp_data_seed_fclose_all(&data_seed);
	}
#endif

732
733
734
735
736
	/* DAP Step 8: Send a Terminate message (optional) */
	nmxp_sendTerminateSubscription(naqssock, NMXP_SHUTDOWN_NORMAL, "Bye!");

	/* DAP Step 9: Close the socket */
	nmxp_closeSocket(naqssock);
737
	naqssock = 0;
738
739
740
741
742

	/* ************************************************************ */
	/* End subscription protocol "DATA ACCESS PROTOCOL" version 1.0 */
	/* ************************************************************ */

743
	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "End DAP Flow.\n");
744

Matteo Quintiliani's avatar
Matteo Quintiliani committed
745
    } else {
746

747
748
	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "Begin PDS Flow.\n");

749
750
751
752
753
	/* ************************************************************* */
	/* Start subscription protocol "PRIVATE DATA STREAM" version 1.4 */
	/* ************************************************************* */

	/* PDS Step 1: Open a socket */
754
	naqssock = nmxp_openSocket(params.hostname, params.portnumberpds, nmxptool_sigcondition_read);
755
756
757

	if(naqssock == NMXP_SOCKET_ERROR) {
	    return 1;
758
759
	}

760
761
762
763
764
765
766
767
768
769
770
	/* PDS Step 2: Send a Connect */
	if(nmxp_sendConnect(naqssock) != NMXP_SOCKET_OK) {
	    printf("Error on sendConnect()\n");
	    return 1;
	}

	/* PDS Step 3: Receive ChannelList */
	if(nmxp_receiveChannelList(naqssock, &channelList) != NMXP_SOCKET_OK) {
	    printf("Error on receiveChannelList()\n");
	    return 1;
	}
771
772
	/* Get a subset of channel from arguments, in respect to the step 3 of PDS */
	channelList_subset_waste = nmxp_chan_subset(channelList, NMXP_DATA_TIMESERIES, params.channels, CURRENT_NETWORK);
773

Matteo Quintiliani's avatar
Matteo Quintiliani committed
774
775
	/* Free the complete channel list */
	if(channelList) {
776
	    NMXP_MEM_FREE(channelList);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
777
778
779
	    channelList = NULL;
	}

780
781
	/* TODO check if channelList_subset_waste is equal to channelList_subset and free */
	if(channelList_subset_waste) {
782
	    NMXP_MEM_FREE(channelList_subset_waste);
783
784
	    channelList_subset_waste = NULL;
	}
785

786
787
788
789
790
791
792
793
794
795
	/* PDS Step 4: Send a Request Pending (optional) */

	/* PDS Step 5: Send AddChannels */
	/* Request Data */

	/* Better using a Thread */
#ifndef HAVE_PTHREAD_H
	nmxp_sendAddTimeSeriesChannel(naqssock, channelList_subset, params.stc, params.rate,
		(params.flag_buffered)? NMXP_BUFFER_YES : NMXP_BUFFER_NO, params.n_channel, params.usec, 1);
#else
796
797
798
799
	pthread_attr_init(&attr_request_channels);
	pthread_attr_setdetachstate(&attr_request_channels, PTHREAD_CREATE_JOINABLE);
	pthread_create(&thread_request_channels, &attr_request_channels, p_nmxp_sendAddTimeSeriesChannel, (void *)NULL);
	pthread_attr_destroy(&attr_request_channels);
800
801
#endif

802
#ifndef HAVE_WINDOWS_H
803
#ifdef HAVE_PTHREAD_H
804
805
	if(!already_listen  &&  params.listen_port != DEFAULT_LISTEN_PORT) {
	    already_listen = 1;
806
807
	    pthread_attr_init(&attr_socket_listen);
	    pthread_attr_setdetachstate(&attr_socket_listen, PTHREAD_CREATE_DETACHED);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
808
	    pthread_create(&thread_socket_listen, &attr_socket_listen, nmxptool_listen, (void *)params.listen_port);
809
810
	    pthread_attr_destroy(&attr_socket_listen);
	}
811
#endif
812
813
#endif

814
	/* PDS Step 6: Repeat until finished: receive and handle packets */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
815

816
	/* TODO*/
Matteo Quintiliani's avatar
Matteo Quintiliani committed
817
818
	exitpdscondition = 1;

819
	skip_current_packet = 0;
820

821
822
823
824
825
	while(exitpdscondition && !nmxptool_sigcondition_read()
#ifdef HAVE_LIBMSEED
		&&  data_seed.err_general==0
#endif
	     ) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
826

827
	    /* Process Compressed or Decompressed Data */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
828
	    pd = nmxp_receiveData(naqssock, channelList_subset, NETCODE_OR_CURRENT_NETWORK, params.timeoutrecv, &recv_errno);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
829

830
831
832
833
834
835
836
837
838
839
840
	    if(!pd) {
		pd_null_count++;
		if((pd_null_count * params.timeoutrecv) >= timeoutrecv_warning) {
		    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_ANY, "Received %d times a null packet. (%d sec.)\n",
			    pd_null_count, pd_null_count * params.timeoutrecv);
		    pd_null_count = 0;
		}
	    } else {
		pd_null_count = 0;
	    }

841
	    if(recv_errno == 0) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
842
843
		exitpdscondition = 1;
	    } else {
844
845
846
847
848
849
#ifdef HAVE_WINDOWS_H
		if(recv_errno == WSAEWOULDBLOCK  ||  recv_errno == WSAETIMEDOUT)
#else
		if(recv_errno == EWOULDBLOCK)
#endif
		{
850
851
		    exitpdscondition = 1;
		} else {
852
853
		    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error receiving data. pd=%p recv_errno=%d\n",
			    pd, recv_errno);
854
855

#ifdef HAVE_EARTHWORMOBJS
856
		    if(params.ew_configuration_file) {
857
			nmxptool_ew_send_error(NMXPTOOL_EW_ERR_RECVDATA, nmxp_strerror(recv_errno), params.hostname);
858
		    }
859
#endif
860
861
		    exitpdscondition = 0;
		}
Matteo Quintiliani's avatar
Matteo Quintiliani committed
862
	    }
863

864
865
866
	    if(pd) {
		/* Set cur_chan */
		cur_chan = nmxp_chan_lookupKeyIndex(pd->key, channelList_subset);
867
		if(cur_chan == -1) {
868
869
		    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "Key %d not found in channelList_subset!\n",
			    pd->key);
870
		}
871
872
	    }

Matteo Quintiliani's avatar
Matteo Quintiliani committed
873
	    /* Log contents of last packet */
874
	    if(params.flag_logdata) {
875
		nmxp_data_log(pd, params.flag_logsample);
876
	    }
Matteo Quintiliani's avatar
Matteo Quintiliani committed
877

878
	    skip_current_packet = 0;
879
	    if(pd &&
880
881
		    (params.statefile  ||  params.buffered_time) &&
		    ( params.timeoutrecv <= 0 )
882
	      )	{
883
884
		if(params.statefile && channelList_Seq[cur_chan].after_start_time > 0.0) {
		    cur_after_start_time = channelList_Seq[cur_chan].after_start_time;
885
886
887
888
889
890
		} else if(params.buffered_time) {
		    cur_after_start_time = params.buffered_time;
		} else {
		    cur_after_start_time = DEFAULT_BUFFERED_TIME;
		}
		nmxp_data_to_str(cur_after_start_time_str, cur_after_start_time);
891
892
		nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_PACKETMAN, "cur_chan %d, cur_after_start_time %f, cur_after_start_time_str %s\n",
			cur_chan, cur_after_start_time, NMXP_LOG_STR(cur_after_start_time_str));
893
894
895
		if(pd->time + ((double) pd->nSamp / (double) pd->sampRate) >= cur_after_start_time) {
		    if(pd->time < cur_after_start_time) {
			int first_nsample_to_remove = (cur_after_start_time - pd->time) * (double) pd->sampRate;
896
897
898
899
			/* Remove the first sample in order avoiding overlap  */
			first_nsample_to_remove++;
			if(pd->nSamp > first_nsample_to_remove) {
			    pd->nSamp -= first_nsample_to_remove;
900
			    pd->time = cur_after_start_time;
901
902
903
904
905
906
907
908
909
910
911
			    pd->pDataPtr += first_nsample_to_remove;
			    pd->x0 = pd->pDataPtr[0];
			} else {
			    skip_current_packet = 1;
			}
		    }
		} else {
		    skip_current_packet = 1;
		}
	    }

Matteo Quintiliani's avatar
Matteo Quintiliani committed
912
	    if(!skip_current_packet) {
913

914
915
		/* Manage Raw Stream */
		if(params.stc == -1) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
916

917
918
		    /* cur_char is computed only for pd != NULL */
		    if(pd) {