| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340 |
- // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
- // This source code is licensed under both the GPLv2 (found in the
- // COPYING file in the root directory) and Apache 2.0 License
- // (found in the LICENSE.Apache file in the root directory).
- //
- #pragma once
- #include <functional>
- #include <memory>
- #include <queue>
- #include <unordered_map>
- #include <utility>
- #include <vector>
- #include "monitoring/instrumented_mutex.h"
- #include "rocksdb/system_clock.h"
- #include "test_util/sync_point.h"
- #include "util/mutexlock.h"
- namespace ROCKSDB_NAMESPACE {
- // A Timer class to handle repeated work.
- //
- // `Start()` and `Shutdown()` are currently not thread-safe. The client must
- // serialize calls to these two member functions.
- //
- // A single timer instance can handle multiple functions via a single thread.
- // It is better to leave long running work to a dedicated thread pool.
- //
- // Timer can be started by calling `Start()`, and ended by calling `Shutdown()`.
- // Work (in terms of a `void function`) can be scheduled by calling `Add` with
- // a unique function name and de-scheduled by calling `Cancel`.
- // Many functions can be added.
- //
- // Impl Details:
- // A heap is used to keep track of when the next timer goes off.
- // A map from a function name to the function keeps track of all the functions.
- class Timer {
- public:
- explicit Timer(SystemClock* clock)
- : clock_(clock),
- mutex_(clock),
- cond_var_(&mutex_),
- running_(false),
- executing_task_(false) {}
- ~Timer() { Shutdown(); }
- // Add a new function to run.
- // fn_name has to be identical, otherwise it will fail to add and return false
- // start_after_us is the initial delay.
- // repeat_every_us is the interval between ending time of the last call and
- // starting time of the next call. For example, repeat_every_us = 2000 and
- // the function takes 1000us to run. If it starts at time [now]us, then it
- // finishes at [now]+1000us, 2nd run starting time will be at [now]+3000us.
- // repeat_every_us == 0 means do not repeat.
- bool Add(std::function<void()> fn, const std::string& fn_name,
- uint64_t start_after_us, uint64_t repeat_every_us) {
- auto fn_info = std::make_unique<FunctionInfo>(std::move(fn), fn_name, 0,
- repeat_every_us);
- InstrumentedMutexLock l(&mutex_);
- // Assign time within mutex to make sure the next_run_time is larger than
- // the current running one
- fn_info->next_run_time_us = clock_->NowMicros() + start_after_us;
- // the new task start time should never before the current task executing
- // time, as the executing task can only be running if it's next_run_time_us
- // is due (<= clock_->NowMicros()).
- if (executing_task_ &&
- fn_info->next_run_time_us < heap_.top()->next_run_time_us) {
- return false;
- }
- auto it = map_.find(fn_name);
- if (it == map_.end()) {
- heap_.push(fn_info.get());
- map_.try_emplace(fn_name, std::move(fn_info));
- } else {
- // timer doesn't support duplicated function name
- return false;
- }
- cond_var_.SignalAll();
- return true;
- }
- void Cancel(const std::string& fn_name) {
- InstrumentedMutexLock l(&mutex_);
- // Mark the function with fn_name as invalid so that it will not be
- // requeued.
- auto it = map_.find(fn_name);
- if (it != map_.end() && it->second) {
- it->second->Cancel();
- }
- // If the currently running function is fn_name, then we need to wait
- // until it finishes before returning to caller.
- while (!heap_.empty() && executing_task_) {
- FunctionInfo* func_info = heap_.top();
- assert(func_info);
- if (func_info->name == fn_name) {
- WaitForTaskCompleteIfNecessary();
- } else {
- break;
- }
- }
- }
- void CancelAll() {
- InstrumentedMutexLock l(&mutex_);
- CancelAllWithLock();
- }
- // Start the Timer
- bool Start() {
- InstrumentedMutexLock l(&mutex_);
- if (running_) {
- return false;
- }
- running_ = true;
- thread_ = std::make_unique<port::Thread>(&Timer::Run, this);
- return true;
- }
- // Shutdown the Timer
- bool Shutdown() {
- {
- InstrumentedMutexLock l(&mutex_);
- if (!running_) {
- return false;
- }
- running_ = false;
- CancelAllWithLock();
- cond_var_.SignalAll();
- }
- if (thread_) {
- thread_->join();
- }
- return true;
- }
- bool HasPendingTask() const {
- InstrumentedMutexLock l(&mutex_);
- for (const auto& fn_info : map_) {
- if (fn_info.second->IsValid()) {
- return true;
- }
- }
- return false;
- }
- #ifndef NDEBUG
- // Wait until Timer starting waiting, call the optional callback, then wait
- // for Timer waiting again.
- // Tests can provide a custom Clock object to mock time, and use the callback
- // here to bump current time and trigger Timer. See timer_test for example.
- //
- // Note: only support one caller of this method.
- void TEST_WaitForRun(const std::function<void()>& callback = nullptr) {
- InstrumentedMutexLock l(&mutex_);
- // It act as a spin lock
- while (executing_task_ ||
- (!heap_.empty() &&
- heap_.top()->next_run_time_us <= clock_->NowMicros())) {
- cond_var_.TimedWait(clock_->NowMicros() + 1000);
- }
- if (callback != nullptr) {
- callback();
- }
- cond_var_.SignalAll();
- do {
- cond_var_.TimedWait(clock_->NowMicros() + 1000);
- } while (executing_task_ ||
- (!heap_.empty() &&
- heap_.top()->next_run_time_us <= clock_->NowMicros()));
- }
- size_t TEST_GetPendingTaskNum() const {
- InstrumentedMutexLock l(&mutex_);
- size_t ret = 0;
- for (const auto& fn_info : map_) {
- if (fn_info.second->IsValid()) {
- ret++;
- }
- }
- return ret;
- }
- void TEST_OverrideTimer(SystemClock* clock) {
- InstrumentedMutexLock l(&mutex_);
- clock_ = clock;
- }
- #endif // NDEBUG
- private:
- void Run() {
- InstrumentedMutexLock l(&mutex_);
- while (running_) {
- if (heap_.empty()) {
- // wait
- TEST_SYNC_POINT("Timer::Run::Waiting");
- cond_var_.Wait();
- continue;
- }
- FunctionInfo* current_fn = heap_.top();
- assert(current_fn);
- if (!current_fn->IsValid()) {
- heap_.pop();
- map_.erase(current_fn->name);
- continue;
- }
- if (current_fn->next_run_time_us <= clock_->NowMicros()) {
- // make a copy of the function so it won't be changed after
- // mutex_.unlock.
- std::function<void()> fn = current_fn->fn;
- executing_task_ = true;
- mutex_.Unlock();
- // Execute the work
- fn();
- mutex_.Lock();
- executing_task_ = false;
- cond_var_.SignalAll();
- // Remove the work from the heap once it is done executing, make sure
- // it's the same function after executing the work while mutex is
- // released.
- // Note that we are just removing the pointer from the heap. Its
- // memory is still managed in the map (as it holds a unique ptr).
- // So current_fn is still a valid ptr.
- assert(heap_.top() == current_fn);
- heap_.pop();
- // current_fn may be cancelled already.
- if (current_fn->IsValid() && current_fn->repeat_every_us > 0) {
- assert(running_);
- current_fn->next_run_time_us =
- clock_->NowMicros() + current_fn->repeat_every_us;
- // Schedule new work into the heap with new time.
- heap_.push(current_fn);
- } else {
- // if current_fn is cancelled or no need to repeat, remove it from the
- // map to avoid leak.
- map_.erase(current_fn->name);
- }
- } else {
- cond_var_.TimedWait(current_fn->next_run_time_us);
- }
- }
- }
- void CancelAllWithLock() {
- mutex_.AssertHeld();
- if (map_.empty() && heap_.empty()) {
- return;
- }
- // With mutex_ held, set all tasks to invalid so that they will not be
- // re-queued.
- for (auto& elem : map_) {
- auto& func_info = elem.second;
- assert(func_info);
- func_info->Cancel();
- }
- // WaitForTaskCompleteIfNecessary() may release mutex_
- WaitForTaskCompleteIfNecessary();
- while (!heap_.empty()) {
- heap_.pop();
- }
- map_.clear();
- }
- // A wrapper around std::function to keep track when it should run next
- // and at what frequency.
- struct FunctionInfo {
- // the actual work
- std::function<void()> fn;
- // name of the function
- std::string name;
- // when the function should run next
- uint64_t next_run_time_us;
- // repeat interval
- uint64_t repeat_every_us;
- // controls whether this function is valid.
- // A function is valid upon construction and until someone explicitly
- // calls `Cancel()`.
- bool valid;
- FunctionInfo(std::function<void()>&& _fn, std::string _name,
- const uint64_t _next_run_time_us, uint64_t _repeat_every_us)
- : fn(std::move(_fn)),
- name(std::move(_name)),
- next_run_time_us(_next_run_time_us),
- repeat_every_us(_repeat_every_us),
- valid(true) {}
- void Cancel() { valid = false; }
- bool IsValid() const { return valid; }
- };
- void WaitForTaskCompleteIfNecessary() {
- mutex_.AssertHeld();
- while (executing_task_) {
- TEST_SYNC_POINT("Timer::WaitForTaskCompleteIfNecessary:TaskExecuting");
- cond_var_.Wait();
- }
- }
- struct RunTimeOrder {
- bool operator()(const FunctionInfo* f1, const FunctionInfo* f2) {
- return f1->next_run_time_us > f2->next_run_time_us;
- }
- };
- SystemClock* clock_;
- // This mutex controls both the heap_ and the map_. It needs to be held for
- // making any changes in them.
- mutable InstrumentedMutex mutex_;
- InstrumentedCondVar cond_var_;
- std::unique_ptr<port::Thread> thread_;
- bool running_;
- bool executing_task_;
- std::priority_queue<FunctionInfo*, std::vector<FunctionInfo*>, RunTimeOrder>
- heap_;
- // In addition to providing a mapping from a function name to a function,
- // it is also responsible for memory management.
- std::unordered_map<std::string, std::unique_ptr<FunctionInfo>> map_;
- };
- } // namespace ROCKSDB_NAMESPACE
|