// db/flush_scheduler.cc
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include "db/flush_scheduler.h"
  6. #include <cassert>
  7. #include "db/column_family.h"
  8. namespace ROCKSDB_NAMESPACE {
  9. void FlushScheduler::ScheduleWork(ColumnFamilyData* cfd) {
  10. #ifndef NDEBUG
  11. {
  12. std::lock_guard<std::mutex> lock(checking_mutex_);
  13. assert(checking_set_.count(cfd) == 0);
  14. checking_set_.insert(cfd);
  15. }
  16. #endif // NDEBUG
  17. cfd->Ref();
  18. // Suppress false positive clang analyzer warnings.
  19. #ifndef __clang_analyzer__
  20. Node* node = new Node{cfd, head_.load(std::memory_order_relaxed)};
  21. while (!head_.compare_exchange_strong(
  22. node->next, node, std::memory_order_relaxed, std::memory_order_relaxed)) {
  23. // failing CAS updates the first param, so we are already set for
  24. // retry. TakeNextColumnFamily won't happen until after another
  25. // inter-thread synchronization, so we don't even need release
  26. // semantics for this CAS
  27. }
  28. #endif // __clang_analyzer__
  29. }
  30. ColumnFamilyData* FlushScheduler::TakeNextColumnFamily() {
  31. while (true) {
  32. if (head_.load(std::memory_order_relaxed) == nullptr) {
  33. return nullptr;
  34. }
  35. // dequeue the head
  36. Node* node = head_.load(std::memory_order_relaxed);
  37. head_.store(node->next, std::memory_order_relaxed);
  38. ColumnFamilyData* cfd = node->column_family;
  39. delete node;
  40. #ifndef NDEBUG
  41. {
  42. std::lock_guard<std::mutex> lock(checking_mutex_);
  43. auto iter = checking_set_.find(cfd);
  44. assert(iter != checking_set_.end());
  45. checking_set_.erase(iter);
  46. }
  47. #endif // NDEBUG
  48. if (!cfd->IsDropped()) {
  49. // success
  50. return cfd;
  51. }
  52. // no longer relevant, retry
  53. cfd->UnrefAndTryDelete();
  54. }
  55. }
  56. bool FlushScheduler::Empty() {
  57. auto rv = head_.load(std::memory_order_relaxed) == nullptr;
  58. #ifndef NDEBUG
  59. std::lock_guard<std::mutex> lock(checking_mutex_);
  60. // Empty is allowed to be called concurrnetly with ScheduleFlush. It would
  61. // only miss the recent schedules.
  62. assert((rv == checking_set_.empty()) || rv);
  63. #endif // NDEBUG
  64. return rv;
  65. }
  66. void FlushScheduler::Clear() {
  67. ColumnFamilyData* cfd;
  68. while ((cfd = TakeNextColumnFamily()) != nullptr) {
  69. cfd->UnrefAndTryDelete();
  70. }
  71. assert(head_.load(std::memory_order_relaxed) == nullptr);
  72. }
  73. } // namespace ROCKSDB_NAMESPACE