point_lock_manager.cc 66 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include "utilities/transactions/lock/point/point_lock_manager.h"
  6. #include <algorithm>
  7. #include <cinttypes>
  8. #include <mutex>
  9. #include "monitoring/perf_context_imp.h"
  10. #include "rocksdb/slice.h"
  11. #include "rocksdb/utilities/transaction_db_mutex.h"
  12. #include "test_util/sync_point.h"
  13. #include "util/hash.h"
  14. #include "util/thread_local.h"
  15. #include "utilities/transactions/pessimistic_transaction_db.h"
  16. #include "utilities/transactions/transaction_db_mutex_impl.h"
  17. namespace ROCKSDB_NAMESPACE {
  18. constexpr bool kDebugLog = false;
  19. // KeyLockWaiter represents a waiter for a key lock. It contains a conditional
  20. // variable to allow waiter to wait for the key lock. It also contains other
  21. // metadata about the waiter such as transaction id, lock type etc.
  22. struct KeyLockWaiter {
  23. KeyLockWaiter(std::shared_ptr<TransactionDBCondVar> c, TransactionID i,
  24. bool ex)
  25. : id(i), exclusive(ex), ready(false), cv(std::move(c)) {}
  26. // disable copy constructor and assignment operator, move and move
  27. // assignment
  28. KeyLockWaiter(const KeyLockWaiter&) = delete;
  29. KeyLockWaiter& operator=(const KeyLockWaiter&) = delete;
  30. KeyLockWaiter(KeyLockWaiter&&) = delete;
  31. KeyLockWaiter& operator=(KeyLockWaiter&&) = delete;
  32. ~KeyLockWaiter() = default;
  33. // Reset the waiter to be used again
  34. void Reset(TransactionID i, bool e) {
  35. id = i;
  36. exclusive = e;
  37. ready = false;
  38. }
  39. // Check whether the waiter has been notified that it is its turn to take the
  40. // lock
  41. bool IsReady() const { return ready; }
  42. // Wait until its turn to take the lock forever
  43. Status Wait(std::shared_ptr<TransactionDBMutex>& mutex) {
  44. // Mutex is already locked by caller
  45. // Check ready flag before wait
  46. if (ready) {
  47. return Status::OK();
  48. }
  49. return AfterWait(cv->Wait(mutex));
  50. }
  51. // Wait until its turn to take the lock within timeout_us
  52. Status WaitFor(std::shared_ptr<TransactionDBMutex>& mutex,
  53. int64_t timeout_us) {
  54. // Mutex is already locked by caller
  55. // Check ready flag before wait
  56. if (ready) {
  57. return Status::OK();
  58. }
  59. return AfterWait(cv->WaitFor(mutex, timeout_us));
  60. }
  61. // Notify the waiter to take the lock
  62. void Notify() {
  63. // Mutex is already locked by caller
  64. ready = true;
  65. cv->Notify();
  66. }
  67. TransactionID id;
  68. bool exclusive;
  69. private:
  70. Status AfterWait(Status wait_result) {
  71. if (wait_result.ok() || wait_result.IsTimedOut()) {
  72. // check ready again after wake up.
  73. if (ready) {
  74. return Status::OK();
  75. } else {
  76. return Status::TimedOut(Status::SubCode::kMutexTimeout);
  77. }
  78. } else {
  79. return wait_result;
  80. }
  81. }
  82. // Track whether the waiter has been woken up explicitly.
  83. bool ready;
  84. // TODO(Xingbo), Switch to std::binary_semaphore, once we have c++20
  85. // semaphore is likely more performant than mutex + cv.
  86. // Although we will also need to implement TransactionDBSemaphore, which would
  87. // be required if external system wants to do instrumented lock wait tracking
  88. std::shared_ptr<TransactionDBCondVar> cv;
  89. };
// Per-key lock state: the current holder(s), lock mode, expiration time,
// and an optional queue of waiters lined up behind the holder(s).
struct LockInfo {
  // Construct lock state with a single initial holder `id`.
  LockInfo(TransactionID id, uint64_t time, bool ex)
      : exclusive(ex), expiration_time(time) {
    txn_ids.push_back(id);
  }
  DECLARE_DEFAULT_MOVES(LockInfo);
  // True if held exclusively (single holder); false for shared holders.
  bool exclusive;
  // IDs of the transaction(s) currently holding this lock.
  autovector<TransactionID> txn_ids;
  // Transaction locks are not valid after this time in us
  uint64_t expiration_time;
  // waiter queue for this key; lazily allocated on first contention
  // TODO xingbo, use intrusive list to avoid extra memory allocation
  std::unique_ptr<std::list<KeyLockWaiter*>> waiter_queue;
};
  104. // Print debug info for lock waiter wake up action.
  105. void DebugWakeUpWaiter(TransactionID txn_id, TransactionID waiter_id,
  106. const std::string& key, const std::string& msg) {
  107. if (kDebugLog) {
  108. // print which waiter got woken up
  109. fprintf(stderr,
  110. "Txn %" PRIu64 ": wake up next waiter on %s Txn %" PRIu64
  111. " on key %s\n",
  112. txn_id, msg.c_str(), waiter_id, key.c_str());
  113. fflush(stderr);
  114. }
  115. }
// Key lock waiter context: RAII holder for a waiter's position in a key's
// wait queue; the destructor removes the waiter from the queue automatically.
struct KeyLockWaiterContext {
  // When a lock waiter is aborted due to dead lock or time out, this function
  // is used to wake up the waiters after it, if they could proceed.
  void TryWakeUpNextWaiters(const LockInfo& lock_info, const std::string& key) {
    if (waiter_queue != nullptr && lock_waiter != waiter_queue->end()) {
      bool wake_up_next_shared_waiters = false;
      if (lock_waiter == waiter_queue->begin()) {
        // if lock waiter is at the head of the queue, check the current lock
        // status. If it is exclusive lock, no waiter should be woken up. other
        // wise, try to wake up shared lock waiters on the right side of itself.
        wake_up_next_shared_waiters = !lock_info.exclusive;
      } else {
        // if lock waiter is not at the head of the queue, check the previous
        // lock status. If it is active and shared, it should try to wake up the
        // shared lock waiter on the right side of itself.
        auto lock_waiter_prev = lock_waiter;
        lock_waiter_prev--;
        wake_up_next_shared_waiters =
            (*lock_waiter_prev)->IsReady() && !(*lock_waiter_prev)->exclusive;
      }
      if (wake_up_next_shared_waiters) {
        // Go through all the waiters on the right side of the lock waiter and
        // wake up the shared lock waiter until the end of the queue or
        // encountered an exclusive lock waiter.
        auto lock_waiter_next = lock_waiter;
        lock_waiter_next++;
        while (lock_waiter_next != waiter_queue->end() &&
               !(*lock_waiter_next)->exclusive) {
          (*lock_waiter_next)->Notify();
          DebugWakeUpWaiter((*lock_waiter)->id, (*lock_waiter_next)->id, key,
                            "TryWakeUpNextWaiters");
          lock_waiter_next++;
        }
      }
    }
  }
  // Remove this waiter from the queue it joined (if any) on destruction.
  // NOTE(review): assumes the stripe mutex is held when the context is
  // destroyed, since it mutates the shared queue — confirm at call sites.
  ~KeyLockWaiterContext() {
    if (waiter_queue != nullptr && lock_waiter != waiter_queue->end()) {
      waiter_queue->erase(lock_waiter);
      lock_waiter = waiter_queue->end();
    }
    waiter_queue = nullptr;
  }
  // The waiter queue the lock waiter joined. Used for remove the waiter from
  // the waiter queue. nullptr until the waiter joins a queue.
  std::list<KeyLockWaiter*>* waiter_queue = nullptr;
  // The stable iterator that tracks the position of the waiter in the waiter
  // queue (std::list iterators stay valid across other inserts/erases).
  // Used for remove the waiter from the waiter queue.
  std::list<KeyLockWaiter*>::iterator lock_waiter;
};
// One shard of the per-column-family lock table. Each stripe has its own
// mutex/condvar and its own key -> LockInfo map, reducing contention.
struct LockMapStripe {
  explicit LockMapStripe(std::shared_ptr<TransactionDBMutexFactory> factory,
                         ThreadLocalPtr& key_lock_waiter)
      : mutex_factory_(std::move(factory)), key_lock_waiter_(key_lock_waiter) {
    stripe_mutex = mutex_factory_->AllocateMutex();
    stripe_cv = mutex_factory_->AllocateCondVar();
    assert(stripe_mutex);
    assert(stripe_cv);
  }
  // Return the LockInfo for `key`, or nullptr if the key is not locked.
  LockInfo* GetLockInfo(const std::string& key) {
    auto lock_info_iter = keys.find(key);
    if (lock_info_iter != keys.end()) {
      return &lock_info_iter->second;
    } else {
      return nullptr;
    }
  }
  // Add transaction `id` to this key's wait queue (creating the queue on
  // first use) and record the joined position in `waiter_context`. Does not
  // block; the caller waits via WaitOnLock().
  void JoinWaitQueue(LockInfo& lock_info, TransactionID id, bool exclusive,
                     bool isUpgrade, KeyLockWaiterContext& waiter_context) {
    if (lock_info.waiter_queue == nullptr) {
      // no waiter queue yet, create a new one
      lock_info.waiter_queue = std::make_unique<std::list<KeyLockWaiter*>>();
    }
    auto waiter_queue = lock_info.waiter_queue.get();
    // by default insert the new lock waiter at the end of the queue.
    auto insert_point = waiter_queue->end();
    if (isUpgrade) {
      // If transaction is upgrading a shared lock to exclusive lock, prioritize
      // it by moving its lock waiter before the first exclusive lock in the
      // queue if there is one, or end of the queue if not exist. It will be
      // able to acquire the lock after the other shared locks waiters at the
      // front of queue acquired and released locks. This reduces the chance of
      // deadlock, which makes transaction run more efficiently.
      if (waiter_context.waiter_queue != nullptr) {
        // If waiter_context is already initialized, it means current
        // transaction already joined the lock queue. Don't move the lock
        // position if it is already at the head of the queue or the lock
        // waiters before it are ready to take the lock.
        if (waiter_context.lock_waiter == waiter_queue->begin()) {
          return;
        }
        auto prev_lock_waiter = waiter_context.lock_waiter;
        prev_lock_waiter--;
        if ((*prev_lock_waiter)->IsReady()) {
          return;
        }
        // Remove existing lock waiter
        waiter_queue->erase(waiter_context.lock_waiter);
      }
      // For upgrade, insert waiter either at the end of the queue or before the
      // first exlusive lock waiter.
      insert_point = waiter_queue->begin();
      while ((insert_point != waiter_queue->end()) &&
             (!(*insert_point)->exclusive)) {
        insert_point++;
      }
    }
    // Insert the new lock waiter
    waiter_context.lock_waiter =
        waiter_queue->insert(insert_point, GetKeyLockWaiter(id, exclusive));
    waiter_context.waiter_queue = waiter_queue;
  }
  // Wait on an existing KeyLockWaiter until its turn to take the lock or
  // timeout. timeout_us == 0 (the default) means wait forever.
  Status WaitOnLock(std::list<KeyLockWaiter*>::iterator& lock_waiter,
                    int64_t timeout_us = 0) {
    Status ret;
    if (timeout_us == 0) {
      ret = (*lock_waiter)->Wait(stripe_mutex);
    } else {
      ret = (*lock_waiter)->WaitFor(stripe_mutex, timeout_us);
    }
    return ret;
  }
  // Release the lock on `key` held by `txn_id` when it is the last holder:
  // erases the key or wakes the next waiter(s). Defined out-of-line below.
  void ReleaseLastLockHolder(
      LockInfo& lock_info,
      UnorderedMap<std::string, LockInfo>::iterator stripe_iter,
      LockMap* lock_map, TransactionID txn_id, const std::string& key,
      const int64_t max_num_locks, autovector<TransactionID>& txns,
      autovector<TransactionID>::iterator& txn_it);
  // Mutex must be held before modifying keys map
  std::shared_ptr<TransactionDBMutex> stripe_mutex;
  // Condition Variable per stripe for waiting on a lock
  std::shared_ptr<TransactionDBCondVar> stripe_cv;
  // Locked keys mapped to the info about the transactions that locked them.
  // TODO(agiardullo): Explore performance of other data structures.
  UnorderedMap<std::string, LockInfo> keys;

 private:
  std::shared_ptr<TransactionDBMutexFactory> mutex_factory_;
  // key lock waiter, wrapped in thread local for reusing it across
  // transactions.
  ThreadLocalPtr& key_lock_waiter_;
  // Return key lock waiter stored in thread local var, create on first use
  KeyLockWaiter* GetKeyLockWaiter(TransactionID id, bool exclusive) {
    KeyLockWaiter* waiter = nullptr;
    if (key_lock_waiter_.Get() == nullptr) {
      // create key lock waiter
      key_lock_waiter_.Reset(
          new KeyLockWaiter(mutex_factory_->AllocateCondVar(), id, exclusive));
      waiter = static_cast<KeyLockWaiter*>(key_lock_waiter_.Get());
    } else {
      // Reuse this thread's waiter, re-armed for the new request.
      waiter = static_cast<KeyLockWaiter*>(key_lock_waiter_.Get());
      waiter->Reset(id, exclusive);
    }
    return waiter;
  }
};
// Map of #num_stripes LockMapStripes — the lock table for one column family.
struct LockMap {
  explicit LockMap(size_t num_stripes,
                   std::shared_ptr<TransactionDBMutexFactory> factory,
                   ThreadLocalPtr& key_lock_waiter)
      : num_stripes_(num_stripes), key_lock_waiter_(key_lock_waiter) {
    lock_map_stripes_.reserve(num_stripes);
    for (size_t i = 0; i < num_stripes; i++) {
      LockMapStripe* stripe = new LockMapStripe(factory, key_lock_waiter_);
      lock_map_stripes_.push_back(stripe);
    }
  }
  ~LockMap() {
    for (auto stripe : lock_map_stripes_) {
      delete stripe;
    }
    // Validate total locked key count is 0, when lock map is destructed.
    assert(locked_key_cnt.LoadRelaxed() == 0);
  }
  // Number of separate LockMapStripes to create, each with their own Mutex
  const size_t num_stripes_;
  // Shared thread-local slot used by stripes to reuse KeyLockWaiter objects.
  ThreadLocalPtr& key_lock_waiter_;
  // Count of keys that are currently locked in this column family.
  // Note that multiple shared locks on the same key is counted as 1 lock.
  // (Only maintained if PointLockManager::max_num_locks_ is positive.)
  RelaxedAtomic<int64_t> locked_key_cnt{0};
  std::vector<LockMapStripe*> lock_map_stripes_;
  // Map `key` to the index of the stripe responsible for it.
  size_t GetStripe(const std::string& key) const;
};
  305. inline void RemoveTransaction(autovector<TransactionID>& txns,
  306. autovector<TransactionID>::iterator& txn_it) {
  307. if (txns.size() > 1) {
  308. auto last_it = txns.end() - 1;
  309. if (txn_it != last_it) {
  310. *txn_it = *last_it;
  311. }
  312. }
  313. txns.pop_back();
  314. }
// Release the lock on `key` on behalf of its last remaining holder `txn_id`.
// If nobody is waiting, the key entry is erased (and the CF lock count
// decremented when a lock limit is configured); otherwise the lock is handed
// off by notifying the next waiter(s) in queue order.
// NOTE(review): assumes the stripe mutex is held, since this mutates `keys`
// and the waiter queue — confirm at call sites.
void LockMapStripe::ReleaseLastLockHolder(
    LockInfo& lock_info,
    UnorderedMap<std::string, LockInfo>::iterator stripe_iter,
    LockMap* lock_map, TransactionID txn_id, const std::string& key,
    const int64_t max_num_locks, autovector<TransactionID>& txns,
    autovector<TransactionID>::iterator& txn_it) {
  // check whether there is other waiting transactions
  if (lock_info.waiter_queue == nullptr || lock_info.waiter_queue->empty()) {
    keys.erase(stripe_iter);
    if (max_num_locks > 0) {
      // Maintain lock count if there is a limit on the number of
      // locks.
      assert(lock_map->locked_key_cnt.LoadRelaxed() > 0);
      lock_map->locked_key_cnt.FetchSubRelaxed(1);
    }
  } else {
    // there are waiters in the queue, so we need to wake the next
    // one up
    RemoveTransaction(txns, txn_it);
    // loop through the waiter queue and wake up all the shared lock
    // waiters until the first exclusive lock waiter, or wake up the
    // first waiter, if it is waiting for an exclusive lock.
    bool first_waiter = true;
    for (auto& waiter : *lock_info.waiter_queue) {
      if (waiter->exclusive) {
        if (first_waiter) {
          // the first waiter is an exclusive lock waiter, wake it
          // up. Note that they are only notified, but not removed
          // from the waiter queue. This allows new transaction to
          // be aware that there are waiters ahead of them.
          waiter->Notify();
          DebugWakeUpWaiter(txn_id, waiter->id, key, "UnlockKey X waiter");
        }
        // found the first exclusive lock waiter, stop
        break;
      } else {
        // wake up the shared lock waiter
        waiter->Notify();
        DebugWakeUpWaiter(txn_id, waiter->id, key, "UnlockKey S waiter");
      }
      first_waiter = false;
    }
  }
}
namespace {
// Deleter for the per-thread LockMaps cache stored in a ThreadLocalPtr.
void UnrefLockMapsCache(void* ptr) {
  // Called when a thread exits or a ThreadLocalPtr gets destroyed.
  auto lock_maps_cache =
      static_cast<UnorderedMap<uint32_t, std::shared_ptr<LockMap>>*>(ptr);
  delete lock_maps_cache;
}
// Deleter for the per-thread reusable KeyLockWaiter.
void UnrefKeyLockWaiter(void* ptr) {
  auto key_lock_waiter = static_cast<KeyLockWaiter*>(ptr);
  delete key_lock_waiter;
}
}  // anonymous namespace
// Construct the lock manager for `txn_db`, taking stripe count, lock limit,
// deadlock-buffer size, and (optionally) a custom mutex factory from `opt`.
PointLockManager::PointLockManager(PessimisticTransactionDB* txn_db,
                                   const TransactionDBOptions& opt)
    : txn_db_impl_(txn_db),
      default_num_stripes_(opt.num_stripes),
      max_num_locks_(opt.max_num_locks),
      // Per-thread caches reclaimed by UnrefLockMapsCache/UnrefKeyLockWaiter.
      lock_maps_cache_(new ThreadLocalPtr(&UnrefLockMapsCache)),
      key_lock_waiter_(&UnrefKeyLockWaiter),
      dlock_buffer_(opt.max_num_deadlocks),
      // Fall back to the built-in mutex factory when none is supplied.
      mutex_factory_(opt.custom_mutex_factory
                         ? opt.custom_mutex_factory
                         : std::make_shared<TransactionDBMutexFactoryImpl>()) {}
  382. size_t LockMap::GetStripe(const std::string& key) const {
  383. assert(num_stripes_ > 0);
  384. return FastRange64(GetSliceNPHash64(key), num_stripes_);
  385. }
  386. void PointLockManager::AddColumnFamily(const ColumnFamilyHandle* cf) {
  387. InstrumentedMutexLock l(&lock_map_mutex_);
  388. if (lock_maps_.find(cf->GetID()) == lock_maps_.end()) {
  389. lock_maps_.emplace(cf->GetID(), std::make_shared<LockMap>(
  390. default_num_stripes_, mutex_factory_,
  391. key_lock_waiter_));
  392. } else {
  393. // column_family already exists in lock map
  394. assert(false);
  395. }
  396. }
  397. void PointLockManager::RemoveColumnFamily(const ColumnFamilyHandle* cf) {
  398. // Remove lock_map for this column family. Since the lock map is stored
  399. // as a shared ptr, concurrent transactions can still keep using it
  400. // until they release their references to it.
  401. {
  402. InstrumentedMutexLock l(&lock_map_mutex_);
  403. auto lock_maps_iter = lock_maps_.find(cf->GetID());
  404. if (lock_maps_iter == lock_maps_.end()) {
  405. return;
  406. }
  407. lock_maps_.erase(lock_maps_iter);
  408. } // lock_map_mutex_
  409. // Clear all thread-local caches
  410. autovector<void*> local_caches;
  411. lock_maps_cache_->Scrape(&local_caches, nullptr);
  412. for (auto cache : local_caches) {
  413. delete static_cast<LockMaps*>(cache);
  414. }
  415. }
  416. // Look up the LockMap std::shared_ptr for a given column_family_id.
  417. // Note: The LockMap is only valid as long as the caller is still holding on
  418. // to the returned std::shared_ptr.
  419. std::shared_ptr<LockMap> PointLockManager::GetLockMap(
  420. ColumnFamilyId column_family_id) {
  421. // First check thread-local cache
  422. if (lock_maps_cache_->Get() == nullptr) {
  423. lock_maps_cache_->Reset(new LockMaps());
  424. }
  425. auto lock_maps_cache = static_cast<LockMaps*>(lock_maps_cache_->Get());
  426. auto lock_map_iter = lock_maps_cache->find(column_family_id);
  427. if (lock_map_iter != lock_maps_cache->end()) {
  428. // Found lock map for this column family.
  429. return lock_map_iter->second;
  430. }
  431. // Not found in local cache, grab mutex and check shared LockMaps
  432. InstrumentedMutexLock l(&lock_map_mutex_);
  433. lock_map_iter = lock_maps_.find(column_family_id);
  434. if (lock_map_iter == lock_maps_.end()) {
  435. return std::shared_ptr<LockMap>(nullptr);
  436. } else {
  437. // Found lock map. Store in thread-local cache and return.
  438. std::shared_ptr<LockMap>& lock_map = lock_map_iter->second;
  439. lock_maps_cache->insert({column_family_id, lock_map});
  440. return lock_map;
  441. }
  442. }
  443. // Returns true if this lock has expired and can be acquired by another
  444. // transaction.
  445. // If false, sets *expire_time to the expiration time of the lock according
  446. // to Env->GetMicros() or 0 if no expiration.
  447. bool PointLockManager::IsLockExpired(TransactionID txn_id,
  448. const LockInfo& lock_info, Env* env,
  449. uint64_t* expire_time) {
  450. if (lock_info.expiration_time == 0) {
  451. *expire_time = 0;
  452. return false;
  453. }
  454. auto now = env->NowMicros();
  455. bool expired = lock_info.expiration_time <= now;
  456. if (!expired) {
  457. // return how many microseconds until lock will be expired
  458. *expire_time = lock_info.expiration_time;
  459. } else {
  460. for (auto id : lock_info.txn_ids) {
  461. if (txn_id == id) {
  462. continue;
  463. }
  464. bool success = txn_db_impl_->TryStealingExpiredTransactionLocks(id);
  465. if (!success) {
  466. expired = false;
  467. *expire_time = 0;
  468. break;
  469. }
  470. }
  471. }
  472. return expired;
  473. }
  474. Status PointLockManager::TryLock(PessimisticTransaction* txn,
  475. ColumnFamilyId column_family_id,
  476. const std::string& key, Env* env,
  477. bool exclusive) {
  478. // Lookup lock map for this column family id
  479. std::shared_ptr<LockMap> lock_map_ptr = GetLockMap(column_family_id);
  480. LockMap* lock_map = lock_map_ptr.get();
  481. if (lock_map == nullptr) {
  482. char msg[255];
  483. snprintf(msg, sizeof(msg), "Column family id not found: %" PRIu32,
  484. column_family_id);
  485. return Status::InvalidArgument(msg);
  486. }
  487. // Need to lock the mutex for the stripe that this key hashes to
  488. size_t stripe_num = lock_map->GetStripe(key);
  489. assert(lock_map->lock_map_stripes_.size() > stripe_num);
  490. LockMapStripe* stripe = lock_map->lock_map_stripes_.at(stripe_num);
  491. LockInfo lock_info(txn->GetID(), txn->GetExpirationTime(), exclusive);
  492. int64_t timeout = txn->GetLockTimeout();
  493. int64_t deadlock_timeout_us = txn->GetDeadlockTimeout();
  494. return AcquireWithTimeout(txn, lock_map, stripe, column_family_id, key, env,
  495. timeout, deadlock_timeout_us, lock_info);
  496. }
// Helper function for TryLock(): take the stripe mutex, then repeatedly try
// AcquireLocked(), sleeping on the stripe condvar between attempts, until
// the lock is acquired, the transaction's timeout elapses, or a deadlock is
// detected. Returns OK, TimedOut(kLockTimeout), Busy(kDeadlock), or a
// mutex/condvar error.
Status PointLockManager::AcquireWithTimeout(
    PessimisticTransaction* txn, LockMap* lock_map, LockMapStripe* stripe,
    ColumnFamilyId column_family_id, const std::string& key, Env* env,
    int64_t timeout, int64_t /*deadlock_timeout_us*/,
    const LockInfo& lock_info) {
  Status result;
  // end_time == 0 means "no deadline" (timeout <= 0).
  uint64_t end_time = 0;
  if (timeout > 0) {
    uint64_t start_time = env->NowMicros();
    end_time = start_time + timeout;
  }
  if (timeout < 0) {
    // If timeout is negative, we wait indefinitely to acquire the lock
    result = stripe->stripe_mutex->Lock();
  } else {
    result = stripe->stripe_mutex->TryLockFor(timeout);
  }
  if (!result.ok()) {
    // failed to acquire mutex
    return result;
  }
  // Acquire lock if we are able to
  uint64_t expire_time_hint = 0;
  autovector<TransactionID> wait_ids;
  result = AcquireLocked(lock_map, stripe, key, env, lock_info,
                         &expire_time_hint, &wait_ids);
  if (!result.ok() && timeout != 0) {
    PERF_TIMER_GUARD(key_lock_wait_time);
    PERF_COUNTER_ADD(key_lock_wait_count, 1);
    // If we weren't able to acquire the lock, we will keep retrying as long
    // as the timeout allows.
    bool timed_out = false;
    bool cv_wait_fail = false;
    do {
      // Decide how long to wait: the earlier of our own deadline and the
      // current holder's expiration hint; -1 means wait indefinitely.
      int64_t cv_end_time = -1;
      if (expire_time_hint > 0 && end_time > 0) {
        cv_end_time = std::min(expire_time_hint, end_time);
      } else if (expire_time_hint > 0) {
        cv_end_time = expire_time_hint;
      } else if (end_time > 0) {
        cv_end_time = end_time;
      }
      // wait_ids is empty exactly when the failure was the lock limit
      // rather than a conflicting holder.
      assert(result.IsLockLimit() == wait_ids.empty());
      // We are dependent on a transaction to finish, so perform deadlock
      // detection.
      if (wait_ids.size() != 0) {
        if (txn->IsDeadlockDetect()) {
          if (IncrementWaiters(txn, wait_ids, key, column_family_id,
                               lock_info.exclusive, env)) {
            // Adding this wait edge would close a cycle: abort with
            // deadlock instead of waiting.
            result = Status::Busy(Status::SubCode::kDeadlock);
            stripe->stripe_mutex->UnLock();
            return result;
          }
        }
        txn->SetWaitingTxn(wait_ids, column_family_id, &key);
      }
      TEST_SYNC_POINT("PointLockManager::AcquireWithTimeout:WaitingTxn");
      if (cv_end_time < 0) {
        // Wait indefinitely
        result = stripe->stripe_cv->Wait(stripe->stripe_mutex);
        cv_wait_fail = !result.ok();
      } else {
        // FIXME: in this case, cv_end_time could be `expire_time_hint` from the
        // current lock holder, a time out does not mean we reached the current
        // transaction's timeout, and we should continue to retry locking
        // instead of exiting this while loop below.
        uint64_t now = env->NowMicros();
        if (static_cast<uint64_t>(cv_end_time) > now) {
          result = stripe->stripe_cv->WaitFor(stripe->stripe_mutex,
                                              cv_end_time - now);
          // A condvar timeout is not a hard failure; we retry below.
          cv_wait_fail = !result.ok() && !result.IsTimedOut();
        } else {
          // now >= cv_end_time, we already timed out
          result = Status::TimedOut(Status::SubCode::kLockTimeout);
        }
      }
      // Undo the waiting bookkeeping before re-attempting the lock.
      if (wait_ids.size() != 0) {
        txn->ClearWaitingTxn();
        if (txn->IsDeadlockDetect()) {
          DecrementWaiters(txn, wait_ids);
        }
      }
      if (cv_wait_fail) {
        break;
      }
      if (result.IsTimedOut()) {
        timed_out = true;
        // Even though we timed out, we will still make one more attempt to
        // acquire lock below (it is possible the lock expired and we
        // were never signaled).
      }
      assert(result.ok() || result.IsTimedOut());
      wait_ids.clear();
      result = AcquireLocked(lock_map, stripe, key, env, lock_info,
                             &expire_time_hint, &wait_ids);
    } while (!result.ok() && !timed_out);
  }
  stripe->stripe_mutex->UnLock();
  // On timeout, persist the lock information so we can debug the contention
  if (result.IsTimedOut()) {
    txn->SetWaitingTxn(wait_ids, column_family_id, &key, true);
  }
  return result;
}
  603. // Try to lock this key after we have acquired the mutex.
  604. // Sets *expire_time to the expiration time in microseconds
  605. // or 0 if no expiration.
  606. //
  607. // Returns Status::TimeOut if the lock cannot be acquired due to it being
  608. // held by other transactions, `txn_ids` will be populated with the id of
  609. // transactions that hold the lock, excluding lock_info.txn_ids[0].
  610. // Returns Status::Aborted(kLockLimit) if the lock cannot be acquired due to
  611. // reaching per CF limit on the number of locks.
  612. //
  613. // REQUIRED: Stripe mutex must be held. txn_ids must be empty.
  614. Status PointLockManager::AcquireLocked(LockMap* lock_map, LockMapStripe* stripe,
  615. const std::string& key, Env* env,
  616. const LockInfo& txn_lock_info,
  617. uint64_t* expire_time,
  618. autovector<TransactionID>* txn_ids) {
  619. assert(txn_lock_info.txn_ids.size() == 1);
  620. assert(txn_ids && txn_ids->empty());
  621. Status result;
  622. // Check if this key is already locked
  623. auto stripe_iter = stripe->keys.find(key);
  624. if (stripe_iter != stripe->keys.end()) {
  625. // Lock already held
  626. auto& lock_info = stripe_iter->second;
  627. assert(lock_info.txn_ids.size() == 1 || !lock_info.exclusive);
  628. if (lock_info.exclusive || txn_lock_info.exclusive) {
  629. if (lock_info.txn_ids.size() == 1 &&
  630. lock_info.txn_ids[0] == txn_lock_info.txn_ids[0]) {
  631. // The list contains one txn and we're it, so just take it.
  632. lock_info.exclusive = txn_lock_info.exclusive;
  633. lock_info.expiration_time = txn_lock_info.expiration_time;
  634. } else {
  635. // Check if it's expired. Skips over txn_lock_info.txn_ids[0] in case
  636. // it's there for a shared lock with multiple holders which was not
  637. // caught in the first case.
  638. if (IsLockExpired(txn_lock_info.txn_ids[0], lock_info, env,
  639. expire_time)) {
  640. // lock is expired, can steal it
  641. lock_info.txn_ids = txn_lock_info.txn_ids;
  642. lock_info.exclusive = txn_lock_info.exclusive;
  643. lock_info.expiration_time = txn_lock_info.expiration_time;
  644. // lock_cnt does not change
  645. } else {
  646. result = Status::TimedOut(Status::SubCode::kLockTimeout);
  647. for (auto id : lock_info.txn_ids) {
  648. // A transaction is not blocked by itself
  649. if (id != txn_lock_info.txn_ids[0]) {
  650. txn_ids->push_back(id);
  651. }
  652. }
  653. }
  654. }
  655. } else {
  656. // We are requesting shared access to a shared lock, so just grant it.
  657. lock_info.txn_ids.push_back(txn_lock_info.txn_ids[0]);
  658. // Using std::max means that expiration time never goes down even when
  659. // a transaction is removed from the list. The correct solution would be
  660. // to track expiry for every transaction, but this would also work for
  661. // now.
  662. lock_info.expiration_time =
  663. std::max(lock_info.expiration_time, txn_lock_info.expiration_time);
  664. }
  665. } else {
  666. // Lock not held.
  667. // Check lock limit
  668. if (max_num_locks_ > 0 &&
  669. lock_map->locked_key_cnt.LoadRelaxed() >= max_num_locks_) {
  670. result = Status::LockLimit();
  671. } else {
  672. // acquire lock
  673. stripe->keys.try_emplace(key, txn_lock_info.txn_ids[0],
  674. txn_lock_info.expiration_time,
  675. txn_lock_info.exclusive);
  676. // Maintain lock count if there is a limit on the number of locks
  677. if (max_num_locks_ > 0) {
  678. lock_map->locked_key_cnt.FetchAddRelaxed(1);
  679. }
  680. }
  681. }
  682. return result;
  683. }
  684. void PointLockManager::UnLockKey(PessimisticTransaction* txn,
  685. const std::string& key, LockMapStripe* stripe,
  686. LockMap* lock_map, Env* env) {
  687. (void)env;
  688. TransactionID txn_id = txn->GetID();
  689. auto stripe_iter = stripe->keys.find(key);
  690. if (stripe_iter != stripe->keys.end()) {
  691. auto& txns = stripe_iter->second.txn_ids;
  692. auto txn_it = std::find(txns.begin(), txns.end(), txn_id);
  693. // Found the key we locked. unlock it.
  694. if (txn_it != txns.end()) {
  695. if (txns.size() == 1) {
  696. stripe->keys.erase(stripe_iter);
  697. } else {
  698. auto last_it = txns.end() - 1;
  699. if (txn_it != last_it) {
  700. *txn_it = *last_it;
  701. }
  702. txns.pop_back();
  703. }
  704. if (max_num_locks_ > 0) {
  705. // Maintain lock count if there is a limit on the number of locks.
  706. assert(lock_map->locked_key_cnt.LoadRelaxed() > 0);
  707. lock_map->locked_key_cnt.FetchSubRelaxed(1);
  708. }
  709. }
  710. } else {
  711. // This key is either not locked or locked by someone else. This should
  712. // only happen if the unlocking transaction has expired.
  713. assert(txn->GetExpirationTime() > 0 &&
  714. txn->GetExpirationTime() < env->NowMicros());
  715. }
  716. }
  717. void PointLockManager::DecrementWaiters(
  718. const PessimisticTransaction* txn,
  719. const autovector<TransactionID>& wait_ids) {
  720. std::lock_guard<std::mutex> lock(wait_txn_map_mutex_);
  721. DecrementWaitersImpl(txn, wait_ids);
  722. }
  723. void PointLockManager::DecrementWaitersImpl(
  724. const PessimisticTransaction* txn,
  725. const autovector<TransactionID>& wait_ids) {
  726. auto id = txn->GetID();
  727. assert(wait_txn_map_.Contains(id));
  728. wait_txn_map_.Delete(id);
  729. for (auto wait_id : wait_ids) {
  730. rev_wait_txn_map_.Get(wait_id)--;
  731. if (rev_wait_txn_map_.Get(wait_id) == 0) {
  732. rev_wait_txn_map_.Delete(wait_id);
  733. }
  734. }
  735. }
// Registers `txn` (about to block on the transactions in `wait_ids`) in the
// wait-for graph, then runs a depth-bounded BFS over the graph to detect a
// wait cycle involving `txn`.
//
// Returns true if a deadlock is detected (or the search exceeds the detect
// depth, which is conservatively treated as a deadlock); the offending path
// is recorded in dlock_buffer_ and the waiter registration is rolled back.
// Returns false when no cycle is found, leaving `txn` registered as a waiter.
bool PointLockManager::IncrementWaiters(
    const PessimisticTransaction* txn,
    const autovector<TransactionID>& wait_ids, const std::string& key,
    const uint32_t& cf_id, const bool& exclusive, Env* const env) {
  auto id = txn->GetID();
  // BFS scratch space, sized by the configured deadlock detect depth.
  // queue_parents[i] is the queue index of the node that enqueued
  // queue_values[i] (-1 for the roots), used to reconstruct the cycle path.
  std::vector<int> queue_parents(
      static_cast<size_t>(txn->GetDeadlockDetectDepth()));
  std::vector<TransactionID> queue_values(
      static_cast<size_t>(txn->GetDeadlockDetectDepth()));
  std::lock_guard<std::mutex> lock(wait_txn_map_mutex_);
  assert(!wait_txn_map_.Contains(id));
  // Record the forward edges (who we wait for) and bump the reverse
  // waiter counts (who waits for them).
  wait_txn_map_.Insert(id, {wait_ids, cf_id, exclusive, key});
  for (auto wait_id : wait_ids) {
    if (rev_wait_txn_map_.Contains(wait_id)) {
      rev_wait_txn_map_.Get(wait_id)++;
    } else {
      rev_wait_txn_map_.Insert(wait_id, 1);
    }
  }
  // No deadlock if nobody is waiting on self.
  if (!rev_wait_txn_map_.Contains(id)) {
    return false;
  }
  const auto* next_ids = &wait_ids;
  int parent = -1;
  int64_t deadlock_time = 0;
  for (int tail = 0, head = 0; head < txn->GetDeadlockDetectDepth(); head++) {
    int i = 0;
    if (next_ids) {
      // Enqueue the neighbors of the node being expanded (truncated at the
      // detect depth), remembering the parent index for path reconstruction.
      for (; i < static_cast<int>(next_ids->size()) &&
             tail + i < txn->GetDeadlockDetectDepth();
           i++) {
        queue_values[tail + i] = (*next_ids)[i];
        queue_parents[tail + i] = parent;
      }
      tail += i;
    }
    // No more items in the list, meaning no deadlock.
    if (tail == head) {
      return false;
    }
    auto next = queue_values[head];
    if (next == id) {
      // Found a path back to ourselves: deadlock. Walk the parent links
      // backwards to reconstruct the cycle for the deadlock info buffer.
      std::vector<DeadlockInfo> path;
      while (head != -1) {
        assert(wait_txn_map_.Contains(queue_values[head]));
        auto extracted_info = wait_txn_map_.Get(queue_values[head]);
        path.push_back({queue_values[head], extracted_info.m_cf_id,
                        extracted_info.m_exclusive,
                        extracted_info.m_waiting_key});
        head = queue_parents[head];
      }
      if (!env->GetCurrentTime(&deadlock_time).ok()) {
        /*
          TODO(AR) this preserves the current behaviour whilst checking the
          status of env->GetCurrentTime to ensure that ASSERT_STATUS_CHECKED
          passes. Should we instead raise an error if !ok() ?
        */
        deadlock_time = 0;
      }
      // Path was built leaf-to-root; present it root-to-leaf.
      std::reverse(path.begin(), path.end());
      dlock_buffer_.AddNewPath(DeadlockPath(path, deadlock_time));
      deadlock_time = 0;
      DecrementWaitersImpl(txn, wait_ids);
      return true;
    } else if (!wait_txn_map_.Contains(next)) {
      // `next` is not itself waiting on anyone; nothing to expand.
      next_ids = nullptr;
      continue;
    } else {
      parent = head;
      next_ids = &(wait_txn_map_.Get(next).m_neighbors);
    }
  }
  // Wait cycle too big, just assume deadlock.
  if (!env->GetCurrentTime(&deadlock_time).ok()) {
    /*
      TODO(AR) this preserves the current behaviour whilst checking the status
      of env->GetCurrentTime to ensure that ASSERT_STATUS_CHECKED passes.
      Should we instead raise an error if !ok() ?
    */
    deadlock_time = 0;
  }
  dlock_buffer_.AddNewPath(DeadlockPath(deadlock_time, true));
  DecrementWaitersImpl(txn, wait_ids);
  return true;
}
  822. void PointLockManager::UnLock(PessimisticTransaction* txn,
  823. ColumnFamilyId column_family_id,
  824. const std::string& key, Env* env) {
  825. std::shared_ptr<LockMap> lock_map_ptr = GetLockMap(column_family_id);
  826. LockMap* lock_map = lock_map_ptr.get();
  827. if (lock_map == nullptr) {
  828. // Column Family must have been dropped.
  829. return;
  830. }
  831. // Lock the mutex for the stripe that this key hashes to
  832. size_t stripe_num = lock_map->GetStripe(key);
  833. assert(lock_map->lock_map_stripes_.size() > stripe_num);
  834. LockMapStripe* stripe = lock_map->lock_map_stripes_.at(stripe_num);
  835. stripe->stripe_mutex->Lock().AssertOK();
  836. UnLockKey(txn, key, stripe, lock_map, env);
  837. stripe->stripe_mutex->UnLock();
  838. // Signal waiting threads to retry locking
  839. stripe->stripe_cv->NotifyAll();
  840. }
  841. void PointLockManager::UnLock(PessimisticTransaction* txn,
  842. const LockTracker& tracker, Env* env) {
  843. std::unique_ptr<LockTracker::ColumnFamilyIterator> cf_it(
  844. tracker.GetColumnFamilyIterator());
  845. assert(cf_it != nullptr);
  846. while (cf_it->HasNext()) {
  847. ColumnFamilyId cf = cf_it->Next();
  848. std::shared_ptr<LockMap> lock_map_ptr = GetLockMap(cf);
  849. LockMap* lock_map = lock_map_ptr.get();
  850. if (!lock_map) {
  851. // Column Family must have been dropped.
  852. return;
  853. }
  854. // Bucket keys by lock_map_ stripe
  855. UnorderedMap<size_t, std::vector<const std::string*>> keys_by_stripe(
  856. lock_map->num_stripes_);
  857. std::unique_ptr<LockTracker::KeyIterator> key_it(
  858. tracker.GetKeyIterator(cf));
  859. assert(key_it != nullptr);
  860. while (key_it->HasNext()) {
  861. const std::string& key = key_it->Next();
  862. size_t stripe_num = lock_map->GetStripe(key);
  863. keys_by_stripe[stripe_num].push_back(&key);
  864. }
  865. // For each stripe, grab the stripe mutex and unlock all keys in this stripe
  866. for (auto& stripe_iter : keys_by_stripe) {
  867. size_t stripe_num = stripe_iter.first;
  868. auto& stripe_keys = stripe_iter.second;
  869. assert(lock_map->lock_map_stripes_.size() > stripe_num);
  870. LockMapStripe* stripe = lock_map->lock_map_stripes_.at(stripe_num);
  871. stripe->stripe_mutex->Lock().AssertOK();
  872. for (const std::string* key : stripe_keys) {
  873. UnLockKey(txn, *key, stripe, lock_map, env);
  874. }
  875. stripe->stripe_mutex->UnLock();
  876. // Signal waiting threads to retry locking
  877. stripe->stripe_cv->NotifyAll();
  878. }
  879. }
  880. }
  881. PointLockManager::PointLockStatus PointLockManager::GetPointLockStatus() {
  882. PointLockStatus data;
  883. // Lock order here is important. The correct order is lock_map_mutex_, then
  884. // for every column family ID in ascending order lock every stripe in
  885. // ascending order.
  886. InstrumentedMutexLock l(&lock_map_mutex_);
  887. std::vector<uint32_t> cf_ids;
  888. for (const auto& map : lock_maps_) {
  889. cf_ids.push_back(map.first);
  890. }
  891. std::sort(cf_ids.begin(), cf_ids.end());
  892. for (auto i : cf_ids) {
  893. const auto& stripes = lock_maps_[i]->lock_map_stripes_;
  894. // Iterate and lock all stripes in ascending order.
  895. for (const auto& j : stripes) {
  896. j->stripe_mutex->Lock().AssertOK();
  897. for (const auto& it : j->keys) {
  898. struct KeyLockInfo info;
  899. info.exclusive = it.second.exclusive;
  900. info.key = it.first;
  901. for (const auto& id : it.second.txn_ids) {
  902. info.ids.push_back(id);
  903. }
  904. data.insert({i, info});
  905. }
  906. }
  907. }
  908. // Unlock everything. Unlocking order is not important.
  909. for (auto i : cf_ids) {
  910. const auto& stripes = lock_maps_[i]->lock_map_stripes_;
  911. for (const auto& j : stripes) {
  912. j->stripe_mutex->UnLock();
  913. }
  914. }
  915. return data;
  916. }
  917. std::vector<DeadlockPath> PointLockManager::GetDeadlockInfoBuffer() {
  918. return dlock_buffer_.PrepareBuffer();
  919. }
// Adjusts the capacity of the deadlock info buffer to `target_size` entries.
void PointLockManager::Resize(uint32_t target_size) {
  dlock_buffer_.Resize(target_size);
}
// Range locks are not supported by the point lock manager, so the range lock
// status is always empty.
PointLockManager::RangeLockStatus PointLockManager::GetRangeLockStatus() {
  return {};
}
// Range locking is not implemented for point lock managers; always fails
// with Status::NotSupported regardless of the arguments.
Status PointLockManager::TryLock(PessimisticTransaction* /* txn */,
                                 ColumnFamilyId /* cf_id */,
                                 const Endpoint& /* start */,
                                 const Endpoint& /* end */, Env* /* env */,
                                 bool /* exclusive */) {
  return Status::NotSupported(
      "PointLockManager does not support range locking");
}
// Range unlock is a no-op: range locks are never granted by this manager
// (TryLock for ranges returns NotSupported), so there is nothing to release.
void PointLockManager::UnLock(PessimisticTransaction* /* txn */,
                              ColumnFamilyId /* cf_id */,
                              const Endpoint& /* start */,
                              const Endpoint& /* end */, Env* /* env */) {
  // no-op
}
  940. // PerKeyPointLockManager implementation
// PerKeyPointLockManager delegates all initialization to PointLockManager;
// it differs only in the lock acquisition path (per-key waiter queues).
PerKeyPointLockManager::PerKeyPointLockManager(PessimisticTransactionDB* db,
                                               const TransactionDBOptions& opt)
    : PointLockManager(db, opt) {}
  944. void DebugLockStatus(TransactionID my_txn_id, const LockInfo& lock_info,
  945. const std::string& key,
  946. const KeyLockWaiterContext& key_lock_waiter_ctx) {
  947. if (kDebugLog) {
  948. char msg[512];
  949. size_t offset = 0;
  950. // print lock holders
  951. offset += snprintf(msg + offset, sizeof(msg),
  952. "Txn %" PRIu64 ": LockStatus key %s: holder [",
  953. my_txn_id, key.c_str());
  954. for (const auto& txn_id : lock_info.txn_ids) {
  955. offset += snprintf(msg + offset, sizeof(msg), "%s%" PRIu64 ",",
  956. lock_info.exclusive ? "X" : "S", txn_id);
  957. }
  958. // print waiter queue
  959. offset += snprintf(msg + offset, sizeof(msg), "], waiter_queue [");
  960. for (auto it = key_lock_waiter_ctx.waiter_queue->begin();
  961. it != key_lock_waiter_ctx.waiter_queue->end(); it++) {
  962. offset += snprintf(msg + offset, sizeof(msg), "%s%" PRIu64 ",",
  963. (*it)->exclusive ? "X" : "S", (*it)->id);
  964. }
  965. offset += snprintf(msg + offset, sizeof(msg), "]\n");
  966. fprintf(stderr, "%s", msg);
  967. fflush(stderr);
  968. }
  969. }
  970. int64_t PerKeyPointLockManager::CalculateWaitEndTime(int64_t expire_time_hint,
  971. int64_t end_time) {
  972. int64_t cv_end_time = -1;
  973. if (expire_time_hint > 0 && end_time > 0) {
  974. cv_end_time = std::min(expire_time_hint, end_time);
  975. } else if (expire_time_hint > 0) {
  976. cv_end_time = expire_time_hint;
  977. } else if (end_time > 0) {
  978. cv_end_time = end_time;
  979. }
  980. return cv_end_time;
  981. }
  982. // Acquire lock within timeout.
  983. // This function is similar to PointLockManger::AcquireWithTimeout with
  984. // following differences.
  985. //
  986. // If deadlock_timeout_us is not 0, it first performs a wait without doing dead
  987. // lock detection. This wait duration is specified by deadlock_timeout_us.
  988. // If this wait times out and it is still not able to acquire the lock, perform
  989. // the deadlock detection before wait again.
  990. //
  991. // It uses a per key lock waiter queue to handle lock waiting and wake up
  992. // efficiently. When a transaction is waiting for acquiring a lock on a key, it
  993. // joins a wait queue that is dedicated for this key. It will either timeout, or
  994. // get woken up when it is its turn to take the lock. This is more efficient
  995. // than the PointLockManger implementation where all lock waiters wait on the
  996. // same lock stripe cond var.
  997. Status PerKeyPointLockManager::AcquireWithTimeout(
  998. PessimisticTransaction* txn, LockMap* lock_map, LockMapStripe* stripe,
  999. ColumnFamilyId column_family_id, const std::string& key, Env* env,
  1000. int64_t timeout, int64_t deadlock_timeout_us,
  1001. const LockInfo& txn_lock_info) {
  1002. Status result;
  1003. uint64_t end_time = 0;
  1004. auto my_txn_id = txn_lock_info.txn_ids[0];
  1005. if (timeout > 0) {
  1006. uint64_t start_time = env->NowMicros();
  1007. end_time = start_time + timeout;
  1008. }
  1009. if (timeout < 0) {
  1010. // If timeout is negative, we wait indefinitely to acquire the lock
  1011. result = stripe->stripe_mutex->Lock();
  1012. } else {
  1013. result = stripe->stripe_mutex->TryLockFor(timeout);
  1014. }
  1015. if (!result.ok()) {
  1016. // failed to acquire mutex
  1017. return result;
  1018. }
  1019. // Acquire lock if we are able to
  1020. uint64_t expire_time_hint = 0;
  1021. autovector<TransactionID> wait_ids;
  1022. bool isUpgrade = false;
  1023. auto lock_info = stripe->GetLockInfo(key);
  1024. auto wait_before_deadlock_detection =
  1025. txn->IsDeadlockDetect() && (deadlock_timeout_us > 0);
  1026. result = AcquireLocked(
  1027. lock_map, stripe, key, env, txn_lock_info, &expire_time_hint,
  1028. // If wait before deadlock detection, it executes a fast path to save CPU
  1029. // cycles, wait ids are not collected.
  1030. wait_before_deadlock_detection ? nullptr : &wait_ids, &lock_info,
  1031. &isUpgrade, true);
  1032. if (!result.ok() && timeout != 0 &&
  1033. /* No need to retry after reach lock limit or aborted */
  1034. !result.IsLockLimit() && !result.IsAborted()) {
  1035. assert(lock_info);
  1036. PERF_TIMER_GUARD(key_lock_wait_time);
  1037. PERF_COUNTER_ADD(key_lock_wait_count, 1);
  1038. // If we weren't able to acquire the lock, we will keep retrying as long
  1039. // as the timeout allows.
  1040. bool timed_out = false;
  1041. bool cv_wait_fail = false;
  1042. KeyLockWaiterContext key_lock_waiter_ctx;
  1043. // Decide how long to wait
  1044. auto cv_end_time = CalculateWaitEndTime(expire_time_hint, end_time);
  1045. // We will try to wait a little bit before checking deadlock, as
  1046. // deadlock check is expensive.
  1047. if (wait_before_deadlock_detection) {
  1048. int64_t now = env->NowMicros();
  1049. if (cv_end_time < 0 || cv_end_time > now) {
  1050. if (kDebugLog) {
  1051. // print lock status before deadlock detection
  1052. fprintf(stderr,
  1053. "Txn %" PRIu64
  1054. " wait before deadlock detection %s, exclusive lock "
  1055. "%d\n",
  1056. my_txn_id, key.c_str(), txn_lock_info.exclusive);
  1057. fflush(stderr);
  1058. }
  1059. stripe->JoinWaitQueue(*lock_info, my_txn_id, txn_lock_info.exclusive,
  1060. false, key_lock_waiter_ctx);
  1061. DebugLockStatus(my_txn_id, *lock_info, key, key_lock_waiter_ctx);
  1062. TEST_SYNC_POINT(
  1063. "PerKeyPointLockManager::AcquireWithTimeout:"
  1064. "WaitingTxnBeforeDeadLockDetection");
  1065. result = stripe->WaitOnLock(
  1066. key_lock_waiter_ctx.lock_waiter,
  1067. std::min(cv_end_time - now, (int64_t)deadlock_timeout_us));
  1068. assert(result.ok() || result.IsTimedOut());
  1069. // Refresh lock info pointer, as this pointer is not guaranteed to be
  1070. // stable in folly
  1071. lock_info = stripe->GetLockInfo(key);
  1072. // try to take a lock again to get wait ids after deadlock timeout
  1073. result = AcquireLocked(lock_map, stripe, key, env, txn_lock_info,
  1074. &expire_time_hint, &wait_ids, &lock_info,
  1075. &isUpgrade, !result.ok());
  1076. } else {
  1077. // Already timed out
  1078. timed_out = true;
  1079. result = Status::TimedOut(Status::SubCode::kLockTimeout);
  1080. }
  1081. }
  1082. while (!result.ok() && !timed_out && !result.IsAborted()) {
  1083. // Refresh wait end time
  1084. cv_end_time = CalculateWaitEndTime(expire_time_hint, end_time);
  1085. // We are dependent on a transaction to finish, so perform deadlock
  1086. // detection.
  1087. if (!wait_ids.empty()) {
  1088. if (txn->IsDeadlockDetect()) {
  1089. if (IncrementWaiters(txn, wait_ids, key, column_family_id,
  1090. txn_lock_info.exclusive, env)) {
  1091. result = Status::Busy(Status::SubCode::kDeadlock);
  1092. break;
  1093. }
  1094. }
  1095. txn->SetWaitingTxn(wait_ids, column_family_id, &key);
  1096. }
  1097. TEST_SYNC_POINT("PointLockManager::AcquireWithTimeout:WaitingTxn");
  1098. if (kDebugLog) {
  1099. // print transaction lock status and wait ids
  1100. char msg[512];
  1101. size_t offset = 0;
  1102. offset += snprintf(msg + offset, sizeof(msg),
  1103. "Txn %" PRIu64
  1104. " wait after deadlock detection %s, exclusive lock "
  1105. "%d, upgrade %d, wait_ids [",
  1106. my_txn_id, key.c_str(), txn_lock_info.exclusive,
  1107. isUpgrade);
  1108. for (auto it = wait_ids.begin(); it != wait_ids.end(); it++) {
  1109. offset += snprintf(msg + offset, sizeof(msg), "%" PRIu64 ",", *it);
  1110. }
  1111. offset += snprintf(msg + offset, sizeof(msg), "]\n");
  1112. fprintf(stderr, "%s", msg);
  1113. fflush(stderr);
  1114. }
  1115. // If it has not joined wait queue, join it now.
  1116. // If it is a lock upgrade, rejoin it.
  1117. if (isUpgrade || (key_lock_waiter_ctx.waiter_queue == nullptr)) {
  1118. stripe->JoinWaitQueue(*lock_info, my_txn_id, txn_lock_info.exclusive,
  1119. isUpgrade, key_lock_waiter_ctx);
  1120. DebugLockStatus(my_txn_id, *lock_info, key, key_lock_waiter_ctx);
  1121. }
  1122. int64_t now = 0;
  1123. if (cv_end_time < 0) {
  1124. // Wait indefinitely
  1125. result = stripe->WaitOnLock(key_lock_waiter_ctx.lock_waiter);
  1126. cv_wait_fail = !result.ok();
  1127. } else {
  1128. now = env->NowMicros();
  1129. if (cv_end_time > now) {
  1130. result = stripe->WaitOnLock(key_lock_waiter_ctx.lock_waiter,
  1131. cv_end_time - now);
  1132. cv_wait_fail = !result.ok() && !result.IsTimedOut();
  1133. } else {
  1134. // now >= cv_end_time, we already timed out
  1135. result = Status::TimedOut(Status::SubCode::kLockTimeout);
  1136. }
  1137. }
  1138. #ifndef NDEBUG
  1139. stripe->stripe_mutex->UnLock();
  1140. TEST_SYNC_POINT_CALLBACK(
  1141. "PerKeyPointLockManager::AcquireWithTimeout:AfterWokenUp",
  1142. &my_txn_id);
  1143. TEST_SYNC_POINT(
  1144. "PerKeyPointLockManager::AcquireWithTimeout:BeforeTakeLock");
  1145. auto lock_status = stripe->stripe_mutex->Lock();
  1146. assert(lock_status.ok());
  1147. #endif
  1148. if (!wait_ids.empty()) {
  1149. txn->ClearWaitingTxn();
  1150. if (txn->IsDeadlockDetect()) {
  1151. DecrementWaiters(txn, wait_ids);
  1152. }
  1153. }
  1154. if (cv_wait_fail) {
  1155. break;
  1156. }
  1157. if (result.IsTimedOut()) {
  1158. timed_out = true;
  1159. // Even though we timed out, we will still make one more attempt to
  1160. // acquire lock below (it is possible the lock expired and we
  1161. // were never signaled).
  1162. }
  1163. assert(result.ok() || result.IsTimedOut());
  1164. // Refresh lock info pointer, as this pointer is not guaranteed to be
  1165. // stable in folly
  1166. lock_info = stripe->GetLockInfo(key);
  1167. // Try to get the lock again.
  1168. result = AcquireLocked(
  1169. lock_map, stripe, key, env, txn_lock_info, &expire_time_hint,
  1170. &wait_ids, &lock_info, &isUpgrade,
  1171. /* If wait is timed out, it means it is not its turn to take the lock.
  1172. * Therefore, it should still follow FIFO order. */
  1173. timed_out);
  1174. auto fail_to_take_lock_on_its_turn = !timed_out && !result.ok();
  1175. if (fail_to_take_lock_on_its_turn) {
  1176. // If it is its turn, but it failed to take lock, something is broken.
  1177. // Assert this should not happen in debug build during testing.
  1178. // In prod, it simply gives up the attempt.
  1179. assert(!fail_to_take_lock_on_its_turn);
  1180. break;
  1181. }
  1182. if (!result.ok() && cv_end_time >= 0) {
  1183. if (static_cast<int64_t>(end_time) <= now) {
  1184. // lock timeout timed out
  1185. result = Status::TimedOut(Status::SubCode::kLockTimeout);
  1186. timed_out = true;
  1187. }
  1188. }
  1189. }
  1190. // For any reason that the transaction failed to acquire the lock, it should
  1191. // try to wake up next waiters, if they are ready to proceed.
  1192. if (!result.ok()) {
  1193. key_lock_waiter_ctx.TryWakeUpNextWaiters(*lock_info, key);
  1194. }
  1195. }
  1196. stripe->stripe_mutex->UnLock();
  1197. // On timeout, persist the lock information so we can debug the contention
  1198. if (result.IsTimedOut()) {
  1199. txn->SetWaitingTxn(wait_ids, column_family_id, &key, true);
  1200. }
  1201. return result;
  1202. }
  1203. Status PerKeyPointLockManager::FillWaitIds(LockInfo& lock_info,
  1204. const LockInfo& txn_lock_info,
  1205. autovector<TransactionID>* wait_ids,
  1206. bool& isUpgrade,
  1207. TransactionID& my_txn_id,
  1208. const std::string& key) {
  1209. if (wait_ids != nullptr) {
  1210. for (auto id : lock_info.txn_ids) {
  1211. // A transaction is not blocked by itself
  1212. if (id != my_txn_id) {
  1213. wait_ids->push_back(id);
  1214. } else {
  1215. // Itself is already holding a lock, so it is either an upgrade or
  1216. // downgrade. Downgrade has already been handled above. Assert it
  1217. // is an upgrade here.
  1218. auto is_upgrade = !lock_info.exclusive && txn_lock_info.exclusive;
  1219. if (!is_upgrade) {
  1220. if (kDebugLog) {
  1221. fprintf(stderr,
  1222. "txn id %" PRIu64 " assert failed on lock upgrade key %s\n",
  1223. my_txn_id, key.c_str());
  1224. fflush(stderr);
  1225. }
  1226. assert(is_upgrade);
  1227. return Status::Aborted(Status::SubCode::kNotExpectedCodePath);
  1228. }
  1229. isUpgrade = true;
  1230. }
  1231. }
  1232. }
  1233. return Status::OK();
  1234. }
  1235. // This function is similar to PointLockManager::AcquireLocked with following
  1236. // differences.
  1237. //
  1238. // It introduces a per key lock waiter queue. When it tries to take the lock, it
  1239. // will first check whether there are other transactions already in the waiter
  1240. // queue, if so it will return TimeOut. Caller will join the waiter queue, if
  1241. // lock timeout is not reached yet. When it is its to take the lock, it will be
  1242. // woken up and take the lock.
  1243. //
  1244. // It introduces a fast path check that will quickly check whether the lock
  1245. // could be obtained without gathering waiter id information. This allows
  1246. // transaction to sleep a short time before perform deadlock detection.
  1247. //
  1248. // @param lock_info_ptr: pointer to the LockInfo associated with the key. If the
  1249. // key is already locked, LockInfo will be not null. If not, LockInfo is
  1250. // null, and a new LockInfo is created and assigned to lock_info_ptr.
  1251. //
  1252. // @param wait_ids: When wait_ids is nullptr, it perform a fast path check to
  1253. // see whether it could take the lock, it does not fill waiter_ids. If
  1254. // wait_ids is not nullptr, it will fill the wait_ids with the lock holder.
  1255. //
  1256. // @param isUpgrade: isUpgrade is set to true, if the transaction tries to
  1257. // uprade a lock to exclusive, but it needs to wait for other lock holders to
  1258. // release the shared locks. Note that isUpgrade is not set on fast path
  1259. // check.
  1260. //
  1261. // @param fifo: fifo flag indicates whether it should follow fifo order to check
  1262. // whether there is already a waiter waiting for the lock or not. If fifo is
  1263. // true and there is already a lock waiter waiting in the queue and it is not
  1264. // itself, return TimedOut. If fifo is false, it means it is its turn to take
  1265. // the lock.
  1266. Status PerKeyPointLockManager::AcquireLocked(
  1267. LockMap* lock_map, LockMapStripe* stripe, const std::string& key, Env* env,
  1268. const LockInfo& txn_lock_info, uint64_t* expire_time,
  1269. autovector<TransactionID>* wait_ids, LockInfo** lock_info_ptr,
  1270. bool* isUpgrade, bool fifo) {
  1271. assert(txn_lock_info.txn_ids.size() == 1);
  1272. if (wait_ids != nullptr) {
  1273. wait_ids->clear();
  1274. }
  1275. *isUpgrade = false;
  1276. auto my_txn_id = txn_lock_info.txn_ids[0];
  1277. if (!*lock_info_ptr) {
  1278. // No lock nor waiter on this key, so it can try to acquire the lock
  1279. // directly
  1280. if (max_num_locks_ > 0 &&
  1281. lock_map->locked_key_cnt.LoadRelaxed() >= max_num_locks_) {
  1282. return Status::LockLimit();
  1283. } else {
  1284. // acquire lock
  1285. auto ret = stripe->keys.try_emplace(key, my_txn_id,
  1286. txn_lock_info.expiration_time,
  1287. txn_lock_info.exclusive);
  1288. assert(ret.second);
  1289. *lock_info_ptr = &(ret.first->second);
  1290. // Maintain lock count if there is a limit on the number of locks
  1291. if (max_num_locks_ > 0) {
  1292. lock_map->locked_key_cnt.FetchAddRelaxed(1);
  1293. }
  1294. return Status::OK();
  1295. }
  1296. }
  1297. auto& lock_info = **lock_info_ptr;
  1298. auto locked = !lock_info.txn_ids.empty();
  1299. auto solo_lock_owner =
  1300. (lock_info.txn_ids.size() == 1) && (lock_info.txn_ids[0] == my_txn_id);
  1301. // Handle lock downgrade and reentrant first, it should always succeed
  1302. if (locked) {
  1303. if (solo_lock_owner) {
  1304. // Lock is already owned by itself.
  1305. if (lock_info.exclusive && !txn_lock_info.exclusive) {
  1306. // For downgrade, wake up all the shared lock waiters at the front of
  1307. // the waiter queue
  1308. if (lock_info.waiter_queue != nullptr) {
  1309. for (auto& waiter : *lock_info.waiter_queue) {
  1310. if (waiter->exclusive) {
  1311. break;
  1312. }
  1313. waiter->Notify();
  1314. DebugWakeUpWaiter(my_txn_id, waiter->id, key, "Lock Downgrade");
  1315. }
  1316. }
  1317. }
  1318. if (lock_info.exclusive || !txn_lock_info.exclusive) {
  1319. // If it is lock downgrade or re-entrant, grant it immediately
  1320. lock_info.exclusive = txn_lock_info.exclusive;
  1321. lock_info.expiration_time = txn_lock_info.expiration_time;
  1322. return Status::OK();
  1323. }
  1324. } else {
  1325. // handle read reentrant lock for non solo lock owner case
  1326. // Check whether the transaction already hold a shared lock and it is
  1327. // trying to acquire it again.
  1328. if (!txn_lock_info.exclusive && !lock_info.exclusive) {
  1329. auto lock_it = std::find(lock_info.txn_ids.begin(),
  1330. lock_info.txn_ids.end(), my_txn_id);
  1331. if (lock_it != lock_info.txn_ids.end()) {
  1332. lock_info.expiration_time = std::max(lock_info.expiration_time,
  1333. txn_lock_info.expiration_time);
  1334. return Status::OK();
  1335. }
  1336. }
  1337. }
  1338. }
  1339. auto has_waiter =
  1340. (lock_info.waiter_queue != nullptr) && !lock_info.waiter_queue->empty();
  1341. // Update solo lock owner for the rest of the cases
  1342. if (solo_lock_owner) {
  1343. // If there is a shared lock waiter that is ready to take the lock, the
  1344. // current transaction would not be the solo lock owner.
  1345. auto has_ready_shared_lock_waiter =
  1346. has_waiter && lock_info.waiter_queue->front()->IsReady() &&
  1347. (!lock_info.waiter_queue->front()->exclusive);
  1348. solo_lock_owner = !has_ready_shared_lock_waiter;
  1349. }
  1350. // If myself is the first waiter in the queue, skip checking waiter queue
  1351. auto is_first_waiter =
  1352. has_waiter && (lock_info.waiter_queue->front()->id == my_txn_id);
  1353. if (fifo && has_waiter && !is_first_waiter) {
  1354. // There are other waiters ahead of myself
  1355. {
  1356. // handle shared lock request on a shared lock with only shared lock
  1357. // waiters
  1358. if (!txn_lock_info.exclusive &&
  1359. (!locked || (locked && !lock_info.exclusive))) {
  1360. bool has_exclusive_waiter = false;
  1361. // check whether there is exclusive lock waiter
  1362. for (auto& waiter : *lock_info.waiter_queue) {
  1363. if (waiter->exclusive) {
  1364. has_exclusive_waiter = true;
  1365. break;
  1366. }
  1367. }
  1368. if (!has_exclusive_waiter) {
  1369. // no X waiter in the queue, so it can acquire the lock without
  1370. // waiting
  1371. lock_info.txn_ids.push_back(my_txn_id);
  1372. lock_info.exclusive = false;
  1373. lock_info.expiration_time = std::max(lock_info.expiration_time,
  1374. txn_lock_info.expiration_time);
  1375. return Status::OK();
  1376. }
  1377. }
  1378. }
  1379. // fast path check for lock upgrade
  1380. if (solo_lock_owner && !lock_info.exclusive && txn_lock_info.exclusive) {
  1381. // During lock upgrade, if it is the only transaction owns the lock and no
  1382. // other shared lock requesting transaction is ready to take the lock,
  1383. // prioritize the lock grade and grant it now.
  1384. lock_info.exclusive = txn_lock_info.exclusive;
  1385. lock_info.expiration_time = txn_lock_info.expiration_time;
  1386. return Status::OK();
  1387. }
  1388. if (wait_ids == nullptr) {
  1389. // If wait_ids is nullptr, it is a fast path check to see whether it is
  1390. // able to take the lock or not, skip filling the waiting txn ids for
  1391. // deadlock detection.
  1392. return Status::TimedOut(Status::SubCode::kLockTimeout);
  1393. }
  1394. // For other cases with fifo and lock waiter, try to wait in the queue
  1395. // and fill the waiting txn list
  1396. auto s = FillWaitIds(lock_info, txn_lock_info, wait_ids, *isUpgrade,
  1397. my_txn_id, key);
  1398. if (!s.ok()) {
  1399. // propagate error up
  1400. return s;
  1401. }
  1402. // Add the waiter txn ids to the blocking txn id list
  1403. if (txn_lock_info.exclusive) {
  1404. // For exclusive lock, it traverse the queue from front to back to
  1405. // handle upgrade
  1406. for (auto& waiter : *lock_info.waiter_queue) {
  1407. // For upgrade locks, it will be placed at the beginning of
  1408. // the queue. However, for shared lock waiters that are at
  1409. // the beginning of the queue that got woken up but haven't
  1410. // taken the lock yet, they should still be added to the
  1411. // blocking txn id list.
  1412. if (*isUpgrade && waiter->exclusive) {
  1413. break;
  1414. }
  1415. if (waiter->id != my_txn_id) {
  1416. wait_ids->push_back(waiter->id);
  1417. }
  1418. }
  1419. } else {
  1420. // For shared lock, skip the S lock waiters at the end of the queue, as
  1421. // they will be waked up together. Therefore, it traverses the queue from
  1422. // from back to front.
  1423. bool skip_shared_lock_waiter = true;
  1424. for (auto it = lock_info.waiter_queue->rbegin();
  1425. it != lock_info.waiter_queue->rend(); ++it) {
  1426. if ((*it)->exclusive) {
  1427. skip_shared_lock_waiter = false;
  1428. } else {
  1429. if (skip_shared_lock_waiter) {
  1430. continue;
  1431. }
  1432. }
  1433. if ((*it)->id != my_txn_id) {
  1434. wait_ids->push_back((*it)->id);
  1435. }
  1436. }
  1437. }
  1438. return Status::TimedOut(Status::SubCode::kLockTimeout);
  1439. } else {
  1440. // there is no waiter or it is its turn to take the lock
  1441. if (!locked) {
  1442. // no lock on this key, acquire it directly
  1443. lock_info.txn_ids = txn_lock_info.txn_ids;
  1444. lock_info.exclusive = txn_lock_info.exclusive;
  1445. lock_info.expiration_time = txn_lock_info.expiration_time;
  1446. return Status::OK();
  1447. }
  1448. if (IsLockExpired(my_txn_id, lock_info, env, expire_time)) {
  1449. // current lock is expired, steal it.
  1450. lock_info.txn_ids = txn_lock_info.txn_ids;
  1451. lock_info.exclusive = txn_lock_info.exclusive;
  1452. lock_info.expiration_time = txn_lock_info.expiration_time;
  1453. return Status::OK();
  1454. }
  1455. // Check lock compatibility
  1456. if (txn_lock_info.exclusive) {
  1457. // handle lock upgrade
  1458. if (solo_lock_owner) {
  1459. // Lock re-entrant or downgrade has already been handled above.
  1460. // Assert it is an upgrade here. Acquire the lock directly.
  1461. assert(!lock_info.exclusive);
  1462. lock_info.exclusive = txn_lock_info.exclusive;
  1463. lock_info.expiration_time = txn_lock_info.expiration_time;
  1464. return Status::OK();
  1465. } else {
  1466. // lock is already owned by other transactions
  1467. auto s = FillWaitIds(lock_info, txn_lock_info, wait_ids, *isUpgrade,
  1468. my_txn_id, key);
  1469. if (!s.ok()) {
  1470. // propagate error up
  1471. return s;
  1472. }
  1473. return Status::TimedOut(Status::SubCode::kLockTimeout);
  1474. }
  1475. } else {
  1476. // handle shared lock request
  1477. if (lock_info.exclusive) {
  1478. // lock is already owned by other exclusive lock
  1479. auto s = FillWaitIds(lock_info, txn_lock_info, wait_ids, *isUpgrade,
  1480. my_txn_id, key);
  1481. if (!s.ok()) {
  1482. // propagate error up
  1483. return s;
  1484. }
  1485. return Status::TimedOut(Status::SubCode::kLockTimeout);
  1486. } else {
  1487. // lock is on shared lock state, acquire it
  1488. lock_info.txn_ids.push_back(my_txn_id);
  1489. // update the expiration time
  1490. lock_info.expiration_time =
  1491. std::max(lock_info.expiration_time, txn_lock_info.expiration_time);
  1492. return Status::OK();
  1493. }
  1494. }
  1495. }
  1496. }
// Releases the point lock held by `txn` on `key` inside the given stripe.
// The caller must already hold `stripe->stripe_mutex`. Handles three cases:
// exclusive release, shared release with a possible pending lock upgrade,
// and release by the last shared holder. If the key is not held by this
// transaction, the lock must have been stolen after expiration (asserted
// in debug builds via `env`).
void PerKeyPointLockManager::UnLockKey(PessimisticTransaction* txn,
                                       const std::string& key,
                                       LockMapStripe* stripe, LockMap* lock_map,
                                       Env* env) {
#ifdef NDEBUG
  // `env` is only consumed by the expiration assert below.
  (void)env;
#endif
  TransactionID txn_id = txn->GetID();
  auto stripe_iter = stripe->keys.find(key);
  if (stripe_iter != stripe->keys.end()) {
    auto& lock_info = stripe_iter->second;
    auto& txns = lock_info.txn_ids;
    auto txn_it = std::find(txns.begin(), txns.end(), txn_id);
    if (txn_it != txns.end()) {
      // If the lock was held in exclusive mode, only one transaction should
      // holding it.
      if (lock_info.exclusive) {
        assert(txns.size() == 1);
        // Sole holder leaving: release the key entry and wake waiters as
        // needed.
        stripe->ReleaseLastLockHolder(lock_info, stripe_iter, lock_map, txn_id,
                                      key, max_num_locks_, txns, txn_it);
      } else {
        // In shared mode, it is possible that another transaction is holding
        // a shared lock and is waiting to upgrade the lock to exclusive.
        assert(txns.size() >= 1);
        if (txns.size() > 2) {
          // Including the current transaction, if there are more than 2
          // transactions holding the lock in shared mode, don't wake up any
          // waiter, as the next waiter will not be able to acquire the lock
          // anyway.
          RemoveTransaction(txns, txn_it);
        } else if (txns.size() == 2) {
          // remove the current transaction first.
          RemoveTransaction(txns, txn_it);
          // Check whether the one remained is trying to upgrade the lock by
          // checking whether its id matches.
          auto& waiter_queue = lock_info.waiter_queue;
          if (waiter_queue != nullptr && !waiter_queue->empty() &&
              waiter_queue->front()->id == txns[0]) {
            // There are waiters in the queue and the next one is same as the
            // only one that is still holding the shared lock, wake the waiter
            // up
            waiter_queue->front()->Notify();
            DebugWakeUpWaiter(txn_id, waiter_queue->front()->id, key,
                              "Lock Upgrade");
          }
        } else {
          // Current transaction is the only one holding the shared lock
          stripe->ReleaseLastLockHolder(lock_info, stripe_iter, lock_map,
                                        txn_id, key, max_num_locks_, txns,
                                        txn_it);
        }
      }
    }
    // If txn_id is not among the holders, silently do nothing: the lock was
    // presumably stolen after this transaction expired.
  } else {
    // This key is either not locked or locked by someone else. This should
    // only happen if the unlocking transaction has expired.
    assert(txn->GetExpirationTime() > 0 &&
           txn->GetExpirationTime() < env->NowMicros());
  }
}
  1557. void PerKeyPointLockManager::UnLock(PessimisticTransaction* txn,
  1558. ColumnFamilyId column_family_id,
  1559. const std::string& key, Env* env) {
  1560. std::shared_ptr<LockMap> lock_map_ptr = GetLockMap(column_family_id);
  1561. LockMap* lock_map = lock_map_ptr.get();
  1562. if (lock_map == nullptr) {
  1563. // Column Family must have been dropped.
  1564. return;
  1565. }
  1566. // Lock the mutex for the stripe that this key hashes to
  1567. size_t stripe_num = lock_map->GetStripe(key);
  1568. assert(lock_map->lock_map_stripes_.size() > stripe_num);
  1569. LockMapStripe* stripe = lock_map->lock_map_stripes_.at(stripe_num);
  1570. stripe->stripe_mutex->Lock().AssertOK();
  1571. UnLockKey(txn, key, stripe, lock_map, env);
  1572. stripe->stripe_mutex->UnLock();
  1573. }
  1574. void PerKeyPointLockManager::UnLock(PessimisticTransaction* txn,
  1575. const LockTracker& tracker, Env* env) {
  1576. std::unique_ptr<LockTracker::ColumnFamilyIterator> cf_it(
  1577. tracker.GetColumnFamilyIterator());
  1578. assert(cf_it != nullptr);
  1579. while (cf_it->HasNext()) {
  1580. ColumnFamilyId cf = cf_it->Next();
  1581. std::shared_ptr<LockMap> lock_map_ptr = GetLockMap(cf);
  1582. LockMap* lock_map = lock_map_ptr.get();
  1583. if (!lock_map) {
  1584. // Column Family must have been dropped.
  1585. return;
  1586. }
  1587. // Bucket keys by lock_map_ stripe
  1588. UnorderedMap<size_t, std::vector<const std::string*>> keys_by_stripe(
  1589. lock_map->num_stripes_);
  1590. std::unique_ptr<LockTracker::KeyIterator> key_it(
  1591. tracker.GetKeyIterator(cf));
  1592. assert(key_it != nullptr);
  1593. while (key_it->HasNext()) {
  1594. const std::string& key = key_it->Next();
  1595. size_t stripe_num = lock_map->GetStripe(key);
  1596. keys_by_stripe[stripe_num].push_back(&key);
  1597. }
  1598. // For each stripe, grab the stripe mutex and unlock all keys in this
  1599. // stripe
  1600. for (auto& stripe_iter : keys_by_stripe) {
  1601. size_t stripe_num = stripe_iter.first;
  1602. auto& stripe_keys = stripe_iter.second;
  1603. assert(lock_map->lock_map_stripes_.size() > stripe_num);
  1604. LockMapStripe* stripe = lock_map->lock_map_stripes_.at(stripe_num);
  1605. stripe->stripe_mutex->Lock().AssertOK();
  1606. for (const std::string* key : stripe_keys) {
  1607. UnLockKey(txn, *key, stripe, lock_map, env);
  1608. }
  1609. stripe->stripe_mutex->UnLock();
  1610. }
  1611. }
  1612. }
// Range unlock is intentionally a no-op for this manager: it only tracks
// per-key point locks, so there are no range locks to release. The overload
// exists to satisfy the LockManager interface.
void PerKeyPointLockManager::UnLock(PessimisticTransaction* /* txn */,
                                    ColumnFamilyId /* cf_id */,
                                    const Endpoint& /* start */,
                                    const Endpoint& /* end */, Env* /* env */) {
  // no-op
}
  1619. } // namespace ROCKSDB_NAMESPACE