channel.h 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #pragma once
  6. #include <condition_variable>
  7. #include <mutex>
  8. #include <queue>
  9. #include <utility>
  10. namespace ROCKSDB_NAMESPACE {
  11. template <class T>
  12. class channel {
  13. public:
  14. explicit channel() : eof_(false) {}
  15. channel(const channel&) = delete;
  16. void operator=(const channel&) = delete;
  17. void sendEof() {
  18. std::lock_guard<std::mutex> lk(lock_);
  19. eof_ = true;
  20. cv_.notify_all();
  21. }
  22. bool eof() {
  23. std::lock_guard<std::mutex> lk(lock_);
  24. return buffer_.empty() && eof_;
  25. }
  26. size_t size() const {
  27. std::lock_guard<std::mutex> lk(lock_);
  28. return buffer_.size();
  29. }
  30. // writes elem to the queue
  31. void write(T&& elem) {
  32. std::unique_lock<std::mutex> lk(lock_);
  33. buffer_.emplace(std::forward<T>(elem));
  34. cv_.notify_one();
  35. }
  36. /// Moves a dequeued element onto elem, blocking until an element
  37. /// is available.
  38. // returns false if EOF
  39. bool read(T& elem) {
  40. std::unique_lock<std::mutex> lk(lock_);
  41. cv_.wait(lk, [&] { return eof_ || !buffer_.empty(); });
  42. if (eof_ && buffer_.empty()) {
  43. return false;
  44. }
  45. elem = std::move(buffer_.front());
  46. buffer_.pop();
  47. cv_.notify_one();
  48. return true;
  49. }
  50. private:
  51. std::condition_variable cv_;
  52. mutable std::mutex lock_;
  53. std::queue<T> buffer_;
  54. bool eof_;
  55. };
  56. } // namespace ROCKSDB_NAMESPACE