channel.h 1.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #pragma once
  6. #include <condition_variable>
  7. #include <mutex>
  8. #include <queue>
  9. #include <utility>
  10. #include "rocksdb/rocksdb_namespace.h"
  11. namespace ROCKSDB_NAMESPACE {
  /// A FIFO queue of T guarded by an internal mutex, with a blocking read()
  /// and an end-of-stream flag.
  ///
  /// Every member function acquires the internal mutex before touching the
  /// buffer or the EOF flag. Elements are handed out in the order they were
  /// written. Once sendEof() has been called, read() keeps returning any
  /// elements still buffered and only then starts returning false.
  ///
  /// The class is neither copyable nor copy-assignable.
  template <class T>
  class channel {
   public:
    explicit channel() = default;

    channel(const channel&) = delete;
    void operator=(const channel&) = delete;

    /// Marks the channel as finished and wakes every thread blocked in
    /// read() so each can re-check its wait condition.
    void sendEof() {
      std::lock_guard<std::mutex> lk(lock_);
      eof_ = true;
      cv_.notify_all();
    }

    /// Returns true only when EOF has been signalled AND the buffer has been
    /// fully drained, i.e. when a subsequent read() would return false.
    bool eof() const {
      std::lock_guard<std::mutex> lk(lock_);
      return buffer_.empty() && eof_;
    }

    /// Returns the number of elements currently buffered.
    size_t size() const {
      std::lock_guard<std::mutex> lk(lock_);
      return buffer_.size();
    }

    /// Moves elem onto the back of the queue and wakes one waiting reader.
    ///
    /// The EOF flag is not consulted here: an element written after
    /// sendEof() is still enqueued.
    void write(T&& elem) {
      std::lock_guard<std::mutex> lk(lock_);
      // T is the class template parameter, so T&& is a plain rvalue
      // reference (not a forwarding reference); std::move is the right cast.
      buffer_.emplace(std::move(elem));
      cv_.notify_one();
    }

    /// Moves the oldest buffered element into elem, blocking until an
    /// element is available or EOF has been signalled.
    ///
    /// @param elem  Receives the dequeued element; left untouched when the
    ///              function returns false.
    /// @return true if an element was dequeued, false if EOF was signalled
    ///         and the buffer is empty.
    bool read(T& elem) {
      std::unique_lock<std::mutex> lk(lock_);
      cv_.wait(lk, [&] { return eof_ || !buffer_.empty(); });
      if (eof_ && buffer_.empty()) {
        return false;
      }
      elem = std::move(buffer_.front());
      buffer_.pop();
      // NOTE(review): nothing in this class waits for free space, so this
      // wakeup can only reach another thread blocked in read(). Kept to
      // preserve the existing behavior; confirm whether it is still needed.
      cv_.notify_one();
      return true;
    }

   private:
    std::condition_variable cv_;
    // mutable so that const observers (size(), eof()) can take the lock.
    mutable std::mutex lock_;
    std::queue<T> buffer_;
    bool eof_ = false;
  };
  57. } // namespace ROCKSDB_NAMESPACE