| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545 |
- // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
- // This source code is licensed under both the GPLv2 (found in the
- // COPYING file in the root directory) and Apache 2.0 License
- // (found in the LICENSE.Apache file in the root directory).
- //
- #pragma once
- #include <deque>
- #include <limits>
- #include <list>
- #include <set>
- #include <string>
- #include <vector>
- #include "db/logs_with_prep_tracker.h"
- #include "db/memtable.h"
- #include "db/range_del_aggregator.h"
- #include "file/filename.h"
- #include "logging/log_buffer.h"
- #include "monitoring/instrumented_mutex.h"
- #include "rocksdb/db.h"
- #include "rocksdb/iterator.h"
- #include "rocksdb/options.h"
- #include "rocksdb/types.h"
- #include "util/autovector.h"
- namespace ROCKSDB_NAMESPACE {
- class ColumnFamilyData;
- class InternalKeyComparator;
- class InstrumentedMutex;
- class MergeIteratorBuilder;
- class MemTableList;
- struct FlushJobInfo;
- // keeps a list of immutable memtables (ReadOnlyMemtable*) in a vector.
- // The list is immutable if refcount is bigger than one. It is used as
- // a state for Get() and iterator code paths.
- //
- // This class is not thread-safe. External synchronization is required
- // (such as holding the db mutex or being on the write thread).
- class MemTableListVersion {
- public:
- explicit MemTableListVersion(size_t* parent_memtable_list_memory_usage,
- const MemTableListVersion& old);
- explicit MemTableListVersion(size_t* parent_memtable_list_memory_usage,
- int64_t max_write_buffer_size_to_maintain);
- void Ref();
- void Unref(autovector<ReadOnlyMemTable*>* to_delete = nullptr);
- // Search all the memtables starting from the most recent one.
- // Return the most recent value found, if any.
- //
- // If any operation was found for this key, its most recent sequence number
- // will be stored in *seq on success (regardless of whether true/false is
- // returned). Otherwise, *seq will be set to kMaxSequenceNumber.
- bool Get(const LookupKey& key, std::string* value,
- PinnableWideColumns* columns, std::string* timestamp, Status* s,
- MergeContext* merge_context,
- SequenceNumber* max_covering_tombstone_seq, SequenceNumber* seq,
- const ReadOptions& read_opts, ReadCallback* callback = nullptr,
- bool* is_blob_index = nullptr);
- bool Get(const LookupKey& key, std::string* value,
- PinnableWideColumns* columns, std::string* timestamp, Status* s,
- MergeContext* merge_context,
- SequenceNumber* max_covering_tombstone_seq,
- const ReadOptions& read_opts, ReadCallback* callback = nullptr,
- bool* is_blob_index = nullptr) {
- SequenceNumber seq;
- return Get(key, value, columns, timestamp, s, merge_context,
- max_covering_tombstone_seq, &seq, read_opts, callback,
- is_blob_index);
- }
- void MultiGet(const ReadOptions& read_options, MultiGetRange* range,
- ReadCallback* callback);
- // Returns all the merge operands corresponding to the key by searching all
- // memtables starting from the most recent one.
- bool GetMergeOperands(const LookupKey& key, Status* s,
- MergeContext* merge_context,
- SequenceNumber* max_covering_tombstone_seq,
- const ReadOptions& read_opts);
- // Similar to Get(), but searches the Memtable history of memtables that
- // have already been flushed. Should only be used from in-memory only
- // queries (such as Transaction validation) as the history may contain
- // writes that are also present in the SST files.
- bool GetFromHistory(const LookupKey& key, std::string* value,
- PinnableWideColumns* columns, std::string* timestamp,
- Status* s, MergeContext* merge_context,
- SequenceNumber* max_covering_tombstone_seq,
- SequenceNumber* seq, const ReadOptions& read_opts,
- bool* is_blob_index = nullptr);
- bool GetFromHistory(const LookupKey& key, std::string* value,
- PinnableWideColumns* columns, std::string* timestamp,
- Status* s, MergeContext* merge_context,
- SequenceNumber* max_covering_tombstone_seq,
- const ReadOptions& read_opts,
- bool* is_blob_index = nullptr) {
- SequenceNumber seq;
- return GetFromHistory(key, value, columns, timestamp, s, merge_context,
- max_covering_tombstone_seq, &seq, read_opts,
- is_blob_index);
- }
- Status AddRangeTombstoneIterators(const ReadOptions& read_opts, Arena* arena,
- RangeDelAggregator* range_del_agg);
- void AddIterators(const ReadOptions& options,
- UnownedPtr<const SeqnoToTimeMapping> seqno_to_time_mapping,
- const SliceTransform* prefix_extractor,
- std::vector<InternalIterator*>* iterator_list,
- Arena* arena);
- void AddIterators(const ReadOptions& options,
- UnownedPtr<const SeqnoToTimeMapping> seqno_to_time_mapping,
- const SliceTransform* prefix_extractor,
- MergeIteratorBuilder* merge_iter_builder,
- bool add_range_tombstone_iter);
- uint64_t GetTotalNumEntries() const;
- uint64_t GetTotalNumDeletes() const;
- ReadOnlyMemTable::MemTableStats ApproximateStats(const Slice& start_ikey,
- const Slice& end_ikey) const;
- // Returns the value of MemTable::GetEarliestSequenceNumber() on the most
- // recent MemTable in this list or kMaxSequenceNumber if the list is empty.
- // If include_history=true, will also search Memtables in MemTableList
- // History.
- SequenceNumber GetEarliestSequenceNumber(bool include_history = false) const;
- // Return the first sequence number from the memtable list, which is the
- // smallest sequence number of all FirstSequenceNumber.
- // Return kMaxSequenceNumber if the list is empty.
- SequenceNumber GetFirstSequenceNumber() const;
- // REQUIRES: db_mutex held.
- void SetID(uint64_t id) { id_ = id; }
- uint64_t GetID() const { return id_; }
- int NumNotFlushed() const { return static_cast<int>(memlist_.size()); }
- int NumFlushed() const { return static_cast<int>(memlist_history_.size()); }
- // Gets the newest user defined timestamps from the immutable memtables.
- // This returns the newest user defined timestamp found in the most recent
- // immutable memtable. This should only be called when user defined timestamp
- // is enabled.
- const Slice& GetNewestUDT() const;
- private:
- friend class MemTableList;
- friend Status InstallMemtableAtomicFlushResults(
- const autovector<MemTableList*>* imm_lists,
- const autovector<ColumnFamilyData*>& cfds,
- const autovector<const autovector<ReadOnlyMemTable*>*>& mems_list,
- VersionSet* vset, LogsWithPrepTracker* prep_tracker,
- InstrumentedMutex* mu, const autovector<FileMetaData*>& file_meta,
- const autovector<std::list<std::unique_ptr<FlushJobInfo>>*>&
- committed_flush_jobs_info,
- autovector<ReadOnlyMemTable*>* to_delete, FSDirectory* db_directory,
- LogBuffer* log_buffer);
- // REQUIRE: m is an immutable memtable
- void Add(ReadOnlyMemTable* m, autovector<ReadOnlyMemTable*>* to_delete);
- // REQUIRE: m is an immutable memtable
- void Remove(ReadOnlyMemTable* m, autovector<ReadOnlyMemTable*>* to_delete);
- // Return true if the memtable list should be trimmed to get memory usage
- // under budget.
- bool HistoryShouldBeTrimmed(size_t usage);
- // Trim history, Return true if memtable is trimmed
- bool TrimHistory(autovector<ReadOnlyMemTable*>* to_delete, size_t usage);
- bool GetFromList(std::list<ReadOnlyMemTable*>* list, const LookupKey& key,
- std::string* value, PinnableWideColumns* columns,
- std::string* timestamp, Status* s,
- MergeContext* merge_context,
- SequenceNumber* max_covering_tombstone_seq,
- SequenceNumber* seq, const ReadOptions& read_opts,
- ReadCallback* callback = nullptr,
- bool* is_blob_index = nullptr);
- void AddMemTable(ReadOnlyMemTable* m);
- void UnrefMemTable(autovector<ReadOnlyMemTable*>* to_delete,
- ReadOnlyMemTable* m);
- // Calculate the total amount of memory used by memlist_ and memlist_history_
- // excluding the last MemTable in memlist_history_. The reason for excluding
- // the last MemTable is to see if dropping the last MemTable will keep total
- // memory usage above or equal to max_write_buffer_size_to_maintain_
- size_t MemoryAllocatedBytesExcludingLast() const;
- // Whether this version contains flushed memtables that are only kept around
- // for transaction conflict checking.
- bool HasHistory() const { return !memlist_history_.empty(); }
- bool MemtableLimitExceeded(size_t usage);
- // Immutable MemTables that have not yet been flushed.
- std::list<ReadOnlyMemTable*> memlist_;
- // MemTables that have already been flushed
- // (used during Transaction validation)
- std::list<ReadOnlyMemTable*> memlist_history_;
- // Maximum size of MemTables to keep in memory (including both flushed
- // and not-yet-flushed tables).
- const int64_t max_write_buffer_size_to_maintain_;
- int refs_ = 0;
- size_t* parent_memtable_list_memory_usage_;
- // MemtableListVersion id to track for flush results checking.
- uint64_t id_ = 0;
- };
- // This class stores references to all the immutable memtables.
- // The memtables are flushed to L0 as soon as possible and in
- // any order. If there are more than one immutable memtable, their
- // flushes can occur concurrently. However, they are 'committed'
- // to the manifest in FIFO order to maintain correctness and
- // recoverability from a crash.
- //
- //
- // Other than imm_flush_needed and imm_trim_needed, this class is not
- // thread-safe and requires external synchronization (such as holding the db
- // mutex or being on the write thread.)
- class MemTableList {
- public:
- // A list of memtables.
- explicit MemTableList(int min_write_buffer_number_to_merge,
- int64_t max_write_buffer_size_to_maintain)
- : imm_flush_needed(false),
- imm_trim_needed(false),
- min_write_buffer_number_to_merge_(min_write_buffer_number_to_merge),
- current_(new MemTableListVersion(¤t_memory_usage_,
- max_write_buffer_size_to_maintain)),
- num_flush_not_started_(0),
- commit_in_progress_(false),
- flush_requested_(false),
- current_memory_usage_(0),
- current_memory_allocted_bytes_excluding_last_(0),
- current_has_history_(false),
- last_memtable_list_version_id_(0) {
- current_->Ref();
- }
- // Should not delete MemTableList without making sure MemTableList::current()
- // is Unref()'d.
- ~MemTableList() {}
- MemTableListVersion* current() const { return current_; }
- // so that background threads can detect non-nullptr pointer to
- // determine whether there is anything more to start flushing.
- std::atomic<bool> imm_flush_needed;
- std::atomic<bool> imm_trim_needed;
- // Returns the total number of memtables in the list that haven't yet
- // been flushed and logged.
- int NumNotFlushed() const;
- // Returns total number of memtables in the list that have been
- // completely flushed and logged.
- int NumFlushed() const;
- // Returns true if there is at least one memtable on which flush has
- // not yet started.
- bool IsFlushPending() const;
- // Returns true if there is at least one memtable that is pending flush or
- // flushing.
- bool IsFlushPendingOrRunning() const;
- // Returns the earliest memtables that needs to be flushed. The returned
- // memtables are guaranteed to be in the ascending order of created time.
- void PickMemtablesToFlush(uint64_t max_memtable_id,
- autovector<ReadOnlyMemTable*>* mems,
- uint64_t* max_next_log_number = nullptr);
- // Reset status of the given memtable list back to pending state so that
- // they can get picked up again on the next round of flush.
- //
- // @param rollback_succeeding_memtables If true, will rollback adjacent
- // younger memtables whose flush is completed. Specifically, suppose the
- // current immutable memtables are M_0,M_1...M_N ordered from youngest to
- // oldest. Suppose that the youngest memtable in `mems` is M_K. We will try to
- // rollback M_K-1, M_K-2... until the first memtable whose flush is
- // not completed. These are the memtables that would have been installed
- // by this flush job if it were to succeed. This flag is currently used
- // by non atomic_flush rollback.
- // Note that we also do rollback in `write_manifest_cb` by calling
- // `RemoveMemTablesOrRestoreFlags()`. There we rollback the entire batch so
- // it is similar to what we do here with rollback_succeeding_memtables=true.
- void RollbackMemtableFlush(const autovector<ReadOnlyMemTable*>& mems,
- bool rollback_succeeding_memtables);
- // Try commit a successful flush in the manifest file. It might just return
- // Status::OK letting a concurrent flush to do the actual the recording.
- Status TryInstallMemtableFlushResults(
- ColumnFamilyData* cfd, const autovector<ReadOnlyMemTable*>& m,
- LogsWithPrepTracker* prep_tracker, VersionSet* vset,
- InstrumentedMutex* mu, uint64_t file_number,
- autovector<ReadOnlyMemTable*>* to_delete, FSDirectory* db_directory,
- LogBuffer* log_buffer,
- std::list<std::unique_ptr<FlushJobInfo>>* committed_flush_jobs_info,
- bool write_edits = true);
- // New memtables are inserted at the front of the list.
- // Takes ownership of the referenced held on *m by the caller of Add().
- // By default, adding memtables will flag that the memtable list needs to be
- // flushed, but in certain situations, like after a mempurge, we may want to
- // avoid flushing the memtable list upon addition of a memtable.
- void Add(ReadOnlyMemTable* m, autovector<ReadOnlyMemTable*>* to_delete);
- // Returns an estimate of the number of bytes of data in use.
- size_t ApproximateMemoryUsage();
- // Returns the cached current_memory_allocted_bytes_excluding_last_ value.
- size_t MemoryAllocatedBytesExcludingLast() const;
- // Returns the cached current_has_history_ value.
- bool HasHistory() const;
- // Updates current_memory_allocted_bytes_excluding_last_ and
- // current_has_history_ from MemTableListVersion. Must be called whenever
- // InstallNewVersion is called.
- void UpdateCachedValuesFromMemTableListVersion();
- // `usage` is the current size of the mutable Memtable. When
- // max_write_buffer_size_to_maintain is used, total size of mutable and
- // immutable memtables is checked against it to decide whether to trim
- // memtable list.
- //
- // Return true if memtable is trimmed
- bool TrimHistory(autovector<ReadOnlyMemTable*>* to_delete, size_t usage);
- // Returns an estimate of the number of bytes of data used by
- // the unflushed mem-tables.
- size_t ApproximateUnflushedMemTablesMemoryUsage();
- // Returns an estimate of the timestamp of the earliest key.
- uint64_t ApproximateOldestKeyTime() const;
- // Request a flush of all existing memtables to storage. This will
- // cause future calls to IsFlushPending() to return true if this list is
- // non-empty (regardless of the min_write_buffer_number_to_merge
- // parameter). This flush request will persist until the next time
- // PickMemtablesToFlush() is called.
- void FlushRequested() {
- flush_requested_ = true;
- // If there are some memtables stored in imm() that don't trigger
- // flush (eg: mempurge output memtable), then update imm_flush_needed.
- // Note: if race condition and imm_flush_needed is set to true
- // when there is num_flush_not_started_==0, then there is no
- // impact whatsoever. Imm_flush_needed is only used in an assert
- // in IsFlushPending().
- if (num_flush_not_started_ > 0) {
- imm_flush_needed.store(true, std::memory_order_release);
- }
- }
- bool HasFlushRequested() { return flush_requested_; }
- // Returns true if a trim history should be scheduled and the caller should
- // be the one to schedule it
- bool MarkTrimHistoryNeeded() {
- auto expected = false;
- return imm_trim_needed.compare_exchange_strong(
- expected, true, std::memory_order_relaxed, std::memory_order_relaxed);
- }
- void ResetTrimHistoryNeeded() {
- auto expected = true;
- imm_trim_needed.compare_exchange_strong(
- expected, false, std::memory_order_relaxed, std::memory_order_relaxed);
- }
- // Copying allowed
- // MemTableList(const MemTableList&);
- // void operator=(const MemTableList&);
- size_t* current_memory_usage() { return ¤t_memory_usage_; }
- // Returns the WAL number of the oldest WAL that contains a prepared
- // transaction that corresponds to the content in this MemTableList,
- // after memtables listed in `memtables_to_flush` are flushed and their
- // status is persisted in manifest.
- uint64_t PrecomputeMinLogContainingPrepSection(
- const std::unordered_set<ReadOnlyMemTable*>* memtables_to_flush =
- nullptr) const;
- uint64_t GetEarliestMemTableID() const {
- auto& memlist = current_->memlist_;
- if (memlist.empty()) {
- return std::numeric_limits<uint64_t>::max();
- }
- return memlist.back()->GetID();
- }
- uint64_t GetLatestMemTableID(bool for_atomic_flush) const {
- auto& memlist = current_->memlist_;
- if (memlist.empty()) {
- return 0;
- }
- if (for_atomic_flush) {
- // Scan the memtable list from new to old
- for (auto it = memlist.begin(); it != memlist.end(); ++it) {
- ReadOnlyMemTable* m = *it;
- if (m->atomic_flush_seqno_ != kMaxSequenceNumber) {
- return m->GetID();
- }
- }
- return 0;
- }
- return memlist.front()->GetID();
- }
- // DB mutex held.
- // Gets the newest user-defined timestamp for the Memtables in ascending ID
- // order, up to the `max_memtable_id`. Used by background flush job
- // to check Memtables' eligibility for flush w.r.t retaining UDTs.
- std::vector<Slice> GetTablesNewestUDT(uint64_t max_memtable_id) {
- std::vector<Slice> newest_udts;
- auto& memlist = current_->memlist_;
- // Iterating through the memlist starting at the end, the vector<MemTable*>
- // ret is filled with memtables already sorted in increasing MemTable ID.
- for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
- ReadOnlyMemTable* m = *it;
- if (m->GetID() > max_memtable_id) {
- break;
- }
- newest_udts.push_back(m->GetNewestUDT());
- }
- return newest_udts;
- }
- void AssignAtomicFlushSeq(const SequenceNumber& seq) {
- const auto& memlist = current_->memlist_;
- // Scan the memtable list from new to old
- for (auto it = memlist.begin(); it != memlist.end(); ++it) {
- ReadOnlyMemTable* mem = *it;
- if (mem->atomic_flush_seqno_ == kMaxSequenceNumber) {
- mem->atomic_flush_seqno_ = seq;
- } else {
- // Earlier memtables must have been assigned a atomic flush seq, no
- // need to continue scan.
- break;
- }
- }
- }
- // Used only by DBImplSecondary during log replay.
- // Remove memtables whose data were written before the WAL with log_number
- // was created, i.e. mem->GetNextLogNumber() <= log_number. The memtables are
- // not freed, but put into a vector for future deref and reclamation.
- void RemoveOldMemTables(uint64_t log_number,
- autovector<ReadOnlyMemTable*>* to_delete);
- // This API is only used by atomic date replacement. To get an edit for
- // dropping the current `MemTableListVersion`.
- VersionEdit GetEditForDroppingCurrentVersion(
- const ColumnFamilyData* cfd, VersionSet* vset,
- LogsWithPrepTracker* prep_tracker) const;
- private:
- friend Status InstallMemtableAtomicFlushResults(
- const autovector<MemTableList*>* imm_lists,
- const autovector<ColumnFamilyData*>& cfds,
- const autovector<const autovector<ReadOnlyMemTable*>*>& mems_list,
- VersionSet* vset, LogsWithPrepTracker* prep_tracker,
- InstrumentedMutex* mu, const autovector<FileMetaData*>& file_meta,
- const autovector<std::list<std::unique_ptr<FlushJobInfo>>*>&
- committed_flush_jobs_info,
- autovector<ReadOnlyMemTable*>* to_delete, FSDirectory* db_directory,
- LogBuffer* log_buffer);
- // DB mutex held
- void InstallNewVersion();
- // DB mutex held
- // Called after writing to MANIFEST
- void RemoveMemTablesOrRestoreFlags(const Status& s, ColumnFamilyData* cfd,
- size_t batch_count, LogBuffer* log_buffer,
- autovector<ReadOnlyMemTable*>* to_delete,
- InstrumentedMutex* mu);
- const int min_write_buffer_number_to_merge_;
- MemTableListVersion* current_;
- // the number of elements that still need flushing
- int num_flush_not_started_;
- // committing in progress
- bool commit_in_progress_;
- // Requested a flush of memtables to storage. It's possible to request that
- // a subset of memtables be flushed.
- bool flush_requested_;
- // The current memory usage.
- size_t current_memory_usage_;
- // Cached value of current_->MemoryAllocatedBytesExcludingLast().
- std::atomic<size_t> current_memory_allocted_bytes_excluding_last_;
- // Cached value of current_->HasHistory().
- std::atomic<bool> current_has_history_;
- // Last memtabe list version id, increase by 1 each time a new
- // MemtableListVersion is installed.
- uint64_t last_memtable_list_version_id_;
- };
- // Installs memtable atomic flush results.
- // In most cases, imm_lists is nullptr, and the function simply uses the
- // immutable memtable lists associated with the cfds. There are unit tests that
- // installs flush results for external immutable memtable lists other than the
- // cfds' own immutable memtable lists, e.g. MemTableLIstTest. In this case,
- // imm_lists parameter is not nullptr.
- Status InstallMemtableAtomicFlushResults(
- const autovector<MemTableList*>* imm_lists,
- const autovector<ColumnFamilyData*>& cfds,
- const autovector<const autovector<ReadOnlyMemTable*>*>& mems_list,
- VersionSet* vset, LogsWithPrepTracker* prep_tracker, InstrumentedMutex* mu,
- const autovector<FileMetaData*>& file_meta,
- const autovector<std::list<std::unique_ptr<FlushJobInfo>>*>&
- committed_flush_jobs_info,
- autovector<ReadOnlyMemTable*>* to_delete, FSDirectory* db_directory,
- LogBuffer* log_buffer);
- } // namespace ROCKSDB_NAMESPACE
|