db_sst_test.cc 71 KB


  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  9. #include "db/db_test_util.h"
  10. #include "env/mock_env.h"
  11. #include "file/sst_file_manager_impl.h"
  12. #include "port/port.h"
  13. #include "port/stack_trace.h"
  14. #include "rocksdb/cache.h"
  15. #include "rocksdb/sst_file_manager.h"
  16. #include "rocksdb/table.h"
  17. #include "util/random.h"
  18. namespace ROCKSDB_NAMESPACE {
  19. class DBSSTTest : public DBTestBase {
  20. public:
  21. DBSSTTest(const std::string& test_name = "db_sst_test")
  22. : DBTestBase(test_name, /*env_do_fsync=*/true) {}
  23. };
  24. // A class which remembers the name of each flushed file.
  25. class FlushedFileCollector : public EventListener {
  26. public:
  27. FlushedFileCollector() = default;
  28. ~FlushedFileCollector() override = default;
  29. void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& info) override {
  30. std::lock_guard<std::mutex> lock(mutex_);
  31. flushed_files_.push_back(info.file_path);
  32. }
  33. std::vector<std::string> GetFlushedFiles() {
  34. std::lock_guard<std::mutex> lock(mutex_);
  35. std::vector<std::string> result;
  36. for (const auto& fname : flushed_files_) {
  37. result.push_back(fname);
  38. }
  39. return result;
  40. }
  41. void ClearFlushedFiles() {
  42. std::lock_guard<std::mutex> lock(mutex_);
  43. flushed_files_.clear();
  44. }
  45. private:
  46. std::vector<std::string> flushed_files_;
  47. std::mutex mutex_;
  48. };
  49. TEST_F(DBSSTTest, DontDeletePendingOutputs) {
  50. Options options;
  51. options.env = env_;
  52. options.create_if_missing = true;
  53. DestroyAndReopen(options);
  54. // Every time we write to a table file, call FOF/POF with full DB scan. This
  55. // will make sure our pending_outputs_ protection work correctly
  56. std::function<void()> purge_obsolete_files_function = [&]() {
  57. JobContext job_context(0);
  58. dbfull()->TEST_LockMutex();
  59. dbfull()->FindObsoleteFiles(&job_context, true /*force*/);
  60. dbfull()->TEST_UnlockMutex();
  61. dbfull()->PurgeObsoleteFiles(job_context);
  62. job_context.Clean();
  63. };
  64. env_->table_write_callback_ = &purge_obsolete_files_function;
  65. for (int i = 0; i < 2; ++i) {
  66. ASSERT_OK(Put("a", "begin"));
  67. ASSERT_OK(Put("z", "end"));
  68. ASSERT_OK(Flush());
  69. }
  70. // If pending output guard does not work correctly, PurgeObsoleteFiles() will
  71. // delete the file that Compaction is trying to create, causing this: error
  72. // db/db_test.cc:975: IO error:
  73. // /tmp/rocksdbtest-1552237650/db_test/000009.sst: No such file or directory
  74. Compact("a", "b");
  75. }
  76. // 1 Create some SST files by inserting K-V pairs into DB
  77. // 2 Close DB and change suffix from ".sst" to ".ldb" for every other SST file
  78. // 3 Open DB and check if all key can be read
  79. TEST_F(DBSSTTest, SSTsWithLdbSuffixHandling) {
  80. Options options = CurrentOptions();
  81. options.write_buffer_size = 110 << 10; // 110KB
  82. options.num_levels = 4;
  83. DestroyAndReopen(options);
  84. Random rnd(301);
  85. int key_id = 0;
  86. for (int i = 0; i < 10; ++i) {
  87. GenerateNewFile(&rnd, &key_id, false);
  88. }
  89. ASSERT_OK(Flush());
  90. Close();
  91. int const num_files = GetSstFileCount(dbname_);
  92. ASSERT_GT(num_files, 0);
  93. Reopen(options);
  94. std::vector<std::string> values;
  95. values.reserve(key_id);
  96. for (int k = 0; k < key_id; ++k) {
  97. values.push_back(Get(Key(k)));
  98. }
  99. Close();
  100. std::vector<std::string> filenames;
  101. GetSstFiles(env_, dbname_, &filenames);
  102. int num_ldb_files = 0;
  103. for (size_t i = 0; i < filenames.size(); ++i) {
  104. if (i & 1) {
  105. continue;
  106. }
  107. std::string const rdb_name = dbname_ + "/" + filenames[i];
  108. std::string const ldb_name = Rocks2LevelTableFileName(rdb_name);
  109. ASSERT_TRUE(env_->RenameFile(rdb_name, ldb_name).ok());
  110. ++num_ldb_files;
  111. }
  112. ASSERT_GT(num_ldb_files, 0);
  113. ASSERT_EQ(num_files, GetSstFileCount(dbname_));
  114. Reopen(options);
  115. for (int k = 0; k < key_id; ++k) {
  116. ASSERT_EQ(values[k], Get(Key(k)));
  117. }
  118. Destroy(options);
  119. }
  120. TEST_F(DBSSTTest, DontDeleteMovedFile) {
  121. // This test triggers move compaction and verifies that the file is not
  122. // deleted when it's part of move compaction
  123. Options options = CurrentOptions();
  124. options.env = env_;
  125. options.create_if_missing = true;
  126. options.max_bytes_for_level_base = 1024 * 1024; // 1 MB
  127. options.level0_file_num_compaction_trigger =
  128. 2; // trigger compaction when we have 2 files
  129. DestroyAndReopen(options);
  130. Random rnd(301);
  131. // Create two 1MB sst files
  132. for (int i = 0; i < 2; ++i) {
  133. // Create 1MB sst file
  134. for (int j = 0; j < 100; ++j) {
  135. ASSERT_OK(Put(Key(i * 50 + j), rnd.RandomString(10 * 1024)));
  136. }
  137. ASSERT_OK(Flush());
  138. }
  139. // this should execute both L0->L1 and L1->(move)->L2 compactions
  140. ASSERT_OK(dbfull()->TEST_WaitForCompact());
  141. ASSERT_EQ("0,0,1", FilesPerLevel(0));
  142. // If the moved file is actually deleted (the move-safeguard in
  143. // ~Version::Version() is not there), we get this failure:
  144. // Corruption: Can't access /000009.sst
  145. Reopen(options);
  146. }
  147. // This reproduces a bug where we don't delete a file because when it was
  148. // supposed to be deleted, it was blocked by pending_outputs
  149. // Consider:
  150. // 1. current file_number is 13
  151. // 2. compaction (1) starts, blocks deletion of all files starting with 13
  152. // (pending outputs)
  153. // 3. file 13 is created by compaction (2)
  154. // 4. file 13 is consumed by compaction (3) and file 15 was created. Since file
  155. // 13 has no references, it is put into VersionSet::obsolete_files_
  156. // 5. FindObsoleteFiles() gets file 13 from VersionSet::obsolete_files_. File 13
  157. // is deleted from obsolete_files_ set.
  158. // 6. PurgeObsoleteFiles() tries to delete file 13, but this file is blocked by
  159. // pending outputs since compaction (1) is still running. It is not deleted and
  160. // it is not present in obsolete_files_ anymore. Therefore, we never delete it.
  161. TEST_F(DBSSTTest, DeleteObsoleteFilesPendingOutputs) {
  162. Options options = CurrentOptions();
  163. options.env = env_;
  164. options.write_buffer_size = 2 * 1024 * 1024; // 2 MB
  165. options.max_bytes_for_level_base = 1024 * 1024; // 1 MB
  166. options.level0_file_num_compaction_trigger =
  167. 2; // trigger compaction when we have 2 files
  168. options.max_background_flushes = 2;
  169. options.max_background_compactions = 2;
  170. OnFileDeletionListener* listener = new OnFileDeletionListener();
  171. options.listeners.emplace_back(listener);
  172. Reopen(options);
  173. Random rnd(301);
  174. // Create two 1MB sst files
  175. for (int i = 0; i < 2; ++i) {
  176. // Create 1MB sst file
  177. for (int j = 0; j < 100; ++j) {
  178. ASSERT_OK(Put(Key(i * 50 + j), rnd.RandomString(10 * 1024)));
  179. }
  180. ASSERT_OK(Flush());
  181. }
  182. // this should execute both L0->L1 and L1->(move)->L2 compactions
  183. ASSERT_OK(dbfull()->TEST_WaitForCompact());
  184. ASSERT_EQ("0,0,1", FilesPerLevel(0));
  185. test::SleepingBackgroundTask blocking_thread;
  186. port::Mutex mutex_;
  187. bool already_blocked(false);
  188. // block the flush
  189. std::function<void()> block_first_time = [&]() {
  190. bool blocking = false;
  191. {
  192. MutexLock l(&mutex_);
  193. if (!already_blocked) {
  194. blocking = true;
  195. already_blocked = true;
  196. }
  197. }
  198. if (blocking) {
  199. blocking_thread.DoSleep();
  200. }
  201. };
  202. env_->table_write_callback_ = &block_first_time;
  203. // Insert 2.5MB data, which should trigger a flush because we exceed
  204. // write_buffer_size. The flush will be blocked with block_first_time
  205. // pending_file is protecting all the files created after
  206. for (int j = 0; j < 256; ++j) {
  207. ASSERT_OK(Put(Key(j), rnd.RandomString(10 * 1024)));
  208. }
  209. blocking_thread.WaitUntilSleeping();
  210. ASSERT_OK(dbfull()->TEST_CompactRange(2, nullptr, nullptr));
  211. ASSERT_EQ("0,0,0,1", FilesPerLevel(0));
  212. std::vector<LiveFileMetaData> metadata;
  213. db_->GetLiveFilesMetaData(&metadata);
  214. ASSERT_EQ(metadata.size(), 1U);
  215. auto file_on_L2 = metadata[0].name;
  216. listener->SetExpectedFileName(dbname_ + file_on_L2);
  217. ASSERT_OK(dbfull()->TEST_CompactRange(3, nullptr, nullptr, nullptr,
  218. true /* disallow trivial move */));
  219. ASSERT_EQ("0,0,0,0,1", FilesPerLevel(0));
  220. // finish the flush!
  221. blocking_thread.WakeUp();
  222. blocking_thread.WaitUntilDone();
  223. ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
  224. // File just flushed is too big for L0 and L1 so gets moved to L2.
  225. ASSERT_OK(dbfull()->TEST_WaitForCompact());
  226. ASSERT_EQ("0,0,1,0,1", FilesPerLevel(0));
  227. metadata.clear();
  228. db_->GetLiveFilesMetaData(&metadata);
  229. ASSERT_EQ(metadata.size(), 2U);
  230. // This file should have been deleted during last compaction
  231. ASSERT_EQ(Status::NotFound(), env_->FileExists(dbname_ + file_on_L2));
  232. listener->VerifyMatchedCount(1);
  233. }
  234. // Test that producing an empty .sst file does not write it out to
  235. // disk, and that the DeleteFile() env method is not called for
  236. // removing the non-existing file later.
  237. TEST_F(DBSSTTest, DeleteFileNotCalledForNotCreatedSSTFile) {
  238. Options options = CurrentOptions();
  239. options.env = env_;
  240. OnFileDeletionListener* listener = new OnFileDeletionListener();
  241. options.listeners.emplace_back(listener);
  242. Reopen(options);
  243. // Flush the empty database.
  244. ASSERT_OK(Flush());
  245. ASSERT_EQ("", FilesPerLevel(0));
  246. // We expect no .sst files.
  247. std::vector<LiveFileMetaData> metadata;
  248. db_->GetLiveFilesMetaData(&metadata);
  249. ASSERT_EQ(metadata.size(), 0U);
  250. // We expect no file deletions.
  251. listener->VerifyMatchedCount(0);
  252. }
  253. // Test that producing a non-empty .sst file does write it out to
  254. // disk, and that the DeleteFile() env method is not called for removing
  255. // the file later.
  256. TEST_F(DBSSTTest, DeleteFileNotCalledForCreatedSSTFile) {
  257. Options options = CurrentOptions();
  258. options.env = env_;
  259. OnFileDeletionListener* listener = new OnFileDeletionListener();
  260. options.listeners.emplace_back(listener);
  261. Reopen(options);
  262. ASSERT_OK(Put("pika", "choo"));
  263. // Flush the non-empty database.
  264. ASSERT_OK(Flush());
  265. ASSERT_EQ("1", FilesPerLevel(0));
  266. // We expect 1 .sst files.
  267. std::vector<LiveFileMetaData> metadata;
  268. db_->GetLiveFilesMetaData(&metadata);
  269. ASSERT_EQ(metadata.size(), 1U);
  270. // We expect no file deletions.
  271. listener->VerifyMatchedCount(0);
  272. }
  273. TEST_F(DBSSTTest, DBWithSstFileManager) {
  274. std::shared_ptr<SstFileManager> sst_file_manager(NewSstFileManager(env_));
  275. auto sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get());
  276. int files_added = 0;
  277. int files_deleted = 0;
  278. int files_moved = 0;
  279. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  280. "SstFileManagerImpl::OnAddFile", [&](void* /*arg*/) { files_added++; });
  281. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  282. "SstFileManagerImpl::OnDeleteFile",
  283. [&](void* /*arg*/) { files_deleted++; });
  284. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  285. "SstFileManagerImpl::OnMoveFile", [&](void* /*arg*/) { files_moved++; });
  286. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  287. Options options = CurrentOptions();
  288. options.sst_file_manager = sst_file_manager;
  289. DestroyAndReopen(options);
  290. Random rnd(301);
  291. for (int i = 0; i < 25; i++) {
  292. GenerateNewRandomFile(&rnd);
  293. ASSERT_OK(Flush());
  294. ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
  295. ASSERT_OK(dbfull()->TEST_WaitForCompact());
  296. // Verify that we are tracking all sst files in dbname_
  297. std::unordered_map<std::string, uint64_t> files_in_db;
  298. ASSERT_OK(GetAllDataFiles(kTableFile, &files_in_db));
  299. ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db);
  300. }
  301. ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  302. ASSERT_OK(dbfull()->TEST_WaitForCompact());
  303. std::unordered_map<std::string, uint64_t> files_in_db;
  304. ASSERT_OK(GetAllDataFiles(kTableFile, &files_in_db));
  305. // Verify that we are tracking all sst files in dbname_
  306. ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db);
  307. // Verify the total files size
  308. uint64_t total_files_size = 0;
  309. for (auto& file_to_size : files_in_db) {
  310. total_files_size += file_to_size.second;
  311. }
  312. ASSERT_EQ(sfm->GetTotalSize(), total_files_size);
  313. // We flushed at least 25 files
  314. ASSERT_GE(files_added, 25);
  315. // Compaction must have deleted some files
  316. ASSERT_GT(files_deleted, 0);
  317. // No files were moved
  318. ASSERT_EQ(files_moved, 0);
  319. Close();
  320. ASSERT_EQ(sfm->GetTrackedFiles().size(), 0) << "sfm should be empty";
  321. ASSERT_EQ(sfm->GetTotalSize(), 0) << "sfm should be empty";
  322. Reopen(options);
  323. ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db);
  324. ASSERT_EQ(sfm->GetTotalSize(), total_files_size);
  325. // Verify that we track all the files again after the DB is closed and opened
  326. Close();
  327. ASSERT_EQ(sfm->GetTrackedFiles().size(), 0) << "sfm should be empty";
  328. ASSERT_EQ(sfm->GetTotalSize(), 0) << "sfm should be empty";
  329. sst_file_manager.reset(NewSstFileManager(env_));
  330. options.sst_file_manager = sst_file_manager;
  331. sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get());
  332. Reopen(options);
  333. ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db);
  334. ASSERT_EQ(sfm->GetTotalSize(), total_files_size);
  335. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  336. }
  337. TEST_F(DBSSTTest, DBWithSstFileManagerForBlobFiles) {
  338. std::shared_ptr<SstFileManager> sst_file_manager(NewSstFileManager(env_));
  339. auto sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get());
  340. int files_added = 0;
  341. int files_deleted = 0;
  342. int files_moved = 0;
  343. int files_scheduled_to_delete = 0;
  344. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  345. "SstFileManagerImpl::OnAddFile", [&](void* arg) {
  346. const std::string* const file_path =
  347. static_cast<const std::string*>(arg);
  348. if (file_path->find(".blob") != std::string::npos) {
  349. files_added++;
  350. }
  351. });
  352. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  353. "SstFileManagerImpl::OnDeleteFile", [&](void* arg) {
  354. const std::string* const file_path =
  355. static_cast<const std::string*>(arg);
  356. if (file_path->find(".blob") != std::string::npos) {
  357. files_deleted++;
  358. }
  359. });
  360. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  361. "SstFileManagerImpl::ScheduleFileDeletion", [&](void* arg) {
  362. assert(arg);
  363. const std::string* const file_path =
  364. static_cast<const std::string*>(arg);
  365. if (file_path->find(".blob") != std::string::npos) {
  366. ++files_scheduled_to_delete;
  367. }
  368. });
  369. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  370. "SstFileManagerImpl::OnMoveFile", [&](void* /*arg*/) { files_moved++; });
  371. int64_t untracked_files = 0;
  372. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  373. "SstFileManagerImpl::OnUntrackFile",
  374. [&](void* /*arg*/) { ++untracked_files; });
  375. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  376. Options options = CurrentOptions();
  377. options.sst_file_manager = sst_file_manager;
  378. options.enable_blob_files = true;
  379. options.blob_file_size = 32; // create one blob per file
  380. DestroyAndReopen(options);
  381. Random rnd(301);
  382. for (int i = 0; i < 10; i++) {
  383. ASSERT_OK(Put("Key_" + std::to_string(i), "Value_" + std::to_string(i)));
  384. ASSERT_OK(Flush());
  385. ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
  386. ASSERT_OK(dbfull()->TEST_WaitForCompact());
  387. // Verify that we are tracking all sst and blob files in dbname_
  388. std::unordered_map<std::string, uint64_t> files_in_db;
  389. ASSERT_OK(GetAllDataFiles(kTableFile, &files_in_db));
  390. ASSERT_OK(GetAllDataFiles(kBlobFile, &files_in_db));
  391. ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db);
  392. }
  393. std::vector<uint64_t> blob_files = GetBlobFileNumbers();
  394. ASSERT_EQ(files_added, blob_files.size());
  395. // No blob file is obsoleted.
  396. ASSERT_EQ(files_deleted, 0);
  397. ASSERT_EQ(files_scheduled_to_delete, 0);
  398. // No files were moved.
  399. ASSERT_EQ(files_moved, 0);
  400. ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  401. ASSERT_OK(dbfull()->TEST_WaitForCompact());
  402. std::unordered_map<std::string, uint64_t> files_in_db;
  403. ASSERT_OK(GetAllDataFiles(kTableFile, &files_in_db));
  404. ASSERT_OK(GetAllDataFiles(kBlobFile, &files_in_db));
  405. // Verify that we are tracking all sst and blob files in dbname_
  406. ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db);
  407. // Verify the total files size
  408. uint64_t total_files_size = 0;
  409. for (auto& file_to_size : files_in_db) {
  410. total_files_size += file_to_size.second;
  411. }
  412. ASSERT_EQ(sfm->GetTotalSize(), total_files_size);
  413. Close();
  414. ASSERT_EQ(untracked_files, files_in_db.size());
  415. untracked_files = 0;
  416. ASSERT_EQ(sfm->GetTrackedFiles().size(), 0) << "sfm should be empty";
  417. ASSERT_EQ(sfm->GetTotalSize(), 0) << "sfm should be empty";
  418. Reopen(options);
  419. ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db);
  420. ASSERT_EQ(sfm->GetTotalSize(), total_files_size);
  421. // Verify that we track all the files again after the DB is closed and opened.
  422. Close();
  423. ASSERT_EQ(untracked_files, files_in_db.size());
  424. untracked_files = 0;
  425. ASSERT_EQ(sfm->GetTrackedFiles().size(), 0) << "sfm should be empty";
  426. ASSERT_EQ(sfm->GetTotalSize(), 0) << "sfm should be empty";
  427. sst_file_manager.reset(NewSstFileManager(env_));
  428. options.sst_file_manager = sst_file_manager;
  429. sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get());
  430. Reopen(options);
  431. ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db);
  432. ASSERT_EQ(sfm->GetTotalSize(), total_files_size);
  433. // Destroy DB and it will remove all the blob files from sst file manager and
  434. // blob files deletion will go through ScheduleFileDeletion.
  435. ASSERT_EQ(files_deleted, 0);
  436. ASSERT_EQ(files_scheduled_to_delete, 0);
  437. Close();
  438. ASSERT_EQ(untracked_files, files_in_db.size());
  439. untracked_files = 0;
  440. ASSERT_EQ(sfm->GetTrackedFiles().size(), 0) << "sfm should be empty";
  441. ASSERT_EQ(sfm->GetTotalSize(), 0) << "sfm should be empty";
  442. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  443. "SstFileManagerImpl::ScheduleUnaccountedFileDeletion", [&](void* arg) {
  444. assert(arg);
  445. const std::string* const file_path =
  446. static_cast<const std::string*>(arg);
  447. if (EndsWith(*file_path, ".blob")) {
  448. ++files_scheduled_to_delete;
  449. }
  450. });
  451. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  452. "DeleteScheduler::OnDeleteFile", [&](void* arg) {
  453. const std::string* const file_path =
  454. static_cast<const std::string*>(arg);
  455. if (EndsWith(*file_path, ".blob")) {
  456. files_deleted++;
  457. }
  458. });
  459. ASSERT_OK(DestroyDB(dbname_, options));
  460. ASSERT_EQ(files_deleted, blob_files.size());
  461. ASSERT_EQ(files_scheduled_to_delete, blob_files.size());
  462. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  463. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
  464. }
  465. TEST_F(DBSSTTest, DBWithSstFileManagerForBlobFilesWithGC) {
  466. std::shared_ptr<SstFileManager> sst_file_manager(NewSstFileManager(env_));
  467. auto sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get());
  468. Options options = CurrentOptions();
  469. options.sst_file_manager = sst_file_manager;
  470. options.enable_blob_files = true;
  471. options.blob_file_size = 32; // create one blob per file
  472. options.disable_auto_compactions = true;
  473. options.enable_blob_garbage_collection = true;
  474. options.blob_garbage_collection_age_cutoff = 0.5;
  475. int files_added = 0;
  476. int files_deleted = 0;
  477. int files_moved = 0;
  478. int files_scheduled_to_delete = 0;
  479. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  480. "SstFileManagerImpl::OnAddFile", [&](void* arg) {
  481. const std::string* const file_path =
  482. static_cast<const std::string*>(arg);
  483. if (file_path->find(".blob") != std::string::npos) {
  484. files_added++;
  485. }
  486. });
  487. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  488. "SstFileManagerImpl::OnDeleteFile", [&](void* arg) {
  489. const std::string* const file_path =
  490. static_cast<const std::string*>(arg);
  491. if (file_path->find(".blob") != std::string::npos) {
  492. files_deleted++;
  493. }
  494. });
  495. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  496. "SstFileManagerImpl::ScheduleFileDeletion", [&](void* arg) {
  497. assert(arg);
  498. const std::string* const file_path =
  499. static_cast<const std::string*>(arg);
  500. if (file_path->find(".blob") != std::string::npos) {
  501. ++files_scheduled_to_delete;
  502. }
  503. });
  504. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  505. "SstFileManagerImpl::OnMoveFile", [&](void* /*arg*/) { files_moved++; });
  506. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  507. DestroyAndReopen(options);
  508. Random rnd(301);
  509. constexpr char first_key[] = "first_key";
  510. constexpr char first_value[] = "first_value";
  511. constexpr char second_key[] = "second_key";
  512. constexpr char second_value[] = "second_value";
  513. ASSERT_OK(Put(first_key, first_value));
  514. ASSERT_OK(Put(second_key, second_value));
  515. ASSERT_OK(Flush());
  516. constexpr char third_key[] = "third_key";
  517. constexpr char third_value[] = "third_value";
  518. constexpr char fourth_key[] = "fourth_key";
  519. constexpr char fourth_value[] = "fourth_value";
  520. constexpr char fifth_key[] = "fifth_key";
  521. constexpr char fifth_value[] = "fifth_value";
  522. ASSERT_OK(Put(third_key, third_value));
  523. ASSERT_OK(Put(fourth_key, fourth_value));
  524. ASSERT_OK(Put(fifth_key, fifth_value));
  525. ASSERT_OK(Flush());
  526. const std::vector<uint64_t> original_blob_files = GetBlobFileNumbers();
  527. ASSERT_EQ(original_blob_files.size(), 5);
  528. ASSERT_EQ(files_added, 5);
  529. ASSERT_EQ(files_deleted, 0);
  530. ASSERT_EQ(files_scheduled_to_delete, 0);
  531. ASSERT_EQ(files_moved, 0);
  532. {
  533. // Verify that we are tracking all sst and blob files in dbname_
  534. std::unordered_map<std::string, uint64_t> files_in_db;
  535. ASSERT_OK(GetAllDataFiles(kTableFile, &files_in_db));
  536. ASSERT_OK(GetAllDataFiles(kBlobFile, &files_in_db));
  537. ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db);
  538. }
  539. const size_t cutoff_index = static_cast<size_t>(
  540. options.blob_garbage_collection_age_cutoff * original_blob_files.size());
  541. size_t expected_number_of_files = original_blob_files.size();
  542. // Note: turning off enable_blob_files before the compaction results in
  543. // garbage collected values getting inlined.
  544. ASSERT_OK(db_->SetOptions({{"enable_blob_files", "false"}}));
  545. expected_number_of_files -= cutoff_index;
  546. files_added = 0;
  547. constexpr Slice* begin = nullptr;
  548. constexpr Slice* end = nullptr;
  549. ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end));
  550. ASSERT_OK(dbfull()->TEST_WaitForCompact());
  551. sfm->WaitForEmptyTrash();
  552. ASSERT_EQ(Get(first_key), first_value);
  553. ASSERT_EQ(Get(second_key), second_value);
  554. ASSERT_EQ(Get(third_key), third_value);
  555. ASSERT_EQ(Get(fourth_key), fourth_value);
  556. ASSERT_EQ(Get(fifth_key), fifth_value);
  557. const std::vector<uint64_t> new_blob_files = GetBlobFileNumbers();
  558. ASSERT_EQ(new_blob_files.size(), expected_number_of_files);
  559. // No new file is added.
  560. ASSERT_EQ(files_added, 0);
  561. ASSERT_EQ(files_deleted, cutoff_index);
  562. ASSERT_EQ(files_scheduled_to_delete, cutoff_index);
  563. ASSERT_EQ(files_moved, 0);
  564. // Original blob files below the cutoff should be gone, original blob files at
  565. // or above the cutoff should be still there
  566. for (size_t i = cutoff_index; i < original_blob_files.size(); ++i) {
  567. ASSERT_EQ(new_blob_files[i - cutoff_index], original_blob_files[i]);
  568. }
  569. {
  570. // Verify that we are tracking all sst and blob files in dbname_
  571. std::unordered_map<std::string, uint64_t> files_in_db;
  572. ASSERT_OK(GetAllDataFiles(kTableFile, &files_in_db));
  573. ASSERT_OK(GetAllDataFiles(kBlobFile, &files_in_db));
  574. ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db);
  575. }
  576. Close();
  577. ASSERT_EQ(sfm->GetTrackedFiles().size(), 0) << "sfm should be empty";
  578. ASSERT_EQ(sfm->GetTotalSize(), 0) << "sfm should be empty";
  579. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  580. "SstFileManagerImpl::ScheduleUnaccountedFileDeletion", [&](void* arg) {
  581. assert(arg);
  582. const std::string* const file_path =
  583. static_cast<const std::string*>(arg);
  584. if (EndsWith(*file_path, ".blob")) {
  585. ++files_scheduled_to_delete;
  586. }
  587. });
  588. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  589. "DeleteScheduler::OnDeleteFile", [&](void* arg) {
  590. const std::string* const file_path =
  591. static_cast<const std::string*>(arg);
  592. if (EndsWith(*file_path, ".blob")) {
  593. files_deleted++;
  594. }
  595. });
  596. ASSERT_OK(DestroyDB(dbname_, options));
  597. sfm->WaitForEmptyTrash();
  598. ASSERT_EQ(files_deleted, 5);
  599. ASSERT_EQ(files_scheduled_to_delete, 5);
  600. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  601. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
  602. }
  603. class DBSSTTestRateLimit : public DBSSTTest,
  604. public ::testing::WithParamInterface<bool> {
  605. public:
  606. DBSSTTestRateLimit() : DBSSTTest() {}
  607. ~DBSSTTestRateLimit() override = default;
  608. };
  609. TEST_P(DBSSTTestRateLimit, RateLimitedDelete) {
  610. Destroy(last_options_);
  611. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
  612. {"DBSSTTest::RateLimitedDelete:1",
  613. "DeleteScheduler::BackgroundEmptyTrash"},
  614. });
  615. std::vector<uint64_t> penalties;
  616. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  617. "DeleteScheduler::BackgroundEmptyTrash:Wait",
  618. [&](void* arg) { penalties.push_back(*(static_cast<uint64_t*>(arg))); });
  619. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  620. "InstrumentedCondVar::TimedWaitInternal", [&](void* arg) {
  621. // Turn timed wait into a simulated sleep
  622. uint64_t* abs_time_us = static_cast<uint64_t*>(arg);
  623. uint64_t cur_time = env_->NowMicros();
  624. if (*abs_time_us > cur_time) {
  625. env_->MockSleepForMicroseconds(*abs_time_us - cur_time);
  626. }
  627. // Plus an additional short, random amount
  628. env_->MockSleepForMicroseconds(Random::GetTLSInstance()->Uniform(10));
  629. // Set wait until time to before (actual) current time to force not
  630. // to sleep
  631. *abs_time_us = Env::Default()->NowMicros();
  632. });
  633. // Disable PeriodicTaskScheduler as it also has TimedWait, which could update
  634. // the simulated sleep time
  635. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  636. "DBImpl::StartPeriodicTaskScheduler:DisableScheduler", [&](void* arg) {
  637. bool* disable_scheduler = static_cast<bool*>(arg);
  638. *disable_scheduler = true;
  639. });
  640. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  641. bool different_wal_dir = GetParam();
  642. Options options = CurrentOptions();
  643. SetTimeElapseOnlySleepOnReopen(&options);
  644. options.disable_auto_compactions = true;
  645. options.env = env_;
  646. options.statistics = CreateDBStatistics();
  647. if (different_wal_dir) {
  648. options.wal_dir = alternative_wal_dir_;
  649. }
  650. int64_t rate_bytes_per_sec = 1024 * 10; // 10 Kbs / Sec
  651. Status s;
  652. options.sst_file_manager.reset(
  653. NewSstFileManager(env_, nullptr, "", 0, false, &s, 0));
  654. ASSERT_OK(s);
  655. options.sst_file_manager->SetDeleteRateBytesPerSecond(rate_bytes_per_sec);
  656. auto sfm = static_cast<SstFileManagerImpl*>(options.sst_file_manager.get());
  657. sfm->delete_scheduler()->SetMaxTrashDBRatio(1.1);
  658. WriteOptions wo;
  659. if (!different_wal_dir) {
  660. wo.disableWAL = true;
  661. }
  662. Reopen(options);
  663. // Create 4 files in L0
  664. for (char v = 'a'; v <= 'd'; v++) {
  665. ASSERT_OK(Put("Key2", DummyString(1024, v), wo));
  666. ASSERT_OK(Put("Key3", DummyString(1024, v), wo));
  667. ASSERT_OK(Put("Key4", DummyString(1024, v), wo));
  668. ASSERT_OK(Put("Key1", DummyString(1024, v), wo));
  669. ASSERT_OK(Put("Key4", DummyString(1024, v), wo));
  670. ASSERT_OK(Flush());
  671. }
  672. // We created 4 sst files in L0
  673. ASSERT_EQ("4", FilesPerLevel(0));
  674. std::vector<LiveFileMetaData> metadata;
  675. db_->GetLiveFilesMetaData(&metadata);
  676. // Compaction will move the 4 files in L0 to trash and create 1 L1 file
  677. ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  678. ASSERT_OK(dbfull()->TEST_WaitForCompact());
  679. ASSERT_EQ("0,1", FilesPerLevel(0));
  680. uint64_t delete_start_time = env_->NowMicros();
  681. // Hold BackgroundEmptyTrash
  682. TEST_SYNC_POINT("DBSSTTest::RateLimitedDelete:1");
  683. sfm->WaitForEmptyTrash();
  684. uint64_t time_spent_deleting = env_->NowMicros() - delete_start_time;
  685. uint64_t total_files_size = 0;
  686. uint64_t expected_penlty = 0;
  687. ASSERT_EQ(penalties.size(), metadata.size());
  688. for (size_t i = 0; i < metadata.size(); i++) {
  689. total_files_size += metadata[i].size;
  690. expected_penlty = ((total_files_size * 1000000) / rate_bytes_per_sec);
  691. ASSERT_EQ(expected_penlty, penalties[i]);
  692. }
  693. ASSERT_GT(time_spent_deleting, expected_penlty * 0.9);
  694. ASSERT_LT(time_spent_deleting, expected_penlty * 1.1);
  695. ASSERT_EQ(4, options.statistics->getAndResetTickerCount(FILES_MARKED_TRASH));
  696. ASSERT_EQ(
  697. 0, options.statistics->getAndResetTickerCount(FILES_DELETED_IMMEDIATELY));
  698. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  699. }
  700. INSTANTIATE_TEST_CASE_P(RateLimitedDelete, DBSSTTestRateLimit,
  701. ::testing::Bool());
  702. TEST_F(DBSSTTest, RateLimitedWALDelete) {
  703. Destroy(last_options_);
  704. std::vector<uint64_t> penalties;
  705. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  706. "DeleteScheduler::BackgroundEmptyTrash:Wait",
  707. [&](void* arg) { penalties.push_back(*(static_cast<uint64_t*>(arg))); });
  708. Options options = CurrentOptions();
  709. options.disable_auto_compactions = true;
  710. options.compression = kNoCompression;
  711. options.env = env_;
  712. int64_t rate_bytes_per_sec = 1024 * 10; // 10 Kbs / Sec
  713. Status s;
  714. options.sst_file_manager.reset(
  715. NewSstFileManager(env_, nullptr, "", 0, false, &s, 0));
  716. ASSERT_OK(s);
  717. options.sst_file_manager->SetDeleteRateBytesPerSecond(rate_bytes_per_sec);
  718. auto sfm = static_cast<SstFileManagerImpl*>(options.sst_file_manager.get());
  719. sfm->delete_scheduler()->SetMaxTrashDBRatio(3.1);
  720. SetTimeElapseOnlySleepOnReopen(&options);
  721. ASSERT_OK(TryReopen(options));
  722. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  723. // Create 4 files in L0
  724. for (char v = 'a'; v <= 'd'; v++) {
  725. ASSERT_OK(Put("Key2", DummyString(1024, v)));
  726. ASSERT_OK(Put("Key3", DummyString(1024, v)));
  727. ASSERT_OK(Put("Key4", DummyString(1024, v)));
  728. ASSERT_OK(Put("Key1", DummyString(1024, v)));
  729. ASSERT_OK(Put("Key4", DummyString(1024, v)));
  730. ASSERT_OK(Flush());
  731. }
  732. // We created 4 sst files in L0
  733. ASSERT_EQ("4", FilesPerLevel(0));
  734. // Compaction will move the 4 files in L0 to trash and create 1 L1 file.
  735. // Use kForceOptimized to not rewrite the new L1 file.
  736. CompactRangeOptions cro;
  737. cro.bottommost_level_compaction = BottommostLevelCompaction::kForceOptimized;
  738. ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
  739. ASSERT_OK(dbfull()->TEST_WaitForCompact());
  740. ASSERT_EQ("0,1", FilesPerLevel(0));
  741. sfm->WaitForEmptyTrash();
  742. ASSERT_EQ(penalties.size(), 8);
  743. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  744. }
  745. class DBWALTestWithParam
  746. : public DBTestBase,
  747. public testing::WithParamInterface<std::tuple<std::string, bool>> {
  748. public:
  749. explicit DBWALTestWithParam()
  750. : DBTestBase("db_wal_test_with_params", /*env_do_fsync=*/true) {
  751. wal_dir_ = std::get<0>(GetParam());
  752. wal_dir_same_as_dbname_ = std::get<1>(GetParam());
  753. }
  754. std::string wal_dir_;
  755. bool wal_dir_same_as_dbname_;
  756. };
// Checks that "*.log.trash" files left in the WAL directory
// (dbname_ + wal_dir_) are removed when the DB is reopened.
TEST_P(DBWALTestWithParam, WALTrashCleanupOnOpen) {
// EnvWrapper whose DeleteFile() returns OK without deleting anything for names
// containing ".log.trash" while fake_log_delete is set; every other deletion
// is forwarded to the wrapped Env.
class MyEnv : public EnvWrapper {
public:
MyEnv(Env* t) : EnvWrapper(t), fake_log_delete(false) {}
const char* Name() const override { return "MyEnv"; }
Status DeleteFile(const std::string& fname) override {
if (fname.find(".log.trash") != std::string::npos && fake_log_delete) {
return Status::OK();
}
return target()->DeleteFile(fname);
}
void set_fake_log_delete(bool fake) { fake_log_delete = fake; }
private:
// Plain bool; in this test it is only toggled while the DB is closed.
bool fake_log_delete;
};
std::unique_ptr<MyEnv> env(new MyEnv(env_));
Destroy(last_options_);
// Start with trash-log deletion faked so trash WAL files can accumulate.
env->set_fake_log_delete(true);
Options options = CurrentOptions();
options.disable_auto_compactions = true;
options.compression = kNoCompression;
options.env = env.get();
options.wal_dir = dbname_ + wal_dir_;
int64_t rate_bytes_per_sec = 1024 * 10; // 10 KB/s
Status s;
// NOTE(review): the SstFileManager is built on env_, not env.get(), so the
// deletions it performs presumably bypass MyEnv::DeleteFile; the test then
// relies on the very slow delete rate set in the loop below to keep a trash
// log around. Confirm this is intended.
options.sst_file_manager.reset(
NewSstFileManager(env_, nullptr, "", 0, false, &s, 0));
ASSERT_OK(s);
options.sst_file_manager->SetDeleteRateBytesPerSecond(rate_bytes_per_sec);
auto sfm = static_cast<SstFileManagerImpl*>(options.sst_file_manager.get());
sfm->delete_scheduler()->SetMaxTrashDBRatio(3.1);
Reopen(options);
// Create 4 files in L0
for (char v = 'a'; v <= 'd'; v++) {
if (v == 'c') {
// Maximize the chance that the last log file will be preserved in trash
// before restarting the DB. (Enable slow deletion but at a very slow
// deletion rate)
// We have to set this on the 2nd to last file for it to delay deletion
// on the last file. (Quirk of DeleteScheduler::BackgroundEmptyTrash())
options.sst_file_manager->SetDeleteRateBytesPerSecond(1);
}
ASSERT_OK(Put("Key2", DummyString(1024, v)));
ASSERT_OK(Put("Key3", DummyString(1024, v)));
ASSERT_OK(Put("Key4", DummyString(1024, v)));
ASSERT_OK(Put("Key1", DummyString(1024, v)));
ASSERT_OK(Put("Key4", DummyString(1024, v)));
ASSERT_OK(Flush());
}
// We created 4 sst files in L0
ASSERT_EQ("4", FilesPerLevel(0));
// Close the DB and drop this test's reference to the SstFileManager before
// inspecting the WAL directory.
Close();
options.sst_file_manager.reset();
std::vector<std::string> filenames;
int trash_log_count = 0;
if (!wal_dir_same_as_dbname_) {
// Forcibly create some trash log files
std::unique_ptr<WritableFile> result;
ASSERT_OK(env->NewWritableFile(options.wal_dir + "/1000.log.trash", &result,
EnvOptions()));
result.reset();
}
// Precondition: at least one trash log file exists before reopening.
ASSERT_OK(env->GetChildren(options.wal_dir, &filenames));
for (const std::string& fname : filenames) {
if (fname.find(".log.trash") != std::string::npos) {
trash_log_count++;
}
}
ASSERT_GE(trash_log_count, 1);
// Let deletions through again; reopening should clean up all trash logs.
env->set_fake_log_delete(false);
Reopen(options);
filenames.clear();
trash_log_count = 0;
ASSERT_OK(env->GetChildren(options.wal_dir, &filenames));
for (const std::string& fname : filenames) {
if (fname.find(".log.trash") != std::string::npos) {
trash_log_count++;
}
}
ASSERT_EQ(trash_log_count, 0);
Close();
}
// Two configurations: WAL files kept in the DB directory itself (suffix ""),
// and WAL files in a separate dbname_ + "_wal_dir" directory.
INSTANTIATE_TEST_CASE_P(DBWALTestWithParam, DBWALTestWithParam,
::testing::Values(std::make_tuple("", true),
std::make_tuple("_wal_dir", false)));
  842. // Test param: max_trash_db_ratio for DeleteScheduler
  843. class DBObsoleteFileDeletionOnOpenTest
  844. : public DBSSTTest,
  845. public ::testing::WithParamInterface<double> {
  846. public:
  847. explicit DBObsoleteFileDeletionOnOpenTest()
  848. : DBSSTTest("db_sst_deletion_on_open_test") {}
  849. };
  850. TEST_P(DBObsoleteFileDeletionOnOpenTest, Basic) {
  851. Options options = CurrentOptions();
  852. options.sst_file_manager.reset(
  853. NewSstFileManager(env_, nullptr, "", 1024 * 1024 /* 1 MB/sec */));
  854. auto sfm = static_cast<SstFileManagerImpl*>(options.sst_file_manager.get());
  855. sfm->SetDeleteRateBytesPerSecond(1024 * 1024);
  856. double max_trash_db_ratio = GetParam();
  857. sfm->delete_scheduler()->SetMaxTrashDBRatio(max_trash_db_ratio);
  858. int bg_delete_file = 0;
  859. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  860. "DeleteScheduler::DeleteTrashFile:DeleteFile",
  861. [&](void* /*arg*/) { bg_delete_file++; });
  862. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  863. Destroy(last_options_);
  864. // Add some trash files to the db directory so the DB can clean them up
  865. ASSERT_OK(env_->CreateDirIfMissing(dbname_));
  866. ASSERT_OK(
  867. WriteStringToFile(env_, "abc", dbname_ + "/" + "001.sst.trash", false));
  868. ASSERT_OK(
  869. WriteStringToFile(env_, "abc", dbname_ + "/" + "002.sst.trash", false));
  870. ASSERT_OK(
  871. WriteStringToFile(env_, "abc", dbname_ + "/" + "003.sst.trash", false));
  872. // Manually add an obsolete sst file. Obsolete SST files are discovered and
  873. // deleted upon recovery.
  874. uint64_t sst_file_number = 100;
  875. const std::string kObsoleteSstFileOne =
  876. MakeTableFileName(dbname_, sst_file_number);
  877. ASSERT_OK(WriteStringToFile(env_, "abc", kObsoleteSstFileOne, false));
  878. // The slow deletion on recovery had a bug before where a file's size is not
  879. // first tracked in `total_size_` in SstFileManager before passed to
  880. // DeleteScheduler. The first obsolete file is still slow deleted because
  881. // 0 (total_trash_size_) > 0 (total_size_) * 1000 (max_trash_db_ratio)
  882. // is always false.
  883. // Here we explicitly create a second obsolete file to verify this bug's fix
  884. const std::string kObsoleteSstFileTwo =
  885. MakeTableFileName(dbname_, sst_file_number - 1);
  886. ASSERT_OK(WriteStringToFile(env_, "abc", kObsoleteSstFileTwo, false));
  887. // Reopen the DB and verify that it deletes existing trash files and obsolete
  888. // SST files with rate limiting.
  889. Reopen(options);
  890. sfm->WaitForEmptyTrash();
  891. ASSERT_NOK(env_->FileExists(dbname_ + "/" + "001.sst.trash"));
  892. ASSERT_NOK(env_->FileExists(dbname_ + "/" + "002.sst.trash"));
  893. ASSERT_NOK(env_->FileExists(dbname_ + "/" + "003.sst.trash"));
  894. ASSERT_NOK(env_->FileExists(kObsoleteSstFileOne));
  895. ASSERT_NOK(env_->FileExists(kObsoleteSstFileTwo));
  896. // The files in the DB's directory are all either trash or obsolete sst files.
  897. // So the trash/db ratio is 1. A ratio equal to or higher than 1 should
  898. // schedule all files' deletion in background. A ratio lower than 1 may
  899. // send some files to be deleted immediately.
  900. if (max_trash_db_ratio < 1) {
  901. ASSERT_LE(bg_delete_file, 5);
  902. } else {
  903. ASSERT_EQ(bg_delete_file, 5);
  904. }
  905. ASSERT_EQ(sfm->GetTotalSize(), 0);
  906. ASSERT_EQ(sfm->delete_scheduler()->GetTotalTrashSize(), 0);
  907. }
  908. INSTANTIATE_TEST_CASE_P(DBObsoleteFileDeletionOnOpenTest,
  909. DBObsoleteFileDeletionOnOpenTest,
  910. ::testing::Values(0, 0.5, 1, 1.2));
  911. // Create a DB with 2 db_paths, and generate multiple files in the 2
  912. // db_paths using CompactRangeOptions, make sure that files that were
  913. // deleted from first db_path were deleted using DeleteScheduler and
  914. // files in the second path were not.
  915. TEST_F(DBSSTTest, DeleteSchedulerMultipleDBPaths) {
  916. std::atomic<int> bg_delete_file(0);
  917. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  918. "DeleteScheduler::DeleteTrashFile:DeleteFile",
  919. [&](void* /*arg*/) { bg_delete_file++; });
  920. // The deletion scheduler sometimes skips marking file as trash according to
  921. // a heuristic. In that case the deletion will go through the below SyncPoint.
  922. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  923. "DeleteScheduler::DeleteFile", [&](void* /*arg*/) { bg_delete_file++; });
  924. Options options = CurrentOptions();
  925. options.disable_auto_compactions = true;
  926. options.db_paths.emplace_back(dbname_, 1024 * 100);
  927. options.db_paths.emplace_back(dbname_ + "_2", 1024 * 100);
  928. options.env = env_;
  929. int64_t rate_bytes_per_sec = 1024 * 1024; // 1 Mb / Sec
  930. Status s;
  931. options.sst_file_manager.reset(
  932. NewSstFileManager(env_, nullptr, "", rate_bytes_per_sec, false, &s,
  933. /* max_trash_db_ratio= */ 1.1));
  934. ASSERT_OK(s);
  935. auto sfm = static_cast<SstFileManagerImpl*>(options.sst_file_manager.get());
  936. DestroyAndReopen(options);
  937. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  938. WriteOptions wo;
  939. wo.disableWAL = true;
  940. // Create 4 files in L0
  941. for (int i = 0; i < 4; i++) {
  942. ASSERT_OK(Put("Key" + std::to_string(i), DummyString(1024, 'A'), wo));
  943. ASSERT_OK(Flush());
  944. }
  945. // We created 4 sst files in L0
  946. ASSERT_EQ("4", FilesPerLevel(0));
  947. // Compaction will delete files from L0 in first db path and generate a new
  948. // file in L1 in second db path
  949. CompactRangeOptions compact_options;
  950. compact_options.target_path_id = 1;
  951. Slice begin("Key0");
  952. Slice end("Key3");
  953. ASSERT_OK(db_->CompactRange(compact_options, &begin, &end));
  954. ASSERT_EQ("0,1", FilesPerLevel(0));
  955. // Create 4 files in L0
  956. for (int i = 4; i < 8; i++) {
  957. ASSERT_OK(Put("Key" + std::to_string(i), DummyString(1024, 'B'), wo));
  958. ASSERT_OK(Flush());
  959. }
  960. ASSERT_EQ("4,1", FilesPerLevel(0));
  961. // Compaction will delete files from L0 in first db path and generate a new
  962. // file in L1 in second db path
  963. begin = "Key4";
  964. end = "Key7";
  965. ASSERT_OK(db_->CompactRange(compact_options, &begin, &end));
  966. ASSERT_EQ("0,2", FilesPerLevel(0));
  967. sfm->WaitForEmptyTrash();
  968. ASSERT_EQ(bg_delete_file, 8);
  969. // Compaction will delete both files and regenerate a file in L1 in second
  970. // db path. The deleted files should still be cleaned up via delete scheduler.
  971. compact_options.bottommost_level_compaction =
  972. BottommostLevelCompaction::kForceOptimized;
  973. ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
  974. ASSERT_EQ("0,1", FilesPerLevel(0));
  975. sfm->WaitForEmptyTrash();
  976. ASSERT_EQ(bg_delete_file, 10);
  977. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  978. }
  979. TEST_F(DBSSTTest, DestroyDBWithRateLimitedDelete) {
  980. int bg_delete_file = 0;
  981. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  982. "DeleteScheduler::DeleteTrashFile:DeleteFile",
  983. [&](void* /*arg*/) { bg_delete_file++; });
  984. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  985. Status s;
  986. Options options = CurrentOptions();
  987. options.disable_auto_compactions = true;
  988. options.env = env_;
  989. options.sst_file_manager.reset(
  990. NewSstFileManager(env_, nullptr, "", 0, false, &s, 0));
  991. ASSERT_OK(s);
  992. DestroyAndReopen(options);
  993. // Create 4 files in L0
  994. for (int i = 0; i < 4; i++) {
  995. ASSERT_OK(Put("Key" + std::to_string(i), DummyString(1024, 'A')));
  996. ASSERT_OK(Flush());
  997. }
  998. // We created 4 sst files in L0
  999. ASSERT_EQ("4", FilesPerLevel(0));
  1000. // Close DB and destroy it using DeleteScheduler
  1001. Close();
  1002. int num_sst_files = 0;
  1003. int num_wal_files = 0;
  1004. std::vector<std::string> db_files;
  1005. ASSERT_OK(env_->GetChildren(dbname_, &db_files));
  1006. for (const std::string& f : db_files) {
  1007. if (f.substr(f.find_last_of('.') + 1) == "sst") {
  1008. num_sst_files++;
  1009. } else if (f.substr(f.find_last_of('.') + 1) == "log") {
  1010. num_wal_files++;
  1011. }
  1012. }
  1013. ASSERT_GT(num_sst_files, 0);
  1014. ASSERT_GT(num_wal_files, 0);
  1015. auto sfm = static_cast<SstFileManagerImpl*>(options.sst_file_manager.get());
  1016. sfm->SetDeleteRateBytesPerSecond(1024 * 1024);
  1017. // Set an extra high trash ratio to prevent immediate/non-rate limited
  1018. // deletions
  1019. sfm->delete_scheduler()->SetMaxTrashDBRatio(1000.0);
  1020. ASSERT_OK(DestroyDB(dbname_, options));
  1021. sfm->WaitForEmptyTrash();
  1022. ASSERT_EQ(bg_delete_file, num_sst_files + num_wal_files);
  1023. }
  1024. TEST_F(DBSSTTest, DBWithMaxSpaceAllowed) {
  1025. std::shared_ptr<SstFileManager> sst_file_manager(NewSstFileManager(env_));
  1026. auto sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get());
  1027. Options options = CurrentOptions();
  1028. options.sst_file_manager = sst_file_manager;
  1029. options.disable_auto_compactions = true;
  1030. DestroyAndReopen(options);
  1031. Random rnd(301);
  1032. // Generate a file containing 100 keys.
  1033. for (int i = 0; i < 100; i++) {
  1034. ASSERT_OK(Put(Key(i), rnd.RandomString(50)));
  1035. }
  1036. ASSERT_OK(Flush());
  1037. uint64_t first_file_size = 0;
  1038. std::unordered_map<std::string, uint64_t> files_in_db;
  1039. ASSERT_OK(GetAllDataFiles(kTableFile, &files_in_db, &first_file_size));
  1040. ASSERT_EQ(sfm->GetTotalSize(), first_file_size);
  1041. // Set the maximum allowed space usage to the current total size
  1042. sfm->SetMaxAllowedSpaceUsage(first_file_size + 1);
  1043. ASSERT_OK(Put("key1", "val1"));
  1044. // This flush will cause bg_error_ and will fail
  1045. ASSERT_NOK(Flush());
  1046. }
// Like DBWithMaxSpaceAllowed, but with blob files enabled. Blob files must
// count toward the SstFileManager's space limit. A flush that exceeds the
// limit must fail, and the sync points below must observe both the
// max-allowed-space check on the blob file and BuildTable's AfterDeleteFile
// step.
TEST_F(DBSSTTest, DBWithMaxSpaceAllowedWithBlobFiles) {
std::shared_ptr<SstFileManager> sst_file_manager(NewSstFileManager(env_));
auto sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get());
Options options = CurrentOptions();
options.sst_file_manager = sst_file_manager;
options.disable_auto_compactions = true;
options.enable_blob_files = true;
DestroyAndReopen(options);
Random rnd(301);
// Generate a file containing keys.
for (int i = 0; i < 10; i++) {
ASSERT_OK(Put(Key(i), rnd.RandomString(50)));
}
ASSERT_OK(Flush());
uint64_t files_size = 0;
uint64_t total_files_size = 0;
std::unordered_map<std::string, uint64_t> files_in_db;
ASSERT_OK(GetAllDataFiles(kBlobFile, &files_in_db, &files_size));
// Make sure blob files are counted by the SstFileManager toward size limits.
ASSERT_GT(files_size, 0);
total_files_size = files_size;
ASSERT_OK(GetAllDataFiles(kTableFile, &files_in_db, &files_size));
total_files_size += files_size;
// The manager's tracked size must equal blob bytes plus SST bytes.
ASSERT_EQ(sfm->GetTotalSize(), total_files_size);
// Set the maximum allowed space usage to just above the current total size.
sfm->SetMaxAllowedSpaceUsage(total_files_size + 1);
bool max_allowed_space_reached = false;
bool delete_blob_file = false;
// Sync point called after blob file is closed and max allowed space is
// checked.
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"BlobFileCompletionCallback::CallBack::MaxAllowedSpaceReached",
[&](void* /*arg*/) { max_allowed_space_reached = true; });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"BuildTable::AfterDeleteFile",
[&](void* /*arg*/) { delete_blob_file = true; });
// The test's ":1" sync point below cannot proceed until
// BuildTable::AfterDeleteFile has been reached, so delete_blob_file is
// checked only after that point.
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
{
"BuildTable::AfterDeleteFile",
"DBSSTTest::DBWithMaxSpaceAllowedWithBlobFiles:1",
},
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
ASSERT_OK(Put("key1", "val1"));
// This flush will fail
ASSERT_NOK(Flush());
ASSERT_TRUE(max_allowed_space_reached);
TEST_SYNC_POINT("DBSSTTest::DBWithMaxSpaceAllowedWithBlobFiles:1");
ASSERT_TRUE(delete_blob_file);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
// An automatic compaction that would exceed the SstFileManager's space cap is
// cancelled, which bumps COMPACTION_CANCELLED. The CancelledCompaction callback
// then lifts the cap so the compaction can still run, and no space may stay
// reserved afterwards.
TEST_F(DBSSTTest, CancellingCompactionsWorks) {
std::shared_ptr<SstFileManager> sst_file_manager(NewSstFileManager(env_));
auto sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get());
Options options = CurrentOptions();
options.sst_file_manager = sst_file_manager;
// Two L0 files are enough to trigger an automatic compaction.
options.level0_file_num_compaction_trigger = 2;
options.statistics = CreateDBStatistics();
DestroyAndReopen(options);
int completed_compactions = 0;
// When a compaction is cancelled: reset the cap to 0 (this test relies on
// that letting the compaction run) and check nothing is reserved.
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundCompaction():CancelledCompaction", [&](void* /*arg*/) {
sfm->SetMaxAllowedSpaceUsage(0);
ASSERT_EQ(sfm->GetCompactionsReservedSize(), 0);
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundCompaction:NonTrivial:AfterRun",
[&](void* /*arg*/) { completed_compactions++; });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
Random rnd(301);
// Generate a file containing 10 keys.
for (int i = 0; i < 10; i++) {
ASSERT_OK(Put(Key(i), rnd.RandomString(50)));
}
ASSERT_OK(Flush());
uint64_t total_file_size = 0;
std::unordered_map<std::string, uint64_t> files_in_db;
ASSERT_OK(GetAllDataFiles(kTableFile, &files_in_db, &total_file_size));
// Cap space usage at just over twice the size of the first file.
sfm->SetMaxAllowedSpaceUsage(2 * total_file_size + 1);
// Generate another file to trigger compaction.
for (int i = 0; i < 10; i++) {
ASSERT_OK(Put(Key(i), rnd.RandomString(50)));
}
ASSERT_OK(Flush());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
// Because we set a callback in CancelledCompaction, we actually
// let the compaction run
ASSERT_GT(completed_compactions, 0);
ASSERT_EQ(sfm->GetCompactionsReservedSize(), 0);
// Make sure the stat is bumped
ASSERT_GT(dbfull()->immutable_db_options().statistics.get()->getTickerCount(
COMPACTION_CANCELLED),
0);
// Obsolete files are expected to be deleted immediately, never marked as
// trash, in this configuration.
ASSERT_EQ(0,
dbfull()->immutable_db_options().statistics.get()->getTickerCount(
FILES_MARKED_TRASH));
ASSERT_EQ(4,
dbfull()->immutable_db_options().statistics.get()->getTickerCount(
FILES_DELETED_IMMEDIATELY));
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
  1149. TEST_F(DBSSTTest, CancellingManualCompactionsWorks) {
  1150. std::shared_ptr<SstFileManager> sst_file_manager(NewSstFileManager(env_));
  1151. auto sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get());
  1152. Options options = CurrentOptions();
  1153. options.sst_file_manager = sst_file_manager;
  1154. options.statistics = CreateDBStatistics();
  1155. FlushedFileCollector* collector = new FlushedFileCollector();
  1156. options.listeners.emplace_back(collector);
  1157. DestroyAndReopen(options);
  1158. Random rnd(301);
  1159. // Generate a file containing 10 keys.
  1160. for (int i = 0; i < 10; i++) {
  1161. ASSERT_OK(Put(Key(i), rnd.RandomString(50)));
  1162. }
  1163. ASSERT_OK(Flush());
  1164. uint64_t total_file_size = 0;
  1165. std::unordered_map<std::string, uint64_t> files_in_db;
  1166. ASSERT_OK(GetAllDataFiles(kTableFile, &files_in_db, &total_file_size));
  1167. // Set the maximum allowed space usage to the current total size
  1168. sfm->SetMaxAllowedSpaceUsage(2 * total_file_size + 1);
  1169. // Generate another file to trigger compaction.
  1170. for (int i = 0; i < 10; i++) {
  1171. ASSERT_OK(Put(Key(i), rnd.RandomString(50)));
  1172. }
  1173. ASSERT_OK(Flush());
  1174. // OK, now trigger a manual compaction
  1175. ASSERT_TRUE(dbfull()
  1176. ->CompactRange(CompactRangeOptions(), nullptr, nullptr)
  1177. .IsCompactionTooLarge());
  1178. // Wait for manual compaction to get scheduled and finish
  1179. ASSERT_OK(dbfull()->TEST_WaitForCompact());
  1180. ASSERT_EQ(sfm->GetCompactionsReservedSize(), 0);
  1181. // Make sure the stat is bumped
  1182. ASSERT_EQ(dbfull()->immutable_db_options().statistics.get()->getTickerCount(
  1183. COMPACTION_CANCELLED),
  1184. 1);
  1185. // Now make sure CompactFiles also gets cancelled
  1186. auto l0_files = collector->GetFlushedFiles();
  1187. ASSERT_TRUE(
  1188. dbfull()
  1189. ->CompactFiles(ROCKSDB_NAMESPACE::CompactionOptions(), l0_files, 0)
  1190. .IsCompactionTooLarge());
  1191. // Wait for manual compaction to get scheduled and finish
  1192. ASSERT_OK(dbfull()->TEST_WaitForCompact());
  1193. ASSERT_EQ(dbfull()->immutable_db_options().statistics.get()->getTickerCount(
  1194. COMPACTION_CANCELLED),
  1195. 2);
  1196. ASSERT_EQ(sfm->GetCompactionsReservedSize(), 0);
  1197. // Now let the flush through and make sure GetCompactionsReservedSize
  1198. // returns to normal
  1199. sfm->SetMaxAllowedSpaceUsage(0);
  1200. int completed_compactions = 0;
  1201. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  1202. "CompactFilesImpl:End", [&](void* /*arg*/) { completed_compactions++; });
  1203. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  1204. ASSERT_OK(dbfull()->CompactFiles(ROCKSDB_NAMESPACE::CompactionOptions(),
  1205. l0_files, 0));
  1206. ASSERT_OK(dbfull()->TEST_WaitForCompact());
  1207. ASSERT_EQ(sfm->GetCompactionsReservedSize(), 0);
  1208. ASSERT_GT(completed_compactions, 0);
  1209. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  1210. }
  1211. TEST_F(DBSSTTest, DBWithMaxSpaceAllowedRandomized) {
  1212. // This test will set a maximum allowed space for the DB, then it will
  1213. // keep filling the DB until the limit is reached and bg_error_ is set.
  1214. // When bg_error_ is set we will verify that the DB size is greater
  1215. // than the limit.
  1216. std::vector<int> max_space_limits_mbs = {1, 10};
  1217. std::atomic<bool> bg_error_set(false);
  1218. std::atomic<int> reached_max_space_on_flush(0);
  1219. std::atomic<int> reached_max_space_on_compaction(0);
  1220. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  1221. "DBImpl::FlushMemTableToOutputFile:MaxAllowedSpaceReached",
  1222. [&](void* arg) {
  1223. Status* bg_error = static_cast<Status*>(arg);
  1224. bg_error_set = true;
  1225. reached_max_space_on_flush++;
  1226. // clear error to ensure compaction callback is called
  1227. *bg_error = Status::OK();
  1228. });
  1229. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  1230. "DBImpl::BackgroundCompaction():CancelledCompaction", [&](void* arg) {
  1231. bool* enough_room = static_cast<bool*>(arg);
  1232. *enough_room = true;
  1233. });
  1234. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  1235. "CompactionJob::FinishCompactionOutputFile:MaxAllowedSpaceReached",
  1236. [&](void* /*arg*/) {
  1237. bg_error_set = true;
  1238. reached_max_space_on_compaction++;
  1239. });
  1240. for (auto limit_mb : max_space_limits_mbs) {
  1241. bg_error_set = false;
  1242. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearTrace();
  1243. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  1244. std::shared_ptr<SstFileManager> sst_file_manager(NewSstFileManager(env_));
  1245. auto sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get());
  1246. Options options = CurrentOptions();
  1247. options.sst_file_manager = sst_file_manager;
  1248. options.write_buffer_size = 1024 * 512; // 512 Kb
  1249. DestroyAndReopen(options);
  1250. Random rnd(301);
  1251. sfm->SetMaxAllowedSpaceUsage(limit_mb * 1024 * 1024);
  1252. // It is easy to detect if the test is stuck in a loop. No need for
  1253. // complex termination logic.
  1254. while (true) {
  1255. auto s = Put(rnd.RandomString(10), rnd.RandomString(50));
  1256. if (!s.ok()) {
  1257. break;
  1258. }
  1259. }
  1260. ASSERT_TRUE(bg_error_set);
  1261. uint64_t total_sst_files_size = 0;
  1262. std::unordered_map<std::string, uint64_t> files_in_db;
  1263. ASSERT_OK(GetAllDataFiles(kTableFile, &files_in_db, &total_sst_files_size));
  1264. ASSERT_GE(total_sst_files_size, limit_mb * 1024 * 1024);
  1265. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  1266. }
  1267. ASSERT_GT(reached_max_space_on_flush, 0);
  1268. ASSERT_GT(reached_max_space_on_compaction, 0);
  1269. }
  1270. TEST_F(DBSSTTest, OpenDBWithInfiniteMaxOpenFiles) {
  1271. // Open DB with infinite max open files
  1272. // - First iteration use 1 thread to open files
  1273. // - Second iteration use 5 threads to open files
  1274. for (int iter = 0; iter < 2; iter++) {
  1275. Options options;
  1276. options.create_if_missing = true;
  1277. options.write_buffer_size = 100000;
  1278. options.disable_auto_compactions = true;
  1279. options.max_open_files = -1;
  1280. if (iter == 0) {
  1281. options.max_file_opening_threads = 1;
  1282. } else {
  1283. options.max_file_opening_threads = 5;
  1284. }
  1285. options = CurrentOptions(options);
  1286. DestroyAndReopen(options);
  1287. // Create 12 Files in L0 (then move then to L2)
  1288. for (int i = 0; i < 12; i++) {
  1289. std::string k = "L2_" + Key(i);
  1290. ASSERT_OK(Put(k, k + std::string(1000, 'a')));
  1291. ASSERT_OK(Flush());
  1292. }
  1293. CompactRangeOptions compact_options;
  1294. compact_options.change_level = true;
  1295. compact_options.target_level = 2;
  1296. ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
  1297. // Create 12 Files in L0
  1298. for (int i = 0; i < 12; i++) {
  1299. std::string k = "L0_" + Key(i);
  1300. ASSERT_OK(Put(k, k + std::string(1000, 'a')));
  1301. ASSERT_OK(Flush());
  1302. }
  1303. Close();
  1304. // Reopening the DB will load all existing files
  1305. Reopen(options);
  1306. ASSERT_EQ("12,0,12", FilesPerLevel(0));
  1307. std::vector<std::vector<FileMetaData>> files;
  1308. dbfull()->TEST_GetFilesMetaData(db_->DefaultColumnFamily(), &files);
  1309. for (const auto& level : files) {
  1310. for (const auto& file : level) {
  1311. ASSERT_TRUE(file.table_reader_handle != nullptr);
  1312. }
  1313. }
  1314. for (int i = 0; i < 12; i++) {
  1315. ASSERT_EQ(Get("L0_" + Key(i)), "L0_" + Key(i) + std::string(1000, 'a'));
  1316. ASSERT_EQ(Get("L2_" + Key(i)), "L2_" + Key(i) + std::string(1000, 'a'));
  1317. }
  1318. }
  1319. }
// With max_open_files = -1, the reopen loads every table reader. When table
// reader memory is charged to a tiny (1024-byte, strict-capacity) block cache,
// the reopen must fail with a MemoryLimit status naming the
// BlockBasedTableReader role. When charging is disabled, the reopen must
// succeed.
TEST_F(DBSSTTest, OpenDBWithInfiniteMaxOpenFilesSubjectToMemoryLimit) {
for (CacheEntryRoleOptions::Decision charge_table_reader :
{CacheEntryRoleOptions::Decision::kEnabled,
CacheEntryRoleOptions::Decision::kDisabled}) {
// Open DB with infinite max open files
// - First iteration use 1 thread to open files
// - Second iteration use 5 threads to open files
for (int iter = 0; iter < 2; iter++) {
Options options;
options.create_if_missing = true;
options.write_buffer_size = 100000;
options.disable_auto_compactions = true;
options.max_open_files = -1;
BlockBasedTableOptions table_options;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
if (iter == 0) {
options.max_file_opening_threads = 1;
} else {
options.max_file_opening_threads = 5;
}
DestroyAndReopen(options);
// Create 5 Files in L0 (then move them to L2)
for (int i = 0; i < 5; i++) {
std::string k = "L2_" + Key(i);
ASSERT_OK(Put(k, k + std::string(1000, 'a')));
ASSERT_OK(Flush()) << i;
}
CompactRangeOptions compact_options;
compact_options.change_level = true;
compact_options.target_level = 2;
ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
// Create 5 Files in L0
for (int i = 0; i < 5; i++) {
std::string k = "L0_" + Key(i);
ASSERT_OK(Put(k, k + std::string(1000, 'a')));
ASSERT_OK(Flush());
}
Close();
// Only now, with the DB closed, switch the table factory to a tiny
// strict-capacity cache and apply this pass's table-reader charging
// decision.
table_options.cache_usage_options.options_overrides.insert(
{CacheEntryRole::kBlockBasedTableReader,
{/*.charged = */ charge_table_reader}});
table_options.block_cache =
NewLRUCache(1024 /* capacity */, 0 /* num_shard_bits */,
true /* strict_capacity_limit */);
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
// Reopening the DB will try to load all existing files, conditionally
// subject to memory limit
Status s = TryReopen(options);
if (charge_table_reader == CacheEntryRoleOptions::Decision::kEnabled) {
EXPECT_TRUE(s.IsMemoryLimit());
EXPECT_TRUE(s.ToString().find(
kCacheEntryRoleToCamelString[static_cast<std::uint32_t>(
CacheEntryRole::kBlockBasedTableReader)]) !=
std::string::npos);
EXPECT_TRUE(s.ToString().find("memory limit based on cache capacity") !=
std::string::npos);
} else {
EXPECT_TRUE(s.ok());
ASSERT_EQ("5,0,5", FilesPerLevel(0));
}
}
}
}
  1383. TEST_F(DBSSTTest, GetTotalSstFilesSize) {
  1384. // FIXME: L0 file and L1+ file also differ in size of `oldest_key_time`.
  1385. // L0 file has non-zero `oldest_key_time` while L1+ files have 0.
  1386. // The test passes since L1+ file uses current time instead of 0
  1387. // as oldest_ancestor_time.
  1388. //
  1389. // We don't propagate oldest-key-time table property on compaction and
  1390. // just write 0 as default value. This affect the exact table size, since
  1391. // we encode table properties as varint64. Force time to be 0 to work around
  1392. // it. Should remove the workaround after we propagate the property on
  1393. // compaction.
  1394. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  1395. "FlushJob::WriteLevel0Table:oldest_ancester_time", [&](void* arg) {
  1396. uint64_t* current_time = static_cast<uint64_t*>(arg);
  1397. *current_time = 0;
  1398. });
  1399. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  1400. Options options = CurrentOptions();
  1401. options.disable_auto_compactions = true;
  1402. options.compression = kNoCompression;
  1403. DestroyAndReopen(options);
  1404. // Generate 5 files in L0
  1405. for (int i = 0; i < 5; i++) {
  1406. for (int j = 0; j < 10; j++) {
  1407. std::string val = "val_file_" + std::to_string(i);
  1408. ASSERT_OK(Put(Key(j), val));
  1409. }
  1410. ASSERT_OK(Flush());
  1411. }
  1412. ASSERT_EQ("5", FilesPerLevel(0));
  1413. std::vector<LiveFileMetaData> live_files_meta;
  1414. dbfull()->GetLiveFilesMetaData(&live_files_meta);
  1415. ASSERT_EQ(live_files_meta.size(), 5);
  1416. uint64_t single_file_size = live_files_meta[0].size;
  1417. uint64_t live_sst_files_size = 0;
  1418. uint64_t total_sst_files_size = 0;
  1419. for (const auto& file_meta : live_files_meta) {
  1420. live_sst_files_size += file_meta.size;
  1421. }
  1422. ASSERT_TRUE(dbfull()->GetIntProperty("rocksdb.total-sst-files-size",
  1423. &total_sst_files_size));
  1424. // Live SST files = 5
  1425. // Total SST files = 5
  1426. ASSERT_EQ(live_sst_files_size, 5 * single_file_size);
  1427. ASSERT_EQ(total_sst_files_size, 5 * single_file_size);
  1428. // hold current version
  1429. std::unique_ptr<Iterator> iter1(dbfull()->NewIterator(ReadOptions()));
  1430. ASSERT_OK(iter1->status());
  1431. // Compact 5 files into 1 file in L0
  1432. ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  1433. ASSERT_EQ("0,1", FilesPerLevel(0));
  1434. live_files_meta.clear();
  1435. dbfull()->GetLiveFilesMetaData(&live_files_meta);
  1436. ASSERT_EQ(live_files_meta.size(), 1);
  1437. live_sst_files_size = 0;
  1438. total_sst_files_size = 0;
  1439. for (const auto& file_meta : live_files_meta) {
  1440. live_sst_files_size += file_meta.size;
  1441. }
  1442. ASSERT_TRUE(dbfull()->GetIntProperty("rocksdb.total-sst-files-size",
  1443. &total_sst_files_size));
  1444. // Live SST files = 1 (compacted file)
  1445. // Total SST files = 6 (5 original files + compacted file)
  1446. ASSERT_EQ(live_sst_files_size, 1 * single_file_size);
  1447. ASSERT_EQ(total_sst_files_size, 6 * single_file_size);
  1448. // hold current version
  1449. std::unique_ptr<Iterator> iter2(dbfull()->NewIterator(ReadOptions()));
  1450. ASSERT_OK(iter2->status());
  1451. // Delete all keys and compact, this will delete all live files
  1452. for (int i = 0; i < 10; i++) {
  1453. ASSERT_OK(Delete(Key(i)));
  1454. }
  1455. ASSERT_OK(Flush());
  1456. ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  1457. ASSERT_EQ("", FilesPerLevel(0));
  1458. live_files_meta.clear();
  1459. dbfull()->GetLiveFilesMetaData(&live_files_meta);
  1460. ASSERT_EQ(live_files_meta.size(), 0);
  1461. ASSERT_TRUE(dbfull()->GetIntProperty("rocksdb.total-sst-files-size",
  1462. &total_sst_files_size));
  1463. // Live SST files = 0
  1464. // Total SST files = 6 (5 original files + compacted file)
  1465. ASSERT_EQ(total_sst_files_size, 6 * single_file_size);
  1466. ASSERT_OK(iter1->status());
  1467. iter1.reset();
  1468. ASSERT_TRUE(dbfull()->GetIntProperty("rocksdb.total-sst-files-size",
  1469. &total_sst_files_size));
  1470. // Live SST files = 0
  1471. // Total SST files = 1 (compacted file)
  1472. ASSERT_EQ(total_sst_files_size, 1 * single_file_size);
  1473. ASSERT_OK(iter2->status());
  1474. iter2.reset();
  1475. ASSERT_TRUE(dbfull()->GetIntProperty("rocksdb.total-sst-files-size",
  1476. &total_sst_files_size));
  1477. // Live SST files = 0
  1478. // Total SST files = 0
  1479. ASSERT_EQ(total_sst_files_size, 0);
  1480. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  1481. }
  1482. TEST_F(DBSSTTest, GetTotalSstFilesSizeVersionsFilesShared) {
  1483. Options options = CurrentOptions();
  1484. options.disable_auto_compactions = true;
  1485. options.compression = kNoCompression;
  1486. DestroyAndReopen(options);
  1487. // Generate 5 files in L0
  1488. for (int i = 0; i < 5; i++) {
  1489. ASSERT_OK(Put(Key(i), "val"));
  1490. ASSERT_OK(Flush());
  1491. }
  1492. ASSERT_EQ("5", FilesPerLevel(0));
  1493. std::vector<LiveFileMetaData> live_files_meta;
  1494. dbfull()->GetLiveFilesMetaData(&live_files_meta);
  1495. ASSERT_EQ(live_files_meta.size(), 5);
  1496. uint64_t single_file_size = live_files_meta[0].size;
  1497. uint64_t live_sst_files_size = 0;
  1498. uint64_t total_sst_files_size = 0;
  1499. for (const auto& file_meta : live_files_meta) {
  1500. live_sst_files_size += file_meta.size;
  1501. }
  1502. ASSERT_TRUE(dbfull()->GetIntProperty("rocksdb.total-sst-files-size",
  1503. &total_sst_files_size));
  1504. // Live SST files = 5
  1505. // Total SST files = 5
  1506. ASSERT_EQ(live_sst_files_size, 5 * single_file_size);
  1507. ASSERT_EQ(total_sst_files_size, 5 * single_file_size);
  1508. // hold current version
  1509. std::unique_ptr<Iterator> iter1(dbfull()->NewIterator(ReadOptions()));
  1510. ASSERT_OK(iter1->status());
  1511. // Compaction will do trivial move from L0 to L1
  1512. ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  1513. ASSERT_EQ("0,5", FilesPerLevel(0));
  1514. live_files_meta.clear();
  1515. dbfull()->GetLiveFilesMetaData(&live_files_meta);
  1516. ASSERT_EQ(live_files_meta.size(), 5);
  1517. live_sst_files_size = 0;
  1518. total_sst_files_size = 0;
  1519. for (const auto& file_meta : live_files_meta) {
  1520. live_sst_files_size += file_meta.size;
  1521. }
  1522. ASSERT_TRUE(dbfull()->GetIntProperty("rocksdb.total-sst-files-size",
  1523. &total_sst_files_size));
  1524. // Live SST files = 5
  1525. // Total SST files = 5 (used in 2 version)
  1526. ASSERT_EQ(live_sst_files_size, 5 * single_file_size);
  1527. ASSERT_EQ(total_sst_files_size, 5 * single_file_size);
  1528. // hold current version
  1529. std::unique_ptr<Iterator> iter2(dbfull()->NewIterator(ReadOptions()));
  1530. ASSERT_OK(iter2->status());
  1531. // Delete all keys and compact, this will delete all live files
  1532. for (int i = 0; i < 5; i++) {
  1533. ASSERT_OK(Delete(Key(i)));
  1534. }
  1535. ASSERT_OK(Flush());
  1536. ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  1537. ASSERT_EQ("", FilesPerLevel(0));
  1538. live_files_meta.clear();
  1539. dbfull()->GetLiveFilesMetaData(&live_files_meta);
  1540. ASSERT_EQ(live_files_meta.size(), 0);
  1541. ASSERT_TRUE(dbfull()->GetIntProperty("rocksdb.total-sst-files-size",
  1542. &total_sst_files_size));
  1543. // Live SST files = 0
  1544. // Total SST files = 5 (used in 2 version)
  1545. ASSERT_EQ(total_sst_files_size, 5 * single_file_size);
  1546. ASSERT_OK(iter1->status());
  1547. iter1.reset();
  1548. ASSERT_OK(iter2->status());
  1549. iter2.reset();
  1550. ASSERT_TRUE(dbfull()->GetIntProperty("rocksdb.total-sst-files-size",
  1551. &total_sst_files_size));
  1552. // Live SST files = 0
  1553. // Total SST files = 0
  1554. ASSERT_EQ(total_sst_files_size, 0);
  1555. }
  1556. // This test if blob files are recorded by SST File Manager when Compaction job
  1557. // creates/delete them and in case of AtomicFlush.
  1558. TEST_F(DBSSTTest, DBWithSFMForBlobFilesAtomicFlush) {
  1559. std::shared_ptr<SstFileManager> sst_file_manager(NewSstFileManager(env_));
  1560. auto sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get());
  1561. Options options = CurrentOptions();
  1562. options.sst_file_manager = sst_file_manager;
  1563. options.enable_blob_files = true;
  1564. options.min_blob_size = 0;
  1565. options.disable_auto_compactions = true;
  1566. options.enable_blob_garbage_collection = true;
  1567. options.blob_garbage_collection_age_cutoff = 0.5;
  1568. options.atomic_flush = true;
  1569. int files_added = 0;
  1570. int files_deleted = 0;
  1571. int files_scheduled_to_delete = 0;
  1572. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  1573. "SstFileManagerImpl::OnAddFile", [&](void* arg) {
  1574. const std::string* const file_path =
  1575. static_cast<const std::string*>(arg);
  1576. if (EndsWith(*file_path, ".blob")) {
  1577. files_added++;
  1578. }
  1579. });
  1580. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  1581. "SstFileManagerImpl::OnDeleteFile", [&](void* arg) {
  1582. const std::string* const file_path =
  1583. static_cast<const std::string*>(arg);
  1584. if (EndsWith(*file_path, ".blob")) {
  1585. files_deleted++;
  1586. }
  1587. });
  1588. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  1589. "SstFileManagerImpl::ScheduleFileDeletion", [&](void* arg) {
  1590. assert(arg);
  1591. const std::string* const file_path =
  1592. static_cast<const std::string*>(arg);
  1593. if (EndsWith(*file_path, ".blob")) {
  1594. ++files_scheduled_to_delete;
  1595. }
  1596. });
  1597. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  1598. DestroyAndReopen(options);
  1599. Random rnd(301);
  1600. ASSERT_OK(Put("key_1", "value_1"));
  1601. ASSERT_OK(Put("key_2", "value_2"));
  1602. ASSERT_OK(Put("key_3", "value_3"));
  1603. ASSERT_OK(Put("key_4", "value_4"));
  1604. ASSERT_OK(Flush());
  1605. // Overwrite will create the garbage data.
  1606. ASSERT_OK(Put("key_3", "new_value_3"));
  1607. ASSERT_OK(Put("key_4", "new_value_4"));
  1608. ASSERT_OK(Flush());
  1609. ASSERT_OK(Put("Key5", "blob_value5"));
  1610. ASSERT_OK(Put("Key6", "blob_value6"));
  1611. ASSERT_OK(Flush());
  1612. ASSERT_EQ(files_added, 3);
  1613. ASSERT_EQ(files_deleted, 0);
  1614. ASSERT_EQ(files_scheduled_to_delete, 0);
  1615. files_added = 0;
  1616. constexpr Slice* begin = nullptr;
  1617. constexpr Slice* end = nullptr;
  1618. // Compaction job will create a new file and delete the older files.
  1619. ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end));
  1620. ASSERT_OK(dbfull()->TEST_WaitForCompact());
  1621. ASSERT_EQ(files_added, 1);
  1622. ASSERT_EQ(files_scheduled_to_delete, 1);
  1623. sfm->WaitForEmptyTrash();
  1624. ASSERT_EQ(files_deleted, 1);
  1625. Close();
  1626. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  1627. "SstFileManagerImpl::ScheduleUnaccountedFileDeletion", [&](void* arg) {
  1628. assert(arg);
  1629. const std::string* const file_path =
  1630. static_cast<const std::string*>(arg);
  1631. if (EndsWith(*file_path, ".blob")) {
  1632. ++files_scheduled_to_delete;
  1633. }
  1634. });
  1635. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  1636. "DeleteScheduler::OnDeleteFile", [&](void* arg) {
  1637. const std::string* const file_path =
  1638. static_cast<const std::string*>(arg);
  1639. if (EndsWith(*file_path, ".blob")) {
  1640. files_deleted++;
  1641. }
  1642. });
  1643. ASSERT_OK(DestroyDB(dbname_, options));
  1644. ASSERT_EQ(files_scheduled_to_delete, 4);
  1645. sfm->WaitForEmptyTrash();
  1646. ASSERT_EQ(files_deleted, 4);
  1647. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  1648. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
  1649. }
  1650. TEST_F(DBSSTTest, SstGetFileSizeFails) {
  1651. // Build an SST file
  1652. ASSERT_OK(Put("x", "zaphod"));
  1653. ASSERT_OK(Flush());
  1654. std::vector<LiveFileMetaData> metadata;
  1655. db_->GetLiveFilesMetaData(&metadata);
  1656. ASSERT_EQ(1U, metadata.size());
  1657. std::string filename = dbname_ + metadata[0].name;
  1658. // Prepare for fault injection
  1659. std::shared_ptr<FaultInjectionTestFS> fault_fs =
  1660. std::make_shared<FaultInjectionTestFS>(
  1661. CurrentOptions().env->GetFileSystem());
  1662. std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
  1663. Options options = CurrentOptions();
  1664. options.env = fault_fs_env.get();
  1665. options.paranoid_checks = false; // don't check file sizes on open
  1666. for (int i = 0; i < 4; i++) {
  1667. SCOPED_TRACE("Iteration = " + std::to_string(i));
  1668. fault_fs->SetFailRandomAccessGetFileSizeSst(false);
  1669. fault_fs->SetFailFilesystemGetFileSizeSst(false);
  1670. Close();
  1671. if (i == 1) {
  1672. // Just FSRandomAccessFile::GetFileSize fails, which should be worked
  1673. // around
  1674. fault_fs->SetFailRandomAccessGetFileSizeSst(true);
  1675. } else if (i == 2) {
  1676. // FileSystem::GetFileSize fails, which should be worked around if
  1677. // FSRandomAccessFile::GetFileSize is supported
  1678. fault_fs->SetFailFilesystemGetFileSizeSst(true);
  1679. } else if (i == 3) {
  1680. // Both GetFileSize APIs fail with an IOError
  1681. fault_fs->SetFailRandomAccessGetFileSizeSst(true);
  1682. fault_fs->SetFailFilesystemGetFileSizeSst(true);
  1683. }
  1684. ASSERT_OK(TryReopen(options));
  1685. std::string value;
  1686. Status get_status = db_->Get({}, "x", &value);
  1687. if (i < 2) {
  1688. ASSERT_OK(get_status);
  1689. } else if (i == 2) {
  1690. if (encrypted_env_) {
  1691. // Can't recover because RandomAccessFile::GetFileSize is not supported
  1692. // on EncryptedEnv
  1693. // Fail with propagated IOError. (Not Corruption nor NotSupported!)
  1694. ASSERT_EQ(get_status.code(), Status::Code::kIOError);
  1695. ASSERT_STREQ(get_status.getState(), "FileSystem::GetFileSize failed");
  1696. } else {
  1697. // Never sees the FileSystem::GetFileSize failure
  1698. ASSERT_OK(get_status);
  1699. }
  1700. } else {
  1701. ASSERT_EQ(i, 3);
  1702. // Fail with propagated IOError. (Not Corruption nor NotSupported!)
  1703. ASSERT_EQ(get_status.code(), Status::Code::kIOError);
  1704. ASSERT_STREQ(get_status.getState(), "FileSystem::GetFileSize failed");
  1705. }
  1706. }
  1707. Close();
  1708. }
  1709. } // namespace ROCKSDB_NAMESPACE
  1710. int main(int argc, char** argv) {
  1711. ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  1712. ::testing::InitGoogleTest(&argc, argv);
  1713. RegisterCustomObjects(argc, argv);
  1714. return RUN_ALL_TESTS();
  1715. }