backupable_db.cc 70 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  9. #ifndef ROCKSDB_LITE
  10. #include <stdlib.h>
  11. #include <algorithm>
  12. #include <atomic>
  13. #include <cinttypes>
  14. #include <functional>
  15. #include <future>
  16. #include <limits>
  17. #include <map>
  18. #include <mutex>
  19. #include <sstream>
  20. #include <string>
  21. #include <thread>
  22. #include <unordered_map>
  23. #include <unordered_set>
  24. #include <vector>
  25. #include "env/composite_env_wrapper.h"
  26. #include "file/filename.h"
  27. #include "file/sequence_file_reader.h"
  28. #include "file/writable_file_writer.h"
  29. #include "logging/logging.h"
  30. #include "port/port.h"
  31. #include "rocksdb/rate_limiter.h"
  32. #include "rocksdb/transaction_log.h"
  33. #include "rocksdb/utilities/backupable_db.h"
  34. #include "test_util/sync_point.h"
  35. #include "util/channel.h"
  36. #include "util/coding.h"
  37. #include "util/crc32c.h"
  38. #include "util/string_util.h"
  39. #include "utilities/checkpoint/checkpoint_impl.h"
  40. namespace ROCKSDB_NAMESPACE {
  41. void BackupStatistics::IncrementNumberSuccessBackup() {
  42. number_success_backup++;
  43. }
  44. void BackupStatistics::IncrementNumberFailBackup() {
  45. number_fail_backup++;
  46. }
  47. uint32_t BackupStatistics::GetNumberSuccessBackup() const {
  48. return number_success_backup;
  49. }
  50. uint32_t BackupStatistics::GetNumberFailBackup() const {
  51. return number_fail_backup;
  52. }
  53. std::string BackupStatistics::ToString() const {
  54. char result[50];
  55. snprintf(result, sizeof(result), "# success backup: %u, # fail backup: %u",
  56. GetNumberSuccessBackup(), GetNumberFailBackup());
  57. return result;
  58. }
  59. void BackupableDBOptions::Dump(Logger* logger) const {
  60. ROCKS_LOG_INFO(logger, " Options.backup_dir: %s",
  61. backup_dir.c_str());
  62. ROCKS_LOG_INFO(logger, " Options.backup_env: %p", backup_env);
  63. ROCKS_LOG_INFO(logger, " Options.share_table_files: %d",
  64. static_cast<int>(share_table_files));
  65. ROCKS_LOG_INFO(logger, " Options.info_log: %p", info_log);
  66. ROCKS_LOG_INFO(logger, " Options.sync: %d",
  67. static_cast<int>(sync));
  68. ROCKS_LOG_INFO(logger, " Options.destroy_old_data: %d",
  69. static_cast<int>(destroy_old_data));
  70. ROCKS_LOG_INFO(logger, " Options.backup_log_files: %d",
  71. static_cast<int>(backup_log_files));
  72. ROCKS_LOG_INFO(logger, " Options.backup_rate_limit: %" PRIu64,
  73. backup_rate_limit);
  74. ROCKS_LOG_INFO(logger, " Options.restore_rate_limit: %" PRIu64,
  75. restore_rate_limit);
  76. ROCKS_LOG_INFO(logger, "Options.max_background_operations: %d",
  77. max_background_operations);
  78. }
  79. // -------- BackupEngineImpl class ---------
  80. class BackupEngineImpl : public BackupEngine {
  81. public:
  82. BackupEngineImpl(Env* db_env, const BackupableDBOptions& options,
  83. bool read_only = false);
  84. ~BackupEngineImpl() override;
  85. Status CreateNewBackupWithMetadata(DB* db, const std::string& app_metadata,
  86. bool flush_before_backup = false,
  87. std::function<void()> progress_callback =
  88. []() {}) override;
  89. Status PurgeOldBackups(uint32_t num_backups_to_keep) override;
  90. Status DeleteBackup(BackupID backup_id) override;
  91. void StopBackup() override {
  92. stop_backup_.store(true, std::memory_order_release);
  93. }
  94. Status GarbageCollect() override;
  95. // The returned BackupInfos are in chronological order, which means the
  96. // latest backup comes last.
  97. void GetBackupInfo(std::vector<BackupInfo>* backup_info) override;
  98. void GetCorruptedBackups(std::vector<BackupID>* corrupt_backup_ids) override;
  99. Status RestoreDBFromBackup(
  100. BackupID backup_id, const std::string& db_dir, const std::string& wal_dir,
  101. const RestoreOptions& restore_options = RestoreOptions()) override;
  102. Status RestoreDBFromLatestBackup(
  103. const std::string& db_dir, const std::string& wal_dir,
  104. const RestoreOptions& restore_options = RestoreOptions()) override {
  105. return RestoreDBFromBackup(latest_valid_backup_id_, db_dir, wal_dir,
  106. restore_options);
  107. }
  108. Status VerifyBackup(BackupID backup_id) override;
  109. Status Initialize();
  110. private:
  111. void DeleteChildren(const std::string& dir, uint32_t file_type_filter = 0);
  112. Status DeleteBackupInternal(BackupID backup_id);
  113. // Extends the "result" map with pathname->size mappings for the contents of
  114. // "dir" in "env". Pathnames are prefixed with "dir".
  115. Status InsertPathnameToSizeBytes(
  116. const std::string& dir, Env* env,
  117. std::unordered_map<std::string, uint64_t>* result);
  118. struct FileInfo {
  119. FileInfo(const std::string& fname, uint64_t sz, uint32_t checksum)
  120. : refs(0), filename(fname), size(sz), checksum_value(checksum) {}
  121. FileInfo(const FileInfo&) = delete;
  122. FileInfo& operator=(const FileInfo&) = delete;
  123. int refs;
  124. const std::string filename;
  125. const uint64_t size;
  126. const uint32_t checksum_value;
  127. };
  128. class BackupMeta {
  129. public:
  130. BackupMeta(
  131. const std::string& meta_filename, const std::string& meta_tmp_filename,
  132. std::unordered_map<std::string, std::shared_ptr<FileInfo>>* file_infos,
  133. Env* env)
  134. : timestamp_(0),
  135. sequence_number_(0),
  136. size_(0),
  137. meta_filename_(meta_filename),
  138. meta_tmp_filename_(meta_tmp_filename),
  139. file_infos_(file_infos),
  140. env_(env) {}
  141. BackupMeta(const BackupMeta&) = delete;
  142. BackupMeta& operator=(const BackupMeta&) = delete;
  143. ~BackupMeta() {}
  144. void RecordTimestamp() {
  145. env_->GetCurrentTime(&timestamp_);
  146. }
  147. int64_t GetTimestamp() const {
  148. return timestamp_;
  149. }
  150. uint64_t GetSize() const {
  151. return size_;
  152. }
  153. uint32_t GetNumberFiles() { return static_cast<uint32_t>(files_.size()); }
  154. void SetSequenceNumber(uint64_t sequence_number) {
  155. sequence_number_ = sequence_number;
  156. }
  157. uint64_t GetSequenceNumber() {
  158. return sequence_number_;
  159. }
  160. const std::string& GetAppMetadata() const { return app_metadata_; }
  161. void SetAppMetadata(const std::string& app_metadata) {
  162. app_metadata_ = app_metadata;
  163. }
  164. Status AddFile(std::shared_ptr<FileInfo> file_info);
  165. Status Delete(bool delete_meta = true);
  166. bool Empty() {
  167. return files_.empty();
  168. }
  169. std::shared_ptr<FileInfo> GetFile(const std::string& filename) const {
  170. auto it = file_infos_->find(filename);
  171. if (it == file_infos_->end())
  172. return nullptr;
  173. return it->second;
  174. }
  175. const std::vector<std::shared_ptr<FileInfo>>& GetFiles() {
  176. return files_;
  177. }
  178. // @param abs_path_to_size Pre-fetched file sizes (bytes).
  179. Status LoadFromFile(
  180. const std::string& backup_dir,
  181. const std::unordered_map<std::string, uint64_t>& abs_path_to_size);
  182. Status StoreToFile(bool sync);
  183. std::string GetInfoString() {
  184. std::ostringstream ss;
  185. ss << "Timestamp: " << timestamp_ << std::endl;
  186. char human_size[16];
  187. AppendHumanBytes(size_, human_size, sizeof(human_size));
  188. ss << "Size: " << human_size << std::endl;
  189. ss << "Files:" << std::endl;
  190. for (const auto& file : files_) {
  191. AppendHumanBytes(file->size, human_size, sizeof(human_size));
  192. ss << file->filename << ", size " << human_size << ", refs "
  193. << file->refs << std::endl;
  194. }
  195. return ss.str();
  196. }
  197. private:
  198. int64_t timestamp_;
  199. // sequence number is only approximate, should not be used
  200. // by clients
  201. uint64_t sequence_number_;
  202. uint64_t size_;
  203. std::string app_metadata_;
  204. std::string const meta_filename_;
  205. std::string const meta_tmp_filename_;
  206. // files with relative paths (without "/" prefix!!)
  207. std::vector<std::shared_ptr<FileInfo>> files_;
  208. std::unordered_map<std::string, std::shared_ptr<FileInfo>>* file_infos_;
  209. Env* env_;
  210. static const size_t max_backup_meta_file_size_ = 10 * 1024 * 1024; // 10MB
  211. }; // BackupMeta
  212. inline std::string GetAbsolutePath(
  213. const std::string &relative_path = "") const {
  214. assert(relative_path.size() == 0 || relative_path[0] != '/');
  215. return options_.backup_dir + "/" + relative_path;
  216. }
  217. inline std::string GetPrivateDirRel() const {
  218. return "private";
  219. }
  220. inline std::string GetSharedChecksumDirRel() const {
  221. return "shared_checksum";
  222. }
  223. inline std::string GetPrivateFileRel(BackupID backup_id,
  224. bool tmp = false,
  225. const std::string& file = "") const {
  226. assert(file.size() == 0 || file[0] != '/');
  227. return GetPrivateDirRel() + "/" + ROCKSDB_NAMESPACE::ToString(backup_id) +
  228. (tmp ? ".tmp" : "") + "/" + file;
  229. }
  230. inline std::string GetSharedFileRel(const std::string& file = "",
  231. bool tmp = false) const {
  232. assert(file.size() == 0 || file[0] != '/');
  233. return std::string("shared/") + (tmp ? "." : "") + file +
  234. (tmp ? ".tmp" : "");
  235. }
  236. inline std::string GetSharedFileWithChecksumRel(const std::string& file = "",
  237. bool tmp = false) const {
  238. assert(file.size() == 0 || file[0] != '/');
  239. return GetSharedChecksumDirRel() + "/" + (tmp ? "." : "") + file +
  240. (tmp ? ".tmp" : "");
  241. }
  242. inline std::string GetSharedFileWithChecksum(const std::string& file,
  243. const uint32_t checksum_value,
  244. const uint64_t file_size) const {
  245. assert(file.size() == 0 || file[0] != '/');
  246. std::string file_copy = file;
  247. return file_copy.insert(file_copy.find_last_of('.'),
  248. "_" + ROCKSDB_NAMESPACE::ToString(checksum_value) +
  249. "_" + ROCKSDB_NAMESPACE::ToString(file_size));
  250. }
  251. inline std::string GetFileFromChecksumFile(const std::string& file) const {
  252. assert(file.size() == 0 || file[0] != '/');
  253. std::string file_copy = file;
  254. size_t first_underscore = file_copy.find_first_of('_');
  255. return file_copy.erase(first_underscore,
  256. file_copy.find_last_of('.') - first_underscore);
  257. }
  258. inline std::string GetBackupMetaDir() const {
  259. return GetAbsolutePath("meta");
  260. }
  261. inline std::string GetBackupMetaFile(BackupID backup_id, bool tmp) const {
  262. return GetBackupMetaDir() + "/" + (tmp ? "." : "") +
  263. ROCKSDB_NAMESPACE::ToString(backup_id) + (tmp ? ".tmp" : "");
  264. }
  265. // If size_limit == 0, there is no size limit, copy everything.
  266. //
  267. // Exactly one of src and contents must be non-empty.
  268. //
  269. // @param src If non-empty, the file is copied from this pathname.
  270. // @param contents If non-empty, the file will be created with these contents.
  271. Status CopyOrCreateFile(const std::string& src, const std::string& dst,
  272. const std::string& contents, Env* src_env,
  273. Env* dst_env, const EnvOptions& src_env_options,
  274. bool sync, RateLimiter* rate_limiter,
  275. uint64_t* size = nullptr,
  276. uint32_t* checksum_value = nullptr,
  277. uint64_t size_limit = 0,
  278. std::function<void()> progress_callback = []() {});
  279. Status CalculateChecksum(const std::string& src, Env* src_env,
  280. const EnvOptions& src_env_options,
  281. uint64_t size_limit, uint32_t* checksum_value);
  282. struct CopyOrCreateResult {
  283. uint64_t size;
  284. uint32_t checksum_value;
  285. Status status;
  286. };
  287. // Exactly one of src_path and contents must be non-empty. If src_path is
  288. // non-empty, the file is copied from this pathname. Otherwise, if contents is
  289. // non-empty, the file will be created at dst_path with these contents.
  290. struct CopyOrCreateWorkItem {
  291. std::string src_path;
  292. std::string dst_path;
  293. std::string contents;
  294. Env* src_env;
  295. Env* dst_env;
  296. EnvOptions src_env_options;
  297. bool sync;
  298. RateLimiter* rate_limiter;
  299. uint64_t size_limit;
  300. std::promise<CopyOrCreateResult> result;
  301. std::function<void()> progress_callback;
  302. CopyOrCreateWorkItem()
  303. : src_path(""),
  304. dst_path(""),
  305. contents(""),
  306. src_env(nullptr),
  307. dst_env(nullptr),
  308. src_env_options(),
  309. sync(false),
  310. rate_limiter(nullptr),
  311. size_limit(0) {}
  312. CopyOrCreateWorkItem(const CopyOrCreateWorkItem&) = delete;
  313. CopyOrCreateWorkItem& operator=(const CopyOrCreateWorkItem&) = delete;
  314. CopyOrCreateWorkItem(CopyOrCreateWorkItem&& o) ROCKSDB_NOEXCEPT {
  315. *this = std::move(o);
  316. }
  317. CopyOrCreateWorkItem& operator=(CopyOrCreateWorkItem&& o) ROCKSDB_NOEXCEPT {
  318. src_path = std::move(o.src_path);
  319. dst_path = std::move(o.dst_path);
  320. contents = std::move(o.contents);
  321. src_env = o.src_env;
  322. dst_env = o.dst_env;
  323. src_env_options = std::move(o.src_env_options);
  324. sync = o.sync;
  325. rate_limiter = o.rate_limiter;
  326. size_limit = o.size_limit;
  327. result = std::move(o.result);
  328. progress_callback = std::move(o.progress_callback);
  329. return *this;
  330. }
  331. CopyOrCreateWorkItem(std::string _src_path, std::string _dst_path,
  332. std::string _contents, Env* _src_env, Env* _dst_env,
  333. EnvOptions _src_env_options, bool _sync,
  334. RateLimiter* _rate_limiter, uint64_t _size_limit,
  335. std::function<void()> _progress_callback = []() {})
  336. : src_path(std::move(_src_path)),
  337. dst_path(std::move(_dst_path)),
  338. contents(std::move(_contents)),
  339. src_env(_src_env),
  340. dst_env(_dst_env),
  341. src_env_options(std::move(_src_env_options)),
  342. sync(_sync),
  343. rate_limiter(_rate_limiter),
  344. size_limit(_size_limit),
  345. progress_callback(_progress_callback) {}
  346. };
  347. struct BackupAfterCopyOrCreateWorkItem {
  348. std::future<CopyOrCreateResult> result;
  349. bool shared;
  350. bool needed_to_copy;
  351. Env* backup_env;
  352. std::string dst_path_tmp;
  353. std::string dst_path;
  354. std::string dst_relative;
  355. BackupAfterCopyOrCreateWorkItem()
  356. : shared(false),
  357. needed_to_copy(false),
  358. backup_env(nullptr),
  359. dst_path_tmp(""),
  360. dst_path(""),
  361. dst_relative("") {}
  362. BackupAfterCopyOrCreateWorkItem(BackupAfterCopyOrCreateWorkItem&& o)
  363. ROCKSDB_NOEXCEPT {
  364. *this = std::move(o);
  365. }
  366. BackupAfterCopyOrCreateWorkItem& operator=(
  367. BackupAfterCopyOrCreateWorkItem&& o) ROCKSDB_NOEXCEPT {
  368. result = std::move(o.result);
  369. shared = o.shared;
  370. needed_to_copy = o.needed_to_copy;
  371. backup_env = o.backup_env;
  372. dst_path_tmp = std::move(o.dst_path_tmp);
  373. dst_path = std::move(o.dst_path);
  374. dst_relative = std::move(o.dst_relative);
  375. return *this;
  376. }
  377. BackupAfterCopyOrCreateWorkItem(std::future<CopyOrCreateResult>&& _result,
  378. bool _shared, bool _needed_to_copy,
  379. Env* _backup_env, std::string _dst_path_tmp,
  380. std::string _dst_path,
  381. std::string _dst_relative)
  382. : result(std::move(_result)),
  383. shared(_shared),
  384. needed_to_copy(_needed_to_copy),
  385. backup_env(_backup_env),
  386. dst_path_tmp(std::move(_dst_path_tmp)),
  387. dst_path(std::move(_dst_path)),
  388. dst_relative(std::move(_dst_relative)) {}
  389. };
  390. struct RestoreAfterCopyOrCreateWorkItem {
  391. std::future<CopyOrCreateResult> result;
  392. uint32_t checksum_value;
  393. RestoreAfterCopyOrCreateWorkItem()
  394. : checksum_value(0) {}
  395. RestoreAfterCopyOrCreateWorkItem(std::future<CopyOrCreateResult>&& _result,
  396. uint32_t _checksum_value)
  397. : result(std::move(_result)), checksum_value(_checksum_value) {}
  398. RestoreAfterCopyOrCreateWorkItem(RestoreAfterCopyOrCreateWorkItem&& o)
  399. ROCKSDB_NOEXCEPT {
  400. *this = std::move(o);
  401. }
  402. RestoreAfterCopyOrCreateWorkItem& operator=(
  403. RestoreAfterCopyOrCreateWorkItem&& o) ROCKSDB_NOEXCEPT {
  404. result = std::move(o.result);
  405. checksum_value = o.checksum_value;
  406. return *this;
  407. }
  408. };
  409. bool initialized_;
  410. std::mutex byte_report_mutex_;
  411. channel<CopyOrCreateWorkItem> files_to_copy_or_create_;
  412. std::vector<port::Thread> threads_;
  413. // Certain operations like PurgeOldBackups and DeleteBackup will trigger
  414. // automatic GarbageCollect (true) unless we've already done one in this
  415. // session and have not failed to delete backup files since then (false).
  416. bool might_need_garbage_collect_ = true;
  417. // Adds a file to the backup work queue to be copied or created if it doesn't
  418. // already exist.
  419. //
  420. // Exactly one of src_dir and contents must be non-empty.
  421. //
  422. // @param src_dir If non-empty, the file in this directory named fname will be
  423. // copied.
  424. // @param fname Name of destination file and, in case of copy, source file.
  425. // @param contents If non-empty, the file will be created with these contents.
  426. Status AddBackupFileWorkItem(
  427. std::unordered_set<std::string>& live_dst_paths,
  428. std::vector<BackupAfterCopyOrCreateWorkItem>& backup_items_to_finish,
  429. BackupID backup_id, bool shared, const std::string& src_dir,
  430. const std::string& fname, // starts with "/"
  431. const EnvOptions& src_env_options, RateLimiter* rate_limiter,
  432. uint64_t size_bytes, uint64_t size_limit = 0,
  433. bool shared_checksum = false,
  434. std::function<void()> progress_callback = []() {},
  435. const std::string& contents = std::string());
  436. // backup state data
  437. BackupID latest_backup_id_;
  438. BackupID latest_valid_backup_id_;
  439. std::map<BackupID, std::unique_ptr<BackupMeta>> backups_;
  440. std::map<BackupID, std::pair<Status, std::unique_ptr<BackupMeta>>>
  441. corrupt_backups_;
  442. std::unordered_map<std::string,
  443. std::shared_ptr<FileInfo>> backuped_file_infos_;
  444. std::atomic<bool> stop_backup_;
  445. // options data
  446. BackupableDBOptions options_;
  447. Env* db_env_;
  448. Env* backup_env_;
  449. // directories
  450. std::unique_ptr<Directory> backup_directory_;
  451. std::unique_ptr<Directory> shared_directory_;
  452. std::unique_ptr<Directory> meta_directory_;
  453. std::unique_ptr<Directory> private_directory_;
  454. static const size_t kDefaultCopyFileBufferSize = 5 * 1024 * 1024LL; // 5MB
  455. size_t copy_file_buffer_size_;
  456. bool read_only_;
  457. BackupStatistics backup_statistics_;
  458. static const size_t kMaxAppMetaSize = 1024 * 1024; // 1MB
  459. };
  460. Status BackupEngine::Open(Env* env, const BackupableDBOptions& options,
  461. BackupEngine** backup_engine_ptr) {
  462. std::unique_ptr<BackupEngineImpl> backup_engine(
  463. new BackupEngineImpl(env, options));
  464. auto s = backup_engine->Initialize();
  465. if (!s.ok()) {
  466. *backup_engine_ptr = nullptr;
  467. return s;
  468. }
  469. *backup_engine_ptr = backup_engine.release();
  470. return Status::OK();
  471. }
  472. BackupEngineImpl::BackupEngineImpl(Env* db_env,
  473. const BackupableDBOptions& options,
  474. bool read_only)
  475. : initialized_(false),
  476. latest_backup_id_(0),
  477. latest_valid_backup_id_(0),
  478. stop_backup_(false),
  479. options_(options),
  480. db_env_(db_env),
  481. backup_env_(options.backup_env != nullptr ? options.backup_env : db_env_),
  482. copy_file_buffer_size_(kDefaultCopyFileBufferSize),
  483. read_only_(read_only) {
  484. if (options_.backup_rate_limiter == nullptr &&
  485. options_.backup_rate_limit > 0) {
  486. options_.backup_rate_limiter.reset(
  487. NewGenericRateLimiter(options_.backup_rate_limit));
  488. }
  489. if (options_.restore_rate_limiter == nullptr &&
  490. options_.restore_rate_limit > 0) {
  491. options_.restore_rate_limiter.reset(
  492. NewGenericRateLimiter(options_.restore_rate_limit));
  493. }
  494. }
  495. BackupEngineImpl::~BackupEngineImpl() {
  496. files_to_copy_or_create_.sendEof();
  497. for (auto& t : threads_) {
  498. t.join();
  499. }
  500. LogFlush(options_.info_log);
  501. }
  502. Status BackupEngineImpl::Initialize() {
  503. assert(!initialized_);
  504. initialized_ = true;
  505. if (read_only_) {
  506. ROCKS_LOG_INFO(options_.info_log, "Starting read_only backup engine");
  507. }
  508. options_.Dump(options_.info_log);
  509. if (!read_only_) {
  510. // we might need to clean up from previous crash or I/O errors
  511. might_need_garbage_collect_ = true;
  512. if (options_.max_valid_backups_to_open != port::kMaxInt32) {
  513. options_.max_valid_backups_to_open = port::kMaxInt32;
  514. ROCKS_LOG_WARN(
  515. options_.info_log,
  516. "`max_valid_backups_to_open` is not set to the default value. Ignoring "
  517. "its value since BackupEngine is not read-only.");
  518. }
  519. // gather the list of directories that we need to create
  520. std::vector<std::pair<std::string, std::unique_ptr<Directory>*>>
  521. directories;
  522. directories.emplace_back(GetAbsolutePath(), &backup_directory_);
  523. if (options_.share_table_files) {
  524. if (options_.share_files_with_checksum) {
  525. directories.emplace_back(
  526. GetAbsolutePath(GetSharedFileWithChecksumRel()),
  527. &shared_directory_);
  528. } else {
  529. directories.emplace_back(GetAbsolutePath(GetSharedFileRel()),
  530. &shared_directory_);
  531. }
  532. }
  533. directories.emplace_back(GetAbsolutePath(GetPrivateDirRel()),
  534. &private_directory_);
  535. directories.emplace_back(GetBackupMetaDir(), &meta_directory_);
  536. // create all the dirs we need
  537. for (const auto& d : directories) {
  538. auto s = backup_env_->CreateDirIfMissing(d.first);
  539. if (s.ok()) {
  540. s = backup_env_->NewDirectory(d.first, d.second);
  541. }
  542. if (!s.ok()) {
  543. return s;
  544. }
  545. }
  546. }
  547. std::vector<std::string> backup_meta_files;
  548. {
  549. auto s = backup_env_->GetChildren(GetBackupMetaDir(), &backup_meta_files);
  550. if (s.IsNotFound()) {
  551. return Status::NotFound(GetBackupMetaDir() + " is missing");
  552. } else if (!s.ok()) {
  553. return s;
  554. }
  555. }
  556. // create backups_ structure
  557. for (auto& file : backup_meta_files) {
  558. if (file == "." || file == "..") {
  559. continue;
  560. }
  561. ROCKS_LOG_INFO(options_.info_log, "Detected backup %s", file.c_str());
  562. BackupID backup_id = 0;
  563. sscanf(file.c_str(), "%u", &backup_id);
  564. if (backup_id == 0 || file != ROCKSDB_NAMESPACE::ToString(backup_id)) {
  565. if (!read_only_) {
  566. // invalid file name, delete that
  567. auto s = backup_env_->DeleteFile(GetBackupMetaDir() + "/" + file);
  568. ROCKS_LOG_INFO(options_.info_log,
  569. "Unrecognized meta file %s, deleting -- %s",
  570. file.c_str(), s.ToString().c_str());
  571. }
  572. continue;
  573. }
  574. assert(backups_.find(backup_id) == backups_.end());
  575. backups_.insert(std::make_pair(
  576. backup_id, std::unique_ptr<BackupMeta>(new BackupMeta(
  577. GetBackupMetaFile(backup_id, false /* tmp */),
  578. GetBackupMetaFile(backup_id, true /* tmp */),
  579. &backuped_file_infos_, backup_env_))));
  580. }
  581. latest_backup_id_ = 0;
  582. latest_valid_backup_id_ = 0;
  583. if (options_.destroy_old_data) { // Destroy old data
  584. assert(!read_only_);
  585. ROCKS_LOG_INFO(
  586. options_.info_log,
  587. "Backup Engine started with destroy_old_data == true, deleting all "
  588. "backups");
  589. auto s = PurgeOldBackups(0);
  590. if (s.ok()) {
  591. s = GarbageCollect();
  592. }
  593. if (!s.ok()) {
  594. return s;
  595. }
  596. } else { // Load data from storage
  597. std::unordered_map<std::string, uint64_t> abs_path_to_size;
  598. for (const auto& rel_dir :
  599. {GetSharedFileRel(), GetSharedFileWithChecksumRel()}) {
  600. const auto abs_dir = GetAbsolutePath(rel_dir);
  601. InsertPathnameToSizeBytes(abs_dir, backup_env_, &abs_path_to_size);
  602. }
  603. // load the backups if any, until valid_backups_to_open of the latest
  604. // non-corrupted backups have been successfully opened.
  605. int valid_backups_to_open = options_.max_valid_backups_to_open;
  606. for (auto backup_iter = backups_.rbegin();
  607. backup_iter != backups_.rend();
  608. ++backup_iter) {
  609. assert(latest_backup_id_ == 0 || latest_backup_id_ > backup_iter->first);
  610. if (latest_backup_id_ == 0) {
  611. latest_backup_id_ = backup_iter->first;
  612. }
  613. if (valid_backups_to_open == 0) {
  614. break;
  615. }
  616. InsertPathnameToSizeBytes(
  617. GetAbsolutePath(GetPrivateFileRel(backup_iter->first)), backup_env_,
  618. &abs_path_to_size);
  619. Status s = backup_iter->second->LoadFromFile(options_.backup_dir,
  620. abs_path_to_size);
  621. if (s.IsCorruption()) {
  622. ROCKS_LOG_INFO(options_.info_log, "Backup %u corrupted -- %s",
  623. backup_iter->first, s.ToString().c_str());
  624. corrupt_backups_.insert(
  625. std::make_pair(backup_iter->first,
  626. std::make_pair(s, std::move(backup_iter->second))));
  627. } else if (!s.ok()) {
  628. // Distinguish corruption errors from errors in the backup Env.
  629. // Errors in the backup Env (i.e., this code path) will cause Open() to
  630. // fail, whereas corruption errors would not cause Open() failures.
  631. return s;
  632. } else {
  633. ROCKS_LOG_INFO(options_.info_log, "Loading backup %" PRIu32 " OK:\n%s",
  634. backup_iter->first,
  635. backup_iter->second->GetInfoString().c_str());
  636. assert(latest_valid_backup_id_ == 0 ||
  637. latest_valid_backup_id_ > backup_iter->first);
  638. if (latest_valid_backup_id_ == 0) {
  639. latest_valid_backup_id_ = backup_iter->first;
  640. }
  641. --valid_backups_to_open;
  642. }
  643. }
  644. for (const auto& corrupt : corrupt_backups_) {
  645. backups_.erase(backups_.find(corrupt.first));
  646. }
  647. // erase the backups before max_valid_backups_to_open
  648. int num_unopened_backups;
  649. if (options_.max_valid_backups_to_open == 0) {
  650. num_unopened_backups = 0;
  651. } else {
  652. num_unopened_backups =
  653. std::max(0, static_cast<int>(backups_.size()) -
  654. options_.max_valid_backups_to_open);
  655. }
  656. for (int i = 0; i < num_unopened_backups; ++i) {
  657. assert(backups_.begin()->second->Empty());
  658. backups_.erase(backups_.begin());
  659. }
  660. }
  661. ROCKS_LOG_INFO(options_.info_log, "Latest backup is %u", latest_backup_id_);
  662. ROCKS_LOG_INFO(options_.info_log, "Latest valid backup is %u",
  663. latest_valid_backup_id_);
  664. // set up threads perform copies from files_to_copy_or_create_ in the
  665. // background
  666. for (int t = 0; t < options_.max_background_operations; t++) {
  667. threads_.emplace_back([this]() {
  668. #if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ)
  669. #if __GLIBC_PREREQ(2, 12)
  670. pthread_setname_np(pthread_self(), "backup_engine");
  671. #endif
  672. #endif
  673. CopyOrCreateWorkItem work_item;
  674. while (files_to_copy_or_create_.read(work_item)) {
  675. CopyOrCreateResult result;
  676. result.status = CopyOrCreateFile(
  677. work_item.src_path, work_item.dst_path, work_item.contents,
  678. work_item.src_env, work_item.dst_env, work_item.src_env_options,
  679. work_item.sync, work_item.rate_limiter, &result.size,
  680. &result.checksum_value, work_item.size_limit,
  681. work_item.progress_callback);
  682. work_item.result.set_value(std::move(result));
  683. }
  684. });
  685. }
  686. ROCKS_LOG_INFO(options_.info_log, "Initialized BackupEngine");
  687. return Status::OK();
  688. }
  689. Status BackupEngineImpl::CreateNewBackupWithMetadata(
  690. DB* db, const std::string& app_metadata, bool flush_before_backup,
  691. std::function<void()> progress_callback) {
  692. assert(initialized_);
  693. assert(!read_only_);
  694. if (app_metadata.size() > kMaxAppMetaSize) {
  695. return Status::InvalidArgument("App metadata too large");
  696. }
  697. BackupID new_backup_id = latest_backup_id_ + 1;
  698. assert(backups_.find(new_backup_id) == backups_.end());
  699. auto private_dir = GetAbsolutePath(GetPrivateFileRel(new_backup_id));
  700. Status s = backup_env_->FileExists(private_dir);
  701. if (s.ok()) {
  702. // maybe last backup failed and left partial state behind, clean it up.
  703. // need to do this before updating backups_ such that a private dir
  704. // named after new_backup_id will be cleaned up.
  705. // (If an incomplete new backup is followed by an incomplete delete
  706. // of the latest full backup, then there could be more than one next
  707. // id with a private dir, the last thing to be deleted in delete
  708. // backup, but all will be cleaned up with a GarbageCollect.)
  709. s = GarbageCollect();
  710. } else if (s.IsNotFound()) {
  711. // normal case, the new backup's private dir doesn't exist yet
  712. s = Status::OK();
  713. }
  714. auto ret = backups_.insert(std::make_pair(
  715. new_backup_id, std::unique_ptr<BackupMeta>(new BackupMeta(
  716. GetBackupMetaFile(new_backup_id, false /* tmp */),
  717. GetBackupMetaFile(new_backup_id, true /* tmp */),
  718. &backuped_file_infos_, backup_env_))));
  719. assert(ret.second == true);
  720. auto& new_backup = ret.first->second;
  721. new_backup->RecordTimestamp();
  722. new_backup->SetAppMetadata(app_metadata);
  723. auto start_backup = backup_env_->NowMicros();
  724. ROCKS_LOG_INFO(options_.info_log,
  725. "Started the backup process -- creating backup %u",
  726. new_backup_id);
  727. if (s.ok()) {
  728. s = backup_env_->CreateDir(private_dir);
  729. }
  730. RateLimiter* rate_limiter = options_.backup_rate_limiter.get();
  731. if (rate_limiter) {
  732. copy_file_buffer_size_ = static_cast<size_t>(rate_limiter->GetSingleBurstBytes());
  733. }
  734. // A set into which we will insert the dst_paths that are calculated for live
  735. // files and live WAL files.
  736. // This is used to check whether a live files shares a dst_path with another
  737. // live file.
  738. std::unordered_set<std::string> live_dst_paths;
  739. std::vector<BackupAfterCopyOrCreateWorkItem> backup_items_to_finish;
  740. // Add a CopyOrCreateWorkItem to the channel for each live file
  741. db->DisableFileDeletions();
  742. if (s.ok()) {
  743. CheckpointImpl checkpoint(db);
  744. uint64_t sequence_number = 0;
  745. DBOptions db_options = db->GetDBOptions();
  746. EnvOptions src_raw_env_options(db_options);
  747. s = checkpoint.CreateCustomCheckpoint(
  748. db_options,
  749. [&](const std::string& /*src_dirname*/, const std::string& /*fname*/,
  750. FileType) {
  751. // custom checkpoint will switch to calling copy_file_cb after it sees
  752. // NotSupported returned from link_file_cb.
  753. return Status::NotSupported();
  754. } /* link_file_cb */,
  755. [&](const std::string& src_dirname, const std::string& fname,
  756. uint64_t size_limit_bytes, FileType type) {
  757. if (type == kLogFile && !options_.backup_log_files) {
  758. return Status::OK();
  759. }
  760. Log(options_.info_log, "add file for backup %s", fname.c_str());
  761. uint64_t size_bytes = 0;
  762. Status st;
  763. if (type == kTableFile) {
  764. st = db_env_->GetFileSize(src_dirname + fname, &size_bytes);
  765. }
  766. EnvOptions src_env_options;
  767. switch (type) {
  768. case kLogFile:
  769. src_env_options =
  770. db_env_->OptimizeForLogRead(src_raw_env_options);
  771. break;
  772. case kTableFile:
  773. src_env_options = db_env_->OptimizeForCompactionTableRead(
  774. src_raw_env_options, ImmutableDBOptions(db_options));
  775. break;
  776. case kDescriptorFile:
  777. src_env_options =
  778. db_env_->OptimizeForManifestRead(src_raw_env_options);
  779. break;
  780. default:
  781. // Other backed up files (like options file) are not read by live
  782. // DB, so don't need to worry about avoiding mixing buffered and
  783. // direct I/O. Just use plain defaults.
  784. src_env_options = src_raw_env_options;
  785. break;
  786. }
  787. if (st.ok()) {
  788. st = AddBackupFileWorkItem(
  789. live_dst_paths, backup_items_to_finish, new_backup_id,
  790. options_.share_table_files && type == kTableFile, src_dirname,
  791. fname, src_env_options, rate_limiter, size_bytes,
  792. size_limit_bytes,
  793. options_.share_files_with_checksum && type == kTableFile,
  794. progress_callback);
  795. }
  796. return st;
  797. } /* copy_file_cb */,
  798. [&](const std::string& fname, const std::string& contents, FileType) {
  799. Log(options_.info_log, "add file for backup %s", fname.c_str());
  800. return AddBackupFileWorkItem(
  801. live_dst_paths, backup_items_to_finish, new_backup_id,
  802. false /* shared */, "" /* src_dir */, fname,
  803. EnvOptions() /* src_env_options */, rate_limiter, contents.size(),
  804. 0 /* size_limit */, false /* shared_checksum */,
  805. progress_callback, contents);
  806. } /* create_file_cb */,
  807. &sequence_number, flush_before_backup ? 0 : port::kMaxUint64);
  808. if (s.ok()) {
  809. new_backup->SetSequenceNumber(sequence_number);
  810. }
  811. }
  812. ROCKS_LOG_INFO(options_.info_log, "add files for backup done, wait finish.");
  813. Status item_status;
  814. for (auto& item : backup_items_to_finish) {
  815. item.result.wait();
  816. auto result = item.result.get();
  817. item_status = result.status;
  818. if (item_status.ok() && item.shared && item.needed_to_copy) {
  819. item_status = item.backup_env->RenameFile(item.dst_path_tmp,
  820. item.dst_path);
  821. }
  822. if (item_status.ok()) {
  823. item_status = new_backup.get()->AddFile(
  824. std::make_shared<FileInfo>(item.dst_relative,
  825. result.size,
  826. result.checksum_value));
  827. }
  828. if (!item_status.ok()) {
  829. s = item_status;
  830. }
  831. }
  832. // we copied all the files, enable file deletions
  833. db->EnableFileDeletions(false);
  834. auto backup_time = backup_env_->NowMicros() - start_backup;
  835. if (s.ok()) {
  836. // persist the backup metadata on the disk
  837. s = new_backup->StoreToFile(options_.sync);
  838. }
  839. if (s.ok() && options_.sync) {
  840. std::unique_ptr<Directory> backup_private_directory;
  841. backup_env_->NewDirectory(
  842. GetAbsolutePath(GetPrivateFileRel(new_backup_id, false)),
  843. &backup_private_directory);
  844. if (backup_private_directory != nullptr) {
  845. s = backup_private_directory->Fsync();
  846. }
  847. if (s.ok() && private_directory_ != nullptr) {
  848. s = private_directory_->Fsync();
  849. }
  850. if (s.ok() && meta_directory_ != nullptr) {
  851. s = meta_directory_->Fsync();
  852. }
  853. if (s.ok() && shared_directory_ != nullptr) {
  854. s = shared_directory_->Fsync();
  855. }
  856. if (s.ok() && backup_directory_ != nullptr) {
  857. s = backup_directory_->Fsync();
  858. }
  859. }
  860. if (s.ok()) {
  861. backup_statistics_.IncrementNumberSuccessBackup();
  862. }
  863. if (!s.ok()) {
  864. backup_statistics_.IncrementNumberFailBackup();
  865. // clean all the files we might have created
  866. ROCKS_LOG_INFO(options_.info_log, "Backup failed -- %s",
  867. s.ToString().c_str());
  868. ROCKS_LOG_INFO(options_.info_log, "Backup Statistics %s\n",
  869. backup_statistics_.ToString().c_str());
  870. // delete files that we might have already written
  871. might_need_garbage_collect_ = true;
  872. DeleteBackup(new_backup_id);
  873. return s;
  874. }
  875. // here we know that we succeeded and installed the new backup
  876. // in the LATEST_BACKUP file
  877. latest_backup_id_ = new_backup_id;
  878. latest_valid_backup_id_ = new_backup_id;
  879. ROCKS_LOG_INFO(options_.info_log, "Backup DONE. All is good");
  880. // backup_speed is in byte/second
  881. double backup_speed = new_backup->GetSize() / (1.048576 * backup_time);
  882. ROCKS_LOG_INFO(options_.info_log, "Backup number of files: %u",
  883. new_backup->GetNumberFiles());
  884. char human_size[16];
  885. AppendHumanBytes(new_backup->GetSize(), human_size, sizeof(human_size));
  886. ROCKS_LOG_INFO(options_.info_log, "Backup size: %s", human_size);
  887. ROCKS_LOG_INFO(options_.info_log, "Backup time: %" PRIu64 " microseconds",
  888. backup_time);
  889. ROCKS_LOG_INFO(options_.info_log, "Backup speed: %.3f MB/s", backup_speed);
  890. ROCKS_LOG_INFO(options_.info_log, "Backup Statistics %s",
  891. backup_statistics_.ToString().c_str());
  892. return s;
  893. }
  894. Status BackupEngineImpl::PurgeOldBackups(uint32_t num_backups_to_keep) {
  895. assert(initialized_);
  896. assert(!read_only_);
  897. // Best effort deletion even with errors
  898. Status overall_status = Status::OK();
  899. ROCKS_LOG_INFO(options_.info_log, "Purging old backups, keeping %u",
  900. num_backups_to_keep);
  901. std::vector<BackupID> to_delete;
  902. auto itr = backups_.begin();
  903. while ((backups_.size() - to_delete.size()) > num_backups_to_keep) {
  904. to_delete.push_back(itr->first);
  905. itr++;
  906. }
  907. for (auto backup_id : to_delete) {
  908. auto s = DeleteBackupInternal(backup_id);
  909. if (!s.ok()) {
  910. overall_status = s;
  911. }
  912. }
  913. // Clean up after any incomplete backup deletion, potentially from
  914. // earlier session.
  915. if (might_need_garbage_collect_) {
  916. auto s = GarbageCollect();
  917. if (!s.ok() && overall_status.ok()) {
  918. overall_status = s;
  919. }
  920. }
  921. return overall_status;
  922. }
  923. Status BackupEngineImpl::DeleteBackup(BackupID backup_id) {
  924. auto s1 = DeleteBackupInternal(backup_id);
  925. auto s2 = Status::OK();
  926. // Clean up after any incomplete backup deletion, potentially from
  927. // earlier session.
  928. if (might_need_garbage_collect_) {
  929. s2 = GarbageCollect();
  930. }
  931. if (!s1.ok()) {
  932. return s1;
  933. } else {
  934. return s2;
  935. }
  936. }
  937. // Does not auto-GarbageCollect
  938. Status BackupEngineImpl::DeleteBackupInternal(BackupID backup_id) {
  939. assert(initialized_);
  940. assert(!read_only_);
  941. ROCKS_LOG_INFO(options_.info_log, "Deleting backup %u", backup_id);
  942. auto backup = backups_.find(backup_id);
  943. if (backup != backups_.end()) {
  944. auto s = backup->second->Delete();
  945. if (!s.ok()) {
  946. return s;
  947. }
  948. backups_.erase(backup);
  949. } else {
  950. auto corrupt = corrupt_backups_.find(backup_id);
  951. if (corrupt == corrupt_backups_.end()) {
  952. return Status::NotFound("Backup not found");
  953. }
  954. auto s = corrupt->second.second->Delete();
  955. if (!s.ok()) {
  956. return s;
  957. }
  958. corrupt_backups_.erase(corrupt);
  959. }
  960. // After removing meta file, best effort deletion even with errors.
  961. // (Don't delete other files if we can't delete the meta file right
  962. // now.)
  963. std::vector<std::string> to_delete;
  964. for (auto& itr : backuped_file_infos_) {
  965. if (itr.second->refs == 0) {
  966. Status s = backup_env_->DeleteFile(GetAbsolutePath(itr.first));
  967. ROCKS_LOG_INFO(options_.info_log, "Deleting %s -- %s", itr.first.c_str(),
  968. s.ToString().c_str());
  969. to_delete.push_back(itr.first);
  970. if (!s.ok()) {
  971. // Trying again later might work
  972. might_need_garbage_collect_ = true;
  973. }
  974. }
  975. }
  976. for (auto& td : to_delete) {
  977. backuped_file_infos_.erase(td);
  978. }
  979. // take care of private dirs -- GarbageCollect() will take care of them
  980. // if they are not empty
  981. std::string private_dir = GetPrivateFileRel(backup_id);
  982. Status s = backup_env_->DeleteDir(GetAbsolutePath(private_dir));
  983. ROCKS_LOG_INFO(options_.info_log, "Deleting private dir %s -- %s",
  984. private_dir.c_str(), s.ToString().c_str());
  985. if (!s.ok()) {
  986. // Full gc or trying again later might work
  987. might_need_garbage_collect_ = true;
  988. }
  989. return Status::OK();
  990. }
  991. void BackupEngineImpl::GetBackupInfo(std::vector<BackupInfo>* backup_info) {
  992. assert(initialized_);
  993. backup_info->reserve(backups_.size());
  994. for (auto& backup : backups_) {
  995. if (!backup.second->Empty()) {
  996. backup_info->push_back(BackupInfo(
  997. backup.first, backup.second->GetTimestamp(), backup.second->GetSize(),
  998. backup.second->GetNumberFiles(), backup.second->GetAppMetadata()));
  999. }
  1000. }
  1001. }
  1002. void
  1003. BackupEngineImpl::GetCorruptedBackups(
  1004. std::vector<BackupID>* corrupt_backup_ids) {
  1005. assert(initialized_);
  1006. corrupt_backup_ids->reserve(corrupt_backups_.size());
  1007. for (auto& backup : corrupt_backups_) {
  1008. corrupt_backup_ids->push_back(backup.first);
  1009. }
  1010. }
  1011. Status BackupEngineImpl::RestoreDBFromBackup(
  1012. BackupID backup_id, const std::string& db_dir, const std::string& wal_dir,
  1013. const RestoreOptions& restore_options) {
  1014. assert(initialized_);
  1015. auto corrupt_itr = corrupt_backups_.find(backup_id);
  1016. if (corrupt_itr != corrupt_backups_.end()) {
  1017. return corrupt_itr->second.first;
  1018. }
  1019. auto backup_itr = backups_.find(backup_id);
  1020. if (backup_itr == backups_.end()) {
  1021. return Status::NotFound("Backup not found");
  1022. }
  1023. auto& backup = backup_itr->second;
  1024. if (backup->Empty()) {
  1025. return Status::NotFound("Backup not found");
  1026. }
  1027. ROCKS_LOG_INFO(options_.info_log, "Restoring backup id %u\n", backup_id);
  1028. ROCKS_LOG_INFO(options_.info_log, "keep_log_files: %d\n",
  1029. static_cast<int>(restore_options.keep_log_files));
  1030. // just in case. Ignore errors
  1031. db_env_->CreateDirIfMissing(db_dir);
  1032. db_env_->CreateDirIfMissing(wal_dir);
  1033. if (restore_options.keep_log_files) {
  1034. // delete files in db_dir, but keep all the log files
  1035. DeleteChildren(db_dir, 1 << kLogFile);
  1036. // move all the files from archive dir to wal_dir
  1037. std::string archive_dir = ArchivalDirectory(wal_dir);
  1038. std::vector<std::string> archive_files;
  1039. db_env_->GetChildren(archive_dir, &archive_files); // ignore errors
  1040. for (const auto& f : archive_files) {
  1041. uint64_t number;
  1042. FileType type;
  1043. bool ok = ParseFileName(f, &number, &type);
  1044. if (ok && type == kLogFile) {
  1045. ROCKS_LOG_INFO(options_.info_log,
  1046. "Moving log file from archive/ to wal_dir: %s",
  1047. f.c_str());
  1048. Status s =
  1049. db_env_->RenameFile(archive_dir + "/" + f, wal_dir + "/" + f);
  1050. if (!s.ok()) {
  1051. // if we can't move log file from archive_dir to wal_dir,
  1052. // we should fail, since it might mean data loss
  1053. return s;
  1054. }
  1055. }
  1056. }
  1057. } else {
  1058. DeleteChildren(wal_dir);
  1059. DeleteChildren(ArchivalDirectory(wal_dir));
  1060. DeleteChildren(db_dir);
  1061. }
  1062. RateLimiter* rate_limiter = options_.restore_rate_limiter.get();
  1063. if (rate_limiter) {
  1064. copy_file_buffer_size_ = static_cast<size_t>(rate_limiter->GetSingleBurstBytes());
  1065. }
  1066. Status s;
  1067. std::vector<RestoreAfterCopyOrCreateWorkItem> restore_items_to_finish;
  1068. for (const auto& file_info : backup->GetFiles()) {
  1069. const std::string &file = file_info->filename;
  1070. std::string dst;
  1071. // 1. extract the filename
  1072. size_t slash = file.find_last_of('/');
  1073. // file will either be shared/<file>, shared_checksum/<file_crc32_size>
  1074. // or private/<number>/<file>
  1075. assert(slash != std::string::npos);
  1076. dst = file.substr(slash + 1);
  1077. // if the file was in shared_checksum, extract the real file name
  1078. // in this case the file is <number>_<checksum>_<size>.<type>
  1079. if (file.substr(0, slash) == GetSharedChecksumDirRel()) {
  1080. dst = GetFileFromChecksumFile(dst);
  1081. }
  1082. // 2. find the filetype
  1083. uint64_t number;
  1084. FileType type;
  1085. bool ok = ParseFileName(dst, &number, &type);
  1086. if (!ok) {
  1087. return Status::Corruption("Backup corrupted");
  1088. }
  1089. // 3. Construct the final path
  1090. // kLogFile lives in wal_dir and all the rest live in db_dir
  1091. dst = ((type == kLogFile) ? wal_dir : db_dir) +
  1092. "/" + dst;
  1093. ROCKS_LOG_INFO(options_.info_log, "Restoring %s to %s\n", file.c_str(),
  1094. dst.c_str());
  1095. CopyOrCreateWorkItem copy_or_create_work_item(
  1096. GetAbsolutePath(file), dst, "" /* contents */, backup_env_, db_env_,
  1097. EnvOptions() /* src_env_options */, false, rate_limiter,
  1098. 0 /* size_limit */);
  1099. RestoreAfterCopyOrCreateWorkItem after_copy_or_create_work_item(
  1100. copy_or_create_work_item.result.get_future(),
  1101. file_info->checksum_value);
  1102. files_to_copy_or_create_.write(std::move(copy_or_create_work_item));
  1103. restore_items_to_finish.push_back(
  1104. std::move(after_copy_or_create_work_item));
  1105. }
  1106. Status item_status;
  1107. for (auto& item : restore_items_to_finish) {
  1108. item.result.wait();
  1109. auto result = item.result.get();
  1110. item_status = result.status;
  1111. // Note: It is possible that both of the following bad-status cases occur
  1112. // during copying. But, we only return one status.
  1113. if (!item_status.ok()) {
  1114. s = item_status;
  1115. break;
  1116. } else if (item.checksum_value != result.checksum_value) {
  1117. s = Status::Corruption("Checksum check failed");
  1118. break;
  1119. }
  1120. }
  1121. ROCKS_LOG_INFO(options_.info_log, "Restoring done -- %s\n",
  1122. s.ToString().c_str());
  1123. return s;
  1124. }
  1125. Status BackupEngineImpl::VerifyBackup(BackupID backup_id) {
  1126. assert(initialized_);
  1127. auto corrupt_itr = corrupt_backups_.find(backup_id);
  1128. if (corrupt_itr != corrupt_backups_.end()) {
  1129. return corrupt_itr->second.first;
  1130. }
  1131. auto backup_itr = backups_.find(backup_id);
  1132. if (backup_itr == backups_.end()) {
  1133. return Status::NotFound();
  1134. }
  1135. auto& backup = backup_itr->second;
  1136. if (backup->Empty()) {
  1137. return Status::NotFound();
  1138. }
  1139. ROCKS_LOG_INFO(options_.info_log, "Verifying backup id %u\n", backup_id);
  1140. std::unordered_map<std::string, uint64_t> curr_abs_path_to_size;
  1141. for (const auto& rel_dir : {GetPrivateFileRel(backup_id), GetSharedFileRel(),
  1142. GetSharedFileWithChecksumRel()}) {
  1143. const auto abs_dir = GetAbsolutePath(rel_dir);
  1144. InsertPathnameToSizeBytes(abs_dir, backup_env_, &curr_abs_path_to_size);
  1145. }
  1146. for (const auto& file_info : backup->GetFiles()) {
  1147. const auto abs_path = GetAbsolutePath(file_info->filename);
  1148. if (curr_abs_path_to_size.find(abs_path) == curr_abs_path_to_size.end()) {
  1149. return Status::NotFound("File missing: " + abs_path);
  1150. }
  1151. if (file_info->size != curr_abs_path_to_size[abs_path]) {
  1152. return Status::Corruption("File corrupted: " + abs_path);
  1153. }
  1154. }
  1155. return Status::OK();
  1156. }
  1157. Status BackupEngineImpl::CopyOrCreateFile(
  1158. const std::string& src, const std::string& dst, const std::string& contents,
  1159. Env* src_env, Env* dst_env, const EnvOptions& src_env_options, bool sync,
  1160. RateLimiter* rate_limiter, uint64_t* size, uint32_t* checksum_value,
  1161. uint64_t size_limit, std::function<void()> progress_callback) {
  1162. assert(src.empty() != contents.empty());
  1163. Status s;
  1164. std::unique_ptr<WritableFile> dst_file;
  1165. std::unique_ptr<SequentialFile> src_file;
  1166. EnvOptions dst_env_options;
  1167. dst_env_options.use_mmap_writes = false;
  1168. // TODO:(gzh) maybe use direct reads/writes here if possible
  1169. if (size != nullptr) {
  1170. *size = 0;
  1171. }
  1172. if (checksum_value != nullptr) {
  1173. *checksum_value = 0;
  1174. }
  1175. // Check if size limit is set. if not, set it to very big number
  1176. if (size_limit == 0) {
  1177. size_limit = std::numeric_limits<uint64_t>::max();
  1178. }
  1179. s = dst_env->NewWritableFile(dst, &dst_file, dst_env_options);
  1180. if (s.ok() && !src.empty()) {
  1181. s = src_env->NewSequentialFile(src, &src_file, src_env_options);
  1182. }
  1183. if (!s.ok()) {
  1184. return s;
  1185. }
  1186. std::unique_ptr<WritableFileWriter> dest_writer(new WritableFileWriter(
  1187. NewLegacyWritableFileWrapper(std::move(dst_file)), dst, dst_env_options));
  1188. std::unique_ptr<SequentialFileReader> src_reader;
  1189. std::unique_ptr<char[]> buf;
  1190. if (!src.empty()) {
  1191. src_reader.reset(new SequentialFileReader(
  1192. NewLegacySequentialFileWrapper(src_file), src));
  1193. buf.reset(new char[copy_file_buffer_size_]);
  1194. }
  1195. Slice data;
  1196. uint64_t processed_buffer_size = 0;
  1197. do {
  1198. if (stop_backup_.load(std::memory_order_acquire)) {
  1199. return Status::Incomplete("Backup stopped");
  1200. }
  1201. if (!src.empty()) {
  1202. size_t buffer_to_read = (copy_file_buffer_size_ < size_limit)
  1203. ? copy_file_buffer_size_
  1204. : static_cast<size_t>(size_limit);
  1205. s = src_reader->Read(buffer_to_read, &data, buf.get());
  1206. processed_buffer_size += buffer_to_read;
  1207. } else {
  1208. data = contents;
  1209. }
  1210. size_limit -= data.size();
  1211. if (!s.ok()) {
  1212. return s;
  1213. }
  1214. if (size != nullptr) {
  1215. *size += data.size();
  1216. }
  1217. if (checksum_value != nullptr) {
  1218. *checksum_value =
  1219. crc32c::Extend(*checksum_value, data.data(), data.size());
  1220. }
  1221. s = dest_writer->Append(data);
  1222. if (rate_limiter != nullptr) {
  1223. rate_limiter->Request(data.size(), Env::IO_LOW, nullptr /* stats */,
  1224. RateLimiter::OpType::kWrite);
  1225. }
  1226. if (processed_buffer_size > options_.callback_trigger_interval_size) {
  1227. processed_buffer_size -= options_.callback_trigger_interval_size;
  1228. std::lock_guard<std::mutex> lock(byte_report_mutex_);
  1229. progress_callback();
  1230. }
  1231. } while (s.ok() && contents.empty() && data.size() > 0 && size_limit > 0);
  1232. if (s.ok() && sync) {
  1233. s = dest_writer->Sync(false);
  1234. }
  1235. if (s.ok()) {
  1236. s = dest_writer->Close();
  1237. }
  1238. return s;
  1239. }
  1240. // fname will always start with "/"
  1241. Status BackupEngineImpl::AddBackupFileWorkItem(
  1242. std::unordered_set<std::string>& live_dst_paths,
  1243. std::vector<BackupAfterCopyOrCreateWorkItem>& backup_items_to_finish,
  1244. BackupID backup_id, bool shared, const std::string& src_dir,
  1245. const std::string& fname, const EnvOptions& src_env_options,
  1246. RateLimiter* rate_limiter, uint64_t size_bytes, uint64_t size_limit,
  1247. bool shared_checksum, std::function<void()> progress_callback,
  1248. const std::string& contents) {
  1249. assert(!fname.empty() && fname[0] == '/');
  1250. assert(contents.empty() != src_dir.empty());
  1251. std::string dst_relative = fname.substr(1);
  1252. std::string dst_relative_tmp;
  1253. Status s;
  1254. uint32_t checksum_value = 0;
  1255. if (shared && shared_checksum) {
  1256. // add checksum and file length to the file name
  1257. s = CalculateChecksum(src_dir + fname, db_env_, src_env_options, size_limit,
  1258. &checksum_value);
  1259. if (!s.ok()) {
  1260. return s;
  1261. }
  1262. if (size_bytes == port::kMaxUint64) {
  1263. return Status::NotFound("File missing: " + src_dir + fname);
  1264. }
  1265. dst_relative =
  1266. GetSharedFileWithChecksum(dst_relative, checksum_value, size_bytes);
  1267. dst_relative_tmp = GetSharedFileWithChecksumRel(dst_relative, true);
  1268. dst_relative = GetSharedFileWithChecksumRel(dst_relative, false);
  1269. } else if (shared) {
  1270. dst_relative_tmp = GetSharedFileRel(dst_relative, true);
  1271. dst_relative = GetSharedFileRel(dst_relative, false);
  1272. } else {
  1273. dst_relative = GetPrivateFileRel(backup_id, false, dst_relative);
  1274. }
  1275. // We copy into `temp_dest_path` and, once finished, rename it to
  1276. // `final_dest_path`. This allows files to atomically appear at
  1277. // `final_dest_path`. We can copy directly to the final path when atomicity
  1278. // is unnecessary, like for files in private backup directories.
  1279. const std::string* copy_dest_path;
  1280. std::string temp_dest_path;
  1281. std::string final_dest_path = GetAbsolutePath(dst_relative);
  1282. if (!dst_relative_tmp.empty()) {
  1283. temp_dest_path = GetAbsolutePath(dst_relative_tmp);
  1284. copy_dest_path = &temp_dest_path;
  1285. } else {
  1286. copy_dest_path = &final_dest_path;
  1287. }
  1288. // if it's shared, we also need to check if it exists -- if it does, no need
  1289. // to copy it again.
  1290. bool need_to_copy = true;
  1291. // true if final_dest_path is the same path as another live file
  1292. const bool same_path =
  1293. live_dst_paths.find(final_dest_path) != live_dst_paths.end();
  1294. bool file_exists = false;
  1295. if (shared && !same_path) {
  1296. Status exist = backup_env_->FileExists(final_dest_path);
  1297. if (exist.ok()) {
  1298. file_exists = true;
  1299. } else if (exist.IsNotFound()) {
  1300. file_exists = false;
  1301. } else {
  1302. assert(s.IsIOError());
  1303. return exist;
  1304. }
  1305. }
  1306. if (!contents.empty()) {
  1307. need_to_copy = false;
  1308. } else if (shared && (same_path || file_exists)) {
  1309. need_to_copy = false;
  1310. if (shared_checksum) {
  1311. ROCKS_LOG_INFO(options_.info_log,
  1312. "%s already present, with checksum %u and size %" PRIu64,
  1313. fname.c_str(), checksum_value, size_bytes);
  1314. } else if (backuped_file_infos_.find(dst_relative) ==
  1315. backuped_file_infos_.end() && !same_path) {
  1316. // file already exists, but it's not referenced by any backup. overwrite
  1317. // the file
  1318. ROCKS_LOG_INFO(
  1319. options_.info_log,
  1320. "%s already present, but not referenced by any backup. We will "
  1321. "overwrite the file.",
  1322. fname.c_str());
  1323. need_to_copy = true;
  1324. backup_env_->DeleteFile(final_dest_path);
  1325. } else {
  1326. // the file is present and referenced by a backup
  1327. ROCKS_LOG_INFO(options_.info_log,
  1328. "%s already present, calculate checksum", fname.c_str());
  1329. s = CalculateChecksum(src_dir + fname, db_env_, src_env_options,
  1330. size_limit, &checksum_value);
  1331. }
  1332. }
  1333. live_dst_paths.insert(final_dest_path);
  1334. if (!contents.empty() || need_to_copy) {
  1335. ROCKS_LOG_INFO(options_.info_log, "Copying %s to %s", fname.c_str(),
  1336. copy_dest_path->c_str());
  1337. CopyOrCreateWorkItem copy_or_create_work_item(
  1338. src_dir.empty() ? "" : src_dir + fname, *copy_dest_path, contents,
  1339. db_env_, backup_env_, src_env_options, options_.sync, rate_limiter,
  1340. size_limit, progress_callback);
  1341. BackupAfterCopyOrCreateWorkItem after_copy_or_create_work_item(
  1342. copy_or_create_work_item.result.get_future(), shared, need_to_copy,
  1343. backup_env_, temp_dest_path, final_dest_path, dst_relative);
  1344. files_to_copy_or_create_.write(std::move(copy_or_create_work_item));
  1345. backup_items_to_finish.push_back(std::move(after_copy_or_create_work_item));
  1346. } else {
  1347. std::promise<CopyOrCreateResult> promise_result;
  1348. BackupAfterCopyOrCreateWorkItem after_copy_or_create_work_item(
  1349. promise_result.get_future(), shared, need_to_copy, backup_env_,
  1350. temp_dest_path, final_dest_path, dst_relative);
  1351. backup_items_to_finish.push_back(std::move(after_copy_or_create_work_item));
  1352. CopyOrCreateResult result;
  1353. result.status = s;
  1354. result.size = size_bytes;
  1355. result.checksum_value = checksum_value;
  1356. promise_result.set_value(std::move(result));
  1357. }
  1358. return s;
  1359. }
  1360. Status BackupEngineImpl::CalculateChecksum(const std::string& src, Env* src_env,
  1361. const EnvOptions& src_env_options,
  1362. uint64_t size_limit,
  1363. uint32_t* checksum_value) {
  1364. *checksum_value = 0;
  1365. if (size_limit == 0) {
  1366. size_limit = std::numeric_limits<uint64_t>::max();
  1367. }
  1368. std::unique_ptr<SequentialFile> src_file;
  1369. Status s = src_env->NewSequentialFile(src, &src_file, src_env_options);
  1370. if (!s.ok()) {
  1371. return s;
  1372. }
  1373. std::unique_ptr<SequentialFileReader> src_reader(
  1374. new SequentialFileReader(NewLegacySequentialFileWrapper(src_file), src));
  1375. std::unique_ptr<char[]> buf(new char[copy_file_buffer_size_]);
  1376. Slice data;
  1377. do {
  1378. if (stop_backup_.load(std::memory_order_acquire)) {
  1379. return Status::Incomplete("Backup stopped");
  1380. }
  1381. size_t buffer_to_read = (copy_file_buffer_size_ < size_limit) ?
  1382. copy_file_buffer_size_ : static_cast<size_t>(size_limit);
  1383. s = src_reader->Read(buffer_to_read, &data, buf.get());
  1384. if (!s.ok()) {
  1385. return s;
  1386. }
  1387. size_limit -= data.size();
  1388. *checksum_value = crc32c::Extend(*checksum_value, data.data(), data.size());
  1389. } while (data.size() > 0 && size_limit > 0);
  1390. return s;
  1391. }
  1392. void BackupEngineImpl::DeleteChildren(const std::string& dir,
  1393. uint32_t file_type_filter) {
  1394. std::vector<std::string> children;
  1395. db_env_->GetChildren(dir, &children); // ignore errors
  1396. for (const auto& f : children) {
  1397. uint64_t number;
  1398. FileType type;
  1399. bool ok = ParseFileName(f, &number, &type);
  1400. if (ok && (file_type_filter & (1 << type))) {
  1401. // don't delete this file
  1402. continue;
  1403. }
  1404. db_env_->DeleteFile(dir + "/" + f); // ignore errors
  1405. }
  1406. }
  1407. Status BackupEngineImpl::InsertPathnameToSizeBytes(
  1408. const std::string& dir, Env* env,
  1409. std::unordered_map<std::string, uint64_t>* result) {
  1410. assert(result != nullptr);
  1411. std::vector<Env::FileAttributes> files_attrs;
  1412. Status status = env->FileExists(dir);
  1413. if (status.ok()) {
  1414. status = env->GetChildrenFileAttributes(dir, &files_attrs);
  1415. } else if (status.IsNotFound()) {
  1416. // Insert no entries can be considered success
  1417. status = Status::OK();
  1418. }
  1419. const bool slash_needed = dir.empty() || dir.back() != '/';
  1420. for (const auto& file_attrs : files_attrs) {
  1421. result->emplace(dir + (slash_needed ? "/" : "") + file_attrs.name,
  1422. file_attrs.size_bytes);
  1423. }
  1424. return status;
  1425. }
  1426. Status BackupEngineImpl::GarbageCollect() {
  1427. assert(!read_only_);
  1428. // We will make a best effort to remove all garbage even in the presence
  1429. // of inconsistencies or I/O failures that inhibit finding garbage.
  1430. Status overall_status = Status::OK();
  1431. // If all goes well, we don't need another auto-GC this session
  1432. might_need_garbage_collect_ = false;
  1433. ROCKS_LOG_INFO(options_.info_log, "Starting garbage collection");
  1434. // delete obsolete shared files
  1435. for (bool with_checksum : {false, true}) {
  1436. std::vector<std::string> shared_children;
  1437. {
  1438. std::string shared_path;
  1439. if (with_checksum) {
  1440. shared_path = GetAbsolutePath(GetSharedFileWithChecksumRel());
  1441. } else {
  1442. shared_path = GetAbsolutePath(GetSharedFileRel());
  1443. }
  1444. auto s = backup_env_->FileExists(shared_path);
  1445. if (s.ok()) {
  1446. s = backup_env_->GetChildren(shared_path, &shared_children);
  1447. } else if (s.IsNotFound()) {
  1448. s = Status::OK();
  1449. }
  1450. if (!s.ok()) {
  1451. overall_status = s;
  1452. // Trying again later might work
  1453. might_need_garbage_collect_ = true;
  1454. }
  1455. }
  1456. for (auto& child : shared_children) {
  1457. if (child == "." || child == "..") {
  1458. continue;
  1459. }
  1460. std::string rel_fname;
  1461. if (with_checksum) {
  1462. rel_fname = GetSharedFileWithChecksumRel(child);
  1463. } else {
  1464. rel_fname = GetSharedFileRel(child);
  1465. }
  1466. auto child_itr = backuped_file_infos_.find(rel_fname);
  1467. // if it's not refcounted, delete it
  1468. if (child_itr == backuped_file_infos_.end() ||
  1469. child_itr->second->refs == 0) {
  1470. // this might be a directory, but DeleteFile will just fail in that
  1471. // case, so we're good
  1472. Status s = backup_env_->DeleteFile(GetAbsolutePath(rel_fname));
  1473. ROCKS_LOG_INFO(options_.info_log, "Deleting %s -- %s",
  1474. rel_fname.c_str(), s.ToString().c_str());
  1475. backuped_file_infos_.erase(rel_fname);
  1476. if (!s.ok()) {
  1477. // Trying again later might work
  1478. might_need_garbage_collect_ = true;
  1479. }
  1480. }
  1481. }
  1482. }
  1483. // delete obsolete private files
  1484. std::vector<std::string> private_children;
  1485. {
  1486. auto s = backup_env_->GetChildren(GetAbsolutePath(GetPrivateDirRel()),
  1487. &private_children);
  1488. if (!s.ok()) {
  1489. overall_status = s;
  1490. // Trying again later might work
  1491. might_need_garbage_collect_ = true;
  1492. }
  1493. }
  1494. for (auto& child : private_children) {
  1495. if (child == "." || child == "..") {
  1496. continue;
  1497. }
  1498. BackupID backup_id = 0;
  1499. bool tmp_dir = child.find(".tmp") != std::string::npos;
  1500. sscanf(child.c_str(), "%u", &backup_id);
  1501. if (!tmp_dir && // if it's tmp_dir, delete it
  1502. (backup_id == 0 || backups_.find(backup_id) != backups_.end())) {
  1503. // it's either not a number or it's still alive. continue
  1504. continue;
  1505. }
  1506. // here we have to delete the dir and all its children
  1507. std::string full_private_path =
  1508. GetAbsolutePath(GetPrivateFileRel(backup_id));
  1509. std::vector<std::string> subchildren;
  1510. backup_env_->GetChildren(full_private_path, &subchildren);
  1511. for (auto& subchild : subchildren) {
  1512. if (subchild == "." || subchild == "..") {
  1513. continue;
  1514. }
  1515. Status s = backup_env_->DeleteFile(full_private_path + subchild);
  1516. ROCKS_LOG_INFO(options_.info_log, "Deleting %s -- %s",
  1517. (full_private_path + subchild).c_str(),
  1518. s.ToString().c_str());
  1519. if (!s.ok()) {
  1520. // Trying again later might work
  1521. might_need_garbage_collect_ = true;
  1522. }
  1523. }
  1524. // finally delete the private dir
  1525. Status s = backup_env_->DeleteDir(full_private_path);
  1526. ROCKS_LOG_INFO(options_.info_log, "Deleting dir %s -- %s",
  1527. full_private_path.c_str(), s.ToString().c_str());
  1528. if (!s.ok()) {
  1529. // Trying again later might work
  1530. might_need_garbage_collect_ = true;
  1531. }
  1532. }
  1533. assert(overall_status.ok() || might_need_garbage_collect_);
  1534. return overall_status;
  1535. }
  1536. // ------- BackupMeta class --------
  1537. Status BackupEngineImpl::BackupMeta::AddFile(
  1538. std::shared_ptr<FileInfo> file_info) {
  1539. auto itr = file_infos_->find(file_info->filename);
  1540. if (itr == file_infos_->end()) {
  1541. auto ret = file_infos_->insert({file_info->filename, file_info});
  1542. if (ret.second) {
  1543. itr = ret.first;
  1544. itr->second->refs = 1;
  1545. } else {
  1546. // if this happens, something is seriously wrong
  1547. return Status::Corruption("In memory metadata insertion error");
  1548. }
  1549. } else {
  1550. if (itr->second->checksum_value != file_info->checksum_value) {
  1551. return Status::Corruption(
  1552. "Checksum mismatch for existing backup file. Delete old backups and "
  1553. "try again.");
  1554. }
  1555. ++itr->second->refs; // increase refcount if already present
  1556. }
  1557. size_ += file_info->size;
  1558. files_.push_back(itr->second);
  1559. return Status::OK();
  1560. }
  1561. Status BackupEngineImpl::BackupMeta::Delete(bool delete_meta) {
  1562. Status s;
  1563. for (const auto& file : files_) {
  1564. --file->refs; // decrease refcount
  1565. }
  1566. files_.clear();
  1567. // delete meta file
  1568. if (delete_meta) {
  1569. s = env_->FileExists(meta_filename_);
  1570. if (s.ok()) {
  1571. s = env_->DeleteFile(meta_filename_);
  1572. } else if (s.IsNotFound()) {
  1573. s = Status::OK(); // nothing to delete
  1574. }
  1575. }
  1576. timestamp_ = 0;
  1577. return s;
  1578. }
  1579. Slice kMetaDataPrefix("metadata ");
  1580. // each backup meta file is of the format:
  1581. // <timestamp>
  1582. // <seq number>
  1583. // <metadata(literal string)> <metadata> (optional)
  1584. // <number of files>
  1585. // <file1> <crc32(literal string)> <crc32_value>
  1586. // <file2> <crc32(literal string)> <crc32_value>
  1587. // ...
  1588. Status BackupEngineImpl::BackupMeta::LoadFromFile(
  1589. const std::string& backup_dir,
  1590. const std::unordered_map<std::string, uint64_t>& abs_path_to_size) {
  1591. assert(Empty());
  1592. Status s;
  1593. std::unique_ptr<SequentialFile> backup_meta_file;
  1594. s = env_->NewSequentialFile(meta_filename_, &backup_meta_file, EnvOptions());
  1595. if (!s.ok()) {
  1596. return s;
  1597. }
  1598. std::unique_ptr<SequentialFileReader> backup_meta_reader(
  1599. new SequentialFileReader(NewLegacySequentialFileWrapper(backup_meta_file),
  1600. meta_filename_));
  1601. std::unique_ptr<char[]> buf(new char[max_backup_meta_file_size_ + 1]);
  1602. Slice data;
  1603. s = backup_meta_reader->Read(max_backup_meta_file_size_, &data, buf.get());
  1604. if (!s.ok() || data.size() == max_backup_meta_file_size_) {
  1605. return s.ok() ? Status::Corruption("File size too big") : s;
  1606. }
  1607. buf[data.size()] = 0;
  1608. uint32_t num_files = 0;
  1609. char *next;
  1610. timestamp_ = strtoull(data.data(), &next, 10);
  1611. data.remove_prefix(next - data.data() + 1); // +1 for '\n'
  1612. sequence_number_ = strtoull(data.data(), &next, 10);
  1613. data.remove_prefix(next - data.data() + 1); // +1 for '\n'
  1614. if (data.starts_with(kMetaDataPrefix)) {
  1615. // app metadata present
  1616. data.remove_prefix(kMetaDataPrefix.size());
  1617. Slice hex_encoded_metadata = GetSliceUntil(&data, '\n');
  1618. bool decode_success = hex_encoded_metadata.DecodeHex(&app_metadata_);
  1619. if (!decode_success) {
  1620. return Status::Corruption(
  1621. "Failed to decode stored hex encoded app metadata");
  1622. }
  1623. }
  1624. num_files = static_cast<uint32_t>(strtoul(data.data(), &next, 10));
  1625. data.remove_prefix(next - data.data() + 1); // +1 for '\n'
  1626. std::vector<std::shared_ptr<FileInfo>> files;
  1627. Slice checksum_prefix("crc32 ");
  1628. for (uint32_t i = 0; s.ok() && i < num_files; ++i) {
  1629. auto line = GetSliceUntil(&data, '\n');
  1630. std::string filename = GetSliceUntil(&line, ' ').ToString();
  1631. uint64_t size;
  1632. const std::shared_ptr<FileInfo> file_info = GetFile(filename);
  1633. if (file_info) {
  1634. size = file_info->size;
  1635. } else {
  1636. std::string abs_path = backup_dir + "/" + filename;
  1637. try {
  1638. size = abs_path_to_size.at(abs_path);
  1639. } catch (std::out_of_range&) {
  1640. return Status::Corruption("Size missing for pathname: " + abs_path);
  1641. }
  1642. }
  1643. if (line.empty()) {
  1644. return Status::Corruption("File checksum is missing for " + filename +
  1645. " in " + meta_filename_);
  1646. }
  1647. uint32_t checksum_value = 0;
  1648. if (line.starts_with(checksum_prefix)) {
  1649. line.remove_prefix(checksum_prefix.size());
  1650. checksum_value = static_cast<uint32_t>(
  1651. strtoul(line.data(), nullptr, 10));
  1652. if (line != ROCKSDB_NAMESPACE::ToString(checksum_value)) {
  1653. return Status::Corruption("Invalid checksum value for " + filename +
  1654. " in " + meta_filename_);
  1655. }
  1656. } else {
  1657. return Status::Corruption("Unknown checksum type for " + filename +
  1658. " in " + meta_filename_);
  1659. }
  1660. files.emplace_back(new FileInfo(filename, size, checksum_value));
  1661. }
  1662. if (s.ok() && data.size() > 0) {
  1663. // file has to be read completely. if not, we count it as corruption
  1664. s = Status::Corruption("Tailing data in backup meta file in " +
  1665. meta_filename_);
  1666. }
  1667. if (s.ok()) {
  1668. files_.reserve(files.size());
  1669. for (const auto& file_info : files) {
  1670. s = AddFile(file_info);
  1671. if (!s.ok()) {
  1672. break;
  1673. }
  1674. }
  1675. }
  1676. return s;
  1677. }
  1678. Status BackupEngineImpl::BackupMeta::StoreToFile(bool sync) {
  1679. Status s;
  1680. std::unique_ptr<WritableFile> backup_meta_file;
  1681. EnvOptions env_options;
  1682. env_options.use_mmap_writes = false;
  1683. env_options.use_direct_writes = false;
  1684. s = env_->NewWritableFile(meta_tmp_filename_, &backup_meta_file, env_options);
  1685. if (!s.ok()) {
  1686. return s;
  1687. }
  1688. std::unique_ptr<char[]> buf(new char[max_backup_meta_file_size_]);
  1689. size_t len = 0, buf_size = max_backup_meta_file_size_;
  1690. len += snprintf(buf.get(), buf_size, "%" PRId64 "\n", timestamp_);
  1691. len += snprintf(buf.get() + len, buf_size - len, "%" PRIu64 "\n",
  1692. sequence_number_);
  1693. if (!app_metadata_.empty()) {
  1694. std::string hex_encoded_metadata =
  1695. Slice(app_metadata_).ToString(/* hex */ true);
  1696. // +1 to accommodate newline character
  1697. size_t hex_meta_strlen = kMetaDataPrefix.ToString().length() + hex_encoded_metadata.length() + 1;
  1698. if (hex_meta_strlen >= buf_size) {
  1699. return Status::Corruption("Buffer too small to fit backup metadata");
  1700. }
  1701. else if (len + hex_meta_strlen >= buf_size) {
  1702. backup_meta_file->Append(Slice(buf.get(), len));
  1703. buf.reset();
  1704. std::unique_ptr<char[]> new_reset_buf(
  1705. new char[max_backup_meta_file_size_]);
  1706. buf.swap(new_reset_buf);
  1707. len = 0;
  1708. }
  1709. len += snprintf(buf.get() + len, buf_size - len, "%s%s\n",
  1710. kMetaDataPrefix.ToString().c_str(),
  1711. hex_encoded_metadata.c_str());
  1712. }
  1713. char writelen_temp[19];
  1714. if (len + snprintf(writelen_temp, sizeof(writelen_temp),
  1715. "%" ROCKSDB_PRIszt "\n", files_.size()) >= buf_size) {
  1716. backup_meta_file->Append(Slice(buf.get(), len));
  1717. buf.reset();
  1718. std::unique_ptr<char[]> new_reset_buf(new char[max_backup_meta_file_size_]);
  1719. buf.swap(new_reset_buf);
  1720. len = 0;
  1721. }
  1722. {
  1723. const char *const_write = writelen_temp;
  1724. len += snprintf(buf.get() + len, buf_size - len, "%s", const_write);
  1725. }
  1726. for (const auto& file : files_) {
  1727. // use crc32 for now, switch to something else if needed
  1728. size_t newlen = len + file->filename.length() + snprintf(writelen_temp,
  1729. sizeof(writelen_temp), " crc32 %u\n", file->checksum_value);
  1730. const char *const_write = writelen_temp;
  1731. if (newlen >= buf_size) {
  1732. backup_meta_file->Append(Slice(buf.get(), len));
  1733. buf.reset();
  1734. std::unique_ptr<char[]> new_reset_buf(
  1735. new char[max_backup_meta_file_size_]);
  1736. buf.swap(new_reset_buf);
  1737. len = 0;
  1738. }
  1739. len += snprintf(buf.get() + len, buf_size - len, "%s%s",
  1740. file->filename.c_str(), const_write);
  1741. }
  1742. s = backup_meta_file->Append(Slice(buf.get(), len));
  1743. if (s.ok() && sync) {
  1744. s = backup_meta_file->Sync();
  1745. }
  1746. if (s.ok()) {
  1747. s = backup_meta_file->Close();
  1748. }
  1749. if (s.ok()) {
  1750. s = env_->RenameFile(meta_tmp_filename_, meta_filename_);
  1751. }
  1752. return s;
  1753. }
  1754. // -------- BackupEngineReadOnlyImpl ---------
  1755. class BackupEngineReadOnlyImpl : public BackupEngineReadOnly {
  1756. public:
  1757. BackupEngineReadOnlyImpl(Env* db_env, const BackupableDBOptions& options)
  1758. : backup_engine_(new BackupEngineImpl(db_env, options, true)) {}
  1759. ~BackupEngineReadOnlyImpl() override {}
  1760. // The returned BackupInfos are in chronological order, which means the
  1761. // latest backup comes last.
  1762. void GetBackupInfo(std::vector<BackupInfo>* backup_info) override {
  1763. backup_engine_->GetBackupInfo(backup_info);
  1764. }
  1765. void GetCorruptedBackups(std::vector<BackupID>* corrupt_backup_ids) override {
  1766. backup_engine_->GetCorruptedBackups(corrupt_backup_ids);
  1767. }
  1768. Status RestoreDBFromBackup(
  1769. BackupID backup_id, const std::string& db_dir, const std::string& wal_dir,
  1770. const RestoreOptions& restore_options = RestoreOptions()) override {
  1771. return backup_engine_->RestoreDBFromBackup(backup_id, db_dir, wal_dir,
  1772. restore_options);
  1773. }
  1774. Status RestoreDBFromLatestBackup(
  1775. const std::string& db_dir, const std::string& wal_dir,
  1776. const RestoreOptions& restore_options = RestoreOptions()) override {
  1777. return backup_engine_->RestoreDBFromLatestBackup(db_dir, wal_dir,
  1778. restore_options);
  1779. }
  1780. Status VerifyBackup(BackupID backup_id) override {
  1781. return backup_engine_->VerifyBackup(backup_id);
  1782. }
  1783. Status Initialize() { return backup_engine_->Initialize(); }
  1784. private:
  1785. std::unique_ptr<BackupEngineImpl> backup_engine_;
  1786. };
  1787. Status BackupEngineReadOnly::Open(Env* env, const BackupableDBOptions& options,
  1788. BackupEngineReadOnly** backup_engine_ptr) {
  1789. if (options.destroy_old_data) {
  1790. return Status::InvalidArgument(
  1791. "Can't destroy old data with ReadOnly BackupEngine");
  1792. }
  1793. std::unique_ptr<BackupEngineReadOnlyImpl> backup_engine(
  1794. new BackupEngineReadOnlyImpl(env, options));
  1795. auto s = backup_engine->Initialize();
  1796. if (!s.ok()) {
  1797. *backup_engine_ptr = nullptr;
  1798. return s;
  1799. }
  1800. *backup_engine_ptr = backup_engine.release();
  1801. return Status::OK();
  1802. }
  1803. } // namespace ROCKSDB_NAMESPACE
  1804. #endif // ROCKSDB_LITE