corruption_test.cc 60 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  9. #include <fcntl.h>
  10. #include <sys/stat.h>
  11. #include <sys/types.h>
  12. #include <cinttypes>
  13. #include "db/db_impl/db_impl.h"
  14. #include "db/db_test_util.h"
  15. #include "db/log_format.h"
  16. #include "db/version_set.h"
  17. #include "file/filename.h"
  18. #include "port/stack_trace.h"
  19. #include "rocksdb/cache.h"
  20. #include "rocksdb/convenience.h"
  21. #include "rocksdb/db.h"
  22. #include "rocksdb/env.h"
  23. #include "rocksdb/options.h"
  24. #include "rocksdb/table.h"
  25. #include "rocksdb/utilities/transaction_db.h"
  26. #include "rocksdb/write_batch.h"
  27. #include "table/block_based/block_based_table_builder.h"
  28. #include "table/meta_blocks.h"
  29. #include "table/mock_table.h"
  30. #include "test_util/testharness.h"
  31. #include "test_util/testutil.h"
  32. #include "util/cast_util.h"
  33. #include "util/random.h"
  34. #include "util/string_util.h"
  35. namespace ROCKSDB_NAMESPACE {
  // Length, in bytes, of every value written by the tests in this file.
  static constexpr int kValueSize{1000};
  37. namespace {
  38. // A wrapper that allows injection of errors.
  39. class ErrorFS : public FileSystemWrapper {
  40. public:
  41. bool writable_file_error_;
  42. int num_writable_file_errors_;
  43. explicit ErrorFS(const std::shared_ptr<FileSystem>& _target)
  44. : FileSystemWrapper(_target),
  45. writable_file_error_(false),
  46. num_writable_file_errors_(0) {}
  47. const char* Name() const override { return "ErrorEnv"; }
  48. IOStatus NewWritableFile(const std::string& fname, const FileOptions& opts,
  49. std::unique_ptr<FSWritableFile>* result,
  50. IODebugContext* dbg) override {
  51. result->reset();
  52. if (writable_file_error_) {
  53. ++num_writable_file_errors_;
  54. return IOStatus::IOError(fname, "fake error");
  55. }
  56. return target()->NewWritableFile(fname, opts, result, dbg);
  57. }
  58. };
  59. } // anonymous namespace
  60. class CorruptionTest : public testing::Test {
  61. public:
  62. std::shared_ptr<Env> env_guard_;
  63. std::shared_ptr<ErrorFS> fs_;
  64. std::unique_ptr<Env> env_;
  65. Env* base_env_;
  66. std::string dbname_;
  67. std::shared_ptr<Cache> tiny_cache_;
  68. Options options_;
  69. DB* db_;
  70. CorruptionTest() {
  71. // If LRU cache shard bit is smaller than 2 (or -1 which will automatically
  72. // set it to 0), test SequenceNumberRecovery will fail, likely because of a
  73. // bug in recovery code. Keep it 4 for now to make the test passes.
  74. tiny_cache_ = NewLRUCache(100, 4);
  75. base_env_ = Env::Default();
  76. EXPECT_OK(
  77. test::CreateEnvFromSystem(ConfigOptions(), &base_env_, &env_guard_));
  78. EXPECT_NE(base_env_, nullptr);
  79. fs_.reset(new ErrorFS(base_env_->GetFileSystem()));
  80. env_ = NewCompositeEnv(fs_);
  81. options_.wal_recovery_mode = WALRecoveryMode::kTolerateCorruptedTailRecords;
  82. options_.env = env_.get();
  83. dbname_ = test::PerThreadDBPath(env_.get(), "corruption_test");
  84. Status s = DestroyDB(dbname_, options_);
  85. EXPECT_OK(s);
  86. db_ = nullptr;
  87. options_.create_if_missing = true;
  88. BlockBasedTableOptions table_options;
  89. table_options.block_size_deviation = 0; // make unit test pass for now
  90. options_.table_factory.reset(NewBlockBasedTableFactory(table_options));
  91. Reopen();
  92. options_.create_if_missing = false;
  93. }
  94. ~CorruptionTest() override {
  95. SyncPoint::GetInstance()->DisableProcessing();
  96. SyncPoint::GetInstance()->LoadDependency({});
  97. SyncPoint::GetInstance()->ClearAllCallBacks();
  98. delete db_;
  99. db_ = nullptr;
  100. if (getenv("KEEP_DB")) {
  101. fprintf(stdout, "db is still at %s\n", dbname_.c_str());
  102. } else {
  103. Options opts;
  104. opts.env = base_env_;
  105. EXPECT_OK(DestroyDB(dbname_, opts));
  106. }
  107. }
  108. void CloseDb() {
  109. delete db_;
  110. db_ = nullptr;
  111. }
  112. Status TryReopen(Options* options = nullptr) {
  113. delete db_;
  114. db_ = nullptr;
  115. Options opt = (options ? *options : options_);
  116. if (opt.env == Options().env) {
  117. // If env is not overridden, replace it with ErrorEnv.
  118. // Otherwise, the test already uses a non-default Env.
  119. opt.env = env_.get();
  120. }
  121. opt.arena_block_size = 4096;
  122. BlockBasedTableOptions table_options;
  123. table_options.block_cache = tiny_cache_;
  124. table_options.block_size_deviation = 0;
  125. opt.table_factory.reset(NewBlockBasedTableFactory(table_options));
  126. return DB::Open(opt, dbname_, &db_);
  127. }
  128. void Reopen(Options* options = nullptr) { ASSERT_OK(TryReopen(options)); }
  129. void RepairDB() {
  130. delete db_;
  131. db_ = nullptr;
  132. ASSERT_OK(::ROCKSDB_NAMESPACE::RepairDB(dbname_, options_));
  133. }
  134. void Build(int n, int start, int flush_every) {
  135. std::string key_space, value_space;
  136. WriteBatch batch;
  137. for (int i = 0; i < n; i++) {
  138. if (flush_every != 0 && i != 0 && i % flush_every == 0) {
  139. DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
  140. ASSERT_OK(dbi->TEST_FlushMemTable());
  141. }
  142. // if ((i % 100) == 0) fprintf(stderr, "@ %d of %d\n", i, n);
  143. Slice key = Key(i + start, &key_space);
  144. batch.Clear();
  145. ASSERT_OK(batch.Put(key, Value(i + start, &value_space)));
  146. ASSERT_OK(db_->Write(WriteOptions(), &batch));
  147. }
  148. }
  149. void Build(int n, int flush_every = 0) { Build(n, 0, flush_every); }
  150. void Check(int min_expected, int max_expected) {
  151. Check(min_expected, max_expected, ReadOptions(false, true));
  152. }
  153. void Check(int min_expected, int max_expected, ReadOptions read_options) {
  154. uint64_t next_expected = 0;
  155. uint64_t missed = 0;
  156. int bad_keys = 0;
  157. int bad_values = 0;
  158. int correct = 0;
  159. std::string value_space;
  160. // Do not verify checksums. If we verify checksums then the
  161. // db itself will raise errors because data is corrupted.
  162. // Instead, we want the reads to be successful and this test
  163. // will detect whether the appropriate corruptions have
  164. // occurred.
  165. Iterator* iter = db_->NewIterator(read_options);
  166. for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
  167. ASSERT_OK(iter->status());
  168. uint64_t key;
  169. Slice in(iter->key());
  170. if (!ConsumeDecimalNumber(&in, &key) || !in.empty() ||
  171. key < next_expected) {
  172. bad_keys++;
  173. continue;
  174. }
  175. missed += (key - next_expected);
  176. next_expected = key + 1;
  177. if (iter->value() != Value(static_cast<int>(key), &value_space)) {
  178. bad_values++;
  179. } else {
  180. correct++;
  181. }
  182. }
  183. iter->status().PermitUncheckedError();
  184. delete iter;
  185. fprintf(
  186. stderr,
  187. "expected=%d..%d; got=%d; bad_keys=%d; bad_values=%d; missed=%llu\n",
  188. min_expected, max_expected, correct, bad_keys, bad_values,
  189. static_cast<unsigned long long>(missed));
  190. ASSERT_LE(min_expected, correct);
  191. ASSERT_GE(max_expected, correct);
  192. }
  193. void Corrupt(FileType filetype, int offset, int bytes_to_corrupt) {
  194. // Pick file to corrupt
  195. std::vector<std::string> filenames;
  196. ASSERT_OK(env_->GetChildren(dbname_, &filenames));
  197. uint64_t number;
  198. FileType type;
  199. std::string fname;
  200. int picked_number = -1;
  201. for (size_t i = 0; i < filenames.size(); i++) {
  202. if (ParseFileName(filenames[i], &number, &type) && type == filetype &&
  203. static_cast<int>(number) > picked_number) { // Pick latest file
  204. fname = dbname_ + "/" + filenames[i];
  205. picked_number = static_cast<int>(number);
  206. }
  207. }
  208. ASSERT_TRUE(!fname.empty()) << filetype;
  209. ASSERT_OK(test::CorruptFile(env_.get(), fname, offset, bytes_to_corrupt,
  210. /*verify_checksum*/ filetype == kTableFile));
  211. }
  212. // corrupts exactly one file at level `level`. if no file found at level,
  213. // asserts
  214. void CorruptTableFileAtLevel(int level, int offset, int bytes_to_corrupt) {
  215. std::vector<LiveFileMetaData> metadata;
  216. db_->GetLiveFilesMetaData(&metadata);
  217. for (const auto& m : metadata) {
  218. if (m.level == level) {
  219. ASSERT_OK(test::CorruptFile(env_.get(), dbname_ + "/" + m.name, offset,
  220. bytes_to_corrupt));
  221. return;
  222. }
  223. }
  224. FAIL() << "no file found at level";
  225. }
  226. int Property(const std::string& name) {
  227. std::string property;
  228. int result;
  229. if (db_->GetProperty(name, &property) &&
  230. sscanf(property.c_str(), "%d", &result) == 1) {
  231. return result;
  232. } else {
  233. return -1;
  234. }
  235. }
  236. // Return the ith key
  237. Slice Key(int i, std::string* storage) {
  238. char buf[100];
  239. snprintf(buf, sizeof(buf), "%016d", i);
  240. storage->assign(buf, strlen(buf));
  241. return Slice(*storage);
  242. }
  243. // Return the value to associate with the specified key
  244. Slice Value(int k, std::string* storage) {
  245. if (k == 0) {
  246. // Ugh. Random seed of 0 used to produce no entropy. This code
  247. // preserves the implementation that was in place when all of the
  248. // magic values in this file were picked.
  249. *storage = std::string(kValueSize, ' ');
  250. } else {
  251. Random r(k);
  252. *storage = r.RandomString(kValueSize);
  253. }
  254. return Slice(*storage);
  255. }
  256. void GetSortedWalFiles(std::vector<uint64_t>& file_nums) {
  257. std::vector<std::string> tmp_files;
  258. ASSERT_OK(env_->GetChildren(dbname_, &tmp_files));
  259. FileType type = kWalFile;
  260. for (const auto& file : tmp_files) {
  261. uint64_t number = 0;
  262. if (ParseFileName(file, &number, &type) && type == kWalFile) {
  263. file_nums.push_back(number);
  264. }
  265. }
  266. std::sort(file_nums.begin(), file_nums.end());
  267. }
  268. void CorruptFileWithTruncation(FileType file, uint64_t number,
  269. uint64_t bytes_to_truncate = 0) {
  270. std::string path;
  271. switch (file) {
  272. case FileType::kWalFile:
  273. path = LogFileName(dbname_, number);
  274. break;
  275. // TODO: Add other file types as this method is being used for those file
  276. // types.
  277. default:
  278. return;
  279. }
  280. uint64_t old_size = 0;
  281. ASSERT_OK(env_->GetFileSize(path, &old_size));
  282. assert(old_size > bytes_to_truncate);
  283. uint64_t new_size = old_size - bytes_to_truncate;
  284. // If bytes_to_truncate == 0, it will do full truncation.
  285. if (bytes_to_truncate == 0) {
  286. new_size = 0;
  287. }
  288. ASSERT_OK(test::TruncateFile(env_.get(), path, new_size));
  289. }
  290. };
  291. TEST_F(CorruptionTest, Recovery) {
  292. Build(100);
  293. Check(100, 100);
  294. #ifdef OS_WIN
  295. // On Wndows OS Disk cache does not behave properly
  296. // We do not call FlushBuffers on every Flush. If we do not close
  297. // the log file prior to the corruption we end up with the first
  298. // block not corrupted but only the second. However, under the debugger
  299. // things work just fine but never pass when running normally
  300. // For that reason people may want to run with unbuffered I/O. That option
  301. // is not available for WAL though.
  302. CloseDb();
  303. #endif
  304. Corrupt(kWalFile, 19, 1); // WriteBatch tag for first record
  305. Corrupt(kWalFile, log::kBlockSize + 1000, 1); // Somewhere in second block
  306. ASSERT_TRUE(!TryReopen().ok());
  307. options_.paranoid_checks = false;
  308. Reopen(&options_);
  309. // The 64 records in the first two log blocks are completely lost.
  310. Check(36, 36);
  311. }
  312. TEST_F(CorruptionTest, PostPITRCorruptionWALsRetained) {
  313. // Repro for bug where WALs following the point-in-time recovery were not
  314. // retained leading to the next recovery failing.
  315. CloseDb();
  316. options_.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
  317. const std::string test_cf_name = "test_cf";
  318. std::vector<ColumnFamilyDescriptor> cf_descs;
  319. cf_descs.emplace_back(kDefaultColumnFamilyName, ColumnFamilyOptions());
  320. cf_descs.emplace_back(test_cf_name, ColumnFamilyOptions());
  321. uint64_t log_num;
  322. {
  323. options_.create_missing_column_families = true;
  324. std::vector<ColumnFamilyHandle*> cfhs;
  325. ASSERT_OK(DB::Open(options_, dbname_, cf_descs, &cfhs, &db_));
  326. assert(db_ != nullptr); // suppress false clang-analyze report
  327. ASSERT_OK(db_->Put(WriteOptions(), cfhs[0], "k", "v"));
  328. ASSERT_OK(db_->Put(WriteOptions(), cfhs[1], "k", "v"));
  329. ASSERT_OK(db_->Put(WriteOptions(), cfhs[0], "k2", "v2"));
  330. std::vector<uint64_t> file_nums;
  331. GetSortedWalFiles(file_nums);
  332. log_num = file_nums.back();
  333. for (auto* cfh : cfhs) {
  334. delete cfh;
  335. }
  336. CloseDb();
  337. }
  338. CorruptFileWithTruncation(FileType::kWalFile, log_num,
  339. /*bytes_to_truncate=*/1);
  340. {
  341. // Recover "k" -> "v" for both CFs. "k2" -> "v2" is lost due to truncation.
  342. options_.avoid_flush_during_recovery = true;
  343. std::vector<ColumnFamilyHandle*> cfhs;
  344. ASSERT_OK(DB::Open(options_, dbname_, cf_descs, &cfhs, &db_));
  345. assert(db_ != nullptr); // suppress false clang-analyze report
  346. // Flush one but not both CFs and write some data so there's a seqno gap
  347. // between the PITR corruption and the next DB session's first WAL.
  348. ASSERT_OK(db_->Put(WriteOptions(), cfhs[1], "k2", "v2"));
  349. ASSERT_OK(db_->Flush(FlushOptions(), cfhs[1]));
  350. for (auto* cfh : cfhs) {
  351. delete cfh;
  352. }
  353. CloseDb();
  354. }
  355. // With the bug, this DB open would remove the WALs following the PITR
  356. // corruption. Then, the next recovery would fail.
  357. for (int i = 0; i < 2; ++i) {
  358. std::vector<ColumnFamilyHandle*> cfhs;
  359. ASSERT_OK(DB::Open(options_, dbname_, cf_descs, &cfhs, &db_));
  360. assert(db_ != nullptr); // suppress false clang-analyze report
  361. for (auto* cfh : cfhs) {
  362. delete cfh;
  363. }
  364. CloseDb();
  365. }
  366. }
  367. TEST_F(CorruptionTest, RecoverWriteError) {
  368. fs_->writable_file_error_ = true;
  369. Status s = TryReopen();
  370. ASSERT_TRUE(!s.ok());
  371. }
  372. TEST_F(CorruptionTest, NewFileErrorDuringWrite) {
  373. // Do enough writing to force minor compaction
  374. fs_->writable_file_error_ = true;
  375. const int num =
  376. static_cast<int>(3 + (Options().write_buffer_size / kValueSize));
  377. std::string value_storage;
  378. Status s;
  379. bool failed = false;
  380. for (int i = 0; i < num; i++) {
  381. WriteBatch batch;
  382. ASSERT_OK(batch.Put("a", Value(100, &value_storage)));
  383. s = db_->Write(WriteOptions(), &batch);
  384. if (!s.ok()) {
  385. failed = true;
  386. }
  387. ASSERT_TRUE(!failed || !s.ok());
  388. }
  389. ASSERT_TRUE(!s.ok());
  390. ASSERT_GE(fs_->num_writable_file_errors_, 1);
  391. fs_->writable_file_error_ = false;
  392. Reopen();
  393. }
  394. TEST_F(CorruptionTest, TableFile) {
  395. Build(100);
  396. DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
  397. ASSERT_OK(dbi->TEST_FlushMemTable());
  398. ASSERT_OK(dbi->TEST_CompactRange(0, nullptr, nullptr));
  399. ASSERT_OK(dbi->TEST_CompactRange(1, nullptr, nullptr));
  400. Corrupt(kTableFile, 100, 1);
  401. Check(99, 99);
  402. ASSERT_NOK(dbi->VerifyChecksum());
  403. }
  404. TEST_F(CorruptionTest, VerifyChecksumReadahead) {
  405. Options options;
  406. options.level_compaction_dynamic_level_bytes = false;
  407. SpecialEnv senv(base_env_);
  408. options.env = &senv;
  409. // Disable block cache as we are going to check checksum for
  410. // the same file twice and measure number of reads.
  411. BlockBasedTableOptions table_options_no_bc;
  412. table_options_no_bc.no_block_cache = true;
  413. options.table_factory.reset(NewBlockBasedTableFactory(table_options_no_bc));
  414. Reopen(&options);
  415. Build(10000);
  416. DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
  417. ASSERT_OK(dbi->TEST_FlushMemTable());
  418. ASSERT_OK(dbi->TEST_CompactRange(0, nullptr, nullptr));
  419. ASSERT_OK(dbi->TEST_CompactRange(1, nullptr, nullptr));
  420. senv.count_random_reads_ = true;
  421. senv.random_read_counter_.Reset();
  422. ASSERT_OK(dbi->VerifyChecksum());
  423. // Make sure the counter is enabled.
  424. ASSERT_GT(senv.random_read_counter_.Read(), 0);
  425. // The SST file is about 10MB. Default readahead size is 256KB.
  426. // Give a conservative 20 reads for metadata blocks, The number
  427. // of random reads should be within 10 MB / 256KB + 20 = 60.
  428. ASSERT_LT(senv.random_read_counter_.Read(), 60);
  429. senv.random_read_bytes_counter_ = 0;
  430. ReadOptions ro;
  431. ro.readahead_size = size_t{32 * 1024};
  432. ASSERT_OK(dbi->VerifyChecksum(ro));
  433. // The SST file is about 10MB. We set readahead size to 32KB.
  434. // Give 0 to 20 reads for metadata blocks, and allow real read
  435. // to range from 24KB to 48KB. The lower bound would be:
  436. // 10MB / 48KB + 0 = 213
  437. // The higher bound is
  438. // 10MB / 24KB + 20 = 447.
  439. ASSERT_GE(senv.random_read_counter_.Read(), 213);
  440. ASSERT_LE(senv.random_read_counter_.Read(), 447);
  441. // Test readahead shouldn't break mmap mode (where it should be
  442. // disabled).
  443. options.allow_mmap_reads = true;
  444. Reopen(&options);
  445. dbi = static_cast<DBImpl*>(db_);
  446. ASSERT_OK(dbi->VerifyChecksum(ro));
  447. CloseDb();
  448. }
  449. TEST_F(CorruptionTest, TableFileIndexData) {
  450. Options options;
  451. options.level_compaction_dynamic_level_bytes = false;
  452. // very big, we'll trigger flushes manually
  453. options.write_buffer_size = 100 * 1024 * 1024;
  454. Reopen(&options);
  455. // build 2 tables, flush at 5000
  456. Build(10000, 5000);
  457. DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
  458. ASSERT_OK(dbi->TEST_FlushMemTable());
  459. // corrupt an index block of an entire file
  460. Corrupt(kTableFile, -2000, 500);
  461. options.paranoid_checks = false;
  462. Reopen(&options);
  463. dbi = static_cast_with_check<DBImpl>(db_);
  464. // one full file may be readable, since only one was corrupted
  465. // the other file should be fully non-readable, since index was corrupted
  466. Check(0, 5000, ReadOptions(true, true));
  467. ASSERT_NOK(dbi->VerifyChecksum());
  468. // In paranoid mode, the db cannot be opened due to the corrupted file.
  469. ASSERT_TRUE(TryReopen().IsCorruption());
  470. }
  471. TEST_F(CorruptionTest, TableFileFooterMagic) {
  472. Build(100);
  473. DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
  474. ASSERT_OK(dbi->TEST_FlushMemTable());
  475. Check(100, 100);
  476. // Corrupt the whole footer
  477. Corrupt(kTableFile, -100, 100);
  478. Status s = TryReopen();
  479. ASSERT_TRUE(s.IsCorruption());
  480. // Contains useful message, and magic number should be the first thing
  481. // reported as corrupt.
  482. ASSERT_TRUE(s.ToString().find("magic number") != std::string::npos);
  483. // with file name
  484. ASSERT_TRUE(s.ToString().find(".sst") != std::string::npos);
  485. }
  486. TEST_F(CorruptionTest, TableFileFooterNotMagic) {
  487. Build(100);
  488. DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
  489. ASSERT_OK(dbi->TEST_FlushMemTable());
  490. Check(100, 100);
  491. // Corrupt footer except magic number
  492. Corrupt(kTableFile, -100, 92);
  493. Status s = TryReopen();
  494. ASSERT_TRUE(s.IsCorruption());
  495. // The next thing checked after magic number is format_version
  496. ASSERT_TRUE(s.ToString().find("format_version") != std::string::npos);
  497. // with file name
  498. ASSERT_TRUE(s.ToString().find(".sst") != std::string::npos);
  499. }
  500. TEST_F(CorruptionTest, DBOpenWithWrongFileSize) {
  501. // Validate that when paranoid flag is true, DB::Open() fails if one of the
  502. // file corrupted. Validate that when paranoid flag is false, DB::Open()
  503. // succeed if one of the file corrupted, and the healthy file is readable.
  504. CloseDb();
  505. const std::string test_cf_name = "test_cf";
  506. std::vector<ColumnFamilyDescriptor> cf_descs;
  507. cf_descs.emplace_back(kDefaultColumnFamilyName, ColumnFamilyOptions());
  508. cf_descs.emplace_back(test_cf_name, ColumnFamilyOptions());
  509. {
  510. options_.create_missing_column_families = true;
  511. std::vector<ColumnFamilyHandle*> cfhs;
  512. ASSERT_OK(DB::Open(options_, dbname_, cf_descs, &cfhs, &db_));
  513. assert(db_ != nullptr); // suppress false clang-analyze report
  514. ASSERT_OK(db_->Put(WriteOptions(), cfhs[0], "k", "v"));
  515. ASSERT_OK(db_->Put(WriteOptions(), cfhs[1], "k1", "v1"));
  516. ASSERT_OK(db_->Put(WriteOptions(), cfhs[0], "k2", "v2"));
  517. for (auto* cfh : cfhs) {
  518. delete cfh;
  519. }
  520. DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
  521. ASSERT_OK(dbi->TEST_FlushMemTable());
  522. // ********************************************
  523. // Corrupt the file by making the file bigger
  524. std::vector<LiveFileMetaData> metadata;
  525. db_->GetLiveFilesMetaData(&metadata);
  526. std::string filename = dbname_ + metadata[0].name;
  527. const auto& fs = options_.env->GetFileSystem();
  528. {
  529. std::unique_ptr<FSWritableFile> f;
  530. ASSERT_OK(fs->ReopenWritableFile(filename, FileOptions(), &f, nullptr));
  531. ASSERT_OK(f->Append("blahblah", IOOptions(), nullptr));
  532. ASSERT_OK(f->Close(IOOptions(), nullptr));
  533. }
  534. CloseDb();
  535. }
  536. // DB failed to open due to one of the file is corrupted, as paranoid flag is
  537. // true
  538. options_.paranoid_checks = true;
  539. std::vector<ColumnFamilyHandle*> cfhs;
  540. auto s = DB::Open(options_, dbname_, cf_descs, &cfhs, &db_);
  541. ASSERT_TRUE(s.IsCorruption());
  542. ASSERT_TRUE(s.ToString().find("file size mismatch") != std::string::npos);
  543. // DB opened successfully, as paranoid flag is false, validate the one that is
  544. // healthy is still accessible
  545. options_.paranoid_checks = false;
  546. ASSERT_OK(DB::Open(options_, dbname_, cf_descs, &cfhs, &db_));
  547. assert(db_ != nullptr); // suppress false clang-analyze report
  548. std::string v;
  549. ASSERT_OK(db_->Get(ReadOptions(), cfhs[1], "k1", &v));
  550. ASSERT_EQ(v, "v1");
  551. // Validate the default column family is corrupted
  552. Check(0, 0);
  553. s = db_->Get(ReadOptions(), cfhs[0], "k1", &v);
  554. ASSERT_TRUE(s.IsCorruption());
  555. delete cfhs[1];
  556. delete cfhs[0];
  557. }
  558. TEST_F(CorruptionTest, TableFileWrongSize) {
  559. Build(100);
  560. DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
  561. ASSERT_OK(dbi->TEST_FlushMemTable());
  562. Check(100, 100);
  563. // ********************************************
  564. // Make the file bigger by appending to it
  565. std::vector<LiveFileMetaData> metadata;
  566. db_->GetLiveFilesMetaData(&metadata);
  567. ASSERT_EQ(1U, metadata.size());
  568. std::string filename = dbname_ + metadata[0].name;
  569. const auto& fs = options_.env->GetFileSystem();
  570. {
  571. std::unique_ptr<FSWritableFile> f;
  572. ASSERT_OK(fs->ReopenWritableFile(filename, FileOptions(), &f, nullptr));
  573. ASSERT_OK(f->Append("blahblah", IOOptions(), nullptr));
  574. ASSERT_OK(f->Close(IOOptions(), nullptr));
  575. }
  576. // DB actually accepts this without paranoid checks, relying on size
  577. // recorded in manifest to locate the SST footer.
  578. options_.paranoid_checks = false;
  579. Reopen();
  580. // As footer could not be extraced, file is completely unreadable
  581. Check(0, 0);
  582. std::string v;
  583. auto s = db_->Get(ReadOptions(), "k1", &v);
  584. ASSERT_TRUE(s.IsCorruption());
  585. // But reports the issue with paranoid checks
  586. options_.paranoid_checks = true;
  587. s = TryReopen();
  588. ASSERT_TRUE(s.IsCorruption());
  589. ASSERT_TRUE(s.ToString().find("file size mismatch") != std::string::npos);
  590. // ********************************************
  591. // Make the file smaller with truncation.
  592. // First leaving a partial footer, and then completely removing footer.
  593. for (size_t bytes_lost : {8, 100}) {
  594. ASSERT_OK(test::TruncateFile(env_.get(), filename,
  595. metadata[0].size - bytes_lost));
  596. // Reported well with paranoid checks
  597. options_.paranoid_checks = true;
  598. s = TryReopen();
  599. ASSERT_TRUE(s.IsCorruption());
  600. ASSERT_TRUE(s.ToString().find("file size mismatch") != std::string::npos);
  601. // Without paranoid checks, not reported until read
  602. options_.paranoid_checks = false;
  603. Reopen();
  604. Check(0, 0); // Missing data
  605. }
  606. }
  607. TEST_F(CorruptionTest, MissingDescriptor) {
  608. Build(1000);
  609. RepairDB();
  610. Reopen();
  611. Check(1000, 1000);
  612. }
  613. TEST_F(CorruptionTest, SequenceNumberRecovery) {
  614. ASSERT_OK(db_->Put(WriteOptions(), "foo", "v1"));
  615. ASSERT_OK(db_->Put(WriteOptions(), "foo", "v2"));
  616. ASSERT_OK(db_->Put(WriteOptions(), "foo", "v3"));
  617. ASSERT_OK(db_->Put(WriteOptions(), "foo", "v4"));
  618. ASSERT_OK(db_->Put(WriteOptions(), "foo", "v5"));
  619. RepairDB();
  620. Reopen();
  621. std::string v;
  622. ASSERT_OK(db_->Get(ReadOptions(), "foo", &v));
  623. ASSERT_EQ("v5", v);
  624. // Write something. If sequence number was not recovered properly,
  625. // it will be hidden by an earlier write.
  626. ASSERT_OK(db_->Put(WriteOptions(), "foo", "v6"));
  627. ASSERT_OK(db_->Get(ReadOptions(), "foo", &v));
  628. ASSERT_EQ("v6", v);
  629. Reopen();
  630. ASSERT_OK(db_->Get(ReadOptions(), "foo", &v));
  631. ASSERT_EQ("v6", v);
  632. }
  633. TEST_F(CorruptionTest, CorruptedDescriptor) {
  634. ASSERT_OK(db_->Put(WriteOptions(), "foo", "hello"));
  635. DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
  636. ASSERT_OK(dbi->TEST_FlushMemTable());
  637. CompactRangeOptions cro;
  638. cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
  639. ASSERT_OK(
  640. dbi->CompactRange(cro, dbi->DefaultColumnFamily(), nullptr, nullptr));
  641. Corrupt(kDescriptorFile, 0, 1000);
  642. Status s = TryReopen();
  643. ASSERT_TRUE(!s.ok());
  644. RepairDB();
  645. Reopen();
  646. std::string v;
  647. ASSERT_OK(db_->Get(ReadOptions(), "foo", &v));
  648. ASSERT_EQ("hello", v);
  649. }
  650. TEST_F(CorruptionTest, CompactionInputError) {
  651. Options options;
  652. options.level_compaction_dynamic_level_bytes = false;
  653. options.env = env_.get();
  654. Reopen(&options);
  655. Build(10);
  656. DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
  657. ASSERT_OK(dbi->TEST_FlushMemTable());
  658. ASSERT_OK(dbi->TEST_CompactRange(0, nullptr, nullptr));
  659. ASSERT_OK(dbi->TEST_CompactRange(1, nullptr, nullptr));
  660. ASSERT_EQ(1, Property("rocksdb.num-files-at-level2"));
  661. Corrupt(kTableFile, 100, 1);
  662. Check(9, 9);
  663. ASSERT_NOK(dbi->VerifyChecksum());
  664. // Force compactions by writing lots of values
  665. Build(10000);
  666. Check(10000, 10000);
  667. ASSERT_NOK(dbi->VerifyChecksum());
  668. }
  669. TEST_F(CorruptionTest, CompactionInputErrorParanoid) {
  670. Options options;
  671. options.level_compaction_dynamic_level_bytes = false;
  672. options.env = env_.get();
  673. options.paranoid_checks = true;
  674. options.write_buffer_size = 131072;
  675. options.max_write_buffer_number = 2;
  676. Reopen(&options);
  677. DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
  678. // Fill levels >= 1
  679. for (int level = 1; level < dbi->NumberLevels(); level++) {
  680. ASSERT_OK(dbi->Put(WriteOptions(), "", "begin"));
  681. ASSERT_OK(dbi->Put(WriteOptions(), "~", "end"));
  682. ASSERT_OK(dbi->TEST_FlushMemTable());
  683. for (int comp_level = 0; comp_level < dbi->NumberLevels() - level;
  684. ++comp_level) {
  685. ASSERT_OK(dbi->TEST_CompactRange(comp_level, nullptr, nullptr));
  686. }
  687. }
  688. Reopen(&options);
  689. dbi = static_cast_with_check<DBImpl>(db_);
  690. Build(10);
  691. ASSERT_OK(dbi->TEST_FlushMemTable());
  692. ASSERT_OK(dbi->TEST_WaitForCompact());
  693. ASSERT_EQ(1, Property("rocksdb.num-files-at-level0"));
  694. CorruptTableFileAtLevel(0, 100, 1);
  695. Check(9, 9);
  696. ASSERT_NOK(dbi->VerifyChecksum());
  697. // Write must eventually fail because of corrupted table
  698. Status s;
  699. std::string tmp1, tmp2;
  700. bool failed = false;
  701. for (int i = 0; i < 10000; i++) {
  702. s = db_->Put(WriteOptions(), Key(i, &tmp1), Value(i, &tmp2));
  703. if (!s.ok()) {
  704. failed = true;
  705. }
  706. // if one write failed, every subsequent write must fail, too
  707. ASSERT_TRUE(!failed || !s.ok()) << "write did not fail in a corrupted db";
  708. }
  709. ASSERT_TRUE(!s.ok()) << "write did not fail in corrupted paranoid db";
  710. }
  711. TEST_F(CorruptionTest, UnrelatedKeys) {
  712. Build(10);
  713. DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
  714. ASSERT_OK(dbi->TEST_FlushMemTable());
  715. Corrupt(kTableFile, 100, 1);
  716. ASSERT_NOK(dbi->VerifyChecksum());
  717. std::string tmp1, tmp2;
  718. ASSERT_OK(db_->Put(WriteOptions(), Key(1000, &tmp1), Value(1000, &tmp2)));
  719. std::string v;
  720. ASSERT_OK(db_->Get(ReadOptions(), Key(1000, &tmp1), &v));
  721. ASSERT_EQ(Value(1000, &tmp2).ToString(), v);
  722. ASSERT_OK(dbi->TEST_FlushMemTable());
  723. ASSERT_OK(db_->Get(ReadOptions(), Key(1000, &tmp1), &v));
  724. ASSERT_EQ(Value(1000, &tmp2).ToString(), v);
  725. }
  726. TEST_F(CorruptionTest, RangeDeletionCorrupted) {
  727. ASSERT_OK(
  728. db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "a", "b"));
  729. ASSERT_OK(db_->Flush(FlushOptions()));
  730. std::vector<LiveFileMetaData> metadata;
  731. db_->GetLiveFilesMetaData(&metadata);
  732. ASSERT_EQ(static_cast<size_t>(1), metadata.size());
  733. std::string filename = dbname_ + metadata[0].name;
  734. FileOptions file_opts;
  735. const auto& fs = options_.env->GetFileSystem();
  736. std::unique_ptr<RandomAccessFileReader> file_reader;
  737. ASSERT_OK(RandomAccessFileReader::Create(fs, filename, file_opts,
  738. &file_reader, nullptr));
  739. uint64_t file_size;
  740. ASSERT_OK(
  741. fs->GetFileSize(filename, file_opts.io_options, &file_size, nullptr));
  742. BlockHandle range_del_handle;
  743. const ReadOptions read_options;
  744. ASSERT_OK(FindMetaBlockInFile(file_reader.get(), file_size,
  745. kBlockBasedTableMagicNumber,
  746. ImmutableOptions(options_), read_options,
  747. kRangeDelBlockName, &range_del_handle));
  748. ASSERT_OK(TryReopen());
  749. ASSERT_OK(test::CorruptFile(env_.get(), filename,
  750. static_cast<int>(range_del_handle.offset()), 1));
  751. ASSERT_TRUE(TryReopen().IsCorruption());
  752. }
  753. TEST_F(CorruptionTest, FileSystemStateCorrupted) {
  754. for (int iter = 0; iter < 2; ++iter) {
  755. Options options;
  756. options.level_compaction_dynamic_level_bytes = false;
  757. options.env = env_.get();
  758. options.paranoid_checks = true;
  759. options.create_if_missing = true;
  760. Reopen(&options);
  761. Build(10);
  762. ASSERT_OK(db_->Flush(FlushOptions()));
  763. DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
  764. std::vector<LiveFileMetaData> metadata;
  765. dbi->GetLiveFilesMetaData(&metadata);
  766. ASSERT_GT(metadata.size(), 0);
  767. std::string filename = dbname_ + metadata[0].name;
  768. delete db_;
  769. db_ = nullptr;
  770. if (iter == 0) { // corrupt file size
  771. std::unique_ptr<WritableFile> file;
  772. ASSERT_OK(env_->NewWritableFile(filename, &file, EnvOptions()));
  773. ASSERT_OK(file->Append(Slice("corrupted sst")));
  774. file.reset();
  775. Status x = TryReopen(&options);
  776. ASSERT_TRUE(x.IsCorruption());
  777. } else { // delete the file
  778. ASSERT_OK(env_->DeleteFile(filename));
  779. Status x = TryReopen(&options);
  780. ASSERT_TRUE(x.IsCorruption());
  781. }
  782. ASSERT_OK(DestroyDB(dbname_, options_));
  783. }
  784. }
  785. static const auto& corruption_modes = {
  786. mock::MockTableFactory::kCorruptNone, mock::MockTableFactory::kCorruptKey,
  787. mock::MockTableFactory::kCorruptValue,
  788. mock::MockTableFactory::kCorruptReorderKey};
  789. TEST_F(CorruptionTest, ParanoidFileChecksOnFlush) {
  790. Options options;
  791. options.level_compaction_dynamic_level_bytes = false;
  792. options.env = env_.get();
  793. options.paranoid_file_checks = true;
  794. options.create_if_missing = true;
  795. Status s;
  796. for (const auto& mode : corruption_modes) {
  797. delete db_;
  798. db_ = nullptr;
  799. s = DestroyDB(dbname_, options);
  800. ASSERT_OK(s);
  801. std::shared_ptr<mock::MockTableFactory> mock =
  802. std::make_shared<mock::MockTableFactory>();
  803. options.table_factory = mock;
  804. mock->SetCorruptionMode(mode);
  805. ASSERT_OK(DB::Open(options, dbname_, &db_));
  806. assert(db_ != nullptr); // suppress false clang-analyze report
  807. Build(10);
  808. s = db_->Flush(FlushOptions());
  809. if (mode == mock::MockTableFactory::kCorruptNone) {
  810. ASSERT_OK(s);
  811. } else {
  812. ASSERT_NOK(s);
  813. }
  814. }
  815. }
  816. TEST_F(CorruptionTest, ParanoidFileChecksOnCompact) {
  817. Options options;
  818. options.level_compaction_dynamic_level_bytes = false;
  819. options.env = env_.get();
  820. options.paranoid_file_checks = true;
  821. options.create_if_missing = true;
  822. Status s;
  823. for (const auto& mode : corruption_modes) {
  824. delete db_;
  825. db_ = nullptr;
  826. s = DestroyDB(dbname_, options);
  827. ASSERT_OK(s);
  828. std::shared_ptr<mock::MockTableFactory> mock =
  829. std::make_shared<mock::MockTableFactory>();
  830. options.table_factory = mock;
  831. ASSERT_OK(DB::Open(options, dbname_, &db_));
  832. assert(db_ != nullptr); // suppress false clang-analyze report
  833. Build(100, 2);
  834. DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
  835. ASSERT_OK(dbi->TEST_FlushMemTable());
  836. mock->SetCorruptionMode(mode);
  837. CompactRangeOptions cro;
  838. cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
  839. s = dbi->CompactRange(cro, dbi->DefaultColumnFamily(), nullptr, nullptr);
  840. if (mode == mock::MockTableFactory::kCorruptNone) {
  841. ASSERT_OK(s);
  842. } else {
  843. ASSERT_NOK(s);
  844. }
  845. }
  846. }
  847. TEST_F(CorruptionTest, ParanoidFileChecksWithDeleteRangeFirst) {
  848. Options options;
  849. options.level_compaction_dynamic_level_bytes = false;
  850. options.env = env_.get();
  851. options.paranoid_file_checks = true;
  852. options.create_if_missing = true;
  853. for (bool do_flush : {true, false}) {
  854. delete db_;
  855. db_ = nullptr;
  856. ASSERT_OK(DestroyDB(dbname_, options));
  857. ASSERT_OK(DB::Open(options, dbname_, &db_));
  858. std::string start, end;
  859. assert(db_ != nullptr); // suppress false clang-analyze report
  860. ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
  861. Key(3, &start), Key(7, &end)));
  862. auto snap = db_->GetSnapshot();
  863. ASSERT_NE(snap, nullptr);
  864. ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
  865. Key(8, &start), Key(9, &end)));
  866. ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
  867. Key(2, &start), Key(5, &end)));
  868. Build(10);
  869. if (do_flush) {
  870. ASSERT_OK(db_->Flush(FlushOptions()));
  871. } else {
  872. DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
  873. ASSERT_OK(dbi->TEST_FlushMemTable());
  874. CompactRangeOptions cro;
  875. cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
  876. ASSERT_OK(
  877. dbi->CompactRange(cro, dbi->DefaultColumnFamily(), nullptr, nullptr));
  878. }
  879. db_->ReleaseSnapshot(snap);
  880. }
  881. }
  882. TEST_F(CorruptionTest, ParanoidFileChecksWithDeleteRange) {
  883. Options options;
  884. options.level_compaction_dynamic_level_bytes = false;
  885. options.env = env_.get();
  886. options.paranoid_file_checks = true;
  887. options.create_if_missing = true;
  888. for (bool do_flush : {true, false}) {
  889. delete db_;
  890. db_ = nullptr;
  891. ASSERT_OK(DestroyDB(dbname_, options));
  892. ASSERT_OK(DB::Open(options, dbname_, &db_));
  893. assert(db_ != nullptr); // suppress false clang-analyze report
  894. Build(10, 0, 0);
  895. std::string start, end;
  896. ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
  897. Key(5, &start), Key(15, &end)));
  898. auto snap = db_->GetSnapshot();
  899. ASSERT_NE(snap, nullptr);
  900. ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
  901. Key(8, &start), Key(9, &end)));
  902. ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
  903. Key(12, &start), Key(17, &end)));
  904. ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
  905. Key(2, &start), Key(4, &end)));
  906. Build(10, 10, 0);
  907. if (do_flush) {
  908. ASSERT_OK(db_->Flush(FlushOptions()));
  909. } else {
  910. DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
  911. ASSERT_OK(dbi->TEST_FlushMemTable());
  912. CompactRangeOptions cro;
  913. cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
  914. ASSERT_OK(
  915. dbi->CompactRange(cro, dbi->DefaultColumnFamily(), nullptr, nullptr));
  916. }
  917. db_->ReleaseSnapshot(snap);
  918. }
  919. }
  920. TEST_F(CorruptionTest, ParanoidFileChecksWithDeleteRangeLast) {
  921. Options options;
  922. options.level_compaction_dynamic_level_bytes = false;
  923. options.env = env_.get();
  924. options.paranoid_file_checks = true;
  925. options.create_if_missing = true;
  926. for (bool do_flush : {true, false}) {
  927. delete db_;
  928. db_ = nullptr;
  929. ASSERT_OK(DestroyDB(dbname_, options));
  930. ASSERT_OK(DB::Open(options, dbname_, &db_));
  931. assert(db_ != nullptr); // suppress false clang-analyze report
  932. std::string start, end;
  933. Build(10);
  934. ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
  935. Key(3, &start), Key(7, &end)));
  936. auto snap = db_->GetSnapshot();
  937. ASSERT_NE(snap, nullptr);
  938. ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
  939. Key(6, &start), Key(8, &end)));
  940. ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
  941. Key(2, &start), Key(5, &end)));
  942. if (do_flush) {
  943. ASSERT_OK(db_->Flush(FlushOptions()));
  944. } else {
  945. DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
  946. ASSERT_OK(dbi->TEST_FlushMemTable());
  947. CompactRangeOptions cro;
  948. cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
  949. ASSERT_OK(
  950. dbi->CompactRange(cro, dbi->DefaultColumnFamily(), nullptr, nullptr));
  951. }
  952. db_->ReleaseSnapshot(snap);
  953. }
  954. }
  955. TEST_F(CorruptionTest, LogCorruptionErrorsInCompactionIterator) {
  956. Options options;
  957. options.level_compaction_dynamic_level_bytes = false;
  958. options.env = env_.get();
  959. options.create_if_missing = true;
  960. options.allow_data_in_errors = true;
  961. auto mode = mock::MockTableFactory::kCorruptKey;
  962. delete db_;
  963. db_ = nullptr;
  964. ASSERT_OK(DestroyDB(dbname_, options));
  965. std::shared_ptr<mock::MockTableFactory> mock =
  966. std::make_shared<mock::MockTableFactory>();
  967. mock->SetCorruptionMode(mode);
  968. options.table_factory = mock;
  969. ASSERT_OK(DB::Open(options, dbname_, &db_));
  970. assert(db_ != nullptr); // suppress false clang-analyze report
  971. Build(100, 2);
  972. DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
  973. ASSERT_OK(dbi->TEST_FlushMemTable());
  974. CompactRangeOptions cro;
  975. cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
  976. Status s =
  977. dbi->CompactRange(cro, dbi->DefaultColumnFamily(), nullptr, nullptr);
  978. ASSERT_NOK(s);
  979. ASSERT_TRUE(s.IsCorruption());
  980. }
  981. TEST_F(CorruptionTest, CompactionKeyOrderCheck) {
  982. Options options;
  983. options.level_compaction_dynamic_level_bytes = false;
  984. options.env = env_.get();
  985. options.paranoid_file_checks = false;
  986. options.create_if_missing = true;
  987. delete db_;
  988. db_ = nullptr;
  989. ASSERT_OK(DestroyDB(dbname_, options));
  990. std::shared_ptr<mock::MockTableFactory> mock =
  991. std::make_shared<mock::MockTableFactory>();
  992. options.table_factory = mock;
  993. ASSERT_OK(DB::Open(options, dbname_, &db_));
  994. assert(db_ != nullptr); // suppress false clang-analyze report
  995. mock->SetCorruptionMode(mock::MockTableFactory::kCorruptReorderKey);
  996. Build(100, 2);
  997. DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
  998. ASSERT_OK(dbi->TEST_FlushMemTable());
  999. mock->SetCorruptionMode(mock::MockTableFactory::kCorruptNone);
  1000. CompactRangeOptions cro;
  1001. cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
  1002. ASSERT_NOK(
  1003. dbi->CompactRange(cro, dbi->DefaultColumnFamily(), nullptr, nullptr));
  1004. }
  1005. TEST_F(CorruptionTest, FlushKeyOrderCheck) {
  1006. Options options;
  1007. options.level_compaction_dynamic_level_bytes = false;
  1008. options.env = env_.get();
  1009. options.paranoid_file_checks = false;
  1010. options.create_if_missing = true;
  1011. ASSERT_OK(db_->Put(WriteOptions(), "foo1", "v1"));
  1012. ASSERT_OK(db_->Put(WriteOptions(), "foo2", "v1"));
  1013. ASSERT_OK(db_->Put(WriteOptions(), "foo3", "v1"));
  1014. ASSERT_OK(db_->Put(WriteOptions(), "foo4", "v1"));
  1015. int cnt = 0;
  1016. // Generate some out of order keys from the memtable
  1017. SyncPoint::GetInstance()->SetCallBack(
  1018. "MemTableIterator::Next:0", [&](void* arg) {
  1019. MemTableRep::Iterator* mem_iter =
  1020. static_cast<MemTableRep::Iterator*>(arg);
  1021. if (++cnt == 3) {
  1022. mem_iter->Prev();
  1023. mem_iter->Prev();
  1024. }
  1025. });
  1026. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  1027. Status s = static_cast_with_check<DBImpl>(db_)->TEST_FlushMemTable();
  1028. ASSERT_NOK(s);
  1029. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  1030. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
  1031. }
  1032. TEST_F(CorruptionTest, VerifyWholeTableChecksum) {
  1033. CloseDb();
  1034. Options options;
  1035. options.level_compaction_dynamic_level_bytes = false;
  1036. options.env = env_.get();
  1037. ASSERT_OK(DestroyDB(dbname_, options));
  1038. options.create_if_missing = true;
  1039. options.file_checksum_gen_factory =
  1040. ROCKSDB_NAMESPACE::GetFileChecksumGenCrc32cFactory();
  1041. Reopen(&options);
  1042. Build(10, 5);
  1043. ASSERT_OK(db_->VerifyFileChecksums(ReadOptions()));
  1044. CloseDb();
  1045. // Corrupt the first byte of each table file, this must be data block.
  1046. Corrupt(kTableFile, 0, 1);
  1047. ASSERT_OK(TryReopen(&options));
  1048. SyncPoint::GetInstance()->DisableProcessing();
  1049. SyncPoint::GetInstance()->ClearAllCallBacks();
  1050. int count{0};
  1051. SyncPoint::GetInstance()->SetCallBack(
  1052. "DBImpl::VerifyFullFileChecksum:mismatch", [&](void* arg) {
  1053. auto* s = static_cast<Status*>(arg);
  1054. ASSERT_NE(s, nullptr);
  1055. ++count;
  1056. ASSERT_NOK(*s);
  1057. });
  1058. SyncPoint::GetInstance()->EnableProcessing();
  1059. ASSERT_TRUE(db_->VerifyFileChecksums(ReadOptions()).IsCorruption());
  1060. ASSERT_EQ(1, count);
  1061. }
  1062. class CrashDuringRecoveryWithCorruptionTest
  1063. : public CorruptionTest,
  1064. public testing::WithParamInterface<std::tuple<bool, bool>> {
  1065. public:
  1066. explicit CrashDuringRecoveryWithCorruptionTest()
  1067. : CorruptionTest(),
  1068. avoid_flush_during_recovery_(std::get<0>(GetParam())),
  1069. track_and_verify_wals_in_manifest_(std::get<1>(GetParam())) {}
  1070. protected:
  1071. const bool avoid_flush_during_recovery_;
  1072. const bool track_and_verify_wals_in_manifest_;
  1073. };
  1074. INSTANTIATE_TEST_CASE_P(CorruptionTest, CrashDuringRecoveryWithCorruptionTest,
  1075. ::testing::Values(std::make_tuple(true, false),
  1076. std::make_tuple(false, false),
  1077. std::make_tuple(true, true),
  1078. std::make_tuple(false, true)));
  1079. // In case of non-TransactionDB with avoid_flush_during_recovery = true, RocksDB
  1080. // won't flush the data from WAL to L0 for all column families if possible. As a
  1081. // result, not all column families can increase their log_numbers, and
  1082. // min_log_number_to_keep won't change.
  1083. // It may prematurely persist a new MANIFEST even before we can declare the DB
  1084. // is in consistent state after recovery (this is when the new WAL is synced)
  1085. // and advances log_numbers for some column families.
  1086. //
  1087. // If there is power failure before we sync the new WAL, we will end up in
  1088. // a situation in which after persisting the MANIFEST, RocksDB will see some
  1089. // column families' log_numbers larger than the corrupted wal, and
  1090. // "Column family inconsistency: SST file contains data beyond the point of
  1091. // corruption" error will be hit, causing recovery to fail.
  1092. //
  1093. // After adding the fix, RocksDB persists a new MANIFEST with column families
  1094. // only after the new WAL is synced, to ensure RocksDB is in a consistent state.
  1095. // RocksDB writes an empty WriteBatch as a sentinel to the new WAL which is
  1096. // synced immediately afterwards. The sequence number of the sentinel
  1097. // WriteBatch will be the next sequence number immediately after the largest
  1098. // sequence number recovered from previous WALs and MANIFEST because of which DB
  1099. // will be in consistent state.
  1100. // If a future recovery starts from the new MANIFEST, then it means the new WAL
  1101. // is successfully synced. Due to the sentinel empty write batch at the
  1102. // beginning, kPointInTimeRecovery of WAL is guaranteed to go after this point.
  1103. // If future recovery starts from the old MANIFEST, it means writing the new
  1104. // MANIFEST failed. It won't have the "SST ahead of WAL" error.
  1105. //
  1106. // The combination of corrupting a WAL and injecting an error during subsequent
  1107. // re-open exposes the bug of prematurely persisting a new MANIFEST with
  1108. // advanced ColumnFamilyData::log_number.
  1109. TEST_P(CrashDuringRecoveryWithCorruptionTest, CrashDuringRecovery) {
  1110. CloseDb();
  1111. Options options;
  1112. options.level_compaction_dynamic_level_bytes = false;
  1113. options.track_and_verify_wals_in_manifest =
  1114. track_and_verify_wals_in_manifest_;
  1115. options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
  1116. options.avoid_flush_during_recovery = false;
  1117. options.env = env_.get();
  1118. ASSERT_OK(DestroyDB(dbname_, options));
  1119. options.create_if_missing = true;
  1120. options.max_write_buffer_number = 8;
  1121. Reopen(&options);
  1122. Status s;
  1123. const std::string test_cf_name = "test_cf";
  1124. ColumnFamilyHandle* cfh = nullptr;
  1125. s = db_->CreateColumnFamily(options, test_cf_name, &cfh);
  1126. ASSERT_OK(s);
  1127. delete cfh;
  1128. CloseDb();
  1129. std::vector<ColumnFamilyDescriptor> cf_descs;
  1130. cf_descs.emplace_back(kDefaultColumnFamilyName, options);
  1131. cf_descs.emplace_back(test_cf_name, options);
  1132. std::vector<ColumnFamilyHandle*> handles;
  1133. // 1. Open and populate the DB. Write and flush default_cf several times to
  1134. // advance wal number so that some column families have advanced log_number
  1135. // while other don't.
  1136. {
  1137. ASSERT_OK(DB::Open(options, dbname_, cf_descs, &handles, &db_));
  1138. auto* dbimpl = static_cast_with_check<DBImpl>(db_);
  1139. assert(dbimpl);
  1140. // Write one key to test_cf.
  1141. ASSERT_OK(db_->Put(WriteOptions(), handles[1], "old_key", "dontcare"));
  1142. ASSERT_OK(db_->Flush(FlushOptions(), handles[1]));
  1143. // Write to default_cf and flush this cf several times to advance wal
  1144. // number. TEST_SwitchMemtable makes sure WALs are not synced and test can
  1145. // corrupt un-sync WAL.
  1146. for (int i = 0; i < 2; ++i) {
  1147. ASSERT_OK(db_->Put(WriteOptions(), "key" + std::to_string(i),
  1148. "value" + std::to_string(i)));
  1149. ASSERT_OK(dbimpl->TEST_SwitchMemtable());
  1150. }
  1151. for (auto* h : handles) {
  1152. delete h;
  1153. }
  1154. handles.clear();
  1155. CloseDb();
  1156. }
  1157. // 2. Corrupt second last un-syned wal file to emulate power reset which
  1158. // caused the DB to lose the un-synced WAL.
  1159. {
  1160. std::vector<uint64_t> file_nums;
  1161. GetSortedWalFiles(file_nums);
  1162. size_t size = file_nums.size();
  1163. assert(size >= 2);
  1164. uint64_t log_num = file_nums[size - 2];
  1165. CorruptFileWithTruncation(FileType::kWalFile, log_num,
  1166. /*bytes_to_truncate=*/8);
  1167. }
  1168. // 3. After first crash reopen the DB which contains corrupted WAL. Default
  1169. // family has higher log number than corrupted wal number.
  1170. //
  1171. // Case1: If avoid_flush_during_recovery = true, RocksDB won't flush the data
  1172. // from WAL to L0 for all column families (test_cf_name in this case). As a
  1173. // result, not all column families can increase their log_numbers, and
  1174. // min_log_number_to_keep won't change.
  1175. //
  1176. // Case2: If avoid_flush_during_recovery = false, all column families have
  1177. // flushed their data from WAL to L0 during recovery, and none of them will
  1178. // ever need to read the WALs again.
  1179. // 4. Fault is injected to fail the recovery.
  1180. {
  1181. SyncPoint::GetInstance()->DisableProcessing();
  1182. SyncPoint::GetInstance()->ClearAllCallBacks();
  1183. SyncPoint::GetInstance()->SetCallBack(
  1184. "DBImpl::GetLogSizeAndMaybeTruncate:0", [&](void* arg) {
  1185. auto* tmp_s = static_cast<Status*>(arg);
  1186. assert(tmp_s);
  1187. *tmp_s = Status::IOError("Injected");
  1188. });
  1189. SyncPoint::GetInstance()->EnableProcessing();
  1190. handles.clear();
  1191. options.avoid_flush_during_recovery = true;
  1192. s = DB::Open(options, dbname_, cf_descs, &handles, &db_);
  1193. ASSERT_TRUE(s.IsIOError());
  1194. ASSERT_EQ("IO error: Injected", s.ToString());
  1195. for (auto* h : handles) {
  1196. delete h;
  1197. }
  1198. CloseDb();
  1199. SyncPoint::GetInstance()->DisableProcessing();
  1200. SyncPoint::GetInstance()->ClearAllCallBacks();
  1201. }
  1202. // 5. After second crash reopen the db with second corruption. Default family
  1203. // has higher log number than corrupted wal number.
  1204. //
  1205. // Case1: If avoid_flush_during_recovery = true, we persist a new
  1206. // MANIFEST with advanced log_numbers for some column families only after
  1207. // syncing the WAL. So during second crash, RocksDB will skip the corrupted
  1208. // WAL files as they have been moved to different folder. Since newly synced
  1209. // WAL file's sequence number (sentinel WriteBatch) will be the next
  1210. // sequence number immediately after the largest sequence number recovered
  1211. // from previous WALs and MANIFEST, db will be in consistent state and opens
  1212. // successfully.
  1213. //
  1214. // Case2: If avoid_flush_during_recovery = false, the corrupted WAL is below
  1215. // this number. So during a second crash after persisting the new MANIFEST,
  1216. // RocksDB will skip the corrupted WAL(s) because they are all below this
  1217. // bound. Therefore, we won't hit the "column family inconsistency" error
  1218. // message.
  1219. {
  1220. options.avoid_flush_during_recovery = avoid_flush_during_recovery_;
  1221. ASSERT_OK(DB::Open(options, dbname_, cf_descs, &handles, &db_));
  1222. // Verify that data is not lost.
  1223. {
  1224. std::string v;
  1225. ASSERT_OK(db_->Get(ReadOptions(), handles[1], "old_key", &v));
  1226. ASSERT_EQ("dontcare", v);
  1227. v.clear();
  1228. ASSERT_OK(db_->Get(ReadOptions(), "key" + std::to_string(0), &v));
  1229. ASSERT_EQ("value" + std::to_string(0), v);
  1230. // Since it's corrupting second last wal, below key is not found.
  1231. v.clear();
  1232. ASSERT_EQ(db_->Get(ReadOptions(), "key" + std::to_string(1), &v),
  1233. Status::NotFound());
  1234. }
  1235. for (auto* h : handles) {
  1236. delete h;
  1237. }
  1238. handles.clear();
  1239. CloseDb();
  1240. }
  1241. }
  1242. // In case of TransactionDB, it enables two-phase-commit. The prepare section of
  1243. // an uncommitted transaction always needs to be kept. Even if we perform flush
  1244. // during recovery, we may still need to hold an old WAL. The
  1245. // min_log_number_to_keep won't change, and "Column family inconsistency: SST
  1246. // file contains data beyond the point of corruption" error will be hit, causing
  1247. // recovery to fail.
  1248. //
  1249. // After adding the fix, RocksDB persists a new MANIFEST with column families
  1250. // only after the new WAL is synced, to ensure RocksDB is in a consistent state.
  1251. // RocksDB writes an empty WriteBatch as a sentinel to the new WAL which is
  1252. // synced immediately afterwards. The sequence number of the sentinel
  1253. // WriteBatch will be the next sequence number immediately after the largest
  1254. // sequence number recovered from previous WALs and MANIFEST because of which DB
  1255. // will be in consistent state.
  1256. // If a future recovery starts from the new MANIFEST, then it means the new WAL
  1257. // is successfully synced. Due to the sentinel empty write batch at the
  1258. // beginning, kPointInTimeRecovery of WAL is guaranteed to go after this point.
  1259. // If future recovery starts from the old MANIFEST, it means writing the new
  1260. // MANIFEST failed. It won't have the "SST ahead of WAL" error.
  1261. //
  1262. // The combination of corrupting a WAL and injecting an error during subsequent
  1263. // re-open exposes the bug of prematurely persisting a new MANIFEST with
  1264. // advanced ColumnFamilyData::log_number.
  1265. TEST_P(CrashDuringRecoveryWithCorruptionTest, TxnDbCrashDuringRecovery) {
  1266. CloseDb();
  1267. Options options;
  1268. options.level_compaction_dynamic_level_bytes = false;
  1269. options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
  1270. options.track_and_verify_wals_in_manifest =
  1271. track_and_verify_wals_in_manifest_;
  1272. options.avoid_flush_during_recovery = false;
  1273. options.env = env_.get();
  1274. ASSERT_OK(DestroyDB(dbname_, options));
  1275. options.create_if_missing = true;
  1276. options.max_write_buffer_number = 3;
  1277. Reopen(&options);
  1278. // Create cf test_cf_name.
  1279. ColumnFamilyHandle* cfh = nullptr;
  1280. const std::string test_cf_name = "test_cf";
  1281. Status s = db_->CreateColumnFamily(options, test_cf_name, &cfh);
  1282. ASSERT_OK(s);
  1283. delete cfh;
  1284. CloseDb();
  1285. std::vector<ColumnFamilyDescriptor> cf_descs;
  1286. cf_descs.emplace_back(kDefaultColumnFamilyName, options);
  1287. cf_descs.emplace_back(test_cf_name, options);
  1288. std::vector<ColumnFamilyHandle*> handles;
  1289. TransactionDB* txn_db = nullptr;
  1290. TransactionDBOptions txn_db_opts;
  1291. // 1. Open and populate the DB. Write and flush default_cf several times to
  1292. // advance wal number so that some column families have advanced log_number
  1293. // while other don't.
  1294. {
  1295. ASSERT_OK(TransactionDB::Open(options, txn_db_opts, dbname_, cf_descs,
  1296. &handles, &txn_db));
  1297. auto* txn = txn_db->BeginTransaction(WriteOptions(), TransactionOptions());
  1298. // Put cf1
  1299. ASSERT_OK(txn->Put(handles[1], "foo", "value"));
  1300. ASSERT_OK(txn->SetName("txn0"));
  1301. ASSERT_OK(txn->Prepare());
  1302. ASSERT_OK(txn_db->Flush(FlushOptions()));
  1303. delete txn;
  1304. txn = nullptr;
  1305. auto* dbimpl = static_cast_with_check<DBImpl>(txn_db->GetRootDB());
  1306. assert(dbimpl);
  1307. // Put and flush cf0
  1308. for (int i = 0; i < 2; ++i) {
  1309. ASSERT_OK(txn_db->Put(WriteOptions(), "key" + std::to_string(i),
  1310. "value" + std::to_string(i)));
  1311. ASSERT_OK(dbimpl->TEST_SwitchMemtable());
  1312. }
  1313. // Put cf1
  1314. txn = txn_db->BeginTransaction(WriteOptions(), TransactionOptions());
  1315. ASSERT_OK(txn->Put(handles[1], "foo1", "value1"));
  1316. ASSERT_OK(txn->Commit());
  1317. delete txn;
  1318. txn = nullptr;
  1319. for (auto* h : handles) {
  1320. delete h;
  1321. }
  1322. handles.clear();
  1323. delete txn_db;
  1324. }
  1325. // 2. Corrupt second last wal to emulate power reset which caused the DB to
  1326. // lose the un-synced WAL.
  1327. {
  1328. std::vector<uint64_t> file_nums;
  1329. GetSortedWalFiles(file_nums);
  1330. size_t size = file_nums.size();
  1331. assert(size >= 2);
  1332. uint64_t log_num = file_nums[size - 2];
  1333. CorruptFileWithTruncation(FileType::kWalFile, log_num,
  1334. /*bytes_to_truncate=*/8);
  1335. }
  1336. // 3. After first crash reopen the DB which contains corrupted WAL. Default
  1337. // family has higher log number than corrupted wal number. There may be old
  1338. // WAL files that it must not delete because they can contain data of
  1339. // uncommitted transactions. As a result, min_log_number_to_keep won't change.
  1340. {
  1341. SyncPoint::GetInstance()->DisableProcessing();
  1342. SyncPoint::GetInstance()->ClearAllCallBacks();
  1343. SyncPoint::GetInstance()->SetCallBack(
  1344. "DBImpl::Open::BeforeSyncWAL", [&](void* arg) {
  1345. auto* tmp_s = static_cast<Status*>(arg);
  1346. assert(tmp_s);
  1347. *tmp_s = Status::IOError("Injected");
  1348. });
  1349. SyncPoint::GetInstance()->EnableProcessing();
  1350. handles.clear();
  1351. s = TransactionDB::Open(options, txn_db_opts, dbname_, cf_descs, &handles,
  1352. &txn_db);
  1353. ASSERT_TRUE(s.IsIOError());
  1354. ASSERT_EQ("IO error: Injected", s.ToString());
  1355. for (auto* h : handles) {
  1356. delete h;
  1357. }
  1358. CloseDb();
  1359. SyncPoint::GetInstance()->DisableProcessing();
  1360. SyncPoint::GetInstance()->ClearAllCallBacks();
  1361. }
  1362. // 4. Corrupt max_wal_num.
  1363. {
  1364. std::vector<uint64_t> file_nums;
  1365. GetSortedWalFiles(file_nums);
  1366. size_t size = file_nums.size();
  1367. uint64_t log_num = file_nums[size - 1];
  1368. CorruptFileWithTruncation(FileType::kWalFile, log_num);
  1369. }
  1370. // 5. After second crash reopen the db with second corruption. Default family
  1371. // has higher log number than corrupted wal number.
  1372. // We persist a new MANIFEST with advanced log_numbers for some column
  1373. // families only after syncing the WAL. So during second crash, RocksDB will
  1374. // skip the corrupted WAL files as they have been moved to different folder.
  1375. // Since newly synced WAL file's sequence number (sentinel WriteBatch) will be
  1376. // the next sequence number immediately after the largest sequence number
  1377. // recovered from previous WALs and MANIFEST, db will be in consistent state
  1378. // and opens successfully.
  1379. {
  1380. ASSERT_OK(TransactionDB::Open(options, txn_db_opts, dbname_, cf_descs,
  1381. &handles, &txn_db));
  1382. // Verify that data is not lost.
  1383. {
  1384. std::string v;
  1385. // Key not visible since it's not committed.
  1386. ASSERT_EQ(txn_db->Get(ReadOptions(), handles[1], "foo", &v),
  1387. Status::NotFound());
  1388. v.clear();
  1389. ASSERT_OK(txn_db->Get(ReadOptions(), "key" + std::to_string(0), &v));
  1390. ASSERT_EQ("value" + std::to_string(0), v);
  1391. // Last WAL is corrupted which contains two keys below.
  1392. v.clear();
  1393. ASSERT_EQ(txn_db->Get(ReadOptions(), "key" + std::to_string(1), &v),
  1394. Status::NotFound());
  1395. v.clear();
  1396. ASSERT_EQ(txn_db->Get(ReadOptions(), handles[1], "foo1", &v),
  1397. Status::NotFound());
  1398. }
  1399. for (auto* h : handles) {
  1400. delete h;
  1401. }
  1402. delete txn_db;
  1403. }
  1404. }
  1405. // This test is similar to
  1406. // CrashDuringRecoveryWithCorruptionTest.CrashDuringRecovery except it calls
  1407. // flush and corrupts Last WAL. It calls flush to sync some of the WALs and
  1408. // remaining are unsyned one of which is then corrupted to simulate crash.
  1409. //
  1410. // In case of non-TransactionDB with avoid_flush_during_recovery = true, RocksDB
  1411. // won't flush the data from WAL to L0 for all column families if possible. As a
  1412. // result, not all column families can increase their log_numbers, and
  1413. // min_log_number_to_keep won't change.
  1414. // It may prematurely persist a new MANIFEST even before we can declare the DB
  1415. // is in consistent state after recovery (this is when the new WAL is synced)
  1416. // and advances log_numbers for some column families.
  1417. //
  1418. // If there is power failure before we sync the new WAL, we will end up in
  1419. // a situation in which after persisting the MANIFEST, RocksDB will see some
  1420. // column families' log_numbers larger than the corrupted wal, and
  1421. // "Column family inconsistency: SST file contains data beyond the point of
  1422. // corruption" error will be hit, causing recovery to fail.
  1423. //
  1424. // After adding the fix, only after new WAL is synced, RocksDB persist a new
  1425. // MANIFEST with column families to ensure RocksDB is in consistent state.
  1426. // RocksDB writes an empty WriteBatch as a sentinel to the new WAL which is
  1427. // synced immediately afterwards. The sequence number of the sentinel
  1428. // WriteBatch will be the next sequence number immediately after the largest
  1429. // sequence number recovered from previous WALs and MANIFEST because of which DB
  1430. // will be in consistent state.
  1431. // If a future recovery starts from the new MANIFEST, then it means the new WAL
  1432. // is successfully synced. Due to the sentinel empty write batch at the
  1433. // beginning, kPointInTimeRecovery of WAL is guaranteed to go after this point.
  1434. // If future recovery starts from the old MANIFEST, it means the writing the new
  1435. // MANIFEST failed. It won't have the "SST ahead of WAL" error.
  1436. // The combination of corrupting a WAL and injecting an error during subsequent
  1437. // re-open exposes the bug of prematurely persisting a new MANIFEST with
  1438. // advanced ColumnFamilyData::log_number.
  1439. TEST_P(CrashDuringRecoveryWithCorruptionTest, CrashDuringRecoveryWithFlush) {
  1440. CloseDb();
  1441. Options options;
  1442. options.level_compaction_dynamic_level_bytes = false;
  1443. options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
  1444. options.avoid_flush_during_recovery = false;
  1445. options.env = env_.get();
  1446. options.create_if_missing = true;
  1447. ASSERT_OK(DestroyDB(dbname_, options));
  1448. Reopen(&options);
  1449. ColumnFamilyHandle* cfh = nullptr;
  1450. const std::string test_cf_name = "test_cf";
  1451. Status s = db_->CreateColumnFamily(options, test_cf_name, &cfh);
  1452. ASSERT_OK(s);
  1453. delete cfh;
  1454. CloseDb();
  1455. std::vector<ColumnFamilyDescriptor> cf_descs;
  1456. cf_descs.emplace_back(kDefaultColumnFamilyName, options);
  1457. cf_descs.emplace_back(test_cf_name, options);
  1458. std::vector<ColumnFamilyHandle*> handles;
  1459. {
  1460. ASSERT_OK(DB::Open(options, dbname_, cf_descs, &handles, &db_));
  1461. // Write one key to test_cf.
  1462. ASSERT_OK(db_->Put(WriteOptions(), handles[1], "old_key", "dontcare"));
  1463. // Write to default_cf and flush this cf several times to advance wal
  1464. // number.
  1465. for (int i = 0; i < 2; ++i) {
  1466. ASSERT_OK(db_->Put(WriteOptions(), "key" + std::to_string(i),
  1467. "value" + std::to_string(i)));
  1468. ASSERT_OK(db_->Flush(FlushOptions()));
  1469. }
  1470. ASSERT_OK(db_->Put(WriteOptions(), handles[1], "dontcare", "dontcare"));
  1471. for (auto* h : handles) {
  1472. delete h;
  1473. }
  1474. handles.clear();
  1475. CloseDb();
  1476. }
  1477. // Corrupt second last un-syned wal file to emulate power reset which
  1478. // caused the DB to lose the un-synced WAL.
  1479. {
  1480. std::vector<uint64_t> file_nums;
  1481. GetSortedWalFiles(file_nums);
  1482. size_t size = file_nums.size();
  1483. uint64_t log_num = file_nums[size - 1];
  1484. CorruptFileWithTruncation(FileType::kWalFile, log_num,
  1485. /*bytes_to_truncate=*/8);
  1486. }
  1487. // Fault is injected to fail the recovery.
  1488. {
  1489. SyncPoint::GetInstance()->DisableProcessing();
  1490. SyncPoint::GetInstance()->ClearAllCallBacks();
  1491. SyncPoint::GetInstance()->SetCallBack(
  1492. "DBImpl::GetLogSizeAndMaybeTruncate:0", [&](void* arg) {
  1493. auto* tmp_s = static_cast<Status*>(arg);
  1494. assert(tmp_s);
  1495. *tmp_s = Status::IOError("Injected");
  1496. });
  1497. SyncPoint::GetInstance()->EnableProcessing();
  1498. handles.clear();
  1499. options.avoid_flush_during_recovery = true;
  1500. s = DB::Open(options, dbname_, cf_descs, &handles, &db_);
  1501. ASSERT_TRUE(s.IsIOError());
  1502. ASSERT_EQ("IO error: Injected", s.ToString());
  1503. for (auto* h : handles) {
  1504. delete h;
  1505. }
  1506. CloseDb();
  1507. SyncPoint::GetInstance()->DisableProcessing();
  1508. SyncPoint::GetInstance()->ClearAllCallBacks();
  1509. }
  1510. // Reopen db again
  1511. {
  1512. options.avoid_flush_during_recovery = avoid_flush_during_recovery_;
  1513. ASSERT_OK(DB::Open(options, dbname_, cf_descs, &handles, &db_));
  1514. // Verify that data is not lost.
  1515. {
  1516. std::string v;
  1517. ASSERT_OK(db_->Get(ReadOptions(), handles[1], "old_key", &v));
  1518. ASSERT_EQ("dontcare", v);
  1519. for (int i = 0; i < 2; ++i) {
  1520. v.clear();
  1521. ASSERT_OK(db_->Get(ReadOptions(), "key" + std::to_string(i), &v));
  1522. ASSERT_EQ("value" + std::to_string(i), v);
  1523. }
  1524. // Since it's corrupting last wal after Flush, below key is not found.
  1525. v.clear();
  1526. ASSERT_EQ(db_->Get(ReadOptions(), handles[1], "dontcare", &v),
  1527. Status::NotFound());
  1528. }
  1529. for (auto* h : handles) {
  1530. delete h;
  1531. }
  1532. }
  1533. }
  1534. } // namespace ROCKSDB_NAMESPACE
  1535. int main(int argc, char** argv) {
  1536. ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  1537. ::testing::InitGoogleTest(&argc, argv);
  1538. RegisterCustomObjects(argc, argv);
  1539. return RUN_ALL_TESTS();
  1540. }