| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761
77717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762
2772278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443 |
- // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
- // This source code is licensed under both the GPLv2 (found in the
- // COPYING file in the root directory) and Apache 2.0 License
- // (found in the LICENSE.Apache file in the root directory).
- #include "utilities/blob_db/blob_db.h"
- #include <algorithm>
- #include <chrono>
- #include <cstdlib>
- #include <iomanip>
- #include <map>
- #include <memory>
- #include <sstream>
- #include <string>
- #include <vector>
- #include "db/blob/blob_index.h"
- #include "db/db_test_util.h"
- #include "env/composite_env_wrapper.h"
- #include "file/file_util.h"
- #include "file/sst_file_manager_impl.h"
- #include "port/port.h"
- #include "rocksdb/utilities/debug.h"
- #include "test_util/mock_time_env.h"
- #include "test_util/sync_point.h"
- #include "test_util/testharness.h"
- #include "util/random.h"
- #include "util/string_util.h"
- #include "utilities/blob_db/blob_db_impl.h"
- #include "utilities/fault_injection_env.h"
- namespace ROCKSDB_NAMESPACE::blob_db {
- class BlobDBTest : public testing::Test {
- public:
- const int kMaxBlobSize = 1 << 14;
- struct BlobIndexVersion {
- BlobIndexVersion() = default;
- BlobIndexVersion(std::string _user_key, uint64_t _file_number,
- uint64_t _expiration, SequenceNumber _sequence,
- ValueType _type)
- : user_key(std::move(_user_key)),
- file_number(_file_number),
- expiration(_expiration),
- sequence(_sequence),
- type(_type) {}
- std::string user_key;
- uint64_t file_number = kInvalidBlobFileNumber;
- uint64_t expiration = kNoExpiration;
- SequenceNumber sequence = 0;
- ValueType type = kTypeValue;
- };
- BlobDBTest()
- : dbname_(test::PerThreadDBPath("blob_db_test")), blob_db_(nullptr) {
- mock_clock_ = std::make_shared<MockSystemClock>(SystemClock::Default());
- mock_env_.reset(new CompositeEnvWrapper(Env::Default(), mock_clock_));
- fault_injection_env_.reset(new FaultInjectionTestEnv(Env::Default()));
- Status s = DestroyBlobDB(dbname_, Options(), BlobDBOptions());
- assert(s.ok());
- }
- ~BlobDBTest() override {
- SyncPoint::GetInstance()->ClearAllCallBacks();
- Destroy();
- }
- Status TryOpen(BlobDBOptions bdb_options = BlobDBOptions(),
- Options options = Options()) {
- options.create_if_missing = true;
- if (options.env == mock_env_.get()) {
- // Need to disable stats dumping and persisting which also use
- // RepeatableThread, which uses InstrumentedCondVar::TimedWaitInternal.
- // With mocked time, this can hang on some platforms (MacOS)
- // because (a) on some platforms, pthread_cond_timedwait does not appear
- // to release the lock for other threads to operate if the deadline time
- // is already passed, and (b) TimedWait calls are currently a bad
- // abstraction because the deadline parameter is usually computed from
- // Env time, but is interpreted in real clock time.
- options.stats_dump_period_sec = 0;
- options.stats_persist_period_sec = 0;
- }
- return BlobDB::Open(options, bdb_options, dbname_, &blob_db_);
- }
- void Open(BlobDBOptions bdb_options = BlobDBOptions(),
- Options options = Options()) {
- ASSERT_OK(TryOpen(bdb_options, options));
- }
- void Reopen(BlobDBOptions bdb_options = BlobDBOptions(),
- Options options = Options()) {
- assert(blob_db_ != nullptr);
- delete blob_db_;
- blob_db_ = nullptr;
- Open(bdb_options, options);
- }
- void Close() {
- assert(blob_db_ != nullptr);
- delete blob_db_;
- blob_db_ = nullptr;
- }
- void Destroy() {
- if (blob_db_) {
- Options options = blob_db_->GetOptions();
- BlobDBOptions bdb_options = blob_db_->GetBlobDBOptions();
- delete blob_db_;
- blob_db_ = nullptr;
- ASSERT_OK(DestroyBlobDB(dbname_, options, bdb_options));
- }
- }
- BlobDBImpl *blob_db_impl() { return static_cast<BlobDBImpl *>(blob_db_); }
- Status Put(const Slice &key, const Slice &value,
- std::map<std::string, std::string> *data = nullptr) {
- Status s = blob_db_->Put(WriteOptions(), key, value);
- if (data != nullptr) {
- (*data)[key.ToString()] = value.ToString();
- }
- return s;
- }
- void Delete(const std::string &key,
- std::map<std::string, std::string> *data = nullptr) {
- ASSERT_OK(blob_db_->Delete(WriteOptions(), key));
- if (data != nullptr) {
- data->erase(key);
- }
- }
- Status PutWithTTL(const Slice &key, const Slice &value, uint64_t ttl,
- std::map<std::string, std::string> *data = nullptr) {
- Status s = blob_db_->PutWithTTL(WriteOptions(), key, value, ttl);
- if (data != nullptr) {
- (*data)[key.ToString()] = value.ToString();
- }
- return s;
- }
- Status PutUntil(const Slice &key, const Slice &value, uint64_t expiration) {
- return blob_db_->PutUntil(WriteOptions(), key, value, expiration);
- }
- void PutRandomWithTTL(const std::string &key, uint64_t ttl, Random *rnd,
- std::map<std::string, std::string> *data = nullptr) {
- int len = rnd->Next() % kMaxBlobSize + 1;
- std::string value = rnd->HumanReadableString(len);
- ASSERT_OK(
- blob_db_->PutWithTTL(WriteOptions(), Slice(key), Slice(value), ttl));
- if (data != nullptr) {
- (*data)[key] = value;
- }
- }
- void PutRandomUntil(const std::string &key, uint64_t expiration, Random *rnd,
- std::map<std::string, std::string> *data = nullptr) {
- int len = rnd->Next() % kMaxBlobSize + 1;
- std::string value = rnd->HumanReadableString(len);
- ASSERT_OK(blob_db_->PutUntil(WriteOptions(), Slice(key), Slice(value),
- expiration));
- if (data != nullptr) {
- (*data)[key] = value;
- }
- }
- void PutRandom(const std::string &key, Random *rnd,
- std::map<std::string, std::string> *data = nullptr) {
- PutRandom(blob_db_, key, rnd, data);
- }
- void PutRandom(DB *db, const std::string &key, Random *rnd,
- std::map<std::string, std::string> *data = nullptr) {
- int len = rnd->Next() % kMaxBlobSize + 1;
- std::string value = rnd->HumanReadableString(len);
- ASSERT_OK(db->Put(WriteOptions(), Slice(key), Slice(value)));
- if (data != nullptr) {
- (*data)[key] = value;
- }
- }
- void PutRandomToWriteBatch(
- const std::string &key, Random *rnd, WriteBatch *batch,
- std::map<std::string, std::string> *data = nullptr) {
- int len = rnd->Next() % kMaxBlobSize + 1;
- std::string value = rnd->HumanReadableString(len);
- ASSERT_OK(batch->Put(key, value));
- if (data != nullptr) {
- (*data)[key] = value;
- }
- }
- // Verify blob db contain expected data and nothing more.
- void VerifyDB(const std::map<std::string, std::string> &data) {
- VerifyDB(blob_db_, data);
- }
- void VerifyDB(DB *db, const std::map<std::string, std::string> &data) {
- // Verify normal Get
- auto *cfh = db->DefaultColumnFamily();
- for (auto &p : data) {
- PinnableSlice value_slice;
- ASSERT_OK(db->Get(ReadOptions(), cfh, p.first, &value_slice));
- ASSERT_EQ(p.second, value_slice.ToString());
- std::string value;
- ASSERT_OK(db->Get(ReadOptions(), cfh, p.first, &value));
- ASSERT_EQ(p.second, value);
- }
- // Verify iterators
- Iterator *iter = db->NewIterator(ReadOptions());
- iter->SeekToFirst();
- for (auto &p : data) {
- ASSERT_TRUE(iter->Valid());
- ASSERT_EQ(p.first, iter->key().ToString());
- ASSERT_EQ(p.second, iter->value().ToString());
- iter->Next();
- }
- ASSERT_FALSE(iter->Valid());
- ASSERT_OK(iter->status());
- delete iter;
- }
- void VerifyBaseDB(
- const std::map<std::string, KeyVersion> &expected_versions) {
- auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
- DB *db = blob_db_->GetRootDB();
- const size_t kMaxKeys = 10000;
- std::vector<KeyVersion> versions;
- ASSERT_OK(GetAllKeyVersions(db, {}, {}, kMaxKeys, &versions));
- ASSERT_EQ(expected_versions.size(), versions.size());
- size_t i = 0;
- for (auto &key_version : expected_versions) {
- const KeyVersion &expected_version = key_version.second;
- ASSERT_EQ(expected_version.user_key, versions[i].user_key);
- ASSERT_EQ(expected_version.sequence, versions[i].sequence);
- ASSERT_EQ(expected_version.type, versions[i].type);
- if (versions[i].type == kTypeValue) {
- ASSERT_EQ(expected_version.value, versions[i].value);
- } else {
- ASSERT_EQ(kTypeBlobIndex, versions[i].type);
- PinnableSlice value;
- ASSERT_OK(bdb_impl->TEST_GetBlobValue(versions[i].user_key,
- versions[i].value, &value));
- ASSERT_EQ(expected_version.value, value.ToString());
- }
- i++;
- }
- }
- void VerifyBaseDBBlobIndex(
- const std::map<std::string, BlobIndexVersion> &expected_versions) {
- const size_t kMaxKeys = 10000;
- std::vector<KeyVersion> versions;
- ASSERT_OK(
- GetAllKeyVersions(blob_db_->GetRootDB(), {}, {}, kMaxKeys, &versions));
- ASSERT_EQ(versions.size(), expected_versions.size());
- size_t i = 0;
- for (const auto &expected_pair : expected_versions) {
- const BlobIndexVersion &expected_version = expected_pair.second;
- ASSERT_EQ(versions[i].user_key, expected_version.user_key);
- ASSERT_EQ(versions[i].sequence, expected_version.sequence);
- ASSERT_EQ(versions[i].type, expected_version.type);
- if (versions[i].type != kTypeBlobIndex) {
- ASSERT_EQ(kInvalidBlobFileNumber, expected_version.file_number);
- ASSERT_EQ(kNoExpiration, expected_version.expiration);
- ++i;
- continue;
- }
- BlobIndex blob_index;
- ASSERT_OK(blob_index.DecodeFrom(versions[i].value));
- const uint64_t file_number = !blob_index.IsInlined()
- ? blob_index.file_number()
- : kInvalidBlobFileNumber;
- ASSERT_EQ(file_number, expected_version.file_number);
- const uint64_t expiration =
- blob_index.HasTTL() ? blob_index.expiration() : kNoExpiration;
- ASSERT_EQ(expiration, expected_version.expiration);
- ++i;
- }
- }
- void InsertBlobs() {
- WriteOptions wo;
- std::string value;
- Random rnd(301);
- for (size_t i = 0; i < 100000; i++) {
- uint64_t ttl = rnd.Next() % 86400;
- PutRandomWithTTL("key" + std::to_string(i % 500), ttl, &rnd, nullptr);
- }
- for (size_t i = 0; i < 10; i++) {
- Delete("key" + std::to_string(i % 500));
- }
- }
- const std::string dbname_;
- std::shared_ptr<MockSystemClock> mock_clock_;
- std::unique_ptr<Env> mock_env_;
- std::unique_ptr<FaultInjectionTestEnv> fault_injection_env_;
- BlobDB *blob_db_;
- }; // class BlobDBTest
- TEST_F(BlobDBTest, Put) {
- Random rnd(301);
- BlobDBOptions bdb_options;
- bdb_options.min_blob_size = 0;
- bdb_options.disable_background_tasks = true;
- Open(bdb_options);
- std::map<std::string, std::string> data;
- for (size_t i = 0; i < 100; i++) {
- PutRandom("key" + std::to_string(i), &rnd, &data);
- }
- VerifyDB(data);
- }
- TEST_F(BlobDBTest, PutWithTTL) {
- Random rnd(301);
- Options options;
- options.env = mock_env_.get();
- BlobDBOptions bdb_options;
- bdb_options.ttl_range_secs = 1000;
- bdb_options.min_blob_size = 0;
- bdb_options.blob_file_size = 256 * 1000 * 1000;
- bdb_options.disable_background_tasks = true;
- Open(bdb_options, options);
- std::map<std::string, std::string> data;
- mock_clock_->SetCurrentTime(50);
- for (size_t i = 0; i < 100; i++) {
- uint64_t ttl = rnd.Next() % 100;
- PutRandomWithTTL("key" + std::to_string(i), ttl, &rnd,
- (ttl <= 50 ? nullptr : &data));
- }
- mock_clock_->SetCurrentTime(100);
- auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
- auto blob_files = bdb_impl->TEST_GetBlobFiles();
- ASSERT_EQ(1, blob_files.size());
- ASSERT_TRUE(blob_files[0]->HasTTL());
- ASSERT_OK(bdb_impl->TEST_CloseBlobFile(blob_files[0]));
- VerifyDB(data);
- }
- TEST_F(BlobDBTest, PutUntil) {
- Random rnd(301);
- Options options;
- options.env = mock_env_.get();
- BlobDBOptions bdb_options;
- bdb_options.ttl_range_secs = 1000;
- bdb_options.min_blob_size = 0;
- bdb_options.blob_file_size = 256 * 1000 * 1000;
- bdb_options.disable_background_tasks = true;
- Open(bdb_options, options);
- std::map<std::string, std::string> data;
- mock_clock_->SetCurrentTime(50);
- for (size_t i = 0; i < 100; i++) {
- uint64_t expiration = rnd.Next() % 100 + 50;
- PutRandomUntil("key" + std::to_string(i), expiration, &rnd,
- (expiration <= 100 ? nullptr : &data));
- }
- mock_clock_->SetCurrentTime(100);
- auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
- auto blob_files = bdb_impl->TEST_GetBlobFiles();
- ASSERT_EQ(1, blob_files.size());
- ASSERT_TRUE(blob_files[0]->HasTTL());
- ASSERT_OK(bdb_impl->TEST_CloseBlobFile(blob_files[0]));
- VerifyDB(data);
- }
- TEST_F(BlobDBTest, StackableDBGet) {
- Random rnd(301);
- BlobDBOptions bdb_options;
- bdb_options.min_blob_size = 0;
- bdb_options.disable_background_tasks = true;
- Open(bdb_options);
- std::map<std::string, std::string> data;
- for (size_t i = 0; i < 100; i++) {
- PutRandom("key" + std::to_string(i), &rnd, &data);
- }
- for (size_t i = 0; i < 100; i++) {
- StackableDB *db = blob_db_;
- ColumnFamilyHandle *column_family = db->DefaultColumnFamily();
- std::string key = "key" + std::to_string(i);
- PinnableSlice pinnable_value;
- ASSERT_OK(db->Get(ReadOptions(), column_family, key, &pinnable_value));
- std::string string_value;
- ASSERT_OK(db->Get(ReadOptions(), column_family, key, &string_value));
- ASSERT_EQ(string_value, pinnable_value.ToString());
- ASSERT_EQ(string_value, data[key]);
- }
- }
// Get() with an expiration out-parameter: a key written without TTL reports
// kNoExpiration, while a TTL key reports write-time + ttl.
TEST_F(BlobDBTest, GetExpiration) {
  Options options;
  options.env = mock_env_.get();
  BlobDBOptions bdb_options;
  bdb_options.disable_background_tasks = true;
  // Fix the clock before opening so the TTL's absolute expiration is
  // deterministic (100 + 200 = 300).
  mock_clock_->SetCurrentTime(100);
  Open(bdb_options, options);
  ASSERT_OK(Put("key1", "value1"));
  ASSERT_OK(PutWithTTL("key2", "value2", 200));
  PinnableSlice value;
  uint64_t expiration;
  ASSERT_OK(blob_db_->Get(ReadOptions(), "key1", &value, &expiration));
  ASSERT_EQ("value1", value.ToString());
  ASSERT_EQ(kNoExpiration, expiration);
  ASSERT_OK(blob_db_->Get(ReadOptions(), "key2", &value, &expiration));
  ASSERT_EQ("value2", value.ToString());
  ASSERT_EQ(300 /* = 100 + 200 */, expiration);
}
- TEST_F(BlobDBTest, GetIOError) {
- Options options;
- options.env = fault_injection_env_.get();
- BlobDBOptions bdb_options;
- bdb_options.min_blob_size = 0; // Make sure value write to blob file
- bdb_options.disable_background_tasks = true;
- Open(bdb_options, options);
- ColumnFamilyHandle *column_family = blob_db_->DefaultColumnFamily();
- PinnableSlice value;
- ASSERT_OK(Put("foo", "bar"));
- fault_injection_env_->SetFilesystemActive(false, Status::IOError());
- Status s = blob_db_->Get(ReadOptions(), column_family, "foo", &value);
- ASSERT_TRUE(s.IsIOError());
- // Reactivate file system to allow test to close DB.
- fault_injection_env_->SetFilesystemActive(true);
- }
// Puts fail with IOError while the fault-injection filesystem is inactive,
// and succeed again after reactivation.
TEST_F(BlobDBTest, PutIOError) {
  Options options;
  options.env = fault_injection_env_.get();
  BlobDBOptions bdb_options;
  bdb_options.min_blob_size = 0;  // Make sure value write to blob file
  bdb_options.disable_background_tasks = true;
  Open(bdb_options, options);
  // Writes must fail while the filesystem is deactivated...
  fault_injection_env_->SetFilesystemActive(false, Status::IOError());
  ASSERT_TRUE(Put("foo", "v1").IsIOError());
  // ...and succeed once it is active again.
  fault_injection_env_->SetFilesystemActive(true, Status::IOError());
  ASSERT_OK(Put("bar", "v1"));
}
- TEST_F(BlobDBTest, WriteBatch) {
- Random rnd(301);
- BlobDBOptions bdb_options;
- bdb_options.min_blob_size = 0;
- bdb_options.disable_background_tasks = true;
- Open(bdb_options);
- std::map<std::string, std::string> data;
- for (size_t i = 0; i < 100; i++) {
- WriteBatch batch;
- for (size_t j = 0; j < 10; j++) {
- PutRandomToWriteBatch("key" + std::to_string(j * 100 + i), &rnd, &batch,
- &data);
- }
- ASSERT_OK(blob_db_->Write(WriteOptions(), &batch));
- }
- VerifyDB(data);
- }
- TEST_F(BlobDBTest, Delete) {
- Random rnd(301);
- BlobDBOptions bdb_options;
- bdb_options.min_blob_size = 0;
- bdb_options.disable_background_tasks = true;
- Open(bdb_options);
- std::map<std::string, std::string> data;
- for (size_t i = 0; i < 100; i++) {
- PutRandom("key" + std::to_string(i), &rnd, &data);
- }
- for (size_t i = 0; i < 100; i += 5) {
- Delete("key" + std::to_string(i), &data);
- }
- VerifyDB(data);
- }
- TEST_F(BlobDBTest, DeleteBatch) {
- Random rnd(301);
- BlobDBOptions bdb_options;
- bdb_options.min_blob_size = 0;
- bdb_options.disable_background_tasks = true;
- Open(bdb_options);
- for (size_t i = 0; i < 100; i++) {
- PutRandom("key" + std::to_string(i), &rnd);
- }
- WriteBatch batch;
- for (size_t i = 0; i < 100; i++) {
- ASSERT_OK(batch.Delete("key" + std::to_string(i)));
- }
- ASSERT_OK(blob_db_->Write(WriteOptions(), &batch));
- // DB should be empty.
- VerifyDB({});
- }
- TEST_F(BlobDBTest, Override) {
- Random rnd(301);
- BlobDBOptions bdb_options;
- bdb_options.min_blob_size = 0;
- bdb_options.disable_background_tasks = true;
- Open(bdb_options);
- std::map<std::string, std::string> data;
- for (int i = 0; i < 10000; i++) {
- PutRandom("key" + std::to_string(i), &rnd, nullptr);
- }
- // override all the keys
- for (int i = 0; i < 10000; i++) {
- PutRandom("key" + std::to_string(i), &rnd, &data);
- }
- VerifyDB(data);
- }
- #ifdef SNAPPY
- TEST_F(BlobDBTest, Compression) {
- Random rnd(301);
- BlobDBOptions bdb_options;
- bdb_options.min_blob_size = 0;
- bdb_options.disable_background_tasks = true;
- bdb_options.compression = CompressionType::kSnappyCompression;
- Open(bdb_options);
- std::map<std::string, std::string> data;
- for (size_t i = 0; i < 100; i++) {
- PutRandom("put-key" + std::to_string(i), &rnd, &data);
- }
- for (int i = 0; i < 100; i++) {
- WriteBatch batch;
- for (size_t j = 0; j < 10; j++) {
- PutRandomToWriteBatch("write-batch-key" + std::to_string(j * 100 + i),
- &rnd, &batch, &data);
- }
- ASSERT_OK(blob_db_->Write(WriteOptions(), &batch));
- }
- VerifyDB(data);
- }
- TEST_F(BlobDBTest, DecompressAfterReopen) {
- Random rnd(301);
- BlobDBOptions bdb_options;
- bdb_options.min_blob_size = 0;
- bdb_options.disable_background_tasks = true;
- bdb_options.compression = CompressionType::kSnappyCompression;
- Open(bdb_options);
- std::map<std::string, std::string> data;
- for (size_t i = 0; i < 100; i++) {
- PutRandom("put-key" + std::to_string(i), &rnd, &data);
- }
- VerifyDB(data);
- bdb_options.compression = CompressionType::kNoCompression;
- Reopen(bdb_options);
- VerifyDB(data);
- }
// Verify that blob files written before/after toggling the compression
// option keep their original compression type, and that a full-cutoff GC
// (garbage_collection_cutoff = 1.0) rewrites all surviving blobs with the
// currently configured compression.
TEST_F(BlobDBTest, EnableDisableCompressionGC) {
  Random rnd(301);
  BlobDBOptions bdb_options;
  bdb_options.min_blob_size = 0;
  bdb_options.garbage_collection_cutoff = 1.0;
  bdb_options.disable_background_tasks = true;
  bdb_options.compression = kSnappyCompression;
  Open(bdb_options);
  std::map<std::string, std::string> data;
  size_t data_idx = 0;
  for (; data_idx < 100; data_idx++) {
    PutRandom("put-key" + std::to_string(data_idx), &rnd, &data);
  }
  VerifyDB(data);
  auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
  ASSERT_EQ(1, blob_files.size());
  ASSERT_EQ(kSnappyCompression, blob_files[0]->GetCompressionType());
  // disable compression
  bdb_options.compression = kNoCompression;
  Reopen(bdb_options);
  // Add more data with new compression type
  for (; data_idx < 200; data_idx++) {
    PutRandom("put-key" + std::to_string(data_idx), &rnd, &data);
  }
  VerifyDB(data);
  blob_files = blob_db_impl()->TEST_GetBlobFiles();
  ASSERT_EQ(2, blob_files.size());
  // Only the new (second) file carries the new compression setting.
  ASSERT_EQ(kNoCompression, blob_files[1]->GetCompressionType());
  // Enable GC. If we do it earlier the snapshot release triggered compaction
  // may compact files and trigger GC before we can verify there are two files.
  bdb_options.enable_garbage_collection = true;
  Reopen(bdb_options);
  // Trigger compaction
  ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  blob_db_impl()->TEST_DeleteObsoleteFiles();
  VerifyDB(data);
  // After GC with cutoff 1.0, every remaining blob file should have been
  // rewritten with the current (no-compression) setting.
  blob_files = blob_db_impl()->TEST_GetBlobFiles();
  for (const auto &bfile : blob_files) {
    ASSERT_EQ(kNoCompression, bfile->GetCompressionType());
  }
  // enabling the compression again
  bdb_options.compression = kSnappyCompression;
  Reopen(bdb_options);
  // Add more data with new compression type
  for (; data_idx < 300; data_idx++) {
    PutRandom("put-key" + std::to_string(data_idx), &rnd, &data);
  }
  VerifyDB(data);
  // Trigger compaction
  ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  blob_db_impl()->TEST_DeleteObsoleteFiles();
  VerifyDB(data);
  // GC again: all files should now be Snappy-compressed.
  blob_files = blob_db_impl()->TEST_GetBlobFiles();
  for (const auto &bfile : blob_files) {
    ASSERT_EQ(kSnappyCompression, bfile->GetCompressionType());
  }
}
- #ifdef LZ4
- // Test switch compression types and run GC, it needs both Snappy and LZ4
- // support.
// Switch between LZ4, Snappy, and no compression across reopens, verifying
// that each blob file keeps the codec it was written with, and that a
// full-cutoff GC rewrites all surviving blobs with the currently
// configured codec — including when the file set has mixed codecs.
TEST_F(BlobDBTest, ChangeCompressionGC) {
  Random rnd(301);
  BlobDBOptions bdb_options;
  bdb_options.min_blob_size = 0;
  bdb_options.garbage_collection_cutoff = 1.0;
  bdb_options.disable_background_tasks = true;
  bdb_options.compression = kLZ4Compression;
  Open(bdb_options);
  std::map<std::string, std::string> data;
  size_t data_idx = 0;
  for (; data_idx < 100; data_idx++) {
    PutRandom("put-key" + std::to_string(data_idx), &rnd, &data);
  }
  VerifyDB(data);
  auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
  ASSERT_EQ(1, blob_files.size());
  ASSERT_EQ(kLZ4Compression, blob_files[0]->GetCompressionType());
  // Change compression type
  bdb_options.compression = kSnappyCompression;
  Reopen(bdb_options);
  // Add more data with Snappy compression type
  for (; data_idx < 200; data_idx++) {
    PutRandom("put-key" + std::to_string(data_idx), &rnd, &data);
  }
  VerifyDB(data);
  // Verify blob file compression type
  blob_files = blob_db_impl()->TEST_GetBlobFiles();
  ASSERT_EQ(2, blob_files.size());
  ASSERT_EQ(kSnappyCompression, blob_files[1]->GetCompressionType());
  // Enable GC. If we do it earlier the snapshot release triggered compaction
  // may compact files and trigger GC before we can verify there are two files.
  bdb_options.enable_garbage_collection = true;
  Reopen(bdb_options);
  ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  VerifyDB(data);
  // After GC, every surviving file should use the current (Snappy) codec.
  blob_db_impl()->TEST_DeleteObsoleteFiles();
  blob_files = blob_db_impl()->TEST_GetBlobFiles();
  for (const auto &bfile : blob_files) {
    ASSERT_EQ(kSnappyCompression, bfile->GetCompressionType());
  }
  // Disable compression
  bdb_options.compression = kNoCompression;
  Reopen(bdb_options);
  for (; data_idx < 300; data_idx++) {
    PutRandom("put-key" + std::to_string(data_idx), &rnd, &data);
  }
  VerifyDB(data);
  ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  VerifyDB(data);
  // GC again: all files should now be uncompressed.
  blob_db_impl()->TEST_DeleteObsoleteFiles();
  blob_files = blob_db_impl()->TEST_GetBlobFiles();
  for (const auto &bfile : blob_files) {
    ASSERT_EQ(kNoCompression, bfile->GetCompressionType());
  }
  // switching different compression types to generate mixed compression types
  bdb_options.compression = kSnappyCompression;
  Reopen(bdb_options);
  for (; data_idx < 400; data_idx++) {
    PutRandom("put-key" + std::to_string(data_idx), &rnd, &data);
  }
  VerifyDB(data);
  bdb_options.compression = kLZ4Compression;
  Reopen(bdb_options);
  for (; data_idx < 500; data_idx++) {
    PutRandom("put-key" + std::to_string(data_idx), &rnd, &data);
  }
  VerifyDB(data);
  ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  VerifyDB(data);
  // GC of the mixed-codec file set must converge on the current codec (LZ4).
  blob_db_impl()->TEST_DeleteObsoleteFiles();
  blob_files = blob_db_impl()->TEST_GetBlobFiles();
  for (const auto &bfile : blob_files) {
    ASSERT_EQ(kLZ4Compression, bfile->GetCompressionType());
  }
}
- #endif // LZ4
- #endif // SNAPPY
// Ten threads write concurrently — five via individual Puts, five via
// WriteBatch. Afterwards the union of the per-thread data sets must match
// the DB contents exactly. Key spaces are disjoint per thread (prefix
// "key<id>_"), so no write ordering between threads matters.
TEST_F(BlobDBTest, MultipleWriters) {
  Open(BlobDBOptions());
  std::vector<port::Thread> workers;
  // One expected-data map per worker; each thread touches only its own slot,
  // so no synchronization is needed on data_set.
  std::vector<std::map<std::string, std::string>> data_set(10);
  for (uint32_t i = 0; i < 10; i++) {
    workers.emplace_back(
        [&](uint32_t id) {
          Random rnd(301 + id);
          for (int j = 0; j < 100; j++) {
            std::string key =
                "key" + std::to_string(id) + "_" + std::to_string(j);
            if (id < 5) {
              PutRandom(key, &rnd, &data_set[id]);
            } else {
              WriteBatch batch;
              PutRandomToWriteBatch(key, &rnd, &batch, &data_set[id]);
              ASSERT_OK(blob_db_->Write(WriteOptions(), &batch));
            }
          }
        },
        i);
  }
  std::map<std::string, std::string> data;
  for (size_t i = 0; i < 10; i++) {
    workers[i].join();
    data.insert(data_set[i].begin(), data_set[i].end());
  }
  VerifyDB(data);
}
// Verify that blob file deletions are routed through the SstFileManager's
// rate-limited delete scheduler, both for obsolete-file cleanup and for
// Destroy().
TEST_F(BlobDBTest, SstFileManager) {
  std::shared_ptr<SstFileManager> sst_file_manager(
      NewSstFileManager(mock_env_.get()));
  // Rate-limit deletions so files go through the trash/scheduler path.
  sst_file_manager->SetDeleteRateBytesPerSecond(1024 * 1024);
  SstFileManagerImpl *sfm =
      static_cast<SstFileManagerImpl *>(sst_file_manager.get());
  BlobDBOptions bdb_options;
  bdb_options.min_blob_size = 0;
  bdb_options.enable_garbage_collection = true;
  bdb_options.garbage_collection_cutoff = 1.0;
  Options db_options;
  int files_scheduled_to_delete = 0;
  // Count only blob files (path contains ".blob") handed to the scheduler.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "SstFileManagerImpl::ScheduleFileDeletion", [&](void *arg) {
        assert(arg);
        const std::string *const file_path =
            static_cast<const std::string *>(arg);
        if (file_path->find(".blob") != std::string::npos) {
          ++files_scheduled_to_delete;
        }
      });
  SyncPoint::GetInstance()->EnableProcessing();
  db_options.sst_file_manager = sst_file_manager;
  Open(bdb_options, db_options);
  // Create one obsolete file and clean it.
  ASSERT_OK(blob_db_->Put(WriteOptions(), "foo", "bar"));
  auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
  ASSERT_EQ(1, blob_files.size());
  std::shared_ptr<BlobFile> bfile = blob_files[0];
  ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(bfile));
  // With garbage_collection_cutoff = 1.0, compaction relocates the blob,
  // making the old file obsolete.
  ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  blob_db_impl()->TEST_DeleteObsoleteFiles();
  // The obsolete blob file must have been scheduled through the SFM.
  ASSERT_EQ(1, files_scheduled_to_delete);
  Destroy();
  // Make sure that Destroy() also goes through the delete scheduler.
  ASSERT_EQ(2, files_scheduled_to_delete);
  SyncPoint::GetInstance()->DisableProcessing();
  sfm->WaitForEmptyTrash();
}
// Verify that reopening the DB rescans leftover ".blob.trash" files in the
// blob directory and schedules them for deletion via the SstFileManager.
TEST_F(BlobDBTest, SstFileManagerRestart) {
  int files_scheduled_to_delete = 0;
  // Count only blob files (path contains ".blob") handed to the scheduler.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "SstFileManagerImpl::ScheduleFileDeletion", [&](void *arg) {
        assert(arg);
        const std::string *const file_path =
            static_cast<const std::string *>(arg);
        if (file_path->find(".blob") != std::string::npos) {
          ++files_scheduled_to_delete;
        }
      });
  std::shared_ptr<SstFileManager> sst_file_manager(
      NewSstFileManager(mock_env_.get()));
  sst_file_manager->SetDeleteRateBytesPerSecond(1024 * 1024);
  SstFileManagerImpl *sfm =
      static_cast<SstFileManagerImpl *>(sst_file_manager.get());
  BlobDBOptions bdb_options;
  bdb_options.min_blob_size = 0;
  Options db_options;
  SyncPoint::GetInstance()->EnableProcessing();
  db_options.sst_file_manager = sst_file_manager;
  Open(bdb_options, db_options);
  std::string blob_dir = blob_db_impl()->TEST_blob_dir();
  ASSERT_OK(blob_db_->Put(WriteOptions(), "foo", "bar"));
  Close();
  // Create 3 dummy trash files under the blob_dir. Only the two whose names
  // contain ".blob" are expected to be counted by the callback above.
  const auto &fs = db_options.env->GetFileSystem();
  ASSERT_OK(CreateFile(fs, blob_dir + "/000666.blob.trash", "", false));
  ASSERT_OK(CreateFile(fs, blob_dir + "/000888.blob.trash", "", true));
  ASSERT_OK(CreateFile(fs, blob_dir + "/something_not_match.trash", "", false));
  // Make sure that reopening the DB rescans the existing trash files.
  Open(bdb_options, db_options);
  ASSERT_EQ(files_scheduled_to_delete, 2);
  sfm->WaitForEmptyTrash();
  // There should be exactly one (non-dot) file under the blob dir now:
  // the live blob file written before Close().
  std::vector<std::string> all_files;
  ASSERT_OK(db_options.env->GetChildren(blob_dir, &all_files));
  int nfiles = 0;
  for (const auto &f : all_files) {
    assert(!f.empty());
    if (f[0] == '.') {
      continue;
    }
    nfiles++;
  }
  ASSERT_EQ(nfiles, 1);
  SyncPoint::GetInstance()->DisableProcessing();
}
- TEST_F(BlobDBTest, SnapshotAndGarbageCollection) {
- BlobDBOptions bdb_options;
- bdb_options.min_blob_size = 0;
- bdb_options.enable_garbage_collection = true;
- bdb_options.garbage_collection_cutoff = 1.0;
- bdb_options.disable_background_tasks = true;
- Options options;
- options.disable_auto_compactions = true;
- // i = when to take snapshot
- for (int i = 0; i < 4; i++) {
- Destroy();
- Open(bdb_options, options);
- const Snapshot *snapshot = nullptr;
- // First file
- ASSERT_OK(Put("key1", "value"));
- if (i == 0) {
- snapshot = blob_db_->GetSnapshot();
- }
- auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
- ASSERT_EQ(1, blob_files.size());
- ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[0]));
- // Second file
- ASSERT_OK(Put("key2", "value"));
- if (i == 1) {
- snapshot = blob_db_->GetSnapshot();
- }
- blob_files = blob_db_impl()->TEST_GetBlobFiles();
- ASSERT_EQ(2, blob_files.size());
- auto bfile = blob_files[1];
- ASSERT_FALSE(bfile->Immutable());
- ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(bfile));
- // Third file
- ASSERT_OK(Put("key3", "value"));
- if (i == 2) {
- snapshot = blob_db_->GetSnapshot();
- }
- ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
- ASSERT_TRUE(bfile->Obsolete());
- ASSERT_EQ(blob_db_->GetLatestSequenceNumber(),
- bfile->GetObsoleteSequence());
- Delete("key2");
- if (i == 3) {
- snapshot = blob_db_->GetSnapshot();
- }
- ASSERT_EQ(4, blob_db_impl()->TEST_GetBlobFiles().size());
- blob_db_impl()->TEST_DeleteObsoleteFiles();
- if (i >= 2) {
- // The snapshot shouldn't see data in bfile
- ASSERT_EQ(2, blob_db_impl()->TEST_GetBlobFiles().size());
- blob_db_->ReleaseSnapshot(snapshot);
- } else {
- // The snapshot will see data in bfile, so the file shouldn't be deleted
- ASSERT_EQ(4, blob_db_impl()->TEST_GetBlobFiles().size());
- blob_db_->ReleaseSnapshot(snapshot);
- blob_db_impl()->TEST_DeleteObsoleteFiles();
- ASSERT_EQ(2, blob_db_impl()->TEST_GetBlobFiles().size());
- }
- }
- }
// BlobDB only supports the default column family: every read/write against a
// non-default handle must return Status::NotSupported().
TEST_F(BlobDBTest, ColumnFamilyNotSupported) {
  Options options;
  options.env = mock_env_.get();
  mock_clock_->SetCurrentTime(0);
  Open(BlobDBOptions(), options);
  ColumnFamilyHandle *default_handle = blob_db_->DefaultColumnFamily();
  ColumnFamilyHandle *handle = nullptr;
  std::string value;
  std::vector<std::string> values;
  // CreateColumnFamily simply passes through to the base db, so it succeeds.
  ASSERT_OK(
      blob_db_->CreateColumnFamily(ColumnFamilyOptions(), "foo", &handle));
  // All write paths against the non-default handle are rejected.
  ASSERT_TRUE(blob_db_->Put(WriteOptions(), handle, "k", "v").IsNotSupported());
  ASSERT_TRUE(blob_db_->PutWithTTL(WriteOptions(), handle, "k", "v", 60)
                  .IsNotSupported());
  ASSERT_TRUE(blob_db_->PutUntil(WriteOptions(), handle, "k", "v", 100)
                  .IsNotSupported());
  // A batch that touches the non-default handle is rejected as a whole:
  // even the default-CF entry "k1" is not applied (NotFound below).
  WriteBatch batch;
  ASSERT_OK(batch.Put("k1", "v1"));
  ASSERT_OK(batch.Put(handle, "k2", "v2"));
  ASSERT_TRUE(blob_db_->Write(WriteOptions(), &batch).IsNotSupported());
  ASSERT_TRUE(blob_db_->Get(ReadOptions(), "k1", &value).IsNotFound());
  ASSERT_TRUE(
      blob_db_->Get(ReadOptions(), handle, "k", &value).IsNotSupported());
  // MultiGet with any non-default handle yields NotSupported for every key,
  // including the one addressed to the default column family.
  auto statuses = blob_db_->MultiGet(ReadOptions(), {default_handle, handle},
                                     {"k1", "k2"}, &values);
  ASSERT_EQ(2, statuses.size());
  ASSERT_TRUE(statuses[0].IsNotSupported());
  ASSERT_TRUE(statuses[1].IsNotSupported());
  ASSERT_EQ(nullptr, blob_db_->NewIterator(ReadOptions(), handle));
  delete handle;
}
// Check that blob files are reported by GetLiveFilesMetaData(),
// GetLiveFiles() and GetLiveFilesStorageInfo() with the expected names, file
// numbers and per-file metadata.
TEST_F(BlobDBTest, GetLiveFilesMetaData) {
  Random rnd(301);
  BlobDBOptions bdb_options;
  bdb_options.blob_dir = "blob_dir";
  bdb_options.path_relative = true;
  bdb_options.ttl_range_secs = 10;
  bdb_options.min_blob_size = 0;
  bdb_options.disable_background_tasks = true;
  Options options;
  options.env = mock_env_.get();
  Open(bdb_options, options);
  std::map<std::string, std::string> data;
  for (size_t i = 0; i < 100; i++) {
    PutRandom("key" + std::to_string(i), &rnd, &data);
  }
  // One TTL entry; it lands in a second (TTL) blob file.
  constexpr uint64_t expiration = 1000ULL;
  PutRandomUntil("key100", expiration, &rnd, &data);
  std::vector<LiveFileMetaData> metadata;
  blob_db_->GetLiveFilesMetaData(&metadata);
  ASSERT_EQ(2U, metadata.size());
  // Path should be relative to db_name, but begin with slash.
  const std::string filename1("/blob_dir/000001.blob");
  ASSERT_EQ(filename1, metadata[0].name);
  ASSERT_EQ(1, metadata[0].file_number);
  // The non-TTL file reports no oldest_ancester_time.
  ASSERT_EQ(0, metadata[0].oldest_ancester_time);
  ASSERT_EQ(kDefaultColumnFamilyName, metadata[0].column_family_name);
  const std::string filename2("/blob_dir/000002.blob");
  ASSERT_EQ(filename2, metadata[1].name);
  ASSERT_EQ(2, metadata[1].file_number);
  // The TTL file reports the expiration as its oldest_ancester_time.
  ASSERT_EQ(expiration, metadata[1].oldest_ancester_time);
  ASSERT_EQ(kDefaultColumnFamilyName, metadata[1].column_family_name);
  std::vector<std::string> livefile;
  uint64_t mfs;
  ASSERT_OK(blob_db_->GetLiveFiles(livefile, &mfs, false));
  // The two blob files are appended after the base DB's live files.
  ASSERT_EQ(5U, livefile.size());
  ASSERT_EQ(filename1, livefile[3]);
  ASSERT_EQ(filename2, livefile[4]);
  std::vector<LiveFileStorageInfo> all_files, blob_files;
  ASSERT_OK(blob_db_->GetLiveFilesStorageInfo(LiveFilesStorageInfoOptions(),
                                              &all_files));
  // Pick out just the blob files from the full storage-info listing.
  for (size_t i = 0; i < all_files.size(); i++) {
    if (all_files[i].file_type == kBlobFile) {
      blob_files.push_back(all_files[i]);
    }
  }
  ASSERT_EQ(2U, blob_files.size());
  ASSERT_GT(all_files.size(), blob_files.size());
  ASSERT_EQ("000001.blob", blob_files[0].relative_filename);
  ASSERT_EQ(blob_db_impl()->TEST_blob_dir(), blob_files[0].directory);
  ASSERT_GT(blob_files[0].size, 0);
  ASSERT_EQ("000002.blob", blob_files[1].relative_filename);
  ASSERT_EQ(blob_db_impl()->TEST_blob_dir(), blob_files[1].directory);
  ASSERT_GT(blob_files[1].size, 0);
  VerifyDB(data);
}
// A plain RocksDB instance can be reopened as a BlobDB and its old data stays
// readable; conversely, data written through BlobDB is not readable by a
// plain RocksDB (Get reports Corruption for blob-index entries).
TEST_F(BlobDBTest, MigrateFromPlainRocksDB) {
  constexpr size_t kNumKey = 20;
  constexpr size_t kNumIteration = 10;
  Random rnd(301);
  std::map<std::string, std::string> data;
  // Tracks which keys were (over)written through the blob db.
  std::vector<bool> is_blob(kNumKey, false);
  // Write to plain rocksdb.
  Options options;
  options.create_if_missing = true;
  DB *db = nullptr;
  ASSERT_OK(DB::Open(options, dbname_, &db));
  for (size_t i = 0; i < kNumIteration; i++) {
    auto key_index = rnd.Next() % kNumKey;
    std::string key = "key" + std::to_string(key_index);
    PutRandom(db, key, &rnd, &data);
  }
  VerifyDB(db, data);
  delete db;
  db = nullptr;
  // Open as blob db. Verify it can read existing data.
  Open();
  VerifyDB(blob_db_, data);
  // Overwrite a random subset of keys through the blob db.
  for (size_t i = 0; i < kNumIteration; i++) {
    auto key_index = rnd.Next() % kNumKey;
    std::string key = "key" + std::to_string(key_index);
    is_blob[key_index] = true;
    PutRandom(blob_db_, key, &rnd, &data);
  }
  VerifyDB(blob_db_, data);
  delete blob_db_;
  blob_db_ = nullptr;
  // Verify the plain db returns an error for keys written by the blob db.
  ASSERT_OK(DB::Open(options, dbname_, &db));
  std::string value;
  for (size_t i = 0; i < kNumKey; i++) {
    std::string key = "key" + std::to_string(i);
    Status s = db->Get(ReadOptions(), key, &value);
    if (data.count(key) == 0) {
      // Never written at all.
      ASSERT_TRUE(s.IsNotFound());
    } else if (is_blob[i]) {
      // Blob-index entries read as corruption in a plain DB.
      ASSERT_TRUE(s.IsCorruption());
    } else {
      ASSERT_OK(s);
      ASSERT_EQ(data[key], value);
    }
  }
  delete db;
}
- // Test to verify that a NoSpace IOError Status is returned on reaching
- // max_db_size limit.
- TEST_F(BlobDBTest, OutOfSpace) {
- // Use mock env to stop wall clock.
- Options options;
- options.env = mock_env_.get();
- BlobDBOptions bdb_options;
- bdb_options.max_db_size = 200;
- bdb_options.is_fifo = false;
- bdb_options.disable_background_tasks = true;
- Open(bdb_options);
- // Each stored blob has an overhead of about 42 bytes currently.
- // So a small key + a 100 byte blob should take up ~150 bytes in the db.
- std::string value(100, 'v');
- ASSERT_OK(blob_db_->PutWithTTL(WriteOptions(), "key1", value, 60));
- // Putting another blob should fail as ading it would exceed the max_db_size
- // limit.
- Status s = blob_db_->PutWithTTL(WriteOptions(), "key2", value, 60);
- ASSERT_TRUE(s.IsIOError());
- ASSERT_TRUE(s.IsNoSpace());
- }
// Basic FIFO eviction: each write that would push the DB over max_db_size
// evicts the oldest blob file; evicted files become obsolete and their data
// disappears once the files are actually deleted.
TEST_F(BlobDBTest, FIFOEviction) {
  BlobDBOptions bdb_options;
  bdb_options.max_db_size = 200;
  bdb_options.blob_file_size = 100;  // one blob per file
  bdb_options.is_fifo = true;
  bdb_options.disable_background_tasks = true;
  Open(bdb_options);
  std::atomic<int> evict_count{0};
  // Count evictions via the sync point inside EvictOldestBlobFile.
  SyncPoint::GetInstance()->SetCallBack(
      "BlobDBImpl::EvictOldestBlobFile:Evicted",
      [&](void *) { evict_count++; });
  SyncPoint::GetInstance()->EnableProcessing();
  // Each stored blob has an overhead of 32 bytes currently.
  // So a 100 byte blob should take up 132 bytes.
  std::string value(100, 'v');
  ASSERT_OK(blob_db_->PutWithTTL(WriteOptions(), "key1", value, 10));
  VerifyDB({{"key1", value}});
  ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
  // Adding another 100 byte blob would take the total size to 264 bytes
  // (2 * 132), which exceeds max_db_size and triggers FIFO eviction.
  ASSERT_OK(blob_db_->PutWithTTL(WriteOptions(), "key2", value, 60));
  ASSERT_EQ(1, evict_count);
  // key1 will still be visible until its file is actually deleted.
  VerifyDB({{"key1", value}, {"key2", value}});
  // Adding another 100 bytes blob without TTL.
  ASSERT_OK(blob_db_->Put(WriteOptions(), "key3", value));
  ASSERT_EQ(2, evict_count);
  // key1 and key2 remain visible until their files are deleted.
  VerifyDB({{"key1", value}, {"key2", value}, {"key3", value}});
  // The fourth blob file, without TTL.
  ASSERT_OK(blob_db_->Put(WriteOptions(), "key4", value));
  ASSERT_EQ(3, evict_count);
  VerifyDB(
      {{"key1", value}, {"key2", value}, {"key3", value}, {"key4", value}});
  auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
  ASSERT_EQ(4, blob_files.size());
  // The three evicted files are obsolete, in FIFO (oldest-first) order.
  ASSERT_TRUE(blob_files[0]->Obsolete());
  ASSERT_TRUE(blob_files[1]->Obsolete());
  ASSERT_TRUE(blob_files[2]->Obsolete());
  ASSERT_FALSE(blob_files[3]->Obsolete());
  auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
  ASSERT_EQ(3, obsolete_files.size());
  ASSERT_EQ(blob_files[0], obsolete_files[0]);
  ASSERT_EQ(blob_files[1], obsolete_files[1]);
  ASSERT_EQ(blob_files[2], obsolete_files[2]);
  // After deleting the obsolete files, only the newest key remains.
  blob_db_impl()->TEST_DeleteObsoleteFiles();
  obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
  ASSERT_TRUE(obsolete_files.empty());
  VerifyDB({{"key4", value}});
}
- TEST_F(BlobDBTest, FIFOEviction_NoOldestFileToEvict) {
- Options options;
- BlobDBOptions bdb_options;
- bdb_options.max_db_size = 1000;
- bdb_options.blob_file_size = 5000;
- bdb_options.is_fifo = true;
- bdb_options.disable_background_tasks = true;
- Open(bdb_options);
- std::atomic<int> evict_count{0};
- SyncPoint::GetInstance()->SetCallBack(
- "BlobDBImpl::EvictOldestBlobFile:Evicted",
- [&](void *) { evict_count++; });
- SyncPoint::GetInstance()->EnableProcessing();
- std::string value(2000, 'v');
- ASSERT_TRUE(Put("foo", std::string(2000, 'v')).IsNoSpace());
- ASSERT_EQ(0, evict_count);
- }
// FIFO eviction accounts for live SST file size: when evicting all evictable
// blob files still cannot make room for a write, the write fails with
// NoSpace instead of evicting more.
TEST_F(BlobDBTest, FIFOEviction_NoEnoughBlobFilesToEvict) {
  BlobDBOptions bdb_options;
  bdb_options.is_fifo = true;
  bdb_options.min_blob_size = 100;
  bdb_options.disable_background_tasks = true;
  Options options;
  // Use mock env to stop wall clock.
  options.env = mock_env_.get();
  options.disable_auto_compactions = true;
  auto statistics = CreateDBStatistics();
  options.statistics = statistics;
  Open(bdb_options, options);
  // Wait for the flush-completed notification before checking SST sizes.
  SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::NotifyOnFlushCompleted::PostAllOnFlushCompleted",
        "BlobDBTest.FIFOEviction_NoEnoughBlobFilesToEvict:AfterFlush"}});
  SyncPoint::GetInstance()->EnableProcessing();
  ASSERT_EQ(0, blob_db_impl()->TEST_live_sst_size());
  std::string small_value(50, 'v');  // below min_blob_size (100)
  std::map<std::string, std::string> data;
  // Insert some data into the LSM tree to make sure FIFO eviction takes SST
  // file size into account.
  for (int i = 0; i < 1000; i++) {
    ASSERT_OK(Put("key" + std::to_string(i), small_value, &data));
  }
  ASSERT_OK(blob_db_->Flush(FlushOptions()));
  uint64_t live_sst_size = 0;
  ASSERT_TRUE(blob_db_->GetIntProperty(DB::Properties::kTotalSstFilesSize,
                                       &live_sst_size));
  ASSERT_TRUE(live_sst_size > 0);
  TEST_SYNC_POINT(
      "BlobDBTest.FIFOEviction_NoEnoughBlobFilesToEvict:AfterFlush");
  ASSERT_EQ(live_sst_size, blob_db_impl()->TEST_live_sst_size());
  // Leave only ~2000 bytes of budget on top of the live SST size.
  bdb_options.max_db_size = live_sst_size + 2000;
  Reopen(bdb_options, options);
  ASSERT_EQ(live_sst_size, blob_db_impl()->TEST_live_sst_size());
  std::string value_1k(1000, 'v');
  ASSERT_OK(PutWithTTL("large_key1", value_1k, 60, &data));
  ASSERT_EQ(0, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
  VerifyDB(data);
  // large_key2 evicts large_key1.
  ASSERT_OK(PutWithTTL("large_key2", value_1k, 60, &data));
  ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
  blob_db_impl()->TEST_DeleteObsoleteFiles();
  data.erase("large_key1");
  VerifyDB(data);
  // large_key3 cannot get enough space even after evicting large_key2, so
  // the write returns a NoSpace error instead of evicting.
  std::string value_2k(2000, 'v');
  ASSERT_TRUE(PutWithTTL("large_key3", value_2k, 60).IsNoSpace());
  ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
  // Verify large_key2 still exists.
  VerifyDB(data);
  SyncPoint::GetInstance()->DisableProcessing();
}
// Test flush or compaction will trigger FIFO eviction since they update
// total SST file size.
TEST_F(BlobDBTest, FIFOEviction_TriggerOnSSTSizeChange) {
  BlobDBOptions bdb_options;
  bdb_options.max_db_size = 1000;
  bdb_options.is_fifo = true;
  bdb_options.min_blob_size = 100;
  bdb_options.disable_background_tasks = true;
  Options options;
  // Use mock env to stop wall clock.
  options.env = mock_env_.get();
  auto statistics = CreateDBStatistics();
  options.statistics = statistics;
  options.compression = kNoCompression;
  Open(bdb_options, options);
  // Wait for the flush-completed notification before checking eviction.
  SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::NotifyOnFlushCompleted::PostAllOnFlushCompleted",
        "BlobDBTest.FIFOEviction_TriggerOnSSTSizeChange:AfterFlush"}});
  SyncPoint::GetInstance()->EnableProcessing();
  // One 800-byte blob fits in the 1000-byte budget while the LSM is empty.
  std::string value(800, 'v');
  ASSERT_OK(PutWithTTL("large_key", value, 60));
  ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
  ASSERT_EQ(0, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
  VerifyDB({{"large_key", value}});
  // Insert some small keys and flush to bring DB out of space.
  std::map<std::string, std::string> data;
  for (int i = 0; i < 10; i++) {
    ASSERT_OK(Put("key" + std::to_string(i), "v", &data));
  }
  ASSERT_OK(blob_db_->Flush(FlushOptions()));
  TEST_SYNC_POINT("BlobDBTest.FIFOEviction_TriggerOnSSTSizeChange:AfterFlush");
  // Verify large_key is deleted by FIFO eviction.
  blob_db_impl()->TEST_DeleteObsoleteFiles();
  ASSERT_EQ(0, blob_db_impl()->TEST_GetBlobFiles().size());
  ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
  VerifyDB(data);
  SyncPoint::GetInstance()->DisableProcessing();
}
- TEST_F(BlobDBTest, InlineSmallValues) {
- constexpr uint64_t kMaxExpiration = 1000;
- Random rnd(301);
- BlobDBOptions bdb_options;
- bdb_options.ttl_range_secs = kMaxExpiration;
- bdb_options.min_blob_size = 100;
- bdb_options.blob_file_size = 256 * 1000 * 1000;
- bdb_options.disable_background_tasks = true;
- Options options;
- options.env = mock_env_.get();
- mock_clock_->SetCurrentTime(0);
- Open(bdb_options, options);
- std::map<std::string, std::string> data;
- std::map<std::string, KeyVersion> versions;
- for (size_t i = 0; i < 1000; i++) {
- bool is_small_value = rnd.Next() % 2;
- bool has_ttl = rnd.Next() % 2;
- uint64_t expiration = rnd.Next() % kMaxExpiration;
- int len = is_small_value ? 50 : 200;
- std::string key = "key" + std::to_string(i);
- std::string value = rnd.HumanReadableString(len);
- std::string blob_index;
- data[key] = value;
- SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
- if (!has_ttl) {
- ASSERT_OK(blob_db_->Put(WriteOptions(), key, value));
- } else {
- ASSERT_OK(blob_db_->PutUntil(WriteOptions(), key, value, expiration));
- }
- ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
- versions[key] =
- KeyVersion(key, value, sequence,
- (is_small_value && !has_ttl) ? kTypeValue : kTypeBlobIndex);
- }
- VerifyDB(data);
- VerifyBaseDB(versions);
- auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
- auto blob_files = bdb_impl->TEST_GetBlobFiles();
- ASSERT_EQ(2, blob_files.size());
- std::shared_ptr<BlobFile> non_ttl_file;
- std::shared_ptr<BlobFile> ttl_file;
- if (blob_files[0]->HasTTL()) {
- ttl_file = blob_files[0];
- non_ttl_file = blob_files[1];
- } else {
- non_ttl_file = blob_files[0];
- ttl_file = blob_files[1];
- }
- ASSERT_FALSE(non_ttl_file->HasTTL());
- ASSERT_TRUE(ttl_file->HasTTL());
- }
- TEST_F(BlobDBTest, UserCompactionFilter) {
- class CustomerFilter : public CompactionFilter {
- public:
- bool Filter(int /*level*/, const Slice & /*key*/, const Slice &value,
- std::string *new_value, bool *value_changed) const override {
- *value_changed = false;
- // changing value size to test value transitions between inlined data
- // and stored-in-blob data
- if (value.size() % 4 == 1) {
- *new_value = value.ToString();
- // double size by duplicating value
- *new_value += *new_value;
- *value_changed = true;
- return false;
- } else if (value.size() % 3 == 1) {
- *new_value = value.ToString();
- // trancate value size by half
- *new_value = new_value->substr(0, new_value->size() / 2);
- *value_changed = true;
- return false;
- } else if (value.size() % 2 == 1) {
- return true;
- }
- return false;
- }
- bool IgnoreSnapshots() const override { return true; }
- const char *Name() const override { return "CustomerFilter"; }
- };
- class CustomerFilterFactory : public CompactionFilterFactory {
- const char *Name() const override { return "CustomerFilterFactory"; }
- std::unique_ptr<CompactionFilter> CreateCompactionFilter(
- const CompactionFilter::Context & /*context*/) override {
- return std::unique_ptr<CompactionFilter>(new CustomerFilter());
- }
- };
- constexpr size_t kNumPuts = 1 << 10;
- // Generate both inlined and blob value
- constexpr uint64_t kMinValueSize = 1 << 6;
- constexpr uint64_t kMaxValueSize = 1 << 8;
- constexpr uint64_t kMinBlobSize = 1 << 7;
- static_assert(kMinValueSize < kMinBlobSize);
- static_assert(kMaxValueSize > kMinBlobSize);
- BlobDBOptions bdb_options;
- bdb_options.min_blob_size = kMinBlobSize;
- bdb_options.blob_file_size = kMaxValueSize * 10;
- bdb_options.disable_background_tasks = true;
- if (Snappy_Supported()) {
- bdb_options.compression = CompressionType::kSnappyCompression;
- }
- // case_num == 0: Test user defined compaction filter
- // case_num == 1: Test user defined compaction filter factory
- for (int case_num = 0; case_num < 2; case_num++) {
- Options options;
- if (case_num == 0) {
- options.compaction_filter = new CustomerFilter();
- } else {
- options.compaction_filter_factory.reset(new CustomerFilterFactory());
- }
- options.disable_auto_compactions = true;
- options.env = mock_env_.get();
- options.statistics = CreateDBStatistics();
- Open(bdb_options, options);
- std::map<std::string, std::string> data;
- std::map<std::string, std::string> data_after_compact;
- Random rnd(301);
- uint64_t value_size = kMinValueSize;
- int drop_record = 0;
- for (size_t i = 0; i < kNumPuts; ++i) {
- std::ostringstream oss;
- oss << "key" << std::setw(4) << std::setfill('0') << i;
- const std::string key(oss.str());
- const std::string value = rnd.HumanReadableString((int)value_size);
- const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
- ASSERT_OK(Put(key, value));
- ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
- data[key] = value;
- if (value.length() % 4 == 1) {
- data_after_compact[key] = value + value;
- } else if (value.length() % 3 == 1) {
- data_after_compact[key] = value.substr(0, value.size() / 2);
- } else if (value.length() % 2 == 1) {
- ++drop_record;
- } else {
- data_after_compact[key] = value;
- }
- if (++value_size > kMaxValueSize) {
- value_size = kMinValueSize;
- }
- }
- // Verify full data set
- VerifyDB(data);
- // Applying compaction filter for records
- ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
- // Verify data after compaction, only value with even length left.
- VerifyDB(data_after_compact);
- ASSERT_EQ(drop_record,
- options.statistics->getTickerCount(COMPACTION_KEY_DROP_USER));
- delete options.compaction_filter;
- Destroy();
- }
- }
// Test user compaction filter when there is IO error on blob data.
TEST_F(BlobDBTest, UserCompactionFilter_BlobIOError) {
  // Filter that rewrites every value, forcing the compaction to write new
  // blob data and thereby hit the injected IO failures below.
  class CustomerFilter : public CompactionFilter {
   public:
    bool Filter(int /*level*/, const Slice & /*key*/, const Slice &value,
                std::string *new_value, bool *value_changed) const override {
      *new_value = value.ToString() + "_new";
      *value_changed = true;
      return false;
    }
    bool IgnoreSnapshots() const override { return true; }
    const char *Name() const override { return "CustomerFilter"; }
  };
  constexpr size_t kNumPuts = 100;
  constexpr int kValueSize = 100;
  BlobDBOptions bdb_options;
  bdb_options.min_blob_size = 0;
  bdb_options.blob_file_size = kValueSize * 10;
  bdb_options.disable_background_tasks = true;
  bdb_options.compression = CompressionType::kNoCompression;
  // Sync points at which the filesystem is switched to failure mode, covering
  // blob file creation, blob writing, and blob file close during compaction.
  std::vector<std::string> io_failure_cases = {
      "BlobDBImpl::CreateBlobFileAndWriter",
      "BlobIndexCompactionFilterBase::WriteBlobToNewFile",
      "BlobDBImpl::CloseBlobFile"};
  for (size_t case_num = 0; case_num < io_failure_cases.size(); case_num++) {
    Options options;
    options.compaction_filter = new CustomerFilter();
    options.disable_auto_compactions = true;
    options.env = fault_injection_env_.get();
    options.statistics = CreateDBStatistics();
    Open(bdb_options, options);
    std::map<std::string, std::string> data;
    Random rnd(301);
    for (size_t i = 0; i < kNumPuts; ++i) {
      std::ostringstream oss;
      oss << "key" << std::setw(4) << std::setfill('0') << i;
      const std::string key(oss.str());
      const std::string value = rnd.HumanReadableString(kValueSize);
      const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
      ASSERT_OK(Put(key, value));
      ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
      data[key] = value;
    }
    // Verify full data set before the failure is injected.
    VerifyDB(data);
    SyncPoint::GetInstance()->SetCallBack(
        io_failure_cases[case_num], [&](void * /*arg*/) {
          fault_injection_env_->SetFilesystemActive(false, Status::IOError());
        });
    SyncPoint::GetInstance()->EnableProcessing();
    // The compaction must surface the injected IO error.
    auto s = blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
    ASSERT_TRUE(s.IsIOError());
    // Reactivate the file system to allow the test to verify and close DB.
    fault_injection_env_->SetFilesystemActive(true);
    SyncPoint::GetInstance()->DisableProcessing();
    SyncPoint::GetInstance()->ClearAllCallBacks();
    // Verify the full, unmodified data set after the compaction failure.
    VerifyDB(data);
    delete options.compaction_filter;
    Destroy();
  }
}
// Test that the compaction filter removes any expired blob index.
TEST_F(BlobDBTest, FilterExpiredBlobIndex) {
  constexpr size_t kNumKeys = 100;
  constexpr size_t kNumPuts = 1000;
  constexpr uint64_t kMaxExpiration = 1000;
  constexpr uint64_t kCompactTime = 500;
  constexpr uint64_t kMinBlobSize = 100;
  Random rnd(301);
  mock_clock_->SetCurrentTime(0);
  BlobDBOptions bdb_options;
  bdb_options.min_blob_size = kMinBlobSize;
  bdb_options.disable_background_tasks = true;
  Options options;
  options.env = mock_env_.get();
  Open(bdb_options, options);
  std::map<std::string, std::string> data;
  std::map<std::string, std::string> data_after_compact;
  // Keys are drawn from a small pool so later puts overwrite earlier ones;
  // data/data_after_compact always track the latest value per key.
  for (size_t i = 0; i < kNumPuts; i++) {
    bool is_small_value = rnd.Next() % 2;
    bool has_ttl = rnd.Next() % 2;
    uint64_t expiration = rnd.Next() % kMaxExpiration;
    int len = is_small_value ? 10 : 200;
    std::string key = "key" + std::to_string(rnd.Next() % kNumKeys);
    std::string value = rnd.HumanReadableString(len);
    if (!has_ttl) {
      if (is_small_value) {
        std::string blob_entry;
        BlobIndex::EncodeInlinedTTL(&blob_entry, expiration, value);
        // Fake blob index with TTL. See what it will do.
        ASSERT_GT(kMinBlobSize, blob_entry.size());
        value = blob_entry;
      }
      ASSERT_OK(Put(key, value));
      data_after_compact[key] = value;
    } else {
      ASSERT_OK(PutUntil(key, value, expiration));
      // Entries already expired at compaction time should be filtered out.
      if (expiration <= kCompactTime) {
        data_after_compact.erase(key);
      } else {
        data_after_compact[key] = value;
      }
    }
    data[key] = value;
  }
  VerifyDB(data);
  mock_clock_->SetCurrentTime(kCompactTime);
  // Take a snapshot before compaction. Make sure expired blob indexes are
  // filtered regardless of snapshot.
  const Snapshot *snapshot = blob_db_->GetSnapshot();
  // Issue manual compaction to trigger compaction filter.
  ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  blob_db_->ReleaseSnapshot(snapshot);
  // Verify that expired blob indexes were filtered: only the expected keys
  // remain in the base DB.
  std::vector<KeyVersion> versions;
  const size_t kMaxKeys = 10000;
  ASSERT_OK(GetAllKeyVersions(blob_db_, {}, {}, kMaxKeys, &versions));
  ASSERT_EQ(data_after_compact.size(), versions.size());
  for (auto &version : versions) {
    ASSERT_TRUE(data_after_compact.count(version.user_key) > 0);
  }
  VerifyDB(data_after_compact);
}
// Test compaction filter should remove any blob index where corresponding
// blob file has been removed.
TEST_F(BlobDBTest, FilterFileNotAvailable) {
  BlobDBOptions bdb_options;
  bdb_options.min_blob_size = 0;
  bdb_options.disable_background_tasks = true;
  Options options;
  options.disable_auto_compactions = true;
  Open(bdb_options, options);
  // Two keys, one per blob file; both files are closed so they can later be
  // obsoleted individually.
  ASSERT_OK(Put("foo", "v1"));
  auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
  ASSERT_EQ(1, blob_files.size());
  ASSERT_EQ(1, blob_files[0]->BlobFileNumber());
  ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[0]));
  ASSERT_OK(Put("bar", "v2"));
  blob_files = blob_db_impl()->TEST_GetBlobFiles();
  ASSERT_EQ(2, blob_files.size());
  ASSERT_EQ(2, blob_files[1]->BlobFileNumber());
  ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[1]));
  const size_t kMaxKeys = 10000;
  DB *base_db = blob_db_->GetRootDB();
  std::vector<KeyVersion> versions;
  ASSERT_OK(GetAllKeyVersions(base_db, {}, {}, kMaxKeys, &versions));
  ASSERT_EQ(2, versions.size());
  ASSERT_EQ("bar", versions[0].user_key);
  ASSERT_EQ("foo", versions[1].user_key);
  VerifyDB({{"bar", "v2"}, {"foo", "v1"}});
  // While both blob files exist, compaction keeps both keys.
  ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  ASSERT_OK(GetAllKeyVersions(base_db, {}, {}, kMaxKeys, &versions));
  ASSERT_EQ(2, versions.size());
  ASSERT_EQ("bar", versions[0].user_key);
  ASSERT_EQ("foo", versions[1].user_key);
  VerifyDB({{"bar", "v2"}, {"foo", "v1"}});
  // Remove the first blob file and compact. foo should be removed from the
  // base db.
  blob_db_impl()->TEST_ObsoleteBlobFile(blob_files[0]);
  blob_db_impl()->TEST_DeleteObsoleteFiles();
  ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  ASSERT_OK(GetAllKeyVersions(base_db, {}, {}, kMaxKeys, &versions));
  ASSERT_EQ(1, versions.size());
  ASSERT_EQ("bar", versions[0].user_key);
  VerifyDB({{"bar", "v2"}});
  // Remove the second blob file and compact. bar should be removed from the
  // base db.
  blob_db_impl()->TEST_ObsoleteBlobFile(blob_files[1]);
  blob_db_impl()->TEST_DeleteObsoleteFiles();
  ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  ASSERT_OK(GetAllKeyVersions(base_db, {}, {}, kMaxKeys, &versions));
  ASSERT_EQ(0, versions.size());
  VerifyDB({});
}
- // Test compaction filter should filter any inlined TTL keys that would have
- // been dropped by last FIFO eviction if they are store out-of-line.
- TEST_F(BlobDBTest, FilterForFIFOEviction) {
- Random rnd(215);
- BlobDBOptions bdb_options;
- bdb_options.min_blob_size = 100;
- bdb_options.ttl_range_secs = 60;
- bdb_options.max_db_size = 0;
- bdb_options.disable_background_tasks = true;
- Options options;
- // Use mock env to stop wall clock.
- mock_clock_->SetCurrentTime(0);
- options.env = mock_env_.get();
- auto statistics = CreateDBStatistics();
- options.statistics = statistics;
- options.disable_auto_compactions = true;
- Open(bdb_options, options);
- SyncPoint::GetInstance()->LoadDependency(
- {{"DBImpl::NotifyOnFlushCompleted::PostAllOnFlushCompleted",
- "BlobDBTest.FilterForFIFOEviction:AfterFlush"}});
- SyncPoint::GetInstance()->EnableProcessing();
- std::map<std::string, std::string> data;
- std::map<std::string, std::string> data_after_compact;
- // Insert some small values that will be inlined.
- for (int i = 0; i < 1000; i++) {
- std::string key = "key" + std::to_string(i);
- std::string value = rnd.HumanReadableString(50);
- uint64_t ttl = rnd.Next() % 120 + 1;
- ASSERT_OK(PutWithTTL(key, value, ttl, &data));
- if (ttl >= 60) {
- data_after_compact[key] = value;
- }
- }
- uint64_t num_keys_to_evict = data.size() - data_after_compact.size();
- ASSERT_OK(blob_db_->Flush(FlushOptions()));
- TEST_SYNC_POINT("BlobDBTest.FilterForFIFOEviction:AfterFlush");
- uint64_t live_sst_size = blob_db_impl()->TEST_live_sst_size();
- ASSERT_GT(live_sst_size, 0);
- VerifyDB(data);
- bdb_options.max_db_size = live_sst_size + 30000;
- bdb_options.is_fifo = true;
- Reopen(bdb_options, options);
- VerifyDB(data);
- // Put two large values, each on a different blob file.
- std::string large_value(10000, 'v');
- ASSERT_OK(PutWithTTL("large_key1", large_value, 90));
- ASSERT_OK(PutWithTTL("large_key2", large_value, 150));
- ASSERT_EQ(2, blob_db_impl()->TEST_GetBlobFiles().size());
- ASSERT_EQ(0, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
- data["large_key1"] = large_value;
- data["large_key2"] = large_value;
- VerifyDB(data);
- // Put a third large value which will bring the DB out of space.
- // FIFO eviction will evict the file of large_key1.
- ASSERT_OK(PutWithTTL("large_key3", large_value, 150));
- ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
- ASSERT_EQ(2, blob_db_impl()->TEST_GetBlobFiles().size());
- blob_db_impl()->TEST_DeleteObsoleteFiles();
- ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
- data.erase("large_key1");
- data["large_key3"] = large_value;
- VerifyDB(data);
- // Putting some more small values. These values shouldn't be evicted by
- // compaction filter since they are inserted after FIFO eviction.
- ASSERT_OK(PutWithTTL("foo", "v", 30, &data_after_compact));
- ASSERT_OK(PutWithTTL("bar", "v", 30, &data_after_compact));
- // FIFO eviction doesn't trigger again since there enough room for the flush.
- ASSERT_OK(blob_db_->Flush(FlushOptions()));
- ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
- // Manual compact and check if compaction filter evict those keys with
- // expiration < 60.
- ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
- // All keys with expiration < 60, plus large_key1 is filtered by
- // compaction filter.
- ASSERT_EQ(num_keys_to_evict + 1,
- statistics->getTickerCount(BLOB_DB_BLOB_INDEX_EVICTED_COUNT));
- ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
- ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
- data_after_compact["large_key2"] = large_value;
- data_after_compact["large_key3"] = large_value;
- VerifyDB(data_after_compact);
- SyncPoint::GetInstance()->DisableProcessing();
- }
- TEST_F(BlobDBTest, GarbageCollection) {
- constexpr size_t kNumPuts = 1 << 10;
- constexpr uint64_t kExpiration = 1000;
- constexpr uint64_t kCompactTime = 500;
- constexpr uint64_t kKeySize = 7; // "key" + 4 digits
- constexpr uint64_t kSmallValueSize = 1 << 6;
- constexpr uint64_t kLargeValueSize = 1 << 8;
- constexpr uint64_t kMinBlobSize = 1 << 7;
- static_assert(kSmallValueSize < kMinBlobSize);
- static_assert(kLargeValueSize > kMinBlobSize);
- constexpr size_t kBlobsPerFile = 8;
- constexpr size_t kNumBlobFiles = kNumPuts / kBlobsPerFile;
- constexpr uint64_t kBlobFileSize =
- BlobLogHeader::kSize +
- (BlobLogRecord::kHeaderSize + kKeySize + kLargeValueSize) * kBlobsPerFile;
- BlobDBOptions bdb_options;
- bdb_options.min_blob_size = kMinBlobSize;
- bdb_options.blob_file_size = kBlobFileSize;
- bdb_options.enable_garbage_collection = true;
- bdb_options.garbage_collection_cutoff = 0.25;
- bdb_options.disable_background_tasks = true;
- Options options;
- options.env = mock_env_.get();
- options.statistics = CreateDBStatistics();
- Open(bdb_options, options);
- std::map<std::string, std::string> data;
- std::map<std::string, KeyVersion> blob_value_versions;
- std::map<std::string, BlobIndexVersion> blob_index_versions;
- Random rnd(301);
- // Add a bunch of large non-TTL values. These will be written to non-TTL
- // blob files and will be subject to GC.
- for (size_t i = 0; i < kNumPuts; ++i) {
- std::ostringstream oss;
- oss << "key" << std::setw(4) << std::setfill('0') << i;
- const std::string key(oss.str());
- const std::string value = rnd.HumanReadableString(kLargeValueSize);
- const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
- ASSERT_OK(Put(key, value));
- ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
- data[key] = value;
- blob_value_versions[key] = KeyVersion(key, value, sequence, kTypeBlobIndex);
- blob_index_versions[key] =
- BlobIndexVersion(key, /* file_number */ (i >> 3) + 1, kNoExpiration,
- sequence, kTypeBlobIndex);
- }
- // Add some small and/or TTL values that will be ignored during GC.
- // First, add a large TTL value will be written to its own TTL blob file.
- {
- const std::string key("key2000");
- const std::string value = rnd.HumanReadableString(kLargeValueSize);
- const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
- ASSERT_OK(PutUntil(key, value, kExpiration));
- ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
- data[key] = value;
- blob_value_versions[key] = KeyVersion(key, value, sequence, kTypeBlobIndex);
- blob_index_versions[key] =
- BlobIndexVersion(key, /* file_number */ kNumBlobFiles + 1, kExpiration,
- sequence, kTypeBlobIndex);
- }
- // Now add a small TTL value (which will be inlined).
- {
- const std::string key("key3000");
- const std::string value = rnd.HumanReadableString(kSmallValueSize);
- const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
- ASSERT_OK(PutUntil(key, value, kExpiration));
- ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
- data[key] = value;
- blob_value_versions[key] = KeyVersion(key, value, sequence, kTypeBlobIndex);
- blob_index_versions[key] = BlobIndexVersion(
- key, kInvalidBlobFileNumber, kExpiration, sequence, kTypeBlobIndex);
- }
- // Finally, add a small non-TTL value (which will be stored as a regular
- // value).
- {
- const std::string key("key4000");
- const std::string value = rnd.HumanReadableString(kSmallValueSize);
- const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
- ASSERT_OK(Put(key, value));
- ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
- data[key] = value;
- blob_value_versions[key] = KeyVersion(key, value, sequence, kTypeValue);
- blob_index_versions[key] = BlobIndexVersion(
- key, kInvalidBlobFileNumber, kNoExpiration, sequence, kTypeValue);
- }
- VerifyDB(data);
- VerifyBaseDB(blob_value_versions);
- VerifyBaseDBBlobIndex(blob_index_versions);
- // At this point, we should have 128 immutable non-TTL files with file numbers
- // 1..128.
- {
- auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
- ASSERT_EQ(live_imm_files.size(), kNumBlobFiles);
- for (size_t i = 0; i < kNumBlobFiles; ++i) {
- ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 1);
- ASSERT_EQ(live_imm_files[i]->GetFileSize(),
- kBlobFileSize + BlobLogFooter::kSize);
- }
- }
- mock_clock_->SetCurrentTime(kCompactTime);
- ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
- // We expect the data to remain the same and the blobs from the oldest N files
- // to be moved to new files. Sequence numbers get zeroed out during the
- // compaction.
- VerifyDB(data);
- for (auto &pair : blob_value_versions) {
- KeyVersion &version = pair.second;
- version.sequence = 0;
- }
- VerifyBaseDB(blob_value_versions);
- const uint64_t cutoff = static_cast<uint64_t>(
- bdb_options.garbage_collection_cutoff * kNumBlobFiles);
- for (auto &pair : blob_index_versions) {
- BlobIndexVersion &version = pair.second;
- version.sequence = 0;
- if (version.file_number == kInvalidBlobFileNumber) {
- continue;
- }
- if (version.file_number > cutoff) {
- continue;
- }
- version.file_number += kNumBlobFiles + 1;
- }
- VerifyBaseDBBlobIndex(blob_index_versions);
- const Statistics *const statistics = options.statistics.get();
- assert(statistics);
- ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_FILES), cutoff);
- ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_NEW_FILES), cutoff);
- ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_FAILURES), 0);
- ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_KEYS_RELOCATED),
- cutoff * kBlobsPerFile);
- ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_BYTES_RELOCATED),
- cutoff * kBlobsPerFile * kLargeValueSize);
- // At this point, we should have 128 immutable non-TTL files with file numbers
- // 33..128 and 130..161. (129 was taken by the TTL blob file.)
- {
- auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
- ASSERT_EQ(live_imm_files.size(), kNumBlobFiles);
- for (size_t i = 0; i < kNumBlobFiles; ++i) {
- uint64_t expected_file_number = i + cutoff + 1;
- if (expected_file_number > kNumBlobFiles) {
- ++expected_file_number;
- }
- ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), expected_file_number);
- ASSERT_EQ(live_imm_files[i]->GetFileSize(),
- kBlobFileSize + BlobLogFooter::kSize);
- }
- }
- }
- TEST_F(BlobDBTest, GarbageCollectionFailure) {
- BlobDBOptions bdb_options;
- bdb_options.min_blob_size = 0;
- bdb_options.enable_garbage_collection = true;
- bdb_options.garbage_collection_cutoff = 1.0;
- bdb_options.disable_background_tasks = true;
- Options db_options;
- db_options.statistics = CreateDBStatistics();
- Open(bdb_options, db_options);
- // Write a couple of valid blobs.
- ASSERT_OK(Put("foo", "bar"));
- ASSERT_OK(Put("dead", "beef"));
- // Write a fake blob reference into the base DB that points to a non-existing
- // blob file.
- std::string blob_index;
- BlobIndex::EncodeBlob(&blob_index, /* file_number */ 1000, /* offset */ 1234,
- /* size */ 5678, kNoCompression);
- WriteBatch batch;
- ASSERT_OK(WriteBatchInternal::PutBlobIndex(
- &batch, blob_db_->DefaultColumnFamily()->GetID(), "key", blob_index));
- ASSERT_OK(blob_db_->GetRootDB()->Write(WriteOptions(), &batch));
- auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
- ASSERT_EQ(blob_files.size(), 1);
- auto blob_file = blob_files[0];
- ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_file));
- ASSERT_TRUE(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)
- .IsIOError());
- const Statistics *const statistics = db_options.statistics.get();
- assert(statistics);
- ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_FILES), 0);
- ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_NEW_FILES), 1);
- ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_FAILURES), 1);
- ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_KEYS_RELOCATED), 2);
- ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_BYTES_RELOCATED), 7);
- }
- // File should be evicted after expiration.
- TEST_F(BlobDBTest, EvictExpiredFile) {
- BlobDBOptions bdb_options;
- bdb_options.ttl_range_secs = 100;
- bdb_options.min_blob_size = 0;
- bdb_options.disable_background_tasks = true;
- Options options;
- options.env = mock_env_.get();
- Open(bdb_options, options);
- mock_clock_->SetCurrentTime(50);
- std::map<std::string, std::string> data;
- ASSERT_OK(PutWithTTL("foo", "bar", 100, &data));
- auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
- ASSERT_EQ(1, blob_files.size());
- auto blob_file = blob_files[0];
- ASSERT_FALSE(blob_file->Immutable());
- ASSERT_FALSE(blob_file->Obsolete());
- VerifyDB(data);
- mock_clock_->SetCurrentTime(250);
- // The key should expired now.
- blob_db_impl()->TEST_EvictExpiredFiles();
- ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
- ASSERT_EQ(1, blob_db_impl()->TEST_GetObsoleteFiles().size());
- ASSERT_TRUE(blob_file->Immutable());
- ASSERT_TRUE(blob_file->Obsolete());
- blob_db_impl()->TEST_DeleteObsoleteFiles();
- ASSERT_EQ(0, blob_db_impl()->TEST_GetBlobFiles().size());
- ASSERT_EQ(0, blob_db_impl()->TEST_GetObsoleteFiles().size());
- // Make sure we don't return garbage value after blob file being evicted,
- // but the blob index still exists in the LSM tree.
- std::string val;
- ASSERT_TRUE(blob_db_->Get(ReadOptions(), "foo", &val).IsNotFound());
- ASSERT_EQ("", val);
- }
- TEST_F(BlobDBTest, DisableFileDeletions) {
- BlobDBOptions bdb_options;
- bdb_options.disable_background_tasks = true;
- Open(bdb_options);
- std::map<std::string, std::string> data;
- ASSERT_OK(Put("foo", "v", &data));
- auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
- ASSERT_EQ(1, blob_files.size());
- auto blob_file = blob_files[0];
- ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_file));
- blob_db_impl()->TEST_ObsoleteBlobFile(blob_file);
- ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
- ASSERT_EQ(1, blob_db_impl()->TEST_GetObsoleteFiles().size());
- // Call DisableFileDeletions twice.
- ASSERT_OK(blob_db_->DisableFileDeletions());
- ASSERT_OK(blob_db_->DisableFileDeletions());
- // File deletions should be disabled.
- blob_db_impl()->TEST_DeleteObsoleteFiles();
- ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
- ASSERT_EQ(1, blob_db_impl()->TEST_GetObsoleteFiles().size());
- VerifyDB(data);
- // Enable file deletions once. File deletion will later get enabled when
- // `EnableFileDeletions` called for a second time.
- ASSERT_OK(blob_db_->EnableFileDeletions());
- blob_db_impl()->TEST_DeleteObsoleteFiles();
- ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
- ASSERT_EQ(1, blob_db_impl()->TEST_GetObsoleteFiles().size());
- VerifyDB(data);
- // Call EnableFileDeletions a second time.
- ASSERT_OK(blob_db_->EnableFileDeletions());
- blob_db_impl()->TEST_DeleteObsoleteFiles();
- // File should be deleted by now.
- ASSERT_EQ(0, blob_db_impl()->TEST_GetBlobFiles().size());
- ASSERT_EQ(0, blob_db_impl()->TEST_GetObsoleteFiles().size());
- VerifyDB({});
- }
- TEST_F(BlobDBTest, MaintainBlobFileToSstMapping) {
- BlobDBOptions bdb_options;
- bdb_options.enable_garbage_collection = true;
- bdb_options.disable_background_tasks = true;
- Open(bdb_options);
- // Register some dummy blob files.
- blob_db_impl()->TEST_AddDummyBlobFile(1, /* immutable_sequence */ 200);
- blob_db_impl()->TEST_AddDummyBlobFile(2, /* immutable_sequence */ 300);
- blob_db_impl()->TEST_AddDummyBlobFile(3, /* immutable_sequence */ 400);
- blob_db_impl()->TEST_AddDummyBlobFile(4, /* immutable_sequence */ 500);
- blob_db_impl()->TEST_AddDummyBlobFile(5, /* immutable_sequence */ 600);
- // Initialize the blob <-> SST file mapping. First, add some SST files with
- // blob file references, then some without.
- std::vector<LiveFileMetaData> live_files;
- for (uint64_t i = 1; i <= 10; ++i) {
- LiveFileMetaData live_file;
- live_file.file_number = i;
- live_file.oldest_blob_file_number = ((i - 1) % 5) + 1;
- live_files.emplace_back(live_file);
- }
- for (uint64_t i = 11; i <= 20; ++i) {
- LiveFileMetaData live_file;
- live_file.file_number = i;
- live_files.emplace_back(live_file);
- }
- blob_db_impl()->TEST_InitializeBlobFileToSstMapping(live_files);
- // Check that the blob <-> SST mappings have been correctly initialized.
- auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
- ASSERT_EQ(blob_files.size(), 5);
- {
- auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
- ASSERT_EQ(live_imm_files.size(), 5);
- for (size_t i = 0; i < 5; ++i) {
- ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 1);
- }
- ASSERT_TRUE(blob_db_impl()->TEST_GetObsoleteFiles().empty());
- }
- {
- const std::vector<std::unordered_set<uint64_t>> expected_sst_files{
- {1, 6}, {2, 7}, {3, 8}, {4, 9}, {5, 10}};
- const std::vector<bool> expected_obsolete{false, false, false, false,
- false};
- for (size_t i = 0; i < 5; ++i) {
- const auto &blob_file = blob_files[i];
- ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
- ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);
- }
- auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
- ASSERT_EQ(live_imm_files.size(), 5);
- for (size_t i = 0; i < 5; ++i) {
- ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 1);
- }
- ASSERT_TRUE(blob_db_impl()->TEST_GetObsoleteFiles().empty());
- }
- // Simulate a flush where the SST does not reference any blob files.
- {
- FlushJobInfo info{};
- info.file_number = 21;
- info.smallest_seqno = 1;
- info.largest_seqno = 100;
- blob_db_impl()->TEST_ProcessFlushJobInfo(info);
- const std::vector<std::unordered_set<uint64_t>> expected_sst_files{
- {1, 6}, {2, 7}, {3, 8}, {4, 9}, {5, 10}};
- const std::vector<bool> expected_obsolete{false, false, false, false,
- false};
- for (size_t i = 0; i < 5; ++i) {
- const auto &blob_file = blob_files[i];
- ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
- ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);
- }
- auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
- ASSERT_EQ(live_imm_files.size(), 5);
- for (size_t i = 0; i < 5; ++i) {
- ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 1);
- }
- ASSERT_TRUE(blob_db_impl()->TEST_GetObsoleteFiles().empty());
- }
- // Simulate a flush where the SST references a blob file.
- {
- FlushJobInfo info{};
- info.file_number = 22;
- info.oldest_blob_file_number = 5;
- info.smallest_seqno = 101;
- info.largest_seqno = 200;
- blob_db_impl()->TEST_ProcessFlushJobInfo(info);
- const std::vector<std::unordered_set<uint64_t>> expected_sst_files{
- {1, 6}, {2, 7}, {3, 8}, {4, 9}, {5, 10, 22}};
- const std::vector<bool> expected_obsolete{false, false, false, false,
- false};
- for (size_t i = 0; i < 5; ++i) {
- const auto &blob_file = blob_files[i];
- ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
- ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);
- }
- auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
- ASSERT_EQ(live_imm_files.size(), 5);
- for (size_t i = 0; i < 5; ++i) {
- ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 1);
- }
- ASSERT_TRUE(blob_db_impl()->TEST_GetObsoleteFiles().empty());
- }
- // Simulate a compaction. Some inputs and outputs have blob file references,
- // some don't. There is also a trivial move (which means the SST appears on
- // both the input and the output list). Blob file 1 loses all its linked SSTs,
- // and since it got marked immutable at sequence number 200 which has already
- // been flushed, it can be marked obsolete.
- {
- CompactionJobInfo info{};
- info.input_file_infos.emplace_back(CompactionFileInfo{1, 1, 1});
- info.input_file_infos.emplace_back(CompactionFileInfo{1, 2, 2});
- info.input_file_infos.emplace_back(CompactionFileInfo{1, 6, 1});
- info.input_file_infos.emplace_back(
- CompactionFileInfo{1, 11, kInvalidBlobFileNumber});
- info.input_file_infos.emplace_back(CompactionFileInfo{1, 22, 5});
- info.output_file_infos.emplace_back(CompactionFileInfo{2, 22, 5});
- info.output_file_infos.emplace_back(CompactionFileInfo{2, 23, 3});
- info.output_file_infos.emplace_back(
- CompactionFileInfo{2, 24, kInvalidBlobFileNumber});
- blob_db_impl()->TEST_ProcessCompactionJobInfo(info);
- const std::vector<std::unordered_set<uint64_t>> expected_sst_files{
- {}, {7}, {3, 8, 23}, {4, 9}, {5, 10, 22}};
- const std::vector<bool> expected_obsolete{true, false, false, false, false};
- for (size_t i = 0; i < 5; ++i) {
- const auto &blob_file = blob_files[i];
- ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
- ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);
- }
- auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
- ASSERT_EQ(live_imm_files.size(), 4);
- for (size_t i = 0; i < 4; ++i) {
- ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 2);
- }
- auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
- ASSERT_EQ(obsolete_files.size(), 1);
- ASSERT_EQ(obsolete_files[0]->BlobFileNumber(), 1);
- }
- // Simulate a failed compaction. No mappings should be updated.
- {
- CompactionJobInfo info{};
- info.input_file_infos.emplace_back(CompactionFileInfo{1, 7, 2});
- info.input_file_infos.emplace_back(CompactionFileInfo{2, 22, 5});
- info.output_file_infos.emplace_back(CompactionFileInfo{2, 25, 3});
- info.status = Status::Corruption();
- blob_db_impl()->TEST_ProcessCompactionJobInfo(info);
- const std::vector<std::unordered_set<uint64_t>> expected_sst_files{
- {}, {7}, {3, 8, 23}, {4, 9}, {5, 10, 22}};
- const std::vector<bool> expected_obsolete{true, false, false, false, false};
- for (size_t i = 0; i < 5; ++i) {
- const auto &blob_file = blob_files[i];
- ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
- ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);
- }
- auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
- ASSERT_EQ(live_imm_files.size(), 4);
- for (size_t i = 0; i < 4; ++i) {
- ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 2);
- }
- auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
- ASSERT_EQ(obsolete_files.size(), 1);
- ASSERT_EQ(obsolete_files[0]->BlobFileNumber(), 1);
- }
- // Simulate another compaction. Blob file 2 loses all its linked SSTs
- // but since it got marked immutable at sequence number 300 which hasn't
- // been flushed yet, it cannot be marked obsolete at this point.
- {
- CompactionJobInfo info{};
- info.input_file_infos.emplace_back(CompactionFileInfo{1, 7, 2});
- info.input_file_infos.emplace_back(CompactionFileInfo{2, 22, 5});
- info.output_file_infos.emplace_back(CompactionFileInfo{2, 25, 3});
- blob_db_impl()->TEST_ProcessCompactionJobInfo(info);
- const std::vector<std::unordered_set<uint64_t>> expected_sst_files{
- {}, {}, {3, 8, 23, 25}, {4, 9}, {5, 10}};
- const std::vector<bool> expected_obsolete{true, false, false, false, false};
- for (size_t i = 0; i < 5; ++i) {
- const auto &blob_file = blob_files[i];
- ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
- ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);
- }
- auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
- ASSERT_EQ(live_imm_files.size(), 4);
- for (size_t i = 0; i < 4; ++i) {
- ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 2);
- }
- auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
- ASSERT_EQ(obsolete_files.size(), 1);
- ASSERT_EQ(obsolete_files[0]->BlobFileNumber(), 1);
- }
- // Simulate a flush with largest sequence number 300. This will make it
- // possible to mark blob file 2 obsolete.
- {
- FlushJobInfo info{};
- info.file_number = 26;
- info.smallest_seqno = 201;
- info.largest_seqno = 300;
- blob_db_impl()->TEST_ProcessFlushJobInfo(info);
- const std::vector<std::unordered_set<uint64_t>> expected_sst_files{
- {}, {}, {3, 8, 23, 25}, {4, 9}, {5, 10}};
- const std::vector<bool> expected_obsolete{true, true, false, false, false};
- for (size_t i = 0; i < 5; ++i) {
- const auto &blob_file = blob_files[i];
- ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
- ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);
- }
- auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
- ASSERT_EQ(live_imm_files.size(), 3);
- for (size_t i = 0; i < 3; ++i) {
- ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 3);
- }
- auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
- ASSERT_EQ(obsolete_files.size(), 2);
- ASSERT_EQ(obsolete_files[0]->BlobFileNumber(), 1);
- ASSERT_EQ(obsolete_files[1]->BlobFileNumber(), 2);
- }
- }
- TEST_F(BlobDBTest, ShutdownWait) {
- BlobDBOptions bdb_options;
- bdb_options.ttl_range_secs = 100;
- bdb_options.min_blob_size = 0;
- bdb_options.disable_background_tasks = false;
- Options options;
- options.env = mock_env_.get();
- SyncPoint::GetInstance()->LoadDependency({
- {"BlobDBImpl::EvictExpiredFiles:0", "BlobDBTest.ShutdownWait:0"},
- {"BlobDBTest.ShutdownWait:1", "BlobDBImpl::EvictExpiredFiles:1"},
- {"BlobDBImpl::EvictExpiredFiles:2", "BlobDBTest.ShutdownWait:2"},
- {"BlobDBTest.ShutdownWait:3", "BlobDBImpl::EvictExpiredFiles:3"},
- });
- // Force all tasks to be scheduled immediately.
- SyncPoint::GetInstance()->SetCallBack(
- "TimeQueue::Add:item.end", [&](void *arg) {
- std::chrono::steady_clock::time_point *tp =
- static_cast<std::chrono::steady_clock::time_point *>(arg);
- *tp =
- std::chrono::steady_clock::now() - std::chrono::milliseconds(10000);
- });
- SyncPoint::GetInstance()->SetCallBack(
- "BlobDBImpl::EvictExpiredFiles:cb", [&](void * /*arg*/) {
- // Sleep 3 ms to increase the chance of data race.
- // We've synced up the code so that EvictExpiredFiles()
- // is called concurrently with ~BlobDBImpl().
- // ~BlobDBImpl() is supposed to wait for all background
- // task to shutdown before doing anything else. In order
- // to use the same test to reproduce a bug of the waiting
- // logic, we wait a little bit here, so that TSAN can
- // catch the data race.
- // We should improve the test if we find a better way.
- Env::Default()->SleepForMicroseconds(3000);
- });
- SyncPoint::GetInstance()->EnableProcessing();
- Open(bdb_options, options);
- mock_clock_->SetCurrentTime(50);
- std::map<std::string, std::string> data;
- ASSERT_OK(PutWithTTL("foo", "bar", 100, &data));
- auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
- ASSERT_EQ(1, blob_files.size());
- auto blob_file = blob_files[0];
- ASSERT_FALSE(blob_file->Immutable());
- ASSERT_FALSE(blob_file->Obsolete());
- VerifyDB(data);
- TEST_SYNC_POINT("BlobDBTest.ShutdownWait:0");
- mock_clock_->SetCurrentTime(250);
- // The key should expired now.
- TEST_SYNC_POINT("BlobDBTest.ShutdownWait:1");
- TEST_SYNC_POINT("BlobDBTest.ShutdownWait:2");
- TEST_SYNC_POINT("BlobDBTest.ShutdownWait:3");
- Close();
- SyncPoint::GetInstance()->DisableProcessing();
- }
- TEST_F(BlobDBTest, SyncBlobFileBeforeClose) {
- Options options;
- options.statistics = CreateDBStatistics();
- BlobDBOptions blob_options;
- blob_options.min_blob_size = 0;
- blob_options.bytes_per_sync = 1 << 20;
- blob_options.disable_background_tasks = true;
- Open(blob_options, options);
- ASSERT_OK(Put("foo", "bar"));
- auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
- ASSERT_EQ(blob_files.size(), 1);
- ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[0]));
- ASSERT_EQ(options.statistics->getTickerCount(BLOB_DB_BLOB_FILE_SYNCED), 1);
- }
- TEST_F(BlobDBTest, SyncBlobFileBeforeCloseIOError) {
- Options options;
- options.env = fault_injection_env_.get();
- BlobDBOptions blob_options;
- blob_options.min_blob_size = 0;
- blob_options.bytes_per_sync = 1 << 20;
- blob_options.disable_background_tasks = true;
- Open(blob_options, options);
- ASSERT_OK(Put("foo", "bar"));
- auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
- ASSERT_EQ(blob_files.size(), 1);
- SyncPoint::GetInstance()->SetCallBack(
- "BlobLogWriter::Sync", [this](void * /* arg */) {
- fault_injection_env_->SetFilesystemActive(false, Status::IOError());
- });
- SyncPoint::GetInstance()->EnableProcessing();
- const Status s = blob_db_impl()->TEST_CloseBlobFile(blob_files[0]);
- fault_injection_env_->SetFilesystemActive(true);
- SyncPoint::GetInstance()->DisableProcessing();
- SyncPoint::GetInstance()->ClearAllCallBacks();
- ASSERT_TRUE(s.IsIOError());
- }
- } // namespace ROCKSDB_NAMESPACE::blob_db
- // A black-box test for the ttl wrapper around rocksdb
- int main(int argc, char **argv) {
- ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
- ::testing::InitGoogleTest(&argc, argv);
- return RUN_ALL_TESTS();
- }
|