subcompaction_state.h 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258
  1. // Copyright (c) Meta Platforms, Inc. and affiliates.
  2. //
  3. // This source code is licensed under both the GPLv2 (found in the
  4. // COPYING file in the root directory) and Apache 2.0 License
  5. // (found in the LICENSE.Apache file in the root directory).
  6. //
  7. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  8. // Use of this source code is governed by a BSD-style license that can be
  9. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  10. #pragma once
  11. #include <optional>
  12. #include "db/blob/blob_file_addition.h"
  13. #include "db/blob/blob_garbage_meter.h"
  14. #include "db/compaction/compaction.h"
  15. #include "db/compaction/compaction_iterator.h"
  16. #include "db/compaction/compaction_outputs.h"
  17. #include "db/internal_stats.h"
  18. #include "db/output_validator.h"
  19. #include "db/range_del_aggregator.h"
  20. namespace ROCKSDB_NAMESPACE {
  // Maintains state and outputs for each sub-compaction.
  // It contains 2 `CompactionOutputs`:
  //  1. one for the normal output files
  //  2. another for the proximal level outputs
  // A `current` pointer tracks the current output group. When `AddToOutput()`
  // is called, it checks the output of the current compaction_iterator key and
  // points `current` to the target output group. By default, it just points to
  // the normal compaction_outputs; if the compaction_iterator key should be
  // placed on the proximal level, `current` is changed to point to
  // `proximal_level_outputs`.
  // Later operations use `Current()` to get the target group.
  //
  // +----------+          +-----------------------------+      +---------+
  // | *current |--------> | compaction_outputs          |----->| output  |
  // +----------+          +-----------------------------+      +---------+
  //      |                                                     | output  |
  //      |                                                     +---------+
  //      |                                                     |  ...    |
  //      |
  //      |                +-----------------------------+      +---------+
  //      +------------->  | proximal_level_outputs      |----->| output  |
  //                       +-----------------------------+      +---------+
  //                                                            |  ...    |
  44. class SubcompactionState {
  45. public:
  46. const Compaction* compaction;
  47. // The boundaries of the key-range this compaction is interested in. No two
  48. // sub-compactions may have overlapping key-ranges.
  49. // 'start' is inclusive, 'end' is exclusive, and nullptr means unbounded
  50. const std::optional<Slice> start, end;
  51. // The return status of this sub-compaction
  52. Status status;
  53. // The return IO Status of this sub-compaction
  54. IOStatus io_status;
  55. // Notify on sub-compaction completion only if listener was notified on
  56. // sub-compaction begin.
  57. bool notify_on_subcompaction_completion = false;
  58. // compaction job stats for this sub-compaction
  59. CompactionJobStats compaction_job_stats;
  60. // sub-compaction job id, which is used to identify different sub-compaction
  61. // within the same compaction job.
  62. const uint32_t sub_job_id;
  63. Slice SmallestUserKey() const;
  64. Slice LargestUserKey() const;
  65. // Get all outputs from the subcompaction. For per_key_placement compaction,
  66. // it returns both the last level outputs and proximal level outputs.
  67. OutputIterator GetOutputs() const;
  68. // Assign range dels aggregator. The various tombstones will potentially
  69. // be filtered to different outputs.
  70. void AssignRangeDelAggregator(
  71. std::unique_ptr<CompactionRangeDelAggregator>&& range_del_agg) {
  72. assert(range_del_agg_ == nullptr);
  73. assert(range_del_agg);
  74. range_del_agg_ = std::move(range_del_agg);
  75. }
  76. void RemoveLastEmptyOutput() {
  77. compaction_outputs_.RemoveLastEmptyOutput();
  78. proximal_level_outputs_.RemoveLastEmptyOutput();
  79. }
  80. void BuildSubcompactionJobInfo(
  81. SubcompactionJobInfo& subcompaction_job_info) const {
  82. const Compaction* c = compaction;
  83. const ColumnFamilyData* cfd = c->column_family_data();
  84. subcompaction_job_info.cf_id = cfd->GetID();
  85. subcompaction_job_info.cf_name = cfd->GetName();
  86. subcompaction_job_info.status = status;
  87. subcompaction_job_info.subcompaction_job_id = static_cast<int>(sub_job_id);
  88. subcompaction_job_info.base_input_level = c->start_level();
  89. subcompaction_job_info.output_level = c->output_level();
  90. subcompaction_job_info.compaction_reason = c->compaction_reason();
  91. subcompaction_job_info.compression = c->output_compression();
  92. subcompaction_job_info.stats = compaction_job_stats;
  93. subcompaction_job_info.blob_compression_type =
  94. c->mutable_cf_options().blob_compression_type;
  95. }
  96. SubcompactionState() = delete;
  97. SubcompactionState(const SubcompactionState&) = delete;
  98. SubcompactionState& operator=(const SubcompactionState&) = delete;
  99. SubcompactionState(Compaction* c, const std::optional<Slice> _start,
  100. const std::optional<Slice> _end, uint32_t _sub_job_id)
  101. : compaction(c),
  102. start(_start),
  103. end(_end),
  104. sub_job_id(_sub_job_id),
  105. compaction_outputs_(c, /*is_proximal_level=*/false),
  106. proximal_level_outputs_(c, /*is_proximal_level=*/true) {
  107. assert(compaction != nullptr);
  108. // Set output split key (used for RoundRobin feature) only for normal
  109. // compaction_outputs, output to proximal_level feature doesn't support
  110. // RoundRobin feature (and may never going to be supported, because for
  111. // RoundRobin, the data time is mostly naturally sorted, no need to have
  112. // per-key placement with output_to_proximal_level).
  113. compaction_outputs_.SetOutputSlitKey(start, end);
  114. }
  115. SubcompactionState(SubcompactionState&& state) noexcept
  116. : compaction(state.compaction),
  117. start(state.start),
  118. end(state.end),
  119. status(std::move(state.status)),
  120. io_status(std::move(state.io_status)),
  121. notify_on_subcompaction_completion(
  122. state.notify_on_subcompaction_completion),
  123. compaction_job_stats(std::move(state.compaction_job_stats)),
  124. sub_job_id(state.sub_job_id),
  125. compaction_outputs_(std::move(state.compaction_outputs_)),
  126. proximal_level_outputs_(std::move(state.proximal_level_outputs_)),
  127. range_del_agg_(std::move(state.range_del_agg_)) {
  128. current_outputs_ = state.current_outputs_ == &state.proximal_level_outputs_
  129. ? &proximal_level_outputs_
  130. : &compaction_outputs_;
  131. }
  132. // Add all the new files from this compaction to version_edit
  133. void AddOutputsEdit(VersionEdit* out_edit) const {
  134. for (const auto& file : proximal_level_outputs_.outputs_) {
  135. out_edit->AddFile(compaction->GetProximalLevel(), file.meta);
  136. }
  137. for (const auto& file : compaction_outputs_.outputs_) {
  138. out_edit->AddFile(compaction->output_level(), file.meta);
  139. }
  140. }
  141. void Cleanup(Cache* cache);
  142. void AggregateCompactionOutputStats(
  143. InternalStats::CompactionStatsFull& internal_stats) const;
  144. CompactionOutputs& Current() const {
  145. assert(current_outputs_);
  146. return *current_outputs_;
  147. }
  148. CompactionOutputs* Outputs(bool is_proximal_level) {
  149. assert(compaction);
  150. if (is_proximal_level) {
  151. assert(compaction->SupportsPerKeyPlacement());
  152. return &proximal_level_outputs_;
  153. }
  154. return &compaction_outputs_;
  155. }
  156. // Per-level stats for the output
  157. InternalStats::CompactionStats* OutputStats(bool is_proximal_level) {
  158. assert(compaction);
  159. if (is_proximal_level) {
  160. assert(compaction->SupportsPerKeyPlacement());
  161. return &proximal_level_outputs_.stats_;
  162. }
  163. return &compaction_outputs_.stats_;
  164. }
  165. uint64_t GetWorkerCPUMicros() const {
  166. uint64_t rv = compaction_outputs_.GetWorkerCPUMicros();
  167. if (compaction->SupportsPerKeyPlacement()) {
  168. rv += proximal_level_outputs_.GetWorkerCPUMicros();
  169. }
  170. return rv;
  171. }
  172. CompactionRangeDelAggregator* RangeDelAgg() const {
  173. return range_del_agg_.get();
  174. }
  175. // if the outputs have range delete, range delete is also data
  176. bool HasRangeDel() const {
  177. return range_del_agg_ && !range_del_agg_->IsEmpty();
  178. }
  179. void SetSubcompactionProgress(
  180. const SubcompactionProgress& subcompaction_progress) {
  181. subcompaction_progress_ = subcompaction_progress;
  182. }
  183. SubcompactionProgress& GetSubcompactionProgressRef() {
  184. return subcompaction_progress_;
  185. }
  186. // Add compaction_iterator key/value to the `Current` output group.
  187. Status AddToOutput(const CompactionIterator& iter, bool use_proximal_output,
  188. const CompactionFileOpenFunc& open_file_func,
  189. const CompactionFileCloseFunc& close_file_func,
  190. const ParsedInternalKey& prev_table_last_internal_key);
  191. // Close all compaction output files, both output_to_proximal_level outputs
  192. // and normal outputs.
  193. Status CloseCompactionFiles(const Status& curr_status,
  194. const CompactionFileOpenFunc& open_file_func,
  195. const CompactionFileCloseFunc& close_file_func) {
  196. auto per_key = compaction->SupportsPerKeyPlacement();
  197. // Call FinishCompactionOutputFile() even if status is not ok: it needs to
  198. // close the output file.
  199. // CloseOutput() may open new compaction output files.
  200. Status s = curr_status;
  201. if (per_key) {
  202. s = proximal_level_outputs_.CloseOutput(s, range_del_agg_.get(),
  203. open_file_func, close_file_func);
  204. } else {
  205. assert(proximal_level_outputs_.HasBuilder() == false);
  206. assert(proximal_level_outputs_.HasOutput() == false);
  207. }
  208. s = compaction_outputs_.CloseOutput(s, range_del_agg_.get(), open_file_func,
  209. close_file_func);
  210. return s;
  211. }
  212. private:
  213. // State kept for output being generated
  214. CompactionOutputs compaction_outputs_;
  215. CompactionOutputs proximal_level_outputs_;
  216. CompactionOutputs* current_outputs_ = &compaction_outputs_;
  217. std::unique_ptr<CompactionRangeDelAggregator> range_del_agg_;
  218. SubcompactionProgress subcompaction_progress_;
  219. };
  220. } // namespace ROCKSDB_NAMESPACE