write_unprepared_txn_db.h

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#pragma once
#ifndef ROCKSDB_LITE

#include "utilities/transactions/write_prepared_txn_db.h"
#include "utilities/transactions/write_unprepared_txn.h"

namespace ROCKSDB_NAMESPACE {

class WriteUnpreparedTxn;

class WriteUnpreparedTxnDB : public WritePreparedTxnDB {
 public:
  using WritePreparedTxnDB::WritePreparedTxnDB;

  Status Initialize(const std::vector<size_t>& compaction_enabled_cf_indices,
                    const std::vector<ColumnFamilyHandle*>& handles) override;

  Transaction* BeginTransaction(const WriteOptions& write_options,
                                const TransactionOptions& txn_options,
                                Transaction* old_txn) override;

  // Struct to hold ownership of snapshot and read callback for cleanup.
  struct IteratorState;

  using WritePreparedTxnDB::NewIterator;
  Iterator* NewIterator(const ReadOptions& options,
                        ColumnFamilyHandle* column_family,
                        WriteUnpreparedTxn* txn);

 private:
  Status RollbackRecoveredTransaction(
      const DBImpl::RecoveredTransaction* rtxn);
};
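
// A minimal usage sketch (illustrative only, not part of this header): a
// WriteUnpreparedTxnDB is normally created indirectly by opening a
// TransactionDB with the write_unprepared policy, e.g.
//
//   TransactionDBOptions txn_db_options;
//   txn_db_options.write_policy = TxnDBWritePolicy::WRITE_UNPREPARED;
//   TransactionDB* db = nullptr;
//   Status s = TransactionDB::Open(options, txn_db_options, path, &db);
//   Transaction* txn = db->BeginTransaction(WriteOptions());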

class WriteUnpreparedCommitEntryPreReleaseCallback : public PreReleaseCallback {
  // TODO(lth): Reduce code duplication with
  // WritePreparedCommitEntryPreReleaseCallback
 public:
  // includes_data indicates that the commit also writes a non-empty
  // CommitTimeWriteBatch to memtable, which needs to be committed separately.
  WriteUnpreparedCommitEntryPreReleaseCallback(
      WritePreparedTxnDB* db, DBImpl* db_impl,
      const std::map<SequenceNumber, size_t>& unprep_seqs,
      size_t data_batch_cnt = 0, bool publish_seq = true)
      : db_(db),
        db_impl_(db_impl),
        unprep_seqs_(unprep_seqs),
        data_batch_cnt_(data_batch_cnt),
        includes_data_(data_batch_cnt_ > 0),
        publish_seq_(publish_seq) {
    assert(unprep_seqs.size() > 0);
  }

  virtual Status Callback(SequenceNumber commit_seq,
                          bool is_mem_disabled __attribute__((__unused__)),
                          uint64_t, size_t /*index*/,
                          size_t /*total*/) override {
    const uint64_t last_commit_seq = LIKELY(data_batch_cnt_ <= 1)
                                         ? commit_seq
                                         : commit_seq + data_batch_cnt_ - 1;
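    // For example, if commit_seq is 100 and data_batch_cnt_ is 3, the commit
    // write occupies sequence numbers 100..102, so last_commit_seq is 102.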
    // Recall that unprep_seqs maps (un)prepared_seq => prepare_batch_cnt.
    for (const auto& s : unprep_seqs_) {
      for (size_t i = 0; i < s.second; i++) {
        db_->AddCommitted(s.first + i, last_commit_seq);
      }
    }

    if (includes_data_) {
      assert(data_batch_cnt_);
      // Commit the data that is accompanied with the commit request
      for (size_t i = 0; i < data_batch_cnt_; i++) {
        // For the commit seq of each batch use the commit seq of the last
        // batch. This makes debugging easier by giving all the batches the
        // same sequence number.
        db_->AddCommitted(commit_seq + i, last_commit_seq);
      }
    }

    if (db_impl_->immutable_db_options().two_write_queues && publish_seq_) {
      assert(is_mem_disabled);  // implies the 2nd queue
      // Publish the sequence number. We can do that here assuming the callback
      // is invoked only from one write queue, which would guarantee that the
      // published sequence numbers will be in order, i.e., once a seq is
      // published all the seqs prior to it are also publishable.
      db_impl_->SetLastPublishedSequence(last_commit_seq);
    }
    // else the SequenceNumber that is updated as part of the write already
    // does the publishing
    return Status::OK();
  }

 private:
  WritePreparedTxnDB* db_;
  DBImpl* db_impl_;
  const std::map<SequenceNumber, size_t>& unprep_seqs_;
  size_t data_batch_cnt_;
  // Whether the commit includes data, either because it is a commit without
  // prepare or because it has a CommitTimeWriteBatch.
  bool includes_data_;
  // Whether the callback should also publish the commit seq number.
  bool publish_seq_;
};
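
// A rough usage sketch for the callback above (illustrative only; variable
// names are hypothetical): a commit path that has already written its
// unprepared batches would construct the callback and pass it as the
// PreReleaseCallback of the internal write that appends the commit marker, so
// that AddCommitted runs before the commit sequence becomes visible, e.g.
//
//   WriteUnpreparedCommitEntryPreReleaseCallback update_commit_map(
//       wpt_db, db_impl, unprep_seqs, /*data_batch_cnt=*/0);
//   // ... pass &update_commit_map into the commit write.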

class WriteUnpreparedRollbackPreReleaseCallback : public PreReleaseCallback {
  // TODO(lth): Reduce code duplication with
  // WritePreparedCommitEntryPreReleaseCallback
 public:
  WriteUnpreparedRollbackPreReleaseCallback(
      WritePreparedTxnDB* db, DBImpl* db_impl,
      const std::map<SequenceNumber, size_t>& unprep_seqs,
      SequenceNumber rollback_seq)
      : db_(db),
        db_impl_(db_impl),
        unprep_seqs_(unprep_seqs),
        rollback_seq_(rollback_seq) {
    assert(unprep_seqs.size() > 0);
    assert(db_impl_->immutable_db_options().two_write_queues);
  }

  virtual Status Callback(SequenceNumber commit_seq,
                          bool is_mem_disabled __attribute__((__unused__)),
                          uint64_t, size_t /*index*/,
                          size_t /*total*/) override {
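    // Roughly, rollback works by writing a rollback batch (at rollback_seq_)
    // that restores the pre-transaction values, and then marking both that
    // batch and all of the transaction's unprepared batches as committed at
    // commit_seq, so readers see the restored data rather than the
    // rolled-back writes.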
    assert(is_mem_disabled);  // implies the 2nd queue
    const uint64_t last_commit_seq = commit_seq;
    db_->AddCommitted(rollback_seq_, last_commit_seq);
    // Recall that unprep_seqs maps (un)prepared_seq => prepare_batch_cnt.
    for (const auto& s : unprep_seqs_) {
      for (size_t i = 0; i < s.second; i++) {
        db_->AddCommitted(s.first + i, last_commit_seq);
      }
    }
    db_impl_->SetLastPublishedSequence(last_commit_seq);
    return Status::OK();
  }

 private:
  WritePreparedTxnDB* db_;
  DBImpl* db_impl_;
  const std::map<SequenceNumber, size_t>& unprep_seqs_;
  SequenceNumber rollback_seq_;
};

}  // namespace ROCKSDB_NAMESPACE
#endif  // ROCKSDB_LITE