| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084 |
- // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
- // This source code is licensed under both the GPLv2 (found in the
- // COPYING file in the root directory) and Apache 2.0 License
- // (found in the LICENSE.Apache file in the root directory).
- #include "db/forward_iterator.h"
- #include <limits>
- #include <string>
- #include <utility>
- #include "db/column_family.h"
- #include "db/db_impl/db_impl.h"
- #include "db/db_iter.h"
- #include "db/dbformat.h"
- #include "db/job_context.h"
- #include "db/range_del_aggregator.h"
- #include "db/range_tombstone_fragmenter.h"
- #include "rocksdb/env.h"
- #include "rocksdb/slice.h"
- #include "rocksdb/slice_transform.h"
- #include "table/merging_iterator.h"
- #include "test_util/sync_point.h"
- #include "util/string_util.h"
- namespace ROCKSDB_NAMESPACE {
- // Usage:
- // ForwardLevelIterator iter;
- // iter.SetFileIndex(file_index);
- // iter.Seek(target); // or iter.SeekToFirst();
- // iter.Next()
- class ForwardLevelIterator : public InternalIterator {
- public:
- ForwardLevelIterator(const ColumnFamilyData* const cfd,
- const ReadOptions& read_options,
- const std::vector<FileMetaData*>& files,
- const MutableCFOptions& mutable_cf_options,
- bool allow_unprepared_value)
- : cfd_(cfd),
- read_options_(read_options),
- files_(files),
- valid_(false),
- file_index_(std::numeric_limits<uint32_t>::max()),
- file_iter_(nullptr),
- pinned_iters_mgr_(nullptr),
- mutable_cf_options_(mutable_cf_options),
- allow_unprepared_value_(allow_unprepared_value) {
- status_.PermitUncheckedError(); // Allow uninitialized status through
- }
- ~ForwardLevelIterator() override {
- // Reset current pointer
- if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) {
- pinned_iters_mgr_->PinIterator(file_iter_);
- } else {
- delete file_iter_;
- }
- }
- void SetFileIndex(uint32_t file_index) {
- assert(file_index < files_.size());
- status_ = Status::OK();
- if (file_index != file_index_) {
- file_index_ = file_index;
- Reset();
- }
- }
- void Reset() {
- assert(file_index_ < files_.size());
- // Reset current pointer
- if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) {
- pinned_iters_mgr_->PinIterator(file_iter_);
- } else {
- delete file_iter_;
- }
- ReadRangeDelAggregator range_del_agg(&cfd_->internal_comparator(),
- kMaxSequenceNumber /* upper_bound */);
- file_iter_ = cfd_->table_cache()->NewIterator(
- read_options_, *(cfd_->soptions()), cfd_->internal_comparator(),
- *files_[file_index_],
- read_options_.ignore_range_deletions ? nullptr : &range_del_agg,
- mutable_cf_options_, /*table_reader_ptr=*/nullptr,
- /*file_read_hist=*/nullptr, TableReaderCaller::kUserIterator,
- /*arena=*/nullptr, /*skip_filters=*/false, /*level=*/-1,
- /*max_file_size_for_l0_meta_pin=*/0,
- /*smallest_compaction_key=*/nullptr,
- /*largest_compaction_key=*/nullptr, allow_unprepared_value_);
- file_iter_->SetPinnedItersMgr(pinned_iters_mgr_);
- valid_ = false;
- if (!range_del_agg.IsEmpty()) {
- status_ = Status::NotSupported(
- "Range tombstones unsupported with ForwardIterator");
- }
- }
- void SeekToLast() override {
- status_ = Status::NotSupported("ForwardLevelIterator::SeekToLast()");
- valid_ = false;
- }
- void Prev() override {
- status_ = Status::NotSupported("ForwardLevelIterator::Prev()");
- valid_ = false;
- }
- bool Valid() const override { return valid_; }
- void SeekToFirst() override {
- assert(file_iter_ != nullptr);
- if (!status_.ok()) {
- assert(!valid_);
- return;
- }
- file_iter_->SeekToFirst();
- valid_ = file_iter_->Valid();
- }
- void Seek(const Slice& internal_key) override {
- assert(file_iter_ != nullptr);
- // This deviates from the usual convention for InternalIterator::Seek() in
- // that it doesn't discard pre-existing error status. That's because this
- // Seek() is only supposed to be called immediately after SetFileIndex()
- // (which discards pre-existing error status), and SetFileIndex() may set
- // an error status, which we shouldn't discard.
- if (!status_.ok()) {
- assert(!valid_);
- return;
- }
- file_iter_->Seek(internal_key);
- valid_ = file_iter_->Valid();
- }
- void SeekForPrev(const Slice& /*internal_key*/) override {
- status_ = Status::NotSupported("ForwardLevelIterator::SeekForPrev()");
- valid_ = false;
- }
- void Next() override {
- assert(valid_);
- file_iter_->Next();
- for (;;) {
- valid_ = file_iter_->Valid();
- if (!file_iter_->status().ok()) {
- assert(!valid_);
- return;
- }
- if (valid_) {
- return;
- }
- if (file_index_ + 1 >= files_.size()) {
- valid_ = false;
- return;
- }
- SetFileIndex(file_index_ + 1);
- if (!status_.ok()) {
- assert(!valid_);
- return;
- }
- file_iter_->SeekToFirst();
- }
- }
- Slice key() const override {
- assert(valid_);
- return file_iter_->key();
- }
- Slice value() const override {
- assert(valid_);
- return file_iter_->value();
- }
- uint64_t write_unix_time() const override {
- assert(valid_);
- return file_iter_->write_unix_time();
- }
- Status status() const override {
- if (!status_.ok()) {
- return status_;
- } else if (file_iter_) {
- return file_iter_->status();
- }
- return Status::OK();
- }
- bool PrepareValue() override {
- assert(valid_);
- if (file_iter_->PrepareValue()) {
- return true;
- }
- assert(!file_iter_->Valid());
- valid_ = false;
- return false;
- }
- bool IsKeyPinned() const override {
- return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
- file_iter_->IsKeyPinned();
- }
- bool IsValuePinned() const override {
- return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
- file_iter_->IsValuePinned();
- }
- void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override {
- pinned_iters_mgr_ = pinned_iters_mgr;
- if (file_iter_) {
- file_iter_->SetPinnedItersMgr(pinned_iters_mgr_);
- }
- }
- private:
- const ColumnFamilyData* const cfd_;
- const ReadOptions& read_options_;
- const std::vector<FileMetaData*>& files_;
- bool valid_;
- uint32_t file_index_;
- Status status_;
- InternalIterator* file_iter_;
- PinnedIteratorsManager* pinned_iters_mgr_;
- const MutableCFOptions& mutable_cf_options_;
- const bool allow_unprepared_value_;
- };
- ForwardIterator::ForwardIterator(DBImpl* db, const ReadOptions& read_options,
- ColumnFamilyData* cfd,
- SuperVersion* current_sv,
- bool allow_unprepared_value)
- : db_(db),
- read_options_(read_options),
- cfd_(cfd),
- prefix_extractor_(current_sv->mutable_cf_options.prefix_extractor.get()),
- user_comparator_(cfd->user_comparator()),
- allow_unprepared_value_(allow_unprepared_value),
- immutable_min_heap_(MinIterComparator(&cfd_->internal_comparator())),
- sv_(current_sv),
- mutable_iter_(nullptr),
- current_(nullptr),
- valid_(false),
- status_(Status::OK()),
- immutable_status_(Status::OK()),
- has_iter_trimmed_for_upper_bound_(false),
- current_over_upper_bound_(false),
- is_prev_set_(false),
- is_prev_inclusive_(false),
- pinned_iters_mgr_(nullptr) {
- if (sv_) {
- RebuildIterators(false);
- }
- if (!CheckFSFeatureSupport(cfd_->ioptions().env->GetFileSystem().get(),
- FSSupportedOps::kAsyncIO)) {
- read_options_.async_io = false;
- }
- // immutable_status_ is a local aggregation of the
- // status of the immutable Iterators.
- // We have to PermitUncheckedError in case it is never
- // used, otherwise it will fail ASSERT_STATUS_CHECKED.
- immutable_status_.PermitUncheckedError();
- }
- ForwardIterator::~ForwardIterator() { Cleanup(true); }
- void ForwardIterator::SVCleanup(DBImpl* db, SuperVersion* sv,
- bool background_purge_on_iterator_cleanup) {
- if (sv->Unref()) {
- // Job id == 0 means that this is not our background process, but rather
- // user thread
- JobContext job_context(0);
- db->mutex_.Lock();
- sv->Cleanup();
- db->FindObsoleteFiles(&job_context, false, true);
- if (background_purge_on_iterator_cleanup) {
- db->ScheduleBgLogWriterClose(&job_context);
- db->AddSuperVersionsToFreeQueue(sv);
- db->SchedulePurge();
- }
- db->mutex_.Unlock();
- if (!background_purge_on_iterator_cleanup) {
- delete sv;
- }
- if (job_context.HaveSomethingToDelete()) {
- db->PurgeObsoleteFiles(job_context, background_purge_on_iterator_cleanup);
- }
- job_context.Clean();
- }
- }
- namespace {
- struct SVCleanupParams {
- DBImpl* db;
- SuperVersion* sv;
- bool background_purge_on_iterator_cleanup;
- };
- } // anonymous namespace
- // Used in PinnedIteratorsManager to release pinned SuperVersion
- void ForwardIterator::DeferredSVCleanup(void* arg) {
- auto d = static_cast<SVCleanupParams*>(arg);
- ForwardIterator::SVCleanup(d->db, d->sv,
- d->background_purge_on_iterator_cleanup);
- delete d;
- }
- void ForwardIterator::SVCleanup() {
- if (sv_ == nullptr) {
- return;
- }
- bool background_purge =
- read_options_.background_purge_on_iterator_cleanup ||
- db_->immutable_db_options().avoid_unnecessary_blocking_io;
- if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) {
- // pinned_iters_mgr_ tells us to make sure that all visited key-value slices
- // are alive until pinned_iters_mgr_->ReleasePinnedData() is called.
- // The slices may point into some memtables owned by sv_, so we need to keep
- // sv_ referenced until pinned_iters_mgr_ unpins everything.
- auto p = new SVCleanupParams{db_, sv_, background_purge};
- pinned_iters_mgr_->PinPtr(p, &ForwardIterator::DeferredSVCleanup);
- } else {
- SVCleanup(db_, sv_, background_purge);
- }
- }
- void ForwardIterator::Cleanup(bool release_sv) {
- if (mutable_iter_ != nullptr) {
- DeleteIterator(mutable_iter_, true /* is_arena */);
- }
- for (auto* m : imm_iters_) {
- DeleteIterator(m, true /* is_arena */);
- }
- imm_iters_.clear();
- for (auto* f : l0_iters_) {
- DeleteIterator(f);
- }
- l0_iters_.clear();
- for (auto* l : level_iters_) {
- DeleteIterator(l);
- }
- level_iters_.clear();
- if (release_sv) {
- SVCleanup();
- }
- }
- bool ForwardIterator::Valid() const {
- // See UpdateCurrent().
- return valid_ ? !current_over_upper_bound_ : false;
- }
- void ForwardIterator::SeekToFirst() {
- if (sv_ == nullptr) {
- RebuildIterators(true);
- } else if (sv_->version_number != cfd_->GetSuperVersionNumber()) {
- RenewIterators();
- } else if (immutable_status_.IsIncomplete()) {
- ResetIncompleteIterators();
- }
- SeekInternal(Slice(), true, false);
- }
- bool ForwardIterator::IsOverUpperBound(const Slice& internal_key) const {
- return !(read_options_.iterate_upper_bound == nullptr ||
- cfd_->internal_comparator().user_comparator()->Compare(
- ExtractUserKey(internal_key),
- *read_options_.iterate_upper_bound) < 0);
- }
- void ForwardIterator::Seek(const Slice& internal_key) {
- if (sv_ == nullptr) {
- RebuildIterators(true);
- } else if (sv_->version_number != cfd_->GetSuperVersionNumber()) {
- RenewIterators();
- } else if (immutable_status_.IsIncomplete()) {
- ResetIncompleteIterators();
- }
- SeekInternal(internal_key, false, false);
- if (read_options_.async_io) {
- SeekInternal(internal_key, false, true);
- }
- }
- // In case of async_io, SeekInternal is called twice with seek_after_async_io
- // enabled in second call which only does seeking part to retrieve the blocks.
- void ForwardIterator::SeekInternal(const Slice& internal_key,
- bool seek_to_first,
- bool seek_after_async_io) {
- assert(mutable_iter_);
- // mutable
- if (!seek_after_async_io) {
- seek_to_first ? mutable_iter_->SeekToFirst()
- : mutable_iter_->Seek(internal_key);
- }
- // immutable
- // TODO(ljin): NeedToSeekImmutable has negative impact on performance
- // if it turns to need to seek immutable often. We probably want to have
- // an option to turn it off.
- if (seek_to_first || seek_after_async_io ||
- NeedToSeekImmutable(internal_key)) {
- if (!seek_after_async_io) {
- immutable_status_ = Status::OK();
- if (has_iter_trimmed_for_upper_bound_ &&
- (
- // prev_ is not set yet
- is_prev_set_ == false ||
- // We are doing SeekToFirst() and internal_key.size() = 0
- seek_to_first ||
- // prev_key_ > internal_key
- cfd_->internal_comparator().InternalKeyComparator::Compare(
- prev_key_.GetInternalKey(), internal_key) > 0)) {
- // Some iterators are trimmed. Need to rebuild.
- RebuildIterators(true);
- // Already seeked mutable iter, so seek again
- seek_to_first ? mutable_iter_->SeekToFirst()
- : mutable_iter_->Seek(internal_key);
- }
- {
- auto tmp = MinIterHeap(MinIterComparator(&cfd_->internal_comparator()));
- immutable_min_heap_.swap(tmp);
- }
- for (size_t i = 0; i < imm_iters_.size(); i++) {
- auto* m = imm_iters_[i];
- seek_to_first ? m->SeekToFirst() : m->Seek(internal_key);
- if (!m->status().ok()) {
- immutable_status_ = m->status();
- } else if (m->Valid()) {
- immutable_min_heap_.push(m);
- }
- }
- }
- Slice target_user_key;
- if (!seek_to_first) {
- target_user_key = ExtractUserKey(internal_key);
- }
- const VersionStorageInfo* vstorage = sv_->current->storage_info();
- const std::vector<FileMetaData*>& l0 = vstorage->LevelFiles(0);
- for (size_t i = 0; i < l0.size(); ++i) {
- if (!l0_iters_[i]) {
- continue;
- }
- if (seek_after_async_io) {
- if (!l0_iters_[i]->status().IsTryAgain()) {
- continue;
- }
- }
- if (seek_to_first) {
- l0_iters_[i]->SeekToFirst();
- } else {
- // If the target key passes over the largest key, we are sure Next()
- // won't go over this file.
- if (seek_after_async_io == false &&
- user_comparator_->Compare(target_user_key,
- l0[i]->largest.user_key()) > 0) {
- if (read_options_.iterate_upper_bound != nullptr) {
- has_iter_trimmed_for_upper_bound_ = true;
- DeleteIterator(l0_iters_[i]);
- l0_iters_[i] = nullptr;
- }
- continue;
- }
- l0_iters_[i]->Seek(internal_key);
- }
- if (l0_iters_[i]->status().IsTryAgain()) {
- assert(!seek_after_async_io);
- continue;
- } else if (!l0_iters_[i]->status().ok()) {
- immutable_status_ = l0_iters_[i]->status();
- } else if (l0_iters_[i]->Valid() &&
- !IsOverUpperBound(l0_iters_[i]->key())) {
- immutable_min_heap_.push(l0_iters_[i]);
- } else {
- has_iter_trimmed_for_upper_bound_ = true;
- DeleteIterator(l0_iters_[i]);
- l0_iters_[i] = nullptr;
- }
- }
- for (int32_t level = 1; level < vstorage->num_levels(); ++level) {
- const std::vector<FileMetaData*>& level_files =
- vstorage->LevelFiles(level);
- if (level_files.empty()) {
- continue;
- }
- if (level_iters_[level - 1] == nullptr) {
- continue;
- }
- if (seek_after_async_io) {
- if (!level_iters_[level - 1]->status().IsTryAgain()) {
- continue;
- }
- }
- uint32_t f_idx = 0;
- if (!seek_to_first && !seek_after_async_io) {
- f_idx = FindFileInRange(level_files, internal_key, 0,
- static_cast<uint32_t>(level_files.size()));
- }
- // Seek
- if (seek_after_async_io || f_idx < level_files.size()) {
- if (!seek_after_async_io) {
- level_iters_[level - 1]->SetFileIndex(f_idx);
- }
- seek_to_first ? level_iters_[level - 1]->SeekToFirst()
- : level_iters_[level - 1]->Seek(internal_key);
- if (level_iters_[level - 1]->status().IsTryAgain()) {
- assert(!seek_after_async_io);
- continue;
- } else if (!level_iters_[level - 1]->status().ok()) {
- immutable_status_ = level_iters_[level - 1]->status();
- } else if (level_iters_[level - 1]->Valid() &&
- !IsOverUpperBound(level_iters_[level - 1]->key())) {
- immutable_min_heap_.push(level_iters_[level - 1]);
- } else {
- // Nothing in this level is interesting. Remove.
- has_iter_trimmed_for_upper_bound_ = true;
- DeleteIterator(level_iters_[level - 1]);
- level_iters_[level - 1] = nullptr;
- }
- }
- }
- if (seek_to_first) {
- is_prev_set_ = false;
- } else {
- prev_key_.SetInternalKey(internal_key);
- is_prev_set_ = true;
- is_prev_inclusive_ = true;
- }
- TEST_SYNC_POINT_CALLBACK("ForwardIterator::SeekInternal:Immutable", this);
- } else if (current_ && current_ != mutable_iter_) {
- // current_ is one of immutable iterators, push it back to the heap
- immutable_min_heap_.push(current_);
- }
- // For async_io, it should be updated when seek_after_async_io is true (in
- // second call).
- if (seek_to_first || !read_options_.async_io || seek_after_async_io) {
- UpdateCurrent();
- }
- TEST_SYNC_POINT_CALLBACK("ForwardIterator::SeekInternal:Return", this);
- }
- void ForwardIterator::Next() {
- assert(valid_);
- bool update_prev_key = false;
- if (sv_ == nullptr || sv_->version_number != cfd_->GetSuperVersionNumber()) {
- std::string current_key = key().ToString();
- Slice old_key(current_key.data(), current_key.size());
- if (sv_ == nullptr) {
- RebuildIterators(true);
- } else {
- RenewIterators();
- }
- SeekInternal(old_key, false, false);
- if (read_options_.async_io) {
- SeekInternal(old_key, false, true);
- }
- if (!valid_ || key().compare(old_key) != 0) {
- return;
- }
- } else if (current_ != mutable_iter_) {
- // It is going to advance immutable iterator
- if (is_prev_set_ && prefix_extractor_) {
- // advance prev_key_ to current_ only if they share the same prefix
- update_prev_key =
- prefix_extractor_->Transform(prev_key_.GetUserKey())
- .compare(prefix_extractor_->Transform(current_->key())) == 0;
- } else {
- update_prev_key = true;
- }
- if (update_prev_key) {
- prev_key_.SetInternalKey(current_->key());
- is_prev_set_ = true;
- is_prev_inclusive_ = false;
- }
- }
- current_->Next();
- if (current_ != mutable_iter_) {
- if (!current_->status().ok()) {
- immutable_status_ = current_->status();
- } else if ((current_->Valid()) && (!IsOverUpperBound(current_->key()))) {
- immutable_min_heap_.push(current_);
- } else {
- if ((current_->Valid()) && (IsOverUpperBound(current_->key()))) {
- // remove the current iterator
- DeleteCurrentIter();
- current_ = nullptr;
- }
- if (update_prev_key) {
- mutable_iter_->Seek(prev_key_.GetInternalKey());
- }
- }
- }
- UpdateCurrent();
- TEST_SYNC_POINT_CALLBACK("ForwardIterator::Next:Return", this);
- }
- Slice ForwardIterator::key() const {
- assert(valid_);
- return current_->key();
- }
- uint64_t ForwardIterator::write_unix_time() const {
- assert(valid_);
- return current_->write_unix_time();
- }
- Slice ForwardIterator::value() const {
- assert(valid_);
- return current_->value();
- }
- Status ForwardIterator::status() const {
- if (!status_.ok()) {
- return status_;
- } else if (!mutable_iter_->status().ok()) {
- return mutable_iter_->status();
- }
- return immutable_status_;
- }
- bool ForwardIterator::PrepareValue() {
- assert(valid_);
- if (current_->PrepareValue()) {
- return true;
- }
- assert(!current_->Valid());
- assert(!current_->status().ok());
- assert(current_ != mutable_iter_); // memtable iterator can't fail
- assert(immutable_status_.ok());
- valid_ = false;
- immutable_status_ = current_->status();
- return false;
- }
- Status ForwardIterator::GetProperty(std::string prop_name, std::string* prop) {
- assert(prop != nullptr);
- if (prop_name == "rocksdb.iterator.super-version-number") {
- *prop = std::to_string(sv_->version_number);
- return Status::OK();
- }
- return Status::InvalidArgument("Unrecognized property: " + prop_name);
- }
- void ForwardIterator::SetPinnedItersMgr(
- PinnedIteratorsManager* pinned_iters_mgr) {
- pinned_iters_mgr_ = pinned_iters_mgr;
- UpdateChildrenPinnedItersMgr();
- }
- void ForwardIterator::UpdateChildrenPinnedItersMgr() {
- // Set PinnedIteratorsManager for mutable memtable iterator.
- if (mutable_iter_) {
- mutable_iter_->SetPinnedItersMgr(pinned_iters_mgr_);
- }
- // Set PinnedIteratorsManager for immutable memtable iterators.
- for (InternalIterator* child_iter : imm_iters_) {
- if (child_iter) {
- child_iter->SetPinnedItersMgr(pinned_iters_mgr_);
- }
- }
- // Set PinnedIteratorsManager for L0 files iterators.
- for (InternalIterator* child_iter : l0_iters_) {
- if (child_iter) {
- child_iter->SetPinnedItersMgr(pinned_iters_mgr_);
- }
- }
- // Set PinnedIteratorsManager for L1+ levels iterators.
- for (ForwardLevelIterator* child_iter : level_iters_) {
- if (child_iter) {
- child_iter->SetPinnedItersMgr(pinned_iters_mgr_);
- }
- }
- }
- bool ForwardIterator::IsKeyPinned() const {
- return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
- current_->IsKeyPinned();
- }
- bool ForwardIterator::IsValuePinned() const {
- return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
- current_->IsValuePinned();
- }
- void ForwardIterator::RebuildIterators(bool refresh_sv) {
- // Clean up
- Cleanup(refresh_sv);
- if (refresh_sv) {
- // New
- sv_ = cfd_->GetReferencedSuperVersion(db_);
- }
- ReadRangeDelAggregator range_del_agg(&cfd_->internal_comparator(),
- kMaxSequenceNumber /* upper_bound */);
- UnownedPtr<const SeqnoToTimeMapping> seqno_to_time_mapping =
- sv_->GetSeqnoToTimeMapping();
- mutable_iter_ =
- sv_->mem->NewIterator(read_options_, seqno_to_time_mapping, &arena_,
- sv_->mutable_cf_options.prefix_extractor.get(),
- /*for_flush=*/false);
- sv_->imm->AddIterators(read_options_, seqno_to_time_mapping,
- sv_->mutable_cf_options.prefix_extractor.get(),
- &imm_iters_, &arena_);
- if (!read_options_.ignore_range_deletions) {
- std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
- sv_->mem->NewRangeTombstoneIterator(
- read_options_, sv_->current->version_set()->LastSequence(),
- false /* immutable_memtable */));
- range_del_agg.AddTombstones(std::move(range_del_iter));
- // Always return Status::OK().
- Status temp_s = sv_->imm->AddRangeTombstoneIterators(read_options_, &arena_,
- &range_del_agg);
- assert(temp_s.ok());
- }
- has_iter_trimmed_for_upper_bound_ = false;
- const auto* vstorage = sv_->current->storage_info();
- const auto& l0_files = vstorage->LevelFiles(0);
- l0_iters_.reserve(l0_files.size());
- for (const auto* l0 : l0_files) {
- if ((read_options_.iterate_upper_bound != nullptr) &&
- cfd_->internal_comparator().user_comparator()->Compare(
- l0->smallest.user_key(), *read_options_.iterate_upper_bound) > 0) {
- // No need to set has_iter_trimmed_for_upper_bound_: this ForwardIterator
- // will never be interested in files with smallest key above
- // iterate_upper_bound, since iterate_upper_bound can't be changed.
- l0_iters_.push_back(nullptr);
- continue;
- }
- l0_iters_.push_back(cfd_->table_cache()->NewIterator(
- read_options_, *cfd_->soptions(), cfd_->internal_comparator(), *l0,
- read_options_.ignore_range_deletions ? nullptr : &range_del_agg,
- sv_->mutable_cf_options,
- /*table_reader_ptr=*/nullptr, /*file_read_hist=*/nullptr,
- TableReaderCaller::kUserIterator, /*arena=*/nullptr,
- /*skip_filters=*/false, /*level=*/-1,
- MaxFileSizeForL0MetaPin(sv_->mutable_cf_options),
- /*smallest_compaction_key=*/nullptr,
- /*largest_compaction_key=*/nullptr, allow_unprepared_value_));
- }
- BuildLevelIterators(vstorage, sv_);
- current_ = nullptr;
- is_prev_set_ = false;
- UpdateChildrenPinnedItersMgr();
- if (!range_del_agg.IsEmpty()) {
- status_ = Status::NotSupported(
- "Range tombstones unsupported with ForwardIterator");
- valid_ = false;
- }
- }
- void ForwardIterator::RenewIterators() {
- SuperVersion* svnew;
- assert(sv_);
- svnew = cfd_->GetReferencedSuperVersion(db_);
- if (mutable_iter_ != nullptr) {
- DeleteIterator(mutable_iter_, true /* is_arena */);
- }
- for (auto* m : imm_iters_) {
- DeleteIterator(m, true /* is_arena */);
- }
- imm_iters_.clear();
- UnownedPtr<const SeqnoToTimeMapping> seqno_to_time_mapping =
- svnew->GetSeqnoToTimeMapping();
- mutable_iter_ =
- svnew->mem->NewIterator(read_options_, seqno_to_time_mapping, &arena_,
- svnew->mutable_cf_options.prefix_extractor.get(),
- /*for_flush=*/false);
- svnew->imm->AddIterators(read_options_, seqno_to_time_mapping,
- svnew->mutable_cf_options.prefix_extractor.get(),
- &imm_iters_, &arena_);
- ReadRangeDelAggregator range_del_agg(&cfd_->internal_comparator(),
- kMaxSequenceNumber /* upper_bound */);
- if (!read_options_.ignore_range_deletions) {
- std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
- svnew->mem->NewRangeTombstoneIterator(
- read_options_, sv_->current->version_set()->LastSequence(),
- false /* immutable_memtable */));
- range_del_agg.AddTombstones(std::move(range_del_iter));
- // Always return Status::OK().
- Status temp_s = svnew->imm->AddRangeTombstoneIterators(
- read_options_, &arena_, &range_del_agg);
- assert(temp_s.ok());
- }
- const auto* vstorage = sv_->current->storage_info();
- const auto& l0_files = vstorage->LevelFiles(0);
- const auto* vstorage_new = svnew->current->storage_info();
- const auto& l0_files_new = vstorage_new->LevelFiles(0);
- size_t iold, inew;
- bool found;
- std::vector<InternalIterator*> l0_iters_new;
- l0_iters_new.reserve(l0_files_new.size());
- for (inew = 0; inew < l0_files_new.size(); inew++) {
- found = false;
- for (iold = 0; iold < l0_files.size(); iold++) {
- if (l0_files[iold] == l0_files_new[inew]) {
- found = true;
- break;
- }
- }
- if (found) {
- if (l0_iters_[iold] == nullptr) {
- l0_iters_new.push_back(nullptr);
- TEST_SYNC_POINT_CALLBACK("ForwardIterator::RenewIterators:Null", this);
- } else {
- l0_iters_new.push_back(l0_iters_[iold]);
- l0_iters_[iold] = nullptr;
- TEST_SYNC_POINT_CALLBACK("ForwardIterator::RenewIterators:Copy", this);
- }
- continue;
- }
- l0_iters_new.push_back(cfd_->table_cache()->NewIterator(
- read_options_, *cfd_->soptions(), cfd_->internal_comparator(),
- *l0_files_new[inew],
- read_options_.ignore_range_deletions ? nullptr : &range_del_agg,
- svnew->mutable_cf_options,
- /*table_reader_ptr=*/nullptr, /*file_read_hist=*/nullptr,
- TableReaderCaller::kUserIterator, /*arena=*/nullptr,
- /*skip_filters=*/false, /*level=*/-1,
- MaxFileSizeForL0MetaPin(svnew->mutable_cf_options),
- /*smallest_compaction_key=*/nullptr,
- /*largest_compaction_key=*/nullptr, allow_unprepared_value_));
- }
- for (auto* f : l0_iters_) {
- DeleteIterator(f);
- }
- l0_iters_.clear();
- l0_iters_ = l0_iters_new;
- for (auto* l : level_iters_) {
- DeleteIterator(l);
- }
- level_iters_.clear();
- BuildLevelIterators(vstorage_new, svnew);
- current_ = nullptr;
- is_prev_set_ = false;
- SVCleanup();
- sv_ = svnew;
- UpdateChildrenPinnedItersMgr();
- if (!range_del_agg.IsEmpty()) {
- status_ = Status::NotSupported(
- "Range tombstones unsupported with ForwardIterator");
- valid_ = false;
- }
- }
- void ForwardIterator::BuildLevelIterators(const VersionStorageInfo* vstorage,
- SuperVersion* sv) {
- level_iters_.reserve(vstorage->num_levels() - 1);
- for (int32_t level = 1; level < vstorage->num_levels(); ++level) {
- const auto& level_files = vstorage->LevelFiles(level);
- if ((level_files.empty()) ||
- ((read_options_.iterate_upper_bound != nullptr) &&
- (user_comparator_->Compare(*read_options_.iterate_upper_bound,
- level_files[0]->smallest.user_key()) <
- 0))) {
- level_iters_.push_back(nullptr);
- if (!level_files.empty()) {
- has_iter_trimmed_for_upper_bound_ = true;
- }
- } else {
- level_iters_.push_back(new ForwardLevelIterator(
- cfd_, read_options_, level_files, sv->mutable_cf_options,
- allow_unprepared_value_));
- }
- }
- }
- void ForwardIterator::ResetIncompleteIterators() {
- const auto& l0_files = sv_->current->storage_info()->LevelFiles(0);
- for (size_t i = 0; i < l0_iters_.size(); ++i) {
- assert(i < l0_files.size());
- if (!l0_iters_[i] || !l0_iters_[i]->status().IsIncomplete()) {
- continue;
- }
- DeleteIterator(l0_iters_[i]);
- l0_iters_[i] = cfd_->table_cache()->NewIterator(
- read_options_, *cfd_->soptions(), cfd_->internal_comparator(),
- *l0_files[i], /*range_del_agg=*/nullptr, sv_->mutable_cf_options,
- /*table_reader_ptr=*/nullptr, /*file_read_hist=*/nullptr,
- TableReaderCaller::kUserIterator, /*arena=*/nullptr,
- /*skip_filters=*/false, /*level=*/-1,
- MaxFileSizeForL0MetaPin(sv_->mutable_cf_options),
- /*smallest_compaction_key=*/nullptr,
- /*largest_compaction_key=*/nullptr, allow_unprepared_value_);
- l0_iters_[i]->SetPinnedItersMgr(pinned_iters_mgr_);
- }
- for (auto* level_iter : level_iters_) {
- if (level_iter && level_iter->status().IsIncomplete()) {
- level_iter->Reset();
- }
- }
- current_ = nullptr;
- is_prev_set_ = false;
- }
- void ForwardIterator::UpdateCurrent() {
- if (immutable_min_heap_.empty() && !mutable_iter_->Valid()) {
- current_ = nullptr;
- } else if (immutable_min_heap_.empty()) {
- current_ = mutable_iter_;
- } else if (!mutable_iter_->Valid()) {
- current_ = immutable_min_heap_.top();
- immutable_min_heap_.pop();
- } else {
- current_ = immutable_min_heap_.top();
- assert(current_ != nullptr);
- assert(current_->Valid());
- int cmp = cfd_->internal_comparator().InternalKeyComparator::Compare(
- mutable_iter_->key(), current_->key());
- assert(cmp != 0);
- if (cmp > 0) {
- immutable_min_heap_.pop();
- } else {
- current_ = mutable_iter_;
- }
- }
- valid_ = current_ != nullptr && immutable_status_.ok();
- if (!status_.ok()) {
- status_ = Status::OK();
- }
- // Upper bound doesn't apply to the memtable iterator. We want Valid() to
- // return false when all iterators are over iterate_upper_bound, but can't
- // just set valid_ to false, as that would effectively disable the tailing
- // optimization (Seek() would be called on all immutable iterators regardless
- // of whether the target key is greater than prev_key_).
- current_over_upper_bound_ = valid_ && IsOverUpperBound(current_->key());
- }
- bool ForwardIterator::NeedToSeekImmutable(const Slice& target) {
- // We maintain the interval (prev_key_, immutable_min_heap_.top()->key())
- // such that there are no records with keys within that range in
- // immutable_min_heap_. Since immutable structures (SST files and immutable
- // memtables) can't change in this version, we don't need to do a seek if
- // 'target' belongs to that interval (immutable_min_heap_.top() is already
- // at the correct position).
- if (!valid_ || !current_ || !is_prev_set_ || !immutable_status_.ok()) {
- return true;
- }
- Slice prev_key = prev_key_.GetInternalKey();
- if (prefix_extractor_ && prefix_extractor_->Transform(target).compare(
- prefix_extractor_->Transform(prev_key)) != 0) {
- return true;
- }
- if (cfd_->internal_comparator().InternalKeyComparator::Compare(
- prev_key, target) >= (is_prev_inclusive_ ? 1 : 0)) {
- return true;
- }
- if (immutable_min_heap_.empty() && current_ == mutable_iter_) {
- // Nothing to seek on.
- return false;
- }
- if (cfd_->internal_comparator().InternalKeyComparator::Compare(
- target, current_ == mutable_iter_ ? immutable_min_heap_.top()->key()
- : current_->key()) > 0) {
- return true;
- }
- return false;
- }
- void ForwardIterator::DeleteCurrentIter() {
- const VersionStorageInfo* vstorage = sv_->current->storage_info();
- const std::vector<FileMetaData*>& l0 = vstorage->LevelFiles(0);
- for (size_t i = 0; i < l0.size(); ++i) {
- if (!l0_iters_[i]) {
- continue;
- }
- if (l0_iters_[i] == current_) {
- has_iter_trimmed_for_upper_bound_ = true;
- DeleteIterator(l0_iters_[i]);
- l0_iters_[i] = nullptr;
- return;
- }
- }
- for (int32_t level = 1; level < vstorage->num_levels(); ++level) {
- if (level_iters_[level - 1] == nullptr) {
- continue;
- }
- if (level_iters_[level - 1] == current_) {
- has_iter_trimmed_for_upper_bound_ = true;
- DeleteIterator(level_iters_[level - 1]);
- level_iters_[level - 1] = nullptr;
- }
- }
- }
- bool ForwardIterator::TEST_CheckDeletedIters(int* pdeleted_iters,
- int* pnum_iters) {
- bool retval = false;
- int deleted_iters = 0;
- int num_iters = 0;
- const VersionStorageInfo* vstorage = sv_->current->storage_info();
- const std::vector<FileMetaData*>& l0 = vstorage->LevelFiles(0);
- for (size_t i = 0; i < l0.size(); ++i) {
- if (!l0_iters_[i]) {
- retval = true;
- deleted_iters++;
- } else {
- num_iters++;
- }
- }
- for (int32_t level = 1; level < vstorage->num_levels(); ++level) {
- if ((level_iters_[level - 1] == nullptr) &&
- (!vstorage->LevelFiles(level).empty())) {
- retval = true;
- deleted_iters++;
- } else if (!vstorage->LevelFiles(level).empty()) {
- num_iters++;
- }
- }
- if ((!retval) && num_iters <= 1) {
- retval = true;
- }
- if (pdeleted_iters) {
- *pdeleted_iters = deleted_iters;
- }
- if (pnum_iters) {
- *pnum_iters = num_iters;
- }
- return retval;
- }
- uint32_t ForwardIterator::FindFileInRange(
- const std::vector<FileMetaData*>& files, const Slice& internal_key,
- uint32_t left, uint32_t right) {
- auto cmp = [&](const FileMetaData* f, const Slice& k) -> bool {
- return cfd_->internal_comparator().InternalKeyComparator::Compare(
- f->largest.Encode(), k) < 0;
- };
- const auto& b = files.begin();
- return static_cast<uint32_t>(
- std::lower_bound(b + left, b + right, internal_key, cmp) - b);
- }
- void ForwardIterator::DeleteIterator(InternalIterator* iter, bool is_arena) {
- if (iter == nullptr) {
- return;
- }
- if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) {
- pinned_iters_mgr_->PinIterator(iter, is_arena);
- } else {
- if (is_arena) {
- iter->~InternalIterator();
- } else {
- delete iter;
- }
- }
- }
- } // namespace ROCKSDB_NAMESPACE
|