range_locking_test.cc 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #ifndef OS_WIN
  6. #include <functional>
  7. #include <iomanip>
  8. #include <string>
  9. #include <thread>
  10. #include "db/db_impl/db_impl.h"
  11. #include "rocksdb/db.h"
  12. #include "rocksdb/options.h"
  13. #include "rocksdb/utilities/transaction.h"
  14. #include "rocksdb/utilities/transaction_db.h"
  15. #include "utilities/transactions/lock/point/any_lock_manager_test.h"
  16. #include "utilities/transactions/transaction_db_mutex_impl.h"
  17. using std::string;
  18. namespace ROCKSDB_NAMESPACE {
  19. class RangeLockingTest : public ::testing::Test {
  20. public:
  21. TransactionDB* db;
  22. std::string dbname;
  23. Options options;
  24. std::shared_ptr<RangeLockManagerHandle> range_lock_mgr;
  25. TransactionDBOptions txn_db_options;
  26. RangeLockingTest() : db(nullptr) {
  27. options.create_if_missing = true;
  28. dbname = test::PerThreadDBPath("range_locking_testdb");
  29. EXPECT_OK(DestroyDB(dbname, options));
  30. range_lock_mgr.reset(NewRangeLockManager(nullptr));
  31. txn_db_options.lock_mgr_handle = range_lock_mgr;
  32. auto s = TransactionDB::Open(options, txn_db_options, dbname, &db);
  33. assert(s.ok());
  34. }
  35. ~RangeLockingTest() {
  36. delete db;
  37. db = nullptr;
  38. // This is to skip the assert statement in FaultInjectionTestEnv. There
  39. // seems to be a bug in btrfs that the makes readdir return recently
  40. // unlink-ed files. By using the default fs we simply ignore errors resulted
  41. // from attempting to delete such files in DestroyDB.
  42. EXPECT_OK(DestroyDB(dbname, options));
  43. }
  44. PessimisticTransaction* NewTxn(
  45. TransactionOptions txn_opt = TransactionOptions()) {
  46. Transaction* txn = db->BeginTransaction(WriteOptions(), txn_opt);
  47. return static_cast<PessimisticTransaction*>(txn);
  48. }
  49. };
  50. // TODO: set a smaller lock wait timeout so that the test runs faster.
  51. TEST_F(RangeLockingTest, BasicRangeLocking) {
  52. WriteOptions write_options;
  53. TransactionOptions txn_options;
  54. std::string value;
  55. ReadOptions read_options;
  56. auto cf = db->DefaultColumnFamily();
  57. Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
  58. Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
  59. // Get a range lock
  60. ASSERT_OK(txn0->GetRangeLock(cf, Endpoint("a"), Endpoint("c")));
  61. // Check that range Lock inhibits an overlapping range lock
  62. {
  63. auto s = txn1->GetRangeLock(cf, Endpoint("b"), Endpoint("z"));
  64. ASSERT_TRUE(s.IsTimedOut());
  65. }
  66. // Check that range Lock inhibits an overlapping point lock
  67. {
  68. auto s = txn1->GetForUpdate(read_options, cf, Slice("b"), &value);
  69. ASSERT_TRUE(s.IsTimedOut());
  70. }
  71. // Get a point lock, check that it inhibits range locks
  72. ASSERT_OK(txn0->Put(cf, Slice("n"), Slice("value")));
  73. {
  74. auto s = txn1->GetRangeLock(cf, Endpoint("m"), Endpoint("p"));
  75. ASSERT_TRUE(s.IsTimedOut());
  76. }
  77. ASSERT_OK(txn0->Commit());
  78. txn1->Rollback();
  79. delete txn0;
  80. delete txn1;
  81. }
  82. TEST_F(RangeLockingTest, MyRocksLikeUpdate) {
  83. WriteOptions write_options;
  84. TransactionOptions txn_options;
  85. Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
  86. auto cf = db->DefaultColumnFamily();
  87. Status s;
  88. // Get a range lock for the range we are about to update
  89. ASSERT_OK(txn0->GetRangeLock(cf, Endpoint("a"), Endpoint("c")));
  90. bool try_range_lock_called = false;
  91. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  92. "RangeTreeLockManager::TryRangeLock:enter",
  93. [&](void* /*arg*/) { try_range_lock_called = true; });
  94. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  95. // For performance reasons, the following must NOT call lock_mgr->TryLock():
  96. // We verify that by checking the value of try_range_lock_called.
  97. ASSERT_OK(txn0->Put(cf, Slice("b"), Slice("value"),
  98. /*assume_tracked=*/true));
  99. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  100. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
  101. ASSERT_FALSE(try_range_lock_called);
  102. txn0->Rollback();
  103. delete txn0;
  104. }
  105. TEST_F(RangeLockingTest, UpgradeLockAndGetConflict) {
  106. WriteOptions write_options;
  107. TransactionOptions txn_options;
  108. auto cf = db->DefaultColumnFamily();
  109. Status s;
  110. std::string value;
  111. txn_options.lock_timeout = 10;
  112. Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
  113. Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
  114. // Get the shared lock in txn0
  115. s = txn0->GetForUpdate(ReadOptions(), cf, Slice("a"), &value,
  116. false /*exclusive*/);
  117. ASSERT_TRUE(s.IsNotFound());
  118. // Get the shared lock on the same key in txn1
  119. s = txn1->GetForUpdate(ReadOptions(), cf, Slice("a"), &value,
  120. false /*exclusive*/);
  121. ASSERT_TRUE(s.IsNotFound());
  122. // Now, try getting an exclusive lock that overlaps with the above
  123. s = txn0->GetRangeLock(cf, Endpoint("a"), Endpoint("b"));
  124. ASSERT_TRUE(s.IsTimedOut());
  125. txn0->Rollback();
  126. txn1->Rollback();
  127. delete txn0;
  128. delete txn1;
  129. }
  130. TEST_F(RangeLockingTest, SnapshotValidation) {
  131. Status s;
  132. Slice key_slice = Slice("k");
  133. ColumnFamilyHandle* cfh = db->DefaultColumnFamily();
  134. auto txn0 = NewTxn();
  135. txn0->Put(key_slice, Slice("initial"));
  136. txn0->Commit();
  137. // txn1
  138. auto txn1 = NewTxn();
  139. txn1->SetSnapshot();
  140. std::string val1;
  141. ASSERT_OK(txn1->Get(ReadOptions(), cfh, key_slice, &val1));
  142. ASSERT_EQ(val1, "initial");
  143. val1 = val1 + std::string("-txn1");
  144. ASSERT_OK(txn1->Put(cfh, key_slice, Slice(val1)));
  145. // txn2
  146. auto txn2 = NewTxn();
  147. txn2->SetSnapshot();
  148. std::string val2;
  149. // This will see the original value as nothing is committed
  150. // This is also Get, so it is doesn't acquire any locks.
  151. ASSERT_OK(txn2->Get(ReadOptions(), cfh, key_slice, &val2));
  152. ASSERT_EQ(val2, "initial");
  153. // txn1
  154. ASSERT_OK(txn1->Commit());
  155. // txn2
  156. val2 = val2 + std::string("-txn2");
  157. // Now, this call should do Snapshot Validation and fail:
  158. s = txn2->Put(cfh, key_slice, Slice(val2));
  159. ASSERT_TRUE(s.IsBusy());
  160. ASSERT_OK(txn2->Commit());
  161. delete txn0;
  162. delete txn1;
  163. delete txn2;
  164. }
  165. TEST_F(RangeLockingTest, MultipleTrxLockStatusData) {
  166. WriteOptions write_options;
  167. TransactionOptions txn_options;
  168. auto cf = db->DefaultColumnFamily();
  169. Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
  170. Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
  171. // Get a range lock
  172. ASSERT_OK(txn0->GetRangeLock(cf, Endpoint("z"), Endpoint("z")));
  173. ASSERT_OK(txn1->GetRangeLock(cf, Endpoint("b"), Endpoint("e")));
  174. auto s = range_lock_mgr->GetRangeLockStatusData();
  175. ASSERT_EQ(s.size(), 2);
  176. for (auto it = s.begin(); it != s.end(); ++it) {
  177. ASSERT_EQ(it->first, cf->GetID());
  178. auto val = it->second;
  179. ASSERT_FALSE(val.start.inf_suffix);
  180. ASSERT_FALSE(val.end.inf_suffix);
  181. ASSERT_TRUE(val.exclusive);
  182. ASSERT_EQ(val.ids.size(), 1);
  183. if (val.ids[0] == txn0->GetID()) {
  184. ASSERT_EQ(val.start.slice, "z");
  185. ASSERT_EQ(val.end.slice, "z");
  186. } else if (val.ids[0] == txn1->GetID()) {
  187. ASSERT_EQ(val.start.slice, "b");
  188. ASSERT_EQ(val.end.slice, "e");
  189. } else {
  190. FAIL(); // Unknown transaction ID.
  191. }
  192. }
  193. delete txn0;
  194. delete txn1;
  195. }
  // SKIP_LOCK_ESCALATION_TEST turns off the lock escalation tests that follow
  // when building with ThreadSanitizer. A compiler without __has_feature gives
  // no way to ask about TSAN, so the tests are skipped there too. The nested
  // #if keeps __has_feature(...) from being evaluated when it is undefined.
  #if !defined(__has_feature)
  #define SKIP_LOCK_ESCALATION_TEST 1
  #else
  #if __has_feature(thread_sanitizer)
  #define SKIP_LOCK_ESCALATION_TEST 1
  #endif
  #endif
  203. #ifndef SKIP_LOCK_ESCALATION_TEST
  204. TEST_F(RangeLockingTest, BasicLockEscalation) {
  205. auto cf = db->DefaultColumnFamily();
  206. auto counters = range_lock_mgr->GetStatus();
  207. // Initially not using any lock memory
  208. ASSERT_EQ(counters.current_lock_memory, 0);
  209. ASSERT_EQ(counters.escalation_count, 0);
  210. ASSERT_EQ(0, range_lock_mgr->SetMaxLockMemory(2000));
  211. // Insert until we see lock escalations
  212. auto txn = NewTxn();
  213. // Get the locks until we hit an escalation
  214. for (int i = 0; i < 2020; i++) {
  215. std::ostringstream buf;
  216. buf << std::setw(8) << std::setfill('0') << i;
  217. std::string buf_str = buf.str();
  218. ASSERT_OK(txn->GetRangeLock(cf, Endpoint(buf_str), Endpoint(buf_str)));
  219. }
  220. counters = range_lock_mgr->GetStatus();
  221. ASSERT_GT(counters.escalation_count, 0);
  222. ASSERT_LE(counters.current_lock_memory, 2000);
  223. delete txn;
  224. }
  225. // An escalation barrier function. Allow escalation iff the first two bytes are
  226. // identical.
  227. static bool escalation_barrier(const Endpoint& a, const Endpoint& b) {
  228. assert(a.slice.size() > 2);
  229. assert(b.slice.size() > 2);
  230. if (memcmp(a.slice.data(), b.slice.data(), 2)) {
  231. return true; // This is a barrier
  232. } else {
  233. return false; // No barrier
  234. }
  235. }
  236. TEST_F(RangeLockingTest, LockEscalationBarrier) {
  237. auto cf = db->DefaultColumnFamily();
  238. auto counters = range_lock_mgr->GetStatus();
  239. // Initially not using any lock memory
  240. ASSERT_EQ(counters.escalation_count, 0);
  241. range_lock_mgr->SetMaxLockMemory(8000);
  242. range_lock_mgr->SetEscalationBarrierFunc(escalation_barrier);
  243. // Insert enough locks to cause lock escalations to happen
  244. auto txn = NewTxn();
  245. const int N = 2000;
  246. for (int i = 0; i < N; i++) {
  247. std::ostringstream buf;
  248. buf << std::setw(4) << std::setfill('0') << i;
  249. std::string buf_str = buf.str();
  250. ASSERT_OK(txn->GetRangeLock(cf, Endpoint(buf_str), Endpoint(buf_str)));
  251. }
  252. counters = range_lock_mgr->GetStatus();
  253. ASSERT_GT(counters.escalation_count, 0);
  254. // Check that lock escalation was not performed across escalation barriers:
  255. // Use another txn to acquire locks near the barriers.
  256. auto txn2 = NewTxn();
  257. range_lock_mgr->SetMaxLockMemory(500000);
  258. for (int i = 100; i < N; i += 100) {
  259. std::ostringstream buf;
  260. buf << std::setw(4) << std::setfill('0') << i - 1 << "-a";
  261. std::string buf_str = buf.str();
  262. // Check that we CAN get a lock near the escalation barrier
  263. ASSERT_OK(txn2->GetRangeLock(cf, Endpoint(buf_str), Endpoint(buf_str)));
  264. }
  265. txn->Rollback();
  266. txn2->Rollback();
  267. delete txn;
  268. delete txn2;
  269. }
  270. #endif
  271. TEST_F(RangeLockingTest, LockWaitCount) {
  272. TransactionOptions txn_options;
  273. auto cf = db->DefaultColumnFamily();
  274. txn_options.lock_timeout = 50;
  275. Transaction* txn0 = db->BeginTransaction(WriteOptions(), txn_options);
  276. Transaction* txn1 = db->BeginTransaction(WriteOptions(), txn_options);
  277. // Get a range lock
  278. ASSERT_OK(txn0->GetRangeLock(cf, Endpoint("a"), Endpoint("c")));
  279. uint64_t lock_waits1 = range_lock_mgr->GetStatus().lock_wait_count;
  280. // Attempt to get a conflicting lock
  281. auto s = txn1->GetRangeLock(cf, Endpoint("b"), Endpoint("z"));
  282. ASSERT_TRUE(s.IsTimedOut());
  283. // Check that the counter was incremented
  284. uint64_t lock_waits2 = range_lock_mgr->GetStatus().lock_wait_count;
  285. ASSERT_EQ(lock_waits1 + 1, lock_waits2);
  286. txn0->Rollback();
  287. txn1->Rollback();
  288. delete txn0;
  289. delete txn1;
  290. }
  291. TEST_F(RangeLockingTest, LockWaiteeAccess) {
  292. TransactionOptions txn_options;
  293. auto cf = db->DefaultColumnFamily();
  294. txn_options.lock_timeout = 60;
  295. Transaction* txn0 = db->BeginTransaction(WriteOptions(), txn_options);
  296. Transaction* txn1 = db->BeginTransaction(WriteOptions(), txn_options);
  297. // Get a range lock
  298. ASSERT_OK(txn0->GetRangeLock(cf, Endpoint("a"), Endpoint("c")));
  299. std::atomic<bool> reached(false);
  300. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  301. "RangeTreeLockManager::TryRangeLock:EnterWaitingTxn", [&](void* /*arg*/) {
  302. reached.store(true);
  303. std::this_thread::sleep_for(std::chrono::milliseconds(2000));
  304. });
  305. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  306. port::Thread t([&]() {
  307. // Attempt to get a conflicting lock
  308. auto s = txn1->GetRangeLock(cf, Endpoint("b"), Endpoint("z"));
  309. ASSERT_TRUE(s.ok());
  310. txn1->Rollback();
  311. });
  312. while (!reached.load()) {
  313. std::this_thread::sleep_for(std::chrono::milliseconds(100));
  314. }
  315. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  316. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
  317. // Release locks and free the transaction
  318. txn0->Rollback();
  319. delete txn0;
  320. t.join();
  321. delete txn1;
  322. }
  323. void PointLockManagerTestExternalSetup(PointLockManagerTest* self) {
  324. self->env_ = Env::Default();
  325. self->db_dir_ = test::PerThreadDBPath("point_lock_manager_test");
  326. ASSERT_OK(self->env_->CreateDir(self->db_dir_));
  327. Options opt;
  328. opt.create_if_missing = true;
  329. TransactionDBOptions txn_opt;
  330. txn_opt.transaction_lock_timeout = 0;
  331. auto mutex_factory = std::make_shared<TransactionDBMutexFactoryImpl>();
  332. self->locker_.reset(NewRangeLockManager(mutex_factory)->getLockManager());
  333. std::shared_ptr<RangeLockManagerHandle> range_lock_mgr =
  334. std::dynamic_pointer_cast<RangeLockManagerHandle>(self->locker_);
  335. txn_opt.lock_mgr_handle = range_lock_mgr;
  336. ASSERT_OK(TransactionDB::Open(opt, txn_opt, self->db_dir_, &self->db_));
  337. self->wait_sync_point_name_ = "RangeTreeLockManager::TryRangeLock:WaitingTxn";
  338. }
  339. INSTANTIATE_TEST_CASE_P(RangeLockManager, AnyLockManagerTest,
  340. ::testing::Values(PointLockManagerTestExternalSetup));
  341. } // namespace ROCKSDB_NAMESPACE
  342. int main(int argc, char** argv) {
  343. ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  344. ::testing::InitGoogleTest(&argc, argv);
  345. return RUN_ALL_TESTS();
  346. }
  347. #else // OS_WIN
  348. #include <stdio.h>
  // Windows build: range locking is unsupported there, so report the skip on
  // stderr and exit successfully.
  int main(int /*argc*/, char** /*argv*/) {
    fputs("skipped as Range Locking is not supported on Windows\n", stderr);
    return 0;
  }
  353. #endif // OS_WIN