/*! \file
 *
 * \brief Nanometrics Protocol Library
 *
 * Author:
 * 	Matteo Quintiliani
 * 	Istituto Nazionale di Geofisica e Vulcanologia - Italy
 *	quintiliani@ingv.it
 *
 * $Id: nmxp.c,v 1.61 2008-02-24 17:30:27 mtheo Exp $
 *
 */

#include "nmxp.h"

#include "config.h"

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#ifdef HAVE_ARPA_INET_H
#include <arpa/inet.h>
#endif

#ifdef HAVE_WINDOWS_H
#include "winsock2.h"
#endif

int nmxp_sendConnect(int isock) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
31
    return nmxp_sendMessage(isock, NMXP_MSG_CONNECT, NULL, 0);
Matteo Quintiliani's avatar
Start    
Matteo Quintiliani committed
32
33
}

Matteo Quintiliani's avatar
Matteo Quintiliani committed
34
35
/*
 * Send a TerminateSubscription message on socket isock.
 *
 * isock    socket the message is written to
 * reason   shutdown reason requested by the caller; it is accepted for
 *          interface compatibility but is NOT placed in the message body
 * message  optional text used as message body, may be NULL
 *
 * Returns the value returned by nmxp_sendMessage().
 *
 * NOTE(review): the body length passed on is strlen(message) - 1, i.e. the
 * last character of the text is not transmitted. This is kept as it was;
 * confirm against the protocol specification whether it is intended.
 */
int nmxp_sendTerminateSubscription(int isock, NMXP_SHUTDOWN_REASON reason, char *message) {
    size_t message_length = (message) ? strlen(message) : 0;

    (void) reason;

    /* An empty string must yield length 0: computing strlen("") - 1 on an
     * unsigned type would wrap around to a huge value. */
    return nmxp_sendMessage(isock, NMXP_MSG_TERMINATESUBSCRIPTION, message,
            (message_length > 0) ? message_length - 1 : 0);
}

Matteo Quintiliani's avatar
Matteo Quintiliani committed
38
/*
 * Receive one message from isock and interpret it as a channel list.
 *
 * isock         socket to read from
 * pchannelList  output; set to NULL on entry and, on success, to the
 *               received list with `number` and every channel key
 *               converted to host byte order. The list is the buffer
 *               handed out by nmxp_receiveMessage(); the caller owns it.
 *
 * Returns NMXP_SOCKET_OK when a channel list has been received, the error
 * returned by nmxp_receiveMessage() when the receive failed, and
 * NMXP_SOCKET_ERROR when the message is not an NMXP_MSG_CHANNELLIST or
 * carries no body. In every non-OK case *pchannelList stays NULL.
 */
int nmxp_receiveChannelList(int isock, NMXP_CHAN_LIST **pchannelList) {
    int ret;
    int i;
    int recv_errno = 0;
    NMXP_MSG_SERVER type;
    void *buffer = NULL;
    int32_t length = 0;

    *pchannelList = NULL;

    ret = nmxp_receiveMessage(isock, &type, &buffer, &length, 0, &recv_errno);

    /* type and buffer are only meaningful after a successful receive */
    if(ret != NMXP_SOCKET_OK) {
        nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_PACKETMAN, "Error receiving a message while waiting for NMXP_MSG_CHANNELLIST!\n");
        return ret;
    }

    if(type != NMXP_MSG_CHANNELLIST) {
        nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_PACKETMAN, "Type %d is not NMXP_MSG_CHANNELLIST!\n", type);
        /* The body of the unexpected message is of no use to the caller.
         * NOTE(review): assumes the buffer handed out by
         * nmxp_receiveMessage() is heap memory owned by the receiver, as
         * callers of this function already free() the returned list. */
        free(buffer);
        return NMXP_SOCKET_ERROR;
    }

    if(buffer == NULL) {
        nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_PACKETMAN, "NMXP_MSG_CHANNELLIST received without body!\n");
        return NMXP_SOCKET_ERROR;
    }

    *pchannelList = buffer;
    (*pchannelList)->number = ntohl((*pchannelList)->number);

    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CHANNEL, "number of channels %d\n", (*pchannelList)->number);

    /* TODO check: `number` comes from the network and is not yet validated
     * against the received body length. */
    for(i=0; i < (*pchannelList)->number; i++) {
        (*pchannelList)->channel[i].key = ntohl((*pchannelList)->channel[i].key);
        nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CHANNEL, "%12d %s\n",
                (*pchannelList)->channel[i].key,
                NMXP_LOG_STR((*pchannelList)->channel[i].name));
    }

    return ret;
}


75
/*
 * Build and send an AddTimeSeriesChannels message.
 *
 * The body is a sequence of 4-byte integers in network byte order:
 * the number of channels, one key per channel, shortTermCompletion,
 * out_format and buffer_flag (16 + 4 * number bytes in total).
 *
 * isock                socket the message is written to
 * channelList          channels to subscribe to
 * shortTermCompletion  value copied into the message
 * out_format           value copied into the message
 * buffer_flag          value copied into the message
 *
 * Returns the value returned by nmxp_sendMessage(), or NMXP_SOCKET_ERROR
 * when channelList is invalid or the message buffer cannot be allocated.
 */
int nmxp_sendAddTimeSeriesChannel(int isock, NMXP_CHAN_LIST_NET *channelList, int32_t shortTermCompletion, int32_t out_format, NMXP_BUFFER_FLAG buffer_flag) {
    int ret;
    int32_t buffer_length;
    char *buffer;
    int32_t app, i, disp;

    if(channelList == NULL  ||  channelList->number < 0) {
        nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Invalid channel list for AddTimeSeriesChannels.\n");
        return NMXP_SOCKET_ERROR;
    }

    buffer_length = 16 + (4 * channelList->number);
    buffer = malloc(buffer_length);
    if(buffer == NULL) {
        nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error allocating %d bytes for AddTimeSeriesChannels.\n", buffer_length);
        return NMXP_SOCKET_ERROR;
    }

    disp=0;

    /* number of channels */
    app = htonl(channelList->number);
    memcpy(&buffer[disp], &app, 4);
    disp+=4;

    /* one key per channel */
    for(i=0; i < channelList->number; i++) {
        app = htonl(channelList->channel[i].key);
        memcpy(&buffer[disp], &app, 4);
        disp+=4;
    }

    app = htonl(shortTermCompletion);
    memcpy(&buffer[disp], &app, 4);
    disp+=4;

    app = htonl(out_format);
    memcpy(&buffer[disp], &app, 4);
    disp+=4;

    app = htonl(buffer_flag);
    memcpy(&buffer[disp], &app, 4);
    disp+=4;

    ret = nmxp_sendMessage(isock, NMXP_MSG_ADDTIMESERIESCHANNELS, buffer, buffer_length);

    free(buffer);

    return ret;
}


Matteo Quintiliani's avatar
Matteo Quintiliani committed
114
/*
 * Receive one message from isock and turn it into a processed data packet.
 *
 * Compressed messages are handed to nmxp_processCompressedData(),
 * decompressed ones to nmxp_processDecompressedData(); any other message
 * type is reported through nmxp_log() as an error.
 *
 * Returns the packet produced by the processing function, or NULL when
 * the receive did not return NMXP_SOCKET_OK or the type is not handled.
 */
NMXP_DATA_PROCESS *nmxp_receiveData(int isock, NMXP_CHAN_LIST_NET *channelList, const char *network_code, int timeoutsec, int *recv_errno ) {
    NMXP_DATA_PROCESS *result = NULL;
    NMXP_MSG_SERVER msg_type;
    void *msg_body = NULL;
    int32_t msg_length;

    if(nmxp_receiveMessage(isock, &msg_type, &msg_body, &msg_length, timeoutsec, recv_errno) != NMXP_SOCKET_OK) {
        return result;
    }

    if(msg_type == NMXP_MSG_COMPRESSED) {
        nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_PACKETMAN, "Type %d is NMXP_MSG_COMPRESSED!\n", msg_type);
        result = nmxp_processCompressedData(msg_body, msg_length, channelList, network_code);
        return result;
    }

    if(msg_type == NMXP_MSG_DECOMPRESSED) {
        nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_PACKETMAN, "Type %d is NMXP_MSG_DECOMPRESSED!\n", msg_type);
        result = nmxp_processDecompressedData(msg_body, msg_length, channelList, network_code);
        return result;
    }

    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_PACKETMAN, "Type %d is not NMXP_MSG_COMPRESSED or NMXP_MSG_DECOMPRESSED!\n", msg_type);

    return result;
}

135

136
int nmxp_sendConnectRequest(int isock, char *naqs_username, char *naqs_password, int32_t connection_time) {
137
    int ret;
138
    int i;
139
    char crc32buf[100];
140
141
    char *pcrc32buf = crc32buf;
    int crc32buf_length = 0;
142
    NMXP_CONNECT_REQUEST connectRequest;
143
    int naqs_username_length, naqs_password_length;
144
145
    int32_t protocol_version = 0;

146
147
    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CRC, "%s - %s\n",
	    NMXP_LOG_STR(naqs_username), NMXP_LOG_STR(naqs_password));
148

149
150
151
    naqs_username_length = (naqs_username)? strlen(naqs_username) : 0;
    naqs_password_length = (naqs_password)? strlen(naqs_password) : 0;

152
153
154
155
156
157
158
159
    for(i=0; i < 100; i++) {
	crc32buf[i] = 0;
    }

    for(i=0; i<12; i++) {
	connectRequest.username[i] = 0;
    }
    if(naqs_username_length != 0) {
160
	strcpy(connectRequest.username, naqs_username);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
161
    }
162
163
164
165

    connectRequest.version = protocol_version;
    connectRequest.connection_time = connection_time;

166
167
168
    if(!nmxp_data_bigendianhost()) {
	nmxp_data_swap_4b ((int32_t *)&connection_time);
    }
169

170
    if(naqs_username_length == 0  &&  naqs_password_length == 0 ) {
171
172
	/* sprintf(crc32buf, "%d%d", protocol_version, connection_time); */
	/* TODO */
173
    } else if(naqs_username_length != 0  &&  naqs_password_length != 0 ) {
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
	/* sprintf(crc32buf, "%s%d%d%s", naqs_username, protocol_version,
		connection_time, naqs_password); */

	memcpy(pcrc32buf, naqs_username, naqs_username_length);
	crc32buf_length += naqs_username_length;
	pcrc32buf = crc32buf + crc32buf_length;

	memcpy(pcrc32buf, &(protocol_version), sizeof(protocol_version));
	crc32buf_length += sizeof(protocol_version);
	pcrc32buf = crc32buf + crc32buf_length;

	memcpy(pcrc32buf, &(connection_time), sizeof(connection_time));
	crc32buf_length += sizeof(connection_time);
	pcrc32buf = crc32buf + crc32buf_length;

	memcpy(pcrc32buf, naqs_password, naqs_password_length);
	crc32buf_length += naqs_password_length;
	pcrc32buf = crc32buf + crc32buf_length;

193
    } else if(naqs_username_length != 0 ) {
194
195
	/* sprintf(crc32buf, "%s%d%d", naqs_username, protocol_version, connection_time); */
	/* TODO */
196
    } else if(naqs_password_length != 0 ) {
197
198
	/* sprintf(crc32buf, "%d%d%s", protocol_version, connection_time, naqs_password); */
	/* TODO */
199
    }
200
201
202
    connectRequest.version = htonl(connectRequest.version);
    connectRequest.connection_time = htonl(connectRequest.connection_time);
    connectRequest.crc32 = htonl(crc32(0L, crc32buf, crc32buf_length));
203
204
205
206

    ret = nmxp_sendMessage(isock, NMXP_MSG_CONNECTREQUEST, &connectRequest, sizeof(NMXP_CONNECT_REQUEST));

    if(ret == NMXP_SOCKET_OK) {
207
208
	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CRC, "Send a ConnectRequest crc32buf length %d, crc32 = %d\n",
		crc32buf_length, connectRequest.crc32);
209
210
211
212
213
214
215
216
	for(i=0; i < crc32buf_length; i++) {
	    nmxp_log(NMXP_LOG_NORM_NO, NMXP_LOG_D_CRC, "%d ", crc32buf[i]);
	}
	char *pp = (char *) &connectRequest.crc32;
	for(i=0; i < sizeof(connectRequest.crc32); i++) {
	    nmxp_log(NMXP_LOG_NORM_NO, NMXP_LOG_D_CRC, "%d ", pp[i]);
	}
	nmxp_log(NMXP_LOG_NORM_NO, NMXP_LOG_D_CRC, "\n");
217
    } else {
218
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CRC, "Send a ConnectRequest.\n");
219
220
221
222
223
224
    }

    return ret;
}


225
int nmxp_readConnectionTime(int isock, int32_t *connection_time) {
226
    int ret;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
227
228
    int recv_errno;
    ret = nmxp_recv_ctrl(isock, connection_time, sizeof(int32_t), 0, &recv_errno);
229
    *connection_time = ntohl(*connection_time);
230
    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "Read connection time from socket %d.\n", *connection_time);
231
    if(ret != NMXP_SOCKET_OK) {
232
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Read connection time from socket.\n");
233
234
235
236
237
    }
    return ret;
}


238
int nmxp_waitReady(int isock) {
239
240
    int times = 0;
    int rc = NMXP_SOCKET_OK;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
241
242
243
    int32_t signature;
    int32_t type = 0;
    int32_t length;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
244
    int recv_errno;
245
246

    while(rc == NMXP_SOCKET_OK  &&  type != NMXP_MSG_READY) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
247
	rc = nmxp_recv_ctrl(isock, &signature, sizeof(signature), 0, &recv_errno);
248
249
250
	if(rc != NMXP_SOCKET_OK) return rc;
	signature = ntohl(signature);
	if(signature == 0) {
251
	    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "signature is equal to zero. receive again.\n");
Matteo Quintiliani's avatar
Matteo Quintiliani committed
252
	    rc = nmxp_recv_ctrl(isock, &signature, sizeof(signature), 0, &recv_errno);
253
254
255
	    signature = ntohl(signature);
	}
	if(signature != NMX_SIGNATURE) {
256
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "signature is not valid. signature = %d\n", signature);
257
258
259
260
	    if(signature == 200) {
		    int32_t err_length;
		    int32_t err_reason;
		    char err_buff[200];
Matteo Quintiliani's avatar
Matteo Quintiliani committed
261
		    rc = nmxp_recv_ctrl(isock, &err_length, sizeof(err_length), 0, &recv_errno);
262
		    err_length = ntohl(err_length);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
263
		    rc = nmxp_recv_ctrl(isock, &err_reason, sizeof(err_reason), 0, &recv_errno);
264
265
		    err_reason = ntohl(err_reason);
		    if(err_length > 4) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
266
			    rc = nmxp_recv_ctrl(isock, err_buff, err_length-4, 0, &recv_errno);
267
268
			    err_buff[err_length] = 0;
		    }
269
270
		    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "TerminateMessage from Server: %s (%d).\n",
			    NMXP_LOG_STR(err_buff), NMXP_LOG_STR(err_reason));
271
	    }
272
273
274
	    return NMXP_SOCKET_ERROR;
	}

Matteo Quintiliani's avatar
Matteo Quintiliani committed
275
	rc = nmxp_recv_ctrl(isock, &type, sizeof(type), 0, &recv_errno);
276
277
278
	if(rc != NMXP_SOCKET_OK) return rc;
	type = ntohl(type);
	if(type != NMXP_MSG_READY) {
279
	    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "type is not READY. type = %d\n", type);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
280
	    rc = nmxp_recv_ctrl(isock, &length, sizeof(length), 0, &recv_errno);
281
282
283
	    if(rc != NMXP_SOCKET_OK) return rc;
	    length = ntohl(length);
	    if(length > 0) {
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
		if(type == NMXP_MSG_TERMINATESUBSCRIPTION) {
		    char *str_msg = NULL;
		    char *buf_app = (char *) malloc(sizeof(char) * length);
		    int32_t reason;
		    memcpy(&reason, buf_app, sizeof(reason));
		    reason = ntohl(reason);
		    str_msg = buf_app + sizeof(reason);
		    rc = nmxp_recv_ctrl(isock, buf_app, length, 0, &recv_errno);
		    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "%d %s shutdown: %s\n",
			    reason,
			    (reason == 0)? "Normal" : (reason == 1)? "Error" : (reason == 2)? "Timeout" : "Unknown",
			    str_msg);
		    if(buf_app) {
			free(buf_app);
		    }
		} else if(length == 4) {
300
		    int32_t app;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
301
		    rc = nmxp_recv_ctrl(isock, &app, length, 0, &recv_errno);
302
303
		    if(rc != NMXP_SOCKET_OK) return rc;
		    app = ntohl(app);
304
		    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "value = %d\n", app);
305
306
		} else {
		    char *buf_app = (char *) malloc(sizeof(char) * length);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
307
		    rc = nmxp_recv_ctrl(isock, buf_app, length, 0, &recv_errno);
308
309
310
311
312
313
		    if(buf_app) {
			free(buf_app);
		    }
		}
	    }
	} else {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
314
	    rc = nmxp_recv_ctrl(isock, &length, sizeof(length), 0, &recv_errno);
315
316
317
	    if(rc != NMXP_SOCKET_OK) return rc;
	    length = ntohl(length);
	    if(length != 0) {
318
		nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "length is not equal to zero. length = %d\n", length);
319
320
321
322
323
324
		return NMXP_SOCKET_ERROR;
	    }
	}

	times++;
	if(times > 10) {
325
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "waiting_ready_message. times > 10\n");
326
327
328
329
330
331
332
333
334
	    rc = NMXP_SOCKET_ERROR;
	}

    }

    return rc;
}


335
int nmxp_sendDataRequest(int isock, int32_t key, int32_t start_time, int32_t end_time) {
336
337
338
339
340
341
342
343
344
345
    int ret;
    NMXP_DATA_REQUEST dataRequest;

    dataRequest.chan_key = htonl(key);
    dataRequest.start_time = htonl(start_time);
    dataRequest.end_time = htonl(end_time);

    ret = nmxp_sendMessage(isock, NMXP_MSG_DATAREQUEST, &dataRequest, sizeof(dataRequest));

    if(ret != NMXP_SOCKET_OK) {
346
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Send a Request message\n");
347
348
349
350
351
352
    }

    return ret;
}


353
354
355
356
357
/*
 * Connect to hostname:portnum, receive the list of channels and return
 * the channels of the requested data type sorted by name.
 *
 * Steps: open a socket, send a Connect, receive the ChannelList, select
 * the channels with nmxp_chan_getType(), send a TerminateSubscription and
 * close the socket. The complete list received from the server is freed
 * before returning.
 *
 * Returns the list produced by nmxp_chan_getType(), or NULL when the
 * socket cannot be opened, the Connect cannot be sent, no channel list is
 * received or the selection fails.
 */
NMXP_CHAN_LIST *nmxp_getAvailableChannelList(char * hostname, int portnum, NMXP_DATATYPE datatype) {
    int naqssock;
    NMXP_CHAN_LIST *channelList = NULL, *channelList_subset = NULL;
    int i;

    /* 1. Open a socket*/
    naqssock = nmxp_openSocket(hostname, portnum);
    if(naqssock == NMXP_SOCKET_ERROR) {
        return NULL;
    }

    /* 2. Send a Connect*/
    if(nmxp_sendConnect(naqssock) != NMXP_SOCKET_OK) {
        nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CHANNEL, "Error on sendConnect()\n");

    /* 3. Receive ChannelList: the list pointer is checked as well, because
     * a message of unexpected type leaves it NULL. */
    } else if(nmxp_receiveChannelList(naqssock, &channelList) != NMXP_SOCKET_OK
            ||  channelList == NULL) {
        nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CHANNEL, "Error on receiveChannelList()\n");

    } else {
        channelList_subset = nmxp_chan_getType(channelList, datatype);

        if(channelList_subset == NULL) {
            nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CHANNEL, "Error selecting channels by data type.\n");
        } else {
            nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CHANNEL, "%d / %d\n", channelList_subset->number, channelList->number);

            /* nmxp_chan_sortByKey(channelList_subset);*/
            nmxp_chan_sortByName(channelList_subset);

            for(i=0; i < channelList_subset->number; i++) {
                nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CHANNEL, "%12d %s\n",
                        channelList_subset->channel[i].key,
                        NMXP_LOG_STR(channelList_subset->channel[i].name));
            }
        }

        /* 4. Send a Request Pending (optional)*/
        /* 5. Send AddChannels*/
        /* 6. Repeat until finished: receive and handle packets*/

        /* 7. Send Terminate Subscription*/
        nmxp_sendTerminateSubscription(naqssock, NMXP_SHUTDOWN_NORMAL, "Good Bye!");
    }

    /* 8. Close the socket*/
    nmxp_closeSocket(naqssock);

    free(channelList);

    return channelList_subset;
}

408

409
/*
 * Connect to a data server and build the list of channels of the given
 * data type together with their start/end times and, on request, their
 * network code.
 *
 * hostname, portnum          server address
 * datatype                   only channels of this type are added
 * flag_request_channelinfo   when non-zero a ChannelInfoRequest is sent for
 *                            every listed key whose channel number is 0
 * datas_username/_password   credentials for the ConnectRequest
 * pchannelList               when not NULL, receives the last channel list
 *                            buffer received from the server (NULL if none)
 *
 * Returns the list built with nmxp_meta_chan_add(), or NULL when one of
 * the connection steps (open, connection time, ConnectRequest, Ready)
 * fails. The socket is closed on every path.
 *
 * NOTE(review): the buffers handed out by nmxp_receiveMessage() for the
 * intermediate messages are not released here; confirm their ownership
 * before adding free() calls.
 */
NMXP_META_CHAN_LIST *nmxp_getMetaChannelList(char * hostname, int portnum, NMXP_DATATYPE datatype, int flag_request_channelinfo, char *datas_username, char *datas_password, NMXP_CHAN_LIST **pchannelList) {
    int naqssock;
    NMXP_CHAN_PRECISLIST *precisChannelList = NULL;
    NMXP_CHAN_LIST *channelList = NULL;
    NMXP_META_CHAN_LIST *chan_list = NULL;
    NMXP_META_CHAN_LIST *iter = NULL;
    int i = 0;
    int32_t connection_time;
    int ret_sock;
    int recv_errno;

    NMXP_MSG_SERVER type;
    void *buffer = NULL;
    int32_t length;
    NMXP_MSGBODY_PRECISLISTREQUEST precisListRequestBody;
    NMXP_MSGBODY_CHANNELINFOREQUEST channelInfoRequestBody;
    NMXP_MSGBODY_CHANNELINFORESPONSE *channelInfo;

    char str_start[200], str_end[200];
    str_start[0] = 0;
    str_end[0] = 0;

    /* DAP Step 1: Open a socket */
    if( (naqssock = nmxp_openSocket(hostname, portnum)) == NMXP_SOCKET_ERROR) {
        nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error opening socket!\n");
        return NULL;
    }

    /* From here on every early return has to close the socket */

    /* DAP Step 2: Read connection time */
    if(nmxp_readConnectionTime(naqssock, &connection_time) != NMXP_SOCKET_OK) {
        nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error reading connection time from server!\n");
        nmxp_closeSocket(naqssock);
        return NULL;
    }

    /* DAP Step 3: Send a ConnectRequest */
    if(nmxp_sendConnectRequest(naqssock, datas_username, datas_password, connection_time) != NMXP_SOCKET_OK) {
        nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error sending connect request!\n");
        nmxp_closeSocket(naqssock);
        return NULL;
    }

    /* DAP Step 4: Wait for a Ready message */
    if(nmxp_waitReady(naqssock) != NMXP_SOCKET_OK) {
        nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error waiting Ready message!\n");
        nmxp_closeSocket(naqssock);
        return NULL;
    }

    /* DAP Step 5: Send Data Request (channel list) */
    nmxp_sendHeader(naqssock, NMXP_MSG_CHANNELLISTREQUEST, 0);

    /* DAP Step 6: Receive Data until receiving a Ready message */
    ret_sock = nmxp_receiveMessage(naqssock, &type, &buffer, &length, 0, &recv_errno);
    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "ret_sock = %d, type = %d, length = %d\n", ret_sock, type, length);

    while(ret_sock == NMXP_SOCKET_OK   &&    type != NMXP_MSG_READY) {
        channelList = buffer;

        channelList->number = ntohl(channelList->number);

        /* Keep only the channels of the requested data type */
        for(i = 0; i < channelList->number; i++) {
            channelList->channel[i].key = ntohl(channelList->channel[i].key);
            if(getDataTypeFromKey(channelList->channel[i].key) == datatype) {
                nmxp_meta_chan_add(&chan_list, channelList->channel[i].key, channelList->channel[i].name, 0, 0, NULL, NMXP_META_SORT_NAME);
            }
        }

        /* Receive Message */
        ret_sock = nmxp_receiveMessage(naqssock, &type, &buffer, &length, 0, &recv_errno);
        nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "ret_sock = %d, type = %d, length = %d\n", ret_sock, type, length);
    }

    if(pchannelList) {
        *pchannelList = channelList;
    }

    /* DAP Step 5: Send Data Request (precis list) */
    precisListRequestBody.instr_id = htonl(-1);
    precisListRequestBody.datatype = htonl(NMXP_DATA_TIMESERIES);
    precisListRequestBody.type_of_channel = htonl(-1);
    nmxp_sendMessage(naqssock, NMXP_MSG_PRECISLISTREQUEST, &precisListRequestBody, sizeof(NMXP_MSGBODY_PRECISLISTREQUEST));

    /* DAP Step 6: Receive Data until receiving a Ready message */
    ret_sock = nmxp_receiveMessage(naqssock, &type, &buffer, &length, 0, &recv_errno);
    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "ret_sock = %d, type = %d, length = %d\n", ret_sock, type, length);

    while(ret_sock == NMXP_SOCKET_OK   &&    type != NMXP_MSG_READY) {
        precisChannelList = buffer;

        precisChannelList->number = ntohl(precisChannelList->number);
        for(i = 0; i < precisChannelList->number; i++) {
            precisChannelList->channel[i].key = ntohl(precisChannelList->channel[i].key);
            precisChannelList->channel[i].start_time = ntohl(precisChannelList->channel[i].start_time);
            precisChannelList->channel[i].end_time = ntohl(precisChannelList->channel[i].end_time);

            /* The two strings are currently only useful while debugging */
            nmxp_data_to_str(str_start, precisChannelList->channel[i].start_time);
            nmxp_data_to_str(str_end, precisChannelList->channel[i].end_time);

            if(!nmxp_meta_chan_set_times(chan_list, precisChannelList->channel[i].key, precisChannelList->channel[i].start_time, precisChannelList->channel[i].end_time)) {
                nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CHANNEL, "Key %d not found for %s!\n",
                        precisChannelList->channel[i].key,
                        NMXP_LOG_STR(precisChannelList->channel[i].name));
            }
        }

        /* Receive Message */
        ret_sock = nmxp_receiveMessage(naqssock, &type, &buffer, &length, 0, &recv_errno);
        nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "ret_sock = %d, type = %d, length = %d\n", ret_sock, type, length);
    }

    if(flag_request_channelinfo) {
        for(iter = chan_list; iter != NULL; iter = iter->next) {

            /* Only keys whose channel number is 0 are queried */
            if(getChannelNumberFromKey(iter->key) == 0) {
                /* DAP Step 5: Send Data Request (channel info) */
                channelInfoRequestBody.key = htonl(iter->key);
                channelInfoRequestBody.ignored = htonl(0);
                nmxp_sendMessage(naqssock, NMXP_MSG_CHANNELINFOREQUEST, &channelInfoRequestBody, sizeof(NMXP_MSGBODY_CHANNELINFOREQUEST));

                /* DAP Step 6: Receive Data until receiving a Ready message */
                ret_sock = nmxp_receiveMessage(naqssock, &type, &buffer, &length, 0, &recv_errno);
                nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "ret_sock = %d, type = %d, length = %d\n", ret_sock, type, length);

                while(ret_sock == NMXP_SOCKET_OK   &&    type != NMXP_MSG_READY) {
                    channelInfo = buffer;
                    channelInfo->key = ntohl(channelInfo->key);

                    if(!nmxp_meta_chan_set_network(chan_list, channelInfo->key, channelInfo->network)) {
                        nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CHANNEL, "Key %d (%d) not found for %s!\n",
                                iter->key, channelInfo->key, NMXP_LOG_STR(iter->name));
                    }
                    /* Receive Message */
                    ret_sock = nmxp_receiveMessage(naqssock, &type, &buffer, &length, 0, &recv_errno);
                    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "ret_sock = %d, type = %d, length = %d\n", ret_sock, type, length);
                }
            }
        }
    }

    /* DAP Step 7: Repeat steps 5 and 6 for each data request */

    /* DAP Step 8: Send a Terminate message (optional) */
    nmxp_sendTerminateSubscription(naqssock, NMXP_SHUTDOWN_NORMAL, "Bye!");

    /* DAP Step 9: Close the socket */
    nmxp_closeSocket(naqssock);

    return chan_list;
}

568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583

int nmxp_raw_stream_seq_no_compare(const void *a, const void *b)
{       
    int ret = 0;
    NMXP_DATA_PROCESS **ppa = (NMXP_DATA_PROCESS **) a;
    NMXP_DATA_PROCESS **ppb = (NMXP_DATA_PROCESS **) b;
    NMXP_DATA_PROCESS *pa = *ppa;
    NMXP_DATA_PROCESS *pb = *ppb;

    if(pa && pb) {
	if(pa->seq_no > pb->seq_no) {
	    ret = 1;
	} else if (pa->seq_no < pb->seq_no) {
	    ret = -1;
	}
    } else {
584
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "Error pa and/or pb are NULL!\n");
585
586
587
588
589
    }

    return ret;
}

Matteo Quintiliani's avatar
Matteo Quintiliani committed
590
/*
 * Initialize a raw stream buffer.
 *
 * raw_stream_buffer      structure to initialize
 * max_tolerable_latency  stored as is; the packet list is sized to four
 *                        times this value (a packet is supposed to hold
 *                        1/4 second of data)
 * timeoutrecv            stored as is
 *
 * The packet list is allocated with every slot set to NULL. When the list
 * cannot be allocated (or the computed size is not positive) an error is
 * logged and the buffer is left with pdlist == NULL and
 * max_pdlist_items == 0.
 */
void nmxp_raw_stream_init(NMXP_RAW_STREAM_DATA *raw_stream_buffer, int32_t max_tolerable_latency, int timeoutrecv) {
    int j;

    raw_stream_buffer->last_seq_no_sent = -1;
    raw_stream_buffer->last_sample_time = -1.0;
    /* TODO
     * Suppose a packet can contain 1/4 secs of data */
    raw_stream_buffer->max_tolerable_latency = max_tolerable_latency;
    raw_stream_buffer->max_pdlist_items = max_tolerable_latency * 4;
    raw_stream_buffer->timeoutrecv = timeoutrecv;
    raw_stream_buffer->n_pdlist = 0;
    raw_stream_buffer->pdlist = NULL;

    if(raw_stream_buffer->max_pdlist_items > 0) {
        raw_stream_buffer->pdlist = (NMXP_DATA_PROCESS **) malloc (raw_stream_buffer->max_pdlist_items * sizeof(NMXP_DATA_PROCESS *));
    }

    if(raw_stream_buffer->pdlist == NULL) {
        nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_RAWSTREAM, "Error allocating the packet list of the raw stream buffer!\n");
        /* keep size and pointer consistent with each other */
        raw_stream_buffer->max_pdlist_items = 0;
        return;
    }

    for(j=0; j<raw_stream_buffer->max_pdlist_items; j++) {
        raw_stream_buffer->pdlist[j] = NULL;
    }
}


/*
 * Release everything held by a raw stream buffer.
 *
 * Every packet still stored in the list is freed together with its
 * `buffer` and `pDataPtr` arrays (both are allocated per packet when the
 * packet is stored), then the list itself is freed. Afterwards pdlist is
 * NULL and n_pdlist is 0, so calling the function again is harmless.
 * The structure pointed to by raw_stream_buffer is not freed.
 */
void nmxp_raw_stream_free(NMXP_RAW_STREAM_DATA *raw_stream_buffer) {
    int j;

    if(raw_stream_buffer == NULL  ||  raw_stream_buffer->pdlist == NULL) {
        return;
    }

    for(j=0; j<raw_stream_buffer->max_pdlist_items; j++) {
        NMXP_DATA_PROCESS *pd = raw_stream_buffer->pdlist[j];
        if(pd) {
            free(pd->buffer);
            free(pd->pDataPtr);
            free(pd);
            raw_stream_buffer->pdlist[j] = NULL;
        }
    }

    free(raw_stream_buffer->pdlist);
    raw_stream_buffer->pdlist = NULL;
    raw_stream_buffer->n_pdlist = 0;
}


Matteo Quintiliani's avatar
Matteo Quintiliani committed
622
int nmxp_raw_stream_manage(NMXP_RAW_STREAM_DATA *p, NMXP_DATA_PROCESS *a_pd, int (*p_func_pd[NMXP_MAX_FUNC_PD]) (NMXP_DATA_PROCESS *), int n_func_pd) {
623
624
625
626
627
628
629
630
631
632
    int ret = 0;
    int send_again = 1;
    int seq_no_diff;
    double time_diff;
    double latency = 0.0;
    int j=0, k=0;
    int i_func_pd;
    char str_time[200];
    NMXP_DATA_PROCESS *pd = NULL;

633
    /* Allocate pd copy value from a_pd */
634
    if(a_pd) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
	/*
	if(a_pd->packet_type == 33 || a_pd->packet_type == 97) {
	    nmxp_data_log(a_pd);
	}
	*/

	/* Allocate memory for pd and copy a_pd */
	pd = (NMXP_DATA_PROCESS *) malloc (sizeof(NMXP_DATA_PROCESS));
	memcpy(pd, a_pd, sizeof(NMXP_DATA_PROCESS));
	if(a_pd->length > 0) {
	    pd->buffer = malloc(pd->length);
	    memcpy(pd->buffer, a_pd->buffer, a_pd->length);
	} else {
	    pd->buffer = NULL;
	}
	if(a_pd->nSamp *  sizeof(int) > 0) {
	    pd->pDataPtr = (int *) malloc(a_pd->nSamp * sizeof(int));
	    memcpy(pd->pDataPtr, a_pd->pDataPtr, a_pd->nSamp * sizeof(int));
	} else {
	    pd->pDataPtr = NULL;
	}
656
    }
657
658

    /* First time */
659
    if(p->last_seq_no_sent == -1  &&  pd) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
660
661
662
663
664
665
666
	if(p->timeoutrecv == 0) {
	    p->last_seq_no_sent = pd->seq_no - 1;
	    p->last_sample_time = pd->time;
	} else {
	    p->last_seq_no_sent = 0;
	    p->last_sample_time = 0;
	}
667
	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_RAWSTREAM,
668
669
		"First time nmxp_raw_stream_manage() for %s.%s.%s .\n",
		NMXP_LOG_STR(pd->network), NMXP_LOG_STR(pd->station), NMXP_LOG_STR(pd->channel));
670
    }
Matteo Quintiliani's avatar
Matteo Quintiliani committed
671

672
673
674
    if(p->n_pdlist > 0) {
	latency = nmxp_data_latency(p->pdlist[0]);
    }
675
676

    /* Add pd and sort array */
677
678
679
680
    if( (p->n_pdlist >= p->max_pdlist_items
	    || latency >= p->max_tolerable_latency) &&
	    p->timeoutrecv <= 0
	    ) {
681
682
683
	/* Supposing p->pdlist is ordered,
	 * handle the first item and over write it.
	 */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
684
685
686
687
688
689
690

	if(p->n_pdlist > 0) {
	    seq_no_diff = p->pdlist[0]->seq_no - p->last_seq_no_sent;
	    time_diff = p->pdlist[0]->time - p->last_sample_time;
	    latency = nmxp_data_latency(p->pdlist[0]);
	    nmxp_data_to_str(str_time, p->pdlist[0]->time);
	    if( seq_no_diff > 0) {
691
692
		nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_RAWSTREAM,
			"Force handling packet %s.%s.%d.%d (%s - %.2f sec.)  time_diff %.2fs  n_pdlist %d  lat. %.1fs!\n",
693
694
695
			NMXP_LOG_STR(p->pdlist[0]->station), NMXP_LOG_STR(p->pdlist[0]->channel),
			p->pdlist[0]->seq_no, p->pdlist[0]->packet_type,
			NMXP_LOG_STR(str_time),
Matteo Quintiliani's avatar
Matteo Quintiliani committed
696
697
698
699
700
701
702
703
			(double) p->pdlist[0]->nSamp / (double) p->pdlist[0]->sampRate, time_diff, p->n_pdlist, latency);
		for(i_func_pd=0; i_func_pd<n_func_pd; i_func_pd++) {
		    (*p_func_pd[i_func_pd])(p->pdlist[0]);
		}
		p->last_seq_no_sent = (p->pdlist[0]->seq_no);
		p->last_sample_time = (p->pdlist[0]->time + ((double) p->pdlist[0]->nSamp / (double) p->pdlist[0]->sampRate ));
	    } else {
		/* It should not occur */
704
705
		nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_RAWSTREAM,
			"NOT OCCUR! Packets %s.%s.%d.%d (%s - %.2f sec.) discarded, seq_no_diff=%d time_diff %.2fs  lat. %.1fs\n",
706
707
			NMXP_LOG_STR(p->pdlist[0]->station), NMXP_LOG_STR(p->pdlist[0]->channel),
			p->pdlist[0]->seq_no, p->pdlist[0]->packet_type, NMXP_LOG_STR(str_time),
Matteo Quintiliani's avatar
Matteo Quintiliani committed
708
			(double) p->pdlist[0]->nSamp / (double) p->pdlist[0]->sampRate, seq_no_diff, time_diff, latency);
709
710
	    }

Matteo Quintiliani's avatar
Matteo Quintiliani committed
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
	    /* Free handled packet */
	    if(p->pdlist[0]->buffer) {
		free(p->pdlist[0]->buffer);
		p->pdlist[0]->buffer = NULL;
	    }
	    if(p->pdlist[0]->pDataPtr) {
		free(p->pdlist[0]->pDataPtr);
		p->pdlist[0]->pDataPtr = NULL;
	    }
	    if(p->pdlist[0]) {
		free(p->pdlist[0]);
		p->pdlist[0] = NULL;
	    }
	    if(pd) {
		p->pdlist[0] = pd;
	    }
	} else {
	    if(pd) {
		p->n_pdlist = 1;
		p->pdlist[0] = pd;
	    }
732
733
734
	}

    } else {
735
736
737
	if(pd) {
	    p->pdlist[p->n_pdlist++] = pd;
	}
738
739
740
    }
    qsort(p->pdlist, p->n_pdlist, sizeof(NMXP_DATA_PROCESS *), nmxp_raw_stream_seq_no_compare);

741
    /* TODO Check for packet duplication in pd->pdlist*/
742
743
744
745
746

    /* Print array, only for debugging */
    if(p->n_pdlist > 1) {
	int y = 0;
	for(y=0; y < p->n_pdlist; y++) {
747
	    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_RAWSTREAM, "%02d pkt %d\n", y, p->pdlist[y]->seq_no);
748
749
750
	}
    }

751
    /* Condition for time-out (pd is NULL) */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
752
753
754
755
    if(!pd && p->n_pdlist > 0) {
	p->last_seq_no_sent = p->pdlist[0]->seq_no - 1;
	p->last_sample_time = p->pdlist[0]->time;
    }
756
757
758
759
760
761
762
763
764
765
766

    /* Manage array and execute func_pd() */
    j=0;
    send_again = 1;
    while(send_again  &&  j < p->n_pdlist) {
	send_again = 0;
	seq_no_diff = p->pdlist[j]->seq_no - p->last_seq_no_sent;
	time_diff = p->pdlist[j]->time - p->last_sample_time;
	latency = nmxp_data_latency(p->pdlist[j]);
	nmxp_data_to_str(str_time, p->pdlist[j]->time);
	if(seq_no_diff <= 0) {
767
	    /* Duplicated packets: Discarded*/
768
769
	    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_RAWSTREAM,
		    "Packets %s.%s.%d.%d (%s - %f sec.) discarded, seq_no_diff=%d  time_diff=%.2fs  lat %.1fs\n",
770
771
772
		    NMXP_LOG_STR(p->pdlist[j]->station), NMXP_LOG_STR(p->pdlist[j]->channel),
		    p->pdlist[j]->seq_no, p->pdlist[j]->packet_type,
		    NMXP_LOG_STR(str_time),
773
774
775
776
777
778
779
780
		    (double) p->pdlist[j]->nSamp /  (double) p->pdlist[j]->sampRate, seq_no_diff, time_diff, latency);
	    send_again = 1;
	    j++;
	} else if(seq_no_diff == 1) {
	    for(i_func_pd=0; i_func_pd<n_func_pd; i_func_pd++) {
		(*p_func_pd[i_func_pd])(p->pdlist[j]);
	    }
	    if(time_diff > TIME_TOLLERANCE || time_diff < -TIME_TOLLERANCE) {
781
782
		nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_RAWSTREAM,
			"%s.%s time is not correct seq_no_diff=%d time_diff=%.2fs  ([%d] %d-%d)  (%s - %.2f sec.) lat. %.1fs\n",
783
		    NMXP_LOG_STR(p->pdlist[j]->station), NMXP_LOG_STR(p->pdlist[j]->channel), 
784
785
786
787
788
789
790
791
		    seq_no_diff, time_diff, p->pdlist[j]->packet_type, p->pdlist[j]->seq_no, p->last_seq_no_sent,
		    str_time, (double) p->pdlist[j]->nSamp /  (double) p->pdlist[j]->sampRate, latency);
	    }
	    p->last_seq_no_sent = p->pdlist[j]->seq_no;
	    p->last_sample_time = (p->pdlist[j]->time + ((double) p->pdlist[j]->nSamp / (double) p->pdlist[j]->sampRate ));
	    send_again = 1;
	    j++;
	} else {
792
793
	    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_RAWSTREAM,
		    "%s.%s seq_no_diff=%d ([%d] %d-%d)  j=%2d  p->n_pdlist=%2d (%s - %.2f sec.) time_diff=%.2fs  lat. %.1fs\n",
794
		    NMXP_LOG_STR(p->pdlist[j]->station), NMXP_LOG_STR(p->pdlist[j]->channel), 
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
		    seq_no_diff, p->pdlist[j]->packet_type, p->pdlist[j]->seq_no, p->last_seq_no_sent, j, p->n_pdlist,
		    str_time, (double) p->pdlist[j]->nSamp /  (double) p->pdlist[j]->sampRate, time_diff, latency);
	}
    }

    /* Shift and free j handled elements */
    if(j > 0) {
	for(k=0; k < p->n_pdlist; k++) {
	    if(k < j) {
		if(p->pdlist[k]->buffer) {
		    free(p->pdlist[k]->buffer);
		    p->pdlist[k]->buffer = NULL;
		}
		if(p->pdlist[k]->pDataPtr) {
		    free(p->pdlist[k]->pDataPtr);
		    p->pdlist[k]->pDataPtr = NULL;
		}
		if(p->pdlist[k]) {
		    free(p->pdlist[k]);
		    p->pdlist[k] = NULL;
		}
	    }
	    if(k + j < p->n_pdlist) {
		p->pdlist[k] = p->pdlist[k+j];
	    } else {
		p->pdlist[k] = NULL;
	    }
	}
	p->n_pdlist = p->n_pdlist - j;
    }

826
827
828
    /* TOREMOVE
    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_RAWSTREAM, "j=%d  p->n_pdlist=%d FINAL\n", j, p->n_pdlist);
       */
829
830
831
832

    return ret;
}

833
834
835
836
837
838
839

/* TODO */
/*
 * Placeholder: flushing of the raw stream buffer is not implemented yet.
 * None of the arguments is used and 0 is always returned.
 */
int nmxp_raw_stream_manage_flush(NMXP_RAW_STREAM_DATA *p, int (*p_func_pd[NMXP_MAX_FUNC_PD]) (NMXP_DATA_PROCESS *), int n_func_pd) {
    (void) p;
    (void) p_func_pd;
    (void) n_func_pd;

    return 0;
}