compaction_job.cc

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/compaction/compaction_job.h"

#include <algorithm>
#include <cinttypes>
#include <memory>
#include <optional>
#include <set>
#include <utility>
#include <vector>

#include "db/blob/blob_counting_iterator.h"
#include "db/blob/blob_file_addition.h"
#include "db/blob/blob_file_builder.h"
#include "db/builder.h"
#include "db/compaction/clipping_iterator.h"
#include "db/compaction/compaction_state.h"
#include "db/db_impl/db_impl.h"
#include "db/dbformat.h"
#include "db/error_handler.h"
#include "db/event_helpers.h"
#include "db/history_trimming_iterator.h"
#include "db/log_writer.h"
#include "db/merge_helper.h"
#include "db/range_del_aggregator.h"
#include "db/version_edit.h"
#include "db/version_set.h"
#include "file/filename.h"
#include "file/read_write_util.h"
#include "file/sst_file_manager_impl.h"
#include "file/writable_file_writer.h"
#include "logging/log_buffer.h"
#include "logging/logging.h"
#include "monitoring/iostats_context_imp.h"
#include "monitoring/thread_status_util.h"
#include "options/configurable_helper.h"
#include "options/options_helper.h"
#include "port/port.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/options.h"
#include "rocksdb/statistics.h"
#include "rocksdb/status.h"
#include "rocksdb/table.h"
#include "rocksdb/utilities/options_type.h"
#include "table/format.h"
#include "table/merging_iterator.h"
#include "table/meta_blocks.h"
#include "table/table_builder.h"
#include "table/unique_id_impl.h"
#include "test_util/sync_point.h"
#include "util/stop_watch.h"

namespace ROCKSDB_NAMESPACE {

const char* GetCompactionReasonString(CompactionReason compaction_reason) {
  switch (compaction_reason) {
    case CompactionReason::kUnknown:
      return "Unknown";
    case CompactionReason::kLevelL0FilesNum:
      return "LevelL0FilesNum";
    case CompactionReason::kLevelMaxLevelSize:
      return "LevelMaxLevelSize";
    case CompactionReason::kUniversalSizeAmplification:
      return "UniversalSizeAmplification";
    case CompactionReason::kUniversalSizeRatio:
      return "UniversalSizeRatio";
    case CompactionReason::kUniversalSortedRunNum:
      return "UniversalSortedRunNum";
    case CompactionReason::kFIFOMaxSize:
      return "FIFOMaxSize";
    case CompactionReason::kFIFOReduceNumFiles:
      return "FIFOReduceNumFiles";
    case CompactionReason::kFIFOTtl:
      return "FIFOTtl";
    case CompactionReason::kManualCompaction:
      return "ManualCompaction";
    case CompactionReason::kFilesMarkedForCompaction:
      return "FilesMarkedForCompaction";
    case CompactionReason::kBottommostFiles:
      return "BottommostFiles";
    case CompactionReason::kTtl:
      return "Ttl";
    case CompactionReason::kFlush:
      return "Flush";
    case CompactionReason::kExternalSstIngestion:
      return "ExternalSstIngestion";
    case CompactionReason::kPeriodicCompaction:
      return "PeriodicCompaction";
    case CompactionReason::kChangeTemperature:
      return "ChangeTemperature";
    case CompactionReason::kForcedBlobGC:
      return "ForcedBlobGC";
    case CompactionReason::kRoundRobinTtl:
      return "RoundRobinTtl";
    case CompactionReason::kRefitLevel:
      return "RefitLevel";
    case CompactionReason::kNumOfReasons:
      // fall through
    default:
      assert(false);
      return "Invalid";
  }
}

const char* GetCompactionProximalOutputRangeTypeString(
    Compaction::ProximalOutputRangeType range_type) {
  switch (range_type) {
    case Compaction::ProximalOutputRangeType::kNotSupported:
      return "NotSupported";
    case Compaction::ProximalOutputRangeType::kFullRange:
      return "FullRange";
    case Compaction::ProximalOutputRangeType::kNonLastRange:
      return "NonLastRange";
    case Compaction::ProximalOutputRangeType::kDisabled:
      return "Disabled";
    default:
      assert(false);
      return "Invalid";
  }
}

CompactionJob::CompactionJob(
    int job_id, Compaction* compaction, const ImmutableDBOptions& db_options,
    const MutableDBOptions& mutable_db_options, const FileOptions& file_options,
    VersionSet* versions, const std::atomic<bool>* shutting_down,
    LogBuffer* log_buffer, FSDirectory* db_directory,
    FSDirectory* output_directory, FSDirectory* blob_output_directory,
    Statistics* stats, InstrumentedMutex* db_mutex,
    ErrorHandler* db_error_handler, JobContext* job_context,
    std::shared_ptr<Cache> table_cache, EventLogger* event_logger,
    bool paranoid_file_checks, bool measure_io_stats, const std::string& dbname,
    CompactionJobStats* compaction_job_stats, Env::Priority thread_pri,
    const std::shared_ptr<IOTracer>& io_tracer,
    const std::atomic<bool>& manual_compaction_canceled,
    const std::string& db_id, const std::string& db_session_id,
    std::string full_history_ts_low, std::string trim_ts,
    BlobFileCompletionCallback* blob_callback, int* bg_compaction_scheduled,
    int* bg_bottom_compaction_scheduled)
    : compact_(new CompactionState(compaction)),
      internal_stats_(compaction->compaction_reason(), 1),
      db_options_(db_options),
      mutable_db_options_copy_(mutable_db_options),
      log_buffer_(log_buffer),
      output_directory_(output_directory),
      stats_(stats),
      bottommost_level_(false),
      write_hint_(Env::WLTH_NOT_SET),
      job_stats_(compaction_job_stats),
      job_id_(job_id),
      dbname_(dbname),
      db_id_(db_id),
      db_session_id_(db_session_id),
      file_options_(file_options),
      env_(db_options.env),
      io_tracer_(io_tracer),
      fs_(db_options.fs, io_tracer),
      file_options_for_read_(
          fs_->OptimizeForCompactionTableRead(file_options, db_options_)),
      versions_(versions),
      shutting_down_(shutting_down),
      manual_compaction_canceled_(manual_compaction_canceled),
      db_directory_(db_directory),
      blob_output_directory_(blob_output_directory),
      db_mutex_(db_mutex),
      db_error_handler_(db_error_handler),
      // job_context cannot be nullptr, but we will assert later in the body of
      // the constructor.
      earliest_snapshot_(job_context
                             ? job_context->GetEarliestSnapshotSequence()
                             : kMaxSequenceNumber),
      job_context_(job_context),
      table_cache_(std::move(table_cache)),
      event_logger_(event_logger),
      paranoid_file_checks_(paranoid_file_checks),
      measure_io_stats_(measure_io_stats),
      thread_pri_(thread_pri),
      full_history_ts_low_(std::move(full_history_ts_low)),
      trim_ts_(std::move(trim_ts)),
      blob_callback_(blob_callback),
      extra_num_subcompaction_threads_reserved_(0),
      bg_compaction_scheduled_(bg_compaction_scheduled),
      bg_bottom_compaction_scheduled_(bg_bottom_compaction_scheduled) {
  assert(job_stats_ != nullptr);
  assert(log_buffer_ != nullptr);
  assert(job_context);
  assert(job_context->snapshot_context_initialized);

  const auto* cfd = compact_->compaction->column_family_data();
  ThreadStatusUtil::SetEnableTracking(db_options_.enable_thread_tracking);
  ThreadStatusUtil::SetColumnFamily(cfd);
  ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION);
  ReportStartedCompaction(compaction);
}

CompactionJob::~CompactionJob() {
  assert(compact_ == nullptr);
  ThreadStatusUtil::ResetThreadStatus();
}
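
// Publish this job's identifying properties (job id, input/output levels,
// property flags, and total input bytes) through ThreadStatusUtil so that
// GetThreadList() can report per-thread compaction progress, and seed the
// job-level input statistics.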
void CompactionJob::ReportStartedCompaction(Compaction* compaction) {
  ThreadStatusUtil::SetThreadOperationProperty(ThreadStatus::COMPACTION_JOB_ID,
                                               job_id_);

  ThreadStatusUtil::SetThreadOperationProperty(
      ThreadStatus::COMPACTION_INPUT_OUTPUT_LEVEL,
      (static_cast<uint64_t>(compact_->compaction->start_level()) << 32) +
          compact_->compaction->output_level());

  // In the current design, a CompactionJob is always created
  // for non-trivial compaction.
  assert(compaction->IsTrivialMove() == false ||
         compaction->is_manual_compaction() == true);

  ThreadStatusUtil::SetThreadOperationProperty(
      ThreadStatus::COMPACTION_PROP_FLAGS,
      compaction->is_manual_compaction() +
          (compaction->deletion_compaction() << 1));

  auto total_input_bytes = compaction->CalculateTotalInputSize();
  ThreadStatusUtil::SetThreadOperationProperty(
      ThreadStatus::COMPACTION_TOTAL_INPUT_BYTES, total_input_bytes);

  IOSTATS_RESET(bytes_written);
  IOSTATS_RESET(bytes_read);
  ThreadStatusUtil::SetThreadOperationProperty(
      ThreadStatus::COMPACTION_BYTES_WRITTEN, 0);
  ThreadStatusUtil::SetThreadOperationProperty(
      ThreadStatus::COMPACTION_BYTES_READ, 0);

  // Set the thread operation after operation properties
  // to ensure GetThreadList() can always show them all together.
  ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION);

  job_stats_->is_manual_compaction = compaction->is_manual_compaction();
  job_stats_->is_full_compaction = compaction->is_full_compaction();

  // Populate compaction stats num_input_files and total_input_bytes.
  size_t num_input_files = 0;
  for (int input_level = 0;
       input_level < static_cast<int>(compaction->num_input_levels());
       ++input_level) {
    const LevelFilesBrief* flevel = compaction->input_levels(input_level);
    num_input_files += flevel->num_files;
  }
  job_stats_->CompactionJobStats::num_input_files = num_input_files;
  job_stats_->total_input_bytes = total_input_bytes;
}
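
// Prepare() runs under the DB mutex before the compaction starts: it splits
// the input key range into subcompactions (unless a single range is already
// known), optionally restores persisted compaction progress, collects
// seqno-to-time mappings from the input files, and derives the sequence
// number cutoffs (preserve_seqno_after_, proximal_after_seqno_) used during
// the run.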
void CompactionJob::Prepare(
    std::optional<std::pair<std::optional<Slice>, std::optional<Slice>>>
        known_single_subcompact,
    const CompactionProgress& compaction_progress,
    log::Writer* compaction_progress_writer) {
  db_mutex_->AssertHeld();
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_COMPACTION_PREPARE);

  // Generate file_levels_ for compaction before making Iterator
  auto* c = compact_->compaction;
  [[maybe_unused]] ColumnFamilyData* cfd = c->column_family_data();
  assert(cfd != nullptr);
  const VersionStorageInfo* storage_info = c->input_version()->storage_info();
  assert(storage_info);
  assert(storage_info->NumLevelFiles(compact_->compaction->level()) > 0);

  write_hint_ = storage_info->CalculateSSTWriteHint(
      c->output_level(), db_options_.calculate_sst_write_lifetime_hint_set);
  bottommost_level_ = c->bottommost_level();

  if (!known_single_subcompact.has_value() && c->ShouldFormSubcompactions()) {
    StopWatch sw(db_options_.clock, stats_, SUBCOMPACTION_SETUP_TIME);
    GenSubcompactionBoundaries();
  }
  if (boundaries_.size() >= 1) {
    assert(!known_single_subcompact.has_value());
    for (size_t i = 0; i <= boundaries_.size(); i++) {
      compact_->sub_compact_states.emplace_back(
          c, (i != 0) ? std::optional<Slice>(boundaries_[i - 1]) : std::nullopt,
          (i != boundaries_.size()) ? std::optional<Slice>(boundaries_[i])
                                    : std::nullopt,
          static_cast<uint32_t>(i));
      // Assert to validate that boundaries don't have the same user keys
      // (without the timestamp part).
      assert(i == 0 || i == boundaries_.size() ||
             cfd->user_comparator()->CompareWithoutTimestamp(
                 boundaries_[i - 1], boundaries_[i]) < 0);
    }
    RecordInHistogram(stats_, NUM_SUBCOMPACTIONS_SCHEDULED,
                      compact_->sub_compact_states.size());
  } else {
    std::optional<Slice> start_key;
    std::optional<Slice> end_key;
    if (known_single_subcompact.has_value()) {
      start_key = known_single_subcompact.value().first;
      end_key = known_single_subcompact.value().second;
    } else {
      assert(!start_key.has_value() && !end_key.has_value());
    }
    compact_->sub_compact_states.emplace_back(c, start_key, end_key,
                                              /*sub_job_id*/ 0);
  }

  MaybeAssignCompactionProgressAndWriter(compaction_progress,
                                         compaction_progress_writer);

  // Collect all seqno->time information from the input files, which will be
  // used to encode seqno->time into the output files.
  SequenceNumber preserve_time_min_seqno = kMaxSequenceNumber;
  SequenceNumber preclude_last_level_min_seqno = kMaxSequenceNumber;
  uint64_t preserve_time_duration =
      MinAndMaxPreserveSeconds(c->mutable_cf_options()).max_preserve_seconds;

  if (preserve_time_duration > 0) {
    const ReadOptions read_options(Env::IOActivity::kCompaction);
    // Setup seqno_to_time_mapping_ with the relevant time range.
    seqno_to_time_mapping_.SetMaxTimeSpan(preserve_time_duration);
    for (const auto& each_level : *c->inputs()) {
      for (const auto& fmd : each_level.files) {
        std::shared_ptr<const TableProperties> tp;
        Status s = c->input_version()->GetTableProperties(read_options, &tp,
                                                          fmd, nullptr);
        if (s.ok()) {
          s = seqno_to_time_mapping_.DecodeFrom(tp->seqno_to_time_mapping);
        }
        if (!s.ok()) {
          ROCKS_LOG_WARN(
              db_options_.info_log,
              "Problem reading or processing seqno-to-time mapping: %s",
              s.ToString().c_str());
        }
      }
    }

    int64_t _current_time = 0;
    Status s = db_options_.clock->GetCurrentTime(&_current_time);
    if (!s.ok()) {
      ROCKS_LOG_WARN(db_options_.info_log,
                     "Failed to get current time in compaction: Status: %s",
                     s.ToString().c_str());
      // Preserve all time information.
      preserve_time_min_seqno = 0;
      preclude_last_level_min_seqno = 0;
      seqno_to_time_mapping_.Enforce();
    } else {
      seqno_to_time_mapping_.Enforce(_current_time);
      seqno_to_time_mapping_.GetCurrentTieringCutoffSeqnos(
          static_cast<uint64_t>(_current_time),
          c->mutable_cf_options().preserve_internal_time_seconds,
          c->mutable_cf_options().preclude_last_level_data_seconds,
          &preserve_time_min_seqno, &preclude_last_level_min_seqno);
    }
    // For accuracy of the GetProximalSeqnoBeforeTime queries above, we only
    // limit the capacity after them. If we set the capacity to the per-SST
    // limit here, we could be throwing away fidelity when a compaction output
    // file has a narrower seqno range than all the inputs. If we only limited
    // capacity for each compaction output, we could be doing a lot of
    // unnecessary recomputation in a large compaction (up to quadratic in the
    // number of files). Thus, we do something in the middle: enforce a
    // reasonably large constant size limit substantially larger than
    // kMaxSeqnoTimePairsPerSST.
    seqno_to_time_mapping_.SetCapacity(kMaxSeqnoToTimeEntries);
  }
#ifndef NDEBUG
  assert(preserve_time_min_seqno <= preclude_last_level_min_seqno);
  TEST_SYNC_POINT_CALLBACK(
      "CompactionJob::PrepareTimes():preclude_last_level_min_seqno",
      static_cast<void*>(&preclude_last_level_min_seqno));
  // Restore the invariant asserted above, in case it was broken under the
  // callback.
  preserve_time_min_seqno =
      std::min(preclude_last_level_min_seqno, preserve_time_min_seqno);
#endif

  // Preserve sequence numbers for preserved write times and snapshots, though
  // the specific sequence number of the earliest snapshot can be zeroed.
  preserve_seqno_after_ =
      std::max(preserve_time_min_seqno, SequenceNumber{1}) - 1;
  preserve_seqno_after_ = std::min(preserve_seqno_after_, earliest_snapshot_);
  // If using the preclude feature, also preclude snapshots from the last
  // level, just because they are heuristically more likely to be accessed
  // than non-snapshot data.
  if (preclude_last_level_min_seqno < kMaxSequenceNumber &&
      earliest_snapshot_ < preclude_last_level_min_seqno) {
    preclude_last_level_min_seqno = earliest_snapshot_;
  }
  // Now combine what we would like to preclude from the last level with what
  // we can safely support without dangerously moving data back up the LSM
  // tree, to get the final seqno threshold for proximal vs. last. In
  // particular, when the reserved output key range for the proximal level
  // does not include the entire last level input key range, we need to keep
  // entries already in the last level there. (Even allowing within-range
  // entries to move back up could cause problems with range tombstones.
  // Perhaps it would be better in some rare cases to keep entries in the last
  // level one-by-one rather than based on sequence number, but that would add
  // extra tracking and complexity to CompactionIterator that is probably not
  // worthwhile overall. Correctness is also clearer when splitting by seqno
  // threshold.)
  proximal_after_seqno_ = std::max(preclude_last_level_min_seqno,
                                   c->GetKeepInLastLevelThroughSeqno());

  options_file_number_ = versions_->options_file_number();
}

void CompactionJob::MaybeAssignCompactionProgressAndWriter(
    const CompactionProgress& compaction_progress,
    log::Writer* compaction_progress_writer) {
  // LIMITATION: Only supports resuming a single subcompaction for now.
  if (compact_->sub_compact_states.size() != 1) {
    return;
  }
  if (!compaction_progress.empty()) {
    assert(compaction_progress.size() == 1);
    SubcompactionState* sub_compact = &compact_->sub_compact_states[0];
    const SubcompactionProgress& subcompaction_progress =
        compaction_progress[0];
    sub_compact->SetSubcompactionProgress(subcompaction_progress);
  }
  compaction_progress_writer_ = compaction_progress_writer;
}
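
// The subcompaction limit is the configured max_subcompactions (at least 1)
// plus any extra threads this job has already reserved.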
uint64_t CompactionJob::GetSubcompactionsLimit() {
  return extra_num_subcompaction_threads_reserved_ +
         std::max(
             std::uint64_t(1),
             static_cast<uint64_t>(compact_->compaction->max_subcompactions()));
}
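
// Reserve extra background threads for additional subcompactions, bounded by
// the headroom left under the DB-wide background compaction limit, and
// account for them in the scheduled-compaction counters.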
void CompactionJob::AcquireSubcompactionResources(
    int num_extra_required_subcompactions) {
  TEST_SYNC_POINT("CompactionJob::AcquireSubcompactionResources:0");
  TEST_SYNC_POINT("CompactionJob::AcquireSubcompactionResources:1");
  int max_db_compactions =
      DBImpl::GetBGJobLimits(
          mutable_db_options_copy_.max_background_flushes,
          mutable_db_options_copy_.max_background_compactions,
          mutable_db_options_copy_.max_background_jobs,
          versions_->GetColumnFamilySet()
              ->write_controller()
              ->NeedSpeedupCompaction())
          .max_compactions;
  InstrumentedMutexLock l(db_mutex_);
  // Apply the min function first since we need to compute the extra
  // subcompactions against the compaction limits, and then try to reserve
  // threads for the extra subcompactions. The actual number of reserved
  // threads could be less than the desired number.
  int available_bg_compactions_against_db_limit =
      std::max(max_db_compactions - *bg_compaction_scheduled_ -
                   *bg_bottom_compaction_scheduled_,
               0);
  // Reservation only supports background threads whose priority is between
  // BOTTOM and HIGH. Need to degrade the priority to HIGH if the original
  // thread_pri_ is higher than that. Similar to ReleaseThreads().
  extra_num_subcompaction_threads_reserved_ =
      env_->ReserveThreads(std::min(num_extra_required_subcompactions,
                                    available_bg_compactions_against_db_limit),
                           std::min(thread_pri_, Env::Priority::HIGH));
  // Update bg_compaction_scheduled_ or bg_bottom_compaction_scheduled_
  // depending on whether this compaction has the bottommost priority.
  if (thread_pri_ == Env::Priority::BOTTOM) {
    *bg_bottom_compaction_scheduled_ +=
        extra_num_subcompaction_threads_reserved_;
  } else {
    *bg_compaction_scheduled_ += extra_num_subcompaction_threads_reserved_;
  }
}

void CompactionJob::ShrinkSubcompactionResources(uint64_t num_extra_resources) {
  // Do nothing when we have zero resources to shrink.
  if (num_extra_resources == 0) {
    return;
  }
  db_mutex_->Lock();
  // We cannot release more threads than we reserved before.
  int extra_num_subcompaction_threads_released = env_->ReleaseThreads(
      (int)num_extra_resources, std::min(thread_pri_, Env::Priority::HIGH));
  // Update the number of reserved threads and the number of background
  // scheduled compactions for this compaction job.
  extra_num_subcompaction_threads_reserved_ -=
      extra_num_subcompaction_threads_released;
  // TODO (zichen): design a test case with the new subcompaction partitioning
  // where the number of actual partitions is less than the number of planned
  // partitions.
  assert(extra_num_subcompaction_threads_released == (int)num_extra_resources);
  // Update bg_compaction_scheduled_ or bg_bottom_compaction_scheduled_
  // depending on whether this compaction has the bottommost priority.
  if (thread_pri_ == Env::Priority::BOTTOM) {
    *bg_bottom_compaction_scheduled_ -=
        extra_num_subcompaction_threads_released;
  } else {
    *bg_compaction_scheduled_ -= extra_num_subcompaction_threads_released;
  }
  db_mutex_->Unlock();
  TEST_SYNC_POINT("CompactionJob::ShrinkSubcompactionResources:0");
}

void CompactionJob::ReleaseSubcompactionResources() {
  if (extra_num_subcompaction_threads_reserved_ == 0) {
    return;
  }
  {
    InstrumentedMutexLock l(db_mutex_);
    // The number of reserved threads becomes larger than 0 only if the
    // compaction priority is round-robin and there are insufficient
    // subcompactions available.
    // The number of scheduled compactions must be at least 1 plus the extra
    // number of subcompactions using acquired resources, since this
    // compaction job has not finished yet.
    assert(*bg_bottom_compaction_scheduled_ >=
               1 + extra_num_subcompaction_threads_reserved_ ||
           *bg_compaction_scheduled_ >=
               1 + extra_num_subcompaction_threads_reserved_);
  }
  ShrinkSubcompactionResources(extra_num_subcompaction_threads_reserved_);
}

void CompactionJob::GenSubcompactionBoundaries() {
  // The goal is to find some boundary keys so that we can evenly partition
  // the compaction input data into max_subcompactions ranges.
  // For every input file, we ask TableReader to estimate 128 anchor points
  // that evenly partition the input file into 128 ranges, along with the
  // range sizes. This can be calculated by scanning index blocks of the file.
  // Once we have the anchor points for all the input files, we merge them
  // together and try to find keys dividing ranges evenly.
  // For example, if we have two input files, and each returns the following
  // ranges:
  //   File1: (a1, 1000), (b1, 1200), (c1, 1100)
  //   File2: (a2, 1100), (b2, 1000), (c2, 1000)
  // We sort the keys as follows:
  //   (a1, 1000), (a2, 1100), (b1, 1200), (b2, 1000), (c1, 1100), (c2, 1000)
  // We calculate the total size by adding up all the ranges' sizes, which is
  // 6400. If we would like to partition into 2 subcompactions, the target
  // range size is 3200. Based on the size, we take "b1" as the partition key
  // since the first three ranges would hit 3200.
  //
  // Note that the ranges are actually overlapping. For example, in the
  // example above, the range ending with "b1" overlaps with the range ending
  // with "b2". So the size 1000+1100+1200 is an underestimation of the data
  // size up to "b1". In extreme cases where we only compact N L0 files, a
  // range can overlap with N-1 other ranges. Since we requested a relatively
  // large number (128) of ranges from each input file, even N-way range
  // overlap would cause relatively small inaccuracy.
  ReadOptions read_options(Env::IOActivity::kCompaction);
  read_options.rate_limiter_priority = GetRateLimiterPriority();
  auto* c = compact_->compaction;
  if (c->mutable_cf_options().table_factory->Name() ==
      TableFactory::kPlainTableName()) {
    return;
  }

  if (c->max_subcompactions() <= 1 &&
      !(c->immutable_options().compaction_pri == kRoundRobin &&
        c->immutable_options().compaction_style == kCompactionStyleLevel)) {
    return;
  }

  auto* cfd = c->column_family_data();
  const Comparator* cfd_comparator = cfd->user_comparator();
  const InternalKeyComparator& icomp = cfd->internal_comparator();

  auto* v = compact_->compaction->input_version();
  int base_level = v->storage_info()->base_level();
  InstrumentedMutexUnlock unlock_guard(db_mutex_);

  uint64_t total_size = 0;
  std::vector<TableReader::Anchor> all_anchors;
  int start_lvl = c->start_level();
  int out_lvl = c->output_level();

  for (size_t lvl_idx = 0; lvl_idx < c->num_input_levels(); lvl_idx++) {
    int lvl = c->level(lvl_idx);
    if (lvl >= start_lvl && lvl <= out_lvl) {
      const LevelFilesBrief* flevel = c->input_levels(lvl_idx);
      size_t num_files = flevel->num_files;

      if (num_files == 0) {
        continue;
      }

      for (size_t i = 0; i < num_files; i++) {
        FileMetaData* f = flevel->files[i].file_metadata;
        std::vector<TableReader::Anchor> my_anchors;
        Status s = cfd->table_cache()->ApproximateKeyAnchors(
            read_options, icomp, *f, c->mutable_cf_options(), my_anchors);
        if (!s.ok() || my_anchors.empty()) {
          my_anchors.emplace_back(f->largest.user_key(), f->fd.GetFileSize());
        }
        for (auto& ac : my_anchors) {
          // Can be optimized to avoid this loop.
          total_size += ac.range_size;
        }

        all_anchors.insert(all_anchors.end(), my_anchors.begin(),
                           my_anchors.end());
      }
    }
  }
  // Here we sort all the anchor points across all files and go through them
  // in the sorted order to find partitioning boundaries.
  // Not the most efficient implementation. A much more efficient algorithm
  // probably exists, but it would be more complex. If performance turns out
  // to be a problem, we can optimize.
  std::sort(
      all_anchors.begin(), all_anchors.end(),
      [cfd_comparator](TableReader::Anchor& a, TableReader::Anchor& b) -> bool {
        return cfd_comparator->CompareWithoutTimestamp(a.user_key, b.user_key) <
               0;
      });

  // Remove duplicated entries from boundaries.
  all_anchors.erase(
      std::unique(all_anchors.begin(), all_anchors.end(),
                  [cfd_comparator](TableReader::Anchor& a,
                                   TableReader::Anchor& b) -> bool {
                    return cfd_comparator->CompareWithoutTimestamp(
                               a.user_key, b.user_key) == 0;
                  }),
      all_anchors.end());

  // Get the number of planned subcompactions. This may reserve threads and
  // update extra_num_subcompaction_threads_reserved_ for round-robin
  // priority.
  uint64_t num_planned_subcompactions;
  if (c->immutable_options().compaction_pri == kRoundRobin &&
      c->immutable_options().compaction_style == kCompactionStyleLevel) {
    // For round-robin compaction priority, we need to employ more
    // subcompactions (which may exceed the max_subcompactions limit). The
    // extra subcompactions will be executed using reserved threads and taken
    // into account in bg_compaction_scheduled or
    // bg_bottom_compaction_scheduled.

    // Initialize with the number of input files.
    num_planned_subcompactions = static_cast<uint64_t>(c->num_input_files(0));
    uint64_t max_subcompactions_limit = GetSubcompactionsLimit();
    if (max_subcompactions_limit < num_planned_subcompactions) {
      // Assert that the two pointers are not null so that we can use extra
      // subcompactions against the db compaction limits.
      assert(bg_bottom_compaction_scheduled_ != nullptr);
      assert(bg_compaction_scheduled_ != nullptr);
      // Reserve resources when max_subcompactions is not sufficient.
      AcquireSubcompactionResources(
          (int)(num_planned_subcompactions - max_subcompactions_limit));
      // The subcompactions limit changes after acquiring additional
      // resources. Need to call GetSubcompactionsLimit() again to update the
      // number of planned subcompactions.
      num_planned_subcompactions =
          std::min(num_planned_subcompactions, GetSubcompactionsLimit());
    } else {
      num_planned_subcompactions = max_subcompactions_limit;
    }
  } else {
    num_planned_subcompactions = GetSubcompactionsLimit();
  }

  TEST_SYNC_POINT_CALLBACK("CompactionJob::GenSubcompactionBoundaries:0",
                           &num_planned_subcompactions);
  if (num_planned_subcompactions == 1) {
    return;
  }

  // Group the ranges into subcompactions.
  uint64_t target_range_size = std::max(
      total_size / num_planned_subcompactions,
      MaxFileSizeForLevel(
          c->mutable_cf_options(), out_lvl,
          c->immutable_options().compaction_style, base_level,
          c->immutable_options().level_compaction_dynamic_level_bytes));

  if (target_range_size >= total_size) {
    return;
  }

  uint64_t next_threshold = target_range_size;
  uint64_t cumulative_size = 0;
  uint64_t num_actual_subcompactions = 1U;
  for (TableReader::Anchor& anchor : all_anchors) {
    cumulative_size += anchor.range_size;
    if (cumulative_size > next_threshold) {
      next_threshold += target_range_size;
      num_actual_subcompactions++;
      boundaries_.push_back(anchor.user_key);
    }
    if (num_actual_subcompactions == num_planned_subcompactions) {
      break;
    }
  }
  TEST_SYNC_POINT_CALLBACK("CompactionJob::GenSubcompactionBoundaries:1",
                           &num_actual_subcompactions);
  // Shrink extra subcompaction resources when extra resources were acquired.
  ShrinkSubcompactionResources(
      std::min((int)(num_planned_subcompactions - num_actual_subcompactions),
               extra_num_subcompaction_threads_reserved_));
}

void CompactionJob::InitializeCompactionRun() {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_COMPACTION_RUN);
  TEST_SYNC_POINT("CompactionJob::Run():Start");
  log_buffer_->FlushBufferToLog();
  LogCompaction();
}
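
// Run subcompaction 0 on the calling thread and one thread per remaining
// subcompaction, then join, drop empty trailing outputs, and release any
// reserved threads.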
void CompactionJob::RunSubcompactions() {
  const size_t num_threads = compact_->sub_compact_states.size();
  assert(num_threads > 0);

  compact_->compaction->GetOrInitInputTableProperties();

  // Launch a thread for each of subcompactions 1...num_threads-1
  std::vector<port::Thread> thread_pool;
  thread_pool.reserve(num_threads - 1);
  for (size_t i = 1; i < compact_->sub_compact_states.size(); i++) {
    thread_pool.emplace_back(&CompactionJob::ProcessKeyValueCompaction, this,
                             &compact_->sub_compact_states[i]);
  }

  // Always schedule the first subcompaction (whether or not there are also
  // others) in the current thread to be efficient with resources.
  ProcessKeyValueCompaction(compact_->sub_compact_states.data());

  // Wait for all other threads (if there are any) to finish execution.
  for (auto& thread : thread_pool) {
    thread.join();
  }
  RemoveEmptyOutputs();
  ReleaseSubcompactionResources();
  TEST_SYNC_POINT("CompactionJob::ReleaseSubcompactionResources");
}

void CompactionJob::UpdateTimingStats(uint64_t start_micros) {
  internal_stats_.SetMicros(db_options_.clock->NowMicros() - start_micros);
  for (auto& state : compact_->sub_compact_states) {
    internal_stats_.AddCpuMicros(state.compaction_job_stats.cpu_micros);
  }
  RecordTimeToHistogram(stats_, COMPACTION_TIME,
                        internal_stats_.output_level_stats.micros);
  RecordTimeToHistogram(stats_, COMPACTION_CPU_TIME,
                        internal_stats_.output_level_stats.cpu_micros);
}

void CompactionJob::RemoveEmptyOutputs() {
  for (auto& state : compact_->sub_compact_states) {
    state.RemoveLastEmptyOutput();
  }
}

bool CompactionJob::HasNewBlobFiles() const {
  for (const auto& state : compact_->sub_compact_states) {
    if (state.Current().HasBlobFileAdditions()) {
      return true;
    }
  }
  return false;
}
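
// Return the first non-OK subcompaction status, and propagate the matching
// IO status into io_status_ if none has been recorded yet.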
Status CompactionJob::CollectSubcompactionErrors() {
  Status status;
  IOStatus io_s;
  for (const auto& state : compact_->sub_compact_states) {
    if (!state.status.ok()) {
      status = state.status;
      io_s = state.io_status;
      break;
    }
  }
  if (io_status_.ok()) {
    io_status_ = io_s;
  }
  return status;
}
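
// Fsync the SST output directory, and the blob output directory as well when
// new blob files were written and it differs from the SST directory.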
Status CompactionJob::SyncOutputDirectories() {
  Status status;
  IOStatus io_s;
  constexpr IODebugContext* dbg = nullptr;

  const bool wrote_new_blob_files = HasNewBlobFiles();

  if (output_directory_) {
    io_s = output_directory_->FsyncWithDirOptions(
        IOOptions(), dbg,
        DirFsyncOptions(DirFsyncOptions::FsyncReason::kNewFileSynced));
  }
  if (io_s.ok() && wrote_new_blob_files && blob_output_directory_ &&
      blob_output_directory_ != output_directory_) {
    io_s = blob_output_directory_->FsyncWithDirOptions(
        IOOptions(), dbg,
        DirFsyncOptions(DirFsyncOptions::FsyncReason::kNewFileSynced));
  }
  if (io_status_.ok()) {
    io_status_ = io_s;
  }
  if (status.ok()) {
    status = io_s;
  }
  return status;
}
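
// Re-open every output file through the table cache, which both checks that
// the file is usable and warms the cache for subsequent user reads; with
// paranoid_file_checks, additionally replay all keys and compare the
// recomputed hash against the validator recorded at write time.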
Status CompactionJob::VerifyOutputFiles() {
  Status status;
  std::vector<port::Thread> thread_pool;
  std::vector<const CompactionOutputs::Output*> files_output;
  for (const auto& state : compact_->sub_compact_states) {
    for (const auto& output : state.GetOutputs()) {
      files_output.emplace_back(&output);
    }
  }
  ColumnFamilyData* cfd = compact_->compaction->column_family_data();
  std::atomic<size_t> next_file_idx(0);
  auto verify_table = [&](Status& output_status) {
    while (true) {
      size_t file_idx = next_file_idx.fetch_add(1);
      if (file_idx >= files_output.size()) {
        break;
      }
      // Verify that the table is usable. We set for_compaction to false and
      // don't OptimizeForCompactionTableRead here because this is a special
      // case after we finish the table building. No matter whether
      // use_direct_io_for_flush_and_compaction is true, we will regard this
      // verification as user reads since the goal is to cache it here for
      // further user reads.
      ReadOptions verify_table_read_options(Env::IOActivity::kCompaction);
      verify_table_read_options.rate_limiter_priority =
          GetRateLimiterPriority();
      InternalIterator* iter = cfd->table_cache()->NewIterator(
          verify_table_read_options, file_options_, cfd->internal_comparator(),
          files_output[file_idx]->meta,
          /*range_del_agg=*/nullptr, compact_->compaction->mutable_cf_options(),
          /*table_reader_ptr=*/nullptr,
          cfd->internal_stats()->GetFileReadHist(
              compact_->compaction->output_level()),
          TableReaderCaller::kCompactionRefill, /*arena=*/nullptr,
          /*skip_filters=*/false, compact_->compaction->output_level(),
          MaxFileSizeForL0MetaPin(compact_->compaction->mutable_cf_options()),
          /*smallest_compaction_key=*/nullptr,
          /*largest_compaction_key=*/nullptr,
          /*allow_unprepared_value=*/false);
      auto s = iter->status();

      if (s.ok() && paranoid_file_checks_) {
        OutputValidator validator(cfd->internal_comparator(),
                                  /*_enable_hash=*/true);
        for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
          s = validator.Add(iter->key(), iter->value());
          if (!s.ok()) {
            break;
          }
        }
        if (s.ok()) {
          s = iter->status();
        }
        if (s.ok() &&
            !validator.CompareValidator(files_output[file_idx]->validator)) {
          s = Status::Corruption("Paranoid checksums do not match");
        }
      }

      delete iter;

      if (!s.ok()) {
        output_status = s;
        break;
      }
    }
  };

  for (size_t i = 1; i < compact_->sub_compact_states.size(); i++) {
    thread_pool.emplace_back(verify_table,
                             std::ref(compact_->sub_compact_states[i].status));
  }
  verify_table(compact_->sub_compact_states[0].status);
  for (auto& thread : thread_pool) {
    thread.join();
  }
  for (const auto& state : compact_->sub_compact_states) {
    if (!state.status.ok()) {
      status = state.status;
      break;
    }
  }
  return status;
}

void CompactionJob::SetOutputTableProperties() {
  for (const auto& state : compact_->sub_compact_states) {
    for (const auto& output : state.GetOutputs()) {
      auto fn =
          TableFileName(state.compaction->immutable_options().cf_paths,
                        output.meta.fd.GetNumber(), output.meta.fd.GetPathId());
      compact_->compaction->SetOutputTableProperties(fn,
                                                     output.table_properties);
    }
  }
}

void CompactionJob::AggregateSubcompactionOutputAndJobStats() {
  // Before the compaction starts, is_remote_compaction was set to true if
  // compaction_service is set. We now know whether each sub_compaction was
  // done remotely or not. Reset is_remote_compaction back to false and allow
  // AggregateCompactionStats() to set the right value.
  job_stats_->is_remote_compaction = false;

  // Finish up all bookkeeping to unify the subcompaction results.
  compact_->AggregateCompactionStats(internal_stats_, *job_stats_);
}

Status CompactionJob::VerifyCompactionRecordCounts(
    bool stats_built_from_input_table_prop, uint64_t num_input_range_del) {
  Status status;
  if (stats_built_from_input_table_prop &&
      job_stats_->has_accurate_num_input_records) {
    status = VerifyInputRecordCount(num_input_range_del);
    if (!status.ok()) {
      return status;
    }
  }
  const auto& mutable_cf_options = compact_->compaction->mutable_cf_options();
  if ((mutable_cf_options.table_factory->IsInstanceOf(
           TableFactory::kBlockBasedTableName()) ||
       mutable_cf_options.table_factory->IsInstanceOf(
           TableFactory::kPlainTableName()))) {
    status = VerifyOutputRecordCount();
    if (!status.ok()) {
      return status;
    }
  }
  return status;
}
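
// Fold the internal stats into the job-level compaction stats, record IO
// stats, and store the final status on the CompactionState.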
void CompactionJob::FinalizeCompactionRun(
    const Status& input_status, bool stats_built_from_input_table_prop,
    uint64_t num_input_range_del) {
  if (stats_built_from_input_table_prop) {
    UpdateCompactionJobInputStatsFromInternalStats(internal_stats_,
                                                   num_input_range_del);
  }
  UpdateCompactionJobOutputStatsFromInternalStats(input_status,
                                                  internal_stats_);
  RecordCompactionIOStats();
  LogFlush(db_options_.info_log);
  TEST_SYNC_POINT("CompactionJob::Run():End");
  compact_->status = input_status;
  TEST_SYNC_POINT_CALLBACK("CompactionJob::Run():EndStatusSet",
                           const_cast<Status*>(&input_status));
}
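
// Run() drives the whole compaction: execute the subcompactions, sync and
// verify the outputs, aggregate statistics, and cross-check input/output
// record counts before finalizing.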
Status CompactionJob::Run() {
  InitializeCompactionRun();

  const uint64_t start_micros = db_options_.clock->NowMicros();
  RunSubcompactions();
  UpdateTimingStats(start_micros);

  TEST_SYNC_POINT("CompactionJob::Run:BeforeVerify");
  Status status = CollectSubcompactionErrors();
  if (status.ok()) {
    status = SyncOutputDirectories();
  }
  if (status.ok()) {
    status = VerifyOutputFiles();
  }
  if (status.ok()) {
    SetOutputTableProperties();
  }
  AggregateSubcompactionOutputAndJobStats();

  uint64_t num_input_range_del = 0;
  bool stats_built_from_input_table_prop =
      UpdateInternalStatsFromInputFiles(&num_input_range_del);
  if (status.ok()) {
    status = VerifyCompactionRecordCounts(stats_built_from_input_table_prop,
                                          num_input_range_del);
  }
  FinalizeCompactionRun(status, stats_built_from_input_table_prop,
                        num_input_range_del);
  return status;
}
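
// Install() is called under the DB mutex after Run() completes: it applies
// the compaction results to the current Version, then logs a human-readable
// summary line and a structured "compaction_finished" event before cleaning
// up the compaction state.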
Status CompactionJob::Install(bool* compaction_released) {
  assert(compact_);

  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_COMPACTION_INSTALL);
  db_mutex_->AssertHeld();
  Status status = compact_->status;

  ColumnFamilyData* cfd = compact_->compaction->column_family_data();
  assert(cfd);

  int output_level = compact_->compaction->output_level();
  cfd->internal_stats()->AddCompactionStats(output_level, thread_pri_,
                                            internal_stats_);

  if (status.ok()) {
    status = InstallCompactionResults(compaction_released);
  }
  if (!versions_->io_status().ok()) {
    io_status_ = versions_->io_status();
  }

  VersionStorageInfo::LevelSummaryStorage tmp;
  auto vstorage = cfd->current()->storage_info();
  const auto& stats = internal_stats_.output_level_stats;

  double read_write_amp = 0.0;
  double write_amp = 0.0;
  double bytes_read_per_sec = 0;
  double bytes_written_per_sec = 0;

  const uint64_t bytes_read_non_output_and_blob =
      stats.bytes_read_non_output_levels + stats.bytes_read_blob;
  const uint64_t bytes_read_all =
      stats.bytes_read_output_level + bytes_read_non_output_and_blob;
  const uint64_t bytes_written_all =
      stats.bytes_written + stats.bytes_written_blob;

  if (bytes_read_non_output_and_blob > 0) {
    read_write_amp = (bytes_written_all + bytes_read_all) /
                     static_cast<double>(bytes_read_non_output_and_blob);
    write_amp =
        bytes_written_all / static_cast<double>(bytes_read_non_output_and_blob);
  }
  if (stats.micros > 0) {
    bytes_read_per_sec = bytes_read_all / static_cast<double>(stats.micros);
    bytes_written_per_sec =
        bytes_written_all / static_cast<double>(stats.micros);
  }

  const std::string& column_family_name = cfd->GetName();

  constexpr double kMB = 1048576.0;

  ROCKS_LOG_BUFFER(
      log_buffer_,
      "[%s] compacted to: %s, MB/sec: %.1f rd, %.1f wr, level %d, "
      "files in(%d, %d) filtered(%d, %d) out(%d +%d blob) "
      "MB in(%.1f, %.1f +%.1f blob) filtered(%.1f, %.1f) out(%.1f +%.1f blob), "
      "read-write-amplify(%.1f) write-amplify(%.1f) %s, records in: %" PRIu64
      ", records dropped: %" PRIu64 " output_compression: %s\n",
      column_family_name.c_str(), vstorage->LevelSummary(&tmp),
      bytes_read_per_sec, bytes_written_per_sec,
      compact_->compaction->output_level(),
      stats.num_input_files_in_non_output_levels,
      stats.num_input_files_in_output_level,
      stats.num_filtered_input_files_in_non_output_levels,
      stats.num_filtered_input_files_in_output_level, stats.num_output_files,
      stats.num_output_files_blob, stats.bytes_read_non_output_levels / kMB,
      stats.bytes_read_output_level / kMB, stats.bytes_read_blob / kMB,
      stats.bytes_skipped_non_output_levels / kMB,
      stats.bytes_skipped_output_level / kMB, stats.bytes_written / kMB,
      stats.bytes_written_blob / kMB, read_write_amp, write_amp,
      status.ToString().c_str(), stats.num_input_records,
      stats.num_dropped_records,
      CompressionTypeToString(compact_->compaction->output_compression())
          .c_str());

  const auto& blob_files = vstorage->GetBlobFiles();
  if (!blob_files.empty()) {
    assert(blob_files.front());
    assert(blob_files.back());

    ROCKS_LOG_BUFFER(
        log_buffer_,
        "[%s] Blob file summary: head=%" PRIu64 ", tail=%" PRIu64 "\n",
        column_family_name.c_str(), blob_files.front()->GetBlobFileNumber(),
        blob_files.back()->GetBlobFileNumber());
  }

  if (internal_stats_.has_proximal_level_output) {
    ROCKS_LOG_BUFFER(log_buffer_,
                     "[%s] has Proximal Level output: %" PRIu64
                     ", level %d, number of files: %" PRIu64
                     ", number of records: %" PRIu64,
                     column_family_name.c_str(),
                     internal_stats_.proximal_level_stats.bytes_written,
                     compact_->compaction->GetProximalLevel(),
                     internal_stats_.proximal_level_stats.num_output_files,
                     internal_stats_.proximal_level_stats.num_output_records);
  }

  TEST_SYNC_POINT_CALLBACK(
      "CompactionJob::Install:AfterUpdateCompactionJobStats", job_stats_);

  auto stream = event_logger_->LogToBuffer(log_buffer_, 8192);
  stream << "job" << job_id_ << "event" << "compaction_finished"
         << "compaction_time_micros" << stats.micros
         << "compaction_time_cpu_micros" << stats.cpu_micros << "output_level"
         << compact_->compaction->output_level() << "num_output_files"
         << stats.num_output_files << "total_output_size"
         << stats.bytes_written;

  if (stats.num_output_files_blob > 0) {
    stream << "num_blob_output_files" << stats.num_output_files_blob
           << "total_blob_output_size" << stats.bytes_written_blob;
  }

  stream << "num_input_records" << stats.num_input_records
         << "num_output_records" << stats.num_output_records
         << "num_subcompactions" << compact_->sub_compact_states.size()
         << "output_compression"
         << CompressionTypeToString(compact_->compaction->output_compression());

  stream << "num_single_delete_mismatches"
         << job_stats_->num_single_del_mismatch;
  stream << "num_single_delete_fallthrough"
         << job_stats_->num_single_del_fallthru;

  if (measure_io_stats_) {
    stream << "file_write_nanos" << job_stats_->file_write_nanos;
    stream << "file_range_sync_nanos" << job_stats_->file_range_sync_nanos;
    stream << "file_fsync_nanos" << job_stats_->file_fsync_nanos;
    stream << "file_prepare_write_nanos"
           << job_stats_->file_prepare_write_nanos;
  }

  stream << "lsm_state";
  stream.StartArray();
  for (int level = 0; level < vstorage->num_levels(); ++level) {
    stream << vstorage->NumLevelFiles(level);
  }
  stream.EndArray();

  if (!blob_files.empty()) {
    assert(blob_files.front());
    stream << "blob_file_head" << blob_files.front()->GetBlobFileNumber();

    assert(blob_files.back());
    stream << "blob_file_tail" << blob_files.back()->GetBlobFileNumber();
  }

  if (internal_stats_.has_proximal_level_output) {
    InternalStats::CompactionStats& pl_stats =
        internal_stats_.proximal_level_stats;
    stream << "proximal_level_num_output_files" << pl_stats.num_output_files;
    stream << "proximal_level_bytes_written" << pl_stats.bytes_written;
    stream << "proximal_level_num_output_records"
           << pl_stats.num_output_records;
    stream << "proximal_level_num_output_files_blob"
           << pl_stats.num_output_files_blob;
    stream << "proximal_level_bytes_written_blob"
           << pl_stats.bytes_written_blob;
  }

  CleanupCompaction();
  return status;
}
void CompactionJob::NotifyOnSubcompactionBegin(
    SubcompactionState* sub_compact) {
  Compaction* c = compact_->compaction;

  if (db_options_.listeners.empty()) {
    return;
  }
  if (shutting_down_->load(std::memory_order_acquire)) {
    return;
  }
  if (c->is_manual_compaction() &&
      manual_compaction_canceled_.load(std::memory_order_acquire)) {
    return;
  }

  sub_compact->notify_on_subcompaction_completion = true;

  SubcompactionJobInfo info{};
  sub_compact->BuildSubcompactionJobInfo(info);
  info.job_id = static_cast<int>(job_id_);
  info.thread_id = env_->GetThreadID();

  for (const auto& listener : db_options_.listeners) {
    listener->OnSubcompactionBegin(info);
  }
  info.status.PermitUncheckedError();
}
void CompactionJob::NotifyOnSubcompactionCompleted(
    SubcompactionState* sub_compact) {
  if (db_options_.listeners.empty()) {
    return;
  }
  if (shutting_down_->load(std::memory_order_acquire)) {
    return;
  }

  if (!sub_compact->notify_on_subcompaction_completion) {
    return;
  }

  SubcompactionJobInfo info{};
  sub_compact->BuildSubcompactionJobInfo(info);
  info.job_id = static_cast<int>(job_id_);
  info.thread_id = env_->GetThreadID();

  for (const auto& listener : db_options_.listeners) {
    listener->OnSubcompactionCompleted(info);
  }
}
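// Returns true if this subcompaction should run locally. When a remote
// compaction service is configured, the subcompaction is first offered to
// it; only a kUseLocal reply falls back to local execution.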
bool CompactionJob::ShouldUseLocalCompaction(SubcompactionState* sub_compact) {
  if (db_options_.compaction_service) {
    CompactionServiceJobStatus comp_status =
        ProcessKeyValueCompactionWithCompactionService(sub_compact);
    if (comp_status != CompactionServiceJobStatus::kUseLocal) {
      return false;
    }
    // fallback to local compaction
    assert(comp_status == CompactionServiceJobStatus::kUseLocal);
    sub_compact->compaction_job_stats.is_remote_compaction = false;
  }
  return true;
}
CompactionJob::CompactionIOStatsSnapshot CompactionJob::InitializeIOStats() {
  CompactionIOStatsSnapshot io_stats;
  if (measure_io_stats_) {
    io_stats.prev_perf_level = GetPerfLevel();
    SetPerfLevel(PerfLevel::kEnableTimeAndCPUTimeExceptForMutex);
    io_stats.prev_write_nanos = IOSTATS(write_nanos);
    io_stats.prev_fsync_nanos = IOSTATS(fsync_nanos);
    io_stats.prev_range_sync_nanos = IOSTATS(range_sync_nanos);
    io_stats.prev_prepare_write_nanos = IOSTATS(prepare_write_nanos);
    io_stats.prev_cpu_write_nanos = IOSTATS(cpu_write_nanos);
    io_stats.prev_cpu_read_nanos = IOSTATS(cpu_read_nanos);
  }
  return io_stats;
}
Status CompactionJob::SetupAndValidateCompactionFilter(
    SubcompactionState* sub_compact,
    const CompactionFilter* configured_compaction_filter,
    const CompactionFilter*& compaction_filter,
    std::unique_ptr<CompactionFilter>& compaction_filter_from_factory) {
  compaction_filter = configured_compaction_filter;
  if (compaction_filter == nullptr) {
    compaction_filter_from_factory =
        sub_compact->compaction->CreateCompactionFilter();
    compaction_filter = compaction_filter_from_factory.get();
  }
  if (compaction_filter != nullptr && !compaction_filter->IgnoreSnapshots()) {
    return Status::NotSupported(
        "CompactionFilter::IgnoreSnapshots() = false is not supported "
        "anymore.");
  }
  return Status::OK();
}
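// Prepares the ReadOptions used by the compaction input iterator and derives
// both the timestamp-stripped user-key bounds and the internal-key form of
// the subcompaction boundaries used for clipping and output-file bookkeeping.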
void CompactionJob::InitializeReadOptionsAndBoundaries(
    const size_t ts_sz, ReadOptions& read_options,
    SubcompactionKeyBoundaries& boundaries) {
  read_options.verify_checksums = true;
  read_options.fill_cache = false;
  read_options.rate_limiter_priority = GetRateLimiterPriority();
  read_options.io_activity = Env::IOActivity::kCompaction;
  // Compaction iterators shouldn't be confined to a single prefix.
  // Compactions use Seek() for
  // (a) concurrent compactions,
  // (b) CompactionFilter::Decision::kRemoveAndSkipUntil.
  read_options.total_order_seek = true;

  // Remove the timestamps from the boundaries, because the boundaries created
  // in GenSubcompactionBoundaries do not have their timestamps stripped.
  if (boundaries.start.has_value()) {
    read_options.iterate_lower_bound = &(*boundaries.start);
    if (ts_sz > 0) {
      boundaries.start_without_ts =
          StripTimestampFromUserKey(*boundaries.start, ts_sz);
      read_options.iterate_lower_bound = &(*boundaries.start_without_ts);
    }
  }
  if (boundaries.end.has_value()) {
    read_options.iterate_upper_bound = &(*boundaries.end);
    if (ts_sz > 0) {
      boundaries.end_without_ts =
          StripTimestampFromUserKey(*boundaries.end, ts_sz);
      read_options.iterate_upper_bound = &(*boundaries.end_without_ts);
    }
  }

  if (ts_sz > 0) {
    if (ts_sz <= strlen(boundaries.kMaxTs)) {
      boundaries.ts_slice = Slice(boundaries.kMaxTs, ts_sz);
    } else {
      boundaries.max_ts = std::string(ts_sz, '\xff');
      boundaries.ts_slice = Slice(boundaries.max_ts);
    }
  }

  if (boundaries.start.has_value()) {
    boundaries.start_ikey.SetInternalKey(*boundaries.start, kMaxSequenceNumber,
                                         kValueTypeForSeek);
    if (ts_sz > 0) {
      boundaries.start_ikey.UpdateInternalKey(
          kMaxSequenceNumber, kValueTypeForSeek, &boundaries.ts_slice);
    }
    boundaries.start_internal_key = boundaries.start_ikey.GetInternalKey();
    boundaries.start_user_key = boundaries.start_ikey.GetUserKey();
  }
  if (boundaries.end.has_value()) {
    boundaries.end_ikey.SetInternalKey(*boundaries.end, kMaxSequenceNumber,
                                       kValueTypeForSeek);
    if (ts_sz > 0) {
      boundaries.end_ikey.UpdateInternalKey(
          kMaxSequenceNumber, kValueTypeForSeek, &boundaries.ts_slice);
    }
    boundaries.end_internal_key = boundaries.end_ikey.GetInternalKey();
    boundaries.end_user_key = boundaries.end_ikey.GetUserKey();
  }
}
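// Builds the merged input iterator over all input files and wraps it,
// innermost to outermost, with: a ClippingIterator that enforces the
// subcompaction boundaries, a BlobCountingIterator when the inputs reference
// blob files, and a HistoryTrimmingIterator when trimming history up to
// trim_ts_.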
InternalIterator* CompactionJob::CreateInputIterator(
    SubcompactionState* sub_compact, ColumnFamilyData* cfd,
    SubcompactionInternalIterators& iterators,
    SubcompactionKeyBoundaries& boundaries, ReadOptions& read_options) {
  const size_t ts_sz = cfd->user_comparator()->timestamp_size();
  InitializeReadOptionsAndBoundaries(ts_sz, read_options, boundaries);

  // This is assigned after creation of SubcompactionState to simplify that
  // creation across both CompactionJob and CompactionServiceCompactionJob
  sub_compact->AssignRangeDelAggregator(
      std::make_unique<CompactionRangeDelAggregator>(
          &cfd->internal_comparator(), job_context_->snapshot_seqs,
          &full_history_ts_low_, &trim_ts_));

  // Although the v2 aggregator is what the level iterator(s) know about,
  // the AddTombstones calls will be propagated down to the v1 aggregator.
  iterators.raw_input =
      std::unique_ptr<InternalIterator>(versions_->MakeInputIterator(
          read_options, sub_compact->compaction, sub_compact->RangeDelAgg(),
          file_options_for_read_, boundaries.start, boundaries.end));
  InternalIterator* input = iterators.raw_input.get();

  if (boundaries.start.has_value() || boundaries.end.has_value()) {
    iterators.clip = std::make_unique<ClippingIterator>(
        iterators.raw_input.get(),
        boundaries.start.has_value() ? &boundaries.start_internal_key : nullptr,
        boundaries.end.has_value() ? &boundaries.end_internal_key : nullptr,
        &cfd->internal_comparator());
    input = iterators.clip.get();
  }

  if (sub_compact->compaction->DoesInputReferenceBlobFiles()) {
    BlobGarbageMeter* meter = sub_compact->Current().CreateBlobGarbageMeter();
    iterators.blob_counter =
        std::make_unique<BlobCountingIterator>(input, meter);
    input = iterators.blob_counter.get();
  }

  if (ts_sz > 0 && !trim_ts_.empty()) {
    iterators.trim_history_iter = std::make_unique<HistoryTrimmingIterator>(
        input, cfd->user_comparator(), trim_ts_);
    input = iterators.trim_history_iter.get();
  }
  return input;
}
void CompactionJob::CreateBlobFileBuilder(SubcompactionState* sub_compact,
                                          ColumnFamilyData* cfd,
                                          BlobFileResources& blob_resources,
                                          const WriteOptions& write_options) {
  const auto& mutable_cf_options =
      sub_compact->compaction->mutable_cf_options();
  // TODO: BlobDB to support output_to_proximal_level compaction, which needs
  // two builders, so this may need to move to `CompactionOutputs`
  if (mutable_cf_options.enable_blob_files &&
      sub_compact->compaction->output_level() >=
          mutable_cf_options.blob_file_starting_level) {
    blob_resources.blob_file_builder = std::make_unique<BlobFileBuilder>(
        versions_, fs_.get(), &sub_compact->compaction->immutable_options(),
        &mutable_cf_options, &file_options_, &write_options, db_id_,
        db_session_id_, job_id_, cfd->GetID(), cfd->GetName(), write_hint_,
        io_tracer_, blob_callback_, BlobFileCreationReason::kCompaction,
        &blob_resources.blob_file_paths,
        sub_compact->Current().GetBlobFileAdditionsPtr());
  } else {
    blob_resources.blob_file_builder = nullptr;
  }
}
std::unique_ptr<CompactionIterator> CompactionJob::CreateCompactionIterator(
    SubcompactionState* sub_compact, ColumnFamilyData* cfd,
    InternalIterator* input, const CompactionFilter* compaction_filter,
    MergeHelper& merge, BlobFileResources& blob_resources,
    const WriteOptions& write_options) {
  CreateBlobFileBuilder(sub_compact, cfd, blob_resources, write_options);

  const std::string* const full_history_ts_low =
      full_history_ts_low_.empty() ? nullptr : &full_history_ts_low_;

  assert(job_context_);
  return std::make_unique<CompactionIterator>(
      input, cfd->user_comparator(), &merge, versions_->LastSequence(),
      &(job_context_->snapshot_seqs), earliest_snapshot_,
      job_context_->earliest_write_conflict_snapshot,
      job_context_->GetJobSnapshotSequence(), job_context_->snapshot_checker,
      env_, ShouldReportDetailedTime(env_, stats_), sub_compact->RangeDelAgg(),
      blob_resources.blob_file_builder.get(), db_options_.allow_data_in_errors,
      db_options_.enforce_single_del_contracts, manual_compaction_canceled_,
      sub_compact->compaction
          ->DoesInputReferenceBlobFiles() /* must_count_input_entries */,
      sub_compact->compaction, compaction_filter, shutting_down_,
      db_options_.info_log, full_history_ts_low, preserve_seqno_after_);
}
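// Builds the open/close callbacks handed to the output-writing code. Both
// capture `this` and the subcompaction so that CompactionOutputs can open a
// new output table or finalize the current one without knowing about
// CompactionJob internals.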
std::pair<CompactionFileOpenFunc, CompactionFileCloseFunc>
CompactionJob::CreateFileHandlers(SubcompactionState* sub_compact,
                                  SubcompactionKeyBoundaries& boundaries) {
  const CompactionFileOpenFunc open_file_func =
      [this, sub_compact](CompactionOutputs& outputs) {
        return this->OpenCompactionOutputFile(sub_compact, outputs);
      };

  const Slice* start_user_key =
      sub_compact->start.has_value() ? &boundaries.start_user_key : nullptr;
  const Slice* end_user_key =
      sub_compact->end.has_value() ? &boundaries.end_user_key : nullptr;

  const CompactionFileCloseFunc close_file_func =
      [this, sub_compact, start_user_key, end_user_key](
          const Status& status,
          const ParsedInternalKey& prev_table_last_internal_key,
          const Slice& next_table_min_key, const CompactionIterator* c_iter,
          CompactionOutputs& outputs) {
        return this->FinishCompactionOutputFile(
            status, prev_table_last_internal_key, next_table_min_key,
            start_user_key, end_user_key, c_iter, sub_compact, outputs);
      };
  return {open_file_func, close_file_func};
}
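// The main subcompaction loop: drains the CompactionIterator, routes each key
// to the proximal-level or last-level output based on its sequence number,
// and periodically samples CPU time and dropped-key statistics.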
Status CompactionJob::ProcessKeyValue(
    SubcompactionState* sub_compact, ColumnFamilyData* cfd,
    CompactionIterator* c_iter, const CompactionFileOpenFunc& open_file_func,
    const CompactionFileCloseFunc& close_file_func, uint64_t& prev_cpu_micros) {
  Status status;
  const uint64_t kRecordStatsEvery = 1000;
  [[maybe_unused]] const std::optional<const Slice> end = sub_compact->end;
  IterKey last_output_key;
  ParsedInternalKey last_output_ikey;

  TEST_SYNC_POINT_CALLBACK(
      "CompactionJob::ProcessKeyValueCompaction()::Processing",
      static_cast<void*>(const_cast<Compaction*>(sub_compact->compaction)));
  while (status.ok() && !cfd->IsDropped() && c_iter->Valid() &&
         c_iter->status().ok()) {
    assert(!end.has_value() ||
           cfd->user_comparator()->Compare(c_iter->user_key(), *end) < 0);
    if (c_iter->iter_stats().num_input_records % kRecordStatsEvery ==
        kRecordStatsEvery - 1) {
      UpdateSubcompactionJobStatsIncrementally(
          c_iter, &sub_compact->compaction_job_stats,
          db_options_.clock->CPUMicros(), prev_cpu_micros);
    }

    const auto& ikey = c_iter->ikey();
    bool use_proximal_output = ikey.sequence > proximal_after_seqno_;
#ifndef NDEBUG
    if (sub_compact->compaction->SupportsPerKeyPlacement()) {
      PerKeyPlacementContext context(sub_compact->compaction->output_level(),
                                     ikey.user_key, c_iter->value(),
                                     ikey.sequence, use_proximal_output);
      TEST_SYNC_POINT_CALLBACK("CompactionIterator::PrepareOutput.context",
                               &context);
      if (use_proximal_output) {
        // Verify that entries sent to the proximal level are within the
        // allowed range (because the input key range of the last level could
        // be larger than the allowed output key range of the proximal
        // level). This check uses user keys (ignores sequence numbers) because
        // compaction boundaries are a "clean cut" between user keys (see
        // CompactionPicker::ExpandInputsToCleanCut()), which is especially
        // important when preferred sequence numbers have been swapped in for
        // kTypeValuePreferredSeqno / TimedPut.
        sub_compact->compaction->TEST_AssertWithinProximalLevelOutputRange(
            c_iter->user_key());
      }
    } else {
      assert(proximal_after_seqno_ == kMaxSequenceNumber);
      assert(!use_proximal_output);
    }
#endif  // NDEBUG

    // Add the current compaction_iterator key to the target compaction
    // output; if an output file needs to be closed or opened, this will call
    // `open_file_func` and `close_file_func`.
    // TODO: it would be better to have the compaction file open/close moved
    // into `CompactionOutputs`, which has the output file information.
    status =
        sub_compact->AddToOutput(*c_iter, use_proximal_output, open_file_func,
                                 close_file_func, last_output_ikey);
    if (!status.ok()) {
      break;
    }

    TEST_SYNC_POINT_CALLBACK("CompactionJob::Run():PausingManualCompaction:2",
                             static_cast<void*>(const_cast<std::atomic<bool>*>(
                                 &manual_compaction_canceled_)));
    last_output_key.SetInternalKey(c_iter->key(), &last_output_ikey);
    last_output_ikey.sequence = ikey.sequence;
    last_output_ikey.type = ikey.type;
    c_iter->Next();

#ifndef NDEBUG
    bool stop = false;
    TEST_SYNC_POINT_CALLBACK("CompactionJob::ProcessKeyValueCompaction()::stop",
                             static_cast<void*>(&stop));
    if (stop) {
      break;
    }
#endif  // NDEBUG
  }
  return status;
}
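// Folds the iterator's dropped-key counts and thread-local IO stats into the
// job stats and records the CPU time consumed since the previous sample.
// Called every kRecordStatsEvery records and once more when the
// subcompaction finishes.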
void CompactionJob::UpdateSubcompactionJobStatsIncrementally(
    CompactionIterator* c_iter, CompactionJobStats* compaction_job_stats,
    uint64_t cur_cpu_micros, uint64_t& prev_cpu_micros) {
  RecordDroppedKeys(c_iter->iter_stats(), compaction_job_stats);
  c_iter->ResetRecordCounts();
  RecordCompactionIOStats();
  assert(cur_cpu_micros >= prev_cpu_micros);
  RecordTick(stats_, COMPACTION_CPU_TOTAL_TIME,
             cur_cpu_micros - prev_cpu_micros);
  prev_cpu_micros = cur_cpu_micros;
}
void CompactionJob::FinalizeSubcompactionJobStats(
    SubcompactionState* sub_compact, CompactionIterator* c_iter,
    uint64_t start_cpu_micros, uint64_t prev_cpu_micros,
    const CompactionIOStatsSnapshot& io_stats) {
  const CompactionIterationStats& c_iter_stats = c_iter->iter_stats();

  assert(!sub_compact->compaction->DoesInputReferenceBlobFiles() ||
         c_iter->HasNumInputEntryScanned());
  sub_compact->compaction_job_stats.has_accurate_num_input_records &=
      c_iter->HasNumInputEntryScanned();
  sub_compact->compaction_job_stats.num_input_records +=
      c_iter->NumInputEntryScanned();
  sub_compact->compaction_job_stats.num_blobs_read =
      c_iter_stats.num_blobs_read;
  sub_compact->compaction_job_stats.total_blob_bytes_read =
      c_iter_stats.total_blob_bytes_read;
  sub_compact->compaction_job_stats.num_input_deletion_records =
      c_iter_stats.num_input_deletion_records;
  sub_compact->compaction_job_stats.num_corrupt_keys =
      c_iter_stats.num_input_corrupt_records;
  sub_compact->compaction_job_stats.num_single_del_fallthru =
      c_iter_stats.num_single_del_fallthru;
  sub_compact->compaction_job_stats.num_single_del_mismatch =
      c_iter_stats.num_single_del_mismatch;
  sub_compact->compaction_job_stats.total_input_raw_key_bytes +=
      c_iter_stats.total_input_raw_key_bytes;
  sub_compact->compaction_job_stats.total_input_raw_value_bytes +=
      c_iter_stats.total_input_raw_value_bytes;

  RecordTick(stats_, FILTER_OPERATION_TOTAL_TIME,
             c_iter_stats.total_filter_time);

  if (c_iter_stats.num_blobs_relocated > 0) {
    RecordTick(stats_, BLOB_DB_GC_NUM_KEYS_RELOCATED,
               c_iter_stats.num_blobs_relocated);
  }
  if (c_iter_stats.total_blob_bytes_relocated > 0) {
    RecordTick(stats_, BLOB_DB_GC_BYTES_RELOCATED,
               c_iter_stats.total_blob_bytes_relocated);
  }

  uint64_t cur_cpu_micros = db_options_.clock->CPUMicros();
  // Record final compaction statistics including dropped keys, I/O stats,
  // and CPU time delta from the last periodic measurement
  UpdateSubcompactionJobStatsIncrementally(c_iter,
                                           &sub_compact->compaction_job_stats,
                                           cur_cpu_micros, prev_cpu_micros);

  // Finalize timing and I/O statistics
  sub_compact->compaction_job_stats.cpu_micros =
      cur_cpu_micros - start_cpu_micros + sub_compact->GetWorkerCPUMicros();

  if (measure_io_stats_) {
    sub_compact->compaction_job_stats.file_write_nanos +=
        IOSTATS(write_nanos) - io_stats.prev_write_nanos;
    sub_compact->compaction_job_stats.file_fsync_nanos +=
        IOSTATS(fsync_nanos) - io_stats.prev_fsync_nanos;
    sub_compact->compaction_job_stats.file_range_sync_nanos +=
        IOSTATS(range_sync_nanos) - io_stats.prev_range_sync_nanos;
    sub_compact->compaction_job_stats.file_prepare_write_nanos +=
        IOSTATS(prepare_write_nanos) - io_stats.prev_prepare_write_nanos;
    sub_compact->compaction_job_stats.cpu_micros -=
        (IOSTATS(cpu_write_nanos) - io_stats.prev_cpu_write_nanos +
         IOSTATS(cpu_read_nanos) - io_stats.prev_cpu_read_nanos) /
        1000;
    if (io_stats.prev_perf_level !=
        PerfLevel::kEnableTimeAndCPUTimeExceptForMutex) {
      SetPerfLevel(io_stats.prev_perf_level);
    }
  }
}
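// Converts external stop conditions into a final status, checked in priority
// order: column family drop, shutdown, manual-compaction cancellation, then
// any error surfaced by the input or compaction iterators.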
Status CompactionJob::FinalizeProcessKeyValueStatus(
    ColumnFamilyData* cfd, InternalIterator* input_iter,
    CompactionIterator* c_iter, Status status) {
  if (status.ok() && cfd->IsDropped()) {
    status =
        Status::ColumnFamilyDropped("Column family dropped during compaction");
  }
  if (status.ok() && shutting_down_->load(std::memory_order_relaxed)) {
    status = Status::ShutdownInProgress("Database shutdown");
  }
  if (status.ok() &&
      (manual_compaction_canceled_.load(std::memory_order_relaxed))) {
    status = Status::Incomplete(Status::SubCode::kManualCompactionPaused);
  }
  if (status.ok()) {
    status = input_iter->status();
  }
  if (status.ok()) {
    status = c_iter->status();
  }
  return status;
}
Status CompactionJob::CleanupCompactionFiles(
    SubcompactionState* sub_compact, Status status,
    const CompactionFileOpenFunc& open_file_func,
    const CompactionFileCloseFunc& close_file_func) {
  // Call FinishCompactionOutputFile() even if status is not ok: it needs to
  // close the output files. The open-file function is also passed in because,
  // if the subcompaction produced only range deletions, no file has been
  // opened yet and a new output file must be created to persist them.
  return sub_compact->CloseCompactionFiles(status, open_file_func,
                                           close_file_func);
}
Status CompactionJob::FinalizeBlobFiles(SubcompactionState* sub_compact,
                                        BlobFileBuilder* blob_file_builder,
                                        Status status) {
  if (blob_file_builder) {
    if (status.ok()) {
      status = blob_file_builder->Finish();
    } else {
      blob_file_builder->Abandon(status);
    }
    sub_compact->Current().UpdateBlobStats();
  }
  return status;
}
void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
  assert(sub_compact);
  assert(sub_compact->compaction);

  if (!ShouldUseLocalCompaction(sub_compact)) {
    return;
  }

  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_COMPACTION_PROCESS_KV);

  const uint64_t start_cpu_micros = db_options_.clock->CPUMicros();
  uint64_t prev_cpu_micros = start_cpu_micros;
  const CompactionIOStatsSnapshot io_stats = InitializeIOStats();

  ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
  const CompactionFilter* compaction_filter;
  std::unique_ptr<CompactionFilter> compaction_filter_from_factory = nullptr;
  Status filter_status = SetupAndValidateCompactionFilter(
      sub_compact, cfd->ioptions().compaction_filter, compaction_filter,
      compaction_filter_from_factory);
  if (!filter_status.ok()) {
    sub_compact->status = filter_status;
    return;
  }

  NotifyOnSubcompactionBegin(sub_compact);

  SubcompactionKeyBoundaries boundaries(sub_compact->start, sub_compact->end);
  SubcompactionInternalIterators iterators;
  ReadOptions read_options;
  const WriteOptions write_options(Env::IOPriority::IO_LOW,
                                   Env::IOActivity::kCompaction);
  InternalIterator* input_iter = CreateInputIterator(
      sub_compact, cfd, iterators, boundaries, read_options);
  assert(input_iter);

  Status status =
      MaybeResumeSubcompactionProgressOnInputIterator(sub_compact, input_iter);
  if (status.IsNotFound()) {
    input_iter->SeekToFirst();
  } else if (!status.ok()) {
    sub_compact->status = status;
    return;
  }

  MergeHelper merge(
      env_, cfd->user_comparator(), cfd->ioptions().merge_operator.get(),
      compaction_filter, db_options_.info_log.get(),
      false /* internal key corruption is expected */,
      job_context_->GetLatestSnapshotSequence(), job_context_->snapshot_checker,
      compact_->compaction->level(), db_options_.stats);

  BlobFileResources blob_resources;
  auto c_iter =
      CreateCompactionIterator(sub_compact, cfd, input_iter, compaction_filter,
                               merge, blob_resources, write_options);
  assert(c_iter);
  c_iter->SeekToFirst();

  TEST_SYNC_POINT("CompactionJob::Run():Inprogress");
  TEST_SYNC_POINT_CALLBACK("CompactionJob::Run():PausingManualCompaction:1",
                           static_cast<void*>(const_cast<std::atomic<bool>*>(
                               &manual_compaction_canceled_)));

  auto [open_file_func, close_file_func] =
      CreateFileHandlers(sub_compact, boundaries);
  status = ProcessKeyValue(sub_compact, cfd, c_iter.get(), open_file_func,
                           close_file_func, prev_cpu_micros);
  status = FinalizeProcessKeyValueStatus(cfd, input_iter, c_iter.get(), status);

  FinalizeSubcompaction(sub_compact, status, open_file_func, close_file_func,
                        blob_resources.blob_file_builder.get(), c_iter.get(),
                        input_iter, start_cpu_micros, prev_cpu_micros,
                        io_stats);
  NotifyOnSubcompactionCompleted(sub_compact);
}
void CompactionJob::FinalizeSubcompaction(
    SubcompactionState* sub_compact, Status status,
    const CompactionFileOpenFunc& open_file_func,
    const CompactionFileCloseFunc& close_file_func,
    BlobFileBuilder* blob_file_builder, CompactionIterator* c_iter,
    [[maybe_unused]] InternalIterator* input_iter, uint64_t start_cpu_micros,
    uint64_t prev_cpu_micros, const CompactionIOStatsSnapshot& io_stats) {
  status = CleanupCompactionFiles(sub_compact, status, open_file_func,
                                  close_file_func);
  status = FinalizeBlobFiles(sub_compact, blob_file_builder, status);

  FinalizeSubcompactionJobStats(sub_compact, c_iter, start_cpu_micros,
                                prev_cpu_micros, io_stats);

#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
  if (!status.ok()) {
    if (c_iter) {
      c_iter->status().PermitUncheckedError();
    }
    if (input_iter) {
      input_iter->status().PermitUncheckedError();
    }
  }
#endif  // ROCKSDB_ASSERT_STATUS_CHECKED

  sub_compact->status = status;
}
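// Returns an id unique within this DB instance: the job id occupies the high
// 32 bits and the subcompaction id the low 32 bits, e.g. job 7 / sub-job 3
// packs to (7 << 32) | 3.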
uint64_t CompactionJob::GetCompactionId(SubcompactionState* sub_compact) const {
  return (uint64_t)job_id_ << 32 | sub_compact->sub_job_id;
}
void CompactionJob::RecordDroppedKeys(
    const CompactionIterationStats& c_iter_stats,
    CompactionJobStats* compaction_job_stats) {
  if (c_iter_stats.num_record_drop_user > 0) {
    RecordTick(stats_, COMPACTION_KEY_DROP_USER,
               c_iter_stats.num_record_drop_user);
  }
  if (c_iter_stats.num_record_drop_hidden > 0) {
    RecordTick(stats_, COMPACTION_KEY_DROP_NEWER_ENTRY,
               c_iter_stats.num_record_drop_hidden);
    if (compaction_job_stats) {
      compaction_job_stats->num_records_replaced +=
          c_iter_stats.num_record_drop_hidden;
    }
  }
  if (c_iter_stats.num_record_drop_obsolete > 0) {
    RecordTick(stats_, COMPACTION_KEY_DROP_OBSOLETE,
               c_iter_stats.num_record_drop_obsolete);
    if (compaction_job_stats) {
      compaction_job_stats->num_expired_deletion_records +=
          c_iter_stats.num_record_drop_obsolete;
    }
  }
  if (c_iter_stats.num_record_drop_range_del > 0) {
    RecordTick(stats_, COMPACTION_KEY_DROP_RANGE_DEL,
               c_iter_stats.num_record_drop_range_del);
  }
  if (c_iter_stats.num_range_del_drop_obsolete > 0) {
    RecordTick(stats_, COMPACTION_RANGE_DEL_DROP_OBSOLETE,
               c_iter_stats.num_range_del_drop_obsolete);
  }
  if (c_iter_stats.num_optimized_del_drop_obsolete > 0) {
    RecordTick(stats_, COMPACTION_OPTIMIZED_DEL_DROP_OBSOLETE,
               c_iter_stats.num_optimized_del_drop_obsolete);
  }
}
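// Finalizes one output table: flushes the applicable range tombstones into
// it, finishes and syncs the table builder, refines the file's oldest
// ancester time, deletes the file again if it turned out empty, notifies
// listeners and the SstFileManager, and optionally persists subcompaction
// progress.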
Status CompactionJob::FinishCompactionOutputFile(
    const Status& input_status,
    const ParsedInternalKey& prev_table_last_internal_key,
    const Slice& next_table_min_key, const Slice* comp_start_user_key,
    const Slice* comp_end_user_key, const CompactionIterator* c_iter,
    SubcompactionState* sub_compact, CompactionOutputs& outputs) {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_COMPACTION_SYNC_FILE);
  assert(sub_compact != nullptr);
  assert(outputs.HasBuilder());

  FileMetaData* meta = outputs.GetMetaData();
  uint64_t output_number = meta->fd.GetNumber();
  assert(output_number != 0);

  ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
  std::string file_checksum = kUnknownFileChecksum;
  std::string file_checksum_func_name = kUnknownFileChecksumFuncName;

  // Check for iterator errors
  Status s = input_status;

  // Add range tombstones
  if (s.ok()) {
    // Inclusive lower bound, exclusive upper bound
    std::pair<SequenceNumber, SequenceNumber> keep_seqno_range{
        0, kMaxSequenceNumber};
    if (sub_compact->compaction->SupportsPerKeyPlacement()) {
      if (outputs.IsProximalLevel()) {
        keep_seqno_range.first = proximal_after_seqno_;
      } else {
        keep_seqno_range.second = proximal_after_seqno_;
      }
    }
    CompactionIterationStats range_del_out_stats;
    // NOTE1: Use `bottommost_level_ = true` for both bottommost and
    // output_to_proximal_level compaction here, as it's only used to decide
    // whether range dels could be dropped. (Logically, we are taking a single
    // sorted run returned from CompactionIterator and physically splitting it
    // between two output levels.)
    // NOTE2: with per-key placement, range tombstones will be filtered on
    // each output level based on sequence number (traversed twice). This is
    // CPU-inefficient for a large number of range tombstones, but that would
    // be an unusual workload.
    if (sub_compact->HasRangeDel()) {
      s = outputs.AddRangeDels(*sub_compact->RangeDelAgg(), comp_start_user_key,
                               comp_end_user_key, range_del_out_stats,
                               bottommost_level_, cfd->internal_comparator(),
                               earliest_snapshot_, keep_seqno_range,
                               next_table_min_key, full_history_ts_low_);
    }
    RecordDroppedKeys(range_del_out_stats, &sub_compact->compaction_job_stats);
    TEST_SYNC_POINT("CompactionJob::FinishCompactionOutputFile1");
  }

  const uint64_t current_entries = outputs.NumEntries();

  s = outputs.Finish(s, seqno_to_time_mapping_);
  TEST_SYNC_POINT_CALLBACK(
      "CompactionJob::FinishCompactionOutputFile()::AfterFinish", &s);

  if (s.ok()) {
    // With accurate smallest and largest keys, we can get a slightly more
    // accurate oldest ancester time. This makes the oldest ancester time in
    // the manifest more accurate than in the table properties. Not sure how
    // to resolve it.
    if (meta->smallest.size() > 0 && meta->largest.size() > 0) {
      uint64_t refined_oldest_ancester_time;
      Slice new_smallest = meta->smallest.user_key();
      Slice new_largest = meta->largest.user_key();
      if (!new_largest.empty() && !new_smallest.empty()) {
        refined_oldest_ancester_time =
            sub_compact->compaction->MinInputFileOldestAncesterTime(
                &(meta->smallest), &(meta->largest));
        if (refined_oldest_ancester_time !=
            std::numeric_limits<uint64_t>::max()) {
          meta->oldest_ancester_time = refined_oldest_ancester_time;
        }
      }
    }
  }

  // Finish and check for file errors
  IOStatus io_s = outputs.WriterSyncClose(s, db_options_.clock, stats_,
                                          db_options_.use_fsync);

  if (s.ok() && io_s.ok()) {
    file_checksum = meta->file_checksum;
    file_checksum_func_name = meta->file_checksum_func_name;
  }

  if (s.ok()) {
    s = io_s;
  }
  if (sub_compact->io_status.ok()) {
    sub_compact->io_status = io_s;
    // Since this error is really a copy of the
    // "normal" status, it does not also need to be checked
    sub_compact->io_status.PermitUncheckedError();
  }

  TableProperties tp;
  if (s.ok()) {
    tp = outputs.GetTableProperties();
  }

  if (s.ok() && current_entries == 0 && tp.num_range_deletions == 0) {
    // If there is nothing to output, there is no need to generate an SST
    // file. This happens when the output level is the bottom level and the
    // subcompaction outputs nothing.
    std::string fname = GetTableFileName(meta->fd.GetNumber());

    // TODO(AR) it is not clear if there are any larger implications if
    // DeleteFile fails here
    Status ds = env_->DeleteFile(fname);
    if (!ds.ok()) {
      ROCKS_LOG_WARN(
          db_options_.info_log,
          "[%s] [JOB %d] Unable to remove SST file for table #%" PRIu64
          " at bottom level%s",
          cfd->GetName().c_str(), job_id_, output_number,
          meta->marked_for_compaction ? " (need compaction)" : "");
    }

    // Also need to remove the file from outputs, or it will be added to the
    // VersionEdit.
    outputs.RemoveLastOutput();
    meta = nullptr;
  }

  if (s.ok() && (current_entries > 0 || tp.num_range_deletions > 0)) {
    // Output to event logger and fire events.
    outputs.UpdateTableProperties();
    ROCKS_LOG_INFO(db_options_.info_log,
                   "[%s] [JOB %d] Generated table #%" PRIu64 ": %" PRIu64
                   " keys, %" PRIu64 " bytes%s, temperature: %s",
                   cfd->GetName().c_str(), job_id_, output_number,
                   current_entries, meta->fd.file_size,
                   meta->marked_for_compaction ? " (need compaction)" : "",
                   temperature_to_string[meta->temperature].c_str());
  }
  std::string fname;
  FileDescriptor output_fd;
  uint64_t oldest_blob_file_number = kInvalidBlobFileNumber;
  Status status_for_listener = s;
  if (meta != nullptr) {
    fname = GetTableFileName(meta->fd.GetNumber());
    output_fd = meta->fd;
    oldest_blob_file_number = meta->oldest_blob_file_number;
  } else {
    fname = "(nil)";
    if (s.ok()) {
      status_for_listener = Status::Aborted("Empty SST file not kept");
    }
  }
  EventHelpers::LogAndNotifyTableFileCreationFinished(
      event_logger_, cfd->ioptions().listeners, dbname_, cfd->GetName(), fname,
      job_id_, output_fd, oldest_blob_file_number, tp,
      TableFileCreationReason::kCompaction, status_for_listener, file_checksum,
      file_checksum_func_name);

  // Report new file to SstFileManagerImpl
  auto sfm =
      static_cast<SstFileManagerImpl*>(db_options_.sst_file_manager.get());
  if (sfm && meta != nullptr && meta->fd.GetPathId() == 0) {
    Status add_s = sfm->OnAddFile(fname);
    if (!add_s.ok() && s.ok()) {
      s = add_s;
    }
    if (sfm->IsMaxAllowedSpaceReached()) {
      // TODO(ajkr): should we return OK() if max space was reached by the
      // final compaction output file (similarly to how flush works when
      // full)?
      s = Status::SpaceLimit("Max allowed space was reached");
      TEST_SYNC_POINT(
          "CompactionJob::FinishCompactionOutputFile:MaxAllowedSpaceReached");
      InstrumentedMutexLock l(db_mutex_);
      db_error_handler_->SetBGError(s, BackgroundErrorReason::kCompaction);
    }
  }

  if (s.ok() && ShouldUpdateSubcompactionProgress(sub_compact, c_iter,
                                                  prev_table_last_internal_key,
                                                  next_table_min_key, meta)) {
    UpdateSubcompactionProgress(c_iter, next_table_min_key, sub_compact);
    s = PersistSubcompactionProgress(sub_compact);
  }

  outputs.ResetBuilder();
  return s;
}
bool CompactionJob::ShouldUpdateSubcompactionProgress(
    const SubcompactionState* sub_compact, const CompactionIterator* c_iter,
    const ParsedInternalKey& prev_table_last_internal_key,
    const Slice& next_table_min_internal_key, const FileMetaData* meta) const {
  const auto* cfd = sub_compact->compaction->column_family_data();

  // No need to update when the output will not get persisted
  if (compaction_progress_writer_ == nullptr) {
    return false;
  }

  // No need to update for a new empty output
  if (meta == nullptr) {
    return false;
  }

  // TODO(hx235): save progress even on the last output file
  if (next_table_min_internal_key.empty()) {
    return false;
  }

  // LIMITATION: Persisting compaction progress with timestamps is not
  // supported, since persisting the timestamp of the key in SST files is
  // itself still an experimental feature
  size_t ts_sz = cfd->user_comparator()->timestamp_size();
  if (ts_sz > 0) {
    return false;
  }

  // LIMITATION: Compaction progress persistence is disabled when the file
  // boundaries contain range deletions. Range deletions can span file
  // boundaries, making it difficult (but possible) to ensure that adjacent
  // output tables have different user keys. See the last check for why
  // different user keys of adjacent output tables are needed.
  const ValueType next_table_min_internal_key_type =
      ExtractValueType(next_table_min_internal_key);
  const ValueType prev_table_last_internal_key_type =
      prev_table_last_internal_key.user_key.empty()
          ? ValueType::kTypeValue
          : prev_table_last_internal_key.type;
  if (next_table_min_internal_key_type == ValueType::kTypeRangeDeletion ||
      prev_table_last_internal_key_type == ValueType::kTypeRangeDeletion) {
    return false;
  }

  // LIMITATION: Compaction progress persistence is disabled when adjacent
  // output tables share the same user key at their boundary. This ensures
  // that, on resume, a simple Seek() to the next key can process all versions
  // of a user key.
  const Slice next_table_min_user_key =
      ExtractUserKey(next_table_min_internal_key);
  const Slice prev_table_last_user_key =
      prev_table_last_internal_key.user_key.empty()
          ? Slice()
          : prev_table_last_internal_key.user_key;
  if (cfd->user_comparator()->EqualWithoutTimestamp(next_table_min_user_key,
                                                    prev_table_last_user_key)) {
    return false;
  }

  // LIMITATION: Don't save progress if the current key has already been
  // scanned (looked ahead) in the input but not yet output. This can happen
  // with merge operations, single deletes, and deletes at the bottommost
  // level, where CompactionIterator needs to look ahead to process multiple
  // entries for the same user key before outputting a result. If we saved
  // progress and resumed at this boundary, the resumed session would see and
  // process the same input key again through Seek(), leading to incorrect
  // double-counting in the number of processed input entries and an input
  // count verification failure.
  //
  // TODO(hx235): Offset num_processed_input_records to avoid double counting
  // instead of disabling progress persistence.
  if (c_iter->IsCurrentKeyAlreadyScanned()) {
    return false;
  }

  return true;
}
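// Applies the accumulated VersionEdit under the DB mutex: input-file
// deletions, new table and blob files, per-blob-file garbage totals, and
// (for round-robin compactions) the next compaction cursor. The manifest
// write callback releases the compaction's input files once the edit is
// logged.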
Status CompactionJob::InstallCompactionResults(bool* compaction_released) {
  assert(compact_);

  db_mutex_->AssertHeld();

  const ReadOptions read_options(Env::IOActivity::kCompaction);
  const WriteOptions write_options(Env::IOActivity::kCompaction);

  auto* compaction = compact_->compaction;
  assert(compaction);

  {
    Compaction::InputLevelSummaryBuffer inputs_summary;
    if (internal_stats_.has_proximal_level_output) {
      ROCKS_LOG_BUFFER(
          log_buffer_,
          "[%s] [JOB %d] Compacted %s => output_to_proximal_level: %" PRIu64
          " bytes + last: %" PRIu64 " bytes. Total: %" PRIu64 " bytes",
          compaction->column_family_data()->GetName().c_str(), job_id_,
          compaction->InputLevelSummary(&inputs_summary),
          internal_stats_.proximal_level_stats.bytes_written,
          internal_stats_.output_level_stats.bytes_written,
          internal_stats_.TotalBytesWritten());
    } else {
      ROCKS_LOG_BUFFER(log_buffer_,
                       "[%s] [JOB %d] Compacted %s => %" PRIu64 " bytes",
                       compaction->column_family_data()->GetName().c_str(),
                       job_id_, compaction->InputLevelSummary(&inputs_summary),
                       internal_stats_.TotalBytesWritten());
    }
  }

  VersionEdit* const edit = compaction->edit();
  assert(edit);

  // Add compaction inputs
  compaction->AddInputDeletions(edit);

  std::unordered_map<uint64_t, BlobGarbageMeter::BlobStats> blob_total_garbage;

  for (const auto& sub_compact : compact_->sub_compact_states) {
    sub_compact.AddOutputsEdit(edit);

    for (const auto& blob : sub_compact.Current().GetBlobFileAdditions()) {
      edit->AddBlobFile(blob);
    }

    if (sub_compact.Current().GetBlobGarbageMeter()) {
      const auto& flows = sub_compact.Current().GetBlobGarbageMeter()->flows();

      for (const auto& pair : flows) {
        const uint64_t blob_file_number = pair.first;
        const BlobGarbageMeter::BlobInOutFlow& flow = pair.second;

        assert(flow.IsValid());
        if (flow.HasGarbage()) {
          blob_total_garbage[blob_file_number].Add(flow.GetGarbageCount(),
                                                   flow.GetGarbageBytes());
        }
      }
    }
  }

  for (const auto& pair : blob_total_garbage) {
    const uint64_t blob_file_number = pair.first;
    const BlobGarbageMeter::BlobStats& stats = pair.second;

    edit->AddBlobFileGarbage(blob_file_number, stats.GetCount(),
                             stats.GetBytes());
  }

  if ((compaction->compaction_reason() ==
           CompactionReason::kLevelMaxLevelSize ||
       compaction->compaction_reason() == CompactionReason::kRoundRobinTtl) &&
      compaction->immutable_options().compaction_pri == kRoundRobin) {
    int start_level = compaction->start_level();
    if (start_level > 0) {
      auto vstorage = compaction->input_version()->storage_info();
      edit->AddCompactCursor(start_level,
                             vstorage->GetNextCompactCursor(
                                 start_level, compaction->num_input_files(0)));
    }
  }

  auto manifest_wcb = [&compaction, &compaction_released](const Status& s) {
    compaction->ReleaseCompactionFiles(s);
    *compaction_released = true;
  };

  return versions_->LogAndApply(compaction->column_family_data(), read_options,
                                write_options, edit, db_mutex_, db_directory_,
                                /*new_descriptor_log=*/false,
                                /*column_family_options=*/nullptr,
                                manifest_wcb);
}
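// Flushes the thread-local IOSTATS byte counters into the global tickers,
// attributing them to marked/periodic/TTL compactions where applicable, and
// resets the counters so that subsequent samples record deltas only.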
void CompactionJob::RecordCompactionIOStats() {
  RecordTick(stats_, COMPACT_READ_BYTES, IOSTATS(bytes_read));
  RecordTick(stats_, COMPACT_WRITE_BYTES, IOSTATS(bytes_written));
  CompactionReason compaction_reason =
      compact_->compaction->compaction_reason();
  if (compaction_reason == CompactionReason::kFilesMarkedForCompaction) {
    RecordTick(stats_, COMPACT_READ_BYTES_MARKED, IOSTATS(bytes_read));
    RecordTick(stats_, COMPACT_WRITE_BYTES_MARKED, IOSTATS(bytes_written));
  } else if (compaction_reason == CompactionReason::kPeriodicCompaction) {
    RecordTick(stats_, COMPACT_READ_BYTES_PERIODIC, IOSTATS(bytes_read));
    RecordTick(stats_, COMPACT_WRITE_BYTES_PERIODIC, IOSTATS(bytes_written));
  } else if (compaction_reason == CompactionReason::kTtl) {
    RecordTick(stats_, COMPACT_READ_BYTES_TTL, IOSTATS(bytes_read));
    RecordTick(stats_, COMPACT_WRITE_BYTES_TTL, IOSTATS(bytes_written));
  }
  ThreadStatusUtil::IncreaseThreadOperationProperty(
      ThreadStatus::COMPACTION_BYTES_READ, IOSTATS(bytes_read));
  IOSTATS_RESET(bytes_read);
  ThreadStatusUtil::IncreaseThreadOperationProperty(
      ThreadStatus::COMPACTION_BYTES_WRITTEN, IOSTATS(bytes_written));
  IOSTATS_RESET(bytes_written);
}
Status CompactionJob::OpenCompactionOutputFile(SubcompactionState* sub_compact,
                                               CompactionOutputs& outputs) {
  assert(sub_compact != nullptr);

  // no need to lock because VersionSet::next_file_number_ is atomic
  uint64_t file_number = versions_->NewFileNumber();
#ifndef NDEBUG
  TEST_SYNC_POINT_CALLBACK(
      "CompactionJob::OpenCompactionOutputFile::NewFileNumber", &file_number);
#endif
  std::string fname = GetTableFileName(file_number);

  // Fire events.
  ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
  EventHelpers::NotifyTableFileCreationStarted(
      cfd->ioptions().listeners, dbname_, cfd->GetName(), fname, job_id_,
      TableFileCreationReason::kCompaction);

  // Make the output file
  std::unique_ptr<FSWritableFile> writable_file;
#ifndef NDEBUG
  bool syncpoint_arg = file_options_.use_direct_writes;
  TEST_SYNC_POINT_CALLBACK("CompactionJob::OpenCompactionOutputFile",
                           &syncpoint_arg);
#endif

  // Pass temperature of the last level files to FileSystem.
  FileOptions fo_copy = file_options_;
  auto temperature =
      sub_compact->compaction->GetOutputTemperature(outputs.IsProximalLevel());
  fo_copy.temperature = temperature;
  fo_copy.write_hint = write_hint_;

  Status s;
  IOStatus io_s = NewWritableFile(fs_.get(), fname, &writable_file, fo_copy);
  s = io_s;
  if (sub_compact->io_status.ok()) {
    sub_compact->io_status = io_s;
    // Since this error is really a copy of the io_s that is checked below as
    // s, it does not also need to be checked.
    sub_compact->io_status.PermitUncheckedError();
  }
  if (!s.ok()) {
    ROCKS_LOG_ERROR(
        db_options_.info_log,
        "[%s] [JOB %d] OpenCompactionOutputFiles for table #%" PRIu64
        " fails at NewWritableFile with status %s",
        sub_compact->compaction->column_family_data()->GetName().c_str(),
        job_id_, file_number, s.ToString().c_str());
    LogFlush(db_options_.info_log);
    EventHelpers::LogAndNotifyTableFileCreationFinished(
        event_logger_, cfd->ioptions().listeners, dbname_, cfd->GetName(),
        fname, job_id_, FileDescriptor(), kInvalidBlobFileNumber,
        TableProperties(), TableFileCreationReason::kCompaction, s,
        kUnknownFileChecksum, kUnknownFileChecksumFuncName);
    return s;
  }

  // Try to figure out the output file's oldest ancester time.
  int64_t temp_current_time = 0;
  auto get_time_status = db_options_.clock->GetCurrentTime(&temp_current_time);
  // Safe to proceed even if GetCurrentTime fails. So, log and proceed.
  if (!get_time_status.ok()) {
    ROCKS_LOG_WARN(db_options_.info_log,
                   "Failed to get current time. Status: %s",
                   get_time_status.ToString().c_str());
  }
  uint64_t current_time = static_cast<uint64_t>(temp_current_time);
  InternalKey tmp_start, tmp_end;
  if (sub_compact->start.has_value()) {
    tmp_start.SetMinPossibleForUserKey(*(sub_compact->start));
  }
  if (sub_compact->end.has_value()) {
    tmp_end.SetMinPossibleForUserKey(*(sub_compact->end));
  }
  uint64_t oldest_ancester_time =
      sub_compact->compaction->MinInputFileOldestAncesterTime(
          sub_compact->start.has_value() ? &tmp_start : nullptr,
          sub_compact->end.has_value() ? &tmp_end : nullptr);
  if (oldest_ancester_time == std::numeric_limits<uint64_t>::max()) {
    // TODO: fix DBSSTTest.GetTotalSstFilesSize and use
    // kUnknownOldestAncesterTime
    oldest_ancester_time = current_time;
  }
  uint64_t newest_key_time = sub_compact->compaction->MaxInputFileNewestKeyTime(
      sub_compact->start.has_value() ? &tmp_start : nullptr,
      sub_compact->end.has_value() ? &tmp_end : nullptr);

  // Initialize a SubcompactionState::Output and add it to sub_compact->outputs
  uint64_t epoch_number = sub_compact->compaction->MinInputFileEpochNumber();
  {
    FileMetaData meta;
    meta.fd = FileDescriptor(file_number,
                             sub_compact->compaction->output_path_id(), 0);
    meta.oldest_ancester_time = oldest_ancester_time;
    meta.file_creation_time = current_time;
    meta.epoch_number = epoch_number;
    meta.temperature = temperature;
    assert(!db_id_.empty());
    assert(!db_session_id_.empty());
    s = GetSstInternalUniqueId(db_id_, db_session_id_, meta.fd.GetNumber(),
                               &meta.unique_id);
    if (!s.ok()) {
      ROCKS_LOG_ERROR(db_options_.info_log,
                      "[%s] [JOB %d] file #%" PRIu64
                      " failed to generate unique id: %s.",
                      cfd->GetName().c_str(), job_id_, meta.fd.GetNumber(),
                      s.ToString().c_str());
      return s;
    }
    outputs.AddOutput(std::move(meta), cfd->internal_comparator(),
                      paranoid_file_checks_);
  }

  writable_file->SetIOPriority(GetRateLimiterPriority());
  // Subsequent attempts to override the hint via SetWriteLifeTimeHint
  // with the very same value will be ignored by the fs.
  writable_file->SetWriteLifeTimeHint(fo_copy.write_hint);
  FileTypeSet tmp_set = db_options_.checksum_handoff_file_types;
  writable_file->SetPreallocationBlockSize(static_cast<size_t>(
      sub_compact->compaction->OutputFilePreallocationSize()));
  const auto& listeners =
      sub_compact->compaction->immutable_options().listeners;
  outputs.AssignFileWriter(new WritableFileWriter(
      std::move(writable_file), fname, fo_copy, db_options_.clock, io_tracer_,
      db_options_.stats, Histograms::SST_WRITE_MICROS, listeners,
      db_options_.file_checksum_gen_factory.get(),
      tmp_set.Contains(FileType::kTableFile), false));

  // TODO(hx235): pass in the correct `oldest_key_time` instead of `0`
  const ReadOptions read_options(Env::IOActivity::kCompaction);
  const WriteOptions write_options(Env::IOActivity::kCompaction);
  TableBuilderOptions tboptions(
      cfd->ioptions(), sub_compact->compaction->mutable_cf_options(),
      read_options, write_options, cfd->internal_comparator(),
      cfd->internal_tbl_prop_coll_factories(),
      sub_compact->compaction->output_compression(),
      sub_compact->compaction->output_compression_opts(), cfd->GetID(),
      cfd->GetName(), sub_compact->compaction->output_level(), newest_key_time,
      bottommost_level_, TableFileCreationReason::kCompaction,
      0 /* oldest_key_time */, current_time, db_id_, db_session_id_,
      sub_compact->compaction->max_output_file_size(), file_number,
      proximal_after_seqno_ /*last_level_inclusive_max_seqno_threshold*/);
  outputs.NewBuilder(tboptions);

  LogFlush(db_options_.info_log);
  return s;
}
void CompactionJob::CleanupCompaction() {
  for (SubcompactionState& sub_compact : compact_->sub_compact_states) {
    sub_compact.Cleanup(table_cache_.get());
  }
  delete compact_;
  compact_ = nullptr;
}
namespace {
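// Copies at most the first prefix_length bytes of src into *dst.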
void CopyPrefix(const Slice& src, size_t prefix_length, std::string* dst) {
  assert(prefix_length > 0);
  size_t length = src.size() > prefix_length ? prefix_length : src.size();
  dst->assign(src.data(), length);
}
}  // namespace
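// Aggregates per-input-file statistics (bytes read or skipped, file and
// record counts) into internal_stats_, falling back to the cached input
// table properties when a file's metadata lacks an entry count. Returns
// false if any such lookup fails.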
bool CompactionJob::UpdateInternalStatsFromInputFiles(
    uint64_t* num_input_range_del) {
  assert(compact_);

  Compaction* compaction = compact_->compaction;
  internal_stats_.output_level_stats.num_input_files_in_non_output_levels = 0;
  internal_stats_.output_level_stats.num_input_files_in_output_level = 0;

  bool has_error = false;
  const ReadOptions read_options(Env::IOActivity::kCompaction);
  const auto& input_table_properties = compaction->GetInputTableProperties();
  for (int input_level = 0;
       input_level < static_cast<int>(compaction->num_input_levels());
       ++input_level) {
    const LevelFilesBrief* flevel = compaction->input_levels(input_level);
    size_t num_input_files = flevel->num_files;
    uint64_t* bytes_read;
    if (compaction->level(input_level) != compaction->output_level()) {
      internal_stats_.output_level_stats
          .num_input_files_in_non_output_levels +=
          static_cast<int>(num_input_files);
      bytes_read =
          &internal_stats_.output_level_stats.bytes_read_non_output_levels;
    } else {
      internal_stats_.output_level_stats.num_input_files_in_output_level +=
          static_cast<int>(num_input_files);
      bytes_read = &internal_stats_.output_level_stats.bytes_read_output_level;
    }
    for (size_t i = 0; i < num_input_files; ++i) {
      const FileMetaData* file_meta = flevel->files[i].file_metadata;
      *bytes_read += file_meta->fd.GetFileSize();
      uint64_t file_input_entries = file_meta->num_entries;
      uint64_t file_num_range_del = file_meta->num_range_deletions;
      if (file_input_entries == 0) {
        uint64_t file_number = file_meta->fd.GetNumber();
        // Try getting info from table property
        std::string fn =
            TableFileName(compaction->immutable_options().cf_paths,
                          file_number, file_meta->fd.GetPathId());
        const auto& tp = input_table_properties.find(fn);
        if (tp != input_table_properties.end()) {
          file_input_entries = tp->second->num_entries;
          file_num_range_del = tp->second->num_range_deletions;
        } else {
          has_error = true;
        }
      }
      internal_stats_.output_level_stats.num_input_records +=
          file_input_entries;
      if (num_input_range_del) {
        *num_input_range_del += file_num_range_del;
      }
    }

    const std::vector<FileMetaData*>& filtered_flevel =
        compaction->filtered_input_levels(input_level);
    size_t num_filtered_input_files = filtered_flevel.size();
    uint64_t* bytes_skipped;
    if (compaction->level(input_level) != compaction->output_level()) {
      internal_stats_.output_level_stats
          .num_filtered_input_files_in_non_output_levels +=
          static_cast<int>(num_filtered_input_files);
      bytes_skipped =
          &internal_stats_.output_level_stats.bytes_skipped_non_output_levels;
    } else {
      internal_stats_.output_level_stats
          .num_filtered_input_files_in_output_level +=
          static_cast<int>(num_filtered_input_files);
      bytes_skipped =
          &internal_stats_.output_level_stats.bytes_skipped_output_level;
    }
    for (const FileMetaData* filtered_file_meta : filtered_flevel) {
      *bytes_skipped += filtered_file_meta->fd.GetFileSize();
    }
  }

  // TODO - find a better place to set these two
  assert(job_stats_);
  internal_stats_.output_level_stats.bytes_read_blob =
      job_stats_->total_blob_bytes_read;
  internal_stats_.output_level_stats.num_dropped_records =
      internal_stats_.DroppedRecords();

  return !has_error;
}

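// Projects the accumulated internal stats onto the public CompactionJobStats
// input fields. Range deletion entries are excluded from num_input_records,
// and stats from the proximal level are folded in when per-key placement
// produced output there.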
void CompactionJob::UpdateCompactionJobInputStatsFromInternalStats(
    const InternalStats::CompactionStatsFull& internal_stats,
    uint64_t num_input_range_del) const {
  assert(job_stats_);
  // input information
  job_stats_->total_input_bytes =
      internal_stats.output_level_stats.bytes_read_non_output_levels +
      internal_stats.output_level_stats.bytes_read_output_level;
  job_stats_->num_input_records =
      internal_stats.output_level_stats.num_input_records -
      num_input_range_del;
  job_stats_->num_input_files =
      internal_stats.output_level_stats.num_input_files_in_non_output_levels +
      internal_stats.output_level_stats.num_input_files_in_output_level;
  job_stats_->num_input_files_at_output_level =
      internal_stats.output_level_stats.num_input_files_in_output_level;
  job_stats_->num_filtered_input_files =
      internal_stats.output_level_stats
          .num_filtered_input_files_in_non_output_levels +
      internal_stats.output_level_stats
          .num_filtered_input_files_in_output_level;
  job_stats_->num_filtered_input_files_at_output_level =
      internal_stats.output_level_stats
          .num_filtered_input_files_in_output_level;
  job_stats_->total_skipped_input_bytes =
      internal_stats.output_level_stats.bytes_skipped_non_output_levels +
      internal_stats.output_level_stats.bytes_skipped_output_level;

  if (internal_stats.has_proximal_level_output) {
    job_stats_->total_input_bytes +=
        internal_stats.proximal_level_stats.bytes_read_non_output_levels +
        internal_stats.proximal_level_stats.bytes_read_output_level;
    job_stats_->num_input_records +=
        internal_stats.proximal_level_stats.num_input_records;
    job_stats_->num_input_files +=
        internal_stats.proximal_level_stats
            .num_input_files_in_non_output_levels +
        internal_stats.proximal_level_stats.num_input_files_in_output_level;
    job_stats_->num_input_files_at_output_level +=
        internal_stats.proximal_level_stats.num_input_files_in_output_level;
    job_stats_->num_filtered_input_files +=
        internal_stats.proximal_level_stats
            .num_filtered_input_files_in_non_output_levels +
        internal_stats.proximal_level_stats
            .num_filtered_input_files_in_output_level;
    job_stats_->num_filtered_input_files_at_output_level +=
        internal_stats.proximal_level_stats
            .num_filtered_input_files_in_output_level;
    job_stats_->total_skipped_input_bytes +=
        internal_stats.proximal_level_stats.bytes_skipped_non_output_levels +
        internal_stats.proximal_level_stats.bytes_skipped_output_level;
  }
}

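// Output-side counterpart of the function above: copies bytes, records, and
// file counts written (including blob files and any proximal level output)
// into CompactionJobStats, and on success captures the smallest/largest
// output user-key prefixes.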
void CompactionJob::UpdateCompactionJobOutputStatsFromInternalStats(
    const Status& status,
    const InternalStats::CompactionStatsFull& internal_stats) const {
  assert(job_stats_);
  job_stats_->elapsed_micros = internal_stats.output_level_stats.micros;
  job_stats_->cpu_micros = internal_stats.output_level_stats.cpu_micros;

  // output information
  job_stats_->total_output_bytes =
      internal_stats.output_level_stats.bytes_written;
  job_stats_->total_output_bytes_blob =
      internal_stats.output_level_stats.bytes_written_blob;
  job_stats_->num_output_records =
      internal_stats.output_level_stats.num_output_records;
  job_stats_->num_output_files =
      internal_stats.output_level_stats.num_output_files;
  job_stats_->num_output_files_blob =
      internal_stats.output_level_stats.num_output_files_blob;

  if (internal_stats.has_proximal_level_output) {
    job_stats_->total_output_bytes +=
        internal_stats.proximal_level_stats.bytes_written;
    job_stats_->total_output_bytes_blob +=
        internal_stats.proximal_level_stats.bytes_written_blob;
    job_stats_->num_output_records +=
        internal_stats.proximal_level_stats.num_output_records;
    job_stats_->num_output_files +=
        internal_stats.proximal_level_stats.num_output_files;
    job_stats_->num_output_files_blob +=
        internal_stats.proximal_level_stats.num_output_files_blob;
  }

  if (status.ok() && job_stats_->num_output_files > 0) {
    CopyPrefix(compact_->SmallestUserKey(),
               CompactionJobStats::kMaxPrefixLength,
               &job_stats_->smallest_output_key_prefix);
    CopyPrefix(compact_->LargestUserKey(), CompactionJobStats::kMaxPrefixLength,
               &job_stats_->largest_output_key_prefix);
  }
}

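// Logs the human-readable compaction-start summary and emits a structured
// "compaction_started" event (input files per level, score, input size,
// oldest snapshot, and per-key-placement details). All of the prep work is
// skipped when INFO-level logging is filtered out.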
void CompactionJob::LogCompaction() {
  Compaction* compaction = compact_->compaction;
  ColumnFamilyData* cfd = compaction->column_family_data();
  // Let's check if anything will get logged. Don't prepare all the info if
  // we're not logging
  if (db_options_.info_log_level <= InfoLogLevel::INFO_LEVEL) {
    Compaction::InputLevelSummaryBuffer inputs_summary;
    ROCKS_LOG_INFO(
        db_options_.info_log, "[%s] [JOB %d] Compacting %s, score %.2f",
        cfd->GetName().c_str(), job_id_,
        compaction->InputLevelSummary(&inputs_summary), compaction->score());
    char scratch[2345];
    compaction->Summary(scratch, sizeof(scratch));
    ROCKS_LOG_INFO(db_options_.info_log,
                   "[%s]: Compaction start summary: %s\n",
                   cfd->GetName().c_str(), scratch);
    // build event logger report
    auto stream = event_logger_->Log();
    stream << "job" << job_id_ << "event" << "compaction_started" << "cf_name"
           << cfd->GetName() << "compaction_reason"
           << GetCompactionReasonString(compaction->compaction_reason());
    for (size_t i = 0; i < compaction->num_input_levels(); ++i) {
      stream << ("files_L" + std::to_string(compaction->level(i)));
      stream.StartArray();
      for (auto f : *compaction->inputs(i)) {
        stream << f->fd.GetNumber();
      }
      stream.EndArray();
    }
    stream << "score" << compaction->score() << "input_data_size"
           << compaction->CalculateTotalInputSize() << "oldest_snapshot_seqno"
           << (job_context_->snapshot_seqs.empty()
                   ? int64_t{-1}  // Use -1 for "none"
                   : static_cast<int64_t>(
                         job_context_->GetEarliestSnapshotSequence()));
    if (compaction->SupportsPerKeyPlacement()) {
      stream << "proximal_after_seqno" << proximal_after_seqno_;
      stream << "preserve_seqno_after" << preserve_seqno_after_;
      stream << "proximal_output_level" << compaction->GetProximalLevel();
      stream << "proximal_output_range"
             << GetCompactionProximalOutputRangeTypeString(
                    compaction->GetProximalOutputRangeType());
      if (compaction->GetProximalOutputRangeType() ==
          Compaction::ProximalOutputRangeType::kDisabled) {
        ROCKS_LOG_WARN(
            db_options_.info_log,
            "[%s] [JOB %d] Proximal level output is disabled, likely "
            "because of the range conflict in the proximal level",
            cfd->GetName().c_str(), job_id_);
      }
    }
  }
}

std::string CompactionJob::GetTableFileName(uint64_t file_number) {
  return TableFileName(compact_->compaction->immutable_options().cf_paths,
                       file_number, compact_->compaction->output_path_id());
}

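// Compaction I/O normally runs at low priority so it yields to foreground
// reads and writes. When the write controller is delaying or stopping
// writes, the DB is presumably waiting on compaction to catch up, so the
// job's I/O is escalated to Env::IO_USER to finish sooner.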
Env::IOPriority CompactionJob::GetRateLimiterPriority() {
  if (versions_ && versions_->GetColumnFamilySet() &&
      versions_->GetColumnFamilySet()->write_controller()) {
    WriteController* write_controller =
        versions_->GetColumnFamilySet()->write_controller();
    if (write_controller->NeedsDelay() || write_controller->IsStopped()) {
      return Env::IO_USER;
    }
  }
  return Env::IO_LOW;
}

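// Reads a table-properties block straight from the SST file on disk,
// bypassing the table cache. The footer magic number to expect is derived
// from the configured table factory: block-based by default, plain-table or
// cuckoo-table when those factories are selected.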
Status CompactionJob::ReadTablePropertiesDirectly(
    const ImmutableOptions& ioptions, const MutableCFOptions& moptions,
    const FileMetaData* file_meta, const ReadOptions& read_options,
    std::shared_ptr<const TableProperties>* tp) {
  std::unique_ptr<FSRandomAccessFile> file;
  std::string file_name = GetTableFileName(file_meta->fd.GetNumber());
  Status s = ioptions.fs->NewRandomAccessFile(file_name, file_options_, &file,
                                              nullptr /* dbg */);
  if (!s.ok()) {
    return s;
  }
  std::unique_ptr<RandomAccessFileReader> file_reader(
      new RandomAccessFileReader(
          std::move(file), file_name, ioptions.clock, io_tracer_,
          ioptions.stats, Histograms::SST_READ_MICROS /* hist_type */,
          nullptr /* file_read_hist */, ioptions.rate_limiter.get(),
          ioptions.listeners));

  std::unique_ptr<TableProperties> props;
  uint64_t magic_number = kBlockBasedTableMagicNumber;
  const auto* table_factory = moptions.table_factory.get();
  if (table_factory == nullptr) {
    return Status::Incomplete("Table factory is not set");
  } else {
    // Compare by content, not pointer identity, since Name() returns a
    // C string.
    const std::string table_factory_name = table_factory->Name();
    if (table_factory_name == TableFactory::kPlainTableName()) {
      magic_number = kPlainTableMagicNumber;
    } else if (table_factory_name == TableFactory::kCuckooTableName()) {
      magic_number = kCuckooTableMagicNumber;
    }
  }
  s = ReadTableProperties(file_reader.get(), file_meta->fd.GetFileSize(),
                          magic_number, ioptions, read_options, &props);
  if (!s.ok()) {
    return s;
  }
  *tp = std::move(props);
  return s;
}

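// Batch variant of ReadTablePropertiesDirectly() used on resume: loads the
// table properties of every previously written output file for one output
// level, failing fast on the first read error or missing properties entry.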
Status CompactionJob::ReadOutputFilesTableProperties(
    const autovector<FileMetaData>& output_files,
    const ReadOptions& read_options,
    std::vector<std::shared_ptr<const TableProperties>>&
        output_files_table_properties,
    bool is_proximal_level) {
  assert(!output_files.empty());
  // Not `static`: the flag varies per call, so a static local would latch the
  // first call's value.
  const char* level_type = is_proximal_level ? "proximal output" : "output";
  output_files_table_properties.reserve(output_files.size());
  Status s;
  for (const FileMetaData& metadata : output_files) {
    std::shared_ptr<const TableProperties> tp;
    s = ReadTablePropertiesDirectly(compact_->compaction->immutable_options(),
                                    compact_->compaction->mutable_cf_options(),
                                    &metadata, read_options, &tp);
    if (!s.ok()) {
      ROCKS_LOG_ERROR(
          db_options_.info_log,
          "Failed to read table properties for %s level output file #%" PRIu64
          ": %s",
          level_type, metadata.fd.GetNumber(), s.ToString().c_str());
      return s;
    }
    if (tp == nullptr) {
      ROCKS_LOG_ERROR(db_options_.info_log,
                      "Empty table property for %s level output file #%" PRIu64,
                      level_type, metadata.fd.GetNumber());
      s = Status::Corruption("Empty table property for " +
                             std::string(level_type) +
                             " level output files during resuming");
      return s;
    }
    output_files_table_properties.push_back(tp);
  }
  return s;
}

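// Rebuilds a CompactionOutputs list from persisted progress: every
// previously written file is re-registered as a finished output along with
// its table properties, and the processed-output-record counter is restored
// so that later record-count verification still balances.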
void CompactionJob::RestoreCompactionOutputs(
    const ColumnFamilyData* cfd,
    const std::vector<std::shared_ptr<const TableProperties>>&
        output_files_table_properties,
    SubcompactionProgressPerLevel& subcompaction_progress_per_level,
    CompactionOutputs* outputs_to_restore) {
  assert(outputs_to_restore->GetOutputs().size() == 0);
  const auto& output_files = subcompaction_progress_per_level.GetOutputFiles();
  for (size_t i = 0; i < output_files.size(); i++) {
    FileMetaData file_copy = output_files[i];
    outputs_to_restore->AddOutput(std::move(file_copy),
                                  cfd->internal_comparator(),
                                  paranoid_file_checks_, true /* finished */);
    outputs_to_restore->UpdateTableProperties(
        *output_files_table_properties[i]);
  }
  outputs_to_restore->SetNumOutputRecords(
      subcompaction_progress_per_level.GetNumProcessedOutputRecords());
}

// Attempts to resume compaction from a previously persisted compaction
// progress.
//
// RETURNS:
// - Status::OK():
//   * Input iterator positioned at the next unprocessed key
//   * CompactionOutputs objects fully restored for both output and proximal
//     output levels in SubcompactionState
//   * Compaction job statistics accurately reflect input and output records
//     processed for record count verification
//   * File number generation advanced to prevent conflicts with existing
//     outputs
// - Status::NotFound(): No valid progress to resume from
// - Status::Corruption(): Resume key is invalid, beyond input range, or output
//   restoration failed
// - Other non-OK status: Iterator errors or file system issues during
//   restoration
//
// The caller must check for Status::IsNotFound() to distinguish the
// "no resume needed" case (proceed with `InternalIterator::SeekToFirst()`)
// from the "resume failed" scenarios.
Status CompactionJob::MaybeResumeSubcompactionProgressOnInputIterator(
    SubcompactionState* sub_compact, InternalIterator* input_iter) {
  const ReadOptions read_options(Env::IOActivity::kCompaction);
  ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
  SubcompactionProgress& subcompaction_progress =
      sub_compact->GetSubcompactionProgressRef();

  if (subcompaction_progress.output_level_progress
              .GetNumProcessedOutputRecords() == 0 &&
      subcompaction_progress.proximal_output_level_progress
              .GetNumProcessedOutputRecords() == 0) {
    return Status::NotFound("No subcompaction progress to resume");
  }

  ROCKS_LOG_INFO(db_options_.info_log,
                 "[%s] [JOB %d] Resuming compaction : %s",
                 cfd->GetName().c_str(), job_id_,
                 subcompaction_progress.ToString().c_str());

  input_iter->Seek(subcompaction_progress.next_internal_key_to_compact);
  if (!input_iter->Valid()) {
    ROCKS_LOG_ERROR(db_options_.info_log,
                    "[%s] [JOB %d] Iterator is invalid after "
                    "seeking to the key to resume. This indicates the key is "
                    "incorrectly beyond the input data range.",
                    cfd->GetName().c_str(), job_id_);
    return Status::Corruption(
        "The key to resume is beyond the input data range");
  } else if (!input_iter->status().ok()) {
    ROCKS_LOG_ERROR(db_options_.info_log,
                    "[%s] [JOB %d] Iterator has error after seeking to "
                    "the key to resume: %s",
                    cfd->GetName().c_str(), job_id_,
                    input_iter->status().ToString().c_str());
    return Status::Corruption(
        "Iterator has error status after seeking to the key: " +
        input_iter->status().ToString());
  }

  sub_compact->compaction_job_stats.has_accurate_num_input_records =
      subcompaction_progress.num_processed_input_records != 0;
  sub_compact->compaction_job_stats.num_input_records =
      subcompaction_progress.num_processed_input_records;

  for (const bool is_proximal_level : {false, true}) {
    if (is_proximal_level &&
        !sub_compact->compaction->SupportsPerKeyPlacement()) {
      continue;
    }
    Status s;
    SubcompactionProgressPerLevel& subcompaction_progress_per_level =
        is_proximal_level
            ? subcompaction_progress.proximal_output_level_progress
            : subcompaction_progress.output_level_progress;
    const auto& output_files =
        subcompaction_progress_per_level.GetOutputFiles();
    std::vector<std::shared_ptr<const TableProperties>>
        output_files_table_properties;
    // TODO(hx235): investigate if we can skip reading properties to save read
    // IO
    s = ReadOutputFilesTableProperties(output_files, read_options,
                                       output_files_table_properties,
                                       is_proximal_level);
    if (!s.ok()) {
      ROCKS_LOG_ERROR(
          db_options_.info_log,
          "[%s] [JOB %d] Failed to read table properties for %soutput level "
          "files during resume: %s.",
          cfd->GetName().c_str(), job_id_,
          is_proximal_level ? "proximal " : "", s.ToString().c_str());
      return Status::Corruption(
          "Not able to resume due to table property reading error " +
          s.ToString());
    }
    RestoreCompactionOutputs(cfd, output_files_table_properties,
                             subcompaction_progress_per_level,
                             sub_compact->Outputs(is_proximal_level));
    // Skip past all the used file numbers to avoid creating new output files
    // after resumption that conflict with the existing output files
    for (const auto& file_meta : output_files) {
      uint64_t file_number = file_meta.fd.GetNumber();
      while (versions_->NewFileNumber() <= file_number) {
        versions_->FetchAddFileNumber(1);
      }
    }
  }
  return Status::OK();
}

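// Records how far this subcompaction has advanced so that a later run can
// resume from `next_table_min_key`. The resume key is rebuilt with
// kMaxSequenceNumber and kValueTypeForSeek: entries sharing a user key sort
// by decreasing sequence number, so seeking to that internal key positions
// the iterator at the first entry for that user key, i.e. the first record
// not yet fully compacted. A minimal sketch of an equivalent construction
// (illustrative only; the code below goes through IterKey):
//
//   InternalKey resume_key(ExtractUserKey(next_table_min_key),
//                          kMaxSequenceNumber, kValueTypeForSeek);
//   input_iter->Seek(resume_key.Encode());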
void CompactionJob::UpdateSubcompactionProgress(
    const CompactionIterator* c_iter, const Slice next_table_min_key,
    SubcompactionState* sub_compact) {
  assert(c_iter);
  SubcompactionProgress& subcompaction_progress =
      sub_compact->GetSubcompactionProgressRef();

  IterKey next_ikey_to_compact;
  next_ikey_to_compact.SetInternalKey(ExtractUserKey(next_table_min_key),
                                      kMaxSequenceNumber, kValueTypeForSeek);
  subcompaction_progress.next_internal_key_to_compact =
      next_ikey_to_compact.GetInternalKey().ToString();

  // Track total processed input records for progress reporting by combining:
  // - Resumed count: records already processed before compaction was
  //   interrupted
  // - Current count: records scanned in the current compaction session
  // Only update when both tracking mechanisms provide accurate counts to
  // ensure reliability.
  subcompaction_progress.num_processed_input_records =
      c_iter->HasNumInputEntryScanned() &&
              sub_compact->compaction_job_stats.has_accurate_num_input_records
          ? c_iter->NumInputEntryScanned() +
                sub_compact->compaction_job_stats.num_input_records
          : 0;

  UpdateSubcompactionProgressPerLevel(
      sub_compact, false /* is_proximal_level */, subcompaction_progress);
  if (sub_compact->compaction->SupportsPerKeyPlacement()) {
    UpdateSubcompactionProgressPerLevel(
        sub_compact, true /* is_proximal_level */, subcompaction_progress);
  }
}

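// Per-level half of UpdateSubcompactionProgress(): refreshes the
// processed-output-record count and appends only the output files created
// since the last snapshot, so the persisted file list grows monotonically.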
void CompactionJob::UpdateSubcompactionProgressPerLevel(
    SubcompactionState* sub_compact, bool is_proximal_level,
    SubcompactionProgress& subcompaction_progress) {
  SubcompactionProgressPerLevel& subcompaction_progress_per_level =
      is_proximal_level ? subcompaction_progress.proximal_output_level_progress
                        : subcompaction_progress.output_level_progress;
  subcompaction_progress_per_level.SetNumProcessedOutputRecords(
      sub_compact->OutputStats(is_proximal_level)->num_output_records);

  const auto& prev_output_files =
      subcompaction_progress_per_level.GetOutputFiles();
  const auto& current_output_files =
      sub_compact->Outputs(is_proximal_level)->GetOutputs();
  for (size_t i = prev_output_files.size(); i < current_output_files.size();
       i++) {
    subcompaction_progress_per_level.AddToOutputFiles(
        current_output_files[i].meta);
  }
}

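// Durably persists the current SubcompactionProgress: the progress is
// encoded into a VersionEdit record, appended to the compaction progress
// log, and synced (honoring use_fsync) before the last-persisted output
// file counters are advanced.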
Status CompactionJob::PersistSubcompactionProgress(
    SubcompactionState* sub_compact) {
  SubcompactionProgress& subcompaction_progress =
      sub_compact->GetSubcompactionProgressRef();
  assert(compaction_progress_writer_);

  VersionEdit edit;
  edit.SetSubcompactionProgress(subcompaction_progress);
  std::string record;
  if (!edit.EncodeTo(&record)) {
    ROCKS_LOG_ERROR(
        db_options_.info_log,
        "[%s] [JOB %d] Failed to encode subcompaction progress",
        compact_->compaction->column_family_data()->GetName().c_str(),
        job_id_);
    return Status::Corruption("Failed to encode subcompaction progress");
  }

  WriteOptions write_options(Env::IOActivity::kCompaction);
  Status s = compaction_progress_writer_->AddRecord(write_options, record);
  IOOptions opts;
  if (s.ok()) {
    s = WritableFileWriter::PrepareIOOptions(write_options, opts);
  }
  if (s.ok()) {
    s = compaction_progress_writer_->file()->Sync(opts, db_options_.use_fsync);
  }
  if (!s.ok()) {
    ROCKS_LOG_ERROR(
        db_options_.info_log,
        "[%s] [JOB %d] Failed to persist subcompaction progress: %s",
        compact_->compaction->column_family_data()->GetName().c_str(), job_id_,
        s.ToString().c_str());
    return s;
  }

  subcompaction_progress.output_level_progress
      .UpdateLastPersistedOutputFilesCount();
  subcompaction_progress.proximal_output_level_progress
      .UpdateLastPersistedOutputFilesCount();
  return Status::OK();
}

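// Cross-checks the input-side bookkeeping: the record count accumulated from
// input file metadata, minus range deletions (which the compaction iterator
// does not surface as point entries), must match the number of keys the
// iterator actually processed. For example, inputs whose metadata reports
// 100 entries, 4 of them range deletions, should yield exactly 96 processed
// keys. A mismatch is logged and, when compaction_verify_record_count is
// set, escalated to Status::Corruption.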
Status CompactionJob::VerifyInputRecordCount(
    uint64_t num_input_range_del) const {
  size_t ts_sz = compact_->compaction->column_family_data()
                     ->user_comparator()
                     ->timestamp_size();
  // When trim_ts_ is non-empty, CompactionIterator takes
  // HistoryTrimmingIterator as its input iterator and sees a trimmed view of
  // the input keys, so the number of keys it processed is not suitable for
  // verification here.
  // TODO: support verification when trim_ts_ is non-empty.
  if (!(ts_sz > 0 && !trim_ts_.empty())) {
    assert(internal_stats_.output_level_stats.num_input_records > 0);
    // TODO: verify the number of range deletion entries.
    uint64_t expected = internal_stats_.output_level_stats.num_input_records -
                        num_input_range_del;
    uint64_t actual = job_stats_->num_input_records;
    if (expected != actual) {
      char scratch[2345];
      compact_->compaction->Summary(scratch, sizeof(scratch));
      std::string msg =
          "Compaction number of input keys does not match "
          "number of keys processed. Expected " +
          std::to_string(expected) + " but processed " +
          std::to_string(actual) + ". Compaction summary: " + scratch;
      ROCKS_LOG_WARN(
          db_options_.info_log,
          "[%s] [JOB %d] VerifyInputRecordCount() Status: %s",
          compact_->compaction->column_family_data()->GetName().c_str(),
          job_context_->job_id, msg.c_str());
      if (db_options_.compaction_verify_record_count) {
        return Status::Corruption(msg);
      }
    }
  }
  return Status::OK();
}

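// Output-side counterpart of VerifyInputRecordCount(): sums num_entries
// minus num_range_deletions over every output file's table properties and
// compares the total against the records the job reported writing (output
// level plus any proximal level output). Mismatches are handled the same
// way as on the input side.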
Status CompactionJob::VerifyOutputRecordCount() const {
  uint64_t total_output_num = 0;
  for (const auto& state : compact_->sub_compact_states) {
    for (const auto& output : state.GetOutputs()) {
      total_output_num += output.table_properties->num_entries -
                          output.table_properties->num_range_deletions;
    }
  }

  uint64_t expected = internal_stats_.output_level_stats.num_output_records;
  if (internal_stats_.has_proximal_level_output) {
    expected += internal_stats_.proximal_level_stats.num_output_records;
  }
  if (expected != total_output_num) {
    char scratch[2345];
    compact_->compaction->Summary(scratch, sizeof(scratch));
    std::string msg =
        "Number of keys in compaction output SST files does not match "
        "number of keys added. Expected " +
        std::to_string(expected) + " but there are " +
        std::to_string(total_output_num) +
        " in output SST files. Compaction summary: " + scratch;
    ROCKS_LOG_WARN(
        db_options_.info_log,
        "[%s] [JOB %d] VerifyOutputRecordCount() status: %s",
        compact_->compaction->column_family_data()->GetName().c_str(),
        job_context_->job_id, msg.c_str());
    if (db_options_.compaction_verify_record_count) {
      return Status::Corruption(msg);
    }
  }
  return Status::OK();
}

}  // namespace ROCKSDB_NAMESPACE