write_unprepared_transaction_test.cc 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #ifndef ROCKSDB_LITE
  6. #include "utilities/transactions/transaction_test.h"
  7. #include "utilities/transactions/write_unprepared_txn.h"
  8. #include "utilities/transactions/write_unprepared_txn_db.h"
  9. namespace ROCKSDB_NAMESPACE {
  10. class WriteUnpreparedTransactionTestBase : public TransactionTestBase {
  11. public:
  12. WriteUnpreparedTransactionTestBase(bool use_stackable_db,
  13. bool two_write_queue,
  14. TxnDBWritePolicy write_policy)
  15. : TransactionTestBase(use_stackable_db, two_write_queue, write_policy,
  16. kOrderedWrite) {}
  17. };
  18. class WriteUnpreparedTransactionTest
  19. : public WriteUnpreparedTransactionTestBase,
  20. virtual public ::testing::WithParamInterface<
  21. std::tuple<bool, bool, TxnDBWritePolicy>> {
  22. public:
  23. WriteUnpreparedTransactionTest()
  24. : WriteUnpreparedTransactionTestBase(std::get<0>(GetParam()),
  25. std::get<1>(GetParam()),
  26. std::get<2>(GetParam())){}
  27. };
  28. INSTANTIATE_TEST_CASE_P(
  29. WriteUnpreparedTransactionTest, WriteUnpreparedTransactionTest,
  30. ::testing::Values(std::make_tuple(false, false, WRITE_UNPREPARED),
  31. std::make_tuple(false, true, WRITE_UNPREPARED)));
  32. enum StressAction { NO_SNAPSHOT, RO_SNAPSHOT, REFRESH_SNAPSHOT };
  33. class WriteUnpreparedStressTest : public WriteUnpreparedTransactionTestBase,
  34. virtual public ::testing::WithParamInterface<
  35. std::tuple<bool, StressAction>> {
  36. public:
  37. WriteUnpreparedStressTest()
  38. : WriteUnpreparedTransactionTestBase(false, std::get<0>(GetParam()),
  39. WRITE_UNPREPARED),
  40. action_(std::get<1>(GetParam())) {}
  41. StressAction action_;
  42. };
  43. INSTANTIATE_TEST_CASE_P(
  44. WriteUnpreparedStressTest, WriteUnpreparedStressTest,
  45. ::testing::Values(std::make_tuple(false, NO_SNAPSHOT),
  46. std::make_tuple(false, RO_SNAPSHOT),
  47. std::make_tuple(false, REFRESH_SNAPSHOT),
  48. std::make_tuple(true, NO_SNAPSHOT),
  49. std::make_tuple(true, RO_SNAPSHOT),
  50. std::make_tuple(true, REFRESH_SNAPSHOT)));
  51. TEST_P(WriteUnpreparedTransactionTest, ReadYourOwnWrite) {
  52. // The following tests checks whether reading your own write for
  53. // a transaction works for write unprepared, when there are uncommitted
  54. // values written into DB.
  55. auto verify_state = [](Iterator* iter, const std::string& key,
  56. const std::string& value) {
  57. ASSERT_TRUE(iter->Valid());
  58. ASSERT_OK(iter->status());
  59. ASSERT_EQ(key, iter->key().ToString());
  60. ASSERT_EQ(value, iter->value().ToString());
  61. };
  62. // Test always reseeking vs never reseeking.
  63. for (uint64_t max_skip : {0, std::numeric_limits<int>::max()}) {
  64. options.max_sequential_skip_in_iterations = max_skip;
  65. options.disable_auto_compactions = true;
  66. ReOpen();
  67. TransactionOptions txn_options;
  68. WriteOptions woptions;
  69. ReadOptions roptions;
  70. ASSERT_OK(db->Put(woptions, "a", ""));
  71. ASSERT_OK(db->Put(woptions, "b", ""));
  72. Transaction* txn = db->BeginTransaction(woptions, txn_options);
  73. WriteUnpreparedTxn* wup_txn = dynamic_cast<WriteUnpreparedTxn*>(txn);
  74. txn->SetSnapshot();
  75. for (int i = 0; i < 5; i++) {
  76. std::string stored_value = "v" + ToString(i);
  77. ASSERT_OK(txn->Put("a", stored_value));
  78. ASSERT_OK(txn->Put("b", stored_value));
  79. wup_txn->FlushWriteBatchToDB(false);
  80. // Test Get()
  81. std::string value;
  82. ASSERT_OK(txn->Get(roptions, "a", &value));
  83. ASSERT_EQ(value, stored_value);
  84. ASSERT_OK(txn->Get(roptions, "b", &value));
  85. ASSERT_EQ(value, stored_value);
  86. // Test Next()
  87. auto iter = txn->GetIterator(roptions);
  88. iter->Seek("a");
  89. verify_state(iter, "a", stored_value);
  90. iter->Next();
  91. verify_state(iter, "b", stored_value);
  92. iter->SeekToFirst();
  93. verify_state(iter, "a", stored_value);
  94. iter->Next();
  95. verify_state(iter, "b", stored_value);
  96. delete iter;
  97. // Test Prev()
  98. iter = txn->GetIterator(roptions);
  99. iter->SeekForPrev("b");
  100. verify_state(iter, "b", stored_value);
  101. iter->Prev();
  102. verify_state(iter, "a", stored_value);
  103. iter->SeekToLast();
  104. verify_state(iter, "b", stored_value);
  105. iter->Prev();
  106. verify_state(iter, "a", stored_value);
  107. delete iter;
  108. }
  109. delete txn;
  110. }
  111. }
  112. #ifndef ROCKSDB_VALGRIND_RUN
  113. TEST_P(WriteUnpreparedStressTest, ReadYourOwnWriteStress) {
  114. // This is a stress test where different threads are writing random keys, and
  115. // then before committing or aborting the transaction, it validates to see
  116. // that it can read the keys it wrote, and the keys it did not write respect
  117. // the snapshot. To avoid row lock contention (and simply stressing the
  118. // locking system), each thread is mostly only writing to its own set of keys.
  119. const uint32_t kNumIter = 1000;
  120. const uint32_t kNumThreads = 10;
  121. const uint32_t kNumKeys = 5;
  122. std::default_random_engine rand(static_cast<uint32_t>(
  123. std::hash<std::thread::id>()(std::this_thread::get_id())));
  124. // Test with
  125. // 1. no snapshots set
  126. // 2. snapshot set on ReadOptions
  127. // 3. snapshot set, and refreshing after every write.
  128. StressAction a = action_;
  129. WriteOptions write_options;
  130. txn_db_options.transaction_lock_timeout = -1;
  131. options.disable_auto_compactions = true;
  132. ReOpen();
  133. std::vector<std::string> keys;
  134. for (uint32_t k = 0; k < kNumKeys * kNumThreads; k++) {
  135. keys.push_back("k" + ToString(k));
  136. }
  137. std::shuffle(keys.begin(), keys.end(), rand);
  138. // This counter will act as a "sequence number" to help us validate
  139. // visibility logic with snapshots. If we had direct access to the seqno of
  140. // snapshots and key/values, then we should directly compare those instead.
  141. std::atomic<int64_t> counter(0);
  142. std::function<void(uint32_t)> stress_thread = [&](int id) {
  143. size_t tid = std::hash<std::thread::id>()(std::this_thread::get_id());
  144. Random64 rnd(static_cast<uint32_t>(tid));
  145. Transaction* txn;
  146. TransactionOptions txn_options;
  147. // batch_size of 1 causes writes to DB for every marker.
  148. txn_options.write_batch_flush_threshold = 1;
  149. ReadOptions read_options;
  150. for (uint32_t i = 0; i < kNumIter; i++) {
  151. std::set<std::string> owned_keys(&keys[id * kNumKeys],
  152. &keys[(id + 1) * kNumKeys]);
  153. // Add unowned keys to make the workload more interesting, but this
  154. // increases row lock contention, so just do it sometimes.
  155. if (rnd.OneIn(2)) {
  156. owned_keys.insert(keys[rnd.Uniform(kNumKeys * kNumThreads)]);
  157. }
  158. txn = db->BeginTransaction(write_options, txn_options);
  159. txn->SetName(ToString(id));
  160. txn->SetSnapshot();
  161. if (a >= RO_SNAPSHOT) {
  162. read_options.snapshot = txn->GetSnapshot();
  163. ASSERT_TRUE(read_options.snapshot != nullptr);
  164. }
  165. uint64_t buf[2];
  166. buf[0] = id;
  167. // When scanning through the database, make sure that all unprepared
  168. // keys have value >= snapshot and all other keys have value < snapshot.
  169. int64_t snapshot_num = counter.fetch_add(1);
  170. Status s;
  171. for (const auto& key : owned_keys) {
  172. buf[1] = counter.fetch_add(1);
  173. s = txn->Put(key, Slice((const char*)buf, sizeof(buf)));
  174. if (!s.ok()) {
  175. break;
  176. }
  177. if (a == REFRESH_SNAPSHOT) {
  178. txn->SetSnapshot();
  179. read_options.snapshot = txn->GetSnapshot();
  180. snapshot_num = counter.fetch_add(1);
  181. }
  182. }
  183. // Failure is possible due to snapshot validation. In this case,
  184. // rollback and move onto next iteration.
  185. if (!s.ok()) {
  186. ASSERT_TRUE(s.IsBusy());
  187. ASSERT_OK(txn->Rollback());
  188. delete txn;
  189. continue;
  190. }
  191. auto verify_key = [&owned_keys, &a, &id, &snapshot_num](
  192. const std::string& key, const std::string& value) {
  193. if (owned_keys.count(key) > 0) {
  194. ASSERT_EQ(value.size(), 16);
  195. // Since this key is part of owned_keys, then this key must be
  196. // unprepared by this transaction identified by 'id'
  197. ASSERT_EQ(((int64_t*)value.c_str())[0], id);
  198. if (a == REFRESH_SNAPSHOT) {
  199. // If refresh snapshot is true, then the snapshot is refreshed
  200. // after every Put(), meaning that the current snapshot in
  201. // snapshot_num must be greater than the "seqno" of any keys
  202. // written by the current transaction.
  203. ASSERT_LT(((int64_t*)value.c_str())[1], snapshot_num);
  204. } else {
  205. // If refresh snapshot is not on, then the snapshot was taken at
  206. // the beginning of the transaction, meaning all writes must come
  207. // after snapshot_num
  208. ASSERT_GT(((int64_t*)value.c_str())[1], snapshot_num);
  209. }
  210. } else if (a >= RO_SNAPSHOT) {
  211. // If this is not an unprepared key, just assert that the key
  212. // "seqno" is smaller than the snapshot seqno.
  213. ASSERT_EQ(value.size(), 16);
  214. ASSERT_LT(((int64_t*)value.c_str())[1], snapshot_num);
  215. }
  216. };
  217. // Validate Get()/Next()/Prev(). Do only one of them to save time, and
  218. // reduce lock contention.
  219. switch (rnd.Uniform(3)) {
  220. case 0: // Validate Get()
  221. {
  222. for (const auto& key : keys) {
  223. std::string value;
  224. s = txn->Get(read_options, Slice(key), &value);
  225. if (!s.ok()) {
  226. ASSERT_TRUE(s.IsNotFound());
  227. ASSERT_EQ(owned_keys.count(key), 0);
  228. } else {
  229. verify_key(key, value);
  230. }
  231. }
  232. break;
  233. }
  234. case 1: // Validate Next()
  235. {
  236. Iterator* iter = txn->GetIterator(read_options);
  237. for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
  238. verify_key(iter->key().ToString(), iter->value().ToString());
  239. }
  240. delete iter;
  241. break;
  242. }
  243. case 2: // Validate Prev()
  244. {
  245. Iterator* iter = txn->GetIterator(read_options);
  246. for (iter->SeekToLast(); iter->Valid(); iter->Prev()) {
  247. verify_key(iter->key().ToString(), iter->value().ToString());
  248. }
  249. delete iter;
  250. break;
  251. }
  252. default:
  253. ASSERT_TRUE(false);
  254. }
  255. if (rnd.OneIn(2)) {
  256. ASSERT_OK(txn->Commit());
  257. } else {
  258. ASSERT_OK(txn->Rollback());
  259. }
  260. delete txn;
  261. }
  262. };
  263. std::vector<port::Thread> threads;
  264. for (uint32_t i = 0; i < kNumThreads; i++) {
  265. threads.emplace_back(stress_thread, i);
  266. }
  267. for (auto& t : threads) {
  268. t.join();
  269. }
  270. }
  271. #endif // ROCKSDB_VALGRIND_RUN
  272. // This tests how write unprepared behaves during recovery when the DB crashes
  273. // after a transaction has either been unprepared or prepared, and tests if
  274. // the changes are correctly applied for prepared transactions if we decide to
  275. // rollback/commit.
  276. TEST_P(WriteUnpreparedTransactionTest, RecoveryTest) {
  277. WriteOptions write_options;
  278. write_options.disableWAL = false;
  279. TransactionOptions txn_options;
  280. std::vector<Transaction*> prepared_trans;
  281. WriteUnpreparedTxnDB* wup_db;
  282. options.disable_auto_compactions = true;
  283. enum Action { UNPREPARED, ROLLBACK, COMMIT };
  284. // batch_size of 1 causes writes to DB for every marker.
  285. for (size_t batch_size : {1, 1000000}) {
  286. txn_options.write_batch_flush_threshold = batch_size;
  287. for (bool empty : {true, false}) {
  288. for (Action a : {UNPREPARED, ROLLBACK, COMMIT}) {
  289. for (int num_batches = 1; num_batches < 10; num_batches++) {
  290. // Reset database.
  291. prepared_trans.clear();
  292. ReOpen();
  293. wup_db = dynamic_cast<WriteUnpreparedTxnDB*>(db);
  294. if (!empty) {
  295. for (int i = 0; i < num_batches; i++) {
  296. ASSERT_OK(db->Put(WriteOptions(), "k" + ToString(i),
  297. "before value" + ToString(i)));
  298. }
  299. }
  300. // Write num_batches unprepared batches.
  301. Transaction* txn = db->BeginTransaction(write_options, txn_options);
  302. WriteUnpreparedTxn* wup_txn = dynamic_cast<WriteUnpreparedTxn*>(txn);
  303. txn->SetName("xid");
  304. for (int i = 0; i < num_batches; i++) {
  305. ASSERT_OK(txn->Put("k" + ToString(i), "value" + ToString(i)));
  306. if (txn_options.write_batch_flush_threshold == 1) {
  307. // WriteUnprepared will check write_batch_flush_threshold and
  308. // possibly flush before appending to the write batch. No flush
  309. // will happen at the first write because the batch is still
  310. // empty, so after k puts, there should be k-1 flushed batches.
  311. ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), i);
  312. } else {
  313. ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), 0);
  314. }
  315. }
  316. if (a == UNPREPARED) {
  317. // This is done to prevent the destructor from rolling back the
  318. // transaction for us, since we want to pretend we crashed and
  319. // test that recovery does the rollback.
  320. wup_txn->unprep_seqs_.clear();
  321. } else {
  322. txn->Prepare();
  323. }
  324. delete txn;
  325. // Crash and run recovery code paths.
  326. wup_db->db_impl_->FlushWAL(true);
  327. wup_db->TEST_Crash();
  328. ReOpenNoDelete();
  329. assert(db != nullptr);
  330. db->GetAllPreparedTransactions(&prepared_trans);
  331. ASSERT_EQ(prepared_trans.size(), a == UNPREPARED ? 0 : 1);
  332. if (a == ROLLBACK) {
  333. ASSERT_OK(prepared_trans[0]->Rollback());
  334. delete prepared_trans[0];
  335. } else if (a == COMMIT) {
  336. ASSERT_OK(prepared_trans[0]->Commit());
  337. delete prepared_trans[0];
  338. }
  339. Iterator* iter = db->NewIterator(ReadOptions());
  340. iter->SeekToFirst();
  341. // Check that DB has before values.
  342. if (!empty || a == COMMIT) {
  343. for (int i = 0; i < num_batches; i++) {
  344. ASSERT_TRUE(iter->Valid());
  345. ASSERT_EQ(iter->key().ToString(), "k" + ToString(i));
  346. if (a == COMMIT) {
  347. ASSERT_EQ(iter->value().ToString(), "value" + ToString(i));
  348. } else {
  349. ASSERT_EQ(iter->value().ToString(),
  350. "before value" + ToString(i));
  351. }
  352. iter->Next();
  353. }
  354. }
  355. ASSERT_FALSE(iter->Valid());
  356. delete iter;
  357. }
  358. }
  359. }
  360. }
  361. }
  362. // Basic test to see that unprepared batch gets written to DB when batch size
  363. // is exceeded. It also does some basic checks to see if commit/rollback works
  364. // as expected for write unprepared.
  365. TEST_P(WriteUnpreparedTransactionTest, UnpreparedBatch) {
  366. WriteOptions write_options;
  367. TransactionOptions txn_options;
  368. const int kNumKeys = 10;
  369. // batch_size of 1 causes writes to DB for every marker.
  370. for (size_t batch_size : {1, 1000000}) {
  371. txn_options.write_batch_flush_threshold = batch_size;
  372. for (bool prepare : {false, true}) {
  373. for (bool commit : {false, true}) {
  374. ReOpen();
  375. Transaction* txn = db->BeginTransaction(write_options, txn_options);
  376. WriteUnpreparedTxn* wup_txn = dynamic_cast<WriteUnpreparedTxn*>(txn);
  377. txn->SetName("xid");
  378. for (int i = 0; i < kNumKeys; i++) {
  379. txn->Put("k" + ToString(i), "v" + ToString(i));
  380. if (txn_options.write_batch_flush_threshold == 1) {
  381. // WriteUnprepared will check write_batch_flush_threshold and
  382. // possibly flush before appending to the write batch. No flush will
  383. // happen at the first write because the batch is still empty, so
  384. // after k puts, there should be k-1 flushed batches.
  385. ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), i);
  386. } else {
  387. ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), 0);
  388. }
  389. }
  390. if (prepare) {
  391. ASSERT_OK(txn->Prepare());
  392. }
  393. Iterator* iter = db->NewIterator(ReadOptions());
  394. iter->SeekToFirst();
  395. assert(!iter->Valid());
  396. ASSERT_FALSE(iter->Valid());
  397. delete iter;
  398. if (commit) {
  399. ASSERT_OK(txn->Commit());
  400. } else {
  401. ASSERT_OK(txn->Rollback());
  402. }
  403. delete txn;
  404. iter = db->NewIterator(ReadOptions());
  405. iter->SeekToFirst();
  406. for (int i = 0; i < (commit ? kNumKeys : 0); i++) {
  407. ASSERT_TRUE(iter->Valid());
  408. ASSERT_EQ(iter->key().ToString(), "k" + ToString(i));
  409. ASSERT_EQ(iter->value().ToString(), "v" + ToString(i));
  410. iter->Next();
  411. }
  412. ASSERT_FALSE(iter->Valid());
  413. delete iter;
  414. }
  415. }
  416. }
  417. }
  418. // Test whether logs containing unprepared/prepared batches are kept even
  419. // after memtable finishes flushing, and whether they are removed when
  420. // transaction commits/aborts.
  421. //
  422. // TODO(lth): Merge with TransactionTest/TwoPhaseLogRollingTest tests.
  423. TEST_P(WriteUnpreparedTransactionTest, MarkLogWithPrepSection) {
  424. WriteOptions write_options;
  425. TransactionOptions txn_options;
  426. // batch_size of 1 causes writes to DB for every marker.
  427. txn_options.write_batch_flush_threshold = 1;
  428. const int kNumKeys = 10;
  429. WriteOptions wopts;
  430. wopts.sync = true;
  431. for (bool prepare : {false, true}) {
  432. for (bool commit : {false, true}) {
  433. ReOpen();
  434. auto wup_db = dynamic_cast<WriteUnpreparedTxnDB*>(db);
  435. auto db_impl = wup_db->db_impl_;
  436. Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
  437. ASSERT_OK(txn1->SetName("xid1"));
  438. Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
  439. ASSERT_OK(txn2->SetName("xid2"));
  440. // Spread this transaction across multiple log files.
  441. for (int i = 0; i < kNumKeys; i++) {
  442. ASSERT_OK(txn1->Put("k1" + ToString(i), "v" + ToString(i)));
  443. if (i >= kNumKeys / 2) {
  444. ASSERT_OK(txn2->Put("k2" + ToString(i), "v" + ToString(i)));
  445. }
  446. if (i > 0) {
  447. db_impl->TEST_SwitchWAL();
  448. }
  449. }
  450. ASSERT_GT(txn1->GetLogNumber(), 0);
  451. ASSERT_GT(txn2->GetLogNumber(), 0);
  452. ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(),
  453. txn1->GetLogNumber());
  454. ASSERT_GT(db_impl->TEST_LogfileNumber(), txn1->GetLogNumber());
  455. if (prepare) {
  456. ASSERT_OK(txn1->Prepare());
  457. ASSERT_OK(txn2->Prepare());
  458. }
  459. ASSERT_GE(db_impl->TEST_LogfileNumber(), txn1->GetLogNumber());
  460. ASSERT_GE(db_impl->TEST_LogfileNumber(), txn2->GetLogNumber());
  461. ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(),
  462. txn1->GetLogNumber());
  463. if (commit) {
  464. ASSERT_OK(txn1->Commit());
  465. } else {
  466. ASSERT_OK(txn1->Rollback());
  467. }
  468. ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(),
  469. txn2->GetLogNumber());
  470. if (commit) {
  471. ASSERT_OK(txn2->Commit());
  472. } else {
  473. ASSERT_OK(txn2->Rollback());
  474. }
  475. ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);
  476. delete txn1;
  477. delete txn2;
  478. }
  479. }
  480. }
  481. TEST_P(WriteUnpreparedTransactionTest, NoSnapshotWrite) {
  482. WriteOptions woptions;
  483. TransactionOptions txn_options;
  484. txn_options.write_batch_flush_threshold = 1;
  485. Transaction* txn = db->BeginTransaction(woptions, txn_options);
  486. // Do some writes with no snapshot
  487. ASSERT_OK(txn->Put("a", "a"));
  488. ASSERT_OK(txn->Put("b", "b"));
  489. ASSERT_OK(txn->Put("c", "c"));
  490. // Test that it is still possible to create iterators after writes with no
  491. // snapshot, if iterator snapshot is fresh enough.
  492. ReadOptions roptions;
  493. auto iter = txn->GetIterator(roptions);
  494. int keys = 0;
  495. for (iter->SeekToLast(); iter->Valid(); iter->Prev(), keys++) {
  496. ASSERT_OK(iter->status());
  497. ASSERT_EQ(iter->key().ToString(), iter->value().ToString());
  498. }
  499. ASSERT_EQ(keys, 3);
  500. delete iter;
  501. delete txn;
  502. }
  503. // Test whether write to a transaction while iterating is supported.
  504. TEST_P(WriteUnpreparedTransactionTest, IterateAndWrite) {
  505. WriteOptions woptions;
  506. TransactionOptions txn_options;
  507. txn_options.write_batch_flush_threshold = 1;
  508. enum Action { DO_DELETE, DO_UPDATE };
  509. for (Action a : {DO_DELETE, DO_UPDATE}) {
  510. for (int i = 0; i < 100; i++) {
  511. ASSERT_OK(db->Put(woptions, ToString(i), ToString(i)));
  512. }
  513. Transaction* txn = db->BeginTransaction(woptions, txn_options);
  514. // write_batch_ now contains 1 key.
  515. ASSERT_OK(txn->Put("9", "a"));
  516. ReadOptions roptions;
  517. auto iter = txn->GetIterator(roptions);
  518. for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
  519. ASSERT_OK(iter->status());
  520. if (iter->key() == "9") {
  521. ASSERT_EQ(iter->value().ToString(), "a");
  522. } else {
  523. ASSERT_EQ(iter->key().ToString(), iter->value().ToString());
  524. }
  525. if (a == DO_DELETE) {
  526. ASSERT_OK(txn->Delete(iter->key()));
  527. } else {
  528. ASSERT_OK(txn->Put(iter->key(), "b"));
  529. }
  530. }
  531. delete iter;
  532. ASSERT_OK(txn->Commit());
  533. iter = db->NewIterator(roptions);
  534. if (a == DO_DELETE) {
  535. // Check that db is empty.
  536. iter->SeekToFirst();
  537. ASSERT_FALSE(iter->Valid());
  538. } else {
  539. int keys = 0;
  540. // Check that all values are updated to b.
  541. for (iter->SeekToFirst(); iter->Valid(); iter->Next(), keys++) {
  542. ASSERT_OK(iter->status());
  543. ASSERT_EQ(iter->value().ToString(), "b");
  544. }
  545. ASSERT_EQ(keys, 100);
  546. }
  547. delete iter;
  548. delete txn;
  549. }
  550. }
  551. TEST_P(WriteUnpreparedTransactionTest, SavePoint) {
  552. WriteOptions woptions;
  553. TransactionOptions txn_options;
  554. txn_options.write_batch_flush_threshold = 1;
  555. Transaction* txn = db->BeginTransaction(woptions, txn_options);
  556. txn->SetSavePoint();
  557. ASSERT_OK(txn->Put("a", "a"));
  558. ASSERT_OK(txn->Put("b", "b"));
  559. ASSERT_OK(txn->Commit());
  560. ReadOptions roptions;
  561. std::string value;
  562. ASSERT_OK(txn->Get(roptions, "a", &value));
  563. ASSERT_EQ(value, "a");
  564. ASSERT_OK(txn->Get(roptions, "b", &value));
  565. ASSERT_EQ(value, "b");
  566. delete txn;
  567. }
  568. TEST_P(WriteUnpreparedTransactionTest, UntrackedKeys) {
  569. WriteOptions woptions;
  570. TransactionOptions txn_options;
  571. txn_options.write_batch_flush_threshold = 1;
  572. Transaction* txn = db->BeginTransaction(woptions, txn_options);
  573. auto wb = txn->GetWriteBatch()->GetWriteBatch();
  574. ASSERT_OK(txn->Put("a", "a"));
  575. ASSERT_OK(wb->Put("a_untrack", "a_untrack"));
  576. txn->SetSavePoint();
  577. ASSERT_OK(txn->Put("b", "b"));
  578. ASSERT_OK(txn->Put("b_untrack", "b_untrack"));
  579. ReadOptions roptions;
  580. std::string value;
  581. ASSERT_OK(txn->Get(roptions, "a", &value));
  582. ASSERT_EQ(value, "a");
  583. ASSERT_OK(txn->Get(roptions, "a_untrack", &value));
  584. ASSERT_EQ(value, "a_untrack");
  585. ASSERT_OK(txn->Get(roptions, "b", &value));
  586. ASSERT_EQ(value, "b");
  587. ASSERT_OK(txn->Get(roptions, "b_untrack", &value));
  588. ASSERT_EQ(value, "b_untrack");
  589. // b and b_untrack should be rolled back.
  590. ASSERT_OK(txn->RollbackToSavePoint());
  591. ASSERT_OK(txn->Get(roptions, "a", &value));
  592. ASSERT_EQ(value, "a");
  593. ASSERT_OK(txn->Get(roptions, "a_untrack", &value));
  594. ASSERT_EQ(value, "a_untrack");
  595. auto s = txn->Get(roptions, "b", &value);
  596. ASSERT_TRUE(s.IsNotFound());
  597. s = txn->Get(roptions, "b_untrack", &value);
  598. ASSERT_TRUE(s.IsNotFound());
  599. // Everything should be rolled back.
  600. ASSERT_OK(txn->Rollback());
  601. s = txn->Get(roptions, "a", &value);
  602. ASSERT_TRUE(s.IsNotFound());
  603. s = txn->Get(roptions, "a_untrack", &value);
  604. ASSERT_TRUE(s.IsNotFound());
  605. s = txn->Get(roptions, "b", &value);
  606. ASSERT_TRUE(s.IsNotFound());
  607. s = txn->Get(roptions, "b_untrack", &value);
  608. ASSERT_TRUE(s.IsNotFound());
  609. delete txn;
  610. }
  611. } // namespace ROCKSDB_NAMESPACE
  612. int main(int argc, char** argv) {
  613. ::testing::InitGoogleTest(&argc, argv);
  614. return RUN_ALL_TESTS();
  615. }
  616. #else
  617. #include <stdio.h>
  618. int main(int /*argc*/, char** /*argv*/) {
  619. fprintf(stderr,
  620. "SKIPPED as Transactions are not supported in ROCKSDB_LITE\n");
  621. return 0;
  622. }
  623. #endif // ROCKSDB_LITE