db_impl_files.cc 24 KB


  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  9. #include "db/db_impl/db_impl.h"
  10. #include <cinttypes>
  11. #include <set>
  12. #include <unordered_set>
  13. #include "db/event_helpers.h"
  14. #include "db/memtable_list.h"
  15. #include "file/file_util.h"
  16. #include "file/sst_file_manager_impl.h"
  17. #include "util/autovector.h"
  18. namespace ROCKSDB_NAMESPACE {
  19. uint64_t DBImpl::MinLogNumberToKeep() {
  20. if (allow_2pc()) {
  21. return versions_->min_log_number_to_keep_2pc();
  22. } else {
  23. return versions_->MinLogNumberWithUnflushedData();
  24. }
  25. }
  26. uint64_t DBImpl::MinObsoleteSstNumberToKeep() {
  27. mutex_.AssertHeld();
  28. if (!pending_outputs_.empty()) {
  29. return *pending_outputs_.begin();
  30. }
  31. return std::numeric_limits<uint64_t>::max();
  32. }
  33. // * Returns the list of live files in 'sst_live'
  34. // If it's doing full scan:
  35. // * Returns the list of all files in the filesystem in
  36. // 'full_scan_candidate_files'.
  37. // Otherwise, gets obsolete files from VersionSet.
  38. // no_full_scan = true -- never do the full scan using GetChildren()
  39. // force = false -- don't force the full scan, except every
  40. // mutable_db_options_.delete_obsolete_files_period_micros
  41. // force = true -- force the full scan
  42. void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
  43. bool no_full_scan) {
  44. mutex_.AssertHeld();
  45. // if deletion is disabled, do nothing
  46. if (disable_delete_obsolete_files_ > 0) {
  47. return;
  48. }
  49. bool doing_the_full_scan = false;
  50. // logic for figuring out if we're doing the full scan
  51. if (no_full_scan) {
  52. doing_the_full_scan = false;
  53. } else if (force ||
  54. mutable_db_options_.delete_obsolete_files_period_micros == 0) {
  55. doing_the_full_scan = true;
  56. } else {
  57. const uint64_t now_micros = env_->NowMicros();
  58. if ((delete_obsolete_files_last_run_ +
  59. mutable_db_options_.delete_obsolete_files_period_micros) <
  60. now_micros) {
  61. doing_the_full_scan = true;
  62. delete_obsolete_files_last_run_ = now_micros;
  63. }
  64. }
  65. // don't delete files that might be currently written to from compaction
  66. // threads
  67. // Since job_context->min_pending_output is set, until file scan finishes,
  68. // mutex_ cannot be released. Otherwise, we might see no min_pending_output
  69. // here but later find newer generated unfinalized files while scanning.
  70. if (!pending_outputs_.empty()) {
  71. job_context->min_pending_output = *pending_outputs_.begin();
  72. } else {
  73. // delete all of them
  74. job_context->min_pending_output = std::numeric_limits<uint64_t>::max();
  75. }
  76. // Get obsolete files. This function will also update the list of
  77. // pending files in VersionSet().
  78. versions_->GetObsoleteFiles(&job_context->sst_delete_files,
  79. &job_context->manifest_delete_files,
  80. job_context->min_pending_output);
  81. // Mark the elements in job_context->sst_delete_files as grabbedForPurge
  82. // so that other threads calling FindObsoleteFiles with full_scan=true
  83. // will not add these files to candidate list for purge.
  84. for (const auto& sst_to_del : job_context->sst_delete_files) {
  85. MarkAsGrabbedForPurge(sst_to_del.metadata->fd.GetNumber());
  86. }
  87. // store the current filenum, lognum, etc
  88. job_context->manifest_file_number = versions_->manifest_file_number();
  89. job_context->pending_manifest_file_number =
  90. versions_->pending_manifest_file_number();
  91. job_context->log_number = MinLogNumberToKeep();
  92. job_context->prev_log_number = versions_->prev_log_number();
  93. versions_->AddLiveFiles(&job_context->sst_live);
  94. if (doing_the_full_scan) {
  95. InfoLogPrefix info_log_prefix(!immutable_db_options_.db_log_dir.empty(),
  96. dbname_);
  97. std::set<std::string> paths;
  98. for (size_t path_id = 0; path_id < immutable_db_options_.db_paths.size();
  99. path_id++) {
  100. paths.insert(immutable_db_options_.db_paths[path_id].path);
  101. }
  102. // Note that if cf_paths is not specified in the ColumnFamilyOptions
  103. // of a particular column family, we use db_paths as the cf_paths
  104. // setting. Hence, there can be multiple duplicates of files from db_paths
  105. // in the following code. The duplicate are removed while identifying
  106. // unique files in PurgeObsoleteFiles.
  107. for (auto cfd : *versions_->GetColumnFamilySet()) {
  108. for (size_t path_id = 0; path_id < cfd->ioptions()->cf_paths.size();
  109. path_id++) {
  110. auto& path = cfd->ioptions()->cf_paths[path_id].path;
  111. if (paths.find(path) == paths.end()) {
  112. paths.insert(path);
  113. }
  114. }
  115. }
  116. for (auto& path : paths) {
  117. // set of all files in the directory. We'll exclude files that are still
  118. // alive in the subsequent processings.
  119. std::vector<std::string> files;
  120. env_->GetChildren(path, &files); // Ignore errors
  121. for (const std::string& file : files) {
  122. uint64_t number;
  123. FileType type;
  124. // 1. If we cannot parse the file name, we skip;
  125. // 2. If the file with file_number equals number has already been
  126. // grabbed for purge by another compaction job, or it has already been
  127. // schedule for purge, we also skip it if we
  128. // are doing full scan in order to avoid double deletion of the same
  129. // file under race conditions. See
  130. // https://github.com/facebook/rocksdb/issues/3573
  131. if (!ParseFileName(file, &number, info_log_prefix.prefix, &type) ||
  132. !ShouldPurge(number)) {
  133. continue;
  134. }
  135. // TODO(icanadi) clean up this mess to avoid having one-off "/" prefixes
  136. job_context->full_scan_candidate_files.emplace_back("/" + file, path);
  137. }
  138. }
  139. // Add log files in wal_dir
  140. if (immutable_db_options_.wal_dir != dbname_) {
  141. std::vector<std::string> log_files;
  142. env_->GetChildren(immutable_db_options_.wal_dir,
  143. &log_files); // Ignore errors
  144. for (const std::string& log_file : log_files) {
  145. job_context->full_scan_candidate_files.emplace_back(
  146. log_file, immutable_db_options_.wal_dir);
  147. }
  148. }
  149. // Add info log files in db_log_dir
  150. if (!immutable_db_options_.db_log_dir.empty() &&
  151. immutable_db_options_.db_log_dir != dbname_) {
  152. std::vector<std::string> info_log_files;
  153. // Ignore errors
  154. env_->GetChildren(immutable_db_options_.db_log_dir, &info_log_files);
  155. for (std::string& log_file : info_log_files) {
  156. job_context->full_scan_candidate_files.emplace_back(
  157. log_file, immutable_db_options_.db_log_dir);
  158. }
  159. }
  160. }
  161. // logs_ is empty when called during recovery, in which case there can't yet
  162. // be any tracked obsolete logs
  163. if (!alive_log_files_.empty() && !logs_.empty()) {
  164. uint64_t min_log_number = job_context->log_number;
  165. size_t num_alive_log_files = alive_log_files_.size();
  166. // find newly obsoleted log files
  167. while (alive_log_files_.begin()->number < min_log_number) {
  168. auto& earliest = *alive_log_files_.begin();
  169. if (immutable_db_options_.recycle_log_file_num >
  170. log_recycle_files_.size()) {
  171. ROCKS_LOG_INFO(immutable_db_options_.info_log,
  172. "adding log %" PRIu64 " to recycle list\n",
  173. earliest.number);
  174. log_recycle_files_.push_back(earliest.number);
  175. } else {
  176. job_context->log_delete_files.push_back(earliest.number);
  177. }
  178. if (job_context->size_log_to_delete == 0) {
  179. job_context->prev_total_log_size = total_log_size_;
  180. job_context->num_alive_log_files = num_alive_log_files;
  181. }
  182. job_context->size_log_to_delete += earliest.size;
  183. total_log_size_ -= earliest.size;
  184. if (two_write_queues_) {
  185. log_write_mutex_.Lock();
  186. }
  187. alive_log_files_.pop_front();
  188. if (two_write_queues_) {
  189. log_write_mutex_.Unlock();
  190. }
  191. // Current log should always stay alive since it can't have
  192. // number < MinLogNumber().
  193. assert(alive_log_files_.size());
  194. }
  195. while (!logs_.empty() && logs_.front().number < min_log_number) {
  196. auto& log = logs_.front();
  197. if (log.getting_synced) {
  198. log_sync_cv_.Wait();
  199. // logs_ could have changed while we were waiting.
  200. continue;
  201. }
  202. logs_to_free_.push_back(log.ReleaseWriter());
  203. {
  204. InstrumentedMutexLock wl(&log_write_mutex_);
  205. logs_.pop_front();
  206. }
  207. }
  208. // Current log cannot be obsolete.
  209. assert(!logs_.empty());
  210. }
  211. // We're just cleaning up for DB::Write().
  212. assert(job_context->logs_to_free.empty());
  213. job_context->logs_to_free = logs_to_free_;
  214. job_context->log_recycle_files.assign(log_recycle_files_.begin(),
  215. log_recycle_files_.end());
  216. if (job_context->HaveSomethingToDelete()) {
  217. ++pending_purge_obsolete_files_;
  218. }
  219. logs_to_free_.clear();
  220. }
  221. namespace {
  222. bool CompareCandidateFile(const JobContext::CandidateFileInfo& first,
  223. const JobContext::CandidateFileInfo& second) {
  224. if (first.file_name > second.file_name) {
  225. return true;
  226. } else if (first.file_name < second.file_name) {
  227. return false;
  228. } else {
  229. return (first.file_path > second.file_path);
  230. }
  231. }
  232. }; // namespace
  233. // Delete obsolete files and log status and information of file deletion
  234. void DBImpl::DeleteObsoleteFileImpl(int job_id, const std::string& fname,
  235. const std::string& path_to_sync,
  236. FileType type, uint64_t number) {
  237. Status file_deletion_status;
  238. if (type == kTableFile || type == kLogFile) {
  239. file_deletion_status =
  240. DeleteDBFile(&immutable_db_options_, fname, path_to_sync,
  241. /*force_bg=*/false, /*force_fg=*/!wal_in_db_path_);
  242. } else {
  243. file_deletion_status = env_->DeleteFile(fname);
  244. }
  245. TEST_SYNC_POINT_CALLBACK("DBImpl::DeleteObsoleteFileImpl:AfterDeletion",
  246. &file_deletion_status);
  247. if (file_deletion_status.ok()) {
  248. ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
  249. "[JOB %d] Delete %s type=%d #%" PRIu64 " -- %s\n", job_id,
  250. fname.c_str(), type, number,
  251. file_deletion_status.ToString().c_str());
  252. } else if (env_->FileExists(fname).IsNotFound()) {
  253. ROCKS_LOG_INFO(
  254. immutable_db_options_.info_log,
  255. "[JOB %d] Tried to delete a non-existing file %s type=%d #%" PRIu64
  256. " -- %s\n",
  257. job_id, fname.c_str(), type, number,
  258. file_deletion_status.ToString().c_str());
  259. } else {
  260. ROCKS_LOG_ERROR(immutable_db_options_.info_log,
  261. "[JOB %d] Failed to delete %s type=%d #%" PRIu64 " -- %s\n",
  262. job_id, fname.c_str(), type, number,
  263. file_deletion_status.ToString().c_str());
  264. }
  265. if (type == kTableFile) {
  266. EventHelpers::LogAndNotifyTableFileDeletion(
  267. &event_logger_, job_id, number, fname, file_deletion_status, GetName(),
  268. immutable_db_options_.listeners);
  269. }
  270. }
  271. // Diffs the files listed in filenames and those that do not
  272. // belong to live files are possibly removed. Also, removes all the
  273. // files in sst_delete_files and log_delete_files.
  274. // It is not necessary to hold the mutex when invoking this method.
  275. void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) {
  276. TEST_SYNC_POINT("DBImpl::PurgeObsoleteFiles:Begin");
  277. // we'd better have sth to delete
  278. assert(state.HaveSomethingToDelete());
  279. // FindObsoleteFiles() should've populated this so nonzero
  280. assert(state.manifest_file_number != 0);
  281. // Now, convert live list to an unordered map, WITHOUT mutex held;
  282. // set is slow.
  283. std::unordered_map<uint64_t, const FileDescriptor*> sst_live_map;
  284. for (const FileDescriptor& fd : state.sst_live) {
  285. sst_live_map[fd.GetNumber()] = &fd;
  286. }
  287. std::unordered_set<uint64_t> log_recycle_files_set(
  288. state.log_recycle_files.begin(), state.log_recycle_files.end());
  289. auto candidate_files = state.full_scan_candidate_files;
  290. candidate_files.reserve(
  291. candidate_files.size() + state.sst_delete_files.size() +
  292. state.log_delete_files.size() + state.manifest_delete_files.size());
  293. // We may ignore the dbname when generating the file names.
  294. for (auto& file : state.sst_delete_files) {
  295. candidate_files.emplace_back(
  296. MakeTableFileName(file.metadata->fd.GetNumber()), file.path);
  297. if (file.metadata->table_reader_handle) {
  298. table_cache_->Release(file.metadata->table_reader_handle);
  299. }
  300. file.DeleteMetadata();
  301. }
  302. for (auto file_num : state.log_delete_files) {
  303. if (file_num > 0) {
  304. candidate_files.emplace_back(LogFileName(file_num),
  305. immutable_db_options_.wal_dir);
  306. }
  307. }
  308. for (const auto& filename : state.manifest_delete_files) {
  309. candidate_files.emplace_back(filename, dbname_);
  310. }
  311. // dedup state.candidate_files so we don't try to delete the same
  312. // file twice
  313. std::sort(candidate_files.begin(), candidate_files.end(),
  314. CompareCandidateFile);
  315. candidate_files.erase(
  316. std::unique(candidate_files.begin(), candidate_files.end()),
  317. candidate_files.end());
  318. if (state.prev_total_log_size > 0) {
  319. ROCKS_LOG_INFO(immutable_db_options_.info_log,
  320. "[JOB %d] Try to delete WAL files size %" PRIu64
  321. ", prev total WAL file size %" PRIu64
  322. ", number of live WAL files %" ROCKSDB_PRIszt ".\n",
  323. state.job_id, state.size_log_to_delete,
  324. state.prev_total_log_size, state.num_alive_log_files);
  325. }
  326. std::vector<std::string> old_info_log_files;
  327. InfoLogPrefix info_log_prefix(!immutable_db_options_.db_log_dir.empty(),
  328. dbname_);
  329. // File numbers of most recent two OPTIONS file in candidate_files (found in
  330. // previos FindObsoleteFiles(full_scan=true))
  331. // At this point, there must not be any duplicate file numbers in
  332. // candidate_files.
  333. uint64_t optsfile_num1 = std::numeric_limits<uint64_t>::min();
  334. uint64_t optsfile_num2 = std::numeric_limits<uint64_t>::min();
  335. for (const auto& candidate_file : candidate_files) {
  336. const std::string& fname = candidate_file.file_name;
  337. uint64_t number;
  338. FileType type;
  339. if (!ParseFileName(fname, &number, info_log_prefix.prefix, &type) ||
  340. type != kOptionsFile) {
  341. continue;
  342. }
  343. if (number > optsfile_num1) {
  344. optsfile_num2 = optsfile_num1;
  345. optsfile_num1 = number;
  346. } else if (number > optsfile_num2) {
  347. optsfile_num2 = number;
  348. }
  349. }
  350. // Close WALs before trying to delete them.
  351. for (const auto w : state.logs_to_free) {
  352. // TODO: maybe check the return value of Close.
  353. w->Close();
  354. }
  355. bool own_files = OwnTablesAndLogs();
  356. std::unordered_set<uint64_t> files_to_del;
  357. for (const auto& candidate_file : candidate_files) {
  358. const std::string& to_delete = candidate_file.file_name;
  359. uint64_t number;
  360. FileType type;
  361. // Ignore file if we cannot recognize it.
  362. if (!ParseFileName(to_delete, &number, info_log_prefix.prefix, &type)) {
  363. continue;
  364. }
  365. bool keep = true;
  366. switch (type) {
  367. case kLogFile:
  368. keep = ((number >= state.log_number) ||
  369. (number == state.prev_log_number) ||
  370. (log_recycle_files_set.find(number) !=
  371. log_recycle_files_set.end()));
  372. break;
  373. case kDescriptorFile:
  374. // Keep my manifest file, and any newer incarnations'
  375. // (can happen during manifest roll)
  376. keep = (number >= state.manifest_file_number);
  377. break;
  378. case kTableFile:
  379. // If the second condition is not there, this makes
  380. // DontDeletePendingOutputs fail
  381. keep = (sst_live_map.find(number) != sst_live_map.end()) ||
  382. number >= state.min_pending_output;
  383. if (!keep) {
  384. files_to_del.insert(number);
  385. }
  386. break;
  387. case kTempFile:
  388. // Any temp files that are currently being written to must
  389. // be recorded in pending_outputs_, which is inserted into "live".
  390. // Also, SetCurrentFile creates a temp file when writing out new
  391. // manifest, which is equal to state.pending_manifest_file_number. We
  392. // should not delete that file
  393. //
  394. // TODO(yhchiang): carefully modify the third condition to safely
  395. // remove the temp options files.
  396. keep = (sst_live_map.find(number) != sst_live_map.end()) ||
  397. (number == state.pending_manifest_file_number) ||
  398. (to_delete.find(kOptionsFileNamePrefix) != std::string::npos);
  399. break;
  400. case kInfoLogFile:
  401. keep = true;
  402. if (number != 0) {
  403. old_info_log_files.push_back(to_delete);
  404. }
  405. break;
  406. case kOptionsFile:
  407. keep = (number >= optsfile_num2);
  408. TEST_SYNC_POINT_CALLBACK(
  409. "DBImpl::PurgeObsoleteFiles:CheckOptionsFiles:1",
  410. reinterpret_cast<void*>(&number));
  411. TEST_SYNC_POINT_CALLBACK(
  412. "DBImpl::PurgeObsoleteFiles:CheckOptionsFiles:2",
  413. reinterpret_cast<void*>(&keep));
  414. break;
  415. case kCurrentFile:
  416. case kDBLockFile:
  417. case kIdentityFile:
  418. case kMetaDatabase:
  419. case kBlobFile:
  420. keep = true;
  421. break;
  422. }
  423. if (keep) {
  424. continue;
  425. }
  426. std::string fname;
  427. std::string dir_to_sync;
  428. if (type == kTableFile) {
  429. // evict from cache
  430. TableCache::Evict(table_cache_.get(), number);
  431. fname = MakeTableFileName(candidate_file.file_path, number);
  432. dir_to_sync = candidate_file.file_path;
  433. } else {
  434. dir_to_sync =
  435. (type == kLogFile) ? immutable_db_options_.wal_dir : dbname_;
  436. fname = dir_to_sync +
  437. ((!dir_to_sync.empty() && dir_to_sync.back() == '/') ||
  438. (!to_delete.empty() && to_delete.front() == '/')
  439. ? ""
  440. : "/") +
  441. to_delete;
  442. }
  443. #ifndef ROCKSDB_LITE
  444. if (type == kLogFile && (immutable_db_options_.wal_ttl_seconds > 0 ||
  445. immutable_db_options_.wal_size_limit_mb > 0)) {
  446. wal_manager_.ArchiveWALFile(fname, number);
  447. continue;
  448. }
  449. #endif // !ROCKSDB_LITE
  450. // If I do not own these files, e.g. secondary instance with max_open_files
  451. // = -1, then no need to delete or schedule delete these files since they
  452. // will be removed by their owner, e.g. the primary instance.
  453. if (!own_files) {
  454. continue;
  455. }
  456. Status file_deletion_status;
  457. if (schedule_only) {
  458. InstrumentedMutexLock guard_lock(&mutex_);
  459. SchedulePendingPurge(fname, dir_to_sync, type, number, state.job_id);
  460. } else {
  461. DeleteObsoleteFileImpl(state.job_id, fname, dir_to_sync, type, number);
  462. }
  463. }
  464. {
  465. // After purging obsolete files, remove them from files_grabbed_for_purge_.
  466. InstrumentedMutexLock guard_lock(&mutex_);
  467. autovector<uint64_t> to_be_removed;
  468. for (auto fn : files_grabbed_for_purge_) {
  469. if (files_to_del.count(fn) != 0) {
  470. to_be_removed.emplace_back(fn);
  471. }
  472. }
  473. for (auto fn : to_be_removed) {
  474. files_grabbed_for_purge_.erase(fn);
  475. }
  476. }
  477. // Delete old info log files.
  478. size_t old_info_log_file_count = old_info_log_files.size();
  479. if (old_info_log_file_count != 0 &&
  480. old_info_log_file_count >= immutable_db_options_.keep_log_file_num) {
  481. std::sort(old_info_log_files.begin(), old_info_log_files.end());
  482. size_t end =
  483. old_info_log_file_count - immutable_db_options_.keep_log_file_num;
  484. for (unsigned int i = 0; i <= end; i++) {
  485. std::string& to_delete = old_info_log_files.at(i);
  486. std::string full_path_to_delete =
  487. (immutable_db_options_.db_log_dir.empty()
  488. ? dbname_
  489. : immutable_db_options_.db_log_dir) +
  490. "/" + to_delete;
  491. ROCKS_LOG_INFO(immutable_db_options_.info_log,
  492. "[JOB %d] Delete info log file %s\n", state.job_id,
  493. full_path_to_delete.c_str());
  494. Status s = env_->DeleteFile(full_path_to_delete);
  495. if (!s.ok()) {
  496. if (env_->FileExists(full_path_to_delete).IsNotFound()) {
  497. ROCKS_LOG_INFO(
  498. immutable_db_options_.info_log,
  499. "[JOB %d] Tried to delete non-existing info log file %s FAILED "
  500. "-- %s\n",
  501. state.job_id, to_delete.c_str(), s.ToString().c_str());
  502. } else {
  503. ROCKS_LOG_ERROR(immutable_db_options_.info_log,
  504. "[JOB %d] Delete info log file %s FAILED -- %s\n",
  505. state.job_id, to_delete.c_str(),
  506. s.ToString().c_str());
  507. }
  508. }
  509. }
  510. }
  511. #ifndef ROCKSDB_LITE
  512. wal_manager_.PurgeObsoleteWALFiles();
  513. #endif // ROCKSDB_LITE
  514. LogFlush(immutable_db_options_.info_log);
  515. InstrumentedMutexLock l(&mutex_);
  516. --pending_purge_obsolete_files_;
  517. assert(pending_purge_obsolete_files_ >= 0);
  518. if (pending_purge_obsolete_files_ == 0) {
  519. bg_cv_.SignalAll();
  520. }
  521. TEST_SYNC_POINT("DBImpl::PurgeObsoleteFiles:End");
  522. }
  523. void DBImpl::DeleteObsoleteFiles() {
  524. mutex_.AssertHeld();
  525. JobContext job_context(next_job_id_.fetch_add(1));
  526. FindObsoleteFiles(&job_context, true);
  527. mutex_.Unlock();
  528. if (job_context.HaveSomethingToDelete()) {
  529. PurgeObsoleteFiles(job_context);
  530. }
  531. job_context.Clean();
  532. mutex_.Lock();
  533. }
  534. uint64_t FindMinPrepLogReferencedByMemTable(
  535. VersionSet* vset, const ColumnFamilyData* cfd_to_flush,
  536. const autovector<MemTable*>& memtables_to_flush) {
  537. uint64_t min_log = 0;
  538. // we must look through the memtables for two phase transactions
  539. // that have been committed but not yet flushed
  540. for (auto loop_cfd : *vset->GetColumnFamilySet()) {
  541. if (loop_cfd->IsDropped() || loop_cfd == cfd_to_flush) {
  542. continue;
  543. }
  544. auto log = loop_cfd->imm()->PrecomputeMinLogContainingPrepSection(
  545. memtables_to_flush);
  546. if (log > 0 && (min_log == 0 || log < min_log)) {
  547. min_log = log;
  548. }
  549. log = loop_cfd->mem()->GetMinLogContainingPrepSection();
  550. if (log > 0 && (min_log == 0 || log < min_log)) {
  551. min_log = log;
  552. }
  553. }
  554. return min_log;
  555. }
  556. uint64_t PrecomputeMinLogNumberToKeep(
  557. VersionSet* vset, const ColumnFamilyData& cfd_to_flush,
  558. autovector<VersionEdit*> edit_list,
  559. const autovector<MemTable*>& memtables_to_flush,
  560. LogsWithPrepTracker* prep_tracker) {
  561. assert(vset != nullptr);
  562. assert(prep_tracker != nullptr);
  563. // Calculate updated min_log_number_to_keep
  564. // Since the function should only be called in 2pc mode, log number in
  565. // the version edit should be sufficient.
  566. // Precompute the min log number containing unflushed data for the column
  567. // family being flushed (`cfd_to_flush`).
  568. uint64_t cf_min_log_number_to_keep = 0;
  569. for (auto& e : edit_list) {
  570. if (e->HasLogNumber()) {
  571. cf_min_log_number_to_keep =
  572. std::max(cf_min_log_number_to_keep, e->GetLogNumber());
  573. }
  574. }
  575. if (cf_min_log_number_to_keep == 0) {
  576. // No version edit contains information on log number. The log number
  577. // for this column family should stay the same as it is.
  578. cf_min_log_number_to_keep = cfd_to_flush.GetLogNumber();
  579. }
  580. // Get min log number containing unflushed data for other column families.
  581. uint64_t min_log_number_to_keep =
  582. vset->PreComputeMinLogNumberWithUnflushedData(&cfd_to_flush);
  583. if (cf_min_log_number_to_keep != 0) {
  584. min_log_number_to_keep =
  585. std::min(cf_min_log_number_to_keep, min_log_number_to_keep);
  586. }
  587. // if are 2pc we must consider logs containing prepared
  588. // sections of outstanding transactions.
  589. //
  590. // We must check min logs with outstanding prep before we check
  591. // logs references by memtables because a log referenced by the
  592. // first data structure could transition to the second under us.
  593. //
  594. // TODO: iterating over all column families under db mutex.
  595. // should find more optimal solution
  596. auto min_log_in_prep_heap =
  597. prep_tracker->FindMinLogContainingOutstandingPrep();
  598. if (min_log_in_prep_heap != 0 &&
  599. min_log_in_prep_heap < min_log_number_to_keep) {
  600. min_log_number_to_keep = min_log_in_prep_heap;
  601. }
  602. uint64_t min_log_refed_by_mem = FindMinPrepLogReferencedByMemTable(
  603. vset, &cfd_to_flush, memtables_to_flush);
  604. if (min_log_refed_by_mem != 0 &&
  605. min_log_refed_by_mem < min_log_number_to_keep) {
  606. min_log_number_to_keep = min_log_refed_by_mem;
  607. }
  608. return min_log_number_to_keep;
  609. }
  610. } // namespace ROCKSDB_NAMESPACE