db_blob_compaction_test.cc 30 KB


  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include "db/blob/blob_index.h"
  6. #include "db/blob/blob_log_format.h"
  7. #include "db/db_test_util.h"
  8. #include "port/stack_trace.h"
  9. #include "test_util/sync_point.h"
  10. namespace ROCKSDB_NAMESPACE {
  11. class DBBlobCompactionTest : public DBTestBase {
  12. public:
  13. explicit DBBlobCompactionTest()
  14. : DBTestBase("db_blob_compaction_test", /*env_do_fsync=*/false) {}
  15. const std::vector<InternalStats::CompactionStats>& GetCompactionStats() {
  16. VersionSet* const versions = dbfull()->GetVersionSet();
  17. assert(versions);
  18. assert(versions->GetColumnFamilySet());
  19. ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
  20. assert(cfd);
  21. const InternalStats* const internal_stats = cfd->internal_stats();
  22. assert(internal_stats);
  23. return internal_stats->TEST_GetCompactionStats();
  24. }
  25. };
  26. namespace {
  27. class FilterByKeyLength : public CompactionFilter {
  28. public:
  29. explicit FilterByKeyLength(size_t len) : length_threshold_(len) {}
  30. const char* Name() const override {
  31. return "rocksdb.compaction.filter.by.key.length";
  32. }
  33. CompactionFilter::Decision FilterBlobByKey(
  34. int /*level*/, const Slice& key, std::string* /*new_value*/,
  35. std::string* /*skip_until*/) const override {
  36. if (key.size() < length_threshold_) {
  37. return CompactionFilter::Decision::kRemove;
  38. }
  39. return CompactionFilter::Decision::kKeep;
  40. }
  41. private:
  42. size_t length_threshold_;
  43. };
  44. class FilterByValueLength : public CompactionFilter {
  45. public:
  46. explicit FilterByValueLength(size_t len) : length_threshold_(len) {}
  47. const char* Name() const override {
  48. return "rocksdb.compaction.filter.by.value.length";
  49. }
  50. CompactionFilter::Decision FilterV2(
  51. int /*level*/, const Slice& /*key*/, ValueType /*value_type*/,
  52. const Slice& existing_value, std::string* /*new_value*/,
  53. std::string* /*skip_until*/) const override {
  54. if (existing_value.size() < length_threshold_) {
  55. return CompactionFilter::Decision::kRemove;
  56. }
  57. return CompactionFilter::Decision::kKeep;
  58. }
  59. private:
  60. size_t length_threshold_;
  61. };
  62. class BadBlobCompactionFilter : public CompactionFilter {
  63. public:
  64. explicit BadBlobCompactionFilter(std::string prefix,
  65. CompactionFilter::Decision filter_by_key,
  66. CompactionFilter::Decision filter_v2)
  67. : prefix_(std::move(prefix)),
  68. filter_blob_by_key_(filter_by_key),
  69. filter_v2_(filter_v2) {}
  70. const char* Name() const override { return "rocksdb.compaction.filter.bad"; }
  71. CompactionFilter::Decision FilterBlobByKey(
  72. int /*level*/, const Slice& key, std::string* /*new_value*/,
  73. std::string* /*skip_until*/) const override {
  74. if (key.size() >= prefix_.size() &&
  75. 0 == strncmp(prefix_.data(), key.data(), prefix_.size())) {
  76. return CompactionFilter::Decision::kUndetermined;
  77. }
  78. return filter_blob_by_key_;
  79. }
  80. CompactionFilter::Decision FilterV2(
  81. int /*level*/, const Slice& /*key*/, ValueType /*value_type*/,
  82. const Slice& /*existing_value*/, std::string* /*new_value*/,
  83. std::string* /*skip_until*/) const override {
  84. return filter_v2_;
  85. }
  86. private:
  87. const std::string prefix_;
  88. const CompactionFilter::Decision filter_blob_by_key_;
  89. const CompactionFilter::Decision filter_v2_;
  90. };
  91. class ValueBlindWriteFilter : public CompactionFilter {
  92. public:
  93. explicit ValueBlindWriteFilter(std::string new_val)
  94. : new_value_(std::move(new_val)) {}
  95. const char* Name() const override {
  96. return "rocksdb.compaction.filter.blind.write";
  97. }
  98. CompactionFilter::Decision FilterBlobByKey(
  99. int level, const Slice& key, std::string* new_value,
  100. std::string* skip_until) const override;
  101. private:
  102. const std::string new_value_;
  103. };
  104. CompactionFilter::Decision ValueBlindWriteFilter::FilterBlobByKey(
  105. int /*level*/, const Slice& /*key*/, std::string* new_value,
  106. std::string* /*skip_until*/) const {
  107. assert(new_value);
  108. new_value->assign(new_value_);
  109. return CompactionFilter::Decision::kChangeValue;
  110. }
  111. class ValueMutationFilter : public CompactionFilter {
  112. public:
  113. explicit ValueMutationFilter(std::string padding)
  114. : padding_(std::move(padding)) {}
  115. const char* Name() const override {
  116. return "rocksdb.compaction.filter.value.mutation";
  117. }
  118. CompactionFilter::Decision FilterV2(int level, const Slice& key,
  119. ValueType value_type,
  120. const Slice& existing_value,
  121. std::string* new_value,
  122. std::string* skip_until) const override;
  123. private:
  124. const std::string padding_;
  125. };
  126. CompactionFilter::Decision ValueMutationFilter::FilterV2(
  127. int /*level*/, const Slice& /*key*/, ValueType value_type,
  128. const Slice& existing_value, std::string* new_value,
  129. std::string* /*skip_until*/) const {
  130. assert(CompactionFilter::ValueType::kBlobIndex != value_type);
  131. if (CompactionFilter::ValueType::kValue != value_type) {
  132. return CompactionFilter::Decision::kKeep;
  133. }
  134. assert(new_value);
  135. new_value->assign(existing_value.data(), existing_value.size());
  136. new_value->append(padding_);
  137. return CompactionFilter::Decision::kChangeValue;
  138. }
  139. class AlwaysKeepFilter : public CompactionFilter {
  140. public:
  141. explicit AlwaysKeepFilter() = default;
  142. const char* Name() const override {
  143. return "rocksdb.compaction.filter.always.keep";
  144. }
  145. CompactionFilter::Decision FilterV2(
  146. int /*level*/, const Slice& /*key*/, ValueType /*value_type*/,
  147. const Slice& /*existing_value*/, std::string* /*new_value*/,
  148. std::string* /*skip_until*/) const override {
  149. return CompactionFilter::Decision::kKeep;
  150. }
  151. };
  152. class SkipUntilFilter : public CompactionFilter {
  153. public:
  154. explicit SkipUntilFilter(std::string skip_until)
  155. : skip_until_(std::move(skip_until)) {}
  156. const char* Name() const override {
  157. return "rocksdb.compaction.filter.skip.until";
  158. }
  159. CompactionFilter::Decision FilterV2(int /* level */, const Slice& /* key */,
  160. ValueType /* value_type */,
  161. const Slice& /* existing_value */,
  162. std::string* /* new_value */,
  163. std::string* skip_until) const override {
  164. assert(skip_until);
  165. *skip_until = skip_until_;
  166. return CompactionFilter::Decision::kRemoveAndSkipUntil;
  167. }
  168. private:
  169. std::string skip_until_;
  170. };
  171. } // anonymous namespace
  172. class DBBlobBadCompactionFilterTest
  173. : public DBBlobCompactionTest,
  174. public testing::WithParamInterface<
  175. std::tuple<std::string, CompactionFilter::Decision,
  176. CompactionFilter::Decision>> {
  177. public:
  178. explicit DBBlobBadCompactionFilterTest()
  179. : compaction_filter_guard_(new BadBlobCompactionFilter(
  180. std::get<0>(GetParam()), std::get<1>(GetParam()),
  181. std::get<2>(GetParam()))) {}
  182. protected:
  183. std::unique_ptr<CompactionFilter> compaction_filter_guard_;
  184. };
// Instantiates DBBlobBadCompactionFilterTest for every combination of:
//   - key prefix "a" (keys with this prefix make FilterBlobByKey return
//     kUndetermined),
//   - the decision FilterBlobByKey returns for all other keys,
//   - the decision FilterV2 returns.
INSTANTIATE_TEST_CASE_P(
    BadCompactionFilter, DBBlobBadCompactionFilterTest,
    testing::Combine(
        testing::Values("a"),
        testing::Values(CompactionFilter::Decision::kChangeBlobIndex,
                        CompactionFilter::Decision::kIOError),
        testing::Values(CompactionFilter::Decision::kUndetermined,
                        CompactionFilter::Decision::kChangeBlobIndex,
                        CompactionFilter::Decision::kIOError)));
  194. TEST_F(DBBlobCompactionTest, FilterByKeyLength) {
  195. Options options = GetDefaultOptions();
  196. options.enable_blob_files = true;
  197. options.min_blob_size = 0;
  198. options.create_if_missing = true;
  199. constexpr size_t kKeyLength = 2;
  200. std::unique_ptr<CompactionFilter> compaction_filter_guard(
  201. new FilterByKeyLength(kKeyLength));
  202. options.compaction_filter = compaction_filter_guard.get();
  203. constexpr char short_key[] = "a";
  204. constexpr char long_key[] = "abc";
  205. constexpr char blob_value[] = "value";
  206. DestroyAndReopen(options);
  207. ASSERT_OK(Put(short_key, blob_value));
  208. ASSERT_OK(Put(long_key, blob_value));
  209. ASSERT_OK(Flush());
  210. CompactRangeOptions cro;
  211. ASSERT_OK(db_->CompactRange(cro, /*begin=*/nullptr, /*end=*/nullptr));
  212. std::string value;
  213. ASSERT_TRUE(db_->Get(ReadOptions(), short_key, &value).IsNotFound());
  214. value.clear();
  215. ASSERT_OK(db_->Get(ReadOptions(), long_key, &value));
  216. ASSERT_EQ("value", value);
  217. const auto& compaction_stats = GetCompactionStats();
  218. ASSERT_GE(compaction_stats.size(), 2);
  219. // Filter decides between kKeep and kRemove solely based on key;
  220. // this involves neither reading nor writing blobs
  221. ASSERT_EQ(compaction_stats[1].bytes_read_blob, 0);
  222. ASSERT_EQ(compaction_stats[1].bytes_written_blob, 0);
  223. Close();
  224. }
  225. TEST_F(DBBlobCompactionTest, FilterByValueLength) {
  226. Options options = GetDefaultOptions();
  227. options.enable_blob_files = true;
  228. options.min_blob_size = 5;
  229. options.create_if_missing = true;
  230. constexpr size_t kValueLength = 5;
  231. std::unique_ptr<CompactionFilter> compaction_filter_guard(
  232. new FilterByValueLength(kValueLength));
  233. options.compaction_filter = compaction_filter_guard.get();
  234. const std::vector<std::string> short_value_keys = {"a", "e", "j"};
  235. constexpr char short_value[] = "val";
  236. const std::vector<std::string> long_value_keys = {"b", "f", "k"};
  237. constexpr char long_value[] = "valuevalue";
  238. DestroyAndReopen(options);
  239. for (size_t i = 0; i < short_value_keys.size(); ++i) {
  240. ASSERT_OK(Put(short_value_keys[i], short_value));
  241. }
  242. for (size_t i = 0; i < short_value_keys.size(); ++i) {
  243. ASSERT_OK(Put(long_value_keys[i], long_value));
  244. }
  245. ASSERT_OK(Flush());
  246. CompactRangeOptions cro;
  247. ASSERT_OK(db_->CompactRange(cro, /*begin=*/nullptr, /*end=*/nullptr));
  248. std::string value;
  249. for (size_t i = 0; i < short_value_keys.size(); ++i) {
  250. ASSERT_TRUE(
  251. db_->Get(ReadOptions(), short_value_keys[i], &value).IsNotFound());
  252. value.clear();
  253. }
  254. for (size_t i = 0; i < long_value_keys.size(); ++i) {
  255. ASSERT_OK(db_->Get(ReadOptions(), long_value_keys[i], &value));
  256. ASSERT_EQ(long_value, value);
  257. }
  258. const auto& compaction_stats = GetCompactionStats();
  259. ASSERT_GE(compaction_stats.size(), 2);
  260. // Filter decides between kKeep and kRemove based on value;
  261. // this involves reading but not writing blobs
  262. ASSERT_GT(compaction_stats[1].bytes_read_blob, 0);
  263. ASSERT_EQ(compaction_stats[1].bytes_written_blob, 0);
  264. Close();
  265. }
  266. TEST_F(DBBlobCompactionTest, BlobCompactWithStartingLevel) {
  267. Options options = GetDefaultOptions();
  268. options.enable_blob_files = true;
  269. options.min_blob_size = 1000;
  270. options.blob_file_starting_level = 5;
  271. options.create_if_missing = true;
  272. // Open DB with fixed-prefix sst-partitioner so that compaction will cut
  273. // new table file when encountering a new key whose 1-byte prefix changes.
  274. constexpr size_t key_len = 1;
  275. options.sst_partitioner_factory =
  276. NewSstPartitionerFixedPrefixFactory(key_len);
  277. ASSERT_OK(TryReopen(options));
  278. constexpr size_t blob_size = 3000;
  279. constexpr char first_key[] = "a";
  280. const std::string first_blob(blob_size, 'a');
  281. ASSERT_OK(Put(first_key, first_blob));
  282. constexpr char second_key[] = "b";
  283. const std::string second_blob(2 * blob_size, 'b');
  284. ASSERT_OK(Put(second_key, second_blob));
  285. constexpr char third_key[] = "d";
  286. const std::string third_blob(blob_size, 'd');
  287. ASSERT_OK(Put(third_key, third_blob));
  288. ASSERT_OK(Flush());
  289. constexpr char fourth_key[] = "c";
  290. const std::string fourth_blob(blob_size, 'c');
  291. ASSERT_OK(Put(fourth_key, fourth_blob));
  292. ASSERT_OK(Flush());
  293. ASSERT_EQ(0, GetBlobFileNumbers().size());
  294. ASSERT_EQ(2, NumTableFilesAtLevel(/*level=*/0));
  295. ASSERT_EQ(0, NumTableFilesAtLevel(/*level=*/1));
  296. ASSERT_OK(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
  297. /*end=*/nullptr));
  298. // No blob file should be created since blob_file_starting_level is 5.
  299. ASSERT_EQ(0, GetBlobFileNumbers().size());
  300. ASSERT_EQ(0, NumTableFilesAtLevel(/*level=*/0));
  301. ASSERT_EQ(4, NumTableFilesAtLevel(/*level=*/1));
  302. {
  303. options.blob_file_starting_level = 1;
  304. DestroyAndReopen(options);
  305. ASSERT_OK(Put(first_key, first_blob));
  306. ASSERT_OK(Put(second_key, second_blob));
  307. ASSERT_OK(Put(third_key, third_blob));
  308. ASSERT_OK(Flush());
  309. ASSERT_OK(Put(fourth_key, fourth_blob));
  310. ASSERT_OK(Flush());
  311. ASSERT_EQ(0, GetBlobFileNumbers().size());
  312. ASSERT_EQ(2, NumTableFilesAtLevel(/*level=*/0));
  313. ASSERT_EQ(0, NumTableFilesAtLevel(/*level=*/1));
  314. ASSERT_OK(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
  315. /*end=*/nullptr));
  316. // The compaction's output level equals to blob_file_starting_level.
  317. ASSERT_EQ(1, GetBlobFileNumbers().size());
  318. ASSERT_EQ(0, NumTableFilesAtLevel(/*level=*/0));
  319. ASSERT_EQ(4, NumTableFilesAtLevel(/*level=*/1));
  320. }
  321. Close();
  322. }
  323. TEST_F(DBBlobCompactionTest, BlindWriteFilter) {
  324. Options options = GetDefaultOptions();
  325. options.enable_blob_files = true;
  326. options.min_blob_size = 0;
  327. options.create_if_missing = true;
  328. constexpr char new_blob_value[] = "new_blob_value";
  329. std::unique_ptr<CompactionFilter> compaction_filter_guard(
  330. new ValueBlindWriteFilter(new_blob_value));
  331. options.compaction_filter = compaction_filter_guard.get();
  332. DestroyAndReopen(options);
  333. const std::vector<std::string> keys = {"a", "b", "c"};
  334. const std::vector<std::string> values = {"a_value", "b_value", "c_value"};
  335. assert(keys.size() == values.size());
  336. for (size_t i = 0; i < keys.size(); ++i) {
  337. ASSERT_OK(Put(keys[i], values[i]));
  338. }
  339. ASSERT_OK(Flush());
  340. ASSERT_OK(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
  341. /*end=*/nullptr));
  342. for (const auto& key : keys) {
  343. ASSERT_EQ(new_blob_value, Get(key));
  344. }
  345. const auto& compaction_stats = GetCompactionStats();
  346. ASSERT_GE(compaction_stats.size(), 2);
  347. // Filter unconditionally changes value in FilterBlobByKey;
  348. // this involves writing but not reading blobs
  349. ASSERT_EQ(compaction_stats[1].bytes_read_blob, 0);
  350. ASSERT_GT(compaction_stats[1].bytes_written_blob, 0);
  351. Close();
  352. }
  353. TEST_F(DBBlobCompactionTest, SkipUntilFilter) {
  354. Options options = GetDefaultOptions();
  355. options.enable_blob_files = true;
  356. std::unique_ptr<CompactionFilter> compaction_filter_guard(
  357. new SkipUntilFilter("z"));
  358. options.compaction_filter = compaction_filter_guard.get();
  359. Reopen(options);
  360. const std::vector<std::string> keys{"a", "b", "c"};
  361. const std::vector<std::string> values{"a_value", "b_value", "c_value"};
  362. assert(keys.size() == values.size());
  363. for (size_t i = 0; i < keys.size(); ++i) {
  364. ASSERT_OK(Put(keys[i], values[i]));
  365. }
  366. ASSERT_OK(Flush());
  367. int process_in_flow_called = 0;
  368. SyncPoint::GetInstance()->SetCallBack(
  369. "BlobCountingIterator::UpdateAndCountBlobIfNeeded:ProcessInFlow",
  370. [&process_in_flow_called](void* /* arg */) { ++process_in_flow_called; });
  371. SyncPoint::GetInstance()->EnableProcessing();
  372. ASSERT_OK(db_->CompactRange(CompactRangeOptions(), /* begin */ nullptr,
  373. /* end */ nullptr));
  374. SyncPoint::GetInstance()->DisableProcessing();
  375. SyncPoint::GetInstance()->ClearAllCallBacks();
  376. for (const auto& key : keys) {
  377. ASSERT_EQ(Get(key), "NOT_FOUND");
  378. }
  379. // Make sure SkipUntil was performed using iteration rather than Seek
  380. ASSERT_EQ(process_in_flow_called, keys.size());
  381. Close();
  382. }
  383. TEST_P(DBBlobBadCompactionFilterTest, BadDecisionFromCompactionFilter) {
  384. Options options = GetDefaultOptions();
  385. options.enable_blob_files = true;
  386. options.min_blob_size = 0;
  387. options.create_if_missing = true;
  388. options.compaction_filter = compaction_filter_guard_.get();
  389. DestroyAndReopen(options);
  390. ASSERT_OK(Put("b", "value"));
  391. ASSERT_OK(Flush());
  392. ASSERT_TRUE(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
  393. /*end=*/nullptr)
  394. .IsNotSupported());
  395. Close();
  396. DestroyAndReopen(options);
  397. std::string key(std::get<0>(GetParam()));
  398. ASSERT_OK(Put(key, "value"));
  399. ASSERT_OK(Flush());
  400. ASSERT_TRUE(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
  401. /*end=*/nullptr)
  402. .IsNotSupported());
  403. Close();
  404. }
  405. TEST_F(DBBlobCompactionTest, CompactionFilter_InlinedTTLIndex) {
  406. Options options = GetDefaultOptions();
  407. options.create_if_missing = true;
  408. options.enable_blob_files = true;
  409. options.min_blob_size = 0;
  410. std::unique_ptr<CompactionFilter> compaction_filter_guard(
  411. new ValueMutationFilter(""));
  412. options.compaction_filter = compaction_filter_guard.get();
  413. DestroyAndReopen(options);
  414. constexpr char key[] = "key";
  415. constexpr char blob[] = "blob";
  416. // Fake an inlined TTL blob index.
  417. std::string blob_index;
  418. constexpr uint64_t expiration = 1234567890;
  419. BlobIndex::EncodeInlinedTTL(&blob_index, expiration, blob);
  420. WriteBatch batch;
  421. ASSERT_OK(WriteBatchInternal::PutBlobIndex(&batch, 0, key, blob_index));
  422. ASSERT_OK(db_->Write(WriteOptions(), &batch));
  423. ASSERT_OK(Flush());
  424. ASSERT_TRUE(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
  425. /*end=*/nullptr)
  426. .IsCorruption());
  427. Close();
  428. }
  429. TEST_F(DBBlobCompactionTest, CompactionFilter) {
  430. Options options = GetDefaultOptions();
  431. options.create_if_missing = true;
  432. options.enable_blob_files = true;
  433. options.min_blob_size = 0;
  434. constexpr char padding[] = "_delta";
  435. std::unique_ptr<CompactionFilter> compaction_filter_guard(
  436. new ValueMutationFilter(padding));
  437. options.compaction_filter = compaction_filter_guard.get();
  438. DestroyAndReopen(options);
  439. const std::vector<std::pair<std::string, std::string>> kvs = {
  440. {"a", "a_value"}, {"b", "b_value"}, {"c", "c_value"}};
  441. for (const auto& kv : kvs) {
  442. ASSERT_OK(Put(kv.first, kv.second));
  443. }
  444. ASSERT_OK(Flush());
  445. ASSERT_OK(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
  446. /*end=*/nullptr));
  447. for (const auto& kv : kvs) {
  448. ASSERT_EQ(kv.second + std::string(padding), Get(kv.first));
  449. }
  450. const auto& compaction_stats = GetCompactionStats();
  451. ASSERT_GE(compaction_stats.size(), 2);
  452. // Filter changes the value using the previous value in FilterV2;
  453. // this involves reading and writing blobs
  454. ASSERT_GT(compaction_stats[1].bytes_read_blob, 0);
  455. ASSERT_GT(compaction_stats[1].bytes_written_blob, 0);
  456. Close();
  457. }
  458. TEST_F(DBBlobCompactionTest, CorruptedBlobIndex) {
  459. Options options = GetDefaultOptions();
  460. options.create_if_missing = true;
  461. options.enable_blob_files = true;
  462. options.min_blob_size = 0;
  463. std::unique_ptr<CompactionFilter> compaction_filter_guard(
  464. new ValueMutationFilter(""));
  465. options.compaction_filter = compaction_filter_guard.get();
  466. DestroyAndReopen(options);
  467. constexpr char key[] = "key";
  468. constexpr char blob[] = "blob";
  469. ASSERT_OK(Put(key, blob));
  470. ASSERT_OK(Flush());
  471. SyncPoint::GetInstance()->SetCallBack(
  472. "CompactionIterator::InvokeFilterIfNeeded::TamperWithBlobIndex",
  473. [](void* arg) {
  474. Slice* const blob_index = static_cast<Slice*>(arg);
  475. assert(blob_index);
  476. assert(!blob_index->empty());
  477. blob_index->remove_prefix(1);
  478. });
  479. SyncPoint::GetInstance()->EnableProcessing();
  480. ASSERT_TRUE(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
  481. /*end=*/nullptr)
  482. .IsCorruption());
  483. SyncPoint::GetInstance()->DisableProcessing();
  484. SyncPoint::GetInstance()->ClearAllCallBacks();
  485. Close();
  486. }
  487. TEST_F(DBBlobCompactionTest, CompactionFilterReadBlobAndKeep) {
  488. Options options = GetDefaultOptions();
  489. options.create_if_missing = true;
  490. options.enable_blob_files = true;
  491. options.min_blob_size = 0;
  492. std::unique_ptr<CompactionFilter> compaction_filter_guard(
  493. new AlwaysKeepFilter());
  494. options.compaction_filter = compaction_filter_guard.get();
  495. DestroyAndReopen(options);
  496. ASSERT_OK(Put("foo", "foo_value"));
  497. ASSERT_OK(Flush());
  498. std::vector<uint64_t> blob_files = GetBlobFileNumbers();
  499. ASSERT_EQ(1, blob_files.size());
  500. ASSERT_OK(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
  501. /*end=*/nullptr));
  502. ASSERT_EQ(blob_files, GetBlobFileNumbers());
  503. const auto& compaction_stats = GetCompactionStats();
  504. ASSERT_GE(compaction_stats.size(), 2);
  505. // Filter decides to keep the existing value in FilterV2;
  506. // this involves reading but not writing blobs
  507. ASSERT_GT(compaction_stats[1].bytes_read_blob, 0);
  508. ASSERT_EQ(compaction_stats[1].bytes_written_blob, 0);
  509. Close();
  510. }
  511. TEST_F(DBBlobCompactionTest, TrackGarbage) {
  512. Options options = GetDefaultOptions();
  513. options.enable_blob_files = true;
  514. Reopen(options);
  515. // First table+blob file pair: 4 blobs with different keys
  516. constexpr char first_key[] = "first_key";
  517. constexpr char first_value[] = "first_value";
  518. constexpr char second_key[] = "second_key";
  519. constexpr char second_value[] = "second_value";
  520. constexpr char third_key[] = "third_key";
  521. constexpr char third_value[] = "third_value";
  522. constexpr char fourth_key[] = "fourth_key";
  523. constexpr char fourth_value[] = "fourth_value";
  524. ASSERT_OK(Put(first_key, first_value));
  525. ASSERT_OK(Put(second_key, second_value));
  526. ASSERT_OK(Put(third_key, third_value));
  527. ASSERT_OK(Put(fourth_key, fourth_value));
  528. ASSERT_OK(Flush());
  529. // Second table+blob file pair: overwrite 2 existing keys
  530. constexpr char new_first_value[] = "new_first_value";
  531. constexpr char new_second_value[] = "new_second_value";
  532. ASSERT_OK(Put(first_key, new_first_value));
  533. ASSERT_OK(Put(second_key, new_second_value));
  534. ASSERT_OK(Flush());
  535. // Compact them together. The first blob file should have 2 garbage blobs
  536. // corresponding to the 2 overwritten keys.
  537. constexpr Slice* begin = nullptr;
  538. constexpr Slice* end = nullptr;
  539. ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end));
  540. VersionSet* const versions = dbfull()->GetVersionSet();
  541. assert(versions);
  542. assert(versions->GetColumnFamilySet());
  543. ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
  544. assert(cfd);
  545. Version* const current = cfd->current();
  546. assert(current);
  547. const VersionStorageInfo* const storage_info = current->storage_info();
  548. assert(storage_info);
  549. const auto& blob_files = storage_info->GetBlobFiles();
  550. ASSERT_EQ(blob_files.size(), 2);
  551. {
  552. const auto& meta = blob_files.front();
  553. assert(meta);
  554. constexpr uint64_t first_expected_bytes =
  555. sizeof(first_value) - 1 +
  556. BlobLogRecord::CalculateAdjustmentForRecordHeader(sizeof(first_key) -
  557. 1);
  558. constexpr uint64_t second_expected_bytes =
  559. sizeof(second_value) - 1 +
  560. BlobLogRecord::CalculateAdjustmentForRecordHeader(sizeof(second_key) -
  561. 1);
  562. constexpr uint64_t third_expected_bytes =
  563. sizeof(third_value) - 1 +
  564. BlobLogRecord::CalculateAdjustmentForRecordHeader(sizeof(third_key) -
  565. 1);
  566. constexpr uint64_t fourth_expected_bytes =
  567. sizeof(fourth_value) - 1 +
  568. BlobLogRecord::CalculateAdjustmentForRecordHeader(sizeof(fourth_key) -
  569. 1);
  570. ASSERT_EQ(meta->GetTotalBlobCount(), 4);
  571. ASSERT_EQ(meta->GetTotalBlobBytes(),
  572. first_expected_bytes + second_expected_bytes +
  573. third_expected_bytes + fourth_expected_bytes);
  574. ASSERT_EQ(meta->GetGarbageBlobCount(), 2);
  575. ASSERT_EQ(meta->GetGarbageBlobBytes(),
  576. first_expected_bytes + second_expected_bytes);
  577. }
  578. {
  579. const auto& meta = blob_files.back();
  580. assert(meta);
  581. constexpr uint64_t new_first_expected_bytes =
  582. sizeof(new_first_value) - 1 +
  583. BlobLogRecord::CalculateAdjustmentForRecordHeader(sizeof(first_key) -
  584. 1);
  585. constexpr uint64_t new_second_expected_bytes =
  586. sizeof(new_second_value) - 1 +
  587. BlobLogRecord::CalculateAdjustmentForRecordHeader(sizeof(second_key) -
  588. 1);
  589. ASSERT_EQ(meta->GetTotalBlobCount(), 2);
  590. ASSERT_EQ(meta->GetTotalBlobBytes(),
  591. new_first_expected_bytes + new_second_expected_bytes);
  592. ASSERT_EQ(meta->GetGarbageBlobCount(), 0);
  593. ASSERT_EQ(meta->GetGarbageBlobBytes(), 0);
  594. }
  595. }
  596. TEST_F(DBBlobCompactionTest, MergeBlobWithBase) {
  597. Options options = GetDefaultOptions();
  598. options.enable_blob_files = true;
  599. options.min_blob_size = 0;
  600. options.merge_operator = MergeOperators::CreateStringAppendOperator();
  601. options.disable_auto_compactions = true;
  602. Reopen(options);
  603. ASSERT_OK(Put("Key1", "v1_1"));
  604. ASSERT_OK(Put("Key2", "v2_1"));
  605. ASSERT_OK(Flush());
  606. ASSERT_OK(Merge("Key1", "v1_2"));
  607. ASSERT_OK(Merge("Key2", "v2_2"));
  608. ASSERT_OK(Flush());
  609. ASSERT_OK(Merge("Key1", "v1_3"));
  610. ASSERT_OK(Flush());
  611. ASSERT_OK(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
  612. /*end=*/nullptr));
  613. ASSERT_EQ(Get("Key1"), "v1_1,v1_2,v1_3");
  614. ASSERT_EQ(Get("Key2"), "v2_1,v2_2");
  615. Close();
  616. }
  617. TEST_F(DBBlobCompactionTest, CompactionReadaheadGarbageCollection) {
  618. Options options = GetDefaultOptions();
  619. options.enable_blob_files = true;
  620. options.min_blob_size = 0;
  621. options.enable_blob_garbage_collection = true;
  622. options.blob_garbage_collection_age_cutoff = 1.0;
  623. options.blob_compaction_readahead_size = 1 << 10;
  624. options.disable_auto_compactions = true;
  625. Reopen(options);
  626. ASSERT_OK(Put("key", "lime"));
  627. ASSERT_OK(Put("foo", "bar"));
  628. ASSERT_OK(Flush());
  629. ASSERT_OK(Put("key", "pie"));
  630. ASSERT_OK(Put("foo", "baz"));
  631. ASSERT_OK(Flush());
  632. size_t num_non_prefetch_reads = 0;
  633. SyncPoint::GetInstance()->SetCallBack(
  634. "BlobFileReader::GetBlob:ReadFromFile",
  635. [&num_non_prefetch_reads](void* /* arg */) { ++num_non_prefetch_reads; });
  636. SyncPoint::GetInstance()->EnableProcessing();
  637. constexpr Slice* begin = nullptr;
  638. constexpr Slice* end = nullptr;
  639. ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end));
  640. SyncPoint::GetInstance()->DisableProcessing();
  641. SyncPoint::GetInstance()->ClearAllCallBacks();
  642. ASSERT_EQ(Get("key"), "pie");
  643. ASSERT_EQ(Get("foo"), "baz");
  644. ASSERT_EQ(num_non_prefetch_reads, 0);
  645. Close();
  646. }
  647. TEST_F(DBBlobCompactionTest, CompactionReadaheadFilter) {
  648. Options options = GetDefaultOptions();
  649. std::unique_ptr<CompactionFilter> compaction_filter_guard(
  650. new ValueMutationFilter("pie"));
  651. options.compaction_filter = compaction_filter_guard.get();
  652. options.enable_blob_files = true;
  653. options.min_blob_size = 0;
  654. options.blob_compaction_readahead_size = 1 << 10;
  655. options.disable_auto_compactions = true;
  656. Reopen(options);
  657. ASSERT_OK(Put("key", "lime"));
  658. ASSERT_OK(Put("foo", "bar"));
  659. ASSERT_OK(Flush());
  660. size_t num_non_prefetch_reads = 0;
  661. SyncPoint::GetInstance()->SetCallBack(
  662. "BlobFileReader::GetBlob:ReadFromFile",
  663. [&num_non_prefetch_reads](void* /* arg */) { ++num_non_prefetch_reads; });
  664. SyncPoint::GetInstance()->EnableProcessing();
  665. constexpr Slice* begin = nullptr;
  666. constexpr Slice* end = nullptr;
  667. ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end));
  668. SyncPoint::GetInstance()->DisableProcessing();
  669. SyncPoint::GetInstance()->ClearAllCallBacks();
  670. ASSERT_EQ(Get("key"), "limepie");
  671. ASSERT_EQ(Get("foo"), "barpie");
  672. ASSERT_EQ(num_non_prefetch_reads, 0);
  673. Close();
  674. }
  675. TEST_F(DBBlobCompactionTest, CompactionReadaheadMerge) {
  676. Options options = GetDefaultOptions();
  677. options.enable_blob_files = true;
  678. options.min_blob_size = 0;
  679. options.blob_compaction_readahead_size = 1 << 10;
  680. options.merge_operator = MergeOperators::CreateStringAppendOperator();
  681. options.disable_auto_compactions = true;
  682. Reopen(options);
  683. ASSERT_OK(Put("key", "lime"));
  684. ASSERT_OK(Put("foo", "bar"));
  685. ASSERT_OK(Flush());
  686. ASSERT_OK(Merge("key", "pie"));
  687. ASSERT_OK(Merge("foo", "baz"));
  688. ASSERT_OK(Flush());
  689. size_t num_non_prefetch_reads = 0;
  690. SyncPoint::GetInstance()->SetCallBack(
  691. "BlobFileReader::GetBlob:ReadFromFile",
  692. [&num_non_prefetch_reads](void* /* arg */) { ++num_non_prefetch_reads; });
  693. SyncPoint::GetInstance()->EnableProcessing();
  694. constexpr Slice* begin = nullptr;
  695. constexpr Slice* end = nullptr;
  696. ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end));
  697. SyncPoint::GetInstance()->DisableProcessing();
  698. SyncPoint::GetInstance()->ClearAllCallBacks();
  699. ASSERT_EQ(Get("key"), "lime,pie");
  700. ASSERT_EQ(Get("foo"), "bar,baz");
  701. ASSERT_EQ(num_non_prefetch_reads, 0);
  702. Close();
  703. }
  704. TEST_F(DBBlobCompactionTest, CompactionDoNotFillCache) {
  705. Options options = GetDefaultOptions();
  706. options.enable_blob_files = true;
  707. options.min_blob_size = 0;
  708. options.enable_blob_garbage_collection = true;
  709. options.blob_garbage_collection_age_cutoff = 1.0;
  710. options.disable_auto_compactions = true;
  711. options.statistics = CreateDBStatistics();
  712. LRUCacheOptions cache_options;
  713. cache_options.capacity = 1 << 20;
  714. cache_options.metadata_charge_policy = kDontChargeCacheMetadata;
  715. options.blob_cache = NewLRUCache(cache_options);
  716. Reopen(options);
  717. ASSERT_OK(Put("key", "lime"));
  718. ASSERT_OK(Put("foo", "bar"));
  719. ASSERT_OK(Flush());
  720. ASSERT_OK(Put("key", "pie"));
  721. ASSERT_OK(Put("foo", "baz"));
  722. ASSERT_OK(Flush());
  723. constexpr Slice* begin = nullptr;
  724. constexpr Slice* end = nullptr;
  725. ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end));
  726. ASSERT_EQ(options.statistics->getTickerCount(BLOB_DB_CACHE_ADD), 0);
  727. Close();
  728. }
  729. } // namespace ROCKSDB_NAMESPACE
  730. int main(int argc, char** argv) {
  731. ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  732. ::testing::InitGoogleTest(&argc, argv);
  733. RegisterCustomObjects(argc, argv);
  734. return RUN_ALL_TESTS();
  735. }