blob_db_test.cc 85 KB


  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include "utilities/blob_db/blob_db.h"
  6. #include <algorithm>
  7. #include <chrono>
  8. #include <cstdlib>
  9. #include <iomanip>
  10. #include <map>
  11. #include <memory>
  12. #include <sstream>
  13. #include <string>
  14. #include <vector>
  15. #include "db/blob/blob_index.h"
  16. #include "db/db_test_util.h"
  17. #include "env/composite_env_wrapper.h"
  18. #include "file/file_util.h"
  19. #include "file/sst_file_manager_impl.h"
  20. #include "port/port.h"
  21. #include "rocksdb/utilities/debug.h"
  22. #include "test_util/mock_time_env.h"
  23. #include "test_util/sync_point.h"
  24. #include "test_util/testharness.h"
  25. #include "util/random.h"
  26. #include "util/string_util.h"
  27. #include "utilities/blob_db/blob_db_impl.h"
  28. #include "utilities/fault_injection_env.h"
  29. namespace ROCKSDB_NAMESPACE::blob_db {
  30. class BlobDBTest : public testing::Test {
  31. public:
  32. const int kMaxBlobSize = 1 << 14;
  33. struct BlobIndexVersion {
  34. BlobIndexVersion() = default;
  35. BlobIndexVersion(std::string _user_key, uint64_t _file_number,
  36. uint64_t _expiration, SequenceNumber _sequence,
  37. ValueType _type)
  38. : user_key(std::move(_user_key)),
  39. file_number(_file_number),
  40. expiration(_expiration),
  41. sequence(_sequence),
  42. type(_type) {}
  43. std::string user_key;
  44. uint64_t file_number = kInvalidBlobFileNumber;
  45. uint64_t expiration = kNoExpiration;
  46. SequenceNumber sequence = 0;
  47. ValueType type = kTypeValue;
  48. };
  49. BlobDBTest()
  50. : dbname_(test::PerThreadDBPath("blob_db_test")), blob_db_(nullptr) {
  51. mock_clock_ = std::make_shared<MockSystemClock>(SystemClock::Default());
  52. mock_env_.reset(new CompositeEnvWrapper(Env::Default(), mock_clock_));
  53. fault_injection_env_.reset(new FaultInjectionTestEnv(Env::Default()));
  54. Status s = DestroyBlobDB(dbname_, Options(), BlobDBOptions());
  55. assert(s.ok());
  56. }
  57. ~BlobDBTest() override {
  58. SyncPoint::GetInstance()->ClearAllCallBacks();
  59. Destroy();
  60. }
  61. Status TryOpen(BlobDBOptions bdb_options = BlobDBOptions(),
  62. Options options = Options()) {
  63. options.create_if_missing = true;
  64. if (options.env == mock_env_.get()) {
  65. // Need to disable stats dumping and persisting which also use
  66. // RepeatableThread, which uses InstrumentedCondVar::TimedWaitInternal.
  67. // With mocked time, this can hang on some platforms (MacOS)
  68. // because (a) on some platforms, pthread_cond_timedwait does not appear
  69. // to release the lock for other threads to operate if the deadline time
  70. // is already passed, and (b) TimedWait calls are currently a bad
  71. // abstraction because the deadline parameter is usually computed from
  72. // Env time, but is interpreted in real clock time.
  73. options.stats_dump_period_sec = 0;
  74. options.stats_persist_period_sec = 0;
  75. }
  76. return BlobDB::Open(options, bdb_options, dbname_, &blob_db_);
  77. }
  78. void Open(BlobDBOptions bdb_options = BlobDBOptions(),
  79. Options options = Options()) {
  80. ASSERT_OK(TryOpen(bdb_options, options));
  81. }
  82. void Reopen(BlobDBOptions bdb_options = BlobDBOptions(),
  83. Options options = Options()) {
  84. assert(blob_db_ != nullptr);
  85. delete blob_db_;
  86. blob_db_ = nullptr;
  87. Open(bdb_options, options);
  88. }
  89. void Close() {
  90. assert(blob_db_ != nullptr);
  91. delete blob_db_;
  92. blob_db_ = nullptr;
  93. }
  94. void Destroy() {
  95. if (blob_db_) {
  96. Options options = blob_db_->GetOptions();
  97. BlobDBOptions bdb_options = blob_db_->GetBlobDBOptions();
  98. delete blob_db_;
  99. blob_db_ = nullptr;
  100. ASSERT_OK(DestroyBlobDB(dbname_, options, bdb_options));
  101. }
  102. }
  103. BlobDBImpl *blob_db_impl() { return static_cast<BlobDBImpl *>(blob_db_); }
  104. Status Put(const Slice &key, const Slice &value,
  105. std::map<std::string, std::string> *data = nullptr) {
  106. Status s = blob_db_->Put(WriteOptions(), key, value);
  107. if (data != nullptr) {
  108. (*data)[key.ToString()] = value.ToString();
  109. }
  110. return s;
  111. }
  112. void Delete(const std::string &key,
  113. std::map<std::string, std::string> *data = nullptr) {
  114. ASSERT_OK(blob_db_->Delete(WriteOptions(), key));
  115. if (data != nullptr) {
  116. data->erase(key);
  117. }
  118. }
  119. Status PutWithTTL(const Slice &key, const Slice &value, uint64_t ttl,
  120. std::map<std::string, std::string> *data = nullptr) {
  121. Status s = blob_db_->PutWithTTL(WriteOptions(), key, value, ttl);
  122. if (data != nullptr) {
  123. (*data)[key.ToString()] = value.ToString();
  124. }
  125. return s;
  126. }
  127. Status PutUntil(const Slice &key, const Slice &value, uint64_t expiration) {
  128. return blob_db_->PutUntil(WriteOptions(), key, value, expiration);
  129. }
  130. void PutRandomWithTTL(const std::string &key, uint64_t ttl, Random *rnd,
  131. std::map<std::string, std::string> *data = nullptr) {
  132. int len = rnd->Next() % kMaxBlobSize + 1;
  133. std::string value = rnd->HumanReadableString(len);
  134. ASSERT_OK(
  135. blob_db_->PutWithTTL(WriteOptions(), Slice(key), Slice(value), ttl));
  136. if (data != nullptr) {
  137. (*data)[key] = value;
  138. }
  139. }
  140. void PutRandomUntil(const std::string &key, uint64_t expiration, Random *rnd,
  141. std::map<std::string, std::string> *data = nullptr) {
  142. int len = rnd->Next() % kMaxBlobSize + 1;
  143. std::string value = rnd->HumanReadableString(len);
  144. ASSERT_OK(blob_db_->PutUntil(WriteOptions(), Slice(key), Slice(value),
  145. expiration));
  146. if (data != nullptr) {
  147. (*data)[key] = value;
  148. }
  149. }
  150. void PutRandom(const std::string &key, Random *rnd,
  151. std::map<std::string, std::string> *data = nullptr) {
  152. PutRandom(blob_db_, key, rnd, data);
  153. }
  154. void PutRandom(DB *db, const std::string &key, Random *rnd,
  155. std::map<std::string, std::string> *data = nullptr) {
  156. int len = rnd->Next() % kMaxBlobSize + 1;
  157. std::string value = rnd->HumanReadableString(len);
  158. ASSERT_OK(db->Put(WriteOptions(), Slice(key), Slice(value)));
  159. if (data != nullptr) {
  160. (*data)[key] = value;
  161. }
  162. }
  163. void PutRandomToWriteBatch(
  164. const std::string &key, Random *rnd, WriteBatch *batch,
  165. std::map<std::string, std::string> *data = nullptr) {
  166. int len = rnd->Next() % kMaxBlobSize + 1;
  167. std::string value = rnd->HumanReadableString(len);
  168. ASSERT_OK(batch->Put(key, value));
  169. if (data != nullptr) {
  170. (*data)[key] = value;
  171. }
  172. }
  173. // Verify blob db contain expected data and nothing more.
  174. void VerifyDB(const std::map<std::string, std::string> &data) {
  175. VerifyDB(blob_db_, data);
  176. }
  177. void VerifyDB(DB *db, const std::map<std::string, std::string> &data) {
  178. // Verify normal Get
  179. auto *cfh = db->DefaultColumnFamily();
  180. for (auto &p : data) {
  181. PinnableSlice value_slice;
  182. ASSERT_OK(db->Get(ReadOptions(), cfh, p.first, &value_slice));
  183. ASSERT_EQ(p.second, value_slice.ToString());
  184. std::string value;
  185. ASSERT_OK(db->Get(ReadOptions(), cfh, p.first, &value));
  186. ASSERT_EQ(p.second, value);
  187. }
  188. // Verify iterators
  189. Iterator *iter = db->NewIterator(ReadOptions());
  190. iter->SeekToFirst();
  191. for (auto &p : data) {
  192. ASSERT_TRUE(iter->Valid());
  193. ASSERT_EQ(p.first, iter->key().ToString());
  194. ASSERT_EQ(p.second, iter->value().ToString());
  195. iter->Next();
  196. }
  197. ASSERT_FALSE(iter->Valid());
  198. ASSERT_OK(iter->status());
  199. delete iter;
  200. }
  201. void VerifyBaseDB(
  202. const std::map<std::string, KeyVersion> &expected_versions) {
  203. auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
  204. DB *db = blob_db_->GetRootDB();
  205. const size_t kMaxKeys = 10000;
  206. std::vector<KeyVersion> versions;
  207. ASSERT_OK(GetAllKeyVersions(db, {}, {}, kMaxKeys, &versions));
  208. ASSERT_EQ(expected_versions.size(), versions.size());
  209. size_t i = 0;
  210. for (auto &key_version : expected_versions) {
  211. const KeyVersion &expected_version = key_version.second;
  212. ASSERT_EQ(expected_version.user_key, versions[i].user_key);
  213. ASSERT_EQ(expected_version.sequence, versions[i].sequence);
  214. ASSERT_EQ(expected_version.type, versions[i].type);
  215. if (versions[i].type == kTypeValue) {
  216. ASSERT_EQ(expected_version.value, versions[i].value);
  217. } else {
  218. ASSERT_EQ(kTypeBlobIndex, versions[i].type);
  219. PinnableSlice value;
  220. ASSERT_OK(bdb_impl->TEST_GetBlobValue(versions[i].user_key,
  221. versions[i].value, &value));
  222. ASSERT_EQ(expected_version.value, value.ToString());
  223. }
  224. i++;
  225. }
  226. }
  227. void VerifyBaseDBBlobIndex(
  228. const std::map<std::string, BlobIndexVersion> &expected_versions) {
  229. const size_t kMaxKeys = 10000;
  230. std::vector<KeyVersion> versions;
  231. ASSERT_OK(
  232. GetAllKeyVersions(blob_db_->GetRootDB(), {}, {}, kMaxKeys, &versions));
  233. ASSERT_EQ(versions.size(), expected_versions.size());
  234. size_t i = 0;
  235. for (const auto &expected_pair : expected_versions) {
  236. const BlobIndexVersion &expected_version = expected_pair.second;
  237. ASSERT_EQ(versions[i].user_key, expected_version.user_key);
  238. ASSERT_EQ(versions[i].sequence, expected_version.sequence);
  239. ASSERT_EQ(versions[i].type, expected_version.type);
  240. if (versions[i].type != kTypeBlobIndex) {
  241. ASSERT_EQ(kInvalidBlobFileNumber, expected_version.file_number);
  242. ASSERT_EQ(kNoExpiration, expected_version.expiration);
  243. ++i;
  244. continue;
  245. }
  246. BlobIndex blob_index;
  247. ASSERT_OK(blob_index.DecodeFrom(versions[i].value));
  248. const uint64_t file_number = !blob_index.IsInlined()
  249. ? blob_index.file_number()
  250. : kInvalidBlobFileNumber;
  251. ASSERT_EQ(file_number, expected_version.file_number);
  252. const uint64_t expiration =
  253. blob_index.HasTTL() ? blob_index.expiration() : kNoExpiration;
  254. ASSERT_EQ(expiration, expected_version.expiration);
  255. ++i;
  256. }
  257. }
  258. void InsertBlobs() {
  259. WriteOptions wo;
  260. std::string value;
  261. Random rnd(301);
  262. for (size_t i = 0; i < 100000; i++) {
  263. uint64_t ttl = rnd.Next() % 86400;
  264. PutRandomWithTTL("key" + std::to_string(i % 500), ttl, &rnd, nullptr);
  265. }
  266. for (size_t i = 0; i < 10; i++) {
  267. Delete("key" + std::to_string(i % 500));
  268. }
  269. }
  270. const std::string dbname_;
  271. std::shared_ptr<MockSystemClock> mock_clock_;
  272. std::unique_ptr<Env> mock_env_;
  273. std::unique_ptr<FaultInjectionTestEnv> fault_injection_env_;
  274. BlobDB *blob_db_;
  275. }; // class BlobDBTest
  276. TEST_F(BlobDBTest, Put) {
  277. Random rnd(301);
  278. BlobDBOptions bdb_options;
  279. bdb_options.min_blob_size = 0;
  280. bdb_options.disable_background_tasks = true;
  281. Open(bdb_options);
  282. std::map<std::string, std::string> data;
  283. for (size_t i = 0; i < 100; i++) {
  284. PutRandom("key" + std::to_string(i), &rnd, &data);
  285. }
  286. VerifyDB(data);
  287. }
  288. TEST_F(BlobDBTest, PutWithTTL) {
  289. Random rnd(301);
  290. Options options;
  291. options.env = mock_env_.get();
  292. BlobDBOptions bdb_options;
  293. bdb_options.ttl_range_secs = 1000;
  294. bdb_options.min_blob_size = 0;
  295. bdb_options.blob_file_size = 256 * 1000 * 1000;
  296. bdb_options.disable_background_tasks = true;
  297. Open(bdb_options, options);
  298. std::map<std::string, std::string> data;
  299. mock_clock_->SetCurrentTime(50);
  300. for (size_t i = 0; i < 100; i++) {
  301. uint64_t ttl = rnd.Next() % 100;
  302. PutRandomWithTTL("key" + std::to_string(i), ttl, &rnd,
  303. (ttl <= 50 ? nullptr : &data));
  304. }
  305. mock_clock_->SetCurrentTime(100);
  306. auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
  307. auto blob_files = bdb_impl->TEST_GetBlobFiles();
  308. ASSERT_EQ(1, blob_files.size());
  309. ASSERT_TRUE(blob_files[0]->HasTTL());
  310. ASSERT_OK(bdb_impl->TEST_CloseBlobFile(blob_files[0]));
  311. VerifyDB(data);
  312. }
  313. TEST_F(BlobDBTest, PutUntil) {
  314. Random rnd(301);
  315. Options options;
  316. options.env = mock_env_.get();
  317. BlobDBOptions bdb_options;
  318. bdb_options.ttl_range_secs = 1000;
  319. bdb_options.min_blob_size = 0;
  320. bdb_options.blob_file_size = 256 * 1000 * 1000;
  321. bdb_options.disable_background_tasks = true;
  322. Open(bdb_options, options);
  323. std::map<std::string, std::string> data;
  324. mock_clock_->SetCurrentTime(50);
  325. for (size_t i = 0; i < 100; i++) {
  326. uint64_t expiration = rnd.Next() % 100 + 50;
  327. PutRandomUntil("key" + std::to_string(i), expiration, &rnd,
  328. (expiration <= 100 ? nullptr : &data));
  329. }
  330. mock_clock_->SetCurrentTime(100);
  331. auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
  332. auto blob_files = bdb_impl->TEST_GetBlobFiles();
  333. ASSERT_EQ(1, blob_files.size());
  334. ASSERT_TRUE(blob_files[0]->HasTTL());
  335. ASSERT_OK(bdb_impl->TEST_CloseBlobFile(blob_files[0]));
  336. VerifyDB(data);
  337. }
  338. TEST_F(BlobDBTest, StackableDBGet) {
  339. Random rnd(301);
  340. BlobDBOptions bdb_options;
  341. bdb_options.min_blob_size = 0;
  342. bdb_options.disable_background_tasks = true;
  343. Open(bdb_options);
  344. std::map<std::string, std::string> data;
  345. for (size_t i = 0; i < 100; i++) {
  346. PutRandom("key" + std::to_string(i), &rnd, &data);
  347. }
  348. for (size_t i = 0; i < 100; i++) {
  349. StackableDB *db = blob_db_;
  350. ColumnFamilyHandle *column_family = db->DefaultColumnFamily();
  351. std::string key = "key" + std::to_string(i);
  352. PinnableSlice pinnable_value;
  353. ASSERT_OK(db->Get(ReadOptions(), column_family, key, &pinnable_value));
  354. std::string string_value;
  355. ASSERT_OK(db->Get(ReadOptions(), column_family, key, &string_value));
  356. ASSERT_EQ(string_value, pinnable_value.ToString());
  357. ASSERT_EQ(string_value, data[key]);
  358. }
  359. }
  360. TEST_F(BlobDBTest, GetExpiration) {
  361. Options options;
  362. options.env = mock_env_.get();
  363. BlobDBOptions bdb_options;
  364. bdb_options.disable_background_tasks = true;
  365. mock_clock_->SetCurrentTime(100);
  366. Open(bdb_options, options);
  367. ASSERT_OK(Put("key1", "value1"));
  368. ASSERT_OK(PutWithTTL("key2", "value2", 200));
  369. PinnableSlice value;
  370. uint64_t expiration;
  371. ASSERT_OK(blob_db_->Get(ReadOptions(), "key1", &value, &expiration));
  372. ASSERT_EQ("value1", value.ToString());
  373. ASSERT_EQ(kNoExpiration, expiration);
  374. ASSERT_OK(blob_db_->Get(ReadOptions(), "key2", &value, &expiration));
  375. ASSERT_EQ("value2", value.ToString());
  376. ASSERT_EQ(300 /* = 100 + 200 */, expiration);
  377. }
  378. TEST_F(BlobDBTest, GetIOError) {
  379. Options options;
  380. options.env = fault_injection_env_.get();
  381. BlobDBOptions bdb_options;
  382. bdb_options.min_blob_size = 0; // Make sure value write to blob file
  383. bdb_options.disable_background_tasks = true;
  384. Open(bdb_options, options);
  385. ColumnFamilyHandle *column_family = blob_db_->DefaultColumnFamily();
  386. PinnableSlice value;
  387. ASSERT_OK(Put("foo", "bar"));
  388. fault_injection_env_->SetFilesystemActive(false, Status::IOError());
  389. Status s = blob_db_->Get(ReadOptions(), column_family, "foo", &value);
  390. ASSERT_TRUE(s.IsIOError());
  391. // Reactivate file system to allow test to close DB.
  392. fault_injection_env_->SetFilesystemActive(true);
  393. }
  394. TEST_F(BlobDBTest, PutIOError) {
  395. Options options;
  396. options.env = fault_injection_env_.get();
  397. BlobDBOptions bdb_options;
  398. bdb_options.min_blob_size = 0; // Make sure value write to blob file
  399. bdb_options.disable_background_tasks = true;
  400. Open(bdb_options, options);
  401. fault_injection_env_->SetFilesystemActive(false, Status::IOError());
  402. ASSERT_TRUE(Put("foo", "v1").IsIOError());
  403. fault_injection_env_->SetFilesystemActive(true, Status::IOError());
  404. ASSERT_OK(Put("bar", "v1"));
  405. }
  406. TEST_F(BlobDBTest, WriteBatch) {
  407. Random rnd(301);
  408. BlobDBOptions bdb_options;
  409. bdb_options.min_blob_size = 0;
  410. bdb_options.disable_background_tasks = true;
  411. Open(bdb_options);
  412. std::map<std::string, std::string> data;
  413. for (size_t i = 0; i < 100; i++) {
  414. WriteBatch batch;
  415. for (size_t j = 0; j < 10; j++) {
  416. PutRandomToWriteBatch("key" + std::to_string(j * 100 + i), &rnd, &batch,
  417. &data);
  418. }
  419. ASSERT_OK(blob_db_->Write(WriteOptions(), &batch));
  420. }
  421. VerifyDB(data);
  422. }
  423. TEST_F(BlobDBTest, Delete) {
  424. Random rnd(301);
  425. BlobDBOptions bdb_options;
  426. bdb_options.min_blob_size = 0;
  427. bdb_options.disable_background_tasks = true;
  428. Open(bdb_options);
  429. std::map<std::string, std::string> data;
  430. for (size_t i = 0; i < 100; i++) {
  431. PutRandom("key" + std::to_string(i), &rnd, &data);
  432. }
  433. for (size_t i = 0; i < 100; i += 5) {
  434. Delete("key" + std::to_string(i), &data);
  435. }
  436. VerifyDB(data);
  437. }
  438. TEST_F(BlobDBTest, DeleteBatch) {
  439. Random rnd(301);
  440. BlobDBOptions bdb_options;
  441. bdb_options.min_blob_size = 0;
  442. bdb_options.disable_background_tasks = true;
  443. Open(bdb_options);
  444. for (size_t i = 0; i < 100; i++) {
  445. PutRandom("key" + std::to_string(i), &rnd);
  446. }
  447. WriteBatch batch;
  448. for (size_t i = 0; i < 100; i++) {
  449. ASSERT_OK(batch.Delete("key" + std::to_string(i)));
  450. }
  451. ASSERT_OK(blob_db_->Write(WriteOptions(), &batch));
  452. // DB should be empty.
  453. VerifyDB({});
  454. }
  455. TEST_F(BlobDBTest, Override) {
  456. Random rnd(301);
  457. BlobDBOptions bdb_options;
  458. bdb_options.min_blob_size = 0;
  459. bdb_options.disable_background_tasks = true;
  460. Open(bdb_options);
  461. std::map<std::string, std::string> data;
  462. for (int i = 0; i < 10000; i++) {
  463. PutRandom("key" + std::to_string(i), &rnd, nullptr);
  464. }
  465. // override all the keys
  466. for (int i = 0; i < 10000; i++) {
  467. PutRandom("key" + std::to_string(i), &rnd, &data);
  468. }
  469. VerifyDB(data);
  470. }
  471. #ifdef SNAPPY
  472. TEST_F(BlobDBTest, Compression) {
  473. Random rnd(301);
  474. BlobDBOptions bdb_options;
  475. bdb_options.min_blob_size = 0;
  476. bdb_options.disable_background_tasks = true;
  477. bdb_options.compression = CompressionType::kSnappyCompression;
  478. Open(bdb_options);
  479. std::map<std::string, std::string> data;
  480. for (size_t i = 0; i < 100; i++) {
  481. PutRandom("put-key" + std::to_string(i), &rnd, &data);
  482. }
  483. for (int i = 0; i < 100; i++) {
  484. WriteBatch batch;
  485. for (size_t j = 0; j < 10; j++) {
  486. PutRandomToWriteBatch("write-batch-key" + std::to_string(j * 100 + i),
  487. &rnd, &batch, &data);
  488. }
  489. ASSERT_OK(blob_db_->Write(WriteOptions(), &batch));
  490. }
  491. VerifyDB(data);
  492. }
  493. TEST_F(BlobDBTest, DecompressAfterReopen) {
  494. Random rnd(301);
  495. BlobDBOptions bdb_options;
  496. bdb_options.min_blob_size = 0;
  497. bdb_options.disable_background_tasks = true;
  498. bdb_options.compression = CompressionType::kSnappyCompression;
  499. Open(bdb_options);
  500. std::map<std::string, std::string> data;
  501. for (size_t i = 0; i < 100; i++) {
  502. PutRandom("put-key" + std::to_string(i), &rnd, &data);
  503. }
  504. VerifyDB(data);
  505. bdb_options.compression = CompressionType::kNoCompression;
  506. Reopen(bdb_options);
  507. VerifyDB(data);
  508. }
  509. TEST_F(BlobDBTest, EnableDisableCompressionGC) {
  510. Random rnd(301);
  511. BlobDBOptions bdb_options;
  512. bdb_options.min_blob_size = 0;
  513. bdb_options.garbage_collection_cutoff = 1.0;
  514. bdb_options.disable_background_tasks = true;
  515. bdb_options.compression = kSnappyCompression;
  516. Open(bdb_options);
  517. std::map<std::string, std::string> data;
  518. size_t data_idx = 0;
  519. for (; data_idx < 100; data_idx++) {
  520. PutRandom("put-key" + std::to_string(data_idx), &rnd, &data);
  521. }
  522. VerifyDB(data);
  523. auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
  524. ASSERT_EQ(1, blob_files.size());
  525. ASSERT_EQ(kSnappyCompression, blob_files[0]->GetCompressionType());
  526. // disable compression
  527. bdb_options.compression = kNoCompression;
  528. Reopen(bdb_options);
  529. // Add more data with new compression type
  530. for (; data_idx < 200; data_idx++) {
  531. PutRandom("put-key" + std::to_string(data_idx), &rnd, &data);
  532. }
  533. VerifyDB(data);
  534. blob_files = blob_db_impl()->TEST_GetBlobFiles();
  535. ASSERT_EQ(2, blob_files.size());
  536. ASSERT_EQ(kNoCompression, blob_files[1]->GetCompressionType());
  537. // Enable GC. If we do it earlier the snapshot release triggered compaction
  538. // may compact files and trigger GC before we can verify there are two files.
  539. bdb_options.enable_garbage_collection = true;
  540. Reopen(bdb_options);
  541. // Trigger compaction
  542. ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  543. blob_db_impl()->TEST_DeleteObsoleteFiles();
  544. VerifyDB(data);
  545. blob_files = blob_db_impl()->TEST_GetBlobFiles();
  546. for (const auto &bfile : blob_files) {
  547. ASSERT_EQ(kNoCompression, bfile->GetCompressionType());
  548. }
  549. // enabling the compression again
  550. bdb_options.compression = kSnappyCompression;
  551. Reopen(bdb_options);
  552. // Add more data with new compression type
  553. for (; data_idx < 300; data_idx++) {
  554. PutRandom("put-key" + std::to_string(data_idx), &rnd, &data);
  555. }
  556. VerifyDB(data);
  557. // Trigger compaction
  558. ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  559. blob_db_impl()->TEST_DeleteObsoleteFiles();
  560. VerifyDB(data);
  561. blob_files = blob_db_impl()->TEST_GetBlobFiles();
  562. for (const auto &bfile : blob_files) {
  563. ASSERT_EQ(kSnappyCompression, bfile->GetCompressionType());
  564. }
  565. }
  566. #ifdef LZ4
  567. // Test switch compression types and run GC, it needs both Snappy and LZ4
  568. // support.
  569. TEST_F(BlobDBTest, ChangeCompressionGC) {
  570. Random rnd(301);
  571. BlobDBOptions bdb_options;
  572. bdb_options.min_blob_size = 0;
  573. bdb_options.garbage_collection_cutoff = 1.0;
  574. bdb_options.disable_background_tasks = true;
  575. bdb_options.compression = kLZ4Compression;
  576. Open(bdb_options);
  577. std::map<std::string, std::string> data;
  578. size_t data_idx = 0;
  579. for (; data_idx < 100; data_idx++) {
  580. PutRandom("put-key" + std::to_string(data_idx), &rnd, &data);
  581. }
  582. VerifyDB(data);
  583. auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
  584. ASSERT_EQ(1, blob_files.size());
  585. ASSERT_EQ(kLZ4Compression, blob_files[0]->GetCompressionType());
  586. // Change compression type
  587. bdb_options.compression = kSnappyCompression;
  588. Reopen(bdb_options);
  589. // Add more data with Snappy compression type
  590. for (; data_idx < 200; data_idx++) {
  591. PutRandom("put-key" + std::to_string(data_idx), &rnd, &data);
  592. }
  593. VerifyDB(data);
  594. // Verify blob file compression type
  595. blob_files = blob_db_impl()->TEST_GetBlobFiles();
  596. ASSERT_EQ(2, blob_files.size());
  597. ASSERT_EQ(kSnappyCompression, blob_files[1]->GetCompressionType());
  598. // Enable GC. If we do it earlier the snapshot release triggered compaction
  599. // may compact files and trigger GC before we can verify there are two files.
  600. bdb_options.enable_garbage_collection = true;
  601. Reopen(bdb_options);
  602. ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  603. VerifyDB(data);
  604. blob_db_impl()->TEST_DeleteObsoleteFiles();
  605. blob_files = blob_db_impl()->TEST_GetBlobFiles();
  606. for (const auto &bfile : blob_files) {
  607. ASSERT_EQ(kSnappyCompression, bfile->GetCompressionType());
  608. }
  609. // Disable compression
  610. bdb_options.compression = kNoCompression;
  611. Reopen(bdb_options);
  612. for (; data_idx < 300; data_idx++) {
  613. PutRandom("put-key" + std::to_string(data_idx), &rnd, &data);
  614. }
  615. VerifyDB(data);
  616. ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  617. VerifyDB(data);
  618. blob_db_impl()->TEST_DeleteObsoleteFiles();
  619. blob_files = blob_db_impl()->TEST_GetBlobFiles();
  620. for (const auto &bfile : blob_files) {
  621. ASSERT_EQ(kNoCompression, bfile->GetCompressionType());
  622. }
  623. // switching different compression types to generate mixed compression types
  624. bdb_options.compression = kSnappyCompression;
  625. Reopen(bdb_options);
  626. for (; data_idx < 400; data_idx++) {
  627. PutRandom("put-key" + std::to_string(data_idx), &rnd, &data);
  628. }
  629. VerifyDB(data);
  630. bdb_options.compression = kLZ4Compression;
  631. Reopen(bdb_options);
  632. for (; data_idx < 500; data_idx++) {
  633. PutRandom("put-key" + std::to_string(data_idx), &rnd, &data);
  634. }
  635. VerifyDB(data);
  636. ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  637. VerifyDB(data);
  638. blob_db_impl()->TEST_DeleteObsoleteFiles();
  639. blob_files = blob_db_impl()->TEST_GetBlobFiles();
  640. for (const auto &bfile : blob_files) {
  641. ASSERT_EQ(kLZ4Compression, bfile->GetCompressionType());
  642. }
  643. }
  644. #endif // LZ4
  645. #endif // SNAPPY
  646. TEST_F(BlobDBTest, MultipleWriters) {
  647. Open(BlobDBOptions());
  648. std::vector<port::Thread> workers;
  649. std::vector<std::map<std::string, std::string>> data_set(10);
  650. for (uint32_t i = 0; i < 10; i++) {
  651. workers.emplace_back(
  652. [&](uint32_t id) {
  653. Random rnd(301 + id);
  654. for (int j = 0; j < 100; j++) {
  655. std::string key =
  656. "key" + std::to_string(id) + "_" + std::to_string(j);
  657. if (id < 5) {
  658. PutRandom(key, &rnd, &data_set[id]);
  659. } else {
  660. WriteBatch batch;
  661. PutRandomToWriteBatch(key, &rnd, &batch, &data_set[id]);
  662. ASSERT_OK(blob_db_->Write(WriteOptions(), &batch));
  663. }
  664. }
  665. },
  666. i);
  667. }
  668. std::map<std::string, std::string> data;
  669. for (size_t i = 0; i < 10; i++) {
  670. workers[i].join();
  671. data.insert(data_set[i].begin(), data_set[i].end());
  672. }
  673. VerifyDB(data);
  674. }
  675. TEST_F(BlobDBTest, SstFileManager) {
  676. // run the same test for Get(), MultiGet() and Iterator each.
  677. std::shared_ptr<SstFileManager> sst_file_manager(
  678. NewSstFileManager(mock_env_.get()));
  679. sst_file_manager->SetDeleteRateBytesPerSecond(1024 * 1024);
  680. SstFileManagerImpl *sfm =
  681. static_cast<SstFileManagerImpl *>(sst_file_manager.get());
  682. BlobDBOptions bdb_options;
  683. bdb_options.min_blob_size = 0;
  684. bdb_options.enable_garbage_collection = true;
  685. bdb_options.garbage_collection_cutoff = 1.0;
  686. Options db_options;
  687. int files_scheduled_to_delete = 0;
  688. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  689. "SstFileManagerImpl::ScheduleFileDeletion", [&](void *arg) {
  690. assert(arg);
  691. const std::string *const file_path =
  692. static_cast<const std::string *>(arg);
  693. if (file_path->find(".blob") != std::string::npos) {
  694. ++files_scheduled_to_delete;
  695. }
  696. });
  697. SyncPoint::GetInstance()->EnableProcessing();
  698. db_options.sst_file_manager = sst_file_manager;
  699. Open(bdb_options, db_options);
  700. // Create one obselete file and clean it.
  701. ASSERT_OK(blob_db_->Put(WriteOptions(), "foo", "bar"));
  702. auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
  703. ASSERT_EQ(1, blob_files.size());
  704. std::shared_ptr<BlobFile> bfile = blob_files[0];
  705. ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(bfile));
  706. ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  707. blob_db_impl()->TEST_DeleteObsoleteFiles();
  708. // Even if SSTFileManager is not set, DB is creating a dummy one.
  709. ASSERT_EQ(1, files_scheduled_to_delete);
  710. Destroy();
  711. // Make sure that DestroyBlobDB() also goes through delete scheduler.
  712. ASSERT_EQ(2, files_scheduled_to_delete);
  713. SyncPoint::GetInstance()->DisableProcessing();
  714. sfm->WaitForEmptyTrash();
  715. }
  716. TEST_F(BlobDBTest, SstFileManagerRestart) {
  717. int files_scheduled_to_delete = 0;
  718. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  719. "SstFileManagerImpl::ScheduleFileDeletion", [&](void *arg) {
  720. assert(arg);
  721. const std::string *const file_path =
  722. static_cast<const std::string *>(arg);
  723. if (file_path->find(".blob") != std::string::npos) {
  724. ++files_scheduled_to_delete;
  725. }
  726. });
  727. // run the same test for Get(), MultiGet() and Iterator each.
  728. std::shared_ptr<SstFileManager> sst_file_manager(
  729. NewSstFileManager(mock_env_.get()));
  730. sst_file_manager->SetDeleteRateBytesPerSecond(1024 * 1024);
  731. SstFileManagerImpl *sfm =
  732. static_cast<SstFileManagerImpl *>(sst_file_manager.get());
  733. BlobDBOptions bdb_options;
  734. bdb_options.min_blob_size = 0;
  735. Options db_options;
  736. SyncPoint::GetInstance()->EnableProcessing();
  737. db_options.sst_file_manager = sst_file_manager;
  738. Open(bdb_options, db_options);
  739. std::string blob_dir = blob_db_impl()->TEST_blob_dir();
  740. ASSERT_OK(blob_db_->Put(WriteOptions(), "foo", "bar"));
  741. Close();
  742. // Create 3 dummy trash files under the blob_dir
  743. const auto &fs = db_options.env->GetFileSystem();
  744. ASSERT_OK(CreateFile(fs, blob_dir + "/000666.blob.trash", "", false));
  745. ASSERT_OK(CreateFile(fs, blob_dir + "/000888.blob.trash", "", true));
  746. ASSERT_OK(CreateFile(fs, blob_dir + "/something_not_match.trash", "", false));
  747. // Make sure that reopening the DB rescan the existing trash files
  748. Open(bdb_options, db_options);
  749. ASSERT_EQ(files_scheduled_to_delete, 2);
  750. sfm->WaitForEmptyTrash();
  751. // There should be exact one file under the blob dir now.
  752. std::vector<std::string> all_files;
  753. ASSERT_OK(db_options.env->GetChildren(blob_dir, &all_files));
  754. int nfiles = 0;
  755. for (const auto &f : all_files) {
  756. assert(!f.empty());
  757. if (f[0] == '.') {
  758. continue;
  759. }
  760. nfiles++;
  761. }
  762. ASSERT_EQ(nfiles, 1);
  763. SyncPoint::GetInstance()->DisableProcessing();
  764. }
  765. TEST_F(BlobDBTest, SnapshotAndGarbageCollection) {
  766. BlobDBOptions bdb_options;
  767. bdb_options.min_blob_size = 0;
  768. bdb_options.enable_garbage_collection = true;
  769. bdb_options.garbage_collection_cutoff = 1.0;
  770. bdb_options.disable_background_tasks = true;
  771. Options options;
  772. options.disable_auto_compactions = true;
  773. // i = when to take snapshot
  774. for (int i = 0; i < 4; i++) {
  775. Destroy();
  776. Open(bdb_options, options);
  777. const Snapshot *snapshot = nullptr;
  778. // First file
  779. ASSERT_OK(Put("key1", "value"));
  780. if (i == 0) {
  781. snapshot = blob_db_->GetSnapshot();
  782. }
  783. auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
  784. ASSERT_EQ(1, blob_files.size());
  785. ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[0]));
  786. // Second file
  787. ASSERT_OK(Put("key2", "value"));
  788. if (i == 1) {
  789. snapshot = blob_db_->GetSnapshot();
  790. }
  791. blob_files = blob_db_impl()->TEST_GetBlobFiles();
  792. ASSERT_EQ(2, blob_files.size());
  793. auto bfile = blob_files[1];
  794. ASSERT_FALSE(bfile->Immutable());
  795. ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(bfile));
  796. // Third file
  797. ASSERT_OK(Put("key3", "value"));
  798. if (i == 2) {
  799. snapshot = blob_db_->GetSnapshot();
  800. }
  801. ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  802. ASSERT_TRUE(bfile->Obsolete());
  803. ASSERT_EQ(blob_db_->GetLatestSequenceNumber(),
  804. bfile->GetObsoleteSequence());
  805. Delete("key2");
  806. if (i == 3) {
  807. snapshot = blob_db_->GetSnapshot();
  808. }
  809. ASSERT_EQ(4, blob_db_impl()->TEST_GetBlobFiles().size());
  810. blob_db_impl()->TEST_DeleteObsoleteFiles();
  811. if (i >= 2) {
  812. // The snapshot shouldn't see data in bfile
  813. ASSERT_EQ(2, blob_db_impl()->TEST_GetBlobFiles().size());
  814. blob_db_->ReleaseSnapshot(snapshot);
  815. } else {
  816. // The snapshot will see data in bfile, so the file shouldn't be deleted
  817. ASSERT_EQ(4, blob_db_impl()->TEST_GetBlobFiles().size());
  818. blob_db_->ReleaseSnapshot(snapshot);
  819. blob_db_impl()->TEST_DeleteObsoleteFiles();
  820. ASSERT_EQ(2, blob_db_impl()->TEST_GetBlobFiles().size());
  821. }
  822. }
  823. }
  824. TEST_F(BlobDBTest, ColumnFamilyNotSupported) {
  825. Options options;
  826. options.env = mock_env_.get();
  827. mock_clock_->SetCurrentTime(0);
  828. Open(BlobDBOptions(), options);
  829. ColumnFamilyHandle *default_handle = blob_db_->DefaultColumnFamily();
  830. ColumnFamilyHandle *handle = nullptr;
  831. std::string value;
  832. std::vector<std::string> values;
  833. // The call simply pass through to base db. It should succeed.
  834. ASSERT_OK(
  835. blob_db_->CreateColumnFamily(ColumnFamilyOptions(), "foo", &handle));
  836. ASSERT_TRUE(blob_db_->Put(WriteOptions(), handle, "k", "v").IsNotSupported());
  837. ASSERT_TRUE(blob_db_->PutWithTTL(WriteOptions(), handle, "k", "v", 60)
  838. .IsNotSupported());
  839. ASSERT_TRUE(blob_db_->PutUntil(WriteOptions(), handle, "k", "v", 100)
  840. .IsNotSupported());
  841. WriteBatch batch;
  842. ASSERT_OK(batch.Put("k1", "v1"));
  843. ASSERT_OK(batch.Put(handle, "k2", "v2"));
  844. ASSERT_TRUE(blob_db_->Write(WriteOptions(), &batch).IsNotSupported());
  845. ASSERT_TRUE(blob_db_->Get(ReadOptions(), "k1", &value).IsNotFound());
  846. ASSERT_TRUE(
  847. blob_db_->Get(ReadOptions(), handle, "k", &value).IsNotSupported());
  848. auto statuses = blob_db_->MultiGet(ReadOptions(), {default_handle, handle},
  849. {"k1", "k2"}, &values);
  850. ASSERT_EQ(2, statuses.size());
  851. ASSERT_TRUE(statuses[0].IsNotSupported());
  852. ASSERT_TRUE(statuses[1].IsNotSupported());
  853. ASSERT_EQ(nullptr, blob_db_->NewIterator(ReadOptions(), handle));
  854. delete handle;
  855. }
  856. TEST_F(BlobDBTest, GetLiveFilesMetaData) {
  857. Random rnd(301);
  858. BlobDBOptions bdb_options;
  859. bdb_options.blob_dir = "blob_dir";
  860. bdb_options.path_relative = true;
  861. bdb_options.ttl_range_secs = 10;
  862. bdb_options.min_blob_size = 0;
  863. bdb_options.disable_background_tasks = true;
  864. Options options;
  865. options.env = mock_env_.get();
  866. Open(bdb_options, options);
  867. std::map<std::string, std::string> data;
  868. for (size_t i = 0; i < 100; i++) {
  869. PutRandom("key" + std::to_string(i), &rnd, &data);
  870. }
  871. constexpr uint64_t expiration = 1000ULL;
  872. PutRandomUntil("key100", expiration, &rnd, &data);
  873. std::vector<LiveFileMetaData> metadata;
  874. blob_db_->GetLiveFilesMetaData(&metadata);
  875. ASSERT_EQ(2U, metadata.size());
  876. // Path should be relative to db_name, but begin with slash.
  877. const std::string filename1("/blob_dir/000001.blob");
  878. ASSERT_EQ(filename1, metadata[0].name);
  879. ASSERT_EQ(1, metadata[0].file_number);
  880. ASSERT_EQ(0, metadata[0].oldest_ancester_time);
  881. ASSERT_EQ(kDefaultColumnFamilyName, metadata[0].column_family_name);
  882. const std::string filename2("/blob_dir/000002.blob");
  883. ASSERT_EQ(filename2, metadata[1].name);
  884. ASSERT_EQ(2, metadata[1].file_number);
  885. ASSERT_EQ(expiration, metadata[1].oldest_ancester_time);
  886. ASSERT_EQ(kDefaultColumnFamilyName, metadata[1].column_family_name);
  887. std::vector<std::string> livefile;
  888. uint64_t mfs;
  889. ASSERT_OK(blob_db_->GetLiveFiles(livefile, &mfs, false));
  890. ASSERT_EQ(5U, livefile.size());
  891. ASSERT_EQ(filename1, livefile[3]);
  892. ASSERT_EQ(filename2, livefile[4]);
  893. std::vector<LiveFileStorageInfo> all_files, blob_files;
  894. ASSERT_OK(blob_db_->GetLiveFilesStorageInfo(LiveFilesStorageInfoOptions(),
  895. &all_files));
  896. for (size_t i = 0; i < all_files.size(); i++) {
  897. if (all_files[i].file_type == kBlobFile) {
  898. blob_files.push_back(all_files[i]);
  899. }
  900. }
  901. ASSERT_EQ(2U, blob_files.size());
  902. ASSERT_GT(all_files.size(), blob_files.size());
  903. ASSERT_EQ("000001.blob", blob_files[0].relative_filename);
  904. ASSERT_EQ(blob_db_impl()->TEST_blob_dir(), blob_files[0].directory);
  905. ASSERT_GT(blob_files[0].size, 0);
  906. ASSERT_EQ("000002.blob", blob_files[1].relative_filename);
  907. ASSERT_EQ(blob_db_impl()->TEST_blob_dir(), blob_files[1].directory);
  908. ASSERT_GT(blob_files[1].size, 0);
  909. VerifyDB(data);
  910. }
  911. TEST_F(BlobDBTest, MigrateFromPlainRocksDB) {
  912. constexpr size_t kNumKey = 20;
  913. constexpr size_t kNumIteration = 10;
  914. Random rnd(301);
  915. std::map<std::string, std::string> data;
  916. std::vector<bool> is_blob(kNumKey, false);
  917. // Write to plain rocksdb.
  918. Options options;
  919. options.create_if_missing = true;
  920. DB *db = nullptr;
  921. ASSERT_OK(DB::Open(options, dbname_, &db));
  922. for (size_t i = 0; i < kNumIteration; i++) {
  923. auto key_index = rnd.Next() % kNumKey;
  924. std::string key = "key" + std::to_string(key_index);
  925. PutRandom(db, key, &rnd, &data);
  926. }
  927. VerifyDB(db, data);
  928. delete db;
  929. db = nullptr;
  930. // Open as blob db. Verify it can read existing data.
  931. Open();
  932. VerifyDB(blob_db_, data);
  933. for (size_t i = 0; i < kNumIteration; i++) {
  934. auto key_index = rnd.Next() % kNumKey;
  935. std::string key = "key" + std::to_string(key_index);
  936. is_blob[key_index] = true;
  937. PutRandom(blob_db_, key, &rnd, &data);
  938. }
  939. VerifyDB(blob_db_, data);
  940. delete blob_db_;
  941. blob_db_ = nullptr;
  942. // Verify plain db return error for keys written by blob db.
  943. ASSERT_OK(DB::Open(options, dbname_, &db));
  944. std::string value;
  945. for (size_t i = 0; i < kNumKey; i++) {
  946. std::string key = "key" + std::to_string(i);
  947. Status s = db->Get(ReadOptions(), key, &value);
  948. if (data.count(key) == 0) {
  949. ASSERT_TRUE(s.IsNotFound());
  950. } else if (is_blob[i]) {
  951. ASSERT_TRUE(s.IsCorruption());
  952. } else {
  953. ASSERT_OK(s);
  954. ASSERT_EQ(data[key], value);
  955. }
  956. }
  957. delete db;
  958. }
  959. // Test to verify that a NoSpace IOError Status is returned on reaching
  960. // max_db_size limit.
  961. TEST_F(BlobDBTest, OutOfSpace) {
  962. // Use mock env to stop wall clock.
  963. Options options;
  964. options.env = mock_env_.get();
  965. BlobDBOptions bdb_options;
  966. bdb_options.max_db_size = 200;
  967. bdb_options.is_fifo = false;
  968. bdb_options.disable_background_tasks = true;
  969. Open(bdb_options);
  970. // Each stored blob has an overhead of about 42 bytes currently.
  971. // So a small key + a 100 byte blob should take up ~150 bytes in the db.
  972. std::string value(100, 'v');
  973. ASSERT_OK(blob_db_->PutWithTTL(WriteOptions(), "key1", value, 60));
  974. // Putting another blob should fail as ading it would exceed the max_db_size
  975. // limit.
  976. Status s = blob_db_->PutWithTTL(WriteOptions(), "key2", value, 60);
  977. ASSERT_TRUE(s.IsIOError());
  978. ASSERT_TRUE(s.IsNoSpace());
  979. }
  980. TEST_F(BlobDBTest, FIFOEviction) {
  981. BlobDBOptions bdb_options;
  982. bdb_options.max_db_size = 200;
  983. bdb_options.blob_file_size = 100;
  984. bdb_options.is_fifo = true;
  985. bdb_options.disable_background_tasks = true;
  986. Open(bdb_options);
  987. std::atomic<int> evict_count{0};
  988. SyncPoint::GetInstance()->SetCallBack(
  989. "BlobDBImpl::EvictOldestBlobFile:Evicted",
  990. [&](void *) { evict_count++; });
  991. SyncPoint::GetInstance()->EnableProcessing();
  992. // Each stored blob has an overhead of 32 bytes currently.
  993. // So a 100 byte blob should take up 132 bytes.
  994. std::string value(100, 'v');
  995. ASSERT_OK(blob_db_->PutWithTTL(WriteOptions(), "key1", value, 10));
  996. VerifyDB({{"key1", value}});
  997. ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
  998. // Adding another 100 bytes blob would take the total size to 264 bytes
  999. // (2*132). max_db_size will be exceeded
  1000. // than max_db_size and trigger FIFO eviction.
  1001. ASSERT_OK(blob_db_->PutWithTTL(WriteOptions(), "key2", value, 60));
  1002. ASSERT_EQ(1, evict_count);
  1003. // key1 will exist until corresponding file be deleted.
  1004. VerifyDB({{"key1", value}, {"key2", value}});
  1005. // Adding another 100 bytes blob without TTL.
  1006. ASSERT_OK(blob_db_->Put(WriteOptions(), "key3", value));
  1007. ASSERT_EQ(2, evict_count);
  1008. // key1 and key2 will exist until corresponding file be deleted.
  1009. VerifyDB({{"key1", value}, {"key2", value}, {"key3", value}});
  1010. // The fourth blob file, without TTL.
  1011. ASSERT_OK(blob_db_->Put(WriteOptions(), "key4", value));
  1012. ASSERT_EQ(3, evict_count);
  1013. VerifyDB(
  1014. {{"key1", value}, {"key2", value}, {"key3", value}, {"key4", value}});
  1015. auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
  1016. ASSERT_EQ(4, blob_files.size());
  1017. ASSERT_TRUE(blob_files[0]->Obsolete());
  1018. ASSERT_TRUE(blob_files[1]->Obsolete());
  1019. ASSERT_TRUE(blob_files[2]->Obsolete());
  1020. ASSERT_FALSE(blob_files[3]->Obsolete());
  1021. auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
  1022. ASSERT_EQ(3, obsolete_files.size());
  1023. ASSERT_EQ(blob_files[0], obsolete_files[0]);
  1024. ASSERT_EQ(blob_files[1], obsolete_files[1]);
  1025. ASSERT_EQ(blob_files[2], obsolete_files[2]);
  1026. blob_db_impl()->TEST_DeleteObsoleteFiles();
  1027. obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
  1028. ASSERT_TRUE(obsolete_files.empty());
  1029. VerifyDB({{"key4", value}});
  1030. }
  1031. TEST_F(BlobDBTest, FIFOEviction_NoOldestFileToEvict) {
  1032. Options options;
  1033. BlobDBOptions bdb_options;
  1034. bdb_options.max_db_size = 1000;
  1035. bdb_options.blob_file_size = 5000;
  1036. bdb_options.is_fifo = true;
  1037. bdb_options.disable_background_tasks = true;
  1038. Open(bdb_options);
  1039. std::atomic<int> evict_count{0};
  1040. SyncPoint::GetInstance()->SetCallBack(
  1041. "BlobDBImpl::EvictOldestBlobFile:Evicted",
  1042. [&](void *) { evict_count++; });
  1043. SyncPoint::GetInstance()->EnableProcessing();
  1044. std::string value(2000, 'v');
  1045. ASSERT_TRUE(Put("foo", std::string(2000, 'v')).IsNoSpace());
  1046. ASSERT_EQ(0, evict_count);
  1047. }
  1048. TEST_F(BlobDBTest, FIFOEviction_NoEnoughBlobFilesToEvict) {
  1049. BlobDBOptions bdb_options;
  1050. bdb_options.is_fifo = true;
  1051. bdb_options.min_blob_size = 100;
  1052. bdb_options.disable_background_tasks = true;
  1053. Options options;
  1054. // Use mock env to stop wall clock.
  1055. options.env = mock_env_.get();
  1056. options.disable_auto_compactions = true;
  1057. auto statistics = CreateDBStatistics();
  1058. options.statistics = statistics;
  1059. Open(bdb_options, options);
  1060. SyncPoint::GetInstance()->LoadDependency(
  1061. {{"DBImpl::NotifyOnFlushCompleted::PostAllOnFlushCompleted",
  1062. "BlobDBTest.FIFOEviction_NoEnoughBlobFilesToEvict:AfterFlush"}});
  1063. SyncPoint::GetInstance()->EnableProcessing();
  1064. ASSERT_EQ(0, blob_db_impl()->TEST_live_sst_size());
  1065. std::string small_value(50, 'v');
  1066. std::map<std::string, std::string> data;
  1067. // Insert some data into LSM tree to make sure FIFO eviction take SST
  1068. // file size into account.
  1069. for (int i = 0; i < 1000; i++) {
  1070. ASSERT_OK(Put("key" + std::to_string(i), small_value, &data));
  1071. }
  1072. ASSERT_OK(blob_db_->Flush(FlushOptions()));
  1073. uint64_t live_sst_size = 0;
  1074. ASSERT_TRUE(blob_db_->GetIntProperty(DB::Properties::kTotalSstFilesSize,
  1075. &live_sst_size));
  1076. ASSERT_TRUE(live_sst_size > 0);
  1077. TEST_SYNC_POINT(
  1078. "BlobDBTest.FIFOEviction_NoEnoughBlobFilesToEvict:AfterFlush");
  1079. ASSERT_EQ(live_sst_size, blob_db_impl()->TEST_live_sst_size());
  1080. bdb_options.max_db_size = live_sst_size + 2000;
  1081. Reopen(bdb_options, options);
  1082. ASSERT_EQ(live_sst_size, blob_db_impl()->TEST_live_sst_size());
  1083. std::string value_1k(1000, 'v');
  1084. ASSERT_OK(PutWithTTL("large_key1", value_1k, 60, &data));
  1085. ASSERT_EQ(0, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
  1086. VerifyDB(data);
  1087. // large_key2 evicts large_key1
  1088. ASSERT_OK(PutWithTTL("large_key2", value_1k, 60, &data));
  1089. ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
  1090. blob_db_impl()->TEST_DeleteObsoleteFiles();
  1091. data.erase("large_key1");
  1092. VerifyDB(data);
  1093. // large_key3 get no enough space even after evicting large_key2, so it
  1094. // instead return no space error.
  1095. std::string value_2k(2000, 'v');
  1096. ASSERT_TRUE(PutWithTTL("large_key3", value_2k, 60).IsNoSpace());
  1097. ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
  1098. // Verify large_key2 still exists.
  1099. VerifyDB(data);
  1100. SyncPoint::GetInstance()->DisableProcessing();
  1101. }
  1102. // Test flush or compaction will trigger FIFO eviction since they update
  1103. // total SST file size.
  1104. TEST_F(BlobDBTest, FIFOEviction_TriggerOnSSTSizeChange) {
  1105. BlobDBOptions bdb_options;
  1106. bdb_options.max_db_size = 1000;
  1107. bdb_options.is_fifo = true;
  1108. bdb_options.min_blob_size = 100;
  1109. bdb_options.disable_background_tasks = true;
  1110. Options options;
  1111. // Use mock env to stop wall clock.
  1112. options.env = mock_env_.get();
  1113. auto statistics = CreateDBStatistics();
  1114. options.statistics = statistics;
  1115. options.compression = kNoCompression;
  1116. Open(bdb_options, options);
  1117. SyncPoint::GetInstance()->LoadDependency(
  1118. {{"DBImpl::NotifyOnFlushCompleted::PostAllOnFlushCompleted",
  1119. "BlobDBTest.FIFOEviction_TriggerOnSSTSizeChange:AfterFlush"}});
  1120. SyncPoint::GetInstance()->EnableProcessing();
  1121. std::string value(800, 'v');
  1122. ASSERT_OK(PutWithTTL("large_key", value, 60));
  1123. ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
  1124. ASSERT_EQ(0, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
  1125. VerifyDB({{"large_key", value}});
  1126. // Insert some small keys and flush to bring DB out of space.
  1127. std::map<std::string, std::string> data;
  1128. for (int i = 0; i < 10; i++) {
  1129. ASSERT_OK(Put("key" + std::to_string(i), "v", &data));
  1130. }
  1131. ASSERT_OK(blob_db_->Flush(FlushOptions()));
  1132. TEST_SYNC_POINT("BlobDBTest.FIFOEviction_TriggerOnSSTSizeChange:AfterFlush");
  1133. // Verify large_key is deleted by FIFO eviction.
  1134. blob_db_impl()->TEST_DeleteObsoleteFiles();
  1135. ASSERT_EQ(0, blob_db_impl()->TEST_GetBlobFiles().size());
  1136. ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
  1137. VerifyDB(data);
  1138. SyncPoint::GetInstance()->DisableProcessing();
  1139. }
  1140. TEST_F(BlobDBTest, InlineSmallValues) {
  1141. constexpr uint64_t kMaxExpiration = 1000;
  1142. Random rnd(301);
  1143. BlobDBOptions bdb_options;
  1144. bdb_options.ttl_range_secs = kMaxExpiration;
  1145. bdb_options.min_blob_size = 100;
  1146. bdb_options.blob_file_size = 256 * 1000 * 1000;
  1147. bdb_options.disable_background_tasks = true;
  1148. Options options;
  1149. options.env = mock_env_.get();
  1150. mock_clock_->SetCurrentTime(0);
  1151. Open(bdb_options, options);
  1152. std::map<std::string, std::string> data;
  1153. std::map<std::string, KeyVersion> versions;
  1154. for (size_t i = 0; i < 1000; i++) {
  1155. bool is_small_value = rnd.Next() % 2;
  1156. bool has_ttl = rnd.Next() % 2;
  1157. uint64_t expiration = rnd.Next() % kMaxExpiration;
  1158. int len = is_small_value ? 50 : 200;
  1159. std::string key = "key" + std::to_string(i);
  1160. std::string value = rnd.HumanReadableString(len);
  1161. std::string blob_index;
  1162. data[key] = value;
  1163. SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
  1164. if (!has_ttl) {
  1165. ASSERT_OK(blob_db_->Put(WriteOptions(), key, value));
  1166. } else {
  1167. ASSERT_OK(blob_db_->PutUntil(WriteOptions(), key, value, expiration));
  1168. }
  1169. ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
  1170. versions[key] =
  1171. KeyVersion(key, value, sequence,
  1172. (is_small_value && !has_ttl) ? kTypeValue : kTypeBlobIndex);
  1173. }
  1174. VerifyDB(data);
  1175. VerifyBaseDB(versions);
  1176. auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
  1177. auto blob_files = bdb_impl->TEST_GetBlobFiles();
  1178. ASSERT_EQ(2, blob_files.size());
  1179. std::shared_ptr<BlobFile> non_ttl_file;
  1180. std::shared_ptr<BlobFile> ttl_file;
  1181. if (blob_files[0]->HasTTL()) {
  1182. ttl_file = blob_files[0];
  1183. non_ttl_file = blob_files[1];
  1184. } else {
  1185. non_ttl_file = blob_files[0];
  1186. ttl_file = blob_files[1];
  1187. }
  1188. ASSERT_FALSE(non_ttl_file->HasTTL());
  1189. ASSERT_TRUE(ttl_file->HasTTL());
  1190. }
  1191. TEST_F(BlobDBTest, UserCompactionFilter) {
  1192. class CustomerFilter : public CompactionFilter {
  1193. public:
  1194. bool Filter(int /*level*/, const Slice & /*key*/, const Slice &value,
  1195. std::string *new_value, bool *value_changed) const override {
  1196. *value_changed = false;
  1197. // changing value size to test value transitions between inlined data
  1198. // and stored-in-blob data
  1199. if (value.size() % 4 == 1) {
  1200. *new_value = value.ToString();
  1201. // double size by duplicating value
  1202. *new_value += *new_value;
  1203. *value_changed = true;
  1204. return false;
  1205. } else if (value.size() % 3 == 1) {
  1206. *new_value = value.ToString();
  1207. // trancate value size by half
  1208. *new_value = new_value->substr(0, new_value->size() / 2);
  1209. *value_changed = true;
  1210. return false;
  1211. } else if (value.size() % 2 == 1) {
  1212. return true;
  1213. }
  1214. return false;
  1215. }
  1216. bool IgnoreSnapshots() const override { return true; }
  1217. const char *Name() const override { return "CustomerFilter"; }
  1218. };
  1219. class CustomerFilterFactory : public CompactionFilterFactory {
  1220. const char *Name() const override { return "CustomerFilterFactory"; }
  1221. std::unique_ptr<CompactionFilter> CreateCompactionFilter(
  1222. const CompactionFilter::Context & /*context*/) override {
  1223. return std::unique_ptr<CompactionFilter>(new CustomerFilter());
  1224. }
  1225. };
  1226. constexpr size_t kNumPuts = 1 << 10;
  1227. // Generate both inlined and blob value
  1228. constexpr uint64_t kMinValueSize = 1 << 6;
  1229. constexpr uint64_t kMaxValueSize = 1 << 8;
  1230. constexpr uint64_t kMinBlobSize = 1 << 7;
  1231. static_assert(kMinValueSize < kMinBlobSize);
  1232. static_assert(kMaxValueSize > kMinBlobSize);
  1233. BlobDBOptions bdb_options;
  1234. bdb_options.min_blob_size = kMinBlobSize;
  1235. bdb_options.blob_file_size = kMaxValueSize * 10;
  1236. bdb_options.disable_background_tasks = true;
  1237. if (Snappy_Supported()) {
  1238. bdb_options.compression = CompressionType::kSnappyCompression;
  1239. }
  1240. // case_num == 0: Test user defined compaction filter
  1241. // case_num == 1: Test user defined compaction filter factory
  1242. for (int case_num = 0; case_num < 2; case_num++) {
  1243. Options options;
  1244. if (case_num == 0) {
  1245. options.compaction_filter = new CustomerFilter();
  1246. } else {
  1247. options.compaction_filter_factory.reset(new CustomerFilterFactory());
  1248. }
  1249. options.disable_auto_compactions = true;
  1250. options.env = mock_env_.get();
  1251. options.statistics = CreateDBStatistics();
  1252. Open(bdb_options, options);
  1253. std::map<std::string, std::string> data;
  1254. std::map<std::string, std::string> data_after_compact;
  1255. Random rnd(301);
  1256. uint64_t value_size = kMinValueSize;
  1257. int drop_record = 0;
  1258. for (size_t i = 0; i < kNumPuts; ++i) {
  1259. std::ostringstream oss;
  1260. oss << "key" << std::setw(4) << std::setfill('0') << i;
  1261. const std::string key(oss.str());
  1262. const std::string value = rnd.HumanReadableString((int)value_size);
  1263. const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
  1264. ASSERT_OK(Put(key, value));
  1265. ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
  1266. data[key] = value;
  1267. if (value.length() % 4 == 1) {
  1268. data_after_compact[key] = value + value;
  1269. } else if (value.length() % 3 == 1) {
  1270. data_after_compact[key] = value.substr(0, value.size() / 2);
  1271. } else if (value.length() % 2 == 1) {
  1272. ++drop_record;
  1273. } else {
  1274. data_after_compact[key] = value;
  1275. }
  1276. if (++value_size > kMaxValueSize) {
  1277. value_size = kMinValueSize;
  1278. }
  1279. }
  1280. // Verify full data set
  1281. VerifyDB(data);
  1282. // Applying compaction filter for records
  1283. ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  1284. // Verify data after compaction, only value with even length left.
  1285. VerifyDB(data_after_compact);
  1286. ASSERT_EQ(drop_record,
  1287. options.statistics->getTickerCount(COMPACTION_KEY_DROP_USER));
  1288. delete options.compaction_filter;
  1289. Destroy();
  1290. }
  1291. }
  1292. // Test user comapction filter when there is IO error on blob data.
  1293. TEST_F(BlobDBTest, UserCompactionFilter_BlobIOError) {
  1294. class CustomerFilter : public CompactionFilter {
  1295. public:
  1296. bool Filter(int /*level*/, const Slice & /*key*/, const Slice &value,
  1297. std::string *new_value, bool *value_changed) const override {
  1298. *new_value = value.ToString() + "_new";
  1299. *value_changed = true;
  1300. return false;
  1301. }
  1302. bool IgnoreSnapshots() const override { return true; }
  1303. const char *Name() const override { return "CustomerFilter"; }
  1304. };
  1305. constexpr size_t kNumPuts = 100;
  1306. constexpr int kValueSize = 100;
  1307. BlobDBOptions bdb_options;
  1308. bdb_options.min_blob_size = 0;
  1309. bdb_options.blob_file_size = kValueSize * 10;
  1310. bdb_options.disable_background_tasks = true;
  1311. bdb_options.compression = CompressionType::kNoCompression;
  1312. std::vector<std::string> io_failure_cases = {
  1313. "BlobDBImpl::CreateBlobFileAndWriter",
  1314. "BlobIndexCompactionFilterBase::WriteBlobToNewFile",
  1315. "BlobDBImpl::CloseBlobFile"};
  1316. for (size_t case_num = 0; case_num < io_failure_cases.size(); case_num++) {
  1317. Options options;
  1318. options.compaction_filter = new CustomerFilter();
  1319. options.disable_auto_compactions = true;
  1320. options.env = fault_injection_env_.get();
  1321. options.statistics = CreateDBStatistics();
  1322. Open(bdb_options, options);
  1323. std::map<std::string, std::string> data;
  1324. Random rnd(301);
  1325. for (size_t i = 0; i < kNumPuts; ++i) {
  1326. std::ostringstream oss;
  1327. oss << "key" << std::setw(4) << std::setfill('0') << i;
  1328. const std::string key(oss.str());
  1329. const std::string value = rnd.HumanReadableString(kValueSize);
  1330. const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
  1331. ASSERT_OK(Put(key, value));
  1332. ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
  1333. data[key] = value;
  1334. }
  1335. // Verify full data set
  1336. VerifyDB(data);
  1337. SyncPoint::GetInstance()->SetCallBack(
  1338. io_failure_cases[case_num], [&](void * /*arg*/) {
  1339. fault_injection_env_->SetFilesystemActive(false, Status::IOError());
  1340. });
  1341. SyncPoint::GetInstance()->EnableProcessing();
  1342. auto s = blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
  1343. ASSERT_TRUE(s.IsIOError());
  1344. // Reactivate file system to allow test to verify and close DB.
  1345. fault_injection_env_->SetFilesystemActive(true);
  1346. SyncPoint::GetInstance()->DisableProcessing();
  1347. SyncPoint::GetInstance()->ClearAllCallBacks();
  1348. // Verify full data set after compaction failure
  1349. VerifyDB(data);
  1350. delete options.compaction_filter;
  1351. Destroy();
  1352. }
  1353. }
  1354. // Test comapction filter should remove any expired blob index.
  1355. TEST_F(BlobDBTest, FilterExpiredBlobIndex) {
  1356. constexpr size_t kNumKeys = 100;
  1357. constexpr size_t kNumPuts = 1000;
  1358. constexpr uint64_t kMaxExpiration = 1000;
  1359. constexpr uint64_t kCompactTime = 500;
  1360. constexpr uint64_t kMinBlobSize = 100;
  1361. Random rnd(301);
  1362. mock_clock_->SetCurrentTime(0);
  1363. BlobDBOptions bdb_options;
  1364. bdb_options.min_blob_size = kMinBlobSize;
  1365. bdb_options.disable_background_tasks = true;
  1366. Options options;
  1367. options.env = mock_env_.get();
  1368. Open(bdb_options, options);
  1369. std::map<std::string, std::string> data;
  1370. std::map<std::string, std::string> data_after_compact;
  1371. for (size_t i = 0; i < kNumPuts; i++) {
  1372. bool is_small_value = rnd.Next() % 2;
  1373. bool has_ttl = rnd.Next() % 2;
  1374. uint64_t expiration = rnd.Next() % kMaxExpiration;
  1375. int len = is_small_value ? 10 : 200;
  1376. std::string key = "key" + std::to_string(rnd.Next() % kNumKeys);
  1377. std::string value = rnd.HumanReadableString(len);
  1378. if (!has_ttl) {
  1379. if (is_small_value) {
  1380. std::string blob_entry;
  1381. BlobIndex::EncodeInlinedTTL(&blob_entry, expiration, value);
  1382. // Fake blob index with TTL. See what it will do.
  1383. ASSERT_GT(kMinBlobSize, blob_entry.size());
  1384. value = blob_entry;
  1385. }
  1386. ASSERT_OK(Put(key, value));
  1387. data_after_compact[key] = value;
  1388. } else {
  1389. ASSERT_OK(PutUntil(key, value, expiration));
  1390. if (expiration <= kCompactTime) {
  1391. data_after_compact.erase(key);
  1392. } else {
  1393. data_after_compact[key] = value;
  1394. }
  1395. }
  1396. data[key] = value;
  1397. }
  1398. VerifyDB(data);
  1399. mock_clock_->SetCurrentTime(kCompactTime);
  1400. // Take a snapshot before compaction. Make sure expired blob indexes is
  1401. // filtered regardless of snapshot.
  1402. const Snapshot *snapshot = blob_db_->GetSnapshot();
  1403. // Issue manual compaction to trigger compaction filter.
  1404. ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  1405. blob_db_->ReleaseSnapshot(snapshot);
  1406. // Verify expired blob index are filtered.
  1407. std::vector<KeyVersion> versions;
  1408. const size_t kMaxKeys = 10000;
  1409. ASSERT_OK(GetAllKeyVersions(blob_db_, {}, {}, kMaxKeys, &versions));
  1410. ASSERT_EQ(data_after_compact.size(), versions.size());
  1411. for (auto &version : versions) {
  1412. ASSERT_TRUE(data_after_compact.count(version.user_key) > 0);
  1413. }
  1414. VerifyDB(data_after_compact);
  1415. }
  1416. // Test compaction filter should remove any blob index where corresponding
  1417. // blob file has been removed.
  1418. TEST_F(BlobDBTest, FilterFileNotAvailable) {
  1419. BlobDBOptions bdb_options;
  1420. bdb_options.min_blob_size = 0;
  1421. bdb_options.disable_background_tasks = true;
  1422. Options options;
  1423. options.disable_auto_compactions = true;
  1424. Open(bdb_options, options);
  1425. ASSERT_OK(Put("foo", "v1"));
  1426. auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
  1427. ASSERT_EQ(1, blob_files.size());
  1428. ASSERT_EQ(1, blob_files[0]->BlobFileNumber());
  1429. ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[0]));
  1430. ASSERT_OK(Put("bar", "v2"));
  1431. blob_files = blob_db_impl()->TEST_GetBlobFiles();
  1432. ASSERT_EQ(2, blob_files.size());
  1433. ASSERT_EQ(2, blob_files[1]->BlobFileNumber());
  1434. ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[1]));
  1435. const size_t kMaxKeys = 10000;
  1436. DB *base_db = blob_db_->GetRootDB();
  1437. std::vector<KeyVersion> versions;
  1438. ASSERT_OK(GetAllKeyVersions(base_db, {}, {}, kMaxKeys, &versions));
  1439. ASSERT_EQ(2, versions.size());
  1440. ASSERT_EQ("bar", versions[0].user_key);
  1441. ASSERT_EQ("foo", versions[1].user_key);
  1442. VerifyDB({{"bar", "v2"}, {"foo", "v1"}});
  1443. ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  1444. ASSERT_OK(GetAllKeyVersions(base_db, {}, {}, kMaxKeys, &versions));
  1445. ASSERT_EQ(2, versions.size());
  1446. ASSERT_EQ("bar", versions[0].user_key);
  1447. ASSERT_EQ("foo", versions[1].user_key);
  1448. VerifyDB({{"bar", "v2"}, {"foo", "v1"}});
  1449. // Remove the first blob file and compact. foo should be remove from base db.
  1450. blob_db_impl()->TEST_ObsoleteBlobFile(blob_files[0]);
  1451. blob_db_impl()->TEST_DeleteObsoleteFiles();
  1452. ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  1453. ASSERT_OK(GetAllKeyVersions(base_db, {}, {}, kMaxKeys, &versions));
  1454. ASSERT_EQ(1, versions.size());
  1455. ASSERT_EQ("bar", versions[0].user_key);
  1456. VerifyDB({{"bar", "v2"}});
  1457. // Remove the second blob file and compact. bar should be remove from base db.
  1458. blob_db_impl()->TEST_ObsoleteBlobFile(blob_files[1]);
  1459. blob_db_impl()->TEST_DeleteObsoleteFiles();
  1460. ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  1461. ASSERT_OK(GetAllKeyVersions(base_db, {}, {}, kMaxKeys, &versions));
  1462. ASSERT_EQ(0, versions.size());
  1463. VerifyDB({});
  1464. }
  // Test that the compaction filter removes any inlined TTL keys that would
  // have been dropped by the last FIFO eviction had they been stored
  // out-of-line.
  1467. TEST_F(BlobDBTest, FilterForFIFOEviction) {
  1468. Random rnd(215);
  1469. BlobDBOptions bdb_options;
  1470. bdb_options.min_blob_size = 100;
  1471. bdb_options.ttl_range_secs = 60;
  1472. bdb_options.max_db_size = 0;
  1473. bdb_options.disable_background_tasks = true;
  1474. Options options;
  1475. // Use mock env to stop wall clock.
  1476. mock_clock_->SetCurrentTime(0);
  1477. options.env = mock_env_.get();
  1478. auto statistics = CreateDBStatistics();
  1479. options.statistics = statistics;
  1480. options.disable_auto_compactions = true;
  1481. Open(bdb_options, options);
  1482. SyncPoint::GetInstance()->LoadDependency(
  1483. {{"DBImpl::NotifyOnFlushCompleted::PostAllOnFlushCompleted",
  1484. "BlobDBTest.FilterForFIFOEviction:AfterFlush"}});
  1485. SyncPoint::GetInstance()->EnableProcessing();
  1486. std::map<std::string, std::string> data;
  1487. std::map<std::string, std::string> data_after_compact;
  1488. // Insert some small values that will be inlined.
  1489. for (int i = 0; i < 1000; i++) {
  1490. std::string key = "key" + std::to_string(i);
  1491. std::string value = rnd.HumanReadableString(50);
  1492. uint64_t ttl = rnd.Next() % 120 + 1;
  1493. ASSERT_OK(PutWithTTL(key, value, ttl, &data));
  1494. if (ttl >= 60) {
  1495. data_after_compact[key] = value;
  1496. }
  1497. }
  1498. uint64_t num_keys_to_evict = data.size() - data_after_compact.size();
  1499. ASSERT_OK(blob_db_->Flush(FlushOptions()));
  1500. TEST_SYNC_POINT("BlobDBTest.FilterForFIFOEviction:AfterFlush");
  1501. uint64_t live_sst_size = blob_db_impl()->TEST_live_sst_size();
  1502. ASSERT_GT(live_sst_size, 0);
  1503. VerifyDB(data);
  1504. bdb_options.max_db_size = live_sst_size + 30000;
  1505. bdb_options.is_fifo = true;
  1506. Reopen(bdb_options, options);
  1507. VerifyDB(data);
  1508. // Put two large values, each on a different blob file.
  1509. std::string large_value(10000, 'v');
  1510. ASSERT_OK(PutWithTTL("large_key1", large_value, 90));
  1511. ASSERT_OK(PutWithTTL("large_key2", large_value, 150));
  1512. ASSERT_EQ(2, blob_db_impl()->TEST_GetBlobFiles().size());
  1513. ASSERT_EQ(0, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
  1514. data["large_key1"] = large_value;
  1515. data["large_key2"] = large_value;
  1516. VerifyDB(data);
  1517. // Put a third large value which will bring the DB out of space.
  1518. // FIFO eviction will evict the file of large_key1.
  1519. ASSERT_OK(PutWithTTL("large_key3", large_value, 150));
  1520. ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
  1521. ASSERT_EQ(2, blob_db_impl()->TEST_GetBlobFiles().size());
  1522. blob_db_impl()->TEST_DeleteObsoleteFiles();
  1523. ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
  1524. data.erase("large_key1");
  1525. data["large_key3"] = large_value;
  1526. VerifyDB(data);
  1527. // Putting some more small values. These values shouldn't be evicted by
  1528. // compaction filter since they are inserted after FIFO eviction.
  1529. ASSERT_OK(PutWithTTL("foo", "v", 30, &data_after_compact));
  1530. ASSERT_OK(PutWithTTL("bar", "v", 30, &data_after_compact));
  1531. // FIFO eviction doesn't trigger again since there enough room for the flush.
  1532. ASSERT_OK(blob_db_->Flush(FlushOptions()));
  1533. ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
  1534. // Manual compact and check if compaction filter evict those keys with
  1535. // expiration < 60.
  1536. ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  1537. // All keys with expiration < 60, plus large_key1 is filtered by
  1538. // compaction filter.
  1539. ASSERT_EQ(num_keys_to_evict + 1,
  1540. statistics->getTickerCount(BLOB_DB_BLOB_INDEX_EVICTED_COUNT));
  1541. ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
  1542. ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
  1543. data_after_compact["large_key2"] = large_value;
  1544. data_after_compact["large_key3"] = large_value;
  1545. VerifyDB(data_after_compact);
  1546. SyncPoint::GetInstance()->DisableProcessing();
  1547. }
  1548. TEST_F(BlobDBTest, GarbageCollection) {
  1549. constexpr size_t kNumPuts = 1 << 10;
  1550. constexpr uint64_t kExpiration = 1000;
  1551. constexpr uint64_t kCompactTime = 500;
  1552. constexpr uint64_t kKeySize = 7; // "key" + 4 digits
  1553. constexpr uint64_t kSmallValueSize = 1 << 6;
  1554. constexpr uint64_t kLargeValueSize = 1 << 8;
  1555. constexpr uint64_t kMinBlobSize = 1 << 7;
  1556. static_assert(kSmallValueSize < kMinBlobSize);
  1557. static_assert(kLargeValueSize > kMinBlobSize);
  1558. constexpr size_t kBlobsPerFile = 8;
  1559. constexpr size_t kNumBlobFiles = kNumPuts / kBlobsPerFile;
  1560. constexpr uint64_t kBlobFileSize =
  1561. BlobLogHeader::kSize +
  1562. (BlobLogRecord::kHeaderSize + kKeySize + kLargeValueSize) * kBlobsPerFile;
  1563. BlobDBOptions bdb_options;
  1564. bdb_options.min_blob_size = kMinBlobSize;
  1565. bdb_options.blob_file_size = kBlobFileSize;
  1566. bdb_options.enable_garbage_collection = true;
  1567. bdb_options.garbage_collection_cutoff = 0.25;
  1568. bdb_options.disable_background_tasks = true;
  1569. Options options;
  1570. options.env = mock_env_.get();
  1571. options.statistics = CreateDBStatistics();
  1572. Open(bdb_options, options);
  1573. std::map<std::string, std::string> data;
  1574. std::map<std::string, KeyVersion> blob_value_versions;
  1575. std::map<std::string, BlobIndexVersion> blob_index_versions;
  1576. Random rnd(301);
  1577. // Add a bunch of large non-TTL values. These will be written to non-TTL
  1578. // blob files and will be subject to GC.
  1579. for (size_t i = 0; i < kNumPuts; ++i) {
  1580. std::ostringstream oss;
  1581. oss << "key" << std::setw(4) << std::setfill('0') << i;
  1582. const std::string key(oss.str());
  1583. const std::string value = rnd.HumanReadableString(kLargeValueSize);
  1584. const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
  1585. ASSERT_OK(Put(key, value));
  1586. ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
  1587. data[key] = value;
  1588. blob_value_versions[key] = KeyVersion(key, value, sequence, kTypeBlobIndex);
  1589. blob_index_versions[key] =
  1590. BlobIndexVersion(key, /* file_number */ (i >> 3) + 1, kNoExpiration,
  1591. sequence, kTypeBlobIndex);
  1592. }
  1593. // Add some small and/or TTL values that will be ignored during GC.
  1594. // First, add a large TTL value will be written to its own TTL blob file.
  1595. {
  1596. const std::string key("key2000");
  1597. const std::string value = rnd.HumanReadableString(kLargeValueSize);
  1598. const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
  1599. ASSERT_OK(PutUntil(key, value, kExpiration));
  1600. ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
  1601. data[key] = value;
  1602. blob_value_versions[key] = KeyVersion(key, value, sequence, kTypeBlobIndex);
  1603. blob_index_versions[key] =
  1604. BlobIndexVersion(key, /* file_number */ kNumBlobFiles + 1, kExpiration,
  1605. sequence, kTypeBlobIndex);
  1606. }
  1607. // Now add a small TTL value (which will be inlined).
  1608. {
  1609. const std::string key("key3000");
  1610. const std::string value = rnd.HumanReadableString(kSmallValueSize);
  1611. const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
  1612. ASSERT_OK(PutUntil(key, value, kExpiration));
  1613. ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
  1614. data[key] = value;
  1615. blob_value_versions[key] = KeyVersion(key, value, sequence, kTypeBlobIndex);
  1616. blob_index_versions[key] = BlobIndexVersion(
  1617. key, kInvalidBlobFileNumber, kExpiration, sequence, kTypeBlobIndex);
  1618. }
  1619. // Finally, add a small non-TTL value (which will be stored as a regular
  1620. // value).
  1621. {
  1622. const std::string key("key4000");
  1623. const std::string value = rnd.HumanReadableString(kSmallValueSize);
  1624. const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
  1625. ASSERT_OK(Put(key, value));
  1626. ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
  1627. data[key] = value;
  1628. blob_value_versions[key] = KeyVersion(key, value, sequence, kTypeValue);
  1629. blob_index_versions[key] = BlobIndexVersion(
  1630. key, kInvalidBlobFileNumber, kNoExpiration, sequence, kTypeValue);
  1631. }
  1632. VerifyDB(data);
  1633. VerifyBaseDB(blob_value_versions);
  1634. VerifyBaseDBBlobIndex(blob_index_versions);
  1635. // At this point, we should have 128 immutable non-TTL files with file numbers
  1636. // 1..128.
  1637. {
  1638. auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
  1639. ASSERT_EQ(live_imm_files.size(), kNumBlobFiles);
  1640. for (size_t i = 0; i < kNumBlobFiles; ++i) {
  1641. ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 1);
  1642. ASSERT_EQ(live_imm_files[i]->GetFileSize(),
  1643. kBlobFileSize + BlobLogFooter::kSize);
  1644. }
  1645. }
  1646. mock_clock_->SetCurrentTime(kCompactTime);
  1647. ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  1648. // We expect the data to remain the same and the blobs from the oldest N files
  1649. // to be moved to new files. Sequence numbers get zeroed out during the
  1650. // compaction.
  1651. VerifyDB(data);
  1652. for (auto &pair : blob_value_versions) {
  1653. KeyVersion &version = pair.second;
  1654. version.sequence = 0;
  1655. }
  1656. VerifyBaseDB(blob_value_versions);
  1657. const uint64_t cutoff = static_cast<uint64_t>(
  1658. bdb_options.garbage_collection_cutoff * kNumBlobFiles);
  1659. for (auto &pair : blob_index_versions) {
  1660. BlobIndexVersion &version = pair.second;
  1661. version.sequence = 0;
  1662. if (version.file_number == kInvalidBlobFileNumber) {
  1663. continue;
  1664. }
  1665. if (version.file_number > cutoff) {
  1666. continue;
  1667. }
  1668. version.file_number += kNumBlobFiles + 1;
  1669. }
  1670. VerifyBaseDBBlobIndex(blob_index_versions);
  1671. const Statistics *const statistics = options.statistics.get();
  1672. assert(statistics);
  1673. ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_FILES), cutoff);
  1674. ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_NEW_FILES), cutoff);
  1675. ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_FAILURES), 0);
  1676. ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_KEYS_RELOCATED),
  1677. cutoff * kBlobsPerFile);
  1678. ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_BYTES_RELOCATED),
  1679. cutoff * kBlobsPerFile * kLargeValueSize);
  1680. // At this point, we should have 128 immutable non-TTL files with file numbers
  1681. // 33..128 and 130..161. (129 was taken by the TTL blob file.)
  1682. {
  1683. auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
  1684. ASSERT_EQ(live_imm_files.size(), kNumBlobFiles);
  1685. for (size_t i = 0; i < kNumBlobFiles; ++i) {
  1686. uint64_t expected_file_number = i + cutoff + 1;
  1687. if (expected_file_number > kNumBlobFiles) {
  1688. ++expected_file_number;
  1689. }
  1690. ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), expected_file_number);
  1691. ASSERT_EQ(live_imm_files[i]->GetFileSize(),
  1692. kBlobFileSize + BlobLogFooter::kSize);
  1693. }
  1694. }
  1695. }
  1696. TEST_F(BlobDBTest, GarbageCollectionFailure) {
  1697. BlobDBOptions bdb_options;
  1698. bdb_options.min_blob_size = 0;
  1699. bdb_options.enable_garbage_collection = true;
  1700. bdb_options.garbage_collection_cutoff = 1.0;
  1701. bdb_options.disable_background_tasks = true;
  1702. Options db_options;
  1703. db_options.statistics = CreateDBStatistics();
  1704. Open(bdb_options, db_options);
  1705. // Write a couple of valid blobs.
  1706. ASSERT_OK(Put("foo", "bar"));
  1707. ASSERT_OK(Put("dead", "beef"));
  1708. // Write a fake blob reference into the base DB that points to a non-existing
  1709. // blob file.
  1710. std::string blob_index;
  1711. BlobIndex::EncodeBlob(&blob_index, /* file_number */ 1000, /* offset */ 1234,
  1712. /* size */ 5678, kNoCompression);
  1713. WriteBatch batch;
  1714. ASSERT_OK(WriteBatchInternal::PutBlobIndex(
  1715. &batch, blob_db_->DefaultColumnFamily()->GetID(), "key", blob_index));
  1716. ASSERT_OK(blob_db_->GetRootDB()->Write(WriteOptions(), &batch));
  1717. auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
  1718. ASSERT_EQ(blob_files.size(), 1);
  1719. auto blob_file = blob_files[0];
  1720. ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_file));
  1721. ASSERT_TRUE(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)
  1722. .IsIOError());
  1723. const Statistics *const statistics = db_options.statistics.get();
  1724. assert(statistics);
  1725. ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_FILES), 0);
  1726. ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_NEW_FILES), 1);
  1727. ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_FAILURES), 1);
  1728. ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_KEYS_RELOCATED), 2);
  1729. ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_BYTES_RELOCATED), 7);
  1730. }
  1731. // File should be evicted after expiration.
  1732. TEST_F(BlobDBTest, EvictExpiredFile) {
  1733. BlobDBOptions bdb_options;
  1734. bdb_options.ttl_range_secs = 100;
  1735. bdb_options.min_blob_size = 0;
  1736. bdb_options.disable_background_tasks = true;
  1737. Options options;
  1738. options.env = mock_env_.get();
  1739. Open(bdb_options, options);
  1740. mock_clock_->SetCurrentTime(50);
  1741. std::map<std::string, std::string> data;
  1742. ASSERT_OK(PutWithTTL("foo", "bar", 100, &data));
  1743. auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
  1744. ASSERT_EQ(1, blob_files.size());
  1745. auto blob_file = blob_files[0];
  1746. ASSERT_FALSE(blob_file->Immutable());
  1747. ASSERT_FALSE(blob_file->Obsolete());
  1748. VerifyDB(data);
  1749. mock_clock_->SetCurrentTime(250);
  1750. // The key should expired now.
  1751. blob_db_impl()->TEST_EvictExpiredFiles();
  1752. ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
  1753. ASSERT_EQ(1, blob_db_impl()->TEST_GetObsoleteFiles().size());
  1754. ASSERT_TRUE(blob_file->Immutable());
  1755. ASSERT_TRUE(blob_file->Obsolete());
  1756. blob_db_impl()->TEST_DeleteObsoleteFiles();
  1757. ASSERT_EQ(0, blob_db_impl()->TEST_GetBlobFiles().size());
  1758. ASSERT_EQ(0, blob_db_impl()->TEST_GetObsoleteFiles().size());
  1759. // Make sure we don't return garbage value after blob file being evicted,
  1760. // but the blob index still exists in the LSM tree.
  1761. std::string val;
  1762. ASSERT_TRUE(blob_db_->Get(ReadOptions(), "foo", &val).IsNotFound());
  1763. ASSERT_EQ("", val);
  1764. }
  1765. TEST_F(BlobDBTest, DisableFileDeletions) {
  1766. BlobDBOptions bdb_options;
  1767. bdb_options.disable_background_tasks = true;
  1768. Open(bdb_options);
  1769. std::map<std::string, std::string> data;
  1770. ASSERT_OK(Put("foo", "v", &data));
  1771. auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
  1772. ASSERT_EQ(1, blob_files.size());
  1773. auto blob_file = blob_files[0];
  1774. ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_file));
  1775. blob_db_impl()->TEST_ObsoleteBlobFile(blob_file);
  1776. ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
  1777. ASSERT_EQ(1, blob_db_impl()->TEST_GetObsoleteFiles().size());
  1778. // Call DisableFileDeletions twice.
  1779. ASSERT_OK(blob_db_->DisableFileDeletions());
  1780. ASSERT_OK(blob_db_->DisableFileDeletions());
  1781. // File deletions should be disabled.
  1782. blob_db_impl()->TEST_DeleteObsoleteFiles();
  1783. ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
  1784. ASSERT_EQ(1, blob_db_impl()->TEST_GetObsoleteFiles().size());
  1785. VerifyDB(data);
  1786. // Enable file deletions once. File deletion will later get enabled when
  1787. // `EnableFileDeletions` called for a second time.
  1788. ASSERT_OK(blob_db_->EnableFileDeletions());
  1789. blob_db_impl()->TEST_DeleteObsoleteFiles();
  1790. ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
  1791. ASSERT_EQ(1, blob_db_impl()->TEST_GetObsoleteFiles().size());
  1792. VerifyDB(data);
  1793. // Call EnableFileDeletions a second time.
  1794. ASSERT_OK(blob_db_->EnableFileDeletions());
  1795. blob_db_impl()->TEST_DeleteObsoleteFiles();
  1796. // File should be deleted by now.
  1797. ASSERT_EQ(0, blob_db_impl()->TEST_GetBlobFiles().size());
  1798. ASSERT_EQ(0, blob_db_impl()->TEST_GetObsoleteFiles().size());
  1799. VerifyDB({});
  1800. }
  1801. TEST_F(BlobDBTest, MaintainBlobFileToSstMapping) {
  1802. BlobDBOptions bdb_options;
  1803. bdb_options.enable_garbage_collection = true;
  1804. bdb_options.disable_background_tasks = true;
  1805. Open(bdb_options);
  1806. // Register some dummy blob files.
  1807. blob_db_impl()->TEST_AddDummyBlobFile(1, /* immutable_sequence */ 200);
  1808. blob_db_impl()->TEST_AddDummyBlobFile(2, /* immutable_sequence */ 300);
  1809. blob_db_impl()->TEST_AddDummyBlobFile(3, /* immutable_sequence */ 400);
  1810. blob_db_impl()->TEST_AddDummyBlobFile(4, /* immutable_sequence */ 500);
  1811. blob_db_impl()->TEST_AddDummyBlobFile(5, /* immutable_sequence */ 600);
  1812. // Initialize the blob <-> SST file mapping. First, add some SST files with
  1813. // blob file references, then some without.
  1814. std::vector<LiveFileMetaData> live_files;
  1815. for (uint64_t i = 1; i <= 10; ++i) {
  1816. LiveFileMetaData live_file;
  1817. live_file.file_number = i;
  1818. live_file.oldest_blob_file_number = ((i - 1) % 5) + 1;
  1819. live_files.emplace_back(live_file);
  1820. }
  1821. for (uint64_t i = 11; i <= 20; ++i) {
  1822. LiveFileMetaData live_file;
  1823. live_file.file_number = i;
  1824. live_files.emplace_back(live_file);
  1825. }
  1826. blob_db_impl()->TEST_InitializeBlobFileToSstMapping(live_files);
  1827. // Check that the blob <-> SST mappings have been correctly initialized.
  1828. auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
  1829. ASSERT_EQ(blob_files.size(), 5);
  1830. {
  1831. auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
  1832. ASSERT_EQ(live_imm_files.size(), 5);
  1833. for (size_t i = 0; i < 5; ++i) {
  1834. ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 1);
  1835. }
  1836. ASSERT_TRUE(blob_db_impl()->TEST_GetObsoleteFiles().empty());
  1837. }
  1838. {
  1839. const std::vector<std::unordered_set<uint64_t>> expected_sst_files{
  1840. {1, 6}, {2, 7}, {3, 8}, {4, 9}, {5, 10}};
  1841. const std::vector<bool> expected_obsolete{false, false, false, false,
  1842. false};
  1843. for (size_t i = 0; i < 5; ++i) {
  1844. const auto &blob_file = blob_files[i];
  1845. ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
  1846. ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);
  1847. }
  1848. auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
  1849. ASSERT_EQ(live_imm_files.size(), 5);
  1850. for (size_t i = 0; i < 5; ++i) {
  1851. ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 1);
  1852. }
  1853. ASSERT_TRUE(blob_db_impl()->TEST_GetObsoleteFiles().empty());
  1854. }
  1855. // Simulate a flush where the SST does not reference any blob files.
  1856. {
  1857. FlushJobInfo info{};
  1858. info.file_number = 21;
  1859. info.smallest_seqno = 1;
  1860. info.largest_seqno = 100;
  1861. blob_db_impl()->TEST_ProcessFlushJobInfo(info);
  1862. const std::vector<std::unordered_set<uint64_t>> expected_sst_files{
  1863. {1, 6}, {2, 7}, {3, 8}, {4, 9}, {5, 10}};
  1864. const std::vector<bool> expected_obsolete{false, false, false, false,
  1865. false};
  1866. for (size_t i = 0; i < 5; ++i) {
  1867. const auto &blob_file = blob_files[i];
  1868. ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
  1869. ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);
  1870. }
  1871. auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
  1872. ASSERT_EQ(live_imm_files.size(), 5);
  1873. for (size_t i = 0; i < 5; ++i) {
  1874. ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 1);
  1875. }
  1876. ASSERT_TRUE(blob_db_impl()->TEST_GetObsoleteFiles().empty());
  1877. }
  1878. // Simulate a flush where the SST references a blob file.
  1879. {
  1880. FlushJobInfo info{};
  1881. info.file_number = 22;
  1882. info.oldest_blob_file_number = 5;
  1883. info.smallest_seqno = 101;
  1884. info.largest_seqno = 200;
  1885. blob_db_impl()->TEST_ProcessFlushJobInfo(info);
  1886. const std::vector<std::unordered_set<uint64_t>> expected_sst_files{
  1887. {1, 6}, {2, 7}, {3, 8}, {4, 9}, {5, 10, 22}};
  1888. const std::vector<bool> expected_obsolete{false, false, false, false,
  1889. false};
  1890. for (size_t i = 0; i < 5; ++i) {
  1891. const auto &blob_file = blob_files[i];
  1892. ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
  1893. ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);
  1894. }
  1895. auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
  1896. ASSERT_EQ(live_imm_files.size(), 5);
  1897. for (size_t i = 0; i < 5; ++i) {
  1898. ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 1);
  1899. }
  1900. ASSERT_TRUE(blob_db_impl()->TEST_GetObsoleteFiles().empty());
  1901. }
  1902. // Simulate a compaction. Some inputs and outputs have blob file references,
  1903. // some don't. There is also a trivial move (which means the SST appears on
  1904. // both the input and the output list). Blob file 1 loses all its linked SSTs,
  1905. // and since it got marked immutable at sequence number 200 which has already
  1906. // been flushed, it can be marked obsolete.
  1907. {
  1908. CompactionJobInfo info{};
  1909. info.input_file_infos.emplace_back(CompactionFileInfo{1, 1, 1});
  1910. info.input_file_infos.emplace_back(CompactionFileInfo{1, 2, 2});
  1911. info.input_file_infos.emplace_back(CompactionFileInfo{1, 6, 1});
  1912. info.input_file_infos.emplace_back(
  1913. CompactionFileInfo{1, 11, kInvalidBlobFileNumber});
  1914. info.input_file_infos.emplace_back(CompactionFileInfo{1, 22, 5});
  1915. info.output_file_infos.emplace_back(CompactionFileInfo{2, 22, 5});
  1916. info.output_file_infos.emplace_back(CompactionFileInfo{2, 23, 3});
  1917. info.output_file_infos.emplace_back(
  1918. CompactionFileInfo{2, 24, kInvalidBlobFileNumber});
  1919. blob_db_impl()->TEST_ProcessCompactionJobInfo(info);
  1920. const std::vector<std::unordered_set<uint64_t>> expected_sst_files{
  1921. {}, {7}, {3, 8, 23}, {4, 9}, {5, 10, 22}};
  1922. const std::vector<bool> expected_obsolete{true, false, false, false, false};
  1923. for (size_t i = 0; i < 5; ++i) {
  1924. const auto &blob_file = blob_files[i];
  1925. ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
  1926. ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);
  1927. }
  1928. auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
  1929. ASSERT_EQ(live_imm_files.size(), 4);
  1930. for (size_t i = 0; i < 4; ++i) {
  1931. ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 2);
  1932. }
  1933. auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
  1934. ASSERT_EQ(obsolete_files.size(), 1);
  1935. ASSERT_EQ(obsolete_files[0]->BlobFileNumber(), 1);
  1936. }
  1937. // Simulate a failed compaction. No mappings should be updated.
  1938. {
  1939. CompactionJobInfo info{};
  1940. info.input_file_infos.emplace_back(CompactionFileInfo{1, 7, 2});
  1941. info.input_file_infos.emplace_back(CompactionFileInfo{2, 22, 5});
  1942. info.output_file_infos.emplace_back(CompactionFileInfo{2, 25, 3});
  1943. info.status = Status::Corruption();
  1944. blob_db_impl()->TEST_ProcessCompactionJobInfo(info);
  1945. const std::vector<std::unordered_set<uint64_t>> expected_sst_files{
  1946. {}, {7}, {3, 8, 23}, {4, 9}, {5, 10, 22}};
  1947. const std::vector<bool> expected_obsolete{true, false, false, false, false};
  1948. for (size_t i = 0; i < 5; ++i) {
  1949. const auto &blob_file = blob_files[i];
  1950. ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
  1951. ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);
  1952. }
  1953. auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
  1954. ASSERT_EQ(live_imm_files.size(), 4);
  1955. for (size_t i = 0; i < 4; ++i) {
  1956. ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 2);
  1957. }
  1958. auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
  1959. ASSERT_EQ(obsolete_files.size(), 1);
  1960. ASSERT_EQ(obsolete_files[0]->BlobFileNumber(), 1);
  1961. }
  1962. // Simulate another compaction. Blob file 2 loses all its linked SSTs
  1963. // but since it got marked immutable at sequence number 300 which hasn't
  1964. // been flushed yet, it cannot be marked obsolete at this point.
  1965. {
  1966. CompactionJobInfo info{};
  1967. info.input_file_infos.emplace_back(CompactionFileInfo{1, 7, 2});
  1968. info.input_file_infos.emplace_back(CompactionFileInfo{2, 22, 5});
  1969. info.output_file_infos.emplace_back(CompactionFileInfo{2, 25, 3});
  1970. blob_db_impl()->TEST_ProcessCompactionJobInfo(info);
  1971. const std::vector<std::unordered_set<uint64_t>> expected_sst_files{
  1972. {}, {}, {3, 8, 23, 25}, {4, 9}, {5, 10}};
  1973. const std::vector<bool> expected_obsolete{true, false, false, false, false};
  1974. for (size_t i = 0; i < 5; ++i) {
  1975. const auto &blob_file = blob_files[i];
  1976. ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
  1977. ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);
  1978. }
  1979. auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
  1980. ASSERT_EQ(live_imm_files.size(), 4);
  1981. for (size_t i = 0; i < 4; ++i) {
  1982. ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 2);
  1983. }
  1984. auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
  1985. ASSERT_EQ(obsolete_files.size(), 1);
  1986. ASSERT_EQ(obsolete_files[0]->BlobFileNumber(), 1);
  1987. }
  1988. // Simulate a flush with largest sequence number 300. This will make it
  1989. // possible to mark blob file 2 obsolete.
  1990. {
  1991. FlushJobInfo info{};
  1992. info.file_number = 26;
  1993. info.smallest_seqno = 201;
  1994. info.largest_seqno = 300;
  1995. blob_db_impl()->TEST_ProcessFlushJobInfo(info);
  1996. const std::vector<std::unordered_set<uint64_t>> expected_sst_files{
  1997. {}, {}, {3, 8, 23, 25}, {4, 9}, {5, 10}};
  1998. const std::vector<bool> expected_obsolete{true, true, false, false, false};
  1999. for (size_t i = 0; i < 5; ++i) {
  2000. const auto &blob_file = blob_files[i];
  2001. ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
  2002. ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);
  2003. }
  2004. auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
  2005. ASSERT_EQ(live_imm_files.size(), 3);
  2006. for (size_t i = 0; i < 3; ++i) {
  2007. ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 3);
  2008. }
  2009. auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
  2010. ASSERT_EQ(obsolete_files.size(), 2);
  2011. ASSERT_EQ(obsolete_files[0]->BlobFileNumber(), 1);
  2012. ASSERT_EQ(obsolete_files[1]->BlobFileNumber(), 2);
  2013. }
  2014. }
  2015. TEST_F(BlobDBTest, ShutdownWait) {
  2016. BlobDBOptions bdb_options;
  2017. bdb_options.ttl_range_secs = 100;
  2018. bdb_options.min_blob_size = 0;
  2019. bdb_options.disable_background_tasks = false;
  2020. Options options;
  2021. options.env = mock_env_.get();
  2022. SyncPoint::GetInstance()->LoadDependency({
  2023. {"BlobDBImpl::EvictExpiredFiles:0", "BlobDBTest.ShutdownWait:0"},
  2024. {"BlobDBTest.ShutdownWait:1", "BlobDBImpl::EvictExpiredFiles:1"},
  2025. {"BlobDBImpl::EvictExpiredFiles:2", "BlobDBTest.ShutdownWait:2"},
  2026. {"BlobDBTest.ShutdownWait:3", "BlobDBImpl::EvictExpiredFiles:3"},
  2027. });
  2028. // Force all tasks to be scheduled immediately.
  2029. SyncPoint::GetInstance()->SetCallBack(
  2030. "TimeQueue::Add:item.end", [&](void *arg) {
  2031. std::chrono::steady_clock::time_point *tp =
  2032. static_cast<std::chrono::steady_clock::time_point *>(arg);
  2033. *tp =
  2034. std::chrono::steady_clock::now() - std::chrono::milliseconds(10000);
  2035. });
  2036. SyncPoint::GetInstance()->SetCallBack(
  2037. "BlobDBImpl::EvictExpiredFiles:cb", [&](void * /*arg*/) {
  2038. // Sleep 3 ms to increase the chance of data race.
  2039. // We've synced up the code so that EvictExpiredFiles()
  2040. // is called concurrently with ~BlobDBImpl().
  2041. // ~BlobDBImpl() is supposed to wait for all background
  2042. // task to shutdown before doing anything else. In order
  2043. // to use the same test to reproduce a bug of the waiting
  2044. // logic, we wait a little bit here, so that TSAN can
  2045. // catch the data race.
  2046. // We should improve the test if we find a better way.
  2047. Env::Default()->SleepForMicroseconds(3000);
  2048. });
  2049. SyncPoint::GetInstance()->EnableProcessing();
  2050. Open(bdb_options, options);
  2051. mock_clock_->SetCurrentTime(50);
  2052. std::map<std::string, std::string> data;
  2053. ASSERT_OK(PutWithTTL("foo", "bar", 100, &data));
  2054. auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
  2055. ASSERT_EQ(1, blob_files.size());
  2056. auto blob_file = blob_files[0];
  2057. ASSERT_FALSE(blob_file->Immutable());
  2058. ASSERT_FALSE(blob_file->Obsolete());
  2059. VerifyDB(data);
  2060. TEST_SYNC_POINT("BlobDBTest.ShutdownWait:0");
  2061. mock_clock_->SetCurrentTime(250);
  2062. // The key should expired now.
  2063. TEST_SYNC_POINT("BlobDBTest.ShutdownWait:1");
  2064. TEST_SYNC_POINT("BlobDBTest.ShutdownWait:2");
  2065. TEST_SYNC_POINT("BlobDBTest.ShutdownWait:3");
  2066. Close();
  2067. SyncPoint::GetInstance()->DisableProcessing();
  2068. }
  2069. TEST_F(BlobDBTest, SyncBlobFileBeforeClose) {
  2070. Options options;
  2071. options.statistics = CreateDBStatistics();
  2072. BlobDBOptions blob_options;
  2073. blob_options.min_blob_size = 0;
  2074. blob_options.bytes_per_sync = 1 << 20;
  2075. blob_options.disable_background_tasks = true;
  2076. Open(blob_options, options);
  2077. ASSERT_OK(Put("foo", "bar"));
  2078. auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
  2079. ASSERT_EQ(blob_files.size(), 1);
  2080. ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[0]));
  2081. ASSERT_EQ(options.statistics->getTickerCount(BLOB_DB_BLOB_FILE_SYNCED), 1);
  2082. }
  2083. TEST_F(BlobDBTest, SyncBlobFileBeforeCloseIOError) {
  2084. Options options;
  2085. options.env = fault_injection_env_.get();
  2086. BlobDBOptions blob_options;
  2087. blob_options.min_blob_size = 0;
  2088. blob_options.bytes_per_sync = 1 << 20;
  2089. blob_options.disable_background_tasks = true;
  2090. Open(blob_options, options);
  2091. ASSERT_OK(Put("foo", "bar"));
  2092. auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
  2093. ASSERT_EQ(blob_files.size(), 1);
  2094. SyncPoint::GetInstance()->SetCallBack(
  2095. "BlobLogWriter::Sync", [this](void * /* arg */) {
  2096. fault_injection_env_->SetFilesystemActive(false, Status::IOError());
  2097. });
  2098. SyncPoint::GetInstance()->EnableProcessing();
  2099. const Status s = blob_db_impl()->TEST_CloseBlobFile(blob_files[0]);
  2100. fault_injection_env_->SetFilesystemActive(true);
  2101. SyncPoint::GetInstance()->DisableProcessing();
  2102. SyncPoint::GetInstance()->ClearAllCallBacks();
  2103. ASSERT_TRUE(s.IsIOError());
  2104. }
  2105. } // namespace ROCKSDB_NAMESPACE::blob_db
  2106. // A black-box test for the ttl wrapper around rocksdb
  2107. int main(int argc, char **argv) {
  2108. ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  2109. ::testing::InitGoogleTest(&argc, argv);
  2110. return RUN_ALL_TESTS();
  2111. }