// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once

#include <atomic>
#include <cstdint>
#include <deque>
#include <limits>
#include <list>
#include <memory>
#include <set>
#include <string>
#include <utility>
#include <vector>

#include "db/blob/blob_file_completion_callback.h"
#include "db/column_family.h"
#include "db/flush_scheduler.h"
#include "db/internal_stats.h"
#include "db/job_context.h"
#include "db/log_writer.h"
#include "db/logs_with_prep_tracker.h"
#include "db/memtable_list.h"
#include "db/seqno_to_time_mapping.h"
#include "db/snapshot_impl.h"
#include "db/version_edit.h"
#include "db/write_controller.h"
#include "db/write_thread.h"
#include "logging/event_logger.h"
#include "monitoring/instrumented_mutex.h"
#include "options/db_options.h"
#include "port/port.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/listener.h"
#include "rocksdb/memtablerep.h"
#include "rocksdb/transaction_log.h"
#include "util/autovector.h"
#include "util/stop_watch.h"
#include "util/thread_local.h"
  43. namespace ROCKSDB_NAMESPACE {
  44. class DBImpl;
  45. class MemTable;
  46. class SnapshotChecker;
  47. class TableCache;
  48. class Version;
  49. class VersionEdit;
  50. class VersionSet;
  51. class Arena;
  52. class FlushJob {
  53. public:
  54. // TODO(icanadi) make effort to reduce number of parameters here
  55. // IMPORTANT: mutable_cf_options needs to be alive while FlushJob is alive
  56. FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
  57. const ImmutableDBOptions& db_options,
  58. const MutableCFOptions& mutable_cf_options, uint64_t max_memtable_id,
  59. const FileOptions& file_options, VersionSet* versions,
  60. InstrumentedMutex* db_mutex, std::atomic<bool>* shutting_down,
  61. JobContext* job_context, FlushReason flush_reason,
  62. LogBuffer* log_buffer, FSDirectory* db_directory,
  63. FSDirectory* output_file_directory,
  64. CompressionType output_compression, Statistics* stats,
  65. EventLogger* event_logger, bool measure_io_stats,
  66. const bool sync_output_directory, const bool write_manifest,
  67. Env::Priority thread_pri, const std::shared_ptr<IOTracer>& io_tracer,
  68. std::shared_ptr<const SeqnoToTimeMapping> seqno_to_time_mapping,
  69. const std::string& db_id = "", const std::string& db_session_id = "",
  70. std::string full_history_ts_low = "",
  71. BlobFileCompletionCallback* blob_callback = nullptr);
  72. ~FlushJob();
  73. // Require db_mutex held.
  74. // Once PickMemTable() is called, either Run() or Cancel() has to be called.
  75. void PickMemTable();
  76. // @param skip_since_bg_error If not nullptr and if atomic_flush=false,
  77. // then it is set to true if flush installation is skipped and memtable
  78. // is rolled back due to existing background error.
  79. Status Run(LogsWithPrepTracker* prep_tracker = nullptr,
  80. FileMetaData* file_meta = nullptr,
  81. bool* switched_to_mempurge = nullptr,
  82. bool* skipped_since_bg_error = nullptr,
  83. ErrorHandler* error_handler = nullptr);
  84. void Cancel();
  85. const autovector<ReadOnlyMemTable*>& GetMemTables() const { return mems_; }
  86. std::list<std::unique_ptr<FlushJobInfo>>* GetCommittedFlushJobsInfo() {
  87. return &committed_flush_jobs_info_;
  88. }
  89. private:
  90. friend class FlushJobTest_GetRateLimiterPriorityForWrite_Test;
  91. void ReportStartedFlush();
  92. static void ReportFlushInputSize(const autovector<ReadOnlyMemTable*>& mems);
  93. void RecordFlushIOStats();
  94. Status WriteLevel0Table();
  95. // Memtable Garbage Collection algorithm: a MemPurge takes the list
  96. // of immutable memtables and filters out (or "purge") the outdated bytes
  97. // out of it. The output (the filtered bytes, or "useful payload") is
  98. // then transfered into a new memtable. If this memtable is filled, then
  99. // the mempurge is aborted and rerouted to a regular flush process. Else,
  100. // depending on the heuristics, placed onto the immutable memtable list.
  101. // The addition to the imm list will not trigger a flush operation. The
  102. // flush of the imm list will instead be triggered once the mutable memtable
  103. // is added to the imm list.
  104. // This process is typically intended for workloads with heavy overwrites
  105. // when we want to avoid SSD writes (and reads) as much as possible.
  106. // "MemPurge" is an experimental feature still at a very early stage
  107. // of development. At the moment it is only compatible with the Get, Put,
  108. // Delete operations as well as Iterators and CompactionFilters.
  109. // For this early version, "MemPurge" is called by setting the
  110. // options.experimental_mempurge_threshold value as >0.0. When this is
  111. // the case, ALL automatic flush operations (kWRiteBufferManagerFull) will
  112. // first go through the MemPurge process. Therefore, we strongly
  113. // recommend all users not to set this flag as true given that the MemPurge
  114. // process has not matured yet.
  115. Status MemPurge();
  116. bool MemPurgeDecider(double threshold);
  117. // The rate limiter priority (io_priority) is determined dynamically here.
  118. Env::IOPriority GetRateLimiterPriority();
  119. std::unique_ptr<FlushJobInfo> GetFlushJobInfo() const;
  120. // Require db_mutex held.
  121. // Called only when UDT feature is enabled and
  122. // `persist_user_defined_timestamps` flag is false. Because we will refrain
  123. // from flushing as long as there are still UDTs in a memtable that hasn't
  124. // expired w.r.t `full_history_ts_low`. However, flush is continued if there
  125. // is risk of entering write stall mode. In that case, we need
  126. // to track the effective cutoff timestamp below which all the udts are
  127. // removed because of flush, and use it to increase `full_history_ts_low` if
  128. // the effective cutoff timestamp is newer. See
  129. // `MaybeIncreaseFullHistoryTsLowToAboveCutoffUDT` for details.
  130. void GetEffectiveCutoffUDTForPickedMemTables();
  131. // If this column family enables tiering feature, it will find the current
  132. // `preclude_last_level_min_seqno_`, and the smaller one between this and
  133. // the `earliset_snapshot_` will later be announced to user property
  134. // collectors. It indicates to tiering use cases which data are old enough to
  135. // be placed on the last level.
  136. void GetPrecludeLastLevelMinSeqno();
  137. Status MaybeIncreaseFullHistoryTsLowToAboveCutoffUDT();
  138. const std::string& dbname_;
  139. const std::string db_id_;
  140. const std::string db_session_id_;
  141. ColumnFamilyData* cfd_;
  142. const ImmutableDBOptions& db_options_;
  143. const MutableCFOptions& mutable_cf_options_;
  144. // A variable storing the largest memtable id to flush in this
  145. // flush job. RocksDB uses this variable to select the memtables to flush in
  146. // this job. All memtables in this column family with an ID smaller than or
  147. // equal to max_memtable_id_ will be selected for flush.
  148. uint64_t max_memtable_id_;
  149. FileOptions file_options_;
  150. VersionSet* versions_;
  151. InstrumentedMutex* db_mutex_;
  152. std::atomic<bool>* shutting_down_;
  153. SequenceNumber earliest_snapshot_;
  154. JobContext* job_context_;
  155. FlushReason flush_reason_;
  156. LogBuffer* log_buffer_;
  157. FSDirectory* db_directory_;
  158. FSDirectory* output_file_directory_;
  159. CompressionType output_compression_;
  160. Statistics* stats_;
  161. EventLogger* event_logger_;
  162. TableProperties table_properties_;
  163. bool measure_io_stats_;
  164. // True if this flush job should call fsync on the output directory. False
  165. // otherwise.
  166. // Usually sync_output_directory_ is true. A flush job needs to call sync on
  167. // the output directory before committing to the MANIFEST.
  168. // However, an individual flush job does not have to call sync on the output
  169. // directory if it is part of an atomic flush. After all flush jobs in the
  170. // atomic flush succeed, call sync once on each distinct output directory.
  171. const bool sync_output_directory_;
  172. // True if this flush job should write to MANIFEST after successfully
  173. // flushing memtables. False otherwise.
  174. // Usually write_manifest_ is true. A flush job commits to the MANIFEST after
  175. // flushing the memtables.
  176. // However, an individual flush job cannot rashly write to the MANIFEST
  177. // immediately after it finishes the flush if it is part of an atomic flush.
  178. // In this case, only after all flush jobs succeed in flush can RocksDB
  179. // commit to the MANIFEST.
  180. const bool write_manifest_;
  181. // The current flush job can commit flush result of a concurrent flush job.
  182. // We collect FlushJobInfo of all jobs committed by current job and fire
  183. // OnFlushCompleted for them.
  184. std::list<std::unique_ptr<FlushJobInfo>> committed_flush_jobs_info_;
  185. // Variables below are set by PickMemTable():
  186. FileMetaData meta_;
  187. // Memtables to be flushed by this job.
  188. // Ordered by increasing memtable id, i.e., oldest memtable first.
  189. autovector<ReadOnlyMemTable*> mems_;
  190. VersionEdit* edit_;
  191. Version* base_;
  192. bool pick_memtable_called;
  193. Env::Priority thread_pri_;
  194. const std::shared_ptr<IOTracer> io_tracer_;
  195. SystemClock* clock_;
  196. const std::string full_history_ts_low_;
  197. BlobFileCompletionCallback* blob_callback_;
  198. // Shared copy of DB's seqno to time mapping stored in SuperVersion. The
  199. // ownership is shared with this FlushJob when it's created.
  200. // FlushJob accesses and ref counts immutable MemTables directly via
  201. // `MemTableListVersion` instead of ref `SuperVersion`, so we need to give
  202. // the flush job shared ownership of the mapping.
  203. // Note this is only installed when seqno to time recording feature is
  204. // enables, so it could be nullptr.
  205. std::shared_ptr<const SeqnoToTimeMapping> seqno_to_time_mapping_;
  206. // Keeps track of the newest user-defined timestamp for this flush job if
  207. // `persist_user_defined_timestamps` flag is false.
  208. std::string cutoff_udt_;
  209. // The current minimum seqno that compaction jobs will preclude the data from
  210. // the last level. Data with seqnos larger than this or larger than
  211. // `earliest_snapshot_` will be output to the proximal level had it gone
  212. // through a compaction to the last level.
  213. SequenceNumber preclude_last_level_min_seqno_ = kMaxSequenceNumber;
  214. };
  215. } // namespace ROCKSDB_NAMESPACE