// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/flush_job.h"

#include <algorithm>
#include <cinttypes>
#include <vector>

#include "db/builder.h"
#include "db/db_iter.h"
#include "db/dbformat.h"
#include "db/event_helpers.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/memtable.h"
#include "db/memtable_list.h"
#include "db/merge_context.h"
#include "db/range_tombstone_fragmenter.h"
#include "db/version_edit.h"
#include "db/version_set.h"
#include "file/file_util.h"
#include "file/filename.h"
#include "logging/event_logger.h"
#include "logging/log_buffer.h"
#include "logging/logging.h"
#include "monitoring/iostats_context_imp.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/thread_status_util.h"
#include "port/port.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/statistics.h"
#include "rocksdb/status.h"
#include "rocksdb/table.h"
#include "table/merging_iterator.h"
#include "table/table_builder.h"
#include "table/two_level_iterator.h"
#include "test_util/sync_point.h"
#include "util/coding.h"
#include "util/mutexlock.h"
#include "util/stop_watch.h"
namespace ROCKSDB_NAMESPACE {

const char* GetFlushReasonString(FlushReason flush_reason) {
  switch (flush_reason) {
    case FlushReason::kOthers:
      return "Other Reasons";
    case FlushReason::kGetLiveFiles:
      return "Get Live Files";
    case FlushReason::kShutDown:
      return "Shut down";
    case FlushReason::kExternalFileIngestion:
      return "External File Ingestion";
    case FlushReason::kManualCompaction:
      return "Manual Compaction";
    case FlushReason::kWriteBufferManager:
      return "Write Buffer Manager";
    case FlushReason::kWriteBufferFull:
      return "Write Buffer Full";
    case FlushReason::kTest:
      return "Test";
    case FlushReason::kDeleteFiles:
      return "Delete Files";
    case FlushReason::kAutoCompaction:
      return "Auto Compaction";
    case FlushReason::kManualFlush:
      return "Manual Flush";
    case FlushReason::kErrorRecovery:
      return "Error Recovery";
    case FlushReason::kErrorRecoveryRetryFlush:
      return "Error Recovery Retry Flush";
    case FlushReason::kWalFull:
      return "WAL Full";
    case FlushReason::kCatchUpAfterErrorRecovery:
      return "Catch Up After Error Recovery";
    default:
      return "Invalid";
  }
}
FlushJob::FlushJob(
    const std::string& dbname, ColumnFamilyData* cfd,
    const ImmutableDBOptions& db_options,
    const MutableCFOptions& mutable_cf_options, uint64_t max_memtable_id,
    const FileOptions& file_options, VersionSet* versions,
    InstrumentedMutex* db_mutex, std::atomic<bool>* shutting_down,
    JobContext* job_context, FlushReason flush_reason, LogBuffer* log_buffer,
    FSDirectory* db_directory, FSDirectory* output_file_directory,
    CompressionType output_compression, Statistics* stats,
    EventLogger* event_logger, bool measure_io_stats,
    const bool sync_output_directory, const bool write_manifest,
    Env::Priority thread_pri, const std::shared_ptr<IOTracer>& io_tracer,
    std::shared_ptr<const SeqnoToTimeMapping> seqno_to_time_mapping,
    const std::string& db_id, const std::string& db_session_id,
    std::string full_history_ts_low, BlobFileCompletionCallback* blob_callback)
    : dbname_(dbname),
      db_id_(db_id),
      db_session_id_(db_session_id),
      cfd_(cfd),
      db_options_(db_options),
      mutable_cf_options_(mutable_cf_options),
      max_memtable_id_(max_memtable_id),
      file_options_(file_options),
      versions_(versions),
      db_mutex_(db_mutex),
      shutting_down_(shutting_down),
      earliest_snapshot_(job_context->GetEarliestSnapshotSequence()),
      job_context_(job_context),
      flush_reason_(flush_reason),
      log_buffer_(log_buffer),
      db_directory_(db_directory),
      output_file_directory_(output_file_directory),
      output_compression_(output_compression),
      stats_(stats),
      event_logger_(event_logger),
      measure_io_stats_(measure_io_stats),
      sync_output_directory_(sync_output_directory),
      write_manifest_(write_manifest),
      edit_(nullptr),
      base_(nullptr),
      pick_memtable_called(false),
      thread_pri_(thread_pri),
      io_tracer_(io_tracer),
      clock_(db_options_.clock),
      full_history_ts_low_(std::move(full_history_ts_low)),
      blob_callback_(blob_callback),
      seqno_to_time_mapping_(std::move(seqno_to_time_mapping)) {
  assert(job_context->snapshot_context_initialized);
  // Update the thread status to indicate flush.
  ReportStartedFlush();
  TEST_SYNC_POINT("FlushJob::FlushJob()");
}
FlushJob::~FlushJob() { ThreadStatusUtil::ResetThreadStatus(); }

void FlushJob::ReportStartedFlush() {
  ThreadStatusUtil::SetEnableTracking(db_options_.enable_thread_tracking);
  ThreadStatusUtil::SetColumnFamily(cfd_);
  ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_FLUSH);
  ThreadStatusUtil::SetThreadOperationProperty(ThreadStatus::COMPACTION_JOB_ID,
                                               job_context_->job_id);
  IOSTATS_RESET(bytes_written);
}

void FlushJob::ReportFlushInputSize(const autovector<ReadOnlyMemTable*>& mems) {
  uint64_t input_size = 0;
  for (auto* mem : mems) {
    input_size += mem->ApproximateMemoryUsage();
  }
  ThreadStatusUtil::IncreaseThreadOperationProperty(
      ThreadStatus::FLUSH_BYTES_MEMTABLES, input_size);
}

void FlushJob::RecordFlushIOStats() {
  RecordTick(stats_, FLUSH_WRITE_BYTES, IOSTATS(bytes_written));
  ThreadStatusUtil::IncreaseThreadOperationProperty(
      ThreadStatus::FLUSH_BYTES_WRITTEN, IOSTATS(bytes_written));
  IOSTATS_RESET(bytes_written);
}
void FlushJob::PickMemTable() {
  db_mutex_->AssertHeld();
  assert(!pick_memtable_called);
  pick_memtable_called = true;

  // Maximum "NextLogNumber" of the memtables to flush.
  // When the mempurge feature is turned off, this variable is unnecessary
  // because the memtables are implicitly sorted by increasing order of
  // creation time, so mems_.back()->GetNextLogNumber() is already equal to
  // max_next_log_number. However, when mempurge is on, the memtables are no
  // longer sorted by increasing order of creation time, so
  // mems_.back()->GetNextLogNumber() is not necessarily equal to
  // max_next_log_number and this variable becomes necessary.
  uint64_t max_next_log_number = 0;

  // Save the contents of the earliest memtable as a new Table
  cfd_->imm()->PickMemtablesToFlush(max_memtable_id_, &mems_,
                                    &max_next_log_number);
  if (mems_.empty()) {
    return;
  }

  // Track the effective cutoff user-defined timestamp during flush if
  // user-defined timestamps can be stripped.
  GetEffectiveCutoffUDTForPickedMemTables();

  GetPrecludeLastLevelMinSeqno();

  ReportFlushInputSize(mems_);

  // Entries in mems_ are (implicitly) sorted in ascending order by their
  // creation time. We will use the first memtable's `edit` to keep the meta
  // info for this flush.
  ReadOnlyMemTable* m = mems_[0];
  edit_ = m->GetEdits();
  edit_->SetPrevLogNumber(0);
  // SetLogNumber(log_num) indicates that logs with numbers smaller than
  // log_num will no longer be picked up for recovery.
  edit_->SetLogNumber(max_next_log_number);
  edit_->SetColumnFamily(cfd_->GetID());

  // path 0 for level 0 file.
  meta_.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);
  meta_.epoch_number = cfd_->NewEpochNumber();

  base_ = cfd_->current();
  base_->Ref();  // it is likely that we do not need this reference
}
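
// Run() drives the flush. It may first attempt an in-memory MemPurge; if that
// does not apply or fails, it writes the picked memtables out as a level-0
// table via WriteLevel0Table(). On error the memtable flush is rolled back;
// otherwise, if write_manifest_ is set, the result is installed through
// TryInstallMemtableFlushResults() (with write_edit=false when a MemPurge
// succeeded, since there is no new file or log number to record).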
Status FlushJob::Run(LogsWithPrepTracker* prep_tracker, FileMetaData* file_meta,
                     bool* switched_to_mempurge, bool* skipped_since_bg_error,
                     ErrorHandler* error_handler) {
  TEST_SYNC_POINT("FlushJob::Start");
  db_mutex_->AssertHeld();
  assert(pick_memtable_called);
  // The mempurge threshold can be changed dynamically, so it is saved locally
  // here to keep it consistent within a single FlushJob::Run() call.
  double mempurge_threshold =
      mutable_cf_options_.experimental_mempurge_threshold;

  AutoThreadOperationStageUpdater stage_run(ThreadStatus::STAGE_FLUSH_RUN);
  if (mems_.empty()) {
    ROCKS_LOG_BUFFER(log_buffer_, "[%s] No memtable to flush",
                     cfd_->GetName().c_str());
    return Status::OK();
  }

  // I/O measurement variables
  PerfLevel prev_perf_level = PerfLevel::kEnableTime;
  uint64_t prev_write_nanos = 0;
  uint64_t prev_fsync_nanos = 0;
  uint64_t prev_range_sync_nanos = 0;
  uint64_t prev_prepare_write_nanos = 0;
  uint64_t prev_cpu_write_nanos = 0;
  uint64_t prev_cpu_read_nanos = 0;
  if (measure_io_stats_) {
    prev_perf_level = GetPerfLevel();
    SetPerfLevel(PerfLevel::kEnableTime);
    prev_write_nanos = IOSTATS(write_nanos);
    prev_fsync_nanos = IOSTATS(fsync_nanos);
    prev_range_sync_nanos = IOSTATS(range_sync_nanos);
    prev_prepare_write_nanos = IOSTATS(prepare_write_nanos);
    prev_cpu_write_nanos = IOSTATS(cpu_write_nanos);
    prev_cpu_read_nanos = IOSTATS(cpu_read_nanos);
  }

  Status mempurge_s = Status::NotFound("No MemPurge.");
  if ((mempurge_threshold > 0.0) &&
      (flush_reason_ == FlushReason::kWriteBufferFull) && (!mems_.empty()) &&
      MemPurgeDecider(mempurge_threshold) && !(db_options_.atomic_flush)) {
    cfd_->SetMempurgeUsed();
    mempurge_s = MemPurge();
    if (!mempurge_s.ok()) {
      // Mempurge is typically aborted when the output bytes cannot be
      // contained in a single output memtable.
      if (mempurge_s.IsAborted()) {
        ROCKS_LOG_INFO(db_options_.info_log, "Mempurge process aborted: %s\n",
                       mempurge_s.ToString().c_str());
      } else {
        // However, the mempurge process can also fail for other reasons
        // (e.g., new_mem->Add() fails).
        ROCKS_LOG_WARN(db_options_.info_log, "Mempurge process failed: %s\n",
                       mempurge_s.ToString().c_str());
      }
    } else {
      if (switched_to_mempurge) {
        *switched_to_mempurge = true;
      } else {
        // The mempurge process was successful, but no switched_to_mempurge
        // pointer was provided, so there is no way to propagate that state
        // to the caller.
        ROCKS_LOG_WARN(db_options_.info_log,
                       "Mempurge process succeeded "
                       "but no 'switched_to_mempurge' ptr provided.\n");
      }
    }
  }
  Status s;
  if (mempurge_s.ok()) {
    base_->Unref();
    s = Status::OK();
  } else {
    // This will release and re-acquire the mutex.
    s = WriteLevel0Table();
  }

  if (s.ok() && cfd_->IsDropped()) {
    s = Status::ColumnFamilyDropped("Column family dropped during compaction");
  }
  if ((s.ok() || s.IsColumnFamilyDropped()) &&
      shutting_down_->load(std::memory_order_acquire)) {
    s = Status::ShutdownInProgress("Database shutdown");
  }

  if (s.ok()) {
    s = MaybeIncreaseFullHistoryTsLowToAboveCutoffUDT();
  }

  if (!s.ok()) {
    cfd_->imm()->RollbackMemtableFlush(
        mems_, /*rollback_succeeding_memtables=*/!db_options_.atomic_flush);
  } else if (write_manifest_) {
    assert(!db_options_.atomic_flush);
    if (!db_options_.atomic_flush &&
        flush_reason_ != FlushReason::kErrorRecovery &&
        flush_reason_ != FlushReason::kErrorRecoveryRetryFlush &&
        error_handler && !error_handler->GetBGError().ok() &&
        error_handler->IsBGWorkStopped()) {
      cfd_->imm()->RollbackMemtableFlush(
          mems_, /*rollback_succeeding_memtables=*/!db_options_.atomic_flush);
      s = error_handler->GetBGError();
      if (skipped_since_bg_error) {
        *skipped_since_bg_error = true;
      }
    } else {
      TEST_SYNC_POINT("FlushJob::InstallResults");
      // Replace immutable memtable with the generated Table.
      // write_edit is true if no mempurge happened (or if it was aborted),
      // but false if the mempurge succeeded: in that case there is no new
      // min log number or new level-0 file path to write to the manifest.
      s = cfd_->imm()->TryInstallMemtableFlushResults(
          cfd_, mems_, prep_tracker, versions_, db_mutex_,
          meta_.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_,
          log_buffer_, &committed_flush_jobs_info_,
          !(mempurge_s.ok()) /* write_edit */);
    }
  }

  if (s.ok() && file_meta != nullptr) {
    *file_meta = meta_;
  }
  RecordFlushIOStats();

  // When measure_io_stats_ is true, the default 512 bytes is not enough.
  auto stream = event_logger_->LogToBuffer(log_buffer_, 1024);
  stream << "job" << job_context_->job_id << "event" << "flush_finished";
  stream << "output_compression"
         << CompressionTypeToString(output_compression_);
  stream << "lsm_state";
  stream.StartArray();
  auto vstorage = cfd_->current()->storage_info();
  for (int level = 0; level < vstorage->num_levels(); ++level) {
    stream << vstorage->NumLevelFiles(level);
  }
  stream.EndArray();

  const auto& blob_files = vstorage->GetBlobFiles();
  if (!blob_files.empty()) {
    assert(blob_files.front());
    stream << "blob_file_head" << blob_files.front()->GetBlobFileNumber();

    assert(blob_files.back());
    stream << "blob_file_tail" << blob_files.back()->GetBlobFileNumber();
  }

  stream << "immutable_memtables" << cfd_->imm()->NumNotFlushed();

  if (measure_io_stats_) {
    if (prev_perf_level != PerfLevel::kEnableTime) {
      SetPerfLevel(prev_perf_level);
    }
    stream << "file_write_nanos" << (IOSTATS(write_nanos) - prev_write_nanos);
    stream << "file_range_sync_nanos"
           << (IOSTATS(range_sync_nanos) - prev_range_sync_nanos);
    stream << "file_fsync_nanos" << (IOSTATS(fsync_nanos) - prev_fsync_nanos);
    stream << "file_prepare_write_nanos"
           << (IOSTATS(prepare_write_nanos) - prev_prepare_write_nanos);
    stream << "file_cpu_write_nanos"
           << (IOSTATS(cpu_write_nanos) - prev_cpu_write_nanos);
    stream << "file_cpu_read_nanos"
           << (IOSTATS(cpu_read_nanos) - prev_cpu_read_nanos);
  }
  TEST_SYNC_POINT("FlushJob::End");

  return s;
}
void FlushJob::Cancel() {
  db_mutex_->AssertHeld();
  assert(base_ != nullptr);
  base_->Unref();
}
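
// MemPurge compacts the picked memtables in memory instead of writing them to
// a level-0 file: their live entries and range tombstones are copied into a
// single new memtable, which is then put back on the immutable memtable list.
// Returns Status::Aborted if the output does not fit in one memtable, in which
// case Run() falls back to a regular WriteLevel0Table() flush.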
Status FlushJob::MemPurge() {
  Status s;
  db_mutex_->AssertHeld();
  db_mutex_->Unlock();
  assert(!mems_.empty());

  // Measure purging time.
  const uint64_t start_micros = clock_->NowMicros();
  const uint64_t start_cpu_micros = clock_->CPUMicros();

  MemTable* new_mem = nullptr;
  // For performance/log investigation purposes:
  // look at how much useful payload we harvest in the new_mem.
  // This value is then printed to the DB log.
  double new_mem_capacity = 0.0;

  // Create two iterators, one for the memtable data (contains
  // info from puts + deletes), and one for the memtable
  // Range Tombstones (from DeleteRanges).
  // TODO: plumb Env::IOActivity, Env::IOPriority
  ReadOptions ro;
  ro.total_order_seek = true;
  Arena arena;
  std::vector<InternalIterator*> memtables;
  std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
      range_del_iters;
  for (ReadOnlyMemTable* m : mems_) {
    memtables.push_back(m->NewIterator(ro, /*seqno_to_time_mapping=*/nullptr,
                                       &arena, /*prefix_extractor=*/nullptr,
                                       /*for_flush=*/true));
    auto* range_del_iter = m->NewRangeTombstoneIterator(
        ro, kMaxSequenceNumber, true /* immutable_memtable */);
    if (range_del_iter != nullptr) {
      range_del_iters.emplace_back(range_del_iter);
    }
  }

  assert(!memtables.empty());
  SequenceNumber first_seqno = kMaxSequenceNumber;
  SequenceNumber earliest_seqno = kMaxSequenceNumber;
  // Pick first and earliest seqno as the min of all first_seqno
  // and earliest_seqno of the mempurged memtables.
  for (const auto& mem : mems_) {
    first_seqno = mem->GetFirstSequenceNumber() < first_seqno
                      ? mem->GetFirstSequenceNumber()
                      : first_seqno;
    earliest_seqno = mem->GetEarliestSequenceNumber() < earliest_seqno
                         ? mem->GetEarliestSequenceNumber()
                         : earliest_seqno;
  }

  ScopedArenaPtr<InternalIterator> iter(
      NewMergingIterator(&(cfd_->internal_comparator()), memtables.data(),
                         static_cast<int>(memtables.size()), &arena));

  const auto& ioptions = cfd_->ioptions();

  // Place iterator at the First (meaning most recent) key node.
  iter->SeekToFirst();

  const std::string* const full_history_ts_low = &(cfd_->GetFullHistoryTsLow());
  std::unique_ptr<CompactionRangeDelAggregator> range_del_agg(
      new CompactionRangeDelAggregator(&(cfd_->internal_comparator()),
                                       job_context_->snapshot_seqs,
                                       full_history_ts_low));
  for (auto& rd_iter : range_del_iters) {
    range_del_agg->AddTombstones(std::move(rd_iter));
  }

  // If there is valid data in the memtable,
  // or at least range tombstones, copy over the info
  // to the new memtable.
  if (iter->Valid() || !range_del_agg->IsEmpty()) {
    // maxSize is the capacity of a single memtable.
    size_t maxSize = mutable_cf_options_.write_buffer_size;
    std::unique_ptr<CompactionFilter> compaction_filter;
    if (ioptions.compaction_filter_factory != nullptr &&
        ioptions.compaction_filter_factory->ShouldFilterTableFileCreation(
            TableFileCreationReason::kFlush)) {
      CompactionFilter::Context ctx;
      ctx.is_full_compaction = false;
      ctx.is_manual_compaction = false;
      ctx.column_family_id = cfd_->GetID();
      ctx.reason = TableFileCreationReason::kFlush;
      compaction_filter =
          ioptions.compaction_filter_factory->CreateCompactionFilter(ctx);
      if (compaction_filter != nullptr &&
          !compaction_filter->IgnoreSnapshots()) {
        s = Status::NotSupported(
            "CompactionFilter::IgnoreSnapshots() = false is not supported "
            "anymore.");
        return s;
      }
    }

    new_mem = new MemTable(cfd_->internal_comparator(), cfd_->ioptions(),
                           mutable_cf_options_, cfd_->write_buffer_mgr(),
                           earliest_seqno, cfd_->GetID());
    assert(new_mem != nullptr);

    Env* env = db_options_.env;
    assert(env);
    MergeHelper merge(env, (cfd_->internal_comparator()).user_comparator(),
                      (ioptions.merge_operator).get(), compaction_filter.get(),
                      ioptions.logger,
                      true /* internal key corruption is not ok */,
                      job_context_->GetLatestSnapshotSequence(),
                      job_context_->snapshot_checker);
    assert(job_context_);
    const std::atomic<bool> kManualCompactionCanceledFalse{false};
    CompactionIterator c_iter(
        iter.get(), (cfd_->internal_comparator()).user_comparator(), &merge,
        kMaxSequenceNumber, &job_context_->snapshot_seqs, earliest_snapshot_,
        job_context_->earliest_write_conflict_snapshot,
        job_context_->GetJobSnapshotSequence(), job_context_->snapshot_checker,
        env, ShouldReportDetailedTime(env, ioptions.stats), range_del_agg.get(),
        nullptr, ioptions.allow_data_in_errors,
        ioptions.enforce_single_del_contracts,
        /*manual_compaction_canceled=*/kManualCompactionCanceledFalse,
        false /* must_count_input_entries */,
        /*compaction=*/nullptr, compaction_filter.get(),
        /*shutting_down=*/nullptr, ioptions.info_log, full_history_ts_low);

    // Set the earliest sequence number in the new memtable to be equal to the
    // earliest sequence number of the memtables being flushed (see later if
    // there is a need to update this number!).
    new_mem->SetEarliestSequenceNumber(earliest_seqno);
    // Likewise for the first sequence number.
    new_mem->SetFirstSequenceNumber(first_seqno);
    SequenceNumber new_first_seqno = kMaxSequenceNumber;

    c_iter.SeekToFirst();

    // Key transfer
    for (; c_iter.Valid(); c_iter.Next()) {
      const ParsedInternalKey ikey = c_iter.ikey();
      const Slice value = c_iter.value();
      new_first_seqno =
          ikey.sequence < new_first_seqno ? ikey.sequence : new_first_seqno;

      // Should we update "OldestKeyTime"? Timestamps still appear to be an
      // "experimental" feature.
      s = new_mem->Add(
          ikey.sequence, ikey.type, ikey.user_key, value,
          nullptr,   // KV protection info set as nullptr since it
                     // should only be useful for the first add to
                     // the original memtable.
          false,     // allow_concurrent_memtable_writes:
                     // not seen as necessary for now.
          nullptr,   // get_post_process_info(m) must be nullptr
                     // when concurrent_memtable_writes is switched off.
          nullptr);  // hint, only used when concurrent_memtable_writes_
                     // is switched on.

      if (!s.ok()) {
        break;
      }

      // If new_mem has grown larger than maxSize, roll back to a regular
      // flush operation and destroy new_mem.
      if (new_mem->ApproximateMemoryUsage() > maxSize) {
        s = Status::Aborted("Mempurge filled more than one memtable.");
        new_mem_capacity = 1.0;
        break;
      }
    }

    // Check status and propagate
    // potential error status from c_iter
    if (!s.ok()) {
      c_iter.status().PermitUncheckedError();
    } else if (!c_iter.status().ok()) {
      s = c_iter.status();
    }

    // Range tombstone transfer.
    if (s.ok()) {
      auto range_del_it = range_del_agg->NewIterator();
      for (range_del_it->SeekToFirst(); range_del_it->Valid();
           range_del_it->Next()) {
        auto tombstone = range_del_it->Tombstone();
        new_first_seqno =
            tombstone.seq_ < new_first_seqno ? tombstone.seq_ : new_first_seqno;
        s = new_mem->Add(
            tombstone.seq_,        // Sequence number
            kTypeRangeDeletion,    // KV type
            tombstone.start_key_,  // Key is start key.
            tombstone.end_key_,    // Value is end key.
            nullptr,   // KV protection info set as nullptr since it
                       // should only be useful for the first add to
                       // the original memtable.
            false,     // allow_concurrent_memtable_writes:
                       // not seen as necessary for now.
            nullptr,   // get_post_process_info(m) must be nullptr
                       // when concurrent_memtable_writes is switched off.
            nullptr);  // hint, only used when concurrent_memtable_writes_
                       // is switched on.

        if (!s.ok()) {
          break;
        }

        // If new_mem has grown larger than maxSize, roll back to a regular
        // flush operation and destroy new_mem.
        if (new_mem->ApproximateMemoryUsage() > maxSize) {
          s = Status::Aborted(Slice("Mempurge filled more than one memtable."));
          new_mem_capacity = 1.0;
          break;
        }
      }
    }

    // If everything happened smoothly and new_mem contains valid data,
    // decide if it is flushed to storage or kept in the imm()
    // memtable list (memory).
    if (s.ok() && (new_first_seqno != kMaxSequenceNumber)) {
      // Rectify the first sequence number, which (unlike the earliest seq
      // number) needs to be present in the new memtable.
      new_mem->SetFirstSequenceNumber(new_first_seqno);

      // The new_mem is added to the list of immutable memtables
      // only if it filled at less than 100% capacity and isn't flagged
      // as in need of being flushed.
      if (new_mem->ApproximateMemoryUsage() < maxSize &&
          !(new_mem->ShouldFlushNow())) {
        // Construct fragmented memtable range tombstones without mutex
        new_mem->ConstructFragmentedRangeTombstones();
        db_mutex_->Lock();
        // Take the newest id, so that memtables in MemtableList don't have
        // out-of-order memtable ids.
        uint64_t new_mem_id = mems_.back()->GetID();

        new_mem->SetID(new_mem_id);
        // Take the latest memtable's next log number.
        new_mem->SetNextLogNumber(mems_.back()->GetNextLogNumber());

        // This addition will not trigger another flush, because
        // we do not call EnqueuePendingFlush().
        cfd_->imm()->Add(new_mem, &job_context_->memtables_to_free);
        new_mem->Ref();
        // Piggyback FlushJobInfo on the first flushed memtable.
        db_mutex_->AssertHeld();
        meta_.fd.file_size = 0;
        mems_[0]->SetFlushJobInfo(GetFlushJobInfo());
        db_mutex_->Unlock();
      } else {
        s = Status::Aborted(Slice("Mempurge filled more than one memtable."));
        new_mem_capacity = 1.0;
        if (new_mem) {
          job_context_->memtables_to_free.push_back(new_mem);
        }
      }
    } else {
      // In this case, the newly allocated new_mem is empty.
      assert(new_mem != nullptr);
      job_context_->memtables_to_free.push_back(new_mem);
    }
  }

  // Reacquire the mutex for the WriteLevel0 function.
  db_mutex_->Lock();

  // If the mempurge was successful, don't write the input memtables to
  // level 0, but do write any full output table to level 0.
  if (s.ok()) {
    TEST_SYNC_POINT("DBImpl::FlushJob:MemPurgeSuccessful");
  } else {
    TEST_SYNC_POINT("DBImpl::FlushJob:MemPurgeUnsuccessful");
  }
  const uint64_t micros = clock_->NowMicros() - start_micros;
  const uint64_t cpu_micros = clock_->CPUMicros() - start_cpu_micros;
  ROCKS_LOG_INFO(db_options_.info_log,
                 "[%s] [JOB %d] Mempurge lasted %" PRIu64
                 " microseconds, and %" PRIu64
                 " cpu "
                 "microseconds. Status is %s ok. Perc capacity: %f\n",
                 cfd_->GetName().c_str(), job_context_->job_id, micros,
                 cpu_micros, s.ok() ? "" : "not", new_mem_capacity);

  return s;
}
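
// MemPurgeDecider estimates, by sampling entries from each picked memtable,
// what fraction of the memtable bytes are still "useful" (not overwritten or
// deleted by later entries). MemPurge is chosen only if the estimated useful
// payload would occupy less than `threshold` times a single memtable's
// write_buffer_size; otherwise a regular flush to storage is performed.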
bool FlushJob::MemPurgeDecider(double threshold) {
  // Never trigger mempurge if threshold is not a strictly positive value.
  if (!(threshold > 0.0)) {
    return false;
  }

  if (threshold > (1.0 * mems_.size())) {
    return true;
  }
  // Payload and useful_payload (in bytes).
  // The useful payload ratio of a given MemTable
  // is estimated to be useful_payload/payload.
  uint64_t payload = 0, useful_payload = 0, entry_size = 0;

  // Local variables used repetitively inside the for-loop
  // when iterating over the sampled entries.
  Slice key_slice, value_slice;
  ParsedInternalKey res;
  SnapshotImpl min_snapshot;
  std::string vget;
  Status mget_s, parse_s;
  MergeContext merge_context;
  SequenceNumber max_covering_tombstone_seq = 0, sqno = 0,
                 min_seqno_snapshot = 0;
  bool get_res, can_be_useful_payload, not_in_next_mems;

  // If estimated_useful_payload is > threshold,
  // then flush to storage, else MemPurge.
  double estimated_useful_payload = 0.0;
  // Cochran formula for determining sample size.
  // 95% confidence interval, 7% precision.
  //    n0 = (1.96*1.96)*0.25/(0.07*0.07) = 196.0
  double n0 = 196.0;
  // TODO: plumb Env::IOActivity, Env::IOPriority
  ReadOptions ro;
  ro.total_order_seek = true;

  // Iterate over each memtable of the set.
  for (auto mem_iter = std::begin(mems_); mem_iter != std::end(mems_);
       ++mem_iter) {
    ReadOnlyMemTable* mt = *mem_iter;

    // Sample from the memtable.
    uint64_t nentries = mt->NumEntries();
    // Corrected Cochran formula for small populations
    // (converges to n0 for large populations).
    uint64_t target_sample_size =
        static_cast<uint64_t>(ceil(n0 / (1.0 + (n0 / nentries))));
    std::unordered_set<const char*> sentries = {};
    // Populate the sample entry set.
    mt->UniqueRandomSample(target_sample_size, &sentries);

    // Estimate the garbage ratio by checking whether
    // each sample corresponds to a valid entry.
    for (const char* ss : sentries) {
      key_slice = GetLengthPrefixedSlice(ss);
      parse_s = ParseInternalKey(key_slice, &res, true /*log_err_key*/);
      if (!parse_s.ok()) {
        ROCKS_LOG_WARN(db_options_.info_log,
                       "Memtable Decider: ParseInternalKey did not parse "
                       "key_slice %s successfully.",
                       key_slice.data());
      }

      // Size of the entry is "key size (+ value size if KV entry)"
      entry_size = key_slice.size();
      if (res.type == kTypeValue) {
        value_slice =
            GetLengthPrefixedSlice(key_slice.data() + key_slice.size());
        entry_size += value_slice.size();
      }

      // Count entry bytes as payload.
      payload += entry_size;

      LookupKey lkey(res.user_key, kMaxSequenceNumber);

      // Paranoia: zero out these values just in case.
      max_covering_tombstone_seq = 0;
      sqno = 0;

      // Pick the oldest existing snapshot that is more recent
      // than the sequence number of the sampled entry.
      min_seqno_snapshot = kMaxSequenceNumber;
      for (SequenceNumber seq_num : job_context_->snapshot_seqs) {
        if (seq_num > res.sequence && seq_num < min_seqno_snapshot) {
          min_seqno_snapshot = seq_num;
        }
      }
      min_snapshot.number_ = min_seqno_snapshot;
      ro.snapshot =
          min_seqno_snapshot < kMaxSequenceNumber ? &min_snapshot : nullptr;

      // Estimate if the sample entry is valid or not.
      get_res = mt->Get(lkey, &vget, /*columns=*/nullptr, /*timestamp=*/nullptr,
                        &mget_s, &merge_context, &max_covering_tombstone_seq,
                        &sqno, ro, true /* immutable_memtable */);
      if (!get_res) {
        ROCKS_LOG_WARN(
            db_options_.info_log,
            "Memtable Get returned false when Get(sampled entry). "
            "Yet each sample entry should exist somewhere in the memtable, "
            "unrelated to whether it has been deleted or not.");
      }

      // TODO(bjlemaire): evaluate typeMerge.
      // This is where the sampled entry is estimated to be
      // garbage or not. Note that this is a garbage *estimation*
      // because we do not include certain items such as
      // CompactionFilters triggered at flush, or if the same delete
      // has been inserted twice or more in the memtable.

      // Evaluate if the entry can be useful payload.
      // Situation #1: the entry is a KV entry, was found in the memtable mt,
      //               and the sequence numbers match.
      can_be_useful_payload = (res.type == kTypeValue) && get_res &&
                              mget_s.ok() && (sqno == res.sequence);

      // Situation #2: the entry is a delete entry, was found in the memtable
      //               mt (because get_res == true) and no valid KV entry is
      //               found. (Note: duplicate delete entries are also taken
      //               into account here, because the sequence number 'sqno'
      //               in the memtable->Get(&sqno) operation is set to be
      //               equal to the most recent delete entry as well.)
      can_be_useful_payload |=
          ((res.type == kTypeDeletion) || (res.type == kTypeSingleDeletion)) &&
          mget_s.IsNotFound() && get_res && (sqno == res.sequence);

      // If there is a chance that the entry is useful payload, verify that
      // the entry does not appear in the following memtables
      // (memtables with greater memtable ID/larger sequence numbers).
      if (can_be_useful_payload) {
        not_in_next_mems = true;
        for (auto next_mem_iter = mem_iter + 1;
             next_mem_iter != std::end(mems_); next_mem_iter++) {
          if ((*next_mem_iter)
                  ->Get(lkey, &vget, /*columns=*/nullptr, /*timestamp=*/nullptr,
                        &mget_s, &merge_context, &max_covering_tombstone_seq,
                        &sqno, ro, true /* immutable_memtable */)) {
            not_in_next_mems = false;
            break;
          }
        }
        if (not_in_next_mems) {
          useful_payload += entry_size;
        }
      }
    }
    if (payload > 0) {
      // We use the estimated useful payload ratio to
      // evaluate how many of the memtable bytes are useful bytes.
      estimated_useful_payload +=
          (mt->ApproximateMemoryUsage()) * (useful_payload * 1.0 / payload);

      ROCKS_LOG_INFO(db_options_.info_log,
                     "Mempurge sampling [CF %s] - found garbage ratio from "
                     "sampling: %f. Threshold is %f\n",
                     cfd_->GetName().c_str(),
                     (payload - useful_payload) * 1.0 / payload, threshold);
    } else {
      ROCKS_LOG_WARN(db_options_.info_log,
                     "Mempurge sampling: null payload measured, and collected "
                     "sample size is %zu\n.",
                     sentries.size());
    }
  }
  // We convert the total number of useful payload bytes
  // into the proportion of memtable necessary to store all these bytes.
  // We compare this proportion with the threshold value.
  return ((estimated_useful_payload / mutable_cf_options_.write_buffer_size) <
          threshold);
}
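
// WriteLevel0Table builds a single level-0 SST file (and possibly blob files)
// from the picked memtables. It releases the DB mutex while building the
// table, records the new file in edit_ if the output is non-empty, and
// updates the flush statistics. Installation of edit_ into the MANIFEST is
// done later by the caller (see Run()).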
Status FlushJob::WriteLevel0Table() {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_FLUSH_WRITE_L0);
  db_mutex_->AssertHeld();
  const uint64_t start_micros = clock_->NowMicros();
  const uint64_t start_cpu_micros = clock_->CPUMicros();
  Status s;

  meta_.temperature = mutable_cf_options_.default_write_temperature;
  file_options_.temperature = meta_.temperature;

  const auto* ucmp = cfd_->internal_comparator().user_comparator();
  assert(ucmp);
  const size_t ts_sz = ucmp->timestamp_size();
  const bool logical_strip_timestamp =
      ts_sz > 0 && !cfd_->ioptions().persist_user_defined_timestamps;

  std::vector<BlobFileAddition> blob_file_additions;

  // Note that here we treat flush as level 0 compaction in internal stats
  InternalStats::CompactionStats flush_stats(CompactionReason::kFlush,
                                             1 /* count */);
  {
    auto write_hint = base_->storage_info()->CalculateSSTWriteHint(
        /*level=*/0, db_options_.calculate_sst_write_lifetime_hint_set);
    Env::IOPriority io_priority = GetRateLimiterPriority();
    db_mutex_->Unlock();
    if (log_buffer_) {
      log_buffer_->FlushBufferToLog();
    }

    // memtables and range_del_iters store internal iterators over each data
    // memtable and its associated range deletion memtable, respectively, at
    // corresponding indexes.
    std::vector<InternalIterator*> memtables;
    std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
        range_del_iters;
    ReadOptions ro;
    ro.total_order_seek = true;
    ro.io_activity = Env::IOActivity::kFlush;
    Arena arena;
    uint64_t total_num_input_entries = 0, total_num_deletes = 0;
    uint64_t total_data_size = 0;
    size_t total_memory_usage = 0;
    uint64_t total_num_range_deletes = 0;
    // Used for testing:
    uint64_t mems_size = mems_.size();
    (void)mems_size;  // avoids unused variable error when
                      // TEST_SYNC_POINT_CALLBACK not used.
    TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:num_memtables",
                             &mems_size);
    assert(job_context_);
    for (ReadOnlyMemTable* m : mems_) {
      ROCKS_LOG_INFO(db_options_.info_log,
                     "[%s] [JOB %d] Flushing memtable id %" PRIu64
                     " with next log file: %" PRIu64 ", marked_for_flush: %d\n",
                     cfd_->GetName().c_str(), job_context_->job_id, m->GetID(),
                     m->GetNextLogNumber(), m->IsMarkedForFlush());
      if (logical_strip_timestamp) {
        memtables.push_back(m->NewTimestampStrippingIterator(
            ro, /*seqno_to_time_mapping=*/nullptr, &arena,
            /*prefix_extractor=*/nullptr, ts_sz));
      } else {
        memtables.push_back(
            m->NewIterator(ro, /*seqno_to_time_mapping=*/nullptr, &arena,
                           /*prefix_extractor=*/nullptr, /*for_flush=*/true));
      }
      auto* range_del_iter =
          logical_strip_timestamp
              ? m->NewTimestampStrippingRangeTombstoneIterator(
                    ro, kMaxSequenceNumber, ts_sz)
              : m->NewRangeTombstoneIterator(ro, kMaxSequenceNumber,
                                             true /* immutable_memtable */);
      if (range_del_iter != nullptr) {
        range_del_iters.emplace_back(range_del_iter);
      }
      total_num_input_entries += m->NumEntries();
      total_num_deletes += m->NumDeletion();
      total_data_size += m->GetDataSize();
      total_memory_usage += m->ApproximateMemoryUsage();
      total_num_range_deletes += m->NumRangeDeletion();
    }

    // TODO(cbi): when a memtable is flushed because the number of range
    // deletions hits the limit memtable_max_range_deletions, flush_reason_ is
    // still "Write Buffer Full"; flush_reason_ should be updated accordingly.
    event_logger_->Log() << "job" << job_context_->job_id << "event"
                         << "flush_started" << "num_memtables" << mems_.size()
                         << "total_num_input_entries" << total_num_input_entries
                         << "num_deletes" << total_num_deletes
                         << "total_data_size" << total_data_size
                         << "memory_usage" << total_memory_usage
                         << "num_range_deletes" << total_num_range_deletes
                         << "flush_reason"
                         << GetFlushReasonString(flush_reason_);

    {
      ScopedArenaPtr<InternalIterator> iter(
          NewMergingIterator(&cfd_->internal_comparator(), memtables.data(),
                             static_cast<int>(memtables.size()), &arena));
      ROCKS_LOG_INFO(db_options_.info_log,
                     "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": started",
                     cfd_->GetName().c_str(), job_context_->job_id,
                     meta_.fd.GetNumber());

      TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:output_compression",
                               &output_compression_);
      int64_t _current_time = 0;
      auto status = clock_->GetCurrentTime(&_current_time);
      // Safe to proceed even if GetCurrentTime fails. So, log and proceed.
      if (!status.ok()) {
        ROCKS_LOG_WARN(
            db_options_.info_log,
            "Failed to get current time to populate creation_time property. "
            "Status: %s",
            status.ToString().c_str());
      }
      const uint64_t current_time = static_cast<uint64_t>(_current_time);

      uint64_t oldest_key_time = mems_.front()->ApproximateOldestKeyTime();

      // It's not clear whether oldest_key_time is always available. In case
      // it is not available, use current_time.
      uint64_t oldest_ancester_time = std::min(current_time, oldest_key_time);

      TEST_SYNC_POINT_CALLBACK(
          "FlushJob::WriteLevel0Table:oldest_ancester_time",
          &oldest_ancester_time);
      meta_.oldest_ancester_time = oldest_ancester_time;
      meta_.file_creation_time = current_time;

      uint64_t memtable_payload_bytes = 0;
      uint64_t memtable_garbage_bytes = 0;
      IOStatus io_s;

      const std::string* const full_history_ts_low =
          (full_history_ts_low_.empty()) ? nullptr : &full_history_ts_low_;

      ReadOptions read_options(Env::IOActivity::kFlush);
      read_options.rate_limiter_priority = io_priority;
      const WriteOptions write_options(io_priority, Env::IOActivity::kFlush);

      TableBuilderOptions tboptions(
          cfd_->ioptions(), mutable_cf_options_, read_options, write_options,
          cfd_->internal_comparator(), cfd_->internal_tbl_prop_coll_factories(),
          output_compression_, mutable_cf_options_.compression_opts,
          cfd_->GetID(), cfd_->GetName(), 0 /* level */,
          current_time /* newest_key_time */, false /* is_bottommost */,
          TableFileCreationReason::kFlush, oldest_key_time, current_time,
          db_id_, db_session_id_, 0 /* target_file_size */,
          meta_.fd.GetNumber(),
          preclude_last_level_min_seqno_ == kMaxSequenceNumber
              ? preclude_last_level_min_seqno_
              : std::min(earliest_snapshot_, preclude_last_level_min_seqno_));
      s = BuildTable(
          dbname_, versions_, db_options_, tboptions, file_options_,
          cfd_->table_cache(), iter.get(), std::move(range_del_iters), &meta_,
          &blob_file_additions, job_context_->snapshot_seqs, earliest_snapshot_,
          job_context_->earliest_write_conflict_snapshot,
          job_context_->GetJobSnapshotSequence(), job_context_->snapshot_checker,
          mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(),
          &io_s, io_tracer_, BlobFileCreationReason::kFlush,
          seqno_to_time_mapping_.get(), event_logger_, job_context_->job_id,
          &table_properties_, write_hint, full_history_ts_low, blob_callback_,
          base_, &memtable_payload_bytes, &memtable_garbage_bytes,
          &flush_stats);
      TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:s", &s);
      // TODO: Cleanup io_status in BuildTable and table builders
      assert(!s.ok() || io_s.ok());
      io_s.PermitUncheckedError();
      if (s.ok() && total_num_input_entries != flush_stats.num_input_records) {
        std::string msg = "Expected " +
                          std::to_string(total_num_input_entries) +
                          " entries in memtables, but read " +
                          std::to_string(flush_stats.num_input_records);
        ROCKS_LOG_WARN(db_options_.info_log, "[%s] [JOB %d] Level-0 flush %s",
                       cfd_->GetName().c_str(), job_context_->job_id,
                       msg.c_str());
        if (db_options_.flush_verify_memtable_count) {
          s = Status::Corruption(msg);
        }
      }
      // Only verify on table formats that collect table properties
      if (s.ok() &&
          (mutable_cf_options_.table_factory->IsInstanceOf(
               TableFactory::kBlockBasedTableName()) ||
           mutable_cf_options_.table_factory->IsInstanceOf(
               TableFactory::kPlainTableName())) &&
          flush_stats.num_output_records != table_properties_.num_entries) {
        std::string msg =
            "Number of keys in flush output SST files does not match "
            "number of keys added to the table. Expected " +
            std::to_string(flush_stats.num_output_records) + " but there are " +
            std::to_string(table_properties_.num_entries) +
            " in output SST files";
        ROCKS_LOG_WARN(db_options_.info_log, "[%s] [JOB %d] Level-0 flush %s",
                       cfd_->GetName().c_str(), job_context_->job_id,
                       msg.c_str());
        if (db_options_.flush_verify_memtable_count) {
          s = Status::Corruption(msg);
        }
      }

      if (tboptions.reason == TableFileCreationReason::kFlush) {
        TEST_SYNC_POINT("DBImpl::FlushJob:Flush");
        RecordTick(stats_, MEMTABLE_PAYLOAD_BYTES_AT_FLUSH,
                   memtable_payload_bytes);
        RecordTick(stats_, MEMTABLE_GARBAGE_BYTES_AT_FLUSH,
                   memtable_garbage_bytes);
      }
      LogFlush(db_options_.info_log);
    }
    ROCKS_LOG_BUFFER(log_buffer_,
                     "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": %" PRIu64
                     " bytes %s"
                     " %s"
                     " %s",
                     cfd_->GetName().c_str(), job_context_->job_id,
                     meta_.fd.GetNumber(), meta_.fd.GetFileSize(),
                     s.ToString().c_str(),
                     s.ok() && meta_.fd.GetFileSize() == 0
                         ? "It's an empty SST file from a successful flush so "
                           "won't be kept in the DB"
                         : "",
                     meta_.marked_for_compaction ? " (needs compaction)" : "");

    if (s.ok() && output_file_directory_ != nullptr && sync_output_directory_) {
      s = output_file_directory_->FsyncWithDirOptions(
          IOOptions(), nullptr,
          DirFsyncOptions(DirFsyncOptions::FsyncReason::kNewFileSynced));
    }
    TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table", &mems_);
    db_mutex_->Lock();
  }
  base_->Unref();

  // Note that if file_size is zero, the file has been deleted and
  // should not be added to the manifest.
  const bool has_output = meta_.fd.GetFileSize() > 0;

  if (s.ok() && has_output) {
    TEST_SYNC_POINT("DBImpl::FlushJob:SSTFileCreated");
    // If we have more than one background thread, then we cannot
    // insert files directly into higher levels because some other
    // threads could be concurrently producing compacted files for
    // that key range.
    // Add file to L0
    edit_->AddFile(0 /* level */, meta_.fd.GetNumber(), meta_.fd.GetPathId(),
                   meta_.fd.GetFileSize(), meta_.smallest, meta_.largest,
                   meta_.fd.smallest_seqno, meta_.fd.largest_seqno,
                   meta_.marked_for_compaction, meta_.temperature,
                   meta_.oldest_blob_file_number, meta_.oldest_ancester_time,
                   meta_.file_creation_time, meta_.epoch_number,
                   meta_.file_checksum, meta_.file_checksum_func_name,
                   meta_.unique_id, meta_.compensated_range_deletion_size,
                   meta_.tail_size, meta_.user_defined_timestamps_persisted);

    edit_->SetBlobFileAdditions(std::move(blob_file_additions));
  }
  // Piggyback FlushJobInfo on the first flushed memtable.
  mems_[0]->SetFlushJobInfo(GetFlushJobInfo());

  const uint64_t micros = clock_->NowMicros() - start_micros;
  const uint64_t cpu_micros = clock_->CPUMicros() - start_cpu_micros;
  flush_stats.micros = micros;
  flush_stats.cpu_micros += cpu_micros;

  ROCKS_LOG_INFO(db_options_.info_log,
                 "[%s] [JOB %d] Flush lasted %" PRIu64
                 " microseconds, and %" PRIu64 " cpu microseconds.\n",
                 cfd_->GetName().c_str(), job_context_->job_id, micros,
                 flush_stats.cpu_micros);

  if (has_output) {
    flush_stats.bytes_written = meta_.fd.GetFileSize();
    flush_stats.num_output_files = 1;
  }

  const auto& blobs = edit_->GetBlobFileAdditions();
  for (const auto& blob : blobs) {
    flush_stats.bytes_written_blob += blob.GetTotalBlobBytes();
  }

  flush_stats.num_output_files_blob = static_cast<int>(blobs.size());

  RecordTimeToHistogram(stats_, FLUSH_TIME, flush_stats.micros);
  cfd_->internal_stats()->AddCompactionStats(0 /* level */, thread_pri_,
                                             flush_stats);
  cfd_->internal_stats()->AddCFStats(
      InternalStats::BYTES_FLUSHED,
      flush_stats.bytes_written + flush_stats.bytes_written_blob);
  RecordFlushIOStats();
  return s;
}
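
// Flush writes normally run at Env::IO_HIGH. When the write controller is
// already stopping or delaying foreground writes, the flush is what relieves
// the stall, so its I/O is promoted to Env::IO_USER (effectively bypassing
// rate limiting).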
Env::IOPriority FlushJob::GetRateLimiterPriority() {
  if (versions_ && versions_->GetColumnFamilySet() &&
      versions_->GetColumnFamilySet()->write_controller()) {
    WriteController* write_controller =
        versions_->GetColumnFamilySet()->write_controller();
    if (write_controller->IsStopped() || write_controller->NeedsDelay()) {
      return Env::IO_USER;
    }
  }

  return Env::IO_HIGH;
}
std::unique_ptr<FlushJobInfo> FlushJob::GetFlushJobInfo() const {
  db_mutex_->AssertHeld();
  std::unique_ptr<FlushJobInfo> info(new FlushJobInfo{});
  info->cf_id = cfd_->GetID();
  info->cf_name = cfd_->GetName();

  const uint64_t file_number = meta_.fd.GetNumber();
  info->file_path =
      MakeTableFileName(cfd_->ioptions().cf_paths[0].path, file_number);
  info->file_number = file_number;
  info->oldest_blob_file_number = meta_.oldest_blob_file_number;
  info->thread_id = db_options_.env->GetThreadID();
  info->job_id = job_context_->job_id;
  info->smallest_seqno = meta_.fd.smallest_seqno;
  info->largest_seqno = meta_.fd.largest_seqno;
  info->table_properties = table_properties_;
  info->flush_reason = flush_reason_;
  info->blob_compression_type = mutable_cf_options_.blob_compression_type;

  // Update BlobFilesInfo.
  for (const auto& blob_file : edit_->GetBlobFileAdditions()) {
    BlobFileAdditionInfo blob_file_addition_info(
        BlobFileName(cfd_->ioptions().cf_paths.front().path,
                     blob_file.GetBlobFileNumber()) /*blob_file_path*/,
        blob_file.GetBlobFileNumber(), blob_file.GetTotalBlobCount(),
        blob_file.GetTotalBlobBytes());
    info->blob_file_addition_infos.emplace_back(
        std::move(blob_file_addition_info));
  }
  return info;
}
void FlushJob::GetEffectiveCutoffUDTForPickedMemTables() {
  db_mutex_->AssertHeld();
  assert(pick_memtable_called);
  const auto* ucmp = cfd_->internal_comparator().user_comparator();
  assert(ucmp);
  const size_t ts_sz = ucmp->timestamp_size();
  if (db_options_.atomic_flush || ts_sz == 0 ||
      cfd_->ioptions().persist_user_defined_timestamps) {
    return;
  }
  // Find the newest user-defined timestamp from all the flushed memtables.
  for (const ReadOnlyMemTable* m : mems_) {
    Slice table_newest_udt = m->GetNewestUDT();
    // Empty memtables can be legitimately created and flushed, for example
    // by error recovery flush attempts.
    if (table_newest_udt.empty()) {
      continue;
    }
    if (cutoff_udt_.empty() ||
        ucmp->CompareTimestamp(table_newest_udt, cutoff_udt_) > 0) {
      if (!cutoff_udt_.empty()) {
        assert(table_newest_udt.size() == cutoff_udt_.size());
      }
      cutoff_udt_.assign(table_newest_udt.data(), table_newest_udt.size());
    }
  }
}
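
// When preclude_last_level_data_seconds is set, compute the smallest sequence
// number whose data is still recent enough that it should be kept out of the
// last level. The cutoff is derived from the current time and the
// seqno-to-time mapping captured with the SuperVersion.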
void FlushJob::GetPrecludeLastLevelMinSeqno() {
  if (mutable_cf_options_.preclude_last_level_data_seconds == 0) {
    return;
  }
  // SuperVersion should guarantee this
  assert(seqno_to_time_mapping_);
  assert(!seqno_to_time_mapping_->Empty());

  int64_t current_time = 0;
  Status s = db_options_.clock->GetCurrentTime(&current_time);
  if (!s.ok()) {
    ROCKS_LOG_WARN(db_options_.info_log,
                   "Failed to get current time in Flush: Status: %s",
                   s.ToString().c_str());
  } else {
    SequenceNumber preserve_time_min_seqno;
    seqno_to_time_mapping_->GetCurrentTieringCutoffSeqnos(
        static_cast<uint64_t>(current_time),
        mutable_cf_options_.preserve_internal_time_seconds,
        mutable_cf_options_.preclude_last_level_data_seconds,
        &preserve_time_min_seqno, &preclude_last_level_min_seqno_);
  }
}
Status FlushJob::MaybeIncreaseFullHistoryTsLowToAboveCutoffUDT() {
  db_mutex_->AssertHeld();
  const auto* ucmp = cfd_->user_comparator();
  assert(ucmp);
  const std::string& full_history_ts_low = cfd_->GetFullHistoryTsLow();
  // Update full_history_ts_low to right above cutoff udt only if that would
  // increase it.
  if (cutoff_udt_.empty() ||
      (!full_history_ts_low.empty() &&
       ucmp->CompareTimestamp(cutoff_udt_, full_history_ts_low) < 0)) {
    return Status::OK();
  }
  std::string new_full_history_ts_low;
  Slice cutoff_udt_slice = cutoff_udt_;
  // TODO(yuzhangyu): Add a member to AdvancedColumnFamilyOptions for an
  // operation to get the next immediately larger user-defined timestamp to
  // expand this feature to other user-defined timestamp formats.
  GetFullHistoryTsLowFromU64CutoffTs(&cutoff_udt_slice,
                                     &new_full_history_ts_low);
  VersionEdit edit;
  edit.SetColumnFamily(cfd_->GetID());
  edit.SetFullHistoryTsLow(new_full_history_ts_low);
  return versions_->LogAndApply(cfd_, ReadOptions(Env::IOActivity::kFlush),
                                WriteOptions(Env::IOActivity::kFlush), &edit,
                                db_mutex_, output_file_directory_);
}

}  // namespace ROCKSDB_NAMESPACE