| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644 |
- // Copyright (c) 2018-present, Facebook, Inc. All rights reserved.
- // This source code is licensed under both the GPLv2 (found in the
- // COPYING file in the root directory) and Apache 2.0 License
- // (found in the LICENSE.Apache file in the root directory).
- #include "db/db_test_util.h"
- #include "port/stack_trace.h"
- #include "rocksdb/merge_operator.h"
- #include "rocksdb/perf_context.h"
- #include "rocksdb/utilities/debug.h"
- #include "table/block_based/block_builder.h"
- #include "test_util/sync_point.h"
- #include "utilities/fault_injection_env.h"
- #include "utilities/merge_operators.h"
- #include "utilities/merge_operators/sortlist.h"
- #include "utilities/merge_operators/string_append/stringappend2.h"
- namespace ROCKSDB_NAMESPACE {
- namespace {
- class LimitedStringAppendMergeOp : public StringAppendTESTOperator {
- public:
- LimitedStringAppendMergeOp(int limit, char delim)
- : StringAppendTESTOperator(delim), limit_(limit) {}
- const char* Name() const override {
- return "DBMergeOperatorTest::LimitedStringAppendMergeOp";
- }
- bool ShouldMerge(const std::vector<Slice>& operands) const override {
- if (operands.size() > 0 && limit_ > 0 && operands.size() >= limit_) {
- return true;
- }
- return false;
- }
- private:
- size_t limit_ = 0;
- };
- } // anonymous namespace
- class DBMergeOperandTest : public DBTestBase {
- public:
- DBMergeOperandTest()
- : DBTestBase("db_merge_operand_test", /*env_do_fsync=*/true) {}
- };
- TEST_F(DBMergeOperandTest, CacheEvictedMergeOperandReadAfterFreeBug) {
- // There was a bug of reading merge operands after they are mistakely freed
- // in DB::GetMergeOperands, which is surfaced by cache full.
- // See PR#9507 for more.
- Options options = CurrentOptions();
- options.merge_operator = MergeOperators::CreateStringAppendOperator();
- BlockBasedTableOptions table_options;
- // Small cache to simulate cache full
- table_options.block_cache = NewLRUCache(1);
- options.table_factory.reset(NewBlockBasedTableFactory(table_options));
- Reopen(options);
- int num_records = 4;
- int number_of_operands = 0;
- std::vector<PinnableSlice> values(num_records);
- GetMergeOperandsOptions merge_operands_info;
- merge_operands_info.expected_max_number_of_operands = num_records;
- ASSERT_OK(Merge("k1", "v1"));
- ASSERT_OK(Flush());
- ASSERT_OK(Merge("k1", "v2"));
- ASSERT_OK(Flush());
- ASSERT_OK(Merge("k1", "v3"));
- ASSERT_OK(Flush());
- ASSERT_OK(Merge("k1", "v4"));
- ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
- "k1", values.data(), &merge_operands_info,
- &number_of_operands));
- ASSERT_EQ(number_of_operands, 4);
- ASSERT_EQ(values[0].ToString(), "v1");
- ASSERT_EQ(values[1].ToString(), "v2");
- ASSERT_EQ(values[2].ToString(), "v3");
- ASSERT_EQ(values[3].ToString(), "v4");
- }
- TEST_F(DBMergeOperandTest, FlushedMergeOperandReadAfterFreeBug) {
- // Repro for a bug where a memtable containing a merge operand could be
- // deleted before the merge operand was saved to the result.
- auto options = CurrentOptions();
- options.merge_operator = MergeOperators::CreateStringAppendOperator();
- Reopen(options);
- ASSERT_OK(Merge("key", "value"));
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
- {{"DBImpl::GetImpl:PostMemTableGet:0",
- "DBMergeOperandTest::FlushedMergeOperandReadAfterFreeBug:PreFlush"},
- {"DBMergeOperandTest::FlushedMergeOperandReadAfterFreeBug:PostFlush",
- "DBImpl::GetImpl:PostMemTableGet:1"}});
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
- auto flush_thread = port::Thread([&]() {
- TEST_SYNC_POINT(
- "DBMergeOperandTest::FlushedMergeOperandReadAfterFreeBug:PreFlush");
- ASSERT_OK(Flush());
- TEST_SYNC_POINT(
- "DBMergeOperandTest::FlushedMergeOperandReadAfterFreeBug:PostFlush");
- });
- PinnableSlice value;
- GetMergeOperandsOptions merge_operands_info;
- merge_operands_info.expected_max_number_of_operands = 1;
- int number_of_operands;
- ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
- "key", &value, &merge_operands_info,
- &number_of_operands));
- ASSERT_EQ(1, number_of_operands);
- flush_thread.join();
- }
- TEST_F(DBMergeOperandTest, GetMergeOperandsBasic) {
- Options options = CurrentOptions();
- int limit = 2;
- // Use only the latest two merge operands.
- options.merge_operator =
- std::make_shared<LimitedStringAppendMergeOp>(limit, ',');
- Reopen(options);
- int num_records = 4;
- int number_of_operands = 0;
- std::vector<PinnableSlice> values(num_records);
- GetMergeOperandsOptions merge_operands_info;
- merge_operands_info.expected_max_number_of_operands = num_records;
- // k0 value in memtable
- ASSERT_OK(Put("k0", "PutARock"));
- ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
- "k0", values.data(), &merge_operands_info,
- &number_of_operands));
- ASSERT_EQ(values[0], "PutARock");
- // k0.1 value in SST
- ASSERT_OK(Put("k0.1", "RockInSST"));
- ASSERT_OK(Flush());
- ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
- "k0.1", values.data(), &merge_operands_info,
- &number_of_operands));
- ASSERT_EQ(values[0], "RockInSST");
- // All k1 values are in memtable.
- ASSERT_OK(Merge("k1", "a"));
- ASSERT_OK(Put("k1", "x"));
- ASSERT_OK(Merge("k1", "b"));
- ASSERT_OK(Merge("k1", "c"));
- ASSERT_OK(Merge("k1", "d"));
- ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
- "k1", values.data(), &merge_operands_info,
- &number_of_operands));
- ASSERT_EQ(values[0], "x");
- ASSERT_EQ(values[1], "b");
- ASSERT_EQ(values[2], "c");
- ASSERT_EQ(values[3], "d");
- // expected_max_number_of_operands is less than number of merge operands so
- // status should be Incomplete.
- merge_operands_info.expected_max_number_of_operands = num_records - 1;
- Status status = db_->GetMergeOperands(
- ReadOptions(), db_->DefaultColumnFamily(), "k1", values.data(),
- &merge_operands_info, &number_of_operands);
- ASSERT_EQ(status.IsIncomplete(), true);
- merge_operands_info.expected_max_number_of_operands = num_records;
- // All k1.1 values are in memtable.
- ASSERT_OK(Merge("k1.1", "r"));
- ASSERT_OK(Delete("k1.1"));
- ASSERT_OK(Merge("k1.1", "c"));
- ASSERT_OK(Merge("k1.1", "k"));
- ASSERT_OK(Merge("k1.1", "s"));
- ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
- "k1.1", values.data(), &merge_operands_info,
- &number_of_operands));
- ASSERT_EQ(values[0], "c");
- ASSERT_EQ(values[1], "k");
- ASSERT_EQ(values[2], "s");
- // All k2 values are flushed to L0 into a single file.
- ASSERT_OK(Merge("k2", "q"));
- ASSERT_OK(Merge("k2", "w"));
- ASSERT_OK(Merge("k2", "e"));
- ASSERT_OK(Merge("k2", "r"));
- ASSERT_OK(Flush());
- ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
- "k2", values.data(), &merge_operands_info,
- &number_of_operands));
- ASSERT_EQ(values[0], "q");
- ASSERT_EQ(values[1], "w");
- ASSERT_EQ(values[2], "e");
- ASSERT_EQ(values[3], "r");
- // All k2.1 values are flushed to L0 into a single file.
- ASSERT_OK(Merge("k2.1", "m"));
- ASSERT_OK(Put("k2.1", "l"));
- ASSERT_OK(Merge("k2.1", "n"));
- ASSERT_OK(Merge("k2.1", "o"));
- ASSERT_OK(Flush());
- ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
- "k2.1", values.data(), &merge_operands_info,
- &number_of_operands));
- ASSERT_EQ(values[0], "l,n,o");
- // All k2.2 values are flushed to L0 into a single file.
- ASSERT_OK(Merge("k2.2", "g"));
- ASSERT_OK(Delete("k2.2"));
- ASSERT_OK(Merge("k2.2", "o"));
- ASSERT_OK(Merge("k2.2", "t"));
- ASSERT_OK(Flush());
- ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
- "k2.2", values.data(), &merge_operands_info,
- &number_of_operands));
- ASSERT_EQ(values[0], "o,t");
- // Do some compaction that will make the following tests more predictable
- // Slice start("PutARock");
- // Slice end("t");
- ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
- // All k3 values are flushed and are in different files.
- ASSERT_OK(Merge("k3", "ab"));
- ASSERT_OK(Flush());
- ASSERT_OK(Merge("k3", "bc"));
- ASSERT_OK(Flush());
- ASSERT_OK(Merge("k3", "cd"));
- ASSERT_OK(Flush());
- ASSERT_OK(Merge("k3", "de"));
- ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
- "k3", values.data(), &merge_operands_info,
- &number_of_operands));
- ASSERT_EQ(values[0], "ab");
- ASSERT_EQ(values[1], "bc");
- ASSERT_EQ(values[2], "cd");
- ASSERT_EQ(values[3], "de");
- // All k3.1 values are flushed and are in different files.
- ASSERT_OK(Merge("k3.1", "ab"));
- ASSERT_OK(Flush());
- ASSERT_OK(Put("k3.1", "bc"));
- ASSERT_OK(Flush());
- ASSERT_OK(Merge("k3.1", "cd"));
- ASSERT_OK(Flush());
- ASSERT_OK(Merge("k3.1", "de"));
- ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
- "k3.1", values.data(), &merge_operands_info,
- &number_of_operands));
- ASSERT_EQ(values[0], "bc");
- ASSERT_EQ(values[1], "cd");
- ASSERT_EQ(values[2], "de");
- // All k3.2 values are flushed and are in different files.
- ASSERT_OK(Merge("k3.2", "ab"));
- ASSERT_OK(Flush());
- ASSERT_OK(Delete("k3.2"));
- ASSERT_OK(Flush());
- ASSERT_OK(Merge("k3.2", "cd"));
- ASSERT_OK(Flush());
- ASSERT_OK(Merge("k3.2", "de"));
- ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
- "k3.2", values.data(), &merge_operands_info,
- &number_of_operands));
- ASSERT_EQ(number_of_operands, 2);
- ASSERT_EQ(values[0], "cd");
- ASSERT_EQ(values[1], "de");
- // All K4 values are in different levels
- ASSERT_OK(Merge("k4", "ba"));
- ASSERT_OK(Flush());
- MoveFilesToLevel(4);
- ASSERT_OK(Merge("k4", "cb"));
- ASSERT_OK(Flush());
- MoveFilesToLevel(3);
- ASSERT_OK(Merge("k4", "dc"));
- ASSERT_OK(Flush());
- MoveFilesToLevel(1);
- ASSERT_OK(Merge("k4", "ed"));
- ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
- "k4", values.data(), &merge_operands_info,
- &number_of_operands));
- ASSERT_EQ(number_of_operands, 4);
- ASSERT_EQ(values[0], "ba");
- ASSERT_EQ(values[1], "cb");
- ASSERT_EQ(values[2], "dc");
- ASSERT_EQ(values[3], "ed");
- // First 3 k5 values are in SST and next 4 k5 values are in Immutable
- // Memtable
- ASSERT_OK(Merge("k5", "who"));
- ASSERT_OK(Merge("k5", "am"));
- ASSERT_OK(Merge("k5", "i"));
- ASSERT_OK(Flush());
- ASSERT_OK(Put("k5", "remember"));
- ASSERT_OK(Merge("k5", "i"));
- ASSERT_OK(Merge("k5", "am"));
- ASSERT_OK(Merge("k5", "rocks"));
- ASSERT_OK(dbfull()->TEST_SwitchMemtable());
- ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
- "k5", values.data(), &merge_operands_info,
- &number_of_operands));
- ASSERT_EQ(number_of_operands, 4);
- ASSERT_EQ(values[0], "remember");
- ASSERT_EQ(values[1], "i");
- ASSERT_EQ(values[2], "am");
- ASSERT_EQ(values[3], "rocks");
- // GetMergeOperands() in ReadOnly DB
- ASSERT_OK(Merge("k6", "better"));
- ASSERT_OK(Merge("k6", "call"));
- ASSERT_OK(Merge("k6", "saul"));
- ASSERT_OK(ReadOnlyReopen(options));
- std::vector<PinnableSlice> readonly_values(num_records);
- ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
- "k6", readonly_values.data(),
- &merge_operands_info, &number_of_operands));
- ASSERT_EQ(number_of_operands, 3);
- ASSERT_EQ(readonly_values[0], "better");
- ASSERT_EQ(readonly_values[1], "call");
- ASSERT_EQ(readonly_values[2], "saul");
- }
- TEST_F(DBMergeOperandTest, BlobDBGetMergeOperandsBasic) {
- Options options = CurrentOptions();
- options.enable_blob_files = true;
- options.min_blob_size = 0;
- // Use only the latest two merge operands.
- options.merge_operator = std::make_shared<LimitedStringAppendMergeOp>(2, ',');
- Reopen(options);
- int num_records = 4;
- int number_of_operands = 0;
- std::vector<PinnableSlice> values(num_records);
- GetMergeOperandsOptions merge_operands_info;
- merge_operands_info.expected_max_number_of_operands = num_records;
- // All k1 values are in memtable.
- ASSERT_OK(Put("k1", "x"));
- ASSERT_OK(Merge("k1", "b"));
- ASSERT_OK(Merge("k1", "c"));
- ASSERT_OK(Merge("k1", "d"));
- ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
- "k1", values.data(), &merge_operands_info,
- &number_of_operands));
- ASSERT_EQ(values[0], "x");
- ASSERT_EQ(values[1], "b");
- ASSERT_EQ(values[2], "c");
- ASSERT_EQ(values[3], "d");
- // expected_max_number_of_operands is less than number of merge operands so
- // status should be Incomplete.
- merge_operands_info.expected_max_number_of_operands = num_records - 1;
- Status status = db_->GetMergeOperands(
- ReadOptions(), db_->DefaultColumnFamily(), "k1", values.data(),
- &merge_operands_info, &number_of_operands);
- ASSERT_EQ(status.IsIncomplete(), true);
- merge_operands_info.expected_max_number_of_operands = num_records;
- // All k2 values are flushed to L0 into a single file.
- ASSERT_OK(Put("k2", "q"));
- ASSERT_OK(Merge("k2", "w"));
- ASSERT_OK(Merge("k2", "e"));
- ASSERT_OK(Merge("k2", "r"));
- ASSERT_OK(Flush());
- ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
- "k2", values.data(), &merge_operands_info,
- &number_of_operands));
- ASSERT_EQ(values[0], "q,w,e,r");
- // Do some compaction that will make the following tests more predictable
- ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
- // All k3 values are flushed and are in different files.
- ASSERT_OK(Put("k3", "ab"));
- ASSERT_OK(Flush());
- ASSERT_OK(Merge("k3", "bc"));
- ASSERT_OK(Flush());
- ASSERT_OK(Merge("k3", "cd"));
- ASSERT_OK(Flush());
- ASSERT_OK(Merge("k3", "de"));
- ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
- "k3", values.data(), &merge_operands_info,
- &number_of_operands));
- ASSERT_EQ(values[0], "ab");
- ASSERT_EQ(values[1], "bc");
- ASSERT_EQ(values[2], "cd");
- ASSERT_EQ(values[3], "de");
- // All K4 values are in different levels
- ASSERT_OK(Put("k4", "ba"));
- ASSERT_OK(Flush());
- MoveFilesToLevel(4);
- ASSERT_OK(Merge("k4", "cb"));
- ASSERT_OK(Flush());
- MoveFilesToLevel(3);
- ASSERT_OK(Merge("k4", "dc"));
- ASSERT_OK(Flush());
- MoveFilesToLevel(1);
- ASSERT_OK(Merge("k4", "ed"));
- ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
- "k4", values.data(), &merge_operands_info,
- &number_of_operands));
- ASSERT_EQ(values[0], "ba");
- ASSERT_EQ(values[1], "cb");
- ASSERT_EQ(values[2], "dc");
- ASSERT_EQ(values[3], "ed");
- }
- TEST_F(DBMergeOperandTest, GetMergeOperandsLargeResultOptimization) {
- // These constants are chosen to trigger the large result optimization
- // (pinning a bundle of `DBImpl` resources).
- const int kNumOperands = 1024;
- const int kOperandLen = 1024;
- Options options = CurrentOptions();
- options.merge_operator = MergeOperators::CreateStringAppendOperator();
- DestroyAndReopen(options);
- Random rnd(301);
- std::vector<std::string> expected_merge_operands;
- expected_merge_operands.reserve(kNumOperands);
- for (int i = 0; i < kNumOperands; ++i) {
- expected_merge_operands.emplace_back(rnd.RandomString(kOperandLen));
- ASSERT_OK(Merge("key", expected_merge_operands.back()));
- }
- std::vector<PinnableSlice> merge_operands(kNumOperands);
- GetMergeOperandsOptions merge_operands_info;
- merge_operands_info.expected_max_number_of_operands = kNumOperands;
- int num_merge_operands = 0;
- ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
- "key", merge_operands.data(),
- &merge_operands_info, &num_merge_operands));
- ASSERT_EQ(num_merge_operands, kNumOperands);
- // Ensures the large result optimization was used.
- for (int i = 0; i < kNumOperands; ++i) {
- ASSERT_TRUE(merge_operands[i].IsPinned());
- }
- // Add a Flush() to change the `SuperVersion` to challenge the resource
- // pinning.
- ASSERT_OK(Flush());
- for (int i = 0; i < kNumOperands; ++i) {
- ASSERT_EQ(expected_merge_operands[i], merge_operands[i]);
- }
- }
- TEST_F(DBMergeOperandTest, GetMergeOperandsShortCircuitInMemtable) {
- const int kNumOperands = 10;
- const int kNumOperandsToFetch = 5;
- Options options = CurrentOptions();
- options.merge_operator = MergeOperators::CreateStringAppendOperator();
- DestroyAndReopen(options);
- Random rnd(301);
- std::vector<std::string> expected_merge_operands;
- expected_merge_operands.reserve(kNumOperands);
- for (int i = 0; i < kNumOperands; ++i) {
- expected_merge_operands.emplace_back(rnd.RandomString(7 /* len */));
- ASSERT_OK(Merge("key", expected_merge_operands.back()));
- }
- std::vector<PinnableSlice> merge_operands(kNumOperands);
- GetMergeOperandsOptions merge_operands_info;
- merge_operands_info.expected_max_number_of_operands = kNumOperands;
- int num_fetched = 0;
- merge_operands_info.continue_cb = [&](Slice /* value */) {
- num_fetched++;
- return num_fetched != kNumOperandsToFetch;
- };
- int num_merge_operands = 0;
- ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
- "key", merge_operands.data(),
- &merge_operands_info, &num_merge_operands));
- ASSERT_EQ(kNumOperandsToFetch, num_merge_operands);
- ASSERT_EQ(kNumOperandsToFetch, num_fetched);
- for (int i = 0; i < kNumOperandsToFetch; ++i) {
- ASSERT_EQ(expected_merge_operands[kNumOperands - kNumOperandsToFetch + i],
- merge_operands[i]);
- }
- }
- TEST_F(DBMergeOperandTest, GetMergeOperandsShortCircuitBaseValue) {
- // The continuation callback doesn't need to be called on a base value because
- // there's no remaining work to be saved.
- Options options = CurrentOptions();
- options.merge_operator = MergeOperators::CreateStringAppendOperator();
- DestroyAndReopen(options);
- Random rnd(301);
- std::string expected_value = rnd.RandomString(7 /* len */);
- ASSERT_OK(Put("key", expected_value));
- std::vector<PinnableSlice> merge_operands(1);
- GetMergeOperandsOptions merge_operands_info;
- merge_operands_info.expected_max_number_of_operands = 1;
- int num_fetched = 0;
- merge_operands_info.continue_cb = [&num_fetched](Slice /* value */) {
- num_fetched++;
- return true;
- };
- int num_merge_operands = 0;
- ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
- "key", merge_operands.data(),
- &merge_operands_info, &num_merge_operands));
- ASSERT_EQ(1, num_merge_operands);
- ASSERT_EQ(0, num_fetched);
- ASSERT_EQ(expected_value, merge_operands[0]);
- }
- TEST_F(DBMergeOperandTest, GetMergeOperandsShortCircuitInSst) {
- const int kNumOperands = 10;
- const int kNumOperandsToFetch = 5;
- Options options = CurrentOptions();
- options.disable_auto_compactions = true;
- options.merge_operator = MergeOperators::CreateStringAppendOperator();
- DestroyAndReopen(options);
- Random rnd(301);
- std::vector<std::string> expected_merge_operands;
- expected_merge_operands.reserve(kNumOperands);
- for (int i = 0; i < kNumOperands; ++i) {
- expected_merge_operands.emplace_back(rnd.RandomString(7 /* len */));
- ASSERT_OK(Merge("key", expected_merge_operands.back()));
- ASSERT_OK(Flush());
- }
- std::vector<PinnableSlice> merge_operands(kNumOperands);
- GetMergeOperandsOptions merge_operands_info;
- merge_operands_info.expected_max_number_of_operands = kNumOperands;
- int num_fetched = 0;
- merge_operands_info.continue_cb = [&](Slice /* value */) {
- num_fetched++;
- return num_fetched != kNumOperandsToFetch;
- };
- int num_merge_operands = 0;
- ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
- "key", merge_operands.data(),
- &merge_operands_info, &num_merge_operands));
- ASSERT_EQ(kNumOperandsToFetch, num_merge_operands);
- ASSERT_EQ(kNumOperandsToFetch, num_fetched);
- for (int i = 0; i < kNumOperandsToFetch; ++i) {
- ASSERT_EQ(expected_merge_operands[kNumOperands - kNumOperandsToFetch + i],
- merge_operands[i]);
- }
- }
- TEST_F(DBMergeOperandTest, GetMergeOperandsBaseDeletionInImmMem) {
- // In this test, "k1" has a MERGE in a mutable memtable on top of a base
- // DELETE in an immutable memtable.
- Options opts = CurrentOptions();
- opts.max_write_buffer_number = 10;
- opts.min_write_buffer_number_to_merge = 10;
- opts.merge_operator = MergeOperators::CreateDeprecatedPutOperator();
- Reopen(opts);
- ASSERT_OK(Put("k1", "val"));
- ASSERT_OK(Flush());
- ASSERT_OK(Put("k0", "val"));
- ASSERT_OK(Delete("k1"));
- ASSERT_OK(Put("k2", "val"));
- ASSERT_OK(dbfull()->TEST_SwitchMemtable());
- ASSERT_OK(Merge("k1", "val"));
- {
- std::vector<PinnableSlice> values(2);
- GetMergeOperandsOptions merge_operands_info;
- merge_operands_info.expected_max_number_of_operands =
- static_cast<int>(values.size());
- std::string key = "k1", from_db;
- int number_of_operands = 0;
- ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
- key, values.data(), &merge_operands_info,
- &number_of_operands));
- ASSERT_EQ(1, number_of_operands);
- from_db = values[0].ToString();
- ASSERT_EQ("val", from_db);
- }
- {
- std::string val;
- ASSERT_OK(db_->Get(ReadOptions(), "k1", &val));
- ASSERT_EQ("val", val);
- }
- }
- TEST_F(DBMergeOperandTest, GetMergeOperandCallbackStopAtImm) {
- Options options = CurrentOptions();
- options.max_write_buffer_number = 10;
- options.merge_operator = MergeOperators::CreateStringAppendOperator();
- DestroyAndReopen(options);
- Random rnd(301);
- ASSERT_OK(db_->PauseBackgroundWork());
- ASSERT_OK(Merge("key", "v1"));
- ASSERT_OK(dbfull()->TEST_SwitchMemtable());
- // Keep this merge in an immutable memtable
- uint64_t num_imm = 0;
- ASSERT_TRUE(
- db_->GetIntProperty(DB::Properties::kNumImmutableMemTable, &num_imm));
- ASSERT_EQ(num_imm, 1);
- ASSERT_OK(Merge("key", "v2"));
- std::vector<PinnableSlice> merge_operands(2);
- GetMergeOperandsOptions merge_operands_info;
- merge_operands_info.expected_max_number_of_operands = 2;
- int num_fetched = 0;
- merge_operands_info.continue_cb = [&num_fetched](Slice /* value */) {
- num_fetched++;
- // Stop in the first immutable memtable.
- return num_fetched < 2;
- };
- int num_merge_operands = 0;
- ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
- "key", merge_operands.data(),
- &merge_operands_info, &num_merge_operands));
- ASSERT_EQ(2, num_merge_operands);
- ASSERT_EQ(2, num_fetched);
- ASSERT_EQ("v1", merge_operands[0]);
- ASSERT_EQ("v2", merge_operands[1]);
- }
- } // namespace ROCKSDB_NAMESPACE
- int main(int argc, char** argv) {
- ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
- ::testing::InitGoogleTest(&argc, argv);
- return RUN_ALL_TESTS();
- }
|