// file/sst_file_manager_impl.cc
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include "file/sst_file_manager_impl.h"
  6. #include <cinttypes>
  7. #include <vector>
  8. #include "db/db_impl/db_impl.h"
  9. #include "env/composite_env_wrapper.h"
  10. #include "port/port.h"
  11. #include "rocksdb/env.h"
  12. #include "rocksdb/sst_file_manager.h"
  13. #include "test_util/sync_point.h"
  14. #include "util/mutexlock.h"
  15. namespace ROCKSDB_NAMESPACE {
  16. #ifndef ROCKSDB_LITE
  17. SstFileManagerImpl::SstFileManagerImpl(Env* env, std::shared_ptr<FileSystem> fs,
  18. std::shared_ptr<Logger> logger,
  19. int64_t rate_bytes_per_sec,
  20. double max_trash_db_ratio,
  21. uint64_t bytes_max_delete_chunk)
  22. : env_(env),
  23. fs_(fs),
  24. logger_(logger),
  25. total_files_size_(0),
  26. in_progress_files_size_(0),
  27. compaction_buffer_size_(0),
  28. cur_compactions_reserved_size_(0),
  29. max_allowed_space_(0),
  30. delete_scheduler_(env, fs_.get(), rate_bytes_per_sec, logger.get(), this,
  31. max_trash_db_ratio, bytes_max_delete_chunk),
  32. cv_(&mu_),
  33. closing_(false),
  34. bg_thread_(nullptr),
  35. reserved_disk_buffer_(0),
  36. free_space_trigger_(0),
  37. cur_instance_(nullptr) {}
  38. SstFileManagerImpl::~SstFileManagerImpl() {
  39. Close();
  40. }
  41. void SstFileManagerImpl::Close() {
  42. {
  43. MutexLock l(&mu_);
  44. if (closing_) {
  45. return;
  46. }
  47. closing_ = true;
  48. cv_.SignalAll();
  49. }
  50. if (bg_thread_) {
  51. bg_thread_->join();
  52. }
  53. }
  54. Status SstFileManagerImpl::OnAddFile(const std::string& file_path,
  55. bool compaction) {
  56. uint64_t file_size;
  57. Status s = fs_->GetFileSize(file_path, IOOptions(), &file_size, nullptr);
  58. if (s.ok()) {
  59. MutexLock l(&mu_);
  60. OnAddFileImpl(file_path, file_size, compaction);
  61. }
  62. TEST_SYNC_POINT("SstFileManagerImpl::OnAddFile");
  63. return s;
  64. }
  65. Status SstFileManagerImpl::OnAddFile(const std::string& file_path,
  66. uint64_t file_size, bool compaction) {
  67. MutexLock l(&mu_);
  68. OnAddFileImpl(file_path, file_size, compaction);
  69. TEST_SYNC_POINT("SstFileManagerImpl::OnAddFile");
  70. return Status::OK();
  71. }
  72. Status SstFileManagerImpl::OnDeleteFile(const std::string& file_path) {
  73. {
  74. MutexLock l(&mu_);
  75. OnDeleteFileImpl(file_path);
  76. }
  77. TEST_SYNC_POINT("SstFileManagerImpl::OnDeleteFile");
  78. return Status::OK();
  79. }
  80. void SstFileManagerImpl::OnCompactionCompletion(Compaction* c) {
  81. MutexLock l(&mu_);
  82. uint64_t size_added_by_compaction = 0;
  83. for (size_t i = 0; i < c->num_input_levels(); i++) {
  84. for (size_t j = 0; j < c->num_input_files(i); j++) {
  85. FileMetaData* filemeta = c->input(i, j);
  86. size_added_by_compaction += filemeta->fd.GetFileSize();
  87. }
  88. }
  89. cur_compactions_reserved_size_ -= size_added_by_compaction;
  90. auto new_files = c->edit()->GetNewFiles();
  91. for (auto& new_file : new_files) {
  92. auto fn = TableFileName(c->immutable_cf_options()->cf_paths,
  93. new_file.second.fd.GetNumber(),
  94. new_file.second.fd.GetPathId());
  95. if (in_progress_files_.find(fn) != in_progress_files_.end()) {
  96. auto tracked_file = tracked_files_.find(fn);
  97. assert(tracked_file != tracked_files_.end());
  98. in_progress_files_size_ -= tracked_file->second;
  99. in_progress_files_.erase(fn);
  100. }
  101. }
  102. }
  103. Status SstFileManagerImpl::OnMoveFile(const std::string& old_path,
  104. const std::string& new_path,
  105. uint64_t* file_size) {
  106. {
  107. MutexLock l(&mu_);
  108. if (file_size != nullptr) {
  109. *file_size = tracked_files_[old_path];
  110. }
  111. OnAddFileImpl(new_path, tracked_files_[old_path], false);
  112. OnDeleteFileImpl(old_path);
  113. }
  114. TEST_SYNC_POINT("SstFileManagerImpl::OnMoveFile");
  115. return Status::OK();
  116. }
  117. void SstFileManagerImpl::SetMaxAllowedSpaceUsage(uint64_t max_allowed_space) {
  118. MutexLock l(&mu_);
  119. max_allowed_space_ = max_allowed_space;
  120. }
  121. void SstFileManagerImpl::SetCompactionBufferSize(
  122. uint64_t compaction_buffer_size) {
  123. MutexLock l(&mu_);
  124. compaction_buffer_size_ = compaction_buffer_size;
  125. }
  126. bool SstFileManagerImpl::IsMaxAllowedSpaceReached() {
  127. MutexLock l(&mu_);
  128. if (max_allowed_space_ <= 0) {
  129. return false;
  130. }
  131. return total_files_size_ >= max_allowed_space_;
  132. }
  133. bool SstFileManagerImpl::IsMaxAllowedSpaceReachedIncludingCompactions() {
  134. MutexLock l(&mu_);
  135. if (max_allowed_space_ <= 0) {
  136. return false;
  137. }
  138. return total_files_size_ + cur_compactions_reserved_size_ >=
  139. max_allowed_space_;
  140. }
  141. bool SstFileManagerImpl::EnoughRoomForCompaction(
  142. ColumnFamilyData* cfd, const std::vector<CompactionInputFiles>& inputs,
  143. Status bg_error) {
  144. MutexLock l(&mu_);
  145. uint64_t size_added_by_compaction = 0;
  146. // First check if we even have the space to do the compaction
  147. for (size_t i = 0; i < inputs.size(); i++) {
  148. for (size_t j = 0; j < inputs[i].size(); j++) {
  149. FileMetaData* filemeta = inputs[i][j];
  150. size_added_by_compaction += filemeta->fd.GetFileSize();
  151. }
  152. }
  153. // Update cur_compactions_reserved_size_ so concurrent compaction
  154. // don't max out space
  155. size_t needed_headroom =
  156. cur_compactions_reserved_size_ + size_added_by_compaction +
  157. compaction_buffer_size_;
  158. if (max_allowed_space_ != 0 &&
  159. (needed_headroom + total_files_size_ > max_allowed_space_)) {
  160. return false;
  161. }
  162. // Implement more aggressive checks only if this DB instance has already
  163. // seen a NoSpace() error. This is tin order to contain a single potentially
  164. // misbehaving DB instance and prevent it from slowing down compactions of
  165. // other DB instances
  166. if (CheckFreeSpace() && bg_error == Status::NoSpace()) {
  167. auto fn =
  168. TableFileName(cfd->ioptions()->cf_paths, inputs[0][0]->fd.GetNumber(),
  169. inputs[0][0]->fd.GetPathId());
  170. uint64_t free_space = 0;
  171. fs_->GetFreeSpace(fn, IOOptions(), &free_space, nullptr);
  172. // needed_headroom is based on current size reserved by compactions,
  173. // minus any files created by running compactions as they would count
  174. // against the reserved size. If user didn't specify any compaction
  175. // buffer, add reserved_disk_buffer_ that's calculated by default so the
  176. // compaction doesn't end up leaving nothing for logs and flush SSTs
  177. if (compaction_buffer_size_ == 0) {
  178. needed_headroom += reserved_disk_buffer_;
  179. }
  180. needed_headroom -= in_progress_files_size_;
  181. if (free_space < needed_headroom + size_added_by_compaction) {
  182. // We hit the condition of not enough disk space
  183. ROCKS_LOG_ERROR(logger_,
  184. "free space [%" PRIu64
  185. " bytes] is less than "
  186. "needed headroom [%" ROCKSDB_PRIszt " bytes]\n",
  187. free_space, needed_headroom);
  188. return false;
  189. }
  190. }
  191. cur_compactions_reserved_size_ += size_added_by_compaction;
  192. // Take a snapshot of cur_compactions_reserved_size_ for when we encounter
  193. // a NoSpace error.
  194. free_space_trigger_ = cur_compactions_reserved_size_;
  195. return true;
  196. }
  197. uint64_t SstFileManagerImpl::GetCompactionsReservedSize() {
  198. MutexLock l(&mu_);
  199. return cur_compactions_reserved_size_;
  200. }
  201. uint64_t SstFileManagerImpl::GetTotalSize() {
  202. MutexLock l(&mu_);
  203. return total_files_size_;
  204. }
  205. std::unordered_map<std::string, uint64_t>
  206. SstFileManagerImpl::GetTrackedFiles() {
  207. MutexLock l(&mu_);
  208. return tracked_files_;
  209. }
  210. int64_t SstFileManagerImpl::GetDeleteRateBytesPerSecond() {
  211. return delete_scheduler_.GetRateBytesPerSecond();
  212. }
  213. void SstFileManagerImpl::SetDeleteRateBytesPerSecond(int64_t delete_rate) {
  214. return delete_scheduler_.SetRateBytesPerSecond(delete_rate);
  215. }
  216. double SstFileManagerImpl::GetMaxTrashDBRatio() {
  217. return delete_scheduler_.GetMaxTrashDBRatio();
  218. }
  219. void SstFileManagerImpl::SetMaxTrashDBRatio(double r) {
  220. return delete_scheduler_.SetMaxTrashDBRatio(r);
  221. }
  222. uint64_t SstFileManagerImpl::GetTotalTrashSize() {
  223. return delete_scheduler_.GetTotalTrashSize();
  224. }
  225. void SstFileManagerImpl::ReserveDiskBuffer(uint64_t size,
  226. const std::string& path) {
  227. MutexLock l(&mu_);
  228. reserved_disk_buffer_ += size;
  229. if (path_.empty()) {
  230. path_ = path;
  231. }
  232. }
  233. void SstFileManagerImpl::ClearError() {
  234. while (true) {
  235. MutexLock l(&mu_);
  236. if (closing_) {
  237. return;
  238. }
  239. uint64_t free_space = 0;
  240. Status s = fs_->GetFreeSpace(path_, IOOptions(), &free_space, nullptr);
  241. free_space = max_allowed_space_ > 0
  242. ? std::min(max_allowed_space_, free_space)
  243. : free_space;
  244. if (s.ok()) {
  245. // In case of multi-DB instances, some of them may have experienced a
  246. // soft error and some a hard error. In the SstFileManagerImpl, a hard
  247. // error will basically override previously reported soft errors. Once
  248. // we clear the hard error, we don't keep track of previous errors for
  249. // now
  250. if (bg_err_.severity() == Status::Severity::kHardError) {
  251. if (free_space < reserved_disk_buffer_) {
  252. ROCKS_LOG_ERROR(logger_,
  253. "free space [%" PRIu64
  254. " bytes] is less than "
  255. "required disk buffer [%" PRIu64 " bytes]\n",
  256. free_space, reserved_disk_buffer_);
  257. ROCKS_LOG_ERROR(logger_, "Cannot clear hard error\n");
  258. s = Status::NoSpace();
  259. }
  260. } else if (bg_err_.severity() == Status::Severity::kSoftError) {
  261. if (free_space < free_space_trigger_) {
  262. ROCKS_LOG_WARN(logger_,
  263. "free space [%" PRIu64
  264. " bytes] is less than "
  265. "free space for compaction trigger [%" PRIu64
  266. " bytes]\n",
  267. free_space, free_space_trigger_);
  268. ROCKS_LOG_WARN(logger_, "Cannot clear soft error\n");
  269. s = Status::NoSpace();
  270. }
  271. }
  272. }
  273. // Someone could have called CancelErrorRecovery() and the list could have
  274. // become empty, so check again here
  275. if (s.ok() && !error_handler_list_.empty()) {
  276. auto error_handler = error_handler_list_.front();
  277. // Since we will release the mutex, set cur_instance_ to signal to the
  278. // shutdown thread, if it calls // CancelErrorRecovery() the meantime,
  279. // to indicate that this DB instance is busy. The DB instance is
  280. // guaranteed to not be deleted before RecoverFromBGError() returns,
  281. // since the ErrorHandler::recovery_in_prog_ flag would be true
  282. cur_instance_ = error_handler;
  283. mu_.Unlock();
  284. s = error_handler->RecoverFromBGError();
  285. TEST_SYNC_POINT("SstFileManagerImpl::ErrorCleared");
  286. mu_.Lock();
  287. // The DB instance might have been deleted while we were
  288. // waiting for the mutex, so check cur_instance_ to make sure its
  289. // still non-null
  290. if (cur_instance_) {
  291. // Check for error again, since the instance may have recovered but
  292. // immediately got another error. If that's the case, and the new
  293. // error is also a NoSpace() non-fatal error, leave the instance in
  294. // the list
  295. Status err = cur_instance_->GetBGError();
  296. if (s.ok() && err == Status::NoSpace() &&
  297. err.severity() < Status::Severity::kFatalError) {
  298. s = err;
  299. }
  300. cur_instance_ = nullptr;
  301. }
  302. if (s.ok() || s.IsShutdownInProgress() ||
  303. (!s.ok() && s.severity() >= Status::Severity::kFatalError)) {
  304. // If shutdown is in progress, abandon this handler instance
  305. // and continue with the others
  306. error_handler_list_.pop_front();
  307. }
  308. }
  309. if (!error_handler_list_.empty()) {
  310. // If there are more instances to be recovered, reschedule after 5
  311. // seconds
  312. int64_t wait_until = env_->NowMicros() + 5000000;
  313. cv_.TimedWait(wait_until);
  314. }
  315. // Check again for error_handler_list_ empty, as a DB instance shutdown
  316. // could have removed it from the queue while we were in timed wait
  317. if (error_handler_list_.empty()) {
  318. ROCKS_LOG_INFO(logger_, "Clearing error\n");
  319. bg_err_ = Status::OK();
  320. return;
  321. }
  322. }
  323. }
  324. void SstFileManagerImpl::StartErrorRecovery(ErrorHandler* handler,
  325. Status bg_error) {
  326. MutexLock l(&mu_);
  327. if (bg_error.severity() == Status::Severity::kSoftError) {
  328. if (bg_err_.ok()) {
  329. // Setting bg_err_ basically means we're in degraded mode
  330. // Assume that all pending compactions will fail similarly. The trigger
  331. // for clearing this condition is set to current compaction reserved
  332. // size, so we stop checking disk space available in
  333. // EnoughRoomForCompaction once this much free space is available
  334. bg_err_ = bg_error;
  335. }
  336. } else if (bg_error.severity() == Status::Severity::kHardError) {
  337. bg_err_ = bg_error;
  338. } else {
  339. assert(false);
  340. }
  341. // If this is the first instance of this error, kick of a thread to poll
  342. // and recover from this condition
  343. if (error_handler_list_.empty()) {
  344. error_handler_list_.push_back(handler);
  345. // Release lock before calling join. Its ok to do so because
  346. // error_handler_list_ is now non-empty, so no other invocation of this
  347. // function will execute this piece of code
  348. mu_.Unlock();
  349. if (bg_thread_) {
  350. bg_thread_->join();
  351. }
  352. // Start a new thread. The previous one would have exited.
  353. bg_thread_.reset(new port::Thread(&SstFileManagerImpl::ClearError, this));
  354. mu_.Lock();
  355. } else {
  356. // Check if this DB instance is already in the list
  357. for (auto iter = error_handler_list_.begin();
  358. iter != error_handler_list_.end(); ++iter) {
  359. if ((*iter) == handler) {
  360. return;
  361. }
  362. }
  363. error_handler_list_.push_back(handler);
  364. }
  365. }
  366. bool SstFileManagerImpl::CancelErrorRecovery(ErrorHandler* handler) {
  367. MutexLock l(&mu_);
  368. if (cur_instance_ == handler) {
  369. // This instance is currently busy attempting to recover
  370. // Nullify it so the recovery thread doesn't attempt to access it again
  371. cur_instance_ = nullptr;
  372. return false;
  373. }
  374. for (auto iter = error_handler_list_.begin();
  375. iter != error_handler_list_.end(); ++iter) {
  376. if ((*iter) == handler) {
  377. error_handler_list_.erase(iter);
  378. return true;
  379. }
  380. }
  381. return false;
  382. }
  383. Status SstFileManagerImpl::ScheduleFileDeletion(
  384. const std::string& file_path, const std::string& path_to_sync,
  385. const bool force_bg) {
  386. TEST_SYNC_POINT_CALLBACK("SstFileManagerImpl::ScheduleFileDeletion",
  387. const_cast<std::string*>(&file_path));
  388. return delete_scheduler_.DeleteFile(file_path, path_to_sync,
  389. force_bg);
  390. }
  391. void SstFileManagerImpl::WaitForEmptyTrash() {
  392. delete_scheduler_.WaitForEmptyTrash();
  393. }
  394. void SstFileManagerImpl::OnAddFileImpl(const std::string& file_path,
  395. uint64_t file_size, bool compaction) {
  396. auto tracked_file = tracked_files_.find(file_path);
  397. if (tracked_file != tracked_files_.end()) {
  398. // File was added before, we will just update the size
  399. assert(!compaction);
  400. total_files_size_ -= tracked_file->second;
  401. total_files_size_ += file_size;
  402. cur_compactions_reserved_size_ -= file_size;
  403. } else {
  404. total_files_size_ += file_size;
  405. if (compaction) {
  406. // Keep track of the size of files created by in-progress compactions.
  407. // When calculating whether there's enough headroom for new compactions,
  408. // this will be subtracted from cur_compactions_reserved_size_.
  409. // Otherwise, compactions will be double counted.
  410. in_progress_files_size_ += file_size;
  411. in_progress_files_.insert(file_path);
  412. }
  413. }
  414. tracked_files_[file_path] = file_size;
  415. }
  416. void SstFileManagerImpl::OnDeleteFileImpl(const std::string& file_path) {
  417. auto tracked_file = tracked_files_.find(file_path);
  418. if (tracked_file == tracked_files_.end()) {
  419. // File is not tracked
  420. assert(in_progress_files_.find(file_path) == in_progress_files_.end());
  421. return;
  422. }
  423. total_files_size_ -= tracked_file->second;
  424. // Check if it belonged to an in-progress compaction
  425. if (in_progress_files_.find(file_path) != in_progress_files_.end()) {
  426. in_progress_files_size_ -= tracked_file->second;
  427. in_progress_files_.erase(file_path);
  428. }
  429. tracked_files_.erase(tracked_file);
  430. }
  431. SstFileManager* NewSstFileManager(Env* env, std::shared_ptr<Logger> info_log,
  432. std::string trash_dir,
  433. int64_t rate_bytes_per_sec,
  434. bool delete_existing_trash, Status* status,
  435. double max_trash_db_ratio,
  436. uint64_t bytes_max_delete_chunk) {
  437. std::shared_ptr<FileSystem> fs;
  438. if (env == Env::Default()) {
  439. fs = FileSystem::Default();
  440. } else {
  441. fs.reset(new LegacyFileSystemWrapper(env));
  442. }
  443. return NewSstFileManager(env, fs, info_log, trash_dir, rate_bytes_per_sec,
  444. delete_existing_trash, status, max_trash_db_ratio,
  445. bytes_max_delete_chunk);
  446. }
  447. SstFileManager* NewSstFileManager(Env* env, std::shared_ptr<FileSystem> fs,
  448. std::shared_ptr<Logger> info_log,
  449. const std::string& trash_dir,
  450. int64_t rate_bytes_per_sec,
  451. bool delete_existing_trash, Status* status,
  452. double max_trash_db_ratio,
  453. uint64_t bytes_max_delete_chunk) {
  454. SstFileManagerImpl* res =
  455. new SstFileManagerImpl(env, fs, info_log, rate_bytes_per_sec,
  456. max_trash_db_ratio, bytes_max_delete_chunk);
  457. // trash_dir is deprecated and not needed anymore, but if user passed it
  458. // we will still remove files in it.
  459. Status s;
  460. if (delete_existing_trash && trash_dir != "") {
  461. std::vector<std::string> files_in_trash;
  462. s = fs->GetChildren(trash_dir, IOOptions(), &files_in_trash, nullptr);
  463. if (s.ok()) {
  464. for (const std::string& trash_file : files_in_trash) {
  465. if (trash_file == "." || trash_file == "..") {
  466. continue;
  467. }
  468. std::string path_in_trash = trash_dir + "/" + trash_file;
  469. res->OnAddFile(path_in_trash);
  470. Status file_delete =
  471. res->ScheduleFileDeletion(path_in_trash, trash_dir);
  472. if (s.ok() && !file_delete.ok()) {
  473. s = file_delete;
  474. }
  475. }
  476. }
  477. }
  478. if (status) {
  479. *status = s;
  480. }
  481. return res;
  482. }
  483. #else
  484. SstFileManager* NewSstFileManager(Env* /*env*/,
  485. std::shared_ptr<Logger> /*info_log*/,
  486. std::string /*trash_dir*/,
  487. int64_t /*rate_bytes_per_sec*/,
  488. bool /*delete_existing_trash*/,
  489. Status* status, double /*max_trash_db_ratio*/,
  490. uint64_t /*bytes_max_delete_chunk*/) {
  491. if (status) {
  492. *status =
  493. Status::NotSupported("SstFileManager is not supported in ROCKSDB_LITE");
  494. }
  495. return nullptr;
  496. }
  497. #endif // ROCKSDB_LITE
  498. } // namespace ROCKSDB_NAMESPACE