/*! \file
 *
 * \brief Nanometrics Protocol Library
 *
 * Author:
 * 	Matteo Quintiliani
 * 	Istituto Nazionale di Geofisica e Vulcanologia - Italy
 *	quintiliani@ingv.it
 *
 * $Id: nmxp.c,v 1.95 2009-08-31 12:16:41 mtheo Exp $
 *
 */

#include "nmxp.h"
15
#include "nmxp_memory.h"
Matteo Quintiliani's avatar
Start  
Matteo Quintiliani committed
16

Matteo Quintiliani's avatar
Matteo Quintiliani committed
17 18
#include "config.h"

Matteo Quintiliani's avatar
Start  
Matteo Quintiliani committed
19
#include <stdio.h>
20
#include <stdlib.h>
Matteo Quintiliani's avatar
Start  
Matteo Quintiliani committed
21 22
#include <string.h>

23 24 25 26
#ifdef HAVE_GETTIMEOFDAY
#include <sys/time.h>
#endif

27 28 29 30
#ifdef HAVE_ARPA_INET_H
#include <arpa/inet.h>
#endif

31 32 33 34
#ifdef HAVE_WINDOWS_H
#include "winsock2.h"
#endif

35
int nmxp_sendConnect(int isock) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
36
    return nmxp_sendMessage(isock, NMXP_MSG_CONNECT, NULL, 0);
Matteo Quintiliani's avatar
Start  
Matteo Quintiliani committed
37 38
}

Matteo Quintiliani's avatar
Matteo Quintiliani committed
39 40
/* Send a TerminateSubscription message carrying the given text.
 *
 * isock:   socket descriptor handed to nmxp_sendMessage().
 * reason:  currently not transmitted by this function.
 * message: optional text; may be NULL.
 *
 * The body length passed on is strlen(message) - 1, as before; an empty or
 * NULL message now yields length 0 instead of wrapping around.
 *
 * Returns the value returned by nmxp_sendMessage().
 */
int nmxp_sendTerminateSubscription(int isock, NMXP_SHUTDOWN_REASON reason, char *message) {
    size_t message_length = 0;

    (void) reason;

    if(message != NULL) {
	message_length = strlen(message);
	/* Guard against the unsigned wrap-around of "strlen() - 1" on "" */
	if(message_length > 0) {
	    message_length--;
	}
    }

    return nmxp_sendMessage(isock, NMXP_MSG_TERMINATESUBSCRIPTION, message, message_length);
}

Matteo Quintiliani's avatar
Matteo Quintiliani committed
43
/* Receive a ChannelList message and convert it to host byte order.
 *
 * isock:        socket descriptor handed to nmxp_receiveMessage().
 * pchannelList: output. Set to NULL first; on success it points to a buffer
 *               obtained from NMXP_MEM_MALLOC that the caller must release
 *               with NMXP_MEM_FREE.
 *
 * Returns the status of nmxp_receiveMessage(), or NMXP_SOCKET_ERROR when the
 * allocation fails or when the received length / channel count are
 * inconsistent. If a message of another type arrives, the receive status is
 * returned unchanged and *pchannelList stays NULL.
 */
int nmxp_receiveChannelList(int isock, NMXP_CHAN_LIST **pchannelList) {
    int ret;
    int i;
    int recv_errno;

    NMXP_MSG_SERVER type;
    char buffer[NMXP_MAX_LENGTH_DATA_BUFFER]={0};
    int32_t length;
    int32_t header_size;
    int32_t max_channels;
    NMXP_CHAN_LIST *list = NULL;

    *pchannelList = NULL;

    ret = nmxp_receiveMessage(isock, &type, buffer, &length, 0, &recv_errno, NMXP_MAX_LENGTH_DATA_BUFFER);

    if(ret != NMXP_SOCKET_OK) {
	return ret;
    }

    if(type != NMXP_MSG_CHANNELLIST) {
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_PACKETMAN, "Type %d is not NMXP_MSG_CHANNELLIST!\n", type);
	return ret;
    }

    /* The body must at least hold the channel counter and must fit in buffer */
    if(length < (int32_t) sizeof(list->number)  ||  length > NMXP_MAX_LENGTH_DATA_BUFFER) {
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_PACKETMAN, "nmxp_receiveChannelList() Invalid message length %d!\n", length);
	return NMXP_SOCKET_ERROR;
    }

    list = (NMXP_CHAN_LIST *) NMXP_MEM_MALLOC(length);
    if(list == NULL) {
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_PACKETMAN, "nmxp_receiveChannelList() Error allocating pchannelList!\n");
	return NMXP_SOCKET_ERROR;
    }

    memmove(list, buffer, length);

    list->number = ntohl(list->number);

    /* Only 'length' bytes were allocated: the counter sent by the server
     * must not address entries beyond them. */
    header_size = (int32_t) ((char *) list->channel - (char *) list);
    max_channels = (length > header_size)? (int32_t) ((size_t) (length - header_size) / sizeof(list->channel[0])) : 0;

    if(list->number < 0  ||  list->number > max_channels) {
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_PACKETMAN, "nmxp_receiveChannelList() Invalid number of channels %d (message length %d)!\n",
		list->number, length);
	NMXP_MEM_FREE(list);
	return NMXP_SOCKET_ERROR;
    }

    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CHANNEL, "number of channels %d\n", list->number);

    for(i=0; i < list->number; i++) {
	list->channel[i].key = ntohl(list->channel[i].key);
	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CHANNEL, "%12d %s\n",
		list->channel[i].key,
		NMXP_LOG_STR(list->channel[i].name));
    }

    *pchannelList = list;

    return ret;
}


90
/* Build and send one AddTimeSeriesChannels message.
 *
 * The body is a sequence of 4-byte values, each converted with htonl():
 * the number of channels, one key per channel, shortTermCompletion,
 * out_format and buffer_flag.
 *
 * Returns the value of nmxp_sendMessage(), NMXP_SOCKET_ERROR when the
 * temporary buffer cannot be allocated, or NMXP_SOCKET_OK (after a warning,
 * nothing is sent) when the channel count gives an invalid body length.
 */
int nmxp_sendAddTimeSeriesChannel_raw(int isock, NMXP_CHAN_LIST_NET *channelList, int32_t shortTermCompletion, int32_t out_format, NMXP_BUFFER_FLAG buffer_flag) {
    int ret = NMXP_SOCKET_OK;
    int32_t buffer_length = 16 + (4 * channelList->number);
    char *buffer = NULL;
    int32_t app, i, disp;

    /* A negative count of -1..-3 still gives a positive length that is too
     * small for the 16 bytes always written: reject every negative count. */
    if(channelList->number < 0  ||  buffer_length <= 0) {
	nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_ANY, "nmxp_sendAddTimeSeriesChannel_raw() buffer length = %d.\n", buffer_length);
	return ret;
    }

    buffer = NMXP_MEM_MALLOC(buffer_length);
    if(buffer == NULL) {
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "nmxp_sendAddTimeSeriesChannel_raw() Error allocating buffer of %d bytes.\n", buffer_length);
	return NMXP_SOCKET_ERROR;
    }

    disp=0;

    app = htonl(channelList->number);
    memcpy(&buffer[disp], &app, 4);
    disp+=4;

    for(i=0; i < channelList->number; i++) {
	app = htonl(channelList->channel[i].key);
	memcpy(&buffer[disp], &app, 4);
	disp+=4;
    }

    app = htonl(shortTermCompletion);
    memcpy(&buffer[disp], &app, 4);
    disp+=4;

    app = htonl(out_format);
    memcpy(&buffer[disp], &app, 4);
    disp+=4;

    app = htonl(buffer_flag);
    memcpy(&buffer[disp], &app, 4);
    disp+=4;

    ret = nmxp_sendMessage(isock, NMXP_MSG_ADDTIMESERIESCHANNELS, buffer, buffer_length);

    NMXP_MEM_FREE(buffer);
    buffer = NULL;

    return ret;
}

137
#define MAX_LEN_S_CHANNELS 4096
138
int nmxp_sendAddTimeSeriesChannel(int isock, NMXP_CHAN_LIST_NET *channelList, int32_t shortTermCompletion, int32_t out_format, NMXP_BUFFER_FLAG buffer_flag, int n_channel, int n_usec, int flag_restart) {
139 140
    static int i = 0;
    static int first_time = 1;
141
    /*TODO avoid static Stefano*/
142 143
    static struct timeval last_tp_now;

144
    char s_channels[MAX_LEN_S_CHANNELS];
145 146 147 148 149
    int j;
    int ret = 0;
    NMXP_CHAN_LIST_NET split_channelList;
    long diff_usec;
    struct timeval tp_now;
150 151 152 153 154 155 156
    double estimated_time = 0.0;

    if(n_usec == 0  &&  n_channel == 0) {
	n_channel = channelList->number;
    }

    estimated_time = (double) channelList->number * ( ((double) n_usec / 1000000.0) / (double) n_channel);
157

158
    if(flag_restart) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
159
	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW,
160 161 162 163 164
		"Estimated time for channel requests: %d * (%d/%d) = %.3f sec.\n",
		channelList->number, n_usec / 1000, n_channel,
		estimated_time);


165 166 167 168
	first_time = 1;
	i = 0;
    }

Matteo Quintiliani's avatar
Matteo Quintiliani committed
169
    /* Check if requests could be satisfied within NMXP_MAX_MSCHAN_MSEC */
170 171 172 173 174 175 176 177 178
    if(estimated_time > ((double) NMXP_MAX_MSCHAN_MSEC / 1000.0)) {
	n_usec = ( (double) NMXP_MAX_MSCHAN_MSEC * 1000.0 ) * ( (double) n_channel / (double) channelList->number);
	estimated_time = (double) channelList->number * ( ((double) n_usec / 1000000.0) / (double) n_channel);
	if(flag_restart) {
	    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_ANY, "Estimated time exceeds. New values %d/%d and estimated time %.3f sec.\n",
		    n_usec/1000, n_channel, estimated_time);
	}
    }

179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200
#ifdef HAVE_GETTIMEOFDAY
    gettimeofday(&tp_now, NULL);
#else
    TODO gettimeofday not found
#endif

    if(i <  channelList->number) {
	    if(first_time) {
		    diff_usec = n_usec + 1;
		    first_time = 0;
		    last_tp_now.tv_sec = 0;
		    last_tp_now.tv_usec = 0;
	    } else {
		    diff_usec = (tp_now.tv_sec - last_tp_now.tv_sec) * 1000000;
		    diff_usec += (tp_now.tv_usec - last_tp_now.tv_usec);
	    }
	    if(diff_usec >= n_usec) {
		    /* while(ret == 0  &&  i <  channelList->number) { */
		    split_channelList.number = 0;
		    while(split_channelList.number < n_channel  &&  i < channelList->number) {
			    split_channelList.channel[split_channelList.number].key = channelList->channel[i].key;
			    /* Not necessary, but it could help for debugging */
201
			    strncpy(split_channelList.channel[split_channelList.number].name, channelList->channel[i].name, NMXP_CHAN_MAX_SIZE_NAME);
202 203 204 205
			    split_channelList.number++;
			    i++;
		    }
		    if(split_channelList.number > 0) {
206
			snprintf(s_channels, MAX_LEN_S_CHANNELS, "%.0f/%d chan %d of %d:",
207
				(double)diff_usec/1000.0, split_channelList.number, i, channelList->number);
208
			    for(j=0; j < split_channelList.number; j++) {
209 210
				strncat(s_channels, " ", MAX_LEN_S_CHANNELS);
				strncat(s_channels, NMXP_LOG_STR(split_channelList.channel[j].name), MAX_LEN_S_CHANNELS);
211
			    }
212
			    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "%s\n", s_channels);
213 214 215 216 217 218 219 220 221 222 223
			    ret = nmxp_sendAddTimeSeriesChannel_raw(isock, &split_channelList, shortTermCompletion, out_format, buffer_flag);
		    }
		    /* } */
		    last_tp_now.tv_sec = tp_now.tv_sec;
		    last_tp_now.tv_usec = tp_now.tv_usec;
	    }
    }

    return ret;
}

224

225
/* Receive one message and turn it into a NMXP_DATA_PROCESS structure.
 *
 * Compressed messages go through nmxp_processCompressedData(), decompressed
 * ones through nmxp_processDecompressedData(). Any other message type is
 * logged as an error.
 *
 * timeoutsec and recv_errno are passed straight to nmxp_receiveMessage().
 *
 * Returns the pointer produced by the processing function, or NULL when the
 * receive fails or the message type is not handled.
 */
NMXP_DATA_PROCESS *nmxp_receiveData(int isock, NMXP_CHAN_LIST_NET *channelList, const char *network_code, int timeoutsec, int *recv_errno ) {
    char buffer[NMXP_MAX_LENGTH_DATA_BUFFER]={0};
    NMXP_DATA_PROCESS *result = NULL;
    NMXP_MSG_SERVER type;
    int32_t length;
    int status;

    status = nmxp_receiveMessage(isock, &type, buffer, &length, timeoutsec, recv_errno, NMXP_MAX_LENGTH_DATA_BUFFER);
    if(status != NMXP_SOCKET_OK) {
	return result;
    }

    if(type != NMXP_MSG_COMPRESSED  &&  type != NMXP_MSG_DECOMPRESSED) {
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_PACKETMAN, "Type %d is not NMXP_MSG_COMPRESSED or NMXP_MSG_DECOMPRESSED!\n", type);
	return result;
    }

    if(type == NMXP_MSG_COMPRESSED) {
	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_PACKETMAN, "Type %d is NMXP_MSG_COMPRESSED!\n", type);
	result = nmxp_processCompressedData(buffer, length, channelList, network_code);
    } else {
	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_PACKETMAN, "Type %d is NMXP_MSG_DECOMPRESSED!\n", type);
	result = nmxp_processDecompressedData(buffer, length, channelList, network_code);
    }

    return result;
}

246

247
int nmxp_sendConnectRequest(int isock, char *naqs_username, char *naqs_password, int32_t connection_time) {
248
    int ret;
249
    int i;
250
    char crc32buf[100];
251 252
    char *pcrc32buf = crc32buf;
    int crc32buf_length = 0;
253
    NMXP_CONNECT_REQUEST connectRequest;
254
    int naqs_username_length, naqs_password_length;
255
    int32_t protocol_version = 0;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
256
    char *pp = NULL;
257

258 259
    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CRC, "%s - %s\n",
	    NMXP_LOG_STR(naqs_username), NMXP_LOG_STR(naqs_password));
260

261 262 263
    naqs_username_length = (naqs_username)? strlen(naqs_username) : 0;
    naqs_password_length = (naqs_password)? strlen(naqs_password) : 0;

264 265 266 267 268 269 270 271
    for(i=0; i < 100; i++) {
	crc32buf[i] = 0;
    }

    for(i=0; i<12; i++) {
	connectRequest.username[i] = 0;
    }
    if(naqs_username_length != 0) {
272
	strncpy(connectRequest.username, naqs_username, NMXP_MAX_SIZE_USERNAME);
273
    }
274 275 276 277

    connectRequest.version = protocol_version;
    connectRequest.connection_time = connection_time;

278 279 280
    if(!nmxp_data_bigendianhost()) {
	nmxp_data_swap_4b ((int32_t *)&connection_time);
    }
281

282
    if(naqs_username_length == 0  &&  naqs_password_length == 0 ) {
283 284
	/* sprintf(crc32buf, "%d%d", protocol_version, connection_time); */
	/* TODO */
285
    } else if(naqs_username_length != 0  &&  naqs_password_length != 0 ) {
286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304
	/* sprintf(crc32buf, "%s%d%d%s", naqs_username, protocol_version,
		connection_time, naqs_password); */

	memcpy(pcrc32buf, naqs_username, naqs_username_length);
	crc32buf_length += naqs_username_length;
	pcrc32buf = crc32buf + crc32buf_length;

	memcpy(pcrc32buf, &(protocol_version), sizeof(protocol_version));
	crc32buf_length += sizeof(protocol_version);
	pcrc32buf = crc32buf + crc32buf_length;

	memcpy(pcrc32buf, &(connection_time), sizeof(connection_time));
	crc32buf_length += sizeof(connection_time);
	pcrc32buf = crc32buf + crc32buf_length;

	memcpy(pcrc32buf, naqs_password, naqs_password_length);
	crc32buf_length += naqs_password_length;
	pcrc32buf = crc32buf + crc32buf_length;

305
    } else if(naqs_username_length != 0 ) {
306 307
	/* sprintf(crc32buf, "%s%d%d", naqs_username, protocol_version, connection_time); */
	/* TODO */
308
    } else if(naqs_password_length != 0 ) {
309 310
	/* sprintf(crc32buf, "%d%d%s", protocol_version, connection_time, naqs_password); */
	/* TODO */
311
    }
312 313 314
    connectRequest.version = htonl(connectRequest.version);
    connectRequest.connection_time = htonl(connectRequest.connection_time);
    connectRequest.crc32 = htonl(crc32(0L, crc32buf, crc32buf_length));
315 316 317 318

    ret = nmxp_sendMessage(isock, NMXP_MSG_CONNECTREQUEST, &connectRequest, sizeof(NMXP_CONNECT_REQUEST));

    if(ret == NMXP_SOCKET_OK) {
319 320
	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CRC, "Send a ConnectRequest crc32buf length %d, crc32 = %d\n",
		crc32buf_length, connectRequest.crc32);
321 322 323
	for(i=0; i < crc32buf_length; i++) {
	    nmxp_log(NMXP_LOG_NORM_NO, NMXP_LOG_D_CRC, "%d ", crc32buf[i]);
	}
Matteo Quintiliani's avatar
Matteo Quintiliani committed
324
	pp = (char *) &connectRequest.crc32;
325 326 327 328
	for(i=0; i < sizeof(connectRequest.crc32); i++) {
	    nmxp_log(NMXP_LOG_NORM_NO, NMXP_LOG_D_CRC, "%d ", pp[i]);
	}
	nmxp_log(NMXP_LOG_NORM_NO, NMXP_LOG_D_CRC, "\n");
329
    } else {
330
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CRC, "Send a ConnectRequest.\n");
331 332 333 334 335 336
    }

    return ret;
}


337
int nmxp_readConnectionTime(int isock, int32_t *connection_time) {
338
    int ret;
339 340
    int recv_errno;
    ret = nmxp_recv_ctrl(isock, connection_time, sizeof(int32_t), 0, &recv_errno);
341
    *connection_time = ntohl(*connection_time);
342
    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "Read connection time from socket %d.\n", *connection_time);
343
    if(ret != NMXP_SOCKET_OK) {
344
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Read connection time from socket.\n");
345 346 347 348 349
    }
    return ret;
}


350
int nmxp_waitReady(int isock) {
351 352
    int times = 0;
    int rc = NMXP_SOCKET_OK;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
353 354 355
    int32_t signature;
    int32_t type = 0;
    int32_t length;
356
    int recv_errno;
357 358

    while(rc == NMXP_SOCKET_OK  &&  type != NMXP_MSG_READY) {
359
	rc = nmxp_recv_ctrl(isock, &signature, sizeof(signature), 0, &recv_errno);
360 361 362
	if(rc != NMXP_SOCKET_OK) return rc;
	signature = ntohl(signature);
	if(signature == 0) {
363
	    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "signature is equal to zero. receive again.\n");
364
	    rc = nmxp_recv_ctrl(isock, &signature, sizeof(signature), 0, &recv_errno);
365 366 367
	    signature = ntohl(signature);
	}
	if(signature != NMX_SIGNATURE) {
368
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "signature is not valid. signature = %d\n", signature);
369 370 371 372
	    if(signature == 200) {
		    int32_t err_length;
		    int32_t err_reason;
		    char err_buff[200];
373
		    rc = nmxp_recv_ctrl(isock, &err_length, sizeof(err_length), 0, &recv_errno);
374
		    err_length = ntohl(err_length);
375
		    rc = nmxp_recv_ctrl(isock, &err_reason, sizeof(err_reason), 0, &recv_errno);
376 377
		    err_reason = ntohl(err_reason);
		    if(err_length > 4) {
378
			    rc = nmxp_recv_ctrl(isock, err_buff, err_length-4, 0, &recv_errno);
379 380
			    err_buff[err_length] = 0;
		    }
381
		    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "TerminateMessage from Server: %s (%d).\n",
382
			    NMXP_LOG_STR(err_buff), err_reason);
383
	    }
384 385 386
	    return NMXP_SOCKET_ERROR;
	}

387
	rc = nmxp_recv_ctrl(isock, &type, sizeof(type), 0, &recv_errno);
388 389 390
	if(rc != NMXP_SOCKET_OK) return rc;
	type = ntohl(type);
	if(type != NMXP_MSG_READY) {
391
	    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "type is not READY. type = %d\n", type);
392
	    rc = nmxp_recv_ctrl(isock, &length, sizeof(length), 0, &recv_errno);
393 394 395
	    if(rc != NMXP_SOCKET_OK) return rc;
	    length = ntohl(length);
	    if(length > 0) {
396 397
		if(type == NMXP_MSG_TERMINATESUBSCRIPTION) {
		    char *str_msg = NULL;
398
		    char *buf_app = (char *) NMXP_MEM_MALLOC(sizeof(char) * length);
399 400 401 402 403 404 405 406 407 408
		    int32_t reason;
		    memcpy(&reason, buf_app, sizeof(reason));
		    reason = ntohl(reason);
		    str_msg = buf_app + sizeof(reason);
		    rc = nmxp_recv_ctrl(isock, buf_app, length, 0, &recv_errno);
		    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "%d %s shutdown: %s\n",
			    reason,
			    (reason == 0)? "Normal" : (reason == 1)? "Error" : (reason == 2)? "Timeout" : "Unknown",
			    str_msg);
		    if(buf_app) {
409
			NMXP_MEM_FREE(buf_app);
410
			buf_app = NULL;
411
		    }
412 413 414
		    /* Close the socket*/
		    nmxp_closeSocket(isock);
		    exit(-1);
415
		} else if(length == 4) {
416
		    int32_t app;
417
		    rc = nmxp_recv_ctrl(isock, &app, length, 0, &recv_errno);
418 419
		    if(rc != NMXP_SOCKET_OK) return rc;
		    app = ntohl(app);
420
		    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "value = %d\n", app);
421
		} else {
422
		    char *buf_app = (char *) NMXP_MEM_MALLOC(sizeof(char) * length);
423
		    rc = nmxp_recv_ctrl(isock, buf_app, length, 0, &recv_errno);
424
		    if(buf_app) {
425
			NMXP_MEM_FREE(buf_app);
426
			buf_app = NULL;
427 428 429 430
		    }
		}
	    }
	} else {
431
	    rc = nmxp_recv_ctrl(isock, &length, sizeof(length), 0, &recv_errno);
432 433 434
	    if(rc != NMXP_SOCKET_OK) return rc;
	    length = ntohl(length);
	    if(length != 0) {
435
		nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "length is not equal to zero. length = %d\n", length);
436 437 438 439 440 441
		return NMXP_SOCKET_ERROR;
	    }
	}

	times++;
	if(times > 10) {
442
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "waiting_ready_message. times > 10\n");
443 444 445 446 447 448 449 450 451
	    rc = NMXP_SOCKET_ERROR;
	}

    }

    return rc;
}


452
int nmxp_sendDataRequest(int isock, int32_t key, int32_t start_time, int32_t end_time) {
453 454 455 456 457 458 459 460 461 462
    int ret;
    NMXP_DATA_REQUEST dataRequest;

    dataRequest.chan_key = htonl(key);
    dataRequest.start_time = htonl(start_time);
    dataRequest.end_time = htonl(end_time);

    ret = nmxp_sendMessage(isock, NMXP_MSG_DATAREQUEST, &dataRequest, sizeof(dataRequest));

    if(ret != NMXP_SOCKET_OK) {
463
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Send a Request message\n");
464 465 466 467 468 469
    }

    return ret;
}


470
/* Connect to a server and return the channels of the requested data type.
 *
 * Steps: open a socket, send a Connect, receive the ChannelList, keep the
 * channels selected by nmxp_chan_getType() sorted by name, send a
 * TerminateSubscription and close the socket. The complete list received
 * from the server is released before returning.
 *
 * Returns the list produced by nmxp_chan_getType(), or NULL when any step
 * fails or no channel list is received.
 * NOTE(review): the caller presumably has to release the returned list
 * with NMXP_MEM_FREE - confirm against nmxp_chan_getType().
 */
NMXP_CHAN_LIST *nmxp_getAvailableChannelList(char * hostname, int portnum, NMXP_DATATYPE datatype, int (*func_cond)(void)) {
    int naqssock;
    NMXP_CHAN_LIST *channelList = NULL, *channelList_subset = NULL;

    /* 1. Open a socket*/
    naqssock = nmxp_openSocket(hostname, portnum, func_cond);

    if(naqssock != NMXP_SOCKET_ERROR) {

	/* 2. Send a Connect*/
	if(nmxp_sendConnect(naqssock) == NMXP_SOCKET_OK) {

	    /* 3. Receive ChannelList*/
	    if(nmxp_receiveChannelList(naqssock, &channelList) == NMXP_SOCKET_OK) {

		/* The receive can succeed without delivering a list (message
		 * of another type): do not work on a NULL list. */
		if(channelList != NULL) {
		    channelList_subset = nmxp_chan_getType(channelList, datatype);
		}

		if(channelList_subset != NULL) {
		    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CHANNEL, "%d / %d are DataType channel.\n", channelList_subset->number, channelList->number);

		    nmxp_chan_sortByName(channelList_subset);
		} else {
		    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CHANNEL, "nmxp_getAvailableChannelList() No channel list available.\n");
		}

		/* 4. Send a Request Pending (optional)*/

		/* 5. Send AddChannels*/

		/* 6. Repeat until finished: receive and handle packets*/

		/* 7. Send Terminate Subscription*/
		nmxp_sendTerminateSubscription(naqssock, NMXP_SHUTDOWN_NORMAL, "Good Bye!");

	    } else {
		nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CHANNEL, "Error on receiveChannelList()\n");
	    }
	} else {
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CHANNEL, "Error on sendConnect()\n");
	}

	/* 8. Close the socket*/
	nmxp_closeSocket(naqssock);
    }

    if(channelList) {
	NMXP_MEM_FREE(channelList);
	channelList = NULL;
    }

    return channelList_subset;
}

528

529
/* Build the list of channels of a given data type with their metadata.
 *
 * Steps: open a socket, read the connection time, send a ConnectRequest and
 * wait for Ready; then
 *  - request the channel list and add every channel of type 'datatype' to
 *    the result;
 *  - request the precis list and set start/end time of the known channels;
 *  - if flag_request_channelinfo is set, request the channel info of every
 *    channel whose getChannelNumberFromKey() is 0 and set its network.
 * Finally a TerminateSubscription is sent and the socket is closed.
 *
 * pchannelList: output; receives the last channel list received from the
 *               server (possibly NULL). It was obtained from
 *               NMXP_MEM_MALLOC and belongs to the caller. When
 *               pchannelList is NULL the list is released here.
 *
 * Returns the list built with nmxp_meta_chan_add(), or NULL when one of the
 * connection steps fails (the socket is closed in that case too).
 */
NMXP_META_CHAN_LIST *nmxp_getMetaChannelList(char * hostname, int portnum, NMXP_DATATYPE datatype, int flag_request_channelinfo, char *datas_username, char *datas_password, NMXP_CHAN_LIST **pchannelList, int (*func_cond)(void)) {
    int naqssock;
    NMXP_CHAN_PRECISLIST *precisChannelList = NULL;
    NMXP_CHAN_LIST *channelList = NULL;
    NMXP_META_CHAN_LIST *chan_list = NULL;
    NMXP_META_CHAN_LIST *iter = NULL;
    int i = 0;
    int32_t connection_time;
    int ret_sock;
    int recv_errno;
    int32_t header_size;
    int32_t max_items;

    /* Initialized because they are logged even when the receive fails */
    NMXP_MSG_SERVER type = 0;
    char buffer[NMXP_MAX_LENGTH_DATA_BUFFER];
    int32_t length = 0;
    NMXP_PRECISLISTREQUEST precisListRequestBody;
    NMXP_CHANNELINFOREQUEST channelInfoRequestBody;
    NMXP_CHANNELINFORESPONSE *channelInfo = NULL;

    char str_start[NMXP_DATA_MAX_SIZE_DATE], str_end[NMXP_DATA_MAX_SIZE_DATE];
    str_start[0] = 0;
    str_end[0] = 0;

    /* DAP Step 1: Open a socket */
    if( (naqssock = nmxp_openSocket(hostname, portnum, func_cond)) == NMXP_SOCKET_ERROR) {
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error opening socket!\n");
	return NULL;
    }

    /* DAP Step 2: Read connection time */
    if(nmxp_readConnectionTime(naqssock, &connection_time) != NMXP_SOCKET_OK) {
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error reading connection time from server!\n");
	nmxp_closeSocket(naqssock);
	return NULL;
    }

    /* DAP Step 3: Send a ConnectRequest */
    if(nmxp_sendConnectRequest(naqssock, datas_username, datas_password, connection_time) != NMXP_SOCKET_OK) {
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error sending connect request!\n");
	nmxp_closeSocket(naqssock);
	return NULL;
    }

    /* DAP Step 4: Wait for a Ready message */
    if(nmxp_waitReady(naqssock) != NMXP_SOCKET_OK) {
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error waiting Ready message!\n");
	nmxp_closeSocket(naqssock);
	return NULL;
    }

    /* DAP Step 5: Send Data Request */
    nmxp_sendHeader(naqssock, NMXP_MSG_CHANNELLISTREQUEST, 0);

    /* DAP Step 6: Receive Data until receiving a Ready message */
    ret_sock = nmxp_receiveMessage(naqssock, &type, buffer, &length, 0, &recv_errno, NMXP_MAX_LENGTH_DATA_BUFFER);
    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "ret_sock = %d, type = %d, length = %d\n", ret_sock, type, length);

    while(ret_sock == NMXP_SOCKET_OK   &&    type != NMXP_MSG_READY) {
	if(channelList) {
	    NMXP_MEM_FREE(channelList);
	    channelList = NULL;
	}
	if(length < (int32_t) sizeof(channelList->number)  ||  length > NMXP_MAX_LENGTH_DATA_BUFFER) {
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "nmxp_getMetaChannelList() Invalid channelList length %d.\n", length);
	} else {
	    channelList = (NMXP_CHAN_LIST *) NMXP_MEM_MALLOC(length);
	    if(channelList) {
		memmove(channelList, buffer, length);

		channelList->number = ntohl(channelList->number);

		/* Only 'length' bytes were received and allocated: the
		 * counter must not address entries beyond them. */
		header_size = (int32_t) ((char *) channelList->channel - (char *) channelList);
		max_items = (length > header_size)? (int32_t) ((size_t) (length - header_size) / sizeof(channelList->channel[0])) : 0;

		if(channelList->number < 0  ||  channelList->number > max_items) {
		    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "nmxp_getMetaChannelList() Invalid number of channels %d.\n", channelList->number);
		    NMXP_MEM_FREE(channelList);
		    channelList = NULL;
		} else {
		    for(i = 0; i < channelList->number; i++) {
			channelList->channel[i].key = ntohl(channelList->channel[i].key);
			if(getDataTypeFromKey(channelList->channel[i].key) == datatype) {
			    nmxp_meta_chan_add(&chan_list, channelList->channel[i].key, channelList->channel[i].name, 0, 0, NULL, NMXP_META_SORT_NAME);
			}
		    }
		}
	    } else {
		nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "nmxp_getMetaChannelList() Error allocating channelList.\n");
	    }
	}

	/* Receive Message */
	ret_sock = nmxp_receiveMessage(naqssock, &type, buffer, &length, 0, &recv_errno, NMXP_MAX_LENGTH_DATA_BUFFER);
	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "ret_sock = %d, type = %d, length = %d\n", ret_sock, type, length);
    }

    /* Hand the channel list over to the caller, or release it */
    if(pchannelList) {
	*pchannelList = channelList;
    } else if(channelList) {
	NMXP_MEM_FREE(channelList);
    }
    channelList = NULL;

    /* DAP Step 5: Send Data Request */
    precisListRequestBody.instr_id = htonl(-1);
    precisListRequestBody.datatype = htonl(NMXP_DATA_TIMESERIES);
    precisListRequestBody.type_of_channel = htonl(-1);
    nmxp_sendMessage(naqssock, NMXP_MSG_PRECISLISTREQUEST, &precisListRequestBody, sizeof(NMXP_PRECISLISTREQUEST));


    /* DAP Step 6: Receive Data until receiving a Ready message */
    ret_sock = nmxp_receiveMessage(naqssock, &type, buffer, &length, 0, &recv_errno, NMXP_MAX_LENGTH_DATA_BUFFER);
    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "ret_sock = %d, type = %d, length = %d\n", ret_sock, type, length);

    while(ret_sock == NMXP_SOCKET_OK   &&    type != NMXP_MSG_READY) {
	if(precisChannelList) {
	    NMXP_MEM_FREE(precisChannelList);
	    precisChannelList = NULL;
	}
	if(length < (int32_t) sizeof(precisChannelList->number)  ||  length > NMXP_MAX_LENGTH_DATA_BUFFER) {
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "nmxp_getMetaChannelList() Invalid precisChannelList length %d.\n", length);
	} else {
	    precisChannelList = (NMXP_CHAN_PRECISLIST *) NMXP_MEM_MALLOC(length);
	    if(precisChannelList) {
		memmove(precisChannelList, buffer, length);

		precisChannelList->number = ntohl(precisChannelList->number);

		header_size = (int32_t) ((char *) precisChannelList->channel - (char *) precisChannelList);
		max_items = (length > header_size)? (int32_t) ((size_t) (length - header_size) / sizeof(precisChannelList->channel[0])) : 0;

		if(precisChannelList->number < 0  ||  precisChannelList->number > max_items) {
		    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "nmxp_getMetaChannelList() Invalid number of precis channels %d.\n", precisChannelList->number);
		} else {
		    for(i = 0; i < precisChannelList->number; i++) {
			precisChannelList->channel[i].key = ntohl(precisChannelList->channel[i].key);
			precisChannelList->channel[i].start_time = ntohl(precisChannelList->channel[i].start_time);
			precisChannelList->channel[i].end_time = ntohl(precisChannelList->channel[i].end_time);

			/* The strings are only used by the disabled log below */
			nmxp_data_to_str(str_start, precisChannelList->channel[i].start_time);
			nmxp_data_to_str(str_end, precisChannelList->channel[i].end_time);

			if(!nmxp_meta_chan_set_times(chan_list, precisChannelList->channel[i].key, precisChannelList->channel[i].start_time, precisChannelList->channel[i].end_time)) {
			    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CHANNEL, "Key %d not found for %s!\n",
				    precisChannelList->channel[i].key,
				    NMXP_LOG_STR(precisChannelList->channel[i].name));
			}

			/*
			   nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_ANY, "%12d %12s %10d %10d %20s %20s\n",
			   precisChannelList->channel[i].key, NMXP_LOG_STR(precisChannelList->channel[i].name),
			   precisChannelList->channel[i].start_time, precisChannelList->channel[i].end_time,
			   NMXP_LOG_STR(str_start), NMXP_LOG_STR(str_end));
			   */
		    }
		}
	    } else {
		nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "nmxp_getMetaChannelList() Error allocating precisChannelList.\n");
	    }
	}

	/* Receive Message */
	ret_sock = nmxp_receiveMessage(naqssock, &type, buffer, &length, 0, &recv_errno, NMXP_MAX_LENGTH_DATA_BUFFER);
	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "ret_sock = %d, type = %d, length = %d\n", ret_sock, type, length);
    }


    if(flag_request_channelinfo) {
	for(iter = chan_list; iter != NULL; iter = iter->next) {

	    if(getChannelNumberFromKey(iter->key) == 0) {
		/* DAP Step 5: Send Data Request */
		channelInfoRequestBody.key = htonl(iter->key);
		channelInfoRequestBody.ignored = htonl(0);
		nmxp_sendMessage(naqssock, NMXP_MSG_CHANNELINFOREQUEST, &channelInfoRequestBody, sizeof(NMXP_CHANNELINFOREQUEST));

		/* DAP Step 6: Receive Data until receiving a Ready message */
		ret_sock = nmxp_receiveMessage(naqssock, &type, buffer, &length, 0, &recv_errno, NMXP_MAX_LENGTH_DATA_BUFFER);
		nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "ret_sock = %d, type = %d, length = %d\n", ret_sock, type, length);

		while(ret_sock == NMXP_SOCKET_OK   &&    type != NMXP_MSG_READY) {
		    if(channelInfo) {
			NMXP_MEM_FREE(channelInfo);
			channelInfo = NULL;
		    }
		    /* NOTE(review): only the key is guaranteed to be inside
		     * the received bytes; the expected length of the network
		     * field should be checked against the protocol. */
		    if(length < (int32_t) sizeof(channelInfo->key)  ||  length > NMXP_MAX_LENGTH_DATA_BUFFER) {
			nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "nmxp_getMetaChannelList() Invalid channelInfo length %d.\n", length);
		    } else {
			channelInfo = (NMXP_CHANNELINFORESPONSE *) NMXP_MEM_MALLOC(length);
			if(channelInfo) {
			    memmove(channelInfo, buffer, length);

			    channelInfo->key = ntohl(channelInfo->key);

			    if(!nmxp_meta_chan_set_network(chan_list, channelInfo->key, channelInfo->network)) {
				nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CHANNEL, "Key %d (%d) not found for %s!\n",
					iter->key, channelInfo->key, NMXP_LOG_STR(iter->name));
			    }
			} else {
			    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "nmxp_getMetaChannelList() Error allocating channelInfo.\n");
			}
		    }

		    /* Receive Message */
		    ret_sock = nmxp_receiveMessage(naqssock, &type, buffer, &length, 0, &recv_errno, NMXP_MAX_LENGTH_DATA_BUFFER);
		    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "ret_sock = %d, type = %d, length = %d\n", ret_sock, type, length);
		}
	    }
	}
    }

    /* Both buffers are already released and reallocated at every received
     * message, so nothing keeps a reference to them: release the last ones. */
    if(precisChannelList) {
	NMXP_MEM_FREE(precisChannelList);
	precisChannelList = NULL;
    }
    if(channelInfo) {
	NMXP_MEM_FREE(channelInfo);
	channelInfo = NULL;
    }

    /* DAP Step 7: Repeat steps 5 and 6 for each data request */

    /* DAP Step 8: Send a Terminate message (optional) */
    nmxp_sendTerminateSubscription(naqssock, NMXP_SHUTDOWN_NORMAL, "Bye!");

    /* DAP Step 9: Close the socket */
    nmxp_closeSocket(naqssock);

    return chan_list;
}

715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730

int nmxp_raw_stream_seq_no_compare(const void *a, const void *b)
{       
    int ret = 0;
    NMXP_DATA_PROCESS **ppa = (NMXP_DATA_PROCESS **) a;
    NMXP_DATA_PROCESS **ppb = (NMXP_DATA_PROCESS **) b;
    NMXP_DATA_PROCESS *pa = *ppa;
    NMXP_DATA_PROCESS *pb = *ppb;

    if(pa && pb) {
	if(pa->seq_no > pb->seq_no) {
	    ret = 1;
	} else if (pa->seq_no < pb->seq_no) {
	    ret = -1;
	}
    } else {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
731 732
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY,
		"nmxp_raw_stream_seq_no_compare() pa %s NULL and pb %s NULL\n", (pa)? "!=" : "=", (pb)? "!=" : "=");
733 734 735 736 737
    }

    return ret;
}

Matteo Quintiliani's avatar
Matteo Quintiliani committed
738
/* Initialize a raw stream buffer.
 *
 * raw_stream_buffer:     structure to initialize.
 * max_tolerable_latency: stored as is; the list of packets gets room for
 *                        max_tolerable_latency * 4 pointers (it is supposed
 *                        that a packet holds at least 0.25 sec of data).
 * timeoutrecv:           stored as is.
 *
 * All the entries of the list are set to NULL. If the list cannot be
 * allocated an error is logged and the process is terminated with exit(-1),
 * as nmxp_raw_stream_manage() does for its own allocations.
 */
void nmxp_raw_stream_init(NMXP_RAW_STREAM_DATA *raw_stream_buffer, int32_t max_tolerable_latency, int timeoutrecv) {
    int j;

    raw_stream_buffer->last_seq_no_sent = -1;
    raw_stream_buffer->last_sample_time = -1.0;
    raw_stream_buffer->last_latency = 0.0;
    /* TODO
     * Suppose a packet can contain 1/4 secs of data (that is, minimum packet length is 0.25 secs of data) */
    raw_stream_buffer->max_tolerable_latency = max_tolerable_latency;
    raw_stream_buffer->max_pdlist_items = max_tolerable_latency * 4;
    raw_stream_buffer->timeoutrecv = timeoutrecv;
    raw_stream_buffer->n_pdlist = 0;

    raw_stream_buffer->pdlist=NULL;
    raw_stream_buffer->pdlist = (NMXP_DATA_PROCESS **) NMXP_MEM_MALLOC(raw_stream_buffer->max_pdlist_items * sizeof(NMXP_DATA_PROCESS *));

    /* The loop below would write through a NULL pointer */
    if(raw_stream_buffer->pdlist == NULL  &&  raw_stream_buffer->max_pdlist_items > 0) {
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_RAWSTREAM, "nmxp_raw_stream_init(): Error allocating memory\n");
	exit(-1);
    }

    for(j=0; j<raw_stream_buffer->max_pdlist_items; j++) {
	raw_stream_buffer->pdlist[j] = NULL;
    }

}


/* Release the packets stored in a raw stream buffer and the list itself.
 *
 * For each of the first n_pdlist entries that is not NULL, the sample
 * buffer (pDataPtr) and then the entry are released with NMXP_MEM_FREE and
 * set to NULL. Finally the list is released and set to NULL.
 * Nothing is done when raw_stream_buffer or its list is NULL.
 */
void nmxp_raw_stream_free(NMXP_RAW_STREAM_DATA *raw_stream_buffer) {
    int idx;

    if(raw_stream_buffer == NULL  ||  raw_stream_buffer->pdlist == NULL) {
	return;
    }

    for(idx = 0; idx < raw_stream_buffer->n_pdlist; idx++) {
	if(raw_stream_buffer->pdlist[idx] == NULL) {
	    continue;
	}
	if(raw_stream_buffer->pdlist[idx]->pDataPtr) {
	    NMXP_MEM_FREE(raw_stream_buffer->pdlist[idx]->pDataPtr);
	    raw_stream_buffer->pdlist[idx]->pDataPtr = NULL;
	}
	NMXP_MEM_FREE(raw_stream_buffer->pdlist[idx]);
	raw_stream_buffer->pdlist[idx] = NULL;
    }

    NMXP_MEM_FREE(raw_stream_buffer->pdlist);
    raw_stream_buffer->pdlist = NULL;
}


Matteo Quintiliani's avatar
Matteo Quintiliani committed
781
int nmxp_raw_stream_manage(NMXP_RAW_STREAM_DATA *p, NMXP_DATA_PROCESS *a_pd, int (*p_func_pd[NMXP_MAX_FUNC_PD]) (NMXP_DATA_PROCESS *), int n_func_pd) {
782 783 784 785 786 787 788
    int ret = 0;
    int send_again = 1;
    int seq_no_diff;
    double time_diff;
    double latency = 0.0;
    int j=0, k=0;
    int i_func_pd;
789
    char str_time[NMXP_DATA_MAX_SIZE_DATE];
790
    NMXP_DATA_PROCESS *pd = NULL;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
791
    int y, w;
792 793
    int count_null_element = 0;
    char netstachan[100];
794

795
    /* Allocate pd copy value from a_pd */
796
    if(a_pd) {
797 798 799 800 801 802 803
	/*
	if(a_pd->packet_type == 33 || a_pd->packet_type == 97) {
	    nmxp_data_log(a_pd);
	}
	*/

	/* Allocate memory for pd and copy a_pd */
804
	pd = (NMXP_DATA_PROCESS *) NMXP_MEM_MALLOC(sizeof(NMXP_DATA_PROCESS));
805 806 807 808
	if (pd == NULL) {
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_RAWSTREAM,"nmxp_raw_stream_manage(): Error allocating memory\n");
	    exit(-1);
	}
809 810
	memcpy(pd, a_pd, sizeof(NMXP_DATA_PROCESS));
	if(a_pd->nSamp *  sizeof(int) > 0) {
811
	    pd->pDataPtr = (int *) NMXP_MEM_MALLOC(a_pd->nSamp * sizeof(int));
812 813 814 815
	    memcpy(pd->pDataPtr, a_pd->pDataPtr, a_pd->nSamp * sizeof(int));
	} else {
	    pd->pDataPtr = NULL;
	}
816 817
    } else {
	nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_RAWSTREAM,
818
		"nmxp_raw_stream_manage() passing NMXP_DATA_PROCESS pointer equal to NULL\n");
819
    }
820
    /* From here, use only pd */
821 822

    /* First time */
823
    if(p->last_seq_no_sent == -1  &&  pd) {
824 825 826
	if(p->timeoutrecv == 0) {
	    p->last_seq_no_sent = pd->seq_no - 1;
	    p->last_sample_time = pd->time;
827
	    p->last_latency = nmxp_data_latency(pd);;
828 829
	} else {
	    p->last_seq_no_sent = 0;
830 831
	    p->last_sample_time = 0.0;
	    p->last_latency = 0.0;
832
	}
833
	nmxp_data_to_str(str_time, pd->time);
834
	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_RAWSTREAM,
835 836 837 838 839
		"%s.%s.%s [%d, %d] (%s + %.2f sec.) * First time nmxp_raw_stream_manage() * last_seq_no_sent=%d  last_sample_time=%.2f\n",
		NMXP_LOG_STR(pd->network), NMXP_LOG_STR(pd->station), NMXP_LOG_STR(pd->channel),
		pd->packet_type, pd->seq_no,
		NMXP_LOG_STR(str_time), (double) pd->nSamp / (double) pd->sampRate,
		 p->last_seq_no_sent, p->last_sample_time);
840
    }
841

842 843 844
    if(p->n_pdlist > 0) {
	latency = nmxp_data_latency(p->pdlist[0]);
    }
845

846 847 848
    /* Add pd and sort array, in case handle the first item */
    if( (p->n_pdlist >= p->max_pdlist_items || latency >= p->max_tolerable_latency)
	    && p->timeoutrecv <= 0 ) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
849

850
	/* Supposing p->pdlist is ordered, handle the first item and over write it.  */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
851 852 853 854 855 856
	if(p->n_pdlist > 0) {
	    seq_no_diff = p->pdlist[0]->seq_no - p->last_seq_no_sent;
	    time_diff = p->pdlist[0]->time - p->last_sample_time;
	    latency = nmxp_data_latency(p->pdlist[0]);
	    nmxp_data_to_str(str_time, p->pdlist[0]->time);
	    if( seq_no_diff > 0) {
857
		nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_RAWSTREAM,
858 859 860 861 862 863
			"%s.%s.%s [%d, %d] (%s + %.2f sec.) * Force handling packet * n_pdlist=%d  seq_no_diff=%d  time_diff=%.2fs  lat. %.1fs!\n",
			NMXP_LOG_STR(p->pdlist[0]->network), NMXP_LOG_STR(p->pdlist[0]->station), NMXP_LOG_STR(p->pdlist[0]->channel),
			p->pdlist[0]->packet_type, p->pdlist[0]->seq_no,
			NMXP_LOG_STR(str_time), (double) p->pdlist[0]->nSamp / (double) p->pdlist[0]->sampRate,
			p->n_pdlist,
			seq_no_diff, time_diff, latency);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
864 865 866 867 868
		for(i_func_pd=0; i_func_pd<n_func_pd; i_func_pd++) {
		    (*p_func_pd[i_func_pd])(p->pdlist[0]);
		}
		p->last_seq_no_sent = (p->pdlist[0]->seq_no);
		p->last_sample_time = (p->pdlist[0]->time + ((double) p->pdlist[0]->nSamp / (double) p->pdlist[0]->sampRate ));
869
		p->last_latency = nmxp_data_latency(p->pdlist[0]);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
870 871
	    } else {
		/* It should not occur */
872 873 874 875 876 877 878
		nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_RAWSTREAM,
			"%s.%s.%s [%d, %d] (%s + %.2f sec.) * SHOULD NOT OCCUR packet discarded * n_pdlist=%d  seq_no_diff=%d  time_diff=%.2fs  lat. %.1fs!\n",
			NMXP_LOG_STR(p->pdlist[0]->network), NMXP_LOG_STR(p->pdlist[0]->station), NMXP_LOG_STR(p->pdlist[0]->channel),
			p->pdlist[0]->packet_type, p->pdlist[0]->seq_no,
			NMXP_LOG_STR(str_time), (double) p->pdlist[0]->nSamp / (double) p->pdlist[0]->sampRate,
			p->n_pdlist,
			seq_no_diff, time_diff, latency);
879 880
	    }

Matteo Quintiliani's avatar
Matteo Quintiliani committed
881 882
	    /* Free handled packet */
	    if(p->pdlist[0]->pDataPtr) {
883
		NMXP_MEM_FREE(p->pdlist[0]->pDataPtr);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
884 885 886
		p->pdlist[0]->pDataPtr = NULL;
	    }
	    if(p->pdlist[0]) {
887
		NMXP_MEM_FREE(p->pdlist[0]);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
888 889 890 891 892 893 894 895 896 897
		p->pdlist[0] = NULL;
	    }
	    if(pd) {
		p->pdlist[0] = pd;
	    }
	} else {
	    if(pd) {
		p->n_pdlist = 1;
		p->pdlist[0] = pd;
	    }
898 899
	}
    } else {
900
	if(pd) {
901 902
	    p->pdlist[p->n_pdlist] = pd;
            p->n_pdlist++;
903
	}
904
    }
905 906 907 908 909

    /* Check if some element in pdlist is NULL and remove it */
    y=0;
    while(y < p->n_pdlist) {
	if(p->pdlist[y] == NULL) {
910
	    count_null_element++;
911 912 913 914 915 916 917 918 919 920
	    /* Shift array */
	    for(w=y+1; w < p->n_pdlist; w++) {
		p->pdlist[w-1] = p->pdlist[w];
	     }
	    p->n_pdlist--;
	} else {
	    y++;
	}
    }

921 922 923 924 925 926 927 928 929 930 931 932 933 934
    if(count_null_element > 0) {
	if(p->n_pdlist > 0) {
	    snprintf(netstachan, 100, "%s.%s.%s",
		    NMXP_LOG_STR(p->pdlist[0]->network),
		    NMXP_LOG_STR(p->pdlist[0]->station),
		    NMXP_LOG_STR(p->pdlist[0]->channel));
	} else {
	    strncpy(netstachan, "Unknown", 100);
	}
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY,
		"nmxp_raw_stream_manage() %d NULL elements in pdlist for %s.\n",
		count_null_element, netstachan);
    }

935
    /* Sort array */
936 937
    qsort(p->pdlist, p->n_pdlist, sizeof(NMXP_DATA_PROCESS *), nmxp_raw_stream_seq_no_compare);

938
    /* TODO Check for packet duplication in pd->pdlist*/
939 940

    /* Print array, only for debugging */
941
    /*
942 943 944
    if(p->n_pdlist > 1) {
	int y = 0;
	for(y=0; y < p->n_pdlist; y++) {
945 946 947 948 949 950 951
	    nmxp_data_to_str(str_time, p->pdlist[y]->time);
	    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_RAWSTREAM,
		    "%s.%s.%s [%d, %d] (%s + %.2f sec.) * %02d n_pdlist=%d\n",
		    NMXP_LOG_STR(p->pdlist[y]->network), NMXP_LOG_STR(p->pdlist[y]->station), NMXP_LOG_STR(p->pdlist[y]->channel),
		    p->pdlist[y]->packet_type, p->pdlist[y]->seq_no,
		    NMXP_LOG_STR(str_time), (double) p->pdlist[y]->nSamp / (double) p->pdlist[y]->sampRate,
		    y, p->n_pdlist);
952 953
	}
    }
954
    */
955

956
    /* Condition for time-out (pd is NULL) */
957
    if(!pd && p->n_pdlist > 0) {
958 959 960 961 962 963 964 965 966 967 968
	/* Log before changing values */
	nmxp_data_to_str(str_time, p->pdlist[0]->time);
	nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_RAWSTREAM,
		"%s.%s.%s [%d, %d] (%s + %.2f sec.) * pd is NULL and n_pdlist = %d > 0 *  last_seq_no_sent=%d, last_sample_time=%.2f\n",
		NMXP_LOG_STR(p->pdlist[0]->network), NMXP_LOG_STR(p->pdlist[0]->station), NMXP_LOG_STR(p->pdlist[0]->channel),
		p->pdlist[0]->packet_type, p->pdlist[0]->seq_no,
		NMXP_LOG_STR(str_time), (double) p->pdlist[0]->nSamp / (double) p->pdlist[0]->sampRate,
		p->n_pdlist,
		p->last_seq_no_sent, p->last_sample_time);

	/* Changing values */
969 970
	p->last_seq_no_sent = p->pdlist[0]->seq_no - 1;
	p->last_sample_time = p->pdlist[0]->time;
971
	p->last_latency = nmxp_data_latency(p->pdlist[0]);
972
    }
973 974 975 976 977 978 979 980 981 982 983

    /* Manage array and execute func_pd() */
    j=0;
    send_again = 1;
    while(send_again  &&  j < p->n_pdlist) {
	send_again = 0;
	seq_no_diff = p->pdlist[j]->seq_no - p->last_seq_no_sent;
	time_diff = p->pdlist[j]->time - p->last_sample_time;
	latency = nmxp_data_latency(p->pdlist[j]);
	nmxp_data_to_str(str_time, p->pdlist[j]->time);
	if(seq_no_diff <= 0) {
984
	    /* Duplicated packets: Discarded */
985
	    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_RAWSTREAM,
986 987 988 989 990
		    "%s.%s.%s [%d, %d] (%s + %.2f sec.) * Packet discarded * seq_no_diff=%d  time_diff=%.2fs  lat %.1fs\n",
		    NMXP_LOG_STR(p->pdlist[j]->network), NMXP_LOG_STR(p->pdlist[j]->station), NMXP_LOG_STR(p->pdlist[j]->channel),
		    p->pdlist[j]->packet_type, p->pdlist[j]->seq_no, 
		    NMXP_LOG_STR(str_time), (double) p->pdlist[j]->nSamp / (double) p->pdlist[j]->sampRate,
		    seq_no_diff, time_diff, latency);
991 992 993
	    send_again = 1;
	    j++;
	} else if(seq_no_diff == 1) {
994
	    /* Handle current packet j */
995 996 997 998
	    for(i_func_pd=0; i_func_pd<n_func_pd; i_func_pd++) {
		(*p_func_pd[i_func_pd])(p->pdlist[j]);
	    }
	    if(time_diff > TIME_TOLLERANCE || time_diff < -TIME_TOLLERANCE) {
999
		nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY,
1000 1001 1002 1003 1004 1005
			"%s.%s.%s [%d, %d] (%s + %.2f sec.) * Time is not correct * last_seq_no_sent=%d  seq_no_diff=%d  time_diff=%.2fs  lat. %.1fs\n",
		    NMXP_LOG_STR(p->pdlist[j]->network), NMXP_LOG_STR(p->pdlist[j]->station), NMXP_LOG_STR(p->pdlist[j]->channel), 
		    p->pdlist[j]->packet_type, p->pdlist[j]->seq_no,
		    str_time, (double) p->pdlist[j]->nSamp /  (double) p->pdlist[j]->sampRate,
		    p->last_seq_no_sent,
		    seq_no_diff, time_diff, latency);
1006 1007 1008
	    }
	    p->last_seq_no_sent = p->pdlist[j]->seq_no;
	    p->last_sample_time = (p->pdlist[j]->time + ((double) p->pdlist[j]->nSamp / (double) p->pdlist[j]->sampRate ));
1009
	    p->last_latency = nmxp_data_latency(p->pdlist[j]);
1010 1011 1012
	    send_again = 1;
	    j++;
	} else {
1013
	    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_RAWSTREAM,
1014 1015 1016 1017 1018 1019
		    "%s.%s.%s [%d, %d] (%s + %.2f sec.) * seq_no_diff=%d > 1 * last_seq_no_sent=%d  j=%d  n_pdlist=%2d  time_diff=%.2fs  lat. %.1fs\n",
		    NMXP_LOG_STR(p->pdlist[j]->network), NMXP_LOG_STR(p->pdlist[j]->station), NMXP_LOG_STR(p->pdlist[j]->channel), 
		    p->pdlist[j]->packet_type, p->pdlist[j]->seq_no,
		    str_time, (double)