// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <climits>
#include <cstring>
#include <iomanip>

#include "db/db_test_util.h"
#include "port/stack_trace.h"
#include "test_util/testutil.h"
#include "util/random.h"
- namespace ROCKSDB_NAMESPACE {
- namespace {
- // A wrapper that allows injection of errors.
- class CorruptionFS : public FileSystemWrapper {
- public:
- bool writable_file_error_;
- int num_writable_file_errors_;
- explicit CorruptionFS(const std::shared_ptr<FileSystem>& _target,
- bool fs_buffer, bool verify_read)
- : FileSystemWrapper(_target),
- writable_file_error_(false),
- num_writable_file_errors_(0),
- corruption_trigger_(INT_MAX),
- read_count_(0),
- corrupt_offset_(0),
- corrupt_len_(0),
- rnd_(300),
- fs_buffer_(fs_buffer),
- verify_read_(verify_read) {}
- ~CorruptionFS() override {
- // Assert that the corruption was reset, which means it got triggered
- assert(corruption_trigger_ == INT_MAX || corrupt_len_ > 0);
- }
- const char* Name() const override { return "ErrorEnv"; }
- IOStatus NewWritableFile(const std::string& fname, const FileOptions& opts,
- std::unique_ptr<FSWritableFile>* result,
- IODebugContext* dbg) override {
- result->reset();
- if (writable_file_error_) {
- ++num_writable_file_errors_;
- return IOStatus::IOError(fname, "fake error");
- }
- return target()->NewWritableFile(fname, opts, result, dbg);
- }
- void SetCorruptionTrigger(const int trigger) {
- MutexLock l(&mutex_);
- corruption_trigger_ = trigger;
- read_count_ = 0;
- corrupt_fname_.clear();
- }
- IOStatus NewRandomAccessFile(const std::string& fname,
- const FileOptions& opts,
- std::unique_ptr<FSRandomAccessFile>* result,
- IODebugContext* dbg) override {
- class CorruptionRandomAccessFile : public FSRandomAccessFileOwnerWrapper {
- public:
- CorruptionRandomAccessFile(CorruptionFS& fs, const std::string& fname,
- std::unique_ptr<FSRandomAccessFile>& file)
- : FSRandomAccessFileOwnerWrapper(std::move(file)),
- fs_(fs),
- fname_(fname) {}
- IOStatus Read(uint64_t offset, size_t len, const IOOptions& opts,
- Slice* result, char* scratch,
- IODebugContext* dbg) const override {
- IOStatus s = target()->Read(offset, len, opts, result, scratch, dbg);
- if (opts.verify_and_reconstruct_read) {
- fs_.MaybeResetOverlapWithCorruptedChunk(fname_, offset,
- result->size());
- return s;
- }
- MutexLock l(&fs_.mutex_);
- if (s.ok() && ++fs_.read_count_ >= fs_.corruption_trigger_) {
- fs_.corruption_trigger_ = INT_MAX;
- char* data = const_cast<char*>(result->data());
- std::memcpy(
- data,
- fs_.rnd_.RandomString(static_cast<int>(result->size())).c_str(),
- result->size());
- fs_.SetCorruptedChunk(fname_, offset, result->size());
- }
- return s;
- }
- IOStatus MultiRead(FSReadRequest* reqs, size_t num_reqs,
- const IOOptions& options,
- IODebugContext* dbg) override {
- for (size_t i = 0; i < num_reqs; ++i) {
- FSReadRequest& req = reqs[i];
- if (fs_.fs_buffer_) {
- // See https://github.com/facebook/rocksdb/pull/13195 for why we
- // want to set up our test implementation for FSAllocationPtr this
- // way.
- char* internalData = new char[req.len];
- req.status = Read(req.offset, req.len, options, &req.result,
- internalData, dbg);
- Slice* internalSlice = new Slice(internalData, req.len);
- FSAllocationPtr internalPtr(internalSlice, [](void* ptr) {
- delete[] static_cast<const char*>(
- static_cast<Slice*>(ptr)->data_);
- delete static_cast<Slice*>(ptr);
- });
- req.fs_scratch = std::move(internalPtr);
- } else {
- req.status = Read(req.offset, req.len, options, &req.result,
- req.scratch, dbg);
- }
- }
- return IOStatus::OK();
- }
- IOStatus Prefetch(uint64_t /*offset*/, size_t /*n*/,
- const IOOptions& /*options*/,
- IODebugContext* /*dbg*/) override {
- return IOStatus::NotSupported("Prefetch");
- }
- private:
- CorruptionFS& fs_;
- std::string fname_;
- };
- std::unique_ptr<FSRandomAccessFile> file;
- IOStatus s = target()->NewRandomAccessFile(fname, opts, &file, dbg);
- EXPECT_OK(s);
- result->reset(new CorruptionRandomAccessFile(*this, fname, file));
- return s;
- }
- IOStatus NewSequentialFile(const std::string& fname,
- const FileOptions& file_opts,
- std::unique_ptr<FSSequentialFile>* result,
- IODebugContext* dbg) override {
- class CorruptionSequentialFile : public FSSequentialFileOwnerWrapper {
- public:
- CorruptionSequentialFile(CorruptionFS& fs, const std::string& fname,
- std::unique_ptr<FSSequentialFile>& file)
- : FSSequentialFileOwnerWrapper(std::move(file)),
- fs_(fs),
- fname_(fname),
- offset_(0) {}
- IOStatus Read(size_t len, const IOOptions& opts, Slice* result,
- char* scratch, IODebugContext* dbg) override {
- IOStatus s = target()->Read(len, opts, result, scratch, dbg);
- if (result->size() == 0 ||
- fname_.find("IDENTITY") != std::string::npos) {
- return s;
- }
- if (opts.verify_and_reconstruct_read) {
- fs_.MaybeResetOverlapWithCorruptedChunk(fname_, offset_,
- result->size());
- return s;
- }
- MutexLock l(&fs_.mutex_);
- if (s.ok() && ++fs_.read_count_ >= fs_.corruption_trigger_) {
- fs_.corruption_trigger_ = INT_MAX;
- char* data = const_cast<char*>(result->data());
- std::memcpy(
- data,
- fs_.rnd_.RandomString(static_cast<int>(result->size())).c_str(),
- result->size());
- fs_.SetCorruptedChunk(fname_, offset_, result->size());
- }
- offset_ += result->size();
- return s;
- }
- private:
- CorruptionFS& fs_;
- std::string fname_;
- size_t offset_;
- };
- std::unique_ptr<FSSequentialFile> file;
- IOStatus s = target()->NewSequentialFile(fname, file_opts, &file, dbg);
- EXPECT_OK(s);
- result->reset(new CorruptionSequentialFile(*this, fname, file));
- return s;
- }
- void SupportedOps(int64_t& supported_ops) override {
- supported_ops = 1 << FSSupportedOps::kAsyncIO;
- if (fs_buffer_) {
- supported_ops |= 1 << FSSupportedOps::kFSBuffer;
- }
- if (verify_read_) {
- supported_ops |= 1 << FSSupportedOps::kVerifyAndReconstructRead;
- }
- }
- void SetCorruptedChunk(const std::string& fname, size_t offset, size_t len) {
- assert(corrupt_fname_.empty());
- corrupt_fname_ = fname;
- corrupt_offset_ = offset;
- corrupt_len_ = len;
- }
- void MaybeResetOverlapWithCorruptedChunk(const std::string& fname,
- size_t offset, size_t len) {
- if (fname == corrupt_fname_ &&
- ((offset <= corrupt_offset_ && (offset + len) > corrupt_offset_) ||
- (offset >= corrupt_offset_ &&
- offset < (corrupt_offset_ + corrupt_len_)))) {
- corrupt_fname_.clear();
- }
- }
- bool VerifyRetry() { return corrupt_len_ > 0 && corrupt_fname_.empty(); }
- int read_count() { return read_count_; }
- int corruption_trigger() { return corruption_trigger_; }
- private:
- int corruption_trigger_;
- int read_count_;
- std::string corrupt_fname_;
- size_t corrupt_offset_;
- size_t corrupt_len_;
- Random rnd_;
- bool fs_buffer_;
- bool verify_read_;
- port::Mutex mutex_;
- };
- } // anonymous namespace
- class DBIOFailureTest : public DBTestBase {
- public:
- DBIOFailureTest() : DBTestBase("db_io_failure_test", /*env_do_fsync=*/true) {}
- };
- // Check that number of files does not grow when writes are dropped
- TEST_F(DBIOFailureTest, DropWrites) {
- do {
- Options options = CurrentOptions();
- options.env = env_;
- options.paranoid_checks = false;
- Reopen(options);
- ASSERT_OK(Put("foo", "v1"));
- ASSERT_EQ("v1", Get("foo"));
- Compact("a", "z");
- const size_t num_files = CountFiles();
- // Force out-of-space errors
- env_->drop_writes_.store(true, std::memory_order_release);
- env_->sleep_counter_.Reset();
- env_->SetMockSleep();
- for (int i = 0; i < 5; i++) {
- if (option_config_ != kUniversalCompactionMultiLevel &&
- option_config_ != kUniversalSubcompactions) {
- for (int level = 0; level < dbfull()->NumberLevels(); level++) {
- if (level > 0 && level == dbfull()->NumberLevels() - 1) {
- break;
- }
- Status s =
- dbfull()->TEST_CompactRange(level, nullptr, nullptr, nullptr,
- true /* disallow trivial move */);
- ASSERT_TRUE(s.ok() || s.IsCorruption());
- }
- } else {
- Status s =
- dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
- ASSERT_TRUE(s.ok() || s.IsCorruption());
- }
- }
- std::string property_value;
- ASSERT_TRUE(db_->GetProperty("rocksdb.background-errors", &property_value));
- ASSERT_EQ("5", property_value);
- env_->drop_writes_.store(false, std::memory_order_release);
- const size_t count = CountFiles();
- ASSERT_LT(count, num_files + 3);
- // Check that compaction attempts slept after errors
- // TODO @krad: Figure out why ASSERT_EQ 5 keeps failing in certain compiler
- // versions
- ASSERT_GE(env_->sleep_counter_.Read(), 4);
- } while (ChangeCompactOptions());
- }
- // Check background error counter bumped on flush failures.
- TEST_F(DBIOFailureTest, DropWritesFlush) {
- do {
- Options options = CurrentOptions();
- options.env = env_;
- options.max_background_flushes = 1;
- Reopen(options);
- ASSERT_OK(Put("foo", "v1"));
- // Force out-of-space errors
- env_->drop_writes_.store(true, std::memory_order_release);
- std::string property_value;
- // Background error count is 0 now.
- ASSERT_TRUE(db_->GetProperty("rocksdb.background-errors", &property_value));
- ASSERT_EQ("0", property_value);
- // ASSERT file is too short
- ASSERT_TRUE(dbfull()->TEST_FlushMemTable(true).IsCorruption());
- ASSERT_TRUE(db_->GetProperty("rocksdb.background-errors", &property_value));
- ASSERT_EQ("1", property_value);
- env_->drop_writes_.store(false, std::memory_order_release);
- } while (ChangeCompactOptions());
- }
- // Check that CompactRange() returns failure if there is not enough space left
- // on device
- TEST_F(DBIOFailureTest, NoSpaceCompactRange) {
- do {
- Options options = CurrentOptions();
- options.env = env_;
- options.disable_auto_compactions = true;
- Reopen(options);
- // generate 5 tables
- for (int i = 0; i < 5; ++i) {
- ASSERT_OK(Put(Key(i), Key(i) + "v"));
- ASSERT_OK(Flush());
- }
- // Force out-of-space errors
- env_->no_space_.store(true, std::memory_order_release);
- Status s = dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr,
- true /* disallow trivial move */);
- ASSERT_TRUE(s.IsIOError());
- ASSERT_TRUE(s.IsNoSpace());
- env_->no_space_.store(false, std::memory_order_release);
- } while (ChangeCompactOptions());
- }
- TEST_F(DBIOFailureTest, NonWritableFileSystem) {
- do {
- Options options = CurrentOptions();
- options.write_buffer_size = 4096;
- options.arena_block_size = 4096;
- options.env = env_;
- Reopen(options);
- ASSERT_OK(Put("foo", "v1"));
- env_->non_writeable_rate_.store(100);
- std::string big(100000, 'x');
- int errors = 0;
- for (int i = 0; i < 20; i++) {
- if (!Put("foo", big).ok()) {
- errors++;
- env_->SleepForMicroseconds(100000);
- }
- }
- ASSERT_GT(errors, 0);
- env_->non_writeable_rate_.store(0);
- } while (ChangeCompactOptions());
- }
- TEST_F(DBIOFailureTest, ManifestWriteError) {
- // Test for the following problem:
- // (a) Compaction produces file F
- // (b) Log record containing F is written to MANIFEST file, but Sync() fails
- // (c) GC deletes F
- // (d) After reopening DB, reads fail since deleted F is named in log record
- // We iterate twice. In the second iteration, everything is the
- // same except the log record never makes it to the MANIFEST file.
- for (int iter = 0; iter < 2; iter++) {
- std::atomic<bool>* error_type = (iter == 0) ? &env_->manifest_sync_error_
- : &env_->manifest_write_error_;
- // Insert foo=>bar mapping
- Options options = CurrentOptions();
- options.env = env_;
- options.create_if_missing = true;
- options.error_if_exists = false;
- options.paranoid_checks = true;
- DestroyAndReopen(options);
- ASSERT_OK(Put("foo", "bar"));
- ASSERT_EQ("bar", Get("foo"));
- // Memtable compaction (will succeed)
- ASSERT_OK(Flush());
- ASSERT_EQ("bar", Get("foo"));
- const int last = 2;
- MoveFilesToLevel(2);
- ASSERT_EQ(NumTableFilesAtLevel(last), 1); // foo=>bar is now in last level
- // Merging compaction (will fail)
- error_type->store(true, std::memory_order_release);
- ASSERT_NOK(
- dbfull()->TEST_CompactRange(last, nullptr, nullptr)); // Should fail
- ASSERT_EQ("bar", Get("foo"));
- error_type->store(false, std::memory_order_release);
- // Since paranoid_checks=true, writes should fail
- ASSERT_NOK(Put("foo2", "bar2"));
- // Recovery: should not lose data
- ASSERT_EQ("bar", Get("foo"));
- // Try again with paranoid_checks=false
- Close();
- options.paranoid_checks = false;
- Reopen(options);
- // Merging compaction (will fail)
- error_type->store(true, std::memory_order_release);
- Status s =
- dbfull()->TEST_CompactRange(last, nullptr, nullptr); // Should fail
- if (iter == 0) {
- ASSERT_OK(s);
- } else {
- ASSERT_TRUE(s.IsIOError());
- }
- ASSERT_EQ("bar", Get("foo"));
- // Recovery: should not lose data
- error_type->store(false, std::memory_order_release);
- Reopen(options);
- ASSERT_EQ("bar", Get("foo"));
- // Since paranoid_checks=false, writes should succeed
- ASSERT_OK(Put("foo2", "bar2"));
- ASSERT_EQ("bar", Get("foo"));
- ASSERT_EQ("bar2", Get("foo2"));
- }
- }
- TEST_F(DBIOFailureTest, PutFailsParanoid) {
- // Test the following:
- // (a) A random put fails in paranoid mode (simulate by sync fail)
- // (b) All other puts have to fail, even if writes would succeed
- // (c) All of that should happen ONLY if paranoid_checks = true
- Options options = CurrentOptions();
- options.env = env_;
- options.create_if_missing = true;
- options.error_if_exists = false;
- options.paranoid_checks = true;
- DestroyAndReopen(options);
- CreateAndReopenWithCF({"pikachu"}, options);
- ASSERT_OK(Put(1, "foo", "bar"));
- ASSERT_OK(Put(1, "foo1", "bar1"));
- // simulate error
- env_->log_write_error_.store(true, std::memory_order_release);
- ASSERT_NOK(Put(1, "foo2", "bar2"));
- env_->log_write_error_.store(false, std::memory_order_release);
- // the next put should fail, too
- ASSERT_NOK(Put(1, "foo3", "bar3"));
- // but we're still able to read
- ASSERT_EQ("bar", Get(1, "foo"));
- // do the same thing with paranoid checks off
- options.paranoid_checks = false;
- DestroyAndReopen(options);
- CreateAndReopenWithCF({"pikachu"}, options);
- ASSERT_OK(Put(1, "foo", "bar"));
- ASSERT_OK(Put(1, "foo1", "bar1"));
- // simulate error
- env_->log_write_error_.store(true, std::memory_order_release);
- ASSERT_NOK(Put(1, "foo2", "bar2"));
- env_->log_write_error_.store(false, std::memory_order_release);
- // the next put should NOT fail
- ASSERT_OK(Put(1, "foo3", "bar3"));
- }
- #if !(defined NDEBUG) || !defined(OS_WIN)
- TEST_F(DBIOFailureTest, FlushSstRangeSyncError) {
- Options options = CurrentOptions();
- options.env = env_;
- options.create_if_missing = true;
- options.error_if_exists = false;
- options.paranoid_checks = true;
- options.write_buffer_size = 256 * 1024 * 1024;
- options.writable_file_max_buffer_size = 128 * 1024;
- options.bytes_per_sync = 128 * 1024;
- options.level0_file_num_compaction_trigger = 4;
- options.memtable_factory.reset(test::NewSpecialSkipListFactory(10));
- BlockBasedTableOptions table_options;
- table_options.filter_policy.reset(NewBloomFilterPolicy(10));
- options.table_factory.reset(NewBlockBasedTableFactory(table_options));
- DestroyAndReopen(options);
- CreateAndReopenWithCF({"pikachu"}, options);
- const char* io_error_msg = "range sync dummy error";
- std::atomic<int> range_sync_called(0);
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
- "SpecialEnv::SStableFile::RangeSync", [&](void* arg) {
- if (range_sync_called.fetch_add(1) == 0) {
- Status* st = static_cast<Status*>(arg);
- *st = Status::IOError(io_error_msg);
- }
- });
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
- Random rnd(301);
- std::string rnd_str =
- rnd.RandomString(static_cast<int>(options.bytes_per_sync / 2));
- std::string rnd_str_512kb = rnd.RandomString(512 * 1024);
- ASSERT_OK(Put(1, "foo", "bar"));
- // First 1MB doesn't get range synced
- ASSERT_OK(Put(1, "foo0_0", rnd_str_512kb));
- ASSERT_OK(Put(1, "foo0_1", rnd_str_512kb));
- ASSERT_OK(Put(1, "foo1_1", rnd_str));
- ASSERT_OK(Put(1, "foo1_2", rnd_str));
- ASSERT_OK(Put(1, "foo1_3", rnd_str));
- ASSERT_OK(Put(1, "foo2", "bar"));
- ASSERT_OK(Put(1, "foo3_1", rnd_str));
- ASSERT_OK(Put(1, "foo3_2", rnd_str));
- ASSERT_OK(Put(1, "foo3_3", rnd_str));
- ASSERT_OK(Put(1, "foo4", "bar"));
- Status s = dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
- ASSERT_TRUE(s.IsIOError());
- ASSERT_STREQ(s.getState(), io_error_msg);
- // Following writes should fail as flush failed.
- ASSERT_NOK(Put(1, "foo2", "bar3"));
- ASSERT_EQ("bar", Get(1, "foo"));
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
- ASSERT_GE(1, range_sync_called.load());
- ReopenWithColumnFamilies({"default", "pikachu"}, options);
- ASSERT_EQ("bar", Get(1, "foo"));
- }
- TEST_F(DBIOFailureTest, CompactSstRangeSyncError) {
- Options options = CurrentOptions();
- options.env = env_;
- options.create_if_missing = true;
- options.error_if_exists = false;
- options.paranoid_checks = true;
- options.write_buffer_size = 256 * 1024 * 1024;
- options.writable_file_max_buffer_size = 128 * 1024;
- options.bytes_per_sync = 128 * 1024;
- options.level0_file_num_compaction_trigger = 2;
- options.target_file_size_base = 256 * 1024 * 1024;
- options.disable_auto_compactions = true;
- BlockBasedTableOptions table_options;
- table_options.filter_policy.reset(NewBloomFilterPolicy(10));
- options.table_factory.reset(NewBlockBasedTableFactory(table_options));
- DestroyAndReopen(options);
- CreateAndReopenWithCF({"pikachu"}, options);
- Random rnd(301);
- std::string rnd_str =
- rnd.RandomString(static_cast<int>(options.bytes_per_sync / 2));
- std::string rnd_str_512kb = rnd.RandomString(512 * 1024);
- ASSERT_OK(Put(1, "foo", "bar"));
- // First 1MB doesn't get range synced
- ASSERT_OK(Put(1, "foo0_0", rnd_str_512kb));
- ASSERT_OK(Put(1, "foo0_1", rnd_str_512kb));
- ASSERT_OK(Put(1, "foo1_1", rnd_str));
- ASSERT_OK(Put(1, "foo1_2", rnd_str));
- ASSERT_OK(Put(1, "foo1_3", rnd_str));
- ASSERT_OK(Flush(1));
- ASSERT_OK(Put(1, "foo", "bar"));
- ASSERT_OK(Put(1, "foo3_1", rnd_str));
- ASSERT_OK(Put(1, "foo3_2", rnd_str));
- ASSERT_OK(Put(1, "foo3_3", rnd_str));
- ASSERT_OK(Put(1, "foo4", "bar"));
- ASSERT_OK(Flush(1));
- ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[1]));
- const char* io_error_msg = "range sync dummy error";
- std::atomic<int> range_sync_called(0);
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
- "SpecialEnv::SStableFile::RangeSync", [&](void* arg) {
- if (range_sync_called.fetch_add(1) == 0) {
- Status* st = static_cast<Status*>(arg);
- *st = Status::IOError(io_error_msg);
- }
- });
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
- ASSERT_OK(dbfull()->SetOptions(handles_[1],
- {
- {"disable_auto_compactions", "false"},
- }));
- Status s = dbfull()->TEST_WaitForCompact();
- ASSERT_TRUE(s.IsIOError());
- ASSERT_STREQ(s.getState(), io_error_msg);
- // Following writes should fail as flush failed.
- ASSERT_NOK(Put(1, "foo2", "bar3"));
- ASSERT_EQ("bar", Get(1, "foo"));
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
- ASSERT_GE(1, range_sync_called.load());
- ReopenWithColumnFamilies({"default", "pikachu"}, options);
- ASSERT_EQ("bar", Get(1, "foo"));
- }
- TEST_F(DBIOFailureTest, FlushSstCloseError) {
- Options options = CurrentOptions();
- options.env = env_;
- options.create_if_missing = true;
- options.error_if_exists = false;
- options.paranoid_checks = true;
- options.level0_file_num_compaction_trigger = 4;
- options.memtable_factory.reset(test::NewSpecialSkipListFactory(2));
- DestroyAndReopen(options);
- CreateAndReopenWithCF({"pikachu"}, options);
- const char* io_error_msg = "close dummy error";
- std::atomic<int> close_called(0);
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
- "SpecialEnv::SStableFile::Close", [&](void* arg) {
- if (close_called.fetch_add(1) == 0) {
- Status* st = static_cast<Status*>(arg);
- *st = Status::IOError(io_error_msg);
- }
- });
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
- ASSERT_OK(Put(1, "foo", "bar"));
- ASSERT_OK(Put(1, "foo1", "bar1"));
- ASSERT_OK(Put(1, "foo", "bar2"));
- Status s = dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
- ASSERT_TRUE(s.IsIOError());
- ASSERT_STREQ(s.getState(), io_error_msg);
- // Following writes should fail as flush failed.
- ASSERT_NOK(Put(1, "foo2", "bar3"));
- ASSERT_EQ("bar2", Get(1, "foo"));
- ASSERT_EQ("bar1", Get(1, "foo1"));
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
- ReopenWithColumnFamilies({"default", "pikachu"}, options);
- ASSERT_EQ("bar2", Get(1, "foo"));
- ASSERT_EQ("bar1", Get(1, "foo1"));
- }
- TEST_F(DBIOFailureTest, CompactionSstCloseError) {
- Options options = CurrentOptions();
- options.env = env_;
- options.create_if_missing = true;
- options.error_if_exists = false;
- options.paranoid_checks = true;
- options.level0_file_num_compaction_trigger = 2;
- options.disable_auto_compactions = true;
- DestroyAndReopen(options);
- CreateAndReopenWithCF({"pikachu"}, options);
- ASSERT_OK(Put(1, "foo", "bar"));
- ASSERT_OK(Put(1, "foo2", "bar"));
- ASSERT_OK(Flush(1));
- ASSERT_OK(Put(1, "foo", "bar2"));
- ASSERT_OK(Put(1, "foo2", "bar"));
- ASSERT_OK(Flush(1));
- ASSERT_OK(Put(1, "foo", "bar3"));
- ASSERT_OK(Put(1, "foo2", "bar"));
- ASSERT_OK(Flush(1));
- ASSERT_OK(dbfull()->TEST_WaitForCompact());
- const char* io_error_msg = "close dummy error";
- std::atomic<int> close_called(0);
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
- "SpecialEnv::SStableFile::Close", [&](void* arg) {
- if (close_called.fetch_add(1) == 0) {
- Status* st = static_cast<Status*>(arg);
- *st = Status::IOError(io_error_msg);
- }
- });
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
- ASSERT_OK(dbfull()->SetOptions(handles_[1],
- {
- {"disable_auto_compactions", "false"},
- }));
- Status s = dbfull()->TEST_WaitForCompact();
- ASSERT_TRUE(s.IsIOError());
- ASSERT_STREQ(s.getState(), io_error_msg);
- // Following writes should fail as compaction failed.
- ASSERT_NOK(Put(1, "foo2", "bar3"));
- ASSERT_EQ("bar3", Get(1, "foo"));
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
- ReopenWithColumnFamilies({"default", "pikachu"}, options);
- ASSERT_EQ("bar3", Get(1, "foo"));
- }
- TEST_F(DBIOFailureTest, FlushSstSyncError) {
- Options options = CurrentOptions();
- options.env = env_;
- options.create_if_missing = true;
- options.error_if_exists = false;
- options.paranoid_checks = true;
- options.use_fsync = false;
- options.level0_file_num_compaction_trigger = 4;
- options.memtable_factory.reset(test::NewSpecialSkipListFactory(2));
- DestroyAndReopen(options);
- CreateAndReopenWithCF({"pikachu"}, options);
- const char* io_error_msg = "sync dummy error";
- std::atomic<int> sync_called(0);
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
- "SpecialEnv::SStableFile::Sync", [&](void* arg) {
- if (sync_called.fetch_add(1) == 0) {
- Status* st = static_cast<Status*>(arg);
- *st = Status::IOError(io_error_msg);
- }
- });
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
- ASSERT_OK(Put(1, "foo", "bar"));
- ASSERT_OK(Put(1, "foo1", "bar1"));
- ASSERT_OK(Put(1, "foo", "bar2"));
- Status s = dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
- ASSERT_TRUE(s.IsIOError());
- ASSERT_STREQ(s.getState(), io_error_msg);
- // Following writes should fail as flush failed.
- ASSERT_NOK(Put(1, "foo2", "bar3"));
- ASSERT_EQ("bar2", Get(1, "foo"));
- ASSERT_EQ("bar1", Get(1, "foo1"));
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
- ReopenWithColumnFamilies({"default", "pikachu"}, options);
- ASSERT_EQ("bar2", Get(1, "foo"));
- ASSERT_EQ("bar1", Get(1, "foo1"));
- }
- TEST_F(DBIOFailureTest, CompactionSstSyncError) {
- Options options = CurrentOptions();
- options.env = env_;
- options.create_if_missing = true;
- options.error_if_exists = false;
- options.paranoid_checks = true;
- options.level0_file_num_compaction_trigger = 2;
- options.disable_auto_compactions = true;
- options.use_fsync = false;
- DestroyAndReopen(options);
- CreateAndReopenWithCF({"pikachu"}, options);
- ASSERT_OK(Put(1, "foo", "bar"));
- ASSERT_OK(Put(1, "foo2", "bar"));
- ASSERT_OK(Flush(1));
- ASSERT_OK(Put(1, "foo", "bar2"));
- ASSERT_OK(Put(1, "foo2", "bar"));
- ASSERT_OK(Flush(1));
- ASSERT_OK(Put(1, "foo", "bar3"));
- ASSERT_OK(Put(1, "foo2", "bar"));
- ASSERT_OK(Flush(1));
- ASSERT_OK(dbfull()->TEST_WaitForCompact());
- const char* io_error_msg = "sync dummy error";
- std::atomic<int> sync_called(0);
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
- "SpecialEnv::SStableFile::Sync", [&](void* arg) {
- if (sync_called.fetch_add(1) == 0) {
- Status* st = static_cast<Status*>(arg);
- *st = Status::IOError(io_error_msg);
- }
- });
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
- ASSERT_OK(dbfull()->SetOptions(handles_[1],
- {
- {"disable_auto_compactions", "false"},
- }));
- Status s = dbfull()->TEST_WaitForCompact();
- ASSERT_TRUE(s.IsIOError());
- ASSERT_STREQ(s.getState(), io_error_msg);
- // Following writes should fail as compaction failed.
- ASSERT_NOK(Put(1, "foo2", "bar3"));
- ASSERT_EQ("bar3", Get(1, "foo"));
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
- ReopenWithColumnFamilies({"default", "pikachu"}, options);
- ASSERT_EQ("bar3", Get(1, "foo"));
- }
- #endif // !(defined NDEBUG) || !defined(OS_WIN)
- class DBIOCorruptionTest
- : public DBIOFailureTest,
- public testing::WithParamInterface<std::tuple<bool, bool, bool>> {
- public:
- DBIOCorruptionTest() : DBIOFailureTest() {
- BlockBasedTableOptions bbto;
- options_ = CurrentOptions();
- options_.statistics = CreateDBStatistics();
- base_env_ = env_;
- EXPECT_NE(base_env_, nullptr);
- fs_.reset(new CorruptionFS(base_env_->GetFileSystem(),
- std::get<0>(GetParam()),
- std::get<2>(GetParam())));
- env_guard_ = NewCompositeEnv(fs_);
- options_.env = env_guard_.get();
- bbto.num_file_reads_for_auto_readahead = 0;
- options_.table_factory.reset(NewBlockBasedTableFactory(bbto));
- options_.disable_auto_compactions = true;
- options_.max_file_opening_threads = 0;
- Reopen(options_);
- }
- ~DBIOCorruptionTest() {
- Close();
- db_ = nullptr;
- }
- Status ReopenDB() { return TryReopen(options_); }
- Statistics* stats() { return options_.statistics.get(); }
- protected:
- std::unique_ptr<Env> env_guard_;
- std::shared_ptr<CorruptionFS> fs_;
- Env* base_env_;
- Options options_;
- };
- TEST_P(DBIOCorruptionTest, GetReadCorruptionRetry) {
- CorruptionFS* fs =
- static_cast<CorruptionFS*>(env_guard_->GetFileSystem().get());
- ASSERT_OK(Put("key1", "val1"));
- ASSERT_OK(Flush());
- fs->SetCorruptionTrigger(1);
- std::string val;
- ReadOptions ro;
- ro.async_io = std::get<1>(GetParam());
- Status s = dbfull()->Get(ReadOptions(), "key1", &val);
- if (std::get<2>(GetParam())) {
- ASSERT_OK(s);
- ASSERT_EQ(val, "val1");
- ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_COUNT), 1);
- ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_SUCCESS_COUNT),
- 1);
- } else {
- ASSERT_TRUE(s.IsCorruption());
- ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_COUNT), 0);
- }
- }
- TEST_P(DBIOCorruptionTest, IterReadCorruptionRetry) {
- CorruptionFS* fs =
- static_cast<CorruptionFS*>(env_guard_->GetFileSystem().get());
- ASSERT_OK(Put("key1", "val1"));
- ASSERT_OK(Flush());
- fs->SetCorruptionTrigger(1);
- ReadOptions ro;
- ro.readahead_size = 65536;
- ro.async_io = std::get<1>(GetParam());
- Iterator* iter = dbfull()->NewIterator(ro);
- iter->SeekToFirst();
- while (iter->status().ok() && iter->Valid()) {
- iter->Next();
- }
- if (std::get<2>(GetParam())) {
- ASSERT_OK(iter->status());
- ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_COUNT), 1);
- ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_SUCCESS_COUNT),
- 1);
- } else {
- ASSERT_TRUE(iter->status().IsCorruption());
- ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_COUNT), 0);
- }
- delete iter;
- }
- TEST_P(DBIOCorruptionTest, MultiGetReadCorruptionRetry) {
- CorruptionFS* fs =
- static_cast<CorruptionFS*>(env_guard_->GetFileSystem().get());
- ASSERT_OK(Put("key1", "val1"));
- ASSERT_OK(Put("key2", "val2"));
- ASSERT_OK(Flush());
- fs->SetCorruptionTrigger(1);
- std::vector<std::string> keystr{"key1", "key2"};
- std::vector<Slice> keys{Slice(keystr[0]), Slice(keystr[1])};
- std::vector<PinnableSlice> values(keys.size());
- std::vector<Status> statuses(keys.size());
- ReadOptions ro;
- ro.async_io = std::get<1>(GetParam());
- dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
- keys.data(), values.data(), statuses.data());
- if (std::get<2>(GetParam())) {
- ASSERT_EQ(values[0].ToString(), "val1");
- ASSERT_EQ(values[1].ToString(), "val2");
- ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_COUNT), 1);
- ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_SUCCESS_COUNT),
- 1);
- } else {
- ASSERT_TRUE(statuses[0].IsCorruption());
- ASSERT_TRUE(statuses[1].IsCorruption());
- ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_COUNT), 0);
- }
- }
- TEST_P(DBIOCorruptionTest, CompactionReadCorruptionRetry) {
- CorruptionFS* fs =
- static_cast<CorruptionFS*>(env_guard_->GetFileSystem().get());
- ASSERT_OK(Put("key1", "val1"));
- ASSERT_OK(Put("key3", "val3"));
- ASSERT_OK(Flush());
- ASSERT_OK(Put("key2", "val2"));
- ASSERT_OK(Flush());
- fs->SetCorruptionTrigger(1);
- Status s = dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
- if (std::get<2>(GetParam())) {
- ASSERT_OK(s);
- ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_COUNT), 1);
- ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_SUCCESS_COUNT),
- 1);
- std::string val;
- ReadOptions ro;
- ro.async_io = std::get<1>(GetParam());
- ASSERT_OK(dbfull()->Get(ro, "key1", &val));
- ASSERT_EQ(val, "val1");
- } else {
- ASSERT_TRUE(s.IsCorruption());
- ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_COUNT), 0);
- }
- }
- TEST_P(DBIOCorruptionTest, FlushReadCorruptionRetry) {
- CorruptionFS* fs =
- static_cast<CorruptionFS*>(env_guard_->GetFileSystem().get());
- ASSERT_OK(Put("key1", "val1"));
- fs->SetCorruptionTrigger(1);
- Status s = Flush();
- if (std::get<2>(GetParam())) {
- ASSERT_OK(s);
- ASSERT_GT(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_COUNT), 1);
- ASSERT_GT(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_SUCCESS_COUNT),
- 1);
- std::string val;
- ReadOptions ro;
- ro.async_io = std::get<1>(GetParam());
- ASSERT_OK(dbfull()->Get(ro, "key1", &val));
- ASSERT_EQ(val, "val1");
- } else {
- ASSERT_NOK(s);
- ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_COUNT), 0);
- }
- }
- TEST_P(DBIOCorruptionTest, ManifestCorruptionRetry) {
- CorruptionFS* fs =
- static_cast<CorruptionFS*>(env_guard_->GetFileSystem().get());
- ASSERT_OK(Put("key1", "val1"));
- ASSERT_OK(Flush());
- SyncPoint::GetInstance()->SetCallBack(
- "VersionSet::Recover:StartManifestRead",
- [&](void* /*arg*/) { fs->SetCorruptionTrigger(0); });
- SyncPoint::GetInstance()->EnableProcessing();
- if (std::get<2>(GetParam())) {
- ASSERT_OK(ReopenDB());
- ASSERT_GT(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_COUNT), 1);
- ASSERT_GT(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_SUCCESS_COUNT),
- 1);
- } else {
- ASSERT_EQ(ReopenDB(), Status::Corruption());
- ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_COUNT), 0);
- }
- SyncPoint::GetInstance()->DisableProcessing();
- }
- TEST_P(DBIOCorruptionTest, FooterReadCorruptionRetry) {
- Random rnd(300);
- bool retry = false;
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
- "ReadFooterFromFileInternal:0", [&](void* arg) {
- Slice* data = static_cast<Slice*>(arg);
- if (!retry) {
- std::memcpy(const_cast<char*>(data->data()),
- rnd.RandomString(static_cast<int>(data->size())).c_str(),
- data->size());
- retry = true;
- }
- });
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
- ASSERT_OK(Put("key1", "val1"));
- Status s = Flush();
- if (std::get<2>(GetParam())) {
- ASSERT_OK(s);
- ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_COUNT), 1);
- ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_SUCCESS_COUNT),
- 1);
- std::string val;
- ReadOptions ro;
- ro.async_io = std::get<1>(GetParam());
- ASSERT_OK(dbfull()->Get(ro, "key1", &val));
- ASSERT_EQ(val, "val1");
- } else {
- ASSERT_NOK(s);
- ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_COUNT), 0);
- ASSERT_GT(stats()->getTickerCount(SST_FOOTER_CORRUPTION_COUNT), 0);
- }
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
- }
- TEST_P(DBIOCorruptionTest, TablePropertiesCorruptionRetry) {
- Random rnd(300);
- bool retry = false;
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
- "ReadTablePropertiesHelper:0", [&](void* arg) {
- Slice* data = static_cast<Slice*>(arg);
- if (!retry) {
- std::memcpy(const_cast<char*>(data->data()),
- rnd.RandomString(static_cast<int>(data->size())).c_str(),
- data->size());
- retry = true;
- }
- });
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
- ASSERT_OK(Put("key1", "val1"));
- Status s = Flush();
- if (std::get<2>(GetParam())) {
- ASSERT_OK(s);
- ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_COUNT), 1);
- ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_SUCCESS_COUNT),
- 1);
- std::string val;
- ReadOptions ro;
- ro.async_io = std::get<1>(GetParam());
- ASSERT_OK(dbfull()->Get(ro, "key1", &val));
- ASSERT_EQ(val, "val1");
- } else {
- ASSERT_NOK(s);
- ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_COUNT), 0);
- }
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
- }
- TEST_P(DBIOCorruptionTest, DBOpenReadCorruptionRetry) {
- if (!std::get<2>(GetParam())) {
- return;
- }
- CorruptionFS* fs =
- static_cast<CorruptionFS*>(env_guard_->GetFileSystem().get());
- for (int sst = 0; sst < 3; ++sst) {
- for (int key = 0; key < 100; ++key) {
- std::stringstream ss;
- ss << std::setw(3) << 100 * sst + key;
- ASSERT_OK(Put("key" + ss.str(), "val" + ss.str()));
- }
- ASSERT_OK(Flush());
- }
- Close();
- // DB open will create table readers unless we reduce the table cache
- // capacity.
- // SanitizeOptions will set max_open_files to minimum of 20. Table cache
- // is allocated with max_open_files - 10 as capacity. So override
- // max_open_files to 11 so table cache capacity will become 1. This will
- // prevent file open during DB open and force the file to be opened
- // during MultiGet
- SyncPoint::GetInstance()->SetCallBack(
- "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) {
- int* max_open_files = (int*)arg;
- *max_open_files = 11;
- });
- SyncPoint::GetInstance()->EnableProcessing();
- // Progressively increase the IO count trigger for corruption, and verify
- // that it was retried
- int corruption_trigger = 1;
- fs->SetCorruptionTrigger(corruption_trigger);
- do {
- fs->SetCorruptionTrigger(corruption_trigger);
- ASSERT_OK(ReopenDB());
- for (int sst = 0; sst < 3; ++sst) {
- for (int key = 0; key < 100; ++key) {
- std::stringstream ss;
- ss << std::setw(3) << 100 * sst + key;
- ASSERT_EQ(Get("key" + ss.str()), "val" + ss.str());
- }
- }
- // Verify that the injected corruption was repaired
- ASSERT_TRUE(fs->VerifyRetry());
- corruption_trigger++;
- } while (fs->corruption_trigger() == INT_MAX);
- }
// The test parameters are -
// 1. Use FS provided buffer
// 2. Use async IO ReadOption
// 3. Retry reads with the verify_and_reconstruct_read IOOption
INSTANTIATE_TEST_CASE_P(DBIOCorruptionTest, DBIOCorruptionTest,
                        testing::Combine(testing::Bool(), testing::Bool(),
                                         testing::Bool()));
- } // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {
  // Print a stack trace on fatal signals to aid debugging of test crashes.
  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  // Register test-only custom objects (e.g. pluggable Envs/FileSystems)
  // before running any tests.
  RegisterCustomObjects(argc, argv);
  return RUN_ALL_TESTS();
}
|