fault_injection_fs.cc

  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright 2014 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  9. // This test uses a custom FileSystem to keep track of the state of a file
  10. // system as of the last "Sync". Data being written is cached in a "buffer";
  11. // only when "Sync" is called does the data become persistent. This can
  12. // simulate loss of file data (or of entire files) that was not protected by
  13. // a "Sync". For any FileSystem-related operation, a specific error can be
  14. // returned by specifying the "IOStatus Error" while the file system is deactivated.
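//
// A rough usage sketch, for illustration only (assuming the interface declared
// in fault_injection_fs.h, e.g. the constructor taking a base FileSystem and
// DropUnsyncedFileData()); it is not part of this file:
//
//   auto fs = std::make_shared<FaultInjectionTestFS>(FileSystem::Default());
//   std::unique_ptr<FSWritableFile> file;
//   fs->NewWritableFile("/db/000001.log", FileOptions(), &file, nullptr);
//   file->Append("payload", IOOptions(), nullptr);  // cached in a buffer only
//   // Without a Sync(), fs->DropUnsyncedFileData() would discard "payload";
//   // after file->Sync(IOOptions(), nullptr) the data is considered persistent.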
  15. #include "utilities/fault_injection_fs.h"
  16. #include <algorithm>
  17. #include <cstdio>
  18. #include <functional>
  19. #include <utility>
  20. #include "env/composite_env_wrapper.h"
  21. #include "port/lang.h"
  22. #include "port/stack_trace.h"
  23. #include "rocksdb/env.h"
  24. #include "rocksdb/io_status.h"
  25. #include "rocksdb/types.h"
  26. #include "test_util/sync_point.h"
  27. #include "util/coding.h"
  28. #include "util/crc32c.h"
  29. #include "util/mutexlock.h"
  30. #include "util/random.h"
  31. #include "util/string_util.h"
  32. #include "util/xxhash.h"
  33. namespace ROCKSDB_NAMESPACE {
  34. const std::string kNewFileNoOverwrite;
  35. // Assumes a file name, not a directory name like "/foo/bar/"
  36. std::string TestFSGetDirName(const std::string filename) {
  37. size_t found = filename.find_last_of("/\\");
  38. if (found == std::string::npos) {
  39. return "";
  40. } else {
  41. return filename.substr(0, found);
  42. }
  43. }
  44. // Trim the trailing "/" at the end of `str`
  45. std::string TestFSTrimDirname(const std::string& str) {
  46. size_t found = str.find_last_not_of('/');
  47. if (found == std::string::npos) {
  48. return str;
  49. }
  50. return str.substr(0, found + 1);
  51. }
  52. // Return pair <parent directory name, file name> of a full path.
  53. std::pair<std::string, std::string> TestFSGetDirAndName(
  54. const std::string& name) {
  55. std::string dirname = TestFSGetDirName(name);
  56. std::string fname = name.substr(dirname.size() + 1);
  57. return std::make_pair(dirname, fname);
  58. }
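// For illustration, with a hypothetical input "/db/000123.sst" the helpers
// above behave as follows:
//   TestFSGetDirName("/db/000123.sst")    -> "/db"
//   TestFSTrimDirname("/db/")             -> "/db"
//   TestFSGetDirAndName("/db/000123.sst") -> {"/db", "000123.sst"}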
  59. // Calculate the checksum of the data with the corresponding checksum
  60. // type. If the checksum type is not supported, no checksum is returned.
  61. void CalculateTypedChecksum(const ChecksumType& checksum_type, const char* data,
  62. size_t size, std::string* checksum) {
  63. if (checksum_type == ChecksumType::kCRC32c) {
  64. uint32_t v_crc32c = crc32c::Extend(0, data, size);
  65. PutFixed32(checksum, v_crc32c);
  66. return;
  67. } else if (checksum_type == ChecksumType::kxxHash) {
  68. uint32_t v = XXH32(data, size, 0);
  69. PutFixed32(checksum, v);
  70. }
  71. }
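// For illustration (a hypothetical checksum-handoff caller, not part of this
// file): with ChecksumType::kCRC32c the caller would compute the same 4-byte
// value that the Append() overloads below verify, e.g.
//
//   std::string expected;
//   CalculateTypedChecksum(ChecksumType::kCRC32c, data.data(), data.size(),
//                          &expected);
//   // `expected` now holds PutFixed32 of crc32c::Extend(0, data, size) and
//   // can be handed off via DataVerificationInfo::checksum.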
  72. IOStatus FSFileState::DropUnsyncedData() {
  73. buffer_.resize(0);
  74. return IOStatus::OK();
  75. }
  76. IOStatus FSFileState::DropRandomUnsyncedData(Random* rand) {
  77. int range = static_cast<int>(buffer_.size());
  78. size_t truncated_size = static_cast<size_t>(rand->Uniform(range));
  79. buffer_.resize(truncated_size);
  80. return IOStatus::OK();
  81. }
  82. IOStatus TestFSDirectory::Fsync(const IOOptions& options, IODebugContext* dbg) {
  83. if (!fs_->IsFilesystemActive()) {
  84. return fs_->GetError();
  85. }
  86. IOStatus s = fs_->MaybeInjectThreadLocalError(
  87. FaultInjectionIOType::kMetadataWrite, options);
  88. if (!s.ok()) {
  89. return s;
  90. }
  91. fs_->SyncDir(dirname_);
  92. s = dir_->Fsync(options, dbg);
  93. return s;
  94. }
  95. IOStatus TestFSDirectory::Close(const IOOptions& options, IODebugContext* dbg) {
  96. if (!fs_->IsFilesystemActive()) {
  97. return fs_->GetError();
  98. }
  99. IOStatus s = fs_->MaybeInjectThreadLocalError(
  100. FaultInjectionIOType::kMetadataWrite, options);
  101. if (!s.ok()) {
  102. return s;
  103. }
  104. s = dir_->Close(options, dbg);
  105. return s;
  106. }
  107. IOStatus TestFSDirectory::FsyncWithDirOptions(
  108. const IOOptions& options, IODebugContext* dbg,
  109. const DirFsyncOptions& dir_fsync_options) {
  110. if (!fs_->IsFilesystemActive()) {
  111. return fs_->GetError();
  112. }
  113. IOStatus s = fs_->MaybeInjectThreadLocalError(
  114. FaultInjectionIOType::kMetadataWrite, options);
  115. if (!s.ok()) {
  116. return s;
  117. }
  118. fs_->SyncDir(dirname_);
  119. s = dir_->FsyncWithDirOptions(options, dbg, dir_fsync_options);
  120. return s;
  121. }
  122. TestFSWritableFile::TestFSWritableFile(const std::string& fname,
  123. const FileOptions& file_opts,
  124. std::unique_ptr<FSWritableFile>&& f,
  125. FaultInjectionTestFS* fs)
  126. : state_(fname),
  127. file_opts_(file_opts),
  128. target_(std::move(f)),
  129. writable_file_opened_(true),
  130. fs_(fs),
  131. unsync_data_loss_(fs_->InjectUnsyncedDataLoss()) {
  132. assert(target_ != nullptr);
  133. assert(state_.pos_at_last_append_ == 0);
  134. assert(state_.pos_at_last_sync_ == 0);
  135. }
  136. TestFSWritableFile::~TestFSWritableFile() {
  137. if (writable_file_opened_) {
  138. Close(IOOptions(), nullptr).PermitUncheckedError();
  139. }
  140. }
  141. IOStatus TestFSWritableFile::Append(const Slice& data, const IOOptions& options,
  142. IODebugContext* dbg) {
  143. MutexLock l(&mutex_);
  144. if (!fs_->IsFilesystemActive()) {
  145. return fs_->GetError();
  146. }
  147. IOStatus s = fs_->MaybeInjectThreadLocalError(
  148. FaultInjectionIOType::kWrite, options, state_.filename_,
  149. FaultInjectionTestFS::ErrorOperation::kAppend);
  150. if (!s.ok()) {
  151. return s;
  152. }
  153. if (target_->use_direct_io() || !unsync_data_loss_) {
  154. // TODO(hx235): buffer data for direct IO write to simulate data loss like
  155. // non-direct IO write
  156. s = target_->Append(data, options, dbg);
  157. } else {
  158. state_.buffer_.append(data.data(), data.size());
  159. }
  160. if (s.ok()) {
  161. state_.pos_at_last_append_ += data.size();
  162. fs_->WritableFileAppended(state_);
  163. }
  164. return s;
  165. }
  166. // If IngestDataCorruptionBeforeWrite() has been set, data corruption before
  167. // the write is simulated.
  168. IOStatus TestFSWritableFile::Append(
  169. const Slice& data, const IOOptions& options,
  170. const DataVerificationInfo& verification_info, IODebugContext* dbg) {
  171. MutexLock l(&mutex_);
  172. if (!fs_->IsFilesystemActive()) {
  173. return fs_->GetError();
  174. }
  175. if (fs_->ShouldDataCorruptionBeforeWrite()) {
  176. return IOStatus::Corruption("Data is corrupted!");
  177. }
  178. IOStatus s = fs_->MaybeInjectThreadLocalError(
  179. FaultInjectionIOType::kWrite, options, state_.filename_,
  180. FaultInjectionTestFS::ErrorOperation::kAppend);
  181. if (!s.ok()) {
  182. return s;
  183. }
  184. // Calculate the checksum
  185. std::string checksum;
  186. CalculateTypedChecksum(fs_->GetChecksumHandoffFuncType(), data.data(),
  187. data.size(), &checksum);
  188. if (fs_->GetChecksumHandoffFuncType() != ChecksumType::kNoChecksum &&
  189. checksum != verification_info.checksum.ToString()) {
  190. std::string msg =
  191. "Data is corrupted! Origin data checksum: " +
  192. verification_info.checksum.ToString(true) +
  193. "current data checksum: " + Slice(checksum).ToString(true);
  194. return IOStatus::Corruption(msg);
  195. }
  196. if (target_->use_direct_io() || !unsync_data_loss_) {
  197. // TODO(hx235): buffer data for direct IO write to simulate data loss like
  198. // non-direct IO write
  199. s = target_->Append(data, options, dbg);
  200. } else {
  201. state_.buffer_.append(data.data(), data.size());
  202. }
  203. if (s.ok()) {
  204. state_.pos_at_last_append_ += data.size();
  205. fs_->WritableFileAppended(state_);
  206. }
  207. return s;
  208. }
  209. IOStatus TestFSWritableFile::Truncate(uint64_t size, const IOOptions& options,
  210. IODebugContext* dbg) {
  211. MutexLock l(&mutex_);
  212. if (!fs_->IsFilesystemActive()) {
  213. return fs_->GetError();
  214. }
  215. IOStatus s = fs_->MaybeInjectThreadLocalError(FaultInjectionIOType::kWrite,
  216. options, state_.filename_);
  217. if (!s.ok()) {
  218. return s;
  219. }
  220. s = target_->Truncate(size, options, dbg);
  221. if (s.ok()) {
  222. state_.pos_at_last_append_ = size;
  223. }
  224. return s;
  225. }
  226. IOStatus TestFSWritableFile::PositionedAppend(const Slice& data,
  227. uint64_t offset,
  228. const IOOptions& options,
  229. IODebugContext* dbg) {
  230. MutexLock l(&mutex_);
  231. if (!fs_->IsFilesystemActive()) {
  232. return fs_->GetError();
  233. }
  234. if (fs_->ShouldDataCorruptionBeforeWrite()) {
  235. return IOStatus::Corruption("Data is corrupted!");
  236. }
  237. IOStatus s = fs_->MaybeInjectThreadLocalError(
  238. FaultInjectionIOType::kWrite, options, state_.filename_,
  239. FaultInjectionTestFS::ErrorOperation::kPositionedAppend);
  240. if (!s.ok()) {
  241. return s;
  242. }
  243. // TODO(hx235): buffer data for direct IO write to simulate data loss like
  244. // non-direct IO write
  245. s = target_->PositionedAppend(data, offset, options, dbg);
  246. if (s.ok()) {
  247. state_.pos_at_last_append_ = offset + data.size();
  248. fs_->WritableFileAppended(state_);
  249. }
  250. return s;
  251. }
  252. IOStatus TestFSWritableFile::PositionedAppend(
  253. const Slice& data, uint64_t offset, const IOOptions& options,
  254. const DataVerificationInfo& verification_info, IODebugContext* dbg) {
  255. MutexLock l(&mutex_);
  256. if (!fs_->IsFilesystemActive()) {
  257. return fs_->GetError();
  258. }
  259. if (fs_->ShouldDataCorruptionBeforeWrite()) {
  260. return IOStatus::Corruption("Data is corrupted!");
  261. }
  262. IOStatus s = fs_->MaybeInjectThreadLocalError(
  263. FaultInjectionIOType::kWrite, options, state_.filename_,
  264. FaultInjectionTestFS::ErrorOperation::kPositionedAppend);
  265. if (!s.ok()) {
  266. return s;
  267. }
  268. // Calculate the checksum
  269. std::string checksum;
  270. CalculateTypedChecksum(fs_->GetChecksumHandoffFuncType(), data.data(),
  271. data.size(), &checksum);
  272. if (fs_->GetChecksumHandoffFuncType() != ChecksumType::kNoChecksum &&
  273. checksum != verification_info.checksum.ToString()) {
  274. std::string msg =
  275. "Data is corrupted! Origin data checksum: " +
  276. verification_info.checksum.ToString(true) +
  277. "current data checksum: " + Slice(checksum).ToString(true);
  278. return IOStatus::Corruption(msg);
  279. }
  280. // TODO(hx235): buffer data for direct IO write to simulate data loss like
  281. // non-direct IO write
  282. s = target_->PositionedAppend(data, offset, options, dbg);
  283. if (s.ok()) {
  284. state_.pos_at_last_append_ = offset + data.size();
  285. fs_->WritableFileAppended(state_);
  286. }
  287. return s;
  288. }
  289. IOStatus TestFSWritableFile::Close(const IOOptions& options,
  290. IODebugContext* dbg) {
  291. MutexLock l(&mutex_);
  292. fs_->WritableFileClosed(state_);
  293. if (!fs_->IsFilesystemActive()) {
  294. return fs_->GetError();
  295. }
  296. IOStatus io_s = fs_->MaybeInjectThreadLocalError(
  297. FaultInjectionIOType::kMetadataWrite, options);
  298. if (!io_s.ok()) {
  299. return io_s;
  300. }
  301. writable_file_opened_ = false;
  302. // Drop buffered data that was never synced because close is not a syncing
  303. // mechanism in POSIX file semantics.
  304. state_.buffer_.resize(0);
  305. io_s = target_->Close(options, dbg);
  306. return io_s;
  307. }
  308. IOStatus TestFSWritableFile::Flush(const IOOptions&, IODebugContext*) {
  309. MutexLock l(&mutex_);
  310. if (!fs_->IsFilesystemActive()) {
  311. return fs_->GetError();
  312. }
  313. return IOStatus::OK();
  314. }
  315. IOStatus TestFSWritableFile::Sync(const IOOptions& options,
  316. IODebugContext* dbg) {
  317. MutexLock l(&mutex_);
  318. if (!fs_->IsFilesystemActive()) {
  319. return fs_->GetError();
  320. }
  321. if (target_->use_direct_io()) {
  322. // In direct I/O mode we don't buffer anything in TestFSWritableFile,
  323. // so there is nothing extra to sync; just return OK.
  324. return IOStatus::OK();
  325. }
  326. IOStatus io_s = target_->Append(state_.buffer_, options, dbg);
  327. state_.buffer_.resize(0);
  328. // Ignore sync errors
  329. target_->Sync(options, dbg).PermitUncheckedError();
  330. state_.pos_at_last_sync_ = state_.pos_at_last_append_;
  331. fs_->WritableFileSynced(state_);
  332. return io_s;
  333. }
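// For illustration, a hypothetical sequence against this class (when not in
// direct I/O mode and unsynced data loss is enabled via unsync_data_loss_):
//   Append("abc")  -> "abc" sits in state_.buffer_; the target file is unchanged
//   Sync()         -> "abc" is appended to target_, buffer_ is cleared, and
//                     pos_at_last_sync_ catches up to pos_at_last_append_
// so a DropUnsyncedFileData() issued before the Sync() would have lost "abc".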
  334. IOStatus TestFSWritableFile::RangeSync(uint64_t offset, uint64_t nbytes,
  335. const IOOptions& options,
  336. IODebugContext* dbg) {
  337. MutexLock l(&mutex_);
  338. if (!fs_->IsFilesystemActive()) {
  339. return fs_->GetError();
  340. }
  341. // Assumes caller passes consecutive byte ranges.
  342. uint64_t sync_limit = offset + nbytes;
  343. IOStatus io_s;
  344. if (sync_limit < state_.pos_at_last_sync_) {
  345. return io_s;
  346. }
  347. uint64_t num_to_sync = std::min(static_cast<uint64_t>(state_.buffer_.size()),
  348. sync_limit - state_.pos_at_last_sync_);
  349. Slice buf_to_sync(state_.buffer_.data(), num_to_sync);
  350. io_s = target_->Append(buf_to_sync, options, dbg);
  351. state_.buffer_ = state_.buffer_.substr(num_to_sync);
  352. // Ignore sync errors
  353. target_->RangeSync(offset, nbytes, options, dbg).PermitUncheckedError();
  354. state_.pos_at_last_sync_ = offset + num_to_sync;
  355. fs_->WritableFileSynced(state_);
  356. return io_s;
  357. }
  358. TestFSRandomRWFile::TestFSRandomRWFile(const std::string& fname,
  359. std::unique_ptr<FSRandomRWFile>&& f,
  360. FaultInjectionTestFS* fs)
  361. : fname_(fname), target_(std::move(f)), file_opened_(true), fs_(fs) {
  362. assert(target_ != nullptr);
  363. }
  364. TestFSRandomRWFile::~TestFSRandomRWFile() {
  365. if (file_opened_) {
  366. Close(IOOptions(), nullptr).PermitUncheckedError();
  367. }
  368. }
  369. IOStatus TestFSRandomRWFile::Write(uint64_t offset, const Slice& data,
  370. const IOOptions& options,
  371. IODebugContext* dbg) {
  372. if (!fs_->IsFilesystemActive()) {
  373. return fs_->GetError();
  374. }
  375. return target_->Write(offset, data, options, dbg);
  376. }
  377. IOStatus TestFSRandomRWFile::Read(uint64_t offset, size_t n,
  378. const IOOptions& options, Slice* result,
  379. char* scratch, IODebugContext* dbg) const {
  380. if (!fs_->IsFilesystemActive()) {
  381. return fs_->GetError();
  382. }
  383. // TODO (low priority): fs_->ReadUnsyncedData()
  384. return target_->Read(offset, n, options, result, scratch, dbg);
  385. }
  386. IOStatus TestFSRandomRWFile::Close(const IOOptions& options,
  387. IODebugContext* dbg) {
  388. fs_->RandomRWFileClosed(fname_);
  389. if (!fs_->IsFilesystemActive()) {
  390. return fs_->GetError();
  391. }
  392. file_opened_ = false;
  393. return target_->Close(options, dbg);
  394. }
  395. IOStatus TestFSRandomRWFile::Flush(const IOOptions& options,
  396. IODebugContext* dbg) {
  397. if (!fs_->IsFilesystemActive()) {
  398. return fs_->GetError();
  399. }
  400. return target_->Flush(options, dbg);
  401. }
  402. IOStatus TestFSRandomRWFile::Sync(const IOOptions& options,
  403. IODebugContext* dbg) {
  404. if (!fs_->IsFilesystemActive()) {
  405. return fs_->GetError();
  406. }
  407. return target_->Sync(options, dbg);
  408. }
  409. TestFSRandomAccessFile::TestFSRandomAccessFile(
  410. const std::string& fname, std::unique_ptr<FSRandomAccessFile>&& f,
  411. FaultInjectionTestFS* fs)
  412. : target_(std::move(f)), fs_(fs), is_sst_(EndsWith(fname, ".sst")) {
  413. assert(target_ != nullptr);
  414. }
  415. IOStatus TestFSRandomAccessFile::Read(uint64_t offset, size_t n,
  416. const IOOptions& options, Slice* result,
  417. char* scratch,
  418. IODebugContext* dbg) const {
  419. TEST_SYNC_POINT("FaultInjectionTestFS::RandomRead");
  420. if (!fs_->IsFilesystemActive()) {
  421. return fs_->GetError();
  422. }
  423. IOStatus s = fs_->MaybeInjectThreadLocalError(
  424. FaultInjectionIOType::kRead, options, "",
  425. FaultInjectionTestFS::ErrorOperation::kRead, result, use_direct_io(),
  426. scratch, /*need_count_increase=*/true,
  427. /*fault_injected=*/nullptr);
  428. if (!s.ok()) {
  429. return s;
  430. }
  431. s = target_->Read(offset, n, options, result, scratch, dbg);
  432. // TODO (low priority): fs_->ReadUnsyncedData()
  433. return s;
  434. }
  435. IOStatus TestFSRandomAccessFile::ReadAsync(
  436. FSReadRequest& req, const IOOptions& opts,
  437. std::function<void(FSReadRequest&, void*)> cb, void* cb_arg,
  438. void** io_handle, IOHandleDeleter* del_fn, IODebugContext* /*dbg*/) {
  439. IOStatus res_status;
  440. FSReadRequest res;
  441. IOStatus s;
  442. if (!fs_->IsFilesystemActive()) {
  443. res_status = fs_->GetError();
  444. }
  445. if (res_status.ok()) {
  446. res_status = fs_->MaybeInjectThreadLocalError(
  447. FaultInjectionIOType::kRead, opts, "",
  448. FaultInjectionTestFS::ErrorOperation::kRead, &res.result,
  449. use_direct_io(), req.scratch, /*need_count_increase=*/true,
  450. /*fault_injected=*/nullptr);
  451. }
  452. if (res_status.ok()) {
  453. s = target_->ReadAsync(req, opts, cb, cb_arg, io_handle, del_fn, nullptr);
  454. // TODO (low priority): fs_->ReadUnsyncedData()
  455. } else {
  456. // If there's no injected error, then cb will be called asynchronously when
  457. // target_ actually finishes the read. But if there's an injected error, it
  458. // needs to immediately call cb(res, cb_arg) since target_->ReadAsync()
  459. // isn't invoked at all.
  460. res.status = res_status;
  461. cb(res, cb_arg);
  462. }
  463. // We return ReadAsync()'s status instead of the injected error status here
  464. // since the return status is not supposed to be the status of the actual IO
  465. // (i.e., the actual async read). The actual status of the IO will be passed
  466. // to the cb() callback when the actual read finishes, or immediately as
  467. // above when an injected error happens.
  468. return s;
  469. }
  470. IOStatus TestFSRandomAccessFile::MultiRead(FSReadRequest* reqs, size_t num_reqs,
  471. const IOOptions& options,
  472. IODebugContext* dbg) {
  473. if (!fs_->IsFilesystemActive()) {
  474. return fs_->GetError();
  475. }
  476. IOStatus s = target_->MultiRead(reqs, num_reqs, options, dbg);
  477. // TODO (low priority): fs_->ReadUnsyncedData()
  478. bool injected_error = false;
  479. for (size_t i = 0; i < num_reqs; i++) {
  480. if (!reqs[i].status.ok()) {
  481. // Already seeing an error.
  482. break;
  483. }
  484. bool this_injected_error;
  485. reqs[i].status = fs_->MaybeInjectThreadLocalError(
  486. FaultInjectionIOType::kRead, options, "",
  487. FaultInjectionTestFS::ErrorOperation::kRead, &(reqs[i].result),
  488. use_direct_io(), reqs[i].scratch,
  489. /*need_count_increase=*/true,
  490. /*fault_injected=*/&this_injected_error);
  491. injected_error |= this_injected_error;
  492. }
  493. if (s.ok()) {
  494. s = fs_->MaybeInjectThreadLocalError(
  495. FaultInjectionIOType::kRead, options, "",
  496. FaultInjectionTestFS::ErrorOperation::kMultiRead, nullptr,
  497. use_direct_io(), nullptr, /*need_count_increase=*/!injected_error,
  498. /*fault_injected=*/nullptr);
  499. }
  500. return s;
  501. }
  502. size_t TestFSRandomAccessFile::GetUniqueId(char* id, size_t max_size) const {
  503. if (fs_->ShouldFailGetUniqueId()) {
  504. return 0;
  505. } else {
  506. return target_->GetUniqueId(id, max_size);
  507. }
  508. }
  509. IOStatus TestFSRandomAccessFile::GetFileSize(uint64_t* file_size) {
  510. if (is_sst_ && fs_->ShouldFailRandomAccessGetFileSizeSst()) {
  511. return IOStatus::IOError("FSRandomAccessFile::GetFileSize failed");
  512. } else {
  513. return target_->GetFileSize(file_size);
  514. }
  515. }
  516. namespace {
  517. // Modifies `result` to start at the beginning of `scratch` if not already,
  518. // copying data there if needed.
  519. void MoveToScratchIfNeeded(Slice* result, char* scratch) {
  520. if (result->data() != scratch) {
  521. // NOTE: might overlap, where result is later in scratch
  522. std::copy(result->data(), result->data() + result->size(), scratch);
  523. *result = Slice(scratch, result->size());
  524. }
  525. }
  526. } // namespace
  527. void FaultInjectionTestFS::ReadUnsynced(const std::string& fname,
  528. uint64_t offset, size_t n,
  529. Slice* result, char* scratch,
  530. int64_t* pos_at_last_sync) {
  531. *result = Slice(scratch, 0); // default empty result
  532. assert(*pos_at_last_sync == -1); // default "unknown"
  533. MutexLock l(&mutex_);
  534. auto it = db_file_state_.find(fname);
  535. if (it != db_file_state_.end()) {
  536. auto& st = it->second;
  537. *pos_at_last_sync = static_cast<int64_t>(st.pos_at_last_sync_);
  538. // Find overlap between [offset, offset + n) and
  539. // [*pos_at_last_sync, *pos_at_last_sync + st.buffer_.size())
  540. int64_t begin = std::max(static_cast<int64_t>(offset), *pos_at_last_sync);
  541. int64_t end =
  542. std::min(static_cast<int64_t>(offset + n),
  543. *pos_at_last_sync + static_cast<int64_t>(st.buffer_.size()));
  544. // Copy and return overlap if there is any
  545. if (begin < end) {
  546. size_t offset_in_buffer = static_cast<size_t>(begin - *pos_at_last_sync);
  547. size_t offset_in_scratch = static_cast<size_t>(begin - offset);
  548. std::copy_n(st.buffer_.data() + offset_in_buffer, end - begin,
  549. scratch + offset_in_scratch);
  550. *result = Slice(scratch + offset_in_scratch, end - begin);
  551. }
  552. }
  553. }
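// Worked example with hypothetical numbers: for offset=100, n=50,
// pos_at_last_sync_=120 and buffer_.size()=100, the unsynced bytes cover
// [120, 220), so begin=120 and end=min(150, 220)=150; 30 bytes are copied from
// buffer_[0..30) to scratch+20, and *result becomes Slice(scratch + 20, 30).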
  554. IOStatus TestFSSequentialFile::Read(size_t n, const IOOptions& options,
  555. Slice* result, char* scratch,
  556. IODebugContext* dbg) {
  557. IOStatus s = fs_->MaybeInjectThreadLocalError(
  558. FaultInjectionIOType::kRead, options, "",
  559. FaultInjectionTestFS::ErrorOperation::kRead, result, use_direct_io(),
  560. scratch, true /*need_count_increase=*/, nullptr /* fault_injected*/);
  561. if (!s.ok()) {
  562. return s;
  563. }
  564. // Some complex logic is needed to deal with concurrent writes to the same
  565. // file while keeping good performance (e.g. not holding the FS mutex during
  566. // an I/O op), especially in common cases.
  567. if (read_pos_ == target_read_pos_) {
  568. // Normal case: start by reading from underlying file
  569. s = target()->Read(n, options, result, scratch, dbg);
  570. if (!s.ok()) {
  571. return s;
  572. }
  573. target_read_pos_ += result->size();
  574. } else {
  575. // We must have previously read buffered data (unsynced) not written to
  576. // target. Deal with this case (and more) below.
  577. *result = {};
  578. }
  579. if (fs_->ReadUnsyncedData() && result->size() < n) {
  580. // We need to check if there's unsynced data to fill out the rest of the
  581. // read.
  582. // First, ensure target read data is in scratch for easy handling.
  583. MoveToScratchIfNeeded(result, scratch);
  584. assert(result->data() == scratch);
  585. // If we just did a target Read, we only want unsynced data after it
  586. // (target_read_pos_). Otherwise (e.g. if target is behind because of
  587. // unsynced data) we want unsynced data starting at the current read pos
  588. // (read_pos_, not yet updated).
  589. const uint64_t unsynced_read_pos = std::max(target_read_pos_, read_pos_);
  590. const size_t offset_from_read_pos =
  591. static_cast<size_t>(unsynced_read_pos - read_pos_);
  592. Slice unsynced_result;
  593. int64_t pos_at_last_sync = -1;
  594. fs_->ReadUnsynced(fname_, unsynced_read_pos, n - offset_from_read_pos,
  595. &unsynced_result, scratch + offset_from_read_pos,
  596. &pos_at_last_sync);
  597. assert(unsynced_result.data() >= scratch + offset_from_read_pos);
  598. assert(unsynced_result.data() < scratch + n);
  599. // Now, there are several cases to consider (some grouped together):
  600. if (pos_at_last_sync <= static_cast<int64_t>(unsynced_read_pos)) {
  601. // 1. We didn't get any unsynced data because nothing has been written
  602. // to the file beyond unsynced_read_pos (including untracked
  603. // pos_at_last_sync == -1)
  604. // 2. We got some unsynced data starting at unsynced_read_pos (possibly
  605. // on top of some synced data from target). We don't need to try reading
  606. // any more from target because we established a "point in time" for
  607. // completing this Read in which we read as much tail data (unsynced) as
  608. // we could.
  609. // We got pos_at_last_sync info if we got any unsynced data.
  610. assert(pos_at_last_sync >= 0 || unsynced_result.size() == 0);
  611. // Combined data is already lined up in scratch.
  612. assert(result->data() + result->size() == unsynced_result.data());
  613. assert(result->size() + unsynced_result.size() <= n);
  614. // Combine results
  615. *result = Slice(result->data(), result->size() + unsynced_result.size());
  616. } else {
  617. // 3. Any unsynced data we got was after unsynced_read_pos because the
  618. // file was synced some time since our last target Read (either from this
  619. // Read or a prior Read). We need to read more data from target to ensure
  620. // this Read is filled out, even though we might have already read some
  621. // (but not all due to a race). This code handles:
  622. //
  623. // * Catching up target after prior read(s) of unsynced data
  624. // * Racing Sync in another thread since we called target Read above
  625. //
  626. // And merging potentially three results together for this Read:
  627. // * The original target Read above
  628. // * The following (non-throw-away) target Read
  629. // * The ReadUnsynced above, which is always last if it returned data,
  630. // so that we have a "point in time" for completing this Read in which we
  631. // read as much tail data (unsynced) as we could.
  632. //
  633. // Deeper note about the race: we cannot just treat the original target
  634. // Read as a "point in time" view of available data in the file, because
  635. // there might have been unsynced data at that time, which became synced
  636. // data by the time we read unsynced data. That is the race we are
  637. // resolving with this "double check"-style code.
  638. const size_t supplemental_read_pos = unsynced_read_pos;
  639. // First, if there's any data from target that we know we would need to
  640. // throw away to catch up, try to do it.
  641. if (target_read_pos_ < supplemental_read_pos) {
  642. Slice throw_away_result;
  643. size_t throw_away_n = supplemental_read_pos - target_read_pos_;
  644. std::unique_ptr<char[]> throw_away_scratch{new char[throw_away_n]};
  645. s = target()->Read(throw_away_n, options, &throw_away_result,
  646. throw_away_scratch.get(), dbg);
  647. if (!s.ok()) {
  648. read_pos_ += result->size();
  649. return s;
  650. }
  651. target_read_pos_ += throw_away_result.size();
  652. if (target_read_pos_ < supplemental_read_pos) {
  653. // Because of pos_at_last_sync > supplemental_read_pos, we should
  654. // have been able to catch up
  655. read_pos_ += result->size();
  656. return IOStatus::IOError(
  657. "Unexpected truncation or short read of file " + fname_);
  658. }
  659. }
  660. // Now we can do a productive supplemental Read from target
  661. assert(target_read_pos_ == supplemental_read_pos);
  662. Slice supplemental_result;
  663. size_t supplemental_n =
  664. unsynced_result.size() == 0
  665. ? n - offset_from_read_pos
  666. : unsynced_result.data() - (scratch + offset_from_read_pos);
  667. s = target()->Read(supplemental_n, options, &supplemental_result,
  668. scratch + offset_from_read_pos, dbg);
  669. if (!s.ok()) {
  670. read_pos_ += result->size();
  671. return s;
  672. }
  673. target_read_pos_ += supplemental_result.size();
  674. MoveToScratchIfNeeded(&supplemental_result,
  675. scratch + offset_from_read_pos);
  676. // Combined data is already lined up in scratch.
  677. assert(result->data() + result->size() == supplemental_result.data());
  678. assert(unsynced_result.size() == 0 ||
  679. supplemental_result.data() + supplemental_result.size() ==
  680. unsynced_result.data());
  681. assert(result->size() + supplemental_result.size() +
  682. unsynced_result.size() <=
  683. n);
  684. // Combine results
  685. *result =
  686. Slice(result->data(), result->size() + supplemental_result.size() +
  687. unsynced_result.size());
  688. }
  689. }
  690. read_pos_ += result->size();
  691. return s;
  692. }
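// For illustration, a hypothetical read with ReadUnsyncedData() enabled:
// read_pos_ == target_read_pos_ == 0, the target holds 10 synced bytes and
// another 5 bytes sit unsynced in the writer's buffer. Read(20) first gets the
// 10 synced bytes from the target (case 1/2 above, since pos_at_last_sync ==
// 10 == unsynced_read_pos), then appends the 5 unsynced bytes returned by
// ReadUnsynced(), yielding a 15-byte result that is contiguous in `scratch`.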
  693. IOStatus TestFSSequentialFile::PositionedRead(uint64_t offset, size_t n,
  694. const IOOptions& options,
  695. Slice* result, char* scratch,
  696. IODebugContext* dbg) {
  697. IOStatus s = fs_->MaybeInjectThreadLocalError(
  698. FaultInjectionIOType::kRead, options, "",
  699. FaultInjectionTestFS::ErrorOperation::kRead, result, use_direct_io(),
  700. scratch, true /*need_count_increase=*/, nullptr /* fault_injected */);
  701. if (!s.ok()) {
  702. return s;
  703. }
  704. s = target()->PositionedRead(offset, n, options, result, scratch, dbg);
  705. // TODO (low priority): fs_->ReadUnsyncedData()
  706. return s;
  707. }
  708. IOStatus FaultInjectionTestFS::NewDirectory(
  709. const std::string& name, const IOOptions& options,
  710. std::unique_ptr<FSDirectory>* result, IODebugContext* dbg) {
  711. std::unique_ptr<FSDirectory> r;
  712. IOStatus io_s = target()->NewDirectory(name, options, &r, dbg);
  713. if (!io_s.ok()) {
  714. return io_s;
  715. }
  716. result->reset(
  717. new TestFSDirectory(this, TestFSTrimDirname(name), r.release()));
  718. return IOStatus::OK();
  719. }
  720. IOStatus FaultInjectionTestFS::FileExists(const std::string& fname,
  721. const IOOptions& options,
  722. IODebugContext* dbg) {
  723. if (!IsFilesystemActive()) {
  724. return GetError();
  725. }
  726. IOStatus io_s =
  727. MaybeInjectThreadLocalError(FaultInjectionIOType::kMetadataRead, options);
  728. if (!io_s.ok()) {
  729. return io_s;
  730. }
  731. io_s = target()->FileExists(fname, options, dbg);
  732. return io_s;
  733. }
  734. IOStatus FaultInjectionTestFS::GetChildren(const std::string& dir,
  735. const IOOptions& options,
  736. std::vector<std::string>* result,
  737. IODebugContext* dbg) {
  738. if (!IsFilesystemActive()) {
  739. return GetError();
  740. }
  741. IOStatus io_s =
  742. MaybeInjectThreadLocalError(FaultInjectionIOType::kMetadataRead, options);
  743. if (!io_s.ok()) {
  744. return io_s;
  745. }
  746. io_s = target()->GetChildren(dir, options, result, dbg);
  747. return io_s;
  748. }
  749. IOStatus FaultInjectionTestFS::GetChildrenFileAttributes(
  750. const std::string& dir, const IOOptions& options,
  751. std::vector<FileAttributes>* result, IODebugContext* dbg) {
  752. if (!IsFilesystemActive()) {
  753. return GetError();
  754. }
  755. IOStatus io_s =
  756. MaybeInjectThreadLocalError(FaultInjectionIOType::kMetadataRead, options);
  757. if (!io_s.ok()) {
  758. return io_s;
  759. }
  760. io_s = target()->GetChildrenFileAttributes(dir, options, result, dbg);
  761. return io_s;
  762. }
  763. IOStatus FaultInjectionTestFS::NewWritableFile(
  764. const std::string& fname, const FileOptions& file_opts,
  765. std::unique_ptr<FSWritableFile>* result, IODebugContext* dbg) {
  766. if (!IsFilesystemActive()) {
  767. return GetError();
  768. }
  769. if (IsFilesystemDirectWritable()) {
  770. return target()->NewWritableFile(fname, file_opts, result, dbg);
  771. }
  772. IOStatus io_s = MaybeInjectThreadLocalError(
  773. FaultInjectionIOType::kWrite, file_opts.io_options, fname,
  774. FaultInjectionTestFS::ErrorOperation::kOpen);
  775. if (!io_s.ok()) {
  776. return io_s;
  777. }
  778. io_s = target()->NewWritableFile(fname, file_opts, result, dbg);
  779. if (io_s.ok()) {
  780. result->reset(
  781. new TestFSWritableFile(fname, file_opts, std::move(*result), this));
  782. // If the WritableFileWriter* file is opened again it will be truncated,
  783. // so forget our saved state.
  784. UntrackFile(fname);
  785. {
  786. MutexLock l(&mutex_);
  787. open_managed_files_.insert(fname);
  788. auto dir_and_name = TestFSGetDirAndName(fname);
  789. auto& list = dir_to_new_files_since_last_sync_[dir_and_name.first];
  790. // The new file could overwrite an old one. Here we simplify
  791. // the implementation by assuming that no file of this name exists after
  792. // dropping unsynced files.
  793. list[dir_and_name.second] = kNewFileNoOverwrite;
  794. }
  795. }
  796. return io_s;
  797. }
  798. IOStatus FaultInjectionTestFS::ReopenWritableFile(
  799. const std::string& fname, const FileOptions& file_opts,
  800. std::unique_ptr<FSWritableFile>* result, IODebugContext* dbg) {
  801. if (!IsFilesystemActive()) {
  802. return GetError();
  803. }
  804. if (IsFilesystemDirectWritable()) {
  805. return target()->ReopenWritableFile(fname, file_opts, result, dbg);
  806. }
  807. IOStatus io_s = MaybeInjectThreadLocalError(FaultInjectionIOType::kWrite,
  808. file_opts.io_options, fname);
  809. if (!io_s.ok()) {
  810. return io_s;
  811. }
  812. bool exists;
  813. IOStatus exists_s =
  814. target()->FileExists(fname, IOOptions(), nullptr /* dbg */);
  815. if (exists_s.IsNotFound()) {
  816. exists = false;
  817. } else if (exists_s.ok()) {
  818. exists = true;
  819. } else {
  820. io_s = exists_s;
  821. exists = false;
  822. }
  823. if (!io_s.ok()) {
  824. return io_s;
  825. }
  826. io_s = target()->ReopenWritableFile(fname, file_opts, result, dbg);
  827. // Only track files we created. Files created outside of this
  828. // `FaultInjectionTestFS` are not eligible for tracking/data dropping
  829. // (for example, they may contain data a previous db_stress run expects to
  830. // be recovered). This could be extended to track/drop data appended once
  831. // the file is under `FaultInjectionTestFS`'s control.
  832. if (io_s.ok()) {
  833. bool should_track;
  834. {
  835. MutexLock l(&mutex_);
  836. if (db_file_state_.find(fname) != db_file_state_.end()) {
  837. // It was written by this `FileSystem` earlier.
  838. assert(exists);
  839. should_track = true;
  840. } else if (!exists) {
  841. // It was created by this `FileSystem` just now.
  842. should_track = true;
  843. open_managed_files_.insert(fname);
  844. auto dir_and_name = TestFSGetDirAndName(fname);
  845. auto& list = dir_to_new_files_since_last_sync_[dir_and_name.first];
  846. list[dir_and_name.second] = kNewFileNoOverwrite;
  847. } else {
  848. should_track = false;
  849. }
  850. }
  851. if (should_track) {
  852. result->reset(
  853. new TestFSWritableFile(fname, file_opts, std::move(*result), this));
  854. }
  855. }
  856. return io_s;
  857. }
  858. IOStatus FaultInjectionTestFS::ReuseWritableFile(
  859. const std::string& fname, const std::string& old_fname,
  860. const FileOptions& file_opts, std::unique_ptr<FSWritableFile>* result,
  861. IODebugContext* dbg) {
  862. IOStatus s = RenameFile(old_fname, fname, file_opts.io_options, dbg);
  863. if (!s.ok()) {
  864. return s;
  865. }
  866. return NewWritableFile(fname, file_opts, result, dbg);
  867. }
  868. IOStatus FaultInjectionTestFS::NewRandomRWFile(
  869. const std::string& fname, const FileOptions& file_opts,
  870. std::unique_ptr<FSRandomRWFile>* result, IODebugContext* dbg) {
  871. if (!IsFilesystemActive()) {
  872. return GetError();
  873. }
  874. if (IsFilesystemDirectWritable()) {
  875. return target()->NewRandomRWFile(fname, file_opts, result, dbg);
  876. }
  877. IOStatus io_s = MaybeInjectThreadLocalError(FaultInjectionIOType::kWrite,
  878. file_opts.io_options, fname);
  879. if (!io_s.ok()) {
  880. return io_s;
  881. }
  882. io_s = target()->NewRandomRWFile(fname, file_opts, result, dbg);
  883. if (io_s.ok()) {
  884. result->reset(new TestFSRandomRWFile(fname, std::move(*result), this));
  885. // If the WritableFileWriter* file is opened again it will be truncated,
  886. // so forget our saved state.
  887. UntrackFile(fname);
  888. {
  889. MutexLock l(&mutex_);
  890. open_managed_files_.insert(fname);
  891. auto dir_and_name = TestFSGetDirAndName(fname);
  892. auto& list = dir_to_new_files_since_last_sync_[dir_and_name.first];
  893. // It could be overwriting an old file, but we simplify the
  894. // implementation by ignoring it.
  895. list[dir_and_name.second] = kNewFileNoOverwrite;
  896. }
  897. }
  898. return io_s;
  899. }
  900. IOStatus FaultInjectionTestFS::NewRandomAccessFile(
  901. const std::string& fname, const FileOptions& file_opts,
  902. std::unique_ptr<FSRandomAccessFile>* result, IODebugContext* dbg) {
  903. if (!IsFilesystemActive()) {
  904. return GetError();
  905. }
  906. IOStatus io_s = MaybeInjectThreadLocalError(
  907. FaultInjectionIOType::kRead, file_opts.io_options, fname,
  908. ErrorOperation::kOpen, nullptr /* result */, false /* direct_io */,
  909. nullptr /* scratch */, true /*need_count_increase*/,
  910. nullptr /*fault_injected*/);
  911. if (!io_s.ok()) {
  912. return io_s;
  913. }
  914. io_s = target()->NewRandomAccessFile(fname, file_opts, result, dbg);
  915. if (io_s.ok()) {
  916. result->reset(new TestFSRandomAccessFile(fname, std::move(*result), this));
  917. }
  918. return io_s;
  919. }
  920. IOStatus FaultInjectionTestFS::NewSequentialFile(
  921. const std::string& fname, const FileOptions& file_opts,
  922. std::unique_ptr<FSSequentialFile>* result, IODebugContext* dbg) {
  923. if (!IsFilesystemActive()) {
  924. return GetError();
  925. }
  926. IOStatus io_s = MaybeInjectThreadLocalError(
  927. FaultInjectionIOType::kRead, file_opts.io_options, fname,
  928. ErrorOperation::kOpen, nullptr /* result */, false /* direct_io */,
  929. nullptr /* scratch */, true /*need_count_increase*/,
  930. nullptr /*fault_injected*/);
  931. if (!io_s.ok()) {
  932. return io_s;
  933. }
  934. io_s = target()->NewSequentialFile(fname, file_opts, result, dbg);
  935. if (io_s.ok()) {
  936. result->reset(new TestFSSequentialFile(std::move(*result), this, fname));
  937. }
  938. return io_s;
  939. }
  940. IOStatus FaultInjectionTestFS::DeleteFile(const std::string& f,
  941. const IOOptions& options,
  942. IODebugContext* dbg) {
  943. if (!IsFilesystemActive()) {
  944. return GetError();
  945. }
  946. IOStatus io_s = MaybeInjectThreadLocalError(
  947. FaultInjectionIOType::kMetadataWrite, options);
  948. if (!io_s.ok()) {
  949. return io_s;
  950. }
  951. io_s = FileSystemWrapper::DeleteFile(f, options, dbg);
  952. if (io_s.ok()) {
  953. UntrackFile(f);
  954. }
  955. return io_s;
  956. }
  957. IOStatus FaultInjectionTestFS::GetFileSize(const std::string& f,
  958. const IOOptions& options,
  959. uint64_t* file_size,
  960. IODebugContext* dbg) {
  961. if (EndsWith(f, ".sst") && ShouldFailFilesystemGetFileSizeSst()) {
  962. return IOStatus::IOError("FileSystem::GetFileSize failed");
  963. }
  964. if (!IsFilesystemActive()) {
  965. return GetError();
  966. }
  967. IOStatus io_s =
  968. MaybeInjectThreadLocalError(FaultInjectionIOType::kMetadataRead, options);
  969. if (!io_s.ok()) {
  970. return io_s;
  971. }
  972. io_s = target()->GetFileSize(f, options, file_size, dbg);
  973. if (!io_s.ok()) {
  974. return io_s;
  975. }
  976. if (ReadUnsyncedData()) {
  977. // Need to report flushed size, not synced size
  978. MutexLock l(&mutex_);
  979. auto it = db_file_state_.find(f);
  980. if (it != db_file_state_.end()) {
  981. *file_size = it->second.pos_at_last_append_;
  982. }
  983. }
  984. return io_s;
  985. }
  986. IOStatus FaultInjectionTestFS::GetFileModificationTime(const std::string& fname,
  987. const IOOptions& options,
  988. uint64_t* file_mtime,
  989. IODebugContext* dbg) {
  990. if (!IsFilesystemActive()) {
  991. return GetError();
  992. }
  993. IOStatus io_s =
  994. MaybeInjectThreadLocalError(FaultInjectionIOType::kMetadataRead, options);
  995. if (!io_s.ok()) {
  996. return io_s;
  997. }
  998. io_s = target()->GetFileModificationTime(fname, options, file_mtime, dbg);
  999. return io_s;
  1000. }
  1001. IOStatus FaultInjectionTestFS::RenameFile(const std::string& s,
  1002. const std::string& t,
  1003. const IOOptions& options,
  1004. IODebugContext* dbg) {
  1005. if (!IsFilesystemActive()) {
  1006. return GetError();
  1007. }
  1008. IOStatus io_s = MaybeInjectThreadLocalError(
  1009. FaultInjectionIOType::kMetadataWrite, options);
  1010. if (!io_s.ok()) {
  1011. return io_s;
  1012. }
  1013. // We preserve the contents of overwritten files up to a size threshold.
  1014. // We could keep the previous file under another name, but then we would
  1015. // need to worry about garbage collecting those files; that can be done if
  1016. // it is needed later. We ignore I/O errors here for simplicity.
  1017. std::string previous_contents = kNewFileNoOverwrite;
  1018. if (target()->FileExists(t, IOOptions(), nullptr).ok()) {
  1019. uint64_t file_size;
  1020. if (target()->GetFileSize(t, IOOptions(), &file_size, nullptr).ok() &&
  1021. file_size < 1024) {
  1022. ReadFileToString(target(), t, &previous_contents).PermitUncheckedError();
  1023. }
  1024. }
  1025. io_s = FileSystemWrapper::RenameFile(s, t, options, dbg);
  1026. if (io_s.ok()) {
  1027. {
  1028. MutexLock l(&mutex_);
  1029. if (db_file_state_.find(s) != db_file_state_.end()) {
  1030. db_file_state_[t] = db_file_state_[s];
  1031. db_file_state_.erase(s);
  1032. }
  1033. auto sdn = TestFSGetDirAndName(s);
  1034. auto tdn = TestFSGetDirAndName(t);
  1035. if (dir_to_new_files_since_last_sync_[sdn.first].erase(sdn.second) != 0) {
  1036. auto& tlist = dir_to_new_files_since_last_sync_[tdn.first];
  1037. tlist[tdn.second] = previous_contents;
  1038. }
  1039. }
  1040. }
  1041. return io_s;
  1042. }
  1043. IOStatus FaultInjectionTestFS::LinkFile(const std::string& s,
  1044. const std::string& t,
  1045. const IOOptions& options,
  1046. IODebugContext* dbg) {
  1047. if (!IsFilesystemActive()) {
  1048. return GetError();
  1049. }
  1050. IOStatus io_s = MaybeInjectThreadLocalError(
  1051. FaultInjectionIOType::kMetadataWrite, options);
  1052. if (!io_s.ok()) {
  1053. return io_s;
  1054. }
  1055. // Using the value in `dir_to_new_files_since_last_sync_` for the source file
  1056. // may be a more reasonable choice.
  1057. std::string previous_contents = kNewFileNoOverwrite;
  1058. io_s = FileSystemWrapper::LinkFile(s, t, options, dbg);
  1059. if (io_s.ok()) {
  1060. {
  1061. MutexLock l(&mutex_);
  1062. if (!allow_link_open_file_ &&
  1063. open_managed_files_.find(s) != open_managed_files_.end()) {
  1064. fprintf(stderr, "Attempt to LinkFile while open for write: %s\n",
  1065. s.c_str());
  1066. abort();
  1067. }
  1068. if (db_file_state_.find(s) != db_file_state_.end()) {
  1069. db_file_state_[t] = db_file_state_[s];
  1070. }
  1071. auto sdn = TestFSGetDirAndName(s);
  1072. auto tdn = TestFSGetDirAndName(t);
  1073. if (dir_to_new_files_since_last_sync_[sdn.first].find(sdn.second) !=
  1074. dir_to_new_files_since_last_sync_[sdn.first].end()) {
  1075. auto& tlist = dir_to_new_files_since_last_sync_[tdn.first];
  1076. assert(tlist.find(tdn.second) == tlist.end());
  1077. tlist[tdn.second] = previous_contents;
  1078. }
  1079. }
  1080. }
  1081. return io_s;
  1082. }
  1083. IOStatus FaultInjectionTestFS::NumFileLinks(const std::string& fname,
  1084. const IOOptions& options,
  1085. uint64_t* count,
  1086. IODebugContext* dbg) {
  1087. if (!IsFilesystemActive()) {
  1088. return GetError();
  1089. }
  1090. IOStatus io_s =
  1091. MaybeInjectThreadLocalError(FaultInjectionIOType::kMetadataRead, options);
  1092. if (!io_s.ok()) {
  1093. return io_s;
  1094. }
  1095. io_s = target()->NumFileLinks(fname, options, count, dbg);
  1096. return io_s;
  1097. }
  1098. IOStatus FaultInjectionTestFS::AreFilesSame(const std::string& first,
  1099. const std::string& second,
  1100. const IOOptions& options, bool* res,
  1101. IODebugContext* dbg) {
  1102. if (!IsFilesystemActive()) {
  1103. return GetError();
  1104. }
  1105. IOStatus io_s =
  1106. MaybeInjectThreadLocalError(FaultInjectionIOType::kMetadataRead, options);
  1107. if (!io_s.ok()) {
  1108. return io_s;
  1109. }
  1110. io_s = target()->AreFilesSame(first, second, options, res, dbg);
  1111. return io_s;
  1112. }
  1113. IOStatus FaultInjectionTestFS::GetAbsolutePath(const std::string& db_path,
  1114. const IOOptions& options,
  1115. std::string* output_path,
  1116. IODebugContext* dbg) {
  1117. if (!IsFilesystemActive()) {
  1118. return GetError();
  1119. }
  1120. IOStatus io_s =
  1121. MaybeInjectThreadLocalError(FaultInjectionIOType::kMetadataRead, options);
  1122. if (!io_s.ok()) {
  1123. return io_s;
  1124. }
  1125. io_s = target()->GetAbsolutePath(db_path, options, output_path, dbg);
  1126. return io_s;
  1127. }
  1128. IOStatus FaultInjectionTestFS::IsDirectory(const std::string& path,
  1129. const IOOptions& options,
  1130. bool* is_dir, IODebugContext* dgb) {
  1131. if (!IsFilesystemActive()) {
  1132. return GetError();
  1133. }
  1134. IOStatus io_s =
  1135. MaybeInjectThreadLocalError(FaultInjectionIOType::kMetadataRead, options);
  1136. if (!io_s.ok()) {
  1137. return io_s;
  1138. }
  1139. io_s = target()->IsDirectory(path, options, is_dir, dgb);
  1140. return io_s;
  1141. }
  1142. IOStatus FaultInjectionTestFS::Poll(std::vector<void*>& io_handles,
  1143. size_t min_completions) {
  1144. return target()->Poll(io_handles, min_completions);
  1145. }
  1146. IOStatus FaultInjectionTestFS::AbortIO(std::vector<void*>& io_handles) {
  1147. return target()->AbortIO(io_handles);
  1148. }
  1149. void FaultInjectionTestFS::RandomRWFileClosed(const std::string& fname) {
  1150. MutexLock l(&mutex_);
  1151. if (open_managed_files_.find(fname) != open_managed_files_.end()) {
  1152. open_managed_files_.erase(fname);
  1153. }
  1154. }
  1155. void FaultInjectionTestFS::WritableFileClosed(const FSFileState& state) {
  1156. MutexLock l(&mutex_);
  1157. if (open_managed_files_.find(state.filename_) != open_managed_files_.end()) {
  1158. db_file_state_[state.filename_] = state;
  1159. open_managed_files_.erase(state.filename_);
  1160. }
  1161. }
  1162. void FaultInjectionTestFS::WritableFileSynced(const FSFileState& state) {
  1163. MutexLock l(&mutex_);
  1164. if (open_managed_files_.find(state.filename_) != open_managed_files_.end()) {
  1165. if (db_file_state_.find(state.filename_) == db_file_state_.end()) {
  1166. db_file_state_.insert(std::make_pair(state.filename_, state));
  1167. } else {
  1168. db_file_state_[state.filename_] = state;
  1169. }
  1170. }
  1171. }
  1172. void FaultInjectionTestFS::WritableFileAppended(const FSFileState& state) {
  1173. MutexLock l(&mutex_);
  1174. if (open_managed_files_.find(state.filename_) != open_managed_files_.end()) {
  1175. if (db_file_state_.find(state.filename_) == db_file_state_.end()) {
  1176. db_file_state_.insert(std::make_pair(state.filename_, state));
  1177. } else {
  1178. db_file_state_[state.filename_] = state;
  1179. }
  1180. }
  1181. }
  1182. IOStatus FaultInjectionTestFS::DropUnsyncedFileData() {
  1183. IOStatus io_s;
  1184. MutexLock l(&mutex_);
  1185. for (std::map<std::string, FSFileState>::iterator it = db_file_state_.begin();
  1186. io_s.ok() && it != db_file_state_.end(); ++it) {
  1187. FSFileState& fs_state = it->second;
  1188. if (!fs_state.IsFullySynced()) {
  1189. io_s = fs_state.DropUnsyncedData();
  1190. }
  1191. }
  1192. return io_s;
  1193. }
  1194. IOStatus FaultInjectionTestFS::DropRandomUnsyncedFileData(Random* rnd) {
  1195. IOStatus io_s;
  1196. MutexLock l(&mutex_);
  1197. for (std::map<std::string, FSFileState>::iterator it = db_file_state_.begin();
  1198. io_s.ok() && it != db_file_state_.end(); ++it) {
  1199. FSFileState& fs_state = it->second;
  1200. if (!fs_state.IsFullySynced()) {
  1201. io_s = fs_state.DropRandomUnsyncedData(rnd);
  1202. }
  1203. }
  1204. return io_s;
  1205. }
  1206. IOStatus FaultInjectionTestFS::DeleteFilesCreatedAfterLastDirSync(
  1207. const IOOptions& options, IODebugContext* dbg) {
  1208. // Because DeleteFile accesses this container, make a copy to avoid deadlock
  1209. std::map<std::string, std::map<std::string, std::string>> map_copy;
  1210. {
  1211. MutexLock l(&mutex_);
  1212. map_copy.insert(dir_to_new_files_since_last_sync_.begin(),
  1213. dir_to_new_files_since_last_sync_.end());
  1214. }
  1215. for (auto& pair : map_copy) {
  1216. for (auto& file_pair : pair.second) {
  1217. if (file_pair.second == kNewFileNoOverwrite) {
  1218. IOStatus io_s =
  1219. DeleteFile(pair.first + "/" + file_pair.first, options, dbg);
  1220. if (!io_s.ok()) {
  1221. return io_s;
  1222. }
  1223. } else {
  1224. IOOptions opts;
  1225. IOStatus io_s =
  1226. WriteStringToFile(target(), file_pair.second,
  1227. pair.first + "/" + file_pair.first, true, opts);
  1228. if (!io_s.ok()) {
  1229. return io_s;
  1230. }
  1231. }
  1232. }
  1233. }
  1234. return IOStatus::OK();
  1235. }
  1236. void FaultInjectionTestFS::ResetState() {
  1237. MutexLock l(&mutex_);
  1238. db_file_state_.clear();
  1239. dir_to_new_files_since_last_sync_.clear();
  1240. SetFilesystemActiveNoLock(true);
  1241. }
  1242. void FaultInjectionTestFS::UntrackFile(const std::string& f) {
  1243. MutexLock l(&mutex_);
  1244. auto dir_and_name = TestFSGetDirAndName(f);
  1245. dir_to_new_files_since_last_sync_[dir_and_name.first].erase(
  1246. dir_and_name.second);
  1247. db_file_state_.erase(f);
  1248. open_managed_files_.erase(f);
  1249. }
  1250. IOStatus FaultInjectionTestFS::MaybeInjectThreadLocalReadError(
  1251. const IOOptions& io_options, ErrorOperation op, Slice* result,
  1252. bool direct_io, char* scratch, bool need_count_increase,
  1253. bool* fault_injected) {
  1254. bool dummy_bool;
  1255. bool& ret_fault_injected = fault_injected ? *fault_injected : dummy_bool;
  1256. ret_fault_injected = false;
  1257. ErrorContext* ctx =
  1258. static_cast<ErrorContext*>(injected_thread_local_read_error_.Get());
  1259. if (ctx == nullptr || !ctx->enable_error_injection || !ctx->one_in ||
  1260. ShouldIOActivitiesExcludedFromFaultInjection(io_options.io_activity)) {
  1261. return IOStatus::OK();
  1262. }
  1263. IOStatus ret;
  1264. if (ctx->rand.OneIn(ctx->one_in)) {
  1265. if (ctx->count == 0) {
  1266. ctx->message = "";
  1267. }
  1268. if (need_count_increase) {
  1269. ctx->count++;
  1270. }
  1271. if (ctx->callstack) {
  1272. free(ctx->callstack);
  1273. }
  1274. ctx->callstack = port::SaveStack(&ctx->frames);
  1275. std::stringstream msg;
  1276. msg << FaultInjectionTestFS::kInjected << " ";
  1277. if (op != ErrorOperation::kMultiReadSingleReq) {
  1278. // Likely a non-per-read status code (e.g. the overall status of a MultiRead)
  1279. msg << "read error";
  1280. ctx->message = msg.str();
  1281. ret_fault_injected = true;
  1282. ret = IOStatus::IOError(ctx->message);
  1283. } else if (Random::GetTLSInstance()->OneIn(8)) {
  1284. assert(result);
  1285. // With a small probability, in addition to setting the failure status, turn
  1286. // the result into an empty one, which callers are supposed to catch with a check.
  1287. *result = Slice();
  1288. msg << "empty result";
  1289. ctx->message = msg.str();
  1290. ret_fault_injected = true;
  1291. ret = IOStatus::IOError(ctx->message);
  1292. } else if (!direct_io && Random::GetTLSInstance()->OneIn(7) &&
  1293. scratch != nullptr && result->data() == scratch) {
  1294. assert(result);
  1295. // With direct I/O, many extra bytes might be read, so corrupting
  1296. // one byte might not cause a checksum mismatch. Skip checksum
  1297. // corruption injection in that case.
  1298. // We only corrupt data if the result is filled into `scratch`. In other
  1299. // cases the data might not be modifiable (e.g. mmapped files), or
  1300. // modifying it could have unintended side effects.
  1301. // With a small probability, in addition to setting the failure status,
  1302. // corrupt the result in a way that checksum checking is supposed to fail.
  1303. // Corrupt the last byte, which is supposed to be a checksum byte.
  1304. // This works for CRC; not 100% sure for xxhash, and we will adjust
  1305. // if that turns out not to be the case.
  1306. const_cast<char*>(result->data())[result->size() - 1]++;
  1307. msg << "corrupt last byte";
  1308. ctx->message = msg.str();
  1309. ret_fault_injected = true;
  1310. ret = IOStatus::IOError(ctx->message);
  1311. } else {
  1312. msg << "error result multiget single";
  1313. ctx->message = msg.str();
  1314. ret_fault_injected = true;
  1315. ret = IOStatus::IOError(ctx->message);
  1316. }
  1317. }
  1318. ret.SetRetryable(ctx->retryable);
  1319. ret.SetDataLoss(ctx->has_data_loss);
  1320. return ret;
  1321. }
  1322. bool FaultInjectionTestFS::TryParseFileName(const std::string& file_name,
  1323. uint64_t* number, FileType* type) {
  1324. std::size_t found = file_name.find_last_of('/');
  1325. std::string file = file_name.substr(found);
  1326. return ParseFileName(file, number, type);
  1327. }
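// For illustration (hypothetical input): "/db/000012.log" yields
// file = "/000012.log", which ParseFileName() is then expected to parse as
// number 12 with a WAL file type.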
  1328. IOStatus FaultInjectionTestFS::MaybeInjectThreadLocalError(
  1329. FaultInjectionIOType type, const IOOptions& io_options,
  1330. const std::string& file_name, ErrorOperation op, Slice* result,
  1331. bool direct_io, char* scratch, bool need_count_increase,
  1332. bool* fault_injected) {
  1333. if (type == FaultInjectionIOType::kRead) {
  1334. return MaybeInjectThreadLocalReadError(io_options, op, result, direct_io,
  1335. scratch, need_count_increase,
  1336. fault_injected);
  1337. }
  1338. ErrorContext* ctx = GetErrorContextFromFaultInjectionIOType(type);
  1339. if (ctx == nullptr || !ctx->enable_error_injection || !ctx->one_in ||
  1340. ShouldIOActivitiesExcludedFromFaultInjection(io_options.io_activity) ||
  1341. (type == FaultInjectionIOType::kWrite &&
  1342. ShouldExcludeFromWriteFaultInjection(file_name))) {
  1343. return IOStatus::OK();
  1344. }
  1345. IOStatus ret;
  1346. if (ctx->rand.OneIn(ctx->one_in)) {
  1347. ctx->count++;
  1348. if (ctx->callstack) {
  1349. free(ctx->callstack);
  1350. }
  1351. ctx->callstack = port::SaveStack(&ctx->frames);
  1352. ctx->message = GetErrorMessage(type, file_name, op);
  1353. ret = IOStatus::IOError(ctx->message);
  1354. ret.SetRetryable(ctx->retryable);
  1355. ret.SetDataLoss(ctx->has_data_loss);
  1356. if (type == FaultInjectionIOType::kWrite) {
  1357. TEST_SYNC_POINT(
  1358. "FaultInjectionTestFS::InjectMetadataWriteError:Injected");
  1359. }
  1360. }
  1361. return ret;
  1362. }
  1363. void FaultInjectionTestFS::PrintInjectedThreadLocalErrorBacktrace(
  1364. FaultInjectionIOType type) {
  1365. #if defined(OS_LINUX)
  1366. ErrorContext* ctx = GetErrorContextFromFaultInjectionIOType(type);
  1367. if (ctx) {
  1368. if (type == FaultInjectionIOType::kRead) {
  1369. fprintf(stderr, "Injected read error type = %d\n", ctx->type);
  1370. }
  1371. fprintf(stderr, "Message: %s\n", ctx->message.c_str());
  1372. port::PrintAndFreeStack(ctx->callstack, ctx->frames);
  1373. ctx->callstack = nullptr;
  1374. }
  1375. #else
  1376. (void)type;
  1377. #endif
  1378. }
  1379. } // namespace ROCKSDB_NAMESPACE