transaction_test.h 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #pragma once
  #include <algorithm>
  #include <cinttypes>
  #include <functional>
  #include <limits>
  #include <string>
  #include <thread>

  #include "db/db_impl/db_impl.h"
  #include "port/port.h"
  #include "rocksdb/db.h"
  #include "rocksdb/options.h"
  #include "rocksdb/utilities/transaction.h"
  #include "rocksdb/utilities/transaction_db.h"
  #include "table/mock_table.h"
  #include "test_util/fault_injection_test_env.h"
  #include "test_util/sync_point.h"
  #include "test_util/testharness.h"
  #include "test_util/testutil.h"
  #include "test_util/transaction_test_util.h"
  #include "util/random.h"
  #include "util/string_util.h"
  #include "utilities/merge_operators.h"
  #include "utilities/merge_operators/string_append/stringappend.h"
  #include "utilities/transactions/pessimistic_transaction_db.h"
  #include "utilities/transactions/write_unprepared_txn_db.h"
  29. namespace ROCKSDB_NAMESPACE {
  // Returns true if bit `i` (0 = least significant) is set in the combination
  // bitmask `comb`. Bit positions that do not exist in size_t are reported as
  // not set.
  //
  // Declared inline because this header may be included from more than one
  // translation unit.
  inline bool IsInCombination(size_t i, size_t comb) {
    // Shifting by the width of the type (or more) is undefined behavior, so
    // out-of-range positions are answered without shifting.
    if (i >= static_cast<size_t>(std::numeric_limits<size_t>::digits)) {
      return false;
    }
    return (comb & (size_t{1} << i)) != 0;
  }
  // Selects whether the DB under test is opened with unordered writes.
  // The underlying type is bool: ordered maps to false, unordered to true.
  enum WriteOrdering : bool {
    kOrderedWrite = false,
    kUnorderedWrite = true,
  };
  33. class TransactionTestBase : public ::testing::Test {
  34. public:
  35. TransactionDB* db;
  36. FaultInjectionTestEnv* env;
  37. std::string dbname;
  38. Options options;
  39. TransactionDBOptions txn_db_options;
  40. bool use_stackable_db_;
  41. TransactionTestBase(bool use_stackable_db, bool two_write_queue,
  42. TxnDBWritePolicy write_policy,
  43. WriteOrdering write_ordering)
  44. : db(nullptr), env(nullptr), use_stackable_db_(use_stackable_db) {
  45. options.create_if_missing = true;
  46. options.max_write_buffer_number = 2;
  47. options.write_buffer_size = 4 * 1024;
  48. options.unordered_write = write_ordering == kUnorderedWrite;
  49. options.level0_file_num_compaction_trigger = 2;
  50. options.merge_operator = MergeOperators::CreateFromStringId("stringappend");
  51. env = new FaultInjectionTestEnv(Env::Default());
  52. options.env = env;
  53. options.two_write_queues = two_write_queue;
  54. dbname = test::PerThreadDBPath("transaction_testdb");
  55. DestroyDB(dbname, options);
  56. txn_db_options.transaction_lock_timeout = 0;
  57. txn_db_options.default_lock_timeout = 0;
  58. txn_db_options.write_policy = write_policy;
  59. txn_db_options.rollback_merge_operands = true;
  60. // This will stress write unprepared, by forcing write batch flush on every
  61. // write.
  62. txn_db_options.default_write_batch_flush_threshold = 1;
  63. // Write unprepared requires all transactions to be named. This setting
  64. // autogenerates the name so that existing tests can pass.
  65. txn_db_options.autogenerate_name = true;
  66. Status s;
  67. if (use_stackable_db == false) {
  68. s = TransactionDB::Open(options, txn_db_options, dbname, &db);
  69. } else {
  70. s = OpenWithStackableDB();
  71. }
  72. assert(s.ok());
  73. }
  74. ~TransactionTestBase() {
  75. delete db;
  76. db = nullptr;
  77. // This is to skip the assert statement in FaultInjectionTestEnv. There
  78. // seems to be a bug in btrfs that the makes readdir return recently
  79. // unlink-ed files. By using the default fs we simply ignore errors resulted
  80. // from attempting to delete such files in DestroyDB.
  81. options.env = Env::Default();
  82. DestroyDB(dbname, options);
  83. delete env;
  84. }
  85. Status ReOpenNoDelete() {
  86. delete db;
  87. db = nullptr;
  88. env->AssertNoOpenFile();
  89. env->DropUnsyncedFileData();
  90. env->ResetState();
  91. Status s;
  92. if (use_stackable_db_ == false) {
  93. s = TransactionDB::Open(options, txn_db_options, dbname, &db);
  94. } else {
  95. s = OpenWithStackableDB();
  96. }
  97. assert(!s.ok() || db != nullptr);
  98. return s;
  99. }
  100. Status ReOpenNoDelete(std::vector<ColumnFamilyDescriptor>& cfs,
  101. std::vector<ColumnFamilyHandle*>* handles) {
  102. for (auto h : *handles) {
  103. delete h;
  104. }
  105. handles->clear();
  106. delete db;
  107. db = nullptr;
  108. env->AssertNoOpenFile();
  109. env->DropUnsyncedFileData();
  110. env->ResetState();
  111. Status s;
  112. if (use_stackable_db_ == false) {
  113. s = TransactionDB::Open(options, txn_db_options, dbname, cfs, handles,
  114. &db);
  115. } else {
  116. s = OpenWithStackableDB(cfs, handles);
  117. }
  118. assert(!s.ok() || db != nullptr);
  119. return s;
  120. }
  121. Status ReOpen() {
  122. delete db;
  123. db = nullptr;
  124. DestroyDB(dbname, options);
  125. Status s;
  126. if (use_stackable_db_ == false) {
  127. s = TransactionDB::Open(options, txn_db_options, dbname, &db);
  128. } else {
  129. s = OpenWithStackableDB();
  130. }
  131. assert(db != nullptr);
  132. return s;
  133. }
  134. Status OpenWithStackableDB(std::vector<ColumnFamilyDescriptor>& cfs,
  135. std::vector<ColumnFamilyHandle*>* handles) {
  136. std::vector<size_t> compaction_enabled_cf_indices;
  137. TransactionDB::PrepareWrap(&options, &cfs, &compaction_enabled_cf_indices);
  138. DB* root_db = nullptr;
  139. Options options_copy(options);
  140. const bool use_seq_per_batch =
  141. txn_db_options.write_policy == WRITE_PREPARED ||
  142. txn_db_options.write_policy == WRITE_UNPREPARED;
  143. const bool use_batch_per_txn =
  144. txn_db_options.write_policy == WRITE_COMMITTED ||
  145. txn_db_options.write_policy == WRITE_PREPARED;
  146. Status s = DBImpl::Open(options_copy, dbname, cfs, handles, &root_db,
  147. use_seq_per_batch, use_batch_per_txn);
  148. StackableDB* stackable_db = new StackableDB(root_db);
  149. if (s.ok()) {
  150. assert(root_db != nullptr);
  151. s = TransactionDB::WrapStackableDB(stackable_db, txn_db_options,
  152. compaction_enabled_cf_indices,
  153. *handles, &db);
  154. }
  155. if (!s.ok()) {
  156. delete stackable_db;
  157. }
  158. return s;
  159. }
  160. Status OpenWithStackableDB() {
  161. std::vector<size_t> compaction_enabled_cf_indices;
  162. std::vector<ColumnFamilyDescriptor> column_families{ColumnFamilyDescriptor(
  163. kDefaultColumnFamilyName, ColumnFamilyOptions(options))};
  164. TransactionDB::PrepareWrap(&options, &column_families,
  165. &compaction_enabled_cf_indices);
  166. std::vector<ColumnFamilyHandle*> handles;
  167. DB* root_db = nullptr;
  168. Options options_copy(options);
  169. const bool use_seq_per_batch =
  170. txn_db_options.write_policy == WRITE_PREPARED ||
  171. txn_db_options.write_policy == WRITE_UNPREPARED;
  172. const bool use_batch_per_txn =
  173. txn_db_options.write_policy == WRITE_COMMITTED ||
  174. txn_db_options.write_policy == WRITE_PREPARED;
  175. Status s = DBImpl::Open(options_copy, dbname, column_families, &handles,
  176. &root_db, use_seq_per_batch, use_batch_per_txn);
  177. if (!s.ok()) {
  178. delete root_db;
  179. return s;
  180. }
  181. StackableDB* stackable_db = new StackableDB(root_db);
  182. assert(root_db != nullptr);
  183. assert(handles.size() == 1);
  184. s = TransactionDB::WrapStackableDB(stackable_db, txn_db_options,
  185. compaction_enabled_cf_indices, handles,
  186. &db);
  187. delete handles[0];
  188. if (!s.ok()) {
  189. delete stackable_db;
  190. }
  191. return s;
  192. }
  193. std::atomic<size_t> linked = {0};
  194. std::atomic<size_t> exp_seq = {0};
  195. std::atomic<size_t> commit_writes = {0};
  196. std::atomic<size_t> expected_commits = {0};
  197. // Without Prepare, the commit does not write to WAL
  198. std::atomic<size_t> with_empty_commits = {0};
  199. std::function<void(size_t, Status)> txn_t0_with_status = [&](size_t index,
  200. Status exp_s) {
  201. // Test DB's internal txn. It involves no prepare phase nor a commit marker.
  202. WriteOptions wopts;
  203. auto s = db->Put(wopts, "key" + std::to_string(index), "value");
  204. ASSERT_EQ(exp_s, s);
  205. if (txn_db_options.write_policy == TxnDBWritePolicy::WRITE_COMMITTED) {
  206. // Consume one seq per key
  207. exp_seq++;
  208. } else {
  209. // Consume one seq per batch
  210. exp_seq++;
  211. if (options.two_write_queues) {
  212. // Consume one seq for commit
  213. exp_seq++;
  214. }
  215. }
  216. with_empty_commits++;
  217. };
  218. std::function<void(size_t)> txn_t0 = [&](size_t index) {
  219. return txn_t0_with_status(index, Status::OK());
  220. };
  221. std::function<void(size_t)> txn_t1 = [&](size_t index) {
  222. // Testing directly writing a write batch. Functionality-wise it is
  223. // equivalent to commit without prepare.
  224. WriteBatch wb;
  225. auto istr = std::to_string(index);
  226. ASSERT_OK(wb.Put("k1" + istr, "v1"));
  227. ASSERT_OK(wb.Put("k2" + istr, "v2"));
  228. ASSERT_OK(wb.Put("k3" + istr, "v3"));
  229. WriteOptions wopts;
  230. auto s = db->Write(wopts, &wb);
  231. if (txn_db_options.write_policy == TxnDBWritePolicy::WRITE_COMMITTED) {
  232. // Consume one seq per key
  233. exp_seq += 3;
  234. } else {
  235. // Consume one seq per batch
  236. exp_seq++;
  237. if (options.two_write_queues) {
  238. // Consume one seq for commit
  239. exp_seq++;
  240. }
  241. }
  242. ASSERT_OK(s);
  243. with_empty_commits++;
  244. };
  245. std::function<void(size_t)> txn_t2 = [&](size_t index) {
  246. // Commit without prepare. It should write to DB without a commit marker.
  247. TransactionOptions txn_options;
  248. WriteOptions write_options;
  249. Transaction* txn = db->BeginTransaction(write_options, txn_options);
  250. auto istr = std::to_string(index);
  251. ASSERT_OK(txn->SetName("xid" + istr));
  252. ASSERT_OK(txn->Put(Slice("foo" + istr), Slice("bar")));
  253. ASSERT_OK(txn->Put(Slice("foo2" + istr), Slice("bar2")));
  254. ASSERT_OK(txn->Put(Slice("foo3" + istr), Slice("bar3")));
  255. ASSERT_OK(txn->Put(Slice("foo4" + istr), Slice("bar4")));
  256. ASSERT_OK(txn->Commit());
  257. if (txn_db_options.write_policy == TxnDBWritePolicy::WRITE_COMMITTED) {
  258. // Consume one seq per key
  259. exp_seq += 4;
  260. } else if (txn_db_options.write_policy ==
  261. TxnDBWritePolicy::WRITE_PREPARED) {
  262. // Consume one seq per batch
  263. exp_seq++;
  264. if (options.two_write_queues) {
  265. // Consume one seq for commit
  266. exp_seq++;
  267. }
  268. } else {
  269. // Flushed after each key, consume one seq per flushed batch
  270. exp_seq += 4;
  271. // WriteUnprepared implements CommitWithoutPrepareInternal by simply
  272. // calling Prepare then Commit. Consume one seq for the prepare.
  273. exp_seq++;
  274. }
  275. delete txn;
  276. with_empty_commits++;
  277. };
  278. std::function<void(size_t)> txn_t3 = [&](size_t index) {
  279. // A full 2pc txn that also involves a commit marker.
  280. TransactionOptions txn_options;
  281. WriteOptions write_options;
  282. Transaction* txn = db->BeginTransaction(write_options, txn_options);
  283. auto istr = std::to_string(index);
  284. ASSERT_OK(txn->SetName("xid" + istr));
  285. ASSERT_OK(txn->Put(Slice("foo" + istr), Slice("bar")));
  286. ASSERT_OK(txn->Put(Slice("foo2" + istr), Slice("bar2")));
  287. ASSERT_OK(txn->Put(Slice("foo3" + istr), Slice("bar3")));
  288. ASSERT_OK(txn->Put(Slice("foo4" + istr), Slice("bar4")));
  289. ASSERT_OK(txn->Put(Slice("foo5" + istr), Slice("bar5")));
  290. expected_commits++;
  291. ASSERT_OK(txn->Prepare());
  292. commit_writes++;
  293. ASSERT_OK(txn->Commit());
  294. if (txn_db_options.write_policy == TxnDBWritePolicy::WRITE_COMMITTED) {
  295. // Consume one seq per key
  296. exp_seq += 5;
  297. } else if (txn_db_options.write_policy ==
  298. TxnDBWritePolicy::WRITE_PREPARED) {
  299. // Consume one seq per batch
  300. exp_seq++;
  301. // Consume one seq per commit marker
  302. exp_seq++;
  303. } else {
  304. // Flushed after each key, consume one seq per flushed batch
  305. exp_seq += 5;
  306. // Consume one seq per commit marker
  307. exp_seq++;
  308. }
  309. delete txn;
  310. };
  311. std::function<void(size_t)> txn_t4 = [&](size_t index) {
  312. // A full 2pc txn that also involves a commit marker.
  313. TransactionOptions txn_options;
  314. WriteOptions write_options;
  315. Transaction* txn = db->BeginTransaction(write_options, txn_options);
  316. auto istr = std::to_string(index);
  317. ASSERT_OK(txn->SetName("xid" + istr));
  318. ASSERT_OK(txn->Put(Slice("foo" + istr), Slice("bar")));
  319. ASSERT_OK(txn->Put(Slice("foo2" + istr), Slice("bar2")));
  320. ASSERT_OK(txn->Put(Slice("foo3" + istr), Slice("bar3")));
  321. ASSERT_OK(txn->Put(Slice("foo4" + istr), Slice("bar4")));
  322. ASSERT_OK(txn->Put(Slice("foo5" + istr), Slice("bar5")));
  323. expected_commits++;
  324. ASSERT_OK(txn->Prepare());
  325. commit_writes++;
  326. ASSERT_OK(txn->Rollback());
  327. if (txn_db_options.write_policy == TxnDBWritePolicy::WRITE_COMMITTED) {
  328. // No seq is consumed for deleting the txn buffer
  329. exp_seq += 0;
  330. } else if (txn_db_options.write_policy ==
  331. TxnDBWritePolicy::WRITE_PREPARED) {
  332. // Consume one seq per batch
  333. exp_seq++;
  334. // Consume one seq per rollback batch
  335. exp_seq++;
  336. if (options.two_write_queues) {
  337. // Consume one seq for rollback commit
  338. exp_seq++;
  339. }
  340. } else {
  341. // Flushed after each key, consume one seq per flushed batch
  342. exp_seq += 5;
  343. // Consume one seq per rollback batch
  344. exp_seq++;
  345. if (options.two_write_queues) {
  346. // Consume one seq for rollback commit
  347. exp_seq++;
  348. }
  349. }
  350. delete txn;
  351. };
  352. // Test that we can change write policy after a clean shutdown (which would
  353. // empty the WAL)
  354. void CrossCompatibilityTest(TxnDBWritePolicy from_policy,
  355. TxnDBWritePolicy to_policy, bool empty_wal) {
  356. TransactionOptions txn_options;
  357. ReadOptions read_options;
  358. WriteOptions write_options;
  359. uint32_t index = 0;
  360. Random rnd(1103);
  361. options.write_buffer_size = 1024; // To create more sst files
  362. std::unordered_map<std::string, std::string> committed_kvs;
  363. Transaction* txn;
  364. txn_db_options.write_policy = from_policy;
  365. if (txn_db_options.write_policy == WRITE_COMMITTED) {
  366. options.unordered_write = false;
  367. }
  368. ReOpen();
  369. for (int i = 0; i < 1024; i++) {
  370. auto istr = std::to_string(index);
  371. auto k = Slice("foo-" + istr).ToString();
  372. auto v = Slice("bar-" + istr).ToString();
  373. // For test the duplicate keys
  374. auto v2 = Slice("bar2-" + istr).ToString();
  375. auto type = rnd.Uniform(4);
  376. switch (type) {
  377. case 0:
  378. committed_kvs[k] = v;
  379. ASSERT_OK(db->Put(write_options, k, v));
  380. committed_kvs[k] = v2;
  381. ASSERT_OK(db->Put(write_options, k, v2));
  382. break;
  383. case 1: {
  384. WriteBatch wb;
  385. committed_kvs[k] = v;
  386. wb.Put(k, v);
  387. committed_kvs[k] = v2;
  388. wb.Put(k, v2);
  389. ASSERT_OK(db->Write(write_options, &wb));
  390. } break;
  391. case 2:
  392. case 3:
  393. txn = db->BeginTransaction(write_options, txn_options);
  394. ASSERT_OK(txn->SetName("xid" + istr));
  395. committed_kvs[k] = v;
  396. ASSERT_OK(txn->Put(k, v));
  397. committed_kvs[k] = v2;
  398. ASSERT_OK(txn->Put(k, v2));
  399. if (type == 3) {
  400. ASSERT_OK(txn->Prepare());
  401. }
  402. ASSERT_OK(txn->Commit());
  403. delete txn;
  404. break;
  405. default:
  406. assert(0);
  407. }
  408. index++;
  409. } // for i
  410. txn_db_options.write_policy = to_policy;
  411. if (txn_db_options.write_policy == WRITE_COMMITTED) {
  412. options.unordered_write = false;
  413. }
  414. auto db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
  415. // Before upgrade/downgrade the WAL must be emptied
  416. if (empty_wal) {
  417. db_impl->TEST_FlushMemTable();
  418. } else {
  419. db_impl->FlushWAL(true);
  420. }
  421. auto s = ReOpenNoDelete();
  422. if (empty_wal) {
  423. ASSERT_OK(s);
  424. } else {
  425. // Test that we can detect the WAL that is produced by an incompatible
  426. // WritePolicy and fail fast before mis-interpreting the WAL.
  427. ASSERT_TRUE(s.IsNotSupported());
  428. return;
  429. }
  430. db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
  431. // Check that WAL is empty
  432. VectorLogPtr log_files;
  433. db_impl->GetSortedWalFiles(log_files);
  434. ASSERT_EQ(0, log_files.size());
  435. for (auto& kv : committed_kvs) {
  436. std::string value;
  437. s = db->Get(read_options, kv.first, &value);
  438. if (s.IsNotFound()) {
  439. printf("key = %s\n", kv.first.c_str());
  440. }
  441. ASSERT_OK(s);
  442. if (kv.second != value) {
  443. printf("key = %s\n", kv.first.c_str());
  444. }
  445. ASSERT_EQ(kv.second, value);
  446. }
  447. }
  448. };
  449. class TransactionTest
  450. : public TransactionTestBase,
  451. virtual public ::testing::WithParamInterface<
  452. std::tuple<bool, bool, TxnDBWritePolicy, WriteOrdering>> {
  453. public:
  454. TransactionTest()
  455. : TransactionTestBase(std::get<0>(GetParam()), std::get<1>(GetParam()),
  456. std::get<2>(GetParam()), std::get<3>(GetParam())){};
  457. };
  458. class TransactionStressTest : public TransactionTest {};
  459. class MySQLStyleTransactionTest
  460. : public TransactionTestBase,
  461. virtual public ::testing::WithParamInterface<
  462. std::tuple<bool, bool, TxnDBWritePolicy, WriteOrdering, bool>> {
  463. public:
  464. MySQLStyleTransactionTest()
  465. : TransactionTestBase(std::get<0>(GetParam()), std::get<1>(GetParam()),
  466. std::get<2>(GetParam()), std::get<3>(GetParam())),
  467. with_slow_threads_(std::get<4>(GetParam())) {
  468. if (with_slow_threads_ &&
  469. (txn_db_options.write_policy == WRITE_PREPARED ||
  470. txn_db_options.write_policy == WRITE_UNPREPARED)) {
  471. // The corner case with slow threads involves the caches filling
  472. // over which would not happen even with artifial delays. To help
  473. // such cases to show up we lower the size of the cache-related data
  474. // structures.
  475. txn_db_options.wp_snapshot_cache_bits = 1;
  476. txn_db_options.wp_commit_cache_bits = 10;
  477. options.write_buffer_size = 1024;
  478. EXPECT_OK(ReOpen());
  479. }
  480. };
  481. protected:
  482. // Also emulate slow threads by addin artiftial delays
  483. const bool with_slow_threads_;
  484. };
  485. } // namespace ROCKSDB_NAMESPACE