You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
411 lines
13 KiB
C
411 lines
13 KiB
C
1 year ago
|
/***************************************************************************
|
||
|
* Copyright (C) gempa GmbH *
|
||
|
* All rights reserved. *
|
||
|
* Contact: gempa GmbH (seiscomp-dev@gempa.de) *
|
||
|
* *
|
||
|
* Author: Jan Becker *
|
||
|
* Email: jabe@gempa.de *
|
||
|
* *
|
||
|
* GNU Affero General Public License Usage *
|
||
|
* This file may be used under the terms of the GNU Affero *
|
||
|
* Public License version 3.0 as published by the Free Software Foundation *
|
||
|
* and appearing in the file LICENSE included in the packaging of this *
|
||
|
* file. Please review the following information to ensure the GNU Affero *
|
||
|
* Public License version 3.0 requirements will be met: *
|
||
|
* https://www.gnu.org/licenses/agpl-3.0.html. *
|
||
|
* *
|
||
|
* Other Usage *
|
||
|
* Alternatively, this file may be used in accordance with the terms and *
|
||
|
* conditions contained in a signed written agreement between you and *
|
||
|
* gempa GmbH. *
|
||
|
***************************************************************************/
|
||
|
|
||
|
|
||
|
#ifndef SEISCOMP_BROKER_QUEUE_H__
|
||
|
#define SEISCOMP_BROKER_QUEUE_H__
|
||
|
|
||
|
|
||
|
#include <seiscomp/core/enumeration.h>
|
||
|
#include <seiscomp/core/message.h>
|
||
|
|
||
|
#include <seiscomp/broker/messageprocessor.h>
|
||
|
#include <seiscomp/broker/hashset.h>
|
||
|
#include <seiscomp/broker/group.h>
|
||
|
#include <seiscomp/broker/message.h>
|
||
|
#include <seiscomp/broker/statistics.h>
|
||
|
|
||
|
#include <seiscomp/broker/utils/utils.h>
|
||
|
#include <seiscomp/broker/utils/circular.h>
|
||
|
|
||
|
#include <thread>
|
||
|
#include <vector>
|
||
|
|
||
|
|
||
|
namespace Seiscomp {
|
||
|
namespace Messaging {
|
||
|
namespace Broker {
|
||
|
|
||
|
|
||
|
class Client;
|
||
|
class MessageDispatcher;
|
||
|
|
||
|
DEFINE_SMARTPOINTER(MessageProcessor);
|
||
|
|
||
|
|
||
|
/**
|
||
|
* @brief The Queue class implements the central messaging service.
|
||
|
*
|
||
|
* The Queue receives messages, queues them and distributes them to subscribed
|
||
|
* clients.
|
||
|
*/
|
||
|
class SC_BROKER_API Queue {
|
||
|
// ----------------------------------------------------------------------
|
||
|
// Public types
|
||
|
// ----------------------------------------------------------------------
|
||
|
public:
|
||
|
using StringList = std::vector<std::string>;
|
||
|
using MessageProcessors = std::vector<MessageProcessorPtr>;
|
||
|
|
||
|
using KeyValueCStrPair = MessageProcessor::KeyValueCStrPair;
|
||
|
using KeyCStrValues = MessageProcessor::KeyCStrValues;
|
||
|
|
||
|
using KeyValuePair = MessageProcessor::KeyValuePair;
|
||
|
using KeyValues = MessageProcessor::KeyValues;
|
||
|
|
||
|
enum Constants {
|
||
|
MaxAdditionalParams = MessageProcessor::MaxAdditionalParams
|
||
|
};
|
||
|
|
||
|
MAKEENUM(
|
||
|
Result,
|
||
|
EVALUES(
|
||
|
Success,
|
||
|
InternalError,
|
||
|
ClientNameNotUnique,
|
||
|
ClientNotAccepted,
|
||
|
GroupNameNotUnique,
|
||
|
GroupDoesNotExist,
|
||
|
GroupAlreadySubscribed,
|
||
|
GroupNotSubscribed,
|
||
|
MessageNotAccepted,
|
||
|
MessageDecodingFailed,
|
||
|
MessageEncodingFailed,
|
||
|
NotEnoughClientHeap
|
||
|
),
|
||
|
ENAMES(
|
||
|
"Success",
|
||
|
"Internal error",
|
||
|
"Client name is not unique",
|
||
|
"Client was not accepted",
|
||
|
"Group name is not unique",
|
||
|
"Group does not exist",
|
||
|
"Already subscribed to group",
|
||
|
"Not subscribed to group",
|
||
|
"Message not accepted",
|
||
|
"Message could not be decoded",
|
||
|
"Message could not be encoded",
|
||
|
"Not enough client heap"
|
||
|
)
|
||
|
);
|
||
|
|
||
|
const std::string StatusGroup = "STATUS_GROUP";
|
||
|
|
||
|
|
||
|
// ----------------------------------------------------------------------
|
||
|
// X'truction
|
||
|
// ----------------------------------------------------------------------
|
||
|
public:
|
||
|
//! C'tor
|
||
|
Queue(const std::string &name, uint64_t maxPayloadSize);
|
||
|
~Queue();
|
||
|
|
||
|
|
||
|
// ----------------------------------------------------------------------
|
||
|
// Public interface
|
||
|
// ----------------------------------------------------------------------
|
||
|
public:
|
||
|
/**
|
||
|
* @return The queue name
|
||
|
*/
|
||
|
const std::string &name() const;
|
||
|
|
||
|
/**
|
||
|
* @brief Adds a message processor to the list of processors.
|
||
|
* @param proc The processor instance which is managed by the queue
|
||
|
* with a smart pointer.
|
||
|
* @return Success flag
|
||
|
*/
|
||
|
bool add(MessageProcessor *proc);
|
||
|
|
||
|
/**
|
||
|
* @brief Adds a group/topic to the queue.
|
||
|
* @param name The name of the group
|
||
|
* @return true on success, false otherwise
|
||
|
*/
|
||
|
Result addGroup(const std::string &name);
|
||
|
|
||
|
/**
|
||
|
* @brief Returns a list of available group names
|
||
|
* @return The list of names
|
||
|
*/
|
||
|
const StringList &groups() const { return _groupNames; }
|
||
|
|
||
|
/**
|
||
|
* @brief Return the sender name of the queue.
|
||
|
* @return A NULL terminated const string
|
||
|
*/
|
||
|
const char *senderName() const;
|
||
|
|
||
|
/**
|
||
|
* @brief Sets the message dispatcher for thread synchronisation.
|
||
|
*
|
||
|
* The queue runs a thread to process messages via plugins. If the
|
||
|
* message is processed the thread notifies the queue about it. The
|
||
|
* queue could now call publish but that is probably not thread-safe
|
||
|
* and inefficient to implement on each subscriber. The message
|
||
|
* dispatcher receives a notification about a new message and can then
|
||
|
* implement any inter-thread communication to publish the message in
|
||
|
* the same context as it has been created.
|
||
|
*
|
||
|
* @param dispatcher The dispatcher instance not managed by the queue.
|
||
|
*/
|
||
|
void setMessageDispatcher(MessageDispatcher *dispatcher);
|
||
|
|
||
|
/**
|
||
|
* @brief Subscribe a client to a particular group
|
||
|
* @param client The client
|
||
|
* @param group The name of the group
|
||
|
* @return The result code
|
||
|
*/
|
||
|
Result subscribe(Client *client, const std::string &group);
|
||
|
|
||
|
/**
|
||
|
* @brief Unsubscribes a client from a particular group
|
||
|
* @param client The client
|
||
|
* @param group The name of the group
|
||
|
* @return The result code
|
||
|
*/
|
||
|
Result unsubscribe(Client *client, const std::string &group);
|
||
|
|
||
|
/**
|
||
|
* @brief Returns a buffered message after a particular sequence number
|
||
|
* @param sequenceNumber The sequence number to continue with.
|
||
|
*
|
||
|
* The returned message must have a sequence number greater than
|
||
|
* this parameter or lower if a wrap has occured but never the
|
||
|
* same.
|
||
|
* @param client The client instance to filter subscriptions for
|
||
|
* @return A message pointer or NULL if no message is available
|
||
|
*/
|
||
|
Message *getMessage(SequenceNumber sequenceNumber,
|
||
|
const Client *client) const;
|
||
|
|
||
|
/**
|
||
|
* @brief Pushes a message from a client to the queue
|
||
|
*
|
||
|
* This method is called from Client subclasses that received a message
|
||
|
* through their transport protocol. The message pointer will either
|
||
|
* be managed in a smart pointer or deleted. If false is returned the
|
||
|
* caller must take care of deleting the message.
|
||
|
*
|
||
|
* @param sender The sender instance
|
||
|
* @param msg The message
|
||
|
* @param packetLength The size in bytes of the received packet including
|
||
|
* protocol specific header data. This is only
|
||
|
* used for statistics.
|
||
|
* @return The result code
|
||
|
*/
|
||
|
Result push(Client *sender, Message *msg, int packetSize = 0);
|
||
|
|
||
|
/**
|
||
|
* @brief Activates the queue and starts the processing thread.
|
||
|
*/
|
||
|
void activate();
|
||
|
|
||
|
/**
|
||
|
* @brief Shutdown the queue and finished the processing thread if
|
||
|
* running.
|
||
|
*
|
||
|
* This will also shutdown all processors associated with the queue.
|
||
|
*
|
||
|
* Note that this call can block depending how many plugins are
|
||
|
* running and currently processing a message. This method waits until
|
||
|
* the processing thread is finished.
|
||
|
*/
|
||
|
void shutdown();
|
||
|
|
||
|
/**
|
||
|
* @brief Callback to notify the queue about some timeout.
|
||
|
*
|
||
|
* This function is used to check expiration of outstanding
|
||
|
* acknowledgement messages. This function is not thread-safe and
|
||
|
* must be called from within the thread the queue is running in.
|
||
|
*/
|
||
|
void timeout();
|
||
|
|
||
|
/**
|
||
|
* @brief Populates the passed statistics structure.
|
||
|
* @param stats[out] The target structure
|
||
|
* @param reset[in] Whether to reset the internal statistics or not.
|
||
|
*/
|
||
|
void getStatisticsSnapshot(QueueStatistics &stats, bool reset = true);
|
||
|
|
||
|
|
||
|
// ----------------------------------------------------------------------
|
||
|
// Client memory interface
|
||
|
// ----------------------------------------------------------------------
|
||
|
public:
|
||
|
/**
|
||
|
* @brief Allocates additional client heap. Once allocated the heap
|
||
|
* cannot be free'd anymore. This is mainly used for plugins
|
||
|
* that are initialized once and need to store additional
|
||
|
* data in a client structure.
|
||
|
* @param bytes The number of bytes to allocate
|
||
|
* @return An offset to the local client heap or a negative number
|
||
|
* in case of an error. The absolute value (-result) of the
|
||
|
* error translates to a status code (@Result).
|
||
|
*/
|
||
|
int allocateClientHeap(int bytes);
|
||
|
|
||
|
|
||
|
// ----------------------------------------------------------------------
|
||
|
// Publisher interface
|
||
|
// ----------------------------------------------------------------------
|
||
|
public:
|
||
|
/**
|
||
|
* @brief Registers a client in the queue and sets up the PubSub
|
||
|
* connections.
|
||
|
*
|
||
|
* This is called when the client calls connect and is part of the
|
||
|
* PublisherBase interface.
|
||
|
* @param client The client to be registered
|
||
|
* @param slot The slot
|
||
|
* @return The result code
|
||
|
*/
|
||
|
Result connect(Client *client, const KeyCStrValues params, int paramCount,
|
||
|
KeyValues &outParams);
|
||
|
|
||
|
/**
|
||
|
* @brief Deregisters a client from the queue and clears the PubSub
|
||
|
* connections.
|
||
|
*
|
||
|
* This is called when the client calls disconnect and is part of the
|
||
|
* PublisherBase interface.
|
||
|
* @param client The client to be deregistered
|
||
|
* @return The result code
|
||
|
*/
|
||
|
Result disconnect(Client *client);
|
||
|
|
||
|
|
||
|
// ----------------------------------------------------------------------
|
||
|
// Settings interface
|
||
|
// ----------------------------------------------------------------------
|
||
|
public:
|
||
|
uint64_t maxPayloadSize() const;
|
||
|
|
||
|
|
||
|
// ----------------------------------------------------------------------
|
||
|
// Private interface
|
||
|
// ----------------------------------------------------------------------
|
||
|
private:
|
||
|
using ProcessingTask = std::pair<Client*,Message*>;
|
||
|
using TaskQueue = Utils::BlockingDequeue<ProcessingTask>;
|
||
|
|
||
|
/**
|
||
|
* @brief Publishes a message from a client to all registered clients
|
||
|
*
|
||
|
* This method is called from Client subclasses that received a message
|
||
|
* through their transport protocol.
|
||
|
*
|
||
|
* @param sender The sender instance
|
||
|
* @param msg The message
|
||
|
* @return true on success, false otherwise
|
||
|
*/
|
||
|
bool publish(Client *sender, Message *msg);
|
||
|
|
||
|
/**
|
||
|
* @brief Pops all messages from the processing queue and publishes them.
|
||
|
*
|
||
|
* This call does not block.
|
||
|
*/
|
||
|
void flushProcessedMessages();
|
||
|
|
||
|
/**
|
||
|
* @brief The processing loop running in a different thread.
|
||
|
*/
|
||
|
void processingLoop();
|
||
|
|
||
|
/**
|
||
|
* @brief Processes a message e.g. via plugins.
|
||
|
* @param task The task to be processed
|
||
|
*/
|
||
|
void process(ProcessingTask &task);
|
||
|
|
||
|
/**
|
||
|
* @brief Called from the processing thread informing the queue that
|
||
|
* the message is processed and can be forwarded to clients.
|
||
|
* @param task The task
|
||
|
*/
|
||
|
void taskReady(const ProcessingTask &task);
|
||
|
|
||
|
/**
|
||
|
* @brief Replaces the incoming message with a response
|
||
|
* @param task The task to be updated
|
||
|
*/
|
||
|
void returnToSender(Message *msg, Core::BaseObject *obj);
|
||
|
|
||
|
|
||
|
// ----------------------------------------------------------------------
|
||
|
// Private members
|
||
|
// ----------------------------------------------------------------------
|
||
|
private:
|
||
|
using Groups = std::map<std::string, GroupPtr>;
|
||
|
using MessageRing = circular_buffer<MessagePtr>;
|
||
|
using ClientNames = KHashSet<const char*>;
|
||
|
using Clients = KHashMap<const char*, Client*>;
|
||
|
|
||
|
std::string _name;
|
||
|
MessageProcessors _processors;
|
||
|
MessageProcessors _connectionProcessors;
|
||
|
MessageProcessors _messageProcessors;
|
||
|
MessageDispatcher *_processedMessageDispatcher;
|
||
|
SequenceNumber _sequenceNumber;
|
||
|
Groups _groups;
|
||
|
StringList _groupNames;
|
||
|
MessageRing _messages;
|
||
|
Clients _clients;
|
||
|
std::thread *_messageProcessor;
|
||
|
TaskQueue _tasks;
|
||
|
TaskQueue _results;
|
||
|
Core::Time _created;
|
||
|
Core::Time _lastSOHTimestamp;
|
||
|
int _allocatedClientHeap;
|
||
|
int _sohInterval;
|
||
|
int _inactivityLimit;
|
||
|
uint64_t _maxPayloadSize;
|
||
|
mutable Tx _txMessages;
|
||
|
mutable Tx _txBytes;
|
||
|
mutable Tx _txPayload;
|
||
|
|
||
|
|
||
|
friend class MessageDispatcher;
|
||
|
};
|
||
|
|
||
|
|
||
|
inline const std::string &Queue::name() const {
|
||
|
return _name;
|
||
|
}
|
||
|
|
||
|
|
||
|
inline uint64_t Queue::maxPayloadSize() const {
|
||
|
return _maxPayloadSize;
|
||
|
}
|
||
|
|
||
|
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
|
||
|
#endif
|