repair.cc 32 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  9. //
  10. // Repairer does best effort recovery to recover as much data as possible after
  11. // a disaster without compromising consistency. It does not guarantee bringing
  12. // the database to a time consistent state.
  13. //
  14. // Repair process is broken into 4 phases:
  15. // (a) Find files
  16. // (b) Convert logs to tables
  17. // (c) Extract metadata
  18. // (d) Write Descriptor
  19. //
  20. // (a) Find files
  21. //
  22. // The repairer goes through all the files in the directory, and classifies them
  23. // based on their file name. Any file that cannot be identified by name will be
  24. // ignored.
  25. //
  26. // (b) Convert logs to table
  27. //
  28. // Every log file that is active is replayed. All sections of the file where the
  29. // checksum does not match are skipped over. We intentionally give preference to
  30. // data consistency.
  31. //
  32. // (c) Extract metadata
  33. //
  34. // We scan every table to compute
  35. // (1) smallest/largest for the table
  36. // (2) largest sequence number in the table
  37. // (3) oldest blob file referred to by the table (if applicable)
  38. //
  39. // If we are unable to scan the file, then we ignore the table.
  40. //
  41. // (d) Write Descriptor
  42. //
  43. // We generate descriptor contents:
  44. // - log number is set to zero
  45. // - next-file-number is set to 1 + largest file number we found
  46. // - last-sequence-number is set to largest sequence# found across
  47. // all tables (see step (c), item 2)
  48. // - compaction pointers are cleared
  49. // - every table file is added at level 0
  50. //
  51. // Possible optimization 1:
  52. // (a) Compute total size and use to pick appropriate max-level M
  53. // (b) Sort tables by largest sequence# in the table
  54. // (c) For each table: if it overlaps earlier table, place in level-0,
  55. // else place in level-M.
  56. // (d) We can provide options for time consistent recovery and unsafe recovery
  57. // (ignore checksum failure when applicable)
  58. // Possible optimization 2:
  59. // Store per-table metadata (smallest, largest, largest-seq#, ...)
  60. // in the table's meta section to speed up ScanTable.
  61. #include <cinttypes>
  62. #include "db/builder.h"
  63. #include "db/db_impl/db_impl.h"
  64. #include "db/dbformat.h"
  65. #include "db/log_reader.h"
  66. #include "db/log_writer.h"
  67. #include "db/memtable.h"
  68. #include "db/table_cache.h"
  69. #include "db/version_builder.h"
  70. #include "db/version_edit.h"
  71. #include "db/write_batch_internal.h"
  72. #include "file/filename.h"
  73. #include "file/writable_file_writer.h"
  74. #include "logging/logging.h"
  75. #include "options/cf_options.h"
  76. #include "rocksdb/comparator.h"
  77. #include "rocksdb/db.h"
  78. #include "rocksdb/env.h"
  79. #include "rocksdb/options.h"
  80. #include "rocksdb/write_buffer_manager.h"
  81. #include "table/unique_id_impl.h"
  82. #include "util/string_util.h"
  83. namespace ROCKSDB_NAMESPACE {
  84. namespace {
  85. class Repairer {
  86. public:
  87. Repairer(const std::string& dbname, const DBOptions& db_options,
  88. const std::vector<ColumnFamilyDescriptor>& column_families,
  89. const ColumnFamilyOptions& default_cf_opts,
  90. const ColumnFamilyOptions& unknown_cf_opts, bool create_unknown_cfs)
  91. : dbname_(dbname),
  92. db_session_id_(DBImpl::GenerateDbSessionId(db_options.env)),
  93. env_(db_options.env),
  94. file_options_(),
  95. db_options_(SanitizeOptions(dbname_, db_options)),
  96. immutable_db_options_(ImmutableDBOptions(db_options_)),
  97. icmp_(default_cf_opts.comparator),
  98. default_cf_opts_(SanitizeCfOptions(immutable_db_options_,
  99. /*read_only*/ false,
  100. default_cf_opts)),
  101. default_iopts_(
  102. ImmutableOptions(immutable_db_options_, default_cf_opts_)),
  103. default_mopts_(MutableCFOptions(default_cf_opts_)),
  104. unknown_cf_opts_(SanitizeCfOptions(immutable_db_options_,
  105. /*read_only*/ false,
  106. unknown_cf_opts)),
  107. create_unknown_cfs_(create_unknown_cfs),
  108. raw_table_cache_(
  109. // TableCache can be small since we expect each table to be opened
  110. // once.
  111. NewLRUCache(10, db_options_.table_cache_numshardbits)),
  112. table_cache_(new TableCache(default_iopts_, &file_options_,
  113. raw_table_cache_.get(),
  114. /*block_cache_tracer=*/nullptr,
  115. /*io_tracer=*/nullptr, db_session_id_)),
  116. wb_(db_options_.db_write_buffer_size),
  117. wc_(db_options_.delayed_write_rate),
  118. vset_(dbname_, &immutable_db_options_, file_options_,
  119. raw_table_cache_.get(), &wb_, &wc_,
  120. /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
  121. /*db_id=*/"", db_session_id_, db_options.daily_offpeak_time_utc,
  122. /*error_handler=*/nullptr, /*read_only=*/false),
  123. next_file_number_(1),
  124. db_lock_(nullptr),
  125. closed_(false) {
  126. for (const auto& cfd : column_families) {
  127. cf_name_to_opts_[cfd.name] = cfd.options;
  128. }
  129. }
  130. const ColumnFamilyOptions* GetColumnFamilyOptions(
  131. const std::string& cf_name) {
  132. if (cf_name_to_opts_.find(cf_name) == cf_name_to_opts_.end()) {
  133. if (create_unknown_cfs_) {
  134. return &unknown_cf_opts_;
  135. }
  136. return nullptr;
  137. }
  138. return &cf_name_to_opts_[cf_name];
  139. }
  140. // Adds a column family to the VersionSet with cf_options_ and updates
  141. // manifest.
  142. Status AddColumnFamily(const std::string& cf_name, uint32_t cf_id) {
  143. // TODO: plumb Env::IOActivity, Env::IOPriority;
  144. const ReadOptions read_options;
  145. const WriteOptions write_options;
  146. const auto* cf_opts = GetColumnFamilyOptions(cf_name);
  147. if (cf_opts == nullptr) {
  148. return Status::Corruption("Encountered unknown column family with name=" +
  149. cf_name + ", id=" + std::to_string(cf_id));
  150. }
  151. Options opts(db_options_, *cf_opts);
  152. VersionEdit edit;
  153. edit.SetComparatorName(opts.comparator->Name());
  154. edit.SetPersistUserDefinedTimestamps(opts.persist_user_defined_timestamps);
  155. edit.SetLogNumber(0);
  156. edit.SetColumnFamily(cf_id);
  157. ColumnFamilyData* cfd;
  158. cfd = nullptr;
  159. edit.AddColumnFamily(cf_name);
  160. mutex_.Lock();
  161. std::unique_ptr<FSDirectory> db_dir;
  162. Status status = env_->GetFileSystem()->NewDirectory(dbname_, IOOptions(),
  163. &db_dir, nullptr);
  164. if (status.ok()) {
  165. status = vset_.LogAndApply(cfd, read_options, write_options, &edit,
  166. &mutex_, db_dir.get(),
  167. false /* new_descriptor_log */, cf_opts);
  168. }
  169. mutex_.Unlock();
  170. return status;
  171. }
  172. Status Close() {
  173. Status s = Status::OK();
  174. if (!closed_) {
  175. if (db_lock_ != nullptr) {
  176. s = env_->UnlockFile(db_lock_);
  177. db_lock_ = nullptr;
  178. }
  179. closed_ = true;
  180. }
  181. return s;
  182. }
  183. ~Repairer() { Close().PermitUncheckedError(); }
  184. Status Run() {
  185. Status status = env_->LockFile(LockFileName(dbname_), &db_lock_);
  186. if (!status.ok()) {
  187. return status;
  188. }
  189. status = FindFiles();
  190. DBImpl* db_impl = nullptr;
  191. if (status.ok()) {
  192. // Discard older manifests and start a fresh one
  193. for (size_t i = 0; i < manifests_.size(); i++) {
  194. ArchiveFile(dbname_ + "/" + manifests_[i]);
  195. }
  196. // Just create a DBImpl temporarily so we can reuse NewDB()
  197. db_impl = new DBImpl(db_options_, dbname_);
  198. status = db_impl->NewDB(/*new_filenames=*/nullptr);
  199. }
  200. delete db_impl;
  201. if (status.ok()) {
  202. // Recover using the fresh manifest created by NewDB()
  203. status =
  204. vset_.Recover({{kDefaultColumnFamilyName, default_cf_opts_}}, false);
  205. }
  206. if (status.ok()) {
  207. // Need to scan existing SST files first so the column families are
  208. // created before we process WAL files
  209. ExtractMetaData();
  210. // ExtractMetaData() uses table_fds_ to know which SST files' metadata to
  211. // extract -- we need to clear it here since metadata for existing SST
  212. // files has been extracted already
  213. table_fds_.clear();
  214. ConvertLogFilesToTables();
  215. ExtractMetaData();
  216. status = AddTables();
  217. }
  218. if (status.ok()) {
  219. uint64_t bytes = 0;
  220. for (size_t i = 0; i < tables_.size(); i++) {
  221. bytes += tables_[i].meta.fd.GetFileSize();
  222. }
  223. ROCKS_LOG_WARN(db_options_.info_log,
  224. "**** Repaired rocksdb %s; "
  225. "recovered %" ROCKSDB_PRIszt " files; %" PRIu64
  226. " bytes. "
  227. "Some data may have been lost. "
  228. "****",
  229. dbname_.c_str(), tables_.size(), bytes);
  230. }
  231. return status;
  232. }
  233. private:
  234. struct TableInfo {
  235. FileMetaData meta;
  236. uint32_t column_family_id;
  237. std::string column_family_name;
  238. };
  239. std::string const dbname_;
  240. std::string db_session_id_;
  241. Env* const env_;
  242. const FileOptions file_options_;
  243. const DBOptions db_options_;
  244. const ImmutableDBOptions immutable_db_options_;
  245. const InternalKeyComparator icmp_;
  246. const ColumnFamilyOptions default_cf_opts_;
  247. const ImmutableOptions default_iopts_; // table_cache_ holds reference
  248. const MutableCFOptions default_mopts_;
  249. const ColumnFamilyOptions unknown_cf_opts_;
  250. const bool create_unknown_cfs_;
  251. std::shared_ptr<Cache> raw_table_cache_;
  252. std::unique_ptr<TableCache> table_cache_;
  253. WriteBufferManager wb_;
  254. WriteController wc_;
  255. VersionSet vset_;
  256. std::unordered_map<std::string, ColumnFamilyOptions> cf_name_to_opts_;
  257. InstrumentedMutex mutex_;
  258. std::vector<std::string> manifests_;
  259. std::vector<FileDescriptor> table_fds_;
  260. std::vector<uint64_t> logs_;
  261. std::vector<TableInfo> tables_;
  262. uint64_t next_file_number_;
  263. // Lock over the persistent DB state. Non-nullptr iff successfully
  264. // acquired.
  265. FileLock* db_lock_;
  266. bool closed_;
  267. Status FindFiles() {
  268. std::vector<std::string> filenames;
  269. bool found_file = false;
  270. std::vector<std::string> to_search_paths;
  271. for (size_t path_id = 0; path_id < db_options_.db_paths.size(); path_id++) {
  272. to_search_paths.push_back(db_options_.db_paths[path_id].path);
  273. }
  274. // search wal_dir if user uses a customize wal_dir
  275. bool same = immutable_db_options_.IsWalDirSameAsDBPath(dbname_);
  276. if (!same) {
  277. to_search_paths.push_back(immutable_db_options_.wal_dir);
  278. }
  279. for (size_t path_id = 0; path_id < to_search_paths.size(); path_id++) {
  280. ROCKS_LOG_INFO(db_options_.info_log, "Searching path %s\n",
  281. to_search_paths[path_id].c_str());
  282. Status status = env_->GetChildren(to_search_paths[path_id], &filenames);
  283. if (!status.ok()) {
  284. return status;
  285. }
  286. if (!filenames.empty()) {
  287. found_file = true;
  288. }
  289. uint64_t number;
  290. FileType type;
  291. for (size_t i = 0; i < filenames.size(); i++) {
  292. if (ParseFileName(filenames[i], &number, &type)) {
  293. if (type == kDescriptorFile) {
  294. manifests_.push_back(filenames[i]);
  295. } else {
  296. if (number + 1 > next_file_number_) {
  297. next_file_number_ = number + 1;
  298. }
  299. if (type == kWalFile) {
  300. logs_.push_back(number);
  301. } else if (type == kTableFile) {
  302. table_fds_.emplace_back(number, static_cast<uint32_t>(path_id),
  303. 0);
  304. } else {
  305. // Ignore other files
  306. }
  307. }
  308. }
  309. }
  310. }
  311. if (!found_file) {
  312. return Status::Corruption(dbname_, "repair found no files");
  313. }
  314. return Status::OK();
  315. }
  316. void ConvertLogFilesToTables() {
  317. const auto& wal_dir = immutable_db_options_.GetWalDir();
  318. for (size_t i = 0; i < logs_.size(); i++) {
  319. // we should use LogFileName(wal_dir, logs_[i]) here. user might uses
  320. // wal_dir option.
  321. std::string logname = LogFileName(wal_dir, logs_[i]);
  322. Status status = ConvertLogToTable(wal_dir, logs_[i]);
  323. if (!status.ok()) {
  324. ROCKS_LOG_WARN(db_options_.info_log,
  325. "Log #%" PRIu64 ": ignoring conversion error: %s",
  326. logs_[i], status.ToString().c_str());
  327. }
  328. ArchiveFile(logname);
  329. }
  330. }
  331. Status ConvertLogToTable(const std::string& wal_dir, uint64_t log) {
  332. struct LogReporter : public log::Reader::Reporter {
  333. Env* env;
  334. std::shared_ptr<Logger> info_log;
  335. uint64_t lognum;
  336. void Corruption(size_t bytes, const Status& s,
  337. uint64_t log_number = kMaxSequenceNumber) override {
  338. // We print error messages for corruption, but continue repairing.
  339. ROCKS_LOG_ERROR(info_log, "Log #%" PRIu64 ": dropping %d bytes; %s",
  340. log_number == kMaxSequenceNumber ? lognum : log_number,
  341. static_cast<int>(bytes), s.ToString().c_str());
  342. }
  343. };
  344. // Open the log file
  345. std::string logname = LogFileName(wal_dir, log);
  346. const auto& fs = env_->GetFileSystem();
  347. std::unique_ptr<SequentialFileReader> lfile_reader;
  348. Status status = SequentialFileReader::Create(
  349. fs, logname, fs->OptimizeForLogRead(file_options_), &lfile_reader,
  350. nullptr /* dbg */, nullptr /* rate limiter */);
  351. if (!status.ok()) {
  352. return status;
  353. }
  354. // Create the log reader.
  355. LogReporter reporter;
  356. reporter.env = env_;
  357. reporter.info_log = db_options_.info_log;
  358. reporter.lognum = log;
  359. // We intentionally make log::Reader do checksumming so that
  360. // corruptions cause entire commits to be skipped instead of
  361. // propagating bad information (like overly large sequence
  362. // numbers).
  363. log::Reader reader(db_options_.info_log, std::move(lfile_reader), &reporter,
  364. true /*enable checksum*/, log);
  365. // Initialize per-column family memtables
  366. for (auto* cfd : *vset_.GetColumnFamilySet()) {
  367. cfd->CreateNewMemtable(kMaxSequenceNumber);
  368. }
  369. auto cf_mems = new ColumnFamilyMemTablesImpl(vset_.GetColumnFamilySet());
  370. // Read all the records and add to a memtable
  371. const UnorderedMap<uint32_t, size_t>& running_ts_sz =
  372. vset_.GetRunningColumnFamiliesTimestampSize();
  373. std::string scratch;
  374. Slice record;
  375. WriteBatch batch;
  376. int counter = 0;
  377. while (reader.ReadRecord(&record, &scratch)) {
  378. if (record.size() < WriteBatchInternal::kHeader) {
  379. reporter.Corruption(record.size(),
  380. Status::Corruption("log record too small"));
  381. continue;
  382. }
  383. Status record_status = WriteBatchInternal::SetContents(&batch, record);
  384. if (record_status.ok()) {
  385. const UnorderedMap<uint32_t, size_t>& record_ts_sz =
  386. reader.GetRecordedTimestampSize();
  387. // Use same value for `seq_per_batch` and `batch_per_txn` as
  388. // WriteBatchInternal::InsertInto does below.
  389. record_status = HandleWriteBatchTimestampSizeDifference(
  390. &batch, running_ts_sz, record_ts_sz,
  391. TimestampSizeConsistencyMode::kVerifyConsistency,
  392. /* seq_per_batch */ false, /* batch_per_txn */ true);
  393. if (record_status.ok()) {
  394. record_status =
  395. WriteBatchInternal::InsertInto(&batch, cf_mems, nullptr, nullptr);
  396. }
  397. }
  398. if (record_status.ok()) {
  399. counter += WriteBatchInternal::Count(&batch);
  400. } else {
  401. ROCKS_LOG_WARN(db_options_.info_log, "Log #%" PRIu64 ": ignoring %s",
  402. log, record_status.ToString().c_str());
  403. }
  404. }
  405. // Dump a table for each column family with entries in this log file.
  406. for (auto* cfd : *vset_.GetColumnFamilySet()) {
  407. // Do not record a version edit for this conversion to a Table
  408. // since ExtractMetaData() will also generate edits.
  409. MemTable* mem = cfd->mem();
  410. if (mem->IsEmpty()) {
  411. continue;
  412. }
  413. FileMetaData meta;
  414. meta.fd = FileDescriptor(next_file_number_++, 0, 0);
  415. // TODO: plumb Env::IOActivity, Env::IOPriority
  416. ReadOptions ro;
  417. ro.total_order_seek = true;
  418. Arena arena;
  419. ScopedArenaPtr<InternalIterator> iter(
  420. mem->NewIterator(ro, /*seqno_to_time_mapping=*/nullptr, &arena,
  421. /*prefix_extractor=*/nullptr, /*for_flush=*/true));
  422. int64_t _current_time = 0;
  423. immutable_db_options_.clock->GetCurrentTime(&_current_time)
  424. .PermitUncheckedError(); // ignore error
  425. const uint64_t current_time = static_cast<uint64_t>(_current_time);
  426. meta.file_creation_time = current_time;
  427. SnapshotChecker* snapshot_checker = DisableGCSnapshotChecker::Instance();
  428. auto write_hint = cfd->current()->storage_info()->CalculateSSTWriteHint(
  429. /*level=*/0, db_options_.calculate_sst_write_lifetime_hint_set);
  430. std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
  431. range_del_iters;
  432. auto range_del_iter = mem->NewRangeTombstoneIterator(
  433. ro, kMaxSequenceNumber, false /* immutable_memtable */);
  434. if (range_del_iter != nullptr) {
  435. range_del_iters.emplace_back(range_del_iter);
  436. }
  437. IOStatus io_s;
  438. CompressionOptions default_compression;
  439. // TODO: plumb Env::IOActivity, Env::IOPriority
  440. const ReadOptions read_options;
  441. const WriteOptions write_option(Env::IO_HIGH);
  442. TableBuilderOptions tboptions(
  443. cfd->ioptions(), cfd->GetLatestMutableCFOptions(), read_options,
  444. write_option, cfd->internal_comparator(),
  445. cfd->internal_tbl_prop_coll_factories(), kNoCompression,
  446. default_compression, cfd->GetID(), cfd->GetName(), -1 /* level */,
  447. current_time /* newest_key_time */, false /* is_bottommost */,
  448. TableFileCreationReason::kRecovery, 0 /* oldest_key_time */,
  449. 0 /* file_creation_time */, "DB Repairer" /* db_id */, db_session_id_,
  450. 0 /*target_file_size*/, meta.fd.GetNumber());
  451. SeqnoToTimeMapping empty_seqno_to_time_mapping;
  452. status = BuildTable(
  453. dbname_, /* versions */ nullptr, immutable_db_options_, tboptions,
  454. file_options_, table_cache_.get(), iter.get(),
  455. std::move(range_del_iters), &meta, nullptr /* blob_file_additions */,
  456. {}, kMaxSequenceNumber, kMaxSequenceNumber, kMaxSequenceNumber,
  457. snapshot_checker, false /* paranoid_file_checks*/,
  458. nullptr /* internal_stats */, &io_s, nullptr /*IOTracer*/,
  459. BlobFileCreationReason::kRecovery,
  460. nullptr /* seqno_to_time_mapping */, nullptr /* event_logger */,
  461. 0 /* job_id */, nullptr /* table_properties */, write_hint);
  462. ROCKS_LOG_INFO(db_options_.info_log,
  463. "Log #%" PRIu64 ": %d ops saved to Table #%" PRIu64 " %s",
  464. log, counter, meta.fd.GetNumber(),
  465. status.ToString().c_str());
  466. if (status.ok()) {
  467. if (meta.fd.GetFileSize() > 0) {
  468. table_fds_.push_back(meta.fd);
  469. }
  470. } else {
  471. break;
  472. }
  473. }
  474. delete cf_mems;
  475. return status;
  476. }
  477. void ExtractMetaData() {
  478. for (size_t i = 0; i < table_fds_.size(); i++) {
  479. TableInfo t;
  480. t.meta.fd = table_fds_[i];
  481. Status status = ScanTable(&t);
  482. if (!status.ok()) {
  483. std::string fname = TableFileName(
  484. db_options_.db_paths, t.meta.fd.GetNumber(), t.meta.fd.GetPathId());
  485. char file_num_buf[kFormatFileNumberBufSize];
  486. FormatFileNumber(t.meta.fd.GetNumber(), t.meta.fd.GetPathId(),
  487. file_num_buf, sizeof(file_num_buf));
  488. ROCKS_LOG_WARN(db_options_.info_log, "Table #%s: ignoring %s",
  489. file_num_buf, status.ToString().c_str());
  490. ArchiveFile(fname);
  491. } else {
  492. tables_.push_back(t);
  493. }
  494. }
  495. }
  496. Status ScanTable(TableInfo* t) {
  497. std::string fname = TableFileName(
  498. db_options_.db_paths, t->meta.fd.GetNumber(), t->meta.fd.GetPathId());
  499. int counter = 0;
  500. uint64_t file_size;
  501. Status status = env_->GetFileSize(fname, &file_size);
  502. t->meta.fd = FileDescriptor(t->meta.fd.GetNumber(), t->meta.fd.GetPathId(),
  503. file_size);
  504. std::shared_ptr<const TableProperties> props;
  505. if (status.ok()) {
  506. // TODO: plumb Env::IOActivity, Env::IOPriority
  507. const ReadOptions read_options;
  508. status = table_cache_->GetTableProperties(
  509. file_options_, read_options, icmp_, t->meta, &props, default_mopts_);
  510. }
  511. if (status.ok()) {
  512. auto s =
  513. GetSstInternalUniqueId(props->db_id, props->db_session_id,
  514. props->orig_file_number, &t->meta.unique_id);
  515. if (!s.ok()) {
  516. ROCKS_LOG_WARN(db_options_.info_log,
  517. "Table #%" PRIu64
  518. ": unable to get unique id, default to Unknown.",
  519. t->meta.fd.GetNumber());
  520. }
  521. t->column_family_id = static_cast<uint32_t>(props->column_family_id);
  522. if (t->column_family_id ==
  523. TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) {
  524. ROCKS_LOG_WARN(
  525. db_options_.info_log,
  526. "Table #%" PRIu64
  527. ": column family unknown (probably due to legacy format); "
  528. "adding to default column family id 0.",
  529. t->meta.fd.GetNumber());
  530. t->column_family_id = 0;
  531. }
  532. if (vset_.GetColumnFamilySet()->GetColumnFamily(t->column_family_id) ==
  533. nullptr) {
  534. status =
  535. AddColumnFamily(props->column_family_name, t->column_family_id);
  536. }
  537. t->meta.oldest_ancester_time = props->creation_time;
  538. t->meta.user_defined_timestamps_persisted =
  539. static_cast<bool>(props->user_defined_timestamps_persisted);
  540. }
  541. if (status.ok()) {
  542. uint64_t tail_size = FileMetaData::CalculateTailSize(file_size, *props);
  543. t->meta.tail_size = tail_size;
  544. }
  545. ColumnFamilyData* cfd = nullptr;
  546. if (status.ok()) {
  547. cfd = vset_.GetColumnFamilySet()->GetColumnFamily(t->column_family_id);
  548. if (cfd->GetName() != props->column_family_name) {
  549. ROCKS_LOG_ERROR(
  550. db_options_.info_log,
  551. "Table #%" PRIu64
  552. ": inconsistent column family name '%s'; expected '%s' for column "
  553. "family id %" PRIu32 ".",
  554. t->meta.fd.GetNumber(), props->column_family_name.c_str(),
  555. cfd->GetName().c_str(), t->column_family_id);
  556. status = Status::Corruption(dbname_, "inconsistent column family name");
  557. }
  558. }
  559. if (status.ok()) {
  560. // TODO: plumb Env::IOActivity, Env::IOPriority
  561. ReadOptions ropts;
  562. ropts.total_order_seek = true;
  563. InternalIterator* iter = table_cache_->NewIterator(
  564. ropts, file_options_, cfd->internal_comparator(), t->meta,
  565. nullptr /* range_del_agg */, cfd->GetLatestMutableCFOptions(),
  566. /*table_reader_ptr=*/nullptr, /*file_read_hist=*/nullptr,
  567. TableReaderCaller::kRepair, /*arena=*/nullptr, /*skip_filters=*/false,
  568. /*level=*/-1, /*max_file_size_for_l0_meta_pin=*/0,
  569. /*smallest_compaction_key=*/nullptr,
  570. /*largest_compaction_key=*/nullptr,
  571. /*allow_unprepared_value=*/false);
  572. ParsedInternalKey parsed;
  573. for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
  574. Slice key = iter->key();
  575. Status pik_status =
  576. ParseInternalKey(key, &parsed, db_options_.allow_data_in_errors);
  577. if (!pik_status.ok()) {
  578. ROCKS_LOG_ERROR(db_options_.info_log,
  579. "Table #%" PRIu64 ": unparsable key - %s",
  580. t->meta.fd.GetNumber(), pik_status.getState());
  581. continue;
  582. }
  583. counter++;
  584. status = t->meta.UpdateBoundaries(key, iter->value(), parsed.sequence,
  585. parsed.type);
  586. if (!status.ok()) {
  587. break;
  588. }
  589. }
  590. if (status.ok() && !iter->status().ok()) {
  591. status = iter->status();
  592. }
  593. delete iter;
  594. ROCKS_LOG_INFO(db_options_.info_log, "Table #%" PRIu64 ": %d entries %s",
  595. t->meta.fd.GetNumber(), counter,
  596. status.ToString().c_str());
  597. }
  598. if (status.ok()) {
  599. // XXX/FIXME: This is just basic, naive handling of range tombstones,
  600. // like call to UpdateBoundariesForRange in builder.cc where we assume
  601. // an SST file is a full sorted run. This probably needs the extra logic
  602. // from compaction_job.cc around call to UpdateBoundariesForRange (to
  603. // handle range tombstones extendingg beyond range of other entries).
  604. // TODO: plumb Env::IOActivity, Env::IOPriority
  605. ReadOptions ropts;
  606. std::unique_ptr<FragmentedRangeTombstoneIterator> r_iter;
  607. status = table_cache_->GetRangeTombstoneIterator(
  608. ropts, cfd->internal_comparator(), t->meta,
  609. cfd->GetLatestMutableCFOptions(), &r_iter);
  610. if (r_iter) {
  611. r_iter->SeekToFirst();
  612. while (r_iter->Valid()) {
  613. auto tombstone = r_iter->Tombstone();
  614. auto kv = tombstone.Serialize();
  615. t->meta.UpdateBoundariesForRange(
  616. kv.first, tombstone.SerializeEndKey(), tombstone.seq_,
  617. cfd->internal_comparator());
  618. r_iter->Next();
  619. }
  620. }
  621. }
  622. return status;
  623. }
  624. Status AddTables() {
  625. // TODO: plumb Env::IOActivity, Env::IOPriority;
  626. const ReadOptions read_options;
  627. const WriteOptions write_options;
  628. std::unordered_map<uint32_t, std::vector<const TableInfo*>> cf_id_to_tables;
  629. SequenceNumber max_sequence = 0;
  630. for (size_t i = 0; i < tables_.size(); i++) {
  631. cf_id_to_tables[tables_[i].column_family_id].push_back(&tables_[i]);
  632. if (max_sequence < tables_[i].meta.fd.largest_seqno) {
  633. max_sequence = tables_[i].meta.fd.largest_seqno;
  634. }
  635. }
  636. vset_.SetLastAllocatedSequence(max_sequence);
  637. vset_.SetLastPublishedSequence(max_sequence);
  638. vset_.SetLastSequence(max_sequence);
  639. for (const auto& cf_id_and_tables : cf_id_to_tables) {
  640. auto* cfd =
  641. vset_.GetColumnFamilySet()->GetColumnFamily(cf_id_and_tables.first);
  642. // Recover files' epoch number using dummy VersionStorageInfo
  643. VersionBuilder dummy_version_builder(
  644. cfd->current()->version_set()->file_options(), &cfd->ioptions(),
  645. cfd->table_cache(), cfd->current()->storage_info(),
  646. cfd->current()->version_set(),
  647. cfd->GetFileMetadataCacheReservationManager());
  648. VersionStorageInfo dummy_vstorage(
  649. &cfd->internal_comparator(), cfd->user_comparator(),
  650. cfd->NumberLevels(), cfd->ioptions().compaction_style,
  651. nullptr /* src_vstorage */, cfd->ioptions().force_consistency_checks,
  652. EpochNumberRequirement::kMightMissing, cfd->ioptions().clock,
  653. /*bottommost_file_compaction_delay=*/0,
  654. cfd->current()->version_set()->offpeak_time_option());
  655. Status s;
  656. VersionEdit dummy_edit;
  657. for (const auto* table : cf_id_and_tables.second) {
  658. // TODO(opt): separate out into multiple levels
  659. dummy_edit.AddFile(
  660. 0, table->meta.fd.GetNumber(), table->meta.fd.GetPathId(),
  661. table->meta.fd.GetFileSize(), table->meta.smallest,
  662. table->meta.largest, table->meta.fd.smallest_seqno,
  663. table->meta.fd.largest_seqno, table->meta.marked_for_compaction,
  664. table->meta.temperature, table->meta.oldest_blob_file_number,
  665. table->meta.oldest_ancester_time, table->meta.file_creation_time,
  666. table->meta.epoch_number, table->meta.file_checksum,
  667. table->meta.file_checksum_func_name, table->meta.unique_id,
  668. table->meta.compensated_range_deletion_size, table->meta.tail_size,
  669. table->meta.user_defined_timestamps_persisted);
  670. }
  671. s = dummy_version_builder.Apply(&dummy_edit);
  672. if (s.ok()) {
  673. s = dummy_version_builder.SaveTo(&dummy_vstorage);
  674. }
  675. if (s.ok()) {
  676. dummy_vstorage.RecoverEpochNumbers(cfd);
  677. }
  678. if (s.ok()) {
  679. // Record changes from this repair in VersionEdit, including files with
  680. // recovered epoch numbers
  681. VersionEdit edit;
  682. edit.SetComparatorName(cfd->user_comparator()->Name());
  683. edit.SetPersistUserDefinedTimestamps(
  684. cfd->ioptions().persist_user_defined_timestamps);
  685. edit.SetLogNumber(0);
  686. edit.SetNextFile(next_file_number_);
  687. edit.SetColumnFamily(cfd->GetID());
  688. for (int level = 0; level < dummy_vstorage.num_levels(); ++level) {
  689. for (FileMetaData* file_meta : dummy_vstorage.LevelFiles(level)) {
  690. edit.AddFile(level, *file_meta);
  691. }
  692. }
  693. // Release resources occupied by the dummy VersionStorageInfo
  694. for (int level = 0; level < dummy_vstorage.num_levels(); ++level) {
  695. for (FileMetaData* file_meta : dummy_vstorage.LevelFiles(level)) {
  696. file_meta->refs--;
  697. if (file_meta->refs <= 0) {
  698. delete file_meta;
  699. }
  700. }
  701. }
  702. // Persist record of changes
  703. assert(next_file_number_ > 0);
  704. vset_.MarkFileNumberUsed(next_file_number_ - 1);
  705. mutex_.Lock();
  706. std::unique_ptr<FSDirectory> db_dir;
  707. s = env_->GetFileSystem()->NewDirectory(dbname_, IOOptions(), &db_dir,
  708. nullptr);
  709. if (s.ok()) {
  710. s = vset_.LogAndApply(cfd, read_options, write_options, &edit,
  711. &mutex_, db_dir.get(),
  712. false /* new_descriptor_log */);
  713. }
  714. mutex_.Unlock();
  715. }
  716. if (!s.ok()) {
  717. return s;
  718. }
  719. }
  720. return Status::OK();
  721. }
  722. void ArchiveFile(const std::string& fname) {
  723. // Move into another directory. E.g., for
  724. // dir/foo
  725. // rename to
  726. // dir/lost/foo
  727. const char* slash = strrchr(fname.c_str(), '/');
  728. std::string new_dir;
  729. if (slash != nullptr) {
  730. new_dir.assign(fname.data(), slash - fname.data());
  731. }
  732. new_dir.append("/lost");
  733. env_->CreateDir(new_dir).PermitUncheckedError(); // Ignore error
  734. std::string new_file = new_dir;
  735. new_file.append("/");
  736. new_file.append((slash == nullptr) ? fname.c_str() : slash + 1);
  737. Status s = env_->RenameFile(fname, new_file);
  738. ROCKS_LOG_INFO(db_options_.info_log, "Archiving %s: %s\n", fname.c_str(),
  739. s.ToString().c_str());
  740. }
  741. };
  742. Status GetDefaultCFOptions(
  743. const std::vector<ColumnFamilyDescriptor>& column_families,
  744. ColumnFamilyOptions* res) {
  745. assert(res != nullptr);
  746. auto iter = std::find_if(column_families.begin(), column_families.end(),
  747. [](const ColumnFamilyDescriptor& cfd) {
  748. return cfd.name == kDefaultColumnFamilyName;
  749. });
  750. if (iter == column_families.end()) {
  751. return Status::InvalidArgument(
  752. "column_families", "Must contain entry for default column family");
  753. }
  754. *res = iter->options;
  755. return Status::OK();
  756. }
  757. } // anonymous namespace
  758. Status RepairDB(const std::string& dbname, const DBOptions& db_options,
  759. const std::vector<ColumnFamilyDescriptor>& column_families) {
  760. ColumnFamilyOptions default_cf_opts;
  761. Status status = GetDefaultCFOptions(column_families, &default_cf_opts);
  762. if (!status.ok()) {
  763. return status;
  764. }
  765. Repairer repairer(dbname, db_options, column_families, default_cf_opts,
  766. ColumnFamilyOptions() /* unknown_cf_opts */,
  767. false /* create_unknown_cfs */);
  768. status = repairer.Run();
  769. if (status.ok()) {
  770. status = repairer.Close();
  771. }
  772. return status;
  773. }
  774. Status RepairDB(const std::string& dbname, const DBOptions& db_options,
  775. const std::vector<ColumnFamilyDescriptor>& column_families,
  776. const ColumnFamilyOptions& unknown_cf_opts) {
  777. ColumnFamilyOptions default_cf_opts;
  778. Status status = GetDefaultCFOptions(column_families, &default_cf_opts);
  779. if (!status.ok()) {
  780. return status;
  781. }
  782. Repairer repairer(dbname, db_options, column_families, default_cf_opts,
  783. unknown_cf_opts, true /* create_unknown_cfs */);
  784. status = repairer.Run();
  785. if (status.ok()) {
  786. status = repairer.Close();
  787. }
  788. return status;
  789. }
  790. Status RepairDB(const std::string& dbname, const Options& options) {
  791. Options opts(options);
  792. DBOptions db_options(opts);
  793. ColumnFamilyOptions cf_options(opts);
  794. Repairer repairer(dbname, db_options, {}, cf_options /* default_cf_opts */,
  795. cf_options /* unknown_cf_opts */,
  796. true /* create_unknown_cfs */);
  797. Status status = repairer.Run();
  798. if (status.ok()) {
  799. status = repairer.Close();
  800. }
  801. return status;
  802. }
  803. } // namespace ROCKSDB_NAMESPACE