timestamped_snapshot_test.cc 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466
  1. // Copyright (c) Meta Platforms, Inc. and affiliates. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include <cassert>
  6. #include "util/cast_util.h"
  7. #include "utilities/transactions/transaction_test.h"
  8. namespace ROCKSDB_NAMESPACE {
  9. constexpr std::array TimestampedSnapshotWithTsSanityCheck_Params = {
  10. std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite),
  11. std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite),
  12. std::make_tuple(false, false, WRITE_UNPREPARED, kOrderedWrite)};
  13. INSTANTIATE_TEST_CASE_P(
  14. Unsupported, TimestampedSnapshotWithTsSanityCheck,
  15. ::testing::ValuesIn(WRAP_PARAM_WITH_PER_KEY_POINT_LOCK_MANAGER_PARAMS(
  16. WRAP_PARAM(bool, bool, TxnDBWritePolicy, WriteOrdering),
  17. TimestampedSnapshotWithTsSanityCheck_Params)));
  18. INSTANTIATE_TEST_CASE_P(
  19. WriteCommitted, TransactionTest,
  20. ::testing::Combine(/*use_stackable_db=*/::testing::Bool(),
  21. /*two_write_queue=*/::testing::Bool(),
  22. ::testing::Values(WRITE_COMMITTED),
  23. ::testing::Values(kOrderedWrite),
  24. /*use_per_key_point_lock_mgr=*/::testing::Bool(),
  25. /*deadlock_timeout_us=*/::testing::Values(0, 1000)));
  26. namespace {
  27. // Not thread-safe. Caller needs to provide external synchronization.
  28. class TsCheckingTxnNotifier : public TransactionNotifier {
  29. public:
  30. explicit TsCheckingTxnNotifier() = default;
  31. ~TsCheckingTxnNotifier() override = default;
  32. void SnapshotCreated(const Snapshot* new_snapshot) override {
  33. assert(new_snapshot);
  34. if (prev_snapshot_seq_ != kMaxSequenceNumber) {
  35. assert(prev_snapshot_seq_ <= new_snapshot->GetSequenceNumber());
  36. }
  37. prev_snapshot_seq_ = new_snapshot->GetSequenceNumber();
  38. if (prev_snapshot_ts_ != kMaxTxnTimestamp) {
  39. assert(prev_snapshot_ts_ <= new_snapshot->GetTimestamp());
  40. }
  41. prev_snapshot_ts_ = new_snapshot->GetTimestamp();
  42. }
  43. TxnTimestamp prev_snapshot_ts() const { return prev_snapshot_ts_; }
  44. private:
  45. SequenceNumber prev_snapshot_seq_ = kMaxSequenceNumber;
  46. TxnTimestamp prev_snapshot_ts_ = kMaxTxnTimestamp;
  47. };
  48. } // anonymous namespace
  49. TEST_P(TimestampedSnapshotWithTsSanityCheck, WithoutCommitTs) {
  50. std::unique_ptr<Transaction> txn(
  51. db->BeginTransaction(WriteOptions(), TransactionOptions()));
  52. assert(txn);
  53. ASSERT_OK(txn->SetName("txn0"));
  54. ASSERT_OK(txn->Put("a", "v"));
  55. ASSERT_OK(txn->Prepare());
  56. Status s = txn->CommitAndTryCreateSnapshot();
  57. ASSERT_TRUE(s.IsInvalidArgument());
  58. ASSERT_OK(txn->Rollback());
  59. txn.reset(db->BeginTransaction(WriteOptions(), TransactionOptions()));
  60. assert(txn);
  61. ASSERT_OK(txn->SetName("txn0"));
  62. ASSERT_OK(txn->Put("a", "v"));
  63. s = txn->CommitAndTryCreateSnapshot();
  64. ASSERT_TRUE(s.IsInvalidArgument());
  65. }
  66. TEST_P(TimestampedSnapshotWithTsSanityCheck, SetCommitTs) {
  67. std::unique_ptr<Transaction> txn(
  68. db->BeginTransaction(WriteOptions(), TransactionOptions()));
  69. assert(txn);
  70. ASSERT_OK(txn->SetName("txn0"));
  71. ASSERT_OK(txn->Put("a", "v"));
  72. ASSERT_OK(txn->Prepare());
  73. std::shared_ptr<const Snapshot> snapshot;
  74. Status s = txn->CommitAndTryCreateSnapshot(nullptr, 10, &snapshot);
  75. ASSERT_TRUE(s.IsNotSupported());
  76. ASSERT_OK(txn->Rollback());
  77. txn.reset(db->BeginTransaction(WriteOptions(), TransactionOptions()));
  78. assert(txn);
  79. ASSERT_OK(txn->SetName("txn0"));
  80. ASSERT_OK(txn->Put("a", "v"));
  81. s = txn->CommitAndTryCreateSnapshot(nullptr, 10, &snapshot);
  82. ASSERT_TRUE(s.IsNotSupported());
  83. }
  84. TEST_P(TransactionTest, WithoutCommitTs) {
  85. std::unique_ptr<Transaction> txn(
  86. db->BeginTransaction(WriteOptions(), TransactionOptions()));
  87. assert(txn);
  88. ASSERT_OK(txn->SetName("txn0"));
  89. ASSERT_OK(txn->Put("a", "v"));
  90. ASSERT_OK(txn->Prepare());
  91. Status s = txn->CommitAndTryCreateSnapshot();
  92. ASSERT_TRUE(s.IsInvalidArgument());
  93. ASSERT_OK(txn->Rollback());
  94. txn.reset(db->BeginTransaction(WriteOptions(), TransactionOptions()));
  95. assert(txn);
  96. ASSERT_OK(txn->SetName("txn0"));
  97. ASSERT_OK(txn->Put("a", "v"));
  98. s = txn->CommitAndTryCreateSnapshot();
  99. ASSERT_TRUE(s.IsInvalidArgument());
  100. }
  101. TEST_P(TransactionTest, ReuseExistingTxn) {
  102. Transaction* txn = db->BeginTransaction(WriteOptions(), TransactionOptions());
  103. assert(txn);
  104. ASSERT_OK(txn->SetName("txn0"));
  105. ASSERT_OK(txn->Put("a", "v1"));
  106. ASSERT_OK(txn->Prepare());
  107. auto notifier = std::make_shared<TsCheckingTxnNotifier>();
  108. std::shared_ptr<const Snapshot> snapshot1;
  109. Status s =
  110. txn->CommitAndTryCreateSnapshot(notifier, /*commit_ts=*/100, &snapshot1);
  111. ASSERT_OK(s);
  112. ASSERT_EQ(100, snapshot1->GetTimestamp());
  113. Transaction* txn1 =
  114. db->BeginTransaction(WriteOptions(), TransactionOptions(), txn);
  115. assert(txn1 == txn);
  116. ASSERT_OK(txn1->SetName("txn1"));
  117. ASSERT_OK(txn->Put("a", "v2"));
  118. ASSERT_OK(txn->Prepare());
  119. std::shared_ptr<const Snapshot> snapshot2;
  120. s = txn->CommitAndTryCreateSnapshot(notifier, /*commit_ts=*/110, &snapshot2);
  121. ASSERT_OK(s);
  122. ASSERT_EQ(110, snapshot2->GetTimestamp());
  123. delete txn;
  124. {
  125. std::string value;
  126. ReadOptions read_opts;
  127. read_opts.snapshot = snapshot1.get();
  128. ASSERT_OK(db->Get(read_opts, "a", &value));
  129. ASSERT_EQ("v1", value);
  130. read_opts.snapshot = snapshot2.get();
  131. ASSERT_OK(db->Get(read_opts, "a", &value));
  132. ASSERT_EQ("v2", value);
  133. }
  134. }
  135. TEST_P(TransactionTest, CreateSnapshotWhenCommit) {
  136. std::unique_ptr<Transaction> txn(
  137. db->BeginTransaction(WriteOptions(), TransactionOptions()));
  138. assert(txn);
  139. constexpr int batch_size = 10;
  140. for (int i = 0; i < batch_size; ++i) {
  141. ASSERT_OK(db->Put(WriteOptions(), "k" + std::to_string(i), "v0"));
  142. }
  143. const SequenceNumber seq0 = db->GetLatestSequenceNumber();
  144. ASSERT_EQ(static_cast<SequenceNumber>(batch_size), seq0);
  145. txn->SetSnapshot();
  146. {
  147. const Snapshot* const snapshot = txn->GetSnapshot();
  148. assert(snapshot);
  149. ASSERT_EQ(seq0, snapshot->GetSequenceNumber());
  150. }
  151. for (int i = 0; i < batch_size; ++i) {
  152. ASSERT_OK(txn->Put("k" + std::to_string(i), "v1"));
  153. }
  154. ASSERT_OK(txn->SetName("txn0"));
  155. ASSERT_OK(txn->Prepare());
  156. std::shared_ptr<const Snapshot> snapshot;
  157. constexpr TxnTimestamp timestamp = 1;
  158. auto notifier = std::make_shared<TsCheckingTxnNotifier>();
  159. Status s = txn->CommitAndTryCreateSnapshot(notifier, timestamp, &snapshot);
  160. ASSERT_OK(s);
  161. ASSERT_LT(notifier->prev_snapshot_ts(), kMaxTxnTimestamp);
  162. assert(snapshot);
  163. ASSERT_EQ(timestamp, snapshot->GetTimestamp());
  164. ASSERT_EQ(seq0 + batch_size, snapshot->GetSequenceNumber());
  165. const Snapshot* const raw_snapshot_ptr = txn->GetSnapshot();
  166. ASSERT_EQ(raw_snapshot_ptr, snapshot.get());
  167. ASSERT_EQ(snapshot, txn->GetTimestampedSnapshot());
  168. {
  169. std::shared_ptr<const Snapshot> snapshot1 =
  170. db->GetLatestTimestampedSnapshot();
  171. ASSERT_EQ(snapshot, snapshot1);
  172. }
  173. {
  174. std::shared_ptr<const Snapshot> snapshot1 =
  175. db->GetTimestampedSnapshot(timestamp);
  176. ASSERT_EQ(snapshot, snapshot1);
  177. }
  178. {
  179. std::vector<std::shared_ptr<const Snapshot> > snapshots;
  180. s = db->GetAllTimestampedSnapshots(snapshots);
  181. ASSERT_OK(s);
  182. ASSERT_EQ(std::vector<std::shared_ptr<const Snapshot> >{snapshot},
  183. snapshots);
  184. }
  185. }
  186. TEST_P(TransactionTest, CreateSnapshot) {
  187. // First create a non-timestamped snapshot
  188. ManagedSnapshot snapshot_guard(db);
  189. for (int i = 0; i < 10; ++i) {
  190. ASSERT_OK(db->Put(WriteOptions(), "k" + std::to_string(i),
  191. "v0_" + std::to_string(i)));
  192. }
  193. {
  194. auto ret = db->CreateTimestampedSnapshot(kMaxTxnTimestamp);
  195. ASSERT_TRUE(ret.first.IsInvalidArgument());
  196. auto snapshot = ret.second;
  197. ASSERT_EQ(nullptr, snapshot.get());
  198. }
  199. constexpr TxnTimestamp timestamp = 100;
  200. Status s;
  201. std::shared_ptr<const Snapshot> ts_snap0;
  202. std::tie(s, ts_snap0) = db->CreateTimestampedSnapshot(timestamp);
  203. ASSERT_OK(s);
  204. assert(ts_snap0);
  205. ASSERT_EQ(timestamp, ts_snap0->GetTimestamp());
  206. for (int i = 0; i < 10; ++i) {
  207. ASSERT_OK(db->Delete(WriteOptions(), "k" + std::to_string(i)));
  208. }
  209. {
  210. ReadOptions read_opts;
  211. read_opts.snapshot = ts_snap0.get();
  212. for (int i = 0; i < 10; ++i) {
  213. std::string value;
  214. s = db->Get(read_opts, "k" + std::to_string(i), &value);
  215. ASSERT_OK(s);
  216. ASSERT_EQ("v0_" + std::to_string(i), value);
  217. }
  218. }
  219. {
  220. std::shared_ptr<const Snapshot> snapshot =
  221. db->GetLatestTimestampedSnapshot();
  222. ASSERT_EQ(ts_snap0, snapshot);
  223. }
  224. {
  225. std::shared_ptr<const Snapshot> snapshot =
  226. db->GetTimestampedSnapshot(timestamp);
  227. ASSERT_OK(s);
  228. ASSERT_EQ(ts_snap0, snapshot);
  229. }
  230. {
  231. std::vector<std::shared_ptr<const Snapshot> > snapshots;
  232. s = db->GetAllTimestampedSnapshots(snapshots);
  233. ASSERT_OK(s);
  234. ASSERT_EQ(std::vector<std::shared_ptr<const Snapshot> >{ts_snap0},
  235. snapshots);
  236. }
  237. }
  238. TEST_P(TransactionTest, SequenceAndTsOrder) {
  239. Status s;
  240. std::shared_ptr<const Snapshot> snapshot;
  241. std::tie(s, snapshot) = db->CreateTimestampedSnapshot(100);
  242. ASSERT_OK(s);
  243. assert(snapshot);
  244. {
  245. // Cannot request smaller timestamp for the new timestamped snapshot.
  246. std::shared_ptr<const Snapshot> tmp_snapshot;
  247. std::tie(s, tmp_snapshot) = db->CreateTimestampedSnapshot(50);
  248. ASSERT_TRUE(s.IsInvalidArgument());
  249. ASSERT_EQ(nullptr, tmp_snapshot.get());
  250. }
  251. // If requesting a new timestamped snapshot with the same timestamp and
  252. // sequence number, we avoid creating new snapshot object but reuse
  253. // exisisting one.
  254. std::shared_ptr<const Snapshot> snapshot1;
  255. std::tie(s, snapshot1) = db->CreateTimestampedSnapshot(100);
  256. ASSERT_OK(s);
  257. ASSERT_EQ(snapshot.get(), snapshot1.get());
  258. // If there is no write, but we request a larger timestamp, we still create
  259. // a new snapshot object.
  260. std::shared_ptr<const Snapshot> snapshot2;
  261. std::tie(s, snapshot2) = db->CreateTimestampedSnapshot(200);
  262. ASSERT_OK(s);
  263. assert(snapshot2);
  264. ASSERT_NE(snapshot.get(), snapshot2.get());
  265. ASSERT_EQ(snapshot2->GetSequenceNumber(), snapshot->GetSequenceNumber());
  266. ASSERT_EQ(200, snapshot2->GetTimestamp());
  267. // Increase sequence number.
  268. ASSERT_OK(db->Put(WriteOptions(), "foo", "v0"));
  269. {
  270. // We are requesting the same timestamp for a larger sequence number, thus
  271. // we cannot create timestamped snapshot.
  272. std::shared_ptr<const Snapshot> tmp_snapshot;
  273. std::tie(s, tmp_snapshot) = db->CreateTimestampedSnapshot(200);
  274. ASSERT_TRUE(s.IsInvalidArgument());
  275. ASSERT_EQ(nullptr, tmp_snapshot.get());
  276. }
  277. {
  278. std::unique_ptr<Transaction> txn1(
  279. db->BeginTransaction(WriteOptions(), TransactionOptions()));
  280. ASSERT_OK(txn1->Put("bar", "v0"));
  281. std::shared_ptr<const Snapshot> ss;
  282. ASSERT_OK(txn1->CommitAndTryCreateSnapshot(nullptr, 200, &ss));
  283. // Cannot create snapshot because requested timestamp is the same as the
  284. // latest timestamped snapshot while sequence number is strictly higher.
  285. ASSERT_EQ(nullptr, ss);
  286. }
  287. {
  288. std::unique_ptr<Transaction> txn2(
  289. db->BeginTransaction(WriteOptions(), TransactionOptions()));
  290. ASSERT_OK(txn2->Put("bar", "v0"));
  291. std::shared_ptr<const Snapshot> ss;
  292. // Application should never do this. This is just to demonstrate error
  293. // handling.
  294. ASSERT_OK(txn2->CommitAndTryCreateSnapshot(nullptr, 100, &ss));
  295. // Cannot create snapshot because requested timestamp is smaller than
  296. // latest timestamped snapshot.
  297. ASSERT_EQ(nullptr, ss);
  298. }
  299. }
  300. TEST_P(TransactionTest, CloseDbWithSnapshots) {
  301. std::unique_ptr<Transaction> txn(
  302. db->BeginTransaction(WriteOptions(), TransactionOptions()));
  303. ASSERT_OK(txn->SetName("txn0"));
  304. ASSERT_OK(txn->Put("foo", "v"));
  305. ASSERT_OK(txn->Prepare());
  306. std::shared_ptr<const Snapshot> snapshot;
  307. constexpr TxnTimestamp timestamp = 121;
  308. auto notifier = std::make_shared<TsCheckingTxnNotifier>();
  309. ASSERT_OK(txn->CommitAndTryCreateSnapshot(notifier, timestamp, &snapshot));
  310. assert(snapshot);
  311. ASSERT_LT(notifier->prev_snapshot_ts(), kMaxTxnTimestamp);
  312. ASSERT_EQ(timestamp, snapshot->GetTimestamp());
  313. ASSERT_TRUE(db->Close().IsAborted());
  314. }
  315. TEST_P(TransactionTest, MultipleTimestampedSnapshots) {
  316. auto* dbimpl = static_cast_with_check<DBImpl>(db->GetRootDB());
  317. assert(dbimpl);
  318. const bool seq_per_batch = dbimpl->seq_per_batch();
  319. // TODO: remove the following assert(!seq_per_batch) once timestamped snapshot
  320. // is supported in write-prepared/write-unprepared transactions.
  321. assert(!seq_per_batch);
  322. constexpr size_t txn_size = 10;
  323. constexpr TxnTimestamp ts_delta = 10;
  324. constexpr size_t num_txns = 100;
  325. std::vector<std::shared_ptr<const Snapshot> > snapshots(num_txns);
  326. constexpr TxnTimestamp start_ts = 10000;
  327. auto notifier = std::make_shared<TsCheckingTxnNotifier>();
  328. for (size_t i = 0; i < num_txns; ++i) {
  329. std::unique_ptr<Transaction> txn(
  330. db->BeginTransaction(WriteOptions(), TransactionOptions()));
  331. ASSERT_OK(txn->SetName("txn" + std::to_string(i)));
  332. for (size_t j = 0; j < txn_size; ++j) {
  333. ASSERT_OK(txn->Put("k" + std::to_string(j),
  334. "v" + std::to_string(j) + "_" + std::to_string(i)));
  335. }
  336. if (0 == (i % 2)) {
  337. ASSERT_OK(txn->Prepare());
  338. }
  339. ASSERT_OK(txn->CommitAndTryCreateSnapshot(notifier, start_ts + i * ts_delta,
  340. &snapshots[i]));
  341. assert(snapshots[i]);
  342. ASSERT_LT(notifier->prev_snapshot_ts(), kMaxTxnTimestamp);
  343. ASSERT_EQ(start_ts + i * ts_delta, snapshots[i]->GetTimestamp());
  344. }
  345. {
  346. auto snapshot = db->GetTimestampedSnapshot(start_ts + 1);
  347. ASSERT_EQ(nullptr, snapshot);
  348. }
  349. constexpr TxnTimestamp max_ts = start_ts + num_txns * ts_delta;
  350. for (size_t i = 0; i < num_txns; ++i) {
  351. auto snapshot = db->GetTimestampedSnapshot(start_ts + i * ts_delta);
  352. ASSERT_EQ(snapshots[i], snapshot);
  353. std::vector<std::shared_ptr<const Snapshot> > tmp_snapshots;
  354. Status s = db->GetTimestampedSnapshots(max_ts, start_ts + i * ts_delta,
  355. tmp_snapshots);
  356. ASSERT_TRUE(s.IsInvalidArgument());
  357. ASSERT_TRUE(tmp_snapshots.empty());
  358. for (size_t j = i; j < num_txns; ++j) {
  359. std::vector<std::shared_ptr<const Snapshot> > expected_snapshots(
  360. snapshots.begin() + i, snapshots.begin() + j);
  361. tmp_snapshots.clear();
  362. s = db->GetTimestampedSnapshots(start_ts + i * ts_delta,
  363. start_ts + j * ts_delta, tmp_snapshots);
  364. if (i < j) {
  365. ASSERT_OK(s);
  366. } else {
  367. ASSERT_TRUE(s.IsInvalidArgument());
  368. }
  369. ASSERT_EQ(expected_snapshots, tmp_snapshots);
  370. }
  371. }
  372. {
  373. std::vector<std::shared_ptr<const Snapshot> > tmp_snapshots;
  374. const Status s = db->GetAllTimestampedSnapshots(tmp_snapshots);
  375. ASSERT_OK(s);
  376. ASSERT_EQ(snapshots, tmp_snapshots);
  377. const std::shared_ptr<const Snapshot> latest_snapshot =
  378. db->GetLatestTimestampedSnapshot();
  379. ASSERT_EQ(snapshots.back(), latest_snapshot);
  380. }
  381. for (size_t i = 0; i <= num_txns; ++i) {
  382. std::vector<std::shared_ptr<const Snapshot> > snapshots1(
  383. snapshots.begin() + i, snapshots.end());
  384. if (i > 0) {
  385. auto snapshot1 =
  386. db->GetTimestampedSnapshot(start_ts + (i - 1) * ts_delta);
  387. assert(snapshot1);
  388. ASSERT_EQ(start_ts + (i - 1) * ts_delta, snapshot1->GetTimestamp());
  389. }
  390. db->ReleaseTimestampedSnapshotsOlderThan(start_ts + i * ts_delta);
  391. if (i > 0) {
  392. auto snapshot1 =
  393. db->GetTimestampedSnapshot(start_ts + (i - 1) * ts_delta);
  394. ASSERT_EQ(nullptr, snapshot1);
  395. }
  396. std::vector<std::shared_ptr<const Snapshot> > tmp_snapshots;
  397. const Status s = db->GetAllTimestampedSnapshots(tmp_snapshots);
  398. ASSERT_OK(s);
  399. ASSERT_EQ(snapshots1, tmp_snapshots);
  400. }
  401. // Even after released by db, the applications still hold reference to shared
  402. // snapshots.
  403. for (size_t i = 0; i < num_txns; ++i) {
  404. assert(snapshots[i]);
  405. ASSERT_EQ(start_ts + i * ts_delta, snapshots[i]->GetTimestamp());
  406. }
  407. snapshots.clear();
  408. ASSERT_OK(db->Close());
  409. delete db;
  410. db = nullptr;
  411. }
  412. } // namespace ROCKSDB_NAMESPACE
  413. int main(int argc, char** argv) {
  414. ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  415. ::testing::InitGoogleTest(&argc, argv);
  416. return RUN_ALL_TESTS();
  417. }