// db_kv_checksum_test.cc (36 KB)
  1. // Copyright (c) 2020-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include "db/blob/blob_index.h"
  6. #include "db/db_test_util.h"
  7. #include "rocksdb/rocksdb_namespace.h"
  8. namespace ROCKSDB_NAMESPACE {
  // Kinds of single-entry `WriteBatch` operations exercised by the tests in
  // this file. `kNum` is a sentinel that equals the number of real operation
  // kinds and serves as the exclusive upper bound for `::testing::Range()`.
  enum class WriteBatchOpType {
    kPut = 0,
    kDelete = 1,
    kSingleDelete = 2,
    kMerge = 3,
    kPutEntity = 4,
    kDeleteRange = 5,
    kNum = 6,
  };
  18. // Integer addition is needed for `::testing::Range()` to take the enum type.
  19. WriteBatchOpType operator+(WriteBatchOpType lhs, const int rhs) {
  20. using T = std::underlying_type<WriteBatchOpType>::type;
  21. return static_cast<WriteBatchOpType>(static_cast<T>(lhs) + rhs);
  22. }
  // How a test hands its `WriteBatch` to the DB. `kNum` is a sentinel used as
  // the exclusive upper bound for `::testing::Range()`.
  enum class WriteMode {
    // `Write()` a `WriteBatch` constructed with `protection_bytes_per_key = 0`
    // and `WriteOptions::protection_bytes_per_key = 0`
    kWriteUnprotectedBatch = 0,
    // `Write()` a `WriteBatch` constructed with `protection_bytes_per_key > 0`.
    kWriteProtectedBatch = 1,
    // `Write()` a `WriteBatch` constructed with `protection_bytes_per_key == 0`.
    // Protection is enabled via `WriteOptions::protection_bytes_per_key > 0`.
    kWriteOptionProtectedBatch = 2,
    // TODO(ajkr): add a mode that uses `Write()` wrappers, e.g., `Put()`.
    kNum = 3,
  };
  35. // Integer addition is needed for `::testing::Range()` to take the enum type.
  36. WriteMode operator+(WriteMode lhs, const int rhs) {
  37. using T = std::underlying_type<WriteMode>::type;
  38. return static_cast<WriteMode>(static_cast<T>(lhs) + rhs);
  39. }
  40. std::pair<WriteBatch, Status> GetWriteBatch(ColumnFamilyHandle* cf_handle,
  41. size_t protection_bytes_per_key,
  42. WriteBatchOpType op_type) {
  43. Status s;
  44. WriteBatch wb(0 /* reserved_bytes */, 0 /* max_bytes */,
  45. protection_bytes_per_key, 0 /* default_cf_ts_sz */);
  46. switch (op_type) {
  47. case WriteBatchOpType::kPut:
  48. s = wb.Put(cf_handle, "key", "val");
  49. break;
  50. case WriteBatchOpType::kDelete:
  51. s = wb.Delete(cf_handle, "key");
  52. break;
  53. case WriteBatchOpType::kSingleDelete:
  54. s = wb.SingleDelete(cf_handle, "key");
  55. break;
  56. case WriteBatchOpType::kDeleteRange:
  57. s = wb.DeleteRange(cf_handle, "begin", "end");
  58. break;
  59. case WriteBatchOpType::kMerge:
  60. s = wb.Merge(cf_handle, "key", "val");
  61. break;
  62. case WriteBatchOpType::kPutEntity:
  63. s = wb.PutEntity(cf_handle, "key",
  64. {{"attr_name1", "foo"}, {"attr_name2", "bar"}});
  65. break;
  66. case WriteBatchOpType::kNum:
  67. assert(false);
  68. }
  69. return {std::move(wb), std::move(s)};
  70. }
  71. class DbKvChecksumTestBase : public DBTestBase {
  72. public:
  73. DbKvChecksumTestBase(const std::string& path, bool env_do_fsync)
  74. : DBTestBase(path, env_do_fsync) {}
  75. ColumnFamilyHandle* GetCFHandleToUse(ColumnFamilyHandle* column_family,
  76. WriteBatchOpType op_type) const {
  77. // Note: PutEntity cannot be called without column family
  78. if (op_type == WriteBatchOpType::kPutEntity && !column_family) {
  79. return db_->DefaultColumnFamily();
  80. }
  81. return column_family;
  82. }
  83. };
  84. class DbKvChecksumTest
  85. : public DbKvChecksumTestBase,
  86. public ::testing::WithParamInterface<
  87. std::tuple<WriteBatchOpType, char, WriteMode,
  88. uint32_t /* memtable_protection_bytes_per_key */>> {
  89. public:
  90. DbKvChecksumTest()
  91. : DbKvChecksumTestBase("db_kv_checksum_test", /*env_do_fsync=*/false) {
  92. op_type_ = std::get<0>(GetParam());
  93. corrupt_byte_addend_ = std::get<1>(GetParam());
  94. write_mode_ = std::get<2>(GetParam());
  95. memtable_protection_bytes_per_key_ = std::get<3>(GetParam());
  96. }
  97. Status ExecuteWrite(ColumnFamilyHandle* cf_handle) {
  98. switch (write_mode_) {
  99. case WriteMode::kWriteUnprotectedBatch: {
  100. auto batch_and_status =
  101. GetWriteBatch(GetCFHandleToUse(cf_handle, op_type_),
  102. 0 /* protection_bytes_per_key */, op_type_);
  103. assert(batch_and_status.second.ok());
  104. // Default write option has protection_bytes_per_key = 0
  105. return db_->Write(WriteOptions(), &batch_and_status.first);
  106. }
  107. case WriteMode::kWriteProtectedBatch: {
  108. auto batch_and_status =
  109. GetWriteBatch(GetCFHandleToUse(cf_handle, op_type_),
  110. 8 /* protection_bytes_per_key */, op_type_);
  111. assert(batch_and_status.second.ok());
  112. return db_->Write(WriteOptions(), &batch_and_status.first);
  113. }
  114. case WriteMode::kWriteOptionProtectedBatch: {
  115. auto batch_and_status =
  116. GetWriteBatch(GetCFHandleToUse(cf_handle, op_type_),
  117. 0 /* protection_bytes_per_key */, op_type_);
  118. assert(batch_and_status.second.ok());
  119. WriteOptions write_opts;
  120. write_opts.protection_bytes_per_key = 8;
  121. return db_->Write(write_opts, &batch_and_status.first);
  122. }
  123. case WriteMode::kNum:
  124. assert(false);
  125. }
  126. return Status::NotSupported("WriteMode " +
  127. std::to_string(static_cast<int>(write_mode_)));
  128. }
  129. void CorruptNextByteCallBack(void* arg) {
  130. Slice encoded = *static_cast<Slice*>(arg);
  131. if (entry_len_ == std::numeric_limits<size_t>::max()) {
  132. // We learn the entry size on the first attempt
  133. entry_len_ = encoded.size();
  134. }
  135. char* buf = const_cast<char*>(encoded.data());
  136. buf[corrupt_byte_offset_] += corrupt_byte_addend_;
  137. ++corrupt_byte_offset_;
  138. }
  139. bool MoreBytesToCorrupt() { return corrupt_byte_offset_ < entry_len_; }
  140. protected:
  141. WriteBatchOpType op_type_;
  142. char corrupt_byte_addend_;
  143. WriteMode write_mode_;
  144. uint32_t memtable_protection_bytes_per_key_;
  145. size_t corrupt_byte_offset_ = 0;
  146. size_t entry_len_ = std::numeric_limits<size_t>::max();
  147. };
  148. std::string GetOpTypeString(const WriteBatchOpType& op_type) {
  149. switch (op_type) {
  150. case WriteBatchOpType::kPut:
  151. return "Put";
  152. case WriteBatchOpType::kDelete:
  153. return "Delete";
  154. case WriteBatchOpType::kSingleDelete:
  155. return "SingleDelete";
  156. case WriteBatchOpType::kDeleteRange:
  157. return "DeleteRange";
  158. case WriteBatchOpType::kMerge:
  159. return "Merge";
  160. case WriteBatchOpType::kPutEntity:
  161. return "PutEntity";
  162. case WriteBatchOpType::kNum:
  163. assert(false);
  164. }
  165. assert(false);
  166. return "";
  167. }
  168. std::string GetWriteModeString(const WriteMode& mode) {
  169. switch (mode) {
  170. case WriteMode::kWriteUnprotectedBatch:
  171. return "WriteUnprotectedBatch";
  172. case WriteMode::kWriteProtectedBatch:
  173. return "WriteProtectedBatch";
  174. case WriteMode::kWriteOptionProtectedBatch:
  175. return "kWriteOptionProtectedBatch";
  176. case WriteMode::kNum:
  177. assert(false);
  178. }
  179. return "";
  180. }
  181. INSTANTIATE_TEST_CASE_P(
  182. DbKvChecksumTest, DbKvChecksumTest,
  183. ::testing::Combine(::testing::Range(static_cast<WriteBatchOpType>(0),
  184. WriteBatchOpType::kNum),
  185. ::testing::Values(2, 103, 251),
  186. ::testing::Range(WriteMode::kWriteProtectedBatch,
  187. WriteMode::kNum),
  188. ::testing::Values(0)),
  189. [](const testing::TestParamInfo<
  190. std::tuple<WriteBatchOpType, char, WriteMode, uint32_t>>& args) {
  191. std::ostringstream oss;
  192. oss << GetOpTypeString(std::get<0>(args.param)) << "Add"
  193. << static_cast<int>(
  194. static_cast<unsigned char>(std::get<1>(args.param)))
  195. << GetWriteModeString(std::get<2>(args.param))
  196. << static_cast<uint32_t>(std::get<3>(args.param));
  197. return oss.str();
  198. });
  199. // TODO(ajkr): add a test that corrupts the `WriteBatch` contents. Such
  200. // corruptions should only be detectable in `WriteMode::kWriteProtectedBatch`.
  201. TEST_P(DbKvChecksumTest, MemTableAddCorrupted) {
  202. // This test repeatedly attempts to write `WriteBatch`es containing a single
  203. // entry of type `op_type_`. Each attempt has one byte corrupted in its
  204. // memtable entry by adding `corrupt_byte_addend_` to its original value. The
  205. // test repeats until an attempt has been made on each byte in the encoded
  206. // memtable entry. All attempts are expected to fail with `Status::Corruption`
  207. SyncPoint::GetInstance()->SetCallBack(
  208. "MemTable::Add:Encoded",
  209. std::bind(&DbKvChecksumTest::CorruptNextByteCallBack, this,
  210. std::placeholders::_1));
  211. while (MoreBytesToCorrupt()) {
  212. // Failed memtable insert always leads to read-only mode, so we have to
  213. // reopen for every attempt.
  214. Options options = CurrentOptions();
  215. if (op_type_ == WriteBatchOpType::kMerge) {
  216. options.merge_operator = MergeOperators::CreateStringAppendOperator();
  217. }
  218. Reopen(options);
  219. SyncPoint::GetInstance()->EnableProcessing();
  220. ASSERT_TRUE(ExecuteWrite(nullptr /* cf_handle */).IsCorruption());
  221. SyncPoint::GetInstance()->DisableProcessing();
  222. // In case the above callback is not invoked, this test will run
  223. // numeric_limits<size_t>::max() times until it reports an error (or will
  224. // exhaust disk space). Added this assert to report error early.
  225. ASSERT_TRUE(entry_len_ < std::numeric_limits<size_t>::max());
  226. }
  227. }
  228. TEST_P(DbKvChecksumTest, MemTableAddWithColumnFamilyCorrupted) {
  229. // This test repeatedly attempts to write `WriteBatch`es containing a single
  230. // entry of type `op_type_` to a non-default column family. Each attempt has
  231. // one byte corrupted in its memtable entry by adding `corrupt_byte_addend_`
  232. // to its original value. The test repeats until an attempt has been made on
  233. // each byte in the encoded memtable entry. All attempts are expected to fail
  234. // with `Status::Corruption`.
  235. Options options = CurrentOptions();
  236. if (op_type_ == WriteBatchOpType::kMerge) {
  237. options.merge_operator = MergeOperators::CreateStringAppendOperator();
  238. }
  239. CreateAndReopenWithCF({"pikachu"}, options);
  240. SyncPoint::GetInstance()->SetCallBack(
  241. "MemTable::Add:Encoded",
  242. std::bind(&DbKvChecksumTest::CorruptNextByteCallBack, this,
  243. std::placeholders::_1));
  244. while (MoreBytesToCorrupt()) {
  245. // Failed memtable insert always leads to read-only mode, so we have to
  246. // reopen for every attempt.
  247. ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
  248. SyncPoint::GetInstance()->EnableProcessing();
  249. ASSERT_TRUE(ExecuteWrite(handles_[1]).IsCorruption());
  250. SyncPoint::GetInstance()->DisableProcessing();
  251. // In case the above callback is not invoked, this test will run
  252. // numeric_limits<size_t>::max() times until it reports an error (or will
  253. // exhaust disk space). Added this assert to report error early.
  254. ASSERT_TRUE(entry_len_ < std::numeric_limits<size_t>::max());
  255. }
  256. }
  257. TEST_P(DbKvChecksumTest, NoCorruptionCase) {
  258. // If this test fails, we may have found a piece of malfunctioned hardware
  259. auto batch_and_status =
  260. GetWriteBatch(GetCFHandleToUse(nullptr, op_type_),
  261. 8 /* protection_bytes_per_key */, op_type_);
  262. ASSERT_OK(batch_and_status.second);
  263. ASSERT_OK(batch_and_status.first.VerifyChecksum());
  264. }
  265. TEST_P(DbKvChecksumTest, WriteToWALCorrupted) {
  266. // This test repeatedly attempts to write `WriteBatch`es containing a single
  267. // entry of type `op_type_`. Each attempt has one byte corrupted by adding
  268. // `corrupt_byte_addend_` to its original value. The test repeats until an
  269. // attempt has been made on each byte in the encoded write batch. All attempts
  270. // are expected to fail with `Status::Corruption`
  271. Options options = CurrentOptions();
  272. if (op_type_ == WriteBatchOpType::kMerge) {
  273. options.merge_operator = MergeOperators::CreateStringAppendOperator();
  274. }
  275. SyncPoint::GetInstance()->SetCallBack(
  276. "DBImpl::WriteToWAL:log_entry",
  277. std::bind(&DbKvChecksumTest::CorruptNextByteCallBack, this,
  278. std::placeholders::_1));
  279. // First 8 bytes are for sequence number which is not protected in write batch
  280. corrupt_byte_offset_ = 8;
  281. while (MoreBytesToCorrupt()) {
  282. // Corrupted write batch leads to read-only mode, so we have to
  283. // reopen for every attempt.
  284. Reopen(options);
  285. auto log_size_pre_write = dbfull()->TEST_wals_total_size();
  286. SyncPoint::GetInstance()->EnableProcessing();
  287. ASSERT_TRUE(ExecuteWrite(nullptr /* cf_handle */).IsCorruption());
  288. // Confirm that nothing was written to WAL
  289. ASSERT_EQ(log_size_pre_write, dbfull()->TEST_wals_total_size());
  290. ASSERT_TRUE(dbfull()->TEST_GetBGError().IsCorruption());
  291. SyncPoint::GetInstance()->DisableProcessing();
  292. // In case the above callback is not invoked, this test will run
  293. // numeric_limits<size_t>::max() times until it reports an error (or will
  294. // exhaust disk space). Added this assert to report error early.
  295. ASSERT_TRUE(entry_len_ < std::numeric_limits<size_t>::max());
  296. }
  297. }
  298. TEST_P(DbKvChecksumTest, WriteToWALWithColumnFamilyCorrupted) {
  299. // This test repeatedly attempts to write `WriteBatch`es containing a single
  300. // entry of type `op_type_`. Each attempt has one byte corrupted by adding
  301. // `corrupt_byte_addend_` to its original value. The test repeats until an
  302. // attempt has been made on each byte in the encoded write batch. All attempts
  303. // are expected to fail with `Status::Corruption`
  304. Options options = CurrentOptions();
  305. if (op_type_ == WriteBatchOpType::kMerge) {
  306. options.merge_operator = MergeOperators::CreateStringAppendOperator();
  307. }
  308. CreateAndReopenWithCF({"pikachu"}, options);
  309. SyncPoint::GetInstance()->SetCallBack(
  310. "DBImpl::WriteToWAL:log_entry",
  311. std::bind(&DbKvChecksumTest::CorruptNextByteCallBack, this,
  312. std::placeholders::_1));
  313. // First 8 bytes are for sequence number which is not protected in write batch
  314. corrupt_byte_offset_ = 8;
  315. while (MoreBytesToCorrupt()) {
  316. // Corrupted write batch leads to read-only mode, so we have to
  317. // reopen for every attempt.
  318. ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
  319. auto log_size_pre_write = dbfull()->TEST_wals_total_size();
  320. SyncPoint::GetInstance()->EnableProcessing();
  321. ASSERT_TRUE(ExecuteWrite(nullptr /* cf_handle */).IsCorruption());
  322. // Confirm that nothing was written to WAL
  323. ASSERT_EQ(log_size_pre_write, dbfull()->TEST_wals_total_size());
  324. ASSERT_TRUE(dbfull()->TEST_GetBGError().IsCorruption());
  325. SyncPoint::GetInstance()->DisableProcessing();
  326. // In case the above callback is not invoked, this test will run
  327. // numeric_limits<size_t>::max() times until it reports an error (or will
  328. // exhaust disk space). Added this assert to report error early.
  329. ASSERT_TRUE(entry_len_ < std::numeric_limits<size_t>::max());
  330. }
  331. }
  332. class DbKvChecksumTestMergedBatch
  333. : public DbKvChecksumTestBase,
  334. public ::testing::WithParamInterface<
  335. std::tuple<WriteBatchOpType, WriteBatchOpType, char>> {
  336. public:
  337. DbKvChecksumTestMergedBatch()
  338. : DbKvChecksumTestBase("db_kv_checksum_test", /*env_do_fsync=*/false) {
  339. op_type1_ = std::get<0>(GetParam());
  340. op_type2_ = std::get<1>(GetParam());
  341. corrupt_byte_addend_ = std::get<2>(GetParam());
  342. }
  343. protected:
  344. WriteBatchOpType op_type1_;
  345. WriteBatchOpType op_type2_;
  346. char corrupt_byte_addend_;
  347. };
  348. void CorruptWriteBatch(Slice* content, size_t offset,
  349. char corrupt_byte_addend) {
  350. ASSERT_TRUE(offset < content->size());
  351. char* buf = const_cast<char*>(content->data());
  352. buf[offset] += corrupt_byte_addend;
  353. }
  354. TEST_P(DbKvChecksumTestMergedBatch, NoCorruptionCase) {
  355. // Veirfy write batch checksum after write batch append
  356. auto batch1 = GetWriteBatch(GetCFHandleToUse(nullptr, op_type1_),
  357. 8 /* protection_bytes_per_key */, op_type1_);
  358. ASSERT_OK(batch1.second);
  359. auto batch2 = GetWriteBatch(GetCFHandleToUse(nullptr, op_type2_),
  360. 8 /* protection_bytes_per_key */, op_type2_);
  361. ASSERT_OK(batch2.second);
  362. ASSERT_OK(WriteBatchInternal::Append(&batch1.first, &batch2.first));
  363. ASSERT_OK(batch1.first.VerifyChecksum());
  364. }
  365. TEST_P(DbKvChecksumTestMergedBatch, WriteToWALCorrupted) {
  366. // This test has two writers repeatedly attempt to write `WriteBatch`es
  367. // containing a single entry of type op_type1_ and op_type2_ respectively. The
  368. // leader of the write group writes the batch containinng the entry of type
  369. // op_type1_. One byte of the pre-merged write batches is corrupted by adding
  370. // `corrupt_byte_addend_` to the batch's original value during each attempt.
  371. // The test repeats until an attempt has been made on each byte in both
  372. // pre-merged write batches. All attempts are expected to fail with
  373. // `Status::Corruption`.
  374. Options options = CurrentOptions();
  375. if (op_type1_ == WriteBatchOpType::kMerge ||
  376. op_type2_ == WriteBatchOpType::kMerge) {
  377. options.merge_operator = MergeOperators::CreateStringAppendOperator();
  378. }
  379. auto leader_batch_and_status =
  380. GetWriteBatch(GetCFHandleToUse(nullptr, op_type1_),
  381. 8 /* protection_bytes_per_key */, op_type1_);
  382. ASSERT_OK(leader_batch_and_status.second);
  383. auto follower_batch_and_status =
  384. GetWriteBatch(GetCFHandleToUse(nullptr, op_type2_),
  385. 8 /* protection_bytes_per_key */, op_type2_);
  386. size_t leader_batch_size = leader_batch_and_status.first.GetDataSize();
  387. size_t total_bytes =
  388. leader_batch_size + follower_batch_and_status.first.GetDataSize();
  389. // First 8 bytes are for sequence number which is not protected in write batch
  390. size_t corrupt_byte_offset = 8;
  391. std::atomic<bool> follower_joined{false};
  392. std::atomic<int> leader_count{0};
  393. port::Thread follower_thread;
  394. // This callback should only be called by the leader thread
  395. SyncPoint::GetInstance()->SetCallBack(
  396. "WriteThread::JoinBatchGroup:Wait2", [&](void* arg_leader) {
  397. auto* leader = static_cast<WriteThread::Writer*>(arg_leader);
  398. ASSERT_EQ(leader->state, WriteThread::STATE_GROUP_LEADER);
  399. // This callback should only be called by the follower thread
  400. SyncPoint::GetInstance()->SetCallBack(
  401. "WriteThread::JoinBatchGroup:Wait", [&](void* arg_follower) {
  402. auto* follower = static_cast<WriteThread::Writer*>(arg_follower);
  403. // The leader thread will wait on this bool and hence wait until
  404. // this writer joins the write group
  405. ASSERT_NE(follower->state, WriteThread::STATE_GROUP_LEADER);
  406. if (corrupt_byte_offset >= leader_batch_size) {
  407. Slice batch_content = follower->batch->Data();
  408. CorruptWriteBatch(&batch_content,
  409. corrupt_byte_offset - leader_batch_size,
  410. corrupt_byte_addend_);
  411. }
  412. // Leader busy waits on this flag
  413. follower_joined = true;
  414. // So the follower does not enter the outer callback at
  415. // WriteThread::JoinBatchGroup:Wait2
  416. SyncPoint::GetInstance()->DisableProcessing();
  417. });
  418. // Start the other writer thread which will join the write group as
  419. // follower
  420. follower_thread = port::Thread([&]() {
  421. follower_batch_and_status =
  422. GetWriteBatch(GetCFHandleToUse(nullptr, op_type2_),
  423. 8 /* protection_bytes_per_key */, op_type2_);
  424. ASSERT_OK(follower_batch_and_status.second);
  425. ASSERT_TRUE(
  426. db_->Write(WriteOptions(), &follower_batch_and_status.first)
  427. .IsCorruption());
  428. });
  429. ASSERT_EQ(leader->batch->GetDataSize(), leader_batch_size);
  430. if (corrupt_byte_offset < leader_batch_size) {
  431. Slice batch_content = leader->batch->Data();
  432. CorruptWriteBatch(&batch_content, corrupt_byte_offset,
  433. corrupt_byte_addend_);
  434. }
  435. leader_count++;
  436. while (!follower_joined) {
  437. // busy waiting
  438. }
  439. });
  440. while (corrupt_byte_offset < total_bytes) {
  441. // Reopen DB since it failed WAL write which lead to read-only mode
  442. Reopen(options);
  443. SyncPoint::GetInstance()->EnableProcessing();
  444. auto log_size_pre_write = dbfull()->TEST_wals_total_size();
  445. leader_batch_and_status =
  446. GetWriteBatch(GetCFHandleToUse(nullptr, op_type1_),
  447. 8 /* protection_bytes_per_key */, op_type1_);
  448. ASSERT_OK(leader_batch_and_status.second);
  449. ASSERT_TRUE(db_->Write(WriteOptions(), &leader_batch_and_status.first)
  450. .IsCorruption());
  451. follower_thread.join();
  452. // Prevent leader thread from entering this callback
  453. SyncPoint::GetInstance()->ClearCallBack("WriteThread::JoinBatchGroup:Wait");
  454. ASSERT_EQ(1, leader_count);
  455. // Nothing should have been written to WAL
  456. ASSERT_EQ(log_size_pre_write, dbfull()->TEST_wals_total_size());
  457. ASSERT_TRUE(dbfull()->TEST_GetBGError().IsCorruption());
  458. corrupt_byte_offset++;
  459. if (corrupt_byte_offset == leader_batch_size) {
  460. // skip over the sequence number part of follower's write batch
  461. corrupt_byte_offset += 8;
  462. }
  463. follower_joined = false;
  464. leader_count = 0;
  465. }
  466. SyncPoint::GetInstance()->DisableProcessing();
  467. }
  468. TEST_P(DbKvChecksumTestMergedBatch, WriteToWALWithColumnFamilyCorrupted) {
  469. // This test has two writers repeatedly attempt to write `WriteBatch`es
  470. // containing a single entry of type op_type1_ and op_type2_ respectively. The
  471. // leader of the write group writes the batch containinng the entry of type
  472. // op_type1_. One byte of the pre-merged write batches is corrupted by adding
  473. // `corrupt_byte_addend_` to the batch's original value during each attempt.
  474. // The test repeats until an attempt has been made on each byte in both
  475. // pre-merged write batches. All attempts are expected to fail with
  476. // `Status::Corruption`.
  477. Options options = CurrentOptions();
  478. if (op_type1_ == WriteBatchOpType::kMerge ||
  479. op_type2_ == WriteBatchOpType::kMerge) {
  480. options.merge_operator = MergeOperators::CreateStringAppendOperator();
  481. }
  482. CreateAndReopenWithCF({"ramen"}, options);
  483. auto leader_batch_and_status =
  484. GetWriteBatch(GetCFHandleToUse(handles_[1], op_type1_),
  485. 8 /* protection_bytes_per_key */, op_type1_);
  486. ASSERT_OK(leader_batch_and_status.second);
  487. auto follower_batch_and_status =
  488. GetWriteBatch(GetCFHandleToUse(handles_[1], op_type2_),
  489. 8 /* protection_bytes_per_key */, op_type2_);
  490. size_t leader_batch_size = leader_batch_and_status.first.GetDataSize();
  491. size_t total_bytes =
  492. leader_batch_size + follower_batch_and_status.first.GetDataSize();
  493. // First 8 bytes are for sequence number which is not protected in write batch
  494. size_t corrupt_byte_offset = 8;
  495. std::atomic<bool> follower_joined{false};
  496. std::atomic<int> leader_count{0};
  497. port::Thread follower_thread;
  498. // This callback should only be called by the leader thread
  499. SyncPoint::GetInstance()->SetCallBack(
  500. "WriteThread::JoinBatchGroup:Wait2", [&](void* arg_leader) {
  501. auto* leader = static_cast<WriteThread::Writer*>(arg_leader);
  502. ASSERT_EQ(leader->state, WriteThread::STATE_GROUP_LEADER);
  503. // This callback should only be called by the follower thread
  504. SyncPoint::GetInstance()->SetCallBack(
  505. "WriteThread::JoinBatchGroup:Wait", [&](void* arg_follower) {
  506. auto* follower = static_cast<WriteThread::Writer*>(arg_follower);
  507. // The leader thread will wait on this bool and hence wait until
  508. // this writer joins the write group
  509. ASSERT_NE(follower->state, WriteThread::STATE_GROUP_LEADER);
  510. if (corrupt_byte_offset >= leader_batch_size) {
  511. Slice batch_content =
  512. WriteBatchInternal::Contents(follower->batch);
  513. CorruptWriteBatch(&batch_content,
  514. corrupt_byte_offset - leader_batch_size,
  515. corrupt_byte_addend_);
  516. }
  517. follower_joined = true;
  518. // So the follower does not enter the outer callback at
  519. // WriteThread::JoinBatchGroup:Wait2
  520. SyncPoint::GetInstance()->DisableProcessing();
  521. });
  522. // Start the other writer thread which will join the write group as
  523. // follower
  524. follower_thread = port::Thread([&]() {
  525. follower_batch_and_status =
  526. GetWriteBatch(GetCFHandleToUse(handles_[1], op_type2_),
  527. 8 /* protection_bytes_per_key */, op_type2_);
  528. ASSERT_OK(follower_batch_and_status.second);
  529. ASSERT_TRUE(
  530. db_->Write(WriteOptions(), &follower_batch_and_status.first)
  531. .IsCorruption());
  532. });
  533. ASSERT_EQ(leader->batch->GetDataSize(), leader_batch_size);
  534. if (corrupt_byte_offset < leader_batch_size) {
  535. Slice batch_content = WriteBatchInternal::Contents(leader->batch);
  536. CorruptWriteBatch(&batch_content, corrupt_byte_offset,
  537. corrupt_byte_addend_);
  538. }
  539. leader_count++;
  540. while (!follower_joined) {
  541. // busy waiting
  542. }
  543. });
  544. SyncPoint::GetInstance()->EnableProcessing();
  545. while (corrupt_byte_offset < total_bytes) {
  546. // Reopen DB since it failed WAL write which lead to read-only mode
  547. ReopenWithColumnFamilies({kDefaultColumnFamilyName, "ramen"}, options);
  548. SyncPoint::GetInstance()->EnableProcessing();
  549. auto log_size_pre_write = dbfull()->TEST_wals_total_size();
  550. leader_batch_and_status =
  551. GetWriteBatch(GetCFHandleToUse(handles_[1], op_type1_),
  552. 8 /* protection_bytes_per_key */, op_type1_);
  553. ASSERT_OK(leader_batch_and_status.second);
  554. ASSERT_TRUE(db_->Write(WriteOptions(), &leader_batch_and_status.first)
  555. .IsCorruption());
  556. follower_thread.join();
  557. // Prevent leader thread from entering this callback
  558. SyncPoint::GetInstance()->ClearCallBack("WriteThread::JoinBatchGroup:Wait");
  559. ASSERT_EQ(1, leader_count);
  560. // Nothing should have been written to WAL
  561. ASSERT_EQ(log_size_pre_write, dbfull()->TEST_wals_total_size());
  562. ASSERT_TRUE(dbfull()->TEST_GetBGError().IsCorruption());
  563. corrupt_byte_offset++;
  564. if (corrupt_byte_offset == leader_batch_size) {
  565. // skip over the sequence number part of follower's write batch
  566. corrupt_byte_offset += 8;
  567. }
  568. follower_joined = false;
  569. leader_count = 0;
  570. }
  571. SyncPoint::GetInstance()->DisableProcessing();
  572. }
  573. INSTANTIATE_TEST_CASE_P(
  574. DbKvChecksumTestMergedBatch, DbKvChecksumTestMergedBatch,
  575. ::testing::Combine(::testing::Range(static_cast<WriteBatchOpType>(0),
  576. WriteBatchOpType::kNum),
  577. ::testing::Range(static_cast<WriteBatchOpType>(0),
  578. WriteBatchOpType::kNum),
  579. ::testing::Values(2, 103, 251)),
  580. [](const testing::TestParamInfo<
  581. std::tuple<WriteBatchOpType, WriteBatchOpType, char>>& args) {
  582. std::ostringstream oss;
  583. oss << GetOpTypeString(std::get<0>(args.param))
  584. << GetOpTypeString(std::get<1>(args.param)) << "Add"
  585. << static_cast<int>(
  586. static_cast<unsigned char>(std::get<2>(args.param)));
  587. return oss.str();
  588. });
  589. // TODO: add test for transactions
  590. // TODO: add test for corrupted write batch with WAL disabled
  591. class DbKVChecksumWALToWriteBatchTest : public DBTestBase {
  592. public:
  593. DbKVChecksumWALToWriteBatchTest()
  594. : DBTestBase("db_kv_checksum_test", /*env_do_fsync=*/false) {}
  595. };
  596. TEST_F(DbKVChecksumWALToWriteBatchTest, WriteBatchChecksumHandoff) {
  597. Options options = CurrentOptions();
  598. Reopen(options);
  599. ASSERT_OK(db_->Put(WriteOptions(), "key", "val"));
  600. std::string content;
  601. SyncPoint::GetInstance()->SetCallBack(
  602. "DBImpl::RecoverLogFiles:BeforeUpdateProtectionInfo:batch",
  603. [&](void* batch_ptr) {
  604. WriteBatch* batch = static_cast<WriteBatch*>(batch_ptr);
  605. content.assign(batch->Data().data(), batch->GetDataSize());
  606. Slice batch_content = batch->Data();
  607. // Corrupt first bit
  608. CorruptWriteBatch(&batch_content, 0, 1);
  609. });
  610. SyncPoint::GetInstance()->SetCallBack(
  611. "DBImpl::RecoverLogFiles:BeforeUpdateProtectionInfo:checksum",
  612. [&](void* checksum_ptr) {
  613. // Verify that checksum is produced on the batch content
  614. uint64_t checksum = *static_cast<uint64_t*>(checksum_ptr);
  615. ASSERT_EQ(checksum, XXH3_64bits(content.data(), content.size()));
  616. });
  617. SyncPoint::GetInstance()->EnableProcessing();
  618. ASSERT_TRUE(TryReopen(options).IsCorruption());
  619. SyncPoint::GetInstance()->DisableProcessing();
  620. };
  621. // TODO (cbi): add DeleteRange coverage once it is implemented
  622. class DbMemtableKVChecksumTest : public DbKvChecksumTest {
  623. public:
  624. DbMemtableKVChecksumTest() : DbKvChecksumTest() {}
  625. protected:
  626. const size_t kValueLenOffset = 12;
  627. // Indices in the memtable entry that we will not corrupt.
  628. // For memtable entry format, see comments in MemTable::Add().
  629. // We do not corrupt key length and value length fields in this test
  630. // case since it causes segfault and ASAN will complain.
  631. // For this test case, key and value are all of length 3, so
  632. // key length field is at index 0 and value length field is at index 12.
  633. const std::set<size_t> index_not_to_corrupt{0, kValueLenOffset};
  634. void SkipNotToCorruptEntry() {
  635. if (index_not_to_corrupt.find(corrupt_byte_offset_) !=
  636. index_not_to_corrupt.end()) {
  637. corrupt_byte_offset_++;
  638. }
  639. }
  640. };
  641. INSTANTIATE_TEST_CASE_P(
  642. DbMemtableKVChecksumTest, DbMemtableKVChecksumTest,
  643. ::testing::Combine(::testing::Range(static_cast<WriteBatchOpType>(0),
  644. WriteBatchOpType::kDeleteRange),
  645. ::testing::Values(2, 103, 251),
  646. ::testing::Range(static_cast<WriteMode>(0),
  647. WriteMode::kWriteOptionProtectedBatch),
  648. // skip 1 byte checksum as it makes test flaky
  649. ::testing::Values(2, 4, 8)),
  650. [](const testing::TestParamInfo<
  651. std::tuple<WriteBatchOpType, char, WriteMode, uint32_t>>& args) {
  652. std::ostringstream oss;
  653. oss << GetOpTypeString(std::get<0>(args.param)) << "Add"
  654. << static_cast<int>(
  655. static_cast<unsigned char>(std::get<1>(args.param)))
  656. << GetWriteModeString(std::get<2>(args.param))
  657. << static_cast<uint32_t>(std::get<3>(args.param));
  658. return oss.str();
  659. });
  660. TEST_P(DbMemtableKVChecksumTest, GetWithCorruptAfterMemtableInsert) {
  661. // Record memtable entry size.
  662. // Not corrupting memtable entry here since it will segfault
  663. // or fail some asserts inside memtablerep implementation
  664. // e.g., when key_len is corrupted.
  665. SyncPoint::GetInstance()->SetCallBack(
  666. "MemTable::Add:BeforeReturn:Encoded", [&](void* arg) {
  667. Slice encoded = *static_cast<Slice*>(arg);
  668. entry_len_ = encoded.size();
  669. });
  670. SyncPoint::GetInstance()->SetCallBack(
  671. "Memtable::SaveValue:Found:entry", [&](void* entry) {
  672. char* buf = *static_cast<char**>(entry);
  673. buf[corrupt_byte_offset_] += corrupt_byte_addend_;
  674. ++corrupt_byte_offset_;
  675. });
  676. // Corrupt value only so that MultiGet below can find the key.
  677. corrupt_byte_offset_ = kValueLenOffset + 1;
  678. SyncPoint::GetInstance()->EnableProcessing();
  679. Options options = CurrentOptions();
  680. options.memtable_protection_bytes_per_key =
  681. memtable_protection_bytes_per_key_;
  682. if (op_type_ == WriteBatchOpType::kMerge) {
  683. options.merge_operator = MergeOperators::CreateStringAppendOperator();
  684. }
  685. std::string key = "key";
  686. SkipNotToCorruptEntry();
  687. while (MoreBytesToCorrupt()) {
  688. Reopen(options);
  689. ASSERT_OK(ExecuteWrite(nullptr));
  690. std::string val;
  691. ASSERT_TRUE(db_->Get(ReadOptions(), key, &val).IsCorruption());
  692. std::vector<std::string> vals = {val};
  693. std::vector<Status> statuses = db_->MultiGet(
  694. ReadOptions(), {db_->DefaultColumnFamily()}, {key}, &vals, nullptr);
  695. ASSERT_TRUE(statuses[0].IsCorruption());
  696. Destroy(options);
  697. SkipNotToCorruptEntry();
  698. }
  699. }
  700. TEST_P(DbMemtableKVChecksumTest,
  701. GetWithColumnFamilyCorruptAfterMemtableInsert) {
  702. // Record memtable entry size.
  703. // Not corrupting memtable entry here since it will segfault
  704. // or fail some asserts inside memtablerep implementation
  705. // e.g., when key_len is corrupted.
  706. SyncPoint::GetInstance()->SetCallBack(
  707. "MemTable::Add:BeforeReturn:Encoded", [&](void* arg) {
  708. Slice encoded = *static_cast<Slice*>(arg);
  709. entry_len_ = encoded.size();
  710. });
  711. SyncPoint::GetInstance()->SetCallBack(
  712. "Memtable::SaveValue:Found:entry", [&](void* entry) {
  713. char* buf = *static_cast<char**>(entry);
  714. buf[corrupt_byte_offset_] += corrupt_byte_addend_;
  715. ++corrupt_byte_offset_;
  716. });
  717. SyncPoint::GetInstance()->EnableProcessing();
  718. Options options = CurrentOptions();
  719. options.memtable_protection_bytes_per_key =
  720. memtable_protection_bytes_per_key_;
  721. if (op_type_ == WriteBatchOpType::kMerge) {
  722. options.merge_operator = MergeOperators::CreateStringAppendOperator();
  723. }
  724. SkipNotToCorruptEntry();
  725. while (MoreBytesToCorrupt()) {
  726. Reopen(options);
  727. CreateAndReopenWithCF({"pikachu"}, options);
  728. ASSERT_OK(ExecuteWrite(handles_[1]));
  729. std::string val;
  730. ASSERT_TRUE(
  731. db_->Get(ReadOptions(), handles_[1], "key", &val).IsCorruption());
  732. Destroy(options);
  733. SkipNotToCorruptEntry();
  734. }
  735. }
  736. TEST_P(DbMemtableKVChecksumTest, IteratorWithCorruptAfterMemtableInsert) {
  737. SyncPoint::GetInstance()->SetCallBack(
  738. "MemTable::Add:BeforeReturn:Encoded",
  739. std::bind(&DbKvChecksumTest::CorruptNextByteCallBack, this,
  740. std::placeholders::_1));
  741. SyncPoint::GetInstance()->EnableProcessing();
  742. Options options = CurrentOptions();
  743. options.memtable_protection_bytes_per_key =
  744. memtable_protection_bytes_per_key_;
  745. if (op_type_ == WriteBatchOpType::kMerge) {
  746. options.merge_operator = MergeOperators::CreateStringAppendOperator();
  747. }
  748. SkipNotToCorruptEntry();
  749. while (MoreBytesToCorrupt()) {
  750. Reopen(options);
  751. ASSERT_OK(ExecuteWrite(nullptr));
  752. Iterator* it = db_->NewIterator(ReadOptions());
  753. it->SeekToFirst();
  754. ASSERT_FALSE(it->Valid());
  755. ASSERT_TRUE(it->status().IsCorruption());
  756. delete it;
  757. Destroy(options);
  758. SkipNotToCorruptEntry();
  759. }
  760. }
  761. TEST_P(DbMemtableKVChecksumTest,
  762. IteratorWithColumnFamilyCorruptAfterMemtableInsert) {
  763. SyncPoint::GetInstance()->SetCallBack(
  764. "MemTable::Add:BeforeReturn:Encoded",
  765. std::bind(&DbKvChecksumTest::CorruptNextByteCallBack, this,
  766. std::placeholders::_1));
  767. SyncPoint::GetInstance()->EnableProcessing();
  768. Options options = CurrentOptions();
  769. options.memtable_protection_bytes_per_key =
  770. memtable_protection_bytes_per_key_;
  771. if (op_type_ == WriteBatchOpType::kMerge) {
  772. options.merge_operator = MergeOperators::CreateStringAppendOperator();
  773. }
  774. SkipNotToCorruptEntry();
  775. while (MoreBytesToCorrupt()) {
  776. Reopen(options);
  777. CreateAndReopenWithCF({"pikachu"}, options);
  778. ASSERT_OK(ExecuteWrite(handles_[1]));
  779. Iterator* it = db_->NewIterator(ReadOptions(), handles_[1]);
  780. it->SeekToFirst();
  781. ASSERT_FALSE(it->Valid());
  782. ASSERT_TRUE(it->status().IsCorruption());
  783. delete it;
  784. Destroy(options);
  785. SkipNotToCorruptEntry();
  786. }
  787. }
  788. TEST_P(DbMemtableKVChecksumTest, FlushWithCorruptAfterMemtableInsert) {
  789. SyncPoint::GetInstance()->SetCallBack(
  790. "MemTable::Add:BeforeReturn:Encoded",
  791. std::bind(&DbKvChecksumTest::CorruptNextByteCallBack, this,
  792. std::placeholders::_1));
  793. SyncPoint::GetInstance()->EnableProcessing();
  794. Options options = CurrentOptions();
  795. options.memtable_protection_bytes_per_key =
  796. memtable_protection_bytes_per_key_;
  797. if (op_type_ == WriteBatchOpType::kMerge) {
  798. options.merge_operator = MergeOperators::CreateStringAppendOperator();
  799. }
  800. SkipNotToCorruptEntry();
  801. // Not corruping each byte like other tests since Flush() is relatively slow.
  802. Reopen(options);
  803. ASSERT_OK(ExecuteWrite(nullptr));
  804. ASSERT_TRUE(Flush().IsCorruption());
  805. // DB enters read-only state when flush reads corrupted data
  806. ASSERT_TRUE(dbfull()->TEST_GetBGError().IsCorruption());
  807. Destroy(options);
  808. }
  809. } // namespace ROCKSDB_NAMESPACE
  810. int main(int argc, char** argv) {
  811. ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  812. ::testing::InitGoogleTest(&argc, argv);
  813. return RUN_ALL_TESTS();
  814. }