Commit 30cda35f authored by Matteo Quintiliani's avatar Matteo Quintiliani

Added PRIVATE DATA STREAM subscription protocol


git-svn-id: file:///home/quintiliani/svncopy/nmxptool/trunk@108 3cd66e75-5955-46cb-a940-c26e5fc5497d
parent e9a47714
......@@ -81,172 +81,230 @@ int main (int argc, char **argv) {
}
/* ************************************************************** */
/* Start subscription protocol "DATA ACCESS PROTOCOL" version 1.0 */
/* ************************************************************** */
/* TODO condition starting DAP */
if(params.start_time != 0 && params.end_time != 0) {
/* DAP Step 1: Open a socket */
if( (naqssock = nmxp_openSocket(params.hostname, params.portnumberdap)) == NMXP_SOCKET_ERROR) {
nmxp_log(1, 0, "Error opening socket!\n");
return 1;
}
/* ************************************************************** */
/* Start subscription protocol "DATA ACCESS PROTOCOL" version 1.0 */
/* ************************************************************** */
/* DAP Step 2: Read connection time */
if(nmxp_readConnectionTime(naqssock, &connection_time) != NMXP_SOCKET_OK) {
nmxp_log(1, 0, "Error reading connection time from server!\n");
return 1;
}
/* DAP Step 1: Open a socket */
if( (naqssock = nmxp_openSocket(params.hostname, params.portnumberdap)) == NMXP_SOCKET_ERROR) {
nmxp_log(1, 0, "Error opening socket!\n");
return 1;
}
/* DAP Step 3: Send a ConnectRequest */
if(nmxp_sendConnectRequest(naqssock, params.datas_username, params.datas_password, connection_time) != NMXP_SOCKET_OK) {
nmxp_log(1, 0, "Error sending connect request!\n");
return 1;
}
/* DAP Step 2: Read connection time */
if(nmxp_readConnectionTime(naqssock, &connection_time) != NMXP_SOCKET_OK) {
nmxp_log(1, 0, "Error reading connection time from server!\n");
return 1;
}
/* DAP Step 4: Wait for a Ready message */
if(nmxp_waitReady(naqssock) != NMXP_SOCKET_OK) {
nmxp_log(1, 0, "Error waiting Ready message!\n");
return 1;
}
/* DAP Step 3: Send a ConnectRequest */
if(nmxp_sendConnectRequest(naqssock, params.datas_username, params.datas_password, connection_time) != NMXP_SOCKET_OK) {
nmxp_log(1, 0, "Error sending connect request!\n");
return 1;
}
/* DAP Step 4: Wait for a Ready message */
if(nmxp_waitReady(naqssock) != NMXP_SOCKET_OK) {
nmxp_log(1, 0, "Error waiting Ready message!\n");
return 1;
}
/* Start loop for sending requests */
i_chan=0;
request_SOCKET_OK = NMXP_SOCKET_OK;
/* Start loop for sending requests */
i_chan=0;
request_SOCKET_OK = NMXP_SOCKET_OK;
while(request_SOCKET_OK == NMXP_SOCKET_OK && i_chan < channelList_subset->number) {
while(request_SOCKET_OK == NMXP_SOCKET_OK && i_chan < channelList_subset->number) {
/* DAP Step 5: Send Data Request */
request_SOCKET_OK = nmxp_sendDataRequest(naqssock, channelList_subset->channel[i_chan].key, params.start_time, params.end_time);
/* DAP Step 5: Send Data Request */
request_SOCKET_OK = nmxp_sendDataRequest(naqssock, channelList_subset->channel[i_chan].key, params.start_time, params.end_time);
if(request_SOCKET_OK == NMXP_SOCKET_OK) {
if(request_SOCKET_OK == NMXP_SOCKET_OK) {
if(params.flag_writefile) {
/* Open output file */
sprintf(filename, "%s.%s.%d.%d.%d.nmx", (params.network)? params.network : DEFAULT_NETWORK, channelList_subset->channel[i_chan].name, channelList_subset->channel[i_chan].key, params.start_time, params.end_time);
outfile = fopen(filename, "w");
if(params.flag_writefile) {
/* Open output file */
sprintf(filename, "%s.%s.%d.%d.%d.nmx", (params.network)? params.network : DEFAULT_NETWORK, channelList_subset->channel[i_chan].name, channelList_subset->channel[i_chan].key, params.start_time, params.end_time);
if(!outfile) {
nmxp_log(1, 0, "Can not to open file %s!", filename);
outfile = fopen(filename, "w");
if(!outfile) {
nmxp_log(1, 0, "Can not to open file %s!", filename);
}
}
}
#ifdef HAVE_LIBMSEED
if(params.flag_writeseed) {
/* Open output Mini-SEED file */
sprintf(data_seed.filename_mseed, "%s.%s.%d.%d.%d.miniseed", (params.network)? params.network : DEFAULT_NETWORK, channelList_subset->channel[i_chan].name, channelList_subset->channel[i_chan].key, params.start_time, params.end_time);
data_seed.outfile_mseed = fopen(data_seed.filename_mseed, "w");
if(!data_seed.outfile_mseed) {
nmxp_log(1, 0, "Can not to open file %s!", data_seed.filename_mseed);
}
}
#endif
if(params.flag_writeseed) {
/* Open output Mini-SEED file */
sprintf(data_seed.filename_mseed, "%s.%s.%d.%d.%d.miniseed", (params.network)? params.network : DEFAULT_NETWORK, channelList_subset->channel[i_chan].name, channelList_subset->channel[i_chan].key, params.start_time, params.end_time);
if(params.flag_writefile && outfile) {
/* Compute SNCL line */
data_seed.outfile_mseed = fopen(data_seed.filename_mseed, "w");
/* Separate station_code and channel_code */
station_code = NULL;
channel_code = NULL;
if(!data_seed.outfile_mseed) {
nmxp_log(1, 0, "Can not to open file %s!", data_seed.filename_mseed);
}
}
#endif
station_code = strdup(channelList_subset->channel[i_chan].name);
if ( (channel_code = strchr(station_code, '.')) == NULL ) {
nmxp_log(1,0, "Channel name not in STA.CHAN format: %s\n", station_code);
}
if(channel_code) {
*channel_code++ = '\0';
}
if(params.flag_writefile && outfile) {
/* Compute SNCL line */
/* Separate station_code and channel_code */
station_code = NULL;
channel_code = NULL;
station_code = strdup(channelList_subset->channel[i_chan].name);
if ( (channel_code = strchr(station_code, '.')) == NULL ) {
nmxp_log(1,0, "Channel name not in STA.CHAN format: %s\n", station_code);
}
if(channel_code) {
*channel_code++ = '\0';
}
if(station_code) {
free(station_code);
}
if(station_code) {
free(station_code);
/* Write SNCL line */
fprintf(outfile, "%s.%s.%s.%s\n", station_code, (params.network)? params.network : DEFAULT_NETWORK, channel_code, (params.location)? params.location : "");
}
/* Write SNCL line */
fprintf(outfile, "%s.%s.%s.%s\n", station_code, (params.network)? params.network : DEFAULT_NETWORK, channel_code, (params.location)? params.location : "");
}
/* DAP Step 6: Receive Data until receiving a Ready message */
ret = nmxp_receiveMessage(naqssock, &type, &buffer, &length);
nmxp_log(0, 1, "ret = %d, type = %d\n", ret, type);
/* DAP Step 6: Receive Data until receiving a Ready message */
ret = nmxp_receiveMessage(naqssock, &type, &buffer, &length);
nmxp_log(0, 1, "ret = %d, type = %d\n", ret, type);
while(ret == NMXP_SOCKET_OK && type != NMXP_MSG_READY) {
while(ret == NMXP_SOCKET_OK && type != NMXP_MSG_READY) {
/* Process a packet and return value in NMXP_DATA_PROCESS structure */
pd = nmxp_processCompressedDataFunc(buffer, length, channelList_subset);
/* Process a packet and return value in NMXP_DATA_PROCESS structure */
pd = nmxp_processCompressedDataFunc(buffer, length, channelList_subset);
#ifdef HAVE_LIBMSEED
/* Write Mini-SEED record */
if(params.flag_writeseed) {
/* Write Mini-SEED record */
if(params.flag_writeseed) {
nmxp_data_msr_pack(pd, &data_seed);
nmxp_data_msr_pack(pd, &data_seed);
}
}
#endif
#ifdef HAVE___SRC_SEEDLINK_PLUGIN_C
/* Send data to SeedLink Server */
if(params.flag_writeseedlink) {
/* Send data to SeedLink Server */
if(params.flag_writeseedlink) {
/* TODO Set values */
const int usec_correction = 0;
const int timing_quality = 100;
/* TODO Set values */
const int usec_correction = 0;
const int timing_quality = 100;
send_raw_depoch(pd->station, pd->channel, pd->time, usec_correction, timing_quality,
pd->pDataPtr, pd->nSamp);
send_raw_depoch(pd->station, pd->channel, pd->time, usec_correction, timing_quality,
pd->pDataPtr, pd->nSamp);
}
}
#endif
/* Log contents of last packet */
nmxp_data_log(pd);
/* Log contents of last packet */
nmxp_data_log(pd);
if(params.flag_writefile && outfile) {
/* Write buffer to the output file */
if(outfile && buffer && length > 0) {
int length_int = length;
nmxp_data_swap_4b((int32_t *) &length_int);
fwrite(&length_int, sizeof(length_int), 1, outfile);
fwrite(buffer, length, 1, outfile);
}
}
if(params.flag_writefile && outfile) {
/* Write buffer to the output file */
if(outfile && buffer && length > 0) {
int length_int = length;
nmxp_data_swap_4b((int32_t *) &length_int);
fwrite(&length_int, sizeof(length_int), 1, outfile);
fwrite(buffer, length, 1, outfile);
if(buffer) {
free(buffer);
}
/* Receive Data */
ret = nmxp_receiveMessage(naqssock, &type, &buffer, &length);
nmxp_log(0, 1, "ret = %d, type = %d\n", ret, type);
}
if(buffer) {
free(buffer);
if(params.flag_writefile && outfile) {
/* Close output file */
fclose(outfile);
}
/* Receive Data */
ret = nmxp_receiveMessage(naqssock, &type, &buffer, &length);
nmxp_log(0, 1, "ret = %d, type = %d\n", ret, type);
}
#ifdef HAVE_LIBMSEED
if(params.flag_writeseed && data_seed.outfile_mseed) {
/* Close output Mini-SEED file */
fclose(data_seed.outfile_mseed);
}
#endif
if(params.flag_writefile && outfile) {
/* Close output file */
fclose(outfile);
}
i_chan++;
}
/* DAP Step 7: Repeat steps 5 and 6 for each data request */
#ifdef HAVE_LIBMSEED
if(params.flag_writeseed && data_seed.outfile_mseed) {
/* Close output Mini-SEED file */
fclose(data_seed.outfile_mseed);
}
#endif
/* DAP Step 8: Send a Terminate message (optional) */
nmxp_sendTerminateSubscription(naqssock, NMXP_SHUTDOWN_NORMAL, "Bye!");
/* DAP Step 9: Close the socket */
nmxp_closeSocket(naqssock);
/* ************************************************************ */
/* End subscription protocol "DATA ACCESS PROTOCOL" version 1.0 */
/* ************************************************************ */
} else {
/* ************************************************************* */
/* Start subscription protocol "PRIVATE DATA STREAM" version 1.4 */
/* ************************************************************* */
/* PDS Step 1: Open a socket */
naqssock = nmxp_openSocket(params.hostname, params.portnumberpds);
if(naqssock == NMXP_SOCKET_ERROR) {
return 1;
}
i_chan++;
}
/* DAP Step 7: Repeat steps 5 and 6 for each data request */
/* DAP Step 8: Send a Terminate message (optional) */
nmxp_sendTerminateSubscription(naqssock, NMXP_SHUTDOWN_NORMAL, "Bye!");
/* PDS Step 2: Send a Connect */
if(nmxp_sendConnect(naqssock) != NMXP_SOCKET_OK) {
printf("Error on sendConnect()\n");
return 1;
}
/* PDS Step 3: Receive ChannelList */
if(nmxp_receiveChannelList(naqssock, &channelList) != NMXP_SOCKET_OK) {
printf("Error on receiveChannelList()\n");
return 1;
}
/* Get a subset of channel from arguments */
channelList_subset = nmxp_chan_subset(channelList, NMXP_DATA_TIMESERIES, params.channels);
/* PDS Step 4: Send a Request Pending (optional) */
/* DAP Step 9: Close the socket */
nmxp_closeSocket(naqssock);
/* ************************************************************ */
/* End subscription protocol "DATA ACCESS PROTOCOL" version 1.0 */
/* ************************************************************ */
/* PDS Step 5: Send AddChannels */
/* Request Data */
nmxp_sendAddTimeSeriesChannel(naqssock, channelList_subset, params.stc, params.rate, (params.flag_buffered)? NMXP_BUFFER_YES : NMXP_BUFFER_NO);
/* PDS Step 6: Repeat until finished: receive and handle packets */
while(1) {
/* Process Compressed or Decompressed Data */
nmxp_receiveData(naqssock, channelList_subset, &nmxp_data_log);
}
/* PDS Step 7: Send Terminate Subscription */
nmxp_sendTerminateSubscription(naqssock, NMXP_SHUTDOWN_NORMAL, "Good Bye!");
/* PDS Step 8: Close the socket */
nmxp_closeSocket(naqssock);
/* *********************************************************** */
/* End subscription protocol "PRIVATE DATA STREAM" version 1.4 */
/* *********************************************************** */
}
return 0;
......
......@@ -338,13 +338,8 @@ int nmxptool_check_params(NMXPTOOL_PARAMS *params) {
} else if(params->channels == NULL) {
ret = -1;
printf("<STA.CHAN> is required!\n");
} else if(params->start_time == 0) {
ret = -1;
printf("<start_time> is required!\n");
} else if(params->end_time == 0) {
ret = -1;
printf("<end_time> is required!\n");
} else if (params->start_time >= params->end_time) {
} else if(params->start_time != 0 && params->end_time != 0
&& params->start_time >= params->end_time) {
ret = -1;
printf("<start_time> is less than <end_time>!\n");
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment