compaction_outputs.h 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425
  1. // Copyright (c) Meta Platforms, Inc. and affiliates.
  2. //
  3. // This source code is licensed under both the GPLv2 (found in the
  4. // COPYING file in the root directory) and Apache 2.0 License
  5. // (found in the LICENSE.Apache file in the root directory).
  6. //
  7. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  8. // Use of this source code is governed by a BSD-style license that can be
  9. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  10. #pragma once
  11. #include "db/blob/blob_garbage_meter.h"
  12. #include "db/compaction/compaction.h"
  13. #include "db/compaction/compaction_iterator.h"
  14. #include "db/internal_stats.h"
  15. #include "db/output_validator.h"
  16. namespace ROCKSDB_NAMESPACE {
// Forward declaration so the callback aliases below can refer to
// CompactionOutputs before the class itself is defined.
class CompactionOutputs;
// Callback invoked with the CompactionOutputs that needs a new output file
// opened (see CompactionOutputs::CloseOutput(), which calls it with `*this`).
// Returns the resulting Status.
using CompactionFileOpenFunc = std::function<Status(CompactionOutputs&)>;
// Callback invoked to close the current output file of a CompactionOutputs.
// The arguments are, in order: a Status, a parsed internal key, a key slice,
// a (possibly null) compaction iterator, and the CompactionOutputs whose
// output is being closed. CompactionOutputs::CloseOutput() passes its running
// status, a default-constructed key and slice, and a null iterator.
// NOTE(review): the exact meaning of the two key arguments is defined by the
// callback implementation, which is not visible here -- confirm there.
using CompactionFileCloseFunc =
    std::function<Status(const Status&, const ParsedInternalKey&, const Slice&,
                         const CompactionIterator*, CompactionOutputs&)>;
  22. // Files produced by subcompaction, most of the functions are used by
  23. // compaction_job Open/Close compaction file functions.
  24. class CompactionOutputs {
  25. public:
  26. // compaction output file
  27. struct Output {
  28. Output(FileMetaData&& _meta, const InternalKeyComparator& _icmp,
  29. bool _enable_hash, bool _finished, uint64_t precalculated_hash,
  30. bool _is_proximal_level)
  31. : meta(std::move(_meta)),
  32. validator(_icmp, _enable_hash, precalculated_hash),
  33. finished(_finished),
  34. is_proximal_level(_is_proximal_level) {}
  35. FileMetaData meta;
  36. OutputValidator validator;
  37. bool finished;
  38. bool is_proximal_level;
  39. std::shared_ptr<const TableProperties> table_properties;
  40. };
  41. CompactionOutputs() = delete;
  42. explicit CompactionOutputs(const Compaction* compaction,
  43. const bool is_proximal_level);
  44. bool IsProximalLevel() const { return is_proximal_level_; }
  45. // Add generated output to the list
  46. void AddOutput(FileMetaData&& meta, const InternalKeyComparator& icmp,
  47. bool enable_hash, bool finished = false,
  48. uint64_t precalculated_hash = 0) {
  49. outputs_.emplace_back(std::move(meta), icmp, enable_hash, finished,
  50. precalculated_hash, is_proximal_level_);
  51. }
  52. const std::vector<Output>& GetOutputs() const { return outputs_; }
  53. // Set new table builder for the current output
  54. void NewBuilder(const TableBuilderOptions& tboptions);
  55. // Assign a new WritableFileWriter to the current output
  56. void AssignFileWriter(WritableFileWriter* writer) {
  57. file_writer_.reset(writer);
  58. }
  59. // TODO: Move the BlobDB builder into CompactionOutputs
  60. const std::vector<BlobFileAddition>& GetBlobFileAdditions() const {
  61. if (is_proximal_level_) {
  62. assert(blob_file_additions_.empty());
  63. }
  64. return blob_file_additions_;
  65. }
  66. std::vector<BlobFileAddition>* GetBlobFileAdditionsPtr() {
  67. assert(!is_proximal_level_);
  68. return &blob_file_additions_;
  69. }
  70. bool HasBlobFileAdditions() const { return !blob_file_additions_.empty(); }
  71. BlobGarbageMeter* CreateBlobGarbageMeter() {
  72. assert(!is_proximal_level_);
  73. blob_garbage_meter_ = std::make_unique<BlobGarbageMeter>();
  74. return blob_garbage_meter_.get();
  75. }
  76. BlobGarbageMeter* GetBlobGarbageMeter() const {
  77. if (is_proximal_level_) {
  78. // blobdb doesn't support per_key_placement yet
  79. assert(blob_garbage_meter_ == nullptr);
  80. return nullptr;
  81. }
  82. return blob_garbage_meter_.get();
  83. }
  84. void UpdateBlobStats() {
  85. assert(!is_proximal_level_);
  86. stats_.num_output_files_blob =
  87. static_cast<int>(blob_file_additions_.size());
  88. for (const auto& blob : blob_file_additions_) {
  89. stats_.bytes_written_blob += blob.GetTotalBlobBytes();
  90. }
  91. }
  92. // Finish the current output file
  93. Status Finish(const Status& intput_status,
  94. const SeqnoToTimeMapping& seqno_to_time_mapping);
  95. // Update output table properties from already populated TableProperties.
  96. // Used for remote compaction
  97. void UpdateTableProperties(const TableProperties& table_properties) {
  98. current_output().table_properties =
  99. std::make_shared<TableProperties>(table_properties);
  100. }
  101. // Update output table properties from table builder
  102. void UpdateTableProperties() {
  103. current_output().table_properties =
  104. std::make_shared<TableProperties>(GetTableProperties());
  105. }
  106. IOStatus WriterSyncClose(const Status& intput_status, SystemClock* clock,
  107. Statistics* statistics, bool use_fsync);
  108. TableProperties GetTableProperties() {
  109. return builder_->GetTableProperties();
  110. }
  111. Slice SmallestUserKey() const {
  112. if (!outputs_.empty() && outputs_[0].finished) {
  113. return outputs_[0].meta.smallest.user_key();
  114. } else {
  115. return Slice{nullptr, 0};
  116. }
  117. }
  118. Slice LargestUserKey() const {
  119. if (!outputs_.empty() && outputs_.back().finished) {
  120. return outputs_.back().meta.largest.user_key();
  121. } else {
  122. return Slice{nullptr, 0};
  123. }
  124. }
  125. // In case the last output file is empty, which doesn't need to keep.
  126. void RemoveLastEmptyOutput() {
  127. if (!outputs_.empty() && !outputs_.back().meta.fd.file_size) {
  128. // An error occurred, so ignore the last output.
  129. outputs_.pop_back();
  130. }
  131. }
  132. // Remove the last output, for example the last output doesn't have data (no
  133. // entry and no range-dels), but file_size might not be 0, as it has SST
  134. // metadata.
  135. void RemoveLastOutput() {
  136. assert(!outputs_.empty());
  137. outputs_.pop_back();
  138. }
  139. bool HasBuilder() const { return builder_ != nullptr; }
  140. FileMetaData* GetMetaData() { return &current_output().meta; }
  141. bool HasOutput() const { return !outputs_.empty(); }
  142. uint64_t NumEntries() const { return builder_->NumEntries(); }
  143. uint64_t GetWorkerCPUMicros() const {
  144. return worker_cpu_micros_ + (builder_ ? builder_->GetWorkerCPUMicros() : 0);
  145. }
  146. void ResetBuilder() {
  147. builder_.reset();
  148. current_output_file_size_ = 0;
  149. }
  150. // Add range deletions from the range_del_agg_ to the current output file.
  151. // Input parameters, `range_tombstone_lower_bound_` and current output's
  152. // metadata determine the bounds on range deletions to add. Updates output
  153. // file metadata boundary if extended by range tombstones.
  154. //
  155. // @param comp_start_user_key and comp_end_user_key include timestamp if
  156. // user-defined timestamp is enabled. Their timestamp should be max timestamp.
  157. // @param next_table_min_key internal key lower bound for the next compaction
  158. // output.
  159. // @param full_history_ts_low used for range tombstone garbage collection.
  160. Status AddRangeDels(
  161. CompactionRangeDelAggregator& range_del_agg,
  162. const Slice* comp_start_user_key, const Slice* comp_end_user_key,
  163. CompactionIterationStats& range_del_out_stats, bool bottommost_level,
  164. const InternalKeyComparator& icmp, SequenceNumber earliest_snapshot,
  165. std::pair<SequenceNumber, SequenceNumber> keep_seqno_range,
  166. const Slice& next_table_min_key, const std::string& full_history_ts_low);
  167. void SetNumOutputRecords(uint64_t num_output_records) {
  168. stats_.num_output_records = num_output_records;
  169. }
  170. private:
  171. friend class SubcompactionState;
  172. void FillFilesToCutForTtl();
  173. void SetOutputSlitKey(const std::optional<Slice> start,
  174. const std::optional<Slice> end) {
  175. const InternalKeyComparator* icmp =
  176. &compaction_->column_family_data()->internal_comparator();
  177. const InternalKey* output_split_key = compaction_->GetOutputSplitKey();
  178. // Invalid output_split_key indicates that we do not need to split
  179. if (output_split_key != nullptr) {
  180. // We may only split the output when the cursor is in the range. Split
  181. if ((!end.has_value() ||
  182. icmp->user_comparator()->Compare(
  183. ExtractUserKey(output_split_key->Encode()), *end) < 0) &&
  184. (!start.has_value() ||
  185. icmp->user_comparator()->Compare(
  186. ExtractUserKey(output_split_key->Encode()), *start) > 0)) {
  187. local_output_split_key_ = output_split_key;
  188. }
  189. }
  190. }
  191. // Returns true iff we should stop building the current output
  192. // before processing the current key in compaction iterator.
  193. bool ShouldStopBefore(const CompactionIterator& c_iter);
  194. void Cleanup() {
  195. if (builder_ != nullptr) {
  196. // May happen if we get a shutdown call in the middle of compaction
  197. builder_->Abandon();
  198. builder_.reset();
  199. }
  200. }
  201. // Updates states related to file cutting for TTL.
  202. // Returns a boolean value indicating whether the current
  203. // compaction output file should be cut before `internal_key`.
  204. //
  205. // @param internal_key the current key to be added to output.
  206. bool UpdateFilesToCutForTTLStates(const Slice& internal_key);
  207. // update tracked grandparents information like grandparent index, if it's
  208. // in the gap between 2 grandparent files, accumulated grandparent files size
  209. // etc.
  210. // It returns how many boundaries it crosses by including current key.
  211. size_t UpdateGrandparentBoundaryInfo(const Slice& internal_key);
  212. // helper function to get the overlapped grandparent files size, it's only
  213. // used for calculating the first key's overlap.
  214. uint64_t GetCurrentKeyGrandparentOverlappedBytes(
  215. const Slice& internal_key) const;
  216. // Add current key from compaction_iterator to the output file. If needed
  217. // close and open new compaction output with the functions provided.
  218. Status AddToOutput(const CompactionIterator& c_iter,
  219. const CompactionFileOpenFunc& open_file_func,
  220. const CompactionFileCloseFunc& close_file_func,
  221. const ParsedInternalKey& prev_table_last_internal_key);
  222. // Close the current output. `open_file_func` is needed for creating new file
  223. // for range-dels only output file.
  224. Status CloseOutput(const Status& curr_status,
  225. CompactionRangeDelAggregator* range_del_agg,
  226. const CompactionFileOpenFunc& open_file_func,
  227. const CompactionFileCloseFunc& close_file_func) {
  228. Status status = curr_status;
  229. // Handle subcompaction containing only range deletions. They could
  230. // be dropped or sent to another output level, so this is only an
  231. // over-approximate check for whether opening is needed.
  232. if (status.ok() && !HasBuilder() && !HasOutput() && range_del_agg &&
  233. !range_del_agg->IsEmpty()) {
  234. status = open_file_func(*this);
  235. }
  236. if (HasBuilder()) {
  237. const ParsedInternalKey empty_internal_key{};
  238. const Slice empty_key{};
  239. Status s = close_file_func(status, empty_internal_key, empty_key,
  240. nullptr /* c_iter */, *this);
  241. if (!s.ok() && status.ok()) {
  242. status = s;
  243. }
  244. }
  245. return status;
  246. }
  247. // This subcompaction's output could be empty if compaction was aborted before
  248. // this subcompaction had a chance to generate any output files. When
  249. // subcompactions are executed sequentially this is more likely and will be
  250. // particularly likely for the later subcompactions to be empty. Once they are
  251. // run in parallel however it should be much rarer.
  252. // It's caller's responsibility to make sure it's not empty.
  253. Output& current_output() {
  254. assert(!outputs_.empty());
  255. return outputs_.back();
  256. }
  257. const Compaction* compaction_;
  258. // current output builder and writer
  259. std::unique_ptr<TableBuilder> builder_;
  260. std::unique_ptr<WritableFileWriter> file_writer_;
  261. uint64_t current_output_file_size_ = 0;
  262. SequenceNumber smallest_preferred_seqno_ = kMaxSequenceNumber;
  263. // Sum of all the GetWorkerCPUMicros() for all the closed builders so far.
  264. uint64_t worker_cpu_micros_ = 0;
  265. // all the compaction outputs so far
  266. std::vector<Output> outputs_;
  267. // BlobDB info
  268. std::vector<BlobFileAddition> blob_file_additions_;
  269. std::unique_ptr<BlobGarbageMeter> blob_garbage_meter_;
  270. // Per level's output stat
  271. InternalStats::CompactionStats stats_;
  272. // indicate if this CompactionOutputs obj for proximal_level, should always
  273. // be false if per_key_placement feature is not enabled.
  274. const bool is_proximal_level_;
  275. // partitioner information
  276. std::string last_key_for_partitioner_;
  277. std::unique_ptr<SstPartitioner> partitioner_;
  278. // A flag determines if this subcompaction has been split by the cursor
  279. // for RoundRobin compaction
  280. bool is_split_ = false;
  281. // We also maintain the output split key for each subcompaction to avoid
  282. // repetitive comparison in ShouldStopBefore()
  283. const InternalKey* local_output_split_key_ = nullptr;
  284. // Some identified files with old oldest ancester time and the range should be
  285. // isolated out so that the output file(s) in that range can be merged down
  286. // for TTL and clear the timestamps for the range.
  287. std::vector<FileMetaData*> files_to_cut_for_ttl_;
  288. int cur_files_to_cut_for_ttl_ = -1;
  289. int next_files_to_cut_for_ttl_ = 0;
  290. // An index that used to speed up ShouldStopBefore().
  291. size_t grandparent_index_ = 0;
  292. // if the output key is being grandparent files gap, so:
  293. // key > grandparents[grandparent_index_ - 1].largest &&
  294. // key < grandparents[grandparent_index_].smallest
  295. bool being_grandparent_gap_ = true;
  296. // The number of bytes overlapping between the current output and
  297. // grandparent files used in ShouldStopBefore().
  298. uint64_t grandparent_overlapped_bytes_ = 0;
  299. // A flag determines whether the key has been seen in ShouldStopBefore()
  300. bool seen_key_ = false;
  301. // for the current output file, how many file boundaries has it crossed,
  302. // basically number of files overlapped * 2
  303. size_t grandparent_boundary_switched_num_ = 0;
  304. // The smallest key of the current output file, this is set when current
  305. // output file's smallest key is a range tombstone start key.
  306. InternalKey range_tombstone_lower_bound_;
  307. // Used for calls to compaction->KeyRangeNotExistsBeyondOutputLevel() in
  308. // CompactionOutputs::AddRangeDels().
  309. // level_ptrs_[i] holds index of the file that was checked during the last
  310. // call to compaction->KeyRangeNotExistsBeyondOutputLevel(). This allows
  311. // future calls to the function to pick up where it left off, since each
  312. // range tombstone added to output file within each subcompaction is in
  313. // increasing key range.
  314. std::vector<size_t> level_ptrs_;
  315. };
  316. // helper struct to concatenate the last level and proximal level outputs
  317. // which could be replaced by std::ranges::join_view() in c++20
  318. struct OutputIterator {
  319. public:
  320. explicit OutputIterator(const std::vector<CompactionOutputs::Output>& a,
  321. const std::vector<CompactionOutputs::Output>& b)
  322. : a_(a), b_(b) {
  323. within_a = !a_.empty();
  324. idx_ = 0;
  325. }
  326. OutputIterator begin() { return *this; }
  327. OutputIterator end() { return *this; }
  328. size_t size() { return a_.size() + b_.size(); }
  329. const CompactionOutputs::Output& operator*() const {
  330. return within_a ? a_[idx_] : b_[idx_];
  331. }
  332. OutputIterator& operator++() {
  333. idx_++;
  334. if (within_a && idx_ >= a_.size()) {
  335. within_a = false;
  336. idx_ = 0;
  337. }
  338. assert(within_a || idx_ <= b_.size());
  339. return *this;
  340. }
  341. bool operator!=(const OutputIterator& /*rhs*/) const {
  342. return within_a || idx_ < b_.size();
  343. }
  344. private:
  345. const std::vector<CompactionOutputs::Output>& a_;
  346. const std::vector<CompactionOutputs::Output>& b_;
  347. bool within_a;
  348. size_t idx_;
  349. };
  350. } // namespace ROCKSDB_NAMESPACE