| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343 |
- // Copyright (c) 2024-present, Facebook, Inc. All rights reserved.
- // This source code is licensed under both the GPLv2 (found in the
- // COPYING file in the root directory) and Apache 2.0 License
- // (found in the LICENSE.Apache file in the root directory).
- #include "db/db_impl/db_impl_follower.h"
- #include <algorithm>
- #include <cinttypes>
- #include "db/arena_wrapped_db_iter.h"
- #include "db/merge_context.h"
- #include "env/composite_env_wrapper.h"
- #include "env/fs_on_demand.h"
- #include "logging/auto_roll_logger.h"
- #include "logging/logging.h"
- #include "monitoring/perf_context_imp.h"
- #include "rocksdb/configurable.h"
- #include "rocksdb/db.h"
- #include "util/cast_util.h"
- #include "util/write_batch_util.h"
- namespace ROCKSDB_NAMESPACE {
- DBImplFollower::DBImplFollower(const DBOptions& db_options,
- std::unique_ptr<Env>&& env,
- const std::string& dbname, std::string src_path)
- : DBImplSecondary(db_options, dbname, ""),
- env_guard_(std::move(env)),
- stop_requested_(false),
- src_path_(std::move(src_path)),
- cv_(&mu_) {
- ROCKS_LOG_INFO(immutable_db_options_.info_log,
- "Opening the db in follower mode");
- LogFlush(immutable_db_options_.info_log);
- }
- DBImplFollower::~DBImplFollower() {
- Status s = Close();
- if (!s.ok()) {
- ROCKS_LOG_INFO(immutable_db_options_.info_log, "Error closing DB : %s",
- s.ToString().c_str());
- }
- }
- // Recover a follower DB instance by reading the MANIFEST. The verification
- // as part of the MANIFEST replay will ensure that local links to the
- // leader's files are created, thus ensuring we can continue reading them
- // even if the leader deletes those files due to compaction.
- // TODO:
- // 1. Devise a mechanism to prevent misconfiguration by, for example,
- // keeping a local copy of the IDENTITY file and cross checking
- // 2. Make the recovery more robust by retrying if the first attempt
- // fails.
- Status DBImplFollower::Recover(
- const std::vector<ColumnFamilyDescriptor>& column_families,
- bool /*readonly*/, bool /*error_if_wal_file_exists*/,
- bool /*error_if_data_exists_in_wals*/, bool /*is_retry*/, uint64_t*,
- RecoveryContext* /*recovery_ctx*/, bool* /*can_retry*/) {
- mutex_.AssertHeld();
- JobContext job_context(0);
- Status s;
- s = static_cast<ReactiveVersionSet*>(versions_.get())
- ->Recover(column_families, &manifest_reader_, &manifest_reporter_,
- &manifest_reader_status_);
- if (!s.ok()) {
- if (manifest_reader_status_) {
- manifest_reader_status_->PermitUncheckedError();
- }
- return s;
- }
- if (s.ok()) {
- default_cf_handle_ = new ColumnFamilyHandleImpl(
- versions_->GetColumnFamilySet()->GetDefault(), this, &mutex_);
- default_cf_internal_stats_ = default_cf_handle_->cfd()->internal_stats();
- // Start the periodic catch-up thread
- // TODO: See if it makes sense to have a threadpool, rather than a thread
- // per follower DB instance
- catch_up_thread_.reset(
- new port::Thread(&DBImplFollower::PeriodicRefresh, this));
- }
- return s;
- }
- // Try to catch up by tailing the MANIFEST.
- // TODO:
- // 1. Cleanup obsolete files afterward
- // 2. Add some error notifications and statistics
- Status DBImplFollower::TryCatchUpWithLeader() {
- assert(versions_.get() != nullptr);
- assert(manifest_reader_.get() != nullptr);
- Status s;
- TEST_SYNC_POINT("DBImplFollower::TryCatchupWithLeader:Begin1");
- TEST_SYNC_POINT("DBImplFollower::TryCatchupWithLeader:Begin2");
- // read the manifest and apply new changes to the follower instance
- std::unordered_set<ColumnFamilyData*> cfds_changed;
- JobContext job_context(0, true /*create_superversion*/);
- {
- InstrumentedMutexLock lock_guard(&mutex_);
- std::vector<std::string> files_to_delete;
- s = static_cast_with_check<ReactiveVersionSet>(versions_.get())
- ->ReadAndApply(&mutex_, &manifest_reader_,
- manifest_reader_status_.get(), &cfds_changed,
- &files_to_delete);
- ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem_);
- pending_outputs_inserted_elem_.reset(new std::list<uint64_t>::iterator(
- CaptureCurrentFileNumberInPendingOutputs()));
- ROCKS_LOG_INFO(immutable_db_options_.info_log, "Last sequence is %" PRIu64,
- static_cast<uint64_t>(versions_->LastSequence()));
- ROCKS_LOG_INFO(
- immutable_db_options_.info_log, "Next file number is %" PRIu64,
- static_cast<uint64_t>(versions_->current_next_file_number()));
- for (ColumnFamilyData* cfd : cfds_changed) {
- if (cfd->IsDropped()) {
- ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] is dropped\n",
- cfd->GetName().c_str());
- continue;
- }
- VersionStorageInfo::LevelSummaryStorage tmp;
- ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
- "[%s] Level summary: %s\n", cfd->GetName().c_str(),
- cfd->current()->storage_info()->LevelSummary(&tmp));
- }
- if (s.ok()) {
- for (auto cfd : cfds_changed) {
- if (cfd->mem()->GetEarliestSequenceNumber() <
- versions_->LastSequence()) {
- // Construct a new memtable with earliest sequence number set to the
- // last sequence number in the VersionSet. This matters when
- // DBImpl::MultiCFSnapshot tries to get consistent references
- // to super versions in a lock free manner, it checks the earliest
- // sequence number to detect if there was a change in version in
- // the meantime.
- MemTable* new_mem = cfd->ConstructNewMemtable(
- cfd->GetLatestMutableCFOptions(), versions_->LastSequence());
- cfd->mem()->SetNextLogNumber(cfd->GetLogNumber());
- cfd->mem()->ConstructFragmentedRangeTombstones();
- cfd->imm()->Add(cfd->mem(), &job_context.memtables_to_free);
- new_mem->Ref();
- cfd->SetMemtable(new_mem);
- }
- // This will check if the old memtable is still referenced
- cfd->imm()->RemoveOldMemTables(cfd->GetLogNumber(),
- &job_context.memtables_to_free);
- auto& sv_context = job_context.superversion_contexts.back();
- cfd->InstallSuperVersion(&sv_context, &mutex_);
- sv_context.NewSuperVersion();
- }
- }
- for (auto& file : files_to_delete) {
- IOStatus io_s = fs_->DeleteFile(file, IOOptions(), nullptr);
- if (!io_s.ok()) {
- ROCKS_LOG_INFO(immutable_db_options_.info_log,
- "Cannot delete file %s: %s", file.c_str(),
- io_s.ToString().c_str());
- }
- }
- }
- job_context.Clean();
- // Cleanup unused, obsolete files.
- JobContext purge_files_job_context(0);
- {
- InstrumentedMutexLock lock_guard(&mutex_);
- // Currently, follower instance does not create any database files, thus
- // is unnecessary for the follower to force full scan.
- FindObsoleteFiles(&purge_files_job_context, /*force=*/false);
- }
- if (purge_files_job_context.HaveSomethingToDelete()) {
- PurgeObsoleteFiles(purge_files_job_context);
- }
- purge_files_job_context.Clean();
- TEST_SYNC_POINT("DBImplFollower::TryCatchupWithLeader:End");
- return s;
- }
- void DBImplFollower::PeriodicRefresh() {
- while (!stop_requested_.load()) {
- MutexLock l(&mu_);
- int64_t wait_until =
- immutable_db_options_.clock->NowMicros() +
- immutable_db_options_.follower_refresh_catchup_period_ms * 1000;
- immutable_db_options_.clock->TimedWait(
- &cv_, std::chrono::microseconds(wait_until));
- if (stop_requested_.load()) {
- break;
- }
- Status s;
- for (uint64_t i = 0;
- i < immutable_db_options_.follower_catchup_retry_count &&
- !stop_requested_.load();
- ++i) {
- s = TryCatchUpWithLeader();
- if (s.ok()) {
- ROCKS_LOG_INFO(immutable_db_options_.info_log,
- "Successful catch up on attempt %llu",
- static_cast<unsigned long long>(i));
- break;
- }
- wait_until = immutable_db_options_.clock->NowMicros() +
- immutable_db_options_.follower_catchup_retry_wait_ms * 1000;
- immutable_db_options_.clock->TimedWait(
- &cv_, std::chrono::microseconds(wait_until));
- }
- if (!s.ok()) {
- ROCKS_LOG_INFO(immutable_db_options_.info_log, "Catch up unsuccessful");
- }
- }
- }
- Status DBImplFollower::Close() {
- if (catch_up_thread_) {
- stop_requested_.store(true);
- {
- MutexLock l(&mu_);
- cv_.SignalAll();
- }
- catch_up_thread_->join();
- catch_up_thread_.reset();
- }
- ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem_);
- return DBImpl::Close();
- }
- Status DB::OpenAsFollower(const Options& options, const std::string& dbname,
- const std::string& leader_path,
- std::unique_ptr<DB>* dbptr) {
- dbptr->reset();
- DBOptions db_options(options);
- ColumnFamilyOptions cf_options(options);
- std::vector<ColumnFamilyDescriptor> column_families;
- column_families.emplace_back(kDefaultColumnFamilyName, cf_options);
- std::vector<ColumnFamilyHandle*> handles;
- Status s = DB::OpenAsFollower(db_options, dbname, leader_path,
- column_families, &handles, dbptr);
- if (s.ok()) {
- assert(handles.size() == 1);
- delete handles[0];
- }
- return s;
- }
- Status DB::OpenAsFollower(
- const DBOptions& db_options, const std::string& dbname,
- const std::string& src_path,
- const std::vector<ColumnFamilyDescriptor>& column_families,
- std::vector<ColumnFamilyHandle*>* handles, std::unique_ptr<DB>* dbptr) {
- dbptr->reset();
- FileSystem* fs = db_options.env->GetFileSystem().get();
- {
- IOStatus io_s;
- if (db_options.create_if_missing) {
- io_s = fs->CreateDirIfMissing(dbname, IOOptions(), nullptr);
- } else {
- io_s = fs->FileExists(dbname, IOOptions(), nullptr);
- }
- if (!io_s.ok()) {
- return static_cast<Status>(io_s);
- }
- }
- std::unique_ptr<Env> new_env(new CompositeEnvWrapper(
- db_options.env, NewOnDemandFileSystem(db_options.env->GetFileSystem(),
- src_path, dbname)));
- DBOptions tmp_opts(db_options);
- Status s;
- tmp_opts.env = new_env.get();
- if (nullptr == tmp_opts.info_log) {
- s = CreateLoggerFromOptions(dbname, tmp_opts, &tmp_opts.info_log);
- if (!s.ok()) {
- tmp_opts.info_log = nullptr;
- return s;
- }
- }
- handles->clear();
- DBImplFollower* impl =
- new DBImplFollower(tmp_opts, std::move(new_env), dbname, src_path);
- impl->versions_.reset(new ReactiveVersionSet(
- dbname, &impl->immutable_db_options_, impl->file_options_,
- impl->table_cache_.get(), impl->write_buffer_manager_,
- &impl->write_controller_, impl->io_tracer_));
- impl->column_family_memtables_.reset(
- new ColumnFamilyMemTablesImpl(impl->versions_->GetColumnFamilySet()));
- impl->wal_in_db_path_ = impl->immutable_db_options_.IsWalDirSameAsDBPath();
- impl->mutex_.Lock();
- s = impl->Recover(column_families, /*read_only=*/true,
- /*error_if_wal_file_exists=*/false,
- /*error_if_data_exists_in_wals=*/false);
- if (s.ok()) {
- for (const auto& cf : column_families) {
- auto cfd =
- impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name);
- if (nullptr == cfd) {
- s = Status::InvalidArgument("Column family not found", cf.name);
- break;
- }
- handles->push_back(new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_));
- }
- }
- SuperVersionContext sv_context(false /* create_superversion */);
- if (s.ok()) {
- for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
- sv_context.NewSuperVersion();
- cfd->InstallSuperVersion(&sv_context, &impl->mutex_);
- }
- }
- impl->mutex_.Unlock();
- sv_context.Clean();
- if (s.ok()) {
- dbptr->reset(impl);
- for (auto h : *handles) {
- impl->NewThreadStatusCfInfo(
- static_cast_with_check<ColumnFamilyHandleImpl>(h)->cfd());
- }
- } else {
- for (auto h : *handles) {
- delete h;
- }
- handles->clear();
- delete impl;
- }
- return s;
- }
- } // namespace ROCKSDB_NAMESPACE
|