//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
// See the AUTHORS file for names of contributors.

#pragma once
#include <atomic>
#include <deque>
#include <functional>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

#include "db/dbformat.h"
#include "db/range_tombstone_fragmenter.h"
#include "db/read_callback.h"
#include "db/version_edit.h"
#include "memory/allocator.h"
#include "memory/concurrent_arena.h"
#include "monitoring/instrumented_mutex.h"
#include "options/cf_options.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/memtablerep.h"
#include "table/multiget_context.h"
#include "util/dynamic_bloom.h"
#include "util/hash.h"

namespace ROCKSDB_NAMESPACE {

struct FlushJobInfo;
class Mutex;
class MemTableIterator;
class MergeContext;

struct ImmutableMemTableOptions {
  explicit ImmutableMemTableOptions(const ImmutableCFOptions& ioptions,
                                    const MutableCFOptions& mutable_cf_options);
  size_t arena_block_size;
  uint32_t memtable_prefix_bloom_bits;
  size_t memtable_huge_page_size;
  bool memtable_whole_key_filtering;
  bool inplace_update_support;
  size_t inplace_update_num_locks;
  UpdateStatus (*inplace_callback)(char* existing_value,
                                   uint32_t* existing_value_size,
                                   Slice delta_value,
                                   std::string* merged_value);
  size_t max_successive_merges;
  Statistics* statistics;
  MergeOperator* merge_operator;
  Logger* info_log;
};

// Batched counters to update when inserting keys in one write batch.
// In post process of the write batch, these can be updated together.
// Only used in concurrent memtable insert case.
struct MemTablePostProcessInfo {
  uint64_t data_size = 0;
  uint64_t num_entries = 0;
  uint64_t num_deletes = 0;
};

using MultiGetRange = MultiGetContext::Range;

// Note:  Many of the methods in this class have comments indicating that
// external synchronization is required as these methods are not thread-safe.
// It is up to higher layers of code to decide how to prevent concurrent
// invocation of these methods.  This is usually done by acquiring either
// the db mutex or the single writer thread.
//
// Some of these methods are documented to only require external
// synchronization if this memtable is immutable.  Calling MarkImmutable() is
// not sufficient to guarantee immutability.  It is up to higher layers of
// code to determine if this MemTable can still be modified by other threads.
// Eg: The Superversion stores a pointer to the current MemTable (that can
// be modified) and a separate list of the MemTables that can no longer be
// written to (aka the 'immutable memtables').
class MemTable {
 public:
  struct KeyComparator : public MemTableRep::KeyComparator {
    const InternalKeyComparator comparator;
    explicit KeyComparator(const InternalKeyComparator& c) : comparator(c) {}
    virtual int operator()(const char* prefix_len_key1,
                           const char* prefix_len_key2) const override;
    virtual int operator()(const char* prefix_len_key,
                           const DecodedType& key) const override;
  };
  // MemTables are reference counted.  The initial reference count
  // is zero and the caller must call Ref() at least once.
  //
  // earliest_seq should be the current SequenceNumber in the db such that any
  // key inserted into this memtable will have an equal or larger seq number.
  // (When a db is first created, the earliest sequence number will be 0).
  // If the earliest sequence number is not known, kMaxSequenceNumber may be
  // used, but this may prevent some transactions from succeeding until the
  // first key is inserted into the memtable.
  explicit MemTable(const InternalKeyComparator& comparator,
                    const ImmutableCFOptions& ioptions,
                    const MutableCFOptions& mutable_cf_options,
                    WriteBufferManager* write_buffer_manager,
                    SequenceNumber earliest_seq, uint32_t column_family_id);
  // No copying allowed
  MemTable(const MemTable&) = delete;
  MemTable& operator=(const MemTable&) = delete;

  // Do not delete this MemTable unless Unref() indicates it is not in use.
  ~MemTable();

  // Increase reference count.
  // REQUIRES: external synchronization to prevent simultaneous
  // operations on the same MemTable.
  void Ref() { ++refs_; }

  // Drop reference count.
  // If the refcount goes to zero return this memtable, otherwise return null.
  // REQUIRES: external synchronization to prevent simultaneous
  // operations on the same MemTable.
  MemTable* Unref() {
    --refs_;
    assert(refs_ >= 0);
    if (refs_ <= 0) {
      return this;
    }
    return nullptr;
  }

  // Returns an estimate of the number of bytes of data in use by this
  // data structure.
  //
  // REQUIRES: external synchronization to prevent simultaneous
  // operations on the same MemTable (unless this Memtable is immutable).
  size_t ApproximateMemoryUsage();

  // As a cheap version of `ApproximateMemoryUsage()`, this function doesn't
  // require external synchronization. The value may be less accurate though.
  size_t ApproximateMemoryUsageFast() const {
    return approximate_memory_usage_.load(std::memory_order_relaxed);
  }

  // This method heuristically determines if the memtable should continue to
  // host more data.
  bool ShouldScheduleFlush() const {
    return flush_state_.load(std::memory_order_relaxed) == FLUSH_REQUESTED;
  }

  // Returns true if a flush should be scheduled and the caller should
  // be the one to schedule it.
  bool MarkFlushScheduled() {
    auto before = FLUSH_REQUESTED;
    return flush_state_.compare_exchange_strong(before, FLUSH_SCHEDULED,
                                                std::memory_order_relaxed,
                                                std::memory_order_relaxed);
  }

  // Return an iterator that yields the contents of the memtable.
  //
  // The caller must ensure that the underlying MemTable remains live
  // while the returned iterator is live.  The keys returned by this
  // iterator are internal keys encoded by AppendInternalKey in the
  // db/dbformat.{h,cc} module.
  //
  // By default, it returns an iterator for prefix seek if prefix_extractor
  // is configured in Options.
  // arena: If not null, the arena needs to be used to allocate the Iterator.
  //        Calling ~Iterator of the iterator will destroy all the states but
  //        those allocated in arena.
  InternalIterator* NewIterator(const ReadOptions& read_options, Arena* arena);

  FragmentedRangeTombstoneIterator* NewRangeTombstoneIterator(
      const ReadOptions& read_options, SequenceNumber read_seq);
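  // A minimal iteration sketch (hypothetical caller code, not part of this
  // header; `mem` is assumed to be a live, Ref()'d MemTable*):
  //
  //   Arena arena;
  //   InternalIterator* iter = mem->NewIterator(ReadOptions(), &arena);
  //   for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
  //     // iter->key() is an internal key built by AppendInternalKey.
  //   }
  //   iter->~InternalIterator();  // arena-allocated: destroy in place, do
  //                               // not delete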
  // Add an entry into memtable that maps key to value at the
  // specified sequence number and with the specified type.
  // Typically value will be empty if type==kTypeDeletion.
  //
  // REQUIRES: if allow_concurrent = false, external synchronization to prevent
  // simultaneous operations on the same MemTable.
  //
  // Returns false if MemTableRepFactory::CanHandleDuplicatedKey() is true and
  // the <key, seq> already exists.
  bool Add(SequenceNumber seq, ValueType type, const Slice& key,
           const Slice& value, bool allow_concurrent = false,
           MemTablePostProcessInfo* post_process_info = nullptr,
           void** hint = nullptr);

  // Used to Get value associated with key or Get Merge Operands associated
  // with key.
  // If do_merge = true the default behavior, which is Get value for key, is
  // executed. Expected behavior is described right below.
  // If memtable contains a value for key, store it in *value and return true.
  // If memtable contains a deletion for key, store a NotFound() error
  // in *status and return true.
  // If memtable contains Merge operation as the most recent entry for a key,
  //   and the merge process does not stop (not reaching a value or delete),
  //   prepend the current merge operand to *operands,
  //   store MergeInProgress in s, and return false.
  // Else, return false.
  // If any operation was found, its most recent sequence number
  // will be stored in *seq on success (regardless of whether true/false is
  // returned).  Otherwise, *seq will be set to kMaxSequenceNumber.
  // On success, *s may be set to OK, NotFound, or MergeInProgress.  Any other
  // status returned indicates a corruption or other unexpected error.
  // If do_merge = false then any Merge Operands encountered for key are simply
  // stored in merge_context.operands_list and never actually merged to get a
  // final value. The raw Merge Operands are eventually returned to the user.
  bool Get(const LookupKey& key, std::string* value, Status* s,
           MergeContext* merge_context,
           SequenceNumber* max_covering_tombstone_seq, SequenceNumber* seq,
           const ReadOptions& read_opts, ReadCallback* callback = nullptr,
           bool* is_blob_index = nullptr, bool do_merge = true);

  bool Get(const LookupKey& key, std::string* value, Status* s,
           MergeContext* merge_context,
           SequenceNumber* max_covering_tombstone_seq,
           const ReadOptions& read_opts, ReadCallback* callback = nullptr,
           bool* is_blob_index = nullptr, bool do_merge = true) {
    SequenceNumber seq;
    return Get(key, value, s, merge_context, max_covering_tombstone_seq, &seq,
               read_opts, callback, is_blob_index, do_merge);
  }

  void MultiGet(const ReadOptions& read_options, MultiGetRange* range,
                ReadCallback* callback, bool* is_blob);

  // Attempts to update the new_value inplace, else does normal Add
  // Pseudocode
  //   if key exists in current memtable && prev_value is of type kTypeValue
  //     if sizeof(new_value) <= sizeof(prev_value)
  //       update inplace
  //     else add(key, new_value)
  //   else add(key, new_value)
  //
  // REQUIRES: external synchronization to prevent simultaneous
  // operations on the same MemTable.
  void Update(SequenceNumber seq, const Slice& key, const Slice& value);
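  // A minimal point-lookup sketch (hypothetical caller code, not part of
  // this header; the user key and snapshot sequence number are assumptions):
  //
  //   LookupKey lkey("some_user_key", kMaxSequenceNumber);
  //   std::string value;
  //   Status s;
  //   MergeContext merge_context;
  //   SequenceNumber max_covering_tombstone_seq = 0;
  //   bool found = mem->Get(lkey, &value, &s, &merge_context,
  //                         &max_covering_tombstone_seq, ReadOptions());
  //   // found && s.ok(): value holds the result.
  //   // found && s.IsNotFound(): the newest entry was a deletion.
  //   // !found: fall through to later memtables / SST files.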
  // If prev_value for key exists, attempts to update it inplace,
  // else returns false.
  // Pseudocode
  //   if key exists in current memtable && prev_value is of type kTypeValue
  //     new_value = delta(prev_value)
  //     if sizeof(new_value) <= sizeof(prev_value)
  //       update inplace
  //     else add(key, new_value)
  //   else return false
  //
  // REQUIRES: external synchronization to prevent simultaneous
  // operations on the same MemTable.
  bool UpdateCallback(SequenceNumber seq, const Slice& key, const Slice& delta);

  // Returns the number of successive merge entries starting from the newest
  // entry for the key up to the last non-merge entry or last entry for the
  // key in the memtable.
  size_t CountSuccessiveMergeEntries(const LookupKey& key);

  // Update counters and flush status after inserting a whole write batch.
  // Used in concurrent memtable inserts.
  void BatchPostProcess(const MemTablePostProcessInfo& update_counters) {
    num_entries_.fetch_add(update_counters.num_entries,
                           std::memory_order_relaxed);
    data_size_.fetch_add(update_counters.data_size, std::memory_order_relaxed);
    if (update_counters.num_deletes != 0) {
      num_deletes_.fetch_add(update_counters.num_deletes,
                             std::memory_order_relaxed);
    }
    UpdateFlushState();
  }

  // Get total number of entries in the mem table.
  // REQUIRES: external synchronization to prevent simultaneous
  // operations on the same MemTable (unless this Memtable is immutable).
  uint64_t num_entries() const {
    return num_entries_.load(std::memory_order_relaxed);
  }

  // Get total number of deletes in the mem table.
  // REQUIRES: external synchronization to prevent simultaneous
  // operations on the same MemTable (unless this Memtable is immutable).
  uint64_t num_deletes() const {
    return num_deletes_.load(std::memory_order_relaxed);
  }

  uint64_t get_data_size() const {
    return data_size_.load(std::memory_order_relaxed);
  }

  // Dynamically change the memtable's capacity. If set below the current
  // usage, the next key added will trigger a flush. Can only increase size
  // when memtable prefix bloom is disabled, since we can't easily allocate
  // more space.
  void UpdateWriteBufferSize(size_t new_write_buffer_size) {
    if (bloom_filter_ == nullptr ||
        new_write_buffer_size < write_buffer_size_) {
      write_buffer_size_.store(new_write_buffer_size,
                               std::memory_order_relaxed);
    }
  }

  // Returns the edits area that is needed for flushing the memtable.
  VersionEdit* GetEdits() { return &edit_; }

  // Returns true if no entry has been inserted into the mem table.
  // REQUIRES: external synchronization to prevent simultaneous
  // operations on the same MemTable (unless this Memtable is immutable).
  bool IsEmpty() const { return first_seqno_ == 0; }

  // Returns the sequence number of the first element that was inserted
  // into the memtable.
  // REQUIRES: external synchronization to prevent simultaneous
  // operations on the same MemTable (unless this Memtable is immutable).
  SequenceNumber GetFirstSequenceNumber() {
    return first_seqno_.load(std::memory_order_relaxed);
  }

  // Returns the sequence number that is guaranteed to be smaller than or equal
  // to the sequence number of any key that could be inserted into this
  // memtable. It can then be assumed that any write with a larger (or equal)
  // sequence number will be present in this memtable or a later memtable.
  //
  // If the earliest sequence number could not be determined,
  // kMaxSequenceNumber will be returned.
  SequenceNumber GetEarliestSequenceNumber() {
    return earliest_seqno_.load(std::memory_order_relaxed);
  }
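  // A concurrent-insert sketch (hypothetical writer-thread code, not part of
  // this header; `my_sub_batch` and the sequence numbers are assumptions):
  //
  //   MemTablePostProcessInfo info;
  //   for (const auto& kv : my_sub_batch) {
  //     mem->Add(seq++, kTypeValue, kv.key, kv.value,
  //              /*allow_concurrent=*/true, &info);
  //   }
  //   mem->BatchPostProcess(info);  // fold the batched counters in once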
  // DB's latest sequence ID when the memtable is created. This number
  // may be updated to a more recent one before any key is inserted.
  SequenceNumber GetCreationSeq() const { return creation_seq_; }

  void SetCreationSeq(SequenceNumber sn) { creation_seq_ = sn; }

  // Returns the next active logfile number when this memtable is about to
  // be flushed to storage.
  // REQUIRES: external synchronization to prevent simultaneous
  // operations on the same MemTable.
  uint64_t GetNextLogNumber() { return mem_next_logfile_number_; }

  // Sets the next active logfile number when this memtable is about to
  // be flushed to storage.
  // REQUIRES: external synchronization to prevent simultaneous
  // operations on the same MemTable.
  void SetNextLogNumber(uint64_t num) { mem_next_logfile_number_ = num; }

  // If this memtable contains data from a committed
  // two phase transaction we must take note of the
  // log which contains that data so we can know
  // when to release that log.
  void RefLogContainingPrepSection(uint64_t log);
  uint64_t GetMinLogContainingPrepSection();

  // Notify the underlying storage that no more items will be added.
  // REQUIRES: external synchronization to prevent simultaneous
  // operations on the same MemTable.
  // After MarkImmutable() is called, you should not attempt to
  // write anything to this MemTable (i.e. do not call Add() or Update()).
  void MarkImmutable() {
    table_->MarkReadOnly();
    mem_tracker_.DoneAllocating();
  }

  // Notify the underlying storage that all data it contained has been
  // persisted.
  // REQUIRES: external synchronization to prevent simultaneous
  // operations on the same MemTable.
  void MarkFlushed() {
    table_->MarkFlushed();
  }

  // Return true if the current MemTableRep supports merge operator.
  bool IsMergeOperatorSupported() const {
    return table_->IsMergeOperatorSupported();
  }

  // Return true if the current MemTableRep supports snapshots.
  // Inplace update prevents snapshots.
  bool IsSnapshotSupported() const {
    return table_->IsSnapshotSupported() && !moptions_.inplace_update_support;
  }

  struct MemTableStats {
    uint64_t size;
    uint64_t count;
  };

  MemTableStats ApproximateStats(const Slice& start_ikey,
                                 const Slice& end_ikey);

  // Get the lock associated for the key.
  port::RWMutex* GetLock(const Slice& key);

  const InternalKeyComparator& GetInternalKeyComparator() const {
    return comparator_.comparator;
  }

  const ImmutableMemTableOptions* GetImmutableMemTableOptions() const {
    return &moptions_;
  }

  uint64_t ApproximateOldestKeyTime() const {
    return oldest_key_time_.load(std::memory_order_relaxed);
  }
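  // A flush hand-off sketch (hypothetical scheduler code, not part of this
  // header; assumes the caller provides the external synchronization the
  // methods above require, and `next_log_number` is a placeholder):
  //
  //   if (mem->ShouldScheduleFlush() && mem->MarkFlushScheduled()) {
  //     mem->MarkImmutable();                     // no further Add()/Update()
  //     mem->SetNextLogNumber(next_log_number);
  //     // ... hand the memtable over to a flush job ...
  //   }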
  // REQUIRES: db_mutex held.
  void SetID(uint64_t id) { id_ = id; }

  uint64_t GetID() const { return id_; }

  void SetFlushCompleted(bool completed) { flush_completed_ = completed; }

  uint64_t GetFileNumber() const { return file_number_; }

  void SetFileNumber(uint64_t file_num) { file_number_ = file_num; }

  void SetFlushInProgress(bool in_progress) {
    flush_in_progress_ = in_progress;
  }

#ifndef ROCKSDB_LITE
  void SetFlushJobInfo(std::unique_ptr<FlushJobInfo>&& info) {
    flush_job_info_ = std::move(info);
  }

  std::unique_ptr<FlushJobInfo> ReleaseFlushJobInfo() {
    return std::move(flush_job_info_);
  }
#endif  // !ROCKSDB_LITE

 private:
  enum FlushStateEnum { FLUSH_NOT_REQUESTED, FLUSH_REQUESTED, FLUSH_SCHEDULED };

  friend class MemTableIterator;
  friend class MemTableBackwardIterator;
  friend class MemTableList;

  KeyComparator comparator_;
  const ImmutableMemTableOptions moptions_;
  int refs_;
  const size_t kArenaBlockSize;
  AllocTracker mem_tracker_;
  ConcurrentArena arena_;
  std::unique_ptr<MemTableRep> table_;
  std::unique_ptr<MemTableRep> range_del_table_;
  std::atomic_bool is_range_del_table_empty_;

  // Total data size of all data inserted
  std::atomic<uint64_t> data_size_;
  std::atomic<uint64_t> num_entries_;
  std::atomic<uint64_t> num_deletes_;

  // Dynamically changeable memtable option
  std::atomic<size_t> write_buffer_size_;

  // These are used to manage memtable flushes to storage
  bool flush_in_progress_;  // started the flush
  bool flush_completed_;    // finished the flush
  uint64_t file_number_;    // filled up after flush is complete

  // The updates to be applied to the transaction log when this
  // memtable is flushed to storage.
  VersionEdit edit_;

  // The sequence number of the kv that was inserted first
  std::atomic<SequenceNumber> first_seqno_;

  // The db sequence number at the time of creation or kMaxSequenceNumber
  // if not set.
  std::atomic<SequenceNumber> earliest_seqno_;

  SequenceNumber creation_seq_;

  // The log files earlier than this number can be deleted.
  uint64_t mem_next_logfile_number_;

  // the earliest log containing a prepared section
  // which has been inserted into this memtable.
  std::atomic<uint64_t> min_prep_log_referenced_;

  // rw locks for inplace updates
  std::vector<port::RWMutex> locks_;

  const SliceTransform* const prefix_extractor_;
  std::unique_ptr<DynamicBloom> bloom_filter_;

  std::atomic<FlushStateEnum> flush_state_;

  Env* env_;

  // Extract sequential insert prefixes.
  const SliceTransform* insert_with_hint_prefix_extractor_;

  // Insert hints for each prefix.
  std::unordered_map<Slice, void*, SliceHasher> insert_hints_;

  // Timestamp of oldest key
  std::atomic<uint64_t> oldest_key_time_;

  // Memtable id to track flush.
  uint64_t id_ = 0;

  // Sequence number of the atomic flush that is responsible for this memtable.
  // The sequence number of atomic flush is a seq, such that no writes with
  // sequence numbers greater than or equal to seq are flushed, while all
  // writes with sequence numbers smaller than seq are flushed.
  SequenceNumber atomic_flush_seqno_;

  // Keeps track of memory usage in table_, arena_, and range_del_table_.
  // Gets refreshed inside `ApproximateMemoryUsage()` or `ShouldFlushNow()`.
  std::atomic<uint64_t> approximate_memory_usage_;

#ifndef ROCKSDB_LITE
  // Flush job info of the current memtable.
  std::unique_ptr<FlushJobInfo> flush_job_info_;
#endif  // !ROCKSDB_LITE

  // Returns a heuristic flush decision.
  bool ShouldFlushNow();

  // Updates flush_state_ using ShouldFlushNow().
  void UpdateFlushState();

  void UpdateOldestKeyTime();

  void GetFromTable(const LookupKey& key,
                    SequenceNumber max_covering_tombstone_seq, bool do_merge,
                    ReadCallback* callback, bool* is_blob_index,
                    std::string* value, Status* s, MergeContext* merge_context,
                    SequenceNumber* seq, bool* found_final_value,
                    bool* merge_in_progress);
};

extern const char* EncodeKey(std::string* scratch, const Slice& target);

}  // namespace ROCKSDB_NAMESPACE