compaction_job.h 31 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  9. #pragma once
  10. #include <atomic>
  11. #include <deque>
  12. #include <functional>
  13. #include <limits>
  14. #include <set>
  15. #include <string>
  16. #include <utility>
  17. #include <vector>
  18. #include "db/blob/blob_file_completion_callback.h"
  19. #include "db/column_family.h"
  20. #include "db/compaction/compaction_iterator.h"
  21. #include "db/compaction/compaction_outputs.h"
  22. #include "db/flush_scheduler.h"
  23. #include "db/internal_stats.h"
  24. #include "db/job_context.h"
  25. #include "db/log_writer.h"
  26. #include "db/memtable_list.h"
  27. #include "db/range_del_aggregator.h"
  28. #include "db/seqno_to_time_mapping.h"
  29. #include "db/version_edit.h"
  30. #include "db/write_controller.h"
  31. #include "db/write_thread.h"
  32. #include "logging/event_logger.h"
  33. #include "options/cf_options.h"
  34. #include "options/db_options.h"
  35. #include "port/port.h"
  36. #include "rocksdb/compaction_filter.h"
  37. #include "rocksdb/compaction_job_stats.h"
  38. #include "rocksdb/db.h"
  39. #include "rocksdb/env.h"
  40. #include "rocksdb/memtablerep.h"
  41. #include "rocksdb/transaction_log.h"
  42. #include "util/autovector.h"
  43. #include "util/stop_watch.h"
  44. #include "util/thread_local.h"
  45. namespace ROCKSDB_NAMESPACE {
// Forward declarations of types that this header only names in declarations
// (pointers, references, parameter lists).
// NOTE(review): some of these may already be fully declared by the headers
// included above — confirm before removing any.
class Arena;
class CompactionState;
class ErrorHandler;
class MemTable;
class SnapshotChecker;
class SystemClock;
class TableCache;
class Version;
class VersionEdit;
class VersionSet;
class SubcompactionState;
  57. // CompactionJob is responsible for executing the compaction. Each (manual or
  58. // automated) compaction corresponds to a CompactionJob object, and usually
  59. // goes through the stages of `Prepare()`->`Run()`->`Install()`. CompactionJob
  60. // will divide the compaction into subcompactions and execute them in parallel
  61. // if needed.
  62. //
  63. // CompactionJob has 2 main stats:
// 1. CompactionJobStats job_stats_
//    CompactionJobStats is a public data structure which is part of the
//    Compaction event listener, through which RocksDB shares the job stats
//    with the user.
//    Internally it's an aggregation of all the compaction_job_stats from each
//    `SubcompactionState`:
//                                           +------------------------+
//                                           | SubcompactionState     |
//                                           |                        |
//                                +--------->|   compaction_job_stats |
//                                |          |                        |
//                                |          +------------------------+
// +------------------------+     |
// | CompactionJob          |     |          +------------------------+
// |                        |     |          | SubcompactionState     |
// |   job_stats            +-----+          |                        |
// |                        |     +--------->|   compaction_job_stats |
// |                        |     |          |                        |
// +------------------------+     |          +------------------------+
//                                |
//                                |          +------------------------+
//                                |          | SubcompactionState     |
//                                |          |                        |
//                                +--------->+   compaction_job_stats |
//                                |          |                        |
//                                |          +------------------------+
//                                |
//                                |          +------------------------+
//                                |          |       ...              |
//                                +--------->+                        |
//                                           +------------------------+
  94. //
// 2. CompactionStatsFull internal_stats_
//    `CompactionStatsFull` holds internal stats about the compaction, which
//    are eventually sent to `ColumnFamilyData::internal_stats_` and used for
//    logging and public metrics.
//    Internally, it's an aggregation of stats_ from each `SubcompactionState`.
//    It has 2 parts: the ordinary output level stats and the proximal level
//    output stats.
//                                                +---------------------------+
//                                                | SubcompactionState        |
//                                                |                           |
//                                                | +----------------------+  |
//                                                | | CompactionOutputs    |  |
//                                                | | (normal output)      |  |
//                                            +---->|   stats_             |  |
//                                            |   | +----------------------+  |
//                                            |   |                           |
//                                            |   | +----------------------+  |
// +--------------------------------+         |   | | CompactionOutputs    |  |
// | CompactionJob                  |         |   | | (proximal_level)     |  |
// |                                |    +--------->|   stats_             |  |
// |   internal_stats_              |    |    |   | +----------------------+  |
// |    +-------------------------+ |    |    |   |                           |
// |    |output_level_stats       |------|----+   +---------------------------+
// |    +-------------------------+ |    |    |
// |                                |    |    |
// |    +-------------------------+ |    |    |   +---------------------------+
// |    |proximal_level_stats     |------+    |   | SubcompactionState        |
// |    +-------------------------+ |    |    |   |                           |
// |                                |    |    |   | +----------------------+  |
// |                                |    |    |   | | CompactionOutputs    |  |
// +--------------------------------+    |    |   | | (normal output)      |  |
//                                       |    +---->|   stats_             |  |
//                                       |        | +----------------------+  |
//                                       |        |                           |
//                                       |        | +----------------------+  |
//                                       |        | | CompactionOutputs    |  |
//                                       |        | | (proximal_level)     |  |
//                                       +--------->|   stats_             |  |
//                                                | +----------------------+  |
//                                                |                           |
//                                                +---------------------------+
class CompactionJob {
 public:
  // Creates a job for `compaction`. The trailing parameters starting at
  // `db_id` are optional and have defaults.
  //
  // NOTE(review): `db_options_`, `dbname_` and `manual_compaction_canceled_`
  // are reference members, so presumably the corresponding constructor
  // arguments must outlive this object — confirm against the constructor
  // definition.
  CompactionJob(int job_id, Compaction* compaction,
                const ImmutableDBOptions& db_options,
                const MutableDBOptions& mutable_db_options,
                const FileOptions& file_options, VersionSet* versions,
                const std::atomic<bool>* shutting_down, LogBuffer* log_buffer,
                FSDirectory* db_directory, FSDirectory* output_directory,
                FSDirectory* blob_output_directory, Statistics* stats,
                InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler,
                JobContext* job_context, std::shared_ptr<Cache> table_cache,
                EventLogger* event_logger, bool paranoid_file_checks,
                bool measure_io_stats, const std::string& dbname,
                CompactionJobStats* compaction_job_stats,
                Env::Priority thread_pri,
                const std::shared_ptr<IOTracer>& io_tracer,
                const std::atomic<bool>& manual_compaction_canceled,
                const std::string& db_id = "",
                const std::string& db_session_id = "",
                std::string full_history_ts_low = "", std::string trim_ts = "",
                BlobFileCompletionCallback* blob_callback = nullptr,
                int* bg_compaction_scheduled = nullptr,
                int* bg_bottom_compaction_scheduled = nullptr);

  // Virtual because the class is designed to be derived from (see the
  // virtual members RecordCompactionIOStats() and GetTableFileName()).
  virtual ~CompactionJob();

  // No copy/move. Declaring the copy assignment operator also suppresses the
  // implicit move assignment operator.
  CompactionJob(CompactionJob&& job) = delete;
  CompactionJob(const CompactionJob& job) = delete;
  CompactionJob& operator=(const CompactionJob& job) = delete;

  // REQUIRED: mutex held
  // Prepare for the compaction by setting up boundaries for each subcompaction
  // and organizing seqno <-> time info. `known_single_subcompact` holds a
  // value if we already have a known single subcompaction, with optional key
  // bounds (currently for executing a remote compaction).
  //
  // @param compaction_progress Previously saved compaction progress
  // to resume from. If empty, compaction starts fresh from the
  // beginning.
  //
  // @param compaction_progress_writer Writer for persisting
  // subcompaction progress periodically during compaction
  // execution. If nullptr, progress tracking is disabled and compaction
  // cannot be resumed later.
  void Prepare(
      std::optional<std::pair<std::optional<Slice>, std::optional<Slice>>>
          known_single_subcompact,
      const CompactionProgress& compaction_progress = CompactionProgress{},
      log::Writer* compaction_progress_writer = nullptr);

  // REQUIRED: mutex not held
  // Launch threads for each subcompaction and wait for them to finish. After
  // that, verify table is usable and finally do bookkeeping to unify
  // subcompaction results
  Status Run();

  // REQUIRED: mutex held
  // Add compaction input/output to the current version
  // Releases compaction file through Compaction::ReleaseCompactionFiles().
  // Sets *compaction_released to true if compaction is released.
  Status Install(bool* compaction_released);

  // Return the IO status (a copy of `io_status_`).
  IOStatus io_status() const { return io_status_; }

 protected:
  void UpdateCompactionJobOutputStatsFromInternalStats(
      const Status& status,
      const InternalStats::CompactionStatsFull& internal_stats) const;

  void LogCompaction();

  // Virtual so that subclasses can replace how IO stats are recorded.
  virtual void RecordCompactionIOStats();

  void CleanupCompaction();

  // Iterate through input and compact the kv-pairs.
  void ProcessKeyValueCompaction(SubcompactionState* sub_compact);

  CompactionState* compact_;
  // Internal per-level stats; see item 2 in the class comment.
  InternalStats::CompactionStatsFull internal_stats_;
  const ImmutableDBOptions& db_options_;
  // Held by value (not by reference), unlike `db_options_`.
  const MutableDBOptions mutable_db_options_copy_;
  LogBuffer* log_buffer_;
  FSDirectory* output_directory_;
  Statistics* stats_;
  // Is this compaction creating a file in the bottom most level?
  bool bottommost_level_;

  Env::WriteLifeTimeHint write_hint_;

  IOStatus io_status_;

  // Public job stats; see item 1 in the class comment.
  CompactionJobStats* job_stats_;

 private:
  friend class CompactionJobTestBase;

  // Collect the following stats from input files and table properties
  //  - num_input_files_in_non_output_levels
  //  - num_input_files_in_output_level
  //  - bytes_read_non_output_levels
  //  - bytes_read_output_level
  //  - num_input_records
  //  - bytes_read_blob
  //  - num_dropped_records
  // and set them in internal_stats_.output_level_stats
  //
  // @param num_input_range_del if non-null, will be set to the number of range
  // deletion entries in this compaction input.
  //
  // Returns true iff internal_stats_.output_level_stats.num_input_records and
  // num_input_range_del are calculated successfully.
  //
  // This should be called only once for compactions (not per subcompaction)
  bool UpdateInternalStatsFromInputFiles(
      uint64_t* num_input_range_del = nullptr);

  void UpdateCompactionJobInputStatsFromInternalStats(
      const InternalStats::CompactionStatsFull& internal_stats,
      uint64_t num_input_range_del) const;

  Status VerifyInputRecordCount(uint64_t num_input_range_del) const;
  Status VerifyOutputRecordCount() const;

  // Generates a histogram representing potential divisions of key ranges from
  // the input. It adds the starting and/or ending keys of certain input files
  // to the working set and then finds the approximate size of data in between
  // each consecutive pair of slices. Then it divides these ranges into
  // consecutive groups such that each group has a similar size.
  void GenSubcompactionBoundaries();

  void MaybeAssignCompactionProgressAndWriter(
      const CompactionProgress& compaction_progress,
      log::Writer* compaction_progress_writer);

  // Get the number of planned subcompactions based on max_subcompactions and
  // extra reserved resources
  uint64_t GetSubcompactionsLimit();

  // Additional reserved threads are reserved and the number is stored in
  // extra_num_subcompaction_threads_reserved_. For now, this happens only if
  // the compaction priority is round-robin and max_subcompactions is not
  // sufficient (extra resources may be needed)
  void AcquireSubcompactionResources(int num_extra_required_subcompactions);

  // Additional threads may be reserved during AcquireSubcompactionResources()
  // if num_actual_subcompactions is less than num_planned_subcompactions.
  // Additional threads will be released and the bg_compaction_scheduled_ or
  // bg_bottom_compaction_scheduled_ will be updated if they are used.
  // DB Mutex lock is required.
  void ShrinkSubcompactionResources(uint64_t num_extra_resources);

  // Release all reserved threads and update the compaction limits.
  void ReleaseSubcompactionResources();

  void InitializeCompactionRun();
  void RunSubcompactions();
  void UpdateTimingStats(uint64_t start_micros);
  void RemoveEmptyOutputs();
  bool HasNewBlobFiles() const;
  Status CollectSubcompactionErrors();
  Status SyncOutputDirectories();
  Status VerifyOutputFiles();
  void SetOutputTableProperties();
  // Aggregates subcompaction output stats into the internal stats, and
  // aggregates each subcompaction's compaction job stats into the enclosing
  // compaction job stats.
  void AggregateSubcompactionOutputAndJobStats();
  Status VerifyCompactionRecordCounts(bool stats_built_from_input_table_prop,
                                      uint64_t num_input_range_del);
  void FinalizeCompactionRun(const Status& status,
                             bool stats_built_from_input_table_prop,
                             uint64_t num_input_range_del);

  CompactionServiceJobStatus ProcessKeyValueCompactionWithCompactionService(
      SubcompactionState* sub_compact);

  // Previous perf level and IO/CPU counters, all zero-initialized except the
  // perf level. Produced by InitializeIOStats() and consumed by
  // FinalizeSubcompactionJobStats() / FinalizeSubcompaction().
  struct CompactionIOStatsSnapshot {
    PerfLevel prev_perf_level = PerfLevel::kEnableTime;
    uint64_t prev_write_nanos = 0;
    uint64_t prev_fsync_nanos = 0;
    uint64_t prev_range_sync_nanos = 0;
    uint64_t prev_prepare_write_nanos = 0;
    uint64_t prev_cpu_write_nanos = 0;
    uint64_t prev_cpu_read_nanos = 0;
  };

  // Key boundaries of one subcompaction in several representations. Only
  // `start` and `end` are set by the constructor; the other members are left
  // default-constructed for later code to fill in.
  struct SubcompactionKeyBoundaries {
    // Optional boundaries exactly as passed to the constructor.
    const std::optional<const Slice> start;
    const std::optional<const Slice> end;

    // Boundaries without timestamps for read options
    std::optional<Slice> start_without_ts;
    std::optional<Slice> end_without_ts;

    // Timestamp management
    // kMaxTs is a run of 0xff bytes.
    static constexpr char kMaxTs[] =
        "\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff";
    std::string max_ts;
    Slice ts_slice;

    // Internal key boundaries
    IterKey start_ikey;
    IterKey end_ikey;
    Slice start_internal_key;
    Slice end_internal_key;

    // User key boundaries
    Slice start_user_key;
    Slice end_user_key;

    SubcompactionKeyBoundaries(std::optional<const Slice> start_boundary,
                               std::optional<const Slice> end_boundary)
        : start(start_boundary), end(end_boundary) {}
  };

  // Owning handles for the iterators created while setting up a
  // subcompaction's input (see CreateInputIterator()).
  struct SubcompactionInternalIterators {
    std::unique_ptr<InternalIterator> raw_input;
    std::unique_ptr<InternalIterator> clip;
    std::unique_ptr<InternalIterator> blob_counter;
    std::unique_ptr<InternalIterator> trim_history_iter;
  };

  // Blob file paths plus the owning handle of the blob file builder
  // (see CreateBlobFileBuilder()).
  struct BlobFileResources {
    std::vector<std::string> blob_file_paths;
    std::unique_ptr<BlobFileBuilder> blob_file_builder;
  };

  bool ShouldUseLocalCompaction(SubcompactionState* sub_compact);
  CompactionIOStatsSnapshot InitializeIOStats();
  Status SetupAndValidateCompactionFilter(
      SubcompactionState* sub_compact,
      const CompactionFilter* configured_compaction_filter,
      const CompactionFilter*& compaction_filter,
      std::unique_ptr<CompactionFilter>& compaction_filter_from_factory);
  void InitializeReadOptionsAndBoundaries(
      size_t ts_sz, ReadOptions& read_options,
      SubcompactionKeyBoundaries& boundaries);
  InternalIterator* CreateInputIterator(
      SubcompactionState* sub_compact, ColumnFamilyData* cfd,
      SubcompactionInternalIterators& iterators,
      SubcompactionKeyBoundaries& boundaries, ReadOptions& read_options);
  void CreateBlobFileBuilder(SubcompactionState* sub_compact,
                             ColumnFamilyData* cfd,
                             BlobFileResources& blob_resources,
                             const WriteOptions& write_options);
  std::unique_ptr<CompactionIterator> CreateCompactionIterator(
      SubcompactionState* sub_compact, ColumnFamilyData* cfd,
      InternalIterator* input_iter, const CompactionFilter* compaction_filter,
      MergeHelper& merge, BlobFileResources& blob_resources,
      const WriteOptions& write_options);
  std::pair<CompactionFileOpenFunc, CompactionFileCloseFunc> CreateFileHandlers(
      SubcompactionState* sub_compact, SubcompactionKeyBoundaries& boundaries);
  Status ProcessKeyValue(SubcompactionState* sub_compact, ColumnFamilyData* cfd,
                         CompactionIterator* c_iter,
                         const CompactionFileOpenFunc& open_file_func,
                         const CompactionFileCloseFunc& close_file_func,
                         uint64_t& prev_cpu_micros);
  void UpdateSubcompactionJobStatsIncrementally(
      CompactionIterator* c_iter, CompactionJobStats* compaction_job_stats,
      uint64_t cur_cpu_micros, uint64_t& prev_cpu_micros);
  void FinalizeSubcompactionJobStats(SubcompactionState* sub_compact,
                                     CompactionIterator* c_iter,
                                     uint64_t start_cpu_micros,
                                     uint64_t prev_cpu_micros,
                                     const CompactionIOStatsSnapshot& io_stats);
  Status FinalizeProcessKeyValueStatus(ColumnFamilyData* cfd,
                                       InternalIterator* input_iter,
                                       CompactionIterator* c_iter,
                                       Status status);
  Status CleanupCompactionFiles(SubcompactionState* sub_compact, Status status,
                                const CompactionFileOpenFunc& open_file_func,
                                const CompactionFileCloseFunc& close_file_func);
  Status FinalizeBlobFiles(SubcompactionState* sub_compact,
                           BlobFileBuilder* blob_file_builder, Status status);
  void FinalizeSubcompaction(SubcompactionState* sub_compact, Status status,
                             const CompactionFileOpenFunc& open_file_func,
                             const CompactionFileCloseFunc& close_file_func,
                             BlobFileBuilder* blob_file_builder,
                             CompactionIterator* c_iter,
                             InternalIterator* input_iter,
                             uint64_t start_cpu_micros,
                             uint64_t prev_cpu_micros,
                             const CompactionIOStatsSnapshot& io_stats);

  // Update the thread status for starting a compaction.
  void ReportStartedCompaction(Compaction* compaction);

  Status FinishCompactionOutputFile(
      const Status& input_status,
      const ParsedInternalKey& prev_table_last_internal_key,
      const Slice& next_table_min_key, const Slice* comp_start_user_key,
      const Slice* comp_end_user_key, const CompactionIterator* c_iter,
      SubcompactionState* sub_compact, CompactionOutputs& outputs);
  Status InstallCompactionResults(bool* compaction_released);
  Status OpenCompactionOutputFile(SubcompactionState* sub_compact,
                                  CompactionOutputs& outputs);

  void RecordDroppedKeys(const CompactionIterationStats& c_iter_stats,
                         CompactionJobStats* compaction_job_stats = nullptr);

  void NotifyOnSubcompactionBegin(SubcompactionState* sub_compact);

  void NotifyOnSubcompactionCompleted(SubcompactionState* sub_compact);

  uint32_t job_id_;

  // DBImpl state
  // Reference member: does not own the string.
  const std::string& dbname_;
  const std::string db_id_;
  const std::string db_session_id_;
  const FileOptions file_options_;

  Env* env_;
  std::shared_ptr<IOTracer> io_tracer_;
  FileSystemPtr fs_;
  // File options optimized for compaction table reads
  FileOptions file_options_for_read_;
  VersionSet* versions_;
  const std::atomic<bool>* shutting_down_;
  // Reference member: does not own the flag.
  const std::atomic<bool>& manual_compaction_canceled_;
  FSDirectory* db_directory_;
  FSDirectory* blob_output_directory_;
  InstrumentedMutex* db_mutex_;
  ErrorHandler* db_error_handler_;

  SequenceNumber earliest_snapshot_;
  JobContext* job_context_;

  std::shared_ptr<Cache> table_cache_;

  EventLogger* event_logger_;

  bool paranoid_file_checks_;
  bool measure_io_stats_;
  // Stores the keys that designate the boundaries for each subcompaction
  // (held as owned strings)
  std::vector<std::string> boundaries_;
  Env::Priority thread_pri_;
  std::string full_history_ts_low_;
  std::string trim_ts_;
  BlobFileCompletionCallback* blob_callback_;

  uint64_t GetCompactionId(SubcompactionState* sub_compact) const;
  // Stores the number of reserved threads in shared env_ for the number of
  // extra subcompaction in kRoundRobin compaction priority
  int extra_num_subcompaction_threads_reserved_;

  // Stores the pointer to bg_compaction_scheduled_,
  // bg_bottom_compaction_scheduled_ in DBImpl. Mutex is required when accessing
  // or updating it.
  int* bg_compaction_scheduled_;
  int* bg_bottom_compaction_scheduled_;

  // Stores the sequence number to time mapping gathered from all input files
  // it also collects the smallest_seqno -> oldest_ancester_time from the SST.
  SeqnoToTimeMapping seqno_to_time_mapping_;

  // Max seqno that can be zeroed out in last level, including for preserving
  // write times.
  SequenceNumber preserve_seqno_after_ = kMaxSequenceNumber;

  // Minimal sequence number to preclude the data from the last level. If the
  // key has bigger (newer) sequence number than this, it will be precluded from
  // the last level (output to proximal level).
  SequenceNumber proximal_after_seqno_ = kMaxSequenceNumber;

  // Options File Number used for Remote Compaction
  // Setting this requires DBMutex.
  uint64_t options_file_number_ = 0;

  // Writer for persisting compaction progress during compaction
  log::Writer* compaction_progress_writer_ = nullptr;

  // Get the name of the table file being output to, which should also be in
  // `output_directory_`. Virtual so that subclasses can redirect the output.
  virtual std::string GetTableFileName(uint64_t file_number);
  // The rate limiter priority (io_priority) is determined dynamically here.
  // The Compaction Read and Write priorities are the same for different
  // scenarios, such as write stalled.
  Env::IOPriority GetRateLimiterPriority();

  Status MaybeResumeSubcompactionProgressOnInputIterator(
      SubcompactionState* sub_compact, InternalIterator* input_iter);

  Status ReadOutputFilesTableProperties(
      const autovector<FileMetaData>& temporary_output_file_allocation,
      const ReadOptions& read_options,
      std::vector<std::shared_ptr<const TableProperties>>&
          output_files_table_properties,
      bool is_proximal_level = false);

  Status ReadTablePropertiesDirectly(
      const ImmutableOptions& ioptions, const MutableCFOptions& moptions,
      const FileMetaData* file_meta, const ReadOptions& read_options,
      std::shared_ptr<const TableProperties>* tp);

  void RestoreCompactionOutputs(
      const ColumnFamilyData* cfd,
      const std::vector<std::shared_ptr<const TableProperties>>&
          output_files_table_properties,
      SubcompactionProgressPerLevel& subcompaction_progress_per_level,
      CompactionOutputs* outputs_to_restore);

  bool ShouldUpdateSubcompactionProgress(
      const SubcompactionState* sub_compact, const CompactionIterator* c_iter,
      const ParsedInternalKey& prev_table_last_internal_key,
      const Slice& next_table_min_internal_key, const FileMetaData* meta) const;

  void UpdateSubcompactionProgress(const CompactionIterator* c_iter,
                                   const Slice next_table_min_key,
                                   SubcompactionState* sub_compact);

  Status PersistSubcompactionProgress(SubcompactionState* sub_compact);

  void UpdateSubcompactionProgressPerLevel(
      SubcompactionState* sub_compact, bool is_proximal_level,
      SubcompactionProgress& subcompaction_progress);
};
// CompactionServiceInput is used to pass compaction information between two
// db instances. It contains the information needed to do a compaction. It
// doesn't contain the LSM tree information, which is passed through the
// MANIFEST file.
struct CompactionServiceInput {
  std::string cf_name;

  std::vector<SequenceNumber> snapshots;

  // SST files for compaction. The list should already be expanded to include
  // all the files needed for this compaction, for both input level files and
  // output level files.
  std::vector<std::string> input_files;
  int output_level = 0;

  // db_id is used to generate unique id of sst on the remote compactor
  std::string db_id;

  // Information for subcompaction.
  // NOTE(review): presumably `begin` / `end` are only meaningful when the
  // matching `has_begin` / `has_end` flag is true — confirm against the
  // serialization code.
  bool has_begin = false;
  std::string begin;
  bool has_end = false;
  std::string end;

  uint64_t options_file_number = 0;

  // serialization interface to read and write the object
  static Status Read(const std::string& data_str, CompactionServiceInput* obj);
  Status Write(std::string* output);

#ifndef NDEBUG
  // Debug-build-only comparison helpers; the second overload takes an extra
  // `mismatch` output string.
  bool TEST_Equals(CompactionServiceInput* other);
  bool TEST_Equals(CompactionServiceInput* other, std::string* mismatch);
#endif  // NDEBUG
};
  519. // CompactionServiceOutputFile is the metadata for the output SST file
  520. struct CompactionServiceOutputFile {
  521. std::string file_name;
  522. uint64_t file_size{};
  523. SequenceNumber smallest_seqno{};
  524. SequenceNumber largest_seqno{};
  525. std::string smallest_internal_key;
  526. std::string largest_internal_key;
  527. uint64_t oldest_ancester_time = kUnknownOldestAncesterTime;
  528. uint64_t file_creation_time = kUnknownFileCreationTime;
  529. uint64_t epoch_number = kUnknownEpochNumber;
  530. std::string file_checksum = kUnknownFileChecksum;
  531. std::string file_checksum_func_name = kUnknownFileChecksumFuncName;
  532. uint64_t paranoid_hash{};
  533. bool marked_for_compaction;
  534. UniqueId64x2 unique_id{};
  535. TableProperties table_properties;
  536. bool is_proximal_level_output;
  537. Temperature file_temperature = Temperature::kUnknown;
  538. CompactionServiceOutputFile() = default;
  539. CompactionServiceOutputFile(
  540. const std::string& name, uint64_t size, SequenceNumber smallest,
  541. SequenceNumber largest, std::string _smallest_internal_key,
  542. std::string _largest_internal_key, uint64_t _oldest_ancester_time,
  543. uint64_t _file_creation_time, uint64_t _epoch_number,
  544. const std::string& _file_checksum,
  545. const std::string& _file_checksum_func_name, uint64_t _paranoid_hash,
  546. bool _marked_for_compaction, UniqueId64x2 _unique_id,
  547. const TableProperties& _table_properties, bool _is_proximal_level_output,
  548. Temperature _file_temperature)
  549. : file_name(name),
  550. file_size(size),
  551. smallest_seqno(smallest),
  552. largest_seqno(largest),
  553. smallest_internal_key(std::move(_smallest_internal_key)),
  554. largest_internal_key(std::move(_largest_internal_key)),
  555. oldest_ancester_time(_oldest_ancester_time),
  556. file_creation_time(_file_creation_time),
  557. epoch_number(_epoch_number),
  558. file_checksum(_file_checksum),
  559. file_checksum_func_name(_file_checksum_func_name),
  560. paranoid_hash(_paranoid_hash),
  561. marked_for_compaction(_marked_for_compaction),
  562. unique_id(std::move(_unique_id)),
  563. table_properties(_table_properties),
  564. is_proximal_level_output(_is_proximal_level_output),
  565. file_temperature(_file_temperature) {}
  566. };
// CompactionServiceResult contains the compaction result from a different db
// instance. With this information, the primary db instance with write
// permission is able to install the result to the DB.
struct CompactionServiceResult {
  Status status;
  std::vector<CompactionServiceOutputFile> output_files;
  int output_level = 0;

  // location of the output files
  std::string output_path;

  uint64_t bytes_read = 0;
  uint64_t bytes_written = 0;

  // Job-level Compaction Stats.
  //
  // NOTE: Job level stats cannot be rebuilt from scratch by simply aggregating
  // per-level stats due to some fields populated directly during compaction
  // (e.g. RecordDroppedKeys()). This is why we need both job-level stats and
  // per-level in the serialized result. If rebuilding job-level stats from
  // per-level stats becomes possible in the future, consider deprecating this
  // field.
  CompactionJobStats stats;

  // Per-level Compaction Stats for both output_level_stats and
  // proximal_level_stats
  InternalStats::CompactionStatsFull internal_stats;

  // serialization interface to read and write the object
  static Status Read(const std::string& data_str, CompactionServiceResult* obj);
  Status Write(std::string* output);

#ifndef NDEBUG
  // Debug-build-only comparison helpers; the second overload takes an extra
  // `mismatch` output string.
  bool TEST_Equals(CompactionServiceResult* other);
  bool TEST_Equals(CompactionServiceResult* other, std::string* mismatch);
#endif  // NDEBUG
};
// CompactionServiceCompactionJob is a read-only compaction job. It takes
// input information from `compaction_service_input` and puts result
// information in `compaction_service_result`; the SST files are generated to
// `output_path`.
//
// It inherits privately from CompactionJob, so only the members re-declared in
// the public section below are reachable by callers.
class CompactionServiceCompactionJob : private CompactionJob {
 public:
  CompactionServiceCompactionJob(
      int job_id, Compaction* compaction, const ImmutableDBOptions& db_options,
      const MutableDBOptions& mutable_db_options,
      const FileOptions& file_options, VersionSet* versions,
      const std::atomic<bool>* shutting_down, LogBuffer* log_buffer,
      FSDirectory* output_directory, Statistics* stats,
      InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler,
      JobContext* job_context, std::shared_ptr<Cache> table_cache,
      EventLogger* event_logger, const std::string& dbname,
      const std::shared_ptr<IOTracer>& io_tracer,
      const std::atomic<bool>& manual_compaction_canceled,
      const std::string& db_id, const std::string& db_session_id,
      std::string output_path,
      const CompactionServiceInput& compaction_service_input,
      CompactionServiceResult* compaction_service_result);

  // REQUIRED: mutex held
  // Like CompactionJob::Prepare(), but without the `known_single_subcompact`
  // parameter.
  void Prepare(
      const CompactionProgress& compaction_progress = CompactionProgress{},
      log::Writer* compaction_progress_writer = nullptr);

  // Run the compaction in current thread and return the result
  Status Run();

  void CleanupCompaction();

  // Re-exposes the base-class accessor, which private inheritance would
  // otherwise hide from callers.
  IOStatus io_status() const { return CompactionJob::io_status(); }

 protected:
  void RecordCompactionIOStats() override;

 private:
  // Get table file name in output_path
  std::string GetTableFileName(uint64_t file_number) override;

  // Specifies the compaction output path; otherwise the default DB path is
  // used.
  const std::string output_path_;

  // Compaction job input. Reference member: does not own the input.
  const CompactionServiceInput& compaction_input_;

  // Compaction job result
  CompactionServiceResult* compaction_result_;
};
  639. } // namespace ROCKSDB_NAMESPACE