nmxptool.c 44.4 KB
Newer Older
Matteo Quintiliani's avatar
Matteo Quintiliani committed
1 2 3 4 5 6 7 8 9
/*! \file
 *
 * \brief Nanometrics Protocol Tool
 *
 * Author:
 * 	Matteo Quintiliani
 * 	Istituto Nazionale di Geofisica e Vulcanologia - Italy
 *	quintiliani@ingv.it
 *
10
 * $Id: nmxptool.c,v 1.202 2008-11-05 15:28:13 mtheo Exp $
Matteo Quintiliani's avatar
Matteo Quintiliani committed
11
 *
Matteo Quintiliani's avatar
Matteo Quintiliani committed
12 13
 */

Matteo Quintiliani's avatar
Matteo Quintiliani committed
14 15
#include "config.h"

16 17 18
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
Matteo Quintiliani's avatar
Matteo Quintiliani committed
19
#include <errno.h>
20
#include <signal.h>
21

22 23 24
#include <sys/stat.h>
#include <unistd.h>

25 26 27 28
#include <nmxp.h>
#include "nmxptool_getoptlong.h"
#include "nmxptool_chanseq.h"
#include "nmxptool_sigcondition.h"
29
#include <nmxptool_listen.h>
30

31 32 33
#ifdef HAVE_PTHREAD_H
#include <pthread.h>
#else
34
#warning Requests of channels could not be efficient because they do not use a separate thread. 
35 36
#endif

Matteo Quintiliani's avatar
Matteo Quintiliani committed
37 38 39 40
#ifdef HAVE_WINDOWS_H
#include <windows.h>
#endif

41
#ifdef HAVE_EARTHWORMOBJS
42
#include "nmxptool_ew.h"
43
#endif
44

45 46 47
#ifdef HAVE_LIBMSEED
#include <libmseed.h>
#endif
48

49 50
#ifdef HAVE___SRC_SEEDLINK_PLUGIN_H
#include "seedlink_plugin.h"
51 52
#endif

53 54
#define TIMES_FLOW_EXIT 100

55 56
/* Set to 1 in main() when DAP_CONDITION(params) holds; once set,
 * EXIT_CONDITION evaluates to false, so the main loop is not entered again. */
int if_dap_condition_only_one_time = 0;

57
/* True when the given params structure asks for the DAP (Data Access
 * Protocol) flow: an explicit start time has been set or a delay is requested.
 * The argument is parenthesized so that any expression yielding the
 * structure (e.g. a dereferenced pointer) can be passed safely. */
#define DAP_CONDITION(params_struct) ( (params_struct).start_time != 0.0 || (params_struct).delay > 0 )
58
/* NOTE: despite its name this is the condition for CONTINUING the main loop
 * (it is used as while(EXIT_CONDITION)). It is true as long as
 * nmxptool_sigcondition_read() returns zero, ew_check_flag_terminate is not
 * set, and the one-shot DAP flag has not been raised. */
#define EXIT_CONDITION (!nmxptool_sigcondition_read()  &&  !ew_check_flag_terminate  &&  !if_dap_condition_only_one_time)
59

60 61 62
/* Network code to use: params.network when it is set, DEFAULT_NETWORK
 * otherwise. Relies on the global variable `params`. */
#define CURRENT_NETWORK ( (params.network)? params.network : DEFAULT_NETWORK )
/* Network code of the channel being handled: the `network_code` buffer when
 * it is not empty, CURRENT_NETWORK otherwise. Expects a char array or pointer
 * named `network_code` to be in scope where the macro is expanded. */
#define NETCODE_OR_CURRENT_NETWORK ( (network_code[0] != 0)? network_code : CURRENT_NETWORK )

63 64
static void ShutdownHandler(int sig);
static void AlarmHandler(int sig);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
65

Matteo Quintiliani's avatar
Matteo Quintiliani committed
66 67 68 69 70
void flushing_raw_data_stream();

void *nmxptool_print_info_raw_stream(void *arg);
int nmxptool_print_seq_no(NMXP_DATA_PROCESS *pd);
void nmxptool_str_time_to_filename(char *str_time);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
71

72
#ifdef HAVE_LIBMSEED
Matteo Quintiliani's avatar
Matteo Quintiliani committed
73
int nmxptool_write_miniseed(NMXP_DATA_PROCESS *pd);
74 75
int nmxptool_log_miniseed(const char *s);
int nmxptool_logerr_miniseed(const char *s);
76 77
#endif

Matteo Quintiliani's avatar
Matteo Quintiliani committed
78
#ifdef HAVE___SRC_SEEDLINK_PLUGIN_C
Matteo Quintiliani's avatar
Matteo Quintiliani committed
79
int nmxptool_send_raw_depoch(NMXP_DATA_PROCESS *pd);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
80 81
#endif

Matteo Quintiliani's avatar
Matteo Quintiliani committed
82 83
#ifdef HAVE_PTHREAD_H
pthread_t thread_request_channels;
84
pthread_attr_t attr_request_channels;
85
void *status_thread;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
86 87
void *p_nmxp_sendAddTimeSeriesChannel(void *arg);
#endif
Matteo Quintiliani's avatar
Matteo Quintiliani committed
88

89 90 91 92
#ifdef HAVE_PTHREAD_H
pthread_t thread_socket_listen;
pthread_attr_t attr_socket_listen;
void *status_thread_socket_listen;
93
int already_listen = 0;
94 95
#endif

Matteo Quintiliani's avatar
Matteo Quintiliani committed
96

97 98 99 100 101
/* Global variables for the main program and for handling program termination */
/* Run-time parameters, filled in by nmxptool_getopt_long() in main() */
NMXPTOOL_PARAMS params;
/* Socket returned by nmxp_openSocket(); reset to 0 after the socket is closed */
int naqssock = 0;
/* Output file opened in main() when params.flag_writefile is set */
FILE *outfile = NULL;
/* Complete channel list received from the server; main() frees it after
 * extracting the requested subset */
NMXP_CHAN_LIST *channelList = NULL;
102
NMXP_CHAN_LIST_NET *channelList_subset = NULL;
103
NMXP_CHAN_LIST_NET *channelList_subset_waste = NULL;
104
NMXPTOOL_CHAN_SEQ *channelList_Seq = NULL;
105
NMXP_META_CHAN_LIST *meta_channelList = NULL;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
106 107 108
int n_func_pd = 0;
int (*p_func_pd[NMXP_MAX_FUNC_PD]) (NMXP_DATA_PROCESS *);

109 110 111 112 113 114 115

#ifdef HAVE_LIBMSEED
/* Mini-SEED variables */
NMXP_DATA_SEED data_seed;
MSRecord *msr_list_chan[MAX_N_CHAN];
#endif

116
int ew_check_flag_terminate = 0;
117

118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149
/* Helper for mkdirp(): make sure that `path` exists as a directory.
 *
 * Tries to create `path` with permissions `mode`; a path that already exists
 * and is a directory is accepted as success.
 *
 * Returns 0 on success, -1 otherwise. */
static int mkdirp_ensure_dir(const char *path, mode_t mode) {
    struct stat st;

    if(mkdir(path, mode) == 0) {
	return 0;
    }

    /* An already existing directory is not an error */
    if(errno == EEXIST  &&  stat(path, &st) == 0  &&  S_ISDIR(st.st_mode)) {
	return 0;
    }

    return -1;
}

/* Create the directory `filename` together with all its missing parent
 * directories (like "mkdir -p").
 *
 * filename: path of the directory to create, components separated by '/'.
 * mode:     permission bits passed to mkdir() for every created directory.
 *
 * Returns 0 when the whole path exists as a directory on return (created now
 * or already present), -1 on error (NULL filename, out of memory, or a
 * component that can neither be created nor found as a directory).
 *
 * The current working directory of the process is never changed. */
int mkdirp(const char *filename, mode_t mode) {
    const char sepdir = '/';
    char *dir;
    size_t i, l;
    int error = 0;

    /* Check the argument before touching it */
    if(!filename) return -1;

    /* Work on a private, writable copy: separators are temporarily
     * overwritten below. Only ISO C calls are used for the copy. */
    l = strlen(filename);
    dir = malloc(l + 1);
    if(!dir) return -1;
    memcpy(dir, filename, l + 1);

    /* Create every parent directory, from the shortest prefix to the longest.
     * Index 0 is skipped so that a leading '/' (root) is not treated as a
     * component. */
    for(i = 1; i < l  &&  error != -1; i++) {
	if(dir[i] == sepdir) {
	    dir[i] = 0;
	    error = mkdirp_ensure_dir(dir, mode);
	    dir[i] = sepdir;
	}
    }

    /* Finally create the complete path */
    if(error != -1) {
	error = mkdirp_ensure_dir(dir, mode);
    }

    free(dir);
    return error;
}


150
int main (int argc, char **argv) {
151
    int32_t connection_time;
152
    int request_SOCKET_OK;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
153
    int i_chan, cur_chan = 0;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
154
    int to_cur_chan = 0;
155
    int request_chan;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
156
    int exitpdscondition;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
157
    int exitdapcondition;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
158
    time_t timeout_for_channel;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
159

Matteo Quintiliani's avatar
Matteo Quintiliani committed
160
    int span_interval = 10;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
161 162
    int time_to_sleep = 0;

163 164
    char str_start_time[200] = "";
    char str_end_time[200] = "";
165 166

    NMXP_MSG_SERVER type;
167
    void *buffer = NULL;
168
    int32_t length;
169
    int ret;
170
    int main_ret = 0;
171

172
    int pd_null_count = 0;
173
    int timeoutrecv_warning = 300; /* 5 minutes */
174

175 176
    int times_flow = 0;

Matteo Quintiliani's avatar
Matteo Quintiliani committed
177 178
    int recv_errno = 0;

179 180
    char filename[500] = "";
    char station_code[20] = "", channel_code[20] = "", network_code[20] = "";
181

Matteo Quintiliani's avatar
Matteo Quintiliani committed
182
    char cur_after_start_time_str[1024];
183 184
    double cur_after_start_time = DEFAULT_BUFFERED_TIME;
    int skip_current_packet = 0;
185

186
    double default_start_time = 0.0;
187
    char start_time_str[30], end_time_str[30], default_start_time_str[30];
188

189
    NMXP_DATA_PROCESS *pd = NULL;
190

Matteo Quintiliani's avatar
Matteo Quintiliani committed
191
#ifndef HAVE_WINDOWS_H
Matteo Quintiliani's avatar
Matteo Quintiliani committed
192 193 194
    /* Signal handling, use POSIX calls with standardized semantics */
    struct sigaction sa;

195
    sa.sa_handler = AlarmHandler;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
196 197 198
    sa.sa_flags = SA_RESTART;
    sigemptyset(&sa.sa_mask);
    sigaction(SIGALRM, &sa, NULL);
199

200
    sa.sa_handler = ShutdownHandler;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
201 202 203 204 205 206 207
    sigaction(SIGINT, &sa, NULL);
    sigaction(SIGQUIT, &sa, NULL); 
    sigaction(SIGTERM, &sa, NULL);

    sa.sa_handler = SIG_IGN;
    sigaction(SIGHUP, &sa, NULL);
    sigaction(SIGPIPE, &sa, NULL); 
208
#else
Matteo Quintiliani's avatar
Matteo Quintiliani committed
209 210
    /* Signal handling, use function signal() */

211
    /*
212
    signal(SIGALRM, AlarmHandler);
213
    */
214 215

    signal(SIGINT, ShutdownHandler);
216
    /*
217
    signal(SIGQUIT, ShutdownHandler);
218
    */
219 220
    signal(SIGTERM, ShutdownHandler);

221
    /*
222 223
    signal(SIGHUP, SIG_IGN);
    signal(SIGPIPE, SIG_IGN);
224
    */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
225 226
#endif

227
    nmxptool_sigcondition_init();
228

Matteo Quintiliani's avatar
Matteo Quintiliani committed
229
    /* Default is normal output */
230
    nmxp_log(NMXP_LOG_SET, NMXP_LOG_D_NULL);
231 232 233 234 235 236

    /* Initialize params from argument values */
    if(nmxptool_getopt_long(argc, argv, &params) != 0) {
	return 1;
    }

237
    if(params.ew_configuration_file) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
238 239

#ifdef HAVE_EARTHWORMOBJS
240

241 242
	nmxp_log_init(nmxptool_ew_logit_msg, nmxptool_ew_logit_err);

Matteo Quintiliani's avatar
Matteo Quintiliani committed
243
	nmxptool_ew_configure(argv, &params);
244

Matteo Quintiliani's avatar
Matteo Quintiliani committed
245 246 247 248
	/* Check consistency of params */
	if(nmxptool_check_params(&params) != 0) {
	    return 1;
	}
249

250 251 252
#else
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "Earthworm feature has not been enabled in compilation!\n");
	return 1;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
253
#endif
254

255
    } else {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
256

Matteo Quintiliani's avatar
Matteo Quintiliani committed
257 258
	nmxp_log_init(nmxp_log_stdout, nmxp_log_stderr);

259 260 261 262 263
	/* Check consistency of params */
	if(nmxptool_check_params(&params) != 0) {
	    return 1;
	}

264 265
	/* List available channels on server */
	if(params.flag_listchannels) {
266

267 268 269 270
	    meta_channelList = nmxp_getMetaChannelList(params.hostname, params.portnumberdap, NMXP_DATA_TIMESERIES, params.flag_request_channelinfo, params.datas_username, params.datas_password, &channelList, nmxptool_sigcondition_read);

	    /* nmxp_meta_chan_print(meta_channelList); */
	    nmxp_meta_chan_print_with_match(meta_channelList, params.channels);
271

272
	    return 1;
273

274 275
	} else if(params.flag_listchannelsnaqs) {

276
	    channelList = nmxp_getAvailableChannelList(params.hostname, params.portnumberpds, NMXP_DATA_TIMESERIES, nmxptool_sigcondition_read);
277 278 279 280

	    /* nmxp_chan_print_channelList(channelList); */
	    nmxp_chan_print_channelList_with_match(channelList, params.channels);

281
	    return 1;
282

283 284
	}
    }
285

286
    nmxp_log(NMXP_LOG_SET, params.verbose_level);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
287 288 289
    if(params.verbose_level != DEFAULT_VERBOSE_LEVEL) {
	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_ANY, "verbose_level %d\n", params.verbose_level);
    }
290

291 292
#ifdef HAVE_LIBMSEED
    if(params.flag_writeseed) {
293
	ms_loginit((void*)&nmxptool_log_miniseed, NULL, (void*)&nmxptool_logerr_miniseed, "error: ");
294 295 296 297 298
	/* Init mini-SEED variables */
	nmxp_data_seed_init(&data_seed);
    }
#endif

299 300 301 302
    nmxptool_log_params(&params);

    if(params.stc == -1) {

303
#ifndef HAVE_WINDOWS_H
304
	p_func_pd[n_func_pd++] = nmxptool_listen_print_seq_no;
305
#endif
306

307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339
	if(params.flag_logdata) {
	    p_func_pd[n_func_pd++] = nmxptool_print_seq_no;
	}

#ifdef HAVE_LIBMSEED
	/* Write Mini-SEED record */
	if(params.flag_writeseed) {
	    p_func_pd[n_func_pd++] = nmxptool_write_miniseed;
	}
#endif

#ifdef HAVE___SRC_SEEDLINK_PLUGIN_C
	/* Send data to SeedLink Server */
	if(params.flag_slink) {
	    p_func_pd[n_func_pd++] = nmxptool_send_raw_depoch;
	}
#endif

#ifdef HAVE_EARTHWORMOBJS
	if(params.ew_configuration_file) {
	    p_func_pd[n_func_pd++] = nmxptool_ew_nmx2ew;
	}
#endif

    }

#ifdef HAVE_EARTHWORMOBJS
    if(params.ew_configuration_file) {
	nmxptool_ew_attach();
    }
#endif

    /* Exit only on request */
340
    while(EXIT_CONDITION) {
341

342
    NMXP_MEM_PRINT_PTR(0);
343

344
    /* Get list of available channels and get a subset list of params.channels */
345
    if( DAP_CONDITION(params) ) {
346
	if_dap_condition_only_one_time = 1;
347
	/* From DataServer */
348
	if(!nmxp_getMetaChannelList(params.hostname, params.portnumberdap, NMXP_DATA_TIMESERIES,
349
		    params.flag_request_channelinfo, params.datas_username, params.datas_password, &channelList, nmxptool_sigcondition_read)) {
350 351
	    return -1;
	}
352 353
    } else {
	/* From NaqsServer */
354
	channelList = nmxp_getAvailableChannelList(params.hostname, params.portnumberpds, NMXP_DATA_TIMESERIES, nmxptool_sigcondition_read);
355 356
    }

357 358 359 360 361
    if(!channelList) {
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CHANNEL, "Channel list has not been received!\n");
	return 1;
    }

362
    channelList_subset = nmxp_chan_subset(channelList, NMXP_DATA_TIMESERIES, params.channels, CURRENT_NETWORK);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
363 364 365
    
    /* Free the complete channel list */
    if(channelList) {
366
	NMXP_MEM_FREE(channelList);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
367 368
	channelList = NULL;
    }
369 370 371

    /* Check if some channel already exists */
    if(channelList_subset->number <= 0) {
372
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CHANNEL, "Channels not found!\n");
373
	return 1;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
374
    } else {
375
	nmxp_chan_print_netchannelList(channelList_subset);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
376

377
	nmxptool_chanseq_init(&channelList_Seq, channelList_subset->number, DEFAULT_BUFFERED_TIME, params.max_tolerable_latency, params.timeoutrecv);
378

Matteo Quintiliani's avatar
Matteo Quintiliani committed
379
#ifdef HAVE_LIBMSEED
380 381
	if(params.flag_writeseed) {
	    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "Init mini-SEED record list.\n");
382

383 384
	    /* Init mini-SEED record list */
	    for(i_chan = 0; i_chan < channelList_subset->number; i_chan++) {
385

386 387
		nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA,
			"Init mini-SEED record for %s\n", NMXP_LOG_STR(channelList_subset->channel[i_chan].name));
388

389
		msr_list_chan[i_chan] = msr_init(NULL);
390

391 392
		/* Separate station_code and channel_code */
		if(nmxp_chan_cpy_sta_chan(channelList_subset->channel[i_chan].name, station_code, channel_code, network_code)) {
393

394 395
		    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "%s.%s.%s\n",
			    NMXP_LOG_STR(NETCODE_OR_CURRENT_NETWORK), NMXP_LOG_STR(station_code), NMXP_LOG_STR(channel_code));
396

397 398 399
		    strcpy(msr_list_chan[i_chan]->network, NETCODE_OR_CURRENT_NETWORK);
		    strcpy(msr_list_chan[i_chan]->station, station_code);
		    strcpy(msr_list_chan[i_chan]->channel, channel_code);
400

401 402
		    msr_list_chan[i_chan]->reclen = 512;         /* byte record length */
		    msr_list_chan[i_chan]->encoding = DE_STEIM1;  /* Steim 1 compression */
403

404 405 406 407 408
		} else {
		    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CHANNEL,
			    "Channels %s error in format!\n", NMXP_LOG_STR(channelList_subset->channel[i_chan].name));
		    return 1;
		}
409

410
	    }
Matteo Quintiliani's avatar
Matteo Quintiliani committed
411 412
	}
#endif
413

414 415
    }

Matteo Quintiliani's avatar
Matteo Quintiliani committed
416
    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "Begin communication.\n");
Matteo Quintiliani's avatar
Matteo Quintiliani committed
417

418
    times_flow = 0;
419
    recv_errno = 0;
420

421
    while(times_flow < 2  &&  recv_errno == 0 && !nmxptool_sigcondition_read()) {
422

423
	if(params.statefile) {
424
	    nmxptool_chanseq_load_states(channelList_subset, channelList_Seq, params.statefile);
425 426
	}

427 428 429 430 431 432 433 434 435 436 437
	if(times_flow == 0) {
	    if(params.statefile) {
		params.interval = DEFAULT_INTERVAL_INFINITE;
	    }
	} else if(times_flow == 1) {
	    params.start_time = 0.0;
	    params.end_time = 0.0;
	    params.interval = DEFAULT_INTERVAL_NO_VALUE;

	}

Matteo Quintiliani's avatar
Matteo Quintiliani committed
438
    /* Condition for starting DAP or PDS */
439 440
    if( DAP_CONDITION(params) ||
	    (times_flow == 0  &&  params.statefile && params.max_data_to_retrieve > 0 && params.interval == DEFAULT_INTERVAL_INFINITE) ) {
441 442

	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "Begin DAP Flow.\n");
443

444 445 446 447 448 449 450
	if(params.interval > 0  ||  params.interval == DEFAULT_INTERVAL_INFINITE) {
	    if(params.interval > 0) {
		params.end_time = params.start_time + params.interval;
	    } else {
		params.end_time = nmxp_data_gmtime_now();
	    }
	} else if(params.delay > 0) {
451
	    params.start_time = ((double) (time(NULL) - params.delay - span_interval) / 10.0) * 10.0;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
452 453 454
	    params.end_time = params.start_time + span_interval;
	}

455

456 457 458
	/* ************************************************************** */
	/* Start subscription protocol "DATA ACCESS PROTOCOL" version 1.0 */
	/* ************************************************************** */
459

460
	/* DAP Step 1: Open a socket */
461
	if( (naqssock = nmxp_openSocket(params.hostname, params.portnumberdap, nmxptool_sigcondition_read)) == NMXP_SOCKET_ERROR) {
462
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error opening socket!\n");
463 464
	    return 1;
	}
465

466 467
	/* DAP Step 2: Read connection time */
	if(nmxp_readConnectionTime(naqssock, &connection_time) != NMXP_SOCKET_OK) {
468
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error reading connection time from server!\n");
469 470
	    return 1;
	}
471

472 473
	/* DAP Step 3: Send a ConnectRequest */
	if(nmxp_sendConnectRequest(naqssock, params.datas_username, params.datas_password, connection_time) != NMXP_SOCKET_OK) {
474
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error sending connect request!\n");
475 476 477 478 479
	    return 1;
	}

	/* DAP Step 4: Wait for a Ready message */
	if(nmxp_waitReady(naqssock) != NMXP_SOCKET_OK) {
480
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error waiting Ready message!\n");
481 482
	    return 1;
	}
483

484 485
	exitdapcondition = 1;

486
	default_start_time = (params.start_time > 0.0)? params.start_time : nmxp_data_gmtime_now() - params.max_data_to_retrieve;
487

488
	while(exitdapcondition  &&  !nmxptool_sigcondition_read()) {
489

490
	    /* Start loop for sending requests */
491
	    request_chan=0;
492
	    request_SOCKET_OK = NMXP_SOCKET_OK;
493

494
	    /* For each channel */
495
	    while(request_SOCKET_OK == NMXP_SOCKET_OK  &&  request_chan < channelList_subset->number  &&  exitdapcondition && !nmxptool_sigcondition_read()) {
496

497
		if(params.statefile) {
498 499
		    if(channelList_Seq[request_chan].after_start_time > 0) {
			params.start_time = channelList_Seq[request_chan].after_start_time;
500
			if(params.end_time - params.start_time > params.max_data_to_retrieve) {
501
			    nmxp_data_to_str(start_time_str, params.start_time);
502
			    nmxp_data_to_str(default_start_time_str, params.end_time - params.max_data_to_retrieve);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
503
			    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_ANY, "%s start_time changed from %s to %s\n",
504 505 506
				    NMXP_LOG_STR(channelList_subset->channel[request_chan].name),
				    NMXP_LOG_STR(start_time_str),
				    NMXP_LOG_STR(default_start_time_str));
507
			    params.start_time = params.end_time - params.max_data_to_retrieve;
508
			}
509 510 511
		    } else {
			params.start_time = default_start_time;
		    }
512 513
		    channelList_Seq[request_chan].last_time = params.start_time;
		    channelList_Seq[request_chan].significant = 1;
514

515
		}
516

517 518
		nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "nmxp_sendDataRequest %d %s (%d)\n",
			channelList_subset->channel[request_chan].key,
519 520
			NMXP_LOG_STR(channelList_subset->channel[request_chan].name),
			request_chan);
521

522 523 524
		nmxp_data_to_str(start_time_str, params.start_time);
		nmxp_data_to_str(end_time_str, params.end_time);
		nmxp_data_to_str(default_start_time_str, default_start_time);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
525
		nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "%s start_time = %s - end_time = %s - (default_start_time = %s)\n",
526 527 528 529
			NMXP_LOG_STR(channelList_subset->channel[request_chan].name),
			NMXP_LOG_STR(start_time_str),
			NMXP_LOG_STR(end_time_str),
			NMXP_LOG_STR(default_start_time_str));
530

531
		/* DAP Step 5: Send Data Request */
532
		request_SOCKET_OK = nmxp_sendDataRequest(naqssock, channelList_subset->channel[request_chan].key, (int32_t) params.start_time, (int32_t) (params.end_time + 1.0));
533

534
		if(request_SOCKET_OK == NMXP_SOCKET_OK) {
535

536 537 538 539
		    nmxp_data_to_str(str_start_time, params.start_time);
		    nmxp_data_to_str(str_end_time, params.end_time);
		    nmxptool_str_time_to_filename(str_start_time);
		    nmxptool_str_time_to_filename(str_end_time);
540

541 542
		    if(params.flag_writefile) {
			/* Open output file */
543
			if(nmxp_chan_cpy_sta_chan(channelList_subset->channel[request_chan].name, station_code, channel_code, network_code)) {
544 545 546 547 548 549 550 551
			    sprintf(filename, "%s.%s.%s_%s_%s.nmx",
				    NETCODE_OR_CURRENT_NETWORK,
				    station_code,
				    channel_code,
				    str_start_time,
				    str_end_time);
			} else {
			    sprintf(filename, "%s_%s_%s.nmx",
552
				    channelList_subset->channel[request_chan].name,
553 554 555
				    str_start_time,
				    str_end_time);
			}
556

557 558
			outfile = fopen(filename, "w");
			if(!outfile) {
559
			    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_EXTRA, "Can not open file %s!\n",
560
				    NMXP_LOG_STR(filename));
561
			}
562
		    }
563 564

#ifdef HAVE_LIBMSEED
565
		    if(params.flag_writeseed) {
566
			char dirsdschan[1024];
567
			/* Open output Mini-SEED file */
568
			if(nmxp_chan_cpy_sta_chan(channelList_subset->channel[request_chan].name, station_code, channel_code, network_code)) {
569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585
			    sprintf(dirsdschan, "%s/%d/%s/%s/%s.D", params.outdirseed, nmxp_data_year_from_epoch(params.start_time), NETCODE_OR_CURRENT_NETWORK, station_code, channel_code);
			    if(mkdirp(dirsdschan, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH | S_IXOTH) == -1) {
				/* ERROR */
				nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_EXTRA, "Directory %s has not been created!\n", dirsdschan);
			    } else {
				nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_ANY, "Directory %s created!\n", dirsdschan);
			    }

			    if(chdir(dirsdschan) == -1) {
				/* ERROR */
				nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_EXTRA, "Directory %s does not exist!\n", dirsdschan);
			    } else {
				nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_ANY, "Directory %s exist!\n", dirsdschan);
			    }


			    /*
586 587 588 589 590 591
			    sprintf(data_seed.filename_mseed, "%s.%s.%s_%s_%s.miniseed",
				    NETCODE_OR_CURRENT_NETWORK,
				    station_code,
				    channel_code,
				    str_start_time,
				    str_end_time);
592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614
				    */

			    /* TODO if the requested data covers different days
			     * filename contains also the year and the yday of the end time, this breaks SDS structure */
			    if( ( nmxp_data_year_from_epoch(params.start_time) == nmxp_data_year_from_epoch(params.end_time) )
				    &&  ( nmxp_data_yday_from_epoch(params.start_time) == nmxp_data_yday_from_epoch(params.end_time)) ) {
				sprintf(data_seed.filename_mseed, "%s.%s..%s.D.%d.%03d",
					NETCODE_OR_CURRENT_NETWORK,
					station_code,
					channel_code,
					nmxp_data_year_from_epoch(params.start_time),
					nmxp_data_yday_from_epoch(params.start_time));
			    } else {
				sprintf(data_seed.filename_mseed, "%s.%s..%s.D.%d.%03d-%d.%03d",
					NETCODE_OR_CURRENT_NETWORK,
					station_code,
					channel_code,
					nmxp_data_year_from_epoch(params.start_time),
					nmxp_data_yday_from_epoch(params.start_time),
					nmxp_data_year_from_epoch(params.end_time),
					nmxp_data_yday_from_epoch(params.end_time));
			    }

615 616
			} else {
			    sprintf(filename, "%s_%s_%s.miniseed",
617
				    channelList_subset->channel[request_chan].name,
618 619 620
				    str_start_time,
				    str_end_time);
			}
621

622 623
			data_seed.outfile_mseed = fopen(data_seed.filename_mseed, "r");
			if(data_seed.outfile_mseed) {
624
			    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_ANY, "File %s already exist. It will not be overrode.\n",
625 626 627
				    NMXP_LOG_STR(data_seed.filename_mseed));
			    fclose(data_seed.outfile_mseed);
			    data_seed.outfile_mseed = NULL;
628
			} else {
629

630 631
			data_seed.outfile_mseed = fopen(data_seed.filename_mseed, "w");
			if(!data_seed.outfile_mseed) {
632
			    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_EXTRA, "Can not open file %s!\n",
633
				    NMXP_LOG_STR(data_seed.filename_mseed));
634
			}
635

636
			}
637 638
		    }
#endif
639

640 641 642 643
		    if(params.flag_writefile  &&  outfile) {
			/* Compute SNCL line */

			/* Separate station_code_old_way and channel_code_old_way */
644
			if(nmxp_chan_cpy_sta_chan(channelList_subset->channel[request_chan].name, station_code, channel_code, network_code)) {
645 646 647 648 649 650 651
			    /* Write SNCL line */
			    fprintf(outfile, "%s.%s.%s.%s\n",
				    station_code,
				    NETCODE_OR_CURRENT_NETWORK,
				    channel_code,
				    (params.location)? params.location : "");
			}
652

653
		    }
654

655 656
		    /* DAP Step 6: Receive Data until receiving a Ready message */
		    ret = nmxp_receiveMessage(naqssock, &type, &buffer, &length, 0, &recv_errno);
657

658 659
		    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "ret = %d, type = %d, length = %d, recv_errno = %d\n",
			    ret, type, length, recv_errno);
660

661
		    while(ret == NMXP_SOCKET_OK   &&    type != NMXP_MSG_READY) {
662

663 664 665
			/* Process a packet and return value in NMXP_DATA_PROCESS structure */
			pd = nmxp_processCompressedData(buffer, length, channelList_subset, NETCODE_OR_CURRENT_NETWORK);
			nmxp_data_trim(pd, params.start_time, params.end_time, 0);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
666

667 668 669
			/* To prevent to manage a packet with zero sample after nmxp_data_trim() */
			if(pd->nSamp > 0) {

670 671
			/* Log contents of last packet */
			if(params.flag_logdata) {
672
			    nmxp_data_log(pd, params.flag_logsample);
673
			}
674

675 676
			/* Set cur_chan */
			cur_chan = nmxp_chan_lookupKeyIndex(pd->key, channelList_subset);
677 678 679 680 681

			/* It is not the channel I have requested or error from nmxp_chan_lookupKeyIndex() */
			if(request_chan != cur_chan  &&  cur_chan != -1) {
			    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "request_chan != cur_chan  %d != %d! (%d, %s) (%d, %s.%s.%s)\n",
				    request_chan, cur_chan,
682 683 684
				    channelList_subset->channel[request_chan].key,
				    NMXP_LOG_STR(channelList_subset->channel[request_chan].name),
				    pd->key, NMXP_LOG_STR(pd->network), NMXP_LOG_STR(pd->station), NMXP_LOG_STR(pd->channel));
685
			} else {
686

687
			/* Management of gaps */
688
			nmxptool_chanseq_gap(&(channelList_Seq[cur_chan]), pd);
689

690
#ifdef HAVE_LIBMSEED
691 692 693 694
			/* Write Mini-SEED record */
			if(params.flag_writeseed) {
			    nmxptool_write_miniseed(pd);
			}
695
#endif
696

697
#ifdef HAVE___SRC_SEEDLINK_PLUGIN_C
698 699 700 701
			/* Send data to SeedLink Server */
			if(params.flag_slink) {
			    nmxptool_send_raw_depoch(pd);
			}
702 703
#endif

704 705 706 707 708 709
#ifdef HAVE_EARTHWORMOBJS
			if(params.ew_configuration_file) {
			    nmxptool_ew_nmx2ew(pd);
			}
#endif

710 711 712 713 714 715 716 717
			if(params.flag_writefile  &&  outfile) {
			    /* Write buffer to the output file */
			    if(outfile && buffer && length > 0) {
				int32_t length_int = length;
				nmxp_data_swap_4b((int32_t *) &length_int);
				fwrite(&length_int, sizeof(length_int), 1, outfile);
				fwrite(buffer, length, 1, outfile);
			    }
718
			}
719

720 721 722
			/* Store x_1 */
			channelList_Seq[cur_chan].x_1 = pd->pDataPtr[pd->nSamp-1];

723 724
			}

725 726
			} else {
			    /* TODO: nSamp <= 0 */
727
			}
728

729

730 731
			/* Free pd->buffer */
			if(pd->buffer) {
732
			    NMXP_MEM_FREE(pd->buffer);
733 734
			    pd->buffer = NULL;
			}
735

736 737
			/* Receive Data */
			ret = nmxp_receiveMessage(naqssock, &type, &buffer, &length, 0, &recv_errno);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
738
			/* nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "ret = %d, type = %d\n", ret, type); */
739
		    }
740

741 742 743 744 745
		    if(params.flag_writefile  &&  outfile) {
			/* Close output file */
			fclose(outfile);
			outfile = NULL;
		    }
746

747
#ifdef HAVE_LIBMSEED
748 749 750 751 752
		    if(params.flag_writeseed  &&  data_seed.outfile_mseed) {
			/* Close output Mini-SEED file */
			fclose(data_seed.outfile_mseed);
			data_seed.outfile_mseed = NULL;
		    }
753
#endif
754

755 756
		} else {
		    /* TODO: error message */
757
		}
758
		request_chan++;
759 760 761 762 763

#ifdef HAVE_EARTHWORMOBJS
		if(params.ew_configuration_file) {

		    /* Check if we are being asked to terminate */
764
		    if( (ew_check_flag_terminate = nmxptool_ew_check_flag_terminate()) ) {
765
			logit ("t", "nmxptool terminating on request\n");
766
			nmxptool_ew_send_error(NMXPTOOL_EW_ERR_TERMREQ, NULL, params.hostname);
767 768 769 770 771 772 773 774 775 776
			exitdapcondition = 0;
			times_flow = TIMES_FLOW_EXIT;
		    }

		    /* Check if we need to send heartbeat message */
		    nmxptool_ew_send_heartbeat_if_needed();

		}
#endif

777
	    }
778
	    /* DAP Step 7: Repeat steps 5 and 6 for each data request */
779

780 781 782 783 784 785 786 787 788 789
	    if(params.delay > 0) {
		time_to_sleep = (params.end_time - params.start_time) - (time(NULL) - (params.start_time + params.delay + span_interval));
		if(time_to_sleep >= 0) {
		    nmxp_sleep(time_to_sleep);
		} else {
		    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "time to sleep %d sec.\n", time_to_sleep);
		    nmxp_sleep(3);
		}
		params.start_time = params.end_time;
		params.end_time = params.start_time + span_interval;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
790
	    } else {
791
		exitdapcondition = 0;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
792
	    }
Matteo Quintiliani's avatar
Matteo Quintiliani committed
793

794
	} /* END while(exitdapcondition) */
795

796 797 798 799 800
	/* DAP Step 8: Send a Terminate message (optional) */
	nmxp_sendTerminateSubscription(naqssock, NMXP_SHUTDOWN_NORMAL, "Bye!");

	/* DAP Step 9: Close the socket */
	nmxp_closeSocket(naqssock);
801
	naqssock = 0;
802 803 804 805 806

	/* ************************************************************ */
	/* End subscription protocol "DATA ACCESS PROTOCOL" version 1.0 */
	/* ************************************************************ */

807
	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "End DAP Flow.\n");
808

Matteo Quintiliani's avatar
Matteo Quintiliani committed
809
    } else {
810

811 812
	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "Begin PDS Flow.\n");

813 814 815 816 817
	/* ************************************************************* */
	/* Start subscription protocol "PRIVATE DATA STREAM" version 1.4 */
	/* ************************************************************* */

	/* PDS Step 1: Open a socket */
818
	naqssock = nmxp_openSocket(params.hostname, params.portnumberpds, nmxptool_sigcondition_read);
819 820 821

	if(naqssock == NMXP_SOCKET_ERROR) {
	    return 1;
822 823
	}

824 825 826 827 828 829 830 831 832 833 834
	/* PDS Step 2: Send a Connect */
	if(nmxp_sendConnect(naqssock) != NMXP_SOCKET_OK) {
	    printf("Error on sendConnect()\n");
	    return 1;
	}

	/* PDS Step 3: Receive ChannelList */
	if(nmxp_receiveChannelList(naqssock, &channelList) != NMXP_SOCKET_OK) {
	    printf("Error on receiveChannelList()\n");
	    return 1;
	}
835 836
	/* Get a subset of channel from arguments, in respect to the step 3 of PDS */
	channelList_subset_waste = nmxp_chan_subset(channelList, NMXP_DATA_TIMESERIES, params.channels, CURRENT_NETWORK);
837

Matteo Quintiliani's avatar
Matteo Quintiliani committed
838 839
	/* Free the complete channel list */
	if(channelList) {
840
	    NMXP_MEM_FREE(channelList);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
841 842 843
	    channelList = NULL;
	}

844 845
	/* TODO check if channelList_subset_waste is equal to channelList_subset and free */
	if(channelList_subset_waste) {
846
	    NMXP_MEM_FREE(channelList_subset_waste);
847 848
	    channelList_subset_waste = NULL;
	}
849

Matteo Quintiliani's avatar
Matteo Quintiliani committed
850 851 852 853
#ifdef HAVE_LIBMSEED
	if(params.flag_writeseed) {
	    /* Open output Mini-SEED file */
	    sprintf(data_seed.filename_mseed, "%s.realtime.miniseed",
854
		    CURRENT_NETWORK);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
855 856 857

	    data_seed.outfile_mseed = fopen(data_seed.filename_mseed, "w");
	    if(!data_seed.outfile_mseed) {
858
		nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_EXTRA, "Can not open file %s!\n",
859
			NMXP_LOG_STR(data_seed.filename_mseed));
Matteo Quintiliani's avatar
Matteo Quintiliani committed
860
	    } else {
861 862
		nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "Opened file %s!\n",
			NMXP_LOG_STR(data_seed.filename_mseed));
Matteo Quintiliani's avatar
Matteo Quintiliani committed
863 864 865
	    }
	}
#endif
866 867 868 869 870 871 872 873 874 875 876
	
	/* PDS Step 4: Send a Request Pending (optional) */

	/* PDS Step 5: Send AddChannels */
	/* Request Data */

	/* Sending the channel requests is better done in a separate thread; without pthread they are sent inline */
#ifndef HAVE_PTHREAD_H
	nmxp_sendAddTimeSeriesChannel(naqssock, channelList_subset, params.stc, params.rate,
		(params.flag_buffered)? NMXP_BUFFER_YES : NMXP_BUFFER_NO, params.n_channel, params.usec, 1);
#else
877 878 879 880
	pthread_attr_init(&attr_request_channels);
	pthread_attr_setdetachstate(&attr_request_channels, PTHREAD_CREATE_JOINABLE);
	pthread_create(&thread_request_channels, &attr_request_channels, p_nmxp_sendAddTimeSeriesChannel, (void *)NULL);
	pthread_attr_destroy(&attr_request_channels);
881 882
#endif

883
#ifndef HAVE_WINDOWS_H
884
#ifdef HAVE_PTHREAD_H
885 886
	if(!already_listen  &&  params.listen_port != DEFAULT_LISTEN_PORT) {
	    already_listen = 1;
887 888
	    pthread_attr_init(&attr_socket_listen);
	    pthread_attr_setdetachstate(&attr_socket_listen, PTHREAD_CREATE_DETACHED);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
889
	    pthread_create(&thread_socket_listen, &attr_socket_listen, nmxptool_listen, (void *)params.listen_port);
890 891
	    pthread_attr_destroy(&attr_socket_listen);
	}
892
#endif
893 894
#endif

895
	/* PDS Step 6: Repeat until finished: receive and handle packets */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
896

897
	/* TODO*/
Matteo Quintiliani's avatar
Matteo Quintiliani committed
898 899
	exitpdscondition = 1;

900
	skip_current_packet = 0;
901

902
	while(exitpdscondition && !nmxptool_sigcondition_read()) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
903

904
	    /* Process Compressed or Decompressed Data */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
905
	    pd = nmxp_receiveData(naqssock, channelList_subset, NETCODE_OR_CURRENT_NETWORK, params.timeoutrecv, &recv_errno);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
906

907 908 909 910 911 912 913 914 915 916 917
	    if(!pd) {
		pd_null_count++;
		if((pd_null_count * params.timeoutrecv) >= timeoutrecv_warning) {
		    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_ANY, "Received %d times a null packet. (%d sec.)\n",
			    pd_null_count, pd_null_count * params.timeoutrecv);
		    pd_null_count = 0;
		}
	    } else {
		pd_null_count = 0;
	    }

918
	    if(recv_errno == 0) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
919 920
		exitpdscondition = 1;
	    } else {
921 922 923 924 925 926
#ifdef HAVE_WINDOWS_H
		if(recv_errno == WSAEWOULDBLOCK  ||  recv_errno == WSAETIMEDOUT)
#else
		if(recv_errno == EWOULDBLOCK)
#endif
		{
927 928
		    exitpdscondition = 1;
		} else {
929 930
		    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error receiving data. pd=%p recv_errno=%d\n",
			    pd, recv_errno);
931 932

#ifdef HAVE_EARTHWORMOBJS
933
		    if(params.ew_configuration_file) {
934
			nmxptool_ew_send_error(NMXPTOOL_EW_ERR_RECVDATA, nmxp_strerror(recv_errno), params.hostname);
935
		    }
936
#endif
937 938
		    exitpdscondition = 0;
		}
Matteo Quintiliani's avatar
Matteo Quintiliani committed
939
	    }
940

941 942 943
	    if(pd) {
		/* Set cur_chan */
		cur_chan = nmxp_chan_lookupKeyIndex(pd->key, channelList_subset);
944
		if(cur_chan == -1) {
945 946
		    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "Key %d not found in channelList_subset!\n",
			    pd->key);
947
		}
948 949
	    }

Matteo Quintiliani's avatar
Matteo Quintiliani committed
950
	    /* Log contents of last packet */
951
	    if(params.flag_logdata) {
952
		nmxp_data_log(pd, params.flag_logsample);
953
	    }
Matteo Quintiliani's avatar
Matteo Quintiliani committed
954

955
	    skip_current_packet = 0;