db_impl_secondary.cc 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include "db/db_impl/db_impl_secondary.h"
  6. #include <cinttypes>
  7. #include "db/arena_wrapped_db_iter.h"
  8. #include "db/merge_context.h"
  9. #include "logging/auto_roll_logger.h"
  10. #include "monitoring/perf_context_imp.h"
  11. #include "util/cast_util.h"
  12. namespace ROCKSDB_NAMESPACE {
  13. #ifndef ROCKSDB_LITE
  14. DBImplSecondary::DBImplSecondary(const DBOptions& db_options,
  15. const std::string& dbname)
  16. : DBImpl(db_options, dbname) {
  17. ROCKS_LOG_INFO(immutable_db_options_.info_log,
  18. "Opening the db in secondary mode");
  19. LogFlush(immutable_db_options_.info_log);
  20. }
  21. DBImplSecondary::~DBImplSecondary() {}
  22. Status DBImplSecondary::Recover(
  23. const std::vector<ColumnFamilyDescriptor>& column_families,
  24. bool /*readonly*/, bool /*error_if_log_file_exist*/,
  25. bool /*error_if_data_exists_in_logs*/, uint64_t*) {
  26. mutex_.AssertHeld();
  27. JobContext job_context(0);
  28. Status s;
  29. s = static_cast<ReactiveVersionSet*>(versions_.get())
  30. ->Recover(column_families, &manifest_reader_, &manifest_reporter_,
  31. &manifest_reader_status_);
  32. if (!s.ok()) {
  33. return s;
  34. }
  35. if (immutable_db_options_.paranoid_checks && s.ok()) {
  36. s = CheckConsistency();
  37. }
  38. // Initial max_total_in_memory_state_ before recovery logs.
  39. max_total_in_memory_state_ = 0;
  40. for (auto cfd : *versions_->GetColumnFamilySet()) {
  41. auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
  42. max_total_in_memory_state_ += mutable_cf_options->write_buffer_size *
  43. mutable_cf_options->max_write_buffer_number;
  44. }
  45. if (s.ok()) {
  46. default_cf_handle_ = new ColumnFamilyHandleImpl(
  47. versions_->GetColumnFamilySet()->GetDefault(), this, &mutex_);
  48. default_cf_internal_stats_ = default_cf_handle_->cfd()->internal_stats();
  49. single_column_family_mode_ =
  50. versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1;
  51. std::unordered_set<ColumnFamilyData*> cfds_changed;
  52. s = FindAndRecoverLogFiles(&cfds_changed, &job_context);
  53. }
  54. if (s.IsPathNotFound()) {
  55. ROCKS_LOG_INFO(immutable_db_options_.info_log,
  56. "Secondary tries to read WAL, but WAL file(s) have already "
  57. "been purged by primary.");
  58. s = Status::OK();
  59. }
  60. // TODO: update options_file_number_ needed?
  61. job_context.Clean();
  62. return s;
  63. }
  64. // find new WAL and apply them in order to the secondary instance
  65. Status DBImplSecondary::FindAndRecoverLogFiles(
  66. std::unordered_set<ColumnFamilyData*>* cfds_changed,
  67. JobContext* job_context) {
  68. assert(nullptr != cfds_changed);
  69. assert(nullptr != job_context);
  70. Status s;
  71. std::vector<uint64_t> logs;
  72. s = FindNewLogNumbers(&logs);
  73. if (s.ok() && !logs.empty()) {
  74. SequenceNumber next_sequence(kMaxSequenceNumber);
  75. s = RecoverLogFiles(logs, &next_sequence, cfds_changed, job_context);
  76. }
  77. return s;
  78. }
  79. // List wal_dir and find all new WALs, return these log numbers
  80. Status DBImplSecondary::FindNewLogNumbers(std::vector<uint64_t>* logs) {
  81. assert(logs != nullptr);
  82. std::vector<std::string> filenames;
  83. Status s;
  84. s = env_->GetChildren(immutable_db_options_.wal_dir, &filenames);
  85. if (s.IsNotFound()) {
  86. return Status::InvalidArgument("Failed to open wal_dir",
  87. immutable_db_options_.wal_dir);
  88. } else if (!s.ok()) {
  89. return s;
  90. }
  91. // if log_readers_ is non-empty, it means we have applied all logs with log
  92. // numbers smaller than the smallest log in log_readers_, so there is no
  93. // need to pass these logs to RecoverLogFiles
  94. uint64_t log_number_min = 0;
  95. if (!log_readers_.empty()) {
  96. log_number_min = log_readers_.begin()->first;
  97. }
  98. for (size_t i = 0; i < filenames.size(); i++) {
  99. uint64_t number;
  100. FileType type;
  101. if (ParseFileName(filenames[i], &number, &type) && type == kLogFile &&
  102. number >= log_number_min) {
  103. logs->push_back(number);
  104. }
  105. }
  106. // Recover logs in the order that they were generated
  107. if (!logs->empty()) {
  108. std::sort(logs->begin(), logs->end());
  109. }
  110. return s;
  111. }
  112. Status DBImplSecondary::MaybeInitLogReader(
  113. uint64_t log_number, log::FragmentBufferedReader** log_reader) {
  114. auto iter = log_readers_.find(log_number);
  115. // make sure the log file is still present
  116. if (iter == log_readers_.end() ||
  117. iter->second->reader_->GetLogNumber() != log_number) {
  118. // delete the obsolete log reader if log number mismatch
  119. if (iter != log_readers_.end()) {
  120. log_readers_.erase(iter);
  121. }
  122. // initialize log reader from log_number
  123. // TODO: min_log_number_to_keep_2pc check needed?
  124. // Open the log file
  125. std::string fname = LogFileName(immutable_db_options_.wal_dir, log_number);
  126. ROCKS_LOG_INFO(immutable_db_options_.info_log,
  127. "Recovering log #%" PRIu64 " mode %d", log_number,
  128. static_cast<int>(immutable_db_options_.wal_recovery_mode));
  129. std::unique_ptr<SequentialFileReader> file_reader;
  130. {
  131. std::unique_ptr<FSSequentialFile> file;
  132. Status status = fs_->NewSequentialFile(
  133. fname, fs_->OptimizeForLogRead(file_options_), &file,
  134. nullptr);
  135. if (!status.ok()) {
  136. *log_reader = nullptr;
  137. return status;
  138. }
  139. file_reader.reset(new SequentialFileReader(
  140. std::move(file), fname, immutable_db_options_.log_readahead_size));
  141. }
  142. // Create the log reader.
  143. LogReaderContainer* log_reader_container = new LogReaderContainer(
  144. env_, immutable_db_options_.info_log, std::move(fname),
  145. std::move(file_reader), log_number);
  146. log_readers_.insert(std::make_pair(
  147. log_number, std::unique_ptr<LogReaderContainer>(log_reader_container)));
  148. }
  149. iter = log_readers_.find(log_number);
  150. assert(iter != log_readers_.end());
  151. *log_reader = iter->second->reader_;
  152. return Status::OK();
  153. }
  154. // After manifest recovery, replay WALs and refresh log_readers_ if necessary
  155. // REQUIRES: log_numbers are sorted in ascending order
  156. Status DBImplSecondary::RecoverLogFiles(
  157. const std::vector<uint64_t>& log_numbers, SequenceNumber* next_sequence,
  158. std::unordered_set<ColumnFamilyData*>* cfds_changed,
  159. JobContext* job_context) {
  160. assert(nullptr != cfds_changed);
  161. assert(nullptr != job_context);
  162. mutex_.AssertHeld();
  163. Status status;
  164. for (auto log_number : log_numbers) {
  165. log::FragmentBufferedReader* reader = nullptr;
  166. status = MaybeInitLogReader(log_number, &reader);
  167. if (!status.ok()) {
  168. return status;
  169. }
  170. assert(reader != nullptr);
  171. }
  172. for (auto log_number : log_numbers) {
  173. auto it = log_readers_.find(log_number);
  174. assert(it != log_readers_.end());
  175. log::FragmentBufferedReader* reader = it->second->reader_;
  176. // Manually update the file number allocation counter in VersionSet.
  177. versions_->MarkFileNumberUsed(log_number);
  178. // Determine if we should tolerate incomplete records at the tail end of the
  179. // Read all the records and add to a memtable
  180. std::string scratch;
  181. Slice record;
  182. WriteBatch batch;
  183. while (reader->ReadRecord(&record, &scratch,
  184. immutable_db_options_.wal_recovery_mode) &&
  185. status.ok()) {
  186. if (record.size() < WriteBatchInternal::kHeader) {
  187. reader->GetReporter()->Corruption(
  188. record.size(), Status::Corruption("log record too small"));
  189. continue;
  190. }
  191. WriteBatchInternal::SetContents(&batch, record);
  192. SequenceNumber seq_of_batch = WriteBatchInternal::Sequence(&batch);
  193. std::vector<uint32_t> column_family_ids;
  194. status = CollectColumnFamilyIdsFromWriteBatch(batch, &column_family_ids);
  195. if (status.ok()) {
  196. for (const auto id : column_family_ids) {
  197. ColumnFamilyData* cfd =
  198. versions_->GetColumnFamilySet()->GetColumnFamily(id);
  199. if (cfd == nullptr) {
  200. continue;
  201. }
  202. if (cfds_changed->count(cfd) == 0) {
  203. cfds_changed->insert(cfd);
  204. }
  205. const std::vector<FileMetaData*>& l0_files =
  206. cfd->current()->storage_info()->LevelFiles(0);
  207. SequenceNumber seq =
  208. l0_files.empty() ? 0 : l0_files.back()->fd.largest_seqno;
  209. // If the write batch's sequence number is smaller than the last
  210. // sequence number of the largest sequence persisted for this column
  211. // family, then its data must reside in an SST that has already been
  212. // added in the prior MANIFEST replay.
  213. if (seq_of_batch <= seq) {
  214. continue;
  215. }
  216. auto curr_log_num = port::kMaxUint64;
  217. if (cfd_to_current_log_.count(cfd) > 0) {
  218. curr_log_num = cfd_to_current_log_[cfd];
  219. }
  220. // If the active memtable contains records added by replaying an
  221. // earlier WAL, then we need to seal the memtable, add it to the
  222. // immutable memtable list and create a new active memtable.
  223. if (!cfd->mem()->IsEmpty() && (curr_log_num == port::kMaxUint64 ||
  224. curr_log_num != log_number)) {
  225. const MutableCFOptions mutable_cf_options =
  226. *cfd->GetLatestMutableCFOptions();
  227. MemTable* new_mem =
  228. cfd->ConstructNewMemtable(mutable_cf_options, seq_of_batch);
  229. cfd->mem()->SetNextLogNumber(log_number);
  230. cfd->imm()->Add(cfd->mem(), &job_context->memtables_to_free);
  231. new_mem->Ref();
  232. cfd->SetMemtable(new_mem);
  233. }
  234. }
  235. bool has_valid_writes = false;
  236. status = WriteBatchInternal::InsertInto(
  237. &batch, column_family_memtables_.get(),
  238. nullptr /* flush_scheduler */, nullptr /* trim_history_scheduler*/,
  239. true, log_number, this, false /* concurrent_memtable_writes */,
  240. next_sequence, &has_valid_writes, seq_per_batch_, batch_per_txn_);
  241. }
  242. // If column family was not found, it might mean that the WAL write
  243. // batch references to the column family that was dropped after the
  244. // insert. We don't want to fail the whole write batch in that case --
  245. // we just ignore the update.
  246. // That's why we set ignore missing column families to true
  247. // passing null flush_scheduler will disable memtable flushing which is
  248. // needed for secondary instances
  249. if (status.ok()) {
  250. for (const auto id : column_family_ids) {
  251. ColumnFamilyData* cfd =
  252. versions_->GetColumnFamilySet()->GetColumnFamily(id);
  253. if (cfd == nullptr) {
  254. continue;
  255. }
  256. std::unordered_map<ColumnFamilyData*, uint64_t>::iterator iter =
  257. cfd_to_current_log_.find(cfd);
  258. if (iter == cfd_to_current_log_.end()) {
  259. cfd_to_current_log_.insert({cfd, log_number});
  260. } else if (log_number > iter->second) {
  261. iter->second = log_number;
  262. }
  263. }
  264. auto last_sequence = *next_sequence - 1;
  265. if ((*next_sequence != kMaxSequenceNumber) &&
  266. (versions_->LastSequence() <= last_sequence)) {
  267. versions_->SetLastAllocatedSequence(last_sequence);
  268. versions_->SetLastPublishedSequence(last_sequence);
  269. versions_->SetLastSequence(last_sequence);
  270. }
  271. } else {
  272. // We are treating this as a failure while reading since we read valid
  273. // blocks that do not form coherent data
  274. reader->GetReporter()->Corruption(record.size(), status);
  275. }
  276. }
  277. if (!status.ok()) {
  278. return status;
  279. }
  280. }
  281. // remove logreaders from map after successfully recovering the WAL
  282. if (log_readers_.size() > 1) {
  283. auto erase_iter = log_readers_.begin();
  284. std::advance(erase_iter, log_readers_.size() - 1);
  285. log_readers_.erase(log_readers_.begin(), erase_iter);
  286. }
  287. return status;
  288. }
  289. // Implementation of the DB interface
  290. Status DBImplSecondary::Get(const ReadOptions& read_options,
  291. ColumnFamilyHandle* column_family, const Slice& key,
  292. PinnableSlice* value) {
  293. return GetImpl(read_options, column_family, key, value);
  294. }
  295. Status DBImplSecondary::GetImpl(const ReadOptions& read_options,
  296. ColumnFamilyHandle* column_family,
  297. const Slice& key, PinnableSlice* pinnable_val) {
  298. assert(pinnable_val != nullptr);
  299. PERF_CPU_TIMER_GUARD(get_cpu_nanos, env_);
  300. StopWatch sw(env_, stats_, DB_GET);
  301. PERF_TIMER_GUARD(get_snapshot_time);
  302. auto cfh = static_cast<ColumnFamilyHandleImpl*>(column_family);
  303. ColumnFamilyData* cfd = cfh->cfd();
  304. if (tracer_) {
  305. InstrumentedMutexLock lock(&trace_mutex_);
  306. if (tracer_) {
  307. tracer_->Get(column_family, key);
  308. }
  309. }
  310. // Acquire SuperVersion
  311. SuperVersion* super_version = GetAndRefSuperVersion(cfd);
  312. SequenceNumber snapshot = versions_->LastSequence();
  313. MergeContext merge_context;
  314. SequenceNumber max_covering_tombstone_seq = 0;
  315. Status s;
  316. LookupKey lkey(key, snapshot);
  317. PERF_TIMER_STOP(get_snapshot_time);
  318. bool done = false;
  319. if (super_version->mem->Get(lkey, pinnable_val->GetSelf(), &s, &merge_context,
  320. &max_covering_tombstone_seq, read_options)) {
  321. done = true;
  322. pinnable_val->PinSelf();
  323. RecordTick(stats_, MEMTABLE_HIT);
  324. } else if ((s.ok() || s.IsMergeInProgress()) &&
  325. super_version->imm->Get(
  326. lkey, pinnable_val->GetSelf(), &s, &merge_context,
  327. &max_covering_tombstone_seq, read_options)) {
  328. done = true;
  329. pinnable_val->PinSelf();
  330. RecordTick(stats_, MEMTABLE_HIT);
  331. }
  332. if (!done && !s.ok() && !s.IsMergeInProgress()) {
  333. ReturnAndCleanupSuperVersion(cfd, super_version);
  334. return s;
  335. }
  336. if (!done) {
  337. PERF_TIMER_GUARD(get_from_output_files_time);
  338. super_version->current->Get(read_options, lkey, pinnable_val, &s,
  339. &merge_context, &max_covering_tombstone_seq);
  340. RecordTick(stats_, MEMTABLE_MISS);
  341. }
  342. {
  343. PERF_TIMER_GUARD(get_post_process_time);
  344. ReturnAndCleanupSuperVersion(cfd, super_version);
  345. RecordTick(stats_, NUMBER_KEYS_READ);
  346. size_t size = pinnable_val->size();
  347. RecordTick(stats_, BYTES_READ, size);
  348. RecordTimeToHistogram(stats_, BYTES_PER_READ, size);
  349. PERF_COUNTER_ADD(get_read_bytes, size);
  350. }
  351. return s;
  352. }
  353. Iterator* DBImplSecondary::NewIterator(const ReadOptions& read_options,
  354. ColumnFamilyHandle* column_family) {
  355. if (read_options.managed) {
  356. return NewErrorIterator(
  357. Status::NotSupported("Managed iterator is not supported anymore."));
  358. }
  359. if (read_options.read_tier == kPersistedTier) {
  360. return NewErrorIterator(Status::NotSupported(
  361. "ReadTier::kPersistedData is not yet supported in iterators."));
  362. }
  363. Iterator* result = nullptr;
  364. auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  365. auto cfd = cfh->cfd();
  366. ReadCallback* read_callback = nullptr; // No read callback provided.
  367. if (read_options.tailing) {
  368. return NewErrorIterator(Status::NotSupported(
  369. "tailing iterator not supported in secondary mode"));
  370. } else if (read_options.snapshot != nullptr) {
  371. // TODO (yanqin) support snapshot.
  372. return NewErrorIterator(
  373. Status::NotSupported("snapshot not supported in secondary mode"));
  374. } else {
  375. auto snapshot = versions_->LastSequence();
  376. result = NewIteratorImpl(read_options, cfd, snapshot, read_callback);
  377. }
  378. return result;
  379. }
  380. ArenaWrappedDBIter* DBImplSecondary::NewIteratorImpl(
  381. const ReadOptions& read_options, ColumnFamilyData* cfd,
  382. SequenceNumber snapshot, ReadCallback* read_callback) {
  383. assert(nullptr != cfd);
  384. SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
  385. auto db_iter = NewArenaWrappedDbIterator(
  386. env_, read_options, *cfd->ioptions(), super_version->mutable_cf_options,
  387. snapshot,
  388. super_version->mutable_cf_options.max_sequential_skip_in_iterations,
  389. super_version->version_number, read_callback);
  390. auto internal_iter =
  391. NewInternalIterator(read_options, cfd, super_version, db_iter->GetArena(),
  392. db_iter->GetRangeDelAggregator(), snapshot);
  393. db_iter->SetIterUnderDBIter(internal_iter);
  394. return db_iter;
  395. }
  396. Status DBImplSecondary::NewIterators(
  397. const ReadOptions& read_options,
  398. const std::vector<ColumnFamilyHandle*>& column_families,
  399. std::vector<Iterator*>* iterators) {
  400. if (read_options.managed) {
  401. return Status::NotSupported("Managed iterator is not supported anymore.");
  402. }
  403. if (read_options.read_tier == kPersistedTier) {
  404. return Status::NotSupported(
  405. "ReadTier::kPersistedData is not yet supported in iterators.");
  406. }
  407. ReadCallback* read_callback = nullptr; // No read callback provided.
  408. if (iterators == nullptr) {
  409. return Status::InvalidArgument("iterators not allowed to be nullptr");
  410. }
  411. iterators->clear();
  412. iterators->reserve(column_families.size());
  413. if (read_options.tailing) {
  414. return Status::NotSupported(
  415. "tailing iterator not supported in secondary mode");
  416. } else if (read_options.snapshot != nullptr) {
  417. // TODO (yanqin) support snapshot.
  418. return Status::NotSupported("snapshot not supported in secondary mode");
  419. } else {
  420. SequenceNumber read_seq = versions_->LastSequence();
  421. for (auto cfh : column_families) {
  422. ColumnFamilyData* cfd = static_cast<ColumnFamilyHandleImpl*>(cfh)->cfd();
  423. iterators->push_back(
  424. NewIteratorImpl(read_options, cfd, read_seq, read_callback));
  425. }
  426. }
  427. return Status::OK();
  428. }
  429. Status DBImplSecondary::CheckConsistency() {
  430. mutex_.AssertHeld();
  431. Status s = DBImpl::CheckConsistency();
  432. // If DBImpl::CheckConsistency() which is stricter returns success, then we
  433. // do not need to give a second chance.
  434. if (s.ok()) {
  435. return s;
  436. }
  437. // It's possible that DBImpl::CheckConssitency() can fail because the primary
  438. // may have removed certain files, causing the GetFileSize(name) call to
  439. // fail and returning a PathNotFound. In this case, we take a best-effort
  440. // approach and just proceed.
  441. TEST_SYNC_POINT_CALLBACK(
  442. "DBImplSecondary::CheckConsistency:AfterFirstAttempt", &s);
  443. if (immutable_db_options_.skip_checking_sst_file_sizes_on_db_open) {
  444. return Status::OK();
  445. }
  446. std::vector<LiveFileMetaData> metadata;
  447. versions_->GetLiveFilesMetaData(&metadata);
  448. std::string corruption_messages;
  449. for (const auto& md : metadata) {
  450. // md.name has a leading "/".
  451. std::string file_path = md.db_path + md.name;
  452. uint64_t fsize = 0;
  453. s = env_->GetFileSize(file_path, &fsize);
  454. if (!s.ok() &&
  455. (env_->GetFileSize(Rocks2LevelTableFileName(file_path), &fsize).ok() ||
  456. s.IsPathNotFound())) {
  457. s = Status::OK();
  458. }
  459. if (!s.ok()) {
  460. corruption_messages +=
  461. "Can't access " + md.name + ": " + s.ToString() + "\n";
  462. }
  463. }
  464. return corruption_messages.empty() ? Status::OK()
  465. : Status::Corruption(corruption_messages);
  466. }
  467. Status DBImplSecondary::TryCatchUpWithPrimary() {
  468. assert(versions_.get() != nullptr);
  469. assert(manifest_reader_.get() != nullptr);
  470. Status s;
  471. // read the manifest and apply new changes to the secondary instance
  472. std::unordered_set<ColumnFamilyData*> cfds_changed;
  473. JobContext job_context(0, true /*create_superversion*/);
  474. {
  475. InstrumentedMutexLock lock_guard(&mutex_);
  476. s = static_cast_with_check<ReactiveVersionSet>(versions_.get())
  477. ->ReadAndApply(&mutex_, &manifest_reader_, &cfds_changed);
  478. ROCKS_LOG_INFO(immutable_db_options_.info_log, "Last sequence is %" PRIu64,
  479. static_cast<uint64_t>(versions_->LastSequence()));
  480. for (ColumnFamilyData* cfd : cfds_changed) {
  481. if (cfd->IsDropped()) {
  482. ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] is dropped\n",
  483. cfd->GetName().c_str());
  484. continue;
  485. }
  486. VersionStorageInfo::LevelSummaryStorage tmp;
  487. ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
  488. "[%s] Level summary: %s\n", cfd->GetName().c_str(),
  489. cfd->current()->storage_info()->LevelSummary(&tmp));
  490. }
  491. // list wal_dir to discover new WALs and apply new changes to the secondary
  492. // instance
  493. if (s.ok()) {
  494. s = FindAndRecoverLogFiles(&cfds_changed, &job_context);
  495. }
  496. if (s.IsPathNotFound()) {
  497. ROCKS_LOG_INFO(
  498. immutable_db_options_.info_log,
  499. "Secondary tries to read WAL, but WAL file(s) have already "
  500. "been purged by primary.");
  501. s = Status::OK();
  502. }
  503. if (s.ok()) {
  504. for (auto cfd : cfds_changed) {
  505. cfd->imm()->RemoveOldMemTables(cfd->GetLogNumber(),
  506. &job_context.memtables_to_free);
  507. auto& sv_context = job_context.superversion_contexts.back();
  508. cfd->InstallSuperVersion(&sv_context, &mutex_);
  509. sv_context.NewSuperVersion();
  510. }
  511. }
  512. }
  513. job_context.Clean();
  514. // Cleanup unused, obsolete files.
  515. JobContext purge_files_job_context(0);
  516. {
  517. InstrumentedMutexLock lock_guard(&mutex_);
  518. // Currently, secondary instance does not own the database files, thus it
  519. // is unnecessary for the secondary to force full scan.
  520. FindObsoleteFiles(&purge_files_job_context, /*force=*/false);
  521. }
  522. if (purge_files_job_context.HaveSomethingToDelete()) {
  523. PurgeObsoleteFiles(purge_files_job_context);
  524. }
  525. purge_files_job_context.Clean();
  526. return s;
  527. }
  528. Status DB::OpenAsSecondary(const Options& options, const std::string& dbname,
  529. const std::string& secondary_path, DB** dbptr) {
  530. *dbptr = nullptr;
  531. DBOptions db_options(options);
  532. ColumnFamilyOptions cf_options(options);
  533. std::vector<ColumnFamilyDescriptor> column_families;
  534. column_families.emplace_back(kDefaultColumnFamilyName, cf_options);
  535. std::vector<ColumnFamilyHandle*> handles;
  536. Status s = DB::OpenAsSecondary(db_options, dbname, secondary_path,
  537. column_families, &handles, dbptr);
  538. if (s.ok()) {
  539. assert(handles.size() == 1);
  540. delete handles[0];
  541. }
  542. return s;
  543. }
  544. Status DB::OpenAsSecondary(
  545. const DBOptions& db_options, const std::string& dbname,
  546. const std::string& secondary_path,
  547. const std::vector<ColumnFamilyDescriptor>& column_families,
  548. std::vector<ColumnFamilyHandle*>* handles, DB** dbptr) {
  549. *dbptr = nullptr;
  550. if (db_options.max_open_files != -1) {
  551. // TODO (yanqin) maybe support max_open_files != -1 by creating hard links
  552. // on SST files so that db secondary can still have access to old SSTs
  553. // while primary instance may delete original.
  554. return Status::InvalidArgument("require max_open_files to be -1");
  555. }
  556. DBOptions tmp_opts(db_options);
  557. Status s;
  558. if (nullptr == tmp_opts.info_log) {
  559. s = CreateLoggerFromOptions(secondary_path, tmp_opts, &tmp_opts.info_log);
  560. if (!s.ok()) {
  561. tmp_opts.info_log = nullptr;
  562. }
  563. }
  564. handles->clear();
  565. DBImplSecondary* impl = new DBImplSecondary(tmp_opts, dbname);
  566. impl->versions_.reset(new ReactiveVersionSet(
  567. dbname, &impl->immutable_db_options_, impl->file_options_,
  568. impl->table_cache_.get(), impl->write_buffer_manager_,
  569. &impl->write_controller_));
  570. impl->column_family_memtables_.reset(
  571. new ColumnFamilyMemTablesImpl(impl->versions_->GetColumnFamilySet()));
  572. impl->wal_in_db_path_ = IsWalDirSameAsDBPath(&impl->immutable_db_options_);
  573. impl->mutex_.Lock();
  574. s = impl->Recover(column_families, true, false, false);
  575. if (s.ok()) {
  576. for (auto cf : column_families) {
  577. auto cfd =
  578. impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name);
  579. if (nullptr == cfd) {
  580. s = Status::InvalidArgument("Column family not found: ", cf.name);
  581. break;
  582. }
  583. handles->push_back(new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_));
  584. }
  585. }
  586. SuperVersionContext sv_context(true /* create_superversion */);
  587. if (s.ok()) {
  588. for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
  589. sv_context.NewSuperVersion();
  590. cfd->InstallSuperVersion(&sv_context, &impl->mutex_);
  591. }
  592. }
  593. impl->mutex_.Unlock();
  594. sv_context.Clean();
  595. if (s.ok()) {
  596. *dbptr = impl;
  597. for (auto h : *handles) {
  598. impl->NewThreadStatusCfInfo(
  599. reinterpret_cast<ColumnFamilyHandleImpl*>(h)->cfd());
  600. }
  601. } else {
  602. for (auto h : *handles) {
  603. delete h;
  604. }
  605. handles->clear();
  606. delete impl;
  607. }
  608. return s;
  609. }
  610. #else // !ROCKSDB_LITE
  611. Status DB::OpenAsSecondary(const Options& /*options*/,
  612. const std::string& /*name*/,
  613. const std::string& /*secondary_path*/,
  614. DB** /*dbptr*/) {
  615. return Status::NotSupported("Not supported in ROCKSDB_LITE.");
  616. }
  617. Status DB::OpenAsSecondary(
  618. const DBOptions& /*db_options*/, const std::string& /*dbname*/,
  619. const std::string& /*secondary_path*/,
  620. const std::vector<ColumnFamilyDescriptor>& /*column_families*/,
  621. std::vector<ColumnFamilyHandle*>* /*handles*/, DB** /*dbptr*/) {
  622. return Status::NotSupported("Not supported in ROCKSDB_LITE.");
  623. }
  624. #endif // !ROCKSDB_LITE
  625. } // namespace ROCKSDB_NAMESPACE