- // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
- // This source code is licensed under both the GPLv2 (found in the
- // COPYING file in the root directory) and Apache 2.0 License
- // (found in the LICENSE.Apache file in the root directory).
- #include <string>
- #include <vector>
- #include "db/db_test_util.h"
- #include "db/forward_iterator.h"
- #include "port/stack_trace.h"
- #include "rocksdb/merge_operator.h"
- #include "utilities/merge_operators.h"
- #include "utilities/merge_operators/string_append/stringappend2.h"
- namespace ROCKSDB_NAMESPACE {
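- // A ReadCallback that defers visibility decisions to a SnapshotChecker:
- // a sequence number is visible only if the checker reports kInSnapshot
- // for the snapshot sequence the callback was constructed with.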
- class TestReadCallback : public ReadCallback {
- public:
- TestReadCallback(SnapshotChecker* snapshot_checker,
- SequenceNumber snapshot_seq)
- : ReadCallback(snapshot_seq),
- snapshot_checker_(snapshot_checker),
- snapshot_seq_(snapshot_seq) {}
- bool IsVisibleFullCheck(SequenceNumber seq) override {
- return snapshot_checker_->CheckInSnapshot(seq, snapshot_seq_) ==
- SnapshotCheckerResult::kInSnapshot;
- }
- private:
- SnapshotChecker* snapshot_checker_;
- SequenceNumber snapshot_seq_;
- };
- // Test merge operator functionality.
- class DBMergeOperatorTest : public DBTestBase {
- public:
- DBMergeOperatorTest() : DBTestBase("/db_merge_operator_test") {}
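- // Reads `key` via DBImpl::GetImpl() with a TestReadCallback, so only
- // entries the snapshot checker considers visible (at the given snapshot's
- // sequence number, or the latest one) contribute to the merge result.
- // Returns the status string if the read fails.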
- std::string GetWithReadCallback(SnapshotChecker* snapshot_checker,
- const Slice& key,
- const Snapshot* snapshot = nullptr) {
- SequenceNumber seq = snapshot == nullptr ? db_->GetLatestSequenceNumber()
- : snapshot->GetSequenceNumber();
- TestReadCallback read_callback(snapshot_checker, seq);
- ReadOptions read_opt;
- read_opt.snapshot = snapshot;
- PinnableSlice value;
- DBImpl::GetImplOptions get_impl_options;
- get_impl_options.column_family = db_->DefaultColumnFamily();
- get_impl_options.value = &value;
- get_impl_options.callback = &read_callback;
- Status s = dbfull()->GetImpl(read_opt, key, get_impl_options);
- if (!s.ok()) {
- return s.ToString();
- }
- return value.ToString();
- }
- };
- TEST_F(DBMergeOperatorTest, LimitMergeOperands) {
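- // A StringAppendTESTOperator whose ShouldMerge() reports that the operands
- // gathered so far are sufficient once `limit_` of them have been collected,
- // so older operands are ignored by the read path.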
- class LimitedStringAppendMergeOp : public StringAppendTESTOperator {
- public:
- LimitedStringAppendMergeOp(int limit, char delim)
- : StringAppendTESTOperator(delim), limit_(limit) {}
- const char* Name() const override {
- return "DBMergeOperatorTest::LimitedStringAppendMergeOp";
- }
- bool ShouldMerge(const std::vector<Slice>& operands) const override {
- if (operands.size() > 0 && limit_ > 0 && operands.size() >= limit_) {
- return true;
- }
- return false;
- }
- private:
- size_t limit_ = 0;
- };
- Options options;
- options.create_if_missing = true;
- // Use only the latest two merge operands.
- options.merge_operator =
- std::make_shared<LimitedStringAppendMergeOp>(2, ',');
- options.env = env_;
- Reopen(options);
- // All K1 values are in memtable.
- ASSERT_OK(Merge("k1", "a"));
- ASSERT_OK(Merge("k1", "b"));
- ASSERT_OK(Merge("k1", "c"));
- ASSERT_OK(Merge("k1", "d"));
- std::string value;
- ASSERT_TRUE(db_->Get(ReadOptions(), "k1", &value).ok());
- // Make sure that only the latest two merge operands are used. If this were
- // not the case, the value would be "a,b,c,d".
- ASSERT_EQ(value, "c,d");
- // All K2 values are flushed to L0 into a single file.
- ASSERT_OK(Merge("k2", "a"));
- ASSERT_OK(Merge("k2", "b"));
- ASSERT_OK(Merge("k2", "c"));
- ASSERT_OK(Merge("k2", "d"));
- ASSERT_OK(Flush());
- ASSERT_TRUE(db_->Get(ReadOptions(), "k2", &value).ok());
- ASSERT_EQ(value, "c,d");
- // All K3 values are flushed and are in different files.
- ASSERT_OK(Merge("k3", "ab"));
- ASSERT_OK(Flush());
- ASSERT_OK(Merge("k3", "bc"));
- ASSERT_OK(Flush());
- ASSERT_OK(Merge("k3", "cd"));
- ASSERT_OK(Flush());
- ASSERT_OK(Merge("k3", "de"));
- ASSERT_TRUE(db_->Get(ReadOptions(), "k3", &value).ok());
- ASSERT_EQ(value, "cd,de");
- // All K4 values are in different levels.
- ASSERT_OK(Merge("k4", "ab"));
- ASSERT_OK(Flush());
- MoveFilesToLevel(4);
- ASSERT_OK(Merge("k4", "bc"));
- ASSERT_OK(Flush());
- MoveFilesToLevel(3);
- ASSERT_OK(Merge("k4", "cd"));
- ASSERT_OK(Flush());
- MoveFilesToLevel(1);
- ASSERT_OK(Merge("k4", "de"));
- ASSERT_TRUE(db_->Get(ReadOptions(), "k4", &value).ok());
- ASSERT_EQ(value, "cd,de");
- }
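- // TestPutOperator (see db_test_util.h) mimics Put but fails the merge when
- // it encounters a "corrupted" operand; the failure surfaces as a Corruption
- // status on reads and iteration in the tests below.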
- TEST_F(DBMergeOperatorTest, MergeErrorOnRead) {
- Options options;
- options.create_if_missing = true;
- options.merge_operator.reset(new TestPutOperator());
- options.env = env_;
- Reopen(options);
- ASSERT_OK(Merge("k1", "v1"));
- ASSERT_OK(Merge("k1", "corrupted"));
- std::string value;
- ASSERT_TRUE(db_->Get(ReadOptions(), "k1", &value).IsCorruption());
- VerifyDBInternal({{"k1", "corrupted"}, {"k1", "v1"}});
- }
- TEST_F(DBMergeOperatorTest, MergeErrorOnWrite) {
- Options options;
- options.create_if_missing = true;
- options.merge_operator.reset(new TestPutOperator());
- options.max_successive_merges = 3;
- options.env = env_;
- Reopen(options);
- ASSERT_OK(Merge("k1", "v1"));
- ASSERT_OK(Merge("k1", "v2"));
- // Will trigger a merge when hitting max_successive_merges and the merge
- // will fail. The delta will be inserted nevertheless.
- ASSERT_OK(Merge("k1", "corrupted"));
- // Data should stay unmerged after the error.
- VerifyDBInternal({{"k1", "corrupted"}, {"k1", "v2"}, {"k1", "v1"}});
- }
- TEST_F(DBMergeOperatorTest, MergeErrorOnIteration) {
- Options options;
- options.create_if_missing = true;
- options.merge_operator.reset(new TestPutOperator());
- options.env = env_;
- DestroyAndReopen(options);
- ASSERT_OK(Merge("k1", "v1"));
- ASSERT_OK(Merge("k1", "corrupted"));
- ASSERT_OK(Put("k2", "v2"));
- auto* iter = db_->NewIterator(ReadOptions());
- iter->Seek("k1");
- ASSERT_FALSE(iter->Valid());
- ASSERT_TRUE(iter->status().IsCorruption());
- delete iter;
- iter = db_->NewIterator(ReadOptions());
- iter->Seek("k2");
- ASSERT_TRUE(iter->Valid());
- ASSERT_OK(iter->status());
- iter->Prev();
- ASSERT_FALSE(iter->Valid());
- ASSERT_TRUE(iter->status().IsCorruption());
- delete iter;
- VerifyDBInternal({{"k1", "corrupted"}, {"k1", "v1"}, {"k2", "v2"}});
- DestroyAndReopen(options);
- ASSERT_OK(Merge("k1", "v1"));
- ASSERT_OK(Put("k2", "v2"));
- ASSERT_OK(Merge("k2", "corrupted"));
- iter = db_->NewIterator(ReadOptions());
- iter->Seek("k1");
- ASSERT_TRUE(iter->Valid());
- ASSERT_OK(iter->status());
- iter->Next();
- ASSERT_FALSE(iter->Valid());
- ASSERT_TRUE(iter->status().IsCorruption());
- delete iter;
- VerifyDBInternal({{"k1", "v1"}, {"k2", "corrupted"}, {"k2", "v2"}});
- }
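- // The pinning tests run twice: the bool parameter toggles
- // BlockBasedTableOptions::no_block_cache, so merge operands are exercised
- // both with and without a block cache backing the table readers.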
- class MergeOperatorPinningTest : public DBMergeOperatorTest,
- public testing::WithParamInterface<bool> {
- public:
- MergeOperatorPinningTest() { disable_block_cache_ = GetParam(); }
- bool disable_block_cache_;
- };
- INSTANTIATE_TEST_CASE_P(MergeOperatorPinningTest, MergeOperatorPinningTest,
- ::testing::Bool());
- #ifndef ROCKSDB_LITE
- TEST_P(MergeOperatorPinningTest, OperandsMultiBlocks) {
- Options options = CurrentOptions();
- BlockBasedTableOptions table_options;
- table_options.block_size = 1; // every block will contain one entry
- table_options.no_block_cache = disable_block_cache_;
- options.table_factory.reset(NewBlockBasedTableFactory(table_options));
- options.merge_operator = MergeOperators::CreateStringAppendTESTOperator();
- options.level0_slowdown_writes_trigger = (1 << 30);
- options.level0_stop_writes_trigger = (1 << 30);
- options.disable_auto_compactions = true;
- DestroyAndReopen(options);
- const int kKeysPerFile = 10;
- const int kOperandsPerKeyPerFile = 7;
- const int kOperandSize = 100;
- // Files to write to L0 before compacting to a lower level
- const int kFilesPerLevel = 3;
- Random rnd(301);
- std::map<std::string, std::string> true_data;
- int batch_num = 1;
- int lvl_to_fill = 4;
- int key_id = 0;
- while (true) {
- for (int j = 0; j < kKeysPerFile; j++) {
- std::string key = Key(key_id % 35);
- key_id++;
- for (int k = 0; k < kOperandsPerKeyPerFile; k++) {
- std::string val = RandomString(&rnd, kOperandSize);
- ASSERT_OK(db_->Merge(WriteOptions(), key, val));
- if (true_data[key].size() == 0) {
- true_data[key] = val;
- } else {
- true_data[key] += "," + val;
- }
- }
- }
- if (lvl_to_fill == -1) {
- // Keep last batch in memtable and stop
- break;
- }
- ASSERT_OK(Flush());
- if (batch_num % kFilesPerLevel == 0) {
- if (lvl_to_fill != 0) {
- MoveFilesToLevel(lvl_to_fill);
- }
- lvl_to_fill--;
- }
- batch_num++;
- }
- // 3 L0 files
- // 1 L1 file
- // 3 L2 files
- // 1 L3 file
- // 3 L4 Files
- ASSERT_EQ(FilesPerLevel(), "3,1,3,1,3");
- VerifyDBFromMap(true_data);
- }
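- // Wraps another merge operator and runs user-supplied callbacks immediately
- // before and after FullMergeV2(). EvictCacheBeforeMerge uses these hooks to
- // evict table-cache entries and shrink the block cache while merges run.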
- class MergeOperatorHook : public MergeOperator {
- public:
- explicit MergeOperatorHook(std::shared_ptr<MergeOperator> _merge_op)
- : merge_op_(_merge_op) {}
- bool FullMergeV2(const MergeOperationInput& merge_in,
- MergeOperationOutput* merge_out) const override {
- before_merge_();
- bool res = merge_op_->FullMergeV2(merge_in, merge_out);
- after_merge_();
- return res;
- }
- const char* Name() const override { return merge_op_->Name(); }
- std::shared_ptr<MergeOperator> merge_op_;
- std::function<void()> before_merge_ = []() {};
- std::function<void()> after_merge_ = []() {};
- };
- TEST_P(MergeOperatorPinningTest, EvictCacheBeforeMerge) {
- Options options = CurrentOptions();
- auto merge_hook =
- std::make_shared<MergeOperatorHook>(MergeOperators::CreateMaxOperator());
- options.merge_operator = merge_hook;
- options.disable_auto_compactions = true;
- options.level0_slowdown_writes_trigger = (1 << 30);
- options.level0_stop_writes_trigger = (1 << 30);
- options.max_open_files = 20;
- BlockBasedTableOptions bbto;
- bbto.no_block_cache = disable_block_cache_;
- if (bbto.no_block_cache == false) {
- bbto.block_cache = NewLRUCache(64 * 1024 * 1024);
- } else {
- bbto.block_cache = nullptr;
- }
- options.table_factory.reset(NewBlockBasedTableFactory(bbto));
- DestroyAndReopen(options);
- const int kNumOperands = 30;
- const int kNumKeys = 1000;
- const int kOperandSize = 100;
- Random rnd(301);
- // 1000 keys; every key has 30 operands, and every operand is in a different file
- std::map<std::string, std::string> true_data;
- for (int i = 0; i < kNumOperands; i++) {
- for (int j = 0; j < kNumKeys; j++) {
- std::string k = Key(j);
- std::string v = RandomString(&rnd, kOperandSize);
- ASSERT_OK(db_->Merge(WriteOptions(), k, v));
- true_data[k] = std::max(true_data[k], v);
- }
- ASSERT_OK(Flush());
- }
- std::vector<uint64_t> file_numbers = ListTableFiles(env_, dbname_);
- ASSERT_EQ(file_numbers.size(), kNumOperands);
- int merge_cnt = 0;
- // Code executed before merge operation
- merge_hook->before_merge_ = [&]() {
- // Evict all tables from cache before every merge operation
- for (uint64_t num : file_numbers) {
- TableCache::Evict(dbfull()->TEST_table_cache(), num);
- }
- // Decrease cache capacity to force all unrefed blocks to be evicted
- if (bbto.block_cache) {
- bbto.block_cache->SetCapacity(1);
- }
- merge_cnt++;
- };
- // Code executed after merge operation
- merge_hook->after_merge_ = [&]() {
- // Increase capacity again after doing the merge
- if (bbto.block_cache) {
- bbto.block_cache->SetCapacity(64 * 1024 * 1024);
- }
- };
- size_t total_reads;
- VerifyDBFromMap(true_data, &total_reads);
- ASSERT_EQ(merge_cnt, total_reads);
- ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
- VerifyDBFromMap(true_data, &total_reads);
- }
- TEST_P(MergeOperatorPinningTest, TailingIterator) {
- Options options = CurrentOptions();
- options.merge_operator = MergeOperators::CreateMaxOperator();
- BlockBasedTableOptions bbto;
- bbto.no_block_cache = disable_block_cache_;
- options.table_factory.reset(NewBlockBasedTableFactory(bbto));
- DestroyAndReopen(options);
- const int kNumOperands = 100;
- const int kNumWrites = 100000;
- std::function<void()> writer_func = [&]() {
- int k = 0;
- for (int i = 0; i < kNumWrites; i++) {
- ASSERT_OK(db_->Merge(WriteOptions(), Key(k), Key(k)));
- if (i && i % kNumOperands == 0) {
- k++;
- }
- if (i && i % 127 == 0) {
- ASSERT_OK(Flush());
- }
- if (i && i % 317 == 0) {
- ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
- }
- }
- };
- std::function<void()> reader_func = [&]() {
- ReadOptions ro;
- ro.tailing = true;
- Iterator* iter = db_->NewIterator(ro);
- iter->SeekToFirst();
- for (int i = 0; i < (kNumWrites / kNumOperands); i++) {
- while (!iter->Valid()) {
- // wait for the key to be written
- env_->SleepForMicroseconds(100);
- iter->Seek(Key(i));
- }
- ASSERT_EQ(iter->key(), Key(i));
- ASSERT_EQ(iter->value(), Key(i));
- iter->Next();
- }
- delete iter;
- };
- ROCKSDB_NAMESPACE::port::Thread writer_thread(writer_func);
- ROCKSDB_NAMESPACE::port::Thread reader_thread(reader_func);
- writer_thread.join();
- reader_thread.join();
- }
- TEST_F(DBMergeOperatorTest, TailingIteratorMemtableUnrefedBySomeoneElse) {
- Options options = CurrentOptions();
- options.merge_operator = MergeOperators::CreateStringAppendOperator();
- DestroyAndReopen(options);
- // Overview of the test:
- // * There are two merge operands for the same key: one in an sst file,
- // another in a memtable.
- // * Seek a tailing iterator to this key.
- // * As part of the seek, the iterator will:
- // (a) first visit the operand in the memtable and tell ForwardIterator
- // to pin this operand, then
- // (b) move on to the operand in the sst file, then pass both operands
- // to merge operator.
- // * The memtable may get flushed and unreferenced by another thread between
- // (a) and (b). The test simulates it by flushing the memtable inside a
- // SyncPoint callback located between (a) and (b).
- // * In this case it's ForwardIterator's responsibility to keep the memtable
- // pinned until (b) is complete. There used to be a bug causing
- // ForwardIterator to not pin it in some circumstances. This test
- // reproduces it.
- db_->Merge(WriteOptions(), "key", "sst");
- db_->Flush(FlushOptions()); // Switch to SuperVersion A
- db_->Merge(WriteOptions(), "key", "memtable");
- // Pin SuperVersion A
- std::unique_ptr<Iterator> someone_else(db_->NewIterator(ReadOptions()));
- bool pushed_first_operand = false;
- bool stepped_to_next_operand = false;
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
- "DBIter::MergeValuesNewToOld:PushedFirstOperand", [&](void*) {
- EXPECT_FALSE(pushed_first_operand);
- pushed_first_operand = true;
- ASSERT_OK(db_->Flush(FlushOptions())); // Switch to SuperVersion B
- });
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
- "DBIter::MergeValuesNewToOld:SteppedToNextOperand", [&](void*) {
- EXPECT_FALSE(stepped_to_next_operand);
- stepped_to_next_operand = true;
- someone_else.reset(); // Unpin SuperVersion A
- });
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
- ReadOptions ro;
- ro.tailing = true;
- std::unique_ptr<Iterator> iter(db_->NewIterator(ro));
- iter->Seek("key");
- ASSERT_TRUE(iter->status().ok());
- ASSERT_TRUE(iter->Valid());
- EXPECT_EQ(std::string("sst,memtable"), iter->value().ToString());
- EXPECT_TRUE(pushed_first_operand);
- EXPECT_TRUE(stepped_to_next_operand);
- }
- #endif // ROCKSDB_LITE
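- // Verifies that Get() with both a SnapshotChecker and a ReadCallback only
- // merges operands visible under the checker, and that flushes and a manual
- // compaction do not change the visible result.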
- TEST_F(DBMergeOperatorTest, SnapshotCheckerAndReadCallback) {
- Options options = CurrentOptions();
- options.merge_operator = MergeOperators::CreateStringAppendOperator();
- DestroyAndReopen(options);
- class TestSnapshotChecker : public SnapshotChecker {
- public:
- SnapshotCheckerResult CheckInSnapshot(
- SequenceNumber seq, SequenceNumber snapshot_seq) const override {
- return IsInSnapshot(seq, snapshot_seq)
- ? SnapshotCheckerResult::kInSnapshot
- : SnapshotCheckerResult::kNotInSnapshot;
- }
- bool IsInSnapshot(SequenceNumber seq, SequenceNumber snapshot_seq) const {
- switch (snapshot_seq) {
- case 0:
- return seq == 0;
- case 1:
- return seq <= 1;
- case 2:
- // seq = 2 not visible to snapshot with seq = 2
- return seq <= 1;
- case 3:
- return seq <= 3;
- case 4:
- // seq = 4 not visible to snapshot with seq = 4
- return seq <= 3;
- default:
- // seq >= 5 is uncommitted
- return seq <= 4;
- };
- }
- };
- TestSnapshotChecker* snapshot_checker = new TestSnapshotChecker();
- dbfull()->SetSnapshotChecker(snapshot_checker);
- std::string value;
- ASSERT_OK(Merge("foo", "v1"));
- ASSERT_EQ(1, db_->GetLatestSequenceNumber());
- ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo"));
- ASSERT_OK(Merge("foo", "v2"));
- ASSERT_EQ(2, db_->GetLatestSequenceNumber());
- // v2 is not visible to latest snapshot, which has seq = 2.
- ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo"));
- // Take a snapshot with seq = 2.
- const Snapshot* snapshot1 = db_->GetSnapshot();
- ASSERT_EQ(2, snapshot1->GetSequenceNumber());
- // v2 is not visible to snapshot1, which has seq = 2
- ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo", snapshot1));
- // Verify flush doesn't alter the result.
- ASSERT_OK(Flush());
- ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo", snapshot1));
- ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo"));
- ASSERT_OK(Merge("foo", "v3"));
- ASSERT_EQ(3, db_->GetLatestSequenceNumber());
- ASSERT_EQ("v1,v2,v3", GetWithReadCallback(snapshot_checker, "foo"));
- ASSERT_OK(Merge("foo", "v4"));
- ASSERT_EQ(4, db_->GetLatestSequenceNumber());
- // v4 is not visible to latest snapshot, which has seq = 4.
- ASSERT_EQ("v1,v2,v3", GetWithReadCallback(snapshot_checker, "foo"));
- const Snapshot* snapshot2 = db_->GetSnapshot();
- ASSERT_EQ(4, snapshot2->GetSequenceNumber());
- // v4 is not visible to snapshot2, which has seq = 4.
- ASSERT_EQ("v1,v2,v3",
- GetWithReadCallback(snapshot_checker, "foo", snapshot2));
- // Verify flush doesn't alter the result.
- ASSERT_OK(Flush());
- ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo", snapshot1));
- ASSERT_EQ("v1,v2,v3",
- GetWithReadCallback(snapshot_checker, "foo", snapshot2));
- ASSERT_EQ("v1,v2,v3", GetWithReadCallback(snapshot_checker, "foo"));
- ASSERT_OK(Merge("foo", "v5"));
- ASSERT_EQ(5, db_->GetLatestSequenceNumber());
- // v5 is uncommitted
- ASSERT_EQ("v1,v2,v3,v4", GetWithReadCallback(snapshot_checker, "foo"));
- // full manual compaction.
- ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
- // Verify compaction doesn't alter the result.
- ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo", snapshot1));
- ASSERT_EQ("v1,v2,v3",
- GetWithReadCallback(snapshot_checker, "foo", snapshot2));
- ASSERT_EQ("v1,v2,v3,v4", GetWithReadCallback(snapshot_checker, "foo"));
- db_->ReleaseSnapshot(snapshot1);
- db_->ReleaseSnapshot(snapshot2);
- }
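- // Runs the randomized pinning test under every DBTestBase option
- // configuration; the parameter tuple is (disable_block_cache, option_config).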
- class PerConfigMergeOperatorPinningTest
- : public DBMergeOperatorTest,
- public testing::WithParamInterface<std::tuple<bool, int>> {
- public:
- PerConfigMergeOperatorPinningTest() {
- std::tie(disable_block_cache_, option_config_) = GetParam();
- }
- bool disable_block_cache_;
- };
- INSTANTIATE_TEST_CASE_P(
- MergeOperatorPinningTest, PerConfigMergeOperatorPinningTest,
- ::testing::Combine(::testing::Bool(),
- ::testing::Range(static_cast<int>(DBTestBase::kDefault),
- static_cast<int>(DBTestBase::kEnd))));
- TEST_P(PerConfigMergeOperatorPinningTest, Randomized) {
- if (ShouldSkipOptions(option_config_, kSkipMergePut)) {
- return;
- }
- Options options = CurrentOptions();
- options.merge_operator = MergeOperators::CreateMaxOperator();
- BlockBasedTableOptions table_options;
- table_options.no_block_cache = disable_block_cache_;
- options.table_factory.reset(NewBlockBasedTableFactory(table_options));
- DestroyAndReopen(options);
- Random rnd(301);
- std::map<std::string, std::string> true_data;
- const int kTotalMerges = 5000;
- // Every key gets ~10 operands
- const int kKeyRange = kTotalMerges / 10;
- const int kOperandSize = 20;
- const int kNumPutBefore = kKeyRange / 10; // 10% value
- const int kNumPutAfter = kKeyRange / 10; // 10% overwrite
- const int kNumDelete = kKeyRange / 10; // 10% delete
- // kNumPutBefore keys will have base values
- for (int i = 0; i < kNumPutBefore; i++) {
- std::string key = Key(rnd.Next() % kKeyRange);
- std::string value = RandomString(&rnd, kOperandSize);
- ASSERT_OK(db_->Put(WriteOptions(), key, value));
- true_data[key] = value;
- }
- // Do kTotalMerges merges
- for (int i = 0; i < kTotalMerges; i++) {
- std::string key = Key(rnd.Next() % kKeyRange);
- std::string value = RandomString(&rnd, kOperandSize);
- ASSERT_OK(db_->Merge(WriteOptions(), key, value));
- if (true_data[key] < value) {
- true_data[key] = value;
- }
- }
- // Overwrite random kNumPutAfter keys
- for (int i = 0; i < kNumPutAfter; i++) {
- std::string key = Key(rnd.Next() % kKeyRange);
- std::string value = RandomString(&rnd, kOperandSize);
- ASSERT_OK(db_->Put(WriteOptions(), key, value));
- true_data[key] = value;
- }
- // Delete random kNumDelete keys
- for (int i = 0; i < kNumDelete; i++) {
- std::string key = Key(rnd.Next() % kKeyRange);
- ASSERT_OK(db_->Delete(WriteOptions(), key));
- true_data.erase(key);
- }
- VerifyDBFromMap(true_data);
- }
- } // namespace ROCKSDB_NAMESPACE
- int main(int argc, char** argv) {
- ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
- ::testing::InitGoogleTest(&argc, argv);
- return RUN_ALL_TESTS();
- }