wal_manager.cc 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  9. #include "db/wal_manager.h"
  10. #include <algorithm>
  11. #include <cinttypes>
  12. #include <memory>
  13. #include <vector>
  14. #include "db/log_reader.h"
  15. #include "db/log_writer.h"
  16. #include "db/transaction_log_impl.h"
  17. #include "db/write_batch_internal.h"
  18. #include "file/file_util.h"
  19. #include "file/filename.h"
  20. #include "file/sequence_file_reader.h"
  21. #include "logging/logging.h"
  22. #include "port/port.h"
  23. #include "rocksdb/env.h"
  24. #include "rocksdb/options.h"
  25. #include "rocksdb/write_batch.h"
  26. #include "test_util/sync_point.h"
  27. #include "util/cast_util.h"
  28. #include "util/coding.h"
  29. #include "util/mutexlock.h"
  30. #include "util/string_util.h"
  31. namespace ROCKSDB_NAMESPACE {
  32. Status WalManager::DeleteFile(const std::string& fname, uint64_t number) {
  33. auto s = env_->DeleteFile(wal_dir_ + "/" + fname);
  34. if (s.ok()) {
  35. MutexLock l(&read_first_record_cache_mutex_);
  36. read_first_record_cache_.erase(number);
  37. }
  38. return s;
  39. }
  40. Status WalManager::GetSortedWalFiles(VectorWalPtr& files, bool need_seqnos,
  41. bool include_archived) {
  42. // First get sorted files in db dir, then get sorted files from archived
  43. // dir, to avoid a race condition where a log file is moved to archived
  44. // dir in between.
  45. Status s;
  46. // list wal files in main db dir.
  47. VectorWalPtr logs;
  48. s = GetSortedWalsOfType(wal_dir_, logs, kAliveLogFile, need_seqnos);
  49. if (!include_archived || !s.ok()) {
  50. return s;
  51. }
  52. // Reproduce the race condition where a log file is moved
  53. // to archived dir, between these two sync points, used in
  54. // (DBTest,TransactionLogIteratorRace)
  55. TEST_SYNC_POINT("WalManager::GetSortedWalFiles:1");
  56. TEST_SYNC_POINT("WalManager::GetSortedWalFiles:2");
  57. files.clear();
  58. // list wal files in archive dir.
  59. std::string archivedir = ArchivalDirectory(wal_dir_);
  60. Status exists = env_->FileExists(archivedir);
  61. if (exists.ok()) {
  62. s = GetSortedWalsOfType(archivedir, files, kArchivedLogFile, need_seqnos);
  63. if (!s.ok()) {
  64. return s;
  65. }
  66. } else if (!exists.IsNotFound()) {
  67. assert(s.ok());
  68. return exists;
  69. }
  70. uint64_t latest_archived_log_number = 0;
  71. if (!files.empty()) {
  72. latest_archived_log_number = files.back()->LogNumber();
  73. ROCKS_LOG_INFO(db_options_.info_log, "Latest Archived log: %" PRIu64,
  74. latest_archived_log_number);
  75. }
  76. files.reserve(files.size() + logs.size());
  77. for (auto& log : logs) {
  78. if (log->LogNumber() > latest_archived_log_number) {
  79. files.push_back(std::move(log));
  80. } else {
  81. // When the race condition happens, we could see the
  82. // same log in both db dir and archived dir. Simply
  83. // ignore the one in db dir. Note that, if we read
  84. // archived dir first, we would have missed the log file.
  85. ROCKS_LOG_WARN(db_options_.info_log, "%s already moved to archive",
  86. log->PathName().c_str());
  87. }
  88. }
  89. return s;
  90. }
  91. Status WalManager::GetUpdatesSince(
  92. SequenceNumber seq, std::unique_ptr<TransactionLogIterator>* iter,
  93. const TransactionLogIterator::ReadOptions& read_options,
  94. VersionSet* version_set) {
  95. if (seq_per_batch_) {
  96. return Status::NotSupported();
  97. }
  98. assert(!seq_per_batch_);
  99. // Get all sorted Wal Files.
  100. // Do binary search and open files and find the seq number.
  101. std::unique_ptr<VectorWalPtr> wal_files(new VectorWalPtr);
  102. Status s = GetSortedWalFiles(*wal_files);
  103. if (!s.ok()) {
  104. return s;
  105. }
  106. s = RetainProbableWalFiles(*wal_files, seq);
  107. if (!s.ok()) {
  108. return s;
  109. }
  110. iter->reset(new TransactionLogIteratorImpl(
  111. wal_dir_, &db_options_, read_options, file_options_, seq,
  112. std::move(wal_files), version_set, seq_per_batch_, io_tracer_));
  113. return (*iter)->status();
  114. }
  115. // 1. Go through all archived files and
  116. // a. if ttl is enabled, delete outdated files
  117. // b. if archive size limit is enabled, delete empty files,
  118. // compute file number and size.
  119. // 2. If size limit is enabled:
  120. // a. compute how many files should be deleted
  121. // b. get sorted non-empty archived logs
  122. // c. delete what should be deleted
  123. void WalManager::PurgeObsoleteWALFiles() {
  124. bool const ttl_enabled = db_options_.WAL_ttl_seconds > 0;
  125. bool const size_limit_enabled = db_options_.WAL_size_limit_MB > 0;
  126. if (!ttl_enabled && !size_limit_enabled) {
  127. return;
  128. }
  129. int64_t current_time = 0;
  130. Status s = db_options_.clock->GetCurrentTime(&current_time);
  131. if (!s.ok()) {
  132. ROCKS_LOG_ERROR(db_options_.info_log, "Can't get current time: %s",
  133. s.ToString().c_str());
  134. assert(false);
  135. return;
  136. }
  137. uint64_t const now_seconds = static_cast<uint64_t>(current_time);
  138. uint64_t const time_to_check =
  139. ttl_enabled
  140. ? std::min(kDefaultIntervalToDeleteObsoleteWAL,
  141. std::max(uint64_t{1}, db_options_.WAL_ttl_seconds / 2))
  142. : kDefaultIntervalToDeleteObsoleteWAL;
  143. uint64_t old_last_run_time = purge_wal_files_last_run_.LoadRelaxed();
  144. do {
  145. if (old_last_run_time + time_to_check > now_seconds) {
  146. // last run is recent enough, no need to purge
  147. return;
  148. }
  149. } while (!purge_wal_files_last_run_.CasWeakRelaxed(
  150. /*expected=*/old_last_run_time, /*desired=*/now_seconds));
  151. std::string archival_dir = ArchivalDirectory(wal_dir_);
  152. std::vector<std::string> files;
  153. s = env_->GetChildren(archival_dir, &files);
  154. if (!s.ok()) {
  155. ROCKS_LOG_ERROR(db_options_.info_log, "Can't get archive files: %s",
  156. s.ToString().c_str());
  157. return;
  158. }
  159. size_t log_files_num = 0;
  160. uint64_t log_file_size = 0;
  161. for (auto& f : files) {
  162. uint64_t number;
  163. FileType type;
  164. if (ParseFileName(f, &number, &type) && type == kWalFile) {
  165. std::string const file_path = archival_dir + "/" + f;
  166. if (ttl_enabled) {
  167. uint64_t file_m_time;
  168. s = env_->GetFileModificationTime(file_path, &file_m_time);
  169. if (!s.ok()) {
  170. ROCKS_LOG_WARN(db_options_.info_log,
  171. "Can't get file mod time: %s: %s", file_path.c_str(),
  172. s.ToString().c_str());
  173. continue;
  174. }
  175. if (now_seconds - file_m_time > db_options_.WAL_ttl_seconds) {
  176. s = DeleteDBFile(&db_options_, file_path, archival_dir, false,
  177. /*force_fg=*/!wal_in_db_path_);
  178. if (!s.ok()) {
  179. ROCKS_LOG_WARN(db_options_.info_log, "Can't delete file: %s: %s",
  180. file_path.c_str(), s.ToString().c_str());
  181. continue;
  182. } else {
  183. MutexLock l(&read_first_record_cache_mutex_);
  184. read_first_record_cache_.erase(number);
  185. }
  186. continue;
  187. }
  188. }
  189. if (size_limit_enabled) {
  190. uint64_t file_size;
  191. s = env_->GetFileSize(file_path, &file_size);
  192. if (!s.ok()) {
  193. ROCKS_LOG_ERROR(db_options_.info_log,
  194. "Unable to get file size: %s: %s", file_path.c_str(),
  195. s.ToString().c_str());
  196. return;
  197. } else {
  198. if (file_size > 0) {
  199. log_file_size = std::max(log_file_size, file_size);
  200. ++log_files_num;
  201. } else {
  202. s = DeleteDBFile(&db_options_, file_path, archival_dir, false,
  203. /*force_fg=*/!wal_in_db_path_);
  204. if (!s.ok()) {
  205. ROCKS_LOG_WARN(db_options_.info_log,
  206. "Unable to delete file: %s: %s", file_path.c_str(),
  207. s.ToString().c_str());
  208. continue;
  209. } else {
  210. MutexLock l(&read_first_record_cache_mutex_);
  211. read_first_record_cache_.erase(number);
  212. }
  213. }
  214. }
  215. }
  216. }
  217. }
  218. if (0 == log_files_num || !size_limit_enabled) {
  219. return;
  220. }
  221. size_t const files_keep_num = static_cast<size_t>(
  222. db_options_.WAL_size_limit_MB * 1024 * 1024 / log_file_size);
  223. if (log_files_num <= files_keep_num) {
  224. return;
  225. }
  226. size_t files_del_num = log_files_num - files_keep_num;
  227. VectorWalPtr archived_logs;
  228. s = GetSortedWalsOfType(archival_dir, archived_logs, kArchivedLogFile,
  229. /*need_seqno=*/false);
  230. if (!s.ok()) {
  231. ROCKS_LOG_WARN(db_options_.info_log,
  232. "Unable to get archived WALs from: %s: %s",
  233. archival_dir.c_str(), s.ToString().c_str());
  234. files_del_num = 0;
  235. } else if (files_del_num > archived_logs.size()) {
  236. ROCKS_LOG_WARN(db_options_.info_log,
  237. "Trying to delete more archived log files than "
  238. "exist. Deleting all");
  239. files_del_num = archived_logs.size();
  240. }
  241. for (size_t i = 0; i < files_del_num; ++i) {
  242. std::string const file_path = archived_logs[i]->PathName();
  243. s = DeleteDBFile(&db_options_, wal_dir_ + "/" + file_path, wal_dir_, false,
  244. /*force_fg=*/!wal_in_db_path_);
  245. if (!s.ok()) {
  246. ROCKS_LOG_WARN(db_options_.info_log, "Unable to delete file: %s: %s",
  247. file_path.c_str(), s.ToString().c_str());
  248. continue;
  249. } else {
  250. MutexLock l(&read_first_record_cache_mutex_);
  251. read_first_record_cache_.erase(archived_logs[i]->LogNumber());
  252. }
  253. }
  254. }
  255. void WalManager::ArchiveWALFile(const std::string& fname, uint64_t number) {
  256. auto archived_log_name = ArchivedLogFileName(wal_dir_, number);
  257. // The sync point below is used in (DBTest,TransactionLogIteratorRace)
  258. TEST_SYNC_POINT("WalManager::PurgeObsoleteFiles:1");
  259. Status s = env_->RenameFile(fname, archived_log_name);
  260. IGNORE_STATUS_IF_ERROR(s);
  261. // The sync point below is used in (DBTest,TransactionLogIteratorRace)
  262. TEST_SYNC_POINT("WalManager::PurgeObsoleteFiles:2");
  263. // The sync point below is used in
  264. // (CheckPointTest, CheckpointWithArchievedLog)
  265. TEST_SYNC_POINT("WalManager::ArchiveWALFile");
  266. ROCKS_LOG_INFO(db_options_.info_log, "Move log file %s to %s -- %s\n",
  267. fname.c_str(), archived_log_name.c_str(),
  268. s.ToString().c_str());
  269. }
  270. Status WalManager::GetSortedWalsOfType(const std::string& path,
  271. VectorWalPtr& log_files,
  272. WalFileType log_type, bool need_seqnos) {
  273. std::vector<std::string> all_files;
  274. const Status status = env_->GetChildren(path, &all_files);
  275. if (!status.ok()) {
  276. return status;
  277. }
  278. log_files.reserve(all_files.size());
  279. for (const auto& f : all_files) {
  280. uint64_t number;
  281. FileType type;
  282. if (ParseFileName(f, &number, &type) && type == kWalFile) {
  283. SequenceNumber sequence;
  284. if (need_seqnos) {
  285. Status s = ReadFirstRecord(log_type, number, &sequence);
  286. if (!s.ok()) {
  287. return s;
  288. }
  289. if (sequence == 0) {
  290. // empty file
  291. continue;
  292. }
  293. } else {
  294. sequence = 0;
  295. }
  296. // Reproduce the race condition where a log file is moved
  297. // to archived dir, between these two sync points, used in
  298. // (DBTest,TransactionLogIteratorRace)
  299. TEST_SYNC_POINT("WalManager::GetSortedWalsOfType:1");
  300. TEST_SYNC_POINT("WalManager::GetSortedWalsOfType:2");
  301. uint64_t size_bytes;
  302. Status s = env_->GetFileSize(LogFileName(path, number), &size_bytes);
  303. // re-try in case the alive log file has been moved to archive.
  304. if (!s.ok() && log_type == kAliveLogFile) {
  305. std::string archived_file = ArchivedLogFileName(path, number);
  306. if (env_->FileExists(archived_file).ok()) {
  307. s = env_->GetFileSize(archived_file, &size_bytes);
  308. if (!s.ok() && env_->FileExists(archived_file).IsNotFound()) {
  309. // oops, the file just got deleted from archived dir! move on
  310. s = Status::OK();
  311. continue;
  312. }
  313. }
  314. }
  315. if (!s.ok()) {
  316. return s;
  317. }
  318. log_files.emplace_back(
  319. new WalFileImpl(number, log_type, sequence, size_bytes));
  320. }
  321. }
  322. std::sort(
  323. log_files.begin(), log_files.end(),
  324. [](const std::unique_ptr<WalFile>& a, const std::unique_ptr<WalFile>& b) {
  325. WalFileImpl* a_impl = static_cast_with_check<WalFileImpl>(a.get());
  326. WalFileImpl* b_impl = static_cast_with_check<WalFileImpl>(b.get());
  327. return *a_impl < *b_impl;
  328. });
  329. return status;
  330. }
  331. Status WalManager::RetainProbableWalFiles(VectorWalPtr& all_logs,
  332. const SequenceNumber target) {
  333. int64_t start = 0; // signed to avoid overflow when target is < first file.
  334. int64_t end = static_cast<int64_t>(all_logs.size()) - 1;
  335. // Binary Search. avoid opening all files.
  336. while (end >= start) {
  337. int64_t mid = start + (end - start) / 2; // Avoid overflow.
  338. SequenceNumber current_seq_num =
  339. all_logs.at(static_cast<size_t>(mid))->StartSequence();
  340. if (current_seq_num == target) {
  341. end = mid;
  342. break;
  343. } else if (current_seq_num < target) {
  344. start = mid + 1;
  345. } else {
  346. end = mid - 1;
  347. }
  348. }
  349. // end could be -ve.
  350. size_t start_index =
  351. static_cast<size_t>(std::max(static_cast<int64_t>(0), end));
  352. // The last wal file is always included
  353. all_logs.erase(all_logs.begin(), all_logs.begin() + start_index);
  354. return Status::OK();
  355. }
  356. Status WalManager::ReadFirstRecord(const WalFileType type,
  357. const uint64_t number,
  358. SequenceNumber* sequence) {
  359. *sequence = 0;
  360. if (type != kAliveLogFile && type != kArchivedLogFile) {
  361. ROCKS_LOG_ERROR(db_options_.info_log, "[WalManger] Unknown file type %s",
  362. std::to_string(type).c_str());
  363. return Status::NotSupported("File Type Not Known " + std::to_string(type));
  364. }
  365. {
  366. MutexLock l(&read_first_record_cache_mutex_);
  367. auto itr = read_first_record_cache_.find(number);
  368. if (itr != read_first_record_cache_.end()) {
  369. *sequence = itr->second;
  370. return Status::OK();
  371. }
  372. }
  373. Status s;
  374. if (type == kAliveLogFile) {
  375. std::string fname = LogFileName(wal_dir_, number);
  376. s = ReadFirstLine(fname, number, sequence);
  377. if (!s.ok() && env_->FileExists(fname).ok()) {
  378. // return any error that is not caused by non-existing file
  379. return s;
  380. }
  381. }
  382. if (type == kArchivedLogFile || !s.ok()) {
  383. // check if the file got moved to archive.
  384. std::string archived_file = ArchivedLogFileName(wal_dir_, number);
  385. s = ReadFirstLine(archived_file, number, sequence);
  386. // maybe the file was deleted from archive dir. If that's the case, return
  387. // Status::OK(). The caller with identify this as empty file because
  388. // *sequence == 0
  389. if (!s.ok() && env_->FileExists(archived_file).IsNotFound()) {
  390. return Status::OK();
  391. }
  392. }
  393. if (s.ok() && *sequence != 0) {
  394. MutexLock l(&read_first_record_cache_mutex_);
  395. read_first_record_cache_.insert({number, *sequence});
  396. }
  397. return s;
  398. }
  399. Status WalManager::GetLiveWalFile(uint64_t number,
  400. std::unique_ptr<WalFile>* log_file) {
  401. if (!log_file) {
  402. return Status::InvalidArgument("log_file not preallocated.");
  403. }
  404. if (!number) {
  405. return Status::PathNotFound("log file not available");
  406. }
  407. Status s;
  408. uint64_t size_bytes;
  409. s = env_->GetFileSize(LogFileName(wal_dir_, number), &size_bytes);
  410. if (!s.ok()) {
  411. return s;
  412. }
  413. log_file->reset(new WalFileImpl(number, kAliveLogFile,
  414. 0, // SequenceNumber
  415. size_bytes));
  416. return Status::OK();
  417. }
  418. // the function returns status.ok() and sequence == 0 if the file exists, but is
  419. // empty
  420. Status WalManager::ReadFirstLine(const std::string& fname,
  421. const uint64_t number,
  422. SequenceNumber* sequence) {
  423. struct LogReporter : public log::Reader::Reporter {
  424. Env* env;
  425. Logger* info_log;
  426. const char* fname;
  427. Status* status;
  428. bool ignore_error; // true if db_options_.paranoid_checks==false
  429. void Corruption(size_t bytes, const Status& s,
  430. uint64_t /*log_number*/ = kMaxSequenceNumber) override {
  431. ROCKS_LOG_WARN(info_log, "[WalManager] %s%s: dropping %d bytes; %s",
  432. (this->ignore_error ? "(ignoring error) " : ""), fname,
  433. static_cast<int>(bytes), s.ToString().c_str());
  434. if (this->status->ok()) {
  435. // only keep the first error
  436. *this->status = s;
  437. }
  438. }
  439. };
  440. std::unique_ptr<FSSequentialFile> file;
  441. Status status = fs_->NewSequentialFile(
  442. fname, fs_->OptimizeForLogRead(file_options_), &file, nullptr);
  443. std::unique_ptr<SequentialFileReader> file_reader(
  444. new SequentialFileReader(std::move(file), fname, io_tracer_));
  445. if (!status.ok()) {
  446. return status;
  447. }
  448. LogReporter reporter;
  449. reporter.env = env_;
  450. reporter.info_log = db_options_.info_log.get();
  451. reporter.fname = fname.c_str();
  452. reporter.status = &status;
  453. reporter.ignore_error = !db_options_.paranoid_checks;
  454. log::Reader reader(db_options_.info_log, std::move(file_reader), &reporter,
  455. true /*checksum*/, number);
  456. std::string scratch;
  457. Slice record;
  458. if (reader.ReadRecord(&record, &scratch) &&
  459. (status.ok() || !db_options_.paranoid_checks)) {
  460. if (record.size() < WriteBatchInternal::kHeader) {
  461. reporter.Corruption(record.size(),
  462. Status::Corruption("log record too small"));
  463. // TODO read record's till the first no corrupt entry?
  464. } else {
  465. WriteBatch batch;
  466. // We can overwrite an existing non-OK Status since it'd only reach here
  467. // with `paranoid_checks == false`.
  468. status = WriteBatchInternal::SetContents(&batch, record);
  469. if (status.ok()) {
  470. *sequence = WriteBatchInternal::Sequence(&batch);
  471. return status;
  472. }
  473. }
  474. }
  475. if (status.ok() && reader.IsCompressedAndEmptyFile()) {
  476. // In case of wal_compression, it writes a `kSetCompressionType` record
  477. // which is not associated with any sequence number. As result for an empty
  478. // file, GetSortedWalsOfType() will skip these WALs causing the operations
  479. // to fail.
  480. // Therefore, in order to avoid that failure, it sets sequence_number to 1
  481. // indicating those WALs should be included.
  482. *sequence = 1;
  483. } else {
  484. // ReadRecord might have returned false on EOF, which means that the log
  485. // file is empty. Or, a failure may have occurred while processing the first
  486. // entry. In any case, return status and set sequence number to 0.
  487. *sequence = 0;
  488. }
  489. return status;
  490. }
  491. } // namespace ROCKSDB_NAMESPACE