fault_injection_test.cc 17 KB


  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright 2014 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  9. // This test uses a custom Env to keep track of the state of a filesystem as of
  10. // the last "sync". It then checks for data loss errors by purposely dropping
  11. // file data (or entire files) not protected by a "sync".
  12. #include "db/db_impl/db_impl.h"
  13. #include "db/log_format.h"
  14. #include "db/version_set.h"
  15. #include "env/mock_env.h"
  16. #include "file/filename.h"
  17. #include "logging/logging.h"
  18. #include "rocksdb/cache.h"
  19. #include "rocksdb/db.h"
  20. #include "rocksdb/env.h"
  21. #include "rocksdb/table.h"
  22. #include "rocksdb/write_batch.h"
  23. #include "test_util/fault_injection_test_env.h"
  24. #include "test_util/sync_point.h"
  25. #include "test_util/testharness.h"
  26. #include "test_util/testutil.h"
  27. #include "util/mutexlock.h"
  28. namespace ROCKSDB_NAMESPACE {
  // Length, in bytes, of every value generated by FaultInjectionTest::Value().
  static constexpr int kValueSize = 1000;
  // Exclusive upper bound for the number of records written before and after
  // the "sync" point in each FaultTest iteration.
  static constexpr int kMaxNumValues = 2000;
  // Number of write/fault/reopen iterations FaultTest runs per configuration.
  static constexpr size_t kNumIterations = 3;
  // DB configurations the fixture can run under. The values are consecutive
  // because FaultInjectionTest::ChangeOptions() steps through them by
  // incrementing an int.
  enum FaultInjectionOptionConfig {
    // Default Options; durability is established with DB::CompactRange().
    kDefault = 0,
    // Adds an extra entry to Options::db_paths.
    kDifferentDataDir,
    // Places the WAL in a dedicated Options::wal_dir.
    kWalDir,
    // Durability is established with synced WAL writes instead of compaction.
    kSyncWal,
    // Dedicated WAL directory plus synced WAL writes.
    kWalDirSyncWal,
    // Small buffers/files and low level-0 triggers, with synced WAL writes.
    kMultiLevels,
    // Sentinel: one past the last real configuration.
    kEnd,
  };
  41. class FaultInjectionTest
  42. : public testing::Test,
  43. public testing::WithParamInterface<std::tuple<
  44. bool, FaultInjectionOptionConfig, FaultInjectionOptionConfig>> {
  45. protected:
  46. int option_config_;
  47. int non_inclusive_end_range_; // kEnd or equivalent to that
  48. // When need to make sure data is persistent, sync WAL
  49. bool sync_use_wal_;
  50. // When need to make sure data is persistent, call DB::CompactRange()
  51. bool sync_use_compact_;
  52. bool sequential_order_;
  53. protected:
  54. public:
  55. enum ExpectedVerifResult { kValExpectFound, kValExpectNoError };
  56. enum ResetMethod {
  57. kResetDropUnsyncedData,
  58. kResetDropRandomUnsyncedData,
  59. kResetDeleteUnsyncedFiles,
  60. kResetDropAndDeleteUnsynced
  61. };
  62. std::unique_ptr<Env> base_env_;
  63. FaultInjectionTestEnv* env_;
  64. std::string dbname_;
  65. std::shared_ptr<Cache> tiny_cache_;
  66. Options options_;
  67. DB* db_;
  68. FaultInjectionTest()
  69. : option_config_(std::get<1>(GetParam())),
  70. non_inclusive_end_range_(std::get<2>(GetParam())),
  71. sync_use_wal_(false),
  72. sync_use_compact_(true),
  73. base_env_(nullptr),
  74. env_(nullptr),
  75. db_(nullptr) {}
  76. ~FaultInjectionTest() override {
  77. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  78. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
  79. }
  80. bool ChangeOptions() {
  81. option_config_++;
  82. if (option_config_ >= non_inclusive_end_range_) {
  83. return false;
  84. } else {
  85. if (option_config_ == kMultiLevels) {
  86. base_env_.reset(new MockEnv(Env::Default()));
  87. }
  88. return true;
  89. }
  90. }
  91. // Return the current option configuration.
  92. Options CurrentOptions() {
  93. sync_use_wal_ = false;
  94. sync_use_compact_ = true;
  95. Options options;
  96. switch (option_config_) {
  97. case kWalDir:
  98. options.wal_dir = test::PerThreadDBPath(env_, "fault_test_wal");
  99. break;
  100. case kDifferentDataDir:
  101. options.db_paths.emplace_back(
  102. test::PerThreadDBPath(env_, "fault_test_data"), 1000000U);
  103. break;
  104. case kSyncWal:
  105. sync_use_wal_ = true;
  106. sync_use_compact_ = false;
  107. break;
  108. case kWalDirSyncWal:
  109. options.wal_dir = test::PerThreadDBPath(env_, "/fault_test_wal");
  110. sync_use_wal_ = true;
  111. sync_use_compact_ = false;
  112. break;
  113. case kMultiLevels:
  114. options.write_buffer_size = 64 * 1024;
  115. options.target_file_size_base = 64 * 1024;
  116. options.level0_file_num_compaction_trigger = 2;
  117. options.level0_slowdown_writes_trigger = 2;
  118. options.level0_stop_writes_trigger = 4;
  119. options.max_bytes_for_level_base = 128 * 1024;
  120. options.max_write_buffer_number = 2;
  121. options.max_background_compactions = 8;
  122. options.max_background_flushes = 8;
  123. sync_use_wal_ = true;
  124. sync_use_compact_ = false;
  125. break;
  126. default:
  127. break;
  128. }
  129. return options;
  130. }
  131. Status NewDB() {
  132. assert(db_ == nullptr);
  133. assert(tiny_cache_ == nullptr);
  134. assert(env_ == nullptr);
  135. env_ =
  136. new FaultInjectionTestEnv(base_env_ ? base_env_.get() : Env::Default());
  137. options_ = CurrentOptions();
  138. options_.env = env_;
  139. options_.paranoid_checks = true;
  140. BlockBasedTableOptions table_options;
  141. tiny_cache_ = NewLRUCache(100);
  142. table_options.block_cache = tiny_cache_;
  143. options_.table_factory.reset(NewBlockBasedTableFactory(table_options));
  144. dbname_ = test::PerThreadDBPath("fault_test");
  145. EXPECT_OK(DestroyDB(dbname_, options_));
  146. options_.create_if_missing = true;
  147. Status s = OpenDB();
  148. options_.create_if_missing = false;
  149. return s;
  150. }
  151. void SetUp() override {
  152. sequential_order_ = std::get<0>(GetParam());
  153. ASSERT_OK(NewDB());
  154. }
  155. void TearDown() override {
  156. CloseDB();
  157. Status s = DestroyDB(dbname_, options_);
  158. delete env_;
  159. env_ = nullptr;
  160. tiny_cache_.reset();
  161. ASSERT_OK(s);
  162. }
  163. void Build(const WriteOptions& write_options, int start_idx, int num_vals) {
  164. std::string key_space, value_space;
  165. WriteBatch batch;
  166. for (int i = start_idx; i < start_idx + num_vals; i++) {
  167. Slice key = Key(i, &key_space);
  168. batch.Clear();
  169. batch.Put(key, Value(i, &value_space));
  170. ASSERT_OK(db_->Write(write_options, &batch));
  171. }
  172. }
  173. Status ReadValue(int i, std::string* val) const {
  174. std::string key_space, value_space;
  175. Slice key = Key(i, &key_space);
  176. Value(i, &value_space);
  177. ReadOptions options;
  178. return db_->Get(options, key, val);
  179. }
  180. Status Verify(int start_idx, int num_vals,
  181. ExpectedVerifResult expected) const {
  182. std::string val;
  183. std::string value_space;
  184. Status s;
  185. for (int i = start_idx; i < start_idx + num_vals && s.ok(); i++) {
  186. Value(i, &value_space);
  187. s = ReadValue(i, &val);
  188. if (s.ok()) {
  189. EXPECT_EQ(value_space, val);
  190. }
  191. if (expected == kValExpectFound) {
  192. if (!s.ok()) {
  193. fprintf(stderr, "Error when read %dth record (expect found): %s\n", i,
  194. s.ToString().c_str());
  195. return s;
  196. }
  197. } else if (!s.ok() && !s.IsNotFound()) {
  198. fprintf(stderr, "Error when read %dth record: %s\n", i,
  199. s.ToString().c_str());
  200. return s;
  201. }
  202. }
  203. return Status::OK();
  204. }
  205. // Return the ith key
  206. Slice Key(int i, std::string* storage) const {
  207. unsigned long long num = i;
  208. if (!sequential_order_) {
  209. // random transfer
  210. const int m = 0x5bd1e995;
  211. num *= m;
  212. num ^= num << 24;
  213. }
  214. char buf[100];
  215. snprintf(buf, sizeof(buf), "%016d", static_cast<int>(num));
  216. storage->assign(buf, strlen(buf));
  217. return Slice(*storage);
  218. }
  219. // Return the value to associate with the specified key
  220. Slice Value(int k, std::string* storage) const {
  221. Random r(k);
  222. return test::RandomString(&r, kValueSize, storage);
  223. }
  224. void CloseDB() {
  225. delete db_;
  226. db_ = nullptr;
  227. }
  228. Status OpenDB() {
  229. CloseDB();
  230. env_->ResetState();
  231. Status s = DB::Open(options_, dbname_, &db_);
  232. assert(db_ != nullptr);
  233. return s;
  234. }
  235. void DeleteAllData() {
  236. Iterator* iter = db_->NewIterator(ReadOptions());
  237. WriteOptions options;
  238. for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
  239. ASSERT_OK(db_->Delete(WriteOptions(), iter->key()));
  240. }
  241. delete iter;
  242. FlushOptions flush_options;
  243. flush_options.wait = true;
  244. db_->Flush(flush_options);
  245. }
  246. // rnd cannot be null for kResetDropRandomUnsyncedData
  247. void ResetDBState(ResetMethod reset_method, Random* rnd = nullptr) {
  248. env_->AssertNoOpenFile();
  249. switch (reset_method) {
  250. case kResetDropUnsyncedData:
  251. ASSERT_OK(env_->DropUnsyncedFileData());
  252. break;
  253. case kResetDropRandomUnsyncedData:
  254. ASSERT_OK(env_->DropRandomUnsyncedFileData(rnd));
  255. break;
  256. case kResetDeleteUnsyncedFiles:
  257. ASSERT_OK(env_->DeleteFilesCreatedAfterLastDirSync());
  258. break;
  259. case kResetDropAndDeleteUnsynced:
  260. ASSERT_OK(env_->DropUnsyncedFileData());
  261. ASSERT_OK(env_->DeleteFilesCreatedAfterLastDirSync());
  262. break;
  263. default:
  264. assert(false);
  265. }
  266. }
  267. void PartialCompactTestPreFault(int num_pre_sync, int num_post_sync) {
  268. DeleteAllData();
  269. WriteOptions write_options;
  270. write_options.sync = sync_use_wal_;
  271. Build(write_options, 0, num_pre_sync);
  272. if (sync_use_compact_) {
  273. db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
  274. }
  275. write_options.sync = false;
  276. Build(write_options, num_pre_sync, num_post_sync);
  277. }
  278. void PartialCompactTestReopenWithFault(ResetMethod reset_method,
  279. int num_pre_sync, int num_post_sync,
  280. Random* rnd = nullptr) {
  281. env_->SetFilesystemActive(false);
  282. CloseDB();
  283. ResetDBState(reset_method, rnd);
  284. ASSERT_OK(OpenDB());
  285. ASSERT_OK(Verify(0, num_pre_sync, FaultInjectionTest::kValExpectFound));
  286. ASSERT_OK(Verify(num_pre_sync, num_post_sync,
  287. FaultInjectionTest::kValExpectNoError));
  288. WaitCompactionFinish();
  289. ASSERT_OK(Verify(0, num_pre_sync, FaultInjectionTest::kValExpectFound));
  290. ASSERT_OK(Verify(num_pre_sync, num_post_sync,
  291. FaultInjectionTest::kValExpectNoError));
  292. }
  293. void NoWriteTestPreFault() {
  294. }
  295. void NoWriteTestReopenWithFault(ResetMethod reset_method) {
  296. CloseDB();
  297. ResetDBState(reset_method);
  298. ASSERT_OK(OpenDB());
  299. }
  300. void WaitCompactionFinish() {
  301. static_cast<DBImpl*>(db_->GetRootDB())->TEST_WaitForCompact();
  302. ASSERT_OK(db_->Put(WriteOptions(), "", ""));
  303. }
  304. };
  305. class FaultInjectionTestSplitted : public FaultInjectionTest {};
  306. TEST_P(FaultInjectionTestSplitted, FaultTest) {
  307. do {
  308. Random rnd(301);
  309. for (size_t idx = 0; idx < kNumIterations; idx++) {
  310. int num_pre_sync = rnd.Uniform(kMaxNumValues);
  311. int num_post_sync = rnd.Uniform(kMaxNumValues);
  312. PartialCompactTestPreFault(num_pre_sync, num_post_sync);
  313. PartialCompactTestReopenWithFault(kResetDropUnsyncedData, num_pre_sync,
  314. num_post_sync);
  315. NoWriteTestPreFault();
  316. NoWriteTestReopenWithFault(kResetDropUnsyncedData);
  317. PartialCompactTestPreFault(num_pre_sync, num_post_sync);
  318. PartialCompactTestReopenWithFault(kResetDropRandomUnsyncedData,
  319. num_pre_sync, num_post_sync, &rnd);
  320. NoWriteTestPreFault();
  321. NoWriteTestReopenWithFault(kResetDropUnsyncedData);
  322. // Setting a separate data path won't pass the test as we don't sync
  323. // it after creating new files,
  324. PartialCompactTestPreFault(num_pre_sync, num_post_sync);
  325. PartialCompactTestReopenWithFault(kResetDropAndDeleteUnsynced,
  326. num_pre_sync, num_post_sync);
  327. NoWriteTestPreFault();
  328. NoWriteTestReopenWithFault(kResetDropAndDeleteUnsynced);
  329. PartialCompactTestPreFault(num_pre_sync, num_post_sync);
  330. // No new files created so we expect all values since no files will be
  331. // dropped.
  332. PartialCompactTestReopenWithFault(kResetDeleteUnsyncedFiles, num_pre_sync,
  333. num_post_sync);
  334. NoWriteTestPreFault();
  335. NoWriteTestReopenWithFault(kResetDeleteUnsyncedFiles);
  336. }
  337. } while (ChangeOptions());
  338. }
  339. // Previous log file is not fsynced if sync is forced after log rolling.
  340. TEST_P(FaultInjectionTest, WriteOptionSyncTest) {
  341. test::SleepingBackgroundTask sleeping_task_low;
  342. env_->SetBackgroundThreads(1, Env::HIGH);
  343. // Block the job queue to prevent flush job from running.
  344. env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
  345. Env::Priority::HIGH);
  346. sleeping_task_low.WaitUntilSleeping();
  347. WriteOptions write_options;
  348. write_options.sync = false;
  349. std::string key_space, value_space;
  350. ASSERT_OK(
  351. db_->Put(write_options, Key(1, &key_space), Value(1, &value_space)));
  352. FlushOptions flush_options;
  353. flush_options.wait = false;
  354. ASSERT_OK(db_->Flush(flush_options));
  355. write_options.sync = true;
  356. ASSERT_OK(
  357. db_->Put(write_options, Key(2, &key_space), Value(2, &value_space)));
  358. db_->FlushWAL(false);
  359. env_->SetFilesystemActive(false);
  360. NoWriteTestReopenWithFault(kResetDropAndDeleteUnsynced);
  361. sleeping_task_low.WakeUp();
  362. sleeping_task_low.WaitUntilDone();
  363. ASSERT_OK(OpenDB());
  364. std::string val;
  365. Value(2, &value_space);
  366. ASSERT_OK(ReadValue(2, &val));
  367. ASSERT_EQ(value_space, val);
  368. Value(1, &value_space);
  369. ASSERT_OK(ReadValue(1, &val));
  370. ASSERT_EQ(value_space, val);
  371. }
  372. TEST_P(FaultInjectionTest, UninstalledCompaction) {
  373. options_.target_file_size_base = 32 * 1024;
  374. options_.write_buffer_size = 100 << 10; // 100KB
  375. options_.level0_file_num_compaction_trigger = 6;
  376. options_.level0_stop_writes_trigger = 1 << 10;
  377. options_.level0_slowdown_writes_trigger = 1 << 10;
  378. options_.max_background_compactions = 1;
  379. OpenDB();
  380. if (!sequential_order_) {
  381. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
  382. {"FaultInjectionTest::FaultTest:0", "DBImpl::BGWorkCompaction"},
  383. {"CompactionJob::Run():End", "FaultInjectionTest::FaultTest:1"},
  384. {"FaultInjectionTest::FaultTest:2",
  385. "DBImpl::BackgroundCompaction:NonTrivial:AfterRun"},
  386. });
  387. }
  388. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  389. int kNumKeys = 1000;
  390. Build(WriteOptions(), 0, kNumKeys);
  391. FlushOptions flush_options;
  392. flush_options.wait = true;
  393. db_->Flush(flush_options);
  394. ASSERT_OK(db_->Put(WriteOptions(), "", ""));
  395. TEST_SYNC_POINT("FaultInjectionTest::FaultTest:0");
  396. TEST_SYNC_POINT("FaultInjectionTest::FaultTest:1");
  397. env_->SetFilesystemActive(false);
  398. TEST_SYNC_POINT("FaultInjectionTest::FaultTest:2");
  399. CloseDB();
  400. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  401. ResetDBState(kResetDropUnsyncedData);
  402. std::atomic<bool> opened(false);
  403. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  404. "DBImpl::Open:Opened", [&](void* /*arg*/) { opened.store(true); });
  405. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  406. "DBImpl::BGWorkCompaction",
  407. [&](void* /*arg*/) { ASSERT_TRUE(opened.load()); });
  408. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  409. ASSERT_OK(OpenDB());
  410. ASSERT_OK(Verify(0, kNumKeys, FaultInjectionTest::kValExpectFound));
  411. WaitCompactionFinish();
  412. ASSERT_OK(Verify(0, kNumKeys, FaultInjectionTest::kValExpectFound));
  413. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  414. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
  415. }
  416. TEST_P(FaultInjectionTest, ManualLogSyncTest) {
  417. test::SleepingBackgroundTask sleeping_task_low;
  418. env_->SetBackgroundThreads(1, Env::HIGH);
  419. // Block the job queue to prevent flush job from running.
  420. env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
  421. Env::Priority::HIGH);
  422. sleeping_task_low.WaitUntilSleeping();
  423. WriteOptions write_options;
  424. write_options.sync = false;
  425. std::string key_space, value_space;
  426. ASSERT_OK(
  427. db_->Put(write_options, Key(1, &key_space), Value(1, &value_space)));
  428. FlushOptions flush_options;
  429. flush_options.wait = false;
  430. ASSERT_OK(db_->Flush(flush_options));
  431. ASSERT_OK(
  432. db_->Put(write_options, Key(2, &key_space), Value(2, &value_space)));
  433. ASSERT_OK(db_->FlushWAL(true));
  434. env_->SetFilesystemActive(false);
  435. NoWriteTestReopenWithFault(kResetDropAndDeleteUnsynced);
  436. sleeping_task_low.WakeUp();
  437. sleeping_task_low.WaitUntilDone();
  438. ASSERT_OK(OpenDB());
  439. std::string val;
  440. Value(2, &value_space);
  441. ASSERT_OK(ReadValue(2, &val));
  442. ASSERT_EQ(value_space, val);
  443. Value(1, &value_space);
  444. ASSERT_OK(ReadValue(1, &val));
  445. ASSERT_EQ(value_space, val);
  446. }
  447. TEST_P(FaultInjectionTest, WriteBatchWalTerminationTest) {
  448. ReadOptions ro;
  449. Options options = CurrentOptions();
  450. options.env = env_;
  451. WriteOptions wo;
  452. wo.sync = true;
  453. wo.disableWAL = false;
  454. WriteBatch batch;
  455. batch.Put("cats", "dogs");
  456. batch.MarkWalTerminationPoint();
  457. batch.Put("boys", "girls");
  458. ASSERT_OK(db_->Write(wo, &batch));
  459. env_->SetFilesystemActive(false);
  460. NoWriteTestReopenWithFault(kResetDropAndDeleteUnsynced);
  461. ASSERT_OK(OpenDB());
  462. std::string val;
  463. ASSERT_OK(db_->Get(ro, "cats", &val));
  464. ASSERT_EQ("dogs", val);
  465. ASSERT_EQ(db_->Get(ro, "boys", &val), Status::NotFound());
  466. }
  467. INSTANTIATE_TEST_CASE_P(
  468. FaultTest, FaultInjectionTest,
  469. ::testing::Values(std::make_tuple(false, kDefault, kEnd),
  470. std::make_tuple(true, kDefault, kEnd)));
  471. INSTANTIATE_TEST_CASE_P(
  472. FaultTest, FaultInjectionTestSplitted,
  473. ::testing::Values(std::make_tuple(false, kDefault, kSyncWal),
  474. std::make_tuple(true, kDefault, kSyncWal),
  475. std::make_tuple(false, kSyncWal, kEnd),
  476. std::make_tuple(true, kSyncWal, kEnd)));
  477. } // namespace ROCKSDB_NAMESPACE
  478. int main(int argc, char** argv) {
  479. ::testing::InitGoogleTest(&argc, argv);
  480. return RUN_ALL_TESTS();
  481. }