pessimistic_transaction_db.cc 28 KB


  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include "utilities/transactions/pessimistic_transaction_db.h"
  6. #include <cinttypes>
  7. #include <memory>
  8. #include <sstream>
  9. #include <string>
  10. #include <unordered_set>
  11. #include <vector>
  12. #include "db/db_impl/db_impl.h"
  13. #include "logging/logging.h"
  14. #include "rocksdb/db.h"
  15. #include "rocksdb/options.h"
  16. #include "rocksdb/utilities/transaction_db.h"
  17. #include "test_util/sync_point.h"
  18. #include "util/cast_util.h"
  19. #include "util/mutexlock.h"
  20. #include "utilities/secondary_index/secondary_index_mixin.h"
  21. #include "utilities/transactions/pessimistic_transaction.h"
  22. #include "utilities/transactions/transaction_db_mutex_impl.h"
  23. #include "utilities/transactions/write_prepared_txn_db.h"
  24. #include "utilities/transactions/write_unprepared_txn_db.h"
  25. namespace ROCKSDB_NAMESPACE {
  26. PessimisticTransactionDB::PessimisticTransactionDB(
  27. DB* db, const TransactionDBOptions& txn_db_options)
  28. : TransactionDB(db),
  29. db_impl_(static_cast_with_check<DBImpl>(db)),
  30. txn_db_options_(txn_db_options),
  31. lock_manager_(NewLockManager(this, txn_db_options)) {
  32. assert(db_impl_ != nullptr);
  33. info_log_ = db_impl_->GetDBOptions().info_log;
  34. }
  35. // Support initiliazing PessimisticTransactionDB from a stackable db
  36. //
  37. // PessimisticTransactionDB
  38. // ^ ^
  39. // | |
  40. // | +
  41. // | StackableDB
  42. // | ^
  43. // | |
  44. // + +
  45. // DBImpl
  46. // ^
  47. // |(inherit)
  48. // +
  49. // DB
  50. //
  51. PessimisticTransactionDB::PessimisticTransactionDB(
  52. StackableDB* db, const TransactionDBOptions& txn_db_options)
  53. : TransactionDB(db),
  54. db_impl_(static_cast_with_check<DBImpl>(db->GetRootDB())),
  55. txn_db_options_(txn_db_options),
  56. lock_manager_(NewLockManager(this, txn_db_options)) {
  57. assert(db_impl_ != nullptr);
  58. }
  59. PessimisticTransactionDB::~PessimisticTransactionDB() {
  60. while (!transactions_.empty()) {
  61. delete transactions_.begin()->second;
  62. // TODO(myabandeh): this seems to be an unsafe approach as it is not quite
  63. // clear whether delete would also remove the entry from transactions_.
  64. }
  65. }
  66. Status PessimisticTransactionDB::VerifyCFOptions(
  67. const ColumnFamilyOptions& cf_options) {
  68. const Comparator* const ucmp = cf_options.comparator;
  69. assert(ucmp);
  70. size_t ts_sz = ucmp->timestamp_size();
  71. if (0 == ts_sz) {
  72. return Status::OK();
  73. }
  74. if (ts_sz != sizeof(TxnTimestamp)) {
  75. std::ostringstream oss;
  76. oss << "Timestamp of transaction must have " << sizeof(TxnTimestamp)
  77. << " bytes. CF comparator " << std::string(ucmp->Name())
  78. << " timestamp size is " << ts_sz << " bytes";
  79. return Status::InvalidArgument(oss.str());
  80. }
  81. if (txn_db_options_.write_policy != WRITE_COMMITTED) {
  82. return Status::NotSupported("Only WriteCommittedTxn supports timestamp");
  83. }
  84. return Status::OK();
  85. }
  86. Status PessimisticTransactionDB::Initialize(
  87. const std::vector<size_t>& compaction_enabled_cf_indices,
  88. const std::vector<ColumnFamilyHandle*>& handles) {
  89. for (auto cf_ptr : handles) {
  90. AddColumnFamily(cf_ptr);
  91. }
  92. // Verify cf options
  93. for (auto handle : handles) {
  94. ColumnFamilyDescriptor cfd;
  95. Status s = handle->GetDescriptor(&cfd);
  96. if (!s.ok()) {
  97. return s;
  98. }
  99. s = VerifyCFOptions(cfd.options);
  100. if (!s.ok()) {
  101. return s;
  102. }
  103. }
  104. // Re-enable compaction for the column families that initially had
  105. // compaction enabled.
  106. std::vector<ColumnFamilyHandle*> compaction_enabled_cf_handles;
  107. compaction_enabled_cf_handles.reserve(compaction_enabled_cf_indices.size());
  108. for (auto index : compaction_enabled_cf_indices) {
  109. compaction_enabled_cf_handles.push_back(handles[index]);
  110. }
  111. Status s = EnableAutoCompaction(compaction_enabled_cf_handles);
  112. // create 'real' transactions from recovered shell transactions
  113. auto dbimpl = static_cast_with_check<DBImpl>(GetRootDB());
  114. assert(dbimpl != nullptr);
  115. auto rtrxs = dbimpl->recovered_transactions();
  116. for (auto it = rtrxs.begin(); it != rtrxs.end(); ++it) {
  117. auto recovered_trx = it->second;
  118. assert(recovered_trx);
  119. assert(recovered_trx->batches_.size() == 1);
  120. const auto& seq = recovered_trx->batches_.begin()->first;
  121. const auto& batch_info = recovered_trx->batches_.begin()->second;
  122. assert(batch_info.log_number_);
  123. assert(recovered_trx->name_.length());
  124. // TODO: plumb Env::IOActivity, Env::IOPriority
  125. WriteOptions w_options;
  126. w_options.sync = true;
  127. TransactionOptions t_options;
  128. // This would help avoiding deadlock for keys that although exist in the WAL
  129. // did not go through concurrency control. This includes the merge that
  130. // MyRocks uses for auto-inc columns. It is safe to do so, since (i) if
  131. // there is a conflict between the keys of two transactions that must be
  132. // avoided, it is already avoided by the application, MyRocks, before the
  133. // restart (ii) application, MyRocks, guarntees to rollback/commit the
  134. // recovered transactions before new transactions start.
  135. t_options.skip_concurrency_control = true;
  136. Transaction* real_trx = BeginTransaction(w_options, t_options, nullptr);
  137. assert(real_trx);
  138. real_trx->SetLogNumber(batch_info.log_number_);
  139. assert(seq != kMaxSequenceNumber);
  140. if (GetTxnDBOptions().write_policy != WRITE_COMMITTED) {
  141. real_trx->SetId(seq);
  142. }
  143. s = real_trx->SetName(recovered_trx->name_);
  144. if (!s.ok()) {
  145. break;
  146. }
  147. s = real_trx->RebuildFromWriteBatch(batch_info.batch_);
  148. // WriteCommitted set this to to disable this check that is specific to
  149. // WritePrepared txns
  150. assert(batch_info.batch_cnt_ == 0 ||
  151. real_trx->GetWriteBatch()->SubBatchCnt() == batch_info.batch_cnt_);
  152. real_trx->SetState(Transaction::PREPARED);
  153. if (!s.ok()) {
  154. break;
  155. }
  156. }
  157. if (s.ok()) {
  158. dbimpl->DeleteAllRecoveredTransactions();
  159. }
  160. return s;
  161. }
  162. Transaction* WriteCommittedTxnDB::BeginTransaction(
  163. const WriteOptions& write_options, const TransactionOptions& txn_options,
  164. Transaction* old_txn) {
  165. if (old_txn != nullptr) {
  166. ReinitializeTransaction(old_txn, write_options, txn_options);
  167. return old_txn;
  168. } else {
  169. if (!txn_db_options_.secondary_indices.empty()) {
  170. return new SecondaryIndexMixin<WriteCommittedTxn>(
  171. &txn_db_options_.secondary_indices, this, write_options, txn_options);
  172. } else {
  173. return new WriteCommittedTxn(this, write_options, txn_options);
  174. }
  175. }
  176. }
  177. TransactionDBOptions PessimisticTransactionDB::ValidateTxnDBOptions(
  178. const TransactionDBOptions& txn_db_options) {
  179. TransactionDBOptions validated = txn_db_options;
  180. if (txn_db_options.num_stripes == 0) {
  181. validated.num_stripes = 1;
  182. }
  183. return validated;
  184. }
  185. Status TransactionDB::Open(const Options& options,
  186. const TransactionDBOptions& txn_db_options,
  187. const std::string& dbname, TransactionDB** dbptr) {
  188. DBOptions db_options(options);
  189. ColumnFamilyOptions cf_options(options);
  190. std::vector<ColumnFamilyDescriptor> column_families;
  191. column_families.emplace_back(kDefaultColumnFamilyName, cf_options);
  192. std::vector<ColumnFamilyHandle*> handles;
  193. Status s = TransactionDB::Open(db_options, txn_db_options, dbname,
  194. column_families, &handles, dbptr);
  195. if (s.ok()) {
  196. assert(handles.size() == 1);
  197. // i can delete the handle since DBImpl is always holding a reference to
  198. // default column family
  199. delete handles[0];
  200. }
  201. return s;
  202. }
  203. Status TransactionDB::Open(
  204. const DBOptions& db_options, const TransactionDBOptions& txn_db_options,
  205. const std::string& dbname,
  206. const std::vector<ColumnFamilyDescriptor>& column_families,
  207. std::vector<ColumnFamilyHandle*>* handles, TransactionDB** dbptr) {
  208. Status s;
  209. std::unique_ptr<DB> db;
  210. if (txn_db_options.write_policy == WRITE_COMMITTED &&
  211. db_options.unordered_write) {
  212. return Status::NotSupported(
  213. "WRITE_COMMITTED is incompatible with unordered_writes");
  214. }
  215. if (txn_db_options.write_policy == WRITE_UNPREPARED &&
  216. db_options.unordered_write) {
  217. // TODO(lth): support it
  218. return Status::NotSupported(
  219. "WRITE_UNPREPARED is currently incompatible with unordered_writes");
  220. }
  221. if (txn_db_options.write_policy == WRITE_PREPARED &&
  222. db_options.unordered_write && !db_options.two_write_queues) {
  223. return Status::NotSupported(
  224. "WRITE_PREPARED is incompatible with unordered_writes if "
  225. "two_write_queues is not enabled.");
  226. }
  227. std::vector<ColumnFamilyDescriptor> column_families_copy = column_families;
  228. std::vector<size_t> compaction_enabled_cf_indices;
  229. DBOptions db_options_2pc = db_options;
  230. PrepareWrap(&db_options_2pc, &column_families_copy,
  231. &compaction_enabled_cf_indices);
  232. const bool use_seq_per_batch =
  233. txn_db_options.write_policy == WRITE_PREPARED ||
  234. txn_db_options.write_policy == WRITE_UNPREPARED;
  235. const bool use_batch_per_txn =
  236. txn_db_options.write_policy == WRITE_COMMITTED ||
  237. txn_db_options.write_policy == WRITE_PREPARED;
  238. s = DBImpl::Open(db_options_2pc, dbname, column_families_copy, handles, &db,
  239. use_seq_per_batch, use_batch_per_txn,
  240. /*is_retry=*/false, /*can_retry=*/nullptr);
  241. if (s.ok()) {
  242. ROCKS_LOG_WARN(db->GetDBOptions().info_log,
  243. "Transaction write_policy is %" PRId32,
  244. static_cast<int>(txn_db_options.write_policy));
  245. // if WrapDB return non-ok, db will be deleted in WrapDB() via
  246. // ~StackableDB().
  247. s = WrapDB(db.release(), txn_db_options, compaction_enabled_cf_indices,
  248. *handles, dbptr);
  249. }
  250. return s;
  251. }
  252. void TransactionDB::PrepareWrap(
  253. DBOptions* db_options, std::vector<ColumnFamilyDescriptor>* column_families,
  254. std::vector<size_t>* compaction_enabled_cf_indices) {
  255. compaction_enabled_cf_indices->clear();
  256. // Enable MemTable History if not already enabled
  257. for (size_t i = 0; i < column_families->size(); i++) {
  258. ColumnFamilyOptions* cf_options = &(*column_families)[i].options;
  259. if (cf_options->max_write_buffer_size_to_maintain == 0) {
  260. // Setting to -1 will set the History size to
  261. // max_write_buffer_number * write_buffer_size.
  262. cf_options->max_write_buffer_size_to_maintain = -1;
  263. }
  264. if (!cf_options->disable_auto_compactions) {
  265. // Disable compactions momentarily to prevent race with DB::Open
  266. cf_options->disable_auto_compactions = true;
  267. compaction_enabled_cf_indices->push_back(i);
  268. }
  269. }
  270. db_options->allow_2pc = true;
  271. }
  272. namespace {
  273. template <typename DBType>
  274. Status WrapAnotherDBInternal(
  275. DBType* db, const TransactionDBOptions& txn_db_options,
  276. const std::vector<size_t>& compaction_enabled_cf_indices,
  277. const std::vector<ColumnFamilyHandle*>& handles, TransactionDB** dbptr) {
  278. assert(db != nullptr);
  279. assert(dbptr != nullptr);
  280. *dbptr = nullptr;
  281. std::unique_ptr<PessimisticTransactionDB> txn_db;
  282. // txn_db owns object pointed to by the raw db pointer.
  283. switch (txn_db_options.write_policy) {
  284. case WRITE_UNPREPARED:
  285. txn_db.reset(new WriteUnpreparedTxnDB(
  286. db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options)));
  287. break;
  288. case WRITE_PREPARED:
  289. txn_db.reset(new WritePreparedTxnDB(
  290. db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options)));
  291. break;
  292. case WRITE_COMMITTED:
  293. default:
  294. txn_db.reset(new WriteCommittedTxnDB(
  295. db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options)));
  296. }
  297. txn_db->UpdateCFComparatorMap(handles);
  298. Status s = txn_db->Initialize(compaction_enabled_cf_indices, handles);
  299. // In case of a failure at this point, db is deleted via the txn_db destructor
  300. // and set to nullptr.
  301. if (s.ok()) {
  302. *dbptr = txn_db.release();
  303. } else {
  304. for (auto* h : handles) {
  305. delete h;
  306. }
  307. // txn_db still owns db, and ~StackableDB() will be called when txn_db goes
  308. // out of scope, deleting the input db pointer.
  309. ROCKS_LOG_FATAL(db->GetDBOptions().info_log,
  310. "Failed to initialize txn_db: %s", s.ToString().c_str());
  311. }
  312. return s;
  313. }
  314. } // namespace
  315. Status TransactionDB::WrapDB(
  316. // make sure this db is already opened with memtable history enabled,
  317. // auto compaction distabled and 2 phase commit enabled
  318. DB* db, const TransactionDBOptions& txn_db_options,
  319. const std::vector<size_t>& compaction_enabled_cf_indices,
  320. const std::vector<ColumnFamilyHandle*>& handles, TransactionDB** dbptr) {
  321. return WrapAnotherDBInternal(db, txn_db_options,
  322. compaction_enabled_cf_indices, handles, dbptr);
  323. }
  324. Status TransactionDB::WrapStackableDB(
  325. // make sure this stackable_db is already opened with memtable history
  326. // enabled, auto compaction distabled and 2 phase commit enabled
  327. StackableDB* db, const TransactionDBOptions& txn_db_options,
  328. const std::vector<size_t>& compaction_enabled_cf_indices,
  329. const std::vector<ColumnFamilyHandle*>& handles, TransactionDB** dbptr) {
  330. return WrapAnotherDBInternal(db, txn_db_options,
  331. compaction_enabled_cf_indices, handles, dbptr);
  332. }
  333. // Let LockManager know that this column family exists so it can
  334. // allocate a LockMap for it.
  335. void PessimisticTransactionDB::AddColumnFamily(
  336. const ColumnFamilyHandle* handle) {
  337. lock_manager_->AddColumnFamily(handle);
  338. }
  339. Status PessimisticTransactionDB::CreateColumnFamily(
  340. const ColumnFamilyOptions& options, const std::string& column_family_name,
  341. ColumnFamilyHandle** handle) {
  342. InstrumentedMutexLock l(&column_family_mutex_);
  343. Status s = VerifyCFOptions(options);
  344. if (!s.ok()) {
  345. return s;
  346. }
  347. s = db_->CreateColumnFamily(options, column_family_name, handle);
  348. if (s.ok()) {
  349. lock_manager_->AddColumnFamily(*handle);
  350. UpdateCFComparatorMap(*handle);
  351. }
  352. return s;
  353. }
  354. Status PessimisticTransactionDB::CreateColumnFamilies(
  355. const ColumnFamilyOptions& options,
  356. const std::vector<std::string>& column_family_names,
  357. std::vector<ColumnFamilyHandle*>* handles) {
  358. InstrumentedMutexLock l(&column_family_mutex_);
  359. Status s = VerifyCFOptions(options);
  360. if (!s.ok()) {
  361. return s;
  362. }
  363. s = db_->CreateColumnFamilies(options, column_family_names, handles);
  364. if (s.ok()) {
  365. for (auto* handle : *handles) {
  366. lock_manager_->AddColumnFamily(handle);
  367. UpdateCFComparatorMap(handle);
  368. }
  369. }
  370. return s;
  371. }
  372. Status PessimisticTransactionDB::CreateColumnFamilies(
  373. const std::vector<ColumnFamilyDescriptor>& column_families,
  374. std::vector<ColumnFamilyHandle*>* handles) {
  375. InstrumentedMutexLock l(&column_family_mutex_);
  376. for (auto& cf_desc : column_families) {
  377. Status s = VerifyCFOptions(cf_desc.options);
  378. if (!s.ok()) {
  379. return s;
  380. }
  381. }
  382. Status s = db_->CreateColumnFamilies(column_families, handles);
  383. if (s.ok()) {
  384. for (auto* handle : *handles) {
  385. lock_manager_->AddColumnFamily(handle);
  386. UpdateCFComparatorMap(handle);
  387. }
  388. }
  389. return s;
  390. }
  391. Status PessimisticTransactionDB::CreateColumnFamilyWithImport(
  392. const ColumnFamilyOptions& options, const std::string& column_family_name,
  393. const ImportColumnFamilyOptions& import_options,
  394. const std::vector<const ExportImportFilesMetaData*>& metadatas,
  395. ColumnFamilyHandle** handle) {
  396. InstrumentedMutexLock l(&column_family_mutex_);
  397. Status s = VerifyCFOptions(options);
  398. if (!s.ok()) {
  399. return s;
  400. }
  401. s = db_->CreateColumnFamilyWithImport(options, column_family_name,
  402. import_options, metadatas, handle);
  403. if (s.ok()) {
  404. lock_manager_->AddColumnFamily(*handle);
  405. UpdateCFComparatorMap(*handle);
  406. }
  407. return s;
  408. }
  409. // Let LockManager know that it can deallocate the LockMap for this
  410. // column family.
  411. Status PessimisticTransactionDB::DropColumnFamily(
  412. ColumnFamilyHandle* column_family) {
  413. InstrumentedMutexLock l(&column_family_mutex_);
  414. Status s = db_->DropColumnFamily(column_family);
  415. if (s.ok()) {
  416. lock_manager_->RemoveColumnFamily(column_family);
  417. }
  418. return s;
  419. }
  420. Status PessimisticTransactionDB::DropColumnFamilies(
  421. const std::vector<ColumnFamilyHandle*>& column_families) {
  422. InstrumentedMutexLock l(&column_family_mutex_);
  423. Status s = db_->DropColumnFamilies(column_families);
  424. if (s.ok()) {
  425. for (auto* handle : column_families) {
  426. lock_manager_->RemoveColumnFamily(handle);
  427. }
  428. }
  429. return s;
  430. }
  431. Status PessimisticTransactionDB::TryLock(PessimisticTransaction* txn,
  432. uint32_t cfh_id,
  433. const std::string& key,
  434. bool exclusive) {
  435. return lock_manager_->TryLock(txn, cfh_id, key, GetEnv(), exclusive);
  436. }
  437. Status PessimisticTransactionDB::TryRangeLock(PessimisticTransaction* txn,
  438. uint32_t cfh_id,
  439. const Endpoint& start_endp,
  440. const Endpoint& end_endp) {
  441. return lock_manager_->TryLock(txn, cfh_id, start_endp, end_endp, GetEnv(),
  442. /*exclusive=*/true);
  443. }
  444. void PessimisticTransactionDB::UnLock(PessimisticTransaction* txn,
  445. const LockTracker& keys) {
  446. lock_manager_->UnLock(txn, keys, GetEnv());
  447. }
  448. void PessimisticTransactionDB::UnLock(PessimisticTransaction* txn,
  449. uint32_t cfh_id, const std::string& key) {
  450. lock_manager_->UnLock(txn, cfh_id, key, GetEnv());
  451. }
  452. // Used when wrapping DB write operations in a transaction
  453. Transaction* PessimisticTransactionDB::BeginInternalTransaction(
  454. const WriteOptions& options) {
  455. TransactionOptions txn_options;
  456. Transaction* txn = BeginTransaction(options, txn_options, nullptr);
  457. // Use default timeout for non-transactional writes
  458. txn->SetLockTimeout(txn_db_options_.default_lock_timeout);
  459. return txn;
  460. }
  461. // All user Put, PutEntity, Merge, Delete, and Write requests must be
  462. // intercepted to make sure that they lock all keys that they are writing to
  463. // avoid causing conflicts with any concurrent transactions. The easiest way to
  464. // do this is to wrap all write operations in a transaction.
  465. //
  466. // Put(), PutEntity(), Merge(), and Delete() only lock a single key per call.
  467. // Write() will sort its keys before locking them. This guarantees that
  468. // TransactionDB write methods cannot deadlock with each other (but still could
  469. // deadlock with a Transaction).
  470. Status PessimisticTransactionDB::Put(const WriteOptions& options,
  471. ColumnFamilyHandle* column_family,
  472. const Slice& key, const Slice& val) {
  473. Status s = FailIfCfEnablesTs(this, column_family);
  474. if (!s.ok()) {
  475. return s;
  476. }
  477. Transaction* txn = BeginInternalTransaction(options);
  478. txn->DisableIndexing();
  479. // Since the client didn't create a transaction, they don't care about
  480. // conflict checking for this write. So we just need to do PutUntracked().
  481. s = txn->PutUntracked(column_family, key, val);
  482. if (s.ok()) {
  483. s = txn->Commit();
  484. }
  485. delete txn;
  486. return s;
  487. }
  488. Status PessimisticTransactionDB::PutEntity(const WriteOptions& options,
  489. ColumnFamilyHandle* column_family,
  490. const Slice& key,
  491. const WideColumns& columns) {
  492. {
  493. const Status s = FailIfCfEnablesTs(this, column_family);
  494. if (!s.ok()) {
  495. return s;
  496. }
  497. }
  498. {
  499. std::unique_ptr<Transaction> txn(BeginInternalTransaction(options));
  500. txn->DisableIndexing();
  501. // Since the client didn't create a transaction, they don't care about
  502. // conflict checking for this write. So we just need to do
  503. // PutEntityUntracked().
  504. {
  505. const Status s = txn->PutEntityUntracked(column_family, key, columns);
  506. if (!s.ok()) {
  507. return s;
  508. }
  509. }
  510. {
  511. const Status s = txn->Commit();
  512. if (!s.ok()) {
  513. return s;
  514. }
  515. }
  516. }
  517. return Status::OK();
  518. }
  519. Status PessimisticTransactionDB::Delete(const WriteOptions& wopts,
  520. ColumnFamilyHandle* column_family,
  521. const Slice& key) {
  522. Status s = FailIfCfEnablesTs(this, column_family);
  523. if (!s.ok()) {
  524. return s;
  525. }
  526. Transaction* txn = BeginInternalTransaction(wopts);
  527. txn->DisableIndexing();
  528. // Since the client didn't create a transaction, they don't care about
  529. // conflict checking for this write. So we just need to do
  530. // DeleteUntracked().
  531. s = txn->DeleteUntracked(column_family, key);
  532. if (s.ok()) {
  533. s = txn->Commit();
  534. }
  535. delete txn;
  536. return s;
  537. }
  538. Status PessimisticTransactionDB::SingleDelete(const WriteOptions& wopts,
  539. ColumnFamilyHandle* column_family,
  540. const Slice& key) {
  541. Status s = FailIfCfEnablesTs(this, column_family);
  542. if (!s.ok()) {
  543. return s;
  544. }
  545. Transaction* txn = BeginInternalTransaction(wopts);
  546. txn->DisableIndexing();
  547. // Since the client didn't create a transaction, they don't care about
  548. // conflict checking for this write. So we just need to do
  549. // SingleDeleteUntracked().
  550. s = txn->SingleDeleteUntracked(column_family, key);
  551. if (s.ok()) {
  552. s = txn->Commit();
  553. }
  554. delete txn;
  555. return s;
  556. }
  557. Status PessimisticTransactionDB::Merge(const WriteOptions& options,
  558. ColumnFamilyHandle* column_family,
  559. const Slice& key, const Slice& value) {
  560. Status s = FailIfCfEnablesTs(this, column_family);
  561. if (!s.ok()) {
  562. return s;
  563. }
  564. Transaction* txn = BeginInternalTransaction(options);
  565. txn->DisableIndexing();
  566. // Since the client didn't create a transaction, they don't care about
  567. // conflict checking for this write. So we just need to do
  568. // MergeUntracked().
  569. s = txn->MergeUntracked(column_family, key, value);
  570. if (s.ok()) {
  571. s = txn->Commit();
  572. }
  573. delete txn;
  574. return s;
  575. }
  576. Status PessimisticTransactionDB::Write(const WriteOptions& opts,
  577. WriteBatch* updates) {
  578. return WriteWithConcurrencyControl(opts, updates);
  579. }
  580. Status WriteCommittedTxnDB::Write(const WriteOptions& opts,
  581. WriteBatch* updates) {
  582. Status s = FailIfBatchHasTs(updates);
  583. if (!s.ok()) {
  584. return s;
  585. }
  586. if (txn_db_options_.skip_concurrency_control) {
  587. return db_impl_->Write(opts, updates);
  588. } else {
  589. return WriteWithConcurrencyControl(opts, updates);
  590. }
  591. }
  592. Status WriteCommittedTxnDB::Write(
  593. const WriteOptions& opts,
  594. const TransactionDBWriteOptimizations& optimizations, WriteBatch* updates) {
  595. Status s = FailIfBatchHasTs(updates);
  596. if (!s.ok()) {
  597. return s;
  598. }
  599. if (optimizations.skip_concurrency_control) {
  600. return db_impl_->Write(opts, updates);
  601. } else {
  602. return WriteWithConcurrencyControl(opts, updates);
  603. }
  604. }
  605. void PessimisticTransactionDB::InsertExpirableTransaction(
  606. TransactionID tx_id, PessimisticTransaction* tx) {
  607. assert(tx->GetExpirationTime() > 0);
  608. std::lock_guard<std::mutex> lock(map_mutex_);
  609. expirable_transactions_map_.insert({tx_id, tx});
  610. }
  611. void PessimisticTransactionDB::RemoveExpirableTransaction(TransactionID tx_id) {
  612. std::lock_guard<std::mutex> lock(map_mutex_);
  613. expirable_transactions_map_.erase(tx_id);
  614. }
  615. bool PessimisticTransactionDB::TryStealingExpiredTransactionLocks(
  616. TransactionID tx_id) {
  617. std::lock_guard<std::mutex> lock(map_mutex_);
  618. auto tx_it = expirable_transactions_map_.find(tx_id);
  619. if (tx_it == expirable_transactions_map_.end()) {
  620. return true;
  621. }
  622. PessimisticTransaction& tx = *(tx_it->second);
  623. return tx.TryStealingLocks();
  624. }
  625. void PessimisticTransactionDB::ReinitializeTransaction(
  626. Transaction* txn, const WriteOptions& write_options,
  627. const TransactionOptions& txn_options) {
  628. auto txn_impl = static_cast_with_check<PessimisticTransaction>(txn);
  629. txn_impl->Reinitialize(this, write_options, txn_options);
  630. }
  631. Transaction* PessimisticTransactionDB::GetTransactionByName(
  632. const TransactionName& name) {
  633. std::lock_guard<std::mutex> lock(name_map_mutex_);
  634. return GetTransactionByNameLocked(name);
  635. }
  636. Transaction* PessimisticTransactionDB::GetTransactionByNameLocked(
  637. const TransactionName& name) {
  638. auto it = transactions_.find(name);
  639. if (it == transactions_.end()) {
  640. return nullptr;
  641. } else {
  642. return it->second;
  643. }
  644. }
  645. void PessimisticTransactionDB::GetAllPreparedTransactions(
  646. std::vector<Transaction*>* transv) {
  647. assert(transv);
  648. transv->clear();
  649. std::lock_guard<std::mutex> lock(name_map_mutex_);
  650. for (auto it = transactions_.begin(); it != transactions_.end(); ++it) {
  651. if (it->second->GetState() == Transaction::PREPARED) {
  652. transv->push_back(it->second);
  653. }
  654. }
  655. }
  656. LockManager::PointLockStatus PessimisticTransactionDB::GetLockStatusData() {
  657. return lock_manager_->GetPointLockStatus();
  658. }
  659. std::vector<DeadlockPath> PessimisticTransactionDB::GetDeadlockInfoBuffer() {
  660. return lock_manager_->GetDeadlockInfoBuffer();
  661. }
  662. void PessimisticTransactionDB::SetDeadlockInfoBufferSize(uint32_t target_size) {
  663. lock_manager_->Resize(target_size);
  664. }
  665. Status PessimisticTransactionDB::RegisterTransaction(Transaction* txn) {
  666. assert(txn);
  667. assert(txn->GetName().length() > 0);
  668. assert(txn->GetState() == Transaction::STARTED);
  669. std::lock_guard<std::mutex> lock(name_map_mutex_);
  670. if (!transactions_.insert({txn->GetName(), txn}).second) {
  671. return Status::InvalidArgument("Duplicate txn name " + txn->GetName());
  672. }
  673. return Status::OK();
  674. }
  675. void PessimisticTransactionDB::UnregisterTransaction(Transaction* txn) {
  676. assert(txn);
  677. std::lock_guard<std::mutex> lock(name_map_mutex_);
  678. auto it = transactions_.find(txn->GetName());
  679. assert(it != transactions_.end());
  680. transactions_.erase(it);
  681. }
  682. std::pair<Status, std::shared_ptr<const Snapshot>>
  683. PessimisticTransactionDB::CreateTimestampedSnapshot(TxnTimestamp ts) {
  684. if (kMaxTxnTimestamp == ts) {
  685. return std::make_pair(Status::InvalidArgument("invalid ts"), nullptr);
  686. }
  687. assert(db_impl_);
  688. return db_impl_->CreateTimestampedSnapshot(kMaxSequenceNumber, ts);
  689. }
  690. std::shared_ptr<const Snapshot>
  691. PessimisticTransactionDB::GetTimestampedSnapshot(TxnTimestamp ts) const {
  692. assert(db_impl_);
  693. return db_impl_->GetTimestampedSnapshot(ts);
  694. }
  695. void PessimisticTransactionDB::ReleaseTimestampedSnapshotsOlderThan(
  696. TxnTimestamp ts) {
  697. assert(db_impl_);
  698. db_impl_->ReleaseTimestampedSnapshotsOlderThan(ts);
  699. }
  700. Status PessimisticTransactionDB::GetTimestampedSnapshots(
  701. TxnTimestamp ts_lb, TxnTimestamp ts_ub,
  702. std::vector<std::shared_ptr<const Snapshot>>& timestamped_snapshots) const {
  703. assert(db_impl_);
  704. return db_impl_->GetTimestampedSnapshots(ts_lb, ts_ub, timestamped_snapshots);
  705. }
  706. Status SnapshotCreationCallback::operator()(SequenceNumber seq,
  707. bool disable_memtable) {
  708. assert(db_impl_);
  709. assert(commit_ts_ != kMaxTxnTimestamp);
  710. const bool two_write_queues =
  711. db_impl_->immutable_db_options().two_write_queues;
  712. assert(!two_write_queues || !disable_memtable);
  713. #ifdef NDEBUG
  714. (void)two_write_queues;
  715. (void)disable_memtable;
  716. #endif
  717. const bool seq_per_batch = db_impl_->seq_per_batch();
  718. if (!seq_per_batch) {
  719. assert(db_impl_->GetLastPublishedSequence() <= seq);
  720. } else {
  721. assert(db_impl_->GetLastPublishedSequence() < seq);
  722. }
  723. // Create a snapshot which can also be used for write conflict checking.
  724. auto ret = db_impl_->CreateTimestampedSnapshot(seq, commit_ts_);
  725. snapshot_creation_status_ = ret.first;
  726. snapshot_ = ret.second;
  727. if (snapshot_creation_status_.ok()) {
  728. assert(snapshot_);
  729. } else {
  730. assert(!snapshot_);
  731. }
  732. if (snapshot_ && snapshot_notifier_) {
  733. snapshot_notifier_->SnapshotCreated(snapshot_.get());
  734. }
  735. return Status::OK();
  736. }
  737. } // namespace ROCKSDB_NAMESPACE