| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528 |
- // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
- // This source code is licensed under both the GPLv2 (found in the
- // COPYING file in the root directory) and Apache 2.0 License
- // (found in the LICENSE.Apache file in the root directory).
- //
- // Copyright 2014 The LevelDB Authors. All rights reserved.
- // Use of this source code is governed by a BSD-style license that can be
- // found in the LICENSE file. See the AUTHORS file for names of contributors.
- // This test uses a custom FileSystem to keep track of the state of a file
- // system the last "Sync". The data being written is cached in a "buffer".
- // Only when "Sync" is called, the data will be persistent. It can simulate
- // file data loss (or entire files) not protected by a "Sync". For any of the
- // FileSystem related operations, by specify the "IOStatus Error", a specific
- // error can be returned when file system is not activated.
- #include "utilities/fault_injection_fs.h"
- #include <algorithm>
- #include <cstdio>
- #include <functional>
- #include <utility>
- #include "env/composite_env_wrapper.h"
- #include "port/lang.h"
- #include "port/stack_trace.h"
- #include "rocksdb/env.h"
- #include "rocksdb/io_status.h"
- #include "rocksdb/types.h"
- #include "test_util/sync_point.h"
- #include "util/coding.h"
- #include "util/crc32c.h"
- #include "util/mutexlock.h"
- #include "util/random.h"
- #include "util/string_util.h"
- #include "util/xxhash.h"
- namespace ROCKSDB_NAMESPACE {
- const std::string kNewFileNoOverwrite;
- // Assume a filename, and not a directory name like "/foo/bar/"
- std::string TestFSGetDirName(const std::string filename) {
- size_t found = filename.find_last_of("/\\");
- if (found == std::string::npos) {
- return "";
- } else {
- return filename.substr(0, found);
- }
- }
- // Trim the tailing "/" in the end of `str`
- std::string TestFSTrimDirname(const std::string& str) {
- size_t found = str.find_last_not_of('/');
- if (found == std::string::npos) {
- return str;
- }
- return str.substr(0, found + 1);
- }
- // Return pair <parent directory name, file name> of a full path.
- std::pair<std::string, std::string> TestFSGetDirAndName(
- const std::string& name) {
- std::string dirname = TestFSGetDirName(name);
- std::string fname = name.substr(dirname.size() + 1);
- return std::make_pair(dirname, fname);
- }
- // Calculate the checksum of the data with corresponding checksum
- // type. If name does not match, no checksum is returned.
- void CalculateTypedChecksum(const ChecksumType& checksum_type, const char* data,
- size_t size, std::string* checksum) {
- if (checksum_type == ChecksumType::kCRC32c) {
- uint32_t v_crc32c = crc32c::Extend(0, data, size);
- PutFixed32(checksum, v_crc32c);
- return;
- } else if (checksum_type == ChecksumType::kxxHash) {
- uint32_t v = XXH32(data, size, 0);
- PutFixed32(checksum, v);
- }
- }
- IOStatus FSFileState::DropUnsyncedData() {
- buffer_.resize(0);
- return IOStatus::OK();
- }
- IOStatus FSFileState::DropRandomUnsyncedData(Random* rand) {
- int range = static_cast<int>(buffer_.size());
- size_t truncated_size = static_cast<size_t>(rand->Uniform(range));
- buffer_.resize(truncated_size);
- return IOStatus::OK();
- }
- IOStatus TestFSDirectory::Fsync(const IOOptions& options, IODebugContext* dbg) {
- if (!fs_->IsFilesystemActive()) {
- return fs_->GetError();
- }
- IOStatus s = fs_->MaybeInjectThreadLocalError(
- FaultInjectionIOType::kMetadataWrite, options);
- if (!s.ok()) {
- return s;
- }
- fs_->SyncDir(dirname_);
- s = dir_->Fsync(options, dbg);
- return s;
- }
- IOStatus TestFSDirectory::Close(const IOOptions& options, IODebugContext* dbg) {
- if (!fs_->IsFilesystemActive()) {
- return fs_->GetError();
- }
- IOStatus s = fs_->MaybeInjectThreadLocalError(
- FaultInjectionIOType::kMetadataWrite, options);
- if (!s.ok()) {
- return s;
- }
- s = dir_->Close(options, dbg);
- return s;
- }
- IOStatus TestFSDirectory::FsyncWithDirOptions(
- const IOOptions& options, IODebugContext* dbg,
- const DirFsyncOptions& dir_fsync_options) {
- if (!fs_->IsFilesystemActive()) {
- return fs_->GetError();
- }
- IOStatus s = fs_->MaybeInjectThreadLocalError(
- FaultInjectionIOType::kMetadataWrite, options);
- if (!s.ok()) {
- return s;
- }
- fs_->SyncDir(dirname_);
- s = dir_->FsyncWithDirOptions(options, dbg, dir_fsync_options);
- return s;
- }
- TestFSWritableFile::TestFSWritableFile(const std::string& fname,
- const FileOptions& file_opts,
- std::unique_ptr<FSWritableFile>&& f,
- FaultInjectionTestFS* fs)
- : state_(fname),
- file_opts_(file_opts),
- target_(std::move(f)),
- writable_file_opened_(true),
- fs_(fs),
- unsync_data_loss_(fs_->InjectUnsyncedDataLoss()) {
- assert(target_ != nullptr);
- assert(state_.pos_at_last_append_ == 0);
- assert(state_.pos_at_last_sync_ == 0);
- }
- TestFSWritableFile::~TestFSWritableFile() {
- if (writable_file_opened_) {
- Close(IOOptions(), nullptr).PermitUncheckedError();
- }
- }
- IOStatus TestFSWritableFile::Append(const Slice& data, const IOOptions& options,
- IODebugContext* dbg) {
- MutexLock l(&mutex_);
- if (!fs_->IsFilesystemActive()) {
- return fs_->GetError();
- }
- IOStatus s = fs_->MaybeInjectThreadLocalError(
- FaultInjectionIOType::kWrite, options, state_.filename_,
- FaultInjectionTestFS::ErrorOperation::kAppend);
- if (!s.ok()) {
- return s;
- }
- if (target_->use_direct_io() || !unsync_data_loss_) {
- // TODO(hx235): buffer data for direct IO write to simulate data loss like
- // non-direct IO write
- s = target_->Append(data, options, dbg);
- } else {
- state_.buffer_.append(data.data(), data.size());
- }
- if (s.ok()) {
- state_.pos_at_last_append_ += data.size();
- fs_->WritableFileAppended(state_);
- }
- return s;
- }
- // By setting the IngestDataCorruptionBeforeWrite(), the data corruption is
- // simulated.
- IOStatus TestFSWritableFile::Append(
- const Slice& data, const IOOptions& options,
- const DataVerificationInfo& verification_info, IODebugContext* dbg) {
- MutexLock l(&mutex_);
- if (!fs_->IsFilesystemActive()) {
- return fs_->GetError();
- }
- if (fs_->ShouldDataCorruptionBeforeWrite()) {
- return IOStatus::Corruption("Data is corrupted!");
- }
- IOStatus s = fs_->MaybeInjectThreadLocalError(
- FaultInjectionIOType::kWrite, options, state_.filename_,
- FaultInjectionTestFS::ErrorOperation::kAppend);
- if (!s.ok()) {
- return s;
- }
- // Calculate the checksum
- std::string checksum;
- CalculateTypedChecksum(fs_->GetChecksumHandoffFuncType(), data.data(),
- data.size(), &checksum);
- if (fs_->GetChecksumHandoffFuncType() != ChecksumType::kNoChecksum &&
- checksum != verification_info.checksum.ToString()) {
- std::string msg =
- "Data is corrupted! Origin data checksum: " +
- verification_info.checksum.ToString(true) +
- "current data checksum: " + Slice(checksum).ToString(true);
- return IOStatus::Corruption(msg);
- }
- if (target_->use_direct_io() || !unsync_data_loss_) {
- // TODO(hx235): buffer data for direct IO write to simulate data loss like
- // non-direct IO write
- s = target_->Append(data, options, dbg);
- } else {
- state_.buffer_.append(data.data(), data.size());
- }
- if (s.ok()) {
- state_.pos_at_last_append_ += data.size();
- fs_->WritableFileAppended(state_);
- }
- return s;
- }
- IOStatus TestFSWritableFile::Truncate(uint64_t size, const IOOptions& options,
- IODebugContext* dbg) {
- MutexLock l(&mutex_);
- if (!fs_->IsFilesystemActive()) {
- return fs_->GetError();
- }
- IOStatus s = fs_->MaybeInjectThreadLocalError(FaultInjectionIOType::kWrite,
- options, state_.filename_);
- if (!s.ok()) {
- return s;
- }
- s = target_->Truncate(size, options, dbg);
- if (s.ok()) {
- state_.pos_at_last_append_ = size;
- }
- return s;
- }
- IOStatus TestFSWritableFile::PositionedAppend(const Slice& data,
- uint64_t offset,
- const IOOptions& options,
- IODebugContext* dbg) {
- MutexLock l(&mutex_);
- if (!fs_->IsFilesystemActive()) {
- return fs_->GetError();
- }
- if (fs_->ShouldDataCorruptionBeforeWrite()) {
- return IOStatus::Corruption("Data is corrupted!");
- }
- IOStatus s = fs_->MaybeInjectThreadLocalError(
- FaultInjectionIOType::kWrite, options, state_.filename_,
- FaultInjectionTestFS::ErrorOperation::kPositionedAppend);
- if (!s.ok()) {
- return s;
- }
- // TODO(hx235): buffer data for direct IO write to simulate data loss like
- // non-direct IO write
- s = target_->PositionedAppend(data, offset, options, dbg);
- if (s.ok()) {
- state_.pos_at_last_append_ = offset + data.size();
- fs_->WritableFileAppended(state_);
- }
- return s;
- }
- IOStatus TestFSWritableFile::PositionedAppend(
- const Slice& data, uint64_t offset, const IOOptions& options,
- const DataVerificationInfo& verification_info, IODebugContext* dbg) {
- MutexLock l(&mutex_);
- if (!fs_->IsFilesystemActive()) {
- return fs_->GetError();
- }
- if (fs_->ShouldDataCorruptionBeforeWrite()) {
- return IOStatus::Corruption("Data is corrupted!");
- }
- IOStatus s = fs_->MaybeInjectThreadLocalError(
- FaultInjectionIOType::kWrite, options, state_.filename_,
- FaultInjectionTestFS::ErrorOperation::kPositionedAppend);
- if (!s.ok()) {
- return s;
- }
- // Calculate the checksum
- std::string checksum;
- CalculateTypedChecksum(fs_->GetChecksumHandoffFuncType(), data.data(),
- data.size(), &checksum);
- if (fs_->GetChecksumHandoffFuncType() != ChecksumType::kNoChecksum &&
- checksum != verification_info.checksum.ToString()) {
- std::string msg =
- "Data is corrupted! Origin data checksum: " +
- verification_info.checksum.ToString(true) +
- "current data checksum: " + Slice(checksum).ToString(true);
- return IOStatus::Corruption(msg);
- }
- // TODO(hx235): buffer data for direct IO write to simulate data loss like
- // non-direct IO write
- s = target_->PositionedAppend(data, offset, options, dbg);
- if (s.ok()) {
- state_.pos_at_last_append_ = offset + data.size();
- fs_->WritableFileAppended(state_);
- }
- return s;
- }
- IOStatus TestFSWritableFile::Close(const IOOptions& options,
- IODebugContext* dbg) {
- MutexLock l(&mutex_);
- fs_->WritableFileClosed(state_);
- if (!fs_->IsFilesystemActive()) {
- return fs_->GetError();
- }
- IOStatus io_s = fs_->MaybeInjectThreadLocalError(
- FaultInjectionIOType::kMetadataWrite, options);
- if (!io_s.ok()) {
- return io_s;
- }
- writable_file_opened_ = false;
- // Drop buffered data that was never synced because close is not a syncing
- // mechanism in POSIX file semantics.
- state_.buffer_.resize(0);
- io_s = target_->Close(options, dbg);
- return io_s;
- }
- IOStatus TestFSWritableFile::Flush(const IOOptions&, IODebugContext*) {
- MutexLock l(&mutex_);
- if (!fs_->IsFilesystemActive()) {
- return fs_->GetError();
- }
- return IOStatus::OK();
- }
- IOStatus TestFSWritableFile::Sync(const IOOptions& options,
- IODebugContext* dbg) {
- MutexLock l(&mutex_);
- if (!fs_->IsFilesystemActive()) {
- return fs_->GetError();
- }
- if (target_->use_direct_io()) {
- // For Direct IO mode, we don't buffer anything in TestFSWritableFile.
- // So just return
- return IOStatus::OK();
- }
- IOStatus io_s = target_->Append(state_.buffer_, options, dbg);
- state_.buffer_.resize(0);
- // Ignore sync errors
- target_->Sync(options, dbg).PermitUncheckedError();
- state_.pos_at_last_sync_ = state_.pos_at_last_append_;
- fs_->WritableFileSynced(state_);
- return io_s;
- }
- IOStatus TestFSWritableFile::RangeSync(uint64_t offset, uint64_t nbytes,
- const IOOptions& options,
- IODebugContext* dbg) {
- MutexLock l(&mutex_);
- if (!fs_->IsFilesystemActive()) {
- return fs_->GetError();
- }
- // Assumes caller passes consecutive byte ranges.
- uint64_t sync_limit = offset + nbytes;
- IOStatus io_s;
- if (sync_limit < state_.pos_at_last_sync_) {
- return io_s;
- }
- uint64_t num_to_sync = std::min(static_cast<uint64_t>(state_.buffer_.size()),
- sync_limit - state_.pos_at_last_sync_);
- Slice buf_to_sync(state_.buffer_.data(), num_to_sync);
- io_s = target_->Append(buf_to_sync, options, dbg);
- state_.buffer_ = state_.buffer_.substr(num_to_sync);
- // Ignore sync errors
- target_->RangeSync(offset, nbytes, options, dbg).PermitUncheckedError();
- state_.pos_at_last_sync_ = offset + num_to_sync;
- fs_->WritableFileSynced(state_);
- return io_s;
- }
- TestFSRandomRWFile::TestFSRandomRWFile(const std::string& fname,
- std::unique_ptr<FSRandomRWFile>&& f,
- FaultInjectionTestFS* fs)
- : fname_(fname), target_(std::move(f)), file_opened_(true), fs_(fs) {
- assert(target_ != nullptr);
- }
- TestFSRandomRWFile::~TestFSRandomRWFile() {
- if (file_opened_) {
- Close(IOOptions(), nullptr).PermitUncheckedError();
- }
- }
- IOStatus TestFSRandomRWFile::Write(uint64_t offset, const Slice& data,
- const IOOptions& options,
- IODebugContext* dbg) {
- if (!fs_->IsFilesystemActive()) {
- return fs_->GetError();
- }
- return target_->Write(offset, data, options, dbg);
- }
- IOStatus TestFSRandomRWFile::Read(uint64_t offset, size_t n,
- const IOOptions& options, Slice* result,
- char* scratch, IODebugContext* dbg) const {
- if (!fs_->IsFilesystemActive()) {
- return fs_->GetError();
- }
- // TODO (low priority): fs_->ReadUnsyncedData()
- return target_->Read(offset, n, options, result, scratch, dbg);
- }
- IOStatus TestFSRandomRWFile::Close(const IOOptions& options,
- IODebugContext* dbg) {
- fs_->RandomRWFileClosed(fname_);
- if (!fs_->IsFilesystemActive()) {
- return fs_->GetError();
- }
- file_opened_ = false;
- return target_->Close(options, dbg);
- }
- IOStatus TestFSRandomRWFile::Flush(const IOOptions& options,
- IODebugContext* dbg) {
- if (!fs_->IsFilesystemActive()) {
- return fs_->GetError();
- }
- return target_->Flush(options, dbg);
- }
- IOStatus TestFSRandomRWFile::Sync(const IOOptions& options,
- IODebugContext* dbg) {
- if (!fs_->IsFilesystemActive()) {
- return fs_->GetError();
- }
- return target_->Sync(options, dbg);
- }
- TestFSRandomAccessFile::TestFSRandomAccessFile(
- const std::string& fname, std::unique_ptr<FSRandomAccessFile>&& f,
- FaultInjectionTestFS* fs)
- : target_(std::move(f)), fs_(fs), is_sst_(EndsWith(fname, ".sst")) {
- assert(target_ != nullptr);
- }
- IOStatus TestFSRandomAccessFile::Read(uint64_t offset, size_t n,
- const IOOptions& options, Slice* result,
- char* scratch,
- IODebugContext* dbg) const {
- TEST_SYNC_POINT("FaultInjectionTestFS::RandomRead");
- if (!fs_->IsFilesystemActive()) {
- return fs_->GetError();
- }
- IOStatus s = fs_->MaybeInjectThreadLocalError(
- FaultInjectionIOType::kRead, options, "",
- FaultInjectionTestFS::ErrorOperation::kRead, result, use_direct_io(),
- scratch, /*need_count_increase=*/true,
- /*fault_injected=*/nullptr);
- if (!s.ok()) {
- return s;
- }
- s = target_->Read(offset, n, options, result, scratch, dbg);
- // TODO (low priority): fs_->ReadUnsyncedData()
- return s;
- }
- IOStatus TestFSRandomAccessFile::ReadAsync(
- FSReadRequest& req, const IOOptions& opts,
- std::function<void(FSReadRequest&, void*)> cb, void* cb_arg,
- void** io_handle, IOHandleDeleter* del_fn, IODebugContext* /*dbg*/) {
- IOStatus res_status;
- FSReadRequest res;
- IOStatus s;
- if (!fs_->IsFilesystemActive()) {
- res_status = fs_->GetError();
- }
- if (res_status.ok()) {
- res_status = fs_->MaybeInjectThreadLocalError(
- FaultInjectionIOType::kRead, opts, "",
- FaultInjectionTestFS::ErrorOperation::kRead, &res.result,
- use_direct_io(), req.scratch, /*need_count_increase=*/true,
- /*fault_injected=*/nullptr);
- }
- if (res_status.ok()) {
- s = target_->ReadAsync(req, opts, cb, cb_arg, io_handle, del_fn, nullptr);
- // TODO (low priority): fs_->ReadUnsyncedData()
- } else {
- // If there's no injected error, then cb will be called asynchronously when
- // target_ actually finishes the read. But if there's an injected error, it
- // needs to immediately call cb(res, cb_arg) s since target_->ReadAsync()
- // isn't invoked at all.
- res.status = res_status;
- cb(res, cb_arg);
- }
- // We return ReadAsync()'s status intead of injected error status here since
- // the return status is not supposed to be the status of the actual IO (i.e,
- // the actual async read). The actual status of the IO will be passed to cb()
- // callback upon the actual read finishes or like above when injected error
- // happens.
- return s;
- }
- IOStatus TestFSRandomAccessFile::MultiRead(FSReadRequest* reqs, size_t num_reqs,
- const IOOptions& options,
- IODebugContext* dbg) {
- if (!fs_->IsFilesystemActive()) {
- return fs_->GetError();
- }
- IOStatus s = target_->MultiRead(reqs, num_reqs, options, dbg);
- // TODO (low priority): fs_->ReadUnsyncedData()
- bool injected_error = false;
- for (size_t i = 0; i < num_reqs; i++) {
- if (!reqs[i].status.ok()) {
- // Already seeing an error.
- break;
- }
- bool this_injected_error;
- reqs[i].status = fs_->MaybeInjectThreadLocalError(
- FaultInjectionIOType::kRead, options, "",
- FaultInjectionTestFS::ErrorOperation::kRead, &(reqs[i].result),
- use_direct_io(), reqs[i].scratch,
- /*need_count_increase=*/true,
- /*fault_injected=*/&this_injected_error);
- injected_error |= this_injected_error;
- }
- if (s.ok()) {
- s = fs_->MaybeInjectThreadLocalError(
- FaultInjectionIOType::kRead, options, "",
- FaultInjectionTestFS::ErrorOperation::kMultiRead, nullptr,
- use_direct_io(), nullptr, /*need_count_increase=*/!injected_error,
- /*fault_injected=*/nullptr);
- }
- return s;
- }
- size_t TestFSRandomAccessFile::GetUniqueId(char* id, size_t max_size) const {
- if (fs_->ShouldFailGetUniqueId()) {
- return 0;
- } else {
- return target_->GetUniqueId(id, max_size);
- }
- }
- IOStatus TestFSRandomAccessFile::GetFileSize(uint64_t* file_size) {
- if (is_sst_ && fs_->ShouldFailRandomAccessGetFileSizeSst()) {
- return IOStatus::IOError("FSRandomAccessFile::GetFileSize failed");
- } else {
- return target_->GetFileSize(file_size);
- }
- }
- namespace {
- // Modifies `result` to start at the beginning of `scratch` if not already,
- // copying data there if needed.
- void MoveToScratchIfNeeded(Slice* result, char* scratch) {
- if (result->data() != scratch) {
- // NOTE: might overlap, where result is later in scratch
- std::copy(result->data(), result->data() + result->size(), scratch);
- *result = Slice(scratch, result->size());
- }
- }
- } // namespace
- void FaultInjectionTestFS::ReadUnsynced(const std::string& fname,
- uint64_t offset, size_t n,
- Slice* result, char* scratch,
- int64_t* pos_at_last_sync) {
- *result = Slice(scratch, 0); // default empty result
- assert(*pos_at_last_sync == -1); // default "unknown"
- MutexLock l(&mutex_);
- auto it = db_file_state_.find(fname);
- if (it != db_file_state_.end()) {
- auto& st = it->second;
- *pos_at_last_sync = static_cast<int64_t>(st.pos_at_last_sync_);
- // Find overlap between [offset, offset + n) and
- // [*pos_at_last_sync, *pos_at_last_sync + st.buffer_.size())
- int64_t begin = std::max(static_cast<int64_t>(offset), *pos_at_last_sync);
- int64_t end =
- std::min(static_cast<int64_t>(offset + n),
- *pos_at_last_sync + static_cast<int64_t>(st.buffer_.size()));
- // Copy and return overlap if there is any
- if (begin < end) {
- size_t offset_in_buffer = static_cast<size_t>(begin - *pos_at_last_sync);
- size_t offset_in_scratch = static_cast<size_t>(begin - offset);
- std::copy_n(st.buffer_.data() + offset_in_buffer, end - begin,
- scratch + offset_in_scratch);
- *result = Slice(scratch + offset_in_scratch, end - begin);
- }
- }
- }
- IOStatus TestFSSequentialFile::Read(size_t n, const IOOptions& options,
- Slice* result, char* scratch,
- IODebugContext* dbg) {
- IOStatus s = fs_->MaybeInjectThreadLocalError(
- FaultInjectionIOType::kRead, options, "",
- FaultInjectionTestFS::ErrorOperation::kRead, result, use_direct_io(),
- scratch, true /*need_count_increase=*/, nullptr /* fault_injected*/);
- if (!s.ok()) {
- return s;
- }
- // Some complex logic is needed to deal with concurrent write to the same
- // file, while keeping good performance (e.g. not holding FS mutex during
- // I/O op), especially in common cases.
- if (read_pos_ == target_read_pos_) {
- // Normal case: start by reading from underlying file
- s = target()->Read(n, options, result, scratch, dbg);
- if (!s.ok()) {
- return s;
- }
- target_read_pos_ += result->size();
- } else {
- // We must have previously read buffered data (unsynced) not written to
- // target. Deal with this case (and more) below.
- *result = {};
- }
- if (fs_->ReadUnsyncedData() && result->size() < n) {
- // We need to check if there's unsynced data to fill out the rest of the
- // read.
- // First, ensure target read data is in scratch for easy handling.
- MoveToScratchIfNeeded(result, scratch);
- assert(result->data() == scratch);
- // If we just did a target Read, we only want unsynced data after it
- // (target_read_pos_). Otherwise (e.g. if target is behind because of
- // unsynced data) we want unsynced data starting at the current read pos
- // (read_pos_, not yet updated).
- const uint64_t unsynced_read_pos = std::max(target_read_pos_, read_pos_);
- const size_t offset_from_read_pos =
- static_cast<size_t>(unsynced_read_pos - read_pos_);
- Slice unsynced_result;
- int64_t pos_at_last_sync = -1;
- fs_->ReadUnsynced(fname_, unsynced_read_pos, n - offset_from_read_pos,
- &unsynced_result, scratch + offset_from_read_pos,
- &pos_at_last_sync);
- assert(unsynced_result.data() >= scratch + offset_from_read_pos);
- assert(unsynced_result.data() < scratch + n);
- // Now, there are several cases to consider (some grouped together):
- if (pos_at_last_sync <= static_cast<int64_t>(unsynced_read_pos)) {
- // 1. We didn't get any unsynced data because nothing has been written
- // to the file beyond unsynced_read_pos (including untracked
- // pos_at_last_sync == -1)
- // 2. We got some unsynced data starting at unsynced_read_pos (possibly
- // on top of some synced data from target). We don't need to try reading
- // any more from target because we established a "point in time" for
- // completing this Read in which we read as much tail data (unsynced) as
- // we could.
- // We got pos_at_last_sync info if we got any unsynced data.
- assert(pos_at_last_sync >= 0 || unsynced_result.size() == 0);
- // Combined data is already lined up in scratch.
- assert(result->data() + result->size() == unsynced_result.data());
- assert(result->size() + unsynced_result.size() <= n);
- // Combine results
- *result = Slice(result->data(), result->size() + unsynced_result.size());
- } else {
- // 3. Any unsynced data we got was after unsynced_read_pos because the
- // file was synced some time since our last target Read (either from this
- // Read or a prior Read). We need to read more data from target to ensure
- // this Read is filled out, even though we might have already read some
- // (but not all due to a race). This code handles:
- //
- // * Catching up target after prior read(s) of unsynced data
- // * Racing Sync in another thread since we called target Read above
- //
- // And merging potentially three results together for this Read:
- // * The original target Read above
- // * The following (non-throw-away) target Read
- // * The ReadUnsynced above, which is always last if it returned data,
- // so that we have a "point in time" for completing this Read in which we
- // read as much tail data (unsynced) as we could.
- //
- // Deeper note about the race: we cannot just treat the original target
- // Read as a "point in time" view of available data in the file, because
- // there might have been unsynced data at that time, which became synced
- // data by the time we read unsynced data. That is the race we are
- // resolving with this "double check"-style code.
- const size_t supplemental_read_pos = unsynced_read_pos;
- // First, if there's any data from target that we know we would need to
- // throw away to catch up, try to do it.
- if (target_read_pos_ < supplemental_read_pos) {
- Slice throw_away_result;
- size_t throw_away_n = supplemental_read_pos - target_read_pos_;
- std::unique_ptr<char[]> throw_away_scratch{new char[throw_away_n]};
- s = target()->Read(throw_away_n, options, &throw_away_result,
- throw_away_scratch.get(), dbg);
- if (!s.ok()) {
- read_pos_ += result->size();
- return s;
- }
- target_read_pos_ += throw_away_result.size();
- if (target_read_pos_ < supplemental_read_pos) {
- // Because of pos_at_last_sync > supplemental_read_pos, we should
- // have been able to catch up
- read_pos_ += result->size();
- return IOStatus::IOError(
- "Unexpected truncation or short read of file " + fname_);
- }
- }
- // Now we can do a productive supplemental Read from target
- assert(target_read_pos_ == supplemental_read_pos);
- Slice supplemental_result;
- size_t supplemental_n =
- unsynced_result.size() == 0
- ? n - offset_from_read_pos
- : unsynced_result.data() - (scratch + offset_from_read_pos);
- s = target()->Read(supplemental_n, options, &supplemental_result,
- scratch + offset_from_read_pos, dbg);
- if (!s.ok()) {
- read_pos_ += result->size();
- return s;
- }
- target_read_pos_ += supplemental_result.size();
- MoveToScratchIfNeeded(&supplemental_result,
- scratch + offset_from_read_pos);
- // Combined data is already lined up in scratch.
- assert(result->data() + result->size() == supplemental_result.data());
- assert(unsynced_result.size() == 0 ||
- supplemental_result.data() + supplemental_result.size() ==
- unsynced_result.data());
- assert(result->size() + supplemental_result.size() +
- unsynced_result.size() <=
- n);
- // Combine results
- *result =
- Slice(result->data(), result->size() + supplemental_result.size() +
- unsynced_result.size());
- }
- }
- read_pos_ += result->size();
- return s;
- }
- IOStatus TestFSSequentialFile::PositionedRead(uint64_t offset, size_t n,
- const IOOptions& options,
- Slice* result, char* scratch,
- IODebugContext* dbg) {
- IOStatus s = fs_->MaybeInjectThreadLocalError(
- FaultInjectionIOType::kRead, options, "",
- FaultInjectionTestFS::ErrorOperation::kRead, result, use_direct_io(),
- scratch, true /*need_count_increase=*/, nullptr /* fault_injected */);
- if (!s.ok()) {
- return s;
- }
- s = target()->PositionedRead(offset, n, options, result, scratch, dbg);
- // TODO (low priority): fs_->ReadUnsyncedData()
- return s;
- }
- IOStatus FaultInjectionTestFS::NewDirectory(
- const std::string& name, const IOOptions& options,
- std::unique_ptr<FSDirectory>* result, IODebugContext* dbg) {
- std::unique_ptr<FSDirectory> r;
- IOStatus io_s = target()->NewDirectory(name, options, &r, dbg);
- if (!io_s.ok()) {
- return io_s;
- }
- result->reset(
- new TestFSDirectory(this, TestFSTrimDirname(name), r.release()));
- return IOStatus::OK();
- }
- IOStatus FaultInjectionTestFS::FileExists(const std::string& fname,
- const IOOptions& options,
- IODebugContext* dbg) {
- if (!IsFilesystemActive()) {
- return GetError();
- }
- IOStatus io_s =
- MaybeInjectThreadLocalError(FaultInjectionIOType::kMetadataRead, options);
- if (!io_s.ok()) {
- return io_s;
- }
- io_s = target()->FileExists(fname, options, dbg);
- return io_s;
- }
- IOStatus FaultInjectionTestFS::GetChildren(const std::string& dir,
- const IOOptions& options,
- std::vector<std::string>* result,
- IODebugContext* dbg) {
- if (!IsFilesystemActive()) {
- return GetError();
- }
- IOStatus io_s =
- MaybeInjectThreadLocalError(FaultInjectionIOType::kMetadataRead, options);
- if (!io_s.ok()) {
- return io_s;
- }
- io_s = target()->GetChildren(dir, options, result, dbg);
- return io_s;
- }
- IOStatus FaultInjectionTestFS::GetChildrenFileAttributes(
- const std::string& dir, const IOOptions& options,
- std::vector<FileAttributes>* result, IODebugContext* dbg) {
- if (!IsFilesystemActive()) {
- return GetError();
- }
- IOStatus io_s =
- MaybeInjectThreadLocalError(FaultInjectionIOType::kMetadataRead, options);
- if (!io_s.ok()) {
- return io_s;
- }
- io_s = target()->GetChildrenFileAttributes(dir, options, result, dbg);
- return io_s;
- }
- IOStatus FaultInjectionTestFS::NewWritableFile(
- const std::string& fname, const FileOptions& file_opts,
- std::unique_ptr<FSWritableFile>* result, IODebugContext* dbg) {
- if (!IsFilesystemActive()) {
- return GetError();
- }
- if (IsFilesystemDirectWritable()) {
- return target()->NewWritableFile(fname, file_opts, result, dbg);
- }
- IOStatus io_s = MaybeInjectThreadLocalError(
- FaultInjectionIOType::kWrite, file_opts.io_options, fname,
- FaultInjectionTestFS::ErrorOperation::kOpen);
- if (!io_s.ok()) {
- return io_s;
- }
- io_s = target()->NewWritableFile(fname, file_opts, result, dbg);
- if (io_s.ok()) {
- result->reset(
- new TestFSWritableFile(fname, file_opts, std::move(*result), this));
- // WritableFileWriter* file is opened
- // again then it will be truncated - so forget our saved state.
- UntrackFile(fname);
- {
- MutexLock l(&mutex_);
- open_managed_files_.insert(fname);
- auto dir_and_name = TestFSGetDirAndName(fname);
- auto& list = dir_to_new_files_since_last_sync_[dir_and_name.first];
- // The new file could overwrite an old one. Here we simplify
- // the implementation by assuming no file of this name after
- // dropping unsynced files.
- list[dir_and_name.second] = kNewFileNoOverwrite;
- }
- }
- return io_s;
- }
- IOStatus FaultInjectionTestFS::ReopenWritableFile(
- const std::string& fname, const FileOptions& file_opts,
- std::unique_ptr<FSWritableFile>* result, IODebugContext* dbg) {
- if (!IsFilesystemActive()) {
- return GetError();
- }
- if (IsFilesystemDirectWritable()) {
- return target()->ReopenWritableFile(fname, file_opts, result, dbg);
- }
- IOStatus io_s = MaybeInjectThreadLocalError(FaultInjectionIOType::kWrite,
- file_opts.io_options, fname);
- if (!io_s.ok()) {
- return io_s;
- }
- bool exists;
- IOStatus exists_s =
- target()->FileExists(fname, IOOptions(), nullptr /* dbg */);
- if (exists_s.IsNotFound()) {
- exists = false;
- } else if (exists_s.ok()) {
- exists = true;
- } else {
- io_s = exists_s;
- exists = false;
- }
- if (!io_s.ok()) {
- return io_s;
- }
- io_s = target()->ReopenWritableFile(fname, file_opts, result, dbg);
- // Only track files we created. Files created outside of this
- // `FaultInjectionTestFS` are not eligible for tracking/data dropping
- // (for example, they may contain data a previous db_stress run expects to
- // be recovered). This could be extended to track/drop data appended once
- // the file is under `FaultInjectionTestFS`'s control.
- if (io_s.ok()) {
- bool should_track;
- {
- MutexLock l(&mutex_);
- if (db_file_state_.find(fname) != db_file_state_.end()) {
- // It was written by this `FileSystem` earlier.
- assert(exists);
- should_track = true;
- } else if (!exists) {
- // It was created by this `FileSystem` just now.
- should_track = true;
- open_managed_files_.insert(fname);
- auto dir_and_name = TestFSGetDirAndName(fname);
- auto& list = dir_to_new_files_since_last_sync_[dir_and_name.first];
- list[dir_and_name.second] = kNewFileNoOverwrite;
- } else {
- should_track = false;
- }
- }
- if (should_track) {
- result->reset(
- new TestFSWritableFile(fname, file_opts, std::move(*result), this));
- }
- }
- return io_s;
- }
- IOStatus FaultInjectionTestFS::ReuseWritableFile(
- const std::string& fname, const std::string& old_fname,
- const FileOptions& file_opts, std::unique_ptr<FSWritableFile>* result,
- IODebugContext* dbg) {
- IOStatus s = RenameFile(old_fname, fname, file_opts.io_options, dbg);
- if (!s.ok()) {
- return s;
- }
- return NewWritableFile(fname, file_opts, result, dbg);
- }
- IOStatus FaultInjectionTestFS::NewRandomRWFile(
- const std::string& fname, const FileOptions& file_opts,
- std::unique_ptr<FSRandomRWFile>* result, IODebugContext* dbg) {
- if (!IsFilesystemActive()) {
- return GetError();
- }
- if (IsFilesystemDirectWritable()) {
- return target()->NewRandomRWFile(fname, file_opts, result, dbg);
- }
- IOStatus io_s = MaybeInjectThreadLocalError(FaultInjectionIOType::kWrite,
- file_opts.io_options, fname);
- if (!io_s.ok()) {
- return io_s;
- }
- io_s = target()->NewRandomRWFile(fname, file_opts, result, dbg);
- if (io_s.ok()) {
- result->reset(new TestFSRandomRWFile(fname, std::move(*result), this));
- // WritableFileWriter* file is opened
- // again then it will be truncated - so forget our saved state.
- UntrackFile(fname);
- {
- MutexLock l(&mutex_);
- open_managed_files_.insert(fname);
- auto dir_and_name = TestFSGetDirAndName(fname);
- auto& list = dir_to_new_files_since_last_sync_[dir_and_name.first];
- // It could be overwriting an old file, but we simplify the
- // implementation by ignoring it.
- list[dir_and_name.second] = kNewFileNoOverwrite;
- }
- }
- return io_s;
- }
- IOStatus FaultInjectionTestFS::NewRandomAccessFile(
- const std::string& fname, const FileOptions& file_opts,
- std::unique_ptr<FSRandomAccessFile>* result, IODebugContext* dbg) {
- if (!IsFilesystemActive()) {
- return GetError();
- }
- IOStatus io_s = MaybeInjectThreadLocalError(
- FaultInjectionIOType::kRead, file_opts.io_options, fname,
- ErrorOperation::kOpen, nullptr /* result */, false /* direct_io */,
- nullptr /* scratch */, true /*need_count_increase*/,
- nullptr /*fault_injected*/);
- if (!io_s.ok()) {
- return io_s;
- }
- io_s = target()->NewRandomAccessFile(fname, file_opts, result, dbg);
- if (io_s.ok()) {
- result->reset(new TestFSRandomAccessFile(fname, std::move(*result), this));
- }
- return io_s;
- }
- IOStatus FaultInjectionTestFS::NewSequentialFile(
- const std::string& fname, const FileOptions& file_opts,
- std::unique_ptr<FSSequentialFile>* result, IODebugContext* dbg) {
- if (!IsFilesystemActive()) {
- return GetError();
- }
- IOStatus io_s = MaybeInjectThreadLocalError(
- FaultInjectionIOType::kRead, file_opts.io_options, fname,
- ErrorOperation::kOpen, nullptr /* result */, false /* direct_io */,
- nullptr /* scratch */, true /*need_count_increase*/,
- nullptr /*fault_injected*/);
- if (!io_s.ok()) {
- return io_s;
- }
- io_s = target()->NewSequentialFile(fname, file_opts, result, dbg);
- if (io_s.ok()) {
- result->reset(new TestFSSequentialFile(std::move(*result), this, fname));
- }
- return io_s;
- }
- IOStatus FaultInjectionTestFS::DeleteFile(const std::string& f,
- const IOOptions& options,
- IODebugContext* dbg) {
- if (!IsFilesystemActive()) {
- return GetError();
- }
- IOStatus io_s = MaybeInjectThreadLocalError(
- FaultInjectionIOType::kMetadataWrite, options);
- if (!io_s.ok()) {
- return io_s;
- }
- io_s = FileSystemWrapper::DeleteFile(f, options, dbg);
- if (io_s.ok()) {
- UntrackFile(f);
- }
- return io_s;
- }
- IOStatus FaultInjectionTestFS::GetFileSize(const std::string& f,
- const IOOptions& options,
- uint64_t* file_size,
- IODebugContext* dbg) {
- if (EndsWith(f, ".sst") && ShouldFailFilesystemGetFileSizeSst()) {
- return IOStatus::IOError("FileSystem::GetFileSize failed");
- }
- if (!IsFilesystemActive()) {
- return GetError();
- }
- IOStatus io_s =
- MaybeInjectThreadLocalError(FaultInjectionIOType::kMetadataRead, options);
- if (!io_s.ok()) {
- return io_s;
- }
- io_s = target()->GetFileSize(f, options, file_size, dbg);
- if (!io_s.ok()) {
- return io_s;
- }
- if (ReadUnsyncedData()) {
- // Need to report flushed size, not synced size
- MutexLock l(&mutex_);
- auto it = db_file_state_.find(f);
- if (it != db_file_state_.end()) {
- *file_size = it->second.pos_at_last_append_;
- }
- }
- return io_s;
- }
- IOStatus FaultInjectionTestFS::GetFileModificationTime(const std::string& fname,
- const IOOptions& options,
- uint64_t* file_mtime,
- IODebugContext* dbg) {
- if (!IsFilesystemActive()) {
- return GetError();
- }
- IOStatus io_s =
- MaybeInjectThreadLocalError(FaultInjectionIOType::kMetadataRead, options);
- if (!io_s.ok()) {
- return io_s;
- }
- io_s = target()->GetFileModificationTime(fname, options, file_mtime, dbg);
- return io_s;
- }
- IOStatus FaultInjectionTestFS::RenameFile(const std::string& s,
- const std::string& t,
- const IOOptions& options,
- IODebugContext* dbg) {
- if (!IsFilesystemActive()) {
- return GetError();
- }
- IOStatus io_s = MaybeInjectThreadLocalError(
- FaultInjectionIOType::kMetadataWrite, options);
- if (!io_s.ok()) {
- return io_s;
- }
- // We preserve contents of overwritten files up to a size threshold.
- // We could keep previous file in another name, but we need to worry about
- // garbage collect the those files. We do it if it is needed later.
- // We ignore I/O errors here for simplicity.
- std::string previous_contents = kNewFileNoOverwrite;
- if (target()->FileExists(t, IOOptions(), nullptr).ok()) {
- uint64_t file_size;
- if (target()->GetFileSize(t, IOOptions(), &file_size, nullptr).ok() &&
- file_size < 1024) {
- ReadFileToString(target(), t, &previous_contents).PermitUncheckedError();
- }
- }
- io_s = FileSystemWrapper::RenameFile(s, t, options, dbg);
- if (io_s.ok()) {
- {
- MutexLock l(&mutex_);
- if (db_file_state_.find(s) != db_file_state_.end()) {
- db_file_state_[t] = db_file_state_[s];
- db_file_state_.erase(s);
- }
- auto sdn = TestFSGetDirAndName(s);
- auto tdn = TestFSGetDirAndName(t);
- if (dir_to_new_files_since_last_sync_[sdn.first].erase(sdn.second) != 0) {
- auto& tlist = dir_to_new_files_since_last_sync_[tdn.first];
- tlist[tdn.second] = previous_contents;
- }
- }
- }
- return io_s;
- }
- IOStatus FaultInjectionTestFS::LinkFile(const std::string& s,
- const std::string& t,
- const IOOptions& options,
- IODebugContext* dbg) {
- if (!IsFilesystemActive()) {
- return GetError();
- }
- IOStatus io_s = MaybeInjectThreadLocalError(
- FaultInjectionIOType::kMetadataWrite, options);
- if (!io_s.ok()) {
- return io_s;
- }
- // Using the value in `dir_to_new_files_since_last_sync_` for the source file
- // may be a more reasonable choice.
- std::string previous_contents = kNewFileNoOverwrite;
- io_s = FileSystemWrapper::LinkFile(s, t, options, dbg);
- if (io_s.ok()) {
- {
- MutexLock l(&mutex_);
- if (!allow_link_open_file_ &&
- open_managed_files_.find(s) != open_managed_files_.end()) {
- fprintf(stderr, "Attempt to LinkFile while open for write: %s\n",
- s.c_str());
- abort();
- }
- if (db_file_state_.find(s) != db_file_state_.end()) {
- db_file_state_[t] = db_file_state_[s];
- }
- auto sdn = TestFSGetDirAndName(s);
- auto tdn = TestFSGetDirAndName(t);
- if (dir_to_new_files_since_last_sync_[sdn.first].find(sdn.second) !=
- dir_to_new_files_since_last_sync_[sdn.first].end()) {
- auto& tlist = dir_to_new_files_since_last_sync_[tdn.first];
- assert(tlist.find(tdn.second) == tlist.end());
- tlist[tdn.second] = previous_contents;
- }
- }
- }
- return io_s;
- }
- IOStatus FaultInjectionTestFS::NumFileLinks(const std::string& fname,
- const IOOptions& options,
- uint64_t* count,
- IODebugContext* dbg) {
- if (!IsFilesystemActive()) {
- return GetError();
- }
- IOStatus io_s =
- MaybeInjectThreadLocalError(FaultInjectionIOType::kMetadataRead, options);
- if (!io_s.ok()) {
- return io_s;
- }
- io_s = target()->NumFileLinks(fname, options, count, dbg);
- return io_s;
- }
- IOStatus FaultInjectionTestFS::AreFilesSame(const std::string& first,
- const std::string& second,
- const IOOptions& options, bool* res,
- IODebugContext* dbg) {
- if (!IsFilesystemActive()) {
- return GetError();
- }
- IOStatus io_s =
- MaybeInjectThreadLocalError(FaultInjectionIOType::kMetadataRead, options);
- if (!io_s.ok()) {
- return io_s;
- }
- io_s = target()->AreFilesSame(first, second, options, res, dbg);
- return io_s;
- }
- IOStatus FaultInjectionTestFS::GetAbsolutePath(const std::string& db_path,
- const IOOptions& options,
- std::string* output_path,
- IODebugContext* dbg) {
- if (!IsFilesystemActive()) {
- return GetError();
- }
- IOStatus io_s =
- MaybeInjectThreadLocalError(FaultInjectionIOType::kMetadataRead, options);
- if (!io_s.ok()) {
- return io_s;
- }
- io_s = target()->GetAbsolutePath(db_path, options, output_path, dbg);
- return io_s;
- }
- IOStatus FaultInjectionTestFS::IsDirectory(const std::string& path,
- const IOOptions& options,
- bool* is_dir, IODebugContext* dgb) {
- if (!IsFilesystemActive()) {
- return GetError();
- }
- IOStatus io_s =
- MaybeInjectThreadLocalError(FaultInjectionIOType::kMetadataRead, options);
- if (!io_s.ok()) {
- return io_s;
- }
- io_s = target()->IsDirectory(path, options, is_dir, dgb);
- return io_s;
- }
- IOStatus FaultInjectionTestFS::Poll(std::vector<void*>& io_handles,
- size_t min_completions) {
- return target()->Poll(io_handles, min_completions);
- }
- IOStatus FaultInjectionTestFS::AbortIO(std::vector<void*>& io_handles) {
- return target()->AbortIO(io_handles);
- }
- void FaultInjectionTestFS::RandomRWFileClosed(const std::string& fname) {
- MutexLock l(&mutex_);
- if (open_managed_files_.find(fname) != open_managed_files_.end()) {
- open_managed_files_.erase(fname);
- }
- }
- void FaultInjectionTestFS::WritableFileClosed(const FSFileState& state) {
- MutexLock l(&mutex_);
- if (open_managed_files_.find(state.filename_) != open_managed_files_.end()) {
- db_file_state_[state.filename_] = state;
- open_managed_files_.erase(state.filename_);
- }
- }
- void FaultInjectionTestFS::WritableFileSynced(const FSFileState& state) {
- MutexLock l(&mutex_);
- if (open_managed_files_.find(state.filename_) != open_managed_files_.end()) {
- if (db_file_state_.find(state.filename_) == db_file_state_.end()) {
- db_file_state_.insert(std::make_pair(state.filename_, state));
- } else {
- db_file_state_[state.filename_] = state;
- }
- }
- }
- void FaultInjectionTestFS::WritableFileAppended(const FSFileState& state) {
- MutexLock l(&mutex_);
- if (open_managed_files_.find(state.filename_) != open_managed_files_.end()) {
- if (db_file_state_.find(state.filename_) == db_file_state_.end()) {
- db_file_state_.insert(std::make_pair(state.filename_, state));
- } else {
- db_file_state_[state.filename_] = state;
- }
- }
- }
- IOStatus FaultInjectionTestFS::DropUnsyncedFileData() {
- IOStatus io_s;
- MutexLock l(&mutex_);
- for (std::map<std::string, FSFileState>::iterator it = db_file_state_.begin();
- io_s.ok() && it != db_file_state_.end(); ++it) {
- FSFileState& fs_state = it->second;
- if (!fs_state.IsFullySynced()) {
- io_s = fs_state.DropUnsyncedData();
- }
- }
- return io_s;
- }
- IOStatus FaultInjectionTestFS::DropRandomUnsyncedFileData(Random* rnd) {
- IOStatus io_s;
- MutexLock l(&mutex_);
- for (std::map<std::string, FSFileState>::iterator it = db_file_state_.begin();
- io_s.ok() && it != db_file_state_.end(); ++it) {
- FSFileState& fs_state = it->second;
- if (!fs_state.IsFullySynced()) {
- io_s = fs_state.DropRandomUnsyncedData(rnd);
- }
- }
- return io_s;
- }
- IOStatus FaultInjectionTestFS::DeleteFilesCreatedAfterLastDirSync(
- const IOOptions& options, IODebugContext* dbg) {
- // Because DeleteFile access this container make a copy to avoid deadlock
- std::map<std::string, std::map<std::string, std::string>> map_copy;
- {
- MutexLock l(&mutex_);
- map_copy.insert(dir_to_new_files_since_last_sync_.begin(),
- dir_to_new_files_since_last_sync_.end());
- }
- for (auto& pair : map_copy) {
- for (auto& file_pair : pair.second) {
- if (file_pair.second == kNewFileNoOverwrite) {
- IOStatus io_s =
- DeleteFile(pair.first + "/" + file_pair.first, options, dbg);
- if (!io_s.ok()) {
- return io_s;
- }
- } else {
- IOOptions opts;
- IOStatus io_s =
- WriteStringToFile(target(), file_pair.second,
- pair.first + "/" + file_pair.first, true, opts);
- if (!io_s.ok()) {
- return io_s;
- }
- }
- }
- }
- return IOStatus::OK();
- }
- void FaultInjectionTestFS::ResetState() {
- MutexLock l(&mutex_);
- db_file_state_.clear();
- dir_to_new_files_since_last_sync_.clear();
- SetFilesystemActiveNoLock(true);
- }
- void FaultInjectionTestFS::UntrackFile(const std::string& f) {
- MutexLock l(&mutex_);
- auto dir_and_name = TestFSGetDirAndName(f);
- dir_to_new_files_since_last_sync_[dir_and_name.first].erase(
- dir_and_name.second);
- db_file_state_.erase(f);
- open_managed_files_.erase(f);
- }
- IOStatus FaultInjectionTestFS::MaybeInjectThreadLocalReadError(
- const IOOptions& io_options, ErrorOperation op, Slice* result,
- bool direct_io, char* scratch, bool need_count_increase,
- bool* fault_injected) {
- bool dummy_bool;
- bool& ret_fault_injected = fault_injected ? *fault_injected : dummy_bool;
- ret_fault_injected = false;
- ErrorContext* ctx =
- static_cast<ErrorContext*>(injected_thread_local_read_error_.Get());
- if (ctx == nullptr || !ctx->enable_error_injection || !ctx->one_in ||
- ShouldIOActivitiesExcludedFromFaultInjection(io_options.io_activity)) {
- return IOStatus::OK();
- }
- IOStatus ret;
- if (ctx->rand.OneIn(ctx->one_in)) {
- if (ctx->count == 0) {
- ctx->message = "";
- }
- if (need_count_increase) {
- ctx->count++;
- }
- if (ctx->callstack) {
- free(ctx->callstack);
- }
- ctx->callstack = port::SaveStack(&ctx->frames);
- std::stringstream msg;
- msg << FaultInjectionTestFS::kInjected << " ";
- if (op != ErrorOperation::kMultiReadSingleReq) {
- // Likely non-per read status code for MultiRead
- msg << "read error";
- ctx->message = msg.str();
- ret_fault_injected = true;
- ret = IOStatus::IOError(ctx->message);
- } else if (Random::GetTLSInstance()->OneIn(8)) {
- assert(result);
- // For a small chance, set the failure to status but turn the
- // result to be empty, which is supposed to be caught for a check.
- *result = Slice();
- msg << "empty result";
- ctx->message = msg.str();
- ret_fault_injected = true;
- ret = IOStatus::IOError(ctx->message);
- } else if (!direct_io && Random::GetTLSInstance()->OneIn(7) &&
- scratch != nullptr && result->data() == scratch) {
- assert(result);
- // With direct I/O, many extra bytes might be read so corrupting
- // one byte might not cause checksum mismatch. Skip checksum
- // corruption injection.
- // We only corrupt data if the result is filled to `scratch`. For other
- // cases, the data might not be able to be modified (e.g mmaped files)
- // or has unintended side effects.
- // For a small chance, set the failure to status but corrupt the
- // result in a way that checksum checking is supposed to fail.
- // Corrupt the last byte, which is supposed to be a checksum byte
- // It would work for CRC. Not 100% sure for xxhash and will adjust
- // if it is not the case.
- const_cast<char*>(result->data())[result->size() - 1]++;
- msg << "corrupt last byte";
- ctx->message = msg.str();
- ret_fault_injected = true;
- ret = IOStatus::IOError(ctx->message);
- } else {
- msg << "error result multiget single";
- ctx->message = msg.str();
- ret_fault_injected = true;
- ret = IOStatus::IOError(ctx->message);
- }
- }
- ret.SetRetryable(ctx->retryable);
- ret.SetDataLoss(ctx->has_data_loss);
- return ret;
- }
- bool FaultInjectionTestFS::TryParseFileName(const std::string& file_name,
- uint64_t* number, FileType* type) {
- std::size_t found = file_name.find_last_of('/');
- std::string file = file_name.substr(found);
- return ParseFileName(file, number, type);
- }
- IOStatus FaultInjectionTestFS::MaybeInjectThreadLocalError(
- FaultInjectionIOType type, const IOOptions& io_options,
- const std::string& file_name, ErrorOperation op, Slice* result,
- bool direct_io, char* scratch, bool need_count_increase,
- bool* fault_injected) {
- if (type == FaultInjectionIOType::kRead) {
- return MaybeInjectThreadLocalReadError(io_options, op, result, direct_io,
- scratch, need_count_increase,
- fault_injected);
- }
- ErrorContext* ctx = GetErrorContextFromFaultInjectionIOType(type);
- if (ctx == nullptr || !ctx->enable_error_injection || !ctx->one_in ||
- ShouldIOActivitiesExcludedFromFaultInjection(io_options.io_activity) ||
- (type == FaultInjectionIOType::kWrite &&
- ShouldExcludeFromWriteFaultInjection(file_name))) {
- return IOStatus::OK();
- }
- IOStatus ret;
- if (ctx->rand.OneIn(ctx->one_in)) {
- ctx->count++;
- if (ctx->callstack) {
- free(ctx->callstack);
- }
- ctx->callstack = port::SaveStack(&ctx->frames);
- ctx->message = GetErrorMessage(type, file_name, op);
- ret = IOStatus::IOError(ctx->message);
- ret.SetRetryable(ctx->retryable);
- ret.SetDataLoss(ctx->has_data_loss);
- if (type == FaultInjectionIOType::kWrite) {
- TEST_SYNC_POINT(
- "FaultInjectionTestFS::InjectMetadataWriteError:Injected");
- }
- }
- return ret;
- }
- void FaultInjectionTestFS::PrintInjectedThreadLocalErrorBacktrace(
- FaultInjectionIOType type) {
- #if defined(OS_LINUX)
- ErrorContext* ctx = GetErrorContextFromFaultInjectionIOType(type);
- if (ctx) {
- if (type == FaultInjectionIOType::kRead) {
- fprintf(stderr, "Injected read error type = %d\n", ctx->type);
- }
- fprintf(stderr, "Message: %s\n", ctx->message.c_str());
- port::PrintAndFreeStack(ctx->callstack, ctx->frames);
- ctx->callstack = nullptr;
- }
- #else
- (void)type;
- #endif
- }
- } // namespace ROCKSDB_NAMESPACE
|