agg_merge_test.cc 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. // Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include "rocksdb/utilities/agg_merge.h"
  6. #include <gtest/gtest.h>
  7. #include <memory>
  8. #include "db/db_test_util.h"
  9. #include "rocksdb/options.h"
  10. #include "test_util/testharness.h"
  11. #include "utilities/agg_merge/agg_merge_impl.h"
  12. #include "utilities/agg_merge/test_agg_merge.h"
  13. namespace ROCKSDB_NAMESPACE {
  14. class AggMergeTest : public DBTestBase {
  15. public:
  16. AggMergeTest() : DBTestBase("agg_merge_db_test", /*env_do_fsync=*/true) {}
  17. };
  18. TEST_F(AggMergeTest, TestUsingMergeOperator) {
  19. ASSERT_OK(AddAggregator("sum", std::make_unique<SumAggregator>()));
  20. ASSERT_OK(AddAggregator("last3", std::make_unique<Last3Aggregator>()));
  21. ASSERT_OK(AddAggregator("mul", std::make_unique<MultipleAggregator>()));
  22. Options options = CurrentOptions();
  23. options.merge_operator = GetAggMergeOperator();
  24. Reopen(options);
  25. std::string v = EncodeHelper::EncodeFuncAndInt("sum", 10);
  26. ASSERT_OK(Merge("foo", v));
  27. v = EncodeHelper::EncodeFuncAndInt("sum", 20);
  28. ASSERT_OK(Merge("foo", v));
  29. v = EncodeHelper::EncodeFuncAndInt("sum", 15);
  30. ASSERT_OK(Merge("foo", v));
  31. v = EncodeHelper::EncodeFuncAndList("last3", {"a", "b"});
  32. ASSERT_OK(Merge("bar", v));
  33. v = EncodeHelper::EncodeFuncAndList("last3", {"c", "d", "e"});
  34. ASSERT_OK(Merge("bar", v));
  35. ASSERT_OK(Flush());
  36. v = EncodeHelper::EncodeFuncAndList("last3", {"f"});
  37. ASSERT_OK(Merge("bar", v));
  38. // Test Put() without aggregation type.
  39. v = EncodeHelper::EncodeFuncAndInt(kUnnamedFuncName, 30);
  40. ASSERT_OK(Put("foo2", v));
  41. v = EncodeHelper::EncodeFuncAndInt("sum", 10);
  42. ASSERT_OK(Merge("foo2", v));
  43. v = EncodeHelper::EncodeFuncAndInt("sum", 20);
  44. ASSERT_OK(Merge("foo2", v));
  45. EXPECT_EQ(EncodeHelper::EncodeFuncAndInt("sum", 45), Get("foo"));
  46. EXPECT_EQ(EncodeHelper::EncodeFuncAndList("last3", {"f", "c", "d"}),
  47. Get("bar"));
  48. EXPECT_EQ(EncodeHelper::EncodeFuncAndInt("sum", 60), Get("foo2"));
  49. // Test changing aggregation type
  50. v = EncodeHelper::EncodeFuncAndInt("mul", 10);
  51. ASSERT_OK(Put("bar2", v));
  52. v = EncodeHelper::EncodeFuncAndInt("mul", 20);
  53. ASSERT_OK(Merge("bar2", v));
  54. v = EncodeHelper::EncodeFuncAndInt("sum", 30);
  55. ASSERT_OK(Merge("bar2", v));
  56. v = EncodeHelper::EncodeFuncAndInt("sum", 40);
  57. ASSERT_OK(Merge("bar2", v));
  58. EXPECT_EQ(EncodeHelper::EncodeFuncAndInt("sum", 10 * 20 + 30 + 40),
  59. Get("bar2"));
  60. // Changing aggregation type with partial merge
  61. v = EncodeHelper::EncodeFuncAndInt("mul", 10);
  62. ASSERT_OK(Merge("foo3", v));
  63. ASSERT_OK(Flush());
  64. v = EncodeHelper::EncodeFuncAndInt("mul", 10);
  65. ASSERT_OK(Merge("foo3", v));
  66. v = EncodeHelper::EncodeFuncAndInt("mul", 10);
  67. ASSERT_OK(Merge("foo3", v));
  68. v = EncodeHelper::EncodeFuncAndInt("sum", 10);
  69. ASSERT_OK(Merge("foo3", v));
  70. ASSERT_OK(Flush());
  71. EXPECT_EQ(EncodeHelper::EncodeFuncAndInt("sum", 10 * 10 * 10 + 10),
  72. Get("foo3"));
  73. // Merge after full merge
  74. v = EncodeHelper::EncodeFuncAndInt("sum", 1);
  75. ASSERT_OK(Merge("foo4", v));
  76. v = EncodeHelper::EncodeFuncAndInt("sum", 2);
  77. ASSERT_OK(Merge("foo4", v));
  78. ASSERT_OK(Flush());
  79. v = EncodeHelper::EncodeFuncAndInt("sum", 3);
  80. ASSERT_OK(Merge("foo4", v));
  81. v = EncodeHelper::EncodeFuncAndInt("sum", 4);
  82. ASSERT_OK(Merge("foo4", v));
  83. ASSERT_OK(Flush());
  84. ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  85. v = EncodeHelper::EncodeFuncAndInt("sum", 5);
  86. ASSERT_OK(Merge("foo4", v));
  87. EXPECT_EQ(EncodeHelper::EncodeFuncAndInt("sum", 15), Get("foo4"));
  88. // Test unregistered function name
  89. v = EncodeAggFuncAndPayloadNoCheck("non_existing", "1");
  90. ASSERT_OK(Merge("bar3", v));
  91. std::string v1;
  92. v1 = EncodeAggFuncAndPayloadNoCheck("non_existing", "invalid");
  93. ;
  94. ASSERT_OK(Merge("bar3", v1));
  95. EXPECT_EQ(EncodeAggFuncAndPayloadNoCheck(kErrorFuncName,
  96. EncodeHelper::EncodeList({v, v1})),
  97. Get("bar3"));
  98. // invalidate input
  99. ASSERT_OK(EncodeAggFuncAndPayload("sum", "invalid", v));
  100. ASSERT_OK(Merge("bar4", v));
  101. v1 = EncodeHelper::EncodeFuncAndInt("sum", 20);
  102. ASSERT_OK(Merge("bar4", v1));
  103. std::string aggregated_value = Get("bar4");
  104. Slice func, payload;
  105. ASSERT_TRUE(ExtractAggFuncAndValue(aggregated_value, func, payload));
  106. EXPECT_EQ(kErrorFuncName, func);
  107. std::vector<Slice> decoded_list;
  108. ASSERT_TRUE(ExtractList(payload, decoded_list));
  109. ASSERT_EQ(2, decoded_list.size());
  110. ASSERT_EQ(v, decoded_list[0]);
  111. ASSERT_EQ(v1, decoded_list[1]);
  112. }
  113. } // namespace ROCKSDB_NAMESPACE
  114. int main(int argc, char** argv) {
  115. ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  116. ::testing::InitGoogleTest(&argc, argv);
  117. return RUN_ALL_TESTS();
  118. }