transaction_test.h 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #pragma once
  6. #include <algorithm>
  7. #include <cinttypes>
  8. #include <functional>
  9. #include <string>
  10. #include <thread>
  11. #include "db/db_impl/db_impl.h"
  12. #include "db/db_test_util.h"
  13. #include "port/port.h"
  14. #include "rocksdb/db.h"
  15. #include "rocksdb/options.h"
  16. #include "rocksdb/utilities/transaction.h"
  17. #include "rocksdb/utilities/transaction_db.h"
  18. #include "table/mock_table.h"
  19. #include "test_util/sync_point.h"
  20. #include "test_util/testharness.h"
  21. #include "test_util/testutil.h"
  22. #include "test_util/transaction_test_util.h"
  23. #include "util/random.h"
  24. #include "util/string_util.h"
  25. #include "utilities/fault_injection_fs.h"
  26. #include "utilities/merge_operators.h"
  27. #include "utilities/merge_operators/string_append/stringappend.h"
  28. #include "utilities/transactions/pessimistic_transaction_db.h"
  29. #include "utilities/transactions/write_unprepared_txn_db.h"
  30. namespace ROCKSDB_NAMESPACE {
  31. // Return true if the ith bit is set in combination represented by comb
  32. bool IsInCombination(size_t i, size_t comb) { return comb & (size_t(1) << i); }
  33. enum WriteOrdering : bool { kOrderedWrite, kUnorderedWrite };
  34. class TransactionTestBase : public ::testing::Test {
  35. public:
  36. TransactionDB* db;
  37. SpecialEnv special_env;
  38. std::shared_ptr<FaultInjectionTestFS> fault_fs;
  39. std::unique_ptr<Env> env;
  40. std::string dbname;
  41. Options options;
  42. TransactionDBOptions txn_db_options;
  43. bool use_stackable_db_;
  44. int64_t deadlock_timeout_us_;
  45. TransactionTestBase(bool use_stackable_db, bool two_write_queue,
  46. TxnDBWritePolicy write_policy,
  47. WriteOrdering write_ordering,
  48. bool use_per_key_point_lock_mgr,
  49. int64_t deadlock_timeout_us)
  50. : db(nullptr),
  51. special_env(Env::Default()),
  52. env(nullptr),
  53. use_stackable_db_(use_stackable_db),
  54. deadlock_timeout_us_(deadlock_timeout_us) {
  55. options.create_if_missing = true;
  56. options.max_write_buffer_number = 2;
  57. options.write_buffer_size = 4 * 1024;
  58. options.unordered_write = write_ordering == kUnorderedWrite;
  59. options.level0_file_num_compaction_trigger = 2;
  60. options.merge_operator = MergeOperators::CreateFromStringId("stringappend");
  61. // Recycling log file is generally more challenging for correctness
  62. options.recycle_log_file_num = 2;
  63. special_env.skip_fsync_ = true;
  64. fault_fs.reset(new FaultInjectionTestFS(FileSystem::Default()));
  65. env.reset(new CompositeEnvWrapper(&special_env, fault_fs));
  66. options.env = env.get();
  67. options.two_write_queues = two_write_queue;
  68. dbname = test::PerThreadDBPath("transaction_testdb");
  69. EXPECT_OK(DestroyDB(dbname, options));
  70. txn_db_options.transaction_lock_timeout = 0;
  71. txn_db_options.default_lock_timeout = 0;
  72. txn_db_options.write_policy = write_policy;
  73. txn_db_options.rollback_merge_operands = true;
  74. txn_db_options.use_per_key_point_lock_mgr = use_per_key_point_lock_mgr;
  75. // This will stress write unprepared, by forcing write batch flush on every
  76. // write.
  77. txn_db_options.default_write_batch_flush_threshold = 1;
  78. // Write unprepared requires all transactions to be named. This setting
  79. // autogenerates the name so that existing tests can pass.
  80. txn_db_options.autogenerate_name = true;
  81. Status s;
  82. if (use_stackable_db == false) {
  83. s = TransactionDB::Open(options, txn_db_options, dbname, &db);
  84. } else {
  85. s = OpenWithStackableDB();
  86. }
  87. EXPECT_OK(s);
  88. }
  89. ~TransactionTestBase() {
  90. delete db;
  91. db = nullptr;
  92. // This is to skip the assert statement in FaultInjectionTestEnv. There
  93. // seems to be a bug in btrfs that the makes readdir return recently
  94. // unlink-ed files. By using the default fs we simply ignore errors resulted
  95. // from attempting to delete such files in DestroyDB.
  96. if (getenv("KEEP_DB") == nullptr) {
  97. options.env = Env::Default();
  98. EXPECT_OK(DestroyDB(dbname, options));
  99. } else {
  100. fprintf(stdout, "db is still in %s\n", dbname.c_str());
  101. }
  102. }
  103. Status ReOpenNoDelete() {
  104. delete db;
  105. db = nullptr;
  106. fault_fs->AssertNoOpenFile();
  107. EXPECT_OK(fault_fs->DropUnsyncedFileData());
  108. fault_fs->ResetState();
  109. Status s;
  110. if (use_stackable_db_ == false) {
  111. s = TransactionDB::Open(options, txn_db_options, dbname, &db);
  112. } else {
  113. s = OpenWithStackableDB();
  114. }
  115. assert(!s.ok() || db != nullptr);
  116. return s;
  117. }
  118. Status ReOpenNoDelete(std::vector<ColumnFamilyDescriptor>& cfs,
  119. std::vector<ColumnFamilyHandle*>* handles) {
  120. for (auto h : *handles) {
  121. delete h;
  122. }
  123. handles->clear();
  124. delete db;
  125. db = nullptr;
  126. fault_fs->AssertNoOpenFile();
  127. EXPECT_OK(fault_fs->DropUnsyncedFileData());
  128. fault_fs->ResetState();
  129. Status s;
  130. if (use_stackable_db_ == false) {
  131. s = TransactionDB::Open(options, txn_db_options, dbname, cfs, handles,
  132. &db);
  133. } else {
  134. s = OpenWithStackableDB(cfs, handles);
  135. }
  136. assert(!s.ok() || db != nullptr);
  137. return s;
  138. }
  139. Status ReOpen() {
  140. delete db;
  141. db = nullptr;
  142. EXPECT_OK(DestroyDB(dbname, options));
  143. Status s;
  144. if (use_stackable_db_ == false) {
  145. s = TransactionDB::Open(options, txn_db_options, dbname, &db);
  146. } else {
  147. s = OpenWithStackableDB();
  148. }
  149. assert(db != nullptr);
  150. return s;
  151. }
  152. Status OpenWithStackableDB(std::vector<ColumnFamilyDescriptor>& cfs,
  153. std::vector<ColumnFamilyHandle*>* handles) {
  154. std::vector<size_t> compaction_enabled_cf_indices;
  155. TransactionDB::PrepareWrap(&options, &cfs, &compaction_enabled_cf_indices);
  156. std::unique_ptr<DB> root_db;
  157. Options options_copy(options);
  158. const bool use_seq_per_batch =
  159. txn_db_options.write_policy == WRITE_PREPARED ||
  160. txn_db_options.write_policy == WRITE_UNPREPARED;
  161. const bool use_batch_per_txn =
  162. txn_db_options.write_policy == WRITE_COMMITTED ||
  163. txn_db_options.write_policy == WRITE_PREPARED;
  164. Status s = DBImpl::Open(options_copy, dbname, cfs, handles, &root_db,
  165. use_seq_per_batch, use_batch_per_txn,
  166. /*is_retry=*/false, /*can_retry=*/nullptr);
  167. auto stackable_db = std::make_unique<StackableDB>(std::move(root_db));
  168. if (s.ok()) {
  169. // If WrapStackableDB() returns non-ok, then stackable_db is already
  170. // deleted within WrapStackableDB().
  171. s = TransactionDB::WrapStackableDB(stackable_db.release(), txn_db_options,
  172. compaction_enabled_cf_indices,
  173. *handles, &db);
  174. }
  175. return s;
  176. }
  177. Status OpenWithStackableDB() {
  178. std::vector<size_t> compaction_enabled_cf_indices;
  179. std::vector<ColumnFamilyDescriptor> column_families{ColumnFamilyDescriptor(
  180. kDefaultColumnFamilyName, ColumnFamilyOptions(options))};
  181. TransactionDB::PrepareWrap(&options, &column_families,
  182. &compaction_enabled_cf_indices);
  183. std::vector<ColumnFamilyHandle*> handles;
  184. std::unique_ptr<DB> root_db;
  185. Options options_copy(options);
  186. const bool use_seq_per_batch =
  187. txn_db_options.write_policy == WRITE_PREPARED ||
  188. txn_db_options.write_policy == WRITE_UNPREPARED;
  189. const bool use_batch_per_txn =
  190. txn_db_options.write_policy == WRITE_COMMITTED ||
  191. txn_db_options.write_policy == WRITE_PREPARED;
  192. Status s = DBImpl::Open(options_copy, dbname, column_families, &handles,
  193. &root_db, use_seq_per_batch, use_batch_per_txn,
  194. /*is_retry=*/false, /*can_retry=*/nullptr);
  195. if (!s.ok()) {
  196. return s;
  197. }
  198. StackableDB* stackable_db = new StackableDB(std::move(root_db));
  199. assert(handles.size() == 1);
  200. s = TransactionDB::WrapStackableDB(stackable_db, txn_db_options,
  201. compaction_enabled_cf_indices, handles,
  202. &db);
  203. delete handles[0];
  204. if (!s.ok()) {
  205. delete stackable_db;
  206. }
  207. return s;
  208. }
  209. std::atomic<size_t> linked = {0};
  210. std::atomic<size_t> exp_seq = {0};
  211. std::atomic<size_t> commit_writes = {0};
  212. std::atomic<size_t> expected_commits = {0};
  213. // Without Prepare, the commit does not write to WAL
  214. std::atomic<size_t> with_empty_commits = {0};
  215. void TestTxn0(size_t index) {
  216. // Test DB's internal txn. It involves no prepare phase nor a commit marker.
  217. auto s = db->Put(WriteOptions(), "key" + std::to_string(index), "value");
  218. ASSERT_OK(s);
  219. if (txn_db_options.write_policy == TxnDBWritePolicy::WRITE_COMMITTED) {
  220. // Consume one seq per key
  221. exp_seq++;
  222. } else {
  223. // Consume one seq per batch
  224. exp_seq++;
  225. if (options.two_write_queues) {
  226. // Consume one seq for commit
  227. exp_seq++;
  228. }
  229. }
  230. with_empty_commits++;
  231. }
  232. void TestTxn1(size_t index) {
  233. // Testing directly writing a write batch. Functionality-wise it is
  234. // equivalent to commit without prepare.
  235. WriteBatch wb;
  236. auto istr = std::to_string(index);
  237. ASSERT_OK(wb.Put("k1" + istr, "v1"));
  238. ASSERT_OK(wb.Put("k2" + istr, "v2"));
  239. ASSERT_OK(wb.Put("k3" + istr, "v3"));
  240. auto s = db->Write(WriteOptions(), &wb);
  241. if (txn_db_options.write_policy == TxnDBWritePolicy::WRITE_COMMITTED) {
  242. // Consume one seq per key
  243. exp_seq += 3;
  244. } else {
  245. // Consume one seq per batch
  246. exp_seq++;
  247. if (options.two_write_queues) {
  248. // Consume one seq for commit
  249. exp_seq++;
  250. }
  251. }
  252. ASSERT_OK(s);
  253. with_empty_commits++;
  254. }
  255. void TestTxn2(size_t index) {
  256. // Commit without prepare. It should write to DB without a commit marker.
  257. Transaction* txn =
  258. db->BeginTransaction(WriteOptions(), TransactionOptions());
  259. auto istr = std::to_string(index);
  260. ASSERT_OK(txn->SetName("xid" + istr));
  261. ASSERT_OK(txn->Put(Slice("foo" + istr), Slice("bar")));
  262. ASSERT_OK(txn->Put(Slice("foo2" + istr), Slice("bar2")));
  263. ASSERT_OK(txn->Put(Slice("foo3" + istr), Slice("bar3")));
  264. ASSERT_OK(txn->Put(Slice("foo4" + istr), Slice("bar4")));
  265. ASSERT_OK(txn->Commit());
  266. if (txn_db_options.write_policy == TxnDBWritePolicy::WRITE_COMMITTED) {
  267. // Consume one seq per key
  268. exp_seq += 4;
  269. } else if (txn_db_options.write_policy ==
  270. TxnDBWritePolicy::WRITE_PREPARED) {
  271. // Consume one seq per batch
  272. exp_seq++;
  273. if (options.two_write_queues) {
  274. // Consume one seq for commit
  275. exp_seq++;
  276. }
  277. } else {
  278. // Flushed after each key, consume one seq per flushed batch
  279. exp_seq += 4;
  280. // WriteUnprepared implements CommitWithoutPrepareInternal by simply
  281. // calling Prepare then Commit. Consume one seq for the prepare.
  282. exp_seq++;
  283. }
  284. delete txn;
  285. with_empty_commits++;
  286. }
  287. void TestTxn3(size_t index) {
  288. // A full 2pc txn that also involves a commit marker.
  289. Transaction* txn =
  290. db->BeginTransaction(WriteOptions(), TransactionOptions());
  291. auto istr = std::to_string(index);
  292. ASSERT_OK(txn->SetName("xid" + istr));
  293. ASSERT_OK(txn->Put(Slice("foo" + istr), Slice("bar")));
  294. ASSERT_OK(txn->Put(Slice("foo2" + istr), Slice("bar2")));
  295. ASSERT_OK(txn->Put(Slice("foo3" + istr), Slice("bar3")));
  296. ASSERT_OK(txn->Put(Slice("foo4" + istr), Slice("bar4")));
  297. ASSERT_OK(txn->Put(Slice("foo5" + istr), Slice("bar5")));
  298. expected_commits++;
  299. ASSERT_OK(txn->Prepare());
  300. commit_writes++;
  301. ASSERT_OK(txn->Commit());
  302. if (txn_db_options.write_policy == TxnDBWritePolicy::WRITE_COMMITTED) {
  303. // Consume one seq per key
  304. exp_seq += 5;
  305. } else if (txn_db_options.write_policy ==
  306. TxnDBWritePolicy::WRITE_PREPARED) {
  307. // Consume one seq per batch
  308. exp_seq++;
  309. // Consume one seq per commit marker
  310. exp_seq++;
  311. } else {
  312. // Flushed after each key, consume one seq per flushed batch
  313. exp_seq += 5;
  314. // Consume one seq per commit marker
  315. exp_seq++;
  316. }
  317. delete txn;
  318. }
  319. void TestTxn4(size_t index) {
  320. // A full 2pc txn that also involves a commit marker.
  321. Transaction* txn =
  322. db->BeginTransaction(WriteOptions(), TransactionOptions());
  323. auto istr = std::to_string(index);
  324. ASSERT_OK(txn->SetName("xid" + istr));
  325. ASSERT_OK(txn->Put(Slice("foo" + istr), Slice("bar")));
  326. ASSERT_OK(txn->Put(Slice("foo2" + istr), Slice("bar2")));
  327. ASSERT_OK(txn->Put(Slice("foo3" + istr), Slice("bar3")));
  328. ASSERT_OK(txn->Put(Slice("foo4" + istr), Slice("bar4")));
  329. ASSERT_OK(txn->Put(Slice("foo5" + istr), Slice("bar5")));
  330. expected_commits++;
  331. ASSERT_OK(txn->Prepare());
  332. commit_writes++;
  333. ASSERT_OK(txn->Rollback());
  334. if (txn_db_options.write_policy == TxnDBWritePolicy::WRITE_COMMITTED) {
  335. // No seq is consumed for deleting the txn buffer
  336. exp_seq += 0;
  337. } else if (txn_db_options.write_policy ==
  338. TxnDBWritePolicy::WRITE_PREPARED) {
  339. // Consume one seq per batch
  340. exp_seq++;
  341. // Consume one seq per rollback batch
  342. exp_seq++;
  343. if (options.two_write_queues) {
  344. // Consume one seq for rollback commit
  345. exp_seq++;
  346. }
  347. } else {
  348. // Flushed after each key, consume one seq per flushed batch
  349. exp_seq += 5;
  350. // Consume one seq per rollback batch
  351. exp_seq++;
  352. if (options.two_write_queues) {
  353. // Consume one seq for rollback commit
  354. exp_seq++;
  355. }
  356. }
  357. delete txn;
  358. }
  359. // Test that we can change write policy after a clean shutdown (which would
  360. // empty the WAL)
  361. void CrossCompatibilityTest(TxnDBWritePolicy from_policy,
  362. TxnDBWritePolicy to_policy, bool empty_wal) {
  363. TransactionOptions txn_options;
  364. ReadOptions read_options;
  365. WriteOptions write_options;
  366. uint32_t index = 0;
  367. Random rnd(1103);
  368. options.write_buffer_size = 1024; // To create more sst files
  369. std::unordered_map<std::string, std::string> committed_kvs;
  370. Transaction* txn;
  371. txn_db_options.write_policy = from_policy;
  372. if (txn_db_options.write_policy == WRITE_COMMITTED) {
  373. options.unordered_write = false;
  374. }
  375. ASSERT_OK(ReOpen());
  376. for (int i = 0; i < 1024; i++) {
  377. auto istr = std::to_string(index);
  378. auto k = Slice("foo-" + istr).ToString();
  379. auto v = Slice("bar-" + istr).ToString();
  380. // For test the duplicate keys
  381. auto v2 = Slice("bar2-" + istr).ToString();
  382. auto type = rnd.Uniform(4);
  383. switch (type) {
  384. case 0:
  385. committed_kvs[k] = v;
  386. ASSERT_OK(db->Put(write_options, k, v));
  387. committed_kvs[k] = v2;
  388. ASSERT_OK(db->Put(write_options, k, v2));
  389. break;
  390. case 1: {
  391. WriteBatch wb;
  392. committed_kvs[k] = v;
  393. ASSERT_OK(wb.Put(k, v));
  394. committed_kvs[k] = v2;
  395. ASSERT_OK(wb.Put(k, v2));
  396. ASSERT_OK(db->Write(write_options, &wb));
  397. } break;
  398. case 2:
  399. case 3:
  400. txn = db->BeginTransaction(write_options, txn_options);
  401. ASSERT_OK(txn->SetName("xid" + istr));
  402. committed_kvs[k] = v;
  403. ASSERT_OK(txn->Put(k, v));
  404. committed_kvs[k] = v2;
  405. ASSERT_OK(txn->Put(k, v2));
  406. if (type == 3) {
  407. ASSERT_OK(txn->Prepare());
  408. }
  409. ASSERT_OK(txn->Commit());
  410. delete txn;
  411. break;
  412. default:
  413. FAIL();
  414. }
  415. index++;
  416. } // for i
  417. txn_db_options.write_policy = to_policy;
  418. if (txn_db_options.write_policy == WRITE_COMMITTED) {
  419. options.unordered_write = false;
  420. }
  421. auto db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
  422. // Before upgrade/downgrade the WAL must be emptied
  423. if (empty_wal) {
  424. ASSERT_OK(db_impl->TEST_FlushMemTable());
  425. } else {
  426. ASSERT_OK(db_impl->FlushWAL(true));
  427. }
  428. auto s = ReOpenNoDelete();
  429. if (empty_wal) {
  430. ASSERT_OK(s);
  431. } else {
  432. // Test that we can detect the WAL that is produced by an incompatible
  433. // WritePolicy and fail fast before mis-interpreting the WAL.
  434. ASSERT_TRUE(s.IsNotSupported());
  435. return;
  436. }
  437. db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
  438. // Check that WAL is empty
  439. VectorWalPtr log_files;
  440. ASSERT_OK(db_impl->GetSortedWalFiles(log_files));
  441. ASSERT_EQ(0, log_files.size());
  442. for (auto& kv : committed_kvs) {
  443. std::string value;
  444. s = db->Get(read_options, kv.first, &value);
  445. if (s.IsNotFound()) {
  446. printf("key = %s\n", kv.first.c_str());
  447. }
  448. ASSERT_OK(s);
  449. if (kv.second != value) {
  450. printf("key = %s\n", kv.first.c_str());
  451. }
  452. ASSERT_EQ(kv.second, value);
  453. }
  454. }
  455. };
  456. class TransactionTest
  457. : public TransactionTestBase,
  458. virtual public ::testing::WithParamInterface<std::tuple<
  459. bool, bool, TxnDBWritePolicy, WriteOrdering, bool, int64_t>> {
  460. public:
  461. TransactionTest()
  462. : TransactionTestBase(std::get<0>(GetParam()), std::get<1>(GetParam()),
  463. std::get<2>(GetParam()), std::get<3>(GetParam()),
  464. std::get<4>(GetParam()), std::get<5>(GetParam())) {}
  465. };
  466. class TransactionDBTest
  467. : public TransactionTestBase,
  468. virtual public ::testing::WithParamInterface<std::tuple<bool, int64_t>> {
  469. public:
  470. TransactionDBTest()
  471. : TransactionTestBase(false, false, WRITE_COMMITTED, kOrderedWrite,
  472. std::get<0>(GetParam()), std::get<1>(GetParam())) {}
  473. };
  474. class TransactionStressTest : public TransactionTest {};
  475. class MySQLStyleTransactionTest
  476. : public TransactionTestBase,
  477. virtual public ::testing::WithParamInterface<std::tuple<
  478. bool, bool, TxnDBWritePolicy, WriteOrdering, bool, bool, int64_t>> {
  479. public:
  480. MySQLStyleTransactionTest()
  481. : TransactionTestBase(std::get<0>(GetParam()), std::get<1>(GetParam()),
  482. std::get<2>(GetParam()), std::get<3>(GetParam()),
  483. std::get<5>(GetParam()), std::get<6>(GetParam())),
  484. with_slow_threads_(std::get<4>(GetParam())) {
  485. if (with_slow_threads_ &&
  486. (txn_db_options.write_policy == WRITE_PREPARED ||
  487. txn_db_options.write_policy == WRITE_UNPREPARED)) {
  488. // The corner case with slow threads involves the caches filling
  489. // over which would not happen even with artifial delays. To help
  490. // such cases to show up we lower the size of the cache-related data
  491. // structures.
  492. txn_db_options.wp_snapshot_cache_bits = 1;
  493. txn_db_options.wp_commit_cache_bits = 10;
  494. options.write_buffer_size = 1024;
  495. EXPECT_OK(ReOpen());
  496. }
  497. };
  498. protected:
  499. // Also emulate slow threads by addin artiftial delays
  500. const bool with_slow_threads_;
  501. };
  502. class WriteCommittedTxnWithTsTest
  503. : public TransactionTestBase,
  504. public ::testing::WithParamInterface<
  505. std::tuple<bool, bool, bool, bool, int64_t>> {
  506. public:
  507. WriteCommittedTxnWithTsTest()
  508. : TransactionTestBase(std::get<0>(GetParam()), std::get<1>(GetParam()),
  509. WRITE_COMMITTED, kOrderedWrite,
  510. std::get<3>(GetParam()), std::get<4>(GetParam())) {}
  511. ~WriteCommittedTxnWithTsTest() override {
  512. for (auto* h : handles_) {
  513. delete h;
  514. }
  515. }
  516. Status GetFromDb(ReadOptions read_opts, ColumnFamilyHandle* column_family,
  517. const Slice& key, TxnTimestamp ts, std::string* value) {
  518. std::string ts_buf;
  519. PutFixed64(&ts_buf, ts);
  520. Slice ts_slc = ts_buf;
  521. read_opts.timestamp = &ts_slc;
  522. assert(db);
  523. return db->Get(read_opts, column_family, key, value);
  524. }
  525. Transaction* NewTxn(WriteOptions write_opts, TransactionOptions txn_opts) {
  526. assert(db);
  527. auto* txn = db->BeginTransaction(write_opts, txn_opts);
  528. assert(txn);
  529. const bool enable_indexing = std::get<2>(GetParam());
  530. if (enable_indexing) {
  531. txn->EnableIndexing();
  532. } else {
  533. txn->DisableIndexing();
  534. }
  535. return txn;
  536. }
  537. protected:
  538. std::vector<ColumnFamilyHandle*> handles_{};
  539. };
  540. class TimestampedSnapshotWithTsSanityCheck
  541. : public TransactionTestBase,
  542. public ::testing::WithParamInterface<std::tuple<
  543. bool, bool, TxnDBWritePolicy, WriteOrdering, bool, int64_t>> {
  544. public:
  545. explicit TimestampedSnapshotWithTsSanityCheck()
  546. : TransactionTestBase(std::get<0>(GetParam()), std::get<1>(GetParam()),
  547. std::get<2>(GetParam()), std::get<3>(GetParam()),
  548. std::get<4>(GetParam()), std::get<5>(GetParam())) {}
  549. ~TimestampedSnapshotWithTsSanityCheck() override {
  550. for (auto* h : handles_) {
  551. delete h;
  552. }
  553. }
  554. protected:
  555. std::vector<ColumnFamilyHandle*> handles_{};
  556. };
  557. // The following templates causes a bug in GCC 14, ignore the error for now
  558. #if defined(__GNUC__) && __GNUC__ == 14
  559. #pragma GCC diagnostic push
  560. #pragma GCC diagnostic ignored "-Wstringop-overflow"
  561. #endif
  562. // Wrap existing params with per-key point lock manager parameters
  563. template <typename TargetParamType, typename SourceParamType, std::size_t... Is>
  564. std::vector<TargetParamType> WrapParamWithPerKeyPointLockManagerParamsImpl(
  565. SourceParamType&& source_param, std::index_sequence<Is...>) {
  566. std::vector<TargetParamType> wrapped_params;
  567. // Use original PointLockManager
  568. wrapped_params.push_back(TargetParamType(
  569. std::get<Is>(std::forward<SourceParamType>(source_param))..., false,
  570. INT64_C(0)));
  571. // Use PerKeyPointLockManager with deadlock timeout 0
  572. wrapped_params.push_back(TargetParamType(
  573. std::get<Is>(std::forward<SourceParamType>(source_param))..., true,
  574. INT64_C(0)));
  575. // Use PerKeyPointLockManager with deadlock timeout 1000
  576. wrapped_params.push_back(TargetParamType(
  577. std::get<Is>(std::forward<SourceParamType>(source_param))..., true,
  578. INT64_C(1000)));
  579. return wrapped_params;
  580. }
  581. template <typename TargetParamType, typename SourceParamType>
  582. std::vector<TargetParamType> WrapParamWithPerKeyPointLockManagerParams(
  583. SourceParamType&& source_param) {
  584. // Get the size of the source param
  585. constexpr std::size_t N = std::tuple_size_v<std::decay_t<SourceParamType>>;
  586. // Create an index sequence from 0 to N-1
  587. return WrapParamWithPerKeyPointLockManagerParamsImpl<TargetParamType>(
  588. std::forward<SourceParamType>(source_param),
  589. std::make_index_sequence<N>{});
  590. }
  591. template <typename TargetParamType, typename SourceParamType, size_t M>
  592. std::vector<TargetParamType> WrapParamsWithPerKeyPointLockManagerParams(
  593. std::array<SourceParamType, M> source_param) {
  594. std::vector<TargetParamType> wrapped_params;
  595. for (auto& param : source_param) {
  596. // Create an index sequence from 0 to N-1
  597. auto new_params =
  598. WrapParamWithPerKeyPointLockManagerParams<TargetParamType>(
  599. std::forward<SourceParamType>(param));
  600. wrapped_params.insert(wrapped_params.end(), new_params.begin(),
  601. new_params.end());
  602. }
  603. return wrapped_params;
  604. }
  605. #if defined(__GNUC__) && __GNUC__ == 14
  606. #pragma GCC diagnostic pop
  607. #endif
  608. #define WRAP_PARAM(...) __VA_ARGS__
  609. #define WRAP_PARAM_WITH_PER_KEY_POINT_LOCK_MANAGER_PARAMS(SOURCE_PARAM_TYPES, \
  610. PARAMS) \
  611. WrapParamsWithPerKeyPointLockManagerParams< \
  612. std::tuple<SOURCE_PARAM_TYPES, bool, int64_t>>(PARAMS)
  613. } // namespace ROCKSDB_NAMESPACE