version_edit_handler.cc 42 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  9. #include "db/version_edit_handler.h"
  10. #include <cinttypes>
  11. #include <sstream>
  12. #include "db/blob/blob_file_reader.h"
  13. #include "db/blob/blob_source.h"
  14. #include "db/version_edit.h"
  15. #include "logging/logging.h"
  16. #include "monitoring/persistent_stats_history.h"
  17. #include "util/udt_util.h"
  18. namespace ROCKSDB_NAMESPACE {
  19. void VersionEditHandlerBase::Iterate(log::Reader& reader,
  20. Status* log_read_status) {
  21. Slice record;
  22. std::string scratch;
  23. assert(log_read_status);
  24. assert(log_read_status->ok());
  25. [[maybe_unused]] size_t recovered_edits = 0;
  26. Status s = Initialize();
  27. while (reader.LastRecordEnd() < max_manifest_read_size_ && s.ok() &&
  28. reader.ReadRecord(&record, &scratch) && log_read_status->ok()) {
  29. VersionEdit edit;
  30. s = edit.DecodeFrom(record);
  31. if (s.ok()) {
  32. s = read_buffer_.AddEdit(&edit);
  33. }
  34. if (s.ok()) {
  35. ColumnFamilyData* cfd = nullptr;
  36. if (edit.IsInAtomicGroup()) {
  37. if (read_buffer_.IsFull()) {
  38. s = OnAtomicGroupReplayBegin();
  39. for (size_t i = 0; s.ok() && i < read_buffer_.replay_buffer().size();
  40. i++) {
  41. auto& e = read_buffer_.replay_buffer()[i];
  42. s = ApplyVersionEdit(e, &cfd);
  43. if (s.ok()) {
  44. recovered_edits++;
  45. }
  46. }
  47. if (s.ok()) {
  48. read_buffer_.Clear();
  49. s = OnAtomicGroupReplayEnd();
  50. }
  51. }
  52. } else {
  53. s = ApplyVersionEdit(edit, &cfd);
  54. if (s.ok()) {
  55. recovered_edits++;
  56. }
  57. }
  58. }
  59. }
  60. if (s.ok() && !log_read_status->ok()) {
  61. s = *log_read_status;
  62. }
  63. CheckIterationResult(reader, &s);
  64. if (!s.ok()) {
  65. if (s.IsCorruption()) {
  66. // when we find a Corruption error, something is
  67. // wrong with the underlying file. in this case we
  68. // want to report the filename, so in here we append
  69. // the filename to the Corruption message
  70. assert(reader.file());
  71. // build a new error message
  72. std::stringstream message;
  73. // append previous dynamic state message
  74. const char* state = s.getState();
  75. if (state != nullptr) {
  76. message << state;
  77. message << ' ';
  78. }
  79. // append the filename to the corruption message
  80. message << " The file " << reader.file()->file_name()
  81. << " may be corrupted.";
  82. // overwrite the status with the extended status
  83. s = Status(s.code(), s.subcode(), s.severity(), message.str());
  84. }
  85. status_ = s;
  86. }
  87. TEST_SYNC_POINT_CALLBACK("VersionEditHandlerBase::Iterate:Finish",
  88. &recovered_edits);
  89. }
  90. Status ListColumnFamiliesHandler::ApplyVersionEdit(
  91. VersionEdit& edit, ColumnFamilyData** /*unused*/) {
  92. Status s;
  93. uint32_t cf_id = edit.GetColumnFamily();
  94. if (edit.IsColumnFamilyAdd()) {
  95. if (column_family_names_.find(cf_id) != column_family_names_.end()) {
  96. s = Status::Corruption("Manifest adding the same column family twice");
  97. } else {
  98. column_family_names_.insert({cf_id, edit.GetColumnFamilyName()});
  99. }
  100. } else if (edit.IsColumnFamilyDrop()) {
  101. if (column_family_names_.find(cf_id) == column_family_names_.end()) {
  102. s = Status::Corruption("Manifest - dropping non-existing column family");
  103. } else {
  104. column_family_names_.erase(cf_id);
  105. }
  106. }
  107. return s;
  108. }
  109. Status FileChecksumRetriever::FetchFileChecksumList(
  110. FileChecksumList& file_checksum_list) {
  111. Status s = Status::OK();
  112. for (const auto& [cf, file_checksums] : cf_file_checksums_) {
  113. [[maybe_unused]] const auto& _ = cf;
  114. for (const auto& [file_number, info] : file_checksums) {
  115. if (!(s = file_checksum_list.InsertOneFileChecksum(
  116. file_number, info.first, info.second))
  117. .ok()) {
  118. break;
  119. }
  120. }
  121. }
  122. return s;
  123. }
  124. Status FileChecksumRetriever::ApplyVersionEdit(VersionEdit& edit,
  125. ColumnFamilyData** /*unused*/) {
  126. uint32_t column_family_id = edit.GetColumnFamily();
  127. if (edit.IsColumnFamilyDrop()) {
  128. cf_file_checksums_.erase(column_family_id);
  129. }
  130. for (const auto& deleted_file : edit.GetDeletedFiles()) {
  131. if (cf_file_checksums_.find(column_family_id) == cf_file_checksums_.end()) {
  132. return Status::NotFound();
  133. }
  134. if (cf_file_checksums_[column_family_id].find(deleted_file.second) ==
  135. cf_file_checksums_[column_family_id].end()) {
  136. return Status::NotFound();
  137. }
  138. cf_file_checksums_[column_family_id].erase(deleted_file.second);
  139. }
  140. for (const auto& new_file : edit.GetNewFiles()) {
  141. cf_file_checksums_[column_family_id].emplace(
  142. new_file.second.fd.GetNumber(),
  143. std::make_pair(new_file.second.file_checksum,
  144. new_file.second.file_checksum_func_name));
  145. }
  146. for (const auto& new_blob_file : edit.GetBlobFileAdditions()) {
  147. std::string checksum_value = new_blob_file.GetChecksumValue();
  148. std::string checksum_method = new_blob_file.GetChecksumMethod();
  149. assert(checksum_value.empty() == checksum_method.empty());
  150. if (checksum_method.empty()) {
  151. checksum_value = kUnknownFileChecksum;
  152. checksum_method = kUnknownFileChecksumFuncName;
  153. }
  154. cf_file_checksums_[column_family_id].emplace(
  155. new_blob_file.GetBlobFileNumber(),
  156. std::make_pair(checksum_value, checksum_method));
  157. }
  158. return Status::OK();
  159. }
  160. VersionEditHandler::VersionEditHandler(
  161. bool read_only, std::vector<ColumnFamilyDescriptor> column_families,
  162. VersionSet* version_set, bool track_found_and_missing_files,
  163. bool no_error_if_files_missing, const std::shared_ptr<IOTracer>& io_tracer,
  164. const ReadOptions& read_options, bool skip_load_table_files,
  165. bool allow_incomplete_valid_version,
  166. EpochNumberRequirement epoch_number_requirement)
  167. : VersionEditHandlerBase(read_options),
  168. read_only_(read_only),
  169. column_families_(std::move(column_families)),
  170. version_set_(version_set),
  171. track_found_and_missing_files_(track_found_and_missing_files),
  172. no_error_if_files_missing_(no_error_if_files_missing),
  173. io_tracer_(io_tracer),
  174. skip_load_table_files_(skip_load_table_files),
  175. initialized_(false),
  176. allow_incomplete_valid_version_(allow_incomplete_valid_version),
  177. epoch_number_requirement_(epoch_number_requirement) {
  178. assert(version_set_ != nullptr);
  179. }
  180. Status VersionEditHandler::Initialize() {
  181. Status s;
  182. if (!initialized_) {
  183. for (const auto& cf_desc : column_families_) {
  184. name_to_options_.emplace(cf_desc.name, cf_desc.options);
  185. }
  186. auto default_cf_iter = name_to_options_.find(kDefaultColumnFamilyName);
  187. if (default_cf_iter == name_to_options_.end()) {
  188. s = Status::InvalidArgument("Default column family not specified");
  189. }
  190. if (s.ok()) {
  191. VersionEdit default_cf_edit;
  192. default_cf_edit.AddColumnFamily(kDefaultColumnFamilyName);
  193. default_cf_edit.SetColumnFamily(0);
  194. ColumnFamilyData* cfd =
  195. CreateCfAndInit(default_cf_iter->second, default_cf_edit);
  196. assert(cfd != nullptr);
  197. #ifdef NDEBUG
  198. (void)cfd;
  199. #endif
  200. initialized_ = true;
  201. }
  202. }
  203. return s;
  204. }
  205. Status VersionEditHandler::ApplyVersionEdit(VersionEdit& edit,
  206. ColumnFamilyData** cfd) {
  207. Status s;
  208. if (edit.IsColumnFamilyAdd()) {
  209. s = OnColumnFamilyAdd(edit, cfd);
  210. } else if (edit.IsColumnFamilyDrop()) {
  211. s = OnColumnFamilyDrop(edit, cfd);
  212. } else if (edit.IsWalAddition()) {
  213. s = OnWalAddition(edit);
  214. } else if (edit.IsWalDeletion()) {
  215. s = OnWalDeletion(edit);
  216. } else {
  217. s = OnNonCfOperation(edit, cfd);
  218. }
  219. if (s.ok()) {
  220. assert(cfd != nullptr);
  221. s = ExtractInfoFromVersionEdit(*cfd, edit);
  222. }
  223. return s;
  224. }
  225. Status VersionEditHandler::OnColumnFamilyAdd(VersionEdit& edit,
  226. ColumnFamilyData** cfd) {
  227. bool do_not_open_cf = false;
  228. bool cf_in_builders = false;
  229. CheckColumnFamilyId(edit, &do_not_open_cf, &cf_in_builders);
  230. assert(cfd != nullptr);
  231. *cfd = nullptr;
  232. const std::string& cf_name = edit.GetColumnFamilyName();
  233. Status s;
  234. if (cf_in_builders || do_not_open_cf) {
  235. s = Status::Corruption("MANIFEST adding the same column family twice: " +
  236. cf_name);
  237. }
  238. if (s.ok()) {
  239. auto cf_options = name_to_options_.find(cf_name);
  240. // implicitly add persistent_stats column family without requiring user
  241. // to specify
  242. ColumnFamilyData* tmp_cfd = nullptr;
  243. bool is_persistent_stats_column_family =
  244. cf_name.compare(kPersistentStatsColumnFamilyName) == 0;
  245. if (cf_options == name_to_options_.end() &&
  246. !is_persistent_stats_column_family) {
  247. do_not_open_column_families_.emplace(edit.GetColumnFamily(), cf_name);
  248. } else {
  249. if (is_persistent_stats_column_family) {
  250. ColumnFamilyOptions cfo;
  251. OptimizeForPersistentStats(&cfo);
  252. tmp_cfd = CreateCfAndInit(cfo, edit);
  253. } else {
  254. tmp_cfd = CreateCfAndInit(cf_options->second, edit);
  255. }
  256. *cfd = tmp_cfd;
  257. }
  258. }
  259. return s;
  260. }
  261. Status VersionEditHandler::OnColumnFamilyDrop(VersionEdit& edit,
  262. ColumnFamilyData** cfd) {
  263. bool do_not_open_cf = false;
  264. bool cf_in_builders = false;
  265. CheckColumnFamilyId(edit, &do_not_open_cf, &cf_in_builders);
  266. assert(cfd != nullptr);
  267. *cfd = nullptr;
  268. ColumnFamilyData* tmp_cfd = nullptr;
  269. Status s;
  270. if (cf_in_builders) {
  271. tmp_cfd = DestroyCfAndCleanup(edit);
  272. } else if (do_not_open_cf) {
  273. do_not_open_column_families_.erase(edit.GetColumnFamily());
  274. } else {
  275. s = Status::Corruption("MANIFEST - dropping non-existing column family");
  276. }
  277. *cfd = tmp_cfd;
  278. return s;
  279. }
  280. Status VersionEditHandler::OnWalAddition(VersionEdit& edit) {
  281. assert(edit.IsWalAddition());
  282. return version_set_->wals_.AddWals(edit.GetWalAdditions());
  283. }
  284. Status VersionEditHandler::OnWalDeletion(VersionEdit& edit) {
  285. assert(edit.IsWalDeletion());
  286. return version_set_->wals_.DeleteWalsBefore(
  287. edit.GetWalDeletion().GetLogNumber());
  288. }
  289. Status VersionEditHandler::OnNonCfOperation(VersionEdit& edit,
  290. ColumnFamilyData** cfd) {
  291. bool do_not_open_cf = false;
  292. bool cf_in_builders = false;
  293. CheckColumnFamilyId(edit, &do_not_open_cf, &cf_in_builders);
  294. assert(cfd != nullptr);
  295. *cfd = nullptr;
  296. Status s;
  297. if (!do_not_open_cf) {
  298. if (!cf_in_builders) {
  299. s = Status::Corruption(
  300. "MANIFEST record referencing unknown column family");
  301. }
  302. ColumnFamilyData* tmp_cfd = nullptr;
  303. if (s.ok()) {
  304. tmp_cfd = version_set_->GetColumnFamilySet()->GetColumnFamily(
  305. edit.GetColumnFamily());
  306. assert(tmp_cfd != nullptr);
  307. // It's important to handle file boundaries before `MaybeCreateVersion`
  308. // because `VersionEditHandlerPointInTime::MaybeCreateVersion` does
  309. // `FileMetaData` verification that involves the file boundaries.
  310. // All `VersionEditHandlerBase` subclasses that need to deal with
  311. // `FileMetaData` for new files are also subclasses of
  312. // `VersionEditHandler`, so it's sufficient to do the file boundaries
  313. // handling in this method.
  314. s = MaybeHandleFileBoundariesForNewFiles(edit, tmp_cfd);
  315. if (!s.ok()) {
  316. return s;
  317. }
  318. s = MaybeCreateVersionBeforeApplyEdit(edit, tmp_cfd,
  319. /*force_create_version=*/false);
  320. }
  321. *cfd = tmp_cfd;
  322. }
  323. return s;
  324. }
  325. void VersionEditHandler::CheckColumnFamilyId(const VersionEdit& edit,
  326. bool* do_not_open_cf,
  327. bool* cf_in_builders) const {
  328. assert(do_not_open_cf != nullptr);
  329. assert(cf_in_builders != nullptr);
  330. // Not found means that user didn't supply that column
  331. // family option AND we encountered column family add
  332. // record. Once we encounter column family drop record,
  333. // we will delete the column family from
  334. // do_not_open_column_families_.
  335. uint32_t cf_id = edit.GetColumnFamily();
  336. bool in_do_not_open = do_not_open_column_families_.find(cf_id) !=
  337. do_not_open_column_families_.end();
  338. // in builders means that user supplied that column family
  339. // option AND that we encountered column family add record
  340. bool in_builders = builders_.find(cf_id) != builders_.end();
  341. // They cannot both be true
  342. assert(!(in_do_not_open && in_builders));
  343. *do_not_open_cf = in_do_not_open;
  344. *cf_in_builders = in_builders;
  345. }
  346. void VersionEditHandler::CheckIterationResult(const log::Reader& reader,
  347. Status* s) {
  348. assert(s != nullptr);
  349. if (!s->ok()) {
  350. // Do nothing here.
  351. } else if (!version_edit_params_.HasLogNumber() ||
  352. !version_edit_params_.HasNextFile() ||
  353. !version_edit_params_.HasLastSequence()) {
  354. std::string msg("no ");
  355. if (!version_edit_params_.HasLogNumber()) {
  356. msg.append("log_file_number, ");
  357. }
  358. if (!version_edit_params_.HasNextFile()) {
  359. msg.append("next_file_number, ");
  360. }
  361. if (!version_edit_params_.HasLastSequence()) {
  362. msg.append("last_sequence, ");
  363. }
  364. msg = msg.substr(0, msg.size() - 2);
  365. msg.append(" entry in MANIFEST");
  366. *s = Status::Corruption(msg);
  367. }
  368. // There were some column families in the MANIFEST that weren't specified
  369. // in the argument. This is OK in read_only mode
  370. if (s->ok() && MustOpenAllColumnFamilies() &&
  371. !do_not_open_column_families_.empty()) {
  372. std::string msg;
  373. for (const auto& cf : do_not_open_column_families_) {
  374. msg.append(", ");
  375. msg.append(cf.second);
  376. }
  377. msg = msg.substr(2);
  378. *s = Status::InvalidArgument("Column families not opened: " + msg);
  379. }
  380. if (s->ok()) {
  381. version_set_->GetColumnFamilySet()->UpdateMaxColumnFamily(
  382. version_edit_params_.GetMaxColumnFamily());
  383. version_set_->MarkMinLogNumberToKeep(
  384. version_edit_params_.GetMinLogNumberToKeep());
  385. version_set_->MarkFileNumberUsed(version_edit_params_.GetPrevLogNumber());
  386. version_set_->MarkFileNumberUsed(version_edit_params_.GetLogNumber());
  387. for (auto* cfd : *(version_set_->GetColumnFamilySet())) {
  388. if (cfd->IsDropped()) {
  389. continue;
  390. }
  391. auto builder_iter = builders_.find(cfd->GetID());
  392. assert(builder_iter != builders_.end());
  393. auto* builder = builder_iter->second->version_builder();
  394. if (!builder->CheckConsistencyForNumLevels()) {
  395. *s = Status::InvalidArgument(
  396. "db has more levels than options.num_levels");
  397. break;
  398. }
  399. }
  400. }
  401. if (s->ok()) {
  402. for (auto* cfd : *(version_set_->GetColumnFamilySet())) {
  403. if (cfd->IsDropped()) {
  404. continue;
  405. }
  406. if (version_set_->unchanging()) {
  407. cfd->table_cache()->SetTablesAreImmortal();
  408. }
  409. *s = LoadTables(cfd, /*prefetch_index_and_filter_in_cache=*/false,
  410. /*is_initial_load=*/true);
  411. if (!s->ok()) {
  412. // If s is IOError::PathNotFound, then we mark the db as corrupted.
  413. if (s->IsPathNotFound()) {
  414. *s = Status::Corruption("Corruption: " + s->ToString());
  415. }
  416. break;
  417. }
  418. }
  419. }
  420. if (s->ok()) {
  421. for (auto* cfd : *(version_set_->column_family_set_)) {
  422. if (cfd->IsDropped()) {
  423. continue;
  424. }
  425. assert(cfd->initialized());
  426. VersionEdit edit;
  427. *s = MaybeCreateVersionBeforeApplyEdit(edit, cfd,
  428. /*force_create_version=*/true);
  429. if (!s->ok()) {
  430. break;
  431. }
  432. }
  433. }
  434. if (s->ok()) {
  435. version_set_->manifest_file_size_ = reader.GetReadOffset();
  436. assert(version_set_->manifest_file_size_ > 0);
  437. version_set_->next_file_number_.store(version_edit_params_.GetNextFile() +
  438. 1);
  439. SequenceNumber last_seq = version_edit_params_.GetLastSequence();
  440. assert(last_seq != kMaxSequenceNumber);
  441. if (last_seq != kMaxSequenceNumber &&
  442. last_seq > version_set_->last_allocated_sequence_.load()) {
  443. version_set_->last_allocated_sequence_.store(last_seq);
  444. }
  445. if (last_seq != kMaxSequenceNumber &&
  446. last_seq > version_set_->last_published_sequence_.load()) {
  447. version_set_->last_published_sequence_.store(last_seq);
  448. }
  449. if (last_seq != kMaxSequenceNumber &&
  450. last_seq > version_set_->last_sequence_.load()) {
  451. version_set_->last_sequence_.store(last_seq);
  452. }
  453. if (last_seq != kMaxSequenceNumber &&
  454. last_seq > version_set_->descriptor_last_sequence_) {
  455. // This is the maximum last sequence of all `VersionEdit`s iterated. It
  456. // may be greater than the maximum `largest_seqno` of all files in case
  457. // the newest data referred to by the MANIFEST has been dropped or had its
  458. // sequence number zeroed through compaction.
  459. version_set_->descriptor_last_sequence_ = last_seq;
  460. }
  461. version_set_->prev_log_number_ = version_edit_params_.GetPrevLogNumber();
  462. }
  463. }
  464. ColumnFamilyData* VersionEditHandler::CreateCfAndInit(
  465. const ColumnFamilyOptions& cf_options, const VersionEdit& edit) {
  466. uint32_t cf_id = edit.GetColumnFamily();
  467. ColumnFamilyData* cfd = version_set_->CreateColumnFamily(
  468. cf_options, read_options_, &edit, read_only_);
  469. assert(cfd != nullptr);
  470. cfd->set_initialized();
  471. assert(builders_.find(cf_id) == builders_.end());
  472. builders_.emplace(cf_id, VersionBuilderUPtr(new BaseReferencedVersionBuilder(
  473. cfd, this, track_found_and_missing_files_,
  474. allow_incomplete_valid_version_)));
  475. return cfd;
  476. }
  477. ColumnFamilyData* VersionEditHandler::DestroyCfAndCleanup(
  478. const VersionEdit& edit) {
  479. uint32_t cf_id = edit.GetColumnFamily();
  480. auto builder_iter = builders_.find(cf_id);
  481. assert(builder_iter != builders_.end());
  482. builders_.erase(builder_iter);
  483. ColumnFamilyData* ret =
  484. version_set_->GetColumnFamilySet()->GetColumnFamily(cf_id);
  485. assert(ret != nullptr);
  486. ret->SetDropped();
  487. ret->UnrefAndTryDelete();
  488. ret = nullptr;
  489. return ret;
  490. }
  491. Status VersionEditHandler::MaybeCreateVersionBeforeApplyEdit(
  492. const VersionEdit& edit, ColumnFamilyData* cfd, bool force_create_version) {
  493. assert(cfd->initialized());
  494. Status s;
  495. auto builder_iter = builders_.find(cfd->GetID());
  496. assert(builder_iter != builders_.end());
  497. auto* builder = builder_iter->second->version_builder();
  498. if (force_create_version) {
  499. auto* v = new Version(cfd, version_set_, version_set_->file_options_,
  500. cfd->GetLatestMutableCFOptions(), io_tracer_,
  501. version_set_->current_version_number_++,
  502. epoch_number_requirement_);
  503. s = builder->SaveTo(v->storage_info());
  504. if (s.ok()) {
  505. // Install new version
  506. v->PrepareAppend(
  507. read_options_,
  508. !(version_set_->db_options_->skip_stats_update_on_db_open));
  509. version_set_->AppendVersion(cfd, v);
  510. } else {
  511. delete v;
  512. }
  513. }
  514. s = builder->Apply(&edit);
  515. return s;
  516. }
  517. Status VersionEditHandler::LoadTables(ColumnFamilyData* cfd,
  518. bool prefetch_index_and_filter_in_cache,
  519. bool is_initial_load) {
  520. bool skip_load_table_files = skip_load_table_files_;
  521. TEST_SYNC_POINT_CALLBACK(
  522. "VersionEditHandler::LoadTables:skip_load_table_files",
  523. &skip_load_table_files);
  524. if (skip_load_table_files) {
  525. return Status::OK();
  526. }
  527. assert(cfd != nullptr);
  528. assert(!cfd->IsDropped());
  529. auto builder_iter = builders_.find(cfd->GetID());
  530. assert(builder_iter != builders_.end());
  531. assert(builder_iter->second != nullptr);
  532. VersionBuilder* builder = builder_iter->second->version_builder();
  533. assert(builder);
  534. const auto& moptions = cfd->GetLatestMutableCFOptions();
  535. Status s = builder->LoadTableHandlers(
  536. cfd->internal_stats(),
  537. version_set_->db_options_->max_file_opening_threads,
  538. prefetch_index_and_filter_in_cache, is_initial_load, moptions,
  539. MaxFileSizeForL0MetaPin(moptions), read_options_);
  540. if ((s.IsPathNotFound() || s.IsCorruption()) && no_error_if_files_missing_) {
  541. s = Status::OK();
  542. }
  543. if (!s.ok() && !version_set_->db_options_->paranoid_checks) {
  544. s = Status::OK();
  545. }
  546. return s;
  547. }
  548. Status VersionEditHandler::ExtractInfoFromVersionEdit(ColumnFamilyData* cfd,
  549. const VersionEdit& edit) {
  550. Status s;
  551. if (edit.HasDbId()) {
  552. version_set_->db_id_ = edit.GetDbId();
  553. version_edit_params_.SetDBId(edit.GetDbId());
  554. }
  555. if (cfd != nullptr) {
  556. if (edit.HasLogNumber()) {
  557. if (cfd->GetLogNumber() > edit.GetLogNumber()) {
  558. ROCKS_LOG_WARN(
  559. version_set_->db_options()->info_log,
  560. "MANIFEST corruption detected, but ignored - Log numbers in "
  561. "records NOT monotonically increasing");
  562. } else {
  563. cfd->SetLogNumber(edit.GetLogNumber());
  564. version_edit_params_.SetLogNumber(edit.GetLogNumber());
  565. }
  566. }
  567. if (edit.HasComparatorName()) {
  568. bool mark_sst_files_has_no_udt = false;
  569. // If `persist_user_defined_timestamps` flag is recorded in manifest, it
  570. // is guaranteed to be in the same VersionEdit as comparator. Otherwise,
  571. // it's not recorded and it should have default value true.
  572. s = ValidateUserDefinedTimestampsOptions(
  573. cfd->user_comparator(), edit.GetComparatorName(),
  574. cfd->ioptions().persist_user_defined_timestamps,
  575. edit.GetPersistUserDefinedTimestamps(), &mark_sst_files_has_no_udt);
  576. if (!s.ok() && cf_to_cmp_names_) {
  577. cf_to_cmp_names_->emplace(cfd->GetID(), edit.GetComparatorName());
  578. }
  579. if (mark_sst_files_has_no_udt) {
  580. cfds_to_mark_no_udt_.insert(cfd->GetID());
  581. }
  582. }
  583. if (edit.HasFullHistoryTsLow()) {
  584. const std::string& new_ts = edit.GetFullHistoryTsLow();
  585. cfd->SetFullHistoryTsLow(new_ts);
  586. }
  587. }
  588. if (s.ok()) {
  589. if (edit.HasPrevLogNumber()) {
  590. version_edit_params_.SetPrevLogNumber(edit.GetPrevLogNumber());
  591. }
  592. if (edit.HasNextFile()) {
  593. version_edit_params_.SetNextFile(edit.GetNextFile());
  594. }
  595. if (edit.HasMaxColumnFamily()) {
  596. version_edit_params_.SetMaxColumnFamily(edit.GetMaxColumnFamily());
  597. }
  598. if (edit.HasMinLogNumberToKeep()) {
  599. version_edit_params_.SetMinLogNumberToKeep(
  600. std::max(version_edit_params_.GetMinLogNumberToKeep(),
  601. edit.GetMinLogNumberToKeep()));
  602. }
  603. if (edit.HasLastSequence()) {
  604. // `VersionEdit::last_sequence_`s are assumed to be non-decreasing. This
  605. // is legacy behavior that cannot change without breaking downgrade
  606. // compatibility.
  607. assert(!version_edit_params_.HasLastSequence() ||
  608. version_edit_params_.GetLastSequence() <= edit.GetLastSequence());
  609. version_edit_params_.SetLastSequence(edit.GetLastSequence());
  610. }
  611. if (!version_edit_params_.HasPrevLogNumber()) {
  612. version_edit_params_.SetPrevLogNumber(0);
  613. }
  614. }
  615. return s;
  616. }
  617. Status VersionEditHandler::MaybeHandleFileBoundariesForNewFiles(
  618. VersionEdit& edit, const ColumnFamilyData* cfd) {
  619. if (edit.GetNewFiles().empty()) {
  620. return Status::OK();
  621. }
  622. auto ucmp = cfd->user_comparator();
  623. assert(ucmp);
  624. size_t ts_sz = ucmp->timestamp_size();
  625. if (ts_sz == 0) {
  626. return Status::OK();
  627. }
  628. VersionEdit::NewFiles& new_files = edit.GetMutableNewFiles();
  629. assert(!new_files.empty());
  630. // If true, enabling user-defined timestamp is detected for this column
  631. // family. All its existing SST files need to have the file boundaries handled
  632. // and their `persist_user_defined_timestamps` flag set to false regardless of
  633. // its existing value.
  634. bool mark_existing_ssts_with_no_udt =
  635. cfds_to_mark_no_udt_.find(cfd->GetID()) != cfds_to_mark_no_udt_.end();
  636. bool file_boundaries_need_handling = false;
  637. for (auto& new_file : new_files) {
  638. FileMetaData& meta = new_file.second;
  639. if (meta.user_defined_timestamps_persisted &&
  640. !mark_existing_ssts_with_no_udt) {
  641. // `FileMetaData.user_defined_timestamps_persisted` field is the value of
  642. // the flag `AdvancedColumnFamilyOptions.persist_user_defined_timestamps`
  643. // at the time when the SST file was created. As a result, all added SST
  644. // files in one `VersionEdit` should have the same value for it.
  645. if (file_boundaries_need_handling) {
  646. return Status::Corruption(
  647. "New files in one VersionEdit has different "
  648. "user_defined_timestamps_persisted value.");
  649. }
  650. break;
  651. }
  652. file_boundaries_need_handling = true;
  653. assert(!meta.user_defined_timestamps_persisted ||
  654. mark_existing_ssts_with_no_udt);
  655. if (mark_existing_ssts_with_no_udt) {
  656. meta.user_defined_timestamps_persisted = false;
  657. }
  658. std::string smallest_buf;
  659. std::string largest_buf;
  660. Slice largest_slice = meta.largest.Encode();
  661. PadInternalKeyWithMinTimestamp(&smallest_buf, meta.smallest.Encode(),
  662. ts_sz);
  663. auto largest_footer = ExtractInternalKeyFooter(largest_slice);
  664. if (largest_footer == kRangeTombstoneSentinel) {
  665. PadInternalKeyWithMaxTimestamp(&largest_buf, largest_slice, ts_sz);
  666. } else {
  667. PadInternalKeyWithMinTimestamp(&largest_buf, largest_slice, ts_sz);
  668. }
  669. meta.smallest.DecodeFrom(smallest_buf);
  670. meta.largest.DecodeFrom(largest_buf);
  671. }
  672. return Status::OK();
  673. }
  674. VersionEditHandlerPointInTime::VersionEditHandlerPointInTime(
  675. bool read_only, std::vector<ColumnFamilyDescriptor> column_families,
  676. VersionSet* version_set, const std::shared_ptr<IOTracer>& io_tracer,
  677. const ReadOptions& read_options, bool allow_incomplete_valid_version,
  678. EpochNumberRequirement epoch_number_requirement)
  679. : VersionEditHandler(read_only, column_families, version_set,
  680. /*track_found_and_missing_files=*/true,
  681. /*no_error_if_files_missing=*/true, io_tracer,
  682. read_options, allow_incomplete_valid_version,
  683. epoch_number_requirement) {}
  684. VersionEditHandlerPointInTime::~VersionEditHandlerPointInTime() {
  685. for (const auto& cfid_and_version : atomic_update_versions_) {
  686. delete cfid_and_version.second;
  687. }
  688. for (const auto& elem : versions_) {
  689. delete elem.second;
  690. }
  691. versions_.clear();
  692. }
  693. Status VersionEditHandlerPointInTime::OnAtomicGroupReplayBegin() {
  694. if (in_atomic_group_) {
  695. return Status::Corruption("unexpected AtomicGroup start");
  696. }
  697. // The AtomicGroup that is about to begin may block column families in a valid
  698. // state from saving any more updates. So we should save any valid states
  699. // before proceeding.
  700. for (const auto& cfid_and_builder : builders_) {
  701. ColumnFamilyData* cfd = version_set_->GetColumnFamilySet()->GetColumnFamily(
  702. cfid_and_builder.first);
  703. assert(!cfd->IsDropped());
  704. assert(cfd->initialized());
  705. VersionEdit edit;
  706. Status s = MaybeCreateVersionBeforeApplyEdit(
  707. edit, cfd, true /* force_create_version */);
  708. if (!s.ok()) {
  709. return s;
  710. }
  711. }
  712. // An old AtomicGroup is incomplete. Throw away the versions that failed to
  713. // complete it. They must not be used for completing the upcoming
  714. // AtomicGroup since they are too old.
  715. for (auto& cfid_and_version : atomic_update_versions_) {
  716. delete cfid_and_version.second;
  717. }
  718. in_atomic_group_ = true;
  719. // We lazily assume the column families that exist at this point are all
  720. // involved in the AtomicGroup. Overestimating the scope of the AtomicGroup
  721. // will sometimes cause less data to be recovered, which is fine for
  722. // best-effort recovery.
  723. atomic_update_versions_.clear();
  724. for (const auto& cfid_and_builder : builders_) {
  725. atomic_update_versions_[cfid_and_builder.first] = nullptr;
  726. }
  727. atomic_update_versions_missing_ = atomic_update_versions_.size();
  728. return Status::OK();
  729. }
  730. Status VersionEditHandlerPointInTime::OnAtomicGroupReplayEnd() {
  731. if (!in_atomic_group_) {
  732. return Status::Corruption("unexpected AtomicGroup end");
  733. }
  734. in_atomic_group_ = false;
  735. // The AtomicGroup must not have changed the column families. We don't support
  736. // CF adds or drops in an AtomicGroup.
  737. if (builders_.size() != atomic_update_versions_.size()) {
  738. return Status::Corruption("unexpected CF change in AtomicGroup");
  739. }
  740. for (const auto& cfid_and_builder : builders_) {
  741. if (atomic_update_versions_.find(cfid_and_builder.first) ==
  742. atomic_update_versions_.end()) {
  743. return Status::Corruption("unexpected CF add in AtomicGroup");
  744. }
  745. }
  746. for (const auto& cfid_and_version : atomic_update_versions_) {
  747. if (builders_.find(cfid_and_version.first) == builders_.end()) {
  748. return Status::Corruption("unexpected CF drop in AtomicGroup");
  749. }
  750. }
  751. return Status::OK();
  752. }
  753. void VersionEditHandlerPointInTime::CheckIterationResult(
  754. const log::Reader& reader, Status* s) {
  755. VersionEditHandler::CheckIterationResult(reader, s);
  756. assert(s != nullptr);
  757. if (s->ok()) {
  758. for (auto* cfd : *(version_set_->column_family_set_)) {
  759. if (cfd->IsDropped()) {
  760. continue;
  761. }
  762. assert(cfd->initialized());
  763. auto v_iter = versions_.find(cfd->GetID());
  764. auto builder_iter = builders_.find(cfd->GetID());
  765. if (v_iter != versions_.end()) {
  766. assert(v_iter->second != nullptr);
  767. assert(builder_iter != builders_.end());
  768. version_set_->AppendVersion(cfd, v_iter->second);
  769. versions_.erase(v_iter);
  770. // Let's clear found_files, since any files in that are part of the
  771. // installed Version. Any files that got obsoleted would have already
  772. // been moved to intermediate_files_
  773. builder_iter->second->version_builder()->ClearFoundFiles();
  774. }
  775. }
  776. } else {
  777. for (const auto& elem : versions_) {
  778. delete elem.second;
  779. }
  780. versions_.clear();
  781. }
  782. }
  783. ColumnFamilyData* VersionEditHandlerPointInTime::DestroyCfAndCleanup(
  784. const VersionEdit& edit) {
  785. ColumnFamilyData* cfd = VersionEditHandler::DestroyCfAndCleanup(edit);
  786. uint32_t cfid = edit.GetColumnFamily();
  787. if (AtomicUpdateVersionsContains(cfid)) {
  788. AtomicUpdateVersionsDropCf(cfid);
  789. if (AtomicUpdateVersionsCompleted()) {
  790. AtomicUpdateVersionsApply();
  791. }
  792. }
  793. auto v_iter = versions_.find(cfid);
  794. if (v_iter != versions_.end()) {
  795. delete v_iter->second;
  796. versions_.erase(v_iter);
  797. }
  798. return cfd;
  799. }
  800. Status VersionEditHandlerPointInTime::MaybeCreateVersionBeforeApplyEdit(
  801. const VersionEdit& edit, ColumnFamilyData* cfd, bool force_create_version) {
  802. TEST_SYNC_POINT(
  803. "VersionEditHandlerPointInTime::MaybeCreateVersionBeforeApplyEdit:"
  804. "Begin1");
  805. TEST_SYNC_POINT(
  806. "VersionEditHandlerPointInTime::MaybeCreateVersionBeforeApplyEdit:"
  807. "Begin2");
  808. assert(cfd != nullptr);
  809. if (!force_create_version) {
  810. assert(edit.GetColumnFamily() == cfd->GetID());
  811. }
  812. bool missing_info = !version_edit_params_.HasLogNumber() ||
  813. !version_edit_params_.HasNextFile() ||
  814. !version_edit_params_.HasLastSequence();
  815. Status s;
  816. auto builder_iter = builders_.find(cfd->GetID());
  817. assert(builder_iter != builders_.end());
  818. VersionBuilder* builder = builder_iter->second->version_builder();
  819. const bool valid_pit_before_edit = builder->ValidVersionAvailable();
  820. builder->CreateOrReplaceSavePoint();
  821. s = builder->Apply(&edit);
  822. const bool valid_pit_after_edit = builder->ValidVersionAvailable();
  823. // A new version will be created if:
  824. // 1) no error has occurred so far, and
  825. // 2) log_number_, next_file_number_ and last_sequence_ are known, and
  826. // 3) not in an AtomicGroup
  827. // 4) any of the following:
  828. // a) a valid Version is available before applying the edit
  829. // and a valid Version is not available after the edit.
  830. // b) a valid Version is available after the edit and the
  831. // caller explicitly request that a new version be created.
  832. if (s.ok() && !missing_info && !in_atomic_group_ &&
  833. ((!valid_pit_after_edit && valid_pit_before_edit) ||
  834. (valid_pit_after_edit && force_create_version))) {
  835. const auto& mopts = cfd->GetLatestMutableCFOptions();
  836. auto* version = new Version(
  837. cfd, version_set_, version_set_->file_options_, mopts, io_tracer_,
  838. version_set_->current_version_number_++, epoch_number_requirement_);
  839. s = builder->LoadSavePointTableHandlers(
  840. cfd->internal_stats(),
  841. version_set_->db_options_->max_file_opening_threads, false, true, mopts,
  842. MaxFileSizeForL0MetaPin(mopts), read_options_);
  843. if (!s.ok()) {
  844. delete version;
  845. if (s.IsCorruption()) {
  846. s = Status::OK();
  847. }
  848. return s;
  849. }
  850. s = builder->SaveSavePointTo(version->storage_info());
  851. if (s.ok()) {
  852. if (AtomicUpdateVersionsContains(cfd->GetID())) {
  853. AtomicUpdateVersionsPut(version);
  854. if (AtomicUpdateVersionsCompleted()) {
  855. AtomicUpdateVersionsApply();
  856. }
  857. } else {
  858. version->PrepareAppend(
  859. read_options_,
  860. !version_set_->db_options_->skip_stats_update_on_db_open);
  861. auto v_iter = versions_.find(cfd->GetID());
  862. if (v_iter != versions_.end()) {
  863. delete v_iter->second;
  864. v_iter->second = version;
  865. } else {
  866. versions_.emplace(cfd->GetID(), version);
  867. }
  868. }
  869. } else {
  870. delete version;
  871. }
  872. }
  873. builder->ClearSavePoint();
  874. return s;
  875. }
  876. Status VersionEditHandlerPointInTime::VerifyFile(ColumnFamilyData* cfd,
  877. const std::string& fpath,
  878. int level,
  879. const FileMetaData& fmeta) {
  880. return version_set_->VerifyFileMetadata(read_options_, cfd, fpath, level,
  881. fmeta);
  882. }
  883. Status VersionEditHandlerPointInTime::VerifyBlobFile(
  884. ColumnFamilyData* cfd, uint64_t blob_file_num,
  885. const BlobFileAddition& blob_addition) {
  886. BlobSource* blob_source = cfd->blob_source();
  887. assert(blob_source);
  888. CacheHandleGuard<BlobFileReader> blob_file_reader;
  889. Status s = blob_source->GetBlobFileReader(read_options_, blob_file_num,
  890. &blob_file_reader);
  891. if (!s.ok()) {
  892. return s;
  893. }
  894. // TODO: verify checksum
  895. (void)blob_addition;
  896. return s;
  897. }
  898. Status VersionEditHandlerPointInTime::LoadTables(
  899. ColumnFamilyData* /*cfd*/, bool /*prefetch_index_and_filter_in_cache*/,
  900. bool /*is_initial_load*/) {
  901. return Status::OK();
  902. }
  903. bool VersionEditHandlerPointInTime::HasMissingFiles() const {
  904. for (const auto& builder : builders_) {
  905. if (builder.second->version_builder()->HasMissingFiles()) {
  906. return true;
  907. }
  908. }
  909. return false;
  910. }
  911. bool VersionEditHandlerPointInTime::AtomicUpdateVersionsCompleted() {
  912. return atomic_update_versions_missing_ == 0;
  913. }
  914. bool VersionEditHandlerPointInTime::AtomicUpdateVersionsContains(
  915. uint32_t cfid) {
  916. return atomic_update_versions_.find(cfid) != atomic_update_versions_.end();
  917. }
  918. void VersionEditHandlerPointInTime::AtomicUpdateVersionsDropCf(uint32_t cfid) {
  919. assert(!AtomicUpdateVersionsCompleted());
  920. auto atomic_update_versions_iter = atomic_update_versions_.find(cfid);
  921. assert(atomic_update_versions_iter != atomic_update_versions_.end());
  922. if (atomic_update_versions_iter->second == nullptr) {
  923. atomic_update_versions_missing_--;
  924. } else {
  925. delete atomic_update_versions_iter->second;
  926. }
  927. atomic_update_versions_.erase(atomic_update_versions_iter);
  928. }
  929. void VersionEditHandlerPointInTime::AtomicUpdateVersionsPut(Version* version) {
  930. assert(!AtomicUpdateVersionsCompleted());
  931. auto atomic_update_versions_iter =
  932. atomic_update_versions_.find(version->cfd()->GetID());
  933. assert(atomic_update_versions_iter != atomic_update_versions_.end());
  934. if (atomic_update_versions_iter->second == nullptr) {
  935. atomic_update_versions_missing_--;
  936. } else {
  937. delete atomic_update_versions_iter->second;
  938. }
  939. atomic_update_versions_iter->second = version;
  940. }
  941. void VersionEditHandlerPointInTime::AtomicUpdateVersionsApply() {
  942. assert(AtomicUpdateVersionsCompleted());
  943. for (const auto& cfid_and_version : atomic_update_versions_) {
  944. uint32_t cfid = cfid_and_version.first;
  945. Version* version = cfid_and_version.second;
  946. assert(version != nullptr);
  947. version->PrepareAppend(
  948. read_options_,
  949. !version_set_->db_options_->skip_stats_update_on_db_open);
  950. auto versions_iter = versions_.find(cfid);
  951. if (versions_iter != versions_.end()) {
  952. delete versions_iter->second;
  953. versions_iter->second = version;
  954. } else {
  955. versions_.emplace(cfid, version);
  956. }
  957. }
  958. atomic_update_versions_.clear();
  959. }
  960. Status ManifestTailer::Initialize() {
  961. if (Mode::kRecovery == mode_) {
  962. return VersionEditHandler::Initialize();
  963. }
  964. assert(Mode::kCatchUp == mode_);
  965. Status s;
  966. if (!initialized_) {
  967. ColumnFamilySet* cfd_set = version_set_->GetColumnFamilySet();
  968. assert(cfd_set);
  969. ColumnFamilyData* default_cfd = cfd_set->GetDefault();
  970. assert(default_cfd);
  971. auto builder_iter = builders_.find(default_cfd->GetID());
  972. assert(builder_iter != builders_.end());
  973. Version* dummy_version = default_cfd->dummy_versions();
  974. assert(dummy_version);
  975. Version* base_version = dummy_version->Next();
  976. assert(base_version);
  977. base_version->Ref();
  978. VersionBuilderUPtr new_builder(new BaseReferencedVersionBuilder(
  979. default_cfd, base_version, this, track_found_and_missing_files_,
  980. allow_incomplete_valid_version_));
  981. builder_iter->second = std::move(new_builder);
  982. initialized_ = true;
  983. }
  984. return s;
  985. }
  986. Status ManifestTailer::ApplyVersionEdit(VersionEdit& edit,
  987. ColumnFamilyData** cfd) {
  988. Status s = VersionEditHandler::ApplyVersionEdit(edit, cfd);
  989. if (s.ok()) {
  990. assert(cfd);
  991. if (*cfd) {
  992. cfds_changed_.insert(*cfd);
  993. }
  994. }
  995. return s;
  996. }
  997. Status ManifestTailer::OnColumnFamilyAdd(VersionEdit& edit,
  998. ColumnFamilyData** cfd) {
  999. if (Mode::kRecovery == mode_) {
  1000. return VersionEditHandler::OnColumnFamilyAdd(edit, cfd);
  1001. }
  1002. assert(Mode::kCatchUp == mode_);
  1003. ColumnFamilySet* cfd_set = version_set_->GetColumnFamilySet();
  1004. assert(cfd_set);
  1005. ColumnFamilyData* tmp_cfd = cfd_set->GetColumnFamily(edit.GetColumnFamily());
  1006. assert(cfd);
  1007. *cfd = tmp_cfd;
  1008. if (!tmp_cfd) {
  1009. // For now, ignore new column families created after Recover() succeeds.
  1010. return Status::OK();
  1011. }
  1012. auto builder_iter = builders_.find(edit.GetColumnFamily());
  1013. assert(builder_iter != builders_.end());
  1014. Version* dummy_version = tmp_cfd->dummy_versions();
  1015. assert(dummy_version);
  1016. Version* base_version = dummy_version->Next();
  1017. assert(base_version);
  1018. base_version->Ref();
  1019. VersionBuilderUPtr new_builder(new BaseReferencedVersionBuilder(
  1020. tmp_cfd, base_version, this, track_found_and_missing_files_));
  1021. builder_iter->second = std::move(new_builder);
  1022. #ifndef NDEBUG
  1023. auto version_iter = versions_.find(edit.GetColumnFamily());
  1024. assert(version_iter == versions_.end());
  1025. #endif // !NDEBUG
  1026. return Status::OK();
  1027. }
  1028. void ManifestTailer::CheckIterationResult(const log::Reader& reader,
  1029. Status* s) {
  1030. VersionEditHandlerPointInTime::CheckIterationResult(reader, s);
  1031. assert(s);
  1032. if (s->ok()) {
  1033. if (Mode::kRecovery == mode_) {
  1034. mode_ = Mode::kCatchUp;
  1035. } else {
  1036. assert(Mode::kCatchUp == mode_);
  1037. }
  1038. }
  1039. }
  1040. std::vector<std::string> ManifestTailer::GetAndClearIntermediateFiles() {
  1041. std::vector<std::string> res;
  1042. for (const auto& builder : builders_) {
  1043. auto files =
  1044. builder.second->version_builder()->GetAndClearIntermediateFiles();
  1045. res.insert(res.end(), std::make_move_iterator(files.begin()),
  1046. std::make_move_iterator(files.end()));
  1047. files.erase(files.begin(), files.end());
  1048. }
  1049. return res;
  1050. }
  1051. Status ManifestTailer::VerifyFile(ColumnFamilyData* cfd,
  1052. const std::string& fpath, int level,
  1053. const FileMetaData& fmeta) {
  1054. Status s =
  1055. VersionEditHandlerPointInTime::VerifyFile(cfd, fpath, level, fmeta);
  1056. // TODO: Open file or create hard link to prevent the file from being
  1057. // deleted.
  1058. return s;
  1059. }
  1060. void DumpManifestHandler::CheckIterationResult(const log::Reader& reader,
  1061. Status* s) {
  1062. VersionEditHandler::CheckIterationResult(reader, s);
  1063. if (!s->ok()) {
  1064. fprintf(stdout, "%s\n", s->ToString().c_str());
  1065. return;
  1066. }
  1067. assert(cf_to_cmp_names_);
  1068. for (auto* cfd : *(version_set_->column_family_set_)) {
  1069. fprintf(stdout,
  1070. "--------------- Column family \"%s\" (ID %" PRIu32
  1071. ") --------------\n",
  1072. cfd->GetName().c_str(), cfd->GetID());
  1073. fprintf(stdout, "log number: %" PRIu64 "\n", cfd->GetLogNumber());
  1074. auto it = cf_to_cmp_names_->find(cfd->GetID());
  1075. if (it != cf_to_cmp_names_->end()) {
  1076. fprintf(stdout,
  1077. "comparator: <%s>, but the comparator object is not available.\n",
  1078. it->second.c_str());
  1079. } else {
  1080. fprintf(stdout, "comparator: %s\n", cfd->user_comparator()->Name());
  1081. }
  1082. assert(cfd->current());
  1083. // Print out DebugStrings. Can include non-terminating null characters.
  1084. fwrite(cfd->current()->DebugString(hex_).data(), sizeof(char),
  1085. cfd->current()->DebugString(hex_).size(), stdout);
  1086. fprintf(stdout,
  1087. "By default, manifest file dump prints LSM trees as if %d levels "
  1088. "were configured, "
  1089. "which is not necessarily true for the column family (CF) this "
  1090. "manifest is associated with. "
  1091. "Please consult other DB files, such as the OPTIONS file, to "
  1092. "confirm.\n",
  1093. cfd->ioptions().num_levels);
  1094. }
  1095. fprintf(stdout,
  1096. "next_file_number %" PRIu64 " last_sequence %" PRIu64
  1097. " prev_log_number %" PRIu64 " max_column_family %" PRIu32
  1098. " min_log_number_to_keep %" PRIu64 "\n",
  1099. version_set_->current_next_file_number(),
  1100. version_set_->LastSequence(), version_set_->prev_log_number(),
  1101. version_set_->column_family_set_->GetMaxColumnFamily(),
  1102. version_set_->min_log_number_to_keep());
  1103. }
  1104. } // namespace ROCKSDB_NAMESPACE