Commit b95d343d authored by Matteo Quintiliani's avatar Matteo Quintiliani

Raw Stream Management has been moved to libnmxp


git-svn-id: file:///home/quintiliani/svncopy/nmxptool/trunk@286 3cd66e75-5955-46cb-a940-c26e5fc5497d
parent bf7920ed
......@@ -7,7 +7,7 @@
* Istituto Nazionale di Geofisica e Vulcanologia - Italy
* quintiliani@ingv.it
*
* $Id: nmxp.h,v 1.31 2007-09-07 07:08:30 mtheo Exp $
* $Id: nmxp.h,v 1.32 2007-09-12 12:37:05 mtheo Exp $
*
*/
......@@ -128,6 +128,17 @@ typedef struct {
int32_t end_time;
} NMXP_DATA_REQUEST;
#define NMXPTOOL_MAX_FUNC_PD 10
#define TIME_TOLLERANCE 0.001
typedef struct {
int32_t last_seq_no_sent;
double last_sample_time;
int32_t max_pdlist_items;
int32_t n_pdlist;
NMXP_DATA_PROCESS **pdlist; /* Array for pd queue */
} NMXP_RAW_STREAM_DATA;
/*! \brief Sends the message "Connect" on a socket
*
......@@ -275,5 +286,10 @@ NMXP_CHAN_LIST *nmxp_getAvailableChannelList(char * hostname, int portnum, NMXP_
*/
NMXP_META_CHAN_LIST *nmxp_getMetaChannelList(char * hostname, int portnum, NMXP_DATATYPE datatype, int flag_request_channelinfo);
int nmxp_raw_stream_seq_no_compare(const void *a, const void *b);
void nmxp_raw_stream_init(NMXP_RAW_STREAM_DATA *raw_stream_buffer, int32_t max_pdlist_items);
void nmxp_raw_stream_free(NMXP_RAW_STREAM_DATA *raw_stream_buffer);
int nmxp_raw_stream_manage(NMXP_RAW_STREAM_DATA *p, NMXP_DATA_PROCESS *a_pd, int (*p_func_pd[NMXPTOOL_MAX_FUNC_PD]) (NMXP_DATA_PROCESS *), int n_func_pd);
#endif
......@@ -7,7 +7,7 @@
* Istituto Nazionale di Geofisica e Vulcanologia - Italy
* quintiliani@ingv.it
*
* $Id: nmxp.c,v 1.40 2007-09-07 07:08:30 mtheo Exp $
* $Id: nmxp.c,v 1.41 2007-09-12 12:37:05 mtheo Exp $
*
*/
......@@ -486,3 +486,213 @@ NMXP_META_CHAN_LIST *nmxp_getMetaChannelList(char * hostname, int portnum, NMXP_
return chan_list;
}
int nmxp_raw_stream_seq_no_compare(const void *a, const void *b)
{
int ret = 0;
NMXP_DATA_PROCESS **ppa = (NMXP_DATA_PROCESS **) a;
NMXP_DATA_PROCESS **ppb = (NMXP_DATA_PROCESS **) b;
NMXP_DATA_PROCESS *pa = *ppa;
NMXP_DATA_PROCESS *pb = *ppb;
if(pa && pb) {
if(pa->seq_no > pb->seq_no) {
ret = 1;
} else if (pa->seq_no < pb->seq_no) {
ret = -1;
}
} else {
printf("Error pa and/or pb are NULL!\n");
}
return ret;
}
void nmxp_raw_stream_init(NMXP_RAW_STREAM_DATA *raw_stream_buffer, int32_t max_pdlist_items) {
int j;
raw_stream_buffer->last_seq_no_sent = -1;
raw_stream_buffer->last_sample_time = -1.0;
raw_stream_buffer->max_pdlist_items = max_pdlist_items;
raw_stream_buffer->n_pdlist = 0;
raw_stream_buffer->pdlist = (NMXP_DATA_PROCESS **) malloc (raw_stream_buffer->max_pdlist_items * sizeof(NMXP_DATA_PROCESS *));
for(j=0; j<raw_stream_buffer->max_pdlist_items; j++) {
raw_stream_buffer->pdlist[j] = NULL;
}
}
void nmxp_raw_stream_free(NMXP_RAW_STREAM_DATA *raw_stream_buffer) {
int j;
if(raw_stream_buffer->pdlist) {
for(j=0; j<raw_stream_buffer->max_pdlist_items; j++) {
if(raw_stream_buffer->pdlist[j]) {
free(raw_stream_buffer->pdlist[j]);
}
}
free(raw_stream_buffer->pdlist);
}
}
int nmxp_raw_stream_manage(NMXP_RAW_STREAM_DATA *p, NMXP_DATA_PROCESS *a_pd, int (*p_func_pd[NMXPTOOL_MAX_FUNC_PD]) (NMXP_DATA_PROCESS *), int n_func_pd) {
int ret = 0;
int send_again = 1;
int seq_no_diff;
double time_diff;
double latency = 0.0;
int j=0, k=0;
int i_func_pd;
char str_time[200];
NMXP_DATA_PROCESS *pd = NULL;
/* Allocate memory for pd and copy a_pd */
pd = (NMXP_DATA_PROCESS *) malloc (sizeof(NMXP_DATA_PROCESS));
memcpy(pd, a_pd, sizeof(NMXP_DATA_PROCESS));
if(a_pd->length > 0) {
pd->buffer = malloc(pd->length);
memcpy(pd->buffer, a_pd->buffer, a_pd->length);
} else {
pd->buffer = NULL;
}
if(a_pd->nSamp * sizeof(int) > 0) {
pd->pDataPtr = (int *) malloc(a_pd->nSamp * sizeof(int));
memcpy(pd->pDataPtr, a_pd->pDataPtr, a_pd->nSamp * sizeof(int));
} else {
pd->pDataPtr = NULL;
}
/* First time */
if(p->last_seq_no_sent == -1) {
p->last_seq_no_sent = pd->seq_no - 1;
p->last_sample_time = pd->time;
nmxp_log(0, 1, "First time nmxp_raw_stream_manage().\n");
}
/* Add pd and sort array */
if(p->n_pdlist >= p->max_pdlist_items) {
/* Supposing p->pdlist is ordered,
* handle the first item and over write it.
*/
seq_no_diff = p->pdlist[0]->seq_no - p->last_seq_no_sent;
time_diff = p->pdlist[0]->time - p->last_sample_time;
latency = nmxp_data_latency(p->pdlist[0]);
nmxp_data_to_str(str_time, p->pdlist[0]->time);
if( seq_no_diff > 0) {
nmxp_log(NMXP_LOG_WARN, 0, "Force handling packet %s.%s.%d.%d (%s - %.2f sec.) time_diff %.2fs lat. %.1fs!\n",
p->pdlist[0]->station, p->pdlist[0]->channel, p->pdlist[0]->seq_no, p->pdlist[0]->packet_type, str_time,
(double) p->pdlist[0]->nSamp / (double) p->pdlist[0]->sampRate, time_diff, latency);
for(i_func_pd=0; i_func_pd<n_func_pd; i_func_pd++) {
(*p_func_pd[i_func_pd])(p->pdlist[0]);
}
p->last_seq_no_sent = (p->pdlist[0]->seq_no);
p->last_sample_time = (p->pdlist[0]->time + ((double) p->pdlist[0]->nSamp / (double) p->pdlist[0]->sampRate ));
} else {
/* It should not occur */
nmxp_log(NMXP_LOG_WARN, 0, "NOT OCCUR! Packets %s.%s.%d.%d (%s - %.2f sec.) discarded, seq_no_diff=%d time_diff %.2fs lat. %.1fs\n",
p->pdlist[0]->station, p->pdlist[0]->channel, p->pdlist[0]->seq_no, p->pdlist[0]->packet_type, str_time,
(double) p->pdlist[0]->nSamp / (double) p->pdlist[0]->sampRate, seq_no_diff, time_diff, latency);
}
/* Free handled packet */
if(p->pdlist[0]->buffer) {
free(p->pdlist[0]->buffer);
p->pdlist[0]->buffer = NULL;
}
if(p->pdlist[0]->pDataPtr) {
free(p->pdlist[0]->pDataPtr);
p->pdlist[0]->pDataPtr = NULL;
}
if(p->pdlist[0]) {
free(p->pdlist[0]);
p->pdlist[0] = NULL;
}
p->pdlist[0] = pd;
} else {
p->pdlist[p->n_pdlist++] = pd;
}
qsort(p->pdlist, p->n_pdlist, sizeof(NMXP_DATA_PROCESS *), nmxp_raw_stream_seq_no_compare);
// TODO Check for packet duplication in pd->pdlist
/* Print array, only for debugging */
if(p->n_pdlist > 1) {
int y = 0;
for(y=0; y < p->n_pdlist; y++) {
nmxp_log(0, 1, "%02d pkt %d\n", y, p->pdlist[y]->seq_no);
}
}
// TODO Condition for max tollerable latency
/* Manage array and execute func_pd() */
j=0;
send_again = 1;
while(send_again && j < p->n_pdlist) {
send_again = 0;
seq_no_diff = p->pdlist[j]->seq_no - p->last_seq_no_sent;
time_diff = p->pdlist[j]->time - p->last_sample_time;
latency = nmxp_data_latency(p->pdlist[j]);
nmxp_data_to_str(str_time, p->pdlist[j]->time);
if(seq_no_diff <= 0) {
// Duplicated packets: Discarded
nmxp_log(NMXP_LOG_WARN, 0, "Packets %s.%s.%d.%d (%s - %f sec.) discarded, seq_no_diff=%d time_diff=%.2fs lat %.1fs\n",
p->pdlist[j]->station, p->pdlist[j]->channel, p->pdlist[j]->seq_no, p->pdlist[j]->packet_type, str_time,
(double) p->pdlist[j]->nSamp / (double) p->pdlist[j]->sampRate, seq_no_diff, time_diff, latency);
send_again = 1;
j++;
} else if(seq_no_diff == 1) {
for(i_func_pd=0; i_func_pd<n_func_pd; i_func_pd++) {
(*p_func_pd[i_func_pd])(p->pdlist[j]);
}
if(time_diff > TIME_TOLLERANCE || time_diff < -TIME_TOLLERANCE) {
nmxp_log(NMXP_LOG_WARN, 0, "time is not correct %s.%s seq_no_diff=%d time_diff=%.2fs ([%d] %d-%d) (%s - %.2f sec.) lat. %.1fs\n",
p->pdlist[j]->station, p->pdlist[j]->channel,
seq_no_diff, time_diff, p->pdlist[j]->packet_type, p->pdlist[j]->seq_no, p->last_seq_no_sent,
str_time, (double) p->pdlist[j]->nSamp / (double) p->pdlist[j]->sampRate, latency);
}
p->last_seq_no_sent = p->pdlist[j]->seq_no;
p->last_sample_time = (p->pdlist[j]->time + ((double) p->pdlist[j]->nSamp / (double) p->pdlist[j]->sampRate ));
send_again = 1;
j++;
} else {
nmxp_log(NMXP_LOG_WARN, 0, "%s.%s seq_no_diff=%d ([%d] %d-%d) j=%2d p->n_pdlist=%2d (%s - %.2f sec.) time_diff=%.2fs lat. %.1fs\n",
p->pdlist[j]->station, p->pdlist[j]->channel,
seq_no_diff, p->pdlist[j]->packet_type, p->pdlist[j]->seq_no, p->last_seq_no_sent, j, p->n_pdlist,
str_time, (double) p->pdlist[j]->nSamp / (double) p->pdlist[j]->sampRate, time_diff, latency);
}
}
/* Shift and free j handled elements */
if(j > 0) {
for(k=0; k < p->n_pdlist; k++) {
if(k < j) {
if(p->pdlist[k]->buffer) {
free(p->pdlist[k]->buffer);
p->pdlist[k]->buffer = NULL;
}
if(p->pdlist[k]->pDataPtr) {
free(p->pdlist[k]->pDataPtr);
p->pdlist[k]->pDataPtr = NULL;
}
if(p->pdlist[k]) {
free(p->pdlist[k]);
p->pdlist[k] = NULL;
}
}
if(k + j < p->n_pdlist) {
p->pdlist[k] = p->pdlist[k+j];
} else {
p->pdlist[k] = NULL;
}
}
p->n_pdlist = p->n_pdlist - j;
}
nmxp_log(0, 1, "j=%d p->n_pdlist=%d FINAL\n", j, p->n_pdlist);
return ret;
}
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment