| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230 |
- // Portions Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
- // This source code is licensed under both the GPLv2 (found in the
- // COPYING file in the root directory) and Apache 2.0 License
- // (found in the LICENSE.Apache file in the root directory).
- //
- // Borrowed from
- // http://www.crazygaze.com/blog/2016/03/24/portable-c-timer-queue/
- // Timer Queue
- //
- // License
- //
- // The source code in this article is licensed under the CC0 license, so feel
- // free to copy, modify, share, do whatever you want with it.
- // No attribution is required, but Ill be happy if you do.
- // CC0 license
- // The person who associated a work with this deed has dedicated the work to the
- // public domain by waiving all of his or her rights to the work worldwide
- // under copyright law, including all related and neighboring rights, to the
- // extent allowed by law. You can copy, modify, distribute and perform the
- // work, even for commercial purposes, all without asking permission.
- #pragma once
- #include <assert.h>
- #include <chrono>
- #include <condition_variable>
- #include <functional>
- #include <queue>
- #include <thread>
- #include <utility>
- #include <vector>
- #include "port/port.h"
- #include "test_util/sync_point.h"
- // Allows execution of handlers at a specified time in the future
- // Guarantees:
- // - All handlers are executed ONCE, even if cancelled (aborted parameter will
- // be set to true)
- // - If TimerQueue is destroyed, it will cancel all handlers.
- // - Handlers are ALWAYS executed in the Timer Queue worker thread.
- // - Handlers execution order is NOT guaranteed
- //
- ////////////////////////////////////////////////////////////////////////////////
- // borrowed from
- // http://www.crazygaze.com/blog/2016/03/24/portable-c-timer-queue/
- class TimerQueue {
- public:
- TimerQueue() : m_th(&TimerQueue::run, this) {}
- ~TimerQueue() { shutdown(); }
- // This function is not thread-safe.
- void shutdown() {
- if (closed_) {
- return;
- }
- cancelAll();
- // Abusing the timer queue to trigger the shutdown.
- add(0, [this](bool) {
- m_finish = true;
- return std::make_pair(false, 0);
- });
- m_th.join();
- closed_ = true;
- }
- // Adds a new timer
- // \return
- // Returns the ID of the new timer. You can use this ID to cancel the
- // timer
- uint64_t add(int64_t milliseconds,
- std::function<std::pair<bool, int64_t>(bool)> handler) {
- WorkItem item;
- Clock::time_point tp = Clock::now();
- item.end = tp + std::chrono::milliseconds(milliseconds);
- TEST_SYNC_POINT_CALLBACK("TimeQueue::Add:item.end", &item.end);
- item.period = milliseconds;
- item.handler = std::move(handler);
- std::unique_lock<std::mutex> lk(m_mtx);
- uint64_t id = ++m_idcounter;
- item.id = id;
- m_items.push(std::move(item));
- // Something changed, so wake up timer thread
- m_checkWork.notify_one();
- return id;
- }
- // Cancels the specified timer
- // \return
- // 1 if the timer was cancelled.
- // 0 if you were too late to cancel (or the timer ID was never valid to
- // start with)
- size_t cancel(uint64_t id) {
- // Instead of removing the item from the container (thus breaking the
- // heap integrity), we set the item as having no handler, and put
- // that handler on a new item at the top for immediate execution
- // The timer thread will then ignore the original item, since it has no
- // handler.
- std::unique_lock<std::mutex> lk(m_mtx);
- for (auto&& item : m_items.getContainer()) {
- if (item.id == id && item.handler) {
- WorkItem newItem;
- // Zero time, so it stays at the top for immediate execution
- newItem.end = Clock::time_point();
- newItem.id = 0; // Means it is a canceled item
- // Move the handler from item to newitem (thus clearing item)
- newItem.handler = std::move(item.handler);
- m_items.push(std::move(newItem));
- // Something changed, so wake up timer thread
- m_checkWork.notify_one();
- return 1;
- }
- }
- return 0;
- }
- // Cancels all timers
- // \return
- // The number of timers cancelled
- size_t cancelAll() {
- // Setting all "end" to 0 (for immediate execution) is ok,
- // since it maintains the heap integrity
- std::unique_lock<std::mutex> lk(m_mtx);
- m_cancel = true;
- for (auto&& item : m_items.getContainer()) {
- if (item.id && item.handler) {
- item.end = Clock::time_point();
- item.id = 0;
- }
- }
- auto ret = m_items.size();
- m_checkWork.notify_one();
- return ret;
- }
- private:
- using Clock = std::chrono::steady_clock;
- TimerQueue(const TimerQueue&) = delete;
- TimerQueue& operator=(const TimerQueue&) = delete;
- void run() {
- std::unique_lock<std::mutex> lk(m_mtx);
- while (!m_finish) {
- auto end = calcWaitTime_lock();
- if (end.first) {
- // Timers found, so wait until it expires (or something else
- // changes)
- m_checkWork.wait_until(lk, end.second);
- } else {
- // No timers exist, so wait forever until something changes
- m_checkWork.wait(lk);
- }
- // Check and execute as much work as possible, such as, all expired
- // timers
- checkWork(&lk);
- }
- // If we are shutting down, we should not have any items left,
- // since the shutdown cancels all items
- assert(m_items.size() == 0);
- }
- std::pair<bool, Clock::time_point> calcWaitTime_lock() {
- while (m_items.size()) {
- if (m_items.top().handler) {
- // Item present, so return the new wait time
- return std::make_pair(true, m_items.top().end);
- } else {
- // Discard empty handlers (they were cancelled)
- m_items.pop();
- }
- }
- // No items found, so return no wait time (causes the thread to wait
- // indefinitely)
- return std::make_pair(false, Clock::time_point());
- }
- void checkWork(std::unique_lock<std::mutex>* lk) {
- while (m_items.size() && m_items.top().end <= Clock::now()) {
- WorkItem item(m_items.top());
- m_items.pop();
- if (item.handler) {
- (*lk).unlock();
- auto reschedule_pair = item.handler(item.id == 0);
- (*lk).lock();
- if (!m_cancel && reschedule_pair.first) {
- int64_t new_period = (reschedule_pair.second == -1)
- ? item.period
- : reschedule_pair.second;
- item.period = new_period;
- item.end = Clock::now() + std::chrono::milliseconds(new_period);
- m_items.push(std::move(item));
- }
- }
- }
- }
- bool m_finish = false;
- bool m_cancel = false;
- uint64_t m_idcounter = 0;
- std::condition_variable m_checkWork;
- struct WorkItem {
- Clock::time_point end;
- int64_t period;
- uint64_t id; // id==0 means it was cancelled
- std::function<std::pair<bool, int64_t>(bool)> handler;
- bool operator>(const WorkItem& other) const { return end > other.end; }
- };
- std::mutex m_mtx;
- // Inheriting from priority_queue, so we can access the internal container
- class Queue : public std::priority_queue<WorkItem, std::vector<WorkItem>,
- std::greater<WorkItem>> {
- public:
- std::vector<WorkItem>& getContainer() { return this->c; }
- } m_items;
- ROCKSDB_NAMESPACE::port::Thread m_th;
- bool closed_ = false;
- };
|