| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899 |
- // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
- // This source code is licensed under both the GPLv2 (found in the
- // COPYING file in the root directory) and Apache 2.0 License
- // (found in the LICENSE.Apache file in the root directory).
- #include "db/blob/blob_index.h"
- #include "db/blob/blob_log_format.h"
- #include "db/db_test_util.h"
- #include "port/stack_trace.h"
- #include "test_util/sync_point.h"
- namespace ROCKSDB_NAMESPACE {
- class DBBlobCompactionTest : public DBTestBase {
- public:
- explicit DBBlobCompactionTest()
- : DBTestBase("db_blob_compaction_test", /*env_do_fsync=*/false) {}
- const std::vector<InternalStats::CompactionStats>& GetCompactionStats() {
- VersionSet* const versions = dbfull()->GetVersionSet();
- assert(versions);
- assert(versions->GetColumnFamilySet());
- ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
- assert(cfd);
- const InternalStats* const internal_stats = cfd->internal_stats();
- assert(internal_stats);
- return internal_stats->TEST_GetCompactionStats();
- }
- };
- namespace {
- class FilterByKeyLength : public CompactionFilter {
- public:
- explicit FilterByKeyLength(size_t len) : length_threshold_(len) {}
- const char* Name() const override {
- return "rocksdb.compaction.filter.by.key.length";
- }
- CompactionFilter::Decision FilterBlobByKey(
- int /*level*/, const Slice& key, std::string* /*new_value*/,
- std::string* /*skip_until*/) const override {
- if (key.size() < length_threshold_) {
- return CompactionFilter::Decision::kRemove;
- }
- return CompactionFilter::Decision::kKeep;
- }
- private:
- size_t length_threshold_;
- };
- class FilterByValueLength : public CompactionFilter {
- public:
- explicit FilterByValueLength(size_t len) : length_threshold_(len) {}
- const char* Name() const override {
- return "rocksdb.compaction.filter.by.value.length";
- }
- CompactionFilter::Decision FilterV2(
- int /*level*/, const Slice& /*key*/, ValueType /*value_type*/,
- const Slice& existing_value, std::string* /*new_value*/,
- std::string* /*skip_until*/) const override {
- if (existing_value.size() < length_threshold_) {
- return CompactionFilter::Decision::kRemove;
- }
- return CompactionFilter::Decision::kKeep;
- }
- private:
- size_t length_threshold_;
- };
- class BadBlobCompactionFilter : public CompactionFilter {
- public:
- explicit BadBlobCompactionFilter(std::string prefix,
- CompactionFilter::Decision filter_by_key,
- CompactionFilter::Decision filter_v2)
- : prefix_(std::move(prefix)),
- filter_blob_by_key_(filter_by_key),
- filter_v2_(filter_v2) {}
- const char* Name() const override { return "rocksdb.compaction.filter.bad"; }
- CompactionFilter::Decision FilterBlobByKey(
- int /*level*/, const Slice& key, std::string* /*new_value*/,
- std::string* /*skip_until*/) const override {
- if (key.size() >= prefix_.size() &&
- 0 == strncmp(prefix_.data(), key.data(), prefix_.size())) {
- return CompactionFilter::Decision::kUndetermined;
- }
- return filter_blob_by_key_;
- }
- CompactionFilter::Decision FilterV2(
- int /*level*/, const Slice& /*key*/, ValueType /*value_type*/,
- const Slice& /*existing_value*/, std::string* /*new_value*/,
- std::string* /*skip_until*/) const override {
- return filter_v2_;
- }
- private:
- const std::string prefix_;
- const CompactionFilter::Decision filter_blob_by_key_;
- const CompactionFilter::Decision filter_v2_;
- };
- class ValueBlindWriteFilter : public CompactionFilter {
- public:
- explicit ValueBlindWriteFilter(std::string new_val)
- : new_value_(std::move(new_val)) {}
- const char* Name() const override {
- return "rocksdb.compaction.filter.blind.write";
- }
- CompactionFilter::Decision FilterBlobByKey(
- int level, const Slice& key, std::string* new_value,
- std::string* skip_until) const override;
- private:
- const std::string new_value_;
- };
- CompactionFilter::Decision ValueBlindWriteFilter::FilterBlobByKey(
- int /*level*/, const Slice& /*key*/, std::string* new_value,
- std::string* /*skip_until*/) const {
- assert(new_value);
- new_value->assign(new_value_);
- return CompactionFilter::Decision::kChangeValue;
- }
- class ValueMutationFilter : public CompactionFilter {
- public:
- explicit ValueMutationFilter(std::string padding)
- : padding_(std::move(padding)) {}
- const char* Name() const override {
- return "rocksdb.compaction.filter.value.mutation";
- }
- CompactionFilter::Decision FilterV2(int level, const Slice& key,
- ValueType value_type,
- const Slice& existing_value,
- std::string* new_value,
- std::string* skip_until) const override;
- private:
- const std::string padding_;
- };
- CompactionFilter::Decision ValueMutationFilter::FilterV2(
- int /*level*/, const Slice& /*key*/, ValueType value_type,
- const Slice& existing_value, std::string* new_value,
- std::string* /*skip_until*/) const {
- assert(CompactionFilter::ValueType::kBlobIndex != value_type);
- if (CompactionFilter::ValueType::kValue != value_type) {
- return CompactionFilter::Decision::kKeep;
- }
- assert(new_value);
- new_value->assign(existing_value.data(), existing_value.size());
- new_value->append(padding_);
- return CompactionFilter::Decision::kChangeValue;
- }
- class AlwaysKeepFilter : public CompactionFilter {
- public:
- explicit AlwaysKeepFilter() = default;
- const char* Name() const override {
- return "rocksdb.compaction.filter.always.keep";
- }
- CompactionFilter::Decision FilterV2(
- int /*level*/, const Slice& /*key*/, ValueType /*value_type*/,
- const Slice& /*existing_value*/, std::string* /*new_value*/,
- std::string* /*skip_until*/) const override {
- return CompactionFilter::Decision::kKeep;
- }
- };
- class SkipUntilFilter : public CompactionFilter {
- public:
- explicit SkipUntilFilter(std::string skip_until)
- : skip_until_(std::move(skip_until)) {}
- const char* Name() const override {
- return "rocksdb.compaction.filter.skip.until";
- }
- CompactionFilter::Decision FilterV2(int /* level */, const Slice& /* key */,
- ValueType /* value_type */,
- const Slice& /* existing_value */,
- std::string* /* new_value */,
- std::string* skip_until) const override {
- assert(skip_until);
- *skip_until = skip_until_;
- return CompactionFilter::Decision::kRemoveAndSkipUntil;
- }
- private:
- std::string skip_until_;
- };
- } // anonymous namespace
- class DBBlobBadCompactionFilterTest
- : public DBBlobCompactionTest,
- public testing::WithParamInterface<
- std::tuple<std::string, CompactionFilter::Decision,
- CompactionFilter::Decision>> {
- public:
- explicit DBBlobBadCompactionFilterTest()
- : compaction_filter_guard_(new BadBlobCompactionFilter(
- std::get<0>(GetParam()), std::get<1>(GetParam()),
- std::get<2>(GetParam()))) {}
- protected:
- std::unique_ptr<CompactionFilter> compaction_filter_guard_;
- };
- INSTANTIATE_TEST_CASE_P(
- BadCompactionFilter, DBBlobBadCompactionFilterTest,
- testing::Combine(
- testing::Values("a"),
- testing::Values(CompactionFilter::Decision::kChangeBlobIndex,
- CompactionFilter::Decision::kIOError),
- testing::Values(CompactionFilter::Decision::kUndetermined,
- CompactionFilter::Decision::kChangeBlobIndex,
- CompactionFilter::Decision::kIOError)));
- TEST_F(DBBlobCompactionTest, FilterByKeyLength) {
- Options options = GetDefaultOptions();
- options.enable_blob_files = true;
- options.min_blob_size = 0;
- options.create_if_missing = true;
- constexpr size_t kKeyLength = 2;
- std::unique_ptr<CompactionFilter> compaction_filter_guard(
- new FilterByKeyLength(kKeyLength));
- options.compaction_filter = compaction_filter_guard.get();
- constexpr char short_key[] = "a";
- constexpr char long_key[] = "abc";
- constexpr char blob_value[] = "value";
- DestroyAndReopen(options);
- ASSERT_OK(Put(short_key, blob_value));
- ASSERT_OK(Put(long_key, blob_value));
- ASSERT_OK(Flush());
- CompactRangeOptions cro;
- ASSERT_OK(db_->CompactRange(cro, /*begin=*/nullptr, /*end=*/nullptr));
- std::string value;
- ASSERT_TRUE(db_->Get(ReadOptions(), short_key, &value).IsNotFound());
- value.clear();
- ASSERT_OK(db_->Get(ReadOptions(), long_key, &value));
- ASSERT_EQ("value", value);
- const auto& compaction_stats = GetCompactionStats();
- ASSERT_GE(compaction_stats.size(), 2);
- // Filter decides between kKeep and kRemove solely based on key;
- // this involves neither reading nor writing blobs
- ASSERT_EQ(compaction_stats[1].bytes_read_blob, 0);
- ASSERT_EQ(compaction_stats[1].bytes_written_blob, 0);
- Close();
- }
- TEST_F(DBBlobCompactionTest, FilterByValueLength) {
- Options options = GetDefaultOptions();
- options.enable_blob_files = true;
- options.min_blob_size = 5;
- options.create_if_missing = true;
- constexpr size_t kValueLength = 5;
- std::unique_ptr<CompactionFilter> compaction_filter_guard(
- new FilterByValueLength(kValueLength));
- options.compaction_filter = compaction_filter_guard.get();
- const std::vector<std::string> short_value_keys = {"a", "e", "j"};
- constexpr char short_value[] = "val";
- const std::vector<std::string> long_value_keys = {"b", "f", "k"};
- constexpr char long_value[] = "valuevalue";
- DestroyAndReopen(options);
- for (size_t i = 0; i < short_value_keys.size(); ++i) {
- ASSERT_OK(Put(short_value_keys[i], short_value));
- }
- for (size_t i = 0; i < short_value_keys.size(); ++i) {
- ASSERT_OK(Put(long_value_keys[i], long_value));
- }
- ASSERT_OK(Flush());
- CompactRangeOptions cro;
- ASSERT_OK(db_->CompactRange(cro, /*begin=*/nullptr, /*end=*/nullptr));
- std::string value;
- for (size_t i = 0; i < short_value_keys.size(); ++i) {
- ASSERT_TRUE(
- db_->Get(ReadOptions(), short_value_keys[i], &value).IsNotFound());
- value.clear();
- }
- for (size_t i = 0; i < long_value_keys.size(); ++i) {
- ASSERT_OK(db_->Get(ReadOptions(), long_value_keys[i], &value));
- ASSERT_EQ(long_value, value);
- }
- const auto& compaction_stats = GetCompactionStats();
- ASSERT_GE(compaction_stats.size(), 2);
- // Filter decides between kKeep and kRemove based on value;
- // this involves reading but not writing blobs
- ASSERT_GT(compaction_stats[1].bytes_read_blob, 0);
- ASSERT_EQ(compaction_stats[1].bytes_written_blob, 0);
- Close();
- }
- TEST_F(DBBlobCompactionTest, BlobCompactWithStartingLevel) {
- Options options = GetDefaultOptions();
- options.enable_blob_files = true;
- options.min_blob_size = 1000;
- options.blob_file_starting_level = 5;
- options.create_if_missing = true;
- // Open DB with fixed-prefix sst-partitioner so that compaction will cut
- // new table file when encountering a new key whose 1-byte prefix changes.
- constexpr size_t key_len = 1;
- options.sst_partitioner_factory =
- NewSstPartitionerFixedPrefixFactory(key_len);
- ASSERT_OK(TryReopen(options));
- constexpr size_t blob_size = 3000;
- constexpr char first_key[] = "a";
- const std::string first_blob(blob_size, 'a');
- ASSERT_OK(Put(first_key, first_blob));
- constexpr char second_key[] = "b";
- const std::string second_blob(2 * blob_size, 'b');
- ASSERT_OK(Put(second_key, second_blob));
- constexpr char third_key[] = "d";
- const std::string third_blob(blob_size, 'd');
- ASSERT_OK(Put(third_key, third_blob));
- ASSERT_OK(Flush());
- constexpr char fourth_key[] = "c";
- const std::string fourth_blob(blob_size, 'c');
- ASSERT_OK(Put(fourth_key, fourth_blob));
- ASSERT_OK(Flush());
- ASSERT_EQ(0, GetBlobFileNumbers().size());
- ASSERT_EQ(2, NumTableFilesAtLevel(/*level=*/0));
- ASSERT_EQ(0, NumTableFilesAtLevel(/*level=*/1));
- ASSERT_OK(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
- /*end=*/nullptr));
- // No blob file should be created since blob_file_starting_level is 5.
- ASSERT_EQ(0, GetBlobFileNumbers().size());
- ASSERT_EQ(0, NumTableFilesAtLevel(/*level=*/0));
- ASSERT_EQ(4, NumTableFilesAtLevel(/*level=*/1));
- {
- options.blob_file_starting_level = 1;
- DestroyAndReopen(options);
- ASSERT_OK(Put(first_key, first_blob));
- ASSERT_OK(Put(second_key, second_blob));
- ASSERT_OK(Put(third_key, third_blob));
- ASSERT_OK(Flush());
- ASSERT_OK(Put(fourth_key, fourth_blob));
- ASSERT_OK(Flush());
- ASSERT_EQ(0, GetBlobFileNumbers().size());
- ASSERT_EQ(2, NumTableFilesAtLevel(/*level=*/0));
- ASSERT_EQ(0, NumTableFilesAtLevel(/*level=*/1));
- ASSERT_OK(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
- /*end=*/nullptr));
- // The compaction's output level equals to blob_file_starting_level.
- ASSERT_EQ(1, GetBlobFileNumbers().size());
- ASSERT_EQ(0, NumTableFilesAtLevel(/*level=*/0));
- ASSERT_EQ(4, NumTableFilesAtLevel(/*level=*/1));
- }
- Close();
- }
- TEST_F(DBBlobCompactionTest, BlindWriteFilter) {
- Options options = GetDefaultOptions();
- options.enable_blob_files = true;
- options.min_blob_size = 0;
- options.create_if_missing = true;
- constexpr char new_blob_value[] = "new_blob_value";
- std::unique_ptr<CompactionFilter> compaction_filter_guard(
- new ValueBlindWriteFilter(new_blob_value));
- options.compaction_filter = compaction_filter_guard.get();
- DestroyAndReopen(options);
- const std::vector<std::string> keys = {"a", "b", "c"};
- const std::vector<std::string> values = {"a_value", "b_value", "c_value"};
- assert(keys.size() == values.size());
- for (size_t i = 0; i < keys.size(); ++i) {
- ASSERT_OK(Put(keys[i], values[i]));
- }
- ASSERT_OK(Flush());
- ASSERT_OK(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
- /*end=*/nullptr));
- for (const auto& key : keys) {
- ASSERT_EQ(new_blob_value, Get(key));
- }
- const auto& compaction_stats = GetCompactionStats();
- ASSERT_GE(compaction_stats.size(), 2);
- // Filter unconditionally changes value in FilterBlobByKey;
- // this involves writing but not reading blobs
- ASSERT_EQ(compaction_stats[1].bytes_read_blob, 0);
- ASSERT_GT(compaction_stats[1].bytes_written_blob, 0);
- Close();
- }
- TEST_F(DBBlobCompactionTest, SkipUntilFilter) {
- Options options = GetDefaultOptions();
- options.enable_blob_files = true;
- std::unique_ptr<CompactionFilter> compaction_filter_guard(
- new SkipUntilFilter("z"));
- options.compaction_filter = compaction_filter_guard.get();
- Reopen(options);
- const std::vector<std::string> keys{"a", "b", "c"};
- const std::vector<std::string> values{"a_value", "b_value", "c_value"};
- assert(keys.size() == values.size());
- for (size_t i = 0; i < keys.size(); ++i) {
- ASSERT_OK(Put(keys[i], values[i]));
- }
- ASSERT_OK(Flush());
- int process_in_flow_called = 0;
- SyncPoint::GetInstance()->SetCallBack(
- "BlobCountingIterator::UpdateAndCountBlobIfNeeded:ProcessInFlow",
- [&process_in_flow_called](void* /* arg */) { ++process_in_flow_called; });
- SyncPoint::GetInstance()->EnableProcessing();
- ASSERT_OK(db_->CompactRange(CompactRangeOptions(), /* begin */ nullptr,
- /* end */ nullptr));
- SyncPoint::GetInstance()->DisableProcessing();
- SyncPoint::GetInstance()->ClearAllCallBacks();
- for (const auto& key : keys) {
- ASSERT_EQ(Get(key), "NOT_FOUND");
- }
- // Make sure SkipUntil was performed using iteration rather than Seek
- ASSERT_EQ(process_in_flow_called, keys.size());
- Close();
- }
- TEST_P(DBBlobBadCompactionFilterTest, BadDecisionFromCompactionFilter) {
- Options options = GetDefaultOptions();
- options.enable_blob_files = true;
- options.min_blob_size = 0;
- options.create_if_missing = true;
- options.compaction_filter = compaction_filter_guard_.get();
- DestroyAndReopen(options);
- ASSERT_OK(Put("b", "value"));
- ASSERT_OK(Flush());
- ASSERT_TRUE(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
- /*end=*/nullptr)
- .IsNotSupported());
- Close();
- DestroyAndReopen(options);
- std::string key(std::get<0>(GetParam()));
- ASSERT_OK(Put(key, "value"));
- ASSERT_OK(Flush());
- ASSERT_TRUE(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
- /*end=*/nullptr)
- .IsNotSupported());
- Close();
- }
- TEST_F(DBBlobCompactionTest, CompactionFilter_InlinedTTLIndex) {
- Options options = GetDefaultOptions();
- options.create_if_missing = true;
- options.enable_blob_files = true;
- options.min_blob_size = 0;
- std::unique_ptr<CompactionFilter> compaction_filter_guard(
- new ValueMutationFilter(""));
- options.compaction_filter = compaction_filter_guard.get();
- DestroyAndReopen(options);
- constexpr char key[] = "key";
- constexpr char blob[] = "blob";
- // Fake an inlined TTL blob index.
- std::string blob_index;
- constexpr uint64_t expiration = 1234567890;
- BlobIndex::EncodeInlinedTTL(&blob_index, expiration, blob);
- WriteBatch batch;
- ASSERT_OK(WriteBatchInternal::PutBlobIndex(&batch, 0, key, blob_index));
- ASSERT_OK(db_->Write(WriteOptions(), &batch));
- ASSERT_OK(Flush());
- ASSERT_TRUE(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
- /*end=*/nullptr)
- .IsCorruption());
- Close();
- }
- TEST_F(DBBlobCompactionTest, CompactionFilter) {
- Options options = GetDefaultOptions();
- options.create_if_missing = true;
- options.enable_blob_files = true;
- options.min_blob_size = 0;
- constexpr char padding[] = "_delta";
- std::unique_ptr<CompactionFilter> compaction_filter_guard(
- new ValueMutationFilter(padding));
- options.compaction_filter = compaction_filter_guard.get();
- DestroyAndReopen(options);
- const std::vector<std::pair<std::string, std::string>> kvs = {
- {"a", "a_value"}, {"b", "b_value"}, {"c", "c_value"}};
- for (const auto& kv : kvs) {
- ASSERT_OK(Put(kv.first, kv.second));
- }
- ASSERT_OK(Flush());
- ASSERT_OK(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
- /*end=*/nullptr));
- for (const auto& kv : kvs) {
- ASSERT_EQ(kv.second + std::string(padding), Get(kv.first));
- }
- const auto& compaction_stats = GetCompactionStats();
- ASSERT_GE(compaction_stats.size(), 2);
- // Filter changes the value using the previous value in FilterV2;
- // this involves reading and writing blobs
- ASSERT_GT(compaction_stats[1].bytes_read_blob, 0);
- ASSERT_GT(compaction_stats[1].bytes_written_blob, 0);
- Close();
- }
- TEST_F(DBBlobCompactionTest, CorruptedBlobIndex) {
- Options options = GetDefaultOptions();
- options.create_if_missing = true;
- options.enable_blob_files = true;
- options.min_blob_size = 0;
- std::unique_ptr<CompactionFilter> compaction_filter_guard(
- new ValueMutationFilter(""));
- options.compaction_filter = compaction_filter_guard.get();
- DestroyAndReopen(options);
- constexpr char key[] = "key";
- constexpr char blob[] = "blob";
- ASSERT_OK(Put(key, blob));
- ASSERT_OK(Flush());
- SyncPoint::GetInstance()->SetCallBack(
- "CompactionIterator::InvokeFilterIfNeeded::TamperWithBlobIndex",
- [](void* arg) {
- Slice* const blob_index = static_cast<Slice*>(arg);
- assert(blob_index);
- assert(!blob_index->empty());
- blob_index->remove_prefix(1);
- });
- SyncPoint::GetInstance()->EnableProcessing();
- ASSERT_TRUE(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
- /*end=*/nullptr)
- .IsCorruption());
- SyncPoint::GetInstance()->DisableProcessing();
- SyncPoint::GetInstance()->ClearAllCallBacks();
- Close();
- }
- TEST_F(DBBlobCompactionTest, CompactionFilterReadBlobAndKeep) {
- Options options = GetDefaultOptions();
- options.create_if_missing = true;
- options.enable_blob_files = true;
- options.min_blob_size = 0;
- std::unique_ptr<CompactionFilter> compaction_filter_guard(
- new AlwaysKeepFilter());
- options.compaction_filter = compaction_filter_guard.get();
- DestroyAndReopen(options);
- ASSERT_OK(Put("foo", "foo_value"));
- ASSERT_OK(Flush());
- std::vector<uint64_t> blob_files = GetBlobFileNumbers();
- ASSERT_EQ(1, blob_files.size());
- ASSERT_OK(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
- /*end=*/nullptr));
- ASSERT_EQ(blob_files, GetBlobFileNumbers());
- const auto& compaction_stats = GetCompactionStats();
- ASSERT_GE(compaction_stats.size(), 2);
- // Filter decides to keep the existing value in FilterV2;
- // this involves reading but not writing blobs
- ASSERT_GT(compaction_stats[1].bytes_read_blob, 0);
- ASSERT_EQ(compaction_stats[1].bytes_written_blob, 0);
- Close();
- }
- TEST_F(DBBlobCompactionTest, TrackGarbage) {
- Options options = GetDefaultOptions();
- options.enable_blob_files = true;
- Reopen(options);
- // First table+blob file pair: 4 blobs with different keys
- constexpr char first_key[] = "first_key";
- constexpr char first_value[] = "first_value";
- constexpr char second_key[] = "second_key";
- constexpr char second_value[] = "second_value";
- constexpr char third_key[] = "third_key";
- constexpr char third_value[] = "third_value";
- constexpr char fourth_key[] = "fourth_key";
- constexpr char fourth_value[] = "fourth_value";
- ASSERT_OK(Put(first_key, first_value));
- ASSERT_OK(Put(second_key, second_value));
- ASSERT_OK(Put(third_key, third_value));
- ASSERT_OK(Put(fourth_key, fourth_value));
- ASSERT_OK(Flush());
- // Second table+blob file pair: overwrite 2 existing keys
- constexpr char new_first_value[] = "new_first_value";
- constexpr char new_second_value[] = "new_second_value";
- ASSERT_OK(Put(first_key, new_first_value));
- ASSERT_OK(Put(second_key, new_second_value));
- ASSERT_OK(Flush());
- // Compact them together. The first blob file should have 2 garbage blobs
- // corresponding to the 2 overwritten keys.
- constexpr Slice* begin = nullptr;
- constexpr Slice* end = nullptr;
- ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end));
- VersionSet* const versions = dbfull()->GetVersionSet();
- assert(versions);
- assert(versions->GetColumnFamilySet());
- ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
- assert(cfd);
- Version* const current = cfd->current();
- assert(current);
- const VersionStorageInfo* const storage_info = current->storage_info();
- assert(storage_info);
- const auto& blob_files = storage_info->GetBlobFiles();
- ASSERT_EQ(blob_files.size(), 2);
- {
- const auto& meta = blob_files.front();
- assert(meta);
- constexpr uint64_t first_expected_bytes =
- sizeof(first_value) - 1 +
- BlobLogRecord::CalculateAdjustmentForRecordHeader(sizeof(first_key) -
- 1);
- constexpr uint64_t second_expected_bytes =
- sizeof(second_value) - 1 +
- BlobLogRecord::CalculateAdjustmentForRecordHeader(sizeof(second_key) -
- 1);
- constexpr uint64_t third_expected_bytes =
- sizeof(third_value) - 1 +
- BlobLogRecord::CalculateAdjustmentForRecordHeader(sizeof(third_key) -
- 1);
- constexpr uint64_t fourth_expected_bytes =
- sizeof(fourth_value) - 1 +
- BlobLogRecord::CalculateAdjustmentForRecordHeader(sizeof(fourth_key) -
- 1);
- ASSERT_EQ(meta->GetTotalBlobCount(), 4);
- ASSERT_EQ(meta->GetTotalBlobBytes(),
- first_expected_bytes + second_expected_bytes +
- third_expected_bytes + fourth_expected_bytes);
- ASSERT_EQ(meta->GetGarbageBlobCount(), 2);
- ASSERT_EQ(meta->GetGarbageBlobBytes(),
- first_expected_bytes + second_expected_bytes);
- }
- {
- const auto& meta = blob_files.back();
- assert(meta);
- constexpr uint64_t new_first_expected_bytes =
- sizeof(new_first_value) - 1 +
- BlobLogRecord::CalculateAdjustmentForRecordHeader(sizeof(first_key) -
- 1);
- constexpr uint64_t new_second_expected_bytes =
- sizeof(new_second_value) - 1 +
- BlobLogRecord::CalculateAdjustmentForRecordHeader(sizeof(second_key) -
- 1);
- ASSERT_EQ(meta->GetTotalBlobCount(), 2);
- ASSERT_EQ(meta->GetTotalBlobBytes(),
- new_first_expected_bytes + new_second_expected_bytes);
- ASSERT_EQ(meta->GetGarbageBlobCount(), 0);
- ASSERT_EQ(meta->GetGarbageBlobBytes(), 0);
- }
- }
- TEST_F(DBBlobCompactionTest, MergeBlobWithBase) {
- Options options = GetDefaultOptions();
- options.enable_blob_files = true;
- options.min_blob_size = 0;
- options.merge_operator = MergeOperators::CreateStringAppendOperator();
- options.disable_auto_compactions = true;
- Reopen(options);
- ASSERT_OK(Put("Key1", "v1_1"));
- ASSERT_OK(Put("Key2", "v2_1"));
- ASSERT_OK(Flush());
- ASSERT_OK(Merge("Key1", "v1_2"));
- ASSERT_OK(Merge("Key2", "v2_2"));
- ASSERT_OK(Flush());
- ASSERT_OK(Merge("Key1", "v1_3"));
- ASSERT_OK(Flush());
- ASSERT_OK(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
- /*end=*/nullptr));
- ASSERT_EQ(Get("Key1"), "v1_1,v1_2,v1_3");
- ASSERT_EQ(Get("Key2"), "v2_1,v2_2");
- Close();
- }
- TEST_F(DBBlobCompactionTest, CompactionReadaheadGarbageCollection) {
- Options options = GetDefaultOptions();
- options.enable_blob_files = true;
- options.min_blob_size = 0;
- options.enable_blob_garbage_collection = true;
- options.blob_garbage_collection_age_cutoff = 1.0;
- options.blob_compaction_readahead_size = 1 << 10;
- options.disable_auto_compactions = true;
- Reopen(options);
- ASSERT_OK(Put("key", "lime"));
- ASSERT_OK(Put("foo", "bar"));
- ASSERT_OK(Flush());
- ASSERT_OK(Put("key", "pie"));
- ASSERT_OK(Put("foo", "baz"));
- ASSERT_OK(Flush());
- size_t num_non_prefetch_reads = 0;
- SyncPoint::GetInstance()->SetCallBack(
- "BlobFileReader::GetBlob:ReadFromFile",
- [&num_non_prefetch_reads](void* /* arg */) { ++num_non_prefetch_reads; });
- SyncPoint::GetInstance()->EnableProcessing();
- constexpr Slice* begin = nullptr;
- constexpr Slice* end = nullptr;
- ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end));
- SyncPoint::GetInstance()->DisableProcessing();
- SyncPoint::GetInstance()->ClearAllCallBacks();
- ASSERT_EQ(Get("key"), "pie");
- ASSERT_EQ(Get("foo"), "baz");
- ASSERT_EQ(num_non_prefetch_reads, 0);
- Close();
- }
- TEST_F(DBBlobCompactionTest, CompactionReadaheadFilter) {
- Options options = GetDefaultOptions();
- std::unique_ptr<CompactionFilter> compaction_filter_guard(
- new ValueMutationFilter("pie"));
- options.compaction_filter = compaction_filter_guard.get();
- options.enable_blob_files = true;
- options.min_blob_size = 0;
- options.blob_compaction_readahead_size = 1 << 10;
- options.disable_auto_compactions = true;
- Reopen(options);
- ASSERT_OK(Put("key", "lime"));
- ASSERT_OK(Put("foo", "bar"));
- ASSERT_OK(Flush());
- size_t num_non_prefetch_reads = 0;
- SyncPoint::GetInstance()->SetCallBack(
- "BlobFileReader::GetBlob:ReadFromFile",
- [&num_non_prefetch_reads](void* /* arg */) { ++num_non_prefetch_reads; });
- SyncPoint::GetInstance()->EnableProcessing();
- constexpr Slice* begin = nullptr;
- constexpr Slice* end = nullptr;
- ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end));
- SyncPoint::GetInstance()->DisableProcessing();
- SyncPoint::GetInstance()->ClearAllCallBacks();
- ASSERT_EQ(Get("key"), "limepie");
- ASSERT_EQ(Get("foo"), "barpie");
- ASSERT_EQ(num_non_prefetch_reads, 0);
- Close();
- }
- TEST_F(DBBlobCompactionTest, CompactionReadaheadMerge) {
- Options options = GetDefaultOptions();
- options.enable_blob_files = true;
- options.min_blob_size = 0;
- options.blob_compaction_readahead_size = 1 << 10;
- options.merge_operator = MergeOperators::CreateStringAppendOperator();
- options.disable_auto_compactions = true;
- Reopen(options);
- ASSERT_OK(Put("key", "lime"));
- ASSERT_OK(Put("foo", "bar"));
- ASSERT_OK(Flush());
- ASSERT_OK(Merge("key", "pie"));
- ASSERT_OK(Merge("foo", "baz"));
- ASSERT_OK(Flush());
- size_t num_non_prefetch_reads = 0;
- SyncPoint::GetInstance()->SetCallBack(
- "BlobFileReader::GetBlob:ReadFromFile",
- [&num_non_prefetch_reads](void* /* arg */) { ++num_non_prefetch_reads; });
- SyncPoint::GetInstance()->EnableProcessing();
- constexpr Slice* begin = nullptr;
- constexpr Slice* end = nullptr;
- ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end));
- SyncPoint::GetInstance()->DisableProcessing();
- SyncPoint::GetInstance()->ClearAllCallBacks();
- ASSERT_EQ(Get("key"), "lime,pie");
- ASSERT_EQ(Get("foo"), "bar,baz");
- ASSERT_EQ(num_non_prefetch_reads, 0);
- Close();
- }
- TEST_F(DBBlobCompactionTest, CompactionDoNotFillCache) {
- Options options = GetDefaultOptions();
- options.enable_blob_files = true;
- options.min_blob_size = 0;
- options.enable_blob_garbage_collection = true;
- options.blob_garbage_collection_age_cutoff = 1.0;
- options.disable_auto_compactions = true;
- options.statistics = CreateDBStatistics();
- LRUCacheOptions cache_options;
- cache_options.capacity = 1 << 20;
- cache_options.metadata_charge_policy = kDontChargeCacheMetadata;
- options.blob_cache = NewLRUCache(cache_options);
- Reopen(options);
- ASSERT_OK(Put("key", "lime"));
- ASSERT_OK(Put("foo", "bar"));
- ASSERT_OK(Flush());
- ASSERT_OK(Put("key", "pie"));
- ASSERT_OK(Put("foo", "baz"));
- ASSERT_OK(Flush());
- constexpr Slice* begin = nullptr;
- constexpr Slice* end = nullptr;
- ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end));
- ASSERT_EQ(options.statistics->getTickerCount(BLOB_DB_CACHE_ADD), 0);
- Close();
- }
- } // namespace ROCKSDB_NAMESPACE
- int main(int argc, char** argv) {
- ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
- ::testing::InitGoogleTest(&argc, argv);
- RegisterCustomObjects(argc, argv);
- return RUN_ALL_TESTS();
- }
|