// forward_iterator.cc
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include "db/forward_iterator.h"
  6. #include <limits>
  7. #include <string>
  8. #include <utility>
  9. #include "db/column_family.h"
  10. #include "db/db_impl/db_impl.h"
  11. #include "db/db_iter.h"
  12. #include "db/dbformat.h"
  13. #include "db/job_context.h"
  14. #include "db/range_del_aggregator.h"
  15. #include "db/range_tombstone_fragmenter.h"
  16. #include "rocksdb/env.h"
  17. #include "rocksdb/slice.h"
  18. #include "rocksdb/slice_transform.h"
  19. #include "table/merging_iterator.h"
  20. #include "test_util/sync_point.h"
  21. #include "util/string_util.h"
  22. namespace ROCKSDB_NAMESPACE {
// Usage:
// ForwardLevelIterator iter;
// iter.SetFileIndex(file_index);
// iter.Seek(target); // or iter.SeekToFirst();
// iter.Next()
//
// Forward-only iterator over the sorted files of a single LSM level (L1+).
// At most one table iterator (over files_[file_index_]) is open at a time;
// Next() transparently advances to the first entry of the next file when the
// current file is exhausted. Backward operations are not supported.
class ForwardLevelIterator : public InternalIterator {
 public:
  // `cfd`, `read_options`, `files` and `mutable_cf_options` are captured by
  // pointer/reference and must outlive this iterator.
  ForwardLevelIterator(const ColumnFamilyData* const cfd,
                       const ReadOptions& read_options,
                       const std::vector<FileMetaData*>& files,
                       const MutableCFOptions& mutable_cf_options,
                       bool allow_unprepared_value)
      : cfd_(cfd),
        read_options_(read_options),
        files_(files),
        valid_(false),
        file_index_(std::numeric_limits<uint32_t>::max()),
        file_iter_(nullptr),
        pinned_iters_mgr_(nullptr),
        mutable_cf_options_(mutable_cf_options),
        allow_unprepared_value_(allow_unprepared_value) {
    status_.PermitUncheckedError();  // Allow uninitialized status through
  }

  ~ForwardLevelIterator() override {
    // Reset current pointer
    if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) {
      // Keys/values may still be pinned; hand the iterator to the manager,
      // which deletes it once the pinned data is released.
      pinned_iters_mgr_->PinIterator(file_iter_);
    } else {
      delete file_iter_;
    }
  }

  // Selects files_[file_index] as the current file, rebuilding the table
  // iterator only if the index actually changed. Clears any previous error.
  void SetFileIndex(uint32_t file_index) {
    assert(file_index < files_.size());
    status_ = Status::OK();
    if (file_index != file_index_) {
      file_index_ = file_index;
      Reset();
    }
  }

  // Rebuilds file_iter_ for files_[file_index_]. Leaves the iterator invalid;
  // the caller must Seek()/SeekToFirst() afterwards.
  void Reset() {
    assert(file_index_ < files_.size());

    // Reset current pointer
    if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) {
      pinned_iters_mgr_->PinIterator(file_iter_);
    } else {
      delete file_iter_;
    }

    // Local aggregator used only to detect range tombstones in the file;
    // ForwardIterator does not support them (see below).
    ReadRangeDelAggregator range_del_agg(&cfd_->internal_comparator(),
                                         kMaxSequenceNumber /* upper_bound */);
    file_iter_ = cfd_->table_cache()->NewIterator(
        read_options_, *(cfd_->soptions()), cfd_->internal_comparator(),
        *files_[file_index_],
        read_options_.ignore_range_deletions ? nullptr : &range_del_agg,
        mutable_cf_options_, /*table_reader_ptr=*/nullptr,
        /*file_read_hist=*/nullptr, TableReaderCaller::kUserIterator,
        /*arena=*/nullptr, /*skip_filters=*/false, /*level=*/-1,
        /*max_file_size_for_l0_meta_pin=*/0,
        /*smallest_compaction_key=*/nullptr,
        /*largest_compaction_key=*/nullptr, allow_unprepared_value_);
    file_iter_->SetPinnedItersMgr(pinned_iters_mgr_);
    valid_ = false;
    if (!range_del_agg.IsEmpty()) {
      status_ = Status::NotSupported(
          "Range tombstones unsupported with ForwardIterator");
    }
  }

  // Backward iteration is intentionally unsupported.
  void SeekToLast() override {
    status_ = Status::NotSupported("ForwardLevelIterator::SeekToLast()");
    valid_ = false;
  }
  void Prev() override {
    status_ = Status::NotSupported("ForwardLevelIterator::Prev()");
    valid_ = false;
  }

  bool Valid() const override { return valid_; }

  void SeekToFirst() override {
    assert(file_iter_ != nullptr);
    if (!status_.ok()) {
      assert(!valid_);
      return;
    }
    file_iter_->SeekToFirst();
    valid_ = file_iter_->Valid();
  }

  void Seek(const Slice& internal_key) override {
    assert(file_iter_ != nullptr);

    // This deviates from the usual convention for InternalIterator::Seek() in
    // that it doesn't discard pre-existing error status. That's because this
    // Seek() is only supposed to be called immediately after SetFileIndex()
    // (which discards pre-existing error status), and SetFileIndex() may set
    // an error status, which we shouldn't discard.
    if (!status_.ok()) {
      assert(!valid_);
      return;
    }

    file_iter_->Seek(internal_key);
    valid_ = file_iter_->Valid();
  }

  void SeekForPrev(const Slice& /*internal_key*/) override {
    status_ = Status::NotSupported("ForwardLevelIterator::SeekForPrev()");
    valid_ = false;
  }

  // Advances within the current file; once it is exhausted, moves on to the
  // first entry of each subsequent file until a valid entry or an error.
  void Next() override {
    assert(valid_);
    file_iter_->Next();
    for (;;) {
      valid_ = file_iter_->Valid();
      if (!file_iter_->status().ok()) {
        assert(!valid_);
        return;
      }
      if (valid_) {
        return;
      }
      if (file_index_ + 1 >= files_.size()) {
        // Last file of the level is exhausted.
        valid_ = false;
        return;
      }
      SetFileIndex(file_index_ + 1);
      if (!status_.ok()) {
        assert(!valid_);
        return;
      }
      file_iter_->SeekToFirst();
    }
  }

  Slice key() const override {
    assert(valid_);
    return file_iter_->key();
  }
  Slice value() const override {
    assert(valid_);
    return file_iter_->value();
  }
  uint64_t write_unix_time() const override {
    assert(valid_);
    return file_iter_->write_unix_time();
  }

  // Our own error (e.g. unsupported operation, range tombstones) takes
  // precedence over the table iterator's status.
  Status status() const override {
    if (!status_.ok()) {
      return status_;
    } else if (file_iter_) {
      return file_iter_->status();
    }
    return Status::OK();
  }

  // Materializes a value that was skipped under allow_unprepared_value_.
  // On failure the iterator becomes invalid; status() reports the error.
  bool PrepareValue() override {
    assert(valid_);
    if (file_iter_->PrepareValue()) {
      return true;
    }

    assert(!file_iter_->Valid());
    valid_ = false;
    return false;
  }

  bool IsKeyPinned() const override {
    return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
           file_iter_->IsKeyPinned();
  }
  bool IsValuePinned() const override {
    return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
           file_iter_->IsValuePinned();
  }

  void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override {
    pinned_iters_mgr_ = pinned_iters_mgr;
    if (file_iter_) {
      file_iter_->SetPinnedItersMgr(pinned_iters_mgr_);
    }
  }

 private:
  const ColumnFamilyData* const cfd_;
  const ReadOptions& read_options_;
  const std::vector<FileMetaData*>& files_;  // files of this level, in order
  bool valid_;
  uint32_t file_index_;  // index into files_ of the currently open file
  Status status_;
  InternalIterator* file_iter_;  // owned unless handed to pinned_iters_mgr_
  PinnedIteratorsManager* pinned_iters_mgr_;
  const MutableCFOptions& mutable_cf_options_;
  const bool allow_unprepared_value_;
};
// Builds a forward-only iterator over `cfd`. If `current_sv` is null, child
// iterators are built lazily on first SeekToFirst()/Seek(); otherwise they
// are built immediately from the given SuperVersion.
ForwardIterator::ForwardIterator(DBImpl* db, const ReadOptions& read_options,
                                 ColumnFamilyData* cfd,
                                 SuperVersion* current_sv,
                                 bool allow_unprepared_value)
    : db_(db),
      read_options_(read_options),
      cfd_(cfd),
      prefix_extractor_(current_sv->mutable_cf_options.prefix_extractor.get()),
      user_comparator_(cfd->user_comparator()),
      allow_unprepared_value_(allow_unprepared_value),
      immutable_min_heap_(MinIterComparator(&cfd_->internal_comparator())),
      sv_(current_sv),
      mutable_iter_(nullptr),
      current_(nullptr),
      valid_(false),
      status_(Status::OK()),
      immutable_status_(Status::OK()),
      has_iter_trimmed_for_upper_bound_(false),
      current_over_upper_bound_(false),
      is_prev_set_(false),
      is_prev_inclusive_(false),
      pinned_iters_mgr_(nullptr) {
  if (sv_) {
    RebuildIterators(false);
  }
  // Fall back to synchronous reads if the file system lacks async I/O
  // support; read_options_ is our own copy, so this is safe to mutate.
  if (!CheckFSFeatureSupport(cfd_->ioptions().env->GetFileSystem().get(),
                             FSSupportedOps::kAsyncIO)) {
    read_options_.async_io = false;
  }

  // immutable_status_ is a local aggregation of the
  // status of the immutable Iterators.
  // We have to PermitUncheckedError in case it is never
  // used, otherwise it will fail ASSERT_STATUS_CHECKED.
  immutable_status_.PermitUncheckedError();
}
  239. ForwardIterator::~ForwardIterator() { Cleanup(true); }
// Drops one reference on `sv`; if that was the last reference, cleans it up
// under the DB mutex and disposes of any files that became obsolete. With
// `background_purge_on_iterator_cleanup`, the SuperVersion and file purge are
// handed off to background threads instead of blocking this caller.
void ForwardIterator::SVCleanup(DBImpl* db, SuperVersion* sv,
                                bool background_purge_on_iterator_cleanup) {
  if (sv->Unref()) {
    // Job id == 0 means that this is not our background process, but rather
    // user thread
    JobContext job_context(0);
    db->mutex_.Lock();
    sv->Cleanup();
    db->FindObsoleteFiles(&job_context, false, true);
    if (background_purge_on_iterator_cleanup) {
      db->ScheduleBgLogWriterClose(&job_context);
      db->AddSuperVersionsToFreeQueue(sv);
      db->SchedulePurge();
    }
    db->mutex_.Unlock();
    if (!background_purge_on_iterator_cleanup) {
      // Not queued for background purge; free it on this thread.
      delete sv;
    }
    if (job_context.HaveSomethingToDelete()) {
      db->PurgeObsoleteFiles(job_context, background_purge_on_iterator_cleanup);
    }
    job_context.Clean();
  }
}
namespace {
// Arguments of the static SVCleanup(), packaged so the call can be deferred
// through PinnedIteratorsManager (see DeferredSVCleanup()).
struct SVCleanupParams {
  DBImpl* db;
  SuperVersion* sv;
  bool background_purge_on_iterator_cleanup;
};
}  // anonymous namespace
  271. // Used in PinnedIteratorsManager to release pinned SuperVersion
  272. void ForwardIterator::DeferredSVCleanup(void* arg) {
  273. auto d = static_cast<SVCleanupParams*>(arg);
  274. ForwardIterator::SVCleanup(d->db, d->sv,
  275. d->background_purge_on_iterator_cleanup);
  276. delete d;
  277. }
// Releases this iterator's reference on sv_. If a PinnedIteratorsManager may
// still hold slices pointing into sv_'s memtables, the cleanup is deferred
// until the manager releases its pins.
void ForwardIterator::SVCleanup() {
  if (sv_ == nullptr) {
    return;
  }
  bool background_purge =
      read_options_.background_purge_on_iterator_cleanup ||
      db_->immutable_db_options().avoid_unnecessary_blocking_io;
  if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) {
    // pinned_iters_mgr_ tells us to make sure that all visited key-value slices
    // are alive until pinned_iters_mgr_->ReleasePinnedData() is called.
    // The slices may point into some memtables owned by sv_, so we need to keep
    // sv_ referenced until pinned_iters_mgr_ unpins everything.
    auto p = new SVCleanupParams{db_, sv_, background_purge};
    pinned_iters_mgr_->PinPtr(p, &ForwardIterator::DeferredSVCleanup);
  } else {
    SVCleanup(db_, sv_, background_purge);
  }
}
  296. void ForwardIterator::Cleanup(bool release_sv) {
  297. if (mutable_iter_ != nullptr) {
  298. DeleteIterator(mutable_iter_, true /* is_arena */);
  299. }
  300. for (auto* m : imm_iters_) {
  301. DeleteIterator(m, true /* is_arena */);
  302. }
  303. imm_iters_.clear();
  304. for (auto* f : l0_iters_) {
  305. DeleteIterator(f);
  306. }
  307. l0_iters_.clear();
  308. for (auto* l : level_iters_) {
  309. DeleteIterator(l);
  310. }
  311. level_iters_.clear();
  312. if (release_sv) {
  313. SVCleanup();
  314. }
  315. }
  316. bool ForwardIterator::Valid() const {
  317. // See UpdateCurrent().
  318. return valid_ ? !current_over_upper_bound_ : false;
  319. }
void ForwardIterator::SeekToFirst() {
  if (sv_ == nullptr) {
    // Lazy first use: acquire a SuperVersion and build all child iterators.
    RebuildIterators(true);
  } else if (sv_->version_number != cfd_->GetSuperVersionNumber()) {
    // The LSM shape changed; rebuild, reusing live L0 iterators when possible.
    RenewIterators();
  } else if (immutable_status_.IsIncomplete()) {
    // A previous read left some immutable iterators Incomplete; rebuild only
    // those.
    ResetIncompleteIterators();
  }
  SeekInternal(Slice(), true, false);
}
  330. bool ForwardIterator::IsOverUpperBound(const Slice& internal_key) const {
  331. return !(read_options_.iterate_upper_bound == nullptr ||
  332. cfd_->internal_comparator().user_comparator()->Compare(
  333. ExtractUserKey(internal_key),
  334. *read_options_.iterate_upper_bound) < 0);
  335. }
void ForwardIterator::Seek(const Slice& internal_key) {
  if (sv_ == nullptr) {
    RebuildIterators(true);
  } else if (sv_->version_number != cfd_->GetSuperVersionNumber()) {
    RenewIterators();
  } else if (immutable_status_.IsIncomplete()) {
    ResetIncompleteIterators();
  }

  // First pass issues the seeks (possibly triggering async block reads);
  // with async_io the second pass re-seeks iterators whose first attempt
  // returned TryAgain.
  SeekInternal(internal_key, false, false);
  if (read_options_.async_io) {
    SeekInternal(internal_key, false, true);
  }
}
// In case of async_io, SeekInternal is called twice with seek_after_async_io
// enabled in second call which only does seeking part to retrieve the blocks.
//
// Positions the mutable (memtable) iterator and, when needed, every immutable
// iterator (imm memtables, L0 files, L1+ levels), pushing the valid ones onto
// immutable_min_heap_. `seek_to_first` means ignore `internal_key` and go to
// the beginning.
void ForwardIterator::SeekInternal(const Slice& internal_key,
                                   bool seek_to_first,
                                   bool seek_after_async_io) {
  assert(mutable_iter_);
  // mutable
  if (!seek_after_async_io) {
    seek_to_first ? mutable_iter_->SeekToFirst()
                  : mutable_iter_->Seek(internal_key);
  }

  // immutable
  // TODO(ljin): NeedToSeekImmutable has negative impact on performance
  // if it turns to need to seek immutable often. We probably want to have
  // an option to turn it off.
  if (seek_to_first || seek_after_async_io ||
      NeedToSeekImmutable(internal_key)) {
    if (!seek_after_async_io) {
      immutable_status_ = Status::OK();
      if (has_iter_trimmed_for_upper_bound_ &&
          (
              // prev_ is not set yet
              is_prev_set_ == false ||
              // We are doing SeekToFirst() and internal_key.size() = 0
              seek_to_first ||
              // prev_key_ > internal_key
              cfd_->internal_comparator().InternalKeyComparator::Compare(
                  prev_key_.GetInternalKey(), internal_key) > 0)) {
        // Some iterators are trimmed. Need to rebuild.
        RebuildIterators(true);
        // Already seeked mutable iter, so seek again
        seek_to_first ? mutable_iter_->SeekToFirst()
                      : mutable_iter_->Seek(internal_key);
      }
      {
        // Rebuild the min-heap from scratch (priority_queue has no clear()).
        auto tmp = MinIterHeap(MinIterComparator(&cfd_->internal_comparator()));
        immutable_min_heap_.swap(tmp);
      }

      // Seek the immutable-memtable iterators and collect the valid ones.
      for (size_t i = 0; i < imm_iters_.size(); i++) {
        auto* m = imm_iters_[i];
        seek_to_first ? m->SeekToFirst() : m->Seek(internal_key);
        if (!m->status().ok()) {
          immutable_status_ = m->status();
        } else if (m->Valid()) {
          immutable_min_heap_.push(m);
        }
      }
    }

    Slice target_user_key;
    if (!seek_to_first) {
      target_user_key = ExtractUserKey(internal_key);
    }
    const VersionStorageInfo* vstorage = sv_->current->storage_info();
    const std::vector<FileMetaData*>& l0 = vstorage->LevelFiles(0);
    for (size_t i = 0; i < l0.size(); ++i) {
      if (!l0_iters_[i]) {
        // Iterator was trimmed earlier; nothing interesting in this file.
        continue;
      }
      if (seek_after_async_io) {
        // Second pass only touches iterators whose first-pass read is still
        // pending (TryAgain).
        if (!l0_iters_[i]->status().IsTryAgain()) {
          continue;
        }
      }

      if (seek_to_first) {
        l0_iters_[i]->SeekToFirst();
      } else {
        // If the target key passes over the largest key, we are sure Next()
        // won't go over this file.
        if (seek_after_async_io == false &&
            user_comparator_->Compare(target_user_key,
                                      l0[i]->largest.user_key()) > 0) {
          if (read_options_.iterate_upper_bound != nullptr) {
            has_iter_trimmed_for_upper_bound_ = true;
            DeleteIterator(l0_iters_[i]);
            l0_iters_[i] = nullptr;
          }
          continue;
        }
        l0_iters_[i]->Seek(internal_key);
      }

      if (l0_iters_[i]->status().IsTryAgain()) {
        // Async read in flight; the second pass will finish this seek.
        assert(!seek_after_async_io);
        continue;
      } else if (!l0_iters_[i]->status().ok()) {
        immutable_status_ = l0_iters_[i]->status();
      } else if (l0_iters_[i]->Valid() &&
                 !IsOverUpperBound(l0_iters_[i]->key())) {
        immutable_min_heap_.push(l0_iters_[i]);
      } else {
        // Exhausted or past the upper bound: trim this iterator.
        has_iter_trimmed_for_upper_bound_ = true;
        DeleteIterator(l0_iters_[i]);
        l0_iters_[i] = nullptr;
      }
    }

    // Seek the per-level iterators (L1+). level_iters_[level - 1] may be
    // nullptr if the level was trimmed.
    for (int32_t level = 1; level < vstorage->num_levels(); ++level) {
      const std::vector<FileMetaData*>& level_files =
          vstorage->LevelFiles(level);
      if (level_files.empty()) {
        continue;
      }
      if (level_iters_[level - 1] == nullptr) {
        continue;
      }
      if (seek_after_async_io) {
        if (!level_iters_[level - 1]->status().IsTryAgain()) {
          continue;
        }
      }
      uint32_t f_idx = 0;
      if (!seek_to_first && !seek_after_async_io) {
        // Binary-search for the file that may contain internal_key.
        f_idx = FindFileInRange(level_files, internal_key, 0,
                                static_cast<uint32_t>(level_files.size()));
      }

      // Seek
      if (seek_after_async_io || f_idx < level_files.size()) {
        if (!seek_after_async_io) {
          level_iters_[level - 1]->SetFileIndex(f_idx);
        }
        seek_to_first ? level_iters_[level - 1]->SeekToFirst()
                      : level_iters_[level - 1]->Seek(internal_key);

        if (level_iters_[level - 1]->status().IsTryAgain()) {
          assert(!seek_after_async_io);
          continue;
        } else if (!level_iters_[level - 1]->status().ok()) {
          immutable_status_ = level_iters_[level - 1]->status();
        } else if (level_iters_[level - 1]->Valid() &&
                   !IsOverUpperBound(level_iters_[level - 1]->key())) {
          immutable_min_heap_.push(level_iters_[level - 1]);
        } else {
          // Nothing in this level is interesting. Remove.
          has_iter_trimmed_for_upper_bound_ = true;
          DeleteIterator(level_iters_[level - 1]);
          level_iters_[level - 1] = nullptr;
        }
      }
    }

    if (seek_to_first) {
      is_prev_set_ = false;
    } else {
      // Remember the seek target so NeedToSeekImmutable() can skip redundant
      // immutable seeks next time.
      prev_key_.SetInternalKey(internal_key);
      is_prev_set_ = true;
      is_prev_inclusive_ = true;
    }

    TEST_SYNC_POINT_CALLBACK("ForwardIterator::SeekInternal:Immutable", this);
  } else if (current_ && current_ != mutable_iter_) {
    // current_ is one of immutable iterators, push it back to the heap
    immutable_min_heap_.push(current_);
  }

  // For async_io, it should be updated when seek_after_async_io is true (in
  // second call).
  if (seek_to_first || !read_options_.async_io || seek_after_async_io) {
    UpdateCurrent();
  }
  TEST_SYNC_POINT_CALLBACK("ForwardIterator::SeekInternal:Return", this);
}
void ForwardIterator::Next() {
  assert(valid_);
  bool update_prev_key = false;

  if (sv_ == nullptr || sv_->version_number != cfd_->GetSuperVersionNumber()) {
    // The SuperVersion changed under us: rebuild everything and re-seek to
    // the current key before advancing. Copy the key out first, because the
    // rebuild destroys the child iterator that owns it.
    std::string current_key = key().ToString();
    Slice old_key(current_key.data(), current_key.size());

    if (sv_ == nullptr) {
      RebuildIterators(true);
    } else {
      RenewIterators();
    }

    SeekInternal(old_key, false, false);
    if (read_options_.async_io) {
      SeekInternal(old_key, false, true);
    }

    if (!valid_ || key().compare(old_key) != 0) {
      // The old key no longer exists (e.g. compacted away); the seek already
      // positioned us on its successor, so there is nothing to advance past.
      return;
    }
  } else if (current_ != mutable_iter_) {
    // It is going to advance immutable iterator
    if (is_prev_set_ && prefix_extractor_) {
      // advance prev_key_ to current_ only if they share the same prefix
      update_prev_key =
          prefix_extractor_->Transform(prev_key_.GetUserKey())
              .compare(prefix_extractor_->Transform(current_->key())) == 0;
    } else {
      update_prev_key = true;
    }

    if (update_prev_key) {
      prev_key_.SetInternalKey(current_->key());
      is_prev_set_ = true;
      is_prev_inclusive_ = false;
    }
  }

  current_->Next();
  if (current_ != mutable_iter_) {
    if (!current_->status().ok()) {
      immutable_status_ = current_->status();
    } else if ((current_->Valid()) && (!IsOverUpperBound(current_->key()))) {
      // Still yielding useful keys: return it to the merge heap.
      immutable_min_heap_.push(current_);
    } else {
      if ((current_->Valid()) && (IsOverUpperBound(current_->key()))) {
        // remove the current iterator
        DeleteCurrentIter();
        current_ = nullptr;
      }
      if (update_prev_key) {
        mutable_iter_->Seek(prev_key_.GetInternalKey());
      }
    }
  }
  UpdateCurrent();
  TEST_SYNC_POINT_CALLBACK("ForwardIterator::Next:Return", this);
}
// Current internal key; only legal while Valid().
Slice ForwardIterator::key() const {
  assert(valid_);
  return current_->key();
}
// Write time of the current entry, forwarded from the child iterator; only
// legal while Valid().
uint64_t ForwardIterator::write_unix_time() const {
  assert(valid_);
  return current_->write_unix_time();
}
// Current value; only legal while Valid().
Slice ForwardIterator::value() const {
  assert(valid_);
  return current_->value();
}
  570. Status ForwardIterator::status() const {
  571. if (!status_.ok()) {
  572. return status_;
  573. } else if (!mutable_iter_->status().ok()) {
  574. return mutable_iter_->status();
  575. }
  576. return immutable_status_;
  577. }
// Materializes the current value when allow_unprepared_value_ deferred it.
// On failure the whole iterator becomes invalid and the child's error is
// recorded in immutable_status_.
bool ForwardIterator::PrepareValue() {
  assert(valid_);
  if (current_->PrepareValue()) {
    return true;
  }

  // A failed PrepareValue() leaves the child invalid with a non-OK status.
  assert(!current_->Valid());
  assert(!current_->status().ok());
  assert(current_ != mutable_iter_);  // memtable iterator can't fail
  assert(immutable_status_.ok());

  valid_ = false;
  immutable_status_ = current_->status();
  return false;
}
  591. Status ForwardIterator::GetProperty(std::string prop_name, std::string* prop) {
  592. assert(prop != nullptr);
  593. if (prop_name == "rocksdb.iterator.super-version-number") {
  594. *prop = std::to_string(sv_->version_number);
  595. return Status::OK();
  596. }
  597. return Status::InvalidArgument("Unrecognized property: " + prop_name);
  598. }
// Records the pinning manager and propagates it to all child iterators.
void ForwardIterator::SetPinnedItersMgr(
    PinnedIteratorsManager* pinned_iters_mgr) {
  pinned_iters_mgr_ = pinned_iters_mgr;
  UpdateChildrenPinnedItersMgr();
}
  604. void ForwardIterator::UpdateChildrenPinnedItersMgr() {
  605. // Set PinnedIteratorsManager for mutable memtable iterator.
  606. if (mutable_iter_) {
  607. mutable_iter_->SetPinnedItersMgr(pinned_iters_mgr_);
  608. }
  609. // Set PinnedIteratorsManager for immutable memtable iterators.
  610. for (InternalIterator* child_iter : imm_iters_) {
  611. if (child_iter) {
  612. child_iter->SetPinnedItersMgr(pinned_iters_mgr_);
  613. }
  614. }
  615. // Set PinnedIteratorsManager for L0 files iterators.
  616. for (InternalIterator* child_iter : l0_iters_) {
  617. if (child_iter) {
  618. child_iter->SetPinnedItersMgr(pinned_iters_mgr_);
  619. }
  620. }
  621. // Set PinnedIteratorsManager for L1+ levels iterators.
  622. for (ForwardLevelIterator* child_iter : level_iters_) {
  623. if (child_iter) {
  624. child_iter->SetPinnedItersMgr(pinned_iters_mgr_);
  625. }
  626. }
  627. }
// Key slices stay valid past Next() only while pinning is enabled and the
// child iterator supports it.
bool ForwardIterator::IsKeyPinned() const {
  return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
         current_->IsKeyPinned();
}
// Same contract as IsKeyPinned(), for the value slice.
bool ForwardIterator::IsValuePinned() const {
  return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
         current_->IsValuePinned();
}
// Destroys all child iterators and rebuilds them from sv_ (optionally
// re-acquiring the latest SuperVersion first). Sets a NotSupported status if
// any range tombstones are present, since ForwardIterator cannot apply them.
void ForwardIterator::RebuildIterators(bool refresh_sv) {
  // Clean up
  Cleanup(refresh_sv);
  if (refresh_sv) {
    // New
    sv_ = cfd_->GetReferencedSuperVersion(db_);
  }
  // Aggregator is used only to detect the presence of range tombstones.
  ReadRangeDelAggregator range_del_agg(&cfd_->internal_comparator(),
                                       kMaxSequenceNumber /* upper_bound */);
  UnownedPtr<const SeqnoToTimeMapping> seqno_to_time_mapping =
      sv_->GetSeqnoToTimeMapping();
  mutable_iter_ =
      sv_->mem->NewIterator(read_options_, seqno_to_time_mapping, &arena_,
                            sv_->mutable_cf_options.prefix_extractor.get(),
                            /*for_flush=*/false);
  sv_->imm->AddIterators(read_options_, seqno_to_time_mapping,
                         sv_->mutable_cf_options.prefix_extractor.get(),
                         &imm_iters_, &arena_);
  if (!read_options_.ignore_range_deletions) {
    std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
        sv_->mem->NewRangeTombstoneIterator(
            read_options_, sv_->current->version_set()->LastSequence(),
            false /* immutable_memtable */));
    range_del_agg.AddTombstones(std::move(range_del_iter));
    // Always return Status::OK().
    Status temp_s = sv_->imm->AddRangeTombstoneIterators(read_options_, &arena_,
                                                         &range_del_agg);
    assert(temp_s.ok());
  }
  has_iter_trimmed_for_upper_bound_ = false;

  const auto* vstorage = sv_->current->storage_info();
  const auto& l0_files = vstorage->LevelFiles(0);
  l0_iters_.reserve(l0_files.size());
  for (const auto* l0 : l0_files) {
    if ((read_options_.iterate_upper_bound != nullptr) &&
        cfd_->internal_comparator().user_comparator()->Compare(
            l0->smallest.user_key(), *read_options_.iterate_upper_bound) > 0) {
      // No need to set has_iter_trimmed_for_upper_bound_: this ForwardIterator
      // will never be interested in files with smallest key above
      // iterate_upper_bound, since iterate_upper_bound can't be changed.
      l0_iters_.push_back(nullptr);
      continue;
    }
    l0_iters_.push_back(cfd_->table_cache()->NewIterator(
        read_options_, *cfd_->soptions(), cfd_->internal_comparator(), *l0,
        read_options_.ignore_range_deletions ? nullptr : &range_del_agg,
        sv_->mutable_cf_options,
        /*table_reader_ptr=*/nullptr, /*file_read_hist=*/nullptr,
        TableReaderCaller::kUserIterator, /*arena=*/nullptr,
        /*skip_filters=*/false, /*level=*/-1,
        MaxFileSizeForL0MetaPin(sv_->mutable_cf_options),
        /*smallest_compaction_key=*/nullptr,
        /*largest_compaction_key=*/nullptr, allow_unprepared_value_));
  }
  BuildLevelIterators(vstorage, sv_);
  current_ = nullptr;
  is_prev_set_ = false;

  UpdateChildrenPinnedItersMgr();
  if (!range_del_agg.IsEmpty()) {
    status_ = Status::NotSupported(
        "Range tombstones unsupported with ForwardIterator");
    valid_ = false;
  }
}
// Switches this iterator to the latest SuperVersion, reusing still-open L0
// file iterators for files shared between the old and new versions. Memtable
// and level iterators are always rebuilt.
void ForwardIterator::RenewIterators() {
  SuperVersion* svnew;
  assert(sv_);
  svnew = cfd_->GetReferencedSuperVersion(db_);

  if (mutable_iter_ != nullptr) {
    DeleteIterator(mutable_iter_, true /* is_arena */);
  }
  for (auto* m : imm_iters_) {
    DeleteIterator(m, true /* is_arena */);
  }
  imm_iters_.clear();

  UnownedPtr<const SeqnoToTimeMapping> seqno_to_time_mapping =
      svnew->GetSeqnoToTimeMapping();
  mutable_iter_ =
      svnew->mem->NewIterator(read_options_, seqno_to_time_mapping, &arena_,
                              svnew->mutable_cf_options.prefix_extractor.get(),
                              /*for_flush=*/false);
  svnew->imm->AddIterators(read_options_, seqno_to_time_mapping,
                           svnew->mutable_cf_options.prefix_extractor.get(),
                           &imm_iters_, &arena_);
  // Aggregator is used only to detect the presence of range tombstones.
  ReadRangeDelAggregator range_del_agg(&cfd_->internal_comparator(),
                                       kMaxSequenceNumber /* upper_bound */);
  if (!read_options_.ignore_range_deletions) {
    std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
        svnew->mem->NewRangeTombstoneIterator(
            read_options_, sv_->current->version_set()->LastSequence(),
            false /* immutable_memtable */));
    range_del_agg.AddTombstones(std::move(range_del_iter));
    // Always return Status::OK().
    Status temp_s = svnew->imm->AddRangeTombstoneIterators(
        read_options_, &arena_, &range_del_agg);
    assert(temp_s.ok());
  }

  const auto* vstorage = sv_->current->storage_info();
  const auto& l0_files = vstorage->LevelFiles(0);
  const auto* vstorage_new = svnew->current->storage_info();
  const auto& l0_files_new = vstorage_new->LevelFiles(0);
  size_t iold, inew;
  bool found;
  std::vector<InternalIterator*> l0_iters_new;
  l0_iters_new.reserve(l0_files_new.size());

  // For each new L0 file, reuse the old iterator if the same file was
  // already open under the previous SuperVersion.
  for (inew = 0; inew < l0_files_new.size(); inew++) {
    found = false;
    for (iold = 0; iold < l0_files.size(); iold++) {
      if (l0_files[iold] == l0_files_new[inew]) {
        found = true;
        break;
      }
    }
    if (found) {
      if (l0_iters_[iold] == nullptr) {
        // Slot had been trimmed (upper bound / exhausted); keep it trimmed.
        l0_iters_new.push_back(nullptr);
        TEST_SYNC_POINT_CALLBACK("ForwardIterator::RenewIterators:Null", this);
      } else {
        // Transfer ownership of the existing iterator to the new list.
        l0_iters_new.push_back(l0_iters_[iold]);
        l0_iters_[iold] = nullptr;
        TEST_SYNC_POINT_CALLBACK("ForwardIterator::RenewIterators:Copy", this);
      }
      continue;
    }
    l0_iters_new.push_back(cfd_->table_cache()->NewIterator(
        read_options_, *cfd_->soptions(), cfd_->internal_comparator(),
        *l0_files_new[inew],
        read_options_.ignore_range_deletions ? nullptr : &range_del_agg,
        svnew->mutable_cf_options,
        /*table_reader_ptr=*/nullptr, /*file_read_hist=*/nullptr,
        TableReaderCaller::kUserIterator, /*arena=*/nullptr,
        /*skip_filters=*/false, /*level=*/-1,
        MaxFileSizeForL0MetaPin(svnew->mutable_cf_options),
        /*smallest_compaction_key=*/nullptr,
        /*largest_compaction_key=*/nullptr, allow_unprepared_value_));
  }

  // Delete whatever was not reused, then adopt the new list.
  for (auto* f : l0_iters_) {
    DeleteIterator(f);
  }
  l0_iters_.clear();
  l0_iters_ = l0_iters_new;

  for (auto* l : level_iters_) {
    DeleteIterator(l);
  }
  level_iters_.clear();
  BuildLevelIterators(vstorage_new, svnew);
  current_ = nullptr;
  is_prev_set_ = false;
  // Release the old SuperVersion before adopting the new one.
  SVCleanup();
  sv_ = svnew;

  UpdateChildrenPinnedItersMgr();
  if (!range_del_agg.IsEmpty()) {
    status_ = Status::NotSupported(
        "Range tombstones unsupported with ForwardIterator");
    valid_ = false;
  }
}
  793. void ForwardIterator::BuildLevelIterators(const VersionStorageInfo* vstorage,
  794. SuperVersion* sv) {
  795. level_iters_.reserve(vstorage->num_levels() - 1);
  796. for (int32_t level = 1; level < vstorage->num_levels(); ++level) {
  797. const auto& level_files = vstorage->LevelFiles(level);
  798. if ((level_files.empty()) ||
  799. ((read_options_.iterate_upper_bound != nullptr) &&
  800. (user_comparator_->Compare(*read_options_.iterate_upper_bound,
  801. level_files[0]->smallest.user_key()) <
  802. 0))) {
  803. level_iters_.push_back(nullptr);
  804. if (!level_files.empty()) {
  805. has_iter_trimmed_for_upper_bound_ = true;
  806. }
  807. } else {
  808. level_iters_.push_back(new ForwardLevelIterator(
  809. cfd_, read_options_, level_files, sv->mutable_cf_options,
  810. allow_unprepared_value_));
  811. }
  812. }
  813. }
  814. void ForwardIterator::ResetIncompleteIterators() {
  815. const auto& l0_files = sv_->current->storage_info()->LevelFiles(0);
  816. for (size_t i = 0; i < l0_iters_.size(); ++i) {
  817. assert(i < l0_files.size());
  818. if (!l0_iters_[i] || !l0_iters_[i]->status().IsIncomplete()) {
  819. continue;
  820. }
  821. DeleteIterator(l0_iters_[i]);
  822. l0_iters_[i] = cfd_->table_cache()->NewIterator(
  823. read_options_, *cfd_->soptions(), cfd_->internal_comparator(),
  824. *l0_files[i], /*range_del_agg=*/nullptr, sv_->mutable_cf_options,
  825. /*table_reader_ptr=*/nullptr, /*file_read_hist=*/nullptr,
  826. TableReaderCaller::kUserIterator, /*arena=*/nullptr,
  827. /*skip_filters=*/false, /*level=*/-1,
  828. MaxFileSizeForL0MetaPin(sv_->mutable_cf_options),
  829. /*smallest_compaction_key=*/nullptr,
  830. /*largest_compaction_key=*/nullptr, allow_unprepared_value_);
  831. l0_iters_[i]->SetPinnedItersMgr(pinned_iters_mgr_);
  832. }
  833. for (auto* level_iter : level_iters_) {
  834. if (level_iter && level_iter->status().IsIncomplete()) {
  835. level_iter->Reset();
  836. }
  837. }
  838. current_ = nullptr;
  839. is_prev_set_ = false;
  840. }
  841. void ForwardIterator::UpdateCurrent() {
  842. if (immutable_min_heap_.empty() && !mutable_iter_->Valid()) {
  843. current_ = nullptr;
  844. } else if (immutable_min_heap_.empty()) {
  845. current_ = mutable_iter_;
  846. } else if (!mutable_iter_->Valid()) {
  847. current_ = immutable_min_heap_.top();
  848. immutable_min_heap_.pop();
  849. } else {
  850. current_ = immutable_min_heap_.top();
  851. assert(current_ != nullptr);
  852. assert(current_->Valid());
  853. int cmp = cfd_->internal_comparator().InternalKeyComparator::Compare(
  854. mutable_iter_->key(), current_->key());
  855. assert(cmp != 0);
  856. if (cmp > 0) {
  857. immutable_min_heap_.pop();
  858. } else {
  859. current_ = mutable_iter_;
  860. }
  861. }
  862. valid_ = current_ != nullptr && immutable_status_.ok();
  863. if (!status_.ok()) {
  864. status_ = Status::OK();
  865. }
  866. // Upper bound doesn't apply to the memtable iterator. We want Valid() to
  867. // return false when all iterators are over iterate_upper_bound, but can't
  868. // just set valid_ to false, as that would effectively disable the tailing
  869. // optimization (Seek() would be called on all immutable iterators regardless
  870. // of whether the target key is greater than prev_key_).
  871. current_over_upper_bound_ = valid_ && IsOverUpperBound(current_->key());
  872. }
  873. bool ForwardIterator::NeedToSeekImmutable(const Slice& target) {
  874. // We maintain the interval (prev_key_, immutable_min_heap_.top()->key())
  875. // such that there are no records with keys within that range in
  876. // immutable_min_heap_. Since immutable structures (SST files and immutable
  877. // memtables) can't change in this version, we don't need to do a seek if
  878. // 'target' belongs to that interval (immutable_min_heap_.top() is already
  879. // at the correct position).
  880. if (!valid_ || !current_ || !is_prev_set_ || !immutable_status_.ok()) {
  881. return true;
  882. }
  883. Slice prev_key = prev_key_.GetInternalKey();
  884. if (prefix_extractor_ && prefix_extractor_->Transform(target).compare(
  885. prefix_extractor_->Transform(prev_key)) != 0) {
  886. return true;
  887. }
  888. if (cfd_->internal_comparator().InternalKeyComparator::Compare(
  889. prev_key, target) >= (is_prev_inclusive_ ? 1 : 0)) {
  890. return true;
  891. }
  892. if (immutable_min_heap_.empty() && current_ == mutable_iter_) {
  893. // Nothing to seek on.
  894. return false;
  895. }
  896. if (cfd_->internal_comparator().InternalKeyComparator::Compare(
  897. target, current_ == mutable_iter_ ? immutable_min_heap_.top()->key()
  898. : current_->key()) > 0) {
  899. return true;
  900. }
  901. return false;
  902. }
  903. void ForwardIterator::DeleteCurrentIter() {
  904. const VersionStorageInfo* vstorage = sv_->current->storage_info();
  905. const std::vector<FileMetaData*>& l0 = vstorage->LevelFiles(0);
  906. for (size_t i = 0; i < l0.size(); ++i) {
  907. if (!l0_iters_[i]) {
  908. continue;
  909. }
  910. if (l0_iters_[i] == current_) {
  911. has_iter_trimmed_for_upper_bound_ = true;
  912. DeleteIterator(l0_iters_[i]);
  913. l0_iters_[i] = nullptr;
  914. return;
  915. }
  916. }
  917. for (int32_t level = 1; level < vstorage->num_levels(); ++level) {
  918. if (level_iters_[level - 1] == nullptr) {
  919. continue;
  920. }
  921. if (level_iters_[level - 1] == current_) {
  922. has_iter_trimmed_for_upper_bound_ = true;
  923. DeleteIterator(level_iters_[level - 1]);
  924. level_iters_[level - 1] = nullptr;
  925. }
  926. }
  927. }
  928. bool ForwardIterator::TEST_CheckDeletedIters(int* pdeleted_iters,
  929. int* pnum_iters) {
  930. bool retval = false;
  931. int deleted_iters = 0;
  932. int num_iters = 0;
  933. const VersionStorageInfo* vstorage = sv_->current->storage_info();
  934. const std::vector<FileMetaData*>& l0 = vstorage->LevelFiles(0);
  935. for (size_t i = 0; i < l0.size(); ++i) {
  936. if (!l0_iters_[i]) {
  937. retval = true;
  938. deleted_iters++;
  939. } else {
  940. num_iters++;
  941. }
  942. }
  943. for (int32_t level = 1; level < vstorage->num_levels(); ++level) {
  944. if ((level_iters_[level - 1] == nullptr) &&
  945. (!vstorage->LevelFiles(level).empty())) {
  946. retval = true;
  947. deleted_iters++;
  948. } else if (!vstorage->LevelFiles(level).empty()) {
  949. num_iters++;
  950. }
  951. }
  952. if ((!retval) && num_iters <= 1) {
  953. retval = true;
  954. }
  955. if (pdeleted_iters) {
  956. *pdeleted_iters = deleted_iters;
  957. }
  958. if (pnum_iters) {
  959. *pnum_iters = num_iters;
  960. }
  961. return retval;
  962. }
  963. uint32_t ForwardIterator::FindFileInRange(
  964. const std::vector<FileMetaData*>& files, const Slice& internal_key,
  965. uint32_t left, uint32_t right) {
  966. auto cmp = [&](const FileMetaData* f, const Slice& k) -> bool {
  967. return cfd_->internal_comparator().InternalKeyComparator::Compare(
  968. f->largest.Encode(), k) < 0;
  969. };
  970. const auto& b = files.begin();
  971. return static_cast<uint32_t>(
  972. std::lower_bound(b + left, b + right, internal_key, cmp) - b);
  973. }
  974. void ForwardIterator::DeleteIterator(InternalIterator* iter, bool is_arena) {
  975. if (iter == nullptr) {
  976. return;
  977. }
  978. if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) {
  979. pinned_iters_mgr_->PinIterator(iter, is_arena);
  980. } else {
  981. if (is_arena) {
  982. iter->~InternalIterator();
  983. } else {
  984. delete iter;
  985. }
  986. }
  987. }
  988. } // namespace ROCKSDB_NAMESPACE