transaction_lock_mgr.cc

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#ifndef ROCKSDB_LITE

#include "utilities/transactions/transaction_lock_mgr.h"

#include <cinttypes>

#include <algorithm>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <string>
#include <vector>

#include "monitoring/perf_context_imp.h"
#include "rocksdb/slice.h"
#include "rocksdb/utilities/transaction_db_mutex.h"
#include "test_util/sync_point.h"
#include "util/cast_util.h"
#include "util/hash.h"
#include "util/thread_local.h"
#include "utilities/transactions/pessimistic_transaction_db.h"

namespace ROCKSDB_NAMESPACE {

struct LockInfo {
  bool exclusive;
  autovector<TransactionID> txn_ids;

  // Transaction locks are not valid after this time in us
  uint64_t expiration_time;

  LockInfo(TransactionID id, uint64_t time, bool ex)
      : exclusive(ex), expiration_time(time) {
    txn_ids.push_back(id);
  }
  LockInfo(const LockInfo& lock_info)
      : exclusive(lock_info.exclusive),
        txn_ids(lock_info.txn_ids),
        expiration_time(lock_info.expiration_time) {}
};

struct LockMapStripe {
  explicit LockMapStripe(std::shared_ptr<TransactionDBMutexFactory> factory) {
    stripe_mutex = factory->AllocateMutex();
    stripe_cv = factory->AllocateCondVar();
    assert(stripe_mutex);
    assert(stripe_cv);
  }

  // Mutex must be held before modifying keys map
  std::shared_ptr<TransactionDBMutex> stripe_mutex;

  // Condition Variable per stripe for waiting on a lock
  std::shared_ptr<TransactionDBCondVar> stripe_cv;

  // Locked keys mapped to the info about the transactions that locked them.
  // TODO(agiardullo): Explore performance of other data structures.
  std::unordered_map<std::string, LockInfo> keys;
};
// Map of #num_stripes LockMapStripes
struct LockMap {
  explicit LockMap(size_t num_stripes,
                   std::shared_ptr<TransactionDBMutexFactory> factory)
      : num_stripes_(num_stripes) {
    lock_map_stripes_.reserve(num_stripes);
    for (size_t i = 0; i < num_stripes; i++) {
      LockMapStripe* stripe = new LockMapStripe(factory);
      lock_map_stripes_.push_back(stripe);
    }
  }

  ~LockMap() {
    for (auto stripe : lock_map_stripes_) {
      delete stripe;
    }
  }

  // Number of separate LockMapStripes to create, each with their own Mutex
  const size_t num_stripes_;

  // Count of keys that are currently locked in this column family.
  // (Only maintained if TransactionLockMgr::max_num_locks_ is positive.)
  std::atomic<int64_t> lock_cnt{0};

  std::vector<LockMapStripe*> lock_map_stripes_;

  size_t GetStripe(const std::string& key) const;
};
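
// DeadlockInfoBuffer keeps the most recent deadlock paths in a fixed-size
// ring buffer: buffer_idx_ always points at the slot the next deadlock will
// overwrite, and a capacity of zero disables recording entirely.
// Illustrative example (numbers are hypothetical): with a capacity of 4,
// recording five deadlocks d1..d5 leaves the buffer as [d5, d2, d3, d4]
// with buffer_idx_ == 1, so d2 is the oldest surviving entry and the next
// to be evicted.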
void DeadlockInfoBuffer::AddNewPath(DeadlockPath path) {
  std::lock_guard<std::mutex> lock(paths_buffer_mutex_);

  if (paths_buffer_.empty()) {
    return;
  }

  paths_buffer_[buffer_idx_] = std::move(path);
  buffer_idx_ = (buffer_idx_ + 1) % paths_buffer_.size();
}

void DeadlockInfoBuffer::Resize(uint32_t target_size) {
  std::lock_guard<std::mutex> lock(paths_buffer_mutex_);

  paths_buffer_ = Normalize();

  // Drop the deadlocks that will no longer be needed after the normalize
  if (target_size < paths_buffer_.size()) {
    paths_buffer_.erase(
        paths_buffer_.begin(),
        paths_buffer_.begin() + (paths_buffer_.size() - target_size));
    buffer_idx_ = 0;
  }
  // Resize the buffer to the target size and restore the buffer's idx
  else {
    auto prev_size = paths_buffer_.size();
    paths_buffer_.resize(target_size);
    buffer_idx_ = (uint32_t)prev_size;
  }
}
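
// Normalize() rearranges the ring buffer into chronological order (oldest
// deadlock first). Illustrative example: if the buffer is [d5, d2, d3, d4]
// with buffer_idx_ == 1, rotating at buffer_idx_ yields [d2, d3, d4, d5];
// if the slot at buffer_idx_ has never been written, the copy is instead
// truncated to just the entries written so far, which are already in order.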
std::vector<DeadlockPath> DeadlockInfoBuffer::Normalize() {
  auto working = paths_buffer_;

  if (working.empty()) {
    return working;
  }

  // Next write occurs at a nonexistent path's slot
  if (paths_buffer_[buffer_idx_].empty()) {
    working.resize(buffer_idx_);
  } else {
    std::rotate(working.begin(), working.begin() + buffer_idx_,
                working.end());
  }

  return working;
}

std::vector<DeadlockPath> DeadlockInfoBuffer::PrepareBuffer() {
  std::lock_guard<std::mutex> lock(paths_buffer_mutex_);

  // Reversing the normalized vector returns the latest deadlocks first
  auto working = Normalize();
  std::reverse(working.begin(), working.end());

  return working;
}

namespace {
void UnrefLockMapsCache(void* ptr) {
  // Called when a thread exits or a ThreadLocalPtr gets destroyed.
  auto lock_maps_cache =
      static_cast<std::unordered_map<uint32_t, std::shared_ptr<LockMap>>*>(
          ptr);
  delete lock_maps_cache;
}
}  // anonymous namespace

TransactionLockMgr::TransactionLockMgr(
    TransactionDB* txn_db, size_t default_num_stripes, int64_t max_num_locks,
    uint32_t max_num_deadlocks,
    std::shared_ptr<TransactionDBMutexFactory> mutex_factory)
    : txn_db_impl_(nullptr),
      default_num_stripes_(default_num_stripes),
      max_num_locks_(max_num_locks),
      lock_maps_cache_(new ThreadLocalPtr(&UnrefLockMapsCache)),
      dlock_buffer_(max_num_deadlocks),
      mutex_factory_(mutex_factory) {
  assert(txn_db);
  txn_db_impl_ =
      static_cast_with_check<PessimisticTransactionDB, TransactionDB>(txn_db);
}

TransactionLockMgr::~TransactionLockMgr() {}
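
// GetStripe hashes a key into [0, num_stripes_) so that keys are spread
// across independently locked stripes. Illustrative example: with
// num_stripes_ == 16, two distinct keys contend on the same stripe mutex
// with probability roughly 1/16, regardless of how many keys are locked.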
size_t LockMap::GetStripe(const std::string& key) const {
  assert(num_stripes_ > 0);
  return fastrange64(GetSliceNPHash64(key), num_stripes_);
}

void TransactionLockMgr::AddColumnFamily(uint32_t column_family_id) {
  InstrumentedMutexLock l(&lock_map_mutex_);

  if (lock_maps_.find(column_family_id) == lock_maps_.end()) {
    lock_maps_.emplace(column_family_id,
                       std::make_shared<LockMap>(default_num_stripes_,
                                                 mutex_factory_));
  } else {
    // column_family already exists in lock map
    assert(false);
  }
}

void TransactionLockMgr::RemoveColumnFamily(uint32_t column_family_id) {
  // Remove lock_map for this column family. Since the lock map is stored
  // as a shared ptr, concurrent transactions can still keep using it
  // until they release their references to it.
  {
    InstrumentedMutexLock l(&lock_map_mutex_);

    auto lock_maps_iter = lock_maps_.find(column_family_id);
    assert(lock_maps_iter != lock_maps_.end());

    lock_maps_.erase(lock_maps_iter);
  }  // lock_map_mutex_

  // Clear all thread-local caches
  autovector<void*> local_caches;
  lock_maps_cache_->Scrape(&local_caches, nullptr);
  for (auto cache : local_caches) {
    delete static_cast<LockMaps*>(cache);
  }
}
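
// The thread-local cache used below avoids taking lock_map_mutex_ on the
// hot path: each thread keeps its own map of column family id ->
// std::shared_ptr<LockMap> and falls back to the shared lock_maps_ map
// (under the mutex) only on a cache miss. Because the cache holds a
// shared_ptr, a LockMap stays alive for that thread even if the column
// family is dropped concurrently by RemoveColumnFamily().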
// Look up the LockMap std::shared_ptr for a given column_family_id.
// Note: The LockMap is only valid as long as the caller is still holding on
// to the returned std::shared_ptr.
std::shared_ptr<LockMap> TransactionLockMgr::GetLockMap(
    uint32_t column_family_id) {
  // First check thread-local cache
  if (lock_maps_cache_->Get() == nullptr) {
    lock_maps_cache_->Reset(new LockMaps());
  }

  auto lock_maps_cache = static_cast<LockMaps*>(lock_maps_cache_->Get());

  auto lock_map_iter = lock_maps_cache->find(column_family_id);
  if (lock_map_iter != lock_maps_cache->end()) {
    // Found lock map for this column family.
    return lock_map_iter->second;
  }

  // Not found in local cache, grab mutex and check shared LockMaps
  InstrumentedMutexLock l(&lock_map_mutex_);

  lock_map_iter = lock_maps_.find(column_family_id);
  if (lock_map_iter == lock_maps_.end()) {
    return std::shared_ptr<LockMap>(nullptr);
  } else {
    // Found lock map. Store in thread-local cache and return.
    std::shared_ptr<LockMap>& lock_map = lock_map_iter->second;
    lock_maps_cache->insert({column_family_id, lock_map});

    return lock_map;
  }
}

// Returns true if this lock has expired and can be acquired by another
// transaction.
// If false, sets *expire_time to the lock's expiration time according
// to Env::NowMicros(), or to 0 if the lock has no expiration.
bool TransactionLockMgr::IsLockExpired(TransactionID txn_id,
                                       const LockInfo& lock_info, Env* env,
                                       uint64_t* expire_time) {
  auto now = env->NowMicros();

  bool expired =
      (lock_info.expiration_time > 0 && lock_info.expiration_time <= now);

  if (!expired && lock_info.expiration_time > 0) {
    // return the absolute time (in microseconds) at which the lock expires
    *expire_time = lock_info.expiration_time;
  } else {
    for (auto id : lock_info.txn_ids) {
      if (txn_id == id) {
        continue;
      }

      bool success = txn_db_impl_->TryStealingExpiredTransactionLocks(id);
      if (!success) {
        expired = false;
        break;
      }
      *expire_time = 0;
    }
  }

  return expired;
}
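
// Illustrative timeline for the expiration flow above (times hypothetical):
// txn 1 acquires a lock expiring at t = 100us. At t = 150us, txn 2 requests
// the same lock; the lock has expired, so
// TryStealingExpiredTransactionLocks(1) is called to invalidate txn 1's
// claim. Only if every current holder can be stolen from does
// IsLockExpired() return true, letting txn 2 take over the lock.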

Status TransactionLockMgr::TryLock(PessimisticTransaction* txn,
                                   uint32_t column_family_id,
                                   const std::string& key, Env* env,
                                   bool exclusive) {
  // Lookup lock map for this column family id
  std::shared_ptr<LockMap> lock_map_ptr = GetLockMap(column_family_id);
  LockMap* lock_map = lock_map_ptr.get();
  if (lock_map == nullptr) {
    char msg[255];
    snprintf(msg, sizeof(msg), "Column family id not found: %" PRIu32,
             column_family_id);

    return Status::InvalidArgument(msg);
  }

  // Need to lock the mutex for the stripe that this key hashes to
  size_t stripe_num = lock_map->GetStripe(key);
  assert(lock_map->lock_map_stripes_.size() > stripe_num);
  LockMapStripe* stripe = lock_map->lock_map_stripes_.at(stripe_num);

  LockInfo lock_info(txn->GetID(), txn->GetExpirationTime(), exclusive);
  int64_t timeout = txn->GetLockTimeout();

  return AcquireWithTimeout(txn, lock_map, stripe, column_family_id, key, env,
                            timeout, std::move(lock_info));
}
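
// TryLock() is not called by applications directly; it is reached through
// the pessimistic transaction API. A hedged sketch of a typical call path
// (cf_handle is an assumed ColumnFamilyHandle*):
//
//   std::string value;
//   Status s = txn->GetForUpdate(ReadOptions(), cf_handle, "key", &value);
//
// GetForUpdate() acquires an exclusive lock on "key" through this lock
// manager before reading, surfacing Status::TimedOut or Status::Busy to the
// caller on lock conflicts.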

// Helper function for TryLock().
Status TransactionLockMgr::AcquireWithTimeout(
    PessimisticTransaction* txn, LockMap* lock_map, LockMapStripe* stripe,
    uint32_t column_family_id, const std::string& key, Env* env,
    int64_t timeout, LockInfo&& lock_info) {
  Status result;
  uint64_t end_time = 0;

  if (timeout > 0) {
    uint64_t start_time = env->NowMicros();
    end_time = start_time + timeout;
  }

  if (timeout < 0) {
    // If timeout is negative, we wait indefinitely to acquire the lock
    result = stripe->stripe_mutex->Lock();
  } else {
    result = stripe->stripe_mutex->TryLockFor(timeout);
  }

  if (!result.ok()) {
    // failed to acquire mutex
    return result;
  }

  // Acquire lock if we are able to
  uint64_t expire_time_hint = 0;
  autovector<TransactionID> wait_ids;
  result = AcquireLocked(lock_map, stripe, key, env, std::move(lock_info),
                         &expire_time_hint, &wait_ids);

  if (!result.ok() && timeout != 0) {
    PERF_TIMER_GUARD(key_lock_wait_time);
    PERF_COUNTER_ADD(key_lock_wait_count, 1);
    // If we weren't able to acquire the lock, we will keep retrying as long
    // as the timeout allows.
    bool timed_out = false;
    do {
      // Decide how long to wait
      int64_t cv_end_time = -1;

      // Check if held lock's expiration time is sooner than our timeout
      if (expire_time_hint > 0 &&
          (timeout < 0 || (timeout > 0 && expire_time_hint < end_time))) {
        // expiration time is sooner than our timeout
        cv_end_time = expire_time_hint;
      } else if (timeout >= 0) {
        cv_end_time = end_time;
      }

      assert(result.IsBusy() || wait_ids.size() != 0);

      // We are dependent on a transaction to finish, so perform deadlock
      // detection.
      if (wait_ids.size() != 0) {
        if (txn->IsDeadlockDetect()) {
          if (IncrementWaiters(txn, wait_ids, key, column_family_id,
                               lock_info.exclusive, env)) {
            result = Status::Busy(Status::SubCode::kDeadlock);
            stripe->stripe_mutex->UnLock();
            return result;
          }
        }
        txn->SetWaitingTxn(wait_ids, column_family_id, &key);
      }

      TEST_SYNC_POINT("TransactionLockMgr::AcquireWithTimeout:WaitingTxn");
      if (cv_end_time < 0) {
        // Wait indefinitely
        result = stripe->stripe_cv->Wait(stripe->stripe_mutex);
      } else {
        uint64_t now = env->NowMicros();
        if (static_cast<uint64_t>(cv_end_time) > now) {
          result = stripe->stripe_cv->WaitFor(stripe->stripe_mutex,
                                              cv_end_time - now);
        }
      }

      if (wait_ids.size() != 0) {
        txn->ClearWaitingTxn();
        if (txn->IsDeadlockDetect()) {
          DecrementWaiters(txn, wait_ids);
        }
      }

      if (result.IsTimedOut()) {
        timed_out = true;
        // Even though we timed out, we will still make one more attempt to
        // acquire lock below (it is possible the lock expired and we
        // were never signaled).
      }

      if (result.ok() || result.IsTimedOut()) {
        result = AcquireLocked(lock_map, stripe, key, env,
                               std::move(lock_info), &expire_time_hint,
                               &wait_ids);
      }
    } while (!result.ok() && !timed_out);
  }

  stripe->stripe_mutex->UnLock();

  return result;
}
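
// Timeout semantics of AcquireWithTimeout(), as implemented above:
//   timeout < 0   wait indefinitely until the lock is granted or a
//                 deadlock is detected;
//   timeout == 0  a single non-blocking attempt; never waits on the cv;
//   timeout > 0   retry until end_time, waking early if the current
//                 holder's lock is due to expire before that deadline.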

void TransactionLockMgr::DecrementWaiters(
    const PessimisticTransaction* txn,
    const autovector<TransactionID>& wait_ids) {
  std::lock_guard<std::mutex> lock(wait_txn_map_mutex_);
  DecrementWaitersImpl(txn, wait_ids);
}

void TransactionLockMgr::DecrementWaitersImpl(
    const PessimisticTransaction* txn,
    const autovector<TransactionID>& wait_ids) {
  auto id = txn->GetID();
  assert(wait_txn_map_.Contains(id));
  wait_txn_map_.Delete(id);

  for (auto wait_id : wait_ids) {
    rev_wait_txn_map_.Get(wait_id)--;
    if (rev_wait_txn_map_.Get(wait_id) == 0) {
      rev_wait_txn_map_.Delete(wait_id);
    }
  }
}

bool TransactionLockMgr::IncrementWaiters(
    const PessimisticTransaction* txn,
    const autovector<TransactionID>& wait_ids, const std::string& key,
    const uint32_t& cf_id, const bool& exclusive, Env* const env) {
  auto id = txn->GetID();
  std::vector<int> queue_parents(
      static_cast<size_t>(txn->GetDeadlockDetectDepth()));
  std::vector<TransactionID> queue_values(
      static_cast<size_t>(txn->GetDeadlockDetectDepth()));
  std::lock_guard<std::mutex> lock(wait_txn_map_mutex_);
  assert(!wait_txn_map_.Contains(id));

  wait_txn_map_.Insert(id, {wait_ids, cf_id, exclusive, key});

  for (auto wait_id : wait_ids) {
    if (rev_wait_txn_map_.Contains(wait_id)) {
      rev_wait_txn_map_.Get(wait_id)++;
    } else {
      rev_wait_txn_map_.Insert(wait_id, 1);
    }
  }

  // No deadlock if nobody is waiting on self.
  if (!rev_wait_txn_map_.Contains(id)) {
    return false;
  }

  const auto* next_ids = &wait_ids;
  int parent = -1;
  int64_t deadlock_time = 0;
  for (int tail = 0, head = 0; head < txn->GetDeadlockDetectDepth(); head++) {
    int i = 0;
    if (next_ids) {
      for (; i < static_cast<int>(next_ids->size()) &&
             tail + i < txn->GetDeadlockDetectDepth();
           i++) {
        queue_values[tail + i] = (*next_ids)[i];
        queue_parents[tail + i] = parent;
      }
      tail += i;
    }

    // No more items in the list, meaning no deadlock.
    if (tail == head) {
      return false;
    }

    auto next = queue_values[head];
    if (next == id) {
      std::vector<DeadlockInfo> path;
      while (head != -1) {
        assert(wait_txn_map_.Contains(queue_values[head]));

        auto extracted_info = wait_txn_map_.Get(queue_values[head]);
        path.push_back({queue_values[head], extracted_info.m_cf_id,
                        extracted_info.m_exclusive,
                        extracted_info.m_waiting_key});
        head = queue_parents[head];
      }
      env->GetCurrentTime(&deadlock_time);
      std::reverse(path.begin(), path.end());
      dlock_buffer_.AddNewPath(DeadlockPath(path, deadlock_time));
      deadlock_time = 0;
      DecrementWaitersImpl(txn, wait_ids);
      return true;
    } else if (!wait_txn_map_.Contains(next)) {
      next_ids = nullptr;
      continue;
    } else {
      parent = head;
      next_ids = &(wait_txn_map_.Get(next).m_neighbors);
    }
  }

  // Wait cycle too big, just assume deadlock.
  env->GetCurrentTime(&deadlock_time);
  dlock_buffer_.AddNewPath(DeadlockPath(deadlock_time, true));
  DecrementWaitersImpl(txn, wait_ids);
  return true;
}
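
// The loop above is a breadth-first search of the wait-for graph, rooted at
// the transactions this txn is about to wait on; queue_parents is used to
// reconstruct the cycle once the search reaches our own id. Illustrative
// example: txn A holds key k1 and is already waiting for k2; txn B holds k2
// and now requests k1. B calls IncrementWaiters({A}, ...); the BFS visits A,
// follows A's recorded wait on B, finds B's own id, records the cycle
// (A waiting on B, B waiting on A) in dlock_buffer_, and returns true, which
// AcquireWithTimeout() turns into Status::Busy(kDeadlock).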

// Try to lock this key after we have acquired the mutex.
// Sets *expire_time to the expiration time in microseconds
// or 0 if no expiration.
// REQUIRED: Stripe mutex must be held.
Status TransactionLockMgr::AcquireLocked(LockMap* lock_map,
                                         LockMapStripe* stripe,
                                         const std::string& key, Env* env,
                                         LockInfo&& txn_lock_info,
                                         uint64_t* expire_time,
                                         autovector<TransactionID>* txn_ids) {
  assert(txn_lock_info.txn_ids.size() == 1);

  Status result;
  // Check if this key is already locked
  auto stripe_iter = stripe->keys.find(key);
  if (stripe_iter != stripe->keys.end()) {
    // Lock already held
    LockInfo& lock_info = stripe_iter->second;
    assert(lock_info.txn_ids.size() == 1 || !lock_info.exclusive);

    if (lock_info.exclusive || txn_lock_info.exclusive) {
      if (lock_info.txn_ids.size() == 1 &&
          lock_info.txn_ids[0] == txn_lock_info.txn_ids[0]) {
        // The list contains one txn and we're it, so just take it.
        lock_info.exclusive = txn_lock_info.exclusive;
        lock_info.expiration_time = txn_lock_info.expiration_time;
      } else {
        // Check if it's expired. Skips over txn_lock_info.txn_ids[0] in case
        // it's there for a shared lock with multiple holders which was not
        // caught in the first case.
        if (IsLockExpired(txn_lock_info.txn_ids[0], lock_info, env,
                          expire_time)) {
          // lock is expired, can steal it
          lock_info.txn_ids = txn_lock_info.txn_ids;
          lock_info.exclusive = txn_lock_info.exclusive;
          lock_info.expiration_time = txn_lock_info.expiration_time;
          // lock_cnt does not change
        } else {
          result = Status::TimedOut(Status::SubCode::kLockTimeout);
          *txn_ids = lock_info.txn_ids;
        }
      }
    } else {
      // We are requesting shared access to a shared lock, so just grant it.
      lock_info.txn_ids.push_back(txn_lock_info.txn_ids[0]);
      // Using std::max means that expiration time never goes down even when
      // a transaction is removed from the list. The correct solution would
      // be to track expiry for every transaction, but this would also work
      // for now.
      lock_info.expiration_time =
          std::max(lock_info.expiration_time, txn_lock_info.expiration_time);
    }
  } else {  // Lock not held.
    // Check lock limit
    if (max_num_locks_ > 0 &&
        lock_map->lock_cnt.load(std::memory_order_acquire) >=
            max_num_locks_) {
      result = Status::Busy(Status::SubCode::kLockLimit);
    } else {
      // acquire lock
      stripe->keys.emplace(key, std::move(txn_lock_info));

      // Maintain lock count if there is a limit on the number of locks
      if (max_num_locks_) {
        lock_map->lock_cnt++;
      }
    }
  }

  return result;
}
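
// Lock compatibility implemented by AcquireLocked(), summarized:
//
//   held \ requested | shared                exclusive
//   -----------------+--------------------------------------------------
//   shared           | grant (add holder)    steal if expired, else wait
//   exclusive        | steal if expired,     steal if expired, else wait
//                    | else wait
//
// A transaction re-requesting a lock it already holds alone is always
// granted, and may upgrade the lock from shared to exclusive in doing so.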

void TransactionLockMgr::UnLockKey(const PessimisticTransaction* txn,
                                   const std::string& key,
                                   LockMapStripe* stripe, LockMap* lock_map,
                                   Env* env) {
#ifdef NDEBUG
  (void)env;
#endif
  TransactionID txn_id = txn->GetID();

  auto stripe_iter = stripe->keys.find(key);
  if (stripe_iter != stripe->keys.end()) {
    auto& txns = stripe_iter->second.txn_ids;
    auto txn_it = std::find(txns.begin(), txns.end(), txn_id);
    // Found the key we locked. unlock it.
    if (txn_it != txns.end()) {
      if (txns.size() == 1) {
        stripe->keys.erase(stripe_iter);
      } else {
        auto last_it = txns.end() - 1;
        if (txn_it != last_it) {
          *txn_it = *last_it;
        }
        txns.pop_back();
      }

      if (max_num_locks_ > 0) {
        // Maintain lock count if there is a limit on the number of locks.
        assert(lock_map->lock_cnt.load(std::memory_order_relaxed) > 0);
        lock_map->lock_cnt--;
      }
    }
  } else {
    // This key is either not locked or locked by someone else. This should
    // only happen if the unlocking transaction has expired.
    assert(txn->GetExpirationTime() > 0 &&
           txn->GetExpirationTime() < env->NowMicros());
  }
}
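
// Removing a shared holder above uses the swap-with-last ("swap and pop")
// idiom: the departing transaction id is overwritten by the last id in
// txn_ids and the vector is shortened by one. This is O(1) and safe because
// the order of ids in txn_ids carries no meaning.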

void TransactionLockMgr::UnLock(PessimisticTransaction* txn,
                                uint32_t column_family_id,
                                const std::string& key, Env* env) {
  std::shared_ptr<LockMap> lock_map_ptr = GetLockMap(column_family_id);
  LockMap* lock_map = lock_map_ptr.get();
  if (lock_map == nullptr) {
    // Column Family must have been dropped.
    return;
  }

  // Lock the mutex for the stripe that this key hashes to
  size_t stripe_num = lock_map->GetStripe(key);
  assert(lock_map->lock_map_stripes_.size() > stripe_num);
  LockMapStripe* stripe = lock_map->lock_map_stripes_.at(stripe_num);

  stripe->stripe_mutex->Lock();
  UnLockKey(txn, key, stripe, lock_map, env);
  stripe->stripe_mutex->UnLock();

  // Signal waiting threads to retry locking
  stripe->stripe_cv->NotifyAll();
}

void TransactionLockMgr::UnLock(const PessimisticTransaction* txn,
                                const TransactionKeyMap* key_map, Env* env) {
  for (auto& key_map_iter : *key_map) {
    uint32_t column_family_id = key_map_iter.first;
    auto& keys = key_map_iter.second;

    std::shared_ptr<LockMap> lock_map_ptr = GetLockMap(column_family_id);
    LockMap* lock_map = lock_map_ptr.get();
    if (lock_map == nullptr) {
      // Column Family must have been dropped.
      return;
    }

    // Bucket keys by lock_map_ stripe
    std::unordered_map<size_t, std::vector<const std::string*>>
        keys_by_stripe(std::max(keys.size(), lock_map->num_stripes_));
    for (auto& key_iter : keys) {
      const std::string& key = key_iter.first;
      size_t stripe_num = lock_map->GetStripe(key);
      keys_by_stripe[stripe_num].push_back(&key);
    }

    // For each stripe, grab the stripe mutex and unlock all keys in this
    // stripe
    for (auto& stripe_iter : keys_by_stripe) {
      size_t stripe_num = stripe_iter.first;
      auto& stripe_keys = stripe_iter.second;

      assert(lock_map->lock_map_stripes_.size() > stripe_num);
      LockMapStripe* stripe = lock_map->lock_map_stripes_.at(stripe_num);

      stripe->stripe_mutex->Lock();

      for (const std::string* key : stripe_keys) {
        UnLockKey(txn, *key, stripe, lock_map, env);
      }

      stripe->stripe_mutex->UnLock();

      // Signal waiting threads to retry locking
      stripe->stripe_cv->NotifyAll();
    }
  }
}

TransactionLockMgr::LockStatusData TransactionLockMgr::GetLockStatusData() {
  LockStatusData data;
  // Lock order here is important. The correct order is lock_map_mutex_, then
  // for every column family ID in ascending order lock every stripe in
  // ascending order.
  InstrumentedMutexLock l(&lock_map_mutex_);

  std::vector<uint32_t> cf_ids;
  for (const auto& map : lock_maps_) {
    cf_ids.push_back(map.first);
  }
  std::sort(cf_ids.begin(), cf_ids.end());

  for (auto i : cf_ids) {
    const auto& stripes = lock_maps_[i]->lock_map_stripes_;
    // Iterate and lock all stripes in ascending order.
    for (const auto& j : stripes) {
      j->stripe_mutex->Lock();
      for (const auto& it : j->keys) {
        struct KeyLockInfo info;
        info.exclusive = it.second.exclusive;
        info.key = it.first;
        for (const auto& id : it.second.txn_ids) {
          info.ids.push_back(id);
        }
        data.insert({i, info});
      }
    }
  }

  // Unlock everything. Unlocking order is not important.
  for (auto i : cf_ids) {
    const auto& stripes = lock_maps_[i]->lock_map_stripes_;
    for (const auto& j : stripes) {
      j->stripe_mutex->UnLock();
    }
  }

  return data;
}

std::vector<DeadlockPath> TransactionLockMgr::GetDeadlockInfoBuffer() {
  return dlock_buffer_.PrepareBuffer();
}

void TransactionLockMgr::Resize(uint32_t target_size) {
  dlock_buffer_.Resize(target_size);
}

}  // namespace ROCKSDB_NAMESPACE
#endif  // ROCKSDB_LITE