Commit 7a23592c authored by Matteo Quintiliani's avatar Matteo Quintiliani

Added time-out management


git-svn-id: file:///home/quintiliani/svncopy/nmxptool/trunk@425 3cd66e75-5955-46cb-a940-c26e5fc5497d
parent 41e82755
......@@ -7,7 +7,7 @@
* Istituto Nazionale di Geofisica e Vulcanologia - Italy
* quintiliani@ingv.it
*
* $Id: nmxp.h,v 1.40 2007-10-07 14:48:56 mtheo Exp $
* $Id: nmxp.h,v 1.41 2007-10-07 18:13:39 mtheo Exp $
*
*/
......@@ -1025,6 +1025,7 @@ typedef struct {
double last_sample_time;
int32_t max_pdlist_items;
double max_tollerable_latency;
int timeoutrecv;
int32_t n_pdlist;
NMXP_DATA_PROCESS **pdlist; /* Array for pd queue */
} NMXP_RAW_STREAM_DATA;
......@@ -1094,7 +1095,7 @@ int nmxp_sendAddTimeSeriesChannel(int isock, NMXP_CHAN_LIST_NET *channelList, in
* \retval NULL on error
*
*/
NMXP_DATA_PROCESS *nmxp_receiveData(int isock, NMXP_CHAN_LIST_NET *channelList, const char *network_code);
NMXP_DATA_PROCESS *nmxp_receiveData(int isock, NMXP_CHAN_LIST_NET *channelList, const char *network_code, int timeoutsec, int *recv_errno );
/*! \brief Sends the message "ConnectRequest" on a socket
......@@ -1189,9 +1190,10 @@ int nmxp_raw_stream_seq_no_compare(const void *a, const void *b);
*
* \param raw_stream_buffer pointer to NMXP_RAW_STREAM_DATA struct to initialize
* \param max_pdlist_items value of max number of items in array
* \param timeoutrecv value of time-out within receving packets
*
*/
void nmxp_raw_stream_init(NMXP_RAW_STREAM_DATA *raw_stream_buffer, int32_t max_tollerable_latency);
void nmxp_raw_stream_init(NMXP_RAW_STREAM_DATA *raw_stream_buffer, int32_t max_tollerable_latency, int timeoutrecv);
/*! \brief Free fields inside a NMXP_RAW_STREAM_DATA structure
......
......@@ -7,7 +7,7 @@
* Istituto Nazionale di Geofisica e Vulcanologia - Italy
* quintiliani@ingv.it
*
* $Id: nmxp_base.h,v 1.23 2007-10-07 14:11:23 mtheo Exp $
* $Id: nmxp_base.h,v 1.24 2007-10-07 18:13:39 mtheo Exp $
*
*/
......@@ -69,6 +69,8 @@ int nmxp_send_ctrl(int isock, void *buffer, int length);
* \param isock A descriptor referencing the socket.
* \param[out] buffer Data buffer.
* \param length Length in bytes.
* \param timeoutsec Time-out in seconds
* \param[out] errno errno value after recv()
*
* \warning Data buffer it has to be allocated before and big enough to contain length bytes!
*
......@@ -76,7 +78,7 @@ int nmxp_send_ctrl(int isock, void *buffer, int length);
* \retval NMXP_SOCKET_ERROR on error
*
*/
int nmxp_recv_ctrl(int isock, void *buffer, int length);
int nmxp_recv_ctrl(int isock, void *buffer, int length, int timeoutsec, int *recv_errno );
/*! \brief Sends header of a message.
......@@ -102,7 +104,7 @@ int nmxp_sendHeader(int isock, NMXP_MSG_CLIENT type, int32_t length);
* \retval NMXP_SOCKET_ERROR on error
*
*/
int nmxp_receiveHeader(int isock, NMXP_MSG_SERVER *type, int32_t *length);
int nmxp_receiveHeader(int isock, NMXP_MSG_SERVER *type, int32_t *lengthi, int timeoutsec, int *recv_errno );
/*! \brief Sends header and body of a message.
......@@ -132,7 +134,7 @@ int nmxp_sendMessage(int isock, NMXP_MSG_CLIENT type, void *buffer, int32_t leng
* \retval NMXP_SOCKET_ERROR on error
*
*/
int nmxp_receiveMessage(int isock, NMXP_MSG_SERVER *type, void **buffer, int32_t *length);
int nmxp_receiveMessage(int isock, NMXP_MSG_SERVER *type, void **buffer, int32_t *length, int timeoutsec, int *recv_errno );
/*! \brief Process Compressed Data message by function func_processData().
......
This diff is collapsed.
......@@ -7,7 +7,7 @@
* Istituto Nazionale di Geofisica e Vulcanologia - Italy
* quintiliani@ingv.it
*
* $Id: nmxp_base.c,v 1.33 2007-10-07 14:11:23 mtheo Exp $
* $Id: nmxp_base.c,v 1.34 2007-10-07 18:13:39 mtheo Exp $
*
*/
......@@ -112,24 +112,37 @@ int nmxp_send_ctrl(int isock, void* buffer, int length)
}
int nmxp_recv_ctrl(int isock, void* buffer, int length)
int nmxp_recv_ctrl(int isock, void *buffer, int length, int timeoutsec, int *recv_errno )
{
int recvCount;
int recv_errno;
char recv_errno_str[200];
struct timeval timeo;
/*
struct timeval timeout;
socklen_t size_timeout = sizeof(timeout);
getsockopt(isock, SOL_SOCKET, SO_RCVTIMEO, &timeout, &size_timeout);
*/
if(timeoutsec > 0) {
timeo.tv_sec = timeoutsec;
timeo.tv_usec = 0;
if (setsockopt(isock, SOL_SOCKET, SO_RCVTIMEO, &timeo, sizeof(timeo)) < 0) {
perror("setsockopt SO_RCVTIMEO");
}
}
recvCount= recv(isock, (char*) buffer, length, MSG_WAITALL);
recv_errno = errno;
*recv_errno = errno;
timeo.tv_sec = 0;
timeo.tv_usec = 0;
if (setsockopt(isock, SOL_SOCKET, SO_RCVTIMEO, &timeo, sizeof(timeo)) < 0) {
perror("setsockopt SO_RCVTIMEO");
}
if (recvCount != length) {
switch(recv_errno) {
switch(*recv_errno) {
case EAGAIN : strcpy(recv_errno_str, "EAGAIN"); break;
case EBADF : strcpy(recv_errno_str, "EBADF"); break;
case ECONNREFUSED : strcpy(recv_errno_str, "ECONNREFUSED"); break;
......@@ -143,7 +156,7 @@ int nmxp_recv_ctrl(int isock, void* buffer, int length)
strcpy(recv_errno_str, "DEFAULT_NO_VALUE");
break;
}
nmxp_log(0, 1, "nmxp_recv_ctrl(): (recvCount != length) %d != %d - errno = %d (%s)\n", recvCount, length, recv_errno, recv_errno_str);
nmxp_log(0, 1, "nmxp_recv_ctrl(): (recvCount != length) %d != %d - errno = %d (%s)\n", recvCount, length, *recv_errno, recv_errno_str);
return NMXP_SOCKET_ERROR;
}
......@@ -164,12 +177,12 @@ int nmxp_sendHeader(int isock, NMXP_MSG_CLIENT type, int32_t length)
}
int nmxp_receiveHeader(int isock, NMXP_MSG_SERVER *type, int32_t *length)
int nmxp_receiveHeader(int isock, NMXP_MSG_SERVER *type, int32_t *length, int timeoutsec, int *recv_errno )
{
int ret ;
NMXP_MESSAGE_HEADER msg;
ret = nmxp_recv_ctrl(isock, &msg, sizeof(NMXP_MESSAGE_HEADER));
ret = nmxp_recv_ctrl(isock, &msg, sizeof(NMXP_MESSAGE_HEADER), timeoutsec, recv_errno);
*type = 0;
*length = 0;
......@@ -209,17 +222,17 @@ int nmxp_sendMessage(int isock, NMXP_MSG_CLIENT type, void *buffer, int32_t leng
}
int nmxp_receiveMessage(int isock, NMXP_MSG_SERVER *type, void **buffer, int32_t *length) {
int nmxp_receiveMessage(int isock, NMXP_MSG_SERVER *type, void **buffer, int32_t *length, int timeoutsec, int *recv_errno ) {
int ret;
*buffer = NULL;
*length = 0;
ret = nmxp_receiveHeader(isock, type, length);
ret = nmxp_receiveHeader(isock, type, length, timeoutsec, recv_errno);
if( ret == NMXP_SOCKET_OK) {
if (*length > 0) {
*buffer = malloc(*length);
ret = nmxp_recv_ctrl(isock, *buffer, *length);
ret = nmxp_recv_ctrl(isock, *buffer, *length, 0, recv_errno);
if(*type == NMXP_MSG_ERROR) {
nmxp_log(1,0, "Received ErrorMessage: %s\n", *buffer);
......@@ -227,7 +240,11 @@ int nmxp_receiveMessage(int isock, NMXP_MSG_SERVER *type, void **buffer, int32_t
}
} else {
nmxp_log(1,0, "Error in nmxp_receiveMessage()\n");
if(*recv_errno != EAGAIN) {
nmxp_log(1,0, "Error in nmxp_receiveMessage()\n");
} else {
nmxp_log(NMXP_LOG_WARN, 0, "Timeout receveing in nmxp_receiveMessage()\n");
}
}
return ret;
......
......@@ -7,13 +7,14 @@
* Istituto Nazionale di Geofisica e Vulcanologia - Italy
* quintiliani@ingv.it
*
* $Id: nmxptool.c,v 1.83 2007-10-07 14:11:03 mtheo Exp $
* $Id: nmxptool.c,v 1.84 2007-10-07 18:12:37 mtheo Exp $
*
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <nmxp.h>
......@@ -44,6 +45,7 @@
typedef struct {
int significant;
double last_time;
time_t last_time_call_raw_stream;
int32_t x_1;
NMXP_RAW_STREAM_DATA raw_stream_buffer;
} NMXPTOOL_CHAN_SEQ;
......@@ -84,8 +86,10 @@ int main (int argc, char **argv) {
int32_t connection_time;
int request_SOCKET_OK;
int i_chan, cur_chan = 0;
int to_cur_chan = 0;
int exitpdscondition;
int exitdapcondition;
time_t timeout_for_channel;
int span_interval = 10;
int time_to_sleep = 0;
......@@ -99,6 +103,8 @@ int main (int argc, char **argv) {
int32_t length;
int ret;
int recv_errno = 0;
char filename[500];
char station_code[20], channel_code[20], network_code[20];
......@@ -192,8 +198,9 @@ int main (int argc, char **argv) {
for(i_chan = 0; i_chan < channelList_subset->number; i_chan++) {
channelListSeq[i_chan].significant = 0;
channelListSeq[i_chan].last_time = 0.0;
channelListSeq[i_chan].last_time_call_raw_stream = 0;
channelListSeq[i_chan].x_1 = 0;
nmxp_raw_stream_init(&(channelListSeq[i_chan].raw_stream_buffer), params.max_tolerable_latency);
nmxp_raw_stream_init(&(channelListSeq[i_chan].raw_stream_buffer), params.max_tolerable_latency, params.timeoutrecv);
}
#ifdef HAVE_LIBMSEED
......@@ -359,7 +366,7 @@ int main (int argc, char **argv) {
}
/* DAP Step 6: Receive Data until receiving a Ready message */
ret = nmxp_receiveMessage(naqssock, &type, &buffer, &length);
ret = nmxp_receiveMessage(naqssock, &type, &buffer, &length, 0, &recv_errno);
nmxp_log(0, 1, "ret = %d, type = %d\n", ret, type);
while(ret == NMXP_SOCKET_OK && type != NMXP_MSG_READY) {
......@@ -427,7 +434,7 @@ int main (int argc, char **argv) {
}
/* Receive Data */
ret = nmxp_receiveMessage(naqssock, &type, &buffer, &length);
ret = nmxp_receiveMessage(naqssock, &type, &buffer, &length, 0, &recv_errno);
nmxp_log(0, 1, "ret = %d, type = %d\n", ret, type);
}
......@@ -574,21 +581,10 @@ int main (int argc, char **argv) {
#endif
#ifdef TEST_FOR_DOD
/*
struct timeval timeo;
timeo.tv_sec = 0;
timeo.tv_usec = 0;
if (setsockopt(naqssock, SOL_SOCKET, SO_RCVTIMEO, &timeo, sizeof(timeo)) < 0) {
perror("setsockopt SO_RCVTIMEO");
}
*/
#endif
while(exitpdscondition) {
/* Process Compressed or Decompressed Data */
pd = nmxp_receiveData(naqssock, channelList_subset, NETCODE_OR_CURRENT_NETWORK);
pd = nmxp_receiveData(naqssock, channelList_subset, NETCODE_OR_CURRENT_NETWORK, params.timeoutrecv, &recv_errno);
/* Log contents of last packet */
if(params.flag_logdata) {
......@@ -596,13 +592,35 @@ int main (int argc, char **argv) {
}
if(pd) {
/* Set cur_chan */
cur_chan = nmxp_chan_lookupKeyIndex(pd->key, channelList_subset);
/* Set cur_chan */
cur_chan = nmxp_chan_lookupKeyIndex(pd->key, channelList_subset);
}
/* Manage Raw Stream */
if(params.stc == -1) {
nmxp_raw_stream_manage(&(channelListSeq[cur_chan].raw_stream_buffer), pd, p_func_pd, n_func_pd);
/* cur_char is computed only for pd != NULL */
if(pd) {
nmxp_raw_stream_manage(&(channelListSeq[cur_chan].raw_stream_buffer), pd, p_func_pd, n_func_pd);
channelListSeq[cur_chan].last_time_call_raw_stream = nmxp_data_gmtime_now();
}
/* Check timeout for other channels */
if(params.timeoutrecv > 0) {
to_cur_chan = 0;
while(to_cur_chan < channelList_subset->number) {
timeout_for_channel = nmxp_data_gmtime_now() - channelListSeq[to_cur_chan].last_time_call_raw_stream;
if(channelListSeq[to_cur_chan].last_time_call_raw_stream != 0
&& timeout_for_channel >= params.timeoutrecv) {
nmxp_log(NMXP_LOG_WARN, 0, "Timeout for channel %s (%d sec.)\n",
channelList_subset->channel[to_cur_chan].name, timeout_for_channel);
nmxp_raw_stream_manage(&(channelListSeq[to_cur_chan].raw_stream_buffer), NULL, p_func_pd, n_func_pd);
channelListSeq[to_cur_chan].last_time_call_raw_stream = nmxp_data_gmtime_now();
}
to_cur_chan++;
}
}
} else {
if(pd) {
......@@ -817,13 +835,18 @@ int nmxptool_write_miniseed(NMXP_DATA_PROCESS *pd) {
int nmxptool_print_seq_no(NMXP_DATA_PROCESS *pd) {
int ret = 0;
char str_time[200];
nmxp_data_to_str(str_time, pd->time);
nmxp_log(NMXP_LOG_NORM_NO, 0, "Process %s.%s %2d %d %d lat. %.1fs\n",
nmxp_log(NMXP_LOG_NORM_NO, 0, "Process %s.%s.%s %2d %d %d %s %dpts lat. %.1fs\n",
pd->network,
pd->station,
pd->channel,
pd->packet_type,
pd->seq_no,
pd->oldest_seq_no,
str_time,
pd->nSamp,
nmxp_data_latency(pd)
);
......
......@@ -7,7 +7,7 @@
* Istituto Nazionale di Geofisica e Vulcanologia - Italy
* quintiliani@ingv.it
*
* $Id: nmxptool_getoptlong.c,v 1.33 2007-10-07 14:11:03 mtheo Exp $
* $Id: nmxptool_getoptlong.c,v 1.34 2007-10-07 18:12:37 mtheo Exp $
*
*/
......@@ -38,7 +38,8 @@ const NMXPTOOL_PARAMS NMXPTOOL_PARAMS_DEFAULT =
DEFAULT_RATE,
NULL,
DEFAULT_DELAY,
DEFAULT_MAX_TOLLERABLE_LATENCY,
DEFAULT_MAX_TOLERABLE_LATENCY,
DEFAULT_TIMEOUTRECV,
NULL,
0,
0,
......@@ -195,14 +196,19 @@ PDS arguments:\n\
>0 is for specified sample rate and decompressed data.\n\
-b, --buffered Request also recent packets into the past.\n\
-M, --maxlatency=SECs Max tolerable latency (default %d) [%d..%d].\n\
Usable only with Raw Stream --stc=-1.\n\
-T, --timeoutrecv=SECs Time-out receiving packets (default %d. No time-out) [%d..%d].\n\
-T is useful for retrieving Data On Demand.\n\
-M, -T are usable only with Raw Stream --stc=-1.\n\
\n\
",
DEFAULT_STC,
DEFAULT_RATE,
DEFAULT_MAX_TOLLERABLE_LATENCY,
DEFAULT_MAX_TOLLERABLE_LATENCY_MINIMUM,
DEFAULT_MAX_TOLLERABLE_LATENCY_MAXIMUM
DEFAULT_MAX_TOLERABLE_LATENCY,
DEFAULT_MAX_TOLERABLE_LATENCY_MINIMUM,
DEFAULT_MAX_TOLERABLE_LATENCY_MAXIMUM,
DEFAULT_TIMEOUTRECV,
DEFAULT_TIMEOUTRECV_MINIMUM,
DEFAULT_TIMEOUTRECV_MAXIMUM
);
nmxptool_author_support();
......@@ -244,6 +250,7 @@ int nmxptool_getopt_long(int argc, char **argv, NMXPTOOL_PARAMS *params)
{"username", required_argument, 0, 'u'},
{"password", required_argument, 0, 'p'},
{"maxlatency", required_argument, 0, 'M'},
{"timeoutrecv", required_argument, 0, 'T'},
/* Following are flags */
{"verbose", no_argument, 0, 'v'},
{"logdata", no_argument, 0, 'g'},
......@@ -283,7 +290,7 @@ int nmxptool_getopt_long(int argc, char **argv, NMXPTOOL_PARAMS *params)
/* init params */
memcpy(params, &NMXPTOOL_PARAMS_DEFAULT, sizeof(NMXPTOOL_PARAMS_DEFAULT));
char optstr[100] = "H:P:D:C:N:L:S:R:s:e:t:d:u:p:M:vgbliwhV";
char optstr[100] = "H:P:D:C:N:L:S:R:s:e:t:d:u:p:M:T:vgbliwhV";
#ifdef HAVE_LIBMSEED
strcat(optstr, "m");
......@@ -400,6 +407,11 @@ int nmxptool_getopt_long(int argc, char **argv, NMXPTOOL_PARAMS *params)
nmxp_log(0, 0, "Max_tolerable_latency %d\n", params->max_tolerable_latency);
break;
case 'T':
params->timeoutrecv = atoi(optarg);
nmxp_log(0, 0, "Time-out receiving %d\n", params->timeoutrecv);
break;
#ifdef HAVE___SRC_SEEDLINK_PLUGIN_C
case 'k':
params->flag_slink = 1;
......@@ -535,14 +547,26 @@ int nmxptool_check_params(NMXPTOOL_PARAMS *params) {
ret = -1;
nmxp_log(NMXP_LOG_NORM_NO, 0, "<buffered> can not be used with options <start_time> and <end_time>.\n");
} else if( params->stc == -1
&& (params->max_tolerable_latency < DEFAULT_MAX_TOLLERABLE_LATENCY_MINIMUM ||
params->max_tolerable_latency > DEFAULT_MAX_TOLLERABLE_LATENCY_MAXIMUM)) {
&& (params->max_tolerable_latency < DEFAULT_MAX_TOLERABLE_LATENCY_MINIMUM ||
params->max_tolerable_latency > DEFAULT_MAX_TOLERABLE_LATENCY_MAXIMUM)) {
ret = -1;
nmxp_log(NMXP_LOG_NORM_NO, 0, "<maxlatency> has to be within [%d..%d].\n",
DEFAULT_MAX_TOLLERABLE_LATENCY_MINIMUM,
DEFAULT_MAX_TOLLERABLE_LATENCY_MAXIMUM);
} else if( params->stc != -1 && params->max_tolerable_latency > 0) {
DEFAULT_MAX_TOLERABLE_LATENCY_MINIMUM,
DEFAULT_MAX_TOLERABLE_LATENCY_MAXIMUM);
} else if( params->stc == -1
&& (params->timeoutrecv < DEFAULT_TIMEOUTRECV_MINIMUM ||
params->timeoutrecv > DEFAULT_TIMEOUTRECV_MAXIMUM)) {
if(params->timeoutrecv != 0) {
ret = -1;
nmxp_log(NMXP_LOG_NORM_NO, 0, "<timeoutrecv> has to be within [%d..%d] or equal to zero for not time-out.\n",
DEFAULT_TIMEOUTRECV_MINIMUM,
DEFAULT_TIMEOUTRECV_MAXIMUM);
}
} else if( params->stc != -1 && params->max_tolerable_latency > 0 ){
nmxp_log(NMXP_LOG_WARN, 0, "<maxlatency> ignored since not defined --stc=-1.\n");
} else if(params->stc != -1 && params->timeoutrecv > 0) {
params->timeoutrecv = 0;
nmxp_log(NMXP_LOG_WARN, 0, "<timeoutrecv> ignored since not defined --stc=-1.\n");
}
/*
......
......@@ -7,7 +7,7 @@
* Istituto Nazionale di Geofisica e Vulcanologia - Italy
* quintiliani@ingv.it
*
* $Id: nmxptool_getoptlong.h,v 1.18 2007-10-06 15:34:00 mtheo Exp $
* $Id: nmxptool_getoptlong.h,v 1.19 2007-10-07 18:12:37 mtheo Exp $
*
*/
......@@ -33,9 +33,13 @@
#define DEFAULT_DELAY_MAXIMUM 86400
#define DEFAULT_DELAY 0
#define DEFAULT_MAX_TOLLERABLE_LATENCY_MINIMUM 60
#define DEFAULT_MAX_TOLLERABLE_LATENCY_MAXIMUM 600
#define DEFAULT_MAX_TOLLERABLE_LATENCY 600
#define DEFAULT_MAX_TOLERABLE_LATENCY_MINIMUM 60
#define DEFAULT_MAX_TOLERABLE_LATENCY_MAXIMUM 600
#define DEFAULT_MAX_TOLERABLE_LATENCY 600
#define DEFAULT_TIMEOUTRECV 0
#define DEFAULT_TIMEOUTRECV_MINIMUM 10
#define DEFAULT_TIMEOUTRECV_MAXIMUM 300
/*! \brief Struct that stores information about parameter of the program */
......@@ -56,6 +60,7 @@ typedef struct {
char *plugin_slink;
int32_t delay;
int32_t max_tolerable_latency;
int32_t timeoutrecv;
char *ew_configuration_file;
int flag_writeseed;
int flag_verbose;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment