| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761
77717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762
27722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441244224432444244524462447244824492450245124522453245424552456245724582459246024612462246324642465246624672468246924702471247224732474247524762477247824792480248124822483248424852486248724882489249024912492249324942495249624972498249925002501250225032504250525062507250825092510251125122513251425152516251725182519252025212522252325242525252625272528252925302531253225332534253525362537253825392540254125422543254425452546254725482549255025512552255325542555255625572558255925602561256225632564256525662567256825692570257125722573257425752576257725782579258025812582258325842585258625872588258925902591259225932594259525962597259825992600260126022603260426052606260726082609261026112612261326142615261626172618261926202621262226232624262526262627262826292630263126322633263426352636263726382639264026412642264326442645264626472648264926502651265226532654265526562657265826592660266126622663266426652666266726682669267026712672267326742675267626772678267926802681268226832684268526862687268826892690269126922693269426952696269726982699270027012702270327042705270627072708270927102711271227132714271527162717271827192720272127222723272427252726272727282729273027312732273327342735273627372738273927402741274227432744274527462747274827492750275127522753275427552756275727582759276027612762276327642765276627672768276927702771277227732774277527762
7772778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051 |
- // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
- // This source code is licensed under both the GPLv2 (found in the
- // COPYING file in the root directory) and Apache 2.0 License
- // (found in the LICENSE.Apache file in the root directory).
- //
- // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
- // Use of this source code is governed by a BSD-style license that can be
- // found in the LICENSE file. See the AUTHORS file for names of contributors.
- #include "db/compaction/compaction_job.h"
- #include <algorithm>
- #include <cinttypes>
- #include <memory>
- #include <optional>
- #include <set>
- #include <utility>
- #include <vector>
- #include "db/blob/blob_counting_iterator.h"
- #include "db/blob/blob_file_addition.h"
- #include "db/blob/blob_file_builder.h"
- #include "db/builder.h"
- #include "db/compaction/clipping_iterator.h"
- #include "db/compaction/compaction_state.h"
- #include "db/db_impl/db_impl.h"
- #include "db/dbformat.h"
- #include "db/error_handler.h"
- #include "db/event_helpers.h"
- #include "db/history_trimming_iterator.h"
- #include "db/log_writer.h"
- #include "db/merge_helper.h"
- #include "db/range_del_aggregator.h"
- #include "db/version_edit.h"
- #include "db/version_set.h"
- #include "file/filename.h"
- #include "file/read_write_util.h"
- #include "file/sst_file_manager_impl.h"
- #include "file/writable_file_writer.h"
- #include "logging/log_buffer.h"
- #include "logging/logging.h"
- #include "monitoring/iostats_context_imp.h"
- #include "monitoring/thread_status_util.h"
- #include "options/configurable_helper.h"
- #include "options/options_helper.h"
- #include "port/port.h"
- #include "rocksdb/db.h"
- #include "rocksdb/env.h"
- #include "rocksdb/options.h"
- #include "rocksdb/statistics.h"
- #include "rocksdb/status.h"
- #include "rocksdb/table.h"
- #include "rocksdb/utilities/options_type.h"
- #include "table/format.h"
- #include "table/merging_iterator.h"
- #include "table/meta_blocks.h"
- #include "table/table_builder.h"
- #include "table/unique_id_impl.h"
- #include "test_util/sync_point.h"
- #include "util/stop_watch.h"
- namespace ROCKSDB_NAMESPACE {
- // Returns the display name for `compaction_reason`. A value without a
- // dedicated name (including the kNumOfReasons sentinel) asserts in debug
- // builds and yields "Invalid".
- const char* GetCompactionReasonString(CompactionReason compaction_reason) {
-   const char* reason_name = nullptr;
-   switch (compaction_reason) {
-     case CompactionReason::kUnknown:
-       reason_name = "Unknown";
-       break;
-     case CompactionReason::kLevelL0FilesNum:
-       reason_name = "LevelL0FilesNum";
-       break;
-     case CompactionReason::kLevelMaxLevelSize:
-       reason_name = "LevelMaxLevelSize";
-       break;
-     case CompactionReason::kUniversalSizeAmplification:
-       reason_name = "UniversalSizeAmplification";
-       break;
-     case CompactionReason::kUniversalSizeRatio:
-       reason_name = "UniversalSizeRatio";
-       break;
-     case CompactionReason::kUniversalSortedRunNum:
-       reason_name = "UniversalSortedRunNum";
-       break;
-     case CompactionReason::kFIFOMaxSize:
-       reason_name = "FIFOMaxSize";
-       break;
-     case CompactionReason::kFIFOReduceNumFiles:
-       reason_name = "FIFOReduceNumFiles";
-       break;
-     case CompactionReason::kFIFOTtl:
-       reason_name = "FIFOTtl";
-       break;
-     case CompactionReason::kManualCompaction:
-       reason_name = "ManualCompaction";
-       break;
-     case CompactionReason::kFilesMarkedForCompaction:
-       reason_name = "FilesMarkedForCompaction";
-       break;
-     case CompactionReason::kBottommostFiles:
-       reason_name = "BottommostFiles";
-       break;
-     case CompactionReason::kTtl:
-       reason_name = "Ttl";
-       break;
-     case CompactionReason::kFlush:
-       reason_name = "Flush";
-       break;
-     case CompactionReason::kExternalSstIngestion:
-       reason_name = "ExternalSstIngestion";
-       break;
-     case CompactionReason::kPeriodicCompaction:
-       reason_name = "PeriodicCompaction";
-       break;
-     case CompactionReason::kChangeTemperature:
-       reason_name = "ChangeTemperature";
-       break;
-     case CompactionReason::kForcedBlobGC:
-       reason_name = "ForcedBlobGC";
-       break;
-     case CompactionReason::kRoundRobinTtl:
-       reason_name = "RoundRobinTtl";
-       break;
-     case CompactionReason::kRefitLevel:
-       reason_name = "RefitLevel";
-       break;
-     case CompactionReason::kNumOfReasons:
-       // fall through
-     default:
-       // No name assigned; reported as invalid below.
-       break;
-   }
-   if (reason_name == nullptr) {
-     assert(false);
-     return "Invalid";
-   }
-   return reason_name;
- }
- // Returns the display name for a proximal output range type. Unrecognized
- // values assert in debug builds and yield "Invalid".
- const char* GetCompactionProximalOutputRangeTypeString(
-     Compaction::ProximalOutputRangeType range_type) {
-   using RangeType = Compaction::ProximalOutputRangeType;
-   const char* type_name = nullptr;
-   switch (range_type) {
-     case RangeType::kNotSupported:
-       type_name = "NotSupported";
-       break;
-     case RangeType::kFullRange:
-       type_name = "FullRange";
-       break;
-     case RangeType::kNonLastRange:
-       type_name = "NonLastRange";
-       break;
-     case RangeType::kDisabled:
-       type_name = "Disabled";
-       break;
-     default:
-       // No name assigned; reported as invalid below.
-       break;
-   }
-   if (type_name == nullptr) {
-     assert(false);
-     return "Invalid";
-   }
-   return type_name;
- }
- CompactionJob::CompactionJob(  // Stores the supplied collaborators and publishes the compaction in thread status.
- int job_id, Compaction* compaction, const ImmutableDBOptions& db_options,
- const MutableDBOptions& mutable_db_options, const FileOptions& file_options,
- VersionSet* versions, const std::atomic<bool>* shutting_down,
- LogBuffer* log_buffer, FSDirectory* db_directory,
- FSDirectory* output_directory, FSDirectory* blob_output_directory,
- Statistics* stats, InstrumentedMutex* db_mutex,
- ErrorHandler* db_error_handler, JobContext* job_context,
- std::shared_ptr<Cache> table_cache, EventLogger* event_logger,
- bool paranoid_file_checks, bool measure_io_stats, const std::string& dbname,
- CompactionJobStats* compaction_job_stats, Env::Priority thread_pri,
- const std::shared_ptr<IOTracer>& io_tracer,
- const std::atomic<bool>& manual_compaction_canceled,
- const std::string& db_id, const std::string& db_session_id,
- std::string full_history_ts_low, std::string trim_ts,
- BlobFileCompletionCallback* blob_callback, int* bg_compaction_scheduled,
- int* bg_bottom_compaction_scheduled)
- : compact_(new CompactionState(compaction)),  // allocated here; the destructor asserts it is null again
- internal_stats_(compaction->compaction_reason(), 1),
- db_options_(db_options),
- mutable_db_options_copy_(mutable_db_options),
- log_buffer_(log_buffer),
- output_directory_(output_directory),
- stats_(stats),
- bottommost_level_(false),  // provisional; Prepare() assigns the real value
- write_hint_(Env::WLTH_NOT_SET),  // provisional; Prepare() computes the hint
- job_stats_(compaction_job_stats),
- job_id_(job_id),
- dbname_(dbname),
- db_id_(db_id),
- db_session_id_(db_session_id),
- file_options_(file_options),
- env_(db_options.env),
- io_tracer_(io_tracer),
- fs_(db_options.fs, io_tracer),
- file_options_for_read_(
- fs_->OptimizeForCompactionTableRead(file_options, db_options_)),
- versions_(versions),
- shutting_down_(shutting_down),
- manual_compaction_canceled_(manual_compaction_canceled),
- db_directory_(db_directory),
- blob_output_directory_(blob_output_directory),
- db_mutex_(db_mutex),
- db_error_handler_(db_error_handler),
- // job_context cannot be nullptr, but we will assert later in the body of
- // the constructor.
- earliest_snapshot_(job_context
- ? job_context->GetEarliestSnapshotSequence()
- : kMaxSequenceNumber),
- job_context_(job_context),
- table_cache_(std::move(table_cache)),
- event_logger_(event_logger),
- paranoid_file_checks_(paranoid_file_checks),
- measure_io_stats_(measure_io_stats),
- thread_pri_(thread_pri),
- full_history_ts_low_(std::move(full_history_ts_low)),
- trim_ts_(std::move(trim_ts)),
- blob_callback_(blob_callback),
- extra_num_subcompaction_threads_reserved_(0),  // nothing reserved until AcquireSubcompactionResources()
- bg_compaction_scheduled_(bg_compaction_scheduled),
- bg_bottom_compaction_scheduled_(bg_bottom_compaction_scheduled) {
- assert(job_stats_ != nullptr);
- assert(log_buffer_ != nullptr);
- assert(job_context);  // the null fallback in the initializer list is only defensive
- assert(job_context->snapshot_context_initialized);
- const auto* cfd = compact_->compaction->column_family_data();
- ThreadStatusUtil::SetEnableTracking(db_options_.enable_thread_tracking);
- ThreadStatusUtil::SetColumnFamily(cfd);
- ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION);
- ReportStartedCompaction(compaction);  // fills thread-status properties and the input totals in job_stats_
- }
- CompactionJob::~CompactionJob() {
- assert(compact_ == nullptr);  // compact_ is expected to be null by the time the job is destroyed
- ThreadStatusUtil::ResetThreadStatus();  // presumably clears the thread status set up in the constructor
- }
- // Publishes the job's identity, levels, flags and input size as thread
- // operation properties, resets the per-thread IO counters, and seeds
- // job_stats_ with the input file count and total input bytes.
- void CompactionJob::ReportStartedCompaction(Compaction* compaction) {
-   ThreadStatusUtil::SetThreadOperationProperty(ThreadStatus::COMPACTION_JOB_ID,
-                                                job_id_);
-
-   // Start level goes in the upper 32 bits, output level in the lower ones.
-   const uint64_t packed_levels =
-       (static_cast<uint64_t>(compact_->compaction->start_level()) << 32) +
-       compact_->compaction->output_level();
-   ThreadStatusUtil::SetThreadOperationProperty(
-       ThreadStatus::COMPACTION_INPUT_OUTPUT_LEVEL, packed_levels);
-
-   // In the current design, a CompactionJob is always created
-   // for non-trivial compaction.
-   assert(compaction->IsTrivialMove() == false ||
-          compaction->is_manual_compaction() == true);
-
-   // Bit 0: manual compaction, bit 1: deletion compaction.
-   const auto prop_flags = compaction->is_manual_compaction() +
-                           (compaction->deletion_compaction() << 1);
-   ThreadStatusUtil::SetThreadOperationProperty(
-       ThreadStatus::COMPACTION_PROP_FLAGS, prop_flags);
-
-   auto input_bytes = compaction->CalculateTotalInputSize();
-   ThreadStatusUtil::SetThreadOperationProperty(
-       ThreadStatus::COMPACTION_TOTAL_INPUT_BYTES, input_bytes);
-
-   // Start the IO counters and their reported values from zero.
-   IOSTATS_RESET(bytes_written);
-   IOSTATS_RESET(bytes_read);
-   ThreadStatusUtil::SetThreadOperationProperty(
-       ThreadStatus::COMPACTION_BYTES_WRITTEN, 0);
-   ThreadStatusUtil::SetThreadOperationProperty(
-       ThreadStatus::COMPACTION_BYTES_READ, 0);
-
-   // Set the thread operation after operation properties
-   // to ensure GetThreadList() can always show them all together.
-   ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION);
-
-   job_stats_->is_manual_compaction = compaction->is_manual_compaction();
-   job_stats_->is_full_compaction = compaction->is_full_compaction();
-
-   // Sum the file counts of every input level.
-   const int level_count = static_cast<int>(compaction->num_input_levels());
-   size_t input_file_count = 0;
-   for (int level_idx = 0; level_idx < level_count; ++level_idx) {
-     const LevelFilesBrief* level_files = compaction->input_levels(level_idx);
-     input_file_count += level_files->num_files;
-   }
-   job_stats_->CompactionJobStats::num_input_files = input_file_count;
-   job_stats_->total_input_bytes = input_bytes;
- }
- void CompactionJob::Prepare(  // Builds the subcompaction states and the seqno cutoffs used by the rest of the job.
- std::optional<std::pair<std::optional<Slice>, std::optional<Slice>>>
- known_single_subcompact,
- const CompactionProgress& compaction_progress,
- log::Writer* compaction_progress_writer) {
- db_mutex_->AssertHeld();  // caller must already hold the DB mutex
- AutoThreadOperationStageUpdater stage_updater(
- ThreadStatus::STAGE_COMPACTION_PREPARE);
- // Generate file_levels_ for compaction before making Iterator
- auto* c = compact_->compaction;
- [[maybe_unused]] ColumnFamilyData* cfd = c->column_family_data();
- assert(cfd != nullptr);
- const VersionStorageInfo* storage_info = c->input_version()->storage_info();
- assert(storage_info);
- assert(storage_info->NumLevelFiles(compact_->compaction->level()) > 0);
- write_hint_ = storage_info->CalculateSSTWriteHint(
- c->output_level(), db_options_.calculate_sst_write_lifetime_hint_set);
- bottommost_level_ = c->bottommost_level();
- if (!known_single_subcompact.has_value() && c->ShouldFormSubcompactions()) {  // skip partitioning when the caller pinned a single range
- StopWatch sw(db_options_.clock, stats_, SUBCOMPACTION_SETUP_TIME);
- GenSubcompactionBoundaries();
- }
- if (boundaries_.size() >= 1) {  // N boundaries yield N + 1 subcompactions
- assert(!known_single_subcompact.has_value());
- for (size_t i = 0; i <= boundaries_.size(); i++) {
- compact_->sub_compact_states.emplace_back(
- c, (i != 0) ? std::optional<Slice>(boundaries_[i - 1]) : std::nullopt,
- (i != boundaries_.size()) ? std::optional<Slice>(boundaries_[i])
- : std::nullopt,
- static_cast<uint32_t>(i));
- // assert to validate that boundaries don't have same user keys (without
- // timestamp part).
- assert(i == 0 || i == boundaries_.size() ||
- cfd->user_comparator()->CompareWithoutTimestamp(
- boundaries_[i - 1], boundaries_[i]) < 0);
- }
- RecordInHistogram(stats_, NUM_SUBCOMPACTIONS_SCHEDULED,
- compact_->sub_compact_states.size());
- } else {  // single subcompaction, optionally bounded by the caller-supplied range
- std::optional<Slice> start_key;
- std::optional<Slice> end_key;
- if (known_single_subcompact.has_value()) {
- start_key = known_single_subcompact.value().first;
- end_key = known_single_subcompact.value().second;
- } else {
- assert(!start_key.has_value() && !end_key.has_value());
- }
- compact_->sub_compact_states.emplace_back(c, start_key, end_key,
- /*sub_job_id*/ 0);
- }
- MaybeAssignCompactionProgressAndWriter(compaction_progress,
- compaction_progress_writer);
- // collect all seqno->time information from the input files which will be used
- // to encode seqno->time to the output files.
- SequenceNumber preserve_time_min_seqno = kMaxSequenceNumber;
- SequenceNumber preclude_last_level_min_seqno = kMaxSequenceNumber;
- uint64_t preserve_time_duration =
- MinAndMaxPreserveSeconds(c->mutable_cf_options()).max_preserve_seconds;
- if (preserve_time_duration > 0) {  // all seqno-to-time work is skipped when the max preserve window is 0
- const ReadOptions read_options(Env::IOActivity::kCompaction);
- // Setup seqno_to_time_mapping_ with relevant time range.
- seqno_to_time_mapping_.SetMaxTimeSpan(preserve_time_duration);
- for (const auto& each_level : *c->inputs()) {
- for (const auto& fmd : each_level.files) {
- std::shared_ptr<const TableProperties> tp;
- Status s = c->input_version()->GetTableProperties(read_options, &tp,
- fmd, nullptr);
- if (s.ok()) {
- s = seqno_to_time_mapping_.DecodeFrom(tp->seqno_to_time_mapping);
- }
- if (!s.ok()) {  // best effort: a bad file is logged and the remaining inputs are still processed
- ROCKS_LOG_WARN(
- db_options_.info_log,
- "Problem reading or processing seqno-to-time mapping: %s",
- s.ToString().c_str());
- }
- }
- }
- int64_t _current_time = 0;
- Status s = db_options_.clock->GetCurrentTime(&_current_time);
- if (!s.ok()) {
- ROCKS_LOG_WARN(db_options_.info_log,
- "Failed to get current time in compaction: Status: %s",
- s.ToString().c_str());
- // preserve all time information
- preserve_time_min_seqno = 0;
- preclude_last_level_min_seqno = 0;
- seqno_to_time_mapping_.Enforce();
- } else {
- seqno_to_time_mapping_.Enforce(_current_time);
- seqno_to_time_mapping_.GetCurrentTieringCutoffSeqnos(
- static_cast<uint64_t>(_current_time),
- c->mutable_cf_options().preserve_internal_time_seconds,
- c->mutable_cf_options().preclude_last_level_data_seconds,
- &preserve_time_min_seqno, &preclude_last_level_min_seqno);
- }
- // For accuracy of the GetProximalSeqnoBeforeTime queries above, we only
- // limit the capacity after them.
- // Here, if we set capacity to the per-SST limit, we could be throwing away
- // fidelity when a compaction output file has a narrower seqno range than
- // all the inputs. If we only limit capacity for each compaction output, we
- // could be doing a lot of unnecessary recomputation in a large compaction
- // (up to quadratic in number of files). Thus, we do something in the
- // middle: enforce a reasonably large constant size limit substantially
- // larger than kMaxSeqnoTimePairsPerSST.
- seqno_to_time_mapping_.SetCapacity(kMaxSeqnoToTimeEntries);
- }
- #ifndef NDEBUG
- assert(preserve_time_min_seqno <= preclude_last_level_min_seqno);
- TEST_SYNC_POINT_CALLBACK(
- "CompactionJob::PrepareTimes():preclude_last_level_min_seqno",
- static_cast<void*>(&preclude_last_level_min_seqno));
- // Restore the invariant asserted above, in case it was broken under the
- // callback
- preserve_time_min_seqno =
- std::min(preclude_last_level_min_seqno, preserve_time_min_seqno);
- #endif
- // Preserve sequence numbers for preserved write times and snapshots, though
- // the specific sequence number of the earliest snapshot can be zeroed.
- preserve_seqno_after_ =
- std::max(preserve_time_min_seqno, SequenceNumber{1}) - 1;  // clamp to 1 first so the subtraction cannot go below 0
- preserve_seqno_after_ = std::min(preserve_seqno_after_, earliest_snapshot_);
- // If using preclude feature, also preclude snapshots from last level, just
- // because they are heuristically more likely to be accessed than non-snapshot
- // data.
- if (preclude_last_level_min_seqno < kMaxSequenceNumber &&
- earliest_snapshot_ < preclude_last_level_min_seqno) {
- preclude_last_level_min_seqno = earliest_snapshot_;
- }
- // Now combine what we would like to preclude from last level with what we
- // can safely support without dangerously moving data back up the LSM tree,
- // to get the final seqno threshold for proximal vs. last. In particular,
- // when the reserved output key range for the proximal level does not
- // include the entire last level input key range, we need to keep entries
- // already in the last level there. (Even allowing within-range entries to
- // move back up could cause problems with range tombstones. Perhaps it
- // would be better in some rare cases to keep entries in the last level
- // one-by-one rather than based on sequence number, but that would add extra
- // tracking and complexity to CompactionIterator that is probably not
- // worthwhile overall. Correctness is also more clear when splitting by
- // seqno threshold.)
- proximal_after_seqno_ = std::max(preclude_last_level_min_seqno,
- c->GetKeepInLastLevelThroughSeqno());
- options_file_number_ = versions_->options_file_number();
- }
- // Attaches previously saved progress (if any) and the progress writer to the
- // job, but only when the job consists of exactly one subcompaction.
- void CompactionJob::MaybeAssignCompactionProgressAndWriter(
-     const CompactionProgress& compaction_progress,
-     log::Writer* compaction_progress_writer) {
-   // LIMITATION: Only supports resuming single subcompaction for now
-   if (compact_->sub_compact_states.size() == 1) {
-     if (!compaction_progress.empty()) {
-       // One subcompaction, so exactly one progress record is expected.
-       assert(compaction_progress.size() == 1);
-       SubcompactionState& only_subcompaction =
-           compact_->sub_compact_states[0];
-       only_subcompaction.SetSubcompactionProgress(compaction_progress[0]);
-     }
-     compaction_progress_writer_ = compaction_progress_writer;
-   }
- }
- // Returns the configured max_subcompactions (treated as at least 1) plus the
- // number of extra subcompaction threads currently reserved by this job.
- uint64_t CompactionJob::GetSubcompactionsLimit() {
-   const uint64_t configured_limit =
-       static_cast<uint64_t>(compact_->compaction->max_subcompactions());
-   const uint64_t base_limit = std::max<uint64_t>(1, configured_limit);
-   return extra_num_subcompaction_threads_reserved_ + base_limit;
- }
- // Tries to reserve up to `num_extra_required_subcompactions` extra threads,
- // capped by the remaining background-compaction budget, and adds whatever was
- // actually reserved to the matching scheduled-compaction counter.
- void CompactionJob::AcquireSubcompactionResources(
-     int num_extra_required_subcompactions) {
-   TEST_SYNC_POINT("CompactionJob::AcquireSubcompactionResources:0");
-   TEST_SYNC_POINT("CompactionJob::AcquireSubcompactionResources:1");
-
-   // Computed before taking the DB mutex, from this job's copy of the options.
-   const bool speedup_needed = versions_->GetColumnFamilySet()
-                                   ->write_controller()
-                                   ->NeedSpeedupCompaction();
-   const int compaction_limit =
-       DBImpl::GetBGJobLimits(
-           mutable_db_options_copy_.max_background_flushes,
-           mutable_db_options_copy_.max_background_compactions,
-           mutable_db_options_copy_.max_background_jobs, speedup_needed)
-           .max_compactions;
-
-   InstrumentedMutexLock l(db_mutex_);
-
-   // Cap the request by the remaining compaction budget first, then try to
-   // reserve threads. The number actually reserved may be smaller than asked.
-   int budget_left = compaction_limit - *bg_compaction_scheduled_ -
-                     *bg_bottom_compaction_scheduled_;
-   if (budget_left < 0) {
-     budget_left = 0;
-   }
-   const int threads_wanted =
-       std::min(num_extra_required_subcompactions, budget_left);
-
-   // Reservation only supports background threads whose priority is between
-   // BOTTOM and HIGH, so a higher thread_pri_ is degraded to HIGH. Similar to
-   // ReleaseThreads().
-   const Env::Priority reserve_pri = std::min(thread_pri_, Env::Priority::HIGH);
-   extra_num_subcompaction_threads_reserved_ =
-       env_->ReserveThreads(threads_wanted, reserve_pri);
-
-   // Bottom-priority jobs are accounted in their own counter.
-   int* scheduled_counter = (thread_pri_ == Env::Priority::BOTTOM)
-                                ? bg_bottom_compaction_scheduled_
-                                : bg_compaction_scheduled_;
-   *scheduled_counter += extra_num_subcompaction_threads_reserved_;
- }
- // Returns `num_extra_resources` previously reserved subcompaction threads to
- // the Env and subtracts the number actually released from both this job's
- // reservation count and the matching scheduled-compaction counter.
- //
- // Takes db_mutex_ itself. The lock is held through a scoped guard (as in
- // AcquireSubcompactionResources) instead of a manual Lock()/Unlock() pair,
- // so it cannot be left held if an early exit is ever added to the critical
- // section.
- void CompactionJob::ShrinkSubcompactionResources(uint64_t num_extra_resources) {
-   // Do nothing when we have zero resources to shrink
-   if (num_extra_resources == 0) {
-     return;
-   }
-   {
-     InstrumentedMutexLock l(db_mutex_);
-     const int num_requested = static_cast<int>(num_extra_resources);
-     // We cannot release threads more than what we reserved before
-     const int num_released = env_->ReleaseThreads(
-         num_requested, std::min(thread_pri_, Env::Priority::HIGH));
-     // Update the number of reserved threads and the number of background
-     // scheduled compactions for this compaction job
-     extra_num_subcompaction_threads_reserved_ -= num_released;
-     // TODO (zichen): design a test case with new subcompaction partitioning
-     // when the number of actual partitions is less than the number of planned
-     // partitions
-     assert(num_released == num_requested);
-     // Update bg_compaction_scheduled_ or bg_bottom_compaction_scheduled_
-     // depending on if this compaction has the bottommost priority
-     if (thread_pri_ == Env::Priority::BOTTOM) {
-       *bg_bottom_compaction_scheduled_ -= num_released;
-     } else {
-       *bg_compaction_scheduled_ -= num_released;
-     }
-   }
-   // Fired only after the mutex has been released, as before.
-   TEST_SYNC_POINT("CompactionJob::ShrinkSubcompactionResources:0");
- }
- // Gives back every extra subcompaction thread this job still has reserved.
- // No-op when nothing is reserved.
- void CompactionJob::ReleaseSubcompactionResources() {
-   if (extra_num_subcompaction_threads_reserved_ != 0) {
-     {
-       InstrumentedMutexLock l(db_mutex_);
-       // The number of reserved threads becomes larger than 0 only if the
-       // compaction priority is round robin and there are not enough
-       // sub-compactions available.
-       // Since this compaction job has not finished yet, one of the scheduled
-       // counters must cover the job itself plus the extra subcompactions that
-       // use the acquired resources.
-       assert(*bg_bottom_compaction_scheduled_ >=
-                  1 + extra_num_subcompaction_threads_reserved_ ||
-              *bg_compaction_scheduled_ >=
-                  1 + extra_num_subcompaction_threads_reserved_);
-     }
-     // The mutex is released above; the callee acquires it on its own.
-     ShrinkSubcompactionResources(extra_num_subcompaction_threads_reserved_);
-   }
- }
- void CompactionJob::GenSubcompactionBoundaries() {
- // The goal is to find some boundary keys so that we can evenly partition
- // the compaction input data into max_subcompactions ranges.
- // For every input file, we ask TableReader to estimate 128 anchor points
- // that evenly partition the input file into 128 ranges and the range
- // sizes. This can be calculated by scanning index blocks of the file.
- // Once we have the anchor points for all the input files, we merge them
- // together and try to find keys dividing ranges evenly.
- // For example, if we have two input files, and each returns following
- // ranges:
- // File1: (a1, 1000), (b1, 1200), (c1, 1100)
- // File2: (a2, 1100), (b2, 1000), (c2, 1000)
- // We total sort the keys to following:
- // (a1, 1000), (a2, 1100), (b1, 1200), (b2, 1000), (c1, 1100), (c2, 1000)
- // We calculate the total size by adding up all ranges' size, which is 6400.
- // If we would like to partition into 2 subcompactions, the target of the
- // range size is 3200. Based on the size, we take "b1" as the partition key
- // since the first three ranges would hit 3200.
- //
- // Note that the ranges are actually overlapping. For example, in the example
- // above, the range ending with "b1" is overlapping with the range ending with
- // "b2". So the size 1000+1100+1200 is an underestimation of data size up to
- // "b1". In extreme cases where we only compact N L0 files, a range can
- // overlap with N-1 other ranges. Since we requested a relatively large number
- // (128) of ranges from each input files, even N range overlapping would
- // cause relatively small inaccuracy.
- ReadOptions read_options(Env::IOActivity::kCompaction);
- read_options.rate_limiter_priority = GetRateLimiterPriority();
- auto* c = compact_->compaction;
- if (c->mutable_cf_options().table_factory->Name() ==
- TableFactory::kPlainTableName()) {
- return;
- }
- if (c->max_subcompactions() <= 1 &&
- !(c->immutable_options().compaction_pri == kRoundRobin &&
- c->immutable_options().compaction_style == kCompactionStyleLevel)) {
- return;
- }
- auto* cfd = c->column_family_data();
- const Comparator* cfd_comparator = cfd->user_comparator();
- const InternalKeyComparator& icomp = cfd->internal_comparator();
- auto* v = compact_->compaction->input_version();
- int base_level = v->storage_info()->base_level();
- InstrumentedMutexUnlock unlock_guard(db_mutex_);
- uint64_t total_size = 0;
- std::vector<TableReader::Anchor> all_anchors;
- int start_lvl = c->start_level();
- int out_lvl = c->output_level();
- for (size_t lvl_idx = 0; lvl_idx < c->num_input_levels(); lvl_idx++) {
- int lvl = c->level(lvl_idx);
- if (lvl >= start_lvl && lvl <= out_lvl) {
- const LevelFilesBrief* flevel = c->input_levels(lvl_idx);
- size_t num_files = flevel->num_files;
- if (num_files == 0) {
- continue;
- }
- for (size_t i = 0; i < num_files; i++) {
- FileMetaData* f = flevel->files[i].file_metadata;
- std::vector<TableReader::Anchor> my_anchors;
- Status s = cfd->table_cache()->ApproximateKeyAnchors(
- read_options, icomp, *f, c->mutable_cf_options(), my_anchors);
- if (!s.ok() || my_anchors.empty()) {
- my_anchors.emplace_back(f->largest.user_key(), f->fd.GetFileSize());
- }
- for (auto& ac : my_anchors) {
- // Can be optimize to avoid this loop.
- total_size += ac.range_size;
- }
- all_anchors.insert(all_anchors.end(), my_anchors.begin(),
- my_anchors.end());
- }
- }
- }
- // Here we total sort all the anchor points across all files and go through
- // them in the sorted order to find partitioning boundaries.
- // Not the most efficient implementation. A much more efficient algorithm
- // probably exists. But they are more complex. If performance turns out to
- // be a problem, we can optimize.
- std::sort(
- all_anchors.begin(), all_anchors.end(),
- [cfd_comparator](TableReader::Anchor& a, TableReader::Anchor& b) -> bool {
- return cfd_comparator->CompareWithoutTimestamp(a.user_key, b.user_key) <
- 0;
- });
- // Remove duplicated entries from boundaries.
- all_anchors.erase(
- std::unique(all_anchors.begin(), all_anchors.end(),
- [cfd_comparator](TableReader::Anchor& a,
- TableReader::Anchor& b) -> bool {
- return cfd_comparator->CompareWithoutTimestamp(
- a.user_key, b.user_key) == 0;
- }),
- all_anchors.end());
- // Get the number of planned subcompactions, may update reserve threads
- // and update extra_num_subcompaction_threads_reserved_ for round-robin
- uint64_t num_planned_subcompactions;
- if (c->immutable_options().compaction_pri == kRoundRobin &&
- c->immutable_options().compaction_style == kCompactionStyleLevel) {
- // For round-robin compaction prioity, we need to employ more
- // subcompactions (may exceed the max_subcompaction limit). The extra
- // subcompactions will be executed using reserved threads and taken into
- // account bg_compaction_scheduled or bg_bottom_compaction_scheduled.
- // Initialized by the number of input files
- num_planned_subcompactions = static_cast<uint64_t>(c->num_input_files(0));
- uint64_t max_subcompactions_limit = GetSubcompactionsLimit();
- if (max_subcompactions_limit < num_planned_subcompactions) {
- // Assert two pointers are not empty so that we can use extra
- // subcompactions against db compaction limits
- assert(bg_bottom_compaction_scheduled_ != nullptr);
- assert(bg_compaction_scheduled_ != nullptr);
- // Reserve resources when max_subcompaction is not sufficient
- AcquireSubcompactionResources(
- (int)(num_planned_subcompactions - max_subcompactions_limit));
- // Subcompactions limit changes after acquiring additional resources.
- // Need to call GetSubcompactionsLimit() again to update the number
- // of planned subcompactions
- num_planned_subcompactions =
- std::min(num_planned_subcompactions, GetSubcompactionsLimit());
- } else {
- num_planned_subcompactions = max_subcompactions_limit;
- }
- } else {
- num_planned_subcompactions = GetSubcompactionsLimit();
- }
- TEST_SYNC_POINT_CALLBACK("CompactionJob::GenSubcompactionBoundaries:0",
- &num_planned_subcompactions);
- if (num_planned_subcompactions == 1) {
- return;
- }
- // Group the ranges into subcompactions
- uint64_t target_range_size = std::max(
- total_size / num_planned_subcompactions,
- MaxFileSizeForLevel(
- c->mutable_cf_options(), out_lvl,
- c->immutable_options().compaction_style, base_level,
- c->immutable_options().level_compaction_dynamic_level_bytes));
- if (target_range_size >= total_size) {
- return;
- }
- uint64_t next_threshold = target_range_size;
- uint64_t cumulative_size = 0;
- uint64_t num_actual_subcompactions = 1U;
- for (TableReader::Anchor& anchor : all_anchors) {
- cumulative_size += anchor.range_size;
- if (cumulative_size > next_threshold) {
- next_threshold += target_range_size;
- num_actual_subcompactions++;
- boundaries_.push_back(anchor.user_key);
- }
- if (num_actual_subcompactions == num_planned_subcompactions) {
- break;
- }
- }
- TEST_SYNC_POINT_CALLBACK("CompactionJob::GenSubcompactionBoundaries:1",
- &num_actual_subcompactions);
- // Shrink extra subcompactions resources when extra resrouces are acquired
- ShrinkSubcompactionResources(
- std::min((int)(num_planned_subcompactions - num_actual_subcompactions),
- extra_num_subcompaction_threads_reserved_));
- }
- void CompactionJob::InitializeCompactionRun() {
- AutoThreadOperationStageUpdater stage_updater(
- ThreadStatus::STAGE_COMPACTION_RUN);
- TEST_SYNC_POINT("CompactionJob::Run():Start");
- log_buffer_->FlushBufferToLog();
- LogCompaction();
- }
- void CompactionJob::RunSubcompactions() {
- const size_t num_threads = compact_->sub_compact_states.size();
- assert(num_threads > 0);
- compact_->compaction->GetOrInitInputTableProperties();
- // Launch a thread for each of subcompactions 1...num_threads-1
- std::vector<port::Thread> thread_pool;
- thread_pool.reserve(num_threads - 1);
- for (size_t i = 1; i < compact_->sub_compact_states.size(); i++) {
- thread_pool.emplace_back(&CompactionJob::ProcessKeyValueCompaction, this,
- &compact_->sub_compact_states[i]);
- }
- // Always schedule the first subcompaction (whether or not there are also
- // others) in the current thread to be efficient with resources
- ProcessKeyValueCompaction(compact_->sub_compact_states.data());
- // Wait for all other threads (if there are any) to finish execution
- for (auto& thread : thread_pool) {
- thread.join();
- }
- RemoveEmptyOutputs();
- ReleaseSubcompactionResources();
- TEST_SYNC_POINT("CompactionJob::ReleaseSubcompactionResources");
- }
- void CompactionJob::UpdateTimingStats(uint64_t start_micros) {
- internal_stats_.SetMicros(db_options_.clock->NowMicros() - start_micros);
- for (auto& state : compact_->sub_compact_states) {
- internal_stats_.AddCpuMicros(state.compaction_job_stats.cpu_micros);
- }
- RecordTimeToHistogram(stats_, COMPACTION_TIME,
- internal_stats_.output_level_stats.micros);
- RecordTimeToHistogram(stats_, COMPACTION_CPU_TIME,
- internal_stats_.output_level_stats.cpu_micros);
- }
- void CompactionJob::RemoveEmptyOutputs() {
- for (auto& state : compact_->sub_compact_states) {
- state.RemoveLastEmptyOutput();
- }
- }
- bool CompactionJob::HasNewBlobFiles() const {
- for (const auto& state : compact_->sub_compact_states) {
- if (state.Current().HasBlobFileAdditions()) {
- return true;
- }
- }
- return false;
- }
- Status CompactionJob::CollectSubcompactionErrors() {
- Status status;
- IOStatus io_s;
- for (const auto& state : compact_->sub_compact_states) {
- if (!state.status.ok()) {
- status = state.status;
- io_s = state.io_status;
- break;
- }
- }
- if (io_status_.ok()) {
- io_status_ = io_s;
- }
- return status;
- }
- Status CompactionJob::SyncOutputDirectories() {
- Status status;
- IOStatus io_s;
- constexpr IODebugContext* dbg = nullptr;
- const bool wrote_new_blob_files = HasNewBlobFiles();
- if (output_directory_) {
- io_s = output_directory_->FsyncWithDirOptions(
- IOOptions(), dbg,
- DirFsyncOptions(DirFsyncOptions::FsyncReason::kNewFileSynced));
- }
- if (io_s.ok() && wrote_new_blob_files && blob_output_directory_ &&
- blob_output_directory_ != output_directory_) {
- io_s = blob_output_directory_->FsyncWithDirOptions(
- IOOptions(), dbg,
- DirFsyncOptions(DirFsyncOptions::FsyncReason::kNewFileSynced));
- }
- if (io_status_.ok()) {
- io_status_ = io_s;
- }
- if (status.ok()) {
- status = io_s;
- }
- return status;
- }
- Status CompactionJob::VerifyOutputFiles() {
- Status status;
- std::vector<port::Thread> thread_pool;
- std::vector<const CompactionOutputs::Output*> files_output;
- for (const auto& state : compact_->sub_compact_states) {
- for (const auto& output : state.GetOutputs()) {
- files_output.emplace_back(&output);
- }
- }
- ColumnFamilyData* cfd = compact_->compaction->column_family_data();
- std::atomic<size_t> next_file_idx(0);
- auto verify_table = [&](Status& output_status) {
- while (true) {
- size_t file_idx = next_file_idx.fetch_add(1);
- if (file_idx >= files_output.size()) {
- break;
- }
- // Verify that the table is usable
- // We set for_compaction to false and don't
- // OptimizeForCompactionTableRead here because this is a special case
- // after we finish the table building No matter whether
- // use_direct_io_for_flush_and_compaction is true, we will regard this
- // verification as user reads since the goal is to cache it here for
- // further user reads
- ReadOptions verify_table_read_options(Env::IOActivity::kCompaction);
- verify_table_read_options.rate_limiter_priority =
- GetRateLimiterPriority();
- InternalIterator* iter = cfd->table_cache()->NewIterator(
- verify_table_read_options, file_options_, cfd->internal_comparator(),
- files_output[file_idx]->meta,
- /*range_del_agg=*/nullptr, compact_->compaction->mutable_cf_options(),
- /*table_reader_ptr=*/nullptr,
- cfd->internal_stats()->GetFileReadHist(
- compact_->compaction->output_level()),
- TableReaderCaller::kCompactionRefill, /*arena=*/nullptr,
- /*skip_filters=*/false, compact_->compaction->output_level(),
- MaxFileSizeForL0MetaPin(compact_->compaction->mutable_cf_options()),
- /*smallest_compaction_key=*/nullptr,
- /*largest_compaction_key=*/nullptr,
- /*allow_unprepared_value=*/false);
- auto s = iter->status();
- if (s.ok() && paranoid_file_checks_) {
- OutputValidator validator(cfd->internal_comparator(),
- /*_enable_hash=*/true);
- for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
- s = validator.Add(iter->key(), iter->value());
- if (!s.ok()) {
- break;
- }
- }
- if (s.ok()) {
- s = iter->status();
- }
- if (s.ok() &&
- !validator.CompareValidator(files_output[file_idx]->validator)) {
- s = Status::Corruption("Paranoid checksums do not match");
- }
- }
- delete iter;
- if (!s.ok()) {
- output_status = s;
- break;
- }
- }
- };
- for (size_t i = 1; i < compact_->sub_compact_states.size(); i++) {
- thread_pool.emplace_back(verify_table,
- std::ref(compact_->sub_compact_states[i].status));
- }
- verify_table(compact_->sub_compact_states[0].status);
- for (auto& thread : thread_pool) {
- thread.join();
- }
- for (const auto& state : compact_->sub_compact_states) {
- if (!state.status.ok()) {
- status = state.status;
- break;
- }
- }
- return status;
- }
- void CompactionJob::SetOutputTableProperties() {
- for (const auto& state : compact_->sub_compact_states) {
- for (const auto& output : state.GetOutputs()) {
- auto fn =
- TableFileName(state.compaction->immutable_options().cf_paths,
- output.meta.fd.GetNumber(), output.meta.fd.GetPathId());
- compact_->compaction->SetOutputTableProperties(fn,
- output.table_properties);
- }
- }
- }
- void CompactionJob::AggregateSubcompactionOutputAndJobStats() {
- // Before the compaction starts, is_remote_compaction was set to true if
- // compaction_service is set. We now know whether each sub_compaction was
- // done remotely or not. Reset is_remote_compaction back to false and allow
- // AggregateCompactionStats() to set the right value.
- job_stats_->is_remote_compaction = false;
- // Finish up all bookkeeping to unify the subcompaction results.
- compact_->AggregateCompactionStats(internal_stats_, *job_stats_);
- }
- Status CompactionJob::VerifyCompactionRecordCounts(
- bool stats_built_from_input_table_prop, uint64_t num_input_range_del) {
- Status status;
- if (stats_built_from_input_table_prop &&
- job_stats_->has_accurate_num_input_records) {
- status = VerifyInputRecordCount(num_input_range_del);
- if (!status.ok()) {
- return status;
- }
- }
- const auto& mutable_cf_options = compact_->compaction->mutable_cf_options();
- if ((mutable_cf_options.table_factory->IsInstanceOf(
- TableFactory::kBlockBasedTableName()) ||
- mutable_cf_options.table_factory->IsInstanceOf(
- TableFactory::kPlainTableName()))) {
- status = VerifyOutputRecordCount();
- if (!status.ok()) {
- return status;
- }
- }
- return status;
- }
- void CompactionJob::FinalizeCompactionRun(
- const Status& input_status, bool stats_built_from_input_table_prop,
- uint64_t num_input_range_del) {
- if (stats_built_from_input_table_prop) {
- UpdateCompactionJobInputStatsFromInternalStats(internal_stats_,
- num_input_range_del);
- }
- UpdateCompactionJobOutputStatsFromInternalStats(input_status,
- internal_stats_);
- RecordCompactionIOStats();
- LogFlush(db_options_.info_log);
- TEST_SYNC_POINT("CompactionJob::Run():End");
- compact_->status = input_status;
- TEST_SYNC_POINT_CALLBACK("CompactionJob::Run():EndStatusSet",
- const_cast<Status*>(&input_status));
- }
- Status CompactionJob::Run() {
- InitializeCompactionRun();
- const uint64_t start_micros = db_options_.clock->NowMicros();
- RunSubcompactions();
- UpdateTimingStats(start_micros);
- TEST_SYNC_POINT("CompactionJob::Run:BeforeVerify");
- Status status = CollectSubcompactionErrors();
- if (status.ok()) {
- status = SyncOutputDirectories();
- }
- if (status.ok()) {
- status = VerifyOutputFiles();
- }
- if (status.ok()) {
- SetOutputTableProperties();
- }
- AggregateSubcompactionOutputAndJobStats();
- uint64_t num_input_range_del = 0;
- bool stats_built_from_input_table_prop =
- UpdateInternalStatsFromInputFiles(&num_input_range_del);
- if (status.ok()) {
- status = VerifyCompactionRecordCounts(stats_built_from_input_table_prop,
- num_input_range_del);
- }
- FinalizeCompactionRun(status, stats_built_from_input_table_prop,
- num_input_range_del);
- return status;
- }
- Status CompactionJob::Install(bool* compaction_released) {
- assert(compact_);
- AutoThreadOperationStageUpdater stage_updater(
- ThreadStatus::STAGE_COMPACTION_INSTALL);
- db_mutex_->AssertHeld();
- Status status = compact_->status;
- ColumnFamilyData* cfd = compact_->compaction->column_family_data();
- assert(cfd);
- int output_level = compact_->compaction->output_level();
- cfd->internal_stats()->AddCompactionStats(output_level, thread_pri_,
- internal_stats_);
- if (status.ok()) {
- status = InstallCompactionResults(compaction_released);
- }
- if (!versions_->io_status().ok()) {
- io_status_ = versions_->io_status();
- }
- VersionStorageInfo::LevelSummaryStorage tmp;
- auto vstorage = cfd->current()->storage_info();
- const auto& stats = internal_stats_.output_level_stats;
- double read_write_amp = 0.0;
- double write_amp = 0.0;
- double bytes_read_per_sec = 0;
- double bytes_written_per_sec = 0;
- const uint64_t bytes_read_non_output_and_blob =
- stats.bytes_read_non_output_levels + stats.bytes_read_blob;
- const uint64_t bytes_read_all =
- stats.bytes_read_output_level + bytes_read_non_output_and_blob;
- const uint64_t bytes_written_all =
- stats.bytes_written + stats.bytes_written_blob;
- if (bytes_read_non_output_and_blob > 0) {
- read_write_amp = (bytes_written_all + bytes_read_all) /
- static_cast<double>(bytes_read_non_output_and_blob);
- write_amp =
- bytes_written_all / static_cast<double>(bytes_read_non_output_and_blob);
- }
- if (stats.micros > 0) {
- bytes_read_per_sec = bytes_read_all / static_cast<double>(stats.micros);
- bytes_written_per_sec =
- bytes_written_all / static_cast<double>(stats.micros);
- }
- const std::string& column_family_name = cfd->GetName();
- constexpr double kMB = 1048576.0;
- ROCKS_LOG_BUFFER(
- log_buffer_,
- "[%s] compacted to: %s, MB/sec: %.1f rd, %.1f wr, level %d, "
- "files in(%d, %d) filtered(%d, %d) out(%d +%d blob) "
- "MB in(%.1f, %.1f +%.1f blob) filtered(%.1f, %.1f) out(%.1f +%.1f blob), "
- "read-write-amplify(%.1f) write-amplify(%.1f) %s, records in: %" PRIu64
- ", records dropped: %" PRIu64 " output_compression: %s\n",
- column_family_name.c_str(), vstorage->LevelSummary(&tmp),
- bytes_read_per_sec, bytes_written_per_sec,
- compact_->compaction->output_level(),
- stats.num_input_files_in_non_output_levels,
- stats.num_input_files_in_output_level,
- stats.num_filtered_input_files_in_non_output_levels,
- stats.num_filtered_input_files_in_output_level, stats.num_output_files,
- stats.num_output_files_blob, stats.bytes_read_non_output_levels / kMB,
- stats.bytes_read_output_level / kMB, stats.bytes_read_blob / kMB,
- stats.bytes_skipped_non_output_levels / kMB,
- stats.bytes_skipped_output_level / kMB, stats.bytes_written / kMB,
- stats.bytes_written_blob / kMB, read_write_amp, write_amp,
- status.ToString().c_str(), stats.num_input_records,
- stats.num_dropped_records,
- CompressionTypeToString(compact_->compaction->output_compression())
- .c_str());
- const auto& blob_files = vstorage->GetBlobFiles();
- if (!blob_files.empty()) {
- assert(blob_files.front());
- assert(blob_files.back());
- ROCKS_LOG_BUFFER(
- log_buffer_,
- "[%s] Blob file summary: head=%" PRIu64 ", tail=%" PRIu64 "\n",
- column_family_name.c_str(), blob_files.front()->GetBlobFileNumber(),
- blob_files.back()->GetBlobFileNumber());
- }
- if (internal_stats_.has_proximal_level_output) {
- ROCKS_LOG_BUFFER(log_buffer_,
- "[%s] has Proximal Level output: %" PRIu64
- ", level %d, number of files: %" PRIu64
- ", number of records: %" PRIu64,
- column_family_name.c_str(),
- internal_stats_.proximal_level_stats.bytes_written,
- compact_->compaction->GetProximalLevel(),
- internal_stats_.proximal_level_stats.num_output_files,
- internal_stats_.proximal_level_stats.num_output_records);
- }
- TEST_SYNC_POINT_CALLBACK(
- "CompactionJob::Install:AfterUpdateCompactionJobStats", job_stats_);
- auto stream = event_logger_->LogToBuffer(log_buffer_, 8192);
- stream << "job" << job_id_ << "event" << "compaction_finished"
- << "compaction_time_micros" << stats.micros
- << "compaction_time_cpu_micros" << stats.cpu_micros << "output_level"
- << compact_->compaction->output_level() << "num_output_files"
- << stats.num_output_files << "total_output_size"
- << stats.bytes_written;
- if (stats.num_output_files_blob > 0) {
- stream << "num_blob_output_files" << stats.num_output_files_blob
- << "total_blob_output_size" << stats.bytes_written_blob;
- }
- stream << "num_input_records" << stats.num_input_records
- << "num_output_records" << stats.num_output_records
- << "num_subcompactions" << compact_->sub_compact_states.size()
- << "output_compression"
- << CompressionTypeToString(compact_->compaction->output_compression());
- stream << "num_single_delete_mismatches"
- << job_stats_->num_single_del_mismatch;
- stream << "num_single_delete_fallthrough"
- << job_stats_->num_single_del_fallthru;
- if (measure_io_stats_) {
- stream << "file_write_nanos" << job_stats_->file_write_nanos;
- stream << "file_range_sync_nanos" << job_stats_->file_range_sync_nanos;
- stream << "file_fsync_nanos" << job_stats_->file_fsync_nanos;
- stream << "file_prepare_write_nanos"
- << job_stats_->file_prepare_write_nanos;
- }
- stream << "lsm_state";
- stream.StartArray();
- for (int level = 0; level < vstorage->num_levels(); ++level) {
- stream << vstorage->NumLevelFiles(level);
- }
- stream.EndArray();
- if (!blob_files.empty()) {
- assert(blob_files.front());
- stream << "blob_file_head" << blob_files.front()->GetBlobFileNumber();
- assert(blob_files.back());
- stream << "blob_file_tail" << blob_files.back()->GetBlobFileNumber();
- }
- if (internal_stats_.has_proximal_level_output) {
- InternalStats::CompactionStats& pl_stats =
- internal_stats_.proximal_level_stats;
- stream << "proximal_level_num_output_files" << pl_stats.num_output_files;
- stream << "proximal_level_bytes_written" << pl_stats.bytes_written;
- stream << "proximal_level_num_output_records"
- << pl_stats.num_output_records;
- stream << "proximal_level_num_output_files_blob"
- << pl_stats.num_output_files_blob;
- stream << "proximal_level_bytes_written_blob"
- << pl_stats.bytes_written_blob;
- }
- CleanupCompaction();
- return status;
- }
- void CompactionJob::NotifyOnSubcompactionBegin(
- SubcompactionState* sub_compact) {
- Compaction* c = compact_->compaction;
- if (db_options_.listeners.empty()) {
- return;
- }
- if (shutting_down_->load(std::memory_order_acquire)) {
- return;
- }
- if (c->is_manual_compaction() &&
- manual_compaction_canceled_.load(std::memory_order_acquire)) {
- return;
- }
- sub_compact->notify_on_subcompaction_completion = true;
- SubcompactionJobInfo info{};
- sub_compact->BuildSubcompactionJobInfo(info);
- info.job_id = static_cast<int>(job_id_);
- info.thread_id = env_->GetThreadID();
- for (const auto& listener : db_options_.listeners) {
- listener->OnSubcompactionBegin(info);
- }
- info.status.PermitUncheckedError();
- }
- void CompactionJob::NotifyOnSubcompactionCompleted(
- SubcompactionState* sub_compact) {
- if (db_options_.listeners.empty()) {
- return;
- }
- if (shutting_down_->load(std::memory_order_acquire)) {
- return;
- }
- if (sub_compact->notify_on_subcompaction_completion == false) {
- return;
- }
- SubcompactionJobInfo info{};
- sub_compact->BuildSubcompactionJobInfo(info);
- info.job_id = static_cast<int>(job_id_);
- info.thread_id = env_->GetThreadID();
- for (const auto& listener : db_options_.listeners) {
- listener->OnSubcompactionCompleted(info);
- }
- }
- bool CompactionJob::ShouldUseLocalCompaction(SubcompactionState* sub_compact) {
- if (db_options_.compaction_service) {
- CompactionServiceJobStatus comp_status =
- ProcessKeyValueCompactionWithCompactionService(sub_compact);
- if (comp_status != CompactionServiceJobStatus::kUseLocal) {
- return false;
- }
- // fallback to local compaction
- assert(comp_status == CompactionServiceJobStatus::kUseLocal);
- sub_compact->compaction_job_stats.is_remote_compaction = false;
- }
- return true;
- }
- CompactionJob::CompactionIOStatsSnapshot CompactionJob::InitializeIOStats() {
- CompactionIOStatsSnapshot io_stats;
- if (measure_io_stats_) {
- io_stats.prev_perf_level = GetPerfLevel();
- SetPerfLevel(PerfLevel::kEnableTimeAndCPUTimeExceptForMutex);
- io_stats.prev_write_nanos = IOSTATS(write_nanos);
- io_stats.prev_fsync_nanos = IOSTATS(fsync_nanos);
- io_stats.prev_range_sync_nanos = IOSTATS(range_sync_nanos);
- io_stats.prev_prepare_write_nanos = IOSTATS(prepare_write_nanos);
- io_stats.prev_cpu_write_nanos = IOSTATS(cpu_write_nanos);
- io_stats.prev_cpu_read_nanos = IOSTATS(cpu_read_nanos);
- }
- return io_stats;
- }
- Status CompactionJob::SetupAndValidateCompactionFilter(
- SubcompactionState* sub_compact,
- const CompactionFilter* configured_compaction_filter,
- const CompactionFilter*& compaction_filter,
- std::unique_ptr<CompactionFilter>& compaction_filter_from_factory) {
- compaction_filter = configured_compaction_filter;
- if (compaction_filter == nullptr) {
- compaction_filter_from_factory =
- sub_compact->compaction->CreateCompactionFilter();
- compaction_filter = compaction_filter_from_factory.get();
- }
- if (compaction_filter != nullptr && !compaction_filter->IgnoreSnapshots()) {
- return Status::NotSupported(
- "CompactionFilter::IgnoreSnapshots() = false is not supported "
- "anymore.");
- }
- return Status::OK();
- }
- void CompactionJob::InitializeReadOptionsAndBoundaries(
- const size_t ts_sz, ReadOptions& read_options,
- SubcompactionKeyBoundaries& boundaries) {
- read_options.verify_checksums = true;
- read_options.fill_cache = false;
- read_options.rate_limiter_priority = GetRateLimiterPriority();
- read_options.io_activity = Env::IOActivity::kCompaction;
- // Compaction iterators shouldn't be confined to a single prefix.
- // Compactions use Seek() for
- // (a) concurrent compactions,
- // (b) CompactionFilter::Decision::kRemoveAndSkipUntil.
- read_options.total_order_seek = true;
- // Remove the timestamps from boundaries because boundaries created in
- // GenSubcompactionBoundaries doesn't strip away the timestamp.
- if (boundaries.start.has_value()) {
- read_options.iterate_lower_bound = &(*boundaries.start);
- if (ts_sz > 0) {
- boundaries.start_without_ts =
- StripTimestampFromUserKey(*boundaries.start, ts_sz);
- read_options.iterate_lower_bound = &(*boundaries.start_without_ts);
- }
- }
- if (boundaries.end.has_value()) {
- read_options.iterate_upper_bound = &(*boundaries.end);
- if (ts_sz > 0) {
- boundaries.end_without_ts =
- StripTimestampFromUserKey(*boundaries.end, ts_sz);
- read_options.iterate_upper_bound = &(*boundaries.end_without_ts);
- }
- }
- if (ts_sz > 0) {
- if (ts_sz <= strlen(boundaries.kMaxTs)) {
- boundaries.ts_slice = Slice(boundaries.kMaxTs, ts_sz);
- } else {
- boundaries.max_ts = std::string(ts_sz, '\xff');
- boundaries.ts_slice = Slice(boundaries.max_ts);
- }
- }
- if (boundaries.start.has_value()) {
- boundaries.start_ikey.SetInternalKey(*boundaries.start, kMaxSequenceNumber,
- kValueTypeForSeek);
- if (ts_sz > 0) {
- boundaries.start_ikey.UpdateInternalKey(
- kMaxSequenceNumber, kValueTypeForSeek, &boundaries.ts_slice);
- }
- boundaries.start_internal_key = boundaries.start_ikey.GetInternalKey();
- boundaries.start_user_key = boundaries.start_ikey.GetUserKey();
- }
- if (boundaries.end.has_value()) {
- boundaries.end_ikey.SetInternalKey(*boundaries.end, kMaxSequenceNumber,
- kValueTypeForSeek);
- if (ts_sz > 0) {
- boundaries.end_ikey.UpdateInternalKey(
- kMaxSequenceNumber, kValueTypeForSeek, &boundaries.ts_slice);
- }
- boundaries.end_internal_key = boundaries.end_ikey.GetInternalKey();
- boundaries.end_user_key = boundaries.end_ikey.GetUserKey();
- }
- }
- InternalIterator* CompactionJob::CreateInputIterator(
- SubcompactionState* sub_compact, ColumnFamilyData* cfd,
- SubcompactionInternalIterators& iterators,
- SubcompactionKeyBoundaries& boundaries, ReadOptions& read_options) {
- const size_t ts_sz = cfd->user_comparator()->timestamp_size();
- InitializeReadOptionsAndBoundaries(ts_sz, read_options, boundaries);
- // This is assigned after creation of SubcompactionState to simplify that
- // creation across both CompactionJob and CompactionServiceCompactionJob
- sub_compact->AssignRangeDelAggregator(
- std::make_unique<CompactionRangeDelAggregator>(
- &cfd->internal_comparator(), job_context_->snapshot_seqs,
- &full_history_ts_low_, &trim_ts_));
- // Although the v2 aggregator is what the level iterator(s) know about,
- // the AddTombstones calls will be propagated down to the v1 aggregator.
- iterators.raw_input =
- std::unique_ptr<InternalIterator>(versions_->MakeInputIterator(
- read_options, sub_compact->compaction, sub_compact->RangeDelAgg(),
- file_options_for_read_, boundaries.start, boundaries.end));
- InternalIterator* input = iterators.raw_input.get();
- if (boundaries.start.has_value() || boundaries.end.has_value()) {
- iterators.clip = std::make_unique<ClippingIterator>(
- iterators.raw_input.get(),
- boundaries.start.has_value() ? &boundaries.start_internal_key : nullptr,
- boundaries.end.has_value() ? &boundaries.end_internal_key : nullptr,
- &cfd->internal_comparator());
- input = iterators.clip.get();
- }
- if (sub_compact->compaction->DoesInputReferenceBlobFiles()) {
- BlobGarbageMeter* meter = sub_compact->Current().CreateBlobGarbageMeter();
- iterators.blob_counter =
- std::make_unique<BlobCountingIterator>(input, meter);
- input = iterators.blob_counter.get();
- }
- if (ts_sz > 0 && !trim_ts_.empty()) {
- iterators.trim_history_iter = std::make_unique<HistoryTrimmingIterator>(
- input, cfd->user_comparator(), trim_ts_);
- input = iterators.trim_history_iter.get();
- }
- return input;
- }
- void CompactionJob::CreateBlobFileBuilder(SubcompactionState* sub_compact,
- ColumnFamilyData* cfd,
- BlobFileResources& blob_resources,
- const WriteOptions& write_options) {
- const auto& mutable_cf_options =
- sub_compact->compaction->mutable_cf_options();
- // TODO: BlobDB to support output_to_proximal_level compaction, which needs
- // 2 builders, so may need to move to `CompactionOutputs`
- if (mutable_cf_options.enable_blob_files &&
- sub_compact->compaction->output_level() >=
- mutable_cf_options.blob_file_starting_level) {
- blob_resources.blob_file_builder = std::make_unique<BlobFileBuilder>(
- versions_, fs_.get(), &sub_compact->compaction->immutable_options(),
- &mutable_cf_options, &file_options_, &write_options, db_id_,
- db_session_id_, job_id_, cfd->GetID(), cfd->GetName(), write_hint_,
- io_tracer_, blob_callback_, BlobFileCreationReason::kCompaction,
- &blob_resources.blob_file_paths,
- sub_compact->Current().GetBlobFileAdditionsPtr());
- } else {
- blob_resources.blob_file_builder = nullptr;
- }
- }
- std::unique_ptr<CompactionIterator> CompactionJob::CreateCompactionIterator(
- SubcompactionState* sub_compact, ColumnFamilyData* cfd,
- InternalIterator* input, const CompactionFilter* compaction_filter,
- MergeHelper& merge, BlobFileResources& blob_resources,
- const WriteOptions& write_options) {
- CreateBlobFileBuilder(sub_compact, cfd, blob_resources, write_options);
- const std::string* const full_history_ts_low =
- full_history_ts_low_.empty() ? nullptr : &full_history_ts_low_;
- assert(job_context_);
- return std::make_unique<CompactionIterator>(
- input, cfd->user_comparator(), &merge, versions_->LastSequence(),
- &(job_context_->snapshot_seqs), earliest_snapshot_,
- job_context_->earliest_write_conflict_snapshot,
- job_context_->GetJobSnapshotSequence(), job_context_->snapshot_checker,
- env_, ShouldReportDetailedTime(env_, stats_), sub_compact->RangeDelAgg(),
- blob_resources.blob_file_builder.get(), db_options_.allow_data_in_errors,
- db_options_.enforce_single_del_contracts, manual_compaction_canceled_,
- sub_compact->compaction
- ->DoesInputReferenceBlobFiles() /* must_count_input_entries */,
- sub_compact->compaction, compaction_filter, shutting_down_,
- db_options_.info_log, full_history_ts_low, preserve_seqno_after_);
- }
- std::pair<CompactionFileOpenFunc, CompactionFileCloseFunc>
- CompactionJob::CreateFileHandlers(SubcompactionState* sub_compact,
- SubcompactionKeyBoundaries& boundaries) {
- const CompactionFileOpenFunc open_file_func =
- [this, sub_compact](CompactionOutputs& outputs) {
- return this->OpenCompactionOutputFile(sub_compact, outputs);
- };
- const Slice* start_user_key =
- sub_compact->start.has_value() ? &boundaries.start_user_key : nullptr;
- const Slice* end_user_key =
- sub_compact->end.has_value() ? &boundaries.end_user_key : nullptr;
- const CompactionFileCloseFunc close_file_func =
- [this, sub_compact, start_user_key, end_user_key](
- const Status& status,
- const ParsedInternalKey& prev_table_last_internal_key,
- const Slice& next_table_min_key, const CompactionIterator* c_iter,
- CompactionOutputs& outputs) {
- return this->FinishCompactionOutputFile(
- status, prev_table_last_internal_key, next_table_min_key,
- start_user_key, end_user_key, c_iter, sub_compact, outputs);
- };
- return {open_file_func, close_file_func};
- }
- Status CompactionJob::ProcessKeyValue(
- SubcompactionState* sub_compact, ColumnFamilyData* cfd,
- CompactionIterator* c_iter, const CompactionFileOpenFunc& open_file_func,
- const CompactionFileCloseFunc& close_file_func, uint64_t& prev_cpu_micros) {
- Status status;
- const uint64_t kRecordStatsEvery = 1000;
- [[maybe_unused]] const std::optional<const Slice> end = sub_compact->end;
- IterKey last_output_key;
- ParsedInternalKey last_output_ikey;
- TEST_SYNC_POINT_CALLBACK(
- "CompactionJob::ProcessKeyValueCompaction()::Processing",
- static_cast<void*>(const_cast<Compaction*>(sub_compact->compaction)));
- while (status.ok() && !cfd->IsDropped() && c_iter->Valid() &&
- c_iter->status().ok()) {
- assert(!end.has_value() ||
- cfd->user_comparator()->Compare(c_iter->user_key(), *end) < 0);
- if (c_iter->iter_stats().num_input_records % kRecordStatsEvery ==
- kRecordStatsEvery - 1) {
- UpdateSubcompactionJobStatsIncrementally(
- c_iter, &sub_compact->compaction_job_stats,
- db_options_.clock->CPUMicros(), prev_cpu_micros);
- }
- const auto& ikey = c_iter->ikey();
- bool use_proximal_output = ikey.sequence > proximal_after_seqno_;
- #ifndef NDEBUG
- if (sub_compact->compaction->SupportsPerKeyPlacement()) {
- PerKeyPlacementContext context(sub_compact->compaction->output_level(),
- ikey.user_key, c_iter->value(),
- ikey.sequence, use_proximal_output);
- TEST_SYNC_POINT_CALLBACK("CompactionIterator::PrepareOutput.context",
- &context);
- if (use_proximal_output) {
- // Verify that entries sent to the proximal level are within the
- // allowed range (because the input key range of the last level could
- // be larger than the allowed output key range of the proximal
- // level). This check uses user keys (ignores sequence numbers) because
- // compaction boundaries are a "clean cut" between user keys (see
- // CompactionPicker::ExpandInputsToCleanCut()), which is especially
- // important when preferred sequence numbers has been swapped in for
- // kTypeValuePreferredSeqno / TimedPut.
- sub_compact->compaction->TEST_AssertWithinProximalLevelOutputRange(
- c_iter->user_key());
- }
- } else {
- assert(proximal_after_seqno_ == kMaxSequenceNumber);
- assert(!use_proximal_output);
- }
- #endif // NDEBUG
- // Add current compaction_iterator key to target compaction output, if the
- // output file needs to be close or open, it will call the `open_file_func`
- // and `close_file_func`.
- // TODO: it would be better to have the compaction file open/close moved
- // into `CompactionOutputs` which has the output file information.
- status =
- sub_compact->AddToOutput(*c_iter, use_proximal_output, open_file_func,
- close_file_func, last_output_ikey);
- if (!status.ok()) {
- break;
- }
- TEST_SYNC_POINT_CALLBACK("CompactionJob::Run():PausingManualCompaction:2",
- static_cast<void*>(const_cast<std::atomic<bool>*>(
- &manual_compaction_canceled_)));
- last_output_key.SetInternalKey(c_iter->key(), &last_output_ikey);
- last_output_ikey.sequence = ikey.sequence;
- last_output_ikey.type = ikey.type;
- c_iter->Next();
- #ifndef NDEBUG
- bool stop = false;
- TEST_SYNC_POINT_CALLBACK("CompactionJob::ProcessKeyValueCompaction()::stop",
- static_cast<void*>(&stop));
- if (stop) {
- break;
- }
- #endif // NDEBUG
- }
- return status;
- }
- void CompactionJob::UpdateSubcompactionJobStatsIncrementally(
- CompactionIterator* c_iter, CompactionJobStats* compaction_job_stats,
- uint64_t cur_cpu_micros, uint64_t& prev_cpu_micros) {
- RecordDroppedKeys(c_iter->iter_stats(), compaction_job_stats);
- c_iter->ResetRecordCounts();
- RecordCompactionIOStats();
- assert(cur_cpu_micros >= prev_cpu_micros);
- RecordTick(stats_, COMPACTION_CPU_TOTAL_TIME,
- cur_cpu_micros - prev_cpu_micros);
- prev_cpu_micros = cur_cpu_micros;
- }
- void CompactionJob::FinalizeSubcompactionJobStats(
- SubcompactionState* sub_compact, CompactionIterator* c_iter,
- uint64_t start_cpu_micros, uint64_t prev_cpu_micros,
- const CompactionIOStatsSnapshot& io_stats) {
- const CompactionIterationStats& c_iter_stats = c_iter->iter_stats();
- assert(!sub_compact->compaction->DoesInputReferenceBlobFiles() ||
- c_iter->HasNumInputEntryScanned());
- sub_compact->compaction_job_stats.has_accurate_num_input_records &=
- c_iter->HasNumInputEntryScanned();
- sub_compact->compaction_job_stats.num_input_records +=
- c_iter->NumInputEntryScanned();
- sub_compact->compaction_job_stats.num_blobs_read =
- c_iter_stats.num_blobs_read;
- sub_compact->compaction_job_stats.total_blob_bytes_read =
- c_iter_stats.total_blob_bytes_read;
- sub_compact->compaction_job_stats.num_input_deletion_records =
- c_iter_stats.num_input_deletion_records;
- sub_compact->compaction_job_stats.num_corrupt_keys =
- c_iter_stats.num_input_corrupt_records;
- sub_compact->compaction_job_stats.num_single_del_fallthru =
- c_iter_stats.num_single_del_fallthru;
- sub_compact->compaction_job_stats.num_single_del_mismatch =
- c_iter_stats.num_single_del_mismatch;
- sub_compact->compaction_job_stats.total_input_raw_key_bytes +=
- c_iter_stats.total_input_raw_key_bytes;
- sub_compact->compaction_job_stats.total_input_raw_value_bytes +=
- c_iter_stats.total_input_raw_value_bytes;
- RecordTick(stats_, FILTER_OPERATION_TOTAL_TIME,
- c_iter_stats.total_filter_time);
- if (c_iter_stats.num_blobs_relocated > 0) {
- RecordTick(stats_, BLOB_DB_GC_NUM_KEYS_RELOCATED,
- c_iter_stats.num_blobs_relocated);
- }
- if (c_iter_stats.total_blob_bytes_relocated > 0) {
- RecordTick(stats_, BLOB_DB_GC_BYTES_RELOCATED,
- c_iter_stats.total_blob_bytes_relocated);
- }
- uint64_t cur_cpu_micros = db_options_.clock->CPUMicros();
- // Record final compaction statistics including dropped keys, I/O stats,
- // and CPU time delta from the last periodic measurement
- UpdateSubcompactionJobStatsIncrementally(c_iter,
- &sub_compact->compaction_job_stats,
- cur_cpu_micros, prev_cpu_micros);
- // Finalize timing and I/O statistics
- sub_compact->compaction_job_stats.cpu_micros =
- cur_cpu_micros - start_cpu_micros + sub_compact->GetWorkerCPUMicros();
- if (measure_io_stats_) {
- sub_compact->compaction_job_stats.file_write_nanos +=
- IOSTATS(write_nanos) - io_stats.prev_write_nanos;
- sub_compact->compaction_job_stats.file_fsync_nanos +=
- IOSTATS(fsync_nanos) - io_stats.prev_fsync_nanos;
- sub_compact->compaction_job_stats.file_range_sync_nanos +=
- IOSTATS(range_sync_nanos) - io_stats.prev_range_sync_nanos;
- sub_compact->compaction_job_stats.file_prepare_write_nanos +=
- IOSTATS(prepare_write_nanos) - io_stats.prev_prepare_write_nanos;
- sub_compact->compaction_job_stats.cpu_micros -=
- (IOSTATS(cpu_write_nanos) - io_stats.prev_cpu_write_nanos +
- IOSTATS(cpu_read_nanos) - io_stats.prev_cpu_read_nanos) /
- 1000;
- if (io_stats.prev_perf_level !=
- PerfLevel::kEnableTimeAndCPUTimeExceptForMutex) {
- SetPerfLevel(io_stats.prev_perf_level);
- }
- }
- }
- Status CompactionJob::FinalizeProcessKeyValueStatus(
- ColumnFamilyData* cfd, InternalIterator* input_iter,
- CompactionIterator* c_iter, Status status) {
- if (status.ok() && cfd->IsDropped()) {
- status =
- Status::ColumnFamilyDropped("Column family dropped during compaction");
- }
- if (status.ok() && shutting_down_->load(std::memory_order_relaxed)) {
- status = Status::ShutdownInProgress("Database shutdown");
- }
- if (status.ok() &&
- (manual_compaction_canceled_.load(std::memory_order_relaxed))) {
- status = Status::Incomplete(Status::SubCode::kManualCompactionPaused);
- }
- if (status.ok()) {
- status = input_iter->status();
- }
- if (status.ok()) {
- status = c_iter->status();
- }
- return status;
- }
- Status CompactionJob::CleanupCompactionFiles(
- SubcompactionState* sub_compact, Status status,
- const CompactionFileOpenFunc& open_file_func,
- const CompactionFileCloseFunc& close_file_func) {
- // Call FinishCompactionOutputFile() even if status is not ok: it needs to
- // close the output files. Open file function is also passed, in case there's
- // only range-dels, no file was opened, to save the range-dels, it need to
- // create a new output file.
- return sub_compact->CloseCompactionFiles(status, open_file_func,
- close_file_func);
- }
- Status CompactionJob::FinalizeBlobFiles(SubcompactionState* sub_compact,
- BlobFileBuilder* blob_file_builder,
- Status status) {
- if (blob_file_builder) {
- if (status.ok()) {
- status = blob_file_builder->Finish();
- } else {
- blob_file_builder->Abandon(status);
- }
- sub_compact->Current().UpdateBlobStats();
- }
- return status;
- }
- void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
- assert(sub_compact);
- assert(sub_compact->compaction);
- if (!ShouldUseLocalCompaction(sub_compact)) {
- return;
- }
- AutoThreadOperationStageUpdater stage_updater(
- ThreadStatus::STAGE_COMPACTION_PROCESS_KV);
- const uint64_t start_cpu_micros = db_options_.clock->CPUMicros();
- uint64_t prev_cpu_micros = start_cpu_micros;
- const CompactionIOStatsSnapshot io_stats = InitializeIOStats();
- ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
- const CompactionFilter* compaction_filter;
- std::unique_ptr<CompactionFilter> compaction_filter_from_factory = nullptr;
- Status filter_status = SetupAndValidateCompactionFilter(
- sub_compact, cfd->ioptions().compaction_filter, compaction_filter,
- compaction_filter_from_factory);
- if (!filter_status.ok()) {
- sub_compact->status = filter_status;
- return;
- }
- NotifyOnSubcompactionBegin(sub_compact);
- SubcompactionKeyBoundaries boundaries(sub_compact->start, sub_compact->end);
- SubcompactionInternalIterators iterators;
- ReadOptions read_options;
- const WriteOptions write_options(Env::IOPriority::IO_LOW,
- Env::IOActivity::kCompaction);
- InternalIterator* input_iter = CreateInputIterator(
- sub_compact, cfd, iterators, boundaries, read_options);
- assert(input_iter);
- Status status =
- MaybeResumeSubcompactionProgressOnInputIterator(sub_compact, input_iter);
- if (status.IsNotFound()) {
- input_iter->SeekToFirst();
- } else if (!status.ok()) {
- sub_compact->status = status;
- return;
- }
- MergeHelper merge(
- env_, cfd->user_comparator(), cfd->ioptions().merge_operator.get(),
- compaction_filter, db_options_.info_log.get(),
- false /* internal key corruption is expected */,
- job_context_->GetLatestSnapshotSequence(), job_context_->snapshot_checker,
- compact_->compaction->level(), db_options_.stats);
- BlobFileResources blob_resources;
- auto c_iter =
- CreateCompactionIterator(sub_compact, cfd, input_iter, compaction_filter,
- merge, blob_resources, write_options);
- assert(c_iter);
- c_iter->SeekToFirst();
- TEST_SYNC_POINT("CompactionJob::Run():Inprogress");
- TEST_SYNC_POINT_CALLBACK("CompactionJob::Run():PausingManualCompaction:1",
- static_cast<void*>(const_cast<std::atomic<bool>*>(
- &manual_compaction_canceled_)));
- auto [open_file_func, close_file_func] =
- CreateFileHandlers(sub_compact, boundaries);
- status = ProcessKeyValue(sub_compact, cfd, c_iter.get(), open_file_func,
- close_file_func, prev_cpu_micros);
- status = FinalizeProcessKeyValueStatus(cfd, input_iter, c_iter.get(), status);
- FinalizeSubcompaction(sub_compact, status, open_file_func, close_file_func,
- blob_resources.blob_file_builder.get(), c_iter.get(),
- input_iter, start_cpu_micros, prev_cpu_micros,
- io_stats);
- NotifyOnSubcompactionCompleted(sub_compact);
- }
- void CompactionJob::FinalizeSubcompaction(
- SubcompactionState* sub_compact, Status status,
- const CompactionFileOpenFunc& open_file_func,
- const CompactionFileCloseFunc& close_file_func,
- BlobFileBuilder* blob_file_builder, CompactionIterator* c_iter,
- [[maybe_unused]] InternalIterator* input_iter, uint64_t start_cpu_micros,
- uint64_t prev_cpu_micros, const CompactionIOStatsSnapshot& io_stats) {
- status = CleanupCompactionFiles(sub_compact, status, open_file_func,
- close_file_func);
- status = FinalizeBlobFiles(sub_compact, blob_file_builder, status);
- FinalizeSubcompactionJobStats(sub_compact, c_iter, start_cpu_micros,
- prev_cpu_micros, io_stats);
- #ifdef ROCKSDB_ASSERT_STATUS_CHECKED
- if (!status.ok()) {
- if (c_iter) {
- c_iter->status().PermitUncheckedError();
- }
- if (input_iter) {
- input_iter->status().PermitUncheckedError();
- }
- }
- #endif // ROCKSDB_ASSERT_STATUS_CHECKED
- sub_compact->status = status;
- }
- uint64_t CompactionJob::GetCompactionId(SubcompactionState* sub_compact) const {
- return (uint64_t)job_id_ << 32 | sub_compact->sub_job_id;
- }
- void CompactionJob::RecordDroppedKeys(
- const CompactionIterationStats& c_iter_stats,
- CompactionJobStats* compaction_job_stats) {
- if (c_iter_stats.num_record_drop_user > 0) {
- RecordTick(stats_, COMPACTION_KEY_DROP_USER,
- c_iter_stats.num_record_drop_user);
- }
- if (c_iter_stats.num_record_drop_hidden > 0) {
- RecordTick(stats_, COMPACTION_KEY_DROP_NEWER_ENTRY,
- c_iter_stats.num_record_drop_hidden);
- if (compaction_job_stats) {
- compaction_job_stats->num_records_replaced +=
- c_iter_stats.num_record_drop_hidden;
- }
- }
- if (c_iter_stats.num_record_drop_obsolete > 0) {
- RecordTick(stats_, COMPACTION_KEY_DROP_OBSOLETE,
- c_iter_stats.num_record_drop_obsolete);
- if (compaction_job_stats) {
- compaction_job_stats->num_expired_deletion_records +=
- c_iter_stats.num_record_drop_obsolete;
- }
- }
- if (c_iter_stats.num_record_drop_range_del > 0) {
- RecordTick(stats_, COMPACTION_KEY_DROP_RANGE_DEL,
- c_iter_stats.num_record_drop_range_del);
- }
- if (c_iter_stats.num_range_del_drop_obsolete > 0) {
- RecordTick(stats_, COMPACTION_RANGE_DEL_DROP_OBSOLETE,
- c_iter_stats.num_range_del_drop_obsolete);
- }
- if (c_iter_stats.num_optimized_del_drop_obsolete > 0) {
- RecordTick(stats_, COMPACTION_OPTIMIZED_DEL_DROP_OBSOLETE,
- c_iter_stats.num_optimized_del_drop_obsolete);
- }
- }
- Status CompactionJob::FinishCompactionOutputFile(
- const Status& input_status,
- const ParsedInternalKey& prev_table_last_internal_key,
- const Slice& next_table_min_key, const Slice* comp_start_user_key,
- const Slice* comp_end_user_key, const CompactionIterator* c_iter,
- SubcompactionState* sub_compact, CompactionOutputs& outputs) {
- AutoThreadOperationStageUpdater stage_updater(
- ThreadStatus::STAGE_COMPACTION_SYNC_FILE);
- assert(sub_compact != nullptr);
- assert(outputs.HasBuilder());
- FileMetaData* meta = outputs.GetMetaData();
- uint64_t output_number = meta->fd.GetNumber();
- assert(output_number != 0);
- ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
- std::string file_checksum = kUnknownFileChecksum;
- std::string file_checksum_func_name = kUnknownFileChecksumFuncName;
- // Check for iterator errors
- Status s = input_status;
- // Add range tombstones
- if (s.ok()) {
- // Inclusive lower bound, exclusive upper bound
- std::pair<SequenceNumber, SequenceNumber> keep_seqno_range{
- 0, kMaxSequenceNumber};
- if (sub_compact->compaction->SupportsPerKeyPlacement()) {
- if (outputs.IsProximalLevel()) {
- keep_seqno_range.first = proximal_after_seqno_;
- } else {
- keep_seqno_range.second = proximal_after_seqno_;
- }
- }
- CompactionIterationStats range_del_out_stats;
- // NOTE1: Use `bottommost_level_ = true` for both bottommost and
- // output_to_proximal_level compaction here, as it's only used to decide
- // if range dels could be dropped. (Logically, we are taking a single sorted
- // run returned from CompactionIterator and physically splitting it between
- // two output levels.)
- // NOTE2: with per-key placement, range tombstones will be filtered on
- // each output level based on sequence number (traversed twice). This is
- // CPU-inefficient for a large number of range tombstones, but that would
- // be an unusual work load.
- if (sub_compact->HasRangeDel()) {
- s = outputs.AddRangeDels(*sub_compact->RangeDelAgg(), comp_start_user_key,
- comp_end_user_key, range_del_out_stats,
- bottommost_level_, cfd->internal_comparator(),
- earliest_snapshot_, keep_seqno_range,
- next_table_min_key, full_history_ts_low_);
- }
- RecordDroppedKeys(range_del_out_stats, &sub_compact->compaction_job_stats);
- TEST_SYNC_POINT("CompactionJob::FinishCompactionOutputFile1");
- }
- const uint64_t current_entries = outputs.NumEntries();
- s = outputs.Finish(s, seqno_to_time_mapping_);
- TEST_SYNC_POINT_CALLBACK(
- "CompactionJob::FinishCompactionOutputFile()::AfterFinish", &s);
- if (s.ok()) {
- // With accurate smallest and largest key, we can get a slightly more
- // accurate oldest ancester time.
- // This makes oldest ancester time in manifest more accurate than in
- // table properties. Not sure how to resolve it.
- if (meta->smallest.size() > 0 && meta->largest.size() > 0) {
- uint64_t refined_oldest_ancester_time;
- Slice new_smallest = meta->smallest.user_key();
- Slice new_largest = meta->largest.user_key();
- if (!new_largest.empty() && !new_smallest.empty()) {
- refined_oldest_ancester_time =
- sub_compact->compaction->MinInputFileOldestAncesterTime(
- &(meta->smallest), &(meta->largest));
- if (refined_oldest_ancester_time !=
- std::numeric_limits<uint64_t>::max()) {
- meta->oldest_ancester_time = refined_oldest_ancester_time;
- }
- }
- }
- }
- // Finish and check for file errors
- IOStatus io_s = outputs.WriterSyncClose(s, db_options_.clock, stats_,
- db_options_.use_fsync);
- if (s.ok() && io_s.ok()) {
- file_checksum = meta->file_checksum;
- file_checksum_func_name = meta->file_checksum_func_name;
- }
- if (s.ok()) {
- s = io_s;
- }
- if (sub_compact->io_status.ok()) {
- sub_compact->io_status = io_s;
- // Since this error is really a copy of the
- // "normal" status, it does not also need to be checked
- sub_compact->io_status.PermitUncheckedError();
- }
- TableProperties tp;
- if (s.ok()) {
- tp = outputs.GetTableProperties();
- }
- if (s.ok() && current_entries == 0 && tp.num_range_deletions == 0) {
- // If there is nothing to output, no necessary to generate a sst file.
- // This happens when the output level is bottom level, at the same time
- // the sub_compact output nothing.
- std::string fname = GetTableFileName(meta->fd.GetNumber());
- // TODO(AR) it is not clear if there are any larger implications if
- // DeleteFile fails here
- Status ds = env_->DeleteFile(fname);
- if (!ds.ok()) {
- ROCKS_LOG_WARN(
- db_options_.info_log,
- "[%s] [JOB %d] Unable to remove SST file for table #%" PRIu64
- " at bottom level%s",
- cfd->GetName().c_str(), job_id_, output_number,
- meta->marked_for_compaction ? " (need compaction)" : "");
- }
- // Also need to remove the file from outputs, or it will be added to the
- // VersionEdit.
- outputs.RemoveLastOutput();
- meta = nullptr;
- }
- if (s.ok() && (current_entries > 0 || tp.num_range_deletions > 0)) {
- // Output to event logger and fire events.
- outputs.UpdateTableProperties();
- ROCKS_LOG_INFO(db_options_.info_log,
- "[%s] [JOB %d] Generated table #%" PRIu64 ": %" PRIu64
- " keys, %" PRIu64 " bytes%s, temperature: %s",
- cfd->GetName().c_str(), job_id_, output_number,
- current_entries, meta->fd.file_size,
- meta->marked_for_compaction ? " (need compaction)" : "",
- temperature_to_string[meta->temperature].c_str());
- }
- std::string fname;
- FileDescriptor output_fd;
- uint64_t oldest_blob_file_number = kInvalidBlobFileNumber;
- Status status_for_listener = s;
- if (meta != nullptr) {
- fname = GetTableFileName(meta->fd.GetNumber());
- output_fd = meta->fd;
- oldest_blob_file_number = meta->oldest_blob_file_number;
- } else {
- fname = "(nil)";
- if (s.ok()) {
- status_for_listener = Status::Aborted("Empty SST file not kept");
- }
- }
- EventHelpers::LogAndNotifyTableFileCreationFinished(
- event_logger_, cfd->ioptions().listeners, dbname_, cfd->GetName(), fname,
- job_id_, output_fd, oldest_blob_file_number, tp,
- TableFileCreationReason::kCompaction, status_for_listener, file_checksum,
- file_checksum_func_name);
- // Report new file to SstFileManagerImpl
- auto sfm =
- static_cast<SstFileManagerImpl*>(db_options_.sst_file_manager.get());
- if (sfm && meta != nullptr && meta->fd.GetPathId() == 0) {
- Status add_s = sfm->OnAddFile(fname);
- if (!add_s.ok() && s.ok()) {
- s = add_s;
- }
- if (sfm->IsMaxAllowedSpaceReached()) {
- // TODO(ajkr): should we return OK() if max space was reached by the final
- // compaction output file (similarly to how flush works when full)?
- s = Status::SpaceLimit("Max allowed space was reached");
- TEST_SYNC_POINT(
- "CompactionJob::FinishCompactionOutputFile:MaxAllowedSpaceReached");
- InstrumentedMutexLock l(db_mutex_);
- db_error_handler_->SetBGError(s, BackgroundErrorReason::kCompaction);
- }
- }
- if (s.ok() && ShouldUpdateSubcompactionProgress(sub_compact, c_iter,
- prev_table_last_internal_key,
- next_table_min_key, meta)) {
- UpdateSubcompactionProgress(c_iter, next_table_min_key, sub_compact);
- s = PersistSubcompactionProgress(sub_compact);
- }
- outputs.ResetBuilder();
- return s;
- }
- bool CompactionJob::ShouldUpdateSubcompactionProgress(
- const SubcompactionState* sub_compact, const CompactionIterator* c_iter,
- const ParsedInternalKey& prev_table_last_internal_key,
- const Slice& next_table_min_internal_key, const FileMetaData* meta) const {
- const auto* cfd = sub_compact->compaction->column_family_data();
- // No need to update when the output will not get persisted
- if (compaction_progress_writer_ == nullptr) {
- return false;
- }
- // No need to update for a new empty output
- if (meta == nullptr) {
- return false;
- }
- // TODO(hx235): save progress even on the last output file
- if (next_table_min_internal_key.empty()) {
- return false;
- }
- // LIMITATION: Persisting compaction progress with timestamp
- // is not supported since the feature of persisting timestamp of the key in
- // SST files itself is still experimental
- size_t ts_sz = cfd->user_comparator()->timestamp_size();
- if (ts_sz > 0) {
- return false;
- }
- // LIMITATION: Compaction progress persistence disabled for file boundaries
- // contaning range deletions. Range deletions can span file boundaries, making
- // it difficult (but possible) to ensure adjacent output tables have different
- // user keys. See the last check for why different users keys of adjacent
- // output tables are needed
- const ValueType next_table_min_internal_key_type =
- ExtractValueType(next_table_min_internal_key);
- const ValueType prev_table_last_internal_key_type =
- prev_table_last_internal_key.user_key.empty()
- ? ValueType::kTypeValue
- : prev_table_last_internal_key.type;
- if (next_table_min_internal_key_type == ValueType::kTypeRangeDeletion ||
- prev_table_last_internal_key_type == ValueType::kTypeRangeDeletion) {
- return false;
- }
- // LIMITATION: Compaction progress persistence disabled when adjacent output
- // tables share the same user key at boundaries. This ensures a simple Seek()
- // of the next key when resuming can process all versions of a user key
- const Slice next_table_min_user_key =
- ExtractUserKey(next_table_min_internal_key);
- const Slice prev_table_last_user_key =
- prev_table_last_internal_key.user_key.empty()
- ? Slice()
- : prev_table_last_internal_key.user_key;
- if (cfd->user_comparator()->EqualWithoutTimestamp(next_table_min_user_key,
- prev_table_last_user_key)) {
- return false;
- }
- // LIMITATION: Don't save progress if the current key has already been scanned
- // (looked ahead) in the input but not yet output. This can happen with merge
- // operations, single deletes, and deletes at the bottommost level where
- // CompactionIterator needs to look ahead to process multiple entries for the
- // same user key before outputting a result. If we saved progress and resumed
- // at this boundary, the resumed session would see and process the same input
- // key again through Seek(), leading to incorrect double-counting in
- // number of processed input entries and input count verification failure
- //
- // TODO(hx235): Offset num_processed_input_records to avoid double counting
- // instead of disabling progress persistence.
- if (c_iter->IsCurrentKeyAlreadyScanned()) {
- return false;
- }
- return true;
- }
- Status CompactionJob::InstallCompactionResults(bool* compaction_released) {
- assert(compact_);
- db_mutex_->AssertHeld();
- const ReadOptions read_options(Env::IOActivity::kCompaction);
- const WriteOptions write_options(Env::IOActivity::kCompaction);
- auto* compaction = compact_->compaction;
- assert(compaction);
- {
- Compaction::InputLevelSummaryBuffer inputs_summary;
- if (internal_stats_.has_proximal_level_output) {
- ROCKS_LOG_BUFFER(
- log_buffer_,
- "[%s] [JOB %d] Compacted %s => output_to_proximal_level: %" PRIu64
- " bytes + last: %" PRIu64 " bytes. Total: %" PRIu64 " bytes",
- compaction->column_family_data()->GetName().c_str(), job_id_,
- compaction->InputLevelSummary(&inputs_summary),
- internal_stats_.proximal_level_stats.bytes_written,
- internal_stats_.output_level_stats.bytes_written,
- internal_stats_.TotalBytesWritten());
- } else {
- ROCKS_LOG_BUFFER(log_buffer_,
- "[%s] [JOB %d] Compacted %s => %" PRIu64 " bytes",
- compaction->column_family_data()->GetName().c_str(),
- job_id_, compaction->InputLevelSummary(&inputs_summary),
- internal_stats_.TotalBytesWritten());
- }
- }
- VersionEdit* const edit = compaction->edit();
- assert(edit);
- // Add compaction inputs
- compaction->AddInputDeletions(edit);
- std::unordered_map<uint64_t, BlobGarbageMeter::BlobStats> blob_total_garbage;
- for (const auto& sub_compact : compact_->sub_compact_states) {
- sub_compact.AddOutputsEdit(edit);
- for (const auto& blob : sub_compact.Current().GetBlobFileAdditions()) {
- edit->AddBlobFile(blob);
- }
- if (sub_compact.Current().GetBlobGarbageMeter()) {
- const auto& flows = sub_compact.Current().GetBlobGarbageMeter()->flows();
- for (const auto& pair : flows) {
- const uint64_t blob_file_number = pair.first;
- const BlobGarbageMeter::BlobInOutFlow& flow = pair.second;
- assert(flow.IsValid());
- if (flow.HasGarbage()) {
- blob_total_garbage[blob_file_number].Add(flow.GetGarbageCount(),
- flow.GetGarbageBytes());
- }
- }
- }
- }
- for (const auto& pair : blob_total_garbage) {
- const uint64_t blob_file_number = pair.first;
- const BlobGarbageMeter::BlobStats& stats = pair.second;
- edit->AddBlobFileGarbage(blob_file_number, stats.GetCount(),
- stats.GetBytes());
- }
- if ((compaction->compaction_reason() ==
- CompactionReason::kLevelMaxLevelSize ||
- compaction->compaction_reason() == CompactionReason::kRoundRobinTtl) &&
- compaction->immutable_options().compaction_pri == kRoundRobin) {
- int start_level = compaction->start_level();
- if (start_level > 0) {
- auto vstorage = compaction->input_version()->storage_info();
- edit->AddCompactCursor(start_level,
- vstorage->GetNextCompactCursor(
- start_level, compaction->num_input_files(0)));
- }
- }
- auto manifest_wcb = [&compaction, &compaction_released](const Status& s) {
- compaction->ReleaseCompactionFiles(s);
- *compaction_released = true;
- };
- return versions_->LogAndApply(compaction->column_family_data(), read_options,
- write_options, edit, db_mutex_, db_directory_,
- /*new_descriptor_log=*/false,
- /*column_family_options=*/nullptr,
- manifest_wcb);
- }
- void CompactionJob::RecordCompactionIOStats() {
- RecordTick(stats_, COMPACT_READ_BYTES, IOSTATS(bytes_read));
- RecordTick(stats_, COMPACT_WRITE_BYTES, IOSTATS(bytes_written));
- CompactionReason compaction_reason =
- compact_->compaction->compaction_reason();
- if (compaction_reason == CompactionReason::kFilesMarkedForCompaction) {
- RecordTick(stats_, COMPACT_READ_BYTES_MARKED, IOSTATS(bytes_read));
- RecordTick(stats_, COMPACT_WRITE_BYTES_MARKED, IOSTATS(bytes_written));
- } else if (compaction_reason == CompactionReason::kPeriodicCompaction) {
- RecordTick(stats_, COMPACT_READ_BYTES_PERIODIC, IOSTATS(bytes_read));
- RecordTick(stats_, COMPACT_WRITE_BYTES_PERIODIC, IOSTATS(bytes_written));
- } else if (compaction_reason == CompactionReason::kTtl) {
- RecordTick(stats_, COMPACT_READ_BYTES_TTL, IOSTATS(bytes_read));
- RecordTick(stats_, COMPACT_WRITE_BYTES_TTL, IOSTATS(bytes_written));
- }
- ThreadStatusUtil::IncreaseThreadOperationProperty(
- ThreadStatus::COMPACTION_BYTES_READ, IOSTATS(bytes_read));
- IOSTATS_RESET(bytes_read);
- ThreadStatusUtil::IncreaseThreadOperationProperty(
- ThreadStatus::COMPACTION_BYTES_WRITTEN, IOSTATS(bytes_written));
- IOSTATS_RESET(bytes_written);
- }
- Status CompactionJob::OpenCompactionOutputFile(SubcompactionState* sub_compact,
- CompactionOutputs& outputs) {
- assert(sub_compact != nullptr);
- // no need to lock because VersionSet::next_file_number_ is atomic
- uint64_t file_number = versions_->NewFileNumber();
- #ifndef NDEBUG
- TEST_SYNC_POINT_CALLBACK(
- "CompactionJob::OpenCompactionOutputFile::NewFileNumber", &file_number);
- #endif
- std::string fname = GetTableFileName(file_number);
- // Fire events.
- ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
- EventHelpers::NotifyTableFileCreationStarted(
- cfd->ioptions().listeners, dbname_, cfd->GetName(), fname, job_id_,
- TableFileCreationReason::kCompaction);
- // Make the output file
- std::unique_ptr<FSWritableFile> writable_file;
- #ifndef NDEBUG
- bool syncpoint_arg = file_options_.use_direct_writes;
- TEST_SYNC_POINT_CALLBACK("CompactionJob::OpenCompactionOutputFile",
- &syncpoint_arg);
- #endif
- // Pass temperature of the last level files to FileSystem.
- FileOptions fo_copy = file_options_;
- auto temperature =
- sub_compact->compaction->GetOutputTemperature(outputs.IsProximalLevel());
- fo_copy.temperature = temperature;
- fo_copy.write_hint = write_hint_;
- Status s;
- IOStatus io_s = NewWritableFile(fs_.get(), fname, &writable_file, fo_copy);
- s = io_s;
- if (sub_compact->io_status.ok()) {
- sub_compact->io_status = io_s;
- // Since this error is really a copy of the io_s that is checked below as s,
- // it does not also need to be checked.
- sub_compact->io_status.PermitUncheckedError();
- }
- if (!s.ok()) {
- ROCKS_LOG_ERROR(
- db_options_.info_log,
- "[%s] [JOB %d] OpenCompactionOutputFiles for table #%" PRIu64
- " fails at NewWritableFile with status %s",
- sub_compact->compaction->column_family_data()->GetName().c_str(),
- job_id_, file_number, s.ToString().c_str());
- LogFlush(db_options_.info_log);
- EventHelpers::LogAndNotifyTableFileCreationFinished(
- event_logger_, cfd->ioptions().listeners, dbname_, cfd->GetName(),
- fname, job_id_, FileDescriptor(), kInvalidBlobFileNumber,
- TableProperties(), TableFileCreationReason::kCompaction, s,
- kUnknownFileChecksum, kUnknownFileChecksumFuncName);
- return s;
- }
- // Try to figure out the output file's oldest ancester time.
- int64_t temp_current_time = 0;
- auto get_time_status = db_options_.clock->GetCurrentTime(&temp_current_time);
- // Safe to proceed even if GetCurrentTime fails. So, log and proceed.
- if (!get_time_status.ok()) {
- ROCKS_LOG_WARN(db_options_.info_log,
- "Failed to get current time. Status: %s",
- get_time_status.ToString().c_str());
- }
- uint64_t current_time = static_cast<uint64_t>(temp_current_time);
- InternalKey tmp_start, tmp_end;
- if (sub_compact->start.has_value()) {
- tmp_start.SetMinPossibleForUserKey(*(sub_compact->start));
- }
- if (sub_compact->end.has_value()) {
- tmp_end.SetMinPossibleForUserKey(*(sub_compact->end));
- }
- uint64_t oldest_ancester_time =
- sub_compact->compaction->MinInputFileOldestAncesterTime(
- sub_compact->start.has_value() ? &tmp_start : nullptr,
- sub_compact->end.has_value() ? &tmp_end : nullptr);
- if (oldest_ancester_time == std::numeric_limits<uint64_t>::max()) {
- // TODO: fix DBSSTTest.GetTotalSstFilesSize and use
- // kUnknownOldestAncesterTime
- oldest_ancester_time = current_time;
- }
- uint64_t newest_key_time = sub_compact->compaction->MaxInputFileNewestKeyTime(
- sub_compact->start.has_value() ? &tmp_start : nullptr,
- sub_compact->end.has_value() ? &tmp_end : nullptr);
- // Initialize a SubcompactionState::Output and add it to sub_compact->outputs
- uint64_t epoch_number = sub_compact->compaction->MinInputFileEpochNumber();
- {
- FileMetaData meta;
- meta.fd = FileDescriptor(file_number,
- sub_compact->compaction->output_path_id(), 0);
- meta.oldest_ancester_time = oldest_ancester_time;
- meta.file_creation_time = current_time;
- meta.epoch_number = epoch_number;
- meta.temperature = temperature;
- assert(!db_id_.empty());
- assert(!db_session_id_.empty());
- s = GetSstInternalUniqueId(db_id_, db_session_id_, meta.fd.GetNumber(),
- &meta.unique_id);
- if (!s.ok()) {
- ROCKS_LOG_ERROR(db_options_.info_log,
- "[%s] [JOB %d] file #%" PRIu64
- " failed to generate unique id: %s.",
- cfd->GetName().c_str(), job_id_, meta.fd.GetNumber(),
- s.ToString().c_str());
- return s;
- }
- outputs.AddOutput(std::move(meta), cfd->internal_comparator(),
- paranoid_file_checks_);
- }
- writable_file->SetIOPriority(GetRateLimiterPriority());
- // Subsequent attempts to override the hint via SetWriteLifeTimeHint
- // with the very same value will be ignored by the fs.
- writable_file->SetWriteLifeTimeHint(fo_copy.write_hint);
- FileTypeSet tmp_set = db_options_.checksum_handoff_file_types;
- writable_file->SetPreallocationBlockSize(static_cast<size_t>(
- sub_compact->compaction->OutputFilePreallocationSize()));
- const auto& listeners =
- sub_compact->compaction->immutable_options().listeners;
- outputs.AssignFileWriter(new WritableFileWriter(
- std::move(writable_file), fname, fo_copy, db_options_.clock, io_tracer_,
- db_options_.stats, Histograms::SST_WRITE_MICROS, listeners,
- db_options_.file_checksum_gen_factory.get(),
- tmp_set.Contains(FileType::kTableFile), false));
- // TODO(hx235): pass in the correct `oldest_key_time` instead of `0`
- const ReadOptions read_options(Env::IOActivity::kCompaction);
- const WriteOptions write_options(Env::IOActivity::kCompaction);
- TableBuilderOptions tboptions(
- cfd->ioptions(), sub_compact->compaction->mutable_cf_options(),
- read_options, write_options, cfd->internal_comparator(),
- cfd->internal_tbl_prop_coll_factories(),
- sub_compact->compaction->output_compression(),
- sub_compact->compaction->output_compression_opts(), cfd->GetID(),
- cfd->GetName(), sub_compact->compaction->output_level(), newest_key_time,
- bottommost_level_, TableFileCreationReason::kCompaction,
- 0 /* oldest_key_time */, current_time, db_id_, db_session_id_,
- sub_compact->compaction->max_output_file_size(), file_number,
- proximal_after_seqno_ /*last_level_inclusive_max_seqno_threshold*/);
- outputs.NewBuilder(tboptions);
- LogFlush(db_options_.info_log);
- return s;
- }
- void CompactionJob::CleanupCompaction() {
- for (SubcompactionState& sub_compact : compact_->sub_compact_states) {
- sub_compact.Cleanup(table_cache_.get());
- }
- delete compact_;
- compact_ = nullptr;
- }
- namespace {
- void CopyPrefix(const Slice& src, size_t prefix_length, std::string* dst) {
- assert(prefix_length > 0);
- size_t length = src.size() > prefix_length ? prefix_length : src.size();
- dst->assign(src.data(), length);
- }
- } // namespace
- bool CompactionJob::UpdateInternalStatsFromInputFiles(
- uint64_t* num_input_range_del) {
- assert(compact_);
- Compaction* compaction = compact_->compaction;
- internal_stats_.output_level_stats.num_input_files_in_non_output_levels = 0;
- internal_stats_.output_level_stats.num_input_files_in_output_level = 0;
- bool has_error = false;
- const ReadOptions read_options(Env::IOActivity::kCompaction);
- const auto& input_table_properties = compaction->GetInputTableProperties();
- for (int input_level = 0;
- input_level < static_cast<int>(compaction->num_input_levels());
- ++input_level) {
- const LevelFilesBrief* flevel = compaction->input_levels(input_level);
- size_t num_input_files = flevel->num_files;
- uint64_t* bytes_read;
- if (compaction->level(input_level) != compaction->output_level()) {
- internal_stats_.output_level_stats.num_input_files_in_non_output_levels +=
- static_cast<int>(num_input_files);
- bytes_read =
- &internal_stats_.output_level_stats.bytes_read_non_output_levels;
- } else {
- internal_stats_.output_level_stats.num_input_files_in_output_level +=
- static_cast<int>(num_input_files);
- bytes_read = &internal_stats_.output_level_stats.bytes_read_output_level;
- }
- for (size_t i = 0; i < num_input_files; ++i) {
- const FileMetaData* file_meta = flevel->files[i].file_metadata;
- *bytes_read += file_meta->fd.GetFileSize();
- uint64_t file_input_entries = file_meta->num_entries;
- uint64_t file_num_range_del = file_meta->num_range_deletions;
- if (file_input_entries == 0) {
- uint64_t file_number = file_meta->fd.GetNumber();
- // Try getting info from table property
- std::string fn = TableFileName(compaction->immutable_options().cf_paths,
- file_number, file_meta->fd.GetPathId());
- const auto& tp = input_table_properties.find(fn);
- if (tp != input_table_properties.end()) {
- file_input_entries = tp->second->num_entries;
- file_num_range_del = tp->second->num_range_deletions;
- } else {
- has_error = true;
- }
- }
- internal_stats_.output_level_stats.num_input_records +=
- file_input_entries;
- if (num_input_range_del) {
- *num_input_range_del += file_num_range_del;
- }
- }
- const std::vector<FileMetaData*>& filtered_flevel =
- compaction->filtered_input_levels(input_level);
- size_t num_filtered_input_files = filtered_flevel.size();
- uint64_t* bytes_skipped;
- if (compaction->level(input_level) != compaction->output_level()) {
- internal_stats_.output_level_stats
- .num_filtered_input_files_in_non_output_levels +=
- static_cast<int>(num_filtered_input_files);
- bytes_skipped =
- &internal_stats_.output_level_stats.bytes_skipped_non_output_levels;
- } else {
- internal_stats_.output_level_stats
- .num_filtered_input_files_in_output_level +=
- static_cast<int>(num_filtered_input_files);
- bytes_skipped =
- &internal_stats_.output_level_stats.bytes_skipped_output_level;
- }
- for (const FileMetaData* filtered_file_meta : filtered_flevel) {
- *bytes_skipped += filtered_file_meta->fd.GetFileSize();
- }
- }
- // TODO - find a better place to set these two
- assert(job_stats_);
- internal_stats_.output_level_stats.bytes_read_blob =
- job_stats_->total_blob_bytes_read;
- internal_stats_.output_level_stats.num_dropped_records =
- internal_stats_.DroppedRecords();
- return !has_error;
- }
- void CompactionJob::UpdateCompactionJobInputStatsFromInternalStats(
- const InternalStats::CompactionStatsFull& internal_stats,
- uint64_t num_input_range_del) const {
- assert(job_stats_);
- // input information
- job_stats_->total_input_bytes =
- internal_stats.output_level_stats.bytes_read_non_output_levels +
- internal_stats.output_level_stats.bytes_read_output_level;
- job_stats_->num_input_records =
- internal_stats.output_level_stats.num_input_records - num_input_range_del;
- job_stats_->num_input_files =
- internal_stats.output_level_stats.num_input_files_in_non_output_levels +
- internal_stats.output_level_stats.num_input_files_in_output_level;
- job_stats_->num_input_files_at_output_level =
- internal_stats.output_level_stats.num_input_files_in_output_level;
- job_stats_->num_filtered_input_files =
- internal_stats.output_level_stats
- .num_filtered_input_files_in_non_output_levels +
- internal_stats.output_level_stats
- .num_filtered_input_files_in_output_level;
- job_stats_->num_filtered_input_files_at_output_level =
- internal_stats.output_level_stats
- .num_filtered_input_files_in_output_level;
- job_stats_->total_skipped_input_bytes =
- internal_stats.output_level_stats.bytes_skipped_non_output_levels +
- internal_stats.output_level_stats.bytes_skipped_output_level;
- if (internal_stats.has_proximal_level_output) {
- job_stats_->total_input_bytes +=
- internal_stats.proximal_level_stats.bytes_read_non_output_levels +
- internal_stats.proximal_level_stats.bytes_read_output_level;
- job_stats_->num_input_records +=
- internal_stats.proximal_level_stats.num_input_records;
- job_stats_->num_input_files +=
- internal_stats.proximal_level_stats
- .num_input_files_in_non_output_levels +
- internal_stats.proximal_level_stats.num_input_files_in_output_level;
- job_stats_->num_input_files_at_output_level +=
- internal_stats.proximal_level_stats.num_input_files_in_output_level;
- job_stats_->num_filtered_input_files +=
- internal_stats.proximal_level_stats
- .num_filtered_input_files_in_non_output_levels +
- internal_stats.proximal_level_stats
- .num_filtered_input_files_in_output_level;
- job_stats_->num_filtered_input_files_at_output_level +=
- internal_stats.proximal_level_stats
- .num_filtered_input_files_in_output_level;
- job_stats_->total_skipped_input_bytes +=
- internal_stats.proximal_level_stats.bytes_skipped_non_output_levels +
- internal_stats.proximal_level_stats.bytes_skipped_output_level;
- }
- }
- void CompactionJob::UpdateCompactionJobOutputStatsFromInternalStats(
- const Status& status,
- const InternalStats::CompactionStatsFull& internal_stats) const {
- assert(job_stats_);
- job_stats_->elapsed_micros = internal_stats.output_level_stats.micros;
- job_stats_->cpu_micros = internal_stats.output_level_stats.cpu_micros;
- // output information
- job_stats_->total_output_bytes =
- internal_stats.output_level_stats.bytes_written;
- job_stats_->total_output_bytes_blob =
- internal_stats.output_level_stats.bytes_written_blob;
- job_stats_->num_output_records =
- internal_stats.output_level_stats.num_output_records;
- job_stats_->num_output_files =
- internal_stats.output_level_stats.num_output_files;
- job_stats_->num_output_files_blob =
- internal_stats.output_level_stats.num_output_files_blob;
- if (internal_stats.has_proximal_level_output) {
- job_stats_->total_output_bytes +=
- internal_stats.proximal_level_stats.bytes_written;
- job_stats_->total_output_bytes_blob +=
- internal_stats.proximal_level_stats.bytes_written_blob;
- job_stats_->num_output_records +=
- internal_stats.proximal_level_stats.num_output_records;
- job_stats_->num_output_files +=
- internal_stats.proximal_level_stats.num_output_files;
- job_stats_->num_output_files_blob +=
- internal_stats.proximal_level_stats.num_output_files_blob;
- }
- if (status.ok() && job_stats_->num_output_files > 0) {
- CopyPrefix(compact_->SmallestUserKey(),
- CompactionJobStats::kMaxPrefixLength,
- &job_stats_->smallest_output_key_prefix);
- CopyPrefix(compact_->LargestUserKey(), CompactionJobStats::kMaxPrefixLength,
- &job_stats_->largest_output_key_prefix);
- }
- }
- void CompactionJob::LogCompaction() {
- Compaction* compaction = compact_->compaction;
- ColumnFamilyData* cfd = compaction->column_family_data();
- // Let's check if anything will get logged. Don't prepare all the info if
- // we're not logging
- if (db_options_.info_log_level <= InfoLogLevel::INFO_LEVEL) {
- Compaction::InputLevelSummaryBuffer inputs_summary;
- ROCKS_LOG_INFO(
- db_options_.info_log, "[%s] [JOB %d] Compacting %s, score %.2f",
- cfd->GetName().c_str(), job_id_,
- compaction->InputLevelSummary(&inputs_summary), compaction->score());
- char scratch[2345];
- compaction->Summary(scratch, sizeof(scratch));
- ROCKS_LOG_INFO(db_options_.info_log, "[%s]: Compaction start summary: %s\n",
- cfd->GetName().c_str(), scratch);
- // build event logger report
- auto stream = event_logger_->Log();
- stream << "job" << job_id_ << "event" << "compaction_started" << "cf_name"
- << cfd->GetName() << "compaction_reason"
- << GetCompactionReasonString(compaction->compaction_reason());
- for (size_t i = 0; i < compaction->num_input_levels(); ++i) {
- stream << ("files_L" + std::to_string(compaction->level(i)));
- stream.StartArray();
- for (auto f : *compaction->inputs(i)) {
- stream << f->fd.GetNumber();
- }
- stream.EndArray();
- }
- stream << "score" << compaction->score() << "input_data_size"
- << compaction->CalculateTotalInputSize() << "oldest_snapshot_seqno"
- << (job_context_->snapshot_seqs.empty()
- ? int64_t{-1} // Use -1 for "none"
- : static_cast<int64_t>(
- job_context_->GetEarliestSnapshotSequence()));
- if (compaction->SupportsPerKeyPlacement()) {
- stream << "proximal_after_seqno" << proximal_after_seqno_;
- stream << "preserve_seqno_after" << preserve_seqno_after_;
- stream << "proximal_output_level" << compaction->GetProximalLevel();
- stream << "proximal_output_range"
- << GetCompactionProximalOutputRangeTypeString(
- compaction->GetProximalOutputRangeType());
- if (compaction->GetProximalOutputRangeType() ==
- Compaction::ProximalOutputRangeType::kDisabled) {
- ROCKS_LOG_WARN(
- db_options_.info_log,
- "[%s] [JOB %d] Proximal level output is disabled, likely "
- "because of the range conflict in the proximal level",
- cfd->GetName().c_str(), job_id_);
- }
- }
- }
- }
- std::string CompactionJob::GetTableFileName(uint64_t file_number) {
- return TableFileName(compact_->compaction->immutable_options().cf_paths,
- file_number, compact_->compaction->output_path_id());
- }
- Env::IOPriority CompactionJob::GetRateLimiterPriority() {
- if (versions_ && versions_->GetColumnFamilySet() &&
- versions_->GetColumnFamilySet()->write_controller()) {
- WriteController* write_controller =
- versions_->GetColumnFamilySet()->write_controller();
- if (write_controller->NeedsDelay() || write_controller->IsStopped()) {
- return Env::IO_USER;
- }
- }
- return Env::IO_LOW;
- }
- Status CompactionJob::ReadTablePropertiesDirectly(
- const ImmutableOptions& ioptions, const MutableCFOptions& moptions,
- const FileMetaData* file_meta, const ReadOptions& read_options,
- std::shared_ptr<const TableProperties>* tp) {
- std::unique_ptr<FSRandomAccessFile> file;
- std::string file_name = GetTableFileName(file_meta->fd.GetNumber());
- Status s = ioptions.fs->NewRandomAccessFile(file_name, file_options_, &file,
- nullptr /* dbg */);
- if (!s.ok()) {
- return s;
- }
- std::unique_ptr<RandomAccessFileReader> file_reader(
- new RandomAccessFileReader(
- std::move(file), file_name, ioptions.clock, io_tracer_,
- ioptions.stats, Histograms::SST_READ_MICROS /* hist_type */,
- nullptr /* file_read_hist */, ioptions.rate_limiter.get(),
- ioptions.listeners));
- std::unique_ptr<TableProperties> props;
- uint64_t magic_number = kBlockBasedTableMagicNumber;
- const auto* table_factory = moptions.table_factory.get();
- if (table_factory == nullptr) {
- return Status::Incomplete("Table factory is not set");
- } else {
- const auto& table_factory_name = table_factory->Name();
- if (table_factory_name == TableFactory::kPlainTableName()) {
- magic_number = kPlainTableMagicNumber;
- } else if (table_factory_name == TableFactory::kCuckooTableName()) {
- magic_number = kCuckooTableMagicNumber;
- }
- }
- s = ReadTableProperties(file_reader.get(), file_meta->fd.GetFileSize(),
- magic_number, ioptions, read_options, &props);
- if (!s.ok()) {
- return s;
- }
- *tp = std::move(props);
- return s;
- }
- Status CompactionJob::ReadOutputFilesTableProperties(
- const autovector<FileMetaData>& output_files,
- const ReadOptions& read_options,
- std::vector<std::shared_ptr<const TableProperties>>&
- output_files_table_properties,
- bool is_proximal_level) {
- assert(!output_files.empty());
- static const char* level_type =
- is_proximal_level ? "proximal output" : "output";
- output_files_table_properties.reserve(output_files.size());
- Status s;
- for (const FileMetaData& metadata : output_files) {
- std::shared_ptr<const TableProperties> tp;
- s = ReadTablePropertiesDirectly(compact_->compaction->immutable_options(),
- compact_->compaction->mutable_cf_options(),
- &metadata, read_options, &tp);
- if (!s.ok()) {
- ROCKS_LOG_ERROR(
- db_options_.info_log,
- "Failed to read table properties for %s level output file #%" PRIu64
- ": %s",
- level_type, metadata.fd.GetNumber(), s.ToString().c_str());
- return s;
- }
- if (tp == nullptr) {
- ROCKS_LOG_ERROR(db_options_.info_log,
- "Empty table property for %s level output file #%" PRIu64
- "",
- level_type, metadata.fd.GetNumber());
- s = Status::Corruption("Empty table property for " +
- std::string(level_type) +
- " level output files during resuming");
- return s;
- }
- output_files_table_properties.push_back(tp);
- }
- return s;
- }
- void CompactionJob::RestoreCompactionOutputs(
- const ColumnFamilyData* cfd,
- const std::vector<std::shared_ptr<const TableProperties>>&
- output_files_table_properties,
- SubcompactionProgressPerLevel& subcompaction_progress_per_level,
- CompactionOutputs* outputs_to_restore) {
- assert(outputs_to_restore->GetOutputs().size() == 0);
- const auto& output_files = subcompaction_progress_per_level.GetOutputFiles();
- for (size_t i = 0; i < output_files.size(); i++) {
- FileMetaData file_copy = output_files[i];
- outputs_to_restore->AddOutput(std::move(file_copy),
- cfd->internal_comparator(),
- paranoid_file_checks_, true /* finished */);
- outputs_to_restore->UpdateTableProperties(
- *output_files_table_properties[i]);
- }
- outputs_to_restore->SetNumOutputRecords(
- subcompaction_progress_per_level.GetNumProcessedOutputRecords());
- }
- // Attempt to resume compaction from a previously persisted compaction progress.
- //
- // RETURNS:
- // - Status::OK():
- // * Input iterator positioned at next unprocessed key
- // * CompactionOutputs objects fully restored for both output and proximal
- // output levels in SubcompactionState
- // * Compaction job statistics accurately reflect input and output records
- // processed for record count verification
- // * File number generation advanced to prevent conflicts with existing outputs
- // - Status::NotFound(): No valid progress to resume from
- // - Status::Corruption(): Resume key is invalid, beyond input range, or output
- // restoration failed
- // - Other non-OK status: Iterator errors or file system issues during
- // restoration
- //
- // The caller must check for Status::IsIncomplete() to distinguish between
- // "no resume needed" (proceed with `InternalIterator::SeekToFirst()`) vs
- // "resume failed" scenarios.
- Status CompactionJob::MaybeResumeSubcompactionProgressOnInputIterator(
- SubcompactionState* sub_compact, InternalIterator* input_iter) {
- const ReadOptions read_options(Env::IOActivity::kCompaction);
- ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
- SubcompactionProgress& subcompaction_progress =
- sub_compact->GetSubcompactionProgressRef();
- if (subcompaction_progress.output_level_progress
- .GetNumProcessedOutputRecords() == 0 &&
- subcompaction_progress.proximal_output_level_progress
- .GetNumProcessedOutputRecords() == 0) {
- return Status::NotFound("No subcompaction progress to resume");
- }
- ROCKS_LOG_INFO(db_options_.info_log, "[%s] [JOB %d] Resuming compaction : %s",
- cfd->GetName().c_str(), job_id_,
- subcompaction_progress.ToString().c_str());
- input_iter->Seek(subcompaction_progress.next_internal_key_to_compact);
- if (!input_iter->Valid()) {
- ROCKS_LOG_ERROR(db_options_.info_log,
- "[%s] [JOB %d] Iterator is invalid after "
- "seeking to the key to resume. This indicates the key is "
- "incorrectly beyond the input data range.",
- cfd->GetName().c_str(), job_id_);
- return Status::Corruption(
- "The key to resume is beyond the input data range");
- } else if (!input_iter->status().ok()) {
- ROCKS_LOG_ERROR(db_options_.info_log,
- "[%s] [JOB %d] Iterator has error after seeking to "
- "the key to resume: %s",
- cfd->GetName().c_str(), job_id_,
- input_iter->status().ToString().c_str());
- return Status::Corruption(
- "Iterator has error status after seeking to the key: " +
- input_iter->status().ToString());
- }
- sub_compact->compaction_job_stats.has_accurate_num_input_records =
- subcompaction_progress.num_processed_input_records != 0;
- sub_compact->compaction_job_stats.num_input_records =
- subcompaction_progress.num_processed_input_records;
- for (const bool& is_proximal_level : {false, true}) {
- if (is_proximal_level &&
- !sub_compact->compaction->SupportsPerKeyPlacement()) {
- continue;
- }
- Status s;
- SubcompactionProgressPerLevel& subcompaction_progress_per_level =
- is_proximal_level
- ? subcompaction_progress.proximal_output_level_progress
- : subcompaction_progress.output_level_progress;
- const auto& output_files =
- subcompaction_progress_per_level.GetOutputFiles();
- std::vector<std::shared_ptr<const TableProperties>>
- output_files_table_properties;
- // TODO(hx235): investigate if we can skip reading properties to save read
- // IO
- s = ReadOutputFilesTableProperties(output_files, read_options,
- output_files_table_properties);
- if (!s.ok()) {
- ROCKS_LOG_ERROR(
- db_options_.info_log,
- "[%s] [JOB %d] Failed to read table properties for %s output level"
- "files "
- "during resume: %s.",
- cfd->GetName().c_str(), job_id_, is_proximal_level ? "proximal" : "",
- s.ToString().c_str());
- return Status::Corruption(
- "Not able to resume due to table property reading error " +
- s.ToString());
- }
- RestoreCompactionOutputs(cfd, output_files_table_properties,
- subcompaction_progress_per_level,
- sub_compact->Outputs(is_proximal_level));
- // Skip past all the used file numbers to avoid creating new output files
- // after resumption that conflict with the existing output files
- for (const auto& file_meta : output_files) {
- uint64_t file_number = file_meta.fd.GetNumber();
- while (versions_->NewFileNumber() <= file_number) {
- versions_->FetchAddFileNumber(1);
- }
- }
- }
- return Status::OK();
- }
- void CompactionJob::UpdateSubcompactionProgress(
- const CompactionIterator* c_iter, const Slice next_table_min_key,
- SubcompactionState* sub_compact) {
- assert(c_iter);
- SubcompactionProgress& subcompaction_progress =
- sub_compact->GetSubcompactionProgressRef();
- IterKey next_ikey_to_compact;
- next_ikey_to_compact.SetInternalKey(ExtractUserKey(next_table_min_key),
- kMaxSequenceNumber, kValueTypeForSeek);
- subcompaction_progress.next_internal_key_to_compact =
- next_ikey_to_compact.GetInternalKey().ToString();
- // Track total processed input records for progress reporting by combining:
- // - Resumed count: records already processed before compaction was
- // interrupted
- // - Current count: records scanned in the current compaction session
- // Only update when both tracking mechanisms provide accurate counts to ensure
- // reliability.
- subcompaction_progress.num_processed_input_records =
- c_iter->HasNumInputEntryScanned() &&
- sub_compact->compaction_job_stats.has_accurate_num_input_records
- ? c_iter->NumInputEntryScanned() +
- sub_compact->compaction_job_stats.num_input_records
- : 0;
- UpdateSubcompactionProgressPerLevel(
- sub_compact, false /* is_proximal_level */, subcompaction_progress);
- if (sub_compact->compaction->SupportsPerKeyPlacement()) {
- UpdateSubcompactionProgressPerLevel(
- sub_compact, true /* is_proximal_level */, subcompaction_progress);
- }
- }
- void CompactionJob::UpdateSubcompactionProgressPerLevel(
- SubcompactionState* sub_compact, bool is_proximal_level,
- SubcompactionProgress& subcompaction_progress) {
- SubcompactionProgressPerLevel& subcompaction_progress_per_level =
- is_proximal_level ? subcompaction_progress.proximal_output_level_progress
- : subcompaction_progress.output_level_progress;
- subcompaction_progress_per_level.SetNumProcessedOutputRecords(
- sub_compact->OutputStats(is_proximal_level)->num_output_records);
- const auto& prev_output_files =
- subcompaction_progress_per_level.GetOutputFiles();
- const auto& current_output_files =
- sub_compact->Outputs(is_proximal_level)->GetOutputs();
- for (size_t i = prev_output_files.size(); i < current_output_files.size();
- i++) {
- subcompaction_progress_per_level.AddToOutputFiles(
- current_output_files[i].meta);
- }
- }
- Status CompactionJob::PersistSubcompactionProgress(
- SubcompactionState* sub_compact) {
- SubcompactionProgress& subcompaction_progress =
- sub_compact->GetSubcompactionProgressRef();
- assert(compaction_progress_writer_);
- VersionEdit edit;
- edit.SetSubcompactionProgress(subcompaction_progress);
- std::string record;
- if (!edit.EncodeTo(&record)) {
- ROCKS_LOG_ERROR(
- db_options_.info_log,
- "[%s] [JOB %d] Failed to encode subcompaction "
- "progress",
- compact_->compaction->column_family_data()->GetName().c_str(), job_id_);
- return Status::Corruption("Failed to encode subcompaction progress");
- }
- WriteOptions write_options(Env::IOActivity::kCompaction);
- Status s = compaction_progress_writer_->AddRecord(write_options, record);
- IOOptions opts;
- if (s.ok()) {
- s = WritableFileWriter::PrepareIOOptions(write_options, opts);
- }
- if (s.ok()) {
- s = compaction_progress_writer_->file()->Sync(opts, db_options_.use_fsync);
- }
- if (!s.ok()) {
- ROCKS_LOG_ERROR(
- db_options_.info_log,
- "[%s] [JOB %d] Failed to persist subcompaction "
- "progress: %s",
- compact_->compaction->column_family_data()->GetName().c_str(), job_id_,
- s.ToString().c_str());
- return s;
- }
- subcompaction_progress.output_level_progress
- .UpdateLastPersistedOutputFilesCount();
- subcompaction_progress.proximal_output_level_progress
- .UpdateLastPersistedOutputFilesCount();
- return Status::OK();
- }
- Status CompactionJob::VerifyInputRecordCount(
- uint64_t num_input_range_del) const {
- size_t ts_sz = compact_->compaction->column_family_data()
- ->user_comparator()
- ->timestamp_size();
- // When trim_ts_ is non-empty, CompactionIterator takes
- // HistoryTrimmingIterator as input iterator and sees a trimmed view of
- // input keys. So the number of keys it processed is not suitable for
- // verification here.
- // TODO: support verification when trim_ts_ is non-empty.
- if (!(ts_sz > 0 && !trim_ts_.empty())) {
- assert(internal_stats_.output_level_stats.num_input_records > 0);
- // TODO: verify the number of range deletion entries.
- uint64_t expected = internal_stats_.output_level_stats.num_input_records -
- num_input_range_del;
- uint64_t actual = job_stats_->num_input_records;
- if (expected != actual) {
- char scratch[2345];
- compact_->compaction->Summary(scratch, sizeof(scratch));
- std::string msg =
- "Compaction number of input keys does not match "
- "number of keys processed. Expected " +
- std::to_string(expected) + " but processed " +
- std::to_string(actual) + ". Compaction summary: " + scratch;
- ROCKS_LOG_WARN(
- db_options_.info_log,
- "[%s] [JOB %d] VerifyInputRecordCount() Status: %s",
- compact_->compaction->column_family_data()->GetName().c_str(),
- job_context_->job_id, msg.c_str());
- if (db_options_.compaction_verify_record_count) {
- return Status::Corruption(msg);
- }
- }
- }
- return Status::OK();
- }
- Status CompactionJob::VerifyOutputRecordCount() const {
- uint64_t total_output_num = 0;
- for (const auto& state : compact_->sub_compact_states) {
- for (const auto& output : state.GetOutputs()) {
- total_output_num += output.table_properties->num_entries -
- output.table_properties->num_range_deletions;
- }
- }
- uint64_t expected = internal_stats_.output_level_stats.num_output_records;
- if (internal_stats_.has_proximal_level_output) {
- expected += internal_stats_.proximal_level_stats.num_output_records;
- }
- if (expected != total_output_num) {
- char scratch[2345];
- compact_->compaction->Summary(scratch, sizeof(scratch));
- std::string msg =
- "Number of keys in compaction output SST files does not match "
- "number of keys added. Expected " +
- std::to_string(expected) + " but there are " +
- std::to_string(total_output_num) +
- " in output SST files. Compaction summary: " + scratch;
- ROCKS_LOG_WARN(
- db_options_.info_log,
- "[%s] [JOB %d] VerifyOutputRecordCount() status: %s",
- compact_->compaction->column_family_data()->GetName().c_str(),
- job_context_->job_id, msg.c_str());
- if (db_options_.compaction_verify_record_count) {
- return Status::Corruption(msg);
- }
- }
- return Status::OK();
- }
- } // namespace ROCKSDB_NAMESPACE
|