/*! \file
 *
 * \brief Nanometrics Protocol Library
 *
 * Author:
 * 	Matteo Quintiliani
 * 	Istituto Nazionale di Geofisica e Vulcanologia - Italy
 *	quintiliani@ingv.it
 *
 * $Id: nmxp.c,v 1.56 2007-12-28 10:37:28 mtheo Exp $
 *
 */

#include "nmxp.h"

#include "config.h"

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#ifdef HAVE_ARPA_INET_H
#include <arpa/inet.h>
#endif

#ifdef HAVE_WINDOWS_H
#include "winsock2.h"
#endif

int nmxp_sendConnect(int isock) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
31
    return nmxp_sendMessage(isock, NMXP_MSG_CONNECT, NULL, 0);
Matteo Quintiliani's avatar
Start    
Matteo Quintiliani committed
32
33
}

/*
 * Send a TerminateSubscription message on socket isock.
 *
 * isock    socket to write to
 * reason   shutdown reason; currently NOT transmitted
 * message  optional text (may be NULL)
 *
 * Returns the value returned by nmxp_sendMessage().
 *
 * NOTE(review): the payload length has always been strlen(message)-1, i.e.
 * the last character of the message is not sent, and the reason code is not
 * part of the payload. Both are kept as they were so that the bytes on the
 * wire do not change; confirm against the protocol specification.
 */
int nmxp_sendTerminateSubscription(int isock, NMXP_SHUTDOWN_REASON reason, char *message) {
    int message_length = 0;

    (void) reason;

    /* An empty string used to yield strlen("")-1, an unsigned underflow. */
    if(message != NULL  &&  message[0] != '\0') {
        message_length = (int) strlen(message) - 1;
    }

    return nmxp_sendMessage(isock, NMXP_MSG_TERMINATESUBSCRIPTION, message, message_length);
}

/*
 * Receive a ChannelList message from socket isock.
 *
 * On success *pchannelList points to the received list, with the channel
 * count and every channel key converted to host byte order. The list is the
 * buffer handed back by nmxp_receiveMessage(); the caller releases it with
 * free(), as nmxp_getAvailableChannelList() does.
 *
 * On any failure *pchannelList is NULL.
 *
 * Returns NMXP_SOCKET_OK on success, the nmxp_receiveMessage() status when
 * receiving failed, or NMXP_SOCKET_ERROR when the message is not a channel
 * list.
 */
int nmxp_receiveChannelList(int isock, NMXP_CHAN_LIST **pchannelList) {
    int ret;
    int i;
    int recv_errno;
    NMXP_MSG_SERVER type;
    void *buffer = NULL;
    int32_t length = 0;
    NMXP_CHAN_LIST *list;

    *pchannelList = NULL;

    ret = nmxp_receiveMessage(isock, &type, &buffer, &length, 0, &recv_errno);
    if(ret != NMXP_SOCKET_OK) {
        /* type and buffer are not meaningful when receiving failed */
        return ret;
    }

    if(type != NMXP_MSG_CHANNELLIST) {
        nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_PACKETMAN, "Type %d is not NMXP_MSG_CHANNELLIST!\n", type);
        /* Returning OK with a NULL list made callers dereference NULL */
        free(buffer);
        return NMXP_SOCKET_ERROR;
    }

    if(buffer == NULL) {
        nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_PACKETMAN, "Channel list message has no body!\n");
        return NMXP_SOCKET_ERROR;
    }

    list = buffer;
    list->number = ntohl(list->number);

    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CHANNEL, "number of channels %d\n", list->number);

    /* TODO check that list->number is consistent with the received length */

    for(i=0; i < list->number; i++) {
        list->channel[i].key = ntohl(list->channel[i].key);
        nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CHANNEL, "%12d %s\n", list->channel[i].key, list->channel[i].name);
    }

    *pchannelList = list;

    return ret;
}


/*
 * Send an AddTimeSeriesChannels message on socket isock.
 *
 * The body is a sequence of 4-byte integers in network byte order:
 * the number of channels, one key per channel, shortTermCompletion,
 * out_format and buffer_flag.
 *
 * Returns the value returned by nmxp_sendMessage(), or NMXP_SOCKET_ERROR
 * when channelList is NULL or the message body cannot be allocated.
 */
int nmxp_sendAddTimeSeriesChannel(int isock, NMXP_CHAN_LIST_NET *channelList, int32_t shortTermCompletion, int32_t out_format, NMXP_BUFFER_FLAG buffer_flag) {
    int ret;
    int32_t buffer_length;
    char *buffer;
    int32_t app, i, disp;

    if(channelList == NULL) {
        nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_PACKETMAN, "nmxp_sendAddTimeSeriesChannel(): channel list is NULL!\n");
        return NMXP_SOCKET_ERROR;
    }

    /* 4 fixed fields of 4 bytes each plus one 4-byte key per channel */
    buffer_length = 16 + (4 * channelList->number);
    buffer = malloc(buffer_length);
    if(buffer == NULL) {
        nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_PACKETMAN, "nmxp_sendAddTimeSeriesChannel(): unable to allocate %d bytes!\n", buffer_length);
        return NMXP_SOCKET_ERROR;
    }

    disp=0;

    app = htonl(channelList->number);
    memcpy(&buffer[disp], &app, 4);
    disp+=4;

    for(i=0; i < channelList->number; i++) {
        app = htonl(channelList->channel[i].key);
        memcpy(&buffer[disp], &app, 4);
        disp+=4;
    }

    app = htonl(shortTermCompletion);
    memcpy(&buffer[disp], &app, 4);
    disp+=4;

    app = htonl(out_format);
    memcpy(&buffer[disp], &app, 4);
    disp+=4;

    app = htonl(buffer_flag);
    memcpy(&buffer[disp], &app, 4);
    disp+=4;

    ret = nmxp_sendMessage(isock, NMXP_MSG_ADDTIMESERIESCHANNELS, buffer, buffer_length);

    free(buffer);

    return ret;
}


/*
 * Receive one message from socket isock and turn it into a data packet.
 *
 * A compressed message is handed to nmxp_processCompressedData(), a
 * decompressed one to nmxp_processDecompressedData(); any other type is
 * logged as an error.
 *
 * Returns the packet produced by the processing function, or NULL when
 * receiving failed or the message type is not a data type.
 *
 * NOTE(review): the received buffer is not released here; presumably the
 * processing functions or the returned packet keep it - confirm.
 */
NMXP_DATA_PROCESS *nmxp_receiveData(int isock, NMXP_CHAN_LIST_NET *channelList, const char *network_code, int timeoutsec, int *recv_errno ) {
    NMXP_MSG_SERVER type;
    void *buffer = NULL;
    int32_t length;

    if(nmxp_receiveMessage(isock, &type, &buffer, &length, timeoutsec, recv_errno) != NMXP_SOCKET_OK) {
        return NULL;
    }

    if(type == NMXP_MSG_COMPRESSED) {
        nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_PACKETMAN, "Type %d is NMXP_MSG_COMPRESSED!\n", type);
        return nmxp_processCompressedData(buffer, length, channelList, network_code);
    }

    if(type == NMXP_MSG_DECOMPRESSED) {
        nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_PACKETMAN, "Type %d is NMXP_MSG_DECOMPRESSED!\n", type);
        return nmxp_processDecompressedData(buffer, length, channelList, network_code);
    }

    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_PACKETMAN, "Type %d is not NMXP_MSG_COMPRESSED or NMXP_MSG_DECOMPRESSED!\n", type);
    return NULL;
}

int nmxp_sendConnectRequest(int isock, char *naqs_username, char *naqs_password, int32_t connection_time) {
135
    int ret;
136
    int i;
137
    char crc32buf[100];
138
139
    char *pcrc32buf = crc32buf;
    int crc32buf_length = 0;
140
    NMXP_CONNECT_REQUEST connectRequest;
141
    int naqs_username_length, naqs_password_length;
142
143
144
    int32_t protocol_version = 0;

    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CRC, "%s - %s\n", naqs_username, naqs_password);
145

146
147
148
    naqs_username_length = (naqs_username)? strlen(naqs_username) : 0;
    naqs_password_length = (naqs_password)? strlen(naqs_password) : 0;

149
150
151
152
153
154
155
156
    for(i=0; i < 100; i++) {
	crc32buf[i] = 0;
    }

    for(i=0; i<12; i++) {
	connectRequest.username[i] = 0;
    }
    if(naqs_username_length != 0) {
157
	strcpy(connectRequest.username, naqs_username);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
158
    }
159
160
161
162
163

    connectRequest.version = protocol_version;
    connectRequest.connection_time = connection_time;

    nmxp_data_swap_4b ((int32_t *)&connection_time);
164

165
    if(naqs_username_length == 0  &&  naqs_password_length == 0 ) {
166
167
	/* sprintf(crc32buf, "%d%d", protocol_version, connection_time); */
	/* TODO */
168
    } else if(naqs_username_length != 0  &&  naqs_password_length != 0 ) {
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
	/* sprintf(crc32buf, "%s%d%d%s", naqs_username, protocol_version,
		connection_time, naqs_password); */

	memcpy(pcrc32buf, naqs_username, naqs_username_length);
	crc32buf_length += naqs_username_length;
	pcrc32buf = crc32buf + crc32buf_length;

	memcpy(pcrc32buf, &(protocol_version), sizeof(protocol_version));
	crc32buf_length += sizeof(protocol_version);
	pcrc32buf = crc32buf + crc32buf_length;

	memcpy(pcrc32buf, &(connection_time), sizeof(connection_time));
	crc32buf_length += sizeof(connection_time);
	pcrc32buf = crc32buf + crc32buf_length;

	memcpy(pcrc32buf, naqs_password, naqs_password_length);
	crc32buf_length += naqs_password_length;
	pcrc32buf = crc32buf + crc32buf_length;

188
    } else if(naqs_username_length != 0 ) {
189
190
	/* sprintf(crc32buf, "%s%d%d", naqs_username, protocol_version, connection_time); */
	/* TODO */
191
    } else if(naqs_password_length != 0 ) {
192
193
	/* sprintf(crc32buf, "%d%d%s", protocol_version, connection_time, naqs_password); */
	/* TODO */
194
    }
195
196
197
    connectRequest.version = htonl(connectRequest.version);
    connectRequest.connection_time = htonl(connectRequest.connection_time);
    connectRequest.crc32 = htonl(crc32(0L, crc32buf, crc32buf_length));
198
199
200
201

    ret = nmxp_sendMessage(isock, NMXP_MSG_CONNECTREQUEST, &connectRequest, sizeof(NMXP_CONNECT_REQUEST));

    if(ret == NMXP_SOCKET_OK) {
202
203
204
205
206
207
208
209
210
	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CRC, "Send a ConnectRequest crc32buf length %d, crc32 = %d\n", crc32buf_length, connectRequest.crc32);
	for(i=0; i < crc32buf_length; i++) {
	    nmxp_log(NMXP_LOG_NORM_NO, NMXP_LOG_D_CRC, "%d ", crc32buf[i]);
	}
	char *pp = (char *) &connectRequest.crc32;
	for(i=0; i < sizeof(connectRequest.crc32); i++) {
	    nmxp_log(NMXP_LOG_NORM_NO, NMXP_LOG_D_CRC, "%d ", pp[i]);
	}
	nmxp_log(NMXP_LOG_NORM_NO, NMXP_LOG_D_CRC, "\n");
211
    } else {
212
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CRC, "Send a ConnectRequest.\n");
213
214
215
216
217
218
    }

    return ret;
}


int nmxp_readConnectionTime(int isock, int32_t *connection_time) {
220
    int ret;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
221
222
    int recv_errno;
    ret = nmxp_recv_ctrl(isock, connection_time, sizeof(int32_t), 0, &recv_errno);
223
    *connection_time = ntohl(*connection_time);
224
    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "Read connection time from socket %d.\n", *connection_time);
225
    if(ret != NMXP_SOCKET_OK) {
226
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Read connection time from socket.\n");
227
228
229
230
231
    }
    return ret;
}


int nmxp_waitReady(int isock) {
233
234
    int times = 0;
    int rc = NMXP_SOCKET_OK;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
235
236
237
    int32_t signature;
    int32_t type = 0;
    int32_t length;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
238
    int recv_errno;
239
240

    while(rc == NMXP_SOCKET_OK  &&  type != NMXP_MSG_READY) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
241
	rc = nmxp_recv_ctrl(isock, &signature, sizeof(signature), 0, &recv_errno);
242
243
244
	if(rc != NMXP_SOCKET_OK) return rc;
	signature = ntohl(signature);
	if(signature == 0) {
245
	    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "signature is equal to zero. receive again.\n");
Matteo Quintiliani's avatar
Matteo Quintiliani committed
246
	    rc = nmxp_recv_ctrl(isock, &signature, sizeof(signature), 0, &recv_errno);
247
248
249
	    signature = ntohl(signature);
	}
	if(signature != NMX_SIGNATURE) {
250
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "signature is not valid. signature = %d\n", signature);
251
252
253
254
	    if(signature == 200) {
		    int32_t err_length;
		    int32_t err_reason;
		    char err_buff[200];
Matteo Quintiliani's avatar
Matteo Quintiliani committed
255
		    rc = nmxp_recv_ctrl(isock, &err_length, sizeof(err_length), 0, &recv_errno);
256
		    err_length = ntohl(err_length);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
257
		    rc = nmxp_recv_ctrl(isock, &err_reason, sizeof(err_reason), 0, &recv_errno);
258
259
		    err_reason = ntohl(err_reason);
		    if(err_length > 4) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
260
			    rc = nmxp_recv_ctrl(isock, err_buff, err_length-4, 0, &recv_errno);
261
262
			    err_buff[err_length] = 0;
		    }
263
		    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "TerminateMessage from Server: %s (%d).\n", err_buff, err_reason);
264
	    }
265
266
267
	    return NMXP_SOCKET_ERROR;
	}

Matteo Quintiliani's avatar
Matteo Quintiliani committed
268
	rc = nmxp_recv_ctrl(isock, &type, sizeof(type), 0, &recv_errno);
269
270
271
	if(rc != NMXP_SOCKET_OK) return rc;
	type = ntohl(type);
	if(type != NMXP_MSG_READY) {
272
	    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "type is not READY. type = %d\n", type);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
273
	    rc = nmxp_recv_ctrl(isock, &length, sizeof(length), 0, &recv_errno);
274
275
276
	    if(rc != NMXP_SOCKET_OK) return rc;
	    length = ntohl(length);
	    if(length > 0) {
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
		if(type == NMXP_MSG_TERMINATESUBSCRIPTION) {
		    char *str_msg = NULL;
		    char *buf_app = (char *) malloc(sizeof(char) * length);
		    int32_t reason;
		    memcpy(&reason, buf_app, sizeof(reason));
		    reason = ntohl(reason);
		    str_msg = buf_app + sizeof(reason);
		    rc = nmxp_recv_ctrl(isock, buf_app, length, 0, &recv_errno);
		    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "%d %s shutdown: %s\n",
			    reason,
			    (reason == 0)? "Normal" : (reason == 1)? "Error" : (reason == 2)? "Timeout" : "Unknown",
			    str_msg);
		    if(buf_app) {
			free(buf_app);
		    }
		} else if(length == 4) {
293
		    int32_t app;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
294
		    rc = nmxp_recv_ctrl(isock, &app, length, 0, &recv_errno);
295
296
		    if(rc != NMXP_SOCKET_OK) return rc;
		    app = ntohl(app);
297
		    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "value = %d\n", app);
298
299
		} else {
		    char *buf_app = (char *) malloc(sizeof(char) * length);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
300
		    rc = nmxp_recv_ctrl(isock, buf_app, length, 0, &recv_errno);
301
302
303
304
305
306
		    if(buf_app) {
			free(buf_app);
		    }
		}
	    }
	} else {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
307
	    rc = nmxp_recv_ctrl(isock, &length, sizeof(length), 0, &recv_errno);
308
309
310
	    if(rc != NMXP_SOCKET_OK) return rc;
	    length = ntohl(length);
	    if(length != 0) {
311
		nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "length is not equal to zero. length = %d\n", length);
312
313
314
315
316
317
		return NMXP_SOCKET_ERROR;
	    }
	}

	times++;
	if(times > 10) {
318
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "waiting_ready_message. times > 10\n");
319
320
321
322
323
324
325
326
327
	    rc = NMXP_SOCKET_ERROR;
	}

    }

    return rc;
}


int nmxp_sendDataRequest(int isock, int32_t key, int32_t start_time, int32_t end_time) {
329
330
331
332
333
334
335
336
337
338
    int ret;
    NMXP_DATA_REQUEST dataRequest;

    dataRequest.chan_key = htonl(key);
    dataRequest.start_time = htonl(start_time);
    dataRequest.end_time = htonl(end_time);

    ret = nmxp_sendMessage(isock, NMXP_MSG_DATAREQUEST, &dataRequest, sizeof(dataRequest));

    if(ret != NMXP_SOCKET_OK) {
339
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Send a Request message\n");
340
341
342
343
344
345
    }

    return ret;
}


/*
 * Connect to hostname:portnum, read the list of available channels and
 * return the channels of the given datatype, sorted by name.
 *
 * Steps: open the socket, send a Connect, receive the ChannelList, send a
 * TerminateSubscription, close the socket.
 *
 * Returns the list built by nmxp_chan_getType(), or NULL when a step fails.
 * The complete channel list received from the server is released before
 * returning.
 */
NMXP_CHAN_LIST *nmxp_getAvailableChannelList(char * hostname, int portnum, NMXP_DATATYPE datatype) {
    int naqssock;
    NMXP_CHAN_LIST *channelList = NULL, *channelList_subset = NULL;
    int i;

    /* 1. Open a socket */
    naqssock = nmxp_openSocket(hostname, portnum);
    if(naqssock == NMXP_SOCKET_ERROR) {
        return NULL;
    }

    /* 2. Send a Connect */
    if(nmxp_sendConnect(naqssock) != NMXP_SOCKET_OK) {
        nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CHANNEL, "Error on sendConnect()\n");

    /* 3. Receive ChannelList */
    } else if(nmxp_receiveChannelList(naqssock, &channelList) != NMXP_SOCKET_OK) {
        nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CHANNEL, "Error on receiveChannelList()\n");

    } else {
        if(channelList == NULL) {
            /* nmxp_receiveChannelList() can succeed without producing a list */
            nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CHANNEL, "Error on receiveChannelList()\n");
        } else {
            channelList_subset = nmxp_chan_getType(channelList, datatype);
            if(channelList_subset != NULL) {
                nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CHANNEL, "%d / %d\n", channelList_subset->number, channelList->number);

                nmxp_chan_sortByName(channelList_subset);

                for(i=0; i < channelList_subset->number; i++) {
                    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CHANNEL, "%12d %s\n", channelList_subset->channel[i].key, channelList_subset->channel[i].name);
                }
            }
        }

        /* 4. Send a Request Pending (optional) */
        /* 5. Send AddChannels */
        /* 6. Repeat until finished: receive and handle packets */

        /* 7. Send Terminate Subscription */
        nmxp_sendTerminateSubscription(naqssock, NMXP_SHUTDOWN_NORMAL, "Good Bye!");
    }

    /* 8. Close the socket */
    nmxp_closeSocket(naqssock);

    free(channelList);

    return channelList_subset;
}

/*
 * Connect to a data server (hostname:portnum) and build a meta channel list
 * for the channels of the given datatype.
 *
 * Steps: open the socket, read the connection time, send a ConnectRequest,
 * wait for Ready, then issue three kinds of request, each answered by
 * messages up to a Ready:
 *   - ChannelListRequest: matching channels are added to the result;
 *   - PrecisListRequest: start and end times are set on the result;
 *   - ChannelInfoRequest (only when flag_request_channelinfo is non-zero,
 *     and only for keys whose channel number is 0): the network is set.
 * Finally a TerminateSubscription is sent and the socket is closed.
 *
 * pchannelList, when not NULL, receives the last channel list buffer
 * received from the server (NULL if none was received). It is not written
 * when the function fails before the first request.
 *
 * Returns the meta channel list, or NULL when the connection could not be
 * set up. The socket is closed on every path.
 *
 * NOTE(review): buffers handed back by nmxp_receiveMessage() are not
 * released here, except for the one passed out through pchannelList;
 * confirm the ownership rules of nmxp_receiveMessage() before freeing them.
 * NOTE(review): every non-Ready answer is interpreted according to the
 * request just sent, without checking its type.
 */
NMXP_META_CHAN_LIST *nmxp_getMetaChannelList(char * hostname, int portnum, NMXP_DATATYPE datatype, int flag_request_channelinfo, char *datas_username, char *datas_password, NMXP_CHAN_LIST **pchannelList) {
    int naqssock;
    NMXP_CHAN_PRECISLIST *precisChannelList = NULL;
    NMXP_CHAN_LIST *channelList = NULL;
    NMXP_META_CHAN_LIST *chan_list = NULL;
    NMXP_META_CHAN_LIST *iter = NULL;
    int i = 0;
    int32_t connection_time;
    int ret_sock;
    int recv_errno;

    NMXP_MSG_SERVER type;
    void *buffer = NULL;
    int32_t length = 0;
    NMXP_MSGBODY_PRECISLISTREQUEST precisListRequestBody;
    NMXP_MSGBODY_CHANNELINFOREQUEST channelInfoRequestBody;
    NMXP_MSGBODY_CHANNELINFORESPONSE *channelInfo;

    char str_start[200], str_end[200];
    str_start[0] = 0;
    str_end[0] = 0;

    /* DAP Step 1: Open a socket */
    if( (naqssock = nmxp_openSocket(hostname, portnum)) == NMXP_SOCKET_ERROR) {
        nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error opening socket!\n");
        return NULL;
    }

    /* DAP Step 2: Read connection time */
    if(nmxp_readConnectionTime(naqssock, &connection_time) != NMXP_SOCKET_OK) {
        nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error reading connection time from server!\n");
        /* The socket is open: close it instead of leaking it */
        nmxp_closeSocket(naqssock);
        return NULL;
    }

    /* DAP Step 3: Send a ConnectRequest */
    if(nmxp_sendConnectRequest(naqssock, datas_username, datas_password, connection_time) != NMXP_SOCKET_OK) {
        nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error sending connect request!\n");
        nmxp_closeSocket(naqssock);
        return NULL;
    }

    /* DAP Step 4: Wait for a Ready message */
    if(nmxp_waitReady(naqssock) != NMXP_SOCKET_OK) {
        nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error waiting Ready message!\n");
        nmxp_closeSocket(naqssock);
        return NULL;
    }

    /* DAP Step 5: Send Data Request */
    nmxp_sendHeader(naqssock, NMXP_MSG_CHANNELLISTREQUEST, 0);
    /* DAP Step 6: Receive Data until receiving a Ready message */
    ret_sock = nmxp_receiveMessage(naqssock, &type, &buffer, &length, 0, &recv_errno);
    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "ret_sock = %d, type = %d, length = %d\n", ret_sock, type, length);

    while(ret_sock == NMXP_SOCKET_OK   &&    type != NMXP_MSG_READY) {
        if(buffer != NULL) {
            channelList = buffer;

            channelList->number = ntohl(channelList->number);

            for(i = 0; i < channelList->number; i++) {
                channelList->channel[i].key = ntohl(channelList->channel[i].key);
                if(getDataTypeFromKey(channelList->channel[i].key) == datatype) {
                    nmxp_meta_chan_add(&chan_list, channelList->channel[i].key, channelList->channel[i].name, 0, 0, NULL, NMXP_META_SORT_NAME);
                }
            }
        }

        /* Receive Message */
        ret_sock = nmxp_receiveMessage(naqssock, &type, &buffer, &length, 0, &recv_errno);
        nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "ret_sock = %d, type = %d, length = %d\n", ret_sock, type, length);
    }

    if(pchannelList != NULL) {
        *pchannelList = channelList;
    }

    /* DAP Step 5: Send Data Request */
    precisListRequestBody.instr_id = htonl(-1);
    precisListRequestBody.datatype = htonl(NMXP_DATA_TIMESERIES);
    precisListRequestBody.type_of_channel = htonl(-1);
    nmxp_sendMessage(naqssock, NMXP_MSG_PRECISLISTREQUEST, &precisListRequestBody, sizeof(NMXP_MSGBODY_PRECISLISTREQUEST));

    /* DAP Step 6: Receive Data until receiving a Ready message */
    ret_sock = nmxp_receiveMessage(naqssock, &type, &buffer, &length, 0, &recv_errno);
    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "ret_sock = %d, type = %d, length = %d\n", ret_sock, type, length);

    while(ret_sock == NMXP_SOCKET_OK   &&    type != NMXP_MSG_READY) {
        if(buffer != NULL) {
            precisChannelList = buffer;

            precisChannelList->number = ntohl(precisChannelList->number);
            for(i = 0; i < precisChannelList->number; i++) {
                precisChannelList->channel[i].key = ntohl(precisChannelList->channel[i].key);
                precisChannelList->channel[i].start_time = ntohl(precisChannelList->channel[i].start_time);
                precisChannelList->channel[i].end_time = ntohl(precisChannelList->channel[i].end_time);

                /* Only needed by the debugging log below */
                nmxp_data_to_str(str_start, precisChannelList->channel[i].start_time);
                nmxp_data_to_str(str_end, precisChannelList->channel[i].end_time);

                if(!nmxp_meta_chan_set_times(chan_list, precisChannelList->channel[i].key, precisChannelList->channel[i].start_time, precisChannelList->channel[i].end_time)) {
                    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CHANNEL, "Key %d not found for %s!\n", precisChannelList->channel[i].key, precisChannelList->channel[i].name);
                }

                /*
                nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_ANY, "%12d %12s %10d %10d %20s %20s\n",
                        precisChannelList->channel[i].key, precisChannelList->channel[i].name,
                        precisChannelList->channel[i].start_time, precisChannelList->channel[i].end_time,
                        str_start, str_end);
                        */
            }
        }

        /* Receive Message */
        ret_sock = nmxp_receiveMessage(naqssock, &type, &buffer, &length, 0, &recv_errno);
        nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "ret_sock = %d, type = %d, length = %d\n", ret_sock, type, length);
    }

    if(flag_request_channelinfo) {
        for(iter = chan_list; iter != NULL; iter = iter->next) {

            if(getChannelNumberFromKey(iter->key) == 0) {
                /* DAP Step 5: Send Data Request */
                channelInfoRequestBody.key = htonl(iter->key);
                channelInfoRequestBody.ignored = htonl(0);
                nmxp_sendMessage(naqssock, NMXP_MSG_CHANNELINFOREQUEST, &channelInfoRequestBody, sizeof(NMXP_MSGBODY_CHANNELINFOREQUEST));

                /* DAP Step 6: Receive Data until receiving a Ready message */
                ret_sock = nmxp_receiveMessage(naqssock, &type, &buffer, &length, 0, &recv_errno);
                nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "ret_sock = %d, type = %d, length = %d\n", ret_sock, type, length);

                while(ret_sock == NMXP_SOCKET_OK   &&    type != NMXP_MSG_READY) {
                    if(buffer != NULL) {
                        channelInfo = buffer;
                        channelInfo->key = ntohl(channelInfo->key);

                        if(!nmxp_meta_chan_set_network(chan_list, channelInfo->key, channelInfo->network)) {
                            nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CHANNEL, "Key %d (%d) not found for %s!\n", iter->key, channelInfo->key, iter->name);
                        }
                    }
                    /* Receive Message */
                    ret_sock = nmxp_receiveMessage(naqssock, &type, &buffer, &length, 0, &recv_errno);
                    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "ret_sock = %d, type = %d, length = %d\n", ret_sock, type, length);
                }
            }
        }
    }

    /* DAP Step 7: Repeat steps 5 and 6 for each data request */

    /* DAP Step 8: Send a Terminate message (optional) */
    nmxp_sendTerminateSubscription(naqssock, NMXP_SHUTDOWN_NORMAL, "Bye!");

    /* DAP Step 9: Close the socket */
    nmxp_closeSocket(naqssock);

    return chan_list;
}


int nmxp_raw_stream_seq_no_compare(const void *a, const void *b)
{       
    int ret = 0;
    NMXP_DATA_PROCESS **ppa = (NMXP_DATA_PROCESS **) a;
    NMXP_DATA_PROCESS **ppb = (NMXP_DATA_PROCESS **) b;
    NMXP_DATA_PROCESS *pa = *ppa;
    NMXP_DATA_PROCESS *pb = *ppb;

    if(pa && pb) {
	if(pa->seq_no > pb->seq_no) {
	    ret = 1;
	} else if (pa->seq_no < pb->seq_no) {
	    ret = -1;
	}
    } else {
	printf("Error pa and/or pb are NULL!\n");
    }

    return ret;
}

/*
 * Initialize a raw stream buffer.
 *
 * raw_stream_buffer      structure to initialize
 * max_tolerable_latency  stored as is; it also sizes the packet array
 * timeoutrecv            stored as is
 *
 * The packet array gets max_tolerable_latency * 4 slots (a packet is
 * supposed to contain 1/4 second of data), and at least one slot so that
 * index 0 is always valid. Every slot is set to NULL.
 *
 * When the array cannot be allocated an error is logged, pdlist is NULL
 * and max_pdlist_items is 0.
 */
void nmxp_raw_stream_init(NMXP_RAW_STREAM_DATA *raw_stream_buffer, int32_t max_tolerable_latency, int timeoutrecv) {
    int j;

    raw_stream_buffer->last_seq_no_sent = -1;
    raw_stream_buffer->last_sample_time = -1.0;
    /* TODO
     * Suppose a packet can contain 1/4 secs of data */
    raw_stream_buffer->max_tolerable_latency = max_tolerable_latency;
    raw_stream_buffer->max_pdlist_items = max_tolerable_latency * 4;
    if(raw_stream_buffer->max_pdlist_items < 1) {
        /* A zero or negative count would give an unusable allocation */
        raw_stream_buffer->max_pdlist_items = 1;
    }
    raw_stream_buffer->timeoutrecv = timeoutrecv;
    raw_stream_buffer->n_pdlist = 0;
    raw_stream_buffer->pdlist = (NMXP_DATA_PROCESS **) malloc (raw_stream_buffer->max_pdlist_items * sizeof(NMXP_DATA_PROCESS *));
    if(raw_stream_buffer->pdlist == NULL) {
        nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_RAWSTREAM, "nmxp_raw_stream_init(): unable to allocate %d items!\n", raw_stream_buffer->max_pdlist_items);
        raw_stream_buffer->max_pdlist_items = 0;
        return;
    }
    for(j=0; j<raw_stream_buffer->max_pdlist_items; j++) {
        raw_stream_buffer->pdlist[j] = NULL;
    }

}


/*
 * Release every packet still stored in a raw stream buffer and the packet
 * array itself.
 *
 * Each stored packet owns its buffer and pDataPtr copies (they are
 * allocated by nmxp_raw_stream_manage()), so both are released together
 * with the packet. Afterwards pdlist is NULL and n_pdlist is 0, so calling
 * this function twice is harmless.
 */
void nmxp_raw_stream_free(NMXP_RAW_STREAM_DATA *raw_stream_buffer) {
    int j;

    if(raw_stream_buffer == NULL  ||  raw_stream_buffer->pdlist == NULL) {
        return;
    }

    for(j=0; j<raw_stream_buffer->max_pdlist_items; j++) {
        if(raw_stream_buffer->pdlist[j]) {
            free(raw_stream_buffer->pdlist[j]->buffer);
            free(raw_stream_buffer->pdlist[j]->pDataPtr);
            free(raw_stream_buffer->pdlist[j]);
            raw_stream_buffer->pdlist[j] = NULL;
        }
    }
    free(raw_stream_buffer->pdlist);
    raw_stream_buffer->pdlist = NULL;
    raw_stream_buffer->n_pdlist = 0;
}


/*
 * Store a copy of packet a_pd in the raw stream buffer p and pass to the
 * p_func_pd functions every packet that can be delivered in sequence.
 *
 * p          raw stream buffer initialized by nmxp_raw_stream_init()
 * a_pd       packet to add; NULL means a time-out occurred, nothing is added
 * p_func_pd  functions called on each delivered packet
 * n_func_pd  number of entries of p_func_pd to call
 *
 * Behaviour:
 *  - a_pd is deep-copied (structure, buffer and samples); the caller keeps
 *    ownership of a_pd;
 *  - when the array is full, or the latency of the oldest packet reaches
 *    max_tolerable_latency, the oldest packet is delivered (if newer than
 *    the last one sent) and removed to make room;
 *  - the array is sorted by seq_no, then packets are delivered while they
 *    directly follow the last one sent; duplicates are discarded;
 *  - delivered and discarded packets are released.
 *
 * Returns 0, or -1 when p is not usable or a copy cannot be allocated (the
 * buffer is left untouched in that case).
 */
int nmxp_raw_stream_manage(NMXP_RAW_STREAM_DATA *p, NMXP_DATA_PROCESS *a_pd, int (*p_func_pd[NMXP_MAX_FUNC_PD]) (NMXP_DATA_PROCESS *), int n_func_pd) {
    int ret = 0;
    int send_again = 1;
    int seq_no_diff;
    double time_diff;
    double latency = 0.0;
    int j=0, k=0;
    int i_func_pd;
    char str_time[200];
    NMXP_DATA_PROCESS *pd = NULL;

    if(p == NULL  ||  p->pdlist == NULL) {
        nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_RAWSTREAM, "nmxp_raw_stream_manage(): raw stream buffer is not initialized!\n");
        return -1;
    }

    if(a_pd) {
        /* Allocate memory for pd and copy a_pd */
        pd = (NMXP_DATA_PROCESS *) malloc (sizeof(NMXP_DATA_PROCESS));
        if(pd == NULL) {
            nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_RAWSTREAM, "nmxp_raw_stream_manage(): unable to allocate a packet!\n");
            return -1;
        }
        memcpy(pd, a_pd, sizeof(NMXP_DATA_PROCESS));
        /* The copy must own its data: never share a_pd's pointers */
        pd->buffer = NULL;
        pd->pDataPtr = NULL;
        if(a_pd->length > 0  &&  a_pd->buffer) {
            pd->buffer = malloc(a_pd->length);
            if(pd->buffer == NULL) {
                nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_RAWSTREAM, "nmxp_raw_stream_manage(): unable to allocate a packet buffer!\n");
                free(pd);
                return -1;
            }
            memcpy(pd->buffer, a_pd->buffer, a_pd->length);
        }
        /* Test nSamp itself: nSamp * sizeof(int) is unsigned, so it was
         * greater than zero for a negative nSamp too */
        if(a_pd->nSamp > 0  &&  a_pd->pDataPtr) {
            pd->pDataPtr = (int *) malloc(a_pd->nSamp * sizeof(int));
            if(pd->pDataPtr == NULL) {
                nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_RAWSTREAM, "nmxp_raw_stream_manage(): unable to allocate packet samples!\n");
                free(pd->buffer);
                free(pd);
                return -1;
            }
            memcpy(pd->pDataPtr, a_pd->pDataPtr, a_pd->nSamp * sizeof(int));
        }
    }

    /* First time */
    if(p->last_seq_no_sent == -1  && pd) {
        if(p->timeoutrecv == 0) {
            p->last_seq_no_sent = pd->seq_no - 1;
            p->last_sample_time = pd->time;
        } else {
            p->last_seq_no_sent = 0;
            p->last_sample_time = 0;
        }
        nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_RAWSTREAM, "First time nmxp_raw_stream_manage() for %s.%s.%s .\n", pd->network, pd->station, pd->channel);
    }

    if(p->n_pdlist > 0) {
        latency = nmxp_data_latency(p->pdlist[0]);
    }

    /* Add pd and sort array */
    if(p->n_pdlist >= p->max_pdlist_items
            || latency >= p->max_tolerable_latency) {
        /* Supposing p->pdlist is ordered,
         * handle the first item and over write it.
         */

        if(p->n_pdlist > 0) {
            seq_no_diff = p->pdlist[0]->seq_no - p->last_seq_no_sent;
            time_diff = p->pdlist[0]->time - p->last_sample_time;
            latency = nmxp_data_latency(p->pdlist[0]);
            nmxp_data_to_str(str_time, p->pdlist[0]->time);
            if( seq_no_diff > 0) {
                nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_RAWSTREAM, "Force handling packet %s.%s.%d.%d (%s - %.2f sec.)  time_diff %.2fs  n_pdlist %d  lat. %.1fs!\n",
                        p->pdlist[0]->station, p->pdlist[0]->channel, p->pdlist[0]->seq_no, p->pdlist[0]->packet_type, str_time,
                        (double) p->pdlist[0]->nSamp / (double) p->pdlist[0]->sampRate, time_diff, p->n_pdlist, latency);
                for(i_func_pd=0; i_func_pd<n_func_pd; i_func_pd++) {
                    (*p_func_pd[i_func_pd])(p->pdlist[0]);
                }
                p->last_seq_no_sent = (p->pdlist[0]->seq_no);
                p->last_sample_time = (p->pdlist[0]->time + ((double) p->pdlist[0]->nSamp / (double) p->pdlist[0]->sampRate ));
            } else {
                /* It should not occur */
                nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_RAWSTREAM, "NOT OCCUR! Packets %s.%s.%d.%d (%s - %.2f sec.) discarded, seq_no_diff=%d time_diff %.2fs  lat. %.1fs\n",
                        p->pdlist[0]->station, p->pdlist[0]->channel, p->pdlist[0]->seq_no, p->pdlist[0]->packet_type, str_time,
                        (double) p->pdlist[0]->nSamp / (double) p->pdlist[0]->sampRate, seq_no_diff, time_diff, latency);
            }

            /* Free handled packet */
            free(p->pdlist[0]->buffer);
            free(p->pdlist[0]->pDataPtr);
            free(p->pdlist[0]);
            if(pd) {
                p->pdlist[0] = pd;
            } else {
                /* Time-out call: nothing replaces the handled packet. Move
                 * the last item into the hole and shrink the list; leaving
                 * a counted NULL slot made the code below dereference it.
                 * The qsort() below restores the order. */
                p->n_pdlist--;
                p->pdlist[0] = p->pdlist[p->n_pdlist];
                p->pdlist[p->n_pdlist] = NULL;
            }
        } else {
            if(pd) {
                p->n_pdlist = 1;
                p->pdlist[0] = pd;
            }
        }

    } else {
        if(pd) {
            p->pdlist[p->n_pdlist++] = pd;
        }
    }
    qsort(p->pdlist, p->n_pdlist, sizeof(NMXP_DATA_PROCESS *), nmxp_raw_stream_seq_no_compare);

    /* TODO Check for packet duplication in pd->pdlist */

    /* Print array, only for debugging */
    if(p->n_pdlist > 1) {
        int y = 0;
        for(y=0; y < p->n_pdlist; y++) {
            nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_RAWSTREAM, "%02d pkt %d\n", y, p->pdlist[y]->seq_no);
        }
    }

    /* Condition for time-out (pd is NULL) */
    if(!pd && p->n_pdlist > 0) {
        p->last_seq_no_sent = p->pdlist[0]->seq_no - 1;
        p->last_sample_time = p->pdlist[0]->time;
    }

    /* Manage array and execute func_pd() */
    j=0;
    send_again = 1;
    while(send_again  &&  j < p->n_pdlist) {
        send_again = 0;
        seq_no_diff = p->pdlist[j]->seq_no - p->last_seq_no_sent;
        time_diff = p->pdlist[j]->time - p->last_sample_time;
        latency = nmxp_data_latency(p->pdlist[j]);
        nmxp_data_to_str(str_time, p->pdlist[j]->time);
        if(seq_no_diff <= 0) {
            /* Duplicated packets: Discarded */
            nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_RAWSTREAM, "Packets %s.%s.%d.%d (%s - %f sec.) discarded, seq_no_diff=%d  time_diff=%.2fs  lat %.1fs\n",
                    p->pdlist[j]->station, p->pdlist[j]->channel, p->pdlist[j]->seq_no, p->pdlist[j]->packet_type, str_time,
                    (double) p->pdlist[j]->nSamp /  (double) p->pdlist[j]->sampRate, seq_no_diff, time_diff, latency);
            send_again = 1;
            j++;
        } else if(seq_no_diff == 1) {
            for(i_func_pd=0; i_func_pd<n_func_pd; i_func_pd++) {
                (*p_func_pd[i_func_pd])(p->pdlist[j]);
            }
            if(time_diff > TIME_TOLLERANCE || time_diff < -TIME_TOLLERANCE) {
                nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_RAWSTREAM, "%s.%s time is not correct seq_no_diff=%d time_diff=%.2fs  ([%d] %d-%d)  (%s - %.2f sec.) lat. %.1fs\n",
                    p->pdlist[j]->station, p->pdlist[j]->channel,
                    seq_no_diff, time_diff, p->pdlist[j]->packet_type, p->pdlist[j]->seq_no, p->last_seq_no_sent,
                    str_time, (double) p->pdlist[j]->nSamp /  (double) p->pdlist[j]->sampRate, latency);
            }
            p->last_seq_no_sent = p->pdlist[j]->seq_no;
            p->last_sample_time = (p->pdlist[j]->time + ((double) p->pdlist[j]->nSamp / (double) p->pdlist[j]->sampRate ));
            send_again = 1;
            j++;
        } else {
            /* Gap in the sequence: stop and wait for the missing packets */
            nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_RAWSTREAM, "%s.%s seq_no_diff=%d ([%d] %d-%d)  j=%2d  p->n_pdlist=%2d (%s - %.2f sec.) time_diff=%.2fs  lat. %.1fs\n",
                    p->pdlist[j]->station, p->pdlist[j]->channel,
                    seq_no_diff, p->pdlist[j]->packet_type, p->pdlist[j]->seq_no, p->last_seq_no_sent, j, p->n_pdlist,
                    str_time, (double) p->pdlist[j]->nSamp /  (double) p->pdlist[j]->sampRate, time_diff, latency);
        }
    }

    /* Shift and free j handled elements */
    if(j > 0) {
        for(k=0; k < p->n_pdlist; k++) {
            if(k < j) {
                if(p->pdlist[k]) {
                    free(p->pdlist[k]->buffer);
                    free(p->pdlist[k]->pDataPtr);
                    free(p->pdlist[k]);
                    p->pdlist[k] = NULL;
                }
            }
            if(k + j < p->n_pdlist) {
                p->pdlist[k] = p->pdlist[k+j];
            } else {
                p->pdlist[k] = NULL;
            }
        }
        p->n_pdlist = p->n_pdlist - j;
    }

    return ret;
}


/*
 * TODO: not implemented yet.
 *
 * The parameters are not used and nothing is delivered or released; the
 * function always returns 0.
 */
int nmxp_raw_stream_manage_flush(NMXP_RAW_STREAM_DATA *p, int (*p_func_pd[NMXP_MAX_FUNC_PD]) (NMXP_DATA_PROCESS *), int n_func_pd) {
    return 0;
}