repeatable_thread.h 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #pragma once
  6. #include <functional>
  7. #include <string>
  8. #include "port/port.h"
  9. #include "rocksdb/env.h"
  10. #include "test_util/mock_time_env.h"
  11. #include "util/mutexlock.h"
  12. namespace ROCKSDB_NAMESPACE {
  13. // Simple wrapper around port::Thread that supports calling a callback every
  14. // X seconds. If you pass in 0, then it will call your callback repeatedly
  15. // without delay.
  16. class RepeatableThread {
  17. public:
  18. RepeatableThread(std::function<void()> function,
  19. const std::string& thread_name, Env* env, uint64_t delay_us,
  20. uint64_t initial_delay_us = 0)
  21. : function_(function),
  22. thread_name_("rocksdb:" + thread_name),
  23. env_(env),
  24. delay_us_(delay_us),
  25. initial_delay_us_(initial_delay_us),
  26. mutex_(env),
  27. cond_var_(&mutex_),
  28. running_(true),
  29. #ifndef NDEBUG
  30. waiting_(false),
  31. run_count_(0),
  32. #endif
  33. thread_([this] { thread(); }) {
  34. }
  35. void cancel() {
  36. {
  37. InstrumentedMutexLock l(&mutex_);
  38. if (!running_) {
  39. return;
  40. }
  41. running_ = false;
  42. cond_var_.SignalAll();
  43. }
  44. thread_.join();
  45. }
  46. bool IsRunning() { return running_; }
  47. ~RepeatableThread() { cancel(); }
  48. #ifndef NDEBUG
  49. // Wait until RepeatableThread starting waiting, call the optional callback,
  50. // then wait for one run of RepeatableThread. Tests can use provide a
  51. // custom env object to mock time, and use the callback here to bump current
  52. // time and trigger RepeatableThread. See repeatable_thread_test for example.
  53. //
  54. // Note: only support one caller of this method.
  55. void TEST_WaitForRun(std::function<void()> callback = nullptr) {
  56. InstrumentedMutexLock l(&mutex_);
  57. while (!waiting_) {
  58. cond_var_.Wait();
  59. }
  60. uint64_t prev_count = run_count_;
  61. if (callback != nullptr) {
  62. callback();
  63. }
  64. cond_var_.SignalAll();
  65. while (!(run_count_ > prev_count)) {
  66. cond_var_.Wait();
  67. }
  68. }
  69. #endif
  70. private:
  71. bool wait(uint64_t delay) {
  72. InstrumentedMutexLock l(&mutex_);
  73. if (running_ && delay > 0) {
  74. uint64_t wait_until = env_->NowMicros() + delay;
  75. #ifndef NDEBUG
  76. waiting_ = true;
  77. cond_var_.SignalAll();
  78. #endif
  79. while (running_) {
  80. cond_var_.TimedWait(wait_until);
  81. if (env_->NowMicros() >= wait_until) {
  82. break;
  83. }
  84. }
  85. #ifndef NDEBUG
  86. waiting_ = false;
  87. #endif
  88. }
  89. return running_;
  90. }
  91. void thread() {
  92. #if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ)
  93. #if __GLIBC_PREREQ(2, 12)
  94. // Set thread name.
  95. auto thread_handle = thread_.native_handle();
  96. int ret __attribute__((__unused__)) =
  97. pthread_setname_np(thread_handle, thread_name_.c_str());
  98. assert(ret == 0);
  99. #endif
  100. #endif
  101. assert(delay_us_ > 0);
  102. if (!wait(initial_delay_us_)) {
  103. return;
  104. }
  105. do {
  106. function_();
  107. #ifndef NDEBUG
  108. {
  109. InstrumentedMutexLock l(&mutex_);
  110. run_count_++;
  111. cond_var_.SignalAll();
  112. }
  113. #endif
  114. } while (wait(delay_us_));
  115. }
  116. const std::function<void()> function_;
  117. const std::string thread_name_;
  118. Env* const env_;
  119. const uint64_t delay_us_;
  120. const uint64_t initial_delay_us_;
  121. // Mutex lock should be held when accessing running_, waiting_
  122. // and run_count_.
  123. InstrumentedMutex mutex_;
  124. InstrumentedCondVar cond_var_;
  125. bool running_;
  126. #ifndef NDEBUG
  127. // RepeatableThread waiting for timeout.
  128. bool waiting_;
  129. // Times function_ had run.
  130. uint64_t run_count_;
  131. #endif
  132. port::Thread thread_;
  133. };
  134. } // namespace ROCKSDB_NAMESPACE