/*************************************************************************** * Copyright (C) 2015 by gempa GmbH * * * * All Rights Reserved. * * * * NOTICE: All information contained herein is, and remains * * the property of gempa GmbH and its suppliers, if any. The intellectual * * and technical concepts contained herein are proprietary to gempa GmbH * * and its suppliers. * * Dissemination of this information or reproduction of this material * * is strictly forbidden unless prior written permission is obtained * * from gempa GmbH. * ***************************************************************************/ #define SEISCOMP_COMPONENT PluginApplication #include #include #include #include #include #include #include #ifdef SC_GEMPA_SEATTLE #include #else #include #endif using namespace std; namespace sc = Seiscomp::Core; namespace { #ifdef SEISCOMP_LOG_VA #define LOG_CAPS_CHANNEL(out, fmt) \ va_list ap;\ va_start(ap, fmt);\ out(fmt, ap);\ va_end(ap) #else #define LOG_CAPS_CHANNEL(out, fmt) \ va_list ap;\ va_start(ap, fmt);\ fprintf(stderr, #out" "); vfprintf(stderr, fmt, ap); fprintf(stderr, "\n");\ va_end(ap) #endif void LogError(const char *fmt, ...) { LOG_CAPS_CHANNEL(SEISCOMP_VERROR, fmt); } void LogWarning(const char *fmt, ...) { LOG_CAPS_CHANNEL(SEISCOMP_VWARNING, fmt); } void LogNotice(const char *fmt, ...) { LOG_CAPS_CHANNEL(SEISCOMP_VNOTICE, fmt); } void LogInfo(const char *fmt, ...) { LOG_CAPS_CHANNEL(SEISCOMP_VINFO, fmt); } void LogDebug(const char *fmt, ...) { LOG_CAPS_CHANNEL(SEISCOMP_VDEBUG, fmt); } const size_t MIN_BUFFER_SIZE = 1024*16; const uint16_t DEFAULT_PORT = 18003; } namespace Gempa { namespace CAPS { PluginApplication::PluginApplication(int argc, char **argv, const string &desc) : Seiscomp::Client::StreamApplication(argc, argv) , _plugin(Plugin(desc)) { _bufferSize = 1 << 20; _backfillingBufferSize = 180; _flushInterval = 10; _ackTimeout = 60; _lastAckTimeout = 5; _sendTimeout = 60; _logStatus = false; _statusFlushInterval = 10; _host = "localhost"; _port = DEFAULT_PORT; _strAddr = "localhost:" + sc::toString(DEFAULT_PORT); SC_FS_DECLARE_PATH(path, "@ROOTDIR@/var/run/" + SCCoreApp->name() + "/journal"); _journalFile = path.string(); _mseedEnabled = false; _mseedEncoding = Steim2; _mseedRecordLength = 9; _strMseedEncoding = "Steim2"; _maxFutureEndTime = 120; // By default we disable the acquisition autostart because not all plugins // require this feature. It must be enabled explicitly if required. setAutoAcquisitionStart(false); } void PluginApplication::createCommandLineDescription() { Seiscomp::Client::StreamApplication::createCommandLineDescription(); commandline().addGroup("Output"); commandline().addOption("Output", "addr,a", "Data output address, format is [HOST:PORT]", &_strAddr); commandline().addOption("Output", "buffer-size,b", "Size (bytes) of the packet buffer", &_bufferSize); commandline().addOption("Output", "backfilling", "Enable backfilling for out-of-order records. The backfilling buffer size is " "in seconds", &_backfillingBufferSize); commandline().addOption("Output", "mseed", "Enable on-the-fly MiniSeed " "encoding. If the encoder does not support the input" "type of a packet it will be forwarded. Re encoding of" "MiniSEED packets is not supported."); commandline().addOption("Output", "encoding", "MiniSEED encoding to use. (Uncompressed, Steim1 or Steim2)", &_strMseedEncoding); commandline().addOption("Output", "rec-len", "MiniSEED record length expressed as a power of 2." "A 512 byte record would be 9.", &_mseedRecordLength); commandline().addOption("Output", "max-future-endtime", "Maximum allowed relative end time for packets. If " "the packet end time is greater than the current time plus this " "value the packet will be discarded. By default this value is set to 120 seconds.", &_maxFutureEndTime); commandline().addGroup("Journal"); commandline().addOption("Journal", "journal,j", "File to store stream states. Use an empty string to disable this feature.", &_journalFile); commandline().addOption("Journal", "flush", "Flush stream states every n seconds to disk", &_flushInterval); commandline().addOption("Journal", "wait-for-ack", "Wait when a sync has been forced, up to n seconds", &_ackTimeout); commandline().addOption("Journal", "wait-for-last-ack,w", "Wait on shutdown to receive acknownledgement messages, up to n seconds", &_lastAckTimeout); commandline().addGroup("Status"); commandline().addOption("Status", "status-log", "Log information status " "information e.g. max bytes buffered"); commandline().addOption("Status", "status-flush", "Flush status every n " "seconds to disk", &_statusFlushInterval); } void PluginApplication::done() { LogInfo("Statistics of transmitted data:\n" " records : %d\n" " samples : %d\n" " gaps : %d\n" " start time: %s\n" " end time : %s\n" " files : %d", _stats.records, _stats.samples, _stats.gaps, _stats.startTime.valid()?_stats.startTime.iso().c_str():"", _stats.endTime.valid()?_stats.endTime.iso().c_str():"", _stats.files); Seiscomp::Client::StreamApplication::done(); } void PluginApplication::exit(int returnCode) { Seiscomp::Client::StreamApplication::exit(returnCode); _plugin.quit(); } void PluginApplication::handleTimeout() { sc::Time time = sc::Time::GMT().toLocalTime(); Plugin::Stats stats = _plugin.stats(); _statusFile.stream() << time.toString("%Y/%m/%d %T") << " " << stats.maxBytesBuffered << endl; _plugin.resetMaxBytesBuffered(); } bool PluginApplication::init() { if ( !Seiscomp::Client::StreamApplication::init() ) return false; // Setup log handlers Gempa::CAPS::SetLogHandler(Gempa::CAPS::LL_ERROR, LogError); Gempa::CAPS::SetLogHandler(Gempa::CAPS::LL_WARNING, LogWarning); Gempa::CAPS::SetLogHandler(Gempa::CAPS::LL_NOTICE, LogNotice); Gempa::CAPS::SetLogHandler(Gempa::CAPS::LL_INFO, LogInfo); Gempa::CAPS::SetLogHandler(Gempa::CAPS::LL_DEBUG, LogDebug); _plugin.setHost(_host); _plugin.setPort(_port); _plugin.setBufferSize(_bufferSize); _plugin.setFlushInterval(_flushInterval); _plugin.setTimeouts(_ackTimeout, _lastAckTimeout, _sendTimeout); _plugin.setMaxFutureEndTime(_maxFutureEndTime); if ( _mseedEnabled ) { MSEEDEncoderFactory *factory = nullptr; if ( _mseedEncoding == Uncompressed ) { SEISCOMP_INFO("Output stream encoding set to MiniSEED/Uncompressed"); factory = new IdentityEncoderFactory(); _plugin.setEncoderFactory(factory); } else if ( _mseedEncoding == Steim1 ) { SEISCOMP_INFO("Output stream encoding set to MiniSEED/Steim1"); factory = new Steim1EncoderFactory(); _plugin.setEncoderFactory(factory); } if ( _mseedEncoding == Steim2 ) { SEISCOMP_INFO("Output stream encoding set to MiniSEED/Steim2"); factory = new Steim2EncoderFactory(); _plugin.setEncoderFactory(factory); } else { SEISCOMP_ERROR("Unsupported MiniSEED encoding"); return false; } if ( !factory->setRecordLength(_mseedRecordLength) ) { SEISCOMP_ERROR("%s", factory->errorString().c_str()); return false; } } else { SEISCOMP_INFO("MiniSEED encoding is disabled."); } if ( _backfillingBufferSize > 0 ) { _plugin.setBackfillingBufferSize(_backfillingBufferSize); LogInfo("Backfilling buffer size set to %d", _backfillingBufferSize); } if ( !_journalFile.empty() ) { _journalFile = Seiscomp::Environment::Instance()->absolutePath(_journalFile); // Recover states LogInfo("Reading journal from %s", _journalFile.c_str()); _plugin.setJournal(_journalFile); _plugin.readJournal(); LogInfo("Recovered %d streams", (int)_plugin.streamStates().size()); } if ( _logStatus ) { string filename = Seiscomp::Environment::Instance()->logDir() + "/" + SCCoreApp->name() + "-stats.log"; if ( !_statusFile.open(filename.c_str()) ) { LogError("Could not open status file %s.", filename.c_str()); return false; } enableTimer(_statusFlushInterval); } return true; } bool PluginApplication::initConfiguration() { if ( !Seiscomp::Client::StreamApplication::initConfiguration() ) return false; try { _host = configGetString("output.host"); } catch ( ... ) { } try { _port = configGetInt("output.port"); } catch ( ... ) { } try { _sendTimeout = configGetInt("output.timeout"); } catch ( ... ) { } try { string addr = configGetString("output.addr"); if ( !splitAddress(_host, _port, addr, DEFAULT_PORT) ) { SEISCOMP_ERROR("%s: Invalid CAPS address, format is [HOST:PORT]", addr.c_str()); return false; } } catch ( ... ) {} try { _mseedEnabled = configGetBool("output.mseed.enable"); } catch ( ... ) {} try { int length = configGetInt("output.mseed.recordLength"); if ( length < 0 ) { SEISCOMP_ERROR("'output.mseed.recordLength' must be a positive integer"); return false; } _mseedRecordLength = uint(length); } catch ( ... ) {} try { string str = configGetString("output.mseed.encoding"); if ( !fromString(_mseedEncoding, str)) return false; } catch ( ... ) {} try { _bufferSize = configGetInt("output.bufferSize"); } catch ( ... ) { } try { _backfillingBufferSize = configGetInt("output.backfillingBufferSize"); } catch ( ... ) { } try { _journalFile = configGetString("journal.file"); } catch ( ... ) {} try { _flushInterval = configGetInt("journal.flush"); } catch ( ... ) { } try { _ackTimeout = configGetInt("journal.waitForAck"); } catch ( ... ) { } try { _lastAckTimeout = configGetInt("journal.waitForLastAck"); } catch ( ... ) { } try { _logStatus = configGetBool("statusLog.enable"); } catch ( ... ) { } try { _statusFlushInterval = configGetInt("statusLog.flush"); } catch ( ... ) {} try { _maxFutureEndTime= configGetInt("output.maxFutureEndTime"); } catch ( ... ) { } return true; } bool PluginApplication::validateParameters() { if ( !Seiscomp::Client::StreamApplication::validateParameters() ) return false; if ( commandline().hasOption("mseed") ) { _mseedEnabled = true; } if ( commandline().hasOption("status-log") ) { _logStatus = true; } if ( commandline().hasOption("encoding") ) { if ( !fromString(_mseedEncoding, _strMseedEncoding)) return false; } if ( _bufferSize < MIN_BUFFER_SIZE ) { SEISCOMP_ERROR("The plugin buffer size must be at least %ld bytes.", MIN_BUFFER_SIZE); return false; } if ( commandline().hasOption("addr") ) { if ( !splitAddress(_host, _port, _strAddr, DEFAULT_PORT) ) { SEISCOMP_ERROR("%s: Invalid CAPS address, format is [HOST:PORT]", _strAddr.c_str()); return false; } } return true; } bool PluginApplication::fromString(MseedEncoding &enc, string str) { boost::to_lower(str); if( str == "uncompressed" ) enc = Uncompressed; else if ( str == "steim1" ) enc = Steim1; else if ( str == "steim2" ) enc = Steim2; else { SEISCOMP_ERROR("Unsupported encoding %s", str.c_str()); return false; } return true; } } }