| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198 | //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.//  This source code is licensed under both the GPLv2 (found in the//  COPYING file in the root directory) and Apache 2.0 License//  (found in the LICENSE.Apache file in the root directory).//// Copyright (c) 2011 The LevelDB Authors. All rights reserved.// Use of this source code is governed by a BSD-style license that can be// found in the LICENSE file. See the AUTHORS file for names of contributors.#pragma once#include <atomic>#include <deque>#include <functional>#include <limits>#include <set>#include <string>#include <utility>#include <vector>#include "db/column_family.h"#include "db/compaction/compaction_iterator.h"#include "db/dbformat.h"#include "db/flush_scheduler.h"#include "db/internal_stats.h"#include "db/job_context.h"#include "db/log_writer.h"#include "db/memtable_list.h"#include "db/range_del_aggregator.h"#include "db/version_edit.h"#include "db/write_controller.h"#include "db/write_thread.h"#include "logging/event_logger.h"#include "options/cf_options.h"#include "options/db_options.h"#include "port/port.h"#include "rocksdb/compaction_filter.h"#include "rocksdb/compaction_job_stats.h"#include "rocksdb/db.h"#include "rocksdb/env.h"#include "rocksdb/memtablerep.h"#include "rocksdb/transaction_log.h"#include "table/scoped_arena_iterator.h"#include "util/autovector.h"#include "util/stop_watch.h"#include "util/thread_local.h"namespace ROCKSDB_NAMESPACE {class Arena;class ErrorHandler;class MemTable;class SnapshotChecker;class TableCache;class Version;class VersionEdit;class VersionSet;// CompactionJob is responsible for executing the compaction. Each (manual or// automated) compaction corresponds to a CompactionJob object, and usually// goes through the stages of `Prepare()`->`Run()`->`Install()`. CompactionJob// will divide the compaction into subcompactions and execute them in parallel// if needed.class CompactionJob { public:  CompactionJob(int job_id, Compaction* compaction,                const ImmutableDBOptions& db_options,                const FileOptions& file_options, VersionSet* versions,                const std::atomic<bool>* shutting_down,                const SequenceNumber preserve_deletes_seqnum,                LogBuffer* log_buffer, Directory* db_directory,                Directory* output_directory, Statistics* stats,                InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler,                std::vector<SequenceNumber> existing_snapshots,                SequenceNumber earliest_write_conflict_snapshot,                const SnapshotChecker* snapshot_checker,                std::shared_ptr<Cache> table_cache, EventLogger* event_logger,                bool paranoid_file_checks, bool measure_io_stats,                const std::string& dbname,                CompactionJobStats* compaction_job_stats,                Env::Priority thread_pri,                const std::atomic<bool>* manual_compaction_paused = nullptr);  ~CompactionJob();  // no copy/move  CompactionJob(CompactionJob&& job) = delete;  CompactionJob(const CompactionJob& job) = delete;  CompactionJob& operator=(const CompactionJob& job) = delete;  // REQUIRED: mutex held  // Prepare for the compaction by setting up boundaries for each subcompaction  void Prepare();  // REQUIRED mutex not held  // Launch threads for each subcompaction and wait for them to finish. After  // that, verify table is usable and finally do bookkeeping to unify  // subcompaction results  Status Run();  // REQUIRED: mutex held  // Add compaction input/output to the current version  Status Install(const MutableCFOptions& mutable_cf_options); private:  struct SubcompactionState;  void AggregateStatistics();  // Generates a histogram representing potential divisions of key ranges from  // the input. It adds the starting and/or ending keys of certain input files  // to the working set and then finds the approximate size of data in between  // each consecutive pair of slices. Then it divides these ranges into  // consecutive groups such that each group has a similar size.  void GenSubcompactionBoundaries();  // update the thread status for starting a compaction.  void ReportStartedCompaction(Compaction* compaction);  void AllocateCompactionOutputFileNumbers();  // Call compaction filter. Then iterate through input and compact the  // kv-pairs  void ProcessKeyValueCompaction(SubcompactionState* sub_compact);  Status FinishCompactionOutputFile(      const Status& input_status, SubcompactionState* sub_compact,      CompactionRangeDelAggregator* range_del_agg,      CompactionIterationStats* range_del_out_stats,      const Slice* next_table_min_key = nullptr);  Status InstallCompactionResults(const MutableCFOptions& mutable_cf_options);  void RecordCompactionIOStats();  Status OpenCompactionOutputFile(SubcompactionState* sub_compact);  void CleanupCompaction();  void UpdateCompactionJobStats(    const InternalStats::CompactionStats& stats) const;  void RecordDroppedKeys(const CompactionIterationStats& c_iter_stats,                         CompactionJobStats* compaction_job_stats = nullptr);  void UpdateCompactionStats();  void UpdateCompactionInputStatsHelper(      int* num_files, uint64_t* bytes_read, int input_level);  void LogCompaction();  int job_id_;  // CompactionJob state  struct CompactionState;  CompactionState* compact_;  CompactionJobStats* compaction_job_stats_;  InternalStats::CompactionStats compaction_stats_;  // DBImpl state  const std::string& dbname_;  const ImmutableDBOptions& db_options_;  const FileOptions file_options_;  Env* env_;  FileSystem* fs_;  // env_option optimized for compaction table reads  FileOptions file_options_for_read_;  VersionSet* versions_;  const std::atomic<bool>* shutting_down_;  const std::atomic<bool>* manual_compaction_paused_;  const SequenceNumber preserve_deletes_seqnum_;  LogBuffer* log_buffer_;  Directory* db_directory_;  Directory* output_directory_;  Statistics* stats_;  InstrumentedMutex* db_mutex_;  ErrorHandler* db_error_handler_;  // If there were two snapshots with seq numbers s1 and  // s2 and s1 < s2, and if we find two instances of a key k1 then lies  // entirely within s1 and s2, then the earlier version of k1 can be safely  // deleted because that version is not visible in any snapshot.  std::vector<SequenceNumber> existing_snapshots_;  // This is the earliest snapshot that could be used for write-conflict  // checking by a transaction.  For any user-key newer than this snapshot, we  // should make sure not to remove evidence that a write occurred.  SequenceNumber earliest_write_conflict_snapshot_;  const SnapshotChecker* const snapshot_checker_;  std::shared_ptr<Cache> table_cache_;  EventLogger* event_logger_;  // Is this compaction creating a file in the bottom most level?  bool bottommost_level_;  bool paranoid_file_checks_;  bool measure_io_stats_;  // Stores the Slices that designate the boundaries for each subcompaction  std::vector<Slice> boundaries_;  // Stores the approx size of keys covered in the range of each subcompaction  std::vector<uint64_t> sizes_;  Env::WriteLifeTimeHint write_hint_;  Env::Priority thread_pri_;};}  // namespace ROCKSDB_NAMESPACE
 |