// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once

#include <deque>

#include "db/seqno_to_time_mapping.h"
#include "table/block_based/block_based_table_reader.h"
#include "table/block_based/block_based_table_reader_impl.h"
#include "table/block_based/block_prefetcher.h"
#include "table/block_based/reader_common.h"

namespace ROCKSDB_NAMESPACE {

// Iterates over the contents of BlockBasedTable.
class BlockBasedTableIterator : public InternalIteratorBase<Slice> {
  // compaction_readahead_size: its value will only be used if the iterator is
  // created for compaction (caller == TableReaderCaller::kCompaction).
  // @param read_options Must outlive this iterator.
 public:
  BlockBasedTableIterator(
      const BlockBasedTable* table, const ReadOptions& read_options,
      const InternalKeyComparator& icomp,
      std::unique_ptr<InternalIteratorBase<IndexValue>>&& index_iter,
      bool check_filter, bool need_upper_bound_check,
      const SliceTransform* prefix_extractor, TableReaderCaller caller,
      size_t compaction_readahead_size = 0, bool allow_unprepared_value = false)
      : index_iter_(std::move(index_iter)),
        table_(table),
        read_options_(read_options),
        icomp_(icomp),
        user_comparator_(icomp.user_comparator()),
        pinned_iters_mgr_(nullptr),
        prefix_extractor_(prefix_extractor),
        lookup_context_(caller),
        block_prefetcher_(
            compaction_readahead_size,
            table_->get_rep()->table_options.initial_auto_readahead_size),
        allow_unprepared_value_(allow_unprepared_value),
        check_filter_(check_filter),
        need_upper_bound_check_(need_upper_bound_check),
        async_read_in_progress_(false),
        is_last_level_(table->IsLastLevel()),
        block_iter_points_to_real_block_(false) {
    multi_scan_status_.PermitUncheckedError();
  }
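
  // Illustrative construction/usage sketch (not part of this header; in
  // RocksDB this iterator is normally created by BlockBasedTable itself).
  // `table`, `ropts`, `icomp`, `prefix_extractor` and the index iterator
  // call are hypothetical placeholders:
  //
  //   std::unique_ptr<InternalIteratorBase<IndexValue>> index_iter(
  //       table->NewIndexIterator(/*...*/));   // hypothetical arguments
  //   BlockBasedTableIterator iter(
  //       table, ropts, icomp, std::move(index_iter),
  //       /*check_filter=*/true, /*need_upper_bound_check=*/false,
  //       prefix_extractor, TableReaderCaller::kUserIterator,
  //       /*compaction_readahead_size=*/0, /*allow_unprepared_value=*/true);
  //   for (iter.SeekToFirst(); iter.Valid(); iter.Next()) {
  //     // iter.key() is usable here; see PrepareValue() below before
  //     // calling iter.value().
  //   }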
  ~BlockBasedTableIterator() override { ClearBlockHandles(); }

  void Seek(const Slice& target) override;
  void SeekForPrev(const Slice& target) override;
  void SeekToFirst() override;
  void SeekToLast() override;
  void Next() final override;
  bool NextAndGetResult(IterateResult* result) override;
  void Prev() override;

  bool Valid() const override {
    return !is_out_of_bound_ && multi_scan_status_.ok() &&
           (is_at_first_key_from_index_ ||
            (block_iter_points_to_real_block_ && block_iter_.Valid()));
  }

  // For the block cache readahead lookup scenario -
  // If is_at_first_key_from_index_ is true, InitDataBlock hasn't been
  // called. It means block_handles_ is empty and index_iter_ points to the
  // current block. So index_iter_ can be accessed directly.
  Slice key() const override {
    assert(Valid());
    if (is_at_first_key_from_index_) {
      assert(!multi_scan_);
      return index_iter_->value().first_internal_key;
    } else {
      return block_iter_.key();
    }
  }

  Slice user_key() const override {
    assert(Valid());
    if (is_at_first_key_from_index_) {
      return ExtractUserKey(index_iter_->value().first_internal_key);
    } else {
      return block_iter_.user_key();
    }
  }

  bool PrepareValue() override {
    assert(Valid());
    if (!is_at_first_key_from_index_) {
      return true;
    }
    return const_cast<BlockBasedTableIterator*>(this)
        ->MaterializeCurrentBlock();
  }
  uint64_t write_unix_time() const override {
    assert(Valid());
    ParsedInternalKey pikey;
    SequenceNumber seqno;
    const SeqnoToTimeMapping& seqno_to_time_mapping =
        table_->GetSeqnoToTimeMapping();
    Status s = ParseInternalKey(key(), &pikey, /*log_err_key=*/false);
    if (!s.ok()) {
      return std::numeric_limits<uint64_t>::max();
    } else if (kUnknownSeqnoBeforeAll == pikey.sequence) {
      return kUnknownTimeBeforeAll;
    } else if (seqno_to_time_mapping.Empty()) {
      return std::numeric_limits<uint64_t>::max();
    } else if (kTypeValuePreferredSeqno == pikey.type) {
      seqno = ParsePackedValueForSeqno(value());
    } else {
      seqno = pikey.sequence;
    }
    return seqno_to_time_mapping.GetProximalTimeBeforeSeqno(seqno);
  }

  Slice value() const override {
    // PrepareValue() must have been called.
    assert(!is_at_first_key_from_index_);
    assert(Valid());

    if (seek_stat_state_ & kReportOnUseful) {
      bool filter_used = (seek_stat_state_ & kFilterUsed) != 0;
      RecordTick(
          table_->GetStatistics(),
          filter_used
              ? (is_last_level_ ? LAST_LEVEL_SEEK_DATA_USEFUL_FILTER_MATCH
                                : NON_LAST_LEVEL_SEEK_DATA_USEFUL_FILTER_MATCH)
              : (is_last_level_ ? LAST_LEVEL_SEEK_DATA_USEFUL_NO_FILTER
                                : NON_LAST_LEVEL_SEEK_DATA_USEFUL_NO_FILTER));
      seek_stat_state_ = kDataBlockReadSinceLastSeek;
    }

    return block_iter_.value();
  }

  Status status() const override {
    if (!multi_scan_status_.ok()) {
      return multi_scan_status_;
    }
    // In case of block cache readahead lookup, a block won't be added to
    // block_handles_ if its index is invalid, so the index_iter_->status()
    // check can be skipped.
    // The prefix index sets the status to NotFound when the prefix does not
    // exist.
    if (IsIndexAtCurr() && !index_iter_->status().ok() &&
        !index_iter_->status().IsNotFound()) {
      assert(!multi_scan_);
      return index_iter_->status();
    } else if (block_iter_points_to_real_block_) {
      // This is the common case.
      return block_iter_.status();
    } else if (async_read_in_progress_) {
      assert(!multi_scan_);
      return Status::TryAgain("Async read in progress");
    } else if (multi_scan_) {
      return multi_scan_status_;
    } else {
      return Status::OK();
    }
  }

  inline IterBoundCheck UpperBoundCheckResult() override {
    if (is_out_of_bound_) {
      return IterBoundCheck::kOutOfBound;
    } else if (block_upper_bound_check_ ==
               BlockUpperBound::kUpperBoundBeyondCurBlock) {
      assert(!is_out_of_bound_);
      // MultiScan does not do block level upper bound check yet.
      assert(!multi_scan_);
      return IterBoundCheck::kInbound;
    } else {
      return IterBoundCheck::kUnknown;
    }
  }

  void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override {
    pinned_iters_mgr_ = pinned_iters_mgr;
  }

  bool IsKeyPinned() const override {
    // Our key comes either from block_iter_'s current key
    // or index_iter_'s current *value*.
    return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
           ((is_at_first_key_from_index_ && index_iter_->IsValuePinned()) ||
            (block_iter_points_to_real_block_ && block_iter_.IsKeyPinned()));
  }

  bool IsValuePinned() const override {
    assert(!is_at_first_key_from_index_);
    assert(Valid());
    // BlockIter::IsValuePinned() is always true. No need to check it.
    return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
           block_iter_points_to_real_block_;
  }

  void ResetDataIter() {
    if (block_iter_points_to_real_block_) {
      if (pinned_iters_mgr_ != nullptr && pinned_iters_mgr_->PinningEnabled()) {
        block_iter_.DelegateCleanupsTo(pinned_iters_mgr_);
      }
      block_iter_.Invalidate(Status::OK());
      block_iter_points_to_real_block_ = false;
    }
    block_upper_bound_check_ = BlockUpperBound::kUnknown;
  }

  void SavePrevIndexValue() {
    if (block_iter_points_to_real_block_ && IsIndexAtCurr()) {
      // Reseek. If we end up on the same data block, we shouldn't re-fetch it.
      prev_block_offset_ = index_iter_->value().handle.offset();
    }
  }

  void GetReadaheadState(ReadaheadFileInfo* readahead_file_info) override {
    if (block_prefetcher_.prefetch_buffer() != nullptr &&
        read_options_.adaptive_readahead) {
      block_prefetcher_.prefetch_buffer()->GetReadaheadState(
          &(readahead_file_info->data_block_readahead_info));
      if (index_iter_) {
        index_iter_->GetReadaheadState(readahead_file_info);
      }
    }
  }

  void SetReadaheadState(ReadaheadFileInfo* readahead_file_info) override {
    if (read_options_.adaptive_readahead) {
      block_prefetcher_.SetReadaheadState(
          &(readahead_file_info->data_block_readahead_info));
      if (index_iter_) {
        index_iter_->SetReadaheadState(readahead_file_info);
      }
    }
  }

  void Prepare(const MultiScanArgs* scan_opts) override;
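
  // Rough multi-scan usage sketch (illustrative, not from this file). It
  // assumes `scan_args` is a MultiScanArgs describing sorted scan ranges and
  // `ranges` is a matching list of (start, limit) pairs maintained by the
  // caller. Prepare() pins/prefetches the data blocks covered by the ranges;
  // subsequent seeks must move forward through them:
  //
  //   iter.Prepare(&scan_args);
  //   for (const auto& range : ranges) {
  //     for (iter.Seek(range.start); iter.Valid(); iter.Next()) {
  //       // Caller stops when the key reaches range.limit.
  //     }
  //     if (!iter.status().ok()) {
  //       break;  // includes multi_scan_status_ errors
  //     }
  //   }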
  FilePrefetchBuffer* prefetch_buffer() {
    return block_prefetcher_.prefetch_buffer();
  }

  std::unique_ptr<InternalIteratorBase<IndexValue>> index_iter_;

  bool TEST_IsBlockPinnedByMultiScan(size_t block_idx) {
    if (!multi_scan_) {
      return false;
    }
    if (block_idx >= multi_scan_->pinned_data_blocks.size()) {
      return false;
    }
    return !multi_scan_->pinned_data_blocks[block_idx].IsEmpty();
  }

 private:
  enum class IterDirection {
    kForward,
    kBackward,
  };

  // This enum indicates whether the upper bound falls into the current block
  // or beyond.
  //   +-------------+
  //   |  cur block  | <-- (1)
  //   +-------------+
  //                   <-- (2)
  //  --- <boundary key> ---
  //                   <-- (3)
  //   +-------------+
  //   | next block  | <-- (4)
  //        ......
  //
  // When the upper bound is smaller than <boundary key> (cases (1) and (2)
  // in the graph), kUpperBoundInCurBlock is the value to use. It means all
  // keys in the next block or beyond will be out of bound. Keys within the
  // current block may or may not be out of bound.
  // When the upper bound is larger than or equal to <boundary key> (cases (3)
  // and (4) in the graph), kUpperBoundBeyondCurBlock is to be used. It means
  // that all keys in the current block are within the upper bound, and keys
  // in the next block may or may not be within the upper bound.
  // If the boundary key hasn't been checked against the upper bound,
  // kUnknown can be used.
  enum class BlockUpperBound : uint8_t {
    kUpperBoundInCurBlock,
    kUpperBoundBeyondCurBlock,
    kUnknown,
  };
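
  // For example (illustrative keys): with iterate_upper_bound = "k200", a
  // current block spanning ["k100", "k150"] and boundary key "k180", the
  // upper bound lies beyond the boundary key, so the state is
  // kUpperBoundBeyondCurBlock and every key in the current block is in
  // bound. With iterate_upper_bound = "k120", the bound falls before the
  // boundary key (kUpperBoundInCurBlock), so the next block and everything
  // after it can be skipped.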
  // State bits for collecting stats on seeks and whether they returned useful
  // results.
  enum SeekStatState : uint8_t {
    kNone = 0,
    // Most recent seek checked prefix filter (or similar future feature)
    kFilterUsed = 1 << 0,
    // Already recorded that a data block was accessed since the last seek.
    kDataBlockReadSinceLastSeek = 1 << 1,
    // Have not yet recorded that a value() was accessed.
    kReportOnUseful = 1 << 2,
  };
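
  // Illustrative bit flow (a sketch based on how value() above consumes these
  // bits; the actual transitions during Seek live in the .cc file): a seek
  // that consulted the prefix filter may leave
  //   seek_stat_state_ = kFilterUsed | kReportOnUseful;
  // and the first value() call after that seek records a
  // *_SEEK_DATA_USEFUL_FILTER_MATCH tick and resets the state to
  //   seek_stat_state_ = kDataBlockReadSinceLastSeek;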
  // BlockHandleInfo is used to store the info needed when block cache lookup
  // ahead is enabled to tune readahead_size.
  struct BlockHandleInfo {
    void SetFirstInternalKey(const Slice& key) {
      if (key.empty()) {
        return;
      }
      size_t size = key.size();
      buf_ = std::unique_ptr<char[]>(new char[size]);
      memcpy(buf_.get(), key.data(), size);
      first_internal_key_ = Slice(buf_.get(), size);
    }

    BlockHandle handle_;
    bool is_cache_hit_ = false;
    CachableEntry<Block> cachable_entry_;
    Slice first_internal_key_;
    std::unique_ptr<char[]> buf_;
  };

  bool IsIndexAtCurr() const { return is_index_at_curr_block_; }

  const BlockBasedTable* table_;
  const ReadOptions& read_options_;
  const InternalKeyComparator& icomp_;
  UserComparatorWrapper user_comparator_;
  PinnedIteratorsManager* pinned_iters_mgr_;
  DataBlockIter block_iter_;
  const SliceTransform* prefix_extractor_;
  uint64_t prev_block_offset_ = std::numeric_limits<uint64_t>::max();
  BlockCacheLookupContext lookup_context_;

  BlockPrefetcher block_prefetcher_;

  // It stores all the block handles that are looked up in the cache ahead of
  // time when BlockCacheLookupForReadAheadSize is called. Since index_iter_
  // may point to different blocks while readahead_size is calculated in
  // BlockCacheLookupForReadAheadSize, block_handles_ is used to avoid an
  // index_iter_ reseek.
  // `block_handles_` is lazily constructed to save CPU when it is unused.
  std::unique_ptr<std::deque<BlockHandleInfo>> block_handles_;

  // The prefix of the key passed to SeekImpl().
  // This is used for readahead trimming so that no data blocks containing
  // keys of a different prefix are prefetched.
  std::string seek_key_prefix_for_readahead_trimming_ = "";

  const bool allow_unprepared_value_;
  // How the current data block's boundary key (with the next block) compares
  // with the iterate upper bound.
  BlockUpperBound block_upper_bound_check_ = BlockUpperBound::kUnknown;
  // True if we're standing at the first key of a block, and we haven't loaded
  // that block yet. A call to PrepareValue() will trigger loading the block.
  bool is_at_first_key_from_index_ = false;
  bool check_filter_;
  // TODO(Zhongyi): pick a better name
  bool need_upper_bound_check_;
  bool async_read_in_progress_;
  mutable SeekStatState seek_stat_state_ = SeekStatState::kNone;
  bool is_last_level_;
  // If set to true, the iterator will look up blocks in the cache ahead of
  // time to estimate the readahead size based on cache hits and misses.
  bool readahead_cache_lookup_ = false;
  bool is_index_out_of_bound_ = false;

  // Used in case of auto_readahead_size to disable the block cache lookup if
  // the direction is reversed from forward to backward. In the backward
  // direction, SeekForPrev or Prev might call Seek from db_iter, so the
  // direction is used to disable the lookup.
  IterDirection direction_ = IterDirection::kForward;

  // *** BEGIN States used by both regular scan and multiscan ***

  // True if block_iter_ is initialized and points to the same block
  // as the index iterator.
  bool block_iter_points_to_real_block_;
  // See InternalIteratorBase::IsOutOfBound().
  bool is_out_of_bound_ = false;

  // Mark the current prepared range as exhausted for multiscan.
  void MarkPreparedRangeExhausted() {
    assert(multi_scan_ != nullptr);
    if (multi_scan_->next_scan_idx <
        multi_scan_->block_index_ranges_per_scan.size()) {
      // If there are more prepared ranges, we don't ResetDataIter() here,
      // because the next scan might be reading from the same block.
      // ResetDataIter() would free the underlying block cache handle and we
      // don't want the block to be unpinned.
      // Set out of bound to mark the current prepared range as exhausted.
      is_out_of_bound_ = true;
    } else {
      // This is the last prepared range of this file; there might be more
      // data in the next file. Reset the data iterator to indicate that the
      // iterator is no longer valid on this file. Let LevelIter advance to
      // the next file instead of ending the scan.
      ResetDataIter();
    }
  }

  // During cache lookup to find the readahead size, index_iter_ is iterated
  // and can point to a different block.
  // If Prepare() is called, index_iter_ is used to prefetch data blocks for
  // the multiscan, so is_index_at_curr_block_ will be false.
  //
  // Whether the index is expected to match the current data_block_iter_.
  bool is_index_at_curr_block_ = true;

  // *** END States used by both regular scan and multiscan ***

  // *** BEGIN MultiScan related states ***
  struct AsyncReadState {
    std::unique_ptr<char[]> buf{nullptr};
    // Indices into pinned_data_blocks that this request reads.
    std::vector<size_t> block_indices;
    // BlockHandle for each block in block_indices.
    std::vector<BlockHandle> blocks;
    void* io_handle{nullptr};
    IOHandleDeleter del_fn{nullptr};
    // Offset for this async read request.
    uint64_t offset{0};
    // These two states are populated from the FSReadRequest
    // by the ReadAsync callback.
    Status status;
    Slice result;
    // For direct I/O support
    AlignedBuf aligned_buf{nullptr};

    bool finished{false};

    AsyncReadState() = default;
    DECLARE_DEFAULT_MOVES(AsyncReadState);
    // Delete copy operations
    AsyncReadState(const AsyncReadState&) = delete;
    AsyncReadState& operator=(const AsyncReadState&) = delete;

    void CleanUpIOHandle() {
      if (io_handle != nullptr) {
        assert(del_fn);
        del_fn(io_handle);
        io_handle = nullptr;
      }
      finished = true;
    }

    ~AsyncReadState() {
      // Should be cleaned up before destruction.
      assert(io_handle == nullptr);
    }
  };

  struct MultiScanState {
    // For aborting async I/Os in the destructor.
    const std::shared_ptr<FileSystem> fs;
    const MultiScanArgs* scan_opts;
    std::vector<CachableEntry<Block>> pinned_data_blocks;
    // The separator of each data block in the pinned_data_blocks vector
    // above. Its size is the same as pinned_data_blocks. Each separator is
    // greater than or equal to the last key in the corresponding data block.
    std::vector<std::string> data_block_separators;
    // Tracks the previously seeked key in multi-scan.
    // This is used to ensure that the seek key keeps moving forward, as
    // blocks that are smaller than the seek key are unpinned from memory.
    std::string prev_seek_key_;
    // Indices into pinned_data_blocks for the data blocks of each scan range:
    // inclusive start, exclusive end.
    std::vector<std::tuple<size_t, size_t>> block_index_ranges_per_scan;
    size_t next_scan_idx;
    size_t cur_data_block_idx;

    // States for async reads.
    //
    // Each async state corresponds to an async read request.
    // Each async read request may read content for multiple blocks
    // (potentially coalesced). In PollForBlock(idx), we will poll for the
    // completion of the async read request responsible for
    // pinned_data_blocks[idx], and populate `pinned_data_blocks` with all the
    // blocks read. To find out the async read request responsible for
    // pinned_data_blocks[idx], we store the mapping in
    // block_idx_to_readreq_idx. Index i is in block_idx_to_readreq_idx and
    // block_idx_to_readreq_idx[i] = j iff pinned_data_blocks[i] is read by
    // async_states[j].
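    // For example (hypothetical layout): if scan 0 covers blocks [0, 2) and
    // scan 1 covers blocks [2, 5), then
    //   block_index_ranges_per_scan = {{0, 2}, {2, 5}};
    // and if blocks 3 and 4 missed the block cache and were coalesced into a
    // single async read request async_states[0], then
    //   block_idx_to_readreq_idx = {{3, 0}, {4, 0}};
    // so PollForBlock(3) and PollForBlock(4) both wait on async_states[0].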
    std::vector<AsyncReadState> async_states;
    UnorderedMap<size_t, size_t> block_idx_to_readreq_idx;

    size_t prefetch_max_idx;

    MultiScanState(
        const std::shared_ptr<FileSystem>& _fs, const MultiScanArgs* _scan_opts,
        std::vector<CachableEntry<Block>>&& _pinned_data_blocks,
        std::vector<std::string>&& _data_block_separators,
        std::vector<std::tuple<size_t, size_t>>&& _block_index_ranges_per_scan,
        UnorderedMap<size_t, size_t>&& _block_idx_to_readreq_idx,
        std::vector<AsyncReadState>&& _async_states, size_t _prefetch_max_idx)
        : fs(_fs),
          scan_opts(_scan_opts),
          pinned_data_blocks(std::move(_pinned_data_blocks)),
          data_block_separators(std::move(_data_block_separators)),
          block_index_ranges_per_scan(std::move(_block_index_ranges_per_scan)),
          next_scan_idx(0),
          cur_data_block_idx(0),
          async_states(std::move(_async_states)),
          block_idx_to_readreq_idx(std::move(_block_idx_to_readreq_idx)),
          prefetch_max_idx(_prefetch_max_idx) {}

    ~MultiScanState();
  };

  Status multi_scan_status_;
  std::unique_ptr<MultiScanState> multi_scan_;
  // *** END MultiScan related APIs and states ***

  void SeekSecondPass(const Slice* target);

  // If `target` is null, seek to first.
  void SeekImpl(const Slice* target, bool async_prefetch);

  void InitDataBlock();
  void AsyncInitDataBlock(bool is_first_pass);
  bool MaterializeCurrentBlock();
  void FindKeyForward();
  void FindBlockForward();
  void FindKeyBackward();
  void CheckOutOfBound();

  // Check if the data block is fully within iterate_upper_bound.
  //
  // Note MyRocks may update iterate bounds between seeks. To work around it,
  // we need to check and update block_upper_bound_check_ accordingly.
  void CheckDataBlockWithinUpperBound();

  bool CheckPrefixMayMatch(const Slice& ikey, IterDirection direction,
                           bool* filter_checked) {
    if (need_upper_bound_check_ && direction == IterDirection::kBackward) {
      // Upper bound check isn't sufficient for backward direction to
      // guarantee the same result as total order, so disable prefix
      // check.
      return true;
    }
    if (check_filter_ &&
        !table_->PrefixRangeMayMatch(ikey, read_options_, prefix_extractor_,
                                     need_upper_bound_check_, &lookup_context_,
                                     filter_checked)) {
      // TODO: remember that the iterator is invalidated because of a prefix
      // mismatch. This can keep the upper-level file iterator from falsely
      // believing the position is at the end of the SST file and moving to
      // the first key of the next file.
      ResetDataIter();
      return false;
    }
    return true;
  }

  // *** BEGIN APIs relevant to auto tuning of readahead_size ***
  // This API is called to look up the data blocks ahead in the cache to tune
  // the start and end offsets passed in.
  void BlockCacheLookupForReadAheadSize(bool read_curr_block,
                                        uint64_t& start_offset,
                                        uint64_t& end_offset);

  void ResetBlockCacheLookupVar() {
    is_index_out_of_bound_ = false;
    readahead_cache_lookup_ = false;
    ClearBlockHandles();
  }

  bool IsNextBlockOutOfReadaheadBound() {
    const Slice& index_iter_user_key = index_iter_->user_key();
    // If the current block's index key >= iterate_upper_bound, all the keys
    // in the next block or beyond are out of bound.
    bool out_of_upper_bound =
        read_options_.iterate_upper_bound != nullptr &&
        (user_comparator_.CompareWithoutTimestamp(
             index_iter_user_key,
             /*a_has_ts=*/true, *read_options_.iterate_upper_bound,
             /*b_has_ts=*/false) >= 0
             ? true
             : false);
    if (out_of_upper_bound) {
      return true;
    }

    // If the current block's index key has a different prefix from the seek
    // key's, all the keys in the next block or beyond have a different prefix
    // from the seek key's.
    bool out_of_prefix_bound =
        (read_options_.prefix_same_as_start &&
         !seek_key_prefix_for_readahead_trimming_.empty() &&
         (prefix_extractor_->InDomain(index_iter_user_key)
              ? (prefix_extractor_->Transform(index_iter_user_key)
                     .compare(seek_key_prefix_for_readahead_trimming_) != 0)
              : user_comparator_.CompareWithoutTimestamp(
                    index_iter_user_key,
                    /*a_has_ts=*/true, seek_key_prefix_for_readahead_trimming_,
                    /*b_has_ts=*/false) > 0));
    if (out_of_prefix_bound) {
      return true;
    }
    return false;
  }

  void ClearBlockHandles() {
    if (block_handles_ != nullptr) {
      block_handles_->clear();
    }
  }

  // Reset prev_block_offset_. If index_iter_ has moved ahead, it won't give
  // an accurate prev_block_offset_.
  void ResetPreviousBlockOffset() {
    prev_block_offset_ = std::numeric_limits<uint64_t>::max();
  }

  bool DoesContainBlockHandles() {
    return block_handles_ != nullptr && !block_handles_->empty();
  }

  void InitializeStartAndEndOffsets(bool read_curr_block,
                                    bool& found_first_miss_block,
                                    uint64_t& start_updated_offset,
                                    uint64_t& end_updated_offset,
                                    size_t& prev_handles_size);
  // *** END APIs relevant to auto tuning of readahead_size ***

  // *** BEGIN APIs relevant to multiscan ***
  void SeekMultiScan(const Slice* target);
  void FindBlockForwardInMultiScan();

  void PrepareReadAsyncCallBack(FSReadRequest& req, void* cb_arg) {
    // Record status and result, and sanity check the offset from `req`.
    AsyncReadState* async_state = static_cast<AsyncReadState*>(cb_arg);
    async_state->status = req.status;
    async_state->result = req.result;
    if (async_state->status.ok()) {
      assert(async_state->offset == req.offset);
      if (async_state->offset != req.offset) {
        async_state->status = Status::InvalidArgument(
            "offset mismatch between async read request " +
            std::to_string(async_state->offset) + " and async callback " +
            std::to_string(req.offset));
      }
    } else {
      assert(async_state->status.IsAborted());
    }
  }

  void MultiScanSeekTargetFromBlock(const Slice* seek_target, size_t block_idx);

  void MultiScanUnexpectedSeekTarget(const Slice* seek_target,
                                     const Slice* user_seek_target,
                                     size_t block_idx);

  // Returns true if there is an error or end of file.
  bool MultiScanLoadDataBlock(size_t idx) {
    if (idx >= multi_scan_->prefetch_max_idx) {
      // TODO: Fix the max_prefetch_size support for multiple files.
      // The goal is to limit the memory usage; prefetch could be done
      // incrementally.
      if (multi_scan_->scan_opts->max_prefetch_size == 0) {
        // If max_prefetch_size is not set, treat this as end of file.
        ResetDataIter();
        assert(!is_out_of_bound_);
        assert(!Valid());
      } else {
        // If max_prefetch_size is set, treat this as an error.
        multi_scan_status_ = Status::PrefetchLimitReached();
      }
      return true;
    }
    if (!multi_scan_->async_states.empty()) {
      multi_scan_status_ = PollForBlock(idx);
      if (!multi_scan_status_.ok()) {
        return true;
      }
    }
    // This block should have been initialized.
    assert(multi_scan_->pinned_data_blocks[idx].GetValue());
    // Note that block_iter_ takes ownership of the pinned data block.
    // TODO: we can delegate the cleanup, as with pinned_iters_mgr_, if we
    // need to pin blocks longer.
    table_->NewDataBlockIterator<DataBlockIter>(
        read_options_, multi_scan_->pinned_data_blocks[idx], &block_iter_,
        Status::OK());
    return false;
  }

  // After PollForBlock(idx), the async request that contains
  // pinned_data_blocks[idx] should be done, and all blocks contained in this
  // read request will be initialized in pinned_data_blocks and pinned in the
  // block cache.
  Status PollForBlock(size_t idx);

  // Helper function to create and pin a block in the cache from buffer data.
  // Handles decompressor setup with dictionary loading and block
  // creation/pinning. The buffer_start_offset is the file offset where
  // buffer_data starts.
  Status CreateAndPinBlockFromBuffer(const BlockHandle& block,
                                     uint64_t buffer_start_offset,
                                     const Slice& buffer_data,
                                     CachableEntry<Block>& pinned_block_entry);

  Status CollectBlockHandles(
      const std::vector<ScanOptions>& scan_opts,
      std::vector<BlockHandle>* scan_block_handles,
      std::vector<std::tuple<size_t, size_t>>* block_index_ranges_per_scan,
      std::vector<std::string>* data_block_boundary_keys);

  Status FilterAndPinCachedBlocks(
      const std::vector<BlockHandle>& scan_block_handles,
      const MultiScanArgs* multiscan_opts,
      std::vector<size_t>* block_indices_to_read,
      std::vector<CachableEntry<Block>>* pinned_data_blocks_guard,
      size_t* prefetched_max_idx);

  void PrepareIORequests(
      const std::vector<size_t>& block_indices_to_read,
      const std::vector<BlockHandle>& scan_block_handles,
      const MultiScanArgs* multiscan_opts,
      std::vector<FSReadRequest>* read_reqs,
      UnorderedMap<size_t, size_t>* block_idx_to_readreq_idx,
      std::vector<std::vector<size_t>>* coalesced_block_indices);

  Status ExecuteIO(
      const std::vector<BlockHandle>& scan_block_handles,
      const MultiScanArgs* multiscan_opts,
      const std::vector<std::vector<size_t>>& coalesced_block_indices,
      std::vector<FSReadRequest>* read_reqs,
      std::vector<AsyncReadState>* async_states,
      std::vector<CachableEntry<Block>>* pinned_data_blocks_guard);
  // *** END APIs relevant to multiscan ***
};
}  // namespace ROCKSDB_NAMESPACE