// db_with_timestamp_compaction_test.cc
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  9. #include "db/compaction/compaction.h"
  10. #include "db/db_test_util.h"
  11. #include "port/stack_trace.h"
  12. #include "test_util/testutil.h"
  13. namespace ROCKSDB_NAMESPACE {
  14. namespace {
  15. std::string Key1(uint64_t key) {
  16. std::string ret;
  17. PutFixed64(&ret, key);
  18. std::reverse(ret.begin(), ret.end());
  19. return ret;
  20. }
  21. std::string Timestamp(uint64_t ts) {
  22. std::string ret;
  23. PutFixed64(&ret, ts);
  24. return ret;
  25. }
  26. } // anonymous namespace
  27. class TimestampCompatibleCompactionTest : public DBTestBase {
  28. public:
  29. TimestampCompatibleCompactionTest()
  30. : DBTestBase("ts_compatible_compaction_test", /*env_do_fsync=*/true) {}
  31. std::string Get(const std::string& key, uint64_t ts) {
  32. ReadOptions read_opts;
  33. std::string ts_str = Timestamp(ts);
  34. Slice ts_slice = ts_str;
  35. read_opts.timestamp = &ts_slice;
  36. std::string value;
  37. Status s = db_->Get(read_opts, key, &value);
  38. if (s.IsNotFound()) {
  39. value.assign("NOT_FOUND");
  40. } else if (!s.ok()) {
  41. value.assign(s.ToString());
  42. }
  43. return value;
  44. }
  45. };
  46. TEST_F(TimestampCompatibleCompactionTest, UserKeyCrossFileBoundary) {
  47. Options options = CurrentOptions();
  48. options.env = env_;
  49. options.compaction_style = kCompactionStyleLevel;
  50. options.comparator = test::BytewiseComparatorWithU64TsWrapper();
  51. options.level0_file_num_compaction_trigger = 3;
  52. constexpr size_t kNumKeysPerFile = 101;
  53. options.memtable_factory.reset(
  54. test::NewSpecialSkipListFactory(kNumKeysPerFile));
  55. DestroyAndReopen(options);
  56. SyncPoint::GetInstance()->DisableProcessing();
  57. SyncPoint::GetInstance()->ClearAllCallBacks();
  58. SyncPoint::GetInstance()->SetCallBack(
  59. "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
  60. const auto* compaction = static_cast<Compaction*>(arg);
  61. ASSERT_NE(nullptr, compaction);
  62. ASSERT_EQ(0, compaction->start_level());
  63. ASSERT_EQ(1, compaction->num_input_levels());
  64. // Check that all 3 L0 ssts are picked for level compaction.
  65. ASSERT_EQ(3, compaction->num_input_files(0));
  66. });
  67. SyncPoint::GetInstance()->EnableProcessing();
  68. // Write a L0 with keys 0, 1, ..., 99 with ts from 100 to 199.
  69. uint64_t ts = 100;
  70. uint64_t key = 0;
  71. WriteOptions write_opts;
  72. for (; key < kNumKeysPerFile - 1; ++key, ++ts) {
  73. std::string ts_str = Timestamp(ts);
  74. ASSERT_OK(
  75. db_->Put(write_opts, Key1(key), ts_str, "foo_" + std::to_string(key)));
  76. }
  77. // Write another L0 with keys 99 with newer ts.
  78. ASSERT_OK(Flush());
  79. uint64_t saved_read_ts1 = ts++;
  80. key = 99;
  81. for (int i = 0; i < 4; ++i, ++ts) {
  82. std::string ts_str = Timestamp(ts);
  83. ASSERT_OK(
  84. db_->Put(write_opts, Key1(key), ts_str, "bar_" + std::to_string(key)));
  85. }
  86. ASSERT_OK(Flush());
  87. uint64_t saved_read_ts2 = ts++;
  88. // Write another L0 with keys 99, 100, 101, ..., 150
  89. for (; key <= 150; ++key, ++ts) {
  90. std::string ts_str = Timestamp(ts);
  91. ASSERT_OK(
  92. db_->Put(write_opts, Key1(key), ts_str, "foo1_" + std::to_string(key)));
  93. }
  94. ASSERT_OK(Flush());
  95. // Wait for compaction to finish
  96. ASSERT_OK(dbfull()->TEST_WaitForCompact());
  97. uint64_t read_ts = ts;
  98. ASSERT_EQ("foo_99", Get(Key1(99), saved_read_ts1));
  99. ASSERT_EQ("bar_99", Get(Key1(99), saved_read_ts2));
  100. ASSERT_EQ("foo1_99", Get(Key1(99), read_ts));
  101. SyncPoint::GetInstance()->ClearAllCallBacks();
  102. SyncPoint::GetInstance()->DisableProcessing();
  103. }
  104. TEST_F(TimestampCompatibleCompactionTest, MultipleSubCompactions) {
  105. Options options = CurrentOptions();
  106. options.env = env_;
  107. options.compaction_style = kCompactionStyleUniversal;
  108. options.comparator = test::BytewiseComparatorWithU64TsWrapper();
  109. options.level0_file_num_compaction_trigger = 3;
  110. options.max_subcompactions = 3;
  111. options.target_file_size_base = 1024;
  112. options.statistics = CreateDBStatistics();
  113. DestroyAndReopen(options);
  114. uint64_t ts = 100;
  115. uint64_t key = 0;
  116. WriteOptions write_opts;
  117. // Write keys 0, 1, ..., 499 with ts from 100 to 599.
  118. {
  119. for (; key <= 499; ++key, ++ts) {
  120. std::string ts_str = Timestamp(ts);
  121. ASSERT_OK(db_->Put(write_opts, Key1(key), ts_str,
  122. "foo_" + std::to_string(key)));
  123. }
  124. }
  125. // Write keys 500, ..., 999 with ts from 600 to 1099.
  126. {
  127. for (; key <= 999; ++key, ++ts) {
  128. std::string ts_str = Timestamp(ts);
  129. ASSERT_OK(db_->Put(write_opts, Key1(key), ts_str,
  130. "foo_" + std::to_string(key)));
  131. }
  132. ASSERT_OK(Flush());
  133. }
  134. // Wait for compaction to finish
  135. {
  136. ASSERT_OK(dbfull()->RunManualCompaction(
  137. static_cast_with_check<ColumnFamilyHandleImpl>(
  138. db_->DefaultColumnFamily())
  139. ->cfd(),
  140. 0 /* input_level */, 1 /* output_level */, CompactRangeOptions(),
  141. nullptr /* begin */, nullptr /* end */, true /* exclusive */,
  142. true /* disallow_trivial_move */,
  143. std::numeric_limits<uint64_t>::max() /* max_file_num_to_ignore */,
  144. "" /*trim_ts*/));
  145. }
  146. // Check stats to make sure multiple subcompactions were scheduled for
  147. // boundaries not to be nullptr.
  148. {
  149. HistogramData num_sub_compactions;
  150. options.statistics->histogramData(NUM_SUBCOMPACTIONS_SCHEDULED,
  151. &num_sub_compactions);
  152. ASSERT_GT(num_sub_compactions.sum, 1);
  153. }
  154. for (key = 0; key <= 999; ++key) {
  155. ASSERT_EQ("foo_" + std::to_string(key), Get(Key1(key), ts));
  156. }
  157. }
  158. class TestFilePartitioner : public SstPartitioner {
  159. public:
  160. explicit TestFilePartitioner() = default;
  161. ~TestFilePartitioner() override = default;
  162. const char* Name() const override { return "TestFilePartitioner"; }
  163. PartitionerResult ShouldPartition(
  164. const PartitionerRequest& /*request*/) override {
  165. return PartitionerResult::kRequired;
  166. }
  167. bool CanDoTrivialMove(const Slice& /*smallest_user_key*/,
  168. const Slice& /*largest_user_key*/) override {
  169. return false;
  170. }
  171. };
  172. class TestFilePartitionerFactory : public SstPartitionerFactory {
  173. public:
  174. explicit TestFilePartitionerFactory() = default;
  175. std::unique_ptr<SstPartitioner> CreatePartitioner(
  176. const SstPartitioner::Context& /*context*/) const override {
  177. std::unique_ptr<SstPartitioner> ret =
  178. std::make_unique<TestFilePartitioner>();
  179. return ret;
  180. }
  181. const char* Name() const override { return "TestFilePartitionerFactory"; }
  182. };
  183. TEST_F(TimestampCompatibleCompactionTest, CompactFilesRangeCheckL0) {
  184. Options options = CurrentOptions();
  185. options.env = env_;
  186. options.sst_partitioner_factory =
  187. std::make_shared<TestFilePartitionerFactory>();
  188. options.comparator = test::BytewiseComparatorWithU64TsWrapper();
  189. options.disable_auto_compactions = true;
  190. DestroyAndReopen(options);
  191. constexpr int kNumFiles = 10;
  192. constexpr int kKeysPerFile = 2;
  193. const std::string user_key = "foo";
  194. constexpr uint64_t start_ts = 10000;
  195. uint64_t cur_ts = start_ts;
  196. for (int k = 0; k < kNumFiles; ++k) {
  197. for (int i = 0; i < kKeysPerFile; ++i) {
  198. ASSERT_OK(db_->Put(WriteOptions(), user_key, Timestamp(cur_ts),
  199. "v" + std::to_string(i)));
  200. ++cur_ts;
  201. }
  202. ASSERT_OK(db_->Flush(FlushOptions()));
  203. }
  204. std::vector<std::string> input_files{};
  205. {
  206. std::vector<std::string> files;
  207. ASSERT_OK(env_->GetChildren(dbname_, &files));
  208. for (const auto& f : files) {
  209. uint64_t file_num = 0;
  210. FileType file_type = FileType::kWalFile;
  211. if (!ParseFileName(f, &file_num, &file_type) ||
  212. file_type != FileType::kTableFile) {
  213. continue;
  214. }
  215. input_files.emplace_back(f);
  216. }
  217. // sorting here by name, which also happens to sort by generation date.
  218. std::sort(input_files.begin(), input_files.end());
  219. assert(kNumFiles == input_files.size());
  220. std::vector<std::string> tmp;
  221. tmp.emplace_back(input_files[input_files.size() / 2]);
  222. input_files.swap(tmp);
  223. }
  224. {
  225. std::vector<std::string> output_file_names;
  226. CompactionJobInfo compaction_job_info;
  227. ASSERT_OK(db_->CompactFiles(CompactionOptions(), input_files,
  228. /*output_level=*/1, /*output_path_id=*/-1,
  229. &output_file_names, &compaction_job_info));
  230. // We expect the L0 files older than the original provided input were all
  231. // included in the compaction.
  232. ASSERT_EQ(static_cast<size_t>(kNumFiles / 2 + 1),
  233. compaction_job_info.input_files.size());
  234. }
  235. }
  236. TEST_F(TimestampCompatibleCompactionTest, CompactFilesRangeCheckL1) {
  237. Options options = CurrentOptions();
  238. options.env = env_;
  239. options.sst_partitioner_factory =
  240. std::make_shared<TestFilePartitionerFactory>();
  241. options.comparator = test::BytewiseComparatorWithU64TsWrapper();
  242. constexpr int kNumFiles = 4;
  243. options.level0_file_num_compaction_trigger = kNumFiles;
  244. DestroyAndReopen(options);
  245. constexpr int kKeysPerFile = 2;
  246. const std::string user_key = "foo";
  247. constexpr uint64_t start_ts = 10000;
  248. uint64_t cur_ts = start_ts;
  249. // Generate some initial files in both L0 and L1.
  250. for (int k = 0; k < kNumFiles; ++k) {
  251. for (int i = 0; i < kKeysPerFile; ++i) {
  252. ASSERT_OK(db_->Put(WriteOptions(), user_key, Timestamp(cur_ts),
  253. "v" + std::to_string(i)));
  254. ++cur_ts;
  255. }
  256. ASSERT_OK(db_->Flush(FlushOptions()));
  257. }
  258. ASSERT_OK(dbfull()->TEST_WaitForCompact());
  259. ASSERT_EQ(0, NumTableFilesAtLevel(/*level=*/0, /*cf=*/0));
  260. ASSERT_EQ(kNumFiles * kKeysPerFile,
  261. NumTableFilesAtLevel(/*level=*/1, /*cf=*/0));
  262. constexpr int additional_l0s = 2;
  263. for (int i = 0; i < additional_l0s; ++i, ++cur_ts) {
  264. ASSERT_OK(db_->Put(WriteOptions(), user_key, Timestamp(cur_ts), "v"));
  265. ASSERT_OK(db_->Flush(FlushOptions()));
  266. }
  267. ASSERT_EQ(additional_l0s, NumTableFilesAtLevel(/*level=*/0, /*cf=*/0));
  268. std::vector<std::string> inputs;
  269. {
  270. std::vector<LiveFileMetaData> fmetas;
  271. db_->GetLiveFilesMetaData(&fmetas);
  272. bool included_one_l1 = false;
  273. for (const auto& meta : fmetas) {
  274. if (meta.level == 0) {
  275. inputs.emplace_back(meta.relative_filename);
  276. } else if (!included_one_l1) {
  277. inputs.emplace_back(meta.relative_filename);
  278. included_one_l1 = true;
  279. }
  280. }
  281. }
  282. ASSERT_EQ(static_cast<size_t>(3), inputs.size());
  283. {
  284. std::vector<std::string> output_file_names;
  285. CompactionJobInfo compaction_job_info;
  286. ASSERT_OK(db_->CompactFiles(CompactionOptions(), inputs, /*output_level=*/1,
  287. /*output_path_id=*/-1, &output_file_names,
  288. &compaction_job_info));
  289. ASSERT_EQ(kNumFiles * kKeysPerFile + 2, output_file_names.size());
  290. ASSERT_EQ(kNumFiles * kKeysPerFile + 2,
  291. static_cast<int>(compaction_job_info.input_files.size()));
  292. }
  293. }
  294. TEST_F(TimestampCompatibleCompactionTest, EmptyCompactionOutput) {
  295. Options options = CurrentOptions();
  296. options.env = env_;
  297. options.comparator = test::BytewiseComparatorWithU64TsWrapper();
  298. DestroyAndReopen(options);
  299. std::string ts_str = Timestamp(1);
  300. WriteOptions wopts;
  301. ASSERT_OK(
  302. db_->DeleteRange(wopts, db_->DefaultColumnFamily(), "k1", "k3", ts_str));
  303. ASSERT_OK(Flush());
  304. ts_str = Timestamp(3);
  305. Slice ts = ts_str;
  306. CompactRangeOptions cro;
  307. // range tombstone will be dropped during compaction
  308. cro.full_history_ts_low = &ts;
  309. cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
  310. ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
  311. }
  312. } // namespace ROCKSDB_NAMESPACE
  313. int main(int argc, char** argv) {
  314. ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  315. ::testing::InitGoogleTest(&argc, argv);
  316. return RUN_ALL_TESTS();
  317. }