periodic_task_scheduler.cc 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. // Copyright (c) Meta Platforms, Inc. and affiliates.
  2. //
  3. // This source code is licensed under both the GPLv2 (found in the
  4. // COPYING file in the root directory) and Apache 2.0 License
  5. // (found in the LICENSE.Apache file in the root directory).
  6. #include "db/periodic_task_scheduler.h"
  7. #include "rocksdb/system_clock.h"
  8. namespace ROCKSDB_NAMESPACE {
  9. // `timer_mutex` is a global mutex serves 3 purposes currently:
  10. // (1) to ensure calls to `Start()` and `Shutdown()` are serialized, as
  11. // they are currently not implemented in a thread-safe way; and
  12. // (2) to ensure the `Timer::Add()`s and `Timer::Start()` run atomically, and
  13. // the `Timer::Cancel()`s and `Timer::Shutdown()` run atomically.
  14. // (3) protect tasks_map_ in PeriodicTaskScheduler
  15. // Note: It's not efficient to have a static global mutex, for
  16. // PeriodicTaskScheduler it should be okay, as the operations are called
  17. // infrequently.
  18. static port::Mutex timer_mutex;
  19. static const std::map<PeriodicTaskType, uint64_t> kDefaultPeriodSeconds = {
  20. {PeriodicTaskType::kDumpStats, kInvalidPeriodSec},
  21. {PeriodicTaskType::kPersistStats, kInvalidPeriodSec},
  22. {PeriodicTaskType::kFlushInfoLog, 10},
  23. {PeriodicTaskType::kRecordSeqnoTime, kInvalidPeriodSec},
  24. {PeriodicTaskType::kTriggerCompaction, 12 * 60 * 60} // 12 hours
  25. };
  26. static const std::map<PeriodicTaskType, std::string> kPeriodicTaskTypeNames = {
  27. {PeriodicTaskType::kDumpStats, "dump_st"},
  28. {PeriodicTaskType::kPersistStats, "pst_st"},
  29. {PeriodicTaskType::kFlushInfoLog, "flush_info_log"},
  30. {PeriodicTaskType::kRecordSeqnoTime, "record_seq_time"},
  31. {PeriodicTaskType::kTriggerCompaction, "trigger_compaction"},
  32. };
  33. Status PeriodicTaskScheduler::Register(PeriodicTaskType task_type,
  34. const PeriodicTaskFunc& fn,
  35. bool run_immediately) {
  36. return Register(task_type, fn, kDefaultPeriodSeconds.at(task_type),
  37. run_immediately);
  38. }
  39. Status PeriodicTaskScheduler::Register(PeriodicTaskType task_type,
  40. const PeriodicTaskFunc& fn,
  41. uint64_t repeat_period_seconds,
  42. bool run_immediately) {
  43. MutexLock l(&timer_mutex);
  44. static std::atomic<uint64_t> initial_delay(0);
  45. if (repeat_period_seconds == kInvalidPeriodSec) {
  46. return Status::InvalidArgument("Invalid task repeat period");
  47. }
  48. auto it = tasks_map_.find(task_type);
  49. if (it != tasks_map_.end()) {
  50. // the task already exists and it's the same, no update needed
  51. if (it->second.repeat_every_sec == repeat_period_seconds) {
  52. return Status::OK();
  53. }
  54. // cancel the existing one before register new one
  55. timer_->Cancel(it->second.name);
  56. tasks_map_.erase(it);
  57. }
  58. timer_->Start();
  59. // put task type name as prefix, for easy debug
  60. std::string unique_id =
  61. kPeriodicTaskTypeNames.at(task_type) + std::to_string(id_++);
  62. uint64_t initial_delay_micros =
  63. (initial_delay.fetch_add(1) % repeat_period_seconds) * kMicrosInSecond;
  64. if (!run_immediately) {
  65. initial_delay_micros += repeat_period_seconds * kMicrosInSecond;
  66. }
  67. bool succeeded = timer_->Add(fn, unique_id, initial_delay_micros,
  68. repeat_period_seconds * kMicrosInSecond);
  69. if (!succeeded) {
  70. return Status::Aborted("Failed to register periodic task");
  71. }
  72. auto result = tasks_map_.try_emplace(
  73. task_type, TaskInfo{unique_id, repeat_period_seconds});
  74. if (!result.second) {
  75. return Status::Aborted("Failed to add periodic task");
  76. }
  77. return Status::OK();
  78. }
  79. Status PeriodicTaskScheduler::Unregister(PeriodicTaskType task_type) {
  80. MutexLock l(&timer_mutex);
  81. auto it = tasks_map_.find(task_type);
  82. if (it != tasks_map_.end()) {
  83. timer_->Cancel(it->second.name);
  84. tasks_map_.erase(it);
  85. }
  86. if (!timer_->HasPendingTask()) {
  87. timer_->Shutdown();
  88. }
  89. return Status::OK();
  90. }
  91. Timer* PeriodicTaskScheduler::Default() {
  92. STATIC_AVOID_DESTRUCTION(Timer, timer)(SystemClock::Default().get());
  93. return &timer;
  94. }
  95. #ifndef NDEBUG
  96. void PeriodicTaskScheduler::TEST_OverrideTimer(SystemClock* clock) {
  97. static Timer test_timer(clock);
  98. test_timer.TEST_OverrideTimer(clock);
  99. MutexLock l(&timer_mutex);
  100. timer_ = &test_timer;
  101. }
  102. #endif // NDEBUG
  103. } // namespace ROCKSDB_NAMESPACE