compaction_job.h 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  9. #pragma once
  10. #include <atomic>
  11. #include <deque>
  12. #include <functional>
  13. #include <limits>
  14. #include <set>
  15. #include <string>
  16. #include <utility>
  17. #include <vector>
  18. #include "db/column_family.h"
  19. #include "db/compaction/compaction_iterator.h"
  20. #include "db/dbformat.h"
  21. #include "db/flush_scheduler.h"
  22. #include "db/internal_stats.h"
  23. #include "db/job_context.h"
  24. #include "db/log_writer.h"
  25. #include "db/memtable_list.h"
  26. #include "db/range_del_aggregator.h"
  27. #include "db/version_edit.h"
  28. #include "db/write_controller.h"
  29. #include "db/write_thread.h"
  30. #include "logging/event_logger.h"
  31. #include "options/cf_options.h"
  32. #include "options/db_options.h"
  33. #include "port/port.h"
  34. #include "rocksdb/compaction_filter.h"
  35. #include "rocksdb/compaction_job_stats.h"
  36. #include "rocksdb/db.h"
  37. #include "rocksdb/env.h"
  38. #include "rocksdb/memtablerep.h"
  39. #include "rocksdb/transaction_log.h"
  40. #include "table/scoped_arena_iterator.h"
  41. #include "util/autovector.h"
  42. #include "util/stop_watch.h"
  43. #include "util/thread_local.h"
  44. namespace ROCKSDB_NAMESPACE {
  // Forward declarations. In the CompactionJob declaration in this header,
  // ErrorHandler, SnapshotChecker and VersionSet appear only as pointer types;
  // the other names declared here are not referenced by that declaration.
  45. class Arena;
  46. class ErrorHandler;
  47. class MemTable;
  48. class SnapshotChecker;
  49. class TableCache;
  50. class Version;
  51. class VersionEdit;
  52. class VersionSet;
  53. // CompactionJob is responsible for executing the compaction. Each (manual or
  54. // automated) compaction corresponds to a CompactionJob object, and usually
  55. // goes through the stages of `Prepare()`->`Run()`->`Install()`. CompactionJob
  56. // will divide the compaction into subcompactions and execute them in parallel
  57. // if needed.
  //
  // Each stage method documents whether "the mutex" must be held by the caller.
  // NOTE(review): presumably that is the InstrumentedMutex passed to the
  // constructor as `db_mutex` -- confirm against the implementation.
  58. class CompactionJob {
  59. public:
  // Creates a job for `compaction`, identified by `job_id`.
  // `existing_snapshots` is taken by value; `manual_compaction_paused` is
  // optional and defaults to nullptr. Most parameters have a same-named data
  // member (with a trailing underscore) declared further down; the constructor
  // body is not part of this header, so how each argument is stored is not
  // shown here.
  60. CompactionJob(int job_id, Compaction* compaction,
  61. const ImmutableDBOptions& db_options,
  62. const FileOptions& file_options, VersionSet* versions,
  63. const std::atomic<bool>* shutting_down,
  64. const SequenceNumber preserve_deletes_seqnum,
  65. LogBuffer* log_buffer, Directory* db_directory,
  66. Directory* output_directory, Statistics* stats,
  67. InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler,
  68. std::vector<SequenceNumber> existing_snapshots,
  69. SequenceNumber earliest_write_conflict_snapshot,
  70. const SnapshotChecker* snapshot_checker,
  71. std::shared_ptr<Cache> table_cache, EventLogger* event_logger,
  72. bool paranoid_file_checks, bool measure_io_stats,
  73. const std::string& dbname,
  74. CompactionJobStats* compaction_job_stats,
  75. Env::Priority thread_pri,
  76. const std::atomic<bool>* manual_compaction_paused = nullptr);
  77. ~CompactionJob();
  78. // no copy/move
  // Copy construction, move construction and copy assignment are deleted
  // explicitly. No move assignment is declared; because a copy assignment
  // operator is user-declared, the compiler does not generate one either.
  79. CompactionJob(CompactionJob&& job) = delete;
  80. CompactionJob(const CompactionJob& job) = delete;
  81. CompactionJob& operator=(const CompactionJob& job) = delete;
  82. // REQUIRED: mutex held
  83. // Prepare for the compaction by setting up boundaries for each subcompaction
  84. void Prepare();
  85. // REQUIRED: mutex not held
  86. // Launch threads for each subcompaction and wait for them to finish. After
  87. // that, verify table is usable and finally do bookkeeping to unify
  88. // subcompaction results
  // Returns a Status describing the outcome.
  89. Status Run();
  90. // REQUIRED: mutex held
  91. // Add compaction input/output to the current version
  // Returns a Status describing the outcome.
  92. Status Install(const MutableCFOptions& mutable_cf_options);
  93. private:
  // Nested type that is only declared in this header; its definition lives
  // elsewhere. Several helpers below take a pointer to it.
  94. struct SubcompactionState;
  // Helpers without a comment of their own are declared here only; see their
  // definitions for the exact behavior.
  95. void AggregateStatistics();
  96. // Generates a histogram representing potential divisions of key ranges from
  97. // the input. It adds the starting and/or ending keys of certain input files
  98. // to the working set and then finds the approximate size of data in between
  99. // each consecutive pair of slices. Then it divides these ranges into
  100. // consecutive groups such that each group has a similar size.
  101. void GenSubcompactionBoundaries();
  102. // update the thread status for starting a compaction.
  103. void ReportStartedCompaction(Compaction* compaction);
  104. void AllocateCompactionOutputFileNumbers();
  105. // Call compaction filter. Then iterate through input and compact the
  106. // kv-pairs
  107. void ProcessKeyValueCompaction(SubcompactionState* sub_compact);
  // `next_table_min_key` is optional and defaults to nullptr.
  108. Status FinishCompactionOutputFile(
  109. const Status& input_status, SubcompactionState* sub_compact,
  110. CompactionRangeDelAggregator* range_del_agg,
  111. CompactionIterationStats* range_del_out_stats,
  112. const Slice* next_table_min_key = nullptr);
  113. Status InstallCompactionResults(const MutableCFOptions& mutable_cf_options);
  114. void RecordCompactionIOStats();
  115. Status OpenCompactionOutputFile(SubcompactionState* sub_compact);
  116. void CleanupCompaction();
  // Const member function taking the stats to read from by const reference.
  117. void UpdateCompactionJobStats(
  118. const InternalStats::CompactionStats& stats) const;
  // `compaction_job_stats` is optional and defaults to nullptr.
  119. void RecordDroppedKeys(const CompactionIterationStats& c_iter_stats,
  120. CompactionJobStats* compaction_job_stats = nullptr);
  121. void UpdateCompactionStats();
  // `num_files` and `bytes_read` are pointer parameters.
  // NOTE(review): presumably outputs accumulated for `input_level` -- confirm
  // in the definition.
  122. void UpdateCompactionInputStatsHelper(
  123. int* num_files, uint64_t* bytes_read, int input_level);
  124. void LogCompaction();
  125. int job_id_;
  126. // CompactionJob state
  // CompactionState is only declared in this header; `compact_` is a raw
  // pointer to it.
  // NOTE(review): ownership of `compact_` is not visible here -- confirm in
  // the constructor/destructor.
  127. struct CompactionState;
  128. CompactionState* compact_;
  129. CompactionJobStats* compaction_job_stats_;
  130. InternalStats::CompactionStats compaction_stats_;
  131. // DBImpl state
  // `dbname_` and `db_options_` are reference members, so the objects they
  // refer to must remain valid for as long as this job uses them.
  // `file_options_` is a const value member, i.e. this object holds its own
  // FileOptions instance.
  132. const std::string& dbname_;
  133. const ImmutableDBOptions& db_options_;
  134. const FileOptions file_options_;
  135. Env* env_;
  136. FileSystem* fs_;
  137. // env_option optimized for compaction table reads
  138. FileOptions file_options_for_read_;
  139. VersionSet* versions_;
  140. const std::atomic<bool>* shutting_down_;
  141. const std::atomic<bool>* manual_compaction_paused_;
  142. const SequenceNumber preserve_deletes_seqnum_;
  143. LogBuffer* log_buffer_;
  144. Directory* db_directory_;
  145. Directory* output_directory_;
  146. Statistics* stats_;
  147. InstrumentedMutex* db_mutex_;
  148. ErrorHandler* db_error_handler_;
  149. // If there were two snapshots with seq numbers s1 and
  150. // s2 and s1 < s2, and if we find two instances of a key k1 that lies
  151. // entirely within s1 and s2, then the earlier version of k1 can be safely
  152. // deleted because that version is not visible in any snapshot.
  153. std::vector<SequenceNumber> existing_snapshots_;
  154. // This is the earliest snapshot that could be used for write-conflict
  155. // checking by a transaction. For any user-key newer than this snapshot, we
  156. // should make sure not to remove evidence that a write occurred.
  157. SequenceNumber earliest_write_conflict_snapshot_;
  158. const SnapshotChecker* const snapshot_checker_;
  159. std::shared_ptr<Cache> table_cache_;
  160. EventLogger* event_logger_;
  161. // Is this compaction creating a file in the bottom most level?
  162. bool bottommost_level_;
  163. bool paranoid_file_checks_;
  164. bool measure_io_stats_;
  165. // Stores the Slices that designate the boundaries for each subcompaction
  166. std::vector<Slice> boundaries_;
  167. // Stores the approx size of keys covered in the range of each subcompaction
  168. std::vector<uint64_t> sizes_;
  169. Env::WriteLifeTimeHint write_hint_;
  170. Env::Priority thread_pri_;
  171. };
  172. } // namespace ROCKSDB_NAMESPACE