Class PubSubService

Synopsis

#include <lib/inc/drogon/PubSubService.h>

template <typename MessageType>
class PubSubService : public trantor::NonCopyable

Description

This class template implements a publish-subscribe pattern with multiple named topics.

Template Parameters:

MessageType - The message type.

Inheritance

Ancestors: NonCopyable

Methods

clearremove all topics.
publishPublish a message to a topic. The message will be broadcasted to every subscriber.
removeTopicRemove a topic.
sizereturn the number of topics.
subscribeSubscribe to a topic. When a message is published to the topic, the handler is invoked by passing the topic and message as parameters.
unsubscribeUnsubscribe from a topic.

Source

Lines 125-271 in lib/inc/drogon/PubSubService.h. Line 127 in lib/inc/drogon/PubSubService.h.

template <typename MessageType>
class PubSubService : public trantor::NonCopyable
{
  public:
    using MessageHandler =
        std::function<void(const std::string &, const MessageType &)>;
#if __cplusplus >= 201703L | defined _WIN32
    using SharedMutex = std::shared_mutex;
#else
    using SharedMutex = std::shared_timed_mutex;
#endif

    /**
     * @brief Publish a message to a topic. The message will be broadcasted to
     * every subscriber.
     */
    void publish(const std::string &topicName, const MessageType &message) const
    {
        std::shared_ptr<Topic<MessageType>> topicPtr;
        {
            std::shared_lock<SharedMutex> lock(mutex_);
            auto iter = topicMap_.find(topicName);
            if (iter != topicMap_.end())
            {
                topicPtr = iter->second;
            }
            else
            {
                return;
            }
        }
        topicPtr->publish(message);
    }

    /**
     * @brief Subscribe to a topic. When a message is published to the topic,
     * the handler is invoked by passing the topic and message as parameters.
     */
    SubscriberID subscribe(const std::string &topicName,
                           const MessageHandler &handler)
    {
        auto topicHandler = [topicName, handler](const MessageType &message) {
            handler(topicName, message);
        };
        return subscribeToTopic(topicName, std::move(topicHandler));
    }

    /**
     * @brief Subscribe to a topic. When a message is published to the topic,
     * the handler is invoked by passing the topic and message as parameters.
     */
    SubscriberID subscribe(const std::string &topicName,
                           MessageHandler &&handler)
    {
        auto topicHandler = [topicName, handler = std::move(handler)](
                                const MessageType &message) {
            handler(topicName, message);
        };
        return subscribeToTopic(topicName, std::move(topicHandler));
    }

    /**
     * @brief Unsubscribe from a topic.
     *
     * @param topic
     * @param id The subscriber ID returned from the subscribe method.
     */
    void unsubscribe(const std::string &topicName, SubscriberID id)
    {
        {
            std::shared_lock<SharedMutex> lock(mutex_);
            auto iter = topicMap_.find(topicName);
            if (iter == topicMap_.end())
            {
                return;
            }
            iter->second->unsubscribe(id);
            if (!iter->second->empty())
                return;
        }
        std::unique_lock<SharedMutex> lock(mutex_);
        auto iter = topicMap_.find(topicName);
        if (iter == topicMap_.end())
        {
            return;
        }
        if (iter->second->empty())
            topicMap_.erase(iter);
    }

    /**
     * @brief return the number of topics.
     */
    size_t size() const
    {
        std::shared_lock<SharedMutex> lock(mutex_);
        return topicMap_.size();
    }

    /**
     * @brief remove all topics.
     */
    void clear()
    {
        std::unique_lock<SharedMutex> lock(mutex_);
        topicMap_.clear();
    }

    /**
     * @brief Remove a topic
     *
     */
    void removeTopic(const std::string &topicName)
    {
        std::unique_lock<SharedMutex> lock(mutex_);
        topicMap_.erase(topicName);
    }

  private:
    std::unordered_map<std::string, std::shared_ptr<Topic<MessageType>>>
        topicMap_;
    mutable SharedMutex mutex_;
    SubscriberID subID_ = 0;
    SubscriberID subscribeToTopic(
        const std::string &topicName,
        typename Topic<MessageType>::MessageHandler &&handler)
    {
        {
            std::shared_lock<SharedMutex> lock(mutex_);
            auto iter = topicMap_.find(topicName);
            if (iter != topicMap_.end())
            {
                return iter->second->subscribe(std::move(handler));
            }
        }
        std::unique_lock<SharedMutex> lock(mutex_);
        auto iter = topicMap_.find(topicName);
        if (iter != topicMap_.end())
        {
            return iter->second->subscribe(std::move(handler));
        }
        auto topicPtr = std::make_shared<Topic<MessageType>>();
        auto id = topicPtr->subscribe(std::move(handler));
        topicMap_[topicName] = std::move(topicPtr);
        return id;
    }
};





Add Discussion as Guest

Log in to DocsForge