compaction_job_test.cc 111 KB


  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include "db/compaction/compaction_job.h"
  6. #include <algorithm>
  7. #include <array>
  8. #include <cinttypes>
  9. #include <map>
  10. #include <string>
  11. #include <tuple>
  12. #include "db/blob/blob_index.h"
  13. #include "db/column_family.h"
  14. #include "db/db_impl/db_impl.h"
  15. #include "db/error_handler.h"
  16. #include "db/version_set.h"
  17. #include "file/filename.h"
  18. #include "file/random_access_file_reader.h"
  19. #include "file/writable_file_writer.h"
  20. #include "options/options_helper.h"
  21. #include "rocksdb/cache.h"
  22. #include "rocksdb/convenience.h"
  23. #include "rocksdb/db.h"
  24. #include "rocksdb/file_system.h"
  25. #include "rocksdb/options.h"
  26. #include "rocksdb/write_buffer_manager.h"
  27. #include "table/mock_table.h"
  28. #include "table/unique_id_impl.h"
  29. #include "test_util/testharness.h"
  30. #include "test_util/testutil.h"
  31. #include "util/string_util.h"
  32. #include "utilities/merge_operators.h"
  33. namespace ROCKSDB_NAMESPACE {
  34. namespace {
  35. void VerifyInitializationOfCompactionJobStats(
  36. const CompactionJobStats& compaction_job_stats) {
  37. #if !defined(IOS_CROSS_COMPILE)
  38. ASSERT_EQ(compaction_job_stats.elapsed_micros, 0U);
  39. ASSERT_EQ(compaction_job_stats.num_input_records, 0U);
  40. ASSERT_EQ(compaction_job_stats.num_input_files_at_output_level, 0U);
  41. ASSERT_EQ(compaction_job_stats.num_output_records, 0U);
  42. ASSERT_EQ(compaction_job_stats.num_output_files, 0U);
  43. ASSERT_TRUE(compaction_job_stats.is_manual_compaction);
  44. ASSERT_FALSE(compaction_job_stats.is_remote_compaction);
  45. ASSERT_EQ(compaction_job_stats.total_output_bytes, 0U);
  46. ASSERT_EQ(compaction_job_stats.total_input_raw_key_bytes, 0U);
  47. ASSERT_EQ(compaction_job_stats.total_input_raw_value_bytes, 0U);
  48. ASSERT_EQ(compaction_job_stats.smallest_output_key_prefix[0], 0);
  49. ASSERT_EQ(compaction_job_stats.largest_output_key_prefix[0], 0);
  50. ASSERT_EQ(compaction_job_stats.num_records_replaced, 0U);
  51. ASSERT_EQ(compaction_job_stats.num_input_deletion_records, 0U);
  52. ASSERT_EQ(compaction_job_stats.num_expired_deletion_records, 0U);
  53. ASSERT_EQ(compaction_job_stats.num_corrupt_keys, 0U);
  54. #endif // !defined(IOS_CROSS_COMPILE)
  55. }
  56. // Mock FSWritableFile for testing io priority.
  57. // Only override the essential functions for testing compaction io priority.
  58. class MockTestWritableFile : public FSWritableFileOwnerWrapper {
  59. public:
  60. MockTestWritableFile(std::unique_ptr<FSWritableFile>&& file,
  61. Env::IOPriority io_priority)
  62. : FSWritableFileOwnerWrapper(std::move(file)),
  63. write_io_priority_(io_priority) {}
  64. IOStatus Append(const Slice& data, const IOOptions& options,
  65. IODebugContext* dbg) override {
  66. EXPECT_EQ(options.rate_limiter_priority, write_io_priority_);
  67. return target()->Append(data, options, dbg);
  68. }
  69. IOStatus Append(const Slice& data, const IOOptions& options,
  70. const DataVerificationInfo& verification_info,
  71. IODebugContext* dbg) override {
  72. EXPECT_EQ(options.rate_limiter_priority, write_io_priority_);
  73. return target()->Append(data, options, verification_info, dbg);
  74. }
  75. IOStatus Close(const IOOptions& options, IODebugContext* dbg) override {
  76. EXPECT_EQ(options.rate_limiter_priority, write_io_priority_);
  77. return target()->Close(options, dbg);
  78. }
  79. IOStatus Flush(const IOOptions& options, IODebugContext* dbg) override {
  80. EXPECT_EQ(options.rate_limiter_priority, write_io_priority_);
  81. return target()->Flush(options, dbg);
  82. }
  83. IOStatus Sync(const IOOptions& options, IODebugContext* dbg) override {
  84. EXPECT_EQ(options.rate_limiter_priority, write_io_priority_);
  85. return target()->Sync(options, dbg);
  86. }
  87. IOStatus Fsync(const IOOptions& options, IODebugContext* dbg) override {
  88. EXPECT_EQ(options.rate_limiter_priority, write_io_priority_);
  89. return target()->Fsync(options, dbg);
  90. }
  91. uint64_t GetFileSize(const IOOptions& options, IODebugContext* dbg) override {
  92. EXPECT_EQ(options.rate_limiter_priority, write_io_priority_);
  93. return target()->GetFileSize(options, dbg);
  94. }
  95. IOStatus RangeSync(uint64_t offset, uint64_t nbytes, const IOOptions& options,
  96. IODebugContext* dbg) override {
  97. EXPECT_EQ(options.rate_limiter_priority, write_io_priority_);
  98. return target()->RangeSync(offset, nbytes, options, dbg);
  99. }
  100. void PrepareWrite(size_t offset, size_t len, const IOOptions& options,
  101. IODebugContext* dbg) override {
  102. EXPECT_EQ(options.rate_limiter_priority, write_io_priority_);
  103. target()->PrepareWrite(offset, len, options, dbg);
  104. }
  105. IOStatus Allocate(uint64_t offset, uint64_t len, const IOOptions& options,
  106. IODebugContext* dbg) override {
  107. EXPECT_EQ(options.rate_limiter_priority, write_io_priority_);
  108. return target()->Allocate(offset, len, options, dbg);
  109. }
  110. private:
  111. Env::IOPriority write_io_priority_;
  112. };
  113. // Mock FSRandomAccessFile for testing io priority.
  114. // Only override the essential functions for testing compaction io priority.
  115. class MockTestRandomAccessFile : public FSRandomAccessFileOwnerWrapper {
  116. public:
  117. MockTestRandomAccessFile(std::unique_ptr<FSRandomAccessFile>&& file,
  118. Env::IOPriority io_priority)
  119. : FSRandomAccessFileOwnerWrapper(std::move(file)),
  120. read_io_priority_(io_priority) {}
  121. IOStatus Read(uint64_t offset, size_t n, const IOOptions& options,
  122. Slice* result, char* scratch,
  123. IODebugContext* dbg) const override {
  124. EXPECT_EQ(options.rate_limiter_priority, read_io_priority_);
  125. return target()->Read(offset, n, options, result, scratch, dbg);
  126. }
  127. IOStatus Prefetch(uint64_t offset, size_t n, const IOOptions& options,
  128. IODebugContext* dbg) override {
  129. EXPECT_EQ(options.rate_limiter_priority, read_io_priority_);
  130. return target()->Prefetch(offset, n, options, dbg);
  131. }
  132. private:
  133. Env::IOPriority read_io_priority_;
  134. };
  135. // Mock FileSystem for testing io priority.
  136. class MockTestFileSystem : public FileSystemWrapper {
  137. public:
  138. explicit MockTestFileSystem(const std::shared_ptr<FileSystem>& base,
  139. Env::IOPriority read_io_priority,
  140. Env::IOPriority write_io_priority)
  141. : FileSystemWrapper(base),
  142. read_io_priority_(read_io_priority),
  143. write_io_priority_(write_io_priority) {}
  144. static const char* kClassName() { return "MockTestFileSystem"; }
  145. const char* Name() const override { return kClassName(); }
  146. IOStatus NewRandomAccessFile(const std::string& fname,
  147. const FileOptions& file_opts,
  148. std::unique_ptr<FSRandomAccessFile>* result,
  149. IODebugContext* dbg) override {
  150. IOStatus s = target()->NewRandomAccessFile(fname, file_opts, result, dbg);
  151. EXPECT_OK(s);
  152. result->reset(
  153. new MockTestRandomAccessFile(std::move(*result), read_io_priority_));
  154. return s;
  155. }
  156. IOStatus NewWritableFile(const std::string& fname,
  157. const FileOptions& file_opts,
  158. std::unique_ptr<FSWritableFile>* result,
  159. IODebugContext* dbg) override {
  160. IOStatus s = target()->NewWritableFile(fname, file_opts, result, dbg);
  161. EXPECT_OK(s);
  162. result->reset(
  163. new MockTestWritableFile(std::move(*result), write_io_priority_));
  164. return s;
  165. }
  166. private:
  167. Env::IOPriority read_io_priority_;
  168. Env::IOPriority write_io_priority_;
  169. };
  // Table format CompactionJobTestBase uses for its input and output files.
enum TableTypeForTest : uint8_t {
  // Files go through mock::MockTableFactory.
  kMockTable = 0,
  // Files are written with NewBlockBasedTableFactory().
  kBlockBasedTable = 1,
};
  171. } // namespace
  172. class CompactionJobTestBase : public testing::Test {
  173. protected:
  174. CompactionJobTestBase(std::string dbname, const Comparator* ucmp,
  175. std::function<std::string(uint64_t)> encode_u64_ts,
  176. bool test_io_priority, TableTypeForTest table_type)
  177. : dbname_(std::move(dbname)),
  178. ucmp_(ucmp),
  179. db_options_(),
  180. mutable_cf_options_(cf_options_),
  181. mutable_db_options_(),
  182. table_cache_(NewLRUCache(50000, 16)),
  183. write_buffer_manager_(db_options_.db_write_buffer_size),
  184. versions_(new VersionSet(
  185. dbname_, &db_options_, env_options_, table_cache_.get(),
  186. &write_buffer_manager_, &write_controller_,
  187. /*block_cache_tracer=*/nullptr,
  188. /*io_tracer=*/nullptr, /*db_id=*/"", /*db_session_id=*/"",
  189. /*daily_offpeak_time_utc=*/"",
  190. /*error_handler=*/nullptr, /*unchanging=*/false)),
  191. shutting_down_(false),
  192. mock_table_factory_(new mock::MockTableFactory()),
  193. error_handler_(nullptr, db_options_, &mutex_),
  194. encode_u64_ts_(std::move(encode_u64_ts)),
  195. test_io_priority_(test_io_priority),
  196. table_type_(table_type) {
  197. Env* base_env = Env::Default();
  198. EXPECT_OK(
  199. test::CreateEnvFromSystem(ConfigOptions(), &base_env, &env_guard_));
  200. env_ = base_env;
  201. fs_ = env_->GetFileSystem();
  202. // set default for the tests
  203. mutable_cf_options_.target_file_size_base = 1024 * 1024;
  204. mutable_cf_options_.max_compaction_bytes = 10 * 1024 * 1024;
  205. }
  206. void SetUp() override {
  207. EXPECT_OK(env_->CreateDirIfMissing(dbname_));
  208. db_options_.env = env_;
  209. db_options_.fs = fs_;
  210. db_options_.db_paths.emplace_back(dbname_,
  211. std::numeric_limits<uint64_t>::max());
  212. cf_options_.comparator = ucmp_;
  213. if (table_type_ == TableTypeForTest::kBlockBasedTable) {
  214. BlockBasedTableOptions table_options;
  215. cf_options_.table_factory.reset(NewBlockBasedTableFactory(table_options));
  216. } else if (table_type_ == TableTypeForTest::kMockTable) {
  217. cf_options_.table_factory = mock_table_factory_;
  218. } else {
  219. assert(false);
  220. }
  221. mutable_cf_options_.table_factory = cf_options_.table_factory;
  222. }
  223. std::string GenerateFileName(uint64_t file_number) {
  224. FileMetaData meta;
  225. std::vector<DbPath> db_paths;
  226. db_paths.emplace_back(dbname_, std::numeric_limits<uint64_t>::max());
  227. meta.fd = FileDescriptor(file_number, 0, 0);
  228. return TableFileName(db_paths, meta.fd.GetNumber(), meta.fd.GetPathId());
  229. }
  230. std::string KeyStr(const std::string& user_key, const SequenceNumber seq_num,
  231. const ValueType t, uint64_t ts = 0) {
  232. std::string user_key_with_ts = user_key + encode_u64_ts_(ts);
  233. return InternalKey(user_key_with_ts, seq_num, t).Encode().ToString();
  234. }
  235. static std::string BlobStr(uint64_t blob_file_number, uint64_t offset,
  236. uint64_t size) {
  237. std::string blob_index;
  238. BlobIndex::EncodeBlob(&blob_index, blob_file_number, offset, size,
  239. kNoCompression);
  240. return blob_index;
  241. }
  242. static std::string BlobStrTTL(uint64_t blob_file_number, uint64_t offset,
  243. uint64_t size, uint64_t expiration) {
  244. std::string blob_index;
  245. BlobIndex::EncodeBlobTTL(&blob_index, expiration, blob_file_number, offset,
  246. size, kNoCompression);
  247. return blob_index;
  248. }
  249. static std::string BlobStrInlinedTTL(const Slice& value,
  250. uint64_t expiration) {
  251. std::string blob_index;
  252. BlobIndex::EncodeInlinedTTL(&blob_index, expiration, value);
  253. return blob_index;
  254. }
  255. // Creates a table with the specificied key value pairs.
  256. void CreateTable(const std::string& table_name,
  257. const mock::KVVector& contents, uint64_t& file_size) {
  258. std::unique_ptr<WritableFileWriter> file_writer;
  259. Status s = WritableFileWriter::Create(fs_, table_name, FileOptions(),
  260. &file_writer, nullptr);
  261. ASSERT_OK(s);
  262. const ReadOptions read_options;
  263. const WriteOptions write_options;
  264. std::unique_ptr<TableBuilder> table_builder(
  265. cf_options_.table_factory->NewTableBuilder(
  266. TableBuilderOptions(
  267. cfd_->ioptions(), mutable_cf_options_, read_options,
  268. write_options, cfd_->internal_comparator(),
  269. cfd_->internal_tbl_prop_coll_factories(),
  270. CompressionType::kNoCompression, CompressionOptions(),
  271. 0 /* column_family_id */, kDefaultColumnFamilyName,
  272. -1 /* level */, kUnknownNewestKeyTime),
  273. file_writer.get()));
  274. // Build table.
  275. for (const auto& kv : contents) {
  276. std::string key;
  277. std::string value;
  278. std::tie(key, value) = kv;
  279. table_builder->Add(key, value);
  280. }
  281. ASSERT_OK(table_builder->Finish());
  282. file_size = table_builder->FileSize();
  283. }
  284. void AddMockFile(const mock::KVVector& contents, int level = 0) {
  285. assert(contents.size() > 0);
  286. bool first_key = true;
  287. std::string smallest, largest;
  288. InternalKey smallest_key, largest_key;
  289. SequenceNumber smallest_seqno = kMaxSequenceNumber;
  290. SequenceNumber largest_seqno = 0;
  291. uint64_t oldest_blob_file_number = kInvalidBlobFileNumber;
  292. for (const auto& kv : contents) {
  293. ParsedInternalKey key;
  294. std::string skey;
  295. std::string value;
  296. std::tie(skey, value) = kv;
  297. const Status pik_status =
  298. ParseInternalKey(skey, &key, true /* log_err_key */);
  299. smallest_seqno = std::min(smallest_seqno, key.sequence);
  300. largest_seqno = std::max(largest_seqno, key.sequence);
  301. if (first_key ||
  302. cfd_->user_comparator()->Compare(key.user_key, smallest) < 0) {
  303. smallest.assign(key.user_key.data(), key.user_key.size());
  304. smallest_key.DecodeFrom(skey);
  305. }
  306. if (first_key ||
  307. cfd_->user_comparator()->Compare(key.user_key, largest) > 0) {
  308. largest.assign(key.user_key.data(), key.user_key.size());
  309. largest_key.DecodeFrom(skey);
  310. }
  311. first_key = false;
  312. if (pik_status.ok() && key.type == kTypeBlobIndex) {
  313. BlobIndex blob_index;
  314. const Status s = blob_index.DecodeFrom(value);
  315. if (!s.ok()) {
  316. continue;
  317. }
  318. if (blob_index.IsInlined() || blob_index.HasTTL() ||
  319. blob_index.file_number() == kInvalidBlobFileNumber) {
  320. continue;
  321. }
  322. if (oldest_blob_file_number == kInvalidBlobFileNumber ||
  323. oldest_blob_file_number > blob_index.file_number()) {
  324. oldest_blob_file_number = blob_index.file_number();
  325. }
  326. }
  327. }
  328. uint64_t file_number = versions_->NewFileNumber();
  329. uint64_t file_size = 0;
  330. if (table_type_ == TableTypeForTest::kBlockBasedTable) {
  331. CreateTable(GenerateFileName(file_number), contents, file_size);
  332. } else if (table_type_ == TableTypeForTest::kMockTable) {
  333. file_size = 10;
  334. EXPECT_OK(mock_table_factory_->CreateMockTable(
  335. env_, GenerateFileName(file_number), contents));
  336. } else {
  337. assert(false);
  338. }
  339. VersionEdit edit;
  340. edit.AddFile(
  341. level, file_number, 0, file_size, smallest_key, largest_key,
  342. smallest_seqno, largest_seqno, false, Temperature::kUnknown,
  343. oldest_blob_file_number, kUnknownOldestAncesterTime,
  344. kUnknownFileCreationTime,
  345. versions_->GetColumnFamilySet()->GetDefault()->NewEpochNumber(),
  346. kUnknownFileChecksum, kUnknownFileChecksumFuncName, kNullUniqueId64x2,
  347. /*compensated_range_deletion_size=*/0, /*tail_size=*/0,
  348. /*user_defined_timestamps_persisted=*/true);
  349. mutex_.Lock();
  350. EXPECT_OK(versions_->LogAndApply(
  351. versions_->GetColumnFamilySet()->GetDefault(), read_options_,
  352. write_options_, &edit, &mutex_, nullptr));
  353. mutex_.Unlock();
  354. }
  355. void VerifyTables(int output_level,
  356. const std::vector<mock::KVVector>& expected_results,
  357. std::vector<uint64_t> expected_oldest_blob_file_numbers) {
  358. if (expected_results.empty()) {
  359. ASSERT_EQ(compaction_job_stats_.num_output_files, 0U);
  360. return;
  361. }
  362. int expected_output_file_num = 0;
  363. for (const auto& e : expected_results) {
  364. if (!e.empty()) {
  365. ++expected_output_file_num;
  366. }
  367. }
  368. ASSERT_EQ(expected_output_file_num, compaction_job_stats_.num_output_files);
  369. if (expected_output_file_num == 0) {
  370. return;
  371. }
  372. if (expected_oldest_blob_file_numbers.empty()) {
  373. expected_oldest_blob_file_numbers.resize(expected_output_file_num,
  374. kInvalidBlobFileNumber);
  375. }
  376. auto cfd = versions_->GetColumnFamilySet()->GetDefault();
  377. if (table_type_ == TableTypeForTest::kMockTable) {
  378. ASSERT_EQ(compaction_job_stats_.num_output_files,
  379. expected_results.size());
  380. mock_table_factory_->AssertLatestFiles(expected_results);
  381. } else {
  382. assert(table_type_ == TableTypeForTest::kBlockBasedTable);
  383. }
  384. auto output_files =
  385. cfd->current()->storage_info()->LevelFiles(output_level);
  386. ASSERT_EQ(expected_output_file_num, output_files.size());
  387. if (table_type_ == TableTypeForTest::kMockTable) {
  388. assert(output_files.size() ==
  389. static_cast<size_t>(expected_output_file_num));
  390. const FileMetaData* const output_file = output_files[0];
  391. ASSERT_EQ(output_file->oldest_blob_file_number,
  392. expected_oldest_blob_file_numbers[0]);
  393. return;
  394. }
  395. for (size_t i = 0; i < expected_results.size(); ++i) {
  396. const FileMetaData* const output_file = output_files[i];
  397. std::string file_name = GenerateFileName(output_file->fd.GetNumber());
  398. const auto& fs = env_->GetFileSystem();
  399. std::unique_ptr<RandomAccessFileReader> freader;
  400. IOStatus ios = RandomAccessFileReader::Create(
  401. fs, file_name, FileOptions(), &freader, nullptr);
  402. ASSERT_OK(ios);
  403. std::unique_ptr<TableReader> table_reader;
  404. uint64_t file_size = output_file->fd.GetFileSize();
  405. ReadOptions read_opts;
  406. Status s = cf_options_.table_factory->NewTableReader(
  407. read_opts,
  408. TableReaderOptions(cfd->ioptions(), /*prefix_extractor=*/nullptr,
  409. /*compression_manager=*/nullptr, FileOptions(),
  410. cfd_->internal_comparator(),
  411. /*block_protection_bytes_per_key=*/0),
  412. std::move(freader), file_size, &table_reader, false);
  413. ASSERT_OK(s);
  414. assert(table_reader);
  415. std::unique_ptr<InternalIterator> iiter(
  416. table_reader->NewIterator(read_opts, nullptr, nullptr, true,
  417. TableReaderCaller::kUncategorized));
  418. assert(iiter);
  419. mock::KVVector from_db;
  420. for (iiter->SeekToFirst(); iiter->Valid(); iiter->Next()) {
  421. const Slice key = iiter->key();
  422. const Slice value = iiter->value();
  423. from_db.emplace_back(
  424. make_pair(key.ToString(false), value.ToString(false)));
  425. }
  426. ASSERT_EQ(expected_results[i], from_db);
  427. }
  428. }
  429. void SetLastSequence(const SequenceNumber sequence_number) {
  430. versions_->SetLastAllocatedSequence(sequence_number + 1);
  431. versions_->SetLastPublishedSequence(sequence_number + 1);
  432. versions_->SetLastSequence(sequence_number + 1);
  433. }
  434. // returns expected result after compaction
  435. mock::KVVector CreateTwoFiles(bool gen_corrupted_keys) {
  436. stl_wrappers::KVMap expected_results;
  437. constexpr int kKeysPerFile = 10000;
  438. constexpr int kCorruptKeysPerFile = 200;
  439. constexpr int kMatchingKeys = kKeysPerFile / 2;
  440. SequenceNumber sequence_number = 0;
  441. auto corrupt_id = [&](int id) {
  442. return gen_corrupted_keys && id > 0 && id <= kCorruptKeysPerFile;
  443. };
  444. for (int i = 0; i < 2; ++i) {
  445. auto contents = mock::MakeMockFile();
  446. for (int k = 0; k < kKeysPerFile; ++k) {
  447. auto key = std::to_string(i * kMatchingKeys + k);
  448. auto value = std::to_string(i * kKeysPerFile + k);
  449. InternalKey internal_key(key, ++sequence_number, kTypeValue);
  450. // This is how the key will look like once it's written in bottommost
  451. // file
  452. InternalKey bottommost_internal_key(key, 0, kTypeValue);
  453. if (corrupt_id(k)) {
  454. test::CorruptKeyType(&internal_key);
  455. test::CorruptKeyType(&bottommost_internal_key);
  456. }
  457. contents.push_back({internal_key.Encode().ToString(), value});
  458. if (i == 1 || k < kMatchingKeys || corrupt_id(k - kMatchingKeys)) {
  459. expected_results.insert(
  460. {bottommost_internal_key.Encode().ToString(), value});
  461. }
  462. }
  463. mock::SortKVVector(&contents, ucmp_);
  464. AddMockFile(contents);
  465. }
  466. SetLastSequence(sequence_number);
  467. mock::KVVector expected_results_kvvector;
  468. for (auto& kv : expected_results) {
  469. expected_results_kvvector.push_back({kv.first, kv.second});
  470. }
  471. return expected_results_kvvector;
  472. }
  473. void NewDB() {
  474. EXPECT_OK(DestroyDB(dbname_, Options()));
  475. EXPECT_OK(env_->CreateDirIfMissing(dbname_));
  476. std::shared_ptr<Logger> info_log;
  477. DBOptions db_opts = BuildDBOptions(db_options_, mutable_db_options_);
  478. Status s = CreateLoggerFromOptions(dbname_, db_opts, &info_log);
  479. ASSERT_OK(s);
  480. db_options_.info_log = info_log;
  481. versions_.reset(
  482. new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
  483. &write_buffer_manager_, &write_controller_,
  484. /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
  485. test::kUnitTestDbId, /*db_session_id=*/"",
  486. /*daily_offpeak_time_utc=*/"",
  487. /*error_handler=*/nullptr, /*unchanging=*/false));
  488. compaction_job_stats_.Reset();
  489. VersionEdit new_db;
  490. new_db.SetLogNumber(0);
  491. new_db.SetNextFile(2);
  492. new_db.SetLastSequence(0);
  493. const std::string manifest = DescriptorFileName(dbname_, 1);
  494. std::unique_ptr<WritableFileWriter> file_writer;
  495. const auto& fs = env_->GetFileSystem();
  496. s = WritableFileWriter::Create(fs, manifest,
  497. fs->OptimizeForManifestWrite(env_options_),
  498. &file_writer, nullptr);
  499. ASSERT_OK(s);
  500. {
  501. log::Writer log(std::move(file_writer), 0, false);
  502. std::string record;
  503. new_db.EncodeTo(&record);
  504. s = log.AddRecord(WriteOptions(), record);
  505. }
  506. ASSERT_OK(s);
  507. // Make "CURRENT" file that points to the new manifest file.
  508. s = SetCurrentFile(WriteOptions(), fs_.get(), dbname_, 1,
  509. Temperature::kUnknown, nullptr);
  510. ASSERT_OK(s);
  511. cf_options_.merge_operator = merge_op_;
  512. cf_options_.compaction_filter = compaction_filter_.get();
  513. std::vector<ColumnFamilyDescriptor> column_families;
  514. column_families.emplace_back(kDefaultColumnFamilyName, cf_options_);
  515. ASSERT_OK(versions_->Recover(column_families, false));
  516. cfd_ = versions_->GetColumnFamilySet()->GetDefault();
  517. }
  518. // input_files[i] on input_levels[i]
  519. void RunLastLevelCompaction(
  520. const std::vector<std::vector<FileMetaData*>>& input_files,
  521. const std::vector<int> input_levels,
  522. std::function<void(Compaction& comp)>&& verify_func,
  523. std::vector<SequenceNumber>&& snapshots = {}) {
  524. const int kLastLevel = cf_options_.num_levels - 1;
  525. verify_per_key_placement_ = std::move(verify_func);
  526. mock::KVVector empty_map;
  527. RunCompaction(input_files, input_levels, {empty_map}, std::move(snapshots),
  528. kMaxSequenceNumber, kLastLevel, false);
  529. }
  530. // input_files[i] on input_levels[i]
  531. void RunCompaction(
  532. const std::vector<std::vector<FileMetaData*>>& input_files,
  533. const std::vector<int>& input_levels,
  534. const std::vector<mock::KVVector>& expected_results,
  535. std::vector<SequenceNumber>&& snapshots = {},
  536. SequenceNumber earliest_write_conflict_snapshot = kMaxSequenceNumber,
  537. int output_level = 1, bool verify = true,
  538. std::vector<uint64_t> expected_oldest_blob_file_numbers = {},
  539. bool check_get_priority = false,
  540. Env::IOPriority read_io_priority = Env::IO_TOTAL,
  541. Env::IOPriority write_io_priority = Env::IO_TOTAL,
  542. int max_subcompactions = 0) {
  543. // For compaction, set fs as MockTestFileSystem to check the io_priority.
  544. if (test_io_priority_) {
  545. db_options_.fs.reset(
  546. new MockTestFileSystem(fs_, read_io_priority, write_io_priority));
  547. }
  548. auto cfd = versions_->GetColumnFamilySet()->GetDefault();
  549. size_t num_input_files = 0;
  550. std::vector<CompactionInputFiles> compaction_input_files;
  551. for (size_t i = 0; i < input_files.size(); ++i) {
  552. auto level_files = input_files[i];
  553. CompactionInputFiles compaction_level;
  554. compaction_level.level = input_levels[i];
  555. compaction_level.files.insert(compaction_level.files.end(),
  556. level_files.begin(), level_files.end());
  557. compaction_input_files.push_back(compaction_level);
  558. num_input_files += level_files.size();
  559. }
  560. std::vector<FileMetaData*> grandparents;
  561. // it should actually be the next non-empty level
  562. const int kGrandparentsLevel = output_level + 1;
  563. if (kGrandparentsLevel < cf_options_.num_levels) {
  564. grandparents =
  565. cfd_->current()->storage_info()->LevelFiles(kGrandparentsLevel);
  566. }
  567. Compaction compaction(
  568. cfd->current()->storage_info(), cfd->ioptions(),
  569. cfd->GetLatestMutableCFOptions(), mutable_db_options_,
  570. compaction_input_files, output_level,
  571. mutable_cf_options_.target_file_size_base,
  572. mutable_cf_options_.max_compaction_bytes, 0, kNoCompression,
  573. cfd->GetLatestMutableCFOptions().compression_opts,
  574. Temperature::kUnknown, max_subcompactions, grandparents,
  575. /*earliest_snapshot*/ std::nullopt, /*snapshot_checker*/ nullptr,
  576. CompactionReason::kManualCompaction);
  577. compaction.FinalizeInputInfo(cfd->current());
  578. assert(db_options_.info_log);
  579. LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get());
  580. mutex_.Lock();
  581. EventLogger event_logger(db_options_.info_log.get());
  582. // TODO(yiwu) add a mock snapshot checker and add test for it.
  583. SnapshotChecker* snapshot_checker = nullptr;
  584. ASSERT_TRUE(full_history_ts_low_.empty() ||
  585. ucmp_->timestamp_size() == full_history_ts_low_.size());
  586. const std::atomic<bool> kManualCompactionCanceledFalse{false};
  587. JobContext job_context(1, false /* create_superversion */);
  588. job_context.InitSnapshotContext(snapshot_checker, nullptr,
  589. earliest_write_conflict_snapshot,
  590. std::move(snapshots));
  591. CompactionJob compaction_job(
  592. 0, &compaction, db_options_, mutable_db_options_, env_options_,
  593. versions_.get(), &shutting_down_, &log_buffer, nullptr, nullptr,
  594. nullptr, nullptr, &mutex_, &error_handler_, &job_context, table_cache_,
  595. &event_logger, false, false, dbname_, &compaction_job_stats_,
  596. Env::Priority::USER, nullptr /* IOTracer */,
  597. /*manual_compaction_canceled=*/kManualCompactionCanceledFalse,
  598. env_->GenerateUniqueId(), DBImpl::GenerateDbSessionId(nullptr),
  599. full_history_ts_low_);
  600. VerifyInitializationOfCompactionJobStats(compaction_job_stats_);
  601. compaction_job.Prepare(std::nullopt /*subcompact to be computed*/);
  602. mutex_.Unlock();
  603. Status s = compaction_job.Run();
  604. ASSERT_OK(s);
  605. ASSERT_OK(compaction_job.io_status());
  606. mutex_.Lock();
  607. bool compaction_released = false;
  608. ASSERT_OK(compaction_job.Install(&compaction_released));
  609. ASSERT_OK(compaction_job.io_status());
  610. mutex_.Unlock();
  611. log_buffer.FlushBufferToLog();
  612. if (verify) {
  613. ASSERT_GE(compaction_job_stats_.elapsed_micros, 0U);
  614. ASSERT_EQ(compaction_job_stats_.num_input_files, num_input_files);
  615. VerifyTables(output_level, expected_results,
  616. expected_oldest_blob_file_numbers);
  617. }
  618. if (check_get_priority) {
  619. CheckGetRateLimiterPriority(compaction_job);
  620. }
  621. if (verify_per_key_placement_) {
  622. // Verify per_key_placement compaction
  623. assert(compaction.SupportsPerKeyPlacement());
  624. verify_per_key_placement_(compaction);
  625. }
  626. }
  627. void CheckGetRateLimiterPriority(CompactionJob& compaction_job) {
  628. // When the state from WriteController is normal.
  629. ASSERT_EQ(compaction_job.GetRateLimiterPriority(), Env::IO_LOW);
  630. WriteController* write_controller =
  631. compaction_job.versions_->GetColumnFamilySet()->write_controller();
  632. {
  633. // When the state from WriteController is Delayed.
  634. std::unique_ptr<WriteControllerToken> delay_token =
  635. write_controller->GetDelayToken(1000000);
  636. ASSERT_EQ(compaction_job.GetRateLimiterPriority(), Env::IO_USER);
  637. }
  638. {
  639. // When the state from WriteController is Stopped.
  640. std::unique_ptr<WriteControllerToken> stop_token =
  641. write_controller->GetStopToken();
  642. ASSERT_EQ(compaction_job.GetRateLimiterPriority(), Env::IO_USER);
  643. }
  644. }
  645. std::shared_ptr<Env> env_guard_;
  646. Env* env_;
  647. std::shared_ptr<FileSystem> fs_;
  648. std::string dbname_;
  649. const Comparator* const ucmp_;
  650. EnvOptions env_options_;
  651. ImmutableDBOptions db_options_;
  652. ColumnFamilyOptions cf_options_;
  653. MutableCFOptions mutable_cf_options_;
  654. MutableDBOptions mutable_db_options_;
  655. const ReadOptions read_options_;
  656. const WriteOptions write_options_;
  657. std::shared_ptr<Cache> table_cache_;
  658. WriteController write_controller_;
  659. WriteBufferManager write_buffer_manager_;
  660. std::unique_ptr<VersionSet> versions_;
  661. InstrumentedMutex mutex_;
  662. std::atomic<bool> shutting_down_;
  663. std::shared_ptr<mock::MockTableFactory> mock_table_factory_;
  664. CompactionJobStats compaction_job_stats_;
  665. ColumnFamilyData* cfd_;
  666. std::unique_ptr<CompactionFilter> compaction_filter_;
  667. std::shared_ptr<MergeOperator> merge_op_;
  668. ErrorHandler error_handler_;
  669. std::string full_history_ts_low_;
  670. const std::function<std::string(uint64_t)> encode_u64_ts_;
  671. const bool test_io_priority_;
  672. std::function<void(Compaction& comp)> verify_per_key_placement_;
  673. const TableTypeForTest table_type_ = kMockTable;
  674. };
  675. // TODO(icanadi) Make it simpler once we mock out VersionSet
  676. class CompactionJobTest : public CompactionJobTestBase {
  677. public:
  678. CompactionJobTest()
  679. : CompactionJobTestBase(
  680. test::PerThreadDBPath("compaction_job_test"), BytewiseComparator(),
  681. [](uint64_t /*ts*/) { return ""; }, /*test_io_priority=*/false,
  682. TableTypeForTest::kMockTable) {}
  683. };
  684. TEST_F(CompactionJobTest, Simple) {
  685. NewDB();
  686. auto expected_results = CreateTwoFiles(false);
  687. auto cfd = versions_->GetColumnFamilySet()->GetDefault();
  688. constexpr int input_level = 0;
  689. auto files = cfd->current()->storage_info()->LevelFiles(input_level);
  690. ASSERT_EQ(2U, files.size());
  691. RunCompaction({files}, {input_level}, {expected_results});
  692. }
  693. TEST_F(CompactionJobTest, DISABLED_SimpleCorrupted) {
  694. NewDB();
  695. auto expected_results = CreateTwoFiles(true);
  696. auto cfd = versions_->GetColumnFamilySet()->GetDefault();
  697. constexpr int input_level = 0;
  698. auto files = cfd->current()->storage_info()->LevelFiles(input_level);
  699. RunCompaction({files}, {input_level}, {expected_results});
  700. ASSERT_EQ(compaction_job_stats_.num_corrupt_keys, 400U);
  701. }
  702. TEST_F(CompactionJobTest, SimpleDeletion) {
  703. NewDB();
  704. auto file1 = mock::MakeMockFile({{KeyStr("c", 4U, kTypeDeletion), ""},
  705. {KeyStr("c", 3U, kTypeValue), "val"}});
  706. AddMockFile(file1);
  707. auto file2 = mock::MakeMockFile({{KeyStr("b", 2U, kTypeValue), "val"},
  708. {KeyStr("b", 1U, kTypeValue), "val"}});
  709. AddMockFile(file2);
  710. auto expected_results =
  711. mock::MakeMockFile({{KeyStr("b", 0U, kTypeValue), "val"}});
  712. SetLastSequence(4U);
  713. constexpr int input_level = 0;
  714. auto files = cfd_->current()->storage_info()->LevelFiles(input_level);
  715. RunCompaction({files}, {input_level}, {expected_results});
  716. }
  717. TEST_F(CompactionJobTest, OutputNothing) {
  718. NewDB();
  719. auto file1 = mock::MakeMockFile({{KeyStr("a", 1U, kTypeValue), "val"}});
  720. AddMockFile(file1);
  721. auto file2 = mock::MakeMockFile({{KeyStr("a", 2U, kTypeDeletion), ""}});
  722. AddMockFile(file2);
  723. auto expected_results = mock::MakeMockFile();
  724. SetLastSequence(4U);
  725. constexpr int input_level = 0;
  726. auto files = cfd_->current()->storage_info()->LevelFiles(input_level);
  727. RunCompaction({files}, {input_level}, {expected_results});
  728. }
  729. TEST_F(CompactionJobTest, SimpleOverwrite) {
  730. NewDB();
  731. auto file1 = mock::MakeMockFile({
  732. {KeyStr("a", 3U, kTypeValue), "val2"},
  733. {KeyStr("b", 4U, kTypeValue), "val3"},
  734. });
  735. AddMockFile(file1);
  736. auto file2 = mock::MakeMockFile({{KeyStr("a", 1U, kTypeValue), "val"},
  737. {KeyStr("b", 2U, kTypeValue), "val"}});
  738. AddMockFile(file2);
  739. auto expected_results =
  740. mock::MakeMockFile({{KeyStr("a", 0U, kTypeValue), "val2"},
  741. {KeyStr("b", 0U, kTypeValue), "val3"}});
  742. SetLastSequence(4U);
  743. constexpr int input_level = 0;
  744. auto files = cfd_->current()->storage_info()->LevelFiles(input_level);
  745. RunCompaction({files}, {input_level}, {expected_results});
  746. }
  747. TEST_F(CompactionJobTest, SimpleNonLastLevel) {
  748. NewDB();
  749. auto file1 = mock::MakeMockFile({
  750. {KeyStr("a", 5U, kTypeValue), "val2"},
  751. {KeyStr("b", 6U, kTypeValue), "val3"},
  752. });
  753. AddMockFile(file1);
  754. auto file2 = mock::MakeMockFile({{KeyStr("a", 3U, kTypeValue), "val"},
  755. {KeyStr("b", 4U, kTypeValue), "val"}});
  756. AddMockFile(file2, 1);
  757. auto file3 = mock::MakeMockFile({{KeyStr("a", 1U, kTypeValue), "val"},
  758. {KeyStr("b", 2U, kTypeValue), "val"}});
  759. AddMockFile(file3, 2);
  760. // Because level 1 is not the last level, the sequence numbers of a and b
  761. // cannot be set to 0
  762. auto expected_results =
  763. mock::MakeMockFile({{KeyStr("a", 5U, kTypeValue), "val2"},
  764. {KeyStr("b", 6U, kTypeValue), "val3"}});
  765. SetLastSequence(6U);
  766. const std::vector<int> input_levels = {0, 1};
  767. auto lvl0_files =
  768. cfd_->current()->storage_info()->LevelFiles(input_levels[0]);
  769. auto lvl1_files =
  770. cfd_->current()->storage_info()->LevelFiles(input_levels[1]);
  771. RunCompaction({lvl0_files, lvl1_files}, input_levels, {expected_results});
  772. }
  773. TEST_F(CompactionJobTest, SimpleMerge) {
  774. merge_op_ = MergeOperators::CreateStringAppendOperator();
  775. NewDB();
  776. auto file1 = mock::MakeMockFile({
  777. {KeyStr("a", 5U, kTypeMerge), "5"},
  778. {KeyStr("a", 4U, kTypeMerge), "4"},
  779. {KeyStr("a", 3U, kTypeValue), "3"},
  780. });
  781. AddMockFile(file1);
  782. auto file2 = mock::MakeMockFile(
  783. {{KeyStr("b", 2U, kTypeMerge), "2"}, {KeyStr("b", 1U, kTypeValue), "1"}});
  784. AddMockFile(file2);
  785. auto expected_results =
  786. mock::MakeMockFile({{KeyStr("a", 0U, kTypeValue), "3,4,5"},
  787. {KeyStr("b", 0U, kTypeValue), "1,2"}});
  788. SetLastSequence(5U);
  789. constexpr int input_level = 0;
  790. auto files = cfd_->current()->storage_info()->LevelFiles(input_level);
  791. RunCompaction({files}, {input_level}, {expected_results});
  792. }
  793. TEST_F(CompactionJobTest, NonAssocMerge) {
  794. merge_op_ = MergeOperators::CreateStringAppendTESTOperator();
  795. NewDB();
  796. auto file1 = mock::MakeMockFile({
  797. {KeyStr("a", 5U, kTypeMerge), "5"},
  798. {KeyStr("a", 4U, kTypeMerge), "4"},
  799. {KeyStr("a", 3U, kTypeMerge), "3"},
  800. });
  801. AddMockFile(file1);
  802. auto file2 = mock::MakeMockFile(
  803. {{KeyStr("b", 2U, kTypeMerge), "2"}, {KeyStr("b", 1U, kTypeMerge), "1"}});
  804. AddMockFile(file2);
  805. auto expected_results =
  806. mock::MakeMockFile({{KeyStr("a", 0U, kTypeValue), "3,4,5"},
  807. {KeyStr("b", 0U, kTypeValue), "1,2"}});
  808. SetLastSequence(5U);
  809. constexpr int input_level = 0;
  810. auto files = cfd_->current()->storage_info()->LevelFiles(input_level);
  811. RunCompaction({files}, {input_level}, {expected_results});
  812. }
  813. // Filters merge operands with value 10.
  814. TEST_F(CompactionJobTest, MergeOperandFilter) {
  815. merge_op_ = MergeOperators::CreateUInt64AddOperator();
  816. compaction_filter_.reset(new test::FilterNumber(10U));
  817. NewDB();
  818. auto file1 = mock::MakeMockFile(
  819. {{KeyStr("a", 5U, kTypeMerge), test::EncodeInt(5U)},
  820. {KeyStr("a", 4U, kTypeMerge), test::EncodeInt(10U)}, // Filtered
  821. {KeyStr("a", 3U, kTypeMerge), test::EncodeInt(3U)}});
  822. AddMockFile(file1);
  823. auto file2 = mock::MakeMockFile({
  824. {KeyStr("b", 2U, kTypeMerge), test::EncodeInt(2U)},
  825. {KeyStr("b", 1U, kTypeMerge), test::EncodeInt(10U)} // Filtered
  826. });
  827. AddMockFile(file2);
  828. auto expected_results =
  829. mock::MakeMockFile({{KeyStr("a", 0U, kTypeValue), test::EncodeInt(8U)},
  830. {KeyStr("b", 0U, kTypeValue), test::EncodeInt(2U)}});
  831. SetLastSequence(5U);
  832. constexpr int input_level = 0;
  833. auto files = cfd_->current()->storage_info()->LevelFiles(input_level);
  834. RunCompaction({files}, {input_level}, {expected_results});
  835. }
  836. TEST_F(CompactionJobTest, FilterSomeMergeOperands) {
  837. merge_op_ = MergeOperators::CreateUInt64AddOperator();
  838. compaction_filter_.reset(new test::FilterNumber(10U));
  839. NewDB();
  840. auto file1 = mock::MakeMockFile(
  841. {{KeyStr("a", 5U, kTypeMerge), test::EncodeInt(5U)},
  842. {KeyStr("a", 4U, kTypeMerge), test::EncodeInt(10U)}, // Filtered
  843. {KeyStr("a", 3U, kTypeValue), test::EncodeInt(5U)},
  844. {KeyStr("d", 8U, kTypeMerge), test::EncodeInt(10U)}});
  845. AddMockFile(file1);
  846. auto file2 =
  847. mock::MakeMockFile({{KeyStr("b", 2U, kTypeMerge), test::EncodeInt(10U)},
  848. {KeyStr("b", 1U, kTypeMerge), test::EncodeInt(10U)},
  849. {KeyStr("c", 2U, kTypeMerge), test::EncodeInt(3U)},
  850. {KeyStr("c", 1U, kTypeValue), test::EncodeInt(7U)},
  851. {KeyStr("d", 1U, kTypeValue), test::EncodeInt(6U)}});
  852. AddMockFile(file2);
  853. auto file3 =
  854. mock::MakeMockFile({{KeyStr("a", 1U, kTypeMerge), test::EncodeInt(3U)}});
  855. AddMockFile(file3, 2);
  856. auto expected_results = mock::MakeMockFile({
  857. {KeyStr("a", 5U, kTypeValue), test::EncodeInt(10U)},
  858. {KeyStr("c", 2U, kTypeValue), test::EncodeInt(10U)},
  859. {KeyStr("d", 1U, kTypeValue), test::EncodeInt(6U)}
  860. // b does not appear because the operands are filtered
  861. });
  862. SetLastSequence(5U);
  863. constexpr int input_level = 0;
  864. auto files = cfd_->current()->storage_info()->LevelFiles(input_level);
  865. RunCompaction({files}, {input_level}, {expected_results});
  866. }
  867. // Test where all operands/merge results are filtered out.
  868. TEST_F(CompactionJobTest, FilterAllMergeOperands) {
  869. merge_op_ = MergeOperators::CreateUInt64AddOperator();
  870. compaction_filter_.reset(new test::FilterNumber(10U));
  871. NewDB();
  872. auto file1 =
  873. mock::MakeMockFile({{KeyStr("a", 11U, kTypeMerge), test::EncodeInt(10U)},
  874. {KeyStr("a", 10U, kTypeMerge), test::EncodeInt(10U)},
  875. {KeyStr("a", 9U, kTypeMerge), test::EncodeInt(10U)}});
  876. AddMockFile(file1);
  877. auto file2 =
  878. mock::MakeMockFile({{KeyStr("b", 8U, kTypeMerge), test::EncodeInt(10U)},
  879. {KeyStr("b", 7U, kTypeMerge), test::EncodeInt(10U)},
  880. {KeyStr("b", 6U, kTypeMerge), test::EncodeInt(10U)},
  881. {KeyStr("b", 5U, kTypeMerge), test::EncodeInt(10U)},
  882. {KeyStr("b", 4U, kTypeMerge), test::EncodeInt(10U)},
  883. {KeyStr("b", 3U, kTypeMerge), test::EncodeInt(10U)},
  884. {KeyStr("b", 2U, kTypeMerge), test::EncodeInt(10U)},
  885. {KeyStr("c", 2U, kTypeMerge), test::EncodeInt(10U)},
  886. {KeyStr("c", 1U, kTypeMerge), test::EncodeInt(10U)}});
  887. AddMockFile(file2);
  888. auto file3 =
  889. mock::MakeMockFile({{KeyStr("a", 2U, kTypeMerge), test::EncodeInt(10U)},
  890. {KeyStr("b", 1U, kTypeMerge), test::EncodeInt(10U)}});
  891. AddMockFile(file3, 2);
  892. SetLastSequence(11U);
  893. constexpr int input_level = 0;
  894. auto files = cfd_->current()->storage_info()->LevelFiles(input_level);
  895. mock::KVVector empty_map;
  896. RunCompaction({files}, {input_level}, {empty_map});
  897. }
  898. TEST_F(CompactionJobTest, SimpleSingleDelete) {
  899. NewDB();
  900. auto file1 = mock::MakeMockFile({
  901. {KeyStr("a", 5U, kTypeDeletion), ""},
  902. {KeyStr("b", 6U, kTypeSingleDeletion), ""},
  903. });
  904. AddMockFile(file1);
  905. auto file2 = mock::MakeMockFile({{KeyStr("a", 3U, kTypeValue), "val"},
  906. {KeyStr("b", 4U, kTypeValue), "val"}});
  907. AddMockFile(file2);
  908. auto file3 = mock::MakeMockFile({
  909. {KeyStr("a", 1U, kTypeValue), "val"},
  910. });
  911. AddMockFile(file3, 2);
  912. auto expected_results =
  913. mock::MakeMockFile({{KeyStr("a", 5U, kTypeDeletion), ""}});
  914. SetLastSequence(6U);
  915. constexpr int input_level = 0;
  916. auto files = cfd_->current()->storage_info()->LevelFiles(input_level);
  917. RunCompaction({files}, {input_level}, {expected_results});
  918. }
// Exercises SingleDelete handling in the presence of snapshots at sequence
// numbers 10 and 20. The expected output shows, among other things:
//  - SingleDelete/Put pairs that are absent from the output (e.g. d@9/d@8),
//  - Puts retained with their value cleared to "" (e.g. a@11, c@21),
//  - SingleDeletes kept because level 2 still holds the key (A, e).
TEST_F(CompactionJobTest, SingleDeleteSnapshots) {
  NewDB();
  // Newer L0 file: mostly SingleDelete entries.
  auto file1 = mock::MakeMockFile({
      {KeyStr("A", 12U, kTypeSingleDeletion), ""},
      {KeyStr("a", 12U, kTypeSingleDeletion), ""},
      {KeyStr("b", 21U, kTypeSingleDeletion), ""},
      {KeyStr("c", 22U, kTypeSingleDeletion), ""},
      {KeyStr("d", 9U, kTypeSingleDeletion), ""},
      {KeyStr("f", 21U, kTypeSingleDeletion), ""},
      {KeyStr("j", 11U, kTypeSingleDeletion), ""},
      {KeyStr("j", 9U, kTypeSingleDeletion), ""},
      {KeyStr("k", 12U, kTypeSingleDeletion), ""},
      {KeyStr("k", 11U, kTypeSingleDeletion), ""},
      {KeyStr("l", 3U, kTypeSingleDeletion), ""},
      {KeyStr("l", 2U, kTypeSingleDeletion), ""},
  });
  AddMockFile(file1);
  // Older L0 file: the Puts (and a few more SingleDeletes) underneath.
  auto file2 = mock::MakeMockFile({
      {KeyStr("0", 2U, kTypeSingleDeletion), ""},
      {KeyStr("a", 11U, kTypeValue), "val1"},
      {KeyStr("b", 11U, kTypeValue), "val2"},
      {KeyStr("c", 21U, kTypeValue), "val3"},
      {KeyStr("d", 8U, kTypeValue), "val4"},
      {KeyStr("e", 2U, kTypeSingleDeletion), ""},
      {KeyStr("f", 1U, kTypeValue), "val1"},
      {KeyStr("g", 11U, kTypeSingleDeletion), ""},
      {KeyStr("h", 2U, kTypeSingleDeletion), ""},
      {KeyStr("m", 12U, kTypeValue), "val1"},
      {KeyStr("m", 11U, kTypeSingleDeletion), ""},
      {KeyStr("m", 8U, kTypeValue), "val2"},
  });
  AddMockFile(file2);
  // Level-2 file, not part of the compaction input: keeps "A" and "e" alive
  // below the compaction's output.
  auto file3 = mock::MakeMockFile({
      {KeyStr("A", 1U, kTypeValue), "val"},
      {KeyStr("e", 1U, kTypeValue), "val"},
  });
  AddMockFile(file3, 2);
  auto expected_results = mock::MakeMockFile({
      {KeyStr("A", 12U, kTypeSingleDeletion), ""},
      {KeyStr("a", 12U, kTypeSingleDeletion), ""},
      {KeyStr("a", 11U, kTypeValue), ""},
      {KeyStr("b", 21U, kTypeSingleDeletion), ""},
      {KeyStr("b", 11U, kTypeValue), "val2"},
      {KeyStr("c", 22U, kTypeSingleDeletion), ""},
      {KeyStr("c", 21U, kTypeValue), ""},
      {KeyStr("e", 2U, kTypeSingleDeletion), ""},
      {KeyStr("f", 21U, kTypeSingleDeletion), ""},
      {KeyStr("f", 1U, kTypeValue), "val1"},
      {KeyStr("g", 11U, kTypeSingleDeletion), ""},
      {KeyStr("j", 11U, kTypeSingleDeletion), ""},
      {KeyStr("k", 11U, kTypeSingleDeletion), ""},
      {KeyStr("m", 12U, kTypeValue), "val1"},
      {KeyStr("m", 11U, kTypeSingleDeletion), ""},
      {KeyStr("m", 8U, kTypeValue), "val2"},
  });
  SetLastSequence(22U);
  constexpr int input_level = 0;
  auto files = cfd_->current()->storage_info()->LevelFiles(input_level);
  // Snapshots at 10 and 20; the trailing 10U is presumably the earliest
  // write-conflict snapshot.
  RunCompaction({files}, {input_level}, {expected_results}, {10U, 20U}, 10U);
}
// Verifies SingleDelete/Put handling across several snapshot stripes when the
// earliest snapshot is not the earliest write-conflict snapshot.
TEST_F(CompactionJobTest, EarliestWriteConflictSnapshot) {
  NewDB();
  // Test multiple snapshots where the earliest snapshot is not a
  // write-conflict snapshot.
  auto file1 = mock::MakeMockFile({
      {KeyStr("A", 24U, kTypeSingleDeletion), ""},
      {KeyStr("A", 23U, kTypeValue), "val"},
      {KeyStr("B", 24U, kTypeSingleDeletion), ""},
      {KeyStr("B", 23U, kTypeValue), "val"},
      {KeyStr("D", 24U, kTypeSingleDeletion), ""},
      {KeyStr("G", 32U, kTypeSingleDeletion), ""},
      {KeyStr("G", 31U, kTypeValue), "val"},
      {KeyStr("G", 24U, kTypeSingleDeletion), ""},
      {KeyStr("G", 23U, kTypeValue), "val2"},
      {KeyStr("H", 31U, kTypeValue), "val"},
      {KeyStr("H", 24U, kTypeSingleDeletion), ""},
      {KeyStr("H", 23U, kTypeValue), "val"},
      {KeyStr("I", 35U, kTypeSingleDeletion), ""},
      {KeyStr("I", 34U, kTypeValue), "val2"},
      {KeyStr("I", 33U, kTypeSingleDeletion), ""},
      {KeyStr("I", 32U, kTypeValue), "val3"},
      {KeyStr("I", 31U, kTypeSingleDeletion), ""},
      {KeyStr("J", 34U, kTypeValue), "val"},
      {KeyStr("J", 33U, kTypeSingleDeletion), ""},
      {KeyStr("J", 25U, kTypeValue), "val2"},
      {KeyStr("J", 24U, kTypeSingleDeletion), ""},
  });
  AddMockFile(file1);
  auto file2 = mock::MakeMockFile({
      {KeyStr("A", 14U, kTypeSingleDeletion), ""},
      {KeyStr("A", 13U, kTypeValue), "val2"},
      {KeyStr("C", 14U, kTypeSingleDeletion), ""},
      {KeyStr("C", 13U, kTypeValue), "val"},
      {KeyStr("E", 12U, kTypeSingleDeletion), ""},
      {KeyStr("F", 4U, kTypeSingleDeletion), ""},
      {KeyStr("F", 3U, kTypeValue), "val"},
      {KeyStr("G", 14U, kTypeSingleDeletion), ""},
      {KeyStr("G", 13U, kTypeValue), "val3"},
      {KeyStr("H", 14U, kTypeSingleDeletion), ""},
      {KeyStr("H", 13U, kTypeValue), "val2"},
      {KeyStr("I", 13U, kTypeValue), "val4"},
      {KeyStr("I", 12U, kTypeSingleDeletion), ""},
      {KeyStr("I", 11U, kTypeValue), "val5"},
      {KeyStr("J", 15U, kTypeValue), "val3"},
      {KeyStr("J", 14U, kTypeSingleDeletion), ""},
  });
  AddMockFile(file2);
  // Entries listed with an empty value (e.g. A@23, G@31) are Puts whose key is
  // retained while the value itself is cleared.
  auto expected_results = mock::MakeMockFile({
      {KeyStr("A", 24U, kTypeSingleDeletion), ""},
      {KeyStr("A", 23U, kTypeValue), ""},
      {KeyStr("B", 24U, kTypeSingleDeletion), ""},
      {KeyStr("B", 23U, kTypeValue), ""},
      {KeyStr("D", 24U, kTypeSingleDeletion), ""},
      {KeyStr("E", 12U, kTypeSingleDeletion), ""},
      {KeyStr("G", 32U, kTypeSingleDeletion), ""},
      {KeyStr("G", 31U, kTypeValue), ""},
      {KeyStr("H", 31U, kTypeValue), "val"},
      {KeyStr("I", 35U, kTypeSingleDeletion), ""},
      {KeyStr("I", 34U, kTypeValue), ""},
      {KeyStr("I", 31U, kTypeSingleDeletion), ""},
      {KeyStr("I", 13U, kTypeValue), "val4"},
      {KeyStr("J", 34U, kTypeValue), "val"},
      {KeyStr("J", 33U, kTypeSingleDeletion), ""},
      {KeyStr("J", 25U, kTypeValue), "val2"},
      {KeyStr("J", 24U, kTypeSingleDeletion), ""},
      {KeyStr("J", 15U, kTypeValue), "val3"},
      {KeyStr("J", 14U, kTypeSingleDeletion), ""},
  });
  SetLastSequence(24U);
  constexpr int input_level = 0;
  auto files = cfd_->current()->storage_info()->LevelFiles(input_level);
  // Snapshots at 10, 20 and 30. The trailing 20U is presumably the earliest
  // write-conflict snapshot, so the earliest snapshot (10) is not one.
  RunCompaction({files}, {input_level}, {expected_results}, {10U, 20U, 30U},
                20U);
}
  1053. TEST_F(CompactionJobTest, SingleDeleteZeroSeq) {
  1054. NewDB();
  1055. auto file1 = mock::MakeMockFile({
  1056. {KeyStr("A", 10U, kTypeSingleDeletion), ""},
  1057. {KeyStr("dummy", 5U, kTypeValue), "val2"},
  1058. });
  1059. AddMockFile(file1);
  1060. auto file2 = mock::MakeMockFile({
  1061. {KeyStr("A", 0U, kTypeValue), "val"},
  1062. });
  1063. AddMockFile(file2);
  1064. auto expected_results = mock::MakeMockFile({
  1065. {KeyStr("dummy", 0U, kTypeValue), "val2"},
  1066. });
  1067. SetLastSequence(22U);
  1068. constexpr int input_level = 0;
  1069. auto files = cfd_->current()->storage_info()->LevelFiles(input_level);
  1070. RunCompaction({files}, {input_level}, {expected_results}, {});
  1071. }
TEST_F(CompactionJobTest, MultiSingleDelete) {
  // Tests scenarios A-M involving multiple single delete/put pairs. Each
  // scenario reads oldest -> newest; "Snapshot" is the snapshot at sequence
  // number 10, and "(Put)" is a Put that lives in a level-2 file outside the
  // compaction input.
  //
  // A: Put Snapshot SDel Put SDel -> Put Snapshot SDel
  // B: Snapshot Put SDel Put SDel Snapshot -> Snapshot SDel Snapshot
  // C: SDel Put SDel Snapshot Put -> Snapshot Put
  // D: (Put) SDel Snapshot Put SDel -> (Put) SDel Snapshot SDel
  // E: Put SDel Snapshot Put SDel -> Snapshot SDel
  // F: Put SDel Put SDel Snapshot -> removed
  // G: Snapshot SDel Put SDel Put -> Snapshot Put SDel
  // H: (Put) Put SDel Put SDel Snapshot -> Removed
  // I: (Put) Snapshot Put SDel Put SDel -> SDel
  // J: Put Put SDel Put SDel SDel Snapshot Put Put SDel SDel Put
  //      -> Snapshot Put
  // K: SDel SDel Put SDel Put Put Snapshot SDel Put SDel SDel Put SDel
  //      -> Snapshot Put Snapshot SDel
  // L: SDel Put SDel Put SDel Snapshot SDel Put SDel SDel Put SDel
  //      -> Snapshot SDel Put SDel
  // M: (Put) SDel Put SDel Put SDel Snapshot Put SDel SDel Put SDel SDel
  //      -> SDel Snapshot Put SDel
  NewDB();
  // Newer L0 file: entries above the snapshot (sequence numbers 11-16).
  auto file1 = mock::MakeMockFile({
      {KeyStr("A", 14U, kTypeSingleDeletion), ""},
      {KeyStr("A", 13U, kTypeValue), "val5"},
      {KeyStr("A", 12U, kTypeSingleDeletion), ""},
      {KeyStr("B", 14U, kTypeSingleDeletion), ""},
      {KeyStr("B", 13U, kTypeValue), "val2"},
      {KeyStr("C", 14U, kTypeValue), "val3"},
      {KeyStr("D", 12U, kTypeSingleDeletion), ""},
      {KeyStr("D", 11U, kTypeValue), "val4"},
      {KeyStr("G", 15U, kTypeValue), "val"},
      {KeyStr("G", 14U, kTypeSingleDeletion), ""},
      {KeyStr("G", 13U, kTypeValue), "val"},
      {KeyStr("I", 14U, kTypeSingleDeletion), ""},
      {KeyStr("I", 13U, kTypeValue), "val"},
      {KeyStr("J", 15U, kTypeValue), "val"},
      {KeyStr("J", 14U, kTypeSingleDeletion), ""},
      {KeyStr("J", 13U, kTypeSingleDeletion), ""},
      {KeyStr("J", 12U, kTypeValue), "val"},
      {KeyStr("J", 11U, kTypeValue), "val"},
      {KeyStr("K", 16U, kTypeSingleDeletion), ""},
      {KeyStr("K", 15U, kTypeValue), "val1"},
      {KeyStr("K", 14U, kTypeSingleDeletion), ""},
      {KeyStr("K", 13U, kTypeSingleDeletion), ""},
      {KeyStr("K", 12U, kTypeValue), "val2"},
      {KeyStr("K", 11U, kTypeSingleDeletion), ""},
      {KeyStr("L", 16U, kTypeSingleDeletion), ""},
      {KeyStr("L", 15U, kTypeValue), "val"},
      {KeyStr("L", 14U, kTypeSingleDeletion), ""},
      {KeyStr("L", 13U, kTypeSingleDeletion), ""},
      {KeyStr("L", 12U, kTypeValue), "val"},
      {KeyStr("L", 11U, kTypeSingleDeletion), ""},
      {KeyStr("M", 16U, kTypeSingleDeletion), ""},
      {KeyStr("M", 15U, kTypeSingleDeletion), ""},
      {KeyStr("M", 14U, kTypeValue), "val"},
      {KeyStr("M", 13U, kTypeSingleDeletion), ""},
      {KeyStr("M", 12U, kTypeSingleDeletion), ""},
      {KeyStr("M", 11U, kTypeValue), "val"},
  });
  AddMockFile(file1);
  // Older L0 file: mostly entries at or below the snapshot (sequence <= 10).
  auto file2 = mock::MakeMockFile({
      {KeyStr("A", 10U, kTypeValue), "val"},
      {KeyStr("B", 12U, kTypeSingleDeletion), ""},
      {KeyStr("B", 11U, kTypeValue), "val2"},
      {KeyStr("C", 10U, kTypeSingleDeletion), ""},
      {KeyStr("C", 9U, kTypeValue), "val6"},
      {KeyStr("C", 8U, kTypeSingleDeletion), ""},
      {KeyStr("D", 10U, kTypeSingleDeletion), ""},
      {KeyStr("E", 12U, kTypeSingleDeletion), ""},
      {KeyStr("E", 11U, kTypeValue), "val"},
      {KeyStr("E", 5U, kTypeSingleDeletion), ""},
      {KeyStr("E", 4U, kTypeValue), "val"},
      {KeyStr("F", 6U, kTypeSingleDeletion), ""},
      {KeyStr("F", 5U, kTypeValue), "val"},
      {KeyStr("F", 4U, kTypeSingleDeletion), ""},
      {KeyStr("F", 3U, kTypeValue), "val"},
      {KeyStr("G", 12U, kTypeSingleDeletion), ""},
      {KeyStr("H", 6U, kTypeSingleDeletion), ""},
      {KeyStr("H", 5U, kTypeValue), "val"},
      {KeyStr("H", 4U, kTypeSingleDeletion), ""},
      {KeyStr("H", 3U, kTypeValue), "val"},
      {KeyStr("I", 12U, kTypeSingleDeletion), ""},
      {KeyStr("I", 11U, kTypeValue), "val"},
      {KeyStr("J", 6U, kTypeSingleDeletion), ""},
      {KeyStr("J", 5U, kTypeSingleDeletion), ""},
      {KeyStr("J", 4U, kTypeValue), "val"},
      {KeyStr("J", 3U, kTypeSingleDeletion), ""},
      {KeyStr("J", 2U, kTypeValue), "val"},
      {KeyStr("K", 8U, kTypeValue), "val3"},
      {KeyStr("K", 7U, kTypeValue), "val4"},
      {KeyStr("K", 6U, kTypeSingleDeletion), ""},
      {KeyStr("K", 5U, kTypeValue), "val5"},
      {KeyStr("K", 2U, kTypeSingleDeletion), ""},
      {KeyStr("K", 1U, kTypeSingleDeletion), ""},
      {KeyStr("L", 5U, kTypeSingleDeletion), ""},
      {KeyStr("L", 4U, kTypeValue), "val"},
      {KeyStr("L", 3U, kTypeSingleDeletion), ""},
      {KeyStr("L", 2U, kTypeValue), "val"},
      {KeyStr("L", 1U, kTypeSingleDeletion), ""},
      {KeyStr("M", 10U, kTypeSingleDeletion), ""},
      {KeyStr("M", 7U, kTypeValue), "val"},
      {KeyStr("M", 5U, kTypeSingleDeletion), ""},
      {KeyStr("M", 4U, kTypeValue), "val"},
      {KeyStr("M", 3U, kTypeSingleDeletion), ""},
  });
  AddMockFile(file2);
  // Level-2 files (the "(Put)" entries in scenarios D, H, I and M).
  auto file3 = mock::MakeMockFile({
      {KeyStr("D", 1U, kTypeValue), "val"},
      {KeyStr("H", 1U, kTypeValue), "val"},
      {KeyStr("I", 2U, kTypeValue), "val"},
  });
  AddMockFile(file3, 2);
  auto file4 = mock::MakeMockFile({
      {KeyStr("M", 1U, kTypeValue), "val"},
  });
  AddMockFile(file4, 2);
  auto expected_results =
      mock::MakeMockFile({{KeyStr("A", 14U, kTypeSingleDeletion), ""},
                          {KeyStr("A", 13U, kTypeValue), ""},
                          {KeyStr("A", 12U, kTypeSingleDeletion), ""},
                          {KeyStr("A", 10U, kTypeValue), "val"},
                          {KeyStr("B", 14U, kTypeSingleDeletion), ""},
                          {KeyStr("B", 13U, kTypeValue), ""},
                          {KeyStr("C", 14U, kTypeValue), "val3"},
                          {KeyStr("D", 12U, kTypeSingleDeletion), ""},
                          {KeyStr("D", 11U, kTypeValue), ""},
                          {KeyStr("D", 10U, kTypeSingleDeletion), ""},
                          {KeyStr("E", 12U, kTypeSingleDeletion), ""},
                          {KeyStr("E", 11U, kTypeValue), ""},
                          {KeyStr("G", 15U, kTypeValue), "val"},
                          {KeyStr("G", 12U, kTypeSingleDeletion), ""},
                          {KeyStr("I", 14U, kTypeSingleDeletion), ""},
                          {KeyStr("I", 13U, kTypeValue), ""},
                          {KeyStr("J", 15U, kTypeValue), "val"},
                          {KeyStr("K", 16U, kTypeSingleDeletion), ""},
                          {KeyStr("K", 15U, kTypeValue), ""},
                          {KeyStr("K", 11U, kTypeSingleDeletion), ""},
                          {KeyStr("K", 8U, kTypeValue), "val3"},
                          {KeyStr("L", 16U, kTypeSingleDeletion), ""},
                          {KeyStr("L", 15U, kTypeValue), ""},
                          {KeyStr("L", 11U, kTypeSingleDeletion), ""},
                          {KeyStr("M", 15U, kTypeSingleDeletion), ""},
                          {KeyStr("M", 14U, kTypeValue), ""},
                          {KeyStr("M", 3U, kTypeSingleDeletion), ""}});
  SetLastSequence(22U);
  constexpr int input_level = 0;
  auto files = cfd_->current()->storage_info()->LevelFiles(input_level);
  // Single snapshot at 10; the trailing 10U is presumably the earliest
  // write-conflict snapshot.
  RunCompaction({files}, {input_level}, {expected_results}, {10U}, 10U);
}
  1221. // This test documents the behavior where a corrupt key follows a deletion or a
  1222. // single deletion and the (single) deletion gets removed while the corrupt key
  1223. // gets written out. TODO(noetzli): We probably want a better way to treat
  1224. // corrupt keys.
  1225. TEST_F(CompactionJobTest, DISABLED_CorruptionAfterDeletion) {
  1226. NewDB();
  1227. auto file1 =
  1228. mock::MakeMockFile({{test::KeyStr("A", 6U, kTypeValue), "val3"},
  1229. {test::KeyStr("a", 5U, kTypeDeletion), ""},
  1230. {test::KeyStr("a", 4U, kTypeValue, true), "val"}});
  1231. AddMockFile(file1);
  1232. auto file2 =
  1233. mock::MakeMockFile({{test::KeyStr("b", 3U, kTypeSingleDeletion), ""},
  1234. {test::KeyStr("b", 2U, kTypeValue, true), "val"},
  1235. {test::KeyStr("c", 1U, kTypeValue), "val2"}});
  1236. AddMockFile(file2);
  1237. auto expected_results =
  1238. mock::MakeMockFile({{test::KeyStr("A", 0U, kTypeValue), "val3"},
  1239. {test::KeyStr("a", 0U, kTypeValue, true), "val"},
  1240. {test::KeyStr("b", 0U, kTypeValue, true), "val"},
  1241. {test::KeyStr("c", 0U, kTypeValue), "val2"}});
  1242. SetLastSequence(6U);
  1243. constexpr int input_level = 0;
  1244. auto files = cfd_->current()->storage_info()->LevelFiles(input_level);
  1245. RunCompaction({files}, {input_level}, {expected_results});
  1246. }
  1247. TEST_F(CompactionJobTest, OldestBlobFileNumber) {
  1248. NewDB();
  1249. // Note: blob1 is inlined TTL, so it will not be considered for the purposes
  1250. // of identifying the oldest referenced blob file. Similarly, blob6 will be
  1251. // ignored because it has TTL and hence refers to a TTL blob file.
  1252. const stl_wrappers::KVMap::value_type blob1(
  1253. KeyStr("a", 1U, kTypeBlobIndex), BlobStrInlinedTTL("foo", 1234567890ULL));
  1254. const stl_wrappers::KVMap::value_type blob2(KeyStr("b", 2U, kTypeBlobIndex),
  1255. BlobStr(59, 123456, 999));
  1256. const stl_wrappers::KVMap::value_type blob3(KeyStr("c", 3U, kTypeBlobIndex),
  1257. BlobStr(138, 1000, 1 << 8));
  1258. auto file1 = mock::MakeMockFile({blob1, blob2, blob3});
  1259. AddMockFile(file1);
  1260. const stl_wrappers::KVMap::value_type blob4(KeyStr("d", 4U, kTypeBlobIndex),
  1261. BlobStr(199, 3 << 10, 1 << 20));
  1262. const stl_wrappers::KVMap::value_type blob5(KeyStr("e", 5U, kTypeBlobIndex),
  1263. BlobStr(19, 6789, 333));
  1264. const stl_wrappers::KVMap::value_type blob6(
  1265. KeyStr("f", 6U, kTypeBlobIndex),
  1266. BlobStrTTL(5, 2048, 1 << 7, 1234567890ULL));
  1267. auto file2 = mock::MakeMockFile({blob4, blob5, blob6});
  1268. AddMockFile(file2);
  1269. const stl_wrappers::KVMap::value_type expected_blob1(
  1270. KeyStr("a", 0U, kTypeBlobIndex), blob1.second);
  1271. const stl_wrappers::KVMap::value_type expected_blob2(
  1272. KeyStr("b", 0U, kTypeBlobIndex), blob2.second);
  1273. const stl_wrappers::KVMap::value_type expected_blob3(
  1274. KeyStr("c", 0U, kTypeBlobIndex), blob3.second);
  1275. const stl_wrappers::KVMap::value_type expected_blob4(
  1276. KeyStr("d", 0U, kTypeBlobIndex), blob4.second);
  1277. const stl_wrappers::KVMap::value_type expected_blob5(
  1278. KeyStr("e", 0U, kTypeBlobIndex), blob5.second);
  1279. const stl_wrappers::KVMap::value_type expected_blob6(
  1280. KeyStr("f", 0U, kTypeBlobIndex), blob6.second);
  1281. auto expected_results =
  1282. mock::MakeMockFile({expected_blob1, expected_blob2, expected_blob3,
  1283. expected_blob4, expected_blob5, expected_blob6});
  1284. SetLastSequence(6U);
  1285. constexpr int input_level = 0;
  1286. auto files = cfd_->current()->storage_info()->LevelFiles(input_level);
  1287. RunCompaction({files}, {input_level}, {expected_results},
  1288. std::vector<SequenceNumber>(), kMaxSequenceNumber,
  1289. /* output_level */ 1, /* verify */ true,
  1290. /* expected_oldest_blob_file_numbers */ {19});
  1291. }
  1292. TEST_F(CompactionJobTest, VerifyProximalLevelOutput) {
  1293. cf_options_.last_level_temperature = Temperature::kCold;
  1294. SyncPoint::GetInstance()->SetCallBack(
  1295. "Compaction::SupportsPerKeyPlacement:Enabled", [&](void* arg) {
  1296. auto supports_per_key_placement = static_cast<bool*>(arg);
  1297. *supports_per_key_placement = true;
  1298. });
  1299. std::atomic_uint64_t latest_cold_seq = 0;
  1300. SyncPoint::GetInstance()->SetCallBack(
  1301. "CompactionIterator::PrepareOutput.context", [&](void* arg) {
  1302. auto context = static_cast<PerKeyPlacementContext*>(arg);
  1303. context->output_to_proximal_level = context->seq_num > latest_cold_seq;
  1304. });
  1305. SyncPoint::GetInstance()->EnableProcessing();
  1306. NewDB();
  1307. // Add files on different levels that may overlap
  1308. auto file0_1 = mock::MakeMockFile({{KeyStr("z", 12U, kTypeValue), "val"}});
  1309. AddMockFile(file0_1);
  1310. auto file1_1 = mock::MakeMockFile({{KeyStr("b", 10U, kTypeValue), "val"},
  1311. {KeyStr("f", 11U, kTypeValue), "val"}});
  1312. AddMockFile(file1_1, 1);
  1313. auto file1_2 = mock::MakeMockFile({{KeyStr("j", 12U, kTypeValue), "val"},
  1314. {KeyStr("k", 13U, kTypeValue), "val"}});
  1315. AddMockFile(file1_2, 1);
  1316. auto file1_3 = mock::MakeMockFile({{KeyStr("p", 14U, kTypeValue), "val"},
  1317. {KeyStr("u", 15U, kTypeValue), "val"}});
  1318. AddMockFile(file1_3, 1);
  1319. auto file2_1 = mock::MakeMockFile({{KeyStr("f", 8U, kTypeValue), "val"},
  1320. {KeyStr("h", 9U, kTypeValue), "val"}});
  1321. AddMockFile(file2_1, 2);
  1322. auto file2_2 = mock::MakeMockFile({{KeyStr("m", 6U, kTypeValue), "val"},
  1323. {KeyStr("p", 7U, kTypeValue), "val"}});
  1324. AddMockFile(file2_2, 2);
  1325. auto file3_1 = mock::MakeMockFile({{KeyStr("g", 2U, kTypeValue), "val"},
  1326. {KeyStr("k", 3U, kTypeValue), "val"}});
  1327. AddMockFile(file3_1, 3);
  1328. auto file3_2 = mock::MakeMockFile({{KeyStr("v", 4U, kTypeValue), "val"},
  1329. {KeyStr("x", 5U, kTypeValue), "val"}});
  1330. AddMockFile(file3_2, 3);
  1331. auto cfd = versions_->GetColumnFamilySet()->GetDefault();
  1332. const std::vector<int> input_levels = {0, 1, 2, 3};
  1333. auto files0 = cfd->current()->storage_info()->LevelFiles(input_levels[0]);
  1334. auto files1 = cfd->current()->storage_info()->LevelFiles(input_levels[1]);
  1335. auto files2 = cfd->current()->storage_info()->LevelFiles(input_levels[2]);
  1336. auto files3 = cfd->current()->storage_info()->LevelFiles(input_levels[3]);
  1337. RunLastLevelCompaction(
  1338. {files0, files1, files2, files3}, input_levels,
  1339. /*verify_func=*/[&](Compaction& comp) {
  1340. for (char c = 'a'; c <= 'z'; c++) {
  1341. if (c == 'a') {
  1342. comp.TEST_AssertWithinProximalLevelOutputRange(
  1343. "a", true /*expect_failure*/);
  1344. } else {
  1345. std::string c_str{c};
  1346. comp.TEST_AssertWithinProximalLevelOutputRange(c_str);
  1347. }
  1348. }
  1349. });
  1350. }
  1351. TEST_F(CompactionJobTest, NoEnforceSingleDeleteContract) {
  1352. db_options_.enforce_single_del_contracts = false;
  1353. NewDB();
  1354. auto file =
  1355. mock::MakeMockFile({{KeyStr("a", 4U, kTypeSingleDeletion), ""},
  1356. {KeyStr("a", 3U, kTypeDeletion), "dontcare"}});
  1357. AddMockFile(file);
  1358. SetLastSequence(4U);
  1359. auto expected_results = mock::MakeMockFile();
  1360. constexpr int input_level = 0;
  1361. auto files = cfd_->current()->storage_info()->LevelFiles(input_level);
  1362. RunCompaction({files}, {input_level}, {expected_results});
  1363. }
  1364. TEST_F(CompactionJobTest, InputSerialization) {
  1365. // Setup a random CompactionServiceInput
  1366. CompactionServiceInput input;
  1367. const int kStrMaxLen = 1000;
  1368. Random rnd(static_cast<uint32_t>(time(nullptr)));
  1369. Random64 rnd64(time(nullptr));
  1370. input.cf_name = rnd.RandomString(rnd.Uniform(kStrMaxLen));
  1371. while (!rnd.OneIn(10)) {
  1372. input.snapshots.emplace_back(rnd64.Uniform(UINT64_MAX));
  1373. }
  1374. while (!rnd.OneIn(10)) {
  1375. input.input_files.emplace_back(rnd.RandomString(
  1376. rnd.Uniform(kStrMaxLen - 1) +
  1377. 1)); // input file name should have at least one character
  1378. }
  1379. input.output_level = 4;
  1380. input.has_begin = rnd.OneIn(2);
  1381. if (input.has_begin) {
  1382. input.begin = rnd.RandomBinaryString(rnd.Uniform(kStrMaxLen));
  1383. }
  1384. input.has_end = rnd.OneIn(2);
  1385. if (input.has_end) {
  1386. input.end = rnd.RandomBinaryString(rnd.Uniform(kStrMaxLen));
  1387. }
  1388. std::string output;
  1389. ASSERT_OK(input.Write(&output));
  1390. // Test deserialization
  1391. CompactionServiceInput deserialized1;
  1392. ASSERT_OK(CompactionServiceInput::Read(output, &deserialized1));
  1393. ASSERT_TRUE(deserialized1.TEST_Equals(&input));
  1394. // Test mismatch
  1395. deserialized1.output_level += 10;
  1396. std::string mismatch;
  1397. ASSERT_FALSE(deserialized1.TEST_Equals(&input, &mismatch));
  1398. ASSERT_EQ(mismatch, "output_level");
  1399. // Test unknown field
  1400. CompactionServiceInput deserialized2;
  1401. output.clear();
  1402. ASSERT_OK(input.Write(&output));
  1403. output.append("new_field=123;");
  1404. ASSERT_OK(CompactionServiceInput::Read(output, &deserialized2));
  1405. ASSERT_TRUE(deserialized2.TEST_Equals(&input));
  1406. // Test missing field
  1407. CompactionServiceInput deserialized3;
  1408. deserialized3.output_level = 0;
  1409. std::string to_remove = "output_level=4;";
  1410. size_t pos = output.find(to_remove);
  1411. ASSERT_TRUE(pos != std::string::npos);
  1412. output.erase(pos, to_remove.length());
  1413. ASSERT_OK(CompactionServiceInput::Read(output, &deserialized3));
  1414. mismatch.clear();
  1415. ASSERT_FALSE(deserialized3.TEST_Equals(&input, &mismatch));
  1416. ASSERT_EQ(mismatch, "output_level");
  1417. // manually set the value back, should match the original structure
  1418. deserialized3.output_level = 4;
  1419. ASSERT_TRUE(deserialized3.TEST_Equals(&input));
  1420. // Test invalid version
  1421. output.clear();
  1422. ASSERT_OK(input.Write(&output));
  1423. uint32_t data_version = DecodeFixed32(output.data());
  1424. const size_t kDataVersionSize = sizeof(data_version);
  1425. ASSERT_EQ(data_version,
  1426. 1U); // Update once the default data version is changed
  1427. char buf[kDataVersionSize];
  1428. EncodeFixed32(buf, data_version + 10); // make sure it's not valid
  1429. output.replace(0, kDataVersionSize, buf, kDataVersionSize);
  1430. Status s = CompactionServiceInput::Read(output, &deserialized3);
  1431. ASSERT_TRUE(s.IsNotSupported());
  1432. }
  1433. TEST_F(CompactionJobTest, ResultSerialization) {
  1434. // Setup a random CompactionServiceResult
  1435. CompactionServiceResult result;
  1436. const int kStrMaxLen = 1000;
  1437. Random rnd(static_cast<uint32_t>(time(nullptr)));
  1438. Random64 rnd64(time(nullptr));
  1439. std::vector<Status> status_list = {
  1440. Status::OK(),
  1441. Status::InvalidArgument("invalid option"),
  1442. Status::Aborted("failed to run"),
  1443. Status::NotSupported("not supported option"),
  1444. };
  1445. result.status =
  1446. status_list.at(rnd.Uniform(static_cast<int>(status_list.size())));
  1447. std::string file_checksum = rnd.RandomBinaryString(rnd.Uniform(kStrMaxLen));
  1448. std::string file_checksum_func_name = "MyAwesomeChecksumGenerator";
  1449. while (!rnd.OneIn(10)) {
  1450. TableProperties tp;
  1451. tp.user_collected_properties.emplace(
  1452. "UCP_Key1", rnd.RandomString(rnd.Uniform(kStrMaxLen)));
  1453. tp.user_collected_properties.emplace(
  1454. "UCP_Key2", rnd.RandomString(rnd.Uniform(kStrMaxLen)));
  1455. tp.readable_properties.emplace("RP_Key1",
  1456. rnd.RandomString(rnd.Uniform(kStrMaxLen)));
  1457. tp.readable_properties.emplace("RP_K2y2",
  1458. rnd.RandomString(rnd.Uniform(kStrMaxLen)));
  1459. UniqueId64x2 id{rnd64.Uniform(UINT64_MAX), rnd64.Uniform(UINT64_MAX)};
  1460. result.output_files.emplace_back(
  1461. rnd.RandomString(rnd.Uniform(kStrMaxLen)) /* file_name */,
  1462. rnd64.Uniform(UINT64_MAX) /* file_size */,
  1463. rnd64.Uniform(UINT64_MAX) /* smallest_seqno */,
  1464. rnd64.Uniform(UINT64_MAX) /* largest_seqno */,
  1465. rnd.RandomBinaryString(
  1466. rnd.Uniform(kStrMaxLen)) /* smallest_internal_key */,
  1467. rnd.RandomBinaryString(
  1468. rnd.Uniform(kStrMaxLen)) /* largest_internal_key */,
  1469. rnd64.Uniform(UINT64_MAX) /* oldest_ancester_time */,
  1470. rnd64.Uniform(UINT64_MAX) /* file_creation_time */,
  1471. rnd64.Uniform(UINT64_MAX) /* epoch_number */,
  1472. file_checksum /* file_checksum */,
  1473. file_checksum_func_name /* file_checksum_func_name */,
  1474. rnd64.Uniform(UINT64_MAX) /* paranoid_hash */,
  1475. rnd.OneIn(2) /* marked_for_compaction */, id /* unique_id */, tp,
  1476. false /* is_proximal_level_output */, Temperature::kHot);
  1477. }
  1478. result.output_level = rnd.Uniform(10);
  1479. result.output_path = rnd.RandomString(rnd.Uniform(kStrMaxLen));
  1480. result.stats.num_output_records = rnd64.Uniform(UINT64_MAX);
  1481. result.bytes_read = 123;
  1482. result.bytes_written = rnd64.Uniform(UINT64_MAX);
  1483. result.stats.elapsed_micros = rnd64.Uniform(UINT64_MAX);
  1484. result.stats.num_output_files = rnd.Uniform(1000);
  1485. result.stats.is_full_compaction = rnd.OneIn(2);
  1486. result.stats.num_single_del_mismatch = rnd64.Uniform(UINT64_MAX);
  1487. result.stats.num_input_files = 9;
  1488. std::string output;
  1489. ASSERT_OK(result.Write(&output));
  1490. // Test deserialization
  1491. CompactionServiceResult deserialized1;
  1492. ASSERT_OK(CompactionServiceResult::Read(output, &deserialized1));
  1493. ASSERT_TRUE(deserialized1.TEST_Equals(&result));
  1494. for (size_t i = 0; i < result.output_files.size(); i++) {
  1495. for (const auto& prop :
  1496. result.output_files[i].table_properties.user_collected_properties) {
  1497. ASSERT_EQ(deserialized1.output_files[i]
  1498. .table_properties.user_collected_properties[prop.first],
  1499. prop.second);
  1500. }
  1501. for (const auto& prop :
  1502. result.output_files[i].table_properties.readable_properties) {
  1503. ASSERT_EQ(deserialized1.output_files[i]
  1504. .table_properties.readable_properties[prop.first],
  1505. prop.second);
  1506. }
  1507. }
  1508. // Test mismatch
  1509. deserialized1.stats.num_input_files += 10;
  1510. std::string mismatch;
  1511. ASSERT_FALSE(deserialized1.TEST_Equals(&result, &mismatch));
  1512. ASSERT_EQ(mismatch, "stats.num_input_files");
  1513. // Test unique id mismatch
  1514. if (!result.output_files.empty()) {
  1515. CompactionServiceResult deserialized_tmp;
  1516. ASSERT_OK(CompactionServiceResult::Read(output, &deserialized_tmp));
  1517. deserialized_tmp.output_files[0].unique_id[0] += 1;
  1518. ASSERT_FALSE(deserialized_tmp.TEST_Equals(&result, &mismatch));
  1519. ASSERT_EQ(mismatch, "output_files.unique_id");
  1520. deserialized_tmp.status.PermitUncheckedError();
  1521. ASSERT_EQ(deserialized_tmp.output_files[0].file_checksum, file_checksum);
  1522. ASSERT_EQ(deserialized_tmp.output_files[0].file_checksum_func_name,
  1523. file_checksum_func_name);
  1524. ASSERT_EQ(deserialized_tmp.output_files[0].file_temperature,
  1525. Temperature::kHot);
  1526. }
  1527. // Test unknown field
  1528. CompactionServiceResult deserialized2;
  1529. output.clear();
  1530. ASSERT_OK(result.Write(&output));
  1531. output.append("new_field=123;");
  1532. ASSERT_OK(CompactionServiceResult::Read(output, &deserialized2));
  1533. ASSERT_TRUE(deserialized2.TEST_Equals(&result));
  1534. // Test missing field
  1535. CompactionServiceResult deserialized3;
  1536. deserialized3.bytes_read = 0;
  1537. std::string to_remove = "bytes_read=123;";
  1538. size_t pos = output.find(to_remove);
  1539. ASSERT_TRUE(pos != std::string::npos);
  1540. output.erase(pos, to_remove.length());
  1541. ASSERT_OK(CompactionServiceResult::Read(output, &deserialized3));
  1542. mismatch.clear();
  1543. ASSERT_FALSE(deserialized3.TEST_Equals(&result, &mismatch));
  1544. ASSERT_EQ(mismatch, "bytes_read");
  1545. deserialized3.bytes_read = 123;
  1546. ASSERT_TRUE(deserialized3.TEST_Equals(&result));
  1547. // Test invalid version
  1548. output.clear();
  1549. ASSERT_OK(result.Write(&output));
  1550. uint32_t data_version = DecodeFixed32(output.data());
  1551. const size_t kDataVersionSize = sizeof(data_version);
  1552. ASSERT_EQ(data_version,
  1553. 1U); // Update once the default data version is changed
  1554. char buf[kDataVersionSize];
  1555. EncodeFixed32(buf, data_version + 10); // make sure it's not valid
  1556. output.replace(0, kDataVersionSize, buf, kDataVersionSize);
  1557. Status s = CompactionServiceResult::Read(output, &deserialized3);
  1558. ASSERT_TRUE(s.IsNotSupported());
  1559. for (const auto& item : status_list) {
  1560. item.PermitUncheckedError();
  1561. }
  1562. }
  1563. TEST_F(CompactionJobTest, CutForMaxCompactionBytes) {
  1564. // dynamic_file_size option should have no impact on cutting for max
  1565. // compaction bytes.
  1566. NewDB();
  1567. mutable_cf_options_.target_file_size_base = 80;
  1568. mutable_cf_options_.max_compaction_bytes = 21;
  1569. auto file1 = mock::MakeMockFile({
  1570. {KeyStr("c", 5U, kTypeValue), "val2"},
  1571. {KeyStr("n", 6U, kTypeValue), "val3"},
  1572. });
  1573. AddMockFile(file1);
  1574. auto file2 = mock::MakeMockFile({{KeyStr("h", 3U, kTypeValue), "val"},
  1575. {KeyStr("j", 4U, kTypeValue), "val"}});
  1576. AddMockFile(file2, 1);
  1577. // Create three L2 files, each size 10.
  1578. // max_compaction_bytes 21 means the compaction output in L1 will
  1579. // be cut to at least two files.
  1580. auto file3 = mock::MakeMockFile({{KeyStr("b", 1U, kTypeValue), "val"},
  1581. {KeyStr("c", 1U, kTypeValue), "val"},
  1582. {KeyStr("c1", 1U, kTypeValue), "val"},
  1583. {KeyStr("c2", 1U, kTypeValue), "val"},
  1584. {KeyStr("c3", 1U, kTypeValue), "val"},
  1585. {KeyStr("c4", 1U, kTypeValue), "val"},
  1586. {KeyStr("d", 1U, kTypeValue), "val"},
  1587. {KeyStr("e", 2U, kTypeValue), "val"}});
  1588. AddMockFile(file3, 2);
  1589. auto file4 = mock::MakeMockFile({{KeyStr("h", 1U, kTypeValue), "val"},
  1590. {KeyStr("i", 1U, kTypeValue), "val"},
  1591. {KeyStr("i1", 1U, kTypeValue), "val"},
  1592. {KeyStr("i2", 1U, kTypeValue), "val"},
  1593. {KeyStr("i3", 1U, kTypeValue), "val"},
  1594. {KeyStr("i4", 1U, kTypeValue), "val"},
  1595. {KeyStr("j", 1U, kTypeValue), "val"},
  1596. {KeyStr("k", 2U, kTypeValue), "val"}});
  1597. AddMockFile(file4, 2);
  1598. auto file5 = mock::MakeMockFile({{KeyStr("l", 1U, kTypeValue), "val"},
  1599. {KeyStr("m", 1U, kTypeValue), "val"},
  1600. {KeyStr("m1", 1U, kTypeValue), "val"},
  1601. {KeyStr("m2", 1U, kTypeValue), "val"},
  1602. {KeyStr("m3", 1U, kTypeValue), "val"},
  1603. {KeyStr("m4", 1U, kTypeValue), "val"},
  1604. {KeyStr("n", 1U, kTypeValue), "val"},
  1605. {KeyStr("o", 2U, kTypeValue), "val"}});
  1606. AddMockFile(file5, 2);
  1607. // The expected output should be:
  1608. // L1: [c, h, j] [n]
  1609. // L2: [b ... e] [h ... k] [l ... o]
  1610. // It's better to have "j" in the first file, because anyway it's overlapping
  1611. // with the second file on L2.
  1612. // (Note: before this PR, it was cut at "h" because it's using the internal
  1613. // comparator which think L1 "h" with seqno 3 is smaller than L2 "h" with
  1614. // seqno 1, but actually they're overlapped with the compaction picker).
  1615. auto expected_file1 =
  1616. mock::MakeMockFile({{KeyStr("c", 5U, kTypeValue), "val2"},
  1617. {KeyStr("h", 3U, kTypeValue), "val"},
  1618. {KeyStr("j", 4U, kTypeValue), "val"}});
  1619. auto expected_file2 =
  1620. mock::MakeMockFile({{KeyStr("n", 6U, kTypeValue), "val3"}});
  1621. SetLastSequence(6U);
  1622. const std::vector<int> input_levels = {0, 1};
  1623. auto lvl0_files = cfd_->current()->storage_info()->LevelFiles(0);
  1624. auto lvl1_files = cfd_->current()->storage_info()->LevelFiles(1);
  1625. RunCompaction({lvl0_files, lvl1_files}, input_levels,
  1626. {expected_file1, expected_file2});
  1627. }
  1628. TEST_F(CompactionJobTest, CutToSkipGrandparentFile) {
  1629. NewDB();
  1630. // Make sure the grandparent level file size (10) qualifies skipping.
  1631. // Currently, it has to be > 1/8 of target file size.
  1632. mutable_cf_options_.target_file_size_base = 70;
  1633. auto file1 = mock::MakeMockFile({
  1634. {KeyStr("a", 5U, kTypeValue), "val2"},
  1635. {KeyStr("z", 6U, kTypeValue), "val3"},
  1636. });
  1637. AddMockFile(file1);
  1638. auto file2 = mock::MakeMockFile({{KeyStr("c", 3U, kTypeValue), "val"},
  1639. {KeyStr("x", 4U, kTypeValue), "val"}});
  1640. AddMockFile(file2, 1);
  1641. auto file3 = mock::MakeMockFile({{KeyStr("b", 1U, kTypeValue), "val"},
  1642. {KeyStr("d", 2U, kTypeValue), "val"}});
  1643. AddMockFile(file3, 2);
  1644. auto file4 = mock::MakeMockFile({{KeyStr("h", 1U, kTypeValue), "val"},
  1645. {KeyStr("i", 2U, kTypeValue), "val"}});
  1646. AddMockFile(file4, 2);
  1647. auto file5 = mock::MakeMockFile({{KeyStr("v", 1U, kTypeValue), "val"},
  1648. {KeyStr("y", 2U, kTypeValue), "val"}});
  1649. AddMockFile(file5, 2);
  1650. auto expected_file1 =
  1651. mock::MakeMockFile({{KeyStr("a", 5U, kTypeValue), "val2"},
  1652. {KeyStr("c", 3U, kTypeValue), "val"}});
  1653. auto expected_file2 =
  1654. mock::MakeMockFile({{KeyStr("x", 4U, kTypeValue), "val"},
  1655. {KeyStr("z", 6U, kTypeValue), "val3"}});
  1656. SetLastSequence(6U);
  1657. const std::vector<int> input_levels = {0, 1};
  1658. auto lvl0_files = cfd_->current()->storage_info()->LevelFiles(0);
  1659. auto lvl1_files = cfd_->current()->storage_info()->LevelFiles(1);
  1660. RunCompaction({lvl0_files, lvl1_files}, input_levels,
  1661. {expected_file1, expected_file2});
  1662. }
  1663. TEST_F(CompactionJobTest, CutToAlignGrandparentBoundary) {
  1664. NewDB();
  1665. // MockTable has 1 byte per entry by default and each file is 10 bytes.
  1666. // When the file size is smaller than 100, it won't cut file earlier to align
  1667. // with its grandparent boundary.
  1668. const size_t kKeyValueSize = 10000;
  1669. mock_table_factory_->SetKeyValueSize(kKeyValueSize);
  1670. mutable_cf_options_.target_file_size_base = 10 * kKeyValueSize;
  1671. mock::KVVector file1;
  1672. char ch = 'd';
  1673. // Add value from d -> o
  1674. for (char i = 0; i < 12; i++) {
  1675. file1.emplace_back(KeyStr(std::string(1, ch + i), i + 10, kTypeValue),
  1676. "val" + std::to_string(i));
  1677. }
  1678. AddMockFile(file1);
  1679. auto file2 = mock::MakeMockFile({{KeyStr("e", 3U, kTypeValue), "val"},
  1680. {KeyStr("s", 4U, kTypeValue), "val"}});
  1681. AddMockFile(file2, 1);
  1682. // the 1st grandparent file should be skipped
  1683. auto file3 = mock::MakeMockFile({{KeyStr("a", 1U, kTypeValue), "val"},
  1684. {KeyStr("b", 2U, kTypeValue), "val"}});
  1685. AddMockFile(file3, 2);
  1686. auto file4 = mock::MakeMockFile({{KeyStr("c", 1U, kTypeValue), "val"},
  1687. {KeyStr("e", 2U, kTypeValue), "val"}});
  1688. AddMockFile(file4, 2);
  1689. auto file5 = mock::MakeMockFile({{KeyStr("h", 1U, kTypeValue), "val"},
  1690. {KeyStr("j", 2U, kTypeValue), "val"}});
  1691. AddMockFile(file5, 2);
  1692. auto file6 = mock::MakeMockFile({{KeyStr("k", 1U, kTypeValue), "val"},
  1693. {KeyStr("n", 2U, kTypeValue), "val"}});
  1694. AddMockFile(file6, 2);
  1695. auto file7 = mock::MakeMockFile({{KeyStr("q", 1U, kTypeValue), "val"},
  1696. {KeyStr("t", 2U, kTypeValue), "val"}});
  1697. AddMockFile(file7, 2);
  1698. // The expected outputs are:
  1699. // L1: [d,e,f,g,h,i,j] [k,l,m,n,o,s]
  1700. // L2: [a, b] [c, e] [h, j] [k, n] [q, t]
  1701. // The first output cut earlier at "j", so it could be aligned with L2 files.
  1702. // If dynamic_file_size is not enabled, it will be cut based on the
  1703. // target_file_size
  1704. mock::KVVector expected_file1;
  1705. for (char i = 0; i < 7; i++) {
  1706. expected_file1.emplace_back(
  1707. KeyStr(std::string(1, ch + i), i + 10, kTypeValue),
  1708. "val" + std::to_string(i));
  1709. }
  1710. mock::KVVector expected_file2;
  1711. for (char i = 7; i < 12; i++) {
  1712. expected_file2.emplace_back(
  1713. KeyStr(std::string(1, ch + i), i + 10, kTypeValue),
  1714. "val" + std::to_string(i));
  1715. }
  1716. expected_file2.emplace_back(KeyStr("s", 4U, kTypeValue), "val");
  1717. SetLastSequence(22U);
  1718. const std::vector<int> input_levels = {0, 1};
  1719. auto lvl0_files = cfd_->current()->storage_info()->LevelFiles(0);
  1720. auto lvl1_files = cfd_->current()->storage_info()->LevelFiles(1);
  1721. RunCompaction({lvl0_files, lvl1_files}, input_levels,
  1722. {expected_file1, expected_file2});
  1723. }
  1724. TEST_F(CompactionJobTest, CutToAlignGrandparentBoundarySameKey) {
  1725. NewDB();
  1726. // MockTable has 1 byte per entry by default and each file is 10 bytes.
  1727. // When the file size is smaller than 100, it won't cut file earlier to align
  1728. // with its grandparent boundary.
  1729. const size_t kKeyValueSize = 10000;
  1730. mock_table_factory_->SetKeyValueSize(kKeyValueSize);
  1731. mutable_cf_options_.target_file_size_base = 10 * kKeyValueSize;
  1732. mock::KVVector file1;
  1733. for (int i = 0; i < 7; i++) {
  1734. file1.emplace_back(KeyStr("a", 100 - i, kTypeValue),
  1735. "val" + std::to_string(100 - i));
  1736. }
  1737. file1.emplace_back(KeyStr("b", 90, kTypeValue), "valb");
  1738. AddMockFile(file1);
  1739. auto file2 = mock::MakeMockFile({{KeyStr("a", 93U, kTypeValue), "val93"},
  1740. {KeyStr("b", 90U, kTypeValue), "valb"}});
  1741. AddMockFile(file2, 1);
  1742. auto file3 = mock::MakeMockFile({{KeyStr("a", 89U, kTypeValue), "val"},
  1743. {KeyStr("a", 88U, kTypeValue), "val"}});
  1744. AddMockFile(file3, 2);
  1745. auto file4 = mock::MakeMockFile({{KeyStr("a", 87U, kTypeValue), "val"},
  1746. {KeyStr("a", 86U, kTypeValue), "val"}});
  1747. AddMockFile(file4, 2);
  1748. auto file5 = mock::MakeMockFile({{KeyStr("b", 85U, kTypeValue), "val"},
  1749. {KeyStr("b", 84U, kTypeValue), "val"}});
  1750. AddMockFile(file5, 2);
  1751. mock::KVVector expected_file1;
  1752. for (int i = 0; i < 8; i++) {
  1753. expected_file1.emplace_back(KeyStr("a", 100 - i, kTypeValue),
  1754. "val" + std::to_string(100 - i));
  1755. }
  1756. // make sure `b` is cut in a separated file (so internally it's not using
  1757. // internal comparator, which will think the "b:90" (seqno 90) here is smaller
  1758. // than "b:85" on L2.)
  1759. auto expected_file2 =
  1760. mock::MakeMockFile({{KeyStr("b", 90U, kTypeValue), "valb"}});
  1761. SetLastSequence(122U);
  1762. const std::vector<int> input_levels = {0, 1};
  1763. auto lvl0_files = cfd_->current()->storage_info()->LevelFiles(0);
  1764. auto lvl1_files = cfd_->current()->storage_info()->LevelFiles(1);
  1765. // Just keep all the history
  1766. std::vector<SequenceNumber> snapshots;
  1767. for (int i = 80; i <= 100; i++) {
  1768. snapshots.emplace_back(i);
  1769. }
  1770. RunCompaction({lvl0_files, lvl1_files}, input_levels,
  1771. {expected_file1, expected_file2}, std::move(snapshots));
  1772. }
  1773. TEST_F(CompactionJobTest, CutForMaxCompactionBytesSameKey) {
  1774. // dynamic_file_size option should have no impact on cutting for max
  1775. // compaction bytes.
  1776. NewDB();
  1777. mutable_cf_options_.target_file_size_base = 80;
  1778. mutable_cf_options_.max_compaction_bytes = 20;
  1779. auto file1 = mock::MakeMockFile({{KeyStr("a", 104U, kTypeValue), "val1"},
  1780. {KeyStr("b", 103U, kTypeValue), "val"}});
  1781. AddMockFile(file1);
  1782. auto file2 = mock::MakeMockFile({{KeyStr("a", 102U, kTypeValue), "val2"},
  1783. {KeyStr("c", 101U, kTypeValue), "val"}});
  1784. AddMockFile(file2, 1);
  1785. for (int i = 0; i < 10; i++) {
  1786. auto file =
  1787. mock::MakeMockFile({{KeyStr("a", 100 - (i * 2), kTypeValue), "val"},
  1788. {KeyStr("a", 99 - (i * 2), kTypeValue), "val"}});
  1789. AddMockFile(file, 2);
  1790. }
  1791. for (int i = 0; i < 10; i++) {
  1792. auto file =
  1793. mock::MakeMockFile({{KeyStr("b", 80 - (i * 2), kTypeValue), "val"},
  1794. {KeyStr("b", 79 - (i * 2), kTypeValue), "val"}});
  1795. AddMockFile(file, 2);
  1796. }
  1797. auto file5 = mock::MakeMockFile({{KeyStr("c", 60U, kTypeValue), "valc"},
  1798. {KeyStr("c", 59U, kTypeValue), "valc"}});
  1799. // "a" has 10 overlapped grandparent files (each size 10), which is far
  1800. // exceeded the `max_compaction_bytes`, but make sure 2 "a" are not separated,
  1801. // as splitting them won't help reducing the compaction size.
  1802. // also make sure "b" and "c" are cut separately.
  1803. mock::KVVector expected_file1 =
  1804. mock::MakeMockFile({{KeyStr("a", 104U, kTypeValue), "val1"},
  1805. {KeyStr("a", 102U, kTypeValue), "val2"}});
  1806. mock::KVVector expected_file2 =
  1807. mock::MakeMockFile({{KeyStr("b", 103U, kTypeValue), "val"}});
  1808. mock::KVVector expected_file3 =
  1809. mock::MakeMockFile({{KeyStr("c", 101U, kTypeValue), "val"}});
  1810. SetLastSequence(122U);
  1811. const std::vector<int> input_levels = {0, 1};
  1812. auto lvl0_files = cfd_->current()->storage_info()->LevelFiles(0);
  1813. auto lvl1_files = cfd_->current()->storage_info()->LevelFiles(1);
  1814. // Just keep all the history
  1815. std::vector<SequenceNumber> snapshots;
  1816. for (int i = 80; i <= 105; i++) {
  1817. snapshots.emplace_back(i);
  1818. }
  1819. RunCompaction({lvl0_files, lvl1_files}, input_levels,
  1820. {expected_file1, expected_file2, expected_file3},
  1821. std::move(snapshots));
  1822. }
  1823. class CompactionJobTimestampTest : public CompactionJobTestBase {
  1824. public:
  1825. CompactionJobTimestampTest()
  1826. : CompactionJobTestBase(test::PerThreadDBPath("compaction_job_ts_test"),
  1827. test::BytewiseComparatorWithU64TsWrapper(),
  1828. test::EncodeInt, /*test_io_priority=*/false,
  1829. TableTypeForTest::kMockTable) {}
  1830. };
  1831. TEST_F(CompactionJobTimestampTest, GCDisabled) {
  1832. NewDB();
  1833. auto file1 =
  1834. mock::MakeMockFile({{KeyStr("a", 10, ValueType::kTypeValue, 100), "a10"},
  1835. {KeyStr("a", 9, ValueType::kTypeValue, 99), "a9"},
  1836. {KeyStr("b", 8, ValueType::kTypeValue, 98), "b8"},
  1837. {KeyStr("d", 7, ValueType::kTypeValue, 97), "d7"}});
  1838. AddMockFile(file1);
  1839. auto file2 = mock::MakeMockFile(
  1840. {{KeyStr("b", 6, ValueType::kTypeDeletionWithTimestamp, 96), ""},
  1841. {KeyStr("c", 5, ValueType::kTypeDeletionWithTimestamp, 95), ""},
  1842. {KeyStr("c", 4, ValueType::kTypeValue, 94), "c5"},
  1843. {KeyStr("d", 3, ValueType::kTypeSingleDeletion, 93), ""}});
  1844. AddMockFile(file2);
  1845. SetLastSequence(10);
  1846. auto expected_results = mock::MakeMockFile(
  1847. {{KeyStr("a", 10, ValueType::kTypeValue, 100), "a10"},
  1848. {KeyStr("a", 9, ValueType::kTypeValue, 99), "a9"},
  1849. {KeyStr("b", 8, ValueType::kTypeValue, 98), "b8"},
  1850. {KeyStr("b", 6, ValueType::kTypeDeletionWithTimestamp, 96), ""},
  1851. {KeyStr("c", 5, ValueType::kTypeDeletionWithTimestamp, 95), ""},
  1852. {KeyStr("c", 4, ValueType::kTypeValue, 94), "c5"},
  1853. {KeyStr("d", 7, ValueType::kTypeValue, 97), "d7"},
  1854. {KeyStr("d", 3, ValueType::kTypeSingleDeletion, 93), ""}});
  1855. constexpr int input_level = 0;
  1856. const auto& files = cfd_->current()->storage_info()->LevelFiles(input_level);
  1857. RunCompaction({files}, {input_level}, {expected_results});
  1858. }
  1859. TEST_F(CompactionJobTimestampTest, NoKeyExpired) {
  1860. NewDB();
  1861. auto file1 =
  1862. mock::MakeMockFile({{KeyStr("a", 6, ValueType::kTypeValue, 100), "a6"},
  1863. {KeyStr("b", 7, ValueType::kTypeValue, 101), "b7"},
  1864. {KeyStr("c", 5, ValueType::kTypeValue, 99), "c5"}});
  1865. AddMockFile(file1);
  1866. auto file2 =
  1867. mock::MakeMockFile({{KeyStr("a", 4, ValueType::kTypeValue, 98), "a4"},
  1868. {KeyStr("c", 3, ValueType::kTypeValue, 97), "c3"}});
  1869. AddMockFile(file2);
  1870. SetLastSequence(101);
  1871. auto expected_results =
  1872. mock::MakeMockFile({{KeyStr("a", 6, ValueType::kTypeValue, 100), "a6"},
  1873. {KeyStr("a", 4, ValueType::kTypeValue, 98), "a4"},
  1874. {KeyStr("b", 7, ValueType::kTypeValue, 101), "b7"},
  1875. {KeyStr("c", 5, ValueType::kTypeValue, 99), "c5"},
  1876. {KeyStr("c", 3, ValueType::kTypeValue, 97), "c3"}});
  1877. constexpr int input_level = 0;
  1878. const auto& files = cfd_->current()->storage_info()->LevelFiles(input_level);
  1879. full_history_ts_low_ = encode_u64_ts_(0);
  1880. RunCompaction({files}, {input_level}, {expected_results});
  1881. }
  1882. TEST_F(CompactionJobTimestampTest, AllKeysExpired) {
  1883. NewDB();
  1884. auto file1 = mock::MakeMockFile(
  1885. {{KeyStr("a", 5, ValueType::kTypeDeletionWithTimestamp, 100), ""},
  1886. {KeyStr("b", 6, ValueType::kTypeSingleDeletion, 99), ""},
  1887. {KeyStr("c", 7, ValueType::kTypeValue, 98), "c7"}});
  1888. AddMockFile(file1);
  1889. auto file2 = mock::MakeMockFile(
  1890. {{KeyStr("a", 4, ValueType::kTypeValue, 97), "a4"},
  1891. {KeyStr("b", 3, ValueType::kTypeValue, 96), "b3"},
  1892. {KeyStr("c", 2, ValueType::kTypeDeletionWithTimestamp, 95), ""},
  1893. {KeyStr("c", 1, ValueType::kTypeValue, 94), "c1"}});
  1894. AddMockFile(file2);
  1895. SetLastSequence(7);
  1896. auto expected_results =
  1897. mock::MakeMockFile({{KeyStr("c", 0, ValueType::kTypeValue, 0), "c7"}});
  1898. constexpr int input_level = 0;
  1899. const auto& files = cfd_->current()->storage_info()->LevelFiles(input_level);
  1900. full_history_ts_low_ = encode_u64_ts_(std::numeric_limits<uint64_t>::max());
  1901. RunCompaction({files}, {input_level}, {expected_results});
  1902. }
  1903. TEST_F(CompactionJobTimestampTest, SomeKeysExpired) {
  1904. NewDB();
  1905. auto file1 =
  1906. mock::MakeMockFile({{KeyStr("a", 5, ValueType::kTypeValue, 50), "a5"},
  1907. {KeyStr("b", 6, ValueType::kTypeValue, 49), "b6"}});
  1908. AddMockFile(file1);
  1909. auto file2 = mock::MakeMockFile(
  1910. {{KeyStr("a", 3, ValueType::kTypeValue, 48), "a3"},
  1911. {KeyStr("a", 2, ValueType::kTypeValue, 46), "a2"},
  1912. {KeyStr("b", 4, ValueType::kTypeDeletionWithTimestamp, 47), ""}});
  1913. AddMockFile(file2);
  1914. SetLastSequence(6);
  1915. auto expected_results =
  1916. mock::MakeMockFile({{KeyStr("a", 5, ValueType::kTypeValue, 50), "a5"},
  1917. {KeyStr("a", 0, ValueType::kTypeValue, 0), "a3"},
  1918. {KeyStr("b", 6, ValueType::kTypeValue, 49), "b6"}});
  1919. constexpr int input_level = 0;
  1920. const auto& files = cfd_->current()->storage_info()->LevelFiles(input_level);
  1921. full_history_ts_low_ = encode_u64_ts_(49);
  1922. RunCompaction({files}, {input_level}, {expected_results});
  1923. }
  1924. class CompactionJobTimestampTestWithBbTable : public CompactionJobTestBase {
  1925. public:
  1926. // Block-based table is needed if we want to test subcompaction partitioning
  1927. // with anchors.
  1928. explicit CompactionJobTimestampTestWithBbTable()
  1929. : CompactionJobTestBase(
  1930. test::PerThreadDBPath("compaction_job_ts_bbt_test"),
  1931. test::BytewiseComparatorWithU64TsWrapper(), test::EncodeInt,
  1932. /*test_io_priority=*/false, TableTypeForTest::kBlockBasedTable) {}
  1933. };
  1934. TEST_F(CompactionJobTimestampTestWithBbTable, SubcompactionAnchorL1) {
  1935. cf_options_.target_file_size_base = 20;
  1936. mutable_cf_options_.target_file_size_base = 20;
  1937. NewDB();
  1938. const std::vector<std::string> keys = {
  1939. KeyStr("a", 20, ValueType::kTypeValue, 200),
  1940. KeyStr("b", 21, ValueType::kTypeValue, 210),
  1941. KeyStr("b", 20, ValueType::kTypeValue, 200),
  1942. KeyStr("b", 18, ValueType::kTypeValue, 180),
  1943. KeyStr("c", 17, ValueType::kTypeValue, 170),
  1944. KeyStr("c", 16, ValueType::kTypeValue, 160),
  1945. KeyStr("c", 15, ValueType::kTypeValue, 150)};
  1946. const std::vector<std::string> values = {"a20", "b21", "b20", "b18",
  1947. "c17", "c16", "c15"};
  1948. constexpr int input_level = 1;
  1949. auto file1 = mock::MakeMockFile(
  1950. {{keys[0], values[0]}, {keys[1], values[1]}, {keys[2], values[2]}});
  1951. AddMockFile(file1, input_level);
  1952. auto file2 = mock::MakeMockFile(
  1953. {{keys[3], values[3]}, {keys[4], values[4]}, {keys[5], values[5]}});
  1954. AddMockFile(file2, input_level);
  1955. auto file3 = mock::MakeMockFile({{keys[6], values[6]}});
  1956. AddMockFile(file3, input_level);
  1957. SetLastSequence(20);
  1958. auto output1 = mock::MakeMockFile({{keys[0], values[0]}});
  1959. auto output2 = mock::MakeMockFile(
  1960. {{keys[1], values[1]}, {keys[2], values[2]}, {keys[3], values[3]}});
  1961. auto output3 = mock::MakeMockFile(
  1962. {{keys[4], values[4]}, {keys[5], values[5]}, {keys[6], values[6]}});
  1963. auto expected_results =
  1964. std::vector<mock::KVVector>{output1, output2, output3};
  1965. const auto& files = cfd_->current()->storage_info()->LevelFiles(input_level);
  1966. constexpr int output_level = 2;
  1967. constexpr int max_subcompactions = 4;
  1968. RunCompaction({files}, {input_level}, expected_results, /*snapshots=*/{},
  1969. /*earliest_write_conflict_snapshot=*/kMaxSequenceNumber,
  1970. output_level, /*verify=*/true, {kInvalidBlobFileNumber},
  1971. /*check_get_priority=*/false, Env::IO_TOTAL, Env::IO_TOTAL,
  1972. max_subcompactions);
  1973. }
  1974. TEST_F(CompactionJobTimestampTestWithBbTable, SubcompactionL0) {
  1975. cf_options_.target_file_size_base = 20;
  1976. mutable_cf_options_.target_file_size_base = 20;
  1977. NewDB();
  1978. const std::vector<std::string> keys = {
  1979. KeyStr("a", 20, ValueType::kTypeValue, 200),
  1980. KeyStr("b", 20, ValueType::kTypeValue, 200),
  1981. KeyStr("b", 19, ValueType::kTypeValue, 190),
  1982. KeyStr("b", 18, ValueType::kTypeValue, 180),
  1983. KeyStr("c", 17, ValueType::kTypeValue, 170),
  1984. KeyStr("c", 16, ValueType::kTypeValue, 160),
  1985. KeyStr("c", 15, ValueType::kTypeValue, 150)};
  1986. const std::vector<std::string> values = {"a20", "b20", "b19", "b18",
  1987. "c17", "c16", "c15"};
  1988. constexpr int input_level = 0;
  1989. auto file1 = mock::MakeMockFile({{keys[5], values[5]}, {keys[6], values[6]}});
  1990. AddMockFile(file1, input_level);
  1991. auto file2 = mock::MakeMockFile({{keys[3], values[3]}, {keys[4], values[4]}});
  1992. AddMockFile(file2, input_level);
  1993. auto file3 = mock::MakeMockFile(
  1994. {{keys[0], values[0]}, {keys[1], values[1]}, {keys[2], values[2]}});
  1995. AddMockFile(file3, input_level);
  1996. SetLastSequence(20);
  1997. auto output1 = mock::MakeMockFile({{keys[0], values[0]}});
  1998. auto output2 = mock::MakeMockFile(
  1999. {{keys[1], values[1]}, {keys[2], values[2]}, {keys[3], values[3]}});
  2000. auto output3 = mock::MakeMockFile(
  2001. {{keys[4], values[4]}, {keys[5], values[5]}, {keys[6], values[6]}});
  2002. auto expected_results =
  2003. std::vector<mock::KVVector>{output1, output2, output3};
  2004. const auto& files = cfd_->current()->storage_info()->LevelFiles(input_level);
  2005. constexpr int output_level = 1;
  2006. constexpr int max_subcompactions = 4;
  2007. RunCompaction({files}, {input_level}, expected_results, /*snapshots=*/{},
  2008. /*earliest_write_conflict_snapshot=*/kMaxSequenceNumber,
  2009. output_level, /*verify=*/true, {kInvalidBlobFileNumber},
  2010. /*check_get_priority=*/false, Env::IO_TOTAL, Env::IO_TOTAL,
  2011. max_subcompactions);
  2012. }
// The I/O priority of compaction reads and writes differs from that of other
// DB reads and writes. The compaction input files are prepared with the
// default file system from Env; to test the I/O priority of compaction reads
// and writes, db_options_.fs is set to MockTestFileSystem.
  2017. class CompactionJobIOPriorityTest : public CompactionJobTestBase {
  2018. public:
  2019. CompactionJobIOPriorityTest()
  2020. : CompactionJobTestBase(
  2021. test::PerThreadDBPath("compaction_job_io_priority_test"),
  2022. BytewiseComparator(), [](uint64_t /*ts*/) { return ""; },
  2023. /*test_io_priority=*/true, TableTypeForTest::kBlockBasedTable) {}
  2024. };
  2025. TEST_F(CompactionJobIOPriorityTest, WriteControllerStateNormal) {
  2026. // When the state from WriteController is normal.
  2027. NewDB();
  2028. mock::KVVector expected_results = CreateTwoFiles(false);
  2029. auto cfd = versions_->GetColumnFamilySet()->GetDefault();
  2030. constexpr int input_level = 0;
  2031. auto files = cfd->current()->storage_info()->LevelFiles(input_level);
  2032. ASSERT_EQ(2U, files.size());
  2033. RunCompaction({files}, {input_level}, {expected_results}, {},
  2034. kMaxSequenceNumber, 1, false, {kInvalidBlobFileNumber}, false,
  2035. Env::IO_LOW, Env::IO_LOW);
  2036. }
  2037. TEST_F(CompactionJobIOPriorityTest, WriteControllerStateDelayed) {
  2038. // When the state from WriteController is Delayed.
  2039. NewDB();
  2040. mock::KVVector expected_results = CreateTwoFiles(false);
  2041. auto cfd = versions_->GetColumnFamilySet()->GetDefault();
  2042. constexpr int input_level = 0;
  2043. auto files = cfd->current()->storage_info()->LevelFiles(input_level);
  2044. ASSERT_EQ(2U, files.size());
  2045. {
  2046. std::unique_ptr<WriteControllerToken> delay_token =
  2047. write_controller_.GetDelayToken(1000000);
  2048. RunCompaction({files}, {input_level}, {expected_results}, {},
  2049. kMaxSequenceNumber, 1, false, {kInvalidBlobFileNumber}, false,
  2050. Env::IO_USER, Env::IO_USER);
  2051. }
  2052. }
  2053. TEST_F(CompactionJobIOPriorityTest, WriteControllerStateStalled) {
  2054. // When the state from WriteController is Stalled.
  2055. NewDB();
  2056. mock::KVVector expected_results = CreateTwoFiles(false);
  2057. auto cfd = versions_->GetColumnFamilySet()->GetDefault();
  2058. constexpr int input_level = 0;
  2059. auto files = cfd->current()->storage_info()->LevelFiles(input_level);
  2060. ASSERT_EQ(2U, files.size());
  2061. {
  2062. std::unique_ptr<WriteControllerToken> stop_token =
  2063. write_controller_.GetStopToken();
  2064. RunCompaction({files}, {input_level}, {expected_results}, {},
  2065. kMaxSequenceNumber, 1, false, {kInvalidBlobFileNumber}, false,
  2066. Env::IO_USER, Env::IO_USER);
  2067. }
  2068. }
  2069. TEST_F(CompactionJobIOPriorityTest, GetRateLimiterPriority) {
  2070. NewDB();
  2071. mock::KVVector expected_results = CreateTwoFiles(false);
  2072. auto cfd = versions_->GetColumnFamilySet()->GetDefault();
  2073. constexpr int input_level = 0;
  2074. auto files = cfd->current()->storage_info()->LevelFiles(input_level);
  2075. ASSERT_EQ(2U, files.size());
  2076. RunCompaction({files}, {input_level}, {expected_results}, {},
  2077. kMaxSequenceNumber, 1, false, {kInvalidBlobFileNumber}, true,
  2078. Env::IO_LOW, Env::IO_LOW);
  2079. }
  2080. class ResumableCompactionJobTest : public CompactionJobTestBase {
  2081. public:
  2082. ResumableCompactionJobTest()
  2083. : CompactionJobTestBase(
  2084. test::PerThreadDBPath("allow_resumption_job_test"),
  2085. BytewiseComparator(), [](uint64_t /*ts*/) { return ""; },
  2086. /*test_io_priority=*/false, TableTypeForTest::kBlockBasedTable) {}
  2087. protected:
  2088. static constexpr const char* kCancelBeforeThisKey = "cancel_before_this_key";
  2089. std::string progress_dir_;
  2090. bool enable_cancel_ = false;
  2091. std::atomic<int> stop_count_{0};
  2092. std::atomic<bool> cancel_{false};
  2093. void SetUp() override {
  2094. CompactionJobTestBase::SetUp();
  2095. SyncPoint::GetInstance()->SetCallBack(
  2096. "CompactionOutputs::ShouldStopBefore::manual_decision",
  2097. [this](void* p) {
  2098. auto* pair = static_cast<std::pair<bool*, const Slice>*>(p);
  2099. *(pair->first) = true;
  2100. // Cancel after outputting a specific key
  2101. if (enable_cancel_) {
  2102. ParsedInternalKey parsed_key;
  2103. if (ParseInternalKey(pair->second, &parsed_key, true).ok()) {
  2104. if (parsed_key.user_key == kCancelBeforeThisKey) {
  2105. cancel_.store(true);
  2106. }
  2107. }
  2108. }
  2109. });
  2110. SyncPoint::GetInstance()->EnableProcessing();
  2111. }
  2112. void TearDown() override {
  2113. SyncPoint::GetInstance()->DisableProcessing();
  2114. SyncPoint::GetInstance()->ClearAllCallBacks();
  2115. if (env_->FileExists(progress_dir_).ok()) {
  2116. std::vector<std::string> files;
  2117. EXPECT_OK(env_->GetChildren(progress_dir_, &files));
  2118. for (const auto& file : files) {
  2119. if (file != "." && file != "..") {
  2120. EXPECT_OK(env_->DeleteFile(progress_dir_ + "/" + file));
  2121. }
  2122. }
  2123. EXPECT_OK(env_->DeleteDir(progress_dir_));
  2124. }
  2125. CompactionJobTestBase::TearDown();
  2126. }
  2127. void NewDB() {
  2128. if (env_->FileExists(progress_dir_).ok()) {
  2129. std::vector<std::string> files;
  2130. EXPECT_OK(env_->GetChildren(progress_dir_, &files));
  2131. for (const auto& file : files) {
  2132. if (file != "." && file != "..") {
  2133. EXPECT_OK(env_->DeleteFile(progress_dir_ + "/" + file));
  2134. }
  2135. }
  2136. EXPECT_OK(env_->DeleteDir(progress_dir_));
  2137. }
  2138. CompactionJobTestBase::NewDB();
  2139. progress_dir_ = test::PerThreadDBPath("compaction_progress");
  2140. ASSERT_OK(env_->CreateDirIfMissing(progress_dir_));
  2141. }
  2142. void EnableCompactionCancel() { enable_cancel_ = true; }
  2143. void DisableCompactionCancel() {
  2144. enable_cancel_ = false;
  2145. cancel_.store(false);
  2146. }
  2147. std::unique_ptr<log::Writer> CreateCompactionProgressWriter(
  2148. const std::string& compaction_progress_file) {
  2149. std::unique_ptr<FSWritableFile> file;
  2150. EXPECT_OK(fs_->NewWritableFile(compaction_progress_file, FileOptions(),
  2151. &file, nullptr));
  2152. auto file_writer = std::make_unique<WritableFileWriter>(
  2153. std::move(file), compaction_progress_file, FileOptions());
  2154. auto compaction_progress_writer =
  2155. std::make_unique<log::Writer>(std::move(file_writer), 0, false);
  2156. return compaction_progress_writer;
  2157. }
  2158. Status RunCompactionWithProgressTracking(
  2159. const CompactionProgress& compaction_progress,
  2160. log::Writer* compaction_progress_writer,
  2161. std::vector<SequenceNumber> snapshots = {},
  2162. std::shared_ptr<Statistics> stats = nullptr) {
  2163. mutex_.Lock();
  2164. auto cfd = versions_->GetColumnFamilySet()->GetDefault();
  2165. auto files = cfd->current()->storage_info()->LevelFiles(0);
  2166. db_options_.statistics = stats;
  2167. db_options_.stats = db_options_.statistics.get();
  2168. std::vector<CompactionInputFiles> compaction_input_files;
  2169. CompactionInputFiles level;
  2170. level.level = 0;
  2171. level.files = files;
  2172. compaction_input_files.push_back(level);
  2173. Compaction compaction(
  2174. cfd->current()->storage_info(), cfd->ioptions(),
  2175. cfd->GetLatestMutableCFOptions(), mutable_db_options_,
  2176. compaction_input_files, 1, mutable_cf_options_.target_file_size_base,
  2177. mutable_cf_options_.max_compaction_bytes, 0, kNoCompression,
  2178. cfd->GetLatestMutableCFOptions().compression_opts,
  2179. Temperature::kUnknown, 0, {}, std::nullopt, nullptr,
  2180. CompactionReason::kManualCompaction);
  2181. compaction.FinalizeInputInfo(cfd->current());
  2182. LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get());
  2183. EventLogger event_logger(db_options_.info_log.get());
  2184. JobContext job_context(1, false);
  2185. job_context.InitSnapshotContext(nullptr, nullptr, kMaxSequenceNumber,
  2186. std::move(snapshots));
  2187. CompactionJobStats job_stats;
  2188. CompactionJob compaction_job(
  2189. 0, &compaction, db_options_, mutable_db_options_, env_options_,
  2190. versions_.get(), &shutting_down_, &log_buffer, nullptr, nullptr,
  2191. nullptr, stats.get(), &mutex_, &error_handler_, &job_context,
  2192. table_cache_, &event_logger, false, false, dbname_, &job_stats,
  2193. Env::Priority::USER, nullptr, cancel_, env_->GenerateUniqueId(),
  2194. DBImpl::GenerateDbSessionId(nullptr), "");
  2195. compaction_job.Prepare(std::nullopt, compaction_progress,
  2196. compaction_progress_writer);
  2197. mutex_.Unlock();
  2198. compaction_job.Run().PermitUncheckedError();
  2199. EXPECT_OK(compaction_job.io_status());
  2200. mutex_.Lock();
  2201. bool compaction_released = false;
  2202. Status s = compaction_job.Install(&compaction_released);
  2203. mutex_.Unlock();
  2204. if (!compaction_released) {
  2205. compaction.ReleaseCompactionFiles(s);
  2206. }
  2207. return s;
  2208. }
  2209. SubcompactionProgress ReadAndParseProgress(
  2210. const std::string& compaction_progress_file) {
  2211. std::unique_ptr<FSSequentialFile> seq_file;
  2212. EXPECT_OK(fs_->NewSequentialFile(compaction_progress_file, FileOptions(),
  2213. &seq_file, nullptr));
  2214. auto file_reader = std::make_unique<SequentialFileReader>(
  2215. std::move(seq_file), compaction_progress_file, 0, nullptr);
  2216. log::Reader reader(nullptr, std::move(file_reader), nullptr, true, 0);
  2217. SubcompactionProgressBuilder builder;
  2218. std::string record;
  2219. Slice slice;
  2220. while (reader.ReadRecord(&slice, &record)) {
  2221. VersionEdit edit;
  2222. if (!edit.DecodeFrom(slice).ok()) {
  2223. continue;
  2224. }
  2225. builder.ProcessVersionEdit(edit);
  2226. }
  2227. EXPECT_TRUE(builder.HasAccumulatedSubcompactionProgress());
  2228. return builder.GetAccumulatedSubcompactionProgress();
  2229. }
  2230. // Test utility function to verify that compaction progress was correctly
  2231. // persisted to the progress file after compaction interruption.
  2232. //
  2233. // VERIFIES:
  2234. // - Progress file exists and has expected size (empty if no progress
  2235. // expected)
  2236. // - Next internal key to compact matches expected user key with proper format
  2237. // - Number of processed input records matches position in ordered input keys
  2238. // - Number of processed output records equals number of processed input
  2239. // records (by test design to simplify verification)
  2240. // - Each output file contains exactly one user key (by test design to
  2241. // simplify verification)
  2242. void VerifyCompactionProgressPersisted(
  2243. const std::string& compaction_progress_file,
  2244. const std::string& next_user_key_to_compact,
  2245. const std::vector<std::string>& ordered_intput_keys) {
  2246. ASSERT_OK(env_->FileExists(compaction_progress_file));
  2247. uint64_t file_size;
  2248. ASSERT_OK(env_->GetFileSize(compaction_progress_file, &file_size));
  2249. if (next_user_key_to_compact.empty()) {
  2250. ASSERT_EQ(file_size, 0);
  2251. return;
  2252. }
  2253. const auto& subcompaction_progress =
  2254. ReadAndParseProgress(compaction_progress_file);
  2255. ASSERT_FALSE(subcompaction_progress.next_internal_key_to_compact.empty());
  2256. ParsedInternalKey parsed_next_key;
  2257. ASSERT_OK(
  2258. ParseInternalKey(subcompaction_progress.next_internal_key_to_compact,
  2259. &parsed_next_key, true /* log_err_key */));
  2260. ASSERT_EQ(parsed_next_key.user_key, next_user_key_to_compact);
  2261. ASSERT_EQ(parsed_next_key.sequence, kMaxSequenceNumber);
  2262. ASSERT_EQ(parsed_next_key.type, kValueTypeForSeek);
  2263. auto it = std::find(ordered_intput_keys.begin(), ordered_intput_keys.end(),
  2264. next_user_key_to_compact);
  2265. ASSERT_TRUE(it != ordered_intput_keys.end());
  2266. auto next_key_index = std::distance(ordered_intput_keys.begin(), it);
  2267. ASSERT_EQ(subcompaction_progress.num_processed_input_records,
  2268. next_key_index);
  2269. ASSERT_EQ(subcompaction_progress.output_level_progress
  2270. .GetNumProcessedOutputRecords(),
  2271. next_key_index);
  2272. ASSERT_EQ(
  2273. subcompaction_progress.output_level_progress.GetOutputFiles().size(),
  2274. next_key_index);
  2275. for (size_t i = 0;
  2276. i <
  2277. subcompaction_progress.output_level_progress.GetOutputFiles().size();
  2278. ++i) {
  2279. const auto& output_file =
  2280. subcompaction_progress.output_level_progress.GetOutputFiles()[i];
  2281. ASSERT_EQ(output_file.smallest.user_key().ToString(),
  2282. output_file.largest.user_key().ToString());
  2283. ASSERT_EQ(output_file.largest.user_key().ToString(),
  2284. ordered_intput_keys[i]);
  2285. }
  2286. }
  2287. void RunCancelAndResumeTest(
  2288. const std::initializer_list<mock::KVPair>& input_file_1,
  2289. const std::initializer_list<mock::KVPair>& input_file_2,
  2290. uint64_t last_sequence, const std::vector<uint64_t>& snapshots,
  2291. const std::string& expected_next_key_to_compact,
  2292. const std::vector<std::string>& expected_input_keys, bool exists_progress,
  2293. bool cancelled_past_mid_point = false) {
  2294. std::shared_ptr<Statistics> stats = ROCKSDB_NAMESPACE::CreateDBStatistics();
  2295. auto file1 = mock::MakeMockFile(input_file_1);
  2296. AddMockFile(file1);
  2297. auto file2 = mock::MakeMockFile(input_file_2);
  2298. AddMockFile(file2);
  2299. SetLastSequence(last_sequence);
  2300. // First compaction (will be cancelled)
  2301. std::string compaction_progress_file =
  2302. CompactionProgressFileName(progress_dir_, 123);
  2303. std::unique_ptr<log::Writer> compaction_progress_writer =
  2304. CreateCompactionProgressWriter(compaction_progress_file);
  2305. ASSERT_OK(stats->Reset());
  2306. EnableCompactionCancel();
  2307. Status status = RunCompactionWithProgressTracking(
  2308. CompactionProgress{}, compaction_progress_writer.get(), snapshots,
  2309. stats);
  2310. ASSERT_TRUE(status.IsManualCompactionPaused());
  2311. DisableCompactionCancel();
  2312. HistogramData cancelled_compaction_stats;
  2313. stats->histogramData(FILE_WRITE_COMPACTION_MICROS,
  2314. &cancelled_compaction_stats);
  2315. VerifyCompactionProgressPersisted(compaction_progress_file,
  2316. expected_next_key_to_compact,
  2317. expected_input_keys);
  2318. // Resume compaction
  2319. CompactionProgress compaction_progress;
  2320. if (exists_progress) {
  2321. compaction_progress.push_back(
  2322. ReadAndParseProgress(compaction_progress_file));
  2323. }
  2324. std::string compaction_progress_file_2 =
  2325. CompactionProgressFileName(progress_dir_, 234);
  2326. std::unique_ptr<log::Writer> compaction_progress_writer_2 =
  2327. CreateCompactionProgressWriter(compaction_progress_file_2);
  2328. ASSERT_OK(stats->Reset());
  2329. status = RunCompactionWithProgressTracking(
  2330. compaction_progress, compaction_progress_writer_2.get(),
  2331. {} /* snapshots */, stats);
  2332. ASSERT_OK(status);
  2333. if (cancelled_past_mid_point) {
  2334. HistogramData resumed_compaction_stats;
  2335. stats->histogramData(FILE_WRITE_COMPACTION_MICROS,
  2336. &resumed_compaction_stats);
  2337. ASSERT_GT(cancelled_compaction_stats.count,
  2338. resumed_compaction_stats.count);
  2339. }
  2340. }
  2341. };
  2342. TEST_F(ResumableCompactionJobTest, BasicProgressPersistence) {
  2343. NewDB();
  2344. auto file1 = mock::MakeMockFile({
  2345. {KeyStr("a", 1U, kTypeValue), "val1"},
  2346. {KeyStr("b", 2U, kTypeValue), "val2"},
  2347. });
  2348. AddMockFile(file1);
  2349. auto file2 = mock::MakeMockFile({
  2350. {KeyStr("c", 3U, kTypeValue), "val3"},
  2351. {KeyStr("d", 4U, kTypeValue), "val4"},
  2352. });
  2353. AddMockFile(file2);
  2354. SetLastSequence(4U);
  2355. std::string compaction_progress_file =
  2356. CompactionProgressFileName(progress_dir_, 123);
  2357. std::unique_ptr<log::Writer> compaction_progress_writer =
  2358. CreateCompactionProgressWriter(compaction_progress_file);
  2359. Status status = RunCompactionWithProgressTracking(
  2360. CompactionProgress(), compaction_progress_writer.get());
  2361. ASSERT_OK(status);
  2362. VerifyCompactionProgressPersisted(
  2363. compaction_progress_file, "d" /* next_user_key_to_compact */,
  2364. {"a", "b", "c", "d"} /* ordered_intput_keys */);
  2365. }
  2366. TEST_F(ResumableCompactionJobTest, BasicProgressResume) {
  2367. NewDB();
  2368. RunCancelAndResumeTest(
  2369. {{KeyStr("a", 1U, kTypeValue), "val1"},
  2370. {KeyStr("b", 2U, kTypeValue), "val2"}} /* input_file_1 */,
  2371. {{KeyStr("bb", 3U, kTypeValue), "val3"},
  2372. {KeyStr(kCancelBeforeThisKey, 4U, kTypeValue),
  2373. "val4"}} /* input_file_2 */,
  2374. 4U /* last_sequence */, {} /* snapshots */,
  2375. kCancelBeforeThisKey /* expected_next_key_to_compact */,
  2376. {"a", "b", "bb", kCancelBeforeThisKey} /* expected_input_keys */,
  2377. true /* exists_progress */, true /* cancelled_past_mid_point*/);
  2378. }
  2379. TEST_F(ResumableCompactionJobTest, NoProgressResumeOnSameKey) {
  2380. NewDB();
  2381. RunCancelAndResumeTest(
  2382. {{KeyStr(kCancelBeforeThisKey, 1U, kTypeValue),
  2383. "val1"}} /* input_file_1 */,
  2384. {{KeyStr(kCancelBeforeThisKey, 2U, kTypeValue),
  2385. "val2"}} /* input_file_2 */,
  2386. 2U /* last_sequence */, {1U} /* snapshots */,
  2387. "" /* expected_next_key_to_compact */,
  2388. {kCancelBeforeThisKey, kCancelBeforeThisKey} /* expected_input_keys */,
  2389. false /* exists_progress */);
  2390. }
  2391. TEST_F(ResumableCompactionJobTest, NoProgressResumeOnDeleteRange) {
  2392. NewDB();
  2393. RunCancelAndResumeTest(
  2394. {{KeyStr(kCancelBeforeThisKey, 1U, kTypeValue),
  2395. "val1"}} /* input_file_1 */,
  2396. {{KeyStr(kCancelBeforeThisKey, 2U, kTypeRangeDeletion),
  2397. "val2"}} /* input_file_2 */,
  2398. 2U /* last_sequence */, {1U} /* snapshots */,
  2399. "" /* expected_next_key_to_compact */,
  2400. {kCancelBeforeThisKey, kCancelBeforeThisKey} /* expected_input_keys */,
  2401. false /* exists_progress */);
  2402. }
  2403. TEST_F(ResumableCompactionJobTest, NoProgressResumeOnMerge) {
  2404. merge_op_ = MergeOperators::CreateStringAppendOperator();
  2405. NewDB();
  2406. RunCancelAndResumeTest(
  2407. {{KeyStr("a", 1U, kTypeValue), "val1"},
  2408. {KeyStr("b", 2U, kTypeValue), "val2"}} /* input_file_1 */,
  2409. {{KeyStr("bb", 3U, kTypeValue), "val3"},
  2410. {KeyStr(kCancelBeforeThisKey, 4U, kTypeMerge),
  2411. "val4"}} /* input_file_2 */,
  2412. 4U /* last_sequence */, {} /* snapshots */,
  2413. "bb" /* expected_next_key_to_compact */,
  2414. {"a", "b", "bb", kCancelBeforeThisKey} /* expected_input_keys */,
  2415. true /* exists_progress */);
  2416. }
  2417. TEST_F(ResumableCompactionJobTest, NoProgressResumeOnSingleDelete) {
  2418. NewDB();
  2419. RunCancelAndResumeTest(
  2420. {{KeyStr("a", 1U, kTypeValue), "val1"},
  2421. {KeyStr("b", 2U, kTypeValue), "val2"},
  2422. {KeyStr(kCancelBeforeThisKey, 3U, kTypeValue),
  2423. "val3"}} /* input_file_1 */,
  2424. {{KeyStr(kCancelBeforeThisKey, 4U, kTypeSingleDeletion), ""},
  2425. {KeyStr("d", 5U, kTypeValue), "val4"}} /* input_file_2 */,
  2426. 5U /* last_sequence */, {3U} /* snapshots */,
  2427. "b" /* expected_next_key_to_compact */,
  2428. {"a", "b", kCancelBeforeThisKey, kCancelBeforeThisKey,
  2429. "d"} /* expected_input_keys */,
  2430. true /* exists_progress */);
  2431. }
  2432. TEST_F(ResumableCompactionJobTest, NoProgressResumeOnDeletionAtBottom) {
  2433. NewDB();
  2434. RunCancelAndResumeTest(
  2435. {{KeyStr("a", 1U, kTypeValue), "val1"},
  2436. {KeyStr("b", 2U, kTypeValue), "val2"},
  2437. {KeyStr(kCancelBeforeThisKey, 3U, kTypeValue),
  2438. "val3"}} /* input_file_1 */,
  2439. {{KeyStr(kCancelBeforeThisKey, 4U, kTypeDeletion), ""},
  2440. {KeyStr("d", 5U, kTypeValue), "val4"}} /* input_file_2 */,
  2441. 5U /* last_sequence */, {3U} /* snapshots */,
  2442. "b" /* expected_next_key_to_compact */,
  2443. {"a", "b", kCancelBeforeThisKey, kCancelBeforeThisKey,
  2444. "d"} /* expected_input_keys */,
  2445. true /* exists_progress */);
  2446. }
  2447. } // namespace ROCKSDB_NAMESPACE
// Test entry point: installs the stack-trace handler, initializes googletest
// (which consumes its own command-line flags), registers custom objects from
// the remaining arguments, then runs every registered test.
int main(int argc, char** argv) {
ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv);
// Called after InitGoogleTest so it sees argc/argv as updated by googletest.
RegisterCustomObjects(argc, argv);
return RUN_ALL_TESTS();
}