Commit f2d17e93 authored by Matteo Quintiliani's avatar Matteo Quintiliani
Browse files

Dynamic management of raw_stream_buffer


git-svn-id: file:///home/quintiliani/svncopy/nmxptool/trunk@266 3cd66e75-5955-46cb-a940-c26e5fc5497d
parent 5b161e41
Loading
Loading
Loading
Loading
+47 −16
Original line number Diff line number Diff line
@@ -7,7 +7,7 @@
 * 	Istituto Nazionale di Geofisica e Vulcanologia - Italy
 *	quintiliani@ingv.it
 *
 * $Id: nmxptool.c,v 1.50 2007-09-07 20:20:04 mtheo Exp $
 * $Id: nmxptool.c,v 1.51 2007-09-10 12:50:25 mtheo Exp $
 *
 */

@@ -36,12 +36,11 @@
#define GAP_TOLLERANCE 0.001
#define NMXPTOOL_MAX_FUNC_PD 10

#define NMXPTOOL_MAX_PDLIST_ITEMS 40

typedef struct {
    int32_t last_seq_no_sent;
    double last_sample_time;
    int32_t n_pdlist;
    NMXP_DATA_PROCESS *pdlist[NMXPTOOL_MAX_PDLIST_ITEMS];
    NMXP_DATA_PROCESS **pdlist; /* Array for pd queue */
} NMXPTOOL_PD_RAW_STREAM;

typedef struct {
@@ -176,8 +175,10 @@ int main (int argc, char **argv) {
	    channelListSeq[i_chan].last_time = 0.0;
	    channelListSeq[i_chan].x_1 = 0;
	    channelListSeq[i_chan].raw_stream_buffer.last_seq_no_sent = -1;
	    channelListSeq[i_chan].raw_stream_buffer.last_sample_time = -1.0;
	    channelListSeq[i_chan].raw_stream_buffer.n_pdlist = 0;
	    for(j=0; j<NMXPTOOL_MAX_PDLIST_ITEMS; j++) {
	    channelListSeq[i_chan].raw_stream_buffer.pdlist = (NMXP_DATA_PROCESS **) malloc (params.max_pdlist_items * sizeof(NMXP_DATA_PROCESS *));
	    for(j=0; j<params.max_pdlist_items; j++) {
		channelListSeq[i_chan].raw_stream_buffer.pdlist[j] = NULL;
	    }
	}
@@ -619,6 +620,17 @@ int main (int argc, char **argv) {
	}
#endif

	for(i_chan = 0; i_chan < channelList_subset->number; i_chan++) {
	    if(channelListSeq[i_chan].raw_stream_buffer.pdlist) {
		for(j=0; j<params.max_pdlist_items; j++) {
		    if(channelListSeq[i_chan].raw_stream_buffer.pdlist[j]) {
			free(channelListSeq[i_chan].raw_stream_buffer.pdlist[j]);
		    }
		}
		free(channelListSeq[i_chan].raw_stream_buffer.pdlist);
	    }
	}

	if(channelListSeq) {
	    free(channelListSeq);
	}
@@ -638,6 +650,8 @@ int main (int argc, char **argv) {

/* Do any needed cleanup and exit */
static void clientShutdown(int sig) {
    int j;

    nmxp_log(0, 0, "Program interrupted!\n");

    if(params.flag_writefile  &&  outfile) {
@@ -675,6 +689,17 @@ static void clientShutdown(int sig) {
    }
#endif

    for(i_chan = 0; i_chan < channelList_subset->number; i_chan++) {
	if(channelListSeq[i_chan].raw_stream_buffer.pdlist) {
	    for(j=0; j<params.max_pdlist_items; j++) {
		if(channelListSeq[i_chan].raw_stream_buffer.pdlist[j]) {
		    free(channelListSeq[i_chan].raw_stream_buffer.pdlist[j]);
		}
	    }
	    free(channelListSeq[i_chan].raw_stream_buffer.pdlist);
	}
    }

    if(channelListSeq) {
	free(channelListSeq);
    }
@@ -757,6 +782,7 @@ int nmxptool_manage_raw_stream(NMXPTOOL_PD_RAW_STREAM *p, NMXP_DATA_PROCESS *a_p
    int ret = 0;
    int send_again = 1;
    int seq_no_diff;
    double time_diff;
    int j=0, k=0;
    int i_func_pd;
    char str_time[200];
@@ -781,32 +807,33 @@ int nmxptool_manage_raw_stream(NMXPTOOL_PD_RAW_STREAM *p, NMXP_DATA_PROCESS *a_p
    /* First time */
    if(p->last_seq_no_sent == -1) {
	p->last_seq_no_sent = pd->seq_no - 1;
	p->last_sample_time = pd->time;
	nmxp_log(0, 1, "First time nmxptool_manage_raw_stream().\n");
    }

    // TODO Condition for max tollerable latency

    /* Add pd and sort array */
    if(p->n_pdlist >= NMXPTOOL_MAX_PDLIST_ITEMS) {
    if(p->n_pdlist >= params.max_pdlist_items) {
	/* Supposing p->pdlist is ordered,
	 * handle the first item and over write it.
	 */
	seq_no_diff = p->pdlist[0]->seq_no - p->last_seq_no_sent;
	time_diff = p->pdlist[0]->time - p->last_sample_time;
	if( seq_no_diff > 0) {
	    nmxp_data_to_str(str_time, p->pdlist[0]->time);
	    nmxp_log(NMXP_LOG_WARN, 0, "Force handling packet %s.%s.%d.%d (%s - %f sec.)!\n",
	    nmxp_log(NMXP_LOG_WARN, 0, "Force handling packet %s.%s.%d.%d (%s - %.2f sec.)  time_diff %.2f sec.!\n",
		    p->pdlist[0]->station, p->pdlist[0]->channel, p->pdlist[0]->seq_no, p->pdlist[0]->packet_type, str_time,
		    (double) p->pdlist[0]->nSamp / (double) p->pdlist[0]->sampRate);
		    (double) p->pdlist[0]->nSamp / (double) p->pdlist[0]->sampRate, time_diff);
	    for(i_func_pd=0; i_func_pd<n_func_pd; i_func_pd++) {
		(*p_func_pd[i_func_pd])(p->pdlist[0]);
	    }
	    p->last_seq_no_sent = (p->pdlist[0]->seq_no);
	    p->last_sample_time = (p->pdlist[0]->time + ((double) p->pdlist[0]->nSamp / (double) p->pdlist[0]->sampRate ));
	} else {
	    /* It should not occur */
	    nmxp_data_to_str(str_time, p->pdlist[0]->time);
	    nmxp_log(NMXP_LOG_WARN, 0, "NOT OCCUR! Packets %s.%s.%d.%d (%s - %f sec.) discarded, seq_no_diff=%d.\n",
	    nmxp_log(NMXP_LOG_WARN, 0, "NOT OCCUR! Packets %s.%s.%d.%d (%s - %.2f sec.) discarded, seq_no_diff=%d time_diff %.2f sec.\n",
		    p->pdlist[0]->station, p->pdlist[0]->channel, p->pdlist[0]->seq_no, p->pdlist[0]->packet_type, str_time,
		    (double) p->pdlist[0]->nSamp / (double) p->pdlist[0]->sampRate, seq_no_diff);
		    (double) p->pdlist[0]->nSamp / (double) p->pdlist[0]->sampRate, seq_no_diff, time_diff);
	}

	/* Free handled packet */
@@ -839,18 +866,21 @@ int nmxptool_manage_raw_stream(NMXPTOOL_PD_RAW_STREAM *p, NMXP_DATA_PROCESS *a_p
	}
    }

    // TODO Condition for max tollerable latency

    /* Manage array and execute func_pd() */
    j=0;
    send_again = 1;
    while(send_again  &&  j < p->n_pdlist) {
	send_again = 0;
	seq_no_diff = p->pdlist[j]->seq_no - p->last_seq_no_sent;
	time_diff = p->pdlist[j]->time - p->last_sample_time;
	if(seq_no_diff <= 0) {
	    // Duplicated packets: Discarded
	    nmxp_data_to_str(str_time, p->pdlist[j]->time);
	    nmxp_log(NMXP_LOG_WARN, 0, "Packets %s.%s.%d.%d (%s - %f sec.) discarded, seq_no_diff=%d.\n",
	    nmxp_log(NMXP_LOG_WARN, 0, "Packets %s.%s.%d.%d (%s - %f sec.) discarded, seq_no_diff=%d  time_diff=%.2f sec.\n",
		    p->pdlist[j]->station, p->pdlist[j]->channel, p->pdlist[j]->seq_no, p->pdlist[j]->packet_type, str_time,
		    (double) p->pdlist[j]->nSamp /  (double) p->pdlist[j]->sampRate, seq_no_diff);
		    (double) p->pdlist[j]->nSamp /  (double) p->pdlist[j]->sampRate, seq_no_diff, time_diff);
	    send_again = 1;
	    j++;
	} else if(seq_no_diff == 1) {
@@ -858,14 +888,15 @@ int nmxptool_manage_raw_stream(NMXPTOOL_PD_RAW_STREAM *p, NMXP_DATA_PROCESS *a_p
		(*p_func_pd[i_func_pd])(p->pdlist[j]);
	    }
	    p->last_seq_no_sent = p->pdlist[j]->seq_no;
	    p->last_sample_time = (p->pdlist[j]->time + ((double) p->pdlist[j]->nSamp / (double) p->pdlist[j]->sampRate ));
	    send_again = 1;
	    j++;
	} else {
	    nmxp_data_to_str(str_time, p->pdlist[j]->time);
	    nmxp_log(NMXP_LOG_WARN, 0, "%s.%s seq_no_diff=%d ([%d] %d-%d)  j=%2d  p->n_pdlist=%2d (%s - %f sec.)\n",
	    nmxp_log(NMXP_LOG_WARN, 0, "%s.%s seq_no_diff=%d ([%d] %d-%d)  j=%2d  p->n_pdlist=%2d (%s - %.2f sec.) time_diff=%.2f sec.\n",
		    p->pdlist[j]->station, p->pdlist[j]->channel, 
		    seq_no_diff, p->pdlist[j]->packet_type, p->pdlist[j]->seq_no, p->last_seq_no_sent, j, p->n_pdlist,
		    str_time, (double) p->pdlist[j]->nSamp /  (double) p->pdlist[j]->sampRate);
		    str_time, (double) p->pdlist[j]->nSamp /  (double) p->pdlist[j]->sampRate, time_diff);
	}
    }