nmxp.c 30.3 KB
Newer Older
Matteo Quintiliani's avatar
Start    
Matteo Quintiliani committed
1
2
/*! \file
 *
Matteo Quintiliani's avatar
Matteo Quintiliani committed
3
 * \brief Nanometrics Protocol Library
Matteo Quintiliani's avatar
Start    
Matteo Quintiliani committed
4
5
6
7
8
9
 *
 * Author:
 * 	Matteo Quintiliani
 * 	Istituto Nazionale di Geofisica e Vulcanologia - Italy
 *	quintiliani@ingv.it
 *
10
 * $Id: nmxp.c,v 1.72 2008-03-07 17:30:54 mtheo Exp $
Matteo Quintiliani's avatar
Matteo Quintiliani committed
11
 *
Matteo Quintiliani's avatar
Start    
Matteo Quintiliani committed
12
13
14
15
 */

#include "nmxp.h"

Matteo Quintiliani's avatar
Matteo Quintiliani committed
16
17
#include "config.h"

Matteo Quintiliani's avatar
Start    
Matteo Quintiliani committed
18
#include <stdio.h>
19
#include <stdlib.h>
Matteo Quintiliani's avatar
Start    
Matteo Quintiliani committed
20
21
#include <string.h>

Matteo Quintiliani's avatar
Matteo Quintiliani committed
22
23
24
25
#ifdef HAVE_ARPA_INET_H
#include <arpa/inet.h>
#endif

26
27
28
29
#ifdef HAVE_WINDOWS_H
#include "winsock2.h"
#endif

30
int nmxp_sendConnect(int isock) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
31
    return nmxp_sendMessage(isock, NMXP_MSG_CONNECT, NULL, 0);
Matteo Quintiliani's avatar
Start    
Matteo Quintiliani committed
32
33
}

Matteo Quintiliani's avatar
Matteo Quintiliani committed
34
35
int nmxp_sendTerminateSubscription(int isock, NMXP_SHUTDOWN_REASON reason, char *message) {
    return nmxp_sendMessage(isock, NMXP_MSG_TERMINATESUBSCRIPTION, message, ((message)? strlen(message)-1 : 0));
Matteo Quintiliani's avatar
Start    
Matteo Quintiliani committed
36
37
}

Matteo Quintiliani's avatar
Matteo Quintiliani committed
38
int nmxp_receiveChannelList(int isock, NMXP_CHAN_LIST **pchannelList) {
Matteo Quintiliani's avatar
Start    
Matteo Quintiliani committed
39
40
    int ret;
    int i;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
41
    int recv_errno;
Matteo Quintiliani's avatar
Start    
Matteo Quintiliani committed
42

Matteo Quintiliani's avatar
Matteo Quintiliani committed
43
    NMXP_MSG_SERVER type;
Matteo Quintiliani's avatar
Start    
Matteo Quintiliani committed
44
    void *buffer;
45
    int32_t length;
Matteo Quintiliani's avatar
Start    
Matteo Quintiliani committed
46
47
48

    *pchannelList = NULL;

Matteo Quintiliani's avatar
Matteo Quintiliani committed
49
    ret = nmxp_receiveMessage(isock, &type, &buffer, &length, 0, &recv_errno);
Matteo Quintiliani's avatar
Start    
Matteo Quintiliani committed
50

Matteo Quintiliani's avatar
Matteo Quintiliani committed
51
    if(type != NMXP_MSG_CHANNELLIST) {
52
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_PACKETMAN, "Type %d is not NMXP_MSG_CHANNELLIST!\n", type);
Matteo Quintiliani's avatar
Start    
Matteo Quintiliani committed
53
54
55
56
57
    } else {

	*pchannelList = buffer;
	(*pchannelList)->number = ntohl((*pchannelList)->number);

58
	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CHANNEL, "number of channels %d\n", (*pchannelList)->number);
Matteo Quintiliani's avatar
Start    
Matteo Quintiliani committed
59
	
60
	/* TODO check*/
Matteo Quintiliani's avatar
Start    
Matteo Quintiliani committed
61
62
63

	for(i=0; i < (*pchannelList)->number; i++) {
	    (*pchannelList)->channel[i].key = ntohl((*pchannelList)->channel[i].key);
64
65
66
	    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CHANNEL, "%12d %s\n",
		    (*pchannelList)->channel[i].key,
		    NMXP_LOG_STR((*pchannelList)->channel[i].name));
Matteo Quintiliani's avatar
Start    
Matteo Quintiliani committed
67
68
69
70
71
72
73
74
	}

    }

    return ret;
}


75
int nmxp_sendAddTimeSeriesChannel(int isock, NMXP_CHAN_LIST_NET *channelList, int32_t shortTermCompletion, int32_t out_format, NMXP_BUFFER_FLAG buffer_flag) {
76
    int ret;
77
    int32_t buffer_length = 16 + (4 * channelList->number); 
78
    char *buffer = malloc(buffer_length);
79
    int32_t app, i, disp;
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104

    disp=0;

    app = htonl(channelList->number);
    memcpy(&buffer[disp], &app, 4);
    disp+=4;

    for(i=0; i < channelList->number; i++) {
	app = htonl(channelList->channel[i].key);
	memcpy(&buffer[disp], &app, 4);
	disp+=4;
    }
    
    app = htonl(shortTermCompletion);
    memcpy(&buffer[disp], &app, 4);
    disp+=4;

    app = htonl(out_format);
    memcpy(&buffer[disp], &app, 4);
    disp+=4;

    app = htonl(buffer_flag);
    memcpy(&buffer[disp], &app, 4);
    disp+=4;

Matteo Quintiliani's avatar
Matteo Quintiliani committed
105
    ret = nmxp_sendMessage(isock, NMXP_MSG_ADDTIMESERIESCHANNELS, buffer, buffer_length);
106
107
108
109
110
111
112
113

    if(buffer) {
	free(buffer);
    }
    return ret;
}


Matteo Quintiliani's avatar
Matteo Quintiliani committed
114
NMXP_DATA_PROCESS *nmxp_receiveData(int isock, NMXP_CHAN_LIST_NET *channelList, const char *network_code, int timeoutsec, int *recv_errno ) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
115
    NMXP_MSG_SERVER type;
116
    void *buffer = NULL;
117
    int32_t length;
118
    NMXP_DATA_PROCESS *pd = NULL;
119

Matteo Quintiliani's avatar
Matteo Quintiliani committed
120
    if(nmxp_receiveMessage(isock, &type, &buffer, &length, timeoutsec, recv_errno) == NMXP_SOCKET_OK) {
121
	if(type == NMXP_MSG_COMPRESSED) {
122
	    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_PACKETMAN, "Type %d is NMXP_MSG_COMPRESSED!\n", type);
123
	    pd = nmxp_processCompressedData(buffer, length, channelList, network_code);
124
	} else if(type == NMXP_MSG_DECOMPRESSED) {
125
	    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_PACKETMAN, "Type %d is NMXP_MSG_DECOMPRESSED!\n", type);
126
	    pd = nmxp_processDecompressedData(buffer, length, channelList, network_code);
127
	} else {
128
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_PACKETMAN, "Type %d is not NMXP_MSG_COMPRESSED or NMXP_MSG_DECOMPRESSED!\n", type);
129
	}
Matteo Quintiliani's avatar
Matteo Quintiliani committed
130
131
    }

132
    return pd;
133
134
}

135

136
int nmxp_sendConnectRequest(int isock, char *naqs_username, char *naqs_password, int32_t connection_time) {
137
    int ret;
138
    int i;
139
    char crc32buf[100];
140
141
    char *pcrc32buf = crc32buf;
    int crc32buf_length = 0;
142
    NMXP_CONNECT_REQUEST connectRequest;
143
    int naqs_username_length, naqs_password_length;
144
    int32_t protocol_version = 0;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
145
    char *pp = NULL;
146

147
148
    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CRC, "%s - %s\n",
	    NMXP_LOG_STR(naqs_username), NMXP_LOG_STR(naqs_password));
149

150
151
152
    naqs_username_length = (naqs_username)? strlen(naqs_username) : 0;
    naqs_password_length = (naqs_password)? strlen(naqs_password) : 0;

153
154
155
156
157
158
159
160
    for(i=0; i < 100; i++) {
	crc32buf[i] = 0;
    }

    for(i=0; i<12; i++) {
	connectRequest.username[i] = 0;
    }
    if(naqs_username_length != 0) {
161
	strcpy(connectRequest.username, naqs_username);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
162
    }
163
164
165
166

    connectRequest.version = protocol_version;
    connectRequest.connection_time = connection_time;

167
168
169
    if(!nmxp_data_bigendianhost()) {
	nmxp_data_swap_4b ((int32_t *)&connection_time);
    }
170

171
    if(naqs_username_length == 0  &&  naqs_password_length == 0 ) {
172
173
	/* sprintf(crc32buf, "%d%d", protocol_version, connection_time); */
	/* TODO */
174
    } else if(naqs_username_length != 0  &&  naqs_password_length != 0 ) {
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
	/* sprintf(crc32buf, "%s%d%d%s", naqs_username, protocol_version,
		connection_time, naqs_password); */

	memcpy(pcrc32buf, naqs_username, naqs_username_length);
	crc32buf_length += naqs_username_length;
	pcrc32buf = crc32buf + crc32buf_length;

	memcpy(pcrc32buf, &(protocol_version), sizeof(protocol_version));
	crc32buf_length += sizeof(protocol_version);
	pcrc32buf = crc32buf + crc32buf_length;

	memcpy(pcrc32buf, &(connection_time), sizeof(connection_time));
	crc32buf_length += sizeof(connection_time);
	pcrc32buf = crc32buf + crc32buf_length;

	memcpy(pcrc32buf, naqs_password, naqs_password_length);
	crc32buf_length += naqs_password_length;
	pcrc32buf = crc32buf + crc32buf_length;

194
    } else if(naqs_username_length != 0 ) {
195
196
	/* sprintf(crc32buf, "%s%d%d", naqs_username, protocol_version, connection_time); */
	/* TODO */
197
    } else if(naqs_password_length != 0 ) {
198
199
	/* sprintf(crc32buf, "%d%d%s", protocol_version, connection_time, naqs_password); */
	/* TODO */
200
    }
201
202
203
    connectRequest.version = htonl(connectRequest.version);
    connectRequest.connection_time = htonl(connectRequest.connection_time);
    connectRequest.crc32 = htonl(crc32(0L, crc32buf, crc32buf_length));
204
205
206
207

    ret = nmxp_sendMessage(isock, NMXP_MSG_CONNECTREQUEST, &connectRequest, sizeof(NMXP_CONNECT_REQUEST));

    if(ret == NMXP_SOCKET_OK) {
208
209
	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CRC, "Send a ConnectRequest crc32buf length %d, crc32 = %d\n",
		crc32buf_length, connectRequest.crc32);
210
211
212
	for(i=0; i < crc32buf_length; i++) {
	    nmxp_log(NMXP_LOG_NORM_NO, NMXP_LOG_D_CRC, "%d ", crc32buf[i]);
	}
Matteo Quintiliani's avatar
Matteo Quintiliani committed
213
	pp = (char *) &connectRequest.crc32;
214
215
216
217
	for(i=0; i < sizeof(connectRequest.crc32); i++) {
	    nmxp_log(NMXP_LOG_NORM_NO, NMXP_LOG_D_CRC, "%d ", pp[i]);
	}
	nmxp_log(NMXP_LOG_NORM_NO, NMXP_LOG_D_CRC, "\n");
218
    } else {
219
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CRC, "Send a ConnectRequest.\n");
220
221
222
223
224
225
    }

    return ret;
}


226
int nmxp_readConnectionTime(int isock, int32_t *connection_time) {
227
    int ret;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
228
229
    int recv_errno;
    ret = nmxp_recv_ctrl(isock, connection_time, sizeof(int32_t), 0, &recv_errno);
230
    *connection_time = ntohl(*connection_time);
231
    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "Read connection time from socket %d.\n", *connection_time);
232
    if(ret != NMXP_SOCKET_OK) {
233
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Read connection time from socket.\n");
234
235
236
237
238
    }
    return ret;
}


239
int nmxp_waitReady(int isock) {
240
241
    int times = 0;
    int rc = NMXP_SOCKET_OK;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
242
243
244
    int32_t signature;
    int32_t type = 0;
    int32_t length;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
245
    int recv_errno;
246
247

    while(rc == NMXP_SOCKET_OK  &&  type != NMXP_MSG_READY) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
248
	rc = nmxp_recv_ctrl(isock, &signature, sizeof(signature), 0, &recv_errno);
249
250
251
	if(rc != NMXP_SOCKET_OK) return rc;
	signature = ntohl(signature);
	if(signature == 0) {
252
	    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "signature is equal to zero. receive again.\n");
Matteo Quintiliani's avatar
Matteo Quintiliani committed
253
	    rc = nmxp_recv_ctrl(isock, &signature, sizeof(signature), 0, &recv_errno);
254
255
256
	    signature = ntohl(signature);
	}
	if(signature != NMX_SIGNATURE) {
257
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "signature is not valid. signature = %d\n", signature);
258
259
260
261
	    if(signature == 200) {
		    int32_t err_length;
		    int32_t err_reason;
		    char err_buff[200];
Matteo Quintiliani's avatar
Matteo Quintiliani committed
262
		    rc = nmxp_recv_ctrl(isock, &err_length, sizeof(err_length), 0, &recv_errno);
263
		    err_length = ntohl(err_length);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
264
		    rc = nmxp_recv_ctrl(isock, &err_reason, sizeof(err_reason), 0, &recv_errno);
265
266
		    err_reason = ntohl(err_reason);
		    if(err_length > 4) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
267
			    rc = nmxp_recv_ctrl(isock, err_buff, err_length-4, 0, &recv_errno);
268
269
			    err_buff[err_length] = 0;
		    }
270
		    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "TerminateMessage from Server: %s (%d).\n",
271
			    NMXP_LOG_STR(err_buff), err_reason);
272
	    }
273
274
275
	    return NMXP_SOCKET_ERROR;
	}

Matteo Quintiliani's avatar
Matteo Quintiliani committed
276
	rc = nmxp_recv_ctrl(isock, &type, sizeof(type), 0, &recv_errno);
277
278
279
	if(rc != NMXP_SOCKET_OK) return rc;
	type = ntohl(type);
	if(type != NMXP_MSG_READY) {
280
	    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "type is not READY. type = %d\n", type);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
281
	    rc = nmxp_recv_ctrl(isock, &length, sizeof(length), 0, &recv_errno);
282
283
284
	    if(rc != NMXP_SOCKET_OK) return rc;
	    length = ntohl(length);
	    if(length > 0) {
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
		if(type == NMXP_MSG_TERMINATESUBSCRIPTION) {
		    char *str_msg = NULL;
		    char *buf_app = (char *) malloc(sizeof(char) * length);
		    int32_t reason;
		    memcpy(&reason, buf_app, sizeof(reason));
		    reason = ntohl(reason);
		    str_msg = buf_app + sizeof(reason);
		    rc = nmxp_recv_ctrl(isock, buf_app, length, 0, &recv_errno);
		    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "%d %s shutdown: %s\n",
			    reason,
			    (reason == 0)? "Normal" : (reason == 1)? "Error" : (reason == 2)? "Timeout" : "Unknown",
			    str_msg);
		    if(buf_app) {
			free(buf_app);
		    }
300
301
302
		    /* Close the socket*/
		    nmxp_closeSocket(isock);
		    exit(-1);
303
		} else if(length == 4) {
304
		    int32_t app;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
305
		    rc = nmxp_recv_ctrl(isock, &app, length, 0, &recv_errno);
306
307
		    if(rc != NMXP_SOCKET_OK) return rc;
		    app = ntohl(app);
308
		    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "value = %d\n", app);
309
310
		} else {
		    char *buf_app = (char *) malloc(sizeof(char) * length);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
311
		    rc = nmxp_recv_ctrl(isock, buf_app, length, 0, &recv_errno);
312
313
314
315
316
317
		    if(buf_app) {
			free(buf_app);
		    }
		}
	    }
	} else {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
318
	    rc = nmxp_recv_ctrl(isock, &length, sizeof(length), 0, &recv_errno);
319
320
321
	    if(rc != NMXP_SOCKET_OK) return rc;
	    length = ntohl(length);
	    if(length != 0) {
322
		nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "length is not equal to zero. length = %d\n", length);
323
324
325
326
327
328
		return NMXP_SOCKET_ERROR;
	    }
	}

	times++;
	if(times > 10) {
329
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "waiting_ready_message. times > 10\n");
330
331
332
333
334
335
336
337
338
	    rc = NMXP_SOCKET_ERROR;
	}

    }

    return rc;
}


339
int nmxp_sendDataRequest(int isock, int32_t key, int32_t start_time, int32_t end_time) {
340
341
342
343
344
345
346
347
348
349
    int ret;
    NMXP_DATA_REQUEST dataRequest;

    dataRequest.chan_key = htonl(key);
    dataRequest.start_time = htonl(start_time);
    dataRequest.end_time = htonl(end_time);

    ret = nmxp_sendMessage(isock, NMXP_MSG_DATAREQUEST, &dataRequest, sizeof(dataRequest));

    if(ret != NMXP_SOCKET_OK) {
350
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Send a Request message\n");
351
352
353
354
355
356
    }

    return ret;
}


357
358
359
NMXP_CHAN_LIST *nmxp_getAvailableChannelList(char * hostname, int portnum, NMXP_DATATYPE datatype) {
    int naqssock;
    NMXP_CHAN_LIST *channelList = NULL, *channelList_subset = NULL;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
360
    /* int i; */
361

362
    /* 1. Open a socket*/
363
364
365
366
    naqssock = nmxp_openSocket(hostname, portnum);

    if(naqssock != NMXP_SOCKET_ERROR) {

367
	/* 2. Send a Connect*/
368
369
	if(nmxp_sendConnect(naqssock) == NMXP_SOCKET_OK) {

370
	    /* 3. Receive ChannelList*/
371
372
373
	     if(nmxp_receiveChannelList(naqssock, &channelList) == NMXP_SOCKET_OK) {

		 channelList_subset = nmxp_chan_getType(channelList, datatype);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
374
		 nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CHANNEL, "%d / %d are DataType channel.\n", channelList_subset->number, channelList->number);
375

376
		 /* nmxp_chan_sortByKey(channelList_subset);*/
377
378
		 nmxp_chan_sortByName(channelList_subset);

Matteo Quintiliani's avatar
Matteo Quintiliani committed
379
		 /*
380
		 for(i=0; i < channelList_subset->number; i++) {
381
382
383
		     nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CHANNEL, "%12d %s\n",
			     channelList_subset->channel[i].key,
			     NMXP_LOG_STR(channelList_subset->channel[i].name));
384
		 }
Matteo Quintiliani's avatar
Matteo Quintiliani committed
385
		 */
386

387
		 /* 4. Send a Request Pending (optional)*/
388

389
		 /* 5. Send AddChannels*/
390

391
		 /* 6. Repeat until finished: receive and handle packets*/
392

393
		 /* 7. Send Terminate Subscription*/
394
395
396
		 nmxp_sendTerminateSubscription(naqssock, NMXP_SHUTDOWN_NORMAL, "Good Bye!");

	     } else {
397
		 nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CHANNEL, "Error on receiveChannelList()\n");
398
399
	     }
	} else {
400
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CHANNEL, "Error on sendConnect()\n");
401
402
	}

403
	/* 8. Close the socket*/
404
405
406
407
408
409
410
411
412
413
	nmxp_closeSocket(naqssock);
    }

    if(channelList) {
	free(channelList);
    }

    return channelList_subset;
}

414

415
NMXP_META_CHAN_LIST *nmxp_getMetaChannelList(char * hostname, int portnum, NMXP_DATATYPE datatype, int flag_request_channelinfo, char *datas_username, char *datas_password, NMXP_CHAN_LIST **pchannelList) {
416
417
    int naqssock;
    NMXP_CHAN_PRECISLIST *precisChannelList = NULL;
418
419
420
421
    NMXP_CHAN_LIST *channelList = NULL;
    NMXP_META_CHAN_LIST *chan_list = NULL;
    NMXP_META_CHAN_LIST *iter = NULL;
    int i = 0;
422
    int32_t connection_time;
423
    int ret_sock;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
424
    int recv_errno;
425
    
426
427
    NMXP_MSG_SERVER type;
    void *buffer = NULL;
428
    int32_t length;
429
430
431
    NMXP_PRECISLISTREQUEST precisListRequestBody;
    NMXP_CHANNELINFOREQUEST channelInfoRequestBody;
    NMXP_CHANNELINFORESPONSE *channelInfo;
432
433
434
435
436
437
438

    char str_start[200], str_end[200];
    str_start[0] = 0;
    str_end[0] = 0;
    
    /* DAP Step 1: Open a socket */
    if( (naqssock = nmxp_openSocket(hostname, portnum)) == NMXP_SOCKET_ERROR) {
439
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error opening socket!\n");
440
441
442
443
444
	return NULL;
    }

    /* DAP Step 2: Read connection time */
    if(nmxp_readConnectionTime(naqssock, &connection_time) != NMXP_SOCKET_OK) {
445
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error reading connection time from server!\n");
446
447
448
449
450
	return NULL;
    }

    /* DAP Step 3: Send a ConnectRequest */
    if(nmxp_sendConnectRequest(naqssock, datas_username, datas_password, connection_time) != NMXP_SOCKET_OK) {
451
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error sending connect request!\n");
452
453
454
455
456
	return NULL;
    }

    /* DAP Step 4: Wait for a Ready message */
    if(nmxp_waitReady(naqssock) != NMXP_SOCKET_OK) {
457
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error waiting Ready message!\n");
458
459
460
	return NULL;
    }

461
462
463
464
465
466



    /* DAP Step 5: Send Data Request */
    nmxp_sendHeader(naqssock, NMXP_MSG_CHANNELLISTREQUEST, 0);
    /* DAP Step 6: Receive Data until receiving a Ready message */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
467
    ret_sock = nmxp_receiveMessage(naqssock, &type, &buffer, &length, 0, &recv_errno);
468
    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "ret_sock = %d, type = %d, length = %d\n", ret_sock, type, length);
469
470
471
472
473
474
475
476

    while(ret_sock == NMXP_SOCKET_OK   &&    type != NMXP_MSG_READY) {
	channelList = buffer;

	channelList->number = ntohl(channelList->number);

	for(i = 0; i < channelList->number; i++) {
	    channelList->channel[i].key = ntohl(channelList->channel[i].key);
477
	    if(getDataTypeFromKey(channelList->channel[i].key) == datatype) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
478
		nmxp_meta_chan_add(&chan_list, channelList->channel[i].key, channelList->channel[i].name, 0, 0, NULL, NMXP_META_SORT_NAME);
479
	    }
480
481
482
	}

	/* Receive Message */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
483
	ret_sock = nmxp_receiveMessage(naqssock, &type, &buffer, &length, 0, &recv_errno);
484
	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "ret_sock = %d, type = %d, length = %d\n", ret_sock, type, length);
485
486
    }

487
    *pchannelList = channelList;
488

489
490
491
492
    /* DAP Step 5: Send Data Request */
    precisListRequestBody.instr_id = htonl(-1);
    precisListRequestBody.datatype = htonl(NMXP_DATA_TIMESERIES);
    precisListRequestBody.type_of_channel = htonl(-1);
493
    nmxp_sendMessage(naqssock, NMXP_MSG_PRECISLISTREQUEST, &precisListRequestBody, sizeof(NMXP_PRECISLISTREQUEST));
494

495
496

    /* DAP Step 6: Receive Data until receiving a Ready message */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
497
    ret_sock = nmxp_receiveMessage(naqssock, &type, &buffer, &length, 0, &recv_errno);
498
    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "ret_sock = %d, type = %d, length = %d\n", ret_sock, type, length);
499
500
501
502
503
504
505
506
507
508
509
510
511

    while(ret_sock == NMXP_SOCKET_OK   &&    type != NMXP_MSG_READY) {
	precisChannelList = buffer;

	precisChannelList->number = ntohl(precisChannelList->number);
	for(i = 0; i < precisChannelList->number; i++) {
	    precisChannelList->channel[i].key = ntohl(precisChannelList->channel[i].key);
	    precisChannelList->channel[i].start_time = ntohl(precisChannelList->channel[i].start_time);
	    precisChannelList->channel[i].end_time = ntohl(precisChannelList->channel[i].end_time);

	    nmxp_data_to_str(str_start, precisChannelList->channel[i].start_time);
	    nmxp_data_to_str(str_end, precisChannelList->channel[i].end_time);

512
	    if(!nmxp_meta_chan_set_times(chan_list, precisChannelList->channel[i].key, precisChannelList->channel[i].start_time, precisChannelList->channel[i].end_time)) {
513
514
515
		nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CHANNEL, "Key %d not found for %s!\n",
			precisChannelList->channel[i].key,
			NMXP_LOG_STR(precisChannelList->channel[i].name));
516
517
518
	    }

	    /*
519
	    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_ANY, "%12d %12s %10d %10d %20s %20s\n",
520
		    precisChannelList->channel[i].key, NMXP_LOG_STR(precisChannelList->channel[i].name),
521
		    precisChannelList->channel[i].start_time, precisChannelList->channel[i].end_time,
522
		    NMXP_LOG_STR(str_start), NMXP_LOG_STR(str_end));
523
		    */
524
525
526
	}

	/* Receive Message */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
527
	ret_sock = nmxp_receiveMessage(naqssock, &type, &buffer, &length, 0, &recv_errno);
528
	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "ret_sock = %d, type = %d, length = %d\n", ret_sock, type, length);
529
530
531
    }


532
533
    if(flag_request_channelinfo) {
	for(iter = chan_list; iter != NULL; iter = iter->next) {
534

535
536
537
538
	    if(getChannelNumberFromKey(iter->key) == 0) {
		/* DAP Step 5: Send Data Request */
		channelInfoRequestBody.key = htonl(iter->key);
		channelInfoRequestBody.ignored = htonl(0);
539
		nmxp_sendMessage(naqssock, NMXP_MSG_CHANNELINFOREQUEST, &channelInfoRequestBody, sizeof(NMXP_CHANNELINFOREQUEST));
540

541
		/* DAP Step 6: Receive Data until receiving a Ready message */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
542
		ret_sock = nmxp_receiveMessage(naqssock, &type, &buffer, &length, 0, &recv_errno);
543
		nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "ret_sock = %d, type = %d, length = %d\n", ret_sock, type, length);
544

545
546
547
		while(ret_sock == NMXP_SOCKET_OK   &&    type != NMXP_MSG_READY) {
		    channelInfo = buffer;
		    channelInfo->key = ntohl(channelInfo->key);
548

549
		    if(!nmxp_meta_chan_set_network(chan_list, channelInfo->key, channelInfo->network)) {
550
551
			nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CHANNEL, "Key %d (%d) not found for %s!\n",
				iter->key, channelInfo->key, NMXP_LOG_STR(iter->name));
552
553
		    }
		    /* Receive Message */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
554
		    ret_sock = nmxp_receiveMessage(naqssock, &type, &buffer, &length, 0, &recv_errno);
555
		    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "ret_sock = %d, type = %d, length = %d\n", ret_sock, type, length);
556
557
558
		}
	    }
	}
559
560
561
562
563
564
565
566
567
568
569
570
    }



    /* DAP Step 7: Repeat steps 5 and 6 for each data request */

    /* DAP Step 8: Send a Terminate message (optional) */
    nmxp_sendTerminateSubscription(naqssock, NMXP_SHUTDOWN_NORMAL, "Bye!");

    /* DAP Step 9: Close the socket */
    nmxp_closeSocket(naqssock);

571
    return chan_list;
572
573
}

574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589

int nmxp_raw_stream_seq_no_compare(const void *a, const void *b)
{       
    int ret = 0;
    NMXP_DATA_PROCESS **ppa = (NMXP_DATA_PROCESS **) a;
    NMXP_DATA_PROCESS **ppb = (NMXP_DATA_PROCESS **) b;
    NMXP_DATA_PROCESS *pa = *ppa;
    NMXP_DATA_PROCESS *pb = *ppb;

    if(pa && pb) {
	if(pa->seq_no > pb->seq_no) {
	    ret = 1;
	} else if (pa->seq_no < pb->seq_no) {
	    ret = -1;
	}
    } else {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
590
591
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY,
		"nmxp_raw_stream_seq_no_compare() pa %s NULL and pb %s NULL\n", (pa)? "!=" : "=", (pb)? "!=" : "=");
592
593
594
595
596
    }

    return ret;
}

Matteo Quintiliani's avatar
Matteo Quintiliani committed
597
void nmxp_raw_stream_init(NMXP_RAW_STREAM_DATA *raw_stream_buffer, int32_t max_tolerable_latency, int timeoutrecv) {
598
599
600
601
    int j;

    raw_stream_buffer->last_seq_no_sent = -1;
    raw_stream_buffer->last_sample_time = -1.0;
602
    /* TODO 
Matteo Quintiliani's avatar
Matteo Quintiliani committed
603
     * Suppose a packet can contain 1/4 secs of data (that is, minimum packet length is 0.25 secs of data) */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
604
605
    raw_stream_buffer->max_tolerable_latency = max_tolerable_latency;
    raw_stream_buffer->max_pdlist_items = max_tolerable_latency * 4;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
606
    raw_stream_buffer->timeoutrecv = timeoutrecv;
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
    raw_stream_buffer->n_pdlist = 0;
    raw_stream_buffer->pdlist = (NMXP_DATA_PROCESS **) malloc (raw_stream_buffer->max_pdlist_items * sizeof(NMXP_DATA_PROCESS *));
    for(j=0; j<raw_stream_buffer->max_pdlist_items; j++) {
	raw_stream_buffer->pdlist[j] = NULL;
    }

}


void nmxp_raw_stream_free(NMXP_RAW_STREAM_DATA *raw_stream_buffer) {
    int j;
    if(raw_stream_buffer->pdlist) {
	for(j=0; j<raw_stream_buffer->max_pdlist_items; j++) {
	    if(raw_stream_buffer->pdlist[j]) {
		free(raw_stream_buffer->pdlist[j]);
	    }
	}
	free(raw_stream_buffer->pdlist);
    }
}


Matteo Quintiliani's avatar
Matteo Quintiliani committed
629
int nmxp_raw_stream_manage(NMXP_RAW_STREAM_DATA *p, NMXP_DATA_PROCESS *a_pd, int (*p_func_pd[NMXP_MAX_FUNC_PD]) (NMXP_DATA_PROCESS *), int n_func_pd) {
630
631
632
633
634
635
636
637
638
    int ret = 0;
    int send_again = 1;
    int seq_no_diff;
    double time_diff;
    double latency = 0.0;
    int j=0, k=0;
    int i_func_pd;
    char str_time[200];
    NMXP_DATA_PROCESS *pd = NULL;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
639
    int y, w;
640
641
    int count_null_element = 0;
    char netstachan[100];
642

643
    /* Allocate pd copy value from a_pd */
644
    if(a_pd) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
	/*
	if(a_pd->packet_type == 33 || a_pd->packet_type == 97) {
	    nmxp_data_log(a_pd);
	}
	*/

	/* Allocate memory for pd and copy a_pd */
	pd = (NMXP_DATA_PROCESS *) malloc (sizeof(NMXP_DATA_PROCESS));
	memcpy(pd, a_pd, sizeof(NMXP_DATA_PROCESS));
	if(a_pd->length > 0) {
	    pd->buffer = malloc(pd->length);
	    memcpy(pd->buffer, a_pd->buffer, a_pd->length);
	} else {
	    pd->buffer = NULL;
	}
	if(a_pd->nSamp *  sizeof(int) > 0) {
	    pd->pDataPtr = (int *) malloc(a_pd->nSamp * sizeof(int));
	    memcpy(pd->pDataPtr, a_pd->pDataPtr, a_pd->nSamp * sizeof(int));
	} else {
	    pd->pDataPtr = NULL;
	}
666
667
    } else {
	nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_RAWSTREAM,
668
		"nmxp_raw_stream_manage() passing NMXP_DATA_PROCESS pointer equal to NULL\n");
669
    }
670
    /* From here, use only pd */
671
672

    /* First time */
673
    if(p->last_seq_no_sent == -1  &&  pd) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
674
675
676
677
678
679
680
	if(p->timeoutrecv == 0) {
	    p->last_seq_no_sent = pd->seq_no - 1;
	    p->last_sample_time = pd->time;
	} else {
	    p->last_seq_no_sent = 0;
	    p->last_sample_time = 0;
	}
681
	nmxp_data_to_str(str_time, pd->time);
682
	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_RAWSTREAM,
683
684
685
686
687
		"%s.%s.%s [%d, %d] (%s + %.2f sec.) * First time nmxp_raw_stream_manage() * last_seq_no_sent=%d  last_sample_time=%.2f\n",
		NMXP_LOG_STR(pd->network), NMXP_LOG_STR(pd->station), NMXP_LOG_STR(pd->channel),
		pd->packet_type, pd->seq_no,
		NMXP_LOG_STR(str_time), (double) pd->nSamp / (double) pd->sampRate,
		 p->last_seq_no_sent, p->last_sample_time);
688
    }
Matteo Quintiliani's avatar
Matteo Quintiliani committed
689

690
691
692
    if(p->n_pdlist > 0) {
	latency = nmxp_data_latency(p->pdlist[0]);
    }
693

694
695
696
    /* Add pd and sort array, in case handle the first item */
    if( (p->n_pdlist >= p->max_pdlist_items || latency >= p->max_tolerable_latency)
	    && p->timeoutrecv <= 0 ) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
697

698
	/* Supposing p->pdlist is ordered, handle the first item and over write it.  */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
699
700
701
702
703
704
	if(p->n_pdlist > 0) {
	    seq_no_diff = p->pdlist[0]->seq_no - p->last_seq_no_sent;
	    time_diff = p->pdlist[0]->time - p->last_sample_time;
	    latency = nmxp_data_latency(p->pdlist[0]);
	    nmxp_data_to_str(str_time, p->pdlist[0]->time);
	    if( seq_no_diff > 0) {
705
		nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_RAWSTREAM,
706
707
708
709
710
711
			"%s.%s.%s [%d, %d] (%s + %.2f sec.) * Force handling packet * n_pdlist=%d  seq_no_diff=%d  time_diff=%.2fs  lat. %.1fs!\n",
			NMXP_LOG_STR(p->pdlist[0]->network), NMXP_LOG_STR(p->pdlist[0]->station), NMXP_LOG_STR(p->pdlist[0]->channel),
			p->pdlist[0]->packet_type, p->pdlist[0]->seq_no,
			NMXP_LOG_STR(str_time), (double) p->pdlist[0]->nSamp / (double) p->pdlist[0]->sampRate,
			p->n_pdlist,
			seq_no_diff, time_diff, latency);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
712
713
714
715
716
717
718
		for(i_func_pd=0; i_func_pd<n_func_pd; i_func_pd++) {
		    (*p_func_pd[i_func_pd])(p->pdlist[0]);
		}
		p->last_seq_no_sent = (p->pdlist[0]->seq_no);
		p->last_sample_time = (p->pdlist[0]->time + ((double) p->pdlist[0]->nSamp / (double) p->pdlist[0]->sampRate ));
	    } else {
		/* It should not occur */
719
720
721
722
723
724
725
		nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_RAWSTREAM,
			"%s.%s.%s [%d, %d] (%s + %.2f sec.) * SHOULD NOT OCCUR packet discarded * n_pdlist=%d  seq_no_diff=%d  time_diff=%.2fs  lat. %.1fs!\n",
			NMXP_LOG_STR(p->pdlist[0]->network), NMXP_LOG_STR(p->pdlist[0]->station), NMXP_LOG_STR(p->pdlist[0]->channel),
			p->pdlist[0]->packet_type, p->pdlist[0]->seq_no,
			NMXP_LOG_STR(str_time), (double) p->pdlist[0]->nSamp / (double) p->pdlist[0]->sampRate,
			p->n_pdlist,
			seq_no_diff, time_diff, latency);
726
727
	    }

Matteo Quintiliani's avatar
Matteo Quintiliani committed
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
	    /* Free handled packet */
	    if(p->pdlist[0]->buffer) {
		free(p->pdlist[0]->buffer);
		p->pdlist[0]->buffer = NULL;
	    }
	    if(p->pdlist[0]->pDataPtr) {
		free(p->pdlist[0]->pDataPtr);
		p->pdlist[0]->pDataPtr = NULL;
	    }
	    if(p->pdlist[0]) {
		free(p->pdlist[0]);
		p->pdlist[0] = NULL;
	    }
	    if(pd) {
		p->pdlist[0] = pd;
	    }
	} else {
	    if(pd) {
		p->n_pdlist = 1;
		p->pdlist[0] = pd;
	    }
749
750
	}
    } else {
751
752
753
	if(pd) {
	    p->pdlist[p->n_pdlist++] = pd;
	}
754
    }
755
756
757
758
759

    /* Check if some element in pdlist is NULL and remove it */
    y=0;
    while(y < p->n_pdlist) {
	if(p->pdlist[y] == NULL) {
760
	    count_null_element++;
761
762
763
764
765
766
767
768
769
770
	    /* Shift array */
	    for(w=y+1; w < p->n_pdlist; w++) {
		p->pdlist[w-1] = p->pdlist[w];
	     }
	    p->n_pdlist--;
	} else {
	    y++;
	}
    }

771
772
773
774
775
776
777
778
779
780
781
782
783
784
    if(count_null_element > 0) {
	if(p->n_pdlist > 0) {
	    snprintf(netstachan, 100, "%s.%s.%s",
		    NMXP_LOG_STR(p->pdlist[0]->network),
		    NMXP_LOG_STR(p->pdlist[0]->station),
		    NMXP_LOG_STR(p->pdlist[0]->channel));
	} else {
	    strncpy(netstachan, "Unknown", 100);
	}
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY,
		"nmxp_raw_stream_manage() %d NULL elements in pdlist for %s.\n",
		count_null_element, netstachan);
    }

785
    /* Sort array */
786
787
    qsort(p->pdlist, p->n_pdlist, sizeof(NMXP_DATA_PROCESS *), nmxp_raw_stream_seq_no_compare);

788
    /* TODO Check for packet duplication in pd->pdlist*/
789
790

    /* Print array, only for debugging */
791
    /*
792
793
794
    if(p->n_pdlist > 1) {
	int y = 0;
	for(y=0; y < p->n_pdlist; y++) {
795
796
797
798
799
800
801
	    nmxp_data_to_str(str_time, p->pdlist[y]->time);
	    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_RAWSTREAM,
		    "%s.%s.%s [%d, %d] (%s + %.2f sec.) * %02d n_pdlist=%d\n",
		    NMXP_LOG_STR(p->pdlist[y]->network), NMXP_LOG_STR(p->pdlist[y]->station), NMXP_LOG_STR(p->pdlist[y]->channel),
		    p->pdlist[y]->packet_type, p->pdlist[y]->seq_no,
		    NMXP_LOG_STR(str_time), (double) p->pdlist[y]->nSamp / (double) p->pdlist[y]->sampRate,
		    y, p->n_pdlist);
802
803
	}
    }
804
    */
805

806
    /* Condition for time-out (pd is NULL) */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
807
    if(!pd && p->n_pdlist > 0) {
808
809
810
811
812
813
814
815
816
817
818
	/* Log before changing values */
	nmxp_data_to_str(str_time, p->pdlist[0]->time);
	nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_RAWSTREAM,
		"%s.%s.%s [%d, %d] (%s + %.2f sec.) * pd is NULL and n_pdlist = %d > 0 *  last_seq_no_sent=%d, last_sample_time=%.2f\n",
		NMXP_LOG_STR(p->pdlist[0]->network), NMXP_LOG_STR(p->pdlist[0]->station), NMXP_LOG_STR(p->pdlist[0]->channel),
		p->pdlist[0]->packet_type, p->pdlist[0]->seq_no,
		NMXP_LOG_STR(str_time), (double) p->pdlist[0]->nSamp / (double) p->pdlist[0]->sampRate,
		p->n_pdlist,
		p->last_seq_no_sent, p->last_sample_time);

	/* Changing values */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
819
820
821
	p->last_seq_no_sent = p->pdlist[0]->seq_no - 1;
	p->last_sample_time = p->pdlist[0]->time;
    }
822
823
824
825
826
827
828
829
830
831
832

    /* Manage array and execute func_pd() */
    j=0;
    send_again = 1;
    while(send_again  &&  j < p->n_pdlist) {
	send_again = 0;
	seq_no_diff = p->pdlist[j]->seq_no - p->last_seq_no_sent;
	time_diff = p->pdlist[j]->time - p->last_sample_time;
	latency = nmxp_data_latency(p->pdlist[j]);
	nmxp_data_to_str(str_time, p->pdlist[j]->time);
	if(seq_no_diff <= 0) {
833
	    /* Duplicated packets: Discarded */
834
	    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_RAWSTREAM,
835
836
837
838
839
		    "%s.%s.%s [%d, %d] (%s + %.2f sec.) * Packet discarded * seq_no_diff=%d  time_diff=%.2fs  lat %.1fs\n",
		    NMXP_LOG_STR(p->pdlist[j]->network), NMXP_LOG_STR(p->pdlist[j]->station), NMXP_LOG_STR(p->pdlist[j]->channel),
		    p->pdlist[j]->packet_type, p->pdlist[j]->seq_no, 
		    NMXP_LOG_STR(str_time), (double) p->pdlist[j]->nSamp / (double) p->pdlist[j]->sampRate,
		    seq_no_diff, time_diff, latency);
840
841
842
	    send_again = 1;
	    j++;
	} else if(seq_no_diff == 1) {
843
	    /* Handle current packet j */
844
845
846
847
	    for(i_func_pd=0; i_func_pd<n_func_pd; i_func_pd++) {
		(*p_func_pd[i_func_pd])(p->pdlist[j]);
	    }
	    if(time_diff > TIME_TOLLERANCE || time_diff < -TIME_TOLLERANCE) {
848
		nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_RAWSTREAM,
849
850
851
852
853
854
			"%s.%s.%s [%d, %d] (%s + %.2f sec.) * Time is not correct * last_seq_no_sent=%d  seq_no_diff=%d  time_diff=%.2fs  lat. %.1fs\n",
		    NMXP_LOG_STR(p->pdlist[j]->network), NMXP_LOG_STR(p->pdlist[j]->station), NMXP_LOG_STR(p->pdlist[j]->channel), 
		    p->pdlist[j]->packet_type, p->pdlist[j]->seq_no,
		    str_time, (double) p->pdlist[j]->nSamp /  (double) p->pdlist[j]->sampRate,
		    p->last_seq_no_sent,
		    seq_no_diff, time_diff, latency);
855
856
857
858
859
860
	    }
	    p->last_seq_no_sent = p->pdlist[j]->seq_no;
	    p->last_sample_time = (p->pdlist[j]->time + ((double) p->pdlist[j]->nSamp / (double) p->pdlist[j]->sampRate ));
	    send_again = 1;
	    j++;
	} else {
861
	    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_RAWSTREAM,
862
863
864
865
866
867
		    "%s.%s.%s [%d, %d] (%s + %.2f sec.) * seq_no_diff=%d > 1 * last_seq_no_sent=%d  j=%d  n_pdlist=%2d  time_diff=%.2fs  lat. %.1fs\n",
		    NMXP_LOG_STR(p->pdlist[j]->network), NMXP_LOG_STR(p->pdlist[j]->station), NMXP_LOG_STR(p->pdlist[j]->channel), 
		    p->pdlist[j]->packet_type, p->pdlist[j]->seq_no,
		    str_time, (double) p->pdlist[j]->nSamp /  (double) p->pdlist[j]->sampRate,
		    seq_no_diff, p->last_seq_no_sent, j, p->n_pdlist,
		    time_diff, latency);
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
	}
    }

    /* Shift and free j handled elements */
    if(j > 0) {
	for(k=0; k < p->n_pdlist; k++) {
	    if(k < j) {
		if(p->pdlist[k]->buffer) {
		    free(p->pdlist[k]->buffer);
		    p->pdlist[k]->buffer = NULL;
		}
		if(p->pdlist[k]->pDataPtr) {
		    free(p->pdlist[k]->pDataPtr);
		    p->pdlist[k]->pDataPtr = NULL;
		}
		if(p->pdlist[k]) {
		    free(p->pdlist[k]);
		    p->pdlist[k] = NULL;
		}
	    }
	    if(k + j < p->n_pdlist) {
		p->pdlist[k] = p->pdlist[k+j];
	    } else {
		p->pdlist[k] = NULL;
	    }
	}
	p->n_pdlist = p->n_pdlist - j;
    }

897
898
899
    /* TOREMOVE
    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_RAWSTREAM, "j=%d  p->n_pdlist=%d FINAL\n", j, p->n_pdlist);
       */
900
901
902
903

    return ret;
}

904
905
906
907
908
909
910

/* TODO */
int nmxp_raw_stream_manage_flush(NMXP_RAW_STREAM_DATA *p, int (*p_func_pd[NMXP_MAX_FUNC_PD]) (NMXP_DATA_PROCESS *), int n_func_pd) {
    int ret = 0;

    return ret;
}