// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include <algorithm>
#include <cinttypes>
#include <iostream>
#include <mutex>
#include <queue>
#include <set>
#include <thread>
#include <unordered_set>
#include <utility>

#include "db/db_impl/db_impl.h"
#include "db/dbformat.h"
#include "db/job_context.h"
#include "db/version_set.h"
#include "db/write_batch_internal.h"
#include "env/mock_env.h"
#include "file/filename.h"
#include "monitoring/statistics_impl.h"
#include "monitoring/thread_status_util.h"
#include "port/stack_trace.h"
#include "rocksdb/cache.h"
#include "rocksdb/compaction_filter.h"
#include "rocksdb/convenience.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/experimental.h"
#include "rocksdb/filter_policy.h"
#include "rocksdb/options.h"
#include "rocksdb/perf_context.h"
#include "rocksdb/slice.h"
#include "rocksdb/slice_transform.h"
#include "rocksdb/table.h"
#include "rocksdb/table_properties.h"
#include "rocksdb/thread_status.h"
#include "rocksdb/utilities/checkpoint.h"
#include "rocksdb/utilities/write_batch_with_index.h"
#include "table/block_based/block_based_table_factory.h"
#include "table/mock_table.h"
#include "table/plain/plain_table_factory.h"
#include "test_util/sync_point.h"
#include "test_util/testharness.h"
#include "test_util/testutil.h"
#include "util/cast_util.h"
#include "util/compression.h"
#include "util/hash.h"
#include "util/mutexlock.h"
#include "util/rate_limiter_impl.h"
#include "util/string_util.h"
#include "utilities/merge_operators.h"

#if !defined(IOS_CROSS_COMPILE)

namespace ROCKSDB_NAMESPACE {

static std::string RandomString(Random* rnd, int len, double ratio) {
  std::string r;
  test::CompressibleString(rnd, ratio, len, &r);
  return r;
}
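
// Formats `key` as a zero-padded decimal string of the requested width
// (capped at kBufSize), so that numeric order and lexicographic order of
// the generated keys agree.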
std::string Key(uint64_t key, int length) {
  const int kBufSize = 1000;
  char buf[kBufSize];
  if (length > kBufSize) {
    length = kBufSize;
  }
  snprintf(buf, kBufSize, "%0*" PRIu64, length, key);
  return std::string(buf);
}

class CompactionJobStatsTest : public testing::Test,
                               public testing::WithParamInterface<bool> {
 public:
  std::string dbname_;
  std::string alternative_wal_dir_;
  Env* env_;
  DB* db_;
  std::vector<ColumnFamilyHandle*> handles_;
  uint32_t max_subcompactions_;
  Options last_options_;

  CompactionJobStatsTest() : env_(Env::Default()) {
    env_->SetBackgroundThreads(1, Env::LOW);
    env_->SetBackgroundThreads(1, Env::HIGH);
    dbname_ = test::PerThreadDBPath("compaction_job_stats_test");
    alternative_wal_dir_ = dbname_ + "/wal";
    Options options;
    options.create_if_missing = true;
    max_subcompactions_ = GetParam();
    options.max_subcompactions = max_subcompactions_;
    auto delete_options = options;
    delete_options.wal_dir = alternative_wal_dir_;
    EXPECT_OK(DestroyDB(dbname_, delete_options));
    // Destroy the DB again in case the alternative WAL dir was not used.
    EXPECT_OK(DestroyDB(dbname_, options));
    db_ = nullptr;
    Reopen(options);
  }

  ~CompactionJobStatsTest() override {
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({});
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
    Close();
    Options options;
    options.db_paths.emplace_back(dbname_, 0);
    options.db_paths.emplace_back(dbname_ + "_2", 0);
    options.db_paths.emplace_back(dbname_ + "_3", 0);
    options.db_paths.emplace_back(dbname_ + "_4", 0);
    EXPECT_OK(DestroyDB(dbname_, options));
  }

  // Required if inheriting from testing::WithParamInterface<>
  static void SetUpTestCase() {}
  static void TearDownTestCase() {}

  DBImpl* dbfull() { return static_cast_with_check<DBImpl>(db_); }

  void CreateColumnFamilies(const std::vector<std::string>& cfs,
                            const Options& options) {
    ColumnFamilyOptions cf_opts(options);
    size_t cfi = handles_.size();
    handles_.resize(cfi + cfs.size());
    for (const auto& cf : cfs) {
      ASSERT_OK(db_->CreateColumnFamily(cf_opts, cf, &handles_[cfi++]));
    }
  }

  void CreateAndReopenWithCF(const std::vector<std::string>& cfs,
                             const Options& options) {
    CreateColumnFamilies(cfs, options);
    std::vector<std::string> cfs_plus_default = cfs;
    cfs_plus_default.insert(cfs_plus_default.begin(), kDefaultColumnFamilyName);
    ReopenWithColumnFamilies(cfs_plus_default, options);
  }

  void ReopenWithColumnFamilies(const std::vector<std::string>& cfs,
                                const std::vector<Options>& options) {
    ASSERT_OK(TryReopenWithColumnFamilies(cfs, options));
  }

  void ReopenWithColumnFamilies(const std::vector<std::string>& cfs,
                                const Options& options) {
    ASSERT_OK(TryReopenWithColumnFamilies(cfs, options));
  }

  Status TryReopenWithColumnFamilies(const std::vector<std::string>& cfs,
                                     const std::vector<Options>& options) {
    Close();
    EXPECT_EQ(cfs.size(), options.size());
    std::vector<ColumnFamilyDescriptor> column_families;
    for (size_t i = 0; i < cfs.size(); ++i) {
      column_families.emplace_back(cfs[i], options[i]);
    }
    DBOptions db_opts = DBOptions(options[0]);
    return DB::Open(db_opts, dbname_, column_families, &handles_, &db_);
  }

  Status TryReopenWithColumnFamilies(const std::vector<std::string>& cfs,
                                     const Options& options) {
    Close();
    std::vector<Options> v_opts(cfs.size(), options);
    return TryReopenWithColumnFamilies(cfs, v_opts);
  }

  void Reopen(const Options& options) { ASSERT_OK(TryReopen(options)); }

  void Close() {
    for (auto h : handles_) {
      delete h;
    }
    handles_.clear();
    delete db_;
    db_ = nullptr;
  }

  void DestroyAndReopen(const Options& options) {
    // Destroy using last options
    Destroy(last_options_);
    ASSERT_OK(TryReopen(options));
  }

  void Destroy(const Options& options) {
    Close();
    ASSERT_OK(DestroyDB(dbname_, options));
  }

  Status ReadOnlyReopen(const Options& options) {
    return DB::OpenForReadOnly(options, dbname_, &db_);
  }

  Status TryReopen(const Options& options) {
    Close();
    last_options_ = options;
    return DB::Open(options, dbname_, &db_);
  }

  Status Flush(int cf = 0) {
    if (cf == 0) {
      return db_->Flush(FlushOptions());
    } else {
      return db_->Flush(FlushOptions(), handles_[cf]);
    }
  }

  Status Put(const Slice& k, const Slice& v,
             WriteOptions wo = WriteOptions()) {
    return db_->Put(wo, k, v);
  }

  Status Put(int cf, const Slice& k, const Slice& v,
             WriteOptions wo = WriteOptions()) {
    return db_->Put(wo, handles_[cf], k, v);
  }

  Status Delete(const std::string& k) { return db_->Delete(WriteOptions(), k); }

  Status Delete(int cf, const std::string& k) {
    return db_->Delete(WriteOptions(), handles_[cf], k);
  }

  std::string Get(const std::string& k, const Snapshot* snapshot = nullptr) {
    ReadOptions options;
    options.verify_checksums = true;
    options.snapshot = snapshot;
    std::string result;
    Status s = db_->Get(options, k, &result);
    if (s.IsNotFound()) {
      result = "NOT_FOUND";
    } else if (!s.ok()) {
      result = s.ToString();
    }
    return result;
  }

  std::string Get(int cf, const std::string& k,
                  const Snapshot* snapshot = nullptr) {
    ReadOptions options;
    options.verify_checksums = true;
    options.snapshot = snapshot;
    std::string result;
    Status s = db_->Get(options, handles_[cf], k, &result);
    if (s.IsNotFound()) {
      result = "NOT_FOUND";
    } else if (!s.ok()) {
      result = s.ToString();
    }
    return result;
  }

  int NumTableFilesAtLevel(int level, int cf = 0) {
    std::string property;
    if (cf == 0) {
      // default cfd
      EXPECT_TRUE(db_->GetProperty(
          "rocksdb.num-files-at-level" + std::to_string(level), &property));
    } else {
      EXPECT_TRUE(db_->GetProperty(
          handles_[cf], "rocksdb.num-files-at-level" + std::to_string(level),
          &property));
    }
    return atoi(property.c_str());
  }

  // Return spread of files per level
  std::string FilesPerLevel(int cf = 0) {
    int num_levels =
        (cf == 0) ? db_->NumberLevels() : db_->NumberLevels(handles_[cf]);
    std::string result;
    size_t last_non_zero_offset = 0;
    for (int level = 0; level < num_levels; level++) {
      int f = NumTableFilesAtLevel(level, cf);
      char buf[100];
      snprintf(buf, sizeof(buf), "%s%d", (level ? "," : ""), f);
      result += buf;
      if (f > 0) {
        last_non_zero_offset = result.size();
      }
    }
    result.resize(last_non_zero_offset);
    return result;
  }

  Status Size(uint64_t* size, const Slice& start, const Slice& limit,
              int cf = 0) {
    Range r(start, limit);
    if (cf == 0) {
      return db_->GetApproximateSizes(&r, 1, size);
    } else {
      return db_->GetApproximateSizes(handles_[cf], &r, 1, size);
    }
  }

  void Compact(int cf, const Slice& start, const Slice& limit,
               uint32_t target_path_id) {
    CompactRangeOptions compact_options;
    compact_options.target_path_id = target_path_id;
    ASSERT_OK(db_->CompactRange(compact_options, handles_[cf], &start, &limit));
  }

  void Compact(int cf, const Slice& start, const Slice& limit) {
    ASSERT_OK(
        db_->CompactRange(CompactRangeOptions(), handles_[cf], &start, &limit));
  }

  void Compact(const Slice& start, const Slice& limit) {
    ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, &limit));
  }

  void TEST_Compact(int level, int cf, const Slice& start, const Slice& limit) {
    ASSERT_OK(dbfull()->TEST_CompactRange(level, &start, &limit, handles_[cf],
                                          true /* disallow trivial move */));
  }

  // Do n memtable compactions, each of which produces an sstable
  // covering the range [small,large].
  void MakeTables(int n, const std::string& small, const std::string& large,
                  int cf = 0) {
    for (int i = 0; i < n; i++) {
      ASSERT_OK(Put(cf, small, "begin"));
      ASSERT_OK(Put(cf, large, "end"));
      ASSERT_OK(Flush(cf));
    }
  }

  static void SetDeletionCompactionStats(CompactionJobStats* stats,
                                         uint64_t input_deletions,
                                         uint64_t expired_deletions,
                                         uint64_t records_replaced) {
    stats->num_input_deletion_records = input_deletions;
    stats->num_expired_deletion_records = expired_deletions;
    stats->num_records_replaced = records_replaced;
  }
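
  // Writes keys in [smallest, largest) at the given interval and flushes
  // them all into a single SST file.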
  void MakeTableWithKeyValues(Random* rnd, uint64_t smallest, uint64_t largest,
                              int key_size, int value_size, uint64_t interval,
                              double ratio, int cf = 0) {
    for (auto key = smallest; key < largest; key += interval) {
      ASSERT_OK(Put(cf, Slice(Key(key, key_size)),
                    Slice(RandomString(rnd, value_size, ratio))));
    }
    ASSERT_OK(Flush(cf));
  }

  // This function assumes that two rounds of keys have already been
  // inserted into the database, as done in DeletionStatsTest.
  void SelectivelyDeleteKeys(uint64_t smallest, uint64_t largest,
                             uint64_t interval, int deletion_interval,
                             int key_size, uint64_t cutoff_key_num,
                             CompactionJobStats* stats, int cf = 0) {
    // interval must be >= 2 so that we can insert deletion entries that do
    // not delete an existing key, by placing them at an offset of 1 from
    // another existing key.
    ASSERT_GE(interval, 2);
    uint64_t ctr = 1;
    uint32_t deletions_made = 0;
    uint32_t num_deleted = 0;
    uint32_t num_expired = 0;
    for (auto key = smallest; key <= largest; key += interval, ctr++) {
      if (ctr % deletion_interval == 0) {
        ASSERT_OK(Delete(cf, Key(key, key_size)));
        deletions_made++;
        num_deleted++;
        if (key > cutoff_key_num) {
          num_expired++;
        }
      }
    }

    // Insert some deletions for keys that don't exist that
    // are both in and out of the key range
    ASSERT_OK(Delete(cf, Key(smallest + 1, key_size)));
    deletions_made++;
    ASSERT_OK(Delete(cf, Key(smallest - 1, key_size)));
    deletions_made++;
    num_expired++;
    ASSERT_OK(Delete(cf, Key(smallest - 9, key_size)));
    deletions_made++;
    num_expired++;

    ASSERT_OK(Flush(cf));
    SetDeletionCompactionStats(stats, deletions_made, num_expired, num_deleted);
  }
};

// An EventListener which helps verify the compaction results in
// test CompactionJobStatsTest.
class CompactionJobStatsChecker : public EventListener {
 public:
  CompactionJobStatsChecker()
      : compression_enabled_(false), verify_next_comp_io_stats_(false) {}

  size_t NumberOfUnverifiedStats() { return expected_stats_.size(); }

  void set_verify_next_comp_io_stats(bool v) { verify_next_comp_io_stats_ = v; }

  // Once a compaction completes, this function verifies the returned
  // CompactionJobStats against the oldest entry in "expected_stats_"
  // that has not yet been used for verification.
  void OnCompactionCompleted(DB* /*db*/, const CompactionJobInfo& ci) override {
    if (verify_next_comp_io_stats_) {
      ASSERT_GT(ci.stats.file_write_nanos, 0);
      ASSERT_GT(ci.stats.file_range_sync_nanos, 0);
      ASSERT_GT(ci.stats.file_fsync_nanos, 0);
      ASSERT_GT(ci.stats.file_prepare_write_nanos, 0);
      verify_next_comp_io_stats_ = false;
    }

    std::lock_guard<std::mutex> lock(mutex_);
    if (expected_stats_.size()) {
      Verify(ci.stats, expected_stats_.front());
      expected_stats_.pop();
    }
  }

  // A helper function which verifies whether two CompactionJobStats
  // match. The verification of all compaction stats is done by
  // ASSERT_EQ, except for the total input / output bytes, which we
  // check with ASSERT_GE and ASSERT_LE using a reasonable bias ---
  // 10% in the uncompressed case and 20% when compression is used.
  virtual void Verify(const CompactionJobStats& current_stats,
                      const CompactionJobStats& stats) {
    // time
    ASSERT_GT(current_stats.elapsed_micros, 0U);

    ASSERT_EQ(current_stats.num_input_records, stats.num_input_records);
    ASSERT_EQ(current_stats.num_input_files, stats.num_input_files);
    ASSERT_EQ(current_stats.num_input_files_at_output_level,
              stats.num_input_files_at_output_level);

    ASSERT_EQ(current_stats.num_output_records, stats.num_output_records);
    ASSERT_EQ(current_stats.num_output_files, stats.num_output_files);

    ASSERT_EQ(current_stats.is_full_compaction, stats.is_full_compaction);
    ASSERT_EQ(current_stats.is_manual_compaction, stats.is_manual_compaction);

    // file size
    double kFileSizeBias = compression_enabled_ ? 0.20 : 0.10;
    ASSERT_GE(current_stats.total_input_bytes * (1.00 + kFileSizeBias),
              stats.total_input_bytes);
    ASSERT_LE(current_stats.total_input_bytes,
              stats.total_input_bytes * (1.00 + kFileSizeBias));
    ASSERT_GE(current_stats.total_output_bytes * (1.00 + kFileSizeBias),
              stats.total_output_bytes);
    ASSERT_LE(current_stats.total_output_bytes,
              stats.total_output_bytes * (1.00 + kFileSizeBias));
    ASSERT_EQ(current_stats.total_input_raw_key_bytes,
              stats.total_input_raw_key_bytes);
    ASSERT_EQ(current_stats.total_input_raw_value_bytes,
              stats.total_input_raw_value_bytes);

    ASSERT_EQ(current_stats.num_records_replaced, stats.num_records_replaced);

    ASSERT_EQ(current_stats.num_corrupt_keys, stats.num_corrupt_keys);

    ASSERT_EQ(std::string(current_stats.smallest_output_key_prefix),
              std::string(stats.smallest_output_key_prefix));
    ASSERT_EQ(std::string(current_stats.largest_output_key_prefix),
              std::string(stats.largest_output_key_prefix));
  }

  // Adds an expected compaction stats entry, which will be used to
  // verify the CompactionJobStats returned by the OnCompactionCompleted()
  // callback.
  void AddExpectedStats(const CompactionJobStats& stats) {
    std::lock_guard<std::mutex> lock(mutex_);
    expected_stats_.push(stats);
  }

  void EnableCompression(bool flag) { compression_enabled_ = flag; }

  bool verify_next_comp_io_stats() const { return verify_next_comp_io_stats_; }

 private:
  std::mutex mutex_;
  std::queue<CompactionJobStats> expected_stats_;
  bool compression_enabled_;
  bool verify_next_comp_io_stats_;
};

// An EventListener which helps verify the compaction statistics in
// the test DeletionStatsTest.
class CompactionJobDeletionStatsChecker : public CompactionJobStatsChecker {
 public:
  // Verifies whether two CompactionJobStats match.
  void Verify(const CompactionJobStats& current_stats,
              const CompactionJobStats& stats) override {
    ASSERT_EQ(current_stats.num_input_deletion_records,
              stats.num_input_deletion_records);
    ASSERT_EQ(current_stats.num_expired_deletion_records,
              stats.num_expired_deletion_records);
    ASSERT_EQ(current_stats.num_records_replaced, stats.num_records_replaced);
    ASSERT_EQ(current_stats.num_corrupt_keys, stats.num_corrupt_keys);
  }
};

namespace {
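
// A rough SST file size model used to build the expected stats: the data
// blocks (key + possibly-compressed value + a fixed per-key overhead),
// a fixed-size footer, a bloom filter block, and an index block with one
// entry of roughly (key_size + 8) bytes per data block.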
uint64_t EstimatedFileSize(uint64_t num_records, size_t key_size,
                           size_t value_size, double compression_ratio = 1.0,
                           size_t block_size = 4096,
                           int bloom_bits_per_key = 10) {
  const size_t kPerKeyOverhead = 8;
  const size_t kFooterSize = 512;
  uint64_t data_size = static_cast<uint64_t>(
      num_records *
      (key_size + value_size * compression_ratio + kPerKeyOverhead));
  return data_size + kFooterSize +
         num_records * bloom_bits_per_key / 8      // filter block
         + data_size * (key_size + 8) / block_size;  // index block
}

namespace {

void CopyPrefix(const Slice& src, size_t prefix_length, std::string* dst) {
  assert(prefix_length > 0);
  size_t length = src.size() > prefix_length ? prefix_length : src.size();
  dst->assign(src.data(), length);
}

}  // namespace
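
// Builds the CompactionJobStats we expect a manual compaction over the
// given inputs to report, using EstimatedFileSize() for the byte totals.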
CompactionJobStats NewManualCompactionJobStats(
    const std::string& smallest_key, const std::string& largest_key,
    size_t num_input_files, size_t num_input_files_at_output_level,
    uint64_t num_input_records, size_t key_size, size_t value_size,
    size_t num_output_files, uint64_t num_output_records,
    double compression_ratio, uint64_t num_records_replaced,
    bool is_full = false, bool is_manual = true) {
  CompactionJobStats stats;
  stats.Reset();

  stats.num_input_records = num_input_records;
  stats.num_input_files = num_input_files;
  stats.num_input_files_at_output_level = num_input_files_at_output_level;

  stats.num_output_records = num_output_records;
  stats.num_output_files = num_output_files;

  stats.total_input_bytes =
      EstimatedFileSize(num_input_records / num_input_files, key_size,
                        value_size, compression_ratio) *
      num_input_files;
  stats.total_output_bytes =
      EstimatedFileSize(num_output_records / num_output_files, key_size,
                        value_size, compression_ratio) *
      num_output_files;
  stats.total_input_raw_key_bytes = num_input_records * (key_size + 8);
  stats.total_input_raw_value_bytes = num_input_records * value_size;

  stats.is_full_compaction = is_full;
  stats.is_manual_compaction = is_manual;

  stats.num_records_replaced = num_records_replaced;

  CopyPrefix(smallest_key, CompactionJobStats::kMaxPrefixLength,
             &stats.smallest_output_key_prefix);
  CopyPrefix(largest_key, CompactionJobStats::kMaxPrefixLength,
             &stats.largest_output_key_prefix);

  return stats;
}
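
// Picks any compression type supported by this build, falling back to
// kNoCompression when none is available.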
CompressionType GetAnyCompression() {
  if (Snappy_Supported()) {
    return kSnappyCompression;
  } else if (Zlib_Supported()) {
    return kZlibCompression;
  } else if (BZip2_Supported()) {
    return kBZip2Compression;
  } else if (LZ4_Supported()) {
    return kLZ4Compression;
  } else if (XPRESS_Supported()) {
    return kXpressCompression;
  }
  return kNoCompression;
}

}  // namespace

TEST_P(CompactionJobStatsTest, CompactionJobStatsTest) {
  Random rnd(301);
  const int kBufSize = 100;
  char buf[kBufSize];
  uint64_t key_base = 100000000L;
  // Note: key_base must be a multiple of num_keys_per_L0_file
  int num_keys_per_L0_file = 100;
  const int kTestScale = 8;
  const int kKeySize = 10;
  const int kValueSize = 1000;
  const double kCompressionRatio = 0.5;
  double compression_ratio = 1.0;
  uint64_t key_interval = key_base / num_keys_per_L0_file;

  // Whenever a compaction completes, this listener will try to
  // verify whether the returned CompactionJobStats matches
  // what we expect. The expected CompactionJobStats is added
  // via AddExpectedStats().
  auto* stats_checker = new CompactionJobStatsChecker();
  Options options;
  options.level_compaction_dynamic_level_bytes = false;
  options.listeners.emplace_back(stats_checker);
  options.create_if_missing = true;
  // Just enough settings to hold off auto-compaction.
  options.level0_file_num_compaction_trigger = kTestScale + 1;
  options.num_levels = 3;
  options.compression = kNoCompression;
  options.max_subcompactions = max_subcompactions_;
  options.bytes_per_sync = 512 * 1024;

  options.report_bg_io_stats = true;
  for (int test = 0; test < 2; ++test) {
    DestroyAndReopen(options);
    CreateAndReopenWithCF({"pikachu"}, options);

    // 1st Phase: generate "num_L0_files" L0 files.
    int num_L0_files = 0;
    for (uint64_t start_key = key_base; start_key <= key_base * kTestScale;
         start_key += key_base) {
      MakeTableWithKeyValues(&rnd, start_key, start_key + key_base - 1,
                             kKeySize, kValueSize, key_interval,
                             compression_ratio, 1);
      snprintf(buf, kBufSize, "%d", ++num_L0_files);
      ASSERT_EQ(std::string(buf), FilesPerLevel(1));
    }
    ASSERT_EQ(std::to_string(num_L0_files), FilesPerLevel(1));

    // 2nd Phase: perform L0 -> L1 compaction.
    int L0_compaction_count = 6;
    int count = 1;
    std::string smallest_key;
    std::string largest_key;
    for (uint64_t start_key = key_base;
         start_key <= key_base * L0_compaction_count;
         start_key += key_base, count++) {
      smallest_key = Key(start_key, 10);
      largest_key = Key(start_key + key_base - key_interval, 10);
      stats_checker->AddExpectedStats(NewManualCompactionJobStats(
          smallest_key, largest_key, 1, 0, num_keys_per_L0_file, kKeySize,
          kValueSize, 1, num_keys_per_L0_file, compression_ratio, 0));
      ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 1U);
      TEST_Compact(0, 1, smallest_key, largest_key);
      snprintf(buf, kBufSize, "%d,%d", num_L0_files - count, count);
      ASSERT_EQ(std::string(buf), FilesPerLevel(1));
    }

    // compact two files into one in the last L0 -> L1 compaction
    int num_remaining_L0 = num_L0_files - L0_compaction_count;
    smallest_key = Key(key_base * (L0_compaction_count + 1), 10);
    largest_key = Key(key_base * (kTestScale + 1) - key_interval, 10);
    stats_checker->AddExpectedStats(NewManualCompactionJobStats(
        smallest_key, largest_key, num_remaining_L0, 0,
        num_keys_per_L0_file * num_remaining_L0, kKeySize, kValueSize, 1,
        num_keys_per_L0_file * num_remaining_L0, compression_ratio, 0));
    ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 1U);
    TEST_Compact(0, 1, smallest_key, largest_key);

    int num_L1_files = num_L0_files - num_remaining_L0 + 1;
    num_L0_files = 0;
    snprintf(buf, kBufSize, "%d,%d", num_L0_files, num_L1_files);
    ASSERT_EQ(std::string(buf), FilesPerLevel(1));

    // 3rd Phase: generate sparse L0 files (wider key-range, same num of keys)
    int sparseness = 2;
    for (uint64_t start_key = key_base; start_key <= key_base * kTestScale;
         start_key += key_base * sparseness) {
      MakeTableWithKeyValues(
          &rnd, start_key, start_key + key_base * sparseness - 1, kKeySize,
          kValueSize, key_base * sparseness / num_keys_per_L0_file,
          compression_ratio, 1);
      snprintf(buf, kBufSize, "%d,%d", ++num_L0_files, num_L1_files);
      ASSERT_EQ(std::string(buf), FilesPerLevel(1));
    }

    // 4th Phase: perform L0 -> L1 compaction again, expecting higher write
    // amplification. When subcompactions are enabled, the number of output
    // files increases by 1 because multiple threads consume the input and
    // generate output files without coordinating to see whether the output
    // could fit into a smaller number of files, as it does when the
    // compaction runs sequentially.
    int num_output_files = options.max_subcompactions > 1 ? 2 : 1;
    for (uint64_t start_key = key_base; num_L0_files > 1;
         start_key += key_base * sparseness) {
      smallest_key = Key(start_key, 10);
      largest_key = Key(start_key + key_base * sparseness - key_interval, 10);
      stats_checker->AddExpectedStats(NewManualCompactionJobStats(
          smallest_key, largest_key, 3, 2, num_keys_per_L0_file * 3, kKeySize,
          kValueSize, num_output_files,
          num_keys_per_L0_file * 2,  // 1/3 of the data will be updated.
          compression_ratio, num_keys_per_L0_file));
      ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 1U);
      Compact(1, smallest_key, largest_key);
      if (options.max_subcompactions == 1) {
        --num_L1_files;
      }
      snprintf(buf, kBufSize, "%d,%d", --num_L0_files, num_L1_files);
      ASSERT_EQ(std::string(buf), FilesPerLevel(1));
    }

    // 5th Phase: Do a full compaction, which involves two sub-compactions.
    // Here we expect to have 1 L0 file and 4 L1 files.
    // In the first sub-compaction, we expect an L0 compaction.
    smallest_key = Key(key_base, 10);
    largest_key = Key(key_base * (kTestScale + 1) - key_interval, 10);
    stats_checker->AddExpectedStats(NewManualCompactionJobStats(
        Key(key_base * (kTestScale + 1 - sparseness), 10), largest_key, 2, 1,
        num_keys_per_L0_file * 3, kKeySize, kValueSize, 1,
        num_keys_per_L0_file * 2, compression_ratio, num_keys_per_L0_file));
    ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 1U);
    Compact(1, smallest_key, largest_key);

    num_L1_files = options.max_subcompactions > 1 ? 7 : 4;
    char L1_buf[4];
    snprintf(L1_buf, sizeof(L1_buf), "0,%d", num_L1_files);
    std::string L1_files(L1_buf);
    ASSERT_EQ(L1_files, FilesPerLevel(1));
    options.compression = GetAnyCompression();
    if (options.compression == kNoCompression) {
      break;
    }
    stats_checker->EnableCompression(true);
    compression_ratio = kCompressionRatio;

    for (int i = 0; i < 5; i++) {
      ASSERT_OK(Put(1, Slice(Key(key_base + i, 10)),
                    Slice(RandomString(&rnd, 512 * 1024, 1))));
    }

    ASSERT_OK(Flush(1));
    ASSERT_OK(static_cast_with_check<DBImpl>(db_)->TEST_WaitForCompact());

    stats_checker->set_verify_next_comp_io_stats(true);
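
    // Inject a small delay at each write-path sync point below so that the
    // per-compaction IO timing stats (file_write_nanos,
    // file_range_sync_nanos, file_fsync_nanos, file_prepare_write_nanos)
    // are guaranteed to be nonzero when OnCompactionCompleted() checks them.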
    std::atomic<bool> first_prepare_write(true);
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
        "WritableFileWriter::Append:BeforePrepareWrite", [&](void* /*arg*/) {
          if (first_prepare_write.load()) {
            options.env->SleepForMicroseconds(3);
            first_prepare_write.store(false);
          }
        });

    std::atomic<bool> first_flush(true);
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
        "WritableFileWriter::Flush:BeforeAppend", [&](void* /*arg*/) {
          if (first_flush.load()) {
            options.env->SleepForMicroseconds(3);
            first_flush.store(false);
          }
        });

    std::atomic<bool> first_sync(true);
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
        "WritableFileWriter::SyncInternal:0", [&](void* /*arg*/) {
          if (first_sync.load()) {
            options.env->SleepForMicroseconds(3);
            first_sync.store(false);
          }
        });

    std::atomic<bool> first_range_sync(true);
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
        "WritableFileWriter::RangeSync:0", [&](void* /*arg*/) {
          if (first_range_sync.load()) {
            options.env->SleepForMicroseconds(3);
            first_range_sync.store(false);
          }
        });
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

    Compact(1, smallest_key, largest_key);

    ASSERT_TRUE(!stats_checker->verify_next_comp_io_stats());
    ASSERT_TRUE(!first_prepare_write.load());
    ASSERT_TRUE(!first_flush.load());
    ASSERT_TRUE(!first_sync.load());
    ASSERT_TRUE(!first_range_sync.load());
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  }
  ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 0U);
}

TEST_P(CompactionJobStatsTest, DeletionStatsTest) {
  Random rnd(301);
  uint64_t key_base = 100000L;
  // Note: key_base must be a multiple of num_keys_per_L0_file
  int num_keys_per_L0_file = 20;
  const int kTestScale = 8;  // make sure this is even
  const int kKeySize = 10;
  const int kValueSize = 100;
  double compression_ratio = 1.0;
  uint64_t key_interval = key_base / num_keys_per_L0_file;
  uint64_t largest_key_num = key_base * (kTestScale + 1) - key_interval;
  uint64_t cutoff_key_num = key_base * (kTestScale / 2 + 1) - key_interval;
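  // cutoff_key_num is the largest key covered by the files that Stage 1
  // below pushes to L2; deletions of keys above it do not overlap any L2
  // file, so their tombstones can be dropped during the later L0 -> L1
  // compaction and are expected to be counted as expired deletion records.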
  const std::string smallest_key = Key(key_base - 10, kKeySize);
  const std::string largest_key = Key(largest_key_num + 10, kKeySize);

  // Whenever a compaction completes, this listener will try to
  // verify whether the returned CompactionJobStats matches
  // what we expect.
  auto* stats_checker = new CompactionJobDeletionStatsChecker();
  Options options;
  options.level_compaction_dynamic_level_bytes = false;
  options.listeners.emplace_back(stats_checker);
  options.create_if_missing = true;
  options.level0_file_num_compaction_trigger = kTestScale + 1;
  options.num_levels = 3;
  options.compression = kNoCompression;
  options.max_bytes_for_level_multiplier = 2;
  options.max_subcompactions = max_subcompactions_;

  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  // Stage 1: Generate several L0 files and then send them to L2 by
  // using CompactRangeOptions and CompactRange(). These files will
  // have a strict subset of the keys from the full key-range
  for (uint64_t start_key = key_base; start_key <= key_base * kTestScale / 2;
       start_key += key_base) {
    MakeTableWithKeyValues(&rnd, start_key, start_key + key_base - 1, kKeySize,
                           kValueSize, key_interval, compression_ratio, 1);
  }

  CompactRangeOptions cr_options;
  cr_options.change_level = true;
  cr_options.target_level = 2;
  ASSERT_OK(db_->CompactRange(cr_options, handles_[1], nullptr, nullptr));
  ASSERT_GT(NumTableFilesAtLevel(2, 1), 0);

  // Stage 2: Generate files including keys from the entire key range
  for (uint64_t start_key = key_base; start_key <= key_base * kTestScale;
       start_key += key_base) {
    MakeTableWithKeyValues(&rnd, start_key, start_key + key_base - 1, kKeySize,
                           kValueSize, key_interval, compression_ratio, 1);
  }

  // Send these L0 files to L1
  TEST_Compact(0, 1, smallest_key, largest_key);
  ASSERT_GT(NumTableFilesAtLevel(1, 1), 0);

  // Add a new record and flush so that there is now an L0 file
  // with a value too (not just deletions from the next step)
  ASSERT_OK(Put(1, Key(key_base - 6, kKeySize), "test"));
  ASSERT_OK(Flush(1));

  // Stage 3: Generate L0 files with some deletions so now
  // there are files with the same key range in L0, L1, and L2
  int deletion_interval = 3;
  CompactionJobStats first_compaction_stats;
  SelectivelyDeleteKeys(key_base, largest_key_num, key_interval,
                        deletion_interval, kKeySize, cutoff_key_num,
                        &first_compaction_stats, 1);

  stats_checker->AddExpectedStats(first_compaction_stats);

  // Stage 4: Trigger compaction and verify the stats
  TEST_Compact(0, 1, smallest_key, largest_key);
}

namespace {
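
// Returns the expected number of input units (newly flushed files) of the
// universal compaction triggered by the num_flushes-th flush. With the
// options used below, this is the lowest set bit of num_flushes; a lowest
// set bit of 1 means a lone file that triggers no compaction, so 0 is
// returned in that case.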
int GetUniversalCompactionInputUnits(uint32_t num_flushes) {
  uint32_t compaction_input_units;
  for (compaction_input_units = 1; num_flushes >= compaction_input_units;
       compaction_input_units *= 2) {
    if ((num_flushes & compaction_input_units) != 0) {
      return compaction_input_units > 1 ? compaction_input_units : 0;
    }
  }
  return 0;
}

}  // namespace

TEST_P(CompactionJobStatsTest, UniversalCompactionTest) {
  Random rnd(301);
  uint64_t key_base = 100000000L;
  // Note: key_base must be a multiple of num_keys_per_table
  int num_keys_per_table = 100;
  const uint32_t kTestScale = 6;
  const int kKeySize = 10;
  const int kValueSize = 900;
  double compression_ratio = 1.0;
  uint64_t key_interval = key_base / num_keys_per_table;

  auto* stats_checker = new CompactionJobStatsChecker();
  Options options;
  options.listeners.emplace_back(stats_checker);
  options.create_if_missing = true;
  options.num_levels = 3;
  options.compression = kNoCompression;
  options.level0_file_num_compaction_trigger = 2;
  options.target_file_size_base = num_keys_per_table * 1000;
  options.compaction_style = kCompactionStyleUniversal;
  options.compaction_options_universal.size_ratio = 1;
  options.compaction_options_universal.max_size_amplification_percent = 1000;
  options.max_subcompactions = max_subcompactions_;
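
  // With size_ratio == 1 and a high size-amplification limit, universal
  // compaction merges the sorted runs roughly like a binary counter: the
  // n-th flush triggers a compaction whose input size corresponds to the
  // lowest set bit of n (see GetUniversalCompactionInputUnits() above).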

  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  // Generates the expected CompactionJobStats for each compaction
  for (uint32_t num_flushes = 2; num_flushes <= kTestScale; num_flushes++) {
    // Here we treat one newly flushed file as a unit.
    //
    // For example, if a newly flushed file is 100k, and a compaction has
    // 4 input units, then this compaction inputs 400k.
    uint32_t num_input_units = GetUniversalCompactionInputUnits(num_flushes);
    if (num_input_units == 0) {
      continue;
    }
    // A full compaction only happens when the number of flushes equals
    // the number of compaction input runs.
    bool is_full = num_flushes == num_input_units;
    // The following statement determines the expected smallest key
    // based on whether it is a full compaction.
    uint64_t smallest_key = is_full ? key_base : key_base * (num_flushes - 1);

    stats_checker->AddExpectedStats(NewManualCompactionJobStats(
        Key(smallest_key, 10),
        Key(smallest_key + key_base * num_input_units - key_interval, 10),
        num_input_units, num_input_units > 2 ? num_input_units / 2 : 0,
        num_keys_per_table * num_input_units, kKeySize, kValueSize,
        num_input_units, num_keys_per_table * num_input_units, 1.0, 0, is_full,
        false));
    ASSERT_OK(dbfull()->TEST_WaitForCompact());
  }
  ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 3U);

  for (uint64_t start_key = key_base; start_key <= key_base * kTestScale;
       start_key += key_base) {
    MakeTableWithKeyValues(&rnd, start_key, start_key + key_base - 1, kKeySize,
                           kValueSize, key_interval, compression_ratio, 1);
    ASSERT_OK(static_cast_with_check<DBImpl>(db_)->TEST_WaitForCompact());
  }
  ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 0U);
}

INSTANTIATE_TEST_CASE_P(CompactionJobStatsTest, CompactionJobStatsTest,
                        ::testing::Values(1, 4));

}  // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}

#else

int main(int /*argc*/, char** /*argv*/) { return 0; }

#endif  // !defined(IOS_CROSS_COMPILE)