db_impl_secondary.h 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  #pragma once

  #include <map>
  #include <memory>
  #include <optional>
  #include <string>
  #include <unordered_map>
  #include <unordered_set>
  #include <vector>

  #include "db/db_impl/db_impl.h"
  #include "logging/logging.h"
  10. namespace ROCKSDB_NAMESPACE {
  11. // A wrapper class to hold log reader, log reporter, log status.
  12. class LogReaderContainer {
  13. public:
  14. LogReaderContainer()
  15. : reader_(nullptr), reporter_(nullptr), status_(nullptr) {}
  16. LogReaderContainer(Env* env, std::shared_ptr<Logger> info_log,
  17. std::string fname,
  18. std::unique_ptr<SequentialFileReader>&& file_reader,
  19. uint64_t log_number) {
  20. LogReporter* reporter = new LogReporter();
  21. status_ = new Status();
  22. reporter->env = env;
  23. reporter->info_log = info_log.get();
  24. reporter->fname = std::move(fname);
  25. reporter->status = status_;
  26. reporter_ = reporter;
  27. // We intentially make log::Reader do checksumming even if
  28. // paranoid_checks==false so that corruptions cause entire commits
  29. // to be skipped instead of propagating bad information (like overly
  30. // large sequence numbers).
  31. reader_ = new log::FragmentBufferedReader(info_log, std::move(file_reader),
  32. reporter, true /*checksum*/,
  33. log_number);
  34. }
  35. log::FragmentBufferedReader* reader_;
  36. log::Reader::Reporter* reporter_;
  37. Status* status_;
  38. ~LogReaderContainer() {
  39. delete reader_;
  40. delete reporter_;
  41. delete status_;
  42. }
  43. private:
  44. struct LogReporter : public log::Reader::Reporter {
  45. Env* env;
  46. Logger* info_log;
  47. std::string fname;
  48. Status* status; // nullptr if immutable_db_options_.paranoid_checks==false
  49. void Corruption(size_t bytes, const Status& s,
  50. uint64_t /*log_number*/ = kMaxSequenceNumber) override {
  51. ROCKS_LOG_WARN(info_log, "%s%s: dropping %d bytes; %s",
  52. (this->status == nullptr ? "(ignoring error) " : ""),
  53. fname.c_str(), static_cast<int>(bytes),
  54. s.ToString().c_str());
  55. if (this->status != nullptr && this->status->ok()) {
  56. *this->status = s;
  57. }
  58. }
  59. };
  60. };
  61. // The secondary instance shares access to the storage as the primary.
  62. // The secondary is able to read and replay changes described in both the
  63. // MANIFEST and the WAL files without coordination with the primary.
  64. // The secondary instance can be opened using `DB::OpenAsSecondary`. After
  65. // that, it can call `DBImplSecondary::TryCatchUpWithPrimary` to make best
  66. // effort attempts to catch up with the primary.
  67. // TODO: Share common structure with CompactedDBImpl and DBImplReadOnly
  68. class DBImplSecondary : public DBImpl {
  69. public:
  70. DBImplSecondary(const DBOptions& options, const std::string& dbname,
  71. std::string secondary_path);
  72. ~DBImplSecondary() override;
  73. // Recover by replaying MANIFEST and WAL. Also initialize manifest_reader_
  74. // and log_readers_ to facilitate future operations.
  75. Status Recover(const std::vector<ColumnFamilyDescriptor>& column_families,
  76. bool read_only, bool error_if_wal_file_exists,
  77. bool error_if_data_exists_in_wals, bool is_retry = false,
  78. uint64_t* = nullptr, RecoveryContext* recovery_ctx = nullptr,
  79. bool* can_retry = nullptr) override;
  80. // Can return IOError due to files being deleted by the primary. To avoid
  81. // IOError in this case, application can coordinate between primary and
  82. // secondaries so that primary will not delete files that are currently being
  83. // used by the secondaries. The application can also provide a custom FS/Env
  84. // implementation so that files will remain present until all primary and
  85. // secondaries indicate that they can be deleted. As a partial hacky
  86. // workaround, the secondaries can be opened with `max_open_files=-1` so that
  87. // it eagerly keeps all talbe files open and is able to access the contents of
  88. // deleted files via prior open fd.
  89. using DBImpl::GetImpl;
  90. Status GetImpl(const ReadOptions& options, const Slice& key,
  91. GetImplOptions& get_impl_options) override;
  92. using DBImpl::NewIterator;
  93. // Operations on the created iterators can return IOError due to files being
  94. // deleted by the primary. To avoid IOError in this case, application can
  95. // coordinate between primary and secondaries so that primary will not delete
  96. // files that are currently being used by the secondaries. The application can
  97. // also provide a custom FS/Env implementation so that files will remain
  98. // present until all primary and secondaries indicate that they can be
  99. // deleted. As a partial hacky workaround, the secondaries can be opened with
  100. // `max_open_files=-1` so that it eagerly keeps all talbe files open and is
  101. // able to access the contents of deleted files via prior open fd.
  102. Iterator* NewIterator(const ReadOptions& _read_options,
  103. ColumnFamilyHandle* column_family) override;
  104. ArenaWrappedDBIter* NewIteratorImpl(const ReadOptions& read_options,
  105. ColumnFamilyHandleImpl* cfh,
  106. SuperVersion* sv, SequenceNumber snapshot,
  107. ReadCallback* read_callback,
  108. bool expose_blob_index = false,
  109. bool allow_refresh = true);
  110. Status NewIterators(const ReadOptions& _read_options,
  111. const std::vector<ColumnFamilyHandle*>& column_families,
  112. std::vector<Iterator*>* iterators) override;
  113. using DBImpl::Put;
  114. Status Put(const WriteOptions& /*options*/,
  115. ColumnFamilyHandle* /*column_family*/, const Slice& /*key*/,
  116. const Slice& /*value*/) override {
  117. return Status::NotSupported("Not supported operation in secondary mode.");
  118. }
  119. using DBImpl::PutEntity;
  120. Status PutEntity(const WriteOptions& /* options */,
  121. ColumnFamilyHandle* /* column_family */,
  122. const Slice& /* key */,
  123. const WideColumns& /* columns */) override {
  124. return Status::NotSupported("Not supported operation in secondary mode.");
  125. }
  126. Status PutEntity(const WriteOptions& /* options */, const Slice& /* key */,
  127. const AttributeGroups& /* attribute_groups */) override {
  128. return Status::NotSupported("Not supported operation in secondary mode.");
  129. }
  130. using DBImpl::Merge;
  131. Status Merge(const WriteOptions& /*options*/,
  132. ColumnFamilyHandle* /*column_family*/, const Slice& /*key*/,
  133. const Slice& /*value*/) override {
  134. return Status::NotSupported("Not supported operation in secondary mode.");
  135. }
  136. using DBImpl::Delete;
  137. Status Delete(const WriteOptions& /*options*/,
  138. ColumnFamilyHandle* /*column_family*/,
  139. const Slice& /*key*/) override {
  140. return Status::NotSupported("Not supported operation in secondary mode.");
  141. }
  142. using DBImpl::SingleDelete;
  143. Status SingleDelete(const WriteOptions& /*options*/,
  144. ColumnFamilyHandle* /*column_family*/,
  145. const Slice& /*key*/) override {
  146. return Status::NotSupported("Not supported operation in secondary mode.");
  147. }
  148. Status Write(const WriteOptions& /*options*/,
  149. WriteBatch* /*updates*/) override {
  150. return Status::NotSupported("Not supported operation in secondary mode.");
  151. }
  152. using DBImpl::CompactRange;
  153. Status CompactRange(const CompactRangeOptions& /*options*/,
  154. ColumnFamilyHandle* /*column_family*/,
  155. const Slice* /*begin*/, const Slice* /*end*/) override {
  156. return Status::NotSupported("Not supported operation in secondary mode.");
  157. }
  158. using DBImpl::CompactFiles;
  159. Status CompactFiles(
  160. const CompactionOptions& /*compact_options*/,
  161. ColumnFamilyHandle* /*column_family*/,
  162. const std::vector<std::string>& /*input_file_names*/,
  163. const int /*output_level*/, const int /*output_path_id*/ = -1,
  164. std::vector<std::string>* const /*output_file_names*/ = nullptr,
  165. CompactionJobInfo* /*compaction_job_info*/ = nullptr) override {
  166. return Status::NotSupported("Not supported operation in secondary mode.");
  167. }
  168. Status DisableFileDeletions() override {
  169. return Status::NotSupported("Not supported operation in secondary mode.");
  170. }
  171. Status EnableFileDeletions() override {
  172. return Status::NotSupported("Not supported operation in secondary mode.");
  173. }
  174. Status GetLiveFiles(std::vector<std::string>&,
  175. uint64_t* /*manifest_file_size*/,
  176. bool /*flush_memtable*/ = true) override {
  177. return Status::NotSupported("Not supported operation in secondary mode.");
  178. }
  179. using DBImpl::Flush;
  180. Status Flush(const FlushOptions& /*options*/,
  181. ColumnFamilyHandle* /*column_family*/) override {
  182. return Status::NotSupported("Not supported operation in secondary mode.");
  183. }
  184. using DBImpl::SetDBOptions;
  185. Status SetDBOptions(const std::unordered_map<std::string, std::string>&
  186. /*options_map*/) override {
  187. // Currently not supported because changing certain options may cause
  188. // flush/compaction.
  189. return Status::NotSupported("Not supported operation in secondary mode.");
  190. }
  191. using DBImpl::SetOptions;
  192. Status SetOptions(
  193. ColumnFamilyHandle* /*cfd*/,
  194. const std::unordered_map<std::string, std::string>& /*options_map*/)
  195. override {
  196. // Currently not supported because changing certain options may cause
  197. // flush/compaction and/or write to MANIFEST.
  198. return Status::NotSupported("Not supported operation in secondary mode.");
  199. }
  200. using DBImpl::SyncWAL;
  201. Status SyncWAL() override {
  202. return Status::NotSupported("Not supported operation in secondary mode.");
  203. }
  204. using DB::IngestExternalFile;
  205. Status IngestExternalFile(
  206. ColumnFamilyHandle* /*column_family*/,
  207. const std::vector<std::string>& /*external_files*/,
  208. const IngestExternalFileOptions& /*ingestion_options*/) override {
  209. return Status::NotSupported("Not supported operation in secondary mode.");
  210. }
  211. // Try to catch up with the primary by reading as much as possible from the
  212. // log files until there is nothing more to read or encounters an error. If
  213. // the amount of information in the log files to process is huge, this
  214. // method can take long time due to all the I/O and CPU costs.
  215. Status TryCatchUpWithPrimary() override;
  216. // Try to find log reader using log_number from log_readers_ map, initialize
  217. // if it doesn't exist
  218. Status MaybeInitLogReader(uint64_t log_number,
  219. log::FragmentBufferedReader** log_reader);
  220. #ifndef NDEBUG
  221. Status TEST_CompactWithoutInstallation(const OpenAndCompactOptions& options,
  222. ColumnFamilyHandle* cfh,
  223. const CompactionServiceInput& input,
  224. CompactionServiceResult* result) {
  225. return CompactWithoutInstallation(options, cfh, input, result);
  226. }
  227. #endif // NDEBUG
  228. protected:
  229. Status FlushForGetLiveFiles() override {
  230. // No-op for read-only DB
  231. return Status::OK();
  232. }
  233. bool OwnTablesAndLogs() const override {
  234. // Currently, the secondary instance does not own the database files. It
  235. // simply opens the files of the primary instance and tracks their file
  236. // descriptors until they become obsolete. In the future, the secondary may
  237. // create links to database files. OwnTablesAndLogs will return true then.
  238. return false;
  239. }
  240. std::unique_ptr<log::FragmentBufferedReader> manifest_reader_;
  241. std::unique_ptr<log::Reader::Reporter> manifest_reporter_;
  242. std::unique_ptr<Status> manifest_reader_status_;
  243. private:
  244. friend class DB;
  245. // No copying allowed
  246. DBImplSecondary(const DBImplSecondary&);
  247. void operator=(const DBImplSecondary&);
  248. using DBImpl::Recover;
  249. Status FindAndRecoverLogFiles(
  250. std::unordered_set<ColumnFamilyData*>* cfds_changed,
  251. JobContext* job_context);
  252. Status FindNewLogNumbers(std::vector<uint64_t>* logs);
  253. // After manifest recovery, replay WALs and refresh log_readers_ if necessary
  254. // REQUIRES: log_numbers are sorted in ascending order
  255. Status RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
  256. SequenceNumber* next_sequence,
  257. std::unordered_set<ColumnFamilyData*>* cfds_changed,
  258. JobContext* job_context);
  259. // Run compaction without installation, the output files will be placed in the
  260. // secondary DB path. The LSM tree won't be changed, the secondary DB is still
  261. // in read-only mode.
  262. Status CompactWithoutInstallation(const OpenAndCompactOptions& options,
  263. ColumnFamilyHandle* cfh,
  264. const CompactionServiceInput& input,
  265. CompactionServiceResult* result);
  266. private:
  267. // Holds results of compaction progress files and output files from a single
  268. // directory scan
  269. struct CompactionProgressFilesScan {
  270. // The latest (newest) progress file filename
  271. std::optional<std::string> latest_progress_filename;
  272. uint64_t latest_progress_timestamp = 0;
  273. // Older progress file filenames (to be deleted)
  274. autovector<std::string> old_progress_filenames;
  275. // Temporary progress file filenames (to be deleted)
  276. autovector<std::string> temp_progress_filenames;
  277. // All output file numbers - for cleanup optimization
  278. std::vector<uint64_t> table_file_numbers;
  279. bool HasLatestProgressFile() const {
  280. return latest_progress_filename.has_value();
  281. }
  282. void Clear() {
  283. latest_progress_filename.reset();
  284. latest_progress_timestamp = 0;
  285. old_progress_filenames.clear();
  286. temp_progress_filenames.clear();
  287. table_file_numbers.clear();
  288. }
  289. };
  290. Status InitializeCompactionWorkspace(
  291. bool allow_resumption, std::unique_ptr<FSDirectory>* output_dir,
  292. std::unique_ptr<log::Writer>* compaction_progress_writer);
  293. Status PrepareCompactionProgressState();
  294. Status ScanCompactionProgressFiles(CompactionProgressFilesScan* scan_result);
  295. Status DeleteCompactionProgressFiles(
  296. const std::vector<std::string>& filenames);
  297. Status CleanupOldAndTemporaryCompactionProgressFiles(
  298. bool preserve_latest, const CompactionProgressFilesScan& scan_result);
  299. Status LoadCompactionProgressAndCleanupExtraOutputFiles(
  300. const std::string& compaction_progress_file_path,
  301. const CompactionProgressFilesScan& scan_result);
  302. Status ParseCompactionProgressFile(
  303. const std::string& compaction_progress_file_path,
  304. CompactionProgress* compaction_progress);
  305. Status HandleInvalidOrNoCompactionProgress(
  306. const std::optional<std::string>& compaction_progress_file_path,
  307. const CompactionProgressFilesScan& scan_result);
  308. Status CleanupPhysicalCompactionOutputFiles(
  309. bool preserve_tracked_files,
  310. const CompactionProgressFilesScan& scan_result);
  311. Status FinalizeCompactionProgressWriter(
  312. std::unique_ptr<log::Writer>* compaction_progress_writer);
  313. Status CreateCompactionProgressWriter(
  314. const std::string& file_path,
  315. std::unique_ptr<log::Writer>* compaction_progress_writer);
  316. Status PersistInitialCompactionProgress(
  317. log::Writer* compaction_progress_writer,
  318. const CompactionProgress& compaction_progress);
  319. Status RenameCompactionProgressFile(const std::string& temp_file_path,
  320. std::string* final_file_path);
  321. Status HandleCompactionProgressWriterCreationFailure(
  322. const std::string& temp_file_path, const std::string& final_file_path,
  323. std::unique_ptr<log::Writer>* compaction_progress_writer);
  324. uint64_t CalculateResumedCompactionBytes(
  325. const CompactionProgress& compaction_progress) const;
  326. // Cache log readers for each log number, used for continue WAL replay
  327. // after recovery
  328. std::map<uint64_t, std::unique_ptr<LogReaderContainer>> log_readers_;
  329. // Current WAL number replayed for each column family.
  330. std::unordered_map<ColumnFamilyData*, uint64_t> cfd_to_current_log_;
  331. const std::string secondary_path_;
  332. CompactionProgress compaction_progress_;
  333. };
  334. } // namespace ROCKSDB_NAMESPACE