delete_scheduler.cc 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #ifndef ROCKSDB_LITE
  6. #include "file/delete_scheduler.h"
  7. #include <thread>
  8. #include <vector>
  9. #include "file/sst_file_manager_impl.h"
  10. #include "logging/logging.h"
  11. #include "port/port.h"
  12. #include "rocksdb/env.h"
  13. #include "test_util/sync_point.h"
  14. #include "util/mutexlock.h"
  15. namespace ROCKSDB_NAMESPACE {
  16. DeleteScheduler::DeleteScheduler(Env* env, FileSystem* fs,
  17. int64_t rate_bytes_per_sec, Logger* info_log,
  18. SstFileManagerImpl* sst_file_manager,
  19. double max_trash_db_ratio,
  20. uint64_t bytes_max_delete_chunk)
  21. : env_(env),
  22. fs_(fs),
  23. total_trash_size_(0),
  24. rate_bytes_per_sec_(rate_bytes_per_sec),
  25. pending_files_(0),
  26. bytes_max_delete_chunk_(bytes_max_delete_chunk),
  27. closing_(false),
  28. cv_(&mu_),
  29. info_log_(info_log),
  30. sst_file_manager_(sst_file_manager),
  31. max_trash_db_ratio_(max_trash_db_ratio) {
  32. assert(sst_file_manager != nullptr);
  33. assert(max_trash_db_ratio >= 0);
  34. bg_thread_.reset(
  35. new port::Thread(&DeleteScheduler::BackgroundEmptyTrash, this));
  36. }
  37. DeleteScheduler::~DeleteScheduler() {
  38. {
  39. InstrumentedMutexLock l(&mu_);
  40. closing_ = true;
  41. cv_.SignalAll();
  42. }
  43. if (bg_thread_) {
  44. bg_thread_->join();
  45. }
  46. }
  47. Status DeleteScheduler::DeleteFile(const std::string& file_path,
  48. const std::string& dir_to_sync,
  49. const bool force_bg) {
  50. Status s;
  51. if (rate_bytes_per_sec_.load() <= 0 || (!force_bg &&
  52. total_trash_size_.load() >
  53. sst_file_manager_->GetTotalSize() * max_trash_db_ratio_.load())) {
  54. // Rate limiting is disabled or trash size makes up more than
  55. // max_trash_db_ratio_ (default 25%) of the total DB size
  56. TEST_SYNC_POINT("DeleteScheduler::DeleteFile");
  57. s = fs_->DeleteFile(file_path, IOOptions(), nullptr);
  58. if (s.ok()) {
  59. sst_file_manager_->OnDeleteFile(file_path);
  60. }
  61. return s;
  62. }
  63. // Move file to trash
  64. std::string trash_file;
  65. s = MarkAsTrash(file_path, &trash_file);
  66. if (!s.ok()) {
  67. ROCKS_LOG_ERROR(info_log_, "Failed to mark %s as trash -- %s",
  68. file_path.c_str(), s.ToString().c_str());
  69. s = fs_->DeleteFile(file_path, IOOptions(), nullptr);
  70. if (s.ok()) {
  71. sst_file_manager_->OnDeleteFile(file_path);
  72. }
  73. return s;
  74. }
  75. // Update the total trash size
  76. uint64_t trash_file_size = 0;
  77. fs_->GetFileSize(trash_file, IOOptions(), &trash_file_size, nullptr);
  78. total_trash_size_.fetch_add(trash_file_size);
  79. // Add file to delete queue
  80. {
  81. InstrumentedMutexLock l(&mu_);
  82. queue_.emplace(trash_file, dir_to_sync);
  83. pending_files_++;
  84. if (pending_files_ == 1) {
  85. cv_.SignalAll();
  86. }
  87. }
  88. return s;
  89. }
  90. std::map<std::string, Status> DeleteScheduler::GetBackgroundErrors() {
  91. InstrumentedMutexLock l(&mu_);
  92. return bg_errors_;
  93. }
  94. const std::string DeleteScheduler::kTrashExtension = ".trash";
  95. bool DeleteScheduler::IsTrashFile(const std::string& file_path) {
  96. return (file_path.size() >= kTrashExtension.size() &&
  97. file_path.rfind(kTrashExtension) ==
  98. file_path.size() - kTrashExtension.size());
  99. }
  100. Status DeleteScheduler::CleanupDirectory(Env* env, SstFileManagerImpl* sfm,
  101. const std::string& path) {
  102. Status s;
  103. // Check if there are any files marked as trash in this path
  104. std::vector<std::string> files_in_path;
  105. s = env->GetChildren(path, &files_in_path);
  106. if (!s.ok()) {
  107. return s;
  108. }
  109. for (const std::string& current_file : files_in_path) {
  110. if (!DeleteScheduler::IsTrashFile(current_file)) {
  111. // not a trash file, skip
  112. continue;
  113. }
  114. Status file_delete;
  115. std::string trash_file = path + "/" + current_file;
  116. if (sfm) {
  117. // We have an SstFileManager that will schedule the file delete
  118. sfm->OnAddFile(trash_file);
  119. file_delete = sfm->ScheduleFileDeletion(trash_file, path);
  120. } else {
  121. // Delete the file immediately
  122. file_delete = env->DeleteFile(trash_file);
  123. }
  124. if (s.ok() && !file_delete.ok()) {
  125. s = file_delete;
  126. }
  127. }
  128. return s;
  129. }
  130. Status DeleteScheduler::MarkAsTrash(const std::string& file_path,
  131. std::string* trash_file) {
  132. // Sanity check of the path
  133. size_t idx = file_path.rfind("/");
  134. if (idx == std::string::npos || idx == file_path.size() - 1) {
  135. return Status::InvalidArgument("file_path is corrupted");
  136. }
  137. Status s;
  138. if (DeleteScheduler::IsTrashFile(file_path)) {
  139. // This is already a trash file
  140. *trash_file = file_path;
  141. return s;
  142. }
  143. *trash_file = file_path + kTrashExtension;
  144. // TODO(tec) : Implement Env::RenameFileIfNotExist and remove
  145. // file_move_mu mutex.
  146. int cnt = 0;
  147. InstrumentedMutexLock l(&file_move_mu_);
  148. while (true) {
  149. s = fs_->FileExists(*trash_file, IOOptions(), nullptr);
  150. if (s.IsNotFound()) {
  151. // We found a path for our file in trash
  152. s = fs_->RenameFile(file_path, *trash_file, IOOptions(), nullptr);
  153. break;
  154. } else if (s.ok()) {
  155. // Name conflict, generate new random suffix
  156. *trash_file = file_path + std::to_string(cnt) + kTrashExtension;
  157. } else {
  158. // Error during FileExists call, we cannot continue
  159. break;
  160. }
  161. cnt++;
  162. }
  163. if (s.ok()) {
  164. sst_file_manager_->OnMoveFile(file_path, *trash_file);
  165. }
  166. return s;
  167. }
  168. void DeleteScheduler::BackgroundEmptyTrash() {
  169. TEST_SYNC_POINT("DeleteScheduler::BackgroundEmptyTrash");
  170. while (true) {
  171. InstrumentedMutexLock l(&mu_);
  172. while (queue_.empty() && !closing_) {
  173. cv_.Wait();
  174. }
  175. if (closing_) {
  176. return;
  177. }
  178. // Delete all files in queue_
  179. uint64_t start_time = env_->NowMicros();
  180. uint64_t total_deleted_bytes = 0;
  181. int64_t current_delete_rate = rate_bytes_per_sec_.load();
  182. while (!queue_.empty() && !closing_) {
  183. if (current_delete_rate != rate_bytes_per_sec_.load()) {
  184. // User changed the delete rate
  185. current_delete_rate = rate_bytes_per_sec_.load();
  186. start_time = env_->NowMicros();
  187. total_deleted_bytes = 0;
  188. }
  189. // Get new file to delete
  190. const FileAndDir& fad = queue_.front();
  191. std::string path_in_trash = fad.fname;
  192. // We dont need to hold the lock while deleting the file
  193. mu_.Unlock();
  194. uint64_t deleted_bytes = 0;
  195. bool is_complete = true;
  196. // Delete file from trash and update total_penlty value
  197. Status s =
  198. DeleteTrashFile(path_in_trash, fad.dir, &deleted_bytes, &is_complete);
  199. total_deleted_bytes += deleted_bytes;
  200. mu_.Lock();
  201. if (is_complete) {
  202. queue_.pop();
  203. }
  204. if (!s.ok()) {
  205. bg_errors_[path_in_trash] = s;
  206. }
  207. // Apply penlty if necessary
  208. uint64_t total_penlty;
  209. if (current_delete_rate > 0) {
  210. // rate limiting is enabled
  211. total_penlty =
  212. ((total_deleted_bytes * kMicrosInSecond) / current_delete_rate);
  213. while (!closing_ && !cv_.TimedWait(start_time + total_penlty)) {}
  214. } else {
  215. // rate limiting is disabled
  216. total_penlty = 0;
  217. }
  218. TEST_SYNC_POINT_CALLBACK("DeleteScheduler::BackgroundEmptyTrash:Wait",
  219. &total_penlty);
  220. if (is_complete) {
  221. pending_files_--;
  222. }
  223. if (pending_files_ == 0) {
  224. // Unblock WaitForEmptyTrash since there are no more files waiting
  225. // to be deleted
  226. cv_.SignalAll();
  227. }
  228. }
  229. }
  230. }
  231. Status DeleteScheduler::DeleteTrashFile(const std::string& path_in_trash,
  232. const std::string& dir_to_sync,
  233. uint64_t* deleted_bytes,
  234. bool* is_complete) {
  235. uint64_t file_size;
  236. Status s = fs_->GetFileSize(path_in_trash, IOOptions(), &file_size, nullptr);
  237. *is_complete = true;
  238. TEST_SYNC_POINT("DeleteScheduler::DeleteTrashFile:DeleteFile");
  239. if (s.ok()) {
  240. bool need_full_delete = true;
  241. if (bytes_max_delete_chunk_ != 0 && file_size > bytes_max_delete_chunk_) {
  242. uint64_t num_hard_links = 2;
  243. // We don't have to worry aobut data race between linking a new
  244. // file after the number of file link check and ftruncte because
  245. // the file is now in trash and no hardlink is supposed to create
  246. // to trash files by RocksDB.
  247. Status my_status = fs_->NumFileLinks(path_in_trash, IOOptions(),
  248. &num_hard_links, nullptr);
  249. if (my_status.ok()) {
  250. if (num_hard_links == 1) {
  251. std::unique_ptr<FSWritableFile> wf;
  252. my_status = fs_->ReopenWritableFile(path_in_trash, FileOptions(),
  253. &wf, nullptr);
  254. if (my_status.ok()) {
  255. my_status = wf->Truncate(file_size - bytes_max_delete_chunk_,
  256. IOOptions(), nullptr);
  257. if (my_status.ok()) {
  258. TEST_SYNC_POINT("DeleteScheduler::DeleteTrashFile:Fsync");
  259. my_status = wf->Fsync(IOOptions(), nullptr);
  260. }
  261. }
  262. if (my_status.ok()) {
  263. *deleted_bytes = bytes_max_delete_chunk_;
  264. need_full_delete = false;
  265. *is_complete = false;
  266. } else {
  267. ROCKS_LOG_WARN(info_log_,
  268. "Failed to partially delete %s from trash -- %s",
  269. path_in_trash.c_str(), my_status.ToString().c_str());
  270. }
  271. } else {
  272. ROCKS_LOG_INFO(info_log_,
  273. "Cannot delete %s slowly through ftruncate from trash "
  274. "as it has other links",
  275. path_in_trash.c_str());
  276. }
  277. } else if (!num_link_error_printed_) {
  278. ROCKS_LOG_INFO(
  279. info_log_,
  280. "Cannot delete files slowly through ftruncate from trash "
  281. "as Env::NumFileLinks() returns error: %s",
  282. my_status.ToString().c_str());
  283. num_link_error_printed_ = true;
  284. }
  285. }
  286. if (need_full_delete) {
  287. s = fs_->DeleteFile(path_in_trash, IOOptions(), nullptr);
  288. if (!dir_to_sync.empty()) {
  289. std::unique_ptr<FSDirectory> dir_obj;
  290. if (s.ok()) {
  291. s = fs_->NewDirectory(dir_to_sync, IOOptions(), &dir_obj, nullptr);
  292. }
  293. if (s.ok()) {
  294. s = dir_obj->Fsync(IOOptions(), nullptr);
  295. TEST_SYNC_POINT_CALLBACK(
  296. "DeleteScheduler::DeleteTrashFile::AfterSyncDir",
  297. reinterpret_cast<void*>(const_cast<std::string*>(&dir_to_sync)));
  298. }
  299. }
  300. *deleted_bytes = file_size;
  301. sst_file_manager_->OnDeleteFile(path_in_trash);
  302. }
  303. }
  304. if (!s.ok()) {
  305. // Error while getting file size or while deleting
  306. ROCKS_LOG_ERROR(info_log_, "Failed to delete %s from trash -- %s",
  307. path_in_trash.c_str(), s.ToString().c_str());
  308. *deleted_bytes = 0;
  309. } else {
  310. total_trash_size_.fetch_sub(*deleted_bytes);
  311. }
  312. return s;
  313. }
  314. void DeleteScheduler::WaitForEmptyTrash() {
  315. InstrumentedMutexLock l(&mu_);
  316. while (pending_files_ > 0 && !closing_) {
  317. cv_.Wait();
  318. }
  319. }
  320. } // namespace ROCKSDB_NAMESPACE
  321. #endif // ROCKSDB_LITE