optimistic_transaction_test.cc 83 KB


  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include <cstdint>
  6. #include <functional>
  7. #include <memory>
  8. #include <string>
  9. #include <thread>
  10. #include "db/db_impl/db_impl.h"
  11. #include "db/db_test_util.h"
  12. #include "port/port.h"
  13. #include "rocksdb/db.h"
  14. #include "rocksdb/perf_context.h"
  15. #include "rocksdb/utilities/optimistic_transaction_db.h"
  16. #include "rocksdb/utilities/transaction.h"
  17. #include "test_util/sync_point.h"
  18. #include "test_util/testharness.h"
  19. #include "test_util/transaction_test_util.h"
  20. #include "util/crc32c.h"
  21. #include "util/random.h"
  22. namespace ROCKSDB_NAMESPACE {
  23. class OptimisticTransactionTest
  24. : public testing::Test,
  25. public testing::WithParamInterface<OccValidationPolicy> {
  26. public:
  27. std::unique_ptr<OptimisticTransactionDB> txn_db;
  28. std::string dbname;
  29. Options options;
  30. OptimisticTransactionDBOptions occ_opts;
  31. OptimisticTransactionTest() {
  32. options.create_if_missing = true;
  33. options.max_write_buffer_number = 2;
  34. options.max_write_buffer_size_to_maintain = 2 * Arena::kInlineSize;
  35. options.merge_operator.reset(new TestPutOperator());
  36. occ_opts.validate_policy = GetParam();
  37. dbname = test::PerThreadDBPath("optimistic_transaction_testdb");
  38. EXPECT_OK(DestroyDB(dbname, options));
  39. Open();
  40. }
  41. ~OptimisticTransactionTest() override {
  42. EXPECT_OK(txn_db->Close());
  43. txn_db.reset();
  44. EXPECT_OK(DestroyDB(dbname, options));
  45. }
  46. void Reopen() {
  47. txn_db.reset();
  48. Open();
  49. }
  50. static void OpenImpl(const Options& options,
  51. const OptimisticTransactionDBOptions& occ_opts,
  52. const std::string& dbname,
  53. std::unique_ptr<OptimisticTransactionDB>* txn_db) {
  54. ColumnFamilyOptions cf_options(options);
  55. std::vector<ColumnFamilyDescriptor> column_families;
  56. std::vector<ColumnFamilyHandle*> handles;
  57. column_families.emplace_back(kDefaultColumnFamilyName, cf_options);
  58. OptimisticTransactionDB* raw_txn_db = nullptr;
  59. Status s = OptimisticTransactionDB::Open(
  60. options, occ_opts, dbname, column_families, &handles, &raw_txn_db);
  61. ASSERT_OK(s);
  62. ASSERT_NE(raw_txn_db, nullptr);
  63. txn_db->reset(raw_txn_db);
  64. ASSERT_EQ(handles.size(), 1);
  65. delete handles[0];
  66. }
  67. private:
  68. void Open() { OpenImpl(options, occ_opts, dbname, &txn_db); }
  69. };
  70. TEST_P(OptimisticTransactionTest, SuccessTest) {
  71. WriteOptions write_options;
  72. ReadOptions read_options;
  73. std::string value;
  74. ASSERT_OK(txn_db->Put(write_options, Slice("foo"), Slice("bar")));
  75. ASSERT_OK(txn_db->Put(write_options, Slice("foo2"), Slice("bar")));
  76. Transaction* txn = txn_db->BeginTransaction(write_options);
  77. ASSERT_NE(txn, nullptr);
  78. ASSERT_OK(txn->GetForUpdate(read_options, "foo", &value));
  79. ASSERT_EQ(value, "bar");
  80. ASSERT_OK(txn->Put(Slice("foo"), Slice("bar2")));
  81. ASSERT_OK(txn->GetForUpdate(read_options, "foo", &value));
  82. ASSERT_EQ(value, "bar2");
  83. ASSERT_OK(txn->Commit());
  84. ASSERT_OK(txn_db->Get(read_options, "foo", &value));
  85. ASSERT_EQ(value, "bar2");
  86. delete txn;
  87. }
  88. TEST_P(OptimisticTransactionTest, WriteConflictTest) {
  89. WriteOptions write_options;
  90. ReadOptions read_options;
  91. std::string value;
  92. ASSERT_OK(txn_db->Put(write_options, "foo", "bar"));
  93. ASSERT_OK(txn_db->Put(write_options, "foo2", "bar"));
  94. Transaction* txn = txn_db->BeginTransaction(write_options);
  95. ASSERT_NE(txn, nullptr);
  96. ASSERT_OK(txn->Put("foo", "bar2"));
  97. // This Put outside of a transaction will conflict with the previous write
  98. ASSERT_OK(txn_db->Put(write_options, "foo", "barz"));
  99. ASSERT_OK(txn_db->Get(read_options, "foo", &value));
  100. ASSERT_EQ(value, "barz");
  101. ASSERT_EQ(1, txn->GetNumKeys());
  102. Status s = txn->Commit();
  103. ASSERT_TRUE(s.IsBusy()); // Txn should not commit
  104. // Verify that transaction did not write anything
  105. ASSERT_OK(txn_db->Get(read_options, "foo", &value));
  106. ASSERT_EQ(value, "barz");
  107. ASSERT_OK(txn_db->Get(read_options, "foo2", &value));
  108. ASSERT_EQ(value, "bar");
  109. delete txn;
  110. }
  111. TEST_P(OptimisticTransactionTest, WriteConflictTest2) {
  112. WriteOptions write_options;
  113. ReadOptions read_options;
  114. OptimisticTransactionOptions txn_options;
  115. std::string value;
  116. ASSERT_OK(txn_db->Put(write_options, "foo", "bar"));
  117. ASSERT_OK(txn_db->Put(write_options, "foo2", "bar"));
  118. txn_options.set_snapshot = true;
  119. Transaction* txn = txn_db->BeginTransaction(write_options, txn_options);
  120. ASSERT_NE(txn, nullptr);
  121. // This Put outside of a transaction will conflict with a later write
  122. ASSERT_OK(txn_db->Put(write_options, "foo", "barz"));
  123. ASSERT_OK(txn->Put(
  124. "foo", "bar2")); // Conflicts with write done after snapshot taken
  125. ASSERT_OK(txn_db->Get(read_options, "foo", &value));
  126. ASSERT_EQ(value, "barz");
  127. Status s = txn->Commit();
  128. ASSERT_TRUE(s.IsBusy()); // Txn should not commit
  129. // Verify that transaction did not write anything
  130. ASSERT_OK(txn_db->Get(read_options, "foo", &value));
  131. ASSERT_EQ(value, "barz");
  132. ASSERT_OK(txn_db->Get(read_options, "foo2", &value));
  133. ASSERT_EQ(value, "bar");
  134. delete txn;
  135. }
  136. TEST_P(OptimisticTransactionTest, WriteConflictTest3) {
  137. ASSERT_OK(txn_db->Put(WriteOptions(), "foo", "bar"));
  138. Transaction* txn = txn_db->BeginTransaction(WriteOptions());
  139. ASSERT_NE(txn, nullptr);
  140. std::string value;
  141. ASSERT_OK(txn->GetForUpdate(ReadOptions(), "foo", &value));
  142. ASSERT_EQ(value, "bar");
  143. ASSERT_OK(txn->Merge("foo", "bar3"));
  144. // Merge outside of a transaction should conflict with the previous merge
  145. ASSERT_OK(txn_db->Merge(WriteOptions(), "foo", "bar2"));
  146. ASSERT_OK(txn_db->Get(ReadOptions(), "foo", &value));
  147. ASSERT_EQ(value, "bar2");
  148. ASSERT_EQ(1, txn->GetNumKeys());
  149. Status s = txn->Commit();
  150. EXPECT_TRUE(s.IsBusy()); // Txn should not commit
  151. // Verify that transaction did not write anything
  152. ASSERT_OK(txn_db->Get(ReadOptions(), "foo", &value));
  153. ASSERT_EQ(value, "bar2");
  154. delete txn;
  155. }
  156. TEST_P(OptimisticTransactionTest, WriteConflict4) {
  157. ASSERT_OK(txn_db->Put(WriteOptions(), "foo", "bar"));
  158. Transaction* txn = txn_db->BeginTransaction(WriteOptions());
  159. ASSERT_NE(txn, nullptr);
  160. std::string value;
  161. ASSERT_OK(txn->GetForUpdate(ReadOptions(), "foo", &value));
  162. ASSERT_EQ(value, "bar");
  163. ASSERT_OK(txn->Merge("foo", "bar3"));
  164. // Range delete outside of a transaction should conflict with the previous
  165. // merge inside txn
  166. auto* dbimpl = static_cast_with_check<DBImpl>(txn_db->GetRootDB());
  167. ColumnFamilyHandle* default_cf = dbimpl->DefaultColumnFamily();
  168. ASSERT_OK(dbimpl->DeleteRange(WriteOptions(), default_cf, "foo", "foo1"));
  169. Status s = txn_db->Get(ReadOptions(), "foo", &value);
  170. ASSERT_TRUE(s.IsNotFound());
  171. ASSERT_EQ(1, txn->GetNumKeys());
  172. s = txn->Commit();
  173. EXPECT_TRUE(s.IsBusy()); // Txn should not commit
  174. // Verify that transaction did not write anything
  175. s = txn_db->Get(ReadOptions(), "foo", &value);
  176. ASSERT_TRUE(s.IsNotFound());
  177. delete txn;
  178. }
  179. TEST_P(OptimisticTransactionTest, ReadConflictTest) {
  180. WriteOptions write_options;
  181. ReadOptions read_options, snapshot_read_options;
  182. OptimisticTransactionOptions txn_options;
  183. std::string value;
  184. ASSERT_OK(txn_db->Put(write_options, "foo", "bar"));
  185. ASSERT_OK(txn_db->Put(write_options, "foo2", "bar"));
  186. txn_options.set_snapshot = true;
  187. Transaction* txn = txn_db->BeginTransaction(write_options, txn_options);
  188. ASSERT_NE(txn, nullptr);
  189. txn->SetSnapshot();
  190. snapshot_read_options.snapshot = txn->GetSnapshot();
  191. ASSERT_OK(txn->GetForUpdate(snapshot_read_options, "foo", &value));
  192. ASSERT_EQ(value, "bar");
  193. // This Put outside of a transaction will conflict with the previous read
  194. ASSERT_OK(txn_db->Put(write_options, "foo", "barz"));
  195. ASSERT_OK(txn_db->Get(read_options, "foo", &value));
  196. ASSERT_EQ(value, "barz");
  197. Status s = txn->Commit();
  198. ASSERT_TRUE(s.IsBusy()); // Txn should not commit
  199. // Verify that transaction did not write anything
  200. ASSERT_OK(txn->GetForUpdate(read_options, "foo", &value));
  201. ASSERT_EQ(value, "barz");
  202. ASSERT_OK(txn->GetForUpdate(read_options, "foo2", &value));
  203. ASSERT_EQ(value, "bar");
  204. delete txn;
  205. }
  206. TEST_P(OptimisticTransactionTest, TxnOnlyTest) {
  207. // Test to make sure transactions work when there are no other writes in an
  208. // empty db.
  209. WriteOptions write_options;
  210. ReadOptions read_options;
  211. std::string value;
  212. Transaction* txn = txn_db->BeginTransaction(write_options);
  213. ASSERT_NE(txn, nullptr);
  214. ASSERT_OK(txn->Put("x", "y"));
  215. ASSERT_OK(txn->Commit());
  216. delete txn;
  217. }
  218. TEST_P(OptimisticTransactionTest, FlushTest) {
  219. WriteOptions write_options;
  220. ReadOptions read_options, snapshot_read_options;
  221. std::string value;
  222. ASSERT_OK(txn_db->Put(write_options, Slice("foo"), Slice("bar")));
  223. ASSERT_OK(txn_db->Put(write_options, Slice("foo2"), Slice("bar")));
  224. Transaction* txn = txn_db->BeginTransaction(write_options);
  225. ASSERT_NE(txn, nullptr);
  226. snapshot_read_options.snapshot = txn->GetSnapshot();
  227. ASSERT_OK(txn->GetForUpdate(snapshot_read_options, "foo", &value));
  228. ASSERT_EQ(value, "bar");
  229. ASSERT_OK(txn->Put(Slice("foo"), Slice("bar2")));
  230. ASSERT_OK(txn->GetForUpdate(snapshot_read_options, "foo", &value));
  231. ASSERT_EQ(value, "bar2");
  232. // Put a random key so we have a memtable to flush
  233. ASSERT_OK(txn_db->Put(write_options, "dummy", "dummy"));
  234. // force a memtable flush
  235. FlushOptions flush_ops;
  236. ASSERT_OK(txn_db->Flush(flush_ops));
  237. // txn should commit since the flushed table is still in MemtableList History
  238. ASSERT_OK(txn->Commit());
  239. ASSERT_OK(txn_db->Get(read_options, "foo", &value));
  240. ASSERT_EQ(value, "bar2");
  241. delete txn;
  242. }
  243. namespace {
  244. void FlushTest2PopulateTxn(Transaction* txn) {
  245. ReadOptions snapshot_read_options;
  246. std::string value;
  247. snapshot_read_options.snapshot = txn->GetSnapshot();
  248. ASSERT_OK(txn->GetForUpdate(snapshot_read_options, "foo", &value));
  249. ASSERT_EQ(value, "bar");
  250. ASSERT_OK(txn->Put(Slice("foo"), Slice("bar2")));
  251. ASSERT_OK(txn->GetForUpdate(snapshot_read_options, "foo", &value));
  252. ASSERT_EQ(value, "bar2");
  253. }
  254. } // namespace
  255. TEST_P(OptimisticTransactionTest, FlushTest2) {
  256. WriteOptions write_options;
  257. ReadOptions read_options;
  258. std::string value;
  259. ASSERT_OK(txn_db->Put(write_options, Slice("foo"), Slice("bar")));
  260. ASSERT_OK(txn_db->Put(write_options, Slice("foo2"), Slice("bar")));
  261. Transaction* txn = txn_db->BeginTransaction(write_options);
  262. ASSERT_NE(txn, nullptr);
  263. FlushTest2PopulateTxn(txn);
  264. // Put a random key so we have a MemTable to flush
  265. ASSERT_OK(txn_db->Put(write_options, "dummy", "dummy"));
  266. // force a memtable flush
  267. FlushOptions flush_ops;
  268. ASSERT_OK(txn_db->Flush(flush_ops));
  269. // Put a random key so we have a MemTable to flush
  270. ASSERT_OK(txn_db->Put(write_options, "dummy", "dummy2"));
  271. // force a memtable flush
  272. ASSERT_OK(txn_db->Flush(flush_ops));
  273. ASSERT_OK(txn_db->Put(write_options, "dummy", "dummy3"));
  274. // force a memtable flush
  275. // Since our test db has max_write_buffer_number=2, this flush will cause
  276. // the first memtable to get purged from the MemtableList history.
  277. ASSERT_OK(txn_db->Flush(flush_ops));
  278. Status s = txn->Commit();
  279. // txn should not commit since MemTableList History is not large enough
  280. ASSERT_TRUE(s.IsTryAgain());
  281. // simply trying Commit again doesn't help
  282. s = txn->Commit();
  283. ASSERT_TRUE(s.IsTryAgain());
  284. ASSERT_OK(txn_db->Get(read_options, "foo", &value));
  285. ASSERT_EQ(value, "bar");
  286. // But rolling back and redoing does
  287. ASSERT_OK(txn->Rollback());
  288. FlushTest2PopulateTxn(txn);
  289. ASSERT_OK(txn->Commit());
  290. ASSERT_OK(txn_db->Get(read_options, "foo", &value));
  291. ASSERT_EQ(value, "bar2");
  292. delete txn;
  293. }
  294. // Trigger the condition where some old memtables are skipped when doing
  295. // TransactionUtil::CheckKey(), and make sure the result is still correct.
  296. TEST_P(OptimisticTransactionTest, CheckKeySkipOldMemtable) {
  297. const int kAttemptHistoryMemtable = 0;
  298. const int kAttemptImmMemTable = 1;
  299. for (int attempt = kAttemptHistoryMemtable; attempt <= kAttemptImmMemTable;
  300. attempt++) {
  301. Reopen();
  302. WriteOptions write_options;
  303. ReadOptions read_options;
  304. ReadOptions snapshot_read_options;
  305. ReadOptions snapshot_read_options2;
  306. std::string value;
  307. ASSERT_OK(txn_db->Put(write_options, Slice("foo"), Slice("bar")));
  308. ASSERT_OK(txn_db->Put(write_options, Slice("foo2"), Slice("bar")));
  309. Transaction* txn = txn_db->BeginTransaction(write_options);
  310. ASSERT_TRUE(txn != nullptr);
  311. Transaction* txn2 = txn_db->BeginTransaction(write_options);
  312. ASSERT_TRUE(txn2 != nullptr);
  313. snapshot_read_options.snapshot = txn->GetSnapshot();
  314. ASSERT_OK(txn->GetForUpdate(snapshot_read_options, "foo", &value));
  315. ASSERT_EQ(value, "bar");
  316. ASSERT_OK(txn->Put(Slice("foo"), Slice("bar2")));
  317. snapshot_read_options2.snapshot = txn2->GetSnapshot();
  318. ASSERT_OK(txn2->GetForUpdate(snapshot_read_options2, "foo2", &value));
  319. ASSERT_EQ(value, "bar");
  320. ASSERT_OK(txn2->Put(Slice("foo2"), Slice("bar2")));
  321. // txn updates "foo" and txn2 updates "foo2", and now a write is
  322. // issued for "foo", which conflicts with txn but not txn2
  323. ASSERT_OK(txn_db->Put(write_options, "foo", "bar"));
  324. if (attempt == kAttemptImmMemTable) {
  325. // For the second attempt, hold flush from beginning. The memtable
  326. // will be switched to immutable after calling TEST_SwitchMemtable()
  327. // while CheckKey() is called.
  328. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
  329. {{"OptimisticTransactionTest.CheckKeySkipOldMemtable",
  330. "FlushJob::Start"}});
  331. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  332. }
  333. // force a memtable flush. The memtable should still be kept
  334. FlushOptions flush_ops;
  335. if (attempt == kAttemptHistoryMemtable) {
  336. ASSERT_OK(txn_db->Flush(flush_ops));
  337. } else {
  338. ASSERT_EQ(attempt, kAttemptImmMemTable);
  339. DBImpl* db_impl = static_cast<DBImpl*>(txn_db->GetRootDB());
  340. ASSERT_OK(db_impl->TEST_SwitchMemtable());
  341. }
  342. uint64_t num_imm_mems;
  343. ASSERT_TRUE(txn_db->GetIntProperty(DB::Properties::kNumImmutableMemTable,
  344. &num_imm_mems));
  345. if (attempt == kAttemptHistoryMemtable) {
  346. ASSERT_EQ(0, num_imm_mems);
  347. } else {
  348. ASSERT_EQ(attempt, kAttemptImmMemTable);
  349. ASSERT_EQ(1, num_imm_mems);
  350. }
  351. // Put something in active memtable
  352. ASSERT_OK(txn_db->Put(write_options, Slice("foo3"), Slice("bar")));
  353. // Create txn3 after flushing, when this transaction is commited,
  354. // only need to check the active memtable
  355. Transaction* txn3 = txn_db->BeginTransaction(write_options);
  356. ASSERT_TRUE(txn3 != nullptr);
  357. // Commit both of txn and txn2. txn will conflict but txn2 will
  358. // pass. In both ways, both memtables are queried.
  359. SetPerfLevel(PerfLevel::kEnableCount);
  360. get_perf_context()->Reset();
  361. Status s = txn->Commit();
  362. // We should have checked two memtables
  363. ASSERT_EQ(2, get_perf_context()->get_from_memtable_count);
  364. // txn should fail because of conflict, even if the memtable
  365. // has flushed, because it is still preserved in history.
  366. ASSERT_TRUE(s.IsBusy());
  367. get_perf_context()->Reset();
  368. s = txn2->Commit();
  369. // We should have checked two memtables
  370. ASSERT_EQ(2, get_perf_context()->get_from_memtable_count);
  371. ASSERT_TRUE(s.ok());
  372. ASSERT_OK(txn3->Put(Slice("foo2"), Slice("bar2")));
  373. get_perf_context()->Reset();
  374. s = txn3->Commit();
  375. // txn3 is created after the active memtable is created, so that is the only
  376. // memtable to check.
  377. ASSERT_EQ(1, get_perf_context()->get_from_memtable_count);
  378. ASSERT_TRUE(s.ok());
  379. TEST_SYNC_POINT("OptimisticTransactionTest.CheckKeySkipOldMemtable");
  380. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  381. SetPerfLevel(PerfLevel::kDisable);
  382. delete txn;
  383. delete txn2;
  384. delete txn3;
  385. }
  386. }
  387. TEST_P(OptimisticTransactionTest, NoSnapshotTest) {
  388. WriteOptions write_options;
  389. ReadOptions read_options;
  390. std::string value;
  391. ASSERT_OK(txn_db->Put(write_options, "AAA", "bar"));
  392. Transaction* txn = txn_db->BeginTransaction(write_options);
  393. ASSERT_NE(txn, nullptr);
  394. // Modify key after transaction start
  395. ASSERT_OK(txn_db->Put(write_options, "AAA", "bar1"));
  396. // Read and write without a snapshot
  397. ASSERT_OK(txn->GetForUpdate(read_options, "AAA", &value));
  398. ASSERT_EQ(value, "bar1");
  399. ASSERT_OK(txn->Put("AAA", "bar2"));
  400. // Should commit since read/write was done after data changed
  401. ASSERT_OK(txn->Commit());
  402. ASSERT_OK(txn->GetForUpdate(read_options, "AAA", &value));
  403. ASSERT_EQ(value, "bar2");
  404. delete txn;
  405. }
  406. TEST_P(OptimisticTransactionTest, MultipleSnapshotTest) {
  407. WriteOptions write_options;
  408. ReadOptions read_options, snapshot_read_options;
  409. std::string value;
  410. ASSERT_OK(txn_db->Put(write_options, "AAA", "bar"));
  411. ASSERT_OK(txn_db->Put(write_options, "BBB", "bar"));
  412. ASSERT_OK(txn_db->Put(write_options, "CCC", "bar"));
  413. Transaction* txn = txn_db->BeginTransaction(write_options);
  414. ASSERT_NE(txn, nullptr);
  415. ASSERT_OK(txn_db->Put(write_options, "AAA", "bar1"));
  416. // Read and write without a snapshot
  417. ASSERT_OK(txn->GetForUpdate(read_options, "AAA", &value));
  418. ASSERT_EQ(value, "bar1");
  419. ASSERT_OK(txn->Put("AAA", "bar2"));
  420. // Modify BBB before snapshot is taken
  421. ASSERT_OK(txn_db->Put(write_options, "BBB", "bar1"));
  422. txn->SetSnapshot();
  423. snapshot_read_options.snapshot = txn->GetSnapshot();
  424. // Read and write with snapshot
  425. ASSERT_OK(txn->GetForUpdate(snapshot_read_options, "BBB", &value));
  426. ASSERT_EQ(value, "bar1");
  427. ASSERT_OK(txn->Put("BBB", "bar2"));
  428. ASSERT_OK(txn_db->Put(write_options, "CCC", "bar1"));
  429. // Set a new snapshot
  430. txn->SetSnapshot();
  431. snapshot_read_options.snapshot = txn->GetSnapshot();
  432. // Read and write with snapshot
  433. ASSERT_OK(txn->GetForUpdate(snapshot_read_options, "CCC", &value));
  434. ASSERT_EQ(value, "bar1");
  435. ASSERT_OK(txn->Put("CCC", "bar2"));
  436. ASSERT_OK(txn->GetForUpdate(read_options, "AAA", &value));
  437. ASSERT_EQ(value, "bar2");
  438. ASSERT_OK(txn->GetForUpdate(read_options, "BBB", &value));
  439. ASSERT_EQ(value, "bar2");
  440. ASSERT_OK(txn->GetForUpdate(read_options, "CCC", &value));
  441. ASSERT_EQ(value, "bar2");
  442. ASSERT_OK(txn_db->Get(read_options, "AAA", &value));
  443. ASSERT_EQ(value, "bar1");
  444. ASSERT_OK(txn_db->Get(read_options, "BBB", &value));
  445. ASSERT_EQ(value, "bar1");
  446. ASSERT_OK(txn_db->Get(read_options, "CCC", &value));
  447. ASSERT_EQ(value, "bar1");
  448. ASSERT_OK(txn->Commit());
  449. ASSERT_OK(txn_db->Get(read_options, "AAA", &value));
  450. ASSERT_EQ(value, "bar2");
  451. ASSERT_OK(txn_db->Get(read_options, "BBB", &value));
  452. ASSERT_EQ(value, "bar2");
  453. ASSERT_OK(txn_db->Get(read_options, "CCC", &value));
  454. ASSERT_EQ(value, "bar2");
  455. // verify that we track multiple writes to the same key at different snapshots
  456. delete txn;
  457. txn = txn_db->BeginTransaction(write_options);
  458. // Potentially conflicting writes
  459. ASSERT_OK(txn_db->Put(write_options, "ZZZ", "zzz"));
  460. ASSERT_OK(txn_db->Put(write_options, "XXX", "xxx"));
  461. txn->SetSnapshot();
  462. OptimisticTransactionOptions txn_options;
  463. txn_options.set_snapshot = true;
  464. Transaction* txn2 = txn_db->BeginTransaction(write_options, txn_options);
  465. txn2->SetSnapshot();
  466. // This should not conflict in txn since the snapshot is later than the
  467. // previous write (spoiler alert: it will later conflict with txn2).
  468. ASSERT_OK(txn->Put("ZZZ", "zzzz"));
  469. ASSERT_OK(txn->Commit());
  470. delete txn;
  471. // This will conflict since the snapshot is earlier than another write to ZZZ
  472. ASSERT_OK(txn2->Put("ZZZ", "xxxxx"));
  473. Status s = txn2->Commit();
  474. ASSERT_TRUE(s.IsBusy());
  475. delete txn2;
  476. }
  477. TEST_P(OptimisticTransactionTest, ColumnFamiliesTest) {
  478. WriteOptions write_options;
  479. ReadOptions read_options, snapshot_read_options;
  480. OptimisticTransactionOptions txn_options;
  481. std::string value;
  482. ColumnFamilyHandle *cfa, *cfb;
  483. ColumnFamilyOptions cf_options;
  484. // Create 2 new column families
  485. ASSERT_OK(txn_db->CreateColumnFamily(cf_options, "CFA", &cfa));
  486. ASSERT_OK(txn_db->CreateColumnFamily(cf_options, "CFB", &cfb));
  487. delete cfa;
  488. delete cfb;
  489. txn_db.reset();
  490. OptimisticTransactionDBOptions my_occ_opts = occ_opts;
  491. const size_t bucket_count = 500;
  492. my_occ_opts.shared_lock_buckets = MakeSharedOccLockBuckets(bucket_count);
  493. // open DB with three column families
  494. std::vector<ColumnFamilyDescriptor> column_families;
  495. // have to open default column family
  496. column_families.emplace_back(kDefaultColumnFamilyName, ColumnFamilyOptions());
  497. // open the new column families
  498. column_families.emplace_back("CFA", ColumnFamilyOptions());
  499. column_families.emplace_back("CFB", ColumnFamilyOptions());
  500. std::vector<ColumnFamilyHandle*> handles;
  501. OptimisticTransactionDB* raw_txn_db = nullptr;
  502. ASSERT_OK(OptimisticTransactionDB::Open(
  503. options, my_occ_opts, dbname, column_families, &handles, &raw_txn_db));
  504. ASSERT_NE(raw_txn_db, nullptr);
  505. txn_db.reset(raw_txn_db);
  506. Transaction* txn = txn_db->BeginTransaction(write_options);
  507. ASSERT_NE(txn, nullptr);
  508. txn->SetSnapshot();
  509. snapshot_read_options.snapshot = txn->GetSnapshot();
  510. txn_options.set_snapshot = true;
  511. Transaction* txn2 = txn_db->BeginTransaction(write_options, txn_options);
  512. ASSERT_TRUE(txn2);
  513. // Write some data to the db
  514. WriteBatch batch;
  515. ASSERT_OK(batch.Put("foo", "foo"));
  516. ASSERT_OK(batch.Put(handles[1], "AAA", "bar"));
  517. ASSERT_OK(batch.Put(handles[1], "AAAZZZ", "bar"));
  518. ASSERT_OK(txn_db->Write(write_options, &batch));
  519. ASSERT_OK(txn_db->Delete(write_options, handles[1], "AAAZZZ"));
  520. // These keys do no conflict with existing writes since they're in
  521. // different column families
  522. ASSERT_OK(txn->Delete("AAA"));
  523. Status s =
  524. txn->GetForUpdate(snapshot_read_options, handles[1], "foo", &value);
  525. ASSERT_TRUE(s.IsNotFound());
  526. Slice key_slice("AAAZZZ");
  527. Slice value_slices[2] = {Slice("bar"), Slice("bar")};
  528. ASSERT_OK(txn->Put(handles[2], SliceParts(&key_slice, 1),
  529. SliceParts(value_slices, 2)));
  530. ASSERT_EQ(3, txn->GetNumKeys());
  531. // Txn should commit
  532. ASSERT_OK(txn->Commit());
  533. s = txn_db->Get(read_options, "AAA", &value);
  534. ASSERT_TRUE(s.IsNotFound());
  535. s = txn_db->Get(read_options, handles[2], "AAAZZZ", &value);
  536. ASSERT_OK(s);
  537. ASSERT_EQ(value, "barbar");
  538. Slice key_slices[3] = {Slice("AAA"), Slice("ZZ"), Slice("Z")};
  539. Slice value_slice("barbarbar");
  540. // This write will cause a conflict with the earlier batch write
  541. ASSERT_OK(txn2->Put(handles[1], SliceParts(key_slices, 3),
  542. SliceParts(&value_slice, 1)));
  543. ASSERT_OK(txn2->Delete(handles[2], "XXX"));
  544. ASSERT_OK(txn2->Delete(handles[1], "XXX"));
  545. s = txn2->GetForUpdate(snapshot_read_options, handles[1], "AAA", &value);
  546. ASSERT_TRUE(s.IsNotFound());
  547. // Verify txn did not commit
  548. s = txn2->Commit();
  549. ASSERT_TRUE(s.IsBusy());
  550. s = txn_db->Get(read_options, handles[1], "AAAZZZ", &value);
  551. ASSERT_TRUE(s.IsNotFound());
  552. delete txn;
  553. delete txn2;
  554. // ** MultiGet **
  555. txn = txn_db->BeginTransaction(write_options, txn_options);
  556. snapshot_read_options.snapshot = txn->GetSnapshot();
  557. txn2 = txn_db->BeginTransaction(write_options, txn_options);
  558. ASSERT_NE(txn, nullptr);
  559. std::vector<ColumnFamilyHandle*> multiget_cfh = {handles[1], handles[2],
  560. handles[0], handles[2]};
  561. std::vector<Slice> multiget_keys = {"AAA", "AAAZZZ", "foo", "foo"};
  562. std::vector<std::string> values(4);
  563. std::vector<Status> results = txn->MultiGetForUpdate(
  564. snapshot_read_options, multiget_cfh, multiget_keys, &values);
  565. ASSERT_OK(results[0]);
  566. ASSERT_OK(results[1]);
  567. ASSERT_OK(results[2]);
  568. ASSERT_TRUE(results[3].IsNotFound());
  569. ASSERT_EQ(values[0], "bar");
  570. ASSERT_EQ(values[1], "barbar");
  571. ASSERT_EQ(values[2], "foo");
  572. ASSERT_OK(txn->Delete(handles[2], "ZZZ"));
  573. ASSERT_OK(txn->Put(handles[2], "ZZZ", "YYY"));
  574. ASSERT_OK(txn->Put(handles[2], "ZZZ", "YYYY"));
  575. ASSERT_OK(txn->Delete(handles[2], "ZZZ"));
  576. ASSERT_OK(txn->Put(handles[2], "AAAZZZ", "barbarbar"));
  577. ASSERT_EQ(5, txn->GetNumKeys());
  578. // Txn should commit
  579. ASSERT_OK(txn->Commit());
  580. s = txn_db->Get(read_options, handles[2], "ZZZ", &value);
  581. ASSERT_TRUE(s.IsNotFound());
  582. // Put a key which will conflict with the next txn using the previous snapshot
  583. ASSERT_OK(txn_db->Put(write_options, handles[2], "foo", "000"));
  584. results = txn2->MultiGetForUpdate(snapshot_read_options, multiget_cfh,
  585. multiget_keys, &values);
  586. ASSERT_OK(results[0]);
  587. ASSERT_OK(results[1]);
  588. ASSERT_OK(results[2]);
  589. ASSERT_TRUE(results[3].IsNotFound());
  590. ASSERT_EQ(values[0], "bar");
  591. ASSERT_EQ(values[1], "barbar");
  592. ASSERT_EQ(values[2], "foo");
  593. // Verify Txn Did not Commit
  594. s = txn2->Commit();
  595. ASSERT_TRUE(s.IsBusy());
  596. delete txn;
  597. delete txn2;
  598. // ** Test independence and/or sharing of lock buckets across CFs and DBs **
  599. if (my_occ_opts.validate_policy == OccValidationPolicy::kValidateParallel) {
  600. struct SeenStat {
  601. uint64_t rolling_hash = 0;
  602. uintptr_t min = 0;
  603. uintptr_t max = 0;
  604. };
  605. SeenStat cur_seen;
  606. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  607. "OptimisticTransaction::CommitWithParallelValidate::lock_bucket_ptr",
  608. [&](void* arg) {
  609. // Hash the pointer
  610. cur_seen.rolling_hash = Hash64(reinterpret_cast<char*>(&arg),
  611. sizeof(arg), cur_seen.rolling_hash);
  612. uintptr_t val = reinterpret_cast<uintptr_t>(arg);
  613. if (cur_seen.min == 0 || val < cur_seen.min) {
  614. cur_seen.min = val;
  615. }
  616. if (cur_seen.max == 0 || val > cur_seen.max) {
  617. cur_seen.max = val;
  618. }
  619. });
  620. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  621. // Another db sharing lock buckets
  622. auto shared_dbname =
  623. test::PerThreadDBPath("optimistic_transaction_testdb_shared");
  624. std::unique_ptr<OptimisticTransactionDB> shared_txn_db = nullptr;
  625. OpenImpl(options, my_occ_opts, shared_dbname, &shared_txn_db);
  626. // Another db not sharing lock buckets
  627. auto nonshared_dbname =
  628. test::PerThreadDBPath("optimistic_transaction_testdb_nonshared");
  629. std::unique_ptr<OptimisticTransactionDB> nonshared_txn_db = nullptr;
  630. my_occ_opts.occ_lock_buckets = bucket_count;
  631. my_occ_opts.shared_lock_buckets = nullptr;
  632. OpenImpl(options, my_occ_opts, nonshared_dbname, &nonshared_txn_db);
  633. // Plenty of keys to avoid randomly hitting the same hash sequence
  634. std::array<std::string, 30> keys;
  635. for (size_t i = 0; i < keys.size(); ++i) {
  636. keys[i] = std::to_string(i);
  637. }
  638. // Get a baseline pattern of bucket accesses
  639. cur_seen = {};
  640. txn = txn_db->BeginTransaction(write_options, txn_options);
  641. for (const auto& key : keys) {
  642. ASSERT_OK(txn->Put(handles[0], key, "blah"));
  643. }
  644. ASSERT_OK(txn->Commit());
  645. // Sufficiently large hash coverage of the space
  646. const uintptr_t min_span_bytes = sizeof(port::Mutex) * bucket_count / 2;
  647. ASSERT_GT(cur_seen.max - cur_seen.min, min_span_bytes);
  648. // Save
  649. SeenStat base_seen = cur_seen;
  650. // Verify it is repeatable
  651. cur_seen = {};
  652. txn = txn_db->BeginTransaction(write_options, txn_options, txn);
  653. for (const auto& key : keys) {
  654. ASSERT_OK(txn->Put(handles[0], key, "moo"));
  655. }
  656. ASSERT_OK(txn->Commit());
  657. ASSERT_EQ(cur_seen.rolling_hash, base_seen.rolling_hash);
  658. ASSERT_EQ(cur_seen.min, base_seen.min);
  659. ASSERT_EQ(cur_seen.max, base_seen.max);
  660. // Try another CF
  661. cur_seen = {};
  662. txn = txn_db->BeginTransaction(write_options, txn_options, txn);
  663. for (const auto& key : keys) {
  664. ASSERT_OK(txn->Put(handles[1], key, "blah"));
  665. }
  666. ASSERT_OK(txn->Commit());
  667. // Different access pattern (different hash seed)
  668. ASSERT_NE(cur_seen.rolling_hash, base_seen.rolling_hash);
  669. // Same pointer space
  670. ASSERT_LT(cur_seen.min, base_seen.max);
  671. ASSERT_GT(cur_seen.max, base_seen.min);
  672. // Sufficiently large hash coverage of the space
  673. ASSERT_GT(cur_seen.max - cur_seen.min, min_span_bytes);
  674. // Save
  675. SeenStat cf1_seen = cur_seen;
  676. // And another CF
  677. cur_seen = {};
  678. txn = txn_db->BeginTransaction(write_options, txn_options, txn);
  679. for (const auto& key : keys) {
  680. ASSERT_OK(txn->Put(handles[2], key, "blah"));
  681. }
  682. ASSERT_OK(txn->Commit());
  683. // Different access pattern (different hash seed)
  684. ASSERT_NE(cur_seen.rolling_hash, base_seen.rolling_hash);
  685. ASSERT_NE(cur_seen.rolling_hash, cf1_seen.rolling_hash);
  686. // Same pointer space
  687. ASSERT_LT(cur_seen.min, base_seen.max);
  688. ASSERT_GT(cur_seen.max, base_seen.min);
  689. // Sufficiently large hash coverage of the space
  690. ASSERT_GT(cur_seen.max - cur_seen.min, min_span_bytes);
  691. // And DB with shared lock buckets
  692. cur_seen = {};
  693. delete txn;
  694. txn = shared_txn_db->BeginTransaction(write_options, txn_options);
  695. for (const auto& key : keys) {
  696. ASSERT_OK(txn->Put(key, "blah"));
  697. }
  698. ASSERT_OK(txn->Commit());
  699. // Different access pattern (different hash seed)
  700. ASSERT_NE(cur_seen.rolling_hash, base_seen.rolling_hash);
  701. ASSERT_NE(cur_seen.rolling_hash, cf1_seen.rolling_hash);
  702. // Same pointer space
  703. ASSERT_LT(cur_seen.min, base_seen.max);
  704. ASSERT_GT(cur_seen.max, base_seen.min);
  705. // Sufficiently large hash coverage of the space
  706. ASSERT_GT(cur_seen.max - cur_seen.min, min_span_bytes);
  707. // And DB with distinct lock buckets
  708. cur_seen = {};
  709. delete txn;
  710. txn = nonshared_txn_db->BeginTransaction(write_options, txn_options);
  711. for (const auto& key : keys) {
  712. ASSERT_OK(txn->Put(key, "blah"));
  713. }
  714. ASSERT_OK(txn->Commit());
  715. // Different access pattern (different hash seed)
  716. ASSERT_NE(cur_seen.rolling_hash, base_seen.rolling_hash);
  717. ASSERT_NE(cur_seen.rolling_hash, cf1_seen.rolling_hash);
  718. // Different pointer space
  719. ASSERT_TRUE(cur_seen.min > base_seen.max || cur_seen.max < base_seen.min);
  720. // Sufficiently large hash coverage of the space
  721. ASSERT_GT(cur_seen.max - cur_seen.min, min_span_bytes);
  722. delete txn;
  723. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  724. }
  725. // ** Test dropping column family before committing, or even creating txn **
  726. txn = txn_db->BeginTransaction(write_options, txn_options);
  727. ASSERT_OK(txn->Delete(handles[1], "AAA"));
  728. s = txn_db->DropColumnFamily(handles[1]);
  729. ASSERT_OK(s);
  730. s = txn_db->DropColumnFamily(handles[2]);
  731. ASSERT_OK(s);
  732. ASSERT_NOK(txn->Commit());
  733. txn2 = txn_db->BeginTransaction(write_options, txn_options);
  734. ASSERT_OK(txn2->Delete(handles[2], "AAA"));
  735. ASSERT_NOK(txn2->Commit());
  736. delete txn;
  737. delete txn2;
  738. for (auto handle : handles) {
  739. delete handle;
  740. }
  741. }
  742. TEST_P(OptimisticTransactionTest, EmptyTest) {
  743. WriteOptions write_options;
  744. ReadOptions read_options;
  745. std::string value;
  746. ASSERT_OK(txn_db->Put(write_options, "aaa", "aaa"));
  747. Transaction* txn = txn_db->BeginTransaction(write_options);
  748. ASSERT_OK(txn->Commit());
  749. delete txn;
  750. txn = txn_db->BeginTransaction(write_options);
  751. ASSERT_OK(txn->Rollback());
  752. delete txn;
  753. txn = txn_db->BeginTransaction(write_options);
  754. ASSERT_OK(txn->GetForUpdate(read_options, "aaa", &value));
  755. ASSERT_EQ(value, "aaa");
  756. ASSERT_OK(txn->Commit());
  757. delete txn;
  758. txn = txn_db->BeginTransaction(write_options);
  759. txn->SetSnapshot();
  760. ASSERT_OK(txn->GetForUpdate(read_options, "aaa", &value));
  761. ASSERT_EQ(value, "aaa");
  762. ASSERT_OK(txn_db->Put(write_options, "aaa", "xxx"));
  763. Status s = txn->Commit();
  764. ASSERT_TRUE(s.IsBusy());
  765. delete txn;
  766. }
  767. TEST_P(OptimisticTransactionTest, PredicateManyPreceders) {
  768. WriteOptions write_options;
  769. ReadOptions read_options1, read_options2;
  770. OptimisticTransactionOptions txn_options;
  771. std::string value;
  772. txn_options.set_snapshot = true;
  773. Transaction* txn1 = txn_db->BeginTransaction(write_options, txn_options);
  774. read_options1.snapshot = txn1->GetSnapshot();
  775. Transaction* txn2 = txn_db->BeginTransaction(write_options);
  776. txn2->SetSnapshot();
  777. read_options2.snapshot = txn2->GetSnapshot();
  778. std::vector<Slice> multiget_keys = {"1", "2", "3"};
  779. std::vector<std::string> multiget_values;
  780. std::vector<Status> results =
  781. txn1->MultiGetForUpdate(read_options1, multiget_keys, &multiget_values);
  782. ASSERT_TRUE(results[0].IsNotFound());
  783. ASSERT_TRUE(results[1].IsNotFound());
  784. ASSERT_TRUE(results[2].IsNotFound());
  785. ASSERT_OK(txn2->Put("2", "x"));
  786. ASSERT_OK(txn2->Commit());
  787. multiget_values.clear();
  788. results =
  789. txn1->MultiGetForUpdate(read_options1, multiget_keys, &multiget_values);
  790. ASSERT_TRUE(results[0].IsNotFound());
  791. ASSERT_TRUE(results[1].IsNotFound());
  792. ASSERT_TRUE(results[2].IsNotFound());
  793. // should not commit since txn2 wrote a key txn has read
  794. Status s = txn1->Commit();
  795. ASSERT_TRUE(s.IsBusy());
  796. delete txn1;
  797. delete txn2;
  798. txn1 = txn_db->BeginTransaction(write_options, txn_options);
  799. read_options1.snapshot = txn1->GetSnapshot();
  800. txn2 = txn_db->BeginTransaction(write_options, txn_options);
  801. read_options2.snapshot = txn2->GetSnapshot();
  802. ASSERT_OK(txn1->Put("4", "x"));
  803. ASSERT_OK(txn2->Delete("4"));
  804. // txn1 can commit since txn2's delete hasn't happened yet (it's just batched)
  805. ASSERT_OK(txn1->Commit());
  806. s = txn2->GetForUpdate(read_options2, "4", &value);
  807. ASSERT_TRUE(s.IsNotFound());
  808. // txn2 cannot commit since txn1 changed "4"
  809. s = txn2->Commit();
  810. ASSERT_TRUE(s.IsBusy());
  811. delete txn1;
  812. delete txn2;
  813. }
  814. TEST_P(OptimisticTransactionTest, LostUpdate) {
  815. WriteOptions write_options;
  816. ReadOptions read_options, read_options1, read_options2;
  817. OptimisticTransactionOptions txn_options;
  818. std::string value;
  819. // Test 2 transactions writing to the same key in multiple orders and
  820. // with/without snapshots
  821. Transaction* txn1 = txn_db->BeginTransaction(write_options);
  822. Transaction* txn2 = txn_db->BeginTransaction(write_options);
  823. ASSERT_OK(txn1->Put("1", "1"));
  824. ASSERT_OK(txn2->Put("1", "2"));
  825. ASSERT_OK(txn1->Commit());
  826. Status s = txn2->Commit();
  827. ASSERT_TRUE(s.IsBusy());
  828. delete txn1;
  829. delete txn2;
  830. txn_options.set_snapshot = true;
  831. txn1 = txn_db->BeginTransaction(write_options, txn_options);
  832. read_options1.snapshot = txn1->GetSnapshot();
  833. txn2 = txn_db->BeginTransaction(write_options, txn_options);
  834. read_options2.snapshot = txn2->GetSnapshot();
  835. ASSERT_OK(txn1->Put("1", "3"));
  836. ASSERT_OK(txn2->Put("1", "4"));
  837. ASSERT_OK(txn1->Commit());
  838. s = txn2->Commit();
  839. ASSERT_TRUE(s.IsBusy());
  840. delete txn1;
  841. delete txn2;
  842. txn1 = txn_db->BeginTransaction(write_options, txn_options);
  843. read_options1.snapshot = txn1->GetSnapshot();
  844. txn2 = txn_db->BeginTransaction(write_options, txn_options);
  845. read_options2.snapshot = txn2->GetSnapshot();
  846. ASSERT_OK(txn1->Put("1", "5"));
  847. ASSERT_OK(txn1->Commit());
  848. ASSERT_OK(txn2->Put("1", "6"));
  849. s = txn2->Commit();
  850. ASSERT_TRUE(s.IsBusy());
  851. delete txn1;
  852. delete txn2;
  853. txn1 = txn_db->BeginTransaction(write_options, txn_options);
  854. read_options1.snapshot = txn1->GetSnapshot();
  855. txn2 = txn_db->BeginTransaction(write_options, txn_options);
  856. read_options2.snapshot = txn2->GetSnapshot();
  857. ASSERT_OK(txn1->Put("1", "5"));
  858. ASSERT_OK(txn1->Commit());
  859. txn2->SetSnapshot();
  860. ASSERT_OK(txn2->Put("1", "6"));
  861. ASSERT_OK(txn2->Commit());
  862. delete txn1;
  863. delete txn2;
  864. txn1 = txn_db->BeginTransaction(write_options);
  865. txn2 = txn_db->BeginTransaction(write_options);
  866. ASSERT_OK(txn1->Put("1", "7"));
  867. ASSERT_OK(txn1->Commit());
  868. ASSERT_OK(txn2->Put("1", "8"));
  869. ASSERT_OK(txn2->Commit());
  870. delete txn1;
  871. delete txn2;
  872. ASSERT_OK(txn_db->Get(read_options, "1", &value));
  873. ASSERT_EQ(value, "8");
  874. }
  875. TEST_P(OptimisticTransactionTest, UntrackedWrites) {
  876. WriteOptions write_options;
  877. ReadOptions read_options;
  878. std::string value;
  879. Status s;
  880. // Verify transaction rollback works for untracked keys.
  881. Transaction* txn = txn_db->BeginTransaction(write_options);
  882. ASSERT_OK(txn->PutUntracked("untracked", "0"));
  883. ASSERT_OK(txn->Rollback());
  884. s = txn_db->Get(read_options, "untracked", &value);
  885. ASSERT_TRUE(s.IsNotFound());
  886. delete txn;
  887. txn = txn_db->BeginTransaction(write_options);
  888. const WideColumns untracked_columns{{"hello", "world"}};
  889. ASSERT_OK(txn->Put("tracked", "1"));
  890. ASSERT_OK(txn->PutUntracked("untracked", "1"));
  891. ASSERT_OK(txn->PutEntityUntracked(txn_db->DefaultColumnFamily(), "untracked",
  892. untracked_columns));
  893. ASSERT_OK(txn->MergeUntracked("untracked", "2"));
  894. ASSERT_OK(txn->DeleteUntracked("untracked"));
  895. // Write to the untracked key outside of the transaction and verify
  896. // it doesn't prevent the transaction from committing.
  897. ASSERT_OK(txn_db->Put(write_options, "untracked", "x"));
  898. ASSERT_OK(txn->Commit());
  899. s = txn_db->Get(read_options, "untracked", &value);
  900. ASSERT_TRUE(s.IsNotFound());
  901. delete txn;
  902. txn = txn_db->BeginTransaction(write_options);
  903. const WideColumns untracked_new_columns{{"foo", "bar"}};
  904. ASSERT_OK(txn->Put("tracked", "10"));
  905. ASSERT_OK(txn->PutUntracked("untracked", "A"));
  906. ASSERT_OK(txn->PutEntityUntracked(txn_db->DefaultColumnFamily(), "untracked",
  907. untracked_new_columns));
  908. // Write to tracked key outside of the transaction and verify that the
  909. // untracked keys are not written when the commit fails.
  910. ASSERT_OK(txn_db->Delete(write_options, "tracked"));
  911. s = txn->Commit();
  912. ASSERT_TRUE(s.IsBusy());
  913. s = txn_db->Get(read_options, "untracked", &value);
  914. ASSERT_TRUE(s.IsNotFound());
  915. delete txn;
  916. }
  917. TEST_P(OptimisticTransactionTest, IteratorTest) {
  918. WriteOptions write_options;
  919. ReadOptions read_options, snapshot_read_options;
  920. OptimisticTransactionOptions txn_options;
  921. std::string value;
  922. // Write some keys to the db
  923. ASSERT_OK(txn_db->Put(write_options, "A", "a"));
  924. ASSERT_OK(txn_db->Put(write_options, "G", "g"));
  925. ASSERT_OK(txn_db->Put(write_options, "F", "f"));
  926. ASSERT_OK(txn_db->Put(write_options, "C", "c"));
  927. ASSERT_OK(txn_db->Put(write_options, "D", "d"));
  928. Transaction* txn = txn_db->BeginTransaction(write_options);
  929. ASSERT_NE(txn, nullptr);
  930. // Write some keys in a txn
  931. ASSERT_OK(txn->Put("B", "b"));
  932. ASSERT_OK(txn->Put("H", "h"));
  933. ASSERT_OK(txn->Delete("D"));
  934. ASSERT_OK(txn->Put("E", "e"));
  935. txn->SetSnapshot();
  936. const Snapshot* snapshot = txn->GetSnapshot();
  937. // Write some keys to the db after the snapshot
  938. ASSERT_OK(txn_db->Put(write_options, "BB", "xx"));
  939. ASSERT_OK(txn_db->Put(write_options, "C", "xx"));
  940. read_options.snapshot = snapshot;
  941. Iterator* iter = txn->GetIterator(read_options);
  942. ASSERT_OK(iter->status());
  943. iter->SeekToFirst();
  944. // Read all keys via iter and lock them all
  945. std::string results[] = {"a", "b", "c", "e", "f", "g", "h"};
  946. for (int i = 0; i < 7; i++) {
  947. ASSERT_OK(iter->status());
  948. ASSERT_TRUE(iter->Valid());
  949. ASSERT_EQ(results[i], iter->value().ToString());
  950. ASSERT_OK(
  951. txn->GetForUpdate(read_options, iter->key(), (std::string*)nullptr));
  952. iter->Next();
  953. }
  954. ASSERT_FALSE(iter->Valid());
  955. iter->Seek("G");
  956. ASSERT_OK(iter->status());
  957. ASSERT_TRUE(iter->Valid());
  958. ASSERT_EQ("g", iter->value().ToString());
  959. iter->Prev();
  960. ASSERT_OK(iter->status());
  961. ASSERT_TRUE(iter->Valid());
  962. ASSERT_EQ("f", iter->value().ToString());
  963. iter->Seek("D");
  964. ASSERT_OK(iter->status());
  965. ASSERT_TRUE(iter->Valid());
  966. ASSERT_EQ("e", iter->value().ToString());
  967. iter->Seek("C");
  968. ASSERT_OK(iter->status());
  969. ASSERT_TRUE(iter->Valid());
  970. ASSERT_EQ("c", iter->value().ToString());
  971. iter->Next();
  972. ASSERT_OK(iter->status());
  973. ASSERT_TRUE(iter->Valid());
  974. ASSERT_EQ("e", iter->value().ToString());
  975. iter->Seek("");
  976. ASSERT_OK(iter->status());
  977. ASSERT_TRUE(iter->Valid());
  978. ASSERT_EQ("a", iter->value().ToString());
  979. iter->Seek("X");
  980. ASSERT_OK(iter->status());
  981. ASSERT_FALSE(iter->Valid());
  982. iter->SeekToLast();
  983. ASSERT_OK(iter->status());
  984. ASSERT_TRUE(iter->Valid());
  985. ASSERT_EQ("h", iter->value().ToString());
  986. // key "C" was modified in the db after txn's snapshot. txn will not commit.
  987. Status s = txn->Commit();
  988. ASSERT_TRUE(s.IsBusy());
  989. delete iter;
  990. delete txn;
  991. }
  992. TEST_P(OptimisticTransactionTest, DeleteRangeSupportTest) {
  993. // `OptimisticTransactionDB` does not allow range deletion in any API.
  994. ASSERT_TRUE(
  995. txn_db
  996. ->DeleteRange(WriteOptions(), txn_db->DefaultColumnFamily(), "a", "b")
  997. .IsNotSupported());
  998. WriteBatch wb;
  999. ASSERT_OK(wb.DeleteRange("a", "b"));
  1000. ASSERT_NOK(txn_db->Write(WriteOptions(), &wb));
  1001. }
  1002. TEST_P(OptimisticTransactionTest, SavepointTest) {
  1003. WriteOptions write_options;
  1004. ReadOptions read_options, snapshot_read_options;
  1005. OptimisticTransactionOptions txn_options;
  1006. std::string value;
  1007. Transaction* txn = txn_db->BeginTransaction(write_options);
  1008. ASSERT_NE(txn, nullptr);
  1009. Status s = txn->RollbackToSavePoint();
  1010. ASSERT_TRUE(s.IsNotFound());
  1011. txn->SetSavePoint(); // 1
  1012. ASSERT_OK(txn->RollbackToSavePoint()); // Rollback to beginning of txn
  1013. s = txn->RollbackToSavePoint();
  1014. ASSERT_TRUE(s.IsNotFound());
  1015. ASSERT_OK(txn->Put("B", "b"));
  1016. ASSERT_OK(txn->Commit());
  1017. ASSERT_OK(txn_db->Get(read_options, "B", &value));
  1018. ASSERT_EQ("b", value);
  1019. delete txn;
  1020. txn = txn_db->BeginTransaction(write_options);
  1021. ASSERT_NE(txn, nullptr);
  1022. ASSERT_OK(txn->Put("A", "a"));
  1023. ASSERT_OK(txn->Put("B", "bb"));
  1024. ASSERT_OK(txn->Put("C", "c"));
  1025. txn->SetSavePoint(); // 2
  1026. ASSERT_OK(txn->Delete("B"));
  1027. ASSERT_OK(txn->Put("C", "cc"));
  1028. ASSERT_OK(txn->Put("D", "d"));
  1029. ASSERT_OK(txn->RollbackToSavePoint()); // Rollback to 2
  1030. ASSERT_OK(txn->Get(read_options, "A", &value));
  1031. ASSERT_EQ("a", value);
  1032. ASSERT_OK(txn->Get(read_options, "B", &value));
  1033. ASSERT_EQ("bb", value);
  1034. ASSERT_OK(txn->Get(read_options, "C", &value));
  1035. ASSERT_EQ("c", value);
  1036. s = txn->Get(read_options, "D", &value);
  1037. ASSERT_TRUE(s.IsNotFound());
  1038. ASSERT_OK(txn->Put("A", "a"));
  1039. ASSERT_OK(txn->Put("E", "e"));
  1040. // Rollback to beginning of txn
  1041. s = txn->RollbackToSavePoint();
  1042. ASSERT_TRUE(s.IsNotFound());
  1043. ASSERT_OK(txn->Rollback());
  1044. s = txn->Get(read_options, "A", &value);
  1045. ASSERT_TRUE(s.IsNotFound());
  1046. ASSERT_OK(txn->Get(read_options, "B", &value));
  1047. ASSERT_EQ("b", value);
  1048. s = txn->Get(read_options, "D", &value);
  1049. ASSERT_TRUE(s.IsNotFound());
  1050. s = txn->Get(read_options, "D", &value);
  1051. ASSERT_TRUE(s.IsNotFound());
  1052. s = txn->Get(read_options, "E", &value);
  1053. ASSERT_TRUE(s.IsNotFound());
  1054. ASSERT_OK(txn->Put("A", "aa"));
  1055. ASSERT_OK(txn->Put("F", "f"));
  1056. txn->SetSavePoint(); // 3
  1057. txn->SetSavePoint(); // 4
  1058. ASSERT_OK(txn->Put("G", "g"));
  1059. ASSERT_OK(txn->Delete("F"));
  1060. ASSERT_OK(txn->Delete("B"));
  1061. ASSERT_OK(txn->Get(read_options, "A", &value));
  1062. ASSERT_EQ("aa", value);
  1063. s = txn->Get(read_options, "F", &value);
  1064. ASSERT_TRUE(s.IsNotFound());
  1065. s = txn->Get(read_options, "B", &value);
  1066. ASSERT_TRUE(s.IsNotFound());
  1067. ASSERT_OK(txn->RollbackToSavePoint()); // Rollback to 3
  1068. ASSERT_OK(txn->Get(read_options, "F", &value));
  1069. ASSERT_EQ("f", value);
  1070. s = txn->Get(read_options, "G", &value);
  1071. ASSERT_TRUE(s.IsNotFound());
  1072. ASSERT_OK(txn->Commit());
  1073. ASSERT_OK(txn_db->Get(read_options, "F", &value));
  1074. ASSERT_EQ("f", value);
  1075. s = txn_db->Get(read_options, "G", &value);
  1076. ASSERT_TRUE(s.IsNotFound());
  1077. ASSERT_OK(txn_db->Get(read_options, "A", &value));
  1078. ASSERT_EQ("aa", value);
  1079. ASSERT_OK(txn_db->Get(read_options, "B", &value));
  1080. ASSERT_EQ("b", value);
  1081. s = txn_db->Get(read_options, "C", &value);
  1082. ASSERT_TRUE(s.IsNotFound());
  1083. s = txn_db->Get(read_options, "D", &value);
  1084. ASSERT_TRUE(s.IsNotFound());
  1085. s = txn_db->Get(read_options, "E", &value);
  1086. ASSERT_TRUE(s.IsNotFound());
  1087. delete txn;
  1088. }
  1089. TEST_P(OptimisticTransactionTest, UndoGetForUpdateTest) {
  1090. WriteOptions write_options;
  1091. ReadOptions read_options, snapshot_read_options;
  1092. OptimisticTransactionOptions txn_options;
  1093. std::string value;
  1094. ASSERT_OK(txn_db->Put(write_options, "A", ""));
  1095. Transaction* txn1 = txn_db->BeginTransaction(write_options);
  1096. ASSERT_TRUE(txn1);
  1097. ASSERT_OK(txn1->GetForUpdate(read_options, "A", &value));
  1098. txn1->UndoGetForUpdate("A");
  1099. Transaction* txn2 = txn_db->BeginTransaction(write_options);
  1100. ASSERT_OK(txn2->Put("A", "x"));
  1101. ASSERT_OK(txn2->Commit());
  1102. delete txn2;
  1103. // Verify that txn1 can commit since A isn't conflict checked
  1104. ASSERT_OK(txn1->Commit());
  1105. delete txn1;
  1106. txn1 = txn_db->BeginTransaction(write_options);
  1107. ASSERT_OK(txn1->Put("A", "a"));
  1108. ASSERT_OK(txn1->GetForUpdate(read_options, "A", &value));
  1109. txn1->UndoGetForUpdate("A");
  1110. txn2 = txn_db->BeginTransaction(write_options);
  1111. ASSERT_OK(txn2->Put("A", "x"));
  1112. ASSERT_OK(txn2->Commit());
  1113. delete txn2;
  1114. // Verify that txn1 cannot commit since A will still be conflict checked
  1115. Status s = txn1->Commit();
  1116. ASSERT_TRUE(s.IsBusy());
  1117. delete txn1;
  1118. txn1 = txn_db->BeginTransaction(write_options);
  1119. ASSERT_OK(txn1->GetForUpdate(read_options, "A", &value));
  1120. ASSERT_OK(txn1->GetForUpdate(read_options, "A", &value));
  1121. txn1->UndoGetForUpdate("A");
  1122. txn2 = txn_db->BeginTransaction(write_options);
  1123. ASSERT_OK(txn2->Put("A", "x"));
  1124. ASSERT_OK(txn2->Commit());
  1125. delete txn2;
  1126. // Verify that txn1 cannot commit since A will still be conflict checked
  1127. s = txn1->Commit();
  1128. ASSERT_TRUE(s.IsBusy());
  1129. delete txn1;
  1130. txn1 = txn_db->BeginTransaction(write_options);
  1131. ASSERT_OK(txn1->GetForUpdate(read_options, "A", &value));
  1132. ASSERT_OK(txn1->GetForUpdate(read_options, "A", &value));
  1133. txn1->UndoGetForUpdate("A");
  1134. txn1->UndoGetForUpdate("A");
  1135. txn2 = txn_db->BeginTransaction(write_options);
  1136. ASSERT_OK(txn2->Put("A", "x"));
  1137. ASSERT_OK(txn2->Commit());
  1138. delete txn2;
  1139. // Verify that txn1 can commit since A isn't conflict checked
  1140. ASSERT_OK(txn1->Commit());
  1141. delete txn1;
  1142. txn1 = txn_db->BeginTransaction(write_options);
  1143. ASSERT_OK(txn1->GetForUpdate(read_options, "A", &value));
  1144. txn1->SetSavePoint();
  1145. txn1->UndoGetForUpdate("A");
  1146. txn2 = txn_db->BeginTransaction(write_options);
  1147. ASSERT_OK(txn2->Put("A", "x"));
  1148. ASSERT_OK(txn2->Commit());
  1149. delete txn2;
  1150. // Verify that txn1 cannot commit since A will still be conflict checked
  1151. s = txn1->Commit();
  1152. ASSERT_TRUE(s.IsBusy());
  1153. delete txn1;
  1154. txn1 = txn_db->BeginTransaction(write_options);
  1155. ASSERT_OK(txn1->GetForUpdate(read_options, "A", &value));
  1156. txn1->SetSavePoint();
  1157. ASSERT_OK(txn1->GetForUpdate(read_options, "A", &value));
  1158. txn1->UndoGetForUpdate("A");
  1159. txn2 = txn_db->BeginTransaction(write_options);
  1160. ASSERT_OK(txn2->Put("A", "x"));
  1161. ASSERT_OK(txn2->Commit());
  1162. delete txn2;
  1163. // Verify that txn1 cannot commit since A will still be conflict checked
  1164. s = txn1->Commit();
  1165. ASSERT_TRUE(s.IsBusy());
  1166. delete txn1;
  1167. txn1 = txn_db->BeginTransaction(write_options);
  1168. ASSERT_OK(txn1->GetForUpdate(read_options, "A", &value));
  1169. txn1->SetSavePoint();
  1170. ASSERT_OK(txn1->GetForUpdate(read_options, "A", &value));
  1171. txn1->UndoGetForUpdate("A");
  1172. ASSERT_OK(txn1->RollbackToSavePoint());
  1173. txn1->UndoGetForUpdate("A");
  1174. txn2 = txn_db->BeginTransaction(write_options);
  1175. ASSERT_OK(txn2->Put("A", "x"));
  1176. ASSERT_OK(txn2->Commit());
  1177. delete txn2;
  1178. // Verify that txn1 can commit since A isn't conflict checked
  1179. ASSERT_OK(txn1->Commit());
  1180. delete txn1;
  1181. }
  1182. namespace {
  1183. Status OptimisticTransactionStressTestInserter(OptimisticTransactionDB* db,
  1184. const size_t num_transactions,
  1185. const size_t num_sets,
  1186. const size_t num_keys_per_set) {
  1187. size_t seed = std::hash<std::thread::id>()(std::this_thread::get_id());
  1188. Random64 _rand(seed);
  1189. WriteOptions write_options;
  1190. ReadOptions read_options;
  1191. OptimisticTransactionOptions txn_options;
  1192. txn_options.set_snapshot = true;
  1193. RandomTransactionInserter inserter(&_rand, write_options, read_options,
  1194. num_keys_per_set,
  1195. static_cast<uint16_t>(num_sets));
  1196. for (size_t t = 0; t < num_transactions; t++) {
  1197. bool success = inserter.OptimisticTransactionDBInsert(db, txn_options);
  1198. if (!success) {
  1199. // unexpected failure
  1200. return inserter.GetLastStatus();
  1201. }
  1202. }
  1203. inserter.GetLastStatus().PermitUncheckedError();
  1204. // Make sure at least some of the transactions succeeded. It's ok if
  1205. // some failed due to write-conflicts.
  1206. if (inserter.GetFailureCount() > num_transactions / 2) {
  1207. return Status::TryAgain("Too many transactions failed! " +
  1208. std::to_string(inserter.GetFailureCount()) + " / " +
  1209. std::to_string(num_transactions));
  1210. }
  1211. return Status::OK();
  1212. }
  1213. } // namespace
  1214. TEST_P(OptimisticTransactionTest, OptimisticTransactionStressTest) {
  1215. const size_t num_threads = 4;
  1216. const size_t num_transactions_per_thread = 10000;
  1217. const size_t num_sets = 3;
  1218. const size_t num_keys_per_set = 100;
  1219. // Setting the key-space to be 100 keys should cause enough write-conflicts
  1220. // to make this test interesting.
  1221. std::vector<port::Thread> threads;
  1222. std::function<void()> call_inserter = [&] {
  1223. ASSERT_OK(OptimisticTransactionStressTestInserter(
  1224. txn_db.get(), num_transactions_per_thread, num_sets, num_keys_per_set));
  1225. };
  1226. // Create N threads that use RandomTransactionInserter to write
  1227. // many transactions.
  1228. for (uint32_t i = 0; i < num_threads; i++) {
  1229. threads.emplace_back(call_inserter);
  1230. }
  1231. // Wait for all threads to run
  1232. for (auto& t : threads) {
  1233. t.join();
  1234. }
  1235. // Verify that data is consistent
  1236. Status s = RandomTransactionInserter::Verify(txn_db.get(), num_sets);
  1237. ASSERT_OK(s);
  1238. }
  1239. TEST_P(OptimisticTransactionTest, SequenceNumberAfterRecoverTest) {
  1240. WriteOptions write_options;
  1241. OptimisticTransactionOptions transaction_options;
  1242. Transaction* transaction(
  1243. txn_db->BeginTransaction(write_options, transaction_options));
  1244. Status s = transaction->Put("foo", "val");
  1245. ASSERT_OK(s);
  1246. s = transaction->Put("foo2", "val");
  1247. ASSERT_OK(s);
  1248. s = transaction->Put("foo3", "val");
  1249. ASSERT_OK(s);
  1250. s = transaction->Commit();
  1251. ASSERT_OK(s);
  1252. delete transaction;
  1253. Reopen();
  1254. transaction = txn_db->BeginTransaction(write_options, transaction_options);
  1255. s = transaction->Put("bar", "val");
  1256. ASSERT_OK(s);
  1257. s = transaction->Put("bar2", "val");
  1258. ASSERT_OK(s);
  1259. s = transaction->Commit();
  1260. ASSERT_OK(s);
  1261. delete transaction;
  1262. }
  1263. #ifdef __SANITIZE_THREAD__
  1264. // Skip OptimisticTransactionTest.SequenceNumberAfterRecoverLargeTest under TSAN
  1265. // to avoid false positive because of TSAN lock limit of 64.
  1266. #else
  1267. TEST_P(OptimisticTransactionTest, SequenceNumberAfterRecoverLargeTest) {
  1268. WriteOptions write_options;
  1269. OptimisticTransactionOptions transaction_options;
  1270. Transaction* transaction(
  1271. txn_db->BeginTransaction(write_options, transaction_options));
  1272. std::string value(1024 * 1024, 'X');
  1273. const size_t n_zero = 2;
  1274. std::string s_i;
  1275. Status s;
  1276. for (int i = 1; i <= 64; i++) {
  1277. s_i = std::to_string(i);
  1278. auto key = std::string(n_zero - std::min(n_zero, s_i.length()), '0') + s_i;
  1279. s = transaction->Put(key, value);
  1280. ASSERT_OK(s);
  1281. }
  1282. s = transaction->Commit();
  1283. ASSERT_OK(s);
  1284. delete transaction;
  1285. Reopen();
  1286. transaction = txn_db->BeginTransaction(write_options, transaction_options);
  1287. s = transaction->Put("bar", "val");
  1288. ASSERT_OK(s);
  1289. s = transaction->Commit();
  1290. if (!s.ok()) {
  1291. std::cerr << "Failed to commit records. Error: " << s.ToString()
  1292. << std::endl;
  1293. }
  1294. ASSERT_OK(s);
  1295. delete transaction;
  1296. }
  1297. #endif // __SANITIZE_THREAD__
  1298. TEST_P(OptimisticTransactionTest, TimestampedSnapshotMissingCommitTs) {
  1299. std::unique_ptr<Transaction> txn(txn_db->BeginTransaction(WriteOptions()));
  1300. ASSERT_OK(txn->Put("a", "v"));
  1301. Status s = txn->CommitAndTryCreateSnapshot();
  1302. ASSERT_TRUE(s.IsInvalidArgument());
  1303. }
  1304. TEST_P(OptimisticTransactionTest, TimestampedSnapshotSetCommitTs) {
  1305. std::unique_ptr<Transaction> txn(txn_db->BeginTransaction(WriteOptions()));
  1306. ASSERT_OK(txn->Put("a", "v"));
  1307. std::shared_ptr<const Snapshot> snapshot;
  1308. Status s = txn->CommitAndTryCreateSnapshot(nullptr, /*ts=*/100, &snapshot);
  1309. ASSERT_TRUE(s.IsNotSupported());
  1310. }
  1311. TEST_P(OptimisticTransactionTest, PutEntitySuccess) {
  1312. constexpr char foo[] = "foo";
  1313. const WideColumns foo_columns{
  1314. {kDefaultWideColumnName, "bar"}, {"col1", "val1"}, {"col2", "val2"}};
  1315. const WideColumns foo_new_columns{
  1316. {kDefaultWideColumnName, "baz"}, {"colA", "valA"}, {"colB", "valB"}};
  1317. ASSERT_OK(txn_db->PutEntity(WriteOptions(), txn_db->DefaultColumnFamily(),
  1318. foo, foo_columns));
  1319. {
  1320. std::unique_ptr<Transaction> txn(txn_db->BeginTransaction(WriteOptions()));
  1321. ASSERT_NE(txn, nullptr);
  1322. ASSERT_EQ(txn->GetNumPutEntities(), 0);
  1323. {
  1324. PinnableWideColumns columns;
  1325. ASSERT_OK(txn->GetEntity(ReadOptions(), txn_db->DefaultColumnFamily(),
  1326. foo, &columns));
  1327. ASSERT_EQ(columns.columns(), foo_columns);
  1328. }
  1329. {
  1330. PinnableWideColumns columns;
  1331. ASSERT_OK(txn->GetEntityForUpdate(
  1332. ReadOptions(), txn_db->DefaultColumnFamily(), foo, &columns));
  1333. ASSERT_EQ(columns.columns(), foo_columns);
  1334. }
  1335. ASSERT_OK(
  1336. txn->PutEntity(txn_db->DefaultColumnFamily(), foo, foo_new_columns));
  1337. ASSERT_EQ(txn->GetNumPutEntities(), 1);
  1338. {
  1339. PinnableWideColumns columns;
  1340. ASSERT_OK(txn->GetEntity(ReadOptions(), txn_db->DefaultColumnFamily(),
  1341. foo, &columns));
  1342. ASSERT_EQ(columns.columns(), foo_new_columns);
  1343. }
  1344. {
  1345. PinnableWideColumns columns;
  1346. ASSERT_OK(txn->GetEntityForUpdate(
  1347. ReadOptions(), txn_db->DefaultColumnFamily(), foo, &columns));
  1348. ASSERT_EQ(columns.columns(), foo_new_columns);
  1349. }
  1350. ASSERT_OK(txn->Commit());
  1351. }
  1352. {
  1353. PinnableWideColumns columns;
  1354. ASSERT_OK(txn_db->GetEntity(ReadOptions(), txn_db->DefaultColumnFamily(),
  1355. foo, &columns));
  1356. ASSERT_EQ(columns.columns(), foo_new_columns);
  1357. }
  1358. }
  1359. TEST_P(OptimisticTransactionTest, PutEntityWriteConflict) {
  1360. constexpr char foo[] = "foo";
  1361. const WideColumns foo_columns{
  1362. {kDefaultWideColumnName, "bar"}, {"col1", "val1"}, {"col2", "val2"}};
  1363. constexpr char baz[] = "baz";
  1364. const WideColumns baz_columns{
  1365. {kDefaultWideColumnName, "quux"}, {"colA", "valA"}, {"colB", "valB"}};
  1366. ASSERT_OK(txn_db->PutEntity(WriteOptions(), txn_db->DefaultColumnFamily(),
  1367. foo, foo_columns));
  1368. ASSERT_OK(txn_db->PutEntity(WriteOptions(), txn_db->DefaultColumnFamily(),
  1369. baz, baz_columns));
  1370. std::unique_ptr<Transaction> txn(txn_db->BeginTransaction(WriteOptions()));
  1371. ASSERT_NE(txn, nullptr);
  1372. {
  1373. PinnableWideColumns columns;
  1374. ASSERT_OK(txn->GetEntity(ReadOptions(), txn_db->DefaultColumnFamily(), foo,
  1375. &columns));
  1376. ASSERT_EQ(columns.columns(), foo_columns);
  1377. }
  1378. {
  1379. PinnableWideColumns columns;
  1380. ASSERT_OK(txn->GetEntity(ReadOptions(), txn_db->DefaultColumnFamily(), baz,
  1381. &columns));
  1382. ASSERT_EQ(columns.columns(), baz_columns);
  1383. }
  1384. {
  1385. constexpr size_t num_keys = 2;
  1386. std::array<Slice, num_keys> keys{{foo, baz}};
  1387. std::array<PinnableWideColumns, num_keys> results;
  1388. std::array<Status, num_keys> statuses;
  1389. txn->MultiGetEntity(ReadOptions(), txn_db->DefaultColumnFamily(), num_keys,
  1390. keys.data(), results.data(), statuses.data());
  1391. ASSERT_OK(statuses[0]);
  1392. ASSERT_OK(statuses[1]);
  1393. ASSERT_EQ(results[0].columns(), foo_columns);
  1394. ASSERT_EQ(results[1].columns(), baz_columns);
  1395. }
  1396. const WideColumns foo_new_columns{{kDefaultWideColumnName, "FOO"},
  1397. {"hello", "world"}};
  1398. const WideColumns baz_new_columns{{kDefaultWideColumnName, "BAZ"},
  1399. {"ping", "pong"}};
  1400. ASSERT_OK(
  1401. txn->PutEntity(txn_db->DefaultColumnFamily(), foo, foo_new_columns));
  1402. ASSERT_OK(
  1403. txn->PutEntity(txn_db->DefaultColumnFamily(), baz, baz_new_columns));
  1404. {
  1405. PinnableWideColumns columns;
  1406. ASSERT_OK(txn->GetEntity(ReadOptions(), txn_db->DefaultColumnFamily(), foo,
  1407. &columns));
  1408. ASSERT_EQ(columns.columns(), foo_new_columns);
  1409. }
  1410. {
  1411. PinnableWideColumns columns;
  1412. ASSERT_OK(txn->GetEntity(ReadOptions(), txn_db->DefaultColumnFamily(), baz,
  1413. &columns));
  1414. ASSERT_EQ(columns.columns(), baz_new_columns);
  1415. }
  1416. {
  1417. constexpr size_t num_keys = 2;
  1418. std::array<Slice, num_keys> keys{{foo, baz}};
  1419. std::array<PinnableWideColumns, num_keys> results;
  1420. std::array<Status, num_keys> statuses;
  1421. txn->MultiGetEntity(ReadOptions(), txn_db->DefaultColumnFamily(), num_keys,
  1422. keys.data(), results.data(), statuses.data());
  1423. ASSERT_OK(statuses[0]);
  1424. ASSERT_OK(statuses[1]);
  1425. ASSERT_EQ(results[0].columns(), foo_new_columns);
  1426. ASSERT_EQ(results[1].columns(), baz_new_columns);
  1427. }
  1428. // This PutEntity outside of a transaction will conflict with the previous
  1429. // write
  1430. const WideColumns foo_conflict_columns{{kDefaultWideColumnName, "X"},
  1431. {"conflicting", "write"}};
  1432. ASSERT_OK(txn_db->PutEntity(WriteOptions(), txn_db->DefaultColumnFamily(),
  1433. foo, foo_conflict_columns));
  1434. {
  1435. PinnableWideColumns columns;
  1436. ASSERT_OK(txn_db->GetEntity(ReadOptions(), txn_db->DefaultColumnFamily(),
  1437. foo, &columns));
  1438. ASSERT_EQ(columns.columns(), foo_conflict_columns);
  1439. }
  1440. ASSERT_TRUE(txn->Commit().IsBusy()); // Txn should not commit
  1441. // Verify that transaction did not write anything
  1442. {
  1443. PinnableWideColumns columns;
  1444. ASSERT_OK(txn_db->GetEntity(ReadOptions(), txn_db->DefaultColumnFamily(),
  1445. foo, &columns));
  1446. ASSERT_EQ(columns.columns(), foo_conflict_columns);
  1447. }
  1448. {
  1449. PinnableWideColumns columns;
  1450. ASSERT_OK(txn_db->GetEntity(ReadOptions(), txn_db->DefaultColumnFamily(),
  1451. baz, &columns));
  1452. ASSERT_EQ(columns.columns(), baz_columns);
  1453. }
  1454. {
  1455. constexpr size_t num_keys = 2;
  1456. std::array<Slice, num_keys> keys{{foo, baz}};
  1457. std::array<PinnableWideColumns, num_keys> results;
  1458. std::array<Status, num_keys> statuses;
  1459. constexpr bool sorted_input = false;
  1460. txn_db->MultiGetEntity(ReadOptions(), txn_db->DefaultColumnFamily(),
  1461. num_keys, keys.data(), results.data(),
  1462. statuses.data(), sorted_input);
  1463. ASSERT_OK(statuses[0]);
  1464. ASSERT_OK(statuses[1]);
  1465. ASSERT_EQ(results[0].columns(), foo_conflict_columns);
  1466. ASSERT_EQ(results[1].columns(), baz_columns);
  1467. }
  1468. }
  1469. TEST_P(OptimisticTransactionTest, PutEntityWriteConflictTxnTxn) {
  1470. constexpr char foo[] = "foo";
  1471. const WideColumns foo_columns{
  1472. {kDefaultWideColumnName, "bar"}, {"col1", "val1"}, {"col2", "val2"}};
  1473. constexpr char baz[] = "baz";
  1474. const WideColumns baz_columns{
  1475. {kDefaultWideColumnName, "quux"}, {"colA", "valA"}, {"colB", "valB"}};
  1476. ASSERT_OK(txn_db->PutEntity(WriteOptions(), txn_db->DefaultColumnFamily(),
  1477. foo, foo_columns));
  1478. ASSERT_OK(txn_db->PutEntity(WriteOptions(), txn_db->DefaultColumnFamily(),
  1479. baz, baz_columns));
  1480. std::unique_ptr<Transaction> txn(txn_db->BeginTransaction(WriteOptions()));
  1481. ASSERT_NE(txn, nullptr);
  1482. {
  1483. PinnableWideColumns columns;
  1484. ASSERT_OK(txn->GetEntity(ReadOptions(), txn_db->DefaultColumnFamily(), foo,
  1485. &columns));
  1486. ASSERT_EQ(columns.columns(), foo_columns);
  1487. }
  1488. {
  1489. PinnableWideColumns columns;
  1490. ASSERT_OK(txn->GetEntity(ReadOptions(), txn_db->DefaultColumnFamily(), baz,
  1491. &columns));
  1492. ASSERT_EQ(columns.columns(), baz_columns);
  1493. }
  1494. {
  1495. constexpr size_t num_keys = 2;
  1496. std::array<Slice, num_keys> keys{{foo, baz}};
  1497. std::array<PinnableWideColumns, num_keys> results;
  1498. std::array<Status, num_keys> statuses;
  1499. txn->MultiGetEntity(ReadOptions(), txn_db->DefaultColumnFamily(), num_keys,
  1500. keys.data(), results.data(), statuses.data());
  1501. ASSERT_OK(statuses[0]);
  1502. ASSERT_OK(statuses[1]);
  1503. ASSERT_EQ(results[0].columns(), foo_columns);
  1504. ASSERT_EQ(results[1].columns(), baz_columns);
  1505. }
  1506. const WideColumns foo_new_columns{{kDefaultWideColumnName, "FOO"},
  1507. {"hello", "world"}};
  1508. const WideColumns baz_new_columns{{kDefaultWideColumnName, "BAZ"},
  1509. {"ping", "pong"}};
  1510. ASSERT_OK(
  1511. txn->PutEntity(txn_db->DefaultColumnFamily(), foo, foo_new_columns));
  1512. ASSERT_OK(
  1513. txn->PutEntity(txn_db->DefaultColumnFamily(), baz, baz_new_columns));
  1514. {
  1515. PinnableWideColumns columns;
  1516. ASSERT_OK(txn->GetEntity(ReadOptions(), txn_db->DefaultColumnFamily(), foo,
  1517. &columns));
  1518. ASSERT_EQ(columns.columns(), foo_new_columns);
  1519. }
  1520. {
  1521. PinnableWideColumns columns;
  1522. ASSERT_OK(txn->GetEntity(ReadOptions(), txn_db->DefaultColumnFamily(), baz,
  1523. &columns));
  1524. ASSERT_EQ(columns.columns(), baz_new_columns);
  1525. }
  1526. {
  1527. constexpr size_t num_keys = 2;
  1528. std::array<Slice, num_keys> keys{{foo, baz}};
  1529. std::array<PinnableWideColumns, num_keys> results;
  1530. std::array<Status, num_keys> statuses;
  1531. txn->MultiGetEntity(ReadOptions(), txn_db->DefaultColumnFamily(), num_keys,
  1532. keys.data(), results.data(), statuses.data());
  1533. ASSERT_OK(statuses[0]);
  1534. ASSERT_OK(statuses[1]);
  1535. ASSERT_EQ(results[0].columns(), foo_new_columns);
  1536. ASSERT_EQ(results[1].columns(), baz_new_columns);
  1537. }
  1538. std::unique_ptr<Transaction> conflicting_txn(
  1539. txn_db->BeginTransaction(WriteOptions()));
  1540. ASSERT_NE(conflicting_txn, nullptr);
  1541. const WideColumns foo_conflict_columns{{kDefaultWideColumnName, "X"},
  1542. {"conflicting", "write"}};
  1543. ASSERT_OK(conflicting_txn->PutEntity(txn_db->DefaultColumnFamily(), foo,
  1544. foo_conflict_columns));
  1545. ASSERT_OK(conflicting_txn->Commit());
  1546. {
  1547. PinnableWideColumns columns;
  1548. ASSERT_OK(txn_db->GetEntity(ReadOptions(), txn_db->DefaultColumnFamily(),
  1549. foo, &columns));
  1550. ASSERT_EQ(columns.columns(), foo_conflict_columns);
  1551. }
  1552. ASSERT_TRUE(txn->Commit().IsBusy()); // Txn should not commit
  1553. // Verify that transaction did not write anything
  1554. {
  1555. PinnableWideColumns columns;
  1556. ASSERT_OK(txn_db->GetEntity(ReadOptions(), txn_db->DefaultColumnFamily(),
  1557. foo, &columns));
  1558. ASSERT_EQ(columns.columns(), foo_conflict_columns);
  1559. }
  1560. {
  1561. PinnableWideColumns columns;
  1562. ASSERT_OK(txn_db->GetEntity(ReadOptions(), txn_db->DefaultColumnFamily(),
  1563. baz, &columns));
  1564. ASSERT_EQ(columns.columns(), baz_columns);
  1565. }
  1566. {
  1567. constexpr size_t num_keys = 2;
  1568. std::array<Slice, num_keys> keys{{foo, baz}};
  1569. std::array<PinnableWideColumns, num_keys> results;
  1570. std::array<Status, num_keys> statuses;
  1571. constexpr bool sorted_input = false;
  1572. txn_db->MultiGetEntity(ReadOptions(), txn_db->DefaultColumnFamily(),
  1573. num_keys, keys.data(), results.data(),
  1574. statuses.data(), sorted_input);
  1575. ASSERT_OK(statuses[0]);
  1576. ASSERT_OK(statuses[1]);
  1577. ASSERT_EQ(results[0].columns(), foo_conflict_columns);
  1578. ASSERT_EQ(results[1].columns(), baz_columns);
  1579. }
  1580. }
  1581. TEST_P(OptimisticTransactionTest, PutEntityReadConflict) {
  1582. constexpr char foo[] = "foo";
  1583. const WideColumns foo_columns{
  1584. {kDefaultWideColumnName, "bar"}, {"col1", "val1"}, {"col2", "val2"}};
  1585. ASSERT_OK(txn_db->PutEntity(WriteOptions(), txn_db->DefaultColumnFamily(),
  1586. foo, foo_columns));
  1587. std::unique_ptr<Transaction> txn(txn_db->BeginTransaction(WriteOptions()));
  1588. ASSERT_NE(txn, nullptr);
  1589. txn->SetSnapshot();
  1590. ReadOptions snapshot_read_options;
  1591. snapshot_read_options.snapshot = txn->GetSnapshot();
  1592. {
  1593. PinnableWideColumns columns;
  1594. ASSERT_OK(txn->GetEntityForUpdate(
  1595. snapshot_read_options, txn_db->DefaultColumnFamily(), foo, &columns));
  1596. ASSERT_EQ(columns.columns(), foo_columns);
  1597. }
  1598. // This PutEntity outside of a transaction will conflict with the previous
  1599. // write
  1600. const WideColumns foo_conflict_columns{{kDefaultWideColumnName, "X"},
  1601. {"conflicting", "write"}};
  1602. ASSERT_OK(txn_db->PutEntity(WriteOptions(), txn_db->DefaultColumnFamily(),
  1603. foo, foo_conflict_columns));
  1604. {
  1605. PinnableWideColumns columns;
  1606. ASSERT_OK(txn_db->GetEntity(ReadOptions(), txn_db->DefaultColumnFamily(),
  1607. foo, &columns));
  1608. ASSERT_EQ(columns.columns(), foo_conflict_columns);
  1609. }
  1610. {
  1611. PinnableWideColumns columns;
  1612. ASSERT_OK(txn->GetEntity(ReadOptions(), txn_db->DefaultColumnFamily(), foo,
  1613. &columns));
  1614. ASSERT_EQ(columns.columns(), foo_conflict_columns);
  1615. }
  1616. ASSERT_TRUE(txn->Commit().IsBusy()); // Txn should not commit
  1617. {
  1618. PinnableWideColumns columns;
  1619. ASSERT_OK(txn_db->GetEntity(ReadOptions(), txn_db->DefaultColumnFamily(),
  1620. foo, &columns));
  1621. ASSERT_EQ(columns.columns(), foo_conflict_columns);
  1622. }
  1623. }
  1624. TEST_P(OptimisticTransactionTest, EntityReadSanityChecks) {
  1625. constexpr char foo[] = "foo";
  1626. constexpr char bar[] = "bar";
  1627. constexpr size_t num_keys = 2;
  1628. std::unique_ptr<Transaction> txn(txn_db->BeginTransaction(WriteOptions()));
  1629. ASSERT_NE(txn, nullptr);
  1630. {
  1631. constexpr ColumnFamilyHandle* column_family = nullptr;
  1632. PinnableWideColumns columns;
  1633. ASSERT_TRUE(txn->GetEntity(ReadOptions(), column_family, foo, &columns)
  1634. .IsInvalidArgument());
  1635. }
  1636. {
  1637. constexpr PinnableWideColumns* columns = nullptr;
  1638. ASSERT_TRUE(txn->GetEntity(ReadOptions(), txn_db->DefaultColumnFamily(),
  1639. foo, columns)
  1640. .IsInvalidArgument());
  1641. }
  1642. {
  1643. ReadOptions read_options;
  1644. read_options.io_activity = Env::IOActivity::kGet;
  1645. PinnableWideColumns columns;
  1646. ASSERT_TRUE(txn->GetEntity(read_options, txn_db->DefaultColumnFamily(), foo,
  1647. &columns)
  1648. .IsInvalidArgument());
  1649. }
  1650. {
  1651. constexpr ColumnFamilyHandle* column_family = nullptr;
  1652. std::array<Slice, num_keys> keys{{foo, bar}};
  1653. std::array<PinnableWideColumns, num_keys> results;
  1654. std::array<Status, num_keys> statuses;
  1655. constexpr bool sorted_input = false;
  1656. txn->MultiGetEntity(ReadOptions(), column_family, num_keys, keys.data(),
  1657. results.data(), statuses.data(), sorted_input);
  1658. ASSERT_TRUE(statuses[0].IsInvalidArgument());
  1659. ASSERT_TRUE(statuses[1].IsInvalidArgument());
  1660. }
  1661. {
  1662. constexpr Slice* keys = nullptr;
  1663. std::array<PinnableWideColumns, num_keys> results;
  1664. std::array<Status, num_keys> statuses;
  1665. constexpr bool sorted_input = false;
  1666. txn->MultiGetEntity(ReadOptions(), txn_db->DefaultColumnFamily(), num_keys,
  1667. keys, results.data(), statuses.data(), sorted_input);
  1668. ASSERT_TRUE(statuses[0].IsInvalidArgument());
  1669. ASSERT_TRUE(statuses[1].IsInvalidArgument());
  1670. }
  1671. {
  1672. std::array<Slice, num_keys> keys{{foo, bar}};
  1673. constexpr PinnableWideColumns* results = nullptr;
  1674. std::array<Status, num_keys> statuses;
  1675. constexpr bool sorted_input = false;
  1676. txn->MultiGetEntity(ReadOptions(), txn_db->DefaultColumnFamily(), num_keys,
  1677. keys.data(), results, statuses.data(), sorted_input);
  1678. ASSERT_TRUE(statuses[0].IsInvalidArgument());
  1679. ASSERT_TRUE(statuses[1].IsInvalidArgument());
  1680. }
  1681. {
  1682. ReadOptions read_options;
  1683. read_options.io_activity = Env::IOActivity::kMultiGet;
  1684. std::array<Slice, num_keys> keys{{foo, bar}};
  1685. std::array<PinnableWideColumns, num_keys> results;
  1686. std::array<Status, num_keys> statuses;
  1687. constexpr bool sorted_input = false;
  1688. txn->MultiGetEntity(read_options, txn_db->DefaultColumnFamily(), num_keys,
  1689. keys.data(), results.data(), statuses.data(),
  1690. sorted_input);
  1691. ASSERT_TRUE(statuses[0].IsInvalidArgument());
  1692. ASSERT_TRUE(statuses[1].IsInvalidArgument());
  1693. }
  1694. {
  1695. constexpr ColumnFamilyHandle* column_family = nullptr;
  1696. PinnableWideColumns columns;
  1697. ASSERT_TRUE(
  1698. txn->GetEntityForUpdate(ReadOptions(), column_family, foo, &columns)
  1699. .IsInvalidArgument());
  1700. }
  1701. {
  1702. constexpr PinnableWideColumns* columns = nullptr;
  1703. ASSERT_TRUE(txn->GetEntityForUpdate(ReadOptions(),
  1704. txn_db->DefaultColumnFamily(), foo,
  1705. columns)
  1706. .IsInvalidArgument());
  1707. }
  1708. {
  1709. ReadOptions read_options;
  1710. read_options.io_activity = Env::IOActivity::kGet;
  1711. PinnableWideColumns columns;
  1712. ASSERT_TRUE(txn->GetEntityForUpdate(read_options,
  1713. txn_db->DefaultColumnFamily(), foo,
  1714. &columns)
  1715. .IsInvalidArgument());
  1716. }
  1717. {
  1718. txn->SetSnapshot();
  1719. ReadOptions read_options;
  1720. read_options.snapshot = txn->GetSnapshot();
  1721. PinnableWideColumns columns;
  1722. constexpr bool exclusive = true;
  1723. constexpr bool do_validate = false;
  1724. ASSERT_TRUE(txn->GetEntityForUpdate(read_options,
  1725. txn_db->DefaultColumnFamily(), foo,
  1726. &columns, exclusive, do_validate)
  1727. .IsInvalidArgument());
  1728. }
  1729. }
  1730. TEST_P(OptimisticTransactionTest, CoalescingIterator) {
  1731. ColumnFamilyOptions cf_opts;
  1732. cf_opts.enable_blob_files = true;
  1733. ColumnFamilyHandle* cfh1 = nullptr;
  1734. ASSERT_OK(txn_db->CreateColumnFamily(cf_opts, "cf1", &cfh1));
  1735. std::unique_ptr<ColumnFamilyHandle> cfh1_guard(cfh1);
  1736. ColumnFamilyHandle* cfh2 = nullptr;
  1737. ASSERT_OK(txn_db->CreateColumnFamily(cf_opts, "cf2", &cfh2));
  1738. std::unique_ptr<ColumnFamilyHandle> cfh2_guard(cfh2);
  1739. // Note: "cf1" keys are present only in CF1; "cf2" keys are only present in
  1740. // CF2; "cf12" keys are present in both CFs. "a" keys are present only in the
  1741. // database; "b" keys are present only in the transaction; "c" keys are
  1742. // present in both the database and the transaction. The values indicate the
  1743. // column family as well as whether the entry came from the database or the
  1744. // transaction.
  1745. ASSERT_OK(txn_db->Put(WriteOptions(), cfh1, "cf1_a", "cf1_a_db_cf1"));
  1746. ASSERT_OK(txn_db->Put(WriteOptions(), cfh1, "cf1_c", "cf1_c_db_cf1"));
  1747. ASSERT_OK(txn_db->Put(WriteOptions(), cfh2, "cf2_a", "cf2_a_db_cf2"));
  1748. ASSERT_OK(txn_db->Put(WriteOptions(), cfh2, "cf2_c", "cf2_c_db_cf2"));
  1749. ASSERT_OK(txn_db->Put(WriteOptions(), cfh1, "cf12_a", "cf12_a_db_cf1"));
  1750. ASSERT_OK(txn_db->Put(WriteOptions(), cfh2, "cf12_a", "cf12_a_db_cf2"));
  1751. ASSERT_OK(txn_db->Put(WriteOptions(), cfh1, "cf12_c", "cf12_c_db_cf1"));
  1752. ASSERT_OK(txn_db->Put(WriteOptions(), cfh2, "cf12_c", "cf12_c_db_cf2"));
  1753. ASSERT_OK(txn_db->Flush(FlushOptions(), cfh1));
  1754. ASSERT_OK(txn_db->Flush(FlushOptions(), cfh2));
  1755. std::unique_ptr<Transaction> txn(txn_db->BeginTransaction(WriteOptions()));
  1756. ASSERT_OK(txn->Put(cfh1, "cf1_b", "cf1_b_txn_cf1"));
  1757. ASSERT_OK(txn->Put(cfh1, "cf1_c", "cf1_c_txn_cf1"));
  1758. ASSERT_OK(txn->Put(cfh2, "cf2_b", "cf2_b_txn_cf2"));
  1759. ASSERT_OK(txn->Put(cfh2, "cf2_c", "cf2_c_txn_cf2"));
  1760. ASSERT_OK(txn->Put(cfh1, "cf12_b", "cf12_b_txn_cf1"));
  1761. ASSERT_OK(txn->Put(cfh2, "cf12_b", "cf12_b_txn_cf2"));
  1762. ASSERT_OK(txn->Put(cfh1, "cf12_c", "cf12_c_txn_cf1"));
  1763. ASSERT_OK(txn->Put(cfh2, "cf12_c", "cf12_c_txn_cf2"));
  1764. auto verify = [&](bool allow_unprepared_value, auto prepare_if_needed) {
  1765. ReadOptions read_options;
  1766. read_options.allow_unprepared_value = allow_unprepared_value;
  1767. std::unique_ptr<Iterator> iter(
  1768. txn->GetCoalescingIterator(read_options, {cfh1, cfh2}));
  1769. {
  1770. iter->SeekToFirst();
  1771. ASSERT_TRUE(iter->Valid());
  1772. ASSERT_OK(iter->status());
  1773. ASSERT_EQ(iter->key(), "cf12_a");
  1774. prepare_if_needed(iter.get());
  1775. ASSERT_EQ(iter->value(), "cf12_a_db_cf2");
  1776. }
  1777. {
  1778. iter->Next();
  1779. ASSERT_TRUE(iter->Valid());
  1780. ASSERT_OK(iter->status());
  1781. ASSERT_EQ(iter->key(), "cf12_b");
  1782. prepare_if_needed(iter.get());
  1783. ASSERT_EQ(iter->value(), "cf12_b_txn_cf2");
  1784. }
  1785. {
  1786. iter->Next();
  1787. ASSERT_TRUE(iter->Valid());
  1788. ASSERT_OK(iter->status());
  1789. ASSERT_EQ(iter->key(), "cf12_c");
  1790. prepare_if_needed(iter.get());
  1791. ASSERT_EQ(iter->value(), "cf12_c_txn_cf2");
  1792. }
  1793. {
  1794. iter->Next();
  1795. ASSERT_TRUE(iter->Valid());
  1796. ASSERT_OK(iter->status());
  1797. ASSERT_EQ(iter->key(), "cf1_a");
  1798. prepare_if_needed(iter.get());
  1799. ASSERT_EQ(iter->value(), "cf1_a_db_cf1");
  1800. }
  1801. {
  1802. iter->Next();
  1803. ASSERT_TRUE(iter->Valid());
  1804. ASSERT_OK(iter->status());
  1805. ASSERT_EQ(iter->key(), "cf1_b");
  1806. prepare_if_needed(iter.get());
  1807. ASSERT_EQ(iter->value(), "cf1_b_txn_cf1");
  1808. }
  1809. {
  1810. iter->Next();
  1811. ASSERT_TRUE(iter->Valid());
  1812. ASSERT_OK(iter->status());
  1813. ASSERT_EQ(iter->key(), "cf1_c");
  1814. prepare_if_needed(iter.get());
  1815. ASSERT_EQ(iter->value(), "cf1_c_txn_cf1");
  1816. }
  1817. {
  1818. iter->Next();
  1819. ASSERT_TRUE(iter->Valid());
  1820. ASSERT_OK(iter->status());
  1821. ASSERT_EQ(iter->key(), "cf2_a");
  1822. prepare_if_needed(iter.get());
  1823. ASSERT_EQ(iter->value(), "cf2_a_db_cf2");
  1824. }
  1825. {
  1826. iter->Next();
  1827. ASSERT_TRUE(iter->Valid());
  1828. ASSERT_OK(iter->status());
  1829. ASSERT_EQ(iter->key(), "cf2_b");
  1830. prepare_if_needed(iter.get());
  1831. ASSERT_EQ(iter->value(), "cf2_b_txn_cf2");
  1832. }
  1833. {
  1834. iter->Next();
  1835. ASSERT_TRUE(iter->Valid());
  1836. ASSERT_OK(iter->status());
  1837. ASSERT_EQ(iter->key(), "cf2_c");
  1838. prepare_if_needed(iter.get());
  1839. ASSERT_EQ(iter->value(), "cf2_c_txn_cf2");
  1840. }
  1841. {
  1842. iter->Next();
  1843. ASSERT_FALSE(iter->Valid());
  1844. ASSERT_OK(iter->status());
  1845. }
  1846. };
  1847. verify(/* allow_unprepared_value */ false, [](Iterator*) {});
  1848. verify(/* allow_unprepared_value */ true, [](Iterator* iter) {
  1849. ASSERT_TRUE(iter->value().empty());
  1850. ASSERT_TRUE(iter->PrepareValue());
  1851. });
  1852. }
  1853. TEST_P(OptimisticTransactionTest, CoalescingIteratorSanityChecks) {
  1854. ColumnFamilyOptions cf1_opts;
  1855. ColumnFamilyHandle* cfh1 = nullptr;
  1856. ASSERT_OK(txn_db->CreateColumnFamily(cf1_opts, "cf1", &cfh1));
  1857. std::unique_ptr<ColumnFamilyHandle> cfh1_guard(cfh1);
  1858. ColumnFamilyOptions cf2_opts;
  1859. cf2_opts.comparator = ReverseBytewiseComparator();
  1860. ColumnFamilyHandle* cfh2 = nullptr;
  1861. ASSERT_OK(txn_db->CreateColumnFamily(cf2_opts, "cf2", &cfh2));
  1862. std::unique_ptr<ColumnFamilyHandle> cfh2_guard(cfh2);
  1863. std::unique_ptr<Transaction> txn(txn_db->BeginTransaction(WriteOptions()));
  1864. {
  1865. std::unique_ptr<Iterator> iter(
  1866. txn->GetCoalescingIterator(ReadOptions(), {}));
  1867. ASSERT_TRUE(iter->status().IsInvalidArgument());
  1868. }
  1869. {
  1870. std::unique_ptr<Iterator> iter(
  1871. txn->GetCoalescingIterator(ReadOptions(), {cfh1, cfh2}));
  1872. ASSERT_TRUE(iter->status().IsInvalidArgument());
  1873. }
  1874. {
  1875. ReadOptions read_options;
  1876. read_options.io_activity = Env::IOActivity::kCompaction;
  1877. std::unique_ptr<Iterator> iter(
  1878. txn->GetCoalescingIterator(read_options, {cfh1}));
  1879. ASSERT_TRUE(iter->status().IsInvalidArgument());
  1880. }
  1881. }
  1882. TEST_P(OptimisticTransactionTest, AttributeGroupIterator) {
  1883. ColumnFamilyOptions cf_opts;
  1884. cf_opts.enable_blob_files = true;
  1885. ColumnFamilyHandle* cfh1 = nullptr;
  1886. ASSERT_OK(txn_db->CreateColumnFamily(cf_opts, "cf1", &cfh1));
  1887. std::unique_ptr<ColumnFamilyHandle> cfh1_guard(cfh1);
  1888. ColumnFamilyHandle* cfh2 = nullptr;
  1889. ASSERT_OK(txn_db->CreateColumnFamily(cf_opts, "cf2", &cfh2));
  1890. std::unique_ptr<ColumnFamilyHandle> cfh2_guard(cfh2);
  1891. // Note: "cf1" keys are present only in CF1; "cf2" keys are only present in
  1892. // CF2; "cf12" keys are present in both CFs. "a" keys are present only in the
  1893. // database; "b" keys are present only in the transaction; "c" keys are
  1894. // present in both the database and the transaction. The values indicate the
  1895. // column family as well as whether the entry came from the database or the
  1896. // transaction.
  1897. ASSERT_OK(txn_db->Put(WriteOptions(), cfh1, "cf1_a", "cf1_a_db_cf1"));
  1898. ASSERT_OK(txn_db->Put(WriteOptions(), cfh1, "cf1_c", "cf1_c_db_cf1"));
  1899. ASSERT_OK(txn_db->Put(WriteOptions(), cfh2, "cf2_a", "cf2_a_db_cf2"));
  1900. ASSERT_OK(txn_db->Put(WriteOptions(), cfh2, "cf2_c", "cf2_c_db_cf2"));
  1901. ASSERT_OK(txn_db->Put(WriteOptions(), cfh1, "cf12_a", "cf12_a_db_cf1"));
  1902. ASSERT_OK(txn_db->Put(WriteOptions(), cfh2, "cf12_a", "cf12_a_db_cf2"));
  1903. ASSERT_OK(txn_db->Put(WriteOptions(), cfh1, "cf12_c", "cf12_c_db_cf1"));
  1904. ASSERT_OK(txn_db->Put(WriteOptions(), cfh2, "cf12_c", "cf12_c_db_cf2"));
  1905. ASSERT_OK(txn_db->Flush(FlushOptions(), cfh1));
  1906. ASSERT_OK(txn_db->Flush(FlushOptions(), cfh2));
  1907. std::unique_ptr<Transaction> txn(txn_db->BeginTransaction(WriteOptions()));
  1908. ASSERT_OK(txn->Put(cfh1, "cf1_b", "cf1_b_txn_cf1"));
  1909. ASSERT_OK(txn->Put(cfh1, "cf1_c", "cf1_c_txn_cf1"));
  1910. ASSERT_OK(txn->Put(cfh2, "cf2_b", "cf2_b_txn_cf2"));
  1911. ASSERT_OK(txn->Put(cfh2, "cf2_c", "cf2_c_txn_cf2"));
  1912. ASSERT_OK(txn->Put(cfh1, "cf12_b", "cf12_b_txn_cf1"));
  1913. ASSERT_OK(txn->Put(cfh2, "cf12_b", "cf12_b_txn_cf2"));
  1914. ASSERT_OK(txn->Put(cfh1, "cf12_c", "cf12_c_txn_cf1"));
  1915. ASSERT_OK(txn->Put(cfh2, "cf12_c", "cf12_c_txn_cf2"));
  1916. auto verify = [&](bool allow_unprepared_value, auto prepare_if_needed) {
  1917. ReadOptions read_options;
  1918. read_options.allow_unprepared_value = allow_unprepared_value;
  1919. std::unique_ptr<AttributeGroupIterator> iter(
  1920. txn->GetAttributeGroupIterator(read_options, {cfh1, cfh2}));
  1921. {
  1922. iter->SeekToFirst();
  1923. ASSERT_TRUE(iter->Valid());
  1924. ASSERT_OK(iter->status());
  1925. ASSERT_EQ(iter->key(), "cf12_a");
  1926. prepare_if_needed(iter.get());
  1927. WideColumns cf1_columns{{kDefaultWideColumnName, "cf12_a_db_cf1"}};
  1928. WideColumns cf2_columns{{kDefaultWideColumnName, "cf12_a_db_cf2"}};
  1929. IteratorAttributeGroups expected{
  1930. IteratorAttributeGroup{cfh1, &cf1_columns},
  1931. IteratorAttributeGroup{cfh2, &cf2_columns}};
  1932. ASSERT_EQ(iter->attribute_groups(), expected);
  1933. }
  1934. {
  1935. iter->Next();
  1936. ASSERT_TRUE(iter->Valid());
  1937. ASSERT_OK(iter->status());
  1938. ASSERT_EQ(iter->key(), "cf12_b");
  1939. prepare_if_needed(iter.get());
  1940. WideColumns cf1_columns{{kDefaultWideColumnName, "cf12_b_txn_cf1"}};
  1941. WideColumns cf2_columns{{kDefaultWideColumnName, "cf12_b_txn_cf2"}};
  1942. IteratorAttributeGroups expected{
  1943. IteratorAttributeGroup{cfh1, &cf1_columns},
  1944. IteratorAttributeGroup{cfh2, &cf2_columns}};
  1945. ASSERT_EQ(iter->attribute_groups(), expected);
  1946. }
  1947. {
  1948. iter->Next();
  1949. ASSERT_TRUE(iter->Valid());
  1950. ASSERT_OK(iter->status());
  1951. ASSERT_EQ(iter->key(), "cf12_c");
  1952. prepare_if_needed(iter.get());
  1953. WideColumns cf1_columns{{kDefaultWideColumnName, "cf12_c_txn_cf1"}};
  1954. WideColumns cf2_columns{{kDefaultWideColumnName, "cf12_c_txn_cf2"}};
  1955. IteratorAttributeGroups expected{
  1956. IteratorAttributeGroup{cfh1, &cf1_columns},
  1957. IteratorAttributeGroup{cfh2, &cf2_columns}};
  1958. ASSERT_EQ(iter->attribute_groups(), expected);
  1959. }
  1960. {
  1961. iter->Next();
  1962. ASSERT_TRUE(iter->Valid());
  1963. ASSERT_OK(iter->status());
  1964. ASSERT_EQ(iter->key(), "cf1_a");
  1965. prepare_if_needed(iter.get());
  1966. WideColumns cf1_columns{{kDefaultWideColumnName, "cf1_a_db_cf1"}};
  1967. IteratorAttributeGroups expected{
  1968. IteratorAttributeGroup{cfh1, &cf1_columns}};
  1969. ASSERT_EQ(iter->attribute_groups(), expected);
  1970. }
  1971. {
  1972. iter->Next();
  1973. ASSERT_TRUE(iter->Valid());
  1974. ASSERT_OK(iter->status());
  1975. ASSERT_EQ(iter->key(), "cf1_b");
  1976. prepare_if_needed(iter.get());
  1977. WideColumns cf1_columns{{kDefaultWideColumnName, "cf1_b_txn_cf1"}};
  1978. IteratorAttributeGroups expected{
  1979. IteratorAttributeGroup{cfh1, &cf1_columns}};
  1980. ASSERT_EQ(iter->attribute_groups(), expected);
  1981. }
  1982. {
  1983. iter->Next();
  1984. ASSERT_TRUE(iter->Valid());
  1985. ASSERT_OK(iter->status());
  1986. ASSERT_EQ(iter->key(), "cf1_c");
  1987. prepare_if_needed(iter.get());
  1988. WideColumns cf1_columns{{kDefaultWideColumnName, "cf1_c_txn_cf1"}};
  1989. IteratorAttributeGroups expected{
  1990. IteratorAttributeGroup{cfh1, &cf1_columns}};
  1991. ASSERT_EQ(iter->attribute_groups(), expected);
  1992. }
  1993. {
  1994. iter->Next();
  1995. ASSERT_TRUE(iter->Valid());
  1996. ASSERT_OK(iter->status());
  1997. ASSERT_EQ(iter->key(), "cf2_a");
  1998. prepare_if_needed(iter.get());
  1999. WideColumns cf2_columns{{kDefaultWideColumnName, "cf2_a_db_cf2"}};
  2000. IteratorAttributeGroups expected{
  2001. IteratorAttributeGroup{cfh2, &cf2_columns}};
  2002. ASSERT_EQ(iter->attribute_groups(), expected);
  2003. }
  2004. {
  2005. iter->Next();
  2006. ASSERT_TRUE(iter->Valid());
  2007. ASSERT_OK(iter->status());
  2008. ASSERT_EQ(iter->key(), "cf2_b");
  2009. prepare_if_needed(iter.get());
  2010. WideColumns cf2_columns{{kDefaultWideColumnName, "cf2_b_txn_cf2"}};
  2011. IteratorAttributeGroups expected{
  2012. IteratorAttributeGroup{cfh2, &cf2_columns}};
  2013. ASSERT_EQ(iter->attribute_groups(), expected);
  2014. }
  2015. {
  2016. iter->Next();
  2017. ASSERT_TRUE(iter->Valid());
  2018. ASSERT_OK(iter->status());
  2019. ASSERT_EQ(iter->key(), "cf2_c");
  2020. prepare_if_needed(iter.get());
  2021. WideColumns cf2_columns{{kDefaultWideColumnName, "cf2_c_txn_cf2"}};
  2022. IteratorAttributeGroups expected{
  2023. IteratorAttributeGroup{cfh2, &cf2_columns}};
  2024. ASSERT_EQ(iter->attribute_groups(), expected);
  2025. }
  2026. {
  2027. iter->Next();
  2028. ASSERT_FALSE(iter->Valid());
  2029. ASSERT_OK(iter->status());
  2030. }
  2031. };
  2032. verify(/* allow_unprepared_value */ false, [](AttributeGroupIterator*) {});
  2033. verify(/* allow_unprepared_value */ true, [](AttributeGroupIterator* iter) {
  2034. ASSERT_TRUE(iter->attribute_groups().empty());
  2035. ASSERT_TRUE(iter->PrepareValue());
  2036. });
  2037. }
  2038. TEST_P(OptimisticTransactionTest, AttributeGroupIteratorSanityChecks) {
  2039. ColumnFamilyOptions cf1_opts;
  2040. ColumnFamilyHandle* cfh1 = nullptr;
  2041. ASSERT_OK(txn_db->CreateColumnFamily(cf1_opts, "cf1", &cfh1));
  2042. std::unique_ptr<ColumnFamilyHandle> cfh1_guard(cfh1);
  2043. ColumnFamilyOptions cf2_opts;
  2044. cf2_opts.comparator = ReverseBytewiseComparator();
  2045. ColumnFamilyHandle* cfh2 = nullptr;
  2046. ASSERT_OK(txn_db->CreateColumnFamily(cf2_opts, "cf2", &cfh2));
  2047. std::unique_ptr<ColumnFamilyHandle> cfh2_guard(cfh2);
  2048. std::unique_ptr<Transaction> txn(txn_db->BeginTransaction(WriteOptions()));
  2049. {
  2050. std::unique_ptr<AttributeGroupIterator> iter(
  2051. txn->GetAttributeGroupIterator(ReadOptions(), {}));
  2052. ASSERT_TRUE(iter->status().IsInvalidArgument());
  2053. }
  2054. {
  2055. std::unique_ptr<AttributeGroupIterator> iter(
  2056. txn->GetAttributeGroupIterator(ReadOptions(), {cfh1, cfh2}));
  2057. ASSERT_TRUE(iter->status().IsInvalidArgument());
  2058. }
  2059. {
  2060. ReadOptions read_options;
  2061. read_options.io_activity = Env::IOActivity::kCompaction;
  2062. std::unique_ptr<AttributeGroupIterator> iter(
  2063. txn->GetAttributeGroupIterator(read_options, {cfh1}));
  2064. ASSERT_TRUE(iter->status().IsInvalidArgument());
  2065. }
  2066. }
  2067. INSTANTIATE_TEST_CASE_P(
  2068. InstanceOccGroup, OptimisticTransactionTest,
  2069. testing::Values(OccValidationPolicy::kValidateSerial,
  2070. OccValidationPolicy::kValidateParallel));
  2071. TEST(OccLockBucketsTest, CacheAligned) {
  2072. // Typical x86_64 is 40 byte mutex, 64 byte cache line
  2073. if (sizeof(port::Mutex) >= sizeof(CacheAlignedWrapper<port::Mutex>)) {
  2074. ROCKSDB_GTEST_BYPASS("Test requires mutex smaller than cache line");
  2075. return;
  2076. }
  2077. auto buckets_unaligned = MakeSharedOccLockBuckets(100, false);
  2078. auto buckets_aligned = MakeSharedOccLockBuckets(100, true);
  2079. // Save at least one byte per bucket
  2080. ASSERT_LE(buckets_unaligned->ApproximateMemoryUsage() + 100,
  2081. buckets_aligned->ApproximateMemoryUsage());
  2082. }
  2083. } // namespace ROCKSDB_NAMESPACE
  2084. int main(int argc, char** argv) {
  2085. ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  2086. ::testing::InitGoogleTest(&argc, argv);
  2087. return RUN_ALL_TESTS();
  2088. }