nmxptool.c 45.8 KB
Newer Older
Matteo Quintiliani's avatar
Matteo Quintiliani committed
1 2 3 4 5 6 7 8 9
/*! \file
 *
 * \brief Nanometrics Protocol Tool
 *
 * Author:
 * 	Matteo Quintiliani
 * 	Istituto Nazionale di Geofisica e Vulcanologia - Italy
 *	quintiliani@ingv.it
 *
Matteo Quintiliani's avatar
Matteo Quintiliani committed
10
 * $Id: nmxptool.c,v 1.207 2008-11-07 22:44:25 mtheo Exp $
Matteo Quintiliani's avatar
Matteo Quintiliani committed
11
 *
Matteo Quintiliani's avatar
Matteo Quintiliani committed
12 13
 */

Matteo Quintiliani's avatar
Matteo Quintiliani committed
14 15
#include "config.h"

16 17 18
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
Matteo Quintiliani's avatar
Matteo Quintiliani committed
19
#include <errno.h>
20
#include <signal.h>
21

22 23 24
#include <sys/stat.h>
#include <unistd.h>

25 26 27 28
#include <nmxp.h>
#include "nmxptool_getoptlong.h"
#include "nmxptool_chanseq.h"
#include "nmxptool_sigcondition.h"
29
#include <nmxptool_listen.h>
30

31 32 33
#ifdef HAVE_PTHREAD_H
#include <pthread.h>
#else
34
#warning Requests of channels could not be efficient because they do not use a separate thread. 
35 36
#endif

Matteo Quintiliani's avatar
Matteo Quintiliani committed
37 38 39 40
#ifdef HAVE_WINDOWS_H
#include <windows.h>
#endif

41
#ifdef HAVE_EARTHWORMOBJS
42
#include "nmxptool_ew.h"
43
#endif
44

45 46 47
#ifdef HAVE_LIBMSEED
#include <libmseed.h>
#endif
48

49 50
#ifdef HAVE___SRC_SEEDLINK_PLUGIN_H
#include "seedlink_plugin.h"
51 52
#endif

53 54
#define TIMES_FLOW_EXIT 100

55 56
int if_dap_condition_only_one_time = 0;

57
#define DAP_CONDITION(params_struct) ( params_struct.start_time != 0.0 || params_struct.delay > 0 )
58
#define EXIT_CONDITION (!nmxptool_sigcondition_read()  &&  !ew_check_flag_terminate  &&  !if_dap_condition_only_one_time)
59

60 61 62
#define CURRENT_NETWORK ( (params.network)? params.network : DEFAULT_NETWORK )
#define NETCODE_OR_CURRENT_NETWORK ( (network_code[0] != 0)? network_code : CURRENT_NETWORK )

63 64
static void ShutdownHandler(int sig);
static void AlarmHandler(int sig);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
65

Matteo Quintiliani's avatar
Matteo Quintiliani committed
66 67 68 69 70
void flushing_raw_data_stream();

void *nmxptool_print_info_raw_stream(void *arg);
int nmxptool_print_seq_no(NMXP_DATA_PROCESS *pd);
void nmxptool_str_time_to_filename(char *str_time);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
71

72
#ifdef HAVE_LIBMSEED
Matteo Quintiliani's avatar
Matteo Quintiliani committed
73
int nmxptool_write_miniseed(NMXP_DATA_PROCESS *pd);
74 75
int nmxptool_log_miniseed(const char *s);
int nmxptool_logerr_miniseed(const char *s);
76 77
#endif

Matteo Quintiliani's avatar
Matteo Quintiliani committed
78
#ifdef HAVE___SRC_SEEDLINK_PLUGIN_C
Matteo Quintiliani's avatar
Matteo Quintiliani committed
79
int nmxptool_send_raw_depoch(NMXP_DATA_PROCESS *pd);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
80 81
#endif

Matteo Quintiliani's avatar
Matteo Quintiliani committed
82 83
#ifdef HAVE_PTHREAD_H
pthread_t thread_request_channels;
84
pthread_attr_t attr_request_channels;
85
void *status_thread;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
86 87
void *p_nmxp_sendAddTimeSeriesChannel(void *arg);
#endif
Matteo Quintiliani's avatar
Matteo Quintiliani committed
88

89 90 91 92
#ifdef HAVE_PTHREAD_H
pthread_t thread_socket_listen;
pthread_attr_t attr_socket_listen;
void *status_thread_socket_listen;
93
int already_listen = 0;
94 95
#endif

Matteo Quintiliani's avatar
Matteo Quintiliani committed
96

97 98 99 100 101
/* Global variable for main program and handling terminitation program */
NMXPTOOL_PARAMS params;
int naqssock = 0;
FILE *outfile = NULL;
NMXP_CHAN_LIST *channelList = NULL;
102
NMXP_CHAN_LIST_NET *channelList_subset = NULL;
103
NMXP_CHAN_LIST_NET *channelList_subset_waste = NULL;
104
NMXPTOOL_CHAN_SEQ *channelList_Seq = NULL;
105
NMXP_META_CHAN_LIST *meta_channelList = NULL;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
106 107 108
int n_func_pd = 0;
int (*p_func_pd[NMXP_MAX_FUNC_PD]) (NMXP_DATA_PROCESS *);

109 110 111 112 113 114 115

#ifdef HAVE_LIBMSEED
/* Mini-SEED variables */
NMXP_DATA_SEED data_seed;
MSRecord *msr_list_chan[MAX_N_CHAN];
#endif

116
int ew_check_flag_terminate = 0;
117

118 119 120 121 122 123
#ifdef HAVE_WINDOWS_H
const char sepdir = '\\';
#else
const char sepdir = '/';
#endif

Matteo Quintiliani's avatar
Matteo Quintiliani committed
124 125 126
#ifdef HAVE_MKDIR
/* TODO */
#endif
127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157
int mkdirp(const char *filename, mode_t mode) {
    char *dir = strdup(filename);
    int i, l;
    int	error=0;

    if(!filename) return -1;
    dir = strdup(filename);
    if(!dir) return -1;

    l = strlen(dir);
    i = 0;
    while(i < l  &&  error != -1) {
	if(dir[i] == sepdir  &&  i > 0) {
	    dir[i] = 0;
	    /* nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_ANY, "trying to create %s...\n", dir); */
	    if(chdir(dir) == -1) {
		error=mkdir(dir, mode);
	    }
	    dir[i] = sepdir;
	}
	i++;
    }
    if(error != -1) {
	error=mkdir(dir, mode);
    }

    free(dir);
    return error;
}


158
int main (int argc, char **argv) {
159
    int32_t connection_time;
160
    int request_SOCKET_OK;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
161
    int i_chan, cur_chan = 0;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
162
    int to_cur_chan = 0;
163
    int request_chan;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
164
    int exitpdscondition;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
165
    int exitdapcondition;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
166
    time_t timeout_for_channel;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
167

Matteo Quintiliani's avatar
Matteo Quintiliani committed
168
    int span_interval = 10;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
169 170
    int time_to_sleep = 0;

171 172
    char str_start_time[200] = "";
    char str_end_time[200] = "";
173 174

    NMXP_MSG_SERVER type;
175
    void *buffer = NULL;
176
    int32_t length;
177
    int ret;
178
    int main_ret = 0;
179

180
    int pd_null_count = 0;
181
    int timeoutrecv_warning = 300; /* 5 minutes */
182

183 184
    int times_flow = 0;

Matteo Quintiliani's avatar
Matteo Quintiliani committed
185 186
    int recv_errno = 0;

187
    char dirseedchan[1024];
188 189
    char filename[500] = "";
    char station_code[20] = "", channel_code[20] = "", network_code[20] = "";
190

Matteo Quintiliani's avatar
Matteo Quintiliani committed
191
    char cur_after_start_time_str[1024];
192 193
    double cur_after_start_time = DEFAULT_BUFFERED_TIME;
    int skip_current_packet = 0;
194

195
    double default_start_time = 0.0;
196
    char start_time_str[30], end_time_str[30], default_start_time_str[30];
197

198
    NMXP_DATA_PROCESS *pd = NULL;
199

Matteo Quintiliani's avatar
Matteo Quintiliani committed
200
#ifndef HAVE_WINDOWS_H
Matteo Quintiliani's avatar
Matteo Quintiliani committed
201 202 203
    /* Signal handling, use POSIX calls with standardized semantics */
    struct sigaction sa;

204
    sa.sa_handler = AlarmHandler;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
205 206 207
    sa.sa_flags = SA_RESTART;
    sigemptyset(&sa.sa_mask);
    sigaction(SIGALRM, &sa, NULL);
208

209
    sa.sa_handler = ShutdownHandler;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
210 211 212 213 214 215 216
    sigaction(SIGINT, &sa, NULL);
    sigaction(SIGQUIT, &sa, NULL); 
    sigaction(SIGTERM, &sa, NULL);

    sa.sa_handler = SIG_IGN;
    sigaction(SIGHUP, &sa, NULL);
    sigaction(SIGPIPE, &sa, NULL); 
217
#else
Matteo Quintiliani's avatar
Matteo Quintiliani committed
218 219
    /* Signal handling, use function signal() */

220
    /*
221
    signal(SIGALRM, AlarmHandler);
222
    */
223 224

    signal(SIGINT, ShutdownHandler);
225
    /*
226
    signal(SIGQUIT, ShutdownHandler);
227
    */
228 229
    signal(SIGTERM, ShutdownHandler);

230
    /*
231 232
    signal(SIGHUP, SIG_IGN);
    signal(SIGPIPE, SIG_IGN);
233
    */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
234 235
#endif

236
    nmxptool_sigcondition_init();
237

Matteo Quintiliani's avatar
Matteo Quintiliani committed
238
    /* Default is normal output */
239
    nmxp_log(NMXP_LOG_SET, NMXP_LOG_D_NULL);
240 241 242 243 244 245

    /* Initialize params from argument values */
    if(nmxptool_getopt_long(argc, argv, &params) != 0) {
	return 1;
    }

246
    if(params.ew_configuration_file) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
247 248

#ifdef HAVE_EARTHWORMOBJS
249

250 251
	nmxp_log_init(nmxptool_ew_logit_msg, nmxptool_ew_logit_err);

Matteo Quintiliani's avatar
Matteo Quintiliani committed
252
	nmxptool_ew_configure(argv, &params);
253

Matteo Quintiliani's avatar
Matteo Quintiliani committed
254 255 256 257
	/* Check consistency of params */
	if(nmxptool_check_params(&params) != 0) {
	    return 1;
	}
258

259 260 261
#else
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "Earthworm feature has not been enabled in compilation!\n");
	return 1;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
262
#endif
263

264
    } else {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
265

Matteo Quintiliani's avatar
Matteo Quintiliani committed
266 267
	nmxp_log_init(nmxp_log_stdout, nmxp_log_stderr);

268 269 270 271 272
	/* Check consistency of params */
	if(nmxptool_check_params(&params) != 0) {
	    return 1;
	}

273 274
	/* List available channels on server */
	if(params.flag_listchannels) {
275

276 277 278 279
	    meta_channelList = nmxp_getMetaChannelList(params.hostname, params.portnumberdap, NMXP_DATA_TIMESERIES, params.flag_request_channelinfo, params.datas_username, params.datas_password, &channelList, nmxptool_sigcondition_read);

	    /* nmxp_meta_chan_print(meta_channelList); */
	    nmxp_meta_chan_print_with_match(meta_channelList, params.channels);
280

281
	    return 1;
282

283 284
	} else if(params.flag_listchannelsnaqs) {

285
	    channelList = nmxp_getAvailableChannelList(params.hostname, params.portnumberpds, NMXP_DATA_TIMESERIES, nmxptool_sigcondition_read);
286 287 288 289

	    /* nmxp_chan_print_channelList(channelList); */
	    nmxp_chan_print_channelList_with_match(channelList, params.channels);

290
	    return 1;
291

292 293
	}
    }
294

295
    nmxp_log(NMXP_LOG_SET, params.verbose_level);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
296 297 298
    if(params.verbose_level != DEFAULT_VERBOSE_LEVEL) {
	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_ANY, "verbose_level %d\n", params.verbose_level);
    }
299

300
#ifdef HAVE_LIBMSEED
301
    if(params.type_writeseed) {
302
	ms_loginit((void*)&nmxptool_log_miniseed, NULL, (void*)&nmxptool_logerr_miniseed, "error: ");
303 304 305 306 307
	/* Init mini-SEED variables */
	nmxp_data_seed_init(&data_seed);
    }
#endif

308 309 310 311
    nmxptool_log_params(&params);

    if(params.stc == -1) {

312
#ifndef HAVE_WINDOWS_H
313
	p_func_pd[n_func_pd++] = nmxptool_listen_print_seq_no;
314
#endif
315

316 317 318 319 320 321
	if(params.flag_logdata) {
	    p_func_pd[n_func_pd++] = nmxptool_print_seq_no;
	}

#ifdef HAVE_LIBMSEED
	/* Write Mini-SEED record */
322
	if(params.type_writeseed) {
323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348
	    p_func_pd[n_func_pd++] = nmxptool_write_miniseed;
	}
#endif

#ifdef HAVE___SRC_SEEDLINK_PLUGIN_C
	/* Send data to SeedLink Server */
	if(params.flag_slink) {
	    p_func_pd[n_func_pd++] = nmxptool_send_raw_depoch;
	}
#endif

#ifdef HAVE_EARTHWORMOBJS
	if(params.ew_configuration_file) {
	    p_func_pd[n_func_pd++] = nmxptool_ew_nmx2ew;
	}
#endif

    }

#ifdef HAVE_EARTHWORMOBJS
    if(params.ew_configuration_file) {
	nmxptool_ew_attach();
    }
#endif

    /* Exit only on request */
349
    while(EXIT_CONDITION) {
350

351
    NMXP_MEM_PRINT_PTR(0);
352

353
    /* Get list of available channels and get a subset list of params.channels */
354
    if( DAP_CONDITION(params) ) {
355
	if_dap_condition_only_one_time = 1;
356
	/* From DataServer */
357
	if(!nmxp_getMetaChannelList(params.hostname, params.portnumberdap, NMXP_DATA_TIMESERIES,
358
		    params.flag_request_channelinfo, params.datas_username, params.datas_password, &channelList, nmxptool_sigcondition_read)) {
359 360
	    return -1;
	}
361 362
    } else {
	/* From NaqsServer */
363
	channelList = nmxp_getAvailableChannelList(params.hostname, params.portnumberpds, NMXP_DATA_TIMESERIES, nmxptool_sigcondition_read);
364 365
    }

366 367 368 369 370
    if(!channelList) {
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CHANNEL, "Channel list has not been received!\n");
	return 1;
    }

371
    channelList_subset = nmxp_chan_subset(channelList, NMXP_DATA_TIMESERIES, params.channels, CURRENT_NETWORK);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
372 373 374
    
    /* Free the complete channel list */
    if(channelList) {
375
	NMXP_MEM_FREE(channelList);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
376 377
	channelList = NULL;
    }
378 379 380

    /* Check if some channel already exists */
    if(channelList_subset->number <= 0) {
381
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CHANNEL, "Channels not found!\n");
382
	return 1;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
383
    } else {
384
	nmxp_chan_print_netchannelList(channelList_subset);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
385

386
	nmxptool_chanseq_init(&channelList_Seq, channelList_subset->number, DEFAULT_BUFFERED_TIME, params.max_tolerable_latency, params.timeoutrecv);
387

Matteo Quintiliani's avatar
Matteo Quintiliani committed
388
#ifdef HAVE_LIBMSEED
389
	if(params.type_writeseed) {
390
	    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "Init mini-SEED record list.\n");
391

392 393
	    /* Init mini-SEED record list */
	    for(i_chan = 0; i_chan < channelList_subset->number; i_chan++) {
394

395 396
		nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA,
			"Init mini-SEED record for %s\n", NMXP_LOG_STR(channelList_subset->channel[i_chan].name));
397

398
		msr_list_chan[i_chan] = msr_init(NULL);
399

400 401
		/* Separate station_code and channel_code */
		if(nmxp_chan_cpy_sta_chan(channelList_subset->channel[i_chan].name, station_code, channel_code, network_code)) {
402

403 404
		    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "%s.%s.%s\n",
			    NMXP_LOG_STR(NETCODE_OR_CURRENT_NETWORK), NMXP_LOG_STR(station_code), NMXP_LOG_STR(channel_code));
405

406 407 408
		    strcpy(msr_list_chan[i_chan]->network, NETCODE_OR_CURRENT_NETWORK);
		    strcpy(msr_list_chan[i_chan]->station, station_code);
		    strcpy(msr_list_chan[i_chan]->channel, channel_code);
409

410 411
		    msr_list_chan[i_chan]->reclen = 512;         /* byte record length */
		    msr_list_chan[i_chan]->encoding = DE_STEIM1;  /* Steim 1 compression */
412

413 414 415 416 417
		} else {
		    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CHANNEL,
			    "Channels %s error in format!\n", NMXP_LOG_STR(channelList_subset->channel[i_chan].name));
		    return 1;
		}
418

419
	    }
Matteo Quintiliani's avatar
Matteo Quintiliani committed
420 421
	}
#endif
422

423 424
    }

Matteo Quintiliani's avatar
Matteo Quintiliani committed
425
    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "Begin communication.\n");
Matteo Quintiliani's avatar
Matteo Quintiliani committed
426

427
    times_flow = 0;
428
    recv_errno = 0;
429

430
    while(times_flow < 2  &&  recv_errno == 0 && !nmxptool_sigcondition_read()) {
431

432
	if(params.statefile) {
433
	    nmxptool_chanseq_load_states(channelList_subset, channelList_Seq, params.statefile);
434 435
	}

436 437 438 439 440 441 442 443 444 445 446
	if(times_flow == 0) {
	    if(params.statefile) {
		params.interval = DEFAULT_INTERVAL_INFINITE;
	    }
	} else if(times_flow == 1) {
	    params.start_time = 0.0;
	    params.end_time = 0.0;
	    params.interval = DEFAULT_INTERVAL_NO_VALUE;

	}

Matteo Quintiliani's avatar
Matteo Quintiliani committed
447
    /* Condition for starting DAP or PDS */
448 449
    if( DAP_CONDITION(params) ||
	    (times_flow == 0  &&  params.statefile && params.max_data_to_retrieve > 0 && params.interval == DEFAULT_INTERVAL_INFINITE) ) {
450 451

	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "Begin DAP Flow.\n");
452

453 454 455 456 457 458 459
	if(params.interval > 0  ||  params.interval == DEFAULT_INTERVAL_INFINITE) {
	    if(params.interval > 0) {
		params.end_time = params.start_time + params.interval;
	    } else {
		params.end_time = nmxp_data_gmtime_now();
	    }
	} else if(params.delay > 0) {
460
	    params.start_time = ((double) (time(NULL) - params.delay - span_interval) / 10.0) * 10.0;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
461 462 463
	    params.end_time = params.start_time + span_interval;
	}

464

465 466 467
	/* ************************************************************** */
	/* Start subscription protocol "DATA ACCESS PROTOCOL" version 1.0 */
	/* ************************************************************** */
468

469
	/* DAP Step 1: Open a socket */
470
	if( (naqssock = nmxp_openSocket(params.hostname, params.portnumberdap, nmxptool_sigcondition_read)) == NMXP_SOCKET_ERROR) {
471
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error opening socket!\n");
472 473
	    return 1;
	}
474

475 476
	/* DAP Step 2: Read connection time */
	if(nmxp_readConnectionTime(naqssock, &connection_time) != NMXP_SOCKET_OK) {
477
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error reading connection time from server!\n");
478 479
	    return 1;
	}
480

481 482
	/* DAP Step 3: Send a ConnectRequest */
	if(nmxp_sendConnectRequest(naqssock, params.datas_username, params.datas_password, connection_time) != NMXP_SOCKET_OK) {
483
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error sending connect request!\n");
484 485 486 487 488
	    return 1;
	}

	/* DAP Step 4: Wait for a Ready message */
	if(nmxp_waitReady(naqssock) != NMXP_SOCKET_OK) {
489
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error waiting Ready message!\n");
490 491
	    return 1;
	}
492

493 494
	exitdapcondition = 1;

495
	default_start_time = (params.start_time > 0.0)? params.start_time : nmxp_data_gmtime_now() - params.max_data_to_retrieve;
496

497
	while(exitdapcondition  &&  !nmxptool_sigcondition_read()) {
498

499
	    /* Start loop for sending requests */
500
	    request_chan=0;
501
	    request_SOCKET_OK = NMXP_SOCKET_OK;
502

503
	    /* For each channel */
504
	    while(request_SOCKET_OK == NMXP_SOCKET_OK  &&  request_chan < channelList_subset->number  &&  exitdapcondition && !nmxptool_sigcondition_read()) {
505

506
		if(params.statefile) {
507 508
		    if(channelList_Seq[request_chan].after_start_time > 0) {
			params.start_time = channelList_Seq[request_chan].after_start_time;
509
			if(params.end_time - params.start_time > params.max_data_to_retrieve) {
510
			    nmxp_data_to_str(start_time_str, params.start_time);
511
			    nmxp_data_to_str(default_start_time_str, params.end_time - params.max_data_to_retrieve);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
512
			    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_ANY, "%s start_time changed from %s to %s\n",
513 514 515
				    NMXP_LOG_STR(channelList_subset->channel[request_chan].name),
				    NMXP_LOG_STR(start_time_str),
				    NMXP_LOG_STR(default_start_time_str));
516
			    params.start_time = params.end_time - params.max_data_to_retrieve;
517
			}
518 519 520
		    } else {
			params.start_time = default_start_time;
		    }
521 522
		    channelList_Seq[request_chan].last_time = params.start_time;
		    channelList_Seq[request_chan].significant = 1;
523

524
		}
525

526 527
		nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "nmxp_sendDataRequest %d %s (%d)\n",
			channelList_subset->channel[request_chan].key,
528 529
			NMXP_LOG_STR(channelList_subset->channel[request_chan].name),
			request_chan);
530

531 532 533
		nmxp_data_to_str(start_time_str, params.start_time);
		nmxp_data_to_str(end_time_str, params.end_time);
		nmxp_data_to_str(default_start_time_str, default_start_time);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
534
		nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "%s start_time = %s - end_time = %s - (default_start_time = %s)\n",
535 536 537 538
			NMXP_LOG_STR(channelList_subset->channel[request_chan].name),
			NMXP_LOG_STR(start_time_str),
			NMXP_LOG_STR(end_time_str),
			NMXP_LOG_STR(default_start_time_str));
539

540
		/* DAP Step 5: Send Data Request */
541
		request_SOCKET_OK = nmxp_sendDataRequest(naqssock, channelList_subset->channel[request_chan].key, (int32_t) params.start_time, (int32_t) (params.end_time + 1.0));
542

543
		if(request_SOCKET_OK == NMXP_SOCKET_OK) {
544

545 546 547 548
		    nmxp_data_to_str(str_start_time, params.start_time);
		    nmxp_data_to_str(str_end_time, params.end_time);
		    nmxptool_str_time_to_filename(str_start_time);
		    nmxptool_str_time_to_filename(str_end_time);
549

550 551
		    if(params.flag_writefile) {
			/* Open output file */
552
			if(nmxp_chan_cpy_sta_chan(channelList_subset->channel[request_chan].name, station_code, channel_code, network_code)) {
553 554 555 556 557 558 559 560
			    sprintf(filename, "%s.%s.%s_%s_%s.nmx",
				    NETCODE_OR_CURRENT_NETWORK,
				    station_code,
				    channel_code,
				    str_start_time,
				    str_end_time);
			} else {
			    sprintf(filename, "%s_%s_%s.nmx",
561
				    channelList_subset->channel[request_chan].name,
562 563 564
				    str_start_time,
				    str_end_time);
			}
565

566 567
			outfile = fopen(filename, "w");
			if(!outfile) {
568
			    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_EXTRA, "Can not open file %s!\n",
569
				    NMXP_LOG_STR(filename));
570
			}
571
		    }
572 573

#ifdef HAVE_LIBMSEED
574
		    if(params.type_writeseed) {
575
			/* Open output Mini-SEED file */
576
			if(nmxp_chan_cpy_sta_chan(channelList_subset->channel[request_chan].name, station_code, channel_code, network_code)) {
577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594

			    if(params.type_writeseed == TYPE_WRITESEED_BUD) {
				sprintf(dirseedchan, "%s%c%s%c%s", params.outdirseed, sepdir,
					NETCODE_OR_CURRENT_NETWORK, sepdir,
					station_code);
			    } else {
				sprintf(dirseedchan, "%s%c%d%c%s%c%s%c%s.D", params.outdirseed, sepdir,
					nmxp_data_year_from_epoch(params.start_time), sepdir,
					NETCODE_OR_CURRENT_NETWORK, sepdir,
					station_code, sepdir,
					channel_code);
			    }

			    if(chdir(dirseedchan) == -1) {
				/* nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_EXTRA, "Directory %s does not exist!\n", dirseedchan); */

				if(mkdirp(dirseedchan, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH | S_IXOTH) == -1) {
				    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_EXTRA, "Directory %s has not been created!\n", dirseedchan);
595
				} else {
596 597
				    /* nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_ANY, "Directory %s created!\n", dirseedchan); */
				    if(chdir(dirseedchan) == -1) {
598
					/* ERROR */
599
					nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_EXTRA, "Directory %s should be created but it does not exist!\n", dirseedchan);
600 601 602
				    }
				}

603
			    } else {
604
				/* nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_ANY, "Directory %s exists!\n", dirseedchan); */
605 606 607 608
			    }


			    /*
609 610 611 612 613 614
			    sprintf(data_seed.filename_mseed, "%s.%s.%s_%s_%s.miniseed",
				    NETCODE_OR_CURRENT_NETWORK,
				    station_code,
				    channel_code,
				    str_start_time,
				    str_end_time);
615 616 617
				    */

			    /* TODO if the requested data covers different days
618
			     * filename contains also the year and the yday of the end time, this breaks both SDS and BUD structure */
619 620
			    if( ( nmxp_data_year_from_epoch(params.start_time) == nmxp_data_year_from_epoch(params.end_time) )
				    &&  ( nmxp_data_yday_from_epoch(params.start_time) == nmxp_data_yday_from_epoch(params.end_time)) ) {
621 622 623 624 625 626 627 628 629 630 631 632 633 634 635
				if(params.type_writeseed == TYPE_WRITESEED_BUD) {
				    sprintf(data_seed.filename_mseed, "%s.%s..%s.%d.%03d",
					    station_code,
					    NETCODE_OR_CURRENT_NETWORK,
					    channel_code,
					    nmxp_data_year_from_epoch(params.start_time),
					    nmxp_data_yday_from_epoch(params.start_time));
				} else {
				    sprintf(data_seed.filename_mseed, "%s.%s..%s.D.%d.%03d",
					    NETCODE_OR_CURRENT_NETWORK,
					    station_code,
					    channel_code,
					    nmxp_data_year_from_epoch(params.start_time),
					    nmxp_data_yday_from_epoch(params.start_time));
				}
636
			    } else {
637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655
				if(params.type_writeseed == TYPE_WRITESEED_BUD) {
				    sprintf(data_seed.filename_mseed, "%s.%s..%s.%d.%03d-%d.%03d",
					    station_code,
					    NETCODE_OR_CURRENT_NETWORK,
					    channel_code,
					    nmxp_data_year_from_epoch(params.start_time),
					    nmxp_data_yday_from_epoch(params.start_time),
					    nmxp_data_year_from_epoch(params.end_time),
					    nmxp_data_yday_from_epoch(params.end_time));
				} else {
				    sprintf(data_seed.filename_mseed, "%s.%s..%s.D.%d.%03d-%d.%03d",
					    NETCODE_OR_CURRENT_NETWORK,
					    station_code,
					    channel_code,
					    nmxp_data_year_from_epoch(params.start_time),
					    nmxp_data_yday_from_epoch(params.start_time),
					    nmxp_data_year_from_epoch(params.end_time),
					    nmxp_data_yday_from_epoch(params.end_time));
				}
656 657
			    }

658 659
			} else {
			    sprintf(filename, "%s_%s_%s.miniseed",
660
				    channelList_subset->channel[request_chan].name,
661 662 663
				    str_start_time,
				    str_end_time);
			}
664

665 666
			data_seed.outfile_mseed = fopen(data_seed.filename_mseed, "r");
			if(data_seed.outfile_mseed) {
667
			    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_ANY, "File %s already exist. It will not be overrode.\n",
668 669 670
				    NMXP_LOG_STR(data_seed.filename_mseed));
			    fclose(data_seed.outfile_mseed);
			    data_seed.outfile_mseed = NULL;
671
			} else {
672

673 674
			data_seed.outfile_mseed = fopen(data_seed.filename_mseed, "w");
			if(!data_seed.outfile_mseed) {
675
			    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_EXTRA, "Can not open file %s!\n",
676
				    NMXP_LOG_STR(data_seed.filename_mseed));
677
			}
678

679
			}
680 681
		    }
#endif
682

683 684 685 686
		    if(params.flag_writefile  &&  outfile) {
			/* Compute SNCL line */

			/* Separate station_code_old_way and channel_code_old_way */
687
			if(nmxp_chan_cpy_sta_chan(channelList_subset->channel[request_chan].name, station_code, channel_code, network_code)) {
688 689 690 691 692 693 694
			    /* Write SNCL line */
			    fprintf(outfile, "%s.%s.%s.%s\n",
				    station_code,
				    NETCODE_OR_CURRENT_NETWORK,
				    channel_code,
				    (params.location)? params.location : "");
			}
695

696
		    }
697

698 699
		    /* DAP Step 6: Receive Data until receiving a Ready message */
		    ret = nmxp_receiveMessage(naqssock, &type, &buffer, &length, 0, &recv_errno);
700

701 702
		    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "ret = %d, type = %d, length = %d, recv_errno = %d\n",
			    ret, type, length, recv_errno);
703

704
		    while(ret == NMXP_SOCKET_OK   &&    type != NMXP_MSG_READY) {
705

706 707 708
			/* Process a packet and return value in NMXP_DATA_PROCESS structure */
			pd = nmxp_processCompressedData(buffer, length, channelList_subset, NETCODE_OR_CURRENT_NETWORK);
			nmxp_data_trim(pd, params.start_time, params.end_time, 0);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
709

710 711 712
			/* To prevent to manage a packet with zero sample after nmxp_data_trim() */
			if(pd->nSamp > 0) {

713 714
			/* Log contents of last packet */
			if(params.flag_logdata) {
715
			    nmxp_data_log(pd, params.flag_logsample);
716
			}
717

718 719
			/* Set cur_chan */
			cur_chan = nmxp_chan_lookupKeyIndex(pd->key, channelList_subset);
720 721 722 723 724

			/* It is not the channel I have requested or error from nmxp_chan_lookupKeyIndex() */
			if(request_chan != cur_chan  &&  cur_chan != -1) {
			    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "request_chan != cur_chan  %d != %d! (%d, %s) (%d, %s.%s.%s)\n",
				    request_chan, cur_chan,
725 726 727
				    channelList_subset->channel[request_chan].key,
				    NMXP_LOG_STR(channelList_subset->channel[request_chan].name),
				    pd->key, NMXP_LOG_STR(pd->network), NMXP_LOG_STR(pd->station), NMXP_LOG_STR(pd->channel));
728
			} else {
729

730
			/* Management of gaps */
731
			nmxptool_chanseq_gap(&(channelList_Seq[cur_chan]), pd);
732

733
#ifdef HAVE_LIBMSEED
734
			/* Write Mini-SEED record */
735
			if(params.type_writeseed) {
736 737
			    nmxptool_write_miniseed(pd);
			}
738
#endif
739

740
#ifdef HAVE___SRC_SEEDLINK_PLUGIN_C
741 742 743 744
			/* Send data to SeedLink Server */
			if(params.flag_slink) {
			    nmxptool_send_raw_depoch(pd);
			}
745 746
#endif

747 748 749 750 751 752
#ifdef HAVE_EARTHWORMOBJS
			if(params.ew_configuration_file) {
			    nmxptool_ew_nmx2ew(pd);
			}
#endif

753 754 755 756 757 758 759 760
			if(params.flag_writefile  &&  outfile) {
			    /* Write buffer to the output file */
			    if(outfile && buffer && length > 0) {
				int32_t length_int = length;
				nmxp_data_swap_4b((int32_t *) &length_int);
				fwrite(&length_int, sizeof(length_int), 1, outfile);
				fwrite(buffer, length, 1, outfile);
			    }
761
			}
762

763 764 765
			/* Store x_1 */
			channelList_Seq[cur_chan].x_1 = pd->pDataPtr[pd->nSamp-1];

766 767
			}

768 769
			} else {
			    /* TODO: nSamp <= 0 */
770
			}
771

772

773 774
			/* Free pd->buffer */
			if(pd->buffer) {
775
			    NMXP_MEM_FREE(pd->buffer);
776 777
			    pd->buffer = NULL;
			}
778

779 780
			/* Receive Data */
			ret = nmxp_receiveMessage(naqssock, &type, &buffer, &length, 0, &recv_errno);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
781
			/* nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "ret = %d, type = %d\n", ret, type); */
782
		    }
783

784 785 786 787 788
		    if(params.flag_writefile  &&  outfile) {
			/* Close output file */
			fclose(outfile);
			outfile = NULL;
		    }
789

790
#ifdef HAVE_LIBMSEED
791
		    if(params.type_writeseed  &&  data_seed.outfile_mseed) {
792 793 794 795
			/* Close output Mini-SEED file */
			fclose(data_seed.outfile_mseed);
			data_seed.outfile_mseed = NULL;
		    }
796
#endif
797

798 799
		} else {
		    /* TODO: error message */
800
		}
801
		request_chan++;
802 803 804 805 806

#ifdef HAVE_EARTHWORMOBJS
		if(params.ew_configuration_file) {

		    /* Check if we are being asked to terminate */
807
		    if( (ew_check_flag_terminate = nmxptool_ew_check_flag_terminate()) ) {
808
			logit ("t", "nmxptool terminating on request\n");
809
			nmxptool_ew_send_error(NMXPTOOL_EW_ERR_TERMREQ, NULL, params.hostname);
810 811 812 813 814 815 816 817 818 819
			exitdapcondition = 0;
			times_flow = TIMES_FLOW_EXIT;
		    }

		    /* Check if we need to send heartbeat message */
		    nmxptool_ew_send_heartbeat_if_needed();

		}
#endif

820
	    }
821
	    /* DAP Step 7: Repeat steps 5 and 6 for each data request */
822

823 824 825 826 827 828 829 830 831 832
	    if(params.delay > 0) {
		time_to_sleep = (params.end_time - params.start_time) - (time(NULL) - (params.start_time + params.delay + span_interval));
		if(time_to_sleep >= 0) {
		    nmxp_sleep(time_to_sleep);
		} else {
		    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "time to sleep %d sec.\n", time_to_sleep);
		    nmxp_sleep(3);
		}
		params.start_time = params.end_time;
		params.end_time = params.start_time + span_interval;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
833
	    } else {
834
		exitdapcondition = 0;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
835
	    }
Matteo Quintiliani's avatar
Matteo Quintiliani committed
836

837
	} /* END while(exitdapcondition) */
838

839 840 841 842 843
	/* DAP Step 8: Send a Terminate message (optional) */
	nmxp_sendTerminateSubscription(naqssock, NMXP_SHUTDOWN_NORMAL, "Bye!");

	/* DAP Step 9: Close the socket */
	nmxp_closeSocket(naqssock);
844
	naqssock = 0;
845 846 847 848 849

	/* ************************************************************ */
	/* End subscription protocol "DATA ACCESS PROTOCOL" version 1.0 */
	/* ************************************************************ */

850
	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "End DAP Flow.\n");
851

Matteo Quintiliani's avatar
Matteo Quintiliani committed
852
    } else {
853

854 855
	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "Begin PDS Flow.\n");

856 857 858 859 860
	/* ************************************************************* */
	/* Start subscription protocol "PRIVATE DATA STREAM" version 1.4 */
	/* ************************************************************* */

	/* PDS Step 1: Open a socket */
861
	naqssock = nmxp_openSocket(params.hostname, params.portnumberpds, nmxptool_sigcondition_read);
862 863 864

	if(naqssock == NMXP_SOCKET_ERROR) {
	    return 1;
865 866
	}

867 868 869 870 871 872 873 874 875 876 877
	/* PDS Step 2: Send a Connect */
	if(nmxp_sendConnect(naqssock) != NMXP_SOCKET_OK) {
	    printf("Error on sendConnect()\n");
	    return 1;
	}

	/* PDS Step 3: Receive ChannelList */
	if(nmxp_receiveChannelList(naqssock, &channelList) != NMXP_SOCKET_OK) {
	    printf("Error on receiveChannelList()\n");
	    return 1;
	}
878 879
	/* Get a subset of channel from arguments, in respect to the step 3 of PDS */
	channelList_subset_waste = nmxp_chan_subset(channelList, NMXP_DATA_TIMESERIES, params.channels, CURRENT_NETWORK);
880

Matteo Quintiliani's avatar
Matteo Quintiliani committed
881 882
	/* Free the complete channel list */
	if(channelList) {
883
	    NMXP_MEM_FREE(channelList);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
884 885 886
	    channelList = NULL;
	}

887 888
	/* TODO check if channelList_subset_waste is equal to channelList_subset and free */
	if(channelList_subset_waste) {
889
	    NMXP_MEM_FREE(channelList_subset_waste);
890 891
	    channelList_subset_waste = NULL;
	}
892

Matteo Quintiliani's avatar
Matteo Quintiliani committed
893
#ifdef HAVE_LIBMSEED
894
	if(params.type_writeseed) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
895 896
	    /* Open output Mini-SEED file */
	    sprintf(data_seed.filename_mseed, "%s.realtime.miniseed",
897
		    CURRENT_NETWORK);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
898 899 900

	    data_seed.outfile_mseed = fopen(data_seed.filename_mseed, "w");
	    if(!data_seed.outfile_mseed) {
901
		nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_EXTRA, "Can not open file %s!\n",
902
			NMXP_LOG_STR(data_seed.filename_mseed));
Matteo Quintiliani's avatar
Matteo Quintiliani committed
903
	    } else {
904 905
		nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "Opened file %s!\n",
			NMXP_LOG_STR(data_seed.filename_mseed));
Matteo Quintiliani's avatar
Matteo Quintiliani committed
906 907 908
	    }
	}
#endif
909 910 911 912 913 914 915 916 917 918 919
	
	/* PDS Step 4: Send a Request Pending (optional) */

	/* PDS Step 5: Send AddChannels */
	/* Request Data */

	/* Better using a Thread */
#ifndef HAVE_PTHREAD_H
	nmxp_sendAddTimeSeriesChannel(naqssock, channelList_subset, params.stc, params.rate,
		(params.flag_buffered)? NMXP_BUFFER_YES : NMXP_BUFFER_NO, params.n_channel, params.usec, 1);
#else
920 921 922 923
	pthread_attr_init(&attr_request_channels);
	pthread_attr_setdetachstate(&attr_request_channels, PTHREAD_CREATE_JOINABLE);
	pthread_create(&thread_request_channels, &attr_request_channels, p_nmxp_sendAddTimeSeriesChannel, (void *)NULL);
	pthread_attr_destroy(&attr_request_channels);
924 925
#endif

926
#ifndef HAVE_WINDOWS_H
927
#ifdef HAVE_PTHREAD_H
928 929
	if(!already_listen  &&  params.listen_port != DEFAULT_LISTEN_PORT) {
	    already_listen = 1;
930 931
	    pthread_attr_init(&attr_socket_listen);
	    pthread_attr_setdetachstate(&attr_socket_listen, PTHREAD_CREATE_DETACHED);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
932
	    pthread_create(&thread_socket_listen, &attr_socket_listen, nmxptool_listen, (void *)params.listen_port);
933 934
	    pthread_attr_destroy(&attr_socket_listen);
	}
935
#endif
936 937
#endif

938
	/* PDS Step 6: Repeat until finished: receive and handle packets */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
939

940
	/* TODO*/
Matteo Quintiliani's avatar
Matteo Quintiliani committed
941 942
	exitpdscondition = 1;

943
	skip_current_packet = 0;
944

945
	while(exitpdscondition && !nmxptool_sigcondition_read()) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
946

947
	    /* Process Compressed or Decompressed Data */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
948
	    pd = nmxp_receiveData(naqssock, channelList_subset, NETCODE_OR_CURRENT_NETWORK, params.timeoutrecv, &recv_errno);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
949

950 951 952 953 954 955