// merge_helper_test.cc (12 KB)
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include "db/merge_helper.h"
  6. #include <algorithm>
  7. #include <string>
  8. #include <vector>
  9. #include "db/dbformat.h"
  10. #include "rocksdb/comparator.h"
  11. #include "test_util/testharness.h"
  12. #include "test_util/testutil.h"
  13. #include "util/coding.h"
  14. #include "util/vector_iterator.h"
  15. #include "utilities/merge_operators.h"
  16. namespace ROCKSDB_NAMESPACE {
  17. class MergeHelperTest : public testing::Test {
  18. public:
  19. MergeHelperTest() : icmp_(BytewiseComparator()) { env_ = Env::Default(); }
  20. ~MergeHelperTest() override = default;
  21. Status Run(SequenceNumber stop_before, bool at_bottom,
  22. SequenceNumber latest_snapshot = 0) {
  23. iter_.reset(new VectorIterator(ks_, vs_, &icmp_));
  24. iter_->SeekToFirst();
  25. merge_helper_.reset(new MergeHelper(env_, icmp_.user_comparator(),
  26. merge_op_.get(), filter_.get(), nullptr,
  27. false, latest_snapshot));
  28. return merge_helper_->MergeUntil(
  29. iter_.get(), nullptr /* range_del_agg */, stop_before, at_bottom,
  30. false /* allow_data_in_errors */, nullptr /* blob_fetcher */,
  31. nullptr /* full_history_ts_low */, nullptr /* prefetch_buffers */,
  32. nullptr /* c_iter_stats */);
  33. }
  34. void AddKeyVal(const std::string& user_key, const SequenceNumber& seq,
  35. const ValueType& t, const std::string& val,
  36. bool corrupt = false) {
  37. InternalKey ikey(user_key, seq, t);
  38. if (corrupt) {
  39. test::CorruptKeyType(&ikey);
  40. }
  41. ks_.push_back(ikey.Encode().ToString());
  42. vs_.push_back(val);
  43. }
  44. Env* env_;
  45. InternalKeyComparator icmp_;
  46. std::unique_ptr<VectorIterator> iter_;
  47. std::shared_ptr<MergeOperator> merge_op_;
  48. std::unique_ptr<MergeHelper> merge_helper_;
  49. std::vector<std::string> ks_;
  50. std::vector<std::string> vs_;
  51. std::unique_ptr<test::FilterNumber> filter_;
  52. };
  53. // If MergeHelper encounters a new key on the last level, we know that
  54. // the key has no more history and it can merge keys.
  55. TEST_F(MergeHelperTest, MergeAtBottomSuccess) {
  56. merge_op_ = MergeOperators::CreateUInt64AddOperator();
  57. AddKeyVal("a", 20, kTypeMerge, test::EncodeInt(1U));
  58. AddKeyVal("a", 10, kTypeMerge, test::EncodeInt(3U));
  59. AddKeyVal("b", 10, kTypeMerge, test::EncodeInt(4U)); // <- iter_ after merge
  60. ASSERT_TRUE(Run(0, true).ok());
  61. ASSERT_EQ(ks_[2], iter_->key());
  62. ASSERT_EQ(test::KeyStr("a", 20, kTypeValue), merge_helper_->keys()[0]);
  63. ASSERT_EQ(test::EncodeInt(4U), merge_helper_->values()[0]);
  64. ASSERT_EQ(1U, merge_helper_->keys().size());
  65. ASSERT_EQ(1U, merge_helper_->values().size());
  66. }
  67. // Merging with a value results in a successful merge.
  68. TEST_F(MergeHelperTest, MergeValue) {
  69. merge_op_ = MergeOperators::CreateUInt64AddOperator();
  70. AddKeyVal("a", 40, kTypeMerge, test::EncodeInt(1U));
  71. AddKeyVal("a", 30, kTypeMerge, test::EncodeInt(3U));
  72. AddKeyVal("a", 20, kTypeValue, test::EncodeInt(4U)); // <- iter_ after merge
  73. AddKeyVal("a", 10, kTypeMerge, test::EncodeInt(1U));
  74. ASSERT_TRUE(Run(0, false).ok());
  75. ASSERT_EQ(ks_[3], iter_->key());
  76. ASSERT_EQ(test::KeyStr("a", 40, kTypeValue), merge_helper_->keys()[0]);
  77. ASSERT_EQ(test::EncodeInt(8U), merge_helper_->values()[0]);
  78. ASSERT_EQ(1U, merge_helper_->keys().size());
  79. ASSERT_EQ(1U, merge_helper_->values().size());
  80. }
  81. // Merging stops before a snapshot.
  82. TEST_F(MergeHelperTest, SnapshotBeforeValue) {
  83. merge_op_ = MergeOperators::CreateUInt64AddOperator();
  84. AddKeyVal("a", 50, kTypeMerge, test::EncodeInt(1U));
  85. AddKeyVal("a", 40, kTypeMerge, test::EncodeInt(3U)); // <- iter_ after merge
  86. AddKeyVal("a", 30, kTypeMerge, test::EncodeInt(1U));
  87. AddKeyVal("a", 20, kTypeValue, test::EncodeInt(4U));
  88. AddKeyVal("a", 10, kTypeMerge, test::EncodeInt(1U));
  89. ASSERT_TRUE(Run(31, true).IsMergeInProgress());
  90. ASSERT_EQ(ks_[2], iter_->key());
  91. ASSERT_EQ(test::KeyStr("a", 50, kTypeMerge), merge_helper_->keys()[0]);
  92. ASSERT_EQ(test::EncodeInt(4U), merge_helper_->values()[0]);
  93. ASSERT_EQ(1U, merge_helper_->keys().size());
  94. ASSERT_EQ(1U, merge_helper_->values().size());
  95. }
  96. // MergeHelper preserves the operand stack for merge operators that
  97. // cannot do a partial merge.
  98. TEST_F(MergeHelperTest, NoPartialMerge) {
  99. merge_op_ = MergeOperators::CreateStringAppendTESTOperator();
  100. AddKeyVal("a", 50, kTypeMerge, "v2");
  101. AddKeyVal("a", 40, kTypeMerge, "v"); // <- iter_ after merge
  102. AddKeyVal("a", 30, kTypeMerge, "v");
  103. ASSERT_TRUE(Run(31, true).IsMergeInProgress());
  104. ASSERT_EQ(ks_[2], iter_->key());
  105. ASSERT_EQ(test::KeyStr("a", 40, kTypeMerge), merge_helper_->keys()[0]);
  106. ASSERT_EQ("v", merge_helper_->values()[0]);
  107. ASSERT_EQ(test::KeyStr("a", 50, kTypeMerge), merge_helper_->keys()[1]);
  108. ASSERT_EQ("v2", merge_helper_->values()[1]);
  109. ASSERT_EQ(2U, merge_helper_->keys().size());
  110. ASSERT_EQ(2U, merge_helper_->values().size());
  111. }
  112. // A single operand can not be merged.
  113. TEST_F(MergeHelperTest, SingleOperand) {
  114. merge_op_ = MergeOperators::CreateUInt64AddOperator();
  115. AddKeyVal("a", 50, kTypeMerge, test::EncodeInt(1U));
  116. ASSERT_TRUE(Run(31, false).IsMergeInProgress());
  117. ASSERT_FALSE(iter_->Valid());
  118. ASSERT_EQ(test::KeyStr("a", 50, kTypeMerge), merge_helper_->keys()[0]);
  119. ASSERT_EQ(test::EncodeInt(1U), merge_helper_->values()[0]);
  120. ASSERT_EQ(1U, merge_helper_->keys().size());
  121. ASSERT_EQ(1U, merge_helper_->values().size());
  122. }
  123. // Merging with a deletion turns the deletion into a value
  124. TEST_F(MergeHelperTest, MergeDeletion) {
  125. merge_op_ = MergeOperators::CreateUInt64AddOperator();
  126. AddKeyVal("a", 30, kTypeMerge, test::EncodeInt(3U));
  127. AddKeyVal("a", 20, kTypeDeletion, "");
  128. ASSERT_TRUE(Run(15, false).ok());
  129. ASSERT_FALSE(iter_->Valid());
  130. ASSERT_EQ(test::KeyStr("a", 30, kTypeValue), merge_helper_->keys()[0]);
  131. ASSERT_EQ(test::EncodeInt(3U), merge_helper_->values()[0]);
  132. ASSERT_EQ(1U, merge_helper_->keys().size());
  133. ASSERT_EQ(1U, merge_helper_->values().size());
  134. }
  135. // The merge helper stops upon encountering a corrupt key
  136. TEST_F(MergeHelperTest, CorruptKey) {
  137. merge_op_ = MergeOperators::CreateUInt64AddOperator();
  138. AddKeyVal("a", 30, kTypeMerge, test::EncodeInt(3U));
  139. AddKeyVal("a", 25, kTypeMerge, test::EncodeInt(1U));
  140. // Corrupt key
  141. AddKeyVal("a", 20, kTypeDeletion, "", true); // <- iter_ after merge
  142. ASSERT_TRUE(Run(15, false).IsMergeInProgress());
  143. ASSERT_EQ(ks_[2], iter_->key());
  144. ASSERT_EQ(test::KeyStr("a", 30, kTypeMerge), merge_helper_->keys()[0]);
  145. ASSERT_EQ(test::EncodeInt(4U), merge_helper_->values()[0]);
  146. ASSERT_EQ(1U, merge_helper_->keys().size());
  147. ASSERT_EQ(1U, merge_helper_->values().size());
  148. }
  149. // The compaction filter is called on every merge operand
  150. TEST_F(MergeHelperTest, FilterMergeOperands) {
  151. merge_op_ = MergeOperators::CreateUInt64AddOperator();
  152. filter_.reset(new test::FilterNumber(5U));
  153. AddKeyVal("a", 30, kTypeMerge, test::EncodeInt(3U));
  154. AddKeyVal("a", 29, kTypeMerge, test::EncodeInt(5U)); // Filtered
  155. AddKeyVal("a", 28, kTypeMerge, test::EncodeInt(3U));
  156. AddKeyVal("a", 27, kTypeMerge, test::EncodeInt(1U));
  157. AddKeyVal("a", 26, kTypeMerge, test::EncodeInt(5U)); // Filtered
  158. AddKeyVal("a", 25, kTypeValue, test::EncodeInt(1U));
  159. ASSERT_TRUE(Run(15, false).ok());
  160. ASSERT_FALSE(iter_->Valid());
  161. MergeOutputIterator merge_output_iter(merge_helper_.get());
  162. merge_output_iter.SeekToFirst();
  163. ASSERT_EQ(test::KeyStr("a", 30, kTypeValue),
  164. merge_output_iter.key().ToString());
  165. ASSERT_EQ(test::EncodeInt(8U), merge_output_iter.value().ToString());
  166. merge_output_iter.Next();
  167. ASSERT_FALSE(merge_output_iter.Valid());
  168. }
  169. TEST_F(MergeHelperTest, FilterAllMergeOperands) {
  170. merge_op_ = MergeOperators::CreateUInt64AddOperator();
  171. filter_.reset(new test::FilterNumber(5U));
  172. AddKeyVal("a", 30, kTypeMerge, test::EncodeInt(5U));
  173. AddKeyVal("a", 29, kTypeMerge, test::EncodeInt(5U));
  174. AddKeyVal("a", 28, kTypeMerge, test::EncodeInt(5U));
  175. AddKeyVal("a", 27, kTypeMerge, test::EncodeInt(5U));
  176. AddKeyVal("a", 26, kTypeMerge, test::EncodeInt(5U));
  177. AddKeyVal("a", 25, kTypeMerge, test::EncodeInt(5U));
  178. // filtered out all
  179. ASSERT_TRUE(Run(15, false).ok());
  180. ASSERT_FALSE(iter_->Valid());
  181. MergeOutputIterator merge_output_iter(merge_helper_.get());
  182. merge_output_iter.SeekToFirst();
  183. ASSERT_FALSE(merge_output_iter.Valid());
  184. // we have one operand that will survive because it's a delete
  185. AddKeyVal("a", 24, kTypeDeletion, test::EncodeInt(5U));
  186. AddKeyVal("b", 23, kTypeValue, test::EncodeInt(5U));
  187. ASSERT_TRUE(Run(15, true).ok());
  188. merge_output_iter = MergeOutputIterator(merge_helper_.get());
  189. ASSERT_TRUE(iter_->Valid());
  190. merge_output_iter.SeekToFirst();
  191. ASSERT_FALSE(merge_output_iter.Valid());
  192. // when all merge operands are filtered out, we leave the iterator pointing to
  193. // the Put/Delete that survived
  194. ASSERT_EQ(test::KeyStr("a", 24, kTypeDeletion), iter_->key().ToString());
  195. ASSERT_EQ(test::EncodeInt(5U), iter_->value().ToString());
  196. }
  197. // Make sure that merge operands are filtered at the beginning
  198. TEST_F(MergeHelperTest, FilterFirstMergeOperand) {
  199. merge_op_ = MergeOperators::CreateUInt64AddOperator();
  200. filter_.reset(new test::FilterNumber(5U));
  201. AddKeyVal("a", 31, kTypeMerge, test::EncodeInt(5U)); // Filtered
  202. AddKeyVal("a", 30, kTypeMerge, test::EncodeInt(5U)); // Filtered
  203. AddKeyVal("a", 29, kTypeMerge, test::EncodeInt(2U));
  204. AddKeyVal("a", 28, kTypeMerge, test::EncodeInt(1U));
  205. AddKeyVal("a", 27, kTypeMerge, test::EncodeInt(3U));
  206. AddKeyVal("a", 26, kTypeMerge, test::EncodeInt(5U)); // Filtered
  207. AddKeyVal("a", 25, kTypeMerge, test::EncodeInt(5U)); // Filtered
  208. AddKeyVal("b", 24, kTypeValue, test::EncodeInt(5U)); // next user key
  209. ASSERT_OK(Run(15, true));
  210. ASSERT_TRUE(iter_->Valid());
  211. MergeOutputIterator merge_output_iter(merge_helper_.get());
  212. merge_output_iter.SeekToFirst();
  213. // sequence number is 29 here, because the first merge operand got filtered
  214. // out
  215. ASSERT_EQ(test::KeyStr("a", 29, kTypeValue),
  216. merge_output_iter.key().ToString());
  217. ASSERT_EQ(test::EncodeInt(6U), merge_output_iter.value().ToString());
  218. merge_output_iter.Next();
  219. ASSERT_FALSE(merge_output_iter.Valid());
  220. // make sure that we're passing user keys into the filter
  221. ASSERT_EQ("a", filter_->last_merge_operand_key());
  222. }
  223. // Make sure that merge operands are not filtered out if there's a snapshot
  224. // pointing at them
  225. TEST_F(MergeHelperTest, DontFilterMergeOperandsBeforeSnapshotTest) {
  226. merge_op_ = MergeOperators::CreateUInt64AddOperator();
  227. filter_.reset(new test::FilterNumber(5U));
  228. AddKeyVal("a", 31, kTypeMerge, test::EncodeInt(5U));
  229. AddKeyVal("a", 30, kTypeMerge, test::EncodeInt(5U));
  230. AddKeyVal("a", 29, kTypeMerge, test::EncodeInt(2U));
  231. AddKeyVal("a", 28, kTypeMerge, test::EncodeInt(1U));
  232. AddKeyVal("a", 27, kTypeMerge, test::EncodeInt(3U));
  233. AddKeyVal("a", 26, kTypeMerge, test::EncodeInt(5U));
  234. AddKeyVal("a", 25, kTypeMerge, test::EncodeInt(5U));
  235. AddKeyVal("b", 24, kTypeValue, test::EncodeInt(5U));
  236. ASSERT_OK(Run(15, true, 32));
  237. ASSERT_TRUE(iter_->Valid());
  238. MergeOutputIterator merge_output_iter(merge_helper_.get());
  239. merge_output_iter.SeekToFirst();
  240. ASSERT_EQ(test::KeyStr("a", 31, kTypeValue),
  241. merge_output_iter.key().ToString());
  242. ASSERT_EQ(test::EncodeInt(26U), merge_output_iter.value().ToString());
  243. merge_output_iter.Next();
  244. ASSERT_FALSE(merge_output_iter.Valid());
  245. }
  246. } // namespace ROCKSDB_NAMESPACE
  247. int main(int argc, char** argv) {
  248. ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  249. ::testing::InitGoogleTest(&argc, argv);
  250. return RUN_ALL_TESTS();
  251. }