Commit 6752f237 authored by Matteo Quintiliani's avatar Matteo Quintiliani
Browse files

A preliminary mechanism to manage Raw Stream


git-svn-id: file:///home/quintiliani/svncopy/nmxptool/trunk@230 3cd66e75-5955-46cb-a940-c26e5fc5497d
parent d1c6c684
Loading
Loading
Loading
Loading
+110 −0
Original line number Diff line number Diff line
@@ -22,10 +22,113 @@

#define CURRENT_NETWORK (params.network)? params.network : DEFAULT_NETWORK


/* Max number of packet I can tollerate to wait.
 * It should be better to express it by time, for example 30 sec., 1 min., ecc....
 */
#define NMXPTOOL_MAX_DIFF_SEQ_NO 40
/* I can suppose in the worst case I can have the double number of packets I can tollerate to wait */
#define NMXPTOOL_MAX_PDLIST_ITEMS NMXPTOOL_MAX_DIFF_SEQ_NO*2

typedef struct {
    int32_t last_seq_no_sent;
    int32_t n_pdlist;
    NMXP_DATA_PROCESS *pdlist[NMXPTOOL_MAX_PDLIST_ITEMS];
} NMXPTOOL_PD_RAW_STREAM;

int seq_no_compare(const void *a, const void *b)
{       
    int ret = 0;
    NMXP_DATA_PROCESS **ppa = (NMXP_DATA_PROCESS **) a;
    NMXP_DATA_PROCESS **ppb = (NMXP_DATA_PROCESS **) b;
    NMXP_DATA_PROCESS *pa = *ppa;
    NMXP_DATA_PROCESS *pb = *ppb;

    if(pa && pb) {
	if(pa->seq_no > pb->seq_no) {
	    ret = 1;
	} else if (pa->seq_no < pb->seq_no) {
	    ret = -1;
	}
    } else {
	printf("Error pa and/or pb are NULL!\n");
    }

    return ret;
}

int nmxptool_add_and_do_ordered(NMXPTOOL_PD_RAW_STREAM *p, NMXP_DATA_PROCESS *pd, int func_pd(NMXP_DATA_PROCESS *)) {
    int ret = 0;
    int send_again = 1;
    int seq_no_diff;
    int j=0, k=0;

    // First time
    if(p->last_seq_no_sent == -1) {
	p->last_seq_no_sent = pd->seq_no - 1;
	nmxp_log(0, 0, "First time.\n");
    }

    if(p->n_pdlist + 1 >= NMXPTOOL_MAX_PDLIST_ITEMS) {
	/* Supposing p->pdlist is ordered,
	 * handle the first item and over write it.
	 */
	nmxp_log(LOG_WARN, 0, "Force handling packet %d!\n", p->pdlist[0]->seq_no);
	func_pd(p->pdlist[0]);
	p->last_seq_no_sent = (p->pdlist[0]->seq_no);
	p->pdlist[0] = pd;
    } else {
	p->pdlist[p->n_pdlist++] = pd;
    }
    qsort(p->pdlist, p->n_pdlist, sizeof(NMXP_DATA_PROCESS *), seq_no_compare);

    j=0;
    send_again = 1;
    while(send_again  &&  j < p->n_pdlist) {
	send_again = 0;
	seq_no_diff = p->pdlist[j]->seq_no - p->last_seq_no_sent;
	nmxp_log(0, 0, "seq_no_diff %d\n", seq_no_diff);
	if(seq_no_diff <= 0) {
	    // Duplicated packets: Discarded
	    send_again = 1;
	    j++;
	    nmxp_log(0, 0, "Packets %d discarded\n", p->pdlist[j]->seq_no);
	} else if(seq_no_diff == 1) {
	    func_pd(p->pdlist[j]);
	    p->last_seq_no_sent = (p->pdlist[j]->seq_no);
	    send_again = 1;
	    j++;
	} else if(seq_no_diff >= NMXPTOOL_MAX_DIFF_SEQ_NO) {
	    // I have to drop packet with sequence number p->last_seq_no_sent+1
	    nmxp_log(LOG_WARN, 0, "Give up to wait packet %d!\n", p->last_seq_no_sent+1);
	    p->last_seq_no_sent++;
	    send_again = 1;
	}
    }

    for(k=0; k < p->n_pdlist; k++) {
	if(k + j < p->n_pdlist) {
	    if(p->pdlist[k]->buffer) {
		free(p->pdlist[k]->buffer);
		p->pdlist[k]->buffer = NULL;
	    }
	    p->pdlist[k] = p->pdlist[k+j];
	} else {
	    p->pdlist[k] = NULL;
	}
    }
    p->n_pdlist = p->n_pdlist - j;

    nmxp_log(0, 0, "p->n_pdlist %d FINAL\n", p->n_pdlist);

    return ret;
}

typedef struct {
    int significant;
    double last_time;
    int32_t x_1;
    NMXPTOOL_PD_RAW_STREAM raw_stream_buffer;
} NMXPTOOL_CHAN_SEQ;


@@ -67,6 +170,7 @@ int main (int argc, char **argv) {
    int32_t connection_time;
    int request_SOCKET_OK;
    int i_chan, cur_chan;
    int j;
    int exitpdscondition;
    int exitdapcondition;

@@ -158,6 +262,11 @@ int main (int argc, char **argv) {
	    channelListSeq[i_chan].significant = 0;
	    channelListSeq[i_chan].last_time = 0.0;
	    channelListSeq[i_chan].x_1 = 0;
	    channelListSeq[i_chan].raw_stream_buffer.last_seq_no_sent = -1;
	    channelListSeq[i_chan].raw_stream_buffer.n_pdlist = 0;
	    for(j=0; j<NMXPTOOL_MAX_PDLIST_ITEMS; j++) {
		channelListSeq[i_chan].raw_stream_buffer.pdlist[j] = NULL;
	    }
	}

#ifdef HAVE_LIBMSEED
@@ -513,6 +622,7 @@ int main (int argc, char **argv) {
		channelListSeq[cur_chan].last_time = pd->time + ((double) pd->nSamp / (double) pd->sampRate);
	    }

	    nmxptool_add_and_do_ordered(&(channelListSeq[cur_chan].raw_stream_buffer), pd, nmxp_data_log);

#ifdef HAVE_LIBMSEED
	    /* Write Mini-SEED record */