compaction_picker_universal.cc

  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  9. #include "db/compaction/compaction_picker_universal.h"
  10. #include <cstdint>
  11. #include <limits>
  12. #include <queue>
  13. #include <string>
  14. #include <utility>
  15. #include "db/column_family.h"
  16. #include "file/filename.h"
  17. #include "logging/log_buffer.h"
  18. #include "logging/logging.h"
  19. #include "monitoring/statistics_impl.h"
  20. #include "test_util/sync_point.h"
  21. #include "util/random.h"
  22. #include "util/string_util.h"
  23. namespace ROCKSDB_NAMESPACE {
  24. namespace {
  25. // A helper class that forms universal compactions. The class is used by
  26. // UniversalCompactionPicker::PickCompaction().
  27. // The usage is to create the class, and get the compaction object by calling
  28. // PickCompaction().
  29. class UniversalCompactionBuilder {
  30. public:
  31. UniversalCompactionBuilder(
  32. const ImmutableOptions& ioptions, const InternalKeyComparator* icmp,
  33. const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
  34. const MutableDBOptions& mutable_db_options,
  35. const std::vector<SequenceNumber>& existing_snapshots,
  36. const SnapshotChecker* snapshot_checker, VersionStorageInfo* vstorage,
  37. UniversalCompactionPicker* picker, LogBuffer* log_buffer,
  38. bool require_max_output_level)
  39. : ioptions_(ioptions),
  40. icmp_(icmp),
  41. cf_name_(cf_name),
  42. mutable_cf_options_(mutable_cf_options),
  43. mutable_db_options_(mutable_db_options),
  44. vstorage_(vstorage),
  45. picker_(picker),
  46. log_buffer_(log_buffer),
  47. require_max_output_level_(require_max_output_level),
  48. allow_ingest_behind_(ioptions.cf_allow_ingest_behind ||
  49. ioptions.allow_ingest_behind) {
  50. assert(icmp_);
  51. const auto* ucmp = icmp_->user_comparator();
  52. assert(ucmp);
  53. // These parameters are only used when user-defined timestamp is not
  54. // enabled.
  55. if (ucmp->timestamp_size() == 0) {
  56. earliest_snapshot_ = existing_snapshots.empty()
  57. ? kMaxSequenceNumber
  58. : existing_snapshots.at(0);
  59. snapshot_checker_ = snapshot_checker;
  60. }
  61. }
  62. // Form and return the compaction object. The caller owns the returned object.
  63. Compaction* PickCompaction();
  64. private:
  65. struct SortedRun {
  66. SortedRun(int _level, FileMetaData* _file, uint64_t _size,
  67. uint64_t _compensated_file_size, bool _being_compacted,
  68. bool _level_has_marked_standalone_rangedel)
  69. : level(_level),
  70. file(_file),
  71. size(_size),
  72. compensated_file_size(_compensated_file_size),
  73. being_compacted(_being_compacted),
  74. level_has_marked_standalone_rangedel(
  75. _level_has_marked_standalone_rangedel) {
  76. assert(compensated_file_size > 0);
  77. assert(level != 0 || file != nullptr);
  78. }
  79. void Dump(char* out_buf, size_t out_buf_size,
  80. bool print_path = false) const;
  81. // sorted_run_count is included in the string to print
  82. void DumpSizeInfo(char* out_buf, size_t out_buf_size,
  83. size_t sorted_run_count) const;
  84. int level;
  85. // `file` will be null for level > 0. For level 0, the sorted run consists
  86. // of just this file.
  87. FileMetaData* file;
  88. // For level > 0, `size` and `compensated_file_size` are the sum of the sizes
  89. // of all files in the level. `being_compacted` should be the same for all
  90. // files in a non-zero level. Use the value here.
  91. uint64_t size;
  92. uint64_t compensated_file_size;
  93. bool being_compacted;
  94. // True if this level has any file that is a standalone range deletion file
  95. // marked for compaction. Best effort is made to make only deletion
  96. // triggered compaction pick this type of file.
  97. bool level_has_marked_standalone_rangedel;
  98. };
  99. unsigned int GetMaxNumFilesToCompactBasedOnMaxReadAmp(
  100. const int file_num_compaction_trigger, const unsigned int ratio,
  101. int* num_sr_not_compacted_output, int* max_num_runs_output) const {
  102. assert(num_sr_not_compacted_output);
  103. assert(max_num_runs_output);
  104. int max_num_runs =
  105. mutable_cf_options_.compaction_options_universal.max_read_amp;
  106. if (max_num_runs < 0) {
  107. // any value < -1 is not valid
  108. assert(max_num_runs == -1);
  109. // By default, fall back to `level0_file_num_compaction_trigger`
  110. max_num_runs = file_num_compaction_trigger;
  111. } else if (max_num_runs == 0) {
  112. if (mutable_cf_options_.compaction_options_universal.stop_style ==
  113. kCompactionStopStyleTotalSize) {
  114. // 0 means auto-tuning by RocksDB. We estimate the max number of runs
  115. // based on max_run_size, size_ratio and the write buffer size:
  116. // assume the size of the lowest level is equal to
  117. // write_buffer_size. Each subsequent level is the max size without
  118. // triggering size_ratio compaction. `max_num_runs` is the minimum
  119. // number of levels required such that the target size of the
  120. // largest level is at least `max_run_size_`.
  121. max_num_runs = 1;
  122. double cur_level_max_size =
  123. static_cast<double>(mutable_cf_options_.write_buffer_size);
  124. double total_run_size = 0;
  125. while (cur_level_max_size < static_cast<double>(max_run_size_)) {
  126. // This loop should not take too many iterations since
  127. // cur_level_max_size at least doubles each iteration.
  128. total_run_size += cur_level_max_size;
  129. cur_level_max_size = (100.0 + ratio) / 100.0 * total_run_size;
  130. ++max_num_runs;
  131. }
  132. } else {
  133. // TODO: implement the auto-tune logic for this stop style
  134. max_num_runs = file_num_compaction_trigger;
  135. }
  136. } else {
  137. // max_num_runs > 0, it's the limit on the number of sorted runs
  138. }
  139. // Get the total number of sorted runs that are not being compacted
  140. int num_sr_not_compacted = 0;
  141. for (size_t i = 0; i < sorted_runs_.size(); i++) {
  142. if (sorted_runs_[i].being_compacted == false &&
  143. !sorted_runs_[i].level_has_marked_standalone_rangedel) {
  144. num_sr_not_compacted++;
  145. }
  146. }
  147. *num_sr_not_compacted_output = num_sr_not_compacted;
  148. *max_num_runs_output = max_num_runs;
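// Compacting k sorted runs replaces them with one output run, leaving
// (num_sr_not_compacted - k + 1) runs; the smallest k that brings the count
// down to max_num_runs is num_sr_not_compacted - max_num_runs + 1.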
  149. if (num_sr_not_compacted > max_num_runs) {
  150. return num_sr_not_compacted - max_num_runs + 1;
  151. } else {
  152. return 0;
  153. }
  154. }
  155. Compaction* MaybePickPeriodicCompaction(Compaction* const prev_picked_c) {
  156. if (prev_picked_c != nullptr ||
  157. vstorage_->FilesMarkedForPeriodicCompaction().empty()) {
  158. return prev_picked_c;
  159. }
  160. // Always need to do a full compaction for periodic compaction.
  161. Compaction* c = PickPeriodicCompaction();
  162. TEST_SYNC_POINT_CALLBACK("PostPickPeriodicCompaction", c);
  163. if (c != nullptr) {
  164. ROCKS_LOG_BUFFER(log_buffer_,
  165. "[%s] Universal: picked for periodic compaction\n",
  166. cf_name_.c_str());
  167. }
  168. return c;
  169. }
  170. Compaction* MaybePickSizeAmpCompaction(Compaction* const prev_picked_c,
  171. int file_num_compaction_trigger) {
  172. if (prev_picked_c != nullptr ||
  173. sorted_runs_.size() <
  174. static_cast<size_t>(file_num_compaction_trigger)) {
  175. return prev_picked_c;
  176. }
  177. Compaction* c = PickCompactionToReduceSizeAmp();
  178. if (c != nullptr) {
  179. TEST_SYNC_POINT("PickCompactionToReduceSizeAmpReturnNonnullptr");
  180. ROCKS_LOG_BUFFER(log_buffer_,
  181. "[%s] Universal: picked for size amp compaction \n",
  182. cf_name_.c_str());
  183. }
  184. return c;
  185. }
  186. Compaction* MaybePickCompactionToReduceSortedRunsBasedFileRatio(
  187. Compaction* const prev_picked_c, int file_num_compaction_trigger,
  188. unsigned int ratio) {
  189. if (prev_picked_c != nullptr ||
  190. sorted_runs_.size() <
  191. static_cast<size_t>(file_num_compaction_trigger)) {
  192. return prev_picked_c;
  193. }
  194. Compaction* c = PickCompactionToReduceSortedRuns(ratio, UINT_MAX);
  195. if (c != nullptr) {
  196. TEST_SYNC_POINT("PickCompactionToReduceSortedRunsReturnNonnullptr");
  197. ROCKS_LOG_BUFFER(log_buffer_,
  198. "[%s] Universal: picked for size ratio compaction to "
  199. "reduce sorted run\n",
  200. cf_name_.c_str());
  201. }
  202. return c;
  203. }
  204. Compaction* MaybePickCompactionToReduceSortedRuns(
  205. Compaction* const prev_picked_c, int file_num_compaction_trigger,
  206. unsigned int ratio) {
  207. if (prev_picked_c != nullptr ||
  208. sorted_runs_.size() <
  209. static_cast<size_t>(file_num_compaction_trigger)) {
  210. return prev_picked_c;
  211. }
  212. int num_sr_not_compacted = 0;
  213. int max_num_runs = 0;
  214. const unsigned int max_num_files_to_compact =
  215. GetMaxNumFilesToCompactBasedOnMaxReadAmp(file_num_compaction_trigger,
  216. ratio, &num_sr_not_compacted,
  217. &max_num_runs);
  218. if (max_num_files_to_compact == 0) {
  219. ROCKS_LOG_BUFFER(
  220. log_buffer_,
  221. "[%s] Universal: skipping compaction to reduce sorted run, num "
  222. "sorted runs not "
  223. "being compacted -- %u, max num runs allowed -- %d, max_run_size "
  224. "-- %" PRIu64 "\n",
  225. cf_name_.c_str(), num_sr_not_compacted, max_num_runs, max_run_size_);
  226. return nullptr;
  227. }
  228. Compaction* c =
  229. PickCompactionToReduceSortedRuns(UINT_MAX, max_num_files_to_compact);
  230. if (c != nullptr) {
  231. ROCKS_LOG_BUFFER(log_buffer_,
  232. "[%s] Universal: picked for sorted run num compaction "
  233. "to reduce sorted run, to "
  234. "compact file num -- %u, max num runs allowed"
  235. "-- %d, max_run_size -- %" PRIu64 "\n",
  236. cf_name_.c_str(), max_num_files_to_compact, max_num_runs,
  237. max_run_size_);
  238. }
  239. return c;
  240. }
  241. Compaction* MaybePickDeleteTriggeredCompaction(
  242. Compaction* const prev_picked_c) {
  243. if (prev_picked_c != nullptr) {
  244. return prev_picked_c;
  245. }
  246. Compaction* c = PickDeleteTriggeredCompaction();
  247. if (c != nullptr) {
  248. TEST_SYNC_POINT("PickDeleteTriggeredCompactionReturnNonnullptr");
  249. ROCKS_LOG_BUFFER(
  250. log_buffer_,
  251. "[%s] Universal: picked for delete triggered compaction\n",
  252. cf_name_.c_str());
  253. }
  254. return c;
  255. }
  256. // Pick Universal compaction to limit read amplification
  257. Compaction* PickCompactionToReduceSortedRuns(
  258. unsigned int ratio, unsigned int max_number_of_files_to_compact);
  259. // Pick Universal compaction to limit space amplification.
  260. Compaction* PickCompactionToReduceSizeAmp();
  261. // Try to pick incremental compaction to reduce space amplification.
  262. // It will return null if it cannot find a fanout within the threshold.
  263. // Fanout is defined as
  264. // total size of files to compact at output level
  265. // --------------------------------------------------
  266. // total size of files to compact at other levels
  267. Compaction* PickIncrementalForReduceSizeAmp(double fanout_threshold);
  268. Compaction* PickDeleteTriggeredCompaction();
  269. // Returns true if the given file (which is marked for compaction) should be
  270. // skipped from being picked for now. We do this to best use standalone range
  271. // tombstone files.
  272. bool ShouldSkipMarkedFile(const FileMetaData* file) const;
  273. // Form a compaction from the sorted run indicated by start_index to the
  274. // oldest sorted run.
  275. // The caller is responsible for making sure that those files are not in
  276. // compaction.
  277. Compaction* PickCompactionToOldest(size_t start_index,
  278. CompactionReason compaction_reason);
  279. Compaction* PickCompactionWithSortedRunRange(
  280. size_t start_index, size_t end_index, CompactionReason compaction_reason);
  281. // Try to pick periodic compaction. The caller should only call it
  282. // if there is at least one file marked for periodic compaction.
  283. // null will be returned if no such compaction can be formed
  284. // because some files are being compacted.
  285. Compaction* PickPeriodicCompaction();
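// Whether to exclude the last sorted run (the last level) from a size amp
// compaction. This is done when preclude_last_level_data_seconds is enabled,
// so size amplification is measured against the penultimate sorted run.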
  286. bool ShouldSkipLastSortedRunForSizeAmpCompaction() const {
  287. assert(!sorted_runs_.empty());
  288. return mutable_cf_options_.preclude_last_level_data_seconds > 0 &&
  289. ioptions_.num_levels > 2 &&
  290. sorted_runs_.back().level == ioptions_.num_levels - 1 &&
  291. sorted_runs_.size() > 1;
  292. }
  293. // Used in universal compaction when the allow_trivial_move
  294. // option is set. Checks whether there are any overlapping files
  295. // in the input. Returns true if the input files are non
  296. // overlapping.
  297. bool IsInputFilesNonOverlapping(Compaction* c);
  298. uint64_t GetMaxOverlappingBytes() const;
  299. // To conditionally exclude some of the newest L0 files
  300. // from a size amp compaction. This is to prevent a large number of L0
  301. // files from being locked by a size amp compaction, potentially leading to
  302. // a write stop after a few more flushes.
  303. //
  304. // Such exclusion is based on `num_l0_input_pre_exclusion`,
  305. // `level0_stop_writes_trigger`, `max/min_merge_width` and the pre-exclusion
  306. // compaction score. Note that the exclusion will not disqualify the size
  307. // amp compaction of interest from running as a size amp compaction, as
  308. // long as its pre-exclusion compaction score satisfies the condition to run.
  309. //
  310. // @param `num_l0_input_pre_exclusion` Number of L0 input files prior to
  311. // exclusion
  312. // @param `end_index` Index of the last sorted run selected as compaction
  313. // input. Will not be affected by this exclusion.
  314. // @param `start_index` Index of the first input sorted run prior to
  315. // exclusion. Will be modified as output based on the exclusion.
  316. // @param `candidate_size` Total size of all except for the last input sorted
  317. // runs prior to exclusion. Will be modified as output based on the exclusion.
  318. //
  319. // @return Number of L0 files to exclude. `start_index` and
  320. // `candidate_size` will be modified accordingly
  321. std::size_t MightExcludeNewL0sToReduceWriteStop(
  322. std::size_t num_l0_input_pre_exclusion, std::size_t end_index,
  323. std::size_t& start_index, uint64_t& candidate_size) const {
  324. if (num_l0_input_pre_exclusion == 0) {
  325. return 0;
  326. }
  327. assert(start_index <= end_index && sorted_runs_.size() > end_index);
  328. assert(mutable_cf_options_.level0_stop_writes_trigger > 0);
  329. const std::size_t level0_stop_writes_trigger = static_cast<std::size_t>(
  330. mutable_cf_options_.level0_stop_writes_trigger);
  331. const std::size_t max_merge_width = static_cast<std::size_t>(
  332. mutable_cf_options_.compaction_options_universal.max_merge_width);
  333. const std::size_t min_merge_width = static_cast<std::size_t>(
  334. mutable_cf_options_.compaction_options_universal.min_merge_width);
  335. const uint64_t max_size_amplification_percent =
  336. mutable_cf_options_.compaction_options_universal
  337. .max_size_amplification_percent;
  338. const uint64_t base_sr_size = sorted_runs_[end_index].size;
  339. // Leave at least 1 L0 file and 2 input sorted runs after exclusion
  340. const std::size_t max_num_l0_to_exclude =
  341. std::min(num_l0_input_pre_exclusion - 1, end_index - start_index - 1);
  342. // In universal compaction, sorted runs from non L0 levels are counted
  343. // toward `level0_stop_writes_trigger`. Therefore we need to subtract the
  344. // total number of sorted runs picked originally for this compaction from
  345. // `level0_stop_writes_trigger` to calculate
  346. // `num_extra_l0_before_write_stop`
  347. const std::size_t num_extra_l0_before_write_stop =
  348. level0_stop_writes_trigger -
  349. std::min(level0_stop_writes_trigger, end_index - start_index + 1);
  350. const std::size_t num_l0_to_exclude_for_max_merge_width =
  351. std::min(max_merge_width -
  352. std::min(max_merge_width, num_extra_l0_before_write_stop),
  353. max_num_l0_to_exclude);
  354. const std::size_t num_l0_to_exclude_for_min_merge_width =
  355. std::min(min_merge_width -
  356. std::min(min_merge_width, num_extra_l0_before_write_stop),
  357. max_num_l0_to_exclude);
  358. std::size_t num_l0_to_exclude = 0;
  359. uint64_t candidate_size_post_exclusion = candidate_size;
  360. for (std::size_t possible_num_l0_to_exclude =
  361. num_l0_to_exclude_for_min_merge_width;
  362. possible_num_l0_to_exclude <= num_l0_to_exclude_for_max_merge_width;
  363. ++possible_num_l0_to_exclude) {
  364. uint64_t current_candidate_size = candidate_size_post_exclusion;
  365. for (std::size_t j = num_l0_to_exclude; j < possible_num_l0_to_exclude;
  366. ++j) {
  367. current_candidate_size -=
  368. sorted_runs_.at(start_index + j).compensated_file_size;
  369. }
  370. // Ensure the compaction score before and after exclusion stays similar,
  371. // so that this exclusion will not disqualify the size amp compaction of
  372. // interest from running as a size amp compaction as long as its
  373. // pre-exclusion compaction score satisfies the condition to run.
  374. if (current_candidate_size * 100 <
  375. max_size_amplification_percent * base_sr_size ||
  376. current_candidate_size < candidate_size * 9 / 10) {
  377. break;
  378. }
  379. num_l0_to_exclude = possible_num_l0_to_exclude;
  380. candidate_size_post_exclusion = current_candidate_size;
  381. }
  382. start_index += num_l0_to_exclude;
  383. candidate_size = candidate_size_post_exclusion;
  384. return num_l0_to_exclude;
  385. }
  386. bool MeetsOutputLevelRequirements(int output_level) const {
  387. return !require_max_output_level_ ||
  388. Compaction::OutputToNonZeroMaxOutputLevel(
  389. output_level, vstorage_->MaxOutputLevel(allow_ingest_behind_));
  390. }
  391. const ImmutableOptions& ioptions_;
  392. const InternalKeyComparator* icmp_;
  393. double score_;
  394. std::vector<SortedRun> sorted_runs_;
  395. uint64_t max_run_size_;
  396. const std::string& cf_name_;
  397. const MutableCFOptions& mutable_cf_options_;
  398. const MutableDBOptions& mutable_db_options_;
  399. VersionStorageInfo* vstorage_;
  400. UniversalCompactionPicker* picker_;
  401. LogBuffer* log_buffer_;
  402. // Optional earliest snapshot at time of compaction picking. This is only
  403. // provided if the column family doesn't enable user-defined timestamps.
  404. // And this information is only passed to `Compaction` picked by deletion
  405. // triggered compaction for possible optimizations.
  406. std::optional<SequenceNumber> earliest_snapshot_;
  407. const SnapshotChecker* snapshot_checker_;
  408. // Mapping from file number to its index in `sorted_runs_` for the files
  409. // that are marked for compaction. This is only populated when snapshot info
  410. // is populated.
  411. std::map<uint64_t, size_t> file_marked_for_compaction_to_sorted_run_index_;
  412. bool require_max_output_level_;
  413. bool allow_ingest_behind_;
  414. std::vector<UniversalCompactionBuilder::SortedRun> CalculateSortedRuns(
  415. const VersionStorageInfo& vstorage, int last_level,
  416. uint64_t* max_run_size);
  417. // Pick a path ID to place a newly generated file, with its estimated file
  418. // size.
  419. static uint32_t GetPathId(const ImmutableCFOptions& ioptions,
  420. const MutableCFOptions& mutable_cf_options,
  421. uint64_t file_size);
  422. };
  423. // Used in universal compaction when trivial move is enabled.
  424. // This structure is used for the construction of a min heap
  425. // that contains the file meta data, the level of the file
  426. // and the index of the file in that level
  427. struct InputFileInfo {
  428. InputFileInfo() : InputFileInfo(nullptr, 0, 0) {}
  429. InputFileInfo(FileMetaData* file_meta, size_t l, size_t i)
  430. : f(file_meta), level(l), index(i) {}
  431. FileMetaData* f;
  432. size_t level;
  433. size_t index;
  434. };
  435. // Used in universal compaction when trivial move is enabled.
  436. // This comparator is used for the construction of a min heap
  437. // based on the smallest key of the file.
  438. struct SmallestKeyHeapComparator {
  439. explicit SmallestKeyHeapComparator(const Comparator* ucmp) { ucmp_ = ucmp; }
  440. bool operator()(InputFileInfo i1, InputFileInfo i2) const {
  441. return (ucmp_->CompareWithoutTimestamp(i1.f->smallest.user_key(),
  442. i2.f->smallest.user_key()) > 0);
  443. }
  444. private:
  445. const Comparator* ucmp_;
  446. };
  447. using SmallestKeyHeap =
  448. std::priority_queue<InputFileInfo, std::vector<InputFileInfo>,
  449. SmallestKeyHeapComparator>;
  450. // This function creates the heap that is used to find if the files are
  451. // overlapping during universal compaction when the allow_trivial_move
  452. // is set.
  453. SmallestKeyHeap create_level_heap(Compaction* c, const Comparator* ucmp) {
  454. SmallestKeyHeap smallest_key_priority_q =
  455. SmallestKeyHeap(SmallestKeyHeapComparator(ucmp));
  456. for (size_t l = 0; l < c->num_input_levels(); l++) {
  457. if (c->num_input_files(l) != 0) {
  458. if (l == 0 && c->start_level() == 0) {
  459. for (size_t i = 0; i < c->num_input_files(0); i++) {
  460. smallest_key_priority_q.emplace(c->input(0, i), 0, i);
  461. }
  462. } else {
  463. smallest_key_priority_q.emplace(c->input(l, 0), l, 0);
  464. }
  465. }
  466. }
  467. return smallest_key_priority_q;
  468. }
  469. #ifndef NDEBUG
  470. // smallest_seqno and largest_seqno are set iff `files` is not empty.
  471. void GetSmallestLargestSeqno(const std::vector<FileMetaData*>& files,
  472. SequenceNumber* smallest_seqno,
  473. SequenceNumber* largest_seqno) {
  474. bool is_first = true;
  475. for (FileMetaData* f : files) {
  476. assert(f->fd.smallest_seqno <= f->fd.largest_seqno);
  477. if (is_first) {
  478. is_first = false;
  479. *smallest_seqno = f->fd.smallest_seqno;
  480. *largest_seqno = f->fd.largest_seqno;
  481. } else {
  482. if (f->fd.smallest_seqno < *smallest_seqno) {
  483. *smallest_seqno = f->fd.smallest_seqno;
  484. }
  485. if (f->fd.largest_seqno > *largest_seqno) {
  486. *largest_seqno = f->fd.largest_seqno;
  487. }
  488. }
  489. }
  490. }
  491. #endif
  492. } // namespace
  493. // Algorithm that checks to see if there are any overlapping
  494. // files in the input
  495. bool UniversalCompactionBuilder::IsInputFilesNonOverlapping(Compaction* c) {
  496. auto comparator = icmp_->user_comparator();
  497. int first_iter = 1;
  498. InputFileInfo prev, curr;
  499. SmallestKeyHeap smallest_key_priority_q =
  500. create_level_heap(c, icmp_->user_comparator());
  501. while (!smallest_key_priority_q.empty()) {
  502. curr = smallest_key_priority_q.top();
  503. smallest_key_priority_q.pop();
  504. if (first_iter) {
  505. prev = curr;
  506. first_iter = 0;
  507. } else {
  508. if (comparator->CompareWithoutTimestamp(
  509. prev.f->largest.user_key(), curr.f->smallest.user_key()) >= 0) {
  510. // found overlapping files, return false
  511. return false;
  512. }
  513. assert(comparator->CompareWithoutTimestamp(
  514. curr.f->largest.user_key(), prev.f->largest.user_key()) > 0);
  515. prev = curr;
  516. }
  517. if (c->level(curr.level) != 0 &&
  518. curr.index < c->num_input_files(curr.level) - 1) {
  519. smallest_key_priority_q.emplace(c->input(curr.level, curr.index + 1),
  520. curr.level, curr.index + 1);
  521. }
  522. }
  523. return true;
  524. }
  525. bool UniversalCompactionPicker::NeedsCompaction(
  526. const VersionStorageInfo* vstorage) const {
  527. const int kLevel0 = 0;
  528. if (vstorage->CompactionScore(kLevel0) >= 1) {
  529. return true;
  530. }
  531. if (!vstorage->FilesMarkedForPeriodicCompaction().empty()) {
  532. return true;
  533. }
  534. if (!vstorage->FilesMarkedForCompaction().empty()) {
  535. return true;
  536. }
  537. return false;
  538. }
  539. Compaction* UniversalCompactionPicker::PickCompaction(
  540. const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
  541. const MutableDBOptions& mutable_db_options,
  542. const std::vector<SequenceNumber>& existing_snapshots,
  543. const SnapshotChecker* snapshot_checker, VersionStorageInfo* vstorage,
  544. LogBuffer* log_buffer, bool require_max_output_level) {
  545. UniversalCompactionBuilder builder(
  546. ioptions_, icmp_, cf_name, mutable_cf_options, mutable_db_options,
  547. existing_snapshots, snapshot_checker, vstorage, this, log_buffer,
  548. require_max_output_level);
  549. return builder.PickCompaction();
  550. }
  551. void UniversalCompactionBuilder::SortedRun::Dump(char* out_buf,
  552. size_t out_buf_size,
  553. bool print_path) const {
  554. if (level == 0) {
  555. assert(file != nullptr);
  556. if (file->fd.GetPathId() == 0 || !print_path) {
  557. snprintf(out_buf, out_buf_size, "file %" PRIu64, file->fd.GetNumber());
  558. } else {
  559. snprintf(out_buf, out_buf_size,
  560. "file %" PRIu64
  561. "(path "
  562. "%" PRIu32 ")",
  563. file->fd.GetNumber(), file->fd.GetPathId());
  564. }
  565. } else {
  566. snprintf(out_buf, out_buf_size, "level %d", level);
  567. }
  568. }
  569. void UniversalCompactionBuilder::SortedRun::DumpSizeInfo(
  570. char* out_buf, size_t out_buf_size, size_t sorted_run_count) const {
  571. if (level == 0) {
  572. assert(file != nullptr);
  573. snprintf(out_buf, out_buf_size,
  574. "file %" PRIu64 "[%" ROCKSDB_PRIszt
  575. "] "
  576. "with size %" PRIu64 " (compensated size %" PRIu64 ")",
  577. file->fd.GetNumber(), sorted_run_count, file->fd.GetFileSize(),
  578. file->compensated_file_size);
  579. } else {
  580. snprintf(out_buf, out_buf_size,
  581. "level %d[%" ROCKSDB_PRIszt
  582. "] "
  583. "with size %" PRIu64 " (compensated size %" PRIu64 ")",
  584. level, sorted_run_count, size, compensated_file_size);
  585. }
  586. }
  587. std::vector<UniversalCompactionBuilder::SortedRun>
  588. UniversalCompactionBuilder::CalculateSortedRuns(
  589. const VersionStorageInfo& vstorage, int last_level,
  590. uint64_t* max_run_size) {
  591. assert(max_run_size);
  592. *max_run_size = 0;
  593. std::vector<UniversalCompactionBuilder::SortedRun> ret;
  594. for (FileMetaData* f : vstorage.LevelFiles(0)) {
  595. if (earliest_snapshot_.has_value() && f->marked_for_compaction) {
  596. file_marked_for_compaction_to_sorted_run_index_.emplace(f->fd.GetNumber(),
  597. ret.size());
  598. }
  599. ret.emplace_back(
  600. 0, f, f->fd.GetFileSize(), f->compensated_file_size, f->being_compacted,
  601. f->marked_for_compaction && f->FileIsStandAloneRangeTombstone());
  602. *max_run_size = std::max(*max_run_size, f->fd.GetFileSize());
  603. }
  604. for (int level = 1; level <= last_level; level++) {
  605. uint64_t total_compensated_size = 0U;
  606. uint64_t total_size = 0U;
  607. bool being_compacted = false;
  608. bool level_has_marked_standalone_rangedel = false;
  609. for (FileMetaData* f : vstorage.LevelFiles(level)) {
  610. total_compensated_size += f->compensated_file_size;
  611. total_size += f->fd.GetFileSize();
  612. // Size amp, read amp and periodic compactions always include all files
  613. // for a non-zero level. However, a delete triggered compaction and
  614. // a trivial move might pick a subset of files in a sorted run. So
  615. // always check all files in a sorted run and mark the entire run as
  616. // being compacted if one or more files are being compacted
  617. if (f->being_compacted) {
  618. being_compacted = f->being_compacted;
  619. }
  620. level_has_marked_standalone_rangedel =
  621. level_has_marked_standalone_rangedel ||
  622. (f->marked_for_compaction && f->FileIsStandAloneRangeTombstone());
  623. if (earliest_snapshot_.has_value() && f->marked_for_compaction) {
  624. file_marked_for_compaction_to_sorted_run_index_.emplace(
  625. f->fd.GetNumber(), ret.size());
  626. }
  627. }
  628. if (total_compensated_size > 0) {
  629. ret.emplace_back(level, nullptr, total_size, total_compensated_size,
  630. being_compacted, level_has_marked_standalone_rangedel);
  631. }
  632. *max_run_size = std::max(*max_run_size, total_size);
  633. }
  634. return ret;
  635. }
  636. bool UniversalCompactionBuilder::ShouldSkipMarkedFile(
  637. const FileMetaData* file) const {
  638. assert(file->marked_for_compaction);
  639. if (!earliest_snapshot_.has_value()) {
  640. return false;
  641. }
  642. if (!file->FileIsStandAloneRangeTombstone()) {
  643. return false;
  644. }
  645. // Skip until the earliest snapshot advances to or above this standalone
  646. // range tombstone file. `DB::ReleaseSnapshot` will re-examine and schedule
  647. // compaction for it.
  648. if (!DataIsDefinitelyInSnapshot(file->fd.largest_seqno,
  649. earliest_snapshot_.value(),
  650. snapshot_checker_)) {
  651. return true;
  652. }
  653. auto iter = file_marked_for_compaction_to_sorted_run_index_.find(
  654. file->fd.GetNumber());
  655. assert(iter != file_marked_for_compaction_to_sorted_run_index_.end());
  656. size_t idx = iter->second;
  657. const SortedRun* succeeding_sorted_run =
  658. idx < sorted_runs_.size() - 1 ? &sorted_runs_[idx + 1] : nullptr;
  659. // A marked standalone range tombstone file is best used if it's in the
  660. // start input level. Skip to let that compaction happen first.
  661. if (succeeding_sorted_run &&
  662. succeeding_sorted_run->level_has_marked_standalone_rangedel) {
  663. return true;
  664. }
  665. return false;
  666. }
  667. // Universal style of compaction. Pick files that are contiguous in
  668. // time-range to compact.
  669. Compaction* UniversalCompactionBuilder::PickCompaction() {
  670. const int kLevel0 = 0;
  671. score_ = vstorage_->CompactionScore(kLevel0);
  672. const int max_output_level = vstorage_->MaxOutputLevel(allow_ingest_behind_);
  673. const int file_num_compaction_trigger =
  674. mutable_cf_options_.level0_file_num_compaction_trigger;
  675. const unsigned int ratio =
  676. mutable_cf_options_.compaction_options_universal.size_ratio;
  677. if (max_output_level == 0 &&
  678. !MeetsOutputLevelRequirements(0 /* output_level */)) {
  679. return nullptr;
  680. }
  681. max_run_size_ = 0;
  682. sorted_runs_ =
  683. CalculateSortedRuns(*vstorage_, max_output_level, &max_run_size_);
  684. if (sorted_runs_.size() == 0 ||
  685. (vstorage_->FilesMarkedForPeriodicCompaction().empty() &&
  686. vstorage_->FilesMarkedForCompaction().empty() &&
  687. sorted_runs_.size() < (unsigned int)file_num_compaction_trigger)) {
  688. ROCKS_LOG_BUFFER(log_buffer_, "[%s] Universal: nothing to do\n",
  689. cf_name_.c_str());
  690. TEST_SYNC_POINT_CALLBACK(
  691. "UniversalCompactionBuilder::PickCompaction:Return", nullptr);
  692. return nullptr;
  693. }
  694. VersionStorageInfo::LevelSummaryStorage tmp;
  695. ROCKS_LOG_BUFFER_MAX_SZ(
  696. log_buffer_, 3072,
  697. "[%s] Universal: sorted runs: %" ROCKSDB_PRIszt " files: %s\n",
  698. cf_name_.c_str(), sorted_runs_.size(), vstorage_->LevelSummary(&tmp));
  699. Compaction* c = nullptr;
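// Try each compaction strategy in priority order: periodic, size amp, size
// ratio, sorted run count, then delete triggered. Each MaybePick* helper
// returns the previously picked compaction unchanged if one was already
// chosen, so only the first successful strategy takes effect.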
  700. c = MaybePickPeriodicCompaction(c);
  701. c = MaybePickSizeAmpCompaction(c, file_num_compaction_trigger);
  702. c = MaybePickCompactionToReduceSortedRunsBasedFileRatio(
  703. c, file_num_compaction_trigger, ratio);
  704. c = MaybePickCompactionToReduceSortedRuns(c, file_num_compaction_trigger,
  705. ratio);
  706. c = MaybePickDeleteTriggeredCompaction(c);
  707. if (c == nullptr) {
  708. TEST_SYNC_POINT_CALLBACK(
  709. "UniversalCompactionBuilder::PickCompaction:Return", nullptr);
  710. return nullptr;
  711. }
  712. assert(c->output_level() <= vstorage_->MaxOutputLevel(allow_ingest_behind_));
  713. assert(MeetsOutputLevelRequirements(c->output_level()));
  714. if (mutable_cf_options_.compaction_options_universal.allow_trivial_move ==
  715. true &&
  716. c->compaction_reason() != CompactionReason::kPeriodicCompaction) {
  717. c->set_is_trivial_move(IsInputFilesNonOverlapping(c));
  718. }
  719. // validate that all the chosen files of L0 are non overlapping in time
  720. #ifndef NDEBUG
  721. bool is_first = true;
  722. size_t level_index = 0U;
  723. if (c->start_level() == 0) {
  724. for (auto f : *c->inputs(0)) {
  725. assert(f->fd.smallest_seqno <= f->fd.largest_seqno);
  726. if (is_first) {
  727. is_first = false;
  728. }
  729. }
  730. level_index = 1U;
  731. }
  732. for (; level_index < c->num_input_levels(); level_index++) {
  733. if (c->num_input_files(level_index) != 0) {
  734. SequenceNumber smallest_seqno = 0U;
  735. SequenceNumber largest_seqno = 0U;
  736. GetSmallestLargestSeqno(*(c->inputs(level_index)), &smallest_seqno,
  737. &largest_seqno);
  738. if (is_first) {
  739. is_first = false;
  740. }
  741. }
  742. }
  743. #endif
  744. // update statistics
  745. size_t num_files = 0;
  746. for (auto& each_level : *c->inputs()) {
  747. num_files += each_level.files.size();
  748. }
  749. RecordInHistogram(ioptions_.stats, NUM_FILES_IN_SINGLE_COMPACTION, num_files);
  750. picker_->RegisterCompaction(c);
  751. vstorage_->ComputeCompactionScore(ioptions_, mutable_cf_options_);
  752. TEST_SYNC_POINT_CALLBACK("UniversalCompactionBuilder::PickCompaction:Return",
  753. c);
  754. return c;
  755. }
  756. uint32_t UniversalCompactionBuilder::GetPathId(
  757. const ImmutableCFOptions& ioptions,
  758. const MutableCFOptions& mutable_cf_options, uint64_t file_size) {
  759. // Two conditions need to be satisfied:
  760. // (1) the target path needs to be able to hold the file's size
  761. // (2) the total size left in this and previous paths needs to be no
  762. // smaller than the expected future file size before this new file is
  763. // compacted, which is estimated based on size_ratio.
  764. // For example, if now we are compacting files of size (1, 1, 2, 4, 8),
  765. // we will make sure the target file, probably with size of 16, will be
  766. // placed in a path so that eventually when new files are generated and
  767. // compacted to (1, 1, 2, 4, 8, 16), all those files can be stored in or
  768. // before the path we chose.
  769. //
  770. // TODO(sdong): the case of multiple column families is currently not
  771. // considered in this algorithm, so the target size can be violated in
  772. // that case. We need to improve it.
  773. uint64_t accumulated_size = 0;
  774. uint64_t future_size =
  775. file_size *
  776. (100 - mutable_cf_options.compaction_options_universal.size_ratio) / 100;
  777. uint32_t p = 0;
  778. assert(!ioptions.cf_paths.empty());
  779. for (; p < ioptions.cf_paths.size() - 1; p++) {
  780. uint64_t target_size = ioptions.cf_paths[p].target_size;
  781. if (target_size > file_size &&
  782. accumulated_size + (target_size - file_size) > future_size) {
  783. return p;
  784. }
  785. accumulated_size += target_size;
  786. }
  787. return p;
  788. }
  789. //
  790. // Consider compaction files based on their size differences with
  791. // the next file in time order.
  792. //
  793. Compaction* UniversalCompactionBuilder::PickCompactionToReduceSortedRuns(
  794. unsigned int ratio, unsigned int max_number_of_files_to_compact) {
  795. unsigned int min_merge_width =
  796. mutable_cf_options_.compaction_options_universal.min_merge_width;
  797. unsigned int max_merge_width =
  798. mutable_cf_options_.compaction_options_universal.max_merge_width;
  799. const SortedRun* sr = nullptr;
  800. bool done = false;
  801. size_t start_index = 0;
  802. unsigned int candidate_count = 0;
  803. unsigned int max_files_to_compact =
  804. std::min(max_merge_width, max_number_of_files_to_compact);
  805. min_merge_width = std::max(min_merge_width, 2U);
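// Never merge fewer than two sorted runs, and never pick more than either
// max_merge_width or the caller-provided max_number_of_files_to_compact.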
  806. // Caller checks the size before executing this function. This invariant is
  807. // important because otherwise we may have a possible integer underflow when
  808. // dealing with unsigned types.
  809. assert(sorted_runs_.size() > 0);
  810. // Considers a candidate file only if it is smaller than the
  811. // total size accumulated so far.
  812. for (size_t loop = 0; loop < sorted_runs_.size(); loop++) {
  813. candidate_count = 0;
  814. // Skip files that are already being compacted
  815. for (sr = nullptr; loop < sorted_runs_.size(); loop++) {
  816. sr = &sorted_runs_[loop];
  817. if (!sr->being_compacted && !sr->level_has_marked_standalone_rangedel) {
  818. candidate_count = 1;
  819. break;
  820. }
  821. char file_num_buf[kFormatFileNumberBufSize];
  822. sr->Dump(file_num_buf, sizeof(file_num_buf));
  823. if (sr->being_compacted) {
  824. ROCKS_LOG_BUFFER(log_buffer_,
  825. "[%s] Universal: %s"
  826. "[%d] being compacted, skipping for compaction to "
  827. "reduce sorted runs",
  828. cf_name_.c_str(), file_num_buf, loop);
  829. } else if (sr->level_has_marked_standalone_rangedel) {
  830. ROCKS_LOG_BUFFER(
  831. log_buffer_,
  832. "[%s] Universal: %s"
  833. "[%d] has standalone range tombstone files marked for "
  834. "compaction, skipping for compaction to reduce sorted runs",
  835. cf_name_.c_str(), file_num_buf, loop);
  836. }
  837. sr = nullptr;
  838. }
  839. // This file is not being compacted. Consider it as the
  840. // first candidate to be compacted.
  841. uint64_t candidate_size = sr != nullptr ? sr->compensated_file_size : 0;
  842. if (sr != nullptr) {
  843. char file_num_buf[kFormatFileNumberBufSize];
  844. sr->Dump(file_num_buf, sizeof(file_num_buf), true);
  845. ROCKS_LOG_BUFFER(log_buffer_,
  846. "[%s] Universal: Possible candidate for compaction to "
  847. "reduce sorted runs %s[%d].",
  848. cf_name_.c_str(), file_num_buf, loop);
  849. }
  850. // Check if the succeeding files need compaction.
  851. for (size_t i = loop + 1;
  852. candidate_count < max_files_to_compact && i < sorted_runs_.size();
  853. i++) {
  854. const SortedRun* succeeding_sr = &sorted_runs_[i];
  855. if (succeeding_sr->being_compacted ||
  856. succeeding_sr->level_has_marked_standalone_rangedel) {
  857. break;
  858. }
  859. // Pick files if the total/last candidate file size (increased by the
  860. // specified ratio) is still larger than the next candidate file.
  861. // candidate_size is the total size of files picked so far with the
  862. // default kCompactionStopStyleTotalSize; with
  863. // kCompactionStopStyleSimilarSize, it's simply the size of the last
  864. // picked file.
  865. double sz = candidate_size * (100.0 + ratio) / 100.0;
  866. if (sz < static_cast<double>(succeeding_sr->size)) {
  867. break;
  868. }
  869. if (mutable_cf_options_.compaction_options_universal.stop_style ==
  870. kCompactionStopStyleSimilarSize) {
  871. // Similar-size stopping rule: also check the last picked file isn't
  872. // far larger than the next candidate file.
  873. sz = (succeeding_sr->size * (100.0 + ratio)) / 100.0;
  874. if (sz < static_cast<double>(candidate_size)) {
  875. // If the small file we've encountered begins a run of similar-size
  876. // files, we'll pick them up on a future iteration of the outer
  877. // loop. If it's some lonely straggler, it'll eventually get picked
  878. // by the last-resort read amp strategy which disregards size ratios.
  879. break;
  880. }
  881. candidate_size = succeeding_sr->compensated_file_size;
  882. } else { // default kCompactionStopStyleTotalSize
  883. candidate_size += succeeding_sr->compensated_file_size;
  884. }
  885. candidate_count++;
  886. }
  887. // Found a series of consecutive files that need compaction.
  888. if (candidate_count >= (unsigned int)min_merge_width) {
  889. start_index = loop;
  890. done = true;
  891. break;
  892. } else {
  893. for (size_t i = loop;
  894. i < loop + candidate_count && i < sorted_runs_.size(); i++) {
  895. const SortedRun* skipping_sr = &sorted_runs_[i];
  896. char file_num_buf[256];
  897. skipping_sr->DumpSizeInfo(file_num_buf, sizeof(file_num_buf), loop);
  898. ROCKS_LOG_BUFFER(log_buffer_, "[%s] Universal: Skipping %s",
  899. cf_name_.c_str(), file_num_buf);
  900. }
  901. }
  902. }
  903. if (!done || candidate_count <= 1) {
  904. return nullptr;
  905. }
  906. size_t first_index_after = start_index + candidate_count;
  907. // Compression is disabled for this compaction if the older files not being
  908. // compacted here already account for compression_size_percent of the data.
  909. bool enable_compression = true;
  910. int ratio_to_compress =
  911. mutable_cf_options_.compaction_options_universal.compression_size_percent;
  912. if (ratio_to_compress >= 0) {
  913. uint64_t total_size = 0;
  914. for (auto& sorted_run : sorted_runs_) {
  915. total_size += sorted_run.compensated_file_size;
  916. }
  917. uint64_t older_file_size = 0;
  918. for (size_t i = sorted_runs_.size() - 1; i >= first_index_after; i--) {
  919. older_file_size += sorted_runs_[i].size;
  920. if (older_file_size * 100L >= total_size * (long)ratio_to_compress) {
  921. enable_compression = false;
  922. break;
  923. }
  924. }
  925. }
  926. uint64_t estimated_total_size = 0;
  927. for (unsigned int i = 0; i < first_index_after; i++) {
  928. estimated_total_size += sorted_runs_[i].size;
  929. }
  930. uint32_t path_id =
  931. GetPathId(ioptions_, mutable_cf_options_, estimated_total_size);
  932. int start_level = sorted_runs_[start_index].level;
  933. int output_level;
  934. // last level is reserved for the files ingested behind
  935. int max_output_level = vstorage_->MaxOutputLevel(allow_ingest_behind_);
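// Determine the output level: if every sorted run was picked, output to the
// max output level; otherwise output just above the next older sorted run
// (or to L0 if that next run is itself an L0 file).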
  936. if (first_index_after == sorted_runs_.size()) {
  937. output_level = max_output_level;
  938. } else if (sorted_runs_[first_index_after].level == 0) {
  939. output_level = 0;
  940. } else {
  941. output_level = sorted_runs_[first_index_after].level - 1;
  942. }
  943. if (!MeetsOutputLevelRequirements(output_level)) {
  944. return nullptr;
  945. }
  946. std::vector<CompactionInputFiles> inputs(max_output_level + 1);
  947. for (size_t i = 0; i < inputs.size(); ++i) {
  948. inputs[i].level = start_level + static_cast<int>(i);
  949. }
  950. for (size_t i = start_index; i < first_index_after; i++) {
  951. auto& picking_sr = sorted_runs_[i];
  952. if (picking_sr.level == 0) {
  953. FileMetaData* picking_file = picking_sr.file;
  954. inputs[0].files.push_back(picking_file);
  955. } else {
  956. auto& files = inputs[picking_sr.level - start_level].files;
  957. for (auto* f : vstorage_->LevelFiles(picking_sr.level)) {
  958. files.push_back(f);
  959. }
  960. }
  961. char file_num_buf[256];
  962. picking_sr.DumpSizeInfo(file_num_buf, sizeof(file_num_buf), i);
  963. ROCKS_LOG_BUFFER(log_buffer_, "[%s] Universal: Picking %s",
  964. cf_name_.c_str(), file_num_buf);
  965. }
  966. std::vector<FileMetaData*> grandparents;
  967. // Include grandparents for potential file cutting in incremental
  968. // mode. It is for aligning file cutting boundaries across levels,
  969. // so that subsequent compactions can pick files with aligned
  970. // boundaries.
  971. // Single files are only picked up in incremental mode, so there
  972. // is no need to cover the full key range.
  973. if (mutable_cf_options_.compaction_options_universal.incremental &&
  974. first_index_after < sorted_runs_.size() &&
  975. sorted_runs_[first_index_after].level > 1) {
  976. grandparents = vstorage_->LevelFiles(sorted_runs_[first_index_after].level);
  977. }
  978. if (output_level != 0 && picker_->FilesRangeOverlapWithCompaction(
  979. inputs, output_level,
  980. Compaction::EvaluateProximalLevel(
  981. vstorage_, mutable_cf_options_, ioptions_,
  982. start_level, output_level))) {
  983. return nullptr;
  984. }
  985. CompactionReason compaction_reason;
  986. if (max_number_of_files_to_compact == UINT_MAX) {
  987. compaction_reason = CompactionReason::kUniversalSizeRatio;
  988. } else {
  989. compaction_reason = CompactionReason::kUniversalSortedRunNum;
  990. }
  991. return new Compaction(vstorage_, ioptions_, mutable_cf_options_,
  992. mutable_db_options_, std::move(inputs), output_level,
  993. MaxFileSizeForLevel(mutable_cf_options_, output_level,
  994. kCompactionStyleUniversal),
  995. GetMaxOverlappingBytes(), path_id,
  996. GetCompressionType(vstorage_, mutable_cf_options_,
  997. output_level, 1, enable_compression),
  998. GetCompressionOptions(mutable_cf_options_, vstorage_,
  999. output_level, enable_compression),
  1000. Temperature::kUnknown,
  1001. /* max_subcompactions */ 0, grandparents,
  1002. /* earliest_snapshot */ std::nullopt,
  1003. /* snapshot_checker */ nullptr, compaction_reason,
  1004. /* trim_ts */ "", score_,
  1005. /* l0_files_might_overlap */ true);
  1006. }
  1007. // Look at overall size amplification. If size amplification
  1008. // exceeds the configured value, then do a compaction
  1009. // on the longest span of candidate files that does not conflict with other
  1010. // compactions and ends at the earliest base file (overriding configured
  1011. // values of file-size ratios, min_merge_width and max_merge_width).
  1012. Compaction* UniversalCompactionBuilder::PickCompactionToReduceSizeAmp() {
  1013. assert(!sorted_runs_.empty());
  1014. const size_t end_index = ShouldSkipLastSortedRunForSizeAmpCompaction()
  1015. ? sorted_runs_.size() - 2
  1016. : sorted_runs_.size() - 1;
  1017. if (sorted_runs_[end_index].being_compacted ||
  1018. sorted_runs_[end_index].level_has_marked_standalone_rangedel) {
  1019. return nullptr;
  1020. }
  1021. const uint64_t base_sr_size = sorted_runs_[end_index].size;
  1022. size_t start_index = end_index;
  1023. uint64_t candidate_size = 0;
  1024. size_t num_l0_files = 0;
  1025. // Get the longest span (i.e., [start_index, end_index]) of available sorted runs
  1026. while (start_index > 0) {
  1027. const SortedRun* sr = &sorted_runs_[start_index - 1];
  1028. if (sr->being_compacted || sr->level_has_marked_standalone_rangedel) {
  1029. char file_num_buf[kFormatFileNumberBufSize];
  1030. sr->Dump(file_num_buf, sizeof(file_num_buf), true);
  1031. if (sr->being_compacted) {
  1032. ROCKS_LOG_BUFFER(log_buffer_,
  1033. "[%s] Universal: stopping for size amp compaction at "
  1034. "sorted run undergoing compaction: "
  1035. "%s[%" ROCKSDB_PRIszt "]",
  1036. cf_name_.c_str(), file_num_buf, start_index - 1);
  1037. } else if (sr->level_has_marked_standalone_rangedel) {
  1038. ROCKS_LOG_BUFFER(log_buffer_,
  1039. "[%s] Universal: stopping for size amp compaction at "
  1040. "sorted run that has "
  1041. "standalone range "
  1042. "tombstone files marked for compaction: "
  1043. "%s[%" ROCKSDB_PRIszt "]",
  1044. cf_name_.c_str(), file_num_buf, start_index - 1);
  1045. }
  1046. break;
  1047. }
  1048. candidate_size += sr->compensated_file_size;
  1049. num_l0_files += sr->level == 0 ? 1 : 0;
  1050. --start_index;
  1051. }
  1052. if (start_index == end_index) {
  1053. return nullptr;
  1054. }
  1055. {
  1056. const size_t num_l0_to_exclude = MightExcludeNewL0sToReduceWriteStop(
  1057. num_l0_files, end_index, start_index, candidate_size);
  1058. ROCKS_LOG_BUFFER(
  1059. log_buffer_,
  1060. "[%s] Universal: Excluding for size amp compaction %" ROCKSDB_PRIszt
  1061. " latest L0 files to reduce potential write stop "
  1062. "triggered by `level0_stop_writes_trigger`",
  1063. cf_name_.c_str(), num_l0_to_exclude);
  1064. }
  1065. {
  1066. char file_num_buf[kFormatFileNumberBufSize];
  1067. sorted_runs_[start_index].Dump(file_num_buf, sizeof(file_num_buf), true);
  1068. ROCKS_LOG_BUFFER(
  1069. log_buffer_,
  1070. "[%s] Universal: First candidate %s[%" ROCKSDB_PRIszt "] %s",
  1071. cf_name_.c_str(), file_num_buf, start_index, " to reduce size amp.\n");
  1072. }
  1073. // percentage flexibility while reducing size amplification
  1074. const uint64_t ratio = mutable_cf_options_.compaction_options_universal
  1075. .max_size_amplification_percent;
  1076. // size amplification = percentage of additional size
  1077. if (candidate_size * 100 < ratio * base_sr_size) {
  1078. ROCKS_LOG_BUFFER(log_buffer_,
  1079. "[%s] Universal: size amp compction not needed. "
  1080. "newer-files-total-size %" PRIu64
  1081. " earliest-file-size %" PRIu64,
  1082. cf_name_.c_str(), candidate_size, base_sr_size);
  1083. return nullptr;
  1084. } else {
  1085. ROCKS_LOG_BUFFER(log_buffer_,
  1086. "[%s] Universal: size amp compaction needed. "
  1087. "newer-files-total-size %" PRIu64
  1088. " earliest-file-size %" PRIu64,
  1089. cf_name_.c_str(), candidate_size, base_sr_size);
  1090. }
  1091. // Since incremental compaction can't include more than the second last
  1092. // level, it can introduce a penalty compared to full compaction. We
  1093. // hard code the penalty to be 80%. If we end up with a compaction
  1094. // fanout higher than 80% of full level compactions, we fall back
  1095. // to full level compaction.
  1096. // The 80% threshold is arbitrary and can be adjusted or made
  1097. // configurable in the future.
  1098. // This also prevents the case when compaction falls behind and we
  1099. // need to compact more levels for compactions to catch up.
  1100. if (mutable_cf_options_.compaction_options_universal.incremental) {
  1101. double fanout_threshold = static_cast<double>(base_sr_size) /
  1102. static_cast<double>(candidate_size) * 1.8;
  1103. Compaction* picked = PickIncrementalForReduceSizeAmp(fanout_threshold);
  1104. if (picked != nullptr) {
  1105. // As the feature is still incremental, picking incremental compaction
  1106. // might fail and we will fall back to compacting the full level.
  1107. return picked;
  1108. }
  1109. }
  1110. return PickCompactionWithSortedRunRange(
  1111. start_index, end_index, CompactionReason::kUniversalSizeAmplification);
  1112. }
  1113. Compaction* UniversalCompactionBuilder::PickIncrementalForReduceSizeAmp(
  1114. double fanout_threshold) {
  1115. // Try to find all potential compactions with total size just over
  1116. // options.max_compaction_size / 2, and take the one with the lowest
  1117. // fanout (defined in declaration of the function).
  1118. // This is done by having a sliding window of the files at the second
  1119. // lowest level, and keep expanding while finding overlapping in the
  1120. // last level. Once total size exceeds the size threshold, calculate
  1121. // the fanout value. And then shrinking from the small side of the
  1122. // window. Keep doing it until the end.
  1123. // Finally, we try to include upper level files if they fall into
  1124. // the range.
  1125. //
  1126. // Note that it is a similar problem as leveled compaction's
  1127. // kMinOverlappingRatio priority, but instead of picking single files
  1128. // we expand to a target compaction size. The reason is that in
  1129. // leveled compaction, actual fanout value tends to high, e.g. 10, so
  1130. // even with single file in down merging level, the extra size
  1131. // compacted in boundary files is at a lower ratio. But here users
  1132. // often have size of second last level size to be 1/4, 1/3 or even
  1133. // 1/2 of the bottommost level, so picking single file in second most
  1134. // level will cause significant waste, which is not desirable.
  1135. //
  1136. // This algorithm has lots of room to improve to pick more efficient
  1137. // compactions.
  1138. assert(sorted_runs_.size() >= 2);
  1139. int second_last_level = sorted_runs_[sorted_runs_.size() - 2].level;
  1140. if (second_last_level == 0) {
  1141. // Can't split Level 0.
  1142. return nullptr;
  1143. }
  1144. int output_level = sorted_runs_.back().level;
  1145. const std::vector<FileMetaData*>& bottom_files =
  1146. vstorage_->LevelFiles(output_level);
  1147. const std::vector<FileMetaData*>& files =
  1148. vstorage_->LevelFiles(second_last_level);
  1149. assert(!bottom_files.empty());
  1150. assert(!files.empty());
  1151. // std::unordered_map<uint64_t, uint64_t> file_to_order;
  1152. int picked_start_idx = 0;
  1153. int picked_end_idx = 0;
  1154. double picked_fanout = fanout_threshold;
  1155. // Use half target compaction bytes as anchor to stop growing second most
  1156. // level files, and reserve growing space for more overlapping bottom level,
  1157. // clean cut, files from other levels, etc.
  1158. uint64_t comp_thres_size = mutable_cf_options_.max_compaction_bytes / 2;
  1159. int start_idx = 0;
  1160. int bottom_end_idx = 0;
  1161. int bottom_start_idx = 0;
  1162. uint64_t non_bottom_size = 0;
  1163. uint64_t bottom_size = 0;
  1164. bool end_bottom_size_counted = false;
  1165. for (int end_idx = 0; end_idx < static_cast<int>(files.size()); end_idx++) {
  1166. FileMetaData* end_file = files[end_idx];
  1167. // Include bottom most level files smaller than the current second
  1168. // last level file.
  1169. int num_skipped = 0;
  1170. while (bottom_end_idx < static_cast<int>(bottom_files.size()) &&
  1171. icmp_->Compare(bottom_files[bottom_end_idx]->largest,
  1172. end_file->smallest) < 0) {
  1173. if (!end_bottom_size_counted) {
  1174. bottom_size += bottom_files[bottom_end_idx]->fd.file_size;
  1175. }
  1176. bottom_end_idx++;
  1177. end_bottom_size_counted = false;
  1178. num_skipped++;
  1179. }
  1180. if (num_skipped > 1) {
  1181. // At least a file in the bottom most level falls into the file gap. No
  1182. // reason to include the file. We cut the range and start a new sliding
  1183. // window.
  1184. start_idx = end_idx;
  1185. }
  1186. if (start_idx == end_idx) {
  1187. // new sliding window.
  1188. non_bottom_size = 0;
  1189. bottom_size = 0;
  1190. bottom_start_idx = bottom_end_idx;
  1191. end_bottom_size_counted = false;
  1192. }
  1193. non_bottom_size += end_file->fd.file_size;
  1194. // Include all overlapping files in bottom level.
  1195. while (bottom_end_idx < static_cast<int>(bottom_files.size()) &&
  1196. icmp_->Compare(bottom_files[bottom_end_idx]->smallest,
  1197. end_file->largest) < 0) {
  1198. if (!end_bottom_size_counted) {
  1199. bottom_size += bottom_files[bottom_end_idx]->fd.file_size;
  1200. end_bottom_size_counted = true;
  1201. }
  1202. if (icmp_->Compare(bottom_files[bottom_end_idx]->largest,
  1203. end_file->largest) > 0) {
  1204. // next level file cross large boundary of current file.
  1205. break;
  1206. }
  1207. bottom_end_idx++;
  1208. end_bottom_size_counted = false;
  1209. }
  1210. if ((non_bottom_size + bottom_size > comp_thres_size ||
  1211. end_idx == static_cast<int>(files.size()) - 1) &&
  1212. non_bottom_size > 0) { // Do we alow 0 size file at all?
  1213. // If it is a better compaction, remember it in picked* variables.
  1214. double fanout = static_cast<double>(bottom_size) /
  1215. static_cast<double>(non_bottom_size);
  1216. if (fanout < picked_fanout) {
  1217. picked_start_idx = start_idx;
  1218. picked_end_idx = end_idx;
  1219. picked_fanout = fanout;
  1220. }
  1221. // Shrink from the start end to under comp_thres_size
  1222. while (non_bottom_size + bottom_size > comp_thres_size &&
  1223. start_idx <= end_idx) {
  1224. non_bottom_size -= files[start_idx]->fd.file_size;
  1225. start_idx++;
  1226. if (start_idx < static_cast<int>(files.size())) {
  1227. while (bottom_start_idx <= bottom_end_idx &&
  1228. icmp_->Compare(bottom_files[bottom_start_idx]->largest,
  1229. files[start_idx]->smallest) < 0) {
  1230. bottom_size -= bottom_files[bottom_start_idx]->fd.file_size;
  1231. bottom_start_idx++;
  1232. }
  1233. }
  1234. }
  1235. }
  1236. }
  1237. if (picked_fanout >= fanout_threshold) {
  1238. assert(picked_fanout == fanout_threshold);
  1239. return nullptr;
  1240. }
  1241. std::vector<CompactionInputFiles> inputs;
  1242. CompactionInputFiles bottom_level_inputs;
  1243. CompactionInputFiles second_last_level_inputs;
  1244. second_last_level_inputs.level = second_last_level;
  1245. bottom_level_inputs.level = output_level;
  1246. for (int i = picked_start_idx; i <= picked_end_idx; i++) {
  1247. if (files[i]->being_compacted) {
  1248. return nullptr;
  1249. }
  1250. second_last_level_inputs.files.push_back(files[i]);
  1251. }
  1252. assert(!second_last_level_inputs.empty());
  1253. if (!picker_->ExpandInputsToCleanCut(cf_name_, vstorage_,
  1254. &second_last_level_inputs,
  1255. /*next_smallest=*/nullptr)) {
  1256. return nullptr;
  1257. }
  1258. // We might be able to avoid this binary search if we save and expand
  1259. // from bottom_start_idx and bottom_end_idx, but for now, we use
  1260. // SetupOtherInputs() for simplicity.
  1261. int parent_index = -1; // Create and use bottom_start_idx?
  1262. if (!picker_->SetupOtherInputs(cf_name_, mutable_cf_options_, vstorage_,
  1263. &second_last_level_inputs,
  1264. &bottom_level_inputs, &parent_index,
  1265. /*base_index=*/-1)) {
  1266. return nullptr;
  1267. }
  1268. // Try to include files in upper levels if they fall into the range.
  1269. // Since we need to go from lower level up and this is in the reverse
  1270. // order, compared to level order, we first write to an reversed
  1271. // data structure and finally copy them to compaction inputs.
  1272. InternalKey smallest, largest;
  1273. picker_->GetRange(second_last_level_inputs, &smallest, &largest);
  1274. std::vector<CompactionInputFiles> inputs_reverse;
  1275. for (auto it = ++(++sorted_runs_.rbegin()); it != sorted_runs_.rend(); it++) {
  1276. SortedRun& sr = *it;
  1277. if (sr.level == 0) {
  1278. break;
  1279. }
  1280. std::vector<FileMetaData*> level_inputs;
  1281. vstorage_->GetCleanInputsWithinInterval(sr.level, &smallest, &largest,
  1282. &level_inputs);
  1283. if (!level_inputs.empty()) {
  1284. inputs_reverse.push_back({});
  1285. inputs_reverse.back().level = sr.level;
  1286. inputs_reverse.back().files = level_inputs;
  1287. picker_->GetRange(inputs_reverse.back(), &smallest, &largest);
  1288. }
  1289. }
  1290. for (auto it = inputs_reverse.rbegin(); it != inputs_reverse.rend(); it++) {
  1291. inputs.push_back(*it);
  1292. }
  1293. inputs.push_back(second_last_level_inputs);
  1294. inputs.push_back(bottom_level_inputs);
  1295. int start_level = Compaction::kInvalidLevel;
  1296. for (const auto& in : inputs) {
  1297. if (!in.empty()) {
  1298. // inputs should already be sorted by level
  1299. start_level = in.level;
  1300. break;
  1301. }
  1302. }
  1303. // intra L0 compactions outputs could have overlap
  1304. if (output_level != 0 && picker_->FilesRangeOverlapWithCompaction(
  1305. inputs, output_level,
  1306. Compaction::EvaluateProximalLevel(
  1307. vstorage_, mutable_cf_options_, ioptions_,
  1308. start_level, output_level))) {
  1309. return nullptr;
  1310. }
  1311. // TODO support multi paths?
  1312. uint32_t path_id = 0;
  1313. return new Compaction(
  1314. vstorage_, ioptions_, mutable_cf_options_, mutable_db_options_,
  1315. std::move(inputs), output_level,
  1316. MaxFileSizeForLevel(mutable_cf_options_, output_level,
  1317. kCompactionStyleUniversal),
  1318. GetMaxOverlappingBytes(), path_id,
  1319. GetCompressionType(vstorage_, mutable_cf_options_, output_level, 1,
  1320. true /* enable_compression */),
  1321. GetCompressionOptions(mutable_cf_options_, vstorage_, output_level,
  1322. true /* enable_compression */),
  1323. Temperature::kUnknown,
  1324. /* max_subcompactions */ 0, /* grandparents */ {},
  1325. /* earliest_snapshot */ std::nullopt,
  1326. /* snapshot_checker */ nullptr,
  1327. CompactionReason::kUniversalSizeAmplification,
  1328. /* trim_ts */ "", score_,
  1329. /* l0_files_might_overlap */ true);
  1330. }
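// Illustrative sketch (not part of the build): the window-shrinking idea
// above, reduced to plain numbers. Assumption (a simplification of the real
// code): each second-last-level file overlaps a disjoint, known amount of
// bottommost level data, so a window's fanout is simply sum(bottom) /
// sum(non_bottom) over the window. Key-range alignment, clean cuts and gap
// handling are omitted; PickWindow() is a hypothetical helper.
#if 0
#include <cstdint>
#include <cstdio>
#include <utility>
#include <vector>

struct FileSizes {
  uint64_t non_bottom;  // size of one second-last-level file
  uint64_t bottom;      // bottommost data overlapping only this file
};

// Returns {start, end} (inclusive) of the lowest-fanout window whose total
// size just exceeds comp_thres_size, or {-1, -1} if none beats the threshold.
static std::pair<int, int> PickWindow(const std::vector<FileSizes>& files,
                                      uint64_t comp_thres_size,
                                      double fanout_threshold) {
  int best_start = -1, best_end = -1;
  double best_fanout = fanout_threshold;
  uint64_t non_bottom = 0, bottom = 0;
  int start = 0;
  for (int end = 0; end < static_cast<int>(files.size()); end++) {
    non_bottom += files[end].non_bottom;
    bottom += files[end].bottom;
    if ((non_bottom + bottom > comp_thres_size ||
         end == static_cast<int>(files.size()) - 1) &&
        non_bottom > 0) {
      double fanout =
          static_cast<double>(bottom) / static_cast<double>(non_bottom);
      if (fanout < best_fanout) {
        best_fanout = fanout;
        best_start = start;
        best_end = end;
      }
      // Shrink from the start side until the window fits under the threshold.
      while (non_bottom + bottom > comp_thres_size && start <= end) {
        non_bottom -= files[start].non_bottom;
        bottom -= files[start].bottom;
        start++;
      }
    }
  }
  return {best_start, best_end};
}

int main() {
  std::vector<FileSizes> files = {{10, 40}, {20, 10}, {20, 15}, {10, 60}};
  auto [s, e] = PickWindow(files, /*comp_thres_size=*/60,
                           /*fanout_threshold=*/1.8);
  std::printf("picked [%d, %d]\n", s, e);  // prints: picked [1, 2]
  return 0;
}
#endif  // illustrative sketch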
// Pick files marked for compaction. Typically, files are marked by
// CompactOnDeleteCollector due to the presence of tombstones.
Compaction* UniversalCompactionBuilder::PickDeleteTriggeredCompaction() {
  CompactionInputFiles start_level_inputs;
  int output_level;
  std::vector<CompactionInputFiles> inputs;
  std::vector<FileMetaData*> grandparents;

  if (vstorage_->num_levels() == 1) {
    // This is single level universal. Since we're basically trying to reclaim
    // space by processing files marked for compaction due to high tombstone
    // density, let's do the same thing as compaction to reduce size amp which
    // has the same goals.
    int start_index = -1;

    start_level_inputs.level = 0;
    start_level_inputs.files.clear();
    output_level = 0;
    // Find the first file marked for compaction. Ignore the last file
    for (size_t loop = 0; loop + 1 < sorted_runs_.size(); loop++) {
      SortedRun* sr = &sorted_runs_[loop];
      if (sr->being_compacted) {
        continue;
      }
      FileMetaData* f = vstorage_->LevelFiles(0)[loop];
      if (f->marked_for_compaction && !ShouldSkipMarkedFile(f)) {
        start_level_inputs.files.push_back(f);
        start_index =
            static_cast<int>(loop);  // Consider this as the first candidate.
        break;
      }
    }
    if (start_index < 0) {
      // Either no file marked, or they're already being compacted
      return nullptr;
    }

    for (size_t loop = start_index + 1; loop < sorted_runs_.size(); loop++) {
      SortedRun* sr = &sorted_runs_[loop];
      if (sr->being_compacted || sr->level_has_marked_standalone_rangedel) {
        break;
      }

      FileMetaData* f = vstorage_->LevelFiles(0)[loop];
      start_level_inputs.files.push_back(f);
    }
    if (start_level_inputs.size() <= 1) {
      // If only the last file in L0 is marked for compaction, ignore it
      return nullptr;
    }
    inputs.push_back(start_level_inputs);
  } else {
    int start_level;

    // For multi-level universal, the strategy is to make this look more like
    // leveled. We pick one of the files marked for compaction and compact with
    // overlapping files in the adjacent level.
    picker_->PickFilesMarkedForCompaction(cf_name_, vstorage_, &start_level,
                                          &output_level, &start_level_inputs,
                                          [this](const FileMetaData* file) {
                                            return ShouldSkipMarkedFile(file);
                                          });
    if (start_level_inputs.empty()) {
      return nullptr;
    }

    int max_output_level = vstorage_->MaxOutputLevel(allow_ingest_behind_);
    // Pick the first non-empty level after the start_level
    for (output_level = start_level + 1; output_level <= max_output_level;
         output_level++) {
      if (vstorage_->NumLevelFiles(output_level) != 0) {
        break;
      }
    }
    // If all higher levels are empty, pick the highest level as output level
    if (output_level > max_output_level) {
      if (start_level == 0) {
        output_level = max_output_level;
      } else {
        // If start level is non-zero and all higher levels are empty, this
        // compaction will translate into a trivial move. Since the idea is
        // to reclaim space and trivial move doesn't help with that, we
        // skip compaction in this case and return nullptr
        return nullptr;
      }
    }
    assert(output_level <= max_output_level);

    if (!MeetsOutputLevelRequirements(output_level)) {
      return nullptr;
    }

    if (output_level != 0) {
      // For standalone range deletion, we don't want to compact it with newer
      // L0 files that it doesn't cover.
      const FileMetaData* starting_l0_file =
          (start_level == 0 && start_level_inputs.size() == 1 &&
           start_level_inputs.files[0]->FileIsStandAloneRangeTombstone())
              ? start_level_inputs.files[0]
              : nullptr;
      if (start_level == 0) {
        if (!picker_->GetOverlappingL0Files(vstorage_, &start_level_inputs,
                                            output_level, nullptr,
                                            starting_l0_file)) {
          return nullptr;
        }
      }

      CompactionInputFiles output_level_inputs;
      int parent_index = -1;

      output_level_inputs.level = output_level;
      if (!picker_->SetupOtherInputs(cf_name_, mutable_cf_options_, vstorage_,
                                     &start_level_inputs, &output_level_inputs,
                                     &parent_index, -1, false,
                                     starting_l0_file)) {
        return nullptr;
      }
      inputs.push_back(start_level_inputs);
      if (!output_level_inputs.empty()) {
        inputs.push_back(output_level_inputs);
      }

      if (picker_->FilesRangeOverlapWithCompaction(
              inputs, output_level,
              Compaction::EvaluateProximalLevel(vstorage_, mutable_cf_options_,
                                                ioptions_, start_level,
                                                output_level))) {
        return nullptr;
      }

      picker_->GetGrandparents(vstorage_, start_level_inputs,
                               output_level_inputs, &grandparents);
    } else {
      inputs.push_back(start_level_inputs);
    }
  }

  uint64_t estimated_total_size = 0;
  // Use size of the output level as estimated file size
  for (FileMetaData* f : vstorage_->LevelFiles(output_level)) {
    estimated_total_size += f->fd.GetFileSize();
  }
  uint32_t path_id =
      GetPathId(ioptions_, mutable_cf_options_, estimated_total_size);
  return new Compaction(
      vstorage_, ioptions_, mutable_cf_options_, mutable_db_options_,
      std::move(inputs), output_level,
      MaxFileSizeForLevel(mutable_cf_options_, output_level,
                          kCompactionStyleUniversal),
      /* max_grandparent_overlap_bytes */ GetMaxOverlappingBytes(), path_id,
      GetCompressionType(vstorage_, mutable_cf_options_, output_level, 1),
      GetCompressionOptions(mutable_cf_options_, vstorage_, output_level),
      Temperature::kUnknown,
      /* max_subcompactions */ 0, grandparents, earliest_snapshot_,
      snapshot_checker_, CompactionReason::kFilesMarkedForCompaction,
      /* trim_ts */ "", score_,
      /* l0_files_might_overlap */ true);
}
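// Illustrative sketch (not part of the build): how files typically end up
// marked for the delete-triggered path. A user installs a
// CompactOnDeletionCollector via the factory declared in
// rocksdb/utilities/table_properties_collectors.h; SST files whose tombstone
// density exceeds the trigger get marked_for_compaction set, and this picker
// turns them into kFilesMarkedForCompaction compactions. The trigger values
// and DB path below are arbitrary example choices.
#if 0
#include <rocksdb/db.h>
#include <rocksdb/options.h>
#include <rocksdb/utilities/table_properties_collectors.h>

int main() {
  ROCKSDB_NAMESPACE::Options options;
  options.create_if_missing = true;
  options.compaction_style = ROCKSDB_NAMESPACE::kCompactionStyleUniversal;
  // Mark an SST file for compaction if, within any sliding window of 128K
  // entries, at least 50K are deletions (or the deletion ratio reaches 40%).
  options.table_properties_collector_factories.emplace_back(
      ROCKSDB_NAMESPACE::NewCompactOnDeletionCollectorFactory(
          /*sliding_window_size=*/128 * 1024,
          /*deletion_trigger=*/50 * 1024,
          /*deletion_ratio=*/0.4));
  ROCKSDB_NAMESPACE::DB* db = nullptr;
  auto s = ROCKSDB_NAMESPACE::DB::Open(options, "/tmp/example_db", &db);
  if (s.ok()) {
    delete db;
  }
  return 0;
}
#endif  // illustrative sketch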
Compaction* UniversalCompactionBuilder::PickCompactionToOldest(
    size_t start_index, CompactionReason compaction_reason) {
  return PickCompactionWithSortedRunRange(start_index, sorted_runs_.size() - 1,
                                          compaction_reason);
}

Compaction* UniversalCompactionBuilder::PickCompactionWithSortedRunRange(
    size_t start_index, size_t end_index, CompactionReason compaction_reason) {
  assert(start_index < sorted_runs_.size());

  // Estimate total file size
  uint64_t estimated_total_size = 0;
  for (size_t loop = start_index; loop <= end_index; loop++) {
    estimated_total_size += sorted_runs_[loop].size;
  }
  uint32_t path_id =
      GetPathId(ioptions_, mutable_cf_options_, estimated_total_size);
  int start_level = sorted_runs_[start_index].level;

  int max_output_level = vstorage_->MaxOutputLevel(allow_ingest_behind_);
  std::vector<CompactionInputFiles> inputs(max_output_level + 1);
  for (size_t i = 0; i < inputs.size(); ++i) {
    inputs[i].level = start_level + static_cast<int>(i);
  }
  for (size_t loop = start_index; loop <= end_index; loop++) {
    auto& picking_sr = sorted_runs_[loop];
    if (picking_sr.level == 0) {
      FileMetaData* f = picking_sr.file;
      inputs[0].files.push_back(f);
    } else {
      auto& files = inputs[picking_sr.level - start_level].files;
      for (auto* f : vstorage_->LevelFiles(picking_sr.level)) {
        files.push_back(f);
      }
    }
    std::string comp_reason_print_string;
    if (compaction_reason == CompactionReason::kPeriodicCompaction) {
      comp_reason_print_string = "periodic compaction";
    } else if (compaction_reason ==
               CompactionReason::kUniversalSizeAmplification) {
      comp_reason_print_string = "size amp";
    } else {
      assert(false);
      comp_reason_print_string = "unknown: ";
      comp_reason_print_string.append(
          std::to_string(static_cast<int>(compaction_reason)));
    }

    char file_num_buf[256];
    picking_sr.DumpSizeInfo(file_num_buf, sizeof(file_num_buf), loop);
    ROCKS_LOG_BUFFER(log_buffer_, "[%s] Universal: %s picking %s",
                     cf_name_.c_str(), comp_reason_print_string.c_str(),
                     file_num_buf);
  }

  int output_level;
  if (end_index == sorted_runs_.size() - 1) {
    output_level = max_output_level;
  } else {
    // If the compaction does not include all sorted runs, it can only output
    // to the level above the sorted run at `end_index + 1`.
    output_level = sorted_runs_[end_index + 1].level - 1;
  }

  if (!MeetsOutputLevelRequirements(output_level)) {
    return nullptr;
  }

  // Intra-L0 compaction outputs could have overlap.
  if (output_level != 0 && picker_->FilesRangeOverlapWithCompaction(
                               inputs, output_level,
                               Compaction::EvaluateProximalLevel(
                                   vstorage_, mutable_cf_options_, ioptions_,
                                   start_level, output_level))) {
    return nullptr;
  }

  // We never check size for
  // compaction_options_universal.compression_size_percent,
  // because we always compact all the files, so we always compress.
  return new Compaction(
      vstorage_, ioptions_, mutable_cf_options_, mutable_db_options_,
      std::move(inputs), output_level,
      MaxFileSizeForLevel(mutable_cf_options_, output_level,
                          kCompactionStyleUniversal),
      GetMaxOverlappingBytes(), path_id,
      GetCompressionType(vstorage_, mutable_cf_options_, output_level, 1,
                         true /* enable_compression */),
      GetCompressionOptions(mutable_cf_options_, vstorage_, output_level,
                            true /* enable_compression */),
      Temperature::kUnknown,
      /* max_subcompactions */ 0, /* grandparents */ {},
      /* earliest_snapshot */ std::nullopt,
      /* snapshot_checker */ nullptr, compaction_reason,
      /* trim_ts */ "", score_,
      /* l0_files_might_overlap */ true);
}
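// Illustrative sketch (not part of the build): the output level rule used
// above, on plain ints. `run_levels` stands for the levels of sorted_runs_
// in age order (each L0 file contributes a 0 entry); OutputLevelForRange()
// is a hypothetical helper that restates the rule only.
#if 0
#include <cassert>
#include <vector>

static int OutputLevelForRange(const std::vector<int>& run_levels,
                               size_t end_index, int max_output_level) {
  assert(end_index < run_levels.size());
  if (end_index == run_levels.size() - 1) {
    // The compaction includes the oldest sorted run: output to the last level.
    return max_output_level;
  }
  // Otherwise output just above the next (older) sorted run, which the
  // compaction must not overtake.
  return run_levels[end_index + 1] - 1;
}

int main() {
  // Sorted runs: three L0 files, then L4, then L6 (levels 1-3 and 5 empty).
  std::vector<int> run_levels = {0, 0, 0, 4, 6};
  assert(OutputLevelForRange(run_levels, 2, /*max_output_level=*/6) == 3);
  assert(OutputLevelForRange(run_levels, 3, /*max_output_level=*/6) == 5);
  assert(OutputLevelForRange(run_levels, 4, /*max_output_level=*/6) == 6);
  return 0;
}
#endif  // illustrative sketch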
Compaction* UniversalCompactionBuilder::PickPeriodicCompaction() {
  ROCKS_LOG_BUFFER(log_buffer_, "[%s] Universal: Periodic Compaction",
                   cf_name_.c_str());

  // In universal compaction, sorted runs that contain older data are almost
  // always generated earlier too. To simplify the problem, we just try to
  // trigger a full compaction. We start from the oldest sorted run and
  // include all sorted runs, until we hit a sorted run that is already being
  // compacted. Since the largest (which is usually the oldest) sorted run is
  // usually included anyway, doing a full compaction won't increase write
  // amplification much.

  // Get some information from marked files to check whether a file is
  // included in the compaction.
  size_t start_index = sorted_runs_.size();
  while (start_index > 0 && !sorted_runs_[start_index - 1].being_compacted &&
         !sorted_runs_[start_index - 1].level_has_marked_standalone_rangedel) {
    start_index--;
  }
  if (start_index == sorted_runs_.size()) {
    return nullptr;
  }

  // There is a rare corner case where we can't pick up all the files
  // because some files are being compacted, and we end up picking files
  // none of which actually needs periodic compaction. To keep the logic
  // simple, we still execute such a compaction, unless it would merely
  // recompact the last sorted run (either the last level or the last L0
  // file); in that case, verify below that the run really covers a file
  // marked for periodic compaction.
  if (start_index == sorted_runs_.size() - 1) {
    bool included_file_marked = false;
    int start_level = sorted_runs_[start_index].level;
    FileMetaData* start_file = sorted_runs_[start_index].file;
    for (const std::pair<int, FileMetaData*>& level_file_pair :
         vstorage_->FilesMarkedForPeriodicCompaction()) {
      if (start_level != 0) {
        // The last sorted run is a level.
        if (start_level == level_file_pair.first) {
          included_file_marked = true;
          break;
        }
      } else {
        // The last sorted run is an L0 file.
        if (start_file == level_file_pair.second) {
          included_file_marked = true;
          break;
        }
      }
    }
    if (!included_file_marked) {
      ROCKS_LOG_BUFFER(log_buffer_,
                       "[%s] Universal: Cannot form a compaction covering file "
                       "marked for periodic compaction",
                       cf_name_.c_str());
      return nullptr;
    }
  }

  Compaction* c = PickCompactionToOldest(start_index,
                                         CompactionReason::kPeriodicCompaction);

  TEST_SYNC_POINT_CALLBACK(
      "UniversalCompactionPicker::PickPeriodicCompaction:Return", c);

  return c;
}
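// Illustrative sketch (not part of the build): the knob that feeds this path.
// Setting periodic_compaction_seconds makes files older than the threshold
// show up in FilesMarkedForPeriodicCompaction(), which PickPeriodicCompaction()
// then tries to cover with a (nearly) full compaction. The 30-day value and
// DB path are arbitrary example choices.
#if 0
#include <rocksdb/db.h>
#include <rocksdb/options.h>

int main() {
  ROCKSDB_NAMESPACE::Options options;
  options.create_if_missing = true;
  options.compaction_style = ROCKSDB_NAMESPACE::kCompactionStyleUniversal;
  // Ask RocksDB to rewrite any data that has not been compacted for ~30 days.
  options.periodic_compaction_seconds = 30 * 24 * 60 * 60;
  ROCKSDB_NAMESPACE::DB* db = nullptr;
  auto s = ROCKSDB_NAMESPACE::DB::Open(options, "/tmp/example_db", &db);
  if (s.ok()) {
    delete db;
  }
  return 0;
}
#endif  // illustrative sketch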
uint64_t UniversalCompactionBuilder::GetMaxOverlappingBytes() const {
  if (!mutable_cf_options_.compaction_options_universal.incremental) {
    return std::numeric_limits<uint64_t>::max();
  } else {
    // Try to align the cutting boundary with files at the next level so that
    // an output file does not end up smaller than 1/2 of the target size, nor
    // overlapping with two full-size files at the next level.
    return mutable_cf_options_.target_file_size_base / 2 * 3;
  }
}
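// Illustrative sketch (not part of the build): the 1.5x bound above in
// concrete numbers. With a target_file_size_base of 64 MB, an incremental
// universal compaction caps the overlapping bytes per output file at 96 MB,
// i.e. up to one full-size next-level file plus half of another.
#if 0
#include <cstdint>
#include <cstdio>

int main() {
  const uint64_t target_file_size_base = 64ull << 20;          // 64 MB
  const uint64_t max_overlap = target_file_size_base / 2 * 3;  // 96 MB
  std::printf("max overlapping bytes = %llu\n",
              static_cast<unsigned long long>(max_overlap));
  return 0;
}
#endif  // illustrative sketch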
}  // namespace ROCKSDB_NAMESPACE