// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#pragma once

#include <cstdint>
#include <string>

#include "db/db_impl/db_impl.h"
#include "memory/arena.h"
#include "options/cf_options.h"
#include "rocksdb/db.h"
#include "rocksdb/iterator.h"
#include "rocksdb/wide_columns.h"
#include "table/iterator_wrapper.h"
#include "util/autovector.h"

namespace ROCKSDB_NAMESPACE {
class Version;

// This file declares the factory functions of DBIter, in its original form
// or a wrapped form with class ArenaWrappedDBIter.
// Class DBIter, which is declared below and implemented inside db_iter.cc,
// is an iterator that converts internal keys (yielded by an InternalIterator)
// that were live at the specified sequence number into appropriate user
// keys.
// Each internal key consists of a user key, a sequence number, and a value
// type. DBIter deals with multiple key versions, tombstones, merge operands,
// etc., and exposes an Iterator.
// For example, DBIter may wrap the following InternalIterator:
//    user key: AAA  value: v3   seqno: 100  type: Put
//    user key: AAA  value: v2   seqno: 97   type: Put
//    user key: AAA  value: v1   seqno: 95   type: Put
//    user key: BBB  value: v1   seqno: 90   type: Put
//    user key: BBC  value: N/A  seqno: 98   type: Delete
//    user key: BBC  value: v1   seqno: 95   type: Put
// If the snapshot passed in is 102, then the DBIter is expected to
// expose the following iterator:
//    key: AAA  value: v3
//    key: BBB  value: v1
// If the snapshot passed in is 96, then it should expose:
//    key: AAA  value: v1
//    key: BBB  value: v1
//    key: BBC  value: v1
//
// Memtables and sstables that make up the DB representation contain
// (userkey,seq,type) => uservalue entries. DBIter
// combines multiple entries for the same userkey found in the DB
// representation into a single entry while accounting for sequence
// numbers, deletion markers, overwrites, etc.
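//
// For illustration only (a sketch, not part of this header): user code
// normally reaches a DBIter through DB::NewIterator() with a snapshot set in
// ReadOptions. Assuming a hypothetical `snapshot_at_96` corresponding to
// sequence number 96 in the example above:
//
//   ReadOptions read_options;
//   read_options.snapshot = snapshot_at_96;
//   std::unique_ptr<Iterator> it(db->NewIterator(read_options));
//   for (it->SeekToFirst(); it->Valid(); it->Next()) {
//     // visits AAA => v1, BBB => v1, BBC => v1
//   }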
class DBIter final : public Iterator {
 public:
  // Return a new DBIter that reads from `internal_iter` at the specified
  // `sequence` number.
  //
  // @param active_mem Pointer to the active memtable that `internal_iter`
  // is reading from. If not null, the memtable can be marked for flush
  // according to the options mutable_cf_options.memtable_op_scan_flush_trigger
  // and mutable_cf_options.memtable_avg_op_scan_flush_trigger.
  // @param arena If not nullptr, the DBIter will be allocated from the arena.
  static DBIter* NewIter(Env* env, const ReadOptions& read_options,
                         const ImmutableOptions& ioptions,
                         const MutableCFOptions& mutable_cf_options,
                         const Comparator* user_key_comparator,
                         InternalIterator* internal_iter,
                         const Version* version, const SequenceNumber& sequence,
                         ReadCallback* read_callback,
                         ReadOnlyMemTable* active_mem,
                         ColumnFamilyHandleImpl* cfh = nullptr,
                         bool expose_blob_index = false,
                         Arena* arena = nullptr) {
    void* mem = arena ? arena->AllocateAligned(sizeof(DBIter))
                      : operator new(sizeof(DBIter));
    DBIter* db_iter = new (mem)
        DBIter(env, read_options, ioptions, mutable_cf_options,
               user_key_comparator, internal_iter, version, sequence, arena,
               read_callback, cfh, expose_blob_index, active_mem);
    return db_iter;
  }
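
  // Illustrative caller-side sketch (hypothetical; real callers go through
  // ArenaWrappedDBIter / NewArenaWrappedDbIterator). When an Arena is
  // supplied, the iterator is placement-new'd into arena memory, so it must
  // be destroyed with an explicit destructor call rather than `delete`:
  //
  //   Arena arena;
  //   DBIter* it = DBIter::NewIter(env, read_options, ioptions,
  //                                mutable_cf_options, ucmp, internal_iter,
  //                                version, seq, /*read_callback=*/nullptr,
  //                                /*active_mem=*/nullptr, /*cfh=*/nullptr,
  //                                /*expose_blob_index=*/false, &arena);
  //   ...
  //   it->~DBIter();  // memory itself is reclaimed when `arena` is destroyed
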
  // The following is grossly complicated. TODO: clean it up
  // Which direction is the iterator currently moving?
  // (1) When moving forward:
  //   (1a) if current_entry_is_merged_ = false, the internal iterator is
  //        positioned at the exact entry that yields this->key(),
  //        this->value()
  //   (1b) if current_entry_is_merged_ = true, the internal iterator is
  //        positioned immediately after the last entry that contributed to
  //        the current this->value(). That entry may or may not have key
  //        equal to this->key().
  // (2) When moving backwards, the internal iterator is positioned
  //     just before all entries whose user key == this->key().
  enum Direction : uint8_t { kForward, kReverse };

  // LocalStatistics contains the Statistics counters that are aggregated per
  // iterator instance and then sent to the global statistics when the
  // iterator is destroyed.
  //
  // The purpose of this approach is to avoid the perf regression that occurs
  // when multiple threads bump the atomic counters from each DBIter::Next().
  struct LocalStatistics {
    explicit LocalStatistics() { ResetCounters(); }

    void ResetCounters() {
      next_count_ = 0;
      next_found_count_ = 0;
      prev_count_ = 0;
      prev_found_count_ = 0;
      bytes_read_ = 0;
      skip_count_ = 0;
    }

    void BumpGlobalStatistics(Statistics* global_statistics) {
      RecordTick(global_statistics, NUMBER_DB_NEXT, next_count_);
      RecordTick(global_statistics, NUMBER_DB_NEXT_FOUND, next_found_count_);
      RecordTick(global_statistics, NUMBER_DB_PREV, prev_count_);
      RecordTick(global_statistics, NUMBER_DB_PREV_FOUND, prev_found_count_);
      RecordTick(global_statistics, ITER_BYTES_READ, bytes_read_);
      RecordTick(global_statistics, NUMBER_ITER_SKIP, skip_count_);
      PERF_COUNTER_ADD(iter_read_bytes, bytes_read_);
      ResetCounters();
    }

    // Maps to Tickers::NUMBER_DB_NEXT
    uint64_t next_count_;
    // Maps to Tickers::NUMBER_DB_NEXT_FOUND
    uint64_t next_found_count_;
    // Maps to Tickers::NUMBER_DB_PREV
    uint64_t prev_count_;
    // Maps to Tickers::NUMBER_DB_PREV_FOUND
    uint64_t prev_found_count_;
    // Maps to Tickers::ITER_BYTES_READ
    uint64_t bytes_read_;
    // Maps to Tickers::NUMBER_ITER_SKIP
    uint64_t skip_count_;
  };
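
  // Note (illustrative sketch of the pattern, not a specific code path):
  // instead of calling RecordTick(statistics_, NUMBER_DB_NEXT) on every
  // DBIter::Next(), the iterator bumps the plain counter
  // `local_stats_.next_count_++` and publishes the totals once, via
  // BumpGlobalStatistics() in the destructor below.
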
  // No copying allowed
  DBIter(const DBIter&) = delete;
  void operator=(const DBIter&) = delete;

  ~DBIter() override {
    MarkMemtableForFlushForAvgTrigger();
    ThreadStatus::OperationType cur_op_type =
        ThreadStatusUtil::GetThreadOperation();
    ThreadStatusUtil::SetThreadOperation(
        ThreadStatus::OperationType::OP_UNKNOWN);
    // Release pinned data if any
    if (pinned_iters_mgr_.PinningEnabled()) {
      pinned_iters_mgr_.ReleasePinnedData();
    }
    RecordTick(statistics_, NO_ITERATOR_DELETED);
    ResetInternalKeysSkippedCounter();
    local_stats_.BumpGlobalStatistics(statistics_);
    iter_.DeleteIter(arena_mode_);
    ThreadStatusUtil::SetThreadOperation(cur_op_type);
  }

  void SetIter(InternalIterator* iter) {
    assert(iter_.iter() == nullptr);
    iter_.Set(iter);
    iter_.iter()->SetPinnedItersMgr(&pinned_iters_mgr_);
  }

  bool Valid() const override {
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
    if (valid_) {
      status_.PermitUncheckedError();
    }
#endif  // ROCKSDB_ASSERT_STATUS_CHECKED
    return valid_;
  }
  Slice key() const override {
    assert(valid_);
    if (timestamp_lb_) {
      return saved_key_.GetInternalKey();
    } else {
      const Slice ukey_and_ts = saved_key_.GetUserKey();
      return Slice(ukey_and_ts.data(), ukey_and_ts.size() - timestamp_size_);
    }
  }
  Slice value() const override {
    assert(valid_);
    return value_;
  }
  const WideColumns& columns() const override {
    assert(valid_);
    return wide_columns_;
  }
  Status status() const override {
    if (status_.ok()) {
      return iter_.status();
    } else {
      assert(!valid_);
      return status_;
    }
  }
  Slice timestamp() const override {
    assert(valid_);
    assert(timestamp_size_ > 0);
    if (direction_ == kReverse) {
      return saved_timestamp_;
    }
    const Slice ukey_and_ts = saved_key_.GetUserKey();
    assert(timestamp_size_ < ukey_and_ts.size());
    return ExtractTimestampFromUserKey(ukey_and_ts, timestamp_size_);
  }
  bool IsBlob() const {
    assert(valid_);
    return is_blob_;
  }

  Status GetProperty(std::string prop_name, std::string* prop) override;

  void Next() final override;
  void Prev() final override;
  // 'target' does not contain timestamp, even if user timestamp feature is
  // enabled.
  void Seek(const Slice& target) final override;
  void SeekForPrev(const Slice& target) final override;
  void SeekToFirst() final override;
  void SeekToLast() final override;
  Env* env() const { return env_; }

  void set_sequence(uint64_t s) {
    sequence_ = s;
    if (read_callback_) {
      read_callback_->Refresh(s);
    }
    iter_.SetRangeDelReadSeqno(s);
  }
  void set_valid(bool v) { valid_ = v; }

  bool PrepareValue() override;
  void Prepare(const MultiScanArgs& scan_opts) override;
  Status ValidateScanOptions(const MultiScanArgs& multiscan_opts) const;
 private:
  DBIter(Env* _env, const ReadOptions& read_options,
         const ImmutableOptions& ioptions,
         const MutableCFOptions& mutable_cf_options, const Comparator* cmp,
         InternalIterator* iter, const Version* version, SequenceNumber s,
         bool arena_mode, ReadCallback* read_callback,
         ColumnFamilyHandleImpl* cfh, bool expose_blob_index,
         ReadOnlyMemTable* active_mem);

  class BlobReader {
   public:
    BlobReader(const Version* version, ReadTier read_tier,
               bool verify_checksums, bool fill_cache,
               Env::IOActivity io_activity)
        : version_(version),
          read_tier_(read_tier),
          verify_checksums_(verify_checksums),
          fill_cache_(fill_cache),
          io_activity_(io_activity) {}

    const Slice& GetBlobValue() const { return blob_value_; }
    Status RetrieveAndSetBlobValue(const Slice& user_key,
                                   const Slice& blob_index);
    void ResetBlobValue() { blob_value_.Reset(); }

   private:
    PinnableSlice blob_value_;
    const Version* version_;
    ReadTier read_tier_;
    bool verify_checksums_;
    bool fill_cache_;
    Env::IOActivity io_activity_;
  };
  // For all methods in this block:
  //   PRE: iter_->Valid() && status_.ok()
  // Return false if there was an error; in that case status() is non-ok and
  // valid_ is set to false, and callers would usually stop what they were
  // doing and return.
  bool ReverseToForward();
  bool ReverseToBackward();
  // Set saved_key_ to the seek target, with the proper sequence number set.
  // It might get adjusted if the seek key is smaller than the iterator lower
  // bound. `target` does not have a timestamp.
  void SetSavedKeyToSeekTarget(const Slice& target);
  // Set saved_key_ to the seek target, with the proper sequence number set.
  // It might get adjusted if the seek key is larger than the iterator upper
  // bound. `target` does not have a timestamp.
  void SetSavedKeyToSeekForPrevTarget(const Slice& target);
  bool FindValueForCurrentKey();
  bool FindValueForCurrentKeyUsingSeek();
  bool FindUserKeyBeforeSavedKey();
  // If `skipping_saved_key` is true, the function will keep iterating until it
  // finds a user key that is larger than `saved_key_`.
  // If `prefix` is not null, the iterator stops once all keys for the prefix
  // are exhausted, and the iterator is set to invalid.
  bool FindNextUserEntry(bool skipping_saved_key, const Slice* prefix);
  // Internal implementation of FindNextUserEntry().
  bool FindNextUserEntryInternal(bool skipping_saved_key, const Slice* prefix);
  bool ParseKey(ParsedInternalKey* key);
  bool MergeValuesNewToOld();

  // If `prefix` is not null, we need to set the iterator to invalid if no
  // more entries can be found within the prefix.
  void PrevInternal(const Slice* prefix);
  bool TooManyInternalKeysSkipped(bool increment = true);
  bool IsVisible(SequenceNumber sequence, const Slice& ts,
                 bool* more_recent = nullptr);

  // Temporarily pin the blocks that we encounter until ReleaseTempPinnedData()
  // is called
  void TempPinData() {
    if (!pin_thru_lifetime_) {
      pinned_iters_mgr_.StartPinning();
    }
  }

  // Release blocks pinned by TempPinData()
  void ReleaseTempPinnedData() {
    if (!pin_thru_lifetime_ && pinned_iters_mgr_.PinningEnabled()) {
      pinned_iters_mgr_.ReleasePinnedData();
    }
  }

  inline void ClearSavedValue() {
    // If the saved value has grown past 1 MiB, release its memory by swapping
    // with an empty string; otherwise just clear it and keep the capacity.
    if (saved_value_.capacity() > 1048576) {
      std::string empty;
      swap(empty, saved_value_);
    } else {
      saved_value_.clear();
    }
  }

  inline void ResetInternalKeysSkippedCounter() {
    local_stats_.skip_count_ += num_internal_keys_skipped_;
    if (valid_) {
      local_stats_.skip_count_--;
    }
    num_internal_keys_skipped_ = 0;
  }

  bool expect_total_order_inner_iter() {
    assert(expect_total_order_inner_iter_ || prefix_extractor_ != nullptr);
    return expect_total_order_inner_iter_;
  }

  // If a lower bound on timestamp is given via ReadOptions.iter_start_ts, we
  // need to return versions of the same user key, so we cannot skip an entry
  // merely because its user key matches the previous one when their
  // timestamps differ yet both fall within the timestamp range.
  inline int CompareKeyForSkip(const Slice& a, const Slice& b) {
    return timestamp_lb_ != nullptr
               ? user_comparator_.Compare(a, b)
               : user_comparator_.CompareWithoutTimestamp(a, b);
  }
  void SetValueAndColumnsFromPlain(const Slice& slice) {
    assert(value_.empty());
    assert(wide_columns_.empty());

    value_ = slice;
    wide_columns_.emplace_back(kDefaultWideColumnName, slice);
  }

  bool SetValueAndColumnsFromBlobImpl(const Slice& user_key,
                                      const Slice& blob_index);
  bool SetValueAndColumnsFromBlob(const Slice& user_key,
                                  const Slice& blob_index);
  bool SetValueAndColumnsFromEntity(Slice slice);
  bool SetValueAndColumnsFromMergeResult(const Status& merge_status,
                                         ValueType result_type);

  void ResetValueAndColumns() {
    value_.clear();
    wide_columns_.clear();
  }

  void ResetBlobData() {
    blob_reader_.ResetBlobValue();
    lazy_blob_index_.clear();
    is_blob_ = false;
  }

  // The following methods perform the actual merge operation for the
  // no/plain/blob/wide-column base value cases.
  // If user-defined timestamp is enabled, `user_key` includes the timestamp.
  bool MergeWithNoBaseValue(const Slice& user_key);
  bool MergeWithPlainBaseValue(const Slice& value, const Slice& user_key);
  bool MergeWithBlobBaseValue(const Slice& blob_index, const Slice& user_key);
  bool MergeWithWideColumnBaseValue(const Slice& entity, const Slice& user_key);

  bool PrepareValueInternal() {
    if (!iter_.PrepareValue()) {
      assert(!iter_.status().ok());
      valid_ = false;
      return false;
    }
    // ikey_ could change when BlockBasedTableIterator does a block cache
    // lookup and index_iter_ ends up pointing to a different block, leaving
    // ikey_ pointing at the wrong key. So ikey_ needs to be updated on
    // Seek/Next calls to point to the right key again.
    if (!ParseKey(&ikey_)) {
      return false;
    }
    return true;
  }
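
  // The two methods below implement the memtable flush triggers described at
  // NewIter(). As a worked example of the condition in
  // MarkMemtableForFlushForAvgTrigger() (hypothetical numbers, not defaults):
  // with memtable_op_scan_flush_trigger_ = 100 and
  // avg_op_scan_flush_trigger_ = 10, a scan that performed 5 iterator steps
  // since the last seek while scanning 120 hidden memtable entries satisfies
  // both conditions (120 >= 100 and 120 >= 5 * 10), so the active memtable is
  // marked for flush.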
  void MarkMemtableForFlushForAvgTrigger() {
    if (avg_op_scan_flush_trigger_ &&
        mem_hidden_op_scanned_since_seek_ >= memtable_op_scan_flush_trigger_ &&
        mem_hidden_op_scanned_since_seek_ >=
            static_cast<uint64_t>(iter_step_since_seek_) *
                avg_op_scan_flush_trigger_) {
      assert(memtable_op_scan_flush_trigger_ > 0);
      active_mem_->MarkForFlush();
      avg_op_scan_flush_trigger_ = 0;
      memtable_op_scan_flush_trigger_ = 0;
    }
    iter_step_since_seek_ = 1;
    mem_hidden_op_scanned_since_seek_ = 0;
  }

  void MarkMemtableForFlushForPerOpTrigger(uint64_t& mem_hidden_op_scanned) {
    if (memtable_op_scan_flush_trigger_ &&
        ikey_.sequence >= memtable_seqno_lb_) {
      if (++mem_hidden_op_scanned >= memtable_op_scan_flush_trigger_) {
        active_mem_->MarkForFlush();
        // Turn off the flush trigger checks.
        memtable_op_scan_flush_trigger_ = 0;
        avg_op_scan_flush_trigger_ = 0;
      }
      if (avg_op_scan_flush_trigger_) {
        ++mem_hidden_op_scanned_since_seek_;
      }
    }
  }
  const SliceTransform* prefix_extractor_;
  Env* const env_;
  SystemClock* clock_;
  Logger* logger_;
  UserComparatorWrapper user_comparator_;
  const MergeOperator* const merge_operator_;
  IteratorWrapper iter_;
  BlobReader blob_reader_;
  ReadCallback* read_callback_;
  // Max visible sequence number. It is normally the snapshot seq unless we
  // have uncommitted data in the db as in WriteUnCommitted.
  SequenceNumber sequence_;

  IterKey saved_key_;
  // Reusable internal key data structure. This is only used inside one
  // function and should not be used across functions. Reusing this object
  // avoids the overhead of constructing a new one on every call.
  ParsedInternalKey ikey_;
  // The approximate write time for the entry. It is deduced from the entry's
  // sequence number if the seqno-to-time mapping is available. For a
  // kTypeValuePreferredSeqno entry, this is the write time specified by the
  // user.
  uint64_t saved_write_unix_time_;
  std::string saved_value_;
  Slice pinned_value_;
  // for prefix seek mode to support prev()
  // Value of the default column
  Slice value_;
  // All columns (i.e. name-value pairs)
  WideColumns wide_columns_;
  Statistics* statistics_;
  uint64_t max_skip_;
  uint64_t max_skippable_internal_keys_;
  uint64_t num_internal_keys_skipped_;
  const Slice* iterate_lower_bound_;
  const Slice* iterate_upper_bound_;

  // The prefix of the seek key. It is only used when prefix_same_as_start_
  // is true and the prefix extractor is not null. In Next() or Prev(), the
  // current keys will be checked against this prefix, so that the iterator
  // can be invalidated once the keys in this prefix have been exhausted.
  // Set it using SetUserKey() and read it using GetUserKey().
  IterKey prefix_;

  Status status_;
  Slice lazy_blob_index_;
  // List of operands for merge operator.
  MergeContext merge_context_;
  LocalStatistics local_stats_;
  PinnedIteratorsManager pinned_iters_mgr_;
  ColumnFamilyHandleImpl* cfh_;
  const Slice* const timestamp_ub_;
  const Slice* const timestamp_lb_;
  const size_t timestamp_size_;
  std::string saved_timestamp_;
  std::optional<MultiScanArgs> scan_opts_;
  size_t scan_index_{0};
  ReadOnlyMemTable* const active_mem_;
  SequenceNumber memtable_seqno_lb_;
  uint32_t memtable_op_scan_flush_trigger_;
  uint32_t avg_op_scan_flush_trigger_;
  uint32_t iter_step_since_seek_;
  uint32_t mem_hidden_op_scanned_since_seek_;

  Direction direction_;
  bool valid_;
  bool current_entry_is_merged_;
  // True if we know that the current entry's seqnum is 0.
  // This information is used to infer that the next entry will be for a
  // different user key.
  bool is_key_seqnum_zero_;
  const bool prefix_same_as_start_;
  // Means that we will pin all data blocks we read as long as the Iterator
  // is not deleted; will be true if ReadOptions::pin_data is true
  const bool pin_thru_lifetime_;
  // Expect the inner iterator to maintain a total order.
  // prefix_extractor_ must be non-NULL if the value is false.
  const bool expect_total_order_inner_iter_;
  // Whether the iterator is allowed to expose blob references. Set to true
  // when the stacked BlobDB implementation is used, false otherwise.
  bool expose_blob_index_;
  bool allow_unprepared_value_;
  bool is_blob_;
  bool arena_mode_;
};
}  // namespace ROCKSDB_NAMESPACE