checkpoint_impl.cc 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2012 Facebook.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file.
  9. #ifndef ROCKSDB_LITE
  10. #include "utilities/checkpoint/checkpoint_impl.h"
  11. #include <algorithm>
  12. #include <cinttypes>
  13. #include <string>
  14. #include <vector>
  15. #include "db/wal_manager.h"
  16. #include "file/file_util.h"
  17. #include "file/filename.h"
  18. #include "port/port.h"
  19. #include "rocksdb/db.h"
  20. #include "rocksdb/env.h"
  21. #include "rocksdb/metadata.h"
  22. #include "rocksdb/transaction_log.h"
  23. #include "rocksdb/utilities/checkpoint.h"
  24. #include "test_util/sync_point.h"
  25. namespace ROCKSDB_NAMESPACE {
  26. Status Checkpoint::Create(DB* db, Checkpoint** checkpoint_ptr) {
  27. *checkpoint_ptr = new CheckpointImpl(db);
  28. return Status::OK();
  29. }
  30. Status Checkpoint::CreateCheckpoint(const std::string& /*checkpoint_dir*/,
  31. uint64_t /*log_size_for_flush*/) {
  32. return Status::NotSupported("");
  33. }
  34. void CheckpointImpl::CleanStagingDirectory(
  35. const std::string& full_private_path, Logger* info_log) {
  36. std::vector<std::string> subchildren;
  37. Status s = db_->GetEnv()->FileExists(full_private_path);
  38. if (s.IsNotFound()) {
  39. return;
  40. }
  41. ROCKS_LOG_INFO(info_log, "File exists %s -- %s",
  42. full_private_path.c_str(), s.ToString().c_str());
  43. db_->GetEnv()->GetChildren(full_private_path, &subchildren);
  44. for (auto& subchild : subchildren) {
  45. std::string subchild_path = full_private_path + "/" + subchild;
  46. s = db_->GetEnv()->DeleteFile(subchild_path);
  47. ROCKS_LOG_INFO(info_log, "Delete file %s -- %s",
  48. subchild_path.c_str(), s.ToString().c_str());
  49. }
  50. // finally delete the private dir
  51. s = db_->GetEnv()->DeleteDir(full_private_path);
  52. ROCKS_LOG_INFO(info_log, "Delete dir %s -- %s",
  53. full_private_path.c_str(), s.ToString().c_str());
  54. }
  55. Status Checkpoint::ExportColumnFamily(
  56. ColumnFamilyHandle* /*handle*/, const std::string& /*export_dir*/,
  57. ExportImportFilesMetaData** /*metadata*/) {
  58. return Status::NotSupported("");
  59. }
  60. // Builds an openable snapshot of RocksDB
  61. Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir,
  62. uint64_t log_size_for_flush) {
  63. DBOptions db_options = db_->GetDBOptions();
  64. Status s = db_->GetEnv()->FileExists(checkpoint_dir);
  65. if (s.ok()) {
  66. return Status::InvalidArgument("Directory exists");
  67. } else if (!s.IsNotFound()) {
  68. assert(s.IsIOError());
  69. return s;
  70. }
  71. ROCKS_LOG_INFO(
  72. db_options.info_log,
  73. "Started the snapshot process -- creating snapshot in directory %s",
  74. checkpoint_dir.c_str());
  75. size_t final_nonslash_idx = checkpoint_dir.find_last_not_of('/');
  76. if (final_nonslash_idx == std::string::npos) {
  77. // npos means it's only slashes or empty. Non-empty means it's the root
  78. // directory, but it shouldn't be because we verified above the directory
  79. // doesn't exist.
  80. assert(checkpoint_dir.empty());
  81. return Status::InvalidArgument("invalid checkpoint directory name");
  82. }
  83. std::string full_private_path =
  84. checkpoint_dir.substr(0, final_nonslash_idx + 1) + ".tmp";
  85. ROCKS_LOG_INFO(
  86. db_options.info_log,
  87. "Snapshot process -- using temporary directory %s",
  88. full_private_path.c_str());
  89. CleanStagingDirectory(full_private_path, db_options.info_log.get());
  90. // create snapshot directory
  91. s = db_->GetEnv()->CreateDir(full_private_path);
  92. uint64_t sequence_number = 0;
  93. if (s.ok()) {
  94. db_->DisableFileDeletions();
  95. s = CreateCustomCheckpoint(
  96. db_options,
  97. [&](const std::string& src_dirname, const std::string& fname,
  98. FileType) {
  99. ROCKS_LOG_INFO(db_options.info_log, "Hard Linking %s", fname.c_str());
  100. return db_->GetFileSystem()->LinkFile(src_dirname + fname,
  101. full_private_path + fname,
  102. IOOptions(), nullptr);
  103. } /* link_file_cb */,
  104. [&](const std::string& src_dirname, const std::string& fname,
  105. uint64_t size_limit_bytes, FileType) {
  106. ROCKS_LOG_INFO(db_options.info_log, "Copying %s", fname.c_str());
  107. return CopyFile(db_->GetFileSystem(), src_dirname + fname,
  108. full_private_path + fname, size_limit_bytes,
  109. db_options.use_fsync);
  110. } /* copy_file_cb */,
  111. [&](const std::string& fname, const std::string& contents, FileType) {
  112. ROCKS_LOG_INFO(db_options.info_log, "Creating %s", fname.c_str());
  113. return CreateFile(db_->GetFileSystem(), full_private_path + fname,
  114. contents, db_options.use_fsync);
  115. } /* create_file_cb */,
  116. &sequence_number, log_size_for_flush);
  117. // we copied all the files, enable file deletions
  118. db_->EnableFileDeletions(false);
  119. }
  120. if (s.ok()) {
  121. // move tmp private backup to real snapshot directory
  122. s = db_->GetEnv()->RenameFile(full_private_path, checkpoint_dir);
  123. }
  124. if (s.ok()) {
  125. std::unique_ptr<Directory> checkpoint_directory;
  126. db_->GetEnv()->NewDirectory(checkpoint_dir, &checkpoint_directory);
  127. if (checkpoint_directory != nullptr) {
  128. s = checkpoint_directory->Fsync();
  129. }
  130. }
  131. if (s.ok()) {
  132. // here we know that we succeeded and installed the new snapshot
  133. ROCKS_LOG_INFO(db_options.info_log, "Snapshot DONE. All is good");
  134. ROCKS_LOG_INFO(db_options.info_log, "Snapshot sequence number: %" PRIu64,
  135. sequence_number);
  136. } else {
  137. // clean all the files we might have created
  138. ROCKS_LOG_INFO(db_options.info_log, "Snapshot failed -- %s",
  139. s.ToString().c_str());
  140. CleanStagingDirectory(full_private_path, db_options.info_log.get());
  141. }
  142. return s;
  143. }
  144. Status CheckpointImpl::CreateCustomCheckpoint(
  145. const DBOptions& db_options,
  146. std::function<Status(const std::string& src_dirname,
  147. const std::string& src_fname, FileType type)>
  148. link_file_cb,
  149. std::function<Status(const std::string& src_dirname,
  150. const std::string& src_fname,
  151. uint64_t size_limit_bytes, FileType type)>
  152. copy_file_cb,
  153. std::function<Status(const std::string& fname, const std::string& contents,
  154. FileType type)>
  155. create_file_cb,
  156. uint64_t* sequence_number, uint64_t log_size_for_flush) {
  157. Status s;
  158. std::vector<std::string> live_files;
  159. uint64_t manifest_file_size = 0;
  160. uint64_t min_log_num = port::kMaxUint64;
  161. *sequence_number = db_->GetLatestSequenceNumber();
  162. bool same_fs = true;
  163. VectorLogPtr live_wal_files;
  164. bool flush_memtable = true;
  165. if (s.ok()) {
  166. if (!db_options.allow_2pc) {
  167. if (log_size_for_flush == port::kMaxUint64) {
  168. flush_memtable = false;
  169. } else if (log_size_for_flush > 0) {
  170. // If out standing log files are small, we skip the flush.
  171. s = db_->GetSortedWalFiles(live_wal_files);
  172. if (!s.ok()) {
  173. return s;
  174. }
  175. // Don't flush column families if total log size is smaller than
  176. // log_size_for_flush. We copy the log files instead.
  177. // We may be able to cover 2PC case too.
  178. uint64_t total_wal_size = 0;
  179. for (auto& wal : live_wal_files) {
  180. total_wal_size += wal->SizeFileBytes();
  181. }
  182. if (total_wal_size < log_size_for_flush) {
  183. flush_memtable = false;
  184. }
  185. live_wal_files.clear();
  186. }
  187. }
  188. // this will return live_files prefixed with "/"
  189. s = db_->GetLiveFiles(live_files, &manifest_file_size, flush_memtable);
  190. if (s.ok() && db_options.allow_2pc) {
  191. // If 2PC is enabled, we need to get minimum log number after the flush.
  192. // Need to refetch the live files to recapture the snapshot.
  193. if (!db_->GetIntProperty(DB::Properties::kMinLogNumberToKeep,
  194. &min_log_num)) {
  195. return Status::InvalidArgument(
  196. "2PC enabled but cannot fine the min log number to keep.");
  197. }
  198. // We need to refetch live files with flush to handle this case:
  199. // A previous 000001.log contains the prepare record of transaction tnx1.
  200. // The current log file is 000002.log, and sequence_number points to this
  201. // file.
  202. // After calling GetLiveFiles(), 000003.log is created.
  203. // Then tnx1 is committed. The commit record is written to 000003.log.
  204. // Now we fetch min_log_num, which will be 3.
  205. // Then only 000002.log and 000003.log will be copied, and 000001.log will
  206. // be skipped. 000003.log contains commit message of tnx1, but we don't
  207. // have respective prepare record for it.
  208. // In order to avoid this situation, we need to force flush to make sure
  209. // all transactions committed before getting min_log_num will be flushed
  210. // to SST files.
  211. // We cannot get min_log_num before calling the GetLiveFiles() for the
  212. // first time, because if we do that, all the logs files will be included,
  213. // far more than needed.
  214. s = db_->GetLiveFiles(live_files, &manifest_file_size, flush_memtable);
  215. }
  216. TEST_SYNC_POINT("CheckpointImpl::CreateCheckpoint:SavedLiveFiles1");
  217. TEST_SYNC_POINT("CheckpointImpl::CreateCheckpoint:SavedLiveFiles2");
  218. db_->FlushWAL(false /* sync */);
  219. }
  220. // if we have more than one column family, we need to also get WAL files
  221. if (s.ok()) {
  222. s = db_->GetSortedWalFiles(live_wal_files);
  223. }
  224. if (!s.ok()) {
  225. return s;
  226. }
  227. size_t wal_size = live_wal_files.size();
  228. // copy/hard link live_files
  229. std::string manifest_fname, current_fname;
  230. for (size_t i = 0; s.ok() && i < live_files.size(); ++i) {
  231. uint64_t number;
  232. FileType type;
  233. bool ok = ParseFileName(live_files[i], &number, &type);
  234. if (!ok) {
  235. s = Status::Corruption("Can't parse file name. This is very bad");
  236. break;
  237. }
  238. // we should only get sst, options, manifest and current files here
  239. assert(type == kTableFile || type == kDescriptorFile ||
  240. type == kCurrentFile || type == kOptionsFile);
  241. assert(live_files[i].size() > 0 && live_files[i][0] == '/');
  242. if (type == kCurrentFile) {
  243. // We will craft the current file manually to ensure it's consistent with
  244. // the manifest number. This is necessary because current's file contents
  245. // can change during checkpoint creation.
  246. current_fname = live_files[i];
  247. continue;
  248. } else if (type == kDescriptorFile) {
  249. manifest_fname = live_files[i];
  250. }
  251. std::string src_fname = live_files[i];
  252. // rules:
  253. // * if it's kTableFile, then it's shared
  254. // * if it's kDescriptorFile, limit the size to manifest_file_size
  255. // * always copy if cross-device link
  256. if ((type == kTableFile) && same_fs) {
  257. s = link_file_cb(db_->GetName(), src_fname, type);
  258. if (s.IsNotSupported()) {
  259. same_fs = false;
  260. s = Status::OK();
  261. }
  262. }
  263. if ((type != kTableFile) || (!same_fs)) {
  264. s = copy_file_cb(db_->GetName(), src_fname,
  265. (type == kDescriptorFile) ? manifest_file_size : 0,
  266. type);
  267. }
  268. }
  269. if (s.ok() && !current_fname.empty() && !manifest_fname.empty()) {
  270. create_file_cb(current_fname, manifest_fname.substr(1) + "\n",
  271. kCurrentFile);
  272. }
  273. ROCKS_LOG_INFO(db_options.info_log, "Number of log files %" ROCKSDB_PRIszt,
  274. live_wal_files.size());
  275. // Link WAL files. Copy exact size of last one because it is the only one
  276. // that has changes after the last flush.
  277. for (size_t i = 0; s.ok() && i < wal_size; ++i) {
  278. if ((live_wal_files[i]->Type() == kAliveLogFile) &&
  279. (!flush_memtable ||
  280. live_wal_files[i]->StartSequence() >= *sequence_number ||
  281. live_wal_files[i]->LogNumber() >= min_log_num)) {
  282. if (i + 1 == wal_size) {
  283. s = copy_file_cb(db_options.wal_dir, live_wal_files[i]->PathName(),
  284. live_wal_files[i]->SizeFileBytes(), kLogFile);
  285. break;
  286. }
  287. if (same_fs) {
  288. // we only care about live log files
  289. s = link_file_cb(db_options.wal_dir, live_wal_files[i]->PathName(),
  290. kLogFile);
  291. if (s.IsNotSupported()) {
  292. same_fs = false;
  293. s = Status::OK();
  294. }
  295. }
  296. if (!same_fs) {
  297. s = copy_file_cb(db_options.wal_dir, live_wal_files[i]->PathName(), 0,
  298. kLogFile);
  299. }
  300. }
  301. }
  302. return s;
  303. }
  304. // Exports all live SST files of a specified Column Family onto export_dir,
  305. // returning SST files information in metadata.
  306. Status CheckpointImpl::ExportColumnFamily(
  307. ColumnFamilyHandle* handle, const std::string& export_dir,
  308. ExportImportFilesMetaData** metadata) {
  309. auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(handle);
  310. const auto cf_name = cfh->GetName();
  311. const auto db_options = db_->GetDBOptions();
  312. assert(metadata != nullptr);
  313. assert(*metadata == nullptr);
  314. auto s = db_->GetEnv()->FileExists(export_dir);
  315. if (s.ok()) {
  316. return Status::InvalidArgument("Specified export_dir exists");
  317. } else if (!s.IsNotFound()) {
  318. assert(s.IsIOError());
  319. return s;
  320. }
  321. const auto final_nonslash_idx = export_dir.find_last_not_of('/');
  322. if (final_nonslash_idx == std::string::npos) {
  323. return Status::InvalidArgument("Specified export_dir invalid");
  324. }
  325. ROCKS_LOG_INFO(db_options.info_log,
  326. "[%s] export column family onto export directory %s",
  327. cf_name.c_str(), export_dir.c_str());
  328. // Create a temporary export directory.
  329. const auto tmp_export_dir =
  330. export_dir.substr(0, final_nonslash_idx + 1) + ".tmp";
  331. s = db_->GetEnv()->CreateDir(tmp_export_dir);
  332. if (s.ok()) {
  333. s = db_->Flush(ROCKSDB_NAMESPACE::FlushOptions(), handle);
  334. }
  335. ColumnFamilyMetaData db_metadata;
  336. if (s.ok()) {
  337. // Export live sst files with file deletions disabled.
  338. s = db_->DisableFileDeletions();
  339. if (s.ok()) {
  340. db_->GetColumnFamilyMetaData(handle, &db_metadata);
  341. s = ExportFilesInMetaData(
  342. db_options, db_metadata,
  343. [&](const std::string& src_dirname, const std::string& fname) {
  344. ROCKS_LOG_INFO(db_options.info_log, "[%s] HardLinking %s",
  345. cf_name.c_str(), fname.c_str());
  346. return db_->GetEnv()->LinkFile(src_dirname + fname,
  347. tmp_export_dir + fname);
  348. } /*link_file_cb*/,
  349. [&](const std::string& src_dirname, const std::string& fname) {
  350. ROCKS_LOG_INFO(db_options.info_log, "[%s] Copying %s",
  351. cf_name.c_str(), fname.c_str());
  352. return CopyFile(db_->GetFileSystem(), src_dirname + fname,
  353. tmp_export_dir + fname, 0, db_options.use_fsync);
  354. } /*copy_file_cb*/);
  355. const auto enable_status = db_->EnableFileDeletions(false /*force*/);
  356. if (s.ok()) {
  357. s = enable_status;
  358. }
  359. }
  360. }
  361. auto moved_to_user_specified_dir = false;
  362. if (s.ok()) {
  363. // Move temporary export directory to the actual export directory.
  364. s = db_->GetEnv()->RenameFile(tmp_export_dir, export_dir);
  365. }
  366. if (s.ok()) {
  367. // Fsync export directory.
  368. moved_to_user_specified_dir = true;
  369. std::unique_ptr<Directory> dir_ptr;
  370. s = db_->GetEnv()->NewDirectory(export_dir, &dir_ptr);
  371. if (s.ok()) {
  372. assert(dir_ptr != nullptr);
  373. s = dir_ptr->Fsync();
  374. }
  375. }
  376. if (s.ok()) {
  377. // Export of files succeeded. Fill in the metadata information.
  378. auto result_metadata = new ExportImportFilesMetaData();
  379. result_metadata->db_comparator_name = handle->GetComparator()->Name();
  380. for (const auto& level_metadata : db_metadata.levels) {
  381. for (const auto& file_metadata : level_metadata.files) {
  382. LiveFileMetaData live_file_metadata;
  383. live_file_metadata.size = file_metadata.size;
  384. live_file_metadata.name = std::move(file_metadata.name);
  385. live_file_metadata.file_number = file_metadata.file_number;
  386. live_file_metadata.db_path = export_dir;
  387. live_file_metadata.smallest_seqno = file_metadata.smallest_seqno;
  388. live_file_metadata.largest_seqno = file_metadata.largest_seqno;
  389. live_file_metadata.smallestkey = std::move(file_metadata.smallestkey);
  390. live_file_metadata.largestkey = std::move(file_metadata.largestkey);
  391. live_file_metadata.oldest_blob_file_number =
  392. file_metadata.oldest_blob_file_number;
  393. live_file_metadata.level = level_metadata.level;
  394. result_metadata->files.push_back(live_file_metadata);
  395. }
  396. *metadata = result_metadata;
  397. }
  398. ROCKS_LOG_INFO(db_options.info_log, "[%s] Export succeeded.",
  399. cf_name.c_str());
  400. } else {
  401. // Failure: Clean up all the files/directories created.
  402. ROCKS_LOG_INFO(db_options.info_log, "[%s] Export failed. %s",
  403. cf_name.c_str(), s.ToString().c_str());
  404. std::vector<std::string> subchildren;
  405. const auto cleanup_dir =
  406. moved_to_user_specified_dir ? export_dir : tmp_export_dir;
  407. db_->GetEnv()->GetChildren(cleanup_dir, &subchildren);
  408. for (const auto& subchild : subchildren) {
  409. const auto subchild_path = cleanup_dir + "/" + subchild;
  410. const auto status = db_->GetEnv()->DeleteFile(subchild_path);
  411. if (!status.ok()) {
  412. ROCKS_LOG_WARN(db_options.info_log, "Failed to cleanup file %s: %s",
  413. subchild_path.c_str(), status.ToString().c_str());
  414. }
  415. }
  416. const auto status = db_->GetEnv()->DeleteDir(cleanup_dir);
  417. if (!status.ok()) {
  418. ROCKS_LOG_WARN(db_options.info_log, "Failed to cleanup dir %s: %s",
  419. cleanup_dir.c_str(), status.ToString().c_str());
  420. }
  421. }
  422. return s;
  423. }
  424. Status CheckpointImpl::ExportFilesInMetaData(
  425. const DBOptions& db_options, const ColumnFamilyMetaData& metadata,
  426. std::function<Status(const std::string& src_dirname,
  427. const std::string& src_fname)>
  428. link_file_cb,
  429. std::function<Status(const std::string& src_dirname,
  430. const std::string& src_fname)>
  431. copy_file_cb) {
  432. Status s;
  433. auto hardlink_file = true;
  434. // Copy/hard link files in metadata.
  435. size_t num_files = 0;
  436. for (const auto& level_metadata : metadata.levels) {
  437. for (const auto& file_metadata : level_metadata.files) {
  438. uint64_t number;
  439. FileType type;
  440. const auto ok = ParseFileName(file_metadata.name, &number, &type);
  441. if (!ok) {
  442. s = Status::Corruption("Could not parse file name");
  443. break;
  444. }
  445. // We should only get sst files here.
  446. assert(type == kTableFile);
  447. assert(file_metadata.size > 0 && file_metadata.name[0] == '/');
  448. const auto src_fname = file_metadata.name;
  449. ++num_files;
  450. if (hardlink_file) {
  451. s = link_file_cb(db_->GetName(), src_fname);
  452. if (num_files == 1 && s.IsNotSupported()) {
  453. // Fallback to copy if link failed due to cross-device directories.
  454. hardlink_file = false;
  455. s = Status::OK();
  456. }
  457. }
  458. if (!hardlink_file) {
  459. s = copy_file_cb(db_->GetName(), src_fname);
  460. }
  461. if (!s.ok()) {
  462. break;
  463. }
  464. }
  465. }
  466. ROCKS_LOG_INFO(db_options.info_log, "Number of table files %" ROCKSDB_PRIszt,
  467. num_files);
  468. return s;
  469. }
  470. } // namespace ROCKSDB_NAMESPACE
  471. #endif // ROCKSDB_LITE