memtable.cc
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  9. #include "db/memtable.h"
  10. #include <algorithm>
  11. #include <array>
  12. #include <limits>
  13. #include <memory>
  14. #include <optional>
  15. #include "db/dbformat.h"
  16. #include "db/kv_checksum.h"
  17. #include "db/merge_context.h"
  18. #include "db/merge_helper.h"
  19. #include "db/pinned_iterators_manager.h"
  20. #include "db/range_tombstone_fragmenter.h"
  21. #include "db/read_callback.h"
  22. #include "db/wide/wide_column_serialization.h"
  23. #include "logging/logging.h"
  24. #include "memory/arena.h"
  25. #include "memory/memory_usage.h"
  26. #include "monitoring/perf_context_imp.h"
  27. #include "monitoring/statistics_impl.h"
  28. #include "port/lang.h"
  29. #include "port/port.h"
  30. #include "rocksdb/comparator.h"
  31. #include "rocksdb/env.h"
  32. #include "rocksdb/iterator.h"
  33. #include "rocksdb/merge_operator.h"
  34. #include "rocksdb/slice_transform.h"
  35. #include "rocksdb/types.h"
  36. #include "rocksdb/write_buffer_manager.h"
  37. #include "table/internal_iterator.h"
  38. #include "table/iterator_wrapper.h"
  39. #include "table/merging_iterator.h"
  40. #include "util/autovector.h"
  41. #include "util/coding.h"
  42. #include "util/mutexlock.h"
  43. namespace ROCKSDB_NAMESPACE {
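// ImmutableMemTableOptions copies the memtable-related settings out of the
// immutable and mutable CF options at construction time. Note that
// memtable_prefix_bloom_bits is computed as
// write_buffer_size * memtable_prefix_bloom_size_ratio, scaled from bytes to
// bits by the trailing `* 8u`.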
  44. ImmutableMemTableOptions::ImmutableMemTableOptions(
  45. const ImmutableOptions& ioptions,
  46. const MutableCFOptions& mutable_cf_options)
  47. : arena_block_size(mutable_cf_options.arena_block_size),
  48. memtable_prefix_bloom_bits(
  49. static_cast<uint32_t>(
  50. static_cast<double>(mutable_cf_options.write_buffer_size) *
  51. mutable_cf_options.memtable_prefix_bloom_size_ratio) *
  52. 8u),
  53. memtable_huge_page_size(mutable_cf_options.memtable_huge_page_size),
  54. memtable_whole_key_filtering(
  55. mutable_cf_options.memtable_whole_key_filtering),
  56. inplace_update_support(ioptions.inplace_update_support),
  57. inplace_update_num_locks(mutable_cf_options.inplace_update_num_locks),
  58. inplace_callback(ioptions.inplace_callback),
  59. max_successive_merges(mutable_cf_options.max_successive_merges),
  60. strict_max_successive_merges(
  61. mutable_cf_options.strict_max_successive_merges),
  62. statistics(ioptions.stats),
  63. merge_operator(ioptions.merge_operator.get()),
  64. info_log(ioptions.logger),
  65. protection_bytes_per_key(
  66. mutable_cf_options.memtable_protection_bytes_per_key),
  67. allow_data_in_errors(ioptions.allow_data_in_errors),
  68. paranoid_memory_checks(mutable_cf_options.paranoid_memory_checks),
  69. memtable_veirfy_per_key_checksum_on_seek(
  70. mutable_cf_options.memtable_veirfy_per_key_checksum_on_seek) {}
  71. MemTable::MemTable(const InternalKeyComparator& cmp,
  72. const ImmutableOptions& ioptions,
  73. const MutableCFOptions& mutable_cf_options,
  74. WriteBufferManager* write_buffer_manager,
  75. SequenceNumber latest_seq, uint32_t column_family_id)
  76. : comparator_(cmp),
  77. moptions_(ioptions, mutable_cf_options),
  78. kArenaBlockSize(Arena::OptimizeBlockSize(moptions_.arena_block_size)),
  79. mem_tracker_(write_buffer_manager),
  80. arena_(moptions_.arena_block_size,
  81. (write_buffer_manager != nullptr &&
  82. (write_buffer_manager->enabled() ||
  83. write_buffer_manager->cost_to_cache()))
  84. ? &mem_tracker_
  85. : nullptr,
  86. mutable_cf_options.memtable_huge_page_size),
  87. table_(ioptions.memtable_factory->CreateMemTableRep(
  88. comparator_, &arena_, mutable_cf_options.prefix_extractor.get(),
  89. ioptions.logger, column_family_id)),
  90. range_del_table_(SkipListFactory().CreateMemTableRep(
  91. comparator_, &arena_, nullptr /* transform */, ioptions.logger,
  92. column_family_id)),
  93. is_range_del_table_empty_(true),
  94. data_size_(0),
  95. num_entries_(0),
  96. num_deletes_(0),
  97. num_range_deletes_(0),
  98. write_buffer_size_(mutable_cf_options.write_buffer_size),
  99. first_seqno_(0),
  100. earliest_seqno_(latest_seq),
  101. creation_seq_(latest_seq),
  102. min_prep_log_referenced_(0),
  103. locks_(moptions_.inplace_update_support
  104. ? moptions_.inplace_update_num_locks
  105. : 0),
  106. prefix_extractor_(mutable_cf_options.prefix_extractor.get()),
  107. flush_state_(FLUSH_NOT_REQUESTED),
  108. clock_(ioptions.clock),
  109. insert_with_hint_prefix_extractor_(
  110. ioptions.memtable_insert_with_hint_prefix_extractor.get()),
  111. oldest_key_time_(std::numeric_limits<uint64_t>::max()),
  112. approximate_memory_usage_(0),
  113. memtable_max_range_deletions_(
  114. mutable_cf_options.memtable_max_range_deletions),
  115. key_validation_callback_(
  116. (moptions_.protection_bytes_per_key != 0 &&
  117. moptions_.memtable_veirfy_per_key_checksum_on_seek)
  118. ? std::bind(&MemTable::ValidateKey, this, std::placeholders::_1,
  119. std::placeholders::_2)
  120. : std::function<Status(const char*, bool)>(nullptr)) {
  121. UpdateFlushState();
  122. // something went wrong if we need to flush before inserting anything
  123. assert(!ShouldScheduleFlush());
  124. // use bloom_filter_ for both whole key and prefix bloom filter
  125. if ((prefix_extractor_ || moptions_.memtable_whole_key_filtering) &&
  126. moptions_.memtable_prefix_bloom_bits > 0) {
  127. bloom_filter_.reset(
  128. new DynamicBloom(&arena_, moptions_.memtable_prefix_bloom_bits,
  129. 6 /* hard coded 6 probes */,
  130. moptions_.memtable_huge_page_size, ioptions.logger));
  131. }
  132. // Initialize cached_range_tombstone_ here since it could
  133. // be read before it is constructed in MemTable::Add(), which could also lead
  134. // to a data race on the global mutex table backing atomic shared_ptr.
  135. auto new_cache = std::make_shared<FragmentedRangeTombstoneListCache>();
  136. size_t size = cached_range_tombstone_.Size();
  137. for (size_t i = 0; i < size; ++i) {
  138. #if defined(__cpp_lib_atomic_shared_ptr)
  139. std::atomic<std::shared_ptr<FragmentedRangeTombstoneListCache>>*
  140. local_cache_ref_ptr = cached_range_tombstone_.AccessAtCore(i);
  141. auto new_local_cache_ref = std::make_shared<
  142. const std::shared_ptr<FragmentedRangeTombstoneListCache>>(new_cache);
  143. std::shared_ptr<FragmentedRangeTombstoneListCache> aliased_ptr(
  144. new_local_cache_ref, new_cache.get());
  145. local_cache_ref_ptr->store(std::move(aliased_ptr),
  146. std::memory_order_relaxed);
  147. #else
  148. std::shared_ptr<FragmentedRangeTombstoneListCache>* local_cache_ref_ptr =
  149. cached_range_tombstone_.AccessAtCore(i);
  150. auto new_local_cache_ref = std::make_shared<
  151. const std::shared_ptr<FragmentedRangeTombstoneListCache>>(new_cache);
  152. std::atomic_store_explicit(
  153. local_cache_ref_ptr,
  154. std::shared_ptr<FragmentedRangeTombstoneListCache>(new_local_cache_ref,
  155. new_cache.get()),
  156. std::memory_order_relaxed);
  157. #endif
  158. }
  159. const Comparator* ucmp = cmp.user_comparator();
  160. assert(ucmp);
  161. ts_sz_ = ucmp->timestamp_size();
  162. }
  163. MemTable::~MemTable() {
  164. mem_tracker_.FreeMem();
  165. assert(refs_ == 0);
  166. }
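// Sums the approximate memory usage of the arena, the point-key table, the
// range-deletion table, and the insert hints, saturating at the maximum
// size_t value on overflow. The non-saturated total is also cached in
// approximate_memory_usage_.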
  167. size_t MemTable::ApproximateMemoryUsage() {
  168. autovector<size_t> usages = {
  169. arena_.ApproximateMemoryUsage(), table_->ApproximateMemoryUsage(),
  170. range_del_table_->ApproximateMemoryUsage(),
  171. ROCKSDB_NAMESPACE::ApproximateMemoryUsage(insert_hints_)};
  172. size_t total_usage = 0;
  173. for (size_t usage : usages) {
  174. // If usage + total_usage >= std::numeric_limits<size_t>::max(), return that
  175. // maximum. The check is written this way to avoid numeric overflow.
  176. if (usage >= std::numeric_limits<size_t>::max() - total_usage) {
  177. return std::numeric_limits<size_t>::max();
  178. }
  179. total_usage += usage;
  180. }
  181. approximate_memory_usage_.StoreRelaxed(total_usage);
  182. // otherwise, return the actual usage
  183. return total_usage;
  184. }
  185. bool MemTable::ShouldFlushNow() {
  186. if (IsMarkedForFlush()) {
  187. // TODO: dedicated flush reason when marked for flush
  188. return true;
  189. }
  190. // Flush if memtable_max_range_deletions is > 0 and at least that many
  191. // range deletions have been added to this memtable.
  192. if (memtable_max_range_deletions_ > 0 &&
  193. num_range_deletes_.LoadRelaxed() >=
  194. static_cast<uint64_t>(memtable_max_range_deletions_)) {
  195. return true;
  196. }
  197. size_t write_buffer_size = write_buffer_size_.LoadRelaxed();
  198. // Often we cannot allocate arena blocks that exactly match the buffer
  199. // size. Thus we have to decide whether to over-allocate or
  200. // under-allocate.
  201. // This constant variable can be interpreted as: if we still have more than
  202. // "kAllowOverAllocationRatio * kArenaBlockSize" space left, we'd try to over
  203. // allocate one more block.
  204. const double kAllowOverAllocationRatio = 0.6;
  205. // Range deletions use a skip list, which allocates all memory through `arena_`
  206. assert(range_del_table_->ApproximateMemoryUsage() == 0);
  207. // If the arena still has room for a new block allocation, we can safely say
  208. // it shouldn't flush.
  209. auto allocated_memory =
  210. table_->ApproximateMemoryUsage() + arena_.MemoryAllocatedBytes();
  211. approximate_memory_usage_.StoreRelaxed(allocated_memory);
  212. // if we can still allocate one more block without exceeding the
  213. // over-allocation ratio, then we should not flush.
  214. if (allocated_memory + kArenaBlockSize <
  215. write_buffer_size + kArenaBlockSize * kAllowOverAllocationRatio) {
  216. return false;
  217. }
  218. // If the user keeps adding entries that exceed write_buffer_size, we need to
  219. // flush earlier even though we still have much available memory left.
  220. if (allocated_memory >
  221. write_buffer_size + kArenaBlockSize * kAllowOverAllocationRatio) {
  222. return true;
  223. }
  224. // In this code path, Arena has already allocated its "last block", which
  225. // means the total allocated memory is either:
  226. // (1) "moderately" over-allocated (by no more than `0.6 * arena
  227. // block size`). Or,
  228. // (2) the allocated memory is less than write buffer size, but we'll stop
  229. // here since if we allocate a new arena block, we'll over allocate too much
  230. // more (half of the arena block size) memory.
  231. //
  232. // In either case, to avoid over-allocate, the last block will stop allocation
  233. // when its usage reaches a certain ratio, which we carefully choose "0.75
  234. // full" as the stop condition because it addresses the following issue with
  235. // great simplicity: What if the next inserted entry's size is
  236. // bigger than AllocatedAndUnused()?
  237. //
  238. // The answer is: if the entry size is also bigger than 0.25 *
  239. // kArenaBlockSize, a dedicated block will be allocated for it; otherwise
  240. // arena will anyway skip the AllocatedAndUnused() and allocate a new, empty
  241. // and regular block. In either case, we would *overly* over-allocate.
  242. //
  243. // Therefore, setting the last block to be at most "0.75 full" avoids both
  244. // cases.
  245. //
  246. // NOTE: the average fraction of wasted space with this approach can be
  247. // estimated as "arena block size * 0.25 / write buffer size". Users who specify
  248. // a small write buffer size and/or a big arena block size may suffer.
  249. return arena_.AllocatedAndUnused() < kArenaBlockSize / 4;
  250. }
  251. void MemTable::UpdateFlushState() {
  252. auto state = flush_state_.load(std::memory_order_relaxed);
  253. if (state == FLUSH_NOT_REQUESTED && ShouldFlushNow()) {
  254. // ignore CAS failure, because that means somebody else requested
  255. // a flush
  256. flush_state_.compare_exchange_strong(state, FLUSH_REQUESTED,
  257. std::memory_order_relaxed,
  258. std::memory_order_relaxed);
  259. }
  260. }
  261. void MemTable::UpdateOldestKeyTime() {
  262. uint64_t oldest_key_time = oldest_key_time_.load(std::memory_order_relaxed);
  263. if (oldest_key_time == std::numeric_limits<uint64_t>::max()) {
  264. int64_t current_time = 0;
  265. auto s = clock_->GetCurrentTime(&current_time);
  266. if (s.ok()) {
  267. assert(current_time >= 0);
  268. // If the CAS fails, the timestamp has already been set by another thread.
  269. oldest_key_time_.compare_exchange_strong(
  270. oldest_key_time, static_cast<uint64_t>(current_time),
  271. std::memory_order_relaxed, std::memory_order_relaxed);
  272. }
  273. }
  274. }
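// Recomputes the per key-value checksum from an encoded memtable entry (see
// the entry format documented in MemTable::Add()) and verifies it against the
// checksum bytes stored right after the value, returning Status::Corruption
// on any parse failure or mismatch.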
  275. Status MemTable::VerifyEntryChecksum(const char* entry,
  276. uint32_t protection_bytes_per_key,
  277. bool allow_data_in_errors) {
  278. if (protection_bytes_per_key == 0) {
  279. return Status::OK();
  280. }
  281. uint32_t key_length;
  282. const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
  283. if (key_ptr == nullptr) {
  284. return Status::Corruption("Unable to parse internal key length");
  285. }
  286. if (key_length < 8) {
  287. return Status::Corruption("Memtable entry internal key length too short.");
  288. }
  289. Slice user_key = Slice(key_ptr, key_length - 8);
  290. const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
  291. ValueType type;
  292. SequenceNumber seq;
  293. UnPackSequenceAndType(tag, &seq, &type);
  294. uint32_t value_length = 0;
  295. const char* value_ptr = GetVarint32Ptr(
  296. key_ptr + key_length, key_ptr + key_length + 5, &value_length);
  297. if (value_ptr == nullptr) {
  298. return Status::Corruption("Unable to parse internal key value");
  299. }
  300. Slice value = Slice(value_ptr, value_length);
  301. const char* checksum_ptr = value_ptr + value_length;
  302. bool match =
  303. ProtectionInfo64()
  304. .ProtectKVO(user_key, value, type)
  305. .ProtectS(seq)
  306. .Verify(static_cast<uint8_t>(protection_bytes_per_key), checksum_ptr);
  307. if (!match) {
  308. std::string msg(
  309. "Corrupted memtable entry, per key-value checksum verification "
  310. "failed.");
  311. if (allow_data_in_errors) {
  312. msg.append("Unrecognized value type: " +
  313. std::to_string(static_cast<int>(type)) + ". ");
  314. msg.append("User key: " + user_key.ToString(/*hex=*/true) + ". ");
  315. msg.append("seq: " + std::to_string(seq) + ".");
  316. }
  317. return Status::Corruption(msg.c_str());
  318. }
  319. return Status::OK();
  320. }
  321. int MemTable::KeyComparator::operator()(const char* prefix_len_key1,
  322. const char* prefix_len_key2) const {
  323. // Internal keys are encoded as length-prefixed strings.
  324. Slice k1 = GetLengthPrefixedSlice(prefix_len_key1);
  325. Slice k2 = GetLengthPrefixedSlice(prefix_len_key2);
  326. return comparator.CompareKeySeq(k1, k2);
  327. }
  328. int MemTable::KeyComparator::operator()(
  329. const char* prefix_len_key, const KeyComparator::DecodedType& key) const {
  330. // Internal keys are encoded as length-prefixed strings.
  331. Slice a = GetLengthPrefixedSlice(prefix_len_key);
  332. return comparator.CompareKeySeq(a, key);
  333. }
  334. void MemTableRep::InsertConcurrently(KeyHandle /*handle*/) {
  335. throw std::runtime_error("concurrent insert not supported");
  336. }
  337. Slice MemTableRep::UserKey(const char* key) const {
  338. Slice slice = GetLengthPrefixedSlice(key);
  339. return Slice(slice.data(), slice.size() - 8);
  340. }
  341. KeyHandle MemTableRep::Allocate(const size_t len, char** buf) {
  342. *buf = allocator_->Allocate(len);
  343. return static_cast<KeyHandle>(*buf);
  344. }
  345. // Encode a suitable internal key target for "target" and return it.
  346. // Uses *scratch as scratch space, and the returned pointer will point
  347. // into this scratch space.
  348. const char* EncodeKey(std::string* scratch, const Slice& target) {
  349. scratch->clear();
  350. PutVarint32(scratch, static_cast<uint32_t>(target.size()));
  351. scratch->append(target.data(), target.size());
  352. return scratch->data();
  353. }
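// An InternalIterator over a single memtable, covering either its point-key
// entries or its range-deletion entries depending on `kind`. When the prefix
// bloom filter is usable (matching prefix extractor and a prefix-compatible
// read), Seek() and SeekForPrev() first probe the bloom filter and mark the
// iterator invalid when the target's prefix cannot be present.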
  354. class MemTableIterator : public InternalIterator {
  355. public:
  356. enum Kind { kPointEntries, kRangeDelEntries };
  357. MemTableIterator(
  358. Kind kind, const MemTable& mem, const ReadOptions& read_options,
  359. UnownedPtr<const SeqnoToTimeMapping> seqno_to_time_mapping = nullptr,
  360. Arena* arena = nullptr,
  361. const SliceTransform* cf_prefix_extractor = nullptr)
  362. : bloom_(nullptr),
  363. prefix_extractor_(mem.prefix_extractor_),
  364. comparator_(mem.comparator_),
  365. seqno_to_time_mapping_(seqno_to_time_mapping),
  366. status_(Status::OK()),
  367. logger_(mem.moptions_.info_log),
  368. ts_sz_(mem.ts_sz_),
  369. protection_bytes_per_key_(mem.moptions_.protection_bytes_per_key),
  370. valid_(false),
  371. value_pinned_(
  372. !mem.GetImmutableMemTableOptions()->inplace_update_support),
  373. arena_mode_(arena != nullptr),
  374. paranoid_memory_checks_(mem.moptions_.paranoid_memory_checks),
  375. validate_on_seek_(
  376. mem.moptions_.paranoid_memory_checks ||
  377. mem.moptions_.memtable_veirfy_per_key_checksum_on_seek),
  378. allow_data_in_error_(mem.moptions_.allow_data_in_errors),
  379. key_validation_callback_(mem.key_validation_callback_) {
  380. if (kind == kRangeDelEntries) {
  381. iter_ = mem.range_del_table_->GetIterator(arena);
  382. } else if (prefix_extractor_ != nullptr &&
  383. // NOTE: checking extractor equivalence when not pointer
  384. // equivalent is arguably too expensive for memtable
  385. prefix_extractor_ == cf_prefix_extractor &&
  386. (read_options.prefix_same_as_start ||
  387. (!read_options.total_order_seek &&
  388. !read_options.auto_prefix_mode))) {
  389. // Auto prefix mode is not implemented in memtable yet.
  390. assert(kind == kPointEntries);
  391. bloom_ = mem.bloom_filter_.get();
  392. iter_ = mem.table_->GetDynamicPrefixIterator(arena);
  393. } else {
  394. assert(kind == kPointEntries);
  395. iter_ = mem.table_->GetIterator(arena);
  396. }
  397. status_.PermitUncheckedError();
  398. }
  399. // No copying allowed
  400. MemTableIterator(const MemTableIterator&) = delete;
  401. void operator=(const MemTableIterator&) = delete;
  402. ~MemTableIterator() override {
  403. #ifndef NDEBUG
  404. // Assert that the MemTableIterator is never deleted while
  405. // Pinning is Enabled.
  406. assert(!pinned_iters_mgr_ || !pinned_iters_mgr_->PinningEnabled());
  407. #endif
  408. if (arena_mode_) {
  409. iter_->~Iterator();
  410. } else {
  411. delete iter_;
  412. }
  413. status_.PermitUncheckedError();
  414. }
  415. #ifndef NDEBUG
  416. void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override {
  417. pinned_iters_mgr_ = pinned_iters_mgr;
  418. }
  419. PinnedIteratorsManager* pinned_iters_mgr_ = nullptr;
  420. #endif
  421. bool Valid() const override {
  422. // If inner iter_ is not valid, then this iter should also not be valid.
  423. assert(iter_->Valid() || !(valid_ && status_.ok()));
  424. return valid_ && status_.ok();
  425. }
  426. void Seek(const Slice& k) override {
  427. PERF_TIMER_GUARD(seek_on_memtable_time);
  428. PERF_COUNTER_ADD(seek_on_memtable_count, 1);
  429. status_ = Status::OK();
  430. if (bloom_) {
  431. // iterator should only use prefix bloom filter
  432. Slice user_k_without_ts(ExtractUserKeyAndStripTimestamp(k, ts_sz_));
  433. if (prefix_extractor_->InDomain(user_k_without_ts)) {
  434. Slice prefix = prefix_extractor_->Transform(user_k_without_ts);
  435. if (!bloom_->MayContain(prefix)) {
  436. PERF_COUNTER_ADD(bloom_memtable_miss_count, 1);
  437. valid_ = false;
  438. return;
  439. } else {
  440. PERF_COUNTER_ADD(bloom_memtable_hit_count, 1);
  441. }
  442. }
  443. }
  444. if (validate_on_seek_) {
  445. status_ = iter_->SeekAndValidate(k, nullptr, allow_data_in_error_,
  446. paranoid_memory_checks_,
  447. key_validation_callback_);
  448. } else {
  449. iter_->Seek(k, nullptr);
  450. }
  451. valid_ = iter_->Valid();
  452. VerifyEntryChecksum();
  453. }
  454. void SeekForPrev(const Slice& k) override {
  455. PERF_TIMER_GUARD(seek_on_memtable_time);
  456. PERF_COUNTER_ADD(seek_on_memtable_count, 1);
  457. status_ = Status::OK();
  458. if (bloom_) {
  459. Slice user_k_without_ts(ExtractUserKeyAndStripTimestamp(k, ts_sz_));
  460. if (prefix_extractor_->InDomain(user_k_without_ts)) {
  461. if (!bloom_->MayContain(
  462. prefix_extractor_->Transform(user_k_without_ts))) {
  463. PERF_COUNTER_ADD(bloom_memtable_miss_count, 1);
  464. valid_ = false;
  465. return;
  466. } else {
  467. PERF_COUNTER_ADD(bloom_memtable_hit_count, 1);
  468. }
  469. }
  470. }
  471. if (validate_on_seek_) {
  472. status_ = iter_->SeekAndValidate(k, nullptr, allow_data_in_error_,
  473. paranoid_memory_checks_,
  474. key_validation_callback_);
  475. } else {
  476. iter_->Seek(k, nullptr);
  477. }
  478. valid_ = iter_->Valid();
  479. VerifyEntryChecksum();
  480. if (!Valid() && status().ok()) {
  481. SeekToLast();
  482. }
  483. while (Valid() && comparator_.comparator.Compare(k, key()) < 0) {
  484. Prev();
  485. }
  486. }
  487. void SeekToFirst() override {
  488. status_ = Status::OK();
  489. iter_->SeekToFirst();
  490. valid_ = iter_->Valid();
  491. VerifyEntryChecksum();
  492. }
  493. void SeekToLast() override {
  494. status_ = Status::OK();
  495. iter_->SeekToLast();
  496. valid_ = iter_->Valid();
  497. VerifyEntryChecksum();
  498. }
  499. void Next() override {
  500. PERF_COUNTER_ADD(next_on_memtable_count, 1);
  501. assert(Valid());
  502. if (paranoid_memory_checks_) {
  503. status_ = iter_->NextAndValidate(allow_data_in_error_);
  504. } else {
  505. iter_->Next();
  506. TEST_SYNC_POINT_CALLBACK("MemTableIterator::Next:0", iter_);
  507. }
  508. valid_ = iter_->Valid();
  509. VerifyEntryChecksum();
  510. }
  511. bool NextAndGetResult(IterateResult* result) override {
  512. Next();
  513. bool is_valid = Valid();
  514. if (is_valid) {
  515. result->key = key();
  516. result->bound_check_result = IterBoundCheck::kUnknown;
  517. result->value_prepared = true;
  518. }
  519. return is_valid;
  520. }
  521. void Prev() override {
  522. PERF_COUNTER_ADD(prev_on_memtable_count, 1);
  523. assert(Valid());
  524. if (paranoid_memory_checks_) {
  525. status_ = iter_->PrevAndValidate(allow_data_in_error_);
  526. } else {
  527. iter_->Prev();
  528. }
  529. valid_ = iter_->Valid();
  530. VerifyEntryChecksum();
  531. }
  532. Slice key() const override {
  533. assert(Valid());
  534. return GetLengthPrefixedSlice(iter_->key());
  535. }
  536. uint64_t write_unix_time() const override {
  537. assert(Valid());
  538. ParsedInternalKey pikey;
  539. Status s = ParseInternalKey(key(), &pikey, /*log_err_key=*/false);
  540. if (!s.ok()) {
  541. return std::numeric_limits<uint64_t>::max();
  542. } else if (kTypeValuePreferredSeqno == pikey.type) {
  543. return ParsePackedValueForWriteTime(value());
  544. } else if (!seqno_to_time_mapping_ || seqno_to_time_mapping_->Empty()) {
  545. return std::numeric_limits<uint64_t>::max();
  546. }
  547. return seqno_to_time_mapping_->GetProximalTimeBeforeSeqno(pikey.sequence);
  548. }
  549. Slice value() const override {
  550. assert(Valid());
  551. Slice key_slice = GetLengthPrefixedSlice(iter_->key());
  552. return GetLengthPrefixedSlice(key_slice.data() + key_slice.size());
  553. }
  554. Status status() const override { return status_; }
  555. bool IsKeyPinned() const override {
  556. // memtable data is always pinned
  557. return true;
  558. }
  559. bool IsValuePinned() const override {
  560. // memtable value is always pinned, except if we allow inplace update.
  561. return value_pinned_;
  562. }
  563. private:
  564. DynamicBloom* bloom_;
  565. const SliceTransform* const prefix_extractor_;
  566. const MemTable::KeyComparator comparator_;
  567. MemTableRep::Iterator* iter_;
  568. // The seqno to time mapping is owned by the SuperVersion.
  569. UnownedPtr<const SeqnoToTimeMapping> seqno_to_time_mapping_;
  570. Status status_;
  571. Logger* logger_;
  572. size_t ts_sz_;
  573. uint32_t protection_bytes_per_key_;
  574. bool valid_;
  575. bool value_pinned_;
  576. bool arena_mode_;
  577. const bool paranoid_memory_checks_;
  578. const bool validate_on_seek_;
  579. const bool allow_data_in_error_;
  580. const std::function<Status(const char*, bool)> key_validation_callback_;
  581. void VerifyEntryChecksum() {
  582. if (protection_bytes_per_key_ > 0 && Valid()) {
  583. status_ = MemTable::VerifyEntryChecksum(iter_->key(),
  584. protection_bytes_per_key_);
  585. if (!status_.ok()) {
  586. ROCKS_LOG_ERROR(logger_, "In MemtableIterator: %s", status_.getState());
  587. }
  588. }
  589. }
  590. };
  591. InternalIterator* MemTable::NewIterator(
  592. const ReadOptions& read_options,
  593. UnownedPtr<const SeqnoToTimeMapping> seqno_to_time_mapping, Arena* arena,
  594. const SliceTransform* prefix_extractor, bool /*for_flush*/) {
  595. assert(arena != nullptr);
  596. auto mem = arena->AllocateAligned(sizeof(MemTableIterator));
  597. return new (mem)
  598. MemTableIterator(MemTableIterator::kPointEntries, *this, read_options,
  599. seqno_to_time_mapping, arena, prefix_extractor);
  600. }
  601. // An iterator wrapper around a MemTableIterator that logically strips each
  602. // key's user-defined timestamp.
  603. class TimestampStrippingIterator : public InternalIterator {
  604. public:
  605. TimestampStrippingIterator(
  606. MemTableIterator::Kind kind, const MemTable& memtable,
  607. const ReadOptions& read_options,
  608. UnownedPtr<const SeqnoToTimeMapping> seqno_to_time_mapping, Arena* arena,
  609. const SliceTransform* cf_prefix_extractor, size_t ts_sz)
  610. : arena_mode_(arena != nullptr), kind_(kind), ts_sz_(ts_sz) {
  611. assert(ts_sz_ != 0);
  612. void* mem = arena ? arena->AllocateAligned(sizeof(MemTableIterator))
  613. : operator new(sizeof(MemTableIterator));
  614. iter_ = new (mem)
  615. MemTableIterator(kind, memtable, read_options, seqno_to_time_mapping,
  616. arena, cf_prefix_extractor);
  617. }
  618. // No copying allowed
  619. TimestampStrippingIterator(const TimestampStrippingIterator&) = delete;
  620. void operator=(const TimestampStrippingIterator&) = delete;
  621. ~TimestampStrippingIterator() override {
  622. if (arena_mode_) {
  623. iter_->~MemTableIterator();
  624. } else {
  625. delete iter_;
  626. }
  627. }
  628. void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override {
  629. iter_->SetPinnedItersMgr(pinned_iters_mgr);
  630. }
  631. bool Valid() const override { return iter_->Valid(); }
  632. void Seek(const Slice& k) override {
  633. iter_->Seek(k);
  634. UpdateKeyAndValueBuffer();
  635. }
  636. void SeekForPrev(const Slice& k) override {
  637. iter_->SeekForPrev(k);
  638. UpdateKeyAndValueBuffer();
  639. }
  640. void SeekToFirst() override {
  641. iter_->SeekToFirst();
  642. UpdateKeyAndValueBuffer();
  643. }
  644. void SeekToLast() override {
  645. iter_->SeekToLast();
  646. UpdateKeyAndValueBuffer();
  647. }
  648. void Next() override {
  649. iter_->Next();
  650. UpdateKeyAndValueBuffer();
  651. }
  652. bool NextAndGetResult(IterateResult* result) override {
  653. iter_->Next();
  654. UpdateKeyAndValueBuffer();
  655. bool is_valid = Valid();
  656. if (is_valid) {
  657. result->key = key();
  658. result->bound_check_result = IterBoundCheck::kUnknown;
  659. result->value_prepared = true;
  660. }
  661. return is_valid;
  662. }
  663. void Prev() override {
  664. iter_->Prev();
  665. UpdateKeyAndValueBuffer();
  666. }
  667. Slice key() const override {
  668. assert(Valid());
  669. return key_buf_;
  670. }
  671. uint64_t write_unix_time() const override { return iter_->write_unix_time(); }
  672. Slice value() const override {
  673. if (kind_ == MemTableIterator::Kind::kRangeDelEntries) {
  674. return value_buf_;
  675. }
  676. return iter_->value();
  677. }
  678. Status status() const override { return iter_->status(); }
  679. bool IsKeyPinned() const override {
  680. // Key is only in a buffer that is updated in each iteration.
  681. return false;
  682. }
  683. bool IsValuePinned() const override {
  684. if (kind_ == MemTableIterator::Kind::kRangeDelEntries) {
  685. return false;
  686. }
  687. return iter_->IsValuePinned();
  688. }
  689. private:
  690. void UpdateKeyAndValueBuffer() {
  691. key_buf_.clear();
  692. if (kind_ == MemTableIterator::Kind::kRangeDelEntries) {
  693. value_buf_.clear();
  694. }
  695. if (!Valid()) {
  696. return;
  697. }
  698. Slice original_key = iter_->key();
  699. ReplaceInternalKeyWithMinTimestamp(&key_buf_, original_key, ts_sz_);
  700. if (kind_ == MemTableIterator::Kind::kRangeDelEntries) {
  701. Slice original_value = iter_->value();
  702. AppendUserKeyWithMinTimestamp(&value_buf_, original_value, ts_sz_);
  703. }
  704. }
  705. bool arena_mode_;
  706. MemTableIterator::Kind kind_;
  707. size_t ts_sz_;
  708. MemTableIterator* iter_;
  709. std::string key_buf_;
  710. std::string value_buf_;
  711. };
  712. InternalIterator* MemTable::NewTimestampStrippingIterator(
  713. const ReadOptions& read_options,
  714. UnownedPtr<const SeqnoToTimeMapping> seqno_to_time_mapping, Arena* arena,
  715. const SliceTransform* prefix_extractor, size_t ts_sz) {
  716. assert(arena != nullptr);
  717. auto mem = arena->AllocateAligned(sizeof(TimestampStrippingIterator));
  718. return new (mem) TimestampStrippingIterator(
  719. MemTableIterator::kPointEntries, *this, read_options,
  720. seqno_to_time_mapping, arena, prefix_extractor, ts_sz);
  721. }
  722. FragmentedRangeTombstoneIterator* MemTable::NewRangeTombstoneIterator(
  723. const ReadOptions& read_options, SequenceNumber read_seq,
  724. bool immutable_memtable) {
  725. if (read_options.ignore_range_deletions ||
  726. is_range_del_table_empty_.LoadRelaxed()) {
  727. return nullptr;
  728. }
  729. return NewRangeTombstoneIteratorInternal(read_options, read_seq,
  730. immutable_memtable);
  731. }
  732. FragmentedRangeTombstoneIterator*
  733. MemTable::NewTimestampStrippingRangeTombstoneIterator(
  734. const ReadOptions& read_options, SequenceNumber read_seq, size_t ts_sz) {
  735. if (read_options.ignore_range_deletions ||
  736. is_range_del_table_empty_.LoadRelaxed()) {
  737. return nullptr;
  738. }
  739. if (!timestamp_stripping_fragmented_range_tombstone_list_) {
  740. // TODO: plumb Env::IOActivity, Env::IOPriority
  741. auto* unfragmented_iter = new TimestampStrippingIterator(
  742. MemTableIterator::kRangeDelEntries, *this, ReadOptions(),
  743. /*seqno_to_time_mapping*/ nullptr, /* arena */ nullptr,
  744. /* prefix_extractor */ nullptr, ts_sz);
  745. timestamp_stripping_fragmented_range_tombstone_list_ =
  746. std::make_unique<FragmentedRangeTombstoneList>(
  747. std::unique_ptr<InternalIterator>(unfragmented_iter),
  748. comparator_.comparator);
  749. }
  750. return new FragmentedRangeTombstoneIterator(
  751. timestamp_stripping_fragmented_range_tombstone_list_.get(),
  752. comparator_.comparator, read_seq, read_options.timestamp);
  753. }
  754. FragmentedRangeTombstoneIterator* MemTable::NewRangeTombstoneIteratorInternal(
  755. const ReadOptions& read_options, SequenceNumber read_seq,
  756. bool immutable_memtable) {
  757. if (immutable_memtable) {
  758. // Note that caller should already have verified that
  759. // !is_range_del_table_empty_
  760. assert(IsFragmentedRangeTombstonesConstructed());
  761. return new FragmentedRangeTombstoneIterator(
  762. fragmented_range_tombstone_list_.get(), comparator_.comparator,
  763. read_seq, read_options.timestamp);
  764. }
  765. // takes current cache
  766. std::shared_ptr<FragmentedRangeTombstoneListCache> cache =
  767. #if defined(__cpp_lib_atomic_shared_ptr)
  768. cached_range_tombstone_.Access()->load(std::memory_order_relaxed)
  769. #else
  770. std::atomic_load_explicit(cached_range_tombstone_.Access(),
  771. std::memory_order_relaxed)
  772. #endif
  773. ;
  774. // construct fragmented tombstone list if necessary
  775. if (!cache->initialized.load(std::memory_order_acquire)) {
  776. cache->reader_mutex.lock();
  777. if (!cache->tombstones) {
  778. auto* unfragmented_iter = new MemTableIterator(
  779. MemTableIterator::kRangeDelEntries, *this, read_options);
  780. cache->tombstones.reset(new FragmentedRangeTombstoneList(
  781. std::unique_ptr<InternalIterator>(unfragmented_iter),
  782. comparator_.comparator));
  783. cache->initialized.store(true, std::memory_order_release);
  784. }
  785. cache->reader_mutex.unlock();
  786. }
  787. auto* fragmented_iter = new FragmentedRangeTombstoneIterator(
  788. cache, comparator_.comparator, read_seq, read_options.timestamp);
  789. return fragmented_iter;
  790. }
  791. void MemTable::ConstructFragmentedRangeTombstones() {
  792. // There should be no concurrent construction.
  793. // We could also check fragmented_range_tombstone_list_ to avoid repeated
  794. // constructions. We just construct it here again to be safe.
  795. if (!is_range_del_table_empty_.LoadRelaxed()) {
  796. // TODO: plumb Env::IOActivity, Env::IOPriority
  797. auto* unfragmented_iter = new MemTableIterator(
  798. MemTableIterator::kRangeDelEntries, *this, ReadOptions());
  799. fragmented_range_tombstone_list_ =
  800. std::make_unique<FragmentedRangeTombstoneList>(
  801. std::unique_ptr<InternalIterator>(unfragmented_iter),
  802. comparator_.comparator);
  803. }
  804. }
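// Returns the striped lock that guards in-place updates for `key`. The stripe
// is selected by hashing the key; locks_ is only non-empty when
// inplace_update_support is enabled.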
  805. port::RWMutex* MemTable::GetLock(const Slice& key) {
  806. return &locks_[GetSliceRangedNPHash(key, locks_.size())];
  807. }
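// Estimates the amount of data and the number of entries between start_ikey
// and end_ikey, using the memtable-wide average entry size
// (data_size_ / num_entries_) as the per-entry size.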
  808. ReadOnlyMemTable::MemTableStats MemTable::ApproximateStats(
  809. const Slice& start_ikey, const Slice& end_ikey) {
  810. uint64_t entry_count = table_->ApproximateNumEntries(start_ikey, end_ikey);
  811. entry_count += range_del_table_->ApproximateNumEntries(start_ikey, end_ikey);
  812. if (entry_count == 0) {
  813. return {0, 0};
  814. }
  815. uint64_t n = num_entries_.LoadRelaxed();
  816. if (n == 0) {
  817. return {0, 0};
  818. }
  819. if (entry_count > n) {
  820. // (range_del_)table_->ApproximateNumEntries() is just an estimate, so it can
  821. // be larger than the number of entries we actually have. Cap it to the entries
  822. // we have to limit the inaccuracy.
  823. entry_count = n;
  824. }
  825. uint64_t data_size = data_size_.LoadRelaxed();
  826. return {entry_count * (data_size / n), entry_count};
  827. }
  828. Status MemTable::VerifyEncodedEntry(Slice encoded,
  829. const ProtectionInfoKVOS64& kv_prot_info) {
  830. uint32_t ikey_len = 0;
  831. if (!GetVarint32(&encoded, &ikey_len)) {
  832. return Status::Corruption("Unable to parse internal key length");
  833. }
  834. if (ikey_len < 8 + ts_sz_) {
  835. return Status::Corruption("Internal key length too short");
  836. }
  837. if (ikey_len > encoded.size()) {
  838. return Status::Corruption("Internal key length too long");
  839. }
  840. uint32_t value_len = 0;
  841. const size_t user_key_len = ikey_len - 8;
  842. Slice key(encoded.data(), user_key_len);
  843. encoded.remove_prefix(user_key_len);
  844. uint64_t packed = DecodeFixed64(encoded.data());
  845. ValueType value_type = kMaxValue;
  846. SequenceNumber sequence_number = kMaxSequenceNumber;
  847. UnPackSequenceAndType(packed, &sequence_number, &value_type);
  848. encoded.remove_prefix(8);
  849. if (!GetVarint32(&encoded, &value_len)) {
  850. return Status::Corruption("Unable to parse value length");
  851. }
  852. if (value_len < encoded.size()) {
  853. return Status::Corruption("Value length too short");
  854. }
  855. if (value_len > encoded.size()) {
  856. return Status::Corruption("Value length too long");
  857. }
  858. Slice value(encoded.data(), value_len);
  859. return kv_prot_info.StripS(sequence_number)
  860. .StripKVO(key, value, value_type)
  861. .GetStatus();
  862. }
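// Writes the per key-value checksum of a newly encoded entry to checksum_ptr.
// When the caller supplies protection info it is re-encoded directly;
// otherwise the checksum is computed from the key, value, type, and sequence
// number. No-op when protection_bytes_per_key is 0.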
  863. void MemTable::UpdateEntryChecksum(const ProtectionInfoKVOS64* kv_prot_info,
  864. const Slice& key, const Slice& value,
  865. ValueType type, SequenceNumber s,
  866. char* checksum_ptr) {
  867. if (moptions_.protection_bytes_per_key == 0) {
  868. return;
  869. }
  870. if (kv_prot_info == nullptr) {
  871. ProtectionInfo64()
  872. .ProtectKVO(key, value, type)
  873. .ProtectS(s)
  874. .Encode(static_cast<uint8_t>(moptions_.protection_bytes_per_key),
  875. checksum_ptr);
  876. } else {
  877. kv_prot_info->Encode(
  878. static_cast<uint8_t>(moptions_.protection_bytes_per_key), checksum_ptr);
  879. }
  880. }
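// Encodes the entry into the arena of the target table (range_del_table_ for
// kTypeRangeDeletion, table_ otherwise) and inserts it. When kv_prot_info is
// provided, the encoded entry is verified first and Status::Corruption is
// returned on mismatch; Status::TryAgain is returned if an entry with the
// same key and sequence number already exists.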
  881. Status MemTable::Add(SequenceNumber s, ValueType type,
  882. const Slice& key, /* user key */
  883. const Slice& value,
  884. const ProtectionInfoKVOS64* kv_prot_info,
  885. bool allow_concurrent,
  886. MemTablePostProcessInfo* post_process_info, void** hint) {
  887. // Format of an entry is concatenation of:
  888. // key_size : varint32 of internal_key.size()
  889. // key bytes : char[internal_key.size()]
  890. // value_size : varint32 of value.size()
  891. // value bytes : char[value.size()]
  892. // checksum : char[moptions_.protection_bytes_per_key]
  893. uint32_t key_size = static_cast<uint32_t>(key.size());
  894. uint32_t val_size = static_cast<uint32_t>(value.size());
  895. uint32_t internal_key_size = key_size + 8;
  896. const uint32_t encoded_len = VarintLength(internal_key_size) +
  897. internal_key_size + VarintLength(val_size) +
  898. val_size + moptions_.protection_bytes_per_key;
  899. char* buf = nullptr;
  900. std::unique_ptr<MemTableRep>& table =
  901. type == kTypeRangeDeletion ? range_del_table_ : table_;
  902. KeyHandle handle = table->Allocate(encoded_len, &buf);
  903. char* p = EncodeVarint32(buf, internal_key_size);
  904. memcpy(p, key.data(), key_size);
  905. Slice key_slice(p, key_size);
  906. p += key_size;
  907. uint64_t packed = PackSequenceAndType(s, type);
  908. EncodeFixed64(p, packed);
  909. p += 8;
  910. p = EncodeVarint32(p, val_size);
  911. memcpy(p, value.data(), val_size);
  912. assert((unsigned)(p + val_size - buf + moptions_.protection_bytes_per_key) ==
  913. (unsigned)encoded_len);
  914. UpdateEntryChecksum(kv_prot_info, key, value, type, s,
  915. buf + encoded_len - moptions_.protection_bytes_per_key);
  916. Slice encoded(buf, encoded_len - moptions_.protection_bytes_per_key);
  917. if (kv_prot_info != nullptr) {
  918. TEST_SYNC_POINT_CALLBACK("MemTable::Add:Encoded", &encoded);
  919. Status status = VerifyEncodedEntry(encoded, *kv_prot_info);
  920. if (!status.ok()) {
  921. return status;
  922. }
  923. }
  924. Slice key_without_ts = StripTimestampFromUserKey(key, ts_sz_);
  925. if (!allow_concurrent) {
  926. // Extract prefix for insert with hint. Hints are for point key table
  927. // (`table_`) only, not `range_del_table_`.
  928. if (table == table_ && insert_with_hint_prefix_extractor_ != nullptr &&
  929. insert_with_hint_prefix_extractor_->InDomain(key_slice)) {
  930. Slice prefix = insert_with_hint_prefix_extractor_->Transform(key_slice);
  931. bool res = table->InsertKeyWithHint(handle, &insert_hints_[prefix]);
  932. if (UNLIKELY(!res)) {
  933. return Status::TryAgain("key+seq exists");
  934. }
  935. } else {
  936. bool res = table->InsertKey(handle);
  937. if (UNLIKELY(!res)) {
  938. return Status::TryAgain("key+seq exists");
  939. }
  940. }
  941. // this is a bit ugly, but is the way to avoid locked instructions
  942. // when incrementing an atomic
  943. num_entries_.StoreRelaxed(num_entries_.LoadRelaxed() + 1);
  944. data_size_.StoreRelaxed(data_size_.LoadRelaxed() + encoded_len);
  945. if (type == kTypeDeletion || type == kTypeSingleDeletion ||
  946. type == kTypeDeletionWithTimestamp) {
  947. num_deletes_.StoreRelaxed(num_deletes_.LoadRelaxed() + 1);
  948. } else if (type == kTypeRangeDeletion) {
  949. uint64_t val = num_range_deletes_.LoadRelaxed() + 1;
  950. num_range_deletes_.StoreRelaxed(val);
  951. }
  952. if (bloom_filter_ && prefix_extractor_ &&
  953. prefix_extractor_->InDomain(key_without_ts)) {
  954. bloom_filter_->Add(prefix_extractor_->Transform(key_without_ts));
  955. }
  956. if (bloom_filter_ && moptions_.memtable_whole_key_filtering) {
  957. bloom_filter_->Add(key_without_ts);
  958. }
  959. // The first sequence number inserted into the memtable
  960. assert(first_seqno_ == 0 || s >= first_seqno_);
  961. if (first_seqno_ == 0) {
  962. first_seqno_.store(s, std::memory_order_relaxed);
  963. if (earliest_seqno_ == kMaxSequenceNumber) {
  964. earliest_seqno_.store(GetFirstSequenceNumber(),
  965. std::memory_order_relaxed);
  966. }
  967. assert(first_seqno_.load() >= earliest_seqno_.load());
  968. }
  969. assert(post_process_info == nullptr);
  970. // TODO(yuzhangyu): support updating newest UDT for when `allow_concurrent`
  971. // is true.
  972. MaybeUpdateNewestUDT(key_slice);
  973. UpdateFlushState();
  974. } else {
  975. bool res = (hint == nullptr)
  976. ? table->InsertKeyConcurrently(handle)
  977. : table->InsertKeyWithHintConcurrently(handle, hint);
  978. if (UNLIKELY(!res)) {
  979. return Status::TryAgain("key+seq exists");
  980. }
  981. assert(post_process_info != nullptr);
  982. post_process_info->num_entries++;
  983. post_process_info->data_size += encoded_len;
  984. if (type == kTypeDeletion) {
  985. post_process_info->num_deletes++;
  986. }
  987. if (bloom_filter_ && prefix_extractor_ &&
  988. prefix_extractor_->InDomain(key_without_ts)) {
  989. bloom_filter_->AddConcurrently(
  990. prefix_extractor_->Transform(key_without_ts));
  991. }
  992. if (bloom_filter_ && moptions_.memtable_whole_key_filtering) {
  993. bloom_filter_->AddConcurrently(key_without_ts);
  994. }
  995. // atomically update first_seqno_ and earliest_seqno_.
  996. uint64_t cur_seq_num = first_seqno_.load(std::memory_order_relaxed);
  997. while ((cur_seq_num == 0 || s < cur_seq_num) &&
  998. !first_seqno_.compare_exchange_weak(cur_seq_num, s)) {
  999. }
  1000. uint64_t cur_earliest_seqno =
  1001. earliest_seqno_.load(std::memory_order_relaxed);
  1002. while (
  1003. (cur_earliest_seqno == kMaxSequenceNumber || s < cur_earliest_seqno) &&
  1004. !earliest_seqno_.compare_exchange_weak(cur_earliest_seqno, s)) {
  1005. }
  1006. }
  1007. if (type == kTypeRangeDeletion) {
  1008. auto new_cache = std::make_shared<FragmentedRangeTombstoneListCache>();
  1009. size_t size = cached_range_tombstone_.Size();
  1010. if (allow_concurrent) {
  1011. post_process_info->num_range_deletes++;
  1012. range_del_mutex_.lock();
  1013. }
  1014. for (size_t i = 0; i < size; ++i) {
  1015. #if defined(__cpp_lib_atomic_shared_ptr)
  1016. std::atomic<std::shared_ptr<FragmentedRangeTombstoneListCache>>*
  1017. local_cache_ref_ptr = cached_range_tombstone_.AccessAtCore(i);
  1018. auto new_local_cache_ref = std::make_shared<
  1019. const std::shared_ptr<FragmentedRangeTombstoneListCache>>(new_cache);
  1020. std::shared_ptr<FragmentedRangeTombstoneListCache> aliased_ptr(
  1021. new_local_cache_ref, new_cache.get());
  1022. local_cache_ref_ptr->store(std::move(aliased_ptr),
  1023. std::memory_order_relaxed);
  1024. #else
  1025. std::shared_ptr<FragmentedRangeTombstoneListCache>* local_cache_ref_ptr =
  1026. cached_range_tombstone_.AccessAtCore(i);
  1027. auto new_local_cache_ref = std::make_shared<
  1028. const std::shared_ptr<FragmentedRangeTombstoneListCache>>(new_cache);
  1029. // It is okay for some reader to load the old cache during invalidation, as
  1030. // the new sequence number is not published yet.
  1031. // Each core will have a shared_ptr to a shared_ptr to the cached
  1032. // fragmented range tombstones, so that the ref count is maintained locally
  1033. // per-core using the per-core shared_ptr.
  1034. std::atomic_store_explicit(
  1035. local_cache_ref_ptr,
  1036. std::shared_ptr<FragmentedRangeTombstoneListCache>(
  1037. new_local_cache_ref, new_cache.get()),
  1038. std::memory_order_relaxed);
  1039. #endif
  1040. }
  1041. if (allow_concurrent) {
  1042. range_del_mutex_.unlock();
  1043. }
  1044. is_range_del_table_empty_.StoreRelaxed(false);
  1045. }
  1046. UpdateOldestKeyTime();
  1047. TEST_SYNC_POINT_CALLBACK("MemTable::Add:BeforeReturn:Encoded", &encoded);
  1048. return Status::OK();
  1049. }
  1050. // Callback from MemTable::Get()
  1051. namespace {
  1052. struct Saver {
  1053. Status* status;
  1054. const LookupKey* key;
  1055. bool* found_final_value; // Is value set correctly? Used by KeyMayExist
  1056. bool* merge_in_progress;
  1057. std::string* value;
  1058. PinnableWideColumns* columns;
  1059. SequenceNumber seq;
  1060. std::string* timestamp;
  1061. const MergeOperator* merge_operator;
  1062. // the merge operations encountered;
  1063. MergeContext* merge_context;
  1064. SequenceNumber max_covering_tombstone_seq;
  1065. MemTable* mem;
  1066. Logger* logger;
  1067. Statistics* statistics;
  1068. bool inplace_update_support;
  1069. bool do_merge;
  1070. SystemClock* clock;
  1071. ReadCallback* callback_;
  1072. bool* is_blob_index;
  1073. bool allow_data_in_errors;
  1074. uint32_t protection_bytes_per_key;
  1075. bool CheckCallback(SequenceNumber _seq) {
  1076. if (callback_) {
  1077. return callback_->IsVisible(_seq);
  1078. }
  1079. return true;
  1080. }
  1081. };
  1082. } // anonymous namespace
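// Called for each memtable entry visited by a point lookup (see Saver above).
// Returns true to keep scanning, e.g. when the entry is not visible to the
// read callback or is a merge operand that does not finish the lookup, and
// returns false once a final value, deletion, or error has been determined.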
  1083. static bool SaveValue(void* arg, const char* entry) {
  1084. Saver* s = static_cast<Saver*>(arg);
  1085. assert(s != nullptr);
  1086. assert(!s->value || !s->columns);
  1087. assert(!*(s->found_final_value));
  1088. assert(s->status->ok() || s->status->IsMergeInProgress());
  1089. MergeContext* merge_context = s->merge_context;
  1090. SequenceNumber max_covering_tombstone_seq = s->max_covering_tombstone_seq;
  1091. const MergeOperator* merge_operator = s->merge_operator;
  1092. assert(merge_context != nullptr);
  1093. // Refer to comments under MemTable::Add() for entry format.
  1094. // Check that it belongs to same user key.
  1095. uint32_t key_length = 0;
  1096. const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
  1097. assert(key_length >= 8);
  1098. Slice user_key_slice = Slice(key_ptr, key_length - 8);
  1099. const Comparator* user_comparator =
  1100. s->mem->GetInternalKeyComparator().user_comparator();
  1101. size_t ts_sz = user_comparator->timestamp_size();
  1102. if (ts_sz && s->timestamp && max_covering_tombstone_seq > 0) {
    // timestamp should already be set to range tombstone timestamp
    assert(s->timestamp->size() == ts_sz);
  }
  if (user_comparator->EqualWithoutTimestamp(user_key_slice,
                                             s->key->user_key())) {
    // Correct user key
    TEST_SYNC_POINT_CALLBACK("Memtable::SaveValue:Found:entry", &entry);
    std::optional<ReadLock> read_lock;
    if (s->inplace_update_support) {
      read_lock.emplace(s->mem->GetLock(s->key->user_key()));
    }
    if (s->protection_bytes_per_key > 0) {
      *(s->status) = MemTable::VerifyEntryChecksum(
          entry, s->protection_bytes_per_key, s->allow_data_in_errors);
      if (!s->status->ok()) {
        *(s->found_final_value) = true;
        ROCKS_LOG_ERROR(s->logger, "In SaveValue: %s", s->status->getState());
        // Memtable entry corrupted
        return false;
      }
    }
    const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
    ValueType type;
    SequenceNumber seq;
    UnPackSequenceAndType(tag, &seq, &type);
    // If the value is not in the snapshot, skip it
    if (!s->CheckCallback(seq)) {
      return true;  // to continue to the next seq
    }
    if (s->seq == kMaxSequenceNumber) {
      s->seq = seq;
      if (s->seq > max_covering_tombstone_seq) {
        if (ts_sz && s->timestamp != nullptr) {
          // `timestamp` was set to range tombstone's timestamp before
          // `SaveValue` is ever called. This key has a higher sequence number
          // than range tombstone, and is the key with the highest seqno across
          // all keys with this user_key, so we update timestamp here.
          Slice ts = ExtractTimestampFromUserKey(user_key_slice, ts_sz);
          s->timestamp->assign(ts.data(), ts_sz);
        }
      } else {
        s->seq = max_covering_tombstone_seq;
      }
    }
    if (ts_sz > 0 && s->timestamp != nullptr) {
      if (!s->timestamp->empty()) {
        assert(ts_sz == s->timestamp->size());
      }
      // TODO optimize for smaller size ts
      const std::string kMaxTs(ts_sz, '\xff');
      if (s->timestamp->empty() ||
          user_comparator->CompareTimestamp(*(s->timestamp), kMaxTs) == 0) {
        Slice ts = ExtractTimestampFromUserKey(user_key_slice, ts_sz);
        s->timestamp->assign(ts.data(), ts_sz);
      }
    }
    if ((type == kTypeValue || type == kTypeMerge || type == kTypeBlobIndex ||
         type == kTypeWideColumnEntity || type == kTypeDeletion ||
         type == kTypeSingleDeletion || type == kTypeDeletionWithTimestamp ||
         type == kTypeValuePreferredSeqno) &&
        max_covering_tombstone_seq > seq) {
      type = kTypeRangeDeletion;
    }
    switch (type) {
      case kTypeBlobIndex: {
        if (!s->do_merge) {
          *(s->status) = Status::NotSupported(
              "GetMergeOperands not supported by stacked BlobDB");
          *(s->found_final_value) = true;
          return false;
        }
        if (*(s->merge_in_progress)) {
          *(s->status) = Status::NotSupported(
              "Merge operator not supported by stacked BlobDB");
          *(s->found_final_value) = true;
          return false;
        }
        if (s->is_blob_index == nullptr) {
          ROCKS_LOG_ERROR(s->logger, "Encountered unexpected blob index.");
          *(s->status) = Status::NotSupported(
              "Encountered unexpected blob index. Please open DB with "
              "ROCKSDB_NAMESPACE::blob_db::BlobDB.");
          *(s->found_final_value) = true;
          return false;
        }
        Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
        *(s->status) = Status::OK();
        if (s->value) {
          s->value->assign(v.data(), v.size());
        } else if (s->columns) {
          s->columns->SetPlainValue(v);
        }
        *(s->found_final_value) = true;
        *(s->is_blob_index) = true;
        return false;
      }
      case kTypeValue:
      case kTypeValuePreferredSeqno: {
        Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
        if (type == kTypeValuePreferredSeqno) {
          v = ParsePackedValueForValue(v);
        }
        ReadOnlyMemTable::HandleTypeValue(
            s->key->user_key(), v, s->inplace_update_support == false,
            s->do_merge, *(s->merge_in_progress), merge_context,
            s->merge_operator, s->clock, s->statistics, s->logger, s->status,
            s->value, s->columns, s->is_blob_index);
        *(s->found_final_value) = true;
        return false;
      }
      case kTypeWideColumnEntity: {
        Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
        *(s->status) = Status::OK();
        if (!s->do_merge) {
          // Preserve the value with the goal of returning it as part of
          // raw merge operands to the user
          Slice value_of_default;
          *(s->status) = WideColumnSerialization::GetValueOfDefaultColumn(
              v, value_of_default);
          if (s->status->ok()) {
            merge_context->PushOperand(
                value_of_default,
                s->inplace_update_support == false /* operand_pinned */);
          }
        } else if (*(s->merge_in_progress)) {
          assert(s->do_merge);
          if (s->value || s->columns) {
            // `op_failure_scope` (an output parameter) is not provided (set
            // to nullptr) since a failure must be propagated regardless of
            // its value.
            *(s->status) = MergeHelper::TimedFullMerge(
                merge_operator, s->key->user_key(), MergeHelper::kWideBaseValue,
                v, merge_context->GetOperands(), s->logger, s->statistics,
                s->clock, /* update_num_ops_stats */ true,
                /* op_failure_scope */ nullptr, s->value, s->columns);
          }
        } else if (s->value) {
          Slice value_of_default;
          *(s->status) = WideColumnSerialization::GetValueOfDefaultColumn(
              v, value_of_default);
          if (s->status->ok()) {
            s->value->assign(value_of_default.data(), value_of_default.size());
          }
        } else if (s->columns) {
          *(s->status) = s->columns->SetWideColumnValue(v);
        }
        *(s->found_final_value) = true;
        if (s->is_blob_index != nullptr) {
          *(s->is_blob_index) = false;
        }
        return false;
      }
      case kTypeDeletion:
      case kTypeDeletionWithTimestamp:
      case kTypeSingleDeletion:
      case kTypeRangeDeletion: {
        ReadOnlyMemTable::HandleTypeDeletion(
            s->key->user_key(), *(s->merge_in_progress), s->merge_context,
            s->merge_operator, s->clock, s->statistics, s->logger, s->status,
            s->value, s->columns);
        *(s->found_final_value) = true;
        return false;
      }
      case kTypeMerge: {
        Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
        *(s->merge_in_progress) = true;
        *(s->found_final_value) = ReadOnlyMemTable::HandleTypeMerge(
            s->key->user_key(), v, s->inplace_update_support == false,
            s->do_merge, merge_context, s->merge_operator, s->clock,
            s->statistics, s->logger, s->status, s->value, s->columns);
        return !*(s->found_final_value);
      }
      default: {
        std::string msg("Corrupted value not expected.");
        if (s->allow_data_in_errors) {
          msg.append("Unrecognized value type: " +
                     std::to_string(static_cast<int>(type)) + ". ");
          msg.append("User key: " + user_key_slice.ToString(/*hex=*/true) +
                     ". ");
          msg.append("seq: " + std::to_string(seq) + ".");
        }
        *(s->found_final_value) = true;
        *(s->status) = Status::Corruption(msg.c_str());
        return false;
      }
    }
  }

  // s->state could be Corrupt, merge or notfound
  return false;
}

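// How SaveValue() is driven: the memtable rep calls it once per candidate
// entry in increasing internal-key order (see MemTableRep::Get() near the end
// of this file). Returning true means "keep scanning", e.g. the entry was
// rejected by CheckCallback(), or a Merge operand was collected and older
// entries are still needed. Returning false means "stop": a final value,
// deletion, or corruption was recorded, or a different user key was reached.
// A minimal sketch of the driving loop, using hypothetical local names
// `rep_iter` and `saver`:
//
//   for (rep_iter->Seek(k.internal_key(), k.memtable_key().data());
//        rep_iter->Valid() && SaveValue(&saver, rep_iter->key());
//        rep_iter->Next()) {
//   }
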
bool MemTable::Get(const LookupKey& key, std::string* value,
                   PinnableWideColumns* columns, std::string* timestamp,
                   Status* s, MergeContext* merge_context,
                   SequenceNumber* max_covering_tombstone_seq,
                   SequenceNumber* seq, const ReadOptions& read_opts,
                   bool immutable_memtable, ReadCallback* callback,
                   bool* is_blob_index, bool do_merge) {
  // The sequence number is updated synchronously in version_set.h
  if (IsEmpty()) {
    // Avoiding recording stats for speed.
    return false;
  }
  PERF_TIMER_GUARD(get_from_memtable_time);

  std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
      NewRangeTombstoneIterator(read_opts,
                                GetInternalKeySeqno(key.internal_key()),
                                immutable_memtable));
  if (range_del_iter != nullptr) {
    SequenceNumber covering_seq =
        range_del_iter->MaxCoveringTombstoneSeqnum(key.user_key());
    if (covering_seq > *max_covering_tombstone_seq) {
      *max_covering_tombstone_seq = covering_seq;
      if (timestamp) {
        // Will be overwritten in SaveValue() if there is a point key with
        // a higher seqno.
        timestamp->assign(range_del_iter->timestamp().data(),
                          range_del_iter->timestamp().size());
      }
    }
  }

  bool found_final_value = false;
  bool merge_in_progress = s->IsMergeInProgress();
  bool may_contain = true;
  Slice user_key_without_ts = StripTimestampFromUserKey(key.user_key(), ts_sz_);
  bool bloom_checked = false;
  if (bloom_filter_) {
    // when both memtable_whole_key_filtering and prefix_extractor_ are set,
    // only do whole key filtering for Get() to save CPU
    if (moptions_.memtable_whole_key_filtering) {
      may_contain = bloom_filter_->MayContain(user_key_without_ts);
      bloom_checked = true;
    } else {
      assert(prefix_extractor_);
      if (prefix_extractor_->InDomain(user_key_without_ts)) {
        may_contain = bloom_filter_->MayContain(
            prefix_extractor_->Transform(user_key_without_ts));
        bloom_checked = true;
      }
    }
  }

  if (bloom_filter_ && !may_contain) {
    // iter is null if prefix bloom says the key does not exist
    PERF_COUNTER_ADD(bloom_memtable_miss_count, 1);
    *seq = kMaxSequenceNumber;
  } else {
    if (bloom_checked) {
      PERF_COUNTER_ADD(bloom_memtable_hit_count, 1);
    }
    GetFromTable(key, *max_covering_tombstone_seq, do_merge, callback,
                 is_blob_index, value, columns, timestamp, s, merge_context,
                 seq, &found_final_value, &merge_in_progress);
  }

  // No change to value, since we have not yet found a Put/Delete
  // Propagate corruption error
  if (!found_final_value && merge_in_progress) {
    if (s->ok()) {
      *s = Status::MergeInProgress();
    } else {
      assert(s->IsMergeInProgress());
    }
  }
  PERF_COUNTER_ADD(get_from_memtable_count, 1);
  return found_final_value;
}

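// Illustrative call sketch (not from this file): a point lookup through
// MemTable::Get(), assuming a MemTable* `mem` and a LookupKey `lkey` already
// built for the read snapshot; the real call sites live in the DB layer.
//
//   std::string value;
//   std::string timestamp;
//   Status s;
//   MergeContext merge_context;
//   SequenceNumber max_covering_tombstone_seq = 0;
//   SequenceNumber seq = kMaxSequenceNumber;
//   bool found = mem->Get(lkey, &value, /*columns=*/nullptr, &timestamp, &s,
//                         &merge_context, &max_covering_tombstone_seq, &seq,
//                         ReadOptions(), /*immutable_memtable=*/false,
//                         /*callback=*/nullptr, /*is_blob_index=*/nullptr,
//                         /*do_merge=*/true);
//   // found == true: this memtable produced a final answer (inspect `s`);
//   // found == false: the caller must keep searching older memtables/SSTs.
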
void MemTable::GetFromTable(const LookupKey& key,
                            SequenceNumber max_covering_tombstone_seq,
                            bool do_merge, ReadCallback* callback,
                            bool* is_blob_index, std::string* value,
                            PinnableWideColumns* columns,
                            std::string* timestamp, Status* s,
                            MergeContext* merge_context, SequenceNumber* seq,
                            bool* found_final_value, bool* merge_in_progress) {
  Saver saver;
  saver.status = s;
  saver.found_final_value = found_final_value;
  saver.merge_in_progress = merge_in_progress;
  saver.key = &key;
  saver.value = value;
  saver.columns = columns;
  saver.timestamp = timestamp;
  saver.seq = kMaxSequenceNumber;
  saver.mem = this;
  saver.merge_context = merge_context;
  saver.max_covering_tombstone_seq = max_covering_tombstone_seq;
  saver.merge_operator = moptions_.merge_operator;
  saver.logger = moptions_.info_log;
  saver.inplace_update_support = moptions_.inplace_update_support;
  saver.statistics = moptions_.statistics;
  saver.clock = clock_;
  saver.callback_ = callback;
  saver.is_blob_index = is_blob_index;
  saver.do_merge = do_merge;
  saver.allow_data_in_errors = moptions_.allow_data_in_errors;
  saver.protection_bytes_per_key = moptions_.protection_bytes_per_key;
  if (!moptions_.paranoid_memory_checks &&
      !moptions_.memtable_verify_per_key_checksum_on_seek) {
    table_->Get(key, &saver, SaveValue);
  } else {
    Status check_s = table_->GetAndValidate(
        key, &saver, SaveValue, moptions_.allow_data_in_errors,
        moptions_.paranoid_memory_checks, key_validation_callback_);
    if (check_s.IsCorruption()) {
      *(saver.status) = check_s;
      // Should stop searching the LSM.
      *(saver.found_final_value) = true;
    }
  }
  assert(s->ok() || s->IsMergeInProgress() || *found_final_value);
  *seq = saver.seq;
}

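// Note on the validating path above: GetAndValidate() surfaces corruption via
// its return status. Marking *found_final_value = true on corruption makes
// the caller treat the lookup as resolved, so the corruption status is
// reported instead of the search quietly continuing into older memtables and
// SST files.
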
Status MemTable::ValidateKey(const char* key, bool allow_data_in_errors) {
  return VerifyEntryChecksum(key, moptions_.protection_bytes_per_key,
                             allow_data_in_errors);
}

void MemTable::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
                        ReadCallback* callback, bool immutable_memtable) {
  // The sequence number is updated synchronously in version_set.h
  if (IsEmpty()) {
    // Avoiding recording stats for speed.
    return;
  }
  PERF_TIMER_GUARD(get_from_memtable_time);

  // For now, memtable Bloom filter is effectively disabled if there are any
  // range tombstones. This is the simplest way to ensure range tombstones are
  // handled. TODO: allow Bloom checks where max_covering_tombstone_seq==0
  bool no_range_del = read_options.ignore_range_deletions ||
                      is_range_del_table_empty_.LoadRelaxed();
  MultiGetRange temp_range(*range, range->begin(), range->end());
  if (bloom_filter_ && no_range_del) {
    bool whole_key =
        !prefix_extractor_ || moptions_.memtable_whole_key_filtering;
    std::array<Slice, MultiGetContext::MAX_BATCH_SIZE> bloom_keys;
    std::array<bool, MultiGetContext::MAX_BATCH_SIZE> may_match;
    std::array<size_t, MultiGetContext::MAX_BATCH_SIZE> range_indexes;
    int num_keys = 0;
    for (auto iter = temp_range.begin(); iter != temp_range.end(); ++iter) {
      if (whole_key) {
        bloom_keys[num_keys] = iter->ukey_without_ts;
        range_indexes[num_keys++] = iter.index();
      } else if (prefix_extractor_->InDomain(iter->ukey_without_ts)) {
        bloom_keys[num_keys] =
            prefix_extractor_->Transform(iter->ukey_without_ts);
        range_indexes[num_keys++] = iter.index();
      }
    }
    bloom_filter_->MayContain(num_keys, bloom_keys.data(), may_match.data());
    for (int i = 0; i < num_keys; ++i) {
      if (!may_match[i]) {
        temp_range.SkipIndex(range_indexes[i]);
        PERF_COUNTER_ADD(bloom_memtable_miss_count, 1);
      } else {
        PERF_COUNTER_ADD(bloom_memtable_hit_count, 1);
      }
    }
  }
  for (auto iter = temp_range.begin(); iter != temp_range.end(); ++iter) {
    bool found_final_value{false};
    bool merge_in_progress = iter->s->IsMergeInProgress();
    if (!no_range_del) {
      std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
          NewRangeTombstoneIteratorInternal(
              read_options, GetInternalKeySeqno(iter->lkey->internal_key()),
              immutable_memtable));
      SequenceNumber covering_seq =
          range_del_iter->MaxCoveringTombstoneSeqnum(iter->lkey->user_key());
      if (covering_seq > iter->max_covering_tombstone_seq) {
        iter->max_covering_tombstone_seq = covering_seq;
        if (iter->timestamp) {
          // Will be overwritten in SaveValue() if there is a point key with
          // a higher seqno.
          iter->timestamp->assign(range_del_iter->timestamp().data(),
                                  range_del_iter->timestamp().size());
        }
      }
    }
    SequenceNumber dummy_seq;
    GetFromTable(*(iter->lkey), iter->max_covering_tombstone_seq, true,
                 callback, &iter->is_blob_index,
                 iter->value ? iter->value->GetSelf() : nullptr, iter->columns,
                 iter->timestamp, iter->s, &(iter->merge_context), &dummy_seq,
                 &found_final_value, &merge_in_progress);

    if (!found_final_value && merge_in_progress) {
      if (iter->s->ok()) {
        *(iter->s) = Status::MergeInProgress();
      } else {
        assert(iter->s->IsMergeInProgress());
      }
    }

    if (found_final_value ||
        (!iter->s->ok() && !iter->s->IsMergeInProgress())) {
      // `found_final_value` should be set if an error/corruption occurs.
      // The check on iter->s is just there in case GetFromTable() did not
      // set `found_final_value` properly.
      assert(found_final_value);
      if (iter->value) {
        iter->value->PinSelf();
        range->AddValueSize(iter->value->size());
      } else {
        assert(iter->columns);
        range->AddValueSize(iter->columns->serialized_size());
      }
      range->MarkKeyDone(iter);
      RecordTick(moptions_.statistics, MEMTABLE_HIT);
      if (range->GetValueSize() > read_options.value_size_soft_limit) {
        // Set all remaining keys in range to Abort
        for (auto range_iter = range->begin(); range_iter != range->end();
             ++range_iter) {
          range->MarkKeyDone(range_iter);
          *(range_iter->s) = Status::Aborted();
        }
        break;
      }
    }
  }
  PERF_COUNTER_ADD(get_from_memtable_count, 1);
}

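// Illustrative call sketch (not from this file): MultiGet() consumes a
// MultiGetRange carved out of a MultiGetContext at the DB layer. Setup is
// simplified here; `ctx` stands for a hypothetical, fully initialized
// MultiGetContext for the batch.
//
//   MultiGetRange range = ctx.GetMultiGetRange();
//   mem->MultiGet(read_options, &range, /*callback=*/nullptr,
//                 /*immutable_memtable=*/false);
//   // Keys answered here are marked done inside `range`; the rest fall
//   // through to older memtables and SST files.
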
Status MemTable::Update(SequenceNumber seq, ValueType value_type,
                        const Slice& key, const Slice& value,
                        const ProtectionInfoKVOS64* kv_prot_info) {
  LookupKey lkey(key, seq);
  Slice mem_key = lkey.memtable_key();

  std::unique_ptr<MemTableRep::Iterator> iter(
      table_->GetDynamicPrefixIterator());
  iter->Seek(lkey.internal_key(), mem_key.data());

  if (iter->Valid()) {
    // Refer to comments under MemTable::Add() for entry format.
    // Check that it belongs to same user key. We do not check the
    // sequence number since the Seek() call above should have skipped
    // all entries with overly large sequence numbers.
    const char* entry = iter->key();
    uint32_t key_length = 0;
    const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
    if (comparator_.comparator.user_comparator()->Equal(
            Slice(key_ptr, key_length - 8), lkey.user_key())) {
      // Correct user key
      const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
      ValueType type;
      SequenceNumber existing_seq;
      UnPackSequenceAndType(tag, &existing_seq, &type);
      assert(existing_seq != seq);
      if (type == value_type) {
        Slice prev_value = GetLengthPrefixedSlice(key_ptr + key_length);
        uint32_t prev_size = static_cast<uint32_t>(prev_value.size());
        uint32_t new_size = static_cast<uint32_t>(value.size());

        // Update value, if new value size <= previous value size
        if (new_size <= prev_size) {
          WriteLock wl(GetLock(lkey.user_key()));
          char* p =
              EncodeVarint32(const_cast<char*>(key_ptr) + key_length, new_size);
          memcpy(p, value.data(), value.size());
          assert((unsigned)((p + value.size()) - entry) ==
                 (unsigned)(VarintLength(key_length) + key_length +
                            VarintLength(value.size()) + value.size()));
          RecordTick(moptions_.statistics, NUMBER_KEYS_UPDATED);
          if (kv_prot_info != nullptr) {
            ProtectionInfoKVOS64 updated_kv_prot_info(*kv_prot_info);
            // `seq` is swallowed and `existing_seq` prevails.
            updated_kv_prot_info.UpdateS(seq, existing_seq);
            UpdateEntryChecksum(&updated_kv_prot_info, key, value, type,
                                existing_seq, p + value.size());
            Slice encoded(entry, p + value.size() - entry);
            return VerifyEncodedEntry(encoded, updated_kv_prot_info);
          } else {
            UpdateEntryChecksum(nullptr, key, value, type, existing_seq,
                                p + value.size());
          }
          return Status::OK();
        }
      }
    }
  }

  // The latest value is not value_type or key doesn't exist
  return Add(seq, value_type, key, value, kv_prot_info);
}

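// Illustrative call sketch (not from this file): Update() rewrites the newest
// entry of the same value_type in place when the new value fits in the
// existing slot, and otherwise falls back to Add(), which appends a fresh
// entry. Assuming a MemTable* `mem` and a newly allocated sequence number
// `seq`:
//
//   Status s = mem->Update(seq, kTypeValue, /*key=*/"k", /*value=*/"v2",
//                          /*kv_prot_info=*/nullptr);
//   // On the in-place path `seq` is discarded and the entry keeps its
//   // original sequence number (see the UpdateS() call above).
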
Status MemTable::UpdateCallback(SequenceNumber seq, const Slice& key,
                                const Slice& delta,
                                const ProtectionInfoKVOS64* kv_prot_info) {
  LookupKey lkey(key, seq);
  Slice memkey = lkey.memtable_key();

  std::unique_ptr<MemTableRep::Iterator> iter(
      table_->GetDynamicPrefixIterator());
  iter->Seek(lkey.internal_key(), memkey.data());

  if (iter->Valid()) {
    // Refer to comments under MemTable::Add() for entry format.
    // Check that it belongs to same user key. We do not check the
    // sequence number since the Seek() call above should have skipped
    // all entries with overly large sequence numbers.
    const char* entry = iter->key();
    uint32_t key_length = 0;
    const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
    if (comparator_.comparator.user_comparator()->Equal(
            Slice(key_ptr, key_length - 8), lkey.user_key())) {
      // Correct user key
      const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
      ValueType type;
      uint64_t existing_seq;
      UnPackSequenceAndType(tag, &existing_seq, &type);
      if (type == kTypeValue) {
        Slice prev_value = GetLengthPrefixedSlice(key_ptr + key_length);
        uint32_t prev_size = static_cast<uint32_t>(prev_value.size());

        char* prev_buffer = const_cast<char*>(prev_value.data());
        uint32_t new_prev_size = prev_size;

        std::string str_value;
        WriteLock wl(GetLock(lkey.user_key()));
        auto status = moptions_.inplace_callback(prev_buffer, &new_prev_size,
                                                 delta, &str_value);
        if (status == UpdateStatus::UPDATED_INPLACE) {
          // Value already updated by callback.
          assert(new_prev_size <= prev_size);
          if (new_prev_size < prev_size) {
            // overwrite the new prev_size
            char* p = EncodeVarint32(const_cast<char*>(key_ptr) + key_length,
                                     new_prev_size);
            if (VarintLength(new_prev_size) < VarintLength(prev_size)) {
              // shift the value buffer as well.
              memcpy(p, prev_buffer, new_prev_size);
              prev_buffer = p;
            }
          }
          RecordTick(moptions_.statistics, NUMBER_KEYS_UPDATED);
          UpdateFlushState();
          Slice new_value(prev_buffer, new_prev_size);
          if (kv_prot_info != nullptr) {
            ProtectionInfoKVOS64 updated_kv_prot_info(*kv_prot_info);
            // `seq` is swallowed and `existing_seq` prevails.
            updated_kv_prot_info.UpdateS(seq, existing_seq);
            updated_kv_prot_info.UpdateV(delta, new_value);
            Slice encoded(entry, prev_buffer + new_prev_size - entry);
            UpdateEntryChecksum(&updated_kv_prot_info, key, new_value, type,
                                existing_seq, prev_buffer + new_prev_size);
            return VerifyEncodedEntry(encoded, updated_kv_prot_info);
          } else {
            UpdateEntryChecksum(nullptr, key, new_value, type, existing_seq,
                                prev_buffer + new_prev_size);
          }
          return Status::OK();
        } else if (status == UpdateStatus::UPDATED) {
          Status s;
          if (kv_prot_info != nullptr) {
            ProtectionInfoKVOS64 updated_kv_prot_info(*kv_prot_info);
            updated_kv_prot_info.UpdateV(delta, str_value);
            s = Add(seq, kTypeValue, key, Slice(str_value),
                    &updated_kv_prot_info);
          } else {
            s = Add(seq, kTypeValue, key, Slice(str_value),
                    nullptr /* kv_prot_info */);
          }
          RecordTick(moptions_.statistics, NUMBER_KEYS_WRITTEN);
          UpdateFlushState();
          return s;
        } else if (status == UpdateStatus::UPDATE_FAILED) {
          // `UPDATE_FAILED` is named incorrectly. It indicates no update
          // happened. It does not indicate a failure happened.
          UpdateFlushState();
          return Status::OK();
        }
      }
    }
  }
  // The latest value is not `kTypeValue` or key doesn't exist
  return Status::NotFound();
}

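// Illustrative sketch (not from this file) of an inplace_callback as consumed
// by UpdateCallback() above: return UPDATED_INPLACE after rewriting
// `existing_value` in place (the value may shrink but not grow), UPDATED
// after producing a replacement in `merged_value`, or UPDATE_FAILED to leave
// the entry untouched.
//
//   UpdateStatus AppendDelta(char* existing_value,
//                            uint32_t* existing_value_size,
//                            Slice delta_value, std::string* merged_value) {
//     // Hypothetical policy: always build the new value out of line.
//     merged_value->assign(existing_value, *existing_value_size);
//     merged_value->append(delta_value.data(), delta_value.size());
//     return UpdateStatus::UPDATED;
//   }
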
size_t MemTable::CountSuccessiveMergeEntries(const LookupKey& key,
                                             size_t limit) {
  Slice memkey = key.memtable_key();

  // A total ordered iterator is costly for some memtablerep (prefix aware
  // reps). By passing in the user key, we allow efficient iterator creation.
  // The iterator only needs to be ordered within the same user key.
  std::unique_ptr<MemTableRep::Iterator> iter(
      table_->GetDynamicPrefixIterator());
  iter->Seek(key.internal_key(), memkey.data());

  size_t num_successive_merges = 0;
  for (; iter->Valid() && num_successive_merges < limit; iter->Next()) {
    const char* entry = iter->key();
    uint32_t key_length = 0;
    const char* iter_key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
    if (!comparator_.comparator.user_comparator()->Equal(
            Slice(iter_key_ptr, key_length - 8), key.user_key())) {
      break;
    }

    const uint64_t tag = DecodeFixed64(iter_key_ptr + key_length - 8);
    ValueType type;
    uint64_t unused;
    UnPackSequenceAndType(tag, &unused, &type);
    if (type != kTypeMerge) {
      break;
    }

    ++num_successive_merges;
  }

  return num_successive_merges;
}

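// Note: the write path can combine this count with the max_successive_merges
// option to decide when a key has accumulated enough consecutive Merge
// operands that they should be folded into a single value up front, rather
// than leaving all of that work to reads and compaction.
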
void MemTableRep::Get(const LookupKey& k, void* callback_args,
                      bool (*callback_func)(void* arg, const char* entry)) {
  auto iter = GetDynamicPrefixIterator();
  for (iter->Seek(k.internal_key(), k.memtable_key().data());
       iter->Valid() && callback_func(callback_args, iter->key());
       iter->Next()) {
  }
}

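// Illustrative sketch (not from this file): any callback handed to
// MemTableRep::Get() follows the same protocol as SaveValue(), i.e. return
// true to receive the next entry and false to stop. A hypothetical callback
// that merely counts how many entries the rep offers:
//
//   struct CountArg {
//     size_t n = 0;
//   };
//   static bool CountEntries(void* arg, const char* /*entry*/) {
//     ++static_cast<CountArg*>(arg)->n;
//     return true;  // keep iterating
//   }
//   // usage: CountArg c; rep->Get(lookup_key, &c, &CountEntries);
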
void MemTable::RefLogContainingPrepSection(uint64_t log) {
  assert(log > 0);
  auto cur = min_prep_log_referenced_.load();
  while ((log < cur || cur == 0) &&
         !min_prep_log_referenced_.compare_exchange_strong(cur, log)) {
    cur = min_prep_log_referenced_.load();
  }
}

uint64_t MemTable::GetMinLogContainingPrepSection() {
  return min_prep_log_referenced_.load();
}

void MemTable::MaybeUpdateNewestUDT(const Slice& user_key) {
  if (ts_sz_ == 0) {
    return;
  }
  const Comparator* ucmp = GetInternalKeyComparator().user_comparator();
  Slice udt = ExtractTimestampFromUserKey(user_key, ts_sz_);
  if (newest_udt_.empty() || ucmp->CompareTimestamp(udt, newest_udt_) > 0) {
    newest_udt_ = udt;
  }
}

const Slice& MemTable::GetNewestUDT() const {
  assert(ts_sz_ > 0);
  return newest_udt_;
}

}  // namespace ROCKSDB_NAMESPACE