Update to version 2

This commit is contained in:
2022-11-18 13:42:30 +01:00
parent 284fef3ec2
commit 8934eeac6b
23 changed files with 5109 additions and 5695 deletions

View File

@@ -13,6 +13,7 @@
***************************************************************************/
#include <gempa/caps/endianess.h>
#include <gempa/caps/log.h>
#include <gempa/caps/plugin.h>
#include <gempa/caps/utils.h>
@@ -43,6 +44,8 @@
#include <boost/filesystem/convenience.hpp>
#endif
#include <boost/algorithm/string.hpp>
#include <cstring>
#include <fstream>
#include <sstream>
@@ -119,6 +122,8 @@ namespace CAPS {
namespace {
#define HELLO_REQUEST "HELLO\n"
#if !defined(CAPS_FEATURES_JOURNAL) || CAPS_FEATURES_JOURNAL
bool createPath(const string &dir) {
try {
@@ -141,6 +146,49 @@ bool createPath(const string &dir) {
// }
//}
template<typename T> void dumpSamples(RawDataRecord *rec, size_t sampleCount) {
const vector<T> *data = reinterpret_cast<vector<T>* >(rec->data());
for ( size_t i = 0; i < sampleCount; ++i ) {
cout << data->at(i) << endl;
}
}
void dumpSList(Packet *packet) {
RawDataRecord *rec = static_cast<RawDataRecord*>(packet->record.get());
string id = packet->streamID;
boost::replace_all(id, ".", "_");
string dataType = packet->record->formatName();
boost::replace_all(dataType, "RAW/", "");
DataType dt = rec->header()->dataType;
size_t sampleCount = rec->data()->size() / dataTypeSize(dt);
cout << "TIMESERIES " << id << "_R, "
<< sampleCount << " samples, "
<< static_cast<float>(rec->header()->samplingFrequencyNumerator) / rec->header()->samplingFrequencyDenominator << " sps, "
<< packet->record->startTime().iso() << " SLIST, "
<< dataType << ", " << packet->uom << endl;
if ( dt == DT_INT8 ) {
dumpSamples<int8_t>(rec, sampleCount);
}
else if ( dt == DT_INT16 ) {
dumpSamples<int16_t>(rec, sampleCount);
}
else if ( dt == DT_INT32 ) {
dumpSamples<int32_t>(rec, sampleCount);
}
else if ( dt == DT_INT64 ) {
dumpSamples<int64_t>(rec, sampleCount);
}
else if ( dt == DT_FLOAT ) {
dumpSamples<float>(rec, sampleCount);
}
else if ( dt == DT_DOUBLE ) {
dumpSamples<double>(rec, sampleCount);
}
}
template<typename T> void fillPacket(Packet *packet, uint16_t version) {
PacketDataHeader header;
header.packetType = packet->record->packetType();
@@ -190,9 +238,9 @@ void insertPacket(Plugin::BackfillingBuffer &buf, PacketPtr packet) {
for ( it = buf.begin(); it != buf.end(); ++it, ++pos ) {
if ( (*it)->record->endTime() > packet->record->endTime() ) {
buf.insert(it, packet);
CAPS_DEBUG("Backfilling buffer: packet inserted at pos: %lu, new "
"size: %lu, time window: %s~%s", (long unsigned) pos,
(long unsigned) buf.size(),
CAPS_DEBUG("Backfilling buffer: packet inserted at pos: %zu, new "
"size: %zu, time window: %s~%s",
pos, buf.size(),
buf.front()->record->startTime().iso().c_str(),
buf.back()->record->endTime().iso().c_str());
//dump(buf);
@@ -202,8 +250,8 @@ void insertPacket(Plugin::BackfillingBuffer &buf, PacketPtr packet) {
buf.push_back(packet);
CAPS_DEBUG("Backfilling buffer: packet appended, new size: %lu, time window: %s~%s",
(long unsigned) buf.size(),
CAPS_DEBUG("Backfilling buffer: packet appended, new size: %zu, time window: %s~%s",
buf.size(),
buf.front()->record->startTime().iso().c_str(),
buf.back()->record->endTime().iso().c_str());
//dump(buf);
@@ -234,14 +282,14 @@ Plugin::Plugin(const string &name, const string &options,
, _description(description)
, _bufferSize(1 << 14)
, _bytesBuffered(0)
, _port(18003)
, _sendTimeout(60)
, _isExitRequested(false)
#if !defined(CAPS_FEATURES_BACKFILLING) || CAPS_FEATURES_BACKFILLING
, _backfillingBufferSize(0)
#endif
{
_url.host = "localhost";
_url.port = 18003;
#if !defined(CAPS_FEATURES_JOURNAL) || CAPS_FEATURES_JOURNAL
_lastWrite = Time::GMT();
_journalDirty = false;
@@ -256,6 +304,7 @@ Plugin::Plugin(const string &name, const string &options,
#if !defined(CAPS_FEATURES_SSL) || CAPS_FEATURES_SSL
_useSSL = false;
#endif
_dumpPackets = false;
}
@@ -286,7 +335,7 @@ void Plugin::close() {
flushEncoders();
sendBye();
CAPS_INFO("Closing connection to CAPS at %s:%d", _host.c_str(), _port);
CAPS_INFO("Closing connection to CAPS at %s:%d", _url.host.c_str(), _url.port);
while ( !_packetBuffer.empty() && readResponse(_lastAckTimeout) );
@@ -295,7 +344,7 @@ void Plugin::close() {
#endif
disconnect();
CAPS_INFO("Closed connection to CAPS at %s:%d", _host.c_str(), _port);
CAPS_INFO("Closed connection to CAPS at %s:%d", _url.host.c_str(), _url.port);
_packetBuffer.clear();
_wasConnected = false;
@@ -313,16 +362,34 @@ bool Plugin::connect() {
#endif
CAPS_INFO("Attempting to connect to CAPS at %s:%d",
_host.c_str(), _port);
if ( _socket->connect(_host, _port) != Socket::Success ) {
CAPS_ERROR("Connection failed to CAPS at %s:%d", _host.c_str(), _port);
_url.host.c_str(), _url.port);
if ( _socket->connect(_url.host, _url.port) != Socket::Success ) {
CAPS_ERROR("Connection failed to CAPS at %s:%d", _url.host.c_str(), _url.port);
return false;
}
// Do handshake
if ( !_user.empty() && !_password.empty() ) {
string auth = "AUTH "+ _user + " " + _password + "\n";
_socket->write(auth.c_str(), auth.length());
int apiVersion = 0;
if ( !getAPIVersion(apiVersion) ) {
return false;
}
CAPS_INFO("Found CAPS API version %d", apiVersion);
if ( apiVersion >= 5 ) {
if ( !_agent.empty() ) {
_socket->write("AGENT ", 6);
_socket->write(_agent.data(), _agent.size());
_socket->write("\n", 1);
}
}
if ( !_url.user.empty() && !_url.password.empty() ) {
_socket->write("AUTH ", 5);
_socket->write(_url.user.data(), _url.user.size());
_socket->write(" ", 1);
_socket->write(_url.password.data(), _url.password.size());
_socket->write("\n", 1);
}
_socket->setNonBlocking(true);
@@ -333,7 +400,7 @@ bool Plugin::connect() {
FD_SET(_socket->fd(), &_readFDs);
FD_SET(_socket->fd(), &_writeFDs);
CAPS_INFO("Connected to CAPS at %s:%d", _host.c_str(), _port);
CAPS_INFO("Connected to CAPS at %s:%d", _url.host.c_str(), _url.port);
_responseBuf[0] = '\0';
_responseBufIdx = 0;
@@ -354,7 +421,7 @@ bool Plugin::connect() {
void Plugin::disconnect() {
if ( _socket && _socket->isValid() ) {
CAPS_INFO("Disconnect from %s:%d", _host.c_str(), _port);
CAPS_INFO("Disconnect from %s:%d", _url.host.c_str(), _url.port);
_socket->shutdown();
_socket->close();
@@ -374,7 +441,7 @@ bool Plugin::readResponse(unsigned int timeout) {
bool gotResponse = false;
#define bufN 512
const int bufN = 512;
char buf[bufN];
int res;
@@ -419,7 +486,7 @@ bool Plugin::readResponse(unsigned int timeout) {
// Read confirmed packets from response
int count = atoi(_responseBuf+3);
CAPS_DEBUG("Acknowledged %d packets, %d in queue", count, (int)_packetBuffer.size());
CAPS_DEBUG("Acknowledged %d packets, %zu in queue", count, _packetBuffer.size());
// Update packet buffer
for ( int i = 0; i < count; ++i ) {
if ( _packetBuffer.empty() ) {
@@ -446,8 +513,12 @@ bool Plugin::readResponse(unsigned int timeout) {
}
}
CAPS_DEBUG("Packet buffer state: %d packets, %d bytes",
(int)_packetBuffer.size(), (int)_bytesBuffered);
CAPS_DEBUG("Packet buffer state: %zu packets, %zu bytes",
_packetBuffer.size(), _bytesBuffered);
}
else if ( (strncasecmp(_responseBuf, "ERROR ", 6) == 0) && (_responseBufIdx > 6) ) {
CAPS_ERROR("%s", _responseBuf + 6);
return false;
}
_responseBuf[0] = '\0';
@@ -529,7 +600,7 @@ bool Plugin::readJournal(istream &is) {
#endif
bool Plugin::flush() {
CAPS_INFO("Flushing %d queued packets", (int)_packetBuffer.size());
CAPS_INFO("Flushing %zu queued packets", _packetBuffer.size());
PacketBuffer::iterator it = _packetBuffer.begin();
while ( it != _packetBuffer.end() && !_isExitRequested ) {
@@ -567,7 +638,7 @@ bool Plugin::flush() {
}
void Plugin::flushEncoders() {
CAPS_INFO("Flushing %d encoders", (int)_encoderItems.size());
CAPS_INFO("Flushing %zu encoders", _encoderItems.size());
EncoderItems::iterator it = _encoderItems.begin();
while ( it != _encoderItems.end() ) {
Encoder *encoder = it->second.encoder.get();
@@ -598,7 +669,7 @@ Plugin::Status Plugin::push(const string &net, const string &sta,
rec->setStartTime(stime);
rec->setSamplingFrequency(numerator, denominator);
rec->setDataType(dt);
rec->setBuffer((char*)data, dtSize * count);
rec->setBuffer(static_cast<char*>(data), dtSize * count);
return push(net, sta, loc, cha, DataRecordPtr(rec), uom, timingQuality);
}
@@ -638,7 +709,7 @@ void Plugin::tryFlushBackfillingBuffer(StreamState &state) {
PacketPtr &ref_pkt = state.backfillingBuffer.front();
TimeSpan gap = ref_pkt->record->startTime() - state.lastCommitEndTime;
int64_t dt_us = (int64_t)gap.seconds()*1000000+gap.microseconds();
int64_t dt_us = static_cast<int64_t>(gap.seconds()) * 1000000 + gap.microseconds();
// A gap larger than one sample?
if ( dt_us >= ref_pkt->dt_us ) break;
@@ -652,15 +723,14 @@ void Plugin::tryFlushBackfillingBuffer(StreamState &state) {
if ( flushed > 0 ) {
if ( state.backfillingBuffer.size() > 0 ) {
CAPS_DEBUG("backfilling buffer: %lu pakets flushed, new size: %lu, time window: %s~%s",
(long unsigned) flushed,
(long unsigned) state.backfillingBuffer.size(),
CAPS_DEBUG("backfilling buffer: %zu pakets flushed, new size: %zu, time window: %s~%s",
flushed, state.backfillingBuffer.size(),
state.backfillingBuffer.front()->record->startTime().iso().c_str(),
state.backfillingBuffer.back()->record->endTime().iso().c_str());
}
else {
CAPS_DEBUG("backfilling buffer: %lu pakets flushed, new size: 0",
(long unsigned) flushed);
CAPS_DEBUG("backfilling buffer: %zu pakets flushed, new size: 0",
flushed);
}
}
}
@@ -684,15 +754,14 @@ void Plugin::trimBackfillingBuffer(StreamState &state) {
if ( trimmed > 0 ) {
if ( state.backfillingBuffer.size() > 0 ) {
CAPS_DEBUG("backfilling buffer: %lu pakets trimmed, new size: %lu, time window: %s~%s",
(long unsigned) trimmed,
(long unsigned) state.backfillingBuffer.size(),
CAPS_DEBUG("backfilling buffer: %zu pakets trimmed, new size: %zu, time window: %s~%s",
trimmed, state.backfillingBuffer.size(),
state.backfillingBuffer.front()->record->startTime().iso().c_str(),
state.backfillingBuffer.back()->record->endTime().iso().c_str());
}
else {
CAPS_DEBUG("backfilling buffer: %lu pakets trimmed, new size: 0",
(long unsigned) trimmed);
CAPS_DEBUG("backfilling buffer: %zu pakets trimmed, new size: 0",
trimmed);
}
}
}
@@ -792,6 +861,12 @@ void Plugin::serializePacket(Packet *packet) {
}
bool Plugin::commitPacket(PacketPtr packet) {
if ( _dumpPackets ) {
serializePacket(packet.get());
dumpPacket(packet.get());
return true;
}
// Initial connect
if ( !_wasConnected && !_socket ) {
while ( !connect() && !_isExitRequested ) {
@@ -803,12 +878,16 @@ bool Plugin::commitPacket(PacketPtr packet) {
}
if ( _bytesBuffered >= _bufferSize ) {
CAPS_DEBUG("Packet buffer is full (%ld/%ld bytes), "
CAPS_DEBUG("Packet buffer is full (%zu/%zu bytes), "
"waiting for server ack messages",
_bytesBuffered, _bufferSize);
while ( _bytesBuffered >= _bufferSize ) {
if ( !readResponse(_ackTimeout) ) {
CAPS_WARNING("Packet buffer was full (%zu/%zu bytes), "
"did not receive ack within %d seconds",
_bytesBuffered, _bufferSize,
_ackTimeout);
disconnect();
while ( !_isExitRequested && !_socket->isValid() ) {
wait();
@@ -821,7 +900,7 @@ bool Plugin::commitPacket(PacketPtr packet) {
}
}
CAPS_DEBUG("%ld/%ld bytes buffered after force wait",
CAPS_DEBUG("%zu/%zu bytes buffered after force wait",
_bytesBuffered, _bufferSize);
if ( _bytesBuffered >= _bufferSize )
@@ -835,8 +914,8 @@ bool Plugin::commitPacket(PacketPtr packet) {
// Serialize data record
serializePacket(packet.get());
CAPS_DEBUG("+ buffer state: %d packets, %d bytes",
(int)_packetBuffer.size(), (int)_bytesBuffered);
CAPS_DEBUG("+ buffer state: %zu packets, %zu bytes",
_packetBuffer.size(), _bytesBuffered);
while ( !sendPacket(packet.get() ) ) {
CAPS_ERROR("Sending failed: %s", _isExitRequested ? "abort" : "reconnect");
@@ -887,11 +966,11 @@ Encoder* Plugin::getEncoder(PacketPtr packet) {
*/
const SPClock &clk = item->encoder->clk();
bool needsFlush = false;
if( clk.freqn != header->samplingFrequencyNumerator ||
if ( clk.freqn != header->samplingFrequencyNumerator ||
clk.freqd != header->samplingFrequencyDenominator ) {
needsFlush = true;
}
else if ( record->startTime() != clk.get_time(0) ) {
else if ( record->startTime() != clk.getTime(0) ) {
needsFlush = true;
}
else if ( item->dataType != header->dataType ) {
@@ -970,7 +1049,7 @@ void Plugin::sendBye() {
if ( _socket == NULL ) return;
PacketDataHeader header;
memset((char*)&header, 0, header.dataSize());
memset(reinterpret_cast<char*>(&header), 0, header.dataSize());
_socket->setNonBlocking(false);
send((char*)&header, header.dataSize(), _sendTimeout);
@@ -1077,8 +1156,9 @@ void Plugin::enableLogging() {
}
bool Plugin::send(char *data, int len, int timeout) {
if ( !_socket->isValid() )
if ( !_socket->isValid() ) {
return false;
}
struct timeval tv;
tv.tv_sec = timeout;
@@ -1116,5 +1196,94 @@ bool Plugin::send(char *data, int len, int timeout) {
return false;
}
bool Plugin::setAddress(const string &addr, uint16_t defaultPort) {
if ( !_url.parse(addr, defaultPort) ) {
CAPS_ERROR("Failed to set address: %s",
_url.errorString.c_str());
return false;
}
#if !defined(CAPS_FEATURES_SSL) || CAPS_FEATURES_SSL
_useSSL = boost::iequals(_url.protocol, "capss");
#endif
return true;
}
void Plugin::dumpPacket(Packet *packet) {
if ( packet->record->packetType() == RawDataPacket ) {
dumpSList(packet);
}
else if ( packet->record->packetType() == MSEEDPacket ) {
MSEEDDataRecord *rec = static_cast<MSEEDDataRecord*>(packet->record.get());
cout.write(rec->data()->data(),
static_cast<streamsize>(rec->data()->size()));
}
else {
cout << "Could not dump packet: Unsupported packet type '"
<< packet->record->packetType() << "'";
}
}
void Plugin::dumpPackets(bool enable) {
_dumpPackets = enable;
}
void Plugin::setAgent(const std::string &agent) {
_agent = agent;
}
bool Plugin::getAPIVersion(int &version) {
version = 0;
int bytesSent = _socket->send(HELLO_REQUEST);
if ( bytesSent != strlen(HELLO_REQUEST) ) {
CAPS_ERROR("Could not get CAPS API version: %s",
strerror(errno));
return false;
}
int32_t msgLength = 0;
socketbuf<Socket, 512> buf(_socket.get());
streamsize bytesRead = buf.sgetn(reinterpret_cast<char*>(&msgLength),
sizeof(int32_t));
if ( bytesRead != sizeof(int32_t) ) {
CAPS_ERROR("Could not get CAPS API version: Expected message length in "
"server response: %s", strerror(errno));
return false;
}
msgLength = Endianess::Converter::FromLittleEndian(msgLength);
// Check if the hello response comes from a CAPS server with
// API version < 5
constexpr int32_t OKTag = 0x30204b4f;
if ( msgLength == OKTag ) {
return true;
}
buf.set_read_limit(msgLength);
bool ret = true;
iostream is(&buf);
string line;
while ( getline(is, line) ) {
size_t pos = line.find("API=");
if ( pos == string::npos ) {
continue;
}
string apiStr = line.substr(4);
if ( !str2int(version, apiStr.c_str()) ) {
CAPS_ERROR("Invalid CAPS API version: Expected number");
ret = false;
}
}
return ret;
}
}
}