forward_iterator.cc

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#ifndef ROCKSDB_LITE

#include "db/forward_iterator.h"

#include <limits>
#include <string>
#include <utility>

#include "db/column_family.h"
#include "db/db_impl/db_impl.h"
#include "db/db_iter.h"
#include "db/dbformat.h"
#include "db/job_context.h"
#include "db/range_del_aggregator.h"
#include "db/range_tombstone_fragmenter.h"
#include "rocksdb/env.h"
#include "rocksdb/slice.h"
#include "rocksdb/slice_transform.h"
#include "table/merging_iterator.h"
#include "test_util/sync_point.h"
#include "util/string_util.h"

namespace ROCKSDB_NAMESPACE {

// Usage:
//   ForwardLevelIterator iter;
//   iter.SetFileIndex(file_index);
//   iter.Seek(target); // or iter.SeekToFirst();
//   iter.Next()
class ForwardLevelIterator : public InternalIterator {
 public:
  ForwardLevelIterator(const ColumnFamilyData* const cfd,
                       const ReadOptions& read_options,
                       const std::vector<FileMetaData*>& files,
                       const SliceTransform* prefix_extractor)
      : cfd_(cfd),
        read_options_(read_options),
        files_(files),
        valid_(false),
        file_index_(std::numeric_limits<uint32_t>::max()),
        file_iter_(nullptr),
        pinned_iters_mgr_(nullptr),
        prefix_extractor_(prefix_extractor) {}

  ~ForwardLevelIterator() override {
    // Reset current pointer
    if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) {
      pinned_iters_mgr_->PinIterator(file_iter_);
    } else {
      delete file_iter_;
    }
  }

  void SetFileIndex(uint32_t file_index) {
    assert(file_index < files_.size());
    status_ = Status::OK();
    if (file_index != file_index_) {
      file_index_ = file_index;
      Reset();
    }
  }

  void Reset() {
    assert(file_index_ < files_.size());

    // Reset current pointer
    if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) {
      pinned_iters_mgr_->PinIterator(file_iter_);
    } else {
      delete file_iter_;
    }

    ReadRangeDelAggregator range_del_agg(&cfd_->internal_comparator(),
                                         kMaxSequenceNumber /* upper_bound */);
    file_iter_ = cfd_->table_cache()->NewIterator(
        read_options_, *(cfd_->soptions()), cfd_->internal_comparator(),
        *files_[file_index_],
        read_options_.ignore_range_deletions ? nullptr : &range_del_agg,
        prefix_extractor_, /*table_reader_ptr=*/nullptr,
        /*file_read_hist=*/nullptr, TableReaderCaller::kUserIterator,
        /*arena=*/nullptr, /*skip_filters=*/false, /*level=*/-1,
        /*smallest_compaction_key=*/nullptr,
        /*largest_compaction_key=*/nullptr);
    file_iter_->SetPinnedItersMgr(pinned_iters_mgr_);
    valid_ = false;
    if (!range_del_agg.IsEmpty()) {
      status_ = Status::NotSupported(
          "Range tombstones unsupported with ForwardIterator");
    }
  }

  void SeekToLast() override {
    status_ = Status::NotSupported("ForwardLevelIterator::SeekToLast()");
    valid_ = false;
  }

  void Prev() override {
    status_ = Status::NotSupported("ForwardLevelIterator::Prev()");
    valid_ = false;
  }

  bool Valid() const override {
    return valid_;
  }

  void SeekToFirst() override {
    assert(file_iter_ != nullptr);
    if (!status_.ok()) {
      assert(!valid_);
      return;
    }
    file_iter_->SeekToFirst();
    valid_ = file_iter_->Valid();
  }

  void Seek(const Slice& internal_key) override {
    assert(file_iter_ != nullptr);

    // This deviates from the usual convention for InternalIterator::Seek() in
    // that it doesn't discard pre-existing error status. That's because this
    // Seek() is only supposed to be called immediately after SetFileIndex()
    // (which discards pre-existing error status), and SetFileIndex() may set
    // an error status, which we shouldn't discard.
    if (!status_.ok()) {
      assert(!valid_);
      return;
    }

    file_iter_->Seek(internal_key);
    valid_ = file_iter_->Valid();
  }

  void SeekForPrev(const Slice& /*internal_key*/) override {
    status_ = Status::NotSupported("ForwardLevelIterator::SeekForPrev()");
    valid_ = false;
  }

  void Next() override {
    assert(valid_);
    file_iter_->Next();
    for (;;) {
      valid_ = file_iter_->Valid();
      if (!file_iter_->status().ok()) {
        assert(!valid_);
        return;
      }
      if (valid_) {
        return;
      }
      if (file_index_ + 1 >= files_.size()) {
        valid_ = false;
        return;
      }
      SetFileIndex(file_index_ + 1);
      if (!status_.ok()) {
        assert(!valid_);
        return;
      }
      file_iter_->SeekToFirst();
    }
  }

  Slice key() const override {
    assert(valid_);
    return file_iter_->key();
  }

  Slice value() const override {
    assert(valid_);
    return file_iter_->value();
  }

  Status status() const override {
    if (!status_.ok()) {
      return status_;
    } else if (file_iter_) {
      return file_iter_->status();
    }
    return Status::OK();
  }

  bool IsKeyPinned() const override {
    return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
           file_iter_->IsKeyPinned();
  }

  bool IsValuePinned() const override {
    return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
           file_iter_->IsValuePinned();
  }

  void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override {
    pinned_iters_mgr_ = pinned_iters_mgr;
    if (file_iter_) {
      file_iter_->SetPinnedItersMgr(pinned_iters_mgr_);
    }
  }

 private:
  const ColumnFamilyData* const cfd_;
  const ReadOptions& read_options_;
  const std::vector<FileMetaData*>& files_;

  bool valid_;
  uint32_t file_index_;
  Status status_;
  InternalIterator* file_iter_;
  PinnedIteratorsManager* pinned_iters_mgr_;
  const SliceTransform* prefix_extractor_;
};
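
// ForwardIterator is the forward-only InternalIterator used for tailing
// reads: it keeps the immutable sources (immutable memtables, L0 files and
// one ForwardLevelIterator per deeper level) in a min-heap and compares the
// heap top against the mutable memtable iterator in UpdateCurrent(). A
// minimal usage sketch from the application side (assuming an already open
// DB* `db`; ForwardIterator itself is created internally by
// DBImpl::NewIterator() when ReadOptions::tailing is set):
//
//   ReadOptions ro;
//   ro.tailing = true;
//   std::unique_ptr<Iterator> it(db->NewIterator(ro));
//   for (it->SeekToFirst(); it->Valid(); it->Next()) {
//     // consume it->key() / it->value(); newly flushed data can become
//     // visible to this iterator without reopening it
//   }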
ForwardIterator::ForwardIterator(DBImpl* db, const ReadOptions& read_options,
                                 ColumnFamilyData* cfd,
                                 SuperVersion* current_sv)
    : db_(db),
      read_options_(read_options),
      cfd_(cfd),
      prefix_extractor_(current_sv->mutable_cf_options.prefix_extractor.get()),
      user_comparator_(cfd->user_comparator()),
      immutable_min_heap_(MinIterComparator(&cfd_->internal_comparator())),
      sv_(current_sv),
      mutable_iter_(nullptr),
      current_(nullptr),
      valid_(false),
      status_(Status::OK()),
      immutable_status_(Status::OK()),
      has_iter_trimmed_for_upper_bound_(false),
      current_over_upper_bound_(false),
      is_prev_set_(false),
      is_prev_inclusive_(false),
      pinned_iters_mgr_(nullptr) {
  if (sv_) {
    RebuildIterators(false);
  }
}

ForwardIterator::~ForwardIterator() {
  Cleanup(true);
}
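
// Drops one reference on `sv`; if it was the last reference, cleans the
// SuperVersion up under the DB mutex and collects obsolete files. Depending
// on `background_purge_on_iterator_cleanup`, deletion of the SuperVersion and
// the purge are either done here or handed off to the background purge
// thread.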
void ForwardIterator::SVCleanup(DBImpl* db, SuperVersion* sv,
                                bool background_purge_on_iterator_cleanup) {
  if (sv->Unref()) {
    // Job id == 0 means that this is not our background process, but rather
    // user thread
    JobContext job_context(0);
    db->mutex_.Lock();
    sv->Cleanup();
    db->FindObsoleteFiles(&job_context, false, true);
    if (background_purge_on_iterator_cleanup) {
      db->ScheduleBgLogWriterClose(&job_context);
      db->AddSuperVersionsToFreeQueue(sv);
      db->SchedulePurge();
    }
    db->mutex_.Unlock();
    if (!background_purge_on_iterator_cleanup) {
      delete sv;
    }
    if (job_context.HaveSomethingToDelete()) {
      db->PurgeObsoleteFiles(job_context, background_purge_on_iterator_cleanup);
    }
    job_context.Clean();
  }
}

namespace {
struct SVCleanupParams {
  DBImpl* db;
  SuperVersion* sv;
  bool background_purge_on_iterator_cleanup;
};
}  // namespace

// Used in PinnedIteratorsManager to release pinned SuperVersion
void ForwardIterator::DeferredSVCleanup(void* arg) {
  auto d = reinterpret_cast<SVCleanupParams*>(arg);
  ForwardIterator::SVCleanup(
      d->db, d->sv, d->background_purge_on_iterator_cleanup);
  delete d;
}

void ForwardIterator::SVCleanup() {
  if (sv_ == nullptr) {
    return;
  }
  bool background_purge =
      read_options_.background_purge_on_iterator_cleanup ||
      db_->immutable_db_options().avoid_unnecessary_blocking_io;
  if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) {
    // pinned_iters_mgr_ tells us to make sure that all visited key-value
    // slices are alive until pinned_iters_mgr_->ReleasePinnedData() is called.
    // The slices may point into some memtables owned by sv_, so we need to
    // keep sv_ referenced until pinned_iters_mgr_ unpins everything.
    auto p = new SVCleanupParams{db_, sv_, background_purge};
    pinned_iters_mgr_->PinPtr(p, &ForwardIterator::DeferredSVCleanup);
  } else {
    SVCleanup(db_, sv_, background_purge);
  }
}

void ForwardIterator::Cleanup(bool release_sv) {
  if (mutable_iter_ != nullptr) {
    DeleteIterator(mutable_iter_, true /* is_arena */);
  }

  for (auto* m : imm_iters_) {
    DeleteIterator(m, true /* is_arena */);
  }
  imm_iters_.clear();

  for (auto* f : l0_iters_) {
    DeleteIterator(f);
  }
  l0_iters_.clear();

  for (auto* l : level_iters_) {
    DeleteIterator(l);
  }
  level_iters_.clear();

  if (release_sv) {
    SVCleanup();
  }
}

bool ForwardIterator::Valid() const {
  // See UpdateCurrent().
  return valid_ ? !current_over_upper_bound_ : false;
}

void ForwardIterator::SeekToFirst() {
  if (sv_ == nullptr) {
    RebuildIterators(true);
  } else if (sv_->version_number != cfd_->GetSuperVersionNumber()) {
    RenewIterators();
  } else if (immutable_status_.IsIncomplete()) {
    ResetIncompleteIterators();
  }
  SeekInternal(Slice(), true);
}

bool ForwardIterator::IsOverUpperBound(const Slice& internal_key) const {
  return !(read_options_.iterate_upper_bound == nullptr ||
           cfd_->internal_comparator().user_comparator()->Compare(
               ExtractUserKey(internal_key),
               *read_options_.iterate_upper_bound) < 0);
}

void ForwardIterator::Seek(const Slice& internal_key) {
  if (sv_ == nullptr) {
    RebuildIterators(true);
  } else if (sv_->version_number != cfd_->GetSuperVersionNumber()) {
    RenewIterators();
  } else if (immutable_status_.IsIncomplete()) {
    ResetIncompleteIterators();
  }
  SeekInternal(internal_key, false);
}
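
// Seeks the mutable memtable iterator and, when seek_to_first is set or
// NeedToSeekImmutable() says so, re-seeks every immutable source (immutable
// memtables, L0 files, level iterators) and rebuilds immutable_min_heap_.
// Child iterators whose remaining range lies entirely past
// iterate_upper_bound are deleted, and the trimming is recorded in
// has_iter_trimmed_for_upper_bound_ so a later backward Seek() knows it must
// rebuild them.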
void ForwardIterator::SeekInternal(const Slice& internal_key,
                                   bool seek_to_first) {
  assert(mutable_iter_);
  // mutable
  seek_to_first ? mutable_iter_->SeekToFirst() :
                  mutable_iter_->Seek(internal_key);

  // immutable
  // TODO(ljin): NeedToSeekImmutable has negative impact on performance
  // if it turns to need to seek immutable often. We probably want to have
  // an option to turn it off.
  if (seek_to_first || NeedToSeekImmutable(internal_key)) {
    immutable_status_ = Status::OK();
    if (has_iter_trimmed_for_upper_bound_ &&
        (
            // prev_ is not set yet
            is_prev_set_ == false ||
            // We are doing SeekToFirst() and internal_key.size() = 0
            seek_to_first ||
            // prev_key_ > internal_key
            cfd_->internal_comparator().InternalKeyComparator::Compare(
                prev_key_.GetInternalKey(), internal_key) > 0)) {
      // Some iterators are trimmed. Need to rebuild.
      RebuildIterators(true);
      // Already seeked mutable iter, so seek again
      seek_to_first ? mutable_iter_->SeekToFirst()
                    : mutable_iter_->Seek(internal_key);
    }
    {
      auto tmp = MinIterHeap(MinIterComparator(&cfd_->internal_comparator()));
      immutable_min_heap_.swap(tmp);
    }
    for (size_t i = 0; i < imm_iters_.size(); i++) {
      auto* m = imm_iters_[i];
      seek_to_first ? m->SeekToFirst() : m->Seek(internal_key);
      if (!m->status().ok()) {
        immutable_status_ = m->status();
      } else if (m->Valid()) {
        immutable_min_heap_.push(m);
      }
    }

    Slice target_user_key;
    if (!seek_to_first) {
      target_user_key = ExtractUserKey(internal_key);
    }
    const VersionStorageInfo* vstorage = sv_->current->storage_info();
    const std::vector<FileMetaData*>& l0 = vstorage->LevelFiles(0);
    for (size_t i = 0; i < l0.size(); ++i) {
      if (!l0_iters_[i]) {
        continue;
      }
      if (seek_to_first) {
        l0_iters_[i]->SeekToFirst();
      } else {
        // If the target key passes over the largest key, we are sure Next()
        // won't go over this file.
        if (user_comparator_->Compare(target_user_key,
                                      l0[i]->largest.user_key()) > 0) {
          if (read_options_.iterate_upper_bound != nullptr) {
            has_iter_trimmed_for_upper_bound_ = true;
            DeleteIterator(l0_iters_[i]);
            l0_iters_[i] = nullptr;
          }
          continue;
        }
        l0_iters_[i]->Seek(internal_key);
      }

      if (!l0_iters_[i]->status().ok()) {
        immutable_status_ = l0_iters_[i]->status();
      } else if (l0_iters_[i]->Valid() &&
                 !IsOverUpperBound(l0_iters_[i]->key())) {
        immutable_min_heap_.push(l0_iters_[i]);
      } else {
        has_iter_trimmed_for_upper_bound_ = true;
        DeleteIterator(l0_iters_[i]);
        l0_iters_[i] = nullptr;
      }
    }

    for (int32_t level = 1; level < vstorage->num_levels(); ++level) {
      const std::vector<FileMetaData*>& level_files =
          vstorage->LevelFiles(level);
      if (level_files.empty()) {
        continue;
      }
      if (level_iters_[level - 1] == nullptr) {
        continue;
      }
      uint32_t f_idx = 0;
      if (!seek_to_first) {
        f_idx = FindFileInRange(level_files, internal_key, 0,
                                static_cast<uint32_t>(level_files.size()));
      }

      // Seek
      if (f_idx < level_files.size()) {
        level_iters_[level - 1]->SetFileIndex(f_idx);
        seek_to_first ? level_iters_[level - 1]->SeekToFirst() :
                        level_iters_[level - 1]->Seek(internal_key);

        if (!level_iters_[level - 1]->status().ok()) {
          immutable_status_ = level_iters_[level - 1]->status();
        } else if (level_iters_[level - 1]->Valid() &&
                   !IsOverUpperBound(level_iters_[level - 1]->key())) {
          immutable_min_heap_.push(level_iters_[level - 1]);
        } else {
          // Nothing in this level is interesting. Remove.
          has_iter_trimmed_for_upper_bound_ = true;
          DeleteIterator(level_iters_[level - 1]);
          level_iters_[level - 1] = nullptr;
        }
      }
    }

    if (seek_to_first) {
      is_prev_set_ = false;
    } else {
      prev_key_.SetInternalKey(internal_key);
      is_prev_set_ = true;
      is_prev_inclusive_ = true;
    }

    TEST_SYNC_POINT_CALLBACK("ForwardIterator::SeekInternal:Immutable", this);
  } else if (current_ && current_ != mutable_iter_) {
    // current_ is one of immutable iterators, push it back to the heap
    immutable_min_heap_.push(current_);
  }

  UpdateCurrent();
  TEST_SYNC_POINT_CALLBACK("ForwardIterator::SeekInternal:Return", this);
}
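
// If the SuperVersion changed since the iterators were built, Next() rebuilds
// or renews them and re-seeks to the saved current key before advancing, so
// data flushed or compacted in the meantime is picked up.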
void ForwardIterator::Next() {
  assert(valid_);
  bool update_prev_key = false;

  if (sv_ == nullptr ||
      sv_->version_number != cfd_->GetSuperVersionNumber()) {
    std::string current_key = key().ToString();
    Slice old_key(current_key.data(), current_key.size());

    if (sv_ == nullptr) {
      RebuildIterators(true);
    } else {
      RenewIterators();
    }
    SeekInternal(old_key, false);
    if (!valid_ || key().compare(old_key) != 0) {
      return;
    }
  } else if (current_ != mutable_iter_) {
    // It is going to advance immutable iterator
    if (is_prev_set_ && prefix_extractor_) {
      // advance prev_key_ to current_ only if they share the same prefix
      update_prev_key =
          prefix_extractor_->Transform(prev_key_.GetUserKey())
              .compare(prefix_extractor_->Transform(current_->key())) == 0;
    } else {
      update_prev_key = true;
    }

    if (update_prev_key) {
      prev_key_.SetInternalKey(current_->key());
      is_prev_set_ = true;
      is_prev_inclusive_ = false;
    }
  }

  current_->Next();
  if (current_ != mutable_iter_) {
    if (!current_->status().ok()) {
      immutable_status_ = current_->status();
    } else if ((current_->Valid()) && (!IsOverUpperBound(current_->key()))) {
      immutable_min_heap_.push(current_);
    } else {
      if ((current_->Valid()) && (IsOverUpperBound(current_->key()))) {
        // remove the current iterator
        DeleteCurrentIter();
        current_ = nullptr;
      }
      if (update_prev_key) {
        mutable_iter_->Seek(prev_key_.GetInternalKey());
      }
    }
  }
  UpdateCurrent();
  TEST_SYNC_POINT_CALLBACK("ForwardIterator::Next:Return", this);
}

Slice ForwardIterator::key() const {
  assert(valid_);
  return current_->key();
}

Slice ForwardIterator::value() const {
  assert(valid_);
  return current_->value();
}

Status ForwardIterator::status() const {
  if (!status_.ok()) {
    return status_;
  } else if (!mutable_iter_->status().ok()) {
    return mutable_iter_->status();
  }

  return immutable_status_;
}

Status ForwardIterator::GetProperty(std::string prop_name, std::string* prop) {
  assert(prop != nullptr);
  if (prop_name == "rocksdb.iterator.super-version-number") {
    *prop = ToString(sv_->version_number);
    return Status::OK();
  }
  return Status::InvalidArgument();
}

void ForwardIterator::SetPinnedItersMgr(
    PinnedIteratorsManager* pinned_iters_mgr) {
  pinned_iters_mgr_ = pinned_iters_mgr;
  UpdateChildrenPinnedItersMgr();
}

void ForwardIterator::UpdateChildrenPinnedItersMgr() {
  // Set PinnedIteratorsManager for mutable memtable iterator.
  if (mutable_iter_) {
    mutable_iter_->SetPinnedItersMgr(pinned_iters_mgr_);
  }

  // Set PinnedIteratorsManager for immutable memtable iterators.
  for (InternalIterator* child_iter : imm_iters_) {
    if (child_iter) {
      child_iter->SetPinnedItersMgr(pinned_iters_mgr_);
    }
  }

  // Set PinnedIteratorsManager for L0 files iterators.
  for (InternalIterator* child_iter : l0_iters_) {
    if (child_iter) {
      child_iter->SetPinnedItersMgr(pinned_iters_mgr_);
    }
  }

  // Set PinnedIteratorsManager for L1+ levels iterators.
  for (ForwardLevelIterator* child_iter : level_iters_) {
    if (child_iter) {
      child_iter->SetPinnedItersMgr(pinned_iters_mgr_);
    }
  }
}

bool ForwardIterator::IsKeyPinned() const {
  return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
         current_->IsKeyPinned();
}

bool ForwardIterator::IsValuePinned() const {
  return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
         current_->IsValuePinned();
}
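
// Rebuilds every child iterator from scratch, optionally against a freshly
// referenced SuperVersion. Range tombstones are not supported: if any are
// found, status_ is set to NotSupported.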
void ForwardIterator::RebuildIterators(bool refresh_sv) {
  // Clean up
  Cleanup(refresh_sv);
  if (refresh_sv) {
    // New
    sv_ = cfd_->GetReferencedSuperVersion(db_);
  }
  ReadRangeDelAggregator range_del_agg(&cfd_->internal_comparator(),
                                       kMaxSequenceNumber /* upper_bound */);
  mutable_iter_ = sv_->mem->NewIterator(read_options_, &arena_);
  sv_->imm->AddIterators(read_options_, &imm_iters_, &arena_);
  if (!read_options_.ignore_range_deletions) {
    std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
        sv_->mem->NewRangeTombstoneIterator(
            read_options_, sv_->current->version_set()->LastSequence()));
    range_del_agg.AddTombstones(std::move(range_del_iter));
    sv_->imm->AddRangeTombstoneIterators(read_options_, &arena_,
                                         &range_del_agg);
  }
  has_iter_trimmed_for_upper_bound_ = false;

  const auto* vstorage = sv_->current->storage_info();
  const auto& l0_files = vstorage->LevelFiles(0);
  l0_iters_.reserve(l0_files.size());
  for (const auto* l0 : l0_files) {
    if ((read_options_.iterate_upper_bound != nullptr) &&
        cfd_->internal_comparator().user_comparator()->Compare(
            l0->smallest.user_key(), *read_options_.iterate_upper_bound) > 0) {
      // No need to set has_iter_trimmed_for_upper_bound_: this ForwardIterator
      // will never be interested in files with smallest key above
      // iterate_upper_bound, since iterate_upper_bound can't be changed.
      l0_iters_.push_back(nullptr);
      continue;
    }
    l0_iters_.push_back(cfd_->table_cache()->NewIterator(
        read_options_, *cfd_->soptions(), cfd_->internal_comparator(), *l0,
        read_options_.ignore_range_deletions ? nullptr : &range_del_agg,
        sv_->mutable_cf_options.prefix_extractor.get(),
        /*table_reader_ptr=*/nullptr, /*file_read_hist=*/nullptr,
        TableReaderCaller::kUserIterator, /*arena=*/nullptr,
        /*skip_filters=*/false, /*level=*/-1,
        /*smallest_compaction_key=*/nullptr,
        /*largest_compaction_key=*/nullptr));
  }

  BuildLevelIterators(vstorage);
  current_ = nullptr;
  is_prev_set_ = false;

  UpdateChildrenPinnedItersMgr();
  if (!range_del_agg.IsEmpty()) {
    status_ = Status::NotSupported(
        "Range tombstones unsupported with ForwardIterator");
    valid_ = false;
  }
}
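
// Switches to the latest SuperVersion. L0 iterators whose files are still
// present in the new version are reused as-is; everything else (memtable and
// level iterators) is rebuilt.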
void ForwardIterator::RenewIterators() {
  SuperVersion* svnew;
  assert(sv_);
  svnew = cfd_->GetReferencedSuperVersion(db_);

  if (mutable_iter_ != nullptr) {
    DeleteIterator(mutable_iter_, true /* is_arena */);
  }
  for (auto* m : imm_iters_) {
    DeleteIterator(m, true /* is_arena */);
  }
  imm_iters_.clear();

  mutable_iter_ = svnew->mem->NewIterator(read_options_, &arena_);
  svnew->imm->AddIterators(read_options_, &imm_iters_, &arena_);
  ReadRangeDelAggregator range_del_agg(&cfd_->internal_comparator(),
                                       kMaxSequenceNumber /* upper_bound */);
  if (!read_options_.ignore_range_deletions) {
    std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
        svnew->mem->NewRangeTombstoneIterator(
            read_options_, sv_->current->version_set()->LastSequence()));
    range_del_agg.AddTombstones(std::move(range_del_iter));
    svnew->imm->AddRangeTombstoneIterators(read_options_, &arena_,
                                           &range_del_agg);
  }

  const auto* vstorage = sv_->current->storage_info();
  const auto& l0_files = vstorage->LevelFiles(0);
  const auto* vstorage_new = svnew->current->storage_info();
  const auto& l0_files_new = vstorage_new->LevelFiles(0);
  size_t iold, inew;
  bool found;
  std::vector<InternalIterator*> l0_iters_new;
  l0_iters_new.reserve(l0_files_new.size());

  for (inew = 0; inew < l0_files_new.size(); inew++) {
    found = false;
    for (iold = 0; iold < l0_files.size(); iold++) {
      if (l0_files[iold] == l0_files_new[inew]) {
        found = true;
        break;
      }
    }
    if (found) {
      if (l0_iters_[iold] == nullptr) {
        l0_iters_new.push_back(nullptr);
        TEST_SYNC_POINT_CALLBACK("ForwardIterator::RenewIterators:Null", this);
      } else {
        l0_iters_new.push_back(l0_iters_[iold]);
        l0_iters_[iold] = nullptr;
        TEST_SYNC_POINT_CALLBACK("ForwardIterator::RenewIterators:Copy", this);
      }
      continue;
    }
    l0_iters_new.push_back(cfd_->table_cache()->NewIterator(
        read_options_, *cfd_->soptions(), cfd_->internal_comparator(),
        *l0_files_new[inew],
        read_options_.ignore_range_deletions ? nullptr : &range_del_agg,
        svnew->mutable_cf_options.prefix_extractor.get(),
        /*table_reader_ptr=*/nullptr, /*file_read_hist=*/nullptr,
        TableReaderCaller::kUserIterator, /*arena=*/nullptr,
        /*skip_filters=*/false, /*level=*/-1,
        /*smallest_compaction_key=*/nullptr,
        /*largest_compaction_key=*/nullptr));
  }

  for (auto* f : l0_iters_) {
    DeleteIterator(f);
  }
  l0_iters_.clear();
  l0_iters_ = l0_iters_new;

  for (auto* l : level_iters_) {
    DeleteIterator(l);
  }
  level_iters_.clear();
  BuildLevelIterators(vstorage_new);
  current_ = nullptr;
  is_prev_set_ = false;
  SVCleanup();
  sv_ = svnew;

  UpdateChildrenPinnedItersMgr();
  if (!range_del_agg.IsEmpty()) {
    status_ = Status::NotSupported(
        "Range tombstones unsupported with ForwardIterator");
    valid_ = false;
  }
}
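
// Creates one ForwardLevelIterator per level >= 1. Empty levels, and levels
// whose smallest key already lies past iterate_upper_bound, get a nullptr
// slot instead.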
void ForwardIterator::BuildLevelIterators(const VersionStorageInfo* vstorage) {
  level_iters_.reserve(vstorage->num_levels() - 1);
  for (int32_t level = 1; level < vstorage->num_levels(); ++level) {
    const auto& level_files = vstorage->LevelFiles(level);
    if ((level_files.empty()) ||
        ((read_options_.iterate_upper_bound != nullptr) &&
         (user_comparator_->Compare(*read_options_.iterate_upper_bound,
                                    level_files[0]->smallest.user_key()) <
          0))) {
      level_iters_.push_back(nullptr);
      if (!level_files.empty()) {
        has_iter_trimmed_for_upper_bound_ = true;
      }
    } else {
      level_iters_.push_back(new ForwardLevelIterator(
          cfd_, read_options_, level_files,
          sv_->mutable_cf_options.prefix_extractor.get()));
    }
  }
}

void ForwardIterator::ResetIncompleteIterators() {
  const auto& l0_files = sv_->current->storage_info()->LevelFiles(0);
  for (size_t i = 0; i < l0_iters_.size(); ++i) {
    assert(i < l0_files.size());
    if (!l0_iters_[i] || !l0_iters_[i]->status().IsIncomplete()) {
      continue;
    }
    DeleteIterator(l0_iters_[i]);
    l0_iters_[i] = cfd_->table_cache()->NewIterator(
        read_options_, *cfd_->soptions(), cfd_->internal_comparator(),
        *l0_files[i], /*range_del_agg=*/nullptr,
        sv_->mutable_cf_options.prefix_extractor.get(),
        /*table_reader_ptr=*/nullptr, /*file_read_hist=*/nullptr,
        TableReaderCaller::kUserIterator, /*arena=*/nullptr,
        /*skip_filters=*/false, /*level=*/-1,
        /*smallest_compaction_key=*/nullptr,
        /*largest_compaction_key=*/nullptr);
    l0_iters_[i]->SetPinnedItersMgr(pinned_iters_mgr_);
  }

  for (auto* level_iter : level_iters_) {
    if (level_iter && level_iter->status().IsIncomplete()) {
      level_iter->Reset();
    }
  }

  current_ = nullptr;
  is_prev_set_ = false;
}
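
// Points current_ at the child with the smallest key: either the mutable
// memtable iterator or the top of immutable_min_heap_.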
void ForwardIterator::UpdateCurrent() {
  if (immutable_min_heap_.empty() && !mutable_iter_->Valid()) {
    current_ = nullptr;
  } else if (immutable_min_heap_.empty()) {
    current_ = mutable_iter_;
  } else if (!mutable_iter_->Valid()) {
    current_ = immutable_min_heap_.top();
    immutable_min_heap_.pop();
  } else {
    current_ = immutable_min_heap_.top();
    assert(current_ != nullptr);
    assert(current_->Valid());
    int cmp = cfd_->internal_comparator().InternalKeyComparator::Compare(
        mutable_iter_->key(), current_->key());
    assert(cmp != 0);
    if (cmp > 0) {
      immutable_min_heap_.pop();
    } else {
      current_ = mutable_iter_;
    }
  }
  valid_ = current_ != nullptr && immutable_status_.ok();
  if (!status_.ok()) {
    status_ = Status::OK();
  }

  // Upper bound doesn't apply to the memtable iterator. We want Valid() to
  // return false when all iterators are over iterate_upper_bound, but can't
  // just set valid_ to false, as that would effectively disable the tailing
  // optimization (Seek() would be called on all immutable iterators regardless
  // of whether the target key is greater than prev_key_).
  current_over_upper_bound_ = valid_ && IsOverUpperBound(current_->key());
}

bool ForwardIterator::NeedToSeekImmutable(const Slice& target) {
  // We maintain the interval (prev_key_, immutable_min_heap_.top()->key())
  // such that there are no records with keys within that range in
  // immutable_min_heap_. Since immutable structures (SST files and immutable
  // memtables) can't change in this version, we don't need to do a seek if
  // 'target' belongs to that interval (immutable_min_heap_.top() is already
  // at the correct position).

  if (!valid_ || !current_ || !is_prev_set_ || !immutable_status_.ok()) {
    return true;
  }
  Slice prev_key = prev_key_.GetInternalKey();
  if (prefix_extractor_ && prefix_extractor_->Transform(target).compare(
                               prefix_extractor_->Transform(prev_key)) != 0) {
    return true;
  }
  if (cfd_->internal_comparator().InternalKeyComparator::Compare(
          prev_key, target) >= (is_prev_inclusive_ ? 1 : 0)) {
    return true;
  }

  if (immutable_min_heap_.empty() && current_ == mutable_iter_) {
    // Nothing to seek on.
    return false;
  }
  if (cfd_->internal_comparator().InternalKeyComparator::Compare(
          target, current_ == mutable_iter_ ? immutable_min_heap_.top()->key()
                                            : current_->key()) > 0) {
    return true;
  }
  return false;
}
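
// Removes current_ from l0_iters_ / level_iters_ once it has advanced past
// iterate_upper_bound, so it is not visited again.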
void ForwardIterator::DeleteCurrentIter() {
  const VersionStorageInfo* vstorage = sv_->current->storage_info();
  const std::vector<FileMetaData*>& l0 = vstorage->LevelFiles(0);
  for (size_t i = 0; i < l0.size(); ++i) {
    if (!l0_iters_[i]) {
      continue;
    }
    if (l0_iters_[i] == current_) {
      has_iter_trimmed_for_upper_bound_ = true;
      DeleteIterator(l0_iters_[i]);
      l0_iters_[i] = nullptr;
      return;
    }
  }
  for (int32_t level = 1; level < vstorage->num_levels(); ++level) {
    if (level_iters_[level - 1] == nullptr) {
      continue;
    }
    if (level_iters_[level - 1] == current_) {
      has_iter_trimmed_for_upper_bound_ = true;
      DeleteIterator(level_iters_[level - 1]);
      level_iters_[level - 1] = nullptr;
    }
  }
}

bool ForwardIterator::TEST_CheckDeletedIters(int* pdeleted_iters,
                                             int* pnum_iters) {
  bool retval = false;
  int deleted_iters = 0;
  int num_iters = 0;

  const VersionStorageInfo* vstorage = sv_->current->storage_info();
  const std::vector<FileMetaData*>& l0 = vstorage->LevelFiles(0);
  for (size_t i = 0; i < l0.size(); ++i) {
    if (!l0_iters_[i]) {
      retval = true;
      deleted_iters++;
    } else {
      num_iters++;
    }
  }
  for (int32_t level = 1; level < vstorage->num_levels(); ++level) {
    if ((level_iters_[level - 1] == nullptr) &&
        (!vstorage->LevelFiles(level).empty())) {
      retval = true;
      deleted_iters++;
    } else if (!vstorage->LevelFiles(level).empty()) {
      num_iters++;
    }
  }
  if ((!retval) && num_iters <= 1) {
    retval = true;
  }
  if (pdeleted_iters) {
    *pdeleted_iters = deleted_iters;
  }
  if (pnum_iters) {
    *pnum_iters = num_iters;
  }
  return retval;
}
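
// Binary search over `files` (ordered by largest key) for the first file
// whose largest key is >= internal_key, i.e. the first file that may still
// contain the target.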
uint32_t ForwardIterator::FindFileInRange(
    const std::vector<FileMetaData*>& files, const Slice& internal_key,
    uint32_t left, uint32_t right) {
  auto cmp = [&](const FileMetaData* f, const Slice& key) -> bool {
    return cfd_->internal_comparator().InternalKeyComparator::Compare(
               f->largest.Encode(), key) < 0;
  };
  const auto& b = files.begin();
  return static_cast<uint32_t>(
      std::lower_bound(b + left, b + right, internal_key, cmp) - b);
}

void ForwardIterator::DeleteIterator(InternalIterator* iter, bool is_arena) {
  if (iter == nullptr) {
    return;
  }

  if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) {
    pinned_iters_mgr_->PinIterator(iter, is_arena);
  } else {
    if (is_arena) {
      iter->~InternalIterator();
    } else {
      delete iter;
    }
  }
}

}  // namespace ROCKSDB_NAMESPACE

#endif  // ROCKSDB_LITE