nmxptool.c 50.7 KB
Newer Older
Matteo Quintiliani's avatar
Matteo Quintiliani committed
1 2 3 4 5 6 7 8 9
/*! \file
 *
 * \brief Nanometrics Protocol Tool
 *
 * Author:
 * 	Matteo Quintiliani
 * 	Istituto Nazionale di Geofisica e Vulcanologia - Italy
 *	quintiliani@ingv.it
 *
10
 * $Id: nmxptool.c,v 1.231 2010-09-15 13:14:49 mtheo Exp $
Matteo Quintiliani's avatar
Matteo Quintiliani committed
11
 *
Matteo Quintiliani's avatar
Matteo Quintiliani committed
12 13
 */

14
#include "config.h"
15 16 17
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
18
#include <errno.h>
19
#include <signal.h>
20

21 22 23
#include <sys/stat.h>
#include <unistd.h>

24 25 26 27
#include <nmxp.h>
#include "nmxptool_getoptlong.h"
#include "nmxptool_chanseq.h"
#include "nmxptool_sigcondition.h"
28
#include <nmxptool_listen.h>
29

30 31 32
#ifdef HAVE_PTHREAD_H
#include <pthread.h>
#else
33
#warning Requests of channels could not be efficient because they do not use a separate thread. 
34 35
#endif

36 37 38 39
#ifdef HAVE_WINDOWS_H
#include <windows.h>
#endif

40
#ifdef HAVE_EARTHWORMOBJS
41
#include "nmxptool_ew.h"
42
#endif
43

44 45 46
#ifdef HAVE_LIBMSEED
#include <libmseed.h>
#endif
47

48 49
#ifdef HAVE___SRC_SEEDLINK_PLUGIN_H
#include "seedlink_plugin.h"
50 51
#endif

52 53
#define TIMES_FLOW_EXIT 100

54 55
int if_dap_condition_only_one_time = 0;

56 57 58 59 60 61 62 63 64
#define DAP_CONDITION(params_struct) (params_struct.start_time != 0.0 || params_struct.delay > 0)

#define EXIT_CONDITION_PRELIM (!nmxptool_sigcondition_read()  &&  !ew_check_flag_terminate  &&  !if_dap_condition_only_one_time)
#ifdef HAVE_LIBMSEED
#define EXIT_CONDITION ( EXIT_CONDITION_PRELIM  &&  data_seed.err_general==0 )
#else
#define EXIT_CONDITION ( EXIT_CONDITION_PRELIM )
#endif
		    
65 66
#define CURRENT_LOCATION ( (params.location)? params.location : DEFAULT_NULL_LOCATION )
#define LOCCODE_OR_CURRENT_LOCATION ( (location_code[0] != 0)? location_code : CURRENT_LOCATION )
67

68 69 70
#define CURRENT_NETWORK ( (params.network)? params.network : DEFAULT_NETWORK )
#define NETCODE_OR_CURRENT_NETWORK ( (network_code[0] != 0)? network_code : CURRENT_NETWORK )

71
static void ShutdownHandler(int sig);
72
static void nmxptool_AlarmHandler(int sig);
73
static void CloseConnectionHandler(int sig);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
74

75 76
int nmxptool_exitcondition_on_open_socket();

Matteo Quintiliani's avatar
Matteo Quintiliani committed
77 78 79 80 81
void flushing_raw_data_stream();

void *nmxptool_print_info_raw_stream(void *arg);
int nmxptool_print_seq_no(NMXP_DATA_PROCESS *pd);
void nmxptool_str_time_to_filename(char *str_time);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
82

83
#ifdef HAVE_LIBMSEED
Matteo Quintiliani's avatar
Matteo Quintiliani committed
84
int nmxptool_write_miniseed(NMXP_DATA_PROCESS *pd);
85 86
int nmxptool_log_miniseed(const char *s);
int nmxptool_logerr_miniseed(const char *s);
87 88
#endif

89
#ifdef HAVE___SRC_SEEDLINK_PLUGIN_C
Matteo Quintiliani's avatar
Matteo Quintiliani committed
90
int nmxptool_send_raw_depoch(NMXP_DATA_PROCESS *pd);
91 92
#endif

93 94 95 96 97 98 99
#ifdef HAVE_LIBMSEED
#ifdef HAVE___SRC_SEEDLINK_PLUGIN_C
void nmxptool_msr_send_mseed_handler (char *record, int reclen, void *handlerdata);
int nmxptool_msr_send_mseed(NMXP_DATA_PROCESS *pd);
#endif
#endif

Matteo Quintiliani's avatar
Matteo Quintiliani committed
100
#ifdef HAVE_PTHREAD_H
101
pthread_mutex_t mutex_sendAddTimeSeriesChannel = PTHREAD_MUTEX_INITIALIZER;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
102
pthread_t thread_request_channels;
103
pthread_attr_t attr_request_channels;
104
void *status_thread;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
105 106
void *p_nmxp_sendAddTimeSeriesChannel(void *arg);
#endif
Matteo Quintiliani's avatar
Matteo Quintiliani committed
107

108 109 110 111
#ifdef HAVE_PTHREAD_H
pthread_t thread_socket_listen;
pthread_attr_t attr_socket_listen;
void *status_thread_socket_listen;
112
int already_listen = 0;
113 114
#endif

Matteo Quintiliani's avatar
Matteo Quintiliani committed
115

116
/* Global variable for main program and handling terminitation program */
117 118 119
NMXPTOOL_PARAMS params={0};


120
int naqssock = 0;
121
int flag_force_close_connection = 0;
122 123
FILE *outfile = NULL;
NMXP_CHAN_LIST *channelList = NULL;
124
NMXP_CHAN_LIST_NET *channelList_subset = NULL;
125
NMXP_CHAN_LIST_NET *channelList_subset_waste = NULL;
126
NMXPTOOL_CHAN_SEQ *channelList_Seq = NULL;
127
NMXP_META_CHAN_LIST *meta_channelList = NULL;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
128 129 130
int n_func_pd = 0;
int (*p_func_pd[NMXP_MAX_FUNC_PD]) (NMXP_DATA_PROCESS *);

131 132
time_t lasttime_pds_receiveddata;
time_t timeout_pds_receiveddata = (NMXP_HIGHEST_TIMEOUT * 2);
133 134 135 136 137 138 139

#ifdef HAVE_LIBMSEED
/* Mini-SEED variables */
NMXP_DATA_SEED data_seed;
MSRecord *msr_list_chan[MAX_N_CHAN];
#endif

140
int ew_check_flag_terminate = 0;
141

142
int main (int argc, char **argv) {
143
    int32_t connection_time;
144
    int request_SOCKET_OK;
145 146 147 148
#ifdef HAVE_LIBMSEED
    int i_chan =0;
#endif
    int cur_chan = 0;
149
    int to_cur_chan = 0;
150
    int request_chan;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
151
    int exitpdscondition;
152
    int exitdapcondition;
153
    time_t timeout_for_channel;
154 155 156

    int time_to_sleep = 0;

157 158
    char str_start_time[200] = "";
    char str_end_time[200] = "";
159 160

    NMXP_MSG_SERVER type;
161
    char buffer[NMXP_MAX_LENGTH_DATA_BUFFER]={0};
162
    int32_t length;
163
    int ret;
164
    int main_ret = 0;
165

166
    int pd_null_count = 0;
167
    int timeoutrecv_warning = 300; /* 5 minutes */
168

169 170
    int times_flow = 0;

171
    int recv_errno = 0; 
Matteo Quintiliani's avatar
Matteo Quintiliani committed
172
#ifdef HAVE_EARTHWORMOBJS
173
    char *recv_errno_str;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
174
#endif
175
  
176
    char filename[500] = "";
177
    char station_code[20] = "", channel_code[20] = "", network_code[20] = "", location_code[20] = "";
178

Matteo Quintiliani's avatar
Matteo Quintiliani committed
179
    char cur_after_start_time_str[1024];
180 181
    double cur_after_start_time = DEFAULT_BUFFERED_TIME;
    int skip_current_packet = 0;
182

183
    double default_start_time = 0.0;
184
    char start_time_str[30], end_time_str[30], default_start_time_str[30];
185

186
    NMXP_DATA_PROCESS *pd = NULL;
187

188 189 190 191
#ifdef HAVE_PTHREAD_H
    pthread_mutex_init(&mutex_sendAddTimeSeriesChannel, NULL);
#endif

192
#ifndef HAVE_WINDOWS_H
Matteo Quintiliani's avatar
Matteo Quintiliani committed
193 194 195
    /* Signal handling, use POSIX calls with standardized semantics */
    struct sigaction sa;

196
    sa.sa_handler = nmxptool_AlarmHandler;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
197 198 199
    sa.sa_flags = SA_RESTART;
    sigemptyset(&sa.sa_mask);
    sigaction(SIGALRM, &sa, NULL);
200

201 202 203
    sa.sa_handler = CloseConnectionHandler;
    sigaction(SIGUSR1, &sa, NULL);

204
    sa.sa_handler = ShutdownHandler;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
205 206 207 208 209 210 211
    sigaction(SIGINT, &sa, NULL);
    sigaction(SIGQUIT, &sa, NULL); 
    sigaction(SIGTERM, &sa, NULL);

    sa.sa_handler = SIG_IGN;
    sigaction(SIGHUP, &sa, NULL);
    sigaction(SIGPIPE, &sa, NULL); 
212
#else
Matteo Quintiliani's avatar
Matteo Quintiliani committed
213 214
    /* Signal handling, use function signal() */

215
    /*
216
    signal(SIGALRM, nmxptool_AlarmHandler);
217
    */
218 219

    signal(SIGINT, ShutdownHandler);
220
    /*
221
    signal(SIGQUIT, ShutdownHandler);
222
    */
223 224
    signal(SIGTERM, ShutdownHandler);

225
    /*
226 227
    signal(SIGHUP, SIG_IGN);
    signal(SIGPIPE, SIG_IGN);
228
    */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
229 230
#endif

231
    nmxptool_sigcondition_init();
232

Matteo Quintiliani's avatar
Matteo Quintiliani committed
233
    /* Default is normal output */
234
    nmxp_log(NMXP_LOG_SET, NMXP_LOG_D_NULL);
235 236 237 238 239 240

    /* Initialize params from argument values */
    if(nmxptool_getopt_long(argc, argv, &params) != 0) {
	return 1;
    }

241
    if(params.ew_configuration_file) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
242 243

#ifdef HAVE_EARTHWORMOBJS
244

245 246
	nmxp_log_init(nmxptool_ew_logit_msg, nmxptool_ew_logit_err);

Matteo Quintiliani's avatar
Matteo Quintiliani committed
247
	nmxptool_ew_configure(argv, &params);
248

Matteo Quintiliani's avatar
Matteo Quintiliani committed
249 250 251 252
	/* Check consistency of params */
	if(nmxptool_check_params(&params) != 0) {
	    return 1;
	}
253

254 255 256
#else
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "Earthworm feature has not been enabled in compilation!\n");
	return 1;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
257
#endif
258

259
    } else {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
260

Matteo Quintiliani's avatar
Matteo Quintiliani committed
261 262
	nmxp_log_init(nmxp_log_stdout, nmxp_log_stderr);

263 264 265 266 267
	/* Check consistency of params */
	if(nmxptool_check_params(&params) != 0) {
	    return 1;
	}

268 269
	/* List available channels on server */
	if(params.flag_listchannels) {
270

271
	    meta_channelList = nmxp_getMetaChannelList(params.hostname, params.portnumberdap, NMXP_DATA_TIMESERIES, params.flag_request_channelinfo, params.datas_username, params.datas_password, &channelList, nmxptool_exitcondition_on_open_socket);
272 273 274

	    /* nmxp_meta_chan_print(meta_channelList); */
	    nmxp_meta_chan_print_with_match(meta_channelList, params.channels);
275

276
	    return 1;
277

278 279
	} else if(params.flag_listchannelsnaqs) {

280
	    channelList = nmxp_getAvailableChannelList(params.hostname, params.portnumberpds, NMXP_DATA_TIMESERIES, nmxptool_exitcondition_on_open_socket);
281 282

	    /* nmxp_chan_print_channelList(channelList); */
283
	    nmxp_chan_print_channelList_with_match(channelList, params.channels, 1);
284

285
	    return 1;
286

287 288
	}
    }
289

290
    nmxp_log(NMXP_LOG_SET, params.verbose_level);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
291 292 293
    if(params.verbose_level != DEFAULT_VERBOSE_LEVEL) {
	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_ANY, "verbose_level %d\n", params.verbose_level);
    }
294

295
#ifdef HAVE_LIBMSEED
296
    data_seed.err_general = 0;
297
    if(params.type_writeseed) {
298
	ms_loginit((void*)&nmxptool_log_miniseed, NULL, (void*)&nmxptool_logerr_miniseed, "error: ");
299
	/* Init mini-SEED variables */
300 301 302
	nmxp_data_seed_init(&data_seed, params.outdirseed,
		CURRENT_NETWORK,
		(params.type_writeseed == TYPE_WRITESEED_BUD)? NMXP_TYPE_WRITESEED_BUD : NMXP_TYPE_WRITESEED_SDS);
303 304 305
    }
#endif

306 307 308 309
    nmxptool_log_params(&params);

    if(params.stc == -1) {

310
#ifndef HAVE_WINDOWS_H
311 312 313
	if(params.listen_port != DEFAULT_LISTEN_PORT) {
	    p_func_pd[n_func_pd++] = nmxptool_listen_print_seq_no;
	}
314
#endif
315

316 317 318 319 320 321
	if(params.flag_logdata) {
	    p_func_pd[n_func_pd++] = nmxptool_print_seq_no;
	}

#ifdef HAVE_LIBMSEED
	/* Write Mini-SEED record */
322
	if(params.type_writeseed) {
323 324 325 326 327 328 329 330 331 332 333
	    p_func_pd[n_func_pd++] = nmxptool_write_miniseed;
	}
#endif

#ifdef HAVE___SRC_SEEDLINK_PLUGIN_C
	/* Send data to SeedLink Server */
	if(params.flag_slink) {
	    p_func_pd[n_func_pd++] = nmxptool_send_raw_depoch;
	}
#endif

334 335 336 337 338 339 340 341 342
#ifdef HAVE_LIBMSEED
#ifdef HAVE___SRC_SEEDLINK_PLUGIN_C
	/* Send data to SeedLink Server */
	if(params.flag_slinkms) {
	    p_func_pd[n_func_pd++] = nmxptool_msr_send_mseed;
	}
#endif
#endif

343 344 345 346 347 348 349 350 351 352 353 354 355 356
#ifdef HAVE_EARTHWORMOBJS
	if(params.ew_configuration_file) {
	    p_func_pd[n_func_pd++] = nmxptool_ew_nmx2ew;
	}
#endif

    }

#ifdef HAVE_EARTHWORMOBJS
    if(params.ew_configuration_file) {
	nmxptool_ew_attach();
    }
#endif

357

358
    /* Exit only on request */
359
    while(EXIT_CONDITION) {
360

361
    NMXP_MEM_PRINT_PTR(0, 1);
362

363
    /* Get list of available channels and get a subset list of params.channels */
364
    if( DAP_CONDITION(params) ) {
365
	if_dap_condition_only_one_time = 1;
366
	/* From DataServer */
367
	if(!nmxp_getMetaChannelList(params.hostname, params.portnumberdap, NMXP_DATA_TIMESERIES,
368
		    params.flag_request_channelinfo, params.datas_username, params.datas_password, &channelList, nmxptool_exitcondition_on_open_socket)) {
369 370
	    return -1;
	}
371 372
    } else {
	/* From NaqsServer */
373
	channelList = nmxp_getAvailableChannelList(params.hostname, params.portnumberpds, NMXP_DATA_TIMESERIES, nmxptool_exitcondition_on_open_socket);
374 375
    }

376 377 378 379 380
    if(!channelList) {
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CHANNEL, "Channel list has not been received!\n");
	return 1;
    }

381
    channelList_subset = nmxp_chan_subset(channelList, NMXP_DATA_TIMESERIES, params.channels, CURRENT_NETWORK, CURRENT_LOCATION);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
382 383 384
    
    /* Free the complete channel list */
    if(channelList) {
385
	NMXP_MEM_FREE(channelList);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
386 387
	channelList = NULL;
    }
388 389 390

    /* Check if some channel already exists */
    if(channelList_subset->number <= 0) {
391
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CHANNEL, "Channels not found!\n");
392
	return 1;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
393
    } else {
394
	nmxp_chan_print_netchannelList(channelList_subset);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
395

396
	nmxptool_chanseq_init(&channelList_Seq, channelList_subset->number, DEFAULT_BUFFERED_TIME, params.max_tolerable_latency, params.timeoutrecv);
397

Matteo Quintiliani's avatar
Matteo Quintiliani committed
398
#ifdef HAVE_LIBMSEED
399
	if(params.type_writeseed  ||  params.flag_slinkms) {
400
	    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "Init mini-SEED record list.\n");
401

402 403
	    /* Init mini-SEED record list */
	    for(i_chan = 0; i_chan < channelList_subset->number; i_chan++) {
404

405 406
		nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA,
			"Init mini-SEED record for %s\n", NMXP_LOG_STR(channelList_subset->channel[i_chan].name));
407

408
		msr_list_chan[i_chan] = msr_init(NULL);
409

410
		/* Separate station_code and channel_code */
411
		if(nmxp_chan_cpy_sta_chan(channelList_subset->channel[i_chan].name, station_code, channel_code, network_code, location_code)) {
412

413 414
		    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "%s.%s.%s\n",
			    NMXP_LOG_STR(NETCODE_OR_CURRENT_NETWORK), NMXP_LOG_STR(station_code), NMXP_LOG_STR(channel_code));
415 416 417
		    strncpy(msr_list_chan[i_chan]->network, NETCODE_OR_CURRENT_NETWORK, 11);
		    strncpy(msr_list_chan[i_chan]->station, station_code, 11);
		    strncpy(msr_list_chan[i_chan]->channel, channel_code, 11);
418 419 420 421 422
		    if(location_code[0] != 0) {
		      if(strcmp(location_code, DEFAULT_NULL_LOCATION) != 0) {
			strncpy(msr_list_chan[i_chan]->location, location_code, 11);
		      }
		    }
423

Matteo Quintiliani's avatar
Matteo Quintiliani committed
424
		    msr_list_chan[i_chan]->reclen   = params.reclen;     /* Byte record length */
425
		    msr_list_chan[i_chan]->encoding = params.encoding;  /* Steim 1 compression by default */
426

Matteo Quintiliani's avatar
Matteo Quintiliani committed
427 428 429 430 431
		    /* Reset some values */
		    msr_list_chan[i_chan]->sequence_number = 0;
		    msr_list_chan[i_chan]->datasamples = NULL;
		    msr_list_chan[i_chan]->numsamples = 0;

432 433 434 435 436
		} else {
		    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CHANNEL,
			    "Channels %s error in format!\n", NMXP_LOG_STR(channelList_subset->channel[i_chan].name));
		    return 1;
		}
437

438
	    }
Matteo Quintiliani's avatar
Matteo Quintiliani committed
439 440
	}
#endif
441

442 443
    }

Matteo Quintiliani's avatar
Matteo Quintiliani committed
444
    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "Begin communication.\n");
Matteo Quintiliani's avatar
Matteo Quintiliani committed
445

446
    times_flow = 0;
447
    recv_errno = 0;
448

449 450 451 452 453
    while(times_flow < 2  &&  recv_errno == 0 && !nmxptool_sigcondition_read()
#ifdef HAVE_LIBMSEED
	    &&  data_seed.err_general==0
#endif
	    ) {
454

455
	if(params.statefile) {
456
	    nmxptool_chanseq_load_states(channelList_subset, channelList_Seq, params.statefile, params.stc);
457 458
	}

459 460 461 462 463 464 465 466 467 468 469
	if(times_flow == 0) {
	    if(params.statefile) {
		params.interval = DEFAULT_INTERVAL_INFINITE;
	    }
	} else if(times_flow == 1) {
	    params.start_time = 0.0;
	    params.end_time = 0.0;
	    params.interval = DEFAULT_INTERVAL_NO_VALUE;

	}

Matteo Quintiliani's avatar
Matteo Quintiliani committed
470
    /* Condition for starting DAP or PDS */
471 472
    if( DAP_CONDITION(params) ||
	    (times_flow == 0  &&  params.statefile && params.max_data_to_retrieve > 0 && params.interval == DEFAULT_INTERVAL_INFINITE) ) {
473 474

	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "Begin DAP Flow.\n");
475

476 477 478 479 480 481 482
	if(params.interval > 0  ||  params.interval == DEFAULT_INTERVAL_INFINITE) {
	    if(params.interval > 0) {
		params.end_time = params.start_time + params.interval;
	    } else {
		params.end_time = nmxp_data_gmtime_now();
	    }
	} else if(params.delay > 0) {
483 484
	    params.start_time = ((double) (time(NULL) - params.delay - params.span_data) / 10.0) * 10.0;
	    params.end_time = params.start_time + params.span_data;
485 486
	}

487

488 489 490
	/* ************************************************************** */
	/* Start subscription protocol "DATA ACCESS PROTOCOL" version 1.0 */
	/* ************************************************************** */
491

492
	/* DAP Step 1: Open a socket */
493
	if( (naqssock = nmxp_openSocket(params.hostname, params.portnumberdap, nmxptool_exitcondition_on_open_socket)) == NMXP_SOCKET_ERROR) {
494
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error opening socket!\n");
495 496
	    return 1;
	}
497

498 499
	/* DAP Step 2: Read connection time */
	if(nmxp_readConnectionTime(naqssock, &connection_time) != NMXP_SOCKET_OK) {
500
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error reading connection time from server!\n");
501 502
	    return 1;
	}
503

504 505
	/* DAP Step 3: Send a ConnectRequest */
	if(nmxp_sendConnectRequest(naqssock, params.datas_username, params.datas_password, connection_time) != NMXP_SOCKET_OK) {
506
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error sending connect request!\n");
507 508 509 510 511
	    return 1;
	}

	/* DAP Step 4: Wait for a Ready message */
	if(nmxp_waitReady(naqssock) != NMXP_SOCKET_OK) {
512
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error waiting Ready message!\n");
513 514
	    return 1;
	}
515

516 517
	exitdapcondition = 1;

518
	default_start_time = (params.start_time > 0.0)? params.start_time : nmxp_data_gmtime_now() - params.max_data_to_retrieve;
519

520 521 522 523 524
	while(exitdapcondition  &&  !nmxptool_sigcondition_read()
#ifdef HAVE_LIBMSEED
		&&  data_seed.err_general==0
#endif
	     ) {
525

526
	    /* Start loop for sending requests */
527
	    request_chan=0;
528
	    request_SOCKET_OK = NMXP_SOCKET_OK;
529

530
	    /* For each channel */
531 532 533 534 535
	    while(request_SOCKET_OK == NMXP_SOCKET_OK  &&  request_chan < channelList_subset->number  &&  exitdapcondition && !nmxptool_sigcondition_read()
#ifdef HAVE_LIBMSEED
		    &&  data_seed.err_general==0
#endif
		    ) {
536

537
		if(params.statefile) {
538 539
		    if(channelList_Seq[request_chan].after_start_time > 0) {
			params.start_time = channelList_Seq[request_chan].after_start_time;
540
			if(params.end_time - params.start_time > params.max_data_to_retrieve) {
541
			    nmxp_data_to_str(start_time_str, params.start_time);
542
			    nmxp_data_to_str(default_start_time_str, params.end_time - params.max_data_to_retrieve);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
543
			    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_ANY, "%s start_time changed from %s to %s\n",
544 545 546
				    NMXP_LOG_STR(channelList_subset->channel[request_chan].name),
				    NMXP_LOG_STR(start_time_str),
				    NMXP_LOG_STR(default_start_time_str));
547
			    params.start_time = params.end_time - params.max_data_to_retrieve;
548
			}
549 550 551
		    } else {
			params.start_time = default_start_time;
		    }
552 553
		    channelList_Seq[request_chan].last_time = params.start_time;
		    channelList_Seq[request_chan].significant = 1;
554

555
		}
556

557 558
		nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "nmxp_sendDataRequest %d %s (%d)\n",
			channelList_subset->channel[request_chan].key,
559 560
			NMXP_LOG_STR(channelList_subset->channel[request_chan].name),
			request_chan);
561

562 563 564
		nmxp_data_to_str(start_time_str, params.start_time);
		nmxp_data_to_str(end_time_str, params.end_time);
		nmxp_data_to_str(default_start_time_str, default_start_time);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
565
		nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "%s start_time = %s - end_time = %s - (default_start_time = %s)\n",
566 567 568 569
			NMXP_LOG_STR(channelList_subset->channel[request_chan].name),
			NMXP_LOG_STR(start_time_str),
			NMXP_LOG_STR(end_time_str),
			NMXP_LOG_STR(default_start_time_str));
570

571
		/* DAP Step 5: Send Data Request */
572
		request_SOCKET_OK = nmxp_sendDataRequest(naqssock, channelList_subset->channel[request_chan].key, (int32_t) params.start_time, (int32_t) (params.end_time + 1.0));
573

574
		if(request_SOCKET_OK == NMXP_SOCKET_OK) {
575

576 577 578 579
		    nmxp_data_to_str(str_start_time, params.start_time);
		    nmxp_data_to_str(str_end_time, params.end_time);
		    nmxptool_str_time_to_filename(str_start_time);
		    nmxptool_str_time_to_filename(str_end_time);
580
/*Possible bug params not inited*/
581 582
		    if(params.flag_writefile) {
			/* Open output file */
583
			if(nmxp_chan_cpy_sta_chan(channelList_subset->channel[request_chan].name, station_code, channel_code, network_code, location_code)) {
584 585 586 587 588 589 590 591
			    sprintf(filename, "%s.%s.%s_%s_%s.nmx",
				    NETCODE_OR_CURRENT_NETWORK,
				    station_code,
				    channel_code,
				    str_start_time,
				    str_end_time);
			} else {
			    sprintf(filename, "%s_%s_%s.nmx",
592
				    channelList_subset->channel[request_chan].name,
593 594 595
				    str_start_time,
				    str_end_time);
			}
596

597 598
			outfile = fopen(filename, "w");
			if(!outfile) {
599
			    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_EXTRA, "Can not open file %s!\n",
600
				    NMXP_LOG_STR(filename));
601
			}
602
		    }
603

604 605 606 607
		    if(params.flag_writefile  &&  outfile) {
			/* Compute SNCL line */

			/* Separate station_code_old_way and channel_code_old_way */
608
			if(nmxp_chan_cpy_sta_chan(channelList_subset->channel[request_chan].name, station_code, channel_code, network_code, location_code)) {
609 610 611 612 613 614 615
			    /* Write SNCL line */
			    fprintf(outfile, "%s.%s.%s.%s\n",
				    station_code,
				    NETCODE_OR_CURRENT_NETWORK,
				    channel_code,
				    (params.location)? params.location : "");
			}
616

617
		    }
618

619
		    /* DAP Step 6: Receive Data until receiving a Ready message */
620
		    ret = nmxp_receiveMessage(naqssock, &type, buffer, &length, 0, &recv_errno, NMXP_MAX_LENGTH_DATA_BUFFER);
621

622 623
		    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "ret = %d, type = %d, length = %d, recv_errno = %d\n",
			    ret, type, length, recv_errno);
624

625 626 627 628 629
		    while(ret == NMXP_SOCKET_OK   &&    type != NMXP_MSG_READY  && !nmxptool_sigcondition_read()
#ifdef HAVE_LIBMSEED
			    &&  data_seed.err_general==0
#endif
			 ) {
630 631 632 633 634 635 636 637 638
			/* Process a packet and return value in NMXP_DATA_PROCESS structure */ /*STEFANO*/
                        
			if (pd != NULL) {
			    if (pd->pDataPtr != NULL) {
				NMXP_MEM_FREE(pd->pDataPtr);
			    }
			    NMXP_MEM_FREE(pd);
			}

639
			pd = nmxp_processCompressedData(buffer, length, channelList_subset, NETCODE_OR_CURRENT_NETWORK, LOCCODE_OR_CURRENT_LOCATION);
640 641 642 643 644 645

			/* Force value for timing_quality if declared in the command-line */
			if(pd && params.timing_quality != -1) {
			    pd->timing_quality = params.timing_quality;
			}

646 647 648 649 650 651
                        /* set the data quality indicator */			
			if (pd) {
                          pd->quality_indicator = params.quality_indicator;
                        }
			                                                                          

652
			nmxp_data_trim(pd, params.start_time, params.end_time, 0);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
653

654 655 656
			/* To prevent to manage a packet with zero sample after nmxp_data_trim() */
			if(pd->nSamp > 0) {

657 658
			/* Log contents of last packet */
			if(params.flag_logdata) {
659
			    nmxp_data_log(pd, params.flag_logsample);
660
			}
661

662 663
			/* Set cur_chan */
			cur_chan = nmxp_chan_lookupKeyIndex(pd->key, channelList_subset);
664 665 666 667 668

			/* It is not the channel I have requested or error from nmxp_chan_lookupKeyIndex() */
			if(request_chan != cur_chan  &&  cur_chan != -1) {
			    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "request_chan != cur_chan  %d != %d! (%d, %s) (%d, %s.%s.%s)\n",
				    request_chan, cur_chan,
669 670 671
				    channelList_subset->channel[request_chan].key,
				    NMXP_LOG_STR(channelList_subset->channel[request_chan].name),
				    pd->key, NMXP_LOG_STR(pd->network), NMXP_LOG_STR(pd->station), NMXP_LOG_STR(pd->channel));
672
			} else {
673

674
			/* Management of gaps */
675
			nmxptool_chanseq_gap(&(channelList_Seq[cur_chan]), pd);
676

677
#ifdef HAVE_LIBMSEED
678
			/* Write Mini-SEED record */
679
			if(params.type_writeseed) {
680 681
			    nmxptool_write_miniseed(pd);
			}
682
#endif
683

684
#ifdef HAVE___SRC_SEEDLINK_PLUGIN_C
685 686 687 688
			/* Send data to SeedLink Server */
			if(params.flag_slink) {
			    nmxptool_send_raw_depoch(pd);
			}
689 690
#endif

691 692 693 694 695 696 697 698 699
#ifdef HAVE_LIBMSEED
#ifdef HAVE___SRC_SEEDLINK_PLUGIN_C
			/* Send data to SeedLink Server */
			if(params.flag_slinkms) {
			    nmxptool_msr_send_mseed(pd);
			}
#endif
#endif

700 701 702 703 704 705
#ifdef HAVE_EARTHWORMOBJS
			if(params.ew_configuration_file) {
			    nmxptool_ew_nmx2ew(pd);
			}
#endif

706 707
			if(params.flag_writefile  &&  outfile) {
			    /* Write buffer to the output file */
708
			    if(outfile && length > 0) {
709 710 711 712 713
				int32_t length_int = length;
				nmxp_data_swap_4b((int32_t *) &length_int);
				fwrite(&length_int, sizeof(length_int), 1, outfile);
				fwrite(buffer, length, 1, outfile);
			    }
714
			}
715

716 717 718
			/* Store x_1 */
			channelList_Seq[cur_chan].x_1 = pd->pDataPtr[pd->nSamp-1];

719 720
			}

721 722
			} else {
			    /* TODO: nSamp <= 0 */
723
			}
724

725

726
			/* Receive Data */
727
			ret = nmxp_receiveMessage(naqssock, &type, buffer, &length, 0, &recv_errno, NMXP_MAX_LENGTH_DATA_BUFFER);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
728
			/* nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "ret = %d, type = %d\n", ret, type); */
729
		    }
730

731 732 733 734 735
		    if(params.flag_writefile  &&  outfile) {
			/* Close output file */
			fclose(outfile);
			outfile = NULL;
		    }
736

737 738
		} else {
		    /* TODO: error message */
739
		}
740
		request_chan++;
741 742 743 744 745

#ifdef HAVE_EARTHWORMOBJS
		if(params.ew_configuration_file) {

		    /* Check if we are being asked to terminate */
746
		    if( (ew_check_flag_terminate = nmxptool_ew_check_flag_terminate()) ) {
747
			logit ("t", "nmxptool terminating on request\n");
748
			nmxptool_ew_send_error(NMXPTOOL_EW_ERR_TERMREQ, NULL, params.hostname);
749 750 751 752 753 754 755 756 757 758
			exitdapcondition = 0;
			times_flow = TIMES_FLOW_EXIT;
		    }

		    /* Check if we need to send heartbeat message */
		    nmxptool_ew_send_heartbeat_if_needed();

		}
#endif

759
	    }
760
	    /* DAP Step 7: Repeat steps 5 and 6 for each data request */
761

762
	    if(params.delay > 0) {
763 764
		time_to_sleep = (params.end_time - params.start_time) - (time(NULL) - (params.start_time + params.delay + params.span_data));
		/* TODO if time_to_sleep exceds DAP protocol time-out, split sleep() and send alive packet */
765
		if(time_to_sleep >= 0) {
766 767 768 769 770 771 772 773 774 775 776 777 778 779 780
		    while(time_to_sleep>0 && !nmxptool_sigcondition_read()) {
			nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "time to sleep %d sec.\n", time_to_sleep);
			if(time_to_sleep >= NMXP_DAP_TIMEOUT_KEEPALIVE) {
			    nmxp_sleep(NMXP_DAP_TIMEOUT_KEEPALIVE);
			    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "nmxp_sendRequestPending\n");
			    nmxp_sendRequestPending(naqssock);
			    if(nmxp_waitReady(naqssock) != NMXP_SOCKET_OK) {
				nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error waiting Ready message!\n");
				return 1;
			    }
			} else {
			    nmxp_sleep(time_to_sleep);
			}
			time_to_sleep -= NMXP_DAP_TIMEOUT_KEEPALIVE;
		    }
781 782 783 784 785
		} else {
		    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "time to sleep %d sec.\n", time_to_sleep);
		    nmxp_sleep(3);
		}
		params.start_time = params.end_time;
786
		params.end_time = params.start_time + params.span_data;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
787
	    } else {
788
		exitdapcondition = 0;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
789
	    }
790

791
	} /* END while(exitdapcondition) */
792

793 794
#ifdef HAVE_LIBMSEED
	if(params.type_writeseed) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
795 796 797 798 799 800 801 802
	    if(*msr_list_chan) {
		for(i_chan = 0; i_chan < channelList_subset->number; i_chan++) {
		    if(msr_list_chan[i_chan]) {
			/* Flush remaining samples */
			nmxp_data_msr_pack(NULL, &data_seed, msr_list_chan[i_chan]);
		    }
		}
	    }
803 804 805 806
	    nmxp_data_seed_fclose_all(&data_seed);
	}
#endif

807 808 809 810 811
	/* DAP Step 8: Send a Terminate message (optional) */
	nmxp_sendTerminateSubscription(naqssock, NMXP_SHUTDOWN_NORMAL, "Bye!");

	/* DAP Step 9: Close the socket */
	nmxp_closeSocket(naqssock);
812
	naqssock = 0;
813 814 815 816 817

	/* ************************************************************ */
	/* End subscription protocol "DATA ACCESS PROTOCOL" version 1.0 */
	/* ************************************************************ */

818
	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "End DAP Flow.\n");
819

Matteo Quintiliani's avatar
Matteo Quintiliani committed
820
    } else {
821

822 823
	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "Begin PDS Flow.\n");

824 825 826 827 828
	/* ************************************************************* */
	/* Start subscription protocol "PRIVATE DATA STREAM" version 1.4 */
	/* ************************************************************* */

	/* PDS Step 1: Open a socket */
829
	naqssock = nmxp_openSocket(params.hostname, params.portnumberpds, nmxptool_exitcondition_on_open_socket);
830 831 832

	if(naqssock == NMXP_SOCKET_ERROR) {
	    return 1;
833 834
	}

835 836 837 838 839 840 841 842 843 844 845
	/* PDS Step 2: Send a Connect */
	if(nmxp_sendConnect(naqssock) != NMXP_SOCKET_OK) {
	    printf("Error on sendConnect()\n");
	    return 1;
	}

	/* PDS Step 3: Receive ChannelList */
	if(nmxp_receiveChannelList(naqssock, &channelList) != NMXP_SOCKET_OK) {
	    printf("Error on receiveChannelList()\n");
	    return 1;
	}
846
	/* Get a subset of channel from arguments, in respect to the step 3 of PDS */
847
	channelList_subset_waste = nmxp_chan_subset(channelList, NMXP_DATA_TIMESERIES, params.channels, CURRENT_NETWORK, CURRENT_LOCATION);
848

Matteo Quintiliani's avatar
Matteo Quintiliani committed
849 850
	/* Free the complete channel list */
	if(channelList) {
851
	    NMXP_MEM_FREE(channelList);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
852 853 854
	    channelList = NULL;
	}

855 856
	/* TODO check if channelList_subset_waste is equal to channelList_subset and free */
	if(channelList_subset_waste) {
857
	    NMXP_MEM_FREE(channelList_subset_waste);
858 859
	    channelList_subset_waste = NULL;
	}
860

861 862 863 864 865 866 867 868 869 870
	/* PDS Step 4: Send a Request Pending (optional) */

	/* PDS Step 5: Send AddChannels */
	/* Request Data */

	/* Better using a Thread */
#ifndef HAVE_PTHREAD_H
	nmxp_sendAddTimeSeriesChannel(naqssock, channelList_subset, params.stc, params.rate,
		(params.flag_buffered)? NMXP_BUFFER_YES : NMXP_BUFFER_NO, params.n_channel, params.usec, 1);
#else
871
	pthread_attr_init(&attr_request_channels);
872
	pthread_attr_setdetachstate(&attr_request_channels, PTHREAD_CREATE_DETACHED);
873 874
	pthread_create(&thread_request_channels, &attr_request_channels, p_nmxp_sendAddTimeSeriesChannel, (void *)NULL);
	pthread_attr_destroy(&attr_request_channels);
875 876
#endif

877
#ifndef HAVE_WINDOWS_H
878
#ifdef HAVE_PTHREAD_H
879 880
	if(!already_listen  &&  params.listen_port != DEFAULT_LISTEN_PORT) {
	    already_listen = 1;
881 882
	    pthread_attr_init(&attr_socket_listen);
	    pthread_attr_setdetachstate(&attr_socket_listen, PTHREAD_CREATE_DETACHED);
883
	    pthread_create(&thread_socket_listen, &attr_socket_listen, nmxptool_listen, (void *) &params.listen_port);
884 885
	    pthread_attr_destroy(&attr_socket_listen);
	}
886
#endif
887 888
#endif

889
	/* PDS Step 6: Repeat until finished: receive and handle packets */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
890

891
	/* TODO*/
Matteo Quintiliani's avatar
Matteo Quintiliani committed
892
	exitpdscondition = 1;
893
	flag_force_close_connection = 0;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
894

895
	skip_current_packet = 0;
896 897 898

	time(&lasttime_pds_receiveddata);

899
	/* begin  main PDS loop */
900

901
	while(exitpdscondition && !nmxptool_sigcondition_read() && !flag_force_close_connection
902 903 904 905
#ifdef HAVE_LIBMSEED
		&&  data_seed.err_general==0
#endif
	     ) {
906 907 908 909 910 911 912 913
	    
	    /* added 2010-07-26, RR */
            if (pd != NULL) {
              if (pd->pDataPtr != NULL) {
                NMXP_MEM_FREE(pd->pDataPtr);
              }
              NMXP_MEM_FREE(pd);
            } 
914
	    /* Process Compressed or Decompressed Data */
915
	    pd = nmxp_receiveData(naqssock, channelList_subset, NETCODE_OR_CURRENT_NETWORK, LOCCODE_OR_CURRENT_LOCATION, params.timeoutrecv, &recv_errno);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
916

917 918 919 920 921 922 923 924
	    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_EXTRA, "Received %s packet.\n", (pd)? "not null" : "null");

	    /* Get time when receive some data */
	    if(pd) {
		time(&lasttime_pds_receiveddata);
	    }

	    if ( (time(NULL) - lasttime_pds_receiveddata) >= timeout_pds_receiveddata ) {
925
		nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_ANY, "No data received within %d seconds. Force to close connection and open again.\n", timeout_pds_receiveddata);
926 927 928
		flag_force_close_connection = 1;
		time(&lasttime_pds_receiveddata);
	    }
929

930 931 932 933
	    /* Force value for timing_quality if declared in the command-line */
	    if(pd && params.timing_quality != -1) {
		pd->timing_quality = params.timing_quality;
	    }
934 935 936 937 938 939
            
            /* set the data quality indicator */
            if (pd) {
              pd->quality_indicator = params.quality_indicator;
            }
                                                                                                            
940 941 942 943 944 945 946 947 948 949 950
	    if(!pd) {
		pd_null_count++;
		if((pd_null_count * params.timeoutrecv) >= timeoutrecv_warning) {
		    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_ANY, "Received %d times a null packet. (%d sec.)\n",
			    pd_null_count, pd_null_count * params.timeoutrecv);
		    pd_null_count = 0;
		}
	    } else {
		pd_null_count = 0;
	    }