| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444 |
- // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
- // This source code is licensed under both the GPLv2 (found in the
- // COPYING file in the root directory) and Apache 2.0 License
- // (found in the LICENSE.Apache file in the root directory).
- #ifndef OS_WIN
- #include <functional>
- #include <iomanip>
- #include <string>
- #include <thread>
- #include "db/db_impl/db_impl.h"
- #include "rocksdb/db.h"
- #include "rocksdb/options.h"
- #include "rocksdb/utilities/transaction.h"
- #include "rocksdb/utilities/transaction_db.h"
- #include "utilities/transactions/lock/point/any_lock_manager_test.h"
- #include "utilities/transactions/transaction_db_mutex_impl.h"
- using std::string;
- namespace ROCKSDB_NAMESPACE {
- class RangeLockingTest : public ::testing::Test {
- public:
- TransactionDB* db;
- std::string dbname;
- Options options;
- std::shared_ptr<RangeLockManagerHandle> range_lock_mgr;
- TransactionDBOptions txn_db_options;
- RangeLockingTest() : db(nullptr) {
- options.create_if_missing = true;
- dbname = test::PerThreadDBPath("range_locking_testdb");
- EXPECT_OK(DestroyDB(dbname, options));
- range_lock_mgr.reset(NewRangeLockManager(nullptr));
- txn_db_options.lock_mgr_handle = range_lock_mgr;
- auto s = TransactionDB::Open(options, txn_db_options, dbname, &db);
- assert(s.ok());
- }
- ~RangeLockingTest() {
- delete db;
- db = nullptr;
- // This is to skip the assert statement in FaultInjectionTestEnv. There
- // seems to be a bug in btrfs that the makes readdir return recently
- // unlink-ed files. By using the default fs we simply ignore errors resulted
- // from attempting to delete such files in DestroyDB.
- EXPECT_OK(DestroyDB(dbname, options));
- }
- PessimisticTransaction* NewTxn(
- TransactionOptions txn_opt = TransactionOptions()) {
- Transaction* txn = db->BeginTransaction(WriteOptions(), txn_opt);
- return static_cast<PessimisticTransaction*>(txn);
- }
- };
- // TODO: set a smaller lock wait timeout so that the test runs faster.
- TEST_F(RangeLockingTest, BasicRangeLocking) {
- WriteOptions write_options;
- TransactionOptions txn_options;
- std::string value;
- ReadOptions read_options;
- auto cf = db->DefaultColumnFamily();
- Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
- Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
- // Get a range lock
- ASSERT_OK(txn0->GetRangeLock(cf, Endpoint("a"), Endpoint("c")));
- // Check that range Lock inhibits an overlapping range lock
- {
- auto s = txn1->GetRangeLock(cf, Endpoint("b"), Endpoint("z"));
- ASSERT_TRUE(s.IsTimedOut());
- }
- // Check that range Lock inhibits an overlapping point lock
- {
- auto s = txn1->GetForUpdate(read_options, cf, Slice("b"), &value);
- ASSERT_TRUE(s.IsTimedOut());
- }
- // Get a point lock, check that it inhibits range locks
- ASSERT_OK(txn0->Put(cf, Slice("n"), Slice("value")));
- {
- auto s = txn1->GetRangeLock(cf, Endpoint("m"), Endpoint("p"));
- ASSERT_TRUE(s.IsTimedOut());
- }
- ASSERT_OK(txn0->Commit());
- txn1->Rollback();
- delete txn0;
- delete txn1;
- }
- TEST_F(RangeLockingTest, MyRocksLikeUpdate) {
- WriteOptions write_options;
- TransactionOptions txn_options;
- Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
- auto cf = db->DefaultColumnFamily();
- Status s;
- // Get a range lock for the range we are about to update
- ASSERT_OK(txn0->GetRangeLock(cf, Endpoint("a"), Endpoint("c")));
- bool try_range_lock_called = false;
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
- "RangeTreeLockManager::TryRangeLock:enter",
- [&](void* /*arg*/) { try_range_lock_called = true; });
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
- // For performance reasons, the following must NOT call lock_mgr->TryLock():
- // We verify that by checking the value of try_range_lock_called.
- ASSERT_OK(txn0->Put(cf, Slice("b"), Slice("value"),
- /*assume_tracked=*/true));
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
- ASSERT_FALSE(try_range_lock_called);
- txn0->Rollback();
- delete txn0;
- }
- TEST_F(RangeLockingTest, UpgradeLockAndGetConflict) {
- WriteOptions write_options;
- TransactionOptions txn_options;
- auto cf = db->DefaultColumnFamily();
- Status s;
- std::string value;
- txn_options.lock_timeout = 10;
- Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
- Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
- // Get the shared lock in txn0
- s = txn0->GetForUpdate(ReadOptions(), cf, Slice("a"), &value,
- false /*exclusive*/);
- ASSERT_TRUE(s.IsNotFound());
- // Get the shared lock on the same key in txn1
- s = txn1->GetForUpdate(ReadOptions(), cf, Slice("a"), &value,
- false /*exclusive*/);
- ASSERT_TRUE(s.IsNotFound());
- // Now, try getting an exclusive lock that overlaps with the above
- s = txn0->GetRangeLock(cf, Endpoint("a"), Endpoint("b"));
- ASSERT_TRUE(s.IsTimedOut());
- txn0->Rollback();
- txn1->Rollback();
- delete txn0;
- delete txn1;
- }
- TEST_F(RangeLockingTest, SnapshotValidation) {
- Status s;
- Slice key_slice = Slice("k");
- ColumnFamilyHandle* cfh = db->DefaultColumnFamily();
- auto txn0 = NewTxn();
- txn0->Put(key_slice, Slice("initial"));
- txn0->Commit();
- // txn1
- auto txn1 = NewTxn();
- txn1->SetSnapshot();
- std::string val1;
- ASSERT_OK(txn1->Get(ReadOptions(), cfh, key_slice, &val1));
- ASSERT_EQ(val1, "initial");
- val1 = val1 + std::string("-txn1");
- ASSERT_OK(txn1->Put(cfh, key_slice, Slice(val1)));
- // txn2
- auto txn2 = NewTxn();
- txn2->SetSnapshot();
- std::string val2;
- // This will see the original value as nothing is committed
- // This is also Get, so it is doesn't acquire any locks.
- ASSERT_OK(txn2->Get(ReadOptions(), cfh, key_slice, &val2));
- ASSERT_EQ(val2, "initial");
- // txn1
- ASSERT_OK(txn1->Commit());
- // txn2
- val2 = val2 + std::string("-txn2");
- // Now, this call should do Snapshot Validation and fail:
- s = txn2->Put(cfh, key_slice, Slice(val2));
- ASSERT_TRUE(s.IsBusy());
- ASSERT_OK(txn2->Commit());
- delete txn0;
- delete txn1;
- delete txn2;
- }
- TEST_F(RangeLockingTest, MultipleTrxLockStatusData) {
- WriteOptions write_options;
- TransactionOptions txn_options;
- auto cf = db->DefaultColumnFamily();
- Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
- Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
- // Get a range lock
- ASSERT_OK(txn0->GetRangeLock(cf, Endpoint("z"), Endpoint("z")));
- ASSERT_OK(txn1->GetRangeLock(cf, Endpoint("b"), Endpoint("e")));
- auto s = range_lock_mgr->GetRangeLockStatusData();
- ASSERT_EQ(s.size(), 2);
- for (auto it = s.begin(); it != s.end(); ++it) {
- ASSERT_EQ(it->first, cf->GetID());
- auto val = it->second;
- ASSERT_FALSE(val.start.inf_suffix);
- ASSERT_FALSE(val.end.inf_suffix);
- ASSERT_TRUE(val.exclusive);
- ASSERT_EQ(val.ids.size(), 1);
- if (val.ids[0] == txn0->GetID()) {
- ASSERT_EQ(val.start.slice, "z");
- ASSERT_EQ(val.end.slice, "z");
- } else if (val.ids[0] == txn1->GetID()) {
- ASSERT_EQ(val.start.slice, "b");
- ASSERT_EQ(val.end.slice, "e");
- } else {
- FAIL(); // Unknown transaction ID.
- }
- }
- delete txn0;
- delete txn1;
- }
- #if defined(__has_feature)
- #if __has_feature(thread_sanitizer)
- #define SKIP_LOCK_ESCALATION_TEST 1
- #endif
- #else
- #define SKIP_LOCK_ESCALATION_TEST 1
- #endif
- #ifndef SKIP_LOCK_ESCALATION_TEST
- TEST_F(RangeLockingTest, BasicLockEscalation) {
- auto cf = db->DefaultColumnFamily();
- auto counters = range_lock_mgr->GetStatus();
- // Initially not using any lock memory
- ASSERT_EQ(counters.current_lock_memory, 0);
- ASSERT_EQ(counters.escalation_count, 0);
- ASSERT_EQ(0, range_lock_mgr->SetMaxLockMemory(2000));
- // Insert until we see lock escalations
- auto txn = NewTxn();
- // Get the locks until we hit an escalation
- for (int i = 0; i < 2020; i++) {
- std::ostringstream buf;
- buf << std::setw(8) << std::setfill('0') << i;
- std::string buf_str = buf.str();
- ASSERT_OK(txn->GetRangeLock(cf, Endpoint(buf_str), Endpoint(buf_str)));
- }
- counters = range_lock_mgr->GetStatus();
- ASSERT_GT(counters.escalation_count, 0);
- ASSERT_LE(counters.current_lock_memory, 2000);
- delete txn;
- }
- // An escalation barrier function. Allow escalation iff the first two bytes are
- // identical.
- static bool escalation_barrier(const Endpoint& a, const Endpoint& b) {
- assert(a.slice.size() > 2);
- assert(b.slice.size() > 2);
- if (memcmp(a.slice.data(), b.slice.data(), 2)) {
- return true; // This is a barrier
- } else {
- return false; // No barrier
- }
- }
- TEST_F(RangeLockingTest, LockEscalationBarrier) {
- auto cf = db->DefaultColumnFamily();
- auto counters = range_lock_mgr->GetStatus();
- // Initially not using any lock memory
- ASSERT_EQ(counters.escalation_count, 0);
- range_lock_mgr->SetMaxLockMemory(8000);
- range_lock_mgr->SetEscalationBarrierFunc(escalation_barrier);
- // Insert enough locks to cause lock escalations to happen
- auto txn = NewTxn();
- const int N = 2000;
- for (int i = 0; i < N; i++) {
- std::ostringstream buf;
- buf << std::setw(4) << std::setfill('0') << i;
- std::string buf_str = buf.str();
- ASSERT_OK(txn->GetRangeLock(cf, Endpoint(buf_str), Endpoint(buf_str)));
- }
- counters = range_lock_mgr->GetStatus();
- ASSERT_GT(counters.escalation_count, 0);
- // Check that lock escalation was not performed across escalation barriers:
- // Use another txn to acquire locks near the barriers.
- auto txn2 = NewTxn();
- range_lock_mgr->SetMaxLockMemory(500000);
- for (int i = 100; i < N; i += 100) {
- std::ostringstream buf;
- buf << std::setw(4) << std::setfill('0') << i - 1 << "-a";
- std::string buf_str = buf.str();
- // Check that we CAN get a lock near the escalation barrier
- ASSERT_OK(txn2->GetRangeLock(cf, Endpoint(buf_str), Endpoint(buf_str)));
- }
- txn->Rollback();
- txn2->Rollback();
- delete txn;
- delete txn2;
- }
- #endif
- TEST_F(RangeLockingTest, LockWaitCount) {
- TransactionOptions txn_options;
- auto cf = db->DefaultColumnFamily();
- txn_options.lock_timeout = 50;
- Transaction* txn0 = db->BeginTransaction(WriteOptions(), txn_options);
- Transaction* txn1 = db->BeginTransaction(WriteOptions(), txn_options);
- // Get a range lock
- ASSERT_OK(txn0->GetRangeLock(cf, Endpoint("a"), Endpoint("c")));
- uint64_t lock_waits1 = range_lock_mgr->GetStatus().lock_wait_count;
- // Attempt to get a conflicting lock
- auto s = txn1->GetRangeLock(cf, Endpoint("b"), Endpoint("z"));
- ASSERT_TRUE(s.IsTimedOut());
- // Check that the counter was incremented
- uint64_t lock_waits2 = range_lock_mgr->GetStatus().lock_wait_count;
- ASSERT_EQ(lock_waits1 + 1, lock_waits2);
- txn0->Rollback();
- txn1->Rollback();
- delete txn0;
- delete txn1;
- }
- TEST_F(RangeLockingTest, LockWaiteeAccess) {
- TransactionOptions txn_options;
- auto cf = db->DefaultColumnFamily();
- txn_options.lock_timeout = 60;
- Transaction* txn0 = db->BeginTransaction(WriteOptions(), txn_options);
- Transaction* txn1 = db->BeginTransaction(WriteOptions(), txn_options);
- // Get a range lock
- ASSERT_OK(txn0->GetRangeLock(cf, Endpoint("a"), Endpoint("c")));
- std::atomic<bool> reached(false);
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
- "RangeTreeLockManager::TryRangeLock:EnterWaitingTxn", [&](void* /*arg*/) {
- reached.store(true);
- std::this_thread::sleep_for(std::chrono::milliseconds(2000));
- });
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
- port::Thread t([&]() {
- // Attempt to get a conflicting lock
- auto s = txn1->GetRangeLock(cf, Endpoint("b"), Endpoint("z"));
- ASSERT_TRUE(s.ok());
- txn1->Rollback();
- });
- while (!reached.load()) {
- std::this_thread::sleep_for(std::chrono::milliseconds(100));
- }
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
- // Release locks and free the transaction
- txn0->Rollback();
- delete txn0;
- t.join();
- delete txn1;
- }
- void PointLockManagerTestExternalSetup(PointLockManagerTest* self) {
- self->env_ = Env::Default();
- self->db_dir_ = test::PerThreadDBPath("point_lock_manager_test");
- ASSERT_OK(self->env_->CreateDir(self->db_dir_));
- Options opt;
- opt.create_if_missing = true;
- TransactionDBOptions txn_opt;
- txn_opt.transaction_lock_timeout = 0;
- auto mutex_factory = std::make_shared<TransactionDBMutexFactoryImpl>();
- self->locker_.reset(NewRangeLockManager(mutex_factory)->getLockManager());
- std::shared_ptr<RangeLockManagerHandle> range_lock_mgr =
- std::dynamic_pointer_cast<RangeLockManagerHandle>(self->locker_);
- txn_opt.lock_mgr_handle = range_lock_mgr;
- ASSERT_OK(TransactionDB::Open(opt, txn_opt, self->db_dir_, &self->db_));
- self->wait_sync_point_name_ = "RangeTreeLockManager::TryRangeLock:WaitingTxn";
- }
- INSTANTIATE_TEST_CASE_P(RangeLockManager, AnyLockManagerTest,
- ::testing::Values(PointLockManagerTestExternalSetup));
- } // namespace ROCKSDB_NAMESPACE
- int main(int argc, char** argv) {
- ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
- ::testing::InitGoogleTest(&argc, argv);
- return RUN_ALL_TESTS();
- }
- #else // OS_WIN
- #include <stdio.h>
- int main(int /*argc*/, char** /*argv*/) {
- fprintf(stderr, "skipped as Range Locking is not supported on Windows\n");
- return 0;
- }
- #endif // OS_WIN
|