| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399 |
- // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
- // This source code is licensed under both the GPLv2 (found in the
- // COPYING file in the root directory) and Apache 2.0 License
- // (found in the LICENSE.Apache file in the root directory).
- #pragma once
- #include <string>
- #include <vector>
- #include "db/db_impl/db_impl.h"
- #include "logging/logging.h"
- namespace ROCKSDB_NAMESPACE {
- // A wrapper class to hold log reader, log reporter, log status.
- class LogReaderContainer {
- public:
- LogReaderContainer()
- : reader_(nullptr), reporter_(nullptr), status_(nullptr) {}
- LogReaderContainer(Env* env, std::shared_ptr<Logger> info_log,
- std::string fname,
- std::unique_ptr<SequentialFileReader>&& file_reader,
- uint64_t log_number) {
- LogReporter* reporter = new LogReporter();
- status_ = new Status();
- reporter->env = env;
- reporter->info_log = info_log.get();
- reporter->fname = std::move(fname);
- reporter->status = status_;
- reporter_ = reporter;
- // We intentially make log::Reader do checksumming even if
- // paranoid_checks==false so that corruptions cause entire commits
- // to be skipped instead of propagating bad information (like overly
- // large sequence numbers).
- reader_ = new log::FragmentBufferedReader(info_log, std::move(file_reader),
- reporter, true /*checksum*/,
- log_number);
- }
- log::FragmentBufferedReader* reader_;
- log::Reader::Reporter* reporter_;
- Status* status_;
- ~LogReaderContainer() {
- delete reader_;
- delete reporter_;
- delete status_;
- }
- private:
- struct LogReporter : public log::Reader::Reporter {
- Env* env;
- Logger* info_log;
- std::string fname;
- Status* status; // nullptr if immutable_db_options_.paranoid_checks==false
- void Corruption(size_t bytes, const Status& s,
- uint64_t /*log_number*/ = kMaxSequenceNumber) override {
- ROCKS_LOG_WARN(info_log, "%s%s: dropping %d bytes; %s",
- (this->status == nullptr ? "(ignoring error) " : ""),
- fname.c_str(), static_cast<int>(bytes),
- s.ToString().c_str());
- if (this->status != nullptr && this->status->ok()) {
- *this->status = s;
- }
- }
- };
- };
- // The secondary instance shares access to the storage as the primary.
- // The secondary is able to read and replay changes described in both the
- // MANIFEST and the WAL files without coordination with the primary.
- // The secondary instance can be opened using `DB::OpenAsSecondary`. After
- // that, it can call `DBImplSecondary::TryCatchUpWithPrimary` to make best
- // effort attempts to catch up with the primary.
- // TODO: Share common structure with CompactedDBImpl and DBImplReadOnly
- class DBImplSecondary : public DBImpl {
- public:
- DBImplSecondary(const DBOptions& options, const std::string& dbname,
- std::string secondary_path);
- ~DBImplSecondary() override;
- // Recover by replaying MANIFEST and WAL. Also initialize manifest_reader_
- // and log_readers_ to facilitate future operations.
- Status Recover(const std::vector<ColumnFamilyDescriptor>& column_families,
- bool read_only, bool error_if_wal_file_exists,
- bool error_if_data_exists_in_wals, bool is_retry = false,
- uint64_t* = nullptr, RecoveryContext* recovery_ctx = nullptr,
- bool* can_retry = nullptr) override;
- // Can return IOError due to files being deleted by the primary. To avoid
- // IOError in this case, application can coordinate between primary and
- // secondaries so that primary will not delete files that are currently being
- // used by the secondaries. The application can also provide a custom FS/Env
- // implementation so that files will remain present until all primary and
- // secondaries indicate that they can be deleted. As a partial hacky
- // workaround, the secondaries can be opened with `max_open_files=-1` so that
- // it eagerly keeps all talbe files open and is able to access the contents of
- // deleted files via prior open fd.
- using DBImpl::GetImpl;
- Status GetImpl(const ReadOptions& options, const Slice& key,
- GetImplOptions& get_impl_options) override;
- using DBImpl::NewIterator;
- // Operations on the created iterators can return IOError due to files being
- // deleted by the primary. To avoid IOError in this case, application can
- // coordinate between primary and secondaries so that primary will not delete
- // files that are currently being used by the secondaries. The application can
- // also provide a custom FS/Env implementation so that files will remain
- // present until all primary and secondaries indicate that they can be
- // deleted. As a partial hacky workaround, the secondaries can be opened with
- // `max_open_files=-1` so that it eagerly keeps all talbe files open and is
- // able to access the contents of deleted files via prior open fd.
- Iterator* NewIterator(const ReadOptions& _read_options,
- ColumnFamilyHandle* column_family) override;
- ArenaWrappedDBIter* NewIteratorImpl(const ReadOptions& read_options,
- ColumnFamilyHandleImpl* cfh,
- SuperVersion* sv, SequenceNumber snapshot,
- ReadCallback* read_callback,
- bool expose_blob_index = false,
- bool allow_refresh = true);
- Status NewIterators(const ReadOptions& _read_options,
- const std::vector<ColumnFamilyHandle*>& column_families,
- std::vector<Iterator*>* iterators) override;
- using DBImpl::Put;
- Status Put(const WriteOptions& /*options*/,
- ColumnFamilyHandle* /*column_family*/, const Slice& /*key*/,
- const Slice& /*value*/) override {
- return Status::NotSupported("Not supported operation in secondary mode.");
- }
- using DBImpl::PutEntity;
- Status PutEntity(const WriteOptions& /* options */,
- ColumnFamilyHandle* /* column_family */,
- const Slice& /* key */,
- const WideColumns& /* columns */) override {
- return Status::NotSupported("Not supported operation in secondary mode.");
- }
- Status PutEntity(const WriteOptions& /* options */, const Slice& /* key */,
- const AttributeGroups& /* attribute_groups */) override {
- return Status::NotSupported("Not supported operation in secondary mode.");
- }
- using DBImpl::Merge;
- Status Merge(const WriteOptions& /*options*/,
- ColumnFamilyHandle* /*column_family*/, const Slice& /*key*/,
- const Slice& /*value*/) override {
- return Status::NotSupported("Not supported operation in secondary mode.");
- }
- using DBImpl::Delete;
- Status Delete(const WriteOptions& /*options*/,
- ColumnFamilyHandle* /*column_family*/,
- const Slice& /*key*/) override {
- return Status::NotSupported("Not supported operation in secondary mode.");
- }
- using DBImpl::SingleDelete;
- Status SingleDelete(const WriteOptions& /*options*/,
- ColumnFamilyHandle* /*column_family*/,
- const Slice& /*key*/) override {
- return Status::NotSupported("Not supported operation in secondary mode.");
- }
- Status Write(const WriteOptions& /*options*/,
- WriteBatch* /*updates*/) override {
- return Status::NotSupported("Not supported operation in secondary mode.");
- }
- using DBImpl::CompactRange;
- Status CompactRange(const CompactRangeOptions& /*options*/,
- ColumnFamilyHandle* /*column_family*/,
- const Slice* /*begin*/, const Slice* /*end*/) override {
- return Status::NotSupported("Not supported operation in secondary mode.");
- }
- using DBImpl::CompactFiles;
- Status CompactFiles(
- const CompactionOptions& /*compact_options*/,
- ColumnFamilyHandle* /*column_family*/,
- const std::vector<std::string>& /*input_file_names*/,
- const int /*output_level*/, const int /*output_path_id*/ = -1,
- std::vector<std::string>* const /*output_file_names*/ = nullptr,
- CompactionJobInfo* /*compaction_job_info*/ = nullptr) override {
- return Status::NotSupported("Not supported operation in secondary mode.");
- }
- Status DisableFileDeletions() override {
- return Status::NotSupported("Not supported operation in secondary mode.");
- }
- Status EnableFileDeletions() override {
- return Status::NotSupported("Not supported operation in secondary mode.");
- }
- Status GetLiveFiles(std::vector<std::string>&,
- uint64_t* /*manifest_file_size*/,
- bool /*flush_memtable*/ = true) override {
- return Status::NotSupported("Not supported operation in secondary mode.");
- }
- using DBImpl::Flush;
- Status Flush(const FlushOptions& /*options*/,
- ColumnFamilyHandle* /*column_family*/) override {
- return Status::NotSupported("Not supported operation in secondary mode.");
- }
- using DBImpl::SetDBOptions;
- Status SetDBOptions(const std::unordered_map<std::string, std::string>&
- /*options_map*/) override {
- // Currently not supported because changing certain options may cause
- // flush/compaction.
- return Status::NotSupported("Not supported operation in secondary mode.");
- }
- using DBImpl::SetOptions;
- Status SetOptions(
- ColumnFamilyHandle* /*cfd*/,
- const std::unordered_map<std::string, std::string>& /*options_map*/)
- override {
- // Currently not supported because changing certain options may cause
- // flush/compaction and/or write to MANIFEST.
- return Status::NotSupported("Not supported operation in secondary mode.");
- }
- using DBImpl::SyncWAL;
- Status SyncWAL() override {
- return Status::NotSupported("Not supported operation in secondary mode.");
- }
- using DB::IngestExternalFile;
- Status IngestExternalFile(
- ColumnFamilyHandle* /*column_family*/,
- const std::vector<std::string>& /*external_files*/,
- const IngestExternalFileOptions& /*ingestion_options*/) override {
- return Status::NotSupported("Not supported operation in secondary mode.");
- }
- // Try to catch up with the primary by reading as much as possible from the
- // log files until there is nothing more to read or encounters an error. If
- // the amount of information in the log files to process is huge, this
- // method can take long time due to all the I/O and CPU costs.
- Status TryCatchUpWithPrimary() override;
- // Try to find log reader using log_number from log_readers_ map, initialize
- // if it doesn't exist
- Status MaybeInitLogReader(uint64_t log_number,
- log::FragmentBufferedReader** log_reader);
- #ifndef NDEBUG
- Status TEST_CompactWithoutInstallation(const OpenAndCompactOptions& options,
- ColumnFamilyHandle* cfh,
- const CompactionServiceInput& input,
- CompactionServiceResult* result) {
- return CompactWithoutInstallation(options, cfh, input, result);
- }
- #endif // NDEBUG
- protected:
- Status FlushForGetLiveFiles() override {
- // No-op for read-only DB
- return Status::OK();
- }
- bool OwnTablesAndLogs() const override {
- // Currently, the secondary instance does not own the database files. It
- // simply opens the files of the primary instance and tracks their file
- // descriptors until they become obsolete. In the future, the secondary may
- // create links to database files. OwnTablesAndLogs will return true then.
- return false;
- }
- std::unique_ptr<log::FragmentBufferedReader> manifest_reader_;
- std::unique_ptr<log::Reader::Reporter> manifest_reporter_;
- std::unique_ptr<Status> manifest_reader_status_;
- private:
- friend class DB;
- // No copying allowed
- DBImplSecondary(const DBImplSecondary&);
- void operator=(const DBImplSecondary&);
- using DBImpl::Recover;
- Status FindAndRecoverLogFiles(
- std::unordered_set<ColumnFamilyData*>* cfds_changed,
- JobContext* job_context);
- Status FindNewLogNumbers(std::vector<uint64_t>* logs);
- // After manifest recovery, replay WALs and refresh log_readers_ if necessary
- // REQUIRES: log_numbers are sorted in ascending order
- Status RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
- SequenceNumber* next_sequence,
- std::unordered_set<ColumnFamilyData*>* cfds_changed,
- JobContext* job_context);
- // Run compaction without installation, the output files will be placed in the
- // secondary DB path. The LSM tree won't be changed, the secondary DB is still
- // in read-only mode.
- Status CompactWithoutInstallation(const OpenAndCompactOptions& options,
- ColumnFamilyHandle* cfh,
- const CompactionServiceInput& input,
- CompactionServiceResult* result);
- private:
- // Holds results of compaction progress files and output files from a single
- // directory scan
- struct CompactionProgressFilesScan {
- // The latest (newest) progress file filename
- std::optional<std::string> latest_progress_filename;
- uint64_t latest_progress_timestamp = 0;
- // Older progress file filenames (to be deleted)
- autovector<std::string> old_progress_filenames;
- // Temporary progress file filenames (to be deleted)
- autovector<std::string> temp_progress_filenames;
- // All output file numbers - for cleanup optimization
- std::vector<uint64_t> table_file_numbers;
- bool HasLatestProgressFile() const {
- return latest_progress_filename.has_value();
- }
- void Clear() {
- latest_progress_filename.reset();
- latest_progress_timestamp = 0;
- old_progress_filenames.clear();
- temp_progress_filenames.clear();
- table_file_numbers.clear();
- }
- };
- Status InitializeCompactionWorkspace(
- bool allow_resumption, std::unique_ptr<FSDirectory>* output_dir,
- std::unique_ptr<log::Writer>* compaction_progress_writer);
- Status PrepareCompactionProgressState();
- Status ScanCompactionProgressFiles(CompactionProgressFilesScan* scan_result);
- Status DeleteCompactionProgressFiles(
- const std::vector<std::string>& filenames);
- Status CleanupOldAndTemporaryCompactionProgressFiles(
- bool preserve_latest, const CompactionProgressFilesScan& scan_result);
- Status LoadCompactionProgressAndCleanupExtraOutputFiles(
- const std::string& compaction_progress_file_path,
- const CompactionProgressFilesScan& scan_result);
- Status ParseCompactionProgressFile(
- const std::string& compaction_progress_file_path,
- CompactionProgress* compaction_progress);
- Status HandleInvalidOrNoCompactionProgress(
- const std::optional<std::string>& compaction_progress_file_path,
- const CompactionProgressFilesScan& scan_result);
- Status CleanupPhysicalCompactionOutputFiles(
- bool preserve_tracked_files,
- const CompactionProgressFilesScan& scan_result);
- Status FinalizeCompactionProgressWriter(
- std::unique_ptr<log::Writer>* compaction_progress_writer);
- Status CreateCompactionProgressWriter(
- const std::string& file_path,
- std::unique_ptr<log::Writer>* compaction_progress_writer);
- Status PersistInitialCompactionProgress(
- log::Writer* compaction_progress_writer,
- const CompactionProgress& compaction_progress);
- Status RenameCompactionProgressFile(const std::string& temp_file_path,
- std::string* final_file_path);
- Status HandleCompactionProgressWriterCreationFailure(
- const std::string& temp_file_path, const std::string& final_file_path,
- std::unique_ptr<log::Writer>* compaction_progress_writer);
- uint64_t CalculateResumedCompactionBytes(
- const CompactionProgress& compaction_progress) const;
- // Cache log readers for each log number, used for continue WAL replay
- // after recovery
- std::map<uint64_t, std::unique_ptr<LogReaderContainer>> log_readers_;
- // Current WAL number replayed for each column family.
- std::unordered_map<ColumnFamilyData*, uint64_t> cfd_to_current_log_;
- const std::string secondary_path_;
- CompactionProgress compaction_progress_;
- };
- } // namespace ROCKSDB_NAMESPACE
|