obsolete_files_test.cc 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  9. #include <algorithm>
  10. #include <cstdlib>
  11. #include <map>
  12. #include <string>
  13. #include <vector>
  14. #include "db/db_impl/db_impl.h"
  15. #include "db/db_test_util.h"
  16. #include "db/version_set.h"
  17. #include "db/write_batch_internal.h"
  18. #include "file/filename.h"
  19. #include "port/stack_trace.h"
  20. #include "rocksdb/db.h"
  21. #include "rocksdb/env.h"
  22. #include "rocksdb/transaction_log.h"
  23. #include "test_util/sync_point.h"
  24. #include "test_util/testharness.h"
  25. #include "test_util/testutil.h"
  26. #include "util/string_util.h"
  27. namespace ROCKSDB_NAMESPACE {
  28. class ObsoleteFilesTest : public DBTestBase {
  29. public:
  30. ObsoleteFilesTest()
  31. : DBTestBase("obsolete_files_test", /*env_do_fsync=*/true),
  32. wal_dir_(dbname_ + "/wal_files") {}
  33. void AddKeys(int numkeys, int startkey) {
  34. WriteOptions options;
  35. options.sync = false;
  36. for (int i = startkey; i < (numkeys + startkey); i++) {
  37. std::string temp = std::to_string(i);
  38. Slice key(temp);
  39. Slice value(temp);
  40. ASSERT_OK(db_->Put(options, key, value));
  41. }
  42. }
  43. void createLevel0Files(int numFiles, int numKeysPerFile) {
  44. int startKey = 0;
  45. for (int i = 0; i < numFiles; i++) {
  46. AddKeys(numKeysPerFile, startKey);
  47. startKey += numKeysPerFile;
  48. ASSERT_OK(dbfull()->TEST_FlushMemTable());
  49. ASSERT_OK(
  50. dbfull()->TEST_WaitForCompact()); // wait for background flush (flush
  51. // is also a kind of compaction).
  52. }
  53. }
  54. void CheckFileTypeCounts(const std::string& dir, int required_log,
  55. int required_sst, int required_manifest) {
  56. std::vector<std::string> filenames;
  57. ASSERT_OK(env_->GetChildren(dir, &filenames));
  58. int log_cnt = 0;
  59. int sst_cnt = 0;
  60. int manifest_cnt = 0;
  61. for (const auto& file : filenames) {
  62. uint64_t number;
  63. FileType type;
  64. if (ParseFileName(file, &number, &type)) {
  65. log_cnt += (type == kWalFile);
  66. sst_cnt += (type == kTableFile);
  67. manifest_cnt += (type == kDescriptorFile);
  68. }
  69. }
  70. ASSERT_EQ(required_log, log_cnt);
  71. ASSERT_EQ(required_sst, sst_cnt);
  72. ASSERT_EQ(required_manifest, manifest_cnt);
  73. }
  74. void ReopenDB() {
  75. Options options = CurrentOptions();
  76. // Trigger compaction when the number of level 0 files reaches 2.
  77. options.create_if_missing = true;
  78. options.level0_file_num_compaction_trigger = 2;
  79. options.disable_auto_compactions = false;
  80. options.delete_obsolete_files_period_micros = 0; // always do full purge
  81. options.enable_thread_tracking = true;
  82. options.write_buffer_size = 1024 * 1024 * 1000;
  83. options.target_file_size_base = 1024 * 1024 * 1000;
  84. options.max_bytes_for_level_base = 1024 * 1024 * 1000;
  85. options.WAL_ttl_seconds = 300; // Used to test log files
  86. options.WAL_size_limit_MB = 1024; // Used to test log files
  87. options.wal_dir = wal_dir_;
  88. // Note: the following prevents an otherwise harmless data race between the
  89. // test setup code (AddBlobFile) in ObsoleteFilesTest.BlobFiles and the
  90. // periodic stat dumping thread.
  91. options.stats_dump_period_sec = 0;
  92. Destroy(options);
  93. Reopen(options);
  94. }
  95. const std::string wal_dir_;
  96. };
  97. TEST_F(ObsoleteFilesTest, RaceForObsoleteFileDeletion) {
  98. ReopenDB();
  99. SyncPoint::GetInstance()->DisableProcessing();
  100. SyncPoint::GetInstance()->LoadDependency({
  101. {"DBImpl::BackgroundCallCompaction:FoundObsoleteFiles",
  102. "ObsoleteFilesTest::RaceForObsoleteFileDeletion:1"},
  103. {"DBImpl::BackgroundCallCompaction:PurgedObsoleteFiles",
  104. "ObsoleteFilesTest::RaceForObsoleteFileDeletion:2"},
  105. });
  106. SyncPoint::GetInstance()->SetCallBack(
  107. "DBImpl::DeleteObsoleteFileImpl:AfterDeletion", [&](void* arg) {
  108. Status* p_status = static_cast<Status*>(arg);
  109. ASSERT_OK(*p_status);
  110. });
  111. SyncPoint::GetInstance()->SetCallBack(
  112. "DBImpl::CloseHelper:PendingPurgeFinished", [&](void* arg) {
  113. std::unordered_set<uint64_t>* files_grabbed_for_purge_ptr =
  114. reinterpret_cast<std::unordered_set<uint64_t>*>(arg);
  115. ASSERT_TRUE(files_grabbed_for_purge_ptr->empty());
  116. });
  117. SyncPoint::GetInstance()->EnableProcessing();
  118. createLevel0Files(2, 50000);
  119. CheckFileTypeCounts(wal_dir_, 1, 0, 0);
  120. port::Thread user_thread([this]() {
  121. JobContext jobCxt(0);
  122. TEST_SYNC_POINT("ObsoleteFilesTest::RaceForObsoleteFileDeletion:1");
  123. dbfull()->TEST_LockMutex();
  124. dbfull()->FindObsoleteFiles(&jobCxt, true /* force=true */,
  125. false /* no_full_scan=false */);
  126. dbfull()->TEST_UnlockMutex();
  127. TEST_SYNC_POINT("ObsoleteFilesTest::RaceForObsoleteFileDeletion:2");
  128. dbfull()->PurgeObsoleteFiles(jobCxt);
  129. jobCxt.Clean();
  130. });
  131. user_thread.join();
  132. }
  133. TEST_F(ObsoleteFilesTest, DeleteObsoleteOptionsFile) {
  134. ReopenDB();
  135. createLevel0Files(2, 50000);
  136. CheckFileTypeCounts(wal_dir_, 1, 0, 0);
  137. ASSERT_OK(dbfull()->DisableFileDeletions());
  138. for (int i = 0; i != 4; ++i) {
  139. if (i % 2) {
  140. ASSERT_OK(dbfull()->SetOptions(dbfull()->DefaultColumnFamily(),
  141. {{"paranoid_file_checks", "false"}}));
  142. } else {
  143. ASSERT_OK(dbfull()->SetOptions(dbfull()->DefaultColumnFamily(),
  144. {{"paranoid_file_checks", "true"}}));
  145. }
  146. }
  147. ASSERT_OK(dbfull()->EnableFileDeletions());
  148. Close();
  149. std::vector<std::string> files;
  150. int opts_file_count = 0;
  151. ASSERT_OK(env_->GetChildren(dbname_, &files));
  152. for (const auto& file : files) {
  153. uint64_t file_num;
  154. Slice dummy_info_log_name_prefix;
  155. FileType type;
  156. WalFileType log_type;
  157. if (ParseFileName(file, &file_num, dummy_info_log_name_prefix, &type,
  158. &log_type) &&
  159. type == kOptionsFile) {
  160. opts_file_count++;
  161. }
  162. }
  163. ASSERT_EQ(2, opts_file_count);
  164. }
  165. TEST_F(ObsoleteFilesTest, BlobFiles) {
  166. ReopenDB();
  167. VersionSet* const versions = dbfull()->GetVersionSet();
  168. assert(versions);
  169. assert(versions->GetColumnFamilySet());
  170. ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
  171. assert(cfd);
  172. const auto& cf_paths = cfd->ioptions().cf_paths;
  173. assert(!cf_paths.empty());
  174. const std::string& path = cf_paths.front().path;
  175. // Add an obsolete blob file.
  176. constexpr uint64_t first_blob_file_number = 234;
  177. versions->AddObsoleteBlobFile(first_blob_file_number, path);
  178. // Add a live blob file.
  179. Version* const version = cfd->current();
  180. assert(version);
  181. VersionStorageInfo* const storage_info = version->storage_info();
  182. assert(storage_info);
  183. constexpr uint64_t second_blob_file_number = 456;
  184. constexpr uint64_t second_total_blob_count = 100;
  185. constexpr uint64_t second_total_blob_bytes = 2000000;
  186. constexpr char second_checksum_method[] = "CRC32B";
  187. constexpr char second_checksum_value[] = "\x6d\xbd\xf2\x3a";
  188. auto shared_meta = SharedBlobFileMetaData::Create(
  189. second_blob_file_number, second_total_blob_count, second_total_blob_bytes,
  190. second_checksum_method, second_checksum_value);
  191. constexpr uint64_t second_garbage_blob_count = 0;
  192. constexpr uint64_t second_garbage_blob_bytes = 0;
  193. auto meta = BlobFileMetaData::Create(
  194. std::move(shared_meta), BlobFileMetaData::LinkedSsts(),
  195. second_garbage_blob_count, second_garbage_blob_bytes);
  196. storage_info->AddBlobFile(std::move(meta));
  197. // Check for obsolete files and make sure the first blob file is picked up
  198. // and grabbed for purge. The second blob file should be on the live list.
  199. constexpr int job_id = 0;
  200. JobContext job_context{job_id};
  201. dbfull()->TEST_LockMutex();
  202. constexpr bool force_full_scan = false;
  203. dbfull()->FindObsoleteFiles(&job_context, force_full_scan);
  204. dbfull()->TEST_UnlockMutex();
  205. ASSERT_TRUE(job_context.HaveSomethingToDelete());
  206. ASSERT_EQ(job_context.blob_delete_files.size(), 1);
  207. ASSERT_EQ(job_context.blob_delete_files[0].GetBlobFileNumber(),
  208. first_blob_file_number);
  209. const auto& files_grabbed_for_purge =
  210. dbfull()->TEST_GetFilesGrabbedForPurge();
  211. ASSERT_NE(files_grabbed_for_purge.find(first_blob_file_number),
  212. files_grabbed_for_purge.end());
  213. ASSERT_EQ(job_context.blob_live.size(), 1);
  214. ASSERT_EQ(job_context.blob_live[0], second_blob_file_number);
  215. // Hack the job context a bit by adding a few files to the full scan
  216. // list and adjusting the pending file number. We add the two files
  217. // above as well as two additional ones, where one is old
  218. // and should be cleaned up, and the other is still pending.
  219. constexpr uint64_t old_blob_file_number = 123;
  220. constexpr uint64_t pending_blob_file_number = 567;
  221. job_context.full_scan_candidate_files.emplace_back(
  222. BlobFileName(old_blob_file_number), path);
  223. job_context.full_scan_candidate_files.emplace_back(
  224. BlobFileName(first_blob_file_number), path);
  225. job_context.full_scan_candidate_files.emplace_back(
  226. BlobFileName(second_blob_file_number), path);
  227. job_context.full_scan_candidate_files.emplace_back(
  228. BlobFileName(pending_blob_file_number), path);
  229. job_context.min_pending_output = pending_blob_file_number;
  230. // Purge obsolete files and make sure we purge the old file and the first file
  231. // (and keep the second file and the pending file).
  232. std::vector<std::string> deleted_files;
  233. SyncPoint::GetInstance()->SetCallBack(
  234. "DBImpl::DeleteObsoleteFileImpl::BeforeDeletion", [&](void* arg) {
  235. const std::string* file = static_cast<std::string*>(arg);
  236. assert(file);
  237. constexpr char blob_extension[] = ".blob";
  238. if (file->find(blob_extension) != std::string::npos) {
  239. deleted_files.emplace_back(*file);
  240. }
  241. });
  242. SyncPoint::GetInstance()->EnableProcessing();
  243. dbfull()->PurgeObsoleteFiles(job_context);
  244. job_context.Clean();
  245. SyncPoint::GetInstance()->DisableProcessing();
  246. SyncPoint::GetInstance()->ClearAllCallBacks();
  247. ASSERT_EQ(files_grabbed_for_purge.find(first_blob_file_number),
  248. files_grabbed_for_purge.end());
  249. std::sort(deleted_files.begin(), deleted_files.end());
  250. const std::vector<std::string> expected_deleted_files{
  251. BlobFileName(path, old_blob_file_number),
  252. BlobFileName(path, first_blob_file_number)};
  253. ASSERT_EQ(deleted_files, expected_deleted_files);
  254. }
  255. } // namespace ROCKSDB_NAMESPACE
  256. int main(int argc, char** argv) {
  257. ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  258. ::testing::InitGoogleTest(&argc, argv);
  259. RegisterCustomObjects(argc, argv);
  260. return RUN_ALL_TESTS();
  261. }