db_iter.cc 45 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  9. #include "db/db_iter.h"
  10. #include <string>
  11. #include <iostream>
  12. #include <limits>
  13. #include "db/dbformat.h"
  14. #include "db/merge_context.h"
  15. #include "db/merge_helper.h"
  16. #include "db/pinned_iterators_manager.h"
  17. #include "file/filename.h"
  18. #include "logging/logging.h"
  19. #include "memory/arena.h"
  20. #include "monitoring/perf_context_imp.h"
  21. #include "rocksdb/env.h"
  22. #include "rocksdb/iterator.h"
  23. #include "rocksdb/merge_operator.h"
  24. #include "rocksdb/options.h"
  25. #include "table/internal_iterator.h"
  26. #include "table/iterator_wrapper.h"
  27. #include "trace_replay/trace_replay.h"
  28. #include "util/mutexlock.h"
  29. #include "util/string_util.h"
  30. #include "util/user_comparator_wrapper.h"
  31. namespace ROCKSDB_NAMESPACE {
// Constructs a DBIter that wraps the internal iterator `iter` and exposes
// only the user-visible view of the data at snapshot sequence number `s`.
// Iterate bounds, prefix mode and pinning behavior come from `read_options`;
// comparator, merge operator and logging come from the CF options.
// NOTE(review): when `arena_mode` is true the object is presumably
// placement-allocated in an arena and must not be destroyed with plain
// delete — TODO confirm against the destruction path in db_iter.h.
DBIter::DBIter(Env* _env, const ReadOptions& read_options,
               const ImmutableCFOptions& cf_options,
               const MutableCFOptions& mutable_cf_options,
               const Comparator* cmp, InternalIterator* iter, SequenceNumber s,
               bool arena_mode, uint64_t max_sequential_skip_in_iterations,
               ReadCallback* read_callback, DBImpl* db_impl,
               ColumnFamilyData* cfd, bool allow_blob)
    : prefix_extractor_(mutable_cf_options.prefix_extractor.get()),
      env_(_env),
      logger_(cf_options.info_log),
      user_comparator_(cmp),
      merge_operator_(cf_options.merge_operator),
      iter_(iter),
      read_callback_(read_callback),
      sequence_(s),
      statistics_(cf_options.statistics),
      num_internal_keys_skipped_(0),
      iterate_lower_bound_(read_options.iterate_lower_bound),
      iterate_upper_bound_(read_options.iterate_upper_bound),
      direction_(kForward),
      valid_(false),
      current_entry_is_merged_(false),
      is_key_seqnum_zero_(false),
      // prefix_same_as_start is only honored when a prefix extractor exists.
      prefix_same_as_start_(mutable_cf_options.prefix_extractor
                                ? read_options.prefix_same_as_start
                                : false),
      pin_thru_lifetime_(read_options.pin_data),
      // True when the inner iterator is expected to iterate in total order
      // (no prefix extractor, or total-order/auto-prefix seek requested).
      expect_total_order_inner_iter_(prefix_extractor_ == nullptr ||
                                     read_options.total_order_seek ||
                                     read_options.auto_prefix_mode),
      allow_blob_(allow_blob),
      is_blob_(false),
      arena_mode_(arena_mode),
      range_del_agg_(&cf_options.internal_comparator, s),
      db_impl_(db_impl),
      cfd_(cfd),
      start_seqnum_(read_options.iter_start_seqnum) {
  RecordTick(statistics_, NO_ITERATOR_CREATED);
  max_skip_ = max_sequential_skip_in_iterations;
  max_skippable_internal_keys_ = read_options.max_skippable_internal_keys;
  // Pin data blocks for the whole iterator lifetime if requested, and hand
  // the pinning manager to the inner iterator (if one was supplied).
  if (pin_thru_lifetime_) {
    pinned_iters_mgr_.StartPinning();
  }
  if (iter_.iter()) {
    iter_.iter()->SetPinnedItersMgr(&pinned_iters_mgr_);
  }
}
  79. Status DBIter::GetProperty(std::string prop_name, std::string* prop) {
  80. if (prop == nullptr) {
  81. return Status::InvalidArgument("prop is nullptr");
  82. }
  83. if (prop_name == "rocksdb.iterator.super-version-number") {
  84. // First try to pass the value returned from inner iterator.
  85. return iter_.iter()->GetProperty(prop_name, prop);
  86. } else if (prop_name == "rocksdb.iterator.is-key-pinned") {
  87. if (valid_) {
  88. *prop = (pin_thru_lifetime_ && saved_key_.IsKeyPinned()) ? "1" : "0";
  89. } else {
  90. *prop = "Iterator is not valid.";
  91. }
  92. return Status::OK();
  93. } else if (prop_name == "rocksdb.iterator.internal-key") {
  94. *prop = saved_key_.GetUserKey().ToString();
  95. return Status::OK();
  96. }
  97. return Status::InvalidArgument("Unidentified property.");
  98. }
  99. bool DBIter::ParseKey(ParsedInternalKey* ikey) {
  100. if (!ParseInternalKey(iter_.key(), ikey)) {
  101. status_ = Status::Corruption("corrupted internal key in DBIter");
  102. valid_ = false;
  103. ROCKS_LOG_ERROR(logger_, "corrupted internal key in DBIter: %s",
  104. iter_.key().ToString(true).c_str());
  105. return false;
  106. } else {
  107. return true;
  108. }
  109. }
// Advances the iterator to the next user-visible entry. Requires the
// iterator to be valid and in a non-error state on entry.
void DBIter::Next() {
  assert(valid_);
  assert(status_.ok());
  PERF_CPU_TIMER_GUARD(iter_next_cpu_nanos, env_);
  // Release temporarily pinned blocks from last operation
  ReleaseTempPinnedData();
  // Fold the keys skipped while producing the previous entry into the
  // aggregated local statistics. NOTE(review): the unconditional decrement
  // relies on valid_ being true here (asserted above); presumably it
  // compensates for the currently-yielded entry having been counted —
  // TODO confirm against ResetInternalKeysSkippedCounter() in db_iter.h.
  local_stats_.skip_count_ += num_internal_keys_skipped_;
  local_stats_.skip_count_--;
  num_internal_keys_skipped_ = 0;
  bool ok = true;
  if (direction_ == kReverse) {
    // The inner iterator is positioned before the current key after a
    // backward scan; flip it forward (may reseek) before advancing.
    is_key_seqnum_zero_ = false;
    if (!ReverseToForward()) {
      ok = false;
    }
  } else if (!current_entry_is_merged_) {
    // If the current value is not a merge, the iter position is the
    // current key, which is already returned. We can safely issue a
    // Next() without checking the current key.
    // If the current key is a merge, very likely iter already points
    // to the next internal position.
    assert(iter_.Valid());
    iter_.Next();
    PERF_COUNTER_ADD(internal_key_skipped_count, 1);
  }
  local_stats_.next_count_++;
  if (ok && iter_.Valid()) {
    Slice prefix;
    if (prefix_same_as_start_) {
      assert(prefix_extractor_ != nullptr);
      prefix = prefix_.GetUserKey();
    }
    FindNextUserEntry(true /* skipping the current user key */,
                      prefix_same_as_start_ ? &prefix : nullptr);
  } else {
    is_key_seqnum_zero_ = false;
    valid_ = false;
  }
  if (statistics_ != nullptr && valid_) {
    local_stats_.next_found_count_++;
    local_stats_.bytes_read_ += (key().size() + value().size());
  }
}
// PRE: saved_key_ has the current user key if skipping_saved_key
// POST: saved_key_ should have the next user key if valid_,
//       if the current entry is a result of merge
//           current_entry_is_merged_ => true
//           saved_value_             => the merged value
//
// NOTE: In between, saved_key_ can point to a user key that has
//       a delete marker or a sequence number higher than sequence_
//       saved_key_ MUST have a proper user_key before calling this function
//
// The prefix parameter, if not null, indicates that we need to iterate
// within the prefix, and the iterator needs to be made invalid, if no
// more entry for the prefix can be found.
//
// Thin timing wrapper: all of the actual scanning logic lives in
// FindNextUserEntryInternal(); this only accounts the elapsed time.
bool DBIter::FindNextUserEntry(bool skipping_saved_key, const Slice* prefix) {
  PERF_TIMER_GUARD(find_next_user_entry_time);
  return FindNextUserEntryInternal(skipping_saved_key, prefix);
}
// Actual implementation of DBIter::FindNextUserEntry().
// Scans forward over internal keys until it finds the next entry that
// should be surfaced to the user (or exhausts the iterator / hits a
// bound / leaves the prefix). Returns false on error (status_ set) or
// when too many internal keys were skipped; on success valid_ reflects
// whether an entry was found.
bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key,
                                       const Slice* prefix) {
  // Loop until we hit an acceptable entry to yield
  assert(iter_.Valid());
  assert(status_.ok());
  assert(direction_ == kForward);
  current_entry_is_merged_ = false;

  // How many times in a row we have skipped an entry with user key less than
  // or equal to saved_key_. We could skip these entries either because
  // sequence numbers were too high or because skipping_saved_key = true.
  // What saved_key_ contains throughout this method:
  //  - if skipping_saved_key : saved_key_ contains the key that we need
  //                            to skip, and we haven't seen any keys greater
  //                            than that,
  //  - if num_skipped > 0    : saved_key_ contains the key that we have
  //                            skipped num_skipped times, and we haven't seen
  //                            any keys greater than that,
  //  - none of the above     : saved_key_ can contain anything, it doesn't
  //                            matter.
  uint64_t num_skipped = 0;
  // For write unprepared, the target sequence number in reseek could be larger
  // than the snapshot, and thus needs to be skipped again. This could result in
  // an infinite loop of reseeks. To avoid that, we limit the number of reseeks
  // to one.
  bool reseek_done = false;

  is_blob_ = false;

  do {
    // Will update is_key_seqnum_zero_ as soon as we parsed the current key
    // but we need to save the previous value to be used in the loop.
    bool is_prev_key_seqnum_zero = is_key_seqnum_zero_;
    if (!ParseKey(&ikey_)) {
      is_key_seqnum_zero_ = false;
      return false;
    }
    is_key_seqnum_zero_ = (ikey_.sequence == 0);

    // If the inner iterator can guarantee the key is within the upper bound,
    // the comparison below is skipped; the assert cross-checks that claim.
    assert(iterate_upper_bound_ == nullptr || iter_.MayBeOutOfUpperBound() ||
           user_comparator_.Compare(ikey_.user_key, *iterate_upper_bound_) < 0);
    if (iterate_upper_bound_ != nullptr && iter_.MayBeOutOfUpperBound() &&
        user_comparator_.Compare(ikey_.user_key, *iterate_upper_bound_) >= 0) {
      break;
    }

    // Stop as soon as the key leaves the requested prefix.
    assert(prefix == nullptr || prefix_extractor_ != nullptr);
    if (prefix != nullptr &&
        prefix_extractor_->Transform(ikey_.user_key).compare(*prefix) != 0) {
      assert(prefix_same_as_start_);
      break;
    }

    if (TooManyInternalKeysSkipped()) {
      return false;
    }

    if (IsVisible(ikey_.sequence)) {
      // If the previous entry is of seqnum 0, the current entry will not
      // possibly be skipped. This condition can potentially be relaxed to
      // prev_key.seq <= ikey_.sequence. We are cautious because it will be more
      // prone to bugs causing the same user key with the same sequence number.
      if (!is_prev_key_seqnum_zero && skipping_saved_key &&
          user_comparator_.Compare(ikey_.user_key, saved_key_.GetUserKey()) <=
              0) {
        num_skipped++;  // skip this entry
        PERF_COUNTER_ADD(internal_key_skipped_count, 1);
      } else {
        assert(!skipping_saved_key ||
               user_comparator_.Compare(ikey_.user_key,
                                        saved_key_.GetUserKey()) > 0);
        num_skipped = 0;
        reseek_done = false;
        switch (ikey_.type) {
          case kTypeDeletion:
          case kTypeSingleDeletion:
            // Arrange to skip all upcoming entries for this key since
            // they are hidden by this deletion.
            // if iterartor specified start_seqnum we
            // 1) return internal key, including the type
            // 2) return ikey only if ikey.seqnum >= start_seqnum_
            // note that if deletion seqnum is < start_seqnum_ we
            // just skip it like in normal iterator.
            if (start_seqnum_ > 0 && ikey_.sequence >= start_seqnum_) {
              saved_key_.SetInternalKey(ikey_);
              valid_ = true;
              return true;
            } else {
              saved_key_.SetUserKey(
                  ikey_.user_key, !pin_thru_lifetime_ ||
                                      !iter_.iter()->IsKeyPinned() /* copy */);
              skipping_saved_key = true;
              PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
            }
            break;
          case kTypeValue:
          case kTypeBlobIndex:
            if (start_seqnum_ > 0) {
              // we are taking incremental snapshot here
              // incremental snapshots aren't supported on DB with range deletes
              assert(ikey_.type != kTypeBlobIndex);
              if (ikey_.sequence >= start_seqnum_) {
                saved_key_.SetInternalKey(ikey_);
                valid_ = true;
                return true;
              } else {
                // this key and all previous versions shouldn't be included,
                // skipping_saved_key
                saved_key_.SetUserKey(
                    ikey_.user_key,
                    !pin_thru_lifetime_ ||
                        !iter_.iter()->IsKeyPinned() /* copy */);
                skipping_saved_key = true;
              }
            } else {
              saved_key_.SetUserKey(
                  ikey_.user_key, !pin_thru_lifetime_ ||
                                      !iter_.iter()->IsKeyPinned() /* copy */);
              if (range_del_agg_.ShouldDelete(
                      ikey_, RangeDelPositioningMode::kForwardTraversal)) {
                // Arrange to skip all upcoming entries for this key since
                // they are hidden by this deletion.
                skipping_saved_key = true;
                num_skipped = 0;
                reseek_done = false;
                PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
              } else if (ikey_.type == kTypeBlobIndex) {
                if (!allow_blob_) {
                  ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
                  status_ = Status::NotSupported(
                      "Encounter unexpected blob index. Please open DB with "
                      "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
                  valid_ = false;
                  return false;
                }
                is_blob_ = true;
                valid_ = true;
                return true;
              } else {
                valid_ = true;
                return true;
              }
            }
            break;
          case kTypeMerge:
            saved_key_.SetUserKey(
                ikey_.user_key,
                !pin_thru_lifetime_ || !iter_.iter()->IsKeyPinned() /* copy */);
            if (range_del_agg_.ShouldDelete(
                    ikey_, RangeDelPositioningMode::kForwardTraversal)) {
              // Arrange to skip all upcoming entries for this key since
              // they are hidden by this deletion.
              skipping_saved_key = true;
              num_skipped = 0;
              reseek_done = false;
              PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
            } else {
              // By now, we are sure the current ikey is going to yield a
              // value
              current_entry_is_merged_ = true;
              valid_ = true;
              return MergeValuesNewToOld();  // Go to a different state machine
            }
            break;
          default:
            assert(false);
            break;
        }
      }
    } else {
      PERF_COUNTER_ADD(internal_recent_skipped_count, 1);

      // This key was inserted after our snapshot was taken.
      // If this happens too many times in a row for the same user key, we want
      // to seek to the target sequence number.
      int cmp =
          user_comparator_.Compare(ikey_.user_key, saved_key_.GetUserKey());
      if (cmp == 0 || (skipping_saved_key && cmp < 0)) {
        num_skipped++;
      } else {
        saved_key_.SetUserKey(
            ikey_.user_key,
            !iter_.iter()->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
        skipping_saved_key = false;
        num_skipped = 0;
        reseek_done = false;
      }
    }

    // If we have sequentially iterated via numerous equal keys, then it's
    // better to seek so that we can avoid too many key comparisons.
    //
    // To avoid infinite loops, do not reseek if we have already attempted to
    // reseek previously.
    //
    // TODO(lth): If we reseek to sequence number greater than ikey_.sequence,
    // then it does not make sense to reseek as we would actually land further
    // away from the desired key. There is opportunity for optimization here.
    if (num_skipped > max_skip_ && !reseek_done) {
      is_key_seqnum_zero_ = false;
      num_skipped = 0;
      reseek_done = true;
      std::string last_key;
      if (skipping_saved_key) {
        // We're looking for the next user-key but all we see are the same
        // user-key with decreasing sequence numbers. Fast forward to
        // sequence number 0 and type deletion (the smallest type).
        AppendInternalKey(&last_key, ParsedInternalKey(saved_key_.GetUserKey(),
                                                       0, kTypeDeletion));
        // Don't set skipping_saved_key = false because we may still see more
        // user-keys equal to saved_key_.
      } else {
        // We saw multiple entries with this user key and sequence numbers
        // higher than sequence_. Fast forward to sequence_.
        // Note that this only covers a case when a higher key was overwritten
        // many times since our snapshot was taken, not the case when a lot of
        // different keys were inserted after our snapshot was taken.
        AppendInternalKey(&last_key,
                          ParsedInternalKey(saved_key_.GetUserKey(), sequence_,
                                            kValueTypeForSeek));
      }
      iter_.Seek(last_key);
      RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
    } else {
      iter_.Next();
    }
  } while (iter_.Valid());

  // Ran off the end of the inner iterator (or broke out at a bound/prefix).
  valid_ = false;
  return iter_.status().ok();
}
// Merge values of the same user key starting from the current iter_ position
// Scan from the newer entries to older entries.
// PRE: iter_.key() points to the first merge type entry
//      saved_key_ stores the user key
// POST: saved_value_ has the merged value for the user key
//       iter_ points to the next entry (or invalid)
// Returns false on error (valid_ cleared, status_ set where applicable).
bool DBIter::MergeValuesNewToOld() {
  if (!merge_operator_) {
    ROCKS_LOG_ERROR(logger_, "Options::merge_operator is null.");
    status_ = Status::InvalidArgument("merge_operator_ must be set.");
    valid_ = false;
    return false;
  }

  // Temporarily pin the blocks that hold merge operands
  TempPinData();
  merge_context_.Clear();
  // Start the merge process by pushing the first operand
  merge_context_.PushOperand(
      iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */);
  TEST_SYNC_POINT("DBIter::MergeValuesNewToOld:PushedFirstOperand");

  ParsedInternalKey ikey;
  Status s;
  // Walk newer-to-older entries of the same user key, collecting operands
  // until a terminating entry (put/delete/other key) is found.
  for (iter_.Next(); iter_.Valid(); iter_.Next()) {
    TEST_SYNC_POINT("DBIter::MergeValuesNewToOld:SteppedToNextOperand");
    if (!ParseKey(&ikey)) {
      return false;
    }

    if (!user_comparator_.Equal(ikey.user_key, saved_key_.GetUserKey())) {
      // hit the next user key, stop right here
      break;
    } else if (kTypeDeletion == ikey.type || kTypeSingleDeletion == ikey.type ||
               range_del_agg_.ShouldDelete(
                   ikey, RangeDelPositioningMode::kForwardTraversal)) {
      // hit a delete with the same user key, stop right here
      // iter_ is positioned after delete
      iter_.Next();
      break;
    } else if (kTypeValue == ikey.type) {
      // hit a put, merge the put value with operands and store the
      // final result in saved_value_. We are done!
      const Slice val = iter_.value();
      s = MergeHelper::TimedFullMerge(
          merge_operator_, ikey.user_key, &val, merge_context_.GetOperands(),
          &saved_value_, logger_, statistics_, env_, &pinned_value_, true);
      if (!s.ok()) {
        valid_ = false;
        status_ = s;
        return false;
      }
      // iter_ is positioned after put
      iter_.Next();
      if (!iter_.status().ok()) {
        valid_ = false;
        return false;
      }
      return true;
    } else if (kTypeMerge == ikey.type) {
      // hit a merge, add the value as an operand and run associative merge.
      // when complete, add result to operands and continue.
      merge_context_.PushOperand(
          iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */);
      PERF_COUNTER_ADD(internal_merge_count, 1);
    } else if (kTypeBlobIndex == ikey.type) {
      // Merge is not supported for blobs; report the most specific error.
      if (!allow_blob_) {
        ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
        status_ = Status::NotSupported(
            "Encounter unexpected blob index. Please open DB with "
            "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
      } else {
        status_ =
            Status::NotSupported("Blob DB does not support merge operator.");
      }
      valid_ = false;
      return false;
    } else {
      assert(false);
    }
  }

  if (!iter_.status().ok()) {
    valid_ = false;
    return false;
  }

  // we either exhausted all internal keys under this user key, or hit
  // a deletion marker.
  // feed null as the existing value to the merge operator, such that
  // client can differentiate this scenario and do things accordingly.
  s = MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetUserKey(),
                                  nullptr, merge_context_.GetOperands(),
                                  &saved_value_, logger_, statistics_, env_,
                                  &pinned_value_, true);
  if (!s.ok()) {
    valid_ = false;
    status_ = s;
    return false;
  }

  assert(status_.ok());
  return true;
}
  490. void DBIter::Prev() {
  491. assert(valid_);
  492. assert(status_.ok());
  493. PERF_CPU_TIMER_GUARD(iter_prev_cpu_nanos, env_);
  494. ReleaseTempPinnedData();
  495. ResetInternalKeysSkippedCounter();
  496. bool ok = true;
  497. if (direction_ == kForward) {
  498. if (!ReverseToBackward()) {
  499. ok = false;
  500. }
  501. }
  502. if (ok) {
  503. Slice prefix;
  504. if (prefix_same_as_start_) {
  505. assert(prefix_extractor_ != nullptr);
  506. prefix = prefix_.GetUserKey();
  507. }
  508. PrevInternal(prefix_same_as_start_ ? &prefix : nullptr);
  509. }
  510. if (statistics_ != nullptr) {
  511. local_stats_.prev_count_++;
  512. if (valid_) {
  513. local_stats_.prev_found_count_++;
  514. local_stats_.bytes_read_ += (key().size() + value().size());
  515. }
  516. }
  517. }
  518. bool DBIter::ReverseToForward() {
  519. assert(iter_.status().ok());
  520. // When moving backwards, iter_ is positioned on _previous_ key, which may
  521. // not exist or may have different prefix than the current key().
  522. // If that's the case, seek iter_ to current key.
  523. if (!expect_total_order_inner_iter() || !iter_.Valid()) {
  524. IterKey last_key;
  525. last_key.SetInternalKey(ParsedInternalKey(
  526. saved_key_.GetUserKey(), kMaxSequenceNumber, kValueTypeForSeek));
  527. iter_.Seek(last_key.GetInternalKey());
  528. }
  529. direction_ = kForward;
  530. // Skip keys less than the current key() (a.k.a. saved_key_).
  531. while (iter_.Valid()) {
  532. ParsedInternalKey ikey;
  533. if (!ParseKey(&ikey)) {
  534. return false;
  535. }
  536. if (user_comparator_.Compare(ikey.user_key, saved_key_.GetUserKey()) >= 0) {
  537. return true;
  538. }
  539. iter_.Next();
  540. }
  541. if (!iter_.status().ok()) {
  542. valid_ = false;
  543. return false;
  544. }
  545. return true;
  546. }
// Move iter_ to the key before saved_key_.
// Flips iteration direction from forward to backward and positions the
// inner iterator strictly before saved_key_. Returns false on error.
bool DBIter::ReverseToBackward() {
  assert(iter_.status().ok());

  // When current_entry_is_merged_ is true, iter_ may be positioned on the next
  // key, which may not exist or may have prefix different from current.
  // If that's the case, seek to saved_key_.
  if (current_entry_is_merged_ &&
      (!expect_total_order_inner_iter() || !iter_.Valid())) {
    IterKey last_key;
    // Using kMaxSequenceNumber and kValueTypeForSeek
    // (not kValueTypeForSeekForPrev) to seek to a key strictly smaller
    // than saved_key_.
    last_key.SetInternalKey(ParsedInternalKey(
        saved_key_.GetUserKey(), kMaxSequenceNumber, kValueTypeForSeek));
    if (!expect_total_order_inner_iter()) {
      iter_.SeekForPrev(last_key.GetInternalKey());
    } else {
      // Some iterators may not support SeekForPrev(), so we avoid using it
      // when prefix seek mode is disabled. This is somewhat expensive
      // (an extra Prev(), as well as an extra change of direction of iter_),
      // so we may need to reconsider it later.
      iter_.Seek(last_key.GetInternalKey());
      if (!iter_.Valid() && iter_.status().ok()) {
        // Seek ran past the end: the target is after every key; step back
        // to the last entry instead.
        iter_.SeekToLast();
      }
    }
  }

  direction_ = kReverse;
  return FindUserKeyBeforeSavedKey();
}
// Core backward-iteration loop: repeatedly picks the user key at the inner
// iterator's position, resolves its newest visible value, and stops when a
// presentable entry is found or a bound/prefix/end is reached. Sets valid_.
void DBIter::PrevInternal(const Slice* prefix) {
  while (iter_.Valid()) {
    // Remember the user key we are currently resolving.
    saved_key_.SetUserKey(
        ExtractUserKey(iter_.key()),
        !iter_.iter()->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);

    assert(prefix == nullptr || prefix_extractor_ != nullptr);
    if (prefix != nullptr &&
        prefix_extractor_->Transform(saved_key_.GetUserKey())
                .compare(*prefix) != 0) {
      assert(prefix_same_as_start_);
      // Current key does not have the same prefix as start
      valid_ = false;
      return;
    }

    // If the inner iterator can guarantee the key is within the lower bound,
    // the comparison below is skipped; the assert cross-checks that claim.
    assert(iterate_lower_bound_ == nullptr || iter_.MayBeOutOfLowerBound() ||
           user_comparator_.Compare(saved_key_.GetUserKey(),
                                    *iterate_lower_bound_) >= 0);
    if (iterate_lower_bound_ != nullptr && iter_.MayBeOutOfLowerBound() &&
        user_comparator_.Compare(saved_key_.GetUserKey(),
                                 *iterate_lower_bound_) < 0) {
      // We've iterated earlier than the user-specified lower bound.
      valid_ = false;
      return;
    }

    if (!FindValueForCurrentKey()) {  // assigns valid_
      return;
    }

    // Whether or not we found a value for current key, we need iter_ to end up
    // on a smaller key.
    if (!FindUserKeyBeforeSavedKey()) {
      return;
    }

    if (valid_) {
      // Found the value.
      return;
    }

    if (TooManyInternalKeysSkipped(false)) {
      return;
    }
  }

  // We haven't found any key - iterator is not valid
  valid_ = false;
}
  620. // Used for backwards iteration.
  621. // Looks at the entries with user key saved_key_ and finds the most up-to-date
  622. // value for it, or executes a merge, or determines that the value was deleted.
  623. // Sets valid_ to true if the value is found and is ready to be presented to
  624. // the user through value().
  625. // Sets valid_ to false if the value was deleted, and we should try another key.
  626. // Returns false if an error occurred, and !status().ok() and !valid_.
  627. //
  628. // PRE: iter_ is positioned on the last entry with user key equal to saved_key_.
  629. // POST: iter_ is positioned on one of the entries equal to saved_key_, or on
  630. // the entry just before them, or on the entry just after them.
  631. bool DBIter::FindValueForCurrentKey() {
  632. assert(iter_.Valid());
  633. merge_context_.Clear();
  634. current_entry_is_merged_ = false;
  635. // last entry before merge (could be kTypeDeletion, kTypeSingleDeletion or
  636. // kTypeValue)
  637. ValueType last_not_merge_type = kTypeDeletion;
  638. ValueType last_key_entry_type = kTypeDeletion;
  639. // Temporarily pin blocks that hold (merge operands / the value)
  640. ReleaseTempPinnedData();
  641. TempPinData();
  642. size_t num_skipped = 0;
  643. while (iter_.Valid()) {
  644. ParsedInternalKey ikey;
  645. if (!ParseKey(&ikey)) {
  646. return false;
  647. }
  648. if (!IsVisible(ikey.sequence) ||
  649. !user_comparator_.Equal(ikey.user_key, saved_key_.GetUserKey())) {
  650. break;
  651. }
  652. if (TooManyInternalKeysSkipped()) {
  653. return false;
  654. }
  655. // This user key has lots of entries.
  656. // We're going from old to new, and it's taking too long. Let's do a Seek()
  657. // and go from new to old. This helps when a key was overwritten many times.
  658. if (num_skipped >= max_skip_) {
  659. return FindValueForCurrentKeyUsingSeek();
  660. }
  661. last_key_entry_type = ikey.type;
  662. switch (last_key_entry_type) {
  663. case kTypeValue:
  664. case kTypeBlobIndex:
  665. if (range_del_agg_.ShouldDelete(
  666. ikey, RangeDelPositioningMode::kBackwardTraversal)) {
  667. last_key_entry_type = kTypeRangeDeletion;
  668. PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
  669. } else {
  670. assert(iter_.iter()->IsValuePinned());
  671. pinned_value_ = iter_.value();
  672. }
  673. merge_context_.Clear();
  674. last_not_merge_type = last_key_entry_type;
  675. break;
  676. case kTypeDeletion:
  677. case kTypeSingleDeletion:
  678. merge_context_.Clear();
  679. last_not_merge_type = last_key_entry_type;
  680. PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
  681. break;
  682. case kTypeMerge:
  683. if (range_del_agg_.ShouldDelete(
  684. ikey, RangeDelPositioningMode::kBackwardTraversal)) {
  685. merge_context_.Clear();
  686. last_key_entry_type = kTypeRangeDeletion;
  687. last_not_merge_type = last_key_entry_type;
  688. PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
  689. } else {
  690. assert(merge_operator_ != nullptr);
  691. merge_context_.PushOperandBack(
  692. iter_.value(),
  693. iter_.iter()->IsValuePinned() /* operand_pinned */);
  694. PERF_COUNTER_ADD(internal_merge_count, 1);
  695. }
  696. break;
  697. default:
  698. assert(false);
  699. }
  700. PERF_COUNTER_ADD(internal_key_skipped_count, 1);
  701. iter_.Prev();
  702. ++num_skipped;
  703. }
  704. if (!iter_.status().ok()) {
  705. valid_ = false;
  706. return false;
  707. }
  708. Status s;
  709. is_blob_ = false;
  710. switch (last_key_entry_type) {
  711. case kTypeDeletion:
  712. case kTypeSingleDeletion:
  713. case kTypeRangeDeletion:
  714. valid_ = false;
  715. return true;
  716. case kTypeMerge:
  717. current_entry_is_merged_ = true;
  718. if (last_not_merge_type == kTypeDeletion ||
  719. last_not_merge_type == kTypeSingleDeletion ||
  720. last_not_merge_type == kTypeRangeDeletion) {
  721. s = MergeHelper::TimedFullMerge(
  722. merge_operator_, saved_key_.GetUserKey(), nullptr,
  723. merge_context_.GetOperands(), &saved_value_, logger_, statistics_,
  724. env_, &pinned_value_, true);
  725. } else if (last_not_merge_type == kTypeBlobIndex) {
  726. if (!allow_blob_) {
  727. ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
  728. status_ = Status::NotSupported(
  729. "Encounter unexpected blob index. Please open DB with "
  730. "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
  731. } else {
  732. status_ =
  733. Status::NotSupported("Blob DB does not support merge operator.");
  734. }
  735. valid_ = false;
  736. return false;
  737. } else {
  738. assert(last_not_merge_type == kTypeValue);
  739. s = MergeHelper::TimedFullMerge(
  740. merge_operator_, saved_key_.GetUserKey(), &pinned_value_,
  741. merge_context_.GetOperands(), &saved_value_, logger_, statistics_,
  742. env_, &pinned_value_, true);
  743. }
  744. break;
  745. case kTypeValue:
  746. // do nothing - we've already has value in pinned_value_
  747. break;
  748. case kTypeBlobIndex:
  749. if (!allow_blob_) {
  750. ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
  751. status_ = Status::NotSupported(
  752. "Encounter unexpected blob index. Please open DB with "
  753. "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
  754. valid_ = false;
  755. return false;
  756. }
  757. is_blob_ = true;
  758. break;
  759. default:
  760. assert(false);
  761. break;
  762. }
  763. if (!s.ok()) {
  764. valid_ = false;
  765. status_ = s;
  766. return false;
  767. }
  768. valid_ = true;
  769. return true;
  770. }
// This function is used in FindValueForCurrentKey.
// We use Seek() function instead of Prev() to find necessary value
// TODO: This is very similar to FindNextUserEntry() and MergeValuesNewToOld().
// Would be nice to reuse some code.
//
// Strategy: instead of walking backwards entry-by-entry, seek directly to the
// newest internal key for saved_key_ that is visible at sequence_, then scan
// forward (oldest-visible-first within the key) to resolve the final value.
// Returns false and sets status_ on error; on success sets valid_ (possibly
// to false when the key turns out to be deleted) and returns true.
bool DBIter::FindValueForCurrentKeyUsingSeek() {
  // FindValueForCurrentKey will enable pinning before calling
  // FindValueForCurrentKeyUsingSeek()
  assert(pinned_iters_mgr_.PinningEnabled());
  std::string last_key;
  // Build the internal key <saved_key_, sequence_, kValueTypeForSeek>: the
  // first entry for this user key that could possibly be visible to us.
  AppendInternalKey(&last_key, ParsedInternalKey(saved_key_.GetUserKey(),
                                                 sequence_, kValueTypeForSeek));
  iter_.Seek(last_key);
  RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
  // In case read_callback presents, the value we seek to may not be visible.
  // Find the next value that's visible.
  ParsedInternalKey ikey;
  is_blob_ = false;
  while (true) {
    if (!iter_.Valid()) {
      // Ran off the end of the data. Not an error unless the inner iterator
      // itself reports one.
      valid_ = false;
      return iter_.status().ok();
    }
    if (!ParseKey(&ikey)) {
      return false;
    }
    if (!user_comparator_.Equal(ikey.user_key, saved_key_.GetUserKey())) {
      // No visible values for this key, even though FindValueForCurrentKey()
      // has seen some. This is possible if we're using a tailing iterator, and
      // the entries were discarded in a compaction.
      valid_ = false;
      return true;
    }
    if (IsVisible(ikey.sequence)) {
      break;
    }
    // Entry not visible to this snapshot/read callback; try the next one.
    iter_.Next();
  }
  // The first (newest) visible entry decides the outcome unless it's a merge.
  if (ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion ||
      range_del_agg_.ShouldDelete(
          ikey, RangeDelPositioningMode::kBackwardTraversal)) {
    // Key is deleted (point or range tombstone): no user-visible value.
    valid_ = false;
    return true;
  }
  if (ikey.type == kTypeBlobIndex && !allow_blob_) {
    ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
    status_ = Status::NotSupported(
        "Encounter unexpected blob index. Please open DB with "
        "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
    valid_ = false;
    return false;
  }
  if (ikey.type == kTypeValue || ikey.type == kTypeBlobIndex) {
    // Plain value (or blob index when blobs are allowed): pin it and finish.
    assert(iter_.iter()->IsValuePinned());
    pinned_value_ = iter_.value();
    is_blob_ = (ikey.type == kTypeBlobIndex);
    valid_ = true;
    return true;
  }
  // kTypeMerge. We need to collect all kTypeMerge values and save them
  // in operands
  assert(ikey.type == kTypeMerge);
  current_entry_is_merged_ = true;
  merge_context_.Clear();
  merge_context_.PushOperand(
      iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */);
  // Keep scanning forward (towards older entries of the same user key),
  // accumulating merge operands until we hit a base value, a tombstone, the
  // next user key, or the end of data.
  while (true) {
    iter_.Next();
    if (!iter_.Valid()) {
      if (!iter_.status().ok()) {
        valid_ = false;
        return false;
      }
      break;
    }
    if (!ParseKey(&ikey)) {
      return false;
    }
    if (!user_comparator_.Equal(ikey.user_key, saved_key_.GetUserKey())) {
      break;
    }
    if (ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion ||
        range_del_agg_.ShouldDelete(
            ikey, RangeDelPositioningMode::kForwardTraversal)) {
      // Tombstone terminates the operand chain: merge with no base value.
      break;
    } else if (ikey.type == kTypeValue) {
      // Found the base value: merge all collected operands on top of it.
      const Slice val = iter_.value();
      Status s = MergeHelper::TimedFullMerge(
          merge_operator_, saved_key_.GetUserKey(), &val,
          merge_context_.GetOperands(), &saved_value_, logger_, statistics_,
          env_, &pinned_value_, true);
      if (!s.ok()) {
        valid_ = false;
        status_ = s;
        return false;
      }
      valid_ = true;
      return true;
    } else if (ikey.type == kTypeMerge) {
      merge_context_.PushOperand(
          iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */);
      PERF_COUNTER_ADD(internal_merge_count, 1);
    } else if (ikey.type == kTypeBlobIndex) {
      if (!allow_blob_) {
        ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
        status_ = Status::NotSupported(
            "Encounter unexpected blob index. Please open DB with "
            "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
      } else {
        status_ =
            Status::NotSupported("Blob DB does not support merge operator.");
      }
      valid_ = false;
      return false;
    } else {
      assert(false);
    }
  }
  // No base value found: full-merge the operands with a nullptr base.
  Status s = MergeHelper::TimedFullMerge(
      merge_operator_, saved_key_.GetUserKey(), nullptr,
      merge_context_.GetOperands(), &saved_value_, logger_, statistics_, env_,
      &pinned_value_, true);
  if (!s.ok()) {
    valid_ = false;
    status_ = s;
    return false;
  }
  // Make sure we leave iter_ in a good state. If it's valid and we don't care
  // about prefixes, that's already good enough. Otherwise it needs to be
  // seeked to the current key.
  if (!expect_total_order_inner_iter() || !iter_.Valid()) {
    if (!expect_total_order_inner_iter()) {
      // Prefix-seek mode: reposition backwards onto the current key.
      iter_.SeekForPrev(last_key);
    } else {
      iter_.Seek(last_key);
      if (!iter_.Valid() && iter_.status().ok()) {
        // Seeked past the end; park the inner iterator on the last entry.
        iter_.SeekToLast();
      }
    }
    RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
  }
  valid_ = true;
  return true;
}
// Move backwards until the key smaller than saved_key_.
// Changes valid_ only if return value is false.
//
// Used by reverse iteration to position the inner iterator on the internal
// entry immediately preceding all entries of saved_key_'s user key. Returns
// false on parse error, inner-iterator error, or when the skipped-key budget
// (TooManyInternalKeysSkipped) is exhausted.
bool DBIter::FindUserKeyBeforeSavedKey() {
  assert(status_.ok());
  size_t num_skipped = 0;
  while (iter_.Valid()) {
    ParsedInternalKey ikey;
    if (!ParseKey(&ikey)) {
      return false;
    }
    // Done: current entry's user key sorts strictly before saved_key_.
    if (user_comparator_.Compare(ikey.user_key, saved_key_.GetUserKey()) < 0) {
      return true;
    }
    if (TooManyInternalKeysSkipped()) {
      return false;
    }
    assert(ikey.sequence != kMaxSequenceNumber);
    if (!IsVisible(ikey.sequence)) {
      PERF_COUNTER_ADD(internal_recent_skipped_count, 1);
    } else {
      PERF_COUNTER_ADD(internal_key_skipped_count, 1);
    }
    if (num_skipped >= max_skip_) {
      // Too many Prev() steps within the same user key: fall back to a Seek
      // to just before this key's entries, which is cheaper than continuing
      // to step through a long run of versions one by one.
      num_skipped = 0;
      IterKey last_key;
      last_key.SetInternalKey(ParsedInternalKey(
          saved_key_.GetUserKey(), kMaxSequenceNumber, kValueTypeForSeek));
      // It would be more efficient to use SeekForPrev() here, but some
      // iterators may not support it.
      iter_.Seek(last_key.GetInternalKey());
      RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
      if (!iter_.Valid()) {
        break;
      }
    } else {
      ++num_skipped;
    }
    iter_.Prev();
  }
  if (!iter_.status().ok()) {
    valid_ = false;
    return false;
  }
  return true;
}
  959. bool DBIter::TooManyInternalKeysSkipped(bool increment) {
  960. if ((max_skippable_internal_keys_ > 0) &&
  961. (num_internal_keys_skipped_ > max_skippable_internal_keys_)) {
  962. valid_ = false;
  963. status_ = Status::Incomplete("Too many internal keys skipped.");
  964. return true;
  965. } else if (increment) {
  966. num_internal_keys_skipped_++;
  967. }
  968. return false;
  969. }
  970. bool DBIter::IsVisible(SequenceNumber sequence) {
  971. if (read_callback_ == nullptr) {
  972. return sequence <= sequence_;
  973. } else {
  974. return read_callback_->IsVisible(sequence);
  975. }
  976. }
  977. void DBIter::SetSavedKeyToSeekTarget(const Slice& target) {
  978. is_key_seqnum_zero_ = false;
  979. SequenceNumber seq = sequence_;
  980. saved_key_.Clear();
  981. saved_key_.SetInternalKey(target, seq);
  982. if (iterate_lower_bound_ != nullptr &&
  983. user_comparator_.Compare(saved_key_.GetUserKey(), *iterate_lower_bound_) <
  984. 0) {
  985. // Seek key is smaller than the lower bound.
  986. saved_key_.Clear();
  987. saved_key_.SetInternalKey(*iterate_lower_bound_, seq);
  988. }
  989. }
  990. void DBIter::SetSavedKeyToSeekForPrevTarget(const Slice& target) {
  991. is_key_seqnum_zero_ = false;
  992. saved_key_.Clear();
  993. // now saved_key is used to store internal key.
  994. saved_key_.SetInternalKey(target, 0 /* sequence_number */,
  995. kValueTypeForSeekForPrev);
  996. if (iterate_upper_bound_ != nullptr &&
  997. user_comparator_.Compare(saved_key_.GetUserKey(),
  998. *iterate_upper_bound_) >= 0) {
  999. saved_key_.Clear();
  1000. saved_key_.SetInternalKey(*iterate_upper_bound_, kMaxSequenceNumber);
  1001. }
  1002. }
// Positions the iterator at the first user key >= target that is visible,
// honoring iterate_lower_bound_ and (when prefix_same_as_start_ is set) the
// prefix of the seek key.
void DBIter::Seek(const Slice& target) {
  PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, env_);
  StopWatch sw(env_, statistics_, DB_SEEK);
#ifndef ROCKSDB_LITE
  if (db_impl_ != nullptr && cfd_ != nullptr) {
    // Record this seek for tracing/replay tooling.
    db_impl_->TraceIteratorSeek(cfd_->GetID(), target);
  }
#endif  // ROCKSDB_LITE
  // Reset per-seek state: status, pinned blocks, skip counter.
  status_ = Status::OK();
  ReleaseTempPinnedData();
  ResetInternalKeysSkippedCounter();
  // Seek the inner iterator based on the target key.
  {
    PERF_TIMER_GUARD(seek_internal_seek_time);
    SetSavedKeyToSeekTarget(target);
    iter_.Seek(saved_key_.GetInternalKey());
    range_del_agg_.InvalidateRangeDelMapPositions();
    RecordTick(statistics_, NUMBER_DB_SEEK);
  }
  if (!iter_.Valid()) {
    valid_ = false;
    return;
  }
  direction_ = kForward;
  // Now the inner iterator is placed to the target position. From there,
  // we need to find out the next key that is visible to the user.
  ClearSavedValue();
  if (prefix_same_as_start_) {
    // The case where the iterator needs to be invalidated if it has exausted
    // keys within the same prefix of the seek key.
    assert(prefix_extractor_ != nullptr);
    Slice target_prefix = prefix_extractor_->Transform(target);
    FindNextUserEntry(false /* not skipping saved_key */,
                      &target_prefix /* prefix */);
    if (valid_) {
      // Remember the prefix of the seek key for the future Next() call to
      // check.
      prefix_.SetUserKey(target_prefix);
    }
  } else {
    FindNextUserEntry(false /* not skipping saved_key */, nullptr);
  }
  if (!valid_) {
    return;
  }
  // Updating stats and perf context counters.
  if (statistics_ != nullptr) {
    RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
    RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
  }
  PERF_COUNTER_ADD(iter_read_bytes, key().size() + value().size());
}
// Positions the iterator at the last user key <= target that is visible,
// honoring iterate_upper_bound_ and (when prefix_same_as_start_ is set) the
// prefix of the seek key. Mirror image of Seek().
void DBIter::SeekForPrev(const Slice& target) {
  PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, env_);
  StopWatch sw(env_, statistics_, DB_SEEK);
#ifndef ROCKSDB_LITE
  if (db_impl_ != nullptr && cfd_ != nullptr) {
    // Record this seek for tracing/replay tooling.
    db_impl_->TraceIteratorSeekForPrev(cfd_->GetID(), target);
  }
#endif  // ROCKSDB_LITE
  // Reset per-seek state: status, pinned blocks, skip counter.
  status_ = Status::OK();
  ReleaseTempPinnedData();
  ResetInternalKeysSkippedCounter();
  // Seek the inner iterator based on the target key.
  {
    PERF_TIMER_GUARD(seek_internal_seek_time);
    SetSavedKeyToSeekForPrevTarget(target);
    iter_.SeekForPrev(saved_key_.GetInternalKey());
    range_del_agg_.InvalidateRangeDelMapPositions();
    RecordTick(statistics_, NUMBER_DB_SEEK);
  }
  if (!iter_.Valid()) {
    valid_ = false;
    return;
  }
  direction_ = kReverse;
  // Now the inner iterator is placed to the target position. From there,
  // we need to find out the first key that is visible to the user in the
  // backward direction.
  ClearSavedValue();
  if (prefix_same_as_start_) {
    // The case where the iterator needs to be invalidated if it has exausted
    // keys within the same prefix of the seek key.
    assert(prefix_extractor_ != nullptr);
    Slice target_prefix = prefix_extractor_->Transform(target);
    PrevInternal(&target_prefix);
    if (valid_) {
      // Remember the prefix of the seek key for the future Prev() call to
      // check.
      prefix_.SetUserKey(target_prefix);
    }
  } else {
    PrevInternal(nullptr);
  }
  // Report stats and perf context.
  if (statistics_ != nullptr && valid_) {
    RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
    RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
    PERF_COUNTER_ADD(iter_read_bytes, key().size() + value().size());
  }
}
// Positions the iterator at the first visible user key in the column family
// (or at iterate_lower_bound_ when one is set).
void DBIter::SeekToFirst() {
  if (iterate_lower_bound_ != nullptr) {
    // With a lower bound, SeekToFirst is just a Seek to the bound.
    Seek(*iterate_lower_bound_);
    return;
  }
  PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, env_);
  // Don't use iter_::Seek() if we set a prefix extractor
  // because prefix seek will be used.
  if (!expect_total_order_inner_iter()) {
    // Disable the reseek optimization: it would Seek(), which prefix-seek
    // inner iterators handle differently.
    max_skip_ = std::numeric_limits<uint64_t>::max();
  }
  // Reset per-seek state.
  status_ = Status::OK();
  direction_ = kForward;
  ReleaseTempPinnedData();
  ResetInternalKeysSkippedCounter();
  ClearSavedValue();
  is_key_seqnum_zero_ = false;
  {
    PERF_TIMER_GUARD(seek_internal_seek_time);
    iter_.SeekToFirst();
    range_del_agg_.InvalidateRangeDelMapPositions();
  }
  RecordTick(statistics_, NUMBER_DB_SEEK);
  if (iter_.Valid()) {
    // Copy the key only when the inner iterator can't pin it for us.
    saved_key_.SetUserKey(
        ExtractUserKey(iter_.key()),
        !iter_.iter()->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
    FindNextUserEntry(false /* not skipping saved_key */,
                      nullptr /* no prefix check */);
    if (statistics_ != nullptr) {
      if (valid_) {
        RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
        RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
        PERF_COUNTER_ADD(iter_read_bytes, key().size() + value().size());
      }
    }
  } else {
    valid_ = false;
  }
  if (valid_ && prefix_same_as_start_) {
    // Remember the first key's prefix so later Next() calls can detect when
    // iteration leaves it.
    assert(prefix_extractor_ != nullptr);
    prefix_.SetUserKey(prefix_extractor_->Transform(saved_key_.GetUserKey()));
  }
}
// Positions the iterator at the last visible user key in the column family,
// strictly below iterate_upper_bound_ when one is set.
void DBIter::SeekToLast() {
  if (iterate_upper_bound_ != nullptr) {
    // Seek to last key strictly less than ReadOptions.iterate_upper_bound.
    SeekForPrev(*iterate_upper_bound_);
    if (Valid() && user_comparator_.Equal(*iterate_upper_bound_, key())) {
      // SeekForPrev is inclusive; step back once if we landed exactly on the
      // bound, which must be excluded.
      ReleaseTempPinnedData();
      PrevInternal(nullptr);
    }
    return;
  }
  PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, env_);
  // Don't use iter_::Seek() if we set a prefix extractor
  // because prefix seek will be used.
  if (!expect_total_order_inner_iter()) {
    // Disable the reseek optimization: it would Seek(), which prefix-seek
    // inner iterators handle differently.
    max_skip_ = std::numeric_limits<uint64_t>::max();
  }
  // Reset per-seek state.
  status_ = Status::OK();
  direction_ = kReverse;
  ReleaseTempPinnedData();
  ResetInternalKeysSkippedCounter();
  ClearSavedValue();
  is_key_seqnum_zero_ = false;
  {
    PERF_TIMER_GUARD(seek_internal_seek_time);
    iter_.SeekToLast();
    range_del_agg_.InvalidateRangeDelMapPositions();
  }
  // Walk backwards to the last visible user entry.
  PrevInternal(nullptr);
  if (statistics_ != nullptr) {
    RecordTick(statistics_, NUMBER_DB_SEEK);
    if (valid_) {
      RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
      RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
      PERF_COUNTER_ADD(iter_read_bytes, key().size() + value().size());
    }
  }
  if (valid_ && prefix_same_as_start_) {
    // Remember the last key's prefix so later Prev() calls can detect when
    // iteration leaves it.
    assert(prefix_extractor_ != nullptr);
    prefix_.SetUserKey(prefix_extractor_->Transform(saved_key_.GetUserKey()));
  }
}
  1190. Iterator* NewDBIterator(Env* env, const ReadOptions& read_options,
  1191. const ImmutableCFOptions& cf_options,
  1192. const MutableCFOptions& mutable_cf_options,
  1193. const Comparator* user_key_comparator,
  1194. InternalIterator* internal_iter,
  1195. const SequenceNumber& sequence,
  1196. uint64_t max_sequential_skip_in_iterations,
  1197. ReadCallback* read_callback, DBImpl* db_impl,
  1198. ColumnFamilyData* cfd, bool allow_blob) {
  1199. DBIter* db_iter = new DBIter(
  1200. env, read_options, cf_options, mutable_cf_options, user_key_comparator,
  1201. internal_iter, sequence, false, max_sequential_skip_in_iterations,
  1202. read_callback, db_impl, cfd, allow_blob);
  1203. return db_iter;
  1204. }
  1205. } // namespace ROCKSDB_NAMESPACE