| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498 |
- // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
- // This source code is licensed under both the GPLv2 (found in the
- // COPYING file in the root directory) and Apache 2.0 License
- // (found in the LICENSE.Apache file in the root directory).
- #include <algorithm>
- #include <array>
- #include <map>
- #include <string>
- #include "db/blob_index.h"
- #include "db/column_family.h"
- #include "db/db_impl/db_impl.h"
- #include "db/flush_job.h"
- #include "db/version_set.h"
- #include "file/writable_file_writer.h"
- #include "rocksdb/cache.h"
- #include "rocksdb/write_buffer_manager.h"
- #include "table/mock_table.h"
- #include "test_util/testharness.h"
- #include "test_util/testutil.h"
- #include "util/string_util.h"
- namespace ROCKSDB_NAMESPACE {
- // TODO(icanadi) Mock out everything else:
- // 1. VersionSet
- // 2. Memtable
- class FlushJobTest : public testing::Test {
- public:
- FlushJobTest()
- : env_(Env::Default()),
- fs_(std::make_shared<LegacyFileSystemWrapper>(env_)),
- dbname_(test::PerThreadDBPath("flush_job_test")),
- options_(),
- db_options_(options_),
- column_family_names_({kDefaultColumnFamilyName, "foo", "bar"}),
- table_cache_(NewLRUCache(50000, 16)),
- write_buffer_manager_(db_options_.db_write_buffer_size),
- shutting_down_(false),
- mock_table_factory_(new mock::MockTableFactory()) {
- EXPECT_OK(env_->CreateDirIfMissing(dbname_));
- db_options_.db_paths.emplace_back(dbname_,
- std::numeric_limits<uint64_t>::max());
- db_options_.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
- // TODO(icanadi) Remove this once we mock out VersionSet
- NewDB();
- std::vector<ColumnFamilyDescriptor> column_families;
- cf_options_.table_factory = mock_table_factory_;
- for (const auto& cf_name : column_family_names_) {
- column_families.emplace_back(cf_name, cf_options_);
- }
- db_options_.env = env_;
- db_options_.fs = fs_;
- versions_.reset(new VersionSet(dbname_, &db_options_, env_options_,
- table_cache_.get(), &write_buffer_manager_,
- &write_controller_,
- /*block_cache_tracer=*/nullptr));
- EXPECT_OK(versions_->Recover(column_families, false));
- }
- void NewDB() {
- SetIdentityFile(env_, dbname_);
- VersionEdit new_db;
- if (db_options_.write_dbid_to_manifest) {
- DBImpl* impl = new DBImpl(DBOptions(), dbname_);
- std::string db_id;
- impl->GetDbIdentityFromIdentityFile(&db_id);
- new_db.SetDBId(db_id);
- }
- new_db.SetLogNumber(0);
- new_db.SetNextFile(2);
- new_db.SetLastSequence(0);
- autovector<VersionEdit> new_cfs;
- SequenceNumber last_seq = 1;
- uint32_t cf_id = 1;
- for (size_t i = 1; i != column_family_names_.size(); ++i) {
- VersionEdit new_cf;
- new_cf.AddColumnFamily(column_family_names_[i]);
- new_cf.SetColumnFamily(cf_id++);
- new_cf.SetLogNumber(0);
- new_cf.SetNextFile(2);
- new_cf.SetLastSequence(last_seq++);
- new_cfs.emplace_back(new_cf);
- }
- const std::string manifest = DescriptorFileName(dbname_, 1);
- std::unique_ptr<WritableFile> file;
- Status s = env_->NewWritableFile(
- manifest, &file, env_->OptimizeForManifestWrite(env_options_));
- ASSERT_OK(s);
- std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
- NewLegacyWritableFileWrapper(std::move(file)), manifest, EnvOptions()));
- {
- log::Writer log(std::move(file_writer), 0, false);
- std::string record;
- new_db.EncodeTo(&record);
- s = log.AddRecord(record);
- for (const auto& e : new_cfs) {
- record.clear();
- e.EncodeTo(&record);
- s = log.AddRecord(record);
- ASSERT_OK(s);
- }
- }
- ASSERT_OK(s);
- // Make "CURRENT" file that points to the new manifest file.
- s = SetCurrentFile(env_, dbname_, 1, nullptr);
- }
- Env* env_;
- std::shared_ptr<FileSystem> fs_;
- std::string dbname_;
- EnvOptions env_options_;
- Options options_;
- ImmutableDBOptions db_options_;
- const std::vector<std::string> column_family_names_;
- std::shared_ptr<Cache> table_cache_;
- WriteController write_controller_;
- WriteBufferManager write_buffer_manager_;
- ColumnFamilyOptions cf_options_;
- std::unique_ptr<VersionSet> versions_;
- InstrumentedMutex mutex_;
- std::atomic<bool> shutting_down_;
- std::shared_ptr<mock::MockTableFactory> mock_table_factory_;
- };
- TEST_F(FlushJobTest, Empty) {
- JobContext job_context(0);
- auto cfd = versions_->GetColumnFamilySet()->GetDefault();
- EventLogger event_logger(db_options_.info_log.get());
- SnapshotChecker* snapshot_checker = nullptr; // not relavant
- FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(),
- db_options_, *cfd->GetLatestMutableCFOptions(),
- nullptr /* memtable_id */, env_options_, versions_.get(),
- &mutex_, &shutting_down_, {}, kMaxSequenceNumber,
- snapshot_checker, &job_context, nullptr, nullptr, nullptr,
- kNoCompression, nullptr, &event_logger, false,
- true /* sync_output_directory */,
- true /* write_manifest */, Env::Priority::USER);
- {
- InstrumentedMutexLock l(&mutex_);
- flush_job.PickMemTable();
- ASSERT_OK(flush_job.Run());
- }
- job_context.Clean();
- }
- TEST_F(FlushJobTest, NonEmpty) {
- JobContext job_context(0);
- auto cfd = versions_->GetColumnFamilySet()->GetDefault();
- auto new_mem = cfd->ConstructNewMemtable(*cfd->GetLatestMutableCFOptions(),
- kMaxSequenceNumber);
- new_mem->Ref();
- auto inserted_keys = mock::MakeMockFile();
- // Test data:
- // seqno [ 1, 2 ... 8998, 8999, 9000, 9001, 9002 ... 9999 ]
- // key [ 1001, 1002 ... 9998, 9999, 0, 1, 2 ... 999 ]
- // range-delete "9995" -> "9999" at seqno 10000
- // blob references with seqnos 10001..10006
- for (int i = 1; i < 10000; ++i) {
- std::string key(ToString((i + 1000) % 10000));
- std::string value("value" + key);
- new_mem->Add(SequenceNumber(i), kTypeValue, key, value);
- if ((i + 1000) % 10000 < 9995) {
- InternalKey internal_key(key, SequenceNumber(i), kTypeValue);
- inserted_keys.insert({internal_key.Encode().ToString(), value});
- }
- }
- {
- new_mem->Add(SequenceNumber(10000), kTypeRangeDeletion, "9995", "9999a");
- InternalKey internal_key("9995", SequenceNumber(10000), kTypeRangeDeletion);
- inserted_keys.insert({internal_key.Encode().ToString(), "9999a"});
- }
- #ifndef ROCKSDB_LITE
- // Note: the first two blob references will not be considered when resolving
- // the oldest blob file referenced (the first one is inlined TTL, while the
- // second one is TTL and thus points to a TTL blob file).
- constexpr std::array<uint64_t, 6> blob_file_numbers{
- kInvalidBlobFileNumber, 5, 103, 17, 102, 101};
- for (size_t i = 0; i < blob_file_numbers.size(); ++i) {
- std::string key(ToString(i + 10001));
- std::string blob_index;
- if (i == 0) {
- BlobIndex::EncodeInlinedTTL(&blob_index, /* expiration */ 1234567890ULL,
- "foo");
- } else if (i == 1) {
- BlobIndex::EncodeBlobTTL(&blob_index, /* expiration */ 1234567890ULL,
- blob_file_numbers[i], /* offset */ i << 10,
- /* size */ i << 20, kNoCompression);
- } else {
- BlobIndex::EncodeBlob(&blob_index, blob_file_numbers[i],
- /* offset */ i << 10, /* size */ i << 20,
- kNoCompression);
- }
- const SequenceNumber seq(i + 10001);
- new_mem->Add(seq, kTypeBlobIndex, key, blob_index);
- InternalKey internal_key(key, seq, kTypeBlobIndex);
- inserted_keys.emplace_hint(inserted_keys.end(),
- internal_key.Encode().ToString(), blob_index);
- }
- #endif
- autovector<MemTable*> to_delete;
- cfd->imm()->Add(new_mem, &to_delete);
- for (auto& m : to_delete) {
- delete m;
- }
- EventLogger event_logger(db_options_.info_log.get());
- SnapshotChecker* snapshot_checker = nullptr; // not relavant
- FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(),
- db_options_, *cfd->GetLatestMutableCFOptions(),
- nullptr /* memtable_id */, env_options_, versions_.get(),
- &mutex_, &shutting_down_, {}, kMaxSequenceNumber,
- snapshot_checker, &job_context, nullptr, nullptr, nullptr,
- kNoCompression, db_options_.statistics.get(),
- &event_logger, true, true /* sync_output_directory */,
- true /* write_manifest */, Env::Priority::USER);
- HistogramData hist;
- FileMetaData file_meta;
- mutex_.Lock();
- flush_job.PickMemTable();
- ASSERT_OK(flush_job.Run(nullptr, &file_meta));
- mutex_.Unlock();
- db_options_.statistics->histogramData(FLUSH_TIME, &hist);
- ASSERT_GT(hist.average, 0.0);
- ASSERT_EQ(ToString(0), file_meta.smallest.user_key().ToString());
- ASSERT_EQ("9999a", file_meta.largest.user_key().ToString());
- ASSERT_EQ(1, file_meta.fd.smallest_seqno);
- #ifndef ROCKSDB_LITE
- ASSERT_EQ(10006, file_meta.fd.largest_seqno);
- ASSERT_EQ(17, file_meta.oldest_blob_file_number);
- #else
- ASSERT_EQ(10000, file_meta.fd.largest_seqno);
- #endif
- mock_table_factory_->AssertSingleFile(inserted_keys);
- job_context.Clean();
- }
- TEST_F(FlushJobTest, FlushMemTablesSingleColumnFamily) {
- const size_t num_mems = 2;
- const size_t num_mems_to_flush = 1;
- const size_t num_keys_per_table = 100;
- JobContext job_context(0);
- ColumnFamilyData* cfd = versions_->GetColumnFamilySet()->GetDefault();
- std::vector<uint64_t> memtable_ids;
- std::vector<MemTable*> new_mems;
- for (size_t i = 0; i != num_mems; ++i) {
- MemTable* mem = cfd->ConstructNewMemtable(*cfd->GetLatestMutableCFOptions(),
- kMaxSequenceNumber);
- mem->SetID(i);
- mem->Ref();
- new_mems.emplace_back(mem);
- memtable_ids.push_back(mem->GetID());
- for (size_t j = 0; j < num_keys_per_table; ++j) {
- std::string key(ToString(j + i * num_keys_per_table));
- std::string value("value" + key);
- mem->Add(SequenceNumber(j + i * num_keys_per_table), kTypeValue, key,
- value);
- }
- }
- autovector<MemTable*> to_delete;
- for (auto mem : new_mems) {
- cfd->imm()->Add(mem, &to_delete);
- }
- EventLogger event_logger(db_options_.info_log.get());
- SnapshotChecker* snapshot_checker = nullptr; // not relavant
- assert(memtable_ids.size() == num_mems);
- uint64_t smallest_memtable_id = memtable_ids.front();
- uint64_t flush_memtable_id = smallest_memtable_id + num_mems_to_flush - 1;
- FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(),
- db_options_, *cfd->GetLatestMutableCFOptions(),
- &flush_memtable_id, env_options_, versions_.get(), &mutex_,
- &shutting_down_, {}, kMaxSequenceNumber, snapshot_checker,
- &job_context, nullptr, nullptr, nullptr, kNoCompression,
- db_options_.statistics.get(), &event_logger, true,
- true /* sync_output_directory */,
- true /* write_manifest */, Env::Priority::USER);
- HistogramData hist;
- FileMetaData file_meta;
- mutex_.Lock();
- flush_job.PickMemTable();
- ASSERT_OK(flush_job.Run(nullptr /* prep_tracker */, &file_meta));
- mutex_.Unlock();
- db_options_.statistics->histogramData(FLUSH_TIME, &hist);
- ASSERT_GT(hist.average, 0.0);
- ASSERT_EQ(ToString(0), file_meta.smallest.user_key().ToString());
- ASSERT_EQ("99", file_meta.largest.user_key().ToString());
- ASSERT_EQ(0, file_meta.fd.smallest_seqno);
- ASSERT_EQ(SequenceNumber(num_mems_to_flush * num_keys_per_table - 1),
- file_meta.fd.largest_seqno);
- ASSERT_EQ(kInvalidBlobFileNumber, file_meta.oldest_blob_file_number);
- for (auto m : to_delete) {
- delete m;
- }
- to_delete.clear();
- job_context.Clean();
- }
- TEST_F(FlushJobTest, FlushMemtablesMultipleColumnFamilies) {
- autovector<ColumnFamilyData*> all_cfds;
- for (auto cfd : *versions_->GetColumnFamilySet()) {
- all_cfds.push_back(cfd);
- }
- const std::vector<size_t> num_memtables = {2, 1, 3};
- assert(num_memtables.size() == column_family_names_.size());
- const size_t num_keys_per_memtable = 1000;
- JobContext job_context(0);
- std::vector<uint64_t> memtable_ids;
- std::vector<SequenceNumber> smallest_seqs;
- std::vector<SequenceNumber> largest_seqs;
- autovector<MemTable*> to_delete;
- SequenceNumber curr_seqno = 0;
- size_t k = 0;
- for (auto cfd : all_cfds) {
- smallest_seqs.push_back(curr_seqno);
- for (size_t i = 0; i != num_memtables[k]; ++i) {
- MemTable* mem = cfd->ConstructNewMemtable(
- *cfd->GetLatestMutableCFOptions(), kMaxSequenceNumber);
- mem->SetID(i);
- mem->Ref();
- for (size_t j = 0; j != num_keys_per_memtable; ++j) {
- std::string key(ToString(j + i * num_keys_per_memtable));
- std::string value("value" + key);
- mem->Add(curr_seqno++, kTypeValue, key, value);
- }
- cfd->imm()->Add(mem, &to_delete);
- }
- largest_seqs.push_back(curr_seqno - 1);
- memtable_ids.push_back(num_memtables[k++] - 1);
- }
- EventLogger event_logger(db_options_.info_log.get());
- SnapshotChecker* snapshot_checker = nullptr; // not relevant
- std::vector<std::unique_ptr<FlushJob>> flush_jobs;
- k = 0;
- for (auto cfd : all_cfds) {
- std::vector<SequenceNumber> snapshot_seqs;
- flush_jobs.emplace_back(new FlushJob(
- dbname_, cfd, db_options_, *cfd->GetLatestMutableCFOptions(),
- &memtable_ids[k], env_options_, versions_.get(), &mutex_,
- &shutting_down_, snapshot_seqs, kMaxSequenceNumber, snapshot_checker,
- &job_context, nullptr, nullptr, nullptr, kNoCompression,
- db_options_.statistics.get(), &event_logger, true,
- false /* sync_output_directory */, false /* write_manifest */,
- Env::Priority::USER));
- k++;
- }
- HistogramData hist;
- std::vector<FileMetaData> file_metas;
- // Call reserve to avoid auto-resizing
- file_metas.reserve(flush_jobs.size());
- mutex_.Lock();
- for (auto& job : flush_jobs) {
- job->PickMemTable();
- }
- for (auto& job : flush_jobs) {
- FileMetaData meta;
- // Run will release and re-acquire mutex
- ASSERT_OK(job->Run(nullptr /**/, &meta));
- file_metas.emplace_back(meta);
- }
- autovector<FileMetaData*> file_meta_ptrs;
- for (auto& meta : file_metas) {
- file_meta_ptrs.push_back(&meta);
- }
- autovector<const autovector<MemTable*>*> mems_list;
- for (size_t i = 0; i != all_cfds.size(); ++i) {
- const auto& mems = flush_jobs[i]->GetMemTables();
- mems_list.push_back(&mems);
- }
- autovector<const MutableCFOptions*> mutable_cf_options_list;
- for (auto cfd : all_cfds) {
- mutable_cf_options_list.push_back(cfd->GetLatestMutableCFOptions());
- }
- Status s = InstallMemtableAtomicFlushResults(
- nullptr /* imm_lists */, all_cfds, mutable_cf_options_list, mems_list,
- versions_.get(), &mutex_, file_meta_ptrs, &job_context.memtables_to_free,
- nullptr /* db_directory */, nullptr /* log_buffer */);
- ASSERT_OK(s);
- mutex_.Unlock();
- db_options_.statistics->histogramData(FLUSH_TIME, &hist);
- ASSERT_GT(hist.average, 0.0);
- k = 0;
- for (const auto& file_meta : file_metas) {
- ASSERT_EQ(ToString(0), file_meta.smallest.user_key().ToString());
- ASSERT_EQ("999", file_meta.largest.user_key()
- .ToString()); // max key by bytewise comparator
- ASSERT_EQ(smallest_seqs[k], file_meta.fd.smallest_seqno);
- ASSERT_EQ(largest_seqs[k], file_meta.fd.largest_seqno);
- // Verify that imm is empty
- ASSERT_EQ(std::numeric_limits<uint64_t>::max(),
- all_cfds[k]->imm()->GetEarliestMemTableID());
- ASSERT_EQ(0, all_cfds[k]->imm()->GetLatestMemTableID());
- ++k;
- }
- for (auto m : to_delete) {
- delete m;
- }
- to_delete.clear();
- job_context.Clean();
- }
- TEST_F(FlushJobTest, Snapshots) {
- JobContext job_context(0);
- auto cfd = versions_->GetColumnFamilySet()->GetDefault();
- auto new_mem = cfd->ConstructNewMemtable(*cfd->GetLatestMutableCFOptions(),
- kMaxSequenceNumber);
- std::set<SequenceNumber> snapshots_set;
- int keys = 10000;
- int max_inserts_per_keys = 8;
- Random rnd(301);
- for (int i = 0; i < keys / 2; ++i) {
- snapshots_set.insert(rnd.Uniform(keys * (max_inserts_per_keys / 2)) + 1);
- }
- // set has already removed the duplicate snapshots
- std::vector<SequenceNumber> snapshots(snapshots_set.begin(),
- snapshots_set.end());
- new_mem->Ref();
- SequenceNumber current_seqno = 0;
- auto inserted_keys = mock::MakeMockFile();
- for (int i = 1; i < keys; ++i) {
- std::string key(ToString(i));
- int insertions = rnd.Uniform(max_inserts_per_keys);
- for (int j = 0; j < insertions; ++j) {
- std::string value(test::RandomHumanReadableString(&rnd, 10));
- auto seqno = ++current_seqno;
- new_mem->Add(SequenceNumber(seqno), kTypeValue, key, value);
- // a key is visible only if:
- // 1. it's the last one written (j == insertions - 1)
- // 2. there's a snapshot pointing at it
- bool visible = (j == insertions - 1) ||
- (snapshots_set.find(seqno) != snapshots_set.end());
- if (visible) {
- InternalKey internal_key(key, seqno, kTypeValue);
- inserted_keys.insert({internal_key.Encode().ToString(), value});
- }
- }
- }
- autovector<MemTable*> to_delete;
- cfd->imm()->Add(new_mem, &to_delete);
- for (auto& m : to_delete) {
- delete m;
- }
- EventLogger event_logger(db_options_.info_log.get());
- SnapshotChecker* snapshot_checker = nullptr; // not relavant
- FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(),
- db_options_, *cfd->GetLatestMutableCFOptions(),
- nullptr /* memtable_id */, env_options_, versions_.get(),
- &mutex_, &shutting_down_, snapshots, kMaxSequenceNumber,
- snapshot_checker, &job_context, nullptr, nullptr, nullptr,
- kNoCompression, db_options_.statistics.get(),
- &event_logger, true, true /* sync_output_directory */,
- true /* write_manifest */, Env::Priority::USER);
- mutex_.Lock();
- flush_job.PickMemTable();
- ASSERT_OK(flush_job.Run());
- mutex_.Unlock();
- mock_table_factory_->AssertSingleFile(inserted_keys);
- HistogramData hist;
- db_options_.statistics->histogramData(FLUSH_TIME, &hist);
- ASSERT_GT(hist.average, 0.0);
- job_context.Clean();
- }
- } // namespace ROCKSDB_NAMESPACE
- int main(int argc, char** argv) {
- ::testing::InitGoogleTest(&argc, argv);
- return RUN_ALL_TESTS();
- }
|