compact_files_test.cc 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include <mutex>
  6. #include <string>
  7. #include <thread>
  8. #include <vector>
  9. #include "db/db_impl/db_impl.h"
  10. #include "port/port.h"
  11. #include "rocksdb/db.h"
  12. #include "rocksdb/env.h"
  13. #include "test_util/sync_point.h"
  14. #include "test_util/testharness.h"
  15. #include "util/cast_util.h"
  16. #include "util/string_util.h"
  17. namespace ROCKSDB_NAMESPACE {
  18. class CompactFilesTest : public testing::Test {
  19. public:
  20. CompactFilesTest() {
  21. env_ = Env::Default();
  22. db_name_ = test::PerThreadDBPath("compact_files_test");
  23. }
  24. std::string db_name_;
  25. Env* env_;
  26. };
  27. // A class which remembers the name of each flushed file.
  28. class FlushedFileCollector : public EventListener {
  29. public:
  30. FlushedFileCollector() = default;
  31. ~FlushedFileCollector() override = default;
  32. void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& info) override {
  33. std::lock_guard<std::mutex> lock(mutex_);
  34. flushed_files_.push_back(info.file_path);
  35. }
  36. std::vector<std::string> GetFlushedFiles() {
  37. std::lock_guard<std::mutex> lock(mutex_);
  38. std::vector<std::string> result;
  39. for (const auto& fname : flushed_files_) {
  40. result.push_back(fname);
  41. }
  42. return result;
  43. }
  44. void ClearFlushedFiles() {
  45. std::lock_guard<std::mutex> lock(mutex_);
  46. flushed_files_.clear();
  47. }
  48. private:
  49. std::vector<std::string> flushed_files_;
  50. std::mutex mutex_;
  51. };
  52. TEST_F(CompactFilesTest, L0ConflictsFiles) {
  53. Options options;
  54. // to trigger compaction more easily
  55. const int kWriteBufferSize = 10000;
  56. const int kLevel0Trigger = 2;
  57. options.create_if_missing = true;
  58. options.level_compaction_dynamic_level_bytes = false;
  59. options.compaction_style = kCompactionStyleLevel;
  60. // Small slowdown and stop trigger for experimental purpose.
  61. options.level0_slowdown_writes_trigger = 20;
  62. options.level0_stop_writes_trigger = 20;
  63. options.level0_stop_writes_trigger = 20;
  64. options.write_buffer_size = kWriteBufferSize;
  65. options.level0_file_num_compaction_trigger = kLevel0Trigger;
  66. options.compression = kNoCompression;
  67. DB* db = nullptr;
  68. ASSERT_OK(DestroyDB(db_name_, options));
  69. Status s = DB::Open(options, db_name_, &db);
  70. assert(s.ok());
  71. assert(db);
  72. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
  73. {"CompactFilesImpl:0", "BackgroundCallCompaction:0"},
  74. {"BackgroundCallCompaction:1", "CompactFilesImpl:1"},
  75. });
  76. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  77. // create couple files
  78. // Background compaction starts and waits in BackgroundCallCompaction:0
  79. for (int i = 0; i < kLevel0Trigger * 4; ++i) {
  80. ASSERT_OK(db->Put(WriteOptions(), std::to_string(i), ""));
  81. ASSERT_OK(db->Put(WriteOptions(), std::to_string(100 - i), ""));
  82. ASSERT_OK(db->Flush(FlushOptions()));
  83. }
  84. ROCKSDB_NAMESPACE::ColumnFamilyMetaData meta;
  85. db->GetColumnFamilyMetaData(&meta);
  86. std::string file1;
  87. for (auto& file : meta.levels[0].files) {
  88. ASSERT_EQ(0, meta.levels[0].level);
  89. if (file1 == "") {
  90. file1 = file.db_path + "/" + file.name;
  91. } else {
  92. std::string file2 = file.db_path + "/" + file.name;
  93. // Another thread starts a compact files and creates an L0 compaction
  94. // The background compaction then notices that there is an L0 compaction
  95. // already in progress and doesn't do an L0 compaction
  96. // Once the background compaction finishes, the compact files finishes
  97. ASSERT_OK(db->CompactFiles(ROCKSDB_NAMESPACE::CompactionOptions(),
  98. {file1, file2}, 0));
  99. break;
  100. }
  101. }
  102. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  103. delete db;
  104. }
  105. TEST_F(CompactFilesTest, MultipleLevel) {
  106. Options options;
  107. options.create_if_missing = true;
  108. // Otherwise background compaction can happen to
  109. // drain unnecessary level
  110. options.level_compaction_dynamic_level_bytes = false;
  111. options.num_levels = 6;
  112. // Add listener
  113. FlushedFileCollector* collector = new FlushedFileCollector();
  114. options.listeners.emplace_back(collector);
  115. DB* db = nullptr;
  116. ASSERT_OK(DestroyDB(db_name_, options));
  117. Status s = DB::Open(options, db_name_, &db);
  118. ASSERT_OK(s);
  119. ASSERT_NE(db, nullptr);
  120. // create couple files in L0, L3, L4 and L5
  121. for (int i = 5; i > 2; --i) {
  122. collector->ClearFlushedFiles();
  123. ASSERT_OK(db->Put(WriteOptions(), std::to_string(i), ""));
  124. ASSERT_OK(db->Flush(FlushOptions()));
  125. // Ensure background work is fully finished including listener callbacks
  126. // before accessing listener state.
  127. ASSERT_OK(static_cast_with_check<DBImpl>(db)->TEST_WaitForBackgroundWork());
  128. auto l0_files = collector->GetFlushedFiles();
  129. ASSERT_OK(db->CompactFiles(CompactionOptions(), l0_files, i));
  130. std::string prop;
  131. ASSERT_TRUE(db->GetProperty(
  132. "rocksdb.num-files-at-level" + std::to_string(i), &prop));
  133. ASSERT_EQ("1", prop);
  134. }
  135. ASSERT_OK(db->Put(WriteOptions(), std::to_string(0), ""));
  136. ASSERT_OK(db->Flush(FlushOptions()));
  137. ColumnFamilyMetaData meta;
  138. db->GetColumnFamilyMetaData(&meta);
  139. // Compact files except the file in L3
  140. std::vector<std::string> files;
  141. for (int i = 0; i < 6; ++i) {
  142. if (i == 3) {
  143. continue;
  144. }
  145. for (auto& file : meta.levels[i].files) {
  146. files.push_back(file.db_path + "/" + file.name);
  147. }
  148. }
  149. SyncPoint::GetInstance()->LoadDependency({
  150. {"CompactionJob::Run():Start", "CompactFilesTest.MultipleLevel:0"},
  151. {"CompactFilesTest.MultipleLevel:1", "CompactFilesImpl:3"},
  152. });
  153. SyncPoint::GetInstance()->EnableProcessing();
  154. std::thread thread([&] {
  155. TEST_SYNC_POINT("CompactFilesTest.MultipleLevel:0");
  156. ASSERT_OK(db->Put(WriteOptions(), "bar", "v2"));
  157. ASSERT_OK(db->Put(WriteOptions(), "foo", "v2"));
  158. ASSERT_OK(db->Flush(FlushOptions()));
  159. TEST_SYNC_POINT("CompactFilesTest.MultipleLevel:1");
  160. });
  161. // Compaction cannot move up the data to higher level
  162. // here we have input file from level 5, so the output level has to be >= 5
  163. for (int invalid_output_level = 0; invalid_output_level < 5;
  164. invalid_output_level++) {
  165. s = db->CompactFiles(CompactionOptions(), files, invalid_output_level);
  166. ASSERT_TRUE(s.IsInvalidArgument());
  167. }
  168. ASSERT_OK(db->CompactFiles(CompactionOptions(), files, 5));
  169. SyncPoint::GetInstance()->DisableProcessing();
  170. thread.join();
  171. delete db;
  172. }
  173. TEST_F(CompactFilesTest, ObsoleteFiles) {
  174. Options options;
  175. // to trigger compaction more easily
  176. const int kWriteBufferSize = 65536;
  177. options.create_if_missing = true;
  178. // Disable RocksDB background compaction.
  179. options.compaction_style = kCompactionStyleNone;
  180. options.level0_slowdown_writes_trigger = (1 << 30);
  181. options.level0_stop_writes_trigger = (1 << 30);
  182. options.write_buffer_size = kWriteBufferSize;
  183. options.max_write_buffer_number = 2;
  184. options.compression = kNoCompression;
  185. // Add listener
  186. FlushedFileCollector* collector = new FlushedFileCollector();
  187. options.listeners.emplace_back(collector);
  188. DB* db = nullptr;
  189. ASSERT_OK(DestroyDB(db_name_, options));
  190. Status s = DB::Open(options, db_name_, &db);
  191. ASSERT_OK(s);
  192. ASSERT_NE(db, nullptr);
  193. // create couple files
  194. for (int i = 1000; i < 2000; ++i) {
  195. ASSERT_OK(db->Put(WriteOptions(), std::to_string(i),
  196. std::string(kWriteBufferSize / 10, 'a' + (i % 26))));
  197. }
  198. auto l0_files = collector->GetFlushedFiles();
  199. ASSERT_OK(db->CompactFiles(CompactionOptions(), l0_files, 1));
  200. ASSERT_OK(static_cast_with_check<DBImpl>(db)->TEST_WaitForCompact());
  201. // verify all compaction input files are deleted
  202. for (const auto& fname : l0_files) {
  203. ASSERT_EQ(Status::NotFound(), env_->FileExists(fname));
  204. }
  205. delete db;
  206. }
  207. TEST_F(CompactFilesTest, NotCutOutputOnLevel0) {
  208. Options options;
  209. options.create_if_missing = true;
  210. // Disable RocksDB background compaction.
  211. options.compaction_style = kCompactionStyleNone;
  212. options.level0_slowdown_writes_trigger = 1000;
  213. options.level0_stop_writes_trigger = 1000;
  214. options.write_buffer_size = 65536;
  215. options.max_write_buffer_number = 2;
  216. options.compression = kNoCompression;
  217. options.max_compaction_bytes = 5000;
  218. // Add listener
  219. FlushedFileCollector* collector = new FlushedFileCollector();
  220. options.listeners.emplace_back(collector);
  221. DB* db = nullptr;
  222. ASSERT_OK(DestroyDB(db_name_, options));
  223. Status s = DB::Open(options, db_name_, &db);
  224. assert(s.ok());
  225. assert(db);
  226. // create couple files
  227. for (int i = 0; i < 500; ++i) {
  228. ASSERT_OK(db->Put(WriteOptions(), std::to_string(i),
  229. std::string(1000, 'a' + (i % 26))));
  230. }
  231. ASSERT_OK(static_cast_with_check<DBImpl>(db)->TEST_WaitForFlushMemTable());
  232. auto l0_files_1 = collector->GetFlushedFiles();
  233. collector->ClearFlushedFiles();
  234. for (int i = 0; i < 500; ++i) {
  235. ASSERT_OK(db->Put(WriteOptions(), std::to_string(i),
  236. std::string(1000, 'a' + (i % 26))));
  237. }
  238. ASSERT_OK(static_cast_with_check<DBImpl>(db)->TEST_WaitForFlushMemTable());
  239. auto l0_files_2 = collector->GetFlushedFiles();
  240. ASSERT_OK(db->CompactFiles(CompactionOptions(), l0_files_1, 0));
  241. ASSERT_OK(db->CompactFiles(CompactionOptions(), l0_files_2, 0));
  242. // no assertion failure
  243. delete db;
  244. }
  245. TEST_F(CompactFilesTest, CapturingPendingFiles) {
  246. Options options;
  247. options.create_if_missing = true;
  248. // Disable RocksDB background compaction.
  249. options.compaction_style = kCompactionStyleNone;
  250. // Always do full scans for obsolete files (needed to reproduce the issue).
  251. options.delete_obsolete_files_period_micros = 0;
  252. // Add listener.
  253. FlushedFileCollector* collector = new FlushedFileCollector();
  254. options.listeners.emplace_back(collector);
  255. DB* db = nullptr;
  256. ASSERT_OK(DestroyDB(db_name_, options));
  257. Status s = DB::Open(options, db_name_, &db);
  258. ASSERT_OK(s);
  259. assert(db);
  260. // Create 5 files.
  261. for (int i = 0; i < 5; ++i) {
  262. ASSERT_OK(db->Put(WriteOptions(), "key" + std::to_string(i), "value"));
  263. ASSERT_OK(db->Flush(FlushOptions()));
  264. }
  265. // Ensure background work is fully finished including listener callbacks
  266. // before accessing listener state.
  267. ASSERT_OK(static_cast_with_check<DBImpl>(db)->TEST_WaitForBackgroundWork());
  268. auto l0_files = collector->GetFlushedFiles();
  269. EXPECT_EQ(5, l0_files.size());
  270. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
  271. {"CompactFilesImpl:2", "CompactFilesTest.CapturingPendingFiles:0"},
  272. {"CompactFilesTest.CapturingPendingFiles:1", "CompactFilesImpl:3"},
  273. });
  274. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  275. // Start compacting files.
  276. ROCKSDB_NAMESPACE::port::Thread compaction_thread(
  277. [&] { EXPECT_OK(db->CompactFiles(CompactionOptions(), l0_files, 1)); });
  278. // In the meantime flush another file.
  279. TEST_SYNC_POINT("CompactFilesTest.CapturingPendingFiles:0");
  280. ASSERT_OK(db->Put(WriteOptions(), "key5", "value"));
  281. ASSERT_OK(db->Flush(FlushOptions()));
  282. TEST_SYNC_POINT("CompactFilesTest.CapturingPendingFiles:1");
  283. compaction_thread.join();
  284. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  285. delete db;
  286. // Make sure we can reopen the DB.
  287. s = DB::Open(options, db_name_, &db);
  288. ASSERT_OK(s);
  289. assert(db);
  290. delete db;
  291. }
  292. TEST_F(CompactFilesTest, CompactionFilterWithGetSv) {
  293. class FilterWithGet : public CompactionFilter {
  294. public:
  295. bool Filter(int /*level*/, const Slice& /*key*/, const Slice& /*value*/,
  296. std::string* /*new_value*/,
  297. bool* /*value_changed*/) const override {
  298. if (db_ == nullptr) {
  299. return true;
  300. }
  301. std::string res;
  302. EXPECT_TRUE(db_->Get(ReadOptions(), "", &res).IsNotFound());
  303. return true;
  304. }
  305. void SetDB(DB* db) { db_ = db; }
  306. const char* Name() const override { return "FilterWithGet"; }
  307. private:
  308. DB* db_;
  309. };
  310. std::shared_ptr<FilterWithGet> cf(new FilterWithGet());
  311. Options options;
  312. options.level_compaction_dynamic_level_bytes = false;
  313. options.create_if_missing = true;
  314. options.compaction_filter = cf.get();
  315. DB* db = nullptr;
  316. ASSERT_OK(DestroyDB(db_name_, options));
  317. Status s = DB::Open(options, db_name_, &db);
  318. ASSERT_OK(s);
  319. cf->SetDB(db);
  320. // Write one L0 file
  321. ASSERT_OK(db->Put(WriteOptions(), "K1", "V1"));
  322. ASSERT_OK(db->Flush(FlushOptions()));
  323. // Compact all L0 files using CompactFiles
  324. ROCKSDB_NAMESPACE::ColumnFamilyMetaData meta;
  325. db->GetColumnFamilyMetaData(&meta);
  326. for (auto& file : meta.levels[0].files) {
  327. std::string fname = file.db_path + "/" + file.name;
  328. ASSERT_OK(
  329. db->CompactFiles(ROCKSDB_NAMESPACE::CompactionOptions(), {fname}, 0));
  330. }
  331. delete db;
  332. }
  333. TEST_F(CompactFilesTest, SentinelCompressionType) {
  334. if (!Zlib_Supported()) {
  335. fprintf(stderr, "zlib compression not supported, skip this test\n");
  336. return;
  337. }
  338. if (!Snappy_Supported()) {
  339. fprintf(stderr, "snappy compression not supported, skip this test\n");
  340. return;
  341. }
  342. // Check that passing `CompressionType::kDisableCompressionOption` to
  343. // `CompactFiles` causes it to use the column family compression options.
  344. for (auto compaction_style : {CompactionStyle::kCompactionStyleLevel,
  345. CompactionStyle::kCompactionStyleUniversal,
  346. CompactionStyle::kCompactionStyleNone}) {
  347. ASSERT_OK(DestroyDB(db_name_, Options()));
  348. Options options;
  349. options.level_compaction_dynamic_level_bytes = false;
  350. options.compaction_style = compaction_style;
  351. // L0: Snappy, L1: ZSTD, L2: Snappy
  352. options.compression_per_level = {CompressionType::kSnappyCompression,
  353. CompressionType::kZlibCompression,
  354. CompressionType::kSnappyCompression};
  355. options.create_if_missing = true;
  356. FlushedFileCollector* collector = new FlushedFileCollector();
  357. options.listeners.emplace_back(collector);
  358. DB* db = nullptr;
  359. ASSERT_OK(DB::Open(options, db_name_, &db));
  360. ASSERT_OK(db->Put(WriteOptions(), "key", "val"));
  361. ASSERT_OK(db->Flush(FlushOptions()));
  362. // Ensure background work is fully finished including listener callbacks
  363. // before accessing listener state.
  364. ASSERT_OK(static_cast_with_check<DBImpl>(db)->TEST_WaitForBackgroundWork());
  365. auto l0_files = collector->GetFlushedFiles();
  366. ASSERT_EQ(1, l0_files.size());
  367. // L0->L1 compaction, so output should be ZSTD-compressed
  368. CompactionOptions compaction_opts;
  369. compaction_opts.compression = CompressionType::kDisableCompressionOption;
  370. ASSERT_OK(db->CompactFiles(compaction_opts, l0_files, 1));
  371. ROCKSDB_NAMESPACE::TablePropertiesCollection all_tables_props;
  372. ASSERT_OK(db->GetPropertiesOfAllTables(&all_tables_props));
  373. for (const auto& name_and_table_props : all_tables_props) {
  374. ASSERT_EQ(CompressionTypeToString(CompressionType::kZlibCompression),
  375. name_and_table_props.second->compression_name);
  376. }
  377. delete db;
  378. }
  379. }
  380. TEST_F(CompactFilesTest, CompressionWithBlockAlign) {
  381. if (!Snappy_Supported()) {
  382. ROCKSDB_GTEST_SKIP("Test requires Snappy support");
  383. return;
  384. }
  385. Options options;
  386. options.compression = CompressionType::kNoCompression;
  387. options.create_if_missing = true;
  388. options.disable_auto_compactions = true;
  389. std::shared_ptr<FlushedFileCollector> collector =
  390. std::make_shared<FlushedFileCollector>();
  391. options.listeners.push_back(collector);
  392. {
  393. BlockBasedTableOptions bbto;
  394. bbto.block_align = true;
  395. options.table_factory.reset(NewBlockBasedTableFactory(bbto));
  396. }
  397. std::unique_ptr<DB> db;
  398. {
  399. DB* _db = nullptr;
  400. ASSERT_OK(DB::Open(options, db_name_, &_db));
  401. db.reset(_db);
  402. }
  403. ASSERT_OK(db->Put(WriteOptions(), "key", "val"));
  404. ASSERT_OK(db->Flush(FlushOptions()));
  405. // Ensure background work is fully finished including listener callbacks
  406. // before accessing listener state.
  407. ASSERT_OK(
  408. static_cast_with_check<DBImpl>(db.get())->TEST_WaitForBackgroundWork());
  409. auto l0_files = collector->GetFlushedFiles();
  410. ASSERT_EQ(1, l0_files.size());
  411. // We can run this test even without Snappy support because we expect the
  412. // `CompactFiles()` to fail before actually invoking Snappy compression.
  413. CompactionOptions compaction_opts;
  414. compaction_opts.compression = CompressionType::kSnappyCompression;
  415. ASSERT_TRUE(db->CompactFiles(compaction_opts, l0_files, 1 /* output_level */)
  416. .IsInvalidArgument());
  417. compaction_opts.compression = CompressionType::kDisableCompressionOption;
  418. ASSERT_OK(db->CompactFiles(compaction_opts, l0_files, 1 /* output_level */));
  419. }
  420. TEST_F(CompactFilesTest, GetCompactionJobInfo) {
  421. Options options;
  422. options.create_if_missing = true;
  423. // Disable RocksDB background compaction.
  424. options.compaction_style = kCompactionStyleNone;
  425. options.level0_slowdown_writes_trigger = 1000;
  426. options.level0_stop_writes_trigger = 1000;
  427. options.write_buffer_size = 65536;
  428. options.max_write_buffer_number = 2;
  429. options.compression = kNoCompression;
  430. options.max_compaction_bytes = 5000;
  431. // Add listener
  432. FlushedFileCollector* collector = new FlushedFileCollector();
  433. options.listeners.emplace_back(collector);
  434. DB* db = nullptr;
  435. ASSERT_OK(DestroyDB(db_name_, options));
  436. Status s = DB::Open(options, db_name_, &db);
  437. ASSERT_OK(s);
  438. assert(db);
  439. // create couple files
  440. for (int i = 0; i < 500; ++i) {
  441. ASSERT_OK(db->Put(WriteOptions(), std::to_string(i),
  442. std::string(1000, 'a' + (i % 26))));
  443. }
  444. ASSERT_OK(static_cast_with_check<DBImpl>(db)->TEST_WaitForFlushMemTable());
  445. auto l0_files_1 = collector->GetFlushedFiles();
  446. CompactionOptions co;
  447. co.compression = CompressionType::kLZ4Compression;
  448. CompactionJobInfo compaction_job_info{};
  449. ASSERT_OK(
  450. db->CompactFiles(co, l0_files_1, 0, -1, nullptr, &compaction_job_info));
  451. ASSERT_EQ(compaction_job_info.base_input_level, 0);
  452. ASSERT_EQ(compaction_job_info.cf_id, db->DefaultColumnFamily()->GetID());
  453. ASSERT_EQ(compaction_job_info.cf_name, db->DefaultColumnFamily()->GetName());
  454. ASSERT_EQ(compaction_job_info.compaction_reason,
  455. CompactionReason::kManualCompaction);
  456. ASSERT_EQ(compaction_job_info.compression, CompressionType::kLZ4Compression);
  457. ASSERT_EQ(compaction_job_info.output_level, 0);
  458. ASSERT_OK(compaction_job_info.status);
  459. // no assertion failure
  460. delete db;
  461. }
  462. } // namespace ROCKSDB_NAMESPACE
  463. int main(int argc, char** argv) {
  464. ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  465. ::testing::InitGoogleTest(&argc, argv);
  466. return RUN_ALL_TESTS();
  467. }