delete_scheduler.cc 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include "file/delete_scheduler.h"
  6. #include <cinttypes>
  7. #include <thread>
  8. #include <vector>
  9. #include "file/sst_file_manager_impl.h"
  10. #include "logging/logging.h"
  11. #include "port/port.h"
  12. #include "rocksdb/env.h"
  13. #include "rocksdb/file_system.h"
  14. #include "rocksdb/system_clock.h"
  15. #include "test_util/sync_point.h"
  16. #include "util/mutexlock.h"
  17. namespace ROCKSDB_NAMESPACE {
  18. DeleteScheduler::DeleteScheduler(SystemClock* clock, FileSystem* fs,
  19. int64_t rate_bytes_per_sec, Logger* info_log,
  20. SstFileManagerImpl* sst_file_manager,
  21. double max_trash_db_ratio,
  22. uint64_t bytes_max_delete_chunk)
  23. : clock_(clock),
  24. fs_(fs),
  25. total_trash_size_(0),
  26. rate_bytes_per_sec_(rate_bytes_per_sec),
  27. pending_files_(0),
  28. next_trash_bucket_(0),
  29. bytes_max_delete_chunk_(bytes_max_delete_chunk),
  30. closing_(false),
  31. cv_(&mu_),
  32. bg_thread_(nullptr),
  33. info_log_(info_log),
  34. sst_file_manager_(sst_file_manager),
  35. max_trash_db_ratio_(max_trash_db_ratio) {
  36. assert(sst_file_manager != nullptr);
  37. assert(max_trash_db_ratio >= 0);
  38. MaybeCreateBackgroundThread();
  39. }
  40. DeleteScheduler::~DeleteScheduler() {
  41. {
  42. InstrumentedMutexLock l(&mu_);
  43. closing_ = true;
  44. cv_.SignalAll();
  45. }
  46. if (bg_thread_) {
  47. bg_thread_->join();
  48. }
  49. for (const auto& it : bg_errors_) {
  50. it.second.PermitUncheckedError();
  51. }
  52. }
  53. Status DeleteScheduler::DeleteFile(const std::string& file_path,
  54. const std::string& dir_to_sync,
  55. const bool force_bg) {
  56. uint64_t total_size = sst_file_manager_->GetTotalSize();
  57. if (rate_bytes_per_sec_.load() <= 0 ||
  58. (!force_bg &&
  59. total_trash_size_.load() > total_size * max_trash_db_ratio_.load())) {
  60. // Rate limiting is disabled or trash size makes up more than
  61. // max_trash_db_ratio_ (default 25%) of the total DB size
  62. Status s = DeleteFileImmediately(file_path, /*accounted=*/true);
  63. if (s.ok()) {
  64. ROCKS_LOG_INFO(info_log_,
  65. "Deleted file %s immediately, rate_bytes_per_sec %" PRIi64
  66. ", total_trash_size %" PRIu64 ", total_size %" PRIi64
  67. ", max_trash_db_ratio %lf",
  68. file_path.c_str(), rate_bytes_per_sec_.load(),
  69. total_trash_size_.load(), total_size,
  70. max_trash_db_ratio_.load());
  71. }
  72. return s;
  73. }
  74. return AddFileToDeletionQueue(file_path, dir_to_sync, /*bucket=*/std::nullopt,
  75. /*accounted=*/true);
  76. }
  77. Status DeleteScheduler::DeleteUnaccountedFile(const std::string& file_path,
  78. const std::string& dir_to_sync,
  79. const bool force_bg,
  80. std::optional<int32_t> bucket) {
  81. uint64_t num_hard_links = 1;
  82. fs_->NumFileLinks(file_path, IOOptions(), &num_hard_links, nullptr)
  83. .PermitUncheckedError();
  84. // We can tolerate rare races where we might immediately delete both links
  85. // to a file.
  86. if (rate_bytes_per_sec_.load() <= 0 || (!force_bg && num_hard_links > 1)) {
  87. Status s = DeleteFileImmediately(file_path, /*accounted=*/false);
  88. if (s.ok()) {
  89. ROCKS_LOG_INFO(info_log_,
  90. "Deleted file %s immediately, rate_bytes_per_sec %" PRIi64,
  91. file_path.c_str(), rate_bytes_per_sec_.load());
  92. }
  93. return s;
  94. }
  95. return AddFileToDeletionQueue(file_path, dir_to_sync, bucket,
  96. /*accounted=*/false);
  97. }
  98. Status DeleteScheduler::DeleteFileImmediately(const std::string& file_path,
  99. bool accounted) {
  100. TEST_SYNC_POINT("DeleteScheduler::DeleteFile");
  101. TEST_SYNC_POINT_CALLBACK("DeleteScheduler::DeleteFile::cb",
  102. const_cast<std::string*>(&file_path));
  103. Status s = fs_->DeleteFile(file_path, IOOptions(), nullptr);
  104. if (s.ok()) {
  105. s = OnDeleteFile(file_path, accounted);
  106. InstrumentedMutexLock l(&mu_);
  107. RecordTick(stats_.get(), FILES_DELETED_IMMEDIATELY);
  108. }
  109. return s;
  110. }
  111. Status DeleteScheduler::AddFileToDeletionQueue(const std::string& file_path,
  112. const std::string& dir_to_sync,
  113. std::optional<int32_t> bucket,
  114. bool accounted) {
  115. // Move file to trash
  116. std::string trash_file;
  117. Status s = MarkAsTrash(file_path, accounted, &trash_file);
  118. ROCKS_LOG_INFO(info_log_, "Mark file: %s as trash -- %s", trash_file.c_str(),
  119. s.ToString().c_str());
  120. if (!s.ok()) {
  121. IGNORE_STATUS_IF_ERROR(s);
  122. ROCKS_LOG_ERROR(info_log_, "Failed to mark %s as trash -- %s",
  123. file_path.c_str(), s.ToString().c_str());
  124. s = fs_->DeleteFile(file_path, IOOptions(), nullptr);
  125. if (s.ok()) {
  126. s = OnDeleteFile(file_path, accounted);
  127. ROCKS_LOG_INFO(info_log_, "Deleted file %s immediately",
  128. trash_file.c_str());
  129. InstrumentedMutexLock l(&mu_);
  130. RecordTick(stats_.get(), FILES_DELETED_IMMEDIATELY);
  131. }
  132. return s;
  133. }
  134. // Update the total trash size
  135. if (accounted) {
  136. uint64_t trash_file_size = 0;
  137. IOStatus io_s =
  138. fs_->GetFileSize(trash_file, IOOptions(), &trash_file_size, nullptr);
  139. if (io_s.ok()) {
  140. total_trash_size_.fetch_add(trash_file_size);
  141. }
  142. IGNORE_STATUS_IF_ERROR(s);
  143. }
  144. //**TODO: What should we do if we failed to
  145. // get the file size?
  146. // Add file to delete queue
  147. {
  148. InstrumentedMutexLock l(&mu_);
  149. RecordTick(stats_.get(), FILES_MARKED_TRASH);
  150. queue_.emplace(trash_file, dir_to_sync, accounted, bucket);
  151. pending_files_++;
  152. if (bucket.has_value()) {
  153. auto iter = pending_files_in_buckets_.find(bucket.value());
  154. assert(iter != pending_files_in_buckets_.end());
  155. if (iter != pending_files_in_buckets_.end()) {
  156. iter->second++;
  157. }
  158. }
  159. if (pending_files_ == 1) {
  160. cv_.SignalAll();
  161. }
  162. }
  163. return s;
  164. }
  165. std::map<std::string, Status> DeleteScheduler::GetBackgroundErrors() {
  166. InstrumentedMutexLock l(&mu_);
  167. return bg_errors_;
  168. }
  169. const std::string DeleteScheduler::kTrashExtension = ".trash";
  170. bool DeleteScheduler::IsTrashFile(const std::string& file_path) {
  171. return (file_path.size() >= kTrashExtension.size() &&
  172. file_path.rfind(kTrashExtension) ==
  173. file_path.size() - kTrashExtension.size());
  174. }
  175. Status DeleteScheduler::CleanupDirectory(Env* env, SstFileManagerImpl* sfm,
  176. const std::string& path) {
  177. Status s;
  178. // Check if there are any files marked as trash in this path
  179. std::vector<std::string> files_in_path;
  180. const auto& fs = env->GetFileSystem();
  181. IOOptions io_opts;
  182. io_opts.do_not_recurse = true;
  183. s = fs->GetChildren(path, io_opts, &files_in_path,
  184. /*IODebugContext*=*/nullptr);
  185. if (!s.ok()) {
  186. return s;
  187. }
  188. for (const std::string& current_file : files_in_path) {
  189. if (!DeleteScheduler::IsTrashFile(current_file)) {
  190. // not a trash file, skip
  191. continue;
  192. }
  193. Status file_delete;
  194. std::string trash_file = path + "/" + current_file;
  195. if (sfm) {
  196. // We have an SstFileManager that will schedule the file delete
  197. s = sfm->OnAddFile(trash_file);
  198. file_delete = sfm->ScheduleFileDeletion(trash_file, path);
  199. } else {
  200. // Delete the file immediately
  201. file_delete = env->DeleteFile(trash_file);
  202. }
  203. if (s.ok() && !file_delete.ok()) {
  204. s = file_delete;
  205. }
  206. }
  207. return s;
  208. }
  209. Status DeleteScheduler::MarkAsTrash(const std::string& file_path,
  210. bool accounted, std::string* trash_file) {
  211. // Sanity check of the path
  212. size_t idx = file_path.rfind('/');
  213. if (idx == std::string::npos || idx == file_path.size() - 1) {
  214. return Status::InvalidArgument("file_path is corrupted");
  215. }
  216. if (DeleteScheduler::IsTrashFile(file_path)) {
  217. // This is already a trash file
  218. *trash_file = file_path;
  219. return Status::OK();
  220. }
  221. *trash_file = file_path + kTrashExtension;
  222. // TODO(tec) : Implement Env::RenameFileIfNotExist and remove
  223. // file_move_mu mutex.
  224. int cnt = 0;
  225. Status s;
  226. InstrumentedMutexLock l(&file_move_mu_);
  227. while (true) {
  228. s = fs_->FileExists(*trash_file, IOOptions(), nullptr);
  229. if (s.IsNotFound()) {
  230. // We found a path for our file in trash
  231. s = fs_->RenameFile(file_path, *trash_file, IOOptions(), nullptr);
  232. break;
  233. } else if (s.ok()) {
  234. // Name conflict, generate new random suffix
  235. *trash_file = file_path + std::to_string(cnt) + kTrashExtension;
  236. } else {
  237. // Error during FileExists call, we cannot continue
  238. break;
  239. }
  240. cnt++;
  241. }
  242. if (s.ok() && accounted) {
  243. s = sst_file_manager_->OnMoveFile(file_path, *trash_file);
  244. }
  245. return s;
  246. }
  247. void DeleteScheduler::BackgroundEmptyTrash() {
  248. TEST_SYNC_POINT("DeleteScheduler::BackgroundEmptyTrash");
  249. while (true) {
  250. InstrumentedMutexLock l(&mu_);
  251. while (queue_.empty() && !closing_) {
  252. cv_.Wait();
  253. }
  254. if (closing_) {
  255. return;
  256. }
  257. // Delete all files in queue_
  258. uint64_t start_time = clock_->NowMicros();
  259. uint64_t total_deleted_bytes = 0;
  260. int64_t current_delete_rate = rate_bytes_per_sec_.load();
  261. while (!queue_.empty() && !closing_) {
  262. // Satisfy static analysis.
  263. std::optional<int32_t> bucket = std::nullopt;
  264. if (current_delete_rate != rate_bytes_per_sec_.load()) {
  265. // User changed the delete rate
  266. current_delete_rate = rate_bytes_per_sec_.load();
  267. start_time = clock_->NowMicros();
  268. total_deleted_bytes = 0;
  269. ROCKS_LOG_INFO(info_log_, "rate_bytes_per_sec is changed to %" PRIi64,
  270. current_delete_rate);
  271. }
  272. // Get new file to delete
  273. const FileAndDir& fad = queue_.front();
  274. std::string path_in_trash = fad.fname;
  275. std::string dir_to_sync = fad.dir;
  276. bool accounted = fad.accounted;
  277. bucket = fad.bucket;
  278. // We don't need to hold the lock while deleting the file
  279. mu_.Unlock();
  280. uint64_t deleted_bytes = 0;
  281. bool is_complete = true;
  282. // Delete file from trash and update total_penlty value
  283. Status s = DeleteTrashFile(path_in_trash, dir_to_sync, accounted,
  284. &deleted_bytes, &is_complete);
  285. total_deleted_bytes += deleted_bytes;
  286. mu_.Lock();
  287. if (is_complete) {
  288. RecordTick(stats_.get(), FILES_DELETED_FROM_TRASH_QUEUE);
  289. queue_.pop();
  290. }
  291. if (!s.ok()) {
  292. bg_errors_[path_in_trash] = s;
  293. }
  294. // Apply penalty if necessary
  295. uint64_t total_penalty;
  296. if (current_delete_rate > 0) {
  297. // rate limiting is enabled
  298. total_penalty =
  299. ((total_deleted_bytes * kMicrosInSecond) / current_delete_rate);
  300. ROCKS_LOG_INFO(info_log_,
  301. "Rate limiting is enabled with penalty %" PRIu64
  302. " after deleting file %s",
  303. total_penalty, path_in_trash.c_str());
  304. while (!closing_ && !cv_.TimedWait(start_time + total_penalty)) {
  305. }
  306. } else {
  307. // rate limiting is disabled
  308. total_penalty = 0;
  309. ROCKS_LOG_INFO(info_log_,
  310. "Rate limiting is disabled after deleting file %s",
  311. path_in_trash.c_str());
  312. }
  313. TEST_SYNC_POINT_CALLBACK("DeleteScheduler::BackgroundEmptyTrash:Wait",
  314. &total_penalty);
  315. int32_t pending_files_in_bucket = std::numeric_limits<int32_t>::max();
  316. if (is_complete) {
  317. pending_files_--;
  318. if (bucket.has_value()) {
  319. auto iter = pending_files_in_buckets_.find(bucket.value());
  320. assert(iter != pending_files_in_buckets_.end());
  321. if (iter != pending_files_in_buckets_.end()) {
  322. pending_files_in_bucket = iter->second--;
  323. }
  324. }
  325. }
  326. if (pending_files_ == 0 || pending_files_in_bucket == 0) {
  327. // Unblock WaitForEmptyTrash or WaitForEmptyTrashBucket since there are
  328. // no more files waiting to be deleted
  329. cv_.SignalAll();
  330. }
  331. }
  332. }
  333. }
  334. Status DeleteScheduler::DeleteTrashFile(const std::string& path_in_trash,
  335. const std::string& dir_to_sync,
  336. bool accounted, uint64_t* deleted_bytes,
  337. bool* is_complete) {
  338. uint64_t file_size;
  339. Status s = fs_->GetFileSize(path_in_trash, IOOptions(), &file_size, nullptr);
  340. *is_complete = true;
  341. TEST_SYNC_POINT("DeleteScheduler::DeleteTrashFile:DeleteFile");
  342. TEST_SYNC_POINT_CALLBACK("DeleteScheduler::DeleteTrashFile::cb",
  343. const_cast<std::string*>(&path_in_trash));
  344. if (s.ok()) {
  345. bool need_full_delete = true;
  346. if (bytes_max_delete_chunk_ != 0 && file_size > bytes_max_delete_chunk_) {
  347. uint64_t num_hard_links = 2;
  348. // We don't have to worry aobut data race between linking a new
  349. // file after the number of file link check and ftruncte because
  350. // the file is now in trash and no hardlink is supposed to create
  351. // to trash files by RocksDB.
  352. Status my_status = fs_->NumFileLinks(path_in_trash, IOOptions(),
  353. &num_hard_links, nullptr);
  354. if (my_status.ok()) {
  355. if (num_hard_links == 1) {
  356. std::unique_ptr<FSWritableFile> wf;
  357. my_status = fs_->ReopenWritableFile(path_in_trash, FileOptions(), &wf,
  358. nullptr);
  359. if (my_status.ok()) {
  360. my_status = wf->Truncate(file_size - bytes_max_delete_chunk_,
  361. IOOptions(), nullptr);
  362. if (my_status.ok()) {
  363. TEST_SYNC_POINT("DeleteScheduler::DeleteTrashFile:Fsync");
  364. my_status = wf->Fsync(IOOptions(), nullptr);
  365. }
  366. }
  367. if (my_status.ok()) {
  368. *deleted_bytes = bytes_max_delete_chunk_;
  369. need_full_delete = false;
  370. *is_complete = false;
  371. } else {
  372. ROCKS_LOG_WARN(info_log_,
  373. "Failed to partially delete %s from trash -- %s",
  374. path_in_trash.c_str(), my_status.ToString().c_str());
  375. }
  376. } else {
  377. ROCKS_LOG_INFO(info_log_,
  378. "Cannot delete %s slowly through ftruncate from trash "
  379. "as it has other links",
  380. path_in_trash.c_str());
  381. }
  382. } else if (!num_link_error_printed_) {
  383. ROCKS_LOG_INFO(
  384. info_log_,
  385. "Cannot delete files slowly through ftruncate from trash "
  386. "as Env::NumFileLinks() returns error: %s",
  387. my_status.ToString().c_str());
  388. num_link_error_printed_ = true;
  389. }
  390. }
  391. if (need_full_delete) {
  392. s = fs_->DeleteFile(path_in_trash, IOOptions(), nullptr);
  393. if (!dir_to_sync.empty()) {
  394. std::unique_ptr<FSDirectory> dir_obj;
  395. if (s.ok()) {
  396. s = fs_->NewDirectory(dir_to_sync, IOOptions(), &dir_obj, nullptr);
  397. }
  398. if (s.ok()) {
  399. s = dir_obj->FsyncWithDirOptions(
  400. IOOptions(), nullptr,
  401. DirFsyncOptions(DirFsyncOptions::FsyncReason::kFileDeleted));
  402. TEST_SYNC_POINT_CALLBACK(
  403. "DeleteScheduler::DeleteTrashFile::AfterSyncDir",
  404. static_cast<void*>(const_cast<std::string*>(&dir_to_sync)));
  405. }
  406. }
  407. if (s.ok()) {
  408. *deleted_bytes = file_size;
  409. s = OnDeleteFile(path_in_trash, accounted);
  410. }
  411. }
  412. }
  413. if (!s.ok()) {
  414. // Error while getting file size or while deleting
  415. ROCKS_LOG_ERROR(info_log_, "Failed to delete %s from trash -- %s",
  416. path_in_trash.c_str(), s.ToString().c_str());
  417. *deleted_bytes = 0;
  418. } else {
  419. if (accounted) {
  420. total_trash_size_.fetch_sub(*deleted_bytes);
  421. }
  422. }
  423. return s;
  424. }
  425. Status DeleteScheduler::OnDeleteFile(const std::string& file_path,
  426. bool accounted) {
  427. if (accounted) {
  428. return sst_file_manager_->OnDeleteFile(file_path);
  429. }
  430. TEST_SYNC_POINT_CALLBACK("DeleteScheduler::OnDeleteFile",
  431. const_cast<std::string*>(&file_path));
  432. return Status::OK();
  433. }
  434. void DeleteScheduler::WaitForEmptyTrash() {
  435. InstrumentedMutexLock l(&mu_);
  436. while (pending_files_ > 0 && !closing_) {
  437. cv_.Wait();
  438. }
  439. }
  440. std::optional<int32_t> DeleteScheduler::NewTrashBucket() {
  441. if (rate_bytes_per_sec_.load() <= 0) {
  442. return std::nullopt;
  443. }
  444. InstrumentedMutexLock l(&mu_);
  445. int32_t bucket_number = next_trash_bucket_++;
  446. pending_files_in_buckets_.emplace(bucket_number, 0);
  447. return bucket_number;
  448. }
  449. void DeleteScheduler::WaitForEmptyTrashBucket(int32_t bucket) {
  450. InstrumentedMutexLock l(&mu_);
  451. if (bucket >= next_trash_bucket_) {
  452. return;
  453. }
  454. auto iter = pending_files_in_buckets_.find(bucket);
  455. while (iter != pending_files_in_buckets_.end() && iter->second > 0 &&
  456. !closing_) {
  457. cv_.Wait();
  458. iter = pending_files_in_buckets_.find(bucket);
  459. }
  460. pending_files_in_buckets_.erase(bucket);
  461. }
  462. void DeleteScheduler::MaybeCreateBackgroundThread() {
  463. if (bg_thread_ == nullptr && rate_bytes_per_sec_.load() > 0) {
  464. bg_thread_.reset(
  465. new port::Thread(&DeleteScheduler::BackgroundEmptyTrash, this));
  466. ROCKS_LOG_INFO(info_log_,
  467. "Created background thread for deletion scheduler with "
  468. "rate_bytes_per_sec: %" PRIi64,
  469. rate_bytes_per_sec_.load());
  470. }
  471. }
  472. } // namespace ROCKSDB_NAMESPACE