Commit 353319bf authored by Matteo Quintiliani's avatar Matteo Quintiliani
Browse files

Rearrangement of functions and some improvements


git-svn-id: file:///home/quintiliani/svncopy/nmxptool/trunk@62 3cd66e75-5955-46cb-a940-c26e5fc5497d
parent fdd28904
......@@ -47,6 +47,9 @@
#include "nmxp_base.h"
#include <stdio.h>
/*! \brief Flag for buffered packets */
typedef enum {
NMXP_BUFFER_NO = 0,
......@@ -129,13 +132,14 @@ int nmxp_sendAddTimeSeriesChannel(int isock, NMXP_CHAN_LIST *channelList, uint32
* \param isock A descriptor referencing the socket.
* \param channelList Channel list.
* \param func_processData Pointer to the function manages data extracted. It could be NULL.
* \param outfile If is not NULL write buffer contents.
*
* \retval SOCKET_OK on success
* \retval SOCKET_ERROR on error
*
*/
int nmxp_receiveData(int isock, NMXP_CHAN_LIST *channelList,
int (*func_processData)(NMXP_PROCESS_DATA *pd)
int (*func_processData)(NMXP_DATA_PROCESS *pd)
);
......
......@@ -12,10 +12,8 @@
#ifndef NMXP_BASE_H
#define NMXP_BASE_H 1
#include <sys/types.h>
#include "nmxp_chan.h"
#include "nmxp_data.h"
#include "nmxp_chan.h"
#include "nmxp_log.h"
/*! Maximum time between connection attempts (seconds). */
......@@ -134,5 +132,29 @@ int nmxp_sendMessage(int isock, NMXP_MSG_CLIENT type, void *buffer, uint32_t len
*/
int nmxp_receiveMessage(int isock, NMXP_MSG_SERVER *type, void **buffer, uint32_t *length);
/*! \brief Process Compressed Data message by function func_processData().
*
* \param buffer_data Pointer to the data buffer containing Compressed Nanometrics packets.
* \param length_data Buffer length in bytes.
* \param channelList Pointer to the Channel List.
*
* \return Return a pointer to static struct NMXP_DATA_PROCESS.
*
*/
NMXP_DATA_PROCESS *nmxp_processCompressedDataFunc(char* buffer_data, int length_data, NMXP_CHAN_LIST *channelList);
/*! \brief Process decompressed Data message by function func_processData().
*
* \param buffer_data Pointer to the data buffer containing Decompressed Nanometrics packets.
* \param length_data Buffer length in bytes.
* \param channelList Pointer to the Channel List.
*
* \return Return a pointer to static struct NMXP_DATA_PROCESS.
*
*/
NMXP_DATA_PROCESS *nmxp_processDecompressedDataFunc(char* buffer_data, int length_data, NMXP_CHAN_LIST *channelList);
#endif
......@@ -12,8 +12,6 @@
#ifndef NMXP_DATA_H
#define NMXP_DATA_H 1
#include "nmxp_chan.h"
#include <sys/types.h>
/*! First 4 bytes of all messages. */
......@@ -97,15 +95,15 @@ typedef struct {
int *pDataPtr; /*!< \brief Array of samples */
int nSamp; /*!< \brief Number or samples */
int sampRate; /*!< \brief Sample rate */
} NMXP_PROCESS_DATA;
} NMXP_DATA_PROCESS;
/*! \brief Initialize a structure NMXP_PROCESS_DATA
/*! \brief Initialize a structure NMXP_DATA_PROCESS
*
* \param pd
*
*/
int nmxp_init_process_data(NMXP_PROCESS_DATA *pd);
int nmxp_data_init(NMXP_DATA_PROCESS *pd);
/*! \brief Unpack a 17-byte Nanometrics compressed data bundle.
......@@ -121,41 +119,14 @@ int nmxp_init_process_data(NMXP_PROCESS_DATA *pd);
* doug@seismo.berkeley.edu
*
*/
int nmxp_unpack_bundle (int *outdata, unsigned char *indata, int *prev);
/*! \brief Print info about struct NMXP_PROCESS_DATA
*
* \param pd Pointer to struct NMXP_PROCESS_DATA
*
*/
int nmxp_log_process_data(NMXP_PROCESS_DATA *pd);
/*! \brief Process Compressed Data message by function func_processData().
*
* \param buffer_data Pointer to the data buffer containing Compressed Nanometrics packets.
* \param length_data Buffer length in bytes.
* \param channelList Pointer to the Channel List.
* \param func_processData Pointer to the function manages data extracted. It could be NULL.
*
*/
void nmxp_processCompressedDataFunc(char* buffer_data, int length_data, NMXP_CHAN_LIST *channelList,
int (*func_processData)(NMXP_PROCESS_DATA *pd)
);
int nmxp_data_unpack_bundle (int *outdata, unsigned char *indata, int *prev);
/*! \brief Process decompressed Data message by function func_processData().
/*! \brief Print info about struct NMXP_DATA_PROCESS
*
* \param buffer_data Pointer to the data buffer containing Decompressed Nanometrics packets.
* \param length_data Buffer length in bytes.
* \param channelList Pointer to the Channel List.
* \param func_processData Pointer to the function manages data extracted. It could be NULL.
* \param pd Pointer to struct NMXP_DATA_PROCESS
*
*/
void nmxp_processDecompressedDataFunc(char* buffer_data, int length_data, NMXP_CHAN_LIST *channelList,
int (*func_processData)(NMXP_PROCESS_DATA *pd)
);
int nmxp_data_log(NMXP_DATA_PROCESS *pd);
#endif
......@@ -98,24 +98,29 @@ int nmxp_sendAddTimeSeriesChannel(int isock, NMXP_CHAN_LIST *channelList, uint32
int nmxp_receiveData(int isock, NMXP_CHAN_LIST *channelList,
int (*func_processData)(NMXP_PROCESS_DATA *pd)
int (*func_processData)(NMXP_DATA_PROCESS *pd)
) {
int ret;
NMXP_MSG_SERVER type;
void *buffer;
uint32_t length;
NMXP_DATA_PROCESS *pd = NULL;
ret = nmxp_receiveMessage(isock, &type, &buffer, &length);
if(type == NMXP_MSG_COMPRESSED) {
nmxp_processCompressedDataFunc(buffer, length, channelList, func_processData);
pd = nmxp_processCompressedDataFunc(buffer, length, channelList);
} else if(type == NMXP_MSG_DECOMPRESSED) {
nmxp_processDecompressedDataFunc(buffer, length, channelList, func_processData);
pd = nmxp_processDecompressedDataFunc(buffer, length, channelList);
} else {
nmxp_log(1, 0, "Type %d is not NMXP_MSG_COMPRESSED or NMXP_MSG_DECOMPRESSED!\n", type);
}
if(pd) {
func_processData(pd);
}
if(buffer) {
free(buffer);
}
......
......@@ -220,3 +220,242 @@ int nmxp_receiveMessage(int isock, NMXP_MSG_SERVER *type, void **buffer, uint32_
return ret;
}
NMXP_DATA_PROCESS *nmxp_processDecompressedDataFunc(char* buffer_data, int length_data, NMXP_CHAN_LIST *channelList)
{
int32_t netInt = 0;
int32_t pKey = 0;
double pTime = 0.0;
int32_t pNSamp = 0;
int32_t pSampRate = 0;
int32_t *pDataPtr = 0;
int swap = 0;
int idx;
char *sta = 0; /* The station code */
char *chan = 0; /* The channel code */
static NMXP_DATA_PROCESS pd;
/* copy the header contents into local fields and swap */
memcpy(&netInt, &buffer_data[0], 4);
pKey = ntohl(netInt);
if ( pKey != netInt ) { swap = 1; }
memcpy(&pTime, &buffer_data[4], 8);
if ( swap ) { swab8(&pTime); }
memcpy(&netInt, &buffer_data[12], 4);
pNSamp = ntohl(netInt);
memcpy(&netInt, &buffer_data[16], 4);
pSampRate = ntohl(netInt);
/* There should be (length_data - 20) bytes of data as 32-bit ints here */
pDataPtr = (int32_t *) &buffer_data[20];
/* Swap the data samples to host order */
for ( idx=0; idx < pNSamp; idx++ ) {
netInt = ntohl(pDataPtr[idx]);
pDataPtr[idx] = netInt;
}
/* Lookup the station and channel code */
sta = strdup(nmxp_chan_lookupName(pKey, channelList));
if ( (chan = strchr(sta, '.')) == NULL ) {
nmxp_log(1,0, "Channel name not in STN.CHAN format: %s\n", sta);
/*
free(sta);
return;
*/
}
if(chan) {
*chan++ = '\0';
}
nmxp_data_init(&pd);
pd.key = pKey;
pd.sta = sta;
pd.chan = chan;
pd.packet_type = NMXP_MSG_DECOMPRESSED;
// pd.x0 = ;
// pd.seq_no = ;
pd.time = pTime;
pd.buffer = buffer_data;
pd.length = length_data;
pd.nSamp = pNSamp;
pd.pDataPtr = pDataPtr;
pd.sampRate = pSampRate;
free(sta);
return &pd;
}
NMXP_DATA_PROCESS *nmxp_processCompressedDataFunc(char* buffer_data, int length_data, NMXP_CHAN_LIST *channelList)
{
int32_t pKey = 0;
double pTime = 0.0;
int32_t pNSamp = 0;
int32_t pSampRate = 0;
int32_t *pDataPtr = 0;
char *sta = 0; /* The station code */
char *chan = 0; /* The channel code */
static NMXP_DATA_PROCESS pd;
const int nmx_rate_code_to_sample_rate[32] = {
0,1,2,5,10,20,40,50,
80,100,125,200,250,500,1000,25,
120,0,0,0,0,0,0,0,
0,0,0,0,0,0,0,0};
int nmx_oldest_sequence_number;
char nmx_hdr[25];
unsigned char nmx_ptype;
unsigned int nmx_seconds;
double nmx_seconds_double;
short int nmx_ticks, nmx_instr_id;
int nmx_seqno;
unsigned char nmx_sample_rate;
int nmx_x0;
int rate_code, chan_code, this_sample_rate;
int comp_bytecount;
unsigned char *indata;
#define MAX_OUTDATA 4096
int32_t outdata[MAX_OUTDATA];
int nout, i, k;
int prev_xn;
int my_order = get_my_wordorder();
nmxp_log(0, 1, "my_order is %d\n", my_order);
memcpy(&nmx_oldest_sequence_number, buffer_data, 4);
nmxp_log(0, 1, "Oldest sequence number = %d\n", nmx_oldest_sequence_number);
memcpy(nmx_hdr, buffer_data+4, 17);
/* Decode the Nanometrics packet header bundle. */
memcpy (&nmx_ptype, nmx_hdr+0, 1);
if ( (nmx_ptype & 0xf) == 9) {
/* Filler packet. Discard entire packet. */
nmxp_log (1,0, "Filler packet - discarding\n");
//m continue;
exit(0);
}
nmx_x0 = 0;
memcpy (&nmx_seconds, nmx_hdr+1, 4);
memcpy (&nmx_ticks, nmx_hdr+5, 2);
memcpy (&nmx_instr_id, nmx_hdr+7, 2);
memcpy (&nmx_seqno, nmx_hdr+9, 4);
memcpy (&nmx_sample_rate, nmx_hdr+13, 1);
memcpy (&nmx_x0, nmx_hdr+14, 3);
const unsigned int high_scale = 4096 * 2048;
const unsigned int high_scale_p = 4096 * 4096;
/* check if nmx_x0 is negative like as signed 3-byte int */
if( (nmx_x0 & high_scale) == high_scale) {
// nmxp_log(0, 0, "WARNING: changed nmx_x0, old value = %d\n", nmx_x0);
nmx_x0 -= high_scale_p;
}
if (my_order != SEED_LITTLE_ENDIAN) {
swab4 ((int *)&nmx_seconds);
swab2 (&nmx_ticks);
swab2 (&nmx_instr_id);
swab4 (&nmx_seqno);
nmx_x0 = nmx_x0 >> 8;
swab4 (&nmx_x0);
nmx_x0 = nmx_x0 >> 8;
}
nmx_seconds_double = (double) nmx_seconds + ( (double) nmx_ticks / 10000.0 );
rate_code = nmx_sample_rate>>3;
chan_code = nmx_sample_rate&7;
this_sample_rate = nmx_rate_code_to_sample_rate[rate_code];
nmxp_log(0, 1, "nmx_ptype = %d\n", nmx_ptype);
nmxp_log(0, 1, "nmx_seconds = %d\n", nmx_seconds);
nmxp_log(0, 1, "nmx_ticks = %d\n", nmx_ticks);
nmxp_log(0, 1, "nmx_seconds_double = %f\n", nmx_seconds_double);
nmxp_log(0, 1, "nmx_x0 = %d\n", nmx_x0);
nmxp_log(0, 1, "nmx_instr_id = %d\n", nmx_instr_id);
nmxp_log(0, 1, "nmx_seqno = %d\n", nmx_seqno);
nmxp_log(0, 1, "nmx_sample_rate = %d\n", nmx_sample_rate);
nmxp_log(0, 1, "this_sample_rate = %d\n", this_sample_rate);
comp_bytecount = length_data-21;
indata = (unsigned char *) buffer_data + 21;
/* Unpack the data bundles, each 17 bytes long. */
prev_xn = nmx_x0;
outdata[0] = nmx_x0;
nout = 1;
for (i=0; i<comp_bytecount; i+=17) {
if (i+17>comp_bytecount) {
nmxp_log (1,0, "comp_bytecount = %d, i+17 = %d\n",
comp_bytecount, i+17);
exit(1);
}
if (nout+16 > MAX_OUTDATA) {
nmxp_log (1,0, "Output buffer size too small\n");
exit(1);
}
k = nmxp_data_unpack_bundle (outdata+nout,indata+i,&prev_xn);
if (k < 0) nmxp_log (1,0, "Break: (k=%d) %s %d\n", k, __FILE__, __LINE__);
if (k < 0) break;
nout += k;
/* prev_xn = outdata[nout-1]; */
}
nout--;
nmxp_log(0, 1, "Unpacked %d samples.\n", nout);
pKey = (nmx_instr_id << 16) | ( 1 << 8) | ( chan_code);
pTime = nmx_seconds_double;
pDataPtr = outdata;
pNSamp = nout;
pSampRate = this_sample_rate;
/* Lookup the station and channel code */
sta = strdup(nmxp_chan_lookupName(pKey, channelList));
if ( (chan = strchr(sta, '.')) == NULL ) {
nmxp_log(1,0, "Channel name not in STN.CHAN format: %s\n", sta);
/*
free(sta);
return;
*/
}
if(chan) {
*chan++ = '\0';
}
nmxp_log(0, 1, "Channel key %d for %s.%s\n", pKey, sta, chan);
nmxp_data_init(&pd);
pd.key = pKey;
pd.sta = sta;
pd.chan = chan;
pd.packet_type = nmx_ptype;
pd.x0 = nmx_x0;
pd.seq_no = nmx_seqno;
pd.time = pTime;
pd.buffer = buffer_data;
pd.length = length_data;
pd.nSamp = pNSamp;
pd.pDataPtr = pDataPtr;
pd.sampRate = pSampRate;
free(sta);
return &pd;
}
......@@ -34,11 +34,13 @@ char *nmxp_chan_lookupName(uint32_t key, NMXP_CHAN_LIST *channelList)
{
int chan_number = channelList->number;
int i_chan = 0;
static char ret[12];
for (i_chan = 0; i_chan < chan_number; i_chan++)
{
if ( key == channelList->channel[i_chan].key )
return &channelList->channel[i_chan].name[0];
strcpy(ret, channelList->channel[i_chan].name);
return ret;
}
return NULL;
......
......@@ -19,7 +19,7 @@
#include <qlib2.h>
int nmxp_init_process_data(NMXP_PROCESS_DATA *pd) {
int nmxp_data_init(NMXP_DATA_PROCESS *pd) {
pd->key = -1;
pd->sta = NULL;
pd->chan = NULL;
......@@ -36,7 +36,7 @@ int nmxp_init_process_data(NMXP_PROCESS_DATA *pd) {
}
int nmxp_unpack_bundle (int *outdata, unsigned char *indata, int *prev)
int nmxp_data_unpack_bundle (int *outdata, unsigned char *indata, int *prev)
{
int nsamples = 0;
int d4[4];
......@@ -109,7 +109,7 @@ int nmxp_unpack_bundle (int *outdata, unsigned char *indata, int *prev)
}
int nmxp_log_process_data(NMXP_PROCESS_DATA *pd) {
int nmxp_data_log(NMXP_DATA_PROCESS *pd) {
nmxp_log(0, 0, "%12d %5s.%3s (%10.4f - %10.4f) nsamp: %04d, srate: %03d, len: %04d [%d, %d] (%d, %d, %d)\n",
pd->key,
pd->sta,
......@@ -128,257 +128,3 @@ int nmxp_log_process_data(NMXP_PROCESS_DATA *pd) {
return 0;
}
void nmxp_processDecompressedDataFunc(char* buffer_data, int length_data, NMXP_CHAN_LIST *channelList,
int (*func_processData)(NMXP_PROCESS_DATA *pd)
)
{
int32_t netInt = 0;
int32_t pKey = 0;
double pTime = 0.0;
int32_t pNSamp = 0;
int32_t pSampRate = 0;
int32_t *pDataPtr = 0;
int swap = 0;
int idx;
char *sta = 0; /* The station code */
char *chan = 0; /* The channel code */
/* copy the header contents into local fields and swap */
memcpy(&netInt, &buffer_data[0], 4);
pKey = ntohl(netInt);
if ( pKey != netInt ) { swap = 1; }
memcpy(&pTime, &buffer_data[4], 8);
if ( swap ) { swab8(&pTime); }
memcpy(&netInt, &buffer_data[12], 4);
pNSamp = ntohl(netInt);
memcpy(&netInt, &buffer_data[16], 4);
pSampRate = ntohl(netInt);
/* There should be (length_data - 20) bytes of data as 32-bit ints here */
pDataPtr = (int32_t *) &buffer_data[20];
/* Swap the data samples to host order */
for ( idx=0; idx < pNSamp; idx++ ) {
netInt = ntohl(pDataPtr[idx]);
pDataPtr[idx] = netInt;
}
/* Lookup the station and channel code */
sta = strdup(nmxp_chan_lookupName(pKey, channelList));
if ( (chan = strchr(sta, '.')) == NULL ) {
nmxp_log(1,0, "Channel name not in STN.CHAN format: %s\n", sta);
free(sta);
return;
}
*chan++ = '\0';
/* Send it off to the controlling SeedLink server */
/*
if ( send_raw_depoch(sta, chan, pTime, 0, 100, pDataPtr, pNSamp) < 0 ) {
nmxp_log(1,0, "cannot send data to seedlink: %s", strerror(errno));
exit(1);
}
*/
NMXP_PROCESS_DATA pd;
nmxp_init_process_data(&pd);
pd.key = pKey;
pd.sta = sta;
pd.chan = chan;
pd.packet_type = NMXP_MSG_DECOMPRESSED;
// pd.x0 = ;
// pd.seq_no = ;
pd.time = pTime;
pd.buffer = buffer_data;
pd.length = length_data;
pd.nSamp = pNSamp;
pd.pDataPtr = pDataPtr;
pd.sampRate = pSampRate;
if(func_processData) {
func_processData(&pd);
}
free(sta);
}
void nmxp_processCompressedDataFunc(char* buffer_data, int length_data, NMXP_CHAN_LIST *channelList,
int (*func_processData)(NMXP_PROCESS_DATA *pd)
)
{
int32_t pKey = 0;
double pTime = 0.0;
int32_t pNSamp = 0;
int32_t pSampRate = 0;
int32_t *pDataPtr = 0;
char *sta = 0; /* The station code */
char *chan = 0; /* The channel code */
const int nmx_rate_code_to_sample_rate[32] = {
0,1,2,5,10,20,40,50,
80,100,125,200,250,500,1000,25,
120,0,0,0,0,0,0,0,
0,0,0,0,0,0,0,0};
int nmx_oldest_sequence_number;
char nmx_hdr[25];
unsigned char nmx_ptype;
unsigned int nmx_seconds;
double nmx_seconds_double;
short int nmx_ticks, nmx_instr_id;
int nmx_seqno;
unsigned char nmx_sample_rate;
int nmx_x0;
int rate_code, chan_code, this_sample_rate;
int comp_bytecount;
unsigned char *indata;
#define MAX_OUTDATA 4096
int32_t outdata[MAX_OUTDATA];
int nout, i, k;
int prev_xn;
int my_order = get_my_wordorder();
nmxp_log(0, 1, "my_order is %d\n", my_order);
memcpy(&nmx_oldest_sequence_number, buffer_data, 4);
nmxp_log(0, 1, "Oldest sequence number = %d\n", nmx_oldest_sequence_number);
memcpy(nmx_hdr, buffer_data+4, 17);
/* Decode the Nanometrics packet header bundle. */
memcpy (&nmx_ptype, nmx_hdr+0, 1);
if ( (nmx_ptype & 0xf) == 9) {
/* Filler packet. Discard entire packet. */
nmxp_log (1,0, "Filler packet - discarding\n");
//m continue;
exit(0);
}
nmx_x0 = 0;
memcpy (&nmx_seconds, nmx_hdr+1, 4);
memcpy (&nmx_ticks, nmx_hdr+5, 2);
memcpy (&nmx_instr_id, nmx_hdr+7, 2);
memcpy (&nmx_seqno, nmx_hdr+9, 4);
memcpy (&nmx_sample_rate, nmx_hdr+13, 1);
memcpy (&nmx_x0, nmx_hdr+14, 3);
const unsigned int high_scale = 4096 * 2048;
const unsigned int high_scale_p = 4096 * 4096;
/* check if nmx_x0 is negative like as signed 3-byte int */
if( (nmx_x0 & high_scale) == high_scale) {
// nmxp_log(0, 0, "WARNING: changed nmx_x0, old value = %d\n", nmx_x0);
nmx_x0 -= high_scale_p;
}
if (my_order != SEED_LITTLE_ENDIAN) {
swab4 ((int *)&nmx_seconds);
swab2 (&nmx_ticks);
swab2 (&nmx_instr_id);
swab4 (&nmx_seqno);
nmx_x0 = nmx_x0 >> 8;
swab4 (&nmx_x0);
nmx_x0 = nmx_x0 >> 8;
}
nmx_seconds_double = (double) nmx_seconds + ( (double) nmx_ticks / 10000.0 );
rate_code = nmx_sample_rate>>3;
chan_code = nmx_sample_rate&7;
this_sample_rate = nmx_rate_code_to_sample_rate[rate_code];