| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558 |
- // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
- // This source code is licensed under both the GPLv2 (found in the
- // COPYING file in the root directory) and Apache 2.0 License
- // (found in the LICENSE.Apache file in the root directory).
- #include "file/sst_file_manager_impl.h"
- #include <cinttypes>
- #include <vector>
- #include "db/db_impl/db_impl.h"
- #include "env/composite_env_wrapper.h"
- #include "port/port.h"
- #include "rocksdb/env.h"
- #include "rocksdb/sst_file_manager.h"
- #include "test_util/sync_point.h"
- #include "util/mutexlock.h"
- namespace ROCKSDB_NAMESPACE {
- #ifndef ROCKSDB_LITE
- SstFileManagerImpl::SstFileManagerImpl(Env* env, std::shared_ptr<FileSystem> fs,
- std::shared_ptr<Logger> logger,
- int64_t rate_bytes_per_sec,
- double max_trash_db_ratio,
- uint64_t bytes_max_delete_chunk)
- : env_(env),
- fs_(fs),
- logger_(logger),
- total_files_size_(0),
- in_progress_files_size_(0),
- compaction_buffer_size_(0),
- cur_compactions_reserved_size_(0),
- max_allowed_space_(0),
- delete_scheduler_(env, fs_.get(), rate_bytes_per_sec, logger.get(), this,
- max_trash_db_ratio, bytes_max_delete_chunk),
- cv_(&mu_),
- closing_(false),
- bg_thread_(nullptr),
- reserved_disk_buffer_(0),
- free_space_trigger_(0),
- cur_instance_(nullptr) {}
- SstFileManagerImpl::~SstFileManagerImpl() {
- Close();
- }
- void SstFileManagerImpl::Close() {
- {
- MutexLock l(&mu_);
- if (closing_) {
- return;
- }
- closing_ = true;
- cv_.SignalAll();
- }
- if (bg_thread_) {
- bg_thread_->join();
- }
- }
- Status SstFileManagerImpl::OnAddFile(const std::string& file_path,
- bool compaction) {
- uint64_t file_size;
- Status s = fs_->GetFileSize(file_path, IOOptions(), &file_size, nullptr);
- if (s.ok()) {
- MutexLock l(&mu_);
- OnAddFileImpl(file_path, file_size, compaction);
- }
- TEST_SYNC_POINT("SstFileManagerImpl::OnAddFile");
- return s;
- }
- Status SstFileManagerImpl::OnAddFile(const std::string& file_path,
- uint64_t file_size, bool compaction) {
- MutexLock l(&mu_);
- OnAddFileImpl(file_path, file_size, compaction);
- TEST_SYNC_POINT("SstFileManagerImpl::OnAddFile");
- return Status::OK();
- }
- Status SstFileManagerImpl::OnDeleteFile(const std::string& file_path) {
- {
- MutexLock l(&mu_);
- OnDeleteFileImpl(file_path);
- }
- TEST_SYNC_POINT("SstFileManagerImpl::OnDeleteFile");
- return Status::OK();
- }
- void SstFileManagerImpl::OnCompactionCompletion(Compaction* c) {
- MutexLock l(&mu_);
- uint64_t size_added_by_compaction = 0;
- for (size_t i = 0; i < c->num_input_levels(); i++) {
- for (size_t j = 0; j < c->num_input_files(i); j++) {
- FileMetaData* filemeta = c->input(i, j);
- size_added_by_compaction += filemeta->fd.GetFileSize();
- }
- }
- cur_compactions_reserved_size_ -= size_added_by_compaction;
- auto new_files = c->edit()->GetNewFiles();
- for (auto& new_file : new_files) {
- auto fn = TableFileName(c->immutable_cf_options()->cf_paths,
- new_file.second.fd.GetNumber(),
- new_file.second.fd.GetPathId());
- if (in_progress_files_.find(fn) != in_progress_files_.end()) {
- auto tracked_file = tracked_files_.find(fn);
- assert(tracked_file != tracked_files_.end());
- in_progress_files_size_ -= tracked_file->second;
- in_progress_files_.erase(fn);
- }
- }
- }
- Status SstFileManagerImpl::OnMoveFile(const std::string& old_path,
- const std::string& new_path,
- uint64_t* file_size) {
- {
- MutexLock l(&mu_);
- if (file_size != nullptr) {
- *file_size = tracked_files_[old_path];
- }
- OnAddFileImpl(new_path, tracked_files_[old_path], false);
- OnDeleteFileImpl(old_path);
- }
- TEST_SYNC_POINT("SstFileManagerImpl::OnMoveFile");
- return Status::OK();
- }
- void SstFileManagerImpl::SetMaxAllowedSpaceUsage(uint64_t max_allowed_space) {
- MutexLock l(&mu_);
- max_allowed_space_ = max_allowed_space;
- }
- void SstFileManagerImpl::SetCompactionBufferSize(
- uint64_t compaction_buffer_size) {
- MutexLock l(&mu_);
- compaction_buffer_size_ = compaction_buffer_size;
- }
- bool SstFileManagerImpl::IsMaxAllowedSpaceReached() {
- MutexLock l(&mu_);
- if (max_allowed_space_ <= 0) {
- return false;
- }
- return total_files_size_ >= max_allowed_space_;
- }
- bool SstFileManagerImpl::IsMaxAllowedSpaceReachedIncludingCompactions() {
- MutexLock l(&mu_);
- if (max_allowed_space_ <= 0) {
- return false;
- }
- return total_files_size_ + cur_compactions_reserved_size_ >=
- max_allowed_space_;
- }
- bool SstFileManagerImpl::EnoughRoomForCompaction(
- ColumnFamilyData* cfd, const std::vector<CompactionInputFiles>& inputs,
- Status bg_error) {
- MutexLock l(&mu_);
- uint64_t size_added_by_compaction = 0;
- // First check if we even have the space to do the compaction
- for (size_t i = 0; i < inputs.size(); i++) {
- for (size_t j = 0; j < inputs[i].size(); j++) {
- FileMetaData* filemeta = inputs[i][j];
- size_added_by_compaction += filemeta->fd.GetFileSize();
- }
- }
- // Update cur_compactions_reserved_size_ so concurrent compaction
- // don't max out space
- size_t needed_headroom =
- cur_compactions_reserved_size_ + size_added_by_compaction +
- compaction_buffer_size_;
- if (max_allowed_space_ != 0 &&
- (needed_headroom + total_files_size_ > max_allowed_space_)) {
- return false;
- }
- // Implement more aggressive checks only if this DB instance has already
- // seen a NoSpace() error. This is tin order to contain a single potentially
- // misbehaving DB instance and prevent it from slowing down compactions of
- // other DB instances
- if (CheckFreeSpace() && bg_error == Status::NoSpace()) {
- auto fn =
- TableFileName(cfd->ioptions()->cf_paths, inputs[0][0]->fd.GetNumber(),
- inputs[0][0]->fd.GetPathId());
- uint64_t free_space = 0;
- fs_->GetFreeSpace(fn, IOOptions(), &free_space, nullptr);
- // needed_headroom is based on current size reserved by compactions,
- // minus any files created by running compactions as they would count
- // against the reserved size. If user didn't specify any compaction
- // buffer, add reserved_disk_buffer_ that's calculated by default so the
- // compaction doesn't end up leaving nothing for logs and flush SSTs
- if (compaction_buffer_size_ == 0) {
- needed_headroom += reserved_disk_buffer_;
- }
- needed_headroom -= in_progress_files_size_;
- if (free_space < needed_headroom + size_added_by_compaction) {
- // We hit the condition of not enough disk space
- ROCKS_LOG_ERROR(logger_,
- "free space [%" PRIu64
- " bytes] is less than "
- "needed headroom [%" ROCKSDB_PRIszt " bytes]\n",
- free_space, needed_headroom);
- return false;
- }
- }
- cur_compactions_reserved_size_ += size_added_by_compaction;
- // Take a snapshot of cur_compactions_reserved_size_ for when we encounter
- // a NoSpace error.
- free_space_trigger_ = cur_compactions_reserved_size_;
- return true;
- }
- uint64_t SstFileManagerImpl::GetCompactionsReservedSize() {
- MutexLock l(&mu_);
- return cur_compactions_reserved_size_;
- }
- uint64_t SstFileManagerImpl::GetTotalSize() {
- MutexLock l(&mu_);
- return total_files_size_;
- }
- std::unordered_map<std::string, uint64_t>
- SstFileManagerImpl::GetTrackedFiles() {
- MutexLock l(&mu_);
- return tracked_files_;
- }
- int64_t SstFileManagerImpl::GetDeleteRateBytesPerSecond() {
- return delete_scheduler_.GetRateBytesPerSecond();
- }
- void SstFileManagerImpl::SetDeleteRateBytesPerSecond(int64_t delete_rate) {
- return delete_scheduler_.SetRateBytesPerSecond(delete_rate);
- }
- double SstFileManagerImpl::GetMaxTrashDBRatio() {
- return delete_scheduler_.GetMaxTrashDBRatio();
- }
- void SstFileManagerImpl::SetMaxTrashDBRatio(double r) {
- return delete_scheduler_.SetMaxTrashDBRatio(r);
- }
- uint64_t SstFileManagerImpl::GetTotalTrashSize() {
- return delete_scheduler_.GetTotalTrashSize();
- }
- void SstFileManagerImpl::ReserveDiskBuffer(uint64_t size,
- const std::string& path) {
- MutexLock l(&mu_);
- reserved_disk_buffer_ += size;
- if (path_.empty()) {
- path_ = path;
- }
- }
- void SstFileManagerImpl::ClearError() {
- while (true) {
- MutexLock l(&mu_);
- if (closing_) {
- return;
- }
- uint64_t free_space = 0;
- Status s = fs_->GetFreeSpace(path_, IOOptions(), &free_space, nullptr);
- free_space = max_allowed_space_ > 0
- ? std::min(max_allowed_space_, free_space)
- : free_space;
- if (s.ok()) {
- // In case of multi-DB instances, some of them may have experienced a
- // soft error and some a hard error. In the SstFileManagerImpl, a hard
- // error will basically override previously reported soft errors. Once
- // we clear the hard error, we don't keep track of previous errors for
- // now
- if (bg_err_.severity() == Status::Severity::kHardError) {
- if (free_space < reserved_disk_buffer_) {
- ROCKS_LOG_ERROR(logger_,
- "free space [%" PRIu64
- " bytes] is less than "
- "required disk buffer [%" PRIu64 " bytes]\n",
- free_space, reserved_disk_buffer_);
- ROCKS_LOG_ERROR(logger_, "Cannot clear hard error\n");
- s = Status::NoSpace();
- }
- } else if (bg_err_.severity() == Status::Severity::kSoftError) {
- if (free_space < free_space_trigger_) {
- ROCKS_LOG_WARN(logger_,
- "free space [%" PRIu64
- " bytes] is less than "
- "free space for compaction trigger [%" PRIu64
- " bytes]\n",
- free_space, free_space_trigger_);
- ROCKS_LOG_WARN(logger_, "Cannot clear soft error\n");
- s = Status::NoSpace();
- }
- }
- }
- // Someone could have called CancelErrorRecovery() and the list could have
- // become empty, so check again here
- if (s.ok() && !error_handler_list_.empty()) {
- auto error_handler = error_handler_list_.front();
- // Since we will release the mutex, set cur_instance_ to signal to the
- // shutdown thread, if it calls // CancelErrorRecovery() the meantime,
- // to indicate that this DB instance is busy. The DB instance is
- // guaranteed to not be deleted before RecoverFromBGError() returns,
- // since the ErrorHandler::recovery_in_prog_ flag would be true
- cur_instance_ = error_handler;
- mu_.Unlock();
- s = error_handler->RecoverFromBGError();
- TEST_SYNC_POINT("SstFileManagerImpl::ErrorCleared");
- mu_.Lock();
- // The DB instance might have been deleted while we were
- // waiting for the mutex, so check cur_instance_ to make sure its
- // still non-null
- if (cur_instance_) {
- // Check for error again, since the instance may have recovered but
- // immediately got another error. If that's the case, and the new
- // error is also a NoSpace() non-fatal error, leave the instance in
- // the list
- Status err = cur_instance_->GetBGError();
- if (s.ok() && err == Status::NoSpace() &&
- err.severity() < Status::Severity::kFatalError) {
- s = err;
- }
- cur_instance_ = nullptr;
- }
- if (s.ok() || s.IsShutdownInProgress() ||
- (!s.ok() && s.severity() >= Status::Severity::kFatalError)) {
- // If shutdown is in progress, abandon this handler instance
- // and continue with the others
- error_handler_list_.pop_front();
- }
- }
- if (!error_handler_list_.empty()) {
- // If there are more instances to be recovered, reschedule after 5
- // seconds
- int64_t wait_until = env_->NowMicros() + 5000000;
- cv_.TimedWait(wait_until);
- }
- // Check again for error_handler_list_ empty, as a DB instance shutdown
- // could have removed it from the queue while we were in timed wait
- if (error_handler_list_.empty()) {
- ROCKS_LOG_INFO(logger_, "Clearing error\n");
- bg_err_ = Status::OK();
- return;
- }
- }
- }
- void SstFileManagerImpl::StartErrorRecovery(ErrorHandler* handler,
- Status bg_error) {
- MutexLock l(&mu_);
- if (bg_error.severity() == Status::Severity::kSoftError) {
- if (bg_err_.ok()) {
- // Setting bg_err_ basically means we're in degraded mode
- // Assume that all pending compactions will fail similarly. The trigger
- // for clearing this condition is set to current compaction reserved
- // size, so we stop checking disk space available in
- // EnoughRoomForCompaction once this much free space is available
- bg_err_ = bg_error;
- }
- } else if (bg_error.severity() == Status::Severity::kHardError) {
- bg_err_ = bg_error;
- } else {
- assert(false);
- }
- // If this is the first instance of this error, kick of a thread to poll
- // and recover from this condition
- if (error_handler_list_.empty()) {
- error_handler_list_.push_back(handler);
- // Release lock before calling join. Its ok to do so because
- // error_handler_list_ is now non-empty, so no other invocation of this
- // function will execute this piece of code
- mu_.Unlock();
- if (bg_thread_) {
- bg_thread_->join();
- }
- // Start a new thread. The previous one would have exited.
- bg_thread_.reset(new port::Thread(&SstFileManagerImpl::ClearError, this));
- mu_.Lock();
- } else {
- // Check if this DB instance is already in the list
- for (auto iter = error_handler_list_.begin();
- iter != error_handler_list_.end(); ++iter) {
- if ((*iter) == handler) {
- return;
- }
- }
- error_handler_list_.push_back(handler);
- }
- }
- bool SstFileManagerImpl::CancelErrorRecovery(ErrorHandler* handler) {
- MutexLock l(&mu_);
- if (cur_instance_ == handler) {
- // This instance is currently busy attempting to recover
- // Nullify it so the recovery thread doesn't attempt to access it again
- cur_instance_ = nullptr;
- return false;
- }
- for (auto iter = error_handler_list_.begin();
- iter != error_handler_list_.end(); ++iter) {
- if ((*iter) == handler) {
- error_handler_list_.erase(iter);
- return true;
- }
- }
- return false;
- }
- Status SstFileManagerImpl::ScheduleFileDeletion(
- const std::string& file_path, const std::string& path_to_sync,
- const bool force_bg) {
- TEST_SYNC_POINT_CALLBACK("SstFileManagerImpl::ScheduleFileDeletion",
- const_cast<std::string*>(&file_path));
- return delete_scheduler_.DeleteFile(file_path, path_to_sync,
- force_bg);
- }
- void SstFileManagerImpl::WaitForEmptyTrash() {
- delete_scheduler_.WaitForEmptyTrash();
- }
- void SstFileManagerImpl::OnAddFileImpl(const std::string& file_path,
- uint64_t file_size, bool compaction) {
- auto tracked_file = tracked_files_.find(file_path);
- if (tracked_file != tracked_files_.end()) {
- // File was added before, we will just update the size
- assert(!compaction);
- total_files_size_ -= tracked_file->second;
- total_files_size_ += file_size;
- cur_compactions_reserved_size_ -= file_size;
- } else {
- total_files_size_ += file_size;
- if (compaction) {
- // Keep track of the size of files created by in-progress compactions.
- // When calculating whether there's enough headroom for new compactions,
- // this will be subtracted from cur_compactions_reserved_size_.
- // Otherwise, compactions will be double counted.
- in_progress_files_size_ += file_size;
- in_progress_files_.insert(file_path);
- }
- }
- tracked_files_[file_path] = file_size;
- }
- void SstFileManagerImpl::OnDeleteFileImpl(const std::string& file_path) {
- auto tracked_file = tracked_files_.find(file_path);
- if (tracked_file == tracked_files_.end()) {
- // File is not tracked
- assert(in_progress_files_.find(file_path) == in_progress_files_.end());
- return;
- }
- total_files_size_ -= tracked_file->second;
- // Check if it belonged to an in-progress compaction
- if (in_progress_files_.find(file_path) != in_progress_files_.end()) {
- in_progress_files_size_ -= tracked_file->second;
- in_progress_files_.erase(file_path);
- }
- tracked_files_.erase(tracked_file);
- }
- SstFileManager* NewSstFileManager(Env* env, std::shared_ptr<Logger> info_log,
- std::string trash_dir,
- int64_t rate_bytes_per_sec,
- bool delete_existing_trash, Status* status,
- double max_trash_db_ratio,
- uint64_t bytes_max_delete_chunk) {
- std::shared_ptr<FileSystem> fs;
- if (env == Env::Default()) {
- fs = FileSystem::Default();
- } else {
- fs.reset(new LegacyFileSystemWrapper(env));
- }
- return NewSstFileManager(env, fs, info_log, trash_dir, rate_bytes_per_sec,
- delete_existing_trash, status, max_trash_db_ratio,
- bytes_max_delete_chunk);
- }
- SstFileManager* NewSstFileManager(Env* env, std::shared_ptr<FileSystem> fs,
- std::shared_ptr<Logger> info_log,
- const std::string& trash_dir,
- int64_t rate_bytes_per_sec,
- bool delete_existing_trash, Status* status,
- double max_trash_db_ratio,
- uint64_t bytes_max_delete_chunk) {
- SstFileManagerImpl* res =
- new SstFileManagerImpl(env, fs, info_log, rate_bytes_per_sec,
- max_trash_db_ratio, bytes_max_delete_chunk);
- // trash_dir is deprecated and not needed anymore, but if user passed it
- // we will still remove files in it.
- Status s;
- if (delete_existing_trash && trash_dir != "") {
- std::vector<std::string> files_in_trash;
- s = fs->GetChildren(trash_dir, IOOptions(), &files_in_trash, nullptr);
- if (s.ok()) {
- for (const std::string& trash_file : files_in_trash) {
- if (trash_file == "." || trash_file == "..") {
- continue;
- }
- std::string path_in_trash = trash_dir + "/" + trash_file;
- res->OnAddFile(path_in_trash);
- Status file_delete =
- res->ScheduleFileDeletion(path_in_trash, trash_dir);
- if (s.ok() && !file_delete.ok()) {
- s = file_delete;
- }
- }
- }
- }
- if (status) {
- *status = s;
- }
- return res;
- }
- #else
- SstFileManager* NewSstFileManager(Env* /*env*/,
- std::shared_ptr<Logger> /*info_log*/,
- std::string /*trash_dir*/,
- int64_t /*rate_bytes_per_sec*/,
- bool /*delete_existing_trash*/,
- Status* status, double /*max_trash_db_ratio*/,
- uint64_t /*bytes_max_delete_chunk*/) {
- if (status) {
- *status =
- Status::NotSupported("SstFileManager is not supported in ROCKSDB_LITE");
- }
- return nullptr;
- }
- #endif // ROCKSDB_LITE
- } // namespace ROCKSDB_NAMESPACE
|