memtable.cc

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/memtable.h"

#include <algorithm>
#include <array>
#include <limits>
#include <memory>

#include "db/dbformat.h"
#include "db/merge_context.h"
#include "db/merge_helper.h"
#include "db/pinned_iterators_manager.h"
#include "db/range_tombstone_fragmenter.h"
#include "db/read_callback.h"
#include "memory/arena.h"
#include "memory/memory_usage.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/statistics.h"
#include "port/port.h"
#include "rocksdb/comparator.h"
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/slice_transform.h"
#include "rocksdb/write_buffer_manager.h"
#include "table/internal_iterator.h"
#include "table/iterator_wrapper.h"
#include "table/merging_iterator.h"
#include "util/autovector.h"
#include "util/coding.h"
#include "util/mutexlock.h"
#include "util/util.h"

namespace ROCKSDB_NAMESPACE {
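// ImmutableMemTableOptions snapshots the subset of column-family options the
// memtable needs. memtable_prefix_bloom_bits is derived as
// write_buffer_size * memtable_prefix_bloom_size_ratio bytes, multiplied by 8
// to express the bloom filter size in bits.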
ImmutableMemTableOptions::ImmutableMemTableOptions(
    const ImmutableCFOptions& ioptions,
    const MutableCFOptions& mutable_cf_options)
    : arena_block_size(mutable_cf_options.arena_block_size),
      memtable_prefix_bloom_bits(
          static_cast<uint32_t>(
              static_cast<double>(mutable_cf_options.write_buffer_size) *
              mutable_cf_options.memtable_prefix_bloom_size_ratio) *
          8u),
      memtable_huge_page_size(mutable_cf_options.memtable_huge_page_size),
      memtable_whole_key_filtering(
          mutable_cf_options.memtable_whole_key_filtering),
      inplace_update_support(ioptions.inplace_update_support),
      inplace_update_num_locks(mutable_cf_options.inplace_update_num_locks),
      inplace_callback(ioptions.inplace_callback),
      max_successive_merges(mutable_cf_options.max_successive_merges),
      statistics(ioptions.statistics),
      merge_operator(ioptions.merge_operator),
      info_log(ioptions.info_log) {}
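// The MemTable constructor wires everything to a single arena: the arena is
// charged to the WriteBufferManager (via mem_tracker_) only when the manager
// is enabled or costs to the block cache, the main rep comes from the
// configured memtable_factory, and range tombstones always go to a dedicated
// skip-list rep. A DynamicBloom with 6 probes is created when prefix or
// whole-key filtering is requested and memtable_prefix_bloom_bits > 0.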
MemTable::MemTable(const InternalKeyComparator& cmp,
                   const ImmutableCFOptions& ioptions,
                   const MutableCFOptions& mutable_cf_options,
                   WriteBufferManager* write_buffer_manager,
                   SequenceNumber latest_seq, uint32_t column_family_id)
    : comparator_(cmp),
      moptions_(ioptions, mutable_cf_options),
      refs_(0),
      kArenaBlockSize(OptimizeBlockSize(moptions_.arena_block_size)),
      mem_tracker_(write_buffer_manager),
      arena_(moptions_.arena_block_size,
             (write_buffer_manager != nullptr &&
              (write_buffer_manager->enabled() ||
               write_buffer_manager->cost_to_cache()))
                 ? &mem_tracker_
                 : nullptr,
             mutable_cf_options.memtable_huge_page_size),
      table_(ioptions.memtable_factory->CreateMemTableRep(
          comparator_, &arena_, mutable_cf_options.prefix_extractor.get(),
          ioptions.info_log, column_family_id)),
      range_del_table_(SkipListFactory().CreateMemTableRep(
          comparator_, &arena_, nullptr /* transform */, ioptions.info_log,
          column_family_id)),
      is_range_del_table_empty_(true),
      data_size_(0),
      num_entries_(0),
      num_deletes_(0),
      write_buffer_size_(mutable_cf_options.write_buffer_size),
      flush_in_progress_(false),
      flush_completed_(false),
      file_number_(0),
      first_seqno_(0),
      earliest_seqno_(latest_seq),
      creation_seq_(latest_seq),
      mem_next_logfile_number_(0),
      min_prep_log_referenced_(0),
      locks_(moptions_.inplace_update_support
                 ? moptions_.inplace_update_num_locks
                 : 0),
      prefix_extractor_(mutable_cf_options.prefix_extractor.get()),
      flush_state_(FLUSH_NOT_REQUESTED),
      env_(ioptions.env),
      insert_with_hint_prefix_extractor_(
          ioptions.memtable_insert_with_hint_prefix_extractor),
      oldest_key_time_(std::numeric_limits<uint64_t>::max()),
      atomic_flush_seqno_(kMaxSequenceNumber),
      approximate_memory_usage_(0) {
  UpdateFlushState();
  // something went wrong if we need to flush before inserting anything
  assert(!ShouldScheduleFlush());

  // use bloom_filter_ for both whole key and prefix bloom filter
  if ((prefix_extractor_ || moptions_.memtable_whole_key_filtering) &&
      moptions_.memtable_prefix_bloom_bits > 0) {
    bloom_filter_.reset(
        new DynamicBloom(&arena_, moptions_.memtable_prefix_bloom_bits,
                         6 /* hard coded 6 probes */,
                         moptions_.memtable_huge_page_size, ioptions.info_log));
  }
}

MemTable::~MemTable() {
  mem_tracker_.FreeMem();
  assert(refs_ == 0);
}
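// ApproximateMemoryUsage() sums the arena, both table reps, and the in-memory
// insert hints, saturating at port::kMaxSizet so the addition cannot overflow.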
size_t MemTable::ApproximateMemoryUsage() {
  autovector<size_t> usages = {
      arena_.ApproximateMemoryUsage(), table_->ApproximateMemoryUsage(),
      range_del_table_->ApproximateMemoryUsage(),
      ROCKSDB_NAMESPACE::ApproximateMemoryUsage(insert_hints_)};
  size_t total_usage = 0;
  for (size_t usage : usages) {
    // If usage + total_usage >= kMaxSizet, return kMaxSizet.
    // the following variation is to avoid numeric overflow.
    if (usage >= port::kMaxSizet - total_usage) {
      return port::kMaxSizet;
    }
    total_usage += usage;
  }
  approximate_memory_usage_.store(total_usage, std::memory_order_relaxed);
  // otherwise, return the actual usage
  return total_usage;
}
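// Worked example of the thresholds below (hypothetical numbers): with
// write_buffer_size = 64 MB, kArenaBlockSize = 8 MB and
// kAllowOverAllocationRatio = 0.6, the memtable keeps accepting writes while
// allocated_memory < 60.8 MB, always flushes above 68.8 MB, and in between
// flushes only once the last arena block has less than 2 MB
// (kArenaBlockSize / 4) left unused.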
bool MemTable::ShouldFlushNow() {
  size_t write_buffer_size = write_buffer_size_.load(std::memory_order_relaxed);
  // Often we cannot allocate arena blocks that exactly match the buffer size.
  // Thus we have to decide whether to over-allocate or under-allocate.
  // This constant can be interpreted as: if we still have more than
  // "kAllowOverAllocationRatio * kArenaBlockSize" space left, we'd try to
  // over-allocate one more block.
  const double kAllowOverAllocationRatio = 0.6;

  // If the arena still has room for a new block allocation, we can safely say
  // it shouldn't flush.
  auto allocated_memory = table_->ApproximateMemoryUsage() +
                          range_del_table_->ApproximateMemoryUsage() +
                          arena_.MemoryAllocatedBytes();

  approximate_memory_usage_.store(allocated_memory, std::memory_order_relaxed);

  // if we can still allocate one more block without exceeding the
  // over-allocation ratio, then we should not flush.
  if (allocated_memory + kArenaBlockSize <
      write_buffer_size + kArenaBlockSize * kAllowOverAllocationRatio) {
    return false;
  }

  // if the user keeps adding entries that exceed write_buffer_size, we need to
  // flush earlier even though we still have much available memory left.
  if (allocated_memory >
      write_buffer_size + kArenaBlockSize * kAllowOverAllocationRatio) {
    return true;
  }

  // In this code path, the arena has already allocated its "last block", which
  // means the total allocated memory is either:
  //  (1) "moderately" over-allocated (by no more than
  //      `0.6 * arena block size`). Or,
  //  (2) less than the write buffer size, but we stop here anyway, since
  //      allocating a new arena block would over-allocate too much more
  //      (half of the arena block size) memory.
  //
  // In either case, to avoid over-allocating, the last block stops accepting
  // allocations once its usage reaches a certain ratio. We carefully choose
  // "0.75 full" as the stop condition because it addresses the following issue
  // with great simplicity: what if the next inserted entry's size is bigger
  // than AllocatedAndUnused()?
  //
  // The answer is: if the entry size is also bigger than 0.25 *
  // kArenaBlockSize, a dedicated block will be allocated for it; otherwise
  // arena will anyway skip the AllocatedAndUnused() and allocate a new, empty
  // and regular block. In either case, we *overly* over-allocated.
  //
  // Therefore, setting the last block to be at most "0.75 full" avoids both
  // cases.
  //
  // NOTE: the average fraction of wasted space with this approach can be
  // estimated as "arena block size * 0.25 / write buffer size". Users who
  // specify a small write buffer size and/or a big arena block size may
  // suffer.
  return arena_.AllocatedAndUnused() < kArenaBlockSize / 4;
}
void MemTable::UpdateFlushState() {
  auto state = flush_state_.load(std::memory_order_relaxed);
  if (state == FLUSH_NOT_REQUESTED && ShouldFlushNow()) {
    // ignore CAS failure, because that means somebody else requested
    // a flush
    flush_state_.compare_exchange_strong(state, FLUSH_REQUESTED,
                                         std::memory_order_relaxed,
                                         std::memory_order_relaxed);
  }
}

void MemTable::UpdateOldestKeyTime() {
  uint64_t oldest_key_time = oldest_key_time_.load(std::memory_order_relaxed);
  if (oldest_key_time == std::numeric_limits<uint64_t>::max()) {
    int64_t current_time = 0;
    auto s = env_->GetCurrentTime(&current_time);
    if (s.ok()) {
      assert(current_time >= 0);
      // If the CAS fails, the timestamp has already been set by another
      // thread, which is fine.
      oldest_key_time_.compare_exchange_strong(
          oldest_key_time, static_cast<uint64_t>(current_time),
          std::memory_order_relaxed, std::memory_order_relaxed);
    }
  }
}
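// Memtable entries are stored as length-prefixed internal keys, so both
// comparator overloads first decode the varint32 length prefix before
// delegating to CompareKeySeq() on the inner internal keys.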
int MemTable::KeyComparator::operator()(const char* prefix_len_key1,
                                        const char* prefix_len_key2) const {
  // Internal keys are encoded as length-prefixed strings.
  Slice k1 = GetLengthPrefixedSlice(prefix_len_key1);
  Slice k2 = GetLengthPrefixedSlice(prefix_len_key2);
  return comparator.CompareKeySeq(k1, k2);
}

int MemTable::KeyComparator::operator()(
    const char* prefix_len_key, const KeyComparator::DecodedType& key) const {
  // Internal keys are encoded as length-prefixed strings.
  Slice a = GetLengthPrefixedSlice(prefix_len_key);
  return comparator.CompareKeySeq(a, key);
}

void MemTableRep::InsertConcurrently(KeyHandle /*handle*/) {
#ifndef ROCKSDB_LITE
  throw std::runtime_error("concurrent insert not supported");
#else
  abort();
#endif
}

Slice MemTableRep::UserKey(const char* key) const {
  Slice slice = GetLengthPrefixedSlice(key);
  return Slice(slice.data(), slice.size() - 8);
}

KeyHandle MemTableRep::Allocate(const size_t len, char** buf) {
  *buf = allocator_->Allocate(len);
  return static_cast<KeyHandle>(*buf);
}

// Encode a suitable internal key target for "target" and return it.
// Uses *scratch as scratch space, and the returned pointer will point
// into this scratch space.
const char* EncodeKey(std::string* scratch, const Slice& target) {
  scratch->clear();
  PutVarint32(scratch, static_cast<uint32_t>(target.size()));
  scratch->append(target.data(), target.size());
  return scratch->data();
}
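// MemTableIterator wraps the rep's iterator. When a prefix extractor is
// configured and the read is not total-order, Seek()/SeekForPrev() consult the
// prefix bloom filter first and simply mark the iterator invalid on a definite
// miss. arena_mode_ decides whether the wrapped iterator is destroyed in place
// or deleted.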
class MemTableIterator : public InternalIterator {
 public:
  MemTableIterator(const MemTable& mem, const ReadOptions& read_options,
                   Arena* arena, bool use_range_del_table = false)
      : bloom_(nullptr),
        prefix_extractor_(mem.prefix_extractor_),
        comparator_(mem.comparator_),
        valid_(false),
        arena_mode_(arena != nullptr),
        value_pinned_(
            !mem.GetImmutableMemTableOptions()->inplace_update_support) {
    if (use_range_del_table) {
      iter_ = mem.range_del_table_->GetIterator(arena);
    } else if (prefix_extractor_ != nullptr && !read_options.total_order_seek &&
               !read_options.auto_prefix_mode) {
      // Auto prefix mode is not implemented in memtable yet.
      bloom_ = mem.bloom_filter_.get();
      iter_ = mem.table_->GetDynamicPrefixIterator(arena);
    } else {
      iter_ = mem.table_->GetIterator(arena);
    }
  }
  // No copying allowed
  MemTableIterator(const MemTableIterator&) = delete;
  void operator=(const MemTableIterator&) = delete;

  ~MemTableIterator() override {
#ifndef NDEBUG
    // Assert that the MemTableIterator is never deleted while
    // Pinning is Enabled.
    assert(!pinned_iters_mgr_ || !pinned_iters_mgr_->PinningEnabled());
#endif
    if (arena_mode_) {
      iter_->~Iterator();
    } else {
      delete iter_;
    }
  }

#ifndef NDEBUG
  void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override {
    pinned_iters_mgr_ = pinned_iters_mgr;
  }
  PinnedIteratorsManager* pinned_iters_mgr_ = nullptr;
#endif

  bool Valid() const override { return valid_; }
  void Seek(const Slice& k) override {
    PERF_TIMER_GUARD(seek_on_memtable_time);
    PERF_COUNTER_ADD(seek_on_memtable_count, 1);
    if (bloom_) {
      // iterator should only use prefix bloom filter
      Slice user_k(ExtractUserKey(k));
      if (prefix_extractor_->InDomain(user_k) &&
          !bloom_->MayContain(prefix_extractor_->Transform(user_k))) {
        PERF_COUNTER_ADD(bloom_memtable_miss_count, 1);
        valid_ = false;
        return;
      } else {
        PERF_COUNTER_ADD(bloom_memtable_hit_count, 1);
      }
    }
    iter_->Seek(k, nullptr);
    valid_ = iter_->Valid();
  }
  void SeekForPrev(const Slice& k) override {
    PERF_TIMER_GUARD(seek_on_memtable_time);
    PERF_COUNTER_ADD(seek_on_memtable_count, 1);
    if (bloom_) {
      Slice user_k(ExtractUserKey(k));
      if (prefix_extractor_->InDomain(user_k) &&
          !bloom_->MayContain(prefix_extractor_->Transform(user_k))) {
        PERF_COUNTER_ADD(bloom_memtable_miss_count, 1);
        valid_ = false;
        return;
      } else {
        PERF_COUNTER_ADD(bloom_memtable_hit_count, 1);
      }
    }
    iter_->Seek(k, nullptr);
    valid_ = iter_->Valid();
    if (!Valid()) {
      SeekToLast();
    }
    while (Valid() && comparator_.comparator.Compare(k, key()) < 0) {
      Prev();
    }
  }
  void SeekToFirst() override {
    iter_->SeekToFirst();
    valid_ = iter_->Valid();
  }
  void SeekToLast() override {
    iter_->SeekToLast();
    valid_ = iter_->Valid();
  }
  void Next() override {
    PERF_COUNTER_ADD(next_on_memtable_count, 1);
    assert(Valid());
    iter_->Next();
    valid_ = iter_->Valid();
  }
  void Prev() override {
    PERF_COUNTER_ADD(prev_on_memtable_count, 1);
    assert(Valid());
    iter_->Prev();
    valid_ = iter_->Valid();
  }
  Slice key() const override {
    assert(Valid());
    return GetLengthPrefixedSlice(iter_->key());
  }
  Slice value() const override {
    assert(Valid());
    Slice key_slice = GetLengthPrefixedSlice(iter_->key());
    return GetLengthPrefixedSlice(key_slice.data() + key_slice.size());
  }

  Status status() const override { return Status::OK(); }

  bool IsKeyPinned() const override {
    // memtable data is always pinned
    return true;
  }

  bool IsValuePinned() const override {
    // memtable value is always pinned, except if we allow inplace update.
    return value_pinned_;
  }

 private:
  DynamicBloom* bloom_;
  const SliceTransform* const prefix_extractor_;
  const MemTable::KeyComparator comparator_;
  MemTableRep::Iterator* iter_;
  bool valid_;
  bool arena_mode_;
  bool value_pinned_;
};

InternalIterator* MemTable::NewIterator(const ReadOptions& read_options,
                                        Arena* arena) {
  assert(arena != nullptr);
  auto mem = arena->AllocateAligned(sizeof(MemTableIterator));
  return new (mem) MemTableIterator(*this, read_options, arena);
}
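// Range tombstones are stored unfragmented in range_del_table_. Each call
// below builds a fresh FragmentedRangeTombstoneList snapshot from them and
// returns an iterator over that list, or nullptr when there are no tombstones
// or the caller asked to ignore range deletions.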
FragmentedRangeTombstoneIterator* MemTable::NewRangeTombstoneIterator(
    const ReadOptions& read_options, SequenceNumber read_seq) {
  if (read_options.ignore_range_deletions ||
      is_range_del_table_empty_.load(std::memory_order_relaxed)) {
    return nullptr;
  }
  auto* unfragmented_iter = new MemTableIterator(
      *this, read_options, nullptr /* arena */, true /* use_range_del_table */);
  if (unfragmented_iter == nullptr) {
    return nullptr;
  }
  auto fragmented_tombstone_list =
      std::make_shared<FragmentedRangeTombstoneList>(
          std::unique_ptr<InternalIterator>(unfragmented_iter),
          comparator_.comparator);

  auto* fragmented_iter = new FragmentedRangeTombstoneIterator(
      fragmented_tombstone_list, comparator_.comparator, read_seq);
  return fragmented_iter;
}

port::RWMutex* MemTable::GetLock(const Slice& key) {
  return &locks_[fastrange64(GetSliceNPHash64(key), locks_.size())];
}

MemTable::MemTableStats MemTable::ApproximateStats(const Slice& start_ikey,
                                                   const Slice& end_ikey) {
  uint64_t entry_count = table_->ApproximateNumEntries(start_ikey, end_ikey);
  entry_count += range_del_table_->ApproximateNumEntries(start_ikey, end_ikey);
  if (entry_count == 0) {
    return {0, 0};
  }
  uint64_t n = num_entries_.load(std::memory_order_relaxed);
  if (n == 0) {
    return {0, 0};
  }
  if (entry_count > n) {
    // (range_del_)table_->ApproximateNumEntries() is just an estimate so it
    // can be larger than actual entries we have. Cap it to entries we have to
    // limit the inaccuracy.
    entry_count = n;
  }
  uint64_t data_size = data_size_.load(std::memory_order_relaxed);
  return {entry_count * (data_size / n), entry_count};
}
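// A worked example of the entry encoding used by Add() (hypothetical input):
// for user key "foo" (3 bytes), value "bar" (3 bytes), any sequence number and
// type kTypeValue, internal_key_size = 3 + 8 = 11, so the entry occupies
// VarintLength(11) + 11 + VarintLength(3) + 3 = 1 + 11 + 1 + 3 = 16 bytes in
// the arena.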
bool MemTable::Add(SequenceNumber s, ValueType type,
                   const Slice& key, /* user key */
                   const Slice& value, bool allow_concurrent,
                   MemTablePostProcessInfo* post_process_info, void** hint) {
  // Format of an entry is concatenation of:
  //  key_size     : varint32 of internal_key.size()
  //  key bytes    : char[internal_key.size()]
  //  value_size   : varint32 of value.size()
  //  value bytes  : char[value.size()]
  uint32_t key_size = static_cast<uint32_t>(key.size());
  uint32_t val_size = static_cast<uint32_t>(value.size());
  uint32_t internal_key_size = key_size + 8;
  const uint32_t encoded_len = VarintLength(internal_key_size) +
                               internal_key_size + VarintLength(val_size) +
                               val_size;
  char* buf = nullptr;
  std::unique_ptr<MemTableRep>& table =
      type == kTypeRangeDeletion ? range_del_table_ : table_;
  KeyHandle handle = table->Allocate(encoded_len, &buf);

  char* p = EncodeVarint32(buf, internal_key_size);
  memcpy(p, key.data(), key_size);
  Slice key_slice(p, key_size);
  p += key_size;
  uint64_t packed = PackSequenceAndType(s, type);
  EncodeFixed64(p, packed);
  p += 8;
  p = EncodeVarint32(p, val_size);
  memcpy(p, value.data(), val_size);
  assert((unsigned)(p + val_size - buf) == (unsigned)encoded_len);
  size_t ts_sz = GetInternalKeyComparator().user_comparator()->timestamp_size();

  if (!allow_concurrent) {
    // Extract prefix for insert with hint.
    if (insert_with_hint_prefix_extractor_ != nullptr &&
        insert_with_hint_prefix_extractor_->InDomain(key_slice)) {
      Slice prefix = insert_with_hint_prefix_extractor_->Transform(key_slice);
      bool res = table->InsertKeyWithHint(handle, &insert_hints_[prefix]);
      if (UNLIKELY(!res)) {
        return res;
      }
    } else {
      bool res = table->InsertKey(handle);
      if (UNLIKELY(!res)) {
        return res;
      }
    }

    // this is a bit ugly, but is the way to avoid locked instructions
    // when incrementing an atomic
    num_entries_.store(num_entries_.load(std::memory_order_relaxed) + 1,
                       std::memory_order_relaxed);
    data_size_.store(data_size_.load(std::memory_order_relaxed) + encoded_len,
                     std::memory_order_relaxed);
    if (type == kTypeDeletion) {
      num_deletes_.store(num_deletes_.load(std::memory_order_relaxed) + 1,
                         std::memory_order_relaxed);
    }

    if (bloom_filter_ && prefix_extractor_ &&
        prefix_extractor_->InDomain(key)) {
      bloom_filter_->Add(prefix_extractor_->Transform(key));
    }
    if (bloom_filter_ && moptions_.memtable_whole_key_filtering) {
      bloom_filter_->Add(StripTimestampFromUserKey(key, ts_sz));
    }

    // The first sequence number inserted into the memtable
    assert(first_seqno_ == 0 || s >= first_seqno_);
    if (first_seqno_ == 0) {
      first_seqno_.store(s, std::memory_order_relaxed);

      if (earliest_seqno_ == kMaxSequenceNumber) {
        earliest_seqno_.store(GetFirstSequenceNumber(),
                              std::memory_order_relaxed);
      }
      assert(first_seqno_.load() >= earliest_seqno_.load());
    }
    assert(post_process_info == nullptr);
    UpdateFlushState();
  } else {
    bool res = (hint == nullptr)
                   ? table->InsertKeyConcurrently(handle)
                   : table->InsertKeyWithHintConcurrently(handle, hint);
    if (UNLIKELY(!res)) {
      return res;
    }

    assert(post_process_info != nullptr);
    post_process_info->num_entries++;
    post_process_info->data_size += encoded_len;
    if (type == kTypeDeletion) {
      post_process_info->num_deletes++;
    }

    if (bloom_filter_ && prefix_extractor_ &&
        prefix_extractor_->InDomain(key)) {
      bloom_filter_->AddConcurrently(prefix_extractor_->Transform(key));
    }
    if (bloom_filter_ && moptions_.memtable_whole_key_filtering) {
      bloom_filter_->AddConcurrently(StripTimestampFromUserKey(key, ts_sz));
    }

    // atomically update first_seqno_ and earliest_seqno_.
    uint64_t cur_seq_num = first_seqno_.load(std::memory_order_relaxed);
    while ((cur_seq_num == 0 || s < cur_seq_num) &&
           !first_seqno_.compare_exchange_weak(cur_seq_num, s)) {
    }
    uint64_t cur_earliest_seqno =
        earliest_seqno_.load(std::memory_order_relaxed);
    while (
        (cur_earliest_seqno == kMaxSequenceNumber || s < cur_earliest_seqno) &&
        !earliest_seqno_.compare_exchange_weak(cur_earliest_seqno, s)) {
    }
  }
  if (type == kTypeRangeDeletion) {
    is_range_del_table_empty_.store(false, std::memory_order_relaxed);
  }
  UpdateOldestKeyTime();
  return true;
}
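// SaveValue() (below) is invoked once per entry the rep yields for the lookup
// key, newest sequence number first; it returns true to keep scanning (e.g.
// after consuming a merge operand) and false to stop, recording the outcome
// through the Saver struct.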
// Callback from MemTable::Get()
namespace {
struct Saver {
  Status* status;
  const LookupKey* key;
  bool* found_final_value;  // Is value set correctly? Used by KeyMayExist
  bool* merge_in_progress;
  std::string* value;
  SequenceNumber seq;
  const MergeOperator* merge_operator;
  // the merge operations encountered
  MergeContext* merge_context;
  SequenceNumber max_covering_tombstone_seq;
  MemTable* mem;
  Logger* logger;
  Statistics* statistics;
  bool inplace_update_support;
  bool do_merge;
  Env* env_;
  ReadCallback* callback_;
  bool* is_blob_index;

  bool CheckCallback(SequenceNumber _seq) {
    if (callback_) {
      return callback_->IsVisible(_seq);
    }
    return true;
  }
};
}  // namespace

static bool SaveValue(void* arg, const char* entry) {
  Saver* s = reinterpret_cast<Saver*>(arg);
  assert(s != nullptr);
  MergeContext* merge_context = s->merge_context;
  SequenceNumber max_covering_tombstone_seq = s->max_covering_tombstone_seq;
  const MergeOperator* merge_operator = s->merge_operator;

  assert(merge_context != nullptr);

  // entry format is:
  //    klength  varint32
  //    userkey  char[klength-8]
  //    tag      uint64
  //    vlength  varint32
  //    value    char[vlength]
  // Check that it belongs to same user key. We do not check the
  // sequence number since the Seek() call above should have skipped
  // all entries with overly large sequence numbers.
  uint32_t key_length;
  const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
  Slice user_key_slice = Slice(key_ptr, key_length - 8);
  if (s->mem->GetInternalKeyComparator()
          .user_comparator()
          ->CompareWithoutTimestamp(user_key_slice, s->key->user_key()) == 0) {
    // Correct user key
    const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
    ValueType type;
    SequenceNumber seq;
    UnPackSequenceAndType(tag, &seq, &type);
    // If the value is not in the snapshot, skip it
    if (!s->CheckCallback(seq)) {
      return true;  // to continue to the next seq
    }

    s->seq = seq;

    if ((type == kTypeValue || type == kTypeMerge || type == kTypeBlobIndex) &&
        max_covering_tombstone_seq > seq) {
      type = kTypeRangeDeletion;
    }
    switch (type) {
      case kTypeBlobIndex:
        if (s->is_blob_index == nullptr) {
          ROCKS_LOG_ERROR(s->logger, "Encounter unexpected blob index.");
          *(s->status) = Status::NotSupported(
              "Encounter unsupported blob value. Please open DB with "
              "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
        } else if (*(s->merge_in_progress)) {
          *(s->status) =
              Status::NotSupported("Blob DB does not support merge operator.");
        }
        if (!s->status->ok()) {
          *(s->found_final_value) = true;
          return false;
        }
        FALLTHROUGH_INTENDED;
      case kTypeValue: {
        if (s->inplace_update_support) {
          s->mem->GetLock(s->key->user_key())->ReadLock();
        }
        Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
        *(s->status) = Status::OK();
        if (*(s->merge_in_progress)) {
          if (s->do_merge) {
            if (s->value != nullptr) {
              *(s->status) = MergeHelper::TimedFullMerge(
                  merge_operator, s->key->user_key(), &v,
                  merge_context->GetOperands(), s->value, s->logger,
                  s->statistics, s->env_, nullptr /* result_operand */, true);
            }
          } else {
            // Preserve the value with the goal of returning it as part of
            // raw merge operands to the user
            merge_context->PushOperand(
                v, s->inplace_update_support == false /* operand_pinned */);
          }
        } else if (!s->do_merge) {
          // Preserve the value with the goal of returning it as part of
          // raw merge operands to the user
          merge_context->PushOperand(
              v, s->inplace_update_support == false /* operand_pinned */);
        } else if (s->value != nullptr) {
          s->value->assign(v.data(), v.size());
        }
        if (s->inplace_update_support) {
          s->mem->GetLock(s->key->user_key())->ReadUnlock();
        }
        *(s->found_final_value) = true;
        if (s->is_blob_index != nullptr) {
          *(s->is_blob_index) = (type == kTypeBlobIndex);
        }
        return false;
      }
      case kTypeDeletion:
      case kTypeSingleDeletion:
      case kTypeRangeDeletion: {
        if (*(s->merge_in_progress)) {
          if (s->value != nullptr) {
            *(s->status) = MergeHelper::TimedFullMerge(
                merge_operator, s->key->user_key(), nullptr,
                merge_context->GetOperands(), s->value, s->logger,
                s->statistics, s->env_, nullptr /* result_operand */, true);
          }
        } else {
          *(s->status) = Status::NotFound();
        }
        *(s->found_final_value) = true;
        return false;
      }
      case kTypeMerge: {
        if (!merge_operator) {
          *(s->status) = Status::InvalidArgument(
              "merge_operator is not properly initialized.");
          // Normally we continue the loop (return true) when we see a merge
          // operand. But in case of an error, we should stop the loop
          // immediately and pretend we have found the value to stop further
          // seek. Otherwise, the later call will override this error status.
          *(s->found_final_value) = true;
          return false;
        }
        Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
        *(s->merge_in_progress) = true;
        merge_context->PushOperand(
            v, s->inplace_update_support == false /* operand_pinned */);
        if (s->do_merge && merge_operator->ShouldMerge(
                               merge_context->GetOperandsDirectionBackward())) {
          *(s->status) = MergeHelper::TimedFullMerge(
              merge_operator, s->key->user_key(), nullptr,
              merge_context->GetOperands(), s->value, s->logger, s->statistics,
              s->env_, nullptr /* result_operand */, true);
          *(s->found_final_value) = true;
          return false;
        }
        return true;
      }
      default:
        assert(false);
        return true;
    }
  }

  // The entry belongs to a different user key: stop the scan.
  return false;
}
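// Get() consults the fragmented range tombstones first so that
// max_covering_tombstone_seq can hide older point entries, then checks the
// bloom filter (whole-key if enabled, otherwise prefix) and finally scans the
// rep via GetFromTable()/SaveValue().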
bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
                   MergeContext* merge_context,
                   SequenceNumber* max_covering_tombstone_seq,
                   SequenceNumber* seq, const ReadOptions& read_opts,
                   ReadCallback* callback, bool* is_blob_index, bool do_merge) {
  // The sequence number is updated synchronously in version_set.h
  if (IsEmpty()) {
    // Avoid recording stats for speed.
    return false;
  }
  PERF_TIMER_GUARD(get_from_memtable_time);

  std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
      NewRangeTombstoneIterator(read_opts,
                                GetInternalKeySeqno(key.internal_key())));
  if (range_del_iter != nullptr) {
    *max_covering_tombstone_seq =
        std::max(*max_covering_tombstone_seq,
                 range_del_iter->MaxCoveringTombstoneSeqnum(key.user_key()));
  }

  Slice user_key = key.user_key();
  bool found_final_value = false;
  bool merge_in_progress = s->IsMergeInProgress();
  bool may_contain = true;
  size_t ts_sz = GetInternalKeyComparator().user_comparator()->timestamp_size();
  if (bloom_filter_) {
    // when both memtable_whole_key_filtering and prefix_extractor_ are set,
    // only do whole key filtering for Get() to save CPU
    if (moptions_.memtable_whole_key_filtering) {
      may_contain =
          bloom_filter_->MayContain(StripTimestampFromUserKey(user_key, ts_sz));
    } else {
      assert(prefix_extractor_);
      may_contain =
          !prefix_extractor_->InDomain(user_key) ||
          bloom_filter_->MayContain(prefix_extractor_->Transform(user_key));
    }
  }

  if (bloom_filter_ && !may_contain) {
    // iter is null if prefix bloom says the key does not exist
    PERF_COUNTER_ADD(bloom_memtable_miss_count, 1);
    *seq = kMaxSequenceNumber;
  } else {
    if (bloom_filter_) {
      PERF_COUNTER_ADD(bloom_memtable_hit_count, 1);
    }
    GetFromTable(key, *max_covering_tombstone_seq, do_merge, callback,
                 is_blob_index, value, s, merge_context, seq,
                 &found_final_value, &merge_in_progress);
  }

  // No change to value, since we have not yet found a Put/Delete
  if (!found_final_value && merge_in_progress) {
    *s = Status::MergeInProgress();
  }
  PERF_COUNTER_ADD(get_from_memtable_count, 1);
  return found_final_value;
}

void MemTable::GetFromTable(const LookupKey& key,
                            SequenceNumber max_covering_tombstone_seq,
                            bool do_merge, ReadCallback* callback,
                            bool* is_blob_index, std::string* value, Status* s,
                            MergeContext* merge_context, SequenceNumber* seq,
                            bool* found_final_value, bool* merge_in_progress) {
  Saver saver;
  saver.status = s;
  saver.found_final_value = found_final_value;
  saver.merge_in_progress = merge_in_progress;
  saver.key = &key;
  saver.value = value;
  saver.seq = kMaxSequenceNumber;
  saver.mem = this;
  saver.merge_context = merge_context;
  saver.max_covering_tombstone_seq = max_covering_tombstone_seq;
  saver.merge_operator = moptions_.merge_operator;
  saver.logger = moptions_.info_log;
  saver.inplace_update_support = moptions_.inplace_update_support;
  saver.statistics = moptions_.statistics;
  saver.env_ = env_;
  saver.callback_ = callback;
  saver.is_blob_index = is_blob_index;
  saver.do_merge = do_merge;
  table_->Get(key, &saver, SaveValue);
  *seq = saver.seq;
}
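// MultiGet batches the bloom filter probes: every in-domain key in the range
// is checked with a single MayContain(num_keys, ...) call, definite misses are
// skipped, and the remaining keys go through the same GetFromTable() path as
// single-key Get().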
void MemTable::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
                        ReadCallback* callback, bool* is_blob) {
  // The sequence number is updated synchronously in version_set.h
  if (IsEmpty()) {
    // Avoid recording stats for speed.
    return;
  }
  PERF_TIMER_GUARD(get_from_memtable_time);

  MultiGetRange temp_range(*range, range->begin(), range->end());
  if (bloom_filter_) {
    std::array<Slice*, MultiGetContext::MAX_BATCH_SIZE> keys;
    std::array<bool, MultiGetContext::MAX_BATCH_SIZE> may_match = {{true}};
    autovector<Slice, MultiGetContext::MAX_BATCH_SIZE> prefixes;
    int num_keys = 0;
    for (auto iter = temp_range.begin(); iter != temp_range.end(); ++iter) {
      if (!prefix_extractor_) {
        keys[num_keys++] = &iter->ukey;
      } else if (prefix_extractor_->InDomain(iter->ukey)) {
        prefixes.emplace_back(prefix_extractor_->Transform(iter->ukey));
        keys[num_keys++] = &prefixes.back();
      }
    }
    bloom_filter_->MayContain(num_keys, &keys[0], &may_match[0]);
    int idx = 0;
    for (auto iter = temp_range.begin(); iter != temp_range.end(); ++iter) {
      if (prefix_extractor_ && !prefix_extractor_->InDomain(iter->ukey)) {
        PERF_COUNTER_ADD(bloom_memtable_hit_count, 1);
        continue;
      }
      if (!may_match[idx]) {
        temp_range.SkipKey(iter);
        PERF_COUNTER_ADD(bloom_memtable_miss_count, 1);
      } else {
        PERF_COUNTER_ADD(bloom_memtable_hit_count, 1);
      }
      idx++;
    }
  }
  for (auto iter = temp_range.begin(); iter != temp_range.end(); ++iter) {
    SequenceNumber seq = kMaxSequenceNumber;
    bool found_final_value{false};
    bool merge_in_progress = iter->s->IsMergeInProgress();
    std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
        NewRangeTombstoneIterator(
            read_options, GetInternalKeySeqno(iter->lkey->internal_key())));
    if (range_del_iter != nullptr) {
      iter->max_covering_tombstone_seq = std::max(
          iter->max_covering_tombstone_seq,
          range_del_iter->MaxCoveringTombstoneSeqnum(iter->lkey->user_key()));
    }
    GetFromTable(*(iter->lkey), iter->max_covering_tombstone_seq, true,
                 callback, is_blob, iter->value->GetSelf(), iter->s,
                 &(iter->merge_context), &seq, &found_final_value,
                 &merge_in_progress);

    if (!found_final_value && merge_in_progress) {
      *(iter->s) = Status::MergeInProgress();
    }

    if (found_final_value) {
      iter->value->PinSelf();
      range->MarkKeyDone(iter);
      RecordTick(moptions_.statistics, MEMTABLE_HIT);
    }
  }
  PERF_COUNTER_ADD(get_from_memtable_count, 1);
}
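// Update() overwrites the value in place only when the key's latest entry is a
// kTypeValue whose stored value is at least as large as the new one; otherwise
// it falls through to a regular Add() with the new sequence number.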
void MemTable::Update(SequenceNumber seq,
                      const Slice& key,
                      const Slice& value) {
  LookupKey lkey(key, seq);
  Slice mem_key = lkey.memtable_key();

  std::unique_ptr<MemTableRep::Iterator> iter(
      table_->GetDynamicPrefixIterator());
  iter->Seek(lkey.internal_key(), mem_key.data());

  if (iter->Valid()) {
    // entry format is:
    //    key_length  varint32
    //    userkey     char[klength-8]
    //    tag         uint64
    //    vlength     varint32
    //    value       char[vlength]
    // Check that it belongs to same user key. We do not check the
    // sequence number since the Seek() call above should have skipped
    // all entries with overly large sequence numbers.
    const char* entry = iter->key();
    uint32_t key_length = 0;
    const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
    if (comparator_.comparator.user_comparator()->Equal(
            Slice(key_ptr, key_length - 8), lkey.user_key())) {
      // Correct user key
      const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
      ValueType type;
      SequenceNumber existing_seq;
      UnPackSequenceAndType(tag, &existing_seq, &type);
      assert(existing_seq != seq);
      if (type == kTypeValue) {
        Slice prev_value = GetLengthPrefixedSlice(key_ptr + key_length);
        uint32_t prev_size = static_cast<uint32_t>(prev_value.size());
        uint32_t new_size = static_cast<uint32_t>(value.size());

        // Update value, if new value size <= previous value size
        if (new_size <= prev_size) {
          char* p =
              EncodeVarint32(const_cast<char*>(key_ptr) + key_length, new_size);
          WriteLock wl(GetLock(lkey.user_key()));
          memcpy(p, value.data(), value.size());
          assert((unsigned)((p + value.size()) - entry) ==
                 (unsigned)(VarintLength(key_length) + key_length +
                            VarintLength(value.size()) + value.size()));
          RecordTick(moptions_.statistics, NUMBER_KEYS_UPDATED);
          return;
        }
      }
    }
  }

  // key doesn't exist
  bool add_res __attribute__((__unused__));
  add_res = Add(seq, kTypeValue, key, value);
  // We already asserted existing_seq != seq above, so Add should not fail.
  assert(add_res);
}
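// UpdateCallback() hands the existing value buffer to the user-supplied
// inplace_callback, which may shrink it in place (UPDATED_INPLACE), produce a
// new value that is appended via Add() (UPDATED), or report UPDATE_FAILED; the
// function returns false only if the key is missing or its latest entry is not
// a plain value.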
bool MemTable::UpdateCallback(SequenceNumber seq,
                              const Slice& key,
                              const Slice& delta) {
  LookupKey lkey(key, seq);
  Slice memkey = lkey.memtable_key();

  std::unique_ptr<MemTableRep::Iterator> iter(
      table_->GetDynamicPrefixIterator());
  iter->Seek(lkey.internal_key(), memkey.data());

  if (iter->Valid()) {
    // entry format is:
    //    key_length  varint32
    //    userkey     char[klength-8]
    //    tag         uint64
    //    vlength     varint32
    //    value       char[vlength]
    // Check that it belongs to same user key. We do not check the
    // sequence number since the Seek() call above should have skipped
    // all entries with overly large sequence numbers.
    const char* entry = iter->key();
    uint32_t key_length = 0;
    const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
    if (comparator_.comparator.user_comparator()->Equal(
            Slice(key_ptr, key_length - 8), lkey.user_key())) {
      // Correct user key
      const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
      ValueType type;
      uint64_t unused;
      UnPackSequenceAndType(tag, &unused, &type);
      switch (type) {
        case kTypeValue: {
          Slice prev_value = GetLengthPrefixedSlice(key_ptr + key_length);
          uint32_t prev_size = static_cast<uint32_t>(prev_value.size());

          char* prev_buffer = const_cast<char*>(prev_value.data());
          uint32_t new_prev_size = prev_size;

          std::string str_value;
          WriteLock wl(GetLock(lkey.user_key()));
          auto status = moptions_.inplace_callback(prev_buffer, &new_prev_size,
                                                   delta, &str_value);
          if (status == UpdateStatus::UPDATED_INPLACE) {
            // Value already updated by callback.
            assert(new_prev_size <= prev_size);
            if (new_prev_size < prev_size) {
              // overwrite the new prev_size
              char* p = EncodeVarint32(const_cast<char*>(key_ptr) + key_length,
                                       new_prev_size);
              if (VarintLength(new_prev_size) < VarintLength(prev_size)) {
                // shift the value buffer as well.
                memcpy(p, prev_buffer, new_prev_size);
              }
            }
            RecordTick(moptions_.statistics, NUMBER_KEYS_UPDATED);
            UpdateFlushState();
            return true;
          } else if (status == UpdateStatus::UPDATED) {
            Add(seq, kTypeValue, key, Slice(str_value));
            RecordTick(moptions_.statistics, NUMBER_KEYS_WRITTEN);
            UpdateFlushState();
            return true;
          } else if (status == UpdateStatus::UPDATE_FAILED) {
            // No action required. Return.
            UpdateFlushState();
            return true;
          }
        }
        default:
          break;
      }
    }
  }
  // If the latest value is not kTypeValue
  // or key doesn't exist
  return false;
}
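// CountSuccessiveMergeEntries() walks the key's history from the newest entry
// and counts consecutive kTypeMerge operands, stopping at the first entry that
// is not a merge or belongs to a different user key; callers can compare the
// count against the max_successive_merges option.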
size_t MemTable::CountSuccessiveMergeEntries(const LookupKey& key) {
  Slice memkey = key.memtable_key();

  // A total ordered iterator is costly for some memtablerep (prefix aware
  // reps). By passing in the user key, we allow efficient iterator creation.
  // The iterator only needs to be ordered within the same user key.
  std::unique_ptr<MemTableRep::Iterator> iter(
      table_->GetDynamicPrefixIterator());
  iter->Seek(key.internal_key(), memkey.data());

  size_t num_successive_merges = 0;

  for (; iter->Valid(); iter->Next()) {
    const char* entry = iter->key();
    uint32_t key_length = 0;
    const char* iter_key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
    if (!comparator_.comparator.user_comparator()->Equal(
            Slice(iter_key_ptr, key_length - 8), key.user_key())) {
      break;
    }

    const uint64_t tag = DecodeFixed64(iter_key_ptr + key_length - 8);
    ValueType type;
    uint64_t unused;
    UnPackSequenceAndType(tag, &unused, &type);
    if (type != kTypeMerge) {
      break;
    }

    ++num_successive_merges;
  }

  return num_successive_merges;
}

void MemTableRep::Get(const LookupKey& k, void* callback_args,
                      bool (*callback_func)(void* arg, const char* entry)) {
  auto iter = GetDynamicPrefixIterator();
  for (iter->Seek(k.internal_key(), k.memtable_key().data());
       iter->Valid() && callback_func(callback_args, iter->key());
       iter->Next()) {
  }
}

void MemTable::RefLogContainingPrepSection(uint64_t log) {
  assert(log > 0);
  auto cur = min_prep_log_referenced_.load();
  while ((log < cur || cur == 0) &&
         !min_prep_log_referenced_.compare_exchange_strong(cur, log)) {
    cur = min_prep_log_referenced_.load();
  }
}

uint64_t MemTable::GetMinLogContainingPrepSection() {
  return min_prep_log_referenced_.load();
}

}  // namespace ROCKSDB_NAMESPACE