compact_files_test.cc 13 KB


  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #ifndef ROCKSDB_LITE
  6. #include <mutex>
  7. #include <string>
  8. #include <thread>
  9. #include <vector>
  10. #include "db/db_impl/db_impl.h"
  11. #include "port/port.h"
  12. #include "rocksdb/db.h"
  13. #include "rocksdb/env.h"
  14. #include "test_util/sync_point.h"
  15. #include "test_util/testharness.h"
  16. #include "util/string_util.h"
  17. namespace ROCKSDB_NAMESPACE {
  18. class CompactFilesTest : public testing::Test {
  19. public:
  20. CompactFilesTest() {
  21. env_ = Env::Default();
  22. db_name_ = test::PerThreadDBPath("compact_files_test");
  23. }
  24. std::string db_name_;
  25. Env* env_;
  26. };
  27. // A class which remembers the name of each flushed file.
  28. class FlushedFileCollector : public EventListener {
  29. public:
  30. FlushedFileCollector() {}
  31. ~FlushedFileCollector() override {}
  32. void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& info) override {
  33. std::lock_guard<std::mutex> lock(mutex_);
  34. flushed_files_.push_back(info.file_path);
  35. }
  36. std::vector<std::string> GetFlushedFiles() {
  37. std::lock_guard<std::mutex> lock(mutex_);
  38. std::vector<std::string> result;
  39. for (auto fname : flushed_files_) {
  40. result.push_back(fname);
  41. }
  42. return result;
  43. }
  44. void ClearFlushedFiles() {
  45. std::lock_guard<std::mutex> lock(mutex_);
  46. flushed_files_.clear();
  47. }
  48. private:
  49. std::vector<std::string> flushed_files_;
  50. std::mutex mutex_;
  51. };
  52. TEST_F(CompactFilesTest, L0ConflictsFiles) {
  53. Options options;
  54. // to trigger compaction more easily
  55. const int kWriteBufferSize = 10000;
  56. const int kLevel0Trigger = 2;
  57. options.create_if_missing = true;
  58. options.compaction_style = kCompactionStyleLevel;
  59. // Small slowdown and stop trigger for experimental purpose.
  60. options.level0_slowdown_writes_trigger = 20;
  61. options.level0_stop_writes_trigger = 20;
  62. options.level0_stop_writes_trigger = 20;
  63. options.write_buffer_size = kWriteBufferSize;
  64. options.level0_file_num_compaction_trigger = kLevel0Trigger;
  65. options.compression = kNoCompression;
  66. DB* db = nullptr;
  67. DestroyDB(db_name_, options);
  68. Status s = DB::Open(options, db_name_, &db);
  69. assert(s.ok());
  70. assert(db);
  71. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
  72. {"CompactFilesImpl:0", "BackgroundCallCompaction:0"},
  73. {"BackgroundCallCompaction:1", "CompactFilesImpl:1"},
  74. });
  75. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  76. // create couple files
  77. // Background compaction starts and waits in BackgroundCallCompaction:0
  78. for (int i = 0; i < kLevel0Trigger * 4; ++i) {
  79. db->Put(WriteOptions(), ToString(i), "");
  80. db->Put(WriteOptions(), ToString(100 - i), "");
  81. db->Flush(FlushOptions());
  82. }
  83. ROCKSDB_NAMESPACE::ColumnFamilyMetaData meta;
  84. db->GetColumnFamilyMetaData(&meta);
  85. std::string file1;
  86. for (auto& file : meta.levels[0].files) {
  87. ASSERT_EQ(0, meta.levels[0].level);
  88. if (file1 == "") {
  89. file1 = file.db_path + "/" + file.name;
  90. } else {
  91. std::string file2 = file.db_path + "/" + file.name;
  92. // Another thread starts a compact files and creates an L0 compaction
  93. // The background compaction then notices that there is an L0 compaction
  94. // already in progress and doesn't do an L0 compaction
  95. // Once the background compaction finishes, the compact files finishes
  96. ASSERT_OK(db->CompactFiles(ROCKSDB_NAMESPACE::CompactionOptions(),
  97. {file1, file2}, 0));
  98. break;
  99. }
  100. }
  101. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  102. delete db;
  103. }
  104. TEST_F(CompactFilesTest, ObsoleteFiles) {
  105. Options options;
  106. // to trigger compaction more easily
  107. const int kWriteBufferSize = 65536;
  108. options.create_if_missing = true;
  109. // Disable RocksDB background compaction.
  110. options.compaction_style = kCompactionStyleNone;
  111. options.level0_slowdown_writes_trigger = (1 << 30);
  112. options.level0_stop_writes_trigger = (1 << 30);
  113. options.write_buffer_size = kWriteBufferSize;
  114. options.max_write_buffer_number = 2;
  115. options.compression = kNoCompression;
  116. // Add listener
  117. FlushedFileCollector* collector = new FlushedFileCollector();
  118. options.listeners.emplace_back(collector);
  119. DB* db = nullptr;
  120. DestroyDB(db_name_, options);
  121. Status s = DB::Open(options, db_name_, &db);
  122. assert(s.ok());
  123. assert(db);
  124. // create couple files
  125. for (int i = 1000; i < 2000; ++i) {
  126. db->Put(WriteOptions(), ToString(i),
  127. std::string(kWriteBufferSize / 10, 'a' + (i % 26)));
  128. }
  129. auto l0_files = collector->GetFlushedFiles();
  130. ASSERT_OK(db->CompactFiles(CompactionOptions(), l0_files, 1));
  131. reinterpret_cast<DBImpl*>(db)->TEST_WaitForCompact();
  132. // verify all compaction input files are deleted
  133. for (auto fname : l0_files) {
  134. ASSERT_EQ(Status::NotFound(), env_->FileExists(fname));
  135. }
  136. delete db;
  137. }
  138. TEST_F(CompactFilesTest, NotCutOutputOnLevel0) {
  139. Options options;
  140. options.create_if_missing = true;
  141. // Disable RocksDB background compaction.
  142. options.compaction_style = kCompactionStyleNone;
  143. options.level0_slowdown_writes_trigger = 1000;
  144. options.level0_stop_writes_trigger = 1000;
  145. options.write_buffer_size = 65536;
  146. options.max_write_buffer_number = 2;
  147. options.compression = kNoCompression;
  148. options.max_compaction_bytes = 5000;
  149. // Add listener
  150. FlushedFileCollector* collector = new FlushedFileCollector();
  151. options.listeners.emplace_back(collector);
  152. DB* db = nullptr;
  153. DestroyDB(db_name_, options);
  154. Status s = DB::Open(options, db_name_, &db);
  155. assert(s.ok());
  156. assert(db);
  157. // create couple files
  158. for (int i = 0; i < 500; ++i) {
  159. db->Put(WriteOptions(), ToString(i), std::string(1000, 'a' + (i % 26)));
  160. }
  161. reinterpret_cast<DBImpl*>(db)->TEST_WaitForFlushMemTable();
  162. auto l0_files_1 = collector->GetFlushedFiles();
  163. collector->ClearFlushedFiles();
  164. for (int i = 0; i < 500; ++i) {
  165. db->Put(WriteOptions(), ToString(i), std::string(1000, 'a' + (i % 26)));
  166. }
  167. reinterpret_cast<DBImpl*>(db)->TEST_WaitForFlushMemTable();
  168. auto l0_files_2 = collector->GetFlushedFiles();
  169. ASSERT_OK(db->CompactFiles(CompactionOptions(), l0_files_1, 0));
  170. ASSERT_OK(db->CompactFiles(CompactionOptions(), l0_files_2, 0));
  171. // no assertion failure
  172. delete db;
  173. }
  174. TEST_F(CompactFilesTest, CapturingPendingFiles) {
  175. Options options;
  176. options.create_if_missing = true;
  177. // Disable RocksDB background compaction.
  178. options.compaction_style = kCompactionStyleNone;
  179. // Always do full scans for obsolete files (needed to reproduce the issue).
  180. options.delete_obsolete_files_period_micros = 0;
  181. // Add listener.
  182. FlushedFileCollector* collector = new FlushedFileCollector();
  183. options.listeners.emplace_back(collector);
  184. DB* db = nullptr;
  185. DestroyDB(db_name_, options);
  186. Status s = DB::Open(options, db_name_, &db);
  187. assert(s.ok());
  188. assert(db);
  189. // Create 5 files.
  190. for (int i = 0; i < 5; ++i) {
  191. db->Put(WriteOptions(), "key" + ToString(i), "value");
  192. db->Flush(FlushOptions());
  193. }
  194. auto l0_files = collector->GetFlushedFiles();
  195. EXPECT_EQ(5, l0_files.size());
  196. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
  197. {"CompactFilesImpl:2", "CompactFilesTest.CapturingPendingFiles:0"},
  198. {"CompactFilesTest.CapturingPendingFiles:1", "CompactFilesImpl:3"},
  199. });
  200. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  201. // Start compacting files.
  202. ROCKSDB_NAMESPACE::port::Thread compaction_thread(
  203. [&] { EXPECT_OK(db->CompactFiles(CompactionOptions(), l0_files, 1)); });
  204. // In the meantime flush another file.
  205. TEST_SYNC_POINT("CompactFilesTest.CapturingPendingFiles:0");
  206. db->Put(WriteOptions(), "key5", "value");
  207. db->Flush(FlushOptions());
  208. TEST_SYNC_POINT("CompactFilesTest.CapturingPendingFiles:1");
  209. compaction_thread.join();
  210. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  211. delete db;
  212. // Make sure we can reopen the DB.
  213. s = DB::Open(options, db_name_, &db);
  214. ASSERT_TRUE(s.ok());
  215. assert(db);
  216. delete db;
  217. }
  218. TEST_F(CompactFilesTest, CompactionFilterWithGetSv) {
  219. class FilterWithGet : public CompactionFilter {
  220. public:
  221. bool Filter(int /*level*/, const Slice& /*key*/, const Slice& /*value*/,
  222. std::string* /*new_value*/,
  223. bool* /*value_changed*/) const override {
  224. if (db_ == nullptr) {
  225. return true;
  226. }
  227. std::string res;
  228. db_->Get(ReadOptions(), "", &res);
  229. return true;
  230. }
  231. void SetDB(DB* db) {
  232. db_ = db;
  233. }
  234. const char* Name() const override { return "FilterWithGet"; }
  235. private:
  236. DB* db_;
  237. };
  238. std::shared_ptr<FilterWithGet> cf(new FilterWithGet());
  239. Options options;
  240. options.create_if_missing = true;
  241. options.compaction_filter = cf.get();
  242. DB* db = nullptr;
  243. DestroyDB(db_name_, options);
  244. Status s = DB::Open(options, db_name_, &db);
  245. ASSERT_OK(s);
  246. cf->SetDB(db);
  247. // Write one L0 file
  248. db->Put(WriteOptions(), "K1", "V1");
  249. db->Flush(FlushOptions());
  250. // Compact all L0 files using CompactFiles
  251. ROCKSDB_NAMESPACE::ColumnFamilyMetaData meta;
  252. db->GetColumnFamilyMetaData(&meta);
  253. for (auto& file : meta.levels[0].files) {
  254. std::string fname = file.db_path + "/" + file.name;
  255. ASSERT_OK(
  256. db->CompactFiles(ROCKSDB_NAMESPACE::CompactionOptions(), {fname}, 0));
  257. }
  258. delete db;
  259. }
  260. TEST_F(CompactFilesTest, SentinelCompressionType) {
  261. if (!Zlib_Supported()) {
  262. fprintf(stderr, "zlib compression not supported, skip this test\n");
  263. return;
  264. }
  265. if (!Snappy_Supported()) {
  266. fprintf(stderr, "snappy compression not supported, skip this test\n");
  267. return;
  268. }
  269. // Check that passing `CompressionType::kDisableCompressionOption` to
  270. // `CompactFiles` causes it to use the column family compression options.
  271. for (auto compaction_style :
  272. {CompactionStyle::kCompactionStyleLevel,
  273. CompactionStyle::kCompactionStyleUniversal,
  274. CompactionStyle::kCompactionStyleNone}) {
  275. DestroyDB(db_name_, Options());
  276. Options options;
  277. options.compaction_style = compaction_style;
  278. // L0: Snappy, L1: ZSTD, L2: Snappy
  279. options.compression_per_level = {CompressionType::kSnappyCompression,
  280. CompressionType::kZlibCompression,
  281. CompressionType::kSnappyCompression};
  282. options.create_if_missing = true;
  283. FlushedFileCollector* collector = new FlushedFileCollector();
  284. options.listeners.emplace_back(collector);
  285. DB* db = nullptr;
  286. ASSERT_OK(DB::Open(options, db_name_, &db));
  287. db->Put(WriteOptions(), "key", "val");
  288. db->Flush(FlushOptions());
  289. auto l0_files = collector->GetFlushedFiles();
  290. ASSERT_EQ(1, l0_files.size());
  291. // L0->L1 compaction, so output should be ZSTD-compressed
  292. CompactionOptions compaction_opts;
  293. compaction_opts.compression = CompressionType::kDisableCompressionOption;
  294. ASSERT_OK(db->CompactFiles(compaction_opts, l0_files, 1));
  295. ROCKSDB_NAMESPACE::TablePropertiesCollection all_tables_props;
  296. ASSERT_OK(db->GetPropertiesOfAllTables(&all_tables_props));
  297. for (const auto& name_and_table_props : all_tables_props) {
  298. ASSERT_EQ(CompressionTypeToString(CompressionType::kZlibCompression),
  299. name_and_table_props.second->compression_name);
  300. }
  301. delete db;
  302. }
  303. }
  304. TEST_F(CompactFilesTest, GetCompactionJobInfo) {
  305. Options options;
  306. options.create_if_missing = true;
  307. // Disable RocksDB background compaction.
  308. options.compaction_style = kCompactionStyleNone;
  309. options.level0_slowdown_writes_trigger = 1000;
  310. options.level0_stop_writes_trigger = 1000;
  311. options.write_buffer_size = 65536;
  312. options.max_write_buffer_number = 2;
  313. options.compression = kNoCompression;
  314. options.max_compaction_bytes = 5000;
  315. // Add listener
  316. FlushedFileCollector* collector = new FlushedFileCollector();
  317. options.listeners.emplace_back(collector);
  318. DB* db = nullptr;
  319. DestroyDB(db_name_, options);
  320. Status s = DB::Open(options, db_name_, &db);
  321. assert(s.ok());
  322. assert(db);
  323. // create couple files
  324. for (int i = 0; i < 500; ++i) {
  325. db->Put(WriteOptions(), ToString(i), std::string(1000, 'a' + (i % 26)));
  326. }
  327. reinterpret_cast<DBImpl*>(db)->TEST_WaitForFlushMemTable();
  328. auto l0_files_1 = collector->GetFlushedFiles();
  329. CompactionOptions co;
  330. co.compression = CompressionType::kLZ4Compression;
  331. CompactionJobInfo compaction_job_info{};
  332. ASSERT_OK(
  333. db->CompactFiles(co, l0_files_1, 0, -1, nullptr, &compaction_job_info));
  334. ASSERT_EQ(compaction_job_info.base_input_level, 0);
  335. ASSERT_EQ(compaction_job_info.cf_id, db->DefaultColumnFamily()->GetID());
  336. ASSERT_EQ(compaction_job_info.cf_name, db->DefaultColumnFamily()->GetName());
  337. ASSERT_EQ(compaction_job_info.compaction_reason,
  338. CompactionReason::kManualCompaction);
  339. ASSERT_EQ(compaction_job_info.compression, CompressionType::kLZ4Compression);
  340. ASSERT_EQ(compaction_job_info.output_level, 0);
  341. ASSERT_OK(compaction_job_info.status);
  342. // no assertion failure
  343. delete db;
  344. }
  345. } // namespace ROCKSDB_NAMESPACE
  346. int main(int argc, char** argv) {
  347. ::testing::InitGoogleTest(&argc, argv);
  348. return RUN_ALL_TESTS();
  349. }
  350. #else
  351. #include <stdio.h>
  // ROCKSDB_LITE stand-in for the test binary: writes a one-line notice to
  // stderr and exits successfully without running any test.
  int main(int /*argc*/, char** /*argv*/) {
    fputs("SKIPPED as DBImpl::CompactFiles is not supported in ROCKSDB_LITE\n",
          stderr);
    return 0;
  }
  357. #endif // !ROCKSDB_LITE