/***************************************************************************
 * Copyright (C) 2013 by gempa GmbH                                        *
 *                                                                         *
 * All Rights Reserved.                                                    *
 *                                                                         *
 * NOTICE: All information contained herein is, and remains                *
 * the property of gempa GmbH and its suppliers, if any. The intellectual  *
 * and technical concepts contained herein are proprietary to gempa GmbH   *
 * and its suppliers.                                                      *
 * Dissemination of this information or reproduction of this material      *
 * is strictly forbidden unless prior written permission is obtained       *
 * from gempa GmbH.                                                        *
 ***************************************************************************/

#include
#include
#include

#ifdef WIN32
#define EWOULDBLOCK WSAEWOULDBLOCK
#endif

#if !defined(CAPS_FEATURES_ANY) || CAPS_FEATURES_ANY
#include
#endif
#if !defined(CAPS_FEATURES_MSEED) || CAPS_FEATURES_MSEED
#include
#endif
#if !defined(CAPS_FEATURES_RAW) || CAPS_FEATURES_RAW
#include
#endif
#if !defined(CAPS_FEATURES_RTCM2) || CAPS_FEATURES_RTCM2
#include
#endif

#include

#if !defined(CAPS_FEATURES_JOURNAL) || CAPS_FEATURES_JOURNAL
#include
#endif

#include
#include
#include
#include
#include

using namespace std;

#if !defined(CAPS_FEATURES_JOURNAL) || CAPS_FEATURES_JOURNAL
namespace fs = boost::filesystem;
#endif

// see http://www.boost.org/doc/libs/1_51_0/libs/filesystem/doc/deprecated.html
#if BOOST_FILESYSTEM_VERSION >= 3
#define BOOST_FILESYSTEM_NO_DEPRECATED

// path
#define FS_PATH(PATH_STR) boost::filesystem::path(PATH_STR)
#define FS_DECLARE_PATH(NAME, PATH_STR) \
	boost::filesystem::path NAME(PATH_STR);

#define FS_HAS_PARENT_PATH(PATH) PATH.has_parent_path()
#define FS_PARENT_PATH(PATH) PATH.parent_path()
#else
// path
#define FS_PATH(PATH_STR) boost::filesystem::path(PATH_STR,\
	boost::filesystem::native)
#define FS_DECLARE_PATH(NAME, PATH_STR) \
	boost::filesystem::path NAME(PATH_STR, boost::filesystem::native);

#if BOOST_VERSION < 103600
#define FS_HAS_PARENT_PATH(PATH) PATH.has_branch_path()
#define FS_PARENT_PATH(PATH) PATH.branch_path()
#else
#define FS_HAS_PARENT_PATH(PATH) PATH.has_parent_path()
#define FS_PARENT_PATH(PATH) PATH.parent_path()
#endif
#endif


namespace {

#define LOG_CHANNEL(out, fmt) \
	va_list ap;\
	va_start(ap, fmt);\
	fprintf(stderr, #out" "); vfprintf(stderr, fmt, ap); fprintf(stderr, "\n");\
	va_end(ap)

void LogError(const char *fmt, ...) {
	LOG_CHANNEL(ERROR, fmt);
}

void LogWarning(const char *fmt, ...) {
	LOG_CHANNEL(WARNING, fmt);
}

void LogNotice(const char *fmt, ...) {
	LOG_CHANNEL(NOTICE, fmt);
}

void LogInfo(const char *fmt, ...) {
	LOG_CHANNEL(INFO, fmt);
}

void LogDebug(const char *fmt, ...) {
	LOG_CHANNEL(DEBUG, fmt);
}

}


namespace Gempa {
namespace CAPS {

namespace {

#if !defined(CAPS_FEATURES_JOURNAL) || CAPS_FEATURES_JOURNAL
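// Creates the given directory including all missing parent directories and
// returns false if the filesystem operation throws.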
bool createPath(const string &dir) {
	try {
		FS_DECLARE_PATH(path, dir)
		fs::is_directory(path);
		fs::create_directories(path);
		return true;
	}
	catch ( ... ) {
		return false;
	}
}
#endif

#if !defined(CAPS_FEATURES_BACKFILLING) || CAPS_FEATURES_BACKFILLING
//void dump(const Plugin::BackfillingBuffer &buf) {
//	Plugin::BackfillingBuffer::const_iterator it;
//	int idx = 0;
//	for ( it = buf.begin(); it != buf.end(); ++it, ++idx ) {
//		std::cerr << idx << ": " << (*it)->record->startTime().iso() << std::endl;
//	}
//}

// Serializes a packet into a contiguous buffer: packet data header, packet
// header of type T, the stream ID components and finally the record data.
template <typename T>
void fillPacket(Packet *packet, uint16_t version) {
	PacketDataHeader header;
	header.packetType = packet->record->packetType();
	header.setUOM(packet->uom.c_str());

	T packetHeader;
	uint32_t size = packetHeader.dataSize();
	size_t dataSize = packet->record->dataSize();
	size += header.dataSize() + dataSize +
	        packet->networkCode.size() + packet->stationCode.size() +
	        packet->locationCode.size() + packet->channelCode.size();

	Plugin::BufferPtr buf = Plugin::BufferPtr(new Plugin::Buffer());
	buf->resize(size);

	arraybuf abuf(buf->data(), buf->size());

	packetHeader.size = dataSize;
	packetHeader.SIDSize[NetworkCode] = packet->networkCode.size();
	packetHeader.SIDSize[StationCode] = packet->stationCode.size();
	packetHeader.SIDSize[LocationCode] = packet->locationCode.size();
	packetHeader.SIDSize[ChannelCode] = packet->channelCode.size();

	header.version = version;
	header.put(abuf);
	packetHeader.put(abuf);

	abuf.sputn(packet->networkCode.c_str(), packetHeader.SIDSize[NetworkCode]);
	abuf.sputn(packet->stationCode.c_str(), packetHeader.SIDSize[StationCode]);
	abuf.sputn(packet->locationCode.c_str(), packetHeader.SIDSize[LocationCode]);
	abuf.sputn(packet->channelCode.c_str(), packetHeader.SIDSize[ChannelCode]);

	packet->record->put(abuf);
	packet->buffer = buf;
}

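// Inserts a packet into the backfilling buffer, keeping the buffer sorted by
// record end time so that tryFlushBackfillingBuffer() can release contiguous
// data from the front.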
void insertPacket(Plugin::BackfillingBuffer &buf, PacketPtr packet) {
	// Brute-force algorithm, should be replaced by something smarter such as
	// binary search. A list might be the wrong container for that.
	Plugin::BackfillingBuffer::iterator it;
	size_t pos = 0;
	for ( it = buf.begin(); it != buf.end(); ++it, ++pos ) {
		if ( (*it)->record->endTime() > packet->record->endTime() ) {
			buf.insert(it, packet);
			CAPS_DEBUG("Backfilling buffer: packet inserted at pos: %lu, new "
			           "size: %lu, time window: %s~%s",
			           (long unsigned) pos, (long unsigned) buf.size(),
			           buf.front()->record->startTime().iso().c_str(),
			           buf.back()->record->endTime().iso().c_str());
			//dump(buf);
			return;
		}
	}

	buf.push_back(packet);
	CAPS_DEBUG("Backfilling buffer: packet appended, new size: %lu, "
	           "time window: %s~%s",
	           (long unsigned) buf.size(),
	           buf.front()->record->startTime().iso().c_str(),
	           buf.back()->record->endTime().iso().c_str());
	//dump(buf);
}
#endif

// Returns the time of the last sample of a record.
Time getLastSampleTime(DataRecord *rec) {
	if ( !rec ) {
		return Time();
	}

	const DataRecord::Header *header = rec->header();
	if ( header->samplingFrequencyNumerator != 0 &&
	     header->samplingFrequencyDenominator != 0 ) {
		TimeSpan samplingInterval = header->samplingFrequencyDenominator /
		                            header->samplingFrequencyNumerator;
		return rec->endTime() - samplingInterval;
	}

	return rec->endTime();
}

}


Plugin::Plugin(const string &name, const string &options,
               const string &description)
: _name(name)
, _options(options)
, _description(description)
, _bufferSize(1 << 14)
, _bytesBuffered(0)
, _port(18003)
, _sendTimeout(60)
, _isExitRequested(false)
#if !defined(CAPS_FEATURES_BACKFILLING) || CAPS_FEATURES_BACKFILLING
, _backfillingBufferSize(0)
#endif
{
#if !defined(CAPS_FEATURES_JOURNAL) || CAPS_FEATURES_JOURNAL
	_lastWrite = Time::GMT();
	_journalDirty = false;
	_flushInterval = 10;
#endif
	_closed = false;
	_wasConnected = false;
	_ackTimeout = 5;
	_lastAckTimeout = 5;
	_encoderFactory = NULL;
	_maxFutureEndTime = TimeSpan(120, 0);
#if !defined(CAPS_FEATURES_SSL) || CAPS_FEATURES_SSL
	_useSSL = false;
#endif
}

Plugin::~Plugin() {
	if ( !_closed ) close();
	setEncoderFactory(NULL);
}

void Plugin::close() {
#if !defined(CAPS_FEATURES_JOURNAL) || CAPS_FEATURES_JOURNAL
	_lastWrite = Time();
#endif
	_closed = true;

	// Flush all packets that are still held back for backfilling
	for ( StreamStates::iterator it = _states.begin(); it != _states.end(); ++it ) {
		StreamState &state = it->second;
		while ( !state.backfillingBuffer.empty() ) {
			PacketPtr &ref_pkt = state.backfillingBuffer.front();
			state.lastCommitEndTime = ref_pkt->record->endTime();
			encodePacket(ref_pkt);
			state.backfillingBuffer.pop_front();
		}
	}

	flushEncoders();
	sendBye();

	CAPS_INFO("Closing connection to CAPS at %s:%d", _host.c_str(), _port);

	// Wait for outstanding acknowledgements of queued packets
	while ( !_packetBuffer.empty() && readResponse(_lastAckTimeout) );

#if !defined(CAPS_FEATURES_JOURNAL) || CAPS_FEATURES_JOURNAL
	updateJournal();
#endif

	disconnect();
	CAPS_INFO("Closed connection to CAPS at %s:%d", _host.c_str(), _port);

	_packetBuffer.clear();
	_wasConnected = false;
}

void Plugin::quit() {
	_isExitRequested = true;
}

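// Establishes the connection to the CAPS server, performs the optional AUTH
// handshake and prepares the socket for non-blocking I/O. On reconnects the
// packet buffer is flushed to resend unacknowledged packets.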
bool Plugin::connect() {
#if !defined(CAPS_FEATURES_SSL) || CAPS_FEATURES_SSL
	_socket = SocketPtr(_useSSL ? new CAPS::SSLSocket() : new CAPS::Socket());
#else
	_socket = SocketPtr(new CAPS::Socket());
#endif

	CAPS_INFO("Attempting to connect to CAPS at %s:%d", _host.c_str(), _port);
	if ( _socket->connect(_host, _port) != Socket::Success ) {
		CAPS_ERROR("Connection failed to CAPS at %s:%d", _host.c_str(), _port);
		return false;
	}

	// Do handshake
	if ( !_user.empty() && !_password.empty() ) {
		string auth = "AUTH " + _user + " " + _password + "\n";
		_socket->write(auth.c_str(), auth.length());
	}

	_socket->setNonBlocking(true);

	FD_ZERO(&_readFDs);
	FD_ZERO(&_writeFDs);
	FD_SET(_socket->fd(), &_readFDs);
	FD_SET(_socket->fd(), &_writeFDs);

	CAPS_INFO("Connected to CAPS at %s:%d", _host.c_str(), _port);

	_responseBuf[0] = '\0';
	_responseBufIdx = 0;

	bool result = true;

	// Do not flush the packet buffer during the first connection
	// establishment to avoid duplicate records
	if ( _wasConnected && !flush() ) {
		disconnect();
		result = false;
	}

	_wasConnected = true;

	return result;
}

void Plugin::disconnect() {
	if ( _socket && _socket->isValid() ) {
		CAPS_INFO("Disconnect from %s:%d", _host.c_str(), _port);
		_socket->shutdown();
		_socket->close();
		_responseBuf[0] = '\0';
		_responseBufIdx = 0;
	}
}

bool Plugin::isConnected() const {
	return _socket && _socket->isValid();
}

bool Plugin::readResponse(unsigned int timeout) {
	if ( !_socket || !_socket->isValid() ) return false;

	_socket->setNonBlocking(true);

	bool gotResponse = false;

#define bufN 512
	char buf[bufN];
	int res;

	if ( timeout > 0 ) {
		struct timeval tv;
		tv.tv_sec = timeout;
		tv.tv_usec = 0;

		FD_ZERO(&_readFDs);
		FD_SET(_socket->fd(), &_readFDs);

		res = select(_socket->fd() + 1, &_readFDs, NULL, NULL, &tv);
		if ( res <= 0 ) return gotResponse;
	}

	while ( true ) {
		res = _socket->read(buf, bufN);
		if ( res < 0 ) {
			if ( errno != EAGAIN && errno != EWOULDBLOCK ) {
				CAPS_ERROR("Reading failed: %s: disconnect", strerror(errno));
				disconnect();
			}
			break;
		}
		else if ( res == 0 ) {
			CAPS_INFO("Peer closed connection");
			disconnect();
			break;
		}

		char *in = buf;

		// Parse the input buffer into the line buffer
		for ( int i = 0; i < res; ++i, ++in ) {
			if ( *in == '\n' ) {
				_responseBuf[_responseBufIdx] = '\0';

				// Handle line
				if ( (strncasecmp(_responseBuf, "OK ", 3) == 0) &&
				     (_responseBufIdx > 3) ) {
					// Got OK response
					// Read confirmed packets from response
					int count = atoi(_responseBuf+3);
					CAPS_DEBUG("Acknowledged %d packets, %d in queue",
					           count, (int)_packetBuffer.size());

					// Update packet buffer
					for ( int i = 0; i < count; ++i ) {
						if ( _packetBuffer.empty() ) {
							CAPS_ERROR("Synchronization error: more packets "
							           "acknowledged than in queue");
							break;
						}

						PacketPtr packet = _packetBuffer.front();
						_states[packet->streamID].lastEndTime = packet->record->endTime();
						_packetBuffer.pop_front();
						_bytesBuffered -= packet->record->dataSize();
						_packetBufferDirty = true;
#if !defined(CAPS_FEATURES_JOURNAL) || CAPS_FEATURES_JOURNAL
						_journalDirty = true;
#endif
						gotResponse = true;

						CAPS_DEBUG("Packet acknowledged by CAPS, stream: %s, time window: %s~%s",
						           packet->streamID.c_str(),
						           packet->record->startTime().iso().c_str(),
						           packet->record->endTime().iso().c_str());

						if ( _packetAckFunc ) {
							_packetAckFunc(packet->streamID,
							               packet->record->startTime(),
							               packet->record->endTime());
						}
					}

					CAPS_DEBUG("Packet buffer state: %d packets, %d bytes",
					           (int)_packetBuffer.size(), (int)_bytesBuffered);
				}

				_responseBuf[0] = '\0';
				_responseBufIdx = 0;
			}
			else {
				_responseBuf[_responseBufIdx] = *in;
				++_responseBufIdx;
			}
		}

		if ( _packetBuffer.empty() ) break;
	}

	return gotResponse;
}

#if !defined(CAPS_FEATURES_JOURNAL) || CAPS_FEATURES_JOURNAL
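// Writes the journal to disk if it has been modified and the configured
// flush interval has elapsed since the last write.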
void Plugin::updateJournal() {
	Time currentTime = Time::GMT();
	if ( !_journalFile.empty() && _journalDirty &&
	     _lastWrite + TimeSpan(_flushInterval) <= currentTime ) {
		writeJournal();
		_journalDirty = false;
		_lastWrite = currentTime;
	}
}

bool Plugin::readJournal() {
	if ( _journalFile.empty() ) return false;

	ifstream ifs;
	ifs.open(_journalFile.c_str());
	return readJournal(ifs);
}

bool Plugin::readJournal(istream &is) {
	_states.clear();

	string streamID, strTime;
	string line;
	Time time;
	int lineNumber = 0;

	Time now = Time::GMT();
	Time maxAllowedEndTime = now + TimeSpan(86400);

	while ( getline(is, line) ) {
		++lineNumber;

		size_t p = line.find(' ');
		if ( p == string::npos )
			streamID = trim(line);
		else {
			streamID = trim(line.substr(0, p));
			strTime = trim(line.substr(p+1));
		}

		if ( !strTime.empty() ) {
			if ( !time.fromString(strTime.c_str(), "%FT%T.%fZ") ) {
				CAPS_ERROR("journal:%d: Invalid time: %s", lineNumber, strTime.c_str());
				return false;
			}
		}
		else
			time = Time();

		if ( time > maxAllowedEndTime ) {
			CAPS_WARNING("journal:%d:%s: Timestamp %s is more than one day "
			             "ahead of current time, respecting it nevertheless.",
			             lineNumber, streamID.c_str(), time.iso().c_str());
		}

		_states[streamID].lastEndTime = time;
	}

	return true;
}
#endif

bool Plugin::flush() {
	CAPS_INFO("Flushing %d queued packets", (int)_packetBuffer.size());

	PacketBuffer::iterator it = _packetBuffer.begin();
	while ( it != _packetBuffer.end() && !_isExitRequested ) {
		PacketPtr packet = *it;
		_packetBufferDirty = false;

		if ( !sendPacket(packet.get()) && !_isExitRequested ) {
			if ( _packetBufferDirty ) {
				CAPS_ERROR("Uh oh, buffer dirty but sending failed!");
			}
			return false;
		}

		if ( readResponse() ) {
#if !defined(CAPS_FEATURES_JOURNAL) || CAPS_FEATURES_JOURNAL
			updateJournal();
#endif
		}

		if ( _packetBufferDirty ) {
			it = std::find(_packetBuffer.begin(), _packetBuffer.end(), packet);
			if ( it != _packetBuffer.end() )
				++it;
			else {
				CAPS_DEBUG("Last packet removed, reset flush queue iterator");
				it = _packetBuffer.begin();
			}
		}
		else
			++it;
	}

	return true;
}

void Plugin::flushEncoders() {
	CAPS_INFO("Flushing %d encoders", (int)_encoderItems.size());

	EncoderItems::iterator it = _encoderItems.begin();
	while ( it != _encoderItems.end() ) {
		Encoder *encoder = it->second.encoder.get();
		if ( encoder ) {
			encoder->flush();

			PacketPtr encodedPacket;
			while ( (encodedPacket = encoder->pop()) ) {
				commitPacket(encodedPacket);
			}
		}
		it++;
	}
}

Plugin::Status
Plugin::push(const string &net, const string &sta, const string &loc,
             const string &cha, const Time &stime,
             uint16_t numerator, uint16_t denominator,
             const string &uom, void *data, size_t count,
             DataType dt, int timingQuality) {
	uint8_t dtSize = dataTypeSize(dt);
	if ( dtSize == 0 ) return Plugin::PacketLoss;

	RawDataRecord *rec = new RawDataRecord();
	rec->setStartTime(stime);
	rec->setSamplingFrequency(numerator, denominator);
	rec->setDataType(dt);
	rec->setBuffer((char*)data, dtSize * count);

	return push(net, sta, loc, cha, DataRecordPtr(rec), uom, timingQuality);
}

#if !defined(CAPS_FEATURES_ANY) || CAPS_FEATURES_ANY
Plugin::Status
Plugin::push(const string &net, const std::string &sta, const std::string &loc,
             const std::string &cha, const Time &stime,
             uint16_t numerator, uint16_t denominator,
             const std::string &format, char *data, size_t count) {
	AnyDataRecord *rec = new AnyDataRecord;
	rec->setStartTime(stime);
	rec->setEndTime(stime);
	rec->setSamplingFrequency(numerator, denominator);
	rec->setType(format.c_str());
	rec->setData(data, count);

	return push(net, sta, loc, cha, DataRecordPtr(rec), "px");
}

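// Convenience overload that forwards the payload of a std::string to the
// char* based variant above.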
Plugin::Status
Plugin::push(const string &net, const std::string &sta, const std::string &loc,
             const std::string &cha, const Time &stime,
             uint16_t numerator, uint16_t denominator,
             const std::string &format, const std::string &str) {
	return push(net, sta, loc, cha, stime, numerator, denominator, format,
	            const_cast<char*>(&str[0]), str.size());
}
#endif

void Plugin::tryFlushBackfillingBuffer(StreamState &state) {
	if ( state.backfillingBuffer.empty() ) return;

	size_t flushed = 0;
	while ( !state.backfillingBuffer.empty() ) {
		PacketPtr &ref_pkt = state.backfillingBuffer.front();
		TimeSpan gap = ref_pkt->record->startTime() - state.lastCommitEndTime;
		int64_t dt_us = (int64_t)gap.seconds()*1000000+gap.microseconds();
		// A gap larger than one sample?
		if ( dt_us >= ref_pkt->dt_us ) break;

		state.lastCommitEndTime = ref_pkt->record->endTime();
		encodePacket(ref_pkt);
		state.backfillingBuffer.pop_front();
		++flushed;
		//dump(state.backfillingBuffer);
	}

	if ( flushed > 0 ) {
		if ( state.backfillingBuffer.size() > 0 ) {
			CAPS_DEBUG("backfilling buffer: %lu packets flushed, new size: %lu, "
			           "time window: %s~%s",
			           (long unsigned) flushed,
			           (long unsigned) state.backfillingBuffer.size(),
			           state.backfillingBuffer.front()->record->startTime().iso().c_str(),
			           state.backfillingBuffer.back()->record->endTime().iso().c_str());
		}
		else {
			CAPS_DEBUG("backfilling buffer: %lu packets flushed, new size: 0",
			           (long unsigned) flushed);
		}
	}
}

void Plugin::trimBackfillingBuffer(StreamState &state) {
	if ( state.backfillingBuffer.empty() ) return;

	size_t trimmed = 0;
	while ( !state.backfillingBuffer.empty() ) {
		TimeSpan diff = state.backfillingBuffer.back()->record->endTime() -
		                state.backfillingBuffer.front()->record->startTime();
		if ( diff.seconds() <= _backfillingBufferSize ) break;

		PacketPtr &ref_pkt = state.backfillingBuffer.front();
		state.lastCommitEndTime = ref_pkt->record->endTime();
		encodePacket(ref_pkt);
		state.backfillingBuffer.pop_front();
		++trimmed;
	}

	if ( trimmed > 0 ) {
		if ( state.backfillingBuffer.size() > 0 ) {
			CAPS_DEBUG("backfilling buffer: %lu packets trimmed, new size: %lu, "
			           "time window: %s~%s",
			           (long unsigned) trimmed,
			           (long unsigned) state.backfillingBuffer.size(),
			           state.backfillingBuffer.front()->record->startTime().iso().c_str(),
			           state.backfillingBuffer.back()->record->endTime().iso().c_str());
		}
		else {
			CAPS_DEBUG("backfilling buffer: %lu packets trimmed, new size: 0",
			           (long unsigned) trimmed);
		}
	}
}

Plugin::Status
Plugin::push(const string &net, const string &sta, const string &loc,
             const string &cha, DataRecordPtr rec, const string &uom,
             int timingQuality) {
	static bool showVersion = true;
	if ( showVersion ) {
		CAPS_NOTICE("LIB CAPS version %s", version());
		showVersion = false;
	}

	if ( rec == NULL ) return PacketNotValid;
	if ( rec->dataSize() > _bufferSize ) return PacketSize;
	if ( !rec->endTime().valid() ) return PacketNotValid;

	Time endTime = getLastSampleTime(rec.get());
	Time maxAllowedEndTime = Time::GMT() + TimeSpan(_maxFutureEndTime);
	if ( endTime > maxAllowedEndTime ) {
		CAPS_WARNING("%s.%s.%s.%s: Future time stamp detected: Packet end time %s exceeds "
		             "max allowed end time %s. "
Discard packet.", net.c_str(), sta.c_str(), loc.c_str(), cha.c_str(), rec->endTime().iso().c_str(), maxAllowedEndTime.iso().c_str()); return MaxFutureEndTimeExceeded; } const DataRecord::Header *header = rec->header(); PacketPtr packet(new Packet(rec, net, sta, loc, cha)); packet->uom = uom; packet->dataType = header->dataType; packet->timingQuality = timingQuality; #if !defined(CAPS_FEATURES_BACKFILLING) || CAPS_FEATURES_BACKFILLING if ( header->samplingFrequencyNumerator == 0 ) { CAPS_DEBUG("detected division by zero, invalid sampling frequency numerator"); return PacketNotValid; } packet->dt_us = ((int64_t)header->samplingFrequencyDenominator * 1000000) / header->samplingFrequencyNumerator; #endif StreamState &state = _states[packet->streamID]; #if !defined(CAPS_FEATURES_BACKFILLING) || CAPS_FEATURES_BACKFILLING // If buffering is enabled if ( _backfillingBufferSize > 0 ) { if ( state.lastCommitEndTime.valid() ) { int64_t dt_us = ((int64_t)rec->header()->samplingFrequencyDenominator * 1000000) / rec->header()->samplingFrequencyNumerator; TimeSpan gap = packet->record->startTime() - state.lastCommitEndTime; // A gap larger than one sample? if ( ((int64_t)gap.seconds()*1000000+gap.microseconds()) >= dt_us ) { CAPS_DEBUG("detected gap on stream: %s", packet->streamID.c_str()); insertPacket(state.backfillingBuffer, packet); trimBackfillingBuffer(state); tryFlushBackfillingBuffer(state); return Success; } } if ( !encodePacket(packet) ) return PacketLoss; if ( rec->endTime() > state.lastCommitEndTime) { state.lastCommitEndTime = rec->endTime(); } tryFlushBackfillingBuffer(state); } else { #endif if ( !encodePacket(packet) ) return PacketLoss; #if !defined(CAPS_FEATURES_BACKFILLING) || CAPS_FEATURES_BACKFILLING } #endif return Success; } void Plugin::serializePacket(Packet *packet) { if ( packet->buffer ) return; size_t dataSize = packet->record->dataSize(); bool isV1 = dataSize < (1 << 16); if ( isV1 ) { fillPacket(packet, 1); } else { fillPacket(packet, 2); } } bool Plugin::commitPacket(PacketPtr packet) { // Initial connect if ( !_wasConnected && !_socket ) { while ( !connect() && !_isExitRequested ) { disconnect(); wait(); } if ( !isConnected() ) return false; } if ( _bytesBuffered >= _bufferSize ) { CAPS_DEBUG("Packet buffer is full (%ld/%ld bytes), " "waiting for server ack messages", _bytesBuffered, _bufferSize); while ( _bytesBuffered >= _bufferSize ) { if ( !readResponse(_ackTimeout) ) { disconnect(); while ( !_isExitRequested && !_socket->isValid() ) { wait(); disconnect(); connect(); } if ( _isExitRequested ) break; } } CAPS_DEBUG("%ld/%ld bytes buffered after force wait", _bytesBuffered, _bufferSize); if ( _bytesBuffered >= _bufferSize ) return false; #if !defined(CAPS_FEATURES_JOURNAL) || CAPS_FEATURES_JOURNAL CAPS_DEBUG("Force journal update"); #endif } // Serialize data record serializePacket(packet.get()); CAPS_DEBUG("+ buffer state: %d packets, %d bytes", (int)_packetBuffer.size(), (int)_bytesBuffered); while ( !sendPacket(packet.get() ) ) { CAPS_ERROR("Sending failed: %s", _isExitRequested ? 
"abort" : "reconnect"); readResponse(); disconnect(); while ( !_isExitRequested && !isConnected() ) { wait(); disconnect(); connect(); } if ( !isConnected() ) return false; } _packetBuffer.push_back(packet); _bytesBuffered += packet->record->dataSize(); _stats.maxBytesBuffered = max(_bytesBuffered, _stats.maxBytesBuffered); if ( readResponse() ) { #if !defined(CAPS_FEATURES_JOURNAL) || CAPS_FEATURES_JOURNAL updateJournal(); #endif } return true; } Encoder* Plugin::getEncoder(PacketPtr packet) { EncoderItems::iterator it = _encoderItems.find(packet->streamID); if ( it == _encoderItems.end() ) { EncoderItem item; it = _encoderItems.insert(make_pair(packet->streamID, item)).first; } DataRecord *record = packet->record.get(); const DataRecord::Header *header = record->header(); EncoderItem *item = &it->second; if ( item->encoder != NULL ) { /* Before we can feed data into the encoder we have to check if the sampling frequency or data type have been changed. Also gaps must be handled. */ const SPClock &clk = item->encoder->clk(); bool needsFlush = false; if( clk.freqn != header->samplingFrequencyNumerator || clk.freqd != header->samplingFrequencyDenominator ) { needsFlush = true; } else if ( record->startTime() != clk.get_time(0) ) { needsFlush = true; } else if ( item->dataType != header->dataType ) { needsFlush = true; } else if ( item->encoder->timingQuality() != packet->timingQuality ) { needsFlush = true; } if ( needsFlush ) { item->encoder->flush(); PacketPtr encodedPacket; while ( (encodedPacket = item->encoder->pop()) ) { // TODO check return value? commitPacket(encodedPacket); } item->encoder = EncoderPtr(); } } if ( item->encoder == NULL ) { Encoder *encoder = _encoderFactory->create(packet->networkCode, packet->stationCode, packet->locationCode, packet->channelCode, header->dataType, header->samplingFrequencyNumerator, header->samplingFrequencyDenominator); if ( encoder != NULL ) { encoder->setStartTime(record->startTime()); encoder->setTimingQuality(packet->timingQuality); item->encoder = EncoderPtr(encoder); item->dataType = header->dataType; } } return item->encoder.get(); } bool Plugin::encodePacket(PacketPtr packet) { if ( _encoderFactory == NULL || !_encoderFactory->supportsRecord(packet->record.get()) ) return commitPacket(packet); Buffer *buffer = packet->record->data(); if ( buffer == NULL ) return commitPacket(packet); Encoder *encoder = getEncoder(packet); if ( encoder == NULL ) return commitPacket(packet); for ( size_t i = 0; i < buffer->size(); i = i + dataTypeSize(packet->dataType) ) { char *data = buffer->data() + i; encoder->push(data); } PacketPtr encodedPacket; while ( (encodedPacket = encoder->pop()) ) { // TODO check return value? commitPacket(encodedPacket); } return true; } void Plugin::setEncoderFactory(EncoderFactory *factory) { if ( _encoderFactory ) { delete _encoderFactory; _encoderFactory = NULL; } _encoderFactory = factory; } void Plugin::sendBye() { if ( _socket == NULL ) return; PacketDataHeader header; memset((char*)&header, 0, header.dataSize()); _socket->setNonBlocking(false); send((char*)&header, header.dataSize(), _sendTimeout); } bool Plugin::sendPacket(Packet *packet) { if ( !_packetBuffer.empty() ) { readResponse(); #if !defined(CAPS_FEATURES_JOURNAL) || CAPS_FEATURES_JOURNAL updateJournal(); #endif } // Send packet data header and packet header _socket->setNonBlocking(false); // Instead of directly write data to the socket we call // a wrapper function that checks if we can write data or not. 
	// Without this check the system send operation would hang as soon as the
	// TCP buffer is full and the connection is in an undefined state.
	// The check and the system send operation run in a loop until the data
	// has been sent successfully or the user requests to stop, so this call
	// may block.
	if ( !send(packet->buffer->data(), packet->buffer->size(), _sendTimeout) ) {
		return false;
	}

	CAPS_DEBUG("Packet sent to CAPS, stream: %s, time window: %s~%s",
	           packet->streamID.c_str(),
	           packet->record->startTime().iso().c_str(),
	           packet->record->endTime().iso().c_str());

	return true;
}

void Plugin::wait() {
	// Wait 5 seconds in small steps to stay responsive to exit requests
	for ( int i = 0; (i < 10) && !_isExitRequested; ++i )
		usleep(500000);
}

#if !defined(CAPS_FEATURES_JOURNAL) || CAPS_FEATURES_JOURNAL
bool Plugin::writeJournal() {
	//if ( _states.empty() ) return true;
	if ( !_journalFile.empty() ) {
		FS_DECLARE_PATH(path, _journalFile)

		// Write to a temporary file first and rename it afterwards to keep
		// the journal consistent in case of errors
		string filename = ".journal.tmp";
		if ( FS_HAS_PARENT_PATH(path) ) {
			createPath(FS_PARENT_PATH(path).string());
			filename = (FS_PARENT_PATH(path) / FS_PATH(filename)).string();
		}

		ofstream ofs(filename.c_str());
		if ( !ofs.is_open() )
			return false;

		if ( writeJournal(ofs) ) {
			ofs.close();
			if ( FS_HAS_PARENT_PATH(path) ) {
				createPath(FS_PARENT_PATH(path).string());
			}
			if ( rename(filename.c_str(), path.string().c_str()) != 0 ) {
				CAPS_ERROR("Failed to create journal %s, %s(%d)",
				           _journalFile.c_str(), strerror(errno), errno);
				return false;
			}
			return true;
		}
		else {
			ofs.close();
			try { fs::remove(filename); } catch ( ... ) {}
		}
	}
	else
		return writeJournal(cout);

	return false;
}

bool Plugin::writeJournal(ostream &os) {
	for ( StreamStates::iterator it = _states.begin(); it != _states.end(); ++it ) {
		os << it->first << " " << it->second.lastEndTime.iso() << endl;
		if ( !os.good() )
			return false;
	}

	return true;
}
#endif

void Plugin::enableLogging() {
	Gempa::CAPS::SetLogHandler(Gempa::CAPS::LL_ERROR, LogError);
	Gempa::CAPS::SetLogHandler(Gempa::CAPS::LL_WARNING, LogWarning);
	Gempa::CAPS::SetLogHandler(Gempa::CAPS::LL_NOTICE, LogNotice);
	Gempa::CAPS::SetLogHandler(Gempa::CAPS::LL_INFO, LogInfo);
	Gempa::CAPS::SetLogHandler(Gempa::CAPS::LL_DEBUG, LogDebug);
}

bool Plugin::send(char *data, int len, int timeout) {
	if ( !_socket->isValid() )
		return false;

	struct timeval tv;
	tv.tv_sec = timeout;
	tv.tv_usec = 0;

	FD_ZERO(&_writeFDs);
	FD_SET(_socket->fd(), &_writeFDs);

	int res = select(_socket->fd() + 1, NULL, &_writeFDs, NULL, &tv);
	if ( res > 0 ) {
		res = _socket->write(data, len);
		if ( res == len ) {
			return true;
		}
		else if ( res == -1 ) {
			CAPS_ERROR("Failed to send data: %s. "
			           "Forcing reconnect and trying to send data again.",
			           strerror(errno));
		}
		else {
			CAPS_ERROR("Incomplete send operation: "
			           "Only %d/%d bytes have been sent. "
			           "Forcing reconnect and trying to send data again.",
			           res, len);
		}
	}
	else if ( !res ) {
		CAPS_ERROR("Detected hanging TCP connection. "
		           "Forcing reconnect and trying to send data again.");
	}
	else {
		CAPS_ERROR("Send error: %s", strerror(errno));
	}

	return false;
}

}
}