| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466 |
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
- #include "db/flush_job.h"
- #include <cinttypes>
- #include <algorithm>
- #include <vector>
- #include "db/builder.h"
- #include "db/db_iter.h"
- #include "db/dbformat.h"
- #include "db/event_helpers.h"
- #include "db/log_reader.h"
- #include "db/log_writer.h"
- #include "db/memtable.h"
- #include "db/memtable_list.h"
- #include "db/merge_context.h"
- #include "db/range_tombstone_fragmenter.h"
- #include "db/version_set.h"
- #include "file/file_util.h"
- #include "file/filename.h"
- #include "logging/event_logger.h"
- #include "logging/log_buffer.h"
- #include "logging/logging.h"
- #include "monitoring/iostats_context_imp.h"
- #include "monitoring/perf_context_imp.h"
- #include "monitoring/thread_status_util.h"
- #include "port/port.h"
- #include "rocksdb/db.h"
- #include "rocksdb/env.h"
- #include "rocksdb/statistics.h"
- #include "rocksdb/status.h"
- #include "rocksdb/table.h"
- #include "table/block_based/block.h"
- #include "table/block_based/block_based_table_factory.h"
- #include "table/merging_iterator.h"
- #include "table/table_builder.h"
- #include "table/two_level_iterator.h"
- #include "test_util/sync_point.h"
- #include "util/coding.h"
- #include "util/mutexlock.h"
- #include "util/stop_watch.h"
- namespace ROCKSDB_NAMESPACE {
- const char* GetFlushReasonString (FlushReason flush_reason) {
- switch (flush_reason) {
- case FlushReason::kOthers:
- return "Other Reasons";
- case FlushReason::kGetLiveFiles:
- return "Get Live Files";
- case FlushReason::kShutDown:
- return "Shut down";
- case FlushReason::kExternalFileIngestion:
- return "External File Ingestion";
- case FlushReason::kManualCompaction:
- return "Manual Compaction";
- case FlushReason::kWriteBufferManager:
- return "Write Buffer Manager";
- case FlushReason::kWriteBufferFull:
- return "Write Buffer Full";
- case FlushReason::kTest:
- return "Test";
- case FlushReason::kDeleteFiles:
- return "Delete Files";
- case FlushReason::kAutoCompaction:
- return "Auto Compaction";
- case FlushReason::kManualFlush:
- return "Manual Flush";
- case FlushReason::kErrorRecovery:
- return "Error Recovery";
- default:
- return "Invalid";
- }
- }
- FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
- const ImmutableDBOptions& db_options,
- const MutableCFOptions& mutable_cf_options,
- const uint64_t* max_memtable_id,
- const FileOptions& file_options, VersionSet* versions,
- InstrumentedMutex* db_mutex,
- std::atomic<bool>* shutting_down,
- std::vector<SequenceNumber> existing_snapshots,
- SequenceNumber earliest_write_conflict_snapshot,
- SnapshotChecker* snapshot_checker, JobContext* job_context,
- LogBuffer* log_buffer, Directory* db_directory,
- Directory* output_file_directory,
- CompressionType output_compression, Statistics* stats,
- EventLogger* event_logger, bool measure_io_stats,
- const bool sync_output_directory, const bool write_manifest,
- Env::Priority thread_pri)
- : dbname_(dbname),
- cfd_(cfd),
- db_options_(db_options),
- mutable_cf_options_(mutable_cf_options),
- max_memtable_id_(max_memtable_id),
- file_options_(file_options),
- versions_(versions),
- db_mutex_(db_mutex),
- shutting_down_(shutting_down),
- existing_snapshots_(std::move(existing_snapshots)),
- earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
- snapshot_checker_(snapshot_checker),
- job_context_(job_context),
- log_buffer_(log_buffer),
- db_directory_(db_directory),
- output_file_directory_(output_file_directory),
- output_compression_(output_compression),
- stats_(stats),
- event_logger_(event_logger),
- measure_io_stats_(measure_io_stats),
- sync_output_directory_(sync_output_directory),
- write_manifest_(write_manifest),
- edit_(nullptr),
- base_(nullptr),
- pick_memtable_called(false),
- thread_pri_(thread_pri) {
- // Update the thread status to indicate flush.
- ReportStartedFlush();
- TEST_SYNC_POINT("FlushJob::FlushJob()");
- }
- FlushJob::~FlushJob() {
- ThreadStatusUtil::ResetThreadStatus();
- }
- void FlushJob::ReportStartedFlush() {
- ThreadStatusUtil::SetColumnFamily(cfd_, cfd_->ioptions()->env,
- db_options_.enable_thread_tracking);
- ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_FLUSH);
- ThreadStatusUtil::SetThreadOperationProperty(
- ThreadStatus::COMPACTION_JOB_ID,
- job_context_->job_id);
- IOSTATS_RESET(bytes_written);
- }
- void FlushJob::ReportFlushInputSize(const autovector<MemTable*>& mems) {
- uint64_t input_size = 0;
- for (auto* mem : mems) {
- input_size += mem->ApproximateMemoryUsage();
- }
- ThreadStatusUtil::IncreaseThreadOperationProperty(
- ThreadStatus::FLUSH_BYTES_MEMTABLES,
- input_size);
- }
- void FlushJob::RecordFlushIOStats() {
- RecordTick(stats_, FLUSH_WRITE_BYTES, IOSTATS(bytes_written));
- ThreadStatusUtil::IncreaseThreadOperationProperty(
- ThreadStatus::FLUSH_BYTES_WRITTEN, IOSTATS(bytes_written));
- IOSTATS_RESET(bytes_written);
- }
- void FlushJob::PickMemTable() {
- db_mutex_->AssertHeld();
- assert(!pick_memtable_called);
- pick_memtable_called = true;
- // Save the contents of the earliest memtable as a new Table
- cfd_->imm()->PickMemtablesToFlush(max_memtable_id_, &mems_);
- if (mems_.empty()) {
- return;
- }
- ReportFlushInputSize(mems_);
- // entries mems are (implicitly) sorted in ascending order by their created
- // time. We will use the first memtable's `edit` to keep the meta info for
- // this flush.
- MemTable* m = mems_[0];
- edit_ = m->GetEdits();
- edit_->SetPrevLogNumber(0);
- // SetLogNumber(log_num) indicates logs with number smaller than log_num
- // will no longer be picked up for recovery.
- edit_->SetLogNumber(mems_.back()->GetNextLogNumber());
- edit_->SetColumnFamily(cfd_->GetID());
- // path 0 for level 0 file.
- meta_.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);
- base_ = cfd_->current();
- base_->Ref(); // it is likely that we do not need this reference
- }
- Status FlushJob::Run(LogsWithPrepTracker* prep_tracker,
- FileMetaData* file_meta) {
- TEST_SYNC_POINT("FlushJob::Start");
- db_mutex_->AssertHeld();
- assert(pick_memtable_called);
- AutoThreadOperationStageUpdater stage_run(
- ThreadStatus::STAGE_FLUSH_RUN);
- if (mems_.empty()) {
- ROCKS_LOG_BUFFER(log_buffer_, "[%s] Nothing in memtable to flush",
- cfd_->GetName().c_str());
- return Status::OK();
- }
- // I/O measurement variables
- PerfLevel prev_perf_level = PerfLevel::kEnableTime;
- uint64_t prev_write_nanos = 0;
- uint64_t prev_fsync_nanos = 0;
- uint64_t prev_range_sync_nanos = 0;
- uint64_t prev_prepare_write_nanos = 0;
- uint64_t prev_cpu_write_nanos = 0;
- uint64_t prev_cpu_read_nanos = 0;
- if (measure_io_stats_) {
- prev_perf_level = GetPerfLevel();
- SetPerfLevel(PerfLevel::kEnableTime);
- prev_write_nanos = IOSTATS(write_nanos);
- prev_fsync_nanos = IOSTATS(fsync_nanos);
- prev_range_sync_nanos = IOSTATS(range_sync_nanos);
- prev_prepare_write_nanos = IOSTATS(prepare_write_nanos);
- prev_cpu_write_nanos = IOSTATS(cpu_write_nanos);
- prev_cpu_read_nanos = IOSTATS(cpu_read_nanos);
- }
- // This will release and re-acquire the mutex.
- Status s = WriteLevel0Table();
- if (s.ok() && cfd_->IsDropped()) {
- s = Status::ColumnFamilyDropped("Column family dropped during compaction");
- }
- if ((s.ok() || s.IsColumnFamilyDropped()) &&
- shutting_down_->load(std::memory_order_acquire)) {
- s = Status::ShutdownInProgress("Database shutdown");
- }
- if (!s.ok()) {
- cfd_->imm()->RollbackMemtableFlush(mems_, meta_.fd.GetNumber());
- } else if (write_manifest_) {
- TEST_SYNC_POINT("FlushJob::InstallResults");
- // Replace immutable memtable with the generated Table
- s = cfd_->imm()->TryInstallMemtableFlushResults(
- cfd_, mutable_cf_options_, mems_, prep_tracker, versions_, db_mutex_,
- meta_.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_,
- log_buffer_, &committed_flush_jobs_info_);
- }
- if (s.ok() && file_meta != nullptr) {
- *file_meta = meta_;
- }
- RecordFlushIOStats();
- // When measure_io_stats_ is true, the default 512 bytes is not enough.
- auto stream = event_logger_->LogToBuffer(log_buffer_, 1024);
- stream << "job" << job_context_->job_id << "event"
- << "flush_finished";
- stream << "output_compression"
- << CompressionTypeToString(output_compression_);
- stream << "lsm_state";
- stream.StartArray();
- auto vstorage = cfd_->current()->storage_info();
- for (int level = 0; level < vstorage->num_levels(); ++level) {
- stream << vstorage->NumLevelFiles(level);
- }
- stream.EndArray();
- stream << "immutable_memtables" << cfd_->imm()->NumNotFlushed();
- if (measure_io_stats_) {
- if (prev_perf_level != PerfLevel::kEnableTime) {
- SetPerfLevel(prev_perf_level);
- }
- stream << "file_write_nanos" << (IOSTATS(write_nanos) - prev_write_nanos);
- stream << "file_range_sync_nanos"
- << (IOSTATS(range_sync_nanos) - prev_range_sync_nanos);
- stream << "file_fsync_nanos" << (IOSTATS(fsync_nanos) - prev_fsync_nanos);
- stream << "file_prepare_write_nanos"
- << (IOSTATS(prepare_write_nanos) - prev_prepare_write_nanos);
- stream << "file_cpu_write_nanos"
- << (IOSTATS(cpu_write_nanos) - prev_cpu_write_nanos);
- stream << "file_cpu_read_nanos"
- << (IOSTATS(cpu_read_nanos) - prev_cpu_read_nanos);
- }
- return s;
- }
- void FlushJob::Cancel() {
- db_mutex_->AssertHeld();
- assert(base_ != nullptr);
- base_->Unref();
- }
- Status FlushJob::WriteLevel0Table() {
- AutoThreadOperationStageUpdater stage_updater(
- ThreadStatus::STAGE_FLUSH_WRITE_L0);
- db_mutex_->AssertHeld();
- const uint64_t start_micros = db_options_.env->NowMicros();
- const uint64_t start_cpu_micros = db_options_.env->NowCPUNanos() / 1000;
- Status s;
- {
- auto write_hint = cfd_->CalculateSSTWriteHint(0);
- db_mutex_->Unlock();
- if (log_buffer_) {
- log_buffer_->FlushBufferToLog();
- }
- // memtables and range_del_iters store internal iterators over each data
- // memtable and its associated range deletion memtable, respectively, at
- // corresponding indexes.
- std::vector<InternalIterator*> memtables;
- std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
- range_del_iters;
- ReadOptions ro;
- ro.total_order_seek = true;
- Arena arena;
- uint64_t total_num_entries = 0, total_num_deletes = 0;
- uint64_t total_data_size = 0;
- size_t total_memory_usage = 0;
- for (MemTable* m : mems_) {
- ROCKS_LOG_INFO(
- db_options_.info_log,
- "[%s] [JOB %d] Flushing memtable with next log file: %" PRIu64 "\n",
- cfd_->GetName().c_str(), job_context_->job_id, m->GetNextLogNumber());
- memtables.push_back(m->NewIterator(ro, &arena));
- auto* range_del_iter =
- m->NewRangeTombstoneIterator(ro, kMaxSequenceNumber);
- if (range_del_iter != nullptr) {
- range_del_iters.emplace_back(range_del_iter);
- }
- total_num_entries += m->num_entries();
- total_num_deletes += m->num_deletes();
- total_data_size += m->get_data_size();
- total_memory_usage += m->ApproximateMemoryUsage();
- }
- event_logger_->Log() << "job" << job_context_->job_id << "event"
- << "flush_started"
- << "num_memtables" << mems_.size() << "num_entries"
- << total_num_entries << "num_deletes"
- << total_num_deletes << "total_data_size"
- << total_data_size << "memory_usage"
- << total_memory_usage << "flush_reason"
- << GetFlushReasonString(cfd_->GetFlushReason());
- {
- ScopedArenaIterator iter(
- NewMergingIterator(&cfd_->internal_comparator(), &memtables[0],
- static_cast<int>(memtables.size()), &arena));
- ROCKS_LOG_INFO(db_options_.info_log,
- "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": started",
- cfd_->GetName().c_str(), job_context_->job_id,
- meta_.fd.GetNumber());
- TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:output_compression",
- &output_compression_);
- int64_t _current_time = 0;
- auto status = db_options_.env->GetCurrentTime(&_current_time);
- // Safe to proceed even if GetCurrentTime fails. So, log and proceed.
- if (!status.ok()) {
- ROCKS_LOG_WARN(
- db_options_.info_log,
- "Failed to get current time to populate creation_time property. "
- "Status: %s",
- status.ToString().c_str());
- }
- const uint64_t current_time = static_cast<uint64_t>(_current_time);
- uint64_t oldest_key_time =
- mems_.front()->ApproximateOldestKeyTime();
- // It's not clear whether oldest_key_time is always available. In case
- // it is not available, use current_time.
- meta_.oldest_ancester_time = std::min(current_time, oldest_key_time);
- meta_.file_creation_time = current_time;
- uint64_t creation_time = (cfd_->ioptions()->compaction_style ==
- CompactionStyle::kCompactionStyleFIFO)
- ? current_time
- : meta_.oldest_ancester_time;
- s = BuildTable(
- dbname_, db_options_.env, db_options_.fs.get(), *cfd_->ioptions(),
- mutable_cf_options_, file_options_, cfd_->table_cache(), iter.get(),
- std::move(range_del_iters), &meta_, cfd_->internal_comparator(),
- cfd_->int_tbl_prop_collector_factories(), cfd_->GetID(),
- cfd_->GetName(), existing_snapshots_,
- earliest_write_conflict_snapshot_, snapshot_checker_,
- output_compression_, mutable_cf_options_.sample_for_compression,
- cfd_->ioptions()->compression_opts,
- mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(),
- TableFileCreationReason::kFlush, event_logger_, job_context_->job_id,
- Env::IO_HIGH, &table_properties_, 0 /* level */,
- creation_time, oldest_key_time, write_hint, current_time);
- LogFlush(db_options_.info_log);
- }
- ROCKS_LOG_INFO(db_options_.info_log,
- "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": %" PRIu64
- " bytes %s"
- "%s",
- cfd_->GetName().c_str(), job_context_->job_id,
- meta_.fd.GetNumber(), meta_.fd.GetFileSize(),
- s.ToString().c_str(),
- meta_.marked_for_compaction ? " (needs compaction)" : "");
- if (s.ok() && output_file_directory_ != nullptr && sync_output_directory_) {
- s = output_file_directory_->Fsync();
- }
- TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table", &mems_);
- db_mutex_->Lock();
- }
- base_->Unref();
- // Note that if file_size is zero, the file has been deleted and
- // should not be added to the manifest.
- if (s.ok() && meta_.fd.GetFileSize() > 0) {
- // if we have more than 1 background thread, then we cannot
- // insert files directly into higher levels because some other
- // threads could be concurrently producing compacted files for
- // that key range.
- // Add file to L0
- edit_->AddFile(0 /* level */, meta_.fd.GetNumber(), meta_.fd.GetPathId(),
- meta_.fd.GetFileSize(), meta_.smallest, meta_.largest,
- meta_.fd.smallest_seqno, meta_.fd.largest_seqno,
- meta_.marked_for_compaction, meta_.oldest_blob_file_number,
- meta_.oldest_ancester_time, meta_.file_creation_time,
- meta_.file_checksum, meta_.file_checksum_func_name);
- }
- #ifndef ROCKSDB_LITE
- // Piggyback FlushJobInfo on the first first flushed memtable.
- mems_[0]->SetFlushJobInfo(GetFlushJobInfo());
- #endif // !ROCKSDB_LITE
- // Note that here we treat flush as level 0 compaction in internal stats
- InternalStats::CompactionStats stats(CompactionReason::kFlush, 1);
- stats.micros = db_options_.env->NowMicros() - start_micros;
- stats.cpu_micros = db_options_.env->NowCPUNanos() / 1000 - start_cpu_micros;
- stats.bytes_written = meta_.fd.GetFileSize();
- RecordTimeToHistogram(stats_, FLUSH_TIME, stats.micros);
- cfd_->internal_stats()->AddCompactionStats(0 /* level */, thread_pri_, stats);
- cfd_->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED,
- meta_.fd.GetFileSize());
- RecordFlushIOStats();
- return s;
- }
- #ifndef ROCKSDB_LITE
- std::unique_ptr<FlushJobInfo> FlushJob::GetFlushJobInfo() const {
- db_mutex_->AssertHeld();
- std::unique_ptr<FlushJobInfo> info(new FlushJobInfo{});
- info->cf_id = cfd_->GetID();
- info->cf_name = cfd_->GetName();
- const uint64_t file_number = meta_.fd.GetNumber();
- info->file_path =
- MakeTableFileName(cfd_->ioptions()->cf_paths[0].path, file_number);
- info->file_number = file_number;
- info->oldest_blob_file_number = meta_.oldest_blob_file_number;
- info->thread_id = db_options_.env->GetThreadID();
- info->job_id = job_context_->job_id;
- info->smallest_seqno = meta_.fd.smallest_seqno;
- info->largest_seqno = meta_.fd.largest_seqno;
- info->table_properties = table_properties_;
- info->flush_reason = cfd_->GetFlushReason();
- return info;
- }
- #endif // !ROCKSDB_LITE
- } // namespace ROCKSDB_NAMESPACE
|