Commit 4d2f3444 authored by Matteo Quintiliani's avatar Matteo Quintiliani
Browse files

Formatting


git-svn-id: file:///home/quintiliani/svncopy/nmxptool/trunk@239 3cd66e75-5955-46cb-a940-c26e5fc5497d
parent 43b9056c
Loading
Loading
Loading
Loading
+185 −174
Original line number Original line Diff line number Diff line
@@ -19,8 +19,9 @@
#include "seedlink_plugin.h"
#include "seedlink_plugin.h"
#endif
#endif


static void clientShutdown(int sig);
#define CURRENT_NETWORK (params.network)? params.network : DEFAULT_NETWORK
static void clientDummyHandler(int sig);
#define GAP_TOLLERANCE 0.001
#define NMXPTOOL_MAX_FUNC_PD 10


/* Max number of packet I can tollerate to wait.
/* Max number of packet I can tollerate to wait.
 * It should be better to express it by time, for example 30 sec., 1 min., ecc....
 * It should be better to express it by time, for example 30 sec., 1 min., ecc....
@@ -42,6 +43,19 @@ typedef struct {
    NMXPTOOL_PD_RAW_STREAM raw_stream_buffer;
    NMXPTOOL_PD_RAW_STREAM raw_stream_buffer;
} NMXPTOOL_CHAN_SEQ;
} NMXPTOOL_CHAN_SEQ;


static void clientShutdown(int sig);
static void clientDummyHandler(int sig);

int nmxptool_write_miniseed(NMXP_DATA_PROCESS *pd);
int nmxptool_send_raw_depoch(NMXP_DATA_PROCESS *pd);

int seq_no_compare(const void *a, const void *b);
// TODO func_pd has to become an array of functions
int nmxptool_add_and_do_ordered(NMXPTOOL_PD_RAW_STREAM *p, NMXP_DATA_PROCESS *a_pd, int func_pd(NMXP_DATA_PROCESS *));

int nmxptool_check_and_log_gap(double time1, double time2, const double gap_tollerance, const char *station, const char *channel);


/* Global variable for main program and handling terminitation program */
/* Global variable for main program and handling terminitation program */
NMXPTOOL_PARAMS params;
NMXPTOOL_PARAMS params;
int naqssock = 0;
int naqssock = 0;
@@ -57,177 +71,6 @@ MSRecord *msr_list_chan[MAX_N_CHAN];
#endif
#endif




#define CURRENT_NETWORK (params.network)? params.network : DEFAULT_NETWORK



int seq_no_compare(const void *a, const void *b)
{       
    int ret = 0;
    NMXP_DATA_PROCESS **ppa = (NMXP_DATA_PROCESS **) a;
    NMXP_DATA_PROCESS **ppb = (NMXP_DATA_PROCESS **) b;
    NMXP_DATA_PROCESS *pa = *ppa;
    NMXP_DATA_PROCESS *pb = *ppb;

    if(pa && pb) {
	if(pa->seq_no > pb->seq_no) {
	    ret = 1;
	} else if (pa->seq_no < pb->seq_no) {
	    ret = -1;
	}
    } else {
	printf("Error pa and/or pb are NULL!\n");
    }

    return ret;
}

int nmxptool_write_miniseed(NMXP_DATA_PROCESS *pd) {
    int cur_chan;
    int ret = 0;
    if( (cur_chan = nmxp_chan_lookupKeyIndex(pd->key, channelList_subset)) != -1) {

	ret = nmxp_data_msr_pack(pd, &data_seed, msr_list_chan[cur_chan], channelListSeq[cur_chan].x_1);

    } else {
	nmxp_log(1, 0, "Key %d not found in channelList_subset!\n", pd->key);
    }
    return ret;
}

int nmxptool_send_raw_depoch(NMXP_DATA_PROCESS *pd) {
    /* TODO Set values */
    const int usec_correction = 0;
    const int timing_quality = 100;

    return send_raw_depoch(pd->station, pd->channel, pd->time, usec_correction, timing_quality,
	    pd->pDataPtr, pd->nSamp);
}

// TODO func_pd has to become an array of functions
int nmxptool_add_and_do_ordered(NMXPTOOL_PD_RAW_STREAM *p, NMXP_DATA_PROCESS *a_pd, int func_pd(NMXP_DATA_PROCESS *)) {
    int ret = 0;
    int send_again = 1;
    int seq_no_diff;
    int j=0, k=0;
    NMXP_DATA_PROCESS *pd = NULL;

    /* Allocate memory for pd and copy a_pd */
    pd = (NMXP_DATA_PROCESS *) malloc (sizeof(NMXP_DATA_PROCESS));
    memcpy(pd, a_pd, sizeof(NMXP_DATA_PROCESS));
    if(a_pd->length > 0) {
	pd->buffer = malloc(pd->length);
	memcpy(pd->buffer, a_pd->buffer, a_pd->length);
    } else {
	pd->buffer = NULL;
    }
    if(a_pd->nSamp *  sizeof(int) > 0) {
	pd->pDataPtr = (int *) malloc(a_pd->nSamp * sizeof(int));
	memcpy(pd->pDataPtr, a_pd->pDataPtr, a_pd->nSamp * sizeof(int));
    } else {
	pd->pDataPtr = NULL;
    }

    /* First time */
    if(p->last_seq_no_sent == -1) {
	p->last_seq_no_sent = pd->seq_no - 1;
	nmxp_log(0, 0, "First time.\n");
    }

    /* Add pd and sort array */
    if(p->n_pdlist + 1 >= NMXPTOOL_MAX_PDLIST_ITEMS) {
	/* Supposing p->pdlist is ordered,
	 * handle the first item and over write it.
	 */
	nmxp_log(LOG_WARN, 0, "Force handling packet %d!\n", p->pdlist[0]->seq_no);
	func_pd(p->pdlist[0]);
	p->last_seq_no_sent = (p->pdlist[0]->seq_no);
	p->pdlist[0] = pd;
    } else {
	p->pdlist[p->n_pdlist++] = pd;
    }
    qsort(p->pdlist, p->n_pdlist, sizeof(NMXP_DATA_PROCESS *), seq_no_compare);

    /* Print array, only for debugging */
    if(p->n_pdlist > 1) {
	int y = 0;
	for(y=0; y < p->n_pdlist; y++) {
	    nmxp_log(0, 0, "%02d pkt %d\n", y, p->pdlist[y]->seq_no);
	}
    }

    /* Manage array and execute func_pd() */
    j=0;
    send_again = 1;
    while(send_again  &&  j < p->n_pdlist) {
	send_again = 0;
	seq_no_diff = p->pdlist[j]->seq_no - p->last_seq_no_sent;
	nmxp_log(0, 0, "seq_no_diff=%d  j=%d  p->n_pdlist=%d (%d-%d)\n", seq_no_diff, j, p->n_pdlist, p->pdlist[j]->seq_no, p->last_seq_no_sent);
	if(seq_no_diff <= 0) {
	    // Duplicated packets: Discarded
	    nmxp_log(0, 0, "Packets %d discarded\n", p->pdlist[j]->seq_no);
	    send_again = 1;
	    j++;
	} else if(seq_no_diff == 1) {
	    func_pd(p->pdlist[j]);
	    p->last_seq_no_sent = p->pdlist[j]->seq_no;
	    send_again = 1;
	    j++;
	} else if(seq_no_diff >= NMXPTOOL_MAX_DIFF_SEQ_NO) {
	    // I have to drop packet with sequence number p->last_seq_no_sent+1
	    nmxp_log(LOG_WARN, 0, "Give up to wait packet %d!\n", p->last_seq_no_sent+1);
	    p->last_seq_no_sent++;
	    send_again = 1;
	}
    }

    /* Shift and free handled elements */
    if(j > 0) {
	for(k=0; k < p->n_pdlist; k++) {
	    if(k + j < p->n_pdlist) {
		if(p->pdlist[k]->buffer) {
		    free(p->pdlist[k]->buffer);
		    p->pdlist[k]->buffer = NULL;
		}
		if(p->pdlist[k]->pDataPtr) {
		    free(p->pdlist[k]->pDataPtr);
		    p->pdlist[k]->pDataPtr = NULL;
		}
		if(p->pdlist[k]) {
		    free(p->pdlist[k]);
		    p->pdlist[k] = NULL;
		}
		p->pdlist[k] = p->pdlist[k+j];
	    } else {
		p->pdlist[k] = NULL;
	    }
	}
	p->n_pdlist = p->n_pdlist - j;
    }

    nmxp_log(0, 0, "j=%d  p->n_pdlist=%d FINAL\n", j, p->n_pdlist);

    return ret;
}



#define GAP_TOLLERANCE 0.001

int nmxptool_check_and_log_gap(double time1, double time2, const double gap_tollerance, const char *station, const char *channel) {
    int ret = 0;
    double gap = time1 - time2 ;
    if(gap > gap_tollerance) {
	nmxp_log(1, 0, "Gap %.2f sec. for %s.%s from %.2f to %.2f!\n", gap, station, channel, time1, time2);
	ret = 1;
    } else if (gap < -gap_tollerance) {
	nmxp_log(1, 0, "Overlap %.2f sec. for %s.%s from %.2f to %.2f!\n", gap, station, channel, time2, time1);
	ret = 1;
    }
    return ret;
}


int main (int argc, char **argv) {
int main (int argc, char **argv) {
    int32_t connection_time;
    int32_t connection_time;
    int request_SOCKET_OK;
    int request_SOCKET_OK;
@@ -595,7 +438,9 @@ int main (int argc, char **argv) {


    } else {
    } else {



	int (*p_func_pd[NMXPTOOL_MAX_FUNC_PD]) (NMXP_DATA_PROCESS *);
	p_func_pd[0] = nmxptool_send_raw_depoch;
	p_func_pd[1] = nmxptool_write_miniseed;


	/* ************************************************************* */
	/* ************************************************************* */
	/* Start subscription protocol "PRIVATE DATA STREAM" version 1.4 */
	/* Start subscription protocol "PRIVATE DATA STREAM" version 1.4 */
@@ -813,3 +658,169 @@ static void clientShutdown(int sig) {
/* Empty signal handler routine */
/* Empty signal handler routine */
static void clientDummyHandler(int sig) {
static void clientDummyHandler(int sig) {
}
}



int seq_no_compare(const void *a, const void *b)
{       
    int ret = 0;
    NMXP_DATA_PROCESS **ppa = (NMXP_DATA_PROCESS **) a;
    NMXP_DATA_PROCESS **ppb = (NMXP_DATA_PROCESS **) b;
    NMXP_DATA_PROCESS *pa = *ppa;
    NMXP_DATA_PROCESS *pb = *ppb;

    if(pa && pb) {
	if(pa->seq_no > pb->seq_no) {
	    ret = 1;
	} else if (pa->seq_no < pb->seq_no) {
	    ret = -1;
	}
    } else {
	printf("Error pa and/or pb are NULL!\n");
    }

    return ret;
}


int nmxptool_write_miniseed(NMXP_DATA_PROCESS *pd) {
    int cur_chan;
    int ret = 0;
    if( (cur_chan = nmxp_chan_lookupKeyIndex(pd->key, channelList_subset)) != -1) {

	ret = nmxp_data_msr_pack(pd, &data_seed, msr_list_chan[cur_chan], channelListSeq[cur_chan].x_1);

    } else {
	nmxp_log(1, 0, "Key %d not found in channelList_subset!\n", pd->key);
    }
    return ret;
}

int nmxptool_send_raw_depoch(NMXP_DATA_PROCESS *pd) {
    /* TODO Set values */
    const int usec_correction = 0;
    const int timing_quality = 100;

    return send_raw_depoch(pd->station, pd->channel, pd->time, usec_correction, timing_quality,
	    pd->pDataPtr, pd->nSamp);
}


int nmxptool_add_and_do_ordered(NMXPTOOL_PD_RAW_STREAM *p, NMXP_DATA_PROCESS *a_pd, int func_pd(NMXP_DATA_PROCESS *)) {
    int ret = 0;
    int send_again = 1;
    int seq_no_diff;
    int j=0, k=0;
    NMXP_DATA_PROCESS *pd = NULL;

    /* Allocate memory for pd and copy a_pd */
    pd = (NMXP_DATA_PROCESS *) malloc (sizeof(NMXP_DATA_PROCESS));
    memcpy(pd, a_pd, sizeof(NMXP_DATA_PROCESS));
    if(a_pd->length > 0) {
	pd->buffer = malloc(pd->length);
	memcpy(pd->buffer, a_pd->buffer, a_pd->length);
    } else {
	pd->buffer = NULL;
    }
    if(a_pd->nSamp *  sizeof(int) > 0) {
	pd->pDataPtr = (int *) malloc(a_pd->nSamp * sizeof(int));
	memcpy(pd->pDataPtr, a_pd->pDataPtr, a_pd->nSamp * sizeof(int));
    } else {
	pd->pDataPtr = NULL;
    }

    /* First time */
    if(p->last_seq_no_sent == -1) {
	p->last_seq_no_sent = pd->seq_no - 1;
	nmxp_log(0, 0, "First time.\n");
    }

    /* Add pd and sort array */
    if(p->n_pdlist + 1 >= NMXPTOOL_MAX_PDLIST_ITEMS) {
	/* Supposing p->pdlist is ordered,
	 * handle the first item and over write it.
	 */
	nmxp_log(LOG_WARN, 0, "Force handling packet %d!\n", p->pdlist[0]->seq_no);
	func_pd(p->pdlist[0]);
	p->last_seq_no_sent = (p->pdlist[0]->seq_no);
	p->pdlist[0] = pd;
    } else {
	p->pdlist[p->n_pdlist++] = pd;
    }
    qsort(p->pdlist, p->n_pdlist, sizeof(NMXP_DATA_PROCESS *), seq_no_compare);

    /* Print array, only for debugging */
    if(p->n_pdlist > 1) {
	int y = 0;
	for(y=0; y < p->n_pdlist; y++) {
	    nmxp_log(0, 0, "%02d pkt %d\n", y, p->pdlist[y]->seq_no);
	}
    }

    /* Manage array and execute func_pd() */
    j=0;
    send_again = 1;
    while(send_again  &&  j < p->n_pdlist) {
	send_again = 0;
	seq_no_diff = p->pdlist[j]->seq_no - p->last_seq_no_sent;
	nmxp_log(0, 0, "seq_no_diff=%d  j=%d  p->n_pdlist=%d (%d-%d)\n", seq_no_diff, j, p->n_pdlist, p->pdlist[j]->seq_no, p->last_seq_no_sent);
	if(seq_no_diff <= 0) {
	    // Duplicated packets: Discarded
	    nmxp_log(0, 0, "Packets %d discarded\n", p->pdlist[j]->seq_no);
	    send_again = 1;
	    j++;
	} else if(seq_no_diff == 1) {
	    func_pd(p->pdlist[j]);
	    p->last_seq_no_sent = p->pdlist[j]->seq_no;
	    send_again = 1;
	    j++;
	} else if(seq_no_diff >= NMXPTOOL_MAX_DIFF_SEQ_NO) {
	    // I have to drop packet with sequence number p->last_seq_no_sent+1
	    nmxp_log(LOG_WARN, 0, "Give up to wait packet %d!\n", p->last_seq_no_sent+1);
	    p->last_seq_no_sent++;
	    send_again = 1;
	}
    }

    /* Shift and free handled elements */
    if(j > 0) {
	for(k=0; k < p->n_pdlist; k++) {
	    if(k + j < p->n_pdlist) {
		if(p->pdlist[k]->buffer) {
		    free(p->pdlist[k]->buffer);
		    p->pdlist[k]->buffer = NULL;
		}
		if(p->pdlist[k]->pDataPtr) {
		    free(p->pdlist[k]->pDataPtr);
		    p->pdlist[k]->pDataPtr = NULL;
		}
		if(p->pdlist[k]) {
		    free(p->pdlist[k]);
		    p->pdlist[k] = NULL;
		}
		p->pdlist[k] = p->pdlist[k+j];
	    } else {
		p->pdlist[k] = NULL;
	    }
	}
	p->n_pdlist = p->n_pdlist - j;
    }

    nmxp_log(0, 0, "j=%d  p->n_pdlist=%d FINAL\n", j, p->n_pdlist);

    return ret;
}


int nmxptool_check_and_log_gap(double time1, double time2, const double gap_tollerance, const char *station, const char *channel) {
    int ret = 0;
    double gap = time1 - time2 ;
    if(gap > gap_tollerance) {
	nmxp_log(1, 0, "Gap %.2f sec. for %s.%s from %.2f to %.2f!\n", gap, station, channel, time1, time2);
	ret = 1;
    } else if (gap < -gap_tollerance) {
	nmxp_log(1, 0, "Overlap %.2f sec. for %s.%s from %.2f to %.2f!\n", gap, station, channel, time2, time1);
	ret = 1;
    }
    return ret;
}