Compare commits

..

No commits in common. 'v2' and 'master' have entirely different histories.
v2 ... master

@ -1,12 +1,6 @@
PROJECT(LIBCAPS) PROJECT(LIBCAPS)
CMAKE_MINIMUM_REQUIRED(VERSION 2.4) CMAKE_MINIMUM_REQUIRED(VERSION 2.4)
IF(CMAKE_CXX_COMPILER_VERSION VERSION_GREATER "4.7")
ADD_DEFINITIONS(-std=c++11 -DBOOST_NO_CXX11_SCOPED_ENUM -DBOOST_NO_SCOPED_ENUMS)
ELSE()
MESSAGE(ERROR "The CAPS client library requires gcc version 4.7 or higher")
ENDIF()
SET(CMAKE_MODULE_PATH ${CMAKE_CURRENT_SOURCE_DIR}/Modules) SET(CMAKE_MODULE_PATH ${CMAKE_CURRENT_SOURCE_DIR}/Modules)
FIND_PACKAGE(Boost REQUIRED) FIND_PACKAGE(Boost REQUIRED)

@ -16,7 +16,7 @@ INCLUDE_DIRECTORIES(../../3rd-party/mseed)
ADD_LIBRARY(${LIB_NAME} SHARED ${${PACKAGE_NAME}_SOURCES}) ADD_LIBRARY(${LIB_NAME} SHARED ${${PACKAGE_NAME}_SOURCES})
SET_TARGET_PROPERTIES(${LIB_NAME} PROPERTIES COMPILE_FLAGS -fPIC) SET_TARGET_PROPERTIES(${LIB_NAME} PROPERTIES COMPILE_FLAGS -fPIC)
SET_TARGET_PROPERTIES(${LIB_NAME} PROPERTIES VERSION 2.0.0 SOVERSION 2) SET_TARGET_PROPERTIES(${LIB_NAME} PROPERTIES VERSION 1.0.0 SOVERSION 1)
TARGET_LINK_LIBRARIES(${LIB_NAME} mseed) TARGET_LINK_LIBRARIES(${LIB_NAME} mseed)
INSTALL(TARGETS ${LIB_NAME} DESTINATION lib) INSTALL(TARGETS ${LIB_NAME} DESTINATION lib)

@ -39,7 +39,7 @@ const string& EncoderFactory::errorString() const {
MSEEDEncoderFactory::MSEEDEncoderFactory() MSEEDEncoderFactory::MSEEDEncoderFactory()
: _recordLength(9) {} : _recordLength(9) {}
bool MSEEDEncoderFactory::setRecordLength(unsigned int recordLength) { bool MSEEDEncoderFactory::setRecordLength(uint recordLength) {
if ( recordLength < 7 || recordLength > 32) { if ( recordLength < 7 || recordLength > 32) {
_errorString = "MSEED record length out of range [7, 32]"; _errorString = "MSEED record length out of range [7, 32]";
return false; return false;

@ -84,7 +84,7 @@ class MSEEDEncoderFactory : public EncoderFactory {
* @param recLen The record length expressed as a power of 2 * @param recLen The record length expressed as a power of 2
* @return True if the record length is valid * @return True if the record length is valid
*/ */
bool setRecordLength(unsigned int recordLength); bool setRecordLength(uint recordLength);
protected: protected:
uint8_t _recordLength; uint8_t _recordLength;

@ -44,10 +44,10 @@ class Encoder {
virtual void reset() { _sampleCount = 0; } virtual void reset() { _sampleCount = 0; }
virtual int type() const = 0; virtual int type() const = 0;
const SPClock &clk() { return _clk; } const SPClock& clk() { return _clk; }
void setStartTime(const Time &time) { _clk.syncTime(time); } void setStartTime(const SPClock::INT_TIME &time) { _clk.sync_time(time); }
const Time currentTime() const { return _clk.getTime(0); } const SPClock::INT_TIME currentTime() const { return _clk.get_time(0); }
int timingQuality() { return _timingQuality; } int timingQuality() { return _timingQuality; }
void setTimingQuality(int quality) { _timingQuality = quality; } void setTimingQuality(int quality) { _timingQuality = quality; }

@ -18,6 +18,7 @@
*****************************************************************************/ *****************************************************************************/
#include <gempa/caps/mseedpacket.h> #include <gempa/caps/mseedpacket.h>
#include <gempa/caps/rawpacket.h>
#include "mseed.h" #include "mseed.h"
#include "slink.h" #include "slink.h"
@ -26,26 +27,23 @@
#include <cstring> #include <cstring>
#include <cstdlib> #include <cstdlib>
#include <cstdio> #include <cstdio>
#include <iostream>
#include <stdint.h> #include <stdint.h>
#include <netinet/in.h> #include <netinet/in.h>
namespace Gempa { namespace Gempa {
namespace CAPS { namespace CAPS {
MSEEDFormat::MSEEDFormat(const std::string &netcode, const std::string &stacode, MSEEDFormat::MSEEDFormat(const std::string &netcode, const std::string &stacode,
const std::string &loccode, const std::string &chacode, const std::string &loccode, const std::string &chacode,
unsigned short freqn, unsigned short freqd, unsigned short freqn, unsigned short freqd,
unsigned short packType, unsigned short packtype_init,
uint8_t recordLength) uint8_t recordLength)
: networkCode(netcode) : networkCode(netcode), stationCode(stacode), locationCode(loccode),
, stationCode(stacode) channelCode(chacode), packType(packtype_init), recordLength(recordLength)
, locationCode(loccode)
, channelCode(chacode)
, packType(packType)
, recordLength(recordLength)
{ {
if(freqn == 0 || freqd == 0) { if(freqn == 0 || freqd == 0) {
sample_rate_factor = 0; sample_rate_factor = 0;
@ -65,10 +63,8 @@ MSEEDFormat::MSEEDFormat(const std::string &netcode, const std::string &stacode,
} }
} }
MSEEDDataRecord *MSEEDFormat::get_buffer(const SPClock::INT_TIME &it, int usec_correction,
MSEEDDataRecord *MSEEDFormat::getBuffer(const Time &it, int usec_correction, int timing_quality, void *&dataptr, int &datalen) {
int timing_quality, void *&dataptr,
int &datalen) {
size_t buflen = 1 << recordLength; size_t buflen = 1 << recordLength;
MSEEDDataRecord *record = new MSEEDDataRecord(); MSEEDDataRecord *record = new MSEEDDataRecord();
record->data()->resize(buflen); record->data()->resize(buflen);
@ -155,10 +151,12 @@ void MSEEDFormat::updateBuffer(MSEEDDataRecord *rec, int samples, int frames) {
if ( ntohs(blkt_1000->next_blkt) != 0 ) { if ( ntohs(blkt_1000->next_blkt) != 0 ) {
sl_blkt_1001_s* blkt_1001 = (sl_blkt_1001_s *)((char *) fsdh + sl_blkt_1001_s* blkt_1001 = (sl_blkt_1001_s *)((char *) fsdh +
sizeof(sl_fsdh_s) + sizeof(sl_blkt_1000_s)); sizeof(sl_fsdh_s) + sizeof(sl_blkt_1000_s));
blkt_1001->frame_cnt = frames; blkt_1001->frame_cnt = frames;
} }
}
rec->unpackHeader();
}
} }
} }

@ -58,16 +58,16 @@ struct MSEEDFormat {
template<class T> template<class T>
MSEEDEncoderPacket<T> MSEEDEncoderPacket<T>
getPacket(const Time &it, int usec_correction, int timing_quality) { get_packet(const SPClock::INT_TIME &it, int usec_correction, int timing_quality) {
void *dataptr = NULL; void *dataptr = NULL;
int datalen = 0; int datalen = 0;
unsigned int size = 0; unsigned int size = 0;
MSEEDDataRecord *rec = getBuffer(it, usec_correction, timing_quality, dataptr, datalen); MSEEDDataRecord *rec = get_buffer(it, usec_correction, timing_quality, dataptr, datalen);
return MSEEDEncoderPacket<T>(rec, size, dataptr, datalen); return MSEEDEncoderPacket<T>(rec, size, dataptr, datalen);
} }
MSEEDDataRecord *getBuffer(const Time &it, int usec_correction, MSEEDDataRecord *get_buffer(const SPClock::INT_TIME &it, int usec_correction,
int timing_quality, int timing_quality,
void *&dataptr, int &datalen); void *&dataptr, int &datalen);

@ -17,7 +17,6 @@
* 01.01.2013 Adapted code to CAPS client library requirements (gempa GmbH) * 01.01.2013 Adapted code to CAPS client library requirements (gempa GmbH)
*****************************************************************************/ *****************************************************************************/
#ifndef CAPS_MSEED_SPCLOCK_H #ifndef CAPS_MSEED_SPCLOCK_H
#define CAPS_MSEED_SPCLOCK_H #define CAPS_MSEED_SPCLOCK_H
@ -29,43 +28,51 @@ namespace Gempa {
namespace CAPS { namespace CAPS {
class SPClock { class SPClock
{
public: public:
SPClock(int freqn, int freqd) typedef Gempa::CAPS::Time INT_TIME;
: freqn(freqn), freqd(freqd) {}
void syncTime(const Time &time) { private:
_itime = time; INT_TIME itime;
_ticks = 0; int ticks;
_corr = 0; int corr;
}
void tick() { public:
++_ticks; const int freqn;
} const int freqd;
Time getTime(int tickDiff) const { SPClock(int freqn_init, int freqd_init): ticks(0), corr(0),
int64_t correctness = (double)freqd / (double)freqn * 1000000 * (_ticks - tickDiff - _corr); freqn(freqn_init), freqd(freqd_init)
return _itime + TimeSpan(long(correctness / 1000000), long(correctness % 1000000)); {}
void sync_time(const INT_TIME &time)
{
itime = time;
ticks = 0;
corr = 0;
} }
int correction() const { void tick()
return _corr; {
++ticks;
} }
public: INT_TIME get_time(int tick_diff) const
const int freqn; {
const int freqd; int64_t correctness = (double)freqd / (double)freqn * 1000000 * (ticks - tick_diff - corr);
return itime + Gempa::CAPS::TimeSpan(long(correctness/1000000),long(correctness%1000000));
}
private: int correction() const
Time _itime; {
int _ticks{0}; return corr;
int _corr{0}; }
}; };
} }
} }
#endif #endif // SPCLOCK_H

@ -33,8 +33,8 @@ namespace CAPS {
//***************************************************************************** //*****************************************************************************
struct Steim1Frame { struct Steim1Frame {
u_int32_t nibbleWord; u_int32_t nibble_word;
u_int32_t sampleWord[15]; u_int32_t sample_word[15];
}; };
//***************************************************************************** //*****************************************************************************
@ -43,44 +43,42 @@ struct Steim1Frame {
template<typename T> template<typename T>
class Steim1Encoder: public Encoder { class Steim1Encoder: public Encoder {
public:
Steim1Encoder(MSEEDFormat *format, int freqn, int freqd)
: Encoder(freqn, freqd), _format(format), _frameCount(0)
, _bp(0), _fp(0), _spw(4), _lastSample(0), _nibbleWord(0) {}
virtual ~Steim1Encoder();
virtual void flush();
virtual void push(void *value);
virtual int type() const { return DE_STEIM1; }
private: private:
void updateSpw(int bp); MSEEDFormat *format;
int frame_count;
int bp;
int fp;
int spw;
int32_t last_sample;
int32_t buf[5];
u_int32_t nibble_word;
MSEEDEncoderPacket<Steim1Frame> current_packet;
void update_spw(int bp);
void store(int32_t value); void store(int32_t value);
void initPacket(); void init_packet();
void updatePacket(); void finish_packet();
void finishPacket(); void update_packet();
MSEEDEncoderPacket<Steim1Frame> getPacket() { MSEEDEncoderPacket<Steim1Frame> get_packet() {
return _format->getPacket<Steim1Frame>(_clk.getTime(_bp), return format->get_packet<Steim1Frame>(_clk.get_time(bp),
_clk.correction(), _timingQuality); _clk.correction(), _timingQuality);
} }
void queuePacket(MSEEDEncoderPacket<Steim1Frame> &pckt); void queue_packet(MSEEDEncoderPacket<Steim1Frame> &pckt);
int numberOfFrames(const MSEEDEncoderPacket<Steim1Frame> &packet) { int number_of_frames(const MSEEDEncoderPacket<Steim1Frame> &packet) {
return (packet.datalen >> 6); return (packet.datalen >> 6);
} }
private: public:
MSEEDFormat *_format; Steim1Encoder(MSEEDFormat *format, int freqn, int freqd)
int _frameCount; : Encoder(freqn, freqd), format(format), frame_count(0),
int _bp; bp(0), fp(0), spw(4), last_sample(0), nibble_word(0) {}
int _fp; virtual ~Steim1Encoder();
int _spw; virtual void flush();
int32_t _lastSample; virtual void push(void *value);
int32_t _buf[5]; virtual int type() const { return DE_STEIM1; }
u_int32_t _nibbleWord;
MSEEDEncoderPacket<Steim1Frame> _currentPacket;
}; };

@ -27,181 +27,158 @@
namespace Gempa { namespace Gempa {
namespace CAPS { namespace CAPS {
template<typename T> template<typename T> Steim1Encoder<T>::~Steim1Encoder() {
Steim1Encoder<T>::~Steim1Encoder() { if ( format != NULL ) delete format;
if ( _format ) {
delete _format;
}
} }
template<typename T> template<typename T> void Steim1Encoder<T>::update_spw(int bp) {
void Steim1Encoder<T>::updateSpw(int bp) {
int spw1 = 4; int spw1 = 4;
assert(bp < 4); assert(bp < 4);
if ( _buf[bp] < -32768 || _buf[bp] > 32767 ) spw1 = 1; if(buf[bp] < -32768 || buf[bp] > 32767) spw1 = 1;
else if ( _buf[bp] < -128 || _buf[bp] > 127 ) spw1 = 2; else if(buf[bp] < -128 || buf[bp] > 127) spw1 = 2;
if ( spw1 < _spw ) _spw = spw1; if(spw1 < spw) spw = spw1;
} }
template<typename T> template<typename T> void Steim1Encoder<T>::store(int32_t value) {
void Steim1Encoder<T>::store(int32_t value) { assert(bp < 4);
assert(_bp < 4); buf[bp] = value - last_sample;
_buf[_bp] = value - _lastSample; last_sample = value;
_lastSample = value; update_spw(bp);
updateSpw(_bp); ++bp;
++_bp;
} }
template<typename T> template<typename T> void Steim1Encoder<T>::init_packet() {
void Steim1Encoder<T>::initPacket() {
int i; int i;
int32_t beginSample = _lastSample; int32_t begin_sample = last_sample;
for ( i = 1; i < _bp; ++i ) { for(i = 1; i < bp; ++i) {
beginSample -= _buf[i]; begin_sample -= buf[i];
} }
reset(); reset();
_currentPacket.data[0].sampleWord[0] = htonl(beginSample); current_packet.data[0].sample_word[0] = htonl(begin_sample);
_frameCount = 0; frame_count = 0;
_nibbleWord = 0; nibble_word = 0;
_fp = 2; fp = 2;
} }
template<typename T> template<typename T> void Steim1Encoder<T>::finish_packet() {
void Steim1Encoder<T>::finishPacket() {
int i; int i;
int32_t endSample = _lastSample; int32_t end_sample = last_sample;
for ( i = 0; i < _bp; ++i ) { for(i = 0; i < bp; ++i) {
endSample -= _buf[i]; end_sample -= buf[i];
} }
_currentPacket.data[0].sampleWord[1] = htonl(endSample); current_packet.data[0].sample_word[1] = htonl(end_sample);
} }
template<typename T> template<typename T> void Steim1Encoder<T>::update_packet() {
void Steim1Encoder<T>::updatePacket() {
unsigned int nibble = 0; unsigned int nibble = 0;
u_int32_t sampleWord = 0; u_int32_t sample_word = 0;
assert(_bp < 5); assert(bp < 5);
int used = _bp; int used = bp;
while ( used > _spw ) { while(used > spw) {
--used; --used;
_spw = 4; spw = 4;
for ( int i = 0; i < used; ++i ) { for(int i = 0; i < used; ++i) update_spw(i);
updateSpw(i);
}
} }
while ( used < _spw ) { while(used < spw) spw >>= 1;
_spw >>= 1;
}
used = _spw; used = spw;
switch ( _spw ) { switch(spw) {
case 4: case 4:
nibble = 1; nibble = 1;
sampleWord = ((_buf[0] & 0xff) << 24) | ((_buf[1] & 0xff) << 16) | sample_word = ((buf[0] & 0xff) << 24) | ((buf[1] & 0xff) << 16) |
((_buf[2] & 0xff) << 8) | (_buf[3] & 0xff); ((buf[2] & 0xff) << 8) | (buf[3] & 0xff);
break; break;
case 2: case 2:
nibble = 2; nibble = 2;
sampleWord = ((_buf[0] & 0xffff) << 16) | (_buf[1] & 0xffff); sample_word = ((buf[0] & 0xffff) << 16) | (buf[1] & 0xffff);
break; break;
case 1: case 1:
nibble = 3; nibble = 3;
sampleWord = _buf[0]; sample_word = buf[0];
break; break;
default: default:
assert(0); assert(0);
} }
_nibbleWord |= (nibble << (30 - ((_fp + 1) << 1))); nibble_word |= (nibble << (30 - ((fp + 1) << 1)));
_spw = 4; spw = 4;
for ( int i = 0; i < _bp - used; ++i ) { for(int i = 0; i < bp - used; ++i) {
_buf[i] = _buf[i + used]; buf[i] = buf[i + used];
updateSpw(i); update_spw(i);
} }
_bp -= used; bp -= used;
_sampleCount += used; _sampleCount += used;
_currentPacket.data[_frameCount].nibbleWord = htonl(_nibbleWord); current_packet.data[frame_count].nibble_word = htonl(nibble_word);
_currentPacket.data[_frameCount].sampleWord[_fp] = htonl(sampleWord); current_packet.data[frame_count].sample_word[fp] = htonl(sample_word);
if(++fp < 15) return;
if ( ++_fp < 15 ) {
return;
}
_nibbleWord = 0;
_fp = 0;
++_frameCount;
nibble_word = 0;
fp = 0;
++frame_count;
return; return;
} }
template<typename T> template<typename T> void Steim1Encoder<T>::queue_packet(MSEEDEncoderPacket<Steim1Frame> &pckt) {
void Steim1Encoder<T>::queuePacket(MSEEDEncoderPacket<Steim1Frame> &pckt) { format->updateBuffer(pckt.record, _sampleCount, frame_count + (fp > 0));
_format->updateBuffer(pckt.record, _sampleCount, _frameCount + (_fp > 0));
Packet *packet = new Packet(DataRecordPtr(pckt.record), Packet *packet = new Packet(DataRecordPtr(pckt.record), format->networkCode, format->stationCode,
_format->networkCode, format->locationCode, format->channelCode);
_format->stationCode,
_format->locationCode,
_format->channelCode);
_packetQueue.push_back(PacketPtr(packet)); _packetQueue.push_back(PacketPtr(packet));
pckt.reset(); pckt.reset();
reset(); reset();
} }
template<typename T> template<typename T> void Steim1Encoder<T>::push(void *value) {
void Steim1Encoder<T>::push(void *value) {
int32_t sample_val = *static_cast<T*>(value); int32_t sample_val = *static_cast<T*>(value);
store(sample_val); store(sample_val);
_clk.tick(); _clk.tick();
while ( _bp >= _spw ) { while(bp >= spw) {
if ( !_currentPacket.valid() ) { if(!current_packet.valid()) {
_currentPacket = getPacket(); current_packet = get_packet();
initPacket(); init_packet();
} }
updatePacket(); update_packet();
if ( _frameCount == numberOfFrames(_currentPacket) ) { if(frame_count == number_of_frames(current_packet)) {
finishPacket(); finish_packet();
queuePacket(_currentPacket); queue_packet(current_packet);
} }
} }
} }
template<typename T> template<typename T> void Steim1Encoder<T>::flush() {
void Steim1Encoder<T>::flush() { while(bp) {
while ( _bp ) { if(!current_packet.valid()) {
if ( !_currentPacket.valid() ) { current_packet = get_packet();
_currentPacket = getPacket(); init_packet();
initPacket();
} }
updatePacket(); update_packet();
if ( _frameCount == numberOfFrames(_currentPacket) ) { if(frame_count == number_of_frames(current_packet)) {
finishPacket(); finish_packet();
queuePacket(_currentPacket); queue_packet(current_packet);
} }
} }
if ( _currentPacket.valid() ) { if(current_packet.valid()) {
finishPacket(); finish_packet();
queuePacket(_currentPacket); queue_packet(current_packet);
} }
} }
} }
} }

@ -31,8 +31,8 @@ namespace CAPS {
//***************************************************************************** //*****************************************************************************
struct Steim2Frame { struct Steim2Frame {
u_int32_t nibbleWord; u_int32_t nibble_word;
u_int32_t sampleWord[15]; u_int32_t sample_word[15];
}; };
//***************************************************************************** //*****************************************************************************
@ -43,8 +43,8 @@ template<typename T>
class Steim2Encoder : public Encoder { class Steim2Encoder : public Encoder {
public: public:
Steim2Encoder(MSEEDFormat *format, int freqn, int freqd) Steim2Encoder(MSEEDFormat *format, int freqn, int freqd)
: Encoder(freqn, freqd), _format(format), _frameCount(0) : Encoder(freqn, freqd), format(format), frame_count(0),
, _bp(0), _fp(0), _spw(4), _lastSample(0), _nibbleWord(0) { bp(0), fp(0), spw(4), last_sample(0), nibble_word(0) {
} }
virtual ~Steim2Encoder(); virtual ~Steim2Encoder();
virtual void flush(); virtual void flush();
@ -53,34 +53,34 @@ class Steim2Encoder : public Encoder {
virtual int type() const { return DE_STEIM2; } virtual int type() const { return DE_STEIM2; }
private: private:
void updateSpw(int bp); void update_spw(int bp);
void store(int32_t value); void store(int32_t value);
void initPacket(); void init_packet();
void updatePacket(); void finish_packet();
void finishPacket(); void update_packet();
MSEEDEncoderPacket<Steim2Frame> getPacket() { MSEEDEncoderPacket<Steim2Frame> get_packet() {
return _format->getPacket<Steim2Frame>(_clk.getTime(_bp), return format->get_packet<Steim2Frame>(_clk.get_time(bp),
_clk.correction(), _timingQuality); _clk.correction(), _timingQuality);
} }
void queuePacket(MSEEDEncoderPacket<Steim2Frame> &pckt); void queue_packet(MSEEDEncoderPacket<Steim2Frame> &pckt);
int numberOfFrames(const MSEEDEncoderPacket<Steim2Frame> &packet) const { int number_of_frames(const MSEEDEncoderPacket<Steim2Frame> &packet) {
return (packet.datalen >> 6); return (packet.datalen >> 6);
} }
private: private:
MSEEDFormat *_format; MSEEDFormat *format;
int _frameCount; int frame_count;
int _bp; int bp;
int _fp; int fp;
int32_t _last_s; int32_t last_s;
int _spw; int spw;
int32_t _lastSample; int32_t last_sample;
int32_t _buf[8]; int32_t buf[8];
u_int32_t _nibbleWord; u_int32_t nibble_word;
MSEEDEncoderPacket<Steim2Frame> _currentPacket; MSEEDEncoderPacket<Steim2Frame> current_packet;
}; };

@ -32,174 +32,163 @@ namespace CAPS {
template<typename T> Steim2Encoder<T>::~Steim2Encoder() { template<typename T> Steim2Encoder<T>::~Steim2Encoder() {
if ( _format ) { if ( format != NULL ) delete format;
delete _format;
}
} }
template<typename T> void Steim2Encoder<T>::updateSpw(int bp) { template<typename T> void Steim2Encoder<T>::update_spw(int bp) {
assert(_bp < 7); assert(bp < 7);
if ( _buf[_bp] < -536870912 ) { if(buf[bp] < -536870912) {
CAPS_WARNING("%s.%s.%s.%s: value %d is too large for Steim2 encoding", CAPS_WARNING("%s.%s.%s.%s: value %d is too large for Steim2 encoding",
_format->networkCode.c_str(), _format->stationCode.c_str(), format->networkCode.c_str(), format->stationCode.c_str(),
_format->locationCode.c_str(), _format->channelCode.c_str(), format->locationCode.c_str(), format->channelCode.c_str(),
_buf[_bp]); buf[bp]);
_buf[_bp] = -536870912; buf[bp] = -536870912;
_spw = 1; spw = 1;
return; return;
} }
if ( _buf[_bp] > 536870911 ) { if(buf[bp] > 536870911) {
CAPS_WARNING("%s.%s.%s.%s: value %d is too large for Steim2 encoding", CAPS_WARNING("%s.%s.%s.%s: value %d is too large for Steim2 encoding",
_format->networkCode.c_str(), _format->stationCode.c_str(), format->networkCode.c_str(), format->stationCode.c_str(),
_format->locationCode.c_str(), _format->channelCode.c_str(), format->locationCode.c_str(), format->channelCode.c_str(),
_buf[_bp]); buf[bp]);
_buf[_bp] = 536870911; buf[bp] = 536870911;
_spw = 1; spw = 1;
return; return;
} }
int spw1 = 7; int spw1 = 7;
if ( _buf[_bp] < -16384 || _buf[_bp] > 16383 ) spw1 = 1; if(buf[bp] < -16384 || buf[bp] > 16383) spw1 = 1;
else if ( _buf[_bp] < -512 || _buf[_bp] > 511 ) spw1 = 2; else if(buf[bp] < -512 || buf[bp] > 511) spw1 = 2;
else if ( _buf[_bp] < -128 || _buf[_bp] > 127 ) spw1 = 3; else if(buf[bp] < -128 || buf[bp] > 127) spw1 = 3;
else if ( _buf[_bp] < -32 || _buf[_bp] > 31 ) spw1 = 4; else if(buf[bp] < -32 || buf[bp] > 31) spw1 = 4;
else if ( _buf[_bp] < -16 || _buf[_bp] > 15 ) spw1 = 5; else if(buf[bp] < -16 || buf[bp] > 15) spw1 = 5;
else if ( _buf[_bp] < -8 || _buf[_bp] > 7 ) spw1 = 6; else if(buf[bp] < -8 || buf[bp] > 7) spw1 = 6;
if ( spw1 < _spw ) _spw = spw1; if(spw1 < spw) spw = spw1;
} }
template<typename T> void Steim2Encoder<T>::store(int32_t value) { template<typename T> void Steim2Encoder<T>::store(int32_t value) {
assert(_bp < 7); assert(bp < 7);
_buf[_bp] = value - _lastSample; buf[bp] = value - last_sample;
_lastSample = value; last_sample = value;
updateSpw(_bp); update_spw(bp);
++_bp; ++bp;
} }
template<typename T> void Steim2Encoder<T>::initPacket() { template<typename T> void Steim2Encoder<T>::init_packet() {
int i; int i;
int32_t begin_sample = _lastSample; int32_t begin_sample = last_sample;
for ( i = 1; i < _bp; ++i ) { for(i = 1; i < bp; ++i) {
begin_sample -= _buf[i]; begin_sample -= buf[i];
} }
reset(); reset();
_currentPacket.data[0].sampleWord[0] = htonl(begin_sample); current_packet.data[0].sample_word[0] = htonl(begin_sample);
_frameCount = 0; frame_count = 0;
_nibbleWord = 0; nibble_word = 0;
_fp = 2; fp = 2;
} }
template<typename T> void Steim2Encoder<T>::finishPacket() { template<typename T> void Steim2Encoder<T>::finish_packet() {
int i; int i;
int32_t endSample = _lastSample; int32_t end_sample = last_sample;
for ( i = 0; i < _bp; ++i ) { for(i = 0; i < bp; ++i) {
endSample -= _buf[i]; end_sample -= buf[i];
} }
_currentPacket.data[0].sampleWord[1] = htonl(endSample); current_packet.data[0].sample_word[1] = htonl(end_sample);
} }
template<typename T> void Steim2Encoder<T>::updatePacket() { template<typename T> void Steim2Encoder<T>::update_packet() {
unsigned int nibble = 0; unsigned int nibble = 0;
u_int32_t sampleWord = 0; u_int32_t sample_word = 0;
assert(_bp < 8); assert(bp < 8);
int used = _bp; int used = bp;
while ( used > _spw ) { while(used > spw) {
--used; --used;
_spw = 7; spw = 7;
for(int i = 0; i < used; ++i) update_spw(i);
for ( int i = 0; i < used; ++i ) {
updateSpw(i);
}
} }
_spw = used; spw = used;
switch ( _spw ) { switch(spw) {
case 7: case 7:
nibble = 3; nibble = 3;
sampleWord = (2U << 30) | ((_buf[0] & 0xf) << 24) | sample_word = (2U << 30) | ((buf[0] & 0xf) << 24) |
((_buf[1] & 0xf) << 20) | ((_buf[2] & 0xf) << 16) | ((buf[1] & 0xf) << 20) | ((buf[2] & 0xf) << 16) |
((_buf[3] & 0xf) << 12) | ((_buf[4] & 0xf) << 8) | ((buf[3] & 0xf) << 12) | ((buf[4] & 0xf) << 8) |
((_buf[5] & 0xf) << 4) | (_buf[6] & 0xf); ((buf[5] & 0xf) << 4) | (buf[6] & 0xf);
break; break;
case 6: case 6:
nibble = 3; nibble = 3;
sampleWord = (1U << 30) | ((_buf[0] & 0x1f) << 25) | sample_word = (1U << 30) | ((buf[0] & 0x1f) << 25) |
((_buf[1] & 0x1f) << 20) | ((_buf[2] & 0x1f) << 15) | ((buf[1] & 0x1f) << 20) | ((buf[2] & 0x1f) << 15) |
((_buf[3] & 0x1f) << 10) | ((_buf[4] & 0x1f) << 5) | ((buf[3] & 0x1f) << 10) | ((buf[4] & 0x1f) << 5) |
(_buf[5] & 0x1f); (buf[5] & 0x1f);
break; break;
case 5: case 5:
nibble = 3; nibble = 3;
sampleWord = ((_buf[0] & 0x3f) << 24) | ((_buf[1] & 0x3f) << 18) | sample_word = ((buf[0] & 0x3f) << 24) | ((buf[1] & 0x3f) << 18) |
((_buf[2] & 0x3f) << 12) | ((_buf[3] & 0x3f) << 6) | ((buf[2] & 0x3f) << 12) | ((buf[3] & 0x3f) << 6) |
(_buf[4] & 0x3f); (buf[4] & 0x3f);
break; break;
case 4: case 4:
nibble = 1; nibble = 1;
sampleWord = ((_buf[0] & 0xff) << 24) | ((_buf[1] & 0xff) << 16) | sample_word = ((buf[0] & 0xff) << 24) | ((buf[1] & 0xff) << 16) |
((_buf[2] & 0xff) << 8) | (_buf[3] & 0xff); ((buf[2] & 0xff) << 8) | (buf[3] & 0xff);
break; break;
case 3: case 3:
nibble = 2; nibble = 2;
sampleWord = (3U << 30) | ((_buf[0] & 0x3ff) << 20) | sample_word = (3U << 30) | ((buf[0] & 0x3ff) << 20) |
((_buf[1] & 0x3ff) << 10) | (_buf[2] & 0x3ff); ((buf[1] & 0x3ff) << 10) | (buf[2] & 0x3ff);
break; break;
case 2: case 2:
nibble = 2; nibble = 2;
sampleWord = (2U << 30) | ((_buf[0] & 0x7fff) << 15) | sample_word = (2U << 30) | ((buf[0] & 0x7fff) << 15) |
(_buf[1] & 0x7fff); (buf[1] & 0x7fff);
break; break;
case 1: case 1:
nibble = 2; nibble = 2;
sampleWord = (1U << 30) | (_buf[0] & 0x3fffffff); sample_word = (1U << 30) | (buf[0] & 0x3fffffff);
break; break;
default: default:
assert(0); assert(0);
break; break;
} }
_nibbleWord |= (nibble << (30 - ((_fp + 1) << 1))); nibble_word |= (nibble << (30 - ((fp + 1) << 1)));
_spw = 7; spw = 7;
for ( int i = 0; i < _bp - used; ++i ) { for(int i = 0; i < bp - used; ++i) {
_buf[i] = _buf[i + used]; buf[i] = buf[i + used];
updateSpw(i); update_spw(i);
} }
_bp -= used; bp -= used;
_sampleCount += used; _sampleCount += used;
_currentPacket.data[_frameCount].nibbleWord = htonl(_nibbleWord); current_packet.data[frame_count].nibble_word = htonl(nibble_word);
_currentPacket.data[_frameCount].sampleWord[_fp] = htonl(sampleWord); current_packet.data[frame_count].sample_word[fp] = htonl(sample_word);
if(++fp < 15) return;
if ( ++_fp < 15 ) {
return;
}
_nibbleWord = 0;
_fp = 0;
++_frameCount;
nibble_word = 0;
fp = 0;
++frame_count;
return; return;
} }
template<typename T> void Steim2Encoder<T>::queuePacket(MSEEDEncoderPacket<Steim2Frame> &pckt) { template<typename T> void Steim2Encoder<T>::queue_packet(MSEEDEncoderPacket<Steim2Frame> &pckt) {
_format->updateBuffer(pckt.record, _sampleCount, _frameCount + (_fp > 0)); format->updateBuffer(pckt.record, _sampleCount, frame_count + (fp > 0));
Packet *packet = new Packet(DataRecordPtr(pckt.record), _format->networkCode, Packet *packet = new Packet(DataRecordPtr(pckt.record), format->networkCode, format->stationCode,
_format->stationCode, format->locationCode, format->channelCode);
_format->locationCode,
_format->channelCode);
_packetQueue.push_back(PacketPtr(packet)); _packetQueue.push_back(PacketPtr(packet));
pckt.reset(); pckt.reset();
reset(); reset();
@ -211,37 +200,37 @@ template<typename T> void Steim2Encoder<T>::push(void *value) {
store(sample_val); store(sample_val);
_clk.tick(); _clk.tick();
while ( _bp >= _spw ) { while ( bp >= spw ) {
if ( !_currentPacket.valid() ) { if( !current_packet.valid() ) {
_currentPacket = getPacket(); current_packet = get_packet();
initPacket(); init_packet();
} }
updatePacket(); update_packet();
if ( _frameCount == numberOfFrames(_currentPacket) ) { if ( frame_count == number_of_frames(current_packet) ) {
finishPacket(); finish_packet();
queuePacket(_currentPacket); queue_packet(current_packet);
} }
} }
} }
template<typename T> void Steim2Encoder<T>::flush() { template<typename T> void Steim2Encoder<T>::flush() {
while ( _bp ) { while ( bp ) {
if ( !_currentPacket.valid() ) { if ( !current_packet.valid() ) {
_currentPacket = getPacket(); current_packet = get_packet();
initPacket(); init_packet();
} }
updatePacket(); update_packet();
if ( _frameCount == numberOfFrames(_currentPacket) ) { if( frame_count == number_of_frames(current_packet) ) {
finishPacket(); finish_packet();
queuePacket(_currentPacket); queue_packet(current_packet);
} }
} }
if ( _currentPacket.valid() ) { if ( current_packet.valid() ) {
finishPacket(); finish_packet();
queuePacket(_currentPacket); queue_packet(current_packet);
} }
} }

@ -12,29 +12,28 @@
* from gempa GmbH. * * from gempa GmbH. *
***************************************************************************/ ***************************************************************************/
#ifndef CAPS_MSEED_UNCOMPRESSED_H #ifndef CAPS_MSEED_UNCOMPRESSED_H
#define CAPS_MSEED_UNCOMPRESSED_H #define CAPS_MSEED_UNCOMPRESSED_H
#include "encoder.h" #include "encoder.h"
#include "mseed.h" #include "mseed.h"
#include <gempa/caps/endianess.h> #include <gempa/caps/endianess.h>
#include <iostream>
#include <netinet/in.h>
namespace Gempa { namespace Gempa {
namespace CAPS { namespace CAPS {
template<typename T> class UncompressedMSEED : public Encoder { template<typename T> class UncompressedMSEED : public Encoder {
MSEEDEncoderPacket<T> get_packet() { MSEEDEncoderPacket<T> get_packet() {
return _format->getPacket<T>(_clk.getTime(-_bp), return _format->get_packet<T>(_clk.get_time(-_bp),
_clk.correction(), _timingQuality); _clk.correction(), _timingQuality);
} }
void queuePacket(MSEEDEncoderPacket<T> &pckt) { void queue_packet(MSEEDEncoderPacket<T> &pckt) {
_format->updateBuffer(pckt.record, _sampleCount, 1); _format->updateBuffer(pckt.record, _sampleCount, 1);
Packet *packet = new Packet(DataRecordPtr(pckt.record), _format->networkCode, _format->stationCode, Packet *packet = new Packet(DataRecordPtr(pckt.record), _format->networkCode, _format->stationCode,
@ -52,7 +51,7 @@ template<typename T> class UncompressedMSEED : public Encoder {
virtual ~UncompressedMSEED() { if ( _format ) delete _format; } virtual ~UncompressedMSEED() { if ( _format ) delete _format; }
virtual void flush() { virtual void flush() {
if ( _current_packet.valid() ) { if ( _current_packet.valid() ) {
queuePacket(_current_packet); queue_packet(_current_packet);
} }
} }
@ -79,9 +78,7 @@ template<typename T> class UncompressedMSEED : public Encoder {
}; };
} }
} }
#endif // __STEIM1_H__ #endif // __STEIM1_H__

@ -54,6 +54,7 @@ bool MSEEDDataRecord::readMetaData(std::streambuf &buf, int size,
Header &header, Header &header,
Time &startTime, Time &startTime,
Time &endTime) { Time &endTime) {
#if 1 // Set this to 1 to enable no-malloc fast MSeed meta parser
fsdh_s head; fsdh_s head;
if ( size <= 0 ) { if ( size <= 0 ) {
@ -238,7 +239,22 @@ bool MSEEDDataRecord::readMetaData(std::streambuf &buf, int size,
endTime += TimeSpan((hptime_t)hptime/HPTMODULUS,(hptime_t)hptime%HPTMODULUS); endTime += TimeSpan((hptime_t)hptime/HPTMODULUS,(hptime_t)hptime%HPTMODULUS);
} }
timeToTimestamp(header.samplingTime, startTime); timeToTimestamp(_header.samplingTime, startTime);
#else
std::vector<char> data(size);
size_t read = buf.sgetn(&data[0], data.size());
if ( read != data.size() ) {
CAPS_WARNING("read metadata: input buffer underflow: only %d/%d bytes read",
(int)read, (int)data.size());
return;
}
unpackHeader(&data[0], data.size());
header = _header;
startTime = _startTime;
endTime = _endTime;
#endif
return true; return true;
} }
@ -312,6 +328,93 @@ DataRecord::ReadStatus MSEEDDataRecord::get(std::streambuf &buf, int size,
} }
return RS_Complete; return RS_Complete;
/*
// Only unpack the header structure
int state = msr_unpack(&_data[0], _data.size(), &ms_rec, 0, 0);
if ( state != MS_NOERROR ) {
switch ( state ) {
case MS_GENERROR:
CAPS_WARNING("get: generic libmseed error");
break;
case MS_NOTSEED:
CAPS_WARNING("get: input data is not seed");
break;
case MS_WRONGLENGTH:
CAPS_WARNING("get: length of data read was not correct");
break;
case MS_OUTOFRANGE:
CAPS_WARNING("get: SEED record length out of range");
break;
case MS_UNKNOWNFORMAT:
CAPS_WARNING("get: unknown data encoding format");
break;
case MS_STBADCOMPFLAG:
CAPS_WARNING("get: invalid Steim compression flag(s)");
break;
}
if ( ms_rec != NULL )
msr_free(&ms_rec);
return RS_Error;
}
hptime_t hptime = msr_starttime(ms_rec);
_startTime = Time((hptime_t)hptime/HPTMODULUS,(hptime_t)hptime%HPTMODULUS);
_endTime = _startTime;
if ( ms_rec->samprate > 0.0 && ms_rec->samplecnt > 0 ) {
hptime = (hptime_t)(((double)(ms_rec->samplecnt) / ms_rec->samprate * HPTMODULUS) + 0.5);
_endTime += TimeSpan((hptime_t)hptime/HPTMODULUS,(hptime_t)hptime%HPTMODULUS);
}
_header.dataType = DT_Unknown;
timeToTimestamp(_header.samplingTime, _startTime);
if ( ms_rec->fsdh->samprate_fact > 0 ) {
_header.samplingFrequencyNumerator = ms_rec->fsdh->samprate_fact;
_header.samplingFrequencyDenominator = 1;
}
else {
_header.samplingFrequencyNumerator = 1;
_header.samplingFrequencyDenominator = -ms_rec->fsdh->samprate_fact;
}
if ( ms_rec->fsdh->samprate_mult > 0 )
_header.samplingFrequencyNumerator *= ms_rec->fsdh->samprate_mult;
else
_header.samplingFrequencyDenominator *= -ms_rec->fsdh->samprate_mult;
switch ( ms_rec->sampletype ) {
case 'a':
_header.dataType = DT_INT8;
break;
case 'i':
_header.dataType = DT_INT32;
break;
case 'f':
_header.dataType = DT_FLOAT;
break;
case 'd':
_header.dataType = DT_DOUBLE;
break;
default:
_header.dataType = DT_Unknown;
break;
}
msr_free(&ms_rec);
if ( start.valid() || end.valid() ) {
// Out of scope?
if ( end.valid() && (end <= _startTime) )
return RS_AfterTimeWindow;
if ( start.valid() && (start >= _endTime) )
return RS_BeforeTimeWindow;
}
return RS_Complete;
*/
} }

@ -13,7 +13,6 @@
***************************************************************************/ ***************************************************************************/
#include <gempa/caps/endianess.h>
#include <gempa/caps/log.h> #include <gempa/caps/log.h>
#include <gempa/caps/plugin.h> #include <gempa/caps/plugin.h>
#include <gempa/caps/utils.h> #include <gempa/caps/utils.h>
@ -44,8 +43,6 @@
#include <boost/filesystem/convenience.hpp> #include <boost/filesystem/convenience.hpp>
#endif #endif
#include <boost/algorithm/string.hpp>
#include <cstring> #include <cstring>
#include <fstream> #include <fstream>
#include <sstream> #include <sstream>
@ -122,8 +119,6 @@ namespace CAPS {
namespace { namespace {
#define HELLO_REQUEST "HELLO\n"
#if !defined(CAPS_FEATURES_JOURNAL) || CAPS_FEATURES_JOURNAL #if !defined(CAPS_FEATURES_JOURNAL) || CAPS_FEATURES_JOURNAL
bool createPath(const string &dir) { bool createPath(const string &dir) {
try { try {
@ -146,49 +141,6 @@ bool createPath(const string &dir) {
// } // }
//} //}
template<typename T> void dumpSamples(RawDataRecord *rec, size_t sampleCount) {
const vector<T> *data = reinterpret_cast<vector<T>* >(rec->data());
for ( size_t i = 0; i < sampleCount; ++i ) {
cout << data->at(i) << endl;
}
}
void dumpSList(Packet *packet) {
RawDataRecord *rec = static_cast<RawDataRecord*>(packet->record.get());
string id = packet->streamID;
boost::replace_all(id, ".", "_");
string dataType = packet->record->formatName();
boost::replace_all(dataType, "RAW/", "");
DataType dt = rec->header()->dataType;
size_t sampleCount = rec->data()->size() / dataTypeSize(dt);
cout << "TIMESERIES " << id << "_R, "
<< sampleCount << " samples, "
<< static_cast<float>(rec->header()->samplingFrequencyNumerator) / rec->header()->samplingFrequencyDenominator << " sps, "
<< packet->record->startTime().iso() << " SLIST, "
<< dataType << ", " << packet->uom << endl;
if ( dt == DT_INT8 ) {
dumpSamples<int8_t>(rec, sampleCount);
}
else if ( dt == DT_INT16 ) {
dumpSamples<int16_t>(rec, sampleCount);
}
else if ( dt == DT_INT32 ) {
dumpSamples<int32_t>(rec, sampleCount);
}
else if ( dt == DT_INT64 ) {
dumpSamples<int64_t>(rec, sampleCount);
}
else if ( dt == DT_FLOAT ) {
dumpSamples<float>(rec, sampleCount);
}
else if ( dt == DT_DOUBLE ) {
dumpSamples<double>(rec, sampleCount);
}
}
template<typename T> void fillPacket(Packet *packet, uint16_t version) { template<typename T> void fillPacket(Packet *packet, uint16_t version) {
PacketDataHeader header; PacketDataHeader header;
header.packetType = packet->record->packetType(); header.packetType = packet->record->packetType();
@ -238,9 +190,9 @@ void insertPacket(Plugin::BackfillingBuffer &buf, PacketPtr packet) {
for ( it = buf.begin(); it != buf.end(); ++it, ++pos ) { for ( it = buf.begin(); it != buf.end(); ++it, ++pos ) {
if ( (*it)->record->endTime() > packet->record->endTime() ) { if ( (*it)->record->endTime() > packet->record->endTime() ) {
buf.insert(it, packet); buf.insert(it, packet);
CAPS_DEBUG("Backfilling buffer: packet inserted at pos: %zu, new " CAPS_DEBUG("Backfilling buffer: packet inserted at pos: %lu, new "
"size: %zu, time window: %s~%s", "size: %lu, time window: %s~%s", (long unsigned) pos,
pos, buf.size(), (long unsigned) buf.size(),
buf.front()->record->startTime().iso().c_str(), buf.front()->record->startTime().iso().c_str(),
buf.back()->record->endTime().iso().c_str()); buf.back()->record->endTime().iso().c_str());
//dump(buf); //dump(buf);
@ -250,8 +202,8 @@ void insertPacket(Plugin::BackfillingBuffer &buf, PacketPtr packet) {
buf.push_back(packet); buf.push_back(packet);
CAPS_DEBUG("Backfilling buffer: packet appended, new size: %zu, time window: %s~%s", CAPS_DEBUG("Backfilling buffer: packet appended, new size: %lu, time window: %s~%s",
buf.size(), (long unsigned) buf.size(),
buf.front()->record->startTime().iso().c_str(), buf.front()->record->startTime().iso().c_str(),
buf.back()->record->endTime().iso().c_str()); buf.back()->record->endTime().iso().c_str());
//dump(buf); //dump(buf);
@ -282,14 +234,14 @@ Plugin::Plugin(const string &name, const string &options,
, _description(description) , _description(description)
, _bufferSize(1 << 14) , _bufferSize(1 << 14)
, _bytesBuffered(0) , _bytesBuffered(0)
, _port(18003)
, _sendTimeout(60) , _sendTimeout(60)
, _isExitRequested(false) , _isExitRequested(false)
#if !defined(CAPS_FEATURES_BACKFILLING) || CAPS_FEATURES_BACKFILLING #if !defined(CAPS_FEATURES_BACKFILLING) || CAPS_FEATURES_BACKFILLING
, _backfillingBufferSize(0) , _backfillingBufferSize(0)
#endif #endif
{ {
_url.host = "localhost";
_url.port = 18003;
#if !defined(CAPS_FEATURES_JOURNAL) || CAPS_FEATURES_JOURNAL #if !defined(CAPS_FEATURES_JOURNAL) || CAPS_FEATURES_JOURNAL
_lastWrite = Time::GMT(); _lastWrite = Time::GMT();
_journalDirty = false; _journalDirty = false;
@ -304,7 +256,6 @@ Plugin::Plugin(const string &name, const string &options,
#if !defined(CAPS_FEATURES_SSL) || CAPS_FEATURES_SSL #if !defined(CAPS_FEATURES_SSL) || CAPS_FEATURES_SSL
_useSSL = false; _useSSL = false;
#endif #endif
_dumpPackets = false;
} }
@ -335,7 +286,7 @@ void Plugin::close() {
flushEncoders(); flushEncoders();
sendBye(); sendBye();
CAPS_INFO("Closing connection to CAPS at %s:%d", _url.host.c_str(), _url.port); CAPS_INFO("Closing connection to CAPS at %s:%d", _host.c_str(), _port);
while ( !_packetBuffer.empty() && readResponse(_lastAckTimeout) ); while ( !_packetBuffer.empty() && readResponse(_lastAckTimeout) );
@ -344,7 +295,7 @@ void Plugin::close() {
#endif #endif
disconnect(); disconnect();
CAPS_INFO("Closed connection to CAPS at %s:%d", _url.host.c_str(), _url.port); CAPS_INFO("Closed connection to CAPS at %s:%d", _host.c_str(), _port);
_packetBuffer.clear(); _packetBuffer.clear();
_wasConnected = false; _wasConnected = false;
@ -362,34 +313,16 @@ bool Plugin::connect() {
#endif #endif
CAPS_INFO("Attempting to connect to CAPS at %s:%d", CAPS_INFO("Attempting to connect to CAPS at %s:%d",
_url.host.c_str(), _url.port); _host.c_str(), _port);
if ( _socket->connect(_url.host, _url.port) != Socket::Success ) { if ( _socket->connect(_host, _port) != Socket::Success ) {
CAPS_ERROR("Connection failed to CAPS at %s:%d", _url.host.c_str(), _url.port); CAPS_ERROR("Connection failed to CAPS at %s:%d", _host.c_str(), _port);
return false; return false;
} }
// Do handshake // Do handshake
if ( !_user.empty() && !_password.empty() ) {
int apiVersion = 0; string auth = "AUTH "+ _user + " " + _password + "\n";
if ( !getAPIVersion(apiVersion) ) { _socket->write(auth.c_str(), auth.length());
return false;
}
CAPS_INFO("Found CAPS API version %d", apiVersion);
if ( apiVersion >= 5 ) {
if ( !_agent.empty() ) {
_socket->write("AGENT ", 6);
_socket->write(_agent.data(), _agent.size());
_socket->write("\n", 1);
}
}
if ( !_url.user.empty() && !_url.password.empty() ) {
_socket->write("AUTH ", 5);
_socket->write(_url.user.data(), _url.user.size());
_socket->write(" ", 1);
_socket->write(_url.password.data(), _url.password.size());
_socket->write("\n", 1);
} }
_socket->setNonBlocking(true); _socket->setNonBlocking(true);
@ -400,7 +333,7 @@ bool Plugin::connect() {
FD_SET(_socket->fd(), &_readFDs); FD_SET(_socket->fd(), &_readFDs);
FD_SET(_socket->fd(), &_writeFDs); FD_SET(_socket->fd(), &_writeFDs);
CAPS_INFO("Connected to CAPS at %s:%d", _url.host.c_str(), _url.port); CAPS_INFO("Connected to CAPS at %s:%d", _host.c_str(), _port);
_responseBuf[0] = '\0'; _responseBuf[0] = '\0';
_responseBufIdx = 0; _responseBufIdx = 0;
@ -421,7 +354,7 @@ bool Plugin::connect() {
void Plugin::disconnect() { void Plugin::disconnect() {
if ( _socket && _socket->isValid() ) { if ( _socket && _socket->isValid() ) {
CAPS_INFO("Disconnect from %s:%d", _url.host.c_str(), _url.port); CAPS_INFO("Disconnect from %s:%d", _host.c_str(), _port);
_socket->shutdown(); _socket->shutdown();
_socket->close(); _socket->close();
@ -441,7 +374,7 @@ bool Plugin::readResponse(unsigned int timeout) {
bool gotResponse = false; bool gotResponse = false;
const int bufN = 512; #define bufN 512
char buf[bufN]; char buf[bufN];
int res; int res;
@ -486,7 +419,7 @@ bool Plugin::readResponse(unsigned int timeout) {
// Read confirmed packets from response // Read confirmed packets from response
int count = atoi(_responseBuf+3); int count = atoi(_responseBuf+3);
CAPS_DEBUG("Acknowledged %d packets, %zu in queue", count, _packetBuffer.size()); CAPS_DEBUG("Acknowledged %d packets, %d in queue", count, (int)_packetBuffer.size());
// Update packet buffer // Update packet buffer
for ( int i = 0; i < count; ++i ) { for ( int i = 0; i < count; ++i ) {
if ( _packetBuffer.empty() ) { if ( _packetBuffer.empty() ) {
@ -513,12 +446,8 @@ bool Plugin::readResponse(unsigned int timeout) {
} }
} }
CAPS_DEBUG("Packet buffer state: %zu packets, %zu bytes", CAPS_DEBUG("Packet buffer state: %d packets, %d bytes",
_packetBuffer.size(), _bytesBuffered); (int)_packetBuffer.size(), (int)_bytesBuffered);
}
else if ( (strncasecmp(_responseBuf, "ERROR ", 6) == 0) && (_responseBufIdx > 6) ) {
CAPS_ERROR("%s", _responseBuf + 6);
return false;
} }
_responseBuf[0] = '\0'; _responseBuf[0] = '\0';
@ -600,7 +529,7 @@ bool Plugin::readJournal(istream &is) {
#endif #endif
bool Plugin::flush() { bool Plugin::flush() {
CAPS_INFO("Flushing %zu queued packets", _packetBuffer.size()); CAPS_INFO("Flushing %d queued packets", (int)_packetBuffer.size());
PacketBuffer::iterator it = _packetBuffer.begin(); PacketBuffer::iterator it = _packetBuffer.begin();
while ( it != _packetBuffer.end() && !_isExitRequested ) { while ( it != _packetBuffer.end() && !_isExitRequested ) {
@ -638,7 +567,7 @@ bool Plugin::flush() {
} }
void Plugin::flushEncoders() { void Plugin::flushEncoders() {
CAPS_INFO("Flushing %zu encoders", _encoderItems.size()); CAPS_INFO("Flushing %d encoders", (int)_encoderItems.size());
EncoderItems::iterator it = _encoderItems.begin(); EncoderItems::iterator it = _encoderItems.begin();
while ( it != _encoderItems.end() ) { while ( it != _encoderItems.end() ) {
Encoder *encoder = it->second.encoder.get(); Encoder *encoder = it->second.encoder.get();
@ -669,7 +598,7 @@ Plugin::Status Plugin::push(const string &net, const string &sta,
rec->setStartTime(stime); rec->setStartTime(stime);
rec->setSamplingFrequency(numerator, denominator); rec->setSamplingFrequency(numerator, denominator);
rec->setDataType(dt); rec->setDataType(dt);
rec->setBuffer(static_cast<char*>(data), dtSize * count); rec->setBuffer((char*)data, dtSize * count);
return push(net, sta, loc, cha, DataRecordPtr(rec), uom, timingQuality); return push(net, sta, loc, cha, DataRecordPtr(rec), uom, timingQuality);
} }
@ -709,7 +638,7 @@ void Plugin::tryFlushBackfillingBuffer(StreamState &state) {
PacketPtr &ref_pkt = state.backfillingBuffer.front(); PacketPtr &ref_pkt = state.backfillingBuffer.front();
TimeSpan gap = ref_pkt->record->startTime() - state.lastCommitEndTime; TimeSpan gap = ref_pkt->record->startTime() - state.lastCommitEndTime;
int64_t dt_us = static_cast<int64_t>(gap.seconds()) * 1000000 + gap.microseconds(); int64_t dt_us = (int64_t)gap.seconds()*1000000+gap.microseconds();
// A gap larger than one sample? // A gap larger than one sample?
if ( dt_us >= ref_pkt->dt_us ) break; if ( dt_us >= ref_pkt->dt_us ) break;
@ -723,14 +652,15 @@ void Plugin::tryFlushBackfillingBuffer(StreamState &state) {
if ( flushed > 0 ) { if ( flushed > 0 ) {
if ( state.backfillingBuffer.size() > 0 ) { if ( state.backfillingBuffer.size() > 0 ) {
CAPS_DEBUG("backfilling buffer: %zu pakets flushed, new size: %zu, time window: %s~%s", CAPS_DEBUG("backfilling buffer: %lu pakets flushed, new size: %lu, time window: %s~%s",
flushed, state.backfillingBuffer.size(), (long unsigned) flushed,
(long unsigned) state.backfillingBuffer.size(),
state.backfillingBuffer.front()->record->startTime().iso().c_str(), state.backfillingBuffer.front()->record->startTime().iso().c_str(),
state.backfillingBuffer.back()->record->endTime().iso().c_str()); state.backfillingBuffer.back()->record->endTime().iso().c_str());
} }
else { else {
CAPS_DEBUG("backfilling buffer: %zu pakets flushed, new size: 0", CAPS_DEBUG("backfilling buffer: %lu pakets flushed, new size: 0",
flushed); (long unsigned) flushed);
} }
} }
} }
@ -754,14 +684,15 @@ void Plugin::trimBackfillingBuffer(StreamState &state) {
if ( trimmed > 0 ) { if ( trimmed > 0 ) {
if ( state.backfillingBuffer.size() > 0 ) { if ( state.backfillingBuffer.size() > 0 ) {
CAPS_DEBUG("backfilling buffer: %zu pakets trimmed, new size: %zu, time window: %s~%s", CAPS_DEBUG("backfilling buffer: %lu pakets trimmed, new size: %lu, time window: %s~%s",
trimmed, state.backfillingBuffer.size(), (long unsigned) trimmed,
(long unsigned) state.backfillingBuffer.size(),
state.backfillingBuffer.front()->record->startTime().iso().c_str(), state.backfillingBuffer.front()->record->startTime().iso().c_str(),
state.backfillingBuffer.back()->record->endTime().iso().c_str()); state.backfillingBuffer.back()->record->endTime().iso().c_str());
} }
else { else {
CAPS_DEBUG("backfilling buffer: %zu pakets trimmed, new size: 0", CAPS_DEBUG("backfilling buffer: %lu pakets trimmed, new size: 0",
trimmed); (long unsigned) trimmed);
} }
} }
} }
@ -861,12 +792,6 @@ void Plugin::serializePacket(Packet *packet) {
} }
bool Plugin::commitPacket(PacketPtr packet) { bool Plugin::commitPacket(PacketPtr packet) {
if ( _dumpPackets ) {
serializePacket(packet.get());
dumpPacket(packet.get());
return true;
}
// Initial connect // Initial connect
if ( !_wasConnected && !_socket ) { if ( !_wasConnected && !_socket ) {
while ( !connect() && !_isExitRequested ) { while ( !connect() && !_isExitRequested ) {
@ -878,16 +803,12 @@ bool Plugin::commitPacket(PacketPtr packet) {
} }
if ( _bytesBuffered >= _bufferSize ) { if ( _bytesBuffered >= _bufferSize ) {
CAPS_DEBUG("Packet buffer is full (%zu/%zu bytes), " CAPS_DEBUG("Packet buffer is full (%ld/%ld bytes), "
"waiting for server ack messages", "waiting for server ack messages",
_bytesBuffered, _bufferSize); _bytesBuffered, _bufferSize);
while ( _bytesBuffered >= _bufferSize ) { while ( _bytesBuffered >= _bufferSize ) {
if ( !readResponse(_ackTimeout) ) { if ( !readResponse(_ackTimeout) ) {
CAPS_WARNING("Packet buffer was full (%zu/%zu bytes), "
"did not receive ack within %d seconds",
_bytesBuffered, _bufferSize,
_ackTimeout);
disconnect(); disconnect();
while ( !_isExitRequested && !_socket->isValid() ) { while ( !_isExitRequested && !_socket->isValid() ) {
wait(); wait();
@ -900,7 +821,7 @@ bool Plugin::commitPacket(PacketPtr packet) {
} }
} }
CAPS_DEBUG("%zu/%zu bytes buffered after force wait", CAPS_DEBUG("%ld/%ld bytes buffered after force wait",
_bytesBuffered, _bufferSize); _bytesBuffered, _bufferSize);
if ( _bytesBuffered >= _bufferSize ) if ( _bytesBuffered >= _bufferSize )
@ -914,8 +835,8 @@ bool Plugin::commitPacket(PacketPtr packet) {
// Serialize data record // Serialize data record
serializePacket(packet.get()); serializePacket(packet.get());
CAPS_DEBUG("+ buffer state: %zu packets, %zu bytes", CAPS_DEBUG("+ buffer state: %d packets, %d bytes",
_packetBuffer.size(), _bytesBuffered); (int)_packetBuffer.size(), (int)_bytesBuffered);
while ( !sendPacket(packet.get() ) ) { while ( !sendPacket(packet.get() ) ) {
CAPS_ERROR("Sending failed: %s", _isExitRequested ? "abort" : "reconnect"); CAPS_ERROR("Sending failed: %s", _isExitRequested ? "abort" : "reconnect");
@ -966,11 +887,11 @@ Encoder* Plugin::getEncoder(PacketPtr packet) {
*/ */
const SPClock &clk = item->encoder->clk(); const SPClock &clk = item->encoder->clk();
bool needsFlush = false; bool needsFlush = false;
if ( clk.freqn != header->samplingFrequencyNumerator || if( clk.freqn != header->samplingFrequencyNumerator ||
clk.freqd != header->samplingFrequencyDenominator ) { clk.freqd != header->samplingFrequencyDenominator ) {
needsFlush = true; needsFlush = true;
} }
else if ( record->startTime() != clk.getTime(0) ) { else if ( record->startTime() != clk.get_time(0) ) {
needsFlush = true; needsFlush = true;
} }
else if ( item->dataType != header->dataType ) { else if ( item->dataType != header->dataType ) {
@ -1049,7 +970,7 @@ void Plugin::sendBye() {
if ( _socket == NULL ) return; if ( _socket == NULL ) return;
PacketDataHeader header; PacketDataHeader header;
memset(reinterpret_cast<char*>(&header), 0, header.dataSize()); memset((char*)&header, 0, header.dataSize());
_socket->setNonBlocking(false); _socket->setNonBlocking(false);
send((char*)&header, header.dataSize(), _sendTimeout); send((char*)&header, header.dataSize(), _sendTimeout);
@ -1156,9 +1077,8 @@ void Plugin::enableLogging() {
} }
bool Plugin::send(char *data, int len, int timeout) { bool Plugin::send(char *data, int len, int timeout) {
if ( !_socket->isValid() ) { if ( !_socket->isValid() )
return false; return false;
}
struct timeval tv; struct timeval tv;
tv.tv_sec = timeout; tv.tv_sec = timeout;
@ -1196,94 +1116,5 @@ bool Plugin::send(char *data, int len, int timeout) {
return false; return false;
} }
bool Plugin::setAddress(const string &addr, uint16_t defaultPort) {
if ( !_url.parse(addr, defaultPort) ) {
CAPS_ERROR("Failed to set address: %s",
_url.errorString.c_str());
return false;
}
#if !defined(CAPS_FEATURES_SSL) || CAPS_FEATURES_SSL
_useSSL = boost::iequals(_url.protocol, "capss");
#endif
return true;
}
void Plugin::dumpPacket(Packet *packet) {
if ( packet->record->packetType() == RawDataPacket ) {
dumpSList(packet);
}
else if ( packet->record->packetType() == MSEEDPacket ) {
MSEEDDataRecord *rec = static_cast<MSEEDDataRecord*>(packet->record.get());
cout.write(rec->data()->data(),
static_cast<streamsize>(rec->data()->size()));
}
else {
cout << "Could not dump packet: Unsupported packet type '"
<< packet->record->packetType() << "'";
}
}
void Plugin::dumpPackets(bool enable) {
_dumpPackets = enable;
}
void Plugin::setAgent(const std::string &agent) {
_agent = agent;
}
bool Plugin::getAPIVersion(int &version) {
version = 0;
int bytesSent = _socket->send(HELLO_REQUEST);
if ( bytesSent != strlen(HELLO_REQUEST) ) {
CAPS_ERROR("Could not get CAPS API version: %s",
strerror(errno));
return false;
}
int32_t msgLength = 0;
socketbuf<Socket, 512> buf(_socket.get());
streamsize bytesRead = buf.sgetn(reinterpret_cast<char*>(&msgLength),
sizeof(int32_t));
if ( bytesRead != sizeof(int32_t) ) {
CAPS_ERROR("Could not get CAPS API version: Expected message length in "
"server response: %s", strerror(errno));
return false;
}
msgLength = Endianess::Converter::FromLittleEndian(msgLength);
// Check if the hello response comes from a CAPS server with
// API version < 5
constexpr int32_t OKTag = 0x30204b4f;
if ( msgLength == OKTag ) {
return true;
}
buf.set_read_limit(msgLength);
bool ret = true;
iostream is(&buf);
string line;
while ( getline(is, line) ) {
size_t pos = line.find("API=");
if ( pos == string::npos ) {
continue;
}
string apiStr = line.substr(4);
if ( !str2int(version, apiStr.c_str()) ) {
CAPS_ERROR("Invalid CAPS API version: Expected number");
ret = false;
}
}
return ret;
}
} }
} }

@ -34,7 +34,6 @@
#include <gempa/caps/packet.h> #include <gempa/caps/packet.h>
#include <gempa/caps/socket.h> #include <gempa/caps/socket.h>
#include <gempa/caps/url.h>
#include <gempa/caps/version.h> #include <gempa/caps/version.h>
#include <gempa/caps/pluginpacket.h> #include <gempa/caps/pluginpacket.h>
@ -73,7 +72,6 @@ class SC_GEMPA_CAPS_API Plugin {
typedef std::vector<char> Buffer; typedef std::vector<char> Buffer;
typedef boost::shared_ptr<Buffer> BufferPtr; typedef boost::shared_ptr<Buffer> BufferPtr;
typedef std::deque<PacketPtr> PacketBuffer;
#if !defined(CAPS_FEATURES_BACKFILLING) || CAPS_FEATURES_BACKFILLING #if !defined(CAPS_FEATURES_BACKFILLING) || CAPS_FEATURES_BACKFILLING
typedef std::list<PacketPtr> BackfillingBuffer; typedef std::list<PacketPtr> BackfillingBuffer;
@ -141,19 +139,11 @@ class SC_GEMPA_CAPS_API Plugin {
*/ */
void setEncoderFactory(EncoderFactory *factory); void setEncoderFactory(EncoderFactory *factory);
/** void setHost(const std::string &host) { _host = host; }
* @brief Parses connection parameters from address string. Format const std::string &host() const { return _host; }
* is [[caps|capss]://][user:pass@]host[:port]
* @param addr The address of the caps server as string
* @param defaultPort The default port used when the port is omitted
*/
bool setAddress(const std::string &addr, uint16_t defaultPort = 18003);
void setHost(const std::string &host) { _url.host = host; } void setPort(unsigned short port) { _port = port; }
const std::string &host() const { return _url.host; } unsigned short port() const { return _port; }
void setPort(unsigned short port) { _url.port = port; }
unsigned short port() const { return _url.port; }
void setBufferSize(size_t bufferSize) { _bufferSize = bufferSize; } void setBufferSize(size_t bufferSize) { _bufferSize = bufferSize; }
size_t bufferSize() const { return _bufferSize; } size_t bufferSize() const { return _bufferSize; }
@ -169,8 +159,8 @@ class SC_GEMPA_CAPS_API Plugin {
* @param password The password * @param password The password
*/ */
void setCredentials(const std::string &user, const std::string &password) { void setCredentials(const std::string &user, const std::string &password) {
_url.user = user; _user = user;
_url.password = password; _password = password;
} }
/** /**
@ -242,42 +232,12 @@ class SC_GEMPA_CAPS_API Plugin {
const std::string &data); const std::string &data);
#endif #endif
void flushEncoders();
void dumpPackets(bool enable);
/**
* @brief Returns the internal packet buffer that
* keeps packets until they have been acknowledged
* by CAPS
* @return The interal packet buffer
*/
const PacketBuffer &packetBuffer() const {
return _packetBuffer;
}
/**
* @brief Sets the agent string. Allows the server to
* identify the application that sends data.
* @param agent The agent string
*/
void setAgent(const std::string &agent);
const char *version() { const char *version() {
return LIB_CAPS_VERSION_NAME; return LIB_CAPS_VERSION_NAME;
} }
protected:
/**
* @brief Sends HELLO request and parses API version
* from server response. If the server does not support
* the API feature the method sets the version to 0.
* @param version The CAPS API version, e.g., 5
* @return True on success
*/
bool getAPIVersion(int &version);
private: private:
typedef std::deque<PacketPtr> PacketBuffer;
typedef boost::shared_ptr<Encoder> EncoderPtr; typedef boost::shared_ptr<Encoder> EncoderPtr;
struct EncoderItem { struct EncoderItem {
@ -290,7 +250,6 @@ class SC_GEMPA_CAPS_API Plugin {
private: private:
bool connect(); bool connect();
void disconnect(); void disconnect();
void dumpPacket(Packet *packet);
bool isConnected() const; bool isConnected() const;
Encoder* getEncoder(PacketPtr packet); Encoder* getEncoder(PacketPtr packet);
bool readResponse(unsigned int sec = 0); bool readResponse(unsigned int sec = 0);
@ -306,6 +265,7 @@ class SC_GEMPA_CAPS_API Plugin {
void tryFlushBackfillingBuffer(StreamState &state); void tryFlushBackfillingBuffer(StreamState &state);
void trimBackfillingBuffer(StreamState &state); void trimBackfillingBuffer(StreamState &state);
bool flush(); bool flush();
void flushEncoders();
bool send(char *data, int len, int timeout = 60); bool send(char *data, int len, int timeout = 60);
void wait(); void wait();
@ -319,6 +279,8 @@ class SC_GEMPA_CAPS_API Plugin {
PacketBuffer _packetBuffer; PacketBuffer _packetBuffer;
bool _packetBufferDirty; bool _packetBufferDirty;
size_t _bytesBuffered; size_t _bytesBuffered;
std::string _host;
unsigned short _port;
char _responseBuf[512]; char _responseBuf[512];
int _responseBufIdx; int _responseBufIdx;
fd_set _readFDs; fd_set _readFDs;
@ -344,15 +306,13 @@ class SC_GEMPA_CAPS_API Plugin {
PacketAckFunc _packetAckFunc; PacketAckFunc _packetAckFunc;
Url _url; std::string _user;
std::string _password;
#if !defined(CAPS_FEATURES_SSL) || CAPS_FEATURES_SSL #if !defined(CAPS_FEATURES_SSL) || CAPS_FEATURES_SSL
bool _useSSL; bool _useSSL;
#endif #endif
Stats _stats; Stats _stats;
TimeSpan _maxFutureEndTime; TimeSpan _maxFutureEndTime;
bool _dumpPackets;
std::string _agent;
}; };
typedef boost::shared_ptr<Plugin> PluginPtr; typedef boost::shared_ptr<Plugin> PluginPtr;

@ -91,9 +91,13 @@ PluginApplication::PluginApplication(int argc, char **argv, const string &desc)
_sendTimeout = 60; _sendTimeout = 60;
_logStatus = false; _logStatus = false;
_statusFlushInterval = 10; _statusFlushInterval = 10;
_host = "localhost";
_port = DEFAULT_PORT;
_strAddr = "localhost:" + sc::toString(DEFAULT_PORT); _strAddr = "localhost:" + sc::toString(DEFAULT_PORT);
SC_FS_DECLARE_PATH(path, "@ROOTDIR@/var/run/" + SCCoreApp->name() + "/journal") SC_FS_DECLARE_PATH(path, "@ROOTDIR@/var/run/" + SCCoreApp->name() + "/journal");
_journalFile = path.string(); _journalFile = path.string();
_mseedEnabled = false; _mseedEnabled = false;
@ -101,7 +105,6 @@ PluginApplication::PluginApplication(int argc, char **argv, const string &desc)
_mseedRecordLength = 9; _mseedRecordLength = 9;
_strMseedEncoding = "Steim2"; _strMseedEncoding = "Steim2";
_maxFutureEndTime = 120; _maxFutureEndTime = 120;
_dumpPackets = false;
// By default we disable the acquisition autostart because not all plugins // By default we disable the acquisition autostart because not all plugins
// require this feature. It must be enabled explicitly if required. // require this feature. It must be enabled explicitly if required.
@ -111,16 +114,8 @@ PluginApplication::PluginApplication(int argc, char **argv, const string &desc)
void PluginApplication::createCommandLineDescription() { void PluginApplication::createCommandLineDescription() {
Seiscomp::Client::StreamApplication::createCommandLineDescription(); Seiscomp::Client::StreamApplication::createCommandLineDescription();
commandline().addGroup("Output"); commandline().addGroup("Output");
commandline().addOption("Output", "addr,a", commandline().addOption("Output", "addr,a", "Data output address, format is [HOST:PORT]", &_strAddr);
"Data output address\n" commandline().addOption("Output", "buffer-size,b", "Size (bytes) of the packet buffer", &_bufferSize);
"[[caps|capss]://][user:pass@]host[:port]", &_strAddr);
commandline().addOption("Output", "agent",
"Sets the agent string. Allows "
"the server to identify the "
"application that sends data.",
&_agent);
commandline().addOption("Output", "buffer-size,b",
"Size (bytes) of the packet buffer", &_bufferSize);
commandline().addOption("Output", "backfilling", commandline().addOption("Output", "backfilling",
"Enable backfilling for out-of-order records. The backfilling buffer size is " "Enable backfilling for out-of-order records. The backfilling buffer size is "
"in seconds", &_backfillingBufferSize); "in seconds", &_backfillingBufferSize);
@ -137,7 +132,6 @@ void PluginApplication::createCommandLineDescription() {
"the packet end time is greater than the current time plus this " "the packet end time is greater than the current time plus this "
"value the packet will be discarded. By default this value is set to 120 seconds.", "value the packet will be discarded. By default this value is set to 120 seconds.",
&_maxFutureEndTime); &_maxFutureEndTime);
commandline().addOption("Output", "dump-packets", "Dump packets to stdout");
commandline().addGroup("Journal"); commandline().addGroup("Journal");
commandline().addOption("Journal", "journal,j", commandline().addOption("Journal", "journal,j",
"File to store stream states. Use an empty string to disable this feature.", &_journalFile); "File to store stream states. Use an empty string to disable this feature.", &_journalFile);
@ -198,12 +192,12 @@ bool PluginApplication::init() {
Gempa::CAPS::SetLogHandler(Gempa::CAPS::LL_INFO, LogInfo); Gempa::CAPS::SetLogHandler(Gempa::CAPS::LL_INFO, LogInfo);
Gempa::CAPS::SetLogHandler(Gempa::CAPS::LL_DEBUG, LogDebug); Gempa::CAPS::SetLogHandler(Gempa::CAPS::LL_DEBUG, LogDebug);
_plugin.setHost(_host);
_plugin.setPort(_port);
_plugin.setBufferSize(_bufferSize); _plugin.setBufferSize(_bufferSize);
_plugin.setFlushInterval(_flushInterval); _plugin.setFlushInterval(_flushInterval);
_plugin.setTimeouts(_ackTimeout, _lastAckTimeout, _sendTimeout); _plugin.setTimeouts(_ackTimeout, _lastAckTimeout, _sendTimeout);
_plugin.setMaxFutureEndTime(_maxFutureEndTime); _plugin.setMaxFutureEndTime(_maxFutureEndTime);
_plugin.dumpPackets(_dumpPackets);
_plugin.setAgent(_agent);
if ( _mseedEnabled ) { if ( _mseedEnabled ) {
MSEEDEncoderFactory *factory = nullptr; MSEEDEncoderFactory *factory = nullptr;
@ -217,7 +211,7 @@ bool PluginApplication::init() {
factory = new Steim1EncoderFactory(); factory = new Steim1EncoderFactory();
_plugin.setEncoderFactory(factory); _plugin.setEncoderFactory(factory);
} }
else if ( _mseedEncoding == Steim2 ) { if ( _mseedEncoding == Steim2 ) {
SEISCOMP_INFO("Output stream encoding set to MiniSEED/Steim2"); SEISCOMP_INFO("Output stream encoding set to MiniSEED/Steim2");
factory = new Steim2EncoderFactory(); factory = new Steim2EncoderFactory();
_plugin.setEncoderFactory(factory); _plugin.setEncoderFactory(factory);
@ -266,23 +260,20 @@ bool PluginApplication::init() {
bool PluginApplication::initConfiguration() { bool PluginApplication::initConfiguration() {
if ( !Seiscomp::Client::StreamApplication::initConfiguration() ) return false; if ( !Seiscomp::Client::StreamApplication::initConfiguration() ) return false;
try { try { _host = configGetString("output.host"); }
_plugin.setHost(configGetString("output.host")); catch ( ... ) { }
}
catch ( ... ) {
}
try { try { _port = configGetInt("output.port"); }
_plugin.setPort(configGetInt("output.port"));
}
catch ( ... ) { } catch ( ... ) { }
try { _sendTimeout = configGetInt("output.timeout"); } try { _sendTimeout = configGetInt("output.timeout"); }
catch ( ... ) { } catch ( ... ) { }
try { try {
string addr = configGetString("output.address"); string addr = configGetString("output.addr");
if ( !_plugin.setAddress(addr) ) { if ( !splitAddress(_host, _port, addr, DEFAULT_PORT) ) {
SEISCOMP_ERROR("%s: Invalid CAPS address, format is [HOST:PORT]",
addr.c_str());
return false; return false;
} }
} }
@ -316,19 +307,6 @@ bool PluginApplication::initConfiguration() {
try { _backfillingBufferSize = configGetInt("output.backfillingBufferSize"); } try { _backfillingBufferSize = configGetInt("output.backfillingBufferSize"); }
catch ( ... ) { } catch ( ... ) { }
try { _maxFutureEndTime = configGetInt("output.maxFutureEndTime"); }
catch ( ... ) { }
try { _agent = configGetString("output.agent"); }
catch ( ... ) {
if ( version() ) {
_agent = name() + string(" ") + version();
}
else {
_agent = name();
}
}
try { _journalFile = configGetString("journal.file"); } try { _journalFile = configGetString("journal.file"); }
catch ( ... ) {} catch ( ... ) {}
@ -347,6 +325,9 @@ bool PluginApplication::initConfiguration() {
try { _statusFlushInterval = configGetInt("statusLog.flush"); } try { _statusFlushInterval = configGetInt("statusLog.flush"); }
catch ( ... ) {} catch ( ... ) {}
try { _maxFutureEndTime= configGetInt("output.maxFutureEndTime"); }
catch ( ... ) { }
return true; return true;
} }
@ -361,10 +342,6 @@ bool PluginApplication::validateParameters() {
_logStatus = true; _logStatus = true;
} }
if ( commandline().hasOption("dump-packets") ) {
_dumpPackets = true;
}
if ( commandline().hasOption("encoding") ) { if ( commandline().hasOption("encoding") ) {
if ( !fromString(_mseedEncoding, _strMseedEncoding)) return false; if ( !fromString(_mseedEncoding, _strMseedEncoding)) return false;
} }
@ -376,7 +353,9 @@ bool PluginApplication::validateParameters() {
} }
if ( commandline().hasOption("addr") ) { if ( commandline().hasOption("addr") ) {
if ( !_plugin.setAddress(_strAddr, DEFAULT_PORT) ) { if ( !splitAddress(_host, _port, _strAddr, DEFAULT_PORT) ) {
SEISCOMP_ERROR("%s: Invalid CAPS address, format is [HOST:PORT]",
_strAddr.c_str());
return false; return false;
} }
} }

@ -83,6 +83,8 @@ class SC_GEMPA_CAPS_API PluginApplication : public Seiscomp::Client::StreamAppli
protected: protected:
Plugin _plugin; Plugin _plugin;
std::string _host;
ushort _port;
std::string _strAddr; std::string _strAddr;
size_t _bufferSize; size_t _bufferSize;
size_t _backfillingBufferSize; size_t _backfillingBufferSize;
@ -103,8 +105,6 @@ class SC_GEMPA_CAPS_API PluginApplication : public Seiscomp::Client::StreamAppli
FileRotator _statusFile; FileRotator _statusFile;
bool _logStatus; bool _logStatus;
uint _statusFlushInterval; uint _statusFlushInterval;
bool _dumpPackets;
std::string _agent;
}; };
} }

@ -249,12 +249,10 @@ DataRecord::ReadStatus RawDataRecord::getData(streambuf &buf, int size,
_startTime = timestampToTime(_header.samplingTime); _startTime = timestampToTime(_header.samplingTime);
if ( _header.samplingFrequencyDenominator > 0 && if ( _header.samplingFrequencyDenominator > 0 &&
_header.samplingFrequencyNumerator > 0 ) { _header.samplingFrequencyNumerator > 0 )
_endTime = _startTime + samplesToTimeSpan(_header, sampleCount); _endTime = _startTime + samplesToTimeSpan(_header, sampleCount);
} else
else {
_endTime = Time(); _endTime = Time();
}
if ( (start.valid() || end.valid()) && _endTime.valid() ) { if ( (start.valid() || end.valid()) && _endTime.valid() ) {
// Out of bounds? // Out of bounds?
@ -267,16 +265,15 @@ DataRecord::ReadStatus RawDataRecord::getData(streambuf &buf, int size,
// Trim packet front // Trim packet front
if ( _startTime < start ) { if ( _startTime < start ) {
TimeSpan ofs = start - _startTime; TimeSpan ofs = start - _startTime;
sampleOfs = timeSpanToSamplesFloor(_header, ofs); sampleOfs = timeSpanToSamplesCeil(_header, ofs);
sampleCount -= sampleOfs; sampleCount -= sampleOfs;
CAPS_DEBUG("Triming packet start: added offset of %d samples", sampleOfs); CAPS_DEBUG("Triming packet start: added offset of %d samples", sampleOfs);
_startTime += samplesToTimeSpan(_header, sampleOfs); _startTime += samplesToTimeSpan(_header, sampleOfs);
// Update header timespan // Update header timespan
timeToTimestamp(_header.samplingTime, _startTime); timeToTimestamp(_header.samplingTime, _startTime);
} }
else { else
sampleOfs = 0; sampleOfs = 0;
}
if ( maxBytes > 0 ) { if ( maxBytes > 0 ) {
int maxSamples = maxBytes / dataTypeSize; int maxSamples = maxBytes / dataTypeSize;
@ -299,13 +296,10 @@ DataRecord::ReadStatus RawDataRecord::getData(streambuf &buf, int size,
CAPS_DEBUG("Triming packet end: added offset of %d samples", trimEnd); CAPS_DEBUG("Triming packet end: added offset of %d samples", trimEnd);
} }
} }
else { else
sampleOfs = 0; sampleOfs = 0;
}
if ( sampleCount == 0 ) { if ( sampleCount == 0 ) return RS_Error;
return RS_Error;
}
_currentHeader = _header; _currentHeader = _header;

@ -1,77 +0,0 @@
/***************************************************************************
* Copyright (C) 2021 by gempa GmbH *
* *
* All Rights Reserved. *
* *
* NOTICE: All information contained herein is, and remains *
* the property of gempa GmbH and its suppliers, if any. The intellectual *
* and technical concepts contained herein are proprietary to gempa GmbH *
* and its suppliers. *
* Dissemination of this information or reproduction of this material *
* is strictly forbidden unless prior written permission is obtained *
* from gempa GmbH. *
***************************************************************************/
#include "url.h"
#include "utils.h"
#include <boost/algorithm/string.hpp>
using namespace std;
namespace {
const string CAPS_PROTOCOL = "caps";
const string CAPS_SSL_PROTOCOL = "capss";
}
namespace Gempa {
namespace CAPS {
Url::Url()
: port(0) {}
bool Url::parse(const string &address, uint16_t defaultPort) {
string addr = trim(address);
// step 1: protocol
size_t pos = addr.find("://");
if ( pos == string::npos ) {
protocol = CAPS_PROTOCOL;
}
else {
protocol = addr.substr(0, pos);
addr = addr.substr(pos + 3);
}
if ( !boost::iequals(protocol, CAPS_PROTOCOL) &&
!boost::iequals(protocol, CAPS_SSL_PROTOCOL) ) {
errorString = "Unsupported protocol: %" + protocol;
return false;
}
// step 2: user:pass
vector<string> toks;
boost::split(toks, addr, boost::is_any_of("@"), boost::token_compress_on);
if ( toks.size() >= 2 ) {
string login = toks[0];
addr = toks[1];
boost::split(toks, login, boost::is_any_of(":"), boost::token_compress_on);
user = toks.size() > 0 ? toks[0] : "";
password = toks.size() > 1 ? toks[1] : "";
}
// step 3: address
if ( !splitAddress(host, port, addr, defaultPort) ) {
errorString = "Wrong address format, expected "
"[[caps|capss]://][user:pass@]host[:port]";
return false;
}
return true;
}
}
}

@ -1,47 +0,0 @@
/***************************************************************************
* Copyright (C) 2021 by gempa GmbH *
* *
* All Rights Reserved. *
* *
* NOTICE: All information contained herein is, and remains *
* the property of gempa GmbH and its suppliers, if any. The intellectual *
* and technical concepts contained herein are proprietary to gempa GmbH *
* and its suppliers. *
* Dissemination of this information or reproduction of this material *
* is strictly forbidden unless prior written permission is obtained *
* from gempa GmbH. *
***************************************************************************/
#ifndef GEMPA_CAPS_URL_H
#define GEMPA_CAPS_URL_H
#include <string>
namespace Gempa {
namespace CAPS {
struct Url {
Url();
/**
* @brief Reads url parameters from address string. The
* addr format is [[caps|capss]://][user:pass@]host[:port].
* @param address The address as string
* @param defaultPort Default port
* @return True if the address could be parsed
*/
bool parse(const std::string &address, uint16_t defaultPort = 18002);
std::string host;
uint16_t port;
std::string user;
std::string password;
std::string protocol;
std::string errorString;
};
}
}
#endif

@ -6,8 +6,8 @@
***************************************************************************/ ***************************************************************************/
#ifndef GEMPA_CAPS_VERSION_H #ifndef __GEMPA_CAPS_VERSION_H__
#define GEMPA_CAPS_VERSION_H #define __GEMPA_CAPS_VERSION_H__
/* #if (LIB_CAPS_VERSION >= LIB_CAPS_VERSION_CHECK(1, 0, 0)) */ /* #if (LIB_CAPS_VERSION >= LIB_CAPS_VERSION_CHECK(1, 0, 0)) */
@ -19,23 +19,14 @@
/* LIB_CAPS_VERSION is (major << 16) + (minor << 8) + patch. */ /* LIB_CAPS_VERSION is (major << 16) + (minor << 8) + patch. */
#define LIB_CAPS_VERSION 0x010000 #define LIB_CAPS_VERSION 0x010000
#define LIB_CAPS_VERSION_NAME "2.0.0" #define LIB_CAPS_VERSION_NAME "1.0.0"
/****************************************************************************** /******************************************************************************
API Changelog API Changelog
****************************************************************************** ******************************************************************************
"2.0.0" 0x020000
- Add Plugin::setAddress
- Add Plugin::flushEncoders
- Add Plugin::dumpPackets
- Add Plugin::packetBuffer
- Add Plugin::setAgent
- Add Plugin::getAPIVersion
- Change Plugin memory layout
"1.0.0" 0x010000 "1.0.0" 0x010000
- Initial version - Initial version
*/ */
#endif // GEMPA_CAPS_VERSION_H #endif // __GEMPA_CAPS_VERSION_H__

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff
Loading…
Cancel
Save