db_basic_test.cc 83 KB


  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  9. #include "db/db_test_util.h"
  10. #include "port/stack_trace.h"
  11. #include "rocksdb/perf_context.h"
  12. #include "rocksdb/utilities/debug.h"
  13. #include "table/block_based/block_based_table_reader.h"
  14. #include "table/block_based/block_builder.h"
  15. #include "test_util/fault_injection_test_env.h"
  16. #if !defined(ROCKSDB_LITE)
  17. #include "test_util/sync_point.h"
  18. #endif
  19. namespace ROCKSDB_NAMESPACE {
  20. class DBBasicTest : public DBTestBase {
  21. public:
  22. DBBasicTest() : DBTestBase("/db_basic_test") {}
  23. };
  24. TEST_F(DBBasicTest, OpenWhenOpen) {
  25. Options options = CurrentOptions();
  26. options.env = env_;
  27. ROCKSDB_NAMESPACE::DB* db2 = nullptr;
  28. ROCKSDB_NAMESPACE::Status s = DB::Open(options, dbname_, &db2);
  29. ASSERT_EQ(Status::Code::kIOError, s.code());
  30. ASSERT_EQ(Status::SubCode::kNone, s.subcode());
  31. ASSERT_TRUE(strstr(s.getState(), "lock ") != nullptr);
  32. delete db2;
  33. }
  34. #ifndef ROCKSDB_LITE
  35. TEST_F(DBBasicTest, ReadOnlyDB) {
  36. ASSERT_OK(Put("foo", "v1"));
  37. ASSERT_OK(Put("bar", "v2"));
  38. ASSERT_OK(Put("foo", "v3"));
  39. Close();
  40. auto options = CurrentOptions();
  41. assert(options.env == env_);
  42. ASSERT_OK(ReadOnlyReopen(options));
  43. ASSERT_EQ("v3", Get("foo"));
  44. ASSERT_EQ("v2", Get("bar"));
  45. Iterator* iter = db_->NewIterator(ReadOptions());
  46. int count = 0;
  47. for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
  48. ASSERT_OK(iter->status());
  49. ++count;
  50. }
  51. ASSERT_EQ(count, 2);
  52. delete iter;
  53. Close();
  54. // Reopen and flush memtable.
  55. Reopen(options);
  56. Flush();
  57. Close();
  58. // Now check keys in read only mode.
  59. ASSERT_OK(ReadOnlyReopen(options));
  60. ASSERT_EQ("v3", Get("foo"));
  61. ASSERT_EQ("v2", Get("bar"));
  62. ASSERT_TRUE(db_->SyncWAL().IsNotSupported());
  63. }
  64. TEST_F(DBBasicTest, ReadOnlyDBWithWriteDBIdToManifestSet) {
  65. ASSERT_OK(Put("foo", "v1"));
  66. ASSERT_OK(Put("bar", "v2"));
  67. ASSERT_OK(Put("foo", "v3"));
  68. Close();
  69. auto options = CurrentOptions();
  70. options.write_dbid_to_manifest = true;
  71. assert(options.env == env_);
  72. ASSERT_OK(ReadOnlyReopen(options));
  73. std::string db_id1;
  74. db_->GetDbIdentity(db_id1);
  75. ASSERT_EQ("v3", Get("foo"));
  76. ASSERT_EQ("v2", Get("bar"));
  77. Iterator* iter = db_->NewIterator(ReadOptions());
  78. int count = 0;
  79. for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
  80. ASSERT_OK(iter->status());
  81. ++count;
  82. }
  83. ASSERT_EQ(count, 2);
  84. delete iter;
  85. Close();
  86. // Reopen and flush memtable.
  87. Reopen(options);
  88. Flush();
  89. Close();
  90. // Now check keys in read only mode.
  91. ASSERT_OK(ReadOnlyReopen(options));
  92. ASSERT_EQ("v3", Get("foo"));
  93. ASSERT_EQ("v2", Get("bar"));
  94. ASSERT_TRUE(db_->SyncWAL().IsNotSupported());
  95. std::string db_id2;
  96. db_->GetDbIdentity(db_id2);
  97. ASSERT_EQ(db_id1, db_id2);
  98. }
  99. TEST_F(DBBasicTest, CompactedDB) {
  100. const uint64_t kFileSize = 1 << 20;
  101. Options options = CurrentOptions();
  102. options.disable_auto_compactions = true;
  103. options.write_buffer_size = kFileSize;
  104. options.target_file_size_base = kFileSize;
  105. options.max_bytes_for_level_base = 1 << 30;
  106. options.compression = kNoCompression;
  107. Reopen(options);
  108. // 1 L0 file, use CompactedDB if max_open_files = -1
  109. ASSERT_OK(Put("aaa", DummyString(kFileSize / 2, '1')));
  110. Flush();
  111. Close();
  112. ASSERT_OK(ReadOnlyReopen(options));
  113. Status s = Put("new", "value");
  114. ASSERT_EQ(s.ToString(),
  115. "Not implemented: Not supported operation in read only mode.");
  116. ASSERT_EQ(DummyString(kFileSize / 2, '1'), Get("aaa"));
  117. Close();
  118. options.max_open_files = -1;
  119. ASSERT_OK(ReadOnlyReopen(options));
  120. s = Put("new", "value");
  121. ASSERT_EQ(s.ToString(),
  122. "Not implemented: Not supported in compacted db mode.");
  123. ASSERT_EQ(DummyString(kFileSize / 2, '1'), Get("aaa"));
  124. Close();
  125. Reopen(options);
  126. // Add more L0 files
  127. ASSERT_OK(Put("bbb", DummyString(kFileSize / 2, '2')));
  128. Flush();
  129. ASSERT_OK(Put("aaa", DummyString(kFileSize / 2, 'a')));
  130. Flush();
  131. ASSERT_OK(Put("bbb", DummyString(kFileSize / 2, 'b')));
  132. ASSERT_OK(Put("eee", DummyString(kFileSize / 2, 'e')));
  133. Flush();
  134. Close();
  135. ASSERT_OK(ReadOnlyReopen(options));
  136. // Fallback to read-only DB
  137. s = Put("new", "value");
  138. ASSERT_EQ(s.ToString(),
  139. "Not implemented: Not supported operation in read only mode.");
  140. Close();
  141. // Full compaction
  142. Reopen(options);
  143. // Add more keys
  144. ASSERT_OK(Put("fff", DummyString(kFileSize / 2, 'f')));
  145. ASSERT_OK(Put("hhh", DummyString(kFileSize / 2, 'h')));
  146. ASSERT_OK(Put("iii", DummyString(kFileSize / 2, 'i')));
  147. ASSERT_OK(Put("jjj", DummyString(kFileSize / 2, 'j')));
  148. db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
  149. ASSERT_EQ(3, NumTableFilesAtLevel(1));
  150. Close();
  151. // CompactedDB
  152. ASSERT_OK(ReadOnlyReopen(options));
  153. s = Put("new", "value");
  154. ASSERT_EQ(s.ToString(),
  155. "Not implemented: Not supported in compacted db mode.");
  156. ASSERT_EQ("NOT_FOUND", Get("abc"));
  157. ASSERT_EQ(DummyString(kFileSize / 2, 'a'), Get("aaa"));
  158. ASSERT_EQ(DummyString(kFileSize / 2, 'b'), Get("bbb"));
  159. ASSERT_EQ("NOT_FOUND", Get("ccc"));
  160. ASSERT_EQ(DummyString(kFileSize / 2, 'e'), Get("eee"));
  161. ASSERT_EQ(DummyString(kFileSize / 2, 'f'), Get("fff"));
  162. ASSERT_EQ("NOT_FOUND", Get("ggg"));
  163. ASSERT_EQ(DummyString(kFileSize / 2, 'h'), Get("hhh"));
  164. ASSERT_EQ(DummyString(kFileSize / 2, 'i'), Get("iii"));
  165. ASSERT_EQ(DummyString(kFileSize / 2, 'j'), Get("jjj"));
  166. ASSERT_EQ("NOT_FOUND", Get("kkk"));
  167. // MultiGet
  168. std::vector<std::string> values;
  169. std::vector<Status> status_list = dbfull()->MultiGet(
  170. ReadOptions(),
  171. std::vector<Slice>({Slice("aaa"), Slice("ccc"), Slice("eee"),
  172. Slice("ggg"), Slice("iii"), Slice("kkk")}),
  173. &values);
  174. ASSERT_EQ(status_list.size(), static_cast<uint64_t>(6));
  175. ASSERT_EQ(values.size(), static_cast<uint64_t>(6));
  176. ASSERT_OK(status_list[0]);
  177. ASSERT_EQ(DummyString(kFileSize / 2, 'a'), values[0]);
  178. ASSERT_TRUE(status_list[1].IsNotFound());
  179. ASSERT_OK(status_list[2]);
  180. ASSERT_EQ(DummyString(kFileSize / 2, 'e'), values[2]);
  181. ASSERT_TRUE(status_list[3].IsNotFound());
  182. ASSERT_OK(status_list[4]);
  183. ASSERT_EQ(DummyString(kFileSize / 2, 'i'), values[4]);
  184. ASSERT_TRUE(status_list[5].IsNotFound());
  185. Reopen(options);
  186. // Add a key
  187. ASSERT_OK(Put("fff", DummyString(kFileSize / 2, 'f')));
  188. Close();
  189. ASSERT_OK(ReadOnlyReopen(options));
  190. s = Put("new", "value");
  191. ASSERT_EQ(s.ToString(),
  192. "Not implemented: Not supported operation in read only mode.");
  193. }
  194. TEST_F(DBBasicTest, LevelLimitReopen) {
  195. Options options = CurrentOptions();
  196. CreateAndReopenWithCF({"pikachu"}, options);
  197. const std::string value(1024 * 1024, ' ');
  198. int i = 0;
  199. while (NumTableFilesAtLevel(2, 1) == 0) {
  200. ASSERT_OK(Put(1, Key(i++), value));
  201. dbfull()->TEST_WaitForFlushMemTable();
  202. dbfull()->TEST_WaitForCompact();
  203. }
  204. options.num_levels = 1;
  205. options.max_bytes_for_level_multiplier_additional.resize(1, 1);
  206. Status s = TryReopenWithColumnFamilies({"default", "pikachu"}, options);
  207. ASSERT_EQ(s.IsInvalidArgument(), true);
  208. ASSERT_EQ(s.ToString(),
  209. "Invalid argument: db has more levels than options.num_levels");
  210. options.num_levels = 10;
  211. options.max_bytes_for_level_multiplier_additional.resize(10, 1);
  212. ASSERT_OK(TryReopenWithColumnFamilies({"default", "pikachu"}, options));
  213. }
  214. #endif // ROCKSDB_LITE
  215. TEST_F(DBBasicTest, PutDeleteGet) {
  216. do {
  217. CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
  218. ASSERT_OK(Put(1, "foo", "v1"));
  219. ASSERT_EQ("v1", Get(1, "foo"));
  220. ASSERT_OK(Put(1, "foo", "v2"));
  221. ASSERT_EQ("v2", Get(1, "foo"));
  222. ASSERT_OK(Delete(1, "foo"));
  223. ASSERT_EQ("NOT_FOUND", Get(1, "foo"));
  224. } while (ChangeOptions());
  225. }
  226. TEST_F(DBBasicTest, PutSingleDeleteGet) {
  227. do {
  228. CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
  229. ASSERT_OK(Put(1, "foo", "v1"));
  230. ASSERT_EQ("v1", Get(1, "foo"));
  231. ASSERT_OK(Put(1, "foo2", "v2"));
  232. ASSERT_EQ("v2", Get(1, "foo2"));
  233. ASSERT_OK(SingleDelete(1, "foo"));
  234. ASSERT_EQ("NOT_FOUND", Get(1, "foo"));
  235. // Ski FIFO and universal compaction because they do not apply to the test
  236. // case. Skip MergePut because single delete does not get removed when it
  237. // encounters a merge.
  238. } while (ChangeOptions(kSkipFIFOCompaction | kSkipUniversalCompaction |
  239. kSkipMergePut));
  240. }
  241. TEST_F(DBBasicTest, EmptyFlush) {
  242. // It is possible to produce empty flushes when using single deletes. Tests
  243. // whether empty flushes cause issues.
  244. do {
  245. Random rnd(301);
  246. Options options = CurrentOptions();
  247. options.disable_auto_compactions = true;
  248. CreateAndReopenWithCF({"pikachu"}, options);
  249. Put(1, "a", Slice());
  250. SingleDelete(1, "a");
  251. ASSERT_OK(Flush(1));
  252. ASSERT_EQ("[ ]", AllEntriesFor("a", 1));
  253. // Skip FIFO and universal compaction as they do not apply to the test
  254. // case. Skip MergePut because merges cannot be combined with single
  255. // deletions.
  256. } while (ChangeOptions(kSkipFIFOCompaction | kSkipUniversalCompaction |
  257. kSkipMergePut));
  258. }
  259. TEST_F(DBBasicTest, GetFromVersions) {
  260. do {
  261. CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
  262. ASSERT_OK(Put(1, "foo", "v1"));
  263. ASSERT_OK(Flush(1));
  264. ASSERT_EQ("v1", Get(1, "foo"));
  265. ASSERT_EQ("NOT_FOUND", Get(0, "foo"));
  266. } while (ChangeOptions());
  267. }
  268. #ifndef ROCKSDB_LITE
  269. TEST_F(DBBasicTest, GetSnapshot) {
  270. anon::OptionsOverride options_override;
  271. options_override.skip_policy = kSkipNoSnapshot;
  272. do {
  273. CreateAndReopenWithCF({"pikachu"}, CurrentOptions(options_override));
  274. // Try with both a short key and a long key
  275. for (int i = 0; i < 2; i++) {
  276. std::string key = (i == 0) ? std::string("foo") : std::string(200, 'x');
  277. ASSERT_OK(Put(1, key, "v1"));
  278. const Snapshot* s1 = db_->GetSnapshot();
  279. ASSERT_OK(Put(1, key, "v2"));
  280. ASSERT_EQ("v2", Get(1, key));
  281. ASSERT_EQ("v1", Get(1, key, s1));
  282. ASSERT_OK(Flush(1));
  283. ASSERT_EQ("v2", Get(1, key));
  284. ASSERT_EQ("v1", Get(1, key, s1));
  285. db_->ReleaseSnapshot(s1);
  286. }
  287. } while (ChangeOptions());
  288. }
  289. #endif // ROCKSDB_LITE
  290. TEST_F(DBBasicTest, CheckLock) {
  291. do {
  292. DB* localdb;
  293. Options options = CurrentOptions();
  294. ASSERT_OK(TryReopen(options));
  295. // second open should fail
  296. ASSERT_TRUE(!(DB::Open(options, dbname_, &localdb)).ok());
  297. } while (ChangeCompactOptions());
  298. }
  299. TEST_F(DBBasicTest, FlushMultipleMemtable) {
  300. do {
  301. Options options = CurrentOptions();
  302. WriteOptions writeOpt = WriteOptions();
  303. writeOpt.disableWAL = true;
  304. options.max_write_buffer_number = 4;
  305. options.min_write_buffer_number_to_merge = 3;
  306. options.max_write_buffer_size_to_maintain = -1;
  307. CreateAndReopenWithCF({"pikachu"}, options);
  308. ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v1"));
  309. ASSERT_OK(Flush(1));
  310. ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v1"));
  311. ASSERT_EQ("v1", Get(1, "foo"));
  312. ASSERT_EQ("v1", Get(1, "bar"));
  313. ASSERT_OK(Flush(1));
  314. } while (ChangeCompactOptions());
  315. }
  316. TEST_F(DBBasicTest, FlushEmptyColumnFamily) {
  317. // Block flush thread and disable compaction thread
  318. env_->SetBackgroundThreads(1, Env::HIGH);
  319. env_->SetBackgroundThreads(1, Env::LOW);
  320. test::SleepingBackgroundTask sleeping_task_low;
  321. env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
  322. Env::Priority::LOW);
  323. test::SleepingBackgroundTask sleeping_task_high;
  324. env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
  325. &sleeping_task_high, Env::Priority::HIGH);
  326. Options options = CurrentOptions();
  327. // disable compaction
  328. options.disable_auto_compactions = true;
  329. WriteOptions writeOpt = WriteOptions();
  330. writeOpt.disableWAL = true;
  331. options.max_write_buffer_number = 2;
  332. options.min_write_buffer_number_to_merge = 1;
  333. options.max_write_buffer_size_to_maintain =
  334. static_cast<int64_t>(options.write_buffer_size);
  335. CreateAndReopenWithCF({"pikachu"}, options);
  336. // Compaction can still go through even if no thread can flush the
  337. // mem table.
  338. ASSERT_OK(Flush(0));
  339. ASSERT_OK(Flush(1));
  340. // Insert can go through
  341. ASSERT_OK(dbfull()->Put(writeOpt, handles_[0], "foo", "v1"));
  342. ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v1"));
  343. ASSERT_EQ("v1", Get(0, "foo"));
  344. ASSERT_EQ("v1", Get(1, "bar"));
  345. sleeping_task_high.WakeUp();
  346. sleeping_task_high.WaitUntilDone();
  347. // Flush can still go through.
  348. ASSERT_OK(Flush(0));
  349. ASSERT_OK(Flush(1));
  350. sleeping_task_low.WakeUp();
  351. sleeping_task_low.WaitUntilDone();
  352. }
  353. TEST_F(DBBasicTest, FLUSH) {
  354. do {
  355. CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
  356. WriteOptions writeOpt = WriteOptions();
  357. writeOpt.disableWAL = true;
  358. SetPerfLevel(kEnableTime);
  359. ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v1"));
  360. // this will now also flush the last 2 writes
  361. ASSERT_OK(Flush(1));
  362. ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v1"));
  363. get_perf_context()->Reset();
  364. Get(1, "foo");
  365. ASSERT_TRUE((int)get_perf_context()->get_from_output_files_time > 0);
  366. ASSERT_EQ(2, (int)get_perf_context()->get_read_bytes);
  367. ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
  368. ASSERT_EQ("v1", Get(1, "foo"));
  369. ASSERT_EQ("v1", Get(1, "bar"));
  370. writeOpt.disableWAL = true;
  371. ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v2"));
  372. ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v2"));
  373. ASSERT_OK(Flush(1));
  374. ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
  375. ASSERT_EQ("v2", Get(1, "bar"));
  376. get_perf_context()->Reset();
  377. ASSERT_EQ("v2", Get(1, "foo"));
  378. ASSERT_TRUE((int)get_perf_context()->get_from_output_files_time > 0);
  379. writeOpt.disableWAL = false;
  380. ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v3"));
  381. ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v3"));
  382. ASSERT_OK(Flush(1));
  383. ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
  384. // 'foo' should be there because its put
  385. // has WAL enabled.
  386. ASSERT_EQ("v3", Get(1, "foo"));
  387. ASSERT_EQ("v3", Get(1, "bar"));
  388. SetPerfLevel(kDisable);
  389. } while (ChangeCompactOptions());
  390. }
  391. TEST_F(DBBasicTest, ManifestRollOver) {
  392. do {
  393. Options options;
  394. options.max_manifest_file_size = 10; // 10 bytes
  395. options = CurrentOptions(options);
  396. CreateAndReopenWithCF({"pikachu"}, options);
  397. {
  398. ASSERT_OK(Put(1, "manifest_key1", std::string(1000, '1')));
  399. ASSERT_OK(Put(1, "manifest_key2", std::string(1000, '2')));
  400. ASSERT_OK(Put(1, "manifest_key3", std::string(1000, '3')));
  401. uint64_t manifest_before_flush = dbfull()->TEST_Current_Manifest_FileNo();
  402. ASSERT_OK(Flush(1)); // This should trigger LogAndApply.
  403. uint64_t manifest_after_flush = dbfull()->TEST_Current_Manifest_FileNo();
  404. ASSERT_GT(manifest_after_flush, manifest_before_flush);
  405. ReopenWithColumnFamilies({"default", "pikachu"}, options);
  406. ASSERT_GT(dbfull()->TEST_Current_Manifest_FileNo(), manifest_after_flush);
  407. // check if a new manifest file got inserted or not.
  408. ASSERT_EQ(std::string(1000, '1'), Get(1, "manifest_key1"));
  409. ASSERT_EQ(std::string(1000, '2'), Get(1, "manifest_key2"));
  410. ASSERT_EQ(std::string(1000, '3'), Get(1, "manifest_key3"));
  411. }
  412. } while (ChangeCompactOptions());
  413. }
  414. TEST_F(DBBasicTest, IdentityAcrossRestarts1) {
  415. do {
  416. std::string id1;
  417. ASSERT_OK(db_->GetDbIdentity(id1));
  418. Options options = CurrentOptions();
  419. Reopen(options);
  420. std::string id2;
  421. ASSERT_OK(db_->GetDbIdentity(id2));
  422. // id1 should match id2 because identity was not regenerated
  423. ASSERT_EQ(id1.compare(id2), 0);
  424. std::string idfilename = IdentityFileName(dbname_);
  425. ASSERT_OK(env_->DeleteFile(idfilename));
  426. Reopen(options);
  427. std::string id3;
  428. ASSERT_OK(db_->GetDbIdentity(id3));
  429. if (options.write_dbid_to_manifest) {
  430. ASSERT_EQ(id1.compare(id3), 0);
  431. } else {
  432. // id1 should NOT match id3 because identity was regenerated
  433. ASSERT_NE(id1.compare(id3), 0);
  434. }
  435. } while (ChangeCompactOptions());
  436. }
  437. TEST_F(DBBasicTest, IdentityAcrossRestarts2) {
  438. do {
  439. std::string id1;
  440. ASSERT_OK(db_->GetDbIdentity(id1));
  441. Options options = CurrentOptions();
  442. options.write_dbid_to_manifest = true;
  443. Reopen(options);
  444. std::string id2;
  445. ASSERT_OK(db_->GetDbIdentity(id2));
  446. // id1 should match id2 because identity was not regenerated
  447. ASSERT_EQ(id1.compare(id2), 0);
  448. std::string idfilename = IdentityFileName(dbname_);
  449. ASSERT_OK(env_->DeleteFile(idfilename));
  450. Reopen(options);
  451. std::string id3;
  452. ASSERT_OK(db_->GetDbIdentity(id3));
  453. // id1 should NOT match id3 because identity was regenerated
  454. ASSERT_EQ(id1, id3);
  455. } while (ChangeCompactOptions());
  456. }
  457. #ifndef ROCKSDB_LITE
  458. TEST_F(DBBasicTest, Snapshot) {
  459. anon::OptionsOverride options_override;
  460. options_override.skip_policy = kSkipNoSnapshot;
  461. do {
  462. CreateAndReopenWithCF({"pikachu"}, CurrentOptions(options_override));
  463. Put(0, "foo", "0v1");
  464. Put(1, "foo", "1v1");
  465. const Snapshot* s1 = db_->GetSnapshot();
  466. ASSERT_EQ(1U, GetNumSnapshots());
  467. uint64_t time_snap1 = GetTimeOldestSnapshots();
  468. ASSERT_GT(time_snap1, 0U);
  469. ASSERT_EQ(GetSequenceOldestSnapshots(), s1->GetSequenceNumber());
  470. Put(0, "foo", "0v2");
  471. Put(1, "foo", "1v2");
  472. env_->addon_time_.fetch_add(1);
  473. const Snapshot* s2 = db_->GetSnapshot();
  474. ASSERT_EQ(2U, GetNumSnapshots());
  475. ASSERT_EQ(time_snap1, GetTimeOldestSnapshots());
  476. ASSERT_EQ(GetSequenceOldestSnapshots(), s1->GetSequenceNumber());
  477. Put(0, "foo", "0v3");
  478. Put(1, "foo", "1v3");
  479. {
  480. ManagedSnapshot s3(db_);
  481. ASSERT_EQ(3U, GetNumSnapshots());
  482. ASSERT_EQ(time_snap1, GetTimeOldestSnapshots());
  483. ASSERT_EQ(GetSequenceOldestSnapshots(), s1->GetSequenceNumber());
  484. Put(0, "foo", "0v4");
  485. Put(1, "foo", "1v4");
  486. ASSERT_EQ("0v1", Get(0, "foo", s1));
  487. ASSERT_EQ("1v1", Get(1, "foo", s1));
  488. ASSERT_EQ("0v2", Get(0, "foo", s2));
  489. ASSERT_EQ("1v2", Get(1, "foo", s2));
  490. ASSERT_EQ("0v3", Get(0, "foo", s3.snapshot()));
  491. ASSERT_EQ("1v3", Get(1, "foo", s3.snapshot()));
  492. ASSERT_EQ("0v4", Get(0, "foo"));
  493. ASSERT_EQ("1v4", Get(1, "foo"));
  494. }
  495. ASSERT_EQ(2U, GetNumSnapshots());
  496. ASSERT_EQ(time_snap1, GetTimeOldestSnapshots());
  497. ASSERT_EQ(GetSequenceOldestSnapshots(), s1->GetSequenceNumber());
  498. ASSERT_EQ("0v1", Get(0, "foo", s1));
  499. ASSERT_EQ("1v1", Get(1, "foo", s1));
  500. ASSERT_EQ("0v2", Get(0, "foo", s2));
  501. ASSERT_EQ("1v2", Get(1, "foo", s2));
  502. ASSERT_EQ("0v4", Get(0, "foo"));
  503. ASSERT_EQ("1v4", Get(1, "foo"));
  504. db_->ReleaseSnapshot(s1);
  505. ASSERT_EQ("0v2", Get(0, "foo", s2));
  506. ASSERT_EQ("1v2", Get(1, "foo", s2));
  507. ASSERT_EQ("0v4", Get(0, "foo"));
  508. ASSERT_EQ("1v4", Get(1, "foo"));
  509. ASSERT_EQ(1U, GetNumSnapshots());
  510. ASSERT_LT(time_snap1, GetTimeOldestSnapshots());
  511. ASSERT_EQ(GetSequenceOldestSnapshots(), s2->GetSequenceNumber());
  512. db_->ReleaseSnapshot(s2);
  513. ASSERT_EQ(0U, GetNumSnapshots());
  514. ASSERT_EQ(GetSequenceOldestSnapshots(), 0);
  515. ASSERT_EQ("0v4", Get(0, "foo"));
  516. ASSERT_EQ("1v4", Get(1, "foo"));
  517. } while (ChangeOptions());
  518. }
  519. #endif // ROCKSDB_LITE
  520. TEST_F(DBBasicTest, CompactBetweenSnapshots) {
  521. anon::OptionsOverride options_override;
  522. options_override.skip_policy = kSkipNoSnapshot;
  523. do {
  524. Options options = CurrentOptions(options_override);
  525. options.disable_auto_compactions = true;
  526. CreateAndReopenWithCF({"pikachu"}, options);
  527. Random rnd(301);
  528. FillLevels("a", "z", 1);
  529. Put(1, "foo", "first");
  530. const Snapshot* snapshot1 = db_->GetSnapshot();
  531. Put(1, "foo", "second");
  532. Put(1, "foo", "third");
  533. Put(1, "foo", "fourth");
  534. const Snapshot* snapshot2 = db_->GetSnapshot();
  535. Put(1, "foo", "fifth");
  536. Put(1, "foo", "sixth");
  537. // All entries (including duplicates) exist
  538. // before any compaction or flush is triggered.
  539. ASSERT_EQ(AllEntriesFor("foo", 1),
  540. "[ sixth, fifth, fourth, third, second, first ]");
  541. ASSERT_EQ("sixth", Get(1, "foo"));
  542. ASSERT_EQ("fourth", Get(1, "foo", snapshot2));
  543. ASSERT_EQ("first", Get(1, "foo", snapshot1));
  544. // After a flush, "second", "third" and "fifth" should
  545. // be removed
  546. ASSERT_OK(Flush(1));
  547. ASSERT_EQ(AllEntriesFor("foo", 1), "[ sixth, fourth, first ]");
  548. // after we release the snapshot1, only two values left
  549. db_->ReleaseSnapshot(snapshot1);
  550. FillLevels("a", "z", 1);
  551. dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
  552. nullptr);
  553. // We have only one valid snapshot snapshot2. Since snapshot1 is
  554. // not valid anymore, "first" should be removed by a compaction.
  555. ASSERT_EQ("sixth", Get(1, "foo"));
  556. ASSERT_EQ("fourth", Get(1, "foo", snapshot2));
  557. ASSERT_EQ(AllEntriesFor("foo", 1), "[ sixth, fourth ]");
  558. // after we release the snapshot2, only one value should be left
  559. db_->ReleaseSnapshot(snapshot2);
  560. FillLevels("a", "z", 1);
  561. dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
  562. nullptr);
  563. ASSERT_EQ("sixth", Get(1, "foo"));
  564. ASSERT_EQ(AllEntriesFor("foo", 1), "[ sixth ]");
  565. } while (ChangeOptions(kSkipFIFOCompaction));
  566. }
  567. TEST_F(DBBasicTest, DBOpen_Options) {
  568. Options options = CurrentOptions();
  569. Close();
  570. Destroy(options);
  571. // Does not exist, and create_if_missing == false: error
  572. DB* db = nullptr;
  573. options.create_if_missing = false;
  574. Status s = DB::Open(options, dbname_, &db);
  575. ASSERT_TRUE(strstr(s.ToString().c_str(), "does not exist") != nullptr);
  576. ASSERT_TRUE(db == nullptr);
  577. // Does not exist, and create_if_missing == true: OK
  578. options.create_if_missing = true;
  579. s = DB::Open(options, dbname_, &db);
  580. ASSERT_OK(s);
  581. ASSERT_TRUE(db != nullptr);
  582. delete db;
  583. db = nullptr;
  584. // Does exist, and error_if_exists == true: error
  585. options.create_if_missing = false;
  586. options.error_if_exists = true;
  587. s = DB::Open(options, dbname_, &db);
  588. ASSERT_TRUE(strstr(s.ToString().c_str(), "exists") != nullptr);
  589. ASSERT_TRUE(db == nullptr);
  590. // Does exist, and error_if_exists == false: OK
  591. options.create_if_missing = true;
  592. options.error_if_exists = false;
  593. s = DB::Open(options, dbname_, &db);
  594. ASSERT_OK(s);
  595. ASSERT_TRUE(db != nullptr);
  596. delete db;
  597. db = nullptr;
  598. }
  599. TEST_F(DBBasicTest, CompactOnFlush) {
  600. anon::OptionsOverride options_override;
  601. options_override.skip_policy = kSkipNoSnapshot;
  602. do {
  603. Options options = CurrentOptions(options_override);
  604. options.disable_auto_compactions = true;
  605. CreateAndReopenWithCF({"pikachu"}, options);
  606. Put(1, "foo", "v1");
  607. ASSERT_OK(Flush(1));
  608. ASSERT_EQ(AllEntriesFor("foo", 1), "[ v1 ]");
  609. // Write two new keys
  610. Put(1, "a", "begin");
  611. Put(1, "z", "end");
  612. Flush(1);
  613. // Case1: Delete followed by a put
  614. Delete(1, "foo");
  615. Put(1, "foo", "v2");
  616. ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2, DEL, v1 ]");
  617. // After the current memtable is flushed, the DEL should
  618. // have been removed
  619. ASSERT_OK(Flush(1));
  620. ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2, v1 ]");
  621. dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
  622. nullptr);
  623. ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2 ]");
  624. // Case 2: Delete followed by another delete
  625. Delete(1, "foo");
  626. Delete(1, "foo");
  627. ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, DEL, v2 ]");
  628. ASSERT_OK(Flush(1));
  629. ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, v2 ]");
  630. dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
  631. nullptr);
  632. ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]");
  633. // Case 3: Put followed by a delete
  634. Put(1, "foo", "v3");
  635. Delete(1, "foo");
  636. ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, v3 ]");
  637. ASSERT_OK(Flush(1));
  638. ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL ]");
  639. dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
  640. nullptr);
  641. ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]");
  642. // Case 4: Put followed by another Put
  643. Put(1, "foo", "v4");
  644. Put(1, "foo", "v5");
  645. ASSERT_EQ(AllEntriesFor("foo", 1), "[ v5, v4 ]");
  646. ASSERT_OK(Flush(1));
  647. ASSERT_EQ(AllEntriesFor("foo", 1), "[ v5 ]");
  648. dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
  649. nullptr);
  650. ASSERT_EQ(AllEntriesFor("foo", 1), "[ v5 ]");
  651. // clear database
  652. Delete(1, "foo");
  653. dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
  654. nullptr);
  655. ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]");
  656. // Case 5: Put followed by snapshot followed by another Put
  657. // Both puts should remain.
  658. Put(1, "foo", "v6");
  659. const Snapshot* snapshot = db_->GetSnapshot();
  660. Put(1, "foo", "v7");
  661. ASSERT_OK(Flush(1));
  662. ASSERT_EQ(AllEntriesFor("foo", 1), "[ v7, v6 ]");
  663. db_->ReleaseSnapshot(snapshot);
  664. // clear database
  665. Delete(1, "foo");
  666. dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
  667. nullptr);
  668. ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]");
  669. // Case 5: snapshot followed by a put followed by another Put
  670. // Only the last put should remain.
  671. const Snapshot* snapshot1 = db_->GetSnapshot();
  672. Put(1, "foo", "v8");
  673. Put(1, "foo", "v9");
  674. ASSERT_OK(Flush(1));
  675. ASSERT_EQ(AllEntriesFor("foo", 1), "[ v9 ]");
  676. db_->ReleaseSnapshot(snapshot1);
  677. } while (ChangeCompactOptions());
  678. }
  679. TEST_F(DBBasicTest, FlushOneColumnFamily) {
  680. Options options = CurrentOptions();
  681. CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich",
  682. "alyosha", "popovich"},
  683. options);
  684. ASSERT_OK(Put(0, "Default", "Default"));
  685. ASSERT_OK(Put(1, "pikachu", "pikachu"));
  686. ASSERT_OK(Put(2, "ilya", "ilya"));
  687. ASSERT_OK(Put(3, "muromec", "muromec"));
  688. ASSERT_OK(Put(4, "dobrynia", "dobrynia"));
  689. ASSERT_OK(Put(5, "nikitich", "nikitich"));
  690. ASSERT_OK(Put(6, "alyosha", "alyosha"));
  691. ASSERT_OK(Put(7, "popovich", "popovich"));
  692. for (int i = 0; i < 8; ++i) {
  693. Flush(i);
  694. auto tables = ListTableFiles(env_, dbname_);
  695. ASSERT_EQ(tables.size(), i + 1U);
  696. }
  697. }
  698. TEST_F(DBBasicTest, MultiGetSimple) {
  699. do {
  700. CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
  701. SetPerfLevel(kEnableCount);
  702. ASSERT_OK(Put(1, "k1", "v1"));
  703. ASSERT_OK(Put(1, "k2", "v2"));
  704. ASSERT_OK(Put(1, "k3", "v3"));
  705. ASSERT_OK(Put(1, "k4", "v4"));
  706. ASSERT_OK(Delete(1, "k4"));
  707. ASSERT_OK(Put(1, "k5", "v5"));
  708. ASSERT_OK(Delete(1, "no_key"));
  709. std::vector<Slice> keys({"k1", "k2", "k3", "k4", "k5", "no_key"});
  710. std::vector<std::string> values(20, "Temporary data to be overwritten");
  711. std::vector<ColumnFamilyHandle*> cfs(keys.size(), handles_[1]);
  712. get_perf_context()->Reset();
  713. std::vector<Status> s = db_->MultiGet(ReadOptions(), cfs, keys, &values);
  714. ASSERT_EQ(values.size(), keys.size());
  715. ASSERT_EQ(values[0], "v1");
  716. ASSERT_EQ(values[1], "v2");
  717. ASSERT_EQ(values[2], "v3");
  718. ASSERT_EQ(values[4], "v5");
  719. // four kv pairs * two bytes per value
  720. ASSERT_EQ(8, (int)get_perf_context()->multiget_read_bytes);
  721. ASSERT_OK(s[0]);
  722. ASSERT_OK(s[1]);
  723. ASSERT_OK(s[2]);
  724. ASSERT_TRUE(s[3].IsNotFound());
  725. ASSERT_OK(s[4]);
  726. ASSERT_TRUE(s[5].IsNotFound());
  727. SetPerfLevel(kDisable);
  728. } while (ChangeCompactOptions());
  729. }
  730. TEST_F(DBBasicTest, MultiGetEmpty) {
  731. do {
  732. CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
  733. // Empty Key Set
  734. std::vector<Slice> keys;
  735. std::vector<std::string> values;
  736. std::vector<ColumnFamilyHandle*> cfs;
  737. std::vector<Status> s = db_->MultiGet(ReadOptions(), cfs, keys, &values);
  738. ASSERT_EQ(s.size(), 0U);
  739. // Empty Database, Empty Key Set
  740. Options options = CurrentOptions();
  741. options.create_if_missing = true;
  742. DestroyAndReopen(options);
  743. CreateAndReopenWithCF({"pikachu"}, options);
  744. s = db_->MultiGet(ReadOptions(), cfs, keys, &values);
  745. ASSERT_EQ(s.size(), 0U);
  746. // Empty Database, Search for Keys
  747. keys.resize(2);
  748. keys[0] = "a";
  749. keys[1] = "b";
  750. cfs.push_back(handles_[0]);
  751. cfs.push_back(handles_[1]);
  752. s = db_->MultiGet(ReadOptions(), cfs, keys, &values);
  753. ASSERT_EQ(static_cast<int>(s.size()), 2);
  754. ASSERT_TRUE(s[0].IsNotFound() && s[1].IsNotFound());
  755. } while (ChangeCompactOptions());
  756. }
  757. TEST_F(DBBasicTest, ChecksumTest) {
  758. BlockBasedTableOptions table_options;
  759. Options options = CurrentOptions();
  760. // change when new checksum type added
  761. int max_checksum = static_cast<int>(kxxHash64);
  762. const int kNumPerFile = 2;
  763. // generate one table with each type of checksum
  764. for (int i = 0; i <= max_checksum; ++i) {
  765. table_options.checksum = static_cast<ChecksumType>(i);
  766. options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  767. Reopen(options);
  768. for (int j = 0; j < kNumPerFile; ++j) {
  769. ASSERT_OK(Put(Key(i * kNumPerFile + j), Key(i * kNumPerFile + j)));
  770. }
  771. ASSERT_OK(Flush());
  772. }
  773. // with each valid checksum type setting...
  774. for (int i = 0; i <= max_checksum; ++i) {
  775. table_options.checksum = static_cast<ChecksumType>(i);
  776. options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  777. Reopen(options);
  778. // verify every type of checksum (should be regardless of that setting)
  779. for (int j = 0; j < (max_checksum + 1) * kNumPerFile; ++j) {
  780. ASSERT_EQ(Key(j), Get(Key(j)));
  781. }
  782. }
  783. }
  784. // On Windows you can have either memory mapped file or a file
  785. // with unbuffered access. So this asserts and does not make
  786. // sense to run
  787. #ifndef OS_WIN
  788. TEST_F(DBBasicTest, MmapAndBufferOptions) {
  789. if (!IsMemoryMappedAccessSupported()) {
  790. return;
  791. }
  792. Options options = CurrentOptions();
  793. options.use_direct_reads = true;
  794. options.allow_mmap_reads = true;
  795. ASSERT_NOK(TryReopen(options));
  796. // All other combinations are acceptable
  797. options.use_direct_reads = false;
  798. ASSERT_OK(TryReopen(options));
  799. if (IsDirectIOSupported()) {
  800. options.use_direct_reads = true;
  801. options.allow_mmap_reads = false;
  802. ASSERT_OK(TryReopen(options));
  803. }
  804. options.use_direct_reads = false;
  805. ASSERT_OK(TryReopen(options));
  806. }
  807. #endif
  808. class TestEnv : public EnvWrapper {
  809. public:
  810. explicit TestEnv(Env* base_env) : EnvWrapper(base_env), close_count(0) {}
  811. class TestLogger : public Logger {
  812. public:
  813. using Logger::Logv;
  814. explicit TestLogger(TestEnv* env_ptr) : Logger() { env = env_ptr; }
  815. ~TestLogger() override {
  816. if (!closed_) {
  817. CloseHelper();
  818. }
  819. }
  820. void Logv(const char* /*format*/, va_list /*ap*/) override {}
  821. protected:
  822. Status CloseImpl() override { return CloseHelper(); }
  823. private:
  824. Status CloseHelper() {
  825. env->CloseCountInc();
  826. ;
  827. return Status::IOError();
  828. }
  829. TestEnv* env;
  830. };
  831. void CloseCountInc() { close_count++; }
  832. int GetCloseCount() { return close_count; }
  833. Status NewLogger(const std::string& /*fname*/,
  834. std::shared_ptr<Logger>* result) override {
  835. result->reset(new TestLogger(this));
  836. return Status::OK();
  837. }
  838. private:
  839. int close_count;
  840. };
  841. TEST_F(DBBasicTest, DBClose) {
  842. Options options = GetDefaultOptions();
  843. std::string dbname = test::PerThreadDBPath("db_close_test");
  844. ASSERT_OK(DestroyDB(dbname, options));
  845. DB* db = nullptr;
  846. TestEnv* env = new TestEnv(env_);
  847. std::unique_ptr<TestEnv> local_env_guard(env);
  848. options.create_if_missing = true;
  849. options.env = env;
  850. Status s = DB::Open(options, dbname, &db);
  851. ASSERT_OK(s);
  852. ASSERT_TRUE(db != nullptr);
  853. s = db->Close();
  854. ASSERT_EQ(env->GetCloseCount(), 1);
  855. ASSERT_EQ(s, Status::IOError());
  856. delete db;
  857. ASSERT_EQ(env->GetCloseCount(), 1);
  858. // Do not call DB::Close() and ensure our logger Close() still gets called
  859. s = DB::Open(options, dbname, &db);
  860. ASSERT_OK(s);
  861. ASSERT_TRUE(db != nullptr);
  862. delete db;
  863. ASSERT_EQ(env->GetCloseCount(), 2);
  864. // Provide our own logger and ensure DB::Close() does not close it
  865. options.info_log.reset(new TestEnv::TestLogger(env));
  866. options.create_if_missing = false;
  867. s = DB::Open(options, dbname, &db);
  868. ASSERT_OK(s);
  869. ASSERT_TRUE(db != nullptr);
  870. s = db->Close();
  871. ASSERT_EQ(s, Status::OK());
  872. delete db;
  873. ASSERT_EQ(env->GetCloseCount(), 2);
  874. options.info_log.reset();
  875. ASSERT_EQ(env->GetCloseCount(), 3);
  876. }
  877. TEST_F(DBBasicTest, DBCloseFlushError) {
  878. std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(
  879. new FaultInjectionTestEnv(env_));
  880. Options options = GetDefaultOptions();
  881. options.create_if_missing = true;
  882. options.manual_wal_flush = true;
  883. options.write_buffer_size=100;
  884. options.env = fault_injection_env.get();
  885. Reopen(options);
  886. ASSERT_OK(Put("key1", "value1"));
  887. ASSERT_OK(Put("key2", "value2"));
  888. ASSERT_OK(dbfull()->TEST_SwitchMemtable());
  889. ASSERT_OK(Put("key3", "value3"));
  890. fault_injection_env->SetFilesystemActive(false);
  891. Status s = dbfull()->Close();
  892. fault_injection_env->SetFilesystemActive(true);
  893. ASSERT_NE(s, Status::OK());
  894. Destroy(options);
  895. }
  896. class DBMultiGetTestWithParam : public DBBasicTest,
  897. public testing::WithParamInterface<bool> {};
  898. TEST_P(DBMultiGetTestWithParam, MultiGetMultiCF) {
  899. Options options = CurrentOptions();
  900. CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich",
  901. "alyosha", "popovich"},
  902. options);
  903. // <CF, key, value> tuples
  904. std::vector<std::tuple<int, std::string, std::string>> cf_kv_vec;
  905. static const int num_keys = 24;
  906. cf_kv_vec.reserve(num_keys);
  907. for (int i = 0; i < num_keys; ++i) {
  908. int cf = i / 3;
  909. int cf_key = 1 % 3;
  910. cf_kv_vec.emplace_back(std::make_tuple(
  911. cf, "cf" + std::to_string(cf) + "_key_" + std::to_string(cf_key),
  912. "cf" + std::to_string(cf) + "_val_" + std::to_string(cf_key)));
  913. ASSERT_OK(Put(std::get<0>(cf_kv_vec[i]), std::get<1>(cf_kv_vec[i]),
  914. std::get<2>(cf_kv_vec[i])));
  915. }
  916. int get_sv_count = 0;
  917. ROCKSDB_NAMESPACE::DBImpl* db = reinterpret_cast<DBImpl*>(db_);
  918. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  919. "DBImpl::MultiGet::AfterRefSV", [&](void* /*arg*/) {
  920. if (++get_sv_count == 2) {
  921. // After MultiGet refs a couple of CFs, flush all CFs so MultiGet
  922. // is forced to repeat the process
  923. for (int i = 0; i < num_keys; ++i) {
  924. int cf = i / 3;
  925. int cf_key = i % 8;
  926. if (cf_key == 0) {
  927. ASSERT_OK(Flush(cf));
  928. }
  929. ASSERT_OK(Put(std::get<0>(cf_kv_vec[i]), std::get<1>(cf_kv_vec[i]),
  930. std::get<2>(cf_kv_vec[i]) + "_2"));
  931. }
  932. }
  933. if (get_sv_count == 11) {
  934. for (int i = 0; i < 8; ++i) {
  935. auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(
  936. db->GetColumnFamilyHandle(i))
  937. ->cfd();
  938. ASSERT_EQ(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse);
  939. }
  940. }
  941. });
  942. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  943. std::vector<int> cfs;
  944. std::vector<std::string> keys;
  945. std::vector<std::string> values;
  946. for (int i = 0; i < num_keys; ++i) {
  947. cfs.push_back(std::get<0>(cf_kv_vec[i]));
  948. keys.push_back(std::get<1>(cf_kv_vec[i]));
  949. }
  950. values = MultiGet(cfs, keys, nullptr, GetParam());
  951. ASSERT_EQ(values.size(), num_keys);
  952. for (unsigned int j = 0; j < values.size(); ++j) {
  953. ASSERT_EQ(values[j], std::get<2>(cf_kv_vec[j]) + "_2");
  954. }
  955. keys.clear();
  956. cfs.clear();
  957. cfs.push_back(std::get<0>(cf_kv_vec[0]));
  958. keys.push_back(std::get<1>(cf_kv_vec[0]));
  959. cfs.push_back(std::get<0>(cf_kv_vec[3]));
  960. keys.push_back(std::get<1>(cf_kv_vec[3]));
  961. cfs.push_back(std::get<0>(cf_kv_vec[4]));
  962. keys.push_back(std::get<1>(cf_kv_vec[4]));
  963. values = MultiGet(cfs, keys, nullptr, GetParam());
  964. ASSERT_EQ(values[0], std::get<2>(cf_kv_vec[0]) + "_2");
  965. ASSERT_EQ(values[1], std::get<2>(cf_kv_vec[3]) + "_2");
  966. ASSERT_EQ(values[2], std::get<2>(cf_kv_vec[4]) + "_2");
  967. keys.clear();
  968. cfs.clear();
  969. cfs.push_back(std::get<0>(cf_kv_vec[7]));
  970. keys.push_back(std::get<1>(cf_kv_vec[7]));
  971. cfs.push_back(std::get<0>(cf_kv_vec[6]));
  972. keys.push_back(std::get<1>(cf_kv_vec[6]));
  973. cfs.push_back(std::get<0>(cf_kv_vec[1]));
  974. keys.push_back(std::get<1>(cf_kv_vec[1]));
  975. values = MultiGet(cfs, keys, nullptr, GetParam());
  976. ASSERT_EQ(values[0], std::get<2>(cf_kv_vec[7]) + "_2");
  977. ASSERT_EQ(values[1], std::get<2>(cf_kv_vec[6]) + "_2");
  978. ASSERT_EQ(values[2], std::get<2>(cf_kv_vec[1]) + "_2");
  979. for (int cf = 0; cf < 8; ++cf) {
  980. auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(
  981. reinterpret_cast<DBImpl*>(db_)->GetColumnFamilyHandle(cf))
  982. ->cfd();
  983. ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse);
  984. ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVObsolete);
  985. }
  986. }
  987. TEST_P(DBMultiGetTestWithParam, MultiGetMultiCFMutex) {
  988. Options options = CurrentOptions();
  989. CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich",
  990. "alyosha", "popovich"},
  991. options);
  992. for (int i = 0; i < 8; ++i) {
  993. ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
  994. "cf" + std::to_string(i) + "_val"));
  995. }
  996. int get_sv_count = 0;
  997. int retries = 0;
  998. bool last_try = false;
  999. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  1000. "DBImpl::MultiGet::LastTry", [&](void* /*arg*/) {
  1001. last_try = true;
  1002. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  1003. });
  1004. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  1005. "DBImpl::MultiGet::AfterRefSV", [&](void* /*arg*/) {
  1006. if (last_try) {
  1007. return;
  1008. }
  1009. if (++get_sv_count == 2) {
  1010. ++retries;
  1011. get_sv_count = 0;
  1012. for (int i = 0; i < 8; ++i) {
  1013. ASSERT_OK(Flush(i));
  1014. ASSERT_OK(Put(
  1015. i, "cf" + std::to_string(i) + "_key",
  1016. "cf" + std::to_string(i) + "_val" + std::to_string(retries)));
  1017. }
  1018. }
  1019. });
  1020. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  1021. std::vector<int> cfs;
  1022. std::vector<std::string> keys;
  1023. std::vector<std::string> values;
  1024. for (int i = 0; i < 8; ++i) {
  1025. cfs.push_back(i);
  1026. keys.push_back("cf" + std::to_string(i) + "_key");
  1027. }
  1028. values = MultiGet(cfs, keys, nullptr, GetParam());
  1029. ASSERT_TRUE(last_try);
  1030. ASSERT_EQ(values.size(), 8);
  1031. for (unsigned int j = 0; j < values.size(); ++j) {
  1032. ASSERT_EQ(values[j],
  1033. "cf" + std::to_string(j) + "_val" + std::to_string(retries));
  1034. }
  1035. for (int i = 0; i < 8; ++i) {
  1036. auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(
  1037. reinterpret_cast<DBImpl*>(db_)->GetColumnFamilyHandle(i))
  1038. ->cfd();
  1039. ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse);
  1040. }
  1041. }
  1042. TEST_P(DBMultiGetTestWithParam, MultiGetMultiCFSnapshot) {
  1043. Options options = CurrentOptions();
  1044. CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich",
  1045. "alyosha", "popovich"},
  1046. options);
  1047. for (int i = 0; i < 8; ++i) {
  1048. ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
  1049. "cf" + std::to_string(i) + "_val"));
  1050. }
  1051. int get_sv_count = 0;
  1052. ROCKSDB_NAMESPACE::DBImpl* db = reinterpret_cast<DBImpl*>(db_);
  1053. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  1054. "DBImpl::MultiGet::AfterRefSV", [&](void* /*arg*/) {
  1055. if (++get_sv_count == 2) {
  1056. for (int i = 0; i < 8; ++i) {
  1057. ASSERT_OK(Flush(i));
  1058. ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
  1059. "cf" + std::to_string(i) + "_val2"));
  1060. }
  1061. }
  1062. if (get_sv_count == 8) {
  1063. for (int i = 0; i < 8; ++i) {
  1064. auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(
  1065. db->GetColumnFamilyHandle(i))
  1066. ->cfd();
  1067. ASSERT_TRUE(
  1068. (cfd->TEST_GetLocalSV()->Get() == SuperVersion::kSVInUse) ||
  1069. (cfd->TEST_GetLocalSV()->Get() == SuperVersion::kSVObsolete));
  1070. }
  1071. }
  1072. });
  1073. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  1074. std::vector<int> cfs;
  1075. std::vector<std::string> keys;
  1076. std::vector<std::string> values;
  1077. for (int i = 0; i < 8; ++i) {
  1078. cfs.push_back(i);
  1079. keys.push_back("cf" + std::to_string(i) + "_key");
  1080. }
  1081. const Snapshot* snapshot = db_->GetSnapshot();
  1082. values = MultiGet(cfs, keys, snapshot, GetParam());
  1083. db_->ReleaseSnapshot(snapshot);
  1084. ASSERT_EQ(values.size(), 8);
  1085. for (unsigned int j = 0; j < values.size(); ++j) {
  1086. ASSERT_EQ(values[j], "cf" + std::to_string(j) + "_val");
  1087. }
  1088. for (int i = 0; i < 8; ++i) {
  1089. auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(
  1090. reinterpret_cast<DBImpl*>(db_)->GetColumnFamilyHandle(i))
  1091. ->cfd();
  1092. ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse);
  1093. }
  1094. }
  1095. INSTANTIATE_TEST_CASE_P(DBMultiGetTestWithParam, DBMultiGetTestWithParam,
  1096. testing::Bool());
  1097. TEST_F(DBBasicTest, MultiGetBatchedSimpleUnsorted) {
  1098. do {
  1099. CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
  1100. SetPerfLevel(kEnableCount);
  1101. ASSERT_OK(Put(1, "k1", "v1"));
  1102. ASSERT_OK(Put(1, "k2", "v2"));
  1103. ASSERT_OK(Put(1, "k3", "v3"));
  1104. ASSERT_OK(Put(1, "k4", "v4"));
  1105. ASSERT_OK(Delete(1, "k4"));
  1106. ASSERT_OK(Put(1, "k5", "v5"));
  1107. ASSERT_OK(Delete(1, "no_key"));
  1108. get_perf_context()->Reset();
  1109. std::vector<Slice> keys({"no_key", "k5", "k4", "k3", "k2", "k1"});
  1110. std::vector<PinnableSlice> values(keys.size());
  1111. std::vector<ColumnFamilyHandle*> cfs(keys.size(), handles_[1]);
  1112. std::vector<Status> s(keys.size());
  1113. db_->MultiGet(ReadOptions(), handles_[1], keys.size(), keys.data(),
  1114. values.data(), s.data(), false);
  1115. ASSERT_EQ(values.size(), keys.size());
  1116. ASSERT_EQ(std::string(values[5].data(), values[5].size()), "v1");
  1117. ASSERT_EQ(std::string(values[4].data(), values[4].size()), "v2");
  1118. ASSERT_EQ(std::string(values[3].data(), values[3].size()), "v3");
  1119. ASSERT_EQ(std::string(values[1].data(), values[1].size()), "v5");
  1120. // four kv pairs * two bytes per value
  1121. ASSERT_EQ(8, (int)get_perf_context()->multiget_read_bytes);
  1122. ASSERT_TRUE(s[0].IsNotFound());
  1123. ASSERT_OK(s[1]);
  1124. ASSERT_TRUE(s[2].IsNotFound());
  1125. ASSERT_OK(s[3]);
  1126. ASSERT_OK(s[4]);
  1127. ASSERT_OK(s[5]);
  1128. SetPerfLevel(kDisable);
  1129. } while (ChangeCompactOptions());
  1130. }
  1131. TEST_F(DBBasicTest, MultiGetBatchedSimpleSorted) {
  1132. do {
  1133. CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
  1134. SetPerfLevel(kEnableCount);
  1135. ASSERT_OK(Put(1, "k1", "v1"));
  1136. ASSERT_OK(Put(1, "k2", "v2"));
  1137. ASSERT_OK(Put(1, "k3", "v3"));
  1138. ASSERT_OK(Put(1, "k4", "v4"));
  1139. ASSERT_OK(Delete(1, "k4"));
  1140. ASSERT_OK(Put(1, "k5", "v5"));
  1141. ASSERT_OK(Delete(1, "no_key"));
  1142. get_perf_context()->Reset();
  1143. std::vector<Slice> keys({"k1", "k2", "k3", "k4", "k5", "no_key"});
  1144. std::vector<PinnableSlice> values(keys.size());
  1145. std::vector<ColumnFamilyHandle*> cfs(keys.size(), handles_[1]);
  1146. std::vector<Status> s(keys.size());
  1147. db_->MultiGet(ReadOptions(), handles_[1], keys.size(), keys.data(),
  1148. values.data(), s.data(), true);
  1149. ASSERT_EQ(values.size(), keys.size());
  1150. ASSERT_EQ(std::string(values[0].data(), values[0].size()), "v1");
  1151. ASSERT_EQ(std::string(values[1].data(), values[1].size()), "v2");
  1152. ASSERT_EQ(std::string(values[2].data(), values[2].size()), "v3");
  1153. ASSERT_EQ(std::string(values[4].data(), values[4].size()), "v5");
  1154. // four kv pairs * two bytes per value
  1155. ASSERT_EQ(8, (int)get_perf_context()->multiget_read_bytes);
  1156. ASSERT_OK(s[0]);
  1157. ASSERT_OK(s[1]);
  1158. ASSERT_OK(s[2]);
  1159. ASSERT_TRUE(s[3].IsNotFound());
  1160. ASSERT_OK(s[4]);
  1161. ASSERT_TRUE(s[5].IsNotFound());
  1162. SetPerfLevel(kDisable);
  1163. } while (ChangeCompactOptions());
  1164. }
  1165. TEST_F(DBBasicTest, MultiGetBatchedMultiLevel) {
  1166. Options options = CurrentOptions();
  1167. options.disable_auto_compactions = true;
  1168. Reopen(options);
  1169. int num_keys = 0;
  1170. for (int i = 0; i < 128; ++i) {
  1171. ASSERT_OK(Put("key_" + std::to_string(i), "val_l2_" + std::to_string(i)));
  1172. num_keys++;
  1173. if (num_keys == 8) {
  1174. Flush();
  1175. num_keys = 0;
  1176. }
  1177. }
  1178. if (num_keys > 0) {
  1179. Flush();
  1180. num_keys = 0;
  1181. }
  1182. MoveFilesToLevel(2);
  1183. for (int i = 0; i < 128; i += 3) {
  1184. ASSERT_OK(Put("key_" + std::to_string(i), "val_l1_" + std::to_string(i)));
  1185. num_keys++;
  1186. if (num_keys == 8) {
  1187. Flush();
  1188. num_keys = 0;
  1189. }
  1190. }
  1191. if (num_keys > 0) {
  1192. Flush();
  1193. num_keys = 0;
  1194. }
  1195. MoveFilesToLevel(1);
  1196. for (int i = 0; i < 128; i += 5) {
  1197. ASSERT_OK(Put("key_" + std::to_string(i), "val_l0_" + std::to_string(i)));
  1198. num_keys++;
  1199. if (num_keys == 8) {
  1200. Flush();
  1201. num_keys = 0;
  1202. }
  1203. }
  1204. if (num_keys > 0) {
  1205. Flush();
  1206. num_keys = 0;
  1207. }
  1208. ASSERT_EQ(0, num_keys);
  1209. for (int i = 0; i < 128; i += 9) {
  1210. ASSERT_OK(Put("key_" + std::to_string(i), "val_mem_" + std::to_string(i)));
  1211. }
  1212. std::vector<std::string> keys;
  1213. std::vector<std::string> values;
  1214. for (int i = 64; i < 80; ++i) {
  1215. keys.push_back("key_" + std::to_string(i));
  1216. }
  1217. values = MultiGet(keys, nullptr);
  1218. ASSERT_EQ(values.size(), 16);
  1219. for (unsigned int j = 0; j < values.size(); ++j) {
  1220. int key = j + 64;
  1221. if (key % 9 == 0) {
  1222. ASSERT_EQ(values[j], "val_mem_" + std::to_string(key));
  1223. } else if (key % 5 == 0) {
  1224. ASSERT_EQ(values[j], "val_l0_" + std::to_string(key));
  1225. } else if (key % 3 == 0) {
  1226. ASSERT_EQ(values[j], "val_l1_" + std::to_string(key));
  1227. } else {
  1228. ASSERT_EQ(values[j], "val_l2_" + std::to_string(key));
  1229. }
  1230. }
  1231. }
  1232. TEST_F(DBBasicTest, MultiGetBatchedMultiLevelMerge) {
  1233. Options options = CurrentOptions();
  1234. options.disable_auto_compactions = true;
  1235. options.merge_operator = MergeOperators::CreateStringAppendOperator();
  1236. BlockBasedTableOptions bbto;
  1237. bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
  1238. options.table_factory.reset(NewBlockBasedTableFactory(bbto));
  1239. Reopen(options);
  1240. int num_keys = 0;
  1241. for (int i = 0; i < 128; ++i) {
  1242. ASSERT_OK(Put("key_" + std::to_string(i), "val_l2_" + std::to_string(i)));
  1243. num_keys++;
  1244. if (num_keys == 8) {
  1245. Flush();
  1246. num_keys = 0;
  1247. }
  1248. }
  1249. if (num_keys > 0) {
  1250. Flush();
  1251. num_keys = 0;
  1252. }
  1253. MoveFilesToLevel(2);
  1254. for (int i = 0; i < 128; i += 3) {
  1255. ASSERT_OK(Merge("key_" + std::to_string(i), "val_l1_" + std::to_string(i)));
  1256. num_keys++;
  1257. if (num_keys == 8) {
  1258. Flush();
  1259. num_keys = 0;
  1260. }
  1261. }
  1262. if (num_keys > 0) {
  1263. Flush();
  1264. num_keys = 0;
  1265. }
  1266. MoveFilesToLevel(1);
  1267. for (int i = 0; i < 128; i += 5) {
  1268. ASSERT_OK(Merge("key_" + std::to_string(i), "val_l0_" + std::to_string(i)));
  1269. num_keys++;
  1270. if (num_keys == 8) {
  1271. Flush();
  1272. num_keys = 0;
  1273. }
  1274. }
  1275. if (num_keys > 0) {
  1276. Flush();
  1277. num_keys = 0;
  1278. }
  1279. ASSERT_EQ(0, num_keys);
  1280. for (int i = 0; i < 128; i += 9) {
  1281. ASSERT_OK(Merge("key_" + std::to_string(i), "val_mem_" + std::to_string(i)));
  1282. }
  1283. std::vector<std::string> keys;
  1284. std::vector<std::string> values;
  1285. for (int i = 32; i < 80; ++i) {
  1286. keys.push_back("key_" + std::to_string(i));
  1287. }
  1288. values = MultiGet(keys, nullptr);
  1289. ASSERT_EQ(values.size(), keys.size());
  1290. for (unsigned int j = 0; j < 48; ++j) {
  1291. int key = j + 32;
  1292. std::string value;
  1293. value.append("val_l2_" + std::to_string(key));
  1294. if (key % 3 == 0) {
  1295. value.append(",");
  1296. value.append("val_l1_" + std::to_string(key));
  1297. }
  1298. if (key % 5 == 0) {
  1299. value.append(",");
  1300. value.append("val_l0_" + std::to_string(key));
  1301. }
  1302. if (key % 9 == 0) {
  1303. value.append(",");
  1304. value.append("val_mem_" + std::to_string(key));
  1305. }
  1306. ASSERT_EQ(values[j], value);
  1307. }
  1308. }
  1309. // Test class for batched MultiGet with prefix extractor
  1310. // Param bool - If true, use partitioned filters
  1311. // If false, use full filter block
  1312. class MultiGetPrefixExtractorTest : public DBBasicTest,
  1313. public ::testing::WithParamInterface<bool> {
  1314. };
  1315. TEST_P(MultiGetPrefixExtractorTest, Batched) {
  1316. Options options = CurrentOptions();
  1317. options.prefix_extractor.reset(NewFixedPrefixTransform(2));
  1318. options.memtable_prefix_bloom_size_ratio = 10;
  1319. BlockBasedTableOptions bbto;
  1320. if (GetParam()) {
  1321. bbto.index_type = BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
  1322. bbto.partition_filters = true;
  1323. }
  1324. bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
  1325. bbto.whole_key_filtering = false;
  1326. bbto.cache_index_and_filter_blocks = false;
  1327. options.table_factory.reset(NewBlockBasedTableFactory(bbto));
  1328. Reopen(options);
  1329. SetPerfLevel(kEnableCount);
  1330. get_perf_context()->Reset();
  1331. // First key is not in the prefix_extractor domain
  1332. ASSERT_OK(Put("k", "v0"));
  1333. ASSERT_OK(Put("kk1", "v1"));
  1334. ASSERT_OK(Put("kk2", "v2"));
  1335. ASSERT_OK(Put("kk3", "v3"));
  1336. ASSERT_OK(Put("kk4", "v4"));
  1337. std::vector<std::string> mem_keys(
  1338. {"k", "kk1", "kk2", "kk3", "kk4", "rofl", "lmho"});
  1339. std::vector<std::string> inmem_values;
  1340. inmem_values = MultiGet(mem_keys, nullptr);
  1341. ASSERT_EQ(inmem_values[0], "v0");
  1342. ASSERT_EQ(inmem_values[1], "v1");
  1343. ASSERT_EQ(inmem_values[2], "v2");
  1344. ASSERT_EQ(inmem_values[3], "v3");
  1345. ASSERT_EQ(inmem_values[4], "v4");
  1346. ASSERT_EQ(get_perf_context()->bloom_memtable_miss_count, 2);
  1347. ASSERT_EQ(get_perf_context()->bloom_memtable_hit_count, 5);
  1348. ASSERT_OK(Flush());
  1349. std::vector<std::string> keys({"k", "kk1", "kk2", "kk3", "kk4"});
  1350. std::vector<std::string> values;
  1351. get_perf_context()->Reset();
  1352. values = MultiGet(keys, nullptr);
  1353. ASSERT_EQ(values[0], "v0");
  1354. ASSERT_EQ(values[1], "v1");
  1355. ASSERT_EQ(values[2], "v2");
  1356. ASSERT_EQ(values[3], "v3");
  1357. ASSERT_EQ(values[4], "v4");
  1358. // Filter hits for 4 in-domain keys
  1359. ASSERT_EQ(get_perf_context()->bloom_sst_hit_count, 4);
  1360. }
  1361. INSTANTIATE_TEST_CASE_P(MultiGetPrefix, MultiGetPrefixExtractorTest,
  1362. ::testing::Bool());
  1363. #ifndef ROCKSDB_LITE
  1364. class DBMultiGetRowCacheTest : public DBBasicTest,
  1365. public ::testing::WithParamInterface<bool> {};
  1366. TEST_P(DBMultiGetRowCacheTest, MultiGetBatched) {
  1367. do {
  1368. option_config_ = kRowCache;
  1369. Options options = CurrentOptions();
  1370. options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
  1371. CreateAndReopenWithCF({"pikachu"}, options);
  1372. SetPerfLevel(kEnableCount);
  1373. ASSERT_OK(Put(1, "k1", "v1"));
  1374. ASSERT_OK(Put(1, "k2", "v2"));
  1375. ASSERT_OK(Put(1, "k3", "v3"));
  1376. ASSERT_OK(Put(1, "k4", "v4"));
  1377. Flush(1);
  1378. ASSERT_OK(Put(1, "k5", "v5"));
  1379. const Snapshot* snap1 = dbfull()->GetSnapshot();
  1380. ASSERT_OK(Delete(1, "k4"));
  1381. Flush(1);
  1382. const Snapshot* snap2 = dbfull()->GetSnapshot();
  1383. get_perf_context()->Reset();
  1384. std::vector<Slice> keys({"no_key", "k5", "k4", "k3", "k1"});
  1385. std::vector<PinnableSlice> values(keys.size());
  1386. std::vector<ColumnFamilyHandle*> cfs(keys.size(), handles_[1]);
  1387. std::vector<Status> s(keys.size());
  1388. ReadOptions ro;
  1389. bool use_snapshots = GetParam();
  1390. if (use_snapshots) {
  1391. ro.snapshot = snap2;
  1392. }
  1393. db_->MultiGet(ro, handles_[1], keys.size(), keys.data(), values.data(),
  1394. s.data(), false);
  1395. ASSERT_EQ(values.size(), keys.size());
  1396. ASSERT_EQ(std::string(values[4].data(), values[4].size()), "v1");
  1397. ASSERT_EQ(std::string(values[3].data(), values[3].size()), "v3");
  1398. ASSERT_EQ(std::string(values[1].data(), values[1].size()), "v5");
  1399. // four kv pairs * two bytes per value
  1400. ASSERT_EQ(6, (int)get_perf_context()->multiget_read_bytes);
  1401. ASSERT_TRUE(s[0].IsNotFound());
  1402. ASSERT_OK(s[1]);
  1403. ASSERT_TRUE(s[2].IsNotFound());
  1404. ASSERT_OK(s[3]);
  1405. ASSERT_OK(s[4]);
  1406. // Call MultiGet() again with some intersection with the previous set of
  1407. // keys. Those should already be in the row cache.
  1408. keys.assign({"no_key", "k5", "k3", "k2"});
  1409. for (size_t i = 0; i < keys.size(); ++i) {
  1410. values[i].Reset();
  1411. s[i] = Status::OK();
  1412. }
  1413. get_perf_context()->Reset();
  1414. if (use_snapshots) {
  1415. ro.snapshot = snap1;
  1416. }
  1417. db_->MultiGet(ReadOptions(), handles_[1], keys.size(), keys.data(),
  1418. values.data(), s.data(), false);
  1419. ASSERT_EQ(std::string(values[3].data(), values[3].size()), "v2");
  1420. ASSERT_EQ(std::string(values[2].data(), values[2].size()), "v3");
  1421. ASSERT_EQ(std::string(values[1].data(), values[1].size()), "v5");
  1422. // four kv pairs * two bytes per value
  1423. ASSERT_EQ(6, (int)get_perf_context()->multiget_read_bytes);
  1424. ASSERT_TRUE(s[0].IsNotFound());
  1425. ASSERT_OK(s[1]);
  1426. ASSERT_OK(s[2]);
  1427. ASSERT_OK(s[3]);
  1428. if (use_snapshots) {
  1429. // Only reads from the first SST file would have been cached, since
  1430. // snapshot seq no is > fd.largest_seqno
  1431. ASSERT_EQ(1, TestGetTickerCount(options, ROW_CACHE_HIT));
  1432. } else {
  1433. ASSERT_EQ(2, TestGetTickerCount(options, ROW_CACHE_HIT));
  1434. }
  1435. SetPerfLevel(kDisable);
  1436. dbfull()->ReleaseSnapshot(snap1);
  1437. dbfull()->ReleaseSnapshot(snap2);
  1438. } while (ChangeCompactOptions());
  1439. }
  1440. INSTANTIATE_TEST_CASE_P(DBMultiGetRowCacheTest, DBMultiGetRowCacheTest,
  1441. testing::Values(true, false));
  1442. TEST_F(DBBasicTest, GetAllKeyVersions) {
  1443. Options options = CurrentOptions();
  1444. options.env = env_;
  1445. options.create_if_missing = true;
  1446. options.disable_auto_compactions = true;
  1447. CreateAndReopenWithCF({"pikachu"}, options);
  1448. ASSERT_EQ(2, handles_.size());
  1449. const size_t kNumInserts = 4;
  1450. const size_t kNumDeletes = 4;
  1451. const size_t kNumUpdates = 4;
  1452. // Check default column family
  1453. for (size_t i = 0; i != kNumInserts; ++i) {
  1454. ASSERT_OK(Put(std::to_string(i), "value"));
  1455. }
  1456. for (size_t i = 0; i != kNumUpdates; ++i) {
  1457. ASSERT_OK(Put(std::to_string(i), "value1"));
  1458. }
  1459. for (size_t i = 0; i != kNumDeletes; ++i) {
  1460. ASSERT_OK(Delete(std::to_string(i)));
  1461. }
  1462. std::vector<KeyVersion> key_versions;
  1463. ASSERT_OK(ROCKSDB_NAMESPACE::GetAllKeyVersions(
  1464. db_, Slice(), Slice(), std::numeric_limits<size_t>::max(),
  1465. &key_versions));
  1466. ASSERT_EQ(kNumInserts + kNumDeletes + kNumUpdates, key_versions.size());
  1467. ASSERT_OK(ROCKSDB_NAMESPACE::GetAllKeyVersions(
  1468. db_, handles_[0], Slice(), Slice(), std::numeric_limits<size_t>::max(),
  1469. &key_versions));
  1470. ASSERT_EQ(kNumInserts + kNumDeletes + kNumUpdates, key_versions.size());
  1471. // Check non-default column family
  1472. for (size_t i = 0; i != kNumInserts - 1; ++i) {
  1473. ASSERT_OK(Put(1, std::to_string(i), "value"));
  1474. }
  1475. for (size_t i = 0; i != kNumUpdates - 1; ++i) {
  1476. ASSERT_OK(Put(1, std::to_string(i), "value1"));
  1477. }
  1478. for (size_t i = 0; i != kNumDeletes - 1; ++i) {
  1479. ASSERT_OK(Delete(1, std::to_string(i)));
  1480. }
  1481. ASSERT_OK(ROCKSDB_NAMESPACE::GetAllKeyVersions(
  1482. db_, handles_[1], Slice(), Slice(), std::numeric_limits<size_t>::max(),
  1483. &key_versions));
  1484. ASSERT_EQ(kNumInserts + kNumDeletes + kNumUpdates - 3, key_versions.size());
  1485. }
  1486. #endif // !ROCKSDB_LITE
  1487. TEST_F(DBBasicTest, MultiGetIOBufferOverrun) {
  1488. Options options = CurrentOptions();
  1489. Random rnd(301);
  1490. BlockBasedTableOptions table_options;
  1491. table_options.pin_l0_filter_and_index_blocks_in_cache = true;
  1492. table_options.block_size = 16 * 1024;
  1493. assert(table_options.block_size >
  1494. BlockBasedTable::kMultiGetReadStackBufSize);
  1495. options.table_factory.reset(new BlockBasedTableFactory(table_options));
  1496. Reopen(options);
  1497. std::string zero_str(128, '\0');
  1498. for (int i = 0; i < 100; ++i) {
  1499. // Make the value compressible. A purely random string doesn't compress
  1500. // and the resultant data block will not be compressed
  1501. std::string value(RandomString(&rnd, 128) + zero_str);
  1502. assert(Put(Key(i), value) == Status::OK());
  1503. }
  1504. Flush();
  1505. std::vector<std::string> key_data(10);
  1506. std::vector<Slice> keys;
  1507. // We cannot resize a PinnableSlice vector, so just set initial size to
  1508. // largest we think we will need
  1509. std::vector<PinnableSlice> values(10);
  1510. std::vector<Status> statuses;
  1511. ReadOptions ro;
  1512. // Warm up the cache first
  1513. key_data.emplace_back(Key(0));
  1514. keys.emplace_back(Slice(key_data.back()));
  1515. key_data.emplace_back(Key(50));
  1516. keys.emplace_back(Slice(key_data.back()));
  1517. statuses.resize(keys.size());
  1518. dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
  1519. keys.data(), values.data(), statuses.data(), true);
  1520. }
  1521. class DBBasicTestWithParallelIO
  1522. : public DBTestBase,
  1523. public testing::WithParamInterface<std::tuple<bool, bool, bool, bool>> {
  1524. public:
  1525. DBBasicTestWithParallelIO() : DBTestBase("/db_basic_test_with_parallel_io") {
  1526. bool compressed_cache = std::get<0>(GetParam());
  1527. bool uncompressed_cache = std::get<1>(GetParam());
  1528. compression_enabled_ = std::get<2>(GetParam());
  1529. fill_cache_ = std::get<3>(GetParam());
  1530. if (compressed_cache) {
  1531. std::shared_ptr<Cache> cache = NewLRUCache(1048576);
  1532. compressed_cache_ = std::make_shared<MyBlockCache>(cache);
  1533. }
  1534. if (uncompressed_cache) {
  1535. std::shared_ptr<Cache> cache = NewLRUCache(1048576);
  1536. uncompressed_cache_ = std::make_shared<MyBlockCache>(cache);
  1537. }
  1538. env_->count_random_reads_ = true;
  1539. Options options = CurrentOptions();
  1540. Random rnd(301);
  1541. BlockBasedTableOptions table_options;
  1542. #ifndef ROCKSDB_LITE
  1543. if (compression_enabled_) {
  1544. std::vector<CompressionType> compression_types;
  1545. compression_types = GetSupportedCompressions();
  1546. // Not every platform may have compression libraries available, so
  1547. // dynamically pick based on what's available
  1548. if (compression_types.size() == 0) {
  1549. compression_enabled_ = false;
  1550. } else {
  1551. options.compression = compression_types[0];
  1552. }
  1553. }
  1554. #else
  1555. // GetSupportedCompressions() is not available in LITE build
  1556. if (!Snappy_Supported()) {
  1557. compression_enabled_ = false;
  1558. }
  1559. #endif //ROCKSDB_LITE
  1560. table_options.block_cache = uncompressed_cache_;
  1561. if (table_options.block_cache == nullptr) {
  1562. table_options.no_block_cache = true;
  1563. } else {
  1564. table_options.pin_l0_filter_and_index_blocks_in_cache = true;
  1565. }
  1566. table_options.block_cache_compressed = compressed_cache_;
  1567. table_options.flush_block_policy_factory.reset(
  1568. new MyFlushBlockPolicyFactory());
  1569. options.table_factory.reset(new BlockBasedTableFactory(table_options));
  1570. if (!compression_enabled_) {
  1571. options.compression = kNoCompression;
  1572. }
  1573. Reopen(options);
  1574. std::string zero_str(128, '\0');
  1575. for (int i = 0; i < 100; ++i) {
  1576. // Make the value compressible. A purely random string doesn't compress
  1577. // and the resultant data block will not be compressed
  1578. values_.emplace_back(RandomString(&rnd, 128) + zero_str);
  1579. assert(Put(Key(i), values_[i]) == Status::OK());
  1580. }
  1581. Flush();
  1582. for (int i = 0; i < 100; ++i) {
  1583. // block cannot gain space by compression
  1584. uncompressable_values_.emplace_back(RandomString(&rnd, 256) + '\0');
  1585. std::string tmp_key = "a" + Key(i);
  1586. assert(Put(tmp_key, uncompressable_values_[i]) == Status::OK());
  1587. }
  1588. Flush();
  1589. }
  1590. bool CheckValue(int i, const std::string& value) {
  1591. if (values_[i].compare(value) == 0) {
  1592. return true;
  1593. }
  1594. return false;
  1595. }
  1596. bool CheckUncompressableValue(int i, const std::string& value) {
  1597. if (uncompressable_values_[i].compare(value) == 0) {
  1598. return true;
  1599. }
  1600. return false;
  1601. }
  1602. int num_lookups() { return uncompressed_cache_->num_lookups(); }
  1603. int num_found() { return uncompressed_cache_->num_found(); }
  1604. int num_inserts() { return uncompressed_cache_->num_inserts(); }
  1605. int num_lookups_compressed() { return compressed_cache_->num_lookups(); }
  1606. int num_found_compressed() { return compressed_cache_->num_found(); }
  1607. int num_inserts_compressed() { return compressed_cache_->num_inserts(); }
  1608. bool fill_cache() { return fill_cache_; }
  1609. bool compression_enabled() { return compression_enabled_; }
  1610. bool has_compressed_cache() { return compressed_cache_ != nullptr; }
  1611. bool has_uncompressed_cache() { return uncompressed_cache_ != nullptr; }
  1612. static void SetUpTestCase() {}
  1613. static void TearDownTestCase() {}
  1614. private:
  1615. class MyFlushBlockPolicyFactory : public FlushBlockPolicyFactory {
  1616. public:
  1617. MyFlushBlockPolicyFactory() {}
  1618. virtual const char* Name() const override {
  1619. return "MyFlushBlockPolicyFactory";
  1620. }
  1621. virtual FlushBlockPolicy* NewFlushBlockPolicy(
  1622. const BlockBasedTableOptions& /*table_options*/,
  1623. const BlockBuilder& data_block_builder) const override {
  1624. return new MyFlushBlockPolicy(data_block_builder);
  1625. }
  1626. };
  1627. class MyFlushBlockPolicy : public FlushBlockPolicy {
  1628. public:
  1629. explicit MyFlushBlockPolicy(const BlockBuilder& data_block_builder)
  1630. : num_keys_(0), data_block_builder_(data_block_builder) {}
  1631. bool Update(const Slice& /*key*/, const Slice& /*value*/) override {
  1632. if (data_block_builder_.empty()) {
  1633. // First key in this block
  1634. num_keys_ = 1;
  1635. return false;
  1636. }
  1637. // Flush every 10 keys
  1638. if (num_keys_ == 10) {
  1639. num_keys_ = 1;
  1640. return true;
  1641. }
  1642. num_keys_++;
  1643. return false;
  1644. }
  1645. private:
  1646. int num_keys_;
  1647. const BlockBuilder& data_block_builder_;
  1648. };
  1649. class MyBlockCache : public Cache {
  1650. public:
  1651. explicit MyBlockCache(std::shared_ptr<Cache>& target)
  1652. : target_(target), num_lookups_(0), num_found_(0), num_inserts_(0) {}
  1653. virtual const char* Name() const override { return "MyBlockCache"; }
  1654. virtual Status Insert(const Slice& key, void* value, size_t charge,
  1655. void (*deleter)(const Slice& key, void* value),
  1656. Handle** handle = nullptr,
  1657. Priority priority = Priority::LOW) override {
  1658. num_inserts_++;
  1659. return target_->Insert(key, value, charge, deleter, handle, priority);
  1660. }
  1661. virtual Handle* Lookup(const Slice& key,
  1662. Statistics* stats = nullptr) override {
  1663. num_lookups_++;
  1664. Handle* handle = target_->Lookup(key, stats);
  1665. if (handle != nullptr) {
  1666. num_found_++;
  1667. }
  1668. return handle;
  1669. }
  1670. virtual bool Ref(Handle* handle) override { return target_->Ref(handle); }
  1671. virtual bool Release(Handle* handle, bool force_erase = false) override {
  1672. return target_->Release(handle, force_erase);
  1673. }
  1674. virtual void* Value(Handle* handle) override {
  1675. return target_->Value(handle);
  1676. }
  1677. virtual void Erase(const Slice& key) override { target_->Erase(key); }
  1678. virtual uint64_t NewId() override { return target_->NewId(); }
  1679. virtual void SetCapacity(size_t capacity) override {
  1680. target_->SetCapacity(capacity);
  1681. }
  1682. virtual void SetStrictCapacityLimit(bool strict_capacity_limit) override {
  1683. target_->SetStrictCapacityLimit(strict_capacity_limit);
  1684. }
  1685. virtual bool HasStrictCapacityLimit() const override {
  1686. return target_->HasStrictCapacityLimit();
  1687. }
  1688. virtual size_t GetCapacity() const override {
  1689. return target_->GetCapacity();
  1690. }
  1691. virtual size_t GetUsage() const override { return target_->GetUsage(); }
  1692. virtual size_t GetUsage(Handle* handle) const override {
  1693. return target_->GetUsage(handle);
  1694. }
  1695. virtual size_t GetPinnedUsage() const override {
  1696. return target_->GetPinnedUsage();
  1697. }
  1698. virtual size_t GetCharge(Handle* /*handle*/) const override { return 0; }
  1699. virtual void ApplyToAllCacheEntries(void (*callback)(void*, size_t),
  1700. bool thread_safe) override {
  1701. return target_->ApplyToAllCacheEntries(callback, thread_safe);
  1702. }
  1703. virtual void EraseUnRefEntries() override {
  1704. return target_->EraseUnRefEntries();
  1705. }
  1706. int num_lookups() { return num_lookups_; }
  1707. int num_found() { return num_found_; }
  1708. int num_inserts() { return num_inserts_; }
  1709. private:
  1710. std::shared_ptr<Cache> target_;
  1711. int num_lookups_;
  1712. int num_found_;
  1713. int num_inserts_;
  1714. };
  1715. std::shared_ptr<MyBlockCache> compressed_cache_;
  1716. std::shared_ptr<MyBlockCache> uncompressed_cache_;
  1717. bool compression_enabled_;
  1718. std::vector<std::string> values_;
  1719. std::vector<std::string> uncompressable_values_;
  1720. bool fill_cache_;
  1721. };
  1722. TEST_P(DBBasicTestWithParallelIO, MultiGet) {
  1723. std::vector<std::string> key_data(10);
  1724. std::vector<Slice> keys;
  1725. // We cannot resize a PinnableSlice vector, so just set initial size to
  1726. // largest we think we will need
  1727. std::vector<PinnableSlice> values(10);
  1728. std::vector<Status> statuses;
  1729. ReadOptions ro;
  1730. ro.fill_cache = fill_cache();
  1731. // Warm up the cache first
  1732. key_data.emplace_back(Key(0));
  1733. keys.emplace_back(Slice(key_data.back()));
  1734. key_data.emplace_back(Key(50));
  1735. keys.emplace_back(Slice(key_data.back()));
  1736. statuses.resize(keys.size());
  1737. dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
  1738. keys.data(), values.data(), statuses.data(), true);
  1739. ASSERT_TRUE(CheckValue(0, values[0].ToString()));
  1740. ASSERT_TRUE(CheckValue(50, values[1].ToString()));
  1741. int random_reads = env_->random_read_counter_.Read();
  1742. key_data[0] = Key(1);
  1743. key_data[1] = Key(51);
  1744. keys[0] = Slice(key_data[0]);
  1745. keys[1] = Slice(key_data[1]);
  1746. values[0].Reset();
  1747. values[1].Reset();
  1748. dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
  1749. keys.data(), values.data(), statuses.data(), true);
  1750. ASSERT_TRUE(CheckValue(1, values[0].ToString()));
  1751. ASSERT_TRUE(CheckValue(51, values[1].ToString()));
  1752. bool read_from_cache = false;
  1753. if (fill_cache()) {
  1754. if (has_uncompressed_cache()) {
  1755. read_from_cache = true;
  1756. } else if (has_compressed_cache() && compression_enabled()) {
  1757. read_from_cache = true;
  1758. }
  1759. }
  1760. int expected_reads = random_reads + (read_from_cache ? 0 : 2);
  1761. ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);
  1762. keys.resize(10);
  1763. statuses.resize(10);
  1764. std::vector<int> key_ints{1, 2, 15, 16, 55, 81, 82, 83, 84, 85};
  1765. for (size_t i = 0; i < key_ints.size(); ++i) {
  1766. key_data[i] = Key(key_ints[i]);
  1767. keys[i] = Slice(key_data[i]);
  1768. statuses[i] = Status::OK();
  1769. values[i].Reset();
  1770. }
  1771. dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
  1772. keys.data(), values.data(), statuses.data(), true);
  1773. for (size_t i = 0; i < key_ints.size(); ++i) {
  1774. ASSERT_OK(statuses[i]);
  1775. ASSERT_TRUE(CheckValue(key_ints[i], values[i].ToString()));
  1776. }
  1777. if (compression_enabled() && !has_compressed_cache()) {
  1778. expected_reads += (read_from_cache ? 2 : 3);
  1779. } else {
  1780. expected_reads += (read_from_cache ? 2 : 4);
  1781. }
  1782. ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);
  1783. keys.resize(10);
  1784. statuses.resize(10);
  1785. std::vector<int> key_uncmp{1, 2, 15, 16, 55, 81, 82, 83, 84, 85};
  1786. for (size_t i = 0; i < key_uncmp.size(); ++i) {
  1787. key_data[i] = "a" + Key(key_uncmp[i]);
  1788. keys[i] = Slice(key_data[i]);
  1789. statuses[i] = Status::OK();
  1790. values[i].Reset();
  1791. }
  1792. dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
  1793. keys.data(), values.data(), statuses.data(), true);
  1794. for (size_t i = 0; i < key_uncmp.size(); ++i) {
  1795. ASSERT_OK(statuses[i]);
  1796. ASSERT_TRUE(CheckUncompressableValue(key_uncmp[i], values[i].ToString()));
  1797. }
  1798. if (compression_enabled() && !has_compressed_cache()) {
  1799. expected_reads += (read_from_cache ? 3 : 3);
  1800. } else {
  1801. expected_reads += (read_from_cache ? 4 : 4);
  1802. }
  1803. ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);
  1804. keys.resize(5);
  1805. statuses.resize(5);
  1806. std::vector<int> key_tr{1, 2, 15, 16, 55};
  1807. for (size_t i = 0; i < key_tr.size(); ++i) {
  1808. key_data[i] = "a" + Key(key_tr[i]);
  1809. keys[i] = Slice(key_data[i]);
  1810. statuses[i] = Status::OK();
  1811. values[i].Reset();
  1812. }
  1813. dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
  1814. keys.data(), values.data(), statuses.data(), true);
  1815. for (size_t i = 0; i < key_tr.size(); ++i) {
  1816. ASSERT_OK(statuses[i]);
  1817. ASSERT_TRUE(CheckUncompressableValue(key_tr[i], values[i].ToString()));
  1818. }
  1819. if (compression_enabled() && !has_compressed_cache()) {
  1820. expected_reads += (read_from_cache ? 0 : 2);
  1821. ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);
  1822. } else {
  1823. if (has_uncompressed_cache()) {
  1824. expected_reads += (read_from_cache ? 0 : 3);
  1825. ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);
  1826. } else {
  1827. // A rare case, even we enable the block compression but some of data
  1828. // blocks are not compressed due to content. If user only enable the
  1829. // compressed cache, the uncompressed blocks will not tbe cached, and
  1830. // block reads will be triggered. The number of reads is related to
  1831. // the compression algorithm.
  1832. ASSERT_TRUE(env_->random_read_counter_.Read() >= expected_reads);
  1833. }
  1834. }
  1835. }
  1836. TEST_P(DBBasicTestWithParallelIO, MultiGetWithChecksumMismatch) {
  1837. std::vector<std::string> key_data(10);
  1838. std::vector<Slice> keys;
  1839. // We cannot resize a PinnableSlice vector, so just set initial size to
  1840. // largest we think we will need
  1841. std::vector<PinnableSlice> values(10);
  1842. std::vector<Status> statuses;
  1843. int read_count = 0;
  1844. ReadOptions ro;
  1845. ro.fill_cache = fill_cache();
  1846. SyncPoint::GetInstance()->SetCallBack(
  1847. "RetrieveMultipleBlocks:VerifyChecksum", [&](void *status) {
  1848. Status* s = static_cast<Status*>(status);
  1849. read_count++;
  1850. if (read_count == 2) {
  1851. *s = Status::Corruption();
  1852. }
  1853. });
  1854. SyncPoint::GetInstance()->EnableProcessing();
  1855. // Warm up the cache first
  1856. key_data.emplace_back(Key(0));
  1857. keys.emplace_back(Slice(key_data.back()));
  1858. key_data.emplace_back(Key(50));
  1859. keys.emplace_back(Slice(key_data.back()));
  1860. statuses.resize(keys.size());
  1861. dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
  1862. keys.data(), values.data(), statuses.data(), true);
  1863. ASSERT_TRUE(CheckValue(0, values[0].ToString()));
  1864. //ASSERT_TRUE(CheckValue(50, values[1].ToString()));
  1865. ASSERT_EQ(statuses[0], Status::OK());
  1866. ASSERT_EQ(statuses[1], Status::Corruption());
  1867. SyncPoint::GetInstance()->DisableProcessing();
  1868. }
  1869. TEST_P(DBBasicTestWithParallelIO, MultiGetWithMissingFile) {
  1870. std::vector<std::string> key_data(10);
  1871. std::vector<Slice> keys;
  1872. // We cannot resize a PinnableSlice vector, so just set initial size to
  1873. // largest we think we will need
  1874. std::vector<PinnableSlice> values(10);
  1875. std::vector<Status> statuses;
  1876. ReadOptions ro;
  1877. ro.fill_cache = fill_cache();
  1878. SyncPoint::GetInstance()->SetCallBack(
  1879. "TableCache::MultiGet:FindTable", [&](void *status) {
  1880. Status* s = static_cast<Status*>(status);
  1881. *s = Status::IOError();
  1882. });
  1883. // DB open will create table readers unless we reduce the table cache
  1884. // capacity.
  1885. // SanitizeOptions will set max_open_files to minimum of 20. Table cache
  1886. // is allocated with max_open_files - 10 as capacity. So override
  1887. // max_open_files to 11 so table cache capacity will become 1. This will
  1888. // prevent file open during DB open and force the file to be opened
  1889. // during MultiGet
  1890. SyncPoint::GetInstance()->SetCallBack(
  1891. "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void *arg) {
  1892. int* max_open_files = (int*)arg;
  1893. *max_open_files = 11;
  1894. });
  1895. SyncPoint::GetInstance()->EnableProcessing();
  1896. Reopen(CurrentOptions());
  1897. // Warm up the cache first
  1898. key_data.emplace_back(Key(0));
  1899. keys.emplace_back(Slice(key_data.back()));
  1900. key_data.emplace_back(Key(50));
  1901. keys.emplace_back(Slice(key_data.back()));
  1902. statuses.resize(keys.size());
  1903. dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
  1904. keys.data(), values.data(), statuses.data(), true);
  1905. ASSERT_EQ(statuses[0], Status::IOError());
  1906. ASSERT_EQ(statuses[1], Status::IOError());
  1907. SyncPoint::GetInstance()->DisableProcessing();
  1908. }
  1909. INSTANTIATE_TEST_CASE_P(
  1910. ParallelIO, DBBasicTestWithParallelIO,
  1911. // Params are as follows -
  1912. // Param 0 - Compressed cache enabled
  1913. // Param 1 - Uncompressed cache enabled
  1914. // Param 2 - Data compression enabled
  1915. // Param 3 - ReadOptions::fill_cache
  1916. ::testing::Combine(::testing::Bool(), ::testing::Bool(),
  1917. ::testing::Bool(), ::testing::Bool()));
  1918. class DBBasicTestWithTimestampBase : public DBTestBase {
  1919. public:
  1920. explicit DBBasicTestWithTimestampBase(const std::string& dbname)
  1921. : DBTestBase(dbname) {}
  1922. protected:
  1923. class TestComparatorBase : public Comparator {
  1924. public:
  1925. explicit TestComparatorBase(size_t ts_sz) : Comparator(ts_sz) {}
  1926. const char* Name() const override { return "TestComparator"; }
  1927. void FindShortSuccessor(std::string*) const override {}
  1928. void FindShortestSeparator(std::string*, const Slice&) const override {}
  1929. int Compare(const Slice& a, const Slice& b) const override {
  1930. int r = CompareWithoutTimestamp(a, b);
  1931. if (r != 0 || 0 == timestamp_size()) {
  1932. return r;
  1933. }
  1934. return CompareTimestamp(
  1935. Slice(a.data() + a.size() - timestamp_size(), timestamp_size()),
  1936. Slice(b.data() + b.size() - timestamp_size(), timestamp_size()));
  1937. }
  1938. virtual int CompareImpl(const Slice& a, const Slice& b) const = 0;
  1939. int CompareWithoutTimestamp(const Slice& a, const Slice& b) const override {
  1940. assert(a.size() >= timestamp_size());
  1941. assert(b.size() >= timestamp_size());
  1942. Slice k1 = StripTimestampFromUserKey(a, timestamp_size());
  1943. Slice k2 = StripTimestampFromUserKey(b, timestamp_size());
  1944. return CompareImpl(k1, k2);
  1945. }
  1946. int CompareTimestamp(const Slice& ts1, const Slice& ts2) const override {
  1947. if (!ts1.data() && !ts2.data()) {
  1948. return 0;
  1949. } else if (ts1.data() && !ts2.data()) {
  1950. return 1;
  1951. } else if (!ts1.data() && ts2.data()) {
  1952. return -1;
  1953. }
  1954. assert(ts1.size() == ts2.size());
  1955. uint64_t low1 = 0;
  1956. uint64_t low2 = 0;
  1957. uint64_t high1 = 0;
  1958. uint64_t high2 = 0;
  1959. auto* ptr1 = const_cast<Slice*>(&ts1);
  1960. auto* ptr2 = const_cast<Slice*>(&ts2);
  1961. if (!GetFixed64(ptr1, &low1) || !GetFixed64(ptr1, &high1) ||
  1962. !GetFixed64(ptr2, &low2) || !GetFixed64(ptr2, &high2)) {
  1963. assert(false);
  1964. }
  1965. if (high1 < high2) {
  1966. return 1;
  1967. } else if (high1 > high2) {
  1968. return -1;
  1969. }
  1970. if (low1 < low2) {
  1971. return 1;
  1972. } else if (low1 > low2) {
  1973. return -1;
  1974. }
  1975. return 0;
  1976. }
  1977. };
  1978. Slice EncodeTimestamp(uint64_t low, uint64_t high, std::string* ts) {
  1979. assert(nullptr != ts);
  1980. ts->clear();
  1981. PutFixed64(ts, low);
  1982. PutFixed64(ts, high);
  1983. assert(ts->size() == sizeof(low) + sizeof(high));
  1984. return Slice(*ts);
  1985. }
  1986. };
  1987. class DBBasicTestWithTimestamp : public DBBasicTestWithTimestampBase {
  1988. public:
  1989. DBBasicTestWithTimestamp()
  1990. : DBBasicTestWithTimestampBase("/db_basic_test_with_timestamp") {}
  1991. protected:
  1992. class TestComparator : public TestComparatorBase {
  1993. public:
  1994. const int kKeyPrefixLength =
  1995. 3; // 3: length of "key" in generated keys ("key" + std::to_string(j))
  1996. explicit TestComparator(size_t ts_sz) : TestComparatorBase(ts_sz) {}
  1997. int CompareImpl(const Slice& a, const Slice& b) const override {
  1998. int n1 = atoi(
  1999. std::string(a.data() + kKeyPrefixLength, a.size() - kKeyPrefixLength)
  2000. .c_str());
  2001. int n2 = atoi(
  2002. std::string(b.data() + kKeyPrefixLength, b.size() - kKeyPrefixLength)
  2003. .c_str());
  2004. return (n1 < n2) ? -1 : (n1 > n2) ? 1 : 0;
  2005. }
  2006. };
  2007. };
  2008. #ifndef ROCKSDB_LITE
  2009. // A class which remembers the name of each flushed file.
  2010. class FlushedFileCollector : public EventListener {
  2011. public:
  2012. FlushedFileCollector() {}
  2013. ~FlushedFileCollector() override {}
  2014. void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& info) override {
  2015. InstrumentedMutexLock lock(&mutex_);
  2016. flushed_files_.push_back(info.file_path);
  2017. }
  2018. std::vector<std::string> GetFlushedFiles() {
  2019. std::vector<std::string> result;
  2020. {
  2021. InstrumentedMutexLock lock(&mutex_);
  2022. result = flushed_files_;
  2023. }
  2024. return result;
  2025. }
  2026. void ClearFlushedFiles() {
  2027. InstrumentedMutexLock lock(&mutex_);
  2028. flushed_files_.clear();
  2029. }
  2030. private:
  2031. std::vector<std::string> flushed_files_;
  2032. InstrumentedMutex mutex_;
  2033. };
  2034. TEST_F(DBBasicTestWithTimestamp, PutAndGetWithCompaction) {
  2035. const int kNumKeysPerFile = 8192;
  2036. const size_t kNumTimestamps = 2;
  2037. const size_t kNumKeysPerTimestamp = (kNumKeysPerFile - 1) / kNumTimestamps;
  2038. const size_t kSplitPosBase = kNumKeysPerTimestamp / 2;
  2039. Options options = CurrentOptions();
  2040. options.create_if_missing = true;
  2041. options.env = env_;
  2042. options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
  2043. FlushedFileCollector* collector = new FlushedFileCollector();
  2044. options.listeners.emplace_back(collector);
  2045. std::string tmp;
  2046. size_t ts_sz = EncodeTimestamp(0, 0, &tmp).size();
  2047. TestComparator test_cmp(ts_sz);
  2048. options.comparator = &test_cmp;
  2049. BlockBasedTableOptions bbto;
  2050. bbto.filter_policy.reset(NewBloomFilterPolicy(
  2051. 10 /*bits_per_key*/, false /*use_block_based_builder*/));
  2052. bbto.whole_key_filtering = true;
  2053. options.table_factory.reset(NewBlockBasedTableFactory(bbto));
  2054. DestroyAndReopen(options);
  2055. CreateAndReopenWithCF({"pikachu"}, options);
  2056. size_t num_cfs = handles_.size();
  2057. ASSERT_EQ(2, num_cfs);
  2058. std::vector<std::string> write_ts_strs(kNumTimestamps);
  2059. std::vector<std::string> read_ts_strs(kNumTimestamps);
  2060. std::vector<Slice> write_ts_list;
  2061. std::vector<Slice> read_ts_list;
  2062. for (size_t i = 0; i != kNumTimestamps; ++i) {
  2063. write_ts_list.emplace_back(EncodeTimestamp(i * 2, 0, &write_ts_strs[i]));
  2064. read_ts_list.emplace_back(EncodeTimestamp(1 + i * 2, 0, &read_ts_strs[i]));
  2065. const Slice& write_ts = write_ts_list.back();
  2066. WriteOptions wopts;
  2067. wopts.timestamp = &write_ts;
  2068. for (int cf = 0; cf != static_cast<int>(num_cfs); ++cf) {
  2069. for (size_t j = 0; j != kNumKeysPerTimestamp; ++j) {
  2070. ASSERT_OK(Put(cf, "key" + std::to_string(j),
  2071. "value_" + std::to_string(j) + "_" + std::to_string(i),
  2072. wopts));
  2073. if (j == kSplitPosBase + i || j == kNumKeysPerTimestamp - 1) {
  2074. // flush all keys with the same timestamp to two sst files, split at
  2075. // incremental positions such that lowerlevel[1].smallest.userkey ==
  2076. // higherlevel[0].largest.userkey
  2077. ASSERT_OK(Flush(cf));
  2078. // compact files (2 at each level) to a lower level such that all keys
  2079. // with the same timestamp is at one level, with newer versions at
  2080. // higher levels.
  2081. CompactionOptions compact_opt;
  2082. compact_opt.compression = kNoCompression;
  2083. db_->CompactFiles(compact_opt, handles_[cf],
  2084. collector->GetFlushedFiles(),
  2085. static_cast<int>(kNumTimestamps - i));
  2086. collector->ClearFlushedFiles();
  2087. }
  2088. }
  2089. }
  2090. }
  2091. const auto& verify_db_func = [&]() {
  2092. for (size_t i = 0; i != kNumTimestamps; ++i) {
  2093. ReadOptions ropts;
  2094. ropts.timestamp = &read_ts_list[i];
  2095. for (int cf = 0; cf != static_cast<int>(num_cfs); ++cf) {
  2096. ColumnFamilyHandle* cfh = handles_[cf];
  2097. for (size_t j = 0; j != kNumKeysPerTimestamp; ++j) {
  2098. std::string value;
  2099. ASSERT_OK(db_->Get(ropts, cfh, "key" + std::to_string(j), &value));
  2100. ASSERT_EQ("value_" + std::to_string(j) + "_" + std::to_string(i),
  2101. value);
  2102. }
  2103. }
  2104. }
  2105. };
  2106. verify_db_func();
  2107. }
  2108. #endif // !ROCKSDB_LITE
  2109. class DBBasicTestWithTimestampWithParam
  2110. : public DBBasicTestWithTimestampBase,
  2111. public testing::WithParamInterface<bool> {
  2112. public:
  2113. DBBasicTestWithTimestampWithParam()
  2114. : DBBasicTestWithTimestampBase(
  2115. "/db_basic_test_with_timestamp_with_param") {}
  2116. protected:
  2117. class TestComparator : public TestComparatorBase {
  2118. private:
  2119. const Comparator* cmp_without_ts_;
  2120. public:
  2121. explicit TestComparator(size_t ts_sz)
  2122. : TestComparatorBase(ts_sz), cmp_without_ts_(nullptr) {
  2123. cmp_without_ts_ = BytewiseComparator();
  2124. }
  2125. int CompareImpl(const Slice& a, const Slice& b) const override {
  2126. return cmp_without_ts_->Compare(a, b);
  2127. }
  2128. };
  2129. };
  2130. TEST_P(DBBasicTestWithTimestampWithParam, PutAndGet) {
  2131. const int kNumKeysPerFile = 8192;
  2132. const size_t kNumTimestamps = 6;
  2133. bool memtable_only = GetParam();
  2134. Options options = CurrentOptions();
  2135. options.create_if_missing = true;
  2136. options.env = env_;
  2137. options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
  2138. std::string tmp;
  2139. size_t ts_sz = EncodeTimestamp(0, 0, &tmp).size();
  2140. TestComparator test_cmp(ts_sz);
  2141. options.comparator = &test_cmp;
  2142. BlockBasedTableOptions bbto;
  2143. bbto.filter_policy.reset(NewBloomFilterPolicy(
  2144. 10 /*bits_per_key*/, false /*use_block_based_builder*/));
  2145. bbto.whole_key_filtering = true;
  2146. options.table_factory.reset(NewBlockBasedTableFactory(bbto));
  2147. std::vector<CompressionType> compression_types;
  2148. compression_types.push_back(kNoCompression);
  2149. if (Zlib_Supported()) {
  2150. compression_types.push_back(kZlibCompression);
  2151. }
  2152. #if LZ4_VERSION_NUMBER >= 10400 // r124+
  2153. compression_types.push_back(kLZ4Compression);
  2154. compression_types.push_back(kLZ4HCCompression);
  2155. #endif // LZ4_VERSION_NUMBER >= 10400
  2156. if (ZSTD_Supported()) {
  2157. compression_types.push_back(kZSTD);
  2158. }
  2159. // Switch compression dictionary on/off to check key extraction
  2160. // correctness in kBuffered state
  2161. std::vector<uint32_t> max_dict_bytes_list = {0, 1 << 14}; // 0 or 16KB
  2162. for (auto compression_type : compression_types) {
  2163. for (uint32_t max_dict_bytes : max_dict_bytes_list) {
  2164. options.compression = compression_type;
  2165. options.compression_opts.max_dict_bytes = max_dict_bytes;
  2166. if (compression_type == kZSTD) {
  2167. options.compression_opts.zstd_max_train_bytes = max_dict_bytes;
  2168. }
  2169. options.target_file_size_base = 1 << 26; // 64MB
  2170. DestroyAndReopen(options);
  2171. CreateAndReopenWithCF({"pikachu"}, options);
  2172. size_t num_cfs = handles_.size();
  2173. ASSERT_EQ(2, num_cfs);
  2174. std::vector<std::string> write_ts_strs(kNumTimestamps);
  2175. std::vector<std::string> read_ts_strs(kNumTimestamps);
  2176. std::vector<Slice> write_ts_list;
  2177. std::vector<Slice> read_ts_list;
  2178. for (size_t i = 0; i != kNumTimestamps; ++i) {
  2179. write_ts_list.emplace_back(
  2180. EncodeTimestamp(i * 2, 0, &write_ts_strs[i]));
  2181. read_ts_list.emplace_back(
  2182. EncodeTimestamp(1 + i * 2, 0, &read_ts_strs[i]));
  2183. const Slice& write_ts = write_ts_list.back();
  2184. WriteOptions wopts;
  2185. wopts.timestamp = &write_ts;
  2186. for (int cf = 0; cf != static_cast<int>(num_cfs); ++cf) {
  2187. for (size_t j = 0; j != (kNumKeysPerFile - 1) / kNumTimestamps; ++j) {
  2188. ASSERT_OK(Put(
  2189. cf, "key" + std::to_string(j),
  2190. "value_" + std::to_string(j) + "_" + std::to_string(i), wopts));
  2191. }
  2192. if (!memtable_only) {
  2193. ASSERT_OK(Flush(cf));
  2194. }
  2195. }
  2196. }
  2197. const auto& verify_db_func = [&]() {
  2198. for (size_t i = 0; i != kNumTimestamps; ++i) {
  2199. ReadOptions ropts;
  2200. ropts.timestamp = &read_ts_list[i];
  2201. for (int cf = 0; cf != static_cast<int>(num_cfs); ++cf) {
  2202. ColumnFamilyHandle* cfh = handles_[cf];
  2203. for (size_t j = 0; j != (kNumKeysPerFile - 1) / kNumTimestamps;
  2204. ++j) {
  2205. std::string value;
  2206. ASSERT_OK(
  2207. db_->Get(ropts, cfh, "key" + std::to_string(j), &value));
  2208. ASSERT_EQ("value_" + std::to_string(j) + "_" + std::to_string(i),
  2209. value);
  2210. }
  2211. }
  2212. }
  2213. };
  2214. verify_db_func();
  2215. }
  2216. }
  2217. }
  2218. INSTANTIATE_TEST_CASE_P(Timestamp, DBBasicTestWithTimestampWithParam,
  2219. ::testing::Bool());
  2220. } // namespace ROCKSDB_NAMESPACE
  2221. #ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
  2222. extern "C" {
  2223. void RegisterCustomObjects(int argc, char** argv);
  2224. }
  2225. #else
  2226. void RegisterCustomObjects(int /*argc*/, char** /*argv*/) {}
  2227. #endif // !ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
  2228. int main(int argc, char** argv) {
  2229. ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  2230. ::testing::InitGoogleTest(&argc, argv);
  2231. RegisterCustomObjects(argc, argv);
  2232. return RUN_ALL_TESTS();
  2233. }