Commit d44e7ba3 authored by Matteo Quintiliani's avatar Matteo Quintiliani

Timeout management


git-svn-id: file:///home/quintiliani/svncopy/nmxptool/trunk@455 3cd66e75-5955-46cb-a940-c26e5fc5497d
parent a5223be1
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
* Istituto Nazionale di Geofisica e Vulcanologia - Italy * Istituto Nazionale di Geofisica e Vulcanologia - Italy
* quintiliani@ingv.it * quintiliani@ingv.it
* *
* $Id: nmxp_base.c,v 1.37 2007-10-24 10:05:33 mtheo Exp $ * $Id: nmxp_base.c,v 1.38 2007-10-25 09:11:34 mtheo Exp $
* *
*/ */
...@@ -142,7 +142,7 @@ int nmxp_recv_ctrl(int isock, void *buffer, int length, int timeoutsec, int *rec ...@@ -142,7 +142,7 @@ int nmxp_recv_ctrl(int isock, void *buffer, int length, int timeoutsec, int *rec
cc = recv(isock, buffer_char + recvCount, length - recvCount, 0); cc = recv(isock, buffer_char + recvCount, length - recvCount, 0);
*recv_errno = errno; *recv_errno = errno;
if(cc <= 0) { if(cc <= 0) {
nmxp_log(1, 0, "nmxp_recv_ctrl(): (cc=%d <= 0) errno=%d recvCount=%d length=%d\n", cc, *recv_errno, recvCount, length); nmxp_log(NMXP_LOG_ERR, 0, "nmxp_recv_ctrl(): (cc=%d <= 0) errno=%d recvCount=%d length=%d\n", cc, *recv_errno, recvCount, length);
} else { } else {
recvCount += cc; recvCount += cc;
} }
...@@ -171,9 +171,11 @@ int nmxp_recv_ctrl(int isock, void *buffer, int length, int timeoutsec, int *rec ...@@ -171,9 +171,11 @@ int nmxp_recv_ctrl(int isock, void *buffer, int length, int timeoutsec, int *rec
strcpy(recv_errno_str, "DEFAULT_NO_VALUE"); strcpy(recv_errno_str, "DEFAULT_NO_VALUE");
break; break;
} }
nmxp_log(0, 1, "nmxp_recv_ctrl(): recvCount=%d length=%d (cc=%d) errno=%d (%s)\n", recvCount, length, cc, *recv_errno, recv_errno_str); nmxp_log(NMXP_LOG_ERR, 0, "nmxp_recv_ctrl(): recvCount=%d length=%d (cc=%d) errno=%d (%s)\n", recvCount, length, cc, *recv_errno, recv_errno_str);
return NMXP_SOCKET_ERROR; if(recvCount != length || *recv_errno != EAGAIN) {
return NMXP_SOCKET_ERROR;
}
} }
return NMXP_SOCKET_OK; return NMXP_SOCKET_OK;
...@@ -244,21 +246,25 @@ int nmxp_receiveMessage(int isock, NMXP_MSG_SERVER *type, void **buffer, int32_t ...@@ -244,21 +246,25 @@ int nmxp_receiveMessage(int isock, NMXP_MSG_SERVER *type, void **buffer, int32_t
ret = nmxp_receiveHeader(isock, type, length, timeoutsec, recv_errno); ret = nmxp_receiveHeader(isock, type, length, timeoutsec, recv_errno);
if( ret == NMXP_SOCKET_OK) { if( ret == NMXP_SOCKET_OK ) {
if (*length > 0) { if (*length > 0) {
*buffer = malloc(*length); *buffer = malloc(*length);
ret = nmxp_recv_ctrl(isock, *buffer, *length, 0, recv_errno); ret = nmxp_recv_ctrl(isock, *buffer, *length, 0, recv_errno);
if(*type == NMXP_MSG_ERROR) { if(*type == NMXP_MSG_ERROR) {
nmxp_log(1,0, "Received ErrorMessage: %s\n", *buffer); nmxp_log(NMXP_LOG_ERR, 0, "Received ErrorMessage: %s\n", *buffer);
} else {
nmxp_log(NMXP_LOG_WARN, 1, "Received message type: %d length=%d\n", *type, *length);
} }
} }
} else { }
if(*recv_errno != EAGAIN) {
nmxp_log(1,0, "Error in nmxp_receiveMessage()\n"); if(*recv_errno != 0) {
} else { if(*recv_errno == EAGAIN) {
nmxp_log(NMXP_LOG_WARN, 0, "Timeout receveing in nmxp_receiveMessage()\n"); nmxp_log(NMXP_LOG_WARN, 0, "Timeout receveing in nmxp_receiveMessage()\n");
} else {
nmxp_log(NMXP_LOG_ERR, 0, "Error in nmxp_receiveMessage()\n");
} }
} }
......
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
* Istituto Nazionale di Geofisica e Vulcanologia - Italy * Istituto Nazionale di Geofisica e Vulcanologia - Italy
* quintiliani@ingv.it * quintiliani@ingv.it
* *
* $Id: nmxptool.c,v 1.85 2007-10-24 10:05:02 mtheo Exp $ * $Id: nmxptool.c,v 1.86 2007-10-25 09:11:18 mtheo Exp $
* *
*/ */
...@@ -53,6 +53,8 @@ typedef struct { ...@@ -53,6 +53,8 @@ typedef struct {
static void clientShutdown(int sig); static void clientShutdown(int sig);
static void clientDummyHandler(int sig); static void clientDummyHandler(int sig);
static void flushing_raw_data_stream();
#ifdef HAVE_LIBMSEED #ifdef HAVE_LIBMSEED
int nmxptool_write_miniseed(NMXP_DATA_PROCESS *pd); int nmxptool_write_miniseed(NMXP_DATA_PROCESS *pd);
#endif #endif
...@@ -74,6 +76,9 @@ FILE *outfile = NULL; ...@@ -74,6 +76,9 @@ FILE *outfile = NULL;
NMXP_CHAN_LIST *channelList = NULL; NMXP_CHAN_LIST *channelList = NULL;
NMXP_CHAN_LIST_NET *channelList_subset = NULL; NMXP_CHAN_LIST_NET *channelList_subset = NULL;
NMXPTOOL_CHAN_SEQ *channelListSeq = NULL; NMXPTOOL_CHAN_SEQ *channelListSeq = NULL;
int n_func_pd = 0;
int (*p_func_pd[NMXP_MAX_FUNC_PD]) (NMXP_DATA_PROCESS *);
#ifdef HAVE_LIBMSEED #ifdef HAVE_LIBMSEED
/* Mini-SEED variables */ /* Mini-SEED variables */
...@@ -488,9 +493,6 @@ int main (int argc, char **argv) { ...@@ -488,9 +493,6 @@ int main (int argc, char **argv) {
} else { } else {
int n_func_pd = 0;
int (*p_func_pd[NMXP_MAX_FUNC_PD]) (NMXP_DATA_PROCESS *);
if(params.stc == -1) { if(params.stc == -1) {
...@@ -587,6 +589,12 @@ int main (int argc, char **argv) { ...@@ -587,6 +589,12 @@ int main (int argc, char **argv) {
pd = nmxp_receiveData(naqssock, channelList_subset, NETCODE_OR_CURRENT_NETWORK, params.timeoutrecv, &recv_errno); pd = nmxp_receiveData(naqssock, channelList_subset, NETCODE_OR_CURRENT_NETWORK, params.timeoutrecv, &recv_errno);
if(recv_errno == 0) { if(recv_errno == 0) {
// TODO
exitpdscondition = 1;
} else {
nmxp_log(1, 0, "Error receiving data. pd=%p recv_errno=%d\n", pd, recv_errno);
exitpdscondition = 0;
}
/* Log contents of last packet */ /* Log contents of last packet */
if(params.flag_logdata) { if(params.flag_logdata) {
...@@ -609,6 +617,7 @@ int main (int argc, char **argv) { ...@@ -609,6 +617,7 @@ int main (int argc, char **argv) {
/* Check timeout for other channels */ /* Check timeout for other channels */
if(params.timeoutrecv > 0) { if(params.timeoutrecv > 0) {
exitpdscondition = 1;
to_cur_chan = 0; to_cur_chan = 0;
while(to_cur_chan < channelList_subset->number) { while(to_cur_chan < channelList_subset->number) {
timeout_for_channel = nmxp_data_gmtime_now() - channelListSeq[to_cur_chan].last_time_call_raw_stream; timeout_for_channel = nmxp_data_gmtime_now() - channelListSeq[to_cur_chan].last_time_call_raw_stream;
...@@ -671,13 +680,6 @@ int main (int argc, char **argv) { ...@@ -671,13 +680,6 @@ int main (int argc, char **argv) {
} }
} }
// TODO
exitpdscondition = 1;
} else {
nmxp_log(1, 0, "Error receiving data.\n");
exitpdscondition = 0;
}
#ifdef HAVE_EARTHWORMOBJS #ifdef HAVE_EARTHWORMOBJS
if(params.ew_configuration_file) { if(params.ew_configuration_file) {
...@@ -694,6 +696,9 @@ int main (int argc, char **argv) { ...@@ -694,6 +696,9 @@ int main (int argc, char **argv) {
#endif #endif
} }
/* Flush raw data stream for each channel */
flushing_raw_data_stream();
#ifdef HAVE_EARTHWORMOBJS #ifdef HAVE_EARTHWORMOBJS
if(params.ew_configuration_file) { if(params.ew_configuration_file) {
...@@ -752,12 +757,28 @@ int main (int argc, char **argv) { ...@@ -752,12 +757,28 @@ int main (int argc, char **argv) {
static void flushing_raw_data_stream() {
int to_cur_chan;
/* Flush raw data stream for each channel */
if(params.stc == -1) {
to_cur_chan = 0;
while(to_cur_chan < channelList_subset->number) {
nmxp_log(NMXP_LOG_WARN, 0, "Flushing data for channel %s\n",
channelList_subset->channel[to_cur_chan].name);
nmxp_raw_stream_manage(&(channelListSeq[to_cur_chan].raw_stream_buffer), NULL, p_func_pd, n_func_pd);
to_cur_chan++;
}
}
}
/* Do any needed cleanup and exit */ /* Do any needed cleanup and exit */
static void clientShutdown(int sig) { static void clientShutdown(int sig) {
nmxp_log(0, 0, "Program interrupted!\n"); nmxp_log(0, 0, "Program interrupted!\n");
flushing_raw_data_stream();
if(params.flag_writefile && outfile) { if(params.flag_writefile && outfile) {
/* Close output file */ /* Close output file */
fclose(outfile); fclose(outfile);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment