blob_db_impl.h 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #pragma once
  6. #include <atomic>
  7. #include <condition_variable>
  8. #include <limits>
  9. #include <list>
  10. #include <memory>
  11. #include <set>
  12. #include <string>
  13. #include <thread>
  14. #include <unordered_map>
  15. #include <utility>
  16. #include <vector>
  17. #include "db/blob/blob_log_format.h"
  18. #include "db/blob/blob_log_writer.h"
  19. #include "db/db_iter.h"
  20. #include "rocksdb/compaction_filter.h"
  21. #include "rocksdb/db.h"
  22. #include "rocksdb/file_system.h"
  23. #include "rocksdb/listener.h"
  24. #include "rocksdb/options.h"
  25. #include "rocksdb/statistics.h"
  26. #include "rocksdb/wal_filter.h"
  27. #include "util/mutexlock.h"
  28. #include "util/timer_queue.h"
  29. #include "utilities/blob_db/blob_db.h"
  30. #include "utilities/blob_db/blob_file.h"
  31. namespace ROCKSDB_NAMESPACE {
  32. class DBImpl;
  33. class ColumnFamilyHandle;
  34. class ColumnFamilyData;
  35. class SystemClock;
  36. struct FlushJobInfo;
  37. namespace blob_db {
  38. struct BlobCompactionContext;
  39. struct BlobCompactionContextGC;
  40. class BlobDBImpl;
  41. class BlobFile;
  42. // Comparator used to sort TTL-aware blob files by the lower bound of their
  43. // TTL range. The comparison itself is defined out of line.
  44. struct BlobFileComparatorTTL {
  45. bool operator()(const std::shared_ptr<BlobFile>& lhs,
  46. const std::shared_ptr<BlobFile>& rhs) const;
  47. };
  48. struct BlobFileComparator {  // Binary predicate over two blob files; its ordering criterion is defined out of line.
  49. bool operator()(const std::shared_ptr<BlobFile>& lhs,
  50. const std::shared_ptr<BlobFile>& rhs) const;
  51. };
  52. /**
  53. * The implementation class for BlobDB. It manages the blob logs, which
  54. * are sequentially written files. Blob logs can be of the TTL or non-TTL
  55. * varieties; the former are cleaned up when they expire, while the latter
  56. * are (optionally) garbage collected.
  57. */
  58. class BlobDBImpl : public BlobDB {
  59. friend class BlobFile;
  60. friend class BlobDBIterator;
  61. friend class BlobDBListener;
  62. friend class BlobDBListenerGC;
  63. friend class BlobIndexCompactionFilterBase;
  64. friend class BlobIndexCompactionFilterGC;
  65. public:
  66. // period of the deletion check task, in milliseconds
  67. static constexpr uint32_t kDeleteCheckPeriodMillisecs = 2 * 1000;
  68. // period of the sanity check task, in milliseconds
  69. static constexpr uint32_t kSanityCheckPeriodMillisecs = 20 * 60 * 1000;
  70. // how many random access open files can we tolerate
  71. static constexpr uint32_t kOpenFilesTrigger = 100;
  72. // how often to schedule reclaiming of open files, in milliseconds
  73. static constexpr uint32_t kReclaimOpenFilesPeriodMillisecs = 1 * 1000;
  74. // how often to schedule deletion of obsolete files, in milliseconds
  75. static constexpr uint32_t kDeleteObsoleteFilesPeriodMillisecs = 10 * 1000;
  76. // how often to schedule eviction of expired files, in milliseconds
  77. static constexpr uint32_t kEvictExpiredFilesPeriodMillisecs = 10 * 1000;
  78. // when should oldest file be evicted:
  79. // on reaching 90% of blob_dir_size
  80. static constexpr double kEvictOldestFileAtSize = 0.9;
  81. using BlobDB::Put;
  82. Status Put(const WriteOptions& options, const Slice& key,
  83. const Slice& value) override;
  84. using BlobDB::Get;
  85. Status Get(const ReadOptions& _read_options,
  86. ColumnFamilyHandle* column_family, const Slice& key,
  87. PinnableSlice* value, std::string* timestamp) override;
  88. Status Get(const ReadOptions& _read_options,
  89. ColumnFamilyHandle* column_family, const Slice& key,
  90. PinnableSlice* value, uint64_t* expiration) override;
  91. using BlobDB::NewIterator;
  92. Iterator* NewIterator(const ReadOptions& read_options) override;
  93. using BlobDB::NewIterators;
  94. Status NewIterators(
  95. const ReadOptions& /*read_options*/,
  96. const std::vector<ColumnFamilyHandle*>& /*column_families*/,
  97. std::vector<Iterator*>* /*iterators*/) override {
  98. return Status::NotSupported("Not implemented");
  99. }
  100. using BlobDB::MultiGet;
  101. void MultiGet(const ReadOptions& _read_options, size_t num_keys,
  102. ColumnFamilyHandle** column_families, const Slice* keys,
  103. PinnableSlice* values, std::string* timestamps,
  104. Status* statuses, const bool sorted_input) override;
  105. using BlobDB::Write;
  106. Status Write(const WriteOptions& opts, WriteBatch* updates) override;
  107. Status Close() override;
  108. using BlobDB::PutWithTTL;
  109. Status PutWithTTL(const WriteOptions& options, const Slice& key,
  110. const Slice& value, uint64_t ttl) override;
  111. using BlobDB::PutUntil;
  112. Status PutUntil(const WriteOptions& options, const Slice& key,
  113. const Slice& value, uint64_t expiration) override;
  114. using BlobDB::CompactFiles;
  115. Status CompactFiles(
  116. const CompactionOptions& compact_options,
  117. const std::vector<std::string>& input_file_names, const int output_level,
  118. const int output_path_id = -1,
  119. std::vector<std::string>* const output_file_names = nullptr,
  120. CompactionJobInfo* compaction_job_info = nullptr) override;
  121. BlobDBOptions GetBlobDBOptions() const override;
  122. BlobDBImpl(const std::string& dbname, const BlobDBOptions& bdb_options,
  123. const DBOptions& db_options,
  124. const ColumnFamilyOptions& cf_options);
  125. Status DisableFileDeletions() override;
  126. Status EnableFileDeletions() override;
  127. Status GetLiveFiles(std::vector<std::string>&, uint64_t* manifest_file_size,
  128. bool flush_memtable = true) override;
  129. void GetLiveFilesMetaData(std::vector<LiveFileMetaData>*) override;
  130. Status GetLiveFilesStorageInfo(
  131. const LiveFilesStorageInfoOptions& opts,
  132. std::vector<LiveFileStorageInfo>* files) override;
  133. ~BlobDBImpl();
  134. Status Open(std::vector<ColumnFamilyHandle*>* handles);
  135. Status SyncBlobFiles(const WriteOptions& write_options) override;
  136. // Common part of the two GetCompactionContext methods below.
  137. // REQUIRES: read lock on mutex_
  138. void GetCompactionContextCommon(BlobCompactionContext* context);
  139. void GetCompactionContext(BlobCompactionContext* context);
  140. void GetCompactionContext(BlobCompactionContext* context,
  141. BlobCompactionContextGC* context_gc);
  142. #ifndef NDEBUG
  143. Status TEST_GetBlobValue(const Slice& key, const Slice& index_entry,
  144. PinnableSlice* value);
  145. void TEST_AddDummyBlobFile(uint64_t blob_file_number,
  146. SequenceNumber immutable_sequence);
  147. std::vector<std::shared_ptr<BlobFile>> TEST_GetBlobFiles() const;
  148. std::vector<std::shared_ptr<BlobFile>> TEST_GetLiveImmNonTTLFiles() const;
  149. std::vector<std::shared_ptr<BlobFile>> TEST_GetObsoleteFiles() const;
  150. Status TEST_CloseBlobFile(std::shared_ptr<BlobFile>& bfile);
  151. void TEST_ObsoleteBlobFile(std::shared_ptr<BlobFile>& blob_file,
  152. SequenceNumber obsolete_seq = 0,
  153. bool update_size = true);
  154. void TEST_EvictExpiredFiles();
  155. void TEST_DeleteObsoleteFiles();
  156. uint64_t TEST_live_sst_size();
  157. const std::string& TEST_blob_dir() const { return blob_dir_; }
  158. void TEST_InitializeBlobFileToSstMapping(
  159. const std::vector<LiveFileMetaData>& live_files);
  160. void TEST_ProcessFlushJobInfo(const FlushJobInfo& info);
  161. void TEST_ProcessCompactionJobInfo(const CompactionJobInfo& info);
  162. #endif // !NDEBUG
  163. private:
  164. class BlobInserter;
  165. // Create a snapshot if there isn't one in read options.
  166. // Return true if a snapshot is created.
  167. bool SetSnapshotIfNeeded(ReadOptions* read_options);
  168. Status GetImpl(const ReadOptions& read_options,
  169. ColumnFamilyHandle* column_family, const Slice& key,
  170. PinnableSlice* value, uint64_t* expiration = nullptr);
  171. Status GetBlobValue(const Slice& key, const Slice& index_entry,
  172. PinnableSlice* value, uint64_t* expiration = nullptr);
  173. Status GetRawBlobFromFile(const Slice& key, uint64_t file_number,
  174. uint64_t offset, uint64_t size,
  175. PinnableSlice* value,
  176. CompressionType* compression_type);
  177. Slice GetCompressedSlice(const Slice& raw,
  178. std::string* compression_output) const;
  179. Status DecompressSlice(const Slice& compressed_value,
  180. CompressionType compression_type,
  181. PinnableSlice* value_output) const;
  182. // Close a file by appending a footer, and remove it from the open files list.
  183. // REQUIRES: lock held on write_mutex_, write lock held on both the db mutex_
  184. // and the blob file's mutex_. If called on a blob file which is visible only
  185. // to a single thread (like in the case of new files written during
  186. // compaction/GC), the locks on write_mutex_ and the blob file's mutex_ can be
  187. // avoided.
  188. Status CloseBlobFile(const WriteOptions& write_options,
  189. std::shared_ptr<BlobFile> bfile);
  190. // Close a file if its size exceeds blob_file_size
  191. // REQUIRES: lock held on write_mutex_.
  192. Status CloseBlobFileIfNeeded(const WriteOptions& write_options,
  193. std::shared_ptr<BlobFile>& bfile);
  194. // Mark file as obsolete and move the file to obsolete file list.
  195. //
  196. // REQUIRES: hold write lock of mutex_ or during DB open.
  197. void ObsoleteBlobFile(std::shared_ptr<BlobFile> blob_file,
  198. SequenceNumber obsolete_seq, bool update_size);
  199. Status PutBlobValue(const WriteOptions& options, const Slice& key,
  200. const Slice& value, uint64_t expiration,
  201. WriteBatch* batch);
  202. Status AppendBlob(const WriteOptions& write_options,
  203. const std::shared_ptr<BlobFile>& bfile,
  204. const std::string& headerbuf, const Slice& key,
  205. const Slice& value, uint64_t expiration,
  206. std::string* index_entry);
  207. // Create a new blob file and associated writer.
  208. Status CreateBlobFileAndWriter(const WriteOptions& write_options,
  209. bool has_ttl,
  210. const ExpirationRange& expiration_range,
  211. const std::string& reason,
  212. std::shared_ptr<BlobFile>* blob_file,
  213. std::shared_ptr<BlobLogWriter>* writer);
  214. // Get the open non-TTL blob log file, or create a new one if no such file
  215. // exists.
  216. Status SelectBlobFile(const WriteOptions& write_options,
  217. std::shared_ptr<BlobFile>* blob_file);
  218. // Get the open TTL blob log file for a certain expiration, or create a new
  219. // one if no such file exists.
  220. Status SelectBlobFileTTL(const WriteOptions& write_options,
  221. uint64_t expiration,
  222. std::shared_ptr<BlobFile>* blob_file);
  223. std::shared_ptr<BlobFile> FindBlobFileLocked(uint64_t expiration) const;
  224. // Periodic sanity check; runs a bunch of checks.
  225. std::pair<bool, int64_t> SanityCheck(bool aborted);
  226. // Delete files that have been marked obsolete (either because of TTL
  227. // or GC). Check whether any snapshots exist which refer to the same.
  228. std::pair<bool, int64_t> DeleteObsoleteFiles(bool aborted);
  229. // Periodically check whether open blob files' TTLs have expired;
  230. // if expired, close the sequential writer and make the file immutable.
  231. std::pair<bool, int64_t> EvictExpiredFiles(bool aborted);
  232. // If the number of open files approaches the ulimit, this
  233. // task will close random readers, which are kept around for
  234. // efficiency.
  235. std::pair<bool, int64_t> ReclaimOpenFiles(bool aborted);
  236. std::pair<bool, int64_t> RemoveTimerQ(TimerQueue* tq, bool aborted);
  237. // Adds the background tasks to the timer queue
  238. void StartBackgroundTasks();
  239. // add a new Blob File
  240. std::shared_ptr<BlobFile> NewBlobFile(bool has_ttl,
  241. const ExpirationRange& expiration_range,
  242. const std::string& reason);
  243. // Register a new blob file.
  244. // REQUIRES: write lock on mutex_.
  245. void RegisterBlobFile(std::shared_ptr<BlobFile> blob_file);
  246. // collect all the blob log files from the blob directory
  247. Status GetAllBlobFiles(std::set<uint64_t>* file_numbers);
  248. // Open all blob files found in blob_dir.
  249. Status OpenAllBlobFiles();
  250. // Link an SST to a blob file. Comes in locking and non-locking varieties
  251. // (the latter is used during Open).
  252. template <typename Linker>
  253. void LinkSstToBlobFileImpl(uint64_t sst_file_number,
  254. uint64_t blob_file_number, Linker linker);
  255. void LinkSstToBlobFile(uint64_t sst_file_number, uint64_t blob_file_number);
  256. void LinkSstToBlobFileNoLock(uint64_t sst_file_number,
  257. uint64_t blob_file_number);
  258. // Unlink an SST from a blob file.
  259. void UnlinkSstFromBlobFile(uint64_t sst_file_number,
  260. uint64_t blob_file_number);
  261. // Initialize the mapping between blob files and SSTs during Open.
  262. void InitializeBlobFileToSstMapping(
  263. const std::vector<LiveFileMetaData>& live_files);
  264. // Update the mapping between blob files and SSTs after a flush and mark
  265. // any unneeded blob files obsolete.
  266. void ProcessFlushJobInfo(const FlushJobInfo& info);
  267. // Update the mapping between blob files and SSTs after a compaction and
  268. // mark any unneeded blob files obsolete.
  269. void ProcessCompactionJobInfo(const CompactionJobInfo& info);
  270. // Mark an immutable non-TTL blob file obsolete assuming it has no more SSTs
  271. // linked to it, and all memtables from before the blob file became immutable
  272. // have been flushed. Note: should only be called if the condition holds for
  273. // all lower-numbered non-TTL blob files as well.
  274. bool MarkBlobFileObsoleteIfNeeded(const std::shared_ptr<BlobFile>& blob_file,
  275. SequenceNumber obsolete_seq);
  276. // Mark all immutable non-TTL blob files that aren't needed by any SSTs as
  277. // obsolete. Comes in two varieties; the version used during Open need not
  278. // worry about locking or snapshots.
  279. template <class Functor>
  280. void MarkUnreferencedBlobFilesObsoleteImpl(Functor mark_if_needed);
  281. void MarkUnreferencedBlobFilesObsolete();
  282. void MarkUnreferencedBlobFilesObsoleteDuringOpen();
  283. void UpdateLiveSSTSize(const WriteOptions& write_options);
  284. Status GetBlobFileReader(const std::shared_ptr<BlobFile>& blob_file,
  285. std::shared_ptr<RandomAccessFileReader>* reader);
  286. // hold write mutex on file and call.
  287. // Close the above Random Access reader
  288. void CloseRandomAccessLocked(const std::shared_ptr<BlobFile>& bfile);
  289. // hold write mutex on file and call
  290. // creates a sequential (append) writer for this blobfile
  291. Status CreateWriterLocked(const std::shared_ptr<BlobFile>& bfile);
  292. // returns a BlobLogWriter object for the file. If writer is not
  293. // already present, creates one. Needs Write Mutex to be held
  294. Status CheckOrCreateWriterLocked(const std::shared_ptr<BlobFile>& blob_file,
  295. std::shared_ptr<BlobLogWriter>* writer);
  296. // checks if there is no snapshot which is referencing the
  297. // blobs
  298. bool VisibleToActiveSnapshot(const std::shared_ptr<BlobFile>& file);
  299. bool FileDeleteOk_SnapshotCheckLocked(const std::shared_ptr<BlobFile>& bfile);
  300. void CopyBlobFiles(std::vector<std::shared_ptr<BlobFile>>* bfiles_copy);
  301. uint64_t EpochNow() { return clock_->NowMicros() / 1000000; }  // clock microseconds converted to whole seconds
  302. // Check if inserting a new blob will make DB grow out of space.
  303. // If is_fifo = true, FIFO eviction will be triggered to make room for the
  304. // new blob. If force_evict = true, FIFO eviction will evict blob files
  305. // even if eviction will not make enough room for the new blob. NOTE(review): is_fifo is not a parameter here; presumably an option - confirm.
  306. Status CheckSizeAndEvictBlobFiles(const WriteOptions& write_options,
  307. uint64_t blob_size,
  308. bool force_evict = false);
  309. Status CloseImpl();
  310. // name of the database directory
  311. std::string dbname_;
  312. // the base DB
  313. DBImpl* db_impl_;
  314. Env* env_;
  315. SystemClock* clock_;
  316. // the options that govern the behavior of Blob Storage
  317. BlobDBOptions bdb_options_;
  318. DBOptions db_options_;
  319. ColumnFamilyOptions cf_options_;
  320. FileOptions file_options_;
  321. // Raw pointer of statistic. db_options_ has a std::shared_ptr to hold
  322. // ownership.
  323. Statistics* statistics_;
  324. // by default this is "blob_dir" under dbname_
  325. // but can be configured
  326. std::string blob_dir_;
  327. // pointer to directory
  328. std::unique_ptr<FSDirectory> dir_ent_;
  329. // Read Write Mutex, which protects all the data structures
  330. // HEAVILY TRAFFICKED
  331. mutable port::RWMutex mutex_;
  332. // Writers have to hold write_mutex_ before writing.
  333. mutable port::Mutex write_mutex_;
  334. // counter for blob file number
  335. std::atomic<uint64_t> next_file_number_;
  336. // in-memory metadata of all the blob files, keyed by a uint64_t (presumably the blob file number - confirm)
  337. std::map<uint64_t, std::shared_ptr<BlobFile>> blob_files_;
  338. // All live immutable non-TTL blob files.
  339. std::map<uint64_t, std::shared_ptr<BlobFile>> live_imm_non_ttl_blob_files_;
  340. // The largest sequence number that has been flushed.
  341. SequenceNumber flush_sequence_;
  342. // opened non-TTL blob file.
  343. std::shared_ptr<BlobFile> open_non_ttl_file_;
  344. // all the blob files which are currently being appended to based
  345. // on variety of incoming TTL's
  346. std::set<std::shared_ptr<BlobFile>, BlobFileComparatorTTL> open_ttl_files_;
  347. // Flag to check whether Close() has been called on this DB
  348. bool closed_;
  349. // timer based queue to execute tasks
  350. TimerQueue tqueue_;
  351. // number of files opened for random access/GET
  352. // counter is used to monitor and close excess RA files.
  353. std::atomic<uint32_t> open_file_count_;
  354. // Total size of all live blob files (i.e. exclude obsolete files).
  355. std::atomic<uint64_t> total_blob_size_;
  356. // total size of SST files.
  357. std::atomic<uint64_t> live_sst_size_;
  358. // Latest FIFO eviction timestamp
  359. //
  360. // REQUIRES: access with mutex_ lock held.
  361. uint64_t fifo_eviction_seq_;
  362. // The expiration up to which latest FIFO eviction evicts.
  363. //
  364. // REQUIRES: access with mutex_ lock held.
  365. uint64_t evict_expiration_up_to_;
  366. std::list<std::shared_ptr<BlobFile>> obsolete_files_;
  367. // DeleteObsoleteFiles, DisableFileDeletions and EnableFileDeletions block
  368. // on the mutex to avoid contention.
  369. //
  370. // While DeleteObsoleteFiles holds both mutex_ and delete_file_mutex_, note
  371. // the difference. mutex_ only needs to be held when accessing the
  372. // data-structure, and delete_file_mutex_ needs to be held the whole time
  373. // during DeleteObsoleteFiles to avoid being run simultaneously with
  374. // DisableFileDeletions.
  375. //
  376. // If both mutex_ and delete_file_mutex_ need to be held, it is advised
  377. // to hold delete_file_mutex_ first to avoid deadlock.
  378. mutable port::Mutex delete_file_mutex_;
  379. // Each call of DisableFileDeletions will increase disable_file_deletions_
  380. // by 1. EnableFileDeletions will either decrease the count by 1 or reset
  381. // it to zero, depending on the force flag. NOTE(review): EnableFileDeletions() as declared in this class takes no force flag - confirm.
  382. //
  383. // REQUIRES: access with delete_file_mutex_ held.
  384. int disable_file_deletions_ = 0;
  385. uint32_t debug_level_;
  386. };
  387. Decompressor& BlobDecompressor();  // Returns a reference to a Decompressor; defined out of line.
  388. } // namespace blob_db
  389. } // namespace ROCKSDB_NAMESPACE