Commit 4242692b authored by Matteo Quintiliani's avatar Matteo Quintiliani
Browse files

Improved function nmxptool_manage_raw_stream()


git-svn-id: file:///home/quintiliani/svncopy/nmxptool/trunk@240 3cd66e75-5955-46cb-a940-c26e5fc5497d
parent 4d2f3444
Loading
Loading
Loading
Loading
+57 −32
Original line number Diff line number Diff line
@@ -50,8 +50,7 @@ int nmxptool_write_miniseed(NMXP_DATA_PROCESS *pd);
int nmxptool_send_raw_depoch(NMXP_DATA_PROCESS *pd);

int seq_no_compare(const void *a, const void *b);
// TODO func_pd has to become an array of functions
int nmxptool_add_and_do_ordered(NMXPTOOL_PD_RAW_STREAM *p, NMXP_DATA_PROCESS *a_pd, int func_pd(NMXP_DATA_PROCESS *));
int nmxptool_manage_raw_stream(NMXPTOOL_PD_RAW_STREAM *p, NMXP_DATA_PROCESS *a_pd, int (*p_func_pd[NMXPTOOL_MAX_FUNC_PD]) (NMXP_DATA_PROCESS *), int n_func_pd);

int nmxptool_check_and_log_gap(double time1, double time2, const double gap_tollerance, const char *station, const char *channel);

@@ -438,9 +437,25 @@ int main (int argc, char **argv) {

    } else {

	int n_func_pd = 0;
	int (*p_func_pd[NMXPTOOL_MAX_FUNC_PD]) (NMXP_DATA_PROCESS *);
	p_func_pd[0] = nmxptool_send_raw_depoch;
	p_func_pd[1] = nmxptool_write_miniseed;

	if(params.stc == -1) {

#ifdef HAVE_LIBMSEED
	    /* Write Mini-SEED record */
	    if(params.flag_writeseed) {
		p_func_pd[n_func_pd++] = nmxptool_write_miniseed;
	    }
#endif

#ifdef HAVE___SRC_SEEDLINK_PLUGIN_C
	    /* Send data to SeedLink Server */
	    if(params.flag_slink) {
		p_func_pd[n_func_pd++] = nmxptool_send_raw_depoch;
	    }
#endif
	}

	/* ************************************************************* */
	/* Start subscription protocol "PRIVATE DATA STREAM" version 1.4 */
@@ -508,6 +523,11 @@ int main (int argc, char **argv) {
	    /* Set cur_chan */
	    cur_chan = nmxp_chan_lookupKeyIndex(pd->key, channelList_subset);

	    /* Manage Raw Stream */
	    if(params.stc == -1) {
		nmxptool_manage_raw_stream(&(channelListSeq[cur_chan].raw_stream_buffer), pd, p_func_pd, n_func_pd);
	    } else {

		/* Management of gaps */
		if(!channelListSeq[cur_chan].significant && pd->nSamp > 0) {
		    channelListSeq[cur_chan].significant = 1;
@@ -523,7 +543,6 @@ int main (int argc, char **argv) {
		    channelListSeq[cur_chan].last_time = pd->time + ((double) pd->nSamp / (double) pd->sampRate);
		}

	    nmxptool_add_and_do_ordered(&(channelListSeq[cur_chan].raw_stream_buffer), pd, nmxp_data_log);

#ifdef HAVE_LIBMSEED
		/* Write Mini-SEED record */
@@ -538,6 +557,7 @@ int main (int argc, char **argv) {
		    nmxptool_send_raw_depoch(pd);
		}
#endif
	    }

	    /* Store x_1 */
	    if(pd->nSamp > 0) {
@@ -706,11 +726,12 @@ int nmxptool_send_raw_depoch(NMXP_DATA_PROCESS *pd) {
}


int nmxptool_add_and_do_ordered(NMXPTOOL_PD_RAW_STREAM *p, NMXP_DATA_PROCESS *a_pd, int func_pd(NMXP_DATA_PROCESS *)) {
int nmxptool_manage_raw_stream(NMXPTOOL_PD_RAW_STREAM *p, NMXP_DATA_PROCESS *a_pd, int (*p_func_pd[NMXPTOOL_MAX_FUNC_PD]) (NMXP_DATA_PROCESS *), int n_func_pd) {
    int ret = 0;
    int send_again = 1;
    int seq_no_diff;
    int j=0, k=0;
    int i_func_pd;
    NMXP_DATA_PROCESS *pd = NULL;

    /* Allocate memory for pd and copy a_pd */
@@ -732,7 +753,7 @@ int nmxptool_add_and_do_ordered(NMXPTOOL_PD_RAW_STREAM *p, NMXP_DATA_PROCESS *a_
    /* First time */
    if(p->last_seq_no_sent == -1) {
	p->last_seq_no_sent = pd->seq_no - 1;
	nmxp_log(0, 0, "First time.\n");
	nmxp_log(0, 1, "First time nmxptool_manage_raw_stream().\n");
    }

    /* Add pd and sort array */
@@ -741,7 +762,9 @@ int nmxptool_add_and_do_ordered(NMXPTOOL_PD_RAW_STREAM *p, NMXP_DATA_PROCESS *a_
	 * handle the first item and over write it.
	 */
	nmxp_log(LOG_WARN, 0, "Force handling packet %d!\n", p->pdlist[0]->seq_no);
	func_pd(p->pdlist[0]);
	for(i_func_pd=0; i_func_pd<n_func_pd; i_func_pd++) {
	    (*p_func_pd[i_func_pd])(p->pdlist[0]);
	}
	p->last_seq_no_sent = (p->pdlist[0]->seq_no);
	p->pdlist[0] = pd;
    } else {
@@ -753,7 +776,7 @@ int nmxptool_add_and_do_ordered(NMXPTOOL_PD_RAW_STREAM *p, NMXP_DATA_PROCESS *a_
    if(p->n_pdlist > 1) {
	int y = 0;
	for(y=0; y < p->n_pdlist; y++) {
	    nmxp_log(0, 0, "%02d pkt %d\n", y, p->pdlist[y]->seq_no);
	    nmxp_log(0, 1, "%02d pkt %d\n", y, p->pdlist[y]->seq_no);
	}
    }

@@ -763,14 +786,16 @@ int nmxptool_add_and_do_ordered(NMXPTOOL_PD_RAW_STREAM *p, NMXP_DATA_PROCESS *a_
    while(send_again  &&  j < p->n_pdlist) {
	send_again = 0;
	seq_no_diff = p->pdlist[j]->seq_no - p->last_seq_no_sent;
	nmxp_log(0, 0, "seq_no_diff=%d  j=%d  p->n_pdlist=%d (%d-%d)\n", seq_no_diff, j, p->n_pdlist, p->pdlist[j]->seq_no, p->last_seq_no_sent);
	nmxp_log(0, 1, "seq_no_diff=%d  j=%d  p->n_pdlist=%d (%d-%d)\n", seq_no_diff, j, p->n_pdlist, p->pdlist[j]->seq_no, p->last_seq_no_sent);
	if(seq_no_diff <= 0) {
	    // Duplicated packets: Discarded
	    nmxp_log(0, 0, "Packets %d discarded\n", p->pdlist[j]->seq_no);
	    nmxp_log(LOG_WARN, 0, "Packets %d discarded\n", p->pdlist[j]->seq_no);
	    send_again = 1;
	    j++;
	} else if(seq_no_diff == 1) {
	    func_pd(p->pdlist[j]);
	    for(i_func_pd=0; i_func_pd<n_func_pd; i_func_pd++) {
		(*p_func_pd[i_func_pd])(p->pdlist[j]);
	    }
	    p->last_seq_no_sent = p->pdlist[j]->seq_no;
	    send_again = 1;
	    j++;
@@ -806,7 +831,7 @@ int nmxptool_add_and_do_ordered(NMXPTOOL_PD_RAW_STREAM *p, NMXP_DATA_PROCESS *a_
	p->n_pdlist = p->n_pdlist - j;
    }

    nmxp_log(0, 0, "j=%d  p->n_pdlist=%d FINAL\n", j, p->n_pdlist);
    nmxp_log(0, 1, "j=%d  p->n_pdlist=%d FINAL\n", j, p->n_pdlist);

    return ret;
}