| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178 |
- // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
- // This source code is licensed under both the GPLv2 (found in the
- // COPYING file in the root directory) and Apache 2.0 License
- // (found in the LICENSE.Apache file in the root directory).
- //
- // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
- // Use of this source code is governed by a BSD-style license that can be
- // found in the LICENSE file. See the AUTHORS file for names of contributors.
- #include "db/version_edit_handler.h"
- #include <cinttypes>
- #include <sstream>
- #include "db/blob/blob_file_reader.h"
- #include "db/blob/blob_source.h"
- #include "db/version_edit.h"
- #include "logging/logging.h"
- #include "monitoring/persistent_stats_history.h"
- #include "util/udt_util.h"
- namespace ROCKSDB_NAMESPACE {
- void VersionEditHandlerBase::Iterate(log::Reader& reader,
- Status* log_read_status) {
- Slice record;
- std::string scratch;
- assert(log_read_status);
- assert(log_read_status->ok());
- [[maybe_unused]] size_t recovered_edits = 0;
- Status s = Initialize();
- while (reader.LastRecordEnd() < max_manifest_read_size_ && s.ok() &&
- reader.ReadRecord(&record, &scratch) && log_read_status->ok()) {
- VersionEdit edit;
- s = edit.DecodeFrom(record);
- if (s.ok()) {
- s = read_buffer_.AddEdit(&edit);
- }
- if (s.ok()) {
- ColumnFamilyData* cfd = nullptr;
- if (edit.IsInAtomicGroup()) {
- if (read_buffer_.IsFull()) {
- s = OnAtomicGroupReplayBegin();
- for (size_t i = 0; s.ok() && i < read_buffer_.replay_buffer().size();
- i++) {
- auto& e = read_buffer_.replay_buffer()[i];
- s = ApplyVersionEdit(e, &cfd);
- if (s.ok()) {
- recovered_edits++;
- }
- }
- if (s.ok()) {
- read_buffer_.Clear();
- s = OnAtomicGroupReplayEnd();
- }
- }
- } else {
- s = ApplyVersionEdit(edit, &cfd);
- if (s.ok()) {
- recovered_edits++;
- }
- }
- }
- }
- if (s.ok() && !log_read_status->ok()) {
- s = *log_read_status;
- }
- CheckIterationResult(reader, &s);
- if (!s.ok()) {
- if (s.IsCorruption()) {
- // when we find a Corruption error, something is
- // wrong with the underlying file. in this case we
- // want to report the filename, so in here we append
- // the filename to the Corruption message
- assert(reader.file());
- // build a new error message
- std::stringstream message;
- // append previous dynamic state message
- const char* state = s.getState();
- if (state != nullptr) {
- message << state;
- message << ' ';
- }
- // append the filename to the corruption message
- message << " The file " << reader.file()->file_name()
- << " may be corrupted.";
- // overwrite the status with the extended status
- s = Status(s.code(), s.subcode(), s.severity(), message.str());
- }
- status_ = s;
- }
- TEST_SYNC_POINT_CALLBACK("VersionEditHandlerBase::Iterate:Finish",
- &recovered_edits);
- }
- Status ListColumnFamiliesHandler::ApplyVersionEdit(
- VersionEdit& edit, ColumnFamilyData** /*unused*/) {
- Status s;
- uint32_t cf_id = edit.GetColumnFamily();
- if (edit.IsColumnFamilyAdd()) {
- if (column_family_names_.find(cf_id) != column_family_names_.end()) {
- s = Status::Corruption("Manifest adding the same column family twice");
- } else {
- column_family_names_.insert({cf_id, edit.GetColumnFamilyName()});
- }
- } else if (edit.IsColumnFamilyDrop()) {
- if (column_family_names_.find(cf_id) == column_family_names_.end()) {
- s = Status::Corruption("Manifest - dropping non-existing column family");
- } else {
- column_family_names_.erase(cf_id);
- }
- }
- return s;
- }
- Status FileChecksumRetriever::FetchFileChecksumList(
- FileChecksumList& file_checksum_list) {
- Status s = Status::OK();
- for (const auto& [cf, file_checksums] : cf_file_checksums_) {
- [[maybe_unused]] const auto& _ = cf;
- for (const auto& [file_number, info] : file_checksums) {
- if (!(s = file_checksum_list.InsertOneFileChecksum(
- file_number, info.first, info.second))
- .ok()) {
- break;
- }
- }
- }
- return s;
- }
- Status FileChecksumRetriever::ApplyVersionEdit(VersionEdit& edit,
- ColumnFamilyData** /*unused*/) {
- uint32_t column_family_id = edit.GetColumnFamily();
- if (edit.IsColumnFamilyDrop()) {
- cf_file_checksums_.erase(column_family_id);
- }
- for (const auto& deleted_file : edit.GetDeletedFiles()) {
- if (cf_file_checksums_.find(column_family_id) == cf_file_checksums_.end()) {
- return Status::NotFound();
- }
- if (cf_file_checksums_[column_family_id].find(deleted_file.second) ==
- cf_file_checksums_[column_family_id].end()) {
- return Status::NotFound();
- }
- cf_file_checksums_[column_family_id].erase(deleted_file.second);
- }
- for (const auto& new_file : edit.GetNewFiles()) {
- cf_file_checksums_[column_family_id].emplace(
- new_file.second.fd.GetNumber(),
- std::make_pair(new_file.second.file_checksum,
- new_file.second.file_checksum_func_name));
- }
- for (const auto& new_blob_file : edit.GetBlobFileAdditions()) {
- std::string checksum_value = new_blob_file.GetChecksumValue();
- std::string checksum_method = new_blob_file.GetChecksumMethod();
- assert(checksum_value.empty() == checksum_method.empty());
- if (checksum_method.empty()) {
- checksum_value = kUnknownFileChecksum;
- checksum_method = kUnknownFileChecksumFuncName;
- }
- cf_file_checksums_[column_family_id].emplace(
- new_blob_file.GetBlobFileNumber(),
- std::make_pair(checksum_value, checksum_method));
- }
- return Status::OK();
- }
- VersionEditHandler::VersionEditHandler(
- bool read_only, std::vector<ColumnFamilyDescriptor> column_families,
- VersionSet* version_set, bool track_found_and_missing_files,
- bool no_error_if_files_missing, const std::shared_ptr<IOTracer>& io_tracer,
- const ReadOptions& read_options, bool skip_load_table_files,
- bool allow_incomplete_valid_version,
- EpochNumberRequirement epoch_number_requirement)
- : VersionEditHandlerBase(read_options),
- read_only_(read_only),
- column_families_(std::move(column_families)),
- version_set_(version_set),
- track_found_and_missing_files_(track_found_and_missing_files),
- no_error_if_files_missing_(no_error_if_files_missing),
- io_tracer_(io_tracer),
- skip_load_table_files_(skip_load_table_files),
- initialized_(false),
- allow_incomplete_valid_version_(allow_incomplete_valid_version),
- epoch_number_requirement_(epoch_number_requirement) {
- assert(version_set_ != nullptr);
- }
- Status VersionEditHandler::Initialize() {
- Status s;
- if (!initialized_) {
- for (const auto& cf_desc : column_families_) {
- name_to_options_.emplace(cf_desc.name, cf_desc.options);
- }
- auto default_cf_iter = name_to_options_.find(kDefaultColumnFamilyName);
- if (default_cf_iter == name_to_options_.end()) {
- s = Status::InvalidArgument("Default column family not specified");
- }
- if (s.ok()) {
- VersionEdit default_cf_edit;
- default_cf_edit.AddColumnFamily(kDefaultColumnFamilyName);
- default_cf_edit.SetColumnFamily(0);
- ColumnFamilyData* cfd =
- CreateCfAndInit(default_cf_iter->second, default_cf_edit);
- assert(cfd != nullptr);
- #ifdef NDEBUG
- (void)cfd;
- #endif
- initialized_ = true;
- }
- }
- return s;
- }
- Status VersionEditHandler::ApplyVersionEdit(VersionEdit& edit,
- ColumnFamilyData** cfd) {
- Status s;
- if (edit.IsColumnFamilyAdd()) {
- s = OnColumnFamilyAdd(edit, cfd);
- } else if (edit.IsColumnFamilyDrop()) {
- s = OnColumnFamilyDrop(edit, cfd);
- } else if (edit.IsWalAddition()) {
- s = OnWalAddition(edit);
- } else if (edit.IsWalDeletion()) {
- s = OnWalDeletion(edit);
- } else {
- s = OnNonCfOperation(edit, cfd);
- }
- if (s.ok()) {
- assert(cfd != nullptr);
- s = ExtractInfoFromVersionEdit(*cfd, edit);
- }
- return s;
- }
- Status VersionEditHandler::OnColumnFamilyAdd(VersionEdit& edit,
- ColumnFamilyData** cfd) {
- bool do_not_open_cf = false;
- bool cf_in_builders = false;
- CheckColumnFamilyId(edit, &do_not_open_cf, &cf_in_builders);
- assert(cfd != nullptr);
- *cfd = nullptr;
- const std::string& cf_name = edit.GetColumnFamilyName();
- Status s;
- if (cf_in_builders || do_not_open_cf) {
- s = Status::Corruption("MANIFEST adding the same column family twice: " +
- cf_name);
- }
- if (s.ok()) {
- auto cf_options = name_to_options_.find(cf_name);
- // implicitly add persistent_stats column family without requiring user
- // to specify
- ColumnFamilyData* tmp_cfd = nullptr;
- bool is_persistent_stats_column_family =
- cf_name.compare(kPersistentStatsColumnFamilyName) == 0;
- if (cf_options == name_to_options_.end() &&
- !is_persistent_stats_column_family) {
- do_not_open_column_families_.emplace(edit.GetColumnFamily(), cf_name);
- } else {
- if (is_persistent_stats_column_family) {
- ColumnFamilyOptions cfo;
- OptimizeForPersistentStats(&cfo);
- tmp_cfd = CreateCfAndInit(cfo, edit);
- } else {
- tmp_cfd = CreateCfAndInit(cf_options->second, edit);
- }
- *cfd = tmp_cfd;
- }
- }
- return s;
- }
- Status VersionEditHandler::OnColumnFamilyDrop(VersionEdit& edit,
- ColumnFamilyData** cfd) {
- bool do_not_open_cf = false;
- bool cf_in_builders = false;
- CheckColumnFamilyId(edit, &do_not_open_cf, &cf_in_builders);
- assert(cfd != nullptr);
- *cfd = nullptr;
- ColumnFamilyData* tmp_cfd = nullptr;
- Status s;
- if (cf_in_builders) {
- tmp_cfd = DestroyCfAndCleanup(edit);
- } else if (do_not_open_cf) {
- do_not_open_column_families_.erase(edit.GetColumnFamily());
- } else {
- s = Status::Corruption("MANIFEST - dropping non-existing column family");
- }
- *cfd = tmp_cfd;
- return s;
- }
- Status VersionEditHandler::OnWalAddition(VersionEdit& edit) {
- assert(edit.IsWalAddition());
- return version_set_->wals_.AddWals(edit.GetWalAdditions());
- }
- Status VersionEditHandler::OnWalDeletion(VersionEdit& edit) {
- assert(edit.IsWalDeletion());
- return version_set_->wals_.DeleteWalsBefore(
- edit.GetWalDeletion().GetLogNumber());
- }
- Status VersionEditHandler::OnNonCfOperation(VersionEdit& edit,
- ColumnFamilyData** cfd) {
- bool do_not_open_cf = false;
- bool cf_in_builders = false;
- CheckColumnFamilyId(edit, &do_not_open_cf, &cf_in_builders);
- assert(cfd != nullptr);
- *cfd = nullptr;
- Status s;
- if (!do_not_open_cf) {
- if (!cf_in_builders) {
- s = Status::Corruption(
- "MANIFEST record referencing unknown column family");
- }
- ColumnFamilyData* tmp_cfd = nullptr;
- if (s.ok()) {
- tmp_cfd = version_set_->GetColumnFamilySet()->GetColumnFamily(
- edit.GetColumnFamily());
- assert(tmp_cfd != nullptr);
- // It's important to handle file boundaries before `MaybeCreateVersion`
- // because `VersionEditHandlerPointInTime::MaybeCreateVersion` does
- // `FileMetaData` verification that involves the file boundaries.
- // All `VersionEditHandlerBase` subclasses that need to deal with
- // `FileMetaData` for new files are also subclasses of
- // `VersionEditHandler`, so it's sufficient to do the file boundaries
- // handling in this method.
- s = MaybeHandleFileBoundariesForNewFiles(edit, tmp_cfd);
- if (!s.ok()) {
- return s;
- }
- s = MaybeCreateVersionBeforeApplyEdit(edit, tmp_cfd,
- /*force_create_version=*/false);
- }
- *cfd = tmp_cfd;
- }
- return s;
- }
- void VersionEditHandler::CheckColumnFamilyId(const VersionEdit& edit,
- bool* do_not_open_cf,
- bool* cf_in_builders) const {
- assert(do_not_open_cf != nullptr);
- assert(cf_in_builders != nullptr);
- // Not found means that user didn't supply that column
- // family option AND we encountered column family add
- // record. Once we encounter column family drop record,
- // we will delete the column family from
- // do_not_open_column_families_.
- uint32_t cf_id = edit.GetColumnFamily();
- bool in_do_not_open = do_not_open_column_families_.find(cf_id) !=
- do_not_open_column_families_.end();
- // in builders means that user supplied that column family
- // option AND that we encountered column family add record
- bool in_builders = builders_.find(cf_id) != builders_.end();
- // They cannot both be true
- assert(!(in_do_not_open && in_builders));
- *do_not_open_cf = in_do_not_open;
- *cf_in_builders = in_builders;
- }
- void VersionEditHandler::CheckIterationResult(const log::Reader& reader,
- Status* s) {
- assert(s != nullptr);
- if (!s->ok()) {
- // Do nothing here.
- } else if (!version_edit_params_.HasLogNumber() ||
- !version_edit_params_.HasNextFile() ||
- !version_edit_params_.HasLastSequence()) {
- std::string msg("no ");
- if (!version_edit_params_.HasLogNumber()) {
- msg.append("log_file_number, ");
- }
- if (!version_edit_params_.HasNextFile()) {
- msg.append("next_file_number, ");
- }
- if (!version_edit_params_.HasLastSequence()) {
- msg.append("last_sequence, ");
- }
- msg = msg.substr(0, msg.size() - 2);
- msg.append(" entry in MANIFEST");
- *s = Status::Corruption(msg);
- }
- // There were some column families in the MANIFEST that weren't specified
- // in the argument. This is OK in read_only mode
- if (s->ok() && MustOpenAllColumnFamilies() &&
- !do_not_open_column_families_.empty()) {
- std::string msg;
- for (const auto& cf : do_not_open_column_families_) {
- msg.append(", ");
- msg.append(cf.second);
- }
- msg = msg.substr(2);
- *s = Status::InvalidArgument("Column families not opened: " + msg);
- }
- if (s->ok()) {
- version_set_->GetColumnFamilySet()->UpdateMaxColumnFamily(
- version_edit_params_.GetMaxColumnFamily());
- version_set_->MarkMinLogNumberToKeep(
- version_edit_params_.GetMinLogNumberToKeep());
- version_set_->MarkFileNumberUsed(version_edit_params_.GetPrevLogNumber());
- version_set_->MarkFileNumberUsed(version_edit_params_.GetLogNumber());
- for (auto* cfd : *(version_set_->GetColumnFamilySet())) {
- if (cfd->IsDropped()) {
- continue;
- }
- auto builder_iter = builders_.find(cfd->GetID());
- assert(builder_iter != builders_.end());
- auto* builder = builder_iter->second->version_builder();
- if (!builder->CheckConsistencyForNumLevels()) {
- *s = Status::InvalidArgument(
- "db has more levels than options.num_levels");
- break;
- }
- }
- }
- if (s->ok()) {
- for (auto* cfd : *(version_set_->GetColumnFamilySet())) {
- if (cfd->IsDropped()) {
- continue;
- }
- if (version_set_->unchanging()) {
- cfd->table_cache()->SetTablesAreImmortal();
- }
- *s = LoadTables(cfd, /*prefetch_index_and_filter_in_cache=*/false,
- /*is_initial_load=*/true);
- if (!s->ok()) {
- // If s is IOError::PathNotFound, then we mark the db as corrupted.
- if (s->IsPathNotFound()) {
- *s = Status::Corruption("Corruption: " + s->ToString());
- }
- break;
- }
- }
- }
- if (s->ok()) {
- for (auto* cfd : *(version_set_->column_family_set_)) {
- if (cfd->IsDropped()) {
- continue;
- }
- assert(cfd->initialized());
- VersionEdit edit;
- *s = MaybeCreateVersionBeforeApplyEdit(edit, cfd,
- /*force_create_version=*/true);
- if (!s->ok()) {
- break;
- }
- }
- }
- if (s->ok()) {
- version_set_->manifest_file_size_ = reader.GetReadOffset();
- assert(version_set_->manifest_file_size_ > 0);
- version_set_->next_file_number_.store(version_edit_params_.GetNextFile() +
- 1);
- SequenceNumber last_seq = version_edit_params_.GetLastSequence();
- assert(last_seq != kMaxSequenceNumber);
- if (last_seq != kMaxSequenceNumber &&
- last_seq > version_set_->last_allocated_sequence_.load()) {
- version_set_->last_allocated_sequence_.store(last_seq);
- }
- if (last_seq != kMaxSequenceNumber &&
- last_seq > version_set_->last_published_sequence_.load()) {
- version_set_->last_published_sequence_.store(last_seq);
- }
- if (last_seq != kMaxSequenceNumber &&
- last_seq > version_set_->last_sequence_.load()) {
- version_set_->last_sequence_.store(last_seq);
- }
- if (last_seq != kMaxSequenceNumber &&
- last_seq > version_set_->descriptor_last_sequence_) {
- // This is the maximum last sequence of all `VersionEdit`s iterated. It
- // may be greater than the maximum `largest_seqno` of all files in case
- // the newest data referred to by the MANIFEST has been dropped or had its
- // sequence number zeroed through compaction.
- version_set_->descriptor_last_sequence_ = last_seq;
- }
- version_set_->prev_log_number_ = version_edit_params_.GetPrevLogNumber();
- }
- }
- ColumnFamilyData* VersionEditHandler::CreateCfAndInit(
- const ColumnFamilyOptions& cf_options, const VersionEdit& edit) {
- uint32_t cf_id = edit.GetColumnFamily();
- ColumnFamilyData* cfd = version_set_->CreateColumnFamily(
- cf_options, read_options_, &edit, read_only_);
- assert(cfd != nullptr);
- cfd->set_initialized();
- assert(builders_.find(cf_id) == builders_.end());
- builders_.emplace(cf_id, VersionBuilderUPtr(new BaseReferencedVersionBuilder(
- cfd, this, track_found_and_missing_files_,
- allow_incomplete_valid_version_)));
- return cfd;
- }
- ColumnFamilyData* VersionEditHandler::DestroyCfAndCleanup(
- const VersionEdit& edit) {
- uint32_t cf_id = edit.GetColumnFamily();
- auto builder_iter = builders_.find(cf_id);
- assert(builder_iter != builders_.end());
- builders_.erase(builder_iter);
- ColumnFamilyData* ret =
- version_set_->GetColumnFamilySet()->GetColumnFamily(cf_id);
- assert(ret != nullptr);
- ret->SetDropped();
- ret->UnrefAndTryDelete();
- ret = nullptr;
- return ret;
- }
- Status VersionEditHandler::MaybeCreateVersionBeforeApplyEdit(
- const VersionEdit& edit, ColumnFamilyData* cfd, bool force_create_version) {
- assert(cfd->initialized());
- Status s;
- auto builder_iter = builders_.find(cfd->GetID());
- assert(builder_iter != builders_.end());
- auto* builder = builder_iter->second->version_builder();
- if (force_create_version) {
- auto* v = new Version(cfd, version_set_, version_set_->file_options_,
- cfd->GetLatestMutableCFOptions(), io_tracer_,
- version_set_->current_version_number_++,
- epoch_number_requirement_);
- s = builder->SaveTo(v->storage_info());
- if (s.ok()) {
- // Install new version
- v->PrepareAppend(
- read_options_,
- !(version_set_->db_options_->skip_stats_update_on_db_open));
- version_set_->AppendVersion(cfd, v);
- } else {
- delete v;
- }
- }
- s = builder->Apply(&edit);
- return s;
- }
- Status VersionEditHandler::LoadTables(ColumnFamilyData* cfd,
- bool prefetch_index_and_filter_in_cache,
- bool is_initial_load) {
- bool skip_load_table_files = skip_load_table_files_;
- TEST_SYNC_POINT_CALLBACK(
- "VersionEditHandler::LoadTables:skip_load_table_files",
- &skip_load_table_files);
- if (skip_load_table_files) {
- return Status::OK();
- }
- assert(cfd != nullptr);
- assert(!cfd->IsDropped());
- auto builder_iter = builders_.find(cfd->GetID());
- assert(builder_iter != builders_.end());
- assert(builder_iter->second != nullptr);
- VersionBuilder* builder = builder_iter->second->version_builder();
- assert(builder);
- const auto& moptions = cfd->GetLatestMutableCFOptions();
- Status s = builder->LoadTableHandlers(
- cfd->internal_stats(),
- version_set_->db_options_->max_file_opening_threads,
- prefetch_index_and_filter_in_cache, is_initial_load, moptions,
- MaxFileSizeForL0MetaPin(moptions), read_options_);
- if ((s.IsPathNotFound() || s.IsCorruption()) && no_error_if_files_missing_) {
- s = Status::OK();
- }
- if (!s.ok() && !version_set_->db_options_->paranoid_checks) {
- s = Status::OK();
- }
- return s;
- }
- Status VersionEditHandler::ExtractInfoFromVersionEdit(ColumnFamilyData* cfd,
- const VersionEdit& edit) {
- Status s;
- if (edit.HasDbId()) {
- version_set_->db_id_ = edit.GetDbId();
- version_edit_params_.SetDBId(edit.GetDbId());
- }
- if (cfd != nullptr) {
- if (edit.HasLogNumber()) {
- if (cfd->GetLogNumber() > edit.GetLogNumber()) {
- ROCKS_LOG_WARN(
- version_set_->db_options()->info_log,
- "MANIFEST corruption detected, but ignored - Log numbers in "
- "records NOT monotonically increasing");
- } else {
- cfd->SetLogNumber(edit.GetLogNumber());
- version_edit_params_.SetLogNumber(edit.GetLogNumber());
- }
- }
- if (edit.HasComparatorName()) {
- bool mark_sst_files_has_no_udt = false;
- // If `persist_user_defined_timestamps` flag is recorded in manifest, it
- // is guaranteed to be in the same VersionEdit as comparator. Otherwise,
- // it's not recorded and it should have default value true.
- s = ValidateUserDefinedTimestampsOptions(
- cfd->user_comparator(), edit.GetComparatorName(),
- cfd->ioptions().persist_user_defined_timestamps,
- edit.GetPersistUserDefinedTimestamps(), &mark_sst_files_has_no_udt);
- if (!s.ok() && cf_to_cmp_names_) {
- cf_to_cmp_names_->emplace(cfd->GetID(), edit.GetComparatorName());
- }
- if (mark_sst_files_has_no_udt) {
- cfds_to_mark_no_udt_.insert(cfd->GetID());
- }
- }
- if (edit.HasFullHistoryTsLow()) {
- const std::string& new_ts = edit.GetFullHistoryTsLow();
- cfd->SetFullHistoryTsLow(new_ts);
- }
- }
- if (s.ok()) {
- if (edit.HasPrevLogNumber()) {
- version_edit_params_.SetPrevLogNumber(edit.GetPrevLogNumber());
- }
- if (edit.HasNextFile()) {
- version_edit_params_.SetNextFile(edit.GetNextFile());
- }
- if (edit.HasMaxColumnFamily()) {
- version_edit_params_.SetMaxColumnFamily(edit.GetMaxColumnFamily());
- }
- if (edit.HasMinLogNumberToKeep()) {
- version_edit_params_.SetMinLogNumberToKeep(
- std::max(version_edit_params_.GetMinLogNumberToKeep(),
- edit.GetMinLogNumberToKeep()));
- }
- if (edit.HasLastSequence()) {
- // `VersionEdit::last_sequence_`s are assumed to be non-decreasing. This
- // is legacy behavior that cannot change without breaking downgrade
- // compatibility.
- assert(!version_edit_params_.HasLastSequence() ||
- version_edit_params_.GetLastSequence() <= edit.GetLastSequence());
- version_edit_params_.SetLastSequence(edit.GetLastSequence());
- }
- if (!version_edit_params_.HasPrevLogNumber()) {
- version_edit_params_.SetPrevLogNumber(0);
- }
- }
- return s;
- }
- Status VersionEditHandler::MaybeHandleFileBoundariesForNewFiles(
- VersionEdit& edit, const ColumnFamilyData* cfd) {
- if (edit.GetNewFiles().empty()) {
- return Status::OK();
- }
- auto ucmp = cfd->user_comparator();
- assert(ucmp);
- size_t ts_sz = ucmp->timestamp_size();
- if (ts_sz == 0) {
- return Status::OK();
- }
- VersionEdit::NewFiles& new_files = edit.GetMutableNewFiles();
- assert(!new_files.empty());
- // If true, enabling user-defined timestamp is detected for this column
- // family. All its existing SST files need to have the file boundaries handled
- // and their `persist_user_defined_timestamps` flag set to false regardless of
- // its existing value.
- bool mark_existing_ssts_with_no_udt =
- cfds_to_mark_no_udt_.find(cfd->GetID()) != cfds_to_mark_no_udt_.end();
- bool file_boundaries_need_handling = false;
- for (auto& new_file : new_files) {
- FileMetaData& meta = new_file.second;
- if (meta.user_defined_timestamps_persisted &&
- !mark_existing_ssts_with_no_udt) {
- // `FileMetaData.user_defined_timestamps_persisted` field is the value of
- // the flag `AdvancedColumnFamilyOptions.persist_user_defined_timestamps`
- // at the time when the SST file was created. As a result, all added SST
- // files in one `VersionEdit` should have the same value for it.
- if (file_boundaries_need_handling) {
- return Status::Corruption(
- "New files in one VersionEdit has different "
- "user_defined_timestamps_persisted value.");
- }
- break;
- }
- file_boundaries_need_handling = true;
- assert(!meta.user_defined_timestamps_persisted ||
- mark_existing_ssts_with_no_udt);
- if (mark_existing_ssts_with_no_udt) {
- meta.user_defined_timestamps_persisted = false;
- }
- std::string smallest_buf;
- std::string largest_buf;
- Slice largest_slice = meta.largest.Encode();
- PadInternalKeyWithMinTimestamp(&smallest_buf, meta.smallest.Encode(),
- ts_sz);
- auto largest_footer = ExtractInternalKeyFooter(largest_slice);
- if (largest_footer == kRangeTombstoneSentinel) {
- PadInternalKeyWithMaxTimestamp(&largest_buf, largest_slice, ts_sz);
- } else {
- PadInternalKeyWithMinTimestamp(&largest_buf, largest_slice, ts_sz);
- }
- meta.smallest.DecodeFrom(smallest_buf);
- meta.largest.DecodeFrom(largest_buf);
- }
- return Status::OK();
- }
- VersionEditHandlerPointInTime::VersionEditHandlerPointInTime(
- bool read_only, std::vector<ColumnFamilyDescriptor> column_families,
- VersionSet* version_set, const std::shared_ptr<IOTracer>& io_tracer,
- const ReadOptions& read_options, bool allow_incomplete_valid_version,
- EpochNumberRequirement epoch_number_requirement)
- : VersionEditHandler(read_only, column_families, version_set,
- /*track_found_and_missing_files=*/true,
- /*no_error_if_files_missing=*/true, io_tracer,
- read_options, allow_incomplete_valid_version,
- epoch_number_requirement) {}
- VersionEditHandlerPointInTime::~VersionEditHandlerPointInTime() {
- for (const auto& cfid_and_version : atomic_update_versions_) {
- delete cfid_and_version.second;
- }
- for (const auto& elem : versions_) {
- delete elem.second;
- }
- versions_.clear();
- }
- Status VersionEditHandlerPointInTime::OnAtomicGroupReplayBegin() {
- if (in_atomic_group_) {
- return Status::Corruption("unexpected AtomicGroup start");
- }
- // The AtomicGroup that is about to begin may block column families in a valid
- // state from saving any more updates. So we should save any valid states
- // before proceeding.
- for (const auto& cfid_and_builder : builders_) {
- ColumnFamilyData* cfd = version_set_->GetColumnFamilySet()->GetColumnFamily(
- cfid_and_builder.first);
- assert(!cfd->IsDropped());
- assert(cfd->initialized());
- VersionEdit edit;
- Status s = MaybeCreateVersionBeforeApplyEdit(
- edit, cfd, true /* force_create_version */);
- if (!s.ok()) {
- return s;
- }
- }
- // An old AtomicGroup is incomplete. Throw away the versions that failed to
- // complete it. They must not be used for completing the upcoming
- // AtomicGroup since they are too old.
- for (auto& cfid_and_version : atomic_update_versions_) {
- delete cfid_and_version.second;
- }
- in_atomic_group_ = true;
- // We lazily assume the column families that exist at this point are all
- // involved in the AtomicGroup. Overestimating the scope of the AtomicGroup
- // will sometimes cause less data to be recovered, which is fine for
- // best-effort recovery.
- atomic_update_versions_.clear();
- for (const auto& cfid_and_builder : builders_) {
- atomic_update_versions_[cfid_and_builder.first] = nullptr;
- }
- atomic_update_versions_missing_ = atomic_update_versions_.size();
- return Status::OK();
- }
- Status VersionEditHandlerPointInTime::OnAtomicGroupReplayEnd() {
- if (!in_atomic_group_) {
- return Status::Corruption("unexpected AtomicGroup end");
- }
- in_atomic_group_ = false;
- // The AtomicGroup must not have changed the column families. We don't support
- // CF adds or drops in an AtomicGroup.
- if (builders_.size() != atomic_update_versions_.size()) {
- return Status::Corruption("unexpected CF change in AtomicGroup");
- }
- for (const auto& cfid_and_builder : builders_) {
- if (atomic_update_versions_.find(cfid_and_builder.first) ==
- atomic_update_versions_.end()) {
- return Status::Corruption("unexpected CF add in AtomicGroup");
- }
- }
- for (const auto& cfid_and_version : atomic_update_versions_) {
- if (builders_.find(cfid_and_version.first) == builders_.end()) {
- return Status::Corruption("unexpected CF drop in AtomicGroup");
- }
- }
- return Status::OK();
- }
- void VersionEditHandlerPointInTime::CheckIterationResult(
- const log::Reader& reader, Status* s) {
- VersionEditHandler::CheckIterationResult(reader, s);
- assert(s != nullptr);
- if (s->ok()) {
- for (auto* cfd : *(version_set_->column_family_set_)) {
- if (cfd->IsDropped()) {
- continue;
- }
- assert(cfd->initialized());
- auto v_iter = versions_.find(cfd->GetID());
- auto builder_iter = builders_.find(cfd->GetID());
- if (v_iter != versions_.end()) {
- assert(v_iter->second != nullptr);
- assert(builder_iter != builders_.end());
- version_set_->AppendVersion(cfd, v_iter->second);
- versions_.erase(v_iter);
- // Let's clear found_files, since any files in that are part of the
- // installed Version. Any files that got obsoleted would have already
- // been moved to intermediate_files_
- builder_iter->second->version_builder()->ClearFoundFiles();
- }
- }
- } else {
- for (const auto& elem : versions_) {
- delete elem.second;
- }
- versions_.clear();
- }
- }
- ColumnFamilyData* VersionEditHandlerPointInTime::DestroyCfAndCleanup(
- const VersionEdit& edit) {
- ColumnFamilyData* cfd = VersionEditHandler::DestroyCfAndCleanup(edit);
- uint32_t cfid = edit.GetColumnFamily();
- if (AtomicUpdateVersionsContains(cfid)) {
- AtomicUpdateVersionsDropCf(cfid);
- if (AtomicUpdateVersionsCompleted()) {
- AtomicUpdateVersionsApply();
- }
- }
- auto v_iter = versions_.find(cfid);
- if (v_iter != versions_.end()) {
- delete v_iter->second;
- versions_.erase(v_iter);
- }
- return cfd;
- }
- Status VersionEditHandlerPointInTime::MaybeCreateVersionBeforeApplyEdit(
- const VersionEdit& edit, ColumnFamilyData* cfd, bool force_create_version) {
- TEST_SYNC_POINT(
- "VersionEditHandlerPointInTime::MaybeCreateVersionBeforeApplyEdit:"
- "Begin1");
- TEST_SYNC_POINT(
- "VersionEditHandlerPointInTime::MaybeCreateVersionBeforeApplyEdit:"
- "Begin2");
- assert(cfd != nullptr);
- if (!force_create_version) {
- assert(edit.GetColumnFamily() == cfd->GetID());
- }
- bool missing_info = !version_edit_params_.HasLogNumber() ||
- !version_edit_params_.HasNextFile() ||
- !version_edit_params_.HasLastSequence();
- Status s;
- auto builder_iter = builders_.find(cfd->GetID());
- assert(builder_iter != builders_.end());
- VersionBuilder* builder = builder_iter->second->version_builder();
- const bool valid_pit_before_edit = builder->ValidVersionAvailable();
- builder->CreateOrReplaceSavePoint();
- s = builder->Apply(&edit);
- const bool valid_pit_after_edit = builder->ValidVersionAvailable();
- // A new version will be created if:
- // 1) no error has occurred so far, and
- // 2) log_number_, next_file_number_ and last_sequence_ are known, and
- // 3) not in an AtomicGroup
- // 4) any of the following:
- // a) a valid Version is available before applying the edit
- // and a valid Version is not available after the edit.
- // b) a valid Version is available after the edit and the
- // caller explicitly request that a new version be created.
- if (s.ok() && !missing_info && !in_atomic_group_ &&
- ((!valid_pit_after_edit && valid_pit_before_edit) ||
- (valid_pit_after_edit && force_create_version))) {
- const auto& mopts = cfd->GetLatestMutableCFOptions();
- auto* version = new Version(
- cfd, version_set_, version_set_->file_options_, mopts, io_tracer_,
- version_set_->current_version_number_++, epoch_number_requirement_);
- s = builder->LoadSavePointTableHandlers(
- cfd->internal_stats(),
- version_set_->db_options_->max_file_opening_threads, false, true, mopts,
- MaxFileSizeForL0MetaPin(mopts), read_options_);
- if (!s.ok()) {
- delete version;
- if (s.IsCorruption()) {
- s = Status::OK();
- }
- return s;
- }
- s = builder->SaveSavePointTo(version->storage_info());
- if (s.ok()) {
- if (AtomicUpdateVersionsContains(cfd->GetID())) {
- AtomicUpdateVersionsPut(version);
- if (AtomicUpdateVersionsCompleted()) {
- AtomicUpdateVersionsApply();
- }
- } else {
- version->PrepareAppend(
- read_options_,
- !version_set_->db_options_->skip_stats_update_on_db_open);
- auto v_iter = versions_.find(cfd->GetID());
- if (v_iter != versions_.end()) {
- delete v_iter->second;
- v_iter->second = version;
- } else {
- versions_.emplace(cfd->GetID(), version);
- }
- }
- } else {
- delete version;
- }
- }
- builder->ClearSavePoint();
- return s;
- }
- Status VersionEditHandlerPointInTime::VerifyFile(ColumnFamilyData* cfd,
- const std::string& fpath,
- int level,
- const FileMetaData& fmeta) {
- return version_set_->VerifyFileMetadata(read_options_, cfd, fpath, level,
- fmeta);
- }
- Status VersionEditHandlerPointInTime::VerifyBlobFile(
- ColumnFamilyData* cfd, uint64_t blob_file_num,
- const BlobFileAddition& blob_addition) {
- BlobSource* blob_source = cfd->blob_source();
- assert(blob_source);
- CacheHandleGuard<BlobFileReader> blob_file_reader;
- Status s = blob_source->GetBlobFileReader(read_options_, blob_file_num,
- &blob_file_reader);
- if (!s.ok()) {
- return s;
- }
- // TODO: verify checksum
- (void)blob_addition;
- return s;
- }
- Status VersionEditHandlerPointInTime::LoadTables(
- ColumnFamilyData* /*cfd*/, bool /*prefetch_index_and_filter_in_cache*/,
- bool /*is_initial_load*/) {
- return Status::OK();
- }
- bool VersionEditHandlerPointInTime::HasMissingFiles() const {
- for (const auto& builder : builders_) {
- if (builder.second->version_builder()->HasMissingFiles()) {
- return true;
- }
- }
- return false;
- }
- bool VersionEditHandlerPointInTime::AtomicUpdateVersionsCompleted() {
- return atomic_update_versions_missing_ == 0;
- }
- bool VersionEditHandlerPointInTime::AtomicUpdateVersionsContains(
- uint32_t cfid) {
- return atomic_update_versions_.find(cfid) != atomic_update_versions_.end();
- }
- void VersionEditHandlerPointInTime::AtomicUpdateVersionsDropCf(uint32_t cfid) {
- assert(!AtomicUpdateVersionsCompleted());
- auto atomic_update_versions_iter = atomic_update_versions_.find(cfid);
- assert(atomic_update_versions_iter != atomic_update_versions_.end());
- if (atomic_update_versions_iter->second == nullptr) {
- atomic_update_versions_missing_--;
- } else {
- delete atomic_update_versions_iter->second;
- }
- atomic_update_versions_.erase(atomic_update_versions_iter);
- }
- void VersionEditHandlerPointInTime::AtomicUpdateVersionsPut(Version* version) {
- assert(!AtomicUpdateVersionsCompleted());
- auto atomic_update_versions_iter =
- atomic_update_versions_.find(version->cfd()->GetID());
- assert(atomic_update_versions_iter != atomic_update_versions_.end());
- if (atomic_update_versions_iter->second == nullptr) {
- atomic_update_versions_missing_--;
- } else {
- delete atomic_update_versions_iter->second;
- }
- atomic_update_versions_iter->second = version;
- }
- void VersionEditHandlerPointInTime::AtomicUpdateVersionsApply() {
- assert(AtomicUpdateVersionsCompleted());
- for (const auto& cfid_and_version : atomic_update_versions_) {
- uint32_t cfid = cfid_and_version.first;
- Version* version = cfid_and_version.second;
- assert(version != nullptr);
- version->PrepareAppend(
- read_options_,
- !version_set_->db_options_->skip_stats_update_on_db_open);
- auto versions_iter = versions_.find(cfid);
- if (versions_iter != versions_.end()) {
- delete versions_iter->second;
- versions_iter->second = version;
- } else {
- versions_.emplace(cfid, version);
- }
- }
- atomic_update_versions_.clear();
- }
- Status ManifestTailer::Initialize() {
- if (Mode::kRecovery == mode_) {
- return VersionEditHandler::Initialize();
- }
- assert(Mode::kCatchUp == mode_);
- Status s;
- if (!initialized_) {
- ColumnFamilySet* cfd_set = version_set_->GetColumnFamilySet();
- assert(cfd_set);
- ColumnFamilyData* default_cfd = cfd_set->GetDefault();
- assert(default_cfd);
- auto builder_iter = builders_.find(default_cfd->GetID());
- assert(builder_iter != builders_.end());
- Version* dummy_version = default_cfd->dummy_versions();
- assert(dummy_version);
- Version* base_version = dummy_version->Next();
- assert(base_version);
- base_version->Ref();
- VersionBuilderUPtr new_builder(new BaseReferencedVersionBuilder(
- default_cfd, base_version, this, track_found_and_missing_files_,
- allow_incomplete_valid_version_));
- builder_iter->second = std::move(new_builder);
- initialized_ = true;
- }
- return s;
- }
- Status ManifestTailer::ApplyVersionEdit(VersionEdit& edit,
- ColumnFamilyData** cfd) {
- Status s = VersionEditHandler::ApplyVersionEdit(edit, cfd);
- if (s.ok()) {
- assert(cfd);
- if (*cfd) {
- cfds_changed_.insert(*cfd);
- }
- }
- return s;
- }
- Status ManifestTailer::OnColumnFamilyAdd(VersionEdit& edit,
- ColumnFamilyData** cfd) {
- if (Mode::kRecovery == mode_) {
- return VersionEditHandler::OnColumnFamilyAdd(edit, cfd);
- }
- assert(Mode::kCatchUp == mode_);
- ColumnFamilySet* cfd_set = version_set_->GetColumnFamilySet();
- assert(cfd_set);
- ColumnFamilyData* tmp_cfd = cfd_set->GetColumnFamily(edit.GetColumnFamily());
- assert(cfd);
- *cfd = tmp_cfd;
- if (!tmp_cfd) {
- // For now, ignore new column families created after Recover() succeeds.
- return Status::OK();
- }
- auto builder_iter = builders_.find(edit.GetColumnFamily());
- assert(builder_iter != builders_.end());
- Version* dummy_version = tmp_cfd->dummy_versions();
- assert(dummy_version);
- Version* base_version = dummy_version->Next();
- assert(base_version);
- base_version->Ref();
- VersionBuilderUPtr new_builder(new BaseReferencedVersionBuilder(
- tmp_cfd, base_version, this, track_found_and_missing_files_));
- builder_iter->second = std::move(new_builder);
- #ifndef NDEBUG
- auto version_iter = versions_.find(edit.GetColumnFamily());
- assert(version_iter == versions_.end());
- #endif // !NDEBUG
- return Status::OK();
- }
- void ManifestTailer::CheckIterationResult(const log::Reader& reader,
- Status* s) {
- VersionEditHandlerPointInTime::CheckIterationResult(reader, s);
- assert(s);
- if (s->ok()) {
- if (Mode::kRecovery == mode_) {
- mode_ = Mode::kCatchUp;
- } else {
- assert(Mode::kCatchUp == mode_);
- }
- }
- }
- std::vector<std::string> ManifestTailer::GetAndClearIntermediateFiles() {
- std::vector<std::string> res;
- for (const auto& builder : builders_) {
- auto files =
- builder.second->version_builder()->GetAndClearIntermediateFiles();
- res.insert(res.end(), std::make_move_iterator(files.begin()),
- std::make_move_iterator(files.end()));
- files.erase(files.begin(), files.end());
- }
- return res;
- }
- Status ManifestTailer::VerifyFile(ColumnFamilyData* cfd,
- const std::string& fpath, int level,
- const FileMetaData& fmeta) {
- Status s =
- VersionEditHandlerPointInTime::VerifyFile(cfd, fpath, level, fmeta);
- // TODO: Open file or create hard link to prevent the file from being
- // deleted.
- return s;
- }
- void DumpManifestHandler::CheckIterationResult(const log::Reader& reader,
- Status* s) {
- VersionEditHandler::CheckIterationResult(reader, s);
- if (!s->ok()) {
- fprintf(stdout, "%s\n", s->ToString().c_str());
- return;
- }
- assert(cf_to_cmp_names_);
- for (auto* cfd : *(version_set_->column_family_set_)) {
- fprintf(stdout,
- "--------------- Column family \"%s\" (ID %" PRIu32
- ") --------------\n",
- cfd->GetName().c_str(), cfd->GetID());
- fprintf(stdout, "log number: %" PRIu64 "\n", cfd->GetLogNumber());
- auto it = cf_to_cmp_names_->find(cfd->GetID());
- if (it != cf_to_cmp_names_->end()) {
- fprintf(stdout,
- "comparator: <%s>, but the comparator object is not available.\n",
- it->second.c_str());
- } else {
- fprintf(stdout, "comparator: %s\n", cfd->user_comparator()->Name());
- }
- assert(cfd->current());
- // Print out DebugStrings. Can include non-terminating null characters.
- fwrite(cfd->current()->DebugString(hex_).data(), sizeof(char),
- cfd->current()->DebugString(hex_).size(), stdout);
- fprintf(stdout,
- "By default, manifest file dump prints LSM trees as if %d levels "
- "were configured, "
- "which is not necessarily true for the column family (CF) this "
- "manifest is associated with. "
- "Please consult other DB files, such as the OPTIONS file, to "
- "confirm.\n",
- cfd->ioptions().num_levels);
- }
- fprintf(stdout,
- "next_file_number %" PRIu64 " last_sequence %" PRIu64
- " prev_log_number %" PRIu64 " max_column_family %" PRIu32
- " min_log_number_to_keep %" PRIu64 "\n",
- version_set_->current_next_file_number(),
- version_set_->LastSequence(), version_set_->prev_log_number(),
- version_set_->column_family_set_->GetMaxColumnFamily(),
- version_set_->min_log_number_to_keep());
- }
- } // namespace ROCKSDB_NAMESPACE
|