merge_helper_test.cc 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include <algorithm>
  6. #include <string>
  7. #include <vector>
  8. #include "db/merge_helper.h"
  9. #include "rocksdb/comparator.h"
  10. #include "test_util/testharness.h"
  11. #include "test_util/testutil.h"
  12. #include "util/coding.h"
  13. #include "utilities/merge_operators.h"
  14. namespace ROCKSDB_NAMESPACE {
  15. class MergeHelperTest : public testing::Test {
  16. public:
  17. MergeHelperTest() { env_ = Env::Default(); }
  18. ~MergeHelperTest() override = default;
  19. Status Run(SequenceNumber stop_before, bool at_bottom,
  20. SequenceNumber latest_snapshot = 0) {
  21. iter_.reset(new test::VectorIterator(ks_, vs_));
  22. iter_->SeekToFirst();
  23. merge_helper_.reset(new MergeHelper(env_, BytewiseComparator(),
  24. merge_op_.get(), filter_.get(), nullptr,
  25. false, latest_snapshot));
  26. return merge_helper_->MergeUntil(iter_.get(), nullptr /* range_del_agg */,
  27. stop_before, at_bottom);
  28. }
  29. void AddKeyVal(const std::string& user_key, const SequenceNumber& seq,
  30. const ValueType& t, const std::string& val,
  31. bool corrupt = false) {
  32. InternalKey ikey(user_key, seq, t);
  33. if (corrupt) {
  34. test::CorruptKeyType(&ikey);
  35. }
  36. ks_.push_back(ikey.Encode().ToString());
  37. vs_.push_back(val);
  38. }
  39. Env* env_;
  40. std::unique_ptr<test::VectorIterator> iter_;
  41. std::shared_ptr<MergeOperator> merge_op_;
  42. std::unique_ptr<MergeHelper> merge_helper_;
  43. std::vector<std::string> ks_;
  44. std::vector<std::string> vs_;
  45. std::unique_ptr<test::FilterNumber> filter_;
  46. };
  47. // If MergeHelper encounters a new key on the last level, we know that
  48. // the key has no more history and it can merge keys.
  49. TEST_F(MergeHelperTest, MergeAtBottomSuccess) {
  50. merge_op_ = MergeOperators::CreateUInt64AddOperator();
  51. AddKeyVal("a", 20, kTypeMerge, test::EncodeInt(1U));
  52. AddKeyVal("a", 10, kTypeMerge, test::EncodeInt(3U));
  53. AddKeyVal("b", 10, kTypeMerge, test::EncodeInt(4U)); // <- iter_ after merge
  54. ASSERT_TRUE(Run(0, true).ok());
  55. ASSERT_EQ(ks_[2], iter_->key());
  56. ASSERT_EQ(test::KeyStr("a", 20, kTypeValue), merge_helper_->keys()[0]);
  57. ASSERT_EQ(test::EncodeInt(4U), merge_helper_->values()[0]);
  58. ASSERT_EQ(1U, merge_helper_->keys().size());
  59. ASSERT_EQ(1U, merge_helper_->values().size());
  60. }
  61. // Merging with a value results in a successful merge.
  62. TEST_F(MergeHelperTest, MergeValue) {
  63. merge_op_ = MergeOperators::CreateUInt64AddOperator();
  64. AddKeyVal("a", 40, kTypeMerge, test::EncodeInt(1U));
  65. AddKeyVal("a", 30, kTypeMerge, test::EncodeInt(3U));
  66. AddKeyVal("a", 20, kTypeValue, test::EncodeInt(4U)); // <- iter_ after merge
  67. AddKeyVal("a", 10, kTypeMerge, test::EncodeInt(1U));
  68. ASSERT_TRUE(Run(0, false).ok());
  69. ASSERT_EQ(ks_[3], iter_->key());
  70. ASSERT_EQ(test::KeyStr("a", 40, kTypeValue), merge_helper_->keys()[0]);
  71. ASSERT_EQ(test::EncodeInt(8U), merge_helper_->values()[0]);
  72. ASSERT_EQ(1U, merge_helper_->keys().size());
  73. ASSERT_EQ(1U, merge_helper_->values().size());
  74. }
  75. // Merging stops before a snapshot.
  76. TEST_F(MergeHelperTest, SnapshotBeforeValue) {
  77. merge_op_ = MergeOperators::CreateUInt64AddOperator();
  78. AddKeyVal("a", 50, kTypeMerge, test::EncodeInt(1U));
  79. AddKeyVal("a", 40, kTypeMerge, test::EncodeInt(3U)); // <- iter_ after merge
  80. AddKeyVal("a", 30, kTypeMerge, test::EncodeInt(1U));
  81. AddKeyVal("a", 20, kTypeValue, test::EncodeInt(4U));
  82. AddKeyVal("a", 10, kTypeMerge, test::EncodeInt(1U));
  83. ASSERT_TRUE(Run(31, true).IsMergeInProgress());
  84. ASSERT_EQ(ks_[2], iter_->key());
  85. ASSERT_EQ(test::KeyStr("a", 50, kTypeMerge), merge_helper_->keys()[0]);
  86. ASSERT_EQ(test::EncodeInt(4U), merge_helper_->values()[0]);
  87. ASSERT_EQ(1U, merge_helper_->keys().size());
  88. ASSERT_EQ(1U, merge_helper_->values().size());
  89. }
  90. // MergeHelper preserves the operand stack for merge operators that
  91. // cannot do a partial merge.
  92. TEST_F(MergeHelperTest, NoPartialMerge) {
  93. merge_op_ = MergeOperators::CreateStringAppendTESTOperator();
  94. AddKeyVal("a", 50, kTypeMerge, "v2");
  95. AddKeyVal("a", 40, kTypeMerge, "v"); // <- iter_ after merge
  96. AddKeyVal("a", 30, kTypeMerge, "v");
  97. ASSERT_TRUE(Run(31, true).IsMergeInProgress());
  98. ASSERT_EQ(ks_[2], iter_->key());
  99. ASSERT_EQ(test::KeyStr("a", 40, kTypeMerge), merge_helper_->keys()[0]);
  100. ASSERT_EQ("v", merge_helper_->values()[0]);
  101. ASSERT_EQ(test::KeyStr("a", 50, kTypeMerge), merge_helper_->keys()[1]);
  102. ASSERT_EQ("v2", merge_helper_->values()[1]);
  103. ASSERT_EQ(2U, merge_helper_->keys().size());
  104. ASSERT_EQ(2U, merge_helper_->values().size());
  105. }
  106. // A single operand can not be merged.
  107. TEST_F(MergeHelperTest, SingleOperand) {
  108. merge_op_ = MergeOperators::CreateUInt64AddOperator();
  109. AddKeyVal("a", 50, kTypeMerge, test::EncodeInt(1U));
  110. ASSERT_TRUE(Run(31, false).IsMergeInProgress());
  111. ASSERT_FALSE(iter_->Valid());
  112. ASSERT_EQ(test::KeyStr("a", 50, kTypeMerge), merge_helper_->keys()[0]);
  113. ASSERT_EQ(test::EncodeInt(1U), merge_helper_->values()[0]);
  114. ASSERT_EQ(1U, merge_helper_->keys().size());
  115. ASSERT_EQ(1U, merge_helper_->values().size());
  116. }
  117. // Merging with a deletion turns the deletion into a value
  118. TEST_F(MergeHelperTest, MergeDeletion) {
  119. merge_op_ = MergeOperators::CreateUInt64AddOperator();
  120. AddKeyVal("a", 30, kTypeMerge, test::EncodeInt(3U));
  121. AddKeyVal("a", 20, kTypeDeletion, "");
  122. ASSERT_TRUE(Run(15, false).ok());
  123. ASSERT_FALSE(iter_->Valid());
  124. ASSERT_EQ(test::KeyStr("a", 30, kTypeValue), merge_helper_->keys()[0]);
  125. ASSERT_EQ(test::EncodeInt(3U), merge_helper_->values()[0]);
  126. ASSERT_EQ(1U, merge_helper_->keys().size());
  127. ASSERT_EQ(1U, merge_helper_->values().size());
  128. }
  129. // The merge helper stops upon encountering a corrupt key
  130. TEST_F(MergeHelperTest, CorruptKey) {
  131. merge_op_ = MergeOperators::CreateUInt64AddOperator();
  132. AddKeyVal("a", 30, kTypeMerge, test::EncodeInt(3U));
  133. AddKeyVal("a", 25, kTypeMerge, test::EncodeInt(1U));
  134. // Corrupt key
  135. AddKeyVal("a", 20, kTypeDeletion, "", true); // <- iter_ after merge
  136. ASSERT_TRUE(Run(15, false).IsMergeInProgress());
  137. ASSERT_EQ(ks_[2], iter_->key());
  138. ASSERT_EQ(test::KeyStr("a", 30, kTypeMerge), merge_helper_->keys()[0]);
  139. ASSERT_EQ(test::EncodeInt(4U), merge_helper_->values()[0]);
  140. ASSERT_EQ(1U, merge_helper_->keys().size());
  141. ASSERT_EQ(1U, merge_helper_->values().size());
  142. }
  143. // The compaction filter is called on every merge operand
  144. TEST_F(MergeHelperTest, FilterMergeOperands) {
  145. merge_op_ = MergeOperators::CreateUInt64AddOperator();
  146. filter_.reset(new test::FilterNumber(5U));
  147. AddKeyVal("a", 30, kTypeMerge, test::EncodeInt(3U));
  148. AddKeyVal("a", 29, kTypeMerge, test::EncodeInt(5U)); // Filtered
  149. AddKeyVal("a", 28, kTypeMerge, test::EncodeInt(3U));
  150. AddKeyVal("a", 27, kTypeMerge, test::EncodeInt(1U));
  151. AddKeyVal("a", 26, kTypeMerge, test::EncodeInt(5U)); // Filtered
  152. AddKeyVal("a", 25, kTypeValue, test::EncodeInt(1U));
  153. ASSERT_TRUE(Run(15, false).ok());
  154. ASSERT_FALSE(iter_->Valid());
  155. MergeOutputIterator merge_output_iter(merge_helper_.get());
  156. merge_output_iter.SeekToFirst();
  157. ASSERT_EQ(test::KeyStr("a", 30, kTypeValue),
  158. merge_output_iter.key().ToString());
  159. ASSERT_EQ(test::EncodeInt(8U), merge_output_iter.value().ToString());
  160. merge_output_iter.Next();
  161. ASSERT_FALSE(merge_output_iter.Valid());
  162. }
  163. TEST_F(MergeHelperTest, FilterAllMergeOperands) {
  164. merge_op_ = MergeOperators::CreateUInt64AddOperator();
  165. filter_.reset(new test::FilterNumber(5U));
  166. AddKeyVal("a", 30, kTypeMerge, test::EncodeInt(5U));
  167. AddKeyVal("a", 29, kTypeMerge, test::EncodeInt(5U));
  168. AddKeyVal("a", 28, kTypeMerge, test::EncodeInt(5U));
  169. AddKeyVal("a", 27, kTypeMerge, test::EncodeInt(5U));
  170. AddKeyVal("a", 26, kTypeMerge, test::EncodeInt(5U));
  171. AddKeyVal("a", 25, kTypeMerge, test::EncodeInt(5U));
  172. // filtered out all
  173. ASSERT_TRUE(Run(15, false).ok());
  174. ASSERT_FALSE(iter_->Valid());
  175. MergeOutputIterator merge_output_iter(merge_helper_.get());
  176. merge_output_iter.SeekToFirst();
  177. ASSERT_FALSE(merge_output_iter.Valid());
  178. // we have one operand that will survive because it's a delete
  179. AddKeyVal("a", 24, kTypeDeletion, test::EncodeInt(5U));
  180. AddKeyVal("b", 23, kTypeValue, test::EncodeInt(5U));
  181. ASSERT_TRUE(Run(15, true).ok());
  182. merge_output_iter = MergeOutputIterator(merge_helper_.get());
  183. ASSERT_TRUE(iter_->Valid());
  184. merge_output_iter.SeekToFirst();
  185. ASSERT_FALSE(merge_output_iter.Valid());
  186. // when all merge operands are filtered out, we leave the iterator pointing to
  187. // the Put/Delete that survived
  188. ASSERT_EQ(test::KeyStr("a", 24, kTypeDeletion), iter_->key().ToString());
  189. ASSERT_EQ(test::EncodeInt(5U), iter_->value().ToString());
  190. }
  191. // Make sure that merge operands are filtered at the beginning
  192. TEST_F(MergeHelperTest, FilterFirstMergeOperand) {
  193. merge_op_ = MergeOperators::CreateUInt64AddOperator();
  194. filter_.reset(new test::FilterNumber(5U));
  195. AddKeyVal("a", 31, kTypeMerge, test::EncodeInt(5U)); // Filtered
  196. AddKeyVal("a", 30, kTypeMerge, test::EncodeInt(5U)); // Filtered
  197. AddKeyVal("a", 29, kTypeMerge, test::EncodeInt(2U));
  198. AddKeyVal("a", 28, kTypeMerge, test::EncodeInt(1U));
  199. AddKeyVal("a", 27, kTypeMerge, test::EncodeInt(3U));
  200. AddKeyVal("a", 26, kTypeMerge, test::EncodeInt(5U)); // Filtered
  201. AddKeyVal("a", 25, kTypeMerge, test::EncodeInt(5U)); // Filtered
  202. AddKeyVal("b", 24, kTypeValue, test::EncodeInt(5U)); // next user key
  203. ASSERT_OK(Run(15, true));
  204. ASSERT_TRUE(iter_->Valid());
  205. MergeOutputIterator merge_output_iter(merge_helper_.get());
  206. merge_output_iter.SeekToFirst();
  207. // sequence number is 29 here, because the first merge operand got filtered
  208. // out
  209. ASSERT_EQ(test::KeyStr("a", 29, kTypeValue),
  210. merge_output_iter.key().ToString());
  211. ASSERT_EQ(test::EncodeInt(6U), merge_output_iter.value().ToString());
  212. merge_output_iter.Next();
  213. ASSERT_FALSE(merge_output_iter.Valid());
  214. // make sure that we're passing user keys into the filter
  215. ASSERT_EQ("a", filter_->last_merge_operand_key());
  216. }
  217. // Make sure that merge operands are not filtered out if there's a snapshot
  218. // pointing at them
  219. TEST_F(MergeHelperTest, DontFilterMergeOperandsBeforeSnapshotTest) {
  220. merge_op_ = MergeOperators::CreateUInt64AddOperator();
  221. filter_.reset(new test::FilterNumber(5U));
  222. AddKeyVal("a", 31, kTypeMerge, test::EncodeInt(5U));
  223. AddKeyVal("a", 30, kTypeMerge, test::EncodeInt(5U));
  224. AddKeyVal("a", 29, kTypeMerge, test::EncodeInt(2U));
  225. AddKeyVal("a", 28, kTypeMerge, test::EncodeInt(1U));
  226. AddKeyVal("a", 27, kTypeMerge, test::EncodeInt(3U));
  227. AddKeyVal("a", 26, kTypeMerge, test::EncodeInt(5U));
  228. AddKeyVal("a", 25, kTypeMerge, test::EncodeInt(5U));
  229. AddKeyVal("b", 24, kTypeValue, test::EncodeInt(5U));
  230. ASSERT_OK(Run(15, true, 32));
  231. ASSERT_TRUE(iter_->Valid());
  232. MergeOutputIterator merge_output_iter(merge_helper_.get());
  233. merge_output_iter.SeekToFirst();
  234. ASSERT_EQ(test::KeyStr("a", 31, kTypeValue),
  235. merge_output_iter.key().ToString());
  236. ASSERT_EQ(test::EncodeInt(26U), merge_output_iter.value().ToString());
  237. merge_output_iter.Next();
  238. ASSERT_FALSE(merge_output_iter.Valid());
  239. }
  240. } // namespace ROCKSDB_NAMESPACE
  241. int main(int argc, char** argv) {
  242. ::testing::InitGoogleTest(&argc, argv);
  243. return RUN_ALL_TESTS();
  244. }