memtable.h

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#pragma once
#include <atomic>
#include <deque>
#include <functional>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>
#include "db/dbformat.h"
#include "db/range_tombstone_fragmenter.h"
#include "db/read_callback.h"
#include "db/version_edit.h"
#include "memory/allocator.h"
#include "memory/concurrent_arena.h"
#include "monitoring/instrumented_mutex.h"
#include "options/cf_options.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/memtablerep.h"
#include "table/multiget_context.h"
#include "util/dynamic_bloom.h"
#include "util/hash.h"

namespace ROCKSDB_NAMESPACE {

struct FlushJobInfo;
class Mutex;
class MemTableIterator;
class MergeContext;

struct ImmutableMemTableOptions {
  explicit ImmutableMemTableOptions(const ImmutableCFOptions& ioptions,
                                    const MutableCFOptions& mutable_cf_options);
  size_t arena_block_size;
  uint32_t memtable_prefix_bloom_bits;
  size_t memtable_huge_page_size;
  bool memtable_whole_key_filtering;
  bool inplace_update_support;
  size_t inplace_update_num_locks;
  UpdateStatus (*inplace_callback)(char* existing_value,
                                   uint32_t* existing_value_size,
                                   Slice delta_value,
                                   std::string* merged_value);
  size_t max_successive_merges;
  Statistics* statistics;
  MergeOperator* merge_operator;
  Logger* info_log;
};
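
// Illustrative sketch (not part of the original header) of an
// inplace_callback that appends delta_value to the existing value.
// UPDATED_INPLACE / UPDATED follow their documented meaning for
// inplace_callback; AppendDeltaCallback and kBufCapacity are hypothetical
// names introduced here for illustration only:
//
//   UpdateStatus AppendDeltaCallback(char* existing_value,
//                                    uint32_t* existing_value_size,
//                                    Slice delta_value,
//                                    std::string* merged_value) {
//     if (*existing_value_size + delta_value.size() <= kBufCapacity) {
//       // Enough room: append in place and report the new size.
//       memcpy(existing_value + *existing_value_size, delta_value.data(),
//              delta_value.size());
//       *existing_value_size += static_cast<uint32_t>(delta_value.size());
//       return UPDATED_INPLACE;
//     }
//     // Otherwise hand back the merged result for a regular Add().
//     merged_value->assign(existing_value, *existing_value_size);
//     merged_value->append(delta_value.data(), delta_value.size());
//     return UPDATED;
//   }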
// Batched counters to be updated when inserting keys in one write batch.
// In post process of the write batch, these can be updated together.
// Only used in concurrent memtable insert case.
struct MemTablePostProcessInfo {
  uint64_t data_size = 0;
  uint64_t num_entries = 0;
  uint64_t num_deletes = 0;
};

using MultiGetRange = MultiGetContext::Range;

// Note: Many of the methods in this class have comments indicating that
// external synchronization is required as these methods are not thread-safe.
// It is up to higher layers of code to decide how to prevent concurrent
// invocation of these methods. This is usually done by acquiring either
// the db mutex or the single writer thread.
//
// Some of these methods are documented to only require external
// synchronization if this memtable is immutable. Calling MarkImmutable() is
// not sufficient to guarantee immutability. It is up to higher layers of
// code to determine if this MemTable can still be modified by other threads.
// Eg: The Superversion stores a pointer to the current MemTable (that can
// be modified) and a separate list of the MemTables that can no longer be
// written to (aka the 'immutable memtables').
class MemTable {
 public:
  struct KeyComparator : public MemTableRep::KeyComparator {
    const InternalKeyComparator comparator;
    explicit KeyComparator(const InternalKeyComparator& c) : comparator(c) {}
    virtual int operator()(const char* prefix_len_key1,
                           const char* prefix_len_key2) const override;
    virtual int operator()(const char* prefix_len_key,
                           const DecodedType& key) const override;
  };

  // MemTables are reference counted. The initial reference count
  // is zero and the caller must call Ref() at least once.
  //
  // earliest_seq should be the current SequenceNumber in the db such that any
  // key inserted into this memtable will have an equal or larger seq number.
  // (When a db is first created, the earliest sequence number will be 0.)
  // If the earliest sequence number is not known, kMaxSequenceNumber may be
  // used, but this may prevent some transactions from succeeding until the
  // first key is inserted into the memtable.
  explicit MemTable(const InternalKeyComparator& comparator,
                    const ImmutableCFOptions& ioptions,
                    const MutableCFOptions& mutable_cf_options,
                    WriteBufferManager* write_buffer_manager,
                    SequenceNumber earliest_seq, uint32_t column_family_id);

  // No copying allowed
  MemTable(const MemTable&) = delete;
  MemTable& operator=(const MemTable&) = delete;

  // Do not delete this MemTable unless Unref() indicates it is not in use.
  ~MemTable();

  // Increase reference count.
  // REQUIRES: external synchronization to prevent simultaneous
  // operations on the same MemTable.
  void Ref() { ++refs_; }

  // Drop reference count.
  // If the refcount goes to zero return this memtable, otherwise return null.
  // REQUIRES: external synchronization to prevent simultaneous
  // operations on the same MemTable.
  MemTable* Unref() {
    --refs_;
    assert(refs_ >= 0);
    if (refs_ <= 0) {
      return this;
    }
    return nullptr;
  }
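
  // Illustrative lifecycle sketch (not part of the original header),
  // assuming external synchronization and placeholder constructor args:
  //
  //   MemTable* mem = new MemTable(cmp, ioptions, mutable_cf_options,
  //                                write_buffer_manager, earliest_seq, cf_id);
  //   mem->Ref();                        // initial refcount is zero
  //   ...
  //   MemTable* to_delete = mem->Unref();
  //   if (to_delete != nullptr) {
  //     delete to_delete;                // last reference was dropped
  //   }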
  // Returns an estimate of the number of bytes of data in use by this
  // data structure.
  //
  // REQUIRES: external synchronization to prevent simultaneous
  // operations on the same MemTable (unless this Memtable is immutable).
  size_t ApproximateMemoryUsage();

  // As a cheap version of `ApproximateMemoryUsage()`, this function doesn't
  // require external synchronization. The value may be less accurate, though.
  size_t ApproximateMemoryUsageFast() const {
    return approximate_memory_usage_.load(std::memory_order_relaxed);
  }

  // This method heuristically determines if the memtable should continue to
  // host more data.
  bool ShouldScheduleFlush() const {
    return flush_state_.load(std::memory_order_relaxed) == FLUSH_REQUESTED;
  }

  // Returns true if a flush should be scheduled and the caller should
  // be the one to schedule it.
  bool MarkFlushScheduled() {
    auto before = FLUSH_REQUESTED;
    return flush_state_.compare_exchange_strong(before, FLUSH_SCHEDULED,
                                                std::memory_order_relaxed,
                                                std::memory_order_relaxed);
  }
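
  // Typical caller pattern (a sketch, not from the original source). The
  // compare-and-swap in MarkFlushScheduled() guarantees that exactly one of
  // several racing callers takes responsibility for scheduling the flush:
  //
  //   if (mem->ShouldScheduleFlush() && mem->MarkFlushScheduled()) {
  //     // This thread won the CAS; schedule the flush job here.
  //   }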
  // Return an iterator that yields the contents of the memtable.
  //
  // The caller must ensure that the underlying MemTable remains live
  // while the returned iterator is live. The keys returned by this
  // iterator are internal keys encoded by AppendInternalKey in the
  // db/dbformat.{h,cc} module.
  //
  // By default, it returns an iterator for prefix seek if prefix_extractor
  // is configured in Options.
  // arena: If not null, the arena needs to be used to allocate the Iterator.
  //        Calling ~Iterator of the iterator will destroy all the states
  //        except those allocated in the arena.
  InternalIterator* NewIterator(const ReadOptions& read_options, Arena* arena);

  FragmentedRangeTombstoneIterator* NewRangeTombstoneIterator(
      const ReadOptions& read_options, SequenceNumber read_seq);

  // Add an entry into memtable that maps key to value at the
  // specified sequence number and with the specified type.
  // Typically value will be empty if type == kTypeDeletion.
  //
  // REQUIRES: if allow_concurrent = false, external synchronization to prevent
  // simultaneous operations on the same MemTable.
  //
  // Returns false if MemTableRepFactory::CanHandleDuplicatedKey() is true and
  // the <key, seq> already exists.
  bool Add(SequenceNumber seq, ValueType type, const Slice& key,
           const Slice& value, bool allow_concurrent = false,
           MemTablePostProcessInfo* post_process_info = nullptr,
           void** hint = nullptr);
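
  // Example (a sketch, not from the original source), assuming
  // single-threaded use and valid sequence numbers:
  //
  //   bool added = mem->Add(seq, kTypeValue, "key1", "value1");
  //   added = mem->Add(seq + 1, kTypeDeletion, "key1", "");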
  // Used to get the value associated with key, or to get the Merge Operands
  // associated with key.
  // If do_merge = true, the default behavior, which is to get the value for
  // key, is executed. Expected behavior is described right below.
  // If memtable contains a value for key, store it in *value and return true.
  // If memtable contains a deletion for key, store a NotFound() error
  // in *s and return true.
  // If memtable contains a Merge operation as the most recent entry for a
  // key, and the merge process does not stop (not reaching a value or
  // delete), prepend the current merge operand to *operands,
  // store MergeInProgress in *s, and return false.
  // Else, return false.
  // If any operation was found, its most recent sequence number
  // will be stored in *seq on success (regardless of whether true/false is
  // returned). Otherwise, *seq will be set to kMaxSequenceNumber.
  // On success, *s may be set to OK, NotFound, or MergeInProgress. Any other
  // status returned indicates a corruption or other unexpected error.
  // If do_merge = false, then any Merge Operands encountered for key are
  // simply stored in merge_context.operands_list and never actually merged
  // to get a final value. The raw Merge Operands are eventually returned to
  // the user.
  bool Get(const LookupKey& key, std::string* value, Status* s,
           MergeContext* merge_context,
           SequenceNumber* max_covering_tombstone_seq, SequenceNumber* seq,
           const ReadOptions& read_opts, ReadCallback* callback = nullptr,
           bool* is_blob_index = nullptr, bool do_merge = true);

  bool Get(const LookupKey& key, std::string* value, Status* s,
           MergeContext* merge_context,
           SequenceNumber* max_covering_tombstone_seq,
           const ReadOptions& read_opts, ReadCallback* callback = nullptr,
           bool* is_blob_index = nullptr, bool do_merge = true) {
    SequenceNumber seq;
    return Get(key, value, s, merge_context, max_covering_tombstone_seq, &seq,
               read_opts, callback, is_blob_index, do_merge);
  }
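
  // Example point lookup (a sketch, not from the original source;
  // user_key, snapshot_seq, and read_opts are placeholders):
  //
  //   LookupKey lkey(user_key, snapshot_seq);
  //   std::string value;
  //   Status s;
  //   MergeContext merge_context;
  //   SequenceNumber max_covering_tombstone_seq = 0;
  //   if (mem->Get(lkey, &value, &s, &merge_context,
  //                &max_covering_tombstone_seq, read_opts)) {
  //     // The result (value, NotFound, or MergeInProgress) is
  //     // authoritative for this key.
  //   } else {
  //     // Keep searching older memtables and SST files.
  //   }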
  void MultiGet(const ReadOptions& read_options, MultiGetRange* range,
                ReadCallback* callback, bool* is_blob);

  // Attempts to update the new_value inplace, else does normal Add.
  // Pseudocode
  //   if key exists in current memtable && prev_value is of type kTypeValue
  //     if sizeof(new_value) <= sizeof(prev_value)
  //       update inplace
  //     else add(key, new_value)
  //   else add(key, new_value)
  //
  // REQUIRES: external synchronization to prevent simultaneous
  // operations on the same MemTable.
  void Update(SequenceNumber seq, const Slice& key, const Slice& value);

  // If prev_value for key exists, attempts to update it inplace,
  // else returns false.
  // Pseudocode
  //   if key exists in current memtable && prev_value is of type kTypeValue
  //     new_value = delta(prev_value)
  //     if sizeof(new_value) <= sizeof(prev_value)
  //       update inplace
  //     else add(key, new_value)
  //   else return false
  //
  // REQUIRES: external synchronization to prevent simultaneous
  // operations on the same MemTable.
  bool UpdateCallback(SequenceNumber seq, const Slice& key, const Slice& delta);
  // Returns the number of successive merge entries starting from the newest
  // entry for the key up to the last non-merge entry or last entry for the
  // key in the memtable.
  size_t CountSuccessiveMergeEntries(const LookupKey& key);

  // Update counters and flush status after inserting a whole write batch.
  // Used in concurrent memtable inserts.
  void BatchPostProcess(const MemTablePostProcessInfo& update_counters) {
    num_entries_.fetch_add(update_counters.num_entries,
                           std::memory_order_relaxed);
    data_size_.fetch_add(update_counters.data_size, std::memory_order_relaxed);
    if (update_counters.num_deletes != 0) {
      num_deletes_.fetch_add(update_counters.num_deletes,
                             std::memory_order_relaxed);
    }
    UpdateFlushState();
  }
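
  // Concurrent insert sketch (not from the original source): each writer
  // accumulates counter deltas locally, then applies them once per batch:
  //
  //   MemTablePostProcessInfo info;
  //   mem->Add(seq, kTypeValue, key, value, /*allow_concurrent=*/true, &info);
  //   ... more concurrent Add() calls for the same write batch ...
  //   mem->BatchPostProcess(info);  // fold the local deltas into the
  //                                 // memtable's atomic counters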
  // Get total number of entries in the mem table.
  // REQUIRES: external synchronization to prevent simultaneous
  // operations on the same MemTable (unless this Memtable is immutable).
  uint64_t num_entries() const {
    return num_entries_.load(std::memory_order_relaxed);
  }

  // Get total number of deletes in the mem table.
  // REQUIRES: external synchronization to prevent simultaneous
  // operations on the same MemTable (unless this Memtable is immutable).
  uint64_t num_deletes() const {
    return num_deletes_.load(std::memory_order_relaxed);
  }

  uint64_t get_data_size() const {
    return data_size_.load(std::memory_order_relaxed);
  }

  // Dynamically change the memtable's capacity. If set below the current
  // usage, the next key added will trigger a flush. Can only increase size
  // when memtable prefix bloom is disabled, since we can't easily allocate
  // more space.
  void UpdateWriteBufferSize(size_t new_write_buffer_size) {
    if (bloom_filter_ == nullptr ||
        new_write_buffer_size < write_buffer_size_) {
      write_buffer_size_.store(new_write_buffer_size,
                               std::memory_order_relaxed);
    }
  }
  // Returns the edits area that is needed for flushing the memtable.
  VersionEdit* GetEdits() { return &edit_; }

  // Returns true if no entry has been inserted into the mem table.
  // REQUIRES: external synchronization to prevent simultaneous
  // operations on the same MemTable (unless this Memtable is immutable).
  bool IsEmpty() const { return first_seqno_ == 0; }

  // Returns the sequence number of the first element that was inserted
  // into the memtable.
  // REQUIRES: external synchronization to prevent simultaneous
  // operations on the same MemTable (unless this Memtable is immutable).
  SequenceNumber GetFirstSequenceNumber() {
    return first_seqno_.load(std::memory_order_relaxed);
  }

  // Returns the sequence number that is guaranteed to be smaller than or
  // equal to the sequence number of any key that could be inserted into this
  // memtable. It can then be assumed that any write with a larger (or equal)
  // sequence number will be present in this memtable or a later memtable.
  //
  // If the earliest sequence number could not be determined,
  // kMaxSequenceNumber will be returned.
  SequenceNumber GetEarliestSequenceNumber() {
    return earliest_seqno_.load(std::memory_order_relaxed);
  }

  // DB's latest sequence ID when the memtable is created. This number
  // may be updated to a more recent one before any key is inserted.
  SequenceNumber GetCreationSeq() const { return creation_seq_; }
  void SetCreationSeq(SequenceNumber sn) { creation_seq_ = sn; }

  // Returns the next active logfile number when this memtable is about to
  // be flushed to storage.
  // REQUIRES: external synchronization to prevent simultaneous
  // operations on the same MemTable.
  uint64_t GetNextLogNumber() { return mem_next_logfile_number_; }

  // Sets the next active logfile number when this memtable is about to
  // be flushed to storage.
  // REQUIRES: external synchronization to prevent simultaneous
  // operations on the same MemTable.
  void SetNextLogNumber(uint64_t num) { mem_next_logfile_number_ = num; }

  // If this memtable contains data from a committed two phase transaction,
  // we must take note of the log which contains that data so we can know
  // when to release that log.
  void RefLogContainingPrepSection(uint64_t log);
  uint64_t GetMinLogContainingPrepSection();

  // Notify the underlying storage that no more items will be added.
  // REQUIRES: external synchronization to prevent simultaneous
  // operations on the same MemTable.
  // After MarkImmutable() is called, you should not attempt to
  // write anything to this MemTable (i.e., do not call Add() or Update()).
  void MarkImmutable() {
    table_->MarkReadOnly();
    mem_tracker_.DoneAllocating();
  }
  // Notify the underlying storage that all data it contained has been
  // persisted.
  // REQUIRES: external synchronization to prevent simultaneous
  // operations on the same MemTable.
  void MarkFlushed() {
    table_->MarkFlushed();
  }

  // Return true if the current MemTableRep supports merge operator.
  bool IsMergeOperatorSupported() const {
    return table_->IsMergeOperatorSupported();
  }

  // Return true if the current MemTableRep supports snapshots.
  // Inplace update prevents snapshots.
  bool IsSnapshotSupported() const {
    return table_->IsSnapshotSupported() && !moptions_.inplace_update_support;
  }
  struct MemTableStats {
    uint64_t size;
    uint64_t count;
  };

  MemTableStats ApproximateStats(const Slice& start_ikey,
                                 const Slice& end_ikey);

  // Get the lock associated with the key.
  port::RWMutex* GetLock(const Slice& key);

  const InternalKeyComparator& GetInternalKeyComparator() const {
    return comparator_.comparator;
  }

  const ImmutableMemTableOptions* GetImmutableMemTableOptions() const {
    return &moptions_;
  }

  uint64_t ApproximateOldestKeyTime() const {
    return oldest_key_time_.load(std::memory_order_relaxed);
  }

  // REQUIRES: db_mutex held.
  void SetID(uint64_t id) { id_ = id; }
  uint64_t GetID() const { return id_; }

  void SetFlushCompleted(bool completed) { flush_completed_ = completed; }

  uint64_t GetFileNumber() const { return file_number_; }
  void SetFileNumber(uint64_t file_num) { file_number_ = file_num; }

  void SetFlushInProgress(bool in_progress) {
    flush_in_progress_ = in_progress;
  }

#ifndef ROCKSDB_LITE
  void SetFlushJobInfo(std::unique_ptr<FlushJobInfo>&& info) {
    flush_job_info_ = std::move(info);
  }

  std::unique_ptr<FlushJobInfo> ReleaseFlushJobInfo() {
    return std::move(flush_job_info_);
  }
#endif  // !ROCKSDB_LITE
 private:
  enum FlushStateEnum { FLUSH_NOT_REQUESTED, FLUSH_REQUESTED, FLUSH_SCHEDULED };

  friend class MemTableIterator;
  friend class MemTableBackwardIterator;
  friend class MemTableList;

  KeyComparator comparator_;
  const ImmutableMemTableOptions moptions_;
  int refs_;
  const size_t kArenaBlockSize;
  AllocTracker mem_tracker_;
  ConcurrentArena arena_;
  std::unique_ptr<MemTableRep> table_;
  std::unique_ptr<MemTableRep> range_del_table_;
  std::atomic_bool is_range_del_table_empty_;

  // Total data size of all data inserted
  std::atomic<uint64_t> data_size_;
  std::atomic<uint64_t> num_entries_;
  std::atomic<uint64_t> num_deletes_;

  // Dynamically changeable memtable option
  std::atomic<size_t> write_buffer_size_;

  // These are used to manage memtable flushes to storage
  bool flush_in_progress_;  // started the flush
  bool flush_completed_;    // finished the flush
  uint64_t file_number_;    // filled up after flush is complete

  // The updates to be applied to the transaction log when this
  // memtable is flushed to storage.
  VersionEdit edit_;

  // The sequence number of the kv that was inserted first
  std::atomic<SequenceNumber> first_seqno_;

  // The db sequence number at the time of creation or kMaxSequenceNumber
  // if not set.
  std::atomic<SequenceNumber> earliest_seqno_;

  SequenceNumber creation_seq_;

  // The log files earlier than this number can be deleted.
  uint64_t mem_next_logfile_number_;

  // The earliest log containing a prepared section
  // which has been inserted into this memtable.
  std::atomic<uint64_t> min_prep_log_referenced_;

  // rw locks for inplace updates
  std::vector<port::RWMutex> locks_;

  const SliceTransform* const prefix_extractor_;
  std::unique_ptr<DynamicBloom> bloom_filter_;

  std::atomic<FlushStateEnum> flush_state_;

  Env* env_;

  // Extract sequential insert prefixes.
  const SliceTransform* insert_with_hint_prefix_extractor_;

  // Insert hints for each prefix.
  std::unordered_map<Slice, void*, SliceHasher> insert_hints_;

  // Timestamp of oldest key
  std::atomic<uint64_t> oldest_key_time_;

  // Memtable id to track flush.
  uint64_t id_ = 0;

  // Sequence number of the atomic flush that is responsible for this
  // memtable. The sequence number of atomic flush is a seq, such that no
  // writes with sequence numbers greater than or equal to seq are flushed,
  // while all writes with sequence numbers smaller than seq are flushed.
  SequenceNumber atomic_flush_seqno_;

  // Keeps track of memory usage in table_, arena_, and range_del_table_.
  // Gets refreshed inside `ApproximateMemoryUsage()` or `ShouldFlushNow()`.
  std::atomic<uint64_t> approximate_memory_usage_;

#ifndef ROCKSDB_LITE
  // Flush job info of the current memtable.
  std::unique_ptr<FlushJobInfo> flush_job_info_;
#endif  // !ROCKSDB_LITE
  // Returns a heuristic flush decision.
  bool ShouldFlushNow();

  // Updates flush_state_ using ShouldFlushNow().
  void UpdateFlushState();

  void UpdateOldestKeyTime();

  void GetFromTable(const LookupKey& key,
                    SequenceNumber max_covering_tombstone_seq, bool do_merge,
                    ReadCallback* callback, bool* is_blob_index,
                    std::string* value, Status* s, MergeContext* merge_context,
                    SequenceNumber* seq, bool* found_final_value,
                    bool* merge_in_progress);
};

extern const char* EncodeKey(std::string* scratch, const Slice& target);

}  // namespace ROCKSDB_NAMESPACE