wal_manager.cc 17 KB


  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  9. #include "db/wal_manager.h"
  10. #include <algorithm>
  11. #include <cinttypes>
  12. #include <memory>
  13. #include <vector>
  14. #include "db/log_reader.h"
  15. #include "db/log_writer.h"
  16. #include "db/transaction_log_impl.h"
  17. #include "db/write_batch_internal.h"
  18. #include "file/file_util.h"
  19. #include "file/filename.h"
  20. #include "file/sequence_file_reader.h"
  21. #include "logging/logging.h"
  22. #include "port/port.h"
  23. #include "rocksdb/env.h"
  24. #include "rocksdb/options.h"
  25. #include "rocksdb/write_batch.h"
  26. #include "test_util/sync_point.h"
  27. #include "util/cast_util.h"
  28. #include "util/coding.h"
  29. #include "util/mutexlock.h"
  30. #include "util/string_util.h"
  31. namespace ROCKSDB_NAMESPACE {
  32. #ifndef ROCKSDB_LITE
  33. Status WalManager::DeleteFile(const std::string& fname, uint64_t number) {
  34. auto s = env_->DeleteFile(db_options_.wal_dir + "/" + fname);
  35. if (s.ok()) {
  36. MutexLock l(&read_first_record_cache_mutex_);
  37. read_first_record_cache_.erase(number);
  38. }
  39. return s;
  40. }
  41. Status WalManager::GetSortedWalFiles(VectorLogPtr& files) {
  42. // First get sorted files in db dir, then get sorted files from archived
  43. // dir, to avoid a race condition where a log file is moved to archived
  44. // dir in between.
  45. Status s;
  46. // list wal files in main db dir.
  47. VectorLogPtr logs;
  48. s = GetSortedWalsOfType(db_options_.wal_dir, logs, kAliveLogFile);
  49. if (!s.ok()) {
  50. return s;
  51. }
  52. // Reproduce the race condition where a log file is moved
  53. // to archived dir, between these two sync points, used in
  54. // (DBTest,TransactionLogIteratorRace)
  55. TEST_SYNC_POINT("WalManager::GetSortedWalFiles:1");
  56. TEST_SYNC_POINT("WalManager::GetSortedWalFiles:2");
  57. files.clear();
  58. // list wal files in archive dir.
  59. std::string archivedir = ArchivalDirectory(db_options_.wal_dir);
  60. Status exists = env_->FileExists(archivedir);
  61. if (exists.ok()) {
  62. s = GetSortedWalsOfType(archivedir, files, kArchivedLogFile);
  63. if (!s.ok()) {
  64. return s;
  65. }
  66. } else if (!exists.IsNotFound()) {
  67. assert(s.IsIOError());
  68. return s;
  69. }
  70. uint64_t latest_archived_log_number = 0;
  71. if (!files.empty()) {
  72. latest_archived_log_number = files.back()->LogNumber();
  73. ROCKS_LOG_INFO(db_options_.info_log, "Latest Archived log: %" PRIu64,
  74. latest_archived_log_number);
  75. }
  76. files.reserve(files.size() + logs.size());
  77. for (auto& log : logs) {
  78. if (log->LogNumber() > latest_archived_log_number) {
  79. files.push_back(std::move(log));
  80. } else {
  81. // When the race condition happens, we could see the
  82. // same log in both db dir and archived dir. Simply
  83. // ignore the one in db dir. Note that, if we read
  84. // archived dir first, we would have missed the log file.
  85. ROCKS_LOG_WARN(db_options_.info_log, "%s already moved to archive",
  86. log->PathName().c_str());
  87. }
  88. }
  89. return s;
  90. }
  91. Status WalManager::GetUpdatesSince(
  92. SequenceNumber seq, std::unique_ptr<TransactionLogIterator>* iter,
  93. const TransactionLogIterator::ReadOptions& read_options,
  94. VersionSet* version_set) {
  95. // Get all sorted Wal Files.
  96. // Do binary search and open files and find the seq number.
  97. std::unique_ptr<VectorLogPtr> wal_files(new VectorLogPtr);
  98. Status s = GetSortedWalFiles(*wal_files);
  99. if (!s.ok()) {
  100. return s;
  101. }
  102. s = RetainProbableWalFiles(*wal_files, seq);
  103. if (!s.ok()) {
  104. return s;
  105. }
  106. iter->reset(new TransactionLogIteratorImpl(
  107. db_options_.wal_dir, &db_options_, read_options, file_options_, seq,
  108. std::move(wal_files), version_set, seq_per_batch_));
  109. return (*iter)->status();
  110. }
  111. // 1. Go through all archived files and
  112. // a. if ttl is enabled, delete outdated files
  113. // b. if archive size limit is enabled, delete empty files,
  114. // compute file number and size.
  115. // 2. If size limit is enabled:
  116. // a. compute how many files should be deleted
  117. // b. get sorted non-empty archived logs
  118. // c. delete what should be deleted
  119. void WalManager::PurgeObsoleteWALFiles() {
  120. bool const ttl_enabled = db_options_.wal_ttl_seconds > 0;
  121. bool const size_limit_enabled = db_options_.wal_size_limit_mb > 0;
  122. if (!ttl_enabled && !size_limit_enabled) {
  123. return;
  124. }
  125. int64_t current_time;
  126. Status s = env_->GetCurrentTime(&current_time);
  127. if (!s.ok()) {
  128. ROCKS_LOG_ERROR(db_options_.info_log, "Can't get current time: %s",
  129. s.ToString().c_str());
  130. assert(false);
  131. return;
  132. }
  133. uint64_t const now_seconds = static_cast<uint64_t>(current_time);
  134. uint64_t const time_to_check = (ttl_enabled && !size_limit_enabled)
  135. ? db_options_.wal_ttl_seconds / 2
  136. : kDefaultIntervalToDeleteObsoleteWAL;
  137. if (purge_wal_files_last_run_ + time_to_check > now_seconds) {
  138. return;
  139. }
  140. purge_wal_files_last_run_ = now_seconds;
  141. std::string archival_dir = ArchivalDirectory(db_options_.wal_dir);
  142. std::vector<std::string> files;
  143. s = env_->GetChildren(archival_dir, &files);
  144. if (!s.ok()) {
  145. ROCKS_LOG_ERROR(db_options_.info_log, "Can't get archive files: %s",
  146. s.ToString().c_str());
  147. assert(false);
  148. return;
  149. }
  150. size_t log_files_num = 0;
  151. uint64_t log_file_size = 0;
  152. for (auto& f : files) {
  153. uint64_t number;
  154. FileType type;
  155. if (ParseFileName(f, &number, &type) && type == kLogFile) {
  156. std::string const file_path = archival_dir + "/" + f;
  157. if (ttl_enabled) {
  158. uint64_t file_m_time;
  159. s = env_->GetFileModificationTime(file_path, &file_m_time);
  160. if (!s.ok()) {
  161. ROCKS_LOG_WARN(db_options_.info_log,
  162. "Can't get file mod time: %s: %s", file_path.c_str(),
  163. s.ToString().c_str());
  164. continue;
  165. }
  166. if (now_seconds - file_m_time > db_options_.wal_ttl_seconds) {
  167. s = DeleteDBFile(&db_options_, file_path, archival_dir, false,
  168. /*force_fg=*/!wal_in_db_path_);
  169. if (!s.ok()) {
  170. ROCKS_LOG_WARN(db_options_.info_log, "Can't delete file: %s: %s",
  171. file_path.c_str(), s.ToString().c_str());
  172. continue;
  173. } else {
  174. MutexLock l(&read_first_record_cache_mutex_);
  175. read_first_record_cache_.erase(number);
  176. }
  177. continue;
  178. }
  179. }
  180. if (size_limit_enabled) {
  181. uint64_t file_size;
  182. s = env_->GetFileSize(file_path, &file_size);
  183. if (!s.ok()) {
  184. ROCKS_LOG_ERROR(db_options_.info_log,
  185. "Unable to get file size: %s: %s", file_path.c_str(),
  186. s.ToString().c_str());
  187. return;
  188. } else {
  189. if (file_size > 0) {
  190. log_file_size = std::max(log_file_size, file_size);
  191. ++log_files_num;
  192. } else {
  193. s = DeleteDBFile(&db_options_, file_path, archival_dir, false,
  194. /*force_fg=*/!wal_in_db_path_);
  195. if (!s.ok()) {
  196. ROCKS_LOG_WARN(db_options_.info_log,
  197. "Unable to delete file: %s: %s", file_path.c_str(),
  198. s.ToString().c_str());
  199. continue;
  200. } else {
  201. MutexLock l(&read_first_record_cache_mutex_);
  202. read_first_record_cache_.erase(number);
  203. }
  204. }
  205. }
  206. }
  207. }
  208. }
  209. if (0 == log_files_num || !size_limit_enabled) {
  210. return;
  211. }
  212. size_t const files_keep_num =
  213. static_cast<size_t>(db_options_.wal_size_limit_mb * 1024 * 1024 / log_file_size);
  214. if (log_files_num <= files_keep_num) {
  215. return;
  216. }
  217. size_t files_del_num = log_files_num - files_keep_num;
  218. VectorLogPtr archived_logs;
  219. GetSortedWalsOfType(archival_dir, archived_logs, kArchivedLogFile);
  220. if (files_del_num > archived_logs.size()) {
  221. ROCKS_LOG_WARN(db_options_.info_log,
  222. "Trying to delete more archived log files than "
  223. "exist. Deleting all");
  224. files_del_num = archived_logs.size();
  225. }
  226. for (size_t i = 0; i < files_del_num; ++i) {
  227. std::string const file_path = archived_logs[i]->PathName();
  228. s = DeleteDBFile(&db_options_, db_options_.wal_dir + "/" + file_path,
  229. db_options_.wal_dir, false,
  230. /*force_fg=*/!wal_in_db_path_);
  231. if (!s.ok()) {
  232. ROCKS_LOG_WARN(db_options_.info_log, "Unable to delete file: %s: %s",
  233. file_path.c_str(), s.ToString().c_str());
  234. continue;
  235. } else {
  236. MutexLock l(&read_first_record_cache_mutex_);
  237. read_first_record_cache_.erase(archived_logs[i]->LogNumber());
  238. }
  239. }
  240. }
  241. void WalManager::ArchiveWALFile(const std::string& fname, uint64_t number) {
  242. auto archived_log_name = ArchivedLogFileName(db_options_.wal_dir, number);
  243. // The sync point below is used in (DBTest,TransactionLogIteratorRace)
  244. TEST_SYNC_POINT("WalManager::PurgeObsoleteFiles:1");
  245. Status s = env_->RenameFile(fname, archived_log_name);
  246. // The sync point below is used in (DBTest,TransactionLogIteratorRace)
  247. TEST_SYNC_POINT("WalManager::PurgeObsoleteFiles:2");
  248. ROCKS_LOG_INFO(db_options_.info_log, "Move log file %s to %s -- %s\n",
  249. fname.c_str(), archived_log_name.c_str(),
  250. s.ToString().c_str());
  251. }
  252. Status WalManager::GetSortedWalsOfType(const std::string& path,
  253. VectorLogPtr& log_files,
  254. WalFileType log_type) {
  255. std::vector<std::string> all_files;
  256. const Status status = env_->GetChildren(path, &all_files);
  257. if (!status.ok()) {
  258. return status;
  259. }
  260. log_files.reserve(all_files.size());
  261. for (const auto& f : all_files) {
  262. uint64_t number;
  263. FileType type;
  264. if (ParseFileName(f, &number, &type) && type == kLogFile) {
  265. SequenceNumber sequence;
  266. Status s = ReadFirstRecord(log_type, number, &sequence);
  267. if (!s.ok()) {
  268. return s;
  269. }
  270. if (sequence == 0) {
  271. // empty file
  272. continue;
  273. }
  274. // Reproduce the race condition where a log file is moved
  275. // to archived dir, between these two sync points, used in
  276. // (DBTest,TransactionLogIteratorRace)
  277. TEST_SYNC_POINT("WalManager::GetSortedWalsOfType:1");
  278. TEST_SYNC_POINT("WalManager::GetSortedWalsOfType:2");
  279. uint64_t size_bytes;
  280. s = env_->GetFileSize(LogFileName(path, number), &size_bytes);
  281. // re-try in case the alive log file has been moved to archive.
  282. if (!s.ok() && log_type == kAliveLogFile) {
  283. std::string archived_file = ArchivedLogFileName(path, number);
  284. if (env_->FileExists(archived_file).ok()) {
  285. s = env_->GetFileSize(archived_file, &size_bytes);
  286. if (!s.ok() && env_->FileExists(archived_file).IsNotFound()) {
  287. // oops, the file just got deleted from archived dir! move on
  288. s = Status::OK();
  289. continue;
  290. }
  291. }
  292. }
  293. if (!s.ok()) {
  294. return s;
  295. }
  296. log_files.push_back(std::unique_ptr<LogFile>(
  297. new LogFileImpl(number, log_type, sequence, size_bytes)));
  298. }
  299. }
  300. std::sort(
  301. log_files.begin(), log_files.end(),
  302. [](const std::unique_ptr<LogFile>& a, const std::unique_ptr<LogFile>& b) {
  303. LogFileImpl* a_impl =
  304. static_cast_with_check<LogFileImpl, LogFile>(a.get());
  305. LogFileImpl* b_impl =
  306. static_cast_with_check<LogFileImpl, LogFile>(b.get());
  307. return *a_impl < *b_impl;
  308. });
  309. return status;
  310. }
  311. Status WalManager::RetainProbableWalFiles(VectorLogPtr& all_logs,
  312. const SequenceNumber target) {
  313. int64_t start = 0; // signed to avoid overflow when target is < first file.
  314. int64_t end = static_cast<int64_t>(all_logs.size()) - 1;
  315. // Binary Search. avoid opening all files.
  316. while (end >= start) {
  317. int64_t mid = start + (end - start) / 2; // Avoid overflow.
  318. SequenceNumber current_seq_num = all_logs.at(static_cast<size_t>(mid))->StartSequence();
  319. if (current_seq_num == target) {
  320. end = mid;
  321. break;
  322. } else if (current_seq_num < target) {
  323. start = mid + 1;
  324. } else {
  325. end = mid - 1;
  326. }
  327. }
  328. // end could be -ve.
  329. size_t start_index = static_cast<size_t>(std::max(static_cast<int64_t>(0), end));
  330. // The last wal file is always included
  331. all_logs.erase(all_logs.begin(), all_logs.begin() + start_index);
  332. return Status::OK();
  333. }
  334. Status WalManager::ReadFirstRecord(const WalFileType type,
  335. const uint64_t number,
  336. SequenceNumber* sequence) {
  337. *sequence = 0;
  338. if (type != kAliveLogFile && type != kArchivedLogFile) {
  339. ROCKS_LOG_ERROR(db_options_.info_log, "[WalManger] Unknown file type %s",
  340. ToString(type).c_str());
  341. return Status::NotSupported(
  342. "File Type Not Known " + ToString(type));
  343. }
  344. {
  345. MutexLock l(&read_first_record_cache_mutex_);
  346. auto itr = read_first_record_cache_.find(number);
  347. if (itr != read_first_record_cache_.end()) {
  348. *sequence = itr->second;
  349. return Status::OK();
  350. }
  351. }
  352. Status s;
  353. if (type == kAliveLogFile) {
  354. std::string fname = LogFileName(db_options_.wal_dir, number);
  355. s = ReadFirstLine(fname, number, sequence);
  356. if (!s.ok() && env_->FileExists(fname).ok()) {
  357. // return any error that is not caused by non-existing file
  358. return s;
  359. }
  360. }
  361. if (type == kArchivedLogFile || !s.ok()) {
  362. // check if the file got moved to archive.
  363. std::string archived_file =
  364. ArchivedLogFileName(db_options_.wal_dir, number);
  365. s = ReadFirstLine(archived_file, number, sequence);
  366. // maybe the file was deleted from archive dir. If that's the case, return
  367. // Status::OK(). The caller with identify this as empty file because
  368. // *sequence == 0
  369. if (!s.ok() && env_->FileExists(archived_file).IsNotFound()) {
  370. return Status::OK();
  371. }
  372. }
  373. if (s.ok() && *sequence != 0) {
  374. MutexLock l(&read_first_record_cache_mutex_);
  375. read_first_record_cache_.insert({number, *sequence});
  376. }
  377. return s;
  378. }
  379. Status WalManager::GetLiveWalFile(uint64_t number,
  380. std::unique_ptr<LogFile>* log_file) {
  381. if (!log_file) {
  382. return Status::InvalidArgument("log_file not preallocated.");
  383. }
  384. if (!number) {
  385. return Status::PathNotFound("log file not available");
  386. }
  387. Status s;
  388. uint64_t size_bytes;
  389. s = env_->GetFileSize(LogFileName(db_options_.wal_dir, number), &size_bytes);
  390. if (!s.ok()) {
  391. return s;
  392. }
  393. log_file->reset(new LogFileImpl(number, kAliveLogFile,
  394. 0, // SequenceNumber
  395. size_bytes));
  396. return Status::OK();
  397. }
  398. // the function returns status.ok() and sequence == 0 if the file exists, but is
  399. // empty
  400. Status WalManager::ReadFirstLine(const std::string& fname,
  401. const uint64_t number,
  402. SequenceNumber* sequence) {
  403. struct LogReporter : public log::Reader::Reporter {
  404. Env* env;
  405. Logger* info_log;
  406. const char* fname;
  407. Status* status;
  408. bool ignore_error; // true if db_options_.paranoid_checks==false
  409. void Corruption(size_t bytes, const Status& s) override {
  410. ROCKS_LOG_WARN(info_log, "[WalManager] %s%s: dropping %d bytes; %s",
  411. (this->ignore_error ? "(ignoring error) " : ""), fname,
  412. static_cast<int>(bytes), s.ToString().c_str());
  413. if (this->status->ok()) {
  414. // only keep the first error
  415. *this->status = s;
  416. }
  417. }
  418. };
  419. std::unique_ptr<FSSequentialFile> file;
  420. Status status = fs_->NewSequentialFile(fname,
  421. fs_->OptimizeForLogRead(file_options_),
  422. &file, nullptr);
  423. std::unique_ptr<SequentialFileReader> file_reader(
  424. new SequentialFileReader(std::move(file), fname));
  425. if (!status.ok()) {
  426. return status;
  427. }
  428. LogReporter reporter;
  429. reporter.env = env_;
  430. reporter.info_log = db_options_.info_log.get();
  431. reporter.fname = fname.c_str();
  432. reporter.status = &status;
  433. reporter.ignore_error = !db_options_.paranoid_checks;
  434. log::Reader reader(db_options_.info_log, std::move(file_reader), &reporter,
  435. true /*checksum*/, number);
  436. std::string scratch;
  437. Slice record;
  438. if (reader.ReadRecord(&record, &scratch) &&
  439. (status.ok() || !db_options_.paranoid_checks)) {
  440. if (record.size() < WriteBatchInternal::kHeader) {
  441. reporter.Corruption(record.size(),
  442. Status::Corruption("log record too small"));
  443. // TODO read record's till the first no corrupt entry?
  444. } else {
  445. WriteBatch batch;
  446. WriteBatchInternal::SetContents(&batch, record);
  447. *sequence = WriteBatchInternal::Sequence(&batch);
  448. return Status::OK();
  449. }
  450. }
  451. // ReadRecord returns false on EOF, which means that the log file is empty. we
  452. // return status.ok() in that case and set sequence number to 0
  453. *sequence = 0;
  454. return status;
  455. }
  456. #endif // ROCKSDB_LITE
  457. } // namespace ROCKSDB_NAMESPACE