You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

325 lines
9.8 KiB
C++

/***************************************************************************
* Copyright (C) 2013 by gempa GmbH *
* *
* All Rights Reserved. *
* *
* NOTICE: All information contained herein is, and remains *
* the property of gempa GmbH and its suppliers, if any. The intellectual *
* and technical concepts contained herein are proprietary to gempa GmbH *
* and its suppliers. *
* Dissemination of this information or reproduction of this material *
* is strictly forbidden unless prior written permission is obtained *
* from gempa GmbH. *
***************************************************************************/
#ifndef GEMPA_CAPS_PLUGIN_H
#define GEMPA_CAPS_PLUGIN_H
/*
// Enable/disable journal
#define CAPS_FEATURES_JOURNAL 1
// Enable/disable backfilling of packets
#define CAPS_FEATURES_BACKFILLING 1
#define CAPS_FEATURES_SSL 1
// Supported CAPS packets
#define CAPS_FEATURES_ANY 1
#define CAPS_FEATURES_MSEED 1
#define CAPS_FEATURES_RAW 1
#define CAPS_FEATURES_RTCM2 1
*/
#include "encoderfactory.h"
#include <gempa/caps/packet.h>
#include <gempa/caps/socket.h>
#include <gempa/caps/version.h>
#include <gempa/caps/pluginpacket.h>
#include <boost/function.hpp>
#include <boost/shared_ptr.hpp>
#include <deque>
#include <iostream>
#include <map>
#include <string>
#include <vector>
#include <list>
namespace Gempa {
namespace CAPS {
class SC_GEMPA_CAPS_API Plugin {
public:
enum Status {
Success,
PacketSize, /* Packet is bigger than the buffer size */
PacketLoss, /* The other end didn't acknowledge
transmitted data after some time */
PacketNotValid, /* Read meta data failed*/
PacketNotSupported,
MaxFutureEndTimeExceeded
};
struct Stats {
Stats() : maxBytesBuffered(0) {}
size_t maxBytesBuffered;
};
typedef std::vector<char> Buffer;
typedef boost::shared_ptr<Buffer> BufferPtr;
#if !defined(CAPS_FEATURES_BACKFILLING) || CAPS_FEATURES_BACKFILLING
typedef std::list<PacketPtr> BackfillingBuffer;
#endif
struct StreamState {
Time lastEndTime;
#if !defined(CAPS_FEATURES_BACKFILLING) || CAPS_FEATURES_BACKFILLING
Time lastCommitEndTime;
BackfillingBuffer backfillingBuffer;
#endif
};
typedef std::map<std::string, StreamState> StreamStates;
typedef boost::function<void (const std::string &, const CAPS::Time &,
const CAPS::Time &)> PacketAckFunc;
public:
Plugin(const std::string &name, const std::string &options = "",
const std::string &description = "");
~Plugin();
void close();
void quit();
void enableLogging();
#if !defined(CAPS_FEATURES_BACKFILLING) || CAPS_FEATURES_BACKFILLING
void setBackfillingBufferSize(int secs) { _backfillingBufferSize = secs; }
int backfillingBufferSize() const { return _backfillingBufferSize; }
#endif
/**
* @brief Returns whether the plugin has been
* requested to quit or not.
* @return True, if the plugin has been requested to quit.
*/
bool isExitRequested() {
return _isExitRequested;
}
/**
* @brief Returns the stat object
* @return The stat object
*/
const Stats& stats() const {
return _stats;
}
/**
* @brief Resets the max bytes buffered counter
*/
void resetMaxBytesBuffered() {
_stats.maxBytesBuffered = 0;
}
/**
* @brief Sets the encoder factory used to created encoders for packet
* encoding, e.g. miniSEED.
* @param factory The ownership of the factory goes to the plugin and
* will be deleted if necessary.
*/
void setEncoderFactory(EncoderFactory *factory);
void setHost(const std::string &host) { _host = host; }
const std::string &host() const { return _host; }
void setPort(unsigned short port) { _port = port; }
unsigned short port() const { return _port; }
void setBufferSize(size_t bufferSize) { _bufferSize = bufferSize; }
size_t bufferSize() const { return _bufferSize; }
//! Enables SSL feature
#if !defined(CAPS_FEATURES_SSL) || CAPS_FEATURES_SSL
void setSSLEnabled(bool enable) { _useSSL = enable; }
#endif
/**
* @brief Sets username and password
* @param user The username
* @param password The password
*/
void setCredentials(const std::string &user, const std::string &password) {
_user = user;
_password = password;
}
/**
* @brief Sets the maximum allowed relative end time for packets. If
* the packet end time is greater than the current time plus this
* value the packet will be discarded. By default this value is
* set to 120 seconds.
* @param time The time span
*/
void setMaxFutureEndTime(const TimeSpan &span) {
_maxFutureEndTime = span;
}
void setPacketAckFunc(const PacketAckFunc &func) { _packetAckFunc = func; }
void setSendTimeout(int timeout) {
_sendTimeout = timeout;
}
void setTimeouts(int ack, int lastAck) {
_ackTimeout = ack;
_lastAckTimeout = lastAck;
}
void setTimeouts(int ack, int lastAck, int send) {
_ackTimeout = ack;
_lastAckTimeout = lastAck;
_sendTimeout = send;
}
#if !defined(CAPS_FEATURES_JOURNAL) || CAPS_FEATURES_JOURNAL
bool readJournal();
void setJournal(const std::string &filename) { _journalFile = filename; }
void setFlushInterval(int interval) { _flushInterval = interval; }
const StreamStates &streamStates() const { return _states; }
bool writeJournal();
bool writeJournal(std::ostream &ostream);
#endif
Status push(const std::string &net, const std::string &sta,
const std::string &loc, const std::string &cha,
DataRecordPtr rec, const std::string &uom,
int timingQuality = -1);
Status push(const std::string &net, const std::string &sta,
const std::string &loc, const std::string &cha,
const Time &stime,
uint16_t numerator, uint16_t denominator,
const std::string &uom,
void *data, size_t count,
DataType dt, int timingQuality = -1);
/*
* Sends given data as any packet. Before sending the data will be
* copied into an internal buffer. We introduced this method
* to simplify the creation of the python wrappers.
*/
#if !defined(CAPS_FEATURES_ANY) || CAPS_FEATURES_ANY
Status push(const std::string &net, const std::string &sta,
const std::string &loc, const std::string &cha,
const Time &stime, uint16_t numerator,
uint16_t denominator, const std::string &format,
char *data, size_t count);
Status push(const std::string &net, const std::string &sta,
const std::string &loc, const std::string &cha,
const Time &stime, uint16_t numerator,
uint16_t denominator, const std::string &format,
const std::string &data);
#endif
const char *version() {
return LIB_CAPS_VERSION_NAME;
}
private:
typedef std::deque<PacketPtr> PacketBuffer;
typedef boost::shared_ptr<Encoder> EncoderPtr;
struct EncoderItem {
EncoderItem() : dataType(DT_Unknown) {}
EncoderPtr encoder;
DataType dataType;
};
typedef std::map<std::string, EncoderItem> EncoderItems;
private:
bool connect();
void disconnect();
bool isConnected() const;
Encoder* getEncoder(PacketPtr packet);
bool readResponse(unsigned int sec = 0);
#if !defined(CAPS_FEATURES_JOURNAL) || CAPS_FEATURES_JOURNAL
bool readJournal(std::istream &istream);
void updateJournal();
#endif
void sendBye();
bool commitPacket(PacketPtr packet);
bool encodePacket(PacketPtr packet);
bool sendPacket(Packet *packet);
void serializePacket(Packet *packet);
void tryFlushBackfillingBuffer(StreamState &state);
void trimBackfillingBuffer(StreamState &state);
bool flush();
void flushEncoders();
bool send(char *data, int len, int timeout = 60);
void wait();
private:
SocketPtr _socket;
std::string _name;
std::string _options;
std::string _description;
size_t _bufferSize;
StreamStates _states;
PacketBuffer _packetBuffer;
bool _packetBufferDirty;
size_t _bytesBuffered;
std::string _host;
unsigned short _port;
char _responseBuf[512];
int _responseBufIdx;
fd_set _readFDs;
fd_set _writeFDs;
int _sendTimeout;
#if !defined(CAPS_FEATURES_JOURNAL) || CAPS_FEATURES_JOURNAL
std::string _journalFile;
bool _journalDirty;
Time _lastWrite;
int _flushInterval;
#endif
bool _isExitRequested;
bool _closed;
bool _wasConnected;
#if !defined(CAPS_FEATURES_BACKFILLING) || CAPS_FEATURES_BACKFILLING
int _backfillingBufferSize;
#endif
int _ackTimeout;
int _lastAckTimeout;
EncoderFactory *_encoderFactory;
EncoderItems _encoderItems;
PacketAckFunc _packetAckFunc;
std::string _user;
std::string _password;
#if !defined(CAPS_FEATURES_SSL) || CAPS_FEATURES_SSL
bool _useSSL;
#endif
Stats _stats;
TimeSpan _maxFutureEndTime;
};
//! Shared-ownership handle (boost::shared_ptr) to a Plugin instance.
typedef boost::shared_ptr<Plugin> PluginPtr;
}
}
#endif