blob_db_impl.h 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #pragma once
  6. #ifndef ROCKSDB_LITE
  7. #include <atomic>
  8. #include <condition_variable>
  9. #include <limits>
  10. #include <list>
  11. #include <memory>
  12. #include <set>
  13. #include <string>
  14. #include <thread>
  15. #include <unordered_map>
  16. #include <utility>
  17. #include <vector>
  18. #include "db/db_iter.h"
  19. #include "rocksdb/compaction_filter.h"
  20. #include "rocksdb/db.h"
  21. #include "rocksdb/listener.h"
  22. #include "rocksdb/options.h"
  23. #include "rocksdb/statistics.h"
  24. #include "rocksdb/wal_filter.h"
  25. #include "util/mutexlock.h"
  26. #include "util/timer_queue.h"
  27. #include "utilities/blob_db/blob_db.h"
  28. #include "utilities/blob_db/blob_file.h"
  29. #include "utilities/blob_db/blob_log_format.h"
  30. #include "utilities/blob_db/blob_log_reader.h"
  31. #include "utilities/blob_db/blob_log_writer.h"
  32. namespace ROCKSDB_NAMESPACE {
  33. class DBImpl;
  34. class ColumnFamilyHandle;
  35. class ColumnFamilyData;
  36. struct FlushJobInfo;
  37. namespace blob_db {
  38. struct BlobCompactionContext;
  39. struct BlobCompactionContextGC;
  40. class BlobDBImpl;
  41. class BlobFile;
  42. // Comparator to sort TTL-aware blob files based on the lower bound of
  43. // their TTL range. Used as the ordering of BlobDBImpl::open_ttl_files_.
  44. struct BlobFileComparatorTTL {
      // Ordering predicate over two blob files. Only declared here; the
      // definition lives out of line.
  45. bool operator()(const std::shared_ptr<BlobFile>& lhs,
  46. const std::shared_ptr<BlobFile>& rhs) const;
  47. };
  48. struct BlobFileComparator {
      // Ordering predicate over two blob files. Only declared here; the
      // ordering criterion is fixed by the out-of-line definition, which is
      // not visible in this header.
  49. bool operator()(const std::shared_ptr<BlobFile>& lhs,
  50. const std::shared_ptr<BlobFile>& rhs) const;
  51. };
  52. /**
  53. * The implementation class for BlobDB. It manages the blob logs, which
  54. * are sequentially written files. Blob logs can be of the TTL or non-TTL
  55. * varieties; the former are cleaned up when they expire, while the latter
  56. * are (optionally) garbage collected.
  57. */
  58. class BlobDBImpl : public BlobDB {
      // Concrete BlobDB: declares the DB operations it overrides, followed by
      // the private blob-file bookkeeping helpers and state.
  59. friend class BlobFile;
  60. friend class BlobDBIterator;
  61. friend class BlobDBListener;
  62. friend class BlobDBListenerGC;
  63. friend class BlobIndexCompactionFilterGC;
  64. public:
  65. // Period of the deletion check, in milliseconds (2 seconds).
  66. static constexpr uint32_t kDeleteCheckPeriodMillisecs = 2 * 1000;
  67. // Period of the sanity check task, in milliseconds (20 minutes).
  68. static constexpr uint32_t kSanityCheckPeriodMillisecs = 20 * 60 * 1000;
  69. // How many files open for random access we can tolerate.
  70. static constexpr uint32_t kOpenFilesTrigger = 100;
  71. // How often to schedule reclaiming of open files, in milliseconds.
  72. static constexpr uint32_t kReclaimOpenFilesPeriodMillisecs = 1 * 1000;
  73. // How often to schedule deletion of obsolete files, in milliseconds.
  74. static constexpr uint32_t kDeleteObsoleteFilesPeriodMillisecs = 10 * 1000;
  75. // How often to schedule eviction of expired files, in milliseconds.
  76. static constexpr uint32_t kEvictExpiredFilesPeriodMillisecs = 10 * 1000;
  77. // When the oldest file should be evicted:
  78. // on reaching 90% of blob_dir_size.
  79. static constexpr double kEvictOldestFileAtSize = 0.9;
  80. using BlobDB::Put;
  81. Status Put(const WriteOptions& options, const Slice& key,
  82. const Slice& value) override;
  83. using BlobDB::Get;
  84. Status Get(const ReadOptions& read_options, ColumnFamilyHandle* column_family,
  85. const Slice& key, PinnableSlice* value) override;
  86. Status Get(const ReadOptions& read_options, ColumnFamilyHandle* column_family,
  87. const Slice& key, PinnableSlice* value,
  88. uint64_t* expiration) override;
  89. using BlobDB::NewIterator;
  90. virtual Iterator* NewIterator(const ReadOptions& read_options) override;
  91. using BlobDB::NewIterators;
      // The multi-column-family variant is not supported: it ignores all of
      // its arguments and always returns Status::NotSupported.
  92. virtual Status NewIterators(
  93. const ReadOptions& /*read_options*/,
  94. const std::vector<ColumnFamilyHandle*>& /*column_families*/,
  95. std::vector<Iterator*>* /*iterators*/) override {
  96. return Status::NotSupported("Not implemented");
  97. }
  98. using BlobDB::MultiGet;
  99. virtual std::vector<Status> MultiGet(
  100. const ReadOptions& read_options,
  101. const std::vector<Slice>& keys,
  102. std::vector<std::string>* values) override;
  103. virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override;
  104. virtual Status Close() override;
  105. using BlobDB::PutWithTTL;
  106. Status PutWithTTL(const WriteOptions& options, const Slice& key,
  107. const Slice& value, uint64_t ttl) override;
  108. using BlobDB::PutUntil;
  109. Status PutUntil(const WriteOptions& options, const Slice& key,
  110. const Slice& value, uint64_t expiration) override;
  111. using BlobDB::CompactFiles;
  112. Status CompactFiles(
  113. const CompactionOptions& compact_options,
  114. const std::vector<std::string>& input_file_names, const int output_level,
  115. const int output_path_id = -1,
  116. std::vector<std::string>* const output_file_names = nullptr,
  117. CompactionJobInfo* compaction_job_info = nullptr) override;
  118. BlobDBOptions GetBlobDBOptions() const override;
  119. BlobDBImpl(const std::string& dbname, const BlobDBOptions& bdb_options,
  120. const DBOptions& db_options,
  121. const ColumnFamilyOptions& cf_options);
  122. virtual Status DisableFileDeletions() override;
  123. virtual Status EnableFileDeletions(bool force) override;
  124. virtual Status GetLiveFiles(std::vector<std::string>&,
  125. uint64_t* manifest_file_size,
  126. bool flush_memtable = true) override;
  127. virtual void GetLiveFilesMetaData(std::vector<LiveFileMetaData>*) override;
  128. ~BlobDBImpl();
  129. Status Open(std::vector<ColumnFamilyHandle*>* handles);
  130. Status SyncBlobFiles() override;
  131. // Common part of the two GetCompactionContext methods below.
  132. // REQUIRES: read lock on mutex_
  133. void GetCompactionContextCommon(BlobCompactionContext* context) const;
  134. void GetCompactionContext(BlobCompactionContext* context);
  135. void GetCompactionContext(BlobCompactionContext* context,
  136. BlobCompactionContextGC* context_gc);
      // Test-only hooks; compiled only when NDEBUG is not defined.
  137. #ifndef NDEBUG
  138. Status TEST_GetBlobValue(const Slice& key, const Slice& index_entry,
  139. PinnableSlice* value);
  140. void TEST_AddDummyBlobFile(uint64_t blob_file_number,
  141. SequenceNumber immutable_sequence);
  142. std::vector<std::shared_ptr<BlobFile>> TEST_GetBlobFiles() const;
  143. std::vector<std::shared_ptr<BlobFile>> TEST_GetLiveImmNonTTLFiles() const;
  144. std::vector<std::shared_ptr<BlobFile>> TEST_GetObsoleteFiles() const;
  145. Status TEST_CloseBlobFile(std::shared_ptr<BlobFile>& bfile);
  146. void TEST_ObsoleteBlobFile(std::shared_ptr<BlobFile>& blob_file,
  147. SequenceNumber obsolete_seq = 0,
  148. bool update_size = true);
  149. void TEST_EvictExpiredFiles();
  150. void TEST_DeleteObsoleteFiles();
  151. uint64_t TEST_live_sst_size();
  152. const std::string& TEST_blob_dir() const { return blob_dir_; }
  153. void TEST_InitializeBlobFileToSstMapping(
  154. const std::vector<LiveFileMetaData>& live_files);
  155. void TEST_ProcessFlushJobInfo(const FlushJobInfo& info);
  156. void TEST_ProcessCompactionJobInfo(const CompactionJobInfo& info);
  157. #endif // !NDEBUG
  158. private:
  159. class BlobInserter;
  160. // Create a snapshot if there isn't one in read options.
  161. // Return true if a snapshot is created.
  162. bool SetSnapshotIfNeeded(ReadOptions* read_options);
  163. Status GetImpl(const ReadOptions& read_options,
  164. ColumnFamilyHandle* column_family, const Slice& key,
  165. PinnableSlice* value, uint64_t* expiration = nullptr);
  166. Status GetBlobValue(const Slice& key, const Slice& index_entry,
  167. PinnableSlice* value, uint64_t* expiration = nullptr);
  168. Status GetRawBlobFromFile(const Slice& key, uint64_t file_number,
  169. uint64_t offset, uint64_t size,
  170. PinnableSlice* value,
  171. CompressionType* compression_type);
  172. Slice GetCompressedSlice(const Slice& raw,
  173. std::string* compression_output) const;
  174. // Closes a file by appending a footer, and removes it from the open files list.
  175. // REQUIRES: lock held on write_mutex_, write lock held on both the db mutex_
  176. // and the blob file's mutex_. If called on a blob file which is visible only
  177. // to a single thread (like in the case of new files written during GC), the
  178. // locks on write_mutex_ and the blob file's mutex_ can be avoided.
  179. Status CloseBlobFile(std::shared_ptr<BlobFile> bfile);
  180. // Close a file if its size exceeds blob_file_size
  181. // REQUIRES: lock held on write_mutex_.
  182. Status CloseBlobFileIfNeeded(std::shared_ptr<BlobFile>& bfile);
  183. // Mark file as obsolete and move the file to obsolete file list.
  184. //
  185. // REQUIRES: write lock held on mutex_, or called during DB open.
  186. void ObsoleteBlobFile(std::shared_ptr<BlobFile> blob_file,
  187. SequenceNumber obsolete_seq, bool update_size);
  188. Status PutBlobValue(const WriteOptions& options, const Slice& key,
  189. const Slice& value, uint64_t expiration,
  190. WriteBatch* batch);
  191. Status AppendBlob(const std::shared_ptr<BlobFile>& bfile,
  192. const std::string& headerbuf, const Slice& key,
  193. const Slice& value, uint64_t expiration,
  194. std::string* index_entry);
  195. // Create a new blob file and associated writer.
  196. Status CreateBlobFileAndWriter(bool has_ttl,
  197. const ExpirationRange& expiration_range,
  198. const std::string& reason,
  199. std::shared_ptr<BlobFile>* blob_file,
  200. std::shared_ptr<Writer>* writer);
  201. // Get the open non-TTL blob log file, or create a new one if no such file
  202. // exists.
  203. Status SelectBlobFile(std::shared_ptr<BlobFile>* blob_file);
  204. // Get the open TTL blob log file for a certain expiration, or create a new
  205. // one if no such file exists.
  206. Status SelectBlobFileTTL(uint64_t expiration,
  207. std::shared_ptr<BlobFile>* blob_file);
  208. std::shared_ptr<BlobFile> FindBlobFileLocked(uint64_t expiration) const;
  209. // Periodic sanity check; performs a bunch of checks.
  210. std::pair<bool, int64_t> SanityCheck(bool aborted);
  211. // Delete files that have been marked obsolete (either because of TTL
  212. // or GC). Check whether any snapshots exist which refer to the same.
  213. std::pair<bool, int64_t> DeleteObsoleteFiles(bool aborted);
  214. // Periodically check whether open blob files' TTLs have expired;
  215. // if expired, close the sequential writer and make the file immutable.
  216. std::pair<bool, int64_t> EvictExpiredFiles(bool aborted);
  217. // If the number of open files approaches the ulimit, this
  218. // task will close random access readers, which are kept around for
  219. // efficiency.
  220. std::pair<bool, int64_t> ReclaimOpenFiles(bool aborted);
  221. std::pair<bool, int64_t> RemoveTimerQ(TimerQueue* tq, bool aborted);
  222. // Adds the background tasks to the timer queue
  223. void StartBackgroundTasks();
  224. // Add a new blob file.
  225. std::shared_ptr<BlobFile> NewBlobFile(bool has_ttl,
  226. const ExpirationRange& expiration_range,
  227. const std::string& reason);
  228. // Register a new blob file.
  229. // REQUIRES: write lock on mutex_.
  230. void RegisterBlobFile(std::shared_ptr<BlobFile> blob_file);
  231. // Collect all the blob log files from the blob directory.
  232. Status GetAllBlobFiles(std::set<uint64_t>* file_numbers);
  233. // Open all blob files found in blob_dir.
  234. Status OpenAllBlobFiles();
  235. // Link an SST to a blob file. Comes in locking and non-locking varieties
  236. // (the latter is used during Open).
  237. template <typename Linker>
  238. void LinkSstToBlobFileImpl(uint64_t sst_file_number,
  239. uint64_t blob_file_number, Linker linker);
  240. void LinkSstToBlobFile(uint64_t sst_file_number, uint64_t blob_file_number);
  241. void LinkSstToBlobFileNoLock(uint64_t sst_file_number,
  242. uint64_t blob_file_number);
  243. // Unlink an SST from a blob file.
  244. void UnlinkSstFromBlobFile(uint64_t sst_file_number,
  245. uint64_t blob_file_number);
  246. // Initialize the mapping between blob files and SSTs during Open.
  247. void InitializeBlobFileToSstMapping(
  248. const std::vector<LiveFileMetaData>& live_files);
  249. // Update the mapping between blob files and SSTs after a flush and mark
  250. // any unneeded blob files obsolete.
  251. void ProcessFlushJobInfo(const FlushJobInfo& info);
  252. // Update the mapping between blob files and SSTs after a compaction and
  253. // mark any unneeded blob files obsolete.
  254. void ProcessCompactionJobInfo(const CompactionJobInfo& info);
  255. // Mark an immutable non-TTL blob file obsolete assuming it has no more SSTs
  256. // linked to it, and all memtables from before the blob file became immutable
  257. // have been flushed. Note: should only be called if the condition holds for
  258. // all lower-numbered non-TTL blob files as well.
  259. bool MarkBlobFileObsoleteIfNeeded(const std::shared_ptr<BlobFile>& blob_file,
  260. SequenceNumber obsolete_seq);
  261. // Mark all immutable non-TTL blob files that aren't needed by any SSTs as
  262. // obsolete. Comes in two varieties; the version used during Open need not
  263. // worry about locking or snapshots.
  264. template <class Functor>
  265. void MarkUnreferencedBlobFilesObsoleteImpl(Functor mark_if_needed);
  266. void MarkUnreferencedBlobFilesObsolete();
  267. void MarkUnreferencedBlobFilesObsoleteDuringOpen();
  268. void UpdateLiveSSTSize();
  269. Status GetBlobFileReader(const std::shared_ptr<BlobFile>& blob_file,
  270. std::shared_ptr<RandomAccessFileReader>* reader);
  271. // Closes the random access reader of the given blob file.
  272. // REQUIRES: write mutex held on the file.
  273. void CloseRandomAccessLocked(const std::shared_ptr<BlobFile>& bfile);
  274. // Creates a sequential (append) writer for this blob file.
  275. // REQUIRES: write mutex held on the file.
  276. Status CreateWriterLocked(const std::shared_ptr<BlobFile>& bfile);
  277. // Returns a Writer object for the file. If a writer is not
  278. // already present, creates one. Needs the write mutex to be held.
  279. Status CheckOrCreateWriterLocked(const std::shared_ptr<BlobFile>& blob_file,
  280. std::shared_ptr<Writer>* writer);
  281. // Checks if there is no snapshot which is referencing the
  282. // blobs.
      // NOTE(review): the name suggests `true` means a snapshot can still see
      // the file; confirm the polarity against the definition.
  283. bool VisibleToActiveSnapshot(const std::shared_ptr<BlobFile>& file);
  284. bool FileDeleteOk_SnapshotCheckLocked(const std::shared_ptr<BlobFile>& bfile);
  285. void CopyBlobFiles(std::vector<std::shared_ptr<BlobFile>>* bfiles_copy);
      // Returns env_->NowMicros() converted from microseconds to whole seconds.
  286. uint64_t EpochNow() { return env_->NowMicros() / 1000000; }
  287. // Check if inserting a new blob will make DB grow out of space.
  288. // If is_fifo = true, FIFO eviction will be triggered to make room for the
  289. // new blob. If force_evict = true, FIFO eviction will evict blob files
  290. // even if eviction will not make enough room for the new blob.
      // NOTE(review): is_fifo is not a parameter of this function; presumably
      // it refers to a field of bdb_options_ - confirm.
  291. Status CheckSizeAndEvictBlobFiles(uint64_t blob_size,
  292. bool force_evict = false);
  293. // name of the database directory
  294. std::string dbname_;
  295. // the base DB
  296. DBImpl* db_impl_;
  297. Env* env_;
  298. // the options that govern the behavior of Blob Storage
  299. BlobDBOptions bdb_options_;
  300. DBOptions db_options_;
  301. ColumnFamilyOptions cf_options_;
  302. EnvOptions env_options_;
  303. // Raw pointer to the statistics object. db_options_ has a std::shared_ptr
  304. // to hold ownership.
  305. Statistics* statistics_;
  306. // by default this is "blob_dir" under dbname_
  307. // but can be configured
  308. std::string blob_dir_;
  309. // pointer to directory
  310. std::unique_ptr<Directory> dir_ent_;
  311. // Read-write mutex, which protects all the data structures.
  312. // HEAVILY TRAFFICKED
  313. mutable port::RWMutex mutex_;
  314. // Writers have to hold write_mutex_ before writing.
  315. mutable port::Mutex write_mutex_;
  316. // counter for blob file number
  317. std::atomic<uint64_t> next_file_number_;
  318. // In-memory metadata of all the blob files.
      // NOTE(review): std::map is used here and below, but <map> is not among
      // this header's direct includes; presumably it arrives transitively.
  319. std::map<uint64_t, std::shared_ptr<BlobFile>> blob_files_;
  320. // All live immutable non-TTL blob files.
  321. std::map<uint64_t, std::shared_ptr<BlobFile>> live_imm_non_ttl_blob_files_;
  322. // The largest sequence number that has been flushed.
  323. SequenceNumber flush_sequence_;
  324. // opened non-TTL blob file.
  325. std::shared_ptr<BlobFile> open_non_ttl_file_;
  326. // All the blob files which are currently being appended to, based
  327. // on the variety of incoming TTLs.
  328. std::set<std::shared_ptr<BlobFile>, BlobFileComparatorTTL> open_ttl_files_;
  329. // Flag to check whether Close() has been called on this DB
  330. bool closed_;
  331. // timer based queue to execute tasks
  332. TimerQueue tqueue_;
  333. // Number of files opened for random access/GET;
  334. // the counter is used to monitor and close excess random access files.
  335. std::atomic<uint32_t> open_file_count_;
  336. // Total size of all live blob files (i.e. exclude obsolete files).
  337. std::atomic<uint64_t> total_blob_size_;
  338. // total size of SST files.
  339. std::atomic<uint64_t> live_sst_size_;
  340. // Latest FIFO eviction timestamp
      // NOTE(review): the member is named *_seq_ but described as a timestamp;
      // confirm which one it actually holds.
  341. //
  342. // REQUIRES: access with mutex_ lock held.
  343. uint64_t fifo_eviction_seq_;
  344. // The expiration up to which latest FIFO eviction evicts.
  345. //
  346. // REQUIRES: access with mutex_ lock held.
  347. uint64_t evict_expiration_up_to_;
      // Presumably the obsolete file list that ObsoleteBlobFile moves files
      // onto - confirm against the definition.
  348. std::list<std::shared_ptr<BlobFile>> obsolete_files_;
  349. // DeleteObsoleteFiles, DisableFileDeletions and EnableFileDeletions block
  350. // on the mutex to avoid contention.
  351. //
  352. // While DeleteObsoleteFiles holds both mutex_ and delete_file_mutex_, note
  353. // the difference: mutex_ only needs to be held when accessing the
  354. // data structures, and delete_file_mutex_ needs to be held the whole time
  355. // during DeleteObsoleteFiles to avoid being run simultaneously with
  356. // DisableFileDeletions.
  357. //
  358. // If both mutex_ and delete_file_mutex_ need to be held, it is advised
  359. // to acquire delete_file_mutex_ first to avoid deadlock.
  360. mutable port::Mutex delete_file_mutex_;
  361. // Each call of DisableFileDeletions will increase disable_file_deletions_
  362. // by 1. EnableFileDeletions will either decrease the count by 1 or reset
  363. // it to zero, depending on the force flag.
  364. //
  365. // REQUIRES: access with delete_file_mutex_ held.
  366. int disable_file_deletions_ = 0;
  367. uint32_t debug_level_;
  368. };
  369. } // namespace blob_db
  370. } // namespace ROCKSDB_NAMESPACE
  371. #endif // ROCKSDB_LITE