work_queue.h 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. /*
  6. * Copyright (c) 2016-present, Facebook, Inc.
  7. * All rights reserved.
  8. *
  9. * This source code is licensed under both the BSD-style license (found in the
  10. * LICENSE file in the root directory of this source tree) and the GPLv2 (found
  11. * in the COPYING file in the root directory of this source tree).
  12. */
  13. #pragma once
  14. #include <atomic>
  15. #include <cassert>
  16. #include <condition_variable>
  17. #include <cstddef>
  18. #include <functional>
  19. #include <mutex>
  20. #include <queue>
  21. #include "rocksdb/rocksdb_namespace.h"
  22. namespace ROCKSDB_NAMESPACE {
  23. /// Unbounded thread-safe work queue.
  24. //
  25. // This file is an excerpt from Facebook's zstd repo at
  26. // https://github.com/facebook/zstd/. The relevant file is
  27. // contrib/pzstd/utils/WorkQueue.h.
  28. template <typename T>
  29. class WorkQueue {
  30. // Protects all member variable access
  31. std::mutex mutex_;
  32. std::condition_variable readerCv_;
  33. std::condition_variable writerCv_;
  34. std::condition_variable finishCv_;
  35. std::queue<T> queue_;
  36. bool done_;
  37. std::size_t maxSize_;
  38. // Must have lock to call this function
  39. bool full() const {
  40. if (maxSize_ == 0) {
  41. return false;
  42. }
  43. return queue_.size() >= maxSize_;
  44. }
  45. public:
  46. /**
  47. * Constructs an empty work queue with an optional max size.
  48. * If `maxSize == 0` the queue size is unbounded.
  49. *
  50. * @param maxSize The maximum allowed size of the work queue.
  51. */
  52. WorkQueue(std::size_t maxSize = 0) : done_(false), maxSize_(maxSize) {}
  53. /**
  54. * Push an item onto the work queue. Notify a single thread that work is
  55. * available. If `finish()` has been called, do nothing and return false.
  56. * If `push()` returns false, then `item` has not been copied from.
  57. *
  58. * @param item Item to push onto the queue.
  59. * @returns True upon success, false if `finish()` has been called. An
  60. * item was pushed iff `push()` returns true.
  61. */
  62. template <typename U>
  63. bool push(U&& item) {
  64. {
  65. std::unique_lock<std::mutex> lock(mutex_);
  66. while (full() && !done_) {
  67. writerCv_.wait(lock);
  68. }
  69. if (done_) {
  70. return false;
  71. }
  72. queue_.push(std::forward<U>(item));
  73. }
  74. readerCv_.notify_one();
  75. return true;
  76. }
  77. /**
  78. * Attempts to pop an item off the work queue. It will block until data is
  79. * available or `finish()` has been called.
  80. *
  81. * @param[out] item If `pop` returns `true`, it contains the popped item.
  82. * If `pop` returns `false`, it is unmodified.
  83. * @returns True upon success. False if the queue is empty and
  84. * `finish()` has been called.
  85. */
  86. bool pop(T& item) {
  87. {
  88. std::unique_lock<std::mutex> lock(mutex_);
  89. while (queue_.empty() && !done_) {
  90. readerCv_.wait(lock);
  91. }
  92. if (queue_.empty()) {
  93. assert(done_);
  94. return false;
  95. }
  96. item = queue_.front();
  97. queue_.pop();
  98. }
  99. writerCv_.notify_one();
  100. return true;
  101. }
  102. /**
  103. * Sets the maximum queue size. If `maxSize == 0` then it is unbounded.
  104. *
  105. * @param maxSize The new maximum queue size.
  106. */
  107. void setMaxSize(std::size_t maxSize) {
  108. {
  109. std::lock_guard<std::mutex> lock(mutex_);
  110. maxSize_ = maxSize;
  111. }
  112. writerCv_.notify_all();
  113. }
  114. /**
  115. * Promise that `push()` won't be called again, so once the queue is empty
  116. * there will never any more work.
  117. */
  118. void finish() {
  119. {
  120. std::lock_guard<std::mutex> lock(mutex_);
  121. assert(!done_);
  122. done_ = true;
  123. }
  124. readerCv_.notify_all();
  125. writerCv_.notify_all();
  126. finishCv_.notify_all();
  127. }
  128. /// Blocks until `finish()` has been called (but the queue may not be empty).
  129. void waitUntilFinished() {
  130. std::unique_lock<std::mutex> lock(mutex_);
  131. while (!done_) {
  132. finishCv_.wait(lock);
  133. }
  134. }
  135. };
  136. } // namespace ROCKSDB_NAMESPACE