Files
libcapsclient/libs/gempa/caps/pluginapplication.cpp
2026-03-18 15:18:05 +01:00

563 lines
16 KiB
C++

/***************************************************************************
* Copyright (C) 2015 by gempa GmbH *
* *
* All Rights Reserved. *
* *
* NOTICE: All information contained herein is, and remains *
* the property of gempa GmbH and its suppliers, if any. The intellectual *
* and technical concepts contained herein are proprietary to gempa GmbH *
* and its suppliers. *
* Dissemination of this information or reproduction of this material *
* is strictly forbidden unless prior written permission is obtained *
* from gempa GmbH. *
***************************************************************************/
#define SEISCOMP_COMPONENT PluginApplication
#include <gempa/caps/pluginapplication.h>
#include <gempa/caps/encoderfactory.h>
#include <gempa/caps/log.h>
#include <gempa/caps/utils.h>
#include <seiscomp/core/system.h>
#include <seiscomp/logging/log.h>
#include <boost/algorithm/string.hpp>
#include <seiscomp/system/environment.h>
#include <cstdlib>
#include <filesystem>
namespace fs = std::filesystem;
namespace sc = Seiscomp::Core;
namespace {
#ifdef SEISCOMP_LOG_VA
#define LOG_CAPS_CHANNEL(out, fmt) \
va_list ap;\
va_start(ap, fmt);\
out(fmt, ap);\
va_end(ap)
#else
#define LOG_CAPS_CHANNEL(out, fmt) \
va_list ap;\
va_start(ap, fmt);\
fprintf(stderr, #out" "); vfprintf(stderr, fmt, ap); fprintf(stderr, "\n");\
va_end(ap)
#endif
void LogError(const char *fmt, ...) {
LOG_CAPS_CHANNEL(SEISCOMP_VERROR, fmt);
}
void LogWarning(const char *fmt, ...) {
LOG_CAPS_CHANNEL(SEISCOMP_VWARNING, fmt);
}
void LogNotice(const char *fmt, ...) {
LOG_CAPS_CHANNEL(SEISCOMP_VNOTICE, fmt);
}
void LogInfo(const char *fmt, ...) {
LOG_CAPS_CHANNEL(SEISCOMP_VINFO, fmt);
}
void LogDebug(const char *fmt, ...) {
LOG_CAPS_CHANNEL(SEISCOMP_VDEBUG, fmt);
}
const size_t MIN_BUFFER_SIZE = 1024*16;
bool readKeyValueFile(std::map<std::string, std::string> &data,
const std::string &filename, const char *sep) {
std::ifstream ifs(filename.c_str());
if ( !ifs.is_open() ) {
return false;
}
std::string line;
while ( getline(ifs, line) ) {
const char* str = line.c_str();
int len = strlen(str);
const char *tok = nullptr;
int tok_len = 0;
int tok_count = 0;
std::string key;
for ( ; (tok = Gempa::CAPS::tokenize(str, sep, len, tok_len)) != nullptr;
++tok_count ) {
Gempa::CAPS::trim(tok, tok_len);
if ( tok_count == 0 ) {
if ( len == 0) {
break;
}
key.assign(tok, tok_len);
}
else if ( tok_count == 1 ) {
data.insert({key, std::string(tok, tok_len)});
}
}
}
return true;
}
int64_t getAvailableMemory() {
std::map<std::string, std::string> data;
if ( !readKeyValueFile(data, "/proc/meminfo", ":") ) {
return -1;
}
auto it = data.find("MemAvailable");
if ( it == data.end() ) {
return -1;
}
std::vector<std::string> tokens;
sc::split(tokens, it->second.c_str(), "kB");
if ( tokens.empty() ) {
return -1;
}
sc::trim(tokens[0]);
int64_t availableMemory = 0;
sc::fromString(availableMemory, tokens[0]);
return availableMemory;
}
}
namespace Gempa::CAPS {
PluginApplication::PluginApplication(int argc, char **argv, const std::string &name)
: Seiscomp::Client::StreamApplication(argc, argv)
, _plugin(Plugin(name)) {
fs::path path("@ROOTDIR@/var/run/" + SCCoreApp->name());
_journalFile = (path / "journal").string();
// By default we disable the acquisition autostart because not all plugins
// require this feature. It must be enabled explicitly if required.
setAutoAcquisitionStart(false);
setRecordStreamEnabled(true);
}
void PluginApplication::createCommandLineDescription() {
Seiscomp::Client::StreamApplication::createCommandLineDescription();
commandline().addGroup("Output");
commandline().addOption("Output", "output,O",
"Data output address. Format:\n"
"[[caps|capss]://][user:pass@]host[:port]", &_strAddr);
commandline().addOption("Output", "buffer-size,b",
"Size (bytes) of the packet buffer", &_bufferSize);
commandline().addOption("Output", "backfilling",
"Enable backfilling for out-of-order records. The backfilling buffer size is "
"in seconds", &_backfillingBufferSize);
commandline().addOption("Output", "mseed", "Enable on-the-fly MiniSeed "
"encoding. If the encoder does not support the input"
"type of a packet it will be forwarded. Re encoding of"
"MiniSEED packets is not supported.");
commandline().addOption("Output", "encoding", "MiniSEED encoding to use. (Uncompressed, Steim1 or Steim2)",
&_strMseedEncoding);
commandline().addOption("Output", "rec-len", "MiniSEED record length expressed as a power of 2."
"A 512 byte record would be 9.",
&_mseedRecordLength);
commandline().addOption("Output", "max-future-endtime",
"Maximum allowed relative end time for packets. If "
"the packet end time is greater than the current time plus this "
"value the packet will be discarded. By default this value is set to 120 seconds. "
"A negative value disables the check.",
&_maxFutureEndTime);
commandline().addOption("Output", "dump-packets", "Dump packets to stdout");
commandline().addGroup("Journal");
commandline().addOption("Journal", "journal,j",
"File to store stream states. Use an empty string to disable this feature.", &_journalFile);
commandline().addOption("Journal", "flush",
"Flush stream states every n seconds to disk", &_flushInterval);
commandline().addOption("Journal", "wait-for-ack",
"Wait when a sync has been forced, up to n seconds", &_ackTimeout);
commandline().addOption("Journal", "wait-for-last-ack,w",
"Wait on shutdown to receive acknownledgement messages, up to n seconds", &_lastAckTimeout);
commandline().addGroup("Status");
commandline().addOption("Status", "status-log", "Log information status "
"information e.g. max bytes buffered");
commandline().addOption("Status", "status-flush", "Flush status every n "
"seconds to disk",
&_statusFlushInterval);
commandline().addGroup("Host");
commandline().addOption("Host", "host-storage",
"Determine disc capacity and available space from this path",
&_host.storage);
commandline().addOption("Host", "host-os",
"Set host operating system information",
&_host.os);
}
void PluginApplication::done() {
LogInfo("Statistics of transmitted data:\n"
" records : %d\n"
" samples : %d\n"
" gaps : %d\n"
" start time: %s\n"
" end time : %s\n"
" files : %d",
_stats.records, _stats.samples, _stats.gaps,
_stats.startTime.valid()?_stats.startTime.iso().c_str():"",
_stats.endTime.valid()?_stats.endTime.iso().c_str():"",
_stats.files);
_plugin.close();
Seiscomp::Client::StreamApplication::done();
}
void PluginApplication::exit(int returnCode) {
Seiscomp::Client::StreamApplication::exit(returnCode);
_plugin.quit();
}
void PluginApplication::handleTimeout() {
auto time = Time::GMT();
auto seconds = time.seconds();
if ( _logStatus && (seconds % _statusFlushInterval == 0) ) {
Plugin::Stats stats = _plugin.stats();
_statusFile.stream() << time.toLocalTime().toString("%Y/%m/%d %T") << " "
<< stats.maxBytesBuffered << std::endl;
_plugin.resetMaxBytesBuffered();
}
if ( seconds % 3 == 0 ) {
updateRuntimeInfo();
}
if ( seconds % 10 == 0 ) {
sendRuntimeInfo();
}
}
bool PluginApplication::init() {
if ( !Seiscomp::Client::StreamApplication::init() ) {
return false;
}
// Setup log handlers
Gempa::CAPS::SetLogHandler(Gempa::CAPS::LL_ERROR, LogError);
Gempa::CAPS::SetLogHandler(Gempa::CAPS::LL_WARNING, LogWarning);
Gempa::CAPS::SetLogHandler(Gempa::CAPS::LL_NOTICE, LogNotice);
Gempa::CAPS::SetLogHandler(Gempa::CAPS::LL_INFO, LogInfo);
Gempa::CAPS::SetLogHandler(Gempa::CAPS::LL_DEBUG, LogDebug);
Plugin::HostInfo hostInfo;
if ( !getHostInfo(hostInfo) ) {
return false;
}
_plugin.setBufferSize(_bufferSize);
_plugin.setFlushInterval(_flushInterval);
_plugin.setConnectionTimeout(_connectionTimeout);
_plugin.setTimeouts(_ackTimeout, _lastAckTimeout, _sendTimeout);
_plugin.setMaxFutureEndTime(_maxFutureEndTime);
_plugin.dumpPackets(_dumpPackets);
_plugin.setHostInfo(hostInfo);
LogInfo("CAPS connection settings\n"
" Output CAPS server : %s\n"
" Buffer size : %zu bytes\n"
" Backfilling buffer size : %zu s\n"
" Max future end time : %d s\n"
" Connection timeout : %d s\n"
" Timeouts Ack/LastAck/Send : %d s/%d s/%d s\n"
" Agent : %s\n"
" Agent version : %s",
_strAddr.c_str(), _bufferSize, _backfillingBufferSize, _maxFutureEndTime,
_connectionTimeout, _ackTimeout, _lastAckTimeout, _sendTimeout,
hostInfo.agent.data(), hostInfo.agentVersion.data());
if ( _mseedEnabled ) {
MSEEDEncoderFactory *factory = nullptr;
if ( _mseedEncoding == Uncompressed ) {
SEISCOMP_INFO("Output stream encoding set to MiniSEED/Uncompressed");
factory = new IdentityEncoderFactory();
_plugin.setEncoderFactory(factory);
}
else if ( _mseedEncoding == Steim1 ) {
SEISCOMP_INFO("Output stream encoding set to MiniSEED/Steim1");
factory = new Steim1EncoderFactory();
_plugin.setEncoderFactory(factory);
}
else if ( _mseedEncoding == Steim2 ) {
SEISCOMP_INFO("Output stream encoding set to MiniSEED/Steim2");
factory = new Steim2EncoderFactory();
_plugin.setEncoderFactory(factory);
}
else {
SEISCOMP_ERROR("Unsupported MiniSEED encoding");
return false;
}
if ( !factory->setRecordLength(_mseedRecordLength) ) {
SEISCOMP_ERROR("%s", factory->errorString().c_str());
return false;
}
}
else {
SEISCOMP_INFO("MiniSEED encoding is disabled.");
}
if ( _backfillingBufferSize > 0 ) {
_plugin.setBackfillingBufferSize(_backfillingBufferSize);
}
std::string connectionIDFile = "@ROOTDIR@/var/run/" + name() + "/id";
connectionIDFile = Seiscomp::Environment::Instance()->absolutePath(connectionIDFile);
LogInfo("Reading connection ID from %s", connectionIDFile.c_str());
if ( !_plugin.setConnectionIDFile(connectionIDFile) ) {
return false;
}
if ( !_journalFile.empty() ) {
_journalFile = Seiscomp::Environment::Instance()->absolutePath(_journalFile);
// Recover states
LogInfo("Reading journal from %s", _journalFile.c_str());
_plugin.setJournal(_journalFile);
_plugin.readJournal();
LogInfo("Recovered %d streams", (int)_plugin.streamStates().size());
}
if ( _logStatus ) {
std::string filename = Seiscomp::Environment::Instance()->logDir() + "/" + SCCoreApp->name() + "-stats.log";
if ( !_statusFile.open(filename.c_str()) ) {
LogError("Could not open status file %s.", filename.c_str());
return false;
}
}
// This causes a connect
updateRuntimeInfo();
_plugin.setRuntimeInfo(_runtimeInfo);
enableTimer(1);
return true;
}
bool PluginApplication::initConfiguration() {
if ( !Seiscomp::Client::StreamApplication::initConfiguration() ) {
return false;
}
try {
_plugin.setHost(configGetString("output.host"));
}
catch ( ... ) {
}
try {
_plugin.setPort(configGetInt("output.port"));
}
catch ( ... ) { }
try { _sendTimeout = configGetInt("output.timeout"); }
catch ( ... ) { }
try { _connectionTimeout = configGetInt("output.connectionTimeout"); }
catch ( ... ) { }
try {
std::string addr = configGetString("output.address");
if ( !_plugin.setAddress(addr) ) {
return false;
}
}
catch ( ... ) {}
try {
_mseedEnabled = configGetBool("output.mseed.enable");
}
catch ( ... ) {}
try {
int length = configGetInt("output.mseed.recordLength");
if ( length < 0 ) {
SEISCOMP_ERROR("'output.mseed.recordLength' must be a positive integer");
return false;
}
_mseedRecordLength = uint(length);
}
catch ( ... ) {}
try {
std::string str = configGetString("output.mseed.encoding");
if ( !fromString(_mseedEncoding, str)) {
return false;
}
}
catch ( ... ) {}
try { _bufferSize = configGetInt("output.bufferSize"); }
catch ( ... ) { }
try { _backfillingBufferSize = configGetInt("output.backfillingBufferSize"); }
catch ( ... ) { }
try { _maxFutureEndTime = configGetInt("output.maxFutureEndTime"); }
catch ( ... ) { }
try { _journalFile = configGetString("journal.file"); }
catch ( ... ) {}
try { _flushInterval = configGetInt("journal.flush"); }
catch ( ... ) { }
try { _ackTimeout = configGetInt("journal.waitForAck"); }
catch ( ... ) { }
try { _lastAckTimeout = configGetInt("journal.waitForLastAck"); }
catch ( ... ) { }
_host.agent = name();
try { _host.storage = configGetString("host.storage"); }
catch ( ... ) { }
try { _host.os = configGetString("host.os"); }
catch ( ... ) { }
try { _logStatus = configGetBool("statusLog.enable"); }
catch ( ... ) { }
try { _statusFlushInterval = configGetInt("statusLog.flush"); }
catch ( ... ) {}
return true;
}
bool PluginApplication::validateParameters() {
if ( !Seiscomp::Client::StreamApplication::validateParameters() ) {
return false;
}
if ( commandline().hasOption("mseed") ) {
_mseedEnabled = true;
}
if ( commandline().hasOption("status-log") ) {
_logStatus = true;
}
if ( commandline().hasOption("dump-packets") ) {
_dumpPackets = true;
}
if ( commandline().hasOption("encoding") ) {
if ( !fromString(_mseedEncoding, _strMseedEncoding)) {
return false;
}
}
if ( _bufferSize < MIN_BUFFER_SIZE ) {
SEISCOMP_ERROR("The plugin buffer size must be at least %ld bytes.",
MIN_BUFFER_SIZE);
return false;
}
if ( commandline().hasOption("output") ) {
if ( !_plugin.setAddress(_strAddr, 18003) ) {
return false;
}
}
_host.storage = Seiscomp::Environment::Instance()->absolutePath(_host.storage);
return true;
}
bool PluginApplication::fromString(MseedEncoding &enc, std::string str) {
boost::to_lower(str);
if( str == "uncompressed" ) {
enc = Uncompressed;
}
else if ( str == "steim1" ) {
enc = Steim1;
}
else if ( str == "steim2" ) {
enc = Steim2;
}
else {
SEISCOMP_ERROR("Unsupported encoding %s", str.c_str());
return false;
}
return true;
}
bool PluginApplication::getHostInfo(Plugin::HostInfo &hostInfo) {
hostInfo.agent = _host.agent;
hostInfo.agentVersion = version() ? version() : "";
hostInfo.totalMem = _hostInfo.totalMemory();
try {
auto spaceInfo = fs::space(_host.storage);
hostInfo.totalDisc = spaceInfo.capacity / 1024;
}
catch ( const fs::filesystem_error& e ) {
SEISCOMP_ERROR("Failed to determine filesystem information: %s", e.what());
return false;
}
hostInfo.os = _host.os;
if ( hostInfo.os.empty() ) {
std::map<std::string, std::string> osRelease;
if ( !readKeyValueFile(osRelease, "/etc/os-release", "=") ) {
SEISCOMP_WARNING("Unable to read file /etc/os-release");
}
auto it = osRelease.find("PRETTY_NAME");
if ( it != osRelease.end() ) {
auto unquote = [](const std::string& str) {
if ( str.length() >= 2 && str.front() == '"' && str.back() == '"' ) {
return str.substr(1, str.length() - 2);
}
return str;
};
hostInfo.os = unquote(it->second);
}
}
return true;
}
void PluginApplication::updateRuntimeInfo() {
_runtimeInfo.cpuUsage = std::max(0, static_cast<int>(_hostInfo.getCurrentCpuUsage() * 1000));
_runtimeInfo.procUsedMem = _hostInfo.getCurrentMemoryUsage();
_runtimeInfo.availableMem = getAvailableMemory();
getloadavg(&_runtimeInfo.systemLoad, 1);
try {
auto spaceInfo = fs::space(_host.storage);
_runtimeInfo.availableDisc = spaceInfo.available / 1024;
}
catch ( const fs::filesystem_error& e ) {
SEISCOMP_WARNING("Failed to determine filesystem information: %s", e.what());
}
}
void PluginApplication::sendRuntimeInfo() {
_plugin.setRuntimeInfo(_runtimeInfo);
}
}