| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156 |
- // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
- // This source code is licensed under both the GPLv2 (found in the
- // COPYING file in the root directory) and Apache 2.0 License
- // (found in the LICENSE.Apache file in the root directory).
- //
- // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
- // Use of this source code is governed by a BSD-style license that can be
- // found in the LICENSE file. See the AUTHORS file for names of contributors.
- #ifdef GFLAGS
- #include "db_stress_tool/db_stress_common.h"
- #include "file/file_util.h"
- namespace ROCKSDB_NAMESPACE {
- class CfConsistencyStressTest : public StressTest {
- public:
- CfConsistencyStressTest() : batch_id_(0) {}
- ~CfConsistencyStressTest() override = default;
- bool IsStateTracked() const override { return false; }
- Status TestPut(ThreadState* thread, WriteOptions& write_opts,
- const ReadOptions& /* read_opts */,
- const std::vector<int>& rand_column_families,
- const std::vector<int64_t>& rand_keys,
- char (&value)[100]) override {
- assert(!rand_column_families.empty());
- assert(!rand_keys.empty());
- const std::string k = Key(rand_keys[0]);
- const uint32_t value_base = batch_id_.fetch_add(1);
- const size_t sz = GenerateValue(value_base, value, sizeof(value));
- const Slice v(value, sz);
- WriteBatch batch;
- Status status;
- if (FLAGS_use_attribute_group && FLAGS_use_put_entity_one_in > 0 &&
- (value_base % FLAGS_use_put_entity_one_in) == 0) {
- std::vector<ColumnFamilyHandle*> cfhs;
- cfhs.reserve(rand_column_families.size());
- for (auto cf : rand_column_families) {
- cfhs.push_back(column_families_[cf]);
- }
- status = batch.PutEntity(k, GenerateAttributeGroups(cfhs, value_base, v));
- } else {
- for (auto cf : rand_column_families) {
- ColumnFamilyHandle* const cfh = column_families_[cf];
- assert(cfh);
- if (FLAGS_use_put_entity_one_in > 0 &&
- (value_base % FLAGS_use_put_entity_one_in) == 0) {
- status = batch.PutEntity(cfh, k, GenerateWideColumns(value_base, v));
- } else if (FLAGS_use_timed_put_one_in > 0 &&
- ((value_base + kLargePrimeForCommonFactorSkew) %
- FLAGS_use_timed_put_one_in) == 0) {
- uint64_t write_unix_time = GetWriteUnixTime(thread);
- status = batch.TimedPut(cfh, k, v, write_unix_time);
- } else if (FLAGS_use_merge) {
- status = batch.Merge(cfh, k, v);
- } else {
- status = batch.Put(cfh, k, v);
- }
- if (!status.ok()) {
- break;
- }
- }
- }
- if (status.ok()) {
- status = db_->Write(write_opts, &batch);
- }
- if (status.ok()) {
- auto num = static_cast<long>(rand_column_families.size());
- thread->stats.AddBytesForWrites(num, (sz + 1) * num);
- } else if (!IsErrorInjectedAndRetryable(status)) {
- fprintf(stderr, "multi put or merge error: %s\n",
- status.ToString().c_str());
- thread->stats.AddErrors(1);
- }
- return status;
- }
- Status TestDelete(ThreadState* thread, WriteOptions& write_opts,
- const std::vector<int>& rand_column_families,
- const std::vector<int64_t>& rand_keys) override {
- std::string key_str = Key(rand_keys[0]);
- Slice key = key_str;
- WriteBatch batch;
- for (auto cf : rand_column_families) {
- ColumnFamilyHandle* cfh = column_families_[cf];
- batch.Delete(cfh, key);
- }
- Status s = db_->Write(write_opts, &batch);
- if (s.ok()) {
- thread->stats.AddDeletes(static_cast<long>(rand_column_families.size()));
- } else if (!IsErrorInjectedAndRetryable(s)) {
- fprintf(stderr, "multidel error: %s\n", s.ToString().c_str());
- thread->stats.AddErrors(1);
- }
- return s;
- }
- Status TestDeleteRange(ThreadState* thread, WriteOptions& write_opts,
- const std::vector<int>& rand_column_families,
- const std::vector<int64_t>& rand_keys) override {
- int64_t rand_key = rand_keys[0];
- auto shared = thread->shared;
- int64_t max_key = shared->GetMaxKey();
- if (rand_key > max_key - FLAGS_range_deletion_width) {
- rand_key =
- thread->rand.Next() % (max_key - FLAGS_range_deletion_width + 1);
- }
- std::string key_str = Key(rand_key);
- Slice key = key_str;
- std::string end_key_str = Key(rand_key + FLAGS_range_deletion_width);
- Slice end_key = end_key_str;
- WriteBatch batch;
- for (auto cf : rand_column_families) {
- ColumnFamilyHandle* cfh = column_families_[rand_column_families[cf]];
- batch.DeleteRange(cfh, key, end_key);
- }
- Status s = db_->Write(write_opts, &batch);
- if (s.ok()) {
- thread->stats.AddRangeDeletions(
- static_cast<long>(rand_column_families.size()));
- } else if (!IsErrorInjectedAndRetryable(s)) {
- fprintf(stderr, "multi del range error: %s\n", s.ToString().c_str());
- thread->stats.AddErrors(1);
- }
- return s;
- }
- void TestIngestExternalFile(
- ThreadState* /* thread */,
- const std::vector<int>& /* rand_column_families */,
- const std::vector<int64_t>& /* rand_keys */) override {
- assert(false);
- fprintf(stderr,
- "CfConsistencyStressTest does not support TestIngestExternalFile "
- "because it's not possible to verify the result\n");
- std::terminate();
- }
- Status TestGet(ThreadState* thread, const ReadOptions& readoptions,
- const std::vector<int>& rand_column_families,
- const std::vector<int64_t>& rand_keys) override {
- std::string key_str = Key(rand_keys[0]);
- Slice key = key_str;
- Status s;
- bool is_consistent = true;
- if (thread->rand.OneIn(2)) {
- // 1/2 chance, does a random read from random CF
- auto cfh =
- column_families_[rand_column_families[thread->rand.Next() %
- rand_column_families.size()]];
- std::string from_db;
- s = db_->Get(readoptions, cfh, key, &from_db);
- } else {
- // 1/2 chance, comparing one key is the same across all CFs
- const Snapshot* snapshot = db_->GetSnapshot();
- ReadOptions readoptionscopy = readoptions;
- readoptionscopy.snapshot = snapshot;
- std::string value0;
- s = db_->Get(readoptionscopy, column_families_[rand_column_families[0]],
- key, &value0);
- // Temporarily disable error injection for verification
- if (fault_fs_guard) {
- fault_fs_guard->DisableThreadLocalErrorInjection(
- FaultInjectionIOType::kRead);
- fault_fs_guard->DisableThreadLocalErrorInjection(
- FaultInjectionIOType::kMetadataRead);
- }
- if (s.ok() || s.IsNotFound()) {
- bool found = s.ok();
- for (size_t i = 1; i < rand_column_families.size(); i++) {
- std::string value1;
- s = db_->Get(readoptionscopy,
- column_families_[rand_column_families[i]], key, &value1);
- if (!s.ok() && !s.IsNotFound()) {
- break;
- }
- if (!found && s.ok()) {
- fprintf(stderr, "Get() return different results with key %s\n",
- Slice(key_str).ToString(true).c_str());
- fprintf(stderr, "CF %s is not found\n",
- column_family_names_[0].c_str());
- fprintf(stderr, "CF %s returns value %s\n",
- column_family_names_[i].c_str(),
- Slice(value1).ToString(true).c_str());
- is_consistent = false;
- } else if (found && s.IsNotFound()) {
- fprintf(stderr, "Get() return different results with key %s\n",
- Slice(key_str).ToString(true).c_str());
- fprintf(stderr, "CF %s returns value %s\n",
- column_family_names_[0].c_str(),
- Slice(value0).ToString(true).c_str());
- fprintf(stderr, "CF %s is not found\n",
- column_family_names_[i].c_str());
- is_consistent = false;
- } else if (s.ok() && value0 != value1) {
- fprintf(stderr, "Get() return different results with key %s\n",
- Slice(key_str).ToString(true).c_str());
- fprintf(stderr, "CF %s returns value %s\n",
- column_family_names_[0].c_str(),
- Slice(value0).ToString(true).c_str());
- fprintf(stderr, "CF %s returns value %s\n",
- column_family_names_[i].c_str(),
- Slice(value1).ToString(true).c_str());
- is_consistent = false;
- }
- if (!is_consistent) {
- break;
- }
- }
- }
- // Enable back error injection disabled for verification
- if (fault_fs_guard) {
- fault_fs_guard->EnableThreadLocalErrorInjection(
- FaultInjectionIOType::kRead);
- fault_fs_guard->EnableThreadLocalErrorInjection(
- FaultInjectionIOType::kMetadataRead);
- }
- db_->ReleaseSnapshot(snapshot);
- }
- if (!is_consistent) {
- fprintf(stderr, "TestGet error: is_consistent is false\n");
- thread->stats.AddErrors(1);
- // Fail fast to preserve the DB state.
- thread->shared->SetVerificationFailure();
- } else if (s.ok()) {
- thread->stats.AddGets(1, 1);
- } else if (s.IsNotFound()) {
- thread->stats.AddGets(1, 0);
- } else if (!IsErrorInjectedAndRetryable(s)) {
- fprintf(stderr, "TestGet error: %s\n", s.ToString().c_str());
- thread->stats.AddErrors(1);
- }
- return s;
- }
- std::vector<Status> TestMultiGet(
- ThreadState* thread, const ReadOptions& read_opts,
- const std::vector<int>& rand_column_families,
- const std::vector<int64_t>& rand_keys) override {
- size_t num_keys = rand_keys.size();
- std::vector<std::string> key_str;
- std::vector<Slice> keys;
- keys.reserve(num_keys);
- key_str.reserve(num_keys);
- std::vector<PinnableSlice> values(num_keys);
- std::vector<Status> statuses(num_keys);
- ColumnFamilyHandle* cfh = column_families_[rand_column_families[0]];
- ReadOptions readoptionscopy = read_opts;
- readoptionscopy.rate_limiter_priority =
- FLAGS_rate_limit_user_ops ? Env::IO_USER : Env::IO_TOTAL;
- for (size_t i = 0; i < num_keys; ++i) {
- key_str.emplace_back(Key(rand_keys[i]));
- keys.emplace_back(key_str.back());
- }
- db_->MultiGet(readoptionscopy, cfh, num_keys, keys.data(), values.data(),
- statuses.data());
- for (const auto& s : statuses) {
- if (s.ok()) {
- // found case
- thread->stats.AddGets(1, 1);
- } else if (s.IsNotFound()) {
- // not found case
- thread->stats.AddGets(1, 0);
- } else if (!IsErrorInjectedAndRetryable(s)) {
- // errors case
- fprintf(stderr, "MultiGet error: %s\n", s.ToString().c_str());
- thread->stats.AddErrors(1);
- }
- }
- return statuses;
- }
- void TestGetEntity(ThreadState* thread, const ReadOptions& read_opts,
- const std::vector<int>& rand_column_families,
- const std::vector<int64_t>& rand_keys) override {
- assert(thread);
- assert(!rand_column_families.empty());
- assert(!rand_keys.empty());
- const std::string key = Key(rand_keys[0]);
- Status s;
- bool is_consistent = true;
- if (thread->rand.OneIn(2)) {
- // With a 1/2 chance, do a random read from a random CF
- const size_t cf_id = thread->rand.Next() % rand_column_families.size();
- assert(rand_column_families[cf_id] >= 0);
- assert(rand_column_families[cf_id] <
- static_cast<int>(column_families_.size()));
- ColumnFamilyHandle* const cfh =
- column_families_[rand_column_families[cf_id]];
- assert(cfh);
- PinnableWideColumns result;
- s = db_->GetEntity(read_opts, cfh, key, &result);
- if (s.ok()) {
- if (!VerifyWideColumns(result.columns())) {
- fprintf(
- stderr,
- "GetEntity error: inconsistent columns for key %s, entity %s\n",
- StringToHex(key).c_str(),
- WideColumnsToHex(result.columns()).c_str());
- is_consistent = false;
- }
- }
- } else {
- // With a 1/2 chance, compare one key across all CFs
- ManagedSnapshot snapshot_guard(db_);
- ReadOptions read_opts_copy = read_opts;
- read_opts_copy.snapshot = snapshot_guard.snapshot();
- assert(rand_column_families[0] >= 0);
- assert(rand_column_families[0] <
- static_cast<int>(column_families_.size()));
- PinnableWideColumns cmp_result;
- s = db_->GetEntity(read_opts_copy,
- column_families_[rand_column_families[0]], key,
- &cmp_result);
- // Temporarily disable error injection for verification
- if (fault_fs_guard) {
- fault_fs_guard->DisableThreadLocalErrorInjection(
- FaultInjectionIOType::kRead);
- fault_fs_guard->DisableThreadLocalErrorInjection(
- FaultInjectionIOType::kMetadataRead);
- }
- if (s.ok() || s.IsNotFound()) {
- const bool cmp_found = s.ok();
- if (cmp_found) {
- if (!VerifyWideColumns(cmp_result.columns())) {
- fprintf(stderr,
- "GetEntity error: inconsistent columns for key %s, "
- "entity %s\n",
- StringToHex(key).c_str(),
- WideColumnsToHex(cmp_result.columns()).c_str());
- is_consistent = false;
- }
- }
- if (is_consistent) {
- if (FLAGS_use_attribute_group) {
- PinnableAttributeGroups result;
- result.reserve(rand_column_families.size());
- for (size_t i = 1; i < rand_column_families.size(); ++i) {
- assert(rand_column_families[i] >= 0);
- assert(rand_column_families[i] <
- static_cast<int>(column_families_.size()));
- result.emplace_back(column_families_[rand_column_families[i]]);
- }
- s = db_->GetEntity(read_opts_copy, key, &result);
- if (s.ok()) {
- for (auto& attribute_group : result) {
- s = attribute_group.status();
- if (!s.ok() && !s.IsNotFound()) {
- break;
- }
- const bool found = s.ok();
- if (!cmp_found && found) {
- fprintf(
- stderr,
- "Non-AttributeGroup GetEntity returns different results "
- "than AttributeGroup GetEntity for key %s: CF %s "
- "returns not found, CF %s returns entity %s \n",
- StringToHex(key).c_str(), column_family_names_[0].c_str(),
- attribute_group.column_family()->GetName().c_str(),
- WideColumnsToHex(attribute_group.columns()).c_str());
- is_consistent = false;
- break;
- }
- if (cmp_found && !found) {
- fprintf(
- stderr,
- "Non-AttributeGroup GetEntity returns different results "
- "than AttributeGroup GetEntity for key %s: CF %s "
- "returns entity %s, CF %s returns not found \n",
- StringToHex(key).c_str(), column_family_names_[0].c_str(),
- WideColumnsToHex(cmp_result.columns()).c_str(),
- attribute_group.column_family()->GetName().c_str());
- is_consistent = false;
- break;
- }
- if (found &&
- attribute_group.columns() != cmp_result.columns()) {
- fprintf(
- stderr,
- "Non-AttributeGroup GetEntity returns different results "
- "than AttributeGroup GetEntity for key %s: CF %s "
- "returns entity %s, CF %s returns entity %s\n",
- StringToHex(key).c_str(), column_family_names_[0].c_str(),
- WideColumnsToHex(cmp_result.columns()).c_str(),
- attribute_group.column_family()->GetName().c_str(),
- WideColumnsToHex(attribute_group.columns()).c_str());
- is_consistent = false;
- break;
- }
- }
- }
- } else {
- for (size_t i = 1; i < rand_column_families.size(); ++i) {
- assert(rand_column_families[i] >= 0);
- assert(rand_column_families[i] <
- static_cast<int>(column_families_.size()));
- PinnableWideColumns result;
- s = db_->GetEntity(read_opts_copy,
- column_families_[rand_column_families[i]], key,
- &result);
- if (!s.ok() && !s.IsNotFound()) {
- break;
- }
- const bool found = s.ok();
- assert(!column_family_names_.empty());
- assert(i < column_family_names_.size());
- if (!cmp_found && found) {
- fprintf(stderr,
- "GetEntity returns different results for key %s: CF %s "
- "returns not found, CF %s returns entity %s\n",
- StringToHex(key).c_str(),
- column_family_names_[0].c_str(),
- column_family_names_[i].c_str(),
- WideColumnsToHex(result.columns()).c_str());
- is_consistent = false;
- break;
- }
- if (cmp_found && !found) {
- fprintf(stderr,
- "GetEntity returns different results for key %s: CF %s "
- "returns entity %s, CF %s returns not found\n",
- StringToHex(key).c_str(),
- column_family_names_[0].c_str(),
- WideColumnsToHex(cmp_result.columns()).c_str(),
- column_family_names_[i].c_str());
- is_consistent = false;
- break;
- }
- if (found && result != cmp_result) {
- fprintf(stderr,
- "GetEntity returns different results for key %s: CF %s "
- "returns entity %s, CF %s returns entity %s\n",
- StringToHex(key).c_str(),
- column_family_names_[0].c_str(),
- WideColumnsToHex(cmp_result.columns()).c_str(),
- column_family_names_[i].c_str(),
- WideColumnsToHex(result.columns()).c_str());
- is_consistent = false;
- break;
- }
- }
- }
- }
- }
- // Enable back error injection disabled for verification
- if (fault_fs_guard) {
- fault_fs_guard->EnableThreadLocalErrorInjection(
- FaultInjectionIOType::kRead);
- fault_fs_guard->EnableThreadLocalErrorInjection(
- FaultInjectionIOType::kMetadataRead);
- }
- }
- if (!is_consistent) {
- fprintf(stderr, "TestGetEntity error: results are not consistent\n");
- thread->stats.AddErrors(1);
- // Fail fast to preserve the DB state.
- thread->shared->SetVerificationFailure();
- } else if (s.ok()) {
- thread->stats.AddGets(1, 1);
- } else if (s.IsNotFound()) {
- thread->stats.AddGets(1, 0);
- } else if (!IsErrorInjectedAndRetryable(s)) {
- fprintf(stderr, "TestGetEntity error: %s\n", s.ToString().c_str());
- thread->stats.AddErrors(1);
- }
- }
- void TestMultiGetEntity(ThreadState* thread, const ReadOptions& read_opts,
- const std::vector<int>& rand_column_families,
- const std::vector<int64_t>& rand_keys) override {
- assert(thread);
- assert(thread->shared);
- assert(!rand_column_families.empty());
- assert(!rand_keys.empty());
- ManagedSnapshot snapshot_guard(db_);
- ReadOptions read_opts_copy = read_opts;
- read_opts_copy.snapshot = snapshot_guard.snapshot();
- const size_t num_cfs = rand_column_families.size();
- std::vector<ColumnFamilyHandle*> cfhs;
- cfhs.reserve(num_cfs);
- for (size_t j = 0; j < num_cfs; ++j) {
- assert(rand_column_families[j] >= 0);
- assert(rand_column_families[j] <
- static_cast<int>(column_families_.size()));
- ColumnFamilyHandle* const cfh = column_families_[rand_column_families[j]];
- assert(cfh);
- cfhs.emplace_back(cfh);
- }
- const size_t num_keys = rand_keys.size();
- if (FLAGS_use_attribute_group) {
- // AttributeGroup MultiGetEntity verification
- std::vector<PinnableAttributeGroups> results;
- std::vector<Slice> key_slices;
- std::vector<std::string> key_strs;
- results.reserve(num_keys);
- key_slices.reserve(num_keys);
- key_strs.reserve(num_keys);
- for (size_t i = 0; i < num_keys; ++i) {
- key_strs.emplace_back(Key(rand_keys[i]));
- key_slices.emplace_back(key_strs.back());
- PinnableAttributeGroups attribute_groups;
- for (auto* cfh : cfhs) {
- attribute_groups.emplace_back(cfh);
- }
- results.emplace_back(std::move(attribute_groups));
- }
- db_->MultiGetEntity(read_opts_copy, num_keys, key_slices.data(),
- results.data());
- bool is_consistent = true;
- for (size_t i = 0; i < num_keys; ++i) {
- const auto& result = results[i];
- const Status& cmp_s = result[0].status();
- const WideColumns& cmp_columns = result[0].columns();
- bool has_error = false;
- for (size_t j = 0; j < num_cfs; ++j) {
- const Status& s = result[j].status();
- const WideColumns& columns = result[j].columns();
- if (!s.ok() && IsErrorInjectedAndRetryable(s)) {
- break;
- } else if (!s.ok() && !s.IsNotFound()) {
- fprintf(stderr, "TestMultiGetEntity (AttributeGroup) error: %s\n",
- s.ToString().c_str());
- thread->stats.AddErrors(1);
- has_error = true;
- break;
- }
- assert(cmp_s.ok() || cmp_s.IsNotFound());
- if (s.IsNotFound()) {
- if (cmp_s.ok()) {
- fprintf(stderr,
- "MultiGetEntity (AttributeGroup) returns different "
- "results for key %s: CF %s "
- "returns entity %s, CF %s returns not found\n",
- key_slices[i].ToString(true).c_str(),
- column_family_names_[0].c_str(),
- WideColumnsToHex(cmp_columns).c_str(),
- column_family_names_[j].c_str());
- is_consistent = false;
- break;
- }
- continue;
- }
- assert(s.ok());
- if (cmp_s.IsNotFound()) {
- fprintf(stderr,
- "MultiGetEntity (AttributeGroup) returns different results "
- "for key %s: CF %s "
- "returns not found, CF %s returns entity %s\n",
- key_slices[i].ToString(true).c_str(),
- column_family_names_[0].c_str(),
- column_family_names_[j].c_str(),
- WideColumnsToHex(columns).c_str());
- is_consistent = false;
- break;
- }
- if (columns != cmp_columns) {
- fprintf(stderr,
- "MultiGetEntity (AttributeGroup) returns different results "
- "for key %s: CF %s "
- "returns entity %s, CF %s returns entity %s\n",
- key_slices[i].ToString(true).c_str(),
- column_family_names_[0].c_str(),
- WideColumnsToHex(cmp_columns).c_str(),
- column_family_names_[j].c_str(),
- WideColumnsToHex(columns).c_str());
- is_consistent = false;
- break;
- }
- if (!VerifyWideColumns(columns)) {
- fprintf(stderr,
- "MultiGetEntity (AttributeGroup) error: inconsistent "
- "columns for key %s, "
- "entity %s\n",
- key_slices[i].ToString(true).c_str(),
- WideColumnsToHex(columns).c_str());
- is_consistent = false;
- break;
- }
- }
- if (has_error) {
- break;
- } else if (!is_consistent) {
- fprintf(stderr,
- "TestMultiGetEntity (AttributeGroup) error: results are not "
- "consistent\n");
- thread->stats.AddErrors(1);
- // Fail fast to preserve the DB state.
- thread->shared->SetVerificationFailure();
- break;
- } else if (cmp_s.ok()) {
- thread->stats.AddGets(1, 1);
- } else if (cmp_s.IsNotFound()) {
- thread->stats.AddGets(1, 0);
- }
- }
- } else {
- // Non-AttributeGroup MultiGetEntity verification
- for (size_t i = 0; i < num_keys; ++i) {
- const std::string key = Key(rand_keys[i]);
- std::vector<Slice> key_slices(num_cfs, key);
- std::vector<PinnableWideColumns> results(num_cfs);
- std::vector<Status> statuses(num_cfs);
- db_->MultiGetEntity(read_opts_copy, num_cfs, cfhs.data(),
- key_slices.data(), results.data(), statuses.data());
- bool is_consistent = true;
- const Status& cmp_s = statuses[0];
- const WideColumns& cmp_columns = results[0].columns();
- for (size_t j = 0; j < num_cfs; ++j) {
- const Status& s = statuses[j];
- const WideColumns& columns = results[j].columns();
- if (!s.ok() && IsErrorInjectedAndRetryable(s)) {
- break;
- } else if (!s.ok() && !s.IsNotFound()) {
- fprintf(stderr, "TestMultiGetEntity error: %s\n",
- s.ToString().c_str());
- thread->stats.AddErrors(1);
- break;
- }
- assert(cmp_s.ok() || cmp_s.IsNotFound());
- if (s.IsNotFound()) {
- if (cmp_s.ok()) {
- fprintf(
- stderr,
- "MultiGetEntity returns different results for key %s: CF %s "
- "returns entity %s, CF %s returns not found\n",
- StringToHex(key).c_str(), column_family_names_[0].c_str(),
- WideColumnsToHex(cmp_columns).c_str(),
- column_family_names_[j].c_str());
- is_consistent = false;
- break;
- }
- continue;
- }
- assert(s.ok());
- if (cmp_s.IsNotFound()) {
- fprintf(
- stderr,
- "MultiGetEntity returns different results for key %s: CF %s "
- "returns not found, CF %s returns entity %s\n",
- StringToHex(key).c_str(), column_family_names_[0].c_str(),
- column_family_names_[j].c_str(),
- WideColumnsToHex(columns).c_str());
- is_consistent = false;
- break;
- }
- if (columns != cmp_columns) {
- fprintf(
- stderr,
- "MultiGetEntity returns different results for key %s: CF %s "
- "returns entity %s, CF %s returns entity %s\n",
- StringToHex(key).c_str(), column_family_names_[0].c_str(),
- WideColumnsToHex(cmp_columns).c_str(),
- column_family_names_[j].c_str(),
- WideColumnsToHex(columns).c_str());
- is_consistent = false;
- break;
- }
- if (!VerifyWideColumns(columns)) {
- fprintf(stderr,
- "MultiGetEntity error: inconsistent columns for key %s, "
- "entity %s\n",
- StringToHex(key).c_str(),
- WideColumnsToHex(columns).c_str());
- is_consistent = false;
- break;
- }
- }
- if (!is_consistent) {
- fprintf(stderr,
- "TestMultiGetEntity error: results are not consistent\n");
- thread->stats.AddErrors(1);
- // Fail fast to preserve the DB state.
- thread->shared->SetVerificationFailure();
- break;
- } else if (statuses[0].ok()) {
- thread->stats.AddGets(1, 1);
- } else if (statuses[0].IsNotFound()) {
- thread->stats.AddGets(1, 0);
- }
- }
- }
- }
- Status TestPrefixScan(ThreadState* thread, const ReadOptions& readoptions,
- const std::vector<int>& rand_column_families,
- const std::vector<int64_t>& rand_keys) override {
- assert(!rand_column_families.empty());
- assert(!rand_keys.empty());
- const std::string key = Key(rand_keys[0]);
- const size_t prefix_to_use =
- (FLAGS_prefix_size < 0) ? 7 : static_cast<size_t>(FLAGS_prefix_size);
- const Slice prefix(key.data(), prefix_to_use);
- std::string upper_bound;
- Slice ub_slice;
- ReadOptions ro_copy = readoptions;
- std::unique_ptr<ManagedSnapshot> snapshot = nullptr;
- if (ro_copy.auto_refresh_iterator_with_snapshot) {
- snapshot = std::make_unique<ManagedSnapshot>(db_);
- ro_copy.snapshot = snapshot->snapshot();
- }
- // Get the next prefix first and then see if we want to set upper bound.
- // We'll use the next prefix in an assertion later on
- if (GetNextPrefix(prefix, &upper_bound) && thread->rand.OneIn(2)) {
- ub_slice = Slice(upper_bound);
- ro_copy.iterate_upper_bound = &ub_slice;
- if (FLAGS_use_sqfc_for_range_queries) {
- ro_copy.table_filter =
- sqfc_factory_->GetTableFilterForRangeQuery(prefix, ub_slice);
- }
- }
- ColumnFamilyHandle* const cfh =
- column_families_[rand_column_families[thread->rand.Uniform(
- static_cast<int>(rand_column_families.size()))]];
- assert(cfh);
- std::unique_ptr<Iterator> iter(db_->NewIterator(ro_copy, cfh));
- uint64_t count = 0;
- Status s;
- for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix);
- iter->Next()) {
- ++count;
- if (ro_copy.allow_unprepared_value) {
- if (!iter->PrepareValue()) {
- s = iter->status();
- break;
- }
- }
- if (!VerifyWideColumns(iter->value(), iter->columns())) {
- s = Status::Corruption("Value and columns inconsistent",
- DebugString(iter->value(), iter->columns()));
- break;
- }
- }
- assert(prefix_to_use == 0 ||
- count <= GetPrefixKeyCount(prefix.ToString(), upper_bound));
- if (s.ok()) {
- s = iter->status();
- }
- if (!s.ok() && !IsErrorInjectedAndRetryable(s)) {
- fprintf(stderr, "TestPrefixScan error: %s\n", s.ToString().c_str());
- thread->stats.AddErrors(1);
- return s;
- }
- thread->stats.AddPrefixes(1, count);
- return Status::OK();
- }
- ColumnFamilyHandle* GetControlCfh(ThreadState* thread,
- int /*column_family_id*/
- ) override {
- // All column families should contain the same data. Randomly pick one.
- return column_families_[thread->rand.Next() % column_families_.size()];
- }
- void VerifyDb(ThreadState* thread) const override {
- // This `ReadOptions` is for validation purposes. Ignore
- // `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
- ReadOptions options(FLAGS_verify_checksum, true);
- // We must set total_order_seek to true because we are doing a SeekToFirst
- // on a column family whose memtables may support (by default) prefix-based
- // iterator. In this case, NewIterator with options.total_order_seek being
- // false returns a prefix-based iterator. Calling SeekToFirst using this
- // iterator causes the iterator to become invalid. That means we cannot
- // iterate the memtable using this iterator any more, although the memtable
- // contains the most up-to-date key-values.
- options.total_order_seek = true;
- ManagedSnapshot snapshot_guard(db_);
- options.snapshot = snapshot_guard.snapshot();
- options.auto_refresh_iterator_with_snapshot =
- FLAGS_auto_refresh_iterator_with_snapshot;
- const size_t num = column_families_.size();
- std::vector<std::unique_ptr<Iterator>> iters;
- iters.reserve(num);
- for (size_t i = 0; i < num; ++i) {
- iters.emplace_back(db_->NewIterator(options, column_families_[i]));
- iters.back()->SeekToFirst();
- }
- std::vector<Status> statuses(num, Status::OK());
- assert(thread);
- auto shared = thread->shared;
- assert(shared);
- do {
- if (shared->HasVerificationFailedYet()) {
- break;
- }
- size_t valid_cnt = 0;
- for (size_t i = 0; i < num; ++i) {
- const auto& iter = iters[i];
- assert(iter);
- if (iter->Valid()) {
- if (!VerifyWideColumns(iter->value(), iter->columns())) {
- statuses[i] =
- Status::Corruption("Value and columns inconsistent",
- DebugString(iter->value(), iter->columns()));
- } else {
- ++valid_cnt;
- }
- } else {
- statuses[i] = iter->status();
- }
- }
- if (valid_cnt == 0) {
- for (size_t i = 0; i < num; ++i) {
- const auto& s = statuses[i];
- if (!s.ok()) {
- fprintf(stderr, "Iterator on cf %s has error: %s\n",
- column_families_[i]->GetName().c_str(),
- s.ToString().c_str());
- shared->SetVerificationFailure();
- }
- }
- break;
- }
- if (valid_cnt < num) {
- shared->SetVerificationFailure();
- for (size_t i = 0; i < num; ++i) {
- assert(iters[i]);
- if (!iters[i]->Valid()) {
- if (statuses[i].ok()) {
- fprintf(stderr, "Finished scanning cf %s\n",
- column_families_[i]->GetName().c_str());
- } else {
- fprintf(stderr, "Iterator on cf %s has error: %s\n",
- column_families_[i]->GetName().c_str(),
- statuses[i].ToString().c_str());
- }
- } else {
- fprintf(stderr, "cf %s has remaining data to scan\n",
- column_families_[i]->GetName().c_str());
- }
- }
- break;
- }
- if (shared->HasVerificationFailedYet()) {
- break;
- }
- // If the program reaches here, then all column families' iterators are
- // still valid.
- assert(valid_cnt == num);
- if (shared->PrintingVerificationResults()) {
- continue;
- }
- assert(iters[0]);
- const Slice key = iters[0]->key();
- const Slice value = iters[0]->value();
- int num_mismatched_cfs = 0;
- for (size_t i = 1; i < num; ++i) {
- assert(iters[i]);
- const int cmp = key.compare(iters[i]->key());
- if (cmp != 0) {
- ++num_mismatched_cfs;
- if (1 == num_mismatched_cfs) {
- fprintf(stderr, "Verification failed\n");
- fprintf(stderr, "Latest Sequence Number: %" PRIu64 "\n",
- db_->GetLatestSequenceNumber());
- fprintf(stderr, "[%s] %s => %s\n",
- column_families_[0]->GetName().c_str(),
- key.ToString(true /* hex */).c_str(),
- value.ToString(true /* hex */).c_str());
- }
- fprintf(stderr, "[%s] %s => %s\n",
- column_families_[i]->GetName().c_str(),
- iters[i]->key().ToString(true /* hex */).c_str(),
- iters[i]->value().ToString(true /* hex */).c_str());
- Slice begin_key;
- Slice end_key;
- if (cmp < 0) {
- begin_key = key;
- end_key = iters[i]->key();
- } else {
- begin_key = iters[i]->key();
- end_key = key;
- }
- const auto print_key_versions = [&](ColumnFamilyHandle* cfh) {
- constexpr size_t kMaxNumIKeys = 8;
- std::vector<KeyVersion> versions;
- const Status s = GetAllKeyVersions(db_, cfh, begin_key, end_key,
- kMaxNumIKeys, &versions);
- if (!s.ok()) {
- fprintf(stderr, "%s\n", s.ToString().c_str());
- return;
- }
- assert(cfh);
- fprintf(stderr,
- "Internal keys in CF '%s', [%s, %s] (max %" ROCKSDB_PRIszt
- ")\n",
- cfh->GetName().c_str(),
- begin_key.ToString(true /* hex */).c_str(),
- end_key.ToString(true /* hex */).c_str(), kMaxNumIKeys);
- for (const KeyVersion& kv : versions) {
- fprintf(stderr, " key %s seq %" PRIu64 " type %d\n",
- Slice(kv.user_key).ToString(true).c_str(), kv.sequence,
- kv.type);
- }
- };
- if (1 == num_mismatched_cfs) {
- print_key_versions(column_families_[0]);
- }
- print_key_versions(column_families_[i]);
- shared->SetVerificationFailure();
- }
- }
- shared->FinishPrintingVerificationResults();
- for (auto& iter : iters) {
- assert(iter);
- iter->Next();
- }
- } while (true);
- }
- void ContinuouslyVerifyDb(ThreadState* thread) const override {
- assert(thread);
- Status status;
- DB* db_ptr = secondary_db_ ? secondary_db_ : db_;
- const auto& cfhs = secondary_db_ ? secondary_cfhs_ : column_families_;
- // Take a snapshot to preserve the state of primary db.
- ManagedSnapshot snapshot_guard(db_);
- SharedState* shared = thread->shared;
- assert(shared);
- if (secondary_db_) {
- status = secondary_db_->TryCatchUpWithPrimary();
- if (!status.ok()) {
- fprintf(stderr, "TryCatchUpWithPrimary: %s\n",
- status.ToString().c_str());
- shared->SetShouldStopTest();
- assert(false);
- return;
- }
- }
- const auto checksum_column_family = [](Iterator* iter,
- uint32_t* checksum) -> Status {
- assert(nullptr != checksum);
- uint32_t ret = 0;
- for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
- ret = crc32c::Extend(ret, iter->key().data(), iter->key().size());
- ret = crc32c::Extend(ret, iter->value().data(), iter->value().size());
- for (const auto& column : iter->columns()) {
- ret = crc32c::Extend(ret, column.name().data(), column.name().size());
- ret =
- crc32c::Extend(ret, column.value().data(), column.value().size());
- }
- }
- *checksum = ret;
- return iter->status();
- };
- // This `ReadOptions` is for validation purposes. Ignore
- // `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
- ReadOptions ropts(FLAGS_verify_checksum, true);
- ropts.total_order_seek = true;
- if (nullptr == secondary_db_ || FLAGS_auto_refresh_iterator_with_snapshot) {
- ropts.snapshot = snapshot_guard.snapshot();
- ropts.auto_refresh_iterator_with_snapshot = true;
- }
- uint32_t crc = 0;
- {
- // Compute crc for all key-values of default column family.
- std::unique_ptr<Iterator> it(db_ptr->NewIterator(ropts));
- status = checksum_column_family(it.get(), &crc);
- if (!status.ok()) {
- fprintf(stderr, "Computing checksum of default cf: %s\n",
- status.ToString().c_str());
- assert(false);
- }
- }
- // Since we currently intentionally disallow reading from the secondary
- // instance with snapshot, we cannot achieve cross-cf consistency if WAL is
- // enabled because there is no guarantee that secondary instance replays
- // the primary's WAL to a consistent point where all cfs have the same
- // data.
- if (status.ok() && FLAGS_disable_wal) {
- uint32_t tmp_crc = 0;
- for (ColumnFamilyHandle* cfh : cfhs) {
- if (cfh == db_ptr->DefaultColumnFamily()) {
- continue;
- }
- std::unique_ptr<Iterator> it(db_ptr->NewIterator(ropts, cfh));
- status = checksum_column_family(it.get(), &tmp_crc);
- if (!status.ok() || tmp_crc != crc) {
- break;
- }
- }
- if (!status.ok()) {
- fprintf(stderr, "status: %s\n", status.ToString().c_str());
- shared->SetShouldStopTest();
- assert(false);
- } else if (tmp_crc != crc) {
- fprintf(stderr, "tmp_crc=%" PRIu32 " crc=%" PRIu32 "\n", tmp_crc, crc);
- shared->SetShouldStopTest();
- assert(false);
- }
- }
- }
- std::vector<int> GenerateColumnFamilies(
- const int /* num_column_families */,
- int /* rand_column_family */) const override {
- std::vector<int> ret;
- int num = static_cast<int>(column_families_.size());
- int k = 0;
- std::generate_n(back_inserter(ret), num, [&k]() -> int { return k++; });
- return ret;
- }
- private:
- std::atomic<uint32_t> batch_id_;
- };
- StressTest* CreateCfConsistencyStressTest() {
- return new CfConsistencyStressTest();
- }
- } // namespace ROCKSDB_NAMESPACE
- #endif // GFLAGS
|