| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495 |
- // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
- // This source code is licensed under both the GPLv2 (found in the
- // COPYING file in the root directory) and Apache 2.0 License
- // (found in the LICENSE.Apache file in the root directory).
- #pragma once
- #ifndef ROCKSDB_LITE
- #include <atomic>
- #include <condition_variable>
- #include <limits>
- #include <list>
- #include <memory>
- #include <set>
- #include <string>
- #include <thread>
- #include <unordered_map>
- #include <utility>
- #include <vector>
- #include "db/db_iter.h"
- #include "rocksdb/compaction_filter.h"
- #include "rocksdb/db.h"
- #include "rocksdb/listener.h"
- #include "rocksdb/options.h"
- #include "rocksdb/statistics.h"
- #include "rocksdb/wal_filter.h"
- #include "util/mutexlock.h"
- #include "util/timer_queue.h"
- #include "utilities/blob_db/blob_db.h"
- #include "utilities/blob_db/blob_file.h"
- #include "utilities/blob_db/blob_log_format.h"
- #include "utilities/blob_db/blob_log_reader.h"
- #include "utilities/blob_db/blob_log_writer.h"
- namespace ROCKSDB_NAMESPACE {
- class DBImpl;
- class ColumnFamilyHandle;
- class ColumnFamilyData;
- struct FlushJobInfo;
- namespace blob_db {
- struct BlobCompactionContext;
- struct BlobCompactionContextGC;
- class BlobDBImpl;
- class BlobFile;
- // Comparator to sort "TTL" aware Blob files based on the lower value of
- // TTL range.
- struct BlobFileComparatorTTL {
- bool operator()(const std::shared_ptr<BlobFile>& lhs,
- const std::shared_ptr<BlobFile>& rhs) const;
- };
- struct BlobFileComparator {
- bool operator()(const std::shared_ptr<BlobFile>& lhs,
- const std::shared_ptr<BlobFile>& rhs) const;
- };
- /**
- * The implementation class for BlobDB. It manages the blob logs, which
- * are sequentially written files. Blob logs can be of the TTL or non-TTL
- * varieties; the former are cleaned up when they expire, while the latter
- * are (optionally) garbage collected.
- */
- class BlobDBImpl : public BlobDB {
- friend class BlobFile;
- friend class BlobDBIterator;
- friend class BlobDBListener;
- friend class BlobDBListenerGC;
- friend class BlobIndexCompactionFilterGC;
- public:
- // deletions check period
- static constexpr uint32_t kDeleteCheckPeriodMillisecs = 2 * 1000;
- // sanity check task
- static constexpr uint32_t kSanityCheckPeriodMillisecs = 20 * 60 * 1000;
- // how many random access open files can we tolerate
- static constexpr uint32_t kOpenFilesTrigger = 100;
- // how often to schedule reclaim open files.
- static constexpr uint32_t kReclaimOpenFilesPeriodMillisecs = 1 * 1000;
- // how often to schedule delete obs files periods
- static constexpr uint32_t kDeleteObsoleteFilesPeriodMillisecs = 10 * 1000;
- // how often to schedule expired files eviction.
- static constexpr uint32_t kEvictExpiredFilesPeriodMillisecs = 10 * 1000;
- // when should oldest file be evicted:
- // on reaching 90% of blob_dir_size
- static constexpr double kEvictOldestFileAtSize = 0.9;
- using BlobDB::Put;
- Status Put(const WriteOptions& options, const Slice& key,
- const Slice& value) override;
- using BlobDB::Get;
- Status Get(const ReadOptions& read_options, ColumnFamilyHandle* column_family,
- const Slice& key, PinnableSlice* value) override;
- Status Get(const ReadOptions& read_options, ColumnFamilyHandle* column_family,
- const Slice& key, PinnableSlice* value,
- uint64_t* expiration) override;
- using BlobDB::NewIterator;
- virtual Iterator* NewIterator(const ReadOptions& read_options) override;
- using BlobDB::NewIterators;
- virtual Status NewIterators(
- const ReadOptions& /*read_options*/,
- const std::vector<ColumnFamilyHandle*>& /*column_families*/,
- std::vector<Iterator*>* /*iterators*/) override {
- return Status::NotSupported("Not implemented");
- }
- using BlobDB::MultiGet;
- virtual std::vector<Status> MultiGet(
- const ReadOptions& read_options,
- const std::vector<Slice>& keys,
- std::vector<std::string>* values) override;
- virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override;
- virtual Status Close() override;
- using BlobDB::PutWithTTL;
- Status PutWithTTL(const WriteOptions& options, const Slice& key,
- const Slice& value, uint64_t ttl) override;
- using BlobDB::PutUntil;
- Status PutUntil(const WriteOptions& options, const Slice& key,
- const Slice& value, uint64_t expiration) override;
- using BlobDB::CompactFiles;
- Status CompactFiles(
- const CompactionOptions& compact_options,
- const std::vector<std::string>& input_file_names, const int output_level,
- const int output_path_id = -1,
- std::vector<std::string>* const output_file_names = nullptr,
- CompactionJobInfo* compaction_job_info = nullptr) override;
- BlobDBOptions GetBlobDBOptions() const override;
- BlobDBImpl(const std::string& dbname, const BlobDBOptions& bdb_options,
- const DBOptions& db_options,
- const ColumnFamilyOptions& cf_options);
- virtual Status DisableFileDeletions() override;
- virtual Status EnableFileDeletions(bool force) override;
- virtual Status GetLiveFiles(std::vector<std::string>&,
- uint64_t* manifest_file_size,
- bool flush_memtable = true) override;
- virtual void GetLiveFilesMetaData(std::vector<LiveFileMetaData>*) override;
- ~BlobDBImpl();
- Status Open(std::vector<ColumnFamilyHandle*>* handles);
- Status SyncBlobFiles() override;
- // Common part of the two GetCompactionContext methods below.
- // REQUIRES: read lock on mutex_
- void GetCompactionContextCommon(BlobCompactionContext* context) const;
- void GetCompactionContext(BlobCompactionContext* context);
- void GetCompactionContext(BlobCompactionContext* context,
- BlobCompactionContextGC* context_gc);
- #ifndef NDEBUG
- Status TEST_GetBlobValue(const Slice& key, const Slice& index_entry,
- PinnableSlice* value);
- void TEST_AddDummyBlobFile(uint64_t blob_file_number,
- SequenceNumber immutable_sequence);
- std::vector<std::shared_ptr<BlobFile>> TEST_GetBlobFiles() const;
- std::vector<std::shared_ptr<BlobFile>> TEST_GetLiveImmNonTTLFiles() const;
- std::vector<std::shared_ptr<BlobFile>> TEST_GetObsoleteFiles() const;
- Status TEST_CloseBlobFile(std::shared_ptr<BlobFile>& bfile);
- void TEST_ObsoleteBlobFile(std::shared_ptr<BlobFile>& blob_file,
- SequenceNumber obsolete_seq = 0,
- bool update_size = true);
- void TEST_EvictExpiredFiles();
- void TEST_DeleteObsoleteFiles();
- uint64_t TEST_live_sst_size();
- const std::string& TEST_blob_dir() const { return blob_dir_; }
- void TEST_InitializeBlobFileToSstMapping(
- const std::vector<LiveFileMetaData>& live_files);
- void TEST_ProcessFlushJobInfo(const FlushJobInfo& info);
- void TEST_ProcessCompactionJobInfo(const CompactionJobInfo& info);
- #endif // !NDEBUG
- private:
- class BlobInserter;
- // Create a snapshot if there isn't one in read options.
- // Return true if a snapshot is created.
- bool SetSnapshotIfNeeded(ReadOptions* read_options);
- Status GetImpl(const ReadOptions& read_options,
- ColumnFamilyHandle* column_family, const Slice& key,
- PinnableSlice* value, uint64_t* expiration = nullptr);
- Status GetBlobValue(const Slice& key, const Slice& index_entry,
- PinnableSlice* value, uint64_t* expiration = nullptr);
- Status GetRawBlobFromFile(const Slice& key, uint64_t file_number,
- uint64_t offset, uint64_t size,
- PinnableSlice* value,
- CompressionType* compression_type);
- Slice GetCompressedSlice(const Slice& raw,
- std::string* compression_output) const;
- // Close a file by appending a footer, and removes file from open files list.
- // REQUIRES: lock held on write_mutex_, write lock held on both the db mutex_
- // and the blob file's mutex_. If called on a blob file which is visible only
- // to a single thread (like in the case of new files written during GC), the
- // locks on write_mutex_ and the blob file's mutex_ can be avoided.
- Status CloseBlobFile(std::shared_ptr<BlobFile> bfile);
- // Close a file if its size exceeds blob_file_size
- // REQUIRES: lock held on write_mutex_.
- Status CloseBlobFileIfNeeded(std::shared_ptr<BlobFile>& bfile);
- // Mark file as obsolete and move the file to obsolete file list.
- //
- // REQUIRED: hold write lock of mutex_ or during DB open.
- void ObsoleteBlobFile(std::shared_ptr<BlobFile> blob_file,
- SequenceNumber obsolete_seq, bool update_size);
- Status PutBlobValue(const WriteOptions& options, const Slice& key,
- const Slice& value, uint64_t expiration,
- WriteBatch* batch);
- Status AppendBlob(const std::shared_ptr<BlobFile>& bfile,
- const std::string& headerbuf, const Slice& key,
- const Slice& value, uint64_t expiration,
- std::string* index_entry);
- // Create a new blob file and associated writer.
- Status CreateBlobFileAndWriter(bool has_ttl,
- const ExpirationRange& expiration_range,
- const std::string& reason,
- std::shared_ptr<BlobFile>* blob_file,
- std::shared_ptr<Writer>* writer);
- // Get the open non-TTL blob log file, or create a new one if no such file
- // exists.
- Status SelectBlobFile(std::shared_ptr<BlobFile>* blob_file);
- // Get the open TTL blob log file for a certain expiration, or create a new
- // one if no such file exists.
- Status SelectBlobFileTTL(uint64_t expiration,
- std::shared_ptr<BlobFile>* blob_file);
- std::shared_ptr<BlobFile> FindBlobFileLocked(uint64_t expiration) const;
- // periodic sanity check. Bunch of checks
- std::pair<bool, int64_t> SanityCheck(bool aborted);
- // Delete files that have been marked obsolete (either because of TTL
- // or GC). Check whether any snapshots exist which refer to the same.
- std::pair<bool, int64_t> DeleteObsoleteFiles(bool aborted);
- // periodically check if open blob files and their TTL's has expired
- // if expired, close the sequential writer and make the file immutable
- std::pair<bool, int64_t> EvictExpiredFiles(bool aborted);
- // if the number of open files, approaches ULIMIT's this
- // task will close random readers, which are kept around for
- // efficiency
- std::pair<bool, int64_t> ReclaimOpenFiles(bool aborted);
- std::pair<bool, int64_t> RemoveTimerQ(TimerQueue* tq, bool aborted);
- // Adds the background tasks to the timer queue
- void StartBackgroundTasks();
- // add a new Blob File
- std::shared_ptr<BlobFile> NewBlobFile(bool has_ttl,
- const ExpirationRange& expiration_range,
- const std::string& reason);
- // Register a new blob file.
- // REQUIRES: write lock on mutex_.
- void RegisterBlobFile(std::shared_ptr<BlobFile> blob_file);
- // collect all the blob log files from the blob directory
- Status GetAllBlobFiles(std::set<uint64_t>* file_numbers);
- // Open all blob files found in blob_dir.
- Status OpenAllBlobFiles();
- // Link an SST to a blob file. Comes in locking and non-locking varieties
- // (the latter is used during Open).
- template <typename Linker>
- void LinkSstToBlobFileImpl(uint64_t sst_file_number,
- uint64_t blob_file_number, Linker linker);
- void LinkSstToBlobFile(uint64_t sst_file_number, uint64_t blob_file_number);
- void LinkSstToBlobFileNoLock(uint64_t sst_file_number,
- uint64_t blob_file_number);
- // Unlink an SST from a blob file.
- void UnlinkSstFromBlobFile(uint64_t sst_file_number,
- uint64_t blob_file_number);
- // Initialize the mapping between blob files and SSTs during Open.
- void InitializeBlobFileToSstMapping(
- const std::vector<LiveFileMetaData>& live_files);
- // Update the mapping between blob files and SSTs after a flush and mark
- // any unneeded blob files obsolete.
- void ProcessFlushJobInfo(const FlushJobInfo& info);
- // Update the mapping between blob files and SSTs after a compaction and
- // mark any unneeded blob files obsolete.
- void ProcessCompactionJobInfo(const CompactionJobInfo& info);
- // Mark an immutable non-TTL blob file obsolete assuming it has no more SSTs
- // linked to it, and all memtables from before the blob file became immutable
- // have been flushed. Note: should only be called if the condition holds for
- // all lower-numbered non-TTL blob files as well.
- bool MarkBlobFileObsoleteIfNeeded(const std::shared_ptr<BlobFile>& blob_file,
- SequenceNumber obsolete_seq);
- // Mark all immutable non-TTL blob files that aren't needed by any SSTs as
- // obsolete. Comes in two varieties; the version used during Open need not
- // worry about locking or snapshots.
- template <class Functor>
- void MarkUnreferencedBlobFilesObsoleteImpl(Functor mark_if_needed);
- void MarkUnreferencedBlobFilesObsolete();
- void MarkUnreferencedBlobFilesObsoleteDuringOpen();
- void UpdateLiveSSTSize();
- Status GetBlobFileReader(const std::shared_ptr<BlobFile>& blob_file,
- std::shared_ptr<RandomAccessFileReader>* reader);
- // hold write mutex on file and call.
- // Close the above Random Access reader
- void CloseRandomAccessLocked(const std::shared_ptr<BlobFile>& bfile);
- // hold write mutex on file and call
- // creates a sequential (append) writer for this blobfile
- Status CreateWriterLocked(const std::shared_ptr<BlobFile>& bfile);
- // returns a Writer object for the file. If writer is not
- // already present, creates one. Needs Write Mutex to be held
- Status CheckOrCreateWriterLocked(const std::shared_ptr<BlobFile>& blob_file,
- std::shared_ptr<Writer>* writer);
- // checks if there is no snapshot which is referencing the
- // blobs
- bool VisibleToActiveSnapshot(const std::shared_ptr<BlobFile>& file);
- bool FileDeleteOk_SnapshotCheckLocked(const std::shared_ptr<BlobFile>& bfile);
- void CopyBlobFiles(std::vector<std::shared_ptr<BlobFile>>* bfiles_copy);
- uint64_t EpochNow() { return env_->NowMicros() / 1000000; }
- // Check if inserting a new blob will make DB grow out of space.
- // If is_fifo = true, FIFO eviction will be triggered to make room for the
- // new blob. If force_evict = true, FIFO eviction will evict blob files
- // even eviction will not make enough room for the new blob.
- Status CheckSizeAndEvictBlobFiles(uint64_t blob_size,
- bool force_evict = false);
- // name of the database directory
- std::string dbname_;
- // the base DB
- DBImpl* db_impl_;
- Env* env_;
- // the options that govern the behavior of Blob Storage
- BlobDBOptions bdb_options_;
- DBOptions db_options_;
- ColumnFamilyOptions cf_options_;
- EnvOptions env_options_;
- // Raw pointer of statistic. db_options_ has a std::shared_ptr to hold
- // ownership.
- Statistics* statistics_;
- // by default this is "blob_dir" under dbname_
- // but can be configured
- std::string blob_dir_;
- // pointer to directory
- std::unique_ptr<Directory> dir_ent_;
- // Read Write Mutex, which protects all the data structures
- // HEAVILY TRAFFICKED
- mutable port::RWMutex mutex_;
- // Writers has to hold write_mutex_ before writing.
- mutable port::Mutex write_mutex_;
- // counter for blob file number
- std::atomic<uint64_t> next_file_number_;
- // entire metadata of all the BLOB files memory
- std::map<uint64_t, std::shared_ptr<BlobFile>> blob_files_;
- // All live immutable non-TTL blob files.
- std::map<uint64_t, std::shared_ptr<BlobFile>> live_imm_non_ttl_blob_files_;
- // The largest sequence number that has been flushed.
- SequenceNumber flush_sequence_;
- // opened non-TTL blob file.
- std::shared_ptr<BlobFile> open_non_ttl_file_;
- // all the blob files which are currently being appended to based
- // on variety of incoming TTL's
- std::set<std::shared_ptr<BlobFile>, BlobFileComparatorTTL> open_ttl_files_;
- // Flag to check whether Close() has been called on this DB
- bool closed_;
- // timer based queue to execute tasks
- TimerQueue tqueue_;
- // number of files opened for random access/GET
- // counter is used to monitor and close excess RA files.
- std::atomic<uint32_t> open_file_count_;
- // Total size of all live blob files (i.e. exclude obsolete files).
- std::atomic<uint64_t> total_blob_size_;
- // total size of SST files.
- std::atomic<uint64_t> live_sst_size_;
- // Latest FIFO eviction timestamp
- //
- // REQUIRES: access with metex_ lock held.
- uint64_t fifo_eviction_seq_;
- // The expiration up to which latest FIFO eviction evicts.
- //
- // REQUIRES: access with metex_ lock held.
- uint64_t evict_expiration_up_to_;
- std::list<std::shared_ptr<BlobFile>> obsolete_files_;
- // DeleteObsoleteFiles, DiableFileDeletions and EnableFileDeletions block
- // on the mutex to avoid contention.
- //
- // While DeleteObsoleteFiles hold both mutex_ and delete_file_mutex_, note
- // the difference. mutex_ only needs to be held when access the
- // data-structure, and delete_file_mutex_ needs to be held the whole time
- // during DeleteObsoleteFiles to avoid being run simultaneously with
- // DisableFileDeletions.
- //
- // If both of mutex_ and delete_file_mutex_ needs to be held, it is adviced
- // to hold delete_file_mutex_ first to avoid deadlock.
- mutable port::Mutex delete_file_mutex_;
- // Each call of DisableFileDeletions will increase disable_file_deletion_
- // by 1. EnableFileDeletions will either decrease the count by 1 or reset
- // it to zeor, depending on the force flag.
- //
- // REQUIRES: access with delete_file_mutex_ held.
- int disable_file_deletions_ = 0;
- uint32_t debug_level_;
- };
- } // namespace blob_db
- } // namespace ROCKSDB_NAMESPACE
- #endif // ROCKSDB_LITE
|