/*! \file
 *
 * \brief Base for Nanometrics Protocol Library
 *
 * Author:
 * 	Matteo Quintiliani
 * 	Istituto Nazionale di Geofisica e Vulcanologia - Italy
 *	quintiliani@ingv.it
 *
 * $Id: nmxp_base.c,v 1.81 2009-08-31 12:16:41 mtheo Exp $
 *
 */

Matteo Quintiliani's avatar
Matteo Quintiliani committed
14
#include "config.h"
15
#include "nmxp_base.h"
16
#include "nmxp_memory.h"
Matteo Quintiliani's avatar
Matteo Quintiliani committed
17
#ifdef HAVE_WINDOWS_H
18
#include "nmxp_win.h"
Matteo Quintiliani's avatar
Matteo Quintiliani committed
19
#endif
20
21
22
23
24
25

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdarg.h>

Matteo Quintiliani's avatar
Matteo Quintiliani committed
26
27
28
29
#ifdef HAVE_WINDOWS_H
#include "winsock2.h"
#warning You are compiling on Windows MinGW...
#else
30
31
32
33
34
35
36
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <errno.h>
#include <time.h>
#include <unistd.h>
Matteo Quintiliani's avatar
Matteo Quintiliani committed
37
#endif
Matteo Quintiliani's avatar
Matteo Quintiliani committed
38

Matteo Quintiliani's avatar
Matteo Quintiliani committed
39
40
41
#define MAX_OUTDATA 4096


42
int nmxp_openSocket(char *hostname, int portNum, int (*func_cond)(void))
43
{
44
  /*TODO stefano avoid static*/
45
  static int sleepTime = 1;
46
  int isock = -1;
47
48
49
  struct hostent *hostinfo = NULL;
  struct sockaddr_in psServAddr;
  struct in_addr hostaddr;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
50

51
#ifdef HAVE_WINDOWS_H
Matteo Quintiliani's avatar
Matteo Quintiliani committed
52
  nmxp_initWinsock();
53
#endif
Matteo Quintiliani's avatar
Matteo Quintiliani committed
54

55
56
  if (!hostname)
  {
57
    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Empty host name?\n");
58
59
60
61
    return -1;
  }

  if ( (hostinfo = gethostbyname(hostname)) == NULL) {
62
63
    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Cannot lookup host: %s\n",
	    NMXP_LOG_STR(hostname));
64
65
66
    return -1;
  }

67
  while(!func_cond())
68
69
  {
    isock = socket (AF_INET, SOCK_STREAM, 0);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
70
71
72
73
74
75
76
77

#ifdef HAVE_WINDOWS_H
    if (isock == INVALID_SOCKET) {
	    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW,"Error at socket()\n");
	    WSACleanup();
	    exit(1);
    }
#else
78
79
    if (isock < 0)
    {
80
      nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Can't open stream socket\n");
81
82
      exit(1);
    }
Matteo Quintiliani's avatar
Matteo Quintiliani committed
83
#endif
84
85
86
87
88
89

    /* Fill in the structure "psServAddr" with the address of server
       that we want to connect with */
    memset (&psServAddr, 0, sizeof(psServAddr));
    psServAddr.sin_family = AF_INET;
    psServAddr.sin_port = htons((unsigned short) portNum);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
90
91
92
93
94
#ifdef HAVE_WINDOWS_H
    unsigned long address;
    memcpy(&address, hostinfo->h_addr, (size_t) hostinfo->h_length);
    psServAddr.sin_addr.s_addr = address;
#else
95
    psServAddr.sin_addr = *(struct in_addr *)hostinfo->h_addr_list[0];
Matteo Quintiliani's avatar
Matteo Quintiliani committed
96
#endif
97
98
99
100

    /* Report action and resolved address */
    memcpy(&hostaddr.s_addr, *hostinfo->h_addr_list,
	   sizeof (hostaddr.s_addr));
101
    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "Attempting to connect to %s port %d\n",
102
	    NMXP_LOG_STR(inet_ntoa(hostaddr)), portNum);
103

104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
    if(connect(isock, (struct sockaddr *)&psServAddr, sizeof(psServAddr)) >= 0) {
	sleepTime = 1;
	nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "Connection established: socket=%i,IP=%s,port=%d\n",
		isock, NMXP_LOG_STR(inet_ntoa(hostaddr)), portNum);
	return isock;
    } else {
	nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "Connecting to %s port %d. Trying again after %d seconds...\n",
		NMXP_LOG_STR(inet_ntoa(hostaddr)), portNum, sleepTime);
	nmxp_closeSocket(isock);
	isock = -1;

	if(!func_cond()) {
	    nmxp_sleep (sleepTime);
	    sleepTime *= 2;
	    if (sleepTime > NMXP_SLEEPMAX)
		sleepTime = NMXP_SLEEPMAX;
	}
121
    }
122

123
  }
124
  return isock;
125
126
127
}


Matteo Quintiliani's avatar
Matteo Quintiliani committed
128
129
130
131
132
133
134
135
136
137
/*
 * Close a socket opened by this library.
 *
 * The "Closed connection." message is logged before the descriptor is
 * actually closed. On Windows the socket is closed with closesocket() and
 * WSACleanup() is called afterwards; elsewhere close() is used.
 *
 * Returns the value returned by closesocket() / close().
 */
int nmxp_closeSocket(int isock) {
    int status;

    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_CONNFLOW, "Closed connection.\n");

#ifdef HAVE_WINDOWS_H
    status = closesocket(isock);
    WSACleanup();
    return status;
#else
    status = close(isock);
    return status;
#endif
}


/*
 * Send exactly length bytes from buffer on socket isock.
 *
 * send() is allowed to accept fewer bytes than requested, so the call is
 * repeated on the unsent tail until everything has been written.
 *
 * Returns NMXP_SOCKET_OK when all bytes were sent (including length == 0),
 * NMXP_SOCKET_ERROR when length is negative or send() fails or makes no
 * progress.
 */
int nmxp_send_ctrl(int isock, void* buffer, int length)
{
  const char *cursor = (const char *) buffer;
  int remaining = length;
  int sendCount;

  if (length < 0)
    return NMXP_SOCKET_ERROR;

  while (remaining > 0) {
    sendCount = send(isock, cursor, remaining, 0);
    if (sendCount <= 0)
      return NMXP_SOCKET_ERROR;
    cursor += sendCount;
    remaining -= sendCount;
  }

  return NMXP_SOCKET_OK;
}

151
152
153
154
#ifdef HAVE_BROKEN_SO_RCVTIMEO
#warning Managing non-blocking I/O using select()
#define HIGHEST_TIMEOUT 30
/*
 * recv() with a timeout implemented through select(), for systems where
 * SO_RCVTIMEO does not work.
 *
 * s        socket to read from
 * buf,len  destination buffer and its size, passed unchanged to recv()
 * timeout  timeout in seconds; 0 is replaced by HIGHEST_TIMEOUT (a warning
 *          is logged the first time this happens)
 *
 * Returns the result of recv() when data is available, -2 when the timeout
 * expired or select() was interrupted by a signal, -1 on any other error.
 */
int nmxp_recv_select_timeout(int s, char *buf, int len, int timeout)
{
    fd_set fds;
    int n;
    struct timeval tv;
    static int message_times = 0;

#ifndef HAVE_WINDOWS_H
    /* FD_SET() is undefined for descriptors outside [0, FD_SETSIZE) */
    if(s < 0 || s >= FD_SETSIZE) {
        errno = EBADF;
        return -1;
    }
#endif

    /* set up the file descriptor set*/
    FD_ZERO(&fds);
    FD_SET(s, &fds);

    /* set up the struct timeval for the timeout*/
    if(timeout == 0) {
        if(message_times == 0) {
            nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_ANY, "nmxp_recv_select_timeout(): timeout = %d\n", HIGHEST_TIMEOUT);
            message_times++;
        }
        timeout = HIGHEST_TIMEOUT;
    }
    tv.tv_sec = timeout;
    tv.tv_usec = 0;

    /* wait until timeout or data received*/
    errno = 0;
    n = select(s+1, &fds, NULL, NULL, &tv);
    if(n == 0) {
        return -2; /* timeout!*/
    }
    if(n < 0) {
        /* errno is meaningful only because select() failed;
         * "Interrupted system call" is reported like a timeout */
        return (errno == EINTR) ? -2 : -1;
    }

    /* data must be here, so do a normal recv()*/
    return recv(s, buf, len, 0);
}
#endif
187

188
189
/*
 * Set the receive timeout (SO_RCVTIMEO) of socket isock.
 *
 * Nothing is done when timeoutsec is not positive, so a timeout that was
 * set earlier stays in force.
 * NOTE(review): callers in this file pass 0, apparently to clear the
 * timeout - confirm that leaving it unchanged is what they expect.
 *
 * On Windows the value is passed in milliseconds, elsewhere as a
 * struct timeval. When HAVE_BROKEN_SO_RCVTIMEO is defined (non-Windows)
 * the function has no effect at all.
 *
 * Returns 0 when nothing was done, otherwise the result of setsockopt();
 * a failure is also reported with perror().
 */
int nmxp_setsockopt_RCVTIMEO(int isock, int timeoutsec) {
    int rc = 0;

    if(timeoutsec <= 0) {
        return rc;
    }

#ifdef HAVE_WINDOWS_H
    {
        int timeout_ms = timeoutsec * 1000;

        rc = setsockopt(isock, SOL_SOCKET, SO_RCVTIMEO, (char *) &timeout_ms, sizeof(timeout_ms));
        if(rc < 0) {
            perror("setsockopt SO_RCVTIMEO");
        }
    }
#elif !defined(HAVE_BROKEN_SO_RCVTIMEO)
    {
        struct timeval tv;

        tv.tv_sec  = timeoutsec;
        tv.tv_usec = 0;
        rc = setsockopt(isock, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
        if(rc < 0) {
            perror("setsockopt SO_RCVTIMEO");
        }
    }
#else
#warning nmxp_setsockopt_RCVTIMEO() do nothing for your system.
#endif

    return rc;
}
226

Matteo Quintiliani's avatar
Matteo Quintiliani committed
227

228
229
230
#define MAXLEN_RECV_ERRNO_STR 200

/*
 * Return a newly allocated, NUL-terminated description of errno_value.
 *
 * The text comes from WSAGetLastErrorMessage() on Windows, from
 * strerror_r() when HAVE_STRERROR_R is defined, and from strerror()
 * otherwise. It is truncated to MAXLEN_RECV_ERRNO_STR - 1 characters.
 *
 * Returns a buffer obtained with NMXP_MEM_MALLOC (the callers in this file
 * release it with NMXP_MEM_FREE), or NULL when the allocation fails. The
 * string is empty when no description is available.
 */
char *nmxp_strerror(int errno_value) {
    char *ret_recv_errno_str;
#ifdef HAVE_WINDOWS_H
    char *recv_errno_str;
#else

#ifdef HAVE_STRERROR_R
    char recv_errno_str[MAXLEN_RECV_ERRNO_STR]="";
#else
    char *recv_errno_str=NULL;
#endif

#endif

    ret_recv_errno_str= (char *) NMXP_MEM_MALLOC (MAXLEN_RECV_ERRNO_STR * sizeof(char));
    if(ret_recv_errno_str == NULL) {
        return NULL;
    }
    ret_recv_errno_str[0] = 0;

#ifdef HAVE_WINDOWS_H
    recv_errno_str = WSAGetLastErrorMessage(errno_value);
#else

#ifdef HAVE_STRERROR_R
    /* NOTE(review): the GNU variant of strerror_r() returns the message
     * pointer and may leave the buffer untouched; the return value is
     * ignored here, so the result can stay empty - confirm which variant
     * the build selects. */
    strerror_r(errno_value, recv_errno_str, MAXLEN_RECV_ERRNO_STR);
#else
    recv_errno_str = strerror(errno_value);
#endif

#endif

    if(recv_errno_str) {
        /* strncpy() does not terminate on truncation: copy one byte less
         * and terminate explicitly */
        strncpy(ret_recv_errno_str, recv_errno_str, MAXLEN_RECV_ERRNO_STR - 1);
        ret_recv_errno_str[MAXLEN_RECV_ERRNO_STR - 1] = 0;
    }

    return ret_recv_errno_str;
}


int nmxp_recv_ctrl(int isock, void *buffer, int length, int timeoutsec, int *recv_errno )
{
  int recvCount;
  int cc;
  char *buffer_char = buffer;
  char *recv_errno_str = NULL;

273
  nmxp_setsockopt_RCVTIMEO(isock, timeoutsec);
Matteo Quintiliani's avatar
Matteo Quintiliani committed
274
  
Matteo Quintiliani's avatar
Matteo Quintiliani committed
275
  cc = 1;
276
277
  *recv_errno  = 0;
  recvCount = 0;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
278
  while(cc > 0 && *recv_errno == 0  && recvCount < length) {
279

Matteo Quintiliani's avatar
Matteo Quintiliani committed
280
      /* TODO some operating system could not reset errno */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
281
      errno = 0;
Matteo Quintiliani's avatar
Matteo Quintiliani committed
282

283
#ifdef HAVE_BROKEN_SO_RCVTIMEO
284
285
286
      cc = nmxp_recv_select_timeout(isock, buffer_char + recvCount, length - recvCount, timeoutsec);
#else
      cc = recv(isock, buffer_char + recvCount, length - recvCount, 0);
287
288
#endif

289
290
291
#ifdef HAVE_WINDOWS_H
      *recv_errno  = WSAGetLastError();
#else
292
293
294
295
296
      if(cc == -2) {
	  *recv_errno  = EWOULDBLOCK;
      } else {
	  *recv_errno  = errno;
      }
297
#endif
Matteo Quintiliani's avatar
Matteo Quintiliani committed
298
      if(cc <= 0) {
299
	  /*
300
301
	  nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "nmxp_recv_ctrl(): (cc=%d <= 0) errno=%d  recvCount=%d  length=%d\n",
	  cc, *recv_errno, recvCount, length);
302
	  */
Matteo Quintiliani's avatar
Matteo Quintiliani committed
303
304
305
      } else {
	  recvCount += cc;
      }
306
  }
Matteo Quintiliani's avatar
Matteo Quintiliani committed
307

308
  nmxp_setsockopt_RCVTIMEO(isock, 0);
309

Matteo Quintiliani's avatar
Matteo Quintiliani committed
310
  if (recvCount != length  ||  *recv_errno != 0  ||  cc <= 0) {
311

312
      recv_errno_str = nmxp_strerror(*recv_errno);
313

314
#ifdef HAVE_WINDOWS_H
315
      if(*recv_errno != WSAEWOULDBLOCK  &&  *recv_errno != WSAETIMEDOUT)
316
317
318
319
#else
      if(*recv_errno != EWOULDBLOCK)
#endif
      {
320
321
	  nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW, "nmxp_recv_ctrl(): %s (errno=%d recvCount=%d length=%d cc=%d)\n",
		  NMXP_LOG_STR(recv_errno_str), *recv_errno, recvCount, length, cc);
322
      }
323
       NMXP_MEM_FREE(recv_errno_str);
324
325
326
327
328
329
      /* TO IMPROVE 
       * Fixed bug receiving zero byte from recv() 'TCP FIN or EOF received'
       * */
      if(cc == 0  &&  *recv_errno == 0) {
	  *recv_errno = -100;
      }
330

Matteo Quintiliani's avatar
Matteo Quintiliani committed
331
#ifdef HAVE_WINDOWS_H
332
      if(recvCount != length || (*recv_errno != WSAEWOULDBLOCK  &&  *recv_errno != WSAETIMEDOUT))
Matteo Quintiliani's avatar
Matteo Quintiliani committed
333
#else
334
      if(recvCount != length || *recv_errno != EWOULDBLOCK)
Matteo Quintiliani's avatar
Matteo Quintiliani committed
335
#endif
336
337
338
      {
	  return NMXP_SOCKET_ERROR;
      }
339
340
341
342
343
344
  }
  
  return NMXP_SOCKET_OK;
}


345
/*
 * Send a message header (signature, type, length) on socket isock.
 * The three fields are converted with htonl() before being sent.
 *
 * Returns the result of nmxp_send_ctrl().
 */
int nmxp_sendHeader(int isock, NMXP_MSG_CLIENT type, int32_t length)
{
    NMXP_MESSAGE_HEADER header;

    header.length    = htonl(length);
    header.type      = htonl(type);
    header.signature = htonl(NMX_SIGNATURE);

    return nmxp_send_ctrl(isock, &header, sizeof(header));
}


Matteo Quintiliani's avatar
Matteo Quintiliani committed
357
/*
 * Receive a message header from socket isock.
 *
 * *type and *length are always reset to 0 first. They are filled from the
 * header only when the read succeeded, the raw type field is not zero and
 * the signature (after ntohl()) equals NMX_SIGNATURE.
 *
 * Returns the result of nmxp_recv_ctrl(), or NMXP_SOCKET_ERROR when the
 * signature does not match (the mismatch is logged).
 */
int nmxp_receiveHeader(int isock, NMXP_MSG_SERVER *type, int32_t *length, int timeoutsec, int *recv_errno )
{
    NMXP_MESSAGE_HEADER hdr = {0};
    int status;

    status = nmxp_recv_ctrl(isock, &hdr, sizeof(hdr), timeoutsec, recv_errno);

    *type = 0;
    *length = 0;

    /* Nothing to decode on failure or when the type field is zero */
    if(status != NMXP_SOCKET_OK  ||  hdr.type == 0) {
        return status;
    }

    hdr.signature = ntohl(hdr.signature);
    hdr.type      = ntohl(hdr.type);
    hdr.length    = ntohl(hdr.length);

    if(hdr.signature == NMX_SIGNATURE) {
        *type = hdr.type;
        *length = hdr.length;
        return status;
    }

    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_CONNFLOW,
             "nmxp_receiveHeader(): signature mismatches. signature = %d, type = %d, length = %d\n",
             hdr.signature, hdr.type, hdr.length);
    return NMXP_SOCKET_ERROR;
}


388
/*
 * Send a complete message: the header first, then the payload.
 *
 * The payload is sent only when the header was sent successfully, buffer is
 * not NULL and length is positive.
 *
 * Returns the result of the last send operation performed.
 */
int nmxp_sendMessage(int isock, NMXP_MSG_CLIENT type, void *buffer, int32_t length) {
    int status = nmxp_sendHeader(isock, type, length);

    if(status != NMXP_SOCKET_OK) {
        return status;
    }
    if(buffer == NULL  ||  length <= 0) {
        /* header-only message */
        return status;
    }
    return nmxp_send_ctrl(isock, buffer, length);
}


400
401
402
403
404
405
406
407
408
409
410
411
412
413
/*
 * Log the content of a NMXP_MSG_TERMINATESUBSCRIPTION payload.
 *
 * The payload starts with a 32-bit reason code in network byte order
 * (0 = Normal, 1 = Error, 2 = Timeout, anything else is shown as Unknown),
 * followed by a text message. The text is printed with an explicit maximum
 * length, so it does not need to be NUL-terminated inside the buffer.
 *
 * buffer  received payload
 * length  number of valid bytes in buffer
 *
 * Returns the reason code, or -1 when buffer is NULL or shorter than the
 * reason code.
 */
int32_t nmxp_display_error_from_server(char *buffer, int32_t length) {
    /* NMXP_MSG_TERMINATESUBSCRIPTION */
    const char *str_msg;
    const char *reason_str;
    int32_t reason;
    int str_msg_len;

    if(buffer == NULL  ||  length < (int32_t) sizeof(reason)) {
        nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY,
                 "nmxp_display_error_from_server(): message too short (length=%d)\n", length);
        return -1;
    }

    memcpy(&reason, buffer, sizeof(reason));
    reason = ntohl(reason);

    str_msg = buffer + sizeof(reason);
    str_msg_len = (int) (length - (int32_t) sizeof(reason));

    switch(reason) {
        case 0:  reason_str = "Normal";  break;
        case 1:  reason_str = "Error";   break;
        case 2:  reason_str = "Timeout"; break;
        default: reason_str = "Unknown"; break;
    }

    /* "%.*s" stops at str_msg_len bytes or at the first NUL, whichever comes
     * first (assumes nmxp_log() formats like printf - TODO confirm) */
    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "%d %s shutdown: %.*s\n",
             reason, reason_str, str_msg_len, str_msg);

    return reason;
}

414
/*
 * Receive one complete message (header and payload) from socket isock.
 *
 * type           set from the received header
 * buffer         destination of the payload
 * length         set to the payload length announced by the header
 * timeoutsec     timeout used while waiting for the header; the payload is
 *                read with timeoutsec 0
 * recv_errno     error code set by nmxp_recv_ctrl()
 * buffer_length  size of buffer in bytes
 *
 * A payload length that is negative or larger than buffer_length is
 * rejected. The payload is interpreted (and logged) only when it was read
 * completely. A timeout is logged here as a warning; other receive errors
 * are logged by nmxp_recv_ctrl().
 *
 * Returns NMXP_SOCKET_OK or NMXP_SOCKET_ERROR.
 */
int nmxp_receiveMessage(int isock, NMXP_MSG_SERVER *type, void *buffer, int32_t *length, int timeoutsec, int *recv_errno, int buffer_length) {
    int ret;
    int is_timeout;

    *length = 0;

    ret = nmxp_receiveHeader(isock, type, length, timeoutsec, recv_errno);

    if( ret == NMXP_SOCKET_OK  ) {

        if(*length < 0) {
            nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "nmxp_receiveMessage(): invalid message length (%d).\n",
                     *length);
            ret = NMXP_SOCKET_ERROR;
        } else if(*length > buffer_length) {
            nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_ANY, "nmxp_receiveMessage(): size of received messagge is bigger than buffer. (%d > %d). \n",
                     *length, buffer_length);
            ret = NMXP_SOCKET_ERROR;
        } else if (*length > 0) {
            ret = nmxp_recv_ctrl(isock, buffer, *length, 0, recv_errno);

            /* Look at the payload only when it has been fully received */
            if(ret == NMXP_SOCKET_OK) {
                if(*type == NMXP_MSG_TERMINATESUBSCRIPTION) {
                    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_PACKETMAN, "Received TerminateSubscritption.\n");
                    nmxp_display_error_from_server(buffer, *length);
                } else if(*type == NMXP_MSG_ERROR) {
                    /* Bounded print: the text may not be NUL-terminated
                     * (assumes nmxp_log() formats like printf - TODO confirm) */
                    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_PACKETMAN, "Received ErrorMessage: %.*s\n",
                             (int) *length, (const char *) buffer);
                } else {
                    nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_PACKETMAN, "Received message type: %d  length=%d\n", *type, *length);
                }
            }
        }
    }

    if(*recv_errno != 0) {
#ifdef HAVE_WINDOWS_H
        is_timeout = (*recv_errno == WSAEWOULDBLOCK  ||  *recv_errno == WSAETIMEDOUT);
#else
        is_timeout = (*recv_errno == EWOULDBLOCK);
#endif
        if(is_timeout) {
            nmxp_log(NMXP_LOG_WARN, NMXP_LOG_D_DOD, "Timeout receiving in nmxp_receiveMessage()\n");
        }
        /* Other errors: log message is not necessary because managed by nmxp_recv_ctrl() */
    }

    return ret;
}

457

458
/*
 * Build a NMXP_DATA_PROCESS record from a decompressed data packet.
 *
 * Packet layout as read by this function:
 *   bytes  0..3   channel key, network byte order
 *   bytes  4..11  time (double; byte-swapped when the key needed swapping)
 *   bytes 12..15  number of samples, network byte order
 *   bytes 16..19  sample rate, network byte order
 *   bytes 20..    samples, 32-bit integers in network byte order
 *
 * buffer_data           received packet
 * length_data           number of bytes in buffer_data
 * channelList           list used to translate the key into a channel name
 * network_code_default  network code used when the channel name carries none
 *
 * Returns a record allocated with NMXP_MEM_MALLOC, or NULL when that
 * allocation fails. The record is initialised with nmxp_data_init() and
 * carries no samples when the packet is too short or inconsistent, the
 * channel key is unknown, or the sample buffer cannot be allocated.
 * Otherwise pDataPtr points to a separately allocated sample array.
 * NOTE(review): presumably the caller releases both the record and
 * pDataPtr - confirm.
 */
NMXP_DATA_PROCESS *nmxp_processDecompressedData(char* buffer_data, int length_data, NMXP_CHAN_LIST_NET *channelList, const char *network_code_default)
{
  int32_t   netInt    = 0;
  int32_t   pKey      = 0;
  double    pTime     = 0.0;
  int32_t   pNSamp    = 0;
  int32_t   pSampRate = 0;
  int       swap      = 0;
  int       idx;
  int       data_bytes;
  int32_t  *outdata   = NULL;

  /* Empty by default: they are read below even if the name split fails */
  char station_code[NMXP_CHAN_MAX_SIZE_STR_PATTERN] = "";
  char channel_code[NMXP_CHAN_MAX_SIZE_STR_PATTERN] = "";
  char network_code[NMXP_CHAN_MAX_SIZE_STR_PATTERN] = "";

  char *nmxp_channel_name = NULL;
  NMXP_DATA_PROCESS *pd   = NULL;

  /* Same allocation sequence as nmxp_processCompressedData() */
  pd = (NMXP_DATA_PROCESS *) NMXP_MEM_MALLOC(sizeof(NMXP_DATA_PROCESS));
  if(pd == NULL) {
    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_PACKETMAN, "nmxp_processDecompressedData(): out of memory\n");
    return NULL;
  }
  memset(pd, 0, sizeof(NMXP_DATA_PROCESS));
  nmxp_data_init(pd);

  /* The fixed part of the packet is 20 bytes long */
  if(buffer_data == NULL || length_data < 20) {
    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_PACKETMAN, "nmxp_processDecompressedData(): packet too short (length_data=%d)\n",
             length_data);
    return pd;
  }

  /* copy the header contents into local fields and swap */
  memcpy(&netInt, &buffer_data[0], 4);
  pKey = ntohl(netInt);
  if ( pKey != netInt ) { swap = 1; }

  nmxp_channel_name = nmxp_chan_lookupName(pKey, channelList);
  if(nmxp_channel_name == NULL) {
    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_PACKETMAN, "Channel name not found for key %d\n", pKey);
    return pd;
  }

  memcpy(&pTime, &buffer_data[4], 8);
  if ( swap ) { nmxp_data_swap_8b(&pTime); }

  memcpy(&netInt, &buffer_data[12], 4);
  pNSamp = ntohl(netInt);
  memcpy(&netInt, &buffer_data[16], 4);
  pSampRate = ntohl(netInt);

  /* There should be (length_data - 20) bytes of data as 32-bit ints here.
   * They must fit in the output buffer and cover the announced samples. */
  data_bytes = length_data - 20;
  if(pNSamp < 0
     || (size_t) data_bytes > MAX_OUTDATA * sizeof(int32_t)
     || (size_t) pNSamp > (size_t) data_bytes / sizeof(int32_t)) {
    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_PACKETMAN, "nmxp_processDecompressedData(): inconsistent packet (nSamp=%d, data bytes=%d)\n",
             pNSamp, data_bytes);
    NMXP_MEM_FREE(nmxp_channel_name);
    return pd;
  }

  outdata = (int32_t *) NMXP_MEM_MALLOC(MAX_OUTDATA*sizeof(int32_t));
  if(outdata == NULL) {
    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_PACKETMAN, "nmxp_processDecompressedData(): out of memory\n");
    NMXP_MEM_FREE(nmxp_channel_name);
    return pd;
  }
  memcpy(outdata, &buffer_data[20], (size_t) data_bytes);

  /* Swap the data samples to host order */
  for ( idx=0; idx < pNSamp; idx++ ) {
    netInt = ntohl(outdata[idx]);
    outdata[idx] = netInt;
  }

  if(!nmxp_chan_cpy_sta_chan(nmxp_channel_name, station_code, channel_code, network_code)) {
    nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_PACKETMAN, "Channel name not in STA.CHAN format: %s\n",
             NMXP_LOG_STR(nmxp_channel_name));
  }

  /* NOTE(review): strncpy() leaves the destination unterminated when the
   * source fills it completely - confirm the field sizes allow for that. */
  pd->key = pKey;
  if(network_code[0] != 0) {
    strncpy(pd->network, network_code, NMXP_DATA_NETWORK_LENGTH);
  } else {
    strncpy(pd->network, network_code_default, NMXP_DATA_NETWORK_LENGTH);
  }
  if(station_code[0] != 0) {
    strncpy(pd->station, station_code, NMXP_DATA_STATION_LENGTH);
  }
  if(channel_code[0] != 0) {
    strncpy(pd->channel, channel_code, NMXP_DATA_CHANNEL_LENGTH);
  }
  pd->packet_type = NMXP_MSG_DECOMPRESSED;
  pd->x0 = -1;
  pd->xn = -1;
  pd->x0n_significant = 0;
  pd->time = pTime;
  pd->nSamp = pNSamp;
  pd->pDataPtr = outdata;
  pd->sampRate = pSampRate;

  /* TODO*/
  /* pd.oldest_seq_no = ;*/
  /* pd.seq_no = ;*/

  NMXP_MEM_FREE(nmxp_channel_name);

  return pd;
}


548
/*
 * Build a NMXP_DATA_PROCESS record from a compressed data packet.
 *
 * Packet layout as read by this function (multi-byte fields are
 * byte-swapped only on big-endian hosts):
 *   bytes  0..3   oldest sequence number
 *   bytes  4..20  17-byte packet header: packet type (1), seconds (4),
 *                 ticks (2), instrument id (2), sequence number (4),
 *                 sample rate / channel code (1), first sample x0 (3)
 *   bytes 21..    data bundles of 17 bytes each
 *
 * buffer_data           received packet
 * length_data           number of bytes in buffer_data
 * channelList           list used to translate the key into a channel name
 * network_code_default  network code used when the channel name carries none
 *
 * Returns a record allocated with NMXP_MEM_MALLOC, or NULL when that
 * allocation fails. The record is initialised with nmxp_data_init() and
 * carries no samples when the packet is too short, is a filler packet, has
 * an unknown channel key, holds a truncated bundle, or would overflow the
 * output buffer. Those cases are logged and no longer terminate the
 * process. Otherwise pDataPtr points to a separately allocated sample array.
 * NOTE(review): presumably the caller releases both the record and
 * pDataPtr - confirm.
 */
NMXP_DATA_PROCESS *nmxp_processCompressedData(char* buffer_data, int length_data, NMXP_CHAN_LIST_NET *channelList, const char *network_code_default)
{
    int32_t   pKey      = 0;
    double    pTime     = 0.0;
    int32_t   pNSamp    = 0;
    int32_t   pSampRate = 0;
    int32_t  *pDataPtr  = NULL;

    /* Empty by default: they are read below even if the name split fails */
    char station_code[NMXP_CHAN_MAX_SIZE_STR_PATTERN] = "";
    char channel_code[NMXP_CHAN_MAX_SIZE_STR_PATTERN] = "";
    char network_code[NMXP_CHAN_MAX_SIZE_STR_PATTERN] = "";

    NMXP_DATA_PROCESS *pd = NULL;

    /* Sample rate for each rate code (upper five bits of the rate byte) */
    static const int32_t nmx_rate_code_to_sample_rate[32] = {
        0,1,2,5,10,20,40,50,
        80,100,125,200,250,500,1000,25,
        120,0,0,0,0,0,0,0,
        0,0,0,0,0,0,0,0};

    int32_t nmx_oldest_sequence_number;
    char nmx_hdr[25];
    unsigned char nmx_ptype;
    int32_t nmx_seconds;
    double nmx_seconds_double;
    int16_t nmx_ticks, nmx_instr_id;
    int32_t nmx_seqno;
    unsigned char nmx_sample_rate;
    int32_t nmx_x0;
    int32_t rate_code, chan_code, this_sample_rate;

    int32_t comp_bytecount;
    unsigned char *indata;
    int32_t * outdata = NULL;

    int32_t nout, i, k;
    int32_t prev_xn;
    const uint32_t high_scale = 4096 * 2048;
    const uint32_t high_scale_p = 4096 * 4096;

    char *nmxp_channel_name = NULL;
    int my_host_is_bigendian;
    int decode_ok = 1;

    pd= (NMXP_DATA_PROCESS *) NMXP_MEM_MALLOC(sizeof(NMXP_DATA_PROCESS));
    if(pd == NULL) {
        nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_PACKETMAN, "nmxp_processCompressedData(): out of memory\n");
        return NULL;
    }
    memset(pd,0,sizeof(NMXP_DATA_PROCESS));
    /* Initialised right away so that every early return hands back a
     * consistent record */
    nmxp_data_init(pd);

    /* TOREMOVE int my_order = get_my_wordorder();*/
    my_host_is_bigendian = nmxp_data_bigendianhost();
    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_PACKETMAN, "my_host_is_bigendian %d\n", my_host_is_bigendian);

    /* 4 bytes of sequence number + 17 bytes of header must be present */
    if(buffer_data == NULL || length_data < 21) {
        nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_PACKETMAN, "nmxp_processCompressedData(): packet too short (length_data=%d)\n",
                 length_data);
        return pd;
    }

    memcpy(&nmx_oldest_sequence_number, buffer_data, 4);
    if (my_host_is_bigendian) {
        nmxp_data_swap_4b (&nmx_oldest_sequence_number);
    }
    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_PACKETMAN, "Oldest sequence number = %d\n", nmx_oldest_sequence_number);

    memcpy(nmx_hdr, buffer_data+4, 17);
    /* Decode the Nanometrics packet header bundle. */
    memcpy (&nmx_ptype, nmx_hdr+0, 1);
    if ( (nmx_ptype & 0xf) == 9) {
        /* Filler packet.  Discard entire packet.   */
        nmxp_log (NMXP_LOG_ERR, NMXP_LOG_D_PACKETMAN, "Filler packet - discarding\n");
        return pd;
    }

    nmx_x0 = 0;
    memcpy (&nmx_seconds, nmx_hdr+1, 4);
    memcpy (&nmx_ticks, nmx_hdr+5, 2);
    memcpy (&nmx_instr_id, nmx_hdr+7, 2);
    memcpy (&nmx_seqno, nmx_hdr+9, 4);
    memcpy (&nmx_sample_rate, nmx_hdr+13, 1);
    memcpy (&nmx_x0, nmx_hdr+14, 3);

    if (my_host_is_bigendian) {
        nmxp_data_swap_4b ((int32_t *)&nmx_seconds);
        nmxp_data_swap_2b (&nmx_ticks);
        nmxp_data_swap_2b (&nmx_instr_id);
        nmxp_data_swap_4b (&nmx_seqno);
        nmxp_data_swap_4b (&nmx_x0);
    }

    /* check if nmx_x0 is negative like as signed 3-byte int */
    if( (nmx_x0 & high_scale) ==  high_scale) {
        /* nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_PACKETMAN, "WARNING: changed nmx_x0, old value = %d\n",  nmx_x0);*/
        nmx_x0 -= high_scale_p;
    }

    nmx_seconds_double = (double) nmx_seconds + ( (double) nmx_ticks / 10000.0 );
    rate_code = nmx_sample_rate>>3;
    chan_code = nmx_sample_rate&7;
    this_sample_rate = nmx_rate_code_to_sample_rate[rate_code];

    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_PACKETMAN, "nmx_ptype          = %d\n", nmx_ptype);
    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_PACKETMAN, "nmx_seconds        = %d\n", nmx_seconds);
    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_PACKETMAN, "nmx_ticks          = %d\n", nmx_ticks);

    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_PACKETMAN, "nmx_seconds_double = %f\n", nmx_seconds_double);
    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_PACKETMAN, "nmx_x0             = %d\n", nmx_x0);

    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_PACKETMAN, "nmx_instr_id       = %d\n", nmx_instr_id);
    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_PACKETMAN, "nmx_seqno          = %d\n", nmx_seqno);
    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_PACKETMAN, "nmx_sample_rate    = %d\n", nmx_sample_rate);
    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_PACKETMAN, "this_sample_rate   = %d\n", this_sample_rate);

    /* Built with unsigned arithmetic: left-shifting a negative instrument
     * id as a signed value is undefined behaviour */
    pKey = (int32_t) ( ((uint32_t) (uint16_t) nmx_instr_id << 16) | (1u << 8) | (uint32_t) chan_code );

    pTime = nmx_seconds_double;

    pSampRate = this_sample_rate;

    nmxp_channel_name = nmxp_chan_lookupName(pKey, channelList);
    if(nmxp_channel_name == NULL) {
        nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_PACKETMAN, "Channel name not found for key %d\n", pKey);
        return pd;
    }

    /* Allocated only once the channel is known, so nothing leaks above */
    outdata = (int32_t *) NMXP_MEM_MALLOC(MAX_OUTDATA*sizeof(int32_t));
    if(outdata == NULL) {
        nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_PACKETMAN, "nmxp_processCompressedData(): out of memory\n");
        NMXP_MEM_FREE(nmxp_channel_name);
        return pd;
    }

    if(!nmxp_chan_cpy_sta_chan(nmxp_channel_name, station_code, channel_code, network_code)) {
        nmxp_log(NMXP_LOG_ERR, NMXP_LOG_D_PACKETMAN, "Channel name not in STA.CHAN format: %s\n",
                 NMXP_LOG_STR(nmxp_channel_name));
    }

    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_PACKETMAN, "Channel key %d for %s.%s\n",
             pKey, NMXP_LOG_STR(station_code), NMXP_LOG_STR(channel_code));

    comp_bytecount = length_data-21;
    indata = (unsigned char *) buffer_data + 21;

    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_PACKETMAN, "comp_bytecount     = %d  (N = %.2f)\n", comp_bytecount, (double) comp_bytecount / 17.0);

    /* Unpack the data bundles, each 17 bytes long. */
    prev_xn = nmx_x0;
    outdata[0] = nmx_x0;
    nout = 1;
    for (i=0; i<comp_bytecount; i+=17) {
        if (i+17>comp_bytecount) {
            /* truncated last bundle */
            nmxp_log (NMXP_LOG_ERR, NMXP_LOG_D_PACKETMAN, "comp_bytecount = %d, i+17 = %d\n",
                      comp_bytecount, i+17);
            decode_ok = 0;
            break;
        }
        if (nout+16 > MAX_OUTDATA)  {
            nmxp_log (NMXP_LOG_ERR,  NMXP_LOG_D_PACKETMAN, "Output buffer size too small\n");
            decode_ok = 0;
            break;
        }
        k = nmxp_data_unpack_bundle (outdata+nout,indata+i,&prev_xn);
        if (k < 0) {
            nmxp_log (NMXP_LOG_WARN, NMXP_LOG_D_PACKETMAN, "Null bundle: %s.%s.%s (k=%d) %s %d\n",
                      NMXP_LOG_STR(network_code),
                      NMXP_LOG_STR(station_code),
                      NMXP_LOG_STR(channel_code), k,
                      __FILE__,  __LINE__);
            break;
        }
        nout += k;
        /* prev_xn = outdata[nout-1]; */
    }

    if(!decode_ok) {
        /* Malformed packet: drop the samples, keep the empty record */
        NMXP_MEM_FREE(outdata);
        NMXP_MEM_FREE(nmxp_channel_name);
        return pd;
    }

    nout--;

    nmxp_log(NMXP_LOG_NORM, NMXP_LOG_D_PACKETMAN, "Unpacked %d samples.\n", nout);

    pDataPtr = outdata;
    pNSamp = nout;

    /* NOTE(review): strncpy() leaves the destination unterminated when the
     * source fills it completely - confirm the field sizes allow for that. */
    pd->key = pKey;
    if(network_code[0] != 0) {
        strncpy(pd->network, network_code, NMXP_DATA_NETWORK_LENGTH);
    } else {
        strncpy(pd->network, network_code_default, NMXP_DATA_NETWORK_LENGTH);
    }
    if(station_code[0] != 0) {
        strncpy(pd->station, station_code, NMXP_DATA_STATION_LENGTH);
    }
    if(channel_code[0] != 0) {
        strncpy(pd->channel, channel_code, NMXP_DATA_CHANNEL_LENGTH);
    }
    pd->packet_type = nmx_ptype;
    pd->x0 = nmx_x0;
    pd->xn = pDataPtr[nout];
    pd->x0n_significant = 1;
    pd->oldest_seq_no = nmx_oldest_sequence_number;
    pd->seq_no = nmx_seqno;
    pd->time = pTime;
    pd->nSamp = pNSamp;
    pd->pDataPtr = pDataPtr;
    pd->sampRate = pSampRate;

    NMXP_MEM_FREE(nmxp_channel_name);

    return pd;
}

742
743
744

/*
 * Suspend execution for sleep_time seconds.
 *
 * On Windows Sleep() takes milliseconds; the number of seconds is capped so
 * that the conversion cannot wrap around, and 0 is always returned.
 * Elsewhere the result of sleep() is returned.
 */
unsigned int nmxp_sleep(unsigned int sleep_time) {
#ifdef HAVE_WINDOWS_H
    /* largest number of seconds whose millisecond value fits in 32 bits */
    const unsigned int max_seconds = 4294967U;

    if(sleep_time > max_seconds) {
        sleep_time = max_seconds;
    }
    Sleep(sleep_time * 1000);
    return 0;
#else
    return sleep(sleep_time);
#endif
}

Matteo Quintiliani's avatar
Matteo Quintiliani committed
752
753
754
755
756
757
758
759
760
/*
 * Suspend execution for usleep_time microseconds.
 *
 * On Windows the time is rounded to the nearest millisecond for Sleep() and
 * 0 is always returned.
 *
 * Elsewhere whole seconds are spent in sleep() and only the remainder
 * (always below 1,000,000) is passed to usleep(), because usleep() is not
 * required to accept larger values.
 *
 * Returns 0 on success and a non-zero value when sleep() was interrupted
 * or usleep() failed.
 */
unsigned int nmxp_usleep(unsigned int usleep_time) {
#ifdef HAVE_WINDOWS_H
    /* round to nearest millisecond without overflowing usleep_time + 500 */
    Sleep(usleep_time / 1000 + ((usleep_time % 1000 >= 500) ? 1 : 0));
    return 0;
#else
    const unsigned int usec_per_sec = 1000000;
    unsigned int whole_seconds = usleep_time / usec_per_sec;
    unsigned int remainder = usleep_time % usec_per_sec;
    unsigned int unslept;

    if(whole_seconds > 0) {
        unslept = sleep(whole_seconds);
        if(unslept != 0 || remainder == 0) {
            return unslept;
        }
    }
    return usleep(remainder);
#endif
}