/*! \file
 *
 * \brief Nanometrics Protocol Library
 *
 * Author:
 * 	Matteo Quintiliani
 * 	Istituto Nazionale di Geofisica e Vulcanologia - Italy
 *	quintiliani@ingv.it
 *
 * $Id: nmxp.c,v 1.59 2008-02-24 15:10:31 mtheo Exp $
 *
 */

#include "nmxp.h"

#include "config.h"

#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#ifdef HAVE_ARPA_INET_H
#include <arpa/inet.h>
#endif

#ifdef HAVE_WINDOWS_H
#include "winsock2.h"
#endif

int nmxp_sendConnect(int isock) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
31
    return nmxp_sendMessage(isock, NMXP_MSG_CONNECT, NULL, 0);
Matteo Quintiliani's avatar
Start    
Matteo Quintiliani committed
32
33
}

/*! \brief Send a TerminateSubscription message.
 *
 * The body is the text of \p message; the length handed to
 * nmxp_sendMessage() is strlen(message) - 1, as in the original code.
 * NOTE(review): the trailing "- 1" drops the last character of the
 * message - confirm against the protocol before changing it.
 *
 * \param isock   Socket descriptor passed to nmxp_sendMessage().
 * \param reason  Shutdown reason. Accepted for interface compatibility;
 *                this implementation does not transmit it.
 * \param message Text to send, or NULL to send an empty body.
 *
 * \return The value returned by nmxp_sendMessage().
 */
int nmxp_sendTerminateSubscription(int isock, NMXP_SHUTDOWN_REASON reason, char *message) {
    size_t body_length = 0;

    (void) reason;

    if(message != NULL) {
        size_t message_length = strlen(message);
        /* Guard the subtraction: for an empty string strlen()-1 would wrap
         * around to a huge unsigned value. */
        if(message_length > 0) {
            body_length = message_length - 1;
        }
    }

    return nmxp_sendMessage(isock, NMXP_MSG_TERMINATESUBSCRIPTION, message, body_length);
}

/*! \brief Receive a ChannelList message and convert it to host byte order.
 *
 * On success *pchannelList points to the buffer filled in by
 * nmxp_receiveMessage(), reinterpreted as NMXP_CHAN_LIST, with the channel
 * count and every channel key converted with ntohl(). The caller owns that
 * buffer.
 *
 * \param isock        Socket descriptor to read from.
 * \param pchannelList Output pointer; set to NULL on any failure.
 *
 * \return The value returned by nmxp_receiveMessage() when the message could
 *         not be received or was decoded successfully; NMXP_SOCKET_ERROR when
 *         the message has an unexpected type or its declared channel count
 *         does not fit in the received length.
 */
int nmxp_receiveChannelList(int isock, NMXP_CHAN_LIST **pchannelList) {
    const size_t header_size = offsetof(NMXP_CHAN_LIST, channel);
    int ret;
    int i;
    int recv_errno;
    NMXP_MSG_SERVER type;
    void *buffer = NULL;
    int32_t length = 0;
    NMXP_CHAN_LIST *list;

    *pchannelList = NULL;

    ret = nmxp_receiveMessage(isock, &type, &buffer, &length, 0, &recv_errno);
    if(ret != NMXP_SOCKET_OK) {
        /* type, buffer and length are not meaningful after a failed receive */
        return ret;
    }

    if(type != NMXP_MSG_CHANNELLIST) {
        nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_PACKETMAN, "Type %d is not NMXP_MSG_CHANNELLIST!\n", type);
        free(buffer);
        return NMXP_SOCKET_ERROR;
    }

    /* The body must at least hold the channel counter */
    if(buffer == NULL || length < 0 || (size_t) length < header_size) {
        nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_PACKETMAN, "ChannelList message is too short (length %d)!\n", length);
        free(buffer);
        return NMXP_SOCKET_ERROR;
    }

    list = buffer;
    list->number = ntohl(list->number);

    /* The counter comes from the network: make sure every channel entry it
     * announces lies inside the received buffer before touching it. */
    if(list->number < 0
            || (size_t) list->number > ((size_t) length - header_size) / sizeof(list->channel[0])) {
        nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_PACKETMAN, "ChannelList message declares %d channels but is only %d bytes long!\n", list->number, length);
        free(buffer);
        return NMXP_SOCKET_ERROR;
    }

    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CHANNEL, "number of channels %d\n", list->number);

    for(i = 0; i < list->number; i++) {
        list->channel[i].key = ntohl(list->channel[i].key);
        nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CHANNEL, "%12d %s\n", list->channel[i].key, list->channel[i].name);
    }

    *pchannelList = list;

    return ret;
}


/*! \brief Send an AddTimeSeriesChannels message.
 *
 * The body is built as consecutive 4-byte big-endian integers: the number of
 * channels, one key per channel, then shortTermCompletion, out_format and
 * buffer_flag (16 + 4 * number bytes in total).
 *
 * \param isock               Socket descriptor passed to nmxp_sendMessage().
 * \param channelList         Channels whose keys are sent.
 * \param shortTermCompletion Value sent after the channel keys.
 * \param out_format          Value sent after shortTermCompletion.
 * \param buffer_flag         Value sent last.
 *
 * \return The value returned by nmxp_sendMessage(), or NMXP_SOCKET_ERROR when
 *         the channel count is invalid or the body cannot be allocated.
 */
int nmxp_sendAddTimeSeriesChannel(int isock, NMXP_CHAN_LIST_NET *channelList, int32_t shortTermCompletion, int32_t out_format, NMXP_BUFFER_FLAG buffer_flag) {
    int ret;
    int32_t buffer_length;
    char *buffer;
    int32_t app, i, disp;

    /* Reject counts that would make the length computation negative or
     * overflow int32_t; the fixed part of the body is 16 bytes. */
    if(channelList->number < 0 || channelList->number > (INT32_MAX - 16) / 4) {
        nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Invalid number of channels %d!\n", channelList->number);
        return NMXP_SOCKET_ERROR;
    }

    buffer_length = 16 + (4 * channelList->number);
    buffer = malloc(buffer_length);
    if(buffer == NULL) {
        nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Unable to allocate %d bytes for AddTimeSeriesChannels!\n", buffer_length);
        return NMXP_SOCKET_ERROR;
    }

    disp = 0;

    app = htonl(channelList->number);
    memcpy(&buffer[disp], &app, 4);
    disp += 4;

    for(i = 0; i < channelList->number; i++) {
        app = htonl(channelList->channel[i].key);
        memcpy(&buffer[disp], &app, 4);
        disp += 4;
    }

    app = htonl(shortTermCompletion);
    memcpy(&buffer[disp], &app, 4);
    disp += 4;

    app = htonl(out_format);
    memcpy(&buffer[disp], &app, 4);
    disp += 4;

    app = htonl(buffer_flag);
    memcpy(&buffer[disp], &app, 4);
    disp += 4;

    ret = nmxp_sendMessage(isock, NMXP_MSG_ADDTIMESERIESCHANNELS, buffer, buffer_length);

    free(buffer);

    return ret;
}


/*! \brief Receive one data message and hand it to the matching decoder.
 *
 * Compressed messages go to nmxp_processCompressedData(), decompressed ones
 * to nmxp_processDecompressedData(). Any other type is logged and its
 * buffer released.
 *
 * NOTE(review): the buffer passed to the two decoders is not released here;
 * presumably the decoders (or the returned structure) take ownership of it -
 * confirm against their implementation.
 *
 * \param isock        Socket descriptor to read from.
 * \param channelList  Forwarded to the decoder.
 * \param network_code Forwarded to the decoder.
 * \param timeoutsec   Forwarded to nmxp_receiveMessage().
 * \param recv_errno   Forwarded to nmxp_receiveMessage().
 *
 * \return The decoder result, or NULL when nothing was received or the
 *         message type is not handled.
 */
NMXP_DATA_PROCESS *nmxp_receiveData(int isock, NMXP_CHAN_LIST_NET *channelList, const char *network_code, int timeoutsec, int *recv_errno ) {
    NMXP_MSG_SERVER type;
    void *buffer = NULL;
    int32_t length;
    NMXP_DATA_PROCESS *pd = NULL;

    if(nmxp_receiveMessage(isock, &type, &buffer, &length, timeoutsec, recv_errno) != NMXP_SOCKET_OK) {
        return NULL;
    }

    if(type == NMXP_MSG_COMPRESSED) {
        nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_PACKETMAN, "Type %d is NMXP_MSG_COMPRESSED!\n", type);
        pd = nmxp_processCompressedData(buffer, length, channelList, network_code);
    } else if(type == NMXP_MSG_DECOMPRESSED) {
        nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_PACKETMAN, "Type %d is NMXP_MSG_DECOMPRESSED!\n", type);
        pd = nmxp_processDecompressedData(buffer, length, channelList, network_code);
    } else {
        nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_PACKETMAN, "Type %d is not NMXP_MSG_COMPRESSED or NMXP_MSG_DECOMPRESSED!\n", type);
        /* Nobody consumes the body of an unexpected message: release it */
        free(buffer);
    }

    return pd;
}

int nmxp_sendConnectRequest(int isock, char *naqs_username, char *naqs_password, int32_t connection_time) {
135
    int ret;
136
    int i;
137
    char crc32buf[100];
138
139
    char *pcrc32buf = crc32buf;
    int crc32buf_length = 0;
140
    NMXP_CONNECT_REQUEST connectRequest;
141
    int naqs_username_length, naqs_password_length;
142
143
144
    int32_t protocol_version = 0;

    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CRC, "%s - %s\n", naqs_username, naqs_password);
145

146
147
148
    naqs_username_length = (naqs_username)? strlen(naqs_username) : 0;
    naqs_password_length = (naqs_password)? strlen(naqs_password) : 0;

149
150
151
152
153
154
155
156
    for(i=0; i < 100; i++) {
	crc32buf[i] = 0;
    }

    for(i=0; i<12; i++) {
	connectRequest.username[i] = 0;
    }
    if(naqs_username_length != 0) {
157
	strcpy(connectRequest.username, naqs_username);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
158
    }
159
160
161
162

    connectRequest.version = protocol_version;
    connectRequest.connection_time = connection_time;

163
164
165
    if(!nmxp_data_bigendianhost()) {
	nmxp_data_swap_4b ((int32_t *)&connection_time);
    }
166

167
    if(naqs_username_length == 0  &&  naqs_password_length == 0 ) {
168
169
	/* sprintf(crc32buf, "%d%d", protocol_version, connection_time); */
	/* TODO */
170
    } else if(naqs_username_length != 0  &&  naqs_password_length != 0 ) {
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
	/* sprintf(crc32buf, "%s%d%d%s", naqs_username, protocol_version,
		connection_time, naqs_password); */

	memcpy(pcrc32buf, naqs_username, naqs_username_length);
	crc32buf_length += naqs_username_length;
	pcrc32buf = crc32buf + crc32buf_length;

	memcpy(pcrc32buf, &(protocol_version), sizeof(protocol_version));
	crc32buf_length += sizeof(protocol_version);
	pcrc32buf = crc32buf + crc32buf_length;

	memcpy(pcrc32buf, &(connection_time), sizeof(connection_time));
	crc32buf_length += sizeof(connection_time);
	pcrc32buf = crc32buf + crc32buf_length;

	memcpy(pcrc32buf, naqs_password, naqs_password_length);
	crc32buf_length += naqs_password_length;
	pcrc32buf = crc32buf + crc32buf_length;

190
    } else if(naqs_username_length != 0 ) {
191
192
	/* sprintf(crc32buf, "%s%d%d", naqs_username, protocol_version, connection_time); */
	/* TODO */
193
    } else if(naqs_password_length != 0 ) {
194
195
	/* sprintf(crc32buf, "%d%d%s", protocol_version, connection_time, naqs_password); */
	/* TODO */
196
    }
197
198
199
    connectRequest.version = htonl(connectRequest.version);
    connectRequest.connection_time = htonl(connectRequest.connection_time);
    connectRequest.crc32 = htonl(crc32(0L, crc32buf, crc32buf_length));
200
201
202
203

    ret = nmxp_sendMessage(isock, NMXP_MSG_CONNECTREQUEST, &connectRequest, sizeof(NMXP_CONNECT_REQUEST));

    if(ret == NMXP_SOCKET_OK) {
204
205
206
207
208
209
210
211
212
	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CRC, "Send a ConnectRequest crc32buf length %d, crc32 = %d\n", crc32buf_length, connectRequest.crc32);
	for(i=0; i < crc32buf_length; i++) {
	    nmxp_log(NMXP_LOG_NORM_NO, NMXP_LOG_D_CRC, "%d ", crc32buf[i]);
	}
	char *pp = (char *) &connectRequest.crc32;
	for(i=0; i < sizeof(connectRequest.crc32); i++) {
	    nmxp_log(NMXP_LOG_NORM_NO, NMXP_LOG_D_CRC, "%d ", pp[i]);
	}
	nmxp_log(NMXP_LOG_NORM_NO, NMXP_LOG_D_CRC, "\n");
213
    } else {
214
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CRC, "Send a ConnectRequest.\n");
215
216
217
218
219
220
    }

    return ret;
}


int nmxp_readConnectionTime(int isock, int32_t *connection_time) {
222
    int ret;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
223
224
    int recv_errno;
    ret = nmxp_recv_ctrl(isock, connection_time, sizeof(int32_t), 0, &recv_errno);
225
    *connection_time = ntohl(*connection_time);
226
    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "Read connection time from socket %d.\n", *connection_time);
227
    if(ret != NMXP_SOCKET_OK) {
228
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Read connection time from socket.\n");
229
230
231
232
233
    }
    return ret;
}


int nmxp_waitReady(int isock) {
235
236
    int times = 0;
    int rc = NMXP_SOCKET_OK;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
237
238
239
    int32_t signature;
    int32_t type = 0;
    int32_t length;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
240
    int recv_errno;
241
242

    while(rc == NMXP_SOCKET_OK  &&  type != NMXP_MSG_READY) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
243
	rc = nmxp_recv_ctrl(isock, &signature, sizeof(signature), 0, &recv_errno);
244
245
246
	if(rc != NMXP_SOCKET_OK) return rc;
	signature = ntohl(signature);
	if(signature == 0) {
247
	    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "signature is equal to zero. receive again.\n");
Matteo Quintiliani's avatar
Matteo Quintiliani committed
248
	    rc = nmxp_recv_ctrl(isock, &signature, sizeof(signature), 0, &recv_errno);
249
250
251
	    signature = ntohl(signature);
	}
	if(signature != NMX_SIGNATURE) {
252
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "signature is not valid. signature = %d\n", signature);
253
254
255
256
	    if(signature == 200) {
		    int32_t err_length;
		    int32_t err_reason;
		    char err_buff[200];
Matteo Quintiliani's avatar
Matteo Quintiliani committed
257
		    rc = nmxp_recv_ctrl(isock, &err_length, sizeof(err_length), 0, &recv_errno);
258
		    err_length = ntohl(err_length);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
259
		    rc = nmxp_recv_ctrl(isock, &err_reason, sizeof(err_reason), 0, &recv_errno);
260
261
		    err_reason = ntohl(err_reason);
		    if(err_length > 4) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
262
			    rc = nmxp_recv_ctrl(isock, err_buff, err_length-4, 0, &recv_errno);
263
264
			    err_buff[err_length] = 0;
		    }
265
		    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "TerminateMessage from Server: %s (%d).\n", err_buff, err_reason);
266
	    }
267
268
269
	    return NMXP_SOCKET_ERROR;
	}

Matteo Quintiliani's avatar
Matteo Quintiliani committed
270
	rc = nmxp_recv_ctrl(isock, &type, sizeof(type), 0, &recv_errno);
271
272
273
	if(rc != NMXP_SOCKET_OK) return rc;
	type = ntohl(type);
	if(type != NMXP_MSG_READY) {
274
	    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "type is not READY. type = %d\n", type);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
275
	    rc = nmxp_recv_ctrl(isock, &length, sizeof(length), 0, &recv_errno);
276
277
278
	    if(rc != NMXP_SOCKET_OK) return rc;
	    length = ntohl(length);
	    if(length > 0) {
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
		if(type == NMXP_MSG_TERMINATESUBSCRIPTION) {
		    char *str_msg = NULL;
		    char *buf_app = (char *) malloc(sizeof(char) * length);
		    int32_t reason;
		    memcpy(&reason, buf_app, sizeof(reason));
		    reason = ntohl(reason);
		    str_msg = buf_app + sizeof(reason);
		    rc = nmxp_recv_ctrl(isock, buf_app, length, 0, &recv_errno);
		    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "%d %s shutdown: %s\n",
			    reason,
			    (reason == 0)? "Normal" : (reason == 1)? "Error" : (reason == 2)? "Timeout" : "Unknown",
			    str_msg);
		    if(buf_app) {
			free(buf_app);
		    }
		} else if(length == 4) {
295
		    int32_t app;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
296
		    rc = nmxp_recv_ctrl(isock, &app, length, 0, &recv_errno);
297
298
		    if(rc != NMXP_SOCKET_OK) return rc;
		    app = ntohl(app);
299
		    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "value = %d\n", app);
300
301
		} else {
		    char *buf_app = (char *) malloc(sizeof(char) * length);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
302
		    rc = nmxp_recv_ctrl(isock, buf_app, length, 0, &recv_errno);
303
304
305
306
307
308
		    if(buf_app) {
			free(buf_app);
		    }
		}
	    }
	} else {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
309
	    rc = nmxp_recv_ctrl(isock, &length, sizeof(length), 0, &recv_errno);
310
311
312
	    if(rc != NMXP_SOCKET_OK) return rc;
	    length = ntohl(length);
	    if(length != 0) {
313
		nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "length is not equal to zero. length = %d\n", length);
314
315
316
317
318
319
		return NMXP_SOCKET_ERROR;
	    }
	}

	times++;
	if(times > 10) {
320
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "waiting_ready_message. times > 10\n");
321
322
323
324
325
326
327
328
329
	    rc = NMXP_SOCKET_ERROR;
	}

    }

    return rc;
}


int nmxp_sendDataRequest(int isock, int32_t key, int32_t start_time, int32_t end_time) {
331
332
333
334
335
336
337
338
339
340
    int ret;
    NMXP_DATA_REQUEST dataRequest;

    dataRequest.chan_key = htonl(key);
    dataRequest.start_time = htonl(start_time);
    dataRequest.end_time = htonl(end_time);

    ret = nmxp_sendMessage(isock, NMXP_MSG_DATAREQUEST, &dataRequest, sizeof(dataRequest));

    if(ret != NMXP_SOCKET_OK) {
341
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Send a Request message\n");
342
343
344
345
346
347
    }

    return ret;
}


/*! \brief Connect to a server and return the channels of a given data type.
 *
 * Opens a socket, sends a Connect, receives the channel list, filters it
 * with nmxp_chan_getType(), sorts the result by name, sends a
 * TerminateSubscription and closes the socket. The full channel list is
 * released before returning.
 *
 * \param hostname Server host name.
 * \param portnum  Server port.
 * \param datatype Data type passed to nmxp_chan_getType().
 *
 * \return The list returned by nmxp_chan_getType(), or NULL when any step
 *         failed. NOTE(review): presumably the caller must free it - confirm
 *         against nmxp_chan_getType().
 */
NMXP_CHAN_LIST *nmxp_getAvailableChannelList(char * hostname, int portnum, NMXP_DATATYPE datatype) {
    int naqssock;
    NMXP_CHAN_LIST *channelList = NULL, *channelList_subset = NULL;
    int i;

    /* 1. Open a socket*/
    naqssock = nmxp_openSocket(hostname, portnum);
    if(naqssock == NMXP_SOCKET_ERROR) {
        return NULL;
    }

    /* 2. Send a Connect*/
    if(nmxp_sendConnect(naqssock) != NMXP_SOCKET_OK) {
        nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CHANNEL, "Error on sendConnect()\n");

    /* 3. Receive ChannelList (a NULL list is treated as a failure too) */
    } else if(nmxp_receiveChannelList(naqssock, &channelList) != NMXP_SOCKET_OK  ||  channelList == NULL) {
        nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CHANNEL, "Error on receiveChannelList()\n");

    } else {
        channelList_subset = nmxp_chan_getType(channelList, datatype);

        if(channelList_subset != NULL) {
            nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CHANNEL, "%d / %d\n", channelList_subset->number, channelList->number);

            /* nmxp_chan_sortByKey(channelList_subset);*/
            nmxp_chan_sortByName(channelList_subset);

            for(i = 0; i < channelList_subset->number; i++) {
                nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CHANNEL, "%12d %s\n", channelList_subset->channel[i].key, channelList_subset->channel[i].name);
            }
        } else {
            nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CHANNEL, "Error on nmxp_chan_getType()\n");
        }

        /* 4. Send a Request Pending (optional)*/

        /* 5. Send AddChannels*/

        /* 6. Repeat until finished: receive and handle packets*/

        /* 7. Send Terminate Subscription*/
        nmxp_sendTerminateSubscription(naqssock, NMXP_SHUTDOWN_NORMAL, "Good Bye!");
    }

    /* 8. Close the socket*/
    nmxp_closeSocket(naqssock);

    free(channelList);

    return channelList_subset;
}

NMXP_META_CHAN_LIST *nmxp_getMetaChannelList(char * hostname, int portnum, NMXP_DATATYPE datatype, int flag_request_channelinfo, char *datas_username, char *datas_password, NMXP_CHAN_LIST **pchannelList) {
403
404
    int naqssock;
    NMXP_CHAN_PRECISLIST *precisChannelList = NULL;
405
406
407
408
    NMXP_CHAN_LIST *channelList = NULL;
    NMXP_META_CHAN_LIST *chan_list = NULL;
    NMXP_META_CHAN_LIST *iter = NULL;
    int i = 0;
409
    int32_t connection_time;
410
    int ret_sock;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
411
    int recv_errno;
412
    
413
414
    NMXP_MSG_SERVER type;
    void *buffer = NULL;
415
    int32_t length;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
416
    NMXP_MSGBODY_PRECISLISTREQUEST precisListRequestBody;
417
418
    NMXP_MSGBODY_CHANNELINFOREQUEST channelInfoRequestBody;
    NMXP_MSGBODY_CHANNELINFORESPONSE *channelInfo;
419
420
421
422
423
424
425

    char str_start[200], str_end[200];
    str_start[0] = 0;
    str_end[0] = 0;
    
    /* DAP Step 1: Open a socket */
    if( (naqssock = nmxp_openSocket(hostname, portnum)) == NMXP_SOCKET_ERROR) {
426
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error opening socket!\n");
427
428
429
430
431
	return NULL;
    }

    /* DAP Step 2: Read connection time */
    if(nmxp_readConnectionTime(naqssock, &connection_time) != NMXP_SOCKET_OK) {
432
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error reading connection time from server!\n");
433
434
435
436
437
	return NULL;
    }

    /* DAP Step 3: Send a ConnectRequest */
    if(nmxp_sendConnectRequest(naqssock, datas_username, datas_password, connection_time) != NMXP_SOCKET_OK) {
438
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error sending connect request!\n");
439
440
441
442
443
	return NULL;
    }

    /* DAP Step 4: Wait for a Ready message */
    if(nmxp_waitReady(naqssock) != NMXP_SOCKET_OK) {
444
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error waiting Ready message!\n");
445
446
447
	return NULL;
    }

448
449
450
451
452
453



    /* DAP Step 5: Send Data Request */
    nmxp_sendHeader(naqssock, NMXP_MSG_CHANNELLISTREQUEST, 0);
    /* DAP Step 6: Receive Data until receiving a Ready message */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
454
    ret_sock = nmxp_receiveMessage(naqssock, &type, &buffer, &length, 0, &recv_errno);
455
    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "ret_sock = %d, type = %d, length = %d\n", ret_sock, type, length);
456
457
458
459
460
461
462
463

    while(ret_sock == NMXP_SOCKET_OK   &&    type != NMXP_MSG_READY) {
	channelList = buffer;

	channelList->number = ntohl(channelList->number);

	for(i = 0; i < channelList->number; i++) {
	    channelList->channel[i].key = ntohl(channelList->channel[i].key);
464
	    if(getDataTypeFromKey(channelList->channel[i].key) == datatype) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
465
		nmxp_meta_chan_add(&chan_list, channelList->channel[i].key, channelList->channel[i].name, 0, 0, NULL, NMXP_META_SORT_NAME);
466
	    }
467
468
469
	}

	/* Receive Message */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
470
	ret_sock = nmxp_receiveMessage(naqssock, &type, &buffer, &length, 0, &recv_errno);
471
	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "ret_sock = %d, type = %d, length = %d\n", ret_sock, type, length);
472
473
    }

474
    *pchannelList = channelList;
475

476
477
478
479
    /* DAP Step 5: Send Data Request */
    precisListRequestBody.instr_id = htonl(-1);
    precisListRequestBody.datatype = htonl(NMXP_DATA_TIMESERIES);
    precisListRequestBody.type_of_channel = htonl(-1);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
480
    nmxp_sendMessage(naqssock, NMXP_MSG_PRECISLISTREQUEST, &precisListRequestBody, sizeof(NMXP_MSGBODY_PRECISLISTREQUEST));
481

482
483

    /* DAP Step 6: Receive Data until receiving a Ready message */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
484
    ret_sock = nmxp_receiveMessage(naqssock, &type, &buffer, &length, 0, &recv_errno);
485
    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "ret_sock = %d, type = %d, length = %d\n", ret_sock, type, length);
486
487
488
489
490
491
492
493
494
495
496
497
498

    while(ret_sock == NMXP_SOCKET_OK   &&    type != NMXP_MSG_READY) {
	precisChannelList = buffer;

	precisChannelList->number = ntohl(precisChannelList->number);
	for(i = 0; i < precisChannelList->number; i++) {
	    precisChannelList->channel[i].key = ntohl(precisChannelList->channel[i].key);
	    precisChannelList->channel[i].start_time = ntohl(precisChannelList->channel[i].start_time);
	    precisChannelList->channel[i].end_time = ntohl(precisChannelList->channel[i].end_time);

	    nmxp_data_to_str(str_start, precisChannelList->channel[i].start_time);
	    nmxp_data_to_str(str_end, precisChannelList->channel[i].end_time);

499
	    if(!nmxp_meta_chan_set_times(chan_list, precisChannelList->channel[i].key, precisChannelList->channel[i].start_time, precisChannelList->channel[i].end_time)) {
500
		nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CHANNEL, "Key %d not found for %s!\n", precisChannelList->channel[i].key, precisChannelList->channel[i].name);
501
502
503
	    }

	    /*
504
	    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_ANY, "%12d %12s %10d %10d %20s %20s\n",
505
506
507
		    precisChannelList->channel[i].key, precisChannelList->channel[i].name,
		    precisChannelList->channel[i].start_time, precisChannelList->channel[i].end_time,
		    str_start, str_end);
508
		    */
509
510
511
	}

	/* Receive Message */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
512
	ret_sock = nmxp_receiveMessage(naqssock, &type, &buffer, &length, 0, &recv_errno);
513
	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "ret_sock = %d, type = %d, length = %d\n", ret_sock, type, length);
514
515
516
    }


517
518
    if(flag_request_channelinfo) {
	for(iter = chan_list; iter != NULL; iter = iter->next) {
519

520
521
522
523
524
	    if(getChannelNumberFromKey(iter->key) == 0) {
		/* DAP Step 5: Send Data Request */
		channelInfoRequestBody.key = htonl(iter->key);
		channelInfoRequestBody.ignored = htonl(0);
		nmxp_sendMessage(naqssock, NMXP_MSG_CHANNELINFOREQUEST, &channelInfoRequestBody, sizeof(NMXP_MSGBODY_CHANNELINFOREQUEST));
525

526
		/* DAP Step 6: Receive Data until receiving a Ready message */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
527
		ret_sock = nmxp_receiveMessage(naqssock, &type, &buffer, &length, 0, &recv_errno);
528
		nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "ret_sock = %d, type = %d, length = %d\n", ret_sock, type, length);
529

530
531
532
		while(ret_sock == NMXP_SOCKET_OK   &&    type != NMXP_MSG_READY) {
		    channelInfo = buffer;
		    channelInfo->key = ntohl(channelInfo->key);
533

534
		    if(!nmxp_meta_chan_set_network(chan_list, channelInfo->key, channelInfo->network)) {
535
			nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CHANNEL, "Key %d (%d) not found for %s!\n", iter->key, channelInfo->key, iter->name);
536
537
		    }
		    /* Receive Message */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
538
		    ret_sock = nmxp_receiveMessage(naqssock, &type, &buffer, &length, 0, &recv_errno);
539
		    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "ret_sock = %d, type = %d, length = %d\n", ret_sock, type, length);
540
541
542
		}
	    }
	}
543
544
545
546
547
548
549
550
551
552
553
554
    }



    /* DAP Step 7: Repeat steps 5 and 6 for each data request */

    /* DAP Step 8: Send a Terminate message (optional) */
    nmxp_sendTerminateSubscription(naqssock, NMXP_SHUTDOWN_NORMAL, "Bye!");

    /* DAP Step 9: Close the socket */
    nmxp_closeSocket(naqssock);

555
    return chan_list;
556
557
}


int nmxp_raw_stream_seq_no_compare(const void *a, const void *b)
{       
    int ret = 0;
    NMXP_DATA_PROCESS **ppa = (NMXP_DATA_PROCESS **) a;
    NMXP_DATA_PROCESS **ppb = (NMXP_DATA_PROCESS **) b;
    NMXP_DATA_PROCESS *pa = *ppa;
    NMXP_DATA_PROCESS *pb = *ppb;

    if(pa && pb) {
	if(pa->seq_no > pb->seq_no) {
	    ret = 1;
	} else if (pa->seq_no < pb->seq_no) {
	    ret = -1;
	}
    } else {
574
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "Error pa and/or pb are NULL!\n");
575
576
577
578
579
    }

    return ret;
}

/*! \brief Initialise a raw stream buffer.
 *
 * Resets the "last sent" markers to -1, stores the latency and timeout
 * settings and allocates an empty packet list of max_tolerable_latency * 4
 * slots (the code assumes a packet can contain 1/4 second of data - TODO).
 * If the list cannot be allocated, pdlist is left NULL and
 * max_pdlist_items is set to 0.
 *
 * \param raw_stream_buffer     Structure to initialise.
 * \param max_tolerable_latency Stored as is; also determines the list size.
 * \param timeoutrecv           Stored as is.
 */
void nmxp_raw_stream_init(NMXP_RAW_STREAM_DATA *raw_stream_buffer, int32_t max_tolerable_latency, int timeoutrecv) {
    int j;

    raw_stream_buffer->last_seq_no_sent = -1;
    raw_stream_buffer->last_sample_time = -1.0;
    /* TODO
     * Suppose a packet can contain 1/4 secs of data */
    raw_stream_buffer->max_tolerable_latency = max_tolerable_latency;
    raw_stream_buffer->max_pdlist_items = max_tolerable_latency * 4;
    raw_stream_buffer->timeoutrecv = timeoutrecv;

    raw_stream_buffer->n_pdlist = 0;
    raw_stream_buffer->pdlist = NULL;

    if(raw_stream_buffer->max_pdlist_items > 0) {
        raw_stream_buffer->pdlist = malloc((size_t) raw_stream_buffer->max_pdlist_items * sizeof(*raw_stream_buffer->pdlist));
    }

    if(raw_stream_buffer->pdlist == NULL) {
        nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_RAWSTREAM, "Unable to allocate a list of %d packets!\n", raw_stream_buffer->max_pdlist_items);
        /* Keep the capacity consistent with the missing list */
        raw_stream_buffer->max_pdlist_items = 0;
        return;
    }

    for(j = 0; j < raw_stream_buffer->max_pdlist_items; j++) {
        raw_stream_buffer->pdlist[j] = NULL;
    }
}


/*! \brief Release the packet list of a raw stream buffer.
 *
 * Every packet still stored in the list is released together with its
 * buffer and pDataPtr members, which nmxp_raw_stream_manage() allocates
 * separately for each stored packet. Afterwards the list pointer is set to
 * NULL and the packet count to 0.
 *
 * \param raw_stream_buffer Structure whose list is released; NULL is ignored.
 */
void nmxp_raw_stream_free(NMXP_RAW_STREAM_DATA *raw_stream_buffer) {
    int j;

    if(raw_stream_buffer == NULL || raw_stream_buffer->pdlist == NULL) {
        return;
    }

    for(j = 0; j < raw_stream_buffer->max_pdlist_items; j++) {
        NMXP_DATA_PROCESS *pd = raw_stream_buffer->pdlist[j];
        if(pd) {
            free(pd->buffer);
            free(pd->pDataPtr);
            free(pd);
            raw_stream_buffer->pdlist[j] = NULL;
        }
    }

    free(raw_stream_buffer->pdlist);
    raw_stream_buffer->pdlist = NULL;
    raw_stream_buffer->n_pdlist = 0;
}


int nmxp_raw_stream_manage(NMXP_RAW_STREAM_DATA *p, NMXP_DATA_PROCESS *a_pd, int (*p_func_pd[NMXP_MAX_FUNC_PD]) (NMXP_DATA_PROCESS *), int n_func_pd) {
613
614
615
616
617
618
619
620
621
622
    int ret = 0;
    int send_again = 1;
    int seq_no_diff;
    double time_diff;
    double latency = 0.0;
    int j=0, k=0;
    int i_func_pd;
    char str_time[200];
    NMXP_DATA_PROCESS *pd = NULL;

623
    /* Allocate pd copy value from a_pd */
624
    if(a_pd) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
	/*
	if(a_pd->packet_type == 33 || a_pd->packet_type == 97) {
	    nmxp_data_log(a_pd);
	}
	*/

	/* Allocate memory for pd and copy a_pd */
	pd = (NMXP_DATA_PROCESS *) malloc (sizeof(NMXP_DATA_PROCESS));
	memcpy(pd, a_pd, sizeof(NMXP_DATA_PROCESS));
	if(a_pd->length > 0) {
	    pd->buffer = malloc(pd->length);
	    memcpy(pd->buffer, a_pd->buffer, a_pd->length);
	} else {
	    pd->buffer = NULL;
	}
	if(a_pd->nSamp *  sizeof(int) > 0) {
	    pd->pDataPtr = (int *) malloc(a_pd->nSamp * sizeof(int));
	    memcpy(pd->pDataPtr, a_pd->pDataPtr, a_pd->nSamp * sizeof(int));
	} else {
	    pd->pDataPtr = NULL;
	}
646
    }
647
648

    /* First time */
649
    if(p->last_seq_no_sent == -1  &&  pd) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
650
651
652
653
654
655
656
	if(p->timeoutrecv == 0) {
	    p->last_seq_no_sent = pd->seq_no - 1;
	    p->last_sample_time = pd->time;
	} else {
	    p->last_seq_no_sent = 0;
	    p->last_sample_time = 0;
	}
657
	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_RAWSTREAM, "First time nmxp_raw_stream_manage() for %s.%s.%s .\n", pd->network, pd->station, pd->channel);
658
    }
Matteo Quintiliani's avatar
Matteo Quintiliani committed
659

660
661
662
    if(p->n_pdlist > 0) {
	latency = nmxp_data_latency(p->pdlist[0]);
    }
663
664

    /* Add pd and sort array */
665
666
667
668
    if( (p->n_pdlist >= p->max_pdlist_items
	    || latency >= p->max_tolerable_latency) &&
	    p->timeoutrecv <= 0
	    ) {
669
670
671
	/* Supposing p->pdlist is ordered,
	 * handle the first item and over write it.
	 */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
672
673
674
675
676
677
678

	if(p->n_pdlist > 0) {
	    seq_no_diff = p->pdlist[0]->seq_no - p->last_seq_no_sent;
	    time_diff = p->pdlist[0]->time - p->last_sample_time;
	    latency = nmxp_data_latency(p->pdlist[0]);
	    nmxp_data_to_str(str_time, p->pdlist[0]->time);
	    if( seq_no_diff > 0) {
679
680
		nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_RAWSTREAM,
			"Force handling packet %s.%s.%d.%d (%s - %.2f sec.)  time_diff %.2fs  n_pdlist %d  lat. %.1fs!\n",
Matteo Quintiliani's avatar
Matteo Quintiliani committed
681
682
683
684
685
686
687
688
689
			p->pdlist[0]->station, p->pdlist[0]->channel, p->pdlist[0]->seq_no, p->pdlist[0]->packet_type, str_time,
			(double) p->pdlist[0]->nSamp / (double) p->pdlist[0]->sampRate, time_diff, p->n_pdlist, latency);
		for(i_func_pd=0; i_func_pd<n_func_pd; i_func_pd++) {
		    (*p_func_pd[i_func_pd])(p->pdlist[0]);
		}
		p->last_seq_no_sent = (p->pdlist[0]->seq_no);
		p->last_sample_time = (p->pdlist[0]->time + ((double) p->pdlist[0]->nSamp / (double) p->pdlist[0]->sampRate ));
	    } else {
		/* It should not occur */
690
691
		nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_RAWSTREAM,
			"NOT OCCUR! Packets %s.%s.%d.%d (%s - %.2f sec.) discarded, seq_no_diff=%d time_diff %.2fs  lat. %.1fs\n",
Matteo Quintiliani's avatar
Matteo Quintiliani committed
692
693
			p->pdlist[0]->station, p->pdlist[0]->channel, p->pdlist[0]->seq_no, p->pdlist[0]->packet_type, str_time,
			(double) p->pdlist[0]->nSamp / (double) p->pdlist[0]->sampRate, seq_no_diff, time_diff, latency);
694
695
	    }

Matteo Quintiliani's avatar
Matteo Quintiliani committed
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
	    /* Free handled packet */
	    if(p->pdlist[0]->buffer) {
		free(p->pdlist[0]->buffer);
		p->pdlist[0]->buffer = NULL;
	    }
	    if(p->pdlist[0]->pDataPtr) {
		free(p->pdlist[0]->pDataPtr);
		p->pdlist[0]->pDataPtr = NULL;
	    }
	    if(p->pdlist[0]) {
		free(p->pdlist[0]);
		p->pdlist[0] = NULL;
	    }
	    if(pd) {
		p->pdlist[0] = pd;
	    }
	} else {
	    if(pd) {
		p->n_pdlist = 1;
		p->pdlist[0] = pd;
	    }
717
718
719
	}

    } else {
720
721
722
	if(pd) {
	    p->pdlist[p->n_pdlist++] = pd;
	}
723
724
725
    }
    qsort(p->pdlist, p->n_pdlist, sizeof(NMXP_DATA_PROCESS *), nmxp_raw_stream_seq_no_compare);

726
    /* TODO Check for packet duplication in pd->pdlist*/
727
728
729
730
731

    /* Print array, only for debugging */
    if(p->n_pdlist > 1) {
	int y = 0;
	for(y=0; y < p->n_pdlist; y++) {
732
	    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_RAWSTREAM, "%02d pkt %d\n", y, p->pdlist[y]->seq_no);
733
734
735
	}
    }

736
    /* Condition for time-out (pd is NULL) */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
737
738
739
740
    if(!pd && p->n_pdlist > 0) {
	p->last_seq_no_sent = p->pdlist[0]->seq_no - 1;
	p->last_sample_time = p->pdlist[0]->time;
    }
741
742
743
744
745
746
747
748
749
750
751

    /* Manage array and execute func_pd() */
    j=0;
    send_again = 1;
    while(send_again  &&  j < p->n_pdlist) {
	send_again = 0;
	seq_no_diff = p->pdlist[j]->seq_no - p->last_seq_no_sent;
	time_diff = p->pdlist[j]->time - p->last_sample_time;
	latency = nmxp_data_latency(p->pdlist[j]);
	nmxp_data_to_str(str_time, p->pdlist[j]->time);
	if(seq_no_diff <= 0) {
752
	    /* Duplicated packets: Discarded*/
753
	    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_RAWSTREAM, "Packets %s.%s.%d.%d (%s - %f sec.) discarded, seq_no_diff=%d  time_diff=%.2fs  lat %.1fs\n",
754
755
756
757
758
759
760
761
762
		    p->pdlist[j]->station, p->pdlist[j]->channel, p->pdlist[j]->seq_no, p->pdlist[j]->packet_type, str_time,
		    (double) p->pdlist[j]->nSamp /  (double) p->pdlist[j]->sampRate, seq_no_diff, time_diff, latency);
	    send_again = 1;
	    j++;
	} else if(seq_no_diff == 1) {
	    for(i_func_pd=0; i_func_pd<n_func_pd; i_func_pd++) {
		(*p_func_pd[i_func_pd])(p->pdlist[j]);
	    }
	    if(time_diff > TIME_TOLLERANCE || time_diff < -TIME_TOLLERANCE) {
763
		nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_RAWSTREAM, "%s.%s time is not correct seq_no_diff=%d time_diff=%.2fs  ([%d] %d-%d)  (%s - %.2f sec.) lat. %.1fs\n",
764
765
766
767
768
769
770
771
772
		    p->pdlist[j]->station, p->pdlist[j]->channel, 
		    seq_no_diff, time_diff, p->pdlist[j]->packet_type, p->pdlist[j]->seq_no, p->last_seq_no_sent,
		    str_time, (double) p->pdlist[j]->nSamp /  (double) p->pdlist[j]->sampRate, latency);
	    }
	    p->last_seq_no_sent = p->pdlist[j]->seq_no;
	    p->last_sample_time = (p->pdlist[j]->time + ((double) p->pdlist[j]->nSamp / (double) p->pdlist[j]->sampRate ));
	    send_again = 1;
	    j++;
	} else {
773
	    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_RAWSTREAM, "%s.%s seq_no_diff=%d ([%d] %d-%d)  j=%2d  p->n_pdlist=%2d (%s - %.2f sec.) time_diff=%.2fs  lat. %.1fs\n",
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
		    p->pdlist[j]->station, p->pdlist[j]->channel, 
		    seq_no_diff, p->pdlist[j]->packet_type, p->pdlist[j]->seq_no, p->last_seq_no_sent, j, p->n_pdlist,
		    str_time, (double) p->pdlist[j]->nSamp /  (double) p->pdlist[j]->sampRate, time_diff, latency);
	}
    }

    /* Shift and free j handled elements */
    if(j > 0) {
	for(k=0; k < p->n_pdlist; k++) {
	    if(k < j) {
		if(p->pdlist[k]->buffer) {
		    free(p->pdlist[k]->buffer);
		    p->pdlist[k]->buffer = NULL;
		}
		if(p->pdlist[k]->pDataPtr) {
		    free(p->pdlist[k]->pDataPtr);
		    p->pdlist[k]->pDataPtr = NULL;
		}
		if(p->pdlist[k]) {
		    free(p->pdlist[k]);
		    p->pdlist[k] = NULL;
		}
	    }
	    if(k + j < p->n_pdlist) {
		p->pdlist[k] = p->pdlist[k+j];
	    } else {
		p->pdlist[k] = NULL;
	    }
	}
	p->n_pdlist = p->n_pdlist - j;
    }

806
807
808
    /* TOREMOVE
    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_RAWSTREAM, "j=%d  p->n_pdlist=%d FINAL\n", j, p->n_pdlist);
       */
809
810
811
812

    return ret;
}


/*! \brief Placeholder for flushing a raw stream buffer (TODO).
 *
 * Not implemented yet: no argument is used and nothing is modified.
 *
 * \param p         Unused.
 * \param p_func_pd Unused.
 * \param n_func_pd Unused.
 *
 * \return Always 0.
 */
int nmxp_raw_stream_manage_flush(NMXP_RAW_STREAM_DATA *p, int (*p_func_pd[NMXP_MAX_FUNC_PD]) (NMXP_DATA_PROCESS *), int n_func_pd) {
    (void) p;
    (void) p_func_pd;
    (void) n_func_pd;

    return 0;
}