compaction_job.cc
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include <algorithm>
#include <cinttypes>
#include <functional>
#include <list>
#include <memory>
#include <random>
#include <set>
#include <thread>
#include <utility>
#include <vector>

#include "db/builder.h"
#include "db/compaction/compaction_job.h"
#include "db/db_impl/db_impl.h"
#include "db/db_iter.h"
#include "db/dbformat.h"
#include "db/error_handler.h"
#include "db/event_helpers.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/memtable.h"
#include "db/memtable_list.h"
#include "db/merge_context.h"
#include "db/merge_helper.h"
#include "db/range_del_aggregator.h"
#include "db/version_set.h"
#include "file/filename.h"
#include "file/read_write_util.h"
#include "file/sst_file_manager_impl.h"
#include "file/writable_file_writer.h"
#include "logging/log_buffer.h"
#include "logging/logging.h"
#include "monitoring/iostats_context_imp.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/thread_status_util.h"
#include "port/port.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/statistics.h"
#include "rocksdb/status.h"
#include "rocksdb/table.h"
#include "table/block_based/block.h"
#include "table/block_based/block_based_table_factory.h"
#include "table/merging_iterator.h"
#include "table/table_builder.h"
#include "test_util/sync_point.h"
#include "util/coding.h"
#include "util/mutexlock.h"
#include "util/random.h"
#include "util/stop_watch.h"
#include "util/string_util.h"

namespace ROCKSDB_NAMESPACE {

const char* GetCompactionReasonString(CompactionReason compaction_reason) {
  switch (compaction_reason) {
    case CompactionReason::kUnknown:
      return "Unknown";
    case CompactionReason::kLevelL0FilesNum:
      return "LevelL0FilesNum";
    case CompactionReason::kLevelMaxLevelSize:
      return "LevelMaxLevelSize";
    case CompactionReason::kUniversalSizeAmplification:
      return "UniversalSizeAmplification";
    case CompactionReason::kUniversalSizeRatio:
      return "UniversalSizeRatio";
    case CompactionReason::kUniversalSortedRunNum:
      return "UniversalSortedRunNum";
    case CompactionReason::kFIFOMaxSize:
      return "FIFOMaxSize";
    case CompactionReason::kFIFOReduceNumFiles:
      return "FIFOReduceNumFiles";
    case CompactionReason::kFIFOTtl:
      return "FIFOTtl";
    case CompactionReason::kManualCompaction:
      return "ManualCompaction";
    case CompactionReason::kFilesMarkedForCompaction:
      return "FilesMarkedForCompaction";
    case CompactionReason::kBottommostFiles:
      return "BottommostFiles";
    case CompactionReason::kTtl:
      return "Ttl";
    case CompactionReason::kFlush:
      return "Flush";
    case CompactionReason::kExternalSstIngestion:
      return "ExternalSstIngestion";
    case CompactionReason::kPeriodicCompaction:
      return "PeriodicCompaction";
    case CompactionReason::kNumOfReasons:
      // fall through
    default:
      assert(false);
      return "Invalid";
  }
}

// Maintains state for each sub-compaction
struct CompactionJob::SubcompactionState {
  const Compaction* compaction;
  std::unique_ptr<CompactionIterator> c_iter;

  // The boundaries of the key-range this compaction is interested in. No two
  // subcompactions may have overlapping key-ranges.
  // 'start' is inclusive, 'end' is exclusive, and nullptr means unbounded
  Slice *start, *end;

  // The return status of this subcompaction
  Status status;

  // Files produced by this subcompaction
  struct Output {
    FileMetaData meta;
    bool finished;
    std::shared_ptr<const TableProperties> table_properties;
  };

  // State kept for output being generated
  std::vector<Output> outputs;
  std::unique_ptr<WritableFileWriter> outfile;
  std::unique_ptr<TableBuilder> builder;

  Output* current_output() {
    if (outputs.empty()) {
      // This subcompaction's output could be empty if the compaction was
      // aborted before this subcompaction had a chance to generate any output
      // files. When subcompactions are executed sequentially this is more
      // likely, and the later subcompactions are particularly likely to be
      // empty. Once they are run in parallel, however, it should be much
      // rarer.
      return nullptr;
    } else {
      return &outputs.back();
    }
  }

  uint64_t current_output_file_size;

  // State during the subcompaction
  uint64_t total_bytes;
  uint64_t num_output_records;
  CompactionJobStats compaction_job_stats;
  uint64_t approx_size;
  // An index used to speed up ShouldStopBefore().
  size_t grandparent_index = 0;
  // The number of bytes overlapping between the current output and
  // grandparent files used in ShouldStopBefore().
  uint64_t overlapped_bytes = 0;
  // A flag indicating whether a key has been seen in ShouldStopBefore().
  bool seen_key = false;

  SubcompactionState(Compaction* c, Slice* _start, Slice* _end,
                     uint64_t size = 0)
      : compaction(c),
        start(_start),
        end(_end),
        outfile(nullptr),
        builder(nullptr),
        current_output_file_size(0),
        total_bytes(0),
        num_output_records(0),
        approx_size(size),
        grandparent_index(0),
        overlapped_bytes(0),
        seen_key(false) {
    assert(compaction != nullptr);
  }

  SubcompactionState(SubcompactionState&& o) { *this = std::move(o); }

  SubcompactionState& operator=(SubcompactionState&& o) {
    compaction = std::move(o.compaction);
    start = std::move(o.start);
    end = std::move(o.end);
    status = std::move(o.status);
    outputs = std::move(o.outputs);
    outfile = std::move(o.outfile);
    builder = std::move(o.builder);
    current_output_file_size = std::move(o.current_output_file_size);
    total_bytes = std::move(o.total_bytes);
    num_output_records = std::move(o.num_output_records);
    compaction_job_stats = std::move(o.compaction_job_stats);
    approx_size = std::move(o.approx_size);
    grandparent_index = std::move(o.grandparent_index);
    overlapped_bytes = std::move(o.overlapped_bytes);
    seen_key = std::move(o.seen_key);
    return *this;
  }

  // Because member std::unique_ptrs do not have these.
  SubcompactionState(const SubcompactionState&) = delete;
  SubcompactionState& operator=(const SubcompactionState&) = delete;

  // Returns true iff we should stop building the current output
  // before processing "internal_key".
  bool ShouldStopBefore(const Slice& internal_key, uint64_t curr_file_size) {
    const InternalKeyComparator* icmp =
        &compaction->column_family_data()->internal_comparator();
    const std::vector<FileMetaData*>& grandparents = compaction->grandparents();

    // Scan to find earliest grandparent file that contains key.
    while (grandparent_index < grandparents.size() &&
           icmp->Compare(internal_key,
                         grandparents[grandparent_index]->largest.Encode()) >
               0) {
      if (seen_key) {
        overlapped_bytes += grandparents[grandparent_index]->fd.GetFileSize();
      }
      assert(grandparent_index + 1 >= grandparents.size() ||
             icmp->Compare(
                 grandparents[grandparent_index]->largest.Encode(),
                 grandparents[grandparent_index + 1]->smallest.Encode()) <= 0);
      grandparent_index++;
    }
    seen_key = true;

    if (overlapped_bytes + curr_file_size >
        compaction->max_compaction_bytes()) {
      // Too much overlap for current output; start new output
      overlapped_bytes = 0;
      return true;
    }

    return false;
  }
};

// Maintains state for the entire compaction
struct CompactionJob::CompactionState {
  Compaction* const compaction;

  // REQUIRED: subcompaction states are stored in order of increasing
  // key-range
  std::vector<CompactionJob::SubcompactionState> sub_compact_states;
  Status status;

  uint64_t total_bytes;
  uint64_t num_output_records;

  explicit CompactionState(Compaction* c)
      : compaction(c),
        total_bytes(0),
        num_output_records(0) {}

  size_t NumOutputFiles() {
    size_t total = 0;
    for (auto& s : sub_compact_states) {
      total += s.outputs.size();
    }
    return total;
  }

  Slice SmallestUserKey() {
    for (const auto& sub_compact_state : sub_compact_states) {
      if (!sub_compact_state.outputs.empty() &&
          sub_compact_state.outputs[0].finished) {
        return sub_compact_state.outputs[0].meta.smallest.user_key();
      }
    }
    // If there is no finished output, return an empty slice.
    return Slice(nullptr, 0);
  }

  Slice LargestUserKey() {
    for (auto it = sub_compact_states.rbegin(); it < sub_compact_states.rend();
         ++it) {
      if (!it->outputs.empty() && it->current_output()->finished) {
        assert(it->current_output() != nullptr);
        return it->current_output()->meta.largest.user_key();
      }
    }
    // If there is no finished output, return an empty slice.
    return Slice(nullptr, 0);
  }
};
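
// Folds the per-subcompaction byte and record counters into the job-wide
// totals, and, if per-job stats are being collected, adds each
// subcompaction's CompactionJobStats into compaction_job_stats_.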
void CompactionJob::AggregateStatistics() {
  for (SubcompactionState& sc : compact_->sub_compact_states) {
    compact_->total_bytes += sc.total_bytes;
    compact_->num_output_records += sc.num_output_records;
  }
  if (compaction_job_stats_) {
    for (SubcompactionState& sc : compact_->sub_compact_states) {
      compaction_job_stats_->Add(sc.compaction_job_stats);
    }
  }
}

CompactionJob::CompactionJob(
    int job_id, Compaction* compaction, const ImmutableDBOptions& db_options,
    const FileOptions& file_options, VersionSet* versions,
    const std::atomic<bool>* shutting_down,
    const SequenceNumber preserve_deletes_seqnum, LogBuffer* log_buffer,
    Directory* db_directory, Directory* output_directory, Statistics* stats,
    InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler,
    std::vector<SequenceNumber> existing_snapshots,
    SequenceNumber earliest_write_conflict_snapshot,
    const SnapshotChecker* snapshot_checker, std::shared_ptr<Cache> table_cache,
    EventLogger* event_logger, bool paranoid_file_checks, bool measure_io_stats,
    const std::string& dbname, CompactionJobStats* compaction_job_stats,
    Env::Priority thread_pri, const std::atomic<bool>* manual_compaction_paused)
    : job_id_(job_id),
      compact_(new CompactionState(compaction)),
      compaction_job_stats_(compaction_job_stats),
      compaction_stats_(compaction->compaction_reason(), 1),
      dbname_(dbname),
      db_options_(db_options),
      file_options_(file_options),
      env_(db_options.env),
      fs_(db_options.fs.get()),
      file_options_for_read_(
          fs_->OptimizeForCompactionTableRead(file_options, db_options_)),
      versions_(versions),
      shutting_down_(shutting_down),
      manual_compaction_paused_(manual_compaction_paused),
      preserve_deletes_seqnum_(preserve_deletes_seqnum),
      log_buffer_(log_buffer),
      db_directory_(db_directory),
      output_directory_(output_directory),
      stats_(stats),
      db_mutex_(db_mutex),
      db_error_handler_(db_error_handler),
      existing_snapshots_(std::move(existing_snapshots)),
      earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
      snapshot_checker_(snapshot_checker),
      table_cache_(std::move(table_cache)),
      event_logger_(event_logger),
      bottommost_level_(false),
      paranoid_file_checks_(paranoid_file_checks),
      measure_io_stats_(measure_io_stats),
      write_hint_(Env::WLTH_NOT_SET),
      thread_pri_(thread_pri) {
  assert(log_buffer_ != nullptr);
  const auto* cfd = compact_->compaction->column_family_data();
  ThreadStatusUtil::SetColumnFamily(cfd, cfd->ioptions()->env,
                                    db_options_.enable_thread_tracking);
  ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION);
  ReportStartedCompaction(compaction);
}

CompactionJob::~CompactionJob() {
  assert(compact_ == nullptr);
  ThreadStatusUtil::ResetThreadStatus();
}

void CompactionJob::ReportStartedCompaction(Compaction* compaction) {
  const auto* cfd = compact_->compaction->column_family_data();
  ThreadStatusUtil::SetColumnFamily(cfd, cfd->ioptions()->env,
                                    db_options_.enable_thread_tracking);
  ThreadStatusUtil::SetThreadOperationProperty(ThreadStatus::COMPACTION_JOB_ID,
                                               job_id_);
  ThreadStatusUtil::SetThreadOperationProperty(
      ThreadStatus::COMPACTION_INPUT_OUTPUT_LEVEL,
      (static_cast<uint64_t>(compact_->compaction->start_level()) << 32) +
          compact_->compaction->output_level());
  // In the current design, a CompactionJob is always created
  // for non-trivial compaction.
  assert(compaction->IsTrivialMove() == false ||
         compaction->is_manual_compaction() == true);
  ThreadStatusUtil::SetThreadOperationProperty(
      ThreadStatus::COMPACTION_PROP_FLAGS,
      compaction->is_manual_compaction() +
          (compaction->deletion_compaction() << 1));
  ThreadStatusUtil::SetThreadOperationProperty(
      ThreadStatus::COMPACTION_TOTAL_INPUT_BYTES,
      compaction->CalculateTotalInputSize());
  IOSTATS_RESET(bytes_written);
  IOSTATS_RESET(bytes_read);
  ThreadStatusUtil::SetThreadOperationProperty(
      ThreadStatus::COMPACTION_BYTES_WRITTEN, 0);
  ThreadStatusUtil::SetThreadOperationProperty(
      ThreadStatus::COMPACTION_BYTES_READ, 0);
  // Set the thread operation after operation properties
  // to ensure GetThreadList() can always show them all together.
  ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION);
  if (compaction_job_stats_) {
    compaction_job_stats_->is_manual_compaction =
        compaction->is_manual_compaction();
  }
}
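
// Prepares the job before Run(): records the SST write hint and the
// bottommost-level flag, then either splits the key range into several
// subcompactions (when the compaction qualifies) or sets up a single
// subcompaction covering the whole range.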
void CompactionJob::Prepare() {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_COMPACTION_PREPARE);

  // Generate file_levels_ for the compaction before making the iterator
  auto* c = compact_->compaction;
  assert(c->column_family_data() != nullptr);
  assert(c->column_family_data()->current()->storage_info()->NumLevelFiles(
             compact_->compaction->level()) > 0);

  write_hint_ =
      c->column_family_data()->CalculateSSTWriteHint(c->output_level());
  bottommost_level_ = c->bottommost_level();

  if (c->ShouldFormSubcompactions()) {
    {
      StopWatch sw(env_, stats_, SUBCOMPACTION_SETUP_TIME);
      GenSubcompactionBoundaries();
    }
    assert(sizes_.size() == boundaries_.size() + 1);
    for (size_t i = 0; i <= boundaries_.size(); i++) {
      Slice* start = i == 0 ? nullptr : &boundaries_[i - 1];
      Slice* end = i == boundaries_.size() ? nullptr : &boundaries_[i];
      compact_->sub_compact_states.emplace_back(c, start, end, sizes_[i]);
    }
    RecordInHistogram(stats_, NUM_SUBCOMPACTIONS_SCHEDULED,
                      compact_->sub_compact_states.size());
  } else {
    compact_->sub_compact_states.emplace_back(c, nullptr, nullptr);
  }
}
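
// A candidate key range together with the approximate amount of data it
// covers; used below when grouping boundaries into subcompactions.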
struct RangeWithSize {
  Range range;
  uint64_t size;

  RangeWithSize(const Slice& a, const Slice& b, uint64_t s = 0)
      : range(a, b), size(s) {}
};
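
// Picks subcompaction boundaries by collecting the boundary keys of the input
// files, estimating the amount of data between consecutive boundaries, and
// then greedily grouping the resulting ranges so that each subcompaction gets
// roughly the same amount of data (bounded by max_subcompactions and the
// target output file size).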
void CompactionJob::GenSubcompactionBoundaries() {
  auto* c = compact_->compaction;
  auto* cfd = c->column_family_data();
  const Comparator* cfd_comparator = cfd->user_comparator();
  std::vector<Slice> bounds;
  int start_lvl = c->start_level();
  int out_lvl = c->output_level();

  // Add the starting and/or ending key of certain input files as a potential
  // boundary
  for (size_t lvl_idx = 0; lvl_idx < c->num_input_levels(); lvl_idx++) {
    int lvl = c->level(lvl_idx);
    if (lvl >= start_lvl && lvl <= out_lvl) {
      const LevelFilesBrief* flevel = c->input_levels(lvl_idx);
      size_t num_files = flevel->num_files;
      if (num_files == 0) {
        continue;
      }
      if (lvl == 0) {
        // For level 0 add the starting and ending key of each file since the
        // files may have greatly differing key ranges (not range-partitioned)
        for (size_t i = 0; i < num_files; i++) {
          bounds.emplace_back(flevel->files[i].smallest_key);
          bounds.emplace_back(flevel->files[i].largest_key);
        }
      } else {
        // For all other levels add the smallest/largest key in the level to
        // encompass the range covered by that level
        bounds.emplace_back(flevel->files[0].smallest_key);
        bounds.emplace_back(flevel->files[num_files - 1].largest_key);
        if (lvl == out_lvl) {
          // For the last level include the starting keys of all files since
          // the last level is the largest and probably has the widest key
          // range. Since it's range partitioned, the ending key of one file
          // and the starting key of the next are very close (or identical).
          for (size_t i = 1; i < num_files; i++) {
            bounds.emplace_back(flevel->files[i].smallest_key);
          }
        }
      }
    }
  }

  std::sort(bounds.begin(), bounds.end(),
            [cfd_comparator](const Slice& a, const Slice& b) -> bool {
              return cfd_comparator->Compare(ExtractUserKey(a),
                                             ExtractUserKey(b)) < 0;
            });
  // Remove duplicated entries from bounds
  bounds.erase(
      std::unique(bounds.begin(), bounds.end(),
                  [cfd_comparator](const Slice& a, const Slice& b) -> bool {
                    return cfd_comparator->Compare(ExtractUserKey(a),
                                                   ExtractUserKey(b)) == 0;
                  }),
      bounds.end());
  // Combine consecutive pairs of boundaries into ranges with an approximate
  // size of data covered by keys in that range
  uint64_t sum = 0;
  std::vector<RangeWithSize> ranges;
  // Get the input version from CompactionState since it was already referenced
  // earlier in Compaction::SetInputVersion and will not change when db_mutex_
  // is released below
  auto* v = compact_->compaction->input_version();
  for (auto it = bounds.begin();;) {
    const Slice a = *it;
    ++it;
    if (it == bounds.end()) {
      break;
    }
    const Slice b = *it;
    // ApproximateSize could potentially create a table reader iterator to seek
    // to the index block and may incur I/O cost in the process. Unlock the db
    // mutex to reduce contention
    db_mutex_->Unlock();
    uint64_t size = versions_->ApproximateSize(SizeApproximationOptions(), v, a,
                                               b, start_lvl, out_lvl + 1,
                                               TableReaderCaller::kCompaction);
    db_mutex_->Lock();
    ranges.emplace_back(a, b, size);
    sum += size;
  }
  // Group the ranges into subcompactions
  const double min_file_fill_percent = 4.0 / 5;
  int base_level = v->storage_info()->base_level();
  uint64_t max_output_files = static_cast<uint64_t>(std::ceil(
      sum / min_file_fill_percent /
      MaxFileSizeForLevel(*(c->mutable_cf_options()), out_lvl,
                          c->immutable_cf_options()->compaction_style, base_level,
                          c->immutable_cf_options()->level_compaction_dynamic_level_bytes)));
  uint64_t subcompactions =
      std::min({static_cast<uint64_t>(ranges.size()),
                static_cast<uint64_t>(c->max_subcompactions()),
                max_output_files});

  if (subcompactions > 1) {
    double mean = sum * 1.0 / subcompactions;
    // Greedily add ranges to the subcompaction until the sum of the ranges'
    // sizes becomes >= the expected mean size of a subcompaction
    sum = 0;
    for (size_t i = 0; i < ranges.size() - 1; i++) {
      sum += ranges[i].size;
      if (subcompactions == 1) {
        // If there's only one left to schedule then it goes to the end so no
        // need to put an end boundary
        continue;
      }
      if (sum >= mean) {
        boundaries_.emplace_back(ExtractUserKey(ranges[i].range.limit));
        sizes_.emplace_back(sum);
        subcompactions--;
        sum = 0;
      }
    }
    sizes_.emplace_back(sum + ranges.back().size);
  } else {
    // Only one range so its size is the total sum of sizes computed above
    sizes_.emplace_back(sum);
  }
}
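
// Runs the subcompactions (the first one on the current thread, the rest each
// on a dedicated thread), verifies the resulting tables, collects their
// properties, and aggregates the per-subcompaction statistics. Returns the
// first non-OK subcompaction status, if any.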
Status CompactionJob::Run() {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_COMPACTION_RUN);
  TEST_SYNC_POINT("CompactionJob::Run():Start");
  log_buffer_->FlushBufferToLog();
  LogCompaction();

  const size_t num_threads = compact_->sub_compact_states.size();
  assert(num_threads > 0);
  const uint64_t start_micros = env_->NowMicros();

  // Launch a thread for each of subcompactions 1...num_threads-1
  std::vector<port::Thread> thread_pool;
  thread_pool.reserve(num_threads - 1);
  for (size_t i = 1; i < compact_->sub_compact_states.size(); i++) {
    thread_pool.emplace_back(&CompactionJob::ProcessKeyValueCompaction, this,
                             &compact_->sub_compact_states[i]);
  }

  // Always schedule the first subcompaction (whether or not there are also
  // others) in the current thread to be efficient with resources
  ProcessKeyValueCompaction(&compact_->sub_compact_states[0]);

  // Wait for all other threads (if there are any) to finish execution
  for (auto& thread : thread_pool) {
    thread.join();
  }

  compaction_stats_.micros = env_->NowMicros() - start_micros;
  compaction_stats_.cpu_micros = 0;
  for (size_t i = 0; i < compact_->sub_compact_states.size(); i++) {
    compaction_stats_.cpu_micros +=
        compact_->sub_compact_states[i].compaction_job_stats.cpu_micros;
  }
  RecordTimeToHistogram(stats_, COMPACTION_TIME, compaction_stats_.micros);
  RecordTimeToHistogram(stats_, COMPACTION_CPU_TIME,
                        compaction_stats_.cpu_micros);

  TEST_SYNC_POINT("CompactionJob::Run:BeforeVerify");

  // Check if any thread encountered an error during execution
  Status status;
  for (const auto& state : compact_->sub_compact_states) {
    if (!state.status.ok()) {
      status = state.status;
      break;
    }
  }

  if (status.ok() && output_directory_) {
    status = output_directory_->Fsync();
  }

  if (status.ok()) {
    thread_pool.clear();
    std::vector<const FileMetaData*> files_meta;
    for (const auto& state : compact_->sub_compact_states) {
      for (const auto& output : state.outputs) {
        files_meta.emplace_back(&output.meta);
      }
    }
    ColumnFamilyData* cfd = compact_->compaction->column_family_data();
    auto prefix_extractor =
        compact_->compaction->mutable_cf_options()->prefix_extractor.get();
    std::atomic<size_t> next_file_meta_idx(0);
    auto verify_table = [&](Status& output_status) {
      while (true) {
        size_t file_idx = next_file_meta_idx.fetch_add(1);
        if (file_idx >= files_meta.size()) {
          break;
        }
        // Verify that the table is usable. We set for_compaction to false and
        // don't apply OptimizeForCompactionTableRead here because this is a
        // special case after we finish building the table. Regardless of
        // whether use_direct_io_for_flush_and_compaction is true, we treat
        // this verification as a user read, since the goal is to cache the
        // table here for further user reads.
        InternalIterator* iter = cfd->table_cache()->NewIterator(
            ReadOptions(), file_options_, cfd->internal_comparator(),
            *files_meta[file_idx], /*range_del_agg=*/nullptr, prefix_extractor,
            /*table_reader_ptr=*/nullptr,
            cfd->internal_stats()->GetFileReadHist(
                compact_->compaction->output_level()),
            TableReaderCaller::kCompactionRefill, /*arena=*/nullptr,
            /*skip_filters=*/false, compact_->compaction->output_level(),
            /*smallest_compaction_key=*/nullptr,
            /*largest_compaction_key=*/nullptr);
        auto s = iter->status();
        if (s.ok() && paranoid_file_checks_) {
          for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {}
          s = iter->status();
        }
        delete iter;
        if (!s.ok()) {
          output_status = s;
          break;
        }
      }
    };
    for (size_t i = 1; i < compact_->sub_compact_states.size(); i++) {
      thread_pool.emplace_back(verify_table,
                               std::ref(compact_->sub_compact_states[i].status));
    }
    verify_table(compact_->sub_compact_states[0].status);
    for (auto& thread : thread_pool) {
      thread.join();
    }
    for (const auto& state : compact_->sub_compact_states) {
      if (!state.status.ok()) {
        status = state.status;
        break;
      }
    }
  }

  TablePropertiesCollection tp;
  for (const auto& state : compact_->sub_compact_states) {
    for (const auto& output : state.outputs) {
      auto fn =
          TableFileName(state.compaction->immutable_cf_options()->cf_paths,
                        output.meta.fd.GetNumber(), output.meta.fd.GetPathId());
      tp[fn] = output.table_properties;
    }
  }
  compact_->compaction->SetOutputTableProperties(std::move(tp));

  // Finish up all book-keeping to unify the subcompaction results
  AggregateStatistics();
  UpdateCompactionStats();
  RecordCompactionIOStats();
  LogFlush(db_options_.info_log);
  TEST_SYNC_POINT("CompactionJob::Run():End");

  compact_->status = status;
  return status;
}
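
// Called with db_mutex_ held after Run(): adds the compaction statistics to
// the column family's internal stats, installs the compaction results via
// InstallCompactionResults(), and logs a human-readable summary plus a
// structured event before cleaning up the compaction state.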
Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_COMPACTION_INSTALL);
  db_mutex_->AssertHeld();
  Status status = compact_->status;
  ColumnFamilyData* cfd = compact_->compaction->column_family_data();
  cfd->internal_stats()->AddCompactionStats(
      compact_->compaction->output_level(), thread_pri_, compaction_stats_);

  if (status.ok()) {
    status = InstallCompactionResults(mutable_cf_options);
  }
  VersionStorageInfo::LevelSummaryStorage tmp;
  auto vstorage = cfd->current()->storage_info();
  const auto& stats = compaction_stats_;

  double read_write_amp = 0.0;
  double write_amp = 0.0;
  double bytes_read_per_sec = 0;
  double bytes_written_per_sec = 0;

  if (stats.bytes_read_non_output_levels > 0) {
    read_write_amp = (stats.bytes_written + stats.bytes_read_output_level +
                      stats.bytes_read_non_output_levels) /
                     static_cast<double>(stats.bytes_read_non_output_levels);
    write_amp = stats.bytes_written /
                static_cast<double>(stats.bytes_read_non_output_levels);
  }
  if (stats.micros > 0) {
    bytes_read_per_sec =
        (stats.bytes_read_non_output_levels + stats.bytes_read_output_level) /
        static_cast<double>(stats.micros);
    bytes_written_per_sec =
        stats.bytes_written / static_cast<double>(stats.micros);
  }

  ROCKS_LOG_BUFFER(
      log_buffer_,
      "[%s] compacted to: %s, MB/sec: %.1f rd, %.1f wr, level %d, "
      "files in(%d, %d) out(%d) "
      "MB in(%.1f, %.1f) out(%.1f), read-write-amplify(%.1f) "
      "write-amplify(%.1f) %s, records in: %" PRIu64
      ", records dropped: %" PRIu64 " output_compression: %s\n",
      cfd->GetName().c_str(), vstorage->LevelSummary(&tmp), bytes_read_per_sec,
      bytes_written_per_sec, compact_->compaction->output_level(),
      stats.num_input_files_in_non_output_levels,
      stats.num_input_files_in_output_level, stats.num_output_files,
      stats.bytes_read_non_output_levels / 1048576.0,
      stats.bytes_read_output_level / 1048576.0,
      stats.bytes_written / 1048576.0, read_write_amp, write_amp,
      status.ToString().c_str(), stats.num_input_records,
      stats.num_dropped_records,
      CompressionTypeToString(compact_->compaction->output_compression())
          .c_str());

  UpdateCompactionJobStats(stats);

  auto stream = event_logger_->LogToBuffer(log_buffer_);
  stream << "job" << job_id_ << "event"
         << "compaction_finished"
         << "compaction_time_micros" << stats.micros
         << "compaction_time_cpu_micros" << stats.cpu_micros << "output_level"
         << compact_->compaction->output_level() << "num_output_files"
         << compact_->NumOutputFiles() << "total_output_size"
         << compact_->total_bytes << "num_input_records"
         << stats.num_input_records << "num_output_records"
         << compact_->num_output_records << "num_subcompactions"
         << compact_->sub_compact_states.size() << "output_compression"
         << CompressionTypeToString(compact_->compaction->output_compression());

  if (compaction_job_stats_ != nullptr) {
    stream << "num_single_delete_mismatches"
           << compaction_job_stats_->num_single_del_mismatch;
    stream << "num_single_delete_fallthrough"
           << compaction_job_stats_->num_single_del_fallthru;
  }

  if (measure_io_stats_ && compaction_job_stats_ != nullptr) {
    stream << "file_write_nanos" << compaction_job_stats_->file_write_nanos;
    stream << "file_range_sync_nanos"
           << compaction_job_stats_->file_range_sync_nanos;
    stream << "file_fsync_nanos" << compaction_job_stats_->file_fsync_nanos;
    stream << "file_prepare_write_nanos"
           << compaction_job_stats_->file_prepare_write_nanos;
  }

  stream << "lsm_state";
  stream.StartArray();
  for (int level = 0; level < vstorage->num_levels(); ++level) {
    stream << vstorage->NumLevelFiles(level);
  }
  stream.EndArray();

  CleanupCompaction();
  return status;
}
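
// Core per-subcompaction loop: drives a CompactionIterator over the merged
// input, writes surviving keys into output files (rolling over to a new file
// when the size or grandparent-overlap limits are hit), and records the
// resulting status and statistics on the SubcompactionState.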
void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
  assert(sub_compact != nullptr);

  uint64_t prev_cpu_micros = env_->NowCPUNanos() / 1000;

  ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();

  // Create compaction filter and fail the compaction if
  // IgnoreSnapshots() = false because it is not supported anymore
  const CompactionFilter* compaction_filter =
      cfd->ioptions()->compaction_filter;
  std::unique_ptr<CompactionFilter> compaction_filter_from_factory = nullptr;
  if (compaction_filter == nullptr) {
    compaction_filter_from_factory =
        sub_compact->compaction->CreateCompactionFilter();
    compaction_filter = compaction_filter_from_factory.get();
  }
  if (compaction_filter != nullptr && !compaction_filter->IgnoreSnapshots()) {
    sub_compact->status = Status::NotSupported(
        "CompactionFilter::IgnoreSnapshots() = false is not supported "
        "anymore.");
    return;
  }

  CompactionRangeDelAggregator range_del_agg(&cfd->internal_comparator(),
                                             existing_snapshots_);

  // Although the v2 aggregator is what the level iterator(s) know about,
  // the AddTombstones calls will be propagated down to the v1 aggregator.
  std::unique_ptr<InternalIterator> input(versions_->MakeInputIterator(
      sub_compact->compaction, &range_del_agg, file_options_for_read_));

  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_COMPACTION_PROCESS_KV);

  // I/O measurement variables
  PerfLevel prev_perf_level = PerfLevel::kEnableTime;
  const uint64_t kRecordStatsEvery = 1000;
  uint64_t prev_write_nanos = 0;
  uint64_t prev_fsync_nanos = 0;
  uint64_t prev_range_sync_nanos = 0;
  uint64_t prev_prepare_write_nanos = 0;
  uint64_t prev_cpu_write_nanos = 0;
  uint64_t prev_cpu_read_nanos = 0;
  if (measure_io_stats_) {
    prev_perf_level = GetPerfLevel();
    SetPerfLevel(PerfLevel::kEnableTimeAndCPUTimeExceptForMutex);
    prev_write_nanos = IOSTATS(write_nanos);
    prev_fsync_nanos = IOSTATS(fsync_nanos);
    prev_range_sync_nanos = IOSTATS(range_sync_nanos);
    prev_prepare_write_nanos = IOSTATS(prepare_write_nanos);
    prev_cpu_write_nanos = IOSTATS(cpu_write_nanos);
    prev_cpu_read_nanos = IOSTATS(cpu_read_nanos);
  }

  MergeHelper merge(
      env_, cfd->user_comparator(), cfd->ioptions()->merge_operator,
      compaction_filter, db_options_.info_log.get(),
      false /* internal key corruption is expected */,
      existing_snapshots_.empty() ? 0 : existing_snapshots_.back(),
      snapshot_checker_, compact_->compaction->level(),
      db_options_.statistics.get());

  TEST_SYNC_POINT("CompactionJob::Run():Inprogress");
  TEST_SYNC_POINT_CALLBACK(
      "CompactionJob::Run():PausingManualCompaction:1",
      reinterpret_cast<void*>(
          const_cast<std::atomic<bool>*>(manual_compaction_paused_)));

  Slice* start = sub_compact->start;
  Slice* end = sub_compact->end;
  if (start != nullptr) {
    IterKey start_iter;
    start_iter.SetInternalKey(*start, kMaxSequenceNumber, kValueTypeForSeek);
    input->Seek(start_iter.GetInternalKey());
  } else {
    input->SeekToFirst();
  }

  Status status;
  sub_compact->c_iter.reset(new CompactionIterator(
      input.get(), cfd->user_comparator(), &merge, versions_->LastSequence(),
      &existing_snapshots_, earliest_write_conflict_snapshot_,
      snapshot_checker_, env_, ShouldReportDetailedTime(env_, stats_), false,
      &range_del_agg, sub_compact->compaction, compaction_filter,
      shutting_down_, preserve_deletes_seqnum_, manual_compaction_paused_,
      db_options_.info_log));
  auto c_iter = sub_compact->c_iter.get();
  c_iter->SeekToFirst();
  if (c_iter->Valid() && sub_compact->compaction->output_level() != 0) {
    // ShouldStopBefore() maintains state based on keys processed so far. The
    // compaction loop always calls it on the "next" key, thus won't tell it the
    // first key. So we do that here.
    sub_compact->ShouldStopBefore(c_iter->key(),
                                  sub_compact->current_output_file_size);
  }
  const auto& c_iter_stats = c_iter->iter_stats();

  while (status.ok() && !cfd->IsDropped() && c_iter->Valid()) {
    // Invariant: c_iter.status() is guaranteed to be OK if c_iter->Valid()
    // returns true.
    const Slice& key = c_iter->key();
    const Slice& value = c_iter->value();
    // If an end key (exclusive) is specified, check whether the current key is
    // >= it, and exit if so, because the iterator is out of its range.
    if (end != nullptr &&
        cfd->user_comparator()->Compare(c_iter->user_key(), *end) >= 0) {
      break;
    }
    if (c_iter_stats.num_input_records % kRecordStatsEvery ==
        kRecordStatsEvery - 1) {
      RecordDroppedKeys(c_iter_stats, &sub_compact->compaction_job_stats);
      c_iter->ResetRecordCounts();
      RecordCompactionIOStats();
    }

    // Open output file if necessary
    if (sub_compact->builder == nullptr) {
      status = OpenCompactionOutputFile(sub_compact);
      if (!status.ok()) {
        break;
      }
    }
    assert(sub_compact->builder != nullptr);
    assert(sub_compact->current_output() != nullptr);
    sub_compact->builder->Add(key, value);
    sub_compact->current_output_file_size = sub_compact->builder->FileSize();
    const ParsedInternalKey& ikey = c_iter->ikey();
    sub_compact->current_output()->meta.UpdateBoundaries(
        key, value, ikey.sequence, ikey.type);
    sub_compact->num_output_records++;

    // Close the output file if it is big enough. Two conditions determine
    // that it is time to close it: (1) the current key should be this file's
    // last key, or (2) the next key should not be in this file.
    //
    // TODO(aekmekji): determine if file should be closed earlier than this
    // during subcompactions (i.e. if output size, estimated by input size, is
    // going to be 1.2MB and max_output_file_size = 1MB, prefer to have 0.6MB
    // and 0.6MB instead of 1MB and 0.2MB)
    bool output_file_ended = false;
    Status input_status;
    if (sub_compact->compaction->output_level() != 0 &&
        sub_compact->current_output_file_size >=
            sub_compact->compaction->max_output_file_size()) {
      // (1) this key terminates the file. For historical reasons, the iterator
      // status before advancing will be given to FinishCompactionOutputFile().
      input_status = input->status();
      output_file_ended = true;
    }
    TEST_SYNC_POINT_CALLBACK(
        "CompactionJob::Run():PausingManualCompaction:2",
        reinterpret_cast<void*>(
            const_cast<std::atomic<bool>*>(manual_compaction_paused_)));
    c_iter->Next();
    if (c_iter->status().IsManualCompactionPaused()) {
      break;
    }
    if (!output_file_ended && c_iter->Valid() &&
        sub_compact->compaction->output_level() != 0 &&
        sub_compact->ShouldStopBefore(c_iter->key(),
                                      sub_compact->current_output_file_size) &&
        sub_compact->builder != nullptr) {
      // (2) this key belongs to the next file. For historical reasons, the
      // iterator status after advancing will be given to
      // FinishCompactionOutputFile().
      input_status = input->status();
      output_file_ended = true;
    }
    if (output_file_ended) {
      const Slice* next_key = nullptr;
      if (c_iter->Valid()) {
        next_key = &c_iter->key();
      }
      CompactionIterationStats range_del_out_stats;
      status =
          FinishCompactionOutputFile(input_status, sub_compact, &range_del_agg,
                                     &range_del_out_stats, next_key);
      RecordDroppedKeys(range_del_out_stats,
                        &sub_compact->compaction_job_stats);
    }
  }

  sub_compact->compaction_job_stats.num_input_deletion_records =
      c_iter_stats.num_input_deletion_records;
  sub_compact->compaction_job_stats.num_corrupt_keys =
      c_iter_stats.num_input_corrupt_records;
  sub_compact->compaction_job_stats.num_single_del_fallthru =
      c_iter_stats.num_single_del_fallthru;
  sub_compact->compaction_job_stats.num_single_del_mismatch =
      c_iter_stats.num_single_del_mismatch;
  sub_compact->compaction_job_stats.total_input_raw_key_bytes +=
      c_iter_stats.total_input_raw_key_bytes;
  sub_compact->compaction_job_stats.total_input_raw_value_bytes +=
      c_iter_stats.total_input_raw_value_bytes;

  RecordTick(stats_, FILTER_OPERATION_TOTAL_TIME,
             c_iter_stats.total_filter_time);
  RecordDroppedKeys(c_iter_stats, &sub_compact->compaction_job_stats);
  RecordCompactionIOStats();

  if (status.ok() && cfd->IsDropped()) {
    status =
        Status::ColumnFamilyDropped("Column family dropped during compaction");
  }
  if ((status.ok() || status.IsColumnFamilyDropped()) &&
      shutting_down_->load(std::memory_order_relaxed)) {
    status = Status::ShutdownInProgress("Database shutdown");
  }
  if ((status.ok() || status.IsColumnFamilyDropped()) &&
      (manual_compaction_paused_ &&
       manual_compaction_paused_->load(std::memory_order_relaxed))) {
    status = Status::Incomplete(Status::SubCode::kManualCompactionPaused);
  }
  if (status.ok()) {
    status = input->status();
  }
  if (status.ok()) {
    status = c_iter->status();
  }

  if (status.ok() && sub_compact->builder == nullptr &&
      sub_compact->outputs.size() == 0 && !range_del_agg.IsEmpty()) {
    // handle subcompaction containing only range deletions
    status = OpenCompactionOutputFile(sub_compact);
  }

  // Call FinishCompactionOutputFile() even if status is not ok: it needs to
  // close the output file.
  if (sub_compact->builder != nullptr) {
    CompactionIterationStats range_del_out_stats;
    Status s = FinishCompactionOutputFile(status, sub_compact, &range_del_agg,
                                          &range_del_out_stats);
    if (status.ok()) {
      status = s;
    }
    RecordDroppedKeys(range_del_out_stats, &sub_compact->compaction_job_stats);
  }

  sub_compact->compaction_job_stats.cpu_micros =
      env_->NowCPUNanos() / 1000 - prev_cpu_micros;

  if (measure_io_stats_) {
    sub_compact->compaction_job_stats.file_write_nanos +=
        IOSTATS(write_nanos) - prev_write_nanos;
    sub_compact->compaction_job_stats.file_fsync_nanos +=
        IOSTATS(fsync_nanos) - prev_fsync_nanos;
    sub_compact->compaction_job_stats.file_range_sync_nanos +=
        IOSTATS(range_sync_nanos) - prev_range_sync_nanos;
    sub_compact->compaction_job_stats.file_prepare_write_nanos +=
        IOSTATS(prepare_write_nanos) - prev_prepare_write_nanos;
    sub_compact->compaction_job_stats.cpu_micros -=
        (IOSTATS(cpu_write_nanos) - prev_cpu_write_nanos +
         IOSTATS(cpu_read_nanos) - prev_cpu_read_nanos) /
        1000;
    if (prev_perf_level != PerfLevel::kEnableTimeAndCPUTimeExceptForMutex) {
      SetPerfLevel(prev_perf_level);
    }
  }

  sub_compact->c_iter.reset();
  input.reset();
  sub_compact->status = status;
}
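
// Publishes the compaction iterator's drop counters as ticker statistics and
// mirrors the relevant counts into the per-job CompactionJobStats.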
void CompactionJob::RecordDroppedKeys(
    const CompactionIterationStats& c_iter_stats,
    CompactionJobStats* compaction_job_stats) {
  if (c_iter_stats.num_record_drop_user > 0) {
    RecordTick(stats_, COMPACTION_KEY_DROP_USER,
               c_iter_stats.num_record_drop_user);
  }
  if (c_iter_stats.num_record_drop_hidden > 0) {
    RecordTick(stats_, COMPACTION_KEY_DROP_NEWER_ENTRY,
               c_iter_stats.num_record_drop_hidden);
    if (compaction_job_stats) {
      compaction_job_stats->num_records_replaced +=
          c_iter_stats.num_record_drop_hidden;
    }
  }
  if (c_iter_stats.num_record_drop_obsolete > 0) {
    RecordTick(stats_, COMPACTION_KEY_DROP_OBSOLETE,
               c_iter_stats.num_record_drop_obsolete);
    if (compaction_job_stats) {
      compaction_job_stats->num_expired_deletion_records +=
          c_iter_stats.num_record_drop_obsolete;
    }
  }
  if (c_iter_stats.num_record_drop_range_del > 0) {
    RecordTick(stats_, COMPACTION_KEY_DROP_RANGE_DEL,
               c_iter_stats.num_record_drop_range_del);
  }
  if (c_iter_stats.num_range_del_drop_obsolete > 0) {
    RecordTick(stats_, COMPACTION_RANGE_DEL_DROP_OBSOLETE,
               c_iter_stats.num_range_del_drop_obsolete);
  }
  if (c_iter_stats.num_optimized_del_drop_obsolete > 0) {
    RecordTick(stats_, COMPACTION_OPTIMIZED_DEL_DROP_OBSOLETE,
               c_iter_stats.num_optimized_del_drop_obsolete);
  }
}
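
// Finalizes the current output file of a subcompaction: adds the range
// tombstones that belong in this file, finishes (or abandons) the table
// builder, syncs and closes the file, and updates the file's metadata.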
Status CompactionJob::FinishCompactionOutputFile(
    const Status& input_status, SubcompactionState* sub_compact,
    CompactionRangeDelAggregator* range_del_agg,
    CompactionIterationStats* range_del_out_stats,
    const Slice* next_table_min_key /* = nullptr */) {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_COMPACTION_SYNC_FILE);
  assert(sub_compact != nullptr);
  assert(sub_compact->outfile);
  assert(sub_compact->builder != nullptr);
  assert(sub_compact->current_output() != nullptr);

  uint64_t output_number = sub_compact->current_output()->meta.fd.GetNumber();
  assert(output_number != 0);

  ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
  const Comparator* ucmp = cfd->user_comparator();

  // Check for iterator errors
  Status s = input_status;
  auto meta = &sub_compact->current_output()->meta;
  assert(meta != nullptr);
  if (s.ok()) {
    Slice lower_bound_guard, upper_bound_guard;
    std::string smallest_user_key;
    const Slice *lower_bound, *upper_bound;
    bool lower_bound_from_sub_compact = false;
    if (sub_compact->outputs.size() == 1) {
      // For the first output table, include range tombstones before the min key
      // but after the subcompaction boundary.
      lower_bound = sub_compact->start;
      lower_bound_from_sub_compact = true;
    } else if (meta->smallest.size() > 0) {
      // For subsequent output tables, only include range tombstones from min
      // key onwards since the previous file was extended to contain range
      // tombstones falling before min key.
      smallest_user_key = meta->smallest.user_key().ToString(false /*hex*/);
      lower_bound_guard = Slice(smallest_user_key);
      lower_bound = &lower_bound_guard;
    } else {
      lower_bound = nullptr;
    }
    if (next_table_min_key != nullptr) {
      // This may be the last file in the subcompaction in some cases, so we
      // need to compare the end key of subcompaction with the next file start
      // key. When the end key is chosen by the subcompaction, we know that
      // it must be the biggest key in output file. Therefore, it is safe to
      // use the smaller key as the upper bound of the output file, to ensure
      // that there is no overlapping between different output files.
      upper_bound_guard = ExtractUserKey(*next_table_min_key);
      if (sub_compact->end != nullptr &&
          ucmp->Compare(upper_bound_guard, *sub_compact->end) >= 0) {
        upper_bound = sub_compact->end;
      } else {
        upper_bound = &upper_bound_guard;
      }
    } else {
      // This is the last file in the subcompaction, so extend until the
      // subcompaction ends.
      upper_bound = sub_compact->end;
    }
    auto earliest_snapshot = kMaxSequenceNumber;
    if (existing_snapshots_.size() > 0) {
      earliest_snapshot = existing_snapshots_[0];
    }
    bool has_overlapping_endpoints;
    if (upper_bound != nullptr && meta->largest.size() > 0) {
      has_overlapping_endpoints =
          ucmp->Compare(meta->largest.user_key(), *upper_bound) == 0;
    } else {
      has_overlapping_endpoints = false;
    }

    // The end key of the subcompaction must be bigger or equal to the upper
    // bound. If the end of subcompaction is null or the upper bound is null,
    // it means that this file is the last file in the compaction. So there
    // will be no overlapping between this file and others.
    assert(sub_compact->end == nullptr ||
           upper_bound == nullptr ||
           ucmp->Compare(*upper_bound, *sub_compact->end) <= 0);
    auto it = range_del_agg->NewIterator(lower_bound, upper_bound,
                                         has_overlapping_endpoints);
    // Position the range tombstone output iterator. There may be tombstone
    // fragments that are entirely out of range, so make sure that we do not
    // include those.
    if (lower_bound != nullptr) {
      it->Seek(*lower_bound);
    } else {
      it->SeekToFirst();
    }
    for (; it->Valid(); it->Next()) {
      auto tombstone = it->Tombstone();
      if (upper_bound != nullptr) {
        int cmp = ucmp->Compare(*upper_bound, tombstone.start_key_);
        if ((has_overlapping_endpoints && cmp < 0) ||
            (!has_overlapping_endpoints && cmp <= 0)) {
          // Tombstones starting after upper_bound only need to be included in
          // the next table. If the current SST ends before upper_bound, i.e.,
          // `has_overlapping_endpoints == false`, we can also skip over range
          // tombstones that start exactly at upper_bound. Such range tombstones
          // will be included in the next file and are not relevant to the point
          // keys or endpoints of the current file.
          break;
        }
      }
      if (bottommost_level_ && tombstone.seq_ <= earliest_snapshot) {
        // TODO(andrewkr): tombstones that span multiple output files are
        // counted for each compaction output file, so lots of double counting.
        range_del_out_stats->num_range_del_drop_obsolete++;
        range_del_out_stats->num_record_drop_obsolete++;
        continue;
      }
      auto kv = tombstone.Serialize();
      assert(lower_bound == nullptr ||
             ucmp->Compare(*lower_bound, kv.second) < 0);
      sub_compact->builder->Add(kv.first.Encode(), kv.second);
      InternalKey smallest_candidate = std::move(kv.first);
      if (lower_bound != nullptr &&
          ucmp->Compare(smallest_candidate.user_key(), *lower_bound) <= 0) {
        // Pretend the smallest key has the same user key as lower_bound
        // (the max key in the previous table or subcompaction) in order for
        // files to appear key-space partitioned.
        //
        // When lower_bound is chosen by a subcompaction, we know that
        // subcompactions over smaller keys cannot contain any keys at
        // lower_bound. We also know that smaller subcompactions exist, because
        // otherwise the subcompaction would be unbounded on the left. As a
  1119. // result, we know that no other files on the output level will contain
  1120. // actual keys at lower_bound (an output file may have a largest key of
  1121. // lower_bound@kMaxSequenceNumber, but this only indicates a large range
  1122. // tombstone was truncated). Therefore, it is safe to use the
  1123. // tombstone's sequence number, to ensure that keys at lower_bound at
  1124. // lower levels are covered by truncated tombstones.
  1125. //
  1126. // If lower_bound was chosen by the smallest data key in the file,
  1127. // choose lowest seqnum so this file's smallest internal key comes after
  1128. // the previous file's largest. The fake seqnum is OK because the read
  1129. // path's file-picking code only considers user key.
  1130. smallest_candidate = InternalKey(
  1131. *lower_bound, lower_bound_from_sub_compact ? tombstone.seq_ : 0,
  1132. kTypeRangeDeletion);
  1133. }
      InternalKey largest_candidate = tombstone.SerializeEndKey();
      if (upper_bound != nullptr &&
          ucmp->Compare(*upper_bound, largest_candidate.user_key()) <= 0) {
        // Pretend the largest key has the same user key as upper_bound (the
        // min key in the following table or subcompaction) in order for files
        // to appear key-space partitioned.
        //
        // Choose highest seqnum so this file's largest internal key comes
        // before the next file's/subcompaction's smallest. The fake seqnum is
        // OK because the read path's file-picking code only considers the
        // user key portion.
        //
        // Note Seek() also creates InternalKey with (user_key,
        // kMaxSequenceNumber), but with kTypeDeletion (0x7) instead of
        // kTypeRangeDeletion (0xF), so the range tombstone comes before the
        // Seek() key in InternalKey's ordering. So Seek() will look in the
        // next file for the user key.
        largest_candidate =
            InternalKey(*upper_bound, kMaxSequenceNumber, kTypeRangeDeletion);
      }
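      // Editor's illustrative ordering for a hypothetical user key "k",
      // following the note above: the truncated end boundary
      // ("k", kMaxSequenceNumber, kTypeRangeDeletion) sorts before the lookup
      // key ("k", kMaxSequenceNumber, kTypeDeletion), because for equal user
      // keys the larger packed (seq, type) value (type 0xF > 0x7) orders
      // first. A Seek() for "k" therefore moves past this file and finds the
      // actual key in the next file.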
#ifndef NDEBUG
      SequenceNumber smallest_ikey_seqnum = kMaxSequenceNumber;
      if (meta->smallest.size() > 0) {
        smallest_ikey_seqnum = GetInternalKeySeqno(meta->smallest.Encode());
      }
#endif
      meta->UpdateBoundariesForRange(smallest_candidate, largest_candidate,
                                     tombstone.seq_,
                                     cfd->internal_comparator());
      // The smallest key in a file is used for range tombstone truncation, so
      // it cannot have a seqnum of 0 (unless the smallest data key in a file
      // has a seqnum of 0). Otherwise, the truncated tombstone may expose
      // deleted keys at lower levels.
      assert(smallest_ikey_seqnum == 0 ||
             ExtractInternalKeyFooter(meta->smallest.Encode()) !=
                 PackSequenceAndType(0, kTypeRangeDeletion));
    }
    meta->marked_for_compaction = sub_compact->builder->NeedCompact();
  }
  const uint64_t current_entries = sub_compact->builder->NumEntries();
  if (s.ok()) {
    s = sub_compact->builder->Finish();
  } else {
    sub_compact->builder->Abandon();
  }
  const uint64_t current_bytes = sub_compact->builder->FileSize();
  if (s.ok()) {
    // Add the checksum information to file metadata.
    meta->file_checksum = sub_compact->builder->GetFileChecksum();
    meta->file_checksum_func_name =
        sub_compact->builder->GetFileChecksumFuncName();
    meta->fd.file_size = current_bytes;
  }
  sub_compact->current_output()->finished = true;
  sub_compact->total_bytes += current_bytes;

  // Finish and check for file errors
  if (s.ok()) {
    StopWatch sw(env_, stats_, COMPACTION_OUTFILE_SYNC_MICROS);
    s = sub_compact->outfile->Sync(db_options_.use_fsync);
  }
  if (s.ok()) {
    s = sub_compact->outfile->Close();
  }
  sub_compact->outfile.reset();

  TableProperties tp;
  if (s.ok()) {
    tp = sub_compact->builder->GetTableProperties();
  }
  if (s.ok() && current_entries == 0 && tp.num_range_deletions == 0) {
    // If there is nothing to output, it is not necessary to generate an SST
    // file. This happens when the output level is the bottom level and the
    // subcompaction produced no output at all.
    std::string fname =
        TableFileName(sub_compact->compaction->immutable_cf_options()->cf_paths,
                      meta->fd.GetNumber(), meta->fd.GetPathId());
    env_->DeleteFile(fname);

    // Also need to remove the file from outputs, or it will be added to the
    // VersionEdit.
    assert(!sub_compact->outputs.empty());
    sub_compact->outputs.pop_back();
    meta = nullptr;
  }
  if (s.ok() && (current_entries > 0 || tp.num_range_deletions > 0)) {
    // Output to event logger and fire events.
    sub_compact->current_output()->table_properties =
        std::make_shared<TableProperties>(tp);
    ROCKS_LOG_INFO(db_options_.info_log,
                   "[%s] [JOB %d] Generated table #%" PRIu64 ": %" PRIu64
                   " keys, %" PRIu64 " bytes%s",
                   cfd->GetName().c_str(), job_id_, output_number,
                   current_entries, current_bytes,
                   meta->marked_for_compaction ? " (need compaction)" : "");
  }
  std::string fname;
  FileDescriptor output_fd;
  uint64_t oldest_blob_file_number = kInvalidBlobFileNumber;
  if (meta != nullptr) {
    fname =
        TableFileName(sub_compact->compaction->immutable_cf_options()->cf_paths,
                      meta->fd.GetNumber(), meta->fd.GetPathId());
    output_fd = meta->fd;
    oldest_blob_file_number = meta->oldest_blob_file_number;
  } else {
    fname = "(nil)";
  }
  EventHelpers::LogAndNotifyTableFileCreationFinished(
      event_logger_, cfd->ioptions()->listeners, dbname_, cfd->GetName(), fname,
      job_id_, output_fd, oldest_blob_file_number, tp,
      TableFileCreationReason::kCompaction, s);

#ifndef ROCKSDB_LITE
  // Report new file to SstFileManagerImpl
  auto sfm =
      static_cast<SstFileManagerImpl*>(db_options_.sst_file_manager.get());
  if (sfm && meta != nullptr && meta->fd.GetPathId() == 0) {
    sfm->OnAddFile(fname);
    if (sfm->IsMaxAllowedSpaceReached()) {
      // TODO(ajkr): should we return OK() if max space was reached by the
      // final compaction output file (similarly to how flush works when
      // full)?
      s = Status::SpaceLimit("Max allowed space was reached");
      TEST_SYNC_POINT(
          "CompactionJob::FinishCompactionOutputFile:"
          "MaxAllowedSpaceReached");
      InstrumentedMutexLock l(db_mutex_);
      db_error_handler_->SetBGError(s, BackgroundErrorReason::kCompaction);
    }
  }
#endif

  sub_compact->builder.reset();
  sub_compact->current_output_file_size = 0;
  return s;
}

Status CompactionJob::InstallCompactionResults(
    const MutableCFOptions& mutable_cf_options) {
  db_mutex_->AssertHeld();

  auto* compaction = compact_->compaction;
  // paranoia: verify that the files that we started with
  // still exist in the current version and in the same original level.
  // This ensures that a concurrent compaction did not erroneously
  // pick the same files to compact_.
  if (!versions_->VerifyCompactionFileConsistency(compaction)) {
    Compaction::InputLevelSummaryBuffer inputs_summary;

    ROCKS_LOG_ERROR(db_options_.info_log, "[%s] [JOB %d] Compaction %s aborted",
                    compaction->column_family_data()->GetName().c_str(),
                    job_id_, compaction->InputLevelSummary(&inputs_summary));
    return Status::Corruption("Compaction input files inconsistent");
  }

  {
    Compaction::InputLevelSummaryBuffer inputs_summary;
    ROCKS_LOG_INFO(
        db_options_.info_log, "[%s] [JOB %d] Compacted %s => %" PRIu64 " bytes",
        compaction->column_family_data()->GetName().c_str(), job_id_,
        compaction->InputLevelSummary(&inputs_summary), compact_->total_bytes);
  }

  // Add compaction inputs
  compaction->AddInputDeletions(compact_->compaction->edit());

  for (const auto& sub_compact : compact_->sub_compact_states) {
    for (const auto& out : sub_compact.outputs) {
      compaction->edit()->AddFile(compaction->output_level(), out.meta);
    }
  }
  return versions_->LogAndApply(compaction->column_family_data(),
                                mutable_cf_options, compaction->edit(),
                                db_mutex_, db_directory_);
}

void CompactionJob::RecordCompactionIOStats() {
  RecordTick(stats_, COMPACT_READ_BYTES, IOSTATS(bytes_read));
  ThreadStatusUtil::IncreaseThreadOperationProperty(
      ThreadStatus::COMPACTION_BYTES_READ, IOSTATS(bytes_read));
  IOSTATS_RESET(bytes_read);
  RecordTick(stats_, COMPACT_WRITE_BYTES, IOSTATS(bytes_written));
  ThreadStatusUtil::IncreaseThreadOperationProperty(
      ThreadStatus::COMPACTION_BYTES_WRITTEN, IOSTATS(bytes_written));
  IOSTATS_RESET(bytes_written);
}

Status CompactionJob::OpenCompactionOutputFile(
    SubcompactionState* sub_compact) {
  assert(sub_compact != nullptr);
  assert(sub_compact->builder == nullptr);
  // no need to lock because VersionSet::next_file_number_ is atomic
  uint64_t file_number = versions_->NewFileNumber();

  std::string fname =
      TableFileName(sub_compact->compaction->immutable_cf_options()->cf_paths,
                    file_number, sub_compact->compaction->output_path_id());
  // Fire events.
  ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
#ifndef ROCKSDB_LITE
  EventHelpers::NotifyTableFileCreationStarted(
      cfd->ioptions()->listeners, dbname_, cfd->GetName(), fname, job_id_,
      TableFileCreationReason::kCompaction);
#endif  // !ROCKSDB_LITE
  // Make the output file
  std::unique_ptr<FSWritableFile> writable_file;
#ifndef NDEBUG
  bool syncpoint_arg = file_options_.use_direct_writes;
  TEST_SYNC_POINT_CALLBACK("CompactionJob::OpenCompactionOutputFile",
                           &syncpoint_arg);
#endif
  Status s = NewWritableFile(fs_, fname, &writable_file, file_options_);
  if (!s.ok()) {
    ROCKS_LOG_ERROR(
        db_options_.info_log,
        "[%s] [JOB %d] OpenCompactionOutputFiles for table #%" PRIu64
        " fails at NewWritableFile with status %s",
        sub_compact->compaction->column_family_data()->GetName().c_str(),
        job_id_, file_number, s.ToString().c_str());
    LogFlush(db_options_.info_log);
    EventHelpers::LogAndNotifyTableFileCreationFinished(
        event_logger_, cfd->ioptions()->listeners, dbname_, cfd->GetName(),
        fname, job_id_, FileDescriptor(), kInvalidBlobFileNumber,
        TableProperties(), TableFileCreationReason::kCompaction, s);
    return s;
  }

  // Try to figure out the output file's oldest ancester time.
  int64_t temp_current_time = 0;
  auto get_time_status = env_->GetCurrentTime(&temp_current_time);
  // Safe to proceed even if GetCurrentTime fails. So, log and proceed.
  if (!get_time_status.ok()) {
    ROCKS_LOG_WARN(db_options_.info_log,
                   "Failed to get current time. Status: %s",
                   get_time_status.ToString().c_str());
  }
  uint64_t current_time = static_cast<uint64_t>(temp_current_time);
  uint64_t oldest_ancester_time =
      sub_compact->compaction->MinInputFileOldestAncesterTime();
  if (oldest_ancester_time == port::kMaxUint64) {
    oldest_ancester_time = current_time;
  }
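  // Editor's note on the fallback above: port::kMaxUint64 is presumably the
  // sentinel returned when none of the input files recorded an oldest
  // ancestor time, in which case the new output simply inherits the current
  // wall-clock time.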
  // Initialize a SubcompactionState::Output and add it to sub_compact->outputs
  {
    SubcompactionState::Output out;
    out.meta.fd = FileDescriptor(file_number,
                                 sub_compact->compaction->output_path_id(), 0);
    out.meta.oldest_ancester_time = oldest_ancester_time;
    out.meta.file_creation_time = current_time;
    out.finished = false;
    sub_compact->outputs.push_back(out);
  }

  writable_file->SetIOPriority(Env::IOPriority::IO_LOW);
  writable_file->SetWriteLifeTimeHint(write_hint_);
  writable_file->SetPreallocationBlockSize(static_cast<size_t>(
      sub_compact->compaction->OutputFilePreallocationSize()));
  const auto& listeners =
      sub_compact->compaction->immutable_cf_options()->listeners;
  sub_compact->outfile.reset(
      new WritableFileWriter(std::move(writable_file), fname, file_options_,
                             env_, db_options_.statistics.get(), listeners,
                             db_options_.sst_file_checksum_func.get()));

  // If the column family flag is to only optimize filters for hits, we can
  // skip creating filters if this is the bottommost level where data is
  // going to be found.
  bool skip_filters =
      cfd->ioptions()->optimize_filters_for_hits && bottommost_level_;

  sub_compact->builder.reset(NewTableBuilder(
      *cfd->ioptions(), *(sub_compact->compaction->mutable_cf_options()),
      cfd->internal_comparator(), cfd->int_tbl_prop_collector_factories(),
      cfd->GetID(), cfd->GetName(), sub_compact->outfile.get(),
      sub_compact->compaction->output_compression(),
      0 /*sample_for_compression */,
      sub_compact->compaction->output_compression_opts(),
      sub_compact->compaction->output_level(), skip_filters,
      oldest_ancester_time, 0 /* oldest_key_time */,
      sub_compact->compaction->max_output_file_size(), current_time));
  LogFlush(db_options_.info_log);
  return s;
}
void CompactionJob::CleanupCompaction() {
  for (SubcompactionState& sub_compact : compact_->sub_compact_states) {
    const auto& sub_status = sub_compact.status;

    if (sub_compact.builder != nullptr) {
      // May happen if we get a shutdown call in the middle of compaction
      sub_compact.builder->Abandon();
      sub_compact.builder.reset();
    } else {
      assert(!sub_status.ok() || sub_compact.outfile == nullptr);
    }
    for (const auto& out : sub_compact.outputs) {
      // If this file was inserted into the table cache then remove
      // it here because this compaction was not committed.
      if (!sub_status.ok()) {
        TableCache::Evict(table_cache_.get(), out.meta.fd.GetNumber());
      }
    }
  }
  delete compact_;
  compact_ = nullptr;
}
#ifndef ROCKSDB_LITE
namespace {
void CopyPrefix(const Slice& src, size_t prefix_length, std::string* dst) {
  assert(prefix_length > 0);
  size_t length = src.size() > prefix_length ? prefix_length : src.size();
  dst->assign(src.data(), length);
}
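// Editor's usage sketch for CopyPrefix above (hypothetical values):
// CopyPrefix("abcdef", 4, &dst) leaves dst == "abcd", while
// CopyPrefix("ab", 4, &dst) leaves dst == "ab"; the copied prefix is capped
// at the source length. It is used below to record the smallest and largest
// output key prefixes in CompactionJobStats.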
}  // namespace
#endif  // !ROCKSDB_LITE

void CompactionJob::UpdateCompactionStats() {
  Compaction* compaction = compact_->compaction;
  compaction_stats_.num_input_files_in_non_output_levels = 0;
  compaction_stats_.num_input_files_in_output_level = 0;
  for (int input_level = 0;
       input_level < static_cast<int>(compaction->num_input_levels());
       ++input_level) {
    if (compaction->level(input_level) != compaction->output_level()) {
      UpdateCompactionInputStatsHelper(
          &compaction_stats_.num_input_files_in_non_output_levels,
          &compaction_stats_.bytes_read_non_output_levels, input_level);
    } else {
      UpdateCompactionInputStatsHelper(
          &compaction_stats_.num_input_files_in_output_level,
          &compaction_stats_.bytes_read_output_level, input_level);
    }
  }

  uint64_t num_output_records = 0;

  for (const auto& sub_compact : compact_->sub_compact_states) {
    size_t num_output_files = sub_compact.outputs.size();
    if (sub_compact.builder != nullptr) {
      // An error occurred so ignore the last output.
      assert(num_output_files > 0);
      --num_output_files;
    }
    compaction_stats_.num_output_files += static_cast<int>(num_output_files);

    num_output_records += sub_compact.num_output_records;

    for (const auto& out : sub_compact.outputs) {
      compaction_stats_.bytes_written += out.meta.fd.file_size;
    }
  }

  if (compaction_stats_.num_input_records > num_output_records) {
    compaction_stats_.num_dropped_records =
        compaction_stats_.num_input_records - num_output_records;
  }
}

void CompactionJob::UpdateCompactionInputStatsHelper(int* num_files,
                                                     uint64_t* bytes_read,
                                                     int input_level) {
  const Compaction* compaction = compact_->compaction;
  auto num_input_files = compaction->num_input_files(input_level);
  *num_files += static_cast<int>(num_input_files);

  for (size_t i = 0; i < num_input_files; ++i) {
    const auto* file_meta = compaction->input(input_level, i);
    *bytes_read += file_meta->fd.GetFileSize();
    compaction_stats_.num_input_records +=
        static_cast<uint64_t>(file_meta->num_entries);
  }
}

void CompactionJob::UpdateCompactionJobStats(
    const InternalStats::CompactionStats& stats) const {
#ifndef ROCKSDB_LITE
  if (compaction_job_stats_) {
    compaction_job_stats_->elapsed_micros = stats.micros;

    // input information
    compaction_job_stats_->total_input_bytes =
        stats.bytes_read_non_output_levels + stats.bytes_read_output_level;
    compaction_job_stats_->num_input_records = stats.num_input_records;
    compaction_job_stats_->num_input_files =
        stats.num_input_files_in_non_output_levels +
        stats.num_input_files_in_output_level;
    compaction_job_stats_->num_input_files_at_output_level =
        stats.num_input_files_in_output_level;

    // output information
    compaction_job_stats_->total_output_bytes = stats.bytes_written;
    compaction_job_stats_->num_output_records = compact_->num_output_records;
    compaction_job_stats_->num_output_files = stats.num_output_files;

    if (compact_->NumOutputFiles() > 0U) {
      CopyPrefix(compact_->SmallestUserKey(),
                 CompactionJobStats::kMaxPrefixLength,
                 &compaction_job_stats_->smallest_output_key_prefix);
      CopyPrefix(compact_->LargestUserKey(),
                 CompactionJobStats::kMaxPrefixLength,
                 &compaction_job_stats_->largest_output_key_prefix);
    }
  }
#else
  (void)stats;
#endif  // !ROCKSDB_LITE
}

void CompactionJob::LogCompaction() {
  Compaction* compaction = compact_->compaction;
  ColumnFamilyData* cfd = compaction->column_family_data();

  // Let's check if anything will get logged. Don't prepare all the info if
  // we're not logging
  if (db_options_.info_log_level <= InfoLogLevel::INFO_LEVEL) {
    Compaction::InputLevelSummaryBuffer inputs_summary;
    ROCKS_LOG_INFO(
        db_options_.info_log, "[%s] [JOB %d] Compacting %s, score %.2f",
        cfd->GetName().c_str(), job_id_,
        compaction->InputLevelSummary(&inputs_summary), compaction->score());
    char scratch[2345];
    compaction->Summary(scratch, sizeof(scratch));
    ROCKS_LOG_INFO(db_options_.info_log, "[%s] Compaction start summary: %s\n",
                   cfd->GetName().c_str(), scratch);
    // build event logger report
    auto stream = event_logger_->Log();
    stream << "job" << job_id_ << "event"
           << "compaction_started"
           << "compaction_reason"
           << GetCompactionReasonString(compaction->compaction_reason());
    for (size_t i = 0; i < compaction->num_input_levels(); ++i) {
      stream << ("files_L" + ToString(compaction->level(i)));
      stream.StartArray();
      for (auto f : *compaction->inputs(i)) {
        stream << f->fd.GetNumber();
      }
      stream.EndArray();
    }
    stream << "score" << compaction->score() << "input_data_size"
           << compaction->CalculateTotalInputSize();
  }
}

}  // namespace ROCKSDB_NAMESPACE