// point_lock_manager_test.cc
  1. // Copyright (c) 2020-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include "utilities/transactions/lock/point/point_lock_manager_test.h"
  6. #include "utilities/transactions/lock/point/any_lock_manager_test.h"
  7. namespace ROCKSDB_NAMESPACE {
// Parameter pack for SpotLockManagerTest: selects which point-lock manager
// implementation a test instance runs against.
struct SpotLockManagerTestParam {
  // true -> PerKeyPointLockManager, false -> PointLockManager.
  bool use_per_key_point_lock_manager;
  // Copied into PointLockManagerTest::deadlock_timeout_us in SetUp();
  // presumably the deadlock-detection wait timeout in microseconds — see the
  // base fixture for the exact semantics.
  int deadlock_timeout_us;
};
  12. // Define operator<< for SpotLockManagerTestParam to stop valgrind from
  13. // complaining uinitialized value when printing SpotLockManagerTestParam.
  14. std::ostream& operator<<(std::ostream& os,
  15. const SpotLockManagerTestParam& param) {
  16. os << "use_per_key_point_lock_manager: "
  17. << param.use_per_key_point_lock_manager
  18. << ", deadlock_timeout_us: " << param.deadlock_timeout_us;
  19. return os;
  20. }
  21. // including test for both PointLockManager and PerKeyPointLockManager
  22. class SpotLockManagerTest
  23. : public PointLockManagerTest,
  24. public testing::WithParamInterface<SpotLockManagerTestParam> {
  25. public:
  26. void SetUp() override {
  27. init();
  28. // If a custom setup function was provided, use it. Otherwise, use what we
  29. // have inherited.
  30. auto param = GetParam();
  31. if (param.use_per_key_point_lock_manager) {
  32. locker_.reset(new PerKeyPointLockManager(
  33. static_cast<PessimisticTransactionDB*>(db_), txndb_opt_));
  34. } else {
  35. locker_.reset(new PointLockManager(
  36. static_cast<PessimisticTransactionDB*>(db_), txndb_opt_));
  37. }
  38. deadlock_timeout_us = param.deadlock_timeout_us;
  39. }
  40. };
  41. // This test is not applicable for Range Lock manager as Range Lock Manager
  42. // operates on Column Families, not their ids.
  43. TEST_P(SpotLockManagerTest, LockNonExistingColumnFamily) {
  44. MockColumnFamilyHandle cf(1024);
  45. locker_->RemoveColumnFamily(&cf);
  46. auto txn = NewTxn();
  47. auto s = locker_->TryLock(txn, 1024, "k", env_, true);
  48. ASSERT_TRUE(s.IsInvalidArgument());
  49. ASSERT_STREQ(s.getState(), "Column family id not found: 1024");
  50. delete txn;
  51. }
  52. TEST_P(SpotLockManagerTest, LockStatus) {
  53. MockColumnFamilyHandle cf1(1024), cf2(2048);
  54. locker_->AddColumnFamily(&cf1);
  55. locker_->AddColumnFamily(&cf2);
  56. auto txn1 = NewTxn();
  57. ASSERT_OK(locker_->TryLock(txn1, 1024, "k1", env_, true));
  58. ASSERT_OK(locker_->TryLock(txn1, 2048, "k1", env_, true));
  59. auto txn2 = NewTxn();
  60. ASSERT_OK(locker_->TryLock(txn2, 1024, "k2", env_, false));
  61. ASSERT_OK(locker_->TryLock(txn2, 2048, "k2", env_, false));
  62. auto s = locker_->GetPointLockStatus();
  63. ASSERT_EQ(s.size(), 4u);
  64. for (uint32_t cf_id : {1024, 2048}) {
  65. ASSERT_EQ(s.count(cf_id), 2u);
  66. auto range = s.equal_range(cf_id);
  67. for (auto it = range.first; it != range.second; it++) {
  68. ASSERT_TRUE(it->second.key == "k1" || it->second.key == "k2");
  69. if (it->second.key == "k1") {
  70. ASSERT_EQ(it->second.exclusive, true);
  71. ASSERT_EQ(it->second.ids.size(), 1u);
  72. ASSERT_EQ(it->second.ids[0], txn1->GetID());
  73. } else if (it->second.key == "k2") {
  74. ASSERT_EQ(it->second.exclusive, false);
  75. ASSERT_EQ(it->second.ids.size(), 1u);
  76. ASSERT_EQ(it->second.ids[0], txn2->GetID());
  77. }
  78. }
  79. }
  80. // Cleanup
  81. locker_->UnLock(txn1, 1024, "k1", env_);
  82. locker_->UnLock(txn1, 2048, "k1", env_);
  83. locker_->UnLock(txn2, 1024, "k2", env_);
  84. locker_->UnLock(txn2, 2048, "k2", env_);
  85. delete txn1;
  86. delete txn2;
  87. }
  88. TEST_P(SpotLockManagerTest, UnlockExclusive) {
  89. MockColumnFamilyHandle cf(1);
  90. locker_->AddColumnFamily(&cf);
  91. auto txn1 = NewTxn();
  92. ASSERT_OK(locker_->TryLock(txn1, 1, "k", env_, true));
  93. locker_->UnLock(txn1, 1, "k", env_);
  94. auto txn2 = NewTxn();
  95. ASSERT_OK(locker_->TryLock(txn2, 1, "k", env_, true));
  96. // Cleanup
  97. locker_->UnLock(txn2, 1, "k", env_);
  98. delete txn1;
  99. delete txn2;
  100. }
  101. TEST_P(SpotLockManagerTest, UnlockShared) {
  102. MockColumnFamilyHandle cf(1);
  103. locker_->AddColumnFamily(&cf);
  104. auto txn1 = NewTxn();
  105. ASSERT_OK(locker_->TryLock(txn1, 1, "k", env_, false));
  106. locker_->UnLock(txn1, 1, "k", env_);
  107. auto txn2 = NewTxn();
  108. ASSERT_OK(locker_->TryLock(txn2, 1, "k", env_, true));
  109. // Cleanup
  110. locker_->UnLock(txn2, 1, "k", env_);
  111. delete txn1;
  112. delete txn2;
  113. }
// This test doesn't work with Range Lock Manager, because Range Lock Manager
// doesn't support deadlock_detect_depth.
TEST_P(SpotLockManagerTest, DeadlockDepthExceeded) {
  // Tests that when detecting deadlock, if the detection depth is exceeded,
  // it's also viewed as deadlock.
  MockColumnFamilyHandle cf(1);
  locker_->AddColumnFamily(&cf);
  TransactionOptions txn_opt;
  txn_opt.deadlock_detect = true;
  txn_opt.deadlock_detect_depth = 1;
  txn_opt.lock_timeout = kLongTxnTimeoutMs;
  auto txn1 = NewTxn(txn_opt);
  auto txn2 = NewTxn(txn_opt);
  auto txn3 = NewTxn(txn_opt);
  auto txn4 = NewTxn(txn_opt);
  // "a ->(k) b" means transaction a is waiting for transaction b to release
  // the held lock on key k.
  // txn4 ->(k3) -> txn3 ->(k2) txn2 ->(k1) txn1
  // txn3's deadlock detection will exceed the detection depth 1,
  // which will be viewed as a deadlock.
  // NOTE:
  // txn4 ->(k3) -> txn3 must be set up before
  // txn3 ->(k2) -> txn2, because to trigger deadlock detection for txn3,
  // it must have another txn waiting on it, which is txn4 in this case.
  ASSERT_OK(locker_->TryLock(txn1, 1, "k1", env_, true));
  port::Thread t1;
  BlockUntilWaitingTxn(wait_sync_point_name_, t1, [&]() {
    ASSERT_OK(locker_->TryLock(txn2, 1, "k2", env_, true));
    // block because txn1 is holding a lock on k1.
    ASSERT_OK(locker_->TryLock(txn2, 1, "k1", env_, true));
  });
  ASSERT_OK(locker_->TryLock(txn3, 1, "k3", env_, true));
  port::Thread t2;
  BlockUntilWaitingTxn(wait_sync_point_name_, t2, [&]() {
    // block because txn3 is holding a lock on k3.
    ASSERT_OK(locker_->TryLock(txn4, 1, "k3", env_, true));
  });
  // txn3 requests k2 (held by txn2, which is itself waiting); walking the
  // wait chain exceeds deadlock_detect_depth = 1 and is reported as deadlock.
  auto s = locker_->TryLock(txn3, 1, "k2", env_, true);
  ASSERT_TRUE(s.IsBusy());
  ASSERT_EQ(s.subcode(), Status::SubCode::kDeadlock);
  std::vector<DeadlockPath> deadlock_paths = locker_->GetDeadlockInfoBuffer();
  ASSERT_EQ(deadlock_paths.size(), 1u);
  ASSERT_TRUE(deadlock_paths[0].limit_exceeded);
  // Release the held locks so the two blocked threads can acquire and finish.
  locker_->UnLock(txn1, 1, "k1", env_);
  locker_->UnLock(txn3, 1, "k3", env_);
  t1.join();
  t2.join();
  locker_->UnLock(txn2, 1, "k2", env_);
  locker_->UnLock(txn2, 1, "k1", env_);
  locker_->UnLock(txn4, 1, "k3", env_);
  delete txn4;
  delete txn3;
  delete txn2;
  delete txn1;
}
TEST_P(SpotLockManagerTest, PrioritizedLockUpgradeWithExclusiveLock) {
  // Tests that a lock upgrade request is prioritized over other lock requests.
  // txn1 acquires shared lock on k1.
  // txn2 requests exclusive lock on k1 and blocks.
  // txn1 upgrades to an exclusive lock on k1 successfully, ahead of txn2.
  MockColumnFamilyHandle cf(1);
  locker_->AddColumnFamily(&cf);
  TransactionOptions txn_opt;
  txn_opt.deadlock_detect = true;
  txn_opt.lock_timeout = kLongTxnTimeoutMs;
  auto txn1 = NewTxn(txn_opt);
  auto txn2 = NewTxn(txn_opt);
  ASSERT_OK(locker_->TryLock(txn1, 1, "k1", env_, false));
  // txn2 tries to lock k1 exclusively, will be blocked.
  port::Thread t;
  BlockUntilWaitingTxn(wait_sync_point_name_, t, [this, &txn2]() {
    // block because txn1 is holding a shared lock on k1.
    ASSERT_OK(locker_->TryLock(txn2, 1, "k1", env_, true));
  });
  // verify the upgrade succeeds even though txn2 was already waiting
  ASSERT_OK(locker_->TryLock(txn1, 1, "k1", env_, true));
  // unlock txn1, so txn2 could proceed
  locker_->UnLock(txn1, 1, "k1", env_);
  t.join();
  // Cleanup
  locker_->UnLock(txn2, 1, "k1", env_);
  delete txn2;
  delete txn1;
}
TEST_P(SpotLockManagerTest,
       PrioritizedLockUpgradeWithExclusiveLockAndSharedLock) {
  // Tests that lock upgrade is prioritized when mixed with shared and exclusive
  // locks requests:
  // txn1 acquires shared lock on k1.
  // txn2 acquires shared lock on k1.
  // txn3 requests exclusive lock on k1 and blocks.
  // txn1 requests an upgrade to exclusive on k1 <- granted (ahead of txn3)
  // as soon as txn2 releases its shared lock.
  MockColumnFamilyHandle cf(1);
  locker_->AddColumnFamily(&cf);
  TransactionOptions txn_opt;
  txn_opt.deadlock_detect = true;
  txn_opt.lock_timeout = kLongTxnTimeoutMs;
  auto txn1 = NewTxn(txn_opt);
  auto txn2 = NewTxn(txn_opt);
  auto txn3 = NewTxn(txn_opt);
  ASSERT_OK(locker_->TryLock(txn1, 1, "k1", env_, false));
  ASSERT_OK(locker_->TryLock(txn2, 1, "k1", env_, false));
  // txn3 tries to lock k1 exclusively, will be blocked.
  port::Thread txn3_thread;
  BlockUntilWaitingTxn(wait_sync_point_name_, txn3_thread, [this, &txn3]() {
    // block because txn1 and txn2 are holding a shared lock on k1.
    ASSERT_OK(locker_->TryLock(txn3, 1, "k1", env_, true));
  });
  // Sanity check: txn3's thread has started and not been joined yet.
  ASSERT_TRUE(txn3_thread.joinable());
  // txn1 tries to upgrade its lock on k1 to exclusive, will be blocked.
  port::Thread txn1_thread;
  BlockUntilWaitingTxn(wait_sync_point_name_, txn1_thread, [this, &txn1]() {
    // block because txn2 still holds a shared lock on k1.
    ASSERT_OK(locker_->TryLock(txn1, 1, "k1", env_, true));
  });
  // Sanity check: txn1's thread has started and not been joined yet.
  ASSERT_TRUE(txn1_thread.joinable());
  // Unlock txn2, so txn1 could proceed
  locker_->UnLock(txn2, 1, "k1", env_);
  txn1_thread.join();
  // Unlock txn1, so txn3 could proceed
  locker_->UnLock(txn1, 1, "k1", env_);
  txn3_thread.join();
  // Cleanup
  locker_->UnLock(txn3, 1, "k1", env_);
  delete txn3;
  delete txn2;
  delete txn1;
}
TEST_P(SpotLockManagerTest, Deadlock_MultipleUpgrade) {
  // Tests that deadlock can be detected for shared locks and exclusive locks
  // mixed. Deadlock scenario:
  // txn1 acquires shared lock on k1.
  // txn2 acquires shared lock on k1.
  // txn1 requests an upgrade to exclusive on k1 (blocks on txn2).
  // txn2 requests an upgrade to exclusive on k1 <- dead lock detected
  MockColumnFamilyHandle cf(1);
  locker_->AddColumnFamily(&cf);
  TransactionOptions txn_opt;
  txn_opt.deadlock_detect = true;
  txn_opt.lock_timeout = kLongTxnTimeoutMs;
  auto txn1 = NewTxn(txn_opt);
  auto txn2 = NewTxn(txn_opt);
  ASSERT_OK(locker_->TryLock(txn1, 1, "k1", env_, false));
  ASSERT_OK(locker_->TryLock(txn2, 1, "k1", env_, false));
  // txn1 tries to lock k1 exclusively, will be blocked.
  port::Thread t;
  BlockUntilWaitingTxn(wait_sync_point_name_, t, [this, &txn1]() {
    // block because txn2 is holding a shared lock on k1.
    ASSERT_OK(locker_->TryLock(txn1, 1, "k1", env_, true));
  });
  auto s = locker_->TryLock(txn2, 1, "k1", env_, true);
  ASSERT_TRUE(s.IsBusy());
  ASSERT_EQ(s.subcode(), Status::SubCode::kDeadlock);
  // Validate the recorded deadlock path: both txns waiting to upgrade on k1.
  std::vector<DeadlockPath> deadlock_paths = locker_->GetDeadlockInfoBuffer();
  ASSERT_EQ(deadlock_paths.size(), 1u);
  ASSERT_FALSE(deadlock_paths[0].limit_exceeded);
  std::vector<DeadlockInfo> deadlocks = deadlock_paths[0].path;
  ASSERT_EQ(deadlocks.size(), 2u);
  ASSERT_EQ(deadlocks[0].m_txn_id, txn1->GetID());
  ASSERT_EQ(deadlocks[0].m_cf_id, 1u);
  ASSERT_TRUE(deadlocks[0].m_exclusive);
  ASSERT_EQ(deadlocks[0].m_waiting_key, "k1");
  ASSERT_EQ(deadlocks[1].m_txn_id, txn2->GetID());
  ASSERT_EQ(deadlocks[1].m_cf_id, 1u);
  ASSERT_TRUE(deadlocks[1].m_exclusive);
  ASSERT_EQ(deadlocks[1].m_waiting_key, "k1");
  // txn2 releases its shared lock so txn1's pending upgrade can complete.
  locker_->UnLock(txn2, 1, "k1", env_);
  t.join();
  // Cleanup
  locker_->UnLock(txn1, 1, "k1", env_);
  delete txn2;
  delete txn1;
}
TEST_P(SpotLockManagerTest, Deadlock_MultipleUpgradeInterleaveExclusive) {
  // Tests that deadlock can be detected for shared locks and exclusive locks
  // mixed. Deadlock scenario:
  // txn1 acquires shared lock on k1.
  // txn2 acquires shared lock on k1.
  // txn3 requests exclusive lock on k1 and blocks.
  // txn1 requests an upgrade to exclusive on k1 <- would be granted once
  // txn2 releases its lock.
  // txn2 requests an upgrade to exclusive on k1 <- dead lock detected
  MockColumnFamilyHandle cf(1);
  locker_->AddColumnFamily(&cf);
  TransactionOptions txn_opt;
  txn_opt.deadlock_detect = true;
  txn_opt.lock_timeout = kLongTxnTimeoutMs;
  auto txn1 = NewTxn(txn_opt);
  auto txn2 = NewTxn(txn_opt);
  auto txn3 = NewTxn(txn_opt);
  ASSERT_OK(locker_->TryLock(txn1, 1, "k1", env_, false));
  ASSERT_OK(locker_->TryLock(txn2, 1, "k1", env_, false));
  // txn3 tries to lock k1 exclusively, will be blocked.
  port::Thread txn3_thread;
  BlockUntilWaitingTxn(wait_sync_point_name_, txn3_thread, [this, &txn3]() {
    // block because txn1 and txn2 are holding a shared lock on k1.
    ASSERT_OK(locker_->TryLock(txn3, 1, "k1", env_, true));
  });
  // Sanity check: txn3's thread has started and not been joined yet.
  ASSERT_TRUE(txn3_thread.joinable());
  // txn1 tries to upgrade its lock on k1 to exclusive, will be blocked.
  port::Thread txn1_thread;
  BlockUntilWaitingTxn(wait_sync_point_name_, txn1_thread, [this, &txn1]() {
    // block because txn2 still holds a shared lock on k1.
    ASSERT_OK(locker_->TryLock(txn1, 1, "k1", env_, true));
  });
  // Sanity check: txn1's thread has started and not been joined yet.
  ASSERT_TRUE(txn1_thread.joinable());
  auto s = locker_->TryLock(txn2, 1, "k1", env_, true);
  ASSERT_TRUE(s.IsBusy());
  ASSERT_EQ(s.subcode(), Status::SubCode::kDeadlock);
  // Validate the recorded deadlock path: txn1 and txn2 both upgrading on k1.
  std::vector<DeadlockPath> deadlock_paths = locker_->GetDeadlockInfoBuffer();
  ASSERT_EQ(deadlock_paths.size(), 1u);
  ASSERT_FALSE(deadlock_paths[0].limit_exceeded);
  std::vector<DeadlockInfo> deadlocks = deadlock_paths[0].path;
  ASSERT_EQ(deadlocks.size(), 2u);
  ASSERT_EQ(deadlocks[0].m_txn_id, txn1->GetID());
  ASSERT_EQ(deadlocks[0].m_cf_id, 1u);
  ASSERT_TRUE(deadlocks[0].m_exclusive);
  ASSERT_EQ(deadlocks[0].m_waiting_key, "k1");
  ASSERT_EQ(deadlocks[1].m_txn_id, txn2->GetID());
  ASSERT_EQ(deadlocks[1].m_cf_id, 1u);
  ASSERT_TRUE(deadlocks[1].m_exclusive);
  ASSERT_EQ(deadlocks[1].m_waiting_key, "k1");
  // Unlock txn2, so txn1 could proceed
  locker_->UnLock(txn2, 1, "k1", env_);
  txn1_thread.join();
  // Unlock txn1, so txn3 could proceed
  locker_->UnLock(txn1, 1, "k1", env_);
  txn3_thread.join();
  // Cleanup
  locker_->UnLock(txn3, 1, "k1", env_);
  delete txn3;
  delete txn2;
  delete txn1;
}
  354. class PerKeyPointLockManagerTest : public PointLockManagerTest {
  355. public:
  356. void SetUp() override {
  357. init();
  358. cf_ = std::make_unique<MockColumnFamilyHandle>(1);
  359. txn_opt_.deadlock_detect = true;
  360. // by default use long timeout and disable expiration
  361. txn_opt_.lock_timeout = kLongTxnTimeoutMs;
  362. txn_opt_.expiration = -1;
  363. // CAUTION: This test creates a separate lock manager object (right, NOT
  364. // the one that the TransactionDB is using!), and runs tests on it.
  365. locker_.reset(new PerKeyPointLockManager(
  366. static_cast<PessimisticTransactionDB*>(db_), txndb_opt_));
  367. locker_->AddColumnFamily(cf_.get());
  368. }
  369. TransactionOptions txn_opt_;
  370. std::unique_ptr<MockColumnFamilyHandle> cf_;
  371. };
TEST_F(PerKeyPointLockManagerTest, LockEfficiency) {
  // Create multiple transactions, each acquiring an exclusive lock on the
  // same key, and verify each blocked transaction enters the wait state
  // exactly once (checked via the sync-point counter at the end).
  std::vector<PessimisticTransaction*> txns;
  std::vector<port::Thread> blockingThreads;
  // Count the total number of wait sync point calls
  std::atomic_int wait_sync_point_times = 0;
  SyncPoint::GetInstance()->SetCallBack(
      wait_sync_point_name_,
      [&wait_sync_point_times](void* /*arg*/) { wait_sync_point_times++; });
  SyncPoint::GetInstance()->EnableProcessing();
  constexpr auto num_of_txn = 10;
  // create 10 transactions, each of them try to acquire exclusive lock on the
  // same key
  for (int i = 0; i < num_of_txn; i++) {
    auto txn = NewTxn(txn_opt_);
    txns.push_back(txn);
    if (i == 0) {
      // txn0 acquires the lock, so the rest of the transactions could block
      ASSERT_OK(locker_->TryLock(txn, 1, "k1", env_, true));
    } else {
      blockingThreads.emplace_back([this, txn]() {
        // block because first txn is holding an exclusive lock on k1.
        ASSERT_OK(locker_->TryLock(txn, 1, "k1", env_, true));
      });
    }
    // wait for transaction i to be blocked (i-th hit on the wait sync point)
    while (wait_sync_point_times.load() < i) {
      std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
  }
  // unlock the key, so next transaction could take the lock.
  locker_->UnLock(txns[0], 1, "k1", env_);
  auto num_of_blocking_thread = num_of_txn - 1;
  for (int i = 0; i < num_of_blocking_thread; i++) {
    // join() returns once thread i's TryLock succeeded and the thread exited
    blockingThreads[i].join();
    auto num_of_threads_completed = i + 1;
    for (int j = 0; j < num_of_blocking_thread; j++) {
      if (j < num_of_threads_completed) {
        // already joined threads are no longer joinable
        ASSERT_FALSE(blockingThreads[j].joinable());
      } else {
        // the remaining threads are still waiting for the lock
        ASSERT_TRUE(blockingThreads[j].joinable());
      }
    }
    // unlock the key, so next transaction could take the lock.
    locker_->UnLock(txns[i + 1], 1, "k1", env_);
  }
  // Each of the 9 blocked acquisitions hit the wait sync point exactly once.
  ASSERT_EQ(wait_sync_point_times.load(), num_of_blocking_thread);
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  for (int i = 0; i < num_of_txn; i++) {
    delete txns[num_of_txn - i - 1];
  }
}
TEST_F(PerKeyPointLockManagerTest, LockFairness) {
  // Create multiple transactions requesting locks on the same key, validate
  // that they are granted in FIFO order (adjacent shared requests are
  // granted together as a group):
  // txn0 acquires exclusive lock on k1.
  // txn1 acquires shared lock on k1.
  // txn2 acquires shared lock on k1.
  // txn3 acquires exclusive lock on k1.
  // txn4 acquires shared lock on k1.
  // txn5 acquires exclusive lock on k1.
  // txn6 acquires exclusive lock on k1.
  // txn7 acquires shared lock on k1.
  // txn8 acquires shared lock on k1.
  // txn9 acquires exclusive lock on k1.
  std::vector<PessimisticTransaction*> txns;
  std::vector<port::Thread> blockingThreads;
  // Count the total number of wait sync point calls
  std::atomic_int wait_sync_point_times = 0;
  SyncPoint::GetInstance()->SetCallBack(
      wait_sync_point_name_,
      [&wait_sync_point_times](void* /*arg*/) { wait_sync_point_times++; });
  SyncPoint::GetInstance()->EnableProcessing();
  constexpr auto num_of_txn = 10;
  // true = exclusive, false = shared; index i is txn i's request type.
  std::vector<bool> txn_lock_types = {true, false, false, true, false,
                                      true, true, false, false, true};
  // create 10 transactions; txn0 takes the lock and the rest queue behind it
  // in order.
  for (int i = 0; i < num_of_txn; i++) {
    auto txn = NewTxn(txn_opt_);
    txns.push_back(txn);
    if (i == 0) {
      // txn0 acquires the lock, so the rest of the transactions would block
      ASSERT_OK(locker_->TryLock(txn, 1, "k1", env_, txn_lock_types[0]));
    } else {
      blockingThreads.emplace_back([this, txn, type = txn_lock_types[i]]() {
        ASSERT_OK(locker_->TryLock(txn, 1, "k1", env_, type));
      });
    }
    // wait for transaction i to be blocked
    while (wait_sync_point_times.load() < i) {
      std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
  }
  auto num_of_blocking_thread = num_of_txn - 1;
  auto thread_idx = 0;
  auto txn_idx = 0;
  auto unlockTxn = [&]() {
    // unlock the key held by the next transaction, in creation order.
    locker_->UnLock(txns[txn_idx++], 1, "k1", env_);
  };
  auto validateLockTakenByNextTxn = [&]() {
    // join() returns once the next queued thread's TryLock succeeded.
    blockingThreads[thread_idx++].join();
  };
  auto stillWaitingForLock = [&]() {
    // validate the next thread is still waiting: started, not yet joined.
    ASSERT_TRUE(blockingThreads[thread_idx].joinable());
  };
  // unlock the key, so next group of transactions could take the lock.
  unlockTxn();
  // txn1 acquires shared lock on k1.
  // txn2 acquires shared lock on k1.
  validateLockTakenByNextTxn();
  validateLockTakenByNextTxn();
  // txn3 (exclusive) still waits behind the shared holders.
  stillWaitingForLock();
  unlockTxn();
  unlockTxn();
  validateLockTakenByNextTxn();
  // txn4 (shared) waits behind txn3's exclusive lock, then acquires.
  stillWaitingForLock();
  unlockTxn();
  validateLockTakenByNextTxn();
  // txn5 acquires exclusive lock on k1 after txn4 releases.
  stillWaitingForLock();
  unlockTxn();
  validateLockTakenByNextTxn();
  // txn6 acquires exclusive lock on k1 after txn5 releases.
  stillWaitingForLock();
  unlockTxn();
  validateLockTakenByNextTxn();
  // txn7 and txn8 (shared) are granted together after txn6 releases.
  stillWaitingForLock();
  unlockTxn();
  validateLockTakenByNextTxn();
  validateLockTakenByNextTxn();
  // txn9 acquires exclusive lock on k1 after both shared holders release.
  stillWaitingForLock();
  unlockTxn();
  unlockTxn();
  validateLockTakenByNextTxn();
  // clean up
  unlockTxn();
  ASSERT_EQ(wait_sync_point_times.load(), num_of_blocking_thread);
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  for (int i = 0; i < num_of_txn; i++) {
    delete txns[num_of_txn - i - 1];
  }
}
TEST_F(PerKeyPointLockManagerTest, FIFO) {
  // validate S, X, S lock order would be executed in FIFO order:
  // txn0 acquires shared lock on k1.
  // txn1 requests exclusive lock on k1 and waits.
  // txn2 requests shared lock on k1 and must wait behind txn1 (no jumping
  // ahead to share with txn0).
  std::vector<PessimisticTransaction*> txns;
  std::vector<port::Thread> blockingThreads;
  // Count the total number of wait sync point calls
  std::atomic_int wait_sync_point_times = 0;
  SyncPoint::GetInstance()->SetCallBack(
      wait_sync_point_name_,
      [&wait_sync_point_times](void* /*arg*/) { wait_sync_point_times++; });
  SyncPoint::GetInstance()->EnableProcessing();
  constexpr auto num_of_txn = 3;
  // true = exclusive, false = shared; index i is txn i's request type.
  std::vector<bool> txn_lock_types = {false, true, false};
  // create 3 transactions; txn0 takes the lock and the rest queue behind it.
  for (int i = 0; i < num_of_txn; i++) {
    auto txn = NewTxn(txn_opt_);
    txns.push_back(txn);
    if (i == 0) {
      // txn0 acquires the lock, so the rest of the transactions would block
      ASSERT_OK(locker_->TryLock(txn, 1, "k1", env_, txn_lock_types[0]));
    } else {
      blockingThreads.emplace_back([this, txn, type = txn_lock_types[i]]() {
        ASSERT_OK(locker_->TryLock(txn, 1, "k1", env_, type));
      });
    }
    // wait for transaction i to be blocked
    while (wait_sync_point_times.load() < i) {
      std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
  }
  auto num_of_blocking_thread = num_of_txn - 1;
  auto thread_idx = 0;
  auto txn_idx = 0;
  auto unlockTxn = [&]() {
    // unlock the key held by the next transaction, in creation order.
    locker_->UnLock(txns[txn_idx++], 1, "k1", env_);
  };
  auto validateLockTakenByNextTxn = [&]() {
    // join() returns once the next queued thread's TryLock succeeded.
    blockingThreads[thread_idx++].join();
  };
  auto stillWaitingForLock = [&]() {
    // validate the next thread is still waiting: started, not yet joined.
    ASSERT_TRUE(blockingThreads[thread_idx].joinable());
  };
  // txn1 (exclusive) waits while txn0 still holds the shared lock.
  stillWaitingForLock();
  unlockTxn();
  // txn1 acquires exclusive lock on k1.
  validateLockTakenByNextTxn();
  // txn2 (shared) waits behind txn1 (FIFO), then acquires after release.
  stillWaitingForLock();
  unlockTxn();
  validateLockTakenByNextTxn();
  // clean up
  unlockTxn();
  ASSERT_EQ(wait_sync_point_times.load(), num_of_blocking_thread);
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  for (int i = 0; i < num_of_txn; i++) {
    delete txns[num_of_txn - i - 1];
  }
}
TEST_P(SpotLockManagerTest, LockDownGradeWithOtherLockRequests) {
  // Test lock down grade always succeeds, even if there are other lock requests
  // waiting for the same lock.
  MockColumnFamilyHandle cf(1);
  locker_->AddColumnFamily(&cf);
  TransactionOptions txn_opt;
  txn_opt.deadlock_detect = true;
  txn_opt.lock_timeout = kLongTxnTimeoutMs;
  auto txn1 = NewTxn(txn_opt);
  auto txn2 = NewTxn(txn_opt);
  // Run the scenario twice: once with txn2 waiting for an exclusive lock and
  // once waiting for a shared lock.
  for (bool exclusive : {true, false}) {
    ASSERT_OK(locker_->TryLock(txn1, 1, "k1", env_, true));
    port::Thread t;
    BlockUntilWaitingTxn(wait_sync_point_name_, t, [this, &txn2, exclusive]() {
      // block because txn1 is holding an exclusive lock on k1.
      ASSERT_OK(locker_->TryLock(txn2, 1, "k1", env_, exclusive));
    });
    // txn1 downgrades the lock to a shared lock; the downgrade itself must
    // succeed despite the waiter. The UnLock below then lets txn2 proceed.
    ASSERT_OK(locker_->TryLock(txn1, 1, "k1", env_, false));
    locker_->UnLock(txn1, 1, "k1", env_);
    t.join();
    locker_->UnLock(txn2, 1, "k1", env_);
  }
  // clean up
  delete txn2;
  delete txn1;
}
  621. TEST_P(SpotLockManagerTest, LockTimeout) {
  622. // Test lock timeout
  623. // txn1 acquires an exclusive lock on k1 successfully.
  624. // txn2 try to acquire a lock on k1, but timedout.
  625. MockColumnFamilyHandle cf(1);
  626. locker_->AddColumnFamily(&cf);
  627. TransactionOptions txn_opt;
  628. txn_opt.deadlock_detect = true;
  629. txn_opt.lock_timeout = kShortTxnTimeoutMs;
  630. auto txn1 = NewTxn(txn_opt);
  631. auto txn2 = NewTxn(txn_opt);
  632. ASSERT_OK(locker_->TryLock(txn1, 1, "k1", env_, true));
  633. for (bool exclusive : {true, false}) {
  634. auto ret = locker_->TryLock(txn2, 1, "k1", env_, exclusive);
  635. ASSERT_TRUE(ret.IsTimedOut());
  636. }
  637. // clean up
  638. locker_->UnLock(txn1, 1, "k1", env_);
  639. delete txn2;
  640. delete txn1;
  641. }
TEST_P(SpotLockManagerTest, ExpiredLockStolenAfterTimeout) {
  // validate an expired lock can be stolen by another transaction that timed
  // out on the lock.
  // txn1 acquires an exclusive lock on k1 successfully with a short expiration
  // time.
  // txn2 try to acquire a shared lock on k1 with timeout that is slightly
  // longer than the txn1 expiration.
  // Validate txn2 will take the lock.
  MockColumnFamilyHandle cf(1);
  locker_->AddColumnFamily(&cf);
  TransactionOptions txn_opt;
  txn_opt.deadlock_detect = true;
  txn_opt.expiration = 1000;        // holder's lock expires after 1s
  txn_opt.lock_timeout = 1000 * 2;  // waiter gives up only after 2s
  auto txn1 = NewTxn(txn_opt);
  auto txn2 = NewTxn(txn_opt);
  ASSERT_OK(locker_->TryLock(txn1, 1, "k1", env_, true));
  port::Thread t1;
  BlockUntilWaitingTxn(wait_sync_point_name_, t1, [this, &txn2]() {
    // block because txn1 is holding an exclusive lock on k1.
    // The call succeeds (rather than timing out) because txn1's lock
    // expires before txn2's lock_timeout elapses, so txn2 steals it.
    ASSERT_OK(locker_->TryLock(txn2, 1, "k1", env_, false));
  });
  t1.join();
  // clean up
  locker_->UnLock(txn2, 1, "k1", env_);
  locker_->UnLock(txn1, 1, "k1", env_);
  delete txn2;
  delete txn1;
}
  671. // Try to block until transaction enters waiting state.
  672. // However due to timing, it could fail, so return true if succeeded, false
  673. // otherwise.
  674. bool TryBlockUntilWaitingTxn(const char* sync_point_name, port::Thread& t,
  675. std::function<void()> function) {
  676. std::atomic<bool> reached(false);
  677. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  678. sync_point_name, [&](void* /*arg*/) { reached.store(true); });
  679. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  680. // As the lifetime of the complete variable could go beyond the scope of this
  681. // function, so we wrap it in a shared_ptr, and copy it into the lambda
  682. std::shared_ptr<std::atomic<bool>> complete =
  683. std::make_shared<std::atomic<bool>>(false);
  684. t = port::Thread([complete, &function]() {
  685. function();
  686. complete->store(true);
  687. });
  688. auto ret = false;
  689. while (true) {
  690. if (complete->load()) {
  691. // function completed, before sync point was reached, return false
  692. t.join();
  693. ret = false;
  694. break;
  695. }
  696. if (reached.load()) {
  697. // sync point was reached before function completed, return true
  698. ret = true;
  699. break;
  700. }
  701. }
  702. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  703. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
  704. return ret;
  705. }
  706. TEST_F(PerKeyPointLockManagerTest, LockStealAfterExpirationExclusive) {
  707. // There are multiple transactions waiting for the same lock.
  708. // txn1 acquires an exclusive lock on k1 successfully with a short expiration
  709. // time.
  710. // txn2 try to acquire an exclusive lock on k1, before expiration time,
  711. // so it is blocked and waits for txn1 lock expired.
  712. // txn3 try to acquire an exclusive lock on k1 after txn1 lock expires, FIFO
  713. // order is respected.
  714. // txn2 is woken up and takes the lock. unlock txn2, txn3 should proceed.
  715. txn_opt_.expiration = 1000;
  716. auto txn1 = NewTxn(txn_opt_);
  717. txn_opt_.expiration = -1;
  718. auto txn2 = NewTxn(txn_opt_);
  719. auto txn3 = NewTxn(txn_opt_);
  720. port::Thread t1;
  721. auto retry_times = 10;
  722. // Use a loop to reduce test flakiness.
  723. // that the test is flaky because the txn2 thread start could be delayed until
  724. // txn1 lock expired. In that case, txn2 will not enter into wait state, which
  725. // will defeat the test purpose. Use a loop to retry a few times, until it is
  726. // able to enter into wait state.
  727. while (retry_times--) {
  728. ASSERT_OK(locker_->TryLock(txn1, 1, "k1", env_, true));
  729. if (TryBlockUntilWaitingTxn(wait_sync_point_name_, t1, [this, &txn2]() {
  730. // block because txn1 is holding a shared lock on k1.
  731. ASSERT_OK(locker_->TryLock(txn2, 1, "k1", env_, true));
  732. })) {
  733. break;
  734. }
  735. // failed, retry again
  736. locker_->UnLock(txn1, 1, "k1", env_);
  737. locker_->UnLock(txn2, 1, "k1", env_);
  738. }
  739. // make sure txn2 is able to reach the wait state before proceed
  740. ASSERT_GT(retry_times, 0);
  741. // txn3 try to acquire an exclusive lock on k1, FIFO order is respected.
  742. port::Thread t2;
  743. BlockUntilWaitingTxn(wait_sync_point_name_, t2, [this, &txn3]() {
  744. // block because txn1 is holding an exclusive lock on k1.
  745. ASSERT_OK(locker_->TryLock(txn3, 1, "k1", env_, true));
  746. });
  747. // validate txn2 is woken up and takes the lock
  748. t1.join();
  749. // unlock txn2, txn3 should proceed
  750. locker_->UnLock(txn2, 1, "k1", env_);
  751. t2.join();
  752. // clean up
  753. locker_->UnLock(txn3, 1, "k1", env_);
  754. delete txn3;
  755. delete txn2;
  756. delete txn1;
  757. }
  758. TEST_F(PerKeyPointLockManagerTest, LockStealAfterExpirationShared) {
  759. // There are multiple transactions waiting for the same lock.
  760. // txn1 acquires a shared lock on k1 successfully with a short expiration
  761. // time.
  762. // txn2 try to acquire an exclusive lock on k1, before expiration time,
  763. // so it is blocked and waits for txn1 lock expired.
  764. // txn3 try to acquire a shared lock on k1 after txn1 lock expires, FIFO
  765. // order is respected.
  766. // txn2 is woken up and takes the lock. unlock txn2, txn3 should proceed.
  767. txn_opt_.expiration = 1000;
  768. auto txn1 = NewTxn(txn_opt_);
  769. txn_opt_.expiration = -1;
  770. auto txn2 = NewTxn(txn_opt_);
  771. auto txn3 = NewTxn(txn_opt_);
  772. port::Thread t1;
  773. auto retry_times = 10;
  774. // Use a loop to reduce test flakiness.
  775. // that the test is flaky because the txn2 thread start could be delayed until
  776. // txn1 lock expired. In that case, txn2 will not enter into wait state, which
  777. // will defeat the test purpose. Use a loop to retry a few times, until it is
  778. // able to enter into wait state.
  779. while (retry_times--) {
  780. ASSERT_OK(locker_->TryLock(txn1, 1, "k1", env_, false));
  781. if (TryBlockUntilWaitingTxn(wait_sync_point_name_, t1, [this, &txn2]() {
  782. // block because txn1 is holding an exclusive lock on k1.
  783. ASSERT_OK(locker_->TryLock(txn2, 1, "k1", env_, true));
  784. })) {
  785. break;
  786. }
  787. // failed, retry again
  788. locker_->UnLock(txn1, 1, "k1", env_);
  789. locker_->UnLock(txn2, 1, "k1", env_);
  790. }
  791. // make sure txn2 is able to reach the wait state before proceed
  792. ASSERT_GT(retry_times, 0);
  793. // txn3 try to acquire an exclusive lock on k1, FIFO order is respected.
  794. port::Thread t2;
  795. BlockUntilWaitingTxn(wait_sync_point_name_, t2, [this, &txn3]() {
  796. // block because txn1 is holding an exclusive lock on k1.
  797. ASSERT_OK(locker_->TryLock(txn3, 1, "k1", env_, false));
  798. });
  799. // validate txn2 is woken up and takes the lock
  800. t1.join();
  801. // unlock txn2, txn3 should proceed
  802. locker_->UnLock(txn2, 1, "k1", env_);
  803. t2.join();
  804. // clean up
  805. locker_->UnLock(txn3, 1, "k1", env_);
  806. delete txn3;
  807. delete txn2;
  808. delete txn1;
  809. }
TEST_F(PerKeyPointLockManagerTest, DeadLockOnWaiter) {
  // Deadlock detection must account for transactions still sitting in the
  // waiter queue, not only current lock holders:
  // Txn1 acquires exclusive lock on k1
  // Txn3 acquires shared lock on k2
  // Txn2 tries to acquire exclusive lock on k1, waiting in the waiter queue.
  // Txn3 tries to acquire exclusive lock on k1, waiting in the waiter queue.
  // Txn3 depends on both Txn1 and Txn2. Txn1 unlocks k1.
  // Txn2 takes the lock k1, and tries to acquire lock k2.
  // Now Txn2 depends on Txn3.
  // Deadlock is detected, and Txn2 is aborted.
  auto txn1 = NewTxn(txn_opt_);
  auto txn2 = NewTxn(txn_opt_);
  auto txn3 = NewTxn(txn_opt_);
  ASSERT_OK(locker_->TryLock(txn1, 1, "k1", env_, true));
  ASSERT_OK(locker_->TryLock(txn3, 1, "k2", env_, false));
  port::Thread t1;
  BlockUntilWaitingTxn(wait_sync_point_name_, t1, [this, &txn2]() {
    ASSERT_OK(locker_->TryLock(txn2, 1, "k1", env_, true));
    // k2 is held (shared) by txn3, and txn3 is waiting on txn2 for k1:
    // cycle txn2 -> txn3 -> txn2 is reported as a deadlock here.
    auto s = locker_->TryLock(txn2, 1, "k2", env_, true);
    ASSERT_TRUE(s.IsDeadlock());
  });
  port::Thread t2;
  BlockUntilWaitingTxn(wait_sync_point_name_, t2, [this, &txn3]() {
    // txn3 queues behind txn2 for the exclusive lock on k1.
    ASSERT_OK(locker_->TryLock(txn3, 1, "k1", env_, true));
  });
  // Let txn2 take k1; its subsequent k2 request then forms the cycle.
  locker_->UnLock(txn1, 1, "k1", env_);
  t1.join();
  // Release k1 so txn3's pending exclusive request can be granted.
  locker_->UnLock(txn2, 1, "k1", env_);
  t2.join();
  // clean up
  locker_->UnLock(txn3, 1, "k1", env_);
  locker_->UnLock(txn3, 1, "k2", env_);
  delete txn3;
  delete txn2;
  delete txn1;
}
TEST_F(PerKeyPointLockManagerTest, SharedLockRaceCondition) {
  // Verify a shared lock race condition is handled properly.
  // When there are waiters in the queue, and all of them are shared waiters,
  // and no one has taken the lock and all of them just got woken up and not
  // yet taken the lock yet. A new shared lock request should be granted
  // directly, without wait in the queue. If it did wait, it would not be
  // woken up until the last shared lock is released.
  // Disable deadlock detection timeout to prevent test flakiness.
  deadlock_timeout_us = 0;
  auto txn1 = NewTxn(txn_opt_);
  auto txn2 = NewTxn(txn_opt_);
  auto txn3 = NewTxn(txn_opt_);
  SyncPoint::GetInstance()->DisableProcessing();
  // Order the race deterministically:
  // 1) txn2 must have been woken up before the main thread issues txn3's
  //    request;
  // 2) txn3's request must finish before txn2 actually takes the lock.
  SyncPoint::GetInstance()->LoadDependency(
      {{"PerKeyPointLockManager::AcquireWithTimeout:AfterWokenUp",
        "PerKeyPointLockManagerTest::SharedLockRaceCondition:"
        "BeforeNewSharedLockRequest"},
       {"PerKeyPointLockManagerTest::SharedLockRaceCondition:"
        "AfterNewSharedLockRequest",
        "PerKeyPointLockManager::AcquireWithTimeout:BeforeTakeLock"}});
  std::atomic<bool> reached(false);
  SyncPoint::GetInstance()->SetCallBack(
      wait_sync_point_name_,
      [&reached](void* /*arg*/) { reached.store(true); });
  SyncPoint::GetInstance()->EnableProcessing();
  // txn1 acquires an exclusive lock on k1, so that the following shared lock
  // request would be blocked
  ASSERT_OK(locker_->TryLock(txn1, 1, "k1", env_, true));
  // txn2 try to acquire a shared lock on k1, and get blocked
  auto t1 = port::Thread([this, &txn2]() {
    ASSERT_OK(locker_->TryLock(txn2, 1, "k1", env_, false));
  });
  // wait until txn2 is actually queued as a waiter
  while (!reached.load()) {
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
  }
  // unlock txn1, txn2 should be woken up, but txn2 stops on the sync point
  locker_->UnLock(txn1, 1, "k1", env_);
  // Use sync point to simulate the race condition.
  // txn3 tries to take the lock right after txn2 is woken up, but before it
  // takes the lock
  TEST_SYNC_POINT(
      "PerKeyPointLockManagerTest::SharedLockRaceCondition:"
      "BeforeNewSharedLockRequest");
  // txn3 try to acquire a shared lock on k1, and get granted immediately
  ASSERT_OK(locker_->TryLock(txn3, 1, "k1", env_, false));
  TEST_SYNC_POINT(
      "PerKeyPointLockManagerTest::SharedLockRaceCondition:"
      "AfterNewSharedLockRequest");
  // validate txn2 is woken up and takes the lock
  t1.join();
  // cleanup
  locker_->UnLock(txn2, 1, "k1", env_);
  locker_->UnLock(txn3, 1, "k1", env_);
  delete txn3;
  delete txn2;
  delete txn1;
}
TEST_F(PerKeyPointLockManagerTest, UpgradeLockRaceCondition) {
  // Verify an upgrade lock race condition is handled properly.
  // When a key is locked in exclusive mode, shared lock waiters will be
  // enqueued as waiters.
  // When the exclusive lock holder releases the lock, the shared lock waiters
  // are woken up to take the lock. At this point, when a new shared lock
  // requester comes in, it will take the lock directly without waiting or
  // queueing. This requester then immediately upgrades the lock to exclusive
  // lock. This request will be prioritized to the head of the queue.
  // Meantime, it should also depend on the shared lock waiters which are still
  // in the queue that are ready to take the lock. Later, when one of the
  // reader locks wants to also upgrade its lock, it will detect a deadlock
  // and abort.
  auto txn1 = NewTxn(txn_opt_);
  auto txn2 = NewTxn(txn_opt_);
  auto txn3 = NewTxn(txn_opt_);
  SyncPoint::GetInstance()->DisableProcessing();
  // Sequence the race: txn2 must be woken up before txn3's request is issued,
  // and txn3's upgrade must be queued before txn2 takes the lock.
  SyncPoint::GetInstance()->LoadDependency(
      {{"PerKeyPointLockManager::AcquireWithTimeout:AfterWokenUp",
        "PerKeyPointLockManagerTest::UpgradeLockRaceCondition:"
        "BeforeNewSharedLockRequest"},
       {"PerKeyPointLockManagerTest::UpgradeLockRaceCondition:"
        "AfterNewSharedLockRequest",
        "PerKeyPointLockManager::AcquireWithTimeout:BeforeTakeLock"}});
  std::atomic<bool> reached(false);
  SyncPoint::GetInstance()->SetCallBack(
      wait_sync_point_name_,
      [&reached](void* /*arg*/) { reached.store(true); });
  SyncPoint::GetInstance()->EnableProcessing();
  // txn1 acquires an exclusive lock on k1, so that the following shared lock
  // request would be blocked
  ASSERT_OK(locker_->TryLock(txn1, 1, "k1", env_, true));
  auto t1 = port::Thread([this, &txn2]() {
    // txn2 try to acquire a shared lock on k1, and get blocked
    ASSERT_OK(locker_->TryLock(txn2, 1, "k1", env_, false));
  });
  // wait until txn2 is actually queued as a waiter
  while (!reached.load()) {
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
  }
  // unlock txn1, txn2 should be woken up, but txn2 stops on the sync point
  locker_->UnLock(txn1, 1, "k1", env_);
  // Use sync point to simulate the race condition.
  // txn3 tries to take the lock right after txn2 is woken up, but before it
  // takes the lock
  TEST_SYNC_POINT(
      "PerKeyPointLockManagerTest::UpgradeLockRaceCondition:"
      "BeforeNewSharedLockRequest");
  // txn3 try to acquire a shared lock on k1, and get granted immediately
  ASSERT_OK(locker_->TryLock(txn3, 1, "k1", env_, false));
  // txn3 try to upgrade its lock to exclusive lock and get blocked.
  // Reuse `reached` to observe txn3 entering the wait state.
  reached = false;
  auto t2 = port::Thread([this, &txn3]() {
    ASSERT_OK(locker_->TryLock(txn3, 1, "k1", env_, true));
  });
  while (!reached.load()) {
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
  }
  TEST_SYNC_POINT(
      "PerKeyPointLockManagerTest::UpgradeLockRaceCondition:"
      "AfterNewSharedLockRequest");
  // validate txn2 is woken up and takes the shared lock
  t1.join();
  // validate txn2 would get deadlock when it try to upgrade its lock to
  // exclusive (txn3's queued upgrade already depends on txn2's shared lock)
  auto s = locker_->TryLock(txn2, 1, "k1", env_, true);
  ASSERT_TRUE(s.IsDeadlock());
  // cleanup: releasing txn2's shared lock lets txn3's upgrade complete
  locker_->UnLock(txn2, 1, "k1", env_);
  t2.join();
  locker_->UnLock(txn3, 1, "k1", env_);
  delete txn3;
  delete txn2;
  delete txn1;
}
TEST_P(SpotLockManagerTest, Catch22) {
  // Benchmark the overhead of one transaction depending on another in a
  // circle, repeatedly: two threads hand the same exclusive lock back and
  // forth in lock-step, counting waits via a sync point.
  MockColumnFamilyHandle cf(1);
  locker_->AddColumnFamily(&cf);
  TransactionOptions txn_opt;
  txn_opt.deadlock_detect = true;
  txn_opt.lock_timeout = kLongTxnTimeoutMs;
  txn_opt.expiration = kLongTxnTimeoutMs;
  auto txn1 = NewTxn(txn_opt);
  auto txn2 = NewTxn(txn_opt);
  // use a wait count to count the number of times the lock is waited inside
  // transaction lock; its parity also drives the two threads' hand-off.
  std::atomic_int wait_count(0);
  SyncPoint::GetInstance()->DisableProcessing();
  if (GetParam().use_per_key_point_lock_manager &&
      GetParam().deadlock_timeout_us != 0) {
    // Use special sync point when deadlock timeout is enabled, so the test
    // runs faster (count waits before deadlock detection kicks in).
    SyncPoint::GetInstance()->SetCallBack(
        "PerKeyPointLockManager::AcquireWithTimeout:"
        "WaitingTxnBeforeDeadLockDetection",
        [&wait_count](void* /*arg*/) { wait_count++; });
  } else {
    // PointLockManager
    SyncPoint::GetInstance()->SetCallBack(
        wait_sync_point_name_, [&wait_count](void* /*arg*/) { wait_count++; });
  }
  SyncPoint::GetInstance()->EnableProcessing();
  // txn1 X lock
  ASSERT_OK(locker_->TryLock(txn1, 1, "k1", env_, true));
  std::mutex coordinator_mutex;
  int iteration_count = 10000;
  // txn1 try to lock X lock in a loop
  auto t1 = port::Thread(
      [this, &txn1, &wait_count, &coordinator_mutex, &iteration_count]() {
        while (wait_count.load() < iteration_count) {
          // spin wait until the other thread enters the lock waiter queue
          // (wait_count becomes odd).
          while (wait_count.load() % 2 == 0);
          // unlock the lock, so that the other thread can acquire the lock
          locker_->UnLock(txn1, 1, "k1", env_);
          {
            // Use the coordinator mutex to make sure the other thread has been
            // waked up and acquired the lock, before this thread try to acquire
            // the lock again.
            std::scoped_lock<std::mutex> lock(coordinator_mutex);
            ASSERT_OK(locker_->TryLock(txn1, 1, "k1", env_, true));
          }
        }
        // Drop the final hold so nothing is left locked at exit.
        locker_->UnLock(txn1, 1, "k1", env_);
      });
  // txn2 try to lock X lock in a loop
  auto t2 = port::Thread(
      [this, &txn2, &wait_count, &coordinator_mutex, &iteration_count]() {
        while (wait_count.load() < iteration_count) {
          {
            // Use the coordinator mutex to make sure the other thread has been
            // waked up and acquired the lock, before this thread try to acquire
            // the lock again.
            std::scoped_lock<std::mutex> lock(coordinator_mutex);
            ASSERT_OK(locker_->TryLock(txn2, 1, "k1", env_, true));
          }
          // spin wait until the other thread enters the lock waiter queue
          // (wait_count becomes even again).
          while (wait_count.load() % 2 == 1);
          // unlock the lock, so that the other thread can acquire the lock
          locker_->UnLock(txn2, 1, "k1", env_);
        }
      });
  // clean up
  t1.join();
  t2.join();
  delete txn2;
  delete txn1;
}
TEST_F(PerKeyPointLockManagerTest, LockUpgradeOrdering) {
  // When lock is upgraded, verify that it will only upgrade its lock after all
  // the shared locks that are before the first exclusive lock in the lock wait
  // queue have been granted and released.
  auto txn1 = NewTxn(txn_opt_);
  auto txn2 = NewTxn(txn_opt_);
  auto txn3 = NewTxn(txn_opt_);
  auto txn4 = NewTxn(txn_opt_);
  // txn4_mutex is held by the main thread so the AfterWokenUp callback can be
  // used to stall txn4 right after it is woken up.
  std::mutex txn4_mutex;
  std::unique_lock<std::mutex> txn4_lock(txn4_mutex);
  std::atomic_bool txn4_waked_up(false);
  std::atomic_int wait_count(0);
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->SetCallBack(
      wait_sync_point_name_, [&wait_count](void* /*arg*/) { wait_count++; });
  SyncPoint::GetInstance()->SetCallBack(
      "PerKeyPointLockManager::AcquireWithTimeout:AfterWokenUp",
      [&txn4, &txn4_mutex, &txn4_waked_up](void* arg) {
        auto transaction_id = *(static_cast<TransactionID*>(arg));
        if (transaction_id == txn4->GetID()) {
          txn4_waked_up.store(true);
          {
            // wait for txn4 mutex to be released, so that this thread will be
            // blocked.
            std::scoped_lock<std::mutex> lock(txn4_mutex);
          }
        }
      });
  SyncPoint::GetInstance()->EnableProcessing();
  // Txn1 X lock
  ASSERT_OK(locker_->TryLock(txn1, 1, "k1", env_, true));
  // Txn2,3,4 try S lock
  port::Thread t1([this, &txn2]() {
    ASSERT_OK(locker_->TryLock(txn2, 1, "k1", env_, false));
  });
  port::Thread t2([this, &txn3]() {
    ASSERT_OK(locker_->TryLock(txn3, 1, "k1", env_, false));
  });
  port::Thread t3([this, &txn4]() {
    ASSERT_OK(locker_->TryLock(txn4, 1, "k1", env_, false));
  });
  // wait for all 3 transactions to enter wait state
  while (wait_count.load() < 3) {
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
  }
  // Txn1 unlock
  locker_->UnLock(txn1, 1, "k1", env_);
  // Txn2,3 take S lock (txn4 is stalled in the AfterWokenUp callback)
  t1.join();
  t2.join();
  // wait for txn4 to be woken up, otherwise txn2 will get deadlock
  while (!txn4_waked_up.load()) {
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
  }
  // Txn2 try X lock (upgrade from its shared lock)
  std::atomic_bool txn2_exclusive_lock_acquired(false);
  port::Thread t4([this, &txn2, &txn2_exclusive_lock_acquired]() {
    ASSERT_OK(locker_->TryLock(txn2, 1, "k1", env_, true));
    txn2_exclusive_lock_acquired.store(true);
  });
  // wait for txn2 to enter wait state
  while (wait_count.load() < 4) {
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
  }
  // Txn3 release S lock
  locker_->UnLock(txn3, 1, "k1", env_);
  // Validate Txn2 has not acquired the lock yet: txn4's shared request was
  // queued ahead of the upgrade and is still outstanding.
  ASSERT_FALSE(txn2_exclusive_lock_acquired.load());
  // Txn4 take S lock (unblock the callback so it can proceed)
  txn4_lock.unlock();
  t3.join();
  // Txn4 release S lock; only now may Txn2 upgrade to X lock
  locker_->UnLock(txn4, 1, "k1", env_);
  t4.join();
  ASSERT_TRUE(txn2_exclusive_lock_acquired.load());
  // release lock, clean up
  locker_->UnLock(txn2, 1, "k1", env_);
  delete txn4;
  delete txn3;
  delete txn2;
  delete txn1;
}
  1131. TEST_F(PerKeyPointLockManagerTest, LockDownGradeRaceCondition) {
  1132. // When a lock is downgraded, it should notify all the shared waiters in the
  1133. // queue to take the lock.
  1134. auto txn1 = NewTxn(txn_opt_);
  1135. auto txn2 = NewTxn(txn_opt_);
  1136. // Txn1 X lock
  1137. ASSERT_OK(locker_->TryLock(txn1, 1, "k1", env_, true));
  1138. // Txn2 try S lock
  1139. port::Thread t1;
  1140. BlockUntilWaitingTxn(wait_sync_point_name_, t1, [this, &txn2]() {
  1141. ASSERT_OK(locker_->TryLock(txn2, 1, "k1", env_, false));
  1142. });
  1143. // Txn1 downgrade to S lock
  1144. ASSERT_OK(locker_->TryLock(txn1, 1, "k1", env_, false));
  1145. // Txn2 take S lock
  1146. t1.join();
  1147. // clean up
  1148. locker_->UnLock(txn1, 1, "k1", env_);
  1149. locker_->UnLock(txn2, 1, "k1", env_);
  1150. delete txn2;
  1151. delete txn1;
  1152. }
// Run AnyLockManagerTest with PointLockManager (nullptr = default setup).
INSTANTIATE_TEST_CASE_P(PointLockManager, AnyLockManagerTest,
                        ::testing::Values(nullptr));

// Run AnyLockManagerTest with PerKeyPointLockManager.
// N is the deadlock detection timeout in microseconds, so the suite runs
// with detection timeout disabled (0) and with two different timeouts.
template <int64_t N>
void PerKeyPointLockManagerTestSetup(PointLockManagerTest* self) {
  self->init();
  self->deadlock_timeout_us = N;
  self->UsePerKeyPointLockManager();
}

INSTANTIATE_TEST_CASE_P(
    PerLockPointLockManager, AnyLockManagerTest,
    ::testing::Values(PerKeyPointLockManagerTestSetup<0>,
                      PerKeyPointLockManagerTestSetup<100>,
                      PerKeyPointLockManagerTestSetup<1000>));

// Run PointLockManagerTest with PerLockPointLockManager and PointLockManager.
// Param = {use_per_key_point_lock_manager, deadlock_timeout_us}.
INSTANTIATE_TEST_CASE_P(
    PointLockCorrectnessCheckTestSuite, SpotLockManagerTest,
    ::testing::ValuesIn(std::vector<SpotLockManagerTestParam>{
        {true, 0}, {true, 100}, {true, 1000}, {false, 0}}));
  1173. } // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {
  // Print a stack trace on crashes to ease debugging of test failures.
  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}