db_impl_secondary.h 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #pragma once
  6. #ifndef ROCKSDB_LITE
#include <map>
#include <memory>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>

#include "db/db_impl/db_impl.h"
  10. namespace ROCKSDB_NAMESPACE {
  11. // A wrapper class to hold log reader, log reporter, log status.
  12. class LogReaderContainer {
  13. public:
  14. LogReaderContainer()
  15. : reader_(nullptr), reporter_(nullptr), status_(nullptr) {}
  16. LogReaderContainer(Env* env, std::shared_ptr<Logger> info_log,
  17. std::string fname,
  18. std::unique_ptr<SequentialFileReader>&& file_reader,
  19. uint64_t log_number) {
  20. LogReporter* reporter = new LogReporter();
  21. status_ = new Status();
  22. reporter->env = env;
  23. reporter->info_log = info_log.get();
  24. reporter->fname = std::move(fname);
  25. reporter->status = status_;
  26. reporter_ = reporter;
  27. // We intentially make log::Reader do checksumming even if
  28. // paranoid_checks==false so that corruptions cause entire commits
  29. // to be skipped instead of propagating bad information (like overly
  30. // large sequence numbers).
  31. reader_ = new log::FragmentBufferedReader(info_log, std::move(file_reader),
  32. reporter, true /*checksum*/,
  33. log_number);
  34. }
  35. log::FragmentBufferedReader* reader_;
  36. log::Reader::Reporter* reporter_;
  37. Status* status_;
  38. ~LogReaderContainer() {
  39. delete reader_;
  40. delete reporter_;
  41. delete status_;
  42. }
  43. private:
  44. struct LogReporter : public log::Reader::Reporter {
  45. Env* env;
  46. Logger* info_log;
  47. std::string fname;
  48. Status* status; // nullptr if immutable_db_options_.paranoid_checks==false
  49. void Corruption(size_t bytes, const Status& s) override {
  50. ROCKS_LOG_WARN(info_log, "%s%s: dropping %d bytes; %s",
  51. (this->status == nullptr ? "(ignoring error) " : ""),
  52. fname.c_str(), static_cast<int>(bytes),
  53. s.ToString().c_str());
  54. if (this->status != nullptr && this->status->ok()) {
  55. *this->status = s;
  56. }
  57. }
  58. };
  59. };
  60. // The secondary instance shares access to the storage as the primary.
  61. // The secondary is able to read and replay changes described in both the
  62. // MANIFEST and the WAL files without coordination with the primary.
  63. // The secondary instance can be opened using `DB::OpenAsSecondary`. After
  64. // that, it can call `DBImplSecondary::TryCatchUpWithPrimary` to make best
  65. // effort attempts to catch up with the primary.
  66. class DBImplSecondary : public DBImpl {
  67. public:
  68. DBImplSecondary(const DBOptions& options, const std::string& dbname);
  69. ~DBImplSecondary() override;
  70. // Recover by replaying MANIFEST and WAL. Also initialize manifest_reader_
  71. // and log_readers_ to facilitate future operations.
  72. Status Recover(const std::vector<ColumnFamilyDescriptor>& column_families,
  73. bool read_only, bool error_if_log_file_exist,
  74. bool error_if_data_exists_in_logs,
  75. uint64_t* = nullptr) override;
  76. // Implementations of the DB interface
  77. using DB::Get;
  78. Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family,
  79. const Slice& key, PinnableSlice* value) override;
  80. Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* column_family,
  81. const Slice& key, PinnableSlice* value);
  82. using DBImpl::NewIterator;
  83. Iterator* NewIterator(const ReadOptions&,
  84. ColumnFamilyHandle* column_family) override;
  85. ArenaWrappedDBIter* NewIteratorImpl(const ReadOptions& read_options,
  86. ColumnFamilyData* cfd,
  87. SequenceNumber snapshot,
  88. ReadCallback* read_callback);
  89. Status NewIterators(const ReadOptions& options,
  90. const std::vector<ColumnFamilyHandle*>& column_families,
  91. std::vector<Iterator*>* iterators) override;
  92. using DBImpl::Put;
  93. Status Put(const WriteOptions& /*options*/,
  94. ColumnFamilyHandle* /*column_family*/, const Slice& /*key*/,
  95. const Slice& /*value*/) override {
  96. return Status::NotSupported("Not supported operation in secondary mode.");
  97. }
  98. using DBImpl::Merge;
  99. Status Merge(const WriteOptions& /*options*/,
  100. ColumnFamilyHandle* /*column_family*/, const Slice& /*key*/,
  101. const Slice& /*value*/) override {
  102. return Status::NotSupported("Not supported operation in secondary mode.");
  103. }
  104. using DBImpl::Delete;
  105. Status Delete(const WriteOptions& /*options*/,
  106. ColumnFamilyHandle* /*column_family*/,
  107. const Slice& /*key*/) override {
  108. return Status::NotSupported("Not supported operation in secondary mode.");
  109. }
  110. using DBImpl::SingleDelete;
  111. Status SingleDelete(const WriteOptions& /*options*/,
  112. ColumnFamilyHandle* /*column_family*/,
  113. const Slice& /*key*/) override {
  114. return Status::NotSupported("Not supported operation in secondary mode.");
  115. }
  116. Status Write(const WriteOptions& /*options*/,
  117. WriteBatch* /*updates*/) override {
  118. return Status::NotSupported("Not supported operation in secondary mode.");
  119. }
  120. using DBImpl::CompactRange;
  121. Status CompactRange(const CompactRangeOptions& /*options*/,
  122. ColumnFamilyHandle* /*column_family*/,
  123. const Slice* /*begin*/, const Slice* /*end*/) override {
  124. return Status::NotSupported("Not supported operation in secondary mode.");
  125. }
  126. using DBImpl::CompactFiles;
  127. Status CompactFiles(
  128. const CompactionOptions& /*compact_options*/,
  129. ColumnFamilyHandle* /*column_family*/,
  130. const std::vector<std::string>& /*input_file_names*/,
  131. const int /*output_level*/, const int /*output_path_id*/ = -1,
  132. std::vector<std::string>* const /*output_file_names*/ = nullptr,
  133. CompactionJobInfo* /*compaction_job_info*/ = nullptr) override {
  134. return Status::NotSupported("Not supported operation in secondary mode.");
  135. }
  136. Status DisableFileDeletions() override {
  137. return Status::NotSupported("Not supported operation in secondary mode.");
  138. }
  139. Status EnableFileDeletions(bool /*force*/) override {
  140. return Status::NotSupported("Not supported operation in secondary mode.");
  141. }
  142. Status GetLiveFiles(std::vector<std::string>&,
  143. uint64_t* /*manifest_file_size*/,
  144. bool /*flush_memtable*/ = true) override {
  145. return Status::NotSupported("Not supported operation in secondary mode.");
  146. }
  147. using DBImpl::Flush;
  148. Status Flush(const FlushOptions& /*options*/,
  149. ColumnFamilyHandle* /*column_family*/) override {
  150. return Status::NotSupported("Not supported operation in secondary mode.");
  151. }
  152. using DBImpl::SetDBOptions;
  153. Status SetDBOptions(const std::unordered_map<std::string, std::string>&
  154. /*options_map*/) override {
  155. // Currently not supported because changing certain options may cause
  156. // flush/compaction.
  157. return Status::NotSupported("Not supported operation in secondary mode.");
  158. }
  159. using DBImpl::SetOptions;
  160. Status SetOptions(
  161. ColumnFamilyHandle* /*cfd*/,
  162. const std::unordered_map<std::string, std::string>& /*options_map*/)
  163. override {
  164. // Currently not supported because changing certain options may cause
  165. // flush/compaction and/or write to MANIFEST.
  166. return Status::NotSupported("Not supported operation in secondary mode.");
  167. }
  168. using DBImpl::SyncWAL;
  169. Status SyncWAL() override {
  170. return Status::NotSupported("Not supported operation in secondary mode.");
  171. }
  172. using DB::IngestExternalFile;
  173. Status IngestExternalFile(
  174. ColumnFamilyHandle* /*column_family*/,
  175. const std::vector<std::string>& /*external_files*/,
  176. const IngestExternalFileOptions& /*ingestion_options*/) override {
  177. return Status::NotSupported("Not supported operation in secondary mode.");
  178. }
  179. // Try to catch up with the primary by reading as much as possible from the
  180. // log files until there is nothing more to read or encounters an error. If
  181. // the amount of information in the log files to process is huge, this
  182. // method can take long time due to all the I/O and CPU costs.
  183. Status TryCatchUpWithPrimary() override;
  184. // Try to find log reader using log_number from log_readers_ map, initialize
  185. // if it doesn't exist
  186. Status MaybeInitLogReader(uint64_t log_number,
  187. log::FragmentBufferedReader** log_reader);
  188. // Check if all live files exist on file system and that their file sizes
  189. // matche to the in-memory records. It is possible that some live files may
  190. // have been deleted by the primary. In this case, CheckConsistency() does
  191. // not flag the missing file as inconsistency.
  192. Status CheckConsistency() override;
  193. protected:
  194. // ColumnFamilyCollector is a write batch handler which does nothing
  195. // except recording unique column family IDs
  196. class ColumnFamilyCollector : public WriteBatch::Handler {
  197. std::unordered_set<uint32_t> column_family_ids_;
  198. Status AddColumnFamilyId(uint32_t column_family_id) {
  199. if (column_family_ids_.find(column_family_id) ==
  200. column_family_ids_.end()) {
  201. column_family_ids_.insert(column_family_id);
  202. }
  203. return Status::OK();
  204. }
  205. public:
  206. explicit ColumnFamilyCollector() {}
  207. ~ColumnFamilyCollector() override {}
  208. Status PutCF(uint32_t column_family_id, const Slice&,
  209. const Slice&) override {
  210. return AddColumnFamilyId(column_family_id);
  211. }
  212. Status DeleteCF(uint32_t column_family_id, const Slice&) override {
  213. return AddColumnFamilyId(column_family_id);
  214. }
  215. Status SingleDeleteCF(uint32_t column_family_id, const Slice&) override {
  216. return AddColumnFamilyId(column_family_id);
  217. }
  218. Status DeleteRangeCF(uint32_t column_family_id, const Slice&,
  219. const Slice&) override {
  220. return AddColumnFamilyId(column_family_id);
  221. }
  222. Status MergeCF(uint32_t column_family_id, const Slice&,
  223. const Slice&) override {
  224. return AddColumnFamilyId(column_family_id);
  225. }
  226. Status PutBlobIndexCF(uint32_t column_family_id, const Slice&,
  227. const Slice&) override {
  228. return AddColumnFamilyId(column_family_id);
  229. }
  230. const std::unordered_set<uint32_t>& column_families() const {
  231. return column_family_ids_;
  232. }
  233. };
  234. Status CollectColumnFamilyIdsFromWriteBatch(
  235. const WriteBatch& batch, std::vector<uint32_t>* column_family_ids) {
  236. assert(column_family_ids != nullptr);
  237. column_family_ids->clear();
  238. ColumnFamilyCollector handler;
  239. Status s = batch.Iterate(&handler);
  240. if (s.ok()) {
  241. for (const auto& cf : handler.column_families()) {
  242. column_family_ids->push_back(cf);
  243. }
  244. }
  245. return s;
  246. }
  247. bool OwnTablesAndLogs() const override {
  248. // Currently, the secondary instance does not own the database files. It
  249. // simply opens the files of the primary instance and tracks their file
  250. // descriptors until they become obsolete. In the future, the secondary may
  251. // create links to database files. OwnTablesAndLogs will return true then.
  252. return false;
  253. }
  254. private:
  255. friend class DB;
  256. // No copying allowed
  257. DBImplSecondary(const DBImplSecondary&);
  258. void operator=(const DBImplSecondary&);
  259. using DBImpl::Recover;
  260. Status FindAndRecoverLogFiles(
  261. std::unordered_set<ColumnFamilyData*>* cfds_changed,
  262. JobContext* job_context);
  263. Status FindNewLogNumbers(std::vector<uint64_t>* logs);
  264. // After manifest recovery, replay WALs and refresh log_readers_ if necessary
  265. // REQUIRES: log_numbers are sorted in ascending order
  266. Status RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
  267. SequenceNumber* next_sequence,
  268. std::unordered_set<ColumnFamilyData*>* cfds_changed,
  269. JobContext* job_context);
  270. std::unique_ptr<log::FragmentBufferedReader> manifest_reader_;
  271. std::unique_ptr<log::Reader::Reporter> manifest_reporter_;
  272. std::unique_ptr<Status> manifest_reader_status_;
  273. // Cache log readers for each log number, used for continue WAL replay
  274. // after recovery
  275. std::map<uint64_t, std::unique_ptr<LogReaderContainer>> log_readers_;
  276. // Current WAL number replayed for each column family.
  277. std::unordered_map<ColumnFamilyData*, uint64_t> cfd_to_current_log_;
  278. };
  279. } // namespace ROCKSDB_NAMESPACE
  280. #endif // !ROCKSDB_LITE