| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761
77717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992 |
- // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
- // This source code is licensed under both the GPLv2 (found in the
- // COPYING file in the root directory) and Apache 2.0 License
- // (found in the LICENSE.Apache file in the root directory).
- #ifndef ROCKSDB_LITE
- #include <algorithm>
- #include <chrono>
- #include <cstdlib>
- #include <iomanip>
- #include <map>
- #include <memory>
- #include <sstream>
- #include <string>
- #include <vector>
- #include "db/blob_index.h"
- #include "db/db_test_util.h"
- #include "env/composite_env_wrapper.h"
- #include "file/file_util.h"
- #include "file/sst_file_manager_impl.h"
- #include "port/port.h"
- #include "rocksdb/utilities/debug.h"
- #include "test_util/fault_injection_test_env.h"
- #include "test_util/sync_point.h"
- #include "test_util/testharness.h"
- #include "util/cast_util.h"
- #include "util/random.h"
- #include "util/string_util.h"
- #include "utilities/blob_db/blob_db.h"
- #include "utilities/blob_db/blob_db_impl.h"
- namespace ROCKSDB_NAMESPACE {
- namespace blob_db {
- class BlobDBTest : public testing::Test {
- public:
- const int kMaxBlobSize = 1 << 14;
- struct BlobIndexVersion {
- BlobIndexVersion() = default;
- BlobIndexVersion(std::string _user_key, uint64_t _file_number,
- uint64_t _expiration, SequenceNumber _sequence,
- ValueType _type)
- : user_key(std::move(_user_key)),
- file_number(_file_number),
- expiration(_expiration),
- sequence(_sequence),
- type(_type) {}
- std::string user_key;
- uint64_t file_number = kInvalidBlobFileNumber;
- uint64_t expiration = kNoExpiration;
- SequenceNumber sequence = 0;
- ValueType type = kTypeValue;
- };
- BlobDBTest()
- : dbname_(test::PerThreadDBPath("blob_db_test")),
- mock_env_(new MockTimeEnv(Env::Default())),
- fault_injection_env_(new FaultInjectionTestEnv(Env::Default())),
- blob_db_(nullptr) {
- Status s = DestroyBlobDB(dbname_, Options(), BlobDBOptions());
- assert(s.ok());
- }
- ~BlobDBTest() override {
- SyncPoint::GetInstance()->ClearAllCallBacks();
- Destroy();
- }
- Status TryOpen(BlobDBOptions bdb_options = BlobDBOptions(),
- Options options = Options()) {
- options.create_if_missing = true;
- return BlobDB::Open(options, bdb_options, dbname_, &blob_db_);
- }
- void Open(BlobDBOptions bdb_options = BlobDBOptions(),
- Options options = Options()) {
- ASSERT_OK(TryOpen(bdb_options, options));
- }
- void Reopen(BlobDBOptions bdb_options = BlobDBOptions(),
- Options options = Options()) {
- assert(blob_db_ != nullptr);
- delete blob_db_;
- blob_db_ = nullptr;
- Open(bdb_options, options);
- }
- void Close() {
- assert(blob_db_ != nullptr);
- delete blob_db_;
- blob_db_ = nullptr;
- }
- void Destroy() {
- if (blob_db_) {
- Options options = blob_db_->GetOptions();
- BlobDBOptions bdb_options = blob_db_->GetBlobDBOptions();
- delete blob_db_;
- blob_db_ = nullptr;
- ASSERT_OK(DestroyBlobDB(dbname_, options, bdb_options));
- }
- }
- BlobDBImpl *blob_db_impl() {
- return reinterpret_cast<BlobDBImpl *>(blob_db_);
- }
- Status Put(const Slice &key, const Slice &value,
- std::map<std::string, std::string> *data = nullptr) {
- Status s = blob_db_->Put(WriteOptions(), key, value);
- if (data != nullptr) {
- (*data)[key.ToString()] = value.ToString();
- }
- return s;
- }
- void Delete(const std::string &key,
- std::map<std::string, std::string> *data = nullptr) {
- ASSERT_OK(blob_db_->Delete(WriteOptions(), key));
- if (data != nullptr) {
- data->erase(key);
- }
- }
- Status PutWithTTL(const Slice &key, const Slice &value, uint64_t ttl,
- std::map<std::string, std::string> *data = nullptr) {
- Status s = blob_db_->PutWithTTL(WriteOptions(), key, value, ttl);
- if (data != nullptr) {
- (*data)[key.ToString()] = value.ToString();
- }
- return s;
- }
- Status PutUntil(const Slice &key, const Slice &value, uint64_t expiration) {
- return blob_db_->PutUntil(WriteOptions(), key, value, expiration);
- }
- void PutRandomWithTTL(const std::string &key, uint64_t ttl, Random *rnd,
- std::map<std::string, std::string> *data = nullptr) {
- int len = rnd->Next() % kMaxBlobSize + 1;
- std::string value = test::RandomHumanReadableString(rnd, len);
- ASSERT_OK(
- blob_db_->PutWithTTL(WriteOptions(), Slice(key), Slice(value), ttl));
- if (data != nullptr) {
- (*data)[key] = value;
- }
- }
- void PutRandomUntil(const std::string &key, uint64_t expiration, Random *rnd,
- std::map<std::string, std::string> *data = nullptr) {
- int len = rnd->Next() % kMaxBlobSize + 1;
- std::string value = test::RandomHumanReadableString(rnd, len);
- ASSERT_OK(blob_db_->PutUntil(WriteOptions(), Slice(key), Slice(value),
- expiration));
- if (data != nullptr) {
- (*data)[key] = value;
- }
- }
- void PutRandom(const std::string &key, Random *rnd,
- std::map<std::string, std::string> *data = nullptr) {
- PutRandom(blob_db_, key, rnd, data);
- }
- void PutRandom(DB *db, const std::string &key, Random *rnd,
- std::map<std::string, std::string> *data = nullptr) {
- int len = rnd->Next() % kMaxBlobSize + 1;
- std::string value = test::RandomHumanReadableString(rnd, len);
- ASSERT_OK(db->Put(WriteOptions(), Slice(key), Slice(value)));
- if (data != nullptr) {
- (*data)[key] = value;
- }
- }
- void PutRandomToWriteBatch(
- const std::string &key, Random *rnd, WriteBatch *batch,
- std::map<std::string, std::string> *data = nullptr) {
- int len = rnd->Next() % kMaxBlobSize + 1;
- std::string value = test::RandomHumanReadableString(rnd, len);
- ASSERT_OK(batch->Put(key, value));
- if (data != nullptr) {
- (*data)[key] = value;
- }
- }
- // Verify blob db contain expected data and nothing more.
- void VerifyDB(const std::map<std::string, std::string> &data) {
- VerifyDB(blob_db_, data);
- }
- void VerifyDB(DB *db, const std::map<std::string, std::string> &data) {
- // Verify normal Get
- auto* cfh = db->DefaultColumnFamily();
- for (auto &p : data) {
- PinnableSlice value_slice;
- ASSERT_OK(db->Get(ReadOptions(), cfh, p.first, &value_slice));
- ASSERT_EQ(p.second, value_slice.ToString());
- std::string value;
- ASSERT_OK(db->Get(ReadOptions(), cfh, p.first, &value));
- ASSERT_EQ(p.second, value);
- }
- // Verify iterators
- Iterator *iter = db->NewIterator(ReadOptions());
- iter->SeekToFirst();
- for (auto &p : data) {
- ASSERT_TRUE(iter->Valid());
- ASSERT_EQ(p.first, iter->key().ToString());
- ASSERT_EQ(p.second, iter->value().ToString());
- iter->Next();
- }
- ASSERT_FALSE(iter->Valid());
- ASSERT_OK(iter->status());
- delete iter;
- }
- void VerifyBaseDB(
- const std::map<std::string, KeyVersion> &expected_versions) {
- auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
- DB *db = blob_db_->GetRootDB();
- const size_t kMaxKeys = 10000;
- std::vector<KeyVersion> versions;
- GetAllKeyVersions(db, "", "", kMaxKeys, &versions);
- ASSERT_EQ(expected_versions.size(), versions.size());
- size_t i = 0;
- for (auto &key_version : expected_versions) {
- const KeyVersion &expected_version = key_version.second;
- ASSERT_EQ(expected_version.user_key, versions[i].user_key);
- ASSERT_EQ(expected_version.sequence, versions[i].sequence);
- ASSERT_EQ(expected_version.type, versions[i].type);
- if (versions[i].type == kTypeValue) {
- ASSERT_EQ(expected_version.value, versions[i].value);
- } else {
- ASSERT_EQ(kTypeBlobIndex, versions[i].type);
- PinnableSlice value;
- ASSERT_OK(bdb_impl->TEST_GetBlobValue(versions[i].user_key,
- versions[i].value, &value));
- ASSERT_EQ(expected_version.value, value.ToString());
- }
- i++;
- }
- }
- void VerifyBaseDBBlobIndex(
- const std::map<std::string, BlobIndexVersion> &expected_versions) {
- const size_t kMaxKeys = 10000;
- std::vector<KeyVersion> versions;
- ASSERT_OK(
- GetAllKeyVersions(blob_db_->GetRootDB(), "", "", kMaxKeys, &versions));
- ASSERT_EQ(versions.size(), expected_versions.size());
- size_t i = 0;
- for (const auto &expected_pair : expected_versions) {
- const BlobIndexVersion &expected_version = expected_pair.second;
- ASSERT_EQ(versions[i].user_key, expected_version.user_key);
- ASSERT_EQ(versions[i].sequence, expected_version.sequence);
- ASSERT_EQ(versions[i].type, expected_version.type);
- if (versions[i].type != kTypeBlobIndex) {
- ASSERT_EQ(kInvalidBlobFileNumber, expected_version.file_number);
- ASSERT_EQ(kNoExpiration, expected_version.expiration);
- ++i;
- continue;
- }
- BlobIndex blob_index;
- ASSERT_OK(blob_index.DecodeFrom(versions[i].value));
- const uint64_t file_number = !blob_index.IsInlined()
- ? blob_index.file_number()
- : kInvalidBlobFileNumber;
- ASSERT_EQ(file_number, expected_version.file_number);
- const uint64_t expiration =
- blob_index.HasTTL() ? blob_index.expiration() : kNoExpiration;
- ASSERT_EQ(expiration, expected_version.expiration);
- ++i;
- }
- }
- void InsertBlobs() {
- WriteOptions wo;
- std::string value;
- Random rnd(301);
- for (size_t i = 0; i < 100000; i++) {
- uint64_t ttl = rnd.Next() % 86400;
- PutRandomWithTTL("key" + ToString(i % 500), ttl, &rnd, nullptr);
- }
- for (size_t i = 0; i < 10; i++) {
- Delete("key" + ToString(i % 500));
- }
- }
- const std::string dbname_;
- std::unique_ptr<MockTimeEnv> mock_env_;
- std::unique_ptr<FaultInjectionTestEnv> fault_injection_env_;
- BlobDB *blob_db_;
- }; // class BlobDBTest
- TEST_F(BlobDBTest, Put) {
- Random rnd(301);
- BlobDBOptions bdb_options;
- bdb_options.min_blob_size = 0;
- bdb_options.disable_background_tasks = true;
- Open(bdb_options);
- std::map<std::string, std::string> data;
- for (size_t i = 0; i < 100; i++) {
- PutRandom("key" + ToString(i), &rnd, &data);
- }
- VerifyDB(data);
- }
- TEST_F(BlobDBTest, PutWithTTL) {
- Random rnd(301);
- Options options;
- options.env = mock_env_.get();
- BlobDBOptions bdb_options;
- bdb_options.ttl_range_secs = 1000;
- bdb_options.min_blob_size = 0;
- bdb_options.blob_file_size = 256 * 1000 * 1000;
- bdb_options.disable_background_tasks = true;
- Open(bdb_options, options);
- std::map<std::string, std::string> data;
- mock_env_->set_current_time(50);
- for (size_t i = 0; i < 100; i++) {
- uint64_t ttl = rnd.Next() % 100;
- PutRandomWithTTL("key" + ToString(i), ttl, &rnd,
- (ttl <= 50 ? nullptr : &data));
- }
- mock_env_->set_current_time(100);
- auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
- auto blob_files = bdb_impl->TEST_GetBlobFiles();
- ASSERT_EQ(1, blob_files.size());
- ASSERT_TRUE(blob_files[0]->HasTTL());
- ASSERT_OK(bdb_impl->TEST_CloseBlobFile(blob_files[0]));
- VerifyDB(data);
- }
- TEST_F(BlobDBTest, PutUntil) {
- Random rnd(301);
- Options options;
- options.env = mock_env_.get();
- BlobDBOptions bdb_options;
- bdb_options.ttl_range_secs = 1000;
- bdb_options.min_blob_size = 0;
- bdb_options.blob_file_size = 256 * 1000 * 1000;
- bdb_options.disable_background_tasks = true;
- Open(bdb_options, options);
- std::map<std::string, std::string> data;
- mock_env_->set_current_time(50);
- for (size_t i = 0; i < 100; i++) {
- uint64_t expiration = rnd.Next() % 100 + 50;
- PutRandomUntil("key" + ToString(i), expiration, &rnd,
- (expiration <= 100 ? nullptr : &data));
- }
- mock_env_->set_current_time(100);
- auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
- auto blob_files = bdb_impl->TEST_GetBlobFiles();
- ASSERT_EQ(1, blob_files.size());
- ASSERT_TRUE(blob_files[0]->HasTTL());
- ASSERT_OK(bdb_impl->TEST_CloseBlobFile(blob_files[0]));
- VerifyDB(data);
- }
- TEST_F(BlobDBTest, StackableDBGet) {
- Random rnd(301);
- BlobDBOptions bdb_options;
- bdb_options.min_blob_size = 0;
- bdb_options.disable_background_tasks = true;
- Open(bdb_options);
- std::map<std::string, std::string> data;
- for (size_t i = 0; i < 100; i++) {
- PutRandom("key" + ToString(i), &rnd, &data);
- }
- for (size_t i = 0; i < 100; i++) {
- StackableDB *db = blob_db_;
- ColumnFamilyHandle *column_family = db->DefaultColumnFamily();
- std::string key = "key" + ToString(i);
- PinnableSlice pinnable_value;
- ASSERT_OK(db->Get(ReadOptions(), column_family, key, &pinnable_value));
- std::string string_value;
- ASSERT_OK(db->Get(ReadOptions(), column_family, key, &string_value));
- ASSERT_EQ(string_value, pinnable_value.ToString());
- ASSERT_EQ(string_value, data[key]);
- }
- }
- TEST_F(BlobDBTest, GetExpiration) {
- Options options;
- options.env = mock_env_.get();
- BlobDBOptions bdb_options;
- bdb_options.disable_background_tasks = true;
- mock_env_->set_current_time(100);
- Open(bdb_options, options);
- Put("key1", "value1");
- PutWithTTL("key2", "value2", 200);
- PinnableSlice value;
- uint64_t expiration;
- ASSERT_OK(blob_db_->Get(ReadOptions(), "key1", &value, &expiration));
- ASSERT_EQ("value1", value.ToString());
- ASSERT_EQ(kNoExpiration, expiration);
- ASSERT_OK(blob_db_->Get(ReadOptions(), "key2", &value, &expiration));
- ASSERT_EQ("value2", value.ToString());
- ASSERT_EQ(300 /* = 100 + 200 */, expiration);
- }
- TEST_F(BlobDBTest, GetIOError) {
- Options options;
- options.env = fault_injection_env_.get();
- BlobDBOptions bdb_options;
- bdb_options.min_blob_size = 0; // Make sure value write to blob file
- bdb_options.disable_background_tasks = true;
- Open(bdb_options, options);
- ColumnFamilyHandle *column_family = blob_db_->DefaultColumnFamily();
- PinnableSlice value;
- ASSERT_OK(Put("foo", "bar"));
- fault_injection_env_->SetFilesystemActive(false, Status::IOError());
- Status s = blob_db_->Get(ReadOptions(), column_family, "foo", &value);
- ASSERT_TRUE(s.IsIOError());
- // Reactivate file system to allow test to close DB.
- fault_injection_env_->SetFilesystemActive(true);
- }
- TEST_F(BlobDBTest, PutIOError) {
- Options options;
- options.env = fault_injection_env_.get();
- BlobDBOptions bdb_options;
- bdb_options.min_blob_size = 0; // Make sure value write to blob file
- bdb_options.disable_background_tasks = true;
- Open(bdb_options, options);
- fault_injection_env_->SetFilesystemActive(false, Status::IOError());
- ASSERT_TRUE(Put("foo", "v1").IsIOError());
- fault_injection_env_->SetFilesystemActive(true, Status::IOError());
- ASSERT_OK(Put("bar", "v1"));
- }
- TEST_F(BlobDBTest, WriteBatch) {
- Random rnd(301);
- BlobDBOptions bdb_options;
- bdb_options.min_blob_size = 0;
- bdb_options.disable_background_tasks = true;
- Open(bdb_options);
- std::map<std::string, std::string> data;
- for (size_t i = 0; i < 100; i++) {
- WriteBatch batch;
- for (size_t j = 0; j < 10; j++) {
- PutRandomToWriteBatch("key" + ToString(j * 100 + i), &rnd, &batch, &data);
- }
- blob_db_->Write(WriteOptions(), &batch);
- }
- VerifyDB(data);
- }
- TEST_F(BlobDBTest, Delete) {
- Random rnd(301);
- BlobDBOptions bdb_options;
- bdb_options.min_blob_size = 0;
- bdb_options.disable_background_tasks = true;
- Open(bdb_options);
- std::map<std::string, std::string> data;
- for (size_t i = 0; i < 100; i++) {
- PutRandom("key" + ToString(i), &rnd, &data);
- }
- for (size_t i = 0; i < 100; i += 5) {
- Delete("key" + ToString(i), &data);
- }
- VerifyDB(data);
- }
- TEST_F(BlobDBTest, DeleteBatch) {
- Random rnd(301);
- BlobDBOptions bdb_options;
- bdb_options.min_blob_size = 0;
- bdb_options.disable_background_tasks = true;
- Open(bdb_options);
- for (size_t i = 0; i < 100; i++) {
- PutRandom("key" + ToString(i), &rnd);
- }
- WriteBatch batch;
- for (size_t i = 0; i < 100; i++) {
- batch.Delete("key" + ToString(i));
- }
- ASSERT_OK(blob_db_->Write(WriteOptions(), &batch));
- // DB should be empty.
- VerifyDB({});
- }
- TEST_F(BlobDBTest, Override) {
- Random rnd(301);
- BlobDBOptions bdb_options;
- bdb_options.min_blob_size = 0;
- bdb_options.disable_background_tasks = true;
- Open(bdb_options);
- std::map<std::string, std::string> data;
- for (int i = 0; i < 10000; i++) {
- PutRandom("key" + ToString(i), &rnd, nullptr);
- }
- // override all the keys
- for (int i = 0; i < 10000; i++) {
- PutRandom("key" + ToString(i), &rnd, &data);
- }
- VerifyDB(data);
- }
- #ifdef SNAPPY
- TEST_F(BlobDBTest, Compression) {
- Random rnd(301);
- BlobDBOptions bdb_options;
- bdb_options.min_blob_size = 0;
- bdb_options.disable_background_tasks = true;
- bdb_options.compression = CompressionType::kSnappyCompression;
- Open(bdb_options);
- std::map<std::string, std::string> data;
- for (size_t i = 0; i < 100; i++) {
- PutRandom("put-key" + ToString(i), &rnd, &data);
- }
- for (int i = 0; i < 100; i++) {
- WriteBatch batch;
- for (size_t j = 0; j < 10; j++) {
- PutRandomToWriteBatch("write-batch-key" + ToString(j * 100 + i), &rnd,
- &batch, &data);
- }
- blob_db_->Write(WriteOptions(), &batch);
- }
- VerifyDB(data);
- }
- TEST_F(BlobDBTest, DecompressAfterReopen) {
- Random rnd(301);
- BlobDBOptions bdb_options;
- bdb_options.min_blob_size = 0;
- bdb_options.disable_background_tasks = true;
- bdb_options.compression = CompressionType::kSnappyCompression;
- Open(bdb_options);
- std::map<std::string, std::string> data;
- for (size_t i = 0; i < 100; i++) {
- PutRandom("put-key" + ToString(i), &rnd, &data);
- }
- VerifyDB(data);
- bdb_options.compression = CompressionType::kNoCompression;
- Reopen(bdb_options);
- VerifyDB(data);
- }
- #endif
- TEST_F(BlobDBTest, MultipleWriters) {
- Open(BlobDBOptions());
- std::vector<port::Thread> workers;
- std::vector<std::map<std::string, std::string>> data_set(10);
- for (uint32_t i = 0; i < 10; i++)
- workers.push_back(port::Thread(
- [&](uint32_t id) {
- Random rnd(301 + id);
- for (int j = 0; j < 100; j++) {
- std::string key = "key" + ToString(id) + "_" + ToString(j);
- if (id < 5) {
- PutRandom(key, &rnd, &data_set[id]);
- } else {
- WriteBatch batch;
- PutRandomToWriteBatch(key, &rnd, &batch, &data_set[id]);
- blob_db_->Write(WriteOptions(), &batch);
- }
- }
- },
- i));
- std::map<std::string, std::string> data;
- for (size_t i = 0; i < 10; i++) {
- workers[i].join();
- data.insert(data_set[i].begin(), data_set[i].end());
- }
- VerifyDB(data);
- }
- TEST_F(BlobDBTest, SstFileManager) {
- // run the same test for Get(), MultiGet() and Iterator each.
- std::shared_ptr<SstFileManager> sst_file_manager(
- NewSstFileManager(mock_env_.get()));
- sst_file_manager->SetDeleteRateBytesPerSecond(1);
- SstFileManagerImpl *sfm =
- static_cast<SstFileManagerImpl *>(sst_file_manager.get());
- BlobDBOptions bdb_options;
- bdb_options.min_blob_size = 0;
- bdb_options.enable_garbage_collection = true;
- bdb_options.garbage_collection_cutoff = 1.0;
- Options db_options;
- int files_scheduled_to_delete = 0;
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
- "SstFileManagerImpl::ScheduleFileDeletion", [&](void *arg) {
- assert(arg);
- const std::string *const file_path =
- static_cast<const std::string *>(arg);
- if (file_path->find(".blob") != std::string::npos) {
- ++files_scheduled_to_delete;
- }
- });
- SyncPoint::GetInstance()->EnableProcessing();
- db_options.sst_file_manager = sst_file_manager;
- Open(bdb_options, db_options);
- // Create one obselete file and clean it.
- blob_db_->Put(WriteOptions(), "foo", "bar");
- auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
- ASSERT_EQ(1, blob_files.size());
- std::shared_ptr<BlobFile> bfile = blob_files[0];
- ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(bfile));
- ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
- blob_db_impl()->TEST_DeleteObsoleteFiles();
- // Even if SSTFileManager is not set, DB is creating a dummy one.
- ASSERT_EQ(1, files_scheduled_to_delete);
- Destroy();
- // Make sure that DestroyBlobDB() also goes through delete scheduler.
- ASSERT_EQ(2, files_scheduled_to_delete);
- SyncPoint::GetInstance()->DisableProcessing();
- sfm->WaitForEmptyTrash();
- }
- TEST_F(BlobDBTest, SstFileManagerRestart) {
- int files_scheduled_to_delete = 0;
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
- "SstFileManagerImpl::ScheduleFileDeletion", [&](void *arg) {
- assert(arg);
- const std::string *const file_path =
- static_cast<const std::string *>(arg);
- if (file_path->find(".blob") != std::string::npos) {
- ++files_scheduled_to_delete;
- }
- });
- // run the same test for Get(), MultiGet() and Iterator each.
- std::shared_ptr<SstFileManager> sst_file_manager(
- NewSstFileManager(mock_env_.get()));
- sst_file_manager->SetDeleteRateBytesPerSecond(1);
- SstFileManagerImpl *sfm =
- static_cast<SstFileManagerImpl *>(sst_file_manager.get());
- BlobDBOptions bdb_options;
- bdb_options.min_blob_size = 0;
- Options db_options;
- SyncPoint::GetInstance()->EnableProcessing();
- db_options.sst_file_manager = sst_file_manager;
- Open(bdb_options, db_options);
- std::string blob_dir = blob_db_impl()->TEST_blob_dir();
- blob_db_->Put(WriteOptions(), "foo", "bar");
- Close();
- // Create 3 dummy trash files under the blob_dir
- LegacyFileSystemWrapper fs(db_options.env);
- CreateFile(&fs, blob_dir + "/000666.blob.trash", "", false);
- CreateFile(&fs, blob_dir + "/000888.blob.trash", "", true);
- CreateFile(&fs, blob_dir + "/something_not_match.trash", "", false);
- // Make sure that reopening the DB rescan the existing trash files
- Open(bdb_options, db_options);
- ASSERT_EQ(files_scheduled_to_delete, 2);
- sfm->WaitForEmptyTrash();
- // There should be exact one file under the blob dir now.
- std::vector<std::string> all_files;
- ASSERT_OK(db_options.env->GetChildren(blob_dir, &all_files));
- int nfiles = 0;
- for (const auto &f : all_files) {
- assert(!f.empty());
- if (f[0] == '.') {
- continue;
- }
- nfiles++;
- }
- ASSERT_EQ(nfiles, 1);
- SyncPoint::GetInstance()->DisableProcessing();
- }
TEST_F(BlobDBTest, SnapshotAndGarbageCollection) {
  // An obsolete blob file may only be physically deleted once no live
  // snapshot can still reference its data. The loop variable i selects at
  // which point in the workload the snapshot is taken.
  BlobDBOptions bdb_options;
  bdb_options.min_blob_size = 0;
  bdb_options.enable_garbage_collection = true;
  bdb_options.garbage_collection_cutoff = 1.0;
  bdb_options.disable_background_tasks = true;
  // i = when to take snapshot
  for (int i = 0; i < 4; i++) {
    Destroy();
    Open(bdb_options);
    const Snapshot *snapshot = nullptr;
    // First file
    ASSERT_OK(Put("key1", "value"));
    if (i == 0) {
      snapshot = blob_db_->GetSnapshot();
    }
    auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
    ASSERT_EQ(1, blob_files.size());
    ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[0]));
    // Second file
    ASSERT_OK(Put("key2", "value"));
    if (i == 1) {
      snapshot = blob_db_->GetSnapshot();
    }
    blob_files = blob_db_impl()->TEST_GetBlobFiles();
    ASSERT_EQ(2, blob_files.size());
    auto bfile = blob_files[1];
    ASSERT_FALSE(bfile->Immutable());
    ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(bfile));
    // Third file
    ASSERT_OK(Put("key3", "value"));
    if (i == 2) {
      snapshot = blob_db_->GetSnapshot();
    }
    // With garbage_collection_cutoff = 1.0, compaction relocates all live
    // blobs, so bfile becomes obsolete as of the latest sequence number.
    ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
    ASSERT_TRUE(bfile->Obsolete());
    ASSERT_EQ(blob_db_->GetLatestSequenceNumber(),
              bfile->GetObsoleteSequence());
    Delete("key2");
    if (i == 3) {
      snapshot = blob_db_->GetSnapshot();
    }
    ASSERT_EQ(4, blob_db_impl()->TEST_GetBlobFiles().size());
    blob_db_impl()->TEST_DeleteObsoleteFiles();
    if (i >= 2) {
      // The snapshot shouldn't see data in bfile
      ASSERT_EQ(2, blob_db_impl()->TEST_GetBlobFiles().size());
      blob_db_->ReleaseSnapshot(snapshot);
    } else {
      // The snapshot will see data in bfile, so the file shouldn't be deleted
      ASSERT_EQ(4, blob_db_impl()->TEST_GetBlobFiles().size());
      blob_db_->ReleaseSnapshot(snapshot);
      // With the snapshot released, a second pass can reclaim the files.
      blob_db_impl()->TEST_DeleteObsoleteFiles();
      ASSERT_EQ(2, blob_db_impl()->TEST_GetBlobFiles().size());
    }
  }
}
- TEST_F(BlobDBTest, ColumnFamilyNotSupported) {
- Options options;
- options.env = mock_env_.get();
- mock_env_->set_current_time(0);
- Open(BlobDBOptions(), options);
- ColumnFamilyHandle *default_handle = blob_db_->DefaultColumnFamily();
- ColumnFamilyHandle *handle = nullptr;
- std::string value;
- std::vector<std::string> values;
- // The call simply pass through to base db. It should succeed.
- ASSERT_OK(
- blob_db_->CreateColumnFamily(ColumnFamilyOptions(), "foo", &handle));
- ASSERT_TRUE(blob_db_->Put(WriteOptions(), handle, "k", "v").IsNotSupported());
- ASSERT_TRUE(blob_db_->PutWithTTL(WriteOptions(), handle, "k", "v", 60)
- .IsNotSupported());
- ASSERT_TRUE(blob_db_->PutUntil(WriteOptions(), handle, "k", "v", 100)
- .IsNotSupported());
- WriteBatch batch;
- batch.Put("k1", "v1");
- batch.Put(handle, "k2", "v2");
- ASSERT_TRUE(blob_db_->Write(WriteOptions(), &batch).IsNotSupported());
- ASSERT_TRUE(blob_db_->Get(ReadOptions(), "k1", &value).IsNotFound());
- ASSERT_TRUE(
- blob_db_->Get(ReadOptions(), handle, "k", &value).IsNotSupported());
- auto statuses = blob_db_->MultiGet(ReadOptions(), {default_handle, handle},
- {"k1", "k2"}, &values);
- ASSERT_EQ(2, statuses.size());
- ASSERT_TRUE(statuses[0].IsNotSupported());
- ASSERT_TRUE(statuses[1].IsNotSupported());
- ASSERT_EQ(nullptr, blob_db_->NewIterator(ReadOptions(), handle));
- delete handle;
- }
- TEST_F(BlobDBTest, GetLiveFilesMetaData) {
- Random rnd(301);
- BlobDBOptions bdb_options;
- bdb_options.blob_dir = "blob_dir";
- bdb_options.path_relative = true;
- bdb_options.min_blob_size = 0;
- bdb_options.disable_background_tasks = true;
- Open(bdb_options);
- std::map<std::string, std::string> data;
- for (size_t i = 0; i < 100; i++) {
- PutRandom("key" + ToString(i), &rnd, &data);
- }
- std::vector<LiveFileMetaData> metadata;
- blob_db_->GetLiveFilesMetaData(&metadata);
- ASSERT_EQ(1U, metadata.size());
- // Path should be relative to db_name, but begin with slash.
- std::string filename = "/blob_dir/000001.blob";
- ASSERT_EQ(filename, metadata[0].name);
- ASSERT_EQ(1, metadata[0].file_number);
- ASSERT_EQ("default", metadata[0].column_family_name);
- std::vector<std::string> livefile;
- uint64_t mfs;
- ASSERT_OK(blob_db_->GetLiveFiles(livefile, &mfs, false));
- ASSERT_EQ(4U, livefile.size());
- ASSERT_EQ(filename, livefile[3]);
- VerifyDB(data);
- }
- TEST_F(BlobDBTest, MigrateFromPlainRocksDB) {
- constexpr size_t kNumKey = 20;
- constexpr size_t kNumIteration = 10;
- Random rnd(301);
- std::map<std::string, std::string> data;
- std::vector<bool> is_blob(kNumKey, false);
- // Write to plain rocksdb.
- Options options;
- options.create_if_missing = true;
- DB *db = nullptr;
- ASSERT_OK(DB::Open(options, dbname_, &db));
- for (size_t i = 0; i < kNumIteration; i++) {
- auto key_index = rnd.Next() % kNumKey;
- std::string key = "key" + ToString(key_index);
- PutRandom(db, key, &rnd, &data);
- }
- VerifyDB(db, data);
- delete db;
- db = nullptr;
- // Open as blob db. Verify it can read existing data.
- Open();
- VerifyDB(blob_db_, data);
- for (size_t i = 0; i < kNumIteration; i++) {
- auto key_index = rnd.Next() % kNumKey;
- std::string key = "key" + ToString(key_index);
- is_blob[key_index] = true;
- PutRandom(blob_db_, key, &rnd, &data);
- }
- VerifyDB(blob_db_, data);
- delete blob_db_;
- blob_db_ = nullptr;
- // Verify plain db return error for keys written by blob db.
- ASSERT_OK(DB::Open(options, dbname_, &db));
- std::string value;
- for (size_t i = 0; i < kNumKey; i++) {
- std::string key = "key" + ToString(i);
- Status s = db->Get(ReadOptions(), key, &value);
- if (data.count(key) == 0) {
- ASSERT_TRUE(s.IsNotFound());
- } else if (is_blob[i]) {
- ASSERT_TRUE(s.IsNotSupported());
- } else {
- ASSERT_OK(s);
- ASSERT_EQ(data[key], value);
- }
- }
- delete db;
- }
- // Test to verify that a NoSpace IOError Status is returned on reaching
- // max_db_size limit.
- TEST_F(BlobDBTest, OutOfSpace) {
- // Use mock env to stop wall clock.
- Options options;
- options.env = mock_env_.get();
- BlobDBOptions bdb_options;
- bdb_options.max_db_size = 200;
- bdb_options.is_fifo = false;
- bdb_options.disable_background_tasks = true;
- Open(bdb_options);
- // Each stored blob has an overhead of about 42 bytes currently.
- // So a small key + a 100 byte blob should take up ~150 bytes in the db.
- std::string value(100, 'v');
- ASSERT_OK(blob_db_->PutWithTTL(WriteOptions(), "key1", value, 60));
- // Putting another blob should fail as ading it would exceed the max_db_size
- // limit.
- Status s = blob_db_->PutWithTTL(WriteOptions(), "key2", value, 60);
- ASSERT_TRUE(s.IsIOError());
- ASSERT_TRUE(s.IsNoSpace());
- }
// FIFO eviction: when a write would push the total size over max_db_size,
// the oldest blob file is marked obsolete to make room, TTL or not.
TEST_F(BlobDBTest, FIFOEviction) {
  BlobDBOptions bdb_options;
  bdb_options.max_db_size = 200;
  bdb_options.blob_file_size = 100;
  bdb_options.is_fifo = true;
  bdb_options.disable_background_tasks = true;
  Open(bdb_options);
  // Count evictions via the sync point inside EvictOldestBlobFile.
  std::atomic<int> evict_count{0};
  SyncPoint::GetInstance()->SetCallBack(
      "BlobDBImpl::EvictOldestBlobFile:Evicted",
      [&](void *) { evict_count++; });
  SyncPoint::GetInstance()->EnableProcessing();
  // Each stored blob has an overhead of 32 bytes currently.
  // So a 100 byte blob should take up 132 bytes.
  std::string value(100, 'v');
  ASSERT_OK(blob_db_->PutWithTTL(WriteOptions(), "key1", value, 10));
  VerifyDB({{"key1", value}});
  ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
  // Adding another 100-byte blob would take the total size to 264 bytes
  // (2*132), exceeding max_db_size and triggering FIFO eviction.
  ASSERT_OK(blob_db_->PutWithTTL(WriteOptions(), "key2", value, 60));
  ASSERT_EQ(1, evict_count);
  // key1 stays readable until its (now obsolete) file is actually deleted.
  VerifyDB({{"key1", value}, {"key2", value}});
  // Adding another 100 bytes blob without TTL.
  ASSERT_OK(blob_db_->Put(WriteOptions(), "key3", value));
  ASSERT_EQ(2, evict_count);
  // key1 and key2 stay readable until their files are actually deleted.
  VerifyDB({{"key1", value}, {"key2", value}, {"key3", value}});
  // The fourth blob file, without TTL.
  ASSERT_OK(blob_db_->Put(WriteOptions(), "key4", value));
  ASSERT_EQ(3, evict_count);
  VerifyDB(
      {{"key1", value}, {"key2", value}, {"key3", value}, {"key4", value}});
  // The three oldest files were evicted (marked obsolete); only the newest
  // file is still live.
  auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
  ASSERT_EQ(4, blob_files.size());
  ASSERT_TRUE(blob_files[0]->Obsolete());
  ASSERT_TRUE(blob_files[1]->Obsolete());
  ASSERT_TRUE(blob_files[2]->Obsolete());
  ASSERT_FALSE(blob_files[3]->Obsolete());
  auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
  ASSERT_EQ(3, obsolete_files.size());
  ASSERT_EQ(blob_files[0], obsolete_files[0]);
  ASSERT_EQ(blob_files[1], obsolete_files[1]);
  ASSERT_EQ(blob_files[2], obsolete_files[2]);
  // After deleting the obsolete files only key4 remains readable.
  blob_db_impl()->TEST_DeleteObsoleteFiles();
  obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
  ASSERT_TRUE(obsolete_files.empty());
  VerifyDB({{"key4", value}});
}
- TEST_F(BlobDBTest, FIFOEviction_NoOldestFileToEvict) {
- Options options;
- BlobDBOptions bdb_options;
- bdb_options.max_db_size = 1000;
- bdb_options.blob_file_size = 5000;
- bdb_options.is_fifo = true;
- bdb_options.disable_background_tasks = true;
- Open(bdb_options);
- std::atomic<int> evict_count{0};
- SyncPoint::GetInstance()->SetCallBack(
- "BlobDBImpl::EvictOldestBlobFile:Evicted",
- [&](void *) { evict_count++; });
- SyncPoint::GetInstance()->EnableProcessing();
- std::string value(2000, 'v');
- ASSERT_TRUE(Put("foo", std::string(2000, 'v')).IsNoSpace());
- ASSERT_EQ(0, evict_count);
- }
// FIFO eviction must account for live SST file size: when evicting blob
// files cannot possibly free enough space for a write, the write fails
// with NoSpace rather than evicting every blob file.
TEST_F(BlobDBTest, FIFOEviction_NoEnoughBlobFilesToEvict) {
  BlobDBOptions bdb_options;
  bdb_options.is_fifo = true;
  bdb_options.min_blob_size = 100;
  bdb_options.disable_background_tasks = true;
  Options options;
  // Use mock env to stop wall clock.
  options.env = mock_env_.get();
  options.disable_auto_compactions = true;
  auto statistics = CreateDBStatistics();
  options.statistics = statistics;
  Open(bdb_options, options);
  ASSERT_EQ(0, blob_db_impl()->TEST_live_sst_size());
  std::string small_value(50, 'v');
  std::map<std::string, std::string> data;
  // Insert some data into the LSM tree to make sure FIFO eviction takes
  // SST file size into account.
  for (int i = 0; i < 1000; i++) {
    ASSERT_OK(Put("key" + ToString(i), small_value, &data));
  }
  ASSERT_OK(blob_db_->Flush(FlushOptions()));
  uint64_t live_sst_size = 0;
  ASSERT_TRUE(blob_db_->GetIntProperty(DB::Properties::kTotalSstFilesSize,
                                       &live_sst_size));
  ASSERT_TRUE(live_sst_size > 0);
  ASSERT_EQ(live_sst_size, blob_db_impl()->TEST_live_sst_size());
  // Reopen with only ~2000 bytes of headroom left for blob files.
  bdb_options.max_db_size = live_sst_size + 2000;
  Reopen(bdb_options, options);
  ASSERT_EQ(live_sst_size, blob_db_impl()->TEST_live_sst_size());
  std::string value_1k(1000, 'v');
  ASSERT_OK(PutWithTTL("large_key1", value_1k, 60, &data));
  ASSERT_EQ(0, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
  VerifyDB(data);
  // large_key2 evicts large_key1
  ASSERT_OK(PutWithTTL("large_key2", value_1k, 60, &data));
  ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
  blob_db_impl()->TEST_DeleteObsoleteFiles();
  data.erase("large_key1");
  VerifyDB(data);
  // large_key3 cannot get enough space even after evicting large_key2, so
  // the write instead returns a NoSpace error (no eviction happens).
  std::string value_2k(2000, 'v');
  ASSERT_TRUE(PutWithTTL("large_key3", value_2k, 60).IsNoSpace());
  ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
  // Verify large_key2 still exists.
  VerifyDB(data);
}
// Test that a flush or compaction triggers FIFO eviction, since both
// update the total SST file size.
TEST_F(BlobDBTest, FIFOEviction_TriggerOnSSTSizeChange) {
  BlobDBOptions bdb_options;
  bdb_options.max_db_size = 1000;
  bdb_options.is_fifo = true;
  bdb_options.min_blob_size = 100;
  bdb_options.disable_background_tasks = true;
  Options options;
  // Use mock env to stop wall clock.
  options.env = mock_env_.get();
  auto statistics = CreateDBStatistics();
  options.statistics = statistics;
  options.compression = kNoCompression;
  Open(bdb_options, options);
  // One 800-byte blob fits within the 1000-byte budget while the LSM tree
  // is still empty.
  std::string value(800, 'v');
  ASSERT_OK(PutWithTTL("large_key", value, 60));
  ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
  ASSERT_EQ(0, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
  VerifyDB({{"large_key", value}});
  // Insert some small keys and flush to bring the DB out of space.
  std::map<std::string, std::string> data;
  for (int i = 0; i < 10; i++) {
    ASSERT_OK(Put("key" + ToString(i), "v", &data));
  }
  ASSERT_OK(blob_db_->Flush(FlushOptions()));
  // Verify that large_key was deleted by FIFO eviction.
  blob_db_impl()->TEST_DeleteObsoleteFiles();
  ASSERT_EQ(0, blob_db_impl()->TEST_GetBlobFiles().size());
  ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
  VerifyDB(data);
}
- TEST_F(BlobDBTest, InlineSmallValues) {
- constexpr uint64_t kMaxExpiration = 1000;
- Random rnd(301);
- BlobDBOptions bdb_options;
- bdb_options.ttl_range_secs = kMaxExpiration;
- bdb_options.min_blob_size = 100;
- bdb_options.blob_file_size = 256 * 1000 * 1000;
- bdb_options.disable_background_tasks = true;
- Options options;
- options.env = mock_env_.get();
- mock_env_->set_current_time(0);
- Open(bdb_options, options);
- std::map<std::string, std::string> data;
- std::map<std::string, KeyVersion> versions;
- for (size_t i = 0; i < 1000; i++) {
- bool is_small_value = rnd.Next() % 2;
- bool has_ttl = rnd.Next() % 2;
- uint64_t expiration = rnd.Next() % kMaxExpiration;
- int len = is_small_value ? 50 : 200;
- std::string key = "key" + ToString(i);
- std::string value = test::RandomHumanReadableString(&rnd, len);
- std::string blob_index;
- data[key] = value;
- SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
- if (!has_ttl) {
- ASSERT_OK(blob_db_->Put(WriteOptions(), key, value));
- } else {
- ASSERT_OK(blob_db_->PutUntil(WriteOptions(), key, value, expiration));
- }
- ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
- versions[key] =
- KeyVersion(key, value, sequence,
- (is_small_value && !has_ttl) ? kTypeValue : kTypeBlobIndex);
- }
- VerifyDB(data);
- VerifyBaseDB(versions);
- auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
- auto blob_files = bdb_impl->TEST_GetBlobFiles();
- ASSERT_EQ(2, blob_files.size());
- std::shared_ptr<BlobFile> non_ttl_file;
- std::shared_ptr<BlobFile> ttl_file;
- if (blob_files[0]->HasTTL()) {
- ttl_file = blob_files[0];
- non_ttl_file = blob_files[1];
- } else {
- non_ttl_file = blob_files[0];
- ttl_file = blob_files[1];
- }
- ASSERT_FALSE(non_ttl_file->HasTTL());
- ASSERT_TRUE(ttl_file->HasTTL());
- }
// BlobDB installs its own compaction filter, so user-provided compaction
// filters (direct or via factory) must be rejected at open time.
TEST_F(BlobDBTest, CompactionFilterNotSupported) {
  class TestCompactionFilter : public CompactionFilter {
    const char *Name() const override { return "TestCompactionFilter"; }
  };
  class TestCompactionFilterFactory : public CompactionFilterFactory {
    const char *Name() const override { return "TestCompactionFilterFactory"; }
    std::unique_ptr<CompactionFilter> CreateCompactionFilter(
        const CompactionFilter::Context & /*context*/) override {
      return std::unique_ptr<CompactionFilter>(new TestCompactionFilter());
    }
  };
  // First iteration supplies a raw filter, second a filter factory.
  for (int i = 0; i < 2; i++) {
    Options options;
    if (i == 0) {
      options.compaction_filter = new TestCompactionFilter();
    } else {
      options.compaction_filter_factory.reset(
          new TestCompactionFilterFactory());
    }
    ASSERT_TRUE(TryOpen(BlobDBOptions(), options).IsNotSupported());
    // No-op in the factory iteration, where compaction_filter stays null.
    delete options.compaction_filter;
  }
}
- // Test comapction filter should remove any expired blob index.
- TEST_F(BlobDBTest, FilterExpiredBlobIndex) {
- constexpr size_t kNumKeys = 100;
- constexpr size_t kNumPuts = 1000;
- constexpr uint64_t kMaxExpiration = 1000;
- constexpr uint64_t kCompactTime = 500;
- constexpr uint64_t kMinBlobSize = 100;
- Random rnd(301);
- mock_env_->set_current_time(0);
- BlobDBOptions bdb_options;
- bdb_options.min_blob_size = kMinBlobSize;
- bdb_options.disable_background_tasks = true;
- Options options;
- options.env = mock_env_.get();
- Open(bdb_options, options);
- std::map<std::string, std::string> data;
- std::map<std::string, std::string> data_after_compact;
- for (size_t i = 0; i < kNumPuts; i++) {
- bool is_small_value = rnd.Next() % 2;
- bool has_ttl = rnd.Next() % 2;
- uint64_t expiration = rnd.Next() % kMaxExpiration;
- int len = is_small_value ? 10 : 200;
- std::string key = "key" + ToString(rnd.Next() % kNumKeys);
- std::string value = test::RandomHumanReadableString(&rnd, len);
- if (!has_ttl) {
- if (is_small_value) {
- std::string blob_entry;
- BlobIndex::EncodeInlinedTTL(&blob_entry, expiration, value);
- // Fake blob index with TTL. See what it will do.
- ASSERT_GT(kMinBlobSize, blob_entry.size());
- value = blob_entry;
- }
- ASSERT_OK(Put(key, value));
- data_after_compact[key] = value;
- } else {
- ASSERT_OK(PutUntil(key, value, expiration));
- if (expiration <= kCompactTime) {
- data_after_compact.erase(key);
- } else {
- data_after_compact[key] = value;
- }
- }
- data[key] = value;
- }
- VerifyDB(data);
- mock_env_->set_current_time(kCompactTime);
- // Take a snapshot before compaction. Make sure expired blob indexes is
- // filtered regardless of snapshot.
- const Snapshot *snapshot = blob_db_->GetSnapshot();
- // Issue manual compaction to trigger compaction filter.
- ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
- blob_db_->ReleaseSnapshot(snapshot);
- // Verify expired blob index are filtered.
- std::vector<KeyVersion> versions;
- const size_t kMaxKeys = 10000;
- GetAllKeyVersions(blob_db_, "", "", kMaxKeys, &versions);
- ASSERT_EQ(data_after_compact.size(), versions.size());
- for (auto &version : versions) {
- ASSERT_TRUE(data_after_compact.count(version.user_key) > 0);
- }
- VerifyDB(data_after_compact);
- }
// Test that the compaction filter removes any blob index whose
// corresponding blob file has been removed.
TEST_F(BlobDBTest, FilterFileNotAvailable) {
  BlobDBOptions bdb_options;
  bdb_options.min_blob_size = 0;
  bdb_options.disable_background_tasks = true;
  Options options;
  options.disable_auto_compactions = true;
  Open(bdb_options, options);
  // Write one key per blob file, closing each file after the write.
  ASSERT_OK(Put("foo", "v1"));
  auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
  ASSERT_EQ(1, blob_files.size());
  ASSERT_EQ(1, blob_files[0]->BlobFileNumber());
  ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[0]));
  ASSERT_OK(Put("bar", "v2"));
  blob_files = blob_db_impl()->TEST_GetBlobFiles();
  ASSERT_EQ(2, blob_files.size());
  ASSERT_EQ(2, blob_files[1]->BlobFileNumber());
  ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[1]));
  const size_t kMaxKeys = 10000;
  DB *base_db = blob_db_->GetRootDB();
  std::vector<KeyVersion> versions;
  ASSERT_OK(GetAllKeyVersions(base_db, "", "", kMaxKeys, &versions));
  ASSERT_EQ(2, versions.size());
  ASSERT_EQ("bar", versions[0].user_key);
  ASSERT_EQ("foo", versions[1].user_key);
  VerifyDB({{"bar", "v2"}, {"foo", "v1"}});
  // While both blob files are live, compaction must keep both keys.
  ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  ASSERT_OK(GetAllKeyVersions(base_db, "", "", kMaxKeys, &versions));
  ASSERT_EQ(2, versions.size());
  ASSERT_EQ("bar", versions[0].user_key);
  ASSERT_EQ("foo", versions[1].user_key);
  VerifyDB({{"bar", "v2"}, {"foo", "v1"}});
  // Remove the first blob file and compact. foo should be removed from the
  // base db.
  blob_db_impl()->TEST_ObsoleteBlobFile(blob_files[0]);
  blob_db_impl()->TEST_DeleteObsoleteFiles();
  ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  ASSERT_OK(GetAllKeyVersions(base_db, "", "", kMaxKeys, &versions));
  ASSERT_EQ(1, versions.size());
  ASSERT_EQ("bar", versions[0].user_key);
  VerifyDB({{"bar", "v2"}});
  // Remove the second blob file and compact. bar should be removed from
  // the base db.
  blob_db_impl()->TEST_ObsoleteBlobFile(blob_files[1]);
  blob_db_impl()->TEST_DeleteObsoleteFiles();
  ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  ASSERT_OK(GetAllKeyVersions(base_db, "", "", kMaxKeys, &versions));
  ASSERT_EQ(0, versions.size());
  VerifyDB({});
}
// Test that the compaction filter drops any inlined TTL keys that would
// have been removed by the last FIFO eviction had they been stored
// out-of-line.
TEST_F(BlobDBTest, FilterForFIFOEviction) {
  Random rnd(215);
  BlobDBOptions bdb_options;
  bdb_options.min_blob_size = 100;
  bdb_options.ttl_range_secs = 60;
  bdb_options.max_db_size = 0;
  bdb_options.disable_background_tasks = true;
  Options options;
  // Use mock env to stop wall clock.
  mock_env_->set_current_time(0);
  options.env = mock_env_.get();
  auto statistics = CreateDBStatistics();
  options.statistics = statistics;
  options.disable_auto_compactions = true;
  Open(bdb_options, options);
  std::map<std::string, std::string> data;
  std::map<std::string, std::string> data_after_compact;
  // Insert some small values that will be inlined.
  for (int i = 0; i < 1000; i++) {
    std::string key = "key" + ToString(i);
    std::string value = test::RandomHumanReadableString(&rnd, 50);
    uint64_t ttl = rnd.Next() % 120 + 1;
    ASSERT_OK(PutWithTTL(key, value, ttl, &data));
    // Keys with ttl < 60 fall before the eventual eviction cutoff and are
    // expected to be filtered by the compaction below.
    if (ttl >= 60) {
      data_after_compact[key] = value;
    }
  }
  uint64_t num_keys_to_evict = data.size() - data_after_compact.size();
  ASSERT_OK(blob_db_->Flush(FlushOptions()));
  uint64_t live_sst_size = blob_db_impl()->TEST_live_sst_size();
  ASSERT_GT(live_sst_size, 0);
  VerifyDB(data);
  // Reopen in FIFO mode with just enough headroom for two large blobs.
  bdb_options.max_db_size = live_sst_size + 30000;
  bdb_options.is_fifo = true;
  Reopen(bdb_options, options);
  VerifyDB(data);
  // Put two large values, each on a different blob file.
  std::string large_value(10000, 'v');
  ASSERT_OK(PutWithTTL("large_key1", large_value, 90));
  ASSERT_OK(PutWithTTL("large_key2", large_value, 150));
  ASSERT_EQ(2, blob_db_impl()->TEST_GetBlobFiles().size());
  ASSERT_EQ(0, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
  data["large_key1"] = large_value;
  data["large_key2"] = large_value;
  VerifyDB(data);
  // Put a third large value which will bring the DB out of space.
  // FIFO eviction will evict the file of large_key1.
  ASSERT_OK(PutWithTTL("large_key3", large_value, 150));
  ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
  ASSERT_EQ(2, blob_db_impl()->TEST_GetBlobFiles().size());
  blob_db_impl()->TEST_DeleteObsoleteFiles();
  ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
  data.erase("large_key1");
  data["large_key3"] = large_value;
  VerifyDB(data);
  // Put some more small values. These values should not be dropped by the
  // compaction filter since they were inserted after the FIFO eviction.
  ASSERT_OK(PutWithTTL("foo", "v", 30, &data_after_compact));
  ASSERT_OK(PutWithTTL("bar", "v", 30, &data_after_compact));
  // FIFO eviction doesn't trigger again since there is enough room for the
  // flush.
  ASSERT_OK(blob_db_->Flush(FlushOptions()));
  ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
  // Manually compact and check that the compaction filter evicted those
  // keys with expiration < 60.
  ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  // All keys with expiration < 60, plus large_key1, are filtered by the
  // compaction filter.
  ASSERT_EQ(num_keys_to_evict + 1,
            statistics->getTickerCount(BLOB_DB_BLOB_INDEX_EVICTED_COUNT));
  ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
  ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
  data_after_compact["large_key2"] = large_value;
  data_after_compact["large_key3"] = large_value;
  VerifyDB(data_after_compact);
}
// End-to-end garbage collection: blobs in the oldest `cutoff` fraction of
// non-TTL blob files are relocated to new files during compaction, while
// TTL blobs and inlined/plain values are left untouched. Verifies file
// numbering, file sizes, and the GC statistics tickers.
TEST_F(BlobDBTest, GarbageCollection) {
  constexpr size_t kNumPuts = 1 << 10;
  constexpr uint64_t kExpiration = 1000;
  constexpr uint64_t kCompactTime = 500;
  constexpr uint64_t kKeySize = 7;  // "key" + 4 digits
  constexpr uint64_t kSmallValueSize = 1 << 6;
  constexpr uint64_t kLargeValueSize = 1 << 8;
  constexpr uint64_t kMinBlobSize = 1 << 7;
  static_assert(kSmallValueSize < kMinBlobSize, "");
  static_assert(kLargeValueSize > kMinBlobSize, "");
  constexpr size_t kBlobsPerFile = 8;
  constexpr size_t kNumBlobFiles = kNumPuts / kBlobsPerFile;
  // Size the blob files so exactly kBlobsPerFile large records fit each.
  constexpr uint64_t kBlobFileSize =
      BlobLogHeader::kSize +
      (BlobLogRecord::kHeaderSize + kKeySize + kLargeValueSize) * kBlobsPerFile;
  BlobDBOptions bdb_options;
  bdb_options.min_blob_size = kMinBlobSize;
  bdb_options.blob_file_size = kBlobFileSize;
  bdb_options.enable_garbage_collection = true;
  bdb_options.garbage_collection_cutoff = 0.25;
  bdb_options.disable_background_tasks = true;
  Options options;
  options.env = mock_env_.get();
  options.statistics = CreateDBStatistics();
  Open(bdb_options, options);
  std::map<std::string, std::string> data;
  std::map<std::string, KeyVersion> blob_value_versions;
  std::map<std::string, BlobIndexVersion> blob_index_versions;
  Random rnd(301);
  // Add a bunch of large non-TTL values. These will be written to non-TTL
  // blob files and will be subject to GC.
  for (size_t i = 0; i < kNumPuts; ++i) {
    std::ostringstream oss;
    oss << "key" << std::setw(4) << std::setfill('0') << i;
    const std::string key(oss.str());
    const std::string value(
        test::RandomHumanReadableString(&rnd, kLargeValueSize));
    const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
    ASSERT_OK(Put(key, value));
    ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
    data[key] = value;
    blob_value_versions[key] = KeyVersion(key, value, sequence, kTypeBlobIndex);
    // Eight blobs per file: key i lands in blob file (i >> 3) + 1.
    blob_index_versions[key] =
        BlobIndexVersion(key, /* file_number */ (i >> 3) + 1, kNoExpiration,
                         sequence, kTypeBlobIndex);
  }
  // Add some small and/or TTL values that will be ignored during GC.
  // First, add a large TTL value will be written to its own TTL blob file.
  {
    const std::string key("key2000");
    const std::string value(
        test::RandomHumanReadableString(&rnd, kLargeValueSize));
    const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
    ASSERT_OK(PutUntil(key, value, kExpiration));
    ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
    data[key] = value;
    blob_value_versions[key] = KeyVersion(key, value, sequence, kTypeBlobIndex);
    blob_index_versions[key] =
        BlobIndexVersion(key, /* file_number */ kNumBlobFiles + 1, kExpiration,
                         sequence, kTypeBlobIndex);
  }
  // Now add a small TTL value (which will be inlined).
  {
    const std::string key("key3000");
    const std::string value(
        test::RandomHumanReadableString(&rnd, kSmallValueSize));
    const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
    ASSERT_OK(PutUntil(key, value, kExpiration));
    ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
    data[key] = value;
    blob_value_versions[key] = KeyVersion(key, value, sequence, kTypeBlobIndex);
    blob_index_versions[key] = BlobIndexVersion(
        key, kInvalidBlobFileNumber, kExpiration, sequence, kTypeBlobIndex);
  }
  // Finally, add a small non-TTL value (which will be stored as a regular
  // value).
  {
    const std::string key("key4000");
    const std::string value(
        test::RandomHumanReadableString(&rnd, kSmallValueSize));
    const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
    ASSERT_OK(Put(key, value));
    ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
    data[key] = value;
    blob_value_versions[key] = KeyVersion(key, value, sequence, kTypeValue);
    blob_index_versions[key] = BlobIndexVersion(
        key, kInvalidBlobFileNumber, kNoExpiration, sequence, kTypeValue);
  }
  VerifyDB(data);
  VerifyBaseDB(blob_value_versions);
  VerifyBaseDBBlobIndex(blob_index_versions);
  // At this point, we should have 128 immutable non-TTL files with file numbers
  // 1..128.
  {
    auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
    ASSERT_EQ(live_imm_files.size(), kNumBlobFiles);
    for (size_t i = 0; i < kNumBlobFiles; ++i) {
      ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 1);
      ASSERT_EQ(live_imm_files[i]->GetFileSize(),
                kBlobFileSize + BlobLogFooter::kSize);
    }
  }
  mock_env_->set_current_time(kCompactTime);
  ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  // We expect the data to remain the same and the blobs from the oldest N files
  // to be moved to new files. Sequence numbers get zeroed out during the
  // compaction.
  VerifyDB(data);
  for (auto &pair : blob_value_versions) {
    KeyVersion &version = pair.second;
    version.sequence = 0;
  }
  VerifyBaseDB(blob_value_versions);
  const uint64_t cutoff = static_cast<uint64_t>(
      bdb_options.garbage_collection_cutoff * kNumBlobFiles);
  // Blobs from the oldest `cutoff` files were relocated; their new file
  // numbers start after the TTL blob file (kNumBlobFiles + 1).
  for (auto &pair : blob_index_versions) {
    BlobIndexVersion &version = pair.second;
    version.sequence = 0;
    if (version.file_number == kInvalidBlobFileNumber) {
      continue;
    }
    if (version.file_number > cutoff) {
      continue;
    }
    version.file_number += kNumBlobFiles + 1;
  }
  VerifyBaseDBBlobIndex(blob_index_versions);
  const Statistics *const statistics = options.statistics.get();
  assert(statistics);
  ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_FILES), cutoff);
  ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_NEW_FILES), cutoff);
  ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_FAILURES), 0);
  ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_KEYS_RELOCATED),
            cutoff * kBlobsPerFile);
  ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_BYTES_RELOCATED),
            cutoff * kBlobsPerFile * kLargeValueSize);
  // At this point, we should have 128 immutable non-TTL files with file numbers
  // 33..128 and 130..161. (129 was taken by the TTL blob file.)
  {
    auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
    ASSERT_EQ(live_imm_files.size(), kNumBlobFiles);
    for (size_t i = 0; i < kNumBlobFiles; ++i) {
      uint64_t expected_file_number = i + cutoff + 1;
      if (expected_file_number > kNumBlobFiles) {
        ++expected_file_number;
      }
      ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), expected_file_number);
      ASSERT_EQ(live_imm_files[i]->GetFileSize(),
                kBlobFileSize + BlobLogFooter::kSize);
    }
  }
}
- TEST_F(BlobDBTest, GarbageCollectionFailure) {
- BlobDBOptions bdb_options;
- bdb_options.min_blob_size = 0;
- bdb_options.enable_garbage_collection = true;
- bdb_options.garbage_collection_cutoff = 1.0;
- bdb_options.disable_background_tasks = true;
- Options db_options;
- db_options.statistics = CreateDBStatistics();
- Open(bdb_options, db_options);
- // Write a couple of valid blobs.
- Put("foo", "bar");
- Put("dead", "beef");
- // Write a fake blob reference into the base DB that cannot be parsed.
- WriteBatch batch;
- ASSERT_OK(WriteBatchInternal::PutBlobIndex(
- &batch, blob_db_->DefaultColumnFamily()->GetID(), "key",
- "not a valid blob index"));
- ASSERT_OK(blob_db_->GetRootDB()->Write(WriteOptions(), &batch));
- auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
- ASSERT_EQ(blob_files.size(), 1);
- auto blob_file = blob_files[0];
- ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_file));
- ASSERT_TRUE(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)
- .IsCorruption());
- const Statistics *const statistics = db_options.statistics.get();
- assert(statistics);
- ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_FILES), 0);
- ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_NEW_FILES), 1);
- ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_FAILURES), 1);
- ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_KEYS_RELOCATED), 2);
- ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_BYTES_RELOCATED), 7);
- }
// A TTL blob file should be evicted wholesale after its expiration range
// has fully passed.
TEST_F(BlobDBTest, EvictExpiredFile) {
  BlobDBOptions bdb_options;
  bdb_options.ttl_range_secs = 100;
  bdb_options.min_blob_size = 0;
  bdb_options.disable_background_tasks = true;
  Options options;
  options.env = mock_env_.get();
  Open(bdb_options, options);
  mock_env_->set_current_time(50);
  std::map<std::string, std::string> data;
  ASSERT_OK(PutWithTTL("foo", "bar", 100, &data));
  auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
  ASSERT_EQ(1, blob_files.size());
  auto blob_file = blob_files[0];
  ASSERT_FALSE(blob_file->Immutable());
  ASSERT_FALSE(blob_file->Obsolete());
  VerifyDB(data);
  mock_env_->set_current_time(250);
  // The key should have expired by now (written at t=50 with ttl=100).
  blob_db_impl()->TEST_EvictExpiredFiles();
  ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
  ASSERT_EQ(1, blob_db_impl()->TEST_GetObsoleteFiles().size());
  ASSERT_TRUE(blob_file->Immutable());
  ASSERT_TRUE(blob_file->Obsolete());
  blob_db_impl()->TEST_DeleteObsoleteFiles();
  ASSERT_EQ(0, blob_db_impl()->TEST_GetBlobFiles().size());
  ASSERT_EQ(0, blob_db_impl()->TEST_GetObsoleteFiles().size());
  // Make sure we don't return a garbage value after the blob file has been
  // evicted while the blob index still exists in the LSM tree.
  std::string val = "";
  ASSERT_TRUE(blob_db_->Get(ReadOptions(), "foo", &val).IsNotFound());
  ASSERT_EQ("", val);
}
// DisableFileDeletions is reference counted: each call must be matched by
// an EnableFileDeletions(false), unless force=true is used, which clears
// the count in one go. Blob file deletion must honor the same contract.
TEST_F(BlobDBTest, DisableFileDeletions) {
  BlobDBOptions bdb_options;
  bdb_options.disable_background_tasks = true;
  Open(bdb_options);
  std::map<std::string, std::string> data;
  for (bool force : {true, false}) {
    ASSERT_OK(Put("foo", "v", &data));
    auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
    ASSERT_EQ(1, blob_files.size());
    auto blob_file = blob_files[0];
    ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_file));
    blob_db_impl()->TEST_ObsoleteBlobFile(blob_file);
    ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
    ASSERT_EQ(1, blob_db_impl()->TEST_GetObsoleteFiles().size());
    // Call DisableFileDeletions twice.
    ASSERT_OK(blob_db_->DisableFileDeletions());
    ASSERT_OK(blob_db_->DisableFileDeletions());
    // File deletions should be disabled.
    blob_db_impl()->TEST_DeleteObsoleteFiles();
    ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
    ASSERT_EQ(1, blob_db_impl()->TEST_GetObsoleteFiles().size());
    VerifyDB(data);
    // Enable file deletions once. If force=true, file deletion is enabled.
    // Otherwise it needs to enable it for a second time.
    ASSERT_OK(blob_db_->EnableFileDeletions(force));
    blob_db_impl()->TEST_DeleteObsoleteFiles();
    if (!force) {
      // One Enable is not enough to balance two Disables.
      ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
      ASSERT_EQ(1, blob_db_impl()->TEST_GetObsoleteFiles().size());
      VerifyDB(data);
      // Call EnableFileDeletions a second time.
      ASSERT_OK(blob_db_->EnableFileDeletions(false));
      blob_db_impl()->TEST_DeleteObsoleteFiles();
    }
    // Regardless of value of `force`, file should be deleted by now.
    ASSERT_EQ(0, blob_db_impl()->TEST_GetBlobFiles().size());
    ASSERT_EQ(0, blob_db_impl()->TEST_GetObsoleteFiles().size());
    VerifyDB({});
  }
}
- TEST_F(BlobDBTest, MaintainBlobFileToSstMapping) {
- BlobDBOptions bdb_options;
- bdb_options.enable_garbage_collection = true;
- bdb_options.disable_background_tasks = true;
- Open(bdb_options);
- // Register some dummy blob files.
- blob_db_impl()->TEST_AddDummyBlobFile(1, /* immutable_sequence */ 200);
- blob_db_impl()->TEST_AddDummyBlobFile(2, /* immutable_sequence */ 300);
- blob_db_impl()->TEST_AddDummyBlobFile(3, /* immutable_sequence */ 400);
- blob_db_impl()->TEST_AddDummyBlobFile(4, /* immutable_sequence */ 500);
- blob_db_impl()->TEST_AddDummyBlobFile(5, /* immutable_sequence */ 600);
- // Initialize the blob <-> SST file mapping. First, add some SST files with
- // blob file references, then some without.
- std::vector<LiveFileMetaData> live_files;
- for (uint64_t i = 1; i <= 10; ++i) {
- LiveFileMetaData live_file;
- live_file.file_number = i;
- live_file.oldest_blob_file_number = ((i - 1) % 5) + 1;
- live_files.emplace_back(live_file);
- }
- for (uint64_t i = 11; i <= 20; ++i) {
- LiveFileMetaData live_file;
- live_file.file_number = i;
- live_files.emplace_back(live_file);
- }
- blob_db_impl()->TEST_InitializeBlobFileToSstMapping(live_files);
- // Check that the blob <-> SST mappings have been correctly initialized.
- auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
- ASSERT_EQ(blob_files.size(), 5);
- {
- auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
- ASSERT_EQ(live_imm_files.size(), 5);
- for (size_t i = 0; i < 5; ++i) {
- ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 1);
- }
- ASSERT_TRUE(blob_db_impl()->TEST_GetObsoleteFiles().empty());
- }
- {
- const std::vector<std::unordered_set<uint64_t>> expected_sst_files{
- {1, 6}, {2, 7}, {3, 8}, {4, 9}, {5, 10}};
- const std::vector<bool> expected_obsolete{false, false, false, false,
- false};
- for (size_t i = 0; i < 5; ++i) {
- const auto &blob_file = blob_files[i];
- ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
- ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);
- }
- auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
- ASSERT_EQ(live_imm_files.size(), 5);
- for (size_t i = 0; i < 5; ++i) {
- ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 1);
- }
- ASSERT_TRUE(blob_db_impl()->TEST_GetObsoleteFiles().empty());
- }
- // Simulate a flush where the SST does not reference any blob files.
- {
- FlushJobInfo info{};
- info.file_number = 21;
- info.smallest_seqno = 1;
- info.largest_seqno = 100;
- blob_db_impl()->TEST_ProcessFlushJobInfo(info);
- const std::vector<std::unordered_set<uint64_t>> expected_sst_files{
- {1, 6}, {2, 7}, {3, 8}, {4, 9}, {5, 10}};
- const std::vector<bool> expected_obsolete{false, false, false, false,
- false};
- for (size_t i = 0; i < 5; ++i) {
- const auto &blob_file = blob_files[i];
- ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
- ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);
- }
- auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
- ASSERT_EQ(live_imm_files.size(), 5);
- for (size_t i = 0; i < 5; ++i) {
- ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 1);
- }
- ASSERT_TRUE(blob_db_impl()->TEST_GetObsoleteFiles().empty());
- }
- // Simulate a flush where the SST references a blob file.
- {
- FlushJobInfo info{};
- info.file_number = 22;
- info.oldest_blob_file_number = 5;
- info.smallest_seqno = 101;
- info.largest_seqno = 200;
- blob_db_impl()->TEST_ProcessFlushJobInfo(info);
- const std::vector<std::unordered_set<uint64_t>> expected_sst_files{
- {1, 6}, {2, 7}, {3, 8}, {4, 9}, {5, 10, 22}};
- const std::vector<bool> expected_obsolete{false, false, false, false,
- false};
- for (size_t i = 0; i < 5; ++i) {
- const auto &blob_file = blob_files[i];
- ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
- ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);
- }
- auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
- ASSERT_EQ(live_imm_files.size(), 5);
- for (size_t i = 0; i < 5; ++i) {
- ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 1);
- }
- ASSERT_TRUE(blob_db_impl()->TEST_GetObsoleteFiles().empty());
- }
- // Simulate a compaction. Some inputs and outputs have blob file references,
- // some don't. There is also a trivial move (which means the SST appears on
- // both the input and the output list). Blob file 1 loses all its linked SSTs,
- // and since it got marked immutable at sequence number 200 which has already
- // been flushed, it can be marked obsolete.
- {
- CompactionJobInfo info{};
- info.input_file_infos.emplace_back(CompactionFileInfo{1, 1, 1});
- info.input_file_infos.emplace_back(CompactionFileInfo{1, 2, 2});
- info.input_file_infos.emplace_back(CompactionFileInfo{1, 6, 1});
- info.input_file_infos.emplace_back(
- CompactionFileInfo{1, 11, kInvalidBlobFileNumber});
- info.input_file_infos.emplace_back(CompactionFileInfo{1, 22, 5});
- info.output_file_infos.emplace_back(CompactionFileInfo{2, 22, 5});
- info.output_file_infos.emplace_back(CompactionFileInfo{2, 23, 3});
- info.output_file_infos.emplace_back(
- CompactionFileInfo{2, 24, kInvalidBlobFileNumber});
- blob_db_impl()->TEST_ProcessCompactionJobInfo(info);
- const std::vector<std::unordered_set<uint64_t>> expected_sst_files{
- {}, {7}, {3, 8, 23}, {4, 9}, {5, 10, 22}};
- const std::vector<bool> expected_obsolete{true, false, false, false, false};
- for (size_t i = 0; i < 5; ++i) {
- const auto &blob_file = blob_files[i];
- ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
- ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);
- }
- auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
- ASSERT_EQ(live_imm_files.size(), 4);
- for (size_t i = 0; i < 4; ++i) {
- ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 2);
- }
- auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
- ASSERT_EQ(obsolete_files.size(), 1);
- ASSERT_EQ(obsolete_files[0]->BlobFileNumber(), 1);
- }
- // Simulate a failed compaction. No mappings should be updated.
- {
- CompactionJobInfo info{};
- info.input_file_infos.emplace_back(CompactionFileInfo{1, 7, 2});
- info.input_file_infos.emplace_back(CompactionFileInfo{2, 22, 5});
- info.output_file_infos.emplace_back(CompactionFileInfo{2, 25, 3});
- info.status = Status::Corruption();
- blob_db_impl()->TEST_ProcessCompactionJobInfo(info);
- const std::vector<std::unordered_set<uint64_t>> expected_sst_files{
- {}, {7}, {3, 8, 23}, {4, 9}, {5, 10, 22}};
- const std::vector<bool> expected_obsolete{true, false, false, false, false};
- for (size_t i = 0; i < 5; ++i) {
- const auto &blob_file = blob_files[i];
- ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
- ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);
- }
- auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
- ASSERT_EQ(live_imm_files.size(), 4);
- for (size_t i = 0; i < 4; ++i) {
- ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 2);
- }
- auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
- ASSERT_EQ(obsolete_files.size(), 1);
- ASSERT_EQ(obsolete_files[0]->BlobFileNumber(), 1);
- }
- // Simulate another compaction. Blob file 2 loses all its linked SSTs
- // but since it got marked immutable at sequence number 300 which hasn't
- // been flushed yet, it cannot be marked obsolete at this point.
- {
- CompactionJobInfo info{};
- info.input_file_infos.emplace_back(CompactionFileInfo{1, 7, 2});
- info.input_file_infos.emplace_back(CompactionFileInfo{2, 22, 5});
- info.output_file_infos.emplace_back(CompactionFileInfo{2, 25, 3});
- blob_db_impl()->TEST_ProcessCompactionJobInfo(info);
- const std::vector<std::unordered_set<uint64_t>> expected_sst_files{
- {}, {}, {3, 8, 23, 25}, {4, 9}, {5, 10}};
- const std::vector<bool> expected_obsolete{true, false, false, false, false};
- for (size_t i = 0; i < 5; ++i) {
- const auto &blob_file = blob_files[i];
- ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
- ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);
- }
- auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
- ASSERT_EQ(live_imm_files.size(), 4);
- for (size_t i = 0; i < 4; ++i) {
- ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 2);
- }
- auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
- ASSERT_EQ(obsolete_files.size(), 1);
- ASSERT_EQ(obsolete_files[0]->BlobFileNumber(), 1);
- }
- // Simulate a flush with largest sequence number 300. This will make it
- // possible to mark blob file 2 obsolete.
- {
- FlushJobInfo info{};
- info.file_number = 26;
- info.smallest_seqno = 201;
- info.largest_seqno = 300;
- blob_db_impl()->TEST_ProcessFlushJobInfo(info);
- const std::vector<std::unordered_set<uint64_t>> expected_sst_files{
- {}, {}, {3, 8, 23, 25}, {4, 9}, {5, 10}};
- const std::vector<bool> expected_obsolete{true, true, false, false, false};
- for (size_t i = 0; i < 5; ++i) {
- const auto &blob_file = blob_files[i];
- ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
- ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);
- }
- auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
- ASSERT_EQ(live_imm_files.size(), 3);
- for (size_t i = 0; i < 3; ++i) {
- ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 3);
- }
- auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
- ASSERT_EQ(obsolete_files.size(), 2);
- ASSERT_EQ(obsolete_files[0]->BlobFileNumber(), 1);
- ASSERT_EQ(obsolete_files[1]->BlobFileNumber(), 2);
- }
- }
- TEST_F(BlobDBTest, ShutdownWait) {
- BlobDBOptions bdb_options;
- bdb_options.ttl_range_secs = 100;
- bdb_options.min_blob_size = 0;
- bdb_options.disable_background_tasks = false;
- Options options;
- options.env = mock_env_.get();
- SyncPoint::GetInstance()->LoadDependency({
- {"BlobDBImpl::EvictExpiredFiles:0", "BlobDBTest.ShutdownWait:0"},
- {"BlobDBTest.ShutdownWait:1", "BlobDBImpl::EvictExpiredFiles:1"},
- {"BlobDBImpl::EvictExpiredFiles:2", "BlobDBTest.ShutdownWait:2"},
- {"BlobDBTest.ShutdownWait:3", "BlobDBImpl::EvictExpiredFiles:3"},
- });
- // Force all tasks to be scheduled immediately.
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
- "TimeQueue::Add:item.end", [&](void *arg) {
- std::chrono::steady_clock::time_point *tp =
- static_cast<std::chrono::steady_clock::time_point *>(arg);
- *tp =
- std::chrono::steady_clock::now() - std::chrono::milliseconds(10000);
- });
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
- "BlobDBImpl::EvictExpiredFiles:cb", [&](void * /*arg*/) {
- // Sleep 3 ms to increase the chance of data race.
- // We've synced up the code so that EvictExpiredFiles()
- // is called concurrently with ~BlobDBImpl().
- // ~BlobDBImpl() is supposed to wait for all background
- // task to shutdown before doing anything else. In order
- // to use the same test to reproduce a bug of the waiting
- // logic, we wait a little bit here, so that TSAN can
- // catch the data race.
- // We should improve the test if we find a better way.
- Env::Default()->SleepForMicroseconds(3000);
- });
- SyncPoint::GetInstance()->EnableProcessing();
- Open(bdb_options, options);
- mock_env_->set_current_time(50);
- std::map<std::string, std::string> data;
- ASSERT_OK(PutWithTTL("foo", "bar", 100, &data));
- auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
- ASSERT_EQ(1, blob_files.size());
- auto blob_file = blob_files[0];
- ASSERT_FALSE(blob_file->Immutable());
- ASSERT_FALSE(blob_file->Obsolete());
- VerifyDB(data);
- TEST_SYNC_POINT("BlobDBTest.ShutdownWait:0");
- mock_env_->set_current_time(250);
- // The key should expired now.
- TEST_SYNC_POINT("BlobDBTest.ShutdownWait:1");
- TEST_SYNC_POINT("BlobDBTest.ShutdownWait:2");
- TEST_SYNC_POINT("BlobDBTest.ShutdownWait:3");
- Close();
- SyncPoint::GetInstance()->DisableProcessing();
- }
- } // namespace blob_db
- } // namespace ROCKSDB_NAMESPACE
- // A black-box test for the ttl wrapper around rocksdb
- int main(int argc, char** argv) {
- ::testing::InitGoogleTest(&argc, argv);
- return RUN_ALL_TESTS();
- }
- #else
- #include <stdio.h>
- int main(int /*argc*/, char** /*argv*/) {
- fprintf(stderr, "SKIPPED as BlobDB is not supported in ROCKSDB_LITE\n");
- return 0;
- }
- #endif // !ROCKSDB_LITE
|