timer.h 10 KB


  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. #pragma once
  7. #include <functional>
  8. #include <memory>
  9. #include <queue>
  10. #include <unordered_map>
  11. #include <utility>
  12. #include <vector>
  13. #include "monitoring/instrumented_mutex.h"
  14. #include "rocksdb/system_clock.h"
  15. #include "test_util/sync_point.h"
  16. #include "util/mutexlock.h"
  17. namespace ROCKSDB_NAMESPACE {
  18. // A Timer class to handle repeated work.
  19. //
  20. // `Start()` and `Shutdown()` are currently not thread-safe. The client must
  21. // serialize calls to these two member functions.
  22. //
  23. // A single timer instance can handle multiple functions via a single thread.
  24. // It is better to leave long running work to a dedicated thread pool.
  25. //
  26. // Timer can be started by calling `Start()`, and ended by calling `Shutdown()`.
  27. // Work (in terms of a `void function`) can be scheduled by calling `Add` with
  28. // a unique function name and de-scheduled by calling `Cancel`.
  29. // Many functions can be added.
  30. //
  31. // Impl Details:
  32. // A heap is used to keep track of when the next timer goes off.
  33. // A map from a function name to the function keeps track of all the functions.
  34. class Timer {
  35. public:
  36. explicit Timer(SystemClock* clock)
  37. : clock_(clock),
  38. mutex_(clock),
  39. cond_var_(&mutex_),
  40. running_(false),
  41. executing_task_(false) {}
  42. ~Timer() { Shutdown(); }
  43. // Add a new function to run.
  44. // fn_name has to be identical, otherwise it will fail to add and return false
  45. // start_after_us is the initial delay.
  46. // repeat_every_us is the interval between ending time of the last call and
  47. // starting time of the next call. For example, repeat_every_us = 2000 and
  48. // the function takes 1000us to run. If it starts at time [now]us, then it
  49. // finishes at [now]+1000us, 2nd run starting time will be at [now]+3000us.
  50. // repeat_every_us == 0 means do not repeat.
  51. bool Add(std::function<void()> fn, const std::string& fn_name,
  52. uint64_t start_after_us, uint64_t repeat_every_us) {
  53. auto fn_info = std::make_unique<FunctionInfo>(std::move(fn), fn_name, 0,
  54. repeat_every_us);
  55. InstrumentedMutexLock l(&mutex_);
  56. // Assign time within mutex to make sure the next_run_time is larger than
  57. // the current running one
  58. fn_info->next_run_time_us = clock_->NowMicros() + start_after_us;
  59. // the new task start time should never before the current task executing
  60. // time, as the executing task can only be running if it's next_run_time_us
  61. // is due (<= clock_->NowMicros()).
  62. if (executing_task_ &&
  63. fn_info->next_run_time_us < heap_.top()->next_run_time_us) {
  64. return false;
  65. }
  66. auto it = map_.find(fn_name);
  67. if (it == map_.end()) {
  68. heap_.push(fn_info.get());
  69. map_.try_emplace(fn_name, std::move(fn_info));
  70. } else {
  71. // timer doesn't support duplicated function name
  72. return false;
  73. }
  74. cond_var_.SignalAll();
  75. return true;
  76. }
  77. void Cancel(const std::string& fn_name) {
  78. InstrumentedMutexLock l(&mutex_);
  79. // Mark the function with fn_name as invalid so that it will not be
  80. // requeued.
  81. auto it = map_.find(fn_name);
  82. if (it != map_.end() && it->second) {
  83. it->second->Cancel();
  84. }
  85. // If the currently running function is fn_name, then we need to wait
  86. // until it finishes before returning to caller.
  87. while (!heap_.empty() && executing_task_) {
  88. FunctionInfo* func_info = heap_.top();
  89. assert(func_info);
  90. if (func_info->name == fn_name) {
  91. WaitForTaskCompleteIfNecessary();
  92. } else {
  93. break;
  94. }
  95. }
  96. }
  97. void CancelAll() {
  98. InstrumentedMutexLock l(&mutex_);
  99. CancelAllWithLock();
  100. }
  101. // Start the Timer
  102. bool Start() {
  103. InstrumentedMutexLock l(&mutex_);
  104. if (running_) {
  105. return false;
  106. }
  107. running_ = true;
  108. thread_ = std::make_unique<port::Thread>(&Timer::Run, this);
  109. return true;
  110. }
  111. // Shutdown the Timer
  112. bool Shutdown() {
  113. {
  114. InstrumentedMutexLock l(&mutex_);
  115. if (!running_) {
  116. return false;
  117. }
  118. running_ = false;
  119. CancelAllWithLock();
  120. cond_var_.SignalAll();
  121. }
  122. if (thread_) {
  123. thread_->join();
  124. }
  125. return true;
  126. }
  127. bool HasPendingTask() const {
  128. InstrumentedMutexLock l(&mutex_);
  129. for (const auto& fn_info : map_) {
  130. if (fn_info.second->IsValid()) {
  131. return true;
  132. }
  133. }
  134. return false;
  135. }
  136. #ifndef NDEBUG
  137. // Wait until Timer starting waiting, call the optional callback, then wait
  138. // for Timer waiting again.
  139. // Tests can provide a custom Clock object to mock time, and use the callback
  140. // here to bump current time and trigger Timer. See timer_test for example.
  141. //
  142. // Note: only support one caller of this method.
  143. void TEST_WaitForRun(const std::function<void()>& callback = nullptr) {
  144. InstrumentedMutexLock l(&mutex_);
  145. // It act as a spin lock
  146. while (executing_task_ ||
  147. (!heap_.empty() &&
  148. heap_.top()->next_run_time_us <= clock_->NowMicros())) {
  149. cond_var_.TimedWait(clock_->NowMicros() + 1000);
  150. }
  151. if (callback != nullptr) {
  152. callback();
  153. }
  154. cond_var_.SignalAll();
  155. do {
  156. cond_var_.TimedWait(clock_->NowMicros() + 1000);
  157. } while (executing_task_ ||
  158. (!heap_.empty() &&
  159. heap_.top()->next_run_time_us <= clock_->NowMicros()));
  160. }
  161. size_t TEST_GetPendingTaskNum() const {
  162. InstrumentedMutexLock l(&mutex_);
  163. size_t ret = 0;
  164. for (const auto& fn_info : map_) {
  165. if (fn_info.second->IsValid()) {
  166. ret++;
  167. }
  168. }
  169. return ret;
  170. }
  171. void TEST_OverrideTimer(SystemClock* clock) {
  172. InstrumentedMutexLock l(&mutex_);
  173. clock_ = clock;
  174. }
  175. #endif // NDEBUG
  176. private:
  177. void Run() {
  178. InstrumentedMutexLock l(&mutex_);
  179. while (running_) {
  180. if (heap_.empty()) {
  181. // wait
  182. TEST_SYNC_POINT("Timer::Run::Waiting");
  183. cond_var_.Wait();
  184. continue;
  185. }
  186. FunctionInfo* current_fn = heap_.top();
  187. assert(current_fn);
  188. if (!current_fn->IsValid()) {
  189. heap_.pop();
  190. map_.erase(current_fn->name);
  191. continue;
  192. }
  193. if (current_fn->next_run_time_us <= clock_->NowMicros()) {
  194. // make a copy of the function so it won't be changed after
  195. // mutex_.unlock.
  196. std::function<void()> fn = current_fn->fn;
  197. executing_task_ = true;
  198. mutex_.Unlock();
  199. // Execute the work
  200. fn();
  201. mutex_.Lock();
  202. executing_task_ = false;
  203. cond_var_.SignalAll();
  204. // Remove the work from the heap once it is done executing, make sure
  205. // it's the same function after executing the work while mutex is
  206. // released.
  207. // Note that we are just removing the pointer from the heap. Its
  208. // memory is still managed in the map (as it holds a unique ptr).
  209. // So current_fn is still a valid ptr.
  210. assert(heap_.top() == current_fn);
  211. heap_.pop();
  212. // current_fn may be cancelled already.
  213. if (current_fn->IsValid() && current_fn->repeat_every_us > 0) {
  214. assert(running_);
  215. current_fn->next_run_time_us =
  216. clock_->NowMicros() + current_fn->repeat_every_us;
  217. // Schedule new work into the heap with new time.
  218. heap_.push(current_fn);
  219. } else {
  220. // if current_fn is cancelled or no need to repeat, remove it from the
  221. // map to avoid leak.
  222. map_.erase(current_fn->name);
  223. }
  224. } else {
  225. cond_var_.TimedWait(current_fn->next_run_time_us);
  226. }
  227. }
  228. }
  229. void CancelAllWithLock() {
  230. mutex_.AssertHeld();
  231. if (map_.empty() && heap_.empty()) {
  232. return;
  233. }
  234. // With mutex_ held, set all tasks to invalid so that they will not be
  235. // re-queued.
  236. for (auto& elem : map_) {
  237. auto& func_info = elem.second;
  238. assert(func_info);
  239. func_info->Cancel();
  240. }
  241. // WaitForTaskCompleteIfNecessary() may release mutex_
  242. WaitForTaskCompleteIfNecessary();
  243. while (!heap_.empty()) {
  244. heap_.pop();
  245. }
  246. map_.clear();
  247. }
  248. // A wrapper around std::function to keep track when it should run next
  249. // and at what frequency.
  250. struct FunctionInfo {
  251. // the actual work
  252. std::function<void()> fn;
  253. // name of the function
  254. std::string name;
  255. // when the function should run next
  256. uint64_t next_run_time_us;
  257. // repeat interval
  258. uint64_t repeat_every_us;
  259. // controls whether this function is valid.
  260. // A function is valid upon construction and until someone explicitly
  261. // calls `Cancel()`.
  262. bool valid;
  263. FunctionInfo(std::function<void()>&& _fn, std::string _name,
  264. const uint64_t _next_run_time_us, uint64_t _repeat_every_us)
  265. : fn(std::move(_fn)),
  266. name(std::move(_name)),
  267. next_run_time_us(_next_run_time_us),
  268. repeat_every_us(_repeat_every_us),
  269. valid(true) {}
  270. void Cancel() { valid = false; }
  271. bool IsValid() const { return valid; }
  272. };
  273. void WaitForTaskCompleteIfNecessary() {
  274. mutex_.AssertHeld();
  275. while (executing_task_) {
  276. TEST_SYNC_POINT("Timer::WaitForTaskCompleteIfNecessary:TaskExecuting");
  277. cond_var_.Wait();
  278. }
  279. }
  280. struct RunTimeOrder {
  281. bool operator()(const FunctionInfo* f1, const FunctionInfo* f2) {
  282. return f1->next_run_time_us > f2->next_run_time_us;
  283. }
  284. };
  285. SystemClock* clock_;
  286. // This mutex controls both the heap_ and the map_. It needs to be held for
  287. // making any changes in them.
  288. mutable InstrumentedMutex mutex_;
  289. InstrumentedCondVar cond_var_;
  290. std::unique_ptr<port::Thread> thread_;
  291. bool running_;
  292. bool executing_task_;
  293. std::priority_queue<FunctionInfo*, std::vector<FunctionInfo*>, RunTimeOrder>
  294. heap_;
  295. // In addition to providing a mapping from a function name to a function,
  296. // it is also responsible for memory management.
  297. std::unordered_map<std::string, std::unique_ptr<FunctionInfo>> map_;
  298. };
  299. } // namespace ROCKSDB_NAMESPACE