You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
622 lines
17 KiB
C++
622 lines
17 KiB
C++
/***************************************************************************
|
|
* Copyright (C) 2014 by gempa GmbH *
|
|
* *
|
|
* All Rights Reserved. *
|
|
* *
|
|
* NOTICE: All information contained herein is, and remains *
|
|
* the property of gempa GmbH and its suppliers, if any. The intellectual *
|
|
* and technical concepts contained herein are proprietary to gempa GmbH *
|
|
* and its suppliers. *
|
|
* Dissemination of this information or reproduction of this material *
|
|
* is strictly forbidden unless prior written permission is obtained *
|
|
* from gempa GmbH. *
|
|
***************************************************************************/
|
|
|
|
#include <gempa/caps/connection.h>
|
|
#include <gempa/caps/anypacket.h>
|
|
#include <gempa/caps/log.h>
|
|
#include <gempa/caps/mseedpacket.h>
|
|
#include <gempa/caps/metapacket.h>
|
|
#include <gempa/caps/rawpacket.h>
|
|
#include <gempa/caps/sessiontable.h>
|
|
#include <gempa/caps/utils.h>
|
|
|
|
#include <cstring>
|
|
#include <sstream>
|
|
#include <iomanip>
|
|
#include <functional>
|
|
|
|
#include <sys/time.h>
|
|
#include <cerrno>
|
|
|
|
using namespace std;
|
|
|
|
namespace Gempa {
|
|
namespace CAPS {
|
|
|
|
namespace {
|
|
|
|
// Default-constructed Time. Connection::next() passes it as both Time
// arguments of the get() calls that read ANY and MSEED records.
// NOTE(review): presumably this means "no time restriction" -- confirm
// against the record classes.
const Time InvalidTime = Time();
|
|
|
|
/**
 * @brief Converts the textual representation in str into a value of type T.
 *
 * Only the primary template is declared here; a conversion is available
 * for each type that provides an explicit specialization.
 *
 * @param value Receives the converted value
 * @param str The text to convert
 * @return true if the conversion succeeded, false otherwise
 */
template <typename T>
bool fromString(T &value, const std::string &str);

/**
 * @brief Parses an int from the beginning of str using stream extraction.
 *
 * The stream is checked with fail() rather than bad(): bad() only reports
 * unrecoverable stream errors, whereas a failed extraction (empty input,
 * non-numeric text, value out of range) sets failbit. Checking only bad()
 * would report success for input such as "abc".
 *
 * @param value Receives the parsed number
 * @param str The text to parse
 * @return true if a number could be extracted, false otherwise
 */
template<> inline bool fromString(int &value, const std::string &str) {
	std::stringstream ss(str);
	ss >> value;
	return !ss.fail();
}
|
|
|
|
}
|
|
|
|
/**
 * Creates a connection with default settings: server "localhost", port
 * 18002, meta mode off, realtime mode on, SSL off. A session table is
 * created and its item-removal callback is bound to
 * onItemAboutToBeRemoved(). Finally reset() is invoked.
 */
Connection::Connection() {
	_port = 18002;
	_server = "localhost";

	_metaMode = false;
	_realtime = true;
	_ssl = false;

	// reset() below inspects _state, and next() compares _currentID and
	// dereferences _currentItem. Give all three a defined value here so
	// that no indeterminate value is ever read, in case the class
	// declaration does not initialize them.
	_state = EOD;
	_currentID = -1;
	_currentItem = NULL;

	_sessionTable = SessionTablePtr(new SessionTable());
	_sessionTable->setItemAboutToBeRemovedFunc(
		bind(&Connection::onItemAboutToBeRemoved, this, placeholders::_1)
	);

	reset();
}
|
|
|
|
/**
 * Destructor. Invokes close() so that an existing socket is shut down
 * before the object goes away.
 */
Connection::~Connection() {
	this->close();
}
|
|
|
|
bool Connection::setServer(const string &server) {
|
|
close();
|
|
reset();
|
|
|
|
size_t pos = server.rfind(':');
|
|
string addr = server;
|
|
int timeout = 300;
|
|
|
|
if ( pos == string::npos )
|
|
_server = addr;
|
|
else {
|
|
_server = addr.substr(0, pos);
|
|
if ( !fromString(_port, addr.substr(pos+1)) ) {
|
|
CAPS_ERROR("invalid source address: %s", addr.c_str());
|
|
return false;
|
|
}
|
|
}
|
|
|
|
if ( timeout > 0 ) {
|
|
CAPS_DEBUG("setting socket timeout to %ds", timeout);
|
|
//_socket->setSocketTimeout(timeout,0);
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
void Connection::setCredentials(const std::string &user,
|
|
const std::string &password) {
|
|
if ( user.empty() || password.empty() ) {
|
|
_auth.clear();
|
|
CAPS_DEBUG("authentication deactivated");
|
|
}
|
|
else {
|
|
_auth = "AUTH " + user + " " + password;
|
|
CAPS_DEBUG("credentials set: %s:***", user.c_str());
|
|
}
|
|
}
|
|
|
|
/**
 * @brief Allocates a new socket instance.
 *
 * An SSLSocket is created if _ssl is set and SSL support has not been
 * disabled at compile time (CAPS_FEATURES_SSL undefined or non-zero),
 * otherwise a plain Socket.
 *
 * @return The newly allocated socket
 */
Socket* Connection::createSocket() const {
#if !defined(CAPS_FEATURES_SSL) || CAPS_FEATURES_SSL
	if ( _ssl ) {
		return new SSLSocket();
	}
#endif
	return new Socket();
}
|
|
|
|
void Connection::disconnect() {
|
|
if ( _socket == NULL ) return;
|
|
|
|
CAPS_DEBUG("disconnecting");
|
|
if ( _state != Aborted )
|
|
_state = Error;
|
|
|
|
_socket->shutdown();
|
|
_socket->close();
|
|
_sessionTable->reset();
|
|
_currentID = - 1;
|
|
_currentItem = NULL;
|
|
}
|
|
|
|
/**
 * Disconnects from the server while holding _mutex and sets the state to
 * Aborted afterwards.
 */
void Connection::close() {
	boost::mutex::scoped_lock lock(_mutex);

	disconnect();
	_state = Aborted;
}
|
|
|
|
void Connection::abort() {
|
|
boost::mutex::scoped_lock l(_mutex);
|
|
if ( _state == Aborted ) {
|
|
CAPS_WARNING("abort already requested");
|
|
return;
|
|
}
|
|
|
|
if ( _socket->isValid() && _state == Active ) {
|
|
static string line("ABORT\n");
|
|
if ( _socket->write(line.c_str(), line.size()) != (int) line.size() ) {
|
|
CAPS_ERROR("could not send abort request");
|
|
disconnect();
|
|
}
|
|
else {
|
|
CAPS_DEBUG("abort command sent");
|
|
}
|
|
}
|
|
_state = Aborted;
|
|
}
|
|
|
|
/**
 * @brief Placeholder: the argument is ignored and nothing is configured.
 * @param seconds Unused
 * @return Always false
 */
bool Connection::setTimeout(int seconds) {
	(void) seconds;
	return false;
}
|
|
|
|
bool Connection::addStream(const string &net, const string &sta,
|
|
const string &loc, const string &cha) {
|
|
return addRequest(net, sta, loc, cha, _startTime, _endTime, false);
|
|
}
|
|
|
|
bool Connection::addStream(const string &net, const string &sta,
|
|
const string &loc, const string &cha,
|
|
const Time &stime,
|
|
const Time &etime) {
|
|
return addRequest(net, sta, loc, cha, stime, etime, false);
|
|
}
|
|
|
|
bool Connection::addRequest(const string &net, const string &sta,
|
|
const string &loc, const string &cha,
|
|
const Time &stime,
|
|
const Time &etime,
|
|
bool receivedData) {
|
|
boost::mutex::scoped_lock l(_mutex);
|
|
if ( _state != EOD ) {
|
|
if ( _state == Active )
|
|
CAPS_WARNING("cannot add streams to an active session, invoke "
|
|
"abort() or close() first");
|
|
else
|
|
CAPS_WARNING("cannot add streams to an erroneous or aborted "
|
|
"session, invoke reset() first");
|
|
return false;
|
|
}
|
|
string streamID = net + "." + sta + "." + loc + "." + cha;
|
|
Request &req = _requests[streamID];
|
|
req.net = net;
|
|
req.sta = sta;
|
|
req.loc = loc;
|
|
req.cha = cha;
|
|
req.start = stime;
|
|
req.end = etime;
|
|
req.receivedData = receivedData;
|
|
return true;
|
|
}
|
|
|
|
/**
 * @brief Reads the next data record from the server.
 *
 * Performs the handshake if required and then reads response headers in a
 * loop. A header with id 0 carries state or session table updates, which
 * are passed line by line to the session table. Any other id refers to a
 * session table item whose type determines how the record is read from
 * the socket. After a record has been read, the matching entry of the
 * request map is advanced to the record's end time.
 *
 * The returned record is allocated with new and handed over to the
 * caller.
 *
 * @return The record, or NULL if the handshake failed, the session ended
 *         (EOD), or an error occurred. On errors the connection is
 *         disconnected.
 */
DataRecord* Connection::next() {
	if ( !handshake() ) return NULL;

	while ( true ) {
		{
			boost::mutex::scoped_lock l(_mutex);
			// skip unread bytes of previous iteration and check connection state
			if ( !seekToReadLimit() || _state == Error || _state == EOD )
				return NULL;
		}

		ResponseHeader responseHeader;
		if ( !responseHeader.get(_socketBuf) ) {
			boost::mutex::scoped_lock l(_mutex);
			// A failed read is not logged as an error after an abort
			if ( _state != Aborted ) {
				CAPS_ERROR("could not read header");
			}
			disconnect();
			return NULL;
		}

		CAPS_DEBUG("read header (id/size): %i/%lu", responseHeader.id,
		           (unsigned long)responseHeader.size);
		_socketBuf.set_read_limit(responseHeader.size);

		// State or session table update
		if ( responseHeader.id == 0 ) {
			while ( responseHeader.size > 0 ) {
				istream is(&_socketBuf);
				if ( is.getline(_lineBuf, 200).fail() ) {
					boost::mutex::scoped_lock l(_mutex);
					CAPS_ERROR("header line exceeds maximum of 200 characters");
					disconnect();
					return NULL;
				}
				responseHeader.size -= is.gcount();

				SessionTable::Status status =
					_sessionTable->handleResponse(_lineBuf, is.gcount());
				if ( status == SessionTable::Error ) {
					boost::mutex::scoped_lock l(_mutex);
					disconnect();
					return NULL;
				}

				if ( status == SessionTable::EOD ) {
					_state = EOD;
					break;
				}
			}

			continue;
		}

		CAPS_DEBUG("reading data record");

		// Data: look the session item up unless the cached one matches
		if ( _currentItem == NULL || _currentID != responseHeader.id ) {
			_currentItem = _sessionTable->getItem(responseHeader.id);

			if ( _currentItem == NULL ) {
				// Do not cache an unknown id. Otherwise the next packet
				// with the same id would skip the lookup and dereference
				// the NULL item below.
				_currentID = -1;
				CAPS_WARNING("unknown data request ID %d", responseHeader.id);
				continue;
			}

			_currentID = responseHeader.id;
		}

		int size = responseHeader.size;

		// To improve performance CAPS uses an optimized protocol
		// to deliver RAW packets. The RAW packet class can't be used
		// to read the header from socket.
		DataRecord *dataRecord = NULL;
		if ( _currentItem->type == RawDataPacket ) {
			// Read optimized header
			RawResponseHeader rawResponseHeader;
			if ( !rawResponseHeader.get(_socketBuf) ) {
				boost::mutex::scoped_lock l(_mutex);
				CAPS_ERROR("failed to extract raw response header %s",
				           _currentItem->streamID.c_str());
				disconnect();
				return NULL;
			}

			Time time(rawResponseHeader.timeSeconds, rawResponseHeader.timeMicroSeconds);
			size -= rawResponseHeader.dataSize();

			// Create raw record, set header and read payload
			RawDataRecord *record = new RawDataRecord;
			record->setStartTime(time);
			record->setSamplingFrequency(_currentItem->samplingFrequency,
			                             _currentItem->samplingFrequencyDivider);
			record->setDataType(_currentItem->dataType);
			record->getData(_socketBuf, size);

			dataRecord = record;
		}
		else if ( _currentItem->type == ANYPacket ) {
			AnyDataRecord *record = new AnyDataRecord;
			record->get(_socketBuf, size, InvalidTime, InvalidTime, size);
			dataRecord = record;
		}
		else if ( _currentItem->type == MSEEDPacket ) {
			MSEEDDataRecord *record = new MSEEDDataRecord;
			record->get(_socketBuf, size, InvalidTime, InvalidTime, size);
			dataRecord = record;
		}
		else if ( _currentItem->type == MetaDataPacket ) {
			// Read start and end time from optimized header
			MetaResponseHeader metaResponseHeader;
			if ( !metaResponseHeader.get(_socketBuf) ) {
				boost::mutex::scoped_lock l(_mutex);
				CAPS_ERROR("failed to extract meta information from stream %s",
				           _currentItem->streamID.c_str());
				disconnect();
				return NULL;
			}

			// The start time takes both of its components from startTime
			Time stime(metaResponseHeader.startTime.seconds,
			           metaResponseHeader.startTime.microSeconds),
			     etime(metaResponseHeader.endTime.seconds,
			           metaResponseHeader.endTime.microSeconds);

			size -= metaResponseHeader.dataSize();

			MetaDataRecord::MetaHeader recHeader;
			recHeader.dataHeader.setSamplingTime(stime);
			recHeader.setEndTime(etime);
			recHeader.dataHeader.samplingFrequencyNumerator = _currentItem->samplingFrequency;
			recHeader.dataHeader.samplingFrequencyDenominator = _currentItem->samplingFrequencyDivider;
			recHeader.dataHeader.dataType = _currentItem->dataType;

			MetaDataRecord *record = new MetaDataRecord;
			record->setHeader(recHeader);
			dataRecord = record;
		}
		else {
			CAPS_ERROR("received unknown packet type %d in stream %s",
			           _currentItem->type, _currentItem->streamID.c_str());
		}

		// Only the unknown packet type branch leaves the record unset
		if ( dataRecord == NULL ) {
			boost::mutex::scoped_lock l(_mutex);
			disconnect();
			return NULL;
		}

		CAPS_DEBUG("data record read");

		RequestList::iterator it = _requests.find(_currentItem->streamID);
		if ( it == _requests.end() ) {
			// TODO: Search request map for wildcard match. Add entry with
			//       streamID -> (record->endTime, wildcardItem->endTime) to
			//       restrict data query in case of reconnect
		}
		// Update request map to reflect current stream state
		else if ( dataRecord->endTime() > it->second.start ) {
			it->second.start = dataRecord->endTime();
			it->second.receivedData = true;
		}

		return dataRecord;
	}

	return NULL;
}
|
|
|
|
bool Connection::handshake() {
|
|
{
|
|
boost::mutex::scoped_lock l(_mutex);
|
|
if ( _state == Error || _state == Aborted) {
|
|
CAPS_ERROR("cannot read from an erroneous or aborted session, invoke "
|
|
"reset() first");
|
|
return false;
|
|
}
|
|
else if ( _state == Active )
|
|
return true;
|
|
// _state is set to EOD
|
|
else if ( _requests.empty() ) {
|
|
CAPS_WARNING("no stream requested, invoke addStream() first");
|
|
return false;
|
|
}
|
|
}
|
|
|
|
if ( _socket == NULL ) {
|
|
_socket = SocketPtr(createSocket());
|
|
if ( _socket == NULL ) return false;
|
|
}
|
|
|
|
// Connect to server if necessary
|
|
while ( !_socket->isValid() && _state == EOD ) {
|
|
if ( _socket->connect(_server, _port) == Socket::Success ) {
|
|
_socketBuf.setsocket(_socket.get());
|
|
CAPS_DEBUG("connection to %s:%d established", _server.c_str(), _port);
|
|
// _state = ios_base::eofbit;
|
|
break;
|
|
}
|
|
|
|
CAPS_WARNING("unable to connect to %s:%d, retrying in 5 seconds",
|
|
_server.c_str(), _port);
|
|
|
|
// Wait 5 seconds and keep response latency low
|
|
for ( int i = 0; (i < 10) && _state == EOD; ++i )
|
|
usleep(500000);
|
|
}
|
|
|
|
|
|
if ( _state != EOD ) return false;
|
|
|
|
// if ( _socket->isValid() ) {
|
|
// // Read all data from session
|
|
// if ( !seekToReadLimit() ) return false;
|
|
//// fd_set set;
|
|
//// FD_ZERO(&set);
|
|
//// FD_SET(_socket->fd(), &set);
|
|
//// timeval t = (struct timeval) {0};
|
|
|
|
//// /* select returns 0 on timeout, 1 if input/output is available, -1 on error. */
|
|
//// int retn = TEMP_FAILURE_RETRY(select(_socket->fd()+1, &set, NULL, NULL, &t));
|
|
//// if ( retn != 0 ) CAPS_ERROR("UUPS");
|
|
// }
|
|
|
|
// if ( _state == Aborted ) return false;
|
|
|
|
// Request streams
|
|
stringstream req;
|
|
|
|
if ( !_auth.empty() )
|
|
req << _auth << endl;
|
|
|
|
req << "BEGIN REQUEST" << endl
|
|
<< "META " << (_metaMode ? "ON" : "OFF") << endl
|
|
<< "REALTIME " << ( _realtime ? "ON" : "OFF") << endl;
|
|
|
|
// First pass: continue all previous streams
|
|
for ( RequestList::const_iterator it = _requests.begin();
|
|
it != _requests.end() && _state == EOD; ++it ) {
|
|
if ( it->second.receivedData )
|
|
formatRequest(req, it);
|
|
}
|
|
|
|
// Second pass: subscribe to remaining streams
|
|
for ( RequestList::iterator it = _requests.begin();
|
|
it != _requests.end() && _state == EOD; ++it ) {
|
|
if ( !it->second.receivedData )
|
|
formatRequest(req, it);
|
|
}
|
|
|
|
req << "END" << endl;
|
|
|
|
return sendRequest(req.str());
|
|
}
|
|
|
|
bool Connection::sendRequest(const string &req) {
|
|
boost::mutex::scoped_lock l(_mutex);
|
|
|
|
if ( _state != EOD ) return false;
|
|
|
|
CAPS_DEBUG("%s", req.c_str());
|
|
if ( _socket->write(req.c_str(), req.size()) != (int) req.size() ) {
|
|
CAPS_ERROR("could not send data request");
|
|
disconnect();
|
|
return false;
|
|
}
|
|
|
|
Gempa::CAPS::ResponseHeader header;
|
|
if ( !header.get(_socketBuf) ) {
|
|
// Case to retry, connection closed by peer
|
|
CAPS_ERROR("could not read data request response header");
|
|
disconnect();
|
|
return false;
|
|
}
|
|
|
|
_socketBuf.set_read_limit(header.size);
|
|
if ( header.id != 0 ) {
|
|
CAPS_ERROR("invalid data request response header id, expected 0, got %d", header.id);
|
|
disconnect();
|
|
return false;
|
|
}
|
|
CAPS_DEBUG("data request response size: %lu", (unsigned long) header.size);
|
|
|
|
istream is(&_socketBuf);
|
|
// check line length
|
|
if ( is.getline(_lineBuf, 200).fail() )
|
|
CAPS_ERROR("data request response line exceeds maximum of 200 characters");
|
|
// skip remaining header data
|
|
else if ( !seekToReadLimit(false) ) {
|
|
CAPS_ERROR("could not seek to data request response header end");
|
|
return false;
|
|
}
|
|
|
|
if ( strncasecmp(_lineBuf, "ERROR:", 6) == 0 )
|
|
CAPS_ERROR("server responded to data request with: %s", _lineBuf);
|
|
else if ( strncasecmp(_lineBuf, "STATUS OK", 9) != 0 )
|
|
CAPS_ERROR("invalid data request response: %s", _lineBuf);
|
|
else {
|
|
CAPS_DEBUG("handshake complete");
|
|
_state = Active;
|
|
return true;
|
|
}
|
|
|
|
disconnect();
|
|
return false;
|
|
}
|
|
|
|
|
|
bool Connection::seekToReadLimit(bool log) {
|
|
if ( !_socket->isValid() ) {
|
|
disconnect();
|
|
return false;
|
|
}
|
|
|
|
// Skip unread bytes from previous record
|
|
int skippies = _socketBuf.read_limit();
|
|
if ( skippies > 0 ) {
|
|
if ( log )
|
|
CAPS_WARNING("no seemless reading, skipping %d bytes", skippies);
|
|
if ( _socketBuf.pubseekoff(skippies, ios_base::cur, ios_base::in) < 0 ) {
|
|
CAPS_ERROR("could not seek to next header");
|
|
disconnect();
|
|
return false;
|
|
}
|
|
}
|
|
_socketBuf.set_read_limit(-1);
|
|
return true;
|
|
}
|
|
|
|
/**
 * @brief Appends the commands for a single stream to the request.
 *
 * Output:
 *   STREAM ADD <stream id>
 *   TIME <start>:<end>
 *
 * A time is written as "year,mon,day,hour,minute,second", followed by
 * ",microseconds" (six digits, zero padded) if the microseconds are
 * greater than zero. An invalid time is written as an empty string.
 *
 * @param req The stream receiving the output
 * @param it Iterator to the request map entry to be formatted
 */
void Connection::formatRequest(stringstream& req, RequestList::const_iterator it) {
	const Request &request = it->second;

	req << "STREAM ADD " << it->first << '\n';
	req << "TIME ";

	if ( request.start.valid() ) {
		int y = 0, mo = 0, d = 0, h = 0, mi = 0, s = 0;
		request.start.get(&y, &mo, &d, &h, &mi, &s);
		req << y << "," << mo << "," << d << ","
		    << h << "," << mi << "," << s;
		if ( request.start.microseconds() > 0 ) {
			req << "," << setfill('0') << setw(6)
			    << request.start.microseconds() << setw(0);
		}
	}

	// Separator between start and end time
	req << ":";

	if ( request.end.valid() ) {
		int y = 0, mo = 0, d = 0, h = 0, mi = 0, s = 0;
		request.end.get(&y, &mo, &d, &h, &mi, &s);
		req << y << "," << mo << "," << d << ","
		    << h << "," << mi << "," << s;
		if ( request.end.microseconds() > 0 ) {
			req << "," << setfill('0') << setw(6)
			    << request.end.microseconds() << setw(0);
		}
	}

	req << '\n';
}
|
|
|
|
void Connection::onItemAboutToBeRemoved(const SessionTableItem *item) {
|
|
if ( _currentItem == item) {
|
|
_currentID = -1;
|
|
_currentItem = NULL;
|
|
}
|
|
|
|
// remove request since all data has been received
|
|
if ( item != NULL ) _requests.erase(item->streamID);
|
|
}
|
|
|
|
void Connection::reset(bool clearStreams) {
|
|
boost::mutex::scoped_lock l(_mutex);
|
|
CAPS_DEBUG("resetting connection");
|
|
if ( _state == Active ) {
|
|
CAPS_WARNING("cannot reset an active connection, invoking close()");
|
|
close();
|
|
}
|
|
// else if ( _state == Aborted ) {
|
|
// fd_set set;
|
|
// FD_ZERO(&set);
|
|
// FD_SET(_socket->fd(), &set);
|
|
// timeval t = (struct timeval) {0};
|
|
|
|
// /* select returns 0 on timeout, 1 if input/output is available, -1 on error. */
|
|
// while ( _socket->isValid() ) {
|
|
// int retn = TEMP_FAILURE_RETRY(select(_socket->fd()+1, &set, NULL, NULL, &t));
|
|
// if ( retn == 0 ) break;
|
|
|
|
// if ( retn != 0 ) CAPS_ERROR("UUPS");
|
|
|
|
// }
|
|
|
|
_state = EOD;
|
|
if ( clearStreams )
|
|
_requests.clear();
|
|
}
|
|
|
|
/**
 * @brief Sets the global start time which addStream() uses if no
 *        individual time window is given.
 * @param stime The start time
 */
void Connection::setStartTime(const Time &stime) {
	_startTime = stime;

	CAPS_DEBUG("set global start time to %s",
	           _startTime.toString("%F %T.%f").c_str());
}
|
|
|
|
/**
 * @brief Sets the global end time which addStream() uses if no
 *        individual time window is given.
 * @param etime The end time
 */
void Connection::setEndTime(const Time &etime) {
	_endTime = etime;

	CAPS_DEBUG("set global end time to %s",
	           _endTime.toString("%F %T.%f").c_str());
}
|
|
|
|
/**
 * @brief Sets the global start and end time which addStream() uses if no
 *        individual time window is given.
 * @param stime The start time
 * @param etime The end time
 */
void Connection::setTimeWindow(const Time &stime, const Time &etime) {
	_endTime = etime;
	_startTime = stime;

	CAPS_DEBUG("set global timewindow to %s~%s",
	           _startTime.toString("%F %T.%f").c_str(),
	           _endTime.toString("%F %T.%f").c_str());
}
|
|
|
|
|
|
}
|
|
}
|