db_memtable_test.cc 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include <memory>
  6. #include <string>
  7. #include "db/db_test_util.h"
  8. #include "db/memtable.h"
  9. #include "db/range_del_aggregator.h"
  10. #include "port/stack_trace.h"
  11. #include "rocksdb/memtablerep.h"
  12. #include "rocksdb/slice_transform.h"
  13. namespace ROCKSDB_NAMESPACE {
  14. class DBMemTableTest : public DBTestBase {
  15. public:
  16. DBMemTableTest() : DBTestBase("db_memtable_test", /*env_do_fsync=*/true) {}
  17. };
  18. class MockMemTableRep : public MemTableRep {
  19. public:
  20. explicit MockMemTableRep(Allocator* allocator, MemTableRep* rep)
  21. : MemTableRep(allocator), rep_(rep), num_insert_with_hint_(0) {}
  22. KeyHandle Allocate(const size_t len, char** buf) override {
  23. return rep_->Allocate(len, buf);
  24. }
  25. void Insert(KeyHandle handle) override { rep_->Insert(handle); }
  26. void InsertWithHint(KeyHandle handle, void** hint) override {
  27. num_insert_with_hint_++;
  28. EXPECT_NE(nullptr, hint);
  29. last_hint_in_ = *hint;
  30. rep_->InsertWithHint(handle, hint);
  31. last_hint_out_ = *hint;
  32. }
  33. bool Contains(const char* key) const override { return rep_->Contains(key); }
  34. void Get(const LookupKey& k, void* callback_args,
  35. bool (*callback_func)(void* arg, const char* entry)) override {
  36. rep_->Get(k, callback_args, callback_func);
  37. }
  38. size_t ApproximateMemoryUsage() override {
  39. return rep_->ApproximateMemoryUsage();
  40. }
  41. Iterator* GetIterator(Arena* arena) override {
  42. return rep_->GetIterator(arena);
  43. }
  44. void* last_hint_in() { return last_hint_in_; }
  45. void* last_hint_out() { return last_hint_out_; }
  46. int num_insert_with_hint() { return num_insert_with_hint_; }
  47. private:
  48. std::unique_ptr<MemTableRep> rep_;
  49. void* last_hint_in_;
  50. void* last_hint_out_;
  51. int num_insert_with_hint_;
  52. };
  53. class MockMemTableRepFactory : public MemTableRepFactory {
  54. public:
  55. MemTableRep* CreateMemTableRep(const MemTableRep::KeyComparator& cmp,
  56. Allocator* allocator,
  57. const SliceTransform* transform,
  58. Logger* logger) override {
  59. SkipListFactory factory;
  60. MemTableRep* skiplist_rep =
  61. factory.CreateMemTableRep(cmp, allocator, transform, logger);
  62. mock_rep_ = new MockMemTableRep(allocator, skiplist_rep);
  63. return mock_rep_;
  64. }
  65. MemTableRep* CreateMemTableRep(const MemTableRep::KeyComparator& cmp,
  66. Allocator* allocator,
  67. const SliceTransform* transform,
  68. Logger* logger,
  69. uint32_t column_family_id) override {
  70. last_column_family_id_ = column_family_id;
  71. return CreateMemTableRep(cmp, allocator, transform, logger);
  72. }
  73. const char* Name() const override { return "MockMemTableRepFactory"; }
  74. MockMemTableRep* rep() { return mock_rep_; }
  75. bool IsInsertConcurrentlySupported() const override { return false; }
  76. uint32_t GetLastColumnFamilyId() { return last_column_family_id_; }
  77. private:
  78. MockMemTableRep* mock_rep_;
  79. // workaround since there's no std::numeric_limits<uint32_t>::max() yet.
  80. uint32_t last_column_family_id_ = static_cast<uint32_t>(-1);
  81. };
  82. class TestPrefixExtractor : public SliceTransform {
  83. public:
  84. const char* Name() const override { return "TestPrefixExtractor"; }
  85. Slice Transform(const Slice& key) const override {
  86. const char* p = separator(key);
  87. if (p == nullptr) {
  88. return Slice();
  89. }
  90. return Slice(key.data(), p - key.data() + 1);
  91. }
  92. bool InDomain(const Slice& key) const override {
  93. return separator(key) != nullptr;
  94. }
  95. bool InRange(const Slice& /*key*/) const override { return false; }
  96. private:
  97. const char* separator(const Slice& key) const {
  98. return static_cast<const char*>(memchr(key.data(), '_', key.size()));
  99. }
  100. };
  101. // Test that ::Add properly returns false when inserting duplicate keys
  102. TEST_F(DBMemTableTest, DuplicateSeq) {
  103. SequenceNumber seq = 123;
  104. std::string value;
  105. MergeContext merge_context;
  106. Options options;
  107. InternalKeyComparator ikey_cmp(options.comparator);
  108. ReadRangeDelAggregator range_del_agg(&ikey_cmp,
  109. kMaxSequenceNumber /* upper_bound */);
  110. // Create a MemTable
  111. InternalKeyComparator cmp(BytewiseComparator());
  112. auto factory = std::make_shared<SkipListFactory>();
  113. options.memtable_factory = factory;
  114. ImmutableOptions ioptions(options);
  115. WriteBufferManager wb(options.db_write_buffer_size);
  116. MemTable* mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
  117. kMaxSequenceNumber, 0 /* column_family_id */);
  118. // Write some keys and make sure it returns false on duplicates
  119. ASSERT_OK(
  120. mem->Add(seq, kTypeValue, "key", "value2", nullptr /* kv_prot_info */));
  121. ASSERT_TRUE(
  122. mem->Add(seq, kTypeValue, "key", "value2", nullptr /* kv_prot_info */)
  123. .IsTryAgain());
  124. // Changing the type should still cause the duplicatae key
  125. ASSERT_TRUE(
  126. mem->Add(seq, kTypeMerge, "key", "value2", nullptr /* kv_prot_info */)
  127. .IsTryAgain());
  128. // Changing the seq number will make the key fresh
  129. ASSERT_OK(mem->Add(seq + 1, kTypeMerge, "key", "value2",
  130. nullptr /* kv_prot_info */));
  131. // Test with different types for duplicate keys
  132. ASSERT_TRUE(
  133. mem->Add(seq, kTypeDeletion, "key", "", nullptr /* kv_prot_info */)
  134. .IsTryAgain());
  135. ASSERT_TRUE(
  136. mem->Add(seq, kTypeSingleDeletion, "key", "", nullptr /* kv_prot_info */)
  137. .IsTryAgain());
  138. // Test the duplicate keys under stress
  139. for (int i = 0; i < 10000; i++) {
  140. bool insert_dup = i % 10 == 1;
  141. if (!insert_dup) {
  142. seq++;
  143. }
  144. Status s = mem->Add(seq, kTypeValue, "foo", "value" + std::to_string(seq),
  145. nullptr /* kv_prot_info */);
  146. if (insert_dup) {
  147. ASSERT_TRUE(s.IsTryAgain());
  148. } else {
  149. ASSERT_OK(s);
  150. }
  151. }
  152. delete mem;
  153. // Test with InsertWithHint
  154. options.memtable_insert_with_hint_prefix_extractor.reset(
  155. new TestPrefixExtractor()); // which uses _ to extract the prefix
  156. ioptions = ImmutableOptions(options);
  157. mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
  158. kMaxSequenceNumber, 0 /* column_family_id */);
  159. // Insert a duplicate key with _ in it
  160. ASSERT_OK(
  161. mem->Add(seq, kTypeValue, "key_1", "value", nullptr /* kv_prot_info */));
  162. ASSERT_TRUE(
  163. mem->Add(seq, kTypeValue, "key_1", "value", nullptr /* kv_prot_info */)
  164. .IsTryAgain());
  165. delete mem;
  166. // Test when InsertConcurrently will be invoked
  167. options.allow_concurrent_memtable_write = true;
  168. ioptions = ImmutableOptions(options);
  169. mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
  170. kMaxSequenceNumber, 0 /* column_family_id */);
  171. MemTablePostProcessInfo post_process_info;
  172. ASSERT_OK(mem->Add(seq, kTypeValue, "key", "value",
  173. nullptr /* kv_prot_info */, true, &post_process_info));
  174. ASSERT_TRUE(mem->Add(seq, kTypeValue, "key", "value",
  175. nullptr /* kv_prot_info */, true, &post_process_info)
  176. .IsTryAgain());
  177. delete mem;
  178. }
  179. // A simple test to verify that the concurrent merge writes is functional
  180. TEST_F(DBMemTableTest, ConcurrentMergeWrite) {
  181. int num_ops = 1000;
  182. std::string value;
  183. MergeContext merge_context;
  184. Options options;
  185. // A merge operator that is not sensitive to concurrent writes since in this
  186. // test we don't order the writes.
  187. options.merge_operator = MergeOperators::CreateUInt64AddOperator();
  188. // Create a MemTable
  189. InternalKeyComparator cmp(BytewiseComparator());
  190. auto factory = std::make_shared<SkipListFactory>();
  191. options.memtable_factory = factory;
  192. options.allow_concurrent_memtable_write = true;
  193. ImmutableOptions ioptions(options);
  194. WriteBufferManager wb(options.db_write_buffer_size);
  195. MemTable* mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
  196. kMaxSequenceNumber, 0 /* column_family_id */);
  197. // Put 0 as the base
  198. PutFixed64(&value, static_cast<uint64_t>(0));
  199. ASSERT_OK(mem->Add(0, kTypeValue, "key", value, nullptr /* kv_prot_info */));
  200. value.clear();
  201. // Write Merge concurrently
  202. ROCKSDB_NAMESPACE::port::Thread write_thread1([&]() {
  203. MemTablePostProcessInfo post_process_info1;
  204. std::string v1;
  205. for (int seq = 1; seq < num_ops / 2; seq++) {
  206. PutFixed64(&v1, seq);
  207. ASSERT_OK(mem->Add(seq, kTypeMerge, "key", v1, nullptr /* kv_prot_info */,
  208. true, &post_process_info1));
  209. v1.clear();
  210. }
  211. });
  212. ROCKSDB_NAMESPACE::port::Thread write_thread2([&]() {
  213. MemTablePostProcessInfo post_process_info2;
  214. std::string v2;
  215. for (int seq = num_ops / 2; seq < num_ops; seq++) {
  216. PutFixed64(&v2, seq);
  217. ASSERT_OK(mem->Add(seq, kTypeMerge, "key", v2, nullptr /* kv_prot_info */,
  218. true, &post_process_info2));
  219. v2.clear();
  220. }
  221. });
  222. write_thread1.join();
  223. write_thread2.join();
  224. Status status;
  225. ReadOptions roptions;
  226. SequenceNumber max_covering_tombstone_seq = 0;
  227. LookupKey lkey("key", kMaxSequenceNumber);
  228. bool res = mem->Get(lkey, &value, /*columns=*/nullptr, /*timestamp=*/nullptr,
  229. &status, &merge_context, &max_covering_tombstone_seq,
  230. roptions, false /* immutable_memtable */);
  231. ASSERT_OK(status);
  232. ASSERT_TRUE(res);
  233. uint64_t ivalue = DecodeFixed64(Slice(value).data());
  234. uint64_t sum = 0;
  235. for (int seq = 0; seq < num_ops; seq++) {
  236. sum += seq;
  237. }
  238. ASSERT_EQ(ivalue, sum);
  239. delete mem;
  240. }
  241. TEST_F(DBMemTableTest, InsertWithHint) {
  242. Options options;
  243. options.allow_concurrent_memtable_write = false;
  244. options.create_if_missing = true;
  245. options.memtable_factory.reset(new MockMemTableRepFactory());
  246. options.memtable_insert_with_hint_prefix_extractor.reset(
  247. new TestPrefixExtractor());
  248. options.env = env_;
  249. Reopen(options);
  250. MockMemTableRep* rep =
  251. static_cast<MockMemTableRepFactory*>(options.memtable_factory.get())
  252. ->rep();
  253. ASSERT_OK(Put("foo_k1", "foo_v1"));
  254. ASSERT_EQ(nullptr, rep->last_hint_in());
  255. void* hint_foo = rep->last_hint_out();
  256. ASSERT_OK(Put("foo_k2", "foo_v2"));
  257. ASSERT_EQ(hint_foo, rep->last_hint_in());
  258. ASSERT_EQ(hint_foo, rep->last_hint_out());
  259. ASSERT_OK(Put("foo_k3", "foo_v3"));
  260. ASSERT_EQ(hint_foo, rep->last_hint_in());
  261. ASSERT_EQ(hint_foo, rep->last_hint_out());
  262. ASSERT_OK(Put("bar_k1", "bar_v1"));
  263. ASSERT_EQ(nullptr, rep->last_hint_in());
  264. void* hint_bar = rep->last_hint_out();
  265. ASSERT_NE(hint_foo, hint_bar);
  266. ASSERT_OK(Put("bar_k2", "bar_v2"));
  267. ASSERT_EQ(hint_bar, rep->last_hint_in());
  268. ASSERT_EQ(hint_bar, rep->last_hint_out());
  269. ASSERT_EQ(5, rep->num_insert_with_hint());
  270. ASSERT_OK(Put("NotInPrefixDomain", "vvv"));
  271. ASSERT_EQ(5, rep->num_insert_with_hint());
  272. ASSERT_EQ("foo_v1", Get("foo_k1"));
  273. ASSERT_EQ("foo_v2", Get("foo_k2"));
  274. ASSERT_EQ("foo_v3", Get("foo_k3"));
  275. ASSERT_EQ("bar_v1", Get("bar_k1"));
  276. ASSERT_EQ("bar_v2", Get("bar_k2"));
  277. ASSERT_OK(db_->DeleteRange(WriteOptions(), "foo_k1", "foo_k4"));
  278. ASSERT_EQ(hint_bar, rep->last_hint_in());
  279. ASSERT_EQ(hint_bar, rep->last_hint_out());
  280. ASSERT_EQ(5, rep->num_insert_with_hint());
  281. ASSERT_EQ("vvv", Get("NotInPrefixDomain"));
  282. }
  283. TEST_F(DBMemTableTest, ColumnFamilyId) {
  284. // Verifies MemTableRepFactory is told the right column family id.
  285. Options options;
  286. options.env = CurrentOptions().env;
  287. options.allow_concurrent_memtable_write = false;
  288. options.create_if_missing = true;
  289. options.memtable_factory.reset(new MockMemTableRepFactory());
  290. DestroyAndReopen(options);
  291. CreateAndReopenWithCF({"pikachu"}, options);
  292. for (uint32_t cf = 0; cf < 2; ++cf) {
  293. ASSERT_OK(Put(cf, "key", "val"));
  294. ASSERT_OK(Flush(cf));
  295. ASSERT_EQ(
  296. cf, static_cast<MockMemTableRepFactory*>(options.memtable_factory.get())
  297. ->GetLastColumnFamilyId());
  298. }
  299. }
  300. class DBMemTableTestForSeek : public DBMemTableTest,
  301. virtual public ::testing::WithParamInterface<
  302. std::tuple<bool, bool, bool>> {};
  303. TEST_P(DBMemTableTestForSeek, IntegrityChecks) {
  304. // Validate key corruption could be detected during seek.
  305. // We insert many keys into skiplist. Then we corrupt the each key one at a
  306. // time. With memtable_veirfy_per_key_checksum_on_seek enabled, when the
  307. // corrupted key is searched, the checksum of every key visited during the
  308. // seek is validated. It will report data corruption. Otherwise seek returns
  309. // not found.
  310. auto allow_data_in_error = std::get<0>(GetParam());
  311. Options options = CurrentOptions();
  312. options.allow_data_in_errors = allow_data_in_error;
  313. options.paranoid_memory_checks = std::get<1>(GetParam());
  314. options.memtable_veirfy_per_key_checksum_on_seek = std::get<2>(GetParam());
  315. options.memtable_protection_bytes_per_key = 8;
  316. DestroyAndReopen(options);
  317. // capture the data pointer of all of the keys
  318. std::vector<char*> raw_data_pointer;
  319. // Insert enough keys, so memtable would create multiple levels.
  320. auto key_count = 100;
  321. for (int i = 0; i < key_count; i++) {
  322. // The last digit of the key will be corrupted from value 0 to value 5
  323. ASSERT_OK(Put(Key(i * 10), "val0"));
  324. }
  325. ReadOptions rops;
  326. // Iterate all the keys to get key pointers
  327. SyncPoint::GetInstance()->DisableProcessing();
  328. SyncPoint::GetInstance()->SetCallBack("InlineSkipList::Iterator::Next::key",
  329. [&raw_data_pointer](void* key) {
  330. auto p = static_cast<char*>(key);
  331. raw_data_pointer.push_back(p);
  332. });
  333. SyncPoint::GetInstance()->EnableProcessing();
  334. {
  335. std::unique_ptr<Iterator> iter{db_->NewIterator(rops)};
  336. iter->Seek(Key(0));
  337. while (iter->Valid()) {
  338. ASSERT_OK(iter->status());
  339. iter->Next();
  340. }
  341. // check status after valid returned false.
  342. auto status = iter->status();
  343. ASSERT_TRUE(status.ok());
  344. }
  345. SyncPoint::GetInstance()->DisableProcessing();
  346. SyncPoint::GetInstance()->ClearAllCallBacks();
  347. ASSERT_EQ(raw_data_pointer.size(), key_count);
  348. bool enable_key_validation_on_seek =
  349. options.memtable_veirfy_per_key_checksum_on_seek;
  350. // For each key, corrupt it, validate corruption is detected correctly, then
  351. // revert it.
  352. for (int i = 0; i < key_count; i++) {
  353. std::string key_to_corrupt = Key(i * 10);
  354. raw_data_pointer[i][key_to_corrupt.size()] = '5';
  355. auto corrupted_key = key_to_corrupt;
  356. corrupted_key.data()[key_to_corrupt.size() - 1] = '5';
  357. auto corrupted_key_slice =
  358. Slice(corrupted_key.data(), corrupted_key.length());
  359. auto corrupted_key_hex = corrupted_key_slice.ToString(/*hex=*/true);
  360. {
  361. // Test Get API
  362. std::string val;
  363. auto status = db_->Get(rops, key_to_corrupt, &val);
  364. if (enable_key_validation_on_seek) {
  365. ASSERT_TRUE(status.IsCorruption()) << key_to_corrupt;
  366. ASSERT_EQ(
  367. status.ToString().find(corrupted_key_hex) != std::string::npos,
  368. allow_data_in_error)
  369. << status.ToString() << "\n"
  370. << corrupted_key_hex;
  371. } else {
  372. ASSERT_TRUE(status.IsNotFound());
  373. }
  374. }
  375. {
  376. // Test MultiGet API
  377. std::vector<std::string> vals;
  378. std::vector<Status> statuses = db_->MultiGet(
  379. rops, {db_->DefaultColumnFamily()}, {key_to_corrupt}, &vals, nullptr);
  380. if (enable_key_validation_on_seek) {
  381. ASSERT_TRUE(statuses[0].IsCorruption());
  382. ASSERT_EQ(
  383. statuses[0].ToString().find(corrupted_key_hex) != std::string::npos,
  384. allow_data_in_error);
  385. } else {
  386. ASSERT_TRUE(statuses[0].IsNotFound());
  387. }
  388. }
  389. {
  390. // Test Iterator Seek API
  391. std::unique_ptr<Iterator> iter{db_->NewIterator(rops)};
  392. ASSERT_OK(iter->status());
  393. iter->Seek(key_to_corrupt);
  394. auto status = iter->status();
  395. if (enable_key_validation_on_seek) {
  396. ASSERT_TRUE(status.IsCorruption());
  397. ASSERT_EQ(
  398. status.ToString().find(corrupted_key_hex) != std::string::npos,
  399. allow_data_in_error);
  400. } else {
  401. ASSERT_FALSE(iter->Valid());
  402. ASSERT_FALSE(status.ok());
  403. }
  404. }
  405. // revert the key corruption.
  406. raw_data_pointer[i][key_to_corrupt.size()] = '0';
  407. }
  408. }
  409. INSTANTIATE_TEST_CASE_P(DBMemTableTestForSeek, DBMemTableTestForSeek,
  410. ::testing::Combine(::testing::Bool(), ::testing::Bool(),
  411. ::testing::Bool()));
  412. TEST_F(DBMemTableTest, IntegrityChecks) {
  413. // We insert keys key000000, key000001 and key000002 into skiplist at fixed
  414. // height 1 (smallest height). Then we corrupt the second key to aey000001 to
  415. // make it smaller. With `paranoid_memory_checks` set to true, if the
  416. // skip list sees key000000 and then aey000001, then it will report out of
  417. // order keys with corruption status. With `paranoid_memory_checks` set
  418. // to false, read/scan may return wrong results.
  419. for (bool allow_data_in_error : {false, true}) {
  420. Options options = CurrentOptions();
  421. options.allow_data_in_errors = allow_data_in_error;
  422. options.paranoid_memory_checks = true;
  423. DestroyAndReopen(options);
  424. SyncPoint::GetInstance()->SetCallBack(
  425. "InlineSkipList::RandomHeight::height", [](void* h) {
  426. auto height_ptr = static_cast<int*>(h);
  427. *height_ptr = 1;
  428. });
  429. SyncPoint::GetInstance()->EnableProcessing();
  430. ASSERT_OK(Put(Key(0), "val0"));
  431. ASSERT_OK(Put(Key(2), "val2"));
  432. // p will point to the buffer for encoded key000001
  433. char* p = nullptr;
  434. SyncPoint::GetInstance()->SetCallBack(
  435. "MemTable::Add:BeforeReturn:Encoded", [&](void* encoded) {
  436. p = const_cast<char*>(static_cast<Slice*>(encoded)->data());
  437. });
  438. ASSERT_OK(Put(Key(1), "val1"));
  439. SyncPoint::GetInstance()->DisableProcessing();
  440. SyncPoint::GetInstance()->ClearAllCallBacks();
  441. ASSERT_TRUE(p);
  442. // Offset 0 is key size, key bytes start at offset 1.
  443. // "key000001 -> aey000001"
  444. p[1] = 'a';
  445. ReadOptions rops;
  446. std::string val;
  447. Status s = db_->Get(rops, Key(1), &val);
  448. ASSERT_TRUE(s.IsCorruption());
  449. std::string key0 = Slice(Key(0)).ToString(true);
  450. ASSERT_EQ(s.ToString().find(key0) != std::string::npos,
  451. allow_data_in_error);
  452. // Without `paranoid_memory_checks`, NotFound will be returned.
  453. // This would fail an assertion in InlineSkipList::FindGreaterOrEqual().
  454. // If we remove the assertion, this passes.
  455. // ASSERT_TRUE(db_->Get(ReadOptions(), Key(1), &val).IsNotFound());
  456. std::vector<std::string> vals;
  457. std::vector<Status> statuses = db_->MultiGet(
  458. rops, {db_->DefaultColumnFamily()}, {Key(1)}, &vals, nullptr);
  459. ASSERT_TRUE(statuses[0].IsCorruption());
  460. ASSERT_EQ(statuses[0].ToString().find(key0) != std::string::npos,
  461. allow_data_in_error);
  462. std::unique_ptr<Iterator> iter{db_->NewIterator(rops)};
  463. ASSERT_OK(iter->status());
  464. iter->Seek(Key(1));
  465. ASSERT_TRUE(iter->status().IsCorruption());
  466. ASSERT_EQ(iter->status().ToString().find(key0) != std::string::npos,
  467. allow_data_in_error);
  468. iter->Seek(Key(0));
  469. ASSERT_TRUE(iter->Valid());
  470. ASSERT_OK(iter->status());
  471. // iterating through skip list at height at 1 should catch out-of-order keys
  472. iter->Next();
  473. ASSERT_TRUE(iter->status().IsCorruption());
  474. ASSERT_EQ(iter->status().ToString().find(key0) != std::string::npos,
  475. allow_data_in_error);
  476. ASSERT_FALSE(iter->Valid());
  477. iter->SeekForPrev(Key(2));
  478. ASSERT_TRUE(iter->status().IsCorruption());
  479. ASSERT_EQ(iter->status().ToString().find(key0) != std::string::npos,
  480. allow_data_in_error);
  481. // Internally DB Iter will iterate backwards (call Prev()) after
  482. // SeekToLast() to find the correct internal key with the last user key.
  483. // Prev() will do integrity checks and catch corruption.
  484. iter->SeekToLast();
  485. ASSERT_TRUE(iter->status().IsCorruption());
  486. ASSERT_EQ(iter->status().ToString().find(key0) != std::string::npos,
  487. allow_data_in_error);
  488. ASSERT_FALSE(iter->Valid());
  489. }
  490. }
  491. TEST_F(DBMemTableTest, VectorConcurrentInsert) {
  492. Options options;
  493. options.create_if_missing = true;
  494. options.create_missing_column_families = true;
  495. options.allow_concurrent_memtable_write = true;
  496. options.memtable_factory.reset(new VectorRepFactory());
  497. DestroyAndReopen(options);
  498. CreateAndReopenWithCF({"cf1"}, options);
  499. // Multi-threaded writes
  500. {
  501. WriteOptions write_options;
  502. std::vector<port::Thread> threads;
  503. for (int i = 0; i < 10; ++i) {
  504. threads.emplace_back([&, i]() {
  505. int start = i * 100;
  506. int end = start + 100;
  507. WriteBatch batch;
  508. for (int j = start; j < end; ++j) {
  509. ASSERT_OK(
  510. batch.Put(handles_[0], Key(j), "value" + std::to_string(j)));
  511. }
  512. ASSERT_OK(db_->Write(write_options, &batch));
  513. });
  514. }
  515. for (auto& t : threads) {
  516. t.join();
  517. }
  518. std::unique_ptr<Iterator> iter(
  519. db_->NewIterator(ReadOptions(), handles_[0]));
  520. iter->SeekToFirst();
  521. for (int i = 0; i < 1000; ++i) {
  522. ASSERT_TRUE(iter->Valid());
  523. ASSERT_EQ(iter->key().ToString(), Key(i));
  524. ASSERT_EQ(iter->value().ToString(), "value" + std::to_string(i));
  525. iter->Next();
  526. }
  527. ASSERT_FALSE(iter->Valid());
  528. ASSERT_OK(iter->status());
  529. }
  530. // Multi-threaded writes, multi CF
  531. {
  532. WriteOptions write_options;
  533. std::vector<port::Thread> threads;
  534. for (int i = 0; i < 10; ++i) {
  535. threads.emplace_back([&, i]() {
  536. int start = i * 100;
  537. int end = start + 100;
  538. WriteBatch batch;
  539. for (int j = start; j < end; ++j) {
  540. ASSERT_OK(batch.Put(handles_[0], Key(j), "CF0" + std::to_string(j)));
  541. ASSERT_OK(batch.Put(handles_[1], Key(j), "CF1" + std::to_string(j)));
  542. }
  543. ASSERT_OK(db_->Write(write_options, &batch));
  544. });
  545. }
  546. for (auto& t : threads) {
  547. t.join();
  548. }
  549. std::unique_ptr<Iterator> iter0(
  550. db_->NewIterator(ReadOptions(), handles_[0]));
  551. std::unique_ptr<Iterator> iter1(
  552. db_->NewIterator(ReadOptions(), handles_[1]));
  553. iter0->SeekToFirst();
  554. iter1->SeekToFirst();
  555. for (int i = 0; i < 1000; ++i) {
  556. ASSERT_TRUE(iter0->Valid());
  557. ASSERT_EQ(iter0->key().ToString(), Key(i));
  558. ASSERT_EQ(iter0->value().ToString(), "CF0" + std::to_string(i));
  559. iter0->Next();
  560. ASSERT_TRUE(iter1->Valid());
  561. ASSERT_EQ(iter1->key().ToString(), Key(i));
  562. ASSERT_EQ(iter1->value().ToString(), "CF1" + std::to_string(i));
  563. iter1->Next();
  564. }
  565. ASSERT_FALSE(iter0->Valid());
  566. ASSERT_OK(iter0->status());
  567. ASSERT_FALSE(iter1->Valid());
  568. ASSERT_OK(iter1->status());
  569. }
  570. ASSERT_OK(Flush(0));
  571. ASSERT_OK(Flush(1));
  572. }
  573. } // namespace ROCKSDB_NAMESPACE
  574. int main(int argc, char** argv) {
  575. ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  576. ::testing::InitGoogleTest(&argc, argv);
  577. return RUN_ALL_TESTS();
  578. }