db_iter.cc 64 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
7177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  9. #include "db/db_iter.h"
  10. #include <limits>
  11. #include <string>
  12. #include "db/dbformat.h"
  13. #include "db/merge_context.h"
  14. #include "db/merge_helper.h"
  15. #include "db/pinned_iterators_manager.h"
  16. #include "db/wide/wide_column_serialization.h"
  17. #include "db/wide/wide_columns_helper.h"
  18. #include "file/filename.h"
  19. #include "logging/logging.h"
  20. #include "memory/arena.h"
  21. #include "monitoring/perf_context_imp.h"
  22. #include "rocksdb/env.h"
  23. #include "rocksdb/iterator.h"
  24. #include "rocksdb/merge_operator.h"
  25. #include "rocksdb/options.h"
  26. #include "rocksdb/system_clock.h"
  27. #include "table/internal_iterator.h"
  28. #include "table/iterator_wrapper.h"
  29. #include "trace_replay/trace_replay.h"
  30. #include "util/mutexlock.h"
  31. #include "util/string_util.h"
  32. #include "util/user_comparator_wrapper.h"
  33. namespace ROCKSDB_NAMESPACE {
// Constructs a user-facing iterator over the merged internal iterator `iter`,
// bounded by snapshot sequence number `s` and the key/timestamp bounds in
// `read_options`. `version` (may be null) is used by the blob reader to
// resolve blob indexes; `active_mem` (may be null) enables the memtable
// op-scan flush triggers.
DBIter::DBIter(Env* _env, const ReadOptions& read_options,
               const ImmutableOptions& ioptions,
               const MutableCFOptions& mutable_cf_options,
               const Comparator* cmp, InternalIterator* iter,
               const Version* version, SequenceNumber s, bool arena_mode,
               ReadCallback* read_callback, ColumnFamilyHandleImpl* cfh,
               bool expose_blob_index, ReadOnlyMemTable* active_mem)
    : prefix_extractor_(mutable_cf_options.prefix_extractor.get()),
      env_(_env),
      clock_(ioptions.clock),
      logger_(ioptions.logger),
      user_comparator_(cmp),
      merge_operator_(ioptions.merge_operator.get()),
      iter_(iter),
      blob_reader_(version, read_options.read_tier,
                   read_options.verify_checksums, read_options.fill_cache,
                   read_options.io_activity),
      read_callback_(read_callback),
      sequence_(s),
      statistics_(ioptions.stats),
      max_skip_(mutable_cf_options.max_sequential_skip_in_iterations),
      max_skippable_internal_keys_(read_options.max_skippable_internal_keys),
      num_internal_keys_skipped_(0),
      iterate_lower_bound_(read_options.iterate_lower_bound),
      iterate_upper_bound_(read_options.iterate_upper_bound),
      cfh_(cfh),
      timestamp_ub_(read_options.timestamp),
      timestamp_lb_(read_options.iter_start_ts),
      // Timestamp size is derived from the upper-bound timestamp when set.
      timestamp_size_(timestamp_ub_ ? timestamp_ub_->size() : 0),
      active_mem_(active_mem),
      // Overwritten below when active_mem_ is provided.
      memtable_seqno_lb_(kMaxSequenceNumber),
      memtable_op_scan_flush_trigger_(0),
      avg_op_scan_flush_trigger_(0),
      iter_step_since_seek_(1),
      mem_hidden_op_scanned_since_seek_(0),
      direction_(kForward),
      valid_(false),
      current_entry_is_merged_(false),
      is_key_seqnum_zero_(false),
      prefix_same_as_start_(
          prefix_extractor_ ? read_options.prefix_same_as_start : false),
      pin_thru_lifetime_(read_options.pin_data),
      expect_total_order_inner_iter_(prefix_extractor_ == nullptr ||
                                     read_options.total_order_seek ||
                                     read_options.auto_prefix_mode),
      expose_blob_index_(expose_blob_index),
      allow_unprepared_value_(read_options.allow_unprepared_value),
      is_blob_(false),
      arena_mode_(arena_mode) {
  RecordTick(statistics_, NO_ITERATOR_CREATED);
  if (pin_thru_lifetime_) {
    // Pin data blocks for the whole lifetime of this iterator.
    pinned_iters_mgr_.StartPinning();
  }
  if (iter_.iter()) {
    iter_.iter()->SetPinnedItersMgr(&pinned_iters_mgr_);
  }
  status_.PermitUncheckedError();
  assert(timestamp_size_ ==
         user_comparator_.user_comparator()->timestamp_size());
  // prefix_seek_opt_in_only should force total_order_seek whereever the caller
  // is duplicating the original ReadOptions
  assert(!ioptions.prefix_seek_opt_in_only || read_options.total_order_seek);
  if (active_mem_) {
    // FIXME: GetEarliestSequenceNumber() may return a seqno that is one smaller
    // than the smallest seqno in the memtable. This violates its comment and
    // entries with that seqno may not be in the active memtable. Before it's
    // fixed, we use GetFirstSequenceNumber() for more accurate result.
    memtable_seqno_lb_ = active_mem_->IsEmpty()
                             ? active_mem_->GetEarliestSequenceNumber()
                             : active_mem_->GetFirstSequenceNumber();
    memtable_op_scan_flush_trigger_ =
        mutable_cf_options.memtable_op_scan_flush_trigger;
    if (memtable_op_scan_flush_trigger_) {
      // avg_op_scan_flush_trigger_ requires memtable_op_scan_flush_trigger_ > 0
      avg_op_scan_flush_trigger_ =
          mutable_cf_options.memtable_avg_op_scan_flush_trigger;
    }
  } else {
    // memtable_op_scan_flush_trigger_ and avg_op_scan_flush_trigger_ are
    // initialized to 0(disabled) as default.
  }
}
  116. Status DBIter::GetProperty(std::string prop_name, std::string* prop) {
  117. if (prop == nullptr) {
  118. return Status::InvalidArgument("prop is nullptr");
  119. }
  120. if (prop_name == "rocksdb.iterator.super-version-number") {
  121. // First try to pass the value returned from inner iterator.
  122. return iter_.iter()->GetProperty(prop_name, prop);
  123. } else if (prop_name == "rocksdb.iterator.is-key-pinned") {
  124. if (valid_) {
  125. *prop = (pin_thru_lifetime_ && saved_key_.IsKeyPinned()) ? "1" : "0";
  126. } else {
  127. *prop = "Iterator is not valid.";
  128. }
  129. return Status::OK();
  130. } else if (prop_name == "rocksdb.iterator.is-value-pinned") {
  131. if (valid_) {
  132. *prop = (pin_thru_lifetime_ && iter_.Valid() &&
  133. iter_.value().data() == value_.data())
  134. ? "1"
  135. : "0";
  136. } else {
  137. *prop = "Iterator is not valid.";
  138. }
  139. return Status::OK();
  140. } else if (prop_name == "rocksdb.iterator.internal-key") {
  141. *prop = saved_key_.GetUserKey().ToString();
  142. return Status::OK();
  143. } else if (prop_name == "rocksdb.iterator.write-time") {
  144. PutFixed64(prop, saved_write_unix_time_);
  145. return Status::OK();
  146. }
  147. return Status::InvalidArgument("Unidentified property.");
  148. }
  149. bool DBIter::ParseKey(ParsedInternalKey* ikey) {
  150. Status s = ParseInternalKey(iter_.key(), ikey, false /* log_err_key */);
  151. if (!s.ok()) {
  152. status_ = Status::Corruption("In DBIter: ", s.getState());
  153. valid_ = false;
  154. ROCKS_LOG_ERROR(logger_, "In DBIter: %s", status_.getState());
  155. return false;
  156. } else {
  157. return true;
  158. }
  159. }
// Advances to the next user key in the forward direction, handling a
// pending direction switch (reverse -> forward) and updating local
// statistics. Precondition: the iterator is valid with an OK status.
void DBIter::Next() {
  assert(valid_);
  assert(status_.ok());
  PERF_COUNTER_ADD(iter_next_count, 1);
  PERF_CPU_TIMER_GUARD(iter_next_cpu_nanos, clock_);
  // Release temporarily pinned blocks from last operation
  ReleaseTempPinnedData();
  ResetBlobData();
  ResetValueAndColumns();
  // Fold the per-call internal-key skip counter into local stats.
  // NOTE(review): the decrement appears to compensate for one counted step
  // that is not a genuine skip -- confirm against the counting in
  // FindNextUserEntry before relying on exact skip_count_ values.
  local_stats_.skip_count_ += num_internal_keys_skipped_;
  local_stats_.skip_count_--;
  num_internal_keys_skipped_ = 0;
  iter_step_since_seek_++;
  bool ok = true;
  if (direction_ == kReverse) {
    // Previous operation iterated backwards; reposition inner iter forward.
    is_key_seqnum_zero_ = false;
    if (!ReverseToForward()) {
      ok = false;
    }
  } else if (!current_entry_is_merged_) {
    // If the current value is not a merge, the iter position is the
    // current key, which is already returned. We can safely issue a
    // Next() without checking the current key.
    // If the current key is a merge, very likely iter already points
    // to the next internal position.
    assert(iter_.Valid());
    iter_.Next();
    PERF_COUNTER_ADD(internal_key_skipped_count, 1);
  }
  local_stats_.next_count_++;
  if (ok && iter_.Valid()) {
    ClearSavedValue();
    if (prefix_same_as_start_) {
      assert(prefix_extractor_ != nullptr);
      // Constrain the scan to the prefix captured at Seek() time.
      const Slice prefix = prefix_.GetUserKey();
      FindNextUserEntry(true /* skipping the current user key */, &prefix);
    } else {
      FindNextUserEntry(true /* skipping the current user key */, nullptr);
    }
  } else {
    // Inner iterator exhausted or direction switch failed.
    is_key_seqnum_zero_ = false;
    valid_ = false;
  }
  if (statistics_ != nullptr && valid_) {
    local_stats_.next_found_count_++;
    local_stats_.bytes_read_ += (key().size() + value().size());
  }
}
  208. Status DBIter::BlobReader::RetrieveAndSetBlobValue(const Slice& user_key,
  209. const Slice& blob_index) {
  210. assert(blob_value_.empty());
  211. if (!version_) {
  212. return Status::Corruption("Encountered unexpected blob index.");
  213. }
  214. // TODO: consider moving ReadOptions from ArenaWrappedDBIter to DBIter to
  215. // avoid having to copy options back and forth.
  216. // TODO: plumb Env::IOPriority
  217. ReadOptions read_options;
  218. read_options.read_tier = read_tier_;
  219. read_options.verify_checksums = verify_checksums_;
  220. read_options.fill_cache = fill_cache_;
  221. read_options.io_activity = io_activity_;
  222. constexpr FilePrefetchBuffer* prefetch_buffer = nullptr;
  223. constexpr uint64_t* bytes_read = nullptr;
  224. const Status s = version_->GetBlob(read_options, user_key, blob_index,
  225. prefetch_buffer, &blob_value_, bytes_read);
  226. if (!s.ok()) {
  227. return s;
  228. }
  229. return Status::OK();
  230. }
  231. bool DBIter::SetValueAndColumnsFromBlobImpl(const Slice& user_key,
  232. const Slice& blob_index) {
  233. const Status s = blob_reader_.RetrieveAndSetBlobValue(user_key, blob_index);
  234. if (!s.ok()) {
  235. status_ = s;
  236. valid_ = false;
  237. is_blob_ = false;
  238. return false;
  239. }
  240. SetValueAndColumnsFromPlain(blob_reader_.GetBlobValue());
  241. return true;
  242. }
// Dispatches handling of a blob-index entry. Precedence: expose the raw
// index if requested; otherwise defer the blob read if unprepared values
// are allowed; otherwise fetch the blob immediately.
bool DBIter::SetValueAndColumnsFromBlob(const Slice& user_key,
                                        const Slice& blob_index) {
  assert(!is_blob_);
  is_blob_ = true;
  if (expose_blob_index_) {
    // Caller wants the raw blob index itself; do not dereference it.
    SetValueAndColumnsFromPlain(blob_index);
    return true;
  }
  if (allow_unprepared_value_) {
    // Defer the (potentially expensive) blob read: remember the index and
    // let PrepareValue() resolve it on demand.
    assert(value_.empty());
    assert(wide_columns_.empty());
    assert(lazy_blob_index_.empty());
    lazy_blob_index_ = blob_index;
    return true;
  }
  return SetValueAndColumnsFromBlobImpl(user_key, blob_index);
}
  260. bool DBIter::SetValueAndColumnsFromEntity(Slice slice) {
  261. assert(value_.empty());
  262. assert(wide_columns_.empty());
  263. const Status s = WideColumnSerialization::Deserialize(slice, wide_columns_);
  264. if (!s.ok()) {
  265. status_ = s;
  266. valid_ = false;
  267. wide_columns_.clear();
  268. return false;
  269. }
  270. if (WideColumnsHelper::HasDefaultColumn(wide_columns_)) {
  271. value_ = WideColumnsHelper::GetDefaultColumn(wide_columns_);
  272. }
  273. return true;
  274. }
  275. bool DBIter::SetValueAndColumnsFromMergeResult(const Status& merge_status,
  276. ValueType result_type) {
  277. if (!merge_status.ok()) {
  278. valid_ = false;
  279. status_ = merge_status;
  280. return false;
  281. }
  282. if (result_type == kTypeWideColumnEntity) {
  283. if (!SetValueAndColumnsFromEntity(saved_value_)) {
  284. assert(!valid_);
  285. return false;
  286. }
  287. valid_ = true;
  288. return true;
  289. }
  290. assert(result_type == kTypeValue);
  291. SetValueAndColumnsFromPlain(pinned_value_.data() ? pinned_value_
  292. : saved_value_);
  293. valid_ = true;
  294. return true;
  295. }
  296. bool DBIter::PrepareValue() {
  297. assert(valid_);
  298. if (lazy_blob_index_.empty()) {
  299. return true;
  300. }
  301. assert(allow_unprepared_value_);
  302. assert(is_blob_);
  303. const bool result =
  304. SetValueAndColumnsFromBlobImpl(saved_key_.GetUserKey(), lazy_blob_index_);
  305. lazy_blob_index_.clear();
  306. return result;
  307. }
// PRE: saved_key_ has the current user key if skipping_saved_key
// POST: saved_key_ should have the next user key if valid_,
//       if the current entry is a result of merge
//           current_entry_is_merged_ => true
//           saved_value_             => the merged value
//
// NOTE: In between, saved_key_ can point to a user key that has
//       a delete marker or a sequence number higher than sequence_
//       saved_key_ MUST have a proper user_key before calling this function
//
// The prefix parameter, if not null, indicates that we need to iterate
// within the prefix, and the iterator needs to be made invalid, if no
// more entry for the prefix can be found.
bool DBIter::FindNextUserEntry(bool skipping_saved_key, const Slice* prefix) {
  // Thin wrapper that scopes the perf timer around the real implementation.
  PERF_TIMER_GUARD(find_next_user_entry_time);
  return FindNextUserEntryInternal(skipping_saved_key, prefix);
}
  325. // Actual implementation of DBIter::FindNextUserEntry()
  326. bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key,
  327. const Slice* prefix) {
  328. // Loop until we hit an acceptable entry to yield
  329. assert(iter_.Valid());
  330. assert(status_.ok());
  331. assert(direction_ == kForward);
  332. current_entry_is_merged_ = false;
  333. // How many times in a row we have skipped an entry with user key less than
  334. // or equal to saved_key_. We could skip these entries either because
  335. // sequence numbers were too high or because skipping_saved_key = true.
  336. // What saved_key_ contains throughout this method:
  337. // - if skipping_saved_key : saved_key_ contains the key that we need
  338. // to skip, and we haven't seen any keys greater
  339. // than that,
  340. // - if num_skipped > 0 : saved_key_ contains the key that we have skipped
  341. // num_skipped times, and we haven't seen any keys
  342. // greater than that,
  343. // - none of the above : saved_key_ can contain anything, it doesn't
  344. // matter.
  345. uint64_t num_skipped = 0;
  346. // For write unprepared, the target sequence number in reseek could be larger
  347. // than the snapshot, and thus needs to be skipped again. This could result in
  348. // an infinite loop of reseeks. To avoid that, we limit the number of reseeks
  349. // to one.
  350. bool reseek_done = false;
  351. uint64_t mem_hidden_op_scanned = 0;
  352. do {
  353. // Will update is_key_seqnum_zero_ as soon as we parsed the current key
  354. // but we need to save the previous value to be used in the loop.
  355. bool is_prev_key_seqnum_zero = is_key_seqnum_zero_;
  356. if (!ParseKey(&ikey_)) {
  357. is_key_seqnum_zero_ = false;
  358. return false;
  359. }
  360. Slice user_key_without_ts =
  361. StripTimestampFromUserKey(ikey_.user_key, timestamp_size_);
  362. is_key_seqnum_zero_ = (ikey_.sequence == 0);
  363. assert(iterate_upper_bound_ == nullptr ||
  364. iter_.UpperBoundCheckResult() != IterBoundCheck::kInbound ||
  365. user_comparator_.CompareWithoutTimestamp(
  366. user_key_without_ts, /*a_has_ts=*/false, *iterate_upper_bound_,
  367. /*b_has_ts=*/false) < 0);
  368. if (iterate_upper_bound_ != nullptr &&
  369. iter_.UpperBoundCheckResult() != IterBoundCheck::kInbound &&
  370. user_comparator_.CompareWithoutTimestamp(
  371. user_key_without_ts, /*a_has_ts=*/false, *iterate_upper_bound_,
  372. /*b_has_ts=*/false) >= 0) {
  373. break;
  374. }
  375. assert(prefix == nullptr || prefix_extractor_ != nullptr);
  376. if (prefix != nullptr &&
  377. prefix_extractor_->Transform(user_key_without_ts).compare(*prefix) !=
  378. 0) {
  379. assert(prefix_same_as_start_);
  380. break;
  381. }
  382. if (TooManyInternalKeysSkipped()) {
  383. return false;
  384. }
  385. assert(ikey_.user_key.size() >= timestamp_size_);
  386. Slice ts = timestamp_size_ > 0 ? ExtractTimestampFromUserKey(
  387. ikey_.user_key, timestamp_size_)
  388. : Slice();
  389. bool more_recent = false;
  390. if (IsVisible(ikey_.sequence, ts, &more_recent)) {
  391. // If the previous entry is of seqnum 0, the current entry will not
  392. // possibly be skipped. This condition can potentially be relaxed to
  393. // prev_key.seq <= ikey_.sequence. We are cautious because it will be more
  394. // prone to bugs causing the same user key with the same sequence number.
  395. // Note that with current timestamp implementation, the same user key can
  396. // have different timestamps and zero sequence number on the bottommost
  397. // level. This may change in the future.
  398. if ((!is_prev_key_seqnum_zero || timestamp_size_ > 0) &&
  399. skipping_saved_key &&
  400. CompareKeyForSkip(ikey_.user_key, saved_key_.GetUserKey()) <= 0) {
  401. num_skipped++; // skip this entry
  402. PERF_COUNTER_ADD(internal_key_skipped_count, 1);
  403. MarkMemtableForFlushForPerOpTrigger(mem_hidden_op_scanned);
  404. } else {
  405. assert(!skipping_saved_key ||
  406. CompareKeyForSkip(ikey_.user_key, saved_key_.GetUserKey()) > 0);
  407. num_skipped = 0;
  408. reseek_done = false;
  409. switch (ikey_.type) {
  410. case kTypeDeletion:
  411. case kTypeDeletionWithTimestamp:
  412. case kTypeSingleDeletion:
  413. // Arrange to skip all upcoming entries for this key since
  414. // they are hidden by this deletion.
  415. if (timestamp_lb_) {
  416. saved_key_.SetInternalKey(ikey_);
  417. valid_ = true;
  418. return true;
  419. } else {
  420. saved_key_.SetUserKey(
  421. ikey_.user_key, !pin_thru_lifetime_ ||
  422. !iter_.iter()->IsKeyPinned() /* copy */);
  423. skipping_saved_key = true;
  424. PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
  425. MarkMemtableForFlushForPerOpTrigger(mem_hidden_op_scanned);
  426. }
  427. break;
  428. case kTypeValue:
  429. case kTypeValuePreferredSeqno:
  430. case kTypeBlobIndex:
  431. case kTypeWideColumnEntity:
  432. if (!PrepareValueInternal()) {
  433. return false;
  434. }
  435. if (timestamp_lb_) {
  436. saved_key_.SetInternalKey(ikey_);
  437. } else {
  438. saved_key_.SetUserKey(
  439. ikey_.user_key, !pin_thru_lifetime_ ||
  440. !iter_.iter()->IsKeyPinned() /* copy */);
  441. }
  442. if (ikey_.type == kTypeBlobIndex) {
  443. if (!SetValueAndColumnsFromBlob(ikey_.user_key, iter_.value())) {
  444. return false;
  445. }
  446. } else if (ikey_.type == kTypeWideColumnEntity) {
  447. if (!SetValueAndColumnsFromEntity(iter_.value())) {
  448. return false;
  449. }
  450. } else {
  451. assert(ikey_.type == kTypeValue ||
  452. ikey_.type == kTypeValuePreferredSeqno);
  453. Slice value = iter_.value();
  454. saved_write_unix_time_ = iter_.write_unix_time();
  455. if (ikey_.type == kTypeValuePreferredSeqno) {
  456. value = ParsePackedValueForValue(value);
  457. }
  458. SetValueAndColumnsFromPlain(value);
  459. }
  460. valid_ = true;
  461. return true;
  462. case kTypeMerge:
  463. if (!PrepareValueInternal()) {
  464. return false;
  465. }
  466. saved_key_.SetUserKey(
  467. ikey_.user_key,
  468. !pin_thru_lifetime_ || !iter_.iter()->IsKeyPinned() /* copy */);
  469. // By now, we are sure the current ikey is going to yield a value
  470. current_entry_is_merged_ = true;
  471. valid_ = true;
  472. return MergeValuesNewToOld(); // Go to a different state machine
  473. default:
  474. valid_ = false;
  475. status_ = Status::Corruption(
  476. "Unknown value type: " +
  477. std::to_string(static_cast<unsigned int>(ikey_.type)));
  478. return false;
  479. }
  480. }
  481. } else {
  482. if (more_recent) {
  483. PERF_COUNTER_ADD(internal_recent_skipped_count, 1);
  484. }
  485. // This key was inserted after our snapshot was taken or skipped by
  486. // timestamp range. If this happens too many times in a row for the same
  487. // user key, we want to seek to the target sequence number.
  488. int cmp = user_comparator_.CompareWithoutTimestamp(
  489. ikey_.user_key, saved_key_.GetUserKey());
  490. if (cmp == 0 || (skipping_saved_key && cmp < 0)) {
  491. num_skipped++;
  492. } else {
  493. saved_key_.SetUserKey(
  494. ikey_.user_key,
  495. !iter_.iter()->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
  496. skipping_saved_key = false;
  497. num_skipped = 0;
  498. reseek_done = false;
  499. }
  500. }
  501. // If we have sequentially iterated via numerous equal keys, then it's
  502. // better to seek so that we can avoid too many key comparisons.
  503. //
  504. // To avoid infinite loops, do not reseek if we have already attempted to
  505. // reseek previously.
  506. //
  507. // TODO(lth): If we reseek to sequence number greater than ikey_.sequence,
  508. // then it does not make sense to reseek as we would actually land further
  509. // away from the desired key. There is opportunity for optimization here.
  510. if (num_skipped > max_skip_ && !reseek_done) {
  511. is_key_seqnum_zero_ = false;
  512. num_skipped = 0;
  513. reseek_done = true;
  514. std::string last_key;
  515. if (skipping_saved_key) {
  516. // We're looking for the next user-key but all we see are the same
  517. // user-key with decreasing sequence numbers. Fast forward to
  518. // sequence number 0 and type deletion (the smallest type).
  519. if (timestamp_size_ == 0) {
  520. AppendInternalKey(
  521. &last_key,
  522. ParsedInternalKey(saved_key_.GetUserKey(), 0, kTypeDeletion));
  523. } else {
  524. const std::string kTsMin(timestamp_size_, '\0');
  525. AppendInternalKeyWithDifferentTimestamp(
  526. &last_key,
  527. ParsedInternalKey(saved_key_.GetUserKey(), 0, kTypeDeletion),
  528. kTsMin);
  529. }
  530. // Don't set skipping_saved_key = false because we may still see more
  531. // user-keys equal to saved_key_.
  532. } else {
  533. // We saw multiple entries with this user key and sequence numbers
  534. // higher than sequence_. Fast forward to sequence_.
  535. // Note that this only covers a case when a higher key was overwritten
  536. // many times since our snapshot was taken, not the case when a lot of
  537. // different keys were inserted after our snapshot was taken.
  538. if (timestamp_size_ == 0) {
  539. AppendInternalKey(
  540. &last_key, ParsedInternalKey(saved_key_.GetUserKey(), sequence_,
  541. kValueTypeForSeek));
  542. } else {
  543. AppendInternalKeyWithDifferentTimestamp(
  544. &last_key,
  545. ParsedInternalKey(saved_key_.GetUserKey(), sequence_,
  546. kValueTypeForSeek),
  547. *timestamp_ub_);
  548. }
  549. }
  550. iter_.Seek(last_key);
  551. RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
  552. } else {
  553. iter_.Next();
  554. }
  555. // This could be a long-running operation due to tombstones, etc.
  556. bool aborted = ROCKSDB_THREAD_YIELD_CHECK_ABORT();
  557. if (aborted) {
  558. valid_ = false;
  559. status_ = Status::Aborted("Query abort.");
  560. return false;
  561. }
  562. } while (iter_.Valid());
  563. valid_ = false;
  564. return iter_.status().ok();
  565. }
// Merge values of the same user key starting from the current iter_ position
// Scan from the newer entries to older entries.
// PRE: iter_.key() points to the first merge type entry
//      saved_key_ stores the user key
//      iter_.PrepareValue() has been called
// POST: saved_value_ has the merged value for the user key
//       iter_ points to the next entry (or invalid)
bool DBIter::MergeValuesNewToOld() {
  if (!merge_operator_) {
    // Merge entries exist but no operator was configured: hard error.
    ROCKS_LOG_ERROR(logger_, "Options::merge_operator is null.");
    status_ = Status::InvalidArgument("merge_operator_ must be set.");
    valid_ = false;
    return false;
  }

  // Temporarily pin the blocks that hold merge operands
  TempPinData();
  merge_context_.Clear();
  // Start the merge process by pushing the first operand
  merge_context_.PushOperand(
      iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */);
  PERF_COUNTER_ADD(internal_merge_count, 1);

  TEST_SYNC_POINT("DBIter::MergeValuesNewToOld:PushedFirstOperand");

  ParsedInternalKey ikey;
  // Walk older entries of the same user key, collecting operands until a
  // base value (put/blob/entity), a deletion, or a different user key.
  for (iter_.Next(); iter_.Valid(); iter_.Next()) {
    TEST_SYNC_POINT("DBIter::MergeValuesNewToOld:SteppedToNextOperand");
    if (!ParseKey(&ikey)) {
      return false;
    }

    if (!user_comparator_.EqualWithoutTimestamp(ikey.user_key,
                                                saved_key_.GetUserKey())) {
      // hit the next user key, stop right here
      break;
    }
    if (kTypeDeletion == ikey.type || kTypeSingleDeletion == ikey.type ||
        kTypeDeletionWithTimestamp == ikey.type) {
      // hit a delete with the same user key, stop right here
      // iter_ is positioned after delete
      iter_.Next();
      break;
    }
    if (!PrepareValueInternal()) {
      return false;
    }

    if (kTypeValue == ikey.type || kTypeValuePreferredSeqno == ikey.type) {
      Slice value = iter_.value();
      saved_write_unix_time_ = iter_.write_unix_time();
      if (kTypeValuePreferredSeqno == ikey.type) {
        // Unpack (value, preferred seqno) to obtain the plain value.
        value = ParsePackedValueForValue(value);
      }
      // hit a put or put equivalent, merge the put value with operands and
      // store the final result in saved_value_. We are done!
      if (!MergeWithPlainBaseValue(value, ikey.user_key)) {
        return false;
      }
      // iter_ is positioned after put
      iter_.Next();
      if (!iter_.status().ok()) {
        valid_ = false;
        return false;
      }
      return true;
    } else if (kTypeMerge == ikey.type) {
      // hit a merge, add the value as an operand and run associative merge.
      // when complete, add result to operands and continue.
      merge_context_.PushOperand(
          iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */);
      PERF_COUNTER_ADD(internal_merge_count, 1);
    } else if (kTypeBlobIndex == ikey.type) {
      // Base value lives in a blob file; resolve and merge.
      if (!MergeWithBlobBaseValue(iter_.value(), ikey.user_key)) {
        return false;
      }
      // iter_ is positioned after put
      iter_.Next();
      if (!iter_.status().ok()) {
        valid_ = false;
        return false;
      }
      return true;
    } else if (kTypeWideColumnEntity == ikey.type) {
      // Base value is a wide-column entity; merge against it.
      if (!MergeWithWideColumnBaseValue(iter_.value(), ikey.user_key)) {
        return false;
      }
      // iter_ is positioned after put
      iter_.Next();
      if (!iter_.status().ok()) {
        valid_ = false;
        return false;
      }
      return true;
    } else {
      valid_ = false;
      status_ = Status::Corruption(
          "Unrecognized value type: " +
          std::to_string(static_cast<unsigned int>(ikey.type)));
      return false;
    }
  }
  if (!iter_.status().ok()) {
    valid_ = false;
    return false;
  }

  // we either exhausted all internal keys under this user key, or hit
  // a deletion marker.
  // feed null as the existing value to the merge operator, such that
  // client can differentiate this scenario and do things accordingly.
  if (!MergeWithNoBaseValue(saved_key_.GetUserKey())) {
    return false;
  }
  assert(status_.ok());
  return true;
}
  677. void DBIter::Prev() {
  678. assert(valid_);
  679. assert(status_.ok());
  680. PERF_COUNTER_ADD(iter_prev_count, 1);
  681. PERF_CPU_TIMER_GUARD(iter_prev_cpu_nanos, clock_);
  682. ReleaseTempPinnedData();
  683. ResetBlobData();
  684. ResetValueAndColumns();
  685. ResetInternalKeysSkippedCounter();
  686. bool ok = true;
  687. if (direction_ == kForward) {
  688. if (!ReverseToBackward()) {
  689. ok = false;
  690. }
  691. }
  692. if (ok) {
  693. ClearSavedValue();
  694. Slice prefix;
  695. if (prefix_same_as_start_) {
  696. assert(prefix_extractor_ != nullptr);
  697. prefix = prefix_.GetUserKey();
  698. }
  699. PrevInternal(prefix_same_as_start_ ? &prefix : nullptr);
  700. }
  701. if (statistics_ != nullptr) {
  702. local_stats_.prev_count_++;
  703. if (valid_) {
  704. local_stats_.prev_found_count_++;
  705. local_stats_.bytes_read_ += (key().size() + value().size());
  706. }
  707. }
  708. }
// Switch iteration direction from backward to forward, leaving iter_
// positioned at or after saved_key_ (the current user key).
// Returns false (with valid_ == false) on iterator error or corrupt key.
bool DBIter::ReverseToForward() {
  assert(iter_.status().ok());
  // When moving backwards, iter_ is positioned on _previous_ key, which may
  // not exist or may have different prefix than the current key().
  // If that's the case, seek iter_ to current key.
  if (!expect_total_order_inner_iter() || !iter_.Valid()) {
    // Build an internal key that sorts as the newest possible entry of the
    // saved user key (kMaxSequenceNumber, and max timestamp if enabled).
    std::string last_key;
    if (timestamp_size_ == 0) {
      AppendInternalKey(
          &last_key, ParsedInternalKey(saved_key_.GetUserKey(),
                                       kMaxSequenceNumber, kValueTypeForSeek));
    } else {
      // TODO: pre-create kTsMax.
      const std::string kTsMax(timestamp_size_, '\xff');
      AppendInternalKeyWithDifferentTimestamp(
          &last_key,
          ParsedInternalKey(saved_key_.GetUserKey(), kMaxSequenceNumber,
                            kValueTypeForSeek),
          kTsMax);
    }
    iter_.Seek(last_key);
    RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
  }
  direction_ = kForward;
  // Skip keys less than the current key() (a.k.a. saved_key_).
  while (iter_.Valid()) {
    ParsedInternalKey ikey;
    if (!ParseKey(&ikey)) {
      return false;
    }
    if (user_comparator_.Compare(ikey.user_key, saved_key_.GetUserKey()) >= 0) {
      return true;
    }
    iter_.Next();
  }
  if (!iter_.status().ok()) {
    valid_ = false;
    return false;
  }
  return true;
}
  750. // Move iter_ to the key before saved_key_.
bool DBIter::ReverseToBackward() {
  assert(iter_.status().ok());
  // When current_entry_is_merged_ is true, iter_ may be positioned on the next
  // key, which may not exist or may have prefix different from current.
  // If that's the case, seek to saved_key_.
  if (current_entry_is_merged_ &&
      (!expect_total_order_inner_iter() || !iter_.Valid())) {
    IterKey last_key;
    // Using kMaxSequenceNumber and kValueTypeForSeek
    // (not kValueTypeForSeekForPrev) to seek to a key strictly smaller
    // than saved_key_.
    last_key.SetInternalKey(ParsedInternalKey(
        saved_key_.GetUserKey(), kMaxSequenceNumber, kValueTypeForSeek));
    if (!expect_total_order_inner_iter()) {
      iter_.SeekForPrev(last_key.GetInternalKey());
    } else {
      // Some iterators may not support SeekForPrev(), so we avoid using it
      // when prefix seek mode is disabled. This is somewhat expensive
      // (an extra Prev(), as well as an extra change of direction of iter_),
      // so we may need to reconsider it later.
      iter_.Seek(last_key.GetInternalKey());
      if (!iter_.Valid() && iter_.status().ok()) {
        // Seek went past the end: the target position is the very last key.
        iter_.SeekToLast();
      }
    }
    RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
  }
  direction_ = kReverse;
  return FindUserKeyBeforeSavedKey();
}
// Core of backward iteration: walk backwards over user keys until one with a
// visible, non-deleted value is found, or the prefix / lower bound / inner
// iterator is exhausted. `prefix`, when non-null, restricts iteration to keys
// sharing that prefix (prefix_same_as_start_ mode).
void DBIter::PrevInternal(const Slice* prefix) {
  while (iter_.Valid()) {
    saved_key_.SetUserKey(
        ExtractUserKey(iter_.key()),
        !iter_.iter()->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
    assert(prefix == nullptr || prefix_extractor_ != nullptr);
    if (prefix != nullptr &&
        prefix_extractor_
                ->Transform(StripTimestampFromUserKey(saved_key_.GetUserKey(),
                                                      timestamp_size_))
                .compare(*prefix) != 0) {
      assert(prefix_same_as_start_);
      // Current key does not have the same prefix as start
      valid_ = false;
      return;
    }
    assert(iterate_lower_bound_ == nullptr || iter_.MayBeOutOfLowerBound() ||
           user_comparator_.CompareWithoutTimestamp(
               saved_key_.GetUserKey(), /*a_has_ts=*/true,
               *iterate_lower_bound_, /*b_has_ts=*/false) >= 0);
    if (iterate_lower_bound_ != nullptr && iter_.MayBeOutOfLowerBound() &&
        user_comparator_.CompareWithoutTimestamp(
            saved_key_.GetUserKey(), /*a_has_ts=*/true, *iterate_lower_bound_,
            /*b_has_ts=*/false) < 0) {
      // We've iterated earlier than the user-specified lower bound.
      valid_ = false;
      return;
    }
    if (!FindValueForCurrentKey()) {  // assigns valid_
      return;
    }
    // Whether or not we found a value for current key, we need iter_ to end up
    // on a smaller key.
    if (!FindUserKeyBeforeSavedKey()) {
      return;
    }
    if (valid_) {
      // Found the value.
      return;
    }
    if (TooManyInternalKeysSkipped(false)) {
      return;
    }
  }
  // We haven't found any key - iterator is not valid
  valid_ = false;
}
  828. // Used for backwards iteration.
  829. // Looks at the entries with user key saved_key_ and finds the most up-to-date
  830. // value for it, or executes a merge, or determines that the value was deleted.
  831. // Sets valid_ to true if the value is found and is ready to be presented to
  832. // the user through value().
  833. // Sets valid_ to false if the value was deleted, and we should try another key.
  834. // Returns false if an error occurred, and !status().ok() and !valid_.
  835. //
  836. // PRE: iter_ is positioned on the last entry with user key equal to saved_key_.
  837. // POST: iter_ is positioned on one of the entries equal to saved_key_, or on
  838. // the entry just before them, or on the entry just after them.
bool DBIter::FindValueForCurrentKey() {
  assert(iter_.Valid());
  merge_context_.Clear();
  current_entry_is_merged_ = false;
  // last entry before merge (could be kTypeDeletion,
  // kTypeDeletionWithTimestamp, kTypeSingleDeletion, kTypeValue
  // kTypeBlobIndex, kTypeWideColumnEntity or kTypeValuePreferredSeqno)
  ValueType last_not_merge_type = kTypeDeletion;
  ValueType last_key_entry_type = kTypeDeletion;
  // If false, it indicates that we have not seen any valid entry, even though
  // last_key_entry_type is initialized to kTypeDeletion.
  bool valid_entry_seen = false;
  // Temporarily pin blocks that hold (merge operands / the value)
  ReleaseTempPinnedData();
  TempPinData();
  size_t num_skipped = 0;
  // Scan all visible internal entries of the current user key, remembering
  // the newest base value / deletion seen and stacking merge operands.
  while (iter_.Valid()) {
    ParsedInternalKey ikey;
    if (!ParseKey(&ikey)) {
      return false;
    }
    if (!user_comparator_.EqualWithoutTimestamp(ikey.user_key,
                                                saved_key_.GetUserKey())) {
      // Found a smaller user key, thus we are done with current user key.
      break;
    }
    assert(ikey.user_key.size() >= timestamp_size_);
    Slice ts;
    if (timestamp_size_ > 0) {
      ts = Slice(ikey.user_key.data() + ikey.user_key.size() - timestamp_size_,
                 timestamp_size_);
    }
    bool visible = IsVisible(ikey.sequence, ts);
    if (!visible &&
        (timestamp_lb_ == nullptr ||
         user_comparator_.CompareTimestamp(ts, *timestamp_ub_) > 0)) {
      // Found an invisible version of the current user key, and it must have
      // a higher sequence number or timestamp. Therefore, we are done with the
      // current user key.
      // NOTE(review): the condition dereferences timestamp_ub_ while checking
      // timestamp_lb_ for null — presumably timestamp_ub_ is guaranteed
      // non-null whenever timestamps are enabled; verify against the ctor.
      break;
    }
    if (!ts.empty()) {
      saved_timestamp_.assign(ts.data(), ts.size());
    }
    if (TooManyInternalKeysSkipped()) {
      return false;
    }
    // This user key has lots of entries.
    // We're going from old to new, and it's taking too long. Let's do a Seek()
    // and go from new to old. This helps when a key was overwritten many times.
    if (num_skipped >= max_skip_) {
      return FindValueForCurrentKeyUsingSeek();
    }
    if (!PrepareValueInternal()) {
      return false;
    }
    if (timestamp_lb_ != nullptr) {
      // Only needed when timestamp_lb_ is not null
      [[maybe_unused]] const bool ret = ParseKey(&ikey_);
      // Since the preceding ParseKey(&ikey) succeeds, so must this.
      assert(ret);
      saved_key_.SetInternalKey(ikey);
    } else if (user_comparator_.Compare(ikey.user_key,
                                        saved_key_.GetUserKey()) < 0) {
      saved_key_.SetUserKey(
          ikey.user_key,
          !pin_thru_lifetime_ || !iter_.iter()->IsKeyPinned() /* copy */);
    }
    valid_entry_seen = true;
    last_key_entry_type = ikey.type;
    switch (last_key_entry_type) {
      case kTypeValue:
      case kTypeValuePreferredSeqno:
      case kTypeBlobIndex:
      case kTypeWideColumnEntity:
        // A newer base value supersedes everything seen so far, so any
        // collected merge operands are discarded below.
        if (iter_.iter()->IsValuePinned()) {
          saved_write_unix_time_ = iter_.write_unix_time();
          if (last_key_entry_type == kTypeValuePreferredSeqno) {
            pinned_value_ = ParsePackedValueForValue(iter_.value());
          } else {
            pinned_value_ = iter_.value();
          }
        } else {
          valid_ = false;
          status_ = Status::NotSupported(
              "Backward iteration not supported if underlying iterator's value "
              "cannot be pinned.");
        }
        merge_context_.Clear();
        last_not_merge_type = last_key_entry_type;
        if (!status_.ok()) {
          return false;
        }
        break;
      case kTypeDeletion:
      case kTypeDeletionWithTimestamp:
      case kTypeSingleDeletion:
        // A newer deletion also invalidates older merge operands.
        merge_context_.Clear();
        last_not_merge_type = last_key_entry_type;
        PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
        break;
      case kTypeMerge: {
        assert(merge_operator_ != nullptr);
        merge_context_.PushOperandBack(
            iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */);
        PERF_COUNTER_ADD(internal_merge_count, 1);
      } break;
      default:
        valid_ = false;
        status_ = Status::Corruption(
            "Unknown value type: " +
            std::to_string(static_cast<unsigned int>(last_key_entry_type)));
        return false;
    }
    PERF_COUNTER_ADD(internal_key_skipped_count, 1);
    iter_.Prev();
    ++num_skipped;
    if (visible && timestamp_lb_ != nullptr) {
      // If timestamp_lb_ is not nullptr, we do not have to look further for
      // another internal key. We can return this current internal key. Yet we
      // still keep the invariant that iter_ is positioned before the returned
      // key.
      break;
    }
  }
  if (!iter_.status().ok()) {
    valid_ = false;
    return false;
  }
  if (!valid_entry_seen) {
    // Since we haven't seen any valid entry, last_key_entry_type remains
    // unchanged and the same as its initial value.
    assert(last_key_entry_type == kTypeDeletion);
    assert(last_not_merge_type == kTypeDeletion);
    valid_ = false;
    return true;
  }
  if (timestamp_lb_ != nullptr) {
    assert(last_key_entry_type == ikey_.type);
  }
  // Resolve the final state of the key from the newest entry seen.
  switch (last_key_entry_type) {
    case kTypeDeletion:
    case kTypeDeletionWithTimestamp:
    case kTypeSingleDeletion:
      if (timestamp_lb_ == nullptr) {
        valid_ = false;
      } else {
        // With a timestamp lower bound, deletion entries themselves are
        // surfaced to the caller.
        valid_ = true;
      }
      return true;
    case kTypeMerge:
      current_entry_is_merged_ = true;
      if (last_not_merge_type == kTypeDeletion ||
          last_not_merge_type == kTypeSingleDeletion ||
          last_not_merge_type == kTypeDeletionWithTimestamp) {
        if (!MergeWithNoBaseValue(saved_key_.GetUserKey())) {
          return false;
        }
        return true;
      } else if (last_not_merge_type == kTypeBlobIndex) {
        if (!MergeWithBlobBaseValue(pinned_value_, saved_key_.GetUserKey())) {
          return false;
        }
        return true;
      } else if (last_not_merge_type == kTypeWideColumnEntity) {
        if (!MergeWithWideColumnBaseValue(pinned_value_,
                                          saved_key_.GetUserKey())) {
          return false;
        }
        return true;
      } else {
        assert(last_not_merge_type == kTypeValue ||
               last_not_merge_type == kTypeValuePreferredSeqno);
        if (!MergeWithPlainBaseValue(pinned_value_, saved_key_.GetUserKey())) {
          return false;
        }
        return true;
      }
    case kTypeValue:
    case kTypeValuePreferredSeqno:
      SetValueAndColumnsFromPlain(pinned_value_);
      break;
    case kTypeBlobIndex:
      if (!SetValueAndColumnsFromBlob(saved_key_.GetUserKey(), pinned_value_)) {
        return false;
      }
      break;
    case kTypeWideColumnEntity:
      if (!SetValueAndColumnsFromEntity(pinned_value_)) {
        return false;
      }
      break;
    default:
      valid_ = false;
      status_ = Status::Corruption(
          "Unknown value type: " +
          std::to_string(static_cast<unsigned int>(last_key_entry_type)));
      return false;
  }
  valid_ = true;
  return true;
}
  1041. // This function is used in FindValueForCurrentKey.
  1042. // We use Seek() function instead of Prev() to find necessary value
  1043. // TODO: This is very similar to FindNextUserEntry() and MergeValuesNewToOld().
  1044. // Would be nice to reuse some code.
bool DBIter::FindValueForCurrentKeyUsingSeek() {
  // FindValueForCurrentKey will enable pinning before calling
  // FindValueForCurrentKeyUsingSeek()
  assert(pinned_iters_mgr_.PinningEnabled());
  // Build the seek target: the newest entry of saved_key_ visible at
  // sequence_ (and within the timestamp bounds, if enabled).
  std::string last_key;
  if (0 == timestamp_size_) {
    AppendInternalKey(&last_key,
                      ParsedInternalKey(saved_key_.GetUserKey(), sequence_,
                                        kValueTypeForSeek));
  } else {
    AppendInternalKeyWithDifferentTimestamp(
        &last_key,
        ParsedInternalKey(saved_key_.GetUserKey(), sequence_,
                          kValueTypeForSeek),
        timestamp_lb_ == nullptr ? *timestamp_ub_ : *timestamp_lb_);
  }
  iter_.Seek(last_key);
  RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
  // In case a read_callback is present, the value we seek to may not be
  // visible. Find the next value that's visible.
  ParsedInternalKey ikey;
  while (true) {
    if (!iter_.Valid()) {
      valid_ = false;
      return iter_.status().ok();
    }
    if (!ParseKey(&ikey)) {
      return false;
    }
    assert(ikey.user_key.size() >= timestamp_size_);
    Slice ts;
    if (timestamp_size_ > 0) {
      ts = Slice(ikey.user_key.data() + ikey.user_key.size() - timestamp_size_,
                 timestamp_size_);
    }
    if (!user_comparator_.EqualWithoutTimestamp(ikey.user_key,
                                                saved_key_.GetUserKey())) {
      // No visible values for this key, even though FindValueForCurrentKey()
      // has seen some. This is possible if we're using a tailing iterator, and
      // the entries were discarded in a compaction.
      valid_ = false;
      return true;
    }
    if (IsVisible(ikey.sequence, ts)) {
      break;
    }
    iter_.Next();
  }
  // The newest visible entry is a deletion marker.
  if (ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion ||
      kTypeDeletionWithTimestamp == ikey.type) {
    if (timestamp_lb_ == nullptr) {
      valid_ = false;
    } else {
      // With a timestamp lower bound, deletion entries themselves are
      // surfaced to the caller.
      valid_ = true;
      saved_key_.SetInternalKey(ikey);
    }
    return true;
  }
  if (!PrepareValueInternal()) {
    return false;
  }
  if (timestamp_size_ > 0) {
    Slice ts = ExtractTimestampFromUserKey(ikey.user_key, timestamp_size_);
    saved_timestamp_.assign(ts.data(), ts.size());
  }
  // The newest visible entry is a base value: expose it directly.
  if (ikey.type == kTypeValue || ikey.type == kTypeValuePreferredSeqno ||
      ikey.type == kTypeBlobIndex || ikey.type == kTypeWideColumnEntity) {
    assert(iter_.iter()->IsValuePinned());
    saved_write_unix_time_ = iter_.write_unix_time();
    if (ikey.type == kTypeValuePreferredSeqno) {
      pinned_value_ = ParsePackedValueForValue(iter_.value());
    } else {
      pinned_value_ = iter_.value();
    }
    if (ikey.type == kTypeBlobIndex) {
      if (!SetValueAndColumnsFromBlob(ikey.user_key, pinned_value_)) {
        return false;
      }
    } else if (ikey.type == kTypeWideColumnEntity) {
      if (!SetValueAndColumnsFromEntity(pinned_value_)) {
        return false;
      }
    } else {
      assert(ikey.type == kTypeValue || ikey.type == kTypeValuePreferredSeqno);
      SetValueAndColumnsFromPlain(pinned_value_);
    }
    if (timestamp_lb_ != nullptr) {
      saved_key_.SetInternalKey(ikey);
    } else {
      saved_key_.SetUserKey(ikey.user_key);
    }
    valid_ = true;
    return true;
  }
  // kTypeMerge. We need to collect all kTypeMerge values and save them
  // in operands
  assert(ikey.type == kTypeMerge);
  current_entry_is_merged_ = true;
  merge_context_.Clear();
  merge_context_.PushOperand(
      iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */);
  PERF_COUNTER_ADD(internal_merge_count, 1);
  // Step over the remaining entries of this user key, collecting merge
  // operands until a base value, a deletion, or a different user key
  // terminates the merge.
  while (true) {
    iter_.Next();
    if (!iter_.Valid()) {
      if (!iter_.status().ok()) {
        valid_ = false;
        return false;
      }
      break;
    }
    if (!ParseKey(&ikey)) {
      return false;
    }
    if (!user_comparator_.EqualWithoutTimestamp(ikey.user_key,
                                                saved_key_.GetUserKey())) {
      break;
    }
    if (ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion ||
        ikey.type == kTypeDeletionWithTimestamp) {
      break;
    }
    if (!PrepareValueInternal()) {
      return false;
    }
    if (ikey.type == kTypeValue || ikey.type == kTypeValuePreferredSeqno) {
      Slice value = iter_.value();
      if (ikey.type == kTypeValuePreferredSeqno) {
        value = ParsePackedValueForValue(value);
      }
      if (!MergeWithPlainBaseValue(value, saved_key_.GetUserKey())) {
        return false;
      }
      return true;
    } else if (ikey.type == kTypeMerge) {
      merge_context_.PushOperand(
          iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */);
      PERF_COUNTER_ADD(internal_merge_count, 1);
    } else if (ikey.type == kTypeBlobIndex) {
      if (!MergeWithBlobBaseValue(iter_.value(), saved_key_.GetUserKey())) {
        return false;
      }
      return true;
    } else if (ikey.type == kTypeWideColumnEntity) {
      if (!MergeWithWideColumnBaseValue(iter_.value(),
                                        saved_key_.GetUserKey())) {
        return false;
      }
      return true;
    } else {
      valid_ = false;
      status_ = Status::Corruption(
          "Unknown value type: " +
          std::to_string(static_cast<unsigned int>(ikey.type)));
      return false;
    }
  }
  // Exhausted the key's entries (or hit a deletion) without a base value.
  if (!MergeWithNoBaseValue(saved_key_.GetUserKey())) {
    return false;
  }
  // Make sure we leave iter_ in a good state. If it's valid and we don't care
  // about prefixes, that's already good enough. Otherwise it needs to be
  // seeked to the current key.
  if (!expect_total_order_inner_iter() || !iter_.Valid()) {
    if (!expect_total_order_inner_iter()) {
      iter_.SeekForPrev(last_key);
    } else {
      iter_.Seek(last_key);
      if (!iter_.Valid() && iter_.status().ok()) {
        iter_.SeekToLast();
      }
    }
    RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
  }
  valid_ = true;
  return true;
}
  1222. bool DBIter::MergeWithNoBaseValue(const Slice& user_key) {
  1223. // `op_failure_scope` (an output parameter) is not provided (set to nullptr)
  1224. // since a failure must be propagated regardless of its value.
  1225. ValueType result_type;
  1226. const Status s = MergeHelper::TimedFullMerge(
  1227. merge_operator_, user_key, MergeHelper::kNoBaseValue,
  1228. merge_context_.GetOperands(), logger_, statistics_, clock_,
  1229. /* update_num_ops_stats */ true, /* op_failure_scope */ nullptr,
  1230. &saved_value_, &pinned_value_, &result_type);
  1231. return SetValueAndColumnsFromMergeResult(s, result_type);
  1232. }
  1233. bool DBIter::MergeWithPlainBaseValue(const Slice& value,
  1234. const Slice& user_key) {
  1235. // `op_failure_scope` (an output parameter) is not provided (set to nullptr)
  1236. // since a failure must be propagated regardless of its value.
  1237. ValueType result_type;
  1238. const Status s = MergeHelper::TimedFullMerge(
  1239. merge_operator_, user_key, MergeHelper::kPlainBaseValue, value,
  1240. merge_context_.GetOperands(), logger_, statistics_, clock_,
  1241. /* update_num_ops_stats */ true, /* op_failure_scope */ nullptr,
  1242. &saved_value_, &pinned_value_, &result_type);
  1243. return SetValueAndColumnsFromMergeResult(s, result_type);
  1244. }
  1245. bool DBIter::MergeWithBlobBaseValue(const Slice& blob_index,
  1246. const Slice& user_key) {
  1247. assert(!is_blob_);
  1248. if (expose_blob_index_) {
  1249. status_ =
  1250. Status::NotSupported("Legacy BlobDB does not support merge operator.");
  1251. valid_ = false;
  1252. return false;
  1253. }
  1254. const Status s = blob_reader_.RetrieveAndSetBlobValue(user_key, blob_index);
  1255. if (!s.ok()) {
  1256. status_ = s;
  1257. valid_ = false;
  1258. return false;
  1259. }
  1260. valid_ = true;
  1261. if (!MergeWithPlainBaseValue(blob_reader_.GetBlobValue(), user_key)) {
  1262. return false;
  1263. }
  1264. blob_reader_.ResetBlobValue();
  1265. return true;
  1266. }
  1267. bool DBIter::MergeWithWideColumnBaseValue(const Slice& entity,
  1268. const Slice& user_key) {
  1269. // `op_failure_scope` (an output parameter) is not provided (set to nullptr)
  1270. // since a failure must be propagated regardless of its value.
  1271. ValueType result_type;
  1272. const Status s = MergeHelper::TimedFullMerge(
  1273. merge_operator_, user_key, MergeHelper::kWideBaseValue, entity,
  1274. merge_context_.GetOperands(), logger_, statistics_, clock_,
  1275. /* update_num_ops_stats */ true, /* op_failure_scope */ nullptr,
  1276. &saved_value_, &pinned_value_, &result_type);
  1277. return SetValueAndColumnsFromMergeResult(s, result_type);
  1278. }
// Move iter_ backwards until it reaches a key smaller than saved_key_.
// Changes valid_ only if the return value is false.
bool DBIter::FindUserKeyBeforeSavedKey() {
  assert(status_.ok());
  size_t num_skipped = 0;
  while (iter_.Valid()) {
    ParsedInternalKey ikey;
    if (!ParseKey(&ikey)) {
      return false;
    }
    // Done as soon as we are strictly before saved_key_.
    if (CompareKeyForSkip(ikey.user_key, saved_key_.GetUserKey()) < 0) {
      return true;
    }
    if (TooManyInternalKeysSkipped()) {
      return false;
    }
    assert(ikey.sequence != kMaxSequenceNumber);
    assert(ikey.user_key.size() >= timestamp_size_);
    Slice ts;
    if (timestamp_size_ > 0) {
      ts = Slice(ikey.user_key.data() + ikey.user_key.size() - timestamp_size_,
                 timestamp_size_);
    }
    if (!IsVisible(ikey.sequence, ts)) {
      PERF_COUNTER_ADD(internal_recent_skipped_count, 1);
    } else {
      PERF_COUNTER_ADD(internal_key_skipped_count, 1);
    }
    if (num_skipped >= max_skip_) {
      // Too many Prev() calls under the same user key: reseek to the newest
      // possible entry of saved_key_ and step before it instead.
      num_skipped = 0;
      std::string last_key;
      if (timestamp_size_ == 0) {
        AppendInternalKey(&last_key, ParsedInternalKey(saved_key_.GetUserKey(),
                                                       kMaxSequenceNumber,
                                                       kValueTypeForSeek));
      } else {
        // TODO: pre-create kTsMax.
        const std::string kTsMax(timestamp_size_, '\xff');
        AppendInternalKeyWithDifferentTimestamp(
            &last_key,
            ParsedInternalKey(saved_key_.GetUserKey(), kMaxSequenceNumber,
                              kValueTypeForSeek),
            kTsMax);
      }
      // It would be more efficient to use SeekForPrev() here, but some
      // iterators may not support it.
      iter_.Seek(last_key);
      RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
      if (!iter_.Valid()) {
        break;
      }
    } else {
      ++num_skipped;
    }
    iter_.Prev();
  }
  if (!iter_.status().ok()) {
    valid_ = false;
    return false;
  }
  return true;
}
  1341. bool DBIter::TooManyInternalKeysSkipped(bool increment) {
  1342. if ((max_skippable_internal_keys_ > 0) &&
  1343. (num_internal_keys_skipped_ > max_skippable_internal_keys_)) {
  1344. valid_ = false;
  1345. status_ = Status::Incomplete("Too many internal keys skipped.");
  1346. return true;
  1347. } else if (increment) {
  1348. num_internal_keys_skipped_++;
  1349. }
  1350. return false;
  1351. }
  1352. bool DBIter::IsVisible(SequenceNumber sequence, const Slice& ts,
  1353. bool* more_recent) {
  1354. // Remember that comparator orders preceding timestamp as larger.
  1355. // TODO(yanqin): support timestamp in read_callback_.
  1356. bool visible_by_seq = (read_callback_ == nullptr)
  1357. ? sequence <= sequence_
  1358. : read_callback_->IsVisible(sequence);
  1359. bool visible_by_ts =
  1360. (timestamp_ub_ == nullptr ||
  1361. user_comparator_.CompareTimestamp(ts, *timestamp_ub_) <= 0) &&
  1362. (timestamp_lb_ == nullptr ||
  1363. user_comparator_.CompareTimestamp(ts, *timestamp_lb_) >= 0);
  1364. if (more_recent) {
  1365. *more_recent = !visible_by_seq;
  1366. }
  1367. return visible_by_seq && visible_by_ts;
  1368. }
  1369. void DBIter::SetSavedKeyToSeekTarget(const Slice& target) {
  1370. is_key_seqnum_zero_ = false;
  1371. SequenceNumber seq = sequence_;
  1372. saved_key_.Clear();
  1373. saved_key_.SetInternalKey(target, seq, kValueTypeForSeek, timestamp_ub_);
  1374. if (iterate_lower_bound_ != nullptr &&
  1375. user_comparator_.CompareWithoutTimestamp(
  1376. saved_key_.GetUserKey(), /*a_has_ts=*/true, *iterate_lower_bound_,
  1377. /*b_has_ts=*/false) < 0) {
  1378. // Seek key is smaller than the lower bound.
  1379. saved_key_.Clear();
  1380. saved_key_.SetInternalKey(*iterate_lower_bound_, seq, kValueTypeForSeek,
  1381. timestamp_ub_);
  1382. }
  1383. }
  1384. void DBIter::SetSavedKeyToSeekForPrevTarget(const Slice& target) {
  1385. is_key_seqnum_zero_ = false;
  1386. saved_key_.Clear();
  1387. // now saved_key is used to store internal key.
  1388. saved_key_.SetInternalKey(target, 0 /* sequence_number */,
  1389. kValueTypeForSeekForPrev, timestamp_ub_);
  1390. if (timestamp_size_ > 0) {
  1391. const std::string kTsMin(timestamp_size_, '\0');
  1392. Slice ts = kTsMin;
  1393. saved_key_.UpdateInternalKey(
  1394. /*seq=*/0, kValueTypeForSeekForPrev,
  1395. timestamp_lb_ == nullptr ? &ts : timestamp_lb_);
  1396. }
  1397. if (iterate_upper_bound_ != nullptr &&
  1398. user_comparator_.CompareWithoutTimestamp(
  1399. saved_key_.GetUserKey(), /*a_has_ts=*/true, *iterate_upper_bound_,
  1400. /*b_has_ts=*/false) >= 0) {
  1401. saved_key_.Clear();
  1402. saved_key_.SetInternalKey(*iterate_upper_bound_, kMaxSequenceNumber,
  1403. kValueTypeForSeekForPrev, timestamp_ub_);
  1404. if (timestamp_size_ > 0) {
  1405. const std::string kTsMax(timestamp_size_, '\xff');
  1406. Slice ts = kTsMax;
  1407. saved_key_.UpdateInternalKey(kMaxSequenceNumber, kValueTypeForSeekForPrev,
  1408. &ts);
  1409. }
  1410. }
  1411. }
  1412. Status DBIter::ValidateScanOptions(const MultiScanArgs& multiscan_opts) const {
  1413. if (multiscan_opts.empty()) {
  1414. return Status::InvalidArgument("Empty MultiScanArgs");
  1415. }
  1416. const std::vector<ScanOptions>& scan_opts = multiscan_opts.GetScanRanges();
  1417. const bool has_limit = scan_opts.front().range.limit.has_value();
  1418. if (!has_limit && scan_opts.size() > 1) {
  1419. return Status::InvalidArgument("Scan has no upper bound");
  1420. }
  1421. for (size_t i = 0; i < scan_opts.size(); ++i) {
  1422. const auto& scan_range = scan_opts[i].range;
  1423. if (!scan_range.start.has_value()) {
  1424. return Status::InvalidArgument("Scan has no start key at index " +
  1425. std::to_string(i));
  1426. }
  1427. if (scan_range.limit.has_value()) {
  1428. if (user_comparator_.CompareWithoutTimestamp(
  1429. scan_range.start.value(), /*a_has_ts=*/false,
  1430. scan_range.limit.value(), /*b_has_ts=*/false) >= 0) {
  1431. return Status::InvalidArgument(
  1432. "Scan start key is large or equal than limit at index " +
  1433. std::to_string(i));
  1434. }
  1435. }
  1436. if (i > 0) {
  1437. if (!scan_range.limit.has_value()) {
  1438. // multiple scan without limit scan ranges
  1439. return Status::InvalidArgument("Scan has no upper bound at index " +
  1440. std::to_string(i));
  1441. }
  1442. const auto& last_end_key = scan_opts[i - 1].range.limit.value();
  1443. if (user_comparator_.CompareWithoutTimestamp(
  1444. scan_range.start.value(), /*a_has_ts=*/false, last_end_key,
  1445. /*b_has_ts=*/false) < 0) {
  1446. return Status::InvalidArgument("Overlapping ranges at index " +
  1447. std::to_string(i));
  1448. }
  1449. }
  1450. }
  1451. return Status::OK();
  1452. }
  1453. void DBIter::Prepare(const MultiScanArgs& scan_opts) {
  1454. status_ = ValidateScanOptions(scan_opts);
  1455. if (!status_.ok()) {
  1456. return;
  1457. }
  1458. std::optional<MultiScanArgs> new_scan_opts;
  1459. new_scan_opts.emplace(scan_opts);
  1460. scan_opts_.swap(new_scan_opts);
  1461. scan_index_ = 0;
  1462. if (!scan_opts.empty()) {
  1463. iter_.Prepare(&scan_opts_.value());
  1464. } else {
  1465. iter_.Prepare(nullptr);
  1466. }
  1467. }
// Positions the iterator at the first entry visible to the user whose key is
// at or past `target`, scanning forward. When the iterator was Prepare()d
// with multi-scan ranges, the seek target must equal the start key of the
// next prepared range and the configured upper bound must match that range's
// limit (when present); otherwise status_ is set to InvalidArgument and the
// iterator is invalidated.
void DBIter::Seek(const Slice& target) {
  PERF_COUNTER_ADD(iter_seek_count, 1);
  PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, clock_);
  StopWatch sw(clock_, statistics_, DB_SEEK);
  if (scan_opts_.has_value()) {
    // Validate the seek target is as expected in the previously prepared range
    auto const& scan_ranges = scan_opts_.value().GetScanRanges();
    if (scan_index_ >= scan_ranges.size()) {
      status_ = Status::InvalidArgument(
          "Seek called after exhausting all of the scan ranges");
      valid_ = false;
      return;
    }
    // Validate start key of next prepare range matches the seek target
    auto const& range = scan_ranges[scan_index_];
    auto const& start = range.range.start;
    // ValidateScanOptions() guarantees every prepared range has a start key.
    assert(start.has_value());
    if (user_comparator_.CompareWithoutTimestamp(target, *start) != 0) {
      status_ = Status::InvalidArgument(
          "Seek target does not match the start of the next prepared range at "
          "index " +
          std::to_string(scan_index_));
      valid_ = false;
      return;
    }
    // validate the upper bound is set to the same value of limit, if limit
    // exists
    auto const& limit = range.range.limit;
    if (limit.has_value()) {
      if (iterate_upper_bound_ == nullptr ||
          user_comparator_.CompareWithoutTimestamp(
              limit.value(), *iterate_upper_bound_) != 0) {
        status_ = Status::InvalidArgument(
            "Upper bound is not set to the same limit value of the next "
            "prepared range at index " +
            std::to_string(scan_index_));
        valid_ = false;
        return;
      }
    }
    // Advance to the next prepared range for the subsequent Seek() call.
    scan_index_++;
  }
  if (cfh_ != nullptr) {
    // Record the seek in the trace stream; empty slices stand in for unset
    // bounds.
    // TODO: What do we do if this returns an error?
    Slice lower_bound, upper_bound;
    if (iterate_lower_bound_ != nullptr) {
      lower_bound = *iterate_lower_bound_;
    } else {
      lower_bound = Slice("");
    }
    if (iterate_upper_bound_ != nullptr) {
      upper_bound = *iterate_upper_bound_;
    } else {
      upper_bound = Slice("");
    }
    cfh_->db()
        ->TraceIteratorSeek(cfh_->cfd()->GetID(), target, lower_bound,
                            upper_bound)
        .PermitUncheckedError();
  }
  // Reset per-seek state before repositioning the inner iterator.
  status_ = Status::OK();
  ReleaseTempPinnedData();
  ResetBlobData();
  ResetValueAndColumns();
  ResetInternalKeysSkippedCounter();
  MarkMemtableForFlushForAvgTrigger();
  // Seek the inner iterator based on the target key.
  {
    PERF_TIMER_GUARD(seek_internal_seek_time);
    SetSavedKeyToSeekTarget(target);
    iter_.Seek(saved_key_.GetInternalKey());
    RecordTick(statistics_, NUMBER_DB_SEEK);
  }
  if (!iter_.Valid()) {
    valid_ = false;
    return;
  }
  direction_ = kForward;
  // Now the inner iterator is placed to the target position. From there,
  // we need to find out the next key that is visible to the user.
  ClearSavedValue();
  if (prefix_same_as_start_) {
    // The case where the iterator needs to be invalidated if it has exhausted
    // keys within the same prefix of the seek key.
    assert(prefix_extractor_ != nullptr);
    Slice target_prefix = prefix_extractor_->Transform(target);
    FindNextUserEntry(false /* not skipping saved_key */,
                      &target_prefix /* prefix */);
    if (valid_) {
      // Remember the prefix of the seek key for the future Next() call to
      // check.
      prefix_.SetUserKey(target_prefix);
    }
  } else {
    FindNextUserEntry(false /* not skipping saved_key */, nullptr);
  }
  if (!valid_) {
    return;
  }
  // Updating stats and perf context counters.
  if (statistics_ != nullptr) {
    // Decrement since we don't want to count this key as skipped
    RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
    RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
  }
  PERF_COUNTER_ADD(iter_read_bytes, key().size() + value().size());
}
// Positions the iterator at the last entry visible to the user whose key is
// at or before `target`, scanning backward. Unlike Seek(), no multi-scan
// range validation is performed here — prepared scans are forward-only in
// this code path.
void DBIter::SeekForPrev(const Slice& target) {
  PERF_COUNTER_ADD(iter_seek_count, 1);
  PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, clock_);
  StopWatch sw(clock_, statistics_, DB_SEEK);
  if (cfh_ != nullptr) {
    // Record the seek in the trace stream; empty slices stand in for unset
    // bounds.
    // TODO: What do we do if this returns an error?
    Slice lower_bound, upper_bound;
    if (iterate_lower_bound_ != nullptr) {
      lower_bound = *iterate_lower_bound_;
    } else {
      lower_bound = Slice("");
    }
    if (iterate_upper_bound_ != nullptr) {
      upper_bound = *iterate_upper_bound_;
    } else {
      upper_bound = Slice("");
    }
    cfh_->db()
        ->TraceIteratorSeekForPrev(cfh_->cfd()->GetID(), target, lower_bound,
                                   upper_bound)
        .PermitUncheckedError();
  }
  // Reset per-seek state before repositioning the inner iterator.
  status_ = Status::OK();
  ReleaseTempPinnedData();
  ResetBlobData();
  ResetValueAndColumns();
  ResetInternalKeysSkippedCounter();
  MarkMemtableForFlushForAvgTrigger();
  // Seek the inner iterator based on the target key.
  {
    PERF_TIMER_GUARD(seek_internal_seek_time);
    SetSavedKeyToSeekForPrevTarget(target);
    iter_.SeekForPrev(saved_key_.GetInternalKey());
    RecordTick(statistics_, NUMBER_DB_SEEK);
  }
  if (!iter_.Valid()) {
    valid_ = false;
    return;
  }
  direction_ = kReverse;
  // Now the inner iterator is placed to the target position. From there,
  // we need to find out the first key that is visible to the user in the
  // backward direction.
  ClearSavedValue();
  if (prefix_same_as_start_) {
    // The case where the iterator needs to be invalidated if it has exhausted
    // keys within the same prefix of the seek key.
    assert(prefix_extractor_ != nullptr);
    Slice target_prefix = prefix_extractor_->Transform(target);
    PrevInternal(&target_prefix);
    if (valid_) {
      // Remember the prefix of the seek key for the future Prev() call to
      // check.
      prefix_.SetUserKey(target_prefix);
    }
  } else {
    PrevInternal(nullptr);
  }
  // Report stats and perf context.
  if (statistics_ != nullptr && valid_) {
    RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
    RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
    PERF_COUNTER_ADD(iter_read_bytes, key().size() + value().size());
  }
}
// Positions the iterator at the first entry visible to the user. When a
// lower bound is configured, this is equivalent to Seek(lower_bound), which
// also handles tracing and multi-scan validation.
void DBIter::SeekToFirst() {
  if (iterate_lower_bound_ != nullptr) {
    Seek(*iterate_lower_bound_);
    return;
  }
  PERF_COUNTER_ADD(iter_seek_count, 1);
  PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, clock_);
  // Don't use iter_::Seek() if we set a prefix extractor
  // because prefix seek will be used.
  if (!expect_total_order_inner_iter()) {
    max_skip_ = std::numeric_limits<uint64_t>::max();
  }
  status_ = Status::OK();
  // if iterator is empty, this status_ could be unchecked.
  status_.PermitUncheckedError();
  direction_ = kForward;
  // Reset per-seek state before repositioning the inner iterator.
  ReleaseTempPinnedData();
  ResetBlobData();
  ResetValueAndColumns();
  ResetInternalKeysSkippedCounter();
  MarkMemtableForFlushForAvgTrigger();
  ClearSavedValue();
  is_key_seqnum_zero_ = false;
  {
    PERF_TIMER_GUARD(seek_internal_seek_time);
    iter_.SeekToFirst();
  }
  RecordTick(statistics_, NUMBER_DB_SEEK);
  if (iter_.Valid()) {
    // Copy the key unless the inner iterator pins it for this iterator's
    // lifetime.
    saved_key_.SetUserKey(
        ExtractUserKey(iter_.key()),
        !iter_.iter()->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
    FindNextUserEntry(false /* not skipping saved_key */,
                      nullptr /* no prefix check */);
    if (statistics_ != nullptr) {
      if (valid_) {
        RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
        RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
        PERF_COUNTER_ADD(iter_read_bytes, key().size() + value().size());
      }
    }
  } else {
    valid_ = false;
  }
  if (valid_ && prefix_same_as_start_) {
    // Remember the prefix of the first key so later Next() calls can detect
    // when the iteration leaves it.
    assert(prefix_extractor_ != nullptr);
    prefix_.SetUserKey(prefix_extractor_->Transform(
        StripTimestampFromUserKey(saved_key_.GetUserKey(), timestamp_size_)));
  }
}
// Positions the iterator at the last entry visible to the user. When an
// upper bound is configured, this delegates to SeekForPrev(upper_bound),
// which lands strictly below the (exclusive) bound.
void DBIter::SeekToLast() {
  if (iterate_upper_bound_ != nullptr) {
    // Seek to last key strictly less than ReadOptions.iterate_upper_bound.
    SeekForPrev(*iterate_upper_bound_);
#ifndef NDEBUG
    // Debug-only invariant: the resulting key (with internal suffix and
    // timestamp stripped when applicable) must be below the upper bound.
    Slice k = Valid() ? key() : Slice();
    if (Valid() && timestamp_size_ > 0 && timestamp_lb_) {
      k.remove_suffix(kNumInternalBytes + timestamp_size_);
    }
    assert(!Valid() || user_comparator_.CompareWithoutTimestamp(
                           k, /*a_has_ts=*/false, *iterate_upper_bound_,
                           /*b_has_ts=*/false) < 0);
#endif
    return;
  }
  PERF_COUNTER_ADD(iter_seek_count, 1);
  PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, clock_);
  // Don't use iter_::Seek() if we set a prefix extractor
  // because prefix seek will be used.
  if (!expect_total_order_inner_iter()) {
    max_skip_ = std::numeric_limits<uint64_t>::max();
  }
  status_ = Status::OK();
  // if iterator is empty, this status_ could be unchecked.
  status_.PermitUncheckedError();
  direction_ = kReverse;
  // Reset per-seek state before repositioning the inner iterator.
  ReleaseTempPinnedData();
  ResetBlobData();
  ResetValueAndColumns();
  ResetInternalKeysSkippedCounter();
  MarkMemtableForFlushForAvgTrigger();
  ClearSavedValue();
  is_key_seqnum_zero_ = false;
  {
    PERF_TIMER_GUARD(seek_internal_seek_time);
    iter_.SeekToLast();
  }
  // Walk backward to the first user-visible entry.
  PrevInternal(nullptr);
  if (statistics_ != nullptr) {
    RecordTick(statistics_, NUMBER_DB_SEEK);
    if (valid_) {
      RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
      RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
      PERF_COUNTER_ADD(iter_read_bytes, key().size() + value().size());
    }
  }
  if (valid_ && prefix_same_as_start_) {
    // Remember the prefix of the last key so later Prev() calls can detect
    // when the iteration leaves it.
    assert(prefix_extractor_ != nullptr);
    prefix_.SetUserKey(prefix_extractor_->Transform(
        StripTimestampFromUserKey(saved_key_.GetUserKey(), timestamp_size_)));
  }
}
  1742. } // namespace ROCKSDB_NAMESPACE