compaction_service_job.cc 46 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066
  1. // Copyright (c) Meta Platforms, Inc. and affiliates.
  2. //
  3. // This source code is licensed under both the GPLv2 (found in the
  4. // COPYING file in the root directory) and Apache 2.0 License
  5. // (found in the LICENSE.Apache file in the root directory).
  6. //
  7. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  8. // Use of this source code is governed by a BSD-style license that can be
  9. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  10. #include "db/compaction/compaction_job.h"
  11. #include "db/compaction/compaction_state.h"
  12. #include "logging/logging.h"
  13. #include "monitoring/iostats_context_imp.h"
  14. #include "monitoring/thread_status_util.h"
  15. #include "options/options_helper.h"
  16. #include "rocksdb/utilities/options_type.h"
  17. namespace ROCKSDB_NAMESPACE {
  18. class SubcompactionState;
  19. CompactionServiceJobStatus
  20. CompactionJob::ProcessKeyValueCompactionWithCompactionService(
  21. SubcompactionState* sub_compact) {
  22. assert(sub_compact);
  23. assert(sub_compact->compaction);
  24. assert(db_options_.compaction_service);
  25. const Compaction* compaction = sub_compact->compaction;
  26. CompactionServiceInput compaction_input;
  27. compaction_input.output_level = compaction->output_level();
  28. compaction_input.db_id = db_id_;
  29. const std::vector<CompactionInputFiles>& inputs =
  30. *(compact_->compaction->inputs());
  31. for (const auto& files_per_level : inputs) {
  32. for (const auto& file : files_per_level.files) {
  33. compaction_input.input_files.emplace_back(
  34. MakeTableFileName(file->fd.GetNumber()));
  35. }
  36. }
  37. compaction_input.cf_name = compaction->column_family_data()->GetName();
  38. compaction_input.snapshots = job_context_->snapshot_seqs;
  39. compaction_input.has_begin = sub_compact->start.has_value();
  40. compaction_input.begin =
  41. compaction_input.has_begin ? sub_compact->start->ToString() : "";
  42. compaction_input.has_end = sub_compact->end.has_value();
  43. compaction_input.end =
  44. compaction_input.has_end ? sub_compact->end->ToString() : "";
  45. compaction_input.options_file_number = options_file_number_;
  46. TEST_SYNC_POINT_CALLBACK(
  47. "CompactionServiceJob::ProcessKeyValueCompactionWithCompactionService",
  48. &compaction_input);
  49. std::string compaction_input_binary;
  50. Status s = compaction_input.Write(&compaction_input_binary);
  51. if (!s.ok()) {
  52. sub_compact->status = s;
  53. return CompactionServiceJobStatus::kFailure;
  54. }
  55. std::ostringstream input_files_oss;
  56. bool is_first_one = true;
  57. for (const auto& file : compaction_input.input_files) {
  58. input_files_oss << (is_first_one ? "" : ", ") << file;
  59. is_first_one = false;
  60. }
  61. ROCKS_LOG_INFO(
  62. db_options_.info_log,
  63. "[%s] [JOB %d] Starting remote compaction (output level: %d): %s",
  64. compaction->column_family_data()->GetName().c_str(), job_id_,
  65. compaction_input.output_level, input_files_oss.str().c_str());
  66. CompactionServiceJobInfo info(
  67. dbname_, db_id_, db_session_id_,
  68. compaction->column_family_data()->GetID(),
  69. compaction->column_family_data()->GetName(), GetCompactionId(sub_compact),
  70. thread_pri_, compaction->compaction_reason(),
  71. compaction->is_full_compaction(), compaction->is_manual_compaction(),
  72. compaction->bottommost_level(), compaction->start_level(),
  73. compaction->output_level());
  74. CompactionServiceScheduleResponse response =
  75. db_options_.compaction_service->Schedule(info, compaction_input_binary);
  76. switch (response.status) {
  77. case CompactionServiceJobStatus::kSuccess:
  78. break;
  79. case CompactionServiceJobStatus::kAborted:
  80. sub_compact->status =
  81. Status::Aborted("Scheduling a remote compaction job was aborted");
  82. ROCKS_LOG_WARN(
  83. db_options_.info_log,
  84. "[%s] [JOB %d] Remote compaction was aborted at Schedule()",
  85. compaction->column_family_data()->GetName().c_str(), job_id_);
  86. return response.status;
  87. case CompactionServiceJobStatus::kFailure:
  88. sub_compact->status = Status::Incomplete(
  89. "CompactionService failed to schedule a remote compaction job.");
  90. ROCKS_LOG_WARN(db_options_.info_log,
  91. "[%s] [JOB %d] Remote compaction failed to start.",
  92. compaction->column_family_data()->GetName().c_str(),
  93. job_id_);
  94. return response.status;
  95. case CompactionServiceJobStatus::kUseLocal:
  96. ROCKS_LOG_INFO(
  97. db_options_.info_log,
  98. "[%s] [JOB %d] Remote compaction fallback to local by API (Schedule)",
  99. compaction->column_family_data()->GetName().c_str(), job_id_);
  100. return response.status;
  101. default:
  102. assert(false); // unknown status
  103. break;
  104. }
  105. std::string debug_str_before_wait =
  106. compaction->input_version()->DebugString(/*hex=*/true);
  107. ROCKS_LOG_INFO(db_options_.info_log,
  108. "[%s] [JOB %d] Waiting for remote compaction...",
  109. compaction->column_family_data()->GetName().c_str(), job_id_);
  110. std::string compaction_result_binary;
  111. CompactionServiceJobStatus compaction_status =
  112. db_options_.compaction_service->Wait(response.scheduled_job_id,
  113. &compaction_result_binary);
  114. if (compaction_status != CompactionServiceJobStatus::kSuccess) {
  115. ROCKS_LOG_ERROR(
  116. db_options_.info_log,
  117. "[%s] [JOB %d] Wait() status is not kSuccess. "
  118. "\nDebugString Before Wait():\n%s"
  119. "\nDebugString After Wait():\n%s",
  120. compaction->column_family_data()->GetName().c_str(), job_id_,
  121. debug_str_before_wait.c_str(),
  122. compaction->input_version()->DebugString(/*hex=*/true).c_str());
  123. }
  124. if (compaction_status == CompactionServiceJobStatus::kUseLocal) {
  125. ROCKS_LOG_INFO(
  126. db_options_.info_log,
  127. "[%s] [JOB %d] Remote compaction fallback to local by API (Wait)",
  128. compaction->column_family_data()->GetName().c_str(), job_id_);
  129. return compaction_status;
  130. }
  131. if (compaction_status == CompactionServiceJobStatus::kAborted) {
  132. sub_compact->status =
  133. Status::Aborted("Waiting a remote compaction job was aborted");
  134. ROCKS_LOG_INFO(db_options_.info_log,
  135. "[%s] [JOB %d] Remote compaction was aborted during Wait()",
  136. compaction->column_family_data()->GetName().c_str(),
  137. job_id_);
  138. return compaction_status;
  139. }
  140. CompactionServiceResult compaction_result;
  141. s = CompactionServiceResult::Read(compaction_result_binary,
  142. &compaction_result);
  143. if (compaction_status == CompactionServiceJobStatus::kFailure) {
  144. if (s.ok()) {
  145. if (compaction_result.status.ok()) {
  146. sub_compact->status = Status::Incomplete(
  147. "CompactionService failed to run the compaction job (even though "
  148. "the internal status is okay).");
  149. } else {
  150. // set the current sub compaction status with the status returned from
  151. // remote
  152. sub_compact->status = compaction_result.status;
  153. }
  154. } else {
  155. sub_compact->status = Status::Incomplete(
  156. "CompactionService failed to run the compaction job (and no valid "
  157. "result is returned).");
  158. compaction_result.status.PermitUncheckedError();
  159. }
  160. ROCKS_LOG_WARN(
  161. db_options_.info_log, "[%s] [JOB %d] Remote compaction failed.",
  162. compaction->column_family_data()->GetName().c_str(), job_id_);
  163. return compaction_status;
  164. }
  165. // CompactionServiceJobStatus::kSuccess was returned, but somehow we failed to
  166. // read the result. Consider this as an installation failure
  167. if (!s.ok()) {
  168. sub_compact->status = s;
  169. compaction_result.status.PermitUncheckedError();
  170. db_options_.compaction_service->OnInstallation(
  171. response.scheduled_job_id, CompactionServiceJobStatus::kFailure);
  172. return CompactionServiceJobStatus::kFailure;
  173. }
  174. sub_compact->status = compaction_result.status;
  175. std::ostringstream output_files_oss;
  176. is_first_one = true;
  177. for (const auto& file : compaction_result.output_files) {
  178. output_files_oss << (is_first_one ? "" : ", ") << file.file_name;
  179. is_first_one = false;
  180. }
  181. ROCKS_LOG_INFO(
  182. db_options_.info_log,
  183. "[%s] [JOB %d] Received remote compaction result, output path: "
  184. "%s, files: %s",
  185. compaction->column_family_data()->GetName().c_str(), job_id_,
  186. compaction_result.output_path.c_str(), output_files_oss.str().c_str());
  187. // Installation Starts
  188. for (const auto& file : compaction_result.output_files) {
  189. uint64_t file_num = versions_->NewFileNumber();
  190. auto src_file = compaction_result.output_path + "/" + file.file_name;
  191. auto tgt_file = TableFileName(compaction->immutable_options().cf_paths,
  192. file_num, compaction->output_path_id());
  193. s = fs_->RenameFile(src_file, tgt_file, IOOptions(), nullptr);
  194. if (!s.ok()) {
  195. sub_compact->status = s;
  196. db_options_.compaction_service->OnInstallation(
  197. response.scheduled_job_id, CompactionServiceJobStatus::kFailure);
  198. return CompactionServiceJobStatus::kFailure;
  199. }
  200. FileMetaData meta;
  201. uint64_t file_size = file.file_size;
  202. // TODO - Clean this up in the next release.
  203. // For backward compatibility - in case the remote worker does not populate
  204. // the file_size yet. If missing, continue to populate this from the file
  205. // system.
  206. if (file_size == 0) {
  207. s = fs_->GetFileSize(tgt_file, IOOptions(), &file_size, nullptr);
  208. }
  209. if (!s.ok()) {
  210. sub_compact->status = s;
  211. db_options_.compaction_service->OnInstallation(
  212. response.scheduled_job_id, CompactionServiceJobStatus::kFailure);
  213. return CompactionServiceJobStatus::kFailure;
  214. }
  215. assert(file_size > 0);
  216. meta.fd = FileDescriptor(file_num, compaction->output_path_id(), file_size,
  217. file.smallest_seqno, file.largest_seqno);
  218. meta.smallest.DecodeFrom(file.smallest_internal_key);
  219. meta.largest.DecodeFrom(file.largest_internal_key);
  220. meta.oldest_ancester_time = file.oldest_ancester_time;
  221. meta.file_creation_time = file.file_creation_time;
  222. meta.epoch_number = file.epoch_number;
  223. meta.file_checksum = file.file_checksum;
  224. meta.file_checksum_func_name = file.file_checksum_func_name;
  225. meta.marked_for_compaction = file.marked_for_compaction;
  226. meta.unique_id = file.unique_id;
  227. meta.temperature = file.file_temperature;
  228. meta.tail_size =
  229. FileMetaData::CalculateTailSize(file_size, file.table_properties);
  230. auto cfd = compaction->column_family_data();
  231. CompactionOutputs* compaction_outputs =
  232. sub_compact->Outputs(file.is_proximal_level_output);
  233. assert(compaction_outputs);
  234. compaction_outputs->AddOutput(std::move(meta), cfd->internal_comparator(),
  235. false, true, file.paranoid_hash);
  236. compaction_outputs->UpdateTableProperties(file.table_properties);
  237. }
  238. // Set per-level stats
  239. auto compaction_output_stats =
  240. sub_compact->OutputStats(false /* is_proximal_level */);
  241. assert(compaction_output_stats);
  242. compaction_output_stats->Add(
  243. compaction_result.internal_stats.output_level_stats);
  244. if (compaction->SupportsPerKeyPlacement()) {
  245. compaction_output_stats =
  246. sub_compact->OutputStats(true /* is_proximal_level */);
  247. assert(compaction_output_stats);
  248. compaction_output_stats->Add(
  249. compaction_result.internal_stats.proximal_level_stats);
  250. }
  251. // Set job stats
  252. sub_compact->compaction_job_stats = compaction_result.stats;
  253. RecordTick(stats_, REMOTE_COMPACT_READ_BYTES, compaction_result.bytes_read);
  254. RecordTick(stats_, REMOTE_COMPACT_WRITE_BYTES,
  255. compaction_result.bytes_written);
  256. db_options_.compaction_service->OnInstallation(
  257. response.scheduled_job_id, CompactionServiceJobStatus::kSuccess);
  258. return CompactionServiceJobStatus::kSuccess;
  259. }
  260. std::string CompactionServiceCompactionJob::GetTableFileName(
  261. uint64_t file_number) {
  262. return MakeTableFileName(output_path_, file_number);
  263. }
  264. void CompactionServiceCompactionJob::RecordCompactionIOStats() {
  265. compaction_result_->bytes_read += IOSTATS(bytes_read);
  266. compaction_result_->bytes_written += IOSTATS(bytes_written);
  267. CompactionJob::RecordCompactionIOStats();
  268. }
  269. CompactionServiceCompactionJob::CompactionServiceCompactionJob(
  270. int job_id, Compaction* compaction, const ImmutableDBOptions& db_options,
  271. const MutableDBOptions& mutable_db_options, const FileOptions& file_options,
  272. VersionSet* versions, const std::atomic<bool>* shutting_down,
  273. LogBuffer* log_buffer, FSDirectory* output_directory, Statistics* stats,
  274. InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler,
  275. JobContext* job_context, std::shared_ptr<Cache> table_cache,
  276. EventLogger* event_logger, const std::string& dbname,
  277. const std::shared_ptr<IOTracer>& io_tracer,
  278. const std::atomic<bool>& manual_compaction_canceled,
  279. const std::string& db_id, const std::string& db_session_id,
  280. std::string output_path,
  281. const CompactionServiceInput& compaction_service_input,
  282. CompactionServiceResult* compaction_service_result)
  283. : CompactionJob(job_id, compaction, db_options, mutable_db_options,
  284. file_options, versions, shutting_down, log_buffer, nullptr,
  285. output_directory, nullptr, stats, db_mutex,
  286. db_error_handler, job_context, std::move(table_cache),
  287. event_logger,
  288. compaction->mutable_cf_options().paranoid_file_checks,
  289. compaction->mutable_cf_options().report_bg_io_stats, dbname,
  290. &(compaction_service_result->stats), Env::Priority::USER,
  291. io_tracer, manual_compaction_canceled, db_id, db_session_id,
  292. compaction->column_family_data()->GetFullHistoryTsLow()),
  293. output_path_(std::move(output_path)),
  294. compaction_input_(compaction_service_input),
  295. compaction_result_(compaction_service_result) {}
  296. void CompactionServiceCompactionJob::Prepare(
  297. const CompactionProgress& compaction_progress,
  298. log::Writer* compaction_progress_writer) {
  299. std::optional<Slice> begin;
  300. if (compaction_input_.has_begin) {
  301. begin = compaction_input_.begin;
  302. }
  303. std::optional<Slice> end;
  304. if (compaction_input_.has_end) {
  305. end = compaction_input_.end;
  306. }
  307. CompactionJob::Prepare(std::make_pair(begin, end), compaction_progress,
  308. compaction_progress_writer);
  309. }
  310. Status CompactionServiceCompactionJob::Run() {
  311. AutoThreadOperationStageUpdater stage_updater(
  312. ThreadStatus::STAGE_COMPACTION_RUN);
  313. auto* c = compact_->compaction;
  314. log_buffer_->FlushBufferToLog();
  315. LogCompaction();
  316. compaction_result_->stats.Reset();
  317. const uint64_t start_micros = db_options_.clock->NowMicros();
  318. c->GetOrInitInputTableProperties();
  319. // Pick the only sub-compaction we should have
  320. assert(compact_->sub_compact_states.size() == 1);
  321. SubcompactionState* sub_compact = compact_->sub_compact_states.data();
  322. ProcessKeyValueCompaction(sub_compact);
  323. uint64_t elapsed_micros = db_options_.clock->NowMicros() - start_micros;
  324. internal_stats_.SetMicros(elapsed_micros);
  325. internal_stats_.AddCpuMicros(elapsed_micros);
  326. RecordTimeToHistogram(stats_, COMPACTION_TIME,
  327. internal_stats_.output_level_stats.micros);
  328. RecordTimeToHistogram(stats_, COMPACTION_CPU_TIME,
  329. internal_stats_.output_level_stats.cpu_micros);
  330. Status status = sub_compact->status;
  331. IOStatus io_s = sub_compact->io_status;
  332. if (io_status_.ok()) {
  333. io_status_ = io_s;
  334. }
  335. if (status.ok()) {
  336. constexpr IODebugContext* dbg = nullptr;
  337. if (output_directory_) {
  338. io_s = output_directory_->FsyncWithDirOptions(IOOptions(), dbg,
  339. DirFsyncOptions());
  340. }
  341. }
  342. if (io_status_.ok()) {
  343. io_status_ = io_s;
  344. }
  345. if (status.ok()) {
  346. status = io_s;
  347. }
  348. LogFlush(db_options_.info_log);
  349. compact_->status = status;
  350. compact_->status.PermitUncheckedError();
  351. // Build Compaction Job Stats
  352. // 1. Aggregate internal stats and job stats for all subcompactions
  353. // internal stats: sub_compact.proximal_level_outputs_.stats and
  354. // sub_compact.compaction_outputs_.stats into
  355. // internal_stats_.output_level_stats and
  356. // internal_stats_.proximal_level_stats
  357. // job-level stats: sub_compact.compaction_job_stats into compact.job_stats_
  358. //
  359. // For remote compaction, there's only one subcompaction.
  360. compact_->AggregateCompactionStats(internal_stats_, *job_stats_);
  361. // 2. Update job-level output stats with the aggregated internal_stats_
  362. // Please note that input stats will be updated by primary host when all
  363. // subcompactions are finished
  364. UpdateCompactionJobOutputStatsFromInternalStats(status, internal_stats_);
  365. // and set fields that are not propagated as part of the update
  366. compaction_result_->stats.is_manual_compaction = c->is_manual_compaction();
  367. compaction_result_->stats.is_full_compaction = c->is_full_compaction();
  368. compaction_result_->stats.is_remote_compaction = true;
  369. // 3. Update IO Stats that are not part of the the update above
  370. // (bytes_read, bytes_written)
  371. RecordCompactionIOStats();
  372. // Build Output
  373. compaction_result_->internal_stats = internal_stats_;
  374. compaction_result_->output_level = compact_->compaction->output_level();
  375. compaction_result_->output_path = output_path_;
  376. if (status.ok()) {
  377. for (const auto& output_file : sub_compact->GetOutputs()) {
  378. auto& meta = output_file.meta;
  379. compaction_result_->output_files.emplace_back(
  380. MakeTableFileName(meta.fd.GetNumber()), meta.fd.GetFileSize(),
  381. meta.fd.smallest_seqno, meta.fd.largest_seqno,
  382. meta.smallest.Encode().ToString(), meta.largest.Encode().ToString(),
  383. meta.oldest_ancester_time, meta.file_creation_time, meta.epoch_number,
  384. meta.file_checksum, meta.file_checksum_func_name,
  385. output_file.validator.GetHash(), meta.marked_for_compaction,
  386. meta.unique_id, *output_file.table_properties,
  387. output_file.is_proximal_level, meta.temperature);
  388. }
  389. }
  390. TEST_SYNC_POINT_CALLBACK("CompactionServiceCompactionJob::Run:0",
  391. &compaction_result_);
  392. return status;
  393. }
  394. void CompactionServiceCompactionJob::CleanupCompaction() {
  395. CompactionJob::CleanupCompaction();
  396. }
  // Version tag of the internal binary format used for the input and result
  // data.
  enum BinaryFormatVersion : uint32_t {
    // Use string format similar to Option string format
    kOptionsString = 1
  };
  401. static std::unordered_map<std::string, OptionTypeInfo> cfd_type_info = {
  402. {"name",
  403. {offsetof(struct ColumnFamilyDescriptor, name), OptionType::kEncodedString,
  404. OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
  405. {"options",
  406. {offsetof(struct ColumnFamilyDescriptor, options),
  407. OptionType::kConfigurable, OptionVerificationType::kNormal,
  408. OptionTypeFlags::kNone,
  409. [](const ConfigOptions& opts, const std::string& /*name*/,
  410. const std::string& value, void* addr) {
  411. auto cf_options = static_cast<ColumnFamilyOptions*>(addr);
  412. return GetColumnFamilyOptionsFromString(opts, ColumnFamilyOptions(),
  413. value, cf_options);
  414. },
  415. [](const ConfigOptions& opts, const std::string& /*name*/,
  416. const void* addr, std::string* value) {
  417. const auto cf_options = static_cast<const ColumnFamilyOptions*>(addr);
  418. std::string result;
  419. auto status =
  420. GetStringFromColumnFamilyOptions(opts, *cf_options, &result);
  421. *value = "{" + result + "}";
  422. return status;
  423. },
  424. [](const ConfigOptions& opts, const std::string& name, const void* addr1,
  425. const void* addr2, std::string* mismatch) {
  426. const auto this_one = static_cast<const ColumnFamilyOptions*>(addr1);
  427. const auto that_one = static_cast<const ColumnFamilyOptions*>(addr2);
  428. auto this_conf = CFOptionsAsConfigurable(*this_one);
  429. auto that_conf = CFOptionsAsConfigurable(*that_one);
  430. std::string mismatch_opt;
  431. bool result =
  432. this_conf->AreEquivalent(opts, that_conf.get(), &mismatch_opt);
  433. if (!result) {
  434. *mismatch = name + "." + mismatch_opt;
  435. }
  436. return result;
  437. }}},
  438. };
  439. static std::unordered_map<std::string, OptionTypeInfo> cs_input_type_info = {
  440. {"cf_name",
  441. {offsetof(struct CompactionServiceInput, cf_name),
  442. OptionType::kEncodedString}},
  443. {"snapshots", OptionTypeInfo::Vector<uint64_t>(
  444. offsetof(struct CompactionServiceInput, snapshots),
  445. OptionVerificationType::kNormal, OptionTypeFlags::kNone,
  446. {0, OptionType::kUInt64T})},
  447. {"input_files", OptionTypeInfo::Vector<std::string>(
  448. offsetof(struct CompactionServiceInput, input_files),
  449. OptionVerificationType::kNormal, OptionTypeFlags::kNone,
  450. {0, OptionType::kEncodedString})},
  451. {"output_level",
  452. {offsetof(struct CompactionServiceInput, output_level), OptionType::kInt,
  453. OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
  454. {"db_id",
  455. {offsetof(struct CompactionServiceInput, db_id),
  456. OptionType::kEncodedString}},
  457. {"has_begin",
  458. {offsetof(struct CompactionServiceInput, has_begin), OptionType::kBoolean,
  459. OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
  460. {"begin",
  461. {offsetof(struct CompactionServiceInput, begin),
  462. OptionType::kEncodedString, OptionVerificationType::kNormal,
  463. OptionTypeFlags::kNone}},
  464. {"has_end",
  465. {offsetof(struct CompactionServiceInput, has_end), OptionType::kBoolean,
  466. OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
  467. {"end",
  468. {offsetof(struct CompactionServiceInput, end), OptionType::kEncodedString,
  469. OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
  470. {"options_file_number",
  471. {offsetof(struct CompactionServiceInput, options_file_number),
  472. OptionType::kUInt64T, OptionVerificationType::kNormal,
  473. OptionTypeFlags::kNone}},
  474. };
  475. static std::unordered_map<std::string, OptionTypeInfo>
  476. cs_output_file_type_info = {
  477. {"file_name",
  478. {offsetof(struct CompactionServiceOutputFile, file_name),
  479. OptionType::kEncodedString, OptionVerificationType::kNormal,
  480. OptionTypeFlags::kNone}},
  481. {"file_size",
  482. {offsetof(struct CompactionServiceOutputFile, file_size),
  483. OptionType::kUInt64T, OptionVerificationType::kNormal,
  484. OptionTypeFlags::kNone}},
  485. {"smallest_seqno",
  486. {offsetof(struct CompactionServiceOutputFile, smallest_seqno),
  487. OptionType::kUInt64T, OptionVerificationType::kNormal,
  488. OptionTypeFlags::kNone}},
  489. {"largest_seqno",
  490. {offsetof(struct CompactionServiceOutputFile, largest_seqno),
  491. OptionType::kUInt64T, OptionVerificationType::kNormal,
  492. OptionTypeFlags::kNone}},
  493. {"smallest_internal_key",
  494. {offsetof(struct CompactionServiceOutputFile, smallest_internal_key),
  495. OptionType::kEncodedString, OptionVerificationType::kNormal,
  496. OptionTypeFlags::kNone}},
  497. {"largest_internal_key",
  498. {offsetof(struct CompactionServiceOutputFile, largest_internal_key),
  499. OptionType::kEncodedString, OptionVerificationType::kNormal,
  500. OptionTypeFlags::kNone}},
  501. {"oldest_ancester_time",
  502. {offsetof(struct CompactionServiceOutputFile, oldest_ancester_time),
  503. OptionType::kUInt64T, OptionVerificationType::kNormal,
  504. OptionTypeFlags::kNone}},
  505. {"file_creation_time",
  506. {offsetof(struct CompactionServiceOutputFile, file_creation_time),
  507. OptionType::kUInt64T, OptionVerificationType::kNormal,
  508. OptionTypeFlags::kNone}},
  509. {"epoch_number",
  510. {offsetof(struct CompactionServiceOutputFile, epoch_number),
  511. OptionType::kUInt64T, OptionVerificationType::kNormal,
  512. OptionTypeFlags::kNone}},
  513. {"file_checksum",
  514. {offsetof(struct CompactionServiceOutputFile, file_checksum),
  515. OptionType::kEncodedString, OptionVerificationType::kNormal,
  516. OptionTypeFlags::kNone}},
  517. {"file_checksum_func_name",
  518. {offsetof(struct CompactionServiceOutputFile, file_checksum_func_name),
  519. OptionType::kEncodedString, OptionVerificationType::kNormal,
  520. OptionTypeFlags::kNone}},
  521. {"paranoid_hash",
  522. {offsetof(struct CompactionServiceOutputFile, paranoid_hash),
  523. OptionType::kUInt64T, OptionVerificationType::kNormal,
  524. OptionTypeFlags::kNone}},
  525. {"marked_for_compaction",
  526. {offsetof(struct CompactionServiceOutputFile, marked_for_compaction),
  527. OptionType::kBoolean, OptionVerificationType::kNormal,
  528. OptionTypeFlags::kNone}},
  529. {"unique_id",
  530. OptionTypeInfo::Array<uint64_t, 2>(
  531. offsetof(struct CompactionServiceOutputFile, unique_id),
  532. OptionVerificationType::kNormal, OptionTypeFlags::kNone,
  533. {0, OptionType::kUInt64T})},
  534. {"table_properties",
  535. {offsetof(struct CompactionServiceOutputFile, table_properties),
  536. OptionType::kStruct, OptionVerificationType::kNormal,
  537. OptionTypeFlags::kNone,
  538. [](const ConfigOptions& opts, const std::string& /*name*/,
  539. const std::string& value, void* addr) {
  540. auto table_properties = static_cast<TableProperties*>(addr);
  541. return TableProperties::Parse(opts, value, table_properties);
  542. },
  543. [](const ConfigOptions& opts, const std::string& /*name*/,
  544. const void* addr, std::string* value) {
  545. const auto table_properties =
  546. static_cast<const TableProperties*>(addr);
  547. std::string result;
  548. auto status = table_properties->Serialize(opts, &result);
  549. *value = "{" + result + "}";
  550. return status;
  551. },
  552. [](const ConfigOptions& opts, const std::string& /*name*/,
  553. const void* addr1, const void* addr2, std::string* mismatch) {
  554. const auto this_one = static_cast<const TableProperties*>(addr1);
  555. const auto that_one = static_cast<const TableProperties*>(addr2);
  556. return this_one->AreEqual(opts, that_one, mismatch);
  557. }}},
  558. {"is_proximal_level_output",
  559. {offsetof(struct CompactionServiceOutputFile,
  560. is_proximal_level_output),
  561. OptionType::kBoolean, OptionVerificationType::kNormal,
  562. OptionTypeFlags::kNone}},
  563. {"file_temperature",
  564. {offsetof(struct CompactionServiceOutputFile, file_temperature),
  565. OptionType::kTemperature, OptionVerificationType::kNormal,
  566. OptionTypeFlags::kNone}}};
  567. static std::unordered_map<std::string, OptionTypeInfo>
  568. compaction_job_stats_type_info = {
  569. {"elapsed_micros",
  570. {offsetof(struct CompactionJobStats, elapsed_micros),
  571. OptionType::kUInt64T, OptionVerificationType::kNormal,
  572. OptionTypeFlags::kNone}},
  573. {"cpu_micros",
  574. {offsetof(struct CompactionJobStats, cpu_micros), OptionType::kUInt64T,
  575. OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
  576. {"num_input_records",
  577. {offsetof(struct CompactionJobStats, num_input_records),
  578. OptionType::kUInt64T, OptionVerificationType::kNormal,
  579. OptionTypeFlags::kNone}},
  580. {"num_blobs_read",
  581. {offsetof(struct CompactionJobStats, num_blobs_read),
  582. OptionType::kUInt64T, OptionVerificationType::kNormal,
  583. OptionTypeFlags::kNone}},
  584. {"num_input_files",
  585. {offsetof(struct CompactionJobStats, num_input_files),
  586. OptionType::kSizeT, OptionVerificationType::kNormal,
  587. OptionTypeFlags::kNone}},
  588. {"num_input_files_at_output_level",
  589. {offsetof(struct CompactionJobStats, num_input_files_at_output_level),
  590. OptionType::kSizeT, OptionVerificationType::kNormal,
  591. OptionTypeFlags::kNone}},
  592. {"num_output_records",
  593. {offsetof(struct CompactionJobStats, num_output_records),
  594. OptionType::kUInt64T, OptionVerificationType::kNormal,
  595. OptionTypeFlags::kNone}},
  596. {"num_output_files",
  597. {offsetof(struct CompactionJobStats, num_output_files),
  598. OptionType::kSizeT, OptionVerificationType::kNormal,
  599. OptionTypeFlags::kNone}},
  600. {"num_output_files_blob",
  601. {offsetof(struct CompactionJobStats, num_output_files_blob),
  602. OptionType::kSizeT, OptionVerificationType::kNormal,
  603. OptionTypeFlags::kNone}},
  604. {"is_full_compaction",
  605. {offsetof(struct CompactionJobStats, is_full_compaction),
  606. OptionType::kBoolean, OptionVerificationType::kNormal,
  607. OptionTypeFlags::kNone}},
  608. {"is_manual_compaction",
  609. {offsetof(struct CompactionJobStats, is_manual_compaction),
  610. OptionType::kBoolean, OptionVerificationType::kNormal,
  611. OptionTypeFlags::kNone}},
  612. {"is_remote_compaction",
  613. {offsetof(struct CompactionJobStats, is_remote_compaction),
  614. OptionType::kBoolean, OptionVerificationType::kNormal,
  615. OptionTypeFlags::kNone}},
  616. {"total_input_bytes",
  617. {offsetof(struct CompactionJobStats, total_input_bytes),
  618. OptionType::kUInt64T, OptionVerificationType::kNormal,
  619. OptionTypeFlags::kNone}},
  620. {"total_blob_bytes_read",
  621. {offsetof(struct CompactionJobStats, total_blob_bytes_read),
  622. OptionType::kUInt64T, OptionVerificationType::kNormal,
  623. OptionTypeFlags::kNone}},
  624. {"total_output_bytes",
  625. {offsetof(struct CompactionJobStats, total_output_bytes),
  626. OptionType::kUInt64T, OptionVerificationType::kNormal,
  627. OptionTypeFlags::kNone}},
  628. {"total_output_bytes_blob",
  629. {offsetof(struct CompactionJobStats, total_output_bytes_blob),
  630. OptionType::kUInt64T, OptionVerificationType::kNormal,
  631. OptionTypeFlags::kNone}},
  632. {"num_records_replaced",
  633. {offsetof(struct CompactionJobStats, num_records_replaced),
  634. OptionType::kUInt64T, OptionVerificationType::kNormal,
  635. OptionTypeFlags::kNone}},
  636. {"total_input_raw_key_bytes",
  637. {offsetof(struct CompactionJobStats, total_input_raw_key_bytes),
  638. OptionType::kUInt64T, OptionVerificationType::kNormal,
  639. OptionTypeFlags::kNone}},
  640. {"total_input_raw_value_bytes",
  641. {offsetof(struct CompactionJobStats, total_input_raw_value_bytes),
  642. OptionType::kUInt64T, OptionVerificationType::kNormal,
  643. OptionTypeFlags::kNone}},
  644. {"num_input_deletion_records",
  645. {offsetof(struct CompactionJobStats, num_input_deletion_records),
  646. OptionType::kUInt64T, OptionVerificationType::kNormal,
  647. OptionTypeFlags::kNone}},
  648. {"num_expired_deletion_records",
  649. {offsetof(struct CompactionJobStats, num_expired_deletion_records),
  650. OptionType::kUInt64T, OptionVerificationType::kNormal,
  651. OptionTypeFlags::kNone}},
  652. {"num_corrupt_keys",
  653. {offsetof(struct CompactionJobStats, num_corrupt_keys),
  654. OptionType::kUInt64T, OptionVerificationType::kNormal,
  655. OptionTypeFlags::kNone}},
  656. {"file_write_nanos",
  657. {offsetof(struct CompactionJobStats, file_write_nanos),
  658. OptionType::kUInt64T, OptionVerificationType::kNormal,
  659. OptionTypeFlags::kNone}},
  660. {"file_range_sync_nanos",
  661. {offsetof(struct CompactionJobStats, file_range_sync_nanos),
  662. OptionType::kUInt64T, OptionVerificationType::kNormal,
  663. OptionTypeFlags::kNone}},
  664. {"file_fsync_nanos",
  665. {offsetof(struct CompactionJobStats, file_fsync_nanos),
  666. OptionType::kUInt64T, OptionVerificationType::kNormal,
  667. OptionTypeFlags::kNone}},
  668. {"file_prepare_write_nanos",
  669. {offsetof(struct CompactionJobStats, file_prepare_write_nanos),
  670. OptionType::kUInt64T, OptionVerificationType::kNormal,
  671. OptionTypeFlags::kNone}},
  672. {"smallest_output_key_prefix",
  673. {offsetof(struct CompactionJobStats, smallest_output_key_prefix),
  674. OptionType::kEncodedString, OptionVerificationType::kNormal,
  675. OptionTypeFlags::kNone}},
  676. {"largest_output_key_prefix",
  677. {offsetof(struct CompactionJobStats, largest_output_key_prefix),
  678. OptionType::kEncodedString, OptionVerificationType::kNormal,
  679. OptionTypeFlags::kNone}},
  680. {"num_single_del_fallthru",
  681. {offsetof(struct CompactionJobStats, num_single_del_fallthru),
  682. OptionType::kUInt64T, OptionVerificationType::kNormal,
  683. OptionTypeFlags::kNone}},
  684. {"num_single_del_mismatch",
  685. {offsetof(struct CompactionJobStats, num_single_del_mismatch),
  686. OptionType::kUInt64T, OptionVerificationType::kNormal,
  687. OptionTypeFlags::kNone}},
  688. };
  689. static std::unordered_map<std::string, OptionTypeInfo>
  690. compaction_stats_type_info = {
  691. {"micros",
  692. {offsetof(struct InternalStats::CompactionStats, micros),
  693. OptionType::kUInt64T, OptionVerificationType::kNormal,
  694. OptionTypeFlags::kNone}},
  695. {"cpu_micros",
  696. {offsetof(struct InternalStats::CompactionStats, cpu_micros),
  697. OptionType::kUInt64T, OptionVerificationType::kNormal,
  698. OptionTypeFlags::kNone}},
  699. {"bytes_read_non_output_levels",
  700. {offsetof(struct InternalStats::CompactionStats,
  701. bytes_read_non_output_levels),
  702. OptionType::kUInt64T, OptionVerificationType::kNormal,
  703. OptionTypeFlags::kNone}},
  704. {"bytes_read_output_level",
  705. {offsetof(struct InternalStats::CompactionStats,
  706. bytes_read_output_level),
  707. OptionType::kUInt64T, OptionVerificationType::kNormal,
  708. OptionTypeFlags::kNone}},
  709. {"bytes_skipped_non_output_levels",
  710. {offsetof(struct InternalStats::CompactionStats,
  711. bytes_skipped_non_output_levels),
  712. OptionType::kUInt64T, OptionVerificationType::kNormal,
  713. OptionTypeFlags::kNone}},
  714. {"bytes_skipped_output_level",
  715. {offsetof(struct InternalStats::CompactionStats,
  716. bytes_skipped_output_level),
  717. OptionType::kUInt64T, OptionVerificationType::kNormal,
  718. OptionTypeFlags::kNone}},
  719. {"bytes_read_blob",
  720. {offsetof(struct InternalStats::CompactionStats, bytes_read_blob),
  721. OptionType::kUInt64T, OptionVerificationType::kNormal,
  722. OptionTypeFlags::kNone}},
  723. {"bytes_written",
  724. {offsetof(struct InternalStats::CompactionStats, bytes_written),
  725. OptionType::kUInt64T, OptionVerificationType::kNormal,
  726. OptionTypeFlags::kNone}},
  727. {"bytes_written_blob",
  728. {offsetof(struct InternalStats::CompactionStats, bytes_written_blob),
  729. OptionType::kUInt64T, OptionVerificationType::kNormal,
  730. OptionTypeFlags::kNone}},
  731. {"bytes_moved",
  732. {offsetof(struct InternalStats::CompactionStats, bytes_moved),
  733. OptionType::kUInt64T, OptionVerificationType::kNormal,
  734. OptionTypeFlags::kNone}},
  735. {"num_input_files_in_non_output_levels",
  736. {offsetof(struct InternalStats::CompactionStats,
  737. num_input_files_in_non_output_levels),
  738. OptionType::kInt, OptionVerificationType::kNormal,
  739. OptionTypeFlags::kNone}},
  740. {"num_input_files_in_output_level",
  741. {offsetof(struct InternalStats::CompactionStats,
  742. num_input_files_in_output_level),
  743. OptionType::kInt, OptionVerificationType::kNormal,
  744. OptionTypeFlags::kNone}},
  745. {"num_filtered_input_files_in_non_output_levels",
  746. {offsetof(struct InternalStats::CompactionStats,
  747. num_filtered_input_files_in_non_output_levels),
  748. OptionType::kInt, OptionVerificationType::kNormal,
  749. OptionTypeFlags::kNone}},
  750. {"num_filtered_input_files_in_output_level",
  751. {offsetof(struct InternalStats::CompactionStats,
  752. num_filtered_input_files_in_output_level),
  753. OptionType::kInt, OptionVerificationType::kNormal,
  754. OptionTypeFlags::kNone}},
  755. {"num_output_files",
  756. {offsetof(struct InternalStats::CompactionStats, num_output_files),
  757. OptionType::kInt, OptionVerificationType::kNormal,
  758. OptionTypeFlags::kNone}},
  759. {"num_output_files_blob",
  760. {offsetof(struct InternalStats::CompactionStats,
  761. num_output_files_blob),
  762. OptionType::kInt, OptionVerificationType::kNormal,
  763. OptionTypeFlags::kNone}},
  764. {"num_input_records",
  765. {offsetof(struct InternalStats::CompactionStats, num_input_records),
  766. OptionType::kUInt64T, OptionVerificationType::kNormal,
  767. OptionTypeFlags::kNone}},
  768. {"num_dropped_records",
  769. {offsetof(struct InternalStats::CompactionStats, num_dropped_records),
  770. OptionType::kUInt64T, OptionVerificationType::kNormal,
  771. OptionTypeFlags::kNone}},
  772. {"num_output_records",
  773. {offsetof(struct InternalStats::CompactionStats, num_output_records),
  774. OptionType::kUInt64T, OptionVerificationType::kNormal,
  775. OptionTypeFlags::kNone}},
  776. {"count",
  777. {offsetof(struct InternalStats::CompactionStats, count),
  778. OptionType::kUInt64T, OptionVerificationType::kNormal,
  779. OptionTypeFlags::kNone}},
  780. {"counts", OptionTypeInfo::Array<
  781. int, static_cast<int>(CompactionReason::kNumOfReasons)>(
  782. offsetof(struct InternalStats::CompactionStats, counts),
  783. OptionVerificationType::kNormal, OptionTypeFlags::kNone,
  784. {0, OptionType::kInt})},
  785. };
  786. static std::unordered_map<std::string, OptionTypeInfo>
  787. compaction_internal_stats_type_info = {
  788. {"output_level_stats",
  789. OptionTypeInfo::Struct(
  790. "output_level_stats", &compaction_stats_type_info,
  791. offsetof(struct InternalStats::CompactionStatsFull,
  792. output_level_stats),
  793. OptionVerificationType::kNormal, OptionTypeFlags::kNone)},
  794. {"has_proximal_level_output",
  795. {offsetof(struct InternalStats::CompactionStatsFull,
  796. has_proximal_level_output),
  797. OptionType::kBoolean, OptionVerificationType::kNormal,
  798. OptionTypeFlags::kNone}},
  799. {"proximal_level_stats",
  800. OptionTypeInfo::Struct(
  801. "proximal_level_stats", &compaction_stats_type_info,
  802. offsetof(struct InternalStats::CompactionStatsFull,
  803. proximal_level_stats),
  804. OptionVerificationType::kNormal, OptionTypeFlags::kNone)},
  805. };
  806. namespace {
  807. // this is a helper struct to serialize and deserialize class Status, because
  808. // Status's members are not public.
  809. struct StatusSerializationAdapter {
  810. uint8_t code;
  811. uint8_t subcode;
  812. uint8_t severity;
  813. std::string message;
  814. StatusSerializationAdapter() = default;
  815. explicit StatusSerializationAdapter(const Status& s) {
  816. code = s.code();
  817. subcode = s.subcode();
  818. severity = s.severity();
  819. auto msg = s.getState();
  820. message = msg ? msg : "";
  821. }
  822. Status GetStatus() const {
  823. return Status{static_cast<Status::Code>(code),
  824. static_cast<Status::SubCode>(subcode),
  825. static_cast<Status::Severity>(severity), message};
  826. }
  827. };
  828. } // namespace
  829. static std::unordered_map<std::string, OptionTypeInfo>
  830. status_adapter_type_info = {
  831. {"code",
  832. {offsetof(struct StatusSerializationAdapter, code),
  833. OptionType::kUInt8T, OptionVerificationType::kNormal,
  834. OptionTypeFlags::kNone}},
  835. {"subcode",
  836. {offsetof(struct StatusSerializationAdapter, subcode),
  837. OptionType::kUInt8T, OptionVerificationType::kNormal,
  838. OptionTypeFlags::kNone}},
  839. {"severity",
  840. {offsetof(struct StatusSerializationAdapter, severity),
  841. OptionType::kUInt8T, OptionVerificationType::kNormal,
  842. OptionTypeFlags::kNone}},
  843. {"message",
  844. {offsetof(struct StatusSerializationAdapter, message),
  845. OptionType::kEncodedString, OptionVerificationType::kNormal,
  846. OptionTypeFlags::kNone}},
  847. };
  848. static std::unordered_map<std::string, OptionTypeInfo> cs_result_type_info = {
  849. {"status",
  850. {offsetof(struct CompactionServiceResult, status),
  851. OptionType::kCustomizable, OptionVerificationType::kNormal,
  852. OptionTypeFlags::kNone,
  853. [](const ConfigOptions& opts, const std::string& /*name*/,
  854. const std::string& value, void* addr) {
  855. auto status_obj = static_cast<Status*>(addr);
  856. StatusSerializationAdapter adapter;
  857. Status s = OptionTypeInfo::ParseType(
  858. opts, value, status_adapter_type_info, &adapter);
  859. *status_obj = adapter.GetStatus();
  860. return s;
  861. },
  862. [](const ConfigOptions& opts, const std::string& /*name*/,
  863. const void* addr, std::string* value) {
  864. const auto status_obj = static_cast<const Status*>(addr);
  865. StatusSerializationAdapter adapter(*status_obj);
  866. std::string result;
  867. Status s = OptionTypeInfo::SerializeType(opts, status_adapter_type_info,
  868. &adapter, &result);
  869. *value = "{" + result + "}";
  870. return s;
  871. },
  872. [](const ConfigOptions& opts, const std::string& /*name*/,
  873. const void* addr1, const void* addr2, std::string* mismatch) {
  874. const auto status1 = static_cast<const Status*>(addr1);
  875. const auto status2 = static_cast<const Status*>(addr2);
  876. StatusSerializationAdapter adatper1(*status1);
  877. StatusSerializationAdapter adapter2(*status2);
  878. return OptionTypeInfo::TypesAreEqual(opts, status_adapter_type_info,
  879. &adatper1, &adapter2, mismatch);
  880. }}},
  881. {"output_files",
  882. OptionTypeInfo::Vector<CompactionServiceOutputFile>(
  883. offsetof(struct CompactionServiceResult, output_files),
  884. OptionVerificationType::kNormal, OptionTypeFlags::kNone,
  885. OptionTypeInfo::Struct("output_files", &cs_output_file_type_info, 0,
  886. OptionVerificationType::kNormal,
  887. OptionTypeFlags::kNone))},
  888. {"output_level",
  889. {offsetof(struct CompactionServiceResult, output_level), OptionType::kInt,
  890. OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
  891. {"output_path",
  892. {offsetof(struct CompactionServiceResult, output_path),
  893. OptionType::kEncodedString, OptionVerificationType::kNormal,
  894. OptionTypeFlags::kNone}},
  895. {"bytes_read",
  896. {offsetof(struct CompactionServiceResult, bytes_read),
  897. OptionType::kUInt64T, OptionVerificationType::kNormal,
  898. OptionTypeFlags::kNone}},
  899. {"bytes_written",
  900. {offsetof(struct CompactionServiceResult, bytes_written),
  901. OptionType::kUInt64T, OptionVerificationType::kNormal,
  902. OptionTypeFlags::kNone}},
  903. {"stats", OptionTypeInfo::Struct(
  904. "stats", &compaction_job_stats_type_info,
  905. offsetof(struct CompactionServiceResult, stats),
  906. OptionVerificationType::kNormal, OptionTypeFlags::kNone)},
  907. {"internal_stats",
  908. OptionTypeInfo::Struct(
  909. "internal_stats", &compaction_internal_stats_type_info,
  910. offsetof(struct CompactionServiceResult, internal_stats),
  911. OptionVerificationType::kNormal, OptionTypeFlags::kNone)},
  912. };
  913. Status CompactionServiceInput::Read(const std::string& data_str,
  914. CompactionServiceInput* obj) {
  915. if (data_str.size() <= sizeof(BinaryFormatVersion)) {
  916. return Status::InvalidArgument("Invalid CompactionServiceInput string");
  917. }
  918. auto format_version = DecodeFixed32(data_str.data());
  919. if (format_version == kOptionsString) {
  920. ConfigOptions cf;
  921. cf.invoke_prepare_options = false;
  922. cf.ignore_unknown_options = true;
  923. return OptionTypeInfo::ParseType(
  924. cf, data_str.substr(sizeof(BinaryFormatVersion)), cs_input_type_info,
  925. obj);
  926. } else {
  927. return Status::NotSupported(
  928. "Compaction Service Input data version not supported: " +
  929. std::to_string(format_version));
  930. }
  931. }
  932. Status CompactionServiceInput::Write(std::string* output) {
  933. char buf[sizeof(BinaryFormatVersion)];
  934. EncodeFixed32(buf, kOptionsString);
  935. output->append(buf, sizeof(BinaryFormatVersion));
  936. ConfigOptions cf;
  937. cf.invoke_prepare_options = false;
  938. return OptionTypeInfo::SerializeType(cf, cs_input_type_info, this, output);
  939. }
  940. Status CompactionServiceResult::Read(const std::string& data_str,
  941. CompactionServiceResult* obj) {
  942. if (data_str.size() <= sizeof(BinaryFormatVersion)) {
  943. return Status::InvalidArgument("Invalid CompactionServiceResult string");
  944. }
  945. auto format_version = DecodeFixed32(data_str.data());
  946. if (format_version == kOptionsString) {
  947. ConfigOptions cf;
  948. cf.invoke_prepare_options = false;
  949. cf.ignore_unknown_options = true;
  950. return OptionTypeInfo::ParseType(
  951. cf, data_str.substr(sizeof(BinaryFormatVersion)), cs_result_type_info,
  952. obj);
  953. } else {
  954. return Status::NotSupported(
  955. "Compaction Service Result data version not supported: " +
  956. std::to_string(format_version));
  957. }
  958. }
  959. Status CompactionServiceResult::Write(std::string* output) {
  960. char buf[sizeof(BinaryFormatVersion)];
  961. EncodeFixed32(buf, kOptionsString);
  962. output->append(buf, sizeof(BinaryFormatVersion));
  963. ConfigOptions cf;
  964. cf.invoke_prepare_options = false;
  965. return OptionTypeInfo::SerializeType(cf, cs_result_type_info, this, output);
  966. }
  967. #ifndef NDEBUG
  968. bool CompactionServiceResult::TEST_Equals(CompactionServiceResult* other) {
  969. std::string mismatch;
  970. return TEST_Equals(other, &mismatch);
  971. }
  972. bool CompactionServiceResult::TEST_Equals(CompactionServiceResult* other,
  973. std::string* mismatch) {
  974. ConfigOptions cf;
  975. cf.invoke_prepare_options = false;
  976. return OptionTypeInfo::TypesAreEqual(cf, cs_result_type_info, this, other,
  977. mismatch);
  978. }
  979. bool CompactionServiceInput::TEST_Equals(CompactionServiceInput* other) {
  980. std::string mismatch;
  981. return TEST_Equals(other, &mismatch);
  982. }
  983. bool CompactionServiceInput::TEST_Equals(CompactionServiceInput* other,
  984. std::string* mismatch) {
  985. ConfigOptions cf;
  986. cf.invoke_prepare_options = false;
  987. return OptionTypeInfo::TypesAreEqual(cf, cs_input_type_info, this, other,
  988. mismatch);
  989. }
  990. #endif // NDEBUG
  991. } // namespace ROCKSDB_NAMESPACE