| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357 | //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.//  This source code is licensed under both the GPLv2 (found in the//  COPYING file in the root directory) and Apache 2.0 License//  (found in the LICENSE.Apache file in the root directory).#ifndef ROCKSDB_LITE#include "file/delete_scheduler.h"#include <thread>#include <vector>#include "file/sst_file_manager_impl.h"#include "logging/logging.h"#include "port/port.h"#include "rocksdb/env.h"#include "test_util/sync_point.h"#include "util/mutexlock.h"namespace ROCKSDB_NAMESPACE {DeleteScheduler::DeleteScheduler(Env* env, FileSystem* fs,                                 int64_t rate_bytes_per_sec, Logger* info_log,                                 SstFileManagerImpl* sst_file_manager,                                 double max_trash_db_ratio,                                 uint64_t bytes_max_delete_chunk)    : env_(env),      fs_(fs),      total_trash_size_(0),      rate_bytes_per_sec_(rate_bytes_per_sec),      pending_files_(0),      bytes_max_delete_chunk_(bytes_max_delete_chunk),      closing_(false),      cv_(&mu_),      info_log_(info_log),      sst_file_manager_(sst_file_manager),      max_trash_db_ratio_(max_trash_db_ratio) {  assert(sst_file_manager != nullptr);  assert(max_trash_db_ratio >= 0);  bg_thread_.reset(      new port::Thread(&DeleteScheduler::BackgroundEmptyTrash, this));}DeleteScheduler::~DeleteScheduler() {  {    InstrumentedMutexLock l(&mu_);    closing_ = true;    cv_.SignalAll();  }  if (bg_thread_) {    bg_thread_->join();  }}Status DeleteScheduler::DeleteFile(const std::string& file_path,                                   const std::string& dir_to_sync,                                   const bool force_bg) {  Status s;  if (rate_bytes_per_sec_.load() <= 0 || (!force_bg &&      total_trash_size_.load() >          sst_file_manager_->GetTotalSize() * max_trash_db_ratio_.load())) {    // Rate limiting is disabled or trash size makes up more than    // max_trash_db_ratio_ (default 25%) of the total DB size    TEST_SYNC_POINT("DeleteScheduler::DeleteFile");    s = fs_->DeleteFile(file_path, IOOptions(), nullptr);    if (s.ok()) {      sst_file_manager_->OnDeleteFile(file_path);    }    return s;  }  // Move file to trash  std::string trash_file;  s = MarkAsTrash(file_path, &trash_file);  if (!s.ok()) {    ROCKS_LOG_ERROR(info_log_, "Failed to mark %s as trash -- %s",                    file_path.c_str(), s.ToString().c_str());    s = fs_->DeleteFile(file_path, IOOptions(), nullptr);    if (s.ok()) {      sst_file_manager_->OnDeleteFile(file_path);    }    return s;  }  // Update the total trash size  uint64_t trash_file_size = 0;  fs_->GetFileSize(trash_file, IOOptions(), &trash_file_size, nullptr);  total_trash_size_.fetch_add(trash_file_size);  // Add file to delete queue  {    InstrumentedMutexLock l(&mu_);    queue_.emplace(trash_file, dir_to_sync);    pending_files_++;    if (pending_files_ == 1) {      cv_.SignalAll();    }  }  return s;}std::map<std::string, Status> DeleteScheduler::GetBackgroundErrors() {  InstrumentedMutexLock l(&mu_);  return bg_errors_;}const std::string DeleteScheduler::kTrashExtension = ".trash";bool DeleteScheduler::IsTrashFile(const std::string& file_path) {  return (file_path.size() >= kTrashExtension.size() &&          file_path.rfind(kTrashExtension) ==              file_path.size() - kTrashExtension.size());}Status DeleteScheduler::CleanupDirectory(Env* env, SstFileManagerImpl* sfm,                                         const std::string& path) {  Status s;  // Check if there are any files marked as trash in this path  std::vector<std::string> files_in_path;  s = env->GetChildren(path, &files_in_path);  if (!s.ok()) {    return s;  }  for (const std::string& current_file : files_in_path) {    if (!DeleteScheduler::IsTrashFile(current_file)) {      // not a trash file, skip      continue;    }    Status file_delete;    std::string trash_file = path + "/" + current_file;    if (sfm) {      // We have an SstFileManager that will schedule the file delete      sfm->OnAddFile(trash_file);      file_delete = sfm->ScheduleFileDeletion(trash_file, path);    } else {      // Delete the file immediately      file_delete = env->DeleteFile(trash_file);    }    if (s.ok() && !file_delete.ok()) {      s = file_delete;    }  }  return s;}Status DeleteScheduler::MarkAsTrash(const std::string& file_path,                                    std::string* trash_file) {  // Sanity check of the path  size_t idx = file_path.rfind("/");  if (idx == std::string::npos || idx == file_path.size() - 1) {    return Status::InvalidArgument("file_path is corrupted");  }  Status s;  if (DeleteScheduler::IsTrashFile(file_path)) {    // This is already a trash file    *trash_file = file_path;    return s;  }  *trash_file = file_path + kTrashExtension;  // TODO(tec) : Implement Env::RenameFileIfNotExist and remove  //             file_move_mu mutex.  int cnt = 0;  InstrumentedMutexLock l(&file_move_mu_);  while (true) {    s = fs_->FileExists(*trash_file, IOOptions(), nullptr);    if (s.IsNotFound()) {      // We found a path for our file in trash      s = fs_->RenameFile(file_path, *trash_file, IOOptions(), nullptr);      break;    } else if (s.ok()) {      // Name conflict, generate new random suffix      *trash_file = file_path + std::to_string(cnt) + kTrashExtension;    } else {      // Error during FileExists call, we cannot continue      break;    }    cnt++;  }  if (s.ok()) {    sst_file_manager_->OnMoveFile(file_path, *trash_file);  }  return s;}void DeleteScheduler::BackgroundEmptyTrash() {  TEST_SYNC_POINT("DeleteScheduler::BackgroundEmptyTrash");  while (true) {    InstrumentedMutexLock l(&mu_);    while (queue_.empty() && !closing_) {      cv_.Wait();    }    if (closing_) {      return;    }    // Delete all files in queue_    uint64_t start_time = env_->NowMicros();    uint64_t total_deleted_bytes = 0;    int64_t current_delete_rate = rate_bytes_per_sec_.load();    while (!queue_.empty() && !closing_) {      if (current_delete_rate != rate_bytes_per_sec_.load()) {        // User changed the delete rate        current_delete_rate = rate_bytes_per_sec_.load();        start_time = env_->NowMicros();        total_deleted_bytes = 0;      }      // Get new file to delete      const FileAndDir& fad = queue_.front();      std::string path_in_trash = fad.fname;      // We dont need to hold the lock while deleting the file      mu_.Unlock();      uint64_t deleted_bytes = 0;      bool is_complete = true;      // Delete file from trash and update total_penlty value      Status s =          DeleteTrashFile(path_in_trash, fad.dir, &deleted_bytes, &is_complete);      total_deleted_bytes += deleted_bytes;      mu_.Lock();      if (is_complete) {        queue_.pop();      }      if (!s.ok()) {        bg_errors_[path_in_trash] = s;      }      // Apply penlty if necessary      uint64_t total_penlty;      if (current_delete_rate > 0) {        // rate limiting is enabled        total_penlty =            ((total_deleted_bytes * kMicrosInSecond) / current_delete_rate);        while (!closing_ && !cv_.TimedWait(start_time + total_penlty)) {}      } else {        // rate limiting is disabled        total_penlty = 0;      }      TEST_SYNC_POINT_CALLBACK("DeleteScheduler::BackgroundEmptyTrash:Wait",                               &total_penlty);      if (is_complete) {        pending_files_--;      }      if (pending_files_ == 0) {        // Unblock WaitForEmptyTrash since there are no more files waiting        // to be deleted        cv_.SignalAll();      }    }  }}Status DeleteScheduler::DeleteTrashFile(const std::string& path_in_trash,                                        const std::string& dir_to_sync,                                        uint64_t* deleted_bytes,                                        bool* is_complete) {  uint64_t file_size;  Status s = fs_->GetFileSize(path_in_trash, IOOptions(), &file_size, nullptr);  *is_complete = true;  TEST_SYNC_POINT("DeleteScheduler::DeleteTrashFile:DeleteFile");  if (s.ok()) {    bool need_full_delete = true;    if (bytes_max_delete_chunk_ != 0 && file_size > bytes_max_delete_chunk_) {      uint64_t num_hard_links = 2;      // We don't have to worry aobut data race between linking a new      // file after the number of file link check and ftruncte because      // the file is now in trash and no hardlink is supposed to create      // to trash files by RocksDB.      Status my_status = fs_->NumFileLinks(path_in_trash, IOOptions(),                                           &num_hard_links, nullptr);      if (my_status.ok()) {        if (num_hard_links == 1) {          std::unique_ptr<FSWritableFile> wf;          my_status = fs_->ReopenWritableFile(path_in_trash, FileOptions(),                                              &wf, nullptr);          if (my_status.ok()) {            my_status = wf->Truncate(file_size - bytes_max_delete_chunk_,                                     IOOptions(), nullptr);            if (my_status.ok()) {              TEST_SYNC_POINT("DeleteScheduler::DeleteTrashFile:Fsync");              my_status = wf->Fsync(IOOptions(), nullptr);            }          }          if (my_status.ok()) {            *deleted_bytes = bytes_max_delete_chunk_;            need_full_delete = false;            *is_complete = false;          } else {            ROCKS_LOG_WARN(info_log_,                           "Failed to partially delete %s from trash -- %s",                           path_in_trash.c_str(), my_status.ToString().c_str());          }        } else {          ROCKS_LOG_INFO(info_log_,                         "Cannot delete %s slowly through ftruncate from trash "                         "as it has other links",                         path_in_trash.c_str());        }      } else if (!num_link_error_printed_) {        ROCKS_LOG_INFO(            info_log_,            "Cannot delete files slowly through ftruncate from trash "            "as Env::NumFileLinks() returns error: %s",            my_status.ToString().c_str());        num_link_error_printed_ = true;      }    }    if (need_full_delete) {      s = fs_->DeleteFile(path_in_trash, IOOptions(), nullptr);      if (!dir_to_sync.empty()) {        std::unique_ptr<FSDirectory> dir_obj;        if (s.ok()) {          s = fs_->NewDirectory(dir_to_sync, IOOptions(), &dir_obj, nullptr);        }        if (s.ok()) {          s = dir_obj->Fsync(IOOptions(), nullptr);          TEST_SYNC_POINT_CALLBACK(              "DeleteScheduler::DeleteTrashFile::AfterSyncDir",              reinterpret_cast<void*>(const_cast<std::string*>(&dir_to_sync)));        }      }      *deleted_bytes = file_size;      sst_file_manager_->OnDeleteFile(path_in_trash);    }  }  if (!s.ok()) {    // Error while getting file size or while deleting    ROCKS_LOG_ERROR(info_log_, "Failed to delete %s from trash -- %s",                    path_in_trash.c_str(), s.ToString().c_str());    *deleted_bytes = 0;  } else {    total_trash_size_.fetch_sub(*deleted_bytes);  }  return s;}void DeleteScheduler::WaitForEmptyTrash() {  InstrumentedMutexLock l(&mu_);  while (pending_files_ > 0 && !closing_) {    cv_.Wait();  }}}  // namespace ROCKSDB_NAMESPACE#endif  // ROCKSDB_LITE
 |