You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

365 lines
11 KiB
C++

/***************************************************************************
* Copyright (C) 2013 by gempa GmbH *
* *
* All Rights Reserved. *
* *
* NOTICE: All information contained herein is, and remains *
* the property of gempa GmbH and its suppliers, if any. The intellectual *
* and technical concepts contained herein are proprietary to gempa GmbH *
* and its suppliers. *
* Dissemination of this information or reproduction of this material *
* is strictly forbidden unless prior written permission is obtained *
* from gempa GmbH. *
***************************************************************************/
#ifndef GEMPA_CAPS_PLUGIN_H
#define GEMPA_CAPS_PLUGIN_H
/*
// Enable/disable journal
#define CAPS_FEATURES_JOURNAL 1
// Enable/disable backfilling of packets
#define CAPS_FEATURES_BACKFILLING 1
#define CAPS_FEATURES_SSL 1
// Supportted CAPS packets
#define CAPS_FEATURES_ANY 1
#define CAPS_FEATURES_MSEED 1
#define CAPS_FEATURES_RAW 1
#define CAPS_FEATURES_RTCM2 1
*/
#include "encoderfactory.h"
#include <gempa/caps/packet.h>
#include <gempa/caps/socket.h>
#include <gempa/caps/url.h>
#include <gempa/caps/version.h>
#include <gempa/caps/pluginpacket.h>
#include <boost/function.hpp>
#include <boost/shared_ptr.hpp>
#include <deque>
#include <iostream>
#include <map>
#include <string>
#include <vector>
#include <list>
namespace Gempa {
namespace CAPS {
class SC_GEMPA_CAPS_API Plugin {
public:
enum Status {
Success,
PacketSize, /* Packet is bigger than the buffer size */
PacketLoss, /* The other end didn't acknowledge
transmitted data after some time */
PacketNotValid, /* Read meta data failed*/
PacketNotSupported,
MaxFutureEndTimeExceeded
};
struct Stats {
Stats() : maxBytesBuffered(0) {}
size_t maxBytesBuffered;
};
typedef std::vector<char> Buffer;
typedef boost::shared_ptr<Buffer> BufferPtr;
typedef std::deque<PacketPtr> PacketBuffer;
#if !defined(CAPS_FEATURES_BACKFILLING) || CAPS_FEATURES_BACKFILLING
typedef std::list<PacketPtr> BackfillingBuffer;
#endif
struct StreamState {
Time lastEndTime;
#if !defined(CAPS_FEATURES_BACKFILLING) || CAPS_FEATURES_BACKFILLING
Time lastCommitEndTime;
BackfillingBuffer backfillingBuffer;
#endif
};
typedef std::map<std::string, StreamState> StreamStates;
typedef boost::function<void (const std::string &, const CAPS::Time &,
const CAPS::Time &)> PacketAckFunc;
public:
Plugin(const std::string &name, const std::string &options = "",
const std::string &description = "");
~Plugin();
void close();
void quit();
void enableLogging();
#if !defined(CAPS_FEATURES_BACKFILLING) || CAPS_FEATURES_BACKFILLING
void setBackfillingBufferSize(int secs) { _backfillingBufferSize = secs; }
int backfillingBufferSize() const { return _backfillingBufferSize; }
#endif
/**
* @brief Returns whether the plugin has been
* requested to quit or not.
* @return True, if the plugin has been requested to quit.
*/
bool isExitRequested() {
return _isExitRequested;
}
/**
* @brief Returns the stat object
* @return The stat object
*/
const Stats& stats() const {
return _stats;
}
/**
* @brief Resets the max bytes buffered counter
*/
void resetMaxBytesBuffered() {
_stats.maxBytesBuffered = 0;
}
/**
* @brief Sets the encoder factory used to created encoders for packet
* encoding, e.g. miniSEED.
* @param factory The ownership of the factory goes to the plugin and
* will be deleted if necessary.
*/
void setEncoderFactory(EncoderFactory *factory);
/**
* @brief Parses connection parameters from address string. Format
* is [[caps|capss]://][user:pass@]host[:port]
* @param addr The address of the caps server as string
* @param defaultPort The default port used when the port is omitted
*/
bool setAddress(const std::string &addr, uint16_t defaultPort = 18003);
void setHost(const std::string &host) { _url.host = host; }
const std::string &host() const { return _url.host; }
void setPort(unsigned short port) { _url.port = port; }
unsigned short port() const { return _url.port; }
void setBufferSize(size_t bufferSize) { _bufferSize = bufferSize; }
size_t bufferSize() const { return _bufferSize; }
//! Enables SSL feature
#if !defined(CAPS_FEATURES_SSL) || CAPS_FEATURES_SSL
void setSSLEnabled(bool enable) { _useSSL = enable; }
#endif
/**
* @brief Sets username and password
* @param user The username
* @param password The password
*/
void setCredentials(const std::string &user, const std::string &password) {
_url.user = user;
_url.password = password;
}
/**
* @brief Sets the maximum allowed relative end time for packets. If
* the packet end time is greater than the current time plus this
* value the packet will be discarded. By default this value is
* set to 120 seconds.
* @param time The time span
*/
void setMaxFutureEndTime(const TimeSpan &span) {
_maxFutureEndTime = span;
}
void setPacketAckFunc(const PacketAckFunc &func) { _packetAckFunc = func; }
void setSendTimeout(int timeout) {
_sendTimeout = timeout;
}
void setTimeouts(int ack, int lastAck) {
_ackTimeout = ack;
_lastAckTimeout = lastAck;
}
void setTimeouts(int ack, int lastAck, int send) {
_ackTimeout = ack;
_lastAckTimeout = lastAck;
_sendTimeout = send;
}
#if !defined(CAPS_FEATURES_JOURNAL) || CAPS_FEATURES_JOURNAL
bool readJournal();
void setJournal(const std::string &filename) { _journalFile = filename; }
void setFlushInterval(int interval) { _flushInterval = interval; }
const StreamStates &streamStates() const { return _states; }
bool writeJournal();
bool writeJournal(std::ostream &ostream);
#endif
Status push(const std::string &net, const std::string &sta,
const std::string &loc, const std::string &cha,
DataRecordPtr rec, const std::string &uom,
int timingQuality = -1);
Status push(const std::string &net, const std::string &sta,
const std::string &loc, const std::string &cha,
const Time &stime,
uint16_t numerator, uint16_t denominator,
const std::string &uom,
void *data, size_t count,
DataType dt, int timingQuality = -1);
/*
* Sends given data as any packet. Before sending the data will be
* copied into an internal buffer. We introduced this method
* to simplify the creation of the python wrappers.
*/
#if !defined(CAPS_FEATURES_ANY) || CAPS_FEATURES_ANY
Status push(const std::string &net, const std::string &sta,
const std::string &loc, const std::string &cha,
const Time &stime, uint16_t numerator,
uint16_t denominator, const std::string &format,
char *data, size_t count);
Status push(const std::string &net, const std::string &sta,
const std::string &loc, const std::string &cha,
const Time &stime, uint16_t numerator,
uint16_t denominator, const std::string &format,
const std::string &data);
#endif
void flushEncoders();
void dumpPackets(bool enable);
/**
* @brief Returns the internal packet buffer that
* keeps packets until they have been acknowledged
* by CAPS
* @return The interal packet buffer
*/
const PacketBuffer &packetBuffer() const {
return _packetBuffer;
}
/**
* @brief Sets the agent string. Allows the server to
* identify the application that sends data.
* @param agent The agent string
*/
void setAgent(const std::string &agent);
const char *version() {
return LIB_CAPS_VERSION_NAME;
}
protected:
/**
* @brief Sends HELLO request and parses API version
* from server response. If the server does not support
* the API feature the method sets the version to 0.
* @param version The CAPS API version, e.g., 5
* @return True on success
*/
bool getAPIVersion(int &version);
private:
typedef boost::shared_ptr<Encoder> EncoderPtr;
struct EncoderItem {
EncoderItem() : dataType(DT_Unknown) {}
EncoderPtr encoder;
DataType dataType;
};
typedef std::map<std::string, EncoderItem> EncoderItems;
private:
bool connect();
void disconnect();
void dumpPacket(Packet *packet);
bool isConnected() const;
Encoder* getEncoder(PacketPtr packet);
bool readResponse(unsigned int sec = 0);
#if !defined(CAPS_FEATURES_JOURNAL) || CAPS_FEATURES_JOURNAL
bool readJournal(std::istream &istream);
void updateJournal();
#endif
void sendBye();
bool commitPacket(PacketPtr packet);
bool encodePacket(PacketPtr packet);
bool sendPacket(Packet *packet);
void serializePacket(Packet *packet);
void tryFlushBackfillingBuffer(StreamState &state);
void trimBackfillingBuffer(StreamState &state);
bool flush();
bool send(char *data, int len, int timeout = 60);
void wait();
private:
SocketPtr _socket;
std::string _name;
std::string _options;
std::string _description;
size_t _bufferSize;
StreamStates _states;
PacketBuffer _packetBuffer;
bool _packetBufferDirty;
size_t _bytesBuffered;
char _responseBuf[512];
int _responseBufIdx;
fd_set _readFDs;
fd_set _writeFDs;
int _sendTimeout;
#if !defined(CAPS_FEATURES_JOURNAL) || CAPS_FEATURES_JOURNAL
std::string _journalFile;
bool _journalDirty;
Time _lastWrite;
int _flushInterval;
#endif
bool _isExitRequested;
bool _closed;
bool _wasConnected;
#if !defined(CAPS_FEATURES_BACKFILLING) || CAPS_FEATURES_BACKFILLING
int _backfillingBufferSize;
#endif
int _ackTimeout;
int _lastAckTimeout;
EncoderFactory *_encoderFactory;
EncoderItems _encoderItems;
PacketAckFunc _packetAckFunc;
Url _url;
#if !defined(CAPS_FEATURES_SSL) || CAPS_FEATURES_SSL
bool _useSSL;
#endif
Stats _stats;
TimeSpan _maxFutureEndTime;
bool _dumpPackets;
std::string _agent;
};
typedef boost::shared_ptr<Plugin> PluginPtr;
}
}
#endif