pessimistic_transaction_db.cc 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #ifndef ROCKSDB_LITE
  6. #include "utilities/transactions/pessimistic_transaction_db.h"
  7. #include <cinttypes>
  8. #include <string>
  9. #include <unordered_set>
  10. #include <vector>
  11. #include "db/db_impl/db_impl.h"
  12. #include "rocksdb/db.h"
  13. #include "rocksdb/options.h"
  14. #include "rocksdb/utilities/transaction_db.h"
  15. #include "test_util/sync_point.h"
  16. #include "util/cast_util.h"
  17. #include "util/mutexlock.h"
  18. #include "utilities/transactions/pessimistic_transaction.h"
  19. #include "utilities/transactions/transaction_db_mutex_impl.h"
  20. #include "utilities/transactions/write_prepared_txn_db.h"
  21. #include "utilities/transactions/write_unprepared_txn_db.h"
  22. namespace ROCKSDB_NAMESPACE {
  23. PessimisticTransactionDB::PessimisticTransactionDB(
  24. DB* db, const TransactionDBOptions& txn_db_options)
  25. : TransactionDB(db),
  26. db_impl_(static_cast_with_check<DBImpl, DB>(db)),
  27. txn_db_options_(txn_db_options),
  28. lock_mgr_(this, txn_db_options_.num_stripes, txn_db_options.max_num_locks,
  29. txn_db_options_.max_num_deadlocks,
  30. txn_db_options_.custom_mutex_factory
  31. ? txn_db_options_.custom_mutex_factory
  32. : std::shared_ptr<TransactionDBMutexFactory>(
  33. new TransactionDBMutexFactoryImpl())) {
  34. assert(db_impl_ != nullptr);
  35. info_log_ = db_impl_->GetDBOptions().info_log;
  36. }
  37. // Support initiliazing PessimisticTransactionDB from a stackable db
  38. //
  39. // PessimisticTransactionDB
  40. // ^ ^
  41. // | |
  42. // | +
  43. // | StackableDB
  44. // | ^
  45. // | |
  46. // + +
  47. // DBImpl
  48. // ^
  49. // |(inherit)
  50. // +
  51. // DB
  52. //
  53. PessimisticTransactionDB::PessimisticTransactionDB(
  54. StackableDB* db, const TransactionDBOptions& txn_db_options)
  55. : TransactionDB(db),
  56. db_impl_(static_cast_with_check<DBImpl, DB>(db->GetRootDB())),
  57. txn_db_options_(txn_db_options),
  58. lock_mgr_(this, txn_db_options_.num_stripes, txn_db_options.max_num_locks,
  59. txn_db_options_.max_num_deadlocks,
  60. txn_db_options_.custom_mutex_factory
  61. ? txn_db_options_.custom_mutex_factory
  62. : std::shared_ptr<TransactionDBMutexFactory>(
  63. new TransactionDBMutexFactoryImpl())) {
  64. assert(db_impl_ != nullptr);
  65. }
  66. PessimisticTransactionDB::~PessimisticTransactionDB() {
  67. while (!transactions_.empty()) {
  68. delete transactions_.begin()->second;
  69. // TODO(myabandeh): this seems to be an unsafe approach as it is not quite
  70. // clear whether delete would also remove the entry from transactions_.
  71. }
  72. }
  73. Status PessimisticTransactionDB::VerifyCFOptions(const ColumnFamilyOptions&) {
  74. return Status::OK();
  75. }
  76. Status PessimisticTransactionDB::Initialize(
  77. const std::vector<size_t>& compaction_enabled_cf_indices,
  78. const std::vector<ColumnFamilyHandle*>& handles) {
  79. for (auto cf_ptr : handles) {
  80. AddColumnFamily(cf_ptr);
  81. }
  82. // Verify cf options
  83. for (auto handle : handles) {
  84. ColumnFamilyDescriptor cfd;
  85. Status s = handle->GetDescriptor(&cfd);
  86. if (!s.ok()) {
  87. return s;
  88. }
  89. s = VerifyCFOptions(cfd.options);
  90. if (!s.ok()) {
  91. return s;
  92. }
  93. }
  94. // Re-enable compaction for the column families that initially had
  95. // compaction enabled.
  96. std::vector<ColumnFamilyHandle*> compaction_enabled_cf_handles;
  97. compaction_enabled_cf_handles.reserve(compaction_enabled_cf_indices.size());
  98. for (auto index : compaction_enabled_cf_indices) {
  99. compaction_enabled_cf_handles.push_back(handles[index]);
  100. }
  101. Status s = EnableAutoCompaction(compaction_enabled_cf_handles);
  102. // create 'real' transactions from recovered shell transactions
  103. auto dbimpl = static_cast_with_check<DBImpl, DB>(GetRootDB());
  104. assert(dbimpl != nullptr);
  105. auto rtrxs = dbimpl->recovered_transactions();
  106. for (auto it = rtrxs.begin(); it != rtrxs.end(); ++it) {
  107. auto recovered_trx = it->second;
  108. assert(recovered_trx);
  109. assert(recovered_trx->batches_.size() == 1);
  110. const auto& seq = recovered_trx->batches_.begin()->first;
  111. const auto& batch_info = recovered_trx->batches_.begin()->second;
  112. assert(batch_info.log_number_);
  113. assert(recovered_trx->name_.length());
  114. WriteOptions w_options;
  115. w_options.sync = true;
  116. TransactionOptions t_options;
  117. // This would help avoiding deadlock for keys that although exist in the WAL
  118. // did not go through concurrency control. This includes the merge that
  119. // MyRocks uses for auto-inc columns. It is safe to do so, since (i) if
  120. // there is a conflict between the keys of two transactions that must be
  121. // avoided, it is already avoided by the application, MyRocks, before the
  122. // restart (ii) application, MyRocks, guarntees to rollback/commit the
  123. // recovered transactions before new transactions start.
  124. t_options.skip_concurrency_control = true;
  125. Transaction* real_trx = BeginTransaction(w_options, t_options, nullptr);
  126. assert(real_trx);
  127. real_trx->SetLogNumber(batch_info.log_number_);
  128. assert(seq != kMaxSequenceNumber);
  129. if (GetTxnDBOptions().write_policy != WRITE_COMMITTED) {
  130. real_trx->SetId(seq);
  131. }
  132. s = real_trx->SetName(recovered_trx->name_);
  133. if (!s.ok()) {
  134. break;
  135. }
  136. s = real_trx->RebuildFromWriteBatch(batch_info.batch_);
  137. // WriteCommitted set this to to disable this check that is specific to
  138. // WritePrepared txns
  139. assert(batch_info.batch_cnt_ == 0 ||
  140. real_trx->GetWriteBatch()->SubBatchCnt() == batch_info.batch_cnt_);
  141. real_trx->SetState(Transaction::PREPARED);
  142. if (!s.ok()) {
  143. break;
  144. }
  145. }
  146. if (s.ok()) {
  147. dbimpl->DeleteAllRecoveredTransactions();
  148. }
  149. return s;
  150. }
  151. Transaction* WriteCommittedTxnDB::BeginTransaction(
  152. const WriteOptions& write_options, const TransactionOptions& txn_options,
  153. Transaction* old_txn) {
  154. if (old_txn != nullptr) {
  155. ReinitializeTransaction(old_txn, write_options, txn_options);
  156. return old_txn;
  157. } else {
  158. return new WriteCommittedTxn(this, write_options, txn_options);
  159. }
  160. }
  161. TransactionDBOptions PessimisticTransactionDB::ValidateTxnDBOptions(
  162. const TransactionDBOptions& txn_db_options) {
  163. TransactionDBOptions validated = txn_db_options;
  164. if (txn_db_options.num_stripes == 0) {
  165. validated.num_stripes = 1;
  166. }
  167. return validated;
  168. }
  169. Status TransactionDB::Open(const Options& options,
  170. const TransactionDBOptions& txn_db_options,
  171. const std::string& dbname, TransactionDB** dbptr) {
  172. DBOptions db_options(options);
  173. ColumnFamilyOptions cf_options(options);
  174. std::vector<ColumnFamilyDescriptor> column_families;
  175. column_families.push_back(
  176. ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
  177. std::vector<ColumnFamilyHandle*> handles;
  178. Status s = TransactionDB::Open(db_options, txn_db_options, dbname,
  179. column_families, &handles, dbptr);
  180. if (s.ok()) {
  181. assert(handles.size() == 1);
  182. // i can delete the handle since DBImpl is always holding a reference to
  183. // default column family
  184. delete handles[0];
  185. }
  186. return s;
  187. }
  188. Status TransactionDB::Open(
  189. const DBOptions& db_options, const TransactionDBOptions& txn_db_options,
  190. const std::string& dbname,
  191. const std::vector<ColumnFamilyDescriptor>& column_families,
  192. std::vector<ColumnFamilyHandle*>* handles, TransactionDB** dbptr) {
  193. Status s;
  194. DB* db = nullptr;
  195. if (txn_db_options.write_policy == WRITE_COMMITTED &&
  196. db_options.unordered_write) {
  197. return Status::NotSupported(
  198. "WRITE_COMMITTED is incompatible with unordered_writes");
  199. }
  200. if (txn_db_options.write_policy == WRITE_UNPREPARED &&
  201. db_options.unordered_write) {
  202. // TODO(lth): support it
  203. return Status::NotSupported(
  204. "WRITE_UNPREPARED is currently incompatible with unordered_writes");
  205. }
  206. if (txn_db_options.write_policy == WRITE_PREPARED &&
  207. db_options.unordered_write && !db_options.two_write_queues) {
  208. return Status::NotSupported(
  209. "WRITE_PREPARED is incompatible with unordered_writes if "
  210. "two_write_queues is not enabled.");
  211. }
  212. std::vector<ColumnFamilyDescriptor> column_families_copy = column_families;
  213. std::vector<size_t> compaction_enabled_cf_indices;
  214. DBOptions db_options_2pc = db_options;
  215. PrepareWrap(&db_options_2pc, &column_families_copy,
  216. &compaction_enabled_cf_indices);
  217. const bool use_seq_per_batch =
  218. txn_db_options.write_policy == WRITE_PREPARED ||
  219. txn_db_options.write_policy == WRITE_UNPREPARED;
  220. const bool use_batch_per_txn =
  221. txn_db_options.write_policy == WRITE_COMMITTED ||
  222. txn_db_options.write_policy == WRITE_PREPARED;
  223. s = DBImpl::Open(db_options_2pc, dbname, column_families_copy, handles, &db,
  224. use_seq_per_batch, use_batch_per_txn);
  225. if (s.ok()) {
  226. ROCKS_LOG_WARN(db->GetDBOptions().info_log,
  227. "Transaction write_policy is %" PRId32,
  228. static_cast<int>(txn_db_options.write_policy));
  229. s = WrapDB(db, txn_db_options, compaction_enabled_cf_indices, *handles,
  230. dbptr);
  231. }
  232. if (!s.ok()) {
  233. // just in case it was not deleted (and not set to nullptr).
  234. delete db;
  235. }
  236. return s;
  237. }
  238. void TransactionDB::PrepareWrap(
  239. DBOptions* db_options, std::vector<ColumnFamilyDescriptor>* column_families,
  240. std::vector<size_t>* compaction_enabled_cf_indices) {
  241. compaction_enabled_cf_indices->clear();
  242. // Enable MemTable History if not already enabled
  243. for (size_t i = 0; i < column_families->size(); i++) {
  244. ColumnFamilyOptions* cf_options = &(*column_families)[i].options;
  245. if (cf_options->max_write_buffer_size_to_maintain == 0 &&
  246. cf_options->max_write_buffer_number_to_maintain == 0) {
  247. // Setting to -1 will set the History size to
  248. // max_write_buffer_number * write_buffer_size.
  249. cf_options->max_write_buffer_size_to_maintain = -1;
  250. }
  251. if (!cf_options->disable_auto_compactions) {
  252. // Disable compactions momentarily to prevent race with DB::Open
  253. cf_options->disable_auto_compactions = true;
  254. compaction_enabled_cf_indices->push_back(i);
  255. }
  256. }
  257. db_options->allow_2pc = true;
  258. }
  259. Status TransactionDB::WrapDB(
  260. // make sure this db is already opened with memtable history enabled,
  261. // auto compaction distabled and 2 phase commit enabled
  262. DB* db, const TransactionDBOptions& txn_db_options,
  263. const std::vector<size_t>& compaction_enabled_cf_indices,
  264. const std::vector<ColumnFamilyHandle*>& handles, TransactionDB** dbptr) {
  265. assert(db != nullptr);
  266. assert(dbptr != nullptr);
  267. *dbptr = nullptr;
  268. std::unique_ptr<PessimisticTransactionDB> txn_db;
  269. switch (txn_db_options.write_policy) {
  270. case WRITE_UNPREPARED:
  271. txn_db.reset(new WriteUnpreparedTxnDB(
  272. db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options)));
  273. break;
  274. case WRITE_PREPARED:
  275. txn_db.reset(new WritePreparedTxnDB(
  276. db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options)));
  277. break;
  278. case WRITE_COMMITTED:
  279. default:
  280. txn_db.reset(new WriteCommittedTxnDB(
  281. db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options)));
  282. }
  283. txn_db->UpdateCFComparatorMap(handles);
  284. Status s = txn_db->Initialize(compaction_enabled_cf_indices, handles);
  285. // In case of a failure at this point, db is deleted via the txn_db destructor
  286. // and set to nullptr.
  287. if (s.ok()) {
  288. *dbptr = txn_db.release();
  289. }
  290. return s;
  291. }
  292. Status TransactionDB::WrapStackableDB(
  293. // make sure this stackable_db is already opened with memtable history
  294. // enabled, auto compaction distabled and 2 phase commit enabled
  295. StackableDB* db, const TransactionDBOptions& txn_db_options,
  296. const std::vector<size_t>& compaction_enabled_cf_indices,
  297. const std::vector<ColumnFamilyHandle*>& handles, TransactionDB** dbptr) {
  298. assert(db != nullptr);
  299. assert(dbptr != nullptr);
  300. *dbptr = nullptr;
  301. std::unique_ptr<PessimisticTransactionDB> txn_db;
  302. switch (txn_db_options.write_policy) {
  303. case WRITE_UNPREPARED:
  304. txn_db.reset(new WriteUnpreparedTxnDB(
  305. db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options)));
  306. break;
  307. case WRITE_PREPARED:
  308. txn_db.reset(new WritePreparedTxnDB(
  309. db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options)));
  310. break;
  311. case WRITE_COMMITTED:
  312. default:
  313. txn_db.reset(new WriteCommittedTxnDB(
  314. db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options)));
  315. }
  316. txn_db->UpdateCFComparatorMap(handles);
  317. Status s = txn_db->Initialize(compaction_enabled_cf_indices, handles);
  318. // In case of a failure at this point, db is deleted via the txn_db destructor
  319. // and set to nullptr.
  320. if (s.ok()) {
  321. *dbptr = txn_db.release();
  322. }
  323. return s;
  324. }
  325. // Let TransactionLockMgr know that this column family exists so it can
  326. // allocate a LockMap for it.
  327. void PessimisticTransactionDB::AddColumnFamily(
  328. const ColumnFamilyHandle* handle) {
  329. lock_mgr_.AddColumnFamily(handle->GetID());
  330. }
  331. Status PessimisticTransactionDB::CreateColumnFamily(
  332. const ColumnFamilyOptions& options, const std::string& column_family_name,
  333. ColumnFamilyHandle** handle) {
  334. InstrumentedMutexLock l(&column_family_mutex_);
  335. Status s = VerifyCFOptions(options);
  336. if (!s.ok()) {
  337. return s;
  338. }
  339. s = db_->CreateColumnFamily(options, column_family_name, handle);
  340. if (s.ok()) {
  341. lock_mgr_.AddColumnFamily((*handle)->GetID());
  342. UpdateCFComparatorMap(*handle);
  343. }
  344. return s;
  345. }
  346. // Let TransactionLockMgr know that it can deallocate the LockMap for this
  347. // column family.
  348. Status PessimisticTransactionDB::DropColumnFamily(
  349. ColumnFamilyHandle* column_family) {
  350. InstrumentedMutexLock l(&column_family_mutex_);
  351. Status s = db_->DropColumnFamily(column_family);
  352. if (s.ok()) {
  353. lock_mgr_.RemoveColumnFamily(column_family->GetID());
  354. }
  355. return s;
  356. }
  357. Status PessimisticTransactionDB::TryLock(PessimisticTransaction* txn,
  358. uint32_t cfh_id,
  359. const std::string& key,
  360. bool exclusive) {
  361. return lock_mgr_.TryLock(txn, cfh_id, key, GetEnv(), exclusive);
  362. }
  363. void PessimisticTransactionDB::UnLock(PessimisticTransaction* txn,
  364. const TransactionKeyMap* keys) {
  365. lock_mgr_.UnLock(txn, keys, GetEnv());
  366. }
  367. void PessimisticTransactionDB::UnLock(PessimisticTransaction* txn,
  368. uint32_t cfh_id, const std::string& key) {
  369. lock_mgr_.UnLock(txn, cfh_id, key, GetEnv());
  370. }
  371. // Used when wrapping DB write operations in a transaction
  372. Transaction* PessimisticTransactionDB::BeginInternalTransaction(
  373. const WriteOptions& options) {
  374. TransactionOptions txn_options;
  375. Transaction* txn = BeginTransaction(options, txn_options, nullptr);
  376. // Use default timeout for non-transactional writes
  377. txn->SetLockTimeout(txn_db_options_.default_lock_timeout);
  378. return txn;
  379. }
  380. // All user Put, Merge, Delete, and Write requests must be intercepted to make
  381. // sure that they lock all keys that they are writing to avoid causing conflicts
  382. // with any concurrent transactions. The easiest way to do this is to wrap all
  383. // write operations in a transaction.
  384. //
  385. // Put(), Merge(), and Delete() only lock a single key per call. Write() will
  386. // sort its keys before locking them. This guarantees that TransactionDB write
  387. // methods cannot deadlock with each other (but still could deadlock with a
  388. // Transaction).
  389. Status PessimisticTransactionDB::Put(const WriteOptions& options,
  390. ColumnFamilyHandle* column_family,
  391. const Slice& key, const Slice& val) {
  392. Status s;
  393. Transaction* txn = BeginInternalTransaction(options);
  394. txn->DisableIndexing();
  395. // Since the client didn't create a transaction, they don't care about
  396. // conflict checking for this write. So we just need to do PutUntracked().
  397. s = txn->PutUntracked(column_family, key, val);
  398. if (s.ok()) {
  399. s = txn->Commit();
  400. }
  401. delete txn;
  402. return s;
  403. }
  404. Status PessimisticTransactionDB::Delete(const WriteOptions& wopts,
  405. ColumnFamilyHandle* column_family,
  406. const Slice& key) {
  407. Status s;
  408. Transaction* txn = BeginInternalTransaction(wopts);
  409. txn->DisableIndexing();
  410. // Since the client didn't create a transaction, they don't care about
  411. // conflict checking for this write. So we just need to do
  412. // DeleteUntracked().
  413. s = txn->DeleteUntracked(column_family, key);
  414. if (s.ok()) {
  415. s = txn->Commit();
  416. }
  417. delete txn;
  418. return s;
  419. }
  420. Status PessimisticTransactionDB::SingleDelete(const WriteOptions& wopts,
  421. ColumnFamilyHandle* column_family,
  422. const Slice& key) {
  423. Status s;
  424. Transaction* txn = BeginInternalTransaction(wopts);
  425. txn->DisableIndexing();
  426. // Since the client didn't create a transaction, they don't care about
  427. // conflict checking for this write. So we just need to do
  428. // SingleDeleteUntracked().
  429. s = txn->SingleDeleteUntracked(column_family, key);
  430. if (s.ok()) {
  431. s = txn->Commit();
  432. }
  433. delete txn;
  434. return s;
  435. }
  436. Status PessimisticTransactionDB::Merge(const WriteOptions& options,
  437. ColumnFamilyHandle* column_family,
  438. const Slice& key, const Slice& value) {
  439. Status s;
  440. Transaction* txn = BeginInternalTransaction(options);
  441. txn->DisableIndexing();
  442. // Since the client didn't create a transaction, they don't care about
  443. // conflict checking for this write. So we just need to do
  444. // MergeUntracked().
  445. s = txn->MergeUntracked(column_family, key, value);
  446. if (s.ok()) {
  447. s = txn->Commit();
  448. }
  449. delete txn;
  450. return s;
  451. }
  452. Status PessimisticTransactionDB::Write(const WriteOptions& opts,
  453. WriteBatch* updates) {
  454. return WriteWithConcurrencyControl(opts, updates);
  455. }
  456. Status WriteCommittedTxnDB::Write(const WriteOptions& opts,
  457. WriteBatch* updates) {
  458. if (txn_db_options_.skip_concurrency_control) {
  459. return db_impl_->Write(opts, updates);
  460. } else {
  461. return WriteWithConcurrencyControl(opts, updates);
  462. }
  463. }
  464. Status WriteCommittedTxnDB::Write(
  465. const WriteOptions& opts,
  466. const TransactionDBWriteOptimizations& optimizations, WriteBatch* updates) {
  467. if (optimizations.skip_concurrency_control) {
  468. return db_impl_->Write(opts, updates);
  469. } else {
  470. return WriteWithConcurrencyControl(opts, updates);
  471. }
  472. }
  473. void PessimisticTransactionDB::InsertExpirableTransaction(
  474. TransactionID tx_id, PessimisticTransaction* tx) {
  475. assert(tx->GetExpirationTime() > 0);
  476. std::lock_guard<std::mutex> lock(map_mutex_);
  477. expirable_transactions_map_.insert({tx_id, tx});
  478. }
  479. void PessimisticTransactionDB::RemoveExpirableTransaction(TransactionID tx_id) {
  480. std::lock_guard<std::mutex> lock(map_mutex_);
  481. expirable_transactions_map_.erase(tx_id);
  482. }
  483. bool PessimisticTransactionDB::TryStealingExpiredTransactionLocks(
  484. TransactionID tx_id) {
  485. std::lock_guard<std::mutex> lock(map_mutex_);
  486. auto tx_it = expirable_transactions_map_.find(tx_id);
  487. if (tx_it == expirable_transactions_map_.end()) {
  488. return true;
  489. }
  490. PessimisticTransaction& tx = *(tx_it->second);
  491. return tx.TryStealingLocks();
  492. }
  493. void PessimisticTransactionDB::ReinitializeTransaction(
  494. Transaction* txn, const WriteOptions& write_options,
  495. const TransactionOptions& txn_options) {
  496. auto txn_impl =
  497. static_cast_with_check<PessimisticTransaction, Transaction>(txn);
  498. txn_impl->Reinitialize(this, write_options, txn_options);
  499. }
  500. Transaction* PessimisticTransactionDB::GetTransactionByName(
  501. const TransactionName& name) {
  502. std::lock_guard<std::mutex> lock(name_map_mutex_);
  503. auto it = transactions_.find(name);
  504. if (it == transactions_.end()) {
  505. return nullptr;
  506. } else {
  507. return it->second;
  508. }
  509. }
  510. void PessimisticTransactionDB::GetAllPreparedTransactions(
  511. std::vector<Transaction*>* transv) {
  512. assert(transv);
  513. transv->clear();
  514. std::lock_guard<std::mutex> lock(name_map_mutex_);
  515. for (auto it = transactions_.begin(); it != transactions_.end(); ++it) {
  516. if (it->second->GetState() == Transaction::PREPARED) {
  517. transv->push_back(it->second);
  518. }
  519. }
  520. }
  521. TransactionLockMgr::LockStatusData
  522. PessimisticTransactionDB::GetLockStatusData() {
  523. return lock_mgr_.GetLockStatusData();
  524. }
  525. std::vector<DeadlockPath> PessimisticTransactionDB::GetDeadlockInfoBuffer() {
  526. return lock_mgr_.GetDeadlockInfoBuffer();
  527. }
  528. void PessimisticTransactionDB::SetDeadlockInfoBufferSize(uint32_t target_size) {
  529. lock_mgr_.Resize(target_size);
  530. }
  531. void PessimisticTransactionDB::RegisterTransaction(Transaction* txn) {
  532. assert(txn);
  533. assert(txn->GetName().length() > 0);
  534. assert(GetTransactionByName(txn->GetName()) == nullptr);
  535. assert(txn->GetState() == Transaction::STARTED);
  536. std::lock_guard<std::mutex> lock(name_map_mutex_);
  537. transactions_[txn->GetName()] = txn;
  538. }
  539. void PessimisticTransactionDB::UnregisterTransaction(Transaction* txn) {
  540. assert(txn);
  541. std::lock_guard<std::mutex> lock(name_map_mutex_);
  542. auto it = transactions_.find(txn->GetName());
  543. assert(it != transactions_.end());
  544. transactions_.erase(it);
  545. }
  546. } // namespace ROCKSDB_NAMESPACE
  547. #endif // ROCKSDB_LITE