// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include <deque>

#include "db/seqno_to_time_mapping.h"
#include "table/block_based/block_based_table_reader.h"
#include "table/block_based/block_based_table_reader_impl.h"
#include "table/block_based/block_prefetcher.h"
#include "table/block_based/reader_common.h"

namespace ROCKSDB_NAMESPACE {
// Iterates over the contents of BlockBasedTable.
class BlockBasedTableIterator : public InternalIteratorBase<Slice> {
  // compaction_readahead_size: its value will only be used if
  // for_compaction = true
  // @param read_options Must outlive this iterator.
 public:
  BlockBasedTableIterator(
      const BlockBasedTable* table, const ReadOptions& read_options,
      const InternalKeyComparator& icomp,
      std::unique_ptr<InternalIteratorBase<IndexValue>>&& index_iter,
      bool check_filter, bool need_upper_bound_check,
      const SliceTransform* prefix_extractor, TableReaderCaller caller,
      size_t compaction_readahead_size = 0,
      bool allow_unprepared_value = false)
      : index_iter_(std::move(index_iter)),
        table_(table),
        read_options_(read_options),
        icomp_(icomp),
        user_comparator_(icomp.user_comparator()),
        pinned_iters_mgr_(nullptr),
        prefix_extractor_(prefix_extractor),
        lookup_context_(caller),
        block_prefetcher_(
            compaction_readahead_size,
            table_->get_rep()->table_options.initial_auto_readahead_size),
        allow_unprepared_value_(allow_unprepared_value),
        check_filter_(check_filter),
        need_upper_bound_check_(need_upper_bound_check),
        async_read_in_progress_(false),
        is_last_level_(table->IsLastLevel()),
        block_iter_points_to_real_block_(false) {
    multi_scan_status_.PermitUncheckedError();
  }

  ~BlockBasedTableIterator() override { ClearBlockHandles(); }

  void Seek(const Slice& target) override;
  void SeekForPrev(const Slice& target) override;
  void SeekToFirst() override;
  void SeekToLast() override;
  void Next() final override;
  bool NextAndGetResult(IterateResult* result) override;
  void Prev() override;

  bool Valid() const override {
    return !is_out_of_bound_ && multi_scan_status_.ok() &&
           (is_at_first_key_from_index_ ||
            (block_iter_points_to_real_block_ && block_iter_.Valid()));
  }
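  // Illustrative driver loop (hypothetical caller code, not part of this
  // header; `Process` is a placeholder). With allow_unprepared_value ==
  // false, the iterator can be consumed like any InternalIterator, checking
  // status() once it becomes invalid:
  //
  //   iter->Seek(target);
  //   while (iter->Valid()) {
  //     Process(iter->key(), iter->value());
  //     iter->Next();
  //   }
  //   if (!iter->status().ok()) { /* handle error */ }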
  // For block cache readahead lookup scenario -
  // If is_at_first_key_from_index_ is true, InitDataBlock hasn't been
  // called. It means block_handles_ is empty and index_iter_ points to the
  // current block. So index_iter_ can be accessed directly.
  Slice key() const override {
    assert(Valid());
    if (is_at_first_key_from_index_) {
      assert(!multi_scan_);
      return index_iter_->value().first_internal_key;
    } else {
      return block_iter_.key();
    }
  }

  Slice user_key() const override {
    assert(Valid());
    if (is_at_first_key_from_index_) {
      return ExtractUserKey(index_iter_->value().first_internal_key);
    } else {
      return block_iter_.user_key();
    }
  }

  bool PrepareValue() override {
    assert(Valid());
    if (!is_at_first_key_from_index_) {
      return true;
    }
    return const_cast<BlockBasedTableIterator*>(this)
        ->MaterializeCurrentBlock();
  }

  uint64_t write_unix_time() const override {
    assert(Valid());
    ParsedInternalKey pikey;
    SequenceNumber seqno;
    const SeqnoToTimeMapping& seqno_to_time_mapping =
        table_->GetSeqnoToTimeMapping();
    Status s = ParseInternalKey(key(), &pikey, /*log_err_key=*/false);
    if (!s.ok()) {
      return std::numeric_limits<uint64_t>::max();
    } else if (kUnknownSeqnoBeforeAll == pikey.sequence) {
      return kUnknownTimeBeforeAll;
    } else if (seqno_to_time_mapping.Empty()) {
      return std::numeric_limits<uint64_t>::max();
    } else if (kTypeValuePreferredSeqno == pikey.type) {
      seqno = ParsePackedValueForSeqno(value());
    } else {
      seqno = pikey.sequence;
    }
    return seqno_to_time_mapping.GetProximalTimeBeforeSeqno(seqno);
  }

  Slice value() const override {
    // PrepareValue() must have been called.
    assert(!is_at_first_key_from_index_);
    assert(Valid());

    if (seek_stat_state_ & kReportOnUseful) {
      bool filter_used = (seek_stat_state_ & kFilterUsed) != 0;
      RecordTick(
          table_->GetStatistics(),
          filter_used
              ? (is_last_level_ ? LAST_LEVEL_SEEK_DATA_USEFUL_FILTER_MATCH
                                : NON_LAST_LEVEL_SEEK_DATA_USEFUL_FILTER_MATCH)
              : (is_last_level_ ? LAST_LEVEL_SEEK_DATA_USEFUL_NO_FILTER
                                : NON_LAST_LEVEL_SEEK_DATA_USEFUL_NO_FILTER));
      seek_stat_state_ = kDataBlockReadSinceLastSeek;
    }

    return block_iter_.value();
  }

  Status status() const override {
    if (!multi_scan_status_.ok()) {
      return multi_scan_status_;
    }
    // In case of block cache readahead lookup, a block is not added to
    // block_handles_ if its index entry is invalid, so the
    // index_iter_->status() check can be skipped.
    // A prefix index sets the status to NotFound when the prefix does not
    // exist.
    if (IsIndexAtCurr() && !index_iter_->status().ok() &&
        !index_iter_->status().IsNotFound()) {
      assert(!multi_scan_);
      return index_iter_->status();
    } else if (block_iter_points_to_real_block_) {
      // This is the common case.
      return block_iter_.status();
    } else if (async_read_in_progress_) {
      assert(!multi_scan_);
      return Status::TryAgain("Async read in progress");
    } else if (multi_scan_) {
      return multi_scan_status_;
    } else {
      return Status::OK();
    }
  }

  inline IterBoundCheck UpperBoundCheckResult() override {
    if (is_out_of_bound_) {
      return IterBoundCheck::kOutOfBound;
    } else if (block_upper_bound_check_ ==
               BlockUpperBound::kUpperBoundBeyondCurBlock) {
      assert(!is_out_of_bound_);
      // MultiScan does not do block level upper bound check yet.
      assert(!multi_scan_);
      return IterBoundCheck::kInbound;
    } else {
      return IterBoundCheck::kUnknown;
    }
  }

  void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override {
    pinned_iters_mgr_ = pinned_iters_mgr;
  }

  bool IsKeyPinned() const override {
    // Our key comes either from block_iter_'s current key
    // or index_iter_'s current *value*.
    return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
           ((is_at_first_key_from_index_ && index_iter_->IsValuePinned()) ||
            (block_iter_points_to_real_block_ && block_iter_.IsKeyPinned()));
  }
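  // Illustrative PrepareValue() contract (hypothetical caller code): when
  // constructed with allow_unprepared_value == true, the iterator may stop
  // at the first key of a block that has not been loaded yet, so value() is
  // only safe after a successful PrepareValue():
  //
  //   iter->Seek(k);
  //   if (iter->Valid() && iter->PrepareValue()) {
  //     Slice v = iter->value();  // data block is materialized now
  //   }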
  bool IsValuePinned() const override {
    assert(!is_at_first_key_from_index_);
    assert(Valid());
    // BlockIter::IsValuePinned() is always true. No need to check it.
    return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
           block_iter_points_to_real_block_;
  }

  void ResetDataIter() {
    if (block_iter_points_to_real_block_) {
      if (pinned_iters_mgr_ != nullptr &&
          pinned_iters_mgr_->PinningEnabled()) {
        block_iter_.DelegateCleanupsTo(pinned_iters_mgr_);
      }
      block_iter_.Invalidate(Status::OK());
      block_iter_points_to_real_block_ = false;
    }
    block_upper_bound_check_ = BlockUpperBound::kUnknown;
  }

  void SavePrevIndexValue() {
    if (block_iter_points_to_real_block_ && IsIndexAtCurr()) {
      // Reseek. If we end up with the same data block, we shouldn't
      // re-fetch it.
      prev_block_offset_ = index_iter_->value().handle.offset();
    }
  }

  void GetReadaheadState(ReadaheadFileInfo* readahead_file_info) override {
    if (block_prefetcher_.prefetch_buffer() != nullptr &&
        read_options_.adaptive_readahead) {
      block_prefetcher_.prefetch_buffer()->GetReadaheadState(
          &(readahead_file_info->data_block_readahead_info));
      if (index_iter_) {
        index_iter_->GetReadaheadState(readahead_file_info);
      }
    }
  }

  void SetReadaheadState(ReadaheadFileInfo* readahead_file_info) override {
    if (read_options_.adaptive_readahead) {
      block_prefetcher_.SetReadaheadState(
          &(readahead_file_info->data_block_readahead_info));
      if (index_iter_) {
        index_iter_->SetReadaheadState(readahead_file_info);
      }
    }
  }

  void Prepare(const MultiScanArgs* scan_opts) override;

  FilePrefetchBuffer* prefetch_buffer() {
    return block_prefetcher_.prefetch_buffer();
  }

  std::unique_ptr<InternalIteratorBase<IndexValue>> index_iter_;

  bool TEST_IsBlockPinnedByMultiScan(size_t block_idx) {
    if (!multi_scan_) {
      return false;
    }
    if (block_idx >= multi_scan_->pinned_data_blocks.size()) {
      return false;
    }
    return !multi_scan_->pinned_data_blocks[block_idx].IsEmpty();
  }

 private:
  enum class IterDirection {
    kForward,
    kBackward,
  };
  // This enum indicates whether the upper bound falls into the current
  // block or beyond.
  //   +-------------+
  //   |  cur block  | <-- (1)
  //   +-------------+
  //                   <-- (2)
  //  --- <boundary key> ---
  //                   <-- (3)
  //   +-------------+
  //   | next block  | <-- (4)
  //        ......
  //
  // (1)-(4) mark possible positions of the iterate upper bound relative to
  // the current block's boundary key.
  // When the upper bound is smaller than or equal to the boundary key,
  // i.e. (1) or (2) in the graph, kUpperBoundInCurBlock is the value to
  // use. It means all keys in the next block or beyond will be out of
  // bound. Keys within the current block may or may not be out of bound.
  // When the upper bound is larger than the boundary key, i.e. (3) or (4)
  // in the graph, kUpperBoundBeyondCurBlock is to be used. It means that
  // all keys in the current block are within the upper bound, and keys in
  // the next block may or may not be within the upper bound.
  // If the boundary key hasn't been checked against the upper bound,
  // kUnknown can be used.
  enum class BlockUpperBound : uint8_t {
    kUpperBoundInCurBlock,
    kUpperBoundBeyondCurBlock,
    kUnknown,
  };

  // State bits for collecting stats on seeks and whether they returned
  // useful results.
  enum SeekStatState : uint8_t {
    kNone = 0,
    // Most recent seek checked prefix filter (or similar future feature)
    kFilterUsed = 1 << 0,
    // Already recorded that a data block was accessed since the last seek.
    kDataBlockReadSinceLastSeek = 1 << 1,
    // Have not yet recorded that a value() was accessed.
    kReportOnUseful = 1 << 2,
  };
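  // Illustrative flow of these bits (descriptive summary of value() above):
  // a seek that consulted the prefix filter may leave seek_stat_state_ ==
  // (kFilterUsed | kReportOnUseful); the first value() access after that
  // seek records a *_SEEK_DATA_USEFUL_FILTER_MATCH tick and resets the
  // state to kDataBlockReadSinceLastSeek, so each seek is reported as
  // useful at most once.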
  // BlockHandleInfo is used to store the info needed when block cache
  // lookup ahead is enabled to tune readahead_size.
  struct BlockHandleInfo {
    void SetFirstInternalKey(const Slice& key) {
      if (key.empty()) {
        return;
      }
      size_t size = key.size();
      buf_ = std::unique_ptr<char[]>(new char[size]);
      memcpy(buf_.get(), key.data(), size);
      first_internal_key_ = Slice(buf_.get(), size);
    }

    BlockHandle handle_;
    bool is_cache_hit_ = false;
    CachableEntry<Block> cachable_entry_;
    Slice first_internal_key_;
    std::unique_ptr<char[]> buf_;
  };

  bool IsIndexAtCurr() const { return is_index_at_curr_block_; }

  const BlockBasedTable* table_;
  const ReadOptions& read_options_;
  const InternalKeyComparator& icomp_;
  UserComparatorWrapper user_comparator_;
  PinnedIteratorsManager* pinned_iters_mgr_;
  DataBlockIter block_iter_;
  const SliceTransform* prefix_extractor_;
  uint64_t prev_block_offset_ = std::numeric_limits<uint64_t>::max();
  BlockCacheLookupContext lookup_context_;

  BlockPrefetcher block_prefetcher_;

  // It stores all the block handles that are looked up in the cache ahead
  // when BlockCacheLookupForReadAheadSize is called. Since index_iter_ may
  // point to different blocks while readahead_size is calculated in
  // BlockCacheLookupForReadAheadSize, block_handles_ is used to avoid an
  // index_iter_ reseek.
  // `block_handles_` is lazily constructed to save CPU when it is unused.
  std::unique_ptr<std::deque<BlockHandleInfo>> block_handles_;

  // The prefix of the key last passed to SeekImpl().
  // This is for readahead trimming, so that no data blocks containing keys
  // of a different prefix are prefetched.
  std::string seek_key_prefix_for_readahead_trimming_ = "";

  const bool allow_unprepared_value_;
  // How the current data block's boundary key with the next block compares
  // with the iterate upper bound.
  BlockUpperBound block_upper_bound_check_ = BlockUpperBound::kUnknown;
  // True if we're standing at the first key of a block, and we haven't
  // loaded that block yet. A call to PrepareValue() will trigger loading
  // the block.
  bool is_at_first_key_from_index_ = false;
  bool check_filter_;
  // TODO(Zhongyi): pick a better name
  bool need_upper_bound_check_;
  bool async_read_in_progress_;
  mutable SeekStatState seek_stat_state_ = SeekStatState::kNone;
  bool is_last_level_;

  // If set to true, it'll look up in the cache ahead to estimate the
  // readahead size based on cache hits and misses.
  bool readahead_cache_lookup_ = false;

  bool is_index_out_of_bound_ = false;

  // Used in case of auto_readahead_size to disable the block cache lookup
  // if the direction is reversed from forward to backward. In case of
  // backward direction, SeekForPrev or Prev might call Seek from db_iter,
  // so direction_ is used to disable the lookup.
  IterDirection direction_ = IterDirection::kForward;

  // *** BEGIN states used by both regular scan and multiscan ***
  // True if block_iter_ is initialized and points to the same block
  // as the index iterator.
  bool block_iter_points_to_real_block_;
  // See InternalIteratorBase::IsOutOfBound().
  bool is_out_of_bound_ = false;
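  // Illustrative lifecycle of the flags above (descriptive only): a Seek()
  // that can be answered from the index alone sets
  // is_at_first_key_from_index_ = true and leaves
  // block_iter_points_to_real_block_ == false; a later PrepareValue() (via
  // MaterializeCurrentBlock()) loads the data block and flips them to
  // false/true respectively, after which key()/value() are served from
  // block_iter_.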
  // Mark prepared ranges as exhausted for multiscan.
  void MarkPreparedRangeExhausted() {
    assert(multi_scan_ != nullptr);
    if (multi_scan_->next_scan_idx <
        multi_scan_->block_index_ranges_per_scan.size()) {
      // If there are more prepared ranges, we don't ResetDataIter() here,
      // because the next scan might be reading from the same block.
      // ResetDataIter() would free the underlying block cache handle, and
      // we don't want the block to be unpinned.
      // Set out of bound to mark the current prepared range as exhausted.
      is_out_of_bound_ = true;
    } else {
      // This is the last prepared range of this file; there might be more
      // data in the next file. Reset the data iterator to indicate that the
      // iterator is no longer valid on this file. Let LevelIter advance to
      // the next file instead of ending the scan.
      ResetDataIter();
    }
  }

  // During cache lookup to find the readahead size, index_iter_ is iterated
  // and can point to a different block.
  // If Prepare() is called, index_iter_ is used to prefetch data blocks for
  // the multiscan, so is_index_at_curr_block_ will be false.
  // Whether the index is expected to match the current data_block_iter_.
  bool is_index_at_curr_block_ = true;

  // *** END states used by both regular scan and multiscan ***

  // *** BEGIN MultiScan related states ***
  struct AsyncReadState {
    std::unique_ptr<char[]> buf{nullptr};
    // Indices into pinned_data_blocks that this request reads.
    std::vector<size_t> block_indices;
    // BlockHandle for each block in block_indices.
    std::vector<BlockHandle> blocks;
    void* io_handle{nullptr};
    IOHandleDeleter del_fn{nullptr};
    // Offset for this async read request.
    uint64_t offset{0};
    // These two states are populated from the FSReadRequest
    // by the ReadAsync callback.
    Status status;
    Slice result;
    // For direct I/O support
    AlignedBuf aligned_buf{nullptr};
    bool finished{false};

    AsyncReadState() = default;
    DECLARE_DEFAULT_MOVES(AsyncReadState);
    // Delete copy operations
    AsyncReadState(const AsyncReadState&) = delete;
    AsyncReadState& operator=(const AsyncReadState&) = delete;

    void CleanUpIOHandle() {
      if (io_handle != nullptr) {
        assert(del_fn);
        del_fn(io_handle);
        io_handle = nullptr;
      }
      finished = true;
    }

    ~AsyncReadState() {
      // Should be cleaned up before destruction.
      assert(io_handle == nullptr);
    }
  };
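  // Illustrative AsyncReadState lifecycle (descriptive summary, assuming
  // ExecuteIO() issues one FSReadRequest per state): the issuing path
  // stores io_handle/del_fn, PrepareReadAsyncCallBack() fills in status and
  // result, PollForBlock() waits for completion and pins the blocks listed
  // in block_indices, and CleanUpIOHandle() must release io_handle before
  // destruction (the destructor asserts this).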
  struct MultiScanState {
    // For aborting async I/Os in the destructor.
    const std::shared_ptr<FileSystem> fs;
    const MultiScanArgs* scan_opts;
    std::vector<CachableEntry<Block>> pinned_data_blocks;
    // The separator of each data block in the above pinned_data_blocks
    // vector. Its size is the same as pinned_data_blocks. The value of a
    // separator is larger than or equal to the last key in the
    // corresponding data block.
    std::vector<std::string> data_block_separators;

    // Tracks the previously seeked key in multi-scan.
    // This is used to ensure that the seek key keeps moving forward, as
    // blocks that are smaller than the seek key are unpinned from memory.
    std::string prev_seek_key_;

    // Indices into pinned_data_blocks for the data blocks of each scan
    // range: inclusive start, exclusive end.
    std::vector<std::pair<size_t, size_t>> block_index_ranges_per_scan;
    size_t next_scan_idx;
    size_t cur_data_block_idx;

    // States for async reads.
    //
    // Each async state corresponds to an async read request. Each async
    // read request may read content for multiple blocks (potentially
    // coalesced). In PollForBlock(idx), we will poll for the completion of
    // the async read request responsible for pinned_data_blocks[idx], and
    // populate `pinned_data_blocks` with all the blocks read. To find the
    // async read request responsible for pinned_data_blocks[idx], we store
    // the mapping in block_idx_to_readreq_idx. Index i is in
    // block_idx_to_readreq_idx and block_idx_to_readreq_idx[i] = j iff
    // pinned_data_blocks[i] is read by async_states[j].
    std::vector<AsyncReadState> async_states;
    UnorderedMap<size_t, size_t> block_idx_to_readreq_idx;

    size_t prefetch_max_idx;

    MultiScanState(
        const std::shared_ptr<FileSystem>& _fs,
        const MultiScanArgs* _scan_opts,
        std::vector<CachableEntry<Block>>&& _pinned_data_blocks,
        std::vector<std::string>&& _data_block_separators,
        std::vector<std::pair<size_t, size_t>>&&
            _block_index_ranges_per_scan,
        UnorderedMap<size_t, size_t>&& _block_idx_to_readreq_idx,
        std::vector<AsyncReadState>&& _async_states,
        size_t _prefetch_max_idx)
        : fs(_fs),
          scan_opts(_scan_opts),
          pinned_data_blocks(std::move(_pinned_data_blocks)),
          data_block_separators(std::move(_data_block_separators)),
          block_index_ranges_per_scan(
              std::move(_block_index_ranges_per_scan)),
          next_scan_idx(0),
          cur_data_block_idx(0),
          async_states(std::move(_async_states)),
          block_idx_to_readreq_idx(std::move(_block_idx_to_readreq_idx)),
          prefetch_max_idx(_prefetch_max_idx) {}

    ~MultiScanState();
  };

  Status multi_scan_status_;
  std::unique_ptr<MultiScanState> multi_scan_;

  // *** END MultiScan related APIs and states ***

  void SeekSecondPass(const Slice* target);

  // If `target` is null, seek to first.
  void SeekImpl(const Slice* target, bool async_prefetch);

  void InitDataBlock();
  void AsyncInitDataBlock(bool is_first_pass);
  bool MaterializeCurrentBlock();
  void FindKeyForward();
  void FindBlockForward();
  void FindKeyBackward();
  void CheckOutOfBound();

  // Check if the data block is fully within iterate_upper_bound.
  //
  // Note MyRocks may update iterate bounds between seeks. To work around
  // this, we need to check and update data_block_within_upper_bound_
  // accordingly.
  void CheckDataBlockWithinUpperBound();

  bool CheckPrefixMayMatch(const Slice& ikey, IterDirection direction,
                           bool* filter_checked) {
    if (need_upper_bound_check_ && direction == IterDirection::kBackward) {
      // Upper bound check isn't sufficient for backward direction to
      // guarantee the same result as total order, so disable prefix
      // check.
      return true;
    }
    if (check_filter_ && !table_->PrefixRangeMayMatch(
                             ikey, read_options_, prefix_extractor_,
                             need_upper_bound_check_, &lookup_context_,
                             filter_checked)) {
      // TODO: remember that the iterator was invalidated because of a
      // prefix mismatch. This can prevent the upper-level file iterator
      // from falsely believing the position is the end of the SST file and
      // moving to the first key of the next file.
      ResetDataIter();
      return false;
    }
    return true;
  }

  // *** BEGIN APIs relevant to auto tuning of readahead_size ***

  // This API is called to look up the data blocks ahead in the cache to
  // tune the start and end offsets passed in.
  void BlockCacheLookupForReadAheadSize(bool read_curr_block,
                                        uint64_t& start_offset,
                                        uint64_t& end_offset);

  void ResetBlockCacheLookupVar() {
    is_index_out_of_bound_ = false;
    readahead_cache_lookup_ = false;
    ClearBlockHandles();
  }
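  // Worked example for the bound check below (illustrative values): with
  // iterate_upper_bound == "k5" and the current index key == "k7", every
  // key in the next block or beyond is >= "k7" and hence out of bound, so
  // prefetching past the current block would be wasted I/O. Likewise, with
  // prefix_same_as_start and a seek prefix "user1", an index key whose
  // prefix is "user2" trims readahead at that block.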
  bool IsNextBlockOutOfReadaheadBound() {
    const Slice& index_iter_user_key = index_iter_->user_key();
    // If the current block's index key >= iterate_upper_bound, it means all
    // the keys in the next block or beyond are out of bound.
    bool out_of_upper_bound =
        read_options_.iterate_upper_bound != nullptr &&
        (user_comparator_.CompareWithoutTimestamp(
             index_iter_user_key,
             /*a_has_ts=*/true, *read_options_.iterate_upper_bound,
             /*b_has_ts=*/false) >= 0
             ? true
             : false);
    if (out_of_upper_bound) {
      return true;
    }

    // If the current block's index key has a different prefix from the seek
    // key's, it means all the keys in the next block or beyond have a
    // different prefix from the seek key's.
    bool out_of_prefix_bound =
        (read_options_.prefix_same_as_start &&
         !seek_key_prefix_for_readahead_trimming_.empty() &&
         (prefix_extractor_->InDomain(index_iter_user_key)
              ? (prefix_extractor_->Transform(index_iter_user_key)
                     .compare(seek_key_prefix_for_readahead_trimming_) != 0)
              : user_comparator_.CompareWithoutTimestamp(
                    index_iter_user_key,
                    /*a_has_ts=*/true,
                    seek_key_prefix_for_readahead_trimming_,
                    /*b_has_ts=*/false) > 0));
    if (out_of_prefix_bound) {
      return true;
    }
    return false;
  }

  void ClearBlockHandles() {
    if (block_handles_ != nullptr) {
      block_handles_->clear();
    }
  }

  // Reset prev_block_offset_. If index_iter_ has moved ahead, it won't give
  // an accurate prev_block_offset_.
  void ResetPreviousBlockOffset() {
    prev_block_offset_ = std::numeric_limits<uint64_t>::max();
  }

  bool DoesContainBlockHandles() {
    return block_handles_ != nullptr && !block_handles_->empty();
  }

  void InitializeStartAndEndOffsets(bool read_curr_block,
                                    bool& found_first_miss_block,
                                    uint64_t& start_updated_offset,
                                    uint64_t& end_updated_offset,
                                    size_t& prev_handles_size);

  // *** END APIs relevant to auto tuning of readahead_size ***

  // *** BEGIN APIs relevant to multiscan ***
  void SeekMultiScan(const Slice* target);
  void FindBlockForwardInMultiScan();

  void PrepareReadAsyncCallBack(FSReadRequest& req, void* cb_arg) {
    // Record status and result, and sanity-check the offset from `req`.
    AsyncReadState* async_state = static_cast<AsyncReadState*>(cb_arg);
    async_state->status = req.status;
    async_state->result = req.result;
    if (async_state->status.ok()) {
      assert(async_state->offset == req.offset);
      if (async_state->offset != req.offset) {
        async_state->status = Status::InvalidArgument(
            "offset mismatch between async read request " +
            std::to_string(async_state->offset) + " and async callback " +
            std::to_string(req.offset));
      }
    } else {
      assert(async_state->status.IsAborted());
    }
  }

  void MultiScanSeekTargetFromBlock(const Slice* seek_target,
                                    size_t block_idx);
  void MultiScanUnexpectedSeekTarget(const Slice* seek_target,
                                     const Slice* user_seek_target,
                                     size_t block_idx);

  // Returns true if there is an error or the end of the file is reached.
  bool MultiScanLoadDataBlock(size_t idx) {
    if (idx >= multi_scan_->prefetch_max_idx) {
      // TODO: Fix the max_prefetch_size support for multiple files.
      // The goal is to limit the memory usage; prefetch could be done
      // incrementally.
      if (multi_scan_->scan_opts->max_prefetch_size == 0) {
        // If max_prefetch_size is not set, treat this as end of file.
        ResetDataIter();
        assert(!is_out_of_bound_);
        assert(!Valid());
      } else {
        // If max_prefetch_size is set, treat this as an error.
        multi_scan_status_ = Status::PrefetchLimitReached();
      }
      return true;
    }
    if (!multi_scan_->async_states.empty()) {
      multi_scan_status_ = PollForBlock(idx);
      if (!multi_scan_status_.ok()) {
        return true;
      }
    }

    // This block should have been initialized.
    assert(multi_scan_->pinned_data_blocks[idx].GetValue());
    // Note that block_iter_ takes ownership of the pinned data block.
    // TODO: we can delegate the cleanup like with pinned_iters_mgr_ if we
    // need to pin blocks longer.
    table_->NewDataBlockIterator<DataBlockIter>(
        read_options_, multi_scan_->pinned_data_blocks[idx], &block_iter_,
        Status::OK());
    return false;
  }

  // After PollForBlock(idx), the async request that contains
  // pinned_data_blocks[idx] should be done, and all blocks contained in
  // this read request will be initialized in pinned_data_blocks and pinned
  // in the block cache.
  Status PollForBlock(size_t idx);
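  // Illustrative mapping (hypothetical indices): if blocks B0 and B1 are
  // coalesced into async request R0, B2 is a block cache hit, and B3 is
  // read by request R1, then block_idx_to_readreq_idx == {0->0, 1->0,
  // 3->1}; PollForBlock(1) waits on R0 and pins both B0 and B1, while
  // index 2 does not appear in the map because its block was already
  // pinned from the cache.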
  // Helper function to create and pin a block in cache from buffered data.
  // Handles decompressor setup with dictionary loading and block
  // creation/pinning. buffer_start_offset is the file offset where
  // buffer_data starts.
  Status CreateAndPinBlockFromBuffer(
      const BlockHandle& block, uint64_t buffer_start_offset,
      const Slice& buffer_data, CachableEntry<Block>& pinned_block_entry);

  // Helper stages used by Prepare(); each stage's outputs feed the next:
  // collect block handles, filter and pin already-cached blocks, build the
  // (possibly coalesced) I/O requests, then execute them.
  Status CollectBlockHandles(
      const std::vector<ScanOptions>& scan_opts,
      std::vector<BlockHandle>* scan_block_handles,
      std::vector<std::pair<size_t, size_t>>* block_index_ranges_per_scan,
      std::vector<std::string>* data_block_boundary_keys);

  Status FilterAndPinCachedBlocks(
      const std::vector<BlockHandle>& scan_block_handles,
      const MultiScanArgs* multiscan_opts,
      std::vector<size_t>* block_indices_to_read,
      std::vector<CachableEntry<Block>>* pinned_data_blocks_guard,
      size_t* prefetched_max_idx);

  void PrepareIORequests(
      const std::vector<size_t>& block_indices_to_read,
      const std::vector<BlockHandle>& scan_block_handles,
      const MultiScanArgs* multiscan_opts,
      std::vector<FSReadRequest>* read_reqs,
      UnorderedMap<size_t, size_t>* block_idx_to_readreq_idx,
      std::vector<std::vector<size_t>>* coalesced_block_indices);

  Status ExecuteIO(
      const std::vector<BlockHandle>& scan_block_handles,
      const MultiScanArgs* multiscan_opts,
      const std::vector<std::vector<size_t>>& coalesced_block_indices,
      std::vector<FSReadRequest>* read_reqs,
      std::vector<AsyncReadState>* async_states,
      std::vector<CachableEntry<Block>>* pinned_data_blocks_guard);

  // *** END APIs relevant to multiscan ***
};
}  // namespace ROCKSDB_NAMESPACE