/*! \file
 *
 * \brief Nanometrics Protocol Library
 *
 * Author:
 * 	Matteo Quintiliani
 * 	Istituto Nazionale di Geofisica e Vulcanologia - Italy
 *	quintiliani@ingv.it
 *
 * $Id: nmxp.c,v 1.54 2007-11-23 17:34:18 mtheo Exp $
 *
 */

#include "nmxp.h"

Matteo Quintiliani's avatar
Matteo Quintiliani committed
16
17
#include "config.h"

Matteo Quintiliani's avatar
Start    
Matteo Quintiliani committed
18
#include <stdio.h>
19
#include <stdlib.h>
Matteo Quintiliani's avatar
Start    
Matteo Quintiliani committed
20
21
#include <string.h>

Matteo Quintiliani's avatar
Matteo Quintiliani committed
22
23
24
25
#ifdef HAVE_ARPA_INET_H
#include <arpa/inet.h>
#endif

26
int nmxp_sendConnect(int isock) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
27
    return nmxp_sendMessage(isock, NMXP_MSG_CONNECT, NULL, 0);
Matteo Quintiliani's avatar
Start    
Matteo Quintiliani committed
28
29
}

Matteo Quintiliani's avatar
Matteo Quintiliani committed
30
31
/*! \brief Send a TerminateSubscription message.
 *
 * \param isock   Socket descriptor handed to nmxp_sendMessage().
 * \param reason  Shutdown reason. Currently NOT transmitted (see TODO below).
 * \param message Optional text sent as message body; may be NULL.
 *
 * \return The value returned by nmxp_sendMessage().
 */
int nmxp_sendTerminateSubscription(int isock, NMXP_SHUTDOWN_REASON reason, char *message) {
    size_t message_length = 0;

    /* TODO 'reason' is accepted but never put on the wire. */
    (void) reason;

    /* Historical behaviour: the body length is strlen(message)-1, i.e. the
     * last character is not sent.
     * NOTE(review): this looks like an off-by-one, confirm against the
     * protocol before changing it.
     * The empty string is guarded here so that the subtraction can no longer
     * wrap around to a huge unsigned value. */
    if(message != NULL  &&  message[0] != '\0') {
        message_length = strlen(message) - 1;
    }

    return nmxp_sendMessage(isock, NMXP_MSG_TERMINATESUBSCRIPTION, message, message_length);
}

Matteo Quintiliani's avatar
Matteo Quintiliani committed
34
/*! \brief Receive a ChannelList message and convert it to host byte order.
 *
 * On success *pchannelList points to the buffer obtained from
 * nmxp_receiveMessage(); the caller owns it and releases it with free()
 * (this is what nmxp_getAvailableChannelList() does).
 *
 * \param isock        Socket descriptor to read from.
 * \param pchannelList Output. Set to NULL unless a channel list was received.
 *
 * \return The value returned by nmxp_receiveMessage(). Note that a message of
 *         an unexpected type still yields that return value, with
 *         *pchannelList left NULL.
 */
int nmxp_receiveChannelList(int isock, NMXP_CHAN_LIST **pchannelList) {
    int ret;
    int i;
    int recv_errno;
    NMXP_MSG_SERVER type;
    void *buffer = NULL;
    int32_t length = 0;

    *pchannelList = NULL;

    ret = nmxp_receiveMessage(isock, &type, &buffer, &length, 0, &recv_errno);

    /* Nothing was received: 'type' and 'buffer' must not be looked at. */
    if(ret != NMXP_SOCKET_OK) {
        return ret;
    }

    if(type != NMXP_MSG_CHANNELLIST) {
        nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_PACKETMAN, "Type %d is not NMXP_MSG_CHANNELLIST!\n", type);
        /* The unexpected message is never handed to the caller, so drop it
         * here instead of leaking it. */
        free(buffer);
    } else if(buffer != NULL) {

        *pchannelList = buffer;
        (*pchannelList)->number = ntohl((*pchannelList)->number);

        nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CHANNEL, "number of channels %d\n", (*pchannelList)->number);

        // TODO check 'number' against the received length

        for(i=0; i < (*pchannelList)->number; i++) {
            (*pchannelList)->channel[i].key = ntohl((*pchannelList)->channel[i].key);
            nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CHANNEL, "%12d %s\n", (*pchannelList)->channel[i].key, (*pchannelList)->channel[i].name);
        }

    }

    return ret;
}


69
/*! \brief Send an AddTimeSeriesChannels message.
 *
 * The body is a sequence of 32-bit integers in network byte order:
 * the number of channels, one key per channel, then shortTermCompletion,
 * out_format and buffer_flag.
 *
 * \param isock               Socket descriptor handed to nmxp_sendMessage().
 * \param channelList         Channels whose keys are requested.
 * \param shortTermCompletion Sent as is (converted to network byte order).
 * \param out_format          Sent as is (converted to network byte order).
 * \param buffer_flag         Sent as is (converted to network byte order).
 *
 * \return The value returned by nmxp_sendMessage(), or NMXP_SOCKET_ERROR
 *         when the message body cannot be allocated.
 */
int nmxp_sendAddTimeSeriesChannel(int isock, NMXP_CHAN_LIST_NET *channelList, int32_t shortTermCompletion, int32_t out_format, NMXP_BUFFER_FLAG buffer_flag) {
    int ret;
    /* 4 bytes for the count, 4 per key, 3 * 4 for the trailing fields. */
    int32_t buffer_length = 16 + (4 * channelList->number);
    char *buffer = malloc(buffer_length);
    int32_t app, i, disp;

    if(buffer == NULL) {
        nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_PACKETMAN, "nmxp_sendAddTimeSeriesChannel(): unable to allocate %d bytes.\n", buffer_length);
        return NMXP_SOCKET_ERROR;
    }

    disp=0;

    app = htonl(channelList->number);
    memcpy(&buffer[disp], &app, 4);
    disp+=4;

    for(i=0; i < channelList->number; i++) {
        app = htonl(channelList->channel[i].key);
        memcpy(&buffer[disp], &app, 4);
        disp+=4;
    }

    app = htonl(shortTermCompletion);
    memcpy(&buffer[disp], &app, 4);
    disp+=4;

    app = htonl(out_format);
    memcpy(&buffer[disp], &app, 4);
    disp+=4;

    app = htonl(buffer_flag);
    memcpy(&buffer[disp], &app, 4);
    disp+=4;

    ret = nmxp_sendMessage(isock, NMXP_MSG_ADDTIMESERIESCHANNELS, buffer, buffer_length);

    free(buffer);

    return ret;
}


Matteo Quintiliani's avatar
Matteo Quintiliani committed
108
/*! \brief Receive one data message and hand it to the matching decoder.
 *
 * Compressed messages go to nmxp_processCompressedData(), decompressed ones
 * to nmxp_processDecompressedData(). Any other message type is only logged.
 *
 * \param isock        Socket descriptor to read from.
 * \param channelList  Passed through to the decoder.
 * \param network_code Passed through to the decoder.
 * \param timeoutsec   Passed through to nmxp_receiveMessage().
 * \param recv_errno   Passed through to nmxp_receiveMessage().
 *
 * \return The decoder result, or NULL when nothing was received or the
 *         message type is not a data type.
 */
NMXP_DATA_PROCESS *nmxp_receiveData(int isock, NMXP_CHAN_LIST_NET *channelList, const char *network_code, int timeoutsec, int *recv_errno ) {
    NMXP_MSG_SERVER msg_type;
    void *payload = NULL;
    int32_t payload_length;

    if(nmxp_receiveMessage(isock, &msg_type, &payload, &payload_length, timeoutsec, recv_errno) != NMXP_SOCKET_OK) {
        return NULL;
    }

    if(msg_type == NMXP_MSG_COMPRESSED) {
        nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_PACKETMAN, "Type %d is NMXP_MSG_COMPRESSED!\n", msg_type);
        return nmxp_processCompressedData(payload, payload_length, channelList, network_code);
    }

    if(msg_type == NMXP_MSG_DECOMPRESSED) {
        nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_PACKETMAN, "Type %d is NMXP_MSG_DECOMPRESSED!\n", msg_type);
        return nmxp_processDecompressedData(payload, payload_length, channelList, network_code);
    }

    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_PACKETMAN, "Type %d is not NMXP_MSG_COMPRESSED or NMXP_MSG_DECOMPRESSED!\n", msg_type);
    return NULL;
}

129

130
int nmxp_sendConnectRequest(int isock, char *naqs_username, char *naqs_password, int32_t connection_time) {
131
132
133
    int ret;
    char crc32buf[100];
    NMXP_CONNECT_REQUEST connectRequest;
134
    int naqs_username_length, naqs_password_length;
135

136
137
138
139
140
    naqs_username_length = (naqs_username)? strlen(naqs_username) : 0;
    naqs_password_length = (naqs_password)? strlen(naqs_password) : 0;

    if(naqs_username_length == 0) {
	connectRequest.username[0] = 0;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
141
    } else {
142
	strcpy(connectRequest.username, naqs_username);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
143
    }
144
145
146
    connectRequest.version = htonl(0);
    connectRequest.connection_time = htonl(connection_time);

147
    if(naqs_username_length == 0  &&  naqs_password_length == 0 ) {
148
	sprintf(crc32buf, "%d%d", connectRequest.version, connection_time);
149
    } else if(naqs_username_length != 0  &&  naqs_password_length != 0 ) {
150
151
	sprintf(crc32buf, "%s%d%d%s", naqs_username, connectRequest.version,
		connection_time, naqs_password);
152
    } else if(naqs_username_length != 0 ) {
153
	sprintf(crc32buf, "%s%d%d", naqs_username, connectRequest.version, connection_time);
154
    } else if(naqs_password_length != 0 ) {
155
156
157
158
159
160
161
	sprintf(crc32buf, "%d%d%s", connectRequest.version, connection_time, naqs_password);
    }
    connectRequest.crc32 = htonl(crc32((unsigned char *) crc32buf, strlen(crc32buf)));

    ret = nmxp_sendMessage(isock, NMXP_MSG_CONNECTREQUEST, &connectRequest, sizeof(NMXP_CONNECT_REQUEST));

    if(ret == NMXP_SOCKET_OK) {
162
	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CRC, "Send a ConnectRequest crc32buf = (%s), crc32 = %d\n", crc32buf, connectRequest.crc32);
163
    } else {
164
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CRC, "Send a ConnectRequest.\n");
165
166
167
168
169
170
    }

    return ret;
}


171
int nmxp_readConnectionTime(int isock, int32_t *connection_time) {
172
    int ret;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
173
174
    int recv_errno;
    ret = nmxp_recv_ctrl(isock, connection_time, sizeof(int32_t), 0, &recv_errno);
175
    *connection_time = ntohl(*connection_time);
176
    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "Read connection time from socket %d.\n", *connection_time);
177
    if(ret != NMXP_SOCKET_OK) {
178
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Read connection time from socket.\n");
179
180
181
182
183
    }
    return ret;
}


184
int nmxp_waitReady(int isock) {
185
186
    int times = 0;
    int rc = NMXP_SOCKET_OK;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
187
188
189
    int32_t signature;
    int32_t type = 0;
    int32_t length;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
190
    int recv_errno;
191
192

    while(rc == NMXP_SOCKET_OK  &&  type != NMXP_MSG_READY) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
193
	rc = nmxp_recv_ctrl(isock, &signature, sizeof(signature), 0, &recv_errno);
194
195
196
	if(rc != NMXP_SOCKET_OK) return rc;
	signature = ntohl(signature);
	if(signature == 0) {
197
	    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "signature is equal to zero. receive again.\n");
Matteo Quintiliani's avatar
Matteo Quintiliani committed
198
	    rc = nmxp_recv_ctrl(isock, &signature, sizeof(signature), 0, &recv_errno);
199
200
201
	    signature = ntohl(signature);
	}
	if(signature != NMX_SIGNATURE) {
202
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "signature is not valid. signature = %d\n", signature);
203
204
205
206
	    if(signature == 200) {
		    int32_t err_length;
		    int32_t err_reason;
		    char err_buff[200];
Matteo Quintiliani's avatar
Matteo Quintiliani committed
207
		    rc = nmxp_recv_ctrl(isock, &err_length, sizeof(err_length), 0, &recv_errno);
208
		    err_length = ntohl(err_length);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
209
		    rc = nmxp_recv_ctrl(isock, &err_reason, sizeof(err_reason), 0, &recv_errno);
210
211
		    err_reason = ntohl(err_reason);
		    if(err_length > 4) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
212
			    rc = nmxp_recv_ctrl(isock, err_buff, err_length-4, 0, &recv_errno);
213
214
			    err_buff[err_length] = 0;
		    }
215
		    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "TerminateMessage from Server: %s (%d).\n", err_buff, err_reason);
216
	    }
217
218
219
	    return NMXP_SOCKET_ERROR;
	}

Matteo Quintiliani's avatar
Matteo Quintiliani committed
220
	rc = nmxp_recv_ctrl(isock, &type, sizeof(type), 0, &recv_errno);
221
222
223
	if(rc != NMXP_SOCKET_OK) return rc;
	type = ntohl(type);
	if(type != NMXP_MSG_READY) {
224
	    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "type is not READY. type = %d\n", type);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
225
	    rc = nmxp_recv_ctrl(isock, &length, sizeof(length), 0, &recv_errno);
226
227
228
229
230
	    if(rc != NMXP_SOCKET_OK) return rc;
	    length = ntohl(length);
	    if(length > 0) {
		if(length == 4) {
		    int32_t app;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
231
		    rc = nmxp_recv_ctrl(isock, &app, length, 0, &recv_errno);
232
233
		    if(rc != NMXP_SOCKET_OK) return rc;
		    app = ntohl(app);
234
		    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "value = %d\n", app);
235
236
		} else {
		    char *buf_app = (char *) malloc(sizeof(char) * length);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
237
		    rc = nmxp_recv_ctrl(isock, buf_app, length, 0, &recv_errno);
238
239
240
241
242
243
		    if(buf_app) {
			free(buf_app);
		    }
		}
	    }
	} else {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
244
	    rc = nmxp_recv_ctrl(isock, &length, sizeof(length), 0, &recv_errno);
245
246
247
	    if(rc != NMXP_SOCKET_OK) return rc;
	    length = ntohl(length);
	    if(length != 0) {
248
		nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "length is not equal to zero. length = %d\n", length);
249
250
251
252
253
254
		return NMXP_SOCKET_ERROR;
	    }
	}

	times++;
	if(times > 10) {
255
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "waiting_ready_message. times > 10\n");
256
257
258
259
260
261
262
263
264
	    rc = NMXP_SOCKET_ERROR;
	}

    }

    return rc;
}


265
int nmxp_sendDataRequest(int isock, int32_t key, int32_t start_time, int32_t end_time) {
266
267
268
269
270
271
272
273
274
275
    int ret;
    NMXP_DATA_REQUEST dataRequest;

    dataRequest.chan_key = htonl(key);
    dataRequest.start_time = htonl(start_time);
    dataRequest.end_time = htonl(end_time);

    ret = nmxp_sendMessage(isock, NMXP_MSG_DATAREQUEST, &dataRequest, sizeof(dataRequest));

    if(ret != NMXP_SOCKET_OK) {
276
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Send a Request message\n");
277
278
279
280
281
282
    }

    return ret;
}


283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
/*! \brief Connect to a server and return the channels of a given data type.
 *
 * Opens a socket, sends a Connect, receives the channel list, extracts the
 * channels of type 'datatype' sorted by name, sends a TerminateSubscription
 * and closes the socket.
 *
 * \param hostname Host to connect to.
 * \param portnum  Port to connect to.
 * \param datatype Data type used to select the channels.
 *
 * \return The list built by nmxp_chan_getType(), or NULL when the socket
 *         cannot be opened, the Connect cannot be sent or no channel list
 *         is received.
 */
NMXP_CHAN_LIST *nmxp_getAvailableChannelList(char * hostname, int portnum, NMXP_DATATYPE datatype) {
    int naqssock;
    NMXP_CHAN_LIST *channelList = NULL, *channelList_subset = NULL;
    int i;

    // 1. Open a socket
    naqssock = nmxp_openSocket(hostname, portnum);
    if(naqssock == NMXP_SOCKET_ERROR) {
        return NULL;
    }

    // 2. Send a Connect
    if(nmxp_sendConnect(naqssock) != NMXP_SOCKET_OK) {
        nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CHANNEL, "Error on sendConnect()\n");

    // 3. Receive ChannelList
    // nmxp_receiveChannelList() can report success and still leave the list
    // NULL (unexpected message type), so both are checked.
    } else if(nmxp_receiveChannelList(naqssock, &channelList) != NMXP_SOCKET_OK  ||  channelList == NULL) {
        nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CHANNEL, "Error on receiveChannelList()\n");

    } else {
        channelList_subset = nmxp_chan_getType(channelList, datatype);

        if(channelList_subset != NULL) {
            nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CHANNEL, "%d / %d\n", channelList_subset->number, channelList->number);

            // nmxp_chan_sortByKey(channelList_subset);
            nmxp_chan_sortByName(channelList_subset);

            for(i=0; i < channelList_subset->number; i++) {
                nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CHANNEL, "%12d %s\n", channelList_subset->channel[i].key, channelList_subset->channel[i].name);
            }
        }

        // 4. Send a Request Pending (optional)

        // 5. Send AddChannels

        // 6. Repeat until finished: receive and handle packets

        // 7. Send Terminate Subscription
        nmxp_sendTerminateSubscription(naqssock, NMXP_SHUTDOWN_NORMAL, "Good Bye!");
    }

    // 8. Close the socket
    nmxp_closeSocket(naqssock);

    // Only the subset is returned; the full list is released here.
    free(channelList);

    return channelList_subset;
}

336

337
/*! \brief Build the list of channels of a given data type with their time
 *         spans and, optionally, their network codes.
 *
 * Follows the numbered "DAP" steps below: open a socket, read the connection
 * time, send a ConnectRequest, wait for Ready, then issue a ChannelList
 * request, a PrecisList request and (when flag_request_channelinfo is not
 * zero) one ChannelInfo request per channel whose
 * getChannelNumberFromKey() is 0. The resulting list is printed with
 * nmxp_meta_chan_print() before being returned.
 *
 * \param hostname                 Host to connect to.
 * \param portnum                  Port to connect to.
 * \param datatype                 Only channels of this type are added.
 * \param flag_request_channelinfo Non-zero to also request the network codes.
 *
 * \return The list built with nmxp_meta_chan_add() (possibly NULL), or NULL
 *         when the connection hand-shake fails.
 */
NMXP_META_CHAN_LIST *nmxp_getMetaChannelList(char * hostname, int portnum, NMXP_DATATYPE datatype, int flag_request_channelinfo) {
    int naqssock;
    NMXP_CHAN_PRECISLIST *precisChannelList = NULL;
    NMXP_CHAN_LIST *channelList = NULL;
    NMXP_META_CHAN_LIST *chan_list = NULL;
    NMXP_META_CHAN_LIST *iter = NULL;
    int i = 0;
    int32_t connection_time;
    char *datas_username = NULL, *datas_password = NULL;
    int ret_sock;
    int recv_errno;

    /* Initialised because both are logged even when a receive fails. */
    NMXP_MSG_SERVER type = (NMXP_MSG_SERVER) 0;
    void *buffer = NULL;
    int32_t length = 0;
    NMXP_MSGBODY_PRECISLISTREQUEST precisListRequestBody;
    NMXP_MSGBODY_CHANNELINFOREQUEST channelInfoRequestBody;
    NMXP_MSGBODY_CHANNELINFORESPONSE *channelInfo;

    char str_start[200], str_end[200];
    str_start[0] = 0;
    str_end[0] = 0;

    /* DAP Step 1: Open a socket */
    if( (naqssock = nmxp_openSocket(hostname, portnum)) == NMXP_SOCKET_ERROR) {
        nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error opening socket!\n");
        return NULL;
    }

    /* From here on every early return must close the socket. */

    /* DAP Step 2: Read connection time */
    if(nmxp_readConnectionTime(naqssock, &connection_time) != NMXP_SOCKET_OK) {
        nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error reading connection time from server!\n");
        nmxp_closeSocket(naqssock);
        return NULL;
    }

    /* DAP Step 3: Send a ConnectRequest */
    if(nmxp_sendConnectRequest(naqssock, datas_username, datas_password, connection_time) != NMXP_SOCKET_OK) {
        nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error sending connect request!\n");
        nmxp_closeSocket(naqssock);
        return NULL;
    }

    /* DAP Step 4: Wait for a Ready message */
    if(nmxp_waitReady(naqssock) != NMXP_SOCKET_OK) {
        nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Error waiting Ready message!\n");
        nmxp_closeSocket(naqssock);
        return NULL;
    }

    /* NOTE(review): the buffers returned by nmxp_receiveMessage() below are
     * never released here - confirm who owns them. */

    /* DAP Step 5: Send Data Request */
    nmxp_sendHeader(naqssock, NMXP_MSG_CHANNELLISTREQUEST, 0);
    /* DAP Step 6: Receive Data until receiving a Ready message */
    ret_sock = nmxp_receiveMessage(naqssock, &type, &buffer, &length, 0, &recv_errno);
    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "ret_sock = %d, type = %d, length = %d\n", ret_sock, type, length);

    while(ret_sock == NMXP_SOCKET_OK   &&    type != NMXP_MSG_READY) {
        channelList = buffer;

        channelList->number = ntohl(channelList->number);

        for(i = 0; i < channelList->number; i++) {
            channelList->channel[i].key = ntohl(channelList->channel[i].key);
            if(getDataTypeFromKey(channelList->channel[i].key) == datatype) {
                nmxp_meta_chan_add(&chan_list, channelList->channel[i].key, channelList->channel[i].name, 0, 0, NULL, NMXP_META_SORT_NAME);
            }
        }

        /* Receive Message */
        ret_sock = nmxp_receiveMessage(naqssock, &type, &buffer, &length, 0, &recv_errno);
        nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "ret_sock = %d, type = %d, length = %d\n", ret_sock, type, length);
    }


    /* DAP Step 5: Send Data Request */
    precisListRequestBody.instr_id = htonl(-1);
    precisListRequestBody.datatype = htonl(NMXP_DATA_TIMESERIES);
    precisListRequestBody.type_of_channel = htonl(-1);
    nmxp_sendMessage(naqssock, NMXP_MSG_PRECISLISTREQUEST, &precisListRequestBody, sizeof(NMXP_MSGBODY_PRECISLISTREQUEST));

    /* DAP Step 6: Receive Data until receiving a Ready message */
    ret_sock = nmxp_receiveMessage(naqssock, &type, &buffer, &length, 0, &recv_errno);
    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "ret_sock = %d, type = %d, length = %d\n", ret_sock, type, length);

    while(ret_sock == NMXP_SOCKET_OK   &&    type != NMXP_MSG_READY) {
        precisChannelList = buffer;

        precisChannelList->number = ntohl(precisChannelList->number);
        for(i = 0; i < precisChannelList->number; i++) {
            precisChannelList->channel[i].key = ntohl(precisChannelList->channel[i].key);
            precisChannelList->channel[i].start_time = ntohl(precisChannelList->channel[i].start_time);
            precisChannelList->channel[i].end_time = ntohl(precisChannelList->channel[i].end_time);

            /* Only needed by the disabled debug log below. */
            nmxp_data_to_str(str_start, precisChannelList->channel[i].start_time);
            nmxp_data_to_str(str_end, precisChannelList->channel[i].end_time);

            if(!nmxp_meta_chan_set_times(chan_list, precisChannelList->channel[i].key, precisChannelList->channel[i].start_time, precisChannelList->channel[i].end_time)) {
                nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CHANNEL, "Key %d not found for %s!\n", precisChannelList->channel[i].key, precisChannelList->channel[i].name);
            }

            /*
            nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_ANY, "%12d %12s %10d %10d %20s %20s\n",
                    precisChannelList->channel[i].key, precisChannelList->channel[i].name,
                    precisChannelList->channel[i].start_time, precisChannelList->channel[i].end_time,
                    str_start, str_end);
                    */
        }

        /* Receive Message */
        ret_sock = nmxp_receiveMessage(naqssock, &type, &buffer, &length, 0, &recv_errno);
        nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "ret_sock = %d, type = %d, length = %d\n", ret_sock, type, length);
    }


    if(flag_request_channelinfo) {
        for(iter = chan_list; iter != NULL; iter = iter->next) {

            if(getChannelNumberFromKey(iter->key) == 0) {
                /* DAP Step 5: Send Data Request */
                channelInfoRequestBody.key = htonl(iter->key);
                channelInfoRequestBody.ignored = htonl(0);
                nmxp_sendMessage(naqssock, NMXP_MSG_CHANNELINFOREQUEST, &channelInfoRequestBody, sizeof(NMXP_MSGBODY_CHANNELINFOREQUEST));

                /* DAP Step 6: Receive Data until receiving a Ready message */
                ret_sock = nmxp_receiveMessage(naqssock, &type, &buffer, &length, 0, &recv_errno);
                nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "ret_sock = %d, type = %d, length = %d\n", ret_sock, type, length);

                while(ret_sock == NMXP_SOCKET_OK   &&    type != NMXP_MSG_READY) {
                    channelInfo = buffer;
                    channelInfo->key = ntohl(channelInfo->key);

                    if(!nmxp_meta_chan_set_network(chan_list, channelInfo->key, channelInfo->network)) {
                        nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CHANNEL, "Key %d (%d) not found for %s!\n", iter->key, channelInfo->key, iter->name);
                    }
                    /* Receive Message */
                    ret_sock = nmxp_receiveMessage(naqssock, &type, &buffer, &length, 0, &recv_errno);
                    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_EXTRA, "ret_sock = %d, type = %d, length = %d\n", ret_sock, type, length);
                }
            }
        }
    }



    /* DAP Step 7: Repeat steps 5 and 6 for each data request */

    /* DAP Step 8: Send a Terminate message (optional) */
    nmxp_sendTerminateSubscription(naqssock, NMXP_SHUTDOWN_NORMAL, "Bye!");

    /* DAP Step 9: Close the socket */
    nmxp_closeSocket(naqssock);

    nmxp_meta_chan_print(chan_list);

    return chan_list;
}

497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518

int nmxp_raw_stream_seq_no_compare(const void *a, const void *b)
{       
    int ret = 0;
    NMXP_DATA_PROCESS **ppa = (NMXP_DATA_PROCESS **) a;
    NMXP_DATA_PROCESS **ppb = (NMXP_DATA_PROCESS **) b;
    NMXP_DATA_PROCESS *pa = *ppa;
    NMXP_DATA_PROCESS *pb = *ppb;

    if(pa && pb) {
	if(pa->seq_no > pb->seq_no) {
	    ret = 1;
	} else if (pa->seq_no < pb->seq_no) {
	    ret = -1;
	}
    } else {
	printf("Error pa and/or pb are NULL!\n");
    }

    return ret;
}

Matteo Quintiliani's avatar
Matteo Quintiliani committed
519
/*! \brief Initialise a raw stream buffer and allocate its packet list.
 *
 * \param raw_stream_buffer     Structure to initialise.
 * \param max_tolerable_latency Stored as is; also sizes the packet list
 *                              (four entries per unit, at least one entry).
 * \param timeoutrecv           Stored as is.
 *
 * When the packet list cannot be allocated an error is logged and the
 * structure is left with a NULL list and max_pdlist_items set to 0.
 */
void nmxp_raw_stream_init(NMXP_RAW_STREAM_DATA *raw_stream_buffer, int32_t max_tolerable_latency, int timeoutrecv) {
    int j;

    raw_stream_buffer->last_seq_no_sent = -1;
    raw_stream_buffer->last_sample_time = -1.0;
    /* TODO
     * Suppose a packet can contain 1/4 secs of data */
    raw_stream_buffer->max_tolerable_latency = max_tolerable_latency;
    raw_stream_buffer->max_pdlist_items = max_tolerable_latency * 4;
    /* nmxp_raw_stream_manage() always stores into pdlist[0], so at least one
     * slot must exist even for a zero or negative latency. */
    if(raw_stream_buffer->max_pdlist_items < 1) {
        raw_stream_buffer->max_pdlist_items = 1;
    }
    raw_stream_buffer->timeoutrecv = timeoutrecv;
    raw_stream_buffer->n_pdlist = 0;
    raw_stream_buffer->pdlist = (NMXP_DATA_PROCESS **) malloc (raw_stream_buffer->max_pdlist_items * sizeof(NMXP_DATA_PROCESS *));
    if(raw_stream_buffer->pdlist == NULL) {
        nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_RAWSTREAM, "nmxp_raw_stream_init(): unable to allocate %d items.\n", raw_stream_buffer->max_pdlist_items);
        raw_stream_buffer->max_pdlist_items = 0;
        return;
    }
    for(j=0; j<raw_stream_buffer->max_pdlist_items; j++) {
        raw_stream_buffer->pdlist[j] = NULL;
    }

}


/*! \brief Release every packet still held by a raw stream buffer and the
 *         packet list itself.
 *
 * The packets stored by nmxp_raw_stream_manage() own their 'buffer' and
 * 'pDataPtr' arrays, so these are released together with the packet.
 * After the call the list pointer is NULL and the buffer is empty, which
 * makes a second call harmless.
 *
 * \param raw_stream_buffer Structure set up by nmxp_raw_stream_init().
 */
void nmxp_raw_stream_free(NMXP_RAW_STREAM_DATA *raw_stream_buffer) {
    int j;
    if(raw_stream_buffer->pdlist) {
        for(j=0; j<raw_stream_buffer->max_pdlist_items; j++) {
            if(raw_stream_buffer->pdlist[j]) {
                free(raw_stream_buffer->pdlist[j]->buffer);
                free(raw_stream_buffer->pdlist[j]->pDataPtr);
                free(raw_stream_buffer->pdlist[j]);
                raw_stream_buffer->pdlist[j] = NULL;
            }
        }
        free(raw_stream_buffer->pdlist);
        raw_stream_buffer->pdlist = NULL;
    }
    raw_stream_buffer->n_pdlist = 0;
}


Matteo Quintiliani's avatar
Matteo Quintiliani committed
551
int nmxp_raw_stream_manage(NMXP_RAW_STREAM_DATA *p, NMXP_DATA_PROCESS *a_pd, int (*p_func_pd[NMXP_MAX_FUNC_PD]) (NMXP_DATA_PROCESS *), int n_func_pd) {
552
553
554
555
556
557
558
559
560
561
    int ret = 0;
    int send_again = 1;
    int seq_no_diff;
    double time_diff;
    double latency = 0.0;
    int j=0, k=0;
    int i_func_pd;
    char str_time[200];
    NMXP_DATA_PROCESS *pd = NULL;

562
    if(a_pd) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
	/*
	if(a_pd->packet_type == 33 || a_pd->packet_type == 97) {
	    nmxp_data_log(a_pd);
	}
	*/

	/* Allocate memory for pd and copy a_pd */
	pd = (NMXP_DATA_PROCESS *) malloc (sizeof(NMXP_DATA_PROCESS));
	memcpy(pd, a_pd, sizeof(NMXP_DATA_PROCESS));
	if(a_pd->length > 0) {
	    pd->buffer = malloc(pd->length);
	    memcpy(pd->buffer, a_pd->buffer, a_pd->length);
	} else {
	    pd->buffer = NULL;
	}
	if(a_pd->nSamp *  sizeof(int) > 0) {
	    pd->pDataPtr = (int *) malloc(a_pd->nSamp * sizeof(int));
	    memcpy(pd->pDataPtr, a_pd->pDataPtr, a_pd->nSamp * sizeof(int));
	} else {
	    pd->pDataPtr = NULL;
	}
584
    }
585
586

    /* First time */
587
    if(p->last_seq_no_sent == -1  && pd) {
Matteo Quintiliani's avatar
Matteo Quintiliani committed
588
589
590
591
592
593
594
	if(p->timeoutrecv == 0) {
	    p->last_seq_no_sent = pd->seq_no - 1;
	    p->last_sample_time = pd->time;
	} else {
	    p->last_seq_no_sent = 0;
	    p->last_sample_time = 0;
	}
595
	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_RAWSTREAM, "First time nmxp_raw_stream_manage() for %s.%s.%s .\n", pd->network, pd->station, pd->channel);
596
    }
Matteo Quintiliani's avatar
Matteo Quintiliani committed
597

598
599
600
    if(p->n_pdlist > 0) {
	latency = nmxp_data_latency(p->pdlist[0]);
    }
601
602

    /* Add pd and sort array */
603
    if(p->n_pdlist >= p->max_pdlist_items
Matteo Quintiliani's avatar
Matteo Quintiliani committed
604
	    || latency >= p->max_tolerable_latency) {
605
606
607
	/* Supposing p->pdlist is ordered,
	 * handle the first item and over write it.
	 */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
608
609
610
611
612
613
614

	if(p->n_pdlist > 0) {
	    seq_no_diff = p->pdlist[0]->seq_no - p->last_seq_no_sent;
	    time_diff = p->pdlist[0]->time - p->last_sample_time;
	    latency = nmxp_data_latency(p->pdlist[0]);
	    nmxp_data_to_str(str_time, p->pdlist[0]->time);
	    if( seq_no_diff > 0) {
615
		nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_RAWSTREAM, "Force handling packet %s.%s.%d.%d (%s - %.2f sec.)  time_diff %.2fs  n_pdlist %d  lat. %.1fs!\n",
Matteo Quintiliani's avatar
Matteo Quintiliani committed
616
617
618
619
620
621
622
623
624
			p->pdlist[0]->station, p->pdlist[0]->channel, p->pdlist[0]->seq_no, p->pdlist[0]->packet_type, str_time,
			(double) p->pdlist[0]->nSamp / (double) p->pdlist[0]->sampRate, time_diff, p->n_pdlist, latency);
		for(i_func_pd=0; i_func_pd<n_func_pd; i_func_pd++) {
		    (*p_func_pd[i_func_pd])(p->pdlist[0]);
		}
		p->last_seq_no_sent = (p->pdlist[0]->seq_no);
		p->last_sample_time = (p->pdlist[0]->time + ((double) p->pdlist[0]->nSamp / (double) p->pdlist[0]->sampRate ));
	    } else {
		/* It should not occur */
625
		nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_RAWSTREAM, "NOT OCCUR! Packets %s.%s.%d.%d (%s - %.2f sec.) discarded, seq_no_diff=%d time_diff %.2fs  lat. %.1fs\n",
Matteo Quintiliani's avatar
Matteo Quintiliani committed
626
627
			p->pdlist[0]->station, p->pdlist[0]->channel, p->pdlist[0]->seq_no, p->pdlist[0]->packet_type, str_time,
			(double) p->pdlist[0]->nSamp / (double) p->pdlist[0]->sampRate, seq_no_diff, time_diff, latency);
628
629
	    }

Matteo Quintiliani's avatar
Matteo Quintiliani committed
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
	    /* Free handled packet */
	    if(p->pdlist[0]->buffer) {
		free(p->pdlist[0]->buffer);
		p->pdlist[0]->buffer = NULL;
	    }
	    if(p->pdlist[0]->pDataPtr) {
		free(p->pdlist[0]->pDataPtr);
		p->pdlist[0]->pDataPtr = NULL;
	    }
	    if(p->pdlist[0]) {
		free(p->pdlist[0]);
		p->pdlist[0] = NULL;
	    }
	    if(pd) {
		p->pdlist[0] = pd;
	    }
	} else {
	    if(pd) {
		p->n_pdlist = 1;
		p->pdlist[0] = pd;
	    }
651
652
653
	}

    } else {
654
655
656
	if(pd) {
	    p->pdlist[p->n_pdlist++] = pd;
	}
657
658
659
660
661
662
663
664
665
    }
    qsort(p->pdlist, p->n_pdlist, sizeof(NMXP_DATA_PROCESS *), nmxp_raw_stream_seq_no_compare);

    // TODO Check for packet duplication in pd->pdlist

    /* Print array, only for debugging */
    if(p->n_pdlist > 1) {
	int y = 0;
	for(y=0; y < p->n_pdlist; y++) {
666
	    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_RAWSTREAM, "%02d pkt %d\n", y, p->pdlist[y]->seq_no);
667
668
669
	}
    }

Matteo Quintiliani's avatar
Matteo Quintiliani committed
670
671
672
673
674
    // Condition for time-out (pd is NULL)
    if(!pd && p->n_pdlist > 0) {
	p->last_seq_no_sent = p->pdlist[0]->seq_no - 1;
	p->last_sample_time = p->pdlist[0]->time;
    }
675
676
677
678
679
680
681
682
683
684
685
686

    /* Manage array and execute func_pd() */
    j=0;
    send_again = 1;
    while(send_again  &&  j < p->n_pdlist) {
	send_again = 0;
	seq_no_diff = p->pdlist[j]->seq_no - p->last_seq_no_sent;
	time_diff = p->pdlist[j]->time - p->last_sample_time;
	latency = nmxp_data_latency(p->pdlist[j]);
	nmxp_data_to_str(str_time, p->pdlist[j]->time);
	if(seq_no_diff <= 0) {
	    // Duplicated packets: Discarded
687
	    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_RAWSTREAM, "Packets %s.%s.%d.%d (%s - %f sec.) discarded, seq_no_diff=%d  time_diff=%.2fs  lat %.1fs\n",
688
689
690
691
692
693
694
695
696
		    p->pdlist[j]->station, p->pdlist[j]->channel, p->pdlist[j]->seq_no, p->pdlist[j]->packet_type, str_time,
		    (double) p->pdlist[j]->nSamp /  (double) p->pdlist[j]->sampRate, seq_no_diff, time_diff, latency);
	    send_again = 1;
	    j++;
	} else if(seq_no_diff == 1) {
	    for(i_func_pd=0; i_func_pd<n_func_pd; i_func_pd++) {
		(*p_func_pd[i_func_pd])(p->pdlist[j]);
	    }
	    if(time_diff > TIME_TOLLERANCE || time_diff < -TIME_TOLLERANCE) {
697
		nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_RAWSTREAM, "%s.%s time is not correct seq_no_diff=%d time_diff=%.2fs  ([%d] %d-%d)  (%s - %.2f sec.) lat. %.1fs\n",
698
699
700
701
702
703
704
705
706
		    p->pdlist[j]->station, p->pdlist[j]->channel, 
		    seq_no_diff, time_diff, p->pdlist[j]->packet_type, p->pdlist[j]->seq_no, p->last_seq_no_sent,
		    str_time, (double) p->pdlist[j]->nSamp /  (double) p->pdlist[j]->sampRate, latency);
	    }
	    p->last_seq_no_sent = p->pdlist[j]->seq_no;
	    p->last_sample_time = (p->pdlist[j]->time + ((double) p->pdlist[j]->nSamp / (double) p->pdlist[j]->sampRate ));
	    send_again = 1;
	    j++;
	} else {
707
	    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_RAWSTREAM, "%s.%s seq_no_diff=%d ([%d] %d-%d)  j=%2d  p->n_pdlist=%2d (%s - %.2f sec.) time_diff=%.2fs  lat. %.1fs\n",
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
		    p->pdlist[j]->station, p->pdlist[j]->channel, 
		    seq_no_diff, p->pdlist[j]->packet_type, p->pdlist[j]->seq_no, p->last_seq_no_sent, j, p->n_pdlist,
		    str_time, (double) p->pdlist[j]->nSamp /  (double) p->pdlist[j]->sampRate, time_diff, latency);
	}
    }

    /* Shift and free j handled elements */
    if(j > 0) {
	for(k=0; k < p->n_pdlist; k++) {
	    if(k < j) {
		if(p->pdlist[k]->buffer) {
		    free(p->pdlist[k]->buffer);
		    p->pdlist[k]->buffer = NULL;
		}
		if(p->pdlist[k]->pDataPtr) {
		    free(p->pdlist[k]->pDataPtr);
		    p->pdlist[k]->pDataPtr = NULL;
		}
		if(p->pdlist[k]) {
		    free(p->pdlist[k]);
		    p->pdlist[k] = NULL;
		}
	    }
	    if(k + j < p->n_pdlist) {
		p->pdlist[k] = p->pdlist[k+j];
	    } else {
		p->pdlist[k] = NULL;
	    }
	}
	p->n_pdlist = p->n_pdlist - j;
    }

740
741
742
    /* TOREMOVE
    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_RAWSTREAM, "j=%d  p->n_pdlist=%d FINAL\n", j, p->n_pdlist);
       */
743
744
745
746

    return ret;
}

747
748
749
750
751
752
753

/* TODO */
/*! \brief Placeholder for flushing a raw stream buffer.
 *
 * Not implemented yet: none of the arguments is used and nothing is done.
 *
 * \param p         Unused.
 * \param p_func_pd Unused.
 * \param n_func_pd Unused.
 *
 * \return Always 0.
 */
int nmxp_raw_stream_manage_flush(NMXP_RAW_STREAM_DATA *p, int (*p_func_pd[NMXP_MAX_FUNC_PD]) (NMXP_DATA_PROCESS *), int n_func_pd) {
    return 0;
}