nmxp.c 24.9 KB
Newer Older
Matteo Quintiliani's avatar
Start    
Matteo Quintiliani committed
1
2
/*! \file
 *
Matteo Quintiliani's avatar
Matteo Quintiliani committed
3
 * \brief Nanometrics Protocol Library
Matteo Quintiliani's avatar
Start    
Matteo Quintiliani committed
4
5
6
7
8
9
 *
 * Author:
 * 	Matteo Quintiliani
 * 	Istituto Nazionale di Geofisica e Vulcanologia - Italy
 *	quintiliani@ingv.it
 *
10
 * $Id: nmxp.c,v 1.55 2007-12-07 13:45:44 mtheo Exp $
Matteo Quintiliani's avatar
Matteo Quintiliani committed
11
 *
Matteo Quintiliani's avatar
Start    
Matteo Quintiliani committed
12
13
14
15
 */

#include "nmxp.h"

Matteo Quintiliani's avatar
Matteo Quintiliani committed
16
17
#include "config.h"

Matteo Quintiliani's avatar
Start    
Matteo Quintiliani committed
18
#include <stdio.h>
19
#include <stdlib.h>
Matteo Quintiliani's avatar
Start    
Matteo Quintiliani committed
20
21
#include <string.h>

Matteo Quintiliani's avatar
Matteo Quintiliani committed
22
23
24
25
#ifdef HAVE_ARPA_INET_H
#include <arpa/inet.h>
#endif

26
27
28
29
#ifdef HAVE_WINDOWS_H
#include "winsock2.h"
#endif

30
int nmxp_sendConnect(int isock) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
31
    return nmxp_sendMessage(isock, NMXP_MSG_CONNECT, NULL, 0);
Matteo Quintiliani's avatar
Start    
Matteo Quintiliani committed
32
33
}

Matteo Quintiliani's avatar
Matteo Quintiliani committed
34
35
int nmxp_sendTerminateSubscription(int isock, NMXP_SHUTDOWN_REASON reason, char *message) {
    return nmxp_sendMessage(isock, NMXP_MSG_TERMINATESUBSCRIPTION, message, ((message)? strlen(message)-1 : 0));
Matteo Quintiliani's avatar
Start    
Matteo Quintiliani committed
36
37
}

Matteo Quintiliani's avatar
Matteo Quintiliani committed
38
int nmxp_receiveChannelList(int isock, NMXP_CHAN_LIST **pchannelList) {
Matteo Quintiliani's avatar
Start    
Matteo Quintiliani committed
39
40
    int ret;
    int i;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
41
    int recv_errno;
Matteo Quintiliani's avatar
Start    
Matteo Quintiliani committed
42

Matteo Quintiliani's avatar
Matteo Quintiliani committed
43
    NMXP_MSG_SERVER type;
Matteo Quintiliani's avatar
Start    
Matteo Quintiliani committed
44
    void *buffer;
45
    int32_t length;
Matteo Quintiliani's avatar
Start    
Matteo Quintiliani committed
46
47
48

    *pchannelList = NULL;

Matteo Quintiliani's avatar
Matteo Quintiliani committed
49
    ret = nmxp_receiveMessage(isock, &type, &buffer, &length, 0, &recv_errno);
Matteo Quintiliani's avatar
Start    
Matteo Quintiliani committed
50

Matteo Quintiliani's avatar
Matteo Quintiliani committed
51
    if(type != NMXP_MSG_CHANNELLIST) {
52
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_PACKETMAN, "Type %d is not NMXP_MSG_CHANNELLIST!\n", type);
Matteo Quintiliani's avatar
Start    
Matteo Quintiliani committed
53
54
55
56
57
    } else {

	*pchannelList = buffer;
	(*pchannelList)->number = ntohl((*pchannelList)->number);

58
	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CHANNEL, "number of channels %d\n", (*pchannelList)->number);
Matteo Quintiliani's avatar
Start    
Matteo Quintiliani committed
59
60
61
62
63
	
	// TODO check

	for(i=0; i < (*pchannelList)->number; i++) {
	    (*pchannelList)->channel[i].key = ntohl((*pchannelList)->channel[i].key);
64
	    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CHANNEL, "%12d %s\n", (*pchannelList)->channel[i].key, (*pchannelList)->channel[i].name);
Matteo Quintiliani's avatar
Start    
Matteo Quintiliani committed
65
66
67
68
69
70
71
72
	}

    }

    return ret;
}


73
int nmxp_sendAddTimeSeriesChannel(int isock, NMXP_CHAN_LIST_NET *channelList, int32_t shortTermCompletion, int32_t out_format, NMXP_BUFFER_FLAG buffer_flag) {
74
    int ret;
75
    int32_t buffer_length = 16 + (4 * channelList->number); 
76
    char *buffer = malloc(buffer_length);
77
    int32_t app, i, disp;
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102

    disp=0;

    app = htonl(channelList->number);
    memcpy(&buffer[disp], &app, 4);
    disp+=4;

    for(i=0; i < channelList->number; i++) {
	app = htonl(channelList->channel[i].key);
	memcpy(&buffer[disp], &app, 4);
	disp+=4;
    }
    
    app = htonl(shortTermCompletion);
    memcpy(&buffer[disp], &app, 4);
    disp+=4;

    app = htonl(out_format);
    memcpy(&buffer[disp], &app, 4);
    disp+=4;

    app = htonl(buffer_flag);
    memcpy(&buffer[disp], &app, 4);
    disp+=4;

Matteo Quintiliani's avatar
Matteo Quintiliani committed
103
    ret = nmxp_sendMessage(isock, NMXP_MSG_ADDTIMESERIESCHANNELS, buffer, buffer_length);
104
105
106
107
108
109
110
111

    if(buffer) {
	free(buffer);
    }
    return ret;
}


Matteo Quintiliani's avatar
Matteo Quintiliani committed
112
NMXP_DATA_PROCESS *nmxp_receiveData(int isock, NMXP_CHAN_LIST_NET *channelList, const char *network_code, int timeoutsec, int *recv_errno ) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
113
    NMXP_MSG_SERVER type;
114
    void *buffer = NULL;
115
    int32_t length;
116
    NMXP_DATA_PROCESS *pd = NULL;
117

Matteo Quintiliani's avatar
Matteo Quintiliani committed
118
    if(nmxp_receiveMessage(isock, &type, &buffer, &length, timeoutsec, recv_errno) == NMXP_SOCKET_OK) {
119
	if(type == NMXP_MSG_COMPRESSED) {
120
	    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_PACKETMAN, "Type %d is NMXP_MSG_COMPRESSED!\n", type);
121
	    pd = nmxp_processCompressedData(buffer, length, channelList, network_code);
122
	} else if(type == NMXP_MSG_DECOMPRESSED) {
123
	    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_PACKETMAN, "Type %d is NMXP_MSG_DECOMPRESSED!\n", type);
124
	    pd = nmxp_processDecompressedData(buffer, length, channelList, network_code);
125
	} else {
126
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_PACKETMAN, "Type %d is not NMXP_MSG_COMPRESSED or NMXP_MSG_DECOMPRESSED!\n", type);
127
	}
Matteo Quintiliani's avatar
Matteo Quintiliani committed
128
129
    }

130
    return pd;
131
132
}

133

134
int nmxp_sendConnectRequest(int isock, char *naqs_username, char *naqs_password, int32_t connection_time) {
135
136
137
    int ret;
    char crc32buf[100];
    NMXP_CONNECT_REQUEST connectRequest;
138
    int naqs_username_length, naqs_password_length;
139

140
141
142
143
144
    naqs_username_length = (naqs_username)? strlen(naqs_username) : 0;
    naqs_password_length = (naqs_password)? strlen(naqs_password) : 0;

    if(naqs_username_length == 0) {
	connectRequest.username[0] = 0;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
145
    } else {
146
	strcpy(connectRequest.username, naqs_username);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
147
    }
148
149
150
    connectRequest.version = htonl(0);
    connectRequest.connection_time = htonl(connection_time);

151
    if(naqs_username_length == 0  &&  naqs_password_length == 0 ) {
152
	sprintf(crc32buf, "%d%d", connectRequest.version, connection_time);
153
    } else if(naqs_username_length != 0  &&  naqs_password_length != 0 ) {
154
155
	sprintf(crc32buf, "%s%d%d%s", naqs_username, connectRequest.version,
		connection_time, naqs_password);
156
    } else if(naqs_username_length != 0 ) {
157
	sprintf(crc32buf, "%s%d%d", naqs_username, connectRequest.version, connection_time);
158
    } else if(naqs_password_length != 0 ) {
159
160
161
162
163
164
165
	sprintf(crc32buf, "%d%d%s", connectRequest.version, connection_time, naqs_password);
    }
    connectRequest.crc32 = htonl(crc32((unsigned char *) crc32buf, strlen(crc32buf)));

    ret = nmxp_sendMessage(isock, NMXP_MSG_CONNECTREQUEST, &connectRequest, sizeof(NMXP_CONNECT_REQUEST));

    if(ret == NMXP_SOCKET_OK) {
166
	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CRC, "Send a ConnectRequest crc32buf = (%s), crc32 = %d\n", crc32buf, connectRequest.crc32);
167
    } else {
168
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CRC, "Send a ConnectRequest.\n");
169
170
171
172
173
174
    }

    return ret;
}


175
int nmxp_readConnectionTime(int isock, int32_t *connection_time) {
176
    int ret;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
177
178
    int recv_errno;
    ret = nmxp_recv_ctrl(isock, connection_time, sizeof(int32_t), 0, &recv_errno);
179
    *connection_time = ntohl(*connection_time);
180
    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "Read connection time from socket %d.\n", *connection_time);
181
    if(ret != NMXP_SOCKET_OK) {
182
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Read connection time from socket.\n");
183
184
185
186
187
    }
    return ret;
}


188
int nmxp_waitReady(int isock) {
189
190
    int times = 0;
    int rc = NMXP_SOCKET_OK;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
191
192
193
    int32_t signature;
    int32_t type = 0;
    int32_t length;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
194
    int recv_errno;
195
196

    while(rc == NMXP_SOCKET_OK  &&  type != NMXP_MSG_READY) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
197
	rc = nmxp_recv_ctrl(isock, &signature, sizeof(signature), 0, &recv_errno);
198
199
200
	if(rc != NMXP_SOCKET_OK) return rc;
	signature = ntohl(signature);
	if(signature == 0) {
201
	    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "signature is equal to zero. receive again.\n");
Matteo Quintiliani's avatar
Matteo Quintiliani committed
202
	    rc = nmxp_recv_ctrl(isock, &signature, sizeof(signature), 0, &recv_errno);
203
204
205
	    signature = ntohl(signature);
	}
	if(signature != NMX_SIGNATURE) {
206
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "signature is not valid. signature = %d\n", signature);
207
208
209
210
	    if(signature == 200) {
		    int32_t err_length;
		    int32_t err_reason;
		    char err_buff[200];
Matteo Quintiliani's avatar
Matteo Quintiliani committed
211
		    rc = nmxp_recv_ctrl(isock, &err_length, sizeof(err_length), 0, &recv_errno);
212
		    err_length = ntohl(err_length);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
213
		    rc = nmxp_recv_ctrl(isock, &err_reason, sizeof(err_reason), 0, &recv_errno);
214
215
		    err_reason = ntohl(err_reason);
		    if(err_length > 4) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
216
			    rc = nmxp_recv_ctrl(isock, err_buff, err_length-4, 0, &recv_errno);
217
218
			    err_buff[err_length] = 0;
		    }
219
		    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "TerminateMessage from Server: %s (%d).\n", err_buff, err_reason);
220
	    }
221
222
223
	    return NMXP_SOCKET_ERROR;
	}

Matteo Quintiliani's avatar
Matteo Quintiliani committed
224
	rc = nmxp_recv_ctrl(isock, &type, sizeof(type), 0, &recv_errno);
225
226
227
	if(rc != NMXP_SOCKET_OK) return rc;
	type = ntohl(type);
	if(type != NMXP_MSG_READY) {
228
	    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "type is not READY. type = %d\n", type);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
229
	    rc = nmxp_recv_ctrl(isock, &length, sizeof(length), 0, &recv_errno);
230
231
232
233
234
	    if(rc != NMXP_SOCKET_OK) return rc;
	    length = ntohl(length);
	    if(length > 0) {
		if(length == 4) {
		    int32_t app;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
235
		    rc = nmxp_recv_ctrl(isock, &app, length, 0, &recv_errno);
236
237
		    if(rc != NMXP_SOCKET_OK) return rc;
		    app = ntohl(app);
238
		    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "value = %d\n", app);
239
240
		} else {
		    char *buf_app = (char *) malloc(sizeof(char) * length);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
241
		    rc = nmxp_recv_ctrl(isock, buf_app, length, 0, &recv_errno);
242
243
244
245
246
247
		    if(buf_app) {
			free(buf_app);
		    }
		}
	    }
	} else {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
248
	    rc = nmxp_recv_ctrl(isock, &length, sizeof(length), 0, &recv_errno);
249
250
251
	    if(rc != NMXP_SOCKET_OK) return rc;
	    length = ntohl(length);
	    if(length != 0) {
252
		nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "length is not equal to zero. length = %d\n", length);
253
254
255
256
257
258
		return NMXP_SOCKET_ERROR;
	    }
	}

	times++;
	if(times > 10) {
259
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "waiting_ready_message. times > 10\n");
260
261
262
263
264
265
266
267
268
	    rc = NMXP_SOCKET_ERROR;
	}

    }

    return rc;
}


269
int nmxp_sendDataRequest(int isock, int32_t key, int32_t start_time, int32_t end_time) {
270
271
272
273
274
275
276
277
278
279
    int ret;
    NMXP_DATA_REQUEST dataRequest;

    dataRequest.chan_key = htonl(key);
    dataRequest.start_time = htonl(start_time);
    dataRequest.end_time = htonl(end_time);

    ret = nmxp_sendMessage(isock, NMXP_MSG_DATAREQUEST, &dataRequest, sizeof(dataRequest));

    if(ret != NMXP_SOCKET_OK) {
280
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Send a Request message\n");
281
282
283
284
285
286
    }

    return ret;
}


287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
NMXP_CHAN_LIST *nmxp_getAvailableChannelList(char * hostname, int portnum, NMXP_DATATYPE datatype) {
    int naqssock;
    NMXP_CHAN_LIST *channelList = NULL, *channelList_subset = NULL;
    int i;

    // 1. Open a socket
    naqssock = nmxp_openSocket(hostname, portnum);

    if(naqssock != NMXP_SOCKET_ERROR) {

	// 2. Send a Connect
	if(nmxp_sendConnect(naqssock) == NMXP_SOCKET_OK) {

	    // 3. Receive ChannelList
	     if(nmxp_receiveChannelList(naqssock, &channelList) == NMXP_SOCKET_OK) {

		 channelList_subset = nmxp_chan_getType(channelList, datatype);
304
		 nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CHANNEL, "%d / %d\n", channelList_subset->number, channelList->number);
305
306
307
308
309

		 // nmxp_chan_sortByKey(channelList_subset);
		 nmxp_chan_sortByName(channelList_subset);

		 for(i=0; i < channelList_subset->number; i++) {
310
		     nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CHANNEL, "%12d %s\n", channelList_subset->channel[i].key, channelList_subset->channel[i].name);
311
312
313
314
315
316
317
318
319
320
321
322
		 }

		 // 4. Send a Request Pending (optional)

		 // 5. Send AddChannels

		 // 6. Repeat until finished: receive and handle packets

		 // 7. Send Terminate Subscription
		 nmxp_sendTerminateSubscription(naqssock, NMXP_SHUTDOWN_NORMAL, "Good Bye!");

	     } else {
323
		 nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CHANNEL, "Error on receiveChannelList()\n");
324
325
	     }
	} else {
326
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CHANNEL, "Error on sendConnect()\n");
327
328
329
330
331
332
333
334
335
336
337
338
339
	}

	// 8. Close the socket
	nmxp_closeSocket(naqssock);
    }

    if(channelList) {
	free(channelList);
    }

    return channelList_subset;
}

340

341
NMXP_META_CHAN_LIST *nmxp_getMetaChannelList(char * hostname, int portnum, NMXP_DATATYPE datatype, int flag_request_channelinfo) {
342
343
    int naqssock;
    NMXP_CHAN_PRECISLIST *precisChannelList = NULL;
344
345
346
347
    NMXP_CHAN_LIST *channelList = NULL;
    NMXP_META_CHAN_LIST *chan_list = NULL;
    NMXP_META_CHAN_LIST *iter = NULL;
    int i = 0;
348
    int32_t connection_time;
349
350
    char *datas_username = NULL, *datas_password = NULL;
    int ret_sock;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
351
    int recv_errno;
352

353
    
354
355
    NMXP_MSG_SERVER type;
    void *buffer = NULL;
356
    int32_t length;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
357
    NMXP_MSGBODY_PRECISLISTREQUEST precisListRequestBody;
358
359
    NMXP_MSGBODY_CHANNELINFOREQUEST channelInfoRequestBody;
    NMXP_MSGBODY_CHANNELINFORESPONSE *channelInfo;
360
361
362
363
364
365
366
367

    char str_start[200], str_end[200];
    str_start[0] = 0;
    str_end[0] = 0;
    

    /* DAP Step 1: Open a socket */
    if( (naqssock = nmxp_openSocket(hostname, portnum)) == NMXP_SOCKET_ERROR) {
368
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error opening socket!\n");
369
370
371
372
373
	return NULL;
    }

    /* DAP Step 2: Read connection time */
    if(nmxp_readConnectionTime(naqssock, &connection_time) != NMXP_SOCKET_OK) {
374
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error reading connection time from server!\n");
375
376
377
378
379
	return NULL;
    }

    /* DAP Step 3: Send a ConnectRequest */
    if(nmxp_sendConnectRequest(naqssock, datas_username, datas_password, connection_time) != NMXP_SOCKET_OK) {
380
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error sending connect request!\n");
381
382
383
384
385
	return NULL;
    }

    /* DAP Step 4: Wait for a Ready message */
    if(nmxp_waitReady(naqssock) != NMXP_SOCKET_OK) {
386
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error waiting Ready message!\n");
387
388
389
	return NULL;
    }

390
391
392
393
394
395



    /* DAP Step 5: Send Data Request */
    nmxp_sendHeader(naqssock, NMXP_MSG_CHANNELLISTREQUEST, 0);
    /* DAP Step 6: Receive Data until receiving a Ready message */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
396
    ret_sock = nmxp_receiveMessage(naqssock, &type, &buffer, &length, 0, &recv_errno);
397
    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "ret_sock = %d, type = %d, length = %d\n", ret_sock, type, length);
398
399
400
401
402
403
404
405

    while(ret_sock == NMXP_SOCKET_OK   &&    type != NMXP_MSG_READY) {
	channelList = buffer;

	channelList->number = ntohl(channelList->number);

	for(i = 0; i < channelList->number; i++) {
	    channelList->channel[i].key = ntohl(channelList->channel[i].key);
406
	    if(getDataTypeFromKey(channelList->channel[i].key) == datatype) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
407
		nmxp_meta_chan_add(&chan_list, channelList->channel[i].key, channelList->channel[i].name, 0, 0, NULL, NMXP_META_SORT_NAME);
408
	    }
409
410
411
	}

	/* Receive Message */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
412
	ret_sock = nmxp_receiveMessage(naqssock, &type, &buffer, &length, 0, &recv_errno);
413
	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "ret_sock = %d, type = %d, length = %d\n", ret_sock, type, length);
414
415
416
    }


417
418
419
420
    /* DAP Step 5: Send Data Request */
    precisListRequestBody.instr_id = htonl(-1);
    precisListRequestBody.datatype = htonl(NMXP_DATA_TIMESERIES);
    precisListRequestBody.type_of_channel = htonl(-1);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
421
    nmxp_sendMessage(naqssock, NMXP_MSG_PRECISLISTREQUEST, &precisListRequestBody, sizeof(NMXP_MSGBODY_PRECISLISTREQUEST));
422

423
424

    /* DAP Step 6: Receive Data until receiving a Ready message */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
425
    ret_sock = nmxp_receiveMessage(naqssock, &type, &buffer, &length, 0, &recv_errno);
426
    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "ret_sock = %d, type = %d, length = %d\n", ret_sock, type, length);
427
428
429
430
431
432
433
434
435
436
437
438
439

    while(ret_sock == NMXP_SOCKET_OK   &&    type != NMXP_MSG_READY) {
	precisChannelList = buffer;

	precisChannelList->number = ntohl(precisChannelList->number);
	for(i = 0; i < precisChannelList->number; i++) {
	    precisChannelList->channel[i].key = ntohl(precisChannelList->channel[i].key);
	    precisChannelList->channel[i].start_time = ntohl(precisChannelList->channel[i].start_time);
	    precisChannelList->channel[i].end_time = ntohl(precisChannelList->channel[i].end_time);

	    nmxp_data_to_str(str_start, precisChannelList->channel[i].start_time);
	    nmxp_data_to_str(str_end, precisChannelList->channel[i].end_time);

440
	    if(!nmxp_meta_chan_set_times(chan_list, precisChannelList->channel[i].key, precisChannelList->channel[i].start_time, precisChannelList->channel[i].end_time)) {
441
		nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CHANNEL, "Key %d not found for %s!\n", precisChannelList->channel[i].key, precisChannelList->channel[i].name);
442
443
444
	    }

	    /*
445
	    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_ANY, "%12d %12s %10d %10d %20s %20s\n",
446
447
448
		    precisChannelList->channel[i].key, precisChannelList->channel[i].name,
		    precisChannelList->channel[i].start_time, precisChannelList->channel[i].end_time,
		    str_start, str_end);
449
		    */
450
451
452
	}

	/* Receive Message */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
453
	ret_sock = nmxp_receiveMessage(naqssock, &type, &buffer, &length, 0, &recv_errno);
454
	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "ret_sock = %d, type = %d, length = %d\n", ret_sock, type, length);
455
456
457
    }


458
459
    if(flag_request_channelinfo) {
	for(iter = chan_list; iter != NULL; iter = iter->next) {
460

461
462
463
464
465
	    if(getChannelNumberFromKey(iter->key) == 0) {
		/* DAP Step 5: Send Data Request */
		channelInfoRequestBody.key = htonl(iter->key);
		channelInfoRequestBody.ignored = htonl(0);
		nmxp_sendMessage(naqssock, NMXP_MSG_CHANNELINFOREQUEST, &channelInfoRequestBody, sizeof(NMXP_MSGBODY_CHANNELINFOREQUEST));
466

467
		/* DAP Step 6: Receive Data until receiving a Ready message */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
468
		ret_sock = nmxp_receiveMessage(naqssock, &type, &buffer, &length, 0, &recv_errno);
469
		nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "ret_sock = %d, type = %d, length = %d\n", ret_sock, type, length);
470

471
472
473
		while(ret_sock == NMXP_SOCKET_OK   &&    type != NMXP_MSG_READY) {
		    channelInfo = buffer;
		    channelInfo->key = ntohl(channelInfo->key);
474

475
		    if(!nmxp_meta_chan_set_network(chan_list, channelInfo->key, channelInfo->network)) {
476
			nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CHANNEL, "Key %d (%d) not found for %s!\n", iter->key, channelInfo->key, iter->name);
477
478
		    }
		    /* Receive Message */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
479
		    ret_sock = nmxp_receiveMessage(naqssock, &type, &buffer, &length, 0, &recv_errno);
480
		    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "ret_sock = %d, type = %d, length = %d\n", ret_sock, type, length);
481
482
483
		}
	    }
	}
484
485
486
487
488
489
490
491
492
493
494
495
    }



    /* DAP Step 7: Repeat steps 5 and 6 for each data request */

    /* DAP Step 8: Send a Terminate message (optional) */
    nmxp_sendTerminateSubscription(naqssock, NMXP_SHUTDOWN_NORMAL, "Bye!");

    /* DAP Step 9: Close the socket */
    nmxp_closeSocket(naqssock);

496
497
498
    nmxp_meta_chan_print(chan_list);

    return chan_list;
499
500
}

501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522

int nmxp_raw_stream_seq_no_compare(const void *a, const void *b)
{       
    int ret = 0;
    NMXP_DATA_PROCESS **ppa = (NMXP_DATA_PROCESS **) a;
    NMXP_DATA_PROCESS **ppb = (NMXP_DATA_PROCESS **) b;
    NMXP_DATA_PROCESS *pa = *ppa;
    NMXP_DATA_PROCESS *pb = *ppb;

    if(pa && pb) {
	if(pa->seq_no > pb->seq_no) {
	    ret = 1;
	} else if (pa->seq_no < pb->seq_no) {
	    ret = -1;
	}
    } else {
	printf("Error pa and/or pb are NULL!\n");
    }

    return ret;
}

Matteo Quintiliani's avatar
Matteo Quintiliani committed
523
void nmxp_raw_stream_init(NMXP_RAW_STREAM_DATA *raw_stream_buffer, int32_t max_tolerable_latency, int timeoutrecv) {
524
525
526
527
    int j;

    raw_stream_buffer->last_seq_no_sent = -1;
    raw_stream_buffer->last_sample_time = -1.0;
528
529
    /* TODO 
     * Suppose a packet can contain 1/4 secs of data */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
530
531
    raw_stream_buffer->max_tolerable_latency = max_tolerable_latency;
    raw_stream_buffer->max_pdlist_items = max_tolerable_latency * 4;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
532
    raw_stream_buffer->timeoutrecv = timeoutrecv;
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
    raw_stream_buffer->n_pdlist = 0;
    raw_stream_buffer->pdlist = (NMXP_DATA_PROCESS **) malloc (raw_stream_buffer->max_pdlist_items * sizeof(NMXP_DATA_PROCESS *));
    for(j=0; j<raw_stream_buffer->max_pdlist_items; j++) {
	raw_stream_buffer->pdlist[j] = NULL;
    }

}


void nmxp_raw_stream_free(NMXP_RAW_STREAM_DATA *raw_stream_buffer) {
    int j;
    if(raw_stream_buffer->pdlist) {
	for(j=0; j<raw_stream_buffer->max_pdlist_items; j++) {
	    if(raw_stream_buffer->pdlist[j]) {
		free(raw_stream_buffer->pdlist[j]);
	    }
	}
	free(raw_stream_buffer->pdlist);
    }
}


Matteo Quintiliani's avatar
Matteo Quintiliani committed
555
int nmxp_raw_stream_manage(NMXP_RAW_STREAM_DATA *p, NMXP_DATA_PROCESS *a_pd, int (*p_func_pd[NMXP_MAX_FUNC_PD]) (NMXP_DATA_PROCESS *), int n_func_pd) {
556
557
558
559
560
561
562
563
564
565
    int ret = 0;
    int send_again = 1;
    int seq_no_diff;
    double time_diff;
    double latency = 0.0;
    int j=0, k=0;
    int i_func_pd;
    char str_time[200];
    NMXP_DATA_PROCESS *pd = NULL;

566
    if(a_pd) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
	/*
	if(a_pd->packet_type == 33 || a_pd->packet_type == 97) {
	    nmxp_data_log(a_pd);
	}
	*/

	/* Allocate memory for pd and copy a_pd */
	pd = (NMXP_DATA_PROCESS *) malloc (sizeof(NMXP_DATA_PROCESS));
	memcpy(pd, a_pd, sizeof(NMXP_DATA_PROCESS));
	if(a_pd->length > 0) {
	    pd->buffer = malloc(pd->length);
	    memcpy(pd->buffer, a_pd->buffer, a_pd->length);
	} else {
	    pd->buffer = NULL;
	}
	if(a_pd->nSamp *  sizeof(int) > 0) {
	    pd->pDataPtr = (int *) malloc(a_pd->nSamp * sizeof(int));
	    memcpy(pd->pDataPtr, a_pd->pDataPtr, a_pd->nSamp * sizeof(int));
	} else {
	    pd->pDataPtr = NULL;
	}
588
    }
589
590

    /* First time */
591
    if(p->last_seq_no_sent == -1  && pd) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
592
593
594
595
596
597
598
	if(p->timeoutrecv == 0) {
	    p->last_seq_no_sent = pd->seq_no - 1;
	    p->last_sample_time = pd->time;
	} else {
	    p->last_seq_no_sent = 0;
	    p->last_sample_time = 0;
	}
599
	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_RAWSTREAM, "First time nmxp_raw_stream_manage() for %s.%s.%s .\n", pd->network, pd->station, pd->channel);
600
    }
Matteo Quintiliani's avatar
Matteo Quintiliani committed
601

602
603
604
    if(p->n_pdlist > 0) {
	latency = nmxp_data_latency(p->pdlist[0]);
    }
605
606

    /* Add pd and sort array */
607
    if(p->n_pdlist >= p->max_pdlist_items
Matteo Quintiliani's avatar
Matteo Quintiliani committed
608
	    || latency >= p->max_tolerable_latency) {
609
610
611
	/* Supposing p->pdlist is ordered,
	 * handle the first item and over write it.
	 */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
612
613
614
615
616
617
618

	if(p->n_pdlist > 0) {
	    seq_no_diff = p->pdlist[0]->seq_no - p->last_seq_no_sent;
	    time_diff = p->pdlist[0]->time - p->last_sample_time;
	    latency = nmxp_data_latency(p->pdlist[0]);
	    nmxp_data_to_str(str_time, p->pdlist[0]->time);
	    if( seq_no_diff > 0) {
619
		nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_RAWSTREAM, "Force handling packet %s.%s.%d.%d (%s - %.2f sec.)  time_diff %.2fs  n_pdlist %d  lat. %.1fs!\n",
Matteo Quintiliani's avatar
Matteo Quintiliani committed
620
621
622
623
624
625
626
627
628
			p->pdlist[0]->station, p->pdlist[0]->channel, p->pdlist[0]->seq_no, p->pdlist[0]->packet_type, str_time,
			(double) p->pdlist[0]->nSamp / (double) p->pdlist[0]->sampRate, time_diff, p->n_pdlist, latency);
		for(i_func_pd=0; i_func_pd<n_func_pd; i_func_pd++) {
		    (*p_func_pd[i_func_pd])(p->pdlist[0]);
		}
		p->last_seq_no_sent = (p->pdlist[0]->seq_no);
		p->last_sample_time = (p->pdlist[0]->time + ((double) p->pdlist[0]->nSamp / (double) p->pdlist[0]->sampRate ));
	    } else {
		/* It should not occur */
629
		nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_RAWSTREAM, "NOT OCCUR! Packets %s.%s.%d.%d (%s - %.2f sec.) discarded, seq_no_diff=%d time_diff %.2fs  lat. %.1fs\n",
Matteo Quintiliani's avatar
Matteo Quintiliani committed
630
631
			p->pdlist[0]->station, p->pdlist[0]->channel, p->pdlist[0]->seq_no, p->pdlist[0]->packet_type, str_time,
			(double) p->pdlist[0]->nSamp / (double) p->pdlist[0]->sampRate, seq_no_diff, time_diff, latency);
632
633
	    }

Matteo Quintiliani's avatar
Matteo Quintiliani committed
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
	    /* Free handled packet */
	    if(p->pdlist[0]->buffer) {
		free(p->pdlist[0]->buffer);
		p->pdlist[0]->buffer = NULL;
	    }
	    if(p->pdlist[0]->pDataPtr) {
		free(p->pdlist[0]->pDataPtr);
		p->pdlist[0]->pDataPtr = NULL;
	    }
	    if(p->pdlist[0]) {
		free(p->pdlist[0]);
		p->pdlist[0] = NULL;
	    }
	    if(pd) {
		p->pdlist[0] = pd;
	    }
	} else {
	    if(pd) {
		p->n_pdlist = 1;
		p->pdlist[0] = pd;
	    }
655
656
657
	}

    } else {
658
659
660
	if(pd) {
	    p->pdlist[p->n_pdlist++] = pd;
	}
661
662
663
664
665
666
667
668
669
    }
    qsort(p->pdlist, p->n_pdlist, sizeof(NMXP_DATA_PROCESS *), nmxp_raw_stream_seq_no_compare);

    // TODO Check for packet duplication in pd->pdlist

    /* Print array, only for debugging */
    if(p->n_pdlist > 1) {
	int y = 0;
	for(y=0; y < p->n_pdlist; y++) {
670
	    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_RAWSTREAM, "%02d pkt %d\n", y, p->pdlist[y]->seq_no);
671
672
673
	}
    }

Matteo Quintiliani's avatar
Matteo Quintiliani committed
674
675
676
677
678
    // Condition for time-out (pd is NULL)
    if(!pd && p->n_pdlist > 0) {
	p->last_seq_no_sent = p->pdlist[0]->seq_no - 1;
	p->last_sample_time = p->pdlist[0]->time;
    }
679
680
681
682
683
684
685
686
687
688
689
690

    /* Manage array and execute func_pd() */
    j=0;
    send_again = 1;
    while(send_again  &&  j < p->n_pdlist) {
	send_again = 0;
	seq_no_diff = p->pdlist[j]->seq_no - p->last_seq_no_sent;
	time_diff = p->pdlist[j]->time - p->last_sample_time;
	latency = nmxp_data_latency(p->pdlist[j]);
	nmxp_data_to_str(str_time, p->pdlist[j]->time);
	if(seq_no_diff <= 0) {
	    // Duplicated packets: Discarded
691
	    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_RAWSTREAM, "Packets %s.%s.%d.%d (%s - %f sec.) discarded, seq_no_diff=%d  time_diff=%.2fs  lat %.1fs\n",
692
693
694
695
696
697
698
699
700
		    p->pdlist[j]->station, p->pdlist[j]->channel, p->pdlist[j]->seq_no, p->pdlist[j]->packet_type, str_time,
		    (double) p->pdlist[j]->nSamp /  (double) p->pdlist[j]->sampRate, seq_no_diff, time_diff, latency);
	    send_again = 1;
	    j++;
	} else if(seq_no_diff == 1) {
	    for(i_func_pd=0; i_func_pd<n_func_pd; i_func_pd++) {
		(*p_func_pd[i_func_pd])(p->pdlist[j]);
	    }
	    if(time_diff > TIME_TOLLERANCE || time_diff < -TIME_TOLLERANCE) {
701
		nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_RAWSTREAM, "%s.%s time is not correct seq_no_diff=%d time_diff=%.2fs  ([%d] %d-%d)  (%s - %.2f sec.) lat. %.1fs\n",
702
703
704
705
706
707
708
709
710
		    p->pdlist[j]->station, p->pdlist[j]->channel, 
		    seq_no_diff, time_diff, p->pdlist[j]->packet_type, p->pdlist[j]->seq_no, p->last_seq_no_sent,
		    str_time, (double) p->pdlist[j]->nSamp /  (double) p->pdlist[j]->sampRate, latency);
	    }
	    p->last_seq_no_sent = p->pdlist[j]->seq_no;
	    p->last_sample_time = (p->pdlist[j]->time + ((double) p->pdlist[j]->nSamp / (double) p->pdlist[j]->sampRate ));
	    send_again = 1;
	    j++;
	} else {
711
	    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_RAWSTREAM, "%s.%s seq_no_diff=%d ([%d] %d-%d)  j=%2d  p->n_pdlist=%2d (%s - %.2f sec.) time_diff=%.2fs  lat. %.1fs\n",
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
		    p->pdlist[j]->station, p->pdlist[j]->channel, 
		    seq_no_diff, p->pdlist[j]->packet_type, p->pdlist[j]->seq_no, p->last_seq_no_sent, j, p->n_pdlist,
		    str_time, (double) p->pdlist[j]->nSamp /  (double) p->pdlist[j]->sampRate, time_diff, latency);
	}
    }

    /* Shift and free j handled elements */
    if(j > 0) {
	for(k=0; k < p->n_pdlist; k++) {
	    if(k < j) {
		if(p->pdlist[k]->buffer) {
		    free(p->pdlist[k]->buffer);
		    p->pdlist[k]->buffer = NULL;
		}
		if(p->pdlist[k]->pDataPtr) {
		    free(p->pdlist[k]->pDataPtr);
		    p->pdlist[k]->pDataPtr = NULL;
		}
		if(p->pdlist[k]) {
		    free(p->pdlist[k]);
		    p->pdlist[k] = NULL;
		}
	    }
	    if(k + j < p->n_pdlist) {
		p->pdlist[k] = p->pdlist[k+j];
	    } else {
		p->pdlist[k] = NULL;
	    }
	}
	p->n_pdlist = p->n_pdlist - j;
    }

744
745
746
    /* TOREMOVE
    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_RAWSTREAM, "j=%d  p->n_pdlist=%d FINAL\n", j, p->n_pdlist);
       */
747
748
749
750

    return ret;
}

751
752
753
754
755
756
757

/* TODO */
int nmxp_raw_stream_manage_flush(NMXP_RAW_STREAM_DATA *p, int (*p_func_pd[NMXP_MAX_FUNC_PD]) (NMXP_DATA_PROCESS *), int n_func_pd) {
    int ret = 0;

    return ret;
}