You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

622 lines
17 KiB
C++

/***************************************************************************
* Copyright (C) 2014 by gempa GmbH *
* *
* All Rights Reserved. *
* *
* NOTICE: All information contained herein is, and remains *
* the property of gempa GmbH and its suppliers, if any. The intellectual *
* and technical concepts contained herein are proprietary to gempa GmbH *
* and its suppliers. *
* Dissemination of this information or reproduction of this material *
* is strictly forbidden unless prior written permission is obtained *
* from gempa GmbH. *
***************************************************************************/
#include <gempa/caps/connection.h>
#include <gempa/caps/anypacket.h>
#include <gempa/caps/log.h>
#include <gempa/caps/mseedpacket.h>
#include <gempa/caps/metapacket.h>
#include <gempa/caps/rawpacket.h>
#include <gempa/caps/sessiontable.h>
#include <gempa/caps/utils.h>
#include <cstring>
#include <sstream>
#include <iomanip>
#include <functional>
#include <sys/time.h>
#include <cerrno>
using namespace std;
namespace Gempa {
namespace CAPS {
namespace {
const Time InvalidTime = Time();
template <typename T>
bool fromString(T &value, const string &str);
template<> inline bool fromString(int &value, const string &str) {
stringstream ss(str);
ss >> value;
return !ss.bad();
}
}
Connection::Connection() {
_port = 18002;
_server = "localhost";
_metaMode = false;
_realtime = true;
_ssl = false;
_sessionTable = SessionTablePtr(new SessionTable());
_sessionTable->setItemAboutToBeRemovedFunc(
bind(&Connection::onItemAboutToBeRemoved, this, placeholders::_1)
);
reset();
}
Connection::~Connection() {
close();
}
bool Connection::setServer(const string &server) {
close();
reset();
size_t pos = server.rfind(':');
string addr = server;
int timeout = 300;
if ( pos == string::npos )
_server = addr;
else {
_server = addr.substr(0, pos);
if ( !fromString(_port, addr.substr(pos+1)) ) {
CAPS_ERROR("invalid source address: %s", addr.c_str());
return false;
}
}
if ( timeout > 0 ) {
CAPS_DEBUG("setting socket timeout to %ds", timeout);
//_socket->setSocketTimeout(timeout,0);
}
return true;
}
void Connection::setCredentials(const std::string &user,
const std::string &password) {
if ( user.empty() || password.empty() ) {
_auth.clear();
CAPS_DEBUG("authentication deactivated");
}
else {
_auth = "AUTH " + user + " " + password;
CAPS_DEBUG("credentials set: %s:***", user.c_str());
}
}
Socket* Connection::createSocket() const {
#if !defined(CAPS_FEATURES_SSL) || CAPS_FEATURES_SSL
return _ssl? new SSLSocket() : new Socket();
#else
return new Socket();
#endif
}
void Connection::disconnect() {
if ( _socket == NULL ) return;
CAPS_DEBUG("disconnecting");
if ( _state != Aborted )
_state = Error;
_socket->shutdown();
_socket->close();
_sessionTable->reset();
_currentID = - 1;
_currentItem = NULL;
}
void Connection::close() {
boost::mutex::scoped_lock l(_mutex);
// _state = Aborted;
disconnect();
_state = Aborted;
}
void Connection::abort() {
boost::mutex::scoped_lock l(_mutex);
if ( _state == Aborted ) {
CAPS_WARNING("abort already requested");
return;
}
if ( _socket->isValid() && _state == Active ) {
static string line("ABORT\n");
if ( _socket->write(line.c_str(), line.size()) != (int) line.size() ) {
CAPS_ERROR("could not send abort request");
disconnect();
}
else {
CAPS_DEBUG("abort command sent");
}
}
_state = Aborted;
}
bool Connection::setTimeout(int /*seconds*/) {
return false;
}
bool Connection::addStream(const string &net, const string &sta,
const string &loc, const string &cha) {
return addRequest(net, sta, loc, cha, _startTime, _endTime, false);
}
bool Connection::addStream(const string &net, const string &sta,
const string &loc, const string &cha,
const Time &stime,
const Time &etime) {
return addRequest(net, sta, loc, cha, stime, etime, false);
}
bool Connection::addRequest(const string &net, const string &sta,
const string &loc, const string &cha,
const Time &stime,
const Time &etime,
bool receivedData) {
boost::mutex::scoped_lock l(_mutex);
if ( _state != EOD ) {
if ( _state == Active )
CAPS_WARNING("cannot add streams to an active session, invoke "
"abort() or close() first");
else
CAPS_WARNING("cannot add streams to an erroneous or aborted "
"session, invoke reset() first");
return false;
}
string streamID = net + "." + sta + "." + loc + "." + cha;
Request &req = _requests[streamID];
req.net = net;
req.sta = sta;
req.loc = loc;
req.cha = cha;
req.start = stime;
req.end = etime;
req.receivedData = receivedData;
return true;
}
DataRecord* Connection::next() {
if ( !handshake() ) return NULL;
while ( true ) {
{
boost::mutex::scoped_lock l(_mutex);
// skip unread bytes of previous iteration and check connection state
if ( !seekToReadLimit() || _state == Error || _state == EOD )
return NULL;
}
ResponseHeader responseHeader;
if ( !responseHeader.get(_socketBuf) ) {
boost::mutex::scoped_lock l(_mutex);
if ( _state != Aborted)
CAPS_ERROR("could not read header");
disconnect();
return NULL;
}
CAPS_DEBUG("read header (id/size): %i/%lu", responseHeader.id,
(unsigned long)responseHeader.size);
_socketBuf.set_read_limit(responseHeader.size);
// if ( _abortRequested ) break;
// State or session table update
if ( responseHeader.id == 0 ) {
while ( responseHeader.size > 0 /*&& _state == Active*/) {
istream is(&_socketBuf);
if ( is.getline(_lineBuf, 200).fail() ) {
boost::mutex::scoped_lock l(_mutex);
CAPS_ERROR("header line exceeds maximum of 200 characters");
disconnect();
return NULL;
}
responseHeader.size -= is.gcount();
SessionTable::Status status =
_sessionTable->handleResponse(_lineBuf, is.gcount());
if ( status == SessionTable::Error ) {
boost::mutex::scoped_lock l(_mutex);
disconnect();
return NULL;
}
if ( status == SessionTable::EOD ) {
_state = EOD;
break;
}
}
continue;
}
CAPS_DEBUG("reading data record");
// data
if ( _currentID != responseHeader.id ) {
_currentItem = _sessionTable->getItem(responseHeader.id);
_currentID = responseHeader.id;
if ( _currentItem == NULL ) {
CAPS_WARNING("unknown data request ID %d", responseHeader.id);
continue;
}
}
int size = responseHeader.size;
// To improve performance CAPS uses an optimized protocol
// to deliver RAW packets. The RAW packet class can't be used
// to read the header from socket.
DataRecord *dataRecord = NULL;
if ( _currentItem->type == RawDataPacket ) {
// Read optimized header
RawResponseHeader rawResponseHeader;
if ( !rawResponseHeader.get(_socketBuf) ) {
boost::mutex::scoped_lock l(_mutex);
CAPS_ERROR("failed to extract raw response header %s",
_currentItem->streamID.c_str());
disconnect();
return NULL;
}
Time time(rawResponseHeader.timeSeconds, rawResponseHeader.timeMicroSeconds);
size -= rawResponseHeader.dataSize();
// Create raw record, set header and read payload
RawDataRecord *record = new RawDataRecord;
record->setStartTime(time);
record->setSamplingFrequency(_currentItem->samplingFrequency,
_currentItem->samplingFrequencyDivider);
record->setDataType(_currentItem->dataType);
record->getData(_socketBuf, size);
dataRecord = record;
}
else if ( _currentItem->type == ANYPacket ) {
AnyDataRecord *record = new AnyDataRecord;
record->get(_socketBuf, size, InvalidTime, InvalidTime, size);
dataRecord = record;
}
else if ( _currentItem->type == MSEEDPacket ) {
MSEEDDataRecord *record = new MSEEDDataRecord;
record->get(_socketBuf, size, InvalidTime, InvalidTime, size);
dataRecord = record;
}
else if ( _currentItem->type == MetaDataPacket ) {
// Read start time from optimized header
MetaResponseHeader metaResponseHeader;
if ( !metaResponseHeader.get(_socketBuf) ) {
boost::mutex::scoped_lock l(_mutex);
CAPS_ERROR("failed to extract meta information from stream %s",
_currentItem->streamID.c_str());
disconnect();
return NULL;
}
Time stime(metaResponseHeader.startTime.seconds,
metaResponseHeader.endTime.microSeconds),
etime(metaResponseHeader.endTime.seconds,
metaResponseHeader.endTime.microSeconds);
size -= metaResponseHeader.dataSize();
MetaDataRecord::MetaHeader recHeader;
recHeader.dataHeader.setSamplingTime(stime);
recHeader.setEndTime(etime);
recHeader.dataHeader.samplingFrequencyNumerator = _currentItem->samplingFrequency;
recHeader.dataHeader.samplingFrequencyDenominator = _currentItem->samplingFrequencyDivider;
recHeader.dataHeader.dataType = _currentItem->dataType;
MetaDataRecord *record = new MetaDataRecord;
record->setHeader(recHeader);
dataRecord = record;
}
else {
CAPS_ERROR("received unknown packet type %d in stream %s",
_currentItem->type, _currentItem->streamID.c_str());
}
if ( dataRecord == NULL ) {
boost::mutex::scoped_lock l(_mutex);
disconnect();
return NULL;
}
CAPS_DEBUG("data record read");
RequestList::iterator it = _requests.find(_currentItem->streamID);
if ( it == _requests.end() ) {
// TODO: Search request map for wildcard match. Add entry with
// streamID -> (record->endTime, wildcardItem->endTime) to
// restrict data query in case of reconnect
// CAPS_WARNING("received unrequested record: %s: %s - %s",
// _currentItem->streamID.c_str(),
// dataRecord->startTime().iso().c_str(),
// dataRecord->endTime().iso().c_str());
}
// Update request map to reflect current stream state
else if ( dataRecord->endTime() > it->second.start ) {
it->second.start = dataRecord->endTime();
it->second.receivedData = true;
}
return dataRecord;
}
return NULL;
}
bool Connection::handshake() {
{
boost::mutex::scoped_lock l(_mutex);
if ( _state == Error || _state == Aborted) {
CAPS_ERROR("cannot read from an erroneous or aborted session, invoke "
"reset() first");
return false;
}
else if ( _state == Active )
return true;
// _state is set to EOD
else if ( _requests.empty() ) {
CAPS_WARNING("no stream requested, invoke addStream() first");
return false;
}
}
if ( _socket == NULL ) {
_socket = SocketPtr(createSocket());
if ( _socket == NULL ) return false;
}
// Connect to server if necessary
while ( !_socket->isValid() && _state == EOD ) {
if ( _socket->connect(_server, _port) == Socket::Success ) {
_socketBuf.setsocket(_socket.get());
CAPS_DEBUG("connection to %s:%d established", _server.c_str(), _port);
// _state = ios_base::eofbit;
break;
}
CAPS_WARNING("unable to connect to %s:%d, retrying in 5 seconds",
_server.c_str(), _port);
// Wait 5 seconds and keep response latency low
for ( int i = 0; (i < 10) && _state == EOD; ++i )
usleep(500000);
}
if ( _state != EOD ) return false;
// if ( _socket->isValid() ) {
// // Read all data from session
// if ( !seekToReadLimit() ) return false;
//// fd_set set;
//// FD_ZERO(&set);
//// FD_SET(_socket->fd(), &set);
//// timeval t = (struct timeval) {0};
//// /* select returns 0 on timeout, 1 if input/output is available, -1 on error. */
//// int retn = TEMP_FAILURE_RETRY(select(_socket->fd()+1, &set, NULL, NULL, &t));
//// if ( retn != 0 ) CAPS_ERROR("UUPS");
// }
// if ( _state == Aborted ) return false;
// Request streams
stringstream req;
if ( !_auth.empty() )
req << _auth << endl;
req << "BEGIN REQUEST" << endl
<< "META " << (_metaMode ? "ON" : "OFF") << endl
<< "REALTIME " << ( _realtime ? "ON" : "OFF") << endl;
// First pass: continue all previous streams
for ( RequestList::const_iterator it = _requests.begin();
it != _requests.end() && _state == EOD; ++it ) {
if ( it->second.receivedData )
formatRequest(req, it);
}
// Second pass: subscribe to remaining streams
for ( RequestList::iterator it = _requests.begin();
it != _requests.end() && _state == EOD; ++it ) {
if ( !it->second.receivedData )
formatRequest(req, it);
}
req << "END" << endl;
return sendRequest(req.str());
}
bool Connection::sendRequest(const string &req) {
boost::mutex::scoped_lock l(_mutex);
if ( _state != EOD ) return false;
CAPS_DEBUG("%s", req.c_str());
if ( _socket->write(req.c_str(), req.size()) != (int) req.size() ) {
CAPS_ERROR("could not send data request");
disconnect();
return false;
}
Gempa::CAPS::ResponseHeader header;
if ( !header.get(_socketBuf) ) {
// Case to retry, connection closed by peer
CAPS_ERROR("could not read data request response header");
disconnect();
return false;
}
_socketBuf.set_read_limit(header.size);
if ( header.id != 0 ) {
CAPS_ERROR("invalid data request response header id, expected 0, got %d", header.id);
disconnect();
return false;
}
CAPS_DEBUG("data request response size: %lu", (unsigned long) header.size);
istream is(&_socketBuf);
// check line length
if ( is.getline(_lineBuf, 200).fail() )
CAPS_ERROR("data request response line exceeds maximum of 200 characters");
// skip remaining header data
else if ( !seekToReadLimit(false) ) {
CAPS_ERROR("could not seek to data request response header end");
return false;
}
if ( strncasecmp(_lineBuf, "ERROR:", 6) == 0 )
CAPS_ERROR("server responded to data request with: %s", _lineBuf);
else if ( strncasecmp(_lineBuf, "STATUS OK", 9) != 0 )
CAPS_ERROR("invalid data request response: %s", _lineBuf);
else {
CAPS_DEBUG("handshake complete");
_state = Active;
return true;
}
disconnect();
return false;
}
bool Connection::seekToReadLimit(bool log) {
if ( !_socket->isValid() ) {
disconnect();
return false;
}
// Skip unread bytes from previous record
int skippies = _socketBuf.read_limit();
if ( skippies > 0 ) {
if ( log )
CAPS_WARNING("no seemless reading, skipping %d bytes", skippies);
if ( _socketBuf.pubseekoff(skippies, ios_base::cur, ios_base::in) < 0 ) {
CAPS_ERROR("could not seek to next header");
disconnect();
return false;
}
}
_socketBuf.set_read_limit(-1);
return true;
}
void Connection::formatRequest(stringstream& req, RequestList::const_iterator it) {
req << "STREAM ADD " << it->first << endl;
req << "TIME ";
int year, mon, day, hour, minute, second;
if ( it->second.start.valid() ) {
it->second.start.get(&year, &mon, &day, &hour, &minute, &second);
req << year << "," << mon << "," << day << ","
<< hour << "," << minute << "," << second;
if ( it->second.start.microseconds() > 0 ) {
req << "," << setfill('0') << setw(6)
<< it->second.start.microseconds() << setw(0);
}
}
req << ":";
if ( it->second.end.valid() ) {
it->second.end.get(&year, &mon, &day, &hour, &minute, &second);
req << year << "," << mon << "," << day << ","
<< hour << "," << minute << "," << second;
if ( it->second.end.microseconds() > 0 ) {
req << "," << setfill('0') << setw(6)
<< it->second.end.microseconds() << setw(0);
}
}
req << endl;
}
void Connection::onItemAboutToBeRemoved(const SessionTableItem *item) {
if ( _currentItem == item) {
_currentID = -1;
_currentItem = NULL;
}
// remove request since all data has been received
if ( item != NULL ) _requests.erase(item->streamID);
}
void Connection::reset(bool clearStreams) {
boost::mutex::scoped_lock l(_mutex);
CAPS_DEBUG("resetting connection");
if ( _state == Active ) {
CAPS_WARNING("cannot reset an active connection, invoking close()");
close();
}
// else if ( _state == Aborted ) {
// fd_set set;
// FD_ZERO(&set);
// FD_SET(_socket->fd(), &set);
// timeval t = (struct timeval) {0};
// /* select returns 0 on timeout, 1 if input/output is available, -1 on error. */
// while ( _socket->isValid() ) {
// int retn = TEMP_FAILURE_RETRY(select(_socket->fd()+1, &set, NULL, NULL, &t));
// if ( retn == 0 ) break;
// if ( retn != 0 ) CAPS_ERROR("UUPS");
// }
_state = EOD;
if ( clearStreams )
_requests.clear();
}
void Connection::setStartTime(const Time &stime) {
_startTime = stime;
CAPS_DEBUG("set global start time to %s", _startTime.toString("%F %T.%f").c_str());
}
void Connection::setEndTime(const Time &etime) {
_endTime = etime;
CAPS_DEBUG("set global end time to %s", _endTime.toString("%F %T.%f").c_str());
}
void Connection::setTimeWindow(const Time &stime, const Time &etime) {
_startTime = stime;
_endTime = etime;
CAPS_DEBUG("set global timewindow to %s~%s", _startTime.toString("%F %T.%f").c_str(),
_endTime.toString("%F %T.%f").c_str());
}
}
}