memtable.h

  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  9. #pragma once
  10. #include <deque>
  11. #include <functional>
  12. #include <memory>
  13. #include <string>
  14. #include <unordered_set>
  15. #include <vector>
  16. #include "db/dbformat.h"
  17. #include "db/kv_checksum.h"
  18. #include "db/merge_helper.h"
  19. #include "db/range_tombstone_fragmenter.h"
  20. #include "db/read_callback.h"
  21. #include "db/seqno_to_time_mapping.h"
  22. #include "db/version_edit.h"
  23. #include "memory/allocator.h"
  24. #include "memory/concurrent_arena.h"
  25. #include "monitoring/instrumented_mutex.h"
  26. #include "options/cf_options.h"
  27. #include "rocksdb/db.h"
  28. #include "rocksdb/memtablerep.h"
  29. #include "table/multiget_context.h"
  30. #include "util/atomic.h"
  31. #include "util/cast_util.h"
  32. #include "util/dynamic_bloom.h"
  33. #include "util/hash.h"
  34. #include "util/hash_containers.h"
  35. namespace ROCKSDB_NAMESPACE {
  36. struct FlushJobInfo;
  37. class Mutex;
  38. class MemTableIterator;
  39. class MergeContext;
  40. class SystemClock;
  41. struct ImmutableMemTableOptions {
  42. explicit ImmutableMemTableOptions(const ImmutableOptions& ioptions,
  43. const MutableCFOptions& mutable_cf_options);
  44. size_t arena_block_size;
  45. uint32_t memtable_prefix_bloom_bits;
  46. size_t memtable_huge_page_size;
  47. bool memtable_whole_key_filtering;
  48. bool inplace_update_support;
  49. size_t inplace_update_num_locks;
  50. UpdateStatus (*inplace_callback)(char* existing_value,
  51. uint32_t* existing_value_size,
  52. Slice delta_value,
  53. std::string* merged_value);
  54. size_t max_successive_merges;
  55. bool strict_max_successive_merges;
  56. Statistics* statistics;
  57. MergeOperator* merge_operator;
  58. Logger* info_log;
  59. uint32_t protection_bytes_per_key;
  60. bool allow_data_in_errors;
  61. bool paranoid_memory_checks;
  62. bool memtable_verify_per_key_checksum_on_seek;
  63. };
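// Example: a minimal sketch of an inplace_callback matching the signature
// above. Illustrative only; the 8-byte counter encoding and the name
// `AddToCounter` are assumptions, not part of this header.
//
//   UpdateStatus AddToCounter(char* existing_value,
//                             uint32_t* existing_value_size,
//                             Slice delta_value, std::string* merged_value) {
//     uint64_t delta = 0;
//     if (delta_value.size() == sizeof(delta)) {
//       memcpy(&delta, delta_value.data(), sizeof(delta));
//     }
//     if (existing_value != nullptr &&
//         *existing_value_size >= sizeof(uint64_t)) {
//       uint64_t cur;
//       memcpy(&cur, existing_value, sizeof(cur));
//       cur += delta;
//       memcpy(existing_value, &cur, sizeof(cur));  // rewrite in place
//       *existing_value_size = sizeof(cur);
//       return UpdateStatus::UPDATED_INPLACE;
//     }
//     // Not enough room to update in place: return a new value instead.
//     merged_value->assign(reinterpret_cast<const char*>(&delta),
//                          sizeof(delta));
//     return UpdateStatus::UPDATED;
//   }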
  64. // Batched counters to be updated when inserting keys in one write batch.
  65. // In post process of the write batch, these can be updated together.
  66. // Only used in concurrent memtable insert case.
  67. struct MemTablePostProcessInfo {
  68. uint64_t data_size = 0;
  69. uint64_t num_entries = 0;
  70. uint64_t num_deletes = 0;
  71. uint64_t num_range_deletes = 0;
  72. };
  73. using MultiGetRange = MultiGetContext::Range;
  74. // For each CF, rocksdb maintains an active memtable that accepts writes,
  75. // and zero or more sealed memtables that we call immutable memtables.
  76. // This interface contains all methods required for immutable memtables.
  77. // The MemTable class inherits from `ReadOnlyMemTable` and implements additional
  78. // methods required for active memtables.
  79. // Immutable memtable list (MemTableList) maintains a list of ReadOnlyMemTable
  80. // objects. This interface enables features such as direct ingestion of an
  81. // immutable memtable with a custom implementation, bypassing memtable writes.
  82. //
  83. // Note: Many of the methods in this class have comments indicating that
  84. // external synchronization is required as these methods are not thread-safe.
  85. // It is up to higher layers of code to decide how to prevent concurrent
  86. // invocation of these methods. This is usually done by acquiring either
  87. // the db mutex or the single writer thread.
  88. //
  89. // Some of these methods are documented to only require external
  90. // synchronization if this memtable is immutable. Calling MarkImmutable() is
  91. // not sufficient to guarantee immutability. It is up to higher layers of
  92. // code to determine if this MemTable can still be modified by other threads.
  93. // Eg: The Superversion stores a pointer to the current MemTable (that can
  94. // be modified) and a separate list of the MemTables that can no longer be
  95. // written to (aka the 'immutable memtables').
  96. //
  97. // MemTables are reference counted. The initial reference count
  98. // is zero and the caller must call Ref() at least once.
  99. class ReadOnlyMemTable {
  100. public:
  101. // Do not delete this MemTable unless Unref() indicates it is not in use.
  102. virtual ~ReadOnlyMemTable() = default;
  103. virtual const char* Name() const = 0;
  104. // Returns an estimate of the number of bytes of data in use by this
  105. // data structure.
  106. //
  107. // REQUIRES: external synchronization to prevent simultaneous
  108. // operations on the same MemTable (unless this Memtable is immutable).
  109. virtual size_t ApproximateMemoryUsage() = 0;
  110. // used by MemTableListVersion::MemoryAllocatedBytesExcludingLast
  111. virtual size_t MemoryAllocatedBytes() const = 0;
  112. // Returns a vector of unique random memtable entries of size 'sample_size'.
  113. //
  114. // Note: the entries are stored in the unordered_set as length-prefixed keys,
  115. // hence their representation in the set as "const char*".
  116. // Note2: the size of the output set 'entries' is not enforced to be strictly
  117. // equal to 'target_sample_size'. Its final size might be slightly
  118. // greater or slightly less than 'target_sample_size'.
  119. //
  120. // REQUIRES: external synchronization to prevent simultaneous
  121. // operations on the same MemTable (unless this Memtable is immutable).
  122. // REQUIRES: SkipList memtable representation. This function is not
  123. // implemented for any other type of memtable representation (vectorrep,
  124. // hashskiplist,...).
  125. virtual void UniqueRandomSample(const uint64_t& target_sample_size,
  126. std::unordered_set<const char*>* entries) = 0;
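// Example: decoding the internal key from one sampled entry. Illustrative
// sketch; it assumes the common memtable entry layout (varint32 key length
// followed by the internal key bytes) and `GetVarint32Ptr` from util/coding.h.
//
//   Slice DecodeSampledInternalKey(const char* entry) {
//     uint32_t key_length = 0;
//     const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
//     return Slice(key_ptr, key_length);  // internal key: user key + seq/type
//   }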
  127. // Return an iterator that yields the contents of the memtable.
  128. //
  129. // The caller must ensure that the underlying MemTable remains live
  130. // while the returned iterator is live. The keys returned by this
  131. // iterator are internal keys encoded by AppendInternalKey in the
  132. // db/dbformat.{h,cc} module.
  133. //
  134. // By default, it returns an iterator for prefix seek if prefix_extractor
  135. // is configured in Options.
  136. // arena: If not null, the arena needs to be used to allocate the Iterator.
  137. // Calling ~Iterator of the iterator will destroy all the states but
  138. // those allocated in arena.
  139. // seqno_to_time_mapping: used to support returning the write unix time for
  140. // the data; currently only needed for iterators serving user reads.
  141. virtual InternalIterator* NewIterator(
  142. const ReadOptions& read_options,
  143. UnownedPtr<const SeqnoToTimeMapping> seqno_to_time_mapping, Arena* arena,
  144. const SliceTransform* prefix_extractor, bool for_flush) = 0;
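// Example: typical arena-backed iteration over a memtable. Illustrative
// sketch; `mem` is an assumed ReadOnlyMemTable*.
//
//   Arena arena;
//   ReadOptions ro;
//   InternalIterator* iter =
//       mem->NewIterator(ro, /*seqno_to_time_mapping=*/nullptr, &arena,
//                        /*prefix_extractor=*/nullptr, /*for_flush=*/false);
//   for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
//     Slice internal_key = iter->key();  // encoded by AppendInternalKey
//     Slice value = iter->value();
//     // ... consume the entry ...
//   }
//   // Arena-allocated: destroy explicitly, the arena owns the memory.
//   iter->~InternalIterator();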
  145. // Returns an iterator that wraps a MemTableIterator and logically strips the
  146. // user-defined timestamp of each key. This API is only used by flush when
  147. // user-defined timestamps in MemTable only feature is enabled.
  148. virtual InternalIterator* NewTimestampStrippingIterator(
  149. const ReadOptions& read_options,
  150. UnownedPtr<const SeqnoToTimeMapping> seqno_to_time_mapping, Arena* arena,
  151. const SliceTransform* prefix_extractor, size_t ts_sz) = 0;
  152. // Returns an iterator that yields the range tombstones of the memtable.
  153. // The caller must ensure that the underlying MemTable remains live
  154. // while the returned iterator is live.
  155. // @param immutable_memtable Whether this memtable is an immutable memtable.
  156. // This information is not stored in memtable itself, so it needs to be
  157. // specified by the caller. This flag is used internally to decide whether a
  158. // cached fragmented range tombstone list can be returned. This cached version
  159. // is constructed when a memtable becomes immutable. Setting the flag to false
  160. // will always yield correct result, but may incur performance penalty as it
  161. // always creates a new fragmented range tombstone list.
  162. virtual FragmentedRangeTombstoneIterator* NewRangeTombstoneIterator(
  163. const ReadOptions& read_options, SequenceNumber read_seq,
  164. bool immutable_memtable) = 0;
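// Example: scanning the range tombstones visible at `read_seq`. Illustrative
// sketch; `mem`, `read_options`, and `read_seq` are assumed to exist, and the
// returned pointer may be null when the memtable has no range tombstones.
//
//   std::unique_ptr<FragmentedRangeTombstoneIterator> del_iter(
//       mem->NewRangeTombstoneIterator(read_options, read_seq,
//                                      /*immutable_memtable=*/true));
//   if (del_iter) {
//     for (del_iter->SeekToFirst(); del_iter->Valid(); del_iter->Next()) {
//       // del_iter->start_key(), del_iter->end_key(), del_iter->seq()
//     }
//   }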
  165. // Returns an iterator that yields the range tombstones of the memtable and
  166. // logically strips the user-defined timestamp of each key (including start
  167. // key, and end key). This API is only used by flush when user-defined
  168. // timestamps in MemTable only feature is enabled.
  169. virtual FragmentedRangeTombstoneIterator*
  170. NewTimestampStrippingRangeTombstoneIterator(const ReadOptions& read_options,
  171. SequenceNumber read_seq,
  172. size_t ts_sz) = 0;
  173. // Used to get value associated with `key`, or Merge operands associated
  174. // with key, or get the latest sequence number of `key` (e.g. transaction
  175. // conflict checking).
  176. //
  177. // Keys are considered if they are no smaller than the parameter `key` in
  178. // the order defined by comparator and share the same user key with `key`.
  179. //
  180. // If do_merge = true, the default behavior, which is to get the value for
  181. // the key, is executed. The expected behavior is described right below.
  182. // If memtable contains a value for key, store it in *value and return true.
  183. // If memtable contains a deletion for key, store NotFound() in *status and
  184. // return true.
  185. // If memtable contains Merge operation as the most recent entry for a key,
  186. // and the merge process does not stop (not reaching a value or delete),
  187. // prepend the current merge operand to *operands,
  188. // store MergeInProgress in *status, and return false.
  189. // If an unexpected error or corruption occurs, store Corruption() or other
  190. // error in *status and return true.
  191. // Else, return false.
  192. // If any operation was found, its most recent sequence number
  193. // will be stored in *seq on success (regardless of whether true/false is
  194. // returned). Otherwise, *seq will be set to kMaxSequenceNumber.
  195. // On success, *s may be set to OK, NotFound, or MergeInProgress. Any other
  196. // status returned indicates a corruption or other unexpected error.
  197. //
  198. // If do_merge = false then any Merge Operands encountered for key are simply
  199. // stored in merge_context.operands_list and never actually merged to get a
  200. // final value. The raw Merge Operands are eventually returned to the user.
  201. // @param value If not null and memtable contains a value for key, `value`
  202. // will be set to the result value.
  203. // @param column If not null and memtable contains a value/WideColumn for key,
  204. // `column` will be set to the result value/WideColumn.
  205. // Note: only one of `value` and `column` can be non-nullptr.
  206. // To only query for key existence or the latest sequence number of a key,
  207. // `value` and `column` can be both nullptr. In this case, returned status can
  208. // be OK, NotFound or MergeInProgress if a key is found.
  209. // @param immutable_memtable Whether this memtable is immutable. Used
  210. // internally by NewRangeTombstoneIterator(). See comment above
  211. // NewRangeTombstoneIterator() for more detail.
  212. virtual bool Get(const LookupKey& key, std::string* value,
  213. PinnableWideColumns* columns, std::string* timestamp,
  214. Status* s, MergeContext* merge_context,
  215. SequenceNumber* max_covering_tombstone_seq,
  216. SequenceNumber* seq, const ReadOptions& read_opts,
  217. bool immutable_memtable, ReadCallback* callback = nullptr,
  218. bool* is_blob_index = nullptr, bool do_merge = true) = 0;
  219. bool Get(const LookupKey& key, std::string* value,
  220. PinnableWideColumns* columns, std::string* timestamp, Status* s,
  221. MergeContext* merge_context,
  222. SequenceNumber* max_covering_tombstone_seq,
  223. const ReadOptions& read_opts, bool immutable_memtable,
  224. ReadCallback* callback = nullptr, bool* is_blob_index = nullptr,
  225. bool do_merge = true) {
  226. SequenceNumber seq;
  227. return Get(key, value, columns, timestamp, s, merge_context,
  228. max_covering_tombstone_seq, &seq, read_opts, immutable_memtable,
  229. callback, is_blob_index, do_merge);
  230. }
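// Example: a point lookup against this memtable. Illustrative sketch; `mem`,
// `user_key`, and `snapshot_seq` are assumed to exist.
//
//   LookupKey lkey(user_key, snapshot_seq);
//   std::string value, timestamp;
//   Status s;
//   MergeContext merge_context;
//   SequenceNumber max_covering_tombstone_seq = 0;
//   bool found =
//       mem->Get(lkey, &value, /*columns=*/nullptr, &timestamp, &s,
//                &merge_context, &max_covering_tombstone_seq, ReadOptions(),
//                /*immutable_memtable=*/false);
//   if (found && s.ok()) {
//     // `value` holds the result.
//   } else if (found && s.IsNotFound()) {
//     // A deletion in this memtable shadows the key.
//   } else if (!found) {
//     // Fall through to older memtables and SST files.
//   }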
  231. // @param immutable_memtable Whether this memtable is immutable. Used
  232. // internally by NewRangeTombstoneIterator(). See comment above
  233. // NewRangeTombstoneIterator() for more detail.
  234. virtual void MultiGet(const ReadOptions& read_options, MultiGetRange* range,
  235. ReadCallback* callback, bool immutable_memtable) = 0;
  236. // Get total number of entries in the mem table.
  237. // REQUIRES: external synchronization to prevent simultaneous
  238. // operations on the same MemTable (unless this Memtable is immutable).
  239. virtual uint64_t NumEntries() const = 0;
  240. // Get total number of point deletes in the mem table.
  241. // REQUIRES: external synchronization to prevent simultaneous
  242. // operations on the same MemTable (unless this Memtable is immutable).
  243. virtual uint64_t NumDeletion() const = 0;
  244. // Get total number of range deletions in the mem table.
  245. // REQUIRES: external synchronization to prevent simultaneous
  246. // operations on the same MemTable (unless this Memtable is immutable).
  247. virtual uint64_t NumRangeDeletion() const = 0;
  248. virtual uint64_t GetDataSize() const = 0;
  249. // Returns the sequence number of the first element that was inserted
  250. // into the memtable.
  251. // REQUIRES: external synchronization to prevent simultaneous
  252. // operations on the same MemTable (unless this Memtable is immutable).
  253. virtual SequenceNumber GetFirstSequenceNumber() = 0;
  254. // Returns true if no entry has been inserted into the memtable.
  255. // REQUIRES: external synchronization to prevent simultaneous
  256. // operations on the same MemTable (unless this Memtable is immutable).
  257. virtual bool IsEmpty() const = 0;
  258. // Returns the sequence number that is guaranteed to be smaller than or equal
  259. // to the sequence number of any key that could be inserted into this
  260. // memtable. It can then be assumed that any write with a larger (or equal)
  261. // sequence number will be present in this memtable or a later memtable.
  262. //
  263. // If the earliest sequence number could not be determined,
  264. // kMaxSequenceNumber will be returned.
  265. virtual SequenceNumber GetEarliestSequenceNumber() = 0;
  266. virtual uint64_t GetMinLogContainingPrepSection() = 0;
  267. // Notify the underlying storage that no more items will be added.
  268. // REQUIRES: external synchronization to prevent simultaneous
  269. // operations on the same MemTable.
  270. // After MarkImmutable() is called, you should not attempt to
  271. // write anything to this MemTable (i.e., do not call Add() or Update()).
  272. virtual void MarkImmutable() = 0;
  273. // Notify the underlying storage that all data it contained has been
  274. // persisted.
  275. // REQUIRES: external synchronization to prevent simultaneous
  276. // operations on the same MemTable.
  277. virtual void MarkFlushed() = 0;
  278. struct MemTableStats {
  279. uint64_t size;
  280. uint64_t count;
  281. };
  282. virtual MemTableStats ApproximateStats(const Slice& start_ikey,
  283. const Slice& end_ikey) = 0;
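// Example: estimating the data between two user keys. Illustrative sketch;
// `mem` is an assumed ReadOnlyMemTable* and the bounds "a"/"z" are
// placeholders.
//
//   InternalKey start("a", kMaxSequenceNumber, kValueTypeForSeek);
//   InternalKey end("z", kMaxSequenceNumber, kValueTypeForSeek);
//   ReadOnlyMemTable::MemTableStats stats =
//       mem->ApproximateStats(start.Encode(), end.Encode());
//   // stats.size and stats.count are rough estimates, not exact numbers.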
  284. virtual const InternalKeyComparator& GetInternalKeyComparator() const = 0;
  285. virtual uint64_t ApproximateOldestKeyTime() const = 0;
  286. // Returns whether a fragmented range tombstone list is already constructed
  287. // for this memtable. It should be constructed right before a memtable is
  288. // added to an immutable memtable list. Note that if a memtable does not have
  289. // any range tombstone, then no range tombstone list will ever be constructed
  290. // and true is returned in that case.
  291. virtual bool IsFragmentedRangeTombstonesConstructed() const = 0;
  292. // Get the newest user-defined timestamp contained in this MemTable. Check
  293. // `newest_udt_` for what newer means. This method should only be invoked for
  294. // a MemTable that has the user-defined timestamp feature enabled and sets
  295. // `persist_user_defined_timestamps` to false. The tracked newest UDT will be
  296. // used by flush job in the background to help check the MemTable's
  297. // eligibility for Flush.
  298. virtual const Slice& GetNewestUDT() const = 0;
  299. // Increase reference count.
  300. // REQUIRES: external synchronization to prevent simultaneous
  301. // operations on the same MemTable.
  302. void Ref() { ++refs_; }
  303. // Drop reference count.
  304. // If the refcount goes to zero return this memtable, otherwise return null.
  305. // REQUIRES: external synchronization to prevent simultaneous
  306. // operations on the same MemTable.
  307. ReadOnlyMemTable* Unref() {
  308. --refs_;
  309. assert(refs_ >= 0);
  310. if (refs_ <= 0) {
  311. return this;
  312. }
  313. return nullptr;
  314. }
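// Example: the reference-counting pattern described above. Illustrative
// sketch; `memtable` is an assumed ReadOnlyMemTable*.
//
//   memtable->Ref();                    // caller now holds a reference
//   // ... use the memtable ...
//   ReadOnlyMemTable* to_delete = memtable->Unref();
//   if (to_delete != nullptr) {
//     delete to_delete;                 // last reference was dropped
//   }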
  315. // Returns the edits area that is needed for flushing the memtable
  316. VersionEdit* GetEdits() { return &edit_; }
  317. // Returns the next active logfile number when this memtable is about to
  318. // be flushed to storage
  319. // REQUIRES: external synchronization to prevent simultaneous
  320. // operations on the same MemTable.
  321. uint64_t GetNextLogNumber() const { return mem_next_walfile_number_; }
  322. // Sets the next active logfile number when this memtable is about to
  323. // be flushed to storage
  324. // REQUIRES: external synchronization to prevent simultaneous
  325. // operations on the same MemTable.
  326. void SetNextLogNumber(uint64_t num) { mem_next_walfile_number_ = num; }
  327. // REQUIRES: db_mutex held.
  328. void SetID(uint64_t id) { id_ = id; }
  329. uint64_t GetID() const { return id_; }
  330. void SetFlushCompleted(bool completed) { flush_completed_ = completed; }
  331. uint64_t GetFileNumber() const { return file_number_; }
  332. void SetFileNumber(uint64_t file_num) { file_number_ = file_num; }
  333. void SetFlushInProgress(bool in_progress) {
  334. flush_in_progress_ = in_progress;
  335. }
  336. void SetFlushJobInfo(std::unique_ptr<FlushJobInfo>&& info) {
  337. flush_job_info_ = std::move(info);
  338. }
  339. std::unique_ptr<FlushJobInfo> ReleaseFlushJobInfo() {
  340. return std::move(flush_job_info_);
  341. }
  342. static void HandleTypeValue(
  343. const Slice& lookup_user_key, const Slice& value, bool value_pinned,
  344. bool do_merge, bool merge_in_progress, MergeContext* merge_context,
  345. const MergeOperator* merge_operator, SystemClock* clock,
  346. Statistics* statistics, Logger* info_log, Status* s,
  347. std::string* out_value, PinnableWideColumns* out_columns,
  348. bool* is_blob_index) {
  349. *s = Status::OK();
  350. if (!do_merge) {
  351. // Preserve the value with the goal of returning it as part of
  352. // raw merge operands to the user
  353. // TODO(yanqin) update MergeContext so that timestamps information
  354. // can also be retained.
  355. merge_context->PushOperand(value, value_pinned);
  356. } else if (merge_in_progress) {
  357. // `op_failure_scope` (an output parameter) is not provided (set to
  358. // nullptr) since a failure must be propagated regardless of its
  359. // value.
  360. if (out_value || out_columns) {
  361. *s = MergeHelper::TimedFullMerge(
  362. merge_operator, lookup_user_key, MergeHelper::kPlainBaseValue,
  363. value, merge_context->GetOperands(), info_log, statistics, clock,
  364. /* update_num_ops_stats */ true,
  365. /* op_failure_scope */ nullptr, out_value, out_columns);
  366. }
  367. } else if (out_value) {
  368. out_value->assign(value.data(), value.size());
  369. } else if (out_columns) {
  370. out_columns->SetPlainValue(value);
  371. }
  372. if (is_blob_index) {
  373. *is_blob_index = false;
  374. }
  375. }
  376. static void HandleTypeDeletion(
  377. const Slice& lookup_user_key, bool merge_in_progress,
  378. MergeContext* merge_context, const MergeOperator* merge_operator,
  379. SystemClock* clock, Statistics* statistics, Logger* logger, Status* s,
  380. std::string* out_value, PinnableWideColumns* out_columns) {
  381. if (merge_in_progress) {
  382. if (out_value || out_columns) {
  383. // `op_failure_scope` (an output parameter) is not provided (set to
  384. // nullptr) since a failure must be propagated regardless of its
  385. // value.
  386. *s = MergeHelper::TimedFullMerge(
  387. merge_operator, lookup_user_key, MergeHelper::kNoBaseValue,
  388. merge_context->GetOperands(), logger, statistics, clock,
  389. /* update_num_ops_stats */ true,
  390. /* op_failure_scope */ nullptr, out_value, out_columns);
  391. } else {
  392. // We have found a final value (a base deletion) and have newer
  393. // merge operands that we do not intend to merge. Nothing remains
  394. // to be done so assign status to OK.
  395. *s = Status::OK();
  396. }
  397. } else {
  398. *s = Status::NotFound();
  399. }
  400. }
  401. // Returns true if a final value is found.
  402. static bool HandleTypeMerge(const Slice& lookup_user_key, const Slice& value,
  403. bool value_pinned, bool do_merge,
  404. MergeContext* merge_context,
  405. const MergeOperator* merge_operator,
  406. SystemClock* clock, Statistics* statistics,
  407. Logger* logger, Status* s, std::string* out_value,
  408. PinnableWideColumns* out_columns) {
  409. if (!merge_operator) {
  410. *s = Status::InvalidArgument(
  411. "merge_operator is not properly initialized.");
  412. // Normally we keep searching when we see a merge operand. But in case
  413. // of an error, we should stop immediately and pretend we have found the
  414. // final value to stop further seeking. Otherwise, a later call would
  415. // override this error status.
  416. return true;
  417. }
  418. merge_context->PushOperand(value, value_pinned /* operand_pinned */);
  419. PERF_COUNTER_ADD(internal_merge_point_lookup_count, 1);
  420. if (do_merge && merge_operator->ShouldMerge(
  421. merge_context->GetOperandsDirectionBackward())) {
  422. if (out_value || out_columns) {
  423. // `op_failure_scope` (an output parameter) is not provided (set to
  424. // nullptr) since a failure must be propagated regardless of its
  425. // value.
  426. *s = MergeHelper::TimedFullMerge(
  427. merge_operator, lookup_user_key, MergeHelper::kNoBaseValue,
  428. merge_context->GetOperands(), logger, statistics, clock,
  429. /* update_num_ops_stats */ true,
  430. /* op_failure_scope */ nullptr, out_value, out_columns);
  431. }
  432. return true;
  433. }
  434. if (merge_context->get_merge_operands_options != nullptr &&
  435. merge_context->get_merge_operands_options->continue_cb != nullptr &&
  436. !merge_context->get_merge_operands_options->continue_cb(value)) {
  437. // We were told not to continue. *s may be MergeInProgress();
  438. // overwrite it to signal the end of a successful get. This status
  439. // will be checked at the end of GetImpl().
  440. *s = Status::OK();
  441. return true;
  442. }
  443. // no final value found yet
  444. return false;
  445. }
  446. void MarkForFlush() { marked_for_flush_.StoreRelaxed(true); }
  447. bool IsMarkedForFlush() const { return marked_for_flush_.LoadRelaxed(); }
  448. protected:
  449. friend class MemTableList;
  450. int refs_{0};
  451. // These are used to manage memtable flushes to storage
  452. bool flush_in_progress_{false}; // started the flush
  453. bool flush_completed_{false}; // finished the flush
  454. uint64_t file_number_{0};
  455. // The updates to be applied to the transaction log when this
  456. // memtable is flushed to storage.
  457. VersionEdit edit_;
  458. // The log files earlier than this number can be deleted.
  459. uint64_t mem_next_walfile_number_{0};
  460. // Memtable id to track flush.
  461. uint64_t id_ = 0;
  462. // Sequence number of the atomic flush that is responsible for this memtable.
  463. // The sequence number of atomic flush is a seq, such that no writes with
  464. // sequence numbers greater than or equal to seq are flushed, while all
  465. // writes with sequence number smaller than seq are flushed.
  466. SequenceNumber atomic_flush_seqno_{kMaxSequenceNumber};
  467. // Flush job info of the current memtable.
  468. std::unique_ptr<FlushJobInfo> flush_job_info_;
  469. RelaxedAtomic<bool> marked_for_flush_{false};
  470. };
  471. class MemTable final : public ReadOnlyMemTable {
  472. public:
  473. struct KeyComparator final : public MemTableRep::KeyComparator {
  474. const InternalKeyComparator comparator;
  475. explicit KeyComparator(const InternalKeyComparator& c) : comparator(c) {}
  476. int operator()(const char* prefix_len_key1,
  477. const char* prefix_len_key2) const override;
  478. int operator()(const char* prefix_len_key,
  479. const DecodedType& key) const override;
  480. };
  481. // earliest_seq should be the current SequenceNumber in the db such that any
  482. // key inserted into this memtable will have an equal or larger seq number.
  483. // (When a db is first created, the earliest sequence number will be 0).
  484. // If the earliest sequence number is not known, kMaxSequenceNumber may be
  485. // used, but this may prevent some transactions from succeeding until the
  486. // first key is inserted into the memtable.
  487. explicit MemTable(const InternalKeyComparator& comparator,
  488. const ImmutableOptions& ioptions,
  489. const MutableCFOptions& mutable_cf_options,
  490. WriteBufferManager* write_buffer_manager,
  491. SequenceNumber earliest_seq, uint32_t column_family_id);
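// Example: constructing a stand-alone MemTable, roughly as done in tests.
// Illustrative sketch; a zero-sized WriteBufferManager (limit disabled) and
// earliest_seq = 0 (freshly created DB) are assumptions.
//
//   Options options;
//   ImmutableOptions ioptions(options);
//   MutableCFOptions mutable_cf_options(options);
//   InternalKeyComparator cmp(BytewiseComparator());
//   WriteBufferManager wbm(/*buffer_size=*/0);  // must outlive the memtable
//   MemTable* mem = new MemTable(cmp, ioptions, mutable_cf_options, &wbm,
//                                /*earliest_seq=*/0, /*column_family_id=*/0);
//   mem->Ref();
//   // ... insert / read ...
//   delete mem->Unref();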
  492. // No copying allowed
  493. MemTable(const MemTable&) = delete;
  494. MemTable& operator=(const MemTable&) = delete;
  495. ~MemTable() override;
  496. const char* Name() const override { return "MemTable"; }
  497. size_t ApproximateMemoryUsage() override;
  498. // As a cheap version of `ApproximateMemoryUsage()`, this function doesn't
  499. // require external synchronization. The value may be less accurate, though.
  500. size_t ApproximateMemoryUsageFast() const {
  501. return approximate_memory_usage_.LoadRelaxed();
  502. }
  503. size_t MemoryAllocatedBytes() const override {
  504. return table_->ApproximateMemoryUsage() +
  505. range_del_table_->ApproximateMemoryUsage() +
  506. arena_.MemoryAllocatedBytes();
  507. }
  508. void UniqueRandomSample(const uint64_t& target_sample_size,
  509. std::unordered_set<const char*>* entries) override {
  510. // TODO(bjlemaire): at the moment, only supported by skiplistrep.
  511. // Extend it to all other memtable representations.
  512. table_->UniqueRandomSample(NumEntries(), target_sample_size, entries);
  513. }
  514. // This method heuristically determines if the memtable should continue to
  515. // host more data.
  516. bool ShouldScheduleFlush() const {
  517. return flush_state_.load(std::memory_order_relaxed) == FLUSH_REQUESTED;
  518. }
  519. // Returns true if a flush should be scheduled and the caller should
  520. // be the one to schedule it.
  521. bool MarkFlushScheduled() {
  522. auto before = FLUSH_REQUESTED;
  523. return flush_state_.compare_exchange_strong(before, FLUSH_SCHEDULED,
  524. std::memory_order_relaxed,
  525. std::memory_order_relaxed);
  526. }
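// Example: the intended check-then-claim pattern for scheduling a flush.
// Illustrative sketch; `mem` and `ScheduleFlush()` are hypothetical.
//
//   if (mem->ShouldScheduleFlush() && mem->MarkFlushScheduled()) {
//     // This thread won the transition FLUSH_REQUESTED -> FLUSH_SCHEDULED,
//     // so it is the one responsible for scheduling the flush.
//     ScheduleFlush(mem);
//   }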
  527. InternalIterator* NewIterator(
  528. const ReadOptions& read_options,
  529. UnownedPtr<const SeqnoToTimeMapping> seqno_to_time_mapping, Arena* arena,
  530. const SliceTransform* prefix_extractor, bool for_flush) override;
  531. InternalIterator* NewTimestampStrippingIterator(
  532. const ReadOptions& read_options,
  533. UnownedPtr<const SeqnoToTimeMapping> seqno_to_time_mapping, Arena* arena,
  534. const SliceTransform* prefix_extractor, size_t ts_sz) override;
  535. FragmentedRangeTombstoneIterator* NewRangeTombstoneIterator(
  536. const ReadOptions& read_options, SequenceNumber read_seq,
  537. bool immutable_memtable) override;
  538. FragmentedRangeTombstoneIterator* NewTimestampStrippingRangeTombstoneIterator(
  539. const ReadOptions& read_options, SequenceNumber read_seq,
  540. size_t ts_sz) override;
  541. Status VerifyEncodedEntry(Slice encoded,
  542. const ProtectionInfoKVOS64& kv_prot_info);
  543. // Add an entry into memtable that maps key to value at the
  544. // specified sequence number and with the specified type.
  545. // Typically, value will be empty if type==kTypeDeletion.
  546. //
  547. // REQUIRES: if allow_concurrent = false, external synchronization to prevent
  548. // simultaneous operations on the same MemTable.
  549. //
  550. // Returns `Status::TryAgain` if the `seq`, `key` combination already exists
  551. // in the memtable and `MemTableRepFactory::CanHandleDuplicatedKey()` is true.
  552. // The next attempt should try a larger value for `seq`.
  553. Status Add(SequenceNumber seq, ValueType type, const Slice& key,
  554. const Slice& value, const ProtectionInfoKVOS64* kv_prot_info,
  555. bool allow_concurrent = false,
  556. MemTablePostProcessInfo* post_process_info = nullptr,
  557. void** hint = nullptr);
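// Example: adding a value and then a point deletion for the same user key.
// Illustrative sketch; `mem` and the sequence numbers are placeholders.
//
//   Status s = mem->Add(/*seq=*/100, kTypeValue, "key1", "value1",
//                       /*kv_prot_info=*/nullptr);
//   if (s.IsTryAgain()) {
//     // The (seq, key) pair already exists; retry with a larger seq.
//   }
//   s = mem->Add(/*seq=*/101, kTypeDeletion, "key1", /*value=*/"",
//                /*kv_prot_info=*/nullptr);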
  558. using ReadOnlyMemTable::Get;
  559. bool Get(const LookupKey& key, std::string* value,
  560. PinnableWideColumns* columns, std::string* timestamp, Status* s,
  561. MergeContext* merge_context,
  562. SequenceNumber* max_covering_tombstone_seq, SequenceNumber* seq,
  563. const ReadOptions& read_opts, bool immutable_memtable,
  564. ReadCallback* callback = nullptr, bool* is_blob_index = nullptr,
  565. bool do_merge = true) override;
  566. void MultiGet(const ReadOptions& read_options, MultiGetRange* range,
  567. ReadCallback* callback, bool immutable_memtable) override;
  568. // If `key` exists in current memtable with type value_type and the existing
  569. // value is at least as large as the new value, updates it in-place. Otherwise
  570. // adds the new value to the memtable out-of-place.
  571. //
  572. // Returns `Status::TryAgain` if the `seq`, `key` combination already exists
  573. // in the memtable and `MemTableRepFactory::CanHandleDuplicatedKey()` is true.
  574. // The next attempt should try a larger value for `seq`.
  575. //
  576. // REQUIRES: external synchronization to prevent simultaneous
  577. // operations on the same MemTable.
  578. Status Update(SequenceNumber seq, ValueType value_type, const Slice& key,
  579. const Slice& value, const ProtectionInfoKVOS64* kv_prot_info);
  580. // If `key` exists in current memtable with type `kTypeValue` and the existing
  581. // value is at least as large as the new value, updates it in-place. Otherwise
  582. // if `key` exists in current memtable with type `kTypeValue`, adds the new
  583. // value to the memtable out-of-place.
  584. //
  585. // Returns `Status::NotFound` if `key` does not exist in current memtable or
  586. // the latest version of `key` does not have `kTypeValue`.
  587. //
  588. // Returns `Status::TryAgain` if the `seq`, `key` combination already exists
  589. // in the memtable and `MemTableRepFactory::CanHandleDuplicatedKey()` is true.
  590. // The next attempt should try a larger value for `seq`.
  591. //
  592. // REQUIRES: external synchronization to prevent simultaneous
  593. // operations on the same MemTable.
  594. Status UpdateCallback(SequenceNumber seq, const Slice& key,
  595. const Slice& delta,
  596. const ProtectionInfoKVOS64* kv_prot_info);
  597. // Returns the number of successive merge entries starting from the newest
  598. // entry for the key. The count ends when the oldest entry in the memtable
  599. // with which the newest entry would be merged is reached, or the count
  600. // reaches `limit`.
  601. size_t CountSuccessiveMergeEntries(const LookupKey& key, size_t limit);
  602. // Update counters and flush status after inserting a whole write batch
  603. // Used in concurrent memtable inserts.
  604. void BatchPostProcess(const MemTablePostProcessInfo& update_counters) {
  605. table_->BatchPostProcess();
  606. num_entries_.FetchAddRelaxed(update_counters.num_entries);
  607. data_size_.FetchAddRelaxed(update_counters.data_size);
  608. if (update_counters.num_deletes != 0) {
  609. num_deletes_.FetchAddRelaxed(update_counters.num_deletes);
  610. }
  611. if (update_counters.num_range_deletes > 0) {
  612. num_range_deletes_.FetchAddRelaxed(update_counters.num_range_deletes);
  613. // No-op for the skip-list memtable.
  614. // Besides the correctness check in the stress test, the memtable flush
  615. // record count check would catch this if it were not a no-op.
  616. // range_del_table_->BatchPostProcess();
  617. }
  618. UpdateFlushState();
  619. }
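// Example: the concurrent-insert flow that this method completes. Illustrative
// sketch; `mem`, `seq`, `key`, and `value` are placeholders.
//
//   MemTablePostProcessInfo info;
//   // Each concurrent writer accumulates counters locally in `info` instead
//   // of updating the shared counters on every Add().
//   Status s = mem->Add(seq, kTypeValue, key, value,
//                       /*kv_prot_info=*/nullptr, /*allow_concurrent=*/true,
//                       &info);
//   // ... more Add() calls for the same write batch ...
//   // After the whole batch is inserted, fold the local counters in once.
//   mem->BatchPostProcess(info);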
  620. uint64_t NumEntries() const override { return num_entries_.LoadRelaxed(); }
  621. uint64_t NumDeletion() const override { return num_deletes_.LoadRelaxed(); }
  622. uint64_t NumRangeDeletion() const override {
  623. return num_range_deletes_.LoadRelaxed();
  624. }
  625. uint64_t GetDataSize() const override { return data_size_.LoadRelaxed(); }
  626. size_t write_buffer_size() const { return write_buffer_size_.LoadRelaxed(); }
  627. // Dynamically change the memtable's capacity. If set below the current usage,
  628. // the next key added will trigger a flush. Can only increase size when
  629. // memtable prefix bloom is disabled, since we can't easily allocate more
  630. // space. Non-atomic update ok because this is only called with DB mutex held.
  631. void UpdateWriteBufferSize(size_t new_write_buffer_size) {
  632. if (bloom_filter_ == nullptr ||
  633. new_write_buffer_size < write_buffer_size_.LoadRelaxed()) {
  634. write_buffer_size_.StoreRelaxed(new_write_buffer_size);
  635. }
  636. }
  637. bool IsEmpty() const override { return first_seqno_ == 0; }
  638. SequenceNumber GetFirstSequenceNumber() override {
  639. return first_seqno_.load(std::memory_order_relaxed);
  640. }
  641. // Sets the sequence number of the first element that was inserted
  642. // into the memtable.
  643. // REQUIRES: external synchronization to prevent simultaneous
  644. // operations on the same MemTable (unless this Memtable is immutable).
  645. void SetFirstSequenceNumber(SequenceNumber first_seqno) {
  646. return first_seqno_.store(first_seqno, std::memory_order_relaxed);
  647. }
  648. SequenceNumber GetEarliestSequenceNumber() override {
  649. // With file ingestion and empty memtable, this seqno needs to be fixed.
  650. return earliest_seqno_.load(std::memory_order_relaxed);
  651. }
  652. // Sets the sequence number that is guaranteed to be smaller than or equal
  653. // to the sequence number of any key that could be inserted into this
  654. // memtable. It can then be assumed that any write with a larger (or equal)
  655. // sequence number will be present in this memtable or a later memtable.
  656. // Used only for MemPurge operation
  657. void SetEarliestSequenceNumber(SequenceNumber earliest_seqno) {
  658. return earliest_seqno_.store(earliest_seqno, std::memory_order_relaxed);
  659. }
  660. // DB's latest sequence ID when the memtable is created. This number
  661. // may be updated to a more recent one before any key is inserted.
  662. SequenceNumber GetCreationSeq() const { return creation_seq_; }
  663. void SetCreationSeq(SequenceNumber sn) { creation_seq_ = sn; }
  664. // If this memtable contains data from a committed two phase transaction we
  665. // must take note of the log which contains that data so we can know when
  666. // to release that log.
  667. void RefLogContainingPrepSection(uint64_t log);
  668. uint64_t GetMinLogContainingPrepSection() override;
  669. void MarkImmutable() override {
  670. table_->MarkReadOnly();
  671. mem_tracker_.DoneAllocating();
  672. }
  673. void MarkFlushed() override { table_->MarkFlushed(); }
  674. // Returns true if the current MemTableRep supports the merge operator.
  675. bool IsMergeOperatorSupported() const {
  676. return table_->IsMergeOperatorSupported();
  677. }
  678. // Returns true if the current MemTableRep supports snapshots.
  679. // In-place update prevents snapshots.
  680. bool IsSnapshotSupported() const {
  681. return table_->IsSnapshotSupported() && !moptions_.inplace_update_support;
  682. }
  683. MemTableStats ApproximateStats(const Slice& start_ikey,
  684. const Slice& end_ikey) override;
  685. // Get the lock associated for the key
  686. port::RWMutex* GetLock(const Slice& key);
  687. const InternalKeyComparator& GetInternalKeyComparator() const override {
  688. return comparator_.comparator;
  689. }
  690. const ImmutableMemTableOptions* GetImmutableMemTableOptions() const {
  691. return &moptions_;
  692. }
  693. uint64_t ApproximateOldestKeyTime() const override {
  694. return oldest_key_time_.load(std::memory_order_relaxed);
  695. }
  696. // Returns a heuristic flush decision
  697. bool ShouldFlushNow();
  698. // Updates `fragmented_range_tombstone_list_` that will be used to serve reads
  699. // when this memtable becomes an immutable memtable (in some
  700. // MemtableListVersion::memlist_). Should be called when this memtable is
  701. // about to become immutable. May be called multiple times since
  702. // SwitchMemtable() may fail.
  703. void ConstructFragmentedRangeTombstones();
  704. bool IsFragmentedRangeTombstonesConstructed() const override {
  705. return fragmented_range_tombstone_list_.get() != nullptr ||
  706. is_range_del_table_empty_.LoadRelaxed();
  707. }
  708. // Gets the newest user-defined timestamp in the memtable. This should only
  709. // be called when user-defined timestamps are enabled.
  710. const Slice& GetNewestUDT() const override;
  711. // Returns Corruption status if verification fails.
  712. static Status VerifyEntryChecksum(const char* entry,
  713. uint32_t protection_bytes_per_key,
  714. bool allow_data_in_errors = false);
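// Example: verifying one entry's integrity. Illustrative sketch; `entry` is
// assumed to point at a length-prefixed memtable entry and `moptions` at the
// ImmutableMemTableOptions in use.
//
//   Status s = MemTable::VerifyEntryChecksum(
//       entry, moptions->protection_bytes_per_key);
//   if (!s.ok()) {
//     // Corruption: the per-key checksum did not match.
//   }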
  715. // Validate the checksum of the key/value pair.
  716. Status ValidateKey(const char* key, bool allow_data_in_errors);
  717. private:
  718. enum FlushStateEnum { FLUSH_NOT_REQUESTED, FLUSH_REQUESTED, FLUSH_SCHEDULED };
  719. friend class MemTableIterator;
  720. friend class MemTableBackwardIterator;
  721. friend class MemTableList;
  722. KeyComparator comparator_;
  723. const ImmutableMemTableOptions moptions_;
  724. const size_t kArenaBlockSize;
  725. AllocTracker mem_tracker_;
  726. ConcurrentArena arena_;
  727. std::unique_ptr<MemTableRep> table_;
  728. std::unique_ptr<MemTableRep> range_del_table_;
  729. // This is OK to be relaxed access because consistency between table_ and
  730. // range_del_table_ is provided by explicit multi-versioning with sequence
  731. // numbers. It's ok for stale memory to say the range_del_table_ is empty when
  732. // it's actually not because if it was relevant to our read (based on sequence
  733. // number), the relaxed memory read would get a sufficiently updated value
  734. // because of the ordering provided by LastPublishedSequence().
  735. RelaxedAtomic<bool> is_range_del_table_empty_;
  736. // Total data size of all data inserted
  737. RelaxedAtomic<uint64_t> data_size_;
  738. RelaxedAtomic<uint64_t> num_entries_;
  739. RelaxedAtomic<uint64_t> num_deletes_;
  740. RelaxedAtomic<uint64_t> num_range_deletes_;
  741. // Dynamically changeable memtable option
  742. RelaxedAtomic<size_t> write_buffer_size_;
  743. // The sequence number of the kv that was inserted first
  744. std::atomic<SequenceNumber> first_seqno_;
  745. // The db sequence number at the time of creation or kMaxSequenceNumber
  746. // if not set.
  747. std::atomic<SequenceNumber> earliest_seqno_;
  748. SequenceNumber creation_seq_;
  749. // the earliest log containing a prepared section
  750. // which has been inserted into this memtable.
  751. std::atomic<uint64_t> min_prep_log_referenced_;
  752. // rw locks for inplace updates
  753. std::vector<port::RWMutex> locks_;
  754. const SliceTransform* const prefix_extractor_;
  755. std::unique_ptr<DynamicBloom> bloom_filter_;
  756. std::atomic<FlushStateEnum> flush_state_;
  757. SystemClock* clock_;
  758. // Extract sequential insert prefixes.
  759. const SliceTransform* insert_with_hint_prefix_extractor_;
  760. // Insert hints for each prefix.
  761. UnorderedMapH<Slice, void*, SliceHasher32> insert_hints_;
  762. // Timestamp of oldest key
  763. std::atomic<uint64_t> oldest_key_time_;
  764. // keep track of memory usage in table_, arena_, and range_del_table_.
  765. // Gets refreshed inside `ApproximateMemoryUsage()` or `ShouldFlushNow`
  766. RelaxedAtomic<uint64_t> approximate_memory_usage_;
  767. // max range deletions in a memtable, before automatic flushing, 0 for
  768. // unlimited.
  769. uint32_t memtable_max_range_deletions_ = 0;
  770. // Size in bytes for the user-defined timestamps.
  771. size_t ts_sz_;
  772. // Newest user-defined timestamp contained in this MemTable. For ts1, and ts2
  773. // if Comparator::CompareTimestamp(ts1, ts2) > 0, ts1 is considered newer than
  774. // ts2. We track this field for a MemTable if its column family has UDT
  775. // feature enabled.
  776. Slice newest_udt_;
  777. // Updates flush_state_ using ShouldFlushNow()
  778. void UpdateFlushState();
  779. void UpdateOldestKeyTime();
  780. void GetFromTable(const LookupKey& key,
  781. SequenceNumber max_covering_tombstone_seq, bool do_merge,
  782. ReadCallback* callback, bool* is_blob_index,
  783. std::string* value, PinnableWideColumns* columns,
  784. std::string* timestamp, Status* s,
  785. MergeContext* merge_context, SequenceNumber* seq,
  786. bool* found_final_value, bool* merge_in_progress);
  787. // Always returns non-null and assumes certain pre-checks (e.g.,
  788. // is_range_del_table_empty_) are done. This is only valid during the lifetime
  789. // of the underlying memtable.
  790. // read_seq and read_options.timestamp will be used as the upper bound
  791. // for range tombstones.
  792. FragmentedRangeTombstoneIterator* NewRangeTombstoneIteratorInternal(
  793. const ReadOptions& read_options, SequenceNumber read_seq,
  794. bool immutable_memtable);
  795. // The fragmented range tombstones of this memtable.
  796. // This is constructed when this memtable becomes immutable
  797. // if !is_range_del_table_empty_.
  798. std::unique_ptr<FragmentedRangeTombstoneList>
  799. fragmented_range_tombstone_list_;
  800. // The fragmented range tombstone of this memtable with all keys' user-defined
  801. // timestamps logically stripped. This is constructed and used by flush when
  802. // user-defined timestamps in memtable only feature is enabled.
  803. std::unique_ptr<FragmentedRangeTombstoneList>
  804. timestamp_stripping_fragmented_range_tombstone_list_;
  805. // makes sure there is a single range tombstone writer to invalidate cache
  806. std::mutex range_del_mutex_;
  807. #if defined(__cpp_lib_atomic_shared_ptr)
  808. CoreLocalArray<
  809. std::atomic<std::shared_ptr<FragmentedRangeTombstoneListCache>>>
  810. cached_range_tombstone_;
  811. #else
  812. CoreLocalArray<std::shared_ptr<FragmentedRangeTombstoneListCache>>
  813. cached_range_tombstone_;
  814. #endif
  815. void UpdateEntryChecksum(const ProtectionInfoKVOS64* kv_prot_info,
  816. const Slice& key, const Slice& value, ValueType type,
  817. SequenceNumber s, char* checksum_ptr);
  818. void MaybeUpdateNewestUDT(const Slice& user_key);
  819. const std::function<Status(const char*, bool)> key_validation_callback_;
  820. };
  821. const char* EncodeKey(std::string* scratch, const Slice& target);
  822. } // namespace ROCKSDB_NAMESPACE