blob_db_test.cc 69 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #ifndef ROCKSDB_LITE
  6. #include <algorithm>
  7. #include <chrono>
  8. #include <cstdlib>
  9. #include <iomanip>
  10. #include <map>
  11. #include <memory>
  12. #include <sstream>
  13. #include <string>
  14. #include <vector>
  15. #include "db/blob_index.h"
  16. #include "db/db_test_util.h"
  17. #include "env/composite_env_wrapper.h"
  18. #include "file/file_util.h"
  19. #include "file/sst_file_manager_impl.h"
  20. #include "port/port.h"
  21. #include "rocksdb/utilities/debug.h"
  22. #include "test_util/fault_injection_test_env.h"
  23. #include "test_util/sync_point.h"
  24. #include "test_util/testharness.h"
  25. #include "util/cast_util.h"
  26. #include "util/random.h"
  27. #include "util/string_util.h"
  28. #include "utilities/blob_db/blob_db.h"
  29. #include "utilities/blob_db/blob_db_impl.h"
  30. namespace ROCKSDB_NAMESPACE {
  31. namespace blob_db {
  32. class BlobDBTest : public testing::Test {
  33. public:
  34. const int kMaxBlobSize = 1 << 14;
  35. struct BlobIndexVersion {
  36. BlobIndexVersion() = default;
  37. BlobIndexVersion(std::string _user_key, uint64_t _file_number,
  38. uint64_t _expiration, SequenceNumber _sequence,
  39. ValueType _type)
  40. : user_key(std::move(_user_key)),
  41. file_number(_file_number),
  42. expiration(_expiration),
  43. sequence(_sequence),
  44. type(_type) {}
  45. std::string user_key;
  46. uint64_t file_number = kInvalidBlobFileNumber;
  47. uint64_t expiration = kNoExpiration;
  48. SequenceNumber sequence = 0;
  49. ValueType type = kTypeValue;
  50. };
  51. BlobDBTest()
  52. : dbname_(test::PerThreadDBPath("blob_db_test")),
  53. mock_env_(new MockTimeEnv(Env::Default())),
  54. fault_injection_env_(new FaultInjectionTestEnv(Env::Default())),
  55. blob_db_(nullptr) {
  56. Status s = DestroyBlobDB(dbname_, Options(), BlobDBOptions());
  57. assert(s.ok());
  58. }
  59. ~BlobDBTest() override {
  60. SyncPoint::GetInstance()->ClearAllCallBacks();
  61. Destroy();
  62. }
  63. Status TryOpen(BlobDBOptions bdb_options = BlobDBOptions(),
  64. Options options = Options()) {
  65. options.create_if_missing = true;
  66. return BlobDB::Open(options, bdb_options, dbname_, &blob_db_);
  67. }
  68. void Open(BlobDBOptions bdb_options = BlobDBOptions(),
  69. Options options = Options()) {
  70. ASSERT_OK(TryOpen(bdb_options, options));
  71. }
  72. void Reopen(BlobDBOptions bdb_options = BlobDBOptions(),
  73. Options options = Options()) {
  74. assert(blob_db_ != nullptr);
  75. delete blob_db_;
  76. blob_db_ = nullptr;
  77. Open(bdb_options, options);
  78. }
  79. void Close() {
  80. assert(blob_db_ != nullptr);
  81. delete blob_db_;
  82. blob_db_ = nullptr;
  83. }
  84. void Destroy() {
  85. if (blob_db_) {
  86. Options options = blob_db_->GetOptions();
  87. BlobDBOptions bdb_options = blob_db_->GetBlobDBOptions();
  88. delete blob_db_;
  89. blob_db_ = nullptr;
  90. ASSERT_OK(DestroyBlobDB(dbname_, options, bdb_options));
  91. }
  92. }
  93. BlobDBImpl *blob_db_impl() {
  94. return reinterpret_cast<BlobDBImpl *>(blob_db_);
  95. }
  96. Status Put(const Slice &key, const Slice &value,
  97. std::map<std::string, std::string> *data = nullptr) {
  98. Status s = blob_db_->Put(WriteOptions(), key, value);
  99. if (data != nullptr) {
  100. (*data)[key.ToString()] = value.ToString();
  101. }
  102. return s;
  103. }
  104. void Delete(const std::string &key,
  105. std::map<std::string, std::string> *data = nullptr) {
  106. ASSERT_OK(blob_db_->Delete(WriteOptions(), key));
  107. if (data != nullptr) {
  108. data->erase(key);
  109. }
  110. }
  111. Status PutWithTTL(const Slice &key, const Slice &value, uint64_t ttl,
  112. std::map<std::string, std::string> *data = nullptr) {
  113. Status s = blob_db_->PutWithTTL(WriteOptions(), key, value, ttl);
  114. if (data != nullptr) {
  115. (*data)[key.ToString()] = value.ToString();
  116. }
  117. return s;
  118. }
  119. Status PutUntil(const Slice &key, const Slice &value, uint64_t expiration) {
  120. return blob_db_->PutUntil(WriteOptions(), key, value, expiration);
  121. }
  122. void PutRandomWithTTL(const std::string &key, uint64_t ttl, Random *rnd,
  123. std::map<std::string, std::string> *data = nullptr) {
  124. int len = rnd->Next() % kMaxBlobSize + 1;
  125. std::string value = test::RandomHumanReadableString(rnd, len);
  126. ASSERT_OK(
  127. blob_db_->PutWithTTL(WriteOptions(), Slice(key), Slice(value), ttl));
  128. if (data != nullptr) {
  129. (*data)[key] = value;
  130. }
  131. }
  132. void PutRandomUntil(const std::string &key, uint64_t expiration, Random *rnd,
  133. std::map<std::string, std::string> *data = nullptr) {
  134. int len = rnd->Next() % kMaxBlobSize + 1;
  135. std::string value = test::RandomHumanReadableString(rnd, len);
  136. ASSERT_OK(blob_db_->PutUntil(WriteOptions(), Slice(key), Slice(value),
  137. expiration));
  138. if (data != nullptr) {
  139. (*data)[key] = value;
  140. }
  141. }
  142. void PutRandom(const std::string &key, Random *rnd,
  143. std::map<std::string, std::string> *data = nullptr) {
  144. PutRandom(blob_db_, key, rnd, data);
  145. }
  146. void PutRandom(DB *db, const std::string &key, Random *rnd,
  147. std::map<std::string, std::string> *data = nullptr) {
  148. int len = rnd->Next() % kMaxBlobSize + 1;
  149. std::string value = test::RandomHumanReadableString(rnd, len);
  150. ASSERT_OK(db->Put(WriteOptions(), Slice(key), Slice(value)));
  151. if (data != nullptr) {
  152. (*data)[key] = value;
  153. }
  154. }
  155. void PutRandomToWriteBatch(
  156. const std::string &key, Random *rnd, WriteBatch *batch,
  157. std::map<std::string, std::string> *data = nullptr) {
  158. int len = rnd->Next() % kMaxBlobSize + 1;
  159. std::string value = test::RandomHumanReadableString(rnd, len);
  160. ASSERT_OK(batch->Put(key, value));
  161. if (data != nullptr) {
  162. (*data)[key] = value;
  163. }
  164. }
  165. // Verify blob db contain expected data and nothing more.
  166. void VerifyDB(const std::map<std::string, std::string> &data) {
  167. VerifyDB(blob_db_, data);
  168. }
  169. void VerifyDB(DB *db, const std::map<std::string, std::string> &data) {
  170. // Verify normal Get
  171. auto* cfh = db->DefaultColumnFamily();
  172. for (auto &p : data) {
  173. PinnableSlice value_slice;
  174. ASSERT_OK(db->Get(ReadOptions(), cfh, p.first, &value_slice));
  175. ASSERT_EQ(p.second, value_slice.ToString());
  176. std::string value;
  177. ASSERT_OK(db->Get(ReadOptions(), cfh, p.first, &value));
  178. ASSERT_EQ(p.second, value);
  179. }
  180. // Verify iterators
  181. Iterator *iter = db->NewIterator(ReadOptions());
  182. iter->SeekToFirst();
  183. for (auto &p : data) {
  184. ASSERT_TRUE(iter->Valid());
  185. ASSERT_EQ(p.first, iter->key().ToString());
  186. ASSERT_EQ(p.second, iter->value().ToString());
  187. iter->Next();
  188. }
  189. ASSERT_FALSE(iter->Valid());
  190. ASSERT_OK(iter->status());
  191. delete iter;
  192. }
  193. void VerifyBaseDB(
  194. const std::map<std::string, KeyVersion> &expected_versions) {
  195. auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
  196. DB *db = blob_db_->GetRootDB();
  197. const size_t kMaxKeys = 10000;
  198. std::vector<KeyVersion> versions;
  199. GetAllKeyVersions(db, "", "", kMaxKeys, &versions);
  200. ASSERT_EQ(expected_versions.size(), versions.size());
  201. size_t i = 0;
  202. for (auto &key_version : expected_versions) {
  203. const KeyVersion &expected_version = key_version.second;
  204. ASSERT_EQ(expected_version.user_key, versions[i].user_key);
  205. ASSERT_EQ(expected_version.sequence, versions[i].sequence);
  206. ASSERT_EQ(expected_version.type, versions[i].type);
  207. if (versions[i].type == kTypeValue) {
  208. ASSERT_EQ(expected_version.value, versions[i].value);
  209. } else {
  210. ASSERT_EQ(kTypeBlobIndex, versions[i].type);
  211. PinnableSlice value;
  212. ASSERT_OK(bdb_impl->TEST_GetBlobValue(versions[i].user_key,
  213. versions[i].value, &value));
  214. ASSERT_EQ(expected_version.value, value.ToString());
  215. }
  216. i++;
  217. }
  218. }
  219. void VerifyBaseDBBlobIndex(
  220. const std::map<std::string, BlobIndexVersion> &expected_versions) {
  221. const size_t kMaxKeys = 10000;
  222. std::vector<KeyVersion> versions;
  223. ASSERT_OK(
  224. GetAllKeyVersions(blob_db_->GetRootDB(), "", "", kMaxKeys, &versions));
  225. ASSERT_EQ(versions.size(), expected_versions.size());
  226. size_t i = 0;
  227. for (const auto &expected_pair : expected_versions) {
  228. const BlobIndexVersion &expected_version = expected_pair.second;
  229. ASSERT_EQ(versions[i].user_key, expected_version.user_key);
  230. ASSERT_EQ(versions[i].sequence, expected_version.sequence);
  231. ASSERT_EQ(versions[i].type, expected_version.type);
  232. if (versions[i].type != kTypeBlobIndex) {
  233. ASSERT_EQ(kInvalidBlobFileNumber, expected_version.file_number);
  234. ASSERT_EQ(kNoExpiration, expected_version.expiration);
  235. ++i;
  236. continue;
  237. }
  238. BlobIndex blob_index;
  239. ASSERT_OK(blob_index.DecodeFrom(versions[i].value));
  240. const uint64_t file_number = !blob_index.IsInlined()
  241. ? blob_index.file_number()
  242. : kInvalidBlobFileNumber;
  243. ASSERT_EQ(file_number, expected_version.file_number);
  244. const uint64_t expiration =
  245. blob_index.HasTTL() ? blob_index.expiration() : kNoExpiration;
  246. ASSERT_EQ(expiration, expected_version.expiration);
  247. ++i;
  248. }
  249. }
  250. void InsertBlobs() {
  251. WriteOptions wo;
  252. std::string value;
  253. Random rnd(301);
  254. for (size_t i = 0; i < 100000; i++) {
  255. uint64_t ttl = rnd.Next() % 86400;
  256. PutRandomWithTTL("key" + ToString(i % 500), ttl, &rnd, nullptr);
  257. }
  258. for (size_t i = 0; i < 10; i++) {
  259. Delete("key" + ToString(i % 500));
  260. }
  261. }
  262. const std::string dbname_;
  263. std::unique_ptr<MockTimeEnv> mock_env_;
  264. std::unique_ptr<FaultInjectionTestEnv> fault_injection_env_;
  265. BlobDB *blob_db_;
  266. }; // class BlobDBTest
  267. TEST_F(BlobDBTest, Put) {
  268. Random rnd(301);
  269. BlobDBOptions bdb_options;
  270. bdb_options.min_blob_size = 0;
  271. bdb_options.disable_background_tasks = true;
  272. Open(bdb_options);
  273. std::map<std::string, std::string> data;
  274. for (size_t i = 0; i < 100; i++) {
  275. PutRandom("key" + ToString(i), &rnd, &data);
  276. }
  277. VerifyDB(data);
  278. }
  279. TEST_F(BlobDBTest, PutWithTTL) {
  280. Random rnd(301);
  281. Options options;
  282. options.env = mock_env_.get();
  283. BlobDBOptions bdb_options;
  284. bdb_options.ttl_range_secs = 1000;
  285. bdb_options.min_blob_size = 0;
  286. bdb_options.blob_file_size = 256 * 1000 * 1000;
  287. bdb_options.disable_background_tasks = true;
  288. Open(bdb_options, options);
  289. std::map<std::string, std::string> data;
  290. mock_env_->set_current_time(50);
  291. for (size_t i = 0; i < 100; i++) {
  292. uint64_t ttl = rnd.Next() % 100;
  293. PutRandomWithTTL("key" + ToString(i), ttl, &rnd,
  294. (ttl <= 50 ? nullptr : &data));
  295. }
  296. mock_env_->set_current_time(100);
  297. auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
  298. auto blob_files = bdb_impl->TEST_GetBlobFiles();
  299. ASSERT_EQ(1, blob_files.size());
  300. ASSERT_TRUE(blob_files[0]->HasTTL());
  301. ASSERT_OK(bdb_impl->TEST_CloseBlobFile(blob_files[0]));
  302. VerifyDB(data);
  303. }
  304. TEST_F(BlobDBTest, PutUntil) {
  305. Random rnd(301);
  306. Options options;
  307. options.env = mock_env_.get();
  308. BlobDBOptions bdb_options;
  309. bdb_options.ttl_range_secs = 1000;
  310. bdb_options.min_blob_size = 0;
  311. bdb_options.blob_file_size = 256 * 1000 * 1000;
  312. bdb_options.disable_background_tasks = true;
  313. Open(bdb_options, options);
  314. std::map<std::string, std::string> data;
  315. mock_env_->set_current_time(50);
  316. for (size_t i = 0; i < 100; i++) {
  317. uint64_t expiration = rnd.Next() % 100 + 50;
  318. PutRandomUntil("key" + ToString(i), expiration, &rnd,
  319. (expiration <= 100 ? nullptr : &data));
  320. }
  321. mock_env_->set_current_time(100);
  322. auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
  323. auto blob_files = bdb_impl->TEST_GetBlobFiles();
  324. ASSERT_EQ(1, blob_files.size());
  325. ASSERT_TRUE(blob_files[0]->HasTTL());
  326. ASSERT_OK(bdb_impl->TEST_CloseBlobFile(blob_files[0]));
  327. VerifyDB(data);
  328. }
  329. TEST_F(BlobDBTest, StackableDBGet) {
  330. Random rnd(301);
  331. BlobDBOptions bdb_options;
  332. bdb_options.min_blob_size = 0;
  333. bdb_options.disable_background_tasks = true;
  334. Open(bdb_options);
  335. std::map<std::string, std::string> data;
  336. for (size_t i = 0; i < 100; i++) {
  337. PutRandom("key" + ToString(i), &rnd, &data);
  338. }
  339. for (size_t i = 0; i < 100; i++) {
  340. StackableDB *db = blob_db_;
  341. ColumnFamilyHandle *column_family = db->DefaultColumnFamily();
  342. std::string key = "key" + ToString(i);
  343. PinnableSlice pinnable_value;
  344. ASSERT_OK(db->Get(ReadOptions(), column_family, key, &pinnable_value));
  345. std::string string_value;
  346. ASSERT_OK(db->Get(ReadOptions(), column_family, key, &string_value));
  347. ASSERT_EQ(string_value, pinnable_value.ToString());
  348. ASSERT_EQ(string_value, data[key]);
  349. }
  350. }
  351. TEST_F(BlobDBTest, GetExpiration) {
  352. Options options;
  353. options.env = mock_env_.get();
  354. BlobDBOptions bdb_options;
  355. bdb_options.disable_background_tasks = true;
  356. mock_env_->set_current_time(100);
  357. Open(bdb_options, options);
  358. Put("key1", "value1");
  359. PutWithTTL("key2", "value2", 200);
  360. PinnableSlice value;
  361. uint64_t expiration;
  362. ASSERT_OK(blob_db_->Get(ReadOptions(), "key1", &value, &expiration));
  363. ASSERT_EQ("value1", value.ToString());
  364. ASSERT_EQ(kNoExpiration, expiration);
  365. ASSERT_OK(blob_db_->Get(ReadOptions(), "key2", &value, &expiration));
  366. ASSERT_EQ("value2", value.ToString());
  367. ASSERT_EQ(300 /* = 100 + 200 */, expiration);
  368. }
  369. TEST_F(BlobDBTest, GetIOError) {
  370. Options options;
  371. options.env = fault_injection_env_.get();
  372. BlobDBOptions bdb_options;
  373. bdb_options.min_blob_size = 0; // Make sure value write to blob file
  374. bdb_options.disable_background_tasks = true;
  375. Open(bdb_options, options);
  376. ColumnFamilyHandle *column_family = blob_db_->DefaultColumnFamily();
  377. PinnableSlice value;
  378. ASSERT_OK(Put("foo", "bar"));
  379. fault_injection_env_->SetFilesystemActive(false, Status::IOError());
  380. Status s = blob_db_->Get(ReadOptions(), column_family, "foo", &value);
  381. ASSERT_TRUE(s.IsIOError());
  382. // Reactivate file system to allow test to close DB.
  383. fault_injection_env_->SetFilesystemActive(true);
  384. }
  385. TEST_F(BlobDBTest, PutIOError) {
  386. Options options;
  387. options.env = fault_injection_env_.get();
  388. BlobDBOptions bdb_options;
  389. bdb_options.min_blob_size = 0; // Make sure value write to blob file
  390. bdb_options.disable_background_tasks = true;
  391. Open(bdb_options, options);
  392. fault_injection_env_->SetFilesystemActive(false, Status::IOError());
  393. ASSERT_TRUE(Put("foo", "v1").IsIOError());
  394. fault_injection_env_->SetFilesystemActive(true, Status::IOError());
  395. ASSERT_OK(Put("bar", "v1"));
  396. }
  397. TEST_F(BlobDBTest, WriteBatch) {
  398. Random rnd(301);
  399. BlobDBOptions bdb_options;
  400. bdb_options.min_blob_size = 0;
  401. bdb_options.disable_background_tasks = true;
  402. Open(bdb_options);
  403. std::map<std::string, std::string> data;
  404. for (size_t i = 0; i < 100; i++) {
  405. WriteBatch batch;
  406. for (size_t j = 0; j < 10; j++) {
  407. PutRandomToWriteBatch("key" + ToString(j * 100 + i), &rnd, &batch, &data);
  408. }
  409. blob_db_->Write(WriteOptions(), &batch);
  410. }
  411. VerifyDB(data);
  412. }
  413. TEST_F(BlobDBTest, Delete) {
  414. Random rnd(301);
  415. BlobDBOptions bdb_options;
  416. bdb_options.min_blob_size = 0;
  417. bdb_options.disable_background_tasks = true;
  418. Open(bdb_options);
  419. std::map<std::string, std::string> data;
  420. for (size_t i = 0; i < 100; i++) {
  421. PutRandom("key" + ToString(i), &rnd, &data);
  422. }
  423. for (size_t i = 0; i < 100; i += 5) {
  424. Delete("key" + ToString(i), &data);
  425. }
  426. VerifyDB(data);
  427. }
  428. TEST_F(BlobDBTest, DeleteBatch) {
  429. Random rnd(301);
  430. BlobDBOptions bdb_options;
  431. bdb_options.min_blob_size = 0;
  432. bdb_options.disable_background_tasks = true;
  433. Open(bdb_options);
  434. for (size_t i = 0; i < 100; i++) {
  435. PutRandom("key" + ToString(i), &rnd);
  436. }
  437. WriteBatch batch;
  438. for (size_t i = 0; i < 100; i++) {
  439. batch.Delete("key" + ToString(i));
  440. }
  441. ASSERT_OK(blob_db_->Write(WriteOptions(), &batch));
  442. // DB should be empty.
  443. VerifyDB({});
  444. }
  445. TEST_F(BlobDBTest, Override) {
  446. Random rnd(301);
  447. BlobDBOptions bdb_options;
  448. bdb_options.min_blob_size = 0;
  449. bdb_options.disable_background_tasks = true;
  450. Open(bdb_options);
  451. std::map<std::string, std::string> data;
  452. for (int i = 0; i < 10000; i++) {
  453. PutRandom("key" + ToString(i), &rnd, nullptr);
  454. }
  455. // override all the keys
  456. for (int i = 0; i < 10000; i++) {
  457. PutRandom("key" + ToString(i), &rnd, &data);
  458. }
  459. VerifyDB(data);
  460. }
  461. #ifdef SNAPPY
  462. TEST_F(BlobDBTest, Compression) {
  463. Random rnd(301);
  464. BlobDBOptions bdb_options;
  465. bdb_options.min_blob_size = 0;
  466. bdb_options.disable_background_tasks = true;
  467. bdb_options.compression = CompressionType::kSnappyCompression;
  468. Open(bdb_options);
  469. std::map<std::string, std::string> data;
  470. for (size_t i = 0; i < 100; i++) {
  471. PutRandom("put-key" + ToString(i), &rnd, &data);
  472. }
  473. for (int i = 0; i < 100; i++) {
  474. WriteBatch batch;
  475. for (size_t j = 0; j < 10; j++) {
  476. PutRandomToWriteBatch("write-batch-key" + ToString(j * 100 + i), &rnd,
  477. &batch, &data);
  478. }
  479. blob_db_->Write(WriteOptions(), &batch);
  480. }
  481. VerifyDB(data);
  482. }
  483. TEST_F(BlobDBTest, DecompressAfterReopen) {
  484. Random rnd(301);
  485. BlobDBOptions bdb_options;
  486. bdb_options.min_blob_size = 0;
  487. bdb_options.disable_background_tasks = true;
  488. bdb_options.compression = CompressionType::kSnappyCompression;
  489. Open(bdb_options);
  490. std::map<std::string, std::string> data;
  491. for (size_t i = 0; i < 100; i++) {
  492. PutRandom("put-key" + ToString(i), &rnd, &data);
  493. }
  494. VerifyDB(data);
  495. bdb_options.compression = CompressionType::kNoCompression;
  496. Reopen(bdb_options);
  497. VerifyDB(data);
  498. }
  499. #endif
  500. TEST_F(BlobDBTest, MultipleWriters) {
  501. Open(BlobDBOptions());
  502. std::vector<port::Thread> workers;
  503. std::vector<std::map<std::string, std::string>> data_set(10);
  504. for (uint32_t i = 0; i < 10; i++)
  505. workers.push_back(port::Thread(
  506. [&](uint32_t id) {
  507. Random rnd(301 + id);
  508. for (int j = 0; j < 100; j++) {
  509. std::string key = "key" + ToString(id) + "_" + ToString(j);
  510. if (id < 5) {
  511. PutRandom(key, &rnd, &data_set[id]);
  512. } else {
  513. WriteBatch batch;
  514. PutRandomToWriteBatch(key, &rnd, &batch, &data_set[id]);
  515. blob_db_->Write(WriteOptions(), &batch);
  516. }
  517. }
  518. },
  519. i));
  520. std::map<std::string, std::string> data;
  521. for (size_t i = 0; i < 10; i++) {
  522. workers[i].join();
  523. data.insert(data_set[i].begin(), data_set[i].end());
  524. }
  525. VerifyDB(data);
  526. }
  527. TEST_F(BlobDBTest, SstFileManager) {
  528. // run the same test for Get(), MultiGet() and Iterator each.
  529. std::shared_ptr<SstFileManager> sst_file_manager(
  530. NewSstFileManager(mock_env_.get()));
  531. sst_file_manager->SetDeleteRateBytesPerSecond(1);
  532. SstFileManagerImpl *sfm =
  533. static_cast<SstFileManagerImpl *>(sst_file_manager.get());
  534. BlobDBOptions bdb_options;
  535. bdb_options.min_blob_size = 0;
  536. bdb_options.enable_garbage_collection = true;
  537. bdb_options.garbage_collection_cutoff = 1.0;
  538. Options db_options;
  539. int files_scheduled_to_delete = 0;
  540. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  541. "SstFileManagerImpl::ScheduleFileDeletion", [&](void *arg) {
  542. assert(arg);
  543. const std::string *const file_path =
  544. static_cast<const std::string *>(arg);
  545. if (file_path->find(".blob") != std::string::npos) {
  546. ++files_scheduled_to_delete;
  547. }
  548. });
  549. SyncPoint::GetInstance()->EnableProcessing();
  550. db_options.sst_file_manager = sst_file_manager;
  551. Open(bdb_options, db_options);
  552. // Create one obselete file and clean it.
  553. blob_db_->Put(WriteOptions(), "foo", "bar");
  554. auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
  555. ASSERT_EQ(1, blob_files.size());
  556. std::shared_ptr<BlobFile> bfile = blob_files[0];
  557. ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(bfile));
  558. ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  559. blob_db_impl()->TEST_DeleteObsoleteFiles();
  560. // Even if SSTFileManager is not set, DB is creating a dummy one.
  561. ASSERT_EQ(1, files_scheduled_to_delete);
  562. Destroy();
  563. // Make sure that DestroyBlobDB() also goes through delete scheduler.
  564. ASSERT_EQ(2, files_scheduled_to_delete);
  565. SyncPoint::GetInstance()->DisableProcessing();
  566. sfm->WaitForEmptyTrash();
  567. }
  568. TEST_F(BlobDBTest, SstFileManagerRestart) {
  569. int files_scheduled_to_delete = 0;
  570. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  571. "SstFileManagerImpl::ScheduleFileDeletion", [&](void *arg) {
  572. assert(arg);
  573. const std::string *const file_path =
  574. static_cast<const std::string *>(arg);
  575. if (file_path->find(".blob") != std::string::npos) {
  576. ++files_scheduled_to_delete;
  577. }
  578. });
  579. // run the same test for Get(), MultiGet() and Iterator each.
  580. std::shared_ptr<SstFileManager> sst_file_manager(
  581. NewSstFileManager(mock_env_.get()));
  582. sst_file_manager->SetDeleteRateBytesPerSecond(1);
  583. SstFileManagerImpl *sfm =
  584. static_cast<SstFileManagerImpl *>(sst_file_manager.get());
  585. BlobDBOptions bdb_options;
  586. bdb_options.min_blob_size = 0;
  587. Options db_options;
  588. SyncPoint::GetInstance()->EnableProcessing();
  589. db_options.sst_file_manager = sst_file_manager;
  590. Open(bdb_options, db_options);
  591. std::string blob_dir = blob_db_impl()->TEST_blob_dir();
  592. blob_db_->Put(WriteOptions(), "foo", "bar");
  593. Close();
  594. // Create 3 dummy trash files under the blob_dir
  595. LegacyFileSystemWrapper fs(db_options.env);
  596. CreateFile(&fs, blob_dir + "/000666.blob.trash", "", false);
  597. CreateFile(&fs, blob_dir + "/000888.blob.trash", "", true);
  598. CreateFile(&fs, blob_dir + "/something_not_match.trash", "", false);
  599. // Make sure that reopening the DB rescan the existing trash files
  600. Open(bdb_options, db_options);
  601. ASSERT_EQ(files_scheduled_to_delete, 2);
  602. sfm->WaitForEmptyTrash();
  603. // There should be exact one file under the blob dir now.
  604. std::vector<std::string> all_files;
  605. ASSERT_OK(db_options.env->GetChildren(blob_dir, &all_files));
  606. int nfiles = 0;
  607. for (const auto &f : all_files) {
  608. assert(!f.empty());
  609. if (f[0] == '.') {
  610. continue;
  611. }
  612. nfiles++;
  613. }
  614. ASSERT_EQ(nfiles, 1);
  615. SyncPoint::GetInstance()->DisableProcessing();
  616. }
  617. TEST_F(BlobDBTest, SnapshotAndGarbageCollection) {
  618. BlobDBOptions bdb_options;
  619. bdb_options.min_blob_size = 0;
  620. bdb_options.enable_garbage_collection = true;
  621. bdb_options.garbage_collection_cutoff = 1.0;
  622. bdb_options.disable_background_tasks = true;
  623. // i = when to take snapshot
  624. for (int i = 0; i < 4; i++) {
  625. Destroy();
  626. Open(bdb_options);
  627. const Snapshot *snapshot = nullptr;
  628. // First file
  629. ASSERT_OK(Put("key1", "value"));
  630. if (i == 0) {
  631. snapshot = blob_db_->GetSnapshot();
  632. }
  633. auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
  634. ASSERT_EQ(1, blob_files.size());
  635. ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[0]));
  636. // Second file
  637. ASSERT_OK(Put("key2", "value"));
  638. if (i == 1) {
  639. snapshot = blob_db_->GetSnapshot();
  640. }
  641. blob_files = blob_db_impl()->TEST_GetBlobFiles();
  642. ASSERT_EQ(2, blob_files.size());
  643. auto bfile = blob_files[1];
  644. ASSERT_FALSE(bfile->Immutable());
  645. ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(bfile));
  646. // Third file
  647. ASSERT_OK(Put("key3", "value"));
  648. if (i == 2) {
  649. snapshot = blob_db_->GetSnapshot();
  650. }
  651. ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  652. ASSERT_TRUE(bfile->Obsolete());
  653. ASSERT_EQ(blob_db_->GetLatestSequenceNumber(),
  654. bfile->GetObsoleteSequence());
  655. Delete("key2");
  656. if (i == 3) {
  657. snapshot = blob_db_->GetSnapshot();
  658. }
  659. ASSERT_EQ(4, blob_db_impl()->TEST_GetBlobFiles().size());
  660. blob_db_impl()->TEST_DeleteObsoleteFiles();
  661. if (i >= 2) {
  662. // The snapshot shouldn't see data in bfile
  663. ASSERT_EQ(2, blob_db_impl()->TEST_GetBlobFiles().size());
  664. blob_db_->ReleaseSnapshot(snapshot);
  665. } else {
  666. // The snapshot will see data in bfile, so the file shouldn't be deleted
  667. ASSERT_EQ(4, blob_db_impl()->TEST_GetBlobFiles().size());
  668. blob_db_->ReleaseSnapshot(snapshot);
  669. blob_db_impl()->TEST_DeleteObsoleteFiles();
  670. ASSERT_EQ(2, blob_db_impl()->TEST_GetBlobFiles().size());
  671. }
  672. }
  673. }
  674. TEST_F(BlobDBTest, ColumnFamilyNotSupported) {
  675. Options options;
  676. options.env = mock_env_.get();
  677. mock_env_->set_current_time(0);
  678. Open(BlobDBOptions(), options);
  679. ColumnFamilyHandle *default_handle = blob_db_->DefaultColumnFamily();
  680. ColumnFamilyHandle *handle = nullptr;
  681. std::string value;
  682. std::vector<std::string> values;
  683. // The call simply pass through to base db. It should succeed.
  684. ASSERT_OK(
  685. blob_db_->CreateColumnFamily(ColumnFamilyOptions(), "foo", &handle));
  686. ASSERT_TRUE(blob_db_->Put(WriteOptions(), handle, "k", "v").IsNotSupported());
  687. ASSERT_TRUE(blob_db_->PutWithTTL(WriteOptions(), handle, "k", "v", 60)
  688. .IsNotSupported());
  689. ASSERT_TRUE(blob_db_->PutUntil(WriteOptions(), handle, "k", "v", 100)
  690. .IsNotSupported());
  691. WriteBatch batch;
  692. batch.Put("k1", "v1");
  693. batch.Put(handle, "k2", "v2");
  694. ASSERT_TRUE(blob_db_->Write(WriteOptions(), &batch).IsNotSupported());
  695. ASSERT_TRUE(blob_db_->Get(ReadOptions(), "k1", &value).IsNotFound());
  696. ASSERT_TRUE(
  697. blob_db_->Get(ReadOptions(), handle, "k", &value).IsNotSupported());
  698. auto statuses = blob_db_->MultiGet(ReadOptions(), {default_handle, handle},
  699. {"k1", "k2"}, &values);
  700. ASSERT_EQ(2, statuses.size());
  701. ASSERT_TRUE(statuses[0].IsNotSupported());
  702. ASSERT_TRUE(statuses[1].IsNotSupported());
  703. ASSERT_EQ(nullptr, blob_db_->NewIterator(ReadOptions(), handle));
  704. delete handle;
  705. }
  706. TEST_F(BlobDBTest, GetLiveFilesMetaData) {
  707. Random rnd(301);
  708. BlobDBOptions bdb_options;
  709. bdb_options.blob_dir = "blob_dir";
  710. bdb_options.path_relative = true;
  711. bdb_options.min_blob_size = 0;
  712. bdb_options.disable_background_tasks = true;
  713. Open(bdb_options);
  714. std::map<std::string, std::string> data;
  715. for (size_t i = 0; i < 100; i++) {
  716. PutRandom("key" + ToString(i), &rnd, &data);
  717. }
  718. std::vector<LiveFileMetaData> metadata;
  719. blob_db_->GetLiveFilesMetaData(&metadata);
  720. ASSERT_EQ(1U, metadata.size());
  721. // Path should be relative to db_name, but begin with slash.
  722. std::string filename = "/blob_dir/000001.blob";
  723. ASSERT_EQ(filename, metadata[0].name);
  724. ASSERT_EQ(1, metadata[0].file_number);
  725. ASSERT_EQ("default", metadata[0].column_family_name);
  726. std::vector<std::string> livefile;
  727. uint64_t mfs;
  728. ASSERT_OK(blob_db_->GetLiveFiles(livefile, &mfs, false));
  729. ASSERT_EQ(4U, livefile.size());
  730. ASSERT_EQ(filename, livefile[3]);
  731. VerifyDB(data);
  732. }
  733. TEST_F(BlobDBTest, MigrateFromPlainRocksDB) {
  734. constexpr size_t kNumKey = 20;
  735. constexpr size_t kNumIteration = 10;
  736. Random rnd(301);
  737. std::map<std::string, std::string> data;
  738. std::vector<bool> is_blob(kNumKey, false);
  739. // Write to plain rocksdb.
  740. Options options;
  741. options.create_if_missing = true;
  742. DB *db = nullptr;
  743. ASSERT_OK(DB::Open(options, dbname_, &db));
  744. for (size_t i = 0; i < kNumIteration; i++) {
  745. auto key_index = rnd.Next() % kNumKey;
  746. std::string key = "key" + ToString(key_index);
  747. PutRandom(db, key, &rnd, &data);
  748. }
  749. VerifyDB(db, data);
  750. delete db;
  751. db = nullptr;
  752. // Open as blob db. Verify it can read existing data.
  753. Open();
  754. VerifyDB(blob_db_, data);
  755. for (size_t i = 0; i < kNumIteration; i++) {
  756. auto key_index = rnd.Next() % kNumKey;
  757. std::string key = "key" + ToString(key_index);
  758. is_blob[key_index] = true;
  759. PutRandom(blob_db_, key, &rnd, &data);
  760. }
  761. VerifyDB(blob_db_, data);
  762. delete blob_db_;
  763. blob_db_ = nullptr;
  764. // Verify plain db return error for keys written by blob db.
  765. ASSERT_OK(DB::Open(options, dbname_, &db));
  766. std::string value;
  767. for (size_t i = 0; i < kNumKey; i++) {
  768. std::string key = "key" + ToString(i);
  769. Status s = db->Get(ReadOptions(), key, &value);
  770. if (data.count(key) == 0) {
  771. ASSERT_TRUE(s.IsNotFound());
  772. } else if (is_blob[i]) {
  773. ASSERT_TRUE(s.IsNotSupported());
  774. } else {
  775. ASSERT_OK(s);
  776. ASSERT_EQ(data[key], value);
  777. }
  778. }
  779. delete db;
  780. }
  781. // Test to verify that a NoSpace IOError Status is returned on reaching
  782. // max_db_size limit.
  783. TEST_F(BlobDBTest, OutOfSpace) {
  784. // Use mock env to stop wall clock.
  785. Options options;
  786. options.env = mock_env_.get();
  787. BlobDBOptions bdb_options;
  788. bdb_options.max_db_size = 200;
  789. bdb_options.is_fifo = false;
  790. bdb_options.disable_background_tasks = true;
  791. Open(bdb_options);
  792. // Each stored blob has an overhead of about 42 bytes currently.
  793. // So a small key + a 100 byte blob should take up ~150 bytes in the db.
  794. std::string value(100, 'v');
  795. ASSERT_OK(blob_db_->PutWithTTL(WriteOptions(), "key1", value, 60));
  796. // Putting another blob should fail as ading it would exceed the max_db_size
  797. // limit.
  798. Status s = blob_db_->PutWithTTL(WriteOptions(), "key2", value, 60);
  799. ASSERT_TRUE(s.IsIOError());
  800. ASSERT_TRUE(s.IsNoSpace());
  801. }
  802. TEST_F(BlobDBTest, FIFOEviction) {
  803. BlobDBOptions bdb_options;
  804. bdb_options.max_db_size = 200;
  805. bdb_options.blob_file_size = 100;
  806. bdb_options.is_fifo = true;
  807. bdb_options.disable_background_tasks = true;
  808. Open(bdb_options);
  809. std::atomic<int> evict_count{0};
  810. SyncPoint::GetInstance()->SetCallBack(
  811. "BlobDBImpl::EvictOldestBlobFile:Evicted",
  812. [&](void *) { evict_count++; });
  813. SyncPoint::GetInstance()->EnableProcessing();
  814. // Each stored blob has an overhead of 32 bytes currently.
  815. // So a 100 byte blob should take up 132 bytes.
  816. std::string value(100, 'v');
  817. ASSERT_OK(blob_db_->PutWithTTL(WriteOptions(), "key1", value, 10));
  818. VerifyDB({{"key1", value}});
  819. ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
  820. // Adding another 100 bytes blob would take the total size to 264 bytes
  821. // (2*132). max_db_size will be exceeded
  822. // than max_db_size and trigger FIFO eviction.
  823. ASSERT_OK(blob_db_->PutWithTTL(WriteOptions(), "key2", value, 60));
  824. ASSERT_EQ(1, evict_count);
  825. // key1 will exist until corresponding file be deleted.
  826. VerifyDB({{"key1", value}, {"key2", value}});
  827. // Adding another 100 bytes blob without TTL.
  828. ASSERT_OK(blob_db_->Put(WriteOptions(), "key3", value));
  829. ASSERT_EQ(2, evict_count);
  830. // key1 and key2 will exist until corresponding file be deleted.
  831. VerifyDB({{"key1", value}, {"key2", value}, {"key3", value}});
  832. // The fourth blob file, without TTL.
  833. ASSERT_OK(blob_db_->Put(WriteOptions(), "key4", value));
  834. ASSERT_EQ(3, evict_count);
  835. VerifyDB(
  836. {{"key1", value}, {"key2", value}, {"key3", value}, {"key4", value}});
  837. auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
  838. ASSERT_EQ(4, blob_files.size());
  839. ASSERT_TRUE(blob_files[0]->Obsolete());
  840. ASSERT_TRUE(blob_files[1]->Obsolete());
  841. ASSERT_TRUE(blob_files[2]->Obsolete());
  842. ASSERT_FALSE(blob_files[3]->Obsolete());
  843. auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
  844. ASSERT_EQ(3, obsolete_files.size());
  845. ASSERT_EQ(blob_files[0], obsolete_files[0]);
  846. ASSERT_EQ(blob_files[1], obsolete_files[1]);
  847. ASSERT_EQ(blob_files[2], obsolete_files[2]);
  848. blob_db_impl()->TEST_DeleteObsoleteFiles();
  849. obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
  850. ASSERT_TRUE(obsolete_files.empty());
  851. VerifyDB({{"key4", value}});
  852. }
  853. TEST_F(BlobDBTest, FIFOEviction_NoOldestFileToEvict) {
  854. Options options;
  855. BlobDBOptions bdb_options;
  856. bdb_options.max_db_size = 1000;
  857. bdb_options.blob_file_size = 5000;
  858. bdb_options.is_fifo = true;
  859. bdb_options.disable_background_tasks = true;
  860. Open(bdb_options);
  861. std::atomic<int> evict_count{0};
  862. SyncPoint::GetInstance()->SetCallBack(
  863. "BlobDBImpl::EvictOldestBlobFile:Evicted",
  864. [&](void *) { evict_count++; });
  865. SyncPoint::GetInstance()->EnableProcessing();
  866. std::string value(2000, 'v');
  867. ASSERT_TRUE(Put("foo", std::string(2000, 'v')).IsNoSpace());
  868. ASSERT_EQ(0, evict_count);
  869. }
  870. TEST_F(BlobDBTest, FIFOEviction_NoEnoughBlobFilesToEvict) {
  871. BlobDBOptions bdb_options;
  872. bdb_options.is_fifo = true;
  873. bdb_options.min_blob_size = 100;
  874. bdb_options.disable_background_tasks = true;
  875. Options options;
  876. // Use mock env to stop wall clock.
  877. options.env = mock_env_.get();
  878. options.disable_auto_compactions = true;
  879. auto statistics = CreateDBStatistics();
  880. options.statistics = statistics;
  881. Open(bdb_options, options);
  882. ASSERT_EQ(0, blob_db_impl()->TEST_live_sst_size());
  883. std::string small_value(50, 'v');
  884. std::map<std::string, std::string> data;
  885. // Insert some data into LSM tree to make sure FIFO eviction take SST
  886. // file size into account.
  887. for (int i = 0; i < 1000; i++) {
  888. ASSERT_OK(Put("key" + ToString(i), small_value, &data));
  889. }
  890. ASSERT_OK(blob_db_->Flush(FlushOptions()));
  891. uint64_t live_sst_size = 0;
  892. ASSERT_TRUE(blob_db_->GetIntProperty(DB::Properties::kTotalSstFilesSize,
  893. &live_sst_size));
  894. ASSERT_TRUE(live_sst_size > 0);
  895. ASSERT_EQ(live_sst_size, blob_db_impl()->TEST_live_sst_size());
  896. bdb_options.max_db_size = live_sst_size + 2000;
  897. Reopen(bdb_options, options);
  898. ASSERT_EQ(live_sst_size, blob_db_impl()->TEST_live_sst_size());
  899. std::string value_1k(1000, 'v');
  900. ASSERT_OK(PutWithTTL("large_key1", value_1k, 60, &data));
  901. ASSERT_EQ(0, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
  902. VerifyDB(data);
  903. // large_key2 evicts large_key1
  904. ASSERT_OK(PutWithTTL("large_key2", value_1k, 60, &data));
  905. ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
  906. blob_db_impl()->TEST_DeleteObsoleteFiles();
  907. data.erase("large_key1");
  908. VerifyDB(data);
  909. // large_key3 get no enough space even after evicting large_key2, so it
  910. // instead return no space error.
  911. std::string value_2k(2000, 'v');
  912. ASSERT_TRUE(PutWithTTL("large_key3", value_2k, 60).IsNoSpace());
  913. ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
  914. // Verify large_key2 still exists.
  915. VerifyDB(data);
  916. }
  917. // Test flush or compaction will trigger FIFO eviction since they update
  918. // total SST file size.
  919. TEST_F(BlobDBTest, FIFOEviction_TriggerOnSSTSizeChange) {
  920. BlobDBOptions bdb_options;
  921. bdb_options.max_db_size = 1000;
  922. bdb_options.is_fifo = true;
  923. bdb_options.min_blob_size = 100;
  924. bdb_options.disable_background_tasks = true;
  925. Options options;
  926. // Use mock env to stop wall clock.
  927. options.env = mock_env_.get();
  928. auto statistics = CreateDBStatistics();
  929. options.statistics = statistics;
  930. options.compression = kNoCompression;
  931. Open(bdb_options, options);
  932. std::string value(800, 'v');
  933. ASSERT_OK(PutWithTTL("large_key", value, 60));
  934. ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
  935. ASSERT_EQ(0, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
  936. VerifyDB({{"large_key", value}});
  937. // Insert some small keys and flush to bring DB out of space.
  938. std::map<std::string, std::string> data;
  939. for (int i = 0; i < 10; i++) {
  940. ASSERT_OK(Put("key" + ToString(i), "v", &data));
  941. }
  942. ASSERT_OK(blob_db_->Flush(FlushOptions()));
  943. // Verify large_key is deleted by FIFO eviction.
  944. blob_db_impl()->TEST_DeleteObsoleteFiles();
  945. ASSERT_EQ(0, blob_db_impl()->TEST_GetBlobFiles().size());
  946. ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
  947. VerifyDB(data);
  948. }
  949. TEST_F(BlobDBTest, InlineSmallValues) {
  950. constexpr uint64_t kMaxExpiration = 1000;
  951. Random rnd(301);
  952. BlobDBOptions bdb_options;
  953. bdb_options.ttl_range_secs = kMaxExpiration;
  954. bdb_options.min_blob_size = 100;
  955. bdb_options.blob_file_size = 256 * 1000 * 1000;
  956. bdb_options.disable_background_tasks = true;
  957. Options options;
  958. options.env = mock_env_.get();
  959. mock_env_->set_current_time(0);
  960. Open(bdb_options, options);
  961. std::map<std::string, std::string> data;
  962. std::map<std::string, KeyVersion> versions;
  963. for (size_t i = 0; i < 1000; i++) {
  964. bool is_small_value = rnd.Next() % 2;
  965. bool has_ttl = rnd.Next() % 2;
  966. uint64_t expiration = rnd.Next() % kMaxExpiration;
  967. int len = is_small_value ? 50 : 200;
  968. std::string key = "key" + ToString(i);
  969. std::string value = test::RandomHumanReadableString(&rnd, len);
  970. std::string blob_index;
  971. data[key] = value;
  972. SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
  973. if (!has_ttl) {
  974. ASSERT_OK(blob_db_->Put(WriteOptions(), key, value));
  975. } else {
  976. ASSERT_OK(blob_db_->PutUntil(WriteOptions(), key, value, expiration));
  977. }
  978. ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
  979. versions[key] =
  980. KeyVersion(key, value, sequence,
  981. (is_small_value && !has_ttl) ? kTypeValue : kTypeBlobIndex);
  982. }
  983. VerifyDB(data);
  984. VerifyBaseDB(versions);
  985. auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
  986. auto blob_files = bdb_impl->TEST_GetBlobFiles();
  987. ASSERT_EQ(2, blob_files.size());
  988. std::shared_ptr<BlobFile> non_ttl_file;
  989. std::shared_ptr<BlobFile> ttl_file;
  990. if (blob_files[0]->HasTTL()) {
  991. ttl_file = blob_files[0];
  992. non_ttl_file = blob_files[1];
  993. } else {
  994. non_ttl_file = blob_files[0];
  995. ttl_file = blob_files[1];
  996. }
  997. ASSERT_FALSE(non_ttl_file->HasTTL());
  998. ASSERT_TRUE(ttl_file->HasTTL());
  999. }
  1000. TEST_F(BlobDBTest, CompactionFilterNotSupported) {
  1001. class TestCompactionFilter : public CompactionFilter {
  1002. const char *Name() const override { return "TestCompactionFilter"; }
  1003. };
  1004. class TestCompactionFilterFactory : public CompactionFilterFactory {
  1005. const char *Name() const override { return "TestCompactionFilterFactory"; }
  1006. std::unique_ptr<CompactionFilter> CreateCompactionFilter(
  1007. const CompactionFilter::Context & /*context*/) override {
  1008. return std::unique_ptr<CompactionFilter>(new TestCompactionFilter());
  1009. }
  1010. };
  1011. for (int i = 0; i < 2; i++) {
  1012. Options options;
  1013. if (i == 0) {
  1014. options.compaction_filter = new TestCompactionFilter();
  1015. } else {
  1016. options.compaction_filter_factory.reset(
  1017. new TestCompactionFilterFactory());
  1018. }
  1019. ASSERT_TRUE(TryOpen(BlobDBOptions(), options).IsNotSupported());
  1020. delete options.compaction_filter;
  1021. }
  1022. }
  1023. // Test comapction filter should remove any expired blob index.
  1024. TEST_F(BlobDBTest, FilterExpiredBlobIndex) {
  1025. constexpr size_t kNumKeys = 100;
  1026. constexpr size_t kNumPuts = 1000;
  1027. constexpr uint64_t kMaxExpiration = 1000;
  1028. constexpr uint64_t kCompactTime = 500;
  1029. constexpr uint64_t kMinBlobSize = 100;
  1030. Random rnd(301);
  1031. mock_env_->set_current_time(0);
  1032. BlobDBOptions bdb_options;
  1033. bdb_options.min_blob_size = kMinBlobSize;
  1034. bdb_options.disable_background_tasks = true;
  1035. Options options;
  1036. options.env = mock_env_.get();
  1037. Open(bdb_options, options);
  1038. std::map<std::string, std::string> data;
  1039. std::map<std::string, std::string> data_after_compact;
  1040. for (size_t i = 0; i < kNumPuts; i++) {
  1041. bool is_small_value = rnd.Next() % 2;
  1042. bool has_ttl = rnd.Next() % 2;
  1043. uint64_t expiration = rnd.Next() % kMaxExpiration;
  1044. int len = is_small_value ? 10 : 200;
  1045. std::string key = "key" + ToString(rnd.Next() % kNumKeys);
  1046. std::string value = test::RandomHumanReadableString(&rnd, len);
  1047. if (!has_ttl) {
  1048. if (is_small_value) {
  1049. std::string blob_entry;
  1050. BlobIndex::EncodeInlinedTTL(&blob_entry, expiration, value);
  1051. // Fake blob index with TTL. See what it will do.
  1052. ASSERT_GT(kMinBlobSize, blob_entry.size());
  1053. value = blob_entry;
  1054. }
  1055. ASSERT_OK(Put(key, value));
  1056. data_after_compact[key] = value;
  1057. } else {
  1058. ASSERT_OK(PutUntil(key, value, expiration));
  1059. if (expiration <= kCompactTime) {
  1060. data_after_compact.erase(key);
  1061. } else {
  1062. data_after_compact[key] = value;
  1063. }
  1064. }
  1065. data[key] = value;
  1066. }
  1067. VerifyDB(data);
  1068. mock_env_->set_current_time(kCompactTime);
  1069. // Take a snapshot before compaction. Make sure expired blob indexes is
  1070. // filtered regardless of snapshot.
  1071. const Snapshot *snapshot = blob_db_->GetSnapshot();
  1072. // Issue manual compaction to trigger compaction filter.
  1073. ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  1074. blob_db_->ReleaseSnapshot(snapshot);
  1075. // Verify expired blob index are filtered.
  1076. std::vector<KeyVersion> versions;
  1077. const size_t kMaxKeys = 10000;
  1078. GetAllKeyVersions(blob_db_, "", "", kMaxKeys, &versions);
  1079. ASSERT_EQ(data_after_compact.size(), versions.size());
  1080. for (auto &version : versions) {
  1081. ASSERT_TRUE(data_after_compact.count(version.user_key) > 0);
  1082. }
  1083. VerifyDB(data_after_compact);
  1084. }
  1085. // Test compaction filter should remove any blob index where corresponding
  1086. // blob file has been removed.
  1087. TEST_F(BlobDBTest, FilterFileNotAvailable) {
  1088. BlobDBOptions bdb_options;
  1089. bdb_options.min_blob_size = 0;
  1090. bdb_options.disable_background_tasks = true;
  1091. Options options;
  1092. options.disable_auto_compactions = true;
  1093. Open(bdb_options, options);
  1094. ASSERT_OK(Put("foo", "v1"));
  1095. auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
  1096. ASSERT_EQ(1, blob_files.size());
  1097. ASSERT_EQ(1, blob_files[0]->BlobFileNumber());
  1098. ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[0]));
  1099. ASSERT_OK(Put("bar", "v2"));
  1100. blob_files = blob_db_impl()->TEST_GetBlobFiles();
  1101. ASSERT_EQ(2, blob_files.size());
  1102. ASSERT_EQ(2, blob_files[1]->BlobFileNumber());
  1103. ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[1]));
  1104. const size_t kMaxKeys = 10000;
  1105. DB *base_db = blob_db_->GetRootDB();
  1106. std::vector<KeyVersion> versions;
  1107. ASSERT_OK(GetAllKeyVersions(base_db, "", "", kMaxKeys, &versions));
  1108. ASSERT_EQ(2, versions.size());
  1109. ASSERT_EQ("bar", versions[0].user_key);
  1110. ASSERT_EQ("foo", versions[1].user_key);
  1111. VerifyDB({{"bar", "v2"}, {"foo", "v1"}});
  1112. ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  1113. ASSERT_OK(GetAllKeyVersions(base_db, "", "", kMaxKeys, &versions));
  1114. ASSERT_EQ(2, versions.size());
  1115. ASSERT_EQ("bar", versions[0].user_key);
  1116. ASSERT_EQ("foo", versions[1].user_key);
  1117. VerifyDB({{"bar", "v2"}, {"foo", "v1"}});
  1118. // Remove the first blob file and compact. foo should be remove from base db.
  1119. blob_db_impl()->TEST_ObsoleteBlobFile(blob_files[0]);
  1120. blob_db_impl()->TEST_DeleteObsoleteFiles();
  1121. ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  1122. ASSERT_OK(GetAllKeyVersions(base_db, "", "", kMaxKeys, &versions));
  1123. ASSERT_EQ(1, versions.size());
  1124. ASSERT_EQ("bar", versions[0].user_key);
  1125. VerifyDB({{"bar", "v2"}});
  1126. // Remove the second blob file and compact. bar should be remove from base db.
  1127. blob_db_impl()->TEST_ObsoleteBlobFile(blob_files[1]);
  1128. blob_db_impl()->TEST_DeleteObsoleteFiles();
  1129. ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  1130. ASSERT_OK(GetAllKeyVersions(base_db, "", "", kMaxKeys, &versions));
  1131. ASSERT_EQ(0, versions.size());
  1132. VerifyDB({});
  1133. }
  1134. // Test compaction filter should filter any inlined TTL keys that would have
  1135. // been dropped by last FIFO eviction if they are store out-of-line.
  1136. TEST_F(BlobDBTest, FilterForFIFOEviction) {
  1137. Random rnd(215);
  1138. BlobDBOptions bdb_options;
  1139. bdb_options.min_blob_size = 100;
  1140. bdb_options.ttl_range_secs = 60;
  1141. bdb_options.max_db_size = 0;
  1142. bdb_options.disable_background_tasks = true;
  1143. Options options;
  1144. // Use mock env to stop wall clock.
  1145. mock_env_->set_current_time(0);
  1146. options.env = mock_env_.get();
  1147. auto statistics = CreateDBStatistics();
  1148. options.statistics = statistics;
  1149. options.disable_auto_compactions = true;
  1150. Open(bdb_options, options);
  1151. std::map<std::string, std::string> data;
  1152. std::map<std::string, std::string> data_after_compact;
  1153. // Insert some small values that will be inlined.
  1154. for (int i = 0; i < 1000; i++) {
  1155. std::string key = "key" + ToString(i);
  1156. std::string value = test::RandomHumanReadableString(&rnd, 50);
  1157. uint64_t ttl = rnd.Next() % 120 + 1;
  1158. ASSERT_OK(PutWithTTL(key, value, ttl, &data));
  1159. if (ttl >= 60) {
  1160. data_after_compact[key] = value;
  1161. }
  1162. }
  1163. uint64_t num_keys_to_evict = data.size() - data_after_compact.size();
  1164. ASSERT_OK(blob_db_->Flush(FlushOptions()));
  1165. uint64_t live_sst_size = blob_db_impl()->TEST_live_sst_size();
  1166. ASSERT_GT(live_sst_size, 0);
  1167. VerifyDB(data);
  1168. bdb_options.max_db_size = live_sst_size + 30000;
  1169. bdb_options.is_fifo = true;
  1170. Reopen(bdb_options, options);
  1171. VerifyDB(data);
  1172. // Put two large values, each on a different blob file.
  1173. std::string large_value(10000, 'v');
  1174. ASSERT_OK(PutWithTTL("large_key1", large_value, 90));
  1175. ASSERT_OK(PutWithTTL("large_key2", large_value, 150));
  1176. ASSERT_EQ(2, blob_db_impl()->TEST_GetBlobFiles().size());
  1177. ASSERT_EQ(0, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
  1178. data["large_key1"] = large_value;
  1179. data["large_key2"] = large_value;
  1180. VerifyDB(data);
  1181. // Put a third large value which will bring the DB out of space.
  1182. // FIFO eviction will evict the file of large_key1.
  1183. ASSERT_OK(PutWithTTL("large_key3", large_value, 150));
  1184. ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
  1185. ASSERT_EQ(2, blob_db_impl()->TEST_GetBlobFiles().size());
  1186. blob_db_impl()->TEST_DeleteObsoleteFiles();
  1187. ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
  1188. data.erase("large_key1");
  1189. data["large_key3"] = large_value;
  1190. VerifyDB(data);
  1191. // Putting some more small values. These values shouldn't be evicted by
  1192. // compaction filter since they are inserted after FIFO eviction.
  1193. ASSERT_OK(PutWithTTL("foo", "v", 30, &data_after_compact));
  1194. ASSERT_OK(PutWithTTL("bar", "v", 30, &data_after_compact));
  1195. // FIFO eviction doesn't trigger again since there enough room for the flush.
  1196. ASSERT_OK(blob_db_->Flush(FlushOptions()));
  1197. ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
  1198. // Manual compact and check if compaction filter evict those keys with
  1199. // expiration < 60.
  1200. ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  1201. // All keys with expiration < 60, plus large_key1 is filtered by
  1202. // compaction filter.
  1203. ASSERT_EQ(num_keys_to_evict + 1,
  1204. statistics->getTickerCount(BLOB_DB_BLOB_INDEX_EVICTED_COUNT));
  1205. ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
  1206. ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
  1207. data_after_compact["large_key2"] = large_value;
  1208. data_after_compact["large_key3"] = large_value;
  1209. VerifyDB(data_after_compact);
  1210. }
  1211. TEST_F(BlobDBTest, GarbageCollection) {
  1212. constexpr size_t kNumPuts = 1 << 10;
  1213. constexpr uint64_t kExpiration = 1000;
  1214. constexpr uint64_t kCompactTime = 500;
  1215. constexpr uint64_t kKeySize = 7; // "key" + 4 digits
  1216. constexpr uint64_t kSmallValueSize = 1 << 6;
  1217. constexpr uint64_t kLargeValueSize = 1 << 8;
  1218. constexpr uint64_t kMinBlobSize = 1 << 7;
  1219. static_assert(kSmallValueSize < kMinBlobSize, "");
  1220. static_assert(kLargeValueSize > kMinBlobSize, "");
  1221. constexpr size_t kBlobsPerFile = 8;
  1222. constexpr size_t kNumBlobFiles = kNumPuts / kBlobsPerFile;
  1223. constexpr uint64_t kBlobFileSize =
  1224. BlobLogHeader::kSize +
  1225. (BlobLogRecord::kHeaderSize + kKeySize + kLargeValueSize) * kBlobsPerFile;
  1226. BlobDBOptions bdb_options;
  1227. bdb_options.min_blob_size = kMinBlobSize;
  1228. bdb_options.blob_file_size = kBlobFileSize;
  1229. bdb_options.enable_garbage_collection = true;
  1230. bdb_options.garbage_collection_cutoff = 0.25;
  1231. bdb_options.disable_background_tasks = true;
  1232. Options options;
  1233. options.env = mock_env_.get();
  1234. options.statistics = CreateDBStatistics();
  1235. Open(bdb_options, options);
  1236. std::map<std::string, std::string> data;
  1237. std::map<std::string, KeyVersion> blob_value_versions;
  1238. std::map<std::string, BlobIndexVersion> blob_index_versions;
  1239. Random rnd(301);
  1240. // Add a bunch of large non-TTL values. These will be written to non-TTL
  1241. // blob files and will be subject to GC.
  1242. for (size_t i = 0; i < kNumPuts; ++i) {
  1243. std::ostringstream oss;
  1244. oss << "key" << std::setw(4) << std::setfill('0') << i;
  1245. const std::string key(oss.str());
  1246. const std::string value(
  1247. test::RandomHumanReadableString(&rnd, kLargeValueSize));
  1248. const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
  1249. ASSERT_OK(Put(key, value));
  1250. ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
  1251. data[key] = value;
  1252. blob_value_versions[key] = KeyVersion(key, value, sequence, kTypeBlobIndex);
  1253. blob_index_versions[key] =
  1254. BlobIndexVersion(key, /* file_number */ (i >> 3) + 1, kNoExpiration,
  1255. sequence, kTypeBlobIndex);
  1256. }
  1257. // Add some small and/or TTL values that will be ignored during GC.
  1258. // First, add a large TTL value will be written to its own TTL blob file.
  1259. {
  1260. const std::string key("key2000");
  1261. const std::string value(
  1262. test::RandomHumanReadableString(&rnd, kLargeValueSize));
  1263. const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
  1264. ASSERT_OK(PutUntil(key, value, kExpiration));
  1265. ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
  1266. data[key] = value;
  1267. blob_value_versions[key] = KeyVersion(key, value, sequence, kTypeBlobIndex);
  1268. blob_index_versions[key] =
  1269. BlobIndexVersion(key, /* file_number */ kNumBlobFiles + 1, kExpiration,
  1270. sequence, kTypeBlobIndex);
  1271. }
  1272. // Now add a small TTL value (which will be inlined).
  1273. {
  1274. const std::string key("key3000");
  1275. const std::string value(
  1276. test::RandomHumanReadableString(&rnd, kSmallValueSize));
  1277. const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
  1278. ASSERT_OK(PutUntil(key, value, kExpiration));
  1279. ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
  1280. data[key] = value;
  1281. blob_value_versions[key] = KeyVersion(key, value, sequence, kTypeBlobIndex);
  1282. blob_index_versions[key] = BlobIndexVersion(
  1283. key, kInvalidBlobFileNumber, kExpiration, sequence, kTypeBlobIndex);
  1284. }
  1285. // Finally, add a small non-TTL value (which will be stored as a regular
  1286. // value).
  1287. {
  1288. const std::string key("key4000");
  1289. const std::string value(
  1290. test::RandomHumanReadableString(&rnd, kSmallValueSize));
  1291. const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
  1292. ASSERT_OK(Put(key, value));
  1293. ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
  1294. data[key] = value;
  1295. blob_value_versions[key] = KeyVersion(key, value, sequence, kTypeValue);
  1296. blob_index_versions[key] = BlobIndexVersion(
  1297. key, kInvalidBlobFileNumber, kNoExpiration, sequence, kTypeValue);
  1298. }
  1299. VerifyDB(data);
  1300. VerifyBaseDB(blob_value_versions);
  1301. VerifyBaseDBBlobIndex(blob_index_versions);
  1302. // At this point, we should have 128 immutable non-TTL files with file numbers
  1303. // 1..128.
  1304. {
  1305. auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
  1306. ASSERT_EQ(live_imm_files.size(), kNumBlobFiles);
  1307. for (size_t i = 0; i < kNumBlobFiles; ++i) {
  1308. ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 1);
  1309. ASSERT_EQ(live_imm_files[i]->GetFileSize(),
  1310. kBlobFileSize + BlobLogFooter::kSize);
  1311. }
  1312. }
  1313. mock_env_->set_current_time(kCompactTime);
  1314. ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  1315. // We expect the data to remain the same and the blobs from the oldest N files
  1316. // to be moved to new files. Sequence numbers get zeroed out during the
  1317. // compaction.
  1318. VerifyDB(data);
  1319. for (auto &pair : blob_value_versions) {
  1320. KeyVersion &version = pair.second;
  1321. version.sequence = 0;
  1322. }
  1323. VerifyBaseDB(blob_value_versions);
  1324. const uint64_t cutoff = static_cast<uint64_t>(
  1325. bdb_options.garbage_collection_cutoff * kNumBlobFiles);
  1326. for (auto &pair : blob_index_versions) {
  1327. BlobIndexVersion &version = pair.second;
  1328. version.sequence = 0;
  1329. if (version.file_number == kInvalidBlobFileNumber) {
  1330. continue;
  1331. }
  1332. if (version.file_number > cutoff) {
  1333. continue;
  1334. }
  1335. version.file_number += kNumBlobFiles + 1;
  1336. }
  1337. VerifyBaseDBBlobIndex(blob_index_versions);
  1338. const Statistics *const statistics = options.statistics.get();
  1339. assert(statistics);
  1340. ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_FILES), cutoff);
  1341. ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_NEW_FILES), cutoff);
  1342. ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_FAILURES), 0);
  1343. ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_KEYS_RELOCATED),
  1344. cutoff * kBlobsPerFile);
  1345. ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_BYTES_RELOCATED),
  1346. cutoff * kBlobsPerFile * kLargeValueSize);
  1347. // At this point, we should have 128 immutable non-TTL files with file numbers
  1348. // 33..128 and 130..161. (129 was taken by the TTL blob file.)
  1349. {
  1350. auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
  1351. ASSERT_EQ(live_imm_files.size(), kNumBlobFiles);
  1352. for (size_t i = 0; i < kNumBlobFiles; ++i) {
  1353. uint64_t expected_file_number = i + cutoff + 1;
  1354. if (expected_file_number > kNumBlobFiles) {
  1355. ++expected_file_number;
  1356. }
  1357. ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), expected_file_number);
  1358. ASSERT_EQ(live_imm_files[i]->GetFileSize(),
  1359. kBlobFileSize + BlobLogFooter::kSize);
  1360. }
  1361. }
  1362. }
  1363. TEST_F(BlobDBTest, GarbageCollectionFailure) {
  1364. BlobDBOptions bdb_options;
  1365. bdb_options.min_blob_size = 0;
  1366. bdb_options.enable_garbage_collection = true;
  1367. bdb_options.garbage_collection_cutoff = 1.0;
  1368. bdb_options.disable_background_tasks = true;
  1369. Options db_options;
  1370. db_options.statistics = CreateDBStatistics();
  1371. Open(bdb_options, db_options);
  1372. // Write a couple of valid blobs.
  1373. Put("foo", "bar");
  1374. Put("dead", "beef");
  1375. // Write a fake blob reference into the base DB that cannot be parsed.
  1376. WriteBatch batch;
  1377. ASSERT_OK(WriteBatchInternal::PutBlobIndex(
  1378. &batch, blob_db_->DefaultColumnFamily()->GetID(), "key",
  1379. "not a valid blob index"));
  1380. ASSERT_OK(blob_db_->GetRootDB()->Write(WriteOptions(), &batch));
  1381. auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
  1382. ASSERT_EQ(blob_files.size(), 1);
  1383. auto blob_file = blob_files[0];
  1384. ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_file));
  1385. ASSERT_TRUE(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)
  1386. .IsCorruption());
  1387. const Statistics *const statistics = db_options.statistics.get();
  1388. assert(statistics);
  1389. ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_FILES), 0);
  1390. ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_NEW_FILES), 1);
  1391. ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_FAILURES), 1);
  1392. ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_KEYS_RELOCATED), 2);
  1393. ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_BYTES_RELOCATED), 7);
  1394. }
  1395. // File should be evicted after expiration.
  1396. TEST_F(BlobDBTest, EvictExpiredFile) {
  1397. BlobDBOptions bdb_options;
  1398. bdb_options.ttl_range_secs = 100;
  1399. bdb_options.min_blob_size = 0;
  1400. bdb_options.disable_background_tasks = true;
  1401. Options options;
  1402. options.env = mock_env_.get();
  1403. Open(bdb_options, options);
  1404. mock_env_->set_current_time(50);
  1405. std::map<std::string, std::string> data;
  1406. ASSERT_OK(PutWithTTL("foo", "bar", 100, &data));
  1407. auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
  1408. ASSERT_EQ(1, blob_files.size());
  1409. auto blob_file = blob_files[0];
  1410. ASSERT_FALSE(blob_file->Immutable());
  1411. ASSERT_FALSE(blob_file->Obsolete());
  1412. VerifyDB(data);
  1413. mock_env_->set_current_time(250);
  1414. // The key should expired now.
  1415. blob_db_impl()->TEST_EvictExpiredFiles();
  1416. ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
  1417. ASSERT_EQ(1, blob_db_impl()->TEST_GetObsoleteFiles().size());
  1418. ASSERT_TRUE(blob_file->Immutable());
  1419. ASSERT_TRUE(blob_file->Obsolete());
  1420. blob_db_impl()->TEST_DeleteObsoleteFiles();
  1421. ASSERT_EQ(0, blob_db_impl()->TEST_GetBlobFiles().size());
  1422. ASSERT_EQ(0, blob_db_impl()->TEST_GetObsoleteFiles().size());
  1423. // Make sure we don't return garbage value after blob file being evicted,
  1424. // but the blob index still exists in the LSM tree.
  1425. std::string val = "";
  1426. ASSERT_TRUE(blob_db_->Get(ReadOptions(), "foo", &val).IsNotFound());
  1427. ASSERT_EQ("", val);
  1428. }
  1429. TEST_F(BlobDBTest, DisableFileDeletions) {
  1430. BlobDBOptions bdb_options;
  1431. bdb_options.disable_background_tasks = true;
  1432. Open(bdb_options);
  1433. std::map<std::string, std::string> data;
  1434. for (bool force : {true, false}) {
  1435. ASSERT_OK(Put("foo", "v", &data));
  1436. auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
  1437. ASSERT_EQ(1, blob_files.size());
  1438. auto blob_file = blob_files[0];
  1439. ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_file));
  1440. blob_db_impl()->TEST_ObsoleteBlobFile(blob_file);
  1441. ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
  1442. ASSERT_EQ(1, blob_db_impl()->TEST_GetObsoleteFiles().size());
  1443. // Call DisableFileDeletions twice.
  1444. ASSERT_OK(blob_db_->DisableFileDeletions());
  1445. ASSERT_OK(blob_db_->DisableFileDeletions());
  1446. // File deletions should be disabled.
  1447. blob_db_impl()->TEST_DeleteObsoleteFiles();
  1448. ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
  1449. ASSERT_EQ(1, blob_db_impl()->TEST_GetObsoleteFiles().size());
  1450. VerifyDB(data);
  1451. // Enable file deletions once. If force=true, file deletion is enabled.
  1452. // Otherwise it needs to enable it for a second time.
  1453. ASSERT_OK(blob_db_->EnableFileDeletions(force));
  1454. blob_db_impl()->TEST_DeleteObsoleteFiles();
  1455. if (!force) {
  1456. ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
  1457. ASSERT_EQ(1, blob_db_impl()->TEST_GetObsoleteFiles().size());
  1458. VerifyDB(data);
  1459. // Call EnableFileDeletions a second time.
  1460. ASSERT_OK(blob_db_->EnableFileDeletions(false));
  1461. blob_db_impl()->TEST_DeleteObsoleteFiles();
  1462. }
  1463. // Regardless of value of `force`, file should be deleted by now.
  1464. ASSERT_EQ(0, blob_db_impl()->TEST_GetBlobFiles().size());
  1465. ASSERT_EQ(0, blob_db_impl()->TEST_GetObsoleteFiles().size());
  1466. VerifyDB({});
  1467. }
  1468. }
  1469. TEST_F(BlobDBTest, MaintainBlobFileToSstMapping) {
  1470. BlobDBOptions bdb_options;
  1471. bdb_options.enable_garbage_collection = true;
  1472. bdb_options.disable_background_tasks = true;
  1473. Open(bdb_options);
  1474. // Register some dummy blob files.
  1475. blob_db_impl()->TEST_AddDummyBlobFile(1, /* immutable_sequence */ 200);
  1476. blob_db_impl()->TEST_AddDummyBlobFile(2, /* immutable_sequence */ 300);
  1477. blob_db_impl()->TEST_AddDummyBlobFile(3, /* immutable_sequence */ 400);
  1478. blob_db_impl()->TEST_AddDummyBlobFile(4, /* immutable_sequence */ 500);
  1479. blob_db_impl()->TEST_AddDummyBlobFile(5, /* immutable_sequence */ 600);
  1480. // Initialize the blob <-> SST file mapping. First, add some SST files with
  1481. // blob file references, then some without.
  1482. std::vector<LiveFileMetaData> live_files;
  1483. for (uint64_t i = 1; i <= 10; ++i) {
  1484. LiveFileMetaData live_file;
  1485. live_file.file_number = i;
  1486. live_file.oldest_blob_file_number = ((i - 1) % 5) + 1;
  1487. live_files.emplace_back(live_file);
  1488. }
  1489. for (uint64_t i = 11; i <= 20; ++i) {
  1490. LiveFileMetaData live_file;
  1491. live_file.file_number = i;
  1492. live_files.emplace_back(live_file);
  1493. }
  1494. blob_db_impl()->TEST_InitializeBlobFileToSstMapping(live_files);
  1495. // Check that the blob <-> SST mappings have been correctly initialized.
  1496. auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
  1497. ASSERT_EQ(blob_files.size(), 5);
  1498. {
  1499. auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
  1500. ASSERT_EQ(live_imm_files.size(), 5);
  1501. for (size_t i = 0; i < 5; ++i) {
  1502. ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 1);
  1503. }
  1504. ASSERT_TRUE(blob_db_impl()->TEST_GetObsoleteFiles().empty());
  1505. }
  1506. {
  1507. const std::vector<std::unordered_set<uint64_t>> expected_sst_files{
  1508. {1, 6}, {2, 7}, {3, 8}, {4, 9}, {5, 10}};
  1509. const std::vector<bool> expected_obsolete{false, false, false, false,
  1510. false};
  1511. for (size_t i = 0; i < 5; ++i) {
  1512. const auto &blob_file = blob_files[i];
  1513. ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
  1514. ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);
  1515. }
  1516. auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
  1517. ASSERT_EQ(live_imm_files.size(), 5);
  1518. for (size_t i = 0; i < 5; ++i) {
  1519. ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 1);
  1520. }
  1521. ASSERT_TRUE(blob_db_impl()->TEST_GetObsoleteFiles().empty());
  1522. }
  1523. // Simulate a flush where the SST does not reference any blob files.
  1524. {
  1525. FlushJobInfo info{};
  1526. info.file_number = 21;
  1527. info.smallest_seqno = 1;
  1528. info.largest_seqno = 100;
  1529. blob_db_impl()->TEST_ProcessFlushJobInfo(info);
  1530. const std::vector<std::unordered_set<uint64_t>> expected_sst_files{
  1531. {1, 6}, {2, 7}, {3, 8}, {4, 9}, {5, 10}};
  1532. const std::vector<bool> expected_obsolete{false, false, false, false,
  1533. false};
  1534. for (size_t i = 0; i < 5; ++i) {
  1535. const auto &blob_file = blob_files[i];
  1536. ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
  1537. ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);
  1538. }
  1539. auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
  1540. ASSERT_EQ(live_imm_files.size(), 5);
  1541. for (size_t i = 0; i < 5; ++i) {
  1542. ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 1);
  1543. }
  1544. ASSERT_TRUE(blob_db_impl()->TEST_GetObsoleteFiles().empty());
  1545. }
  1546. // Simulate a flush where the SST references a blob file.
  1547. {
  1548. FlushJobInfo info{};
  1549. info.file_number = 22;
  1550. info.oldest_blob_file_number = 5;
  1551. info.smallest_seqno = 101;
  1552. info.largest_seqno = 200;
  1553. blob_db_impl()->TEST_ProcessFlushJobInfo(info);
  1554. const std::vector<std::unordered_set<uint64_t>> expected_sst_files{
  1555. {1, 6}, {2, 7}, {3, 8}, {4, 9}, {5, 10, 22}};
  1556. const std::vector<bool> expected_obsolete{false, false, false, false,
  1557. false};
  1558. for (size_t i = 0; i < 5; ++i) {
  1559. const auto &blob_file = blob_files[i];
  1560. ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
  1561. ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);
  1562. }
  1563. auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
  1564. ASSERT_EQ(live_imm_files.size(), 5);
  1565. for (size_t i = 0; i < 5; ++i) {
  1566. ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 1);
  1567. }
  1568. ASSERT_TRUE(blob_db_impl()->TEST_GetObsoleteFiles().empty());
  1569. }
  1570. // Simulate a compaction. Some inputs and outputs have blob file references,
  1571. // some don't. There is also a trivial move (which means the SST appears on
  1572. // both the input and the output list). Blob file 1 loses all its linked SSTs,
  1573. // and since it got marked immutable at sequence number 200 which has already
  1574. // been flushed, it can be marked obsolete.
  1575. {
  1576. CompactionJobInfo info{};
  1577. info.input_file_infos.emplace_back(CompactionFileInfo{1, 1, 1});
  1578. info.input_file_infos.emplace_back(CompactionFileInfo{1, 2, 2});
  1579. info.input_file_infos.emplace_back(CompactionFileInfo{1, 6, 1});
  1580. info.input_file_infos.emplace_back(
  1581. CompactionFileInfo{1, 11, kInvalidBlobFileNumber});
  1582. info.input_file_infos.emplace_back(CompactionFileInfo{1, 22, 5});
  1583. info.output_file_infos.emplace_back(CompactionFileInfo{2, 22, 5});
  1584. info.output_file_infos.emplace_back(CompactionFileInfo{2, 23, 3});
  1585. info.output_file_infos.emplace_back(
  1586. CompactionFileInfo{2, 24, kInvalidBlobFileNumber});
  1587. blob_db_impl()->TEST_ProcessCompactionJobInfo(info);
  1588. const std::vector<std::unordered_set<uint64_t>> expected_sst_files{
  1589. {}, {7}, {3, 8, 23}, {4, 9}, {5, 10, 22}};
  1590. const std::vector<bool> expected_obsolete{true, false, false, false, false};
  1591. for (size_t i = 0; i < 5; ++i) {
  1592. const auto &blob_file = blob_files[i];
  1593. ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
  1594. ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);
  1595. }
  1596. auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
  1597. ASSERT_EQ(live_imm_files.size(), 4);
  1598. for (size_t i = 0; i < 4; ++i) {
  1599. ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 2);
  1600. }
  1601. auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
  1602. ASSERT_EQ(obsolete_files.size(), 1);
  1603. ASSERT_EQ(obsolete_files[0]->BlobFileNumber(), 1);
  1604. }
  1605. // Simulate a failed compaction. No mappings should be updated.
  1606. {
  1607. CompactionJobInfo info{};
  1608. info.input_file_infos.emplace_back(CompactionFileInfo{1, 7, 2});
  1609. info.input_file_infos.emplace_back(CompactionFileInfo{2, 22, 5});
  1610. info.output_file_infos.emplace_back(CompactionFileInfo{2, 25, 3});
  1611. info.status = Status::Corruption();
  1612. blob_db_impl()->TEST_ProcessCompactionJobInfo(info);
  1613. const std::vector<std::unordered_set<uint64_t>> expected_sst_files{
  1614. {}, {7}, {3, 8, 23}, {4, 9}, {5, 10, 22}};
  1615. const std::vector<bool> expected_obsolete{true, false, false, false, false};
  1616. for (size_t i = 0; i < 5; ++i) {
  1617. const auto &blob_file = blob_files[i];
  1618. ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
  1619. ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);
  1620. }
  1621. auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
  1622. ASSERT_EQ(live_imm_files.size(), 4);
  1623. for (size_t i = 0; i < 4; ++i) {
  1624. ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 2);
  1625. }
  1626. auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
  1627. ASSERT_EQ(obsolete_files.size(), 1);
  1628. ASSERT_EQ(obsolete_files[0]->BlobFileNumber(), 1);
  1629. }
  1630. // Simulate another compaction. Blob file 2 loses all its linked SSTs
  1631. // but since it got marked immutable at sequence number 300 which hasn't
  1632. // been flushed yet, it cannot be marked obsolete at this point.
  1633. {
  1634. CompactionJobInfo info{};
  1635. info.input_file_infos.emplace_back(CompactionFileInfo{1, 7, 2});
  1636. info.input_file_infos.emplace_back(CompactionFileInfo{2, 22, 5});
  1637. info.output_file_infos.emplace_back(CompactionFileInfo{2, 25, 3});
  1638. blob_db_impl()->TEST_ProcessCompactionJobInfo(info);
  1639. const std::vector<std::unordered_set<uint64_t>> expected_sst_files{
  1640. {}, {}, {3, 8, 23, 25}, {4, 9}, {5, 10}};
  1641. const std::vector<bool> expected_obsolete{true, false, false, false, false};
  1642. for (size_t i = 0; i < 5; ++i) {
  1643. const auto &blob_file = blob_files[i];
  1644. ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
  1645. ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);
  1646. }
  1647. auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
  1648. ASSERT_EQ(live_imm_files.size(), 4);
  1649. for (size_t i = 0; i < 4; ++i) {
  1650. ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 2);
  1651. }
  1652. auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
  1653. ASSERT_EQ(obsolete_files.size(), 1);
  1654. ASSERT_EQ(obsolete_files[0]->BlobFileNumber(), 1);
  1655. }
  1656. // Simulate a flush with largest sequence number 300. This will make it
  1657. // possible to mark blob file 2 obsolete.
  1658. {
  1659. FlushJobInfo info{};
  1660. info.file_number = 26;
  1661. info.smallest_seqno = 201;
  1662. info.largest_seqno = 300;
  1663. blob_db_impl()->TEST_ProcessFlushJobInfo(info);
  1664. const std::vector<std::unordered_set<uint64_t>> expected_sst_files{
  1665. {}, {}, {3, 8, 23, 25}, {4, 9}, {5, 10}};
  1666. const std::vector<bool> expected_obsolete{true, true, false, false, false};
  1667. for (size_t i = 0; i < 5; ++i) {
  1668. const auto &blob_file = blob_files[i];
  1669. ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
  1670. ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);
  1671. }
  1672. auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
  1673. ASSERT_EQ(live_imm_files.size(), 3);
  1674. for (size_t i = 0; i < 3; ++i) {
  1675. ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 3);
  1676. }
  1677. auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
  1678. ASSERT_EQ(obsolete_files.size(), 2);
  1679. ASSERT_EQ(obsolete_files[0]->BlobFileNumber(), 1);
  1680. ASSERT_EQ(obsolete_files[1]->BlobFileNumber(), 2);
  1681. }
  1682. }
  1683. TEST_F(BlobDBTest, ShutdownWait) {
  1684. BlobDBOptions bdb_options;
  1685. bdb_options.ttl_range_secs = 100;
  1686. bdb_options.min_blob_size = 0;
  1687. bdb_options.disable_background_tasks = false;
  1688. Options options;
  1689. options.env = mock_env_.get();
  1690. SyncPoint::GetInstance()->LoadDependency({
  1691. {"BlobDBImpl::EvictExpiredFiles:0", "BlobDBTest.ShutdownWait:0"},
  1692. {"BlobDBTest.ShutdownWait:1", "BlobDBImpl::EvictExpiredFiles:1"},
  1693. {"BlobDBImpl::EvictExpiredFiles:2", "BlobDBTest.ShutdownWait:2"},
  1694. {"BlobDBTest.ShutdownWait:3", "BlobDBImpl::EvictExpiredFiles:3"},
  1695. });
  1696. // Force all tasks to be scheduled immediately.
  1697. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  1698. "TimeQueue::Add:item.end", [&](void *arg) {
  1699. std::chrono::steady_clock::time_point *tp =
  1700. static_cast<std::chrono::steady_clock::time_point *>(arg);
  1701. *tp =
  1702. std::chrono::steady_clock::now() - std::chrono::milliseconds(10000);
  1703. });
  1704. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  1705. "BlobDBImpl::EvictExpiredFiles:cb", [&](void * /*arg*/) {
  1706. // Sleep 3 ms to increase the chance of data race.
  1707. // We've synced up the code so that EvictExpiredFiles()
  1708. // is called concurrently with ~BlobDBImpl().
  1709. // ~BlobDBImpl() is supposed to wait for all background
  1710. // task to shutdown before doing anything else. In order
  1711. // to use the same test to reproduce a bug of the waiting
  1712. // logic, we wait a little bit here, so that TSAN can
  1713. // catch the data race.
  1714. // We should improve the test if we find a better way.
  1715. Env::Default()->SleepForMicroseconds(3000);
  1716. });
  1717. SyncPoint::GetInstance()->EnableProcessing();
  1718. Open(bdb_options, options);
  1719. mock_env_->set_current_time(50);
  1720. std::map<std::string, std::string> data;
  1721. ASSERT_OK(PutWithTTL("foo", "bar", 100, &data));
  1722. auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
  1723. ASSERT_EQ(1, blob_files.size());
  1724. auto blob_file = blob_files[0];
  1725. ASSERT_FALSE(blob_file->Immutable());
  1726. ASSERT_FALSE(blob_file->Obsolete());
  1727. VerifyDB(data);
  1728. TEST_SYNC_POINT("BlobDBTest.ShutdownWait:0");
  1729. mock_env_->set_current_time(250);
  1730. // The key should expired now.
  1731. TEST_SYNC_POINT("BlobDBTest.ShutdownWait:1");
  1732. TEST_SYNC_POINT("BlobDBTest.ShutdownWait:2");
  1733. TEST_SYNC_POINT("BlobDBTest.ShutdownWait:3");
  1734. Close();
  1735. SyncPoint::GetInstance()->DisableProcessing();
  1736. }
  1737. } // namespace blob_db
  1738. } // namespace ROCKSDB_NAMESPACE
  1739. // A black-box test for the ttl wrapper around rocksdb
  1740. int main(int argc, char** argv) {
  1741. ::testing::InitGoogleTest(&argc, argv);
  1742. return RUN_ALL_TESTS();
  1743. }
  1744. #else
  1745. #include <stdio.h>
  // Stub entry point for ROCKSDB_LITE builds: reports on stderr that the
  // BlobDB tests were skipped and exits successfully.
  int main(int /*argc*/, char** /*argv*/) {
    fputs("SKIPPED as BlobDB is not supported in ROCKSDB_LITE\n", stderr);
    return 0;
  }
  1750. #endif // !ROCKSDB_LITE