// flush_job_test.cc
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include <algorithm>
  6. #include <array>
  7. #include <map>
  8. #include <string>
  9. #include "db/blob_index.h"
  10. #include "db/column_family.h"
  11. #include "db/db_impl/db_impl.h"
  12. #include "db/flush_job.h"
  13. #include "db/version_set.h"
  14. #include "file/writable_file_writer.h"
  15. #include "rocksdb/cache.h"
  16. #include "rocksdb/write_buffer_manager.h"
  17. #include "table/mock_table.h"
  18. #include "test_util/testharness.h"
  19. #include "test_util/testutil.h"
  20. #include "util/string_util.h"
  21. namespace ROCKSDB_NAMESPACE {
  22. // TODO(icanadi) Mock out everything else:
  23. // 1. VersionSet
  24. // 2. Memtable
  25. class FlushJobTest : public testing::Test {
  26. public:
  27. FlushJobTest()
  28. : env_(Env::Default()),
  29. fs_(std::make_shared<LegacyFileSystemWrapper>(env_)),
  30. dbname_(test::PerThreadDBPath("flush_job_test")),
  31. options_(),
  32. db_options_(options_),
  33. column_family_names_({kDefaultColumnFamilyName, "foo", "bar"}),
  34. table_cache_(NewLRUCache(50000, 16)),
  35. write_buffer_manager_(db_options_.db_write_buffer_size),
  36. shutting_down_(false),
  37. mock_table_factory_(new mock::MockTableFactory()) {
  38. EXPECT_OK(env_->CreateDirIfMissing(dbname_));
  39. db_options_.db_paths.emplace_back(dbname_,
  40. std::numeric_limits<uint64_t>::max());
  41. db_options_.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
  42. // TODO(icanadi) Remove this once we mock out VersionSet
  43. NewDB();
  44. std::vector<ColumnFamilyDescriptor> column_families;
  45. cf_options_.table_factory = mock_table_factory_;
  46. for (const auto& cf_name : column_family_names_) {
  47. column_families.emplace_back(cf_name, cf_options_);
  48. }
  49. db_options_.env = env_;
  50. db_options_.fs = fs_;
  51. versions_.reset(new VersionSet(dbname_, &db_options_, env_options_,
  52. table_cache_.get(), &write_buffer_manager_,
  53. &write_controller_,
  54. /*block_cache_tracer=*/nullptr));
  55. EXPECT_OK(versions_->Recover(column_families, false));
  56. }
  57. void NewDB() {
  58. SetIdentityFile(env_, dbname_);
  59. VersionEdit new_db;
  60. if (db_options_.write_dbid_to_manifest) {
  61. DBImpl* impl = new DBImpl(DBOptions(), dbname_);
  62. std::string db_id;
  63. impl->GetDbIdentityFromIdentityFile(&db_id);
  64. new_db.SetDBId(db_id);
  65. }
  66. new_db.SetLogNumber(0);
  67. new_db.SetNextFile(2);
  68. new_db.SetLastSequence(0);
  69. autovector<VersionEdit> new_cfs;
  70. SequenceNumber last_seq = 1;
  71. uint32_t cf_id = 1;
  72. for (size_t i = 1; i != column_family_names_.size(); ++i) {
  73. VersionEdit new_cf;
  74. new_cf.AddColumnFamily(column_family_names_[i]);
  75. new_cf.SetColumnFamily(cf_id++);
  76. new_cf.SetLogNumber(0);
  77. new_cf.SetNextFile(2);
  78. new_cf.SetLastSequence(last_seq++);
  79. new_cfs.emplace_back(new_cf);
  80. }
  81. const std::string manifest = DescriptorFileName(dbname_, 1);
  82. std::unique_ptr<WritableFile> file;
  83. Status s = env_->NewWritableFile(
  84. manifest, &file, env_->OptimizeForManifestWrite(env_options_));
  85. ASSERT_OK(s);
  86. std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
  87. NewLegacyWritableFileWrapper(std::move(file)), manifest, EnvOptions()));
  88. {
  89. log::Writer log(std::move(file_writer), 0, false);
  90. std::string record;
  91. new_db.EncodeTo(&record);
  92. s = log.AddRecord(record);
  93. for (const auto& e : new_cfs) {
  94. record.clear();
  95. e.EncodeTo(&record);
  96. s = log.AddRecord(record);
  97. ASSERT_OK(s);
  98. }
  99. }
  100. ASSERT_OK(s);
  101. // Make "CURRENT" file that points to the new manifest file.
  102. s = SetCurrentFile(env_, dbname_, 1, nullptr);
  103. }
  104. Env* env_;
  105. std::shared_ptr<FileSystem> fs_;
  106. std::string dbname_;
  107. EnvOptions env_options_;
  108. Options options_;
  109. ImmutableDBOptions db_options_;
  110. const std::vector<std::string> column_family_names_;
  111. std::shared_ptr<Cache> table_cache_;
  112. WriteController write_controller_;
  113. WriteBufferManager write_buffer_manager_;
  114. ColumnFamilyOptions cf_options_;
  115. std::unique_ptr<VersionSet> versions_;
  116. InstrumentedMutex mutex_;
  117. std::atomic<bool> shutting_down_;
  118. std::shared_ptr<mock::MockTableFactory> mock_table_factory_;
  119. };
  120. TEST_F(FlushJobTest, Empty) {
  121. JobContext job_context(0);
  122. auto cfd = versions_->GetColumnFamilySet()->GetDefault();
  123. EventLogger event_logger(db_options_.info_log.get());
  124. SnapshotChecker* snapshot_checker = nullptr; // not relavant
  125. FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(),
  126. db_options_, *cfd->GetLatestMutableCFOptions(),
  127. nullptr /* memtable_id */, env_options_, versions_.get(),
  128. &mutex_, &shutting_down_, {}, kMaxSequenceNumber,
  129. snapshot_checker, &job_context, nullptr, nullptr, nullptr,
  130. kNoCompression, nullptr, &event_logger, false,
  131. true /* sync_output_directory */,
  132. true /* write_manifest */, Env::Priority::USER);
  133. {
  134. InstrumentedMutexLock l(&mutex_);
  135. flush_job.PickMemTable();
  136. ASSERT_OK(flush_job.Run());
  137. }
  138. job_context.Clean();
  139. }
  140. TEST_F(FlushJobTest, NonEmpty) {
  141. JobContext job_context(0);
  142. auto cfd = versions_->GetColumnFamilySet()->GetDefault();
  143. auto new_mem = cfd->ConstructNewMemtable(*cfd->GetLatestMutableCFOptions(),
  144. kMaxSequenceNumber);
  145. new_mem->Ref();
  146. auto inserted_keys = mock::MakeMockFile();
  147. // Test data:
  148. // seqno [ 1, 2 ... 8998, 8999, 9000, 9001, 9002 ... 9999 ]
  149. // key [ 1001, 1002 ... 9998, 9999, 0, 1, 2 ... 999 ]
  150. // range-delete "9995" -> "9999" at seqno 10000
  151. // blob references with seqnos 10001..10006
  152. for (int i = 1; i < 10000; ++i) {
  153. std::string key(ToString((i + 1000) % 10000));
  154. std::string value("value" + key);
  155. new_mem->Add(SequenceNumber(i), kTypeValue, key, value);
  156. if ((i + 1000) % 10000 < 9995) {
  157. InternalKey internal_key(key, SequenceNumber(i), kTypeValue);
  158. inserted_keys.insert({internal_key.Encode().ToString(), value});
  159. }
  160. }
  161. {
  162. new_mem->Add(SequenceNumber(10000), kTypeRangeDeletion, "9995", "9999a");
  163. InternalKey internal_key("9995", SequenceNumber(10000), kTypeRangeDeletion);
  164. inserted_keys.insert({internal_key.Encode().ToString(), "9999a"});
  165. }
  166. #ifndef ROCKSDB_LITE
  167. // Note: the first two blob references will not be considered when resolving
  168. // the oldest blob file referenced (the first one is inlined TTL, while the
  169. // second one is TTL and thus points to a TTL blob file).
  170. constexpr std::array<uint64_t, 6> blob_file_numbers{
  171. kInvalidBlobFileNumber, 5, 103, 17, 102, 101};
  172. for (size_t i = 0; i < blob_file_numbers.size(); ++i) {
  173. std::string key(ToString(i + 10001));
  174. std::string blob_index;
  175. if (i == 0) {
  176. BlobIndex::EncodeInlinedTTL(&blob_index, /* expiration */ 1234567890ULL,
  177. "foo");
  178. } else if (i == 1) {
  179. BlobIndex::EncodeBlobTTL(&blob_index, /* expiration */ 1234567890ULL,
  180. blob_file_numbers[i], /* offset */ i << 10,
  181. /* size */ i << 20, kNoCompression);
  182. } else {
  183. BlobIndex::EncodeBlob(&blob_index, blob_file_numbers[i],
  184. /* offset */ i << 10, /* size */ i << 20,
  185. kNoCompression);
  186. }
  187. const SequenceNumber seq(i + 10001);
  188. new_mem->Add(seq, kTypeBlobIndex, key, blob_index);
  189. InternalKey internal_key(key, seq, kTypeBlobIndex);
  190. inserted_keys.emplace_hint(inserted_keys.end(),
  191. internal_key.Encode().ToString(), blob_index);
  192. }
  193. #endif
  194. autovector<MemTable*> to_delete;
  195. cfd->imm()->Add(new_mem, &to_delete);
  196. for (auto& m : to_delete) {
  197. delete m;
  198. }
  199. EventLogger event_logger(db_options_.info_log.get());
  200. SnapshotChecker* snapshot_checker = nullptr; // not relavant
  201. FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(),
  202. db_options_, *cfd->GetLatestMutableCFOptions(),
  203. nullptr /* memtable_id */, env_options_, versions_.get(),
  204. &mutex_, &shutting_down_, {}, kMaxSequenceNumber,
  205. snapshot_checker, &job_context, nullptr, nullptr, nullptr,
  206. kNoCompression, db_options_.statistics.get(),
  207. &event_logger, true, true /* sync_output_directory */,
  208. true /* write_manifest */, Env::Priority::USER);
  209. HistogramData hist;
  210. FileMetaData file_meta;
  211. mutex_.Lock();
  212. flush_job.PickMemTable();
  213. ASSERT_OK(flush_job.Run(nullptr, &file_meta));
  214. mutex_.Unlock();
  215. db_options_.statistics->histogramData(FLUSH_TIME, &hist);
  216. ASSERT_GT(hist.average, 0.0);
  217. ASSERT_EQ(ToString(0), file_meta.smallest.user_key().ToString());
  218. ASSERT_EQ("9999a", file_meta.largest.user_key().ToString());
  219. ASSERT_EQ(1, file_meta.fd.smallest_seqno);
  220. #ifndef ROCKSDB_LITE
  221. ASSERT_EQ(10006, file_meta.fd.largest_seqno);
  222. ASSERT_EQ(17, file_meta.oldest_blob_file_number);
  223. #else
  224. ASSERT_EQ(10000, file_meta.fd.largest_seqno);
  225. #endif
  226. mock_table_factory_->AssertSingleFile(inserted_keys);
  227. job_context.Clean();
  228. }
  229. TEST_F(FlushJobTest, FlushMemTablesSingleColumnFamily) {
  230. const size_t num_mems = 2;
  231. const size_t num_mems_to_flush = 1;
  232. const size_t num_keys_per_table = 100;
  233. JobContext job_context(0);
  234. ColumnFamilyData* cfd = versions_->GetColumnFamilySet()->GetDefault();
  235. std::vector<uint64_t> memtable_ids;
  236. std::vector<MemTable*> new_mems;
  237. for (size_t i = 0; i != num_mems; ++i) {
  238. MemTable* mem = cfd->ConstructNewMemtable(*cfd->GetLatestMutableCFOptions(),
  239. kMaxSequenceNumber);
  240. mem->SetID(i);
  241. mem->Ref();
  242. new_mems.emplace_back(mem);
  243. memtable_ids.push_back(mem->GetID());
  244. for (size_t j = 0; j < num_keys_per_table; ++j) {
  245. std::string key(ToString(j + i * num_keys_per_table));
  246. std::string value("value" + key);
  247. mem->Add(SequenceNumber(j + i * num_keys_per_table), kTypeValue, key,
  248. value);
  249. }
  250. }
  251. autovector<MemTable*> to_delete;
  252. for (auto mem : new_mems) {
  253. cfd->imm()->Add(mem, &to_delete);
  254. }
  255. EventLogger event_logger(db_options_.info_log.get());
  256. SnapshotChecker* snapshot_checker = nullptr; // not relavant
  257. assert(memtable_ids.size() == num_mems);
  258. uint64_t smallest_memtable_id = memtable_ids.front();
  259. uint64_t flush_memtable_id = smallest_memtable_id + num_mems_to_flush - 1;
  260. FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(),
  261. db_options_, *cfd->GetLatestMutableCFOptions(),
  262. &flush_memtable_id, env_options_, versions_.get(), &mutex_,
  263. &shutting_down_, {}, kMaxSequenceNumber, snapshot_checker,
  264. &job_context, nullptr, nullptr, nullptr, kNoCompression,
  265. db_options_.statistics.get(), &event_logger, true,
  266. true /* sync_output_directory */,
  267. true /* write_manifest */, Env::Priority::USER);
  268. HistogramData hist;
  269. FileMetaData file_meta;
  270. mutex_.Lock();
  271. flush_job.PickMemTable();
  272. ASSERT_OK(flush_job.Run(nullptr /* prep_tracker */, &file_meta));
  273. mutex_.Unlock();
  274. db_options_.statistics->histogramData(FLUSH_TIME, &hist);
  275. ASSERT_GT(hist.average, 0.0);
  276. ASSERT_EQ(ToString(0), file_meta.smallest.user_key().ToString());
  277. ASSERT_EQ("99", file_meta.largest.user_key().ToString());
  278. ASSERT_EQ(0, file_meta.fd.smallest_seqno);
  279. ASSERT_EQ(SequenceNumber(num_mems_to_flush * num_keys_per_table - 1),
  280. file_meta.fd.largest_seqno);
  281. ASSERT_EQ(kInvalidBlobFileNumber, file_meta.oldest_blob_file_number);
  282. for (auto m : to_delete) {
  283. delete m;
  284. }
  285. to_delete.clear();
  286. job_context.Clean();
  287. }
  288. TEST_F(FlushJobTest, FlushMemtablesMultipleColumnFamilies) {
  289. autovector<ColumnFamilyData*> all_cfds;
  290. for (auto cfd : *versions_->GetColumnFamilySet()) {
  291. all_cfds.push_back(cfd);
  292. }
  293. const std::vector<size_t> num_memtables = {2, 1, 3};
  294. assert(num_memtables.size() == column_family_names_.size());
  295. const size_t num_keys_per_memtable = 1000;
  296. JobContext job_context(0);
  297. std::vector<uint64_t> memtable_ids;
  298. std::vector<SequenceNumber> smallest_seqs;
  299. std::vector<SequenceNumber> largest_seqs;
  300. autovector<MemTable*> to_delete;
  301. SequenceNumber curr_seqno = 0;
  302. size_t k = 0;
  303. for (auto cfd : all_cfds) {
  304. smallest_seqs.push_back(curr_seqno);
  305. for (size_t i = 0; i != num_memtables[k]; ++i) {
  306. MemTable* mem = cfd->ConstructNewMemtable(
  307. *cfd->GetLatestMutableCFOptions(), kMaxSequenceNumber);
  308. mem->SetID(i);
  309. mem->Ref();
  310. for (size_t j = 0; j != num_keys_per_memtable; ++j) {
  311. std::string key(ToString(j + i * num_keys_per_memtable));
  312. std::string value("value" + key);
  313. mem->Add(curr_seqno++, kTypeValue, key, value);
  314. }
  315. cfd->imm()->Add(mem, &to_delete);
  316. }
  317. largest_seqs.push_back(curr_seqno - 1);
  318. memtable_ids.push_back(num_memtables[k++] - 1);
  319. }
  320. EventLogger event_logger(db_options_.info_log.get());
  321. SnapshotChecker* snapshot_checker = nullptr; // not relevant
  322. std::vector<std::unique_ptr<FlushJob>> flush_jobs;
  323. k = 0;
  324. for (auto cfd : all_cfds) {
  325. std::vector<SequenceNumber> snapshot_seqs;
  326. flush_jobs.emplace_back(new FlushJob(
  327. dbname_, cfd, db_options_, *cfd->GetLatestMutableCFOptions(),
  328. &memtable_ids[k], env_options_, versions_.get(), &mutex_,
  329. &shutting_down_, snapshot_seqs, kMaxSequenceNumber, snapshot_checker,
  330. &job_context, nullptr, nullptr, nullptr, kNoCompression,
  331. db_options_.statistics.get(), &event_logger, true,
  332. false /* sync_output_directory */, false /* write_manifest */,
  333. Env::Priority::USER));
  334. k++;
  335. }
  336. HistogramData hist;
  337. std::vector<FileMetaData> file_metas;
  338. // Call reserve to avoid auto-resizing
  339. file_metas.reserve(flush_jobs.size());
  340. mutex_.Lock();
  341. for (auto& job : flush_jobs) {
  342. job->PickMemTable();
  343. }
  344. for (auto& job : flush_jobs) {
  345. FileMetaData meta;
  346. // Run will release and re-acquire mutex
  347. ASSERT_OK(job->Run(nullptr /**/, &meta));
  348. file_metas.emplace_back(meta);
  349. }
  350. autovector<FileMetaData*> file_meta_ptrs;
  351. for (auto& meta : file_metas) {
  352. file_meta_ptrs.push_back(&meta);
  353. }
  354. autovector<const autovector<MemTable*>*> mems_list;
  355. for (size_t i = 0; i != all_cfds.size(); ++i) {
  356. const auto& mems = flush_jobs[i]->GetMemTables();
  357. mems_list.push_back(&mems);
  358. }
  359. autovector<const MutableCFOptions*> mutable_cf_options_list;
  360. for (auto cfd : all_cfds) {
  361. mutable_cf_options_list.push_back(cfd->GetLatestMutableCFOptions());
  362. }
  363. Status s = InstallMemtableAtomicFlushResults(
  364. nullptr /* imm_lists */, all_cfds, mutable_cf_options_list, mems_list,
  365. versions_.get(), &mutex_, file_meta_ptrs, &job_context.memtables_to_free,
  366. nullptr /* db_directory */, nullptr /* log_buffer */);
  367. ASSERT_OK(s);
  368. mutex_.Unlock();
  369. db_options_.statistics->histogramData(FLUSH_TIME, &hist);
  370. ASSERT_GT(hist.average, 0.0);
  371. k = 0;
  372. for (const auto& file_meta : file_metas) {
  373. ASSERT_EQ(ToString(0), file_meta.smallest.user_key().ToString());
  374. ASSERT_EQ("999", file_meta.largest.user_key()
  375. .ToString()); // max key by bytewise comparator
  376. ASSERT_EQ(smallest_seqs[k], file_meta.fd.smallest_seqno);
  377. ASSERT_EQ(largest_seqs[k], file_meta.fd.largest_seqno);
  378. // Verify that imm is empty
  379. ASSERT_EQ(std::numeric_limits<uint64_t>::max(),
  380. all_cfds[k]->imm()->GetEarliestMemTableID());
  381. ASSERT_EQ(0, all_cfds[k]->imm()->GetLatestMemTableID());
  382. ++k;
  383. }
  384. for (auto m : to_delete) {
  385. delete m;
  386. }
  387. to_delete.clear();
  388. job_context.Clean();
  389. }
  390. TEST_F(FlushJobTest, Snapshots) {
  391. JobContext job_context(0);
  392. auto cfd = versions_->GetColumnFamilySet()->GetDefault();
  393. auto new_mem = cfd->ConstructNewMemtable(*cfd->GetLatestMutableCFOptions(),
  394. kMaxSequenceNumber);
  395. std::set<SequenceNumber> snapshots_set;
  396. int keys = 10000;
  397. int max_inserts_per_keys = 8;
  398. Random rnd(301);
  399. for (int i = 0; i < keys / 2; ++i) {
  400. snapshots_set.insert(rnd.Uniform(keys * (max_inserts_per_keys / 2)) + 1);
  401. }
  402. // set has already removed the duplicate snapshots
  403. std::vector<SequenceNumber> snapshots(snapshots_set.begin(),
  404. snapshots_set.end());
  405. new_mem->Ref();
  406. SequenceNumber current_seqno = 0;
  407. auto inserted_keys = mock::MakeMockFile();
  408. for (int i = 1; i < keys; ++i) {
  409. std::string key(ToString(i));
  410. int insertions = rnd.Uniform(max_inserts_per_keys);
  411. for (int j = 0; j < insertions; ++j) {
  412. std::string value(test::RandomHumanReadableString(&rnd, 10));
  413. auto seqno = ++current_seqno;
  414. new_mem->Add(SequenceNumber(seqno), kTypeValue, key, value);
  415. // a key is visible only if:
  416. // 1. it's the last one written (j == insertions - 1)
  417. // 2. there's a snapshot pointing at it
  418. bool visible = (j == insertions - 1) ||
  419. (snapshots_set.find(seqno) != snapshots_set.end());
  420. if (visible) {
  421. InternalKey internal_key(key, seqno, kTypeValue);
  422. inserted_keys.insert({internal_key.Encode().ToString(), value});
  423. }
  424. }
  425. }
  426. autovector<MemTable*> to_delete;
  427. cfd->imm()->Add(new_mem, &to_delete);
  428. for (auto& m : to_delete) {
  429. delete m;
  430. }
  431. EventLogger event_logger(db_options_.info_log.get());
  432. SnapshotChecker* snapshot_checker = nullptr; // not relavant
  433. FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(),
  434. db_options_, *cfd->GetLatestMutableCFOptions(),
  435. nullptr /* memtable_id */, env_options_, versions_.get(),
  436. &mutex_, &shutting_down_, snapshots, kMaxSequenceNumber,
  437. snapshot_checker, &job_context, nullptr, nullptr, nullptr,
  438. kNoCompression, db_options_.statistics.get(),
  439. &event_logger, true, true /* sync_output_directory */,
  440. true /* write_manifest */, Env::Priority::USER);
  441. mutex_.Lock();
  442. flush_job.PickMemTable();
  443. ASSERT_OK(flush_job.Run());
  444. mutex_.Unlock();
  445. mock_table_factory_->AssertSingleFile(inserted_keys);
  446. HistogramData hist;
  447. db_options_.statistics->histogramData(FLUSH_TIME, &hist);
  448. ASSERT_GT(hist.average, 0.0);
  449. job_context.Clean();
  450. }
  451. } // namespace ROCKSDB_NAMESPACE
  452. int main(int argc, char** argv) {
  453. ::testing::InitGoogleTest(&argc, argv);
  454. return RUN_ALL_TESTS();
  455. }