| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663 |
- // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
- // This source code is licensed under both the GPLv2 (found in the
- // COPYING file in the root directory) and Apache 2.0 License
- // (found in the LICENSE.Apache file in the root directory).
- //
- // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
- // Use of this source code is governed by a BSD-style license that can be
- // found in the LICENSE file. See the AUTHORS file for names of contributors.
- #include "memtable/inlineskiplist.h"
- #include <set>
- #include <unordered_set>
- #include "memory/concurrent_arena.h"
- #include "rocksdb/env.h"
- #include "test_util/testharness.h"
- #include "util/hash.h"
- #include "util/random.h"
- namespace ROCKSDB_NAMESPACE {
// Key type stored by the skip lists in this test: an 8-byte unsigned integer.
using Key = uint64_t;
// Exposes the bytes of `*key` as a `const char*`, the form in which keys are
// handed to the skip list's lookup and seek calls in this file. The result
// aliases `*key`, so it must not be used after `*key` goes away.
static const char* Encode(const uint64_t* key) {
  const void* raw = key;
  return static_cast<const char*>(raw);
}
- static Key Decode(const char* key) {
- Key rv;
- memcpy(&rv, key, sizeof(Key));
- return rv;
- }
- struct TestComparator {
- typedef Key DecodedType;
- static DecodedType decode_key(const char* b) {
- return Decode(b);
- }
- int operator()(const char* a, const char* b) const {
- if (Decode(a) < Decode(b)) {
- return -1;
- } else if (Decode(a) > Decode(b)) {
- return +1;
- } else {
- return 0;
- }
- }
- int operator()(const char* a, const DecodedType b) const {
- if (Decode(a) < b) {
- return -1;
- } else if (Decode(a) > b) {
- return +1;
- } else {
- return 0;
- }
- }
- };
- typedef InlineSkipList<TestComparator> TestInlineSkipList;
- class InlineSkipTest : public testing::Test {
- public:
- void Insert(TestInlineSkipList* list, Key key) {
- char* buf = list->AllocateKey(sizeof(Key));
- memcpy(buf, &key, sizeof(Key));
- list->Insert(buf);
- keys_.insert(key);
- }
- bool InsertWithHint(TestInlineSkipList* list, Key key, void** hint) {
- char* buf = list->AllocateKey(sizeof(Key));
- memcpy(buf, &key, sizeof(Key));
- bool res = list->InsertWithHint(buf, hint);
- keys_.insert(key);
- return res;
- }
- void Validate(TestInlineSkipList* list) {
- // Check keys exist.
- for (Key key : keys_) {
- ASSERT_TRUE(list->Contains(Encode(&key)));
- }
- // Iterate over the list, make sure keys appears in order and no extra
- // keys exist.
- TestInlineSkipList::Iterator iter(list);
- ASSERT_FALSE(iter.Valid());
- Key zero = 0;
- iter.Seek(Encode(&zero));
- for (Key key : keys_) {
- ASSERT_TRUE(iter.Valid());
- ASSERT_EQ(key, Decode(iter.key()));
- iter.Next();
- }
- ASSERT_FALSE(iter.Valid());
- // Validate the list is well-formed.
- list->TEST_Validate();
- }
- private:
- std::set<Key> keys_;
- };
- TEST_F(InlineSkipTest, Empty) {
- Arena arena;
- TestComparator cmp;
- InlineSkipList<TestComparator> list(cmp, &arena);
- Key key = 10;
- ASSERT_TRUE(!list.Contains(Encode(&key)));
- InlineSkipList<TestComparator>::Iterator iter(&list);
- ASSERT_TRUE(!iter.Valid());
- iter.SeekToFirst();
- ASSERT_TRUE(!iter.Valid());
- key = 100;
- iter.Seek(Encode(&key));
- ASSERT_TRUE(!iter.Valid());
- iter.SeekForPrev(Encode(&key));
- ASSERT_TRUE(!iter.Valid());
- iter.SeekToLast();
- ASSERT_TRUE(!iter.Valid());
- }
- TEST_F(InlineSkipTest, InsertAndLookup) {
- const int N = 2000;
- const int R = 5000;
- Random rnd(1000);
- std::set<Key> keys;
- ConcurrentArena arena;
- TestComparator cmp;
- InlineSkipList<TestComparator> list(cmp, &arena);
- for (int i = 0; i < N; i++) {
- Key key = rnd.Next() % R;
- if (keys.insert(key).second) {
- char* buf = list.AllocateKey(sizeof(Key));
- memcpy(buf, &key, sizeof(Key));
- list.Insert(buf);
- }
- }
- for (Key i = 0; i < R; i++) {
- if (list.Contains(Encode(&i))) {
- ASSERT_EQ(keys.count(i), 1U);
- } else {
- ASSERT_EQ(keys.count(i), 0U);
- }
- }
- // Simple iterator tests
- {
- InlineSkipList<TestComparator>::Iterator iter(&list);
- ASSERT_TRUE(!iter.Valid());
- uint64_t zero = 0;
- iter.Seek(Encode(&zero));
- ASSERT_TRUE(iter.Valid());
- ASSERT_EQ(*(keys.begin()), Decode(iter.key()));
- uint64_t max_key = R - 1;
- iter.SeekForPrev(Encode(&max_key));
- ASSERT_TRUE(iter.Valid());
- ASSERT_EQ(*(keys.rbegin()), Decode(iter.key()));
- iter.SeekToFirst();
- ASSERT_TRUE(iter.Valid());
- ASSERT_EQ(*(keys.begin()), Decode(iter.key()));
- iter.SeekToLast();
- ASSERT_TRUE(iter.Valid());
- ASSERT_EQ(*(keys.rbegin()), Decode(iter.key()));
- }
- // Forward iteration test
- for (Key i = 0; i < R; i++) {
- InlineSkipList<TestComparator>::Iterator iter(&list);
- iter.Seek(Encode(&i));
- // Compare against model iterator
- std::set<Key>::iterator model_iter = keys.lower_bound(i);
- for (int j = 0; j < 3; j++) {
- if (model_iter == keys.end()) {
- ASSERT_TRUE(!iter.Valid());
- break;
- } else {
- ASSERT_TRUE(iter.Valid());
- ASSERT_EQ(*model_iter, Decode(iter.key()));
- ++model_iter;
- iter.Next();
- }
- }
- }
- // Backward iteration test
- for (Key i = 0; i < R; i++) {
- InlineSkipList<TestComparator>::Iterator iter(&list);
- iter.SeekForPrev(Encode(&i));
- // Compare against model iterator
- std::set<Key>::iterator model_iter = keys.upper_bound(i);
- for (int j = 0; j < 3; j++) {
- if (model_iter == keys.begin()) {
- ASSERT_TRUE(!iter.Valid());
- break;
- } else {
- ASSERT_TRUE(iter.Valid());
- ASSERT_EQ(*--model_iter, Decode(iter.key()));
- iter.Prev();
- }
- }
- }
- }
- TEST_F(InlineSkipTest, InsertWithHint_Sequential) {
- const int N = 100000;
- Arena arena;
- TestComparator cmp;
- TestInlineSkipList list(cmp, &arena);
- void* hint = nullptr;
- for (int i = 0; i < N; i++) {
- Key key = i;
- InsertWithHint(&list, key, &hint);
- }
- Validate(&list);
- }
- TEST_F(InlineSkipTest, InsertWithHint_MultipleHints) {
- const int N = 100000;
- const int S = 100;
- Random rnd(534);
- Arena arena;
- TestComparator cmp;
- TestInlineSkipList list(cmp, &arena);
- void* hints[S];
- Key last_key[S];
- for (int i = 0; i < S; i++) {
- hints[i] = nullptr;
- last_key[i] = 0;
- }
- for (int i = 0; i < N; i++) {
- Key s = rnd.Uniform(S);
- Key key = (s << 32) + (++last_key[s]);
- InsertWithHint(&list, key, &hints[s]);
- }
- Validate(&list);
- }
- TEST_F(InlineSkipTest, InsertWithHint_MultipleHintsRandom) {
- const int N = 100000;
- const int S = 100;
- Random rnd(534);
- Arena arena;
- TestComparator cmp;
- TestInlineSkipList list(cmp, &arena);
- void* hints[S];
- for (int i = 0; i < S; i++) {
- hints[i] = nullptr;
- }
- for (int i = 0; i < N; i++) {
- Key s = rnd.Uniform(S);
- Key key = (s << 32) + rnd.Next();
- InsertWithHint(&list, key, &hints[s]);
- }
- Validate(&list);
- }
- TEST_F(InlineSkipTest, InsertWithHint_CompatibleWithInsertWithoutHint) {
- const int N = 100000;
- const int S1 = 100;
- const int S2 = 100;
- Random rnd(534);
- Arena arena;
- TestComparator cmp;
- TestInlineSkipList list(cmp, &arena);
- std::unordered_set<Key> used;
- Key with_hint[S1];
- Key without_hint[S2];
- void* hints[S1];
- for (int i = 0; i < S1; i++) {
- hints[i] = nullptr;
- while (true) {
- Key s = rnd.Next();
- if (used.insert(s).second) {
- with_hint[i] = s;
- break;
- }
- }
- }
- for (int i = 0; i < S2; i++) {
- while (true) {
- Key s = rnd.Next();
- if (used.insert(s).second) {
- without_hint[i] = s;
- break;
- }
- }
- }
- for (int i = 0; i < N; i++) {
- Key s = rnd.Uniform(S1 + S2);
- if (s < S1) {
- Key key = (with_hint[s] << 32) + rnd.Next();
- InsertWithHint(&list, key, &hints[s]);
- } else {
- Key key = (without_hint[s - S1] << 32) + rnd.Next();
- Insert(&list, key);
- }
- }
- Validate(&list);
- }
- #ifndef ROCKSDB_VALGRIND_RUN
- // We want to make sure that with a single writer and multiple
- // concurrent readers (with no synchronization other than when a
- // reader's iterator is created), the reader always observes all the
- // data that was present in the skip list when the iterator was
- // constructed. Because insertions are happening concurrently, we may
- // also observe new values that were inserted since the iterator was
- // constructed, but we should never miss any values that were present
- // at iterator construction time.
- //
- // We generate multi-part keys:
- // <key,gen,hash>
- // where:
- // key is in range [0..K-1]
- // gen is a generation number for key
- // hash is hash(key,gen)
- //
- // The insertion code picks a random key, sets gen to be 1 + the last
- // generation number inserted for that key, and sets hash to Hash(key,gen).
- //
- // At the beginning of a read, we snapshot the last inserted
- // generation number for each key. We then iterate, including random
- // calls to Next() and Seek(). For every key we encounter, we
- // check that it is either expected given the initial snapshot or has
- // been concurrently added since the iterator started.
- class ConcurrentTest {
- public:
- static const uint32_t K = 8;
- private:
- static uint64_t key(Key key) { return (key >> 40); }
- static uint64_t gen(Key key) { return (key >> 8) & 0xffffffffu; }
- static uint64_t hash(Key key) { return key & 0xff; }
- static uint64_t HashNumbers(uint64_t k, uint64_t g) {
- uint64_t data[2] = {k, g};
- return Hash(reinterpret_cast<char*>(data), sizeof(data), 0);
- }
- static Key MakeKey(uint64_t k, uint64_t g) {
- assert(sizeof(Key) == sizeof(uint64_t));
- assert(k <= K); // We sometimes pass K to seek to the end of the skiplist
- assert(g <= 0xffffffffu);
- return ((k << 40) | (g << 8) | (HashNumbers(k, g) & 0xff));
- }
- static bool IsValidKey(Key k) {
- return hash(k) == (HashNumbers(key(k), gen(k)) & 0xff);
- }
- static Key RandomTarget(Random* rnd) {
- switch (rnd->Next() % 10) {
- case 0:
- // Seek to beginning
- return MakeKey(0, 0);
- case 1:
- // Seek to end
- return MakeKey(K, 0);
- default:
- // Seek to middle
- return MakeKey(rnd->Next() % K, 0);
- }
- }
- // Per-key generation
- struct State {
- std::atomic<int> generation[K];
- void Set(int k, int v) {
- generation[k].store(v, std::memory_order_release);
- }
- int Get(int k) { return generation[k].load(std::memory_order_acquire); }
- State() {
- for (unsigned int k = 0; k < K; k++) {
- Set(k, 0);
- }
- }
- };
- // Current state of the test
- State current_;
- ConcurrentArena arena_;
- // InlineSkipList is not protected by mu_. We just use a single writer
- // thread to modify it.
- InlineSkipList<TestComparator> list_;
- public:
- ConcurrentTest() : list_(TestComparator(), &arena_) {}
- // REQUIRES: No concurrent calls to WriteStep or ConcurrentWriteStep
- void WriteStep(Random* rnd) {
- const uint32_t k = rnd->Next() % K;
- const int g = current_.Get(k) + 1;
- const Key new_key = MakeKey(k, g);
- char* buf = list_.AllocateKey(sizeof(Key));
- memcpy(buf, &new_key, sizeof(Key));
- list_.Insert(buf);
- current_.Set(k, g);
- }
- // REQUIRES: No concurrent calls for the same k
- void ConcurrentWriteStep(uint32_t k, bool use_hint = false) {
- const int g = current_.Get(k) + 1;
- const Key new_key = MakeKey(k, g);
- char* buf = list_.AllocateKey(sizeof(Key));
- memcpy(buf, &new_key, sizeof(Key));
- if (use_hint) {
- void* hint = nullptr;
- list_.InsertWithHintConcurrently(buf, &hint);
- delete[] reinterpret_cast<char*>(hint);
- } else {
- list_.InsertConcurrently(buf);
- }
- ASSERT_EQ(g, current_.Get(k) + 1);
- current_.Set(k, g);
- }
- void ReadStep(Random* rnd) {
- // Remember the initial committed state of the skiplist.
- State initial_state;
- for (unsigned int k = 0; k < K; k++) {
- initial_state.Set(k, current_.Get(k));
- }
- Key pos = RandomTarget(rnd);
- InlineSkipList<TestComparator>::Iterator iter(&list_);
- iter.Seek(Encode(&pos));
- while (true) {
- Key current;
- if (!iter.Valid()) {
- current = MakeKey(K, 0);
- } else {
- current = Decode(iter.key());
- ASSERT_TRUE(IsValidKey(current)) << current;
- }
- ASSERT_LE(pos, current) << "should not go backwards";
- // Verify that everything in [pos,current) was not present in
- // initial_state.
- while (pos < current) {
- ASSERT_LT(key(pos), K) << pos;
- // Note that generation 0 is never inserted, so it is ok if
- // <*,0,*> is missing.
- ASSERT_TRUE((gen(pos) == 0U) ||
- (gen(pos) > static_cast<uint64_t>(initial_state.Get(
- static_cast<int>(key(pos))))))
- << "key: " << key(pos) << "; gen: " << gen(pos)
- << "; initgen: " << initial_state.Get(static_cast<int>(key(pos)));
- // Advance to next key in the valid key space
- if (key(pos) < key(current)) {
- pos = MakeKey(key(pos) + 1, 0);
- } else {
- pos = MakeKey(key(pos), gen(pos) + 1);
- }
- }
- if (!iter.Valid()) {
- break;
- }
- if (rnd->Next() % 2) {
- iter.Next();
- pos = MakeKey(key(pos), gen(pos) + 1);
- } else {
- Key new_target = RandomTarget(rnd);
- if (new_target > pos) {
- pos = new_target;
- iter.Seek(Encode(&new_target));
- }
- }
- }
- }
- };
- const uint32_t ConcurrentTest::K;
- // Simple test that does single-threaded testing of the ConcurrentTest
- // scaffolding.
- TEST_F(InlineSkipTest, ConcurrentReadWithoutThreads) {
- ConcurrentTest test;
- Random rnd(test::RandomSeed());
- for (int i = 0; i < 10000; i++) {
- test.ReadStep(&rnd);
- test.WriteStep(&rnd);
- }
- }
- TEST_F(InlineSkipTest, ConcurrentInsertWithoutThreads) {
- ConcurrentTest test;
- Random rnd(test::RandomSeed());
- for (int i = 0; i < 10000; i++) {
- test.ReadStep(&rnd);
- uint32_t base = rnd.Next();
- for (int j = 0; j < 4; ++j) {
- test.ConcurrentWriteStep((base + j) % ConcurrentTest::K);
- }
- }
- }
- class TestState {
- public:
- ConcurrentTest t_;
- bool use_hint_;
- int seed_;
- std::atomic<bool> quit_flag_;
- std::atomic<uint32_t> next_writer_;
- enum ReaderState { STARTING, RUNNING, DONE };
- explicit TestState(int s)
- : seed_(s),
- quit_flag_(false),
- state_(STARTING),
- pending_writers_(0),
- state_cv_(&mu_) {}
- void Wait(ReaderState s) {
- mu_.Lock();
- while (state_ != s) {
- state_cv_.Wait();
- }
- mu_.Unlock();
- }
- void Change(ReaderState s) {
- mu_.Lock();
- state_ = s;
- state_cv_.Signal();
- mu_.Unlock();
- }
- void AdjustPendingWriters(int delta) {
- mu_.Lock();
- pending_writers_ += delta;
- if (pending_writers_ == 0) {
- state_cv_.Signal();
- }
- mu_.Unlock();
- }
- void WaitForPendingWriters() {
- mu_.Lock();
- while (pending_writers_ != 0) {
- state_cv_.Wait();
- }
- mu_.Unlock();
- }
- private:
- port::Mutex mu_;
- ReaderState state_;
- int pending_writers_;
- port::CondVar state_cv_;
- };
- static void ConcurrentReader(void* arg) {
- TestState* state = reinterpret_cast<TestState*>(arg);
- Random rnd(state->seed_);
- int64_t reads = 0;
- state->Change(TestState::RUNNING);
- while (!state->quit_flag_.load(std::memory_order_acquire)) {
- state->t_.ReadStep(&rnd);
- ++reads;
- }
- state->Change(TestState::DONE);
- }
- static void ConcurrentWriter(void* arg) {
- TestState* state = reinterpret_cast<TestState*>(arg);
- uint32_t k = state->next_writer_++ % ConcurrentTest::K;
- state->t_.ConcurrentWriteStep(k, state->use_hint_);
- state->AdjustPendingWriters(-1);
- }
- static void RunConcurrentRead(int run) {
- const int seed = test::RandomSeed() + (run * 100);
- Random rnd(seed);
- const int N = 1000;
- const int kSize = 1000;
- for (int i = 0; i < N; i++) {
- if ((i % 100) == 0) {
- fprintf(stderr, "Run %d of %d\n", i, N);
- }
- TestState state(seed + 1);
- Env::Default()->SetBackgroundThreads(1);
- Env::Default()->Schedule(ConcurrentReader, &state);
- state.Wait(TestState::RUNNING);
- for (int k = 0; k < kSize; ++k) {
- state.t_.WriteStep(&rnd);
- }
- state.quit_flag_.store(true, std::memory_order_release);
- state.Wait(TestState::DONE);
- }
- }
- static void RunConcurrentInsert(int run, bool use_hint = false,
- int write_parallelism = 4) {
- Env::Default()->SetBackgroundThreads(1 + write_parallelism,
- Env::Priority::LOW);
- const int seed = test::RandomSeed() + (run * 100);
- Random rnd(seed);
- const int N = 1000;
- const int kSize = 1000;
- for (int i = 0; i < N; i++) {
- if ((i % 100) == 0) {
- fprintf(stderr, "Run %d of %d\n", i, N);
- }
- TestState state(seed + 1);
- state.use_hint_ = use_hint;
- Env::Default()->Schedule(ConcurrentReader, &state);
- state.Wait(TestState::RUNNING);
- for (int k = 0; k < kSize; k += write_parallelism) {
- state.next_writer_ = rnd.Next();
- state.AdjustPendingWriters(write_parallelism);
- for (int p = 0; p < write_parallelism; ++p) {
- Env::Default()->Schedule(ConcurrentWriter, &state);
- }
- state.WaitForPendingWriters();
- }
- state.quit_flag_.store(true, std::memory_order_release);
- state.Wait(TestState::DONE);
- }
- }
- TEST_F(InlineSkipTest, ConcurrentRead1) { RunConcurrentRead(1); }
- TEST_F(InlineSkipTest, ConcurrentRead2) { RunConcurrentRead(2); }
- TEST_F(InlineSkipTest, ConcurrentRead3) { RunConcurrentRead(3); }
- TEST_F(InlineSkipTest, ConcurrentRead4) { RunConcurrentRead(4); }
- TEST_F(InlineSkipTest, ConcurrentRead5) { RunConcurrentRead(5); }
- TEST_F(InlineSkipTest, ConcurrentInsert1) { RunConcurrentInsert(1); }
- TEST_F(InlineSkipTest, ConcurrentInsert2) { RunConcurrentInsert(2); }
- TEST_F(InlineSkipTest, ConcurrentInsert3) { RunConcurrentInsert(3); }
- TEST_F(InlineSkipTest, ConcurrentInsertWithHint1) {
- RunConcurrentInsert(1, true);
- }
- TEST_F(InlineSkipTest, ConcurrentInsertWithHint2) {
- RunConcurrentInsert(2, true);
- }
- TEST_F(InlineSkipTest, ConcurrentInsertWithHint3) {
- RunConcurrentInsert(3, true);
- }
- #endif // ROCKSDB_VALGRIND_RUN
- } // namespace ROCKSDB_NAMESPACE
- int main(int argc, char** argv) {
- ::testing::InitGoogleTest(&argc, argv);
- return RUN_ALL_TESTS();
- }
|