// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. // This source code is licensed under both the GPLv2 (found in the // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). #pragma once #include #include #include "db/db_impl/db_impl.h" #include "logging/logging.h" namespace ROCKSDB_NAMESPACE { // A wrapper class to hold log reader, log reporter, log status. class LogReaderContainer { public: LogReaderContainer() : reader_(nullptr), reporter_(nullptr), status_(nullptr) {} LogReaderContainer(Env* env, std::shared_ptr info_log, std::string fname, std::unique_ptr&& file_reader, uint64_t log_number) { LogReporter* reporter = new LogReporter(); status_ = new Status(); reporter->env = env; reporter->info_log = info_log.get(); reporter->fname = std::move(fname); reporter->status = status_; reporter_ = reporter; // We intentially make log::Reader do checksumming even if // paranoid_checks==false so that corruptions cause entire commits // to be skipped instead of propagating bad information (like overly // large sequence numbers). reader_ = new log::FragmentBufferedReader(info_log, std::move(file_reader), reporter, true /*checksum*/, log_number); } log::FragmentBufferedReader* reader_; log::Reader::Reporter* reporter_; Status* status_; ~LogReaderContainer() { delete reader_; delete reporter_; delete status_; } private: struct LogReporter : public log::Reader::Reporter { Env* env; Logger* info_log; std::string fname; Status* status; // nullptr if immutable_db_options_.paranoid_checks==false void Corruption(size_t bytes, const Status& s, uint64_t /*log_number*/ = kMaxSequenceNumber) override { ROCKS_LOG_WARN(info_log, "%s%s: dropping %d bytes; %s", (this->status == nullptr ? "(ignoring error) " : ""), fname.c_str(), static_cast(bytes), s.ToString().c_str()); if (this->status != nullptr && this->status->ok()) { *this->status = s; } } }; }; // The secondary instance shares access to the storage as the primary. // The secondary is able to read and replay changes described in both the // MANIFEST and the WAL files without coordination with the primary. // The secondary instance can be opened using `DB::OpenAsSecondary`. After // that, it can call `DBImplSecondary::TryCatchUpWithPrimary` to make best // effort attempts to catch up with the primary. // TODO: Share common structure with CompactedDBImpl and DBImplReadOnly class DBImplSecondary : public DBImpl { public: DBImplSecondary(const DBOptions& options, const std::string& dbname, std::string secondary_path); ~DBImplSecondary() override; // Recover by replaying MANIFEST and WAL. Also initialize manifest_reader_ // and log_readers_ to facilitate future operations. Status Recover(const std::vector& column_families, bool read_only, bool error_if_wal_file_exists, bool error_if_data_exists_in_wals, bool is_retry = false, uint64_t* = nullptr, RecoveryContext* recovery_ctx = nullptr, bool* can_retry = nullptr) override; // Can return IOError due to files being deleted by the primary. To avoid // IOError in this case, application can coordinate between primary and // secondaries so that primary will not delete files that are currently being // used by the secondaries. The application can also provide a custom FS/Env // implementation so that files will remain present until all primary and // secondaries indicate that they can be deleted. As a partial hacky // workaround, the secondaries can be opened with `max_open_files=-1` so that // it eagerly keeps all talbe files open and is able to access the contents of // deleted files via prior open fd. using DBImpl::GetImpl; Status GetImpl(const ReadOptions& options, const Slice& key, GetImplOptions& get_impl_options) override; using DBImpl::NewIterator; // Operations on the created iterators can return IOError due to files being // deleted by the primary. To avoid IOError in this case, application can // coordinate between primary and secondaries so that primary will not delete // files that are currently being used by the secondaries. The application can // also provide a custom FS/Env implementation so that files will remain // present until all primary and secondaries indicate that they can be // deleted. As a partial hacky workaround, the secondaries can be opened with // `max_open_files=-1` so that it eagerly keeps all talbe files open and is // able to access the contents of deleted files via prior open fd. Iterator* NewIterator(const ReadOptions& _read_options, ColumnFamilyHandle* column_family) override; ArenaWrappedDBIter* NewIteratorImpl(const ReadOptions& read_options, ColumnFamilyHandleImpl* cfh, SuperVersion* sv, SequenceNumber snapshot, ReadCallback* read_callback, bool expose_blob_index = false, bool allow_refresh = true); Status NewIterators(const ReadOptions& _read_options, const std::vector& column_families, std::vector* iterators) override; using DBImpl::Put; Status Put(const WriteOptions& /*options*/, ColumnFamilyHandle* /*column_family*/, const Slice& /*key*/, const Slice& /*value*/) override { return Status::NotSupported("Not supported operation in secondary mode."); } using DBImpl::PutEntity; Status PutEntity(const WriteOptions& /* options */, ColumnFamilyHandle* /* column_family */, const Slice& /* key */, const WideColumns& /* columns */) override { return Status::NotSupported("Not supported operation in secondary mode."); } Status PutEntity(const WriteOptions& /* options */, const Slice& /* key */, const AttributeGroups& /* attribute_groups */) override { return Status::NotSupported("Not supported operation in secondary mode."); } using DBImpl::Merge; Status Merge(const WriteOptions& /*options*/, ColumnFamilyHandle* /*column_family*/, const Slice& /*key*/, const Slice& /*value*/) override { return Status::NotSupported("Not supported operation in secondary mode."); } using DBImpl::Delete; Status Delete(const WriteOptions& /*options*/, ColumnFamilyHandle* /*column_family*/, const Slice& /*key*/) override { return Status::NotSupported("Not supported operation in secondary mode."); } using DBImpl::SingleDelete; Status SingleDelete(const WriteOptions& /*options*/, ColumnFamilyHandle* /*column_family*/, const Slice& /*key*/) override { return Status::NotSupported("Not supported operation in secondary mode."); } Status Write(const WriteOptions& /*options*/, WriteBatch* /*updates*/) override { return Status::NotSupported("Not supported operation in secondary mode."); } using DBImpl::CompactRange; Status CompactRange(const CompactRangeOptions& /*options*/, ColumnFamilyHandle* /*column_family*/, const Slice* /*begin*/, const Slice* /*end*/) override { return Status::NotSupported("Not supported operation in secondary mode."); } using DBImpl::CompactFiles; Status CompactFiles( const CompactionOptions& /*compact_options*/, ColumnFamilyHandle* /*column_family*/, const std::vector& /*input_file_names*/, const int /*output_level*/, const int /*output_path_id*/ = -1, std::vector* const /*output_file_names*/ = nullptr, CompactionJobInfo* /*compaction_job_info*/ = nullptr) override { return Status::NotSupported("Not supported operation in secondary mode."); } Status DisableFileDeletions() override { return Status::NotSupported("Not supported operation in secondary mode."); } Status EnableFileDeletions() override { return Status::NotSupported("Not supported operation in secondary mode."); } Status GetLiveFiles(std::vector&, uint64_t* /*manifest_file_size*/, bool /*flush_memtable*/ = true) override { return Status::NotSupported("Not supported operation in secondary mode."); } using DBImpl::Flush; Status Flush(const FlushOptions& /*options*/, ColumnFamilyHandle* /*column_family*/) override { return Status::NotSupported("Not supported operation in secondary mode."); } using DBImpl::SetDBOptions; Status SetDBOptions(const std::unordered_map& /*options_map*/) override { // Currently not supported because changing certain options may cause // flush/compaction. return Status::NotSupported("Not supported operation in secondary mode."); } using DBImpl::SetOptions; Status SetOptions( ColumnFamilyHandle* /*cfd*/, const std::unordered_map& /*options_map*/) override { // Currently not supported because changing certain options may cause // flush/compaction and/or write to MANIFEST. return Status::NotSupported("Not supported operation in secondary mode."); } using DBImpl::SyncWAL; Status SyncWAL() override { return Status::NotSupported("Not supported operation in secondary mode."); } using DB::IngestExternalFile; Status IngestExternalFile( ColumnFamilyHandle* /*column_family*/, const std::vector& /*external_files*/, const IngestExternalFileOptions& /*ingestion_options*/) override { return Status::NotSupported("Not supported operation in secondary mode."); } // Try to catch up with the primary by reading as much as possible from the // log files until there is nothing more to read or encounters an error. If // the amount of information in the log files to process is huge, this // method can take long time due to all the I/O and CPU costs. Status TryCatchUpWithPrimary() override; // Try to find log reader using log_number from log_readers_ map, initialize // if it doesn't exist Status MaybeInitLogReader(uint64_t log_number, log::FragmentBufferedReader** log_reader); #ifndef NDEBUG Status TEST_CompactWithoutInstallation(const OpenAndCompactOptions& options, ColumnFamilyHandle* cfh, const CompactionServiceInput& input, CompactionServiceResult* result) { return CompactWithoutInstallation(options, cfh, input, result); } #endif // NDEBUG protected: Status FlushForGetLiveFiles() override { // No-op for read-only DB return Status::OK(); } bool OwnTablesAndLogs() const override { // Currently, the secondary instance does not own the database files. It // simply opens the files of the primary instance and tracks their file // descriptors until they become obsolete. In the future, the secondary may // create links to database files. OwnTablesAndLogs will return true then. return false; } std::unique_ptr manifest_reader_; std::unique_ptr manifest_reporter_; std::unique_ptr manifest_reader_status_; private: friend class DB; // No copying allowed DBImplSecondary(const DBImplSecondary&); void operator=(const DBImplSecondary&); using DBImpl::Recover; Status FindAndRecoverLogFiles( std::unordered_set* cfds_changed, JobContext* job_context); Status FindNewLogNumbers(std::vector* logs); // After manifest recovery, replay WALs and refresh log_readers_ if necessary // REQUIRES: log_numbers are sorted in ascending order Status RecoverLogFiles(const std::vector& log_numbers, SequenceNumber* next_sequence, std::unordered_set* cfds_changed, JobContext* job_context); // Run compaction without installation, the output files will be placed in the // secondary DB path. The LSM tree won't be changed, the secondary DB is still // in read-only mode. Status CompactWithoutInstallation(const OpenAndCompactOptions& options, ColumnFamilyHandle* cfh, const CompactionServiceInput& input, CompactionServiceResult* result); private: // Holds results of compaction progress files and output files from a single // directory scan struct CompactionProgressFilesScan { // The latest (newest) progress file filename std::optional latest_progress_filename; uint64_t latest_progress_timestamp = 0; // Older progress file filenames (to be deleted) autovector old_progress_filenames; // Temporary progress file filenames (to be deleted) autovector temp_progress_filenames; // All output file numbers - for cleanup optimization std::vector table_file_numbers; bool HasLatestProgressFile() const { return latest_progress_filename.has_value(); } void Clear() { latest_progress_filename.reset(); latest_progress_timestamp = 0; old_progress_filenames.clear(); temp_progress_filenames.clear(); table_file_numbers.clear(); } }; Status InitializeCompactionWorkspace( bool allow_resumption, std::unique_ptr* output_dir, std::unique_ptr* compaction_progress_writer); Status PrepareCompactionProgressState(); Status ScanCompactionProgressFiles(CompactionProgressFilesScan* scan_result); Status DeleteCompactionProgressFiles( const std::vector& filenames); Status CleanupOldAndTemporaryCompactionProgressFiles( bool preserve_latest, const CompactionProgressFilesScan& scan_result); Status LoadCompactionProgressAndCleanupExtraOutputFiles( const std::string& compaction_progress_file_path, const CompactionProgressFilesScan& scan_result); Status ParseCompactionProgressFile( const std::string& compaction_progress_file_path, CompactionProgress* compaction_progress); Status HandleInvalidOrNoCompactionProgress( const std::optional& compaction_progress_file_path, const CompactionProgressFilesScan& scan_result); Status CleanupPhysicalCompactionOutputFiles( bool preserve_tracked_files, const CompactionProgressFilesScan& scan_result); Status FinalizeCompactionProgressWriter( std::unique_ptr* compaction_progress_writer); Status CreateCompactionProgressWriter( const std::string& file_path, std::unique_ptr* compaction_progress_writer); Status PersistInitialCompactionProgress( log::Writer* compaction_progress_writer, const CompactionProgress& compaction_progress); Status RenameCompactionProgressFile(const std::string& temp_file_path, std::string* final_file_path); Status HandleCompactionProgressWriterCreationFailure( const std::string& temp_file_path, const std::string& final_file_path, std::unique_ptr* compaction_progress_writer); uint64_t CalculateResumedCompactionBytes( const CompactionProgress& compaction_progress) const; // Cache log readers for each log number, used for continue WAL replay // after recovery std::map> log_readers_; // Current WAL number replayed for each column family. std::unordered_map cfd_to_current_log_; const std::string secondary_path_; CompactionProgress compaction_progress_; }; } // namespace ROCKSDB_NAMESPACE