async_file_reader.h 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. // Copyright (c) Meta Platforms, Inc. and affiliates.
  2. //
  3. // This source code is licensed under both the GPLv2 (found in the
  4. // COPYING file in the root directory) and Apache 2.0 License
  5. // (found in the LICENSE.Apache file in the root directory).
  6. #pragma once
  7. #if USE_COROUTINES
  8. #include "file/random_access_file_reader.h"
  9. #include "folly/coro/ViaIfAsync.h"
  10. #include "port/port.h"
  11. #include "rocksdb/file_system.h"
  12. #include "rocksdb/statistics.h"
  13. #include "util/autovector.h"
  14. #include "util/stop_watch.h"
  15. namespace ROCKSDB_NAMESPACE {
  16. class SingleThreadExecutor;
  17. // AsyncFileReader implements the Awaitable concept, which allows calling
  18. // coroutines to co_await it. When the AsyncFileReader Awaitable is
  19. // resumed, it initiates the fie reads requested by the awaiting caller
  20. // by calling RandomAccessFileReader's ReadAsync. It then suspends the
  21. // awaiting coroutine. The suspended awaiter is later resumed by Wait().
  22. class AsyncFileReader {
  23. class ReadAwaiter;
  24. template <typename Awaiter>
  25. class ReadOperation;
  26. public:
  27. AsyncFileReader(FileSystem* fs, Statistics* stats) : fs_(fs), stats_(stats) {}
  28. ~AsyncFileReader() {}
  29. ReadOperation<ReadAwaiter> MultiReadAsync(RandomAccessFileReader* file,
  30. const IOOptions& opts,
  31. FSReadRequest* read_reqs,
  32. size_t num_reqs,
  33. AlignedBuf* aligned_buf,
  34. IODebugContext* dbg) noexcept {
  35. return ReadOperation<ReadAwaiter>{*this, file, opts, read_reqs,
  36. num_reqs, aligned_buf, dbg};
  37. }
  38. private:
  39. friend SingleThreadExecutor;
  40. // Implementation of the Awaitable concept
  41. class ReadAwaiter {
  42. public:
  43. explicit ReadAwaiter(AsyncFileReader& reader, RandomAccessFileReader* file,
  44. const IOOptions& opts, FSReadRequest* read_reqs,
  45. size_t num_reqs, AlignedBuf* /*aligned_buf*/,
  46. IODebugContext* dbg) noexcept
  47. : reader_(reader),
  48. file_(file),
  49. opts_(opts),
  50. read_reqs_(read_reqs),
  51. num_reqs_(num_reqs),
  52. dbg_(dbg),
  53. next_(nullptr) {}
  54. bool await_ready() noexcept { return false; }
  55. // A return value of true means suspend the awaiter (calling coroutine). The
  56. // awaiting_coro parameter is the handle of the awaiter. The handle can be
  57. // resumed later, so we cache it here.
  58. bool await_suspend(
  59. folly::coro::impl::coroutine_handle<> awaiting_coro) noexcept {
  60. awaiting_coro_ = awaiting_coro;
  61. // MultiReadAsyncImpl always returns true, so caller will be suspended
  62. return reader_.MultiReadAsyncImpl(this);
  63. }
  64. void await_resume() noexcept {}
  65. private:
  66. friend AsyncFileReader;
  67. // The parameters passed to MultiReadAsync are cached here when the caller
  68. // calls MultiReadAsync. Later, when the execution of this awaitable is
  69. // started, these are used to do the actual IO
  70. AsyncFileReader& reader_;
  71. RandomAccessFileReader* file_;
  72. const IOOptions& opts_;
  73. FSReadRequest* read_reqs_;
  74. size_t num_reqs_;
  75. IODebugContext* dbg_;
  76. autovector<void*, 32> io_handle_;
  77. autovector<IOHandleDeleter, 32> del_fn_;
  78. folly::coro::impl::coroutine_handle<> awaiting_coro_;
  79. // Use this to link to the next ReadAwaiter in the suspended coroutine
  80. // list. The head and tail of the list are tracked by AsyncFileReader.
  81. // We use this approach rather than an STL container in order to avoid
  82. // extra memory allocations. The coroutine call already allocates a
  83. // ReadAwaiter object.
  84. ReadAwaiter* next_;
  85. };
  86. // An instance of ReadOperation is returned to the caller of MultiGetAsync.
  87. // This represents an awaitable that can be started later.
  88. template <typename Awaiter>
  89. class ReadOperation {
  90. public:
  91. explicit ReadOperation(AsyncFileReader& reader,
  92. RandomAccessFileReader* file, const IOOptions& opts,
  93. FSReadRequest* read_reqs, size_t num_reqs,
  94. AlignedBuf* aligned_buf,
  95. IODebugContext* dbg) noexcept
  96. : reader_(reader),
  97. file_(file),
  98. opts_(opts),
  99. read_reqs_(read_reqs),
  100. num_reqs_(num_reqs),
  101. aligned_buf_(aligned_buf),
  102. dbg_(dbg) {}
  103. auto viaIfAsync(folly::Executor::KeepAlive<> executor) const {
  104. return folly::coro::co_viaIfAsync(
  105. std::move(executor), Awaiter{reader_, file_, opts_, read_reqs_,
  106. num_reqs_, aligned_buf_, dbg_});
  107. }
  108. private:
  109. AsyncFileReader& reader_;
  110. RandomAccessFileReader* file_;
  111. const IOOptions& opts_;
  112. FSReadRequest* read_reqs_;
  113. size_t num_reqs_;
  114. AlignedBuf* aligned_buf_;
  115. IODebugContext* dbg_;
  116. };
  117. // This function does the actual work when this awaitable starts execution
  118. bool MultiReadAsyncImpl(ReadAwaiter* awaiter);
  119. // Called by the SingleThreadExecutor to poll for async IO completion.
  120. // This also resumes the awaiting coroutines.
  121. void Wait();
  122. // Head of the queue of awaiters waiting for async IO completion
  123. ReadAwaiter* head_ = nullptr;
  124. // Tail of the awaiter queue
  125. ReadAwaiter* tail_ = nullptr;
  126. // Total number of pending async IOs
  127. size_t num_reqs_ = 0;
  128. FileSystem* fs_;
  129. Statistics* stats_;
  130. };
  131. } // namespace ROCKSDB_NAMESPACE
  132. #endif // USE_COROUTINES