Class CacheMap

Synopsis

#include <lib/inc/drogon/CacheMap.h>

template <typename T1, typename T2>
class CacheMap

Description

Cache Map.

Template Parameters:

T1 - The keyword type.

T2 - The value type.

Note
Four wheels with 200 buckets per wheel means the cache map can work with a timeout up to 200^4 seconds (about 50 years).

Structures

MapValue

Methods

CacheMapconstructor
~CacheMap
eraseErase the value of the keyword.
findCheck if the value of the keyword exists.
findAndFetchAtomically find and get the value of a keyword.
getLoopGet the event loop object.
insertInsert a key-value pair into the cache.
modifyModify or visit the data identified by the key parameter.
operator[]Return the value of the keyword.
runAfterrun the task function after a period of time.

Source

Lines 69-478 in lib/inc/drogon/CacheMap.h.

template <typename T1, typename T2>
class CacheMap
{
  public:
    /// constructor
    /**
     * @param loop
     * eventloop pointer
     * @param tickInterval
     * second
     * @param wheelsNum
     * number of wheels
     * @param bucketsNumPerWheel
     * buckets number per wheel
     * The max delay of the CacheMap is about
     * tickInterval*(bucketsNumPerWheel^wheelsNum) seconds.
     */
    CacheMap(trantor::EventLoop *loop,
             float tickInterval = TICK_INTERVAL,
             size_t wheelsNum = WHEELS_NUM,
             size_t bucketsNumPerWheel = BUCKET_NUM_PER_WHEEL)
        : loop_(loop),
          tickInterval_(tickInterval),
          wheelsNumber_(wheelsNum),
          bucketsNumPerWheel_(bucketsNumPerWheel)
    {
        wheels_.resize(wheelsNumber_);
        for (size_t i = 0; i < wheelsNumber_; ++i)
        {
            wheels_[i].resize(bucketsNumPerWheel_);
        }
        if (tickInterval_ > 0 && wheelsNumber_ > 0 && bucketsNumPerWheel_ > 0)
        {
            timerId_ = loop_->runEvery(tickInterval_, [this]() {
                size_t t = ++ticksCounter_;
                size_t pow = 1;
                for (size_t i = 0; i < wheelsNumber_; ++i)
                {
                    if ((t % pow) == 0)
                    {
                        CallbackBucket tmp;
                        {
                            std::lock_guard<std::mutex> lock(bucketMutex_);
                            // use tmp val to make this critical area as short
                            // as possible.
                            wheels_[i].front().swap(tmp);
                            wheels_[i].pop_front();
                            wheels_[i].push_back(CallbackBucket());
                        }
                    }
                    pow = pow * bucketsNumPerWheel_;
                }
            });
        }
        else
        {
            noWheels_ = true;
        }
    };
    ~CacheMap()
    {
        map_.clear();
        for (auto iter = wheels_.rbegin(); iter != wheels_.rend(); ++iter)
        {
            iter->clear();
        }
        LOG_TRACE << "CacheMap destruct!";
    }
    struct MapValue
    {
        MapValue(const T2 &value,
                 size_t timeout,
                 std::function<void()> &&callback)
            : value_(value),
              timeout_(timeout),
              timeoutCallback_(std::move(callback))
        {
        }
        MapValue(T2 &&value, size_t timeout, std::function<void()> &&callback)
            : value_(std::move(value)),
              timeout_(timeout),
              timeoutCallback_(std::move(callback))
        {
        }
        MapValue(T2 &&value, size_t timeout)
            : value_(std::move(value)), timeout_(timeout)
        {
        }
        MapValue(const T2 &value, size_t timeout)
            : value_(value), timeout_(timeout)
        {
        }
        MapValue(T2 &&value) : value_(std::move(value))
        {
        }
        MapValue(const T2 &value) : value_(value)
        {
        }
        MapValue() = default;
        T2 value_;
        size_t timeout_{0};
        std::function<void()> timeoutCallback_;
        WeakCallbackEntryPtr weakEntryPtr_;
    };

    /**
     * @brief Insert a key-value pair into the cache.
     *
     * @param key The key
     * @param value The value
     * @param timeout The timeout in seconds, if timeout > 0, the value will be
     * erased within the 'timeout' seconds after the last access. If the timeout
     * is zero, the value exists until being removed explicitly.
     * @param timeoutCallback is called when the timeout expires.
     */
    void insert(const T1 &key,
                T2 &&value,
                size_t timeout = 0,
                std::function<void()> timeoutCallback = std::function<void()>())
    {
        if (timeout > 0)
        {
            MapValue v{std::move(value), timeout, std::move(timeoutCallback)};
            std::lock_guard<std::mutex> lock(mtx_);
            map_.insert(std::make_pair(key, std::move(v)));
            eraseAfter(timeout, key);
        }
        else
        {
            MapValue v{std::move(value)};
            std::lock_guard<std::mutex> lock(mtx_);
            map_.insert(std::make_pair(key, std::move(v)));
        }
    }
    /**
     * @brief Insert a key-value pair into the cache.
     *
     * @param key The key
     * @param value The value
     * @param timeout The timeout in seconds, if timeout > 0, the value will be
     * erased within the 'timeout' seconds after the last access. If the timeout
     * is zero, the value exists until being removed explicitly.
     * @param timeoutCallback is called when the timeout expires.
     */
    void insert(const T1 &key,
                const T2 &value,
                size_t timeout = 0,
                std::function<void()> timeoutCallback = std::function<void()>())
    {
        if (timeout > 0)
        {
            MapValue v{value, timeout, std::move(timeoutCallback)};
            std::lock_guard<std::mutex> lock(mtx_);
            map_.insert(std::make_pair(key, std::move(v)));
            eraseAfter(timeout, key);
        }
        else
        {
            MapValue v{value};
            std::lock_guard<std::mutex> lock(mtx_);
            map_.insert(std::make_pair(key, std::move(v)));
        }
    }

    /**
     * @brief Return the value of the keyword.
     *
     * @param key
     * @return T2
     * @note This function returns a copy of the data in the cache. If the data
     * is not found, a default T2 type value is returned and nothing is inserted
     * into the cache.
     */
    T2 operator[](const T1 &key)
    {
        int timeout = 0;
        std::lock_guard<std::mutex> lock(mtx_);
        auto iter = map_.find(key);
        if (iter != map_.end())
        {
            timeout = iter->second.timeout_;
            if (timeout > 0)
                eraseAfter(timeout, key);
            return iter->second.value_;
        }
        return T2();
    }

    /**
     * @brief Modify or visit the data identified by the key parameter.
     *
     * @tparam Callable the type of the handler.
     * @param key
     * @param handler A callable that can modify or visit the data. The
     * signature of the handler should be equivalent to 'void(T2&)' or
     * 'void(const T2&)'
     * @param timeout In seconds.
     *
     * @note This function is multiple-thread safe. if the data identified by
     * the key doesn't exist, a new one is created and passed to the handler and
     * stored in the cache with the timeout parameter. The changing of the data
     * is protected by the mutex of the cache.
     */
    template <typename Callable>
    void modify(const T1 &key, Callable &&handler, size_t timeout = 0)
    {
        std::lock_guard<std::mutex> lock(mtx_);
        auto iter = map_.find(key);
        if (iter != map_.end())
        {
            timeout = iter->second.timeout_;
            handler(iter->second.value_);
            if (timeout > 0)
                eraseAfter(timeout, key);
            return;
        }
        MapValue v{T2(), timeout};
        handler(v.value_);
        map_.insert(std::make_pair(key, std::move(v)));
        if (timeout > 0)
        {
            eraseAfter(timeout, key);
        }
    }
    /// Check if the value of the keyword exists
    bool find(const T1 &key)
    {
        int timeout = 0;
        bool flag = false;

        std::lock_guard<std::mutex> lock(mtx_);
        auto iter = map_.find(key);
        if (iter != map_.end())
        {
            timeout = iter->second.timeout_;
            flag = true;
        }

        if (timeout > 0)
            eraseAfter(timeout, key);

        return flag;
    }

    /// Atomically find and get the value of a keyword
    /**
     * Return true when the value is found, and the value
     * is assigned to the value argument.
     */
    bool findAndFetch(const T1 &key, T2 &value)
    {
        size_t timeout = 0;
        bool flag = false;
        std::lock_guard<std::mutex> lock(mtx_);
        auto iter = map_.find(key);
        if (iter != map_.end())
        {
            timeout = iter->second.timeout_;
            flag = true;
            value = iter->second.value_;
        }

        if (timeout > 0)
            eraseAfter(timeout, key);

        return flag;
    }

    /// Erase the value of the keyword.
    /**
     * @param key the keyword.
     * @note This function does not cause the timeout callback to be executed.
     */
    void erase(const T1 &key)
    {
        // in this case,we don't evoke the timeout callback;
        std::lock_guard<std::mutex> lock(mtx_);
        map_.erase(key);
    }
    /**
     * @brief Get the event loop object
     *
     * @return trantor::EventLoop*
     */
    trantor::EventLoop *getLoop()
    {
        return loop_;
    }

    /**
     * @brief run the task function after a period of time.
     *
     * @param delay in seconds
     * @param task
     * @note This timer is a low-precision timer whose accuracy depends on the
     * tickInterval parameter of the cache. The advantage of the timer is its
     * low cost.
     */
    void runAfter(size_t delay, std::function<void()> &&task)
    {
        std::lock_guard<std::mutex> lock(bucketMutex_);
        insertEntry(delay, std::make_shared<CallbackEntry>(std::move(task)));
    }
    void runAfter(size_t delay, const std::function<void()> &task)
    {
        std::lock_guard<std::mutex> lock(bucketMutex_);
        insertEntry(delay, std::make_shared<CallbackEntry>(task));
    }

  private:
    std::unordered_map<T1, MapValue> map_;

    std::vector<CallbackBucketQueue> wheels_;

    std::atomic<size_t> ticksCounter_{0};

    std::mutex mtx_;
    std::mutex bucketMutex_;
    trantor::TimerId timerId_;
    trantor::EventLoop *loop_;

    float tickInterval_;
    size_t wheelsNumber_;
    size_t bucketsNumPerWheel_;

    bool noWheels_{false};

    void insertEntry(size_t delay, CallbackEntryPtr entryPtr)
    {
        // protected by bucketMutex;
        if (delay <= 0)
            return;
        delay = static_cast<size_t>(delay / tickInterval_ + 1);
        size_t t = ticksCounter_;
        for (size_t i = 0; i < wheelsNumber_; ++i)
        {
            if (delay <= bucketsNumPerWheel_)
            {
                wheels_[i][delay - 1].insert(entryPtr);
                break;
            }
            if (i < (wheelsNumber_ - 1))
            {
                entryPtr = std::make_shared<CallbackEntry>(
                    [this, delay, i, t, entryPtr]() {
                        if (delay > 0)
                        {
                            std::lock_guard<std::mutex> lock(bucketMutex_);
                            wheels_[i][(delay + (t % bucketsNumPerWheel_) - 1) %
                                       bucketsNumPerWheel_]
                                .insert(entryPtr);
                        }
                    });
            }
            else
            {
                // delay is too long to put entry at valid position in wheels;
                wheels_[i][bucketsNumPerWheel_ - 1].insert(entryPtr);
            }
            delay =
                (delay + (t % bucketsNumPerWheel_) - 1) / bucketsNumPerWheel_;
            t = t / bucketsNumPerWheel_;
        }
    }
    void eraseAfter(size_t delay, const T1 &key)
    {
        if (noWheels_)
            return;
        assert(map_.find(key) != map_.end());

        CallbackEntryPtr entryPtr;

        if (map_.find(key) != map_.end())
        {
            entryPtr = map_[key].weakEntryPtr_.lock();
        }

        if (entryPtr)
        {
            std::lock_guard<std::mutex> lock(bucketMutex_);
            insertEntry(delay, entryPtr);
        }
        else
        {
            std::function<void()> cb = [this, key]() {
                std::lock_guard<std::mutex> lock(mtx_);
                if (map_.find(key) != map_.end())
                {
                    auto &value = map_[key];
                    auto entryPtr = value.weakEntryPtr_.lock();
                    // entryPtr is used to avoid race conditions
                    if (value.timeout_ > 0 && !entryPtr)
                    {
                        if (value.timeoutCallback_)
                        {
                            value.timeoutCallback_();
                        }
                        map_.erase(key);
                    }
                }
            };
            entryPtr = std::make_shared<CallbackEntry>(std::move(cb));
            map_[key].weakEntryPtr_ = entryPtr;
            {
                std::lock_guard<std::mutex> lock(bucketMutex_);
                insertEntry(delay, entryPtr);
            }
        }
    }
};





Add Discussion as Guest

Log in to DocsForge