- // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
- // This source code is licensed under both the GPLv2 (found in the
- // COPYING file in the root directory) and Apache 2.0 License
- // (found in the LICENSE.Apache file in the root directory).
- //
- // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
- // Use of this source code is governed by a BSD-style license that can be
- // found in the LICENSE file. See the AUTHORS file for names of contributors.
- #include "table/block_based/block_based_table_iterator.h"
- namespace ROCKSDB_NAMESPACE {
- void BlockBasedTableIterator::SeekToFirst() { SeekImpl(nullptr, false); }
- void BlockBasedTableIterator::Seek(const Slice& target) {
- SeekImpl(&target, true);
- }
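- // Second pass of an asynchronous Seek: AsyncInitDataBlock(/*is_first_pass=*/false)
- // retrieves the data block that was requested asynchronously in the first
- // pass, then the seek is completed on that block.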
- void BlockBasedTableIterator::SeekSecondPass(const Slice* target) {
- AsyncInitDataBlock(/*is_first_pass=*/false);
- if (target) {
- block_iter_.Seek(*target);
- } else {
- block_iter_.SeekToFirst();
- }
- FindKeyForward();
- CheckOutOfBound();
- if (target) {
- assert(!Valid() || icomp_.Compare(*target, key()) <= 0);
- }
- }
- void BlockBasedTableIterator::SeekImpl(const Slice* target,
- bool async_prefetch) {
- // TODO(hx235): set `seek_key_prefix_for_readahead_trimming_`
- // even when `target == nullptr`, i.e. when `SeekToFirst()` is called
- if (!multi_scan_status_.ok()) {
- return;
- }
- if (multi_scan_) {
- SeekMultiScan(target);
- return;
- }
- if (target != nullptr && prefix_extractor_ &&
- read_options_.prefix_same_as_start) {
- const Slice& seek_user_key = ExtractUserKey(*target);
- seek_key_prefix_for_readahead_trimming_ =
- prefix_extractor_->InDomain(seek_user_key)
- ? prefix_extractor_->Transform(seek_user_key).ToString()
- : "";
- }
- bool is_first_pass = !async_read_in_progress_;
- if (!is_first_pass) {
- SeekSecondPass(target);
- return;
- }
- ResetBlockCacheLookupVar();
- bool autotune_readaheadsize =
- read_options_.auto_readahead_size &&
- (read_options_.iterate_upper_bound || read_options_.prefix_same_as_start);
- if (autotune_readaheadsize &&
- table_->get_rep()->table_options.block_cache.get() &&
- direction_ == IterDirection::kForward) {
- readahead_cache_lookup_ = true;
- }
- is_out_of_bound_ = false;
- is_at_first_key_from_index_ = false;
- seek_stat_state_ = kNone;
- bool filter_checked = false;
- if (target &&
- !CheckPrefixMayMatch(*target, IterDirection::kForward, &filter_checked)) {
- ResetDataIter();
- RecordTick(table_->GetStatistics(), is_last_level_
- ? LAST_LEVEL_SEEK_FILTERED
- : NON_LAST_LEVEL_SEEK_FILTERED);
- return;
- }
- if (filter_checked) {
- seek_stat_state_ = kFilterUsed;
- RecordTick(table_->GetStatistics(), is_last_level_
- ? LAST_LEVEL_SEEK_FILTER_MATCH
- : NON_LAST_LEVEL_SEEK_FILTER_MATCH);
- }
- bool need_seek_index = true;
- // In case of readahead_cache_lookup_, index_iter_ may have been moved
- // forward by BlockCacheLookupForReadAheadSize while computing the readahead
- // size, so it needs to be reseeked.
- if (IsIndexAtCurr() && block_iter_points_to_real_block_ &&
- block_iter_.Valid()) {
- // Reseek.
- prev_block_offset_ = index_iter_->value().handle.offset();
- if (target) {
- // We can avoid an index seek if:
- // 1. The new seek key is larger than the current key
- // 2. The new seek key is within the upper bound of the block
- // Since we don't necessarily know the internal key for either
- // the current key or the upper bound, we check user keys and
- // exclude the equality case. Considering internal keys can
- // improve for the boundary cases, but it would complicate the
- // code.
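- // For example, if block_iter_ is at user key "k05" and the current index
- // entry (an upper bound for the block) is "k20", a reseek to "k10" satisfies
- // both conditions and can reuse the current index position, while a reseek
- // to "k20" or beyond cannot.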
- if (user_comparator_.Compare(ExtractUserKey(*target),
- block_iter_.user_key()) > 0 &&
- user_comparator_.Compare(ExtractUserKey(*target),
- index_iter_->user_key()) < 0) {
- need_seek_index = false;
- }
- }
- }
- if (need_seek_index) {
- if (target) {
- index_iter_->Seek(*target);
- } else {
- index_iter_->SeekToFirst();
- }
- is_index_at_curr_block_ = true;
- if (!index_iter_->Valid()) {
- ResetDataIter();
- return;
- }
- }
- // After reseek, index_iter_ points to the right key, i.e. target, in
- // case of readahead_cache_lookup_. So index_iter_ can be used directly.
- IndexValue v = index_iter_->value();
- const bool same_block = block_iter_points_to_real_block_ &&
- v.handle.offset() == prev_block_offset_;
- if (!v.first_internal_key.empty() && !same_block &&
- (!target || icomp_.Compare(*target, v.first_internal_key) <= 0) &&
- allow_unprepared_value_) {
- // Index contains the first key of the block, and it's >= target.
- // We can defer reading the block.
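- // In that case the key is served from the index entry's first_internal_key;
- // NextAndGetResult() reports value_prepared == false, and the data block is
- // only read later via MaterializeCurrentBlock() when it is actually needed.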
- is_at_first_key_from_index_ = true;
- // ResetDataIter() will invalidate block_iter_. Thus, there is no need to
- // call CheckDataBlockWithinUpperBound() to check for iterate_upper_bound
- // as that will be done later when the data block is actually read.
- ResetDataIter();
- } else {
- // Need to use the data block.
- if (!same_block) {
- if (read_options_.async_io && async_prefetch) {
- AsyncInitDataBlock(/*is_first_pass=*/true);
- if (async_read_in_progress_) {
- // Status::TryAgain indicates an asynchronous request for retrieval of
- // data blocks has been submitted. So we should return at this point,
- // and Seek should be called again to retrieve the requested block
- // and execute the remaining code.
- return;
- }
- } else {
- InitDataBlock();
- }
- } else {
- // When the user does a reseek, the iterate_upper_bound might have
- // changed. CheckDataBlockWithinUpperBound() needs to be called
- // explicitly if the reseek ends up in the same data block.
- // If the reseek ends up in a different block, InitDataBlock() will do
- // the iterator upper bound check.
- CheckDataBlockWithinUpperBound();
- }
- if (target) {
- block_iter_.Seek(*target);
- } else {
- block_iter_.SeekToFirst();
- }
- FindKeyForward();
- }
- CheckOutOfBound();
- if (target) {
- assert(!Valid() || icomp_.Compare(*target, key()) <= 0);
- }
- }
- void BlockBasedTableIterator::SeekForPrev(const Slice& target) {
- multi_scan_.reset();
- direction_ = IterDirection::kBackward;
- ResetBlockCacheLookupVar();
- is_out_of_bound_ = false;
- is_at_first_key_from_index_ = false;
- seek_stat_state_ = kNone;
- bool filter_checked = false;
- // For now totally disable prefix seek in auto prefix mode because we don't
- // have logic
- if (!CheckPrefixMayMatch(target, IterDirection::kBackward, &filter_checked)) {
- ResetDataIter();
- RecordTick(table_->GetStatistics(), is_last_level_
- ? LAST_LEVEL_SEEK_FILTERED
- : NON_LAST_LEVEL_SEEK_FILTERED);
- return;
- }
- if (filter_checked) {
- seek_stat_state_ = kFilterUsed;
- RecordTick(table_->GetStatistics(), is_last_level_
- ? LAST_LEVEL_SEEK_FILTER_MATCH
- : NON_LAST_LEVEL_SEEK_FILTER_MATCH);
- }
- SavePrevIndexValue();
- // Call Seek() rather than SeekForPrev() in the index block, because the
- // target data block will likely contain the position for `target`, the
- // same as Seek(), rather than the one before.
- // For example, if we have three data blocks, each containing two keys:
- // [2, 4] [6, 8] [10, 12]
- // (the keys in the index block would be [4, 8, 12])
- // and the user calls SeekForPrev(7), we need to go to the second block,
- // just like if they call Seek(7).
- // The only case where the block is different is when they seek to a position
- // on the boundary. For example, if they SeekForPrev(5), we should go to the
- // first block, rather than the second. However, we don't have the information
- // to distinguish the two unless we read the second block. In this case, we'll
- // end up reading two blocks.
- index_iter_->Seek(target);
- is_index_at_curr_block_ = true;
- if (!index_iter_->Valid()) {
- auto seek_status = index_iter_->status();
- // Check for IO error
- if (!seek_status.IsNotFound() && !seek_status.ok()) {
- ResetDataIter();
- return;
- }
- // With prefix index, Seek() returns NotFound if the prefix doesn't exist
- if (seek_status.IsNotFound()) {
- // Any key less than the target is fine for prefix seek
- ResetDataIter();
- return;
- } else {
- index_iter_->SeekToLast();
- }
- // Check for IO error
- if (!index_iter_->Valid()) {
- ResetDataIter();
- return;
- }
- }
- InitDataBlock();
- block_iter_.SeekForPrev(target);
- FindKeyBackward();
- CheckDataBlockWithinUpperBound();
- assert(!block_iter_.Valid() ||
- icomp_.Compare(target, block_iter_.key()) >= 0);
- }
- void BlockBasedTableIterator::SeekToLast() {
- multi_scan_.reset();
- direction_ = IterDirection::kBackward;
- ResetBlockCacheLookupVar();
- is_out_of_bound_ = false;
- is_at_first_key_from_index_ = false;
- seek_stat_state_ = kNone;
- SavePrevIndexValue();
- index_iter_->SeekToLast();
- is_index_at_curr_block_ = true;
- if (!index_iter_->Valid()) {
- ResetDataIter();
- return;
- }
- InitDataBlock();
- block_iter_.SeekToLast();
- FindKeyBackward();
- CheckDataBlockWithinUpperBound();
- }
- void BlockBasedTableIterator::Next() {
- assert(Valid());
- if (is_at_first_key_from_index_ && !MaterializeCurrentBlock()) {
- assert(!multi_scan_);
- return;
- }
- assert(block_iter_points_to_real_block_);
- block_iter_.Next();
- FindKeyForward();
- CheckOutOfBound();
- }
- bool BlockBasedTableIterator::NextAndGetResult(IterateResult* result) {
- Next();
- bool is_valid = Valid();
- if (is_valid) {
- result->key = key();
- result->bound_check_result = UpperBoundCheckResult();
- result->value_prepared = !is_at_first_key_from_index_;
- }
- return is_valid;
- }
- void BlockBasedTableIterator::Prev() {
- assert(!multi_scan_);
- if ((readahead_cache_lookup_ && !IsIndexAtCurr()) || multi_scan_) {
- multi_scan_.reset();
- // In case of readahead_cache_lookup_, index_iter_ has moved forward. So we
- // need to reseek the index_iter_ to point to current block by using
- // block_iter_'s key.
- if (Valid()) {
- ResetBlockCacheLookupVar();
- direction_ = IterDirection::kBackward;
- Slice last_key = key();
- index_iter_->Seek(last_key);
- is_index_at_curr_block_ = true;
- // Check for IO error.
- if (!index_iter_->Valid()) {
- ResetDataIter();
- return;
- }
- }
- if (!Valid()) {
- ResetDataIter();
- return;
- }
- }
- ResetBlockCacheLookupVar();
- if (is_at_first_key_from_index_) {
- is_at_first_key_from_index_ = false;
- index_iter_->Prev();
- if (!index_iter_->Valid()) {
- return;
- }
- InitDataBlock();
- block_iter_.SeekToLast();
- } else {
- assert(block_iter_points_to_real_block_);
- block_iter_.Prev();
- }
- FindKeyBackward();
- }
- void BlockBasedTableIterator::InitDataBlock() {
- BlockHandle data_block_handle;
- bool is_in_cache = false;
- bool use_block_cache_for_lookup = true;
- if (DoesContainBlockHandles()) {
- data_block_handle = block_handles_->front().handle_;
- is_in_cache = block_handles_->front().is_cache_hit_;
- use_block_cache_for_lookup = false;
- } else {
- data_block_handle = index_iter_->value().handle;
- }
- if (!block_iter_points_to_real_block_ ||
- data_block_handle.offset() != prev_block_offset_ ||
- // if previous attempt of reading the block missed cache, try again
- block_iter_.status().IsIncomplete()) {
- if (block_iter_points_to_real_block_) {
- ResetDataIter();
- }
- bool is_for_compaction =
- lookup_context_.caller == TableReaderCaller::kCompaction;
- // Initialize Data Block From CacheableEntry.
- if (is_in_cache) {
- Status s;
- block_iter_.Invalidate(Status::OK());
- table_->NewDataBlockIterator<DataBlockIter>(
- read_options_, (block_handles_->front().cachable_entry_).As<Block>(),
- &block_iter_, s);
- } else {
- auto* rep = table_->get_rep();
- std::function<void(bool, uint64_t&, uint64_t&)> readaheadsize_cb =
- nullptr;
- if (readahead_cache_lookup_) {
- readaheadsize_cb = std::bind(
- &BlockBasedTableIterator::BlockCacheLookupForReadAheadSize, this,
- std::placeholders::_1, std::placeholders::_2,
- std::placeholders::_3);
- }
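- // The bound callback has the signature (read_curr_block, start_offset,
- // end_offset) and lets the prefetcher trim its readahead window via
- // BlockCacheLookupForReadAheadSize() below.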
- // Prefetch additional data for range scans (iterators).
- // Implicit auto readahead:
- // Enabled after 2 sequential IOs when ReadOptions.readahead_size == 0.
- // Explicit user requested readahead:
- // Enabled from the very first IO when ReadOptions.readahead_size is
- // set.
- block_prefetcher_.PrefetchIfNeeded(
- rep, data_block_handle, read_options_.readahead_size,
- is_for_compaction,
- /*no_sequential_checking=*/false, read_options_, readaheadsize_cb,
- read_options_.async_io);
- Status s;
- table_->NewDataBlockIterator<DataBlockIter>(
- read_options_, data_block_handle, &block_iter_, BlockType::kData,
- /*get_context=*/nullptr, &lookup_context_,
- block_prefetcher_.prefetch_buffer(),
- /*for_compaction=*/is_for_compaction, /*async_read=*/false, s,
- use_block_cache_for_lookup);
- }
- block_iter_points_to_real_block_ = true;
- CheckDataBlockWithinUpperBound();
- if (!is_for_compaction &&
- (seek_stat_state_ & kDataBlockReadSinceLastSeek) == 0) {
- RecordTick(table_->GetStatistics(), is_last_level_
- ? LAST_LEVEL_SEEK_DATA
- : NON_LAST_LEVEL_SEEK_DATA);
- seek_stat_state_ = static_cast<SeekStatState>(
- seek_stat_state_ | kDataBlockReadSinceLastSeek | kReportOnUseful);
- }
- }
- }
- void BlockBasedTableIterator::AsyncInitDataBlock(bool is_first_pass) {
- BlockHandle data_block_handle;
- bool is_for_compaction =
- lookup_context_.caller == TableReaderCaller::kCompaction;
- if (is_first_pass) {
- data_block_handle = index_iter_->value().handle;
- if (!block_iter_points_to_real_block_ ||
- data_block_handle.offset() != prev_block_offset_ ||
- // if previous attempt of reading the block missed cache, try again
- block_iter_.status().IsIncomplete()) {
- if (block_iter_points_to_real_block_) {
- ResetDataIter();
- }
- auto* rep = table_->get_rep();
- std::function<void(bool, uint64_t&, uint64_t&)> readaheadsize_cb =
- nullptr;
- if (readahead_cache_lookup_) {
- readaheadsize_cb = std::bind(
- &BlockBasedTableIterator::BlockCacheLookupForReadAheadSize, this,
- std::placeholders::_1, std::placeholders::_2,
- std::placeholders::_3);
- }
- // Prefetch additional data for range scans (iterators).
- // Implicit auto readahead:
- // Enabled after 2 sequential IOs when ReadOptions.readahead_size == 0.
- // Explicit user requested readahead:
- // Enabled from the very first IO when ReadOptions.readahead_size is
- // set.
- // In case of async_io with implicit readahead, block_prefetcher_ will
- // always create the prefetch buffer by setting no_sequential_checking
- // = true.
- block_prefetcher_.PrefetchIfNeeded(
- rep, data_block_handle, read_options_.readahead_size,
- is_for_compaction, /*no_sequential_checking=*/read_options_.async_io,
- read_options_, readaheadsize_cb, read_options_.async_io);
- Status s;
- table_->NewDataBlockIterator<DataBlockIter>(
- read_options_, data_block_handle, &block_iter_, BlockType::kData,
- /*get_context=*/nullptr, &lookup_context_,
- block_prefetcher_.prefetch_buffer(),
- /*for_compaction=*/is_for_compaction, /*async_read=*/true, s,
- /*use_block_cache_for_lookup=*/true);
- if (s.IsTryAgain()) {
- async_read_in_progress_ = true;
- return;
- }
- }
- } else {
- // Second pass will call Poll to get the data block that was
- // requested asynchronously.
- bool is_in_cache = false;
- if (DoesContainBlockHandles()) {
- data_block_handle = block_handles_->front().handle_;
- is_in_cache = block_handles_->front().is_cache_hit_;
- } else {
- data_block_handle = index_iter_->value().handle;
- }
- Status s;
- // Initialize Data Block From CacheableEntry.
- if (is_in_cache) {
- block_iter_.Invalidate(Status::OK());
- table_->NewDataBlockIterator<DataBlockIter>(
- read_options_, (block_handles_->front().cachable_entry_).As<Block>(),
- &block_iter_, s);
- } else {
- table_->NewDataBlockIterator<DataBlockIter>(
- read_options_, data_block_handle, &block_iter_, BlockType::kData,
- /*get_context=*/nullptr, &lookup_context_,
- block_prefetcher_.prefetch_buffer(),
- /*for_compaction=*/is_for_compaction, /*async_read=*/false, s,
- /*use_block_cache_for_lookup=*/false);
- }
- }
- block_iter_points_to_real_block_ = true;
- CheckDataBlockWithinUpperBound();
- if (!is_for_compaction &&
- (seek_stat_state_ & kDataBlockReadSinceLastSeek) == 0) {
- RecordTick(table_->GetStatistics(), is_last_level_
- ? LAST_LEVEL_SEEK_DATA
- : NON_LAST_LEVEL_SEEK_DATA);
- seek_stat_state_ = static_cast<SeekStatState>(
- seek_stat_state_ | kDataBlockReadSinceLastSeek | kReportOnUseful);
- }
- async_read_in_progress_ = false;
- }
- bool BlockBasedTableIterator::MaterializeCurrentBlock() {
- assert(is_at_first_key_from_index_);
- assert(!block_iter_points_to_real_block_);
- assert(index_iter_->Valid());
- is_at_first_key_from_index_ = false;
- InitDataBlock();
- assert(block_iter_points_to_real_block_);
- if (!block_iter_.status().ok()) {
- return false;
- }
- block_iter_.SeekToFirst();
- // MaterializeCurrentBlock is called when the block is actually read by
- // calling InitDataBlock. is_at_first_key_from_index_ will be false for block
- // handles already placed in block_handles_, so index_iter_ will be pointing
- // to the current block.
- // After InitDataBlock, index_iter_ can point to a different block if
- // BlockCacheLookupForReadAheadSize is called.
- Slice first_internal_key;
- if (DoesContainBlockHandles()) {
- first_internal_key = block_handles_->front().first_internal_key_;
- } else {
- first_internal_key = index_iter_->value().first_internal_key;
- }
- if (!block_iter_.Valid() ||
- icomp_.Compare(block_iter_.key(), first_internal_key) != 0) {
- block_iter_.Invalidate(Status::Corruption(
- "first key in index doesn't match first key in block"));
- return false;
- }
- return true;
- }
- void BlockBasedTableIterator::FindKeyForward() {
- // This method's code is kept short to make it likely to be inlined.
- assert(!is_out_of_bound_);
- assert(block_iter_points_to_real_block_);
- if (!block_iter_.Valid()) {
- // This is the only call site of FindBlockForward(), but it's extracted into
- // a separate method to keep FindKeyForward() short and likely to be
- // inlined. When transitioning to a different block, we call
- // FindBlockForward(), which is much longer and is probably not inlined.
- FindBlockForward();
- } else {
- // This is the fast path that avoids a function call.
- }
- }
- void BlockBasedTableIterator::FindBlockForward() {
- if (multi_scan_) {
- FindBlockForwardInMultiScan();
- return;
- }
- // TODO: the while loop inherits from two-level-iterator. We don't know
- // whether a block can be empty, so we keep the loop rather than replacing
- // it with an "if".
- do {
- if (!block_iter_.status().ok()) {
- return;
- }
- // Whether the next data block is out of the upper bound, if there is one.
- // index_iter_ can point to a different block in case of
- // readahead_cache_lookup_; in that case readahead_cache_lookup_ will handle
- // the upper_bound check.
- bool next_block_is_out_of_bound =
- IsIndexAtCurr() && read_options_.iterate_upper_bound != nullptr &&
- block_iter_points_to_real_block_ &&
- block_upper_bound_check_ == BlockUpperBound::kUpperBoundInCurBlock;
- assert(!next_block_is_out_of_bound ||
- user_comparator_.CompareWithoutTimestamp(
- *read_options_.iterate_upper_bound, /*a_has_ts=*/false,
- index_iter_->user_key(), /*b_has_ts=*/true) <= 0);
- ResetDataIter();
- if (DoesContainBlockHandles()) {
- // Advance to the next block handle to make it the current one.
- block_handles_->pop_front();
- }
- if (!DoesContainBlockHandles()) {
- // For the readahead_cache_lookup_ enabled scenario -
- // 1. In case of Seek, block_handles_ will be empty and we should follow
- // the usual path of calling index_iter_->Next().
- // 2. If block_handles_ is empty and the index is not at the current block
- // because of a lookup (during Next), we should skip index_iter_->Next(),
- // as it is already pointing to the next block;
- // 3. The last block could be out of bound and BlockCacheLookup won't
- // iterate over it. We need to set the out-of-bound state for that block
- // here.
- if (IsIndexAtCurr() || is_index_out_of_bound_) {
- index_iter_->Next();
- if (is_index_out_of_bound_) {
- next_block_is_out_of_bound = is_index_out_of_bound_;
- is_index_out_of_bound_ = false;
- }
- } else {
- // Skip Next, as index_iter_ already points to the correct entry; it
- // was advanced in BlockCacheLookupForReadAheadSize.
- is_index_at_curr_block_ = true;
- }
- if (next_block_is_out_of_bound) {
- // The next block is out of bound. No need to read it.
- TEST_SYNC_POINT_CALLBACK("BlockBasedTableIterator:out_of_bound",
- nullptr);
- // We need to make sure this is not the last data block before setting
- // is_out_of_bound_, since the index key for the last data block can be
- // larger than smallest key of the next file on the same level.
- if (index_iter_->Valid()) {
- is_out_of_bound_ = true;
- }
- return;
- }
- if (!index_iter_->Valid()) {
- return;
- }
- IndexValue v = index_iter_->value();
- if (!v.first_internal_key.empty() && allow_unprepared_value_) {
- // Index contains the first key of the block. Defer reading the block.
- is_at_first_key_from_index_ = true;
- return;
- }
- }
- InitDataBlock();
- block_iter_.SeekToFirst();
- } while (!block_iter_.Valid());
- }
- void BlockBasedTableIterator::FindKeyBackward() {
- while (!block_iter_.Valid()) {
- if (!block_iter_.status().ok()) {
- return;
- }
- ResetDataIter();
- index_iter_->Prev();
- if (index_iter_->Valid()) {
- InitDataBlock();
- block_iter_.SeekToLast();
- } else {
- return;
- }
- }
- // We could have checked the lower bound here too, but we opt not to for
- // code simplicity.
- }
- void BlockBasedTableIterator::CheckOutOfBound() {
- if (read_options_.iterate_upper_bound != nullptr &&
- block_upper_bound_check_ != BlockUpperBound::kUpperBoundBeyondCurBlock &&
- Valid()) {
- is_out_of_bound_ =
- user_comparator_.CompareWithoutTimestamp(
- *read_options_.iterate_upper_bound, /*a_has_ts=*/false, user_key(),
- /*b_has_ts=*/true) <= 0;
- }
- }
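- // Classifies where iterate_upper_bound falls relative to the current data
- // block: if the bound is greater than the index key (an upper bound for all
- // keys in the block), the whole block is within bound
- // (kUpperBoundBeyondCurBlock); otherwise the bound may fall inside the block
- // (kUpperBoundInCurBlock) and per-key checks in CheckOutOfBound() are needed.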
- void BlockBasedTableIterator::CheckDataBlockWithinUpperBound() {
- if (IsIndexAtCurr() && read_options_.iterate_upper_bound != nullptr &&
- block_iter_points_to_real_block_) {
- block_upper_bound_check_ = (user_comparator_.CompareWithoutTimestamp(
- *read_options_.iterate_upper_bound,
- /*a_has_ts=*/false, index_iter_->user_key(),
- /*b_has_ts=*/true) > 0)
- ? BlockUpperBound::kUpperBoundBeyondCurBlock
- : BlockUpperBound::kUpperBoundInCurBlock;
- }
- }
- void BlockBasedTableIterator::InitializeStartAndEndOffsets(
- bool read_curr_block, bool& found_first_miss_block,
- uint64_t& start_updated_offset, uint64_t& end_updated_offset,
- size_t& prev_handles_size) {
- assert(block_handles_ != nullptr);
- prev_handles_size = block_handles_->size();
- size_t footer = table_->get_rep()->footer.GetBlockTrailerSize();
- // Initialize the start and end offsets, which is covered by the following
- // scenarios:
- if (read_curr_block) {
- if (!DoesContainBlockHandles()) {
- // Scenario 1 : read_curr_block (callback made on a missed block which the
- // caller was reading) and there are no existing handles in the
- // queue, i.e. index_iter_ is pointing to the block being read
- // by the caller.
- //
- // Add current block here as it doesn't need any lookup.
- BlockHandleInfo block_handle_info;
- block_handle_info.handle_ = index_iter_->value().handle;
- block_handle_info.SetFirstInternalKey(
- index_iter_->value().first_internal_key);
- end_updated_offset = block_handle_info.handle_.offset() + footer +
- block_handle_info.handle_.size();
- block_handles_->emplace_back(std::move(block_handle_info));
- index_iter_->Next();
- is_index_at_curr_block_ = false;
- found_first_miss_block = true;
- } else {
- // Scenario 2 : read_curr_block (callback made on a missed block which the
- // caller was reading) but the queue already has some handles.
- //
- // This can happen due to a read error in the second buffer in
- // FilePrefetchBuffer: block handles were already added to the queue but
- // there was an error fetching those data blocks, so in this call they need
- // to be read again.
- found_first_miss_block = true;
- // Initialize prev_handles_size to 0 as all those handles need to be read
- // again.
- prev_handles_size = 0;
- start_updated_offset = block_handles_->front().handle_.offset();
- end_updated_offset = block_handles_->back().handle_.offset() + footer +
- block_handles_->back().handle_.size();
- }
- } else {
- // Scenario 3 : read_curr_block is false (callback made to do additional
- // prefetching in buffers) and the queue already has some
- // handles from first buffer.
- if (DoesContainBlockHandles()) {
- start_updated_offset = block_handles_->back().handle_.offset() + footer +
- block_handles_->back().handle_.size();
- end_updated_offset = start_updated_offset;
- } else {
- // Scenario 4 : read_curr_block is false (callback made to do additional
- // prefetching in buffers) but the queue has no handle
- // from first buffer.
- //
- // This can happen when a reseek is served from the block cache (which
- // doesn't clear the buffers in FilePrefetchBuffer but clears block handles
- // from the queue) and the reseek also lies within the buffer. So Next will
- // get data from existing buffers until this callback is made to prefetch
- // additional data. All handles need to be added to the queue starting from
- // index_iter_.
- assert(index_iter_->Valid());
- start_updated_offset = index_iter_->value().handle.offset();
- end_updated_offset = start_updated_offset;
- }
- }
- }
- // BlockCacheLookupForReadAheadSize does lookups in the block cache and tries
- // to reduce the start and end offsets passed in.
- //
- // Implementation -
- // This function looks into the block cache for the blocks between
- // start_offset and end_offset and adds all the handles to the queue.
- // It then iterates from the end to find the first missed block and updates
- // the end offset to that block.
- // It also iterates from the start to find the first missed block and updates
- // the start offset to that block.
- //
- // Arguments -
- // start_offset : Offset from which the caller wants to read.
- // end_offset : End offset till which the caller wants to read.
- // read_curr_block : True if this call was due to miss in the cache and
- // caller wants to read that block.
- // False if current call is to prefetch additional data in
- // extra buffers.
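- //
- // Roughly, when no block is force-included: if blocks B0..B4 fall within the
- // requested range and B0, B3 and B4 are cache hits while B1 and B2 are
- // misses, all five handles are queued, start_offset is advanced to B1's
- // offset (first miss from the start) and end_offset is trimmed to the end of
- // B2 (first miss from the end), so only [B1, B2] is actually read/prefetched.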
- void BlockBasedTableIterator::BlockCacheLookupForReadAheadSize(
- bool read_curr_block, uint64_t& start_offset, uint64_t& end_offset) {
- uint64_t start_updated_offset = start_offset;
- // readahead_cache_lookup_ can be set to false if, after Seek and Next,
- // there is a SeekForPrev or any other backward operation.
- if (!readahead_cache_lookup_) {
- return;
- }
- size_t footer = table_->get_rep()->footer.GetBlockTrailerSize();
- if (read_curr_block && !DoesContainBlockHandles() &&
- IsNextBlockOutOfReadaheadBound()) {
- end_offset = index_iter_->value().handle.offset() + footer +
- index_iter_->value().handle.size();
- return;
- }
- uint64_t end_updated_offset = start_updated_offset;
- bool found_first_miss_block = false;
- size_t prev_handles_size;
- // Initialize start and end offsets based on existing handles in the queue
- // and the read_curr_block argument passed.
- if (block_handles_ == nullptr) {
- block_handles_.reset(new std::deque<BlockHandleInfo>());
- }
- InitializeStartAndEndOffsets(read_curr_block, found_first_miss_block,
- start_updated_offset, end_updated_offset,
- prev_handles_size);
- while (index_iter_->Valid() && !is_index_out_of_bound_) {
- BlockHandle block_handle = index_iter_->value().handle;
- // If adding this data block would exceed the end offset, it won't be
- // added.
- // There can be a case where the passed end offset is smaller than
- // block_handle.size() + footer because readahead_size was truncated to
- // upper_bound. In that case we prefer to read the block rather than skip
- // it, to avoid sync read calls in case of async_io.
- if (start_updated_offset != end_updated_offset &&
- (end_updated_offset + block_handle.size() + footer > end_offset)) {
- break;
- }
- // For current data block, do the lookup in the cache. Lookup should pin the
- // data block in cache.
- BlockHandleInfo block_handle_info;
- block_handle_info.handle_ = index_iter_->value().handle;
- block_handle_info.SetFirstInternalKey(
- index_iter_->value().first_internal_key);
- end_updated_offset += footer + block_handle_info.handle_.size();
- Status s = table_->LookupAndPinBlocksInCache<Block_kData>(
- read_options_, block_handle,
- &(block_handle_info.cachable_entry_).As<Block_kData>());
- if (!s.ok()) {
- #ifndef NDEBUG
- // To allow fault injection verification to pass, since a non-okay status in
- // `BlockCacheLookupForReadAheadSize()` won't fail the read but only results
- // in less or no readahead
- IGNORE_STATUS_IF_ERROR(s);
- #endif
- break;
- }
- block_handle_info.is_cache_hit_ =
- (block_handle_info.cachable_entry_.GetValue() ||
- block_handle_info.cachable_entry_.GetCacheHandle());
- // If this is the first miss block, update start offset to this block.
- if (!found_first_miss_block && !block_handle_info.is_cache_hit_) {
- found_first_miss_block = true;
- start_updated_offset = block_handle_info.handle_.offset();
- }
- // Add the handle to the queue.
- block_handles_->emplace_back(std::move(block_handle_info));
- // We can't determine whether the current block is out of bound, but we can
- // for the next block: if the current block's index key >=
- // iterate_upper_bound, then all the keys in the next block and beyond are
- // out of bound.
- if (IsNextBlockOutOfReadaheadBound()) {
- is_index_out_of_bound_ = true;
- break;
- }
- index_iter_->Next();
- is_index_at_curr_block_ = false;
- }
- #ifndef NDEBUG
- // To allow fault injection verification to pass, since a non-okay status in
- // `BlockCacheLookupForReadAheadSize()` won't fail the read but only results
- // in less or no readahead
- if (!index_iter_->status().ok()) {
- IGNORE_STATUS_IF_ERROR(index_iter_->status());
- }
- #endif
- if (found_first_miss_block) {
- // Iterate over cache-hit block handles from the end until a miss is found,
- // to truncate and update the end offset to that miss.
- auto it = block_handles_->rbegin();
- auto it_end =
- block_handles_->rbegin() + (block_handles_->size() - prev_handles_size);
- while (it != it_end && (*it).is_cache_hit_ &&
- start_updated_offset != (*it).handle_.offset()) {
- it++;
- }
- end_updated_offset = (*it).handle_.offset() + footer + (*it).handle_.size();
- } else {
- // Nothing to read. This can be because of an IOError in index_iter_->Next()
- // or because the upper_bound was reached.
- end_updated_offset = start_updated_offset;
- }
- end_offset = end_updated_offset;
- start_offset = start_updated_offset;
- ResetPreviousBlockOffset();
- }
- BlockBasedTableIterator::MultiScanState::~MultiScanState() {
- // Abort any pending async IO operations to prevent callback being called
- // after async read states are destructed.
- if (!async_states.empty()) {
- std::vector<void*> io_handles_to_abort;
- std::vector<AsyncReadState*> states_to_cleanup;
- // Collect all pending IO handles
- for (size_t i = 0; i < async_states.size(); ++i) {
- auto& async_read = async_states[i];
- if (async_read.io_handle != nullptr) {
- assert(!async_read.finished);
- io_handles_to_abort.push_back(async_read.io_handle);
- states_to_cleanup.push_back(&async_read);
- }
- }
- if (!io_handles_to_abort.empty()) {
- IOStatus abort_status = fs->AbortIO(io_handles_to_abort);
- if (!abort_status.ok()) {
- #ifndef NDEBUG
- fprintf(stderr, "Error aborting async IO operations: %s\n",
- abort_status.ToString().c_str());
- #endif
- assert(false);
- }
- (void)abort_status; // Suppress unused variable warning
- }
- for (auto async_read : states_to_cleanup) {
- async_read->CleanUpIOHandle();
- }
- }
- }
- // Note:
- // - Iterator should not be reused for multiple multiscans or mixing
- // multiscan with regular iterator usage.
- // - scan ranges should be non-overlapping, and have increasing start keys.
- // If a scan range's limit is not set, then there should only be one scan range.
- // - After Prepare(), the iterator expects Seek to be called on the start key
- // of each ScanOption in order. If any other Seek is done, an error status is
- // returned
- // - Whenever all blocks of a scan opt are exhausted, the iterator will become
- // invalid and UpperBoundCheckResult() will return kOutOfBound, so that the
- // upper layer (LevelIterator) stops scanning instead of thinking EOF is
- // reached and continuing into the next file. The only exception is the last
- // scan opt: if we reach the end of the last scan opt, UpperBoundCheckResult()
- // will return kUnknown instead of kOutOfBound. This mechanism requires that
- // scan opts are properly pruned such that there is no scan opt that is after
- // this file's key range.
- // FIXME: DBIter and MergingIterator may
- // internally do Seek() on child iterators, e.g. due to
- // ReadOptions::max_skippable_internal_keys or reseeking into range deletion
- // end key. These Seeks will be handled properly, as long as the target is
- // moving forward.
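- //
- // Usage sketch (simplified; the caller is typically LevelIterator and Seek
- // targets are internal keys):
- //   iter->Prepare(&multiscan_opts);
- //   for (const ScanOptions& so : multiscan_opts.GetScanRanges()) {
- //     iter->Seek(/* internal key built from */ so.range.start.value());
- //     while (iter->Valid()) { /* consume key()/value() */ iter->Next(); }
- //   }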
- void BlockBasedTableIterator::Prepare(const MultiScanArgs* multiscan_opts) {
- assert(!multi_scan_);
- if (!index_iter_->status().ok()) {
- multi_scan_status_ = index_iter_->status();
- return;
- }
- if (multi_scan_) {
- multi_scan_.reset();
- multi_scan_status_ = Status::InvalidArgument("Prepare already called");
- return;
- }
- index_iter_->Prepare(multiscan_opts);
- std::vector<BlockHandle> scan_block_handles;
- std::vector<std::string> data_block_separators;
- std::vector<std::tuple<size_t, size_t>> block_index_ranges_per_scan;
- const std::vector<ScanOptions>& scan_opts = multiscan_opts->GetScanRanges();
- multi_scan_status_ =
- CollectBlockHandles(scan_opts, &scan_block_handles,
- &block_index_ranges_per_scan, &data_block_separators);
- if (!multi_scan_status_.ok()) {
- return;
- }
- // Pin already cached blocks, collect remaining blocks to read
- std::vector<size_t> block_indices_to_read;
- std::vector<CachableEntry<Block>> pinned_data_blocks_guard(
- scan_block_handles.size());
- size_t prefetched_max_idx;
- multi_scan_status_ = FilterAndPinCachedBlocks(
- scan_block_handles, multiscan_opts, &block_indices_to_read,
- &pinned_data_blocks_guard, &prefetched_max_idx);
- if (!multi_scan_status_.ok()) {
- return;
- }
- std::vector<AsyncReadState> async_states;
- // Maps from block index into async read request (index into async_states[])
- UnorderedMap<size_t, size_t> block_idx_to_readreq_idx;
- if (!block_indices_to_read.empty()) {
- std::vector<FSReadRequest> read_reqs;
- std::vector<std::vector<size_t>> coalesced_block_indices;
- PrepareIORequests(block_indices_to_read, scan_block_handles, multiscan_opts,
- &read_reqs, &block_idx_to_readreq_idx,
- &coalesced_block_indices);
- multi_scan_status_ =
- ExecuteIO(scan_block_handles, multiscan_opts, coalesced_block_indices,
- &read_reqs, &async_states, &pinned_data_blocks_guard);
- if (!multi_scan_status_.ok()) {
- return;
- }
- }
- // Successful Prepare, init related states so the iterator reads from prepared
- // blocks.
- multi_scan_ = std::make_unique<MultiScanState>(
- table_->get_rep()->ioptions.env->GetFileSystem(), multiscan_opts,
- std::move(pinned_data_blocks_guard), std::move(data_block_separators),
- std::move(block_index_ranges_per_scan),
- std::move(block_idx_to_readreq_idx), std::move(async_states),
- prefetched_max_idx);
- is_index_at_curr_block_ = false;
- block_iter_points_to_real_block_ = false;
- }
- void BlockBasedTableIterator::SeekMultiScan(const Slice* seek_target) {
- assert(multi_scan_ && multi_scan_status_.ok());
- // This is a MultiScan and Prepare() has been called.
- // Reset out of bound on seek; if it is out of bound again, it will be set
- // properly later in the code path.
- is_out_of_bound_ = false;
- // Validate seek key with scan options
- if (!seek_target) {
- // start key must be set for multi-scan
- multi_scan_status_ = Status::InvalidArgument("No seek key for MultiScan");
- return;
- }
- // Check the case where there is no range prepared on this table
- if (multi_scan_->scan_opts->size() == 0) {
- // out of bound
- MarkPreparedRangeExhausted();
- return;
- }
- // Check whether seek key is moving forward.
- if (multi_scan_->prev_seek_key_.empty() ||
- icomp_.Compare(*seek_target, multi_scan_->prev_seek_key_) > 0) {
- // If the previous seek key is empty or the seek key is larger than the
- // previous seek key, update the previous seek key. Otherwise use the
- // previous seek key as the adjusted seek target moving forward. This
- // prevents the seek target from going backward, which would visit blocks
- // that have been unpinned.
- // This issue is caused by sub-optimal range delete handling inside the
- // merge iterator.
- // TODO xingbo issues:14068 : Optimize the handling of the range delete
- // iterator inside the merge iterator, so that it doesn't move the seek key
- // backward. After that we could return an error if the key moves backward
- // here.
- multi_scan_->prev_seek_key_ = seek_target->ToString();
- } else {
- // The seek key is adjusted to the previous one, so we can return here
- // directly.
- return;
- }
- // There are 3 different cases we need to handle:
- // The following diagram explains different seek targets at various
- // positions in the table, while next_scan_idx points to PreparedRange 2.
- //
- // next_scan_idx: -------------------┐
- // ▼
- // table: : __[PreparedRange 1]__[PreparedRange 2]__[PreparedRange 3]__
- // Seek target: <----- Case 1 ------>▲<------------- Case 2 -------------->
- // │
- // Case 3
- //
- // Case 1: seek before the start of the next prepared range. This could
- // happen due to a reseek triggered by too many delete tombstones, or due to
- // a delete range.
- // Case 2: seek after the start of the next prepared range.
- // This could happen due to seek key adjustment from a delete range file.
- // E.g. LSM has 3 levels, each level has only 1 file:
- // L1 : key : 0---10
- // L2 : Delete range key : 0-5
- // L3 : key : 0---10
- // When a range 2-8 was prepared, the prepared key would be 2 in the L3 file,
- // but the seek key would be 5, as the seek key was updated by the largest
- // key of the delete range. This makes all of the cases above possible when
- // the ranges are adjusted as in the example above.
- // Case 3: seek at the beginning of a prepared range (the expected case).
- // Allow a reseek to the start of the last prepared range due to too many
- // tombstones.
- multi_scan_->next_scan_idx =
- std::min(multi_scan_->next_scan_idx,
- multi_scan_->block_index_ranges_per_scan.size() - 1);
- auto user_seek_target = ExtractUserKey(*seek_target);
- auto compare_next_scan_start_result =
- user_comparator_.CompareWithoutTimestamp(
- user_seek_target, /*a_has_ts=*/true,
- multi_scan_->scan_opts->GetScanRanges()[multi_scan_->next_scan_idx]
- .range.start.value(),
- /*b_has_ts=*/false);
- if (compare_next_scan_start_result != 0) {
- // The seek target is not exactly the same as what was prepared.
- if (compare_next_scan_start_result < 0) {
- // Case 1:
- if (multi_scan_->next_scan_idx == 0) {
- // This should not happen, even when the seek target is adjusted by a
- // delete range. The reason is that if the seek target is before the start
- // key of the first prepared range, its end key needs to be >= the smallest
- // key of this file, otherwise it is skipped in the level iterator. If its
- // end key is >= the smallest key of this file, then this range will be
- // prepared for this file. Since a delete range can only adjust the seek
- // target forward, it would never be before the start key of the first
- // prepared range.
- assert(false && "Seek target before the first prepared range");
- MarkPreparedRangeExhausted();
- return;
- }
- auto seek_target_before_previous_prepared_range =
- user_comparator_.CompareWithoutTimestamp(
- user_seek_target, /*a_has_ts=*/true,
- multi_scan_->scan_opts
- ->GetScanRanges()[multi_scan_->next_scan_idx - 1]
- .range.start.value(),
- /*b_has_ts=*/false) < 0;
- // Not expected to happen.
- // This should never happen: multi_scan_->next_scan_idx is only set to a
- // non-zero value because a seek target larger than or equal to the start
- // key of range multi_scan_->next_scan_idx-1 happened earlier. If a seek
- // happens before the start key of range multi_scan_->next_scan_idx-1, it
- // would seek a key that is less than what was sought before.
- assert(!seek_target_before_previous_prepared_range);
- if (seek_target_before_previous_prepared_range) {
- multi_scan_status_ = Status::InvalidArgument(
- "Seek target is before the previous prepared range at index " +
- std::to_string(multi_scan_->next_scan_idx));
- return;
- }
- // It should only be possible to seek to a key between the start of the
- // current prepared scan and the start of the next prepared range.
- MultiScanUnexpectedSeekTarget(
- seek_target, &user_seek_target,
- std::get<0>(multi_scan_->block_index_ranges_per_scan
- [multi_scan_->next_scan_idx - 1]));
- } else {
- // Case 2:
- MultiScanUnexpectedSeekTarget(
- seek_target, &user_seek_target,
- std::get<0>(
- multi_scan_
- ->block_index_ranges_per_scan[multi_scan_->next_scan_idx]));
- }
- } else {
- // Case 3: the seek target matches the start of the next prepared range.
- assert(multi_scan_->next_scan_idx <
- multi_scan_->block_index_ranges_per_scan.size());
- auto [cur_scan_start_idx, cur_scan_end_idx] =
- multi_scan_->block_index_ranges_per_scan[multi_scan_->next_scan_idx];
- // We should have the data block already loaded
- ++multi_scan_->next_scan_idx;
- if (cur_scan_start_idx >= cur_scan_end_idx) {
- // No blocks are prepared for this range at current file.
- MarkPreparedRangeExhausted();
- return;
- }
- MultiScanSeekTargetFromBlock(seek_target, cur_scan_start_idx);
- }
- }
- void BlockBasedTableIterator::MultiScanUnexpectedSeekTarget(
- const Slice* seek_target, const Slice* user_seek_target, size_t block_idx) {
- // Linearly search for the block that contains the seek target, and unpin
- // blocks that come before it.
- // The logic here can be confusing when a delete range is involved.
- // E.g. we have an LSM with 3 levels, each level has only 1 file:
- // L1: data file : 0---10
- // L2: Delete range : 0-5
- // L3: data file : 0---10
- //
- // MultiScan on ranges 1-2, 3-4, and 5-6.
- // When the user first does Seek(1), due to the delete range 0-5 on level 2,
- // the seek key is adjusted to 5 at level 3. Therefore, we internally do
- // Seek(5) and unpin all blocks up to 5 at level 3. As a result the next
- // scan's blocks for 3-4 are also unpinned at level 3. It may seem that the
- // blocks for 3-4 should not be unpinned, since the next scan would need
- // them. But Seek(5) implies that those keys are all covered by some range
- // deletion, so the next Seek(3) will also do Seek(5) internally, and the
- // blocks for 3-4 can be safely unpinned.
- // Advance to the right prepared range.
- while (
- multi_scan_->next_scan_idx <
- multi_scan_->block_index_ranges_per_scan.size() &&
- (user_comparator_.CompareWithoutTimestamp(
- *user_seek_target, /*a_has_ts=*/true,
- multi_scan_->scan_opts->GetScanRanges()[multi_scan_->next_scan_idx]
- .range.start.value(),
- /*b_has_ts=*/false) >= 0)) {
- multi_scan_->next_scan_idx++;
- }
- // next_scan_idx is guaranteed to be higher than 0. If the seek key is before
- // the start key of the first prepared range, it is already handled by the
- // caller SeekMultiScan. If it is equal, the caller would not call this
- // function. If it is after, next_scan_idx would be advanced by the loop
- // above.
- assert(multi_scan_->next_scan_idx > 0);
- // Get the current range
- auto cur_scan_idx = multi_scan_->next_scan_idx - 1;
- auto [cur_scan_start_idx, cur_scan_end_idx] =
- multi_scan_->block_index_ranges_per_scan[cur_scan_idx];
- if (cur_scan_start_idx >= cur_scan_end_idx) {
- // No blocks are prepared for this range at current file.
- MarkPreparedRangeExhausted();
- return;
- }
- // Unpin all the blocks from multi_scan_->cur_data_block_idx to
- // cur_scan_start_idx
- for (auto unpin_block_idx = multi_scan_->cur_data_block_idx;
- unpin_block_idx < cur_scan_start_idx; unpin_block_idx++) {
- if (!multi_scan_->pinned_data_blocks[unpin_block_idx].IsEmpty()) {
- multi_scan_->pinned_data_blocks[unpin_block_idx].Reset();
- }
- }
- // Find the right block_idx;
- block_idx = cur_scan_start_idx;
- auto const& data_block_separators = multi_scan_->data_block_separators;
- while (block_idx < data_block_separators.size() &&
- (user_comparator_.CompareWithoutTimestamp(
- *user_seek_target, /*a_has_ts=*/true,
- data_block_separators[block_idx],
- /*b_has_ts=*/false) > 0)) {
- // Unpin the blocks that are passed
- if (!multi_scan_->pinned_data_blocks[block_idx].IsEmpty()) {
- multi_scan_->pinned_data_blocks[block_idx].Reset();
- }
- block_idx++;
- }
- if (block_idx >= data_block_separators.size()) {
- // All of the prepared blocks for this file are exhausted.
- MarkPreparedRangeExhausted();
- return;
- }
- // The current block may contain the data for the target key
- MultiScanSeekTargetFromBlock(seek_target, block_idx);
- }
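- // Positions block_iter_ at seek_target inside the prepared block at
- // block_idx, loading that block first if the iterator is not already on it,
- // and unpinning any earlier prepared blocks that are skipped over on the way.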
- void BlockBasedTableIterator::MultiScanSeekTargetFromBlock(
- const Slice* seek_target, size_t block_idx) {
- assert(multi_scan_->cur_data_block_idx <= block_idx);
- if (!block_iter_points_to_real_block_ ||
- multi_scan_->cur_data_block_idx != block_idx) {
- if (block_iter_points_to_real_block_) {
- // Scans should be in increasing key order.
- // All blocks before cur_data_block_idx are not pinned anymore.
- assert(multi_scan_->cur_data_block_idx < block_idx);
- }
- ResetDataIter();
- if (MultiScanLoadDataBlock(block_idx)) {
- return;
- }
- }
- // Move the current data block index forward to block_idx; meanwhile, unpin
- // all the blocks in between.
- while (multi_scan_->cur_data_block_idx < block_idx) {
- // unpin block
- if (!multi_scan_->pinned_data_blocks[multi_scan_->cur_data_block_idx]
- .IsEmpty()) {
- multi_scan_->pinned_data_blocks[multi_scan_->cur_data_block_idx].Reset();
- }
- multi_scan_->cur_data_block_idx++;
- }
- block_iter_points_to_real_block_ = true;
- block_iter_.Seek(*seek_target);
- FindKeyForward();
- CheckOutOfBound();
- }
- void BlockBasedTableIterator::FindBlockForwardInMultiScan() {
- assert(multi_scan_);
- assert(multi_scan_->next_scan_idx >= 1);
- const auto cur_scan_end_idx = std::get<1>(
- multi_scan_->block_index_ranges_per_scan[multi_scan_->next_scan_idx - 1]);
- do {
- if (!block_iter_.status().ok()) {
- return;
- }
- // If is_out_of_bound_ is true, the upper layer (LevelIterator) considers
- // that this level has reached iterate_upper_bound_ and will not continue to
- // iterate into the next file. When we are doing the last scan within a
- // MultiScan for this file, it may need to continue to scan into the next
- // file, so we do not set is_out_of_bound_ in this case.
- if (multi_scan_->cur_data_block_idx + 1 >= cur_scan_end_idx) {
- MarkPreparedRangeExhausted();
- return;
- }
- // Move to the next pinned data block
- ResetDataIter();
- // Unpin previous block if it is not reset by data iterator
- if (!multi_scan_->pinned_data_blocks[multi_scan_->cur_data_block_idx]
- .IsEmpty()) {
- multi_scan_->pinned_data_blocks[multi_scan_->cur_data_block_idx].Reset();
- }
- ++multi_scan_->cur_data_block_idx;
- if (MultiScanLoadDataBlock(multi_scan_->cur_data_block_idx)) {
- return;
- }
- block_iter_points_to_real_block_ = true;
- block_iter_.SeekToFirst();
- } while (!block_iter_.Valid());
- }
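- // Ensures the prepared block at index `idx` is available: blocks that were
- // cache hits during Prepare() are already pinned; blocks fetched through an
- // async read are polled for completion here and then created and pinned via
- // CreateAndPinBlockFromBuffer().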
- Status BlockBasedTableIterator::PollForBlock(size_t idx) {
- assert(multi_scan_);
- const auto async_idx = multi_scan_->block_idx_to_readreq_idx.find(idx);
- if (async_idx == multi_scan_->block_idx_to_readreq_idx.end()) {
- // Did not require async read, should already be pinned.
- assert(multi_scan_->pinned_data_blocks[idx].GetValue());
- return Status::OK();
- }
- AsyncReadState& async_read = multi_scan_->async_states[async_idx->second];
- if (async_read.finished) {
- assert(async_read.io_handle == nullptr);
- assert(async_read.status.ok());
- return async_read.status;
- }
- {
- std::vector<void*> handles = {async_read.io_handle};
- Status poll_s =
- table_->get_rep()->ioptions.env->GetFileSystem()->Poll(handles, 1);
- if (!poll_s.ok()) {
- return poll_s;
- }
- }
- assert(async_read.status.ok());
- if (!async_read.status.ok()) {
- return async_read.status;
- }
- async_read.CleanUpIOHandle();
- // Initialize and pin blocks from async read result.
- for (size_t i = 0; i < async_read.blocks.size(); ++i) {
- const auto& block = async_read.blocks[i];
- Status s = CreateAndPinBlockFromBuffer(
- block, async_read.offset, async_read.result,
- multi_scan_->pinned_data_blocks[async_read.block_indices[i]]);
- if (!s.ok()) {
- return s;
- }
- assert(multi_scan_->pinned_data_blocks[async_read.block_indices[i]]
- .GetValue());
- }
- assert(multi_scan_->pinned_data_blocks[idx].GetValue());
- return Status::OK();
- }
- Status BlockBasedTableIterator::CreateAndPinBlockFromBuffer(
- const BlockHandle& block, uint64_t buffer_start_offset,
- const Slice& buffer_data, CachableEntry<Block>& pinned_block_entry) {
- // Get decompressor and handle dictionary loading
- UnownedPtr<Decompressor> decompressor = table_->get_rep()->decompressor.get();
- CachableEntry<DecompressorDict> cached_dict;
- if (table_->get_rep()->uncompression_dict_reader) {
- {
- Status s =
- table_->get_rep()
- ->uncompression_dict_reader->GetOrReadUncompressionDictionary(
- /* prefetch_buffer= */ nullptr, read_options_,
- /* get_context= */ nullptr, /* lookup_context= */ nullptr,
- &cached_dict);
- if (!s.ok()) {
- #ifndef NDEBUG
- fprintf(stdout, "Prepare dictionary loading failed with %s\n",
- s.ToString().c_str());
- #endif
- return s;
- }
- }
- if (!cached_dict.GetValue()) {
- #ifndef NDEBUG
- fprintf(stdout, "Success but no dictionary read\n");
- #endif
- return Status::InvalidArgument("No dictionary found");
- }
- decompressor = cached_dict.GetValue()->decompressor_.get();
- }
- // Create block from buffer data
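- // buffer_data holds one coalesced read that starts at buffer_start_offset in
- // the file, so this block's bytes (including the trailer) begin at
- // block.offset() - buffer_start_offset within the buffer.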
- const auto block_size_with_trailer =
- BlockBasedTable::BlockSizeWithTrailer(block);
- const auto block_offset_in_buffer = block.offset() - buffer_start_offset;
- CacheAllocationPtr data =
- AllocateBlock(block_size_with_trailer,
- GetMemoryAllocator(table_->get_rep()->table_options));
- memcpy(data.get(), buffer_data.data() + block_offset_in_buffer,
- block_size_with_trailer);
- BlockContents tmp_contents(std::move(data), block.size());
- #ifndef NDEBUG
- tmp_contents.has_trailer =
- table_->get_rep()->footer.GetBlockTrailerSize() > 0;
- #endif
- return table_->CreateAndPinBlockInCache<Block_kData>(
- read_options_, block, decompressor, &tmp_contents,
- &pinned_block_entry.As<Block_kData>());
- }
- constexpr auto kVerbose = false;
- Status BlockBasedTableIterator::CollectBlockHandles(
- const std::vector<ScanOptions>& scan_opts,
- std::vector<BlockHandle>* scan_block_handles,
- std::vector<std::tuple<size_t, size_t>>* block_index_ranges_per_scan,
- std::vector<std::string>* data_block_separators) {
- // print file name and level
- if (UNLIKELY(kVerbose)) {
- auto file_name = table_->get_rep()->file->file_name();
- auto level = table_->get_rep()->level;
- printf("file name : %s, level %d\n", file_name.c_str(), level);
- }
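- // For example (with hypothetical separators): given index separators
- // [k10, k20, k30] and a scan range [k12, k25), Seek(k12) positions the index
- // at separator k20; the loop adds that block (k20 < k25) and stops at k30,
- // and the post-loop step adds the k30 block as well since it may still hold
- // keys below the limit.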
- for (const auto& scan_opt : scan_opts) {
- size_t num_blocks = 0;
- bool check_overlap = !scan_block_handles->empty();
- InternalKey start_key;
- const size_t timestamp_size =
- user_comparator_.user_comparator()->timestamp_size();
- if (timestamp_size == 0) {
- start_key = InternalKey(scan_opt.range.start.value(), kMaxSequenceNumber,
- kValueTypeForSeek);
- } else {
- std::string seek_key;
- AppendKeyWithMaxTimestamp(&seek_key, scan_opt.range.start.value(),
- timestamp_size);
- start_key = InternalKey(seek_key, kMaxSequenceNumber, kValueTypeForSeek);
- }
- index_iter_->Seek(start_key.Encode());
- while (index_iter_->status().ok() && index_iter_->Valid() &&
- (!scan_opt.range.limit.has_value() ||
- user_comparator_.CompareWithoutTimestamp(index_iter_->user_key(),
- /*a_has_ts*/ true,
- *scan_opt.range.limit,
- /*b_has_ts=*/false) < 0)) {
- // Only add the block if its index separator is smaller than the limit.
- // The block whose separator is equal to or larger than the limit is
- // handled below, after the loop.
- if (check_overlap &&
- scan_block_handles->back() == index_iter_->value().handle) {
- // Skip the current block since it's already in the list
- } else {
- scan_block_handles->push_back(index_iter_->value().handle);
- // Copy the Slice contents into a std::string to avoid lifetime issues
- data_block_separators->push_back(index_iter_->user_key().ToString());
- }
- ++num_blocks;
- index_iter_->Next();
- check_overlap = false;
- }
- if (!index_iter_->status().ok()) {
- // Abort: index iterator error
- return index_iter_->status();
- }
- if (index_iter_->Valid()) {
- // Handle the last block, whose separator is equal to or larger than the limit
- if (check_overlap &&
- scan_block_handles->back() == index_iter_->value().handle) {
- // Skip adding the current block since it's already in the list
- } else {
- scan_block_handles->push_back(index_iter_->value().handle);
- data_block_separators->push_back(index_iter_->user_key().ToString());
- }
- ++num_blocks;
- }
- block_index_ranges_per_scan->emplace_back(
- scan_block_handles->size() - num_blocks, scan_block_handles->size());
- if (UNLIKELY(kVerbose)) {
- printf("separators :");
- for (const auto& separator : *data_block_separators) {
- printf("%s, ", separator.c_str());
- }
- printf("\nblock_index_ranges_per_scan :");
- for (auto const& block_index_range : *block_index_ranges_per_scan) {
- printf("[%zu, %zu], ", std::get<0>(block_index_range),
- std::get<1>(block_index_range));
- }
- printf("\n");
- }
- }
- return Status::OK();
- }
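
CollectBlockHandles seeks the index once per scan range and appends every data block whose separator is below the range's limit, plus the one boundary block whose separator is at or past the limit. When two adjacent scans share a boundary block it is stored only once, but the per-scan [begin, end) index range still covers it. A small sketch of that bookkeeping follows; `AppendScan` is an illustrative name and plain strings stand in for BlockHandle.

    #include <cstddef>
    #include <string>
    #include <tuple>
    #include <vector>

    // Append the blocks covered by one scan to a flat list, skipping the first
    // block if it is the same block that ended the previous scan, and record
    // the [begin, end) range of flat-list indices this scan touches.
    void AppendScan(const std::vector<std::string>& blocks_for_scan,
                    std::vector<std::string>* flat_blocks,
                    std::vector<std::tuple<size_t, size_t>>* ranges) {
      size_t num_blocks = 0;
      for (const auto& b : blocks_for_scan) {
        const bool duplicate = num_blocks == 0 &&  // only the boundary block repeats
                               !flat_blocks->empty() && flat_blocks->back() == b;
        if (!duplicate) {
          flat_blocks->push_back(b);
        }
        ++num_blocks;  // counted even when deduplicated, like the code above
      }
      ranges->emplace_back(flat_blocks->size() - num_blocks, flat_blocks->size());
    }
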
- Status BlockBasedTableIterator::FilterAndPinCachedBlocks(
- const std::vector<BlockHandle>& scan_block_handles,
- const MultiScanArgs* multiscan_opts,
- std::vector<size_t>* block_indices_to_read,
- std::vector<CachableEntry<Block>>* pinned_data_blocks_guard,
- size_t* prefetched_max_idx) {
- uint64_t total_prefetch_size = 0;
- *prefetched_max_idx = scan_block_handles.size();
- for (size_t i = 0; i < scan_block_handles.size(); ++i) {
- const auto& data_block_handle = scan_block_handles[i];
- total_prefetch_size +=
- BlockBasedTable::BlockSizeWithTrailer(data_block_handle);
- if (multiscan_opts->max_prefetch_size > 0 &&
- total_prefetch_size > multiscan_opts->max_prefetch_size) {
- for (size_t j = i; j < scan_block_handles.size(); ++j) {
- assert((*pinned_data_blocks_guard)[j].IsEmpty());
- }
- *prefetched_max_idx = i;
- break;
- }
- Status s = table_->LookupAndPinBlocksInCache<Block_kData>(
- read_options_, data_block_handle,
- &(*pinned_data_blocks_guard)[i].As<Block_kData>());
- if (!s.ok()) {
- // Abort: block cache look up failed.
- return s;
- }
- if (!(*pinned_data_blocks_guard)[i].GetValue()) {
- // Block not in cache
- block_indices_to_read->emplace_back(i);
- }
- }
- return Status::OK();
- }
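
FilterAndPinCachedBlocks walks the collected handles in order, pins the ones already resident in the block cache, records the indices of the cache misses, and stops considering blocks once the running total of block sizes exceeds multiscan_opts->max_prefetch_size (a value of 0 means no cap). The budgeting rule in isolation, as a std-only sketch with the illustrative name `CountBlocksWithinBudget`:

    #include <cstddef>
    #include <cstdint>
    #include <vector>

    // Walk block sizes in order, stop once the running total would exceed
    // `max_prefetch_size` (0 = unlimited), and report how many leading blocks
    // fit in the budget.
    size_t CountBlocksWithinBudget(const std::vector<uint64_t>& block_sizes,
                                   uint64_t max_prefetch_size) {
      uint64_t total = 0;
      for (size_t i = 0; i < block_sizes.size(); ++i) {
        total += block_sizes[i];
        if (max_prefetch_size > 0 && total > max_prefetch_size) {
          return i;  // blocks [0, i) fit; block i pushed the total over budget
        }
      }
      return block_sizes.size();
    }
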
- void BlockBasedTableIterator::PrepareIORequests(
- const std::vector<size_t>& block_indices_to_read,
- const std::vector<BlockHandle>& scan_block_handles,
- const MultiScanArgs* multiscan_opts, std::vector<FSReadRequest>* read_reqs,
- UnorderedMap<size_t, size_t>* block_idx_to_readreq_idx,
- std::vector<std::vector<size_t>>* coalesced_block_indices) {
- assert(coalesced_block_indices->empty());
- coalesced_block_indices->resize(1);
- for (const auto& block_idx : block_indices_to_read) {
- if (!coalesced_block_indices->back().empty()) {
- // Check if we can coalesce.
- const auto& last_block_handle =
- scan_block_handles[coalesced_block_indices->back().back()];
- uint64_t last_block_end =
- last_block_handle.offset() +
- BlockBasedTable::BlockSizeWithTrailer(last_block_handle);
- uint64_t current_start = scan_block_handles[block_idx].offset();
- if (current_start >
- last_block_end + multiscan_opts->io_coalesce_threshold) {
- // Gap exceeds the coalesce threshold; start a new IO request.
- coalesced_block_indices->emplace_back();
- }
- }
- coalesced_block_indices->back().emplace_back(block_idx);
- }
- assert(read_reqs->empty());
- read_reqs->reserve(coalesced_block_indices->size());
- for (const auto& block_indices : *coalesced_block_indices) {
- assert(block_indices.size());
- const auto& first_block_handle = scan_block_handles[block_indices[0]];
- const auto& last_block_handle = scan_block_handles[block_indices.back()];
- const auto start_offset = first_block_handle.offset();
- const auto end_offset =
- last_block_handle.offset() +
- BlockBasedTable::BlockSizeWithTrailer(last_block_handle);
- #ifndef NDEBUG
- // Debug output to help diagnose failures of the assertion below.
- if (start_offset >= end_offset) {
- fprintf(stderr, "scan_block_handles: ");
- for (const auto& block : scan_block_handles) {
- fprintf(stderr, "offset: %" PRIu64 ", size: %" PRIu64 "; ",
- block.offset(), block.size());
- }
- fprintf(stderr,
- "\nfirst block - offset: %" PRIu64 ", size: %" PRIu64 "\n",
- first_block_handle.offset(), first_block_handle.size());
- fprintf(stderr, "last block - offset: %" PRIu64 ", size: %" PRIu64 "\n",
- last_block_handle.offset(), last_block_handle.size());
- fprintf(stderr, "coalesced_block_indices: ");
- for (const auto& b : *coalesced_block_indices) {
- fprintf(stderr, "[");
- for (const auto& block_idx : b) {
- fprintf(stderr, "%zu ", block_idx);
- }
- fprintf(stderr, "] ");
- }
- fprintf(stderr, "\ncurrent blocks: ");
- for (const auto& block_idx : block_indices) {
- fprintf(stderr, "offset: %" PRIu64 ", size: %" PRIu64 "; ",
- scan_block_handles[block_idx].offset(),
- scan_block_handles[block_idx].size());
- }
- fprintf(stderr, "\n");
- }
- #endif // NDEBUG
- assert(end_offset > start_offset);
- read_reqs->emplace_back();
- read_reqs->back().offset = start_offset;
- read_reqs->back().len = end_offset - start_offset;
- if (multiscan_opts->use_async_io) {
- for (const auto& block_idx : block_indices) {
- (*block_idx_to_readreq_idx)[block_idx] = read_reqs->size() - 1;
- }
- }
- }
- }
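
PrepareIORequests groups the cache-missed blocks into coalesced FSReadRequests: a block joins the current request unless the gap between the end of the previous block and its own start exceeds multiscan_opts->io_coalesce_threshold, in which case a new request is started. The grouping rule in isolation, as a std-only sketch with the illustrative name `CoalesceReads` (offset/length pairs stand in for block handles):

    #include <cstdint>
    #include <utility>
    #include <vector>

    // Merge consecutive [offset, offset + len) reads into one request when the
    // gap between the end of the previous read and the start of the next one is
    // within `threshold` bytes. `reads` must be sorted and non-overlapping.
    std::vector<std::pair<uint64_t, uint64_t>> CoalesceReads(
        const std::vector<std::pair<uint64_t, uint64_t>>& reads,
        uint64_t threshold) {
      std::vector<std::pair<uint64_t, uint64_t>> out;  // (offset, len) per request
      for (const auto& [offset, len] : reads) {
        if (!out.empty()) {
          const uint64_t prev_end = out.back().first + out.back().second;
          if (offset <= prev_end + threshold) {
            out.back().second = offset + len - out.back().first;  // extend request
            continue;
          }
        }
        out.emplace_back(offset, len);  // start a new request
      }
      return out;
    }
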
- Status BlockBasedTableIterator::ExecuteIO(
- const std::vector<BlockHandle>& scan_block_handles,
- const MultiScanArgs* multiscan_opts,
- const std::vector<std::vector<size_t>>& coalesced_block_indices,
- std::vector<FSReadRequest>* read_reqs,
- std::vector<AsyncReadState>* async_states,
- std::vector<CachableEntry<Block>>* pinned_data_blocks_guard) {
- IOOptions io_opts;
- Status s;
- s = table_->get_rep()->file->PrepareIOOptions(read_options_, io_opts);
- if (!s.ok()) {
- // Abort: PrepareIOOptions failed
- return s;
- }
- const bool direct_io = table_->get_rep()->file->use_direct_io();
- if (multiscan_opts->use_async_io) {
- async_states->resize(read_reqs->size());
- for (size_t i = 0; i < read_reqs->size(); ++i) {
- auto& read_req = (*read_reqs)[i];
- auto& async_read = (*async_states)[i];
- async_read.finished = false;
- async_read.offset = read_req.offset;
- async_read.block_indices = coalesced_block_indices[i];
- for (const auto idx : coalesced_block_indices[i]) {
- async_read.blocks.emplace_back(scan_block_handles[idx]);
- }
- if (direct_io) {
- read_req.scratch = nullptr;
- } else {
- async_read.buf.reset(new char[read_req.len]);
- read_req.scratch = async_read.buf.get();
- }
- auto cb = std::bind(&BlockBasedTableIterator::PrepareReadAsyncCallBack,
- this, std::placeholders::_1, std::placeholders::_2);
- // TODO: for mmap, io_handle will not be set but callback will already
- // be called.
- s = table_->get_rep()->file.get()->ReadAsync(
- read_req, io_opts, cb, &async_read, &async_read.io_handle,
- &async_read.del_fn, direct_io ? &async_read.aligned_buf : nullptr);
- if (!s.ok()) {
- #ifndef NDEBUG
- fprintf(stderr, "ReadAsync failed with %s\n", s.ToString().c_str());
- #endif
- assert(false);
- return s;
- }
- assert(async_read.io_handle);
- for (auto& req : *read_reqs) {
- if (!req.status.ok()) {
- assert(false);
- // Silence compiler warning about NRVO
- s = req.status;
- return s;
- }
- }
- }
- } else {
- // Synchronous IO using MultiRead
- std::unique_ptr<char[]> buf;
- if (direct_io) {
- for (auto& read_req : *read_reqs) {
- read_req.scratch = nullptr;
- }
- } else {
- // TODO: optimize if FSSupportedOps::kFSBuffer is supported.
- size_t total_len = 0;
- for (const auto& req : *read_reqs) {
- total_len += req.len;
- }
- buf.reset(new char[total_len]);
- size_t offset = 0;
- for (auto& read_req : *read_reqs) {
- read_req.scratch = buf.get() + offset;
- offset += read_req.len;
- }
- }
- AlignedBuf aligned_buf;
- s = table_->get_rep()->file->MultiRead(io_opts, read_reqs->data(),
- read_reqs->size(),
- direct_io ? &aligned_buf : nullptr);
- if (!s.ok()) {
- return s;
- }
- for (auto& req : *read_reqs) {
- if (!req.status.ok()) {
- // Silence compiler warning about NRVO
- s = req.status;
- return s;
- }
- }
- // Init blocks and pin them in block cache.
- assert(read_reqs->size() == coalesced_block_indices.size());
- for (size_t i = 0; i < coalesced_block_indices.size(); i++) {
- const auto& read_req = (*read_reqs)[i];
- for (const auto& block_idx : coalesced_block_indices[i]) {
- const auto& block = scan_block_handles[block_idx];
- assert((*pinned_data_blocks_guard)[block_idx].IsEmpty());
- s = CreateAndPinBlockFromBuffer(block, read_req.offset, read_req.result,
- (*pinned_data_blocks_guard)[block_idx]);
- if (!s.ok()) {
- assert(false);
- // Abort: failed to create and pin block in cache
- return s;
- }
- assert((*pinned_data_blocks_guard)[block_idx].GetValue());
- }
- }
- }
- return s;
- }
- } // namespace ROCKSDB_NAMESPACE
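
For reference, ExecuteIO above is the part that actually touches the file. With use_async_io it issues one ReadAsync per coalesced request and defers block creation to the poll path shown at the top of this hunk; the synchronous path carves a single scratch buffer into per-request slices, issues one MultiRead, and pins each block straight out of the request results. The scratch-buffer carving in isolation, as a std-only sketch with the illustrative names `ReadReq` and `AssignScratch`:

    #include <cstddef>
    #include <memory>
    #include <vector>

    // Allocate one buffer large enough for all requests and hand each request
    // a disjoint slice of it, mirroring the non-direct-IO MultiRead setup above.
    struct ReadReq {
      size_t len = 0;
      char* scratch = nullptr;
    };

    std::unique_ptr<char[]> AssignScratch(std::vector<ReadReq>& reqs) {
      size_t total_len = 0;
      for (const auto& r : reqs) {
        total_len += r.len;
      }
      std::unique_ptr<char[]> buf(new char[total_len]);
      size_t offset = 0;
      for (auto& r : reqs) {
        r.scratch = buf.get() + offset;  // each request reads into its own slice
        offset += r.len;
      }
      return buf;  // caller keeps the buffer alive for the duration of the reads
    }
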