| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066 | //  Copyright (c) Meta Platforms, Inc. and affiliates.////  This source code is licensed under both the GPLv2 (found in the//  COPYING file in the root directory) and Apache 2.0 License//  (found in the LICENSE.Apache file in the root directory).//// Copyright (c) 2011 The LevelDB Authors. All rights reserved.// Use of this source code is governed by a BSD-style license that can be// found in the LICENSE file. See the AUTHORS file for names of contributors.#include "db/compaction/compaction_job.h"#include "db/compaction/compaction_state.h"#include "logging/logging.h"#include "monitoring/iostats_context_imp.h"#include "monitoring/thread_status_util.h"#include "options/options_helper.h"#include "rocksdb/utilities/options_type.h"namespace ROCKSDB_NAMESPACE {class SubcompactionState;CompactionServiceJobStatusCompactionJob::ProcessKeyValueCompactionWithCompactionService(    SubcompactionState* sub_compact) {  assert(sub_compact);  assert(sub_compact->compaction);  assert(db_options_.compaction_service);  const Compaction* compaction = sub_compact->compaction;  CompactionServiceInput compaction_input;  compaction_input.output_level = compaction->output_level();  compaction_input.db_id = db_id_;  const std::vector<CompactionInputFiles>& inputs =      *(compact_->compaction->inputs());  for (const auto& files_per_level : inputs) {    for (const auto& file : files_per_level.files) {      compaction_input.input_files.emplace_back(          MakeTableFileName(file->fd.GetNumber()));    }  }  compaction_input.cf_name = compaction->column_family_data()->GetName();  compaction_input.snapshots = job_context_->snapshot_seqs;  compaction_input.has_begin = sub_compact->start.has_value();  compaction_input.begin =      compaction_input.has_begin ? sub_compact->start->ToString() : "";  compaction_input.has_end = sub_compact->end.has_value();  compaction_input.end =      compaction_input.has_end ? sub_compact->end->ToString() : "";  compaction_input.options_file_number = options_file_number_;  TEST_SYNC_POINT_CALLBACK(      "CompactionServiceJob::ProcessKeyValueCompactionWithCompactionService",      &compaction_input);  std::string compaction_input_binary;  Status s = compaction_input.Write(&compaction_input_binary);  if (!s.ok()) {    sub_compact->status = s;    return CompactionServiceJobStatus::kFailure;  }  std::ostringstream input_files_oss;  bool is_first_one = true;  for (const auto& file : compaction_input.input_files) {    input_files_oss << (is_first_one ? "" : ", ") << file;    is_first_one = false;  }  ROCKS_LOG_INFO(      db_options_.info_log,      "[%s] [JOB %d] Starting remote compaction (output level: %d): %s",      compaction->column_family_data()->GetName().c_str(), job_id_,      compaction_input.output_level, input_files_oss.str().c_str());  CompactionServiceJobInfo info(      dbname_, db_id_, db_session_id_,      compaction->column_family_data()->GetID(),      compaction->column_family_data()->GetName(), GetCompactionId(sub_compact),      thread_pri_, compaction->compaction_reason(),      compaction->is_full_compaction(), compaction->is_manual_compaction(),      compaction->bottommost_level(), compaction->start_level(),      compaction->output_level());  CompactionServiceScheduleResponse response =      db_options_.compaction_service->Schedule(info, compaction_input_binary);  switch (response.status) {    case CompactionServiceJobStatus::kSuccess:      break;    case CompactionServiceJobStatus::kAborted:      sub_compact->status =          Status::Aborted("Scheduling a remote compaction job was aborted");      ROCKS_LOG_WARN(          db_options_.info_log,          "[%s] [JOB %d] Remote compaction was aborted at Schedule()",          compaction->column_family_data()->GetName().c_str(), job_id_);      return response.status;    case CompactionServiceJobStatus::kFailure:      sub_compact->status = Status::Incomplete(          "CompactionService failed to schedule a remote compaction job.");      ROCKS_LOG_WARN(db_options_.info_log,                     "[%s] [JOB %d] Remote compaction failed to start.",                     compaction->column_family_data()->GetName().c_str(),                     job_id_);      return response.status;    case CompactionServiceJobStatus::kUseLocal:      ROCKS_LOG_INFO(          db_options_.info_log,          "[%s] [JOB %d] Remote compaction fallback to local by API (Schedule)",          compaction->column_family_data()->GetName().c_str(), job_id_);      return response.status;    default:      assert(false);  // unknown status      break;  }  std::string debug_str_before_wait =      compaction->input_version()->DebugString(/*hex=*/true);  ROCKS_LOG_INFO(db_options_.info_log,                 "[%s] [JOB %d] Waiting for remote compaction...",                 compaction->column_family_data()->GetName().c_str(), job_id_);  std::string compaction_result_binary;  CompactionServiceJobStatus compaction_status =      db_options_.compaction_service->Wait(response.scheduled_job_id,                                           &compaction_result_binary);  if (compaction_status != CompactionServiceJobStatus::kSuccess) {    ROCKS_LOG_ERROR(        db_options_.info_log,        "[%s] [JOB %d] Wait() status is not kSuccess. "        "\nDebugString Before Wait():\n%s"        "\nDebugString After Wait():\n%s",        compaction->column_family_data()->GetName().c_str(), job_id_,        debug_str_before_wait.c_str(),        compaction->input_version()->DebugString(/*hex=*/true).c_str());  }  if (compaction_status == CompactionServiceJobStatus::kUseLocal) {    ROCKS_LOG_INFO(        db_options_.info_log,        "[%s] [JOB %d] Remote compaction fallback to local by API (Wait)",        compaction->column_family_data()->GetName().c_str(), job_id_);    return compaction_status;  }  if (compaction_status == CompactionServiceJobStatus::kAborted) {    sub_compact->status =        Status::Aborted("Waiting a remote compaction job was aborted");    ROCKS_LOG_INFO(db_options_.info_log,                   "[%s] [JOB %d] Remote compaction was aborted during Wait()",                   compaction->column_family_data()->GetName().c_str(),                   job_id_);    return compaction_status;  }  CompactionServiceResult compaction_result;  s = CompactionServiceResult::Read(compaction_result_binary,                                    &compaction_result);  if (compaction_status == CompactionServiceJobStatus::kFailure) {    if (s.ok()) {      if (compaction_result.status.ok()) {        sub_compact->status = Status::Incomplete(            "CompactionService failed to run the compaction job (even though "            "the internal status is okay).");      } else {        // set the current sub compaction status with the status returned from        // remote        sub_compact->status = compaction_result.status;      }    } else {      sub_compact->status = Status::Incomplete(          "CompactionService failed to run the compaction job (and no valid "          "result is returned).");      compaction_result.status.PermitUncheckedError();    }    ROCKS_LOG_WARN(        db_options_.info_log, "[%s] [JOB %d] Remote compaction failed.",        compaction->column_family_data()->GetName().c_str(), job_id_);    return compaction_status;  }  // CompactionServiceJobStatus::kSuccess was returned, but somehow we failed to  // read the result. Consider this as an installation failure  if (!s.ok()) {    sub_compact->status = s;    compaction_result.status.PermitUncheckedError();    db_options_.compaction_service->OnInstallation(        response.scheduled_job_id, CompactionServiceJobStatus::kFailure);    return CompactionServiceJobStatus::kFailure;  }  sub_compact->status = compaction_result.status;  std::ostringstream output_files_oss;  is_first_one = true;  for (const auto& file : compaction_result.output_files) {    output_files_oss << (is_first_one ? "" : ", ") << file.file_name;    is_first_one = false;  }  ROCKS_LOG_INFO(      db_options_.info_log,      "[%s] [JOB %d] Received remote compaction result, output path: "      "%s, files: %s",      compaction->column_family_data()->GetName().c_str(), job_id_,      compaction_result.output_path.c_str(), output_files_oss.str().c_str());  // Installation Starts  for (const auto& file : compaction_result.output_files) {    uint64_t file_num = versions_->NewFileNumber();    auto src_file = compaction_result.output_path + "/" + file.file_name;    auto tgt_file = TableFileName(compaction->immutable_options().cf_paths,                                  file_num, compaction->output_path_id());    s = fs_->RenameFile(src_file, tgt_file, IOOptions(), nullptr);    if (!s.ok()) {      sub_compact->status = s;      db_options_.compaction_service->OnInstallation(          response.scheduled_job_id, CompactionServiceJobStatus::kFailure);      return CompactionServiceJobStatus::kFailure;    }    FileMetaData meta;    uint64_t file_size = file.file_size;    // TODO - Clean this up in the next release.    // For backward compatibility - in case the remote worker does not populate    // the file_size yet. If missing, continue to populate this from the file    // system.    if (file_size == 0) {      s = fs_->GetFileSize(tgt_file, IOOptions(), &file_size, nullptr);    }    if (!s.ok()) {      sub_compact->status = s;      db_options_.compaction_service->OnInstallation(          response.scheduled_job_id, CompactionServiceJobStatus::kFailure);      return CompactionServiceJobStatus::kFailure;    }    assert(file_size > 0);    meta.fd = FileDescriptor(file_num, compaction->output_path_id(), file_size,                             file.smallest_seqno, file.largest_seqno);    meta.smallest.DecodeFrom(file.smallest_internal_key);    meta.largest.DecodeFrom(file.largest_internal_key);    meta.oldest_ancester_time = file.oldest_ancester_time;    meta.file_creation_time = file.file_creation_time;    meta.epoch_number = file.epoch_number;    meta.file_checksum = file.file_checksum;    meta.file_checksum_func_name = file.file_checksum_func_name;    meta.marked_for_compaction = file.marked_for_compaction;    meta.unique_id = file.unique_id;    meta.temperature = file.file_temperature;    meta.tail_size =        FileMetaData::CalculateTailSize(file_size, file.table_properties);    auto cfd = compaction->column_family_data();    CompactionOutputs* compaction_outputs =        sub_compact->Outputs(file.is_proximal_level_output);    assert(compaction_outputs);    compaction_outputs->AddOutput(std::move(meta), cfd->internal_comparator(),                                  false, true, file.paranoid_hash);    compaction_outputs->UpdateTableProperties(file.table_properties);  }  // Set per-level stats  auto compaction_output_stats =      sub_compact->OutputStats(false /* is_proximal_level */);  assert(compaction_output_stats);  compaction_output_stats->Add(      compaction_result.internal_stats.output_level_stats);  if (compaction->SupportsPerKeyPlacement()) {    compaction_output_stats =        sub_compact->OutputStats(true /* is_proximal_level */);    assert(compaction_output_stats);    compaction_output_stats->Add(        compaction_result.internal_stats.proximal_level_stats);  }  // Set job stats  sub_compact->compaction_job_stats = compaction_result.stats;  RecordTick(stats_, REMOTE_COMPACT_READ_BYTES, compaction_result.bytes_read);  RecordTick(stats_, REMOTE_COMPACT_WRITE_BYTES,             compaction_result.bytes_written);  db_options_.compaction_service->OnInstallation(      response.scheduled_job_id, CompactionServiceJobStatus::kSuccess);  return CompactionServiceJobStatus::kSuccess;}std::string CompactionServiceCompactionJob::GetTableFileName(    uint64_t file_number) {  return MakeTableFileName(output_path_, file_number);}void CompactionServiceCompactionJob::RecordCompactionIOStats() {  compaction_result_->bytes_read += IOSTATS(bytes_read);  compaction_result_->bytes_written += IOSTATS(bytes_written);  CompactionJob::RecordCompactionIOStats();}CompactionServiceCompactionJob::CompactionServiceCompactionJob(    int job_id, Compaction* compaction, const ImmutableDBOptions& db_options,    const MutableDBOptions& mutable_db_options, const FileOptions& file_options,    VersionSet* versions, const std::atomic<bool>* shutting_down,    LogBuffer* log_buffer, FSDirectory* output_directory, Statistics* stats,    InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler,    JobContext* job_context, std::shared_ptr<Cache> table_cache,    EventLogger* event_logger, const std::string& dbname,    const std::shared_ptr<IOTracer>& io_tracer,    const std::atomic<bool>& manual_compaction_canceled,    const std::string& db_id, const std::string& db_session_id,    std::string output_path,    const CompactionServiceInput& compaction_service_input,    CompactionServiceResult* compaction_service_result)    : CompactionJob(job_id, compaction, db_options, mutable_db_options,                    file_options, versions, shutting_down, log_buffer, nullptr,                    output_directory, nullptr, stats, db_mutex,                    db_error_handler, job_context, std::move(table_cache),                    event_logger,                    compaction->mutable_cf_options().paranoid_file_checks,                    compaction->mutable_cf_options().report_bg_io_stats, dbname,                    &(compaction_service_result->stats), Env::Priority::USER,                    io_tracer, manual_compaction_canceled, db_id, db_session_id,                    compaction->column_family_data()->GetFullHistoryTsLow()),      output_path_(std::move(output_path)),      compaction_input_(compaction_service_input),      compaction_result_(compaction_service_result) {}void CompactionServiceCompactionJob::Prepare(    const CompactionProgress& compaction_progress,    log::Writer* compaction_progress_writer) {  std::optional<Slice> begin;  if (compaction_input_.has_begin) {    begin = compaction_input_.begin;  }  std::optional<Slice> end;  if (compaction_input_.has_end) {    end = compaction_input_.end;  }  CompactionJob::Prepare(std::make_pair(begin, end), compaction_progress,                         compaction_progress_writer);}Status CompactionServiceCompactionJob::Run() {  AutoThreadOperationStageUpdater stage_updater(      ThreadStatus::STAGE_COMPACTION_RUN);  auto* c = compact_->compaction;  log_buffer_->FlushBufferToLog();  LogCompaction();  compaction_result_->stats.Reset();  const uint64_t start_micros = db_options_.clock->NowMicros();  c->GetOrInitInputTableProperties();  // Pick the only sub-compaction we should have  assert(compact_->sub_compact_states.size() == 1);  SubcompactionState* sub_compact = compact_->sub_compact_states.data();  ProcessKeyValueCompaction(sub_compact);  uint64_t elapsed_micros = db_options_.clock->NowMicros() - start_micros;  internal_stats_.SetMicros(elapsed_micros);  internal_stats_.AddCpuMicros(elapsed_micros);  RecordTimeToHistogram(stats_, COMPACTION_TIME,                        internal_stats_.output_level_stats.micros);  RecordTimeToHistogram(stats_, COMPACTION_CPU_TIME,                        internal_stats_.output_level_stats.cpu_micros);  Status status = sub_compact->status;  IOStatus io_s = sub_compact->io_status;  if (io_status_.ok()) {    io_status_ = io_s;  }  if (status.ok()) {    constexpr IODebugContext* dbg = nullptr;    if (output_directory_) {      io_s = output_directory_->FsyncWithDirOptions(IOOptions(), dbg,                                                    DirFsyncOptions());    }  }  if (io_status_.ok()) {    io_status_ = io_s;  }  if (status.ok()) {    status = io_s;  }  LogFlush(db_options_.info_log);  compact_->status = status;  compact_->status.PermitUncheckedError();  // Build Compaction Job Stats  // 1. Aggregate internal stats and job stats for all subcompactions  // internal stats: sub_compact.proximal_level_outputs_.stats and  //                 sub_compact.compaction_outputs_.stats into  //                 internal_stats_.output_level_stats and  //                 internal_stats_.proximal_level_stats  // job-level stats: sub_compact.compaction_job_stats into compact.job_stats_  //  // For remote compaction, there's only one subcompaction.  compact_->AggregateCompactionStats(internal_stats_, *job_stats_);  // 2. Update job-level output stats with the aggregated internal_stats_  // Please note that input stats will be updated by primary host when all  // subcompactions are finished  UpdateCompactionJobOutputStatsFromInternalStats(status, internal_stats_);  // and set fields that are not propagated as part of the update  compaction_result_->stats.is_manual_compaction = c->is_manual_compaction();  compaction_result_->stats.is_full_compaction = c->is_full_compaction();  compaction_result_->stats.is_remote_compaction = true;  // 3. Update IO Stats that are not part of the the update above  // (bytes_read, bytes_written)  RecordCompactionIOStats();  // Build Output  compaction_result_->internal_stats = internal_stats_;  compaction_result_->output_level = compact_->compaction->output_level();  compaction_result_->output_path = output_path_;  if (status.ok()) {    for (const auto& output_file : sub_compact->GetOutputs()) {      auto& meta = output_file.meta;      compaction_result_->output_files.emplace_back(          MakeTableFileName(meta.fd.GetNumber()), meta.fd.GetFileSize(),          meta.fd.smallest_seqno, meta.fd.largest_seqno,          meta.smallest.Encode().ToString(), meta.largest.Encode().ToString(),          meta.oldest_ancester_time, meta.file_creation_time, meta.epoch_number,          meta.file_checksum, meta.file_checksum_func_name,          output_file.validator.GetHash(), meta.marked_for_compaction,          meta.unique_id, *output_file.table_properties,          output_file.is_proximal_level, meta.temperature);    }  }  TEST_SYNC_POINT_CALLBACK("CompactionServiceCompactionJob::Run:0",                           &compaction_result_);  return status;}void CompactionServiceCompactionJob::CleanupCompaction() {  CompactionJob::CleanupCompaction();}// Internal binary format for the input and result dataenum BinaryFormatVersion : uint32_t {  kOptionsString = 1,  // Use string format similar to Option string format};static std::unordered_map<std::string, OptionTypeInfo> cfd_type_info = {    {"name",     {offsetof(struct ColumnFamilyDescriptor, name), OptionType::kEncodedString,      OptionVerificationType::kNormal, OptionTypeFlags::kNone}},    {"options",     {offsetof(struct ColumnFamilyDescriptor, options),      OptionType::kConfigurable, OptionVerificationType::kNormal,      OptionTypeFlags::kNone,      [](const ConfigOptions& opts, const std::string& /*name*/,         const std::string& value, void* addr) {        auto cf_options = static_cast<ColumnFamilyOptions*>(addr);        return GetColumnFamilyOptionsFromString(opts, ColumnFamilyOptions(),                                                value, cf_options);      },      [](const ConfigOptions& opts, const std::string& /*name*/,         const void* addr, std::string* value) {        const auto cf_options = static_cast<const ColumnFamilyOptions*>(addr);        std::string result;        auto status =            GetStringFromColumnFamilyOptions(opts, *cf_options, &result);        *value = "{" + result + "}";        return status;      },      [](const ConfigOptions& opts, const std::string& name, const void* addr1,         const void* addr2, std::string* mismatch) {        const auto this_one = static_cast<const ColumnFamilyOptions*>(addr1);        const auto that_one = static_cast<const ColumnFamilyOptions*>(addr2);        auto this_conf = CFOptionsAsConfigurable(*this_one);        auto that_conf = CFOptionsAsConfigurable(*that_one);        std::string mismatch_opt;        bool result =            this_conf->AreEquivalent(opts, that_conf.get(), &mismatch_opt);        if (!result) {          *mismatch = name + "." + mismatch_opt;        }        return result;      }}},};static std::unordered_map<std::string, OptionTypeInfo> cs_input_type_info = {    {"cf_name",     {offsetof(struct CompactionServiceInput, cf_name),      OptionType::kEncodedString}},    {"snapshots", OptionTypeInfo::Vector<uint64_t>(                      offsetof(struct CompactionServiceInput, snapshots),                      OptionVerificationType::kNormal, OptionTypeFlags::kNone,                      {0, OptionType::kUInt64T})},    {"input_files", OptionTypeInfo::Vector<std::string>(                        offsetof(struct CompactionServiceInput, input_files),                        OptionVerificationType::kNormal, OptionTypeFlags::kNone,                        {0, OptionType::kEncodedString})},    {"output_level",     {offsetof(struct CompactionServiceInput, output_level), OptionType::kInt,      OptionVerificationType::kNormal, OptionTypeFlags::kNone}},    {"db_id",     {offsetof(struct CompactionServiceInput, db_id),      OptionType::kEncodedString}},    {"has_begin",     {offsetof(struct CompactionServiceInput, has_begin), OptionType::kBoolean,      OptionVerificationType::kNormal, OptionTypeFlags::kNone}},    {"begin",     {offsetof(struct CompactionServiceInput, begin),      OptionType::kEncodedString, OptionVerificationType::kNormal,      OptionTypeFlags::kNone}},    {"has_end",     {offsetof(struct CompactionServiceInput, has_end), OptionType::kBoolean,      OptionVerificationType::kNormal, OptionTypeFlags::kNone}},    {"end",     {offsetof(struct CompactionServiceInput, end), OptionType::kEncodedString,      OptionVerificationType::kNormal, OptionTypeFlags::kNone}},    {"options_file_number",     {offsetof(struct CompactionServiceInput, options_file_number),      OptionType::kUInt64T, OptionVerificationType::kNormal,      OptionTypeFlags::kNone}},};static std::unordered_map<std::string, OptionTypeInfo>    cs_output_file_type_info = {        {"file_name",         {offsetof(struct CompactionServiceOutputFile, file_name),          OptionType::kEncodedString, OptionVerificationType::kNormal,          OptionTypeFlags::kNone}},        {"file_size",         {offsetof(struct CompactionServiceOutputFile, file_size),          OptionType::kUInt64T, OptionVerificationType::kNormal,          OptionTypeFlags::kNone}},        {"smallest_seqno",         {offsetof(struct CompactionServiceOutputFile, smallest_seqno),          OptionType::kUInt64T, OptionVerificationType::kNormal,          OptionTypeFlags::kNone}},        {"largest_seqno",         {offsetof(struct CompactionServiceOutputFile, largest_seqno),          OptionType::kUInt64T, OptionVerificationType::kNormal,          OptionTypeFlags::kNone}},        {"smallest_internal_key",         {offsetof(struct CompactionServiceOutputFile, smallest_internal_key),          OptionType::kEncodedString, OptionVerificationType::kNormal,          OptionTypeFlags::kNone}},        {"largest_internal_key",         {offsetof(struct CompactionServiceOutputFile, largest_internal_key),          OptionType::kEncodedString, OptionVerificationType::kNormal,          OptionTypeFlags::kNone}},        {"oldest_ancester_time",         {offsetof(struct CompactionServiceOutputFile, oldest_ancester_time),          OptionType::kUInt64T, OptionVerificationType::kNormal,          OptionTypeFlags::kNone}},        {"file_creation_time",         {offsetof(struct CompactionServiceOutputFile, file_creation_time),          OptionType::kUInt64T, OptionVerificationType::kNormal,          OptionTypeFlags::kNone}},        {"epoch_number",         {offsetof(struct CompactionServiceOutputFile, epoch_number),          OptionType::kUInt64T, OptionVerificationType::kNormal,          OptionTypeFlags::kNone}},        {"file_checksum",         {offsetof(struct CompactionServiceOutputFile, file_checksum),          OptionType::kEncodedString, OptionVerificationType::kNormal,          OptionTypeFlags::kNone}},        {"file_checksum_func_name",         {offsetof(struct CompactionServiceOutputFile, file_checksum_func_name),          OptionType::kEncodedString, OptionVerificationType::kNormal,          OptionTypeFlags::kNone}},        {"paranoid_hash",         {offsetof(struct CompactionServiceOutputFile, paranoid_hash),          OptionType::kUInt64T, OptionVerificationType::kNormal,          OptionTypeFlags::kNone}},        {"marked_for_compaction",         {offsetof(struct CompactionServiceOutputFile, marked_for_compaction),          OptionType::kBoolean, OptionVerificationType::kNormal,          OptionTypeFlags::kNone}},        {"unique_id",         OptionTypeInfo::Array<uint64_t, 2>(             offsetof(struct CompactionServiceOutputFile, unique_id),             OptionVerificationType::kNormal, OptionTypeFlags::kNone,             {0, OptionType::kUInt64T})},        {"table_properties",         {offsetof(struct CompactionServiceOutputFile, table_properties),          OptionType::kStruct, OptionVerificationType::kNormal,          OptionTypeFlags::kNone,          [](const ConfigOptions& opts, const std::string& /*name*/,             const std::string& value, void* addr) {            auto table_properties = static_cast<TableProperties*>(addr);            return TableProperties::Parse(opts, value, table_properties);          },          [](const ConfigOptions& opts, const std::string& /*name*/,             const void* addr, std::string* value) {            const auto table_properties =                static_cast<const TableProperties*>(addr);            std::string result;            auto status = table_properties->Serialize(opts, &result);            *value = "{" + result + "}";            return status;          },          [](const ConfigOptions& opts, const std::string& /*name*/,             const void* addr1, const void* addr2, std::string* mismatch) {            const auto this_one = static_cast<const TableProperties*>(addr1);            const auto that_one = static_cast<const TableProperties*>(addr2);            return this_one->AreEqual(opts, that_one, mismatch);          }}},        {"is_proximal_level_output",         {offsetof(struct CompactionServiceOutputFile,                   is_proximal_level_output),          OptionType::kBoolean, OptionVerificationType::kNormal,          OptionTypeFlags::kNone}},        {"file_temperature",         {offsetof(struct CompactionServiceOutputFile, file_temperature),          OptionType::kTemperature, OptionVerificationType::kNormal,          OptionTypeFlags::kNone}}};static std::unordered_map<std::string, OptionTypeInfo>    compaction_job_stats_type_info = {        {"elapsed_micros",         {offsetof(struct CompactionJobStats, elapsed_micros),          OptionType::kUInt64T, OptionVerificationType::kNormal,          OptionTypeFlags::kNone}},        {"cpu_micros",         {offsetof(struct CompactionJobStats, cpu_micros), OptionType::kUInt64T,          OptionVerificationType::kNormal, OptionTypeFlags::kNone}},        {"num_input_records",         {offsetof(struct CompactionJobStats, num_input_records),          OptionType::kUInt64T, OptionVerificationType::kNormal,          OptionTypeFlags::kNone}},        {"num_blobs_read",         {offsetof(struct CompactionJobStats, num_blobs_read),          OptionType::kUInt64T, OptionVerificationType::kNormal,          OptionTypeFlags::kNone}},        {"num_input_files",         {offsetof(struct CompactionJobStats, num_input_files),          OptionType::kSizeT, OptionVerificationType::kNormal,          OptionTypeFlags::kNone}},        {"num_input_files_at_output_level",         {offsetof(struct CompactionJobStats, num_input_files_at_output_level),          OptionType::kSizeT, OptionVerificationType::kNormal,          OptionTypeFlags::kNone}},        {"num_output_records",         {offsetof(struct CompactionJobStats, num_output_records),          OptionType::kUInt64T, OptionVerificationType::kNormal,          OptionTypeFlags::kNone}},        {"num_output_files",         {offsetof(struct CompactionJobStats, num_output_files),          OptionType::kSizeT, OptionVerificationType::kNormal,          OptionTypeFlags::kNone}},        {"num_output_files_blob",         {offsetof(struct CompactionJobStats, num_output_files_blob),          OptionType::kSizeT, OptionVerificationType::kNormal,          OptionTypeFlags::kNone}},        {"is_full_compaction",         {offsetof(struct CompactionJobStats, is_full_compaction),          OptionType::kBoolean, OptionVerificationType::kNormal,          OptionTypeFlags::kNone}},        {"is_manual_compaction",         {offsetof(struct CompactionJobStats, is_manual_compaction),          OptionType::kBoolean, OptionVerificationType::kNormal,          OptionTypeFlags::kNone}},        {"is_remote_compaction",         {offsetof(struct CompactionJobStats, is_remote_compaction),          OptionType::kBoolean, OptionVerificationType::kNormal,          OptionTypeFlags::kNone}},        {"total_input_bytes",         {offsetof(struct CompactionJobStats, total_input_bytes),          OptionType::kUInt64T, OptionVerificationType::kNormal,          OptionTypeFlags::kNone}},        {"total_blob_bytes_read",         {offsetof(struct CompactionJobStats, total_blob_bytes_read),          OptionType::kUInt64T, OptionVerificationType::kNormal,          OptionTypeFlags::kNone}},        {"total_output_bytes",         {offsetof(struct CompactionJobStats, total_output_bytes),          OptionType::kUInt64T, OptionVerificationType::kNormal,          OptionTypeFlags::kNone}},        {"total_output_bytes_blob",         {offsetof(struct CompactionJobStats, total_output_bytes_blob),          OptionType::kUInt64T, OptionVerificationType::kNormal,          OptionTypeFlags::kNone}},        {"num_records_replaced",         {offsetof(struct CompactionJobStats, num_records_replaced),          OptionType::kUInt64T, OptionVerificationType::kNormal,          OptionTypeFlags::kNone}},        {"total_input_raw_key_bytes",         {offsetof(struct CompactionJobStats, total_input_raw_key_bytes),          OptionType::kUInt64T, OptionVerificationType::kNormal,          OptionTypeFlags::kNone}},        {"total_input_raw_value_bytes",         {offsetof(struct CompactionJobStats, total_input_raw_value_bytes),          OptionType::kUInt64T, OptionVerificationType::kNormal,          OptionTypeFlags::kNone}},        {"num_input_deletion_records",         {offsetof(struct CompactionJobStats, num_input_deletion_records),          OptionType::kUInt64T, OptionVerificationType::kNormal,          OptionTypeFlags::kNone}},        {"num_expired_deletion_records",         {offsetof(struct CompactionJobStats, num_expired_deletion_records),          OptionType::kUInt64T, OptionVerificationType::kNormal,          OptionTypeFlags::kNone}},        {"num_corrupt_keys",         {offsetof(struct CompactionJobStats, num_corrupt_keys),          OptionType::kUInt64T, OptionVerificationType::kNormal,          OptionTypeFlags::kNone}},        {"file_write_nanos",         {offsetof(struct CompactionJobStats, file_write_nanos),          OptionType::kUInt64T, OptionVerificationType::kNormal,          OptionTypeFlags::kNone}},        {"file_range_sync_nanos",         {offsetof(struct CompactionJobStats, file_range_sync_nanos),          OptionType::kUInt64T, OptionVerificationType::kNormal,          OptionTypeFlags::kNone}},        {"file_fsync_nanos",         {offsetof(struct CompactionJobStats, file_fsync_nanos),          OptionType::kUInt64T, OptionVerificationType::kNormal,          OptionTypeFlags::kNone}},        {"file_prepare_write_nanos",         {offsetof(struct CompactionJobStats, file_prepare_write_nanos),          OptionType::kUInt64T, OptionVerificationType::kNormal,          OptionTypeFlags::kNone}},        {"smallest_output_key_prefix",         {offsetof(struct CompactionJobStats, smallest_output_key_prefix),          OptionType::kEncodedString, OptionVerificationType::kNormal,          OptionTypeFlags::kNone}},        {"largest_output_key_prefix",         {offsetof(struct CompactionJobStats, largest_output_key_prefix),          OptionType::kEncodedString, OptionVerificationType::kNormal,          OptionTypeFlags::kNone}},        {"num_single_del_fallthru",         {offsetof(struct CompactionJobStats, num_single_del_fallthru),          OptionType::kUInt64T, OptionVerificationType::kNormal,          OptionTypeFlags::kNone}},        {"num_single_del_mismatch",         {offsetof(struct CompactionJobStats, num_single_del_mismatch),          OptionType::kUInt64T, OptionVerificationType::kNormal,          OptionTypeFlags::kNone}},};static std::unordered_map<std::string, OptionTypeInfo>    compaction_stats_type_info = {        {"micros",         {offsetof(struct InternalStats::CompactionStats, micros),          OptionType::kUInt64T, OptionVerificationType::kNormal,          OptionTypeFlags::kNone}},        {"cpu_micros",         {offsetof(struct InternalStats::CompactionStats, cpu_micros),          OptionType::kUInt64T, OptionVerificationType::kNormal,          OptionTypeFlags::kNone}},        {"bytes_read_non_output_levels",         {offsetof(struct InternalStats::CompactionStats,                   bytes_read_non_output_levels),          OptionType::kUInt64T, OptionVerificationType::kNormal,          OptionTypeFlags::kNone}},        {"bytes_read_output_level",         {offsetof(struct InternalStats::CompactionStats,                   bytes_read_output_level),          OptionType::kUInt64T, OptionVerificationType::kNormal,          OptionTypeFlags::kNone}},        {"bytes_skipped_non_output_levels",         {offsetof(struct InternalStats::CompactionStats,                   bytes_skipped_non_output_levels),          OptionType::kUInt64T, OptionVerificationType::kNormal,          OptionTypeFlags::kNone}},        {"bytes_skipped_output_level",         {offsetof(struct InternalStats::CompactionStats,                   bytes_skipped_output_level),          OptionType::kUInt64T, OptionVerificationType::kNormal,          OptionTypeFlags::kNone}},        {"bytes_read_blob",         {offsetof(struct InternalStats::CompactionStats, bytes_read_blob),          OptionType::kUInt64T, OptionVerificationType::kNormal,          OptionTypeFlags::kNone}},        {"bytes_written",         {offsetof(struct InternalStats::CompactionStats, bytes_written),          OptionType::kUInt64T, OptionVerificationType::kNormal,          OptionTypeFlags::kNone}},        {"bytes_written_blob",         {offsetof(struct InternalStats::CompactionStats, bytes_written_blob),          OptionType::kUInt64T, OptionVerificationType::kNormal,          OptionTypeFlags::kNone}},        {"bytes_moved",         {offsetof(struct InternalStats::CompactionStats, bytes_moved),          OptionType::kUInt64T, OptionVerificationType::kNormal,          OptionTypeFlags::kNone}},        {"num_input_files_in_non_output_levels",         {offsetof(struct InternalStats::CompactionStats,                   num_input_files_in_non_output_levels),          OptionType::kInt, OptionVerificationType::kNormal,          OptionTypeFlags::kNone}},        {"num_input_files_in_output_level",         {offsetof(struct InternalStats::CompactionStats,                   num_input_files_in_output_level),          OptionType::kInt, OptionVerificationType::kNormal,          OptionTypeFlags::kNone}},        {"num_filtered_input_files_in_non_output_levels",         {offsetof(struct InternalStats::CompactionStats,                   num_filtered_input_files_in_non_output_levels),          OptionType::kInt, OptionVerificationType::kNormal,          OptionTypeFlags::kNone}},        {"num_filtered_input_files_in_output_level",         {offsetof(struct InternalStats::CompactionStats,                   num_filtered_input_files_in_output_level),          OptionType::kInt, OptionVerificationType::kNormal,          OptionTypeFlags::kNone}},        {"num_output_files",         {offsetof(struct InternalStats::CompactionStats, num_output_files),          OptionType::kInt, OptionVerificationType::kNormal,          OptionTypeFlags::kNone}},        {"num_output_files_blob",         {offsetof(struct InternalStats::CompactionStats,                   num_output_files_blob),          OptionType::kInt, OptionVerificationType::kNormal,          OptionTypeFlags::kNone}},        {"num_input_records",         {offsetof(struct InternalStats::CompactionStats, num_input_records),          OptionType::kUInt64T, OptionVerificationType::kNormal,          OptionTypeFlags::kNone}},        {"num_dropped_records",         {offsetof(struct InternalStats::CompactionStats, num_dropped_records),          OptionType::kUInt64T, OptionVerificationType::kNormal,          OptionTypeFlags::kNone}},        {"num_output_records",         {offsetof(struct InternalStats::CompactionStats, num_output_records),          OptionType::kUInt64T, OptionVerificationType::kNormal,          OptionTypeFlags::kNone}},        {"count",         {offsetof(struct InternalStats::CompactionStats, count),          OptionType::kUInt64T, OptionVerificationType::kNormal,          OptionTypeFlags::kNone}},        {"counts", OptionTypeInfo::Array<                       int, static_cast<int>(CompactionReason::kNumOfReasons)>(                       offsetof(struct InternalStats::CompactionStats, counts),                       OptionVerificationType::kNormal, OptionTypeFlags::kNone,                       {0, OptionType::kInt})},};static std::unordered_map<std::string, OptionTypeInfo>    compaction_internal_stats_type_info = {        {"output_level_stats",         OptionTypeInfo::Struct(             "output_level_stats", &compaction_stats_type_info,             offsetof(struct InternalStats::CompactionStatsFull,                      output_level_stats),             OptionVerificationType::kNormal, OptionTypeFlags::kNone)},        {"has_proximal_level_output",         {offsetof(struct InternalStats::CompactionStatsFull,                   has_proximal_level_output),          OptionType::kBoolean, OptionVerificationType::kNormal,          OptionTypeFlags::kNone}},        {"proximal_level_stats",         OptionTypeInfo::Struct(             "proximal_level_stats", &compaction_stats_type_info,             offsetof(struct InternalStats::CompactionStatsFull,                      proximal_level_stats),             OptionVerificationType::kNormal, OptionTypeFlags::kNone)},};namespace {// this is a helper struct to serialize and deserialize class Status, because// Status's members are not public.struct StatusSerializationAdapter {  uint8_t code;  uint8_t subcode;  uint8_t severity;  std::string message;  StatusSerializationAdapter() = default;  explicit StatusSerializationAdapter(const Status& s) {    code = s.code();    subcode = s.subcode();    severity = s.severity();    auto msg = s.getState();    message = msg ? msg : "";  }  Status GetStatus() const {    return Status{static_cast<Status::Code>(code),                  static_cast<Status::SubCode>(subcode),                  static_cast<Status::Severity>(severity), message};  }};}  // namespacestatic std::unordered_map<std::string, OptionTypeInfo>    status_adapter_type_info = {        {"code",         {offsetof(struct StatusSerializationAdapter, code),          OptionType::kUInt8T, OptionVerificationType::kNormal,          OptionTypeFlags::kNone}},        {"subcode",         {offsetof(struct StatusSerializationAdapter, subcode),          OptionType::kUInt8T, OptionVerificationType::kNormal,          OptionTypeFlags::kNone}},        {"severity",         {offsetof(struct StatusSerializationAdapter, severity),          OptionType::kUInt8T, OptionVerificationType::kNormal,          OptionTypeFlags::kNone}},        {"message",         {offsetof(struct StatusSerializationAdapter, message),          OptionType::kEncodedString, OptionVerificationType::kNormal,          OptionTypeFlags::kNone}},};static std::unordered_map<std::string, OptionTypeInfo> cs_result_type_info = {    {"status",     {offsetof(struct CompactionServiceResult, status),      OptionType::kCustomizable, OptionVerificationType::kNormal,      OptionTypeFlags::kNone,      [](const ConfigOptions& opts, const std::string& /*name*/,         const std::string& value, void* addr) {        auto status_obj = static_cast<Status*>(addr);        StatusSerializationAdapter adapter;        Status s = OptionTypeInfo::ParseType(            opts, value, status_adapter_type_info, &adapter);        *status_obj = adapter.GetStatus();        return s;      },      [](const ConfigOptions& opts, const std::string& /*name*/,         const void* addr, std::string* value) {        const auto status_obj = static_cast<const Status*>(addr);        StatusSerializationAdapter adapter(*status_obj);        std::string result;        Status s = OptionTypeInfo::SerializeType(opts, status_adapter_type_info,                                                 &adapter, &result);        *value = "{" + result + "}";        return s;      },      [](const ConfigOptions& opts, const std::string& /*name*/,         const void* addr1, const void* addr2, std::string* mismatch) {        const auto status1 = static_cast<const Status*>(addr1);        const auto status2 = static_cast<const Status*>(addr2);        StatusSerializationAdapter adatper1(*status1);        StatusSerializationAdapter adapter2(*status2);        return OptionTypeInfo::TypesAreEqual(opts, status_adapter_type_info,                                             &adatper1, &adapter2, mismatch);      }}},    {"output_files",     OptionTypeInfo::Vector<CompactionServiceOutputFile>(         offsetof(struct CompactionServiceResult, output_files),         OptionVerificationType::kNormal, OptionTypeFlags::kNone,         OptionTypeInfo::Struct("output_files", &cs_output_file_type_info, 0,                                OptionVerificationType::kNormal,                                OptionTypeFlags::kNone))},    {"output_level",     {offsetof(struct CompactionServiceResult, output_level), OptionType::kInt,      OptionVerificationType::kNormal, OptionTypeFlags::kNone}},    {"output_path",     {offsetof(struct CompactionServiceResult, output_path),      OptionType::kEncodedString, OptionVerificationType::kNormal,      OptionTypeFlags::kNone}},    {"bytes_read",     {offsetof(struct CompactionServiceResult, bytes_read),      OptionType::kUInt64T, OptionVerificationType::kNormal,      OptionTypeFlags::kNone}},    {"bytes_written",     {offsetof(struct CompactionServiceResult, bytes_written),      OptionType::kUInt64T, OptionVerificationType::kNormal,      OptionTypeFlags::kNone}},    {"stats", OptionTypeInfo::Struct(                  "stats", &compaction_job_stats_type_info,                  offsetof(struct CompactionServiceResult, stats),                  OptionVerificationType::kNormal, OptionTypeFlags::kNone)},    {"internal_stats",     OptionTypeInfo::Struct(         "internal_stats", &compaction_internal_stats_type_info,         offsetof(struct CompactionServiceResult, internal_stats),         OptionVerificationType::kNormal, OptionTypeFlags::kNone)},};Status CompactionServiceInput::Read(const std::string& data_str,                                    CompactionServiceInput* obj) {  if (data_str.size() <= sizeof(BinaryFormatVersion)) {    return Status::InvalidArgument("Invalid CompactionServiceInput string");  }  auto format_version = DecodeFixed32(data_str.data());  if (format_version == kOptionsString) {    ConfigOptions cf;    cf.invoke_prepare_options = false;    cf.ignore_unknown_options = true;    return OptionTypeInfo::ParseType(        cf, data_str.substr(sizeof(BinaryFormatVersion)), cs_input_type_info,        obj);  } else {    return Status::NotSupported(        "Compaction Service Input data version not supported: " +        std::to_string(format_version));  }}Status CompactionServiceInput::Write(std::string* output) {  char buf[sizeof(BinaryFormatVersion)];  EncodeFixed32(buf, kOptionsString);  output->append(buf, sizeof(BinaryFormatVersion));  ConfigOptions cf;  cf.invoke_prepare_options = false;  return OptionTypeInfo::SerializeType(cf, cs_input_type_info, this, output);}Status CompactionServiceResult::Read(const std::string& data_str,                                     CompactionServiceResult* obj) {  if (data_str.size() <= sizeof(BinaryFormatVersion)) {    return Status::InvalidArgument("Invalid CompactionServiceResult string");  }  auto format_version = DecodeFixed32(data_str.data());  if (format_version == kOptionsString) {    ConfigOptions cf;    cf.invoke_prepare_options = false;    cf.ignore_unknown_options = true;    return OptionTypeInfo::ParseType(        cf, data_str.substr(sizeof(BinaryFormatVersion)), cs_result_type_info,        obj);  } else {    return Status::NotSupported(        "Compaction Service Result data version not supported: " +        std::to_string(format_version));  }}Status CompactionServiceResult::Write(std::string* output) {  char buf[sizeof(BinaryFormatVersion)];  EncodeFixed32(buf, kOptionsString);  output->append(buf, sizeof(BinaryFormatVersion));  ConfigOptions cf;  cf.invoke_prepare_options = false;  return OptionTypeInfo::SerializeType(cf, cs_result_type_info, this, output);}#ifndef NDEBUGbool CompactionServiceResult::TEST_Equals(CompactionServiceResult* other) {  std::string mismatch;  return TEST_Equals(other, &mismatch);}bool CompactionServiceResult::TEST_Equals(CompactionServiceResult* other,                                          std::string* mismatch) {  ConfigOptions cf;  cf.invoke_prepare_options = false;  return OptionTypeInfo::TypesAreEqual(cf, cs_result_type_info, this, other,                                       mismatch);}bool CompactionServiceInput::TEST_Equals(CompactionServiceInput* other) {  std::string mismatch;  return TEST_Equals(other, &mismatch);}bool CompactionServiceInput::TEST_Equals(CompactionServiceInput* other,                                         std::string* mismatch) {  ConfigOptions cf;  cf.invoke_prepare_options = false;  return OptionTypeInfo::TypesAreEqual(cf, cs_input_type_info, this, other,                                       mismatch);}#endif  // NDEBUG}  // namespace ROCKSDB_NAMESPACE
 |