| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631 |
- // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
- // This source code is licensed under both the GPLv2 (found in the
- // COPYING file in the root directory) and Apache 2.0 License
- // (found in the LICENSE.Apache file in the root directory).
- //
- // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
- // Use of this source code is governed by a BSD-style license that can be
- // found in the LICENSE file. See the AUTHORS file for names of contributors.
- #ifdef GFLAGS
- #include "db_stress_tool/db_stress_common.h"
- namespace ROCKSDB_NAMESPACE {
- class NonBatchedOpsStressTest : public StressTest {
- public:
- NonBatchedOpsStressTest() {}
- virtual ~NonBatchedOpsStressTest() {}
- void VerifyDb(ThreadState* thread) const override {
- ReadOptions options(FLAGS_verify_checksum, true);
- auto shared = thread->shared;
- const int64_t max_key = shared->GetMaxKey();
- const int64_t keys_per_thread = max_key / shared->GetNumThreads();
- int64_t start = keys_per_thread * thread->tid;
- int64_t end = start + keys_per_thread;
- uint64_t prefix_to_use =
- (FLAGS_prefix_size < 0) ? 1 : static_cast<size_t>(FLAGS_prefix_size);
- if (thread->tid == shared->GetNumThreads() - 1) {
- end = max_key;
- }
- for (size_t cf = 0; cf < column_families_.size(); ++cf) {
- if (thread->shared->HasVerificationFailedYet()) {
- break;
- }
- if (!thread->rand.OneIn(2)) {
- // Use iterator to verify this range
- Slice prefix;
- std::string seek_key = Key(start);
- std::unique_ptr<Iterator> iter(
- db_->NewIterator(options, column_families_[cf]));
- iter->Seek(seek_key);
- prefix = Slice(seek_key.data(), prefix_to_use);
- for (auto i = start; i < end; i++) {
- if (thread->shared->HasVerificationFailedYet()) {
- break;
- }
- std::string from_db;
- std::string keystr = Key(i);
- Slice k = keystr;
- Slice pfx = Slice(keystr.data(), prefix_to_use);
- // Reseek when the prefix changes
- if (prefix_to_use > 0 && prefix.compare(pfx) != 0) {
- iter->Seek(k);
- seek_key = keystr;
- prefix = Slice(seek_key.data(), prefix_to_use);
- }
- Status s = iter->status();
- if (iter->Valid()) {
- Slice iter_key = iter->key();
- if (iter->key().compare(k) > 0) {
- s = Status::NotFound(Slice());
- } else if (iter->key().compare(k) == 0) {
- from_db = iter->value().ToString();
- iter->Next();
- } else if (iter_key.compare(k) < 0) {
- VerificationAbort(shared, "An out of range key was found",
- static_cast<int>(cf), i);
- }
- } else {
- // The iterator found no value for the key in question, so do not
- // move to the next item in the iterator
- s = Status::NotFound();
- }
- VerifyValue(static_cast<int>(cf), i, options, shared, from_db, s,
- true);
- if (from_db.length()) {
- PrintKeyValue(static_cast<int>(cf), static_cast<uint32_t>(i),
- from_db.data(), from_db.length());
- }
- }
- } else {
- // Use Get to verify this range
- for (auto i = start; i < end; i++) {
- if (thread->shared->HasVerificationFailedYet()) {
- break;
- }
- std::string from_db;
- std::string keystr = Key(i);
- Slice k = keystr;
- Status s = db_->Get(options, column_families_[cf], k, &from_db);
- VerifyValue(static_cast<int>(cf), i, options, shared, from_db, s,
- true);
- if (from_db.length()) {
- PrintKeyValue(static_cast<int>(cf), static_cast<uint32_t>(i),
- from_db.data(), from_db.length());
- }
- }
- }
- }
- }
- void MaybeClearOneColumnFamily(ThreadState* thread) override {
- if (FLAGS_column_families > 1) {
- if (thread->rand.OneInOpt(FLAGS_clear_column_family_one_in)) {
- // drop column family and then create it again (can't drop default)
- int cf = thread->rand.Next() % (FLAGS_column_families - 1) + 1;
- std::string new_name = ToString(new_column_family_name_.fetch_add(1));
- {
- MutexLock l(thread->shared->GetMutex());
- fprintf(
- stdout,
- "[CF %d] Dropping and recreating column family. new name: %s\n",
- cf, new_name.c_str());
- }
- thread->shared->LockColumnFamily(cf);
- Status s = db_->DropColumnFamily(column_families_[cf]);
- delete column_families_[cf];
- if (!s.ok()) {
- fprintf(stderr, "dropping column family error: %s\n",
- s.ToString().c_str());
- std::terminate();
- }
- s = db_->CreateColumnFamily(ColumnFamilyOptions(options_), new_name,
- &column_families_[cf]);
- column_family_names_[cf] = new_name;
- thread->shared->ClearColumnFamily(cf);
- if (!s.ok()) {
- fprintf(stderr, "creating column family error: %s\n",
- s.ToString().c_str());
- std::terminate();
- }
- thread->shared->UnlockColumnFamily(cf);
- }
- }
- }
- bool ShouldAcquireMutexOnKey() const override { return true; }
- Status TestGet(ThreadState* thread, const ReadOptions& read_opts,
- const std::vector<int>& rand_column_families,
- const std::vector<int64_t>& rand_keys) override {
- auto cfh = column_families_[rand_column_families[0]];
- std::string key_str = Key(rand_keys[0]);
- Slice key = key_str;
- std::string from_db;
- Status s = db_->Get(read_opts, cfh, key, &from_db);
- if (s.ok()) {
- // found case
- thread->stats.AddGets(1, 1);
- } else if (s.IsNotFound()) {
- // not found case
- thread->stats.AddGets(1, 0);
- } else {
- // errors case
- fprintf(stderr, "TestGet error: %s\n", s.ToString().c_str());
- thread->stats.AddErrors(1);
- }
- return s;
- }
- std::vector<Status> TestMultiGet(
- ThreadState* thread, const ReadOptions& read_opts,
- const std::vector<int>& rand_column_families,
- const std::vector<int64_t>& rand_keys) override {
- size_t num_keys = rand_keys.size();
- std::vector<std::string> key_str;
- std::vector<Slice> keys;
- key_str.reserve(num_keys);
- keys.reserve(num_keys);
- std::vector<PinnableSlice> values(num_keys);
- std::vector<Status> statuses(num_keys);
- ColumnFamilyHandle* cfh = column_families_[rand_column_families[0]];
- // To appease clang analyzer
- const bool use_txn = FLAGS_use_txn;
- // Create a transaction in order to write some data. The purpose is to
- // exercise WriteBatchWithIndex::MultiGetFromBatchAndDB. The transaction
- // will be rolled back once MultiGet returns.
- #ifndef ROCKSDB_LITE
- Transaction* txn = nullptr;
- if (use_txn) {
- WriteOptions wo;
- Status s = NewTxn(wo, &txn);
- if (!s.ok()) {
- fprintf(stderr, "NewTxn: %s\n", s.ToString().c_str());
- std::terminate();
- }
- }
- #endif
- for (size_t i = 0; i < num_keys; ++i) {
- key_str.emplace_back(Key(rand_keys[i]));
- keys.emplace_back(key_str.back());
- #ifndef ROCKSDB_LITE
- if (use_txn) {
- // With a 1 in 10 probability, insert the just added key in the batch
- // into the transaction. This will create an overlap with the MultiGet
- // keys and exercise some corner cases in the code
- if (thread->rand.OneIn(10)) {
- int op = thread->rand.Uniform(2);
- Status s;
- switch (op) {
- case 0:
- case 1: {
- uint32_t value_base =
- thread->rand.Next() % thread->shared->UNKNOWN_SENTINEL;
- char value[100];
- size_t sz = GenerateValue(value_base, value, sizeof(value));
- Slice v(value, sz);
- if (op == 0) {
- s = txn->Put(cfh, keys.back(), v);
- } else {
- s = txn->Merge(cfh, keys.back(), v);
- }
- break;
- }
- case 2:
- s = txn->Delete(cfh, keys.back());
- break;
- default:
- assert(false);
- }
- if (!s.ok()) {
- fprintf(stderr, "Transaction put: %s\n", s.ToString().c_str());
- std::terminate();
- }
- }
- }
- #endif
- }
- if (!use_txn) {
- db_->MultiGet(read_opts, cfh, num_keys, keys.data(), values.data(),
- statuses.data());
- } else {
- #ifndef ROCKSDB_LITE
- txn->MultiGet(read_opts, cfh, num_keys, keys.data(), values.data(),
- statuses.data());
- RollbackTxn(txn);
- #endif
- }
- for (const auto& s : statuses) {
- if (s.ok()) {
- // found case
- thread->stats.AddGets(1, 1);
- } else if (s.IsNotFound()) {
- // not found case
- thread->stats.AddGets(1, 0);
- } else if (s.IsMergeInProgress() && use_txn) {
- // With txn this is sometimes expected.
- thread->stats.AddGets(1, 1);
- } else {
- // errors case
- fprintf(stderr, "MultiGet error: %s\n", s.ToString().c_str());
- thread->stats.AddErrors(1);
- }
- }
- return statuses;
- }
- Status TestPrefixScan(ThreadState* thread, const ReadOptions& read_opts,
- const std::vector<int>& rand_column_families,
- const std::vector<int64_t>& rand_keys) override {
- auto cfh = column_families_[rand_column_families[0]];
- std::string key_str = Key(rand_keys[0]);
- Slice key = key_str;
- Slice prefix = Slice(key.data(), FLAGS_prefix_size);
- std::string upper_bound;
- Slice ub_slice;
- ReadOptions ro_copy = read_opts;
- // Get the next prefix first and then see if we want to set upper bound.
- // We'll use the next prefix in an assertion later on
- if (GetNextPrefix(prefix, &upper_bound) && thread->rand.OneIn(2)) {
- // For half of the time, set the upper bound to the next prefix
- ub_slice = Slice(upper_bound);
- ro_copy.iterate_upper_bound = &ub_slice;
- }
- Iterator* iter = db_->NewIterator(ro_copy, cfh);
- unsigned long count = 0;
- for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix);
- iter->Next()) {
- ++count;
- }
- assert(count <= GetPrefixKeyCount(prefix.ToString(), upper_bound));
- Status s = iter->status();
- if (iter->status().ok()) {
- thread->stats.AddPrefixes(1, count);
- } else {
- fprintf(stderr, "TestPrefixScan error: %s\n", s.ToString().c_str());
- thread->stats.AddErrors(1);
- }
- delete iter;
- return s;
- }
- Status TestPut(ThreadState* thread, WriteOptions& write_opts,
- const ReadOptions& read_opts,
- const std::vector<int>& rand_column_families,
- const std::vector<int64_t>& rand_keys, char (&value)[100],
- std::unique_ptr<MutexLock>& lock) override {
- auto shared = thread->shared;
- int64_t max_key = shared->GetMaxKey();
- int64_t rand_key = rand_keys[0];
- int rand_column_family = rand_column_families[0];
- while (!shared->AllowsOverwrite(rand_key) &&
- (FLAGS_use_merge || shared->Exists(rand_column_family, rand_key))) {
- lock.reset();
- rand_key = thread->rand.Next() % max_key;
- rand_column_family = thread->rand.Next() % FLAGS_column_families;
- lock.reset(
- new MutexLock(shared->GetMutexForKey(rand_column_family, rand_key)));
- }
- std::string key_str = Key(rand_key);
- Slice key = key_str;
- ColumnFamilyHandle* cfh = column_families_[rand_column_family];
- if (FLAGS_verify_before_write) {
- std::string key_str2 = Key(rand_key);
- Slice k = key_str2;
- std::string from_db;
- Status s = db_->Get(read_opts, cfh, k, &from_db);
- if (!VerifyValue(rand_column_family, rand_key, read_opts, shared, from_db,
- s, true)) {
- return s;
- }
- }
- uint32_t value_base = thread->rand.Next() % shared->UNKNOWN_SENTINEL;
- size_t sz = GenerateValue(value_base, value, sizeof(value));
- Slice v(value, sz);
- shared->Put(rand_column_family, rand_key, value_base, true /* pending */);
- Status s;
- if (FLAGS_use_merge) {
- if (!FLAGS_use_txn) {
- s = db_->Merge(write_opts, cfh, key, v);
- } else {
- #ifndef ROCKSDB_LITE
- Transaction* txn;
- s = NewTxn(write_opts, &txn);
- if (s.ok()) {
- s = txn->Merge(cfh, key, v);
- if (s.ok()) {
- s = CommitTxn(txn);
- }
- }
- #endif
- }
- } else {
- if (!FLAGS_use_txn) {
- s = db_->Put(write_opts, cfh, key, v);
- } else {
- #ifndef ROCKSDB_LITE
- Transaction* txn;
- s = NewTxn(write_opts, &txn);
- if (s.ok()) {
- s = txn->Put(cfh, key, v);
- if (s.ok()) {
- s = CommitTxn(txn);
- }
- }
- #endif
- }
- }
- shared->Put(rand_column_family, rand_key, value_base, false /* pending */);
- if (!s.ok()) {
- fprintf(stderr, "put or merge error: %s\n", s.ToString().c_str());
- std::terminate();
- }
- thread->stats.AddBytesForWrites(1, sz);
- PrintKeyValue(rand_column_family, static_cast<uint32_t>(rand_key), value,
- sz);
- return s;
- }
- Status TestDelete(ThreadState* thread, WriteOptions& write_opts,
- const std::vector<int>& rand_column_families,
- const std::vector<int64_t>& rand_keys,
- std::unique_ptr<MutexLock>& lock) override {
- int64_t rand_key = rand_keys[0];
- int rand_column_family = rand_column_families[0];
- auto shared = thread->shared;
- int64_t max_key = shared->GetMaxKey();
- // OPERATION delete
- // If the chosen key does not allow overwrite and it does not exist,
- // choose another key.
- while (!shared->AllowsOverwrite(rand_key) &&
- !shared->Exists(rand_column_family, rand_key)) {
- lock.reset();
- rand_key = thread->rand.Next() % max_key;
- rand_column_family = thread->rand.Next() % FLAGS_column_families;
- lock.reset(
- new MutexLock(shared->GetMutexForKey(rand_column_family, rand_key)));
- }
- std::string key_str = Key(rand_key);
- Slice key = key_str;
- auto cfh = column_families_[rand_column_family];
- // Use delete if the key may be overwritten and a single deletion
- // otherwise.
- Status s;
- if (shared->AllowsOverwrite(rand_key)) {
- shared->Delete(rand_column_family, rand_key, true /* pending */);
- if (!FLAGS_use_txn) {
- s = db_->Delete(write_opts, cfh, key);
- } else {
- #ifndef ROCKSDB_LITE
- Transaction* txn;
- s = NewTxn(write_opts, &txn);
- if (s.ok()) {
- s = txn->Delete(cfh, key);
- if (s.ok()) {
- s = CommitTxn(txn);
- }
- }
- #endif
- }
- shared->Delete(rand_column_family, rand_key, false /* pending */);
- thread->stats.AddDeletes(1);
- if (!s.ok()) {
- fprintf(stderr, "delete error: %s\n", s.ToString().c_str());
- std::terminate();
- }
- } else {
- shared->SingleDelete(rand_column_family, rand_key, true /* pending */);
- if (!FLAGS_use_txn) {
- s = db_->SingleDelete(write_opts, cfh, key);
- } else {
- #ifndef ROCKSDB_LITE
- Transaction* txn;
- s = NewTxn(write_opts, &txn);
- if (s.ok()) {
- s = txn->SingleDelete(cfh, key);
- if (s.ok()) {
- s = CommitTxn(txn);
- }
- }
- #endif
- }
- shared->SingleDelete(rand_column_family, rand_key, false /* pending */);
- thread->stats.AddSingleDeletes(1);
- if (!s.ok()) {
- fprintf(stderr, "single delete error: %s\n", s.ToString().c_str());
- std::terminate();
- }
- }
- return s;
- }
- Status TestDeleteRange(ThreadState* thread, WriteOptions& write_opts,
- const std::vector<int>& rand_column_families,
- const std::vector<int64_t>& rand_keys,
- std::unique_ptr<MutexLock>& lock) override {
- // OPERATION delete range
- std::vector<std::unique_ptr<MutexLock>> range_locks;
- // delete range does not respect disallowed overwrites. the keys for
- // which overwrites are disallowed are randomly distributed so it
- // could be expensive to find a range where each key allows
- // overwrites.
- int64_t rand_key = rand_keys[0];
- int rand_column_family = rand_column_families[0];
- auto shared = thread->shared;
- int64_t max_key = shared->GetMaxKey();
- if (rand_key > max_key - FLAGS_range_deletion_width) {
- lock.reset();
- rand_key =
- thread->rand.Next() % (max_key - FLAGS_range_deletion_width + 1);
- range_locks.emplace_back(
- new MutexLock(shared->GetMutexForKey(rand_column_family, rand_key)));
- } else {
- range_locks.emplace_back(std::move(lock));
- }
- for (int j = 1; j < FLAGS_range_deletion_width; ++j) {
- if (((rand_key + j) & ((1 << FLAGS_log2_keys_per_lock) - 1)) == 0) {
- range_locks.emplace_back(new MutexLock(
- shared->GetMutexForKey(rand_column_family, rand_key + j)));
- }
- }
- shared->DeleteRange(rand_column_family, rand_key,
- rand_key + FLAGS_range_deletion_width,
- true /* pending */);
- std::string keystr = Key(rand_key);
- Slice key = keystr;
- auto cfh = column_families_[rand_column_family];
- std::string end_keystr = Key(rand_key + FLAGS_range_deletion_width);
- Slice end_key = end_keystr;
- Status s = db_->DeleteRange(write_opts, cfh, key, end_key);
- if (!s.ok()) {
- fprintf(stderr, "delete range error: %s\n", s.ToString().c_str());
- std::terminate();
- }
- int covered = shared->DeleteRange(rand_column_family, rand_key,
- rand_key + FLAGS_range_deletion_width,
- false /* pending */);
- thread->stats.AddRangeDeletions(1);
- thread->stats.AddCoveredByRangeDeletions(covered);
- return s;
- }
- #ifdef ROCKSDB_LITE
- void TestIngestExternalFile(
- ThreadState* /* thread */,
- const std::vector<int>& /* rand_column_families */,
- const std::vector<int64_t>& /* rand_keys */,
- std::unique_ptr<MutexLock>& /* lock */) override {
- assert(false);
- fprintf(stderr,
- "RocksDB lite does not support "
- "TestIngestExternalFile\n");
- std::terminate();
- }
- #else
- void TestIngestExternalFile(ThreadState* thread,
- const std::vector<int>& rand_column_families,
- const std::vector<int64_t>& rand_keys,
- std::unique_ptr<MutexLock>& lock) override {
- const std::string sst_filename =
- FLAGS_db + "/." + ToString(thread->tid) + ".sst";
- Status s;
- if (db_stress_env->FileExists(sst_filename).ok()) {
- // Maybe we terminated abnormally before, so cleanup to give this file
- // ingestion a clean slate
- s = db_stress_env->DeleteFile(sst_filename);
- }
- SstFileWriter sst_file_writer(EnvOptions(options_), options_);
- if (s.ok()) {
- s = sst_file_writer.Open(sst_filename);
- }
- int64_t key_base = rand_keys[0];
- int column_family = rand_column_families[0];
- std::vector<std::unique_ptr<MutexLock>> range_locks;
- std::vector<uint32_t> values;
- SharedState* shared = thread->shared;
- // Grab locks, set pending state on expected values, and add keys
- for (int64_t key = key_base;
- s.ok() && key < std::min(key_base + FLAGS_ingest_external_file_width,
- shared->GetMaxKey());
- ++key) {
- if (key == key_base) {
- range_locks.emplace_back(std::move(lock));
- } else if ((key & ((1 << FLAGS_log2_keys_per_lock) - 1)) == 0) {
- range_locks.emplace_back(
- new MutexLock(shared->GetMutexForKey(column_family, key)));
- }
- uint32_t value_base = thread->rand.Next() % shared->UNKNOWN_SENTINEL;
- values.push_back(value_base);
- shared->Put(column_family, key, value_base, true /* pending */);
- char value[100];
- size_t value_len = GenerateValue(value_base, value, sizeof(value));
- auto key_str = Key(key);
- s = sst_file_writer.Put(Slice(key_str), Slice(value, value_len));
- }
- if (s.ok()) {
- s = sst_file_writer.Finish();
- }
- if (s.ok()) {
- s = db_->IngestExternalFile(column_families_[column_family],
- {sst_filename}, IngestExternalFileOptions());
- }
- if (!s.ok()) {
- fprintf(stderr, "file ingestion error: %s\n", s.ToString().c_str());
- std::terminate();
- }
- int64_t key = key_base;
- for (int32_t value : values) {
- shared->Put(column_family, key, value, false /* pending */);
- ++key;
- }
- }
- #endif // ROCKSDB_LITE
- bool VerifyValue(int cf, int64_t key, const ReadOptions& /*opts*/,
- SharedState* shared, const std::string& value_from_db,
- const Status& s, bool strict = false) const {
- if (shared->HasVerificationFailedYet()) {
- return false;
- }
- // compare value_from_db with the value in the shared state
- char value[kValueMaxLen];
- uint32_t value_base = shared->Get(cf, key);
- if (value_base == SharedState::UNKNOWN_SENTINEL) {
- return true;
- }
- if (value_base == SharedState::DELETION_SENTINEL && !strict) {
- return true;
- }
- if (s.ok()) {
- if (value_base == SharedState::DELETION_SENTINEL) {
- VerificationAbort(shared, "Unexpected value found", cf, key);
- return false;
- }
- size_t sz = GenerateValue(value_base, value, sizeof(value));
- if (value_from_db.length() != sz) {
- VerificationAbort(shared, "Length of value read is not equal", cf, key);
- return false;
- }
- if (memcmp(value_from_db.data(), value, sz) != 0) {
- VerificationAbort(shared, "Contents of value read don't match", cf,
- key);
- return false;
- }
- } else {
- if (value_base != SharedState::DELETION_SENTINEL) {
- VerificationAbort(shared, "Value not found: " + s.ToString(), cf, key);
- return false;
- }
- }
- return true;
- }
- };
- StressTest* CreateNonBatchedOpsStressTest() {
- return new NonBatchedOpsStressTest();
- }
- } // namespace ROCKSDB_NAMESPACE
- #endif // GFLAGS
|