You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1290 lines
34 KiB
C++

/***************************************************************************
* Copyright (C) 2013 by gempa GmbH *
* *
* All Rights Reserved. *
* *
* NOTICE: All information contained herein is, and remains *
* the property of gempa GmbH and its suppliers, if any. The intellectual *
* and technical concepts contained herein are proprietary to gempa GmbH *
* and its suppliers. *
* Dissemination of this information or reproduction of this material *
* is strictly forbidden unless prior written permission is obtained *
* from gempa GmbH. *
***************************************************************************/
#include <gempa/caps/endianess.h>
#include <gempa/caps/log.h>
#include <gempa/caps/plugin.h>
#include <gempa/caps/utils.h>
#ifdef WIN32
#define EWOULDBLOCK WSAEWOULDBLOCK
#endif
#if !defined(CAPS_FEATURES_ANY) || CAPS_FEATURES_ANY
#include <gempa/caps/anypacket.h>
#endif
#if !defined(CAPS_FEATURES_MSEED) || CAPS_FEATURES_MSEED
#include <gempa/caps/mseedpacket.h>
#endif
#if !defined(CAPS_FEATURES_RAW) || CAPS_FEATURES_RAW
#include <gempa/caps/rawpacket.h>
#endif
#if !defined(CAPS_FEATURES_RTCM2) || CAPS_FEATURES_RTCM2
#include <gempa/caps/rtcm2packet.h>
#endif
#include <boost/version.hpp>
#if !defined(CAPS_FEATURES_JOURNAL) || CAPS_FEATURES_JOURNAL
#include <boost/filesystem/convenience.hpp>
#endif
#include <boost/algorithm/string.hpp>
#include <cstring>
#include <fstream>
#include <sstream>
#include <cerrno>
#include <cstdarg>
using namespace std;
#if !defined(CAPS_FEATURES_JOURNAL) || CAPS_FEATURES_JOURNAL
namespace fs = boost::filesystem;
#endif
//see, http://www.boost.org/doc/libs/1_51_0/libs/filesystem/doc/deprecated.html
#if BOOST_FILESYSTEM_VERSION >= 3
#define BOOST_FILESYSTEM_NO_DEPRECATED
// path
#define FS_PATH(PATH_STR) boost::filesystem::path(PATH_STR)
#define FS_DECLARE_PATH(NAME, PATH_STR) \
boost::filesystem::path NAME(PATH_STR);
#define FS_HAS_PARENT_PATH(PATH) PATH.has_parent_path()
#define FS_PARENT_PATH(PATH) PATH.parent_path()
#else
// path
#define FS_PATH(PATH_STR) boost::filesystem::path(PATH_STR,\
boost::filesystem::native)
#define FS_DECLARE_PATH(NAME, PATH_STR) \
boost::filesystem::path NAME(PATH_STR, boost::filesystem::native);
#if BOOST_VERSION < 103600
#define FS_HAS_PARENT_PATH(PATH) PATH.has_branch_path()
#define FS_PARENT_PATH(PATH) PATH.branch_path()
#else
#define FS_HAS_PARENT_PATH(PATH) PATH.has_parent_path()
#define FS_PARENT_PATH(PATH) PATH.parent_path()
#endif
#endif
namespace {

// Formats "<LEVEL> <message>\n" to stderr using the printf-style varargs
// of the enclosing function. Implemented as a macro because va_start()
// must be invoked inside the variadic function itself.
#define LOG_CHANNEL(out, fmt) \
va_list ap;\
va_start(ap, fmt);\
fprintf(stderr, #out" "); vfprintf(stderr, fmt, ap); fprintf(stderr, "\n");\
va_end(ap)

// stderr log sinks, one per severity level; installed by
// Plugin::enableLogging() below.
void LogError(const char *fmt, ...) {
	LOG_CHANNEL(ERROR, fmt);
}
void LogWarning(const char *fmt, ...) {
	LOG_CHANNEL(WARNING, fmt);
}
void LogNotice(const char *fmt, ...) {
	LOG_CHANNEL(NOTICE, fmt);
}
void LogInfo(const char *fmt, ...) {
	LOG_CHANNEL(INFO, fmt);
}
void LogDebug(const char *fmt, ...) {
	LOG_CHANNEL(DEBUG, fmt);
}
}
namespace Gempa {
namespace CAPS {
namespace {
#define HELLO_REQUEST "HELLO\n"
#if !defined(CAPS_FEATURES_JOURNAL) || CAPS_FEATURES_JOURNAL
/**
 * Creates directory @p dir including all missing parent directories.
 * @return true on success or if the directory already exists,
 *         false on any filesystem error.
 */
bool createPath(const string &dir) {
	try {
		FS_DECLARE_PATH(path, dir)
		// create_directories() is a no-op (returning false) if the path
		// already exists and throws on real errors, which the catch-all
		// below converts into a 'false' return.
		// NOTE(review): a former 'fs::is_directory(path)' call whose
		// result was discarded (dead code) has been removed.
		fs::create_directories(path);
		return true;
	} catch ( ... ) {
		return false;
	}
}
#endif
#if !defined(CAPS_FEATURES_BACKFILLING) || CAPS_FEATURES_BACKFILLING
//void dump(const Plugin::BackfillingBuffer &buf) {
// Plugin::BackfillingBuffer::const_iterator it;
// int idx = 0;
// for ( it = buf.begin(); it != buf.end(); ++it, ++idx ) {
// std::cerr << idx << ": " << (*it)->record->startTime().iso() << std::endl;
// }
//}
// Prints the first @p sampleCount samples of the raw record to stdout,
// one value per line (the body of an SLIST dump).
// NOTE(review): rec->data() is reinterpreted as a std::vector<T>*; this
// relies on RawDataRecord's buffer actually being backed by a vector of
// the record's sample type — confirm against rawpacket.h before changing.
template<typename T> void dumpSamples(RawDataRecord *rec, size_t sampleCount) {
	const vector<T> *data = reinterpret_cast<vector<T>* >(rec->data());
	for ( size_t i = 0; i < sampleCount; ++i ) {
		cout << data->at(i) << endl;
	}
}
// Writes the packet to stdout as an ASCII SLIST time series: one
// TIMESERIES header line followed by one sample value per line.
void dumpSList(Packet *packet) {
	RawDataRecord *rec = static_cast<RawDataRecord*>(packet->record.get());

	// Stream id with dots replaced so it forms a single token
	string id = packet->streamID;
	boost::replace_all(id, ".", "_");

	// Strip the "RAW/" prefix from the format name
	string dataType = packet->record->formatName();
	boost::replace_all(dataType, "RAW/", "");

	DataType dt = rec->header()->dataType;
	size_t sampleCount = rec->data()->size() / dataTypeSize(dt);

	cout << "TIMESERIES " << id << "_R, "
	     << sampleCount << " samples, "
	     << static_cast<float>(rec->header()->samplingFrequencyNumerator) / rec->header()->samplingFrequencyDenominator << " sps, "
	     << packet->record->startTime().iso() << " SLIST, "
	     << dataType << ", " << packet->uom << endl;

	// Dispatch on the sample type
	switch ( dt ) {
		case DT_INT8:   dumpSamples<int8_t>(rec, sampleCount);  break;
		case DT_INT16:  dumpSamples<int16_t>(rec, sampleCount); break;
		case DT_INT32:  dumpSamples<int32_t>(rec, sampleCount); break;
		case DT_INT64:  dumpSamples<int64_t>(rec, sampleCount); break;
		case DT_FLOAT:  dumpSamples<float>(rec, sampleCount);   break;
		case DT_DOUBLE: dumpSamples<double>(rec, sampleCount);  break;
		default: break; // unknown types are silently skipped, as before
	}
}
// Serializes @p packet into a freshly allocated wire buffer stored in
// packet->buffer. T selects the packet header layout (PacketHeaderV1 or
// PacketHeaderV2); @p version is the matching protocol version written
// into the data header.
// Wire layout: PacketDataHeader | T | net | sta | loc | cha | payload.
template<typename T> void fillPacket(Packet *packet, uint16_t version) {
	PacketDataHeader header;
	header.packetType = packet->record->packetType();
	header.setUOM(packet->uom.c_str());

	T packetHeader;
	// Total buffer size: both headers, all four stream id parts and the
	// record payload
	uint32_t size = packetHeader.dataSize();
	size_t dataSize = packet->record->dataSize();
	size += header.dataSize() + dataSize +
	        packet->networkCode.size() +
	        packet->stationCode.size() +
	        packet->locationCode.size() +
	        packet->channelCode.size();

	Plugin::BufferPtr buf = Plugin::BufferPtr(new Plugin::Buffer());
	buf->resize(size);
	arraybuf abuf(buf->data(), buf->size());

	packetHeader.size = dataSize;
	packetHeader.SIDSize[NetworkCode] = packet->networkCode.size();
	packetHeader.SIDSize[StationCode] = packet->stationCode.size();
	packetHeader.SIDSize[LocationCode] = packet->locationCode.size();
	packetHeader.SIDSize[ChannelCode] = packet->channelCode.size();

	// Write headers, then the id parts, then the record payload
	header.version = version;
	header.put(abuf);
	packetHeader.put(abuf);
	abuf.sputn(packet->networkCode.c_str(), packetHeader.SIDSize[NetworkCode]);
	abuf.sputn(packet->stationCode.c_str(), packetHeader.SIDSize[StationCode]);
	abuf.sputn(packet->locationCode.c_str(), packetHeader.SIDSize[LocationCode]);
	abuf.sputn(packet->channelCode.c_str(), packetHeader.SIDSize[ChannelCode]);
	packet->record->put(abuf);

	packet->buffer = buf;
}
// Inserts @p packet into the backfilling buffer, keeping it sorted by
// ascending record end time.
void insertPacket(Plugin::BackfillingBuffer &buf, PacketPtr packet) {
	// Brute-force algorithm, should be replaced by something smarter such as
	// binary search. A list might be the wrong container for that.
	size_t idx = 0;
	for ( Plugin::BackfillingBuffer::iterator pos = buf.begin();
	      pos != buf.end(); ++pos, ++idx ) {
		if ( (*pos)->record->endTime() > packet->record->endTime() ) {
			buf.insert(pos, packet);
			CAPS_DEBUG("Backfilling buffer: packet inserted at pos: %zu, new "
			           "size: %zu, time window: %s~%s",
			           idx, buf.size(),
			           buf.front()->record->startTime().iso().c_str(),
			           buf.back()->record->endTime().iso().c_str());
			//dump(buf);
			return;
		}
	}

	// No later packet found: append at the end
	buf.push_back(packet);
	CAPS_DEBUG("Backfilling buffer: packet appended, new size: %zu, time window: %s~%s",
	           buf.size(),
	           buf.front()->record->startTime().iso().c_str(),
	           buf.back()->record->endTime().iso().c_str());
	//dump(buf);
}
#endif
/**
 * Returns the time of the last sample in the record: the end time minus
 * one sampling interval when a valid sampling rate is present, otherwise
 * the end time itself. Returns an invalid Time for NULL records.
 */
Time getLastSampleTime(DataRecord *rec) {
	if ( !rec ) {
		return Time();
	}

	const DataRecord::Header *header = rec->header();
	if ( header->samplingFrequencyNumerator != 0 &&
	     header->samplingFrequencyDenominator != 0 ) {
		// BUGFIX: the former integer division denominator/numerator
		// truncated the interval to 0 seconds for all rates above 1 Hz.
		// Compute the interval in microseconds instead, consistent with
		// the dt_us computation in Plugin::push().
		int64_t dtUS = (int64_t)header->samplingFrequencyDenominator * 1000000 /
		               header->samplingFrequencyNumerator;
		TimeSpan samplingInterval(dtUS / 1000000, dtUS % 1000000);
		return rec->endTime() - samplingInterval;
	}

	return rec->endTime();
}
}
/**
 * Constructs a plugin instance.
 * @param name        Plugin name (stored)
 * @param options     Option string (stored)
 * @param description Human readable description (stored)
 */
Plugin::Plugin(const string &name, const string &options,
               const string &description)
: _name(name)
, _options(options)
, _description(description)
, _bufferSize(1 << 14)        // 16 KiB outstanding-packet limit
, _bytesBuffered(0)
, _sendTimeout(60)            // seconds passed to send()/select()
, _isExitRequested(false)
#if !defined(CAPS_FEATURES_BACKFILLING) || CAPS_FEATURES_BACKFILLING
, _backfillingBufferSize(0)   // 0 = backfilling disabled
#endif
{
	// Default server endpoint
	_url.host = "localhost";
	_url.port = 18003;
#if !defined(CAPS_FEATURES_JOURNAL) || CAPS_FEATURES_JOURNAL
	_lastWrite = Time::GMT();
	_journalDirty = false;
	_flushInterval = 10;      // journal flush interval (TimeSpan units)
#endif
	_closed = false;
	_wasConnected = false;
	_ackTimeout = 5;          // seconds to wait for server acks
	_lastAckTimeout = 5;      // ack wait during close()
	_encoderFactory = NULL;
	// Packets ending more than 120 s in the future are rejected
	_maxFutureEndTime = TimeSpan(120, 0);
#if !defined(CAPS_FEATURES_SSL) || CAPS_FEATURES_SSL
	_useSSL = false;
#endif
	_dumpPackets = false;
}
// Ensures a graceful shutdown if the user did not call close() and
// releases the encoder factory.
Plugin::~Plugin() {
	if ( !_closed ) {
		close();
	}
	setEncoderFactory(NULL);
}
/**
 * Gracefully shuts down the session: drains all per-stream backfilling
 * buffers, flushes the encoders, sends the BYE marker, waits for
 * acknowledgements of still-queued packets and disconnects.
 */
void Plugin::close() {
#if !defined(CAPS_FEATURES_JOURNAL) || CAPS_FEATURES_JOURNAL
	// Reset the write timestamp so updateJournal() flushes immediately
	_lastWrite = Time();
#endif
	_closed = true;

	// Commit everything still held back, regardless of remaining gaps
	for ( StreamStates::iterator it = _states.begin();
	      it != _states.end(); ++it ) {
		StreamState &state = it->second;
		while ( !state.backfillingBuffer.empty() ) {
			PacketPtr &ref_pkt = state.backfillingBuffer.front();
			state.lastCommitEndTime = ref_pkt->record->endTime();
			encodePacket(ref_pkt);
			state.backfillingBuffer.pop_front();
		}
	}

	flushEncoders();
	sendBye();

	CAPS_INFO("Closing connection to CAPS at %s:%d", _url.host.c_str(), _url.port);
	// Keep reading acks until the queue is empty or the server stops
	// responding within _lastAckTimeout seconds
	while ( !_packetBuffer.empty() && readResponse(_lastAckTimeout) );
#if !defined(CAPS_FEATURES_JOURNAL) || CAPS_FEATURES_JOURNAL
	updateJournal();
#endif
	disconnect();
	CAPS_INFO("Closed connection to CAPS at %s:%d", _url.host.c_str(), _url.port);

	_packetBuffer.clear();
	_wasConnected = false;
}
// Requests termination; blocking loops poll this flag and abort.
void Plugin::quit() {
	_isExitRequested = true;
}
/**
 * Establishes the server connection: creates a (possibly SSL) socket,
 * performs the HELLO handshake, announces the agent string and
 * credentials if configured, and re-flushes buffered packets on
 * reconnects.
 * @return true on success
 */
bool Plugin::connect() {
#if !defined(CAPS_FEATURES_SSL) || CAPS_FEATURES_SSL
	_socket = SocketPtr(_useSSL ? new CAPS::SSLSocket() : new CAPS::Socket());
#else
	_socket = SocketPtr(new CAPS::Socket());
#endif
	CAPS_INFO("Attempting to connect to CAPS at %s:%d",
	          _url.host.c_str(), _url.port);
	if ( _socket->connect(_url.host, _url.port) != Socket::Success ) {
		CAPS_ERROR("Connection failed to CAPS at %s:%d", _url.host.c_str(), _url.port);
		return false;
	}

	// Do handshake
	int apiVersion = 0;
	if ( !getAPIVersion(apiVersion) ) {
		return false;
	}
	CAPS_INFO("Found CAPS API version %d", apiVersion);

	// The AGENT announcement is only sent to API >= 5 servers
	if ( apiVersion >= 5 ) {
		if ( !_agent.empty() ) {
			_socket->write("AGENT ", 6);
			_socket->write(_agent.data(), _agent.size());
			_socket->write("\n", 1);
		}
	}

	// Credentials are only sent if both user and password are set
	if ( !_url.user.empty() && !_url.password.empty() ) {
		_socket->write("AUTH ", 5);
		_socket->write(_url.user.data(), _url.user.size());
		_socket->write(" ", 1);
		_socket->write(_url.password.data(), _url.password.size());
		_socket->write("\n", 1);
	}

	_socket->setNonBlocking(true);

	FD_ZERO(&_readFDs);
	FD_ZERO(&_writeFDs);
	FD_SET(_socket->fd(), &_readFDs);
	FD_SET(_socket->fd(), &_writeFDs);

	CAPS_INFO("Connected to CAPS at %s:%d", _url.host.c_str(), _url.port);

	// Reset the response line parser state
	_responseBuf[0] = '\0';
	_responseBufIdx = 0;

	bool result = true;
	// Disable packet flushing for the first connection establishment to
	// avoid duplicate records
	if ( _wasConnected && !flush() ) {
		disconnect();
		result = false;
	}
	_wasConnected = true;

	return result;
}
// Shuts down and closes the socket if it is open and resets the
// response line parser state.
void Plugin::disconnect() {
	if ( !_socket || !_socket->isValid() ) {
		return;
	}

	CAPS_INFO("Disconnect from %s:%d", _url.host.c_str(), _url.port);
	_socket->shutdown();
	_socket->close();

	_responseBuf[0] = '\0';
	_responseBufIdx = 0;
}
// True while a socket exists and refers to a live connection.
bool Plugin::isConnected() const {
	if ( !_socket ) {
		return false;
	}
	return _socket->isValid();
}
/**
 * Drains pending server responses and processes them line by line:
 * "OK <n>" acknowledges the n oldest queued packets, "ERROR <msg>" is
 * logged and aborts processing.
 * @param timeout If > 0, first waits up to @p timeout seconds for data.
 * @return true if at least one packet was acknowledged.
 */
bool Plugin::readResponse(unsigned int timeout) {
	if ( !_socket || !_socket->isValid() ) return false;

	_socket->setNonBlocking(true);

	bool gotResponse = false;
	const int bufN = 512;
	char buf[bufN];
	int res;

	// Optionally block until the socket becomes readable
	if ( timeout > 0 ) {
		struct timeval tv;
		tv.tv_sec = timeout;
		tv.tv_usec = 0;
		FD_ZERO(&_readFDs);
		FD_SET(_socket->fd(), &_readFDs);
		res = select(_socket->fd() + 1, &_readFDs, NULL, NULL, &tv);
		if ( res <= 0 )
			return gotResponse;
	}

	while ( true ) {
		res = _socket->read(buf, bufN);
		if ( res < 0 ) {
			// EAGAIN/EWOULDBLOCK means no more data pending; any other
			// errno is treated as a hard error and forces a disconnect
			if ( errno != EAGAIN && errno != EWOULDBLOCK ) {
				CAPS_ERROR("Reading failed: %s: disconnect", strerror(errno));
				disconnect();
			}
			break;
		}
		else if ( res == 0 ) {
			CAPS_INFO("Peer closed connection");
			disconnect();
			break;
		}

		char *in = buf;
		// Parse the input buffer into the line buffer
		// NOTE(review): _responseBufIdx is not bounds-checked against the
		// capacity of _responseBuf; an overlong response line would
		// overflow it — confirm the buffer size in the class definition.
		for ( int i = 0; i < res; ++i, ++in ) {
			if ( *in == '\n' ) {
				_responseBuf[_responseBufIdx] = '\0';

				// Handle line
				if ( (strncasecmp(_responseBuf, "OK ", 3) == 0) && (_responseBufIdx > 3) ) {
					// Got OK response
					// Read confirmed packets from response
					int count = atoi(_responseBuf+3);
					CAPS_DEBUG("Acknowledged %d packets, %zu in queue", count, _packetBuffer.size());
					// Update packet buffer: pop one queued packet per
					// acknowledged packet (FIFO order)
					for ( int i = 0; i < count; ++i ) {
						if ( _packetBuffer.empty() ) {
							CAPS_ERROR("Synchronization error: more packages acknowledged than in queue");
							break;
						}

						PacketPtr packet = _packetBuffer.front();
						// Remember the acknowledged end time per stream
						_states[packet->streamID].lastEndTime = packet->record->endTime();
						_packetBuffer.pop_front();
						_bytesBuffered -= packet->record->dataSize();
						// Signals flush() that its iterator may be stale
						_packetBufferDirty = true;
#if !defined(CAPS_FEATURES_JOURNAL) || CAPS_FEATURES_JOURNAL
						_journalDirty = true;
#endif
						gotResponse = true;
						CAPS_DEBUG("Packet acknowledged by CAPS, stream: %s' time window: %s~%s",
						           packet->streamID.c_str(), packet->record->startTime().iso().c_str(),
						           packet->record->endTime().iso().c_str());
						// Optional user callback per acknowledged packet
						if ( _packetAckFunc ) {
							_packetAckFunc(packet->streamID, packet->record->startTime(), packet->record->endTime());
						}
					}
					CAPS_DEBUG("Packet buffer state: %zu packets, %zu bytes",
					           _packetBuffer.size(), _bytesBuffered);
				}
				else if ( (strncasecmp(_responseBuf, "ERROR ", 6) == 0) && (_responseBufIdx > 6) ) {
					CAPS_ERROR("%s", _responseBuf + 6);
					return false;
				}

				// Start a fresh line
				_responseBuf[0] = '\0';
				_responseBufIdx = 0;
			}
			else {
				_responseBuf[_responseBufIdx] = *in;
				++_responseBufIdx;
			}
		}

		// Nothing left to acknowledge: stop draining
		if ( _packetBuffer.empty() ) break;
	}

	return gotResponse;
}
#if !defined(CAPS_FEATURES_JOURNAL) || CAPS_FEATURES_JOURNAL
void Plugin::updateJournal() {
Time currentTime = Time::GMT();
if ( !_journalFile.empty() && _journalDirty &&
_lastWrite + TimeSpan(_flushInterval) <= currentTime ) {
writeJournal();
_journalDirty = false;
_lastWrite = currentTime;
}
}
bool Plugin::readJournal() {
if ( _journalFile.empty() ) return false;
ifstream ifs;
ifs.open(_journalFile.c_str());
return readJournal(ifs);
}
/**
 * Parses journal lines of the form "<streamID> [<ISO time>]" and rebuilds
 * the per-stream state table. Timestamps more than one day in the future
 * are only warned about, not rejected.
 * @return false on the first malformed timestamp, true otherwise.
 */
bool Plugin::readJournal(istream &is) {
	_states.clear();

	string streamID, strTime;
	string line;
	Time time;
	int lineNumber = 0;
	Time now = Time::GMT();
	Time maxAllowedEndTime = now + TimeSpan(86400);

	while ( getline(is, line) ) {
		++lineNumber;
		// BUGFIX: reset the timestamp string for every line. Previously a
		// line without a timestamp silently reused the timestamp of the
		// preceding line.
		strTime.clear();

		size_t p = line.find(' ');
		if ( p == string::npos )
			streamID = trim(line);
		else {
			streamID = trim(line.substr(0, p));
			strTime = trim(line.substr(p+1));
		}

		if ( !strTime.empty() ) {
			if ( !time.fromString(strTime.c_str(), "%FT%T.%fZ") ) {
				CAPS_ERROR("journal:%d: Invalid time: %s", lineNumber, strTime.c_str());
				return false;
			}
		}
		else
			time = Time();

		if ( time > maxAllowedEndTime ) {
			CAPS_WARNING("journal:%d:%s: Timestamp %s is more than one day "
			             "ahead of current time, respecting it nevertheless.",
			             lineNumber, streamID.c_str(), time.iso().c_str());
		}

		_states[streamID].lastEndTime = time;
	}

	return true;
}
#endif
/**
 * Re-sends every packet still queued in the packet buffer, e.g. after a
 * reconnect. Acknowledgements processed while sending may remove packets
 * from the buffer (signalled via _packetBufferDirty); the iterator is
 * then carefully re-established before continuing.
 * @return true if all queued packets could be sent.
 */
bool Plugin::flush() {
	CAPS_INFO("Flushing %zu queued packets", _packetBuffer.size());
	PacketBuffer::iterator it = _packetBuffer.begin();
	while ( it != _packetBuffer.end() && !_isExitRequested ) {
		PacketPtr packet = *it;
		_packetBufferDirty = false;
		if ( !sendPacket(packet.get() ) && !_isExitRequested ) {
			if ( _packetBufferDirty ) {
				// Unexpected: acks were processed although sending failed
				CAPS_ERROR("Uh oh, buffer dirty but sending failed!");
			}
			return false;
		}

		if ( readResponse() ) {
#if !defined(CAPS_FEATURES_JOURNAL) || CAPS_FEATURES_JOURNAL
			updateJournal();
#endif
		}

		if ( _packetBufferDirty ) {
			// Acks removed packets: relocate the current packet and step
			// past it; if it is gone, restart from the beginning
			it = std::find(_packetBuffer.begin(), _packetBuffer.end(), packet);
			if ( it != _packetBuffer.end() )
				++it;
			else {
				CAPS_DEBUG("Last packet removed, reset flush queue iterator");
				it = _packetBuffer.begin();
			}
		}
		else
			++it;
	}

	return true;
}
// Flushes every per-stream encoder and commits all packets they emit.
void Plugin::flushEncoders() {
	CAPS_INFO("Flushing %zu encoders", _encoderItems.size());
	for ( EncoderItems::iterator it = _encoderItems.begin();
	      it != _encoderItems.end(); ++it ) {
		Encoder *encoder = it->second.encoder.get();
		if ( !encoder ) {
			continue;
		}
		encoder->flush();
		PacketPtr encodedPacket;
		while ( (encodedPacket = encoder->pop()) ) {
			commitPacket(encodedPacket);
		}
	}
}
// Convenience overload: wraps a raw sample buffer into a RawDataRecord
// and forwards it to the record-based push().
Plugin::Status Plugin::push(const string &net, const string &sta,
                            const string &loc, const string &cha,
                            const Time &stime,
                            uint16_t numerator, uint16_t denominator,
                            const string &uom,
                            void *data, size_t count, DataType dt,
                            int timingQuality) {
	// Reject unknown sample types up front
	uint8_t sampleSize = dataTypeSize(dt);
	if ( sampleSize == 0 ) {
		return Plugin::PacketLoss;
	}

	RawDataRecord *record = new RawDataRecord();
	record->setStartTime(stime);
	record->setSamplingFrequency(numerator, denominator);
	record->setDataType(dt);
	record->setBuffer(static_cast<char*>(data), sampleSize * count);

	return push(net, sta, loc, cha, DataRecordPtr(record), uom, timingQuality);
}
#if !defined(CAPS_FEATURES_ANY) || CAPS_FEATURES_ANY
// Convenience overload: wraps an arbitrary payload into an AnyDataRecord
// (unit of measurement fixed to "px") and forwards it.
Plugin::Status Plugin::push(const string &net, const std::string &sta,
                            const std::string &loc, const std::string &cha,
                            const Time &stime, uint16_t numerator,
                            uint16_t denominator, const std::string &format,
                            char *data, size_t count) {
	AnyDataRecord *record = new AnyDataRecord;
	record->setStartTime(stime);
	record->setEndTime(stime);
	record->setSamplingFrequency(numerator, denominator);
	record->setType(format.c_str());
	record->setData(data, count);

	return push(net, sta, loc, cha, DataRecordPtr(record), "px");
}
// Convenience overload taking the payload as a std::string.
// NOTE: the const_cast assumes the buffer overload does not mutate the
// data — presumably it is copied by the record; confirm in anypacket.h.
Plugin::Status Plugin::push(const string &net, const std::string &sta,
                            const std::string &loc, const std::string &cha,
                            const Time &stime, uint16_t numerator,
                            uint16_t denominator, const std::string &format,
                            const std::string &str) {
	char *payload = const_cast<char*>(&str[0]);
	return push(net, sta, loc, cha, stime, numerator, denominator, format,
	            payload, str.size());
}
#endif
/**
 * Commits packets from the front of the backfilling buffer as long as
 * each connects seamlessly (gap smaller than one sample interval) to the
 * last committed end time. Stops at the first remaining gap.
 */
void Plugin::tryFlushBackfillingBuffer(StreamState &state) {
	if ( state.backfillingBuffer.empty() )
		return;

	size_t flushed = 0;
	while ( !state.backfillingBuffer.empty() ) {
		PacketPtr &ref_pkt = state.backfillingBuffer.front();
		// Gap between the last committed sample and this packet, in us
		TimeSpan gap = ref_pkt->record->startTime() - state.lastCommitEndTime;
		int64_t dt_us = static_cast<int64_t>(gap.seconds()) * 1000000 + gap.microseconds();
		// A gap larger than one sample?
		if ( dt_us >= ref_pkt->dt_us ) break;

		state.lastCommitEndTime = ref_pkt->record->endTime();
		encodePacket(ref_pkt);
		state.backfillingBuffer.pop_front();
		++flushed;
		//dump(state.backfillingBuffer);
	}

	if ( flushed > 0 ) {
		if ( state.backfillingBuffer.size() > 0 ) {
			CAPS_DEBUG("backfilling buffer: %zu pakets flushed, new size: %zu, time window: %s~%s",
			           flushed, state.backfillingBuffer.size(),
			           state.backfillingBuffer.front()->record->startTime().iso().c_str(),
			           state.backfillingBuffer.back()->record->endTime().iso().c_str());
		}
		else {
			CAPS_DEBUG("backfilling buffer: %zu pakets flushed, new size: 0",
			           flushed);
		}
	}
}
/**
 * Commits packets from the front of the backfilling buffer until the
 * buffered time window no longer exceeds _backfillingBufferSize seconds.
 * Trimmed packets are encoded/committed immediately.
 */
void Plugin::trimBackfillingBuffer(StreamState &state) {
	if ( state.backfillingBuffer.empty() )
		return;

	size_t trimmed = 0;
	while ( !state.backfillingBuffer.empty() ) {
		// Current buffered time window
		TimeSpan diff = state.backfillingBuffer.back()->record->endTime() -
		                state.backfillingBuffer.front()->record->startTime();
		if ( diff.seconds() <= _backfillingBufferSize )
			break;

		PacketPtr &ref_pkt = state.backfillingBuffer.front();
		state.lastCommitEndTime = ref_pkt->record->endTime();
		encodePacket(ref_pkt);
		state.backfillingBuffer.pop_front();
		// BUGFIX: the counter was never incremented, so the debug summary
		// below could never fire.
		++trimmed;
	}

	if ( trimmed > 0 ) {
		if ( state.backfillingBuffer.size() > 0 ) {
			CAPS_DEBUG("backfilling buffer: %zu pakets trimmed, new size: %zu, time window: %s~%s",
			           trimmed, state.backfillingBuffer.size(),
			           state.backfillingBuffer.front()->record->startTime().iso().c_str(),
			           state.backfillingBuffer.back()->record->endTime().iso().c_str());
		}
		else {
			CAPS_DEBUG("backfilling buffer: %zu pakets trimmed, new size: 0",
			           trimmed);
		}
	}
}
/**
 * Main entry point: validates the record (size, end time, sampling rate,
 * future timestamps), optionally routes it through the per-stream
 * backfilling buffer (gap handling) and finally encodes/commits it.
 * @return Success if the packet was accepted, otherwise an error status.
 */
Plugin::Status Plugin::push(const string &net, const string &sta,
                            const string &loc, const string &cha,
                            DataRecordPtr rec, const string &uom,
                            int timingQuality) {
	// Log the library version exactly once
	static bool showVersion = true;
	if ( showVersion ) {
		CAPS_NOTICE("LIB CAPS version %s", version());
		showVersion = false;
	}

	if ( rec == NULL ) return PacketNotValid;
	if ( rec->dataSize() > _bufferSize ) return PacketSize;
	if ( !rec->endTime().valid() ) return PacketNotValid;

	// Reject packets reaching too far into the future
	Time endTime = getLastSampleTime(rec.get());
	Time maxAllowedEndTime = Time::GMT() + TimeSpan(_maxFutureEndTime);
	if ( endTime > maxAllowedEndTime ) {
		CAPS_WARNING("%s.%s.%s.%s: Future time stamp detected: Packet end time %s exceeds "
		             "max allowed end time %s. Discard packet.",
		             net.c_str(), sta.c_str(), loc.c_str(), cha.c_str(),
		             rec->endTime().iso().c_str(),
		             maxAllowedEndTime.iso().c_str());
		return MaxFutureEndTimeExceeded;
	}

	const DataRecord::Header *header = rec->header();

	PacketPtr packet(new Packet(rec, net, sta, loc, cha));
	packet->uom = uom;
	packet->dataType = header->dataType;
	packet->timingQuality = timingQuality;

#if !defined(CAPS_FEATURES_BACKFILLING) || CAPS_FEATURES_BACKFILLING
	if ( header->samplingFrequencyNumerator == 0 ) {
		CAPS_DEBUG("detected division by zero, invalid sampling frequency numerator");
		return PacketNotValid;
	}

	// Duration of one sample in microseconds
	packet->dt_us = ((int64_t)header->samplingFrequencyDenominator * 1000000) /
	                header->samplingFrequencyNumerator;
#endif

	StreamState &state = _states[packet->streamID];

#if !defined(CAPS_FEATURES_BACKFILLING) || CAPS_FEATURES_BACKFILLING
	// If buffering is enabled
	if ( _backfillingBufferSize > 0 ) {
		if ( state.lastCommitEndTime.valid() ) {
			int64_t dt_us = ((int64_t)rec->header()->samplingFrequencyDenominator * 1000000) /
			                rec->header()->samplingFrequencyNumerator;
			TimeSpan gap = packet->record->startTime() - state.lastCommitEndTime;
			// A gap larger than one sample?
			if ( ((int64_t)gap.seconds()*1000000+gap.microseconds()) >= dt_us ) {
				CAPS_DEBUG("detected gap on stream: %s", packet->streamID.c_str());
				// Hold the packet back until the gap closes or the buffer
				// overflows its configured time window
				insertPacket(state.backfillingBuffer, packet);
				trimBackfillingBuffer(state);
				tryFlushBackfillingBuffer(state);
				return Success;
			}
		}

		if ( !encodePacket(packet) )
			return PacketLoss;

		if ( rec->endTime() > state.lastCommitEndTime) {
			state.lastCommitEndTime = rec->endTime();
		}

		tryFlushBackfillingBuffer(state);
	}
	else {
#endif
	if ( !encodePacket(packet) )
		return PacketLoss;
#if !defined(CAPS_FEATURES_BACKFILLING) || CAPS_FEATURES_BACKFILLING
	}
#endif

	return Success;
}
// Lazily serializes the record into the packet's wire buffer. Payloads
// below 64 KiB fit the V1 header; larger ones require the V2 header.
void Plugin::serializePacket(Packet *packet) {
	if ( packet->buffer ) return;

	if ( packet->record->dataSize() < (1 << 16) ) {
		fillPacket<PacketHeaderV1>(packet, 1);
	}
	else {
		fillPacket<PacketHeaderV2>(packet, 2);
	}
}
/**
 * Hands one packet over to the server: establishes the connection on
 * first use, blocks while the outstanding-packet buffer is full, retries
 * the send across reconnects and finally queues the packet until it is
 * acknowledged. In dump mode the packet is written to stdout instead.
 * @return true if the packet was sent (or dumped).
 */
bool Plugin::commitPacket(PacketPtr packet) {
	if ( _dumpPackets ) {
		serializePacket(packet.get());
		dumpPacket(packet.get());
		return true;
	}

	// Initial connect
	if ( !_wasConnected && !_socket ) {
		while ( !connect() && !_isExitRequested ) {
			disconnect();
			wait();
		}
		if ( !isConnected() ) return false;
	}

	// Throttle: block until acks free up buffer space
	if ( _bytesBuffered >= _bufferSize ) {
		CAPS_DEBUG("Packet buffer is full (%zu/%zu bytes), "
		           "waiting for server ack messages",
		           _bytesBuffered, _bufferSize);
		while ( _bytesBuffered >= _bufferSize ) {
			if ( !readResponse(_ackTimeout) ) {
				CAPS_WARNING("Packet buffer was full (%zu/%zu bytes), "
				             "did not receive ack within %d seconds",
				             _bytesBuffered, _bufferSize,
				             _ackTimeout);
				disconnect();
				// Reconnect loop
				// NOTE(review): this condition tests _socket->isValid()
				// directly while the loop further below uses
				// isConnected() — verify both are equivalent here.
				while ( !_isExitRequested && !_socket->isValid() ) {
					wait();
					disconnect();
					connect();
				}
				if ( _isExitRequested )
					break;
			}
		}

		CAPS_DEBUG("%zu/%zu bytes buffered after force wait",
		           _bytesBuffered, _bufferSize);
		if ( _bytesBuffered >= _bufferSize )
			return false;
#if !defined(CAPS_FEATURES_JOURNAL) || CAPS_FEATURES_JOURNAL
		CAPS_DEBUG("Force journal update");
#endif
	}

	// Serialize data record
	serializePacket(packet.get());

	CAPS_DEBUG("+ buffer state: %zu packets, %zu bytes",
	           _packetBuffer.size(), _bytesBuffered);

	// Retry sending until success, reconnecting as needed
	while ( !sendPacket(packet.get() ) ) {
		CAPS_ERROR("Sending failed: %s", _isExitRequested ? "abort" : "reconnect");
		readResponse();
		disconnect();
		while ( !_isExitRequested && !isConnected() ) {
			wait();
			disconnect();
			connect();
		}
		if ( !isConnected() )
			return false;
	}

	// Keep the packet queued until the server acknowledges it
	_packetBuffer.push_back(packet);
	_bytesBuffered += packet->record->dataSize();
	_stats.maxBytesBuffered = max(_bytesBuffered, _stats.maxBytesBuffered);

	if ( readResponse() ) {
#if !defined(CAPS_FEATURES_JOURNAL) || CAPS_FEATURES_JOURNAL
		updateJournal();
#endif
	}

	return true;
}
/**
 * Returns the encoder for the packet's stream, creating one on demand
 * via the encoder factory. An existing encoder is flushed and recreated
 * whenever the sampling rate, data type or timing quality changes or the
 * record does not continue seamlessly at the encoder's clock.
 * @return The encoder or NULL if the factory did not produce one.
 */
Encoder* Plugin::getEncoder(PacketPtr packet) {
	EncoderItems::iterator it = _encoderItems.find(packet->streamID);
	if ( it == _encoderItems.end() ) {
		// First packet of this stream: create an empty slot
		EncoderItem item;
		it = _encoderItems.insert(make_pair(packet->streamID, item)).first;
	}

	DataRecord *record = packet->record.get();
	const DataRecord::Header *header = record->header();

	EncoderItem *item = &it->second;
	if ( item->encoder != NULL ) {
		/* Before we can feed data into the encoder we
		have to check if the sampling frequency
		or data type have been changed. Also gaps
		must be handled.
		*/
		const SPClock &clk = item->encoder->clk();
		bool needsFlush = false;
		if ( clk.freqn != header->samplingFrequencyNumerator ||
		     clk.freqd != header->samplingFrequencyDenominator ) {
			needsFlush = true;
		}
		else if ( record->startTime() != clk.getTime(0) ) {
			// Record does not start exactly at the encoder's current time
			needsFlush = true;
		}
		else if ( item->dataType != header->dataType ) {
			needsFlush = true;
		}
		else if ( item->encoder->timingQuality() != packet->timingQuality ) {
			needsFlush = true;
		}

		if ( needsFlush ) {
			item->encoder->flush();
			PacketPtr encodedPacket;
			while ( (encodedPacket = item->encoder->pop()) ) {
				// TODO check return value?
				commitPacket(encodedPacket);
			}
			// Drop the stale encoder; it is recreated below
			item->encoder = EncoderPtr();
		}
	}

	if ( item->encoder == NULL ) {
		Encoder *encoder =
		_encoderFactory->create(packet->networkCode, packet->stationCode,
		                        packet->locationCode, packet->channelCode,
		                        header->dataType,
		                        header->samplingFrequencyNumerator,
		                        header->samplingFrequencyDenominator);
		if ( encoder != NULL ) {
			encoder->setStartTime(record->startTime());
			encoder->setTimingQuality(packet->timingQuality);
			item->encoder = EncoderPtr(encoder);
			item->dataType = header->dataType;
		}
	}

	return item->encoder.get();
}
/**
 * Feeds the packet's samples through the stream's encoder and commits
 * every encoded packet it emits. Falls back to committing the packet
 * unmodified when no factory/encoder is available, the record type is
 * unsupported or there is no payload.
 * @return false only if the fallback commit fails.
 */
bool Plugin::encodePacket(PacketPtr packet) {
	if ( _encoderFactory == NULL ||
	     !_encoderFactory->supportsRecord(packet->record.get()) )
		return commitPacket(packet);

	Buffer *buffer = packet->record->data();
	if ( buffer == NULL ) return commitPacket(packet);

	Encoder *encoder = getEncoder(packet);
	if ( encoder == NULL ) return commitPacket(packet);

	// Hoist the loop-invariant sample size out of the loop and guard
	// against a zero size, which previously caused an infinite loop for
	// unknown data types.
	size_t sampleSize = dataTypeSize(packet->dataType);
	if ( sampleSize == 0 ) return commitPacket(packet);

	for ( size_t i = 0; i < buffer->size(); i += sampleSize ) {
		encoder->push(buffer->data() + i);
	}

	PacketPtr encodedPacket;
	while ( (encodedPacket = encoder->pop()) ) {
		// TODO check return value?
		commitPacket(encodedPacket);
	}

	return true;
}
// Replaces the encoder factory, taking ownership of @p factory and
// destroying the previous one (deleting NULL is a no-op).
void Plugin::setEncoderFactory(EncoderFactory *factory) {
	delete _encoderFactory;
	_encoderFactory = factory;
}
/**
 * Sends an all-zero packet data header to the server.
 * NOTE(review): presumably the zeroed header acts as the session
 * termination ("BYE") marker — confirm against the CAPS protocol docs.
 */
void Plugin::sendBye() {
	if ( _socket == NULL ) return;

	PacketDataHeader header;
	memset(reinterpret_cast<char*>(&header), 0, header.dataSize());
	_socket->setNonBlocking(false);
	send((char*)&header, header.dataSize(), _sendTimeout);
}
/**
 * Writes the packet's serialized buffer to the socket, first consuming
 * any pending acknowledgements. May block up to _sendTimeout seconds.
 * @return true if the complete buffer has been written.
 */
bool Plugin::sendPacket(Packet *packet) {
	// Drain pending acks (and persist journal state) before sending more
	if ( !_packetBuffer.empty() ) {
		readResponse();
#if !defined(CAPS_FEATURES_JOURNAL) || CAPS_FEATURES_JOURNAL
		updateJournal();
#endif
	}

	// Send packet data header and packet header
	_socket->setNonBlocking(false);
	// Instead of directly write data to the socket we call
	// a wrapper function that checks if we can write data or not.
	// If we do not do the check the system send operation
	// hangs when the TCP buffer is full and the connection is
	// is in a undefined state.
	// The check and the system send operation run in a loop until
	// the data has been sent successfully or the user requests to
	// to stop => The function call might block.
	if ( !send(packet->buffer->data(), packet->buffer->size(), _sendTimeout) ) {
		return false;
	}
	CAPS_DEBUG("Packet sent to CAPS, stream: %s, time window: %s~%s",
	           packet->streamID.c_str(), packet->record->startTime().iso().c_str(),
	           packet->record->endTime().iso().c_str());
	return true;
}
// Sleeps ~5 seconds in 0.5 s slices so a quit() request is honored
// promptly.
void Plugin::wait() {
	int slices = 10;
	while ( slices-- > 0 && !_isExitRequested ) {
		usleep(500000);
	}
}
#if !defined(CAPS_FEATURES_JOURNAL) || CAPS_FEATURES_JOURNAL
/**
 * Writes the stream states to the configured journal file using a
 * write-to-temporary-then-rename scheme, or to stdout if no journal file
 * is configured.
 * @return true on success.
 */
bool Plugin::writeJournal() {
	//if ( _states.empty() ) return true;
	if ( _journalFile.empty() )
		return writeJournal(cout);

	FS_DECLARE_PATH(path, _journalFile)
	string filename = ".journal.tmp";
	if ( FS_HAS_PARENT_PATH(path) ) {
		createPath(FS_PARENT_PATH(path).string());
		filename = (FS_PARENT_PATH(path) / FS_PATH(filename)).string();
	}

	ofstream ofs(filename.c_str());
	if ( !ofs.is_open() ) return false;

	if ( !writeJournal(ofs) ) {
		ofs.close();
		// Best-effort removal of the incomplete temporary file
		try {
			fs::remove(filename);
		}
		catch ( ... ) {}
		return false;
	}

	ofs.close();
	if ( FS_HAS_PARENT_PATH(path) ) {
		createPath(FS_PARENT_PATH(path).string());
	}

	// Atomically replace the journal with the freshly written file
	if ( rename(filename.c_str(), path.string().c_str()) != 0 ) {
		CAPS_ERROR("Failed to create journal %s, %s(%d)", _journalFile.c_str(),
		           strerror(errno), errno);
		return false;
	}

	// BUGFIX: previously this function returned false even when writing
	// and renaming the journal succeeded.
	return true;
}
// Serializes every stream state as a "<streamID> <lastEndTime ISO>" line.
// Returns false as soon as the stream reports a write error.
bool Plugin::writeJournal(ostream &os) {
	StreamStates::iterator it;
	for ( it = _states.begin(); it != _states.end(); ++it ) {
		os << it->first << " " << it->second.lastEndTime.iso() << endl;
		if ( !os.good() ) {
			return false;
		}
	}
	return true;
}
#endif
/**
 * Routes all CAPS log levels to the local stderr printers defined at the
 * top of this file.
 */
void Plugin::enableLogging() {
	Gempa::CAPS::SetLogHandler(Gempa::CAPS::LL_ERROR, LogError);
	Gempa::CAPS::SetLogHandler(Gempa::CAPS::LL_WARNING, LogWarning);
	Gempa::CAPS::SetLogHandler(Gempa::CAPS::LL_NOTICE, LogNotice);
	Gempa::CAPS::SetLogHandler(Gempa::CAPS::LL_INFO, LogInfo);
	Gempa::CAPS::SetLogHandler(Gempa::CAPS::LL_DEBUG, LogDebug);
}
/**
 * Writes @p len bytes to the socket once it becomes writable, waiting up
 * to @p timeout seconds for writability (guards against hanging TCP
 * connections with a full send buffer).
 * @return true only if all bytes have been written.
 */
bool Plugin::send(char *data, int len, int timeout) {
	if ( !_socket->isValid() ) {
		return false;
	}

	struct timeval tv;
	tv.tv_sec = timeout;
	tv.tv_usec = 0;

	FD_ZERO(&_writeFDs);
	FD_SET(_socket->fd(), &_writeFDs);

	int res = select(_socket->fd() + 1, NULL, &_writeFDs, NULL, &tv);
	if ( res > 0 ) {
		res = _socket->write(data, len);
		if ( res == len ) {
			return true;
		}
		else if ( res == -1 ) {
			CAPS_ERROR("Failed to send data: %s. "
			           "Forcing reconnect and trying to send data again.",
			           strerror(errno));
		}
		else {
			// BUGFIX: the format arguments were swapped, reporting
			// "total/sent" instead of "sent/total".
			CAPS_ERROR("Incomplete send operation: "
			           "Only %d/%d bytes have been sent. "
			           "Forcing reconnect and trying to send data again.",
			           res, len);
		}
	}
	else if ( !res ) {
		// select() timed out: the peer is not draining its receive buffer
		CAPS_ERROR("Detected hanging TCP connection. "
		           "Forcing reconnect and trying to send data again.");
	}
	else {
		CAPS_ERROR("Send error: %s", strerror(errno));
	}

	return false;
}
// Parses the server address (scheme, user, password, host, port) into
// _url. Returns false and logs an error on malformed input.
bool Plugin::setAddress(const string &addr, uint16_t defaultPort) {
	bool parsed = _url.parse(addr, defaultPort);
	if ( !parsed ) {
		CAPS_ERROR("Failed to set address: %s",
		           _url.errorString.c_str());
		return false;
	}
#if !defined(CAPS_FEATURES_SSL) || CAPS_FEATURES_SSL
	// The "capss" scheme selects an SSL socket in connect()
	_useSSL = boost::iequals(_url.protocol, "capss");
#endif
	return true;
}
// Dumps one packet to stdout: raw records as an ASCII SLIST, miniSEED
// records verbatim; other packet types are reported as unsupported.
void Plugin::dumpPacket(Packet *packet) {
	const auto type = packet->record->packetType();
	if ( type == RawDataPacket ) {
		dumpSList(packet);
		return;
	}
	if ( type == MSEEDPacket ) {
		MSEEDDataRecord *rec = static_cast<MSEEDDataRecord*>(packet->record.get());
		cout.write(rec->data()->data(),
		           static_cast<streamsize>(rec->data()->size()));
		return;
	}
	cout << "Could not dump packet: Unsupported packet type '"
	     << packet->record->packetType() << "'";
}
// Toggles dump mode: when enabled, packets are written to stdout by
// commitPacket() instead of being sent to the server.
void Plugin::dumpPackets(bool enable) {
	_dumpPackets = enable;
}
// Sets the agent string announced to API >= 5 servers on connect.
void Plugin::setAgent(const std::string &agent) {
	_agent = agent;
}
/**
 * Performs the HELLO handshake and extracts the CAPS API version from
 * the server response.
 * @param version Output; stays 0 if no "API=" line was found.
 * @return true on success (including pre-API-5 servers), false on I/O
 *         errors or a malformed version number.
 */
bool Plugin::getAPIVersion(int &version) {
	version = 0;

	int bytesSent = _socket->send(HELLO_REQUEST);
	if ( bytesSent != (int)strlen(HELLO_REQUEST) ) {
		CAPS_ERROR("Could not get CAPS API version: %s",
		           strerror(errno));
		return false;
	}

	int32_t msgLength = 0;
	socketbuf<Socket, 512> buf(_socket.get());
	streamsize bytesRead = buf.sgetn(reinterpret_cast<char*>(&msgLength),
	                                 sizeof(int32_t));
	if ( bytesRead != sizeof(int32_t) ) {
		CAPS_ERROR("Could not get CAPS API version: Expected message length in "
		           "server response: %s", strerror(errno));
		return false;
	}

	msgLength = Endianess::Converter::FromLittleEndian(msgLength);
	// Check if the hello response comes from a CAPS server with
	// API version < 5: such servers answer "OK 0..." and the first four
	// bytes 'O','K',' ','0' read as 0x30204b4f in little endian.
	constexpr int32_t OKTag = 0x30204b4f;
	if ( msgLength == OKTag ) {
		return true;
	}

	buf.set_read_limit(msgLength);

	bool ret = true;
	iostream is(&buf);
	string line;
	while ( getline(is, line) ) {
		size_t pos = line.find("API=");
		if ( pos == string::npos ) {
			continue;
		}
		// BUGFIX: honor the actual position of "API=" instead of assuming
		// the token starts at the beginning of the line.
		string apiStr = line.substr(pos + 4);
		if ( !str2int(version, apiStr.c_str()) ) {
			CAPS_ERROR("Invalid CAPS API version: Expected number");
			ret = false;
		}
	}

	return ret;
}
}
}