// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/flush_job.h"

#include <cinttypes>

#include <algorithm>
#include <vector>

#include "db/builder.h"
#include "db/db_iter.h"
#include "db/dbformat.h"
#include "db/event_helpers.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/memtable.h"
#include "db/memtable_list.h"
#include "db/merge_context.h"
#include "db/range_tombstone_fragmenter.h"
#include "db/version_set.h"
#include "file/file_util.h"
#include "file/filename.h"
#include "logging/event_logger.h"
#include "logging/log_buffer.h"
#include "logging/logging.h"
#include "monitoring/iostats_context_imp.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/thread_status_util.h"
#include "port/port.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/statistics.h"
#include "rocksdb/status.h"
#include "rocksdb/table.h"
#include "table/block_based/block.h"
#include "table/block_based/block_based_table_factory.h"
#include "table/merging_iterator.h"
#include "table/table_builder.h"
#include "table/two_level_iterator.h"
#include "test_util/sync_point.h"
#include "util/coding.h"
#include "util/mutexlock.h"
#include "util/stop_watch.h"

namespace ROCKSDB_NAMESPACE {

const char* GetFlushReasonString(FlushReason flush_reason) {
  switch (flush_reason) {
    case FlushReason::kOthers:
      return "Other Reasons";
    case FlushReason::kGetLiveFiles:
      return "Get Live Files";
    case FlushReason::kShutDown:
      return "Shut down";
    case FlushReason::kExternalFileIngestion:
      return "External File Ingestion";
    case FlushReason::kManualCompaction:
      return "Manual Compaction";
    case FlushReason::kWriteBufferManager:
      return "Write Buffer Manager";
    case FlushReason::kWriteBufferFull:
      return "Write Buffer Full";
    case FlushReason::kTest:
      return "Test";
    case FlushReason::kDeleteFiles:
      return "Delete Files";
    case FlushReason::kAutoCompaction:
      return "Auto Compaction";
    case FlushReason::kManualFlush:
      return "Manual Flush";
    case FlushReason::kErrorRecovery:
      return "Error Recovery";
    default:
      return "Invalid";
  }
}
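
// A FlushJob writes the contents of one or more immutable memtables of a
// column family out as a level-0 table file. Typical usage, with the DB
// mutex held: construct the job, call PickMemTable() to select the
// memtables, then call Run() to build and install the file; call Cancel()
// instead of Run() to abandon a picked flush.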
FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
                   const ImmutableDBOptions& db_options,
                   const MutableCFOptions& mutable_cf_options,
                   const uint64_t* max_memtable_id,
                   const FileOptions& file_options, VersionSet* versions,
                   InstrumentedMutex* db_mutex,
                   std::atomic<bool>* shutting_down,
                   std::vector<SequenceNumber> existing_snapshots,
                   SequenceNumber earliest_write_conflict_snapshot,
                   SnapshotChecker* snapshot_checker, JobContext* job_context,
                   LogBuffer* log_buffer, Directory* db_directory,
                   Directory* output_file_directory,
                   CompressionType output_compression, Statistics* stats,
                   EventLogger* event_logger, bool measure_io_stats,
                   const bool sync_output_directory, const bool write_manifest,
                   Env::Priority thread_pri)
    : dbname_(dbname),
      cfd_(cfd),
      db_options_(db_options),
      mutable_cf_options_(mutable_cf_options),
      max_memtable_id_(max_memtable_id),
      file_options_(file_options),
      versions_(versions),
      db_mutex_(db_mutex),
      shutting_down_(shutting_down),
      existing_snapshots_(std::move(existing_snapshots)),
      earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
      snapshot_checker_(snapshot_checker),
      job_context_(job_context),
      log_buffer_(log_buffer),
      db_directory_(db_directory),
      output_file_directory_(output_file_directory),
      output_compression_(output_compression),
      stats_(stats),
      event_logger_(event_logger),
      measure_io_stats_(measure_io_stats),
      sync_output_directory_(sync_output_directory),
      write_manifest_(write_manifest),
      edit_(nullptr),
      base_(nullptr),
      pick_memtable_called(false),
      thread_pri_(thread_pri) {
  // Update the thread status to indicate flush.
  ReportStartedFlush();
  TEST_SYNC_POINT("FlushJob::FlushJob()");
}

FlushJob::~FlushJob() { ThreadStatusUtil::ResetThreadStatus(); }
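
// Tag the current thread's status with this job's column family and job id
// so the work shows up as a flush in thread-status reporting, and reset the
// bytes-written counter for this job.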
void FlushJob::ReportStartedFlush() {
  ThreadStatusUtil::SetColumnFamily(cfd_, cfd_->ioptions()->env,
                                    db_options_.enable_thread_tracking);
  ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_FLUSH);
  ThreadStatusUtil::SetThreadOperationProperty(ThreadStatus::COMPACTION_JOB_ID,
                                               job_context_->job_id);
  IOSTATS_RESET(bytes_written);
}
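
// Record the approximate memory usage of the memtables being flushed as the
// input size of this flush in thread-status reporting.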
void FlushJob::ReportFlushInputSize(const autovector<MemTable*>& mems) {
  uint64_t input_size = 0;
  for (auto* mem : mems) {
    input_size += mem->ApproximateMemoryUsage();
  }
  ThreadStatusUtil::IncreaseThreadOperationProperty(
      ThreadStatus::FLUSH_BYTES_MEMTABLES, input_size);
}
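
// Publish the bytes written so far to the statistics object and the
// thread-status counters, then reset the iostats counter for the next
// measurement interval.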
void FlushJob::RecordFlushIOStats() {
  RecordTick(stats_, FLUSH_WRITE_BYTES, IOSTATS(bytes_written));
  ThreadStatusUtil::IncreaseThreadOperationProperty(
      ThreadStatus::FLUSH_BYTES_WRITTEN, IOSTATS(bytes_written));
  IOSTATS_RESET(bytes_written);
}
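
// Select the immutable memtables to flush and set up the version edit and
// output file metadata for the flush. Must be called exactly once, with the
// DB mutex held, before Run().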
void FlushJob::PickMemTable() {
  db_mutex_->AssertHeld();
  assert(!pick_memtable_called);
  pick_memtable_called = true;
  // Save the contents of the earliest memtable as a new Table
  cfd_->imm()->PickMemtablesToFlush(max_memtable_id_, &mems_);
  if (mems_.empty()) {
    return;
  }

  ReportFlushInputSize(mems_);

  // The memtables in mems_ are (implicitly) sorted in ascending order by
  // their creation time. We will use the first memtable's `edit` to keep the
  // meta info for this flush.
  MemTable* m = mems_[0];
  edit_ = m->GetEdits();
  edit_->SetPrevLogNumber(0);
  // SetLogNumber(log_num) indicates logs with number smaller than log_num
  // will no longer be picked up for recovery.
  edit_->SetLogNumber(mems_.back()->GetNextLogNumber());
  edit_->SetColumnFamily(cfd_->GetID());

  // path 0 for level 0 file.
  meta_.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);

  base_ = cfd_->current();
  base_->Ref();  // it is likely that we do not need this reference
}
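
// Build the L0 table from the picked memtables and, if write_manifest_ is
// set, install the result in the MANIFEST. Called with the DB mutex held;
// the mutex is released while the table file is written and re-acquired
// afterwards. On failure the memtable flush is rolled back so the memtables
// can be picked up again by a later flush.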
Status FlushJob::Run(LogsWithPrepTracker* prep_tracker,
                     FileMetaData* file_meta) {
  TEST_SYNC_POINT("FlushJob::Start");
  db_mutex_->AssertHeld();
  assert(pick_memtable_called);
  AutoThreadOperationStageUpdater stage_run(ThreadStatus::STAGE_FLUSH_RUN);
  if (mems_.empty()) {
    ROCKS_LOG_BUFFER(log_buffer_, "[%s] Nothing in memtable to flush",
                     cfd_->GetName().c_str());
    return Status::OK();
  }

  // I/O measurement variables
  PerfLevel prev_perf_level = PerfLevel::kEnableTime;
  uint64_t prev_write_nanos = 0;
  uint64_t prev_fsync_nanos = 0;
  uint64_t prev_range_sync_nanos = 0;
  uint64_t prev_prepare_write_nanos = 0;
  uint64_t prev_cpu_write_nanos = 0;
  uint64_t prev_cpu_read_nanos = 0;
  if (measure_io_stats_) {
    prev_perf_level = GetPerfLevel();
    SetPerfLevel(PerfLevel::kEnableTime);
    prev_write_nanos = IOSTATS(write_nanos);
    prev_fsync_nanos = IOSTATS(fsync_nanos);
    prev_range_sync_nanos = IOSTATS(range_sync_nanos);
    prev_prepare_write_nanos = IOSTATS(prepare_write_nanos);
    prev_cpu_write_nanos = IOSTATS(cpu_write_nanos);
    prev_cpu_read_nanos = IOSTATS(cpu_read_nanos);
  }

  // This will release and re-acquire the mutex.
  Status s = WriteLevel0Table();

  if (s.ok() && cfd_->IsDropped()) {
    s = Status::ColumnFamilyDropped("Column family dropped during compaction");
  }

  if ((s.ok() || s.IsColumnFamilyDropped()) &&
      shutting_down_->load(std::memory_order_acquire)) {
    s = Status::ShutdownInProgress("Database shutdown");
  }

  if (!s.ok()) {
    cfd_->imm()->RollbackMemtableFlush(mems_, meta_.fd.GetNumber());
  } else if (write_manifest_) {
    TEST_SYNC_POINT("FlushJob::InstallResults");
    // Replace immutable memtable with the generated Table
    s = cfd_->imm()->TryInstallMemtableFlushResults(
        cfd_, mutable_cf_options_, mems_, prep_tracker, versions_, db_mutex_,
        meta_.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_,
        log_buffer_, &committed_flush_jobs_info_);
  }

  if (s.ok() && file_meta != nullptr) {
    *file_meta = meta_;
  }
  RecordFlushIOStats();

  // When measure_io_stats_ is true, the default 512 bytes is not enough.
  auto stream = event_logger_->LogToBuffer(log_buffer_, 1024);
  stream << "job" << job_context_->job_id << "event"
         << "flush_finished";
  stream << "output_compression"
         << CompressionTypeToString(output_compression_);
  stream << "lsm_state";
  stream.StartArray();
  auto vstorage = cfd_->current()->storage_info();
  for (int level = 0; level < vstorage->num_levels(); ++level) {
    stream << vstorage->NumLevelFiles(level);
  }
  stream.EndArray();
  stream << "immutable_memtables" << cfd_->imm()->NumNotFlushed();

  if (measure_io_stats_) {
    if (prev_perf_level != PerfLevel::kEnableTime) {
      SetPerfLevel(prev_perf_level);
    }
    stream << "file_write_nanos" << (IOSTATS(write_nanos) - prev_write_nanos);
    stream << "file_range_sync_nanos"
           << (IOSTATS(range_sync_nanos) - prev_range_sync_nanos);
    stream << "file_fsync_nanos" << (IOSTATS(fsync_nanos) - prev_fsync_nanos);
    stream << "file_prepare_write_nanos"
           << (IOSTATS(prepare_write_nanos) - prev_prepare_write_nanos);
    stream << "file_cpu_write_nanos"
           << (IOSTATS(cpu_write_nanos) - prev_cpu_write_nanos);
    stream << "file_cpu_read_nanos"
           << (IOSTATS(cpu_read_nanos) - prev_cpu_read_nanos);
  }

  return s;
}
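
// Abandon a picked flush: release the version reference taken in
// PickMemTable(). Called with the DB mutex held.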
void FlushJob::Cancel() {
  db_mutex_->AssertHeld();
  assert(base_ != nullptr);
  base_->Unref();
}
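
// Write the contents of the picked memtables to a new L0 SST file. The DB
// mutex is released while the file is built and re-acquired before the file
// is added to the version edit and internal stats are updated.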
Status FlushJob::WriteLevel0Table() {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_FLUSH_WRITE_L0);
  db_mutex_->AssertHeld();
  const uint64_t start_micros = db_options_.env->NowMicros();
  const uint64_t start_cpu_micros = db_options_.env->NowCPUNanos() / 1000;
  Status s;
  {
    auto write_hint = cfd_->CalculateSSTWriteHint(0);
    db_mutex_->Unlock();
    if (log_buffer_) {
      log_buffer_->FlushBufferToLog();
    }
    // memtables and range_del_iters store internal iterators over each data
    // memtable and its associated range deletion memtable, respectively, at
    // corresponding indexes.
    std::vector<InternalIterator*> memtables;
    std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
        range_del_iters;
    ReadOptions ro;
    ro.total_order_seek = true;
    Arena arena;
    uint64_t total_num_entries = 0, total_num_deletes = 0;
    uint64_t total_data_size = 0;
    size_t total_memory_usage = 0;
    for (MemTable* m : mems_) {
      ROCKS_LOG_INFO(
          db_options_.info_log,
          "[%s] [JOB %d] Flushing memtable with next log file: %" PRIu64 "\n",
          cfd_->GetName().c_str(), job_context_->job_id,
          m->GetNextLogNumber());
      memtables.push_back(m->NewIterator(ro, &arena));
      auto* range_del_iter =
          m->NewRangeTombstoneIterator(ro, kMaxSequenceNumber);
      if (range_del_iter != nullptr) {
        range_del_iters.emplace_back(range_del_iter);
      }
      total_num_entries += m->num_entries();
      total_num_deletes += m->num_deletes();
      total_data_size += m->get_data_size();
      total_memory_usage += m->ApproximateMemoryUsage();
    }

    event_logger_->Log() << "job" << job_context_->job_id << "event"
                         << "flush_started"
                         << "num_memtables" << mems_.size() << "num_entries"
                         << total_num_entries << "num_deletes"
                         << total_num_deletes << "total_data_size"
                         << total_data_size << "memory_usage"
                         << total_memory_usage << "flush_reason"
                         << GetFlushReasonString(cfd_->GetFlushReason());

    {
      ScopedArenaIterator iter(
          NewMergingIterator(&cfd_->internal_comparator(), &memtables[0],
                             static_cast<int>(memtables.size()), &arena));
      ROCKS_LOG_INFO(db_options_.info_log,
                     "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": started",
                     cfd_->GetName().c_str(), job_context_->job_id,
                     meta_.fd.GetNumber());

      TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:output_compression",
                               &output_compression_);
      int64_t _current_time = 0;
      auto status = db_options_.env->GetCurrentTime(&_current_time);
      // Safe to proceed even if GetCurrentTime fails. So, log and proceed.
      if (!status.ok()) {
        ROCKS_LOG_WARN(
            db_options_.info_log,
            "Failed to get current time to populate creation_time property. "
            "Status: %s",
            status.ToString().c_str());
      }
      const uint64_t current_time = static_cast<uint64_t>(_current_time);

      uint64_t oldest_key_time = mems_.front()->ApproximateOldestKeyTime();

      // It's not clear whether oldest_key_time is always available. In case
      // it is not available, use current_time.
      meta_.oldest_ancester_time = std::min(current_time, oldest_key_time);
      meta_.file_creation_time = current_time;

      uint64_t creation_time = (cfd_->ioptions()->compaction_style ==
                                CompactionStyle::kCompactionStyleFIFO)
                                   ? current_time
                                   : meta_.oldest_ancester_time;

      s = BuildTable(
          dbname_, db_options_.env, db_options_.fs.get(), *cfd_->ioptions(),
          mutable_cf_options_, file_options_, cfd_->table_cache(), iter.get(),
          std::move(range_del_iters), &meta_, cfd_->internal_comparator(),
          cfd_->int_tbl_prop_collector_factories(), cfd_->GetID(),
          cfd_->GetName(), existing_snapshots_,
          earliest_write_conflict_snapshot_, snapshot_checker_,
          output_compression_, mutable_cf_options_.sample_for_compression,
          cfd_->ioptions()->compression_opts,
          mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(),
          TableFileCreationReason::kFlush, event_logger_, job_context_->job_id,
          Env::IO_HIGH, &table_properties_, 0 /* level */, creation_time,
          oldest_key_time, write_hint, current_time);
      LogFlush(db_options_.info_log);
    }
    ROCKS_LOG_INFO(db_options_.info_log,
                   "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": %" PRIu64
                   " bytes %s"
                   "%s",
                   cfd_->GetName().c_str(), job_context_->job_id,
                   meta_.fd.GetNumber(), meta_.fd.GetFileSize(),
                   s.ToString().c_str(),
                   meta_.marked_for_compaction ? " (needs compaction)" : "");

    if (s.ok() && output_file_directory_ != nullptr &&
        sync_output_directory_) {
      s = output_file_directory_->Fsync();
    }
    TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table", &mems_);
    db_mutex_->Lock();
  }
  base_->Unref();

  // Note that if file_size is zero, the file has been deleted and
  // should not be added to the manifest.
  if (s.ok() && meta_.fd.GetFileSize() > 0) {
    // if we have more than 1 background thread, then we cannot
    // insert files directly into higher levels because some other
    // threads could be concurrently producing compacted files for
    // that key range.
    // Add file to L0
    edit_->AddFile(0 /* level */, meta_.fd.GetNumber(), meta_.fd.GetPathId(),
                   meta_.fd.GetFileSize(), meta_.smallest, meta_.largest,
                   meta_.fd.smallest_seqno, meta_.fd.largest_seqno,
                   meta_.marked_for_compaction, meta_.oldest_blob_file_number,
                   meta_.oldest_ancester_time, meta_.file_creation_time,
                   meta_.file_checksum, meta_.file_checksum_func_name);
  }
#ifndef ROCKSDB_LITE
  // Piggyback FlushJobInfo on the first flushed memtable.
  mems_[0]->SetFlushJobInfo(GetFlushJobInfo());
#endif  // !ROCKSDB_LITE

  // Note that here we treat flush as level 0 compaction in internal stats
  InternalStats::CompactionStats stats(CompactionReason::kFlush, 1);
  stats.micros = db_options_.env->NowMicros() - start_micros;
  stats.cpu_micros = db_options_.env->NowCPUNanos() / 1000 - start_cpu_micros;
  stats.bytes_written = meta_.fd.GetFileSize();
  RecordTimeToHistogram(stats_, FLUSH_TIME, stats.micros);
  cfd_->internal_stats()->AddCompactionStats(0 /* level */, thread_pri_, stats);
  cfd_->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED,
                                     meta_.fd.GetFileSize());
  RecordFlushIOStats();
  return s;
}

#ifndef ROCKSDB_LITE
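// Assemble the FlushJobInfo that is reported to event listeners for this
// flush. Called with the DB mutex held.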
std::unique_ptr<FlushJobInfo> FlushJob::GetFlushJobInfo() const {
  db_mutex_->AssertHeld();
  std::unique_ptr<FlushJobInfo> info(new FlushJobInfo{});
  info->cf_id = cfd_->GetID();
  info->cf_name = cfd_->GetName();

  const uint64_t file_number = meta_.fd.GetNumber();
  info->file_path =
      MakeTableFileName(cfd_->ioptions()->cf_paths[0].path, file_number);
  info->file_number = file_number;
  info->oldest_blob_file_number = meta_.oldest_blob_file_number;
  info->thread_id = db_options_.env->GetThreadID();
  info->job_id = job_context_->job_id;
  info->smallest_seqno = meta_.fd.smallest_seqno;
  info->largest_seqno = meta_.fd.largest_seqno;
  info->table_properties = table_properties_;
  info->flush_reason = cfd_->GetFlushReason();
  return info;
}
#endif  // !ROCKSDB_LITE

}  // namespace ROCKSDB_NAMESPACE