| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066 |
- // Copyright (c) Meta Platforms, Inc. and affiliates.
- //
- // This source code is licensed under both the GPLv2 (found in the
- // COPYING file in the root directory) and Apache 2.0 License
- // (found in the LICENSE.Apache file in the root directory).
- //
- // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
- // Use of this source code is governed by a BSD-style license that can be
- // found in the LICENSE file. See the AUTHORS file for names of contributors.
- #include "db/compaction/compaction_job.h"
- #include "db/compaction/compaction_state.h"
- #include "logging/logging.h"
- #include "monitoring/iostats_context_imp.h"
- #include "monitoring/thread_status_util.h"
- #include "options/options_helper.h"
- #include "rocksdb/utilities/options_type.h"
- namespace ROCKSDB_NAMESPACE {
- class SubcompactionState;
- CompactionServiceJobStatus
- CompactionJob::ProcessKeyValueCompactionWithCompactionService(
- SubcompactionState* sub_compact) {
- assert(sub_compact);
- assert(sub_compact->compaction);
- assert(db_options_.compaction_service);
- const Compaction* compaction = sub_compact->compaction;
- CompactionServiceInput compaction_input;
- compaction_input.output_level = compaction->output_level();
- compaction_input.db_id = db_id_;
- const std::vector<CompactionInputFiles>& inputs =
- *(compact_->compaction->inputs());
- for (const auto& files_per_level : inputs) {
- for (const auto& file : files_per_level.files) {
- compaction_input.input_files.emplace_back(
- MakeTableFileName(file->fd.GetNumber()));
- }
- }
- compaction_input.cf_name = compaction->column_family_data()->GetName();
- compaction_input.snapshots = job_context_->snapshot_seqs;
- compaction_input.has_begin = sub_compact->start.has_value();
- compaction_input.begin =
- compaction_input.has_begin ? sub_compact->start->ToString() : "";
- compaction_input.has_end = sub_compact->end.has_value();
- compaction_input.end =
- compaction_input.has_end ? sub_compact->end->ToString() : "";
- compaction_input.options_file_number = options_file_number_;
- TEST_SYNC_POINT_CALLBACK(
- "CompactionServiceJob::ProcessKeyValueCompactionWithCompactionService",
- &compaction_input);
- std::string compaction_input_binary;
- Status s = compaction_input.Write(&compaction_input_binary);
- if (!s.ok()) {
- sub_compact->status = s;
- return CompactionServiceJobStatus::kFailure;
- }
- std::ostringstream input_files_oss;
- bool is_first_one = true;
- for (const auto& file : compaction_input.input_files) {
- input_files_oss << (is_first_one ? "" : ", ") << file;
- is_first_one = false;
- }
- ROCKS_LOG_INFO(
- db_options_.info_log,
- "[%s] [JOB %d] Starting remote compaction (output level: %d): %s",
- compaction->column_family_data()->GetName().c_str(), job_id_,
- compaction_input.output_level, input_files_oss.str().c_str());
- CompactionServiceJobInfo info(
- dbname_, db_id_, db_session_id_,
- compaction->column_family_data()->GetID(),
- compaction->column_family_data()->GetName(), GetCompactionId(sub_compact),
- thread_pri_, compaction->compaction_reason(),
- compaction->is_full_compaction(), compaction->is_manual_compaction(),
- compaction->bottommost_level(), compaction->start_level(),
- compaction->output_level());
- CompactionServiceScheduleResponse response =
- db_options_.compaction_service->Schedule(info, compaction_input_binary);
- switch (response.status) {
- case CompactionServiceJobStatus::kSuccess:
- break;
- case CompactionServiceJobStatus::kAborted:
- sub_compact->status =
- Status::Aborted("Scheduling a remote compaction job was aborted");
- ROCKS_LOG_WARN(
- db_options_.info_log,
- "[%s] [JOB %d] Remote compaction was aborted at Schedule()",
- compaction->column_family_data()->GetName().c_str(), job_id_);
- return response.status;
- case CompactionServiceJobStatus::kFailure:
- sub_compact->status = Status::Incomplete(
- "CompactionService failed to schedule a remote compaction job.");
- ROCKS_LOG_WARN(db_options_.info_log,
- "[%s] [JOB %d] Remote compaction failed to start.",
- compaction->column_family_data()->GetName().c_str(),
- job_id_);
- return response.status;
- case CompactionServiceJobStatus::kUseLocal:
- ROCKS_LOG_INFO(
- db_options_.info_log,
- "[%s] [JOB %d] Remote compaction fallback to local by API (Schedule)",
- compaction->column_family_data()->GetName().c_str(), job_id_);
- return response.status;
- default:
- assert(false); // unknown status
- break;
- }
- std::string debug_str_before_wait =
- compaction->input_version()->DebugString(/*hex=*/true);
- ROCKS_LOG_INFO(db_options_.info_log,
- "[%s] [JOB %d] Waiting for remote compaction...",
- compaction->column_family_data()->GetName().c_str(), job_id_);
- std::string compaction_result_binary;
- CompactionServiceJobStatus compaction_status =
- db_options_.compaction_service->Wait(response.scheduled_job_id,
- &compaction_result_binary);
- if (compaction_status != CompactionServiceJobStatus::kSuccess) {
- ROCKS_LOG_ERROR(
- db_options_.info_log,
- "[%s] [JOB %d] Wait() status is not kSuccess. "
- "\nDebugString Before Wait():\n%s"
- "\nDebugString After Wait():\n%s",
- compaction->column_family_data()->GetName().c_str(), job_id_,
- debug_str_before_wait.c_str(),
- compaction->input_version()->DebugString(/*hex=*/true).c_str());
- }
- if (compaction_status == CompactionServiceJobStatus::kUseLocal) {
- ROCKS_LOG_INFO(
- db_options_.info_log,
- "[%s] [JOB %d] Remote compaction fallback to local by API (Wait)",
- compaction->column_family_data()->GetName().c_str(), job_id_);
- return compaction_status;
- }
- if (compaction_status == CompactionServiceJobStatus::kAborted) {
- sub_compact->status =
- Status::Aborted("Waiting a remote compaction job was aborted");
- ROCKS_LOG_INFO(db_options_.info_log,
- "[%s] [JOB %d] Remote compaction was aborted during Wait()",
- compaction->column_family_data()->GetName().c_str(),
- job_id_);
- return compaction_status;
- }
- CompactionServiceResult compaction_result;
- s = CompactionServiceResult::Read(compaction_result_binary,
- &compaction_result);
- if (compaction_status == CompactionServiceJobStatus::kFailure) {
- if (s.ok()) {
- if (compaction_result.status.ok()) {
- sub_compact->status = Status::Incomplete(
- "CompactionService failed to run the compaction job (even though "
- "the internal status is okay).");
- } else {
- // set the current sub compaction status with the status returned from
- // remote
- sub_compact->status = compaction_result.status;
- }
- } else {
- sub_compact->status = Status::Incomplete(
- "CompactionService failed to run the compaction job (and no valid "
- "result is returned).");
- compaction_result.status.PermitUncheckedError();
- }
- ROCKS_LOG_WARN(
- db_options_.info_log, "[%s] [JOB %d] Remote compaction failed.",
- compaction->column_family_data()->GetName().c_str(), job_id_);
- return compaction_status;
- }
- // CompactionServiceJobStatus::kSuccess was returned, but somehow we failed to
- // read the result. Consider this as an installation failure
- if (!s.ok()) {
- sub_compact->status = s;
- compaction_result.status.PermitUncheckedError();
- db_options_.compaction_service->OnInstallation(
- response.scheduled_job_id, CompactionServiceJobStatus::kFailure);
- return CompactionServiceJobStatus::kFailure;
- }
- sub_compact->status = compaction_result.status;
- std::ostringstream output_files_oss;
- is_first_one = true;
- for (const auto& file : compaction_result.output_files) {
- output_files_oss << (is_first_one ? "" : ", ") << file.file_name;
- is_first_one = false;
- }
- ROCKS_LOG_INFO(
- db_options_.info_log,
- "[%s] [JOB %d] Received remote compaction result, output path: "
- "%s, files: %s",
- compaction->column_family_data()->GetName().c_str(), job_id_,
- compaction_result.output_path.c_str(), output_files_oss.str().c_str());
- // Installation Starts
- for (const auto& file : compaction_result.output_files) {
- uint64_t file_num = versions_->NewFileNumber();
- auto src_file = compaction_result.output_path + "/" + file.file_name;
- auto tgt_file = TableFileName(compaction->immutable_options().cf_paths,
- file_num, compaction->output_path_id());
- s = fs_->RenameFile(src_file, tgt_file, IOOptions(), nullptr);
- if (!s.ok()) {
- sub_compact->status = s;
- db_options_.compaction_service->OnInstallation(
- response.scheduled_job_id, CompactionServiceJobStatus::kFailure);
- return CompactionServiceJobStatus::kFailure;
- }
- FileMetaData meta;
- uint64_t file_size = file.file_size;
- // TODO - Clean this up in the next release.
- // For backward compatibility - in case the remote worker does not populate
- // the file_size yet. If missing, continue to populate this from the file
- // system.
- if (file_size == 0) {
- s = fs_->GetFileSize(tgt_file, IOOptions(), &file_size, nullptr);
- }
- if (!s.ok()) {
- sub_compact->status = s;
- db_options_.compaction_service->OnInstallation(
- response.scheduled_job_id, CompactionServiceJobStatus::kFailure);
- return CompactionServiceJobStatus::kFailure;
- }
- assert(file_size > 0);
- meta.fd = FileDescriptor(file_num, compaction->output_path_id(), file_size,
- file.smallest_seqno, file.largest_seqno);
- meta.smallest.DecodeFrom(file.smallest_internal_key);
- meta.largest.DecodeFrom(file.largest_internal_key);
- meta.oldest_ancester_time = file.oldest_ancester_time;
- meta.file_creation_time = file.file_creation_time;
- meta.epoch_number = file.epoch_number;
- meta.file_checksum = file.file_checksum;
- meta.file_checksum_func_name = file.file_checksum_func_name;
- meta.marked_for_compaction = file.marked_for_compaction;
- meta.unique_id = file.unique_id;
- meta.temperature = file.file_temperature;
- meta.tail_size =
- FileMetaData::CalculateTailSize(file_size, file.table_properties);
- auto cfd = compaction->column_family_data();
- CompactionOutputs* compaction_outputs =
- sub_compact->Outputs(file.is_proximal_level_output);
- assert(compaction_outputs);
- compaction_outputs->AddOutput(std::move(meta), cfd->internal_comparator(),
- false, true, file.paranoid_hash);
- compaction_outputs->UpdateTableProperties(file.table_properties);
- }
- // Set per-level stats
- auto compaction_output_stats =
- sub_compact->OutputStats(false /* is_proximal_level */);
- assert(compaction_output_stats);
- compaction_output_stats->Add(
- compaction_result.internal_stats.output_level_stats);
- if (compaction->SupportsPerKeyPlacement()) {
- compaction_output_stats =
- sub_compact->OutputStats(true /* is_proximal_level */);
- assert(compaction_output_stats);
- compaction_output_stats->Add(
- compaction_result.internal_stats.proximal_level_stats);
- }
- // Set job stats
- sub_compact->compaction_job_stats = compaction_result.stats;
- RecordTick(stats_, REMOTE_COMPACT_READ_BYTES, compaction_result.bytes_read);
- RecordTick(stats_, REMOTE_COMPACT_WRITE_BYTES,
- compaction_result.bytes_written);
- db_options_.compaction_service->OnInstallation(
- response.scheduled_job_id, CompactionServiceJobStatus::kSuccess);
- return CompactionServiceJobStatus::kSuccess;
- }
- std::string CompactionServiceCompactionJob::GetTableFileName(
- uint64_t file_number) {
- return MakeTableFileName(output_path_, file_number);
- }
- void CompactionServiceCompactionJob::RecordCompactionIOStats() {
- compaction_result_->bytes_read += IOSTATS(bytes_read);
- compaction_result_->bytes_written += IOSTATS(bytes_written);
- CompactionJob::RecordCompactionIOStats();
- }
- CompactionServiceCompactionJob::CompactionServiceCompactionJob(
- int job_id, Compaction* compaction, const ImmutableDBOptions& db_options,
- const MutableDBOptions& mutable_db_options, const FileOptions& file_options,
- VersionSet* versions, const std::atomic<bool>* shutting_down,
- LogBuffer* log_buffer, FSDirectory* output_directory, Statistics* stats,
- InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler,
- JobContext* job_context, std::shared_ptr<Cache> table_cache,
- EventLogger* event_logger, const std::string& dbname,
- const std::shared_ptr<IOTracer>& io_tracer,
- const std::atomic<bool>& manual_compaction_canceled,
- const std::string& db_id, const std::string& db_session_id,
- std::string output_path,
- const CompactionServiceInput& compaction_service_input,
- CompactionServiceResult* compaction_service_result)
- : CompactionJob(job_id, compaction, db_options, mutable_db_options,
- file_options, versions, shutting_down, log_buffer, nullptr,
- output_directory, nullptr, stats, db_mutex,
- db_error_handler, job_context, std::move(table_cache),
- event_logger,
- compaction->mutable_cf_options().paranoid_file_checks,
- compaction->mutable_cf_options().report_bg_io_stats, dbname,
- &(compaction_service_result->stats), Env::Priority::USER,
- io_tracer, manual_compaction_canceled, db_id, db_session_id,
- compaction->column_family_data()->GetFullHistoryTsLow()),
- output_path_(std::move(output_path)),
- compaction_input_(compaction_service_input),
- compaction_result_(compaction_service_result) {}
- void CompactionServiceCompactionJob::Prepare(
- const CompactionProgress& compaction_progress,
- log::Writer* compaction_progress_writer) {
- std::optional<Slice> begin;
- if (compaction_input_.has_begin) {
- begin = compaction_input_.begin;
- }
- std::optional<Slice> end;
- if (compaction_input_.has_end) {
- end = compaction_input_.end;
- }
- CompactionJob::Prepare(std::make_pair(begin, end), compaction_progress,
- compaction_progress_writer);
- }
- Status CompactionServiceCompactionJob::Run() {
- AutoThreadOperationStageUpdater stage_updater(
- ThreadStatus::STAGE_COMPACTION_RUN);
- auto* c = compact_->compaction;
- log_buffer_->FlushBufferToLog();
- LogCompaction();
- compaction_result_->stats.Reset();
- const uint64_t start_micros = db_options_.clock->NowMicros();
- c->GetOrInitInputTableProperties();
- // Pick the only sub-compaction we should have
- assert(compact_->sub_compact_states.size() == 1);
- SubcompactionState* sub_compact = compact_->sub_compact_states.data();
- ProcessKeyValueCompaction(sub_compact);
- uint64_t elapsed_micros = db_options_.clock->NowMicros() - start_micros;
- internal_stats_.SetMicros(elapsed_micros);
- internal_stats_.AddCpuMicros(elapsed_micros);
- RecordTimeToHistogram(stats_, COMPACTION_TIME,
- internal_stats_.output_level_stats.micros);
- RecordTimeToHistogram(stats_, COMPACTION_CPU_TIME,
- internal_stats_.output_level_stats.cpu_micros);
- Status status = sub_compact->status;
- IOStatus io_s = sub_compact->io_status;
- if (io_status_.ok()) {
- io_status_ = io_s;
- }
- if (status.ok()) {
- constexpr IODebugContext* dbg = nullptr;
- if (output_directory_) {
- io_s = output_directory_->FsyncWithDirOptions(IOOptions(), dbg,
- DirFsyncOptions());
- }
- }
- if (io_status_.ok()) {
- io_status_ = io_s;
- }
- if (status.ok()) {
- status = io_s;
- }
- LogFlush(db_options_.info_log);
- compact_->status = status;
- compact_->status.PermitUncheckedError();
- // Build Compaction Job Stats
- // 1. Aggregate internal stats and job stats for all subcompactions
- // internal stats: sub_compact.proximal_level_outputs_.stats and
- // sub_compact.compaction_outputs_.stats into
- // internal_stats_.output_level_stats and
- // internal_stats_.proximal_level_stats
- // job-level stats: sub_compact.compaction_job_stats into compact.job_stats_
- //
- // For remote compaction, there's only one subcompaction.
- compact_->AggregateCompactionStats(internal_stats_, *job_stats_);
- // 2. Update job-level output stats with the aggregated internal_stats_
- // Please note that input stats will be updated by primary host when all
- // subcompactions are finished
- UpdateCompactionJobOutputStatsFromInternalStats(status, internal_stats_);
- // and set fields that are not propagated as part of the update
- compaction_result_->stats.is_manual_compaction = c->is_manual_compaction();
- compaction_result_->stats.is_full_compaction = c->is_full_compaction();
- compaction_result_->stats.is_remote_compaction = true;
- // 3. Update IO Stats that are not part of the the update above
- // (bytes_read, bytes_written)
- RecordCompactionIOStats();
- // Build Output
- compaction_result_->internal_stats = internal_stats_;
- compaction_result_->output_level = compact_->compaction->output_level();
- compaction_result_->output_path = output_path_;
- if (status.ok()) {
- for (const auto& output_file : sub_compact->GetOutputs()) {
- auto& meta = output_file.meta;
- compaction_result_->output_files.emplace_back(
- MakeTableFileName(meta.fd.GetNumber()), meta.fd.GetFileSize(),
- meta.fd.smallest_seqno, meta.fd.largest_seqno,
- meta.smallest.Encode().ToString(), meta.largest.Encode().ToString(),
- meta.oldest_ancester_time, meta.file_creation_time, meta.epoch_number,
- meta.file_checksum, meta.file_checksum_func_name,
- output_file.validator.GetHash(), meta.marked_for_compaction,
- meta.unique_id, *output_file.table_properties,
- output_file.is_proximal_level, meta.temperature);
- }
- }
- TEST_SYNC_POINT_CALLBACK("CompactionServiceCompactionJob::Run:0",
- &compaction_result_);
- return status;
- }
- void CompactionServiceCompactionJob::CleanupCompaction() {
- CompactionJob::CleanupCompaction();
- }
- // Internal binary format for the input and result data
- enum BinaryFormatVersion : uint32_t {
- kOptionsString = 1, // Use string format similar to Option string format
- };
- static std::unordered_map<std::string, OptionTypeInfo> cfd_type_info = {
- {"name",
- {offsetof(struct ColumnFamilyDescriptor, name), OptionType::kEncodedString,
- OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
- {"options",
- {offsetof(struct ColumnFamilyDescriptor, options),
- OptionType::kConfigurable, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone,
- [](const ConfigOptions& opts, const std::string& /*name*/,
- const std::string& value, void* addr) {
- auto cf_options = static_cast<ColumnFamilyOptions*>(addr);
- return GetColumnFamilyOptionsFromString(opts, ColumnFamilyOptions(),
- value, cf_options);
- },
- [](const ConfigOptions& opts, const std::string& /*name*/,
- const void* addr, std::string* value) {
- const auto cf_options = static_cast<const ColumnFamilyOptions*>(addr);
- std::string result;
- auto status =
- GetStringFromColumnFamilyOptions(opts, *cf_options, &result);
- *value = "{" + result + "}";
- return status;
- },
- [](const ConfigOptions& opts, const std::string& name, const void* addr1,
- const void* addr2, std::string* mismatch) {
- const auto this_one = static_cast<const ColumnFamilyOptions*>(addr1);
- const auto that_one = static_cast<const ColumnFamilyOptions*>(addr2);
- auto this_conf = CFOptionsAsConfigurable(*this_one);
- auto that_conf = CFOptionsAsConfigurable(*that_one);
- std::string mismatch_opt;
- bool result =
- this_conf->AreEquivalent(opts, that_conf.get(), &mismatch_opt);
- if (!result) {
- *mismatch = name + "." + mismatch_opt;
- }
- return result;
- }}},
- };
- static std::unordered_map<std::string, OptionTypeInfo> cs_input_type_info = {
- {"cf_name",
- {offsetof(struct CompactionServiceInput, cf_name),
- OptionType::kEncodedString}},
- {"snapshots", OptionTypeInfo::Vector<uint64_t>(
- offsetof(struct CompactionServiceInput, snapshots),
- OptionVerificationType::kNormal, OptionTypeFlags::kNone,
- {0, OptionType::kUInt64T})},
- {"input_files", OptionTypeInfo::Vector<std::string>(
- offsetof(struct CompactionServiceInput, input_files),
- OptionVerificationType::kNormal, OptionTypeFlags::kNone,
- {0, OptionType::kEncodedString})},
- {"output_level",
- {offsetof(struct CompactionServiceInput, output_level), OptionType::kInt,
- OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
- {"db_id",
- {offsetof(struct CompactionServiceInput, db_id),
- OptionType::kEncodedString}},
- {"has_begin",
- {offsetof(struct CompactionServiceInput, has_begin), OptionType::kBoolean,
- OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
- {"begin",
- {offsetof(struct CompactionServiceInput, begin),
- OptionType::kEncodedString, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- {"has_end",
- {offsetof(struct CompactionServiceInput, has_end), OptionType::kBoolean,
- OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
- {"end",
- {offsetof(struct CompactionServiceInput, end), OptionType::kEncodedString,
- OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
- {"options_file_number",
- {offsetof(struct CompactionServiceInput, options_file_number),
- OptionType::kUInt64T, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- };
- static std::unordered_map<std::string, OptionTypeInfo>
- cs_output_file_type_info = {
- {"file_name",
- {offsetof(struct CompactionServiceOutputFile, file_name),
- OptionType::kEncodedString, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- {"file_size",
- {offsetof(struct CompactionServiceOutputFile, file_size),
- OptionType::kUInt64T, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- {"smallest_seqno",
- {offsetof(struct CompactionServiceOutputFile, smallest_seqno),
- OptionType::kUInt64T, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- {"largest_seqno",
- {offsetof(struct CompactionServiceOutputFile, largest_seqno),
- OptionType::kUInt64T, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- {"smallest_internal_key",
- {offsetof(struct CompactionServiceOutputFile, smallest_internal_key),
- OptionType::kEncodedString, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- {"largest_internal_key",
- {offsetof(struct CompactionServiceOutputFile, largest_internal_key),
- OptionType::kEncodedString, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- {"oldest_ancester_time",
- {offsetof(struct CompactionServiceOutputFile, oldest_ancester_time),
- OptionType::kUInt64T, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- {"file_creation_time",
- {offsetof(struct CompactionServiceOutputFile, file_creation_time),
- OptionType::kUInt64T, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- {"epoch_number",
- {offsetof(struct CompactionServiceOutputFile, epoch_number),
- OptionType::kUInt64T, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- {"file_checksum",
- {offsetof(struct CompactionServiceOutputFile, file_checksum),
- OptionType::kEncodedString, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- {"file_checksum_func_name",
- {offsetof(struct CompactionServiceOutputFile, file_checksum_func_name),
- OptionType::kEncodedString, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- {"paranoid_hash",
- {offsetof(struct CompactionServiceOutputFile, paranoid_hash),
- OptionType::kUInt64T, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- {"marked_for_compaction",
- {offsetof(struct CompactionServiceOutputFile, marked_for_compaction),
- OptionType::kBoolean, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- {"unique_id",
- OptionTypeInfo::Array<uint64_t, 2>(
- offsetof(struct CompactionServiceOutputFile, unique_id),
- OptionVerificationType::kNormal, OptionTypeFlags::kNone,
- {0, OptionType::kUInt64T})},
- {"table_properties",
- {offsetof(struct CompactionServiceOutputFile, table_properties),
- OptionType::kStruct, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone,
- [](const ConfigOptions& opts, const std::string& /*name*/,
- const std::string& value, void* addr) {
- auto table_properties = static_cast<TableProperties*>(addr);
- return TableProperties::Parse(opts, value, table_properties);
- },
- [](const ConfigOptions& opts, const std::string& /*name*/,
- const void* addr, std::string* value) {
- const auto table_properties =
- static_cast<const TableProperties*>(addr);
- std::string result;
- auto status = table_properties->Serialize(opts, &result);
- *value = "{" + result + "}";
- return status;
- },
- [](const ConfigOptions& opts, const std::string& /*name*/,
- const void* addr1, const void* addr2, std::string* mismatch) {
- const auto this_one = static_cast<const TableProperties*>(addr1);
- const auto that_one = static_cast<const TableProperties*>(addr2);
- return this_one->AreEqual(opts, that_one, mismatch);
- }}},
- {"is_proximal_level_output",
- {offsetof(struct CompactionServiceOutputFile,
- is_proximal_level_output),
- OptionType::kBoolean, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- {"file_temperature",
- {offsetof(struct CompactionServiceOutputFile, file_temperature),
- OptionType::kTemperature, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}}};
- static std::unordered_map<std::string, OptionTypeInfo>
- compaction_job_stats_type_info = {
- {"elapsed_micros",
- {offsetof(struct CompactionJobStats, elapsed_micros),
- OptionType::kUInt64T, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- {"cpu_micros",
- {offsetof(struct CompactionJobStats, cpu_micros), OptionType::kUInt64T,
- OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
- {"num_input_records",
- {offsetof(struct CompactionJobStats, num_input_records),
- OptionType::kUInt64T, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- {"num_blobs_read",
- {offsetof(struct CompactionJobStats, num_blobs_read),
- OptionType::kUInt64T, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- {"num_input_files",
- {offsetof(struct CompactionJobStats, num_input_files),
- OptionType::kSizeT, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- {"num_input_files_at_output_level",
- {offsetof(struct CompactionJobStats, num_input_files_at_output_level),
- OptionType::kSizeT, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- {"num_output_records",
- {offsetof(struct CompactionJobStats, num_output_records),
- OptionType::kUInt64T, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- {"num_output_files",
- {offsetof(struct CompactionJobStats, num_output_files),
- OptionType::kSizeT, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- {"num_output_files_blob",
- {offsetof(struct CompactionJobStats, num_output_files_blob),
- OptionType::kSizeT, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- {"is_full_compaction",
- {offsetof(struct CompactionJobStats, is_full_compaction),
- OptionType::kBoolean, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- {"is_manual_compaction",
- {offsetof(struct CompactionJobStats, is_manual_compaction),
- OptionType::kBoolean, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- {"is_remote_compaction",
- {offsetof(struct CompactionJobStats, is_remote_compaction),
- OptionType::kBoolean, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- {"total_input_bytes",
- {offsetof(struct CompactionJobStats, total_input_bytes),
- OptionType::kUInt64T, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- {"total_blob_bytes_read",
- {offsetof(struct CompactionJobStats, total_blob_bytes_read),
- OptionType::kUInt64T, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- {"total_output_bytes",
- {offsetof(struct CompactionJobStats, total_output_bytes),
- OptionType::kUInt64T, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- {"total_output_bytes_blob",
- {offsetof(struct CompactionJobStats, total_output_bytes_blob),
- OptionType::kUInt64T, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- {"num_records_replaced",
- {offsetof(struct CompactionJobStats, num_records_replaced),
- OptionType::kUInt64T, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- {"total_input_raw_key_bytes",
- {offsetof(struct CompactionJobStats, total_input_raw_key_bytes),
- OptionType::kUInt64T, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- {"total_input_raw_value_bytes",
- {offsetof(struct CompactionJobStats, total_input_raw_value_bytes),
- OptionType::kUInt64T, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- {"num_input_deletion_records",
- {offsetof(struct CompactionJobStats, num_input_deletion_records),
- OptionType::kUInt64T, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- {"num_expired_deletion_records",
- {offsetof(struct CompactionJobStats, num_expired_deletion_records),
- OptionType::kUInt64T, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- {"num_corrupt_keys",
- {offsetof(struct CompactionJobStats, num_corrupt_keys),
- OptionType::kUInt64T, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- {"file_write_nanos",
- {offsetof(struct CompactionJobStats, file_write_nanos),
- OptionType::kUInt64T, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- {"file_range_sync_nanos",
- {offsetof(struct CompactionJobStats, file_range_sync_nanos),
- OptionType::kUInt64T, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- {"file_fsync_nanos",
- {offsetof(struct CompactionJobStats, file_fsync_nanos),
- OptionType::kUInt64T, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- {"file_prepare_write_nanos",
- {offsetof(struct CompactionJobStats, file_prepare_write_nanos),
- OptionType::kUInt64T, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- {"smallest_output_key_prefix",
- {offsetof(struct CompactionJobStats, smallest_output_key_prefix),
- OptionType::kEncodedString, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- {"largest_output_key_prefix",
- {offsetof(struct CompactionJobStats, largest_output_key_prefix),
- OptionType::kEncodedString, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- {"num_single_del_fallthru",
- {offsetof(struct CompactionJobStats, num_single_del_fallthru),
- OptionType::kUInt64T, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- {"num_single_del_mismatch",
- {offsetof(struct CompactionJobStats, num_single_del_mismatch),
- OptionType::kUInt64T, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- };
- static std::unordered_map<std::string, OptionTypeInfo>
- compaction_stats_type_info = {
- {"micros",
- {offsetof(struct InternalStats::CompactionStats, micros),
- OptionType::kUInt64T, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- {"cpu_micros",
- {offsetof(struct InternalStats::CompactionStats, cpu_micros),
- OptionType::kUInt64T, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- {"bytes_read_non_output_levels",
- {offsetof(struct InternalStats::CompactionStats,
- bytes_read_non_output_levels),
- OptionType::kUInt64T, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- {"bytes_read_output_level",
- {offsetof(struct InternalStats::CompactionStats,
- bytes_read_output_level),
- OptionType::kUInt64T, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- {"bytes_skipped_non_output_levels",
- {offsetof(struct InternalStats::CompactionStats,
- bytes_skipped_non_output_levels),
- OptionType::kUInt64T, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- {"bytes_skipped_output_level",
- {offsetof(struct InternalStats::CompactionStats,
- bytes_skipped_output_level),
- OptionType::kUInt64T, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- {"bytes_read_blob",
- {offsetof(struct InternalStats::CompactionStats, bytes_read_blob),
- OptionType::kUInt64T, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- {"bytes_written",
- {offsetof(struct InternalStats::CompactionStats, bytes_written),
- OptionType::kUInt64T, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- {"bytes_written_blob",
- {offsetof(struct InternalStats::CompactionStats, bytes_written_blob),
- OptionType::kUInt64T, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- {"bytes_moved",
- {offsetof(struct InternalStats::CompactionStats, bytes_moved),
- OptionType::kUInt64T, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- {"num_input_files_in_non_output_levels",
- {offsetof(struct InternalStats::CompactionStats,
- num_input_files_in_non_output_levels),
- OptionType::kInt, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- {"num_input_files_in_output_level",
- {offsetof(struct InternalStats::CompactionStats,
- num_input_files_in_output_level),
- OptionType::kInt, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- {"num_filtered_input_files_in_non_output_levels",
- {offsetof(struct InternalStats::CompactionStats,
- num_filtered_input_files_in_non_output_levels),
- OptionType::kInt, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- {"num_filtered_input_files_in_output_level",
- {offsetof(struct InternalStats::CompactionStats,
- num_filtered_input_files_in_output_level),
- OptionType::kInt, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- {"num_output_files",
- {offsetof(struct InternalStats::CompactionStats, num_output_files),
- OptionType::kInt, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- {"num_output_files_blob",
- {offsetof(struct InternalStats::CompactionStats,
- num_output_files_blob),
- OptionType::kInt, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- {"num_input_records",
- {offsetof(struct InternalStats::CompactionStats, num_input_records),
- OptionType::kUInt64T, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- {"num_dropped_records",
- {offsetof(struct InternalStats::CompactionStats, num_dropped_records),
- OptionType::kUInt64T, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- {"num_output_records",
- {offsetof(struct InternalStats::CompactionStats, num_output_records),
- OptionType::kUInt64T, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- {"count",
- {offsetof(struct InternalStats::CompactionStats, count),
- OptionType::kUInt64T, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- {"counts", OptionTypeInfo::Array<
- int, static_cast<int>(CompactionReason::kNumOfReasons)>(
- offsetof(struct InternalStats::CompactionStats, counts),
- OptionVerificationType::kNormal, OptionTypeFlags::kNone,
- {0, OptionType::kInt})},
- };
- static std::unordered_map<std::string, OptionTypeInfo>
- compaction_internal_stats_type_info = {
- {"output_level_stats",
- OptionTypeInfo::Struct(
- "output_level_stats", &compaction_stats_type_info,
- offsetof(struct InternalStats::CompactionStatsFull,
- output_level_stats),
- OptionVerificationType::kNormal, OptionTypeFlags::kNone)},
- {"has_proximal_level_output",
- {offsetof(struct InternalStats::CompactionStatsFull,
- has_proximal_level_output),
- OptionType::kBoolean, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- {"proximal_level_stats",
- OptionTypeInfo::Struct(
- "proximal_level_stats", &compaction_stats_type_info,
- offsetof(struct InternalStats::CompactionStatsFull,
- proximal_level_stats),
- OptionVerificationType::kNormal, OptionTypeFlags::kNone)},
- };
- namespace {
- // this is a helper struct to serialize and deserialize class Status, because
- // Status's members are not public.
- struct StatusSerializationAdapter {
- uint8_t code;
- uint8_t subcode;
- uint8_t severity;
- std::string message;
- StatusSerializationAdapter() = default;
- explicit StatusSerializationAdapter(const Status& s) {
- code = s.code();
- subcode = s.subcode();
- severity = s.severity();
- auto msg = s.getState();
- message = msg ? msg : "";
- }
- Status GetStatus() const {
- return Status{static_cast<Status::Code>(code),
- static_cast<Status::SubCode>(subcode),
- static_cast<Status::Severity>(severity), message};
- }
- };
- } // namespace
- static std::unordered_map<std::string, OptionTypeInfo>
- status_adapter_type_info = {
- {"code",
- {offsetof(struct StatusSerializationAdapter, code),
- OptionType::kUInt8T, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- {"subcode",
- {offsetof(struct StatusSerializationAdapter, subcode),
- OptionType::kUInt8T, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- {"severity",
- {offsetof(struct StatusSerializationAdapter, severity),
- OptionType::kUInt8T, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- {"message",
- {offsetof(struct StatusSerializationAdapter, message),
- OptionType::kEncodedString, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- };
- static std::unordered_map<std::string, OptionTypeInfo> cs_result_type_info = {
- {"status",
- {offsetof(struct CompactionServiceResult, status),
- OptionType::kCustomizable, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone,
- [](const ConfigOptions& opts, const std::string& /*name*/,
- const std::string& value, void* addr) {
- auto status_obj = static_cast<Status*>(addr);
- StatusSerializationAdapter adapter;
- Status s = OptionTypeInfo::ParseType(
- opts, value, status_adapter_type_info, &adapter);
- *status_obj = adapter.GetStatus();
- return s;
- },
- [](const ConfigOptions& opts, const std::string& /*name*/,
- const void* addr, std::string* value) {
- const auto status_obj = static_cast<const Status*>(addr);
- StatusSerializationAdapter adapter(*status_obj);
- std::string result;
- Status s = OptionTypeInfo::SerializeType(opts, status_adapter_type_info,
- &adapter, &result);
- *value = "{" + result + "}";
- return s;
- },
- [](const ConfigOptions& opts, const std::string& /*name*/,
- const void* addr1, const void* addr2, std::string* mismatch) {
- const auto status1 = static_cast<const Status*>(addr1);
- const auto status2 = static_cast<const Status*>(addr2);
- StatusSerializationAdapter adatper1(*status1);
- StatusSerializationAdapter adapter2(*status2);
- return OptionTypeInfo::TypesAreEqual(opts, status_adapter_type_info,
- &adatper1, &adapter2, mismatch);
- }}},
- {"output_files",
- OptionTypeInfo::Vector<CompactionServiceOutputFile>(
- offsetof(struct CompactionServiceResult, output_files),
- OptionVerificationType::kNormal, OptionTypeFlags::kNone,
- OptionTypeInfo::Struct("output_files", &cs_output_file_type_info, 0,
- OptionVerificationType::kNormal,
- OptionTypeFlags::kNone))},
- {"output_level",
- {offsetof(struct CompactionServiceResult, output_level), OptionType::kInt,
- OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
- {"output_path",
- {offsetof(struct CompactionServiceResult, output_path),
- OptionType::kEncodedString, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- {"bytes_read",
- {offsetof(struct CompactionServiceResult, bytes_read),
- OptionType::kUInt64T, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- {"bytes_written",
- {offsetof(struct CompactionServiceResult, bytes_written),
- OptionType::kUInt64T, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone}},
- {"stats", OptionTypeInfo::Struct(
- "stats", &compaction_job_stats_type_info,
- offsetof(struct CompactionServiceResult, stats),
- OptionVerificationType::kNormal, OptionTypeFlags::kNone)},
- {"internal_stats",
- OptionTypeInfo::Struct(
- "internal_stats", &compaction_internal_stats_type_info,
- offsetof(struct CompactionServiceResult, internal_stats),
- OptionVerificationType::kNormal, OptionTypeFlags::kNone)},
- };
- Status CompactionServiceInput::Read(const std::string& data_str,
- CompactionServiceInput* obj) {
- if (data_str.size() <= sizeof(BinaryFormatVersion)) {
- return Status::InvalidArgument("Invalid CompactionServiceInput string");
- }
- auto format_version = DecodeFixed32(data_str.data());
- if (format_version == kOptionsString) {
- ConfigOptions cf;
- cf.invoke_prepare_options = false;
- cf.ignore_unknown_options = true;
- return OptionTypeInfo::ParseType(
- cf, data_str.substr(sizeof(BinaryFormatVersion)), cs_input_type_info,
- obj);
- } else {
- return Status::NotSupported(
- "Compaction Service Input data version not supported: " +
- std::to_string(format_version));
- }
- }
- Status CompactionServiceInput::Write(std::string* output) {
- char buf[sizeof(BinaryFormatVersion)];
- EncodeFixed32(buf, kOptionsString);
- output->append(buf, sizeof(BinaryFormatVersion));
- ConfigOptions cf;
- cf.invoke_prepare_options = false;
- return OptionTypeInfo::SerializeType(cf, cs_input_type_info, this, output);
- }
- Status CompactionServiceResult::Read(const std::string& data_str,
- CompactionServiceResult* obj) {
- if (data_str.size() <= sizeof(BinaryFormatVersion)) {
- return Status::InvalidArgument("Invalid CompactionServiceResult string");
- }
- auto format_version = DecodeFixed32(data_str.data());
- if (format_version == kOptionsString) {
- ConfigOptions cf;
- cf.invoke_prepare_options = false;
- cf.ignore_unknown_options = true;
- return OptionTypeInfo::ParseType(
- cf, data_str.substr(sizeof(BinaryFormatVersion)), cs_result_type_info,
- obj);
- } else {
- return Status::NotSupported(
- "Compaction Service Result data version not supported: " +
- std::to_string(format_version));
- }
- }
- Status CompactionServiceResult::Write(std::string* output) {
- char buf[sizeof(BinaryFormatVersion)];
- EncodeFixed32(buf, kOptionsString);
- output->append(buf, sizeof(BinaryFormatVersion));
- ConfigOptions cf;
- cf.invoke_prepare_options = false;
- return OptionTypeInfo::SerializeType(cf, cs_result_type_info, this, output);
- }
- #ifndef NDEBUG
- bool CompactionServiceResult::TEST_Equals(CompactionServiceResult* other) {
- std::string mismatch;
- return TEST_Equals(other, &mismatch);
- }
- bool CompactionServiceResult::TEST_Equals(CompactionServiceResult* other,
- std::string* mismatch) {
- ConfigOptions cf;
- cf.invoke_prepare_options = false;
- return OptionTypeInfo::TypesAreEqual(cf, cs_result_type_info, this, other,
- mismatch);
- }
- bool CompactionServiceInput::TEST_Equals(CompactionServiceInput* other) {
- std::string mismatch;
- return TEST_Equals(other, &mismatch);
- }
- bool CompactionServiceInput::TEST_Equals(CompactionServiceInput* other,
- std::string* mismatch) {
- ConfigOptions cf;
- cf.invoke_prepare_options = false;
- return OptionTypeInfo::TypesAreEqual(cf, cs_input_type_info, this, other,
- mismatch);
- }
- #endif // NDEBUG
- } // namespace ROCKSDB_NAMESPACE
|