write_unprepared_transaction_test.cc 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include "utilities/transactions/transaction_test.h"
  6. #include "utilities/transactions/write_unprepared_txn.h"
  7. #include "utilities/transactions/write_unprepared_txn_db.h"
  8. namespace ROCKSDB_NAMESPACE {
  9. class WriteUnpreparedTransactionTestBase : public TransactionTestBase {
  10. public:
  11. WriteUnpreparedTransactionTestBase(bool use_stackable_db,
  12. bool two_write_queue,
  13. TxnDBWritePolicy write_policy,
  14. bool use_per_key_point_lock_mgr,
  15. int64_t deadlock_timeout_us)
  16. : TransactionTestBase(use_stackable_db, two_write_queue, write_policy,
  17. kOrderedWrite, use_per_key_point_lock_mgr,
  18. deadlock_timeout_us) {}
  19. };
  20. class WriteUnpreparedTransactionTest
  21. : public WriteUnpreparedTransactionTestBase,
  22. virtual public ::testing::WithParamInterface<
  23. std::tuple<bool, bool, TxnDBWritePolicy, bool, int64_t>> {
  24. public:
  25. WriteUnpreparedTransactionTest()
  26. : WriteUnpreparedTransactionTestBase(
  27. std::get<0>(GetParam()), std::get<1>(GetParam()),
  28. std::get<2>(GetParam()), std::get<3>(GetParam()),
  29. std::get<4>(GetParam())) {}
  30. };
  31. INSTANTIATE_TEST_CASE_P(
  32. WriteUnpreparedTransactionTest, WriteUnpreparedTransactionTest,
  33. ::testing::Combine(::testing::Values(false), ::testing::Bool(),
  34. ::testing::Values(WRITE_UNPREPARED), ::testing::Bool(),
  35. ::testing::Values(0, 1000)));
  36. enum SnapshotAction { NO_SNAPSHOT, RO_SNAPSHOT, REFRESH_SNAPSHOT };
  37. enum VerificationOperation { VERIFY_GET, VERIFY_NEXT, VERIFY_PREV };
  38. class WriteUnpreparedSnapshotTest
  39. : public WriteUnpreparedTransactionTestBase,
  40. virtual public ::testing::WithParamInterface<std::tuple<
  41. bool, SnapshotAction, VerificationOperation, bool, int64_t>> {
  42. public:
  43. WriteUnpreparedSnapshotTest()
  44. : WriteUnpreparedTransactionTestBase(
  45. false, std::get<0>(GetParam()), WRITE_UNPREPARED,
  46. std::get<3>(GetParam()), std::get<4>(GetParam())),
  47. action_(std::get<1>(GetParam())),
  48. verify_op_(std::get<2>(GetParam())) {}
  49. SnapshotAction action_;
  50. VerificationOperation verify_op_;
  51. };
  52. // Test parameters:
  53. // Param 0): use stackable db, parameterization hard coded to be overwritten to
  54. // false. Param 1): test mode for snapshot action Param 2): test mode for
  55. // verification operation
  56. INSTANTIATE_TEST_CASE_P(
  57. WriteUnpreparedSnapshotTest, WriteUnpreparedSnapshotTest,
  58. ::testing::Combine(::testing::Bool(),
  59. ::testing::Values(NO_SNAPSHOT, RO_SNAPSHOT,
  60. REFRESH_SNAPSHOT),
  61. ::testing::Values(VERIFY_GET, VERIFY_NEXT, VERIFY_PREV),
  62. ::testing::Bool(), ::testing::Values(0, 1000)));
  63. TEST_P(WriteUnpreparedTransactionTest, ReadYourOwnWrite) {
  64. // The following tests checks whether reading your own write for
  65. // a transaction works for write unprepared, when there are uncommitted
  66. // values written into DB.
  67. auto verify_state = [](Iterator* iter, const std::string& key,
  68. const std::string& value) {
  69. ASSERT_TRUE(iter->Valid());
  70. ASSERT_OK(iter->status());
  71. ASSERT_EQ(key, iter->key().ToString());
  72. ASSERT_EQ(value, iter->value().ToString());
  73. };
  74. // Test always reseeking vs never reseeking.
  75. for (uint64_t max_skip : {0, std::numeric_limits<int>::max()}) {
  76. options.max_sequential_skip_in_iterations = max_skip;
  77. options.disable_auto_compactions = true;
  78. ASSERT_OK(ReOpen());
  79. TransactionOptions txn_options;
  80. WriteOptions woptions;
  81. ReadOptions roptions;
  82. ASSERT_OK(db->Put(woptions, "a", ""));
  83. ASSERT_OK(db->Put(woptions, "b", ""));
  84. Transaction* txn = db->BeginTransaction(woptions, txn_options);
  85. WriteUnpreparedTxn* wup_txn = dynamic_cast<WriteUnpreparedTxn*>(txn);
  86. txn->SetSnapshot();
  87. for (int i = 0; i < 5; i++) {
  88. std::string stored_value = "v" + std::to_string(i);
  89. ASSERT_OK(txn->Put("a", stored_value));
  90. ASSERT_OK(txn->Put("b", stored_value));
  91. ASSERT_OK(wup_txn->FlushWriteBatchToDB(false));
  92. // Test Get()
  93. std::string value;
  94. ASSERT_OK(txn->Get(roptions, "a", &value));
  95. ASSERT_EQ(value, stored_value);
  96. ASSERT_OK(txn->Get(roptions, "b", &value));
  97. ASSERT_EQ(value, stored_value);
  98. // Test Next()
  99. auto iter = txn->GetIterator(roptions);
  100. iter->Seek("a");
  101. verify_state(iter, "a", stored_value);
  102. iter->Next();
  103. verify_state(iter, "b", stored_value);
  104. iter->SeekToFirst();
  105. verify_state(iter, "a", stored_value);
  106. iter->Next();
  107. verify_state(iter, "b", stored_value);
  108. delete iter;
  109. // Test Prev()
  110. iter = txn->GetIterator(roptions);
  111. iter->SeekForPrev("b");
  112. verify_state(iter, "b", stored_value);
  113. iter->Prev();
  114. verify_state(iter, "a", stored_value);
  115. iter->SeekToLast();
  116. verify_state(iter, "b", stored_value);
  117. iter->Prev();
  118. verify_state(iter, "a", stored_value);
  119. delete iter;
  120. }
  121. delete txn;
  122. }
  123. }
  124. TEST_P(WriteUnpreparedSnapshotTest, ReadYourOwnWrite) {
  125. // This test validates a transaction can read its writes and the correctness
  126. // of its read with regard to a mocked snapshot functionality.
  127. const uint32_t kNumIter = 1000;
  128. const uint32_t kNumKeys = 5;
  129. // Test with
  130. // 1. no snapshots set
  131. // 2. snapshot set on ReadOptions
  132. // 3. snapshot set, and refreshing after every write.
  133. SnapshotAction snapshot_action = action_;
  134. WriteOptions write_options;
  135. txn_db_options.transaction_lock_timeout = -1;
  136. options.disable_auto_compactions = true;
  137. ASSERT_OK(ReOpen());
  138. std::vector<std::string> keys;
  139. for (uint32_t k = 0; k < kNumKeys; k++) {
  140. keys.push_back("k" + std::to_string(k));
  141. }
  142. // This counter will act as a "sequence number" to help us validate
  143. // visibility logic with snapshots. If we had direct access to the seqno of
  144. // snapshots and key/values, then we should directly compare those instead.
  145. std::atomic<int64_t> counter(0);
  146. std::function<void()> check_correctness_wrt_snapshot = [&]() {
  147. Transaction* txn;
  148. TransactionOptions txn_options;
  149. // batch_size of 1 causes writes to DB for every marker.
  150. txn_options.write_batch_flush_threshold = 1;
  151. ReadOptions read_options;
  152. for (uint32_t i = 0; i < kNumIter; i++) {
  153. txn = db->BeginTransaction(write_options, txn_options);
  154. txn->SetSnapshot();
  155. if (snapshot_action >= RO_SNAPSHOT) {
  156. read_options.snapshot = txn->GetSnapshot();
  157. ASSERT_TRUE(read_options.snapshot != nullptr);
  158. }
  159. uint64_t buf[1];
  160. // When scanning through the database, make sure that all unprepared
  161. // keys have value >= snapshot.
  162. int64_t snapshot_num = counter.fetch_add(1);
  163. Status s;
  164. for (const auto& key : keys) {
  165. buf[0] = counter.fetch_add(1);
  166. s = txn->Put(key, Slice((const char*)buf, sizeof(buf)));
  167. if (!s.ok()) {
  168. break;
  169. }
  170. if (snapshot_action == REFRESH_SNAPSHOT) {
  171. txn->SetSnapshot();
  172. read_options.snapshot = txn->GetSnapshot();
  173. snapshot_num = counter.fetch_add(1);
  174. }
  175. }
  176. ASSERT_OK(s);
  177. auto verify_key = [&snapshot_action,
  178. &snapshot_num](const std::string& value) {
  179. ASSERT_EQ(value.size(), 8);
  180. if (snapshot_action == REFRESH_SNAPSHOT) {
  181. // If refresh snapshot is true, then the snapshot is refreshed
  182. // after every Put(), meaning that the current snapshot in
  183. // snapshot_num must be greater than the "seqno" of any keys
  184. // written by the current transaction.
  185. ASSERT_LT(((int64_t*)value.c_str())[0], snapshot_num);
  186. } else {
  187. // If refresh snapshot is not on, then the snapshot was taken at
  188. // the beginning of the transaction, meaning all writes must come
  189. // after snapshot_num
  190. ASSERT_GT(((int64_t*)value.c_str())[0], snapshot_num);
  191. }
  192. };
  193. // Validate one of Get()/Next()/Prev() depending on the verification
  194. // operation to use.
  195. switch (verify_op_) {
  196. case VERIFY_GET: // Validate Get()
  197. {
  198. for (const auto& key : keys) {
  199. std::string value;
  200. ASSERT_OK(txn->Get(read_options, Slice(key), &value));
  201. verify_key(value);
  202. }
  203. break;
  204. }
  205. case VERIFY_NEXT: // Validate Next()
  206. {
  207. Iterator* iter = txn->GetIterator(read_options);
  208. ASSERT_OK(iter->status());
  209. for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
  210. verify_key(iter->value().ToString());
  211. }
  212. ASSERT_OK(iter->status());
  213. delete iter;
  214. break;
  215. }
  216. case VERIFY_PREV: // Validate Prev()
  217. {
  218. Iterator* iter = txn->GetIterator(read_options);
  219. ASSERT_OK(iter->status());
  220. for (iter->SeekToLast(); iter->Valid(); iter->Prev()) {
  221. verify_key(iter->value().ToString());
  222. }
  223. ASSERT_OK(iter->status());
  224. delete iter;
  225. break;
  226. }
  227. default:
  228. FAIL();
  229. }
  230. ASSERT_OK(txn->Commit());
  231. delete txn;
  232. }
  233. };
  234. check_correctness_wrt_snapshot();
  235. }
  236. // This tests how write unprepared behaves during recovery when the DB crashes
  237. // after a transaction has either been unprepared or prepared, and tests if
  238. // the changes are correctly applied for prepared transactions if we decide to
  239. // rollback/commit.
  240. TEST_P(WriteUnpreparedTransactionTest, RecoveryTest) {
  241. WriteOptions write_options;
  242. write_options.disableWAL = false;
  243. TransactionOptions txn_options;
  244. std::vector<Transaction*> prepared_trans;
  245. WriteUnpreparedTxnDB* wup_db;
  246. options.disable_auto_compactions = true;
  247. enum Action { UNPREPARED, ROLLBACK, COMMIT };
  248. // batch_size of 1 causes writes to DB for every marker.
  249. for (size_t batch_size : {1, 1000000}) {
  250. txn_options.write_batch_flush_threshold = batch_size;
  251. for (bool empty : {true, false}) {
  252. for (Action a : {UNPREPARED, ROLLBACK, COMMIT}) {
  253. for (int num_batches = 1; num_batches < 10; num_batches++) {
  254. // Reset database.
  255. prepared_trans.clear();
  256. ASSERT_OK(ReOpen());
  257. wup_db = dynamic_cast<WriteUnpreparedTxnDB*>(db);
  258. if (!empty) {
  259. for (int i = 0; i < num_batches; i++) {
  260. ASSERT_OK(db->Put(WriteOptions(), "k" + std::to_string(i),
  261. "before value" + std::to_string(i)));
  262. }
  263. }
  264. // Write num_batches unprepared batches.
  265. Transaction* txn = db->BeginTransaction(write_options, txn_options);
  266. WriteUnpreparedTxn* wup_txn = dynamic_cast<WriteUnpreparedTxn*>(txn);
  267. ASSERT_OK(txn->SetName("xid"));
  268. for (int i = 0; i < num_batches; i++) {
  269. ASSERT_OK(
  270. txn->Put("k" + std::to_string(i), "value" + std::to_string(i)));
  271. if (txn_options.write_batch_flush_threshold == 1) {
  272. // WriteUnprepared will check write_batch_flush_threshold and
  273. // possibly flush before appending to the write batch. No flush
  274. // will happen at the first write because the batch is still
  275. // empty, so after k puts, there should be k-1 flushed batches.
  276. ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), i);
  277. } else {
  278. ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), 0);
  279. }
  280. }
  281. if (a == UNPREPARED) {
  282. // This is done to prevent the destructor from rolling back the
  283. // transaction for us, since we want to pretend we crashed and
  284. // test that recovery does the rollback.
  285. wup_txn->unprep_seqs_.clear();
  286. } else {
  287. ASSERT_OK(txn->Prepare());
  288. }
  289. delete txn;
  290. // Crash and run recovery code paths.
  291. ASSERT_OK(wup_db->db_impl_->FlushWAL(true));
  292. wup_db->TEST_Crash();
  293. ASSERT_OK(ReOpenNoDelete());
  294. assert(db != nullptr);
  295. db->GetAllPreparedTransactions(&prepared_trans);
  296. ASSERT_EQ(prepared_trans.size(), a == UNPREPARED ? 0 : 1);
  297. if (a == ROLLBACK) {
  298. ASSERT_OK(prepared_trans[0]->Rollback());
  299. delete prepared_trans[0];
  300. } else if (a == COMMIT) {
  301. ASSERT_OK(prepared_trans[0]->Commit());
  302. delete prepared_trans[0];
  303. }
  304. Iterator* iter = db->NewIterator(ReadOptions());
  305. ASSERT_OK(iter->status());
  306. iter->SeekToFirst();
  307. // Check that DB has before values.
  308. if (!empty || a == COMMIT) {
  309. for (int i = 0; i < num_batches; i++) {
  310. ASSERT_TRUE(iter->Valid());
  311. ASSERT_EQ(iter->key().ToString(), "k" + std::to_string(i));
  312. if (a == COMMIT) {
  313. ASSERT_EQ(iter->value().ToString(),
  314. "value" + std::to_string(i));
  315. } else {
  316. ASSERT_EQ(iter->value().ToString(),
  317. "before value" + std::to_string(i));
  318. }
  319. iter->Next();
  320. }
  321. }
  322. ASSERT_FALSE(iter->Valid());
  323. ASSERT_OK(iter->status());
  324. delete iter;
  325. }
  326. }
  327. }
  328. }
  329. }
  330. // Basic test to see that unprepared batch gets written to DB when batch size
  331. // is exceeded. It also does some basic checks to see if commit/rollback works
  332. // as expected for write unprepared.
  333. TEST_P(WriteUnpreparedTransactionTest, UnpreparedBatch) {
  334. WriteOptions write_options;
  335. TransactionOptions txn_options;
  336. const int kNumKeys = 10;
  337. // batch_size of 1 causes writes to DB for every marker.
  338. for (size_t batch_size : {1, 1000000}) {
  339. txn_options.write_batch_flush_threshold = batch_size;
  340. for (bool prepare : {false, true}) {
  341. for (bool commit : {false, true}) {
  342. ASSERT_OK(ReOpen());
  343. Transaction* txn = db->BeginTransaction(write_options, txn_options);
  344. WriteUnpreparedTxn* wup_txn = dynamic_cast<WriteUnpreparedTxn*>(txn);
  345. ASSERT_OK(txn->SetName("xid"));
  346. for (int i = 0; i < kNumKeys; i++) {
  347. ASSERT_OK(txn->Put("k" + std::to_string(i), "v" + std::to_string(i)));
  348. if (txn_options.write_batch_flush_threshold == 1) {
  349. // WriteUnprepared will check write_batch_flush_threshold and
  350. // possibly flush before appending to the write batch. No flush will
  351. // happen at the first write because the batch is still empty, so
  352. // after k puts, there should be k-1 flushed batches.
  353. ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), i);
  354. } else {
  355. ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), 0);
  356. }
  357. }
  358. if (prepare) {
  359. ASSERT_OK(txn->Prepare());
  360. }
  361. Iterator* iter = db->NewIterator(ReadOptions());
  362. ASSERT_OK(iter->status());
  363. iter->SeekToFirst();
  364. assert(!iter->Valid());
  365. ASSERT_FALSE(iter->Valid());
  366. ASSERT_OK(iter->status());
  367. delete iter;
  368. if (commit) {
  369. ASSERT_OK(txn->Commit());
  370. } else {
  371. ASSERT_OK(txn->Rollback());
  372. }
  373. delete txn;
  374. iter = db->NewIterator(ReadOptions());
  375. ASSERT_OK(iter->status());
  376. iter->SeekToFirst();
  377. for (int i = 0; i < (commit ? kNumKeys : 0); i++) {
  378. ASSERT_TRUE(iter->Valid());
  379. ASSERT_EQ(iter->key().ToString(), "k" + std::to_string(i));
  380. ASSERT_EQ(iter->value().ToString(), "v" + std::to_string(i));
  381. iter->Next();
  382. }
  383. ASSERT_FALSE(iter->Valid());
  384. ASSERT_OK(iter->status());
  385. delete iter;
  386. }
  387. }
  388. }
  389. }
  390. // Test whether logs containing unprepared/prepared batches are kept even
  391. // after memtable finishes flushing, and whether they are removed when
  392. // transaction commits/aborts.
  393. //
  394. // TODO(lth): Merge with TransactionTest/TwoPhaseLogRollingTest tests.
  395. TEST_P(WriteUnpreparedTransactionTest, MarkLogWithPrepSection) {
  396. WriteOptions write_options;
  397. TransactionOptions txn_options;
  398. // batch_size of 1 causes writes to DB for every marker.
  399. txn_options.write_batch_flush_threshold = 1;
  400. const int kNumKeys = 10;
  401. WriteOptions wopts;
  402. wopts.sync = true;
  403. for (bool prepare : {false, true}) {
  404. for (bool commit : {false, true}) {
  405. ASSERT_OK(ReOpen());
  406. auto wup_db = dynamic_cast<WriteUnpreparedTxnDB*>(db);
  407. auto db_impl = wup_db->db_impl_;
  408. Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
  409. ASSERT_OK(txn1->SetName("xid1"));
  410. Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
  411. ASSERT_OK(txn2->SetName("xid2"));
  412. // Spread this transaction across multiple log files.
  413. for (int i = 0; i < kNumKeys; i++) {
  414. ASSERT_OK(txn1->Put("k1" + std::to_string(i), "v" + std::to_string(i)));
  415. if (i >= kNumKeys / 2) {
  416. ASSERT_OK(
  417. txn2->Put("k2" + std::to_string(i), "v" + std::to_string(i)));
  418. }
  419. if (i > 0) {
  420. ASSERT_OK(db_impl->TEST_SwitchWAL());
  421. }
  422. }
  423. ASSERT_GT(txn1->GetLogNumber(), 0);
  424. ASSERT_GT(txn2->GetLogNumber(), 0);
  425. ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(),
  426. txn1->GetLogNumber());
  427. ASSERT_GT(db_impl->TEST_LogfileNumber(), txn1->GetLogNumber());
  428. if (prepare) {
  429. ASSERT_OK(txn1->Prepare());
  430. ASSERT_OK(txn2->Prepare());
  431. }
  432. ASSERT_GE(db_impl->TEST_LogfileNumber(), txn1->GetLogNumber());
  433. ASSERT_GE(db_impl->TEST_LogfileNumber(), txn2->GetLogNumber());
  434. ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(),
  435. txn1->GetLogNumber());
  436. if (commit) {
  437. ASSERT_OK(txn1->Commit());
  438. } else {
  439. ASSERT_OK(txn1->Rollback());
  440. }
  441. ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(),
  442. txn2->GetLogNumber());
  443. if (commit) {
  444. ASSERT_OK(txn2->Commit());
  445. } else {
  446. ASSERT_OK(txn2->Rollback());
  447. }
  448. ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);
  449. delete txn1;
  450. delete txn2;
  451. }
  452. }
  453. }
  454. TEST_P(WriteUnpreparedTransactionTest, NoSnapshotWrite) {
  455. WriteOptions woptions;
  456. TransactionOptions txn_options;
  457. txn_options.write_batch_flush_threshold = 1;
  458. Transaction* txn = db->BeginTransaction(woptions, txn_options);
  459. // Do some writes with no snapshot
  460. ASSERT_OK(txn->Put("a", "a"));
  461. ASSERT_OK(txn->Put("b", "b"));
  462. ASSERT_OK(txn->Put("c", "c"));
  463. // Test that it is still possible to create iterators after writes with no
  464. // snapshot, if iterator snapshot is fresh enough.
  465. ReadOptions roptions;
  466. auto iter = txn->GetIterator(roptions);
  467. ASSERT_OK(iter->status());
  468. int keys = 0;
  469. for (iter->SeekToLast(); iter->Valid(); iter->Prev(), keys++) {
  470. ASSERT_OK(iter->status());
  471. ASSERT_EQ(iter->key().ToString(), iter->value().ToString());
  472. }
  473. ASSERT_EQ(keys, 3);
  474. ASSERT_OK(iter->status());
  475. delete iter;
  476. delete txn;
  477. }
  478. // Test whether write to a transaction while iterating is supported.
  479. TEST_P(WriteUnpreparedTransactionTest, IterateAndWrite) {
  480. WriteOptions woptions;
  481. TransactionOptions txn_options;
  482. txn_options.write_batch_flush_threshold = 1;
  483. enum Action { DO_DELETE, DO_UPDATE };
  484. for (Action a : {DO_DELETE, DO_UPDATE}) {
  485. for (int i = 0; i < 100; i++) {
  486. ASSERT_OK(db->Put(woptions, std::to_string(i), std::to_string(i)));
  487. }
  488. Transaction* txn = db->BeginTransaction(woptions, txn_options);
  489. // write_batch_ now contains 1 key.
  490. ASSERT_OK(txn->Put("9", "a"));
  491. ReadOptions roptions;
  492. auto iter = txn->GetIterator(roptions);
  493. ASSERT_OK(iter->status());
  494. for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
  495. ASSERT_OK(iter->status());
  496. if (iter->key() == "9") {
  497. ASSERT_EQ(iter->value().ToString(), "a");
  498. } else {
  499. ASSERT_EQ(iter->key().ToString(), iter->value().ToString());
  500. }
  501. if (a == DO_DELETE) {
  502. ASSERT_OK(txn->Delete(iter->key()));
  503. } else {
  504. ASSERT_OK(txn->Put(iter->key(), "b"));
  505. }
  506. }
  507. ASSERT_OK(iter->status());
  508. delete iter;
  509. ASSERT_OK(txn->Commit());
  510. iter = db->NewIterator(roptions);
  511. ASSERT_OK(iter->status());
  512. if (a == DO_DELETE) {
  513. // Check that db is empty.
  514. iter->SeekToFirst();
  515. ASSERT_FALSE(iter->Valid());
  516. } else {
  517. int keys = 0;
  518. // Check that all values are updated to b.
  519. for (iter->SeekToFirst(); iter->Valid(); iter->Next(), keys++) {
  520. ASSERT_OK(iter->status());
  521. ASSERT_EQ(iter->value().ToString(), "b");
  522. }
  523. ASSERT_EQ(keys, 100);
  524. }
  525. ASSERT_OK(iter->status());
  526. delete iter;
  527. delete txn;
  528. }
  529. }
  530. // Test that using an iterator after transaction clear is not supported
  531. TEST_P(WriteUnpreparedTransactionTest, IterateAfterClear) {
  532. WriteOptions woptions;
  533. TransactionOptions txn_options;
  534. txn_options.write_batch_flush_threshold = 1;
  535. enum Action { kCommit, kRollback };
  536. for (Action a : {kCommit, kRollback}) {
  537. for (int i = 0; i < 100; i++) {
  538. ASSERT_OK(db->Put(woptions, std::to_string(i), std::to_string(i)));
  539. }
  540. Transaction* txn = db->BeginTransaction(woptions, txn_options);
  541. ASSERT_OK(txn->Put("9", "a"));
  542. ReadOptions roptions;
  543. auto iter1 = txn->GetIterator(roptions);
  544. auto iter2 = txn->GetIterator(roptions);
  545. iter1->SeekToFirst();
  546. iter2->Seek("9");
  547. // Check that iterators are valid before transaction finishes.
  548. ASSERT_TRUE(iter1->Valid());
  549. ASSERT_TRUE(iter2->Valid());
  550. ASSERT_OK(iter1->status());
  551. ASSERT_OK(iter2->status());
  552. if (a == kCommit) {
  553. ASSERT_OK(txn->Commit());
  554. } else {
  555. ASSERT_OK(txn->Rollback());
  556. }
  557. // Check that iterators are invalidated after transaction finishes.
  558. ASSERT_FALSE(iter1->Valid());
  559. ASSERT_FALSE(iter2->Valid());
  560. ASSERT_TRUE(iter1->status().IsInvalidArgument());
  561. ASSERT_TRUE(iter2->status().IsInvalidArgument());
  562. delete iter1;
  563. delete iter2;
  564. delete txn;
  565. }
  566. }
  567. TEST_P(WriteUnpreparedTransactionTest, SavePoint) {
  568. WriteOptions woptions;
  569. TransactionOptions txn_options;
  570. txn_options.write_batch_flush_threshold = 1;
  571. Transaction* txn = db->BeginTransaction(woptions, txn_options);
  572. txn->SetSavePoint();
  573. ASSERT_OK(txn->Put("a", "a"));
  574. ASSERT_OK(txn->Put("b", "b"));
  575. ASSERT_OK(txn->Commit());
  576. ReadOptions roptions;
  577. std::string value;
  578. ASSERT_OK(txn->Get(roptions, "a", &value));
  579. ASSERT_EQ(value, "a");
  580. ASSERT_OK(txn->Get(roptions, "b", &value));
  581. ASSERT_EQ(value, "b");
  582. delete txn;
  583. }
  584. TEST_P(WriteUnpreparedTransactionTest, UntrackedKeys) {
  585. WriteOptions woptions;
  586. TransactionOptions txn_options;
  587. txn_options.write_batch_flush_threshold = 1;
  588. Transaction* txn = db->BeginTransaction(woptions, txn_options);
  589. auto wb = txn->GetWriteBatch()->GetWriteBatch();
  590. ASSERT_OK(txn->Put("a", "a"));
  591. ASSERT_OK(wb->Put("a_untrack", "a_untrack"));
  592. txn->SetSavePoint();
  593. ASSERT_OK(txn->Put("b", "b"));
  594. ASSERT_OK(txn->Put("b_untrack", "b_untrack"));
  595. ReadOptions roptions;
  596. std::string value;
  597. ASSERT_OK(txn->Get(roptions, "a", &value));
  598. ASSERT_EQ(value, "a");
  599. ASSERT_OK(txn->Get(roptions, "a_untrack", &value));
  600. ASSERT_EQ(value, "a_untrack");
  601. ASSERT_OK(txn->Get(roptions, "b", &value));
  602. ASSERT_EQ(value, "b");
  603. ASSERT_OK(txn->Get(roptions, "b_untrack", &value));
  604. ASSERT_EQ(value, "b_untrack");
  605. // b and b_untrack should be rolled back.
  606. ASSERT_OK(txn->RollbackToSavePoint());
  607. ASSERT_OK(txn->Get(roptions, "a", &value));
  608. ASSERT_EQ(value, "a");
  609. ASSERT_OK(txn->Get(roptions, "a_untrack", &value));
  610. ASSERT_EQ(value, "a_untrack");
  611. auto s = txn->Get(roptions, "b", &value);
  612. ASSERT_TRUE(s.IsNotFound());
  613. s = txn->Get(roptions, "b_untrack", &value);
  614. ASSERT_TRUE(s.IsNotFound());
  615. // Everything should be rolled back.
  616. ASSERT_OK(txn->Rollback());
  617. s = txn->Get(roptions, "a", &value);
  618. ASSERT_TRUE(s.IsNotFound());
  619. s = txn->Get(roptions, "a_untrack", &value);
  620. ASSERT_TRUE(s.IsNotFound());
  621. s = txn->Get(roptions, "b", &value);
  622. ASSERT_TRUE(s.IsNotFound());
  623. s = txn->Get(roptions, "b_untrack", &value);
  624. ASSERT_TRUE(s.IsNotFound());
  625. delete txn;
  626. }
  627. } // namespace ROCKSDB_NAMESPACE
  628. int main(int argc, char** argv) {
  629. ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  630. ::testing::InitGoogleTest(&argc, argv);
  631. return RUN_ALL_TESTS();
  632. }