db_wal_test.cc 113 KB


  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  9. #include "db/db_test_util.h"
  10. #include "db/db_with_timestamp_test_util.h"
  11. #include "options/options_helper.h"
  12. #include "port/port.h"
  13. #include "port/stack_trace.h"
  14. #include "rocksdb/file_system.h"
  15. #include "test_util/sync_point.h"
  16. #include "util/defer.h"
  17. #include "util/udt_util.h"
  18. #include "utilities/fault_injection_env.h"
  19. #include "utilities/fault_injection_fs.h"
  20. namespace ROCKSDB_NAMESPACE {
  21. class DBWALTestBase : public DBTestBase {
  22. protected:
  23. explicit DBWALTestBase(const std::string& dir_name)
  24. : DBTestBase(dir_name, /*env_do_fsync=*/true) {}
  25. #if defined(ROCKSDB_PLATFORM_POSIX)
  26. public:
  27. #if defined(ROCKSDB_FALLOCATE_PRESENT)
  28. bool IsFallocateSupported() {
  29. // Test fallocate support of running file system.
  30. // Skip this test if fallocate is not supported.
  31. std::string fname_test_fallocate = dbname_ + "/preallocate_testfile";
  32. int fd = -1;
  33. do {
  34. fd = open(fname_test_fallocate.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0644);
  35. } while (fd < 0 && errno == EINTR);
  36. assert(fd > 0);
  37. int alloc_status = fallocate(fd, 0, 0, 1);
  38. int err_number = errno;
  39. close(fd);
  40. assert(env_->DeleteFile(fname_test_fallocate) == Status::OK());
  41. if (err_number == ENOSYS || err_number == EOPNOTSUPP) {
  42. fprintf(stderr, "Skipped preallocated space check: %s\n",
  43. errnoStr(err_number).c_str());
  44. return false;
  45. }
  46. assert(alloc_status == 0);
  47. return true;
  48. }
  49. #endif // ROCKSDB_FALLOCATE_PRESENT
  50. uint64_t GetAllocatedFileSize(std::string file_name) {
  51. struct stat sbuf;
  52. int err = stat(file_name.c_str(), &sbuf);
  53. assert(err == 0);
  54. return sbuf.st_blocks * 512;
  55. }
  56. #endif // ROCKSDB_PLATFORM_POSIX
  57. };
  58. class DBWALTest : public DBWALTestBase {
  59. public:
  60. DBWALTest() : DBWALTestBase("/db_wal_test") {}
  61. };
  62. // A SpecialEnv enriched to give more insight about deleted files
  63. class EnrichedSpecialEnv : public SpecialEnv {
  64. public:
  65. explicit EnrichedSpecialEnv(Env* base) : SpecialEnv(base) {}
  66. Status NewSequentialFile(const std::string& f,
  67. std::unique_ptr<SequentialFile>* r,
  68. const EnvOptions& soptions) override {
  69. InstrumentedMutexLock l(&env_mutex_);
  70. if (f == skipped_wal) {
  71. deleted_wal_reopened = true;
  72. if (IsWAL(f) && largest_deleted_wal.size() != 0 &&
  73. f.compare(largest_deleted_wal) <= 0) {
  74. gap_in_wals = true;
  75. }
  76. }
  77. return SpecialEnv::NewSequentialFile(f, r, soptions);
  78. }
  79. Status DeleteFile(const std::string& fname) override {
  80. if (IsWAL(fname)) {
  81. deleted_wal_cnt++;
  82. InstrumentedMutexLock l(&env_mutex_);
  83. // If this is the first WAL, remember its name and skip deleting it. We
  84. // remember its name partly because the application might attempt to
  85. // delete the file again.
  86. if (skipped_wal.size() != 0 && skipped_wal != fname) {
  87. if (largest_deleted_wal.size() == 0 ||
  88. largest_deleted_wal.compare(fname) < 0) {
  89. largest_deleted_wal = fname;
  90. }
  91. } else {
  92. skipped_wal = fname;
  93. return Status::OK();
  94. }
  95. }
  96. return SpecialEnv::DeleteFile(fname);
  97. }
  98. bool IsWAL(const std::string& fname) {
  99. // printf("iswal %s\n", fname.c_str());
  100. return fname.compare(fname.size() - 3, 3, "log") == 0;
  101. }
  102. InstrumentedMutex env_mutex_;
  103. // the wal whose actual delete was skipped by the env
  104. std::string skipped_wal;
  105. // the largest WAL that was requested to be deleted
  106. std::string largest_deleted_wal;
  107. // number of WALs that were successfully deleted
  108. std::atomic<size_t> deleted_wal_cnt = {0};
  109. // the WAL whose delete from fs was skipped is reopened during recovery
  110. std::atomic<bool> deleted_wal_reopened = {false};
  111. // whether a gap in the WALs was detected during recovery
  112. std::atomic<bool> gap_in_wals = {false};
  113. };
  114. class DBWALTestWithEnrichedEnv : public DBTestBase {
  115. public:
  116. DBWALTestWithEnrichedEnv()
  117. : DBTestBase("db_wal_test", /*env_do_fsync=*/true) {
  118. enriched_env_ = new EnrichedSpecialEnv(env_->target());
  119. auto options = CurrentOptions();
  120. options.env = enriched_env_;
  121. options.allow_2pc = true;
  122. Reopen(options);
  123. delete env_;
  124. // to be deleted by the parent class
  125. env_ = enriched_env_;
  126. }
  127. protected:
  128. EnrichedSpecialEnv* enriched_env_;
  129. };
  130. // Test that the recovery would successfully avoid the gaps between the logs.
  131. // One known scenario that could cause this is that the application issue the
  132. // WAL deletion out of order. For the sake of simplicity in the test, here we
  133. // create the gap by manipulating the env to skip deletion of the first WAL but
  134. // not the ones after it.
  135. TEST_F(DBWALTestWithEnrichedEnv, SkipDeletedWALs) {
  136. auto options = last_options_;
  137. // To cause frequent WAL deletion
  138. options.write_buffer_size = 128;
  139. Reopen(options);
  140. SyncPoint::GetInstance()->LoadDependency(
  141. {{"DBImpl::PurgeObsoleteFiles:End",
  142. "DBWALTestWithEnrichedEnv.SkipDeletedWALs:AfterFlush"}});
  143. SyncPoint::GetInstance()->EnableProcessing();
  144. WriteOptions writeOpt = WriteOptions();
  145. for (int i = 0; i < 128 * 5; i++) {
  146. ASSERT_OK(dbfull()->Put(writeOpt, "foo", "v1"));
  147. }
  148. FlushOptions fo;
  149. fo.wait = true;
  150. ASSERT_OK(db_->Flush(fo));
  151. TEST_SYNC_POINT("DBWALTestWithEnrichedEnv.SkipDeletedWALs:AfterFlush");
  152. // some wals are deleted
  153. ASSERT_NE(0, enriched_env_->deleted_wal_cnt);
  154. // but not the first one
  155. ASSERT_NE(0, enriched_env_->skipped_wal.size());
  156. // Test that the WAL that was not deleted will be skipped during recovery
  157. options = last_options_;
  158. Reopen(options);
  159. ASSERT_FALSE(enriched_env_->deleted_wal_reopened);
  160. ASSERT_FALSE(enriched_env_->gap_in_wals);
  161. SyncPoint::GetInstance()->DisableProcessing();
  162. }
  163. TEST_F(DBWALTest, WAL) {
  164. do {
  165. CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
  166. WriteOptions writeOpt = WriteOptions();
  167. writeOpt.disableWAL = true;
  168. ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v1"));
  169. ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v1"));
  170. ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
  171. ASSERT_EQ("v1", Get(1, "foo"));
  172. ASSERT_EQ("v1", Get(1, "bar"));
  173. writeOpt.disableWAL = false;
  174. ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v2"));
  175. writeOpt.disableWAL = true;
  176. ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v2"));
  177. ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
  178. // Both value's should be present.
  179. ASSERT_EQ("v2", Get(1, "bar"));
  180. ASSERT_EQ("v2", Get(1, "foo"));
  181. writeOpt.disableWAL = true;
  182. ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v3"));
  183. writeOpt.disableWAL = false;
  184. ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v3"));
  185. ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
  186. // again both values should be present.
  187. ASSERT_EQ("v3", Get(1, "foo"));
  188. ASSERT_EQ("v3", Get(1, "bar"));
  189. } while (ChangeWalOptions());
  190. }
  191. TEST_F(DBWALTest, RollLog) {
  192. do {
  193. CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
  194. ASSERT_OK(Put(1, "foo", "v1"));
  195. ASSERT_OK(Put(1, "baz", "v5"));
  196. ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
  197. for (int i = 0; i < 10; i++) {
  198. ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
  199. }
  200. ASSERT_OK(Put(1, "foo", "v4"));
  201. for (int i = 0; i < 10; i++) {
  202. ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
  203. }
  204. } while (ChangeWalOptions());
  205. }
  206. TEST_F(DBWALTest, SyncWALNotBlockWrite) {
  207. Options options = CurrentOptions();
  208. options.max_write_buffer_number = 4;
  209. DestroyAndReopen(options);
  210. ASSERT_OK(Put("foo1", "bar1"));
  211. ASSERT_OK(Put("foo5", "bar5"));
  212. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
  213. {"WritableFileWriter::SyncWithoutFlush:1",
  214. "DBWALTest::SyncWALNotBlockWrite:1"},
  215. {"DBWALTest::SyncWALNotBlockWrite:2",
  216. "WritableFileWriter::SyncWithoutFlush:2"},
  217. });
  218. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  219. ROCKSDB_NAMESPACE::port::Thread thread([&]() { ASSERT_OK(db_->SyncWAL()); });
  220. TEST_SYNC_POINT("DBWALTest::SyncWALNotBlockWrite:1");
  221. ASSERT_OK(Put("foo2", "bar2"));
  222. ASSERT_OK(Put("foo3", "bar3"));
  223. FlushOptions fo;
  224. fo.wait = false;
  225. ASSERT_OK(db_->Flush(fo));
  226. ASSERT_OK(Put("foo4", "bar4"));
  227. TEST_SYNC_POINT("DBWALTest::SyncWALNotBlockWrite:2");
  228. thread.join();
  229. ASSERT_EQ(Get("foo1"), "bar1");
  230. ASSERT_EQ(Get("foo2"), "bar2");
  231. ASSERT_EQ(Get("foo3"), "bar3");
  232. ASSERT_EQ(Get("foo4"), "bar4");
  233. ASSERT_EQ(Get("foo5"), "bar5");
  234. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  235. }
  236. TEST_F(DBWALTest, SyncWALNotWaitWrite) {
  237. ASSERT_OK(Put("foo1", "bar1"));
  238. ASSERT_OK(Put("foo3", "bar3"));
  239. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
  240. {"SpecialEnv::SpecialWalFile::Append:1",
  241. "DBWALTest::SyncWALNotWaitWrite:1"},
  242. {"DBWALTest::SyncWALNotWaitWrite:2",
  243. "SpecialEnv::SpecialWalFile::Append:2"},
  244. });
  245. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  246. ROCKSDB_NAMESPACE::port::Thread thread(
  247. [&]() { ASSERT_OK(Put("foo2", "bar2")); });
  248. // Moving this to SyncWAL before the actual fsync
  249. // TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:1");
  250. ASSERT_OK(db_->SyncWAL());
  251. // Moving this to SyncWAL after actual fsync
  252. // TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:2");
  253. thread.join();
  254. ASSERT_EQ(Get("foo1"), "bar1");
  255. ASSERT_EQ(Get("foo2"), "bar2");
  256. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  257. }
  258. TEST_F(DBWALTest, Recover) {
  259. do {
  260. CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
  261. ASSERT_OK(Put(1, "foo", "v1"));
  262. ASSERT_OK(Put(1, "baz", "v5"));
  263. ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
  264. ASSERT_EQ("v1", Get(1, "foo"));
  265. ASSERT_EQ("v1", Get(1, "foo"));
  266. ASSERT_EQ("v5", Get(1, "baz"));
  267. ASSERT_OK(Put(1, "bar", "v2"));
  268. ASSERT_OK(Put(1, "foo", "v3"));
  269. ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
  270. ASSERT_EQ("v3", Get(1, "foo"));
  271. ASSERT_OK(Put(1, "foo", "v4"));
  272. ASSERT_EQ("v4", Get(1, "foo"));
  273. ASSERT_EQ("v2", Get(1, "bar"));
  274. ASSERT_EQ("v5", Get(1, "baz"));
  275. } while (ChangeWalOptions());
  276. }
  277. class DBWALTestWithTimestamp
  278. : public DBBasicTestWithTimestampBase,
  279. public testing::WithParamInterface<test::UserDefinedTimestampTestMode> {
  280. public:
  281. DBWALTestWithTimestamp()
  282. : DBBasicTestWithTimestampBase("db_wal_test_with_timestamp") {}
  283. Status CreateAndReopenWithTs(const std::vector<std::string>& cfs,
  284. const Options& ts_options, bool persist_udt,
  285. bool avoid_flush_during_recovery = false) {
  286. Options default_options = CurrentOptions();
  287. default_options.allow_concurrent_memtable_write =
  288. persist_udt ? true : false;
  289. DestroyAndReopen(default_options);
  290. CreateColumnFamilies(cfs, ts_options);
  291. return ReopenColumnFamiliesWithTs(cfs, ts_options, persist_udt,
  292. avoid_flush_during_recovery);
  293. }
  294. Status ReopenColumnFamiliesWithTs(const std::vector<std::string>& cfs,
  295. Options ts_options, bool persist_udt,
  296. bool avoid_flush_during_recovery = false) {
  297. Options default_options = CurrentOptions();
  298. default_options.create_if_missing = false;
  299. default_options.allow_concurrent_memtable_write =
  300. persist_udt ? true : false;
  301. default_options.avoid_flush_during_recovery = avoid_flush_during_recovery;
  302. ts_options.create_if_missing = false;
  303. std::vector<Options> cf_options(cfs.size(), ts_options);
  304. std::vector<std::string> cfs_plus_default = cfs;
  305. cfs_plus_default.insert(cfs_plus_default.begin(), kDefaultColumnFamilyName);
  306. cf_options.insert(cf_options.begin(), default_options);
  307. Close();
  308. return TryReopenWithColumnFamilies(cfs_plus_default, cf_options);
  309. }
  310. Status Put(uint32_t cf, const Slice& key, const Slice& ts,
  311. const Slice& value) {
  312. WriteOptions write_opts;
  313. return db_->Put(write_opts, handles_[cf], key, ts, value);
  314. }
  315. void CheckGet(const ReadOptions& read_opts, uint32_t cf, const Slice& key,
  316. const std::string& expected_value,
  317. const std::string& expected_ts) {
  318. std::string actual_value;
  319. std::string actual_ts;
  320. ASSERT_OK(
  321. db_->Get(read_opts, handles_[cf], key, &actual_value, &actual_ts));
  322. ASSERT_EQ(expected_value, actual_value);
  323. ASSERT_EQ(expected_ts, actual_ts);
  324. }
  325. };
  326. TEST_P(DBWALTestWithTimestamp, RecoverAndNoFlush) {
  327. // Set up the option that enables user defined timestmp size.
  328. std::string ts1;
  329. PutFixed64(&ts1, 1);
  330. Options ts_options;
  331. ts_options.create_if_missing = true;
  332. ts_options.comparator = test::BytewiseComparatorWithU64TsWrapper();
  333. // Test that user-defined timestamps are recovered from WAL regardless of
  334. // the value of this flag because UDTs are saved in WAL nonetheless.
  335. // We however need to explicitly disable flush during recovery by setting
  336. // `avoid_flush_during_recovery=true` so that we can avoid timestamps getting
  337. // stripped when the `persist_user_defined_timestamps` flag is false, so that
  338. // all written timestamps are available for testing user-defined time travel
  339. // read.
  340. bool persist_udt = test::ShouldPersistUDT(GetParam());
  341. ts_options.persist_user_defined_timestamps = persist_udt;
  342. bool avoid_flush_during_recovery = true;
  343. std::string full_history_ts_low;
  344. ReadOptions read_opts;
  345. do {
  346. Slice ts_slice = ts1;
  347. read_opts.timestamp = &ts_slice;
  348. ASSERT_OK(CreateAndReopenWithTs({"pikachu"}, ts_options, persist_udt,
  349. avoid_flush_during_recovery));
  350. ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), 0U);
  351. ASSERT_OK(Put(1, "foo", ts1, "v1"));
  352. ASSERT_OK(Put(1, "baz", ts1, "v5"));
  353. ASSERT_OK(ReopenColumnFamiliesWithTs({"pikachu"}, ts_options, persist_udt,
  354. avoid_flush_during_recovery));
  355. ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), 0U);
  356. // Do a timestamped read with ts1 after second reopen.
  357. CheckGet(read_opts, 1, "foo", "v1", ts1);
  358. CheckGet(read_opts, 1, "baz", "v5", ts1);
  359. // Write more value versions for key "foo" and "bar" before and after second
  360. // reopen.
  361. std::string ts2;
  362. PutFixed64(&ts2, 2);
  363. ASSERT_OK(Put(1, "bar", ts2, "v2"));
  364. ASSERT_OK(Put(1, "foo", ts2, "v3"));
  365. ASSERT_OK(ReopenColumnFamiliesWithTs({"pikachu"}, ts_options, persist_udt,
  366. avoid_flush_during_recovery));
  367. ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), 0U);
  368. std::string ts3;
  369. PutFixed64(&ts3, 3);
  370. ASSERT_OK(Put(1, "foo", ts3, "v4"));
  371. // All the key value pairs available for read:
  372. // "foo" -> [(ts1, "v1"), (ts2, "v3"), (ts3, "v4")]
  373. // "bar" -> [(ts2, "v2")]
  374. // "baz" -> [(ts1, "v5")]
  375. // Do a timestamped read with ts1 after third reopen.
  376. // read_opts.timestamp is set to ts1 for below reads
  377. CheckGet(read_opts, 1, "foo", "v1", ts1);
  378. std::string value;
  379. ASSERT_TRUE(db_->Get(read_opts, handles_[1], "bar", &value).IsNotFound());
  380. CheckGet(read_opts, 1, "baz", "v5", ts1);
  381. // Do a timestamped read with ts2 after third reopen.
  382. ts_slice = ts2;
  383. // read_opts.timestamp is set to ts2 for below reads.
  384. CheckGet(read_opts, 1, "foo", "v3", ts2);
  385. CheckGet(read_opts, 1, "bar", "v2", ts2);
  386. CheckGet(read_opts, 1, "baz", "v5", ts1);
  387. // Do a timestamped read with ts3 after third reopen.
  388. ts_slice = ts3;
  389. // read_opts.timestamp is set to ts3 for below reads.
  390. CheckGet(read_opts, 1, "foo", "v4", ts3);
  391. CheckGet(read_opts, 1, "bar", "v2", ts2);
  392. CheckGet(read_opts, 1, "baz", "v5", ts1);
  393. ASSERT_OK(db_->GetFullHistoryTsLow(handles_[1], &full_history_ts_low));
  394. ASSERT_TRUE(full_history_ts_low.empty());
  395. } while (ChangeWalOptions());
  396. }
  397. TEST_P(DBWALTestWithTimestamp, RecoverAndFlush) {
  398. // Set up the option that enables user defined timestamp size.
  399. std::string min_ts;
  400. std::string write_ts;
  401. PutFixed64(&min_ts, 0);
  402. PutFixed64(&write_ts, 1);
  403. Options ts_options;
  404. ts_options.create_if_missing = true;
  405. ts_options.comparator = test::BytewiseComparatorWithU64TsWrapper();
  406. bool persist_udt = test::ShouldPersistUDT(GetParam());
  407. ts_options.persist_user_defined_timestamps = persist_udt;
  408. std::string smallest_ukey_without_ts = "baz";
  409. std::string largest_ukey_without_ts = "foo";
  410. ASSERT_OK(CreateAndReopenWithTs({"pikachu"}, ts_options, persist_udt));
  411. // No flush, no sst files, because of no data.
  412. ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), 0U);
  413. ASSERT_OK(Put(1, largest_ukey_without_ts, write_ts, "v1"));
  414. ASSERT_OK(Put(1, smallest_ukey_without_ts, write_ts, "v5"));
  415. ASSERT_OK(ReopenColumnFamiliesWithTs({"pikachu"}, ts_options, persist_udt));
  416. // Memtable recovered from WAL flushed because `avoid_flush_during_recovery`
  417. // defaults to false, created one L0 file.
  418. ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), 1U);
  419. std::vector<std::vector<FileMetaData>> level_to_files;
  420. dbfull()->TEST_GetFilesMetaData(handles_[1], &level_to_files);
  421. std::string full_history_ts_low;
  422. ASSERT_OK(db_->GetFullHistoryTsLow(handles_[1], &full_history_ts_low));
  423. ASSERT_GT(level_to_files.size(), 1);
  424. // L0 only has one SST file.
  425. ASSERT_EQ(level_to_files[0].size(), 1);
  426. auto meta = level_to_files[0][0];
  427. if (persist_udt) {
  428. ASSERT_EQ(smallest_ukey_without_ts + write_ts, meta.smallest.user_key());
  429. ASSERT_EQ(largest_ukey_without_ts + write_ts, meta.largest.user_key());
  430. ASSERT_TRUE(full_history_ts_low.empty());
  431. } else {
  432. ASSERT_EQ(smallest_ukey_without_ts + min_ts, meta.smallest.user_key());
  433. ASSERT_EQ(largest_ukey_without_ts + min_ts, meta.largest.user_key());
  434. std::string effective_cutoff;
  435. Slice write_ts_slice = write_ts;
  436. GetFullHistoryTsLowFromU64CutoffTs(&write_ts_slice, &effective_cutoff);
  437. ASSERT_EQ(effective_cutoff, full_history_ts_low);
  438. }
  439. }
  440. // Param 0: test mode for the user-defined timestamp feature
  441. INSTANTIATE_TEST_CASE_P(
  442. P, DBWALTestWithTimestamp,
  443. ::testing::Values(
  444. test::UserDefinedTimestampTestMode::kStripUserDefinedTimestamp,
  445. test::UserDefinedTimestampTestMode::kNormal));
  446. TEST_F(DBWALTestWithTimestamp, EnableDisableUDT) {
  447. Options options;
  448. options.create_if_missing = true;
  449. options.comparator = BytewiseComparator();
  450. bool avoid_flush_during_recovery = true;
  451. ASSERT_OK(CreateAndReopenWithTs({"pikachu"}, options, true /* persist_udt */,
  452. avoid_flush_during_recovery));
  453. ASSERT_OK(db_->Put(WriteOptions(), handles_[1], "foo", "v1"));
  454. ASSERT_OK(db_->Put(WriteOptions(), handles_[1], "baz", "v5"));
  455. options.comparator = test::BytewiseComparatorWithU64TsWrapper();
  456. options.persist_user_defined_timestamps = false;
  457. // Test handle timestamp size inconsistency in WAL when enabling user-defined
  458. // timestamps.
  459. ASSERT_OK(ReopenColumnFamiliesWithTs({"pikachu"}, options,
  460. false /* persist_udt */,
  461. avoid_flush_during_recovery));
  462. std::string ts;
  463. PutFixed64(&ts, 0);
  464. Slice ts_slice = ts;
  465. ReadOptions read_opts;
  466. read_opts.timestamp = &ts_slice;
  467. // Pre-existing entries are treated as if they have the min timestamp.
  468. CheckGet(read_opts, 1, "foo", "v1", ts);
  469. CheckGet(read_opts, 1, "baz", "v5", ts);
  470. ts.clear();
  471. PutFixed64(&ts, 1);
  472. ASSERT_OK(db_->Put(WriteOptions(), handles_[1], "foo", ts, "v2"));
  473. ASSERT_OK(db_->Put(WriteOptions(), handles_[1], "baz", ts, "v6"));
  474. CheckGet(read_opts, 1, "foo", "v2", ts);
  475. CheckGet(read_opts, 1, "baz", "v6", ts);
  476. options.comparator = BytewiseComparator();
  477. // Open the column family again with the UDT feature disabled. Test handle
  478. // timestamp size inconsistency in WAL when disabling user-defined timestamps
  479. ASSERT_OK(ReopenColumnFamiliesWithTs({"pikachu"}, options,
  480. true /* persist_udt */,
  481. avoid_flush_during_recovery));
  482. ASSERT_EQ("v2", Get(1, "foo"));
  483. ASSERT_EQ("v6", Get(1, "baz"));
  484. }
  485. TEST_F(DBWALTest, RecoverWithTableHandle) {
  486. do {
  487. Options options = CurrentOptions();
  488. options.create_if_missing = true;
  489. options.disable_auto_compactions = true;
  490. options.avoid_flush_during_recovery = false;
  491. DestroyAndReopen(options);
  492. CreateAndReopenWithCF({"pikachu"}, options);
  493. ASSERT_OK(Put(1, "foo", "v1"));
  494. ASSERT_OK(Put(1, "bar", "v2"));
  495. ASSERT_OK(Flush(1));
  496. ASSERT_OK(Put(1, "foo", "v3"));
  497. ASSERT_OK(Put(1, "bar", "v4"));
  498. ASSERT_OK(Flush(1));
  499. ASSERT_OK(Put(1, "big", std::string(100, 'a')));
  500. options = CurrentOptions();
  501. const int kSmallMaxOpenFiles = 13;
  502. if (option_config_ == kDBLogDir) {
  503. // Use this option to check not preloading files
  504. // Set the max open files to be small enough so no preload will
  505. // happen.
  506. options.max_open_files = kSmallMaxOpenFiles;
  507. // RocksDB sanitize max open files to at least 20. Modify it back.
  508. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  509. "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) {
  510. int* max_open_files = static_cast<int*>(arg);
  511. *max_open_files = kSmallMaxOpenFiles;
  512. });
  513. } else if (option_config_ == kWalDirAndMmapReads) {
  514. // Use this option to check always loading all files.
  515. options.max_open_files = 100;
  516. } else {
  517. options.max_open_files = -1;
  518. }
  519. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  520. ReopenWithColumnFamilies({"default", "pikachu"}, options);
  521. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  522. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
  523. std::vector<std::vector<FileMetaData>> files;
  524. dbfull()->TEST_GetFilesMetaData(handles_[1], &files);
  525. size_t total_files = 0;
  526. for (const auto& level : files) {
  527. total_files += level.size();
  528. }
  529. ASSERT_EQ(total_files, 3);
  530. for (const auto& level : files) {
  531. for (const auto& file : level) {
  532. if (options.max_open_files == kSmallMaxOpenFiles) {
  533. ASSERT_TRUE(file.table_reader_handle == nullptr);
  534. } else {
  535. ASSERT_TRUE(file.table_reader_handle != nullptr);
  536. }
  537. }
  538. }
  539. } while (ChangeWalOptions());
  540. }
  541. TEST_F(DBWALTest, RecoverWithBlob) {
  542. // Write a value that's below the prospective size limit for blobs and another
  543. // one that's above. Note that blob files are not actually enabled at this
  544. // point.
  545. constexpr uint64_t min_blob_size = 10;
  546. constexpr char short_value[] = "short";
  547. static_assert(sizeof(short_value) - 1 < min_blob_size,
  548. "short_value too long");
  549. constexpr char long_value[] = "long_value";
  550. static_assert(sizeof(long_value) - 1 >= min_blob_size,
  551. "long_value too short");
  552. ASSERT_OK(Put("key1", short_value));
  553. ASSERT_OK(Put("key2", long_value));
  554. // There should be no files just yet since we haven't flushed.
  555. {
  556. VersionSet* const versions = dbfull()->GetVersionSet();
  557. ASSERT_NE(versions, nullptr);
  558. ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
  559. ASSERT_NE(cfd, nullptr);
  560. Version* const current = cfd->current();
  561. ASSERT_NE(current, nullptr);
  562. const VersionStorageInfo* const storage_info = current->storage_info();
  563. ASSERT_NE(storage_info, nullptr);
  564. ASSERT_EQ(storage_info->num_non_empty_levels(), 0);
  565. ASSERT_TRUE(storage_info->GetBlobFiles().empty());
  566. }
  567. // Reopen the database with blob files enabled. A new table file/blob file
  568. // pair should be written during recovery.
  569. Options options;
  570. options.enable_blob_files = true;
  571. options.min_blob_size = min_blob_size;
  572. options.avoid_flush_during_recovery = false;
  573. options.disable_auto_compactions = true;
  574. options.env = env_;
  575. Reopen(options);
  576. ASSERT_EQ(Get("key1"), short_value);
  577. ASSERT_EQ(Get("key2"), long_value);
  578. VersionSet* const versions = dbfull()->GetVersionSet();
  579. ASSERT_NE(versions, nullptr);
  580. ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
  581. ASSERT_NE(cfd, nullptr);
  582. Version* const current = cfd->current();
  583. ASSERT_NE(current, nullptr);
  584. const VersionStorageInfo* const storage_info = current->storage_info();
  585. ASSERT_NE(storage_info, nullptr);
  586. const auto& l0_files = storage_info->LevelFiles(0);
  587. ASSERT_EQ(l0_files.size(), 1);
  588. const FileMetaData* const table_file = l0_files[0];
  589. ASSERT_NE(table_file, nullptr);
  590. const auto& blob_files = storage_info->GetBlobFiles();
  591. ASSERT_EQ(blob_files.size(), 1);
  592. const auto& blob_file = blob_files.front();
  593. ASSERT_NE(blob_file, nullptr);
  594. ASSERT_EQ(table_file->smallest.user_key(), "key1");
  595. ASSERT_EQ(table_file->largest.user_key(), "key2");
  596. ASSERT_EQ(table_file->fd.smallest_seqno, 1);
  597. ASSERT_EQ(table_file->fd.largest_seqno, 2);
  598. ASSERT_EQ(table_file->oldest_blob_file_number,
  599. blob_file->GetBlobFileNumber());
  600. ASSERT_EQ(blob_file->GetTotalBlobCount(), 1);
  601. const InternalStats* const internal_stats = cfd->internal_stats();
  602. ASSERT_NE(internal_stats, nullptr);
  603. const auto& compaction_stats = internal_stats->TEST_GetCompactionStats();
  604. ASSERT_FALSE(compaction_stats.empty());
  605. ASSERT_EQ(compaction_stats[0].bytes_written, table_file->fd.GetFileSize());
  606. ASSERT_EQ(compaction_stats[0].bytes_written_blob,
  607. blob_file->GetTotalBlobBytes());
  608. ASSERT_EQ(compaction_stats[0].num_output_files, 1);
  609. ASSERT_EQ(compaction_stats[0].num_output_files_blob, 1);
  610. const uint64_t* const cf_stats_value = internal_stats->TEST_GetCFStatsValue();
  611. ASSERT_EQ(cf_stats_value[InternalStats::BYTES_FLUSHED],
  612. compaction_stats[0].bytes_written +
  613. compaction_stats[0].bytes_written_blob);
  614. }
  615. TEST_F(DBWALTest, RecoverWithBlobMultiSST) {
  616. // Write several large (4 KB) values without flushing. Note that blob files
  617. // are not actually enabled at this point.
  618. std::string large_value(1 << 12, 'a');
  619. constexpr int num_keys = 64;
  620. for (int i = 0; i < num_keys; ++i) {
  621. ASSERT_OK(Put(Key(i), large_value));
  622. }
  623. // There should be no files just yet since we haven't flushed.
  624. {
  625. VersionSet* const versions = dbfull()->GetVersionSet();
  626. ASSERT_NE(versions, nullptr);
  627. ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
  628. ASSERT_NE(cfd, nullptr);
  629. Version* const current = cfd->current();
  630. ASSERT_NE(current, nullptr);
  631. const VersionStorageInfo* const storage_info = current->storage_info();
  632. ASSERT_NE(storage_info, nullptr);
  633. ASSERT_EQ(storage_info->num_non_empty_levels(), 0);
  634. ASSERT_TRUE(storage_info->GetBlobFiles().empty());
  635. }
  636. // Reopen the database with blob files enabled and write buffer size set to a
  637. // smaller value. Multiple table files+blob files should be written and added
  638. // to the Version during recovery.
  639. Options options;
  640. options.write_buffer_size = 1 << 16; // 64 KB
  641. options.enable_blob_files = true;
  642. options.avoid_flush_during_recovery = false;
  643. options.disable_auto_compactions = true;
  644. options.env = env_;
  645. Reopen(options);
  646. for (int i = 0; i < num_keys; ++i) {
  647. ASSERT_EQ(Get(Key(i)), large_value);
  648. }
  649. VersionSet* const versions = dbfull()->GetVersionSet();
  650. ASSERT_NE(versions, nullptr);
  651. ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
  652. ASSERT_NE(cfd, nullptr);
  653. Version* const current = cfd->current();
  654. ASSERT_NE(current, nullptr);
  655. const VersionStorageInfo* const storage_info = current->storage_info();
  656. ASSERT_NE(storage_info, nullptr);
  657. const auto& l0_files = storage_info->LevelFiles(0);
  658. ASSERT_GT(l0_files.size(), 1);
  659. const auto& blob_files = storage_info->GetBlobFiles();
  660. ASSERT_GT(blob_files.size(), 1);
  661. ASSERT_EQ(l0_files.size(), blob_files.size());
  662. }
  663. TEST_F(DBWALTest, WALWithChecksumHandoff) {
  664. #ifndef ROCKSDB_ASSERT_STATUS_CHECKED
  665. if (mem_env_ || encrypted_env_) {
  666. ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment");
  667. return;
  668. }
  669. std::shared_ptr<FaultInjectionTestFS> fault_fs(
  670. new FaultInjectionTestFS(FileSystem::Default()));
  671. std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
  672. do {
  673. Options options = CurrentOptions();
  674. options.checksum_handoff_file_types.Add(FileType::kWalFile);
  675. options.env = fault_fs_env.get();
  676. fault_fs->SetChecksumHandoffFuncType(ChecksumType::kCRC32c);
  677. CreateAndReopenWithCF({"pikachu"}, options);
  678. WriteOptions writeOpt = WriteOptions();
  679. writeOpt.disableWAL = true;
  680. ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v1"));
  681. ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v1"));
  682. ReopenWithColumnFamilies({"default", "pikachu"}, options);
  683. ASSERT_EQ("v1", Get(1, "foo"));
  684. ASSERT_EQ("v1", Get(1, "bar"));
  685. writeOpt.disableWAL = false;
  686. ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v2"));
  687. writeOpt.disableWAL = true;
  688. ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v2"));
  689. ReopenWithColumnFamilies({"default", "pikachu"}, options);
  690. // Both value's should be present.
  691. ASSERT_EQ("v2", Get(1, "bar"));
  692. ASSERT_EQ("v2", Get(1, "foo"));
  693. writeOpt.disableWAL = true;
  694. // This put, data is persisted by Flush
  695. ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v3"));
  696. ReopenWithColumnFamilies({"default", "pikachu"}, options);
  697. writeOpt.disableWAL = false;
  698. // Data is persisted in the WAL
  699. ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "zoo", "v3"));
  700. ASSERT_OK(dbfull()->SyncWAL());
  701. // The hash does not match, write fails
  702. fault_fs->SetChecksumHandoffFuncType(ChecksumType::kxxHash);
  703. writeOpt.disableWAL = false;
  704. ASSERT_NOK(dbfull()->Put(writeOpt, handles_[1], "foo", "v3"));
  705. ReopenWithColumnFamilies({"default", "pikachu"}, options);
  706. // Due to the write failure, Get should not find
  707. ASSERT_NE("v3", Get(1, "foo"));
  708. ASSERT_EQ("v3", Get(1, "zoo"));
  709. ASSERT_EQ("v3", Get(1, "bar"));
  710. fault_fs->SetChecksumHandoffFuncType(ChecksumType::kCRC32c);
  711. // Each write will be similated as corrupted.
  712. fault_fs->IngestDataCorruptionBeforeWrite();
  713. writeOpt.disableWAL = true;
  714. ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v4"));
  715. writeOpt.disableWAL = false;
  716. ASSERT_NOK(dbfull()->Put(writeOpt, handles_[1], "foo", "v4"));
  717. ReopenWithColumnFamilies({"default", "pikachu"}, options);
  718. ASSERT_NE("v4", Get(1, "foo"));
  719. ASSERT_NE("v4", Get(1, "bar"));
  720. fault_fs->NoDataCorruptionBeforeWrite();
  721. fault_fs->SetChecksumHandoffFuncType(ChecksumType::kNoChecksum);
  722. // The file system does not provide checksum method and verification.
  723. writeOpt.disableWAL = true;
  724. ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v5"));
  725. writeOpt.disableWAL = false;
  726. ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v5"));
  727. ReopenWithColumnFamilies({"default", "pikachu"}, options);
  728. ASSERT_EQ("v5", Get(1, "foo"));
  729. ASSERT_EQ("v5", Get(1, "bar"));
  730. Destroy(options);
  731. } while (ChangeWalOptions());
  732. #endif // ROCKSDB_ASSERT_STATUS_CHECKED
  733. }
  734. TEST_F(DBWALTest, LockWal) {
  735. do {
  736. Options options = CurrentOptions();
  737. options.create_if_missing = true;
  738. DestroyAndReopen(options);
  739. ASSERT_OK(Put("foo", "v"));
  740. ASSERT_OK(Put("bar", "v"));
  741. ASSERT_OK(db_->LockWAL());
  742. // Verify writes are stopped
  743. WriteOptions wopts;
  744. wopts.no_slowdown = true;
  745. Status s = db_->Put(wopts, "foo", "dontcare");
  746. ASSERT_TRUE(s.IsIncomplete());
  747. {
  748. VectorLogPtr wals;
  749. ASSERT_OK(db_->GetSortedWalFiles(wals));
  750. ASSERT_FALSE(wals.empty());
  751. }
  752. port::Thread worker([&]() {
  753. Status tmp_s = db_->Flush(FlushOptions());
  754. ASSERT_OK(tmp_s);
  755. });
  756. FlushOptions flush_opts;
  757. flush_opts.wait = false;
  758. s = db_->Flush(flush_opts);
  759. ASSERT_TRUE(s.IsTryAgain());
  760. ASSERT_OK(db_->UnlockWAL());
  761. ASSERT_OK(db_->Put(WriteOptions(), "foo", "dontcare"));
  762. worker.join();
  763. } while (ChangeWalOptions());
  764. }
  765. class DBRecoveryTestBlobError
  766. : public DBWALTest,
  767. public testing::WithParamInterface<std::string> {
  768. public:
  769. DBRecoveryTestBlobError() : sync_point_(GetParam()) {}
  770. std::string sync_point_;
  771. };
  772. INSTANTIATE_TEST_CASE_P(DBRecoveryTestBlobError, DBRecoveryTestBlobError,
  773. ::testing::ValuesIn(std::vector<std::string>{
  774. "BlobFileBuilder::WriteBlobToFile:AddRecord",
  775. "BlobFileBuilder::WriteBlobToFile:AppendFooter"}));
  776. TEST_P(DBRecoveryTestBlobError, RecoverWithBlobError) {
  777. // Write a value. Note that blob files are not actually enabled at this point.
  778. ASSERT_OK(Put("key", "blob"));
  779. // Reopen with blob files enabled but make blob file writing fail during
  780. // recovery.
  781. SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* arg) {
  782. Status* const s = static_cast<Status*>(arg);
  783. assert(s);
  784. (*s) = Status::IOError(sync_point_);
  785. });
  786. SyncPoint::GetInstance()->EnableProcessing();
  787. Options options;
  788. options.enable_blob_files = true;
  789. options.avoid_flush_during_recovery = false;
  790. options.disable_auto_compactions = true;
  791. options.env = env_;
  792. ASSERT_NOK(TryReopen(options));
  793. SyncPoint::GetInstance()->DisableProcessing();
  794. SyncPoint::GetInstance()->ClearAllCallBacks();
  795. // Make sure the files generated by the failed recovery have been deleted.
  796. std::vector<std::string> files;
  797. ASSERT_OK(env_->GetChildren(dbname_, &files));
  798. for (const auto& file : files) {
  799. uint64_t number = 0;
  800. FileType type = kTableFile;
  801. if (!ParseFileName(file, &number, &type)) {
  802. continue;
  803. }
  804. ASSERT_NE(type, kTableFile);
  805. ASSERT_NE(type, kBlobFile);
  806. }
  807. }
  808. TEST_F(DBWALTest, IgnoreRecoveredLog) {
  809. std::string backup_logs = dbname_ + "/backup_logs";
  810. do {
  811. // delete old files in backup_logs directory
  812. ASSERT_OK(env_->CreateDirIfMissing(backup_logs));
  813. std::vector<std::string> old_files;
  814. ASSERT_OK(env_->GetChildren(backup_logs, &old_files));
  815. for (auto& file : old_files) {
  816. ASSERT_OK(env_->DeleteFile(backup_logs + "/" + file));
  817. }
  818. Options options = CurrentOptions();
  819. options.create_if_missing = true;
  820. options.merge_operator = MergeOperators::CreateUInt64AddOperator();
  821. options.wal_dir = dbname_ + "/logs";
  822. DestroyAndReopen(options);
  823. // fill up the DB
  824. std::string one, two;
  825. PutFixed64(&one, 1);
  826. PutFixed64(&two, 2);
  827. ASSERT_OK(db_->Merge(WriteOptions(), Slice("foo"), Slice(one)));
  828. ASSERT_OK(db_->Merge(WriteOptions(), Slice("foo"), Slice(one)));
  829. ASSERT_OK(db_->Merge(WriteOptions(), Slice("bar"), Slice(one)));
  830. // copy the logs to backup
  831. std::vector<std::string> logs;
  832. ASSERT_OK(env_->GetChildren(options.wal_dir, &logs));
  833. for (auto& log : logs) {
  834. CopyFile(options.wal_dir + "/" + log, backup_logs + "/" + log);
  835. }
  836. // recover the DB
  837. Reopen(options);
  838. ASSERT_EQ(two, Get("foo"));
  839. ASSERT_EQ(one, Get("bar"));
  840. Close();
  841. // copy the logs from backup back to wal dir
  842. for (auto& log : logs) {
  843. CopyFile(backup_logs + "/" + log, options.wal_dir + "/" + log);
  844. }
  845. // this should ignore the log files, recovery should not happen again
  846. // if the recovery happens, the same merge operator would be called twice,
  847. // leading to incorrect results
  848. Reopen(options);
  849. ASSERT_EQ(two, Get("foo"));
  850. ASSERT_EQ(one, Get("bar"));
  851. Close();
  852. Destroy(options);
  853. Reopen(options);
  854. Close();
  855. // copy the logs from backup back to wal dir
  856. ASSERT_OK(env_->CreateDirIfMissing(options.wal_dir));
  857. for (auto& log : logs) {
  858. CopyFile(backup_logs + "/" + log, options.wal_dir + "/" + log);
  859. }
  860. // assert that we successfully recovered only from logs, even though we
  861. // destroyed the DB
  862. Reopen(options);
  863. ASSERT_EQ(two, Get("foo"));
  864. ASSERT_EQ(one, Get("bar"));
  865. // Recovery will fail if DB directory doesn't exist.
  866. Destroy(options);
  867. // copy the logs from backup back to wal dir
  868. ASSERT_OK(env_->CreateDirIfMissing(options.wal_dir));
  869. for (auto& log : logs) {
  870. CopyFile(backup_logs + "/" + log, options.wal_dir + "/" + log);
  871. // we won't be needing this file no more
  872. ASSERT_OK(env_->DeleteFile(backup_logs + "/" + log));
  873. }
  874. Status s = TryReopen(options);
  875. ASSERT_NOK(s);
  876. Destroy(options);
  877. } while (ChangeWalOptions());
  878. }
  879. TEST_F(DBWALTest, RecoveryWithEmptyLog) {
  880. do {
  881. CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
  882. ASSERT_OK(Put(1, "foo", "v1"));
  883. ASSERT_OK(Put(1, "foo", "v2"));
  884. ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
  885. ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
  886. ASSERT_OK(Put(1, "foo", "v3"));
  887. ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
  888. ASSERT_EQ("v3", Get(1, "foo"));
  889. } while (ChangeWalOptions());
  890. }
  891. #if !(defined NDEBUG) || !defined(OS_WIN)
  892. TEST_F(DBWALTest, PreallocateBlock) {
  893. Options options = CurrentOptions();
  894. options.write_buffer_size = 10 * 1000 * 1000;
  895. options.max_total_wal_size = 0;
  896. size_t expected_preallocation_size = static_cast<size_t>(
  897. options.write_buffer_size + options.write_buffer_size / 10);
  898. DestroyAndReopen(options);
  899. std::atomic<int> called(0);
  900. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  901. "DBTestWalFile.GetPreallocationStatus", [&](void* arg) {
  902. ASSERT_TRUE(arg != nullptr);
  903. size_t preallocation_size = *(static_cast<size_t*>(arg));
  904. ASSERT_EQ(expected_preallocation_size, preallocation_size);
  905. called.fetch_add(1);
  906. });
  907. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  908. ASSERT_OK(Put("", ""));
  909. ASSERT_OK(Flush());
  910. ASSERT_OK(Put("", ""));
  911. Close();
  912. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  913. ASSERT_EQ(2, called.load());
  914. options.max_total_wal_size = 1000 * 1000;
  915. expected_preallocation_size = static_cast<size_t>(options.max_total_wal_size);
  916. Reopen(options);
  917. called.store(0);
  918. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  919. "DBTestWalFile.GetPreallocationStatus", [&](void* arg) {
  920. ASSERT_TRUE(arg != nullptr);
  921. size_t preallocation_size = *(static_cast<size_t*>(arg));
  922. ASSERT_EQ(expected_preallocation_size, preallocation_size);
  923. called.fetch_add(1);
  924. });
  925. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  926. ASSERT_OK(Put("", ""));
  927. ASSERT_OK(Flush());
  928. ASSERT_OK(Put("", ""));
  929. Close();
  930. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  931. ASSERT_EQ(2, called.load());
  932. options.db_write_buffer_size = 800 * 1000;
  933. expected_preallocation_size =
  934. static_cast<size_t>(options.db_write_buffer_size);
  935. Reopen(options);
  936. called.store(0);
  937. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  938. "DBTestWalFile.GetPreallocationStatus", [&](void* arg) {
  939. ASSERT_TRUE(arg != nullptr);
  940. size_t preallocation_size = *(static_cast<size_t*>(arg));
  941. ASSERT_EQ(expected_preallocation_size, preallocation_size);
  942. called.fetch_add(1);
  943. });
  944. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  945. ASSERT_OK(Put("", ""));
  946. ASSERT_OK(Flush());
  947. ASSERT_OK(Put("", ""));
  948. Close();
  949. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  950. ASSERT_EQ(2, called.load());
  951. expected_preallocation_size = 700 * 1000;
  952. std::shared_ptr<WriteBufferManager> write_buffer_manager =
  953. std::make_shared<WriteBufferManager>(static_cast<uint64_t>(700 * 1000));
  954. options.write_buffer_manager = write_buffer_manager;
  955. Reopen(options);
  956. called.store(0);
  957. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  958. "DBTestWalFile.GetPreallocationStatus", [&](void* arg) {
  959. ASSERT_TRUE(arg != nullptr);
  960. size_t preallocation_size = *(static_cast<size_t*>(arg));
  961. ASSERT_EQ(expected_preallocation_size, preallocation_size);
  962. called.fetch_add(1);
  963. });
  964. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  965. ASSERT_OK(Put("", ""));
  966. ASSERT_OK(Flush());
  967. ASSERT_OK(Put("", ""));
  968. Close();
  969. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  970. ASSERT_EQ(2, called.load());
  971. }
  972. #endif // !(defined NDEBUG) || !defined(OS_WIN)
  973. TEST_F(DBWALTest, FullPurgePreservesRecycledLog) {
  974. // For github issue #1303
  975. for (int i = 0; i < 2; ++i) {
  976. Options options = CurrentOptions();
  977. options.create_if_missing = true;
  978. options.recycle_log_file_num = 2;
  979. options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
  980. if (i != 0) {
  981. options.wal_dir = alternative_wal_dir_;
  982. }
  983. DestroyAndReopen(options);
  984. ASSERT_OK(Put("foo", "v1"));
  985. VectorLogPtr log_files;
  986. ASSERT_OK(dbfull()->GetSortedWalFiles(log_files));
  987. ASSERT_GT(log_files.size(), 0);
  988. ASSERT_OK(Flush());
  989. // Now the original WAL is in log_files[0] and should be marked for
  990. // recycling.
  991. // Verify full purge cannot remove this file.
  992. JobContext job_context(0);
  993. dbfull()->TEST_LockMutex();
  994. dbfull()->FindObsoleteFiles(&job_context, true /* force */);
  995. dbfull()->TEST_UnlockMutex();
  996. dbfull()->PurgeObsoleteFiles(job_context);
  997. job_context.Clean();
  998. if (i == 0) {
  999. ASSERT_OK(
  1000. env_->FileExists(LogFileName(dbname_, log_files[0]->LogNumber())));
  1001. } else {
  1002. ASSERT_OK(env_->FileExists(
  1003. LogFileName(alternative_wal_dir_, log_files[0]->LogNumber())));
  1004. }
  1005. }
  1006. }
  1007. TEST_F(DBWALTest, FullPurgePreservesLogPendingReuse) {
  1008. // Ensures full purge cannot delete a WAL while it's in the process of being
  1009. // recycled. In particular, we force the full purge after a file has been
  1010. // chosen for reuse, but before it has been renamed.
  1011. for (int i = 0; i < 2; ++i) {
  1012. Options options = CurrentOptions();
  1013. options.recycle_log_file_num = 1;
  1014. options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
  1015. if (i != 0) {
  1016. options.wal_dir = alternative_wal_dir_;
  1017. }
  1018. DestroyAndReopen(options);
  1019. // The first flush creates a second log so writes can continue before the
  1020. // flush finishes.
  1021. ASSERT_OK(Put("foo", "bar"));
  1022. ASSERT_OK(Flush());
  1023. // The second flush can recycle the first log. Sync points enforce the
  1024. // full purge happens after choosing the log to recycle and before it is
  1025. // renamed.
  1026. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
  1027. {"DBImpl::CreateWAL:BeforeReuseWritableFile1",
  1028. "DBWALTest::FullPurgePreservesLogPendingReuse:PreFullPurge"},
  1029. {"DBWALTest::FullPurgePreservesLogPendingReuse:PostFullPurge",
  1030. "DBImpl::CreateWAL:BeforeReuseWritableFile2"},
  1031. });
  1032. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  1033. ROCKSDB_NAMESPACE::port::Thread thread([&]() {
  1034. TEST_SYNC_POINT(
  1035. "DBWALTest::FullPurgePreservesLogPendingReuse:PreFullPurge");
  1036. ASSERT_OK(db_->EnableFileDeletions());
  1037. TEST_SYNC_POINT(
  1038. "DBWALTest::FullPurgePreservesLogPendingReuse:PostFullPurge");
  1039. });
  1040. ASSERT_OK(Put("foo", "bar"));
  1041. ASSERT_OK(Flush());
  1042. thread.join();
  1043. }
  1044. }
  1045. TEST_F(DBWALTest, GetSortedWalFiles) {
  1046. do {
  1047. CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
  1048. VectorLogPtr log_files;
  1049. ASSERT_OK(dbfull()->GetSortedWalFiles(log_files));
  1050. ASSERT_EQ(0, log_files.size());
  1051. ASSERT_OK(Put(1, "foo", "v1"));
  1052. ASSERT_OK(dbfull()->GetSortedWalFiles(log_files));
  1053. ASSERT_EQ(1, log_files.size());
  1054. } while (ChangeWalOptions());
  1055. }
  1056. TEST_F(DBWALTest, GetCurrentWalFile) {
  1057. do {
  1058. CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
  1059. std::unique_ptr<LogFile>* bad_log_file = nullptr;
  1060. ASSERT_NOK(dbfull()->GetCurrentWalFile(bad_log_file));
  1061. std::unique_ptr<LogFile> log_file;
  1062. ASSERT_OK(dbfull()->GetCurrentWalFile(&log_file));
  1063. // nothing has been written to the log yet
  1064. ASSERT_EQ(log_file->StartSequence(), 0);
  1065. ASSERT_EQ(log_file->SizeFileBytes(), 0);
  1066. ASSERT_EQ(log_file->Type(), kAliveLogFile);
  1067. ASSERT_GT(log_file->LogNumber(), 0);
  1068. // add some data and verify that the file size actually moves foward
  1069. ASSERT_OK(Put(0, "foo", "v1"));
  1070. ASSERT_OK(Put(0, "foo2", "v2"));
  1071. ASSERT_OK(Put(0, "foo3", "v3"));
  1072. ASSERT_OK(dbfull()->GetCurrentWalFile(&log_file));
  1073. ASSERT_EQ(log_file->StartSequence(), 0);
  1074. ASSERT_GT(log_file->SizeFileBytes(), 0);
  1075. ASSERT_EQ(log_file->Type(), kAliveLogFile);
  1076. ASSERT_GT(log_file->LogNumber(), 0);
  1077. // force log files to cycle and add some more data, then check if
  1078. // log number moves forward
  1079. ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
  1080. for (int i = 0; i < 10; i++) {
  1081. ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
  1082. }
  1083. ASSERT_OK(Put(0, "foo4", "v4"));
  1084. ASSERT_OK(Put(0, "foo5", "v5"));
  1085. ASSERT_OK(Put(0, "foo6", "v6"));
  1086. ASSERT_OK(dbfull()->GetCurrentWalFile(&log_file));
  1087. ASSERT_EQ(log_file->StartSequence(), 0);
  1088. ASSERT_GT(log_file->SizeFileBytes(), 0);
  1089. ASSERT_EQ(log_file->Type(), kAliveLogFile);
  1090. ASSERT_GT(log_file->LogNumber(), 0);
  1091. } while (ChangeWalOptions());
  1092. }
  1093. TEST_F(DBWALTest, RecoveryWithLogDataForSomeCFs) {
  1094. // Test for regression of WAL cleanup missing files that don't contain data
  1095. // for every column family.
  1096. do {
  1097. CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
  1098. ASSERT_OK(Put(1, "foo", "v1"));
  1099. ASSERT_OK(Put(1, "foo", "v2"));
  1100. uint64_t earliest_log_nums[2];
  1101. for (int i = 0; i < 2; ++i) {
  1102. if (i > 0) {
  1103. ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
  1104. }
  1105. VectorLogPtr log_files;
  1106. ASSERT_OK(dbfull()->GetSortedWalFiles(log_files));
  1107. if (log_files.size() > 0) {
  1108. earliest_log_nums[i] = log_files[0]->LogNumber();
  1109. } else {
  1110. earliest_log_nums[i] = std::numeric_limits<uint64_t>::max();
  1111. }
  1112. }
  1113. // Check at least the first WAL was cleaned up during the recovery.
  1114. ASSERT_LT(earliest_log_nums[0], earliest_log_nums[1]);
  1115. } while (ChangeWalOptions());
  1116. }
  1117. TEST_F(DBWALTest, RecoverWithLargeLog) {
  1118. do {
  1119. {
  1120. Options options = CurrentOptions();
  1121. CreateAndReopenWithCF({"pikachu"}, options);
  1122. ASSERT_OK(Put(1, "big1", std::string(200000, '1')));
  1123. ASSERT_OK(Put(1, "big2", std::string(200000, '2')));
  1124. ASSERT_OK(Put(1, "small3", std::string(10, '3')));
  1125. ASSERT_OK(Put(1, "small4", std::string(10, '4')));
  1126. ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
  1127. }
  1128. // Make sure that if we re-open with a small write buffer size that
  1129. // we flush table files in the middle of a large log file.
  1130. Options options;
  1131. options.write_buffer_size = 100000;
  1132. options = CurrentOptions(options);
  1133. ReopenWithColumnFamilies({"default", "pikachu"}, options);
  1134. ASSERT_EQ(NumTableFilesAtLevel(0, 1), 3);
  1135. ASSERT_EQ(std::string(200000, '1'), Get(1, "big1"));
  1136. ASSERT_EQ(std::string(200000, '2'), Get(1, "big2"));
  1137. ASSERT_EQ(std::string(10, '3'), Get(1, "small3"));
  1138. ASSERT_EQ(std::string(10, '4'), Get(1, "small4"));
  1139. ASSERT_GT(NumTableFilesAtLevel(0, 1), 1);
  1140. } while (ChangeWalOptions());
  1141. }
  1142. // In https://reviews.facebook.net/D20661 we change
  1143. // recovery behavior: previously for each log file each column family
  1144. // memtable was flushed, even it was empty. Now it's changed:
  1145. // we try to create the smallest number of table files by merging
  1146. // updates from multiple logs
  1147. TEST_F(DBWALTest, RecoverCheckFileAmountWithSmallWriteBuffer) {
  1148. Options options = CurrentOptions();
  1149. options.write_buffer_size = 5000000;
  1150. CreateAndReopenWithCF({"pikachu", "dobrynia", "nikitich"}, options);
  1151. // Since we will reopen DB with smaller write_buffer_size,
  1152. // each key will go to new SST file
  1153. ASSERT_OK(Put(1, Key(10), DummyString(1000000)));
  1154. ASSERT_OK(Put(1, Key(10), DummyString(1000000)));
  1155. ASSERT_OK(Put(1, Key(10), DummyString(1000000)));
  1156. ASSERT_OK(Put(1, Key(10), DummyString(1000000)));
  1157. ASSERT_OK(Put(3, Key(10), DummyString(1)));
  1158. // Make 'dobrynia' to be flushed and new WAL file to be created
  1159. ASSERT_OK(Put(2, Key(10), DummyString(7500000)));
  1160. ASSERT_OK(Put(2, Key(1), DummyString(1)));
  1161. ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[2]));
  1162. {
  1163. auto tables = ListTableFiles(env_, dbname_);
  1164. ASSERT_EQ(tables.size(), static_cast<size_t>(1));
  1165. // Make sure 'dobrynia' was flushed: check sst files amount
  1166. ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
  1167. static_cast<uint64_t>(1));
  1168. }
  1169. // New WAL file
  1170. ASSERT_OK(Put(1, Key(1), DummyString(1)));
  1171. ASSERT_OK(Put(1, Key(1), DummyString(1)));
  1172. ASSERT_OK(Put(3, Key(10), DummyString(1)));
  1173. ASSERT_OK(Put(3, Key(10), DummyString(1)));
  1174. ASSERT_OK(Put(3, Key(10), DummyString(1)));
  1175. options.write_buffer_size = 4096;
  1176. options.arena_block_size = 4096;
  1177. ReopenWithColumnFamilies({"default", "pikachu", "dobrynia", "nikitich"},
  1178. options);
  1179. {
  1180. // No inserts => default is empty
  1181. ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
  1182. static_cast<uint64_t>(0));
  1183. // First 4 keys goes to separate SSTs + 1 more SST for 2 smaller keys
  1184. ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
  1185. static_cast<uint64_t>(5));
  1186. // 1 SST for big key + 1 SST for small one
  1187. ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
  1188. static_cast<uint64_t>(2));
  1189. // 1 SST for all keys
  1190. ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
  1191. static_cast<uint64_t>(1));
  1192. }
  1193. }
  1194. // In https://reviews.facebook.net/D20661 we change
  1195. // recovery behavior: previously for each log file each column family
  1196. // memtable was flushed, even it wasn't empty. Now it's changed:
  1197. // we try to create the smallest number of table files by merging
  1198. // updates from multiple logs
  1199. TEST_F(DBWALTest, RecoverCheckFileAmount) {
  1200. Options options = CurrentOptions();
  1201. options.write_buffer_size = 100000;
  1202. options.arena_block_size = 4 * 1024;
  1203. options.avoid_flush_during_recovery = false;
  1204. CreateAndReopenWithCF({"pikachu", "dobrynia", "nikitich"}, options);
  1205. ASSERT_OK(Put(0, Key(1), DummyString(1)));
  1206. ASSERT_OK(Put(1, Key(1), DummyString(1)));
  1207. ASSERT_OK(Put(2, Key(1), DummyString(1)));
  1208. // Make 'nikitich' memtable to be flushed
  1209. ASSERT_OK(Put(3, Key(10), DummyString(1002400)));
  1210. ASSERT_OK(Put(3, Key(1), DummyString(1)));
  1211. ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[3]));
  1212. // 4 memtable are not flushed, 1 sst file
  1213. {
  1214. auto tables = ListTableFiles(env_, dbname_);
  1215. ASSERT_EQ(tables.size(), static_cast<size_t>(1));
  1216. ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
  1217. static_cast<uint64_t>(1));
  1218. }
  1219. // Memtable for 'nikitich' has flushed, new WAL file has opened
  1220. // 4 memtable still not flushed
  1221. // Write to new WAL file
  1222. ASSERT_OK(Put(0, Key(1), DummyString(1)));
  1223. ASSERT_OK(Put(1, Key(1), DummyString(1)));
  1224. ASSERT_OK(Put(2, Key(1), DummyString(1)));
  1225. // Fill up 'nikitich' one more time
  1226. ASSERT_OK(Put(3, Key(10), DummyString(1002400)));
  1227. // make it flush
  1228. ASSERT_OK(Put(3, Key(1), DummyString(1)));
  1229. ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[3]));
  1230. // There are still 4 memtable not flushed, and 2 sst tables
  1231. ASSERT_OK(Put(0, Key(1), DummyString(1)));
  1232. ASSERT_OK(Put(1, Key(1), DummyString(1)));
  1233. ASSERT_OK(Put(2, Key(1), DummyString(1)));
  1234. {
  1235. auto tables = ListTableFiles(env_, dbname_);
  1236. ASSERT_EQ(tables.size(), static_cast<size_t>(2));
  1237. ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
  1238. static_cast<uint64_t>(2));
  1239. }
  1240. ReopenWithColumnFamilies({"default", "pikachu", "dobrynia", "nikitich"},
  1241. options);
  1242. {
  1243. std::vector<uint64_t> table_files = ListTableFiles(env_, dbname_);
  1244. // Check, that records for 'default', 'dobrynia' and 'pikachu' from
  1245. // first, second and third WALs went to the same SST.
  1246. // So, there is 6 SSTs: three for 'nikitich', one for 'default', one for
  1247. // 'dobrynia', one for 'pikachu'
  1248. ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
  1249. static_cast<uint64_t>(1));
  1250. ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
  1251. static_cast<uint64_t>(3));
  1252. ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
  1253. static_cast<uint64_t>(1));
  1254. ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
  1255. static_cast<uint64_t>(1));
  1256. }
  1257. }
  1258. TEST_F(DBWALTest, SyncMultipleLogs) {
  1259. const uint64_t kNumBatches = 2;
  1260. const int kBatchSize = 1000;
  1261. Options options = CurrentOptions();
  1262. options.create_if_missing = true;
  1263. options.write_buffer_size = 4096;
  1264. Reopen(options);
  1265. WriteBatch batch;
  1266. WriteOptions wo;
  1267. wo.sync = true;
  1268. for (uint64_t b = 0; b < kNumBatches; b++) {
  1269. batch.Clear();
  1270. for (int i = 0; i < kBatchSize; i++) {
  1271. ASSERT_OK(batch.Put(Key(i), DummyString(128)));
  1272. }
  1273. ASSERT_OK(dbfull()->Write(wo, &batch));
  1274. }
  1275. ASSERT_OK(dbfull()->SyncWAL());
  1276. }
  1277. TEST_F(DBWALTest, DISABLED_RecycleMultipleWalsCrash) {
  1278. Options options = CurrentOptions();
  1279. options.max_write_buffer_number = 5;
  1280. options.track_and_verify_wals_in_manifest = true;
  1281. options.max_bgerror_resume_count = 0; // manual resume
  1282. options.recycle_log_file_num = 3;
  1283. options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
  1284. // Disable truncating recycled WALs to new size in posix env
  1285. // (approximating a crash)
  1286. SyncPoint::GetInstance()->SetCallBack(
  1287. "PosixWritableFile::Close",
  1288. [](void* arg) { *(static_cast<size_t*>(arg)) = 0; });
  1289. SyncPoint::GetInstance()->EnableProcessing();
  1290. // Re-open with desired options
  1291. DestroyAndReopen(options);
  1292. Defer closer([this]() { Close(); });
  1293. // Ensure WAL recycling wasn't sanitized away
  1294. ASSERT_EQ(db_->GetOptions().recycle_log_file_num,
  1295. options.recycle_log_file_num);
  1296. // Prepare external files for later ingestion
  1297. std::string sst_files_dir = dbname_ + "/sst_files/";
  1298. ASSERT_OK(DestroyDir(env_, sst_files_dir));
  1299. ASSERT_OK(env_->CreateDir(sst_files_dir));
  1300. std::string external_file1 = sst_files_dir + "file1.sst";
  1301. {
  1302. SstFileWriter sst_file_writer(EnvOptions(), options);
  1303. ASSERT_OK(sst_file_writer.Open(external_file1));
  1304. ASSERT_OK(sst_file_writer.Put("external1", "ex1"));
  1305. ExternalSstFileInfo file_info;
  1306. ASSERT_OK(sst_file_writer.Finish(&file_info));
  1307. }
  1308. std::string external_file2 = sst_files_dir + "file2.sst";
  1309. {
  1310. SstFileWriter sst_file_writer(EnvOptions(), options);
  1311. ASSERT_OK(sst_file_writer.Open(external_file2));
  1312. ASSERT_OK(sst_file_writer.Put("external2", "ex2"));
  1313. ExternalSstFileInfo file_info;
  1314. ASSERT_OK(sst_file_writer.Finish(&file_info));
  1315. }
  1316. // Populate some WALs to be recycled such that there will be extra data
  1317. // from an old incarnation of the WAL on recovery
  1318. ASSERT_OK(db_->PauseBackgroundWork());
  1319. ASSERT_OK(Put("ignore1", Random::GetTLSInstance()->RandomString(500)));
  1320. ASSERT_OK(static_cast_with_check<DBImpl>(db_)->TEST_SwitchMemtable());
  1321. ASSERT_OK(Put("ignore2", Random::GetTLSInstance()->RandomString(500)));
  1322. ASSERT_OK(static_cast_with_check<DBImpl>(db_)->TEST_SwitchMemtable());
  1323. ASSERT_OK(db_->ContinueBackgroundWork());
  1324. ASSERT_OK(Flush());
  1325. ASSERT_OK(Put("ignore3", Random::GetTLSInstance()->RandomString(500)));
  1326. ASSERT_OK(Flush());
  1327. // Verify expected log files (still there for recycling)
  1328. std::vector<FileAttributes> files;
  1329. int log_count = 0;
  1330. ASSERT_OK(options.env->GetChildrenFileAttributes(dbname_, &files));
  1331. for (const auto& f : files) {
  1332. if (EndsWith(f.name, ".log")) {
  1333. EXPECT_GT(f.size_bytes, 500);
  1334. ++log_count;
  1335. }
  1336. }
  1337. EXPECT_EQ(log_count, 3);
  1338. // (Re-used recipe) Generate two inactive WALs and one active WAL, with a
  1339. // gap in sequence numbers to interfere with recovery
  1340. ASSERT_OK(db_->PauseBackgroundWork());
  1341. ASSERT_OK(Put("key1", "val1"));
  1342. ASSERT_OK(static_cast_with_check<DBImpl>(db_)->TEST_SwitchMemtable());
  1343. ASSERT_OK(Put("key2", "val2"));
  1344. ASSERT_OK(static_cast_with_check<DBImpl>(db_)->TEST_SwitchMemtable());
  1345. // Need a gap in sequence numbers, so e.g. ingest external file
  1346. // with an open snapshot
  1347. {
  1348. ManagedSnapshot snapshot(db_);
  1349. ASSERT_OK(
  1350. db_->IngestExternalFile({external_file1}, IngestExternalFileOptions()));
  1351. }
  1352. ASSERT_OK(Put("key3", "val3"));
  1353. ASSERT_OK(db_->SyncWAL());
  1354. // Need an SST file that is logically after that WAL, so that dropping WAL
  1355. // data is not a valid point in time.
  1356. {
  1357. ManagedSnapshot snapshot(db_);
  1358. ASSERT_OK(
  1359. db_->IngestExternalFile({external_file2}, IngestExternalFileOptions()));
  1360. }
  1361. // Approximate a crash, with respect to recycled WAL data extending past
  1362. // the end of the current WAL data (see SyncPoint callback above)
  1363. Close();
  1364. // Verify recycled log files haven't been truncated
  1365. files.clear();
  1366. log_count = 0;
  1367. ASSERT_OK(options.env->GetChildrenFileAttributes(dbname_, &files));
  1368. for (const auto& f : files) {
  1369. if (EndsWith(f.name, ".log")) {
  1370. EXPECT_GT(f.size_bytes, 500);
  1371. ++log_count;
  1372. }
  1373. }
  1374. EXPECT_EQ(log_count, 3);
  1375. // Verify no data loss after reopen.
  1376. Reopen(options);
  1377. EXPECT_EQ("val1", Get("key1"));
  1378. EXPECT_EQ("val2", Get("key2")); // Passes because of adjacent seqnos
  1379. EXPECT_EQ("ex1", Get("external1"));
  1380. EXPECT_EQ("val3", Get("key3")); // <- ONLY FAILURE! (Not a point in time)
  1381. EXPECT_EQ("ex2", Get("external2"));
  1382. SyncPoint::GetInstance()->DisableProcessing();
  1383. SyncPoint::GetInstance()->ClearAllCallBacks();
  1384. }
  1385. TEST_F(DBWALTest, SyncWalPartialFailure) {
  1386. class MyTestFileSystem : public FileSystemWrapper {
  1387. public:
  1388. explicit MyTestFileSystem(std::shared_ptr<FileSystem> base)
  1389. : FileSystemWrapper(std::move(base)) {}
  1390. static const char* kClassName() { return "MyTestFileSystem"; }
  1391. const char* Name() const override { return kClassName(); }
  1392. IOStatus NewWritableFile(const std::string& fname,
  1393. const FileOptions& file_opts,
  1394. std::unique_ptr<FSWritableFile>* result,
  1395. IODebugContext* dbg) override {
  1396. IOStatus s = target()->NewWritableFile(fname, file_opts, result, dbg);
  1397. if (s.ok()) {
  1398. *result =
  1399. std::make_unique<MyTestWritableFile>(std::move(*result), *this);
  1400. }
  1401. return s;
  1402. }
  1403. AcqRelAtomic<uint32_t> syncs_before_failure_{UINT32_MAX};
  1404. protected:
  1405. class MyTestWritableFile : public FSWritableFileOwnerWrapper {
  1406. public:
  1407. MyTestWritableFile(std::unique_ptr<FSWritableFile>&& file,
  1408. MyTestFileSystem& fs)
  1409. : FSWritableFileOwnerWrapper(std::move(file)), fs_(fs) {}
  1410. IOStatus Sync(const IOOptions& options, IODebugContext* dbg) override {
  1411. int prev_val = fs_.syncs_before_failure_.FetchSub(1);
  1412. if (prev_val == 0) {
  1413. return IOStatus::IOError("fault");
  1414. } else {
  1415. return target()->Sync(options, dbg);
  1416. }
  1417. }
  1418. protected:
  1419. MyTestFileSystem& fs_;
  1420. };
  1421. };
  1422. Options options = CurrentOptions();
  1423. options.max_write_buffer_number = 4;
  1424. options.track_and_verify_wals_in_manifest = true;
  1425. options.max_bgerror_resume_count = 0; // manual resume
  1426. auto custom_fs =
  1427. std::make_shared<MyTestFileSystem>(options.env->GetFileSystem());
  1428. std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(custom_fs));
  1429. options.env = fault_fs_env.get();
  1430. Reopen(options);
  1431. Defer closer([this]() { Close(); });
  1432. // This is the simplest way to get
  1433. // * one inactive WAL, synced
  1434. // * one inactive WAL, not synced, and
  1435. // * one active WAL, not synced
  1436. // with a single thread, to exercise as much logic as we reasonably can.
  1437. ASSERT_OK(db_->PauseBackgroundWork());
  1438. ASSERT_OK(Put("key1", "val1"));
  1439. ASSERT_OK(static_cast_with_check<DBImpl>(db_)->TEST_SwitchMemtable());
  1440. ASSERT_OK(db_->SyncWAL());
  1441. ASSERT_OK(Put("key2", "val2"));
  1442. ASSERT_OK(static_cast_with_check<DBImpl>(db_)->TEST_SwitchMemtable());
  1443. ASSERT_OK(Put("key3", "val3"));
  1444. // Allow 1 of the WALs to sync, but another won't
  1445. custom_fs->syncs_before_failure_.Store(1);
  1446. ASSERT_NOK(db_->SyncWAL());
  1447. // Stuck in this state. (This could previously cause a segfault.)
  1448. ASSERT_NOK(db_->SyncWAL());
  1449. // Can't Resume because WAL write failure is considered non-recoverable,
  1450. // regardless of the IOStatus itself. (Can/should be fixed?)
  1451. ASSERT_NOK(db_->Resume());
  1452. // Verify no data loss after reopen.
  1453. // Also Close() could previously crash in this state.
  1454. Reopen(options);
  1455. ASSERT_EQ("val1", Get("key1"));
  1456. ASSERT_EQ("val2", Get("key2"));
  1457. ASSERT_EQ("val3", Get("key3"));
  1458. }
  1459. // Github issue 1339. Prior the fix we read sequence id from the first log to
  1460. // a local variable, then keep increase the variable as we replay logs,
  1461. // ignoring actual sequence id of the records. This is incorrect if some writes
  1462. // come with WAL disabled.
  1463. TEST_F(DBWALTest, PartOfWritesWithWALDisabled) {
  1464. std::unique_ptr<FaultInjectionTestEnv> fault_env(
  1465. new FaultInjectionTestEnv(env_));
  1466. Options options = CurrentOptions();
  1467. options.env = fault_env.get();
  1468. options.disable_auto_compactions = true;
  1469. WriteOptions wal_on, wal_off;
  1470. wal_on.sync = true;
  1471. wal_on.disableWAL = false;
  1472. wal_off.disableWAL = true;
  1473. CreateAndReopenWithCF({"dummy"}, options);
  1474. ASSERT_OK(Put(1, "dummy", "d1", wal_on)); // seq id 1
  1475. ASSERT_OK(Put(1, "dummy", "d2", wal_off));
  1476. ASSERT_OK(Put(1, "dummy", "d3", wal_off));
  1477. ASSERT_OK(Put(0, "key", "v4", wal_on)); // seq id 4
  1478. ASSERT_OK(Flush(0));
  1479. ASSERT_OK(Put(0, "key", "v5", wal_on)); // seq id 5
  1480. ASSERT_EQ("v5", Get(0, "key"));
  1481. ASSERT_OK(dbfull()->FlushWAL(false));
  1482. // Simulate a crash.
  1483. fault_env->SetFilesystemActive(false);
  1484. Close();
  1485. fault_env->ResetState();
  1486. ReopenWithColumnFamilies({"default", "dummy"}, options);
  1487. // Prior to the fix, we may incorrectly recover "v5" with sequence id = 3.
  1488. ASSERT_EQ("v5", Get(0, "key"));
  1489. // Destroy DB before destruct fault_env.
  1490. Destroy(options);
  1491. }
  1492. //
  1493. // Test WAL recovery for the various modes available
  1494. //
  1495. class RecoveryTestHelper {
  1496. public:
  1497. // Number of WAL files to generate
  1498. static constexpr int kWALFilesCount = 10;
  1499. // Starting number for the WAL file name like 00010.log
  1500. static constexpr int kWALFileOffset = 10;
  1501. // Keys to be written per WAL file
  1502. static constexpr int kKeysPerWALFile = 133;
  1503. // Size of the value
  1504. static constexpr int kValueSize = 96;
  1505. // Create WAL files with values filled in
  1506. static void FillData(DBWALTestBase* test, const Options& options,
  1507. const size_t wal_count, size_t* count) {
  1508. // Calling internal functions requires sanitized options.
  1509. Options sanitized_options = SanitizeOptions(test->dbname_, options);
  1510. const ImmutableDBOptions db_options(sanitized_options);
  1511. *count = 0;
  1512. std::shared_ptr<Cache> table_cache = NewLRUCache(50, 0);
  1513. FileOptions file_options;
  1514. WriteBufferManager write_buffer_manager(db_options.db_write_buffer_size);
  1515. std::unique_ptr<VersionSet> versions;
  1516. std::unique_ptr<WalManager> wal_manager;
  1517. WriteController write_controller;
  1518. versions.reset(new VersionSet(
  1519. test->dbname_, &db_options, file_options, table_cache.get(),
  1520. &write_buffer_manager, &write_controller,
  1521. /*block_cache_tracer=*/nullptr,
  1522. /*io_tracer=*/nullptr, /*db_id=*/"", /*db_session_id=*/"",
  1523. options.daily_offpeak_time_utc,
  1524. /*error_handler=*/nullptr, /*read_only=*/false));
  1525. wal_manager.reset(
  1526. new WalManager(db_options, file_options, /*io_tracer=*/nullptr));
  1527. std::unique_ptr<log::Writer> current_log_writer;
  1528. for (size_t j = kWALFileOffset; j < wal_count + kWALFileOffset; j++) {
  1529. uint64_t current_log_number = j;
  1530. std::string fname = LogFileName(test->dbname_, current_log_number);
  1531. std::unique_ptr<WritableFileWriter> file_writer;
  1532. ASSERT_OK(WritableFileWriter::Create(db_options.env->GetFileSystem(),
  1533. fname, file_options, &file_writer,
  1534. nullptr));
  1535. log::Writer* log_writer =
  1536. new log::Writer(std::move(file_writer), current_log_number,
  1537. db_options.recycle_log_file_num > 0, false,
  1538. db_options.wal_compression);
  1539. ASSERT_OK(log_writer->AddCompressionTypeRecord(WriteOptions()));
  1540. current_log_writer.reset(log_writer);
  1541. WriteBatch batch;
  1542. for (int i = 0; i < kKeysPerWALFile; i++) {
  1543. std::string key = "key" + std::to_string((*count)++);
  1544. std::string value = test->DummyString(kValueSize);
  1545. ASSERT_NE(current_log_writer.get(), nullptr);
  1546. uint64_t seq = versions->LastSequence() + 1;
  1547. batch.Clear();
  1548. ASSERT_OK(batch.Put(key, value));
  1549. WriteBatchInternal::SetSequence(&batch, seq);
  1550. ASSERT_OK(current_log_writer->AddRecord(
  1551. WriteOptions(), WriteBatchInternal::Contents(&batch)));
  1552. versions->SetLastAllocatedSequence(seq);
  1553. versions->SetLastPublishedSequence(seq);
  1554. versions->SetLastSequence(seq);
  1555. }
  1556. }
  1557. }
  1558. // Recreate and fill the store with some data
  1559. static size_t FillData(DBWALTestBase* test, Options* options) {
  1560. options->create_if_missing = true;
  1561. test->DestroyAndReopen(*options);
  1562. test->Close();
  1563. size_t count = 0;
  1564. FillData(test, *options, kWALFilesCount, &count);
  1565. return count;
  1566. }
  1567. // Read back all the keys we wrote and return the number of keys found
  1568. static size_t GetData(DBWALTestBase* test) {
  1569. size_t count = 0;
  1570. for (size_t i = 0; i < kWALFilesCount * kKeysPerWALFile; i++) {
  1571. if (test->Get("key" + std::to_string(i)) != "NOT_FOUND") {
  1572. ++count;
  1573. }
  1574. }
  1575. return count;
  1576. }
  1577. // Manuall corrupt the specified WAL
  1578. static void CorruptWAL(DBWALTestBase* test, const Options& options,
  1579. const double off, const double len,
  1580. const int wal_file_id, const bool trunc = false) {
  1581. Env* env = options.env;
  1582. std::string fname = LogFileName(test->dbname_, wal_file_id);
  1583. uint64_t size;
  1584. ASSERT_OK(env->GetFileSize(fname, &size));
  1585. ASSERT_GT(size, 0);
  1586. #ifdef OS_WIN
  1587. // Windows disk cache behaves differently. When we truncate
  1588. // the original content is still in the cache due to the original
  1589. // handle is still open. Generally, in Windows, one prohibits
  1590. // shared access to files and it is not needed for WAL but we allow
  1591. // it to induce corruption at various tests.
  1592. test->Close();
  1593. #endif
  1594. if (trunc) {
  1595. ASSERT_OK(
  1596. test::TruncateFile(env, fname, static_cast<uint64_t>(size * off)));
  1597. } else {
  1598. ASSERT_OK(test::CorruptFile(env, fname, static_cast<int>(size * off + 8),
  1599. static_cast<int>(size * len), false));
  1600. }
  1601. }
  1602. };
  1603. TEST_F(DBWALTest, TrackAndVerifyWALsRecycleWAL) {
  1604. Options options = CurrentOptions();
  1605. options.avoid_flush_during_shutdown = true;
  1606. options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
  1607. options.recycle_log_file_num = 1;
  1608. options.track_and_verify_wals = true;
  1609. DestroyAndReopen(options);
  1610. ASSERT_OK(Put("key_ignore", "wal_to_recycle"));
  1611. ASSERT_OK(Put("key_ignore1", "wal_to_recycle"));
  1612. ASSERT_OK(Put("key_ignore2", "wal_to_recycle"));
  1613. ASSERT_OK(Flush());
  1614. ASSERT_OK(Put("key_ignore", "wal_to_recycle"));
  1615. ASSERT_OK(Put("key_ignore1", "wal_to_recycle"));
  1616. ASSERT_OK(Put("key_ignore2", "wal_to_recycle"));
  1617. ASSERT_OK(Flush());
  1618. // Stop background flush to avoid deleting any WAL
  1619. options.env->SetBackgroundThreads(1, Env::HIGH);
  1620. test::SleepingBackgroundTask sleeping_task;
  1621. options.env->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
  1622. &sleeping_task, Env::Priority::HIGH);
  1623. // Recycle the first WAL
  1624. ASSERT_OK(Put("key1", "old_value"));
  1625. // Recycle the second WAL
  1626. ASSERT_OK(dbfull()->TEST_SwitchWAL());
  1627. ASSERT_OK(Put("key1", "new_value"));
  1628. // Create a WAL hole on sequence number by truncating the first WAL to 0 byte
  1629. VectorWalPtr log_files;
  1630. ASSERT_OK(db_->GetSortedWalFiles(log_files));
  1631. ASSERT_EQ(log_files.size(), 2);
  1632. std::string log_name = LogFileName(dbname_, log_files.front()->LogNumber());
  1633. Close();
  1634. // Drop `Put("key1", "old_value")` in the first WAL
  1635. ASSERT_OK(test::TruncateFile(options.env, log_name, 0 /* new_length */));
  1636. Status s = DB::Open(options, dbname_, &db_);
  1637. ASSERT_OK(s);
  1638. ASSERT_EQ("wal_to_recycle", Get("key_ignore2"));
  1639. ASSERT_EQ("NOT_FOUND", Get("key1"));
  1640. Close();
  1641. }
  1642. class DBWALTrackAndVerifyWALsWithParamsTest
  1643. : public DBWALTestBase,
  1644. public ::testing::WithParamInterface<WALRecoveryMode> {
  1645. public:
  1646. DBWALTrackAndVerifyWALsWithParamsTest()
  1647. : DBWALTestBase("/db_wal_track_and_verify_wals_with_params_test") {}
  1648. };
  1649. INSTANTIATE_TEST_CASE_P(
  1650. DBWALTrackAndVerifyWALsWithParamsTest,
  1651. DBWALTrackAndVerifyWALsWithParamsTest,
  1652. ::testing::Values(WALRecoveryMode::kTolerateCorruptedTailRecords,
  1653. WALRecoveryMode::kAbsoluteConsistency,
  1654. WALRecoveryMode::kPointInTimeRecovery,
  1655. WALRecoveryMode::kSkipAnyCorruptedRecords));
  1656. TEST_P(DBWALTrackAndVerifyWALsWithParamsTest, Basic) {
  1657. Options options = CurrentOptions();
  1658. options.avoid_flush_during_shutdown = true;
  1659. options.track_and_verify_wals = true;
  1660. options.wal_recovery_mode = GetParam();
  1661. // Stop background flush to avoid deleting any WAL
  1662. options.env->SetBackgroundThreads(1, Env::HIGH);
  1663. test::SleepingBackgroundTask sleeping_task;
  1664. options.env->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
  1665. &sleeping_task, Env::Priority::HIGH);
  1666. for (int i = 0; i < 5; i++) {
  1667. DestroyAndReopen(options);
  1668. ASSERT_OK(Put("key1", "old_value"));
  1669. SequenceNumber last_seqno_recorded_in_fist_wal =
  1670. dbfull()->GetLatestSequenceNumber();
  1671. ASSERT_OK(dbfull()->TEST_SwitchWAL());
  1672. ASSERT_OK(Put("key1", "new_value"));
  1673. VectorWalPtr log_files;
  1674. ASSERT_OK(db_->GetSortedWalFiles(log_files));
  1675. ASSERT_EQ(log_files.size(), 2);
  1676. uint64_t first_log_number = log_files.front()->LogNumber();
  1677. std::string first_log_name = LogFileName(dbname_, first_log_number);
  1678. std::string second_log_name =
  1679. LogFileName(dbname_, log_files.back()->LogNumber());
  1680. if (i == 0) {
  1681. // Delete the obsolete WAL and verify it will not be seen as a WAL hole
  1682. sleeping_task.WakeUp();
  1683. sleeping_task.WaitUntilDone();
  1684. ASSERT_OK(dbfull()->TEST_WaitForBackgroundWork());
  1685. // Stop background flush to avoid deleting any WAL
  1686. sleeping_task.Reset();
  1687. options.env->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
  1688. &sleeping_task, Env::Priority::HIGH);
  1689. Close();
  1690. } else if (i == 1) {
  1691. // Create a WAL hole on WAL number by deleting the first WAL and verify
  1692. // the hole will be detected
  1693. Close();
  1694. ASSERT_OK(options.env->DeleteFile(first_log_name));
  1695. } else if (i == 2) {
  1696. // Create a WAL hole on sequence number by truncating the first WAL and
  1697. // verify the hole will be detected
  1698. Close();
  1699. ASSERT_OK(
  1700. test::TruncateFile(options.env, first_log_name, 0 /* new_length */));
  1701. } else if (i == 3) {
  1702. // Create a WAL hole on size difference by truncating the first WAL and
  1703. // mocking a correct sequence number to force triggering corruption based
  1704. // on size instead of sequence number and verify the hole will be detected
  1705. Close();
  1706. ASSERT_OK(
  1707. test::TruncateFile(options.env, first_log_name, 0 /* new_length */));
  1708. SyncPoint::GetInstance()->SetCallBack(
  1709. "DBImpl::UpdatePredecessorWALInfo", [&](void* arg) {
  1710. std::pair<uint64_t, SequenceNumber*>* pair =
  1711. static_cast<std::pair<uint64_t, SequenceNumber*>*>(arg);
  1712. if (pair->first == first_log_number) {
  1713. *(pair->second) = last_seqno_recorded_in_fist_wal;
  1714. }
  1715. });
  1716. SyncPoint::GetInstance()->EnableProcessing();
  1717. } else if (i == 4) {
  1718. // Delete all wals and verify opening a DB with no WAL will be detected
  1719. Close();
  1720. ASSERT_OK(options.env->DeleteFile(first_log_name));
  1721. ASSERT_OK(options.env->DeleteFile(second_log_name));
  1722. }
  1723. Status s = DB::Open(options, dbname_, &db_);
  1724. if (i == 0) {
  1725. ASSERT_OK(s);
  1726. ASSERT_EQ("new_value", Get("key1"));
  1727. continue;
  1728. } else if (i == 3) {
  1729. SyncPoint::GetInstance()->ClearAllCallBacks();
  1730. SyncPoint::GetInstance()->DisableProcessing();
  1731. } else if (i == 4) {
  1732. ASSERT_TRUE(s.IsCorruption());
  1733. ASSERT_TRUE(
  1734. s.ToString().find("Opening an existing DB with no WAL files") !=
  1735. std::string::npos);
  1736. Close();
  1737. continue;
  1738. }
  1739. if (options.wal_recovery_mode == WALRecoveryMode::kPointInTimeRecovery) {
  1740. ASSERT_OK(s);
  1741. ASSERT_EQ("NOT_FOUND", Get("key1"));
  1742. } else if (options.wal_recovery_mode ==
  1743. WALRecoveryMode::kAbsoluteConsistency ||
  1744. options.wal_recovery_mode ==
  1745. WALRecoveryMode::kTolerateCorruptedTailRecords) {
  1746. ASSERT_TRUE(s.IsCorruption());
  1747. std::string msg;
  1748. if (i == 1) {
  1749. msg = "Missing WAL";
  1750. } else if (i == 2) {
  1751. msg = "Mismatched last sequence number recorded in the WAL";
  1752. } else if (i == 3) {
  1753. msg = "Mismatched size of the WAL";
  1754. }
  1755. ASSERT_TRUE(s.ToString().find(msg) != std::string::npos);
  1756. } else {
  1757. ASSERT_OK(s);
  1758. ASSERT_EQ("new_value", Get("key1"));
  1759. }
  1760. Close();
  1761. }
  1762. }
  1763. class DBWALTestWithParams
  1764. : public DBWALTestBase,
  1765. public ::testing::WithParamInterface<
  1766. std::tuple<bool, int, int, CompressionType, bool>> {
  1767. public:
  1768. DBWALTestWithParams() : DBWALTestBase("/db_wal_test_with_params") {}
  1769. };
  1770. INSTANTIATE_TEST_CASE_P(
  1771. Wal, DBWALTestWithParams,
  1772. ::testing::Combine(::testing::Bool(), ::testing::Range(0, 4, 1),
  1773. ::testing::Range(RecoveryTestHelper::kWALFileOffset,
  1774. RecoveryTestHelper::kWALFileOffset +
  1775. RecoveryTestHelper::kWALFilesCount,
  1776. 1),
  1777. ::testing::Values(CompressionType::kNoCompression,
  1778. CompressionType::kZSTD),
  1779. ::testing::Bool()));
  1780. class DBWALTestWithParamsVaryingRecoveryMode
  1781. : public DBWALTestBase,
  1782. public ::testing::WithParamInterface<
  1783. std::tuple<bool, int, int, WALRecoveryMode, CompressionType>> {
  1784. public:
  1785. DBWALTestWithParamsVaryingRecoveryMode()
  1786. : DBWALTestBase("/db_wal_test_with_params_mode") {}
  1787. };
  1788. INSTANTIATE_TEST_CASE_P(
  1789. Wal, DBWALTestWithParamsVaryingRecoveryMode,
  1790. ::testing::Combine(
  1791. ::testing::Bool(), ::testing::Range(0, 4, 1),
  1792. ::testing::Range(RecoveryTestHelper::kWALFileOffset,
  1793. RecoveryTestHelper::kWALFileOffset +
  1794. RecoveryTestHelper::kWALFilesCount,
  1795. 1),
  1796. ::testing::Values(WALRecoveryMode::kTolerateCorruptedTailRecords,
  1797. WALRecoveryMode::kAbsoluteConsistency,
  1798. WALRecoveryMode::kPointInTimeRecovery,
  1799. WALRecoveryMode::kSkipAnyCorruptedRecords),
  1800. ::testing::Values(CompressionType::kNoCompression,
  1801. CompressionType::kZSTD)));
  1802. // Test scope:
  1803. // - We expect to open the data store when there is incomplete trailing writes
  1804. // at the end of any of the logs
  1805. // - We do not expect to open the data store for corruption
  1806. TEST_P(DBWALTestWithParams, kTolerateCorruptedTailRecords) {
  1807. bool trunc = std::get<0>(GetParam()); // Corruption style
  1808. // Corruption offset position
  1809. int corrupt_offset = std::get<1>(GetParam());
  1810. int wal_file_id = std::get<2>(GetParam()); // WAL file
  1811. // Fill data for testing
  1812. Options options = CurrentOptions();
  1813. options.track_and_verify_wals = std::get<4>(GetParam());
  1814. const size_t row_count = RecoveryTestHelper::FillData(this, &options);
  1815. // test checksum failure or parsing
  1816. RecoveryTestHelper::CorruptWAL(this, options, corrupt_offset * .3,
  1817. /*len%=*/.1, wal_file_id, trunc);
  1818. options.wal_recovery_mode = WALRecoveryMode::kTolerateCorruptedTailRecords;
  1819. if (trunc) {
  1820. options.create_if_missing = false;
  1821. ASSERT_OK(TryReopen(options));
  1822. const size_t recovered_row_count = RecoveryTestHelper::GetData(this);
  1823. ASSERT_TRUE(corrupt_offset == 0 || recovered_row_count > 0);
  1824. ASSERT_LT(recovered_row_count, row_count);
  1825. } else {
  1826. ASSERT_NOK(TryReopen(options));
  1827. }
  1828. }
  1829. // Test scope:
  1830. // We don't expect the data store to be opened if there is any corruption
  1831. // (leading, middle or trailing -- incomplete writes or corruption)
  1832. TEST_P(DBWALTestWithParams, kAbsoluteConsistency) {
  1833. // Verify clean slate behavior
  1834. Options options = CurrentOptions();
  1835. options.track_and_verify_wals = std::get<4>(GetParam());
  1836. const size_t row_count = RecoveryTestHelper::FillData(this, &options);
  1837. options.create_if_missing = false;
  1838. ASSERT_OK(TryReopen(options));
  1839. ASSERT_EQ(RecoveryTestHelper::GetData(this), row_count);
  1840. bool trunc = std::get<0>(GetParam()); // Corruption style
  1841. // Corruption offset position
  1842. int corrupt_offset = std::get<1>(GetParam());
  1843. int wal_file_id = std::get<2>(GetParam()); // WAL file
  1844. // WAL compression type
  1845. CompressionType compression_type = std::get<3>(GetParam());
  1846. options.wal_compression = compression_type;
  1847. if (trunc && corrupt_offset == 0) {
  1848. return;
  1849. }
  1850. // fill with new date
  1851. RecoveryTestHelper::FillData(this, &options);
  1852. // corrupt the wal
  1853. RecoveryTestHelper::CorruptWAL(this, options, corrupt_offset * .33,
  1854. /*len%=*/.1, wal_file_id, trunc);
  1855. // verify
  1856. options.wal_recovery_mode = WALRecoveryMode::kAbsoluteConsistency;
  1857. options.create_if_missing = false;
  1858. ASSERT_NOK(TryReopen(options));
  1859. }
  1860. // Test scope:
  1861. // We don't expect the data store to be opened if there is any inconsistency
  1862. // between WAL and SST files
  1863. TEST_F(DBWALTest, kPointInTimeRecoveryCFConsistency) {
  1864. Options options = CurrentOptions();
  1865. options.avoid_flush_during_recovery = true;
  1866. // Create DB with multiple column families.
  1867. CreateAndReopenWithCF({"one", "two"}, options);
  1868. ASSERT_OK(Put(1, "key1", "val1"));
  1869. ASSERT_OK(Put(2, "key2", "val2"));
  1870. // Record the offset at this point
  1871. Env* env = options.env;
  1872. uint64_t wal_file_id = dbfull()->TEST_LogfileNumber();
  1873. std::string fname = LogFileName(dbname_, wal_file_id);
  1874. uint64_t offset_to_corrupt;
  1875. ASSERT_OK(env->GetFileSize(fname, &offset_to_corrupt));
  1876. ASSERT_GT(offset_to_corrupt, 0);
  1877. ASSERT_OK(Put(1, "key3", "val3"));
  1878. // Corrupt WAL at location of key3
  1879. ASSERT_OK(test::CorruptFile(env, fname, static_cast<int>(offset_to_corrupt),
  1880. 4, false));
  1881. ASSERT_OK(Put(2, "key4", "val4"));
  1882. ASSERT_OK(Put(1, "key5", "val5"));
  1883. ASSERT_OK(Flush(2));
  1884. // PIT recovery & verify
  1885. options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
  1886. ASSERT_NOK(TryReopenWithColumnFamilies({"default", "one", "two"}, options));
  1887. }
  1888. TEST_F(DBWALTest, RaceInstallFlushResultsWithWalObsoletion) {
  1889. Options options = CurrentOptions();
  1890. options.env = env_;
  1891. options.track_and_verify_wals_in_manifest = true;
  1892. // The following make sure there are two bg flush threads.
  1893. options.max_background_jobs = 8;
  1894. DestroyAndReopen(options);
  1895. const std::string cf1_name("cf1");
  1896. CreateAndReopenWithCF({cf1_name}, options);
  1897. assert(handles_.size() == 2);
  1898. {
  1899. dbfull()->TEST_LockMutex();
  1900. ASSERT_LE(2, dbfull()->GetBGJobLimits().max_flushes);
  1901. dbfull()->TEST_UnlockMutex();
  1902. }
  1903. ASSERT_OK(dbfull()->PauseBackgroundWork());
  1904. ASSERT_OK(db_->Put(WriteOptions(), handles_[1], "foo", "value"));
  1905. ASSERT_OK(db_->Put(WriteOptions(), "foo", "value"));
  1906. ASSERT_OK(dbfull()->TEST_FlushMemTable(
  1907. /*wait=*/false, /*allow_write_stall=*/true, handles_[1]));
  1908. ASSERT_OK(db_->Put(WriteOptions(), "foo", "value"));
  1909. ASSERT_OK(dbfull()->TEST_FlushMemTable(
  1910. /*wait=*/false, /*allow_write_stall=*/true, handles_[0]));
  1911. bool called = false;
  1912. std::atomic<int> bg_flush_threads{0};
  1913. std::atomic<bool> wal_synced{false};
  1914. SyncPoint::GetInstance()->DisableProcessing();
  1915. SyncPoint::GetInstance()->ClearAllCallBacks();
  1916. SyncPoint::GetInstance()->SetCallBack(
  1917. "DBImpl::BackgroundCallFlush:start", [&](void* /*arg*/) {
  1918. int cur = bg_flush_threads.load();
  1919. int desired = cur + 1;
  1920. if (cur > 0 ||
  1921. !bg_flush_threads.compare_exchange_strong(cur, desired)) {
  1922. while (!wal_synced.load()) {
  1923. // Wait until the other bg flush thread finishes committing WAL sync
  1924. // operation to the MANIFEST.
  1925. }
  1926. }
  1927. });
  1928. SyncPoint::GetInstance()->SetCallBack(
  1929. "DBImpl::FlushMemTableToOutputFile:CommitWal:1",
  1930. [&](void* /*arg*/) { wal_synced.store(true); });
  1931. // This callback will be called when the first bg flush thread reaches the
  1932. // point before entering the MANIFEST write queue after flushing the SST
  1933. // file.
  1934. // The purpose of the sync points here is to ensure both bg flush threads
  1935. // finish computing `min_wal_number_to_keep` before any of them updates the
  1936. // `log_number` for the column family that's being flushed.
  1937. SyncPoint::GetInstance()->SetCallBack(
  1938. "MemTableList::TryInstallMemtableFlushResults:AfterComputeMinWalToKeep",
  1939. [&](void* /*arg*/) {
  1940. dbfull()->mutex()->AssertHeld();
  1941. if (!called) {
  1942. // We are the first bg flush thread in the MANIFEST write queue.
  1943. // We set up the dependency between sync points for two threads that
  1944. // will be executing the same code.
  1945. // For the interleaving of events, see
  1946. // https://github.com/facebook/rocksdb/pull/9715.
  1947. // bg flush thread1 will release the db mutex while in the MANIFEST
  1948. // write queue. In the meantime, bg flush thread2 locks db mutex and
  1949. // computes the min_wal_number_to_keep (before thread1 writes to
  1950. // MANIFEST thus before cf1->log_number is updated). Bg thread2 joins
  1951. // the MANIFEST write queue afterwards and bg flush thread1 proceeds
  1952. // with writing to MANIFEST.
  1953. called = true;
  1954. SyncPoint::GetInstance()->LoadDependency({
  1955. {"VersionSet::LogAndApply:WriteManifestStart",
  1956. "DBWALTest::RaceInstallFlushResultsWithWalObsoletion:BgFlush2"},
  1957. {"DBWALTest::RaceInstallFlushResultsWithWalObsoletion:BgFlush2",
  1958. "VersionSet::LogAndApply:WriteManifest"},
  1959. });
  1960. } else {
  1961. // The other bg flush thread has already been in the MANIFEST write
  1962. // queue, and we are after.
  1963. TEST_SYNC_POINT(
  1964. "DBWALTest::RaceInstallFlushResultsWithWalObsoletion:BgFlush2");
  1965. }
  1966. });
  1967. SyncPoint::GetInstance()->EnableProcessing();
  1968. ASSERT_OK(dbfull()->ContinueBackgroundWork());
  1969. ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[0]));
  1970. ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[1]));
  1971. ASSERT_TRUE(called);
  1972. Close();
  1973. SyncPoint::GetInstance()->DisableProcessing();
  1974. SyncPoint::GetInstance()->ClearAllCallBacks();
  1975. DB* db1 = nullptr;
  1976. Status s = DB::OpenForReadOnly(options, dbname_, &db1);
  1977. ASSERT_OK(s);
  1978. assert(db1);
  1979. delete db1;
  1980. }
  1981. TEST_F(DBWALTest, FixSyncWalOnObseletedWalWithNewManifestCausingMissingWAL) {
  1982. Options options = CurrentOptions();
  1983. // Small size to force manifest creation
  1984. options.max_manifest_file_size = 1;
  1985. options.track_and_verify_wals_in_manifest = true;
  1986. DestroyAndReopen(options);
  1987. // Accumulate memtable m1 and create the 1st wal (i.e, 4.log)
  1988. ASSERT_OK(Put(Key(1), ""));
  1989. ASSERT_OK(Put(Key(2), ""));
  1990. ASSERT_OK(Put(Key(3), ""));
  1991. const std::string wal_file_path = db_->GetName() + "/000004.log";
  1992. // Coerce the following sequence of events:
  1993. // (1) Flush() marks 4.log to be obsoleted, 8.log to be the latest (i.e,
  1994. // active) log and release the lock
  1995. // (2) SyncWAL() proceeds with the lock. It
  1996. // creates a new manifest and syncs all the inactive wals before the latest
  1997. // (i.e, active log), which is 4.log. Note that SyncWAL() is not aware of the
  1998. // fact that 4.log has marked as to be obseleted. Such wal
  1999. // sync will then add a WAL addition record of 4.log to the new manifest
  2000. // without any special treatment. Prior to the fix, there is no WAL deletion
  2001. // record to offset it. (3) BackgroundFlush() will eventually purge 4.log.
  2002. bool wal_synced = false;
  2003. SyncPoint::GetInstance()->SetCallBack(
  2004. "FindObsoleteFiles::PostMutexUnlock", [&](void*) {
  2005. ASSERT_OK(env_->FileExists(wal_file_path));
  2006. uint64_t pre_sync_wal_manifest_no =
  2007. dbfull()->TEST_Current_Manifest_FileNo();
  2008. ASSERT_OK(db_->SyncWAL());
  2009. uint64_t post_sync_wal_manifest_no =
  2010. dbfull()->TEST_Current_Manifest_FileNo();
  2011. bool new_manifest_created =
  2012. post_sync_wal_manifest_no == pre_sync_wal_manifest_no + 1;
  2013. ASSERT_TRUE(new_manifest_created);
  2014. wal_synced = true;
  2015. });
  2016. SyncPoint::GetInstance()->EnableProcessing();
  2017. ASSERT_OK(Flush());
  2018. ASSERT_OK(dbfull()->TEST_WaitForBackgroundWork());
  2019. ASSERT_TRUE(wal_synced);
  2020. // BackgroundFlush() purged 4.log
  2021. // because the memtable associated with the WAL was flushed and new WAL was
  2022. // created (i.e, 8.log)
  2023. ASSERT_TRUE(env_->FileExists(wal_file_path).IsNotFound());
  2024. SyncPoint::GetInstance()->ClearAllCallBacks();
  2025. SyncPoint::GetInstance()->DisableProcessing();
  2026. // To verify the corruption of "Missing WAL with log number: 4" under
  2027. // `options.track_and_verify_wals_in_manifest = true` is fixed.
  2028. //
  2029. // Before the fix, `db_->SyncWAL()` will sync and record WAL addtion of the
  2030. // obseleted WAL 4.log in a new manifest without any special treament.
  2031. // This will result in missing-wal corruption in DB::Reopen().
  2032. Status s = TryReopen(options);
  2033. EXPECT_OK(s);
  2034. }
  2035. // Test scope:
  2036. // - We expect to open data store under all circumstances
  2037. // - We expect only data upto the point where the first error was encountered
  2038. TEST_P(DBWALTestWithParams, kPointInTimeRecovery) {
  2039. const int maxkeys =
  2040. RecoveryTestHelper::kWALFilesCount * RecoveryTestHelper::kKeysPerWALFile;
  2041. bool trunc = std::get<0>(GetParam()); // Corruption style
  2042. // Corruption offset position
  2043. int corrupt_offset = std::get<1>(GetParam());
  2044. int wal_file_id = std::get<2>(GetParam()); // WAL file
  2045. // WAL compression type
  2046. CompressionType compression_type = std::get<3>(GetParam());
  2047. // Fill data for testing
  2048. Options options = CurrentOptions();
  2049. options.track_and_verify_wals = std::get<4>(GetParam());
  2050. options.wal_compression = compression_type;
  2051. const size_t row_count = RecoveryTestHelper::FillData(this, &options);
  2052. // Corrupt the wal
  2053. // The offset here was 0.3 which cuts off right at the end of a
  2054. // valid fragment after wal zstd compression checksum is enabled,
  2055. // so changed the value to 0.33.
  2056. RecoveryTestHelper::CorruptWAL(this, options, corrupt_offset * .33,
  2057. /*len%=*/.1, wal_file_id, trunc);
  2058. // Verify
  2059. options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
  2060. options.create_if_missing = false;
  2061. ASSERT_OK(TryReopen(options));
  2062. // Probe data for invariants
  2063. size_t recovered_row_count = RecoveryTestHelper::GetData(this);
  2064. ASSERT_LT(recovered_row_count, row_count);
  2065. // Verify a prefix of keys were recovered. But not in the case of full WAL
  2066. // truncation, because we have no way to know there was a corruption when
  2067. // truncation happened on record boundaries (preventing recovery holes in
  2068. // that case requires using `track_and_verify_wals_in_manifest`).
  2069. if (!trunc || corrupt_offset != 0) {
  2070. bool expect_data = true;
  2071. for (size_t k = 0; k < maxkeys; ++k) {
  2072. bool found = Get("key" + std::to_string(k)) != "NOT_FOUND";
  2073. if (expect_data && !found) {
  2074. expect_data = false;
  2075. }
  2076. ASSERT_EQ(found, expect_data);
  2077. }
  2078. }
  2079. const size_t min = RecoveryTestHelper::kKeysPerWALFile *
  2080. (wal_file_id - RecoveryTestHelper::kWALFileOffset);
  2081. ASSERT_GE(recovered_row_count, min);
  2082. if (!trunc && corrupt_offset != 0) {
  2083. const size_t max = RecoveryTestHelper::kKeysPerWALFile *
  2084. (wal_file_id - RecoveryTestHelper::kWALFileOffset + 1);
  2085. ASSERT_LE(recovered_row_count, max);
  2086. }
  2087. }
  2088. // Test scope:
  2089. // - We expect to open the data store under all scenarios
  2090. // - We expect to have recovered records past the corruption zone
  2091. TEST_P(DBWALTestWithParams, kSkipAnyCorruptedRecords) {
  2092. bool trunc = std::get<0>(GetParam()); // Corruption style
  2093. // Corruption offset position
  2094. int corrupt_offset = std::get<1>(GetParam());
  2095. int wal_file_id = std::get<2>(GetParam()); // WAL file
  2096. // WAL compression type
  2097. CompressionType compression_type = std::get<3>(GetParam());
  2098. // Fill data for testing
  2099. Options options = CurrentOptions();
  2100. options.track_and_verify_wals = std::get<4>(GetParam());
  2101. options.wal_compression = compression_type;
  2102. const size_t row_count = RecoveryTestHelper::FillData(this, &options);
  2103. // Corrupt the WAL
  2104. RecoveryTestHelper::CorruptWAL(this, options, corrupt_offset * .3,
  2105. /*len%=*/.1, wal_file_id, trunc);
  2106. // Verify behavior
  2107. options.wal_recovery_mode = WALRecoveryMode::kSkipAnyCorruptedRecords;
  2108. options.create_if_missing = false;
  2109. ASSERT_OK(TryReopen(options));
  2110. // Probe data for invariants
  2111. size_t recovered_row_count = RecoveryTestHelper::GetData(this);
  2112. ASSERT_LT(recovered_row_count, row_count);
  2113. if (!trunc) {
  2114. ASSERT_TRUE(corrupt_offset != 0 || recovered_row_count > 0);
  2115. }
  2116. }
  2117. TEST_F(DBWALTest, AvoidFlushDuringRecovery) {
  2118. Options options = CurrentOptions();
  2119. options.disable_auto_compactions = true;
  2120. options.avoid_flush_during_recovery = false;
  2121. // Test with flush after recovery.
  2122. Reopen(options);
  2123. ASSERT_OK(Put("foo", "v1"));
  2124. ASSERT_OK(Put("bar", "v2"));
  2125. ASSERT_OK(Flush());
  2126. ASSERT_OK(Put("foo", "v3"));
  2127. ASSERT_OK(Put("bar", "v4"));
  2128. ASSERT_EQ(1, TotalTableFiles());
  2129. // Reopen DB. Check if WAL logs flushed.
  2130. Reopen(options);
  2131. ASSERT_EQ("v3", Get("foo"));
  2132. ASSERT_EQ("v4", Get("bar"));
  2133. ASSERT_EQ(2, TotalTableFiles());
  2134. // Test without flush after recovery.
  2135. options.avoid_flush_during_recovery = true;
  2136. DestroyAndReopen(options);
  2137. ASSERT_OK(Put("foo", "v5"));
  2138. ASSERT_OK(Put("bar", "v6"));
  2139. ASSERT_OK(Flush());
  2140. ASSERT_OK(Put("foo", "v7"));
  2141. ASSERT_OK(Put("bar", "v8"));
  2142. ASSERT_EQ(1, TotalTableFiles());
  2143. // Reopen DB. WAL logs should not be flushed this time.
  2144. Reopen(options);
  2145. ASSERT_EQ("v7", Get("foo"));
  2146. ASSERT_EQ("v8", Get("bar"));
  2147. ASSERT_EQ(1, TotalTableFiles());
  2148. // Force flush with allow_2pc.
  2149. options.avoid_flush_during_recovery = true;
  2150. options.allow_2pc = true;
  2151. ASSERT_OK(Put("foo", "v9"));
  2152. ASSERT_OK(Put("bar", "v10"));
  2153. ASSERT_OK(Flush());
  2154. ASSERT_OK(Put("foo", "v11"));
  2155. ASSERT_OK(Put("bar", "v12"));
  2156. Reopen(options);
  2157. ASSERT_EQ("v11", Get("foo"));
  2158. ASSERT_EQ("v12", Get("bar"));
  2159. ASSERT_EQ(3, TotalTableFiles());
  2160. }
  2161. TEST_F(DBWALTest, WalCleanupAfterAvoidFlushDuringRecovery) {
  2162. // Verifies WAL files that were present during recovery, but not flushed due
  2163. // to avoid_flush_during_recovery, will be considered for deletion at a later
  2164. // stage. We check at least one such file is deleted during Flush().
  2165. Options options = CurrentOptions();
  2166. options.disable_auto_compactions = true;
  2167. options.avoid_flush_during_recovery = true;
  2168. Reopen(options);
  2169. ASSERT_OK(Put("foo", "v1"));
  2170. Reopen(options);
  2171. for (int i = 0; i < 2; ++i) {
  2172. if (i > 0) {
  2173. // Flush() triggers deletion of obsolete tracked files
  2174. ASSERT_OK(Flush());
  2175. }
  2176. VectorLogPtr log_files;
  2177. ASSERT_OK(dbfull()->GetSortedWalFiles(log_files));
  2178. if (i == 0) {
  2179. ASSERT_GT(log_files.size(), 0);
  2180. } else {
  2181. ASSERT_EQ(0, log_files.size());
  2182. }
  2183. }
  2184. }
  2185. TEST_F(DBWALTest, RecoverWithoutFlush) {
  2186. Options options = CurrentOptions();
  2187. options.avoid_flush_during_recovery = true;
  2188. options.create_if_missing = false;
  2189. options.disable_auto_compactions = true;
  2190. options.write_buffer_size = 64 * 1024 * 1024;
  2191. size_t count = RecoveryTestHelper::FillData(this, &options);
  2192. auto validateData = [this, count]() {
  2193. for (size_t i = 0; i < count; i++) {
  2194. ASSERT_NE(Get("key" + std::to_string(i)), "NOT_FOUND");
  2195. }
  2196. };
  2197. Reopen(options);
  2198. validateData();
  2199. // Insert some data without flush
  2200. ASSERT_OK(Put("foo", "foo_v1"));
  2201. ASSERT_OK(Put("bar", "bar_v1"));
  2202. Reopen(options);
  2203. validateData();
  2204. ASSERT_EQ(Get("foo"), "foo_v1");
  2205. ASSERT_EQ(Get("bar"), "bar_v1");
  2206. // Insert again and reopen
  2207. ASSERT_OK(Put("foo", "foo_v2"));
  2208. ASSERT_OK(Put("bar", "bar_v2"));
  2209. Reopen(options);
  2210. validateData();
  2211. ASSERT_EQ(Get("foo"), "foo_v2");
  2212. ASSERT_EQ(Get("bar"), "bar_v2");
  2213. // manual flush and insert again
  2214. ASSERT_OK(Flush());
  2215. ASSERT_EQ(Get("foo"), "foo_v2");
  2216. ASSERT_EQ(Get("bar"), "bar_v2");
  2217. ASSERT_OK(Put("foo", "foo_v3"));
  2218. ASSERT_OK(Put("bar", "bar_v3"));
  2219. Reopen(options);
  2220. validateData();
  2221. ASSERT_EQ(Get("foo"), "foo_v3");
  2222. ASSERT_EQ(Get("bar"), "bar_v3");
  2223. }
  2224. TEST_F(DBWALTest, RecoverWithoutFlushMultipleCF) {
  2225. const std::string kSmallValue = "v";
  2226. const std::string kLargeValue = DummyString(1024);
  2227. Options options = CurrentOptions();
  2228. options.avoid_flush_during_recovery = true;
  2229. options.create_if_missing = false;
  2230. options.disable_auto_compactions = true;
  2231. auto countWalFiles = [this]() {
  2232. VectorLogPtr log_files;
  2233. if (!dbfull()->GetSortedWalFiles(log_files).ok()) {
  2234. return size_t{0};
  2235. }
  2236. return log_files.size();
  2237. };
  2238. // Create DB with multiple column families and multiple log files.
  2239. CreateAndReopenWithCF({"one", "two"}, options);
  2240. ASSERT_OK(Put(0, "key1", kSmallValue));
  2241. ASSERT_OK(Put(1, "key2", kLargeValue));
  2242. ASSERT_OK(Flush(1));
  2243. ASSERT_EQ(1, countWalFiles());
  2244. ASSERT_OK(Put(0, "key3", kSmallValue));
  2245. ASSERT_OK(Put(2, "key4", kLargeValue));
  2246. ASSERT_OK(Flush(2));
  2247. ASSERT_EQ(2, countWalFiles());
  2248. // Reopen, insert and flush.
  2249. options.db_write_buffer_size = 64 * 1024 * 1024;
  2250. ReopenWithColumnFamilies({"default", "one", "two"}, options);
  2251. ASSERT_EQ(Get(0, "key1"), kSmallValue);
  2252. ASSERT_EQ(Get(1, "key2"), kLargeValue);
  2253. ASSERT_EQ(Get(0, "key3"), kSmallValue);
  2254. ASSERT_EQ(Get(2, "key4"), kLargeValue);
  2255. // Insert more data.
  2256. ASSERT_OK(Put(0, "key5", kLargeValue));
  2257. ASSERT_OK(Put(1, "key6", kLargeValue));
  2258. ASSERT_EQ(3, countWalFiles());
  2259. ASSERT_OK(Flush(1));
  2260. ASSERT_OK(Put(2, "key7", kLargeValue));
  2261. ASSERT_OK(dbfull()->FlushWAL(false));
  2262. ASSERT_EQ(4, countWalFiles());
  2263. // Reopen twice and validate.
  2264. for (int i = 0; i < 2; i++) {
  2265. ReopenWithColumnFamilies({"default", "one", "two"}, options);
  2266. ASSERT_EQ(Get(0, "key1"), kSmallValue);
  2267. ASSERT_EQ(Get(1, "key2"), kLargeValue);
  2268. ASSERT_EQ(Get(0, "key3"), kSmallValue);
  2269. ASSERT_EQ(Get(2, "key4"), kLargeValue);
  2270. ASSERT_EQ(Get(0, "key5"), kLargeValue);
  2271. ASSERT_EQ(Get(1, "key6"), kLargeValue);
  2272. ASSERT_EQ(Get(2, "key7"), kLargeValue);
  2273. ASSERT_EQ(4, countWalFiles());
  2274. }
  2275. }
  2276. // In this test we are trying to do the following:
  2277. // 1. Create a DB with corrupted WAL log;
  2278. // 2. Open with avoid_flush_during_recovery = true;
  2279. // 3. Append more data without flushing, which creates new WAL log.
  2280. // 4. Open again. See if it can correctly handle previous corruption.
  2281. TEST_P(DBWALTestWithParamsVaryingRecoveryMode,
  2282. RecoverFromCorruptedWALWithoutFlush) {
  2283. const int kAppendKeys = 100;
  2284. Options options = CurrentOptions();
  2285. options.avoid_flush_during_recovery = true;
  2286. options.create_if_missing = false;
  2287. options.disable_auto_compactions = true;
  2288. options.write_buffer_size = 64 * 1024 * 1024;
  2289. auto getAll = [this]() {
  2290. std::vector<std::pair<std::string, std::string>> data;
  2291. ReadOptions ropt;
  2292. Iterator* iter = dbfull()->NewIterator(ropt);
  2293. for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
  2294. data.emplace_back(iter->key().ToString(), iter->value().ToString());
  2295. }
  2296. EXPECT_OK(iter->status());
  2297. delete iter;
  2298. return data;
  2299. };
  2300. bool trunc = std::get<0>(GetParam()); // Corruption style
  2301. // Corruption offset position
  2302. int corrupt_offset = std::get<1>(GetParam());
  2303. int wal_file_id = std::get<2>(GetParam()); // WAL file
  2304. WALRecoveryMode recovery_mode = std::get<3>(GetParam());
  2305. // WAL compression type
  2306. CompressionType compression_type = std::get<4>(GetParam());
  2307. options.wal_recovery_mode = recovery_mode;
  2308. options.wal_compression = compression_type;
  2309. // Create corrupted WAL
  2310. RecoveryTestHelper::FillData(this, &options);
  2311. RecoveryTestHelper::CorruptWAL(this, options, corrupt_offset * .3,
  2312. /*len%=*/.1, wal_file_id, trunc);
  2313. // Skip the test if DB won't open.
  2314. if (!TryReopen(options).ok()) {
  2315. ASSERT_TRUE(options.wal_recovery_mode ==
  2316. WALRecoveryMode::kAbsoluteConsistency ||
  2317. (!trunc && options.wal_recovery_mode ==
  2318. WALRecoveryMode::kTolerateCorruptedTailRecords));
  2319. return;
  2320. }
  2321. ASSERT_OK(TryReopen(options));
  2322. // Append some more data.
  2323. for (int k = 0; k < kAppendKeys; k++) {
  2324. std::string key = "extra_key" + std::to_string(k);
  2325. std::string value = DummyString(RecoveryTestHelper::kValueSize);
  2326. ASSERT_OK(Put(key, value));
  2327. }
  2328. // Save data for comparison.
  2329. auto data = getAll();
  2330. // Reopen. Verify data.
  2331. ASSERT_OK(TryReopen(options));
  2332. auto actual_data = getAll();
  2333. ASSERT_EQ(data, actual_data);
  2334. }
  2335. // Tests that total log size is recovered if we set
  2336. // avoid_flush_during_recovery=true.
  2337. // Flush should trigger if max_total_wal_size is reached.
  2338. TEST_F(DBWALTest, RestoreTotalLogSizeAfterRecoverWithoutFlush) {
  2339. auto test_listener = std::make_shared<FlushCounterListener>();
  2340. test_listener->expected_flush_reason = FlushReason::kWalFull;
  2341. constexpr size_t kKB = 1024;
  2342. constexpr size_t kMB = 1024 * 1024;
  2343. Options options = CurrentOptions();
  2344. options.avoid_flush_during_recovery = true;
  2345. options.max_total_wal_size = 1 * kMB;
  2346. options.listeners.push_back(test_listener);
  2347. // Have to open DB in multi-CF mode to trigger flush when
  2348. // max_total_wal_size is reached.
  2349. CreateAndReopenWithCF({"one"}, options);
  2350. // Write some keys and we will end up with one log file which is slightly
  2351. // smaller than 1MB.
  2352. std::string value_100k(100 * kKB, 'v');
  2353. std::string value_300k(300 * kKB, 'v');
  2354. ASSERT_OK(Put(0, "foo", "v1"));
  2355. for (int i = 0; i < 9; i++) {
  2356. ASSERT_OK(Put(1, "key" + std::to_string(i), value_100k));
  2357. }
  2358. // Get log files before reopen.
  2359. VectorLogPtr log_files_before;
  2360. ASSERT_OK(dbfull()->GetSortedWalFiles(log_files_before));
  2361. ASSERT_EQ(1, log_files_before.size());
  2362. uint64_t log_size_before = log_files_before[0]->SizeFileBytes();
  2363. ASSERT_GT(log_size_before, 900 * kKB);
  2364. ASSERT_LT(log_size_before, 1 * kMB);
  2365. ReopenWithColumnFamilies({"default", "one"}, options);
  2366. // Write one more value to make log larger than 1MB.
  2367. ASSERT_OK(Put(1, "bar", value_300k));
  2368. // Get log files again. A new log file will be opened.
  2369. VectorLogPtr log_files_after_reopen;
  2370. ASSERT_OK(dbfull()->GetSortedWalFiles(log_files_after_reopen));
  2371. ASSERT_EQ(2, log_files_after_reopen.size());
  2372. ASSERT_EQ(log_files_before[0]->LogNumber(),
  2373. log_files_after_reopen[0]->LogNumber());
  2374. ASSERT_GT(log_files_after_reopen[0]->SizeFileBytes() +
  2375. log_files_after_reopen[1]->SizeFileBytes(),
  2376. 1 * kMB);
  2377. // Write one more key to trigger flush.
  2378. ASSERT_OK(Put(0, "foo", "v2"));
  2379. for (auto* h : handles_) {
  2380. ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(h));
  2381. }
  2382. // Flushed two column families.
  2383. ASSERT_EQ(2, test_listener->count.load());
  2384. }
  2385. #if defined(ROCKSDB_PLATFORM_POSIX)
  2386. #if defined(ROCKSDB_FALLOCATE_PRESENT)
  2387. // Tests that we will truncate the preallocated space of the last log from
  2388. // previous.
  2389. TEST_F(DBWALTest, TruncateLastLogAfterRecoverWithoutFlush) {
  2390. constexpr size_t kKB = 1024;
  2391. Options options = CurrentOptions();
  2392. options.env = env_;
  2393. options.avoid_flush_during_recovery = true;
  2394. if (mem_env_) {
  2395. ROCKSDB_GTEST_SKIP("Test requires non-mem environment");
  2396. return;
  2397. }
  2398. if (!IsFallocateSupported()) {
  2399. return;
  2400. }
  2401. DestroyAndReopen(options);
  2402. size_t preallocated_size =
  2403. dbfull()->TEST_GetWalPreallocateBlockSize(options.write_buffer_size);
  2404. ASSERT_OK(Put("foo", "v1"));
  2405. VectorLogPtr log_files_before;
  2406. ASSERT_OK(dbfull()->GetSortedWalFiles(log_files_before));
  2407. ASSERT_EQ(1, log_files_before.size());
  2408. auto& file_before = log_files_before[0];
  2409. ASSERT_LT(file_before->SizeFileBytes(), 1 * kKB);
  2410. // The log file has preallocated space.
  2411. ASSERT_GE(GetAllocatedFileSize(dbname_ + file_before->PathName()),
  2412. preallocated_size);
  2413. Reopen(options);
  2414. VectorLogPtr log_files_after;
  2415. ASSERT_OK(dbfull()->GetSortedWalFiles(log_files_after));
  2416. ASSERT_EQ(1, log_files_after.size());
  2417. ASSERT_LT(log_files_after[0]->SizeFileBytes(), 1 * kKB);
  2418. // The preallocated space should be truncated.
  2419. ASSERT_LT(GetAllocatedFileSize(dbname_ + file_before->PathName()),
  2420. preallocated_size);
  2421. }
  2422. // Tests that we will truncate the preallocated space of the last log from
  2423. // previous.
  2424. TEST_F(DBWALTest, TruncateLastLogAfterRecoverWithFlush) {
  2425. constexpr size_t kKB = 1024;
  2426. Options options = CurrentOptions();
  2427. options.env = env_;
  2428. options.avoid_flush_during_recovery = false;
  2429. options.avoid_flush_during_shutdown = true;
  2430. if (mem_env_) {
  2431. ROCKSDB_GTEST_SKIP("Test requires non-mem environment");
  2432. return;
  2433. }
  2434. if (!IsFallocateSupported()) {
  2435. return;
  2436. }
  2437. DestroyAndReopen(options);
  2438. size_t preallocated_size =
  2439. dbfull()->TEST_GetWalPreallocateBlockSize(options.write_buffer_size);
  2440. ASSERT_OK(Put("foo", "v1"));
  2441. VectorLogPtr log_files_before;
  2442. ASSERT_OK(dbfull()->GetSortedWalFiles(log_files_before));
  2443. ASSERT_EQ(1, log_files_before.size());
  2444. auto& file_before = log_files_before[0];
  2445. ASSERT_LT(file_before->SizeFileBytes(), 1 * kKB);
  2446. ASSERT_GE(GetAllocatedFileSize(dbname_ + file_before->PathName()),
  2447. preallocated_size);
  2448. // The log file has preallocated space.
  2449. Close();
  2450. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
  2451. {{"DBImpl::PurgeObsoleteFiles:Begin",
  2452. "DBWALTest::TruncateLastLogAfterRecoverWithFlush:AfterRecover"},
  2453. {"DBWALTest::TruncateLastLogAfterRecoverWithFlush:AfterTruncate",
  2454. "DBImpl::DeleteObsoleteFileImpl::BeforeDeletion"}});
  2455. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  2456. port::Thread reopen_thread([&]() { Reopen(options); });
  2457. TEST_SYNC_POINT(
  2458. "DBWALTest::TruncateLastLogAfterRecoverWithFlush:AfterRecover");
  2459. // After the flush during Open, the log file should get deleted. However,
  2460. // if the process is in a crash loop, the log file may not get
  2461. // deleted and thte preallocated space will keep accumulating. So we need
  2462. // to ensure it gets trtuncated.
  2463. EXPECT_LT(GetAllocatedFileSize(dbname_ + file_before->PathName()),
  2464. preallocated_size);
  2465. TEST_SYNC_POINT(
  2466. "DBWALTest::TruncateLastLogAfterRecoverWithFlush:AfterTruncate");
  2467. reopen_thread.join();
  2468. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  2469. }
  2470. TEST_F(DBWALTest, TruncateLastLogAfterRecoverWALEmpty) {
  2471. Options options = CurrentOptions();
  2472. options.env = env_;
  2473. options.avoid_flush_during_recovery = false;
  2474. if (mem_env_ || encrypted_env_) {
  2475. ROCKSDB_GTEST_SKIP("Test requires non-mem/non-encrypted environment");
  2476. return;
  2477. }
  2478. if (!IsFallocateSupported()) {
  2479. return;
  2480. }
  2481. DestroyAndReopen(options);
  2482. size_t preallocated_size =
  2483. dbfull()->TEST_GetWalPreallocateBlockSize(options.write_buffer_size);
  2484. Close();
  2485. std::vector<std::string> filenames;
  2486. std::string last_log;
  2487. uint64_t last_log_num = 0;
  2488. ASSERT_OK(env_->GetChildren(dbname_, &filenames));
  2489. for (const auto& fname : filenames) {
  2490. uint64_t number;
  2491. FileType type;
  2492. if (ParseFileName(fname, &number, &type, nullptr)) {
  2493. if (type == kWalFile && number > last_log_num) {
  2494. last_log = fname;
  2495. }
  2496. }
  2497. }
  2498. ASSERT_NE(last_log, "");
  2499. last_log = dbname_ + '/' + last_log;
  2500. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
  2501. {{"DBImpl::PurgeObsoleteFiles:Begin",
  2502. "DBWALTest::TruncateLastLogAfterRecoverWithFlush:AfterRecover"},
  2503. {"DBWALTest::TruncateLastLogAfterRecoverWithFlush:AfterTruncate",
  2504. "DBImpl::DeleteObsoleteFileImpl::BeforeDeletion"}});
  2505. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  2506. "PosixWritableFile::Close",
  2507. [](void* arg) { *(static_cast<size_t*>(arg)) = 0; });
  2508. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  2509. // Preallocate space for the empty log file. This could happen if WAL data
  2510. // was buffered in memory and the process crashed.
  2511. std::unique_ptr<WritableFile> log_file;
  2512. ASSERT_OK(env_->ReopenWritableFile(last_log, &log_file, EnvOptions()));
  2513. log_file->SetPreallocationBlockSize(preallocated_size);
  2514. log_file->PrepareWrite(0, 4096);
  2515. log_file.reset();
  2516. ASSERT_GE(GetAllocatedFileSize(last_log), preallocated_size);
  2517. port::Thread reopen_thread([&]() { Reopen(options); });
  2518. TEST_SYNC_POINT(
  2519. "DBWALTest::TruncateLastLogAfterRecoverWithFlush:AfterRecover");
  2520. // The preallocated space should be truncated.
  2521. EXPECT_LT(GetAllocatedFileSize(last_log), preallocated_size);
  2522. TEST_SYNC_POINT(
  2523. "DBWALTest::TruncateLastLogAfterRecoverWithFlush:AfterTruncate");
  2524. reopen_thread.join();
  2525. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  2526. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
  2527. }
  2528. TEST_F(DBWALTest, ReadOnlyRecoveryNoTruncate) {
  2529. constexpr size_t kKB = 1024;
  2530. Options options = CurrentOptions();
  2531. options.env = env_;
  2532. options.avoid_flush_during_recovery = true;
  2533. if (mem_env_) {
  2534. ROCKSDB_GTEST_SKIP("Test requires non-mem environment");
  2535. return;
  2536. }
  2537. if (!IsFallocateSupported()) {
  2538. return;
  2539. }
  2540. // create DB and close with file truncate disabled
  2541. std::atomic_bool enable_truncate{false};
  2542. SyncPoint::GetInstance()->SetCallBack("PosixWritableFile::Close",
  2543. [&](void* arg) {
  2544. if (!enable_truncate) {
  2545. *(static_cast<size_t*>(arg)) = 0;
  2546. }
  2547. });
  2548. SyncPoint::GetInstance()->EnableProcessing();
  2549. DestroyAndReopen(options);
  2550. size_t preallocated_size =
  2551. dbfull()->TEST_GetWalPreallocateBlockSize(options.write_buffer_size);
  2552. ASSERT_OK(Put("foo", "v1"));
  2553. VectorLogPtr log_files_before;
  2554. ASSERT_OK(dbfull()->GetSortedWalFiles(log_files_before));
  2555. ASSERT_EQ(1, log_files_before.size());
  2556. auto& file_before = log_files_before[0];
  2557. ASSERT_LT(file_before->SizeFileBytes(), 1 * kKB);
  2558. // The log file has preallocated space.
  2559. auto db_size = GetAllocatedFileSize(dbname_ + file_before->PathName());
  2560. ASSERT_GE(db_size, preallocated_size);
  2561. Close();
  2562. // enable truncate and open DB as readonly, the file should not be truncated
  2563. // and DB size is not changed.
  2564. enable_truncate = true;
  2565. ASSERT_OK(ReadOnlyReopen(options));
  2566. VectorLogPtr log_files_after;
  2567. ASSERT_OK(dbfull()->GetSortedWalFiles(log_files_after));
  2568. ASSERT_EQ(1, log_files_after.size());
  2569. ASSERT_LT(log_files_after[0]->SizeFileBytes(), 1 * kKB);
  2570. ASSERT_EQ(log_files_after[0]->PathName(), file_before->PathName());
  2571. // The preallocated space should NOT be truncated.
  2572. // the DB size is almost the same.
  2573. ASSERT_NEAR(GetAllocatedFileSize(dbname_ + file_before->PathName()), db_size,
  2574. db_size / 100);
  2575. SyncPoint::GetInstance()->DisableProcessing();
  2576. SyncPoint::GetInstance()->ClearAllCallBacks();
  2577. }
  2578. #endif // ROCKSDB_FALLOCATE_PRESENT
  2579. #endif // ROCKSDB_PLATFORM_POSIX
  2580. TEST_F(DBWALTest, WalInManifestButNotInSortedWals) {
  2581. Options options = CurrentOptions();
  2582. options.track_and_verify_wals_in_manifest = true;
  2583. options.wal_recovery_mode = WALRecoveryMode::kAbsoluteConsistency;
  2584. // Build a way to make wal files selectively go missing
  2585. bool wals_go_missing = false;
  2586. struct MissingWalFs : public FileSystemWrapper {
  2587. MissingWalFs(const std::shared_ptr<FileSystem>& t,
  2588. bool* _wals_go_missing_flag)
  2589. : FileSystemWrapper(t), wals_go_missing_flag(_wals_go_missing_flag) {}
  2590. bool* wals_go_missing_flag;
  2591. IOStatus GetChildren(const std::string& dir, const IOOptions& io_opts,
  2592. std::vector<std::string>* r,
  2593. IODebugContext* dbg) override {
  2594. IOStatus s = target_->GetChildren(dir, io_opts, r, dbg);
  2595. if (s.ok() && *wals_go_missing_flag) {
  2596. for (size_t i = 0; i < r->size();) {
  2597. if (EndsWith(r->at(i), ".log")) {
  2598. r->erase(r->begin() + i);
  2599. } else {
  2600. ++i;
  2601. }
  2602. }
  2603. }
  2604. return s;
  2605. }
  2606. const char* Name() const override { return "MissingWalFs"; }
  2607. };
  2608. auto my_fs =
  2609. std::make_shared<MissingWalFs>(env_->GetFileSystem(), &wals_go_missing);
  2610. std::unique_ptr<Env> my_env(NewCompositeEnv(my_fs));
  2611. options.env = my_env.get();
  2612. CreateAndReopenWithCF({"blah"}, options);
  2613. // Currently necessary to get a WAL tracked in manifest; see
  2614. // https://github.com/facebook/rocksdb/issues/10080
  2615. ASSERT_OK(Put(0, "x", "y"));
  2616. ASSERT_OK(db_->SyncWAL());
  2617. ASSERT_OK(Put(1, "x", "y"));
  2618. ASSERT_OK(db_->SyncWAL());
  2619. ASSERT_OK(Flush(1));
  2620. ASSERT_FALSE(dbfull()->GetVersionSet()->GetWalSet().GetWals().empty());
  2621. std::vector<std::unique_ptr<LogFile>> wals;
  2622. ASSERT_OK(db_->GetSortedWalFiles(wals));
  2623. wals_go_missing = true;
  2624. ASSERT_NOK(db_->GetSortedWalFiles(wals));
  2625. wals_go_missing = false;
  2626. Close();
  2627. }
  2628. TEST_F(DBWALTest, WalTermTest) {
  2629. Options options = CurrentOptions();
  2630. options.env = env_;
  2631. CreateAndReopenWithCF({"pikachu"}, options);
  2632. ASSERT_OK(Put(1, "foo", "bar"));
  2633. WriteOptions wo;
  2634. wo.sync = true;
  2635. wo.disableWAL = false;
  2636. WriteBatch batch;
  2637. ASSERT_OK(batch.Put("foo", "bar"));
  2638. batch.MarkWalTerminationPoint();
  2639. ASSERT_OK(batch.Put("foo2", "bar2"));
  2640. ASSERT_OK(dbfull()->Write(wo, &batch));
  2641. // make sure we can re-open it.
  2642. ASSERT_OK(TryReopenWithColumnFamilies({"default", "pikachu"}, options));
  2643. ASSERT_EQ("bar", Get(1, "foo"));
  2644. ASSERT_EQ("NOT_FOUND", Get(1, "foo2"));
  2645. }
  2646. TEST_F(DBWALTest, GetCompressedWalsAfterSync) {
  2647. if (db_->GetOptions().wal_compression == kNoCompression) {
  2648. ROCKSDB_GTEST_BYPASS("stream compression not present");
  2649. return;
  2650. }
  2651. Options options = GetDefaultOptions();
  2652. options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
  2653. options.create_if_missing = true;
  2654. options.env = env_;
  2655. options.avoid_flush_during_recovery = true;
  2656. options.track_and_verify_wals_in_manifest = true;
  2657. // Enable WAL compression so that the newly-created WAL will be non-empty
  2658. // after DB open, even if point-in-time WAL recovery encounters no
  2659. // corruption.
  2660. options.wal_compression = kZSTD;
  2661. DestroyAndReopen(options);
  2662. // Write something to memtable and WAL so that wal_empty_ will be false after
  2663. // next DB::Open().
  2664. ASSERT_OK(Put("a", "v"));
  2665. Reopen(options);
  2666. // New WAL is created, thanks to !wal_empty_.
  2667. ASSERT_OK(dbfull()->TEST_SwitchWAL());
  2668. ASSERT_OK(Put("b", "v"));
  2669. ASSERT_OK(db_->SyncWAL());
  2670. VectorLogPtr wals;
  2671. Status s = dbfull()->GetSortedWalFiles(wals);
  2672. ASSERT_OK(s);
  2673. }
  2674. TEST_F(DBWALTest, EmptyWalReopenTest) {
  2675. Options options = CurrentOptions();
  2676. options.env = env_;
  2677. CreateAndReopenWithCF({"pikachu"}, options);
  2678. // make sure we can re-open it.
  2679. ASSERT_OK(TryReopenWithColumnFamilies({"default", "pikachu"}, options));
  2680. {
  2681. std::vector<std::string> files;
  2682. int num_wal_files = 0;
  2683. ASSERT_OK(env_->GetChildren(dbname_, &files));
  2684. for (const auto& file : files) {
  2685. uint64_t number = 0;
  2686. FileType type = kWalFile;
  2687. if (ParseFileName(file, &number, &type) && type == kWalFile) {
  2688. num_wal_files++;
  2689. }
  2690. }
  2691. ASSERT_EQ(num_wal_files, 1);
  2692. }
  2693. }
  2694. TEST_F(DBWALTest, RecoveryFlushSwitchWALOnEmptyMemtable) {
  2695. Options options = CurrentOptions();
  2696. auto fault_fs = std::make_shared<FaultInjectionTestFS>(FileSystem::Default());
  2697. std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
  2698. options.env = fault_fs_env.get();
  2699. options.avoid_flush_during_shutdown = true;
  2700. DestroyAndReopen(options);
  2701. // Make sure the memtable switch in recovery flush happened after test checks
  2702. // the memtable is empty.
  2703. SyncPoint::GetInstance()->LoadDependency(
  2704. {{"DBWALTest.RecoveryFlushSwitchWALOnEmptyMemtable:"
  2705. "AfterCheckMemtableEmpty",
  2706. "RecoverFromRetryableBGIOError:BeforeStart"}});
  2707. SyncPoint::GetInstance()->EnableProcessing();
  2708. fault_fs->SetThreadLocalErrorContext(
  2709. FaultInjectionIOType::kMetadataWrite, 7 /* seed*/, 1 /* one_in */,
  2710. true /* retryable */, false /* has_data_loss*/);
  2711. fault_fs->EnableThreadLocalErrorInjection(
  2712. FaultInjectionIOType::kMetadataWrite);
  2713. WriteOptions wo;
  2714. wo.sync = true;
  2715. Status s = Put("k", "old_v", wo);
  2716. ASSERT_TRUE(s.IsIOError());
  2717. // To verify the key is not in memtable nor SST
  2718. ASSERT_TRUE(static_cast<ColumnFamilyHandleImpl*>(db_->DefaultColumnFamily())
  2719. ->cfd()
  2720. ->mem()
  2721. ->IsEmpty());
  2722. ASSERT_EQ("NOT_FOUND", Get("k"));
  2723. TEST_SYNC_POINT(
  2724. "DBWALTest.RecoveryFlushSwitchWALOnEmptyMemtable:"
  2725. "AfterCheckMemtableEmpty");
  2726. SyncPoint::GetInstance()->DisableProcessing();
  2727. fault_fs->DisableThreadLocalErrorInjection(
  2728. FaultInjectionIOType::kMetadataWrite);
  2729. // Keep trying write until recovery of the previous IO error finishes
  2730. while (!s.ok()) {
  2731. options.env->SleepForMicroseconds(1000);
  2732. s = Put("k", "new_v");
  2733. }
  2734. // If recovery flush didn't switch WAL, we will end up having two duplicate
  2735. // WAL entries with same seqno and same key that violate assertion during WAL
  2736. // recovery and fail DB reopen
  2737. options.avoid_flush_during_recovery = false;
  2738. Reopen(options);
  2739. ASSERT_EQ("new_v", Get("k"));
  2740. Destroy(options);
  2741. }
  2742. TEST_F(DBWALTest, WALWriteErrorNoRecovery) {
  2743. Options options = CurrentOptions();
  2744. auto fault_fs = std::make_shared<FaultInjectionTestFS>(FileSystem::Default());
  2745. std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
  2746. options.env = fault_fs_env.get();
  2747. options.manual_wal_flush = true;
  2748. DestroyAndReopen(options);
  2749. fault_fs->SetThreadLocalErrorContext(
  2750. FaultInjectionIOType::kWrite, 7 /* seed*/, 1 /* one_in */,
  2751. true /* retryable */, false /* has_data_loss*/);
  2752. fault_fs->EnableThreadLocalErrorInjection(FaultInjectionIOType::kWrite);
  2753. ASSERT_OK(Put("k", "v"));
  2754. Status s;
  2755. s = db_->FlushWAL(false);
  2756. ASSERT_TRUE(s.IsIOError());
  2757. s = dbfull()->TEST_GetBGError();
  2758. ASSERT_EQ(s.severity(), Status::Severity::kFatalError);
  2759. ASSERT_FALSE(dbfull()->TEST_IsRecoveryInProgress());
  2760. fault_fs->DisableThreadLocalErrorInjection(FaultInjectionIOType::kWrite);
  2761. Destroy(options);
  2762. }
  2763. } // namespace ROCKSDB_NAMESPACE
  2764. int main(int argc, char** argv) {
  2765. ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  2766. ::testing::InitGoogleTest(&argc, argv);
  2767. return RUN_ALL_TESTS();
  2768. }