// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#pragma once

#include <algorithm>
#include <atomic>
#include <deque>
#include <sstream>
#include <string>

#include "file/random_access_file_reader.h"
#include "file/readahead_file_info.h"
#include "file_util.h"
#include "monitoring/statistics_impl.h"
#include "port/port.h"
#include "rocksdb/env.h"
#include "rocksdb/file_system.h"
#include "rocksdb/options.h"
#include "util/aligned_buffer.h"
#include "util/autovector.h"
#include "util/stop_watch.h"

namespace ROCKSDB_NAMESPACE {

#define DEFAULT_DECREMENT 8 * 1024

struct IOOptions;
class RandomAccessFileReader;

struct ReadaheadParams {
  ReadaheadParams() {}

  // The initial readahead size.
  size_t initial_readahead_size = 0;

  // The maximum readahead size.
  // If max_readahead_size > readahead_size, then the readahead size will be
  // doubled on every IO until max_readahead_size is hit. Typically this is
  // set as a multiple of initial_readahead_size. max_readahead_size should be
  // greater than or equal to initial_readahead_size. (See the illustrative
  // example after this struct.)
  size_t max_readahead_size = 0;

  // If true, readahead is enabled implicitly by RocksDB
  // after doing sequential scans for num_file_reads_for_auto_readahead.
  bool implicit_auto_readahead = false;

  // TODO akanksha - Remove num_file_reads when BlockPrefetcher is refactored.
  uint64_t num_file_reads = 0;
  uint64_t num_file_reads_for_auto_readahead = 0;

  // Number of buffers to maintain that contain prefetched data. If
  // num_buffers > 1, then buffers will be filled asynchronously whenever they
  // get emptied.
  size_t num_buffers = 1;
};
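
// Illustrative example (not part of the API): with
//   initial_readahead_size = 8 * 1024 and max_readahead_size = 64 * 1024,
// successive prefetches grow the readahead as 8KB -> 16KB -> 32KB -> 64KB,
// after which it stays capped at 64KB.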

struct BufferInfo {
  void ClearBuffer() {
    buffer_.Clear();
    initial_end_offset_ = 0;
    async_req_len_ = 0;
  }

  AlignedBuffer buffer_;

  uint64_t offset_ = 0;

  // The parameters below are used in case of the async read flow.

  // Length requested in ReadAsync.
  size_t async_req_len_ = 0;

  // async_read_in_progress_ can be used as a mutex. The callback can update
  // the buffer and its size, but async_read_in_progress_ is only set by the
  // main thread.
  bool async_read_in_progress_ = false;

  // io_handle_ is allocated and used by the underlying file system in case of
  // asynchronous reads.
  void* io_handle_ = nullptr;

  IOHandleDeleter del_fn_ = nullptr;

  // initial_end_offset_ keeps track of the end offset of the buffer as it was
  // originally requested. It's helpful for autotuning of the readahead size
  // when the callback is made to BlockBasedTableIterator. It is the initial
  // end offset of this buffer, which becomes the starting offset of the next
  // prefetch.
  //
  // For example - if the end offset of the previous buffer was 100 and,
  // because of the readahead_size optimization, end_offset was trimmed to 60,
  // then for the next prefetch call, start_offset should be initialized to
  // 100, i.e. start_offset = buf->initial_end_offset_.
  uint64_t initial_end_offset_ = 0;

  bool IsDataBlockInBuffer(uint64_t offset, size_t length) {
    assert(async_read_in_progress_ == false);
    return (offset >= offset_ &&
            offset + length <= offset_ + buffer_.CurrentSize());
  }

  bool IsOffsetInBuffer(uint64_t offset) {
    assert(async_read_in_progress_ == false);
    return (offset >= offset_ && offset < offset_ + buffer_.CurrentSize());
  }

  bool DoesBufferContainData() {
    assert(async_read_in_progress_ == false);
    return buffer_.CurrentSize() > 0;
  }

  bool IsBufferOutdated(uint64_t offset) {
    return (!async_read_in_progress_ && DoesBufferContainData() &&
            offset >= offset_ + buffer_.CurrentSize());
  }

  bool IsBufferOutdatedWithAsyncProgress(uint64_t offset) {
    return (async_read_in_progress_ && io_handle_ != nullptr &&
            offset >= offset_ + async_req_len_);
  }

  bool IsOffsetInBufferWithAsyncProgress(uint64_t offset) {
    return (async_read_in_progress_ && offset >= offset_ &&
            offset < offset_ + async_req_len_);
  }

  size_t CurrentSize() { return buffer_.CurrentSize(); }
};

enum class FilePrefetchBufferUsage {
  kTableOpenPrefetchTail,
  kUserScanPrefetch,
  kCompactionPrefetch,
  kUnknown,
};

// Implementation:
// FilePrefetchBuffer maintains a deque of free buffers (free_bufs_) with no
// data and bufs_, which contains the prefetched data. Whenever a buffer is
// consumed or is outdated (w.r.t. the requested offset), that buffer is
// cleared and returned to free_bufs_.
//
// If a buffer is available in free_bufs_, it's moved to bufs_ and is sent for
// prefetching. num_buffers_ defines how many buffers FilePrefetchBuffer can
// maintain at a time that contain prefetched data, with num_buffers_ ==
// bufs_.size() + free_bufs_.size().
//
// If num_buffers_ == 1, it's a sequential read flow. The Read API will be
// called on that one buffer whenever the data is requested and is not in the
// buffer. When reusing the file system allocated buffer, overlap_buf_ is used
// if the main buffer only contains part of the requested data. It is returned
// to the caller after the remaining data is fetched.
//
// If num_buffers_ > 1, then the data is prefetched asynchronously into the
// buffers whenever data is consumed from a buffer and that buffer is freed.
// In that case the requested data can overlap between 2 buffers. To return
// one contiguous buffer, overlap_buf_ is used: the requested data is copied
// from the 2 buffers into overlap_buf_, and overlap_buf_ is returned to the
// caller.
//
// FilePrefetchBuffer is a smart buffer to store and read data from a file.
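//
// Illustrative lifecycle (num_buffers_ = 2 and the offsets are chosen for the
// example only): buffer A holds prefetched data for [0, 8KB) and buffer B is
// being filled asynchronously with [8KB, 16KB). When a read at offset 10KB
// arrives, buffer A is outdated, so it is cleared and returned to free_bufs_,
// then reused to prefetch [16KB, 24KB) while the read is served from
// buffer B.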
class FilePrefetchBuffer {
 public:
  // Constructor.
  //
  // All arguments are optional.
  // readahead_params : Parameters to control the readahead behavior.
  // enable           : controls whether reading from the buffer is enabled.
  //                    If false, TryReadFromCache() always returns false, and
  //                    we only take stats for the minimum offset if
  //                    track_min_offset = true.
  //                    See below NOTE about mmap reads.
  // track_min_offset : Track the minimum offset ever read and collect stats
  //                    on it. Used for adaptable readahead of the file
  //                    footer/metadata.
  //
  // A user can construct a FilePrefetchBuffer without any arguments, but use
  // `Prefetch` to load data into the buffer.
  // NOTE: FilePrefetchBuffer is incompatible with prefetching from
  // RandomAccessFileReaders using mmap reads, so it is common to use
  // `!use_mmap_reads` for the `enable` parameter.
  FilePrefetchBuffer(
      const ReadaheadParams& readahead_params = {}, bool enable = true,
      bool track_min_offset = false, FileSystem* fs = nullptr,
      SystemClock* clock = nullptr, Statistics* stats = nullptr,
      const std::function<void(bool, uint64_t&, uint64_t&)>& cb = nullptr,
      FilePrefetchBufferUsage usage = FilePrefetchBufferUsage::kUnknown)
      : readahead_size_(readahead_params.initial_readahead_size),
        initial_auto_readahead_size_(readahead_params.initial_readahead_size),
        max_readahead_size_(readahead_params.max_readahead_size),
        min_offset_read_(std::numeric_limits<size_t>::max()),
        enable_(enable),
        track_min_offset_(track_min_offset),
        implicit_auto_readahead_(readahead_params.implicit_auto_readahead),
        prev_offset_(0),
        prev_len_(0),
        num_file_reads_for_auto_readahead_(
            readahead_params.num_file_reads_for_auto_readahead),
        num_file_reads_(readahead_params.num_file_reads),
        explicit_prefetch_submitted_(false),
        fs_(fs),
        clock_(clock),
        stats_(stats),
        usage_(usage),
        readaheadsize_cb_(cb),
        num_buffers_(readahead_params.num_buffers) {
    assert((num_file_reads_ >= num_file_reads_for_auto_readahead_ + 1) ||
           (num_file_reads_ == 0));

    // overlap_buf_ is used whenever the main buffer only has part of the
    // requested data. The relevant data is copied into overlap_buf_ and the
    // remaining data is copied in later to satisfy the user's request. This
    // is used in both the synchronous (num_buffers_ = 1) and asynchronous
    // (num_buffers_ > 1) cases. In the asynchronous case, the requested data
    // may be spread out over 2 buffers.
    if (num_buffers_ > 1 ||
        (fs_ != nullptr &&
         CheckFSFeatureSupport(fs_, FSSupportedOps::kFSBuffer))) {
      overlap_buf_ = new BufferInfo();
    }

    free_bufs_.resize(num_buffers_);
    for (uint32_t i = 0; i < num_buffers_; i++) {
      free_bufs_[i] = new BufferInfo();
    }
  }

  ~FilePrefetchBuffer() {
    // Abort any pending async read request before destroying the class
    // object.
    if (fs_ != nullptr) {
      std::vector<void*> handles;
      for (auto& buf : bufs_) {
        if (buf->async_read_in_progress_ && buf->io_handle_ != nullptr) {
          handles.emplace_back(buf->io_handle_);
        }
      }
      if (!handles.empty()) {
        StopWatch sw(clock_, stats_, ASYNC_PREFETCH_ABORT_MICROS);
        Status s = fs_->AbortIO(handles);
        assert(s.ok());
      }

      for (auto& buf : bufs_) {
        if (buf->io_handle_ != nullptr) {
          DestroyAndClearIOHandle(buf);
          buf->ClearBuffer();
        }
        buf->async_read_in_progress_ = false;
      }
    }

    // Prefetch buffer bytes discarded.
    uint64_t bytes_discarded = 0;
    // Iterate over the buffers.
    for (auto& buf : bufs_) {
      if (buf->DoesBufferContainData()) {
        // If the last read was from this block and some bytes are still
        // unconsumed.
        if (prev_offset_ >= buf->offset_ &&
            prev_offset_ + prev_len_ < buf->offset_ + buf->CurrentSize()) {
          bytes_discarded +=
              buf->CurrentSize() - (prev_offset_ + prev_len_ - buf->offset_);
        }
        // If the last read was from previous blocks and this block is
        // unconsumed.
        else if (prev_offset_ < buf->offset_ &&
                 prev_offset_ + prev_len_ <= buf->offset_) {
          bytes_discarded += buf->CurrentSize();
        }
      }
    }
    RecordInHistogram(stats_, PREFETCHED_BYTES_DISCARDED, bytes_discarded);

    for (auto& buf : bufs_) {
      delete buf;
      buf = nullptr;
    }
    for (auto& buf : free_bufs_) {
      delete buf;
      buf = nullptr;
    }
    if (overlap_buf_ != nullptr) {
      delete overlap_buf_;
      overlap_buf_ = nullptr;
    }
  }

  bool Enabled() const { return enable_; }

  // Called externally by the user to load data into the buffer from a file.
  // num_buffers_ should be left at its default (1).
  //
  // opts   : the IO options to use.
  // reader : the file reader.
  // offset : the file offset to start reading from.
  // n      : the number of bytes to read.
  //
  // Note: Why do we pass in the RandomAccessFileReader* for every single call
  // to Prefetch/PrefetchAsync/TryReadFromCache? Why can't we just pass it in
  // at construction time?
  // Although the RandomAccessFileReader* is often available when creating
  // the FilePrefetchBuffer, this is not true for BlobDB (see
  // BlobSource::GetBlob). The file reader gets retrieved or created inside
  // BlobFileCache::GetBlobFileReader, after we have already allocated a new
  // FilePrefetchBuffer.
  Status Prefetch(const IOOptions& opts, RandomAccessFileReader* reader,
                  uint64_t offset, size_t n);

  // Request reading the data from a file asynchronously.
  // If the data already exists in the buffer, result will be updated.
  // reader : the file reader.
  // offset : the file offset to start reading from.
  // n      : the number of bytes to read.
  // result : if the data already exists in the buffer, result will
  //          be updated with the data.
  //
  // If the data already exists in the buffer, it will return Status::OK,
  // otherwise it will send an asynchronous request and return
  // Status::TryAgain.
  Status PrefetchAsync(const IOOptions& opts, RandomAccessFileReader* reader,
                       uint64_t offset, size_t n, Slice* result);

  // Tries returning the data for a file read from this buffer if that data is
  // in the buffer.
  // It handles tracking the minimum read offset if track_min_offset = true.
  // It also does the exponential readahead when readahead_size is set as part
  // of the constructor.
  //
  // opts           : the IO options to use.
  // reader         : the file reader.
  // offset         : the file offset.
  // n              : the number of bytes.
  // result         : output buffer to put the data into.
  // s              : output status.
  // for_compaction : true if the cache read is done for a compaction read.
  bool TryReadFromCache(const IOOptions& opts, RandomAccessFileReader* reader,
                        uint64_t offset, size_t n, Slice* result, Status* s,
                        bool for_compaction = false);
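
  // Illustrative usage sketch (the `io_opts`, `reader`, `use_mmap_reads`,
  // `offset`, and `n` names below are placeholders, not part of this header):
  //
  //   FilePrefetchBuffer fpb(ReadaheadParams{}, /*enable=*/!use_mmap_reads);
  //   Slice result;
  //   Status s;
  //   if (!fpb.TryReadFromCache(io_opts, reader, offset, n, &result, &s)) {
  //     // Data was not in the buffer; fall back to a direct file read, or
  //     // check `s` for an error.
  //   }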

  // The minimum `offset` ever passed to TryReadFromCache(). This will only be
  // tracked if track_min_offset = true.
  size_t min_offset_read() const { return min_offset_read_; }

  size_t GetPrefetchOffset() const { return bufs_.front()->offset_; }

  // Called in case of implicit auto prefetching.
  void UpdateReadPattern(const uint64_t& offset, const size_t& len,
                         bool decrease_readaheadsize) {
    if (decrease_readaheadsize) {
      DecreaseReadAheadIfEligible(offset, len);
    }
    prev_offset_ = offset;
    prev_len_ = len;
    explicit_prefetch_submitted_ = false;
  }

  void GetReadaheadState(ReadaheadFileInfo::ReadaheadInfo* readahead_info) {
    readahead_info->readahead_size = readahead_size_;
    readahead_info->num_file_reads = num_file_reads_;
  }

  void DecreaseReadAheadIfEligible(uint64_t offset, size_t size,
                                   size_t value = DEFAULT_DECREMENT) {
    if (bufs_.empty()) {
      return;
    }

    // Decrease the readahead_size if
    // - it's enabled internally by RocksDB (implicit_auto_readahead_) and,
    // - readahead_size is greater than 0 and,
    // - this block would have called the prefetch API if not found in the
    //   cache, for which the conditions are:
    //   - few/no bytes are in the buffer and,
    //   - the block is sequential with the previous read and,
    //   - num_file_reads_ + 1 (including this read) >
    //     num_file_reads_for_auto_readahead_
    size_t curr_size = bufs_.front()->async_read_in_progress_
                           ? bufs_.front()->async_req_len_
                           : bufs_.front()->CurrentSize();
    if (implicit_auto_readahead_ && readahead_size_ > 0) {
      if ((offset + size > bufs_.front()->offset_ + curr_size) &&
          IsBlockSequential(offset) &&
          (num_file_reads_ + 1 > num_file_reads_for_auto_readahead_)) {
        readahead_size_ =
            std::max(initial_auto_readahead_size_,
                     (readahead_size_ >= value ? readahead_size_ - value : 0));
      }
    }
  }

  // Callback function passed to the underlying FS in case of asynchronous
  // reads.
  void PrefetchAsyncCallback(FSReadRequest& req, void* cb_arg);

  void TEST_GetBufferOffsetandSize(
      std::vector<std::tuple<uint64_t, size_t, bool>>& buffer_info) {
    for (size_t i = 0; i < bufs_.size(); i++) {
      std::get<0>(buffer_info[i]) = bufs_[i]->offset_;
      std::get<1>(buffer_info[i]) = bufs_[i]->async_read_in_progress_
                                        ? bufs_[i]->async_req_len_
                                        : bufs_[i]->CurrentSize();
      std::get<2>(buffer_info[i]) = bufs_[i]->async_read_in_progress_;
    }
  }

  void TEST_GetOverlapBufferOffsetandSize(
      std::pair<uint64_t, size_t>& buffer_info) {
    if (overlap_buf_ != nullptr) {
      buffer_info.first = overlap_buf_->offset_;
      buffer_info.second = overlap_buf_->CurrentSize();
    }
  }

 private:
  // Calculates the rounded-off offset and length to be prefetched based on
  // alignment and the data present in buffer_. It also allocates a new buffer
  // or refits the tail if required.
  void PrepareBufferForRead(BufferInfo* buf, size_t alignment, uint64_t offset,
                            size_t roundup_len, bool refit_tail,
                            bool use_fs_buffer, uint64_t& aligned_useful_len);

  void AbortOutdatedIO(uint64_t offset);

  void AbortAllIOs();

  void ClearOutdatedData(uint64_t offset, size_t len);

  // Calls the Poll API to check for any pending asynchronous request.
  void PollIfNeeded(uint64_t offset, size_t len);

  Status PrefetchInternal(const IOOptions& opts,
                          RandomAccessFileReader* reader, uint64_t offset,
                          size_t length, size_t readahead_size,
                          bool& copy_to_third_buffer);

  Status Read(BufferInfo* buf, const IOOptions& opts,
              RandomAccessFileReader* reader, uint64_t read_len,
              uint64_t aligned_useful_len, uint64_t start_offset,
              bool use_fs_buffer);

  Status ReadAsync(BufferInfo* buf, const IOOptions& opts,
                   RandomAccessFileReader* reader, uint64_t read_len,
                   uint64_t start_offset);

  // Copy the data from src to overlap_buf_.
  void CopyDataToOverlapBuffer(BufferInfo* src, uint64_t& offset,
                               size_t& length);

  bool IsBlockSequential(const size_t& offset) {
    return (prev_len_ == 0 || (prev_offset_ + prev_len_ == offset));
  }

  // Called in case of implicit auto prefetching.
  void ResetValues() {
    num_file_reads_ = 1;
    readahead_size_ = initial_auto_readahead_size_;
  }

  // Called in case of implicit auto prefetching.
  bool IsEligibleForPrefetch(uint64_t offset, size_t n) {
    // Prefetch only if this read is sequential; otherwise reset
    // readahead_size_ to its initial value.
    if (!IsBlockSequential(offset)) {
      UpdateReadPattern(offset, n, false /*decrease_readaheadsize*/);
      ResetValues();
      return false;
    }
    num_file_reads_++;

    // Since an async request was submitted directly by calling PrefetchAsync
    // in the last call, skip the num_file_reads_ check, as this call is to
    // poll the data submitted in the previous call.
    if (explicit_prefetch_submitted_) {
      return true;
    }
    if (num_file_reads_ <= num_file_reads_for_auto_readahead_) {
      UpdateReadPattern(offset, n, false /*decrease_readaheadsize*/);
      return false;
    }
    return true;
  }

  bool IsEligibleForFurtherPrefetching() {
    if (free_bufs_.empty()) {
      return false;
    }
    // Readahead size can be 0 because of trimming.
    if (readahead_size_ == 0) {
      return false;
    }
    return true;
  }

  // Whether we reuse the file system provided buffer.
  // Until we also handle the async read case, only enable this optimization
  // for the synchronous case when num_buffers_ = 1.
  // Note: Although it would be more convenient if we could determine
  // whether we want to reuse the file system buffer at construction time,
  // this would not work in all cases, because not all clients (BlobDB in
  // particular) have a RandomAccessFileReader* available at construction
  // time.
  bool UseFSBuffer(RandomAccessFileReader* reader) {
    return reader->file() != nullptr && !reader->use_direct_io() &&
           fs_ != nullptr &&
           CheckFSFeatureSupport(fs_, FSSupportedOps::kFSBuffer) &&
           num_buffers_ == 1;
  }

  // When we are reusing the file system provided buffer, we are not concerned
  // with alignment. However, quite a bit of prefetch code incorporates
  // alignment, so we can return 1 to keep the code simpler.
  size_t GetRequiredBufferAlignment(RandomAccessFileReader* reader) {
    if (UseFSBuffer(reader)) {
      return 1;
    }
    return reader->file()->GetRequiredBufferAlignment();
  }

  // Reuses the file system allocated buffer to avoid an extra copy.
  IOStatus FSBufferDirectRead(RandomAccessFileReader* reader, BufferInfo* buf,
                              const IOOptions& opts, uint64_t offset, size_t n,
                              Slice& result) {
    FSReadRequest read_req;
    read_req.offset = offset;
    read_req.len = n;
    read_req.scratch = nullptr;
    IOStatus s = reader->MultiRead(opts, &read_req, 1, nullptr);
    if (!s.ok()) {
      return s;
    }
    s = read_req.status;
    if (!s.ok()) {
      return s;
    }
    buf->buffer_.SetBuffer(read_req.result, std::move(read_req.fs_scratch));
    buf->offset_ = offset;
    buf->initial_end_offset_ = offset + read_req.result.size();
    result = read_req.result;
    return s;
  }

  void DestroyAndClearIOHandle(BufferInfo* buf) {
    if (buf->io_handle_ != nullptr && buf->del_fn_ != nullptr) {
      buf->del_fn_(buf->io_handle_);
      buf->io_handle_ = nullptr;
      buf->del_fn_ = nullptr;
    }
    buf->async_read_in_progress_ = false;
  }

  void HandleOverlappingSyncData(uint64_t offset, size_t length,
                                 uint64_t& tmp_offset, size_t& tmp_length,
                                 bool& use_overlap_buffer);

  Status HandleOverlappingAsyncData(const IOOptions& opts,
                                    RandomAccessFileReader* reader,
                                    uint64_t offset, size_t length,
                                    size_t readahead_size,
                                    bool& copy_to_third_buffer,
                                    uint64_t& tmp_offset, size_t& tmp_length);

  bool TryReadFromCacheUntracked(const IOOptions& opts,
                                 RandomAccessFileReader* reader,
                                 uint64_t offset, size_t n, Slice* result,
                                 Status* s, bool for_compaction = false);

  void ReadAheadSizeTuning(BufferInfo* buf, bool read_curr_block,
                           bool refit_tail, bool use_fs_buffer,
                           uint64_t prev_buf_end_offset, size_t alignment,
                           size_t length, size_t readahead_size,
                           uint64_t& offset, uint64_t& end_offset,
                           size_t& read_len, uint64_t& aligned_useful_len);

  void UpdateStats(bool found_in_buffer, size_t length_found) {
    if (usage_ != FilePrefetchBufferUsage::kUserScanPrefetch) {
      return;
    }
    if (found_in_buffer) {
      RecordTick(stats_, PREFETCH_HITS);
    }
    if (length_found > 0) {
      RecordTick(stats_, PREFETCH_BYTES_USEFUL, length_found);
    }
  }

  void UpdateReadAheadTrimmedStat(size_t initial_length,
                                  size_t updated_length) {
    if (initial_length != updated_length) {
      RecordTick(stats_, READAHEAD_TRIMMED);
    }
  }

  Status PrefetchRemBuffers(const IOOptions& opts,
                            RandomAccessFileReader* reader,
                            uint64_t end_offset1, size_t alignment,
                            size_t readahead_size);

  // *** BEGIN APIs related to allocating and freeing buffers ***
  bool IsBufferQueueEmpty() { return bufs_.empty(); }

  BufferInfo* GetFirstBuffer() { return bufs_.front(); }

  BufferInfo* GetLastBuffer() { return bufs_.back(); }

  size_t NumBuffersAllocated() { return bufs_.size(); }

  void AllocateBuffer() {
    assert(!free_bufs_.empty());
    BufferInfo* buf = free_bufs_.front();
    free_bufs_.pop_front();
    bufs_.emplace_back(buf);
  }

  void AllocateBufferIfEmpty() {
    if (bufs_.empty()) {
      AllocateBuffer();
    }
  }

  void FreeFrontBuffer() {
    BufferInfo* buf = bufs_.front();
    buf->ClearBuffer();
    bufs_.pop_front();
    free_bufs_.emplace_back(buf);
  }

  void FreeLastBuffer() {
    BufferInfo* buf = bufs_.back();
    buf->ClearBuffer();
    bufs_.pop_back();
    free_bufs_.emplace_back(buf);
  }

  void FreeAllBuffers() {
    while (!bufs_.empty()) {
      BufferInfo* buf = bufs_.front();
      buf->ClearBuffer();
      bufs_.pop_front();
      free_bufs_.emplace_back(buf);
    }
  }

  void FreeEmptyBuffers() {
    if (bufs_.empty()) {
      return;
    }
    std::deque<BufferInfo*> tmp_buf;
    while (!bufs_.empty()) {
      BufferInfo* buf = bufs_.front();
      bufs_.pop_front();
      if (buf->async_read_in_progress_ || buf->DoesBufferContainData()) {
        tmp_buf.emplace_back(buf);
      } else {
        free_bufs_.emplace_back(buf);
      }
    }
    bufs_ = tmp_buf;
  }

  // *** END APIs related to allocating and freeing buffers ***

  std::deque<BufferInfo*> bufs_;
  std::deque<BufferInfo*> free_bufs_;
  BufferInfo* overlap_buf_ = nullptr;

  size_t readahead_size_;
  size_t initial_auto_readahead_size_;
  // The FilePrefetchBuffer object won't be created from the Iterator flow if
  // max_readahead_size_ = 0.
  size_t max_readahead_size_;

  // The minimum `offset` ever passed to TryReadFromCache().
  size_t min_offset_read_;
  // If false, TryReadFromCache() always returns false, and we only take stats
  // for track_min_offset_ if track_min_offset_ = true.
  bool enable_;
  // If true, track the minimum `offset` ever passed to TryReadFromCache(),
  // which can be fetched from min_offset_read().
  bool track_min_offset_;

  // implicit_auto_readahead is enabled by RocksDB internally after 2
  // sequential IOs.
  bool implicit_auto_readahead_;
  uint64_t prev_offset_;
  size_t prev_len_;
  // num_file_reads_ and num_file_reads_for_auto_readahead_ are only used when
  // implicit_auto_readahead_ is set.
  uint64_t num_file_reads_for_auto_readahead_;
  uint64_t num_file_reads_;

  // If explicit_prefetch_submitted_ is set, it indicates that RocksDB called
  // PrefetchAsync to submit a request. It needs to call TryReadFromCache to
  // poll the submitted request without checking whether the data is
  // sequential and without checking num_file_reads_.
  bool explicit_prefetch_submitted_;

  FileSystem* fs_;
  SystemClock* clock_;
  Statistics* stats_;

  FilePrefetchBufferUsage usage_;

  std::function<void(bool, uint64_t&, uint64_t&)> readaheadsize_cb_;

  // num_buffers_ is the number of buffers maintained by FilePrefetchBuffer to
  // prefetch data at a time.
  size_t num_buffers_;
};
}  // namespace ROCKSDB_NAMESPACE