// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include <iomanip>

#include "db/db_test_util.h"
#include "port/stack_trace.h"
#include "test_util/testutil.h"
#include "util/random.h"

namespace ROCKSDB_NAMESPACE {
namespace {

// A wrapper that allows injection of errors.
class CorruptionFS : public FileSystemWrapper {
 public:
  bool writable_file_error_;
  int num_writable_file_errors_;

  explicit CorruptionFS(const std::shared_ptr<FileSystem>& _target,
                        bool fs_buffer, bool verify_read)
      : FileSystemWrapper(_target),
        writable_file_error_(false),
        num_writable_file_errors_(0),
        corruption_trigger_(INT_MAX),
        read_count_(0),
        corrupt_offset_(0),
        corrupt_len_(0),
        rnd_(300),
        fs_buffer_(fs_buffer),
        verify_read_(verify_read) {}

  ~CorruptionFS() override {
    // Assert that the corruption was reset, which means it got triggered
    assert(corruption_trigger_ == INT_MAX || corrupt_len_ > 0);
  }

  const char* Name() const override { return "ErrorEnv"; }

  IOStatus NewWritableFile(const std::string& fname, const FileOptions& opts,
                           std::unique_ptr<FSWritableFile>* result,
                           IODebugContext* dbg) override {
    result->reset();
    if (writable_file_error_) {
      ++num_writable_file_errors_;
      return IOStatus::IOError(fname, "fake error");
    }
    return target()->NewWritableFile(fname, opts, result, dbg);
  }

  void SetCorruptionTrigger(const int trigger) {
    MutexLock l(&mutex_);
    corruption_trigger_ = trigger;
    read_count_ = 0;
    corrupt_fname_.clear();
  }

  IOStatus NewRandomAccessFile(const std::string& fname,
                               const FileOptions& opts,
                               std::unique_ptr<FSRandomAccessFile>* result,
                               IODebugContext* dbg) override {
    class CorruptionRandomAccessFile : public FSRandomAccessFileOwnerWrapper {
     public:
      CorruptionRandomAccessFile(CorruptionFS& fs, const std::string& fname,
                                 std::unique_ptr<FSRandomAccessFile>& file)
          : FSRandomAccessFileOwnerWrapper(std::move(file)),
            fs_(fs),
            fname_(fname) {}

      IOStatus Read(uint64_t offset, size_t len, const IOOptions& opts,
                    Slice* result, char* scratch,
                    IODebugContext* dbg) const override {
        IOStatus s = target()->Read(offset, len, opts, result, scratch, dbg);
        if (opts.verify_and_reconstruct_read) {
          fs_.MaybeResetOverlapWithCorruptedChunk(fname_, offset,
                                                  result->size());
          return s;
        }

        MutexLock l(&fs_.mutex_);
        if (s.ok() && ++fs_.read_count_ >= fs_.corruption_trigger_) {
          fs_.corruption_trigger_ = INT_MAX;
          char* data = const_cast<char*>(result->data());
          std::memcpy(
              data,
              fs_.rnd_.RandomString(static_cast<int>(result->size())).c_str(),
              result->size());
          fs_.SetCorruptedChunk(fname_, offset, result->size());
        }
        return s;
      }

      IOStatus MultiRead(FSReadRequest* reqs, size_t num_reqs,
                         const IOOptions& options,
                         IODebugContext* dbg) override {
        for (size_t i = 0; i < num_reqs; ++i) {
          FSReadRequest& req = reqs[i];
          if (fs_.fs_buffer_) {
            // See https://github.com/facebook/rocksdb/pull/13195 for why we
            // want to set up our test implementation for FSAllocationPtr this
            // way.
            char* internalData = new char[req.len];
            req.status = Read(req.offset, req.len, options, &req.result,
                              internalData, dbg);

            Slice* internalSlice = new Slice(internalData, req.len);
            FSAllocationPtr internalPtr(internalSlice, [](void* ptr) {
              delete[] static_cast<const char*>(
                  static_cast<Slice*>(ptr)->data_);
              delete static_cast<Slice*>(ptr);
            });
            req.fs_scratch = std::move(internalPtr);
          } else {
            req.status = Read(req.offset, req.len, options, &req.result,
                              req.scratch, dbg);
          }
        }
        return IOStatus::OK();
      }
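
      // Note on the fs_buffer_ path above: the FSAllocationPtr owns the
      // heap-allocated Slice, and its custom deleter first frees the
      // underlying char[] buffer (reached via Slice::data_) and then the
      // Slice wrapper itself, so both allocations are released exactly once
      // when req.fs_scratch is destroyed.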
      IOStatus Prefetch(uint64_t /*offset*/, size_t /*n*/,
                        const IOOptions& /*options*/,
                        IODebugContext* /*dbg*/) override {
        return IOStatus::NotSupported("Prefetch");
      }

     private:
      CorruptionFS& fs_;
      std::string fname_;
    };

    std::unique_ptr<FSRandomAccessFile> file;
    IOStatus s = target()->NewRandomAccessFile(fname, opts, &file, dbg);
    EXPECT_OK(s);
    result->reset(new CorruptionRandomAccessFile(*this, fname, file));
    return s;
  }

  IOStatus NewSequentialFile(const std::string& fname,
                             const FileOptions& file_opts,
                             std::unique_ptr<FSSequentialFile>* result,
                             IODebugContext* dbg) override {
    class CorruptionSequentialFile : public FSSequentialFileOwnerWrapper {
     public:
      CorruptionSequentialFile(CorruptionFS& fs, const std::string& fname,
                               std::unique_ptr<FSSequentialFile>& file)
          : FSSequentialFileOwnerWrapper(std::move(file)),
            fs_(fs),
            fname_(fname),
            offset_(0) {}

      IOStatus Read(size_t len, const IOOptions& opts, Slice* result,
                    char* scratch, IODebugContext* dbg) override {
        IOStatus s = target()->Read(len, opts, result, scratch, dbg);
        if (result->size() == 0 ||
            fname_.find("IDENTITY") != std::string::npos) {
          return s;
        }

        if (opts.verify_and_reconstruct_read) {
          fs_.MaybeResetOverlapWithCorruptedChunk(fname_, offset_,
                                                  result->size());
          return s;
        }

        MutexLock l(&fs_.mutex_);
        if (s.ok() && ++fs_.read_count_ >= fs_.corruption_trigger_) {
          fs_.corruption_trigger_ = INT_MAX;
          char* data = const_cast<char*>(result->data());
          std::memcpy(
              data,
              fs_.rnd_.RandomString(static_cast<int>(result->size())).c_str(),
              result->size());
          fs_.SetCorruptedChunk(fname_, offset_, result->size());
        }
        offset_ += result->size();
        return s;
      }

     private:
      CorruptionFS& fs_;
      std::string fname_;
      size_t offset_;
    };

    std::unique_ptr<FSSequentialFile> file;
    IOStatus s = target()->NewSequentialFile(fname, file_opts, &file, dbg);
    EXPECT_OK(s);
    result->reset(new CorruptionSequentialFile(*this, fname, file));
    return s;
  }

  void SupportedOps(int64_t& supported_ops) override {
    supported_ops = 1 << FSSupportedOps::kAsyncIO;
    if (fs_buffer_) {
      supported_ops |= 1 << FSSupportedOps::kFSBuffer;
    }
    if (verify_read_) {
      supported_ops |= 1 << FSSupportedOps::kVerifyAndReconstructRead;
    }
  }
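
  // A sketch of how a caller might probe the capability bitmask built above
  // (assuming the usual FSSupportedOps bit-test convention):
  //   int64_t ops;
  //   fs->SupportedOps(ops);
  //   bool fs_buffer = (ops & (1 << FSSupportedOps::kFSBuffer)) != 0;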
  void SetCorruptedChunk(const std::string& fname, size_t offset, size_t len) {
    assert(corrupt_fname_.empty());
    corrupt_fname_ = fname;
    corrupt_offset_ = offset;
    corrupt_len_ = len;
  }

  void MaybeResetOverlapWithCorruptedChunk(const std::string& fname,
                                           size_t offset, size_t len) {
    if (fname == corrupt_fname_ &&
        ((offset <= corrupt_offset_ && (offset + len) > corrupt_offset_) ||
         (offset >= corrupt_offset_ &&
          offset < (corrupt_offset_ + corrupt_len_)))) {
      corrupt_fname_.clear();
    }
  }
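
  // Example: with a corrupted chunk at [4096, 4608) (offset 4096, len 512),
  // a retry read covering [4000, 4200) or [4500, 8192) overlaps the chunk and
  // clears corrupt_fname_, while reads of [0, 4096) or [4608, 8192) leave it
  // set. VerifyRetry() below then reports whether the corrupted chunk was
  // re-read with verify_and_reconstruct_read.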
  bool VerifyRetry() { return corrupt_len_ > 0 && corrupt_fname_.empty(); }

  int read_count() { return read_count_; }

  int corruption_trigger() { return corruption_trigger_; }

 private:
  int corruption_trigger_;
  int read_count_;
  std::string corrupt_fname_;
  size_t corrupt_offset_;
  size_t corrupt_len_;
  Random rnd_;
  bool fs_buffer_;
  bool verify_read_;
  port::Mutex mutex_;
};
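
// Typical wiring, mirrored by the DBIOCorruptionTest constructor below (a
// sketch, not prescriptive): wrap the base Env's file system, build a
// composite Env, and point Options::env at it:
//   auto fs = std::make_shared<CorruptionFS>(
//       base_env->GetFileSystem(), /*fs_buffer=*/true, /*verify_read=*/true);
//   std::unique_ptr<Env> env_guard = NewCompositeEnv(fs);
//   options.env = env_guard.get();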
}  // anonymous namespace

class DBIOFailureTest : public DBTestBase {
 public:
  DBIOFailureTest() : DBTestBase("db_io_failure_test", /*env_do_fsync=*/true) {}
};

// Check that number of files does not grow when writes are dropped
TEST_F(DBIOFailureTest, DropWrites) {
  do {
    Options options = CurrentOptions();
    options.env = env_;
    options.paranoid_checks = false;
    Reopen(options);

    ASSERT_OK(Put("foo", "v1"));
    ASSERT_EQ("v1", Get("foo"));
    Compact("a", "z");
    const size_t num_files = CountFiles();

    // Force out-of-space errors
    env_->drop_writes_.store(true, std::memory_order_release);
    env_->sleep_counter_.Reset();
    env_->SetMockSleep();
    for (int i = 0; i < 5; i++) {
      if (option_config_ != kUniversalCompactionMultiLevel &&
          option_config_ != kUniversalSubcompactions) {
        for (int level = 0; level < dbfull()->NumberLevels(); level++) {
          if (level > 0 && level == dbfull()->NumberLevels() - 1) {
            break;
          }
          Status s =
              dbfull()->TEST_CompactRange(level, nullptr, nullptr, nullptr,
                                          true /* disallow trivial move */);
          ASSERT_TRUE(s.ok() || s.IsCorruption());
        }
      } else {
        Status s =
            dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
        ASSERT_TRUE(s.ok() || s.IsCorruption());
      }
    }

    std::string property_value;
    ASSERT_TRUE(db_->GetProperty("rocksdb.background-errors", &property_value));
    ASSERT_EQ("5", property_value);

    env_->drop_writes_.store(false, std::memory_order_release);
    const size_t count = CountFiles();
    ASSERT_LT(count, num_files + 3);

    // Check that compaction attempts slept after errors
    // TODO @krad: Figure out why ASSERT_EQ 5 keeps failing in certain compiler
    // versions
    ASSERT_GE(env_->sleep_counter_.Read(), 4);
  } while (ChangeCompactOptions());
}

// Check background error counter bumped on flush failures.
TEST_F(DBIOFailureTest, DropWritesFlush) {
  do {
    Options options = CurrentOptions();
    options.env = env_;
    options.max_background_flushes = 1;
    Reopen(options);

    ASSERT_OK(Put("foo", "v1"));
    // Force out-of-space errors
    env_->drop_writes_.store(true, std::memory_order_release);

    std::string property_value;
    // Background error count is 0 now.
    ASSERT_TRUE(db_->GetProperty("rocksdb.background-errors", &property_value));
    ASSERT_EQ("0", property_value);

    // ASSERT file is too short
    ASSERT_TRUE(dbfull()->TEST_FlushMemTable(true).IsCorruption());

    ASSERT_TRUE(db_->GetProperty("rocksdb.background-errors", &property_value));
    ASSERT_EQ("1", property_value);

    env_->drop_writes_.store(false, std::memory_order_release);
  } while (ChangeCompactOptions());
}

// Check that CompactRange() returns failure if there is not enough space left
// on device
TEST_F(DBIOFailureTest, NoSpaceCompactRange) {
  do {
    Options options = CurrentOptions();
    options.env = env_;
    options.disable_auto_compactions = true;
    Reopen(options);

    // generate 5 tables
    for (int i = 0; i < 5; ++i) {
      ASSERT_OK(Put(Key(i), Key(i) + "v"));
      ASSERT_OK(Flush());
    }

    // Force out-of-space errors
    env_->no_space_.store(true, std::memory_order_release);

    Status s = dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr,
                                           true /* disallow trivial move */);
    ASSERT_TRUE(s.IsIOError());
    ASSERT_TRUE(s.IsNoSpace());

    env_->no_space_.store(false, std::memory_order_release);
  } while (ChangeCompactOptions());
}

TEST_F(DBIOFailureTest, NonWritableFileSystem) {
  do {
    Options options = CurrentOptions();
    options.write_buffer_size = 4096;
    options.arena_block_size = 4096;
    options.env = env_;
    Reopen(options);
    ASSERT_OK(Put("foo", "v1"));
    env_->non_writeable_rate_.store(100);
    std::string big(100000, 'x');
    int errors = 0;
    for (int i = 0; i < 20; i++) {
      if (!Put("foo", big).ok()) {
        errors++;
        env_->SleepForMicroseconds(100000);
      }
    }
    ASSERT_GT(errors, 0);
    env_->non_writeable_rate_.store(0);
  } while (ChangeCompactOptions());
}

TEST_F(DBIOFailureTest, ManifestWriteError) {
  // Test for the following problem:
  // (a) Compaction produces file F
  // (b) Log record containing F is written to MANIFEST file, but Sync() fails
  // (c) GC deletes F
  // (d) After reopening DB, reads fail since deleted F is named in log record

  // We iterate twice. In the second iteration, everything is the
  // same except the log record never makes it to the MANIFEST file.
  for (int iter = 0; iter < 2; iter++) {
    std::atomic<bool>* error_type = (iter == 0) ? &env_->manifest_sync_error_
                                                : &env_->manifest_write_error_;

    // Insert foo=>bar mapping
    Options options = CurrentOptions();
    options.env = env_;
    options.create_if_missing = true;
    options.error_if_exists = false;
    options.paranoid_checks = true;
    DestroyAndReopen(options);
    ASSERT_OK(Put("foo", "bar"));
    ASSERT_EQ("bar", Get("foo"));

    // Memtable compaction (will succeed)
    ASSERT_OK(Flush());
    ASSERT_EQ("bar", Get("foo"));
    const int last = 2;
    MoveFilesToLevel(2);
    ASSERT_EQ(NumTableFilesAtLevel(last), 1);  // foo=>bar is now in last level

    // Merging compaction (will fail)
    error_type->store(true, std::memory_order_release);
    ASSERT_NOK(
        dbfull()->TEST_CompactRange(last, nullptr, nullptr));  // Should fail
    ASSERT_EQ("bar", Get("foo"));
    error_type->store(false, std::memory_order_release);

    // Since paranoid_checks=true, writes should fail
    ASSERT_NOK(Put("foo2", "bar2"));

    // Recovery: should not lose data
    ASSERT_EQ("bar", Get("foo"));

    // Try again with paranoid_checks=false
    Close();
    options.paranoid_checks = false;
    Reopen(options);

    // Merging compaction (will fail)
    error_type->store(true, std::memory_order_release);
    Status s =
        dbfull()->TEST_CompactRange(last, nullptr, nullptr);  // Should fail
    if (iter == 0) {
      ASSERT_OK(s);
    } else {
      ASSERT_TRUE(s.IsIOError());
    }
    ASSERT_EQ("bar", Get("foo"));

    // Recovery: should not lose data
    error_type->store(false, std::memory_order_release);
    Reopen(options);
    ASSERT_EQ("bar", Get("foo"));

    // Since paranoid_checks=false, writes should succeed
    ASSERT_OK(Put("foo2", "bar2"));
    ASSERT_EQ("bar", Get("foo"));
    ASSERT_EQ("bar2", Get("foo2"));
  }
}

TEST_F(DBIOFailureTest, PutFailsParanoid) {
  // Test the following:
  // (a) A random put fails in paranoid mode (simulate by sync fail)
  // (b) All other puts have to fail, even if writes would succeed
  // (c) All of that should happen ONLY if paranoid_checks = true

  Options options = CurrentOptions();
  options.env = env_;
  options.create_if_missing = true;
  options.error_if_exists = false;
  options.paranoid_checks = true;
  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  ASSERT_OK(Put(1, "foo", "bar"));
  ASSERT_OK(Put(1, "foo1", "bar1"));
  // simulate error
  env_->log_write_error_.store(true, std::memory_order_release);
  ASSERT_NOK(Put(1, "foo2", "bar2"));
  env_->log_write_error_.store(false, std::memory_order_release);
  // the next put should fail, too
  ASSERT_NOK(Put(1, "foo3", "bar3"));
  // but we're still able to read
  ASSERT_EQ("bar", Get(1, "foo"));

  // do the same thing with paranoid checks off
  options.paranoid_checks = false;
  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  ASSERT_OK(Put(1, "foo", "bar"));
  ASSERT_OK(Put(1, "foo1", "bar1"));
  // simulate error
  env_->log_write_error_.store(true, std::memory_order_release);
  ASSERT_NOK(Put(1, "foo2", "bar2"));
  env_->log_write_error_.store(false, std::memory_order_release);
  // the next put should NOT fail
  ASSERT_OK(Put(1, "foo3", "bar3"));
}
#if !(defined NDEBUG) || !defined(OS_WIN)
TEST_F(DBIOFailureTest, FlushSstRangeSyncError) {
  Options options = CurrentOptions();
  options.env = env_;
  options.create_if_missing = true;
  options.error_if_exists = false;
  options.paranoid_checks = true;
  options.write_buffer_size = 256 * 1024 * 1024;
  options.writable_file_max_buffer_size = 128 * 1024;
  options.bytes_per_sync = 128 * 1024;
  options.level0_file_num_compaction_trigger = 4;
  options.memtable_factory.reset(test::NewSpecialSkipListFactory(10));
  BlockBasedTableOptions table_options;
  table_options.filter_policy.reset(NewBloomFilterPolicy(10));
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  const char* io_error_msg = "range sync dummy error";
  std::atomic<int> range_sync_called(0);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "SpecialEnv::SStableFile::RangeSync", [&](void* arg) {
        if (range_sync_called.fetch_add(1) == 0) {
          Status* st = static_cast<Status*>(arg);
          *st = Status::IOError(io_error_msg);
        }
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  Random rnd(301);
  std::string rnd_str =
      rnd.RandomString(static_cast<int>(options.bytes_per_sync / 2));
  std::string rnd_str_512kb = rnd.RandomString(512 * 1024);

  ASSERT_OK(Put(1, "foo", "bar"));
  // First 1MB doesn't get range synced
  ASSERT_OK(Put(1, "foo0_0", rnd_str_512kb));
  ASSERT_OK(Put(1, "foo0_1", rnd_str_512kb));
  ASSERT_OK(Put(1, "foo1_1", rnd_str));
  ASSERT_OK(Put(1, "foo1_2", rnd_str));
  ASSERT_OK(Put(1, "foo1_3", rnd_str));
  ASSERT_OK(Put(1, "foo2", "bar"));
  ASSERT_OK(Put(1, "foo3_1", rnd_str));
  ASSERT_OK(Put(1, "foo3_2", rnd_str));
  ASSERT_OK(Put(1, "foo3_3", rnd_str));
  ASSERT_OK(Put(1, "foo4", "bar"));
  Status s = dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
  ASSERT_TRUE(s.IsIOError());
  ASSERT_STREQ(s.getState(), io_error_msg);

  // Following writes should fail as flush failed.
  ASSERT_NOK(Put(1, "foo2", "bar3"));
  ASSERT_EQ("bar", Get(1, "foo"));

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  ASSERT_GE(1, range_sync_called.load());

  ReopenWithColumnFamilies({"default", "pikachu"}, options);
  ASSERT_EQ("bar", Get(1, "foo"));
}

TEST_F(DBIOFailureTest, CompactSstRangeSyncError) {
  Options options = CurrentOptions();
  options.env = env_;
  options.create_if_missing = true;
  options.error_if_exists = false;
  options.paranoid_checks = true;
  options.write_buffer_size = 256 * 1024 * 1024;
  options.writable_file_max_buffer_size = 128 * 1024;
  options.bytes_per_sync = 128 * 1024;
  options.level0_file_num_compaction_trigger = 2;
  options.target_file_size_base = 256 * 1024 * 1024;
  options.disable_auto_compactions = true;
  BlockBasedTableOptions table_options;
  table_options.filter_policy.reset(NewBloomFilterPolicy(10));
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  Random rnd(301);
  std::string rnd_str =
      rnd.RandomString(static_cast<int>(options.bytes_per_sync / 2));
  std::string rnd_str_512kb = rnd.RandomString(512 * 1024);

  ASSERT_OK(Put(1, "foo", "bar"));
  // First 1MB doesn't get range synced
  ASSERT_OK(Put(1, "foo0_0", rnd_str_512kb));
  ASSERT_OK(Put(1, "foo0_1", rnd_str_512kb));
  ASSERT_OK(Put(1, "foo1_1", rnd_str));
  ASSERT_OK(Put(1, "foo1_2", rnd_str));
  ASSERT_OK(Put(1, "foo1_3", rnd_str));
  ASSERT_OK(Flush(1));
  ASSERT_OK(Put(1, "foo", "bar"));
  ASSERT_OK(Put(1, "foo3_1", rnd_str));
  ASSERT_OK(Put(1, "foo3_2", rnd_str));
  ASSERT_OK(Put(1, "foo3_3", rnd_str));
  ASSERT_OK(Put(1, "foo4", "bar"));
  ASSERT_OK(Flush(1));
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[1]));

  const char* io_error_msg = "range sync dummy error";
  std::atomic<int> range_sync_called(0);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "SpecialEnv::SStableFile::RangeSync", [&](void* arg) {
        if (range_sync_called.fetch_add(1) == 0) {
          Status* st = static_cast<Status*>(arg);
          *st = Status::IOError(io_error_msg);
        }
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(dbfull()->SetOptions(handles_[1],
                                 {
                                     {"disable_auto_compactions", "false"},
                                 }));
  Status s = dbfull()->TEST_WaitForCompact();
  ASSERT_TRUE(s.IsIOError());
  ASSERT_STREQ(s.getState(), io_error_msg);

  // Following writes should fail as the compaction failed.
  ASSERT_NOK(Put(1, "foo2", "bar3"));
  ASSERT_EQ("bar", Get(1, "foo"));

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  ASSERT_GE(1, range_sync_called.load());

  ReopenWithColumnFamilies({"default", "pikachu"}, options);
  ASSERT_EQ("bar", Get(1, "foo"));
}

TEST_F(DBIOFailureTest, FlushSstCloseError) {
  Options options = CurrentOptions();
  options.env = env_;
  options.create_if_missing = true;
  options.error_if_exists = false;
  options.paranoid_checks = true;
  options.level0_file_num_compaction_trigger = 4;
  options.memtable_factory.reset(test::NewSpecialSkipListFactory(2));

  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  const char* io_error_msg = "close dummy error";
  std::atomic<int> close_called(0);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "SpecialEnv::SStableFile::Close", [&](void* arg) {
        if (close_called.fetch_add(1) == 0) {
          Status* st = static_cast<Status*>(arg);
          *st = Status::IOError(io_error_msg);
        }
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(Put(1, "foo", "bar"));
  ASSERT_OK(Put(1, "foo1", "bar1"));
  ASSERT_OK(Put(1, "foo", "bar2"));
  Status s = dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
  ASSERT_TRUE(s.IsIOError());
  ASSERT_STREQ(s.getState(), io_error_msg);

  // Following writes should fail as flush failed.
  ASSERT_NOK(Put(1, "foo2", "bar3"));
  ASSERT_EQ("bar2", Get(1, "foo"));
  ASSERT_EQ("bar1", Get(1, "foo1"));

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  ReopenWithColumnFamilies({"default", "pikachu"}, options);
  ASSERT_EQ("bar2", Get(1, "foo"));
  ASSERT_EQ("bar1", Get(1, "foo1"));
}

TEST_F(DBIOFailureTest, CompactionSstCloseError) {
  Options options = CurrentOptions();
  options.env = env_;
  options.create_if_missing = true;
  options.error_if_exists = false;
  options.paranoid_checks = true;
  options.level0_file_num_compaction_trigger = 2;
  options.disable_auto_compactions = true;

  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  ASSERT_OK(Put(1, "foo", "bar"));
  ASSERT_OK(Put(1, "foo2", "bar"));
  ASSERT_OK(Flush(1));
  ASSERT_OK(Put(1, "foo", "bar2"));
  ASSERT_OK(Put(1, "foo2", "bar"));
  ASSERT_OK(Flush(1));
  ASSERT_OK(Put(1, "foo", "bar3"));
  ASSERT_OK(Put(1, "foo2", "bar"));
  ASSERT_OK(Flush(1));
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  const char* io_error_msg = "close dummy error";
  std::atomic<int> close_called(0);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "SpecialEnv::SStableFile::Close", [&](void* arg) {
        if (close_called.fetch_add(1) == 0) {
          Status* st = static_cast<Status*>(arg);
          *st = Status::IOError(io_error_msg);
        }
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(dbfull()->SetOptions(handles_[1],
                                 {
                                     {"disable_auto_compactions", "false"},
                                 }));
  Status s = dbfull()->TEST_WaitForCompact();
  ASSERT_TRUE(s.IsIOError());
  ASSERT_STREQ(s.getState(), io_error_msg);

  // Following writes should fail as compaction failed.
  ASSERT_NOK(Put(1, "foo2", "bar3"));
  ASSERT_EQ("bar3", Get(1, "foo"));

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  ReopenWithColumnFamilies({"default", "pikachu"}, options);
  ASSERT_EQ("bar3", Get(1, "foo"));
}

TEST_F(DBIOFailureTest, FlushSstSyncError) {
  Options options = CurrentOptions();
  options.env = env_;
  options.create_if_missing = true;
  options.error_if_exists = false;
  options.paranoid_checks = true;
  options.use_fsync = false;
  options.level0_file_num_compaction_trigger = 4;
  options.memtable_factory.reset(test::NewSpecialSkipListFactory(2));

  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  const char* io_error_msg = "sync dummy error";
  std::atomic<int> sync_called(0);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "SpecialEnv::SStableFile::Sync", [&](void* arg) {
        if (sync_called.fetch_add(1) == 0) {
          Status* st = static_cast<Status*>(arg);
          *st = Status::IOError(io_error_msg);
        }
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(Put(1, "foo", "bar"));
  ASSERT_OK(Put(1, "foo1", "bar1"));
  ASSERT_OK(Put(1, "foo", "bar2"));
  Status s = dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
  ASSERT_TRUE(s.IsIOError());
  ASSERT_STREQ(s.getState(), io_error_msg);

  // Following writes should fail as flush failed.
  ASSERT_NOK(Put(1, "foo2", "bar3"));
  ASSERT_EQ("bar2", Get(1, "foo"));
  ASSERT_EQ("bar1", Get(1, "foo1"));

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  ReopenWithColumnFamilies({"default", "pikachu"}, options);
  ASSERT_EQ("bar2", Get(1, "foo"));
  ASSERT_EQ("bar1", Get(1, "foo1"));
}

TEST_F(DBIOFailureTest, CompactionSstSyncError) {
  Options options = CurrentOptions();
  options.env = env_;
  options.create_if_missing = true;
  options.error_if_exists = false;
  options.paranoid_checks = true;
  options.level0_file_num_compaction_trigger = 2;
  options.disable_auto_compactions = true;
  options.use_fsync = false;

  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  ASSERT_OK(Put(1, "foo", "bar"));
  ASSERT_OK(Put(1, "foo2", "bar"));
  ASSERT_OK(Flush(1));
  ASSERT_OK(Put(1, "foo", "bar2"));
  ASSERT_OK(Put(1, "foo2", "bar"));
  ASSERT_OK(Flush(1));
  ASSERT_OK(Put(1, "foo", "bar3"));
  ASSERT_OK(Put(1, "foo2", "bar"));
  ASSERT_OK(Flush(1));
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  const char* io_error_msg = "sync dummy error";
  std::atomic<int> sync_called(0);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "SpecialEnv::SStableFile::Sync", [&](void* arg) {
        if (sync_called.fetch_add(1) == 0) {
          Status* st = static_cast<Status*>(arg);
          *st = Status::IOError(io_error_msg);
        }
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(dbfull()->SetOptions(handles_[1],
                                 {
                                     {"disable_auto_compactions", "false"},
                                 }));
  Status s = dbfull()->TEST_WaitForCompact();
  ASSERT_TRUE(s.IsIOError());
  ASSERT_STREQ(s.getState(), io_error_msg);

  // Following writes should fail as compaction failed.
  ASSERT_NOK(Put(1, "foo2", "bar3"));
  ASSERT_EQ("bar3", Get(1, "foo"));

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  ReopenWithColumnFamilies({"default", "pikachu"}, options);
  ASSERT_EQ("bar3", Get(1, "foo"));
}
#endif  // !(defined NDEBUG) || !defined(OS_WIN)
class DBIOCorruptionTest
    : public DBIOFailureTest,
      public testing::WithParamInterface<std::tuple<bool, bool, bool>> {
 public:
  DBIOCorruptionTest() : DBIOFailureTest() {
    BlockBasedTableOptions bbto;
    options_ = CurrentOptions();
    options_.statistics = CreateDBStatistics();

    base_env_ = env_;
    EXPECT_NE(base_env_, nullptr);
    fs_.reset(new CorruptionFS(base_env_->GetFileSystem(),
                               std::get<0>(GetParam()),
                               std::get<2>(GetParam())));
    env_guard_ = NewCompositeEnv(fs_);
    options_.env = env_guard_.get();
    bbto.num_file_reads_for_auto_readahead = 0;
    options_.table_factory.reset(NewBlockBasedTableFactory(bbto));
    options_.disable_auto_compactions = true;
    options_.max_file_opening_threads = 0;

    Reopen(options_);
  }

  ~DBIOCorruptionTest() {
    Close();
    db_ = nullptr;
  }

  Status ReopenDB() { return TryReopen(options_); }

  Statistics* stats() { return options_.statistics.get(); }

 protected:
  std::unique_ptr<Env> env_guard_;
  std::shared_ptr<CorruptionFS> fs_;
  Env* base_env_;
  Options options_;
};
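
// Each test below unpacks GetParam() as follows: std::get<0> - use the
// FS-provided buffer (kFSBuffer), std::get<1> - use async IO in ReadOptions,
// std::get<2> - retry corrupted reads with verify_and_reconstruct_read (see
// the INSTANTIATE_TEST_CASE_P comment at the bottom of the file).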
TEST_P(DBIOCorruptionTest, GetReadCorruptionRetry) {
  CorruptionFS* fs =
      static_cast<CorruptionFS*>(env_guard_->GetFileSystem().get());

  ASSERT_OK(Put("key1", "val1"));
  ASSERT_OK(Flush());
  fs->SetCorruptionTrigger(1);

  std::string val;
  ReadOptions ro;
  ro.async_io = std::get<1>(GetParam());
  // Pass ro (not a default-constructed ReadOptions) so the async_io setting
  // above actually takes effect.
  Status s = dbfull()->Get(ro, "key1", &val);
  if (std::get<2>(GetParam())) {
    ASSERT_OK(s);
    ASSERT_EQ(val, "val1");
    ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_COUNT), 1);
    ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_SUCCESS_COUNT),
              1);
  } else {
    ASSERT_TRUE(s.IsCorruption());
    ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_COUNT), 0);
  }
}
TEST_P(DBIOCorruptionTest, IterReadCorruptionRetry) {
  CorruptionFS* fs =
      static_cast<CorruptionFS*>(env_guard_->GetFileSystem().get());

  ASSERT_OK(Put("key1", "val1"));
  ASSERT_OK(Flush());
  fs->SetCorruptionTrigger(1);

  ReadOptions ro;
  ro.readahead_size = 65536;
  ro.async_io = std::get<1>(GetParam());

  Iterator* iter = dbfull()->NewIterator(ro);
  iter->SeekToFirst();
  while (iter->status().ok() && iter->Valid()) {
    iter->Next();
  }
  if (std::get<2>(GetParam())) {
    ASSERT_OK(iter->status());
    ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_COUNT), 1);
    ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_SUCCESS_COUNT),
              1);
  } else {
    ASSERT_TRUE(iter->status().IsCorruption());
    ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_COUNT), 0);
  }
  delete iter;
}

TEST_P(DBIOCorruptionTest, MultiGetReadCorruptionRetry) {
  CorruptionFS* fs =
      static_cast<CorruptionFS*>(env_guard_->GetFileSystem().get());

  ASSERT_OK(Put("key1", "val1"));
  ASSERT_OK(Put("key2", "val2"));
  ASSERT_OK(Flush());
  fs->SetCorruptionTrigger(1);

  std::vector<std::string> keystr{"key1", "key2"};
  std::vector<Slice> keys{Slice(keystr[0]), Slice(keystr[1])};
  std::vector<PinnableSlice> values(keys.size());
  std::vector<Status> statuses(keys.size());
  ReadOptions ro;
  ro.async_io = std::get<1>(GetParam());
  dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
                     keys.data(), values.data(), statuses.data());
  if (std::get<2>(GetParam())) {
    ASSERT_EQ(values[0].ToString(), "val1");
    ASSERT_EQ(values[1].ToString(), "val2");
    ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_COUNT), 1);
    ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_SUCCESS_COUNT),
              1);
  } else {
    ASSERT_TRUE(statuses[0].IsCorruption());
    ASSERT_TRUE(statuses[1].IsCorruption());
    ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_COUNT), 0);
  }
}

TEST_P(DBIOCorruptionTest, CompactionReadCorruptionRetry) {
  CorruptionFS* fs =
      static_cast<CorruptionFS*>(env_guard_->GetFileSystem().get());

  ASSERT_OK(Put("key1", "val1"));
  ASSERT_OK(Put("key3", "val3"));
  ASSERT_OK(Flush());
  ASSERT_OK(Put("key2", "val2"));
  ASSERT_OK(Flush());
  fs->SetCorruptionTrigger(1);
  Status s = dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
  if (std::get<2>(GetParam())) {
    ASSERT_OK(s);
    ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_COUNT), 1);
    ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_SUCCESS_COUNT),
              1);

    std::string val;
    ReadOptions ro;
    ro.async_io = std::get<1>(GetParam());
    ASSERT_OK(dbfull()->Get(ro, "key1", &val));
    ASSERT_EQ(val, "val1");
  } else {
    ASSERT_TRUE(s.IsCorruption());
    ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_COUNT), 0);
  }
}

TEST_P(DBIOCorruptionTest, FlushReadCorruptionRetry) {
  CorruptionFS* fs =
      static_cast<CorruptionFS*>(env_guard_->GetFileSystem().get());

  ASSERT_OK(Put("key1", "val1"));
  fs->SetCorruptionTrigger(1);
  Status s = Flush();
  if (std::get<2>(GetParam())) {
    ASSERT_OK(s);
    ASSERT_GT(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_COUNT), 1);
    ASSERT_GT(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_SUCCESS_COUNT),
              1);

    std::string val;
    ReadOptions ro;
    ro.async_io = std::get<1>(GetParam());
    ASSERT_OK(dbfull()->Get(ro, "key1", &val));
    ASSERT_EQ(val, "val1");
  } else {
    ASSERT_NOK(s);
    ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_COUNT), 0);
  }
}

TEST_P(DBIOCorruptionTest, ManifestCorruptionRetry) {
  CorruptionFS* fs =
      static_cast<CorruptionFS*>(env_guard_->GetFileSystem().get());

  ASSERT_OK(Put("key1", "val1"));
  ASSERT_OK(Flush());
  SyncPoint::GetInstance()->SetCallBack(
      "VersionSet::Recover:StartManifestRead",
      [&](void* /*arg*/) { fs->SetCorruptionTrigger(0); });
  SyncPoint::GetInstance()->EnableProcessing();

  if (std::get<2>(GetParam())) {
    ASSERT_OK(ReopenDB());
    ASSERT_GT(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_COUNT), 1);
    ASSERT_GT(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_SUCCESS_COUNT),
              1);
  } else {
    ASSERT_EQ(ReopenDB(), Status::Corruption());
    ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_COUNT), 0);
  }
  SyncPoint::GetInstance()->DisableProcessing();
}

TEST_P(DBIOCorruptionTest, FooterReadCorruptionRetry) {
  Random rnd(300);
  bool retry = false;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "ReadFooterFromFileInternal:0", [&](void* arg) {
        Slice* data = static_cast<Slice*>(arg);
        if (!retry) {
          std::memcpy(const_cast<char*>(data->data()),
                      rnd.RandomString(static_cast<int>(data->size())).c_str(),
                      data->size());
          retry = true;
        }
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(Put("key1", "val1"));
  Status s = Flush();
  if (std::get<2>(GetParam())) {
    ASSERT_OK(s);
    ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_COUNT), 1);
    ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_SUCCESS_COUNT),
              1);

    std::string val;
    ReadOptions ro;
    ro.async_io = std::get<1>(GetParam());
    ASSERT_OK(dbfull()->Get(ro, "key1", &val));
    ASSERT_EQ(val, "val1");
  } else {
    ASSERT_NOK(s);
    ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_COUNT), 0);
    ASSERT_GT(stats()->getTickerCount(SST_FOOTER_CORRUPTION_COUNT), 0);
  }
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
}

TEST_P(DBIOCorruptionTest, TablePropertiesCorruptionRetry) {
  Random rnd(300);
  bool retry = false;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "ReadTablePropertiesHelper:0", [&](void* arg) {
        Slice* data = static_cast<Slice*>(arg);
        if (!retry) {
          std::memcpy(const_cast<char*>(data->data()),
                      rnd.RandomString(static_cast<int>(data->size())).c_str(),
                      data->size());
          retry = true;
        }
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(Put("key1", "val1"));
  Status s = Flush();
  if (std::get<2>(GetParam())) {
    ASSERT_OK(s);
    ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_COUNT), 1);
    ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_SUCCESS_COUNT),
              1);

    std::string val;
    ReadOptions ro;
    ro.async_io = std::get<1>(GetParam());
    ASSERT_OK(dbfull()->Get(ro, "key1", &val));
    ASSERT_EQ(val, "val1");
  } else {
    ASSERT_NOK(s);
    ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_COUNT), 0);
  }
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
}

TEST_P(DBIOCorruptionTest, DBOpenReadCorruptionRetry) {
  if (!std::get<2>(GetParam())) {
    return;
  }
  CorruptionFS* fs =
      static_cast<CorruptionFS*>(env_guard_->GetFileSystem().get());

  for (int sst = 0; sst < 3; ++sst) {
    for (int key = 0; key < 100; ++key) {
      std::stringstream ss;
      ss << std::setw(3) << 100 * sst + key;
      ASSERT_OK(Put("key" + ss.str(), "val" + ss.str()));
    }
    ASSERT_OK(Flush());
  }
  Close();
  // DB open will create table readers unless we reduce the table cache
  // capacity.
  // SanitizeOptions will set max_open_files to a minimum of 20. The table
  // cache is allocated with max_open_files - 10 as its capacity, so override
  // max_open_files to 11 to make the table cache capacity 1. This prevents
  // files from being opened during DB open and forces them to be opened
  // during the subsequent reads.
  SyncPoint::GetInstance()->SetCallBack(
      "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) {
        int* max_open_files = static_cast<int*>(arg);
        *max_open_files = 11;
      });
  SyncPoint::GetInstance()->EnableProcessing();
  // Progressively increase the IO count trigger for corruption, and verify
  // that it was retried
  int corruption_trigger = 1;
  fs->SetCorruptionTrigger(corruption_trigger);
  do {
    fs->SetCorruptionTrigger(corruption_trigger);
    ASSERT_OK(ReopenDB());
    for (int sst = 0; sst < 3; ++sst) {
      for (int key = 0; key < 100; ++key) {
        std::stringstream ss;
        ss << std::setw(3) << 100 * sst + key;
        ASSERT_EQ(Get("key" + ss.str()), "val" + ss.str());
      }
    }
    // Verify that the injected corruption was repaired
    ASSERT_TRUE(fs->VerifyRetry());
    corruption_trigger++;
  } while (fs->corruption_trigger() == INT_MAX);
}

// The parameters are - 1. Use FS provided buffer, 2. Use async IO ReadOption,
// 3. Retry with verify_and_reconstruct_read IOOption
INSTANTIATE_TEST_CASE_P(DBIOCorruptionTest, DBIOCorruptionTest,
                        testing::Combine(testing::Bool(), testing::Bool(),
                                         testing::Bool()));

}  // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  RegisterCustomObjects(argc, argv);
  return RUN_ALL_TESTS();
}