timer_queue.h 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230
  1. // Portions Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Borrowed from
  7. // http://www.crazygaze.com/blog/2016/03/24/portable-c-timer-queue/
  8. // Timer Queue
  9. //
  10. // License
  11. //
  12. // The source code in this article is licensed under the CC0 license, so feel
  13. // free to copy, modify, share, do whatever you want with it.
  14. // No attribution is required, but Ill be happy if you do.
  15. // CC0 license
  16. // The person who associated a work with this deed has dedicated the work to the
  17. // public domain by waiving all of his or her rights to the work worldwide
  18. // under copyright law, including all related and neighboring rights, to the
  19. // extent allowed by law. You can copy, modify, distribute and perform the
  20. // work, even for commercial purposes, all without asking permission.
  21. #pragma once
  22. #include <assert.h>
  23. #include <chrono>
  24. #include <condition_variable>
  25. #include <functional>
  26. #include <queue>
  27. #include <thread>
  28. #include <utility>
  29. #include <vector>
  30. #include "port/port.h"
  31. #include "test_util/sync_point.h"
  32. // Allows execution of handlers at a specified time in the future
  33. // Guarantees:
  34. // - All handlers are executed ONCE, even if cancelled (aborted parameter will
  35. // be set to true)
  36. // - If TimerQueue is destroyed, it will cancel all handlers.
  37. // - Handlers are ALWAYS executed in the Timer Queue worker thread.
  38. // - Handlers execution order is NOT guaranteed
  39. //
  40. ////////////////////////////////////////////////////////////////////////////////
  41. // borrowed from
  42. // http://www.crazygaze.com/blog/2016/03/24/portable-c-timer-queue/
  43. class TimerQueue {
  44. public:
  45. TimerQueue() : m_th(&TimerQueue::run, this) {}
  46. ~TimerQueue() { shutdown(); }
  47. // This function is not thread-safe.
  48. void shutdown() {
  49. if (closed_) {
  50. return;
  51. }
  52. cancelAll();
  53. // Abusing the timer queue to trigger the shutdown.
  54. add(0, [this](bool) {
  55. m_finish = true;
  56. return std::make_pair(false, 0);
  57. });
  58. m_th.join();
  59. closed_ = true;
  60. }
  61. // Adds a new timer
  62. // \return
  63. // Returns the ID of the new timer. You can use this ID to cancel the
  64. // timer
  65. uint64_t add(int64_t milliseconds,
  66. std::function<std::pair<bool, int64_t>(bool)> handler) {
  67. WorkItem item;
  68. Clock::time_point tp = Clock::now();
  69. item.end = tp + std::chrono::milliseconds(milliseconds);
  70. TEST_SYNC_POINT_CALLBACK("TimeQueue::Add:item.end", &item.end);
  71. item.period = milliseconds;
  72. item.handler = std::move(handler);
  73. std::unique_lock<std::mutex> lk(m_mtx);
  74. uint64_t id = ++m_idcounter;
  75. item.id = id;
  76. m_items.push(std::move(item));
  77. // Something changed, so wake up timer thread
  78. m_checkWork.notify_one();
  79. return id;
  80. }
  81. // Cancels the specified timer
  82. // \return
  83. // 1 if the timer was cancelled.
  84. // 0 if you were too late to cancel (or the timer ID was never valid to
  85. // start with)
  86. size_t cancel(uint64_t id) {
  87. // Instead of removing the item from the container (thus breaking the
  88. // heap integrity), we set the item as having no handler, and put
  89. // that handler on a new item at the top for immediate execution
  90. // The timer thread will then ignore the original item, since it has no
  91. // handler.
  92. std::unique_lock<std::mutex> lk(m_mtx);
  93. for (auto&& item : m_items.getContainer()) {
  94. if (item.id == id && item.handler) {
  95. WorkItem newItem;
  96. // Zero time, so it stays at the top for immediate execution
  97. newItem.end = Clock::time_point();
  98. newItem.id = 0; // Means it is a canceled item
  99. // Move the handler from item to newitem (thus clearing item)
  100. newItem.handler = std::move(item.handler);
  101. m_items.push(std::move(newItem));
  102. // Something changed, so wake up timer thread
  103. m_checkWork.notify_one();
  104. return 1;
  105. }
  106. }
  107. return 0;
  108. }
  109. // Cancels all timers
  110. // \return
  111. // The number of timers cancelled
  112. size_t cancelAll() {
  113. // Setting all "end" to 0 (for immediate execution) is ok,
  114. // since it maintains the heap integrity
  115. std::unique_lock<std::mutex> lk(m_mtx);
  116. m_cancel = true;
  117. for (auto&& item : m_items.getContainer()) {
  118. if (item.id && item.handler) {
  119. item.end = Clock::time_point();
  120. item.id = 0;
  121. }
  122. }
  123. auto ret = m_items.size();
  124. m_checkWork.notify_one();
  125. return ret;
  126. }
  127. private:
  128. using Clock = std::chrono::steady_clock;
  129. TimerQueue(const TimerQueue&) = delete;
  130. TimerQueue& operator=(const TimerQueue&) = delete;
  131. void run() {
  132. std::unique_lock<std::mutex> lk(m_mtx);
  133. while (!m_finish) {
  134. auto end = calcWaitTime_lock();
  135. if (end.first) {
  136. // Timers found, so wait until it expires (or something else
  137. // changes)
  138. m_checkWork.wait_until(lk, end.second);
  139. } else {
  140. // No timers exist, so wait forever until something changes
  141. m_checkWork.wait(lk);
  142. }
  143. // Check and execute as much work as possible, such as, all expired
  144. // timers
  145. checkWork(&lk);
  146. }
  147. // If we are shutting down, we should not have any items left,
  148. // since the shutdown cancels all items
  149. assert(m_items.size() == 0);
  150. }
  151. std::pair<bool, Clock::time_point> calcWaitTime_lock() {
  152. while (m_items.size()) {
  153. if (m_items.top().handler) {
  154. // Item present, so return the new wait time
  155. return std::make_pair(true, m_items.top().end);
  156. } else {
  157. // Discard empty handlers (they were cancelled)
  158. m_items.pop();
  159. }
  160. }
  161. // No items found, so return no wait time (causes the thread to wait
  162. // indefinitely)
  163. return std::make_pair(false, Clock::time_point());
  164. }
  165. void checkWork(std::unique_lock<std::mutex>* lk) {
  166. while (m_items.size() && m_items.top().end <= Clock::now()) {
  167. WorkItem item(m_items.top());
  168. m_items.pop();
  169. if (item.handler) {
  170. (*lk).unlock();
  171. auto reschedule_pair = item.handler(item.id == 0);
  172. (*lk).lock();
  173. if (!m_cancel && reschedule_pair.first) {
  174. int64_t new_period = (reschedule_pair.second == -1)
  175. ? item.period
  176. : reschedule_pair.second;
  177. item.period = new_period;
  178. item.end = Clock::now() + std::chrono::milliseconds(new_period);
  179. m_items.push(std::move(item));
  180. }
  181. }
  182. }
  183. }
  184. bool m_finish = false;
  185. bool m_cancel = false;
  186. uint64_t m_idcounter = 0;
  187. std::condition_variable m_checkWork;
  188. struct WorkItem {
  189. Clock::time_point end;
  190. int64_t period;
  191. uint64_t id; // id==0 means it was cancelled
  192. std::function<std::pair<bool, int64_t>(bool)> handler;
  193. bool operator>(const WorkItem& other) const { return end > other.end; }
  194. };
  195. std::mutex m_mtx;
  196. // Inheriting from priority_queue, so we can access the internal container
  197. class Queue : public std::priority_queue<WorkItem, std::vector<WorkItem>,
  198. std::greater<WorkItem>> {
  199. public:
  200. std::vector<WorkItem>& getContainer() { return this->c; }
  201. } m_items;
  202. ROCKSDB_NAMESPACE::port::Thread m_th;
  203. bool closed_ = false;
  204. };