db_secondary_test.cc 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  9. #include "db/db_impl/db_impl_secondary.h"
  10. #include "db/db_test_util.h"
  11. #include "port/stack_trace.h"
  12. #include "test_util/fault_injection_test_env.h"
  13. #include "test_util/sync_point.h"
  14. namespace ROCKSDB_NAMESPACE {
  15. #ifndef ROCKSDB_LITE
  16. class DBSecondaryTest : public DBTestBase {
  17. public:
  18. DBSecondaryTest()
  19. : DBTestBase("/db_secondary_test"),
  20. secondary_path_(),
  21. handles_secondary_(),
  22. db_secondary_(nullptr) {
  23. secondary_path_ =
  24. test::PerThreadDBPath(env_, "/db_secondary_test_secondary");
  25. }
  26. ~DBSecondaryTest() override {
  27. CloseSecondary();
  28. if (getenv("KEEP_DB") != nullptr) {
  29. fprintf(stdout, "Secondary DB is still at %s\n", secondary_path_.c_str());
  30. } else {
  31. Options options;
  32. options.env = env_;
  33. EXPECT_OK(DestroyDB(secondary_path_, options));
  34. }
  35. }
  36. protected:
  37. Status ReopenAsSecondary(const Options& options) {
  38. return DB::OpenAsSecondary(options, dbname_, secondary_path_, &db_);
  39. }
  40. void OpenSecondary(const Options& options);
  41. void OpenSecondaryWithColumnFamilies(
  42. const std::vector<std::string>& column_families, const Options& options);
  43. void CloseSecondary() {
  44. for (auto h : handles_secondary_) {
  45. db_secondary_->DestroyColumnFamilyHandle(h);
  46. }
  47. handles_secondary_.clear();
  48. delete db_secondary_;
  49. db_secondary_ = nullptr;
  50. }
  51. DBImplSecondary* db_secondary_full() {
  52. return static_cast<DBImplSecondary*>(db_secondary_);
  53. }
  54. void CheckFileTypeCounts(const std::string& dir, int expected_log,
  55. int expected_sst, int expected_manifest) const;
  56. std::string secondary_path_;
  57. std::vector<ColumnFamilyHandle*> handles_secondary_;
  58. DB* db_secondary_;
  59. };
  60. void DBSecondaryTest::OpenSecondary(const Options& options) {
  61. Status s =
  62. DB::OpenAsSecondary(options, dbname_, secondary_path_, &db_secondary_);
  63. ASSERT_OK(s);
  64. }
  65. void DBSecondaryTest::OpenSecondaryWithColumnFamilies(
  66. const std::vector<std::string>& column_families, const Options& options) {
  67. std::vector<ColumnFamilyDescriptor> cf_descs;
  68. cf_descs.emplace_back(kDefaultColumnFamilyName, options);
  69. for (const auto& cf_name : column_families) {
  70. cf_descs.emplace_back(cf_name, options);
  71. }
  72. Status s = DB::OpenAsSecondary(options, dbname_, secondary_path_, cf_descs,
  73. &handles_secondary_, &db_secondary_);
  74. ASSERT_OK(s);
  75. }
  76. void DBSecondaryTest::CheckFileTypeCounts(const std::string& dir,
  77. int expected_log, int expected_sst,
  78. int expected_manifest) const {
  79. std::vector<std::string> filenames;
  80. env_->GetChildren(dir, &filenames);
  81. int log_cnt = 0, sst_cnt = 0, manifest_cnt = 0;
  82. for (auto file : filenames) {
  83. uint64_t number;
  84. FileType type;
  85. if (ParseFileName(file, &number, &type)) {
  86. log_cnt += (type == kLogFile);
  87. sst_cnt += (type == kTableFile);
  88. manifest_cnt += (type == kDescriptorFile);
  89. }
  90. }
  91. ASSERT_EQ(expected_log, log_cnt);
  92. ASSERT_EQ(expected_sst, sst_cnt);
  93. ASSERT_EQ(expected_manifest, manifest_cnt);
  94. }
  95. TEST_F(DBSecondaryTest, ReopenAsSecondary) {
  96. Options options;
  97. options.env = env_;
  98. Reopen(options);
  99. ASSERT_OK(Put("foo", "foo_value"));
  100. ASSERT_OK(Put("bar", "bar_value"));
  101. ASSERT_OK(dbfull()->Flush(FlushOptions()));
  102. Close();
  103. ASSERT_OK(ReopenAsSecondary(options));
  104. ASSERT_EQ("foo_value", Get("foo"));
  105. ASSERT_EQ("bar_value", Get("bar"));
  106. ReadOptions ropts;
  107. ropts.verify_checksums = true;
  108. auto db1 = static_cast<DBImplSecondary*>(db_);
  109. ASSERT_NE(nullptr, db1);
  110. Iterator* iter = db1->NewIterator(ropts);
  111. ASSERT_NE(nullptr, iter);
  112. size_t count = 0;
  113. for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
  114. if (0 == count) {
  115. ASSERT_EQ("bar", iter->key().ToString());
  116. ASSERT_EQ("bar_value", iter->value().ToString());
  117. } else if (1 == count) {
  118. ASSERT_EQ("foo", iter->key().ToString());
  119. ASSERT_EQ("foo_value", iter->value().ToString());
  120. }
  121. ++count;
  122. }
  123. delete iter;
  124. ASSERT_EQ(2, count);
  125. }
  126. TEST_F(DBSecondaryTest, OpenAsSecondary) {
  127. Options options;
  128. options.env = env_;
  129. options.level0_file_num_compaction_trigger = 4;
  130. Reopen(options);
  131. for (int i = 0; i < 3; ++i) {
  132. ASSERT_OK(Put("foo", "foo_value" + std::to_string(i)));
  133. ASSERT_OK(Put("bar", "bar_value" + std::to_string(i)));
  134. ASSERT_OK(Flush());
  135. }
  136. Options options1;
  137. options1.env = env_;
  138. options1.max_open_files = -1;
  139. OpenSecondary(options1);
  140. ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  141. ASSERT_OK(dbfull()->TEST_WaitForCompact());
  142. ReadOptions ropts;
  143. ropts.verify_checksums = true;
  144. const auto verify_db_func = [&](const std::string& foo_val,
  145. const std::string& bar_val) {
  146. std::string value;
  147. ASSERT_OK(db_secondary_->Get(ropts, "foo", &value));
  148. ASSERT_EQ(foo_val, value);
  149. ASSERT_OK(db_secondary_->Get(ropts, "bar", &value));
  150. ASSERT_EQ(bar_val, value);
  151. Iterator* iter = db_secondary_->NewIterator(ropts);
  152. ASSERT_NE(nullptr, iter);
  153. iter->Seek("foo");
  154. ASSERT_TRUE(iter->Valid());
  155. ASSERT_EQ("foo", iter->key().ToString());
  156. ASSERT_EQ(foo_val, iter->value().ToString());
  157. iter->Seek("bar");
  158. ASSERT_TRUE(iter->Valid());
  159. ASSERT_EQ("bar", iter->key().ToString());
  160. ASSERT_EQ(bar_val, iter->value().ToString());
  161. size_t count = 0;
  162. for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
  163. ++count;
  164. }
  165. ASSERT_EQ(2, count);
  166. delete iter;
  167. };
  168. verify_db_func("foo_value2", "bar_value2");
  169. ASSERT_OK(Put("foo", "new_foo_value"));
  170. ASSERT_OK(Put("bar", "new_bar_value"));
  171. ASSERT_OK(Flush());
  172. ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
  173. verify_db_func("new_foo_value", "new_bar_value");
  174. }
  175. namespace {
  176. class TraceFileEnv : public EnvWrapper {
  177. public:
  178. explicit TraceFileEnv(Env* _target) : EnvWrapper(_target) {}
  179. Status NewRandomAccessFile(const std::string& f,
  180. std::unique_ptr<RandomAccessFile>* r,
  181. const EnvOptions& env_options) override {
  182. class TracedRandomAccessFile : public RandomAccessFile {
  183. public:
  184. TracedRandomAccessFile(std::unique_ptr<RandomAccessFile>&& target,
  185. std::atomic<int>& counter)
  186. : target_(std::move(target)), files_closed_(counter) {}
  187. ~TracedRandomAccessFile() override {
  188. files_closed_.fetch_add(1, std::memory_order_relaxed);
  189. }
  190. Status Read(uint64_t offset, size_t n, Slice* result,
  191. char* scratch) const override {
  192. return target_->Read(offset, n, result, scratch);
  193. }
  194. private:
  195. std::unique_ptr<RandomAccessFile> target_;
  196. std::atomic<int>& files_closed_;
  197. };
  198. Status s = target()->NewRandomAccessFile(f, r, env_options);
  199. if (s.ok()) {
  200. r->reset(new TracedRandomAccessFile(std::move(*r), files_closed_));
  201. }
  202. return s;
  203. }
  204. int files_closed() const {
  205. return files_closed_.load(std::memory_order_relaxed);
  206. }
  207. private:
  208. std::atomic<int> files_closed_{0};
  209. };
  210. } // namespace
  211. TEST_F(DBSecondaryTest, SecondaryCloseFiles) {
  212. Options options;
  213. options.env = env_;
  214. options.max_open_files = 1;
  215. options.disable_auto_compactions = true;
  216. Reopen(options);
  217. Options options1;
  218. std::unique_ptr<Env> traced_env(new TraceFileEnv(env_));
  219. options1.env = traced_env.get();
  220. OpenSecondary(options1);
  221. static const auto verify_db = [&]() {
  222. std::unique_ptr<Iterator> iter1(dbfull()->NewIterator(ReadOptions()));
  223. std::unique_ptr<Iterator> iter2(db_secondary_->NewIterator(ReadOptions()));
  224. for (iter1->SeekToFirst(), iter2->SeekToFirst();
  225. iter1->Valid() && iter2->Valid(); iter1->Next(), iter2->Next()) {
  226. ASSERT_EQ(iter1->key(), iter2->key());
  227. ASSERT_EQ(iter1->value(), iter2->value());
  228. }
  229. ASSERT_FALSE(iter1->Valid());
  230. ASSERT_FALSE(iter2->Valid());
  231. };
  232. ASSERT_OK(Put("a", "value"));
  233. ASSERT_OK(Put("c", "value"));
  234. ASSERT_OK(Flush());
  235. ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
  236. verify_db();
  237. ASSERT_OK(Put("b", "value"));
  238. ASSERT_OK(Put("d", "value"));
  239. ASSERT_OK(Flush());
  240. ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
  241. verify_db();
  242. ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  243. ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
  244. ASSERT_EQ(2, static_cast<TraceFileEnv*>(traced_env.get())->files_closed());
  245. Status s = db_secondary_->SetDBOptions({{"max_open_files", "-1"}});
  246. ASSERT_TRUE(s.IsNotSupported());
  247. CloseSecondary();
  248. }
  249. TEST_F(DBSecondaryTest, OpenAsSecondaryWALTailing) {
  250. Options options;
  251. options.env = env_;
  252. options.level0_file_num_compaction_trigger = 4;
  253. Reopen(options);
  254. for (int i = 0; i < 3; ++i) {
  255. ASSERT_OK(Put("foo", "foo_value" + std::to_string(i)));
  256. ASSERT_OK(Put("bar", "bar_value" + std::to_string(i)));
  257. }
  258. Options options1;
  259. options1.env = env_;
  260. options1.max_open_files = -1;
  261. OpenSecondary(options1);
  262. ReadOptions ropts;
  263. ropts.verify_checksums = true;
  264. const auto verify_db_func = [&](const std::string& foo_val,
  265. const std::string& bar_val) {
  266. std::string value;
  267. ASSERT_OK(db_secondary_->Get(ropts, "foo", &value));
  268. ASSERT_EQ(foo_val, value);
  269. ASSERT_OK(db_secondary_->Get(ropts, "bar", &value));
  270. ASSERT_EQ(bar_val, value);
  271. Iterator* iter = db_secondary_->NewIterator(ropts);
  272. ASSERT_NE(nullptr, iter);
  273. iter->Seek("foo");
  274. ASSERT_TRUE(iter->Valid());
  275. ASSERT_EQ("foo", iter->key().ToString());
  276. ASSERT_EQ(foo_val, iter->value().ToString());
  277. iter->Seek("bar");
  278. ASSERT_TRUE(iter->Valid());
  279. ASSERT_EQ("bar", iter->key().ToString());
  280. ASSERT_EQ(bar_val, iter->value().ToString());
  281. size_t count = 0;
  282. for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
  283. ++count;
  284. }
  285. ASSERT_EQ(2, count);
  286. delete iter;
  287. };
  288. verify_db_func("foo_value2", "bar_value2");
  289. ASSERT_OK(Put("foo", "new_foo_value"));
  290. ASSERT_OK(Put("bar", "new_bar_value"));
  291. ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
  292. verify_db_func("new_foo_value", "new_bar_value");
  293. ASSERT_OK(Flush());
  294. ASSERT_OK(Put("foo", "new_foo_value_1"));
  295. ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
  296. verify_db_func("new_foo_value_1", "new_bar_value");
  297. }
  298. TEST_F(DBSecondaryTest, OpenWithNonExistColumnFamily) {
  299. Options options;
  300. options.env = env_;
  301. CreateAndReopenWithCF({"pikachu"}, options);
  302. Options options1;
  303. options1.env = env_;
  304. options1.max_open_files = -1;
  305. std::vector<ColumnFamilyDescriptor> cf_descs;
  306. cf_descs.emplace_back(kDefaultColumnFamilyName, options1);
  307. cf_descs.emplace_back("pikachu", options1);
  308. cf_descs.emplace_back("eevee", options1);
  309. Status s = DB::OpenAsSecondary(options1, dbname_, secondary_path_, cf_descs,
  310. &handles_secondary_, &db_secondary_);
  311. ASSERT_NOK(s);
  312. }
  313. TEST_F(DBSecondaryTest, OpenWithSubsetOfColumnFamilies) {
  314. Options options;
  315. options.env = env_;
  316. CreateAndReopenWithCF({"pikachu"}, options);
  317. Options options1;
  318. options1.env = env_;
  319. options1.max_open_files = -1;
  320. OpenSecondary(options1);
  321. ASSERT_EQ(0, handles_secondary_.size());
  322. ASSERT_NE(nullptr, db_secondary_);
  323. ASSERT_OK(Put(0 /*cf*/, "foo", "foo_value"));
  324. ASSERT_OK(Put(1 /*cf*/, "foo", "foo_value"));
  325. ASSERT_OK(Flush(0 /*cf*/));
  326. ASSERT_OK(Flush(1 /*cf*/));
  327. ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
  328. ReadOptions ropts;
  329. ropts.verify_checksums = true;
  330. std::string value;
  331. ASSERT_OK(db_secondary_->Get(ropts, "foo", &value));
  332. ASSERT_EQ("foo_value", value);
  333. }
  // Opens (and immediately closes) a secondary instance on a helper thread
  // while the primary is being reopened on the test thread. The sync-point
  // dependencies loaded below name points in
  // ReactiveVersionSet::MaybeSwitchManifest and
  // VersionSet::ProcessManifestWrites.
  // NOTE(review): presumably each {A, B} pair makes B wait until A has been
  // reached, interleaving the secondary's MANIFEST lookup with the primary's
  // creation of a new MANIFEST — confirm against the SyncPoint documentation.
  334. TEST_F(DBSecondaryTest, SwitchToNewManifestDuringOpen) {
  335. Options options;
  336. options.env = env_;
  // Create the DB, then close it so the Reopen() at the end of the test runs
  // while the helper thread is opening the secondary.
  337. Reopen(options);
  338. Close();
  // Start from a clean sync-point state before installing the dependencies.
  339. SyncPoint::GetInstance()->DisableProcessing();
  340. SyncPoint::GetInstance()->ClearAllCallBacks();
  341. SyncPoint::GetInstance()->LoadDependency(
  342. {{"ReactiveVersionSet::MaybeSwitchManifest:AfterGetCurrentManifestPath:0",
  343. "VersionSet::ProcessManifestWrites:BeforeNewManifest"},
  344. {"VersionSet::ProcessManifestWrites:AfterNewManifest",
  345. "ReactiveVersionSet::MaybeSwitchManifest:AfterGetCurrentManifestPath:"
  346. "1"}});
  347. SyncPoint::GetInstance()->EnableProcessing();
  348. // Make sure db calls RecoverLogFiles so as to trigger a manifest write,
  349. // which causes the db to switch to a new MANIFEST upon start.
  // Helper thread: open the secondary and close it again right away.
  350. port::Thread ro_db_thread([&]() {
  351. Options options1;
  352. options1.env = env_;
  353. options1.max_open_files = -1;
  354. OpenSecondary(options1);
  355. CloseSecondary();
  356. });
  // Test thread: reopen the primary while the helper thread is running, then
  // wait for the helper thread to finish.
  357. Reopen(options);
  358. ro_db_thread.join();
  359. }
  360. TEST_F(DBSecondaryTest, MissingTableFileDuringOpen) {
  361. Options options;
  362. options.env = env_;
  363. options.level0_file_num_compaction_trigger = 4;
  364. Reopen(options);
  365. for (int i = 0; i != options.level0_file_num_compaction_trigger; ++i) {
  366. ASSERT_OK(Put("foo", "foo_value" + std::to_string(i)));
  367. ASSERT_OK(Put("bar", "bar_value" + std::to_string(i)));
  368. ASSERT_OK(dbfull()->Flush(FlushOptions()));
  369. }
  370. ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
  371. ASSERT_OK(dbfull()->TEST_WaitForCompact());
  372. Options options1;
  373. options1.env = env_;
  374. options1.max_open_files = -1;
  375. OpenSecondary(options1);
  376. ReadOptions ropts;
  377. ropts.verify_checksums = true;
  378. std::string value;
  379. ASSERT_OK(db_secondary_->Get(ropts, "foo", &value));
  380. ASSERT_EQ("foo_value" +
  381. std::to_string(options.level0_file_num_compaction_trigger - 1),
  382. value);
  383. ASSERT_OK(db_secondary_->Get(ropts, "bar", &value));
  384. ASSERT_EQ("bar_value" +
  385. std::to_string(options.level0_file_num_compaction_trigger - 1),
  386. value);
  387. Iterator* iter = db_secondary_->NewIterator(ropts);
  388. ASSERT_NE(nullptr, iter);
  389. iter->Seek("bar");
  390. ASSERT_TRUE(iter->Valid());
  391. ASSERT_EQ("bar", iter->key().ToString());
  392. ASSERT_EQ("bar_value" +
  393. std::to_string(options.level0_file_num_compaction_trigger - 1),
  394. iter->value().ToString());
  395. iter->Seek("foo");
  396. ASSERT_TRUE(iter->Valid());
  397. ASSERT_EQ("foo", iter->key().ToString());
  398. ASSERT_EQ("foo_value" +
  399. std::to_string(options.level0_file_num_compaction_trigger - 1),
  400. iter->value().ToString());
  401. size_t count = 0;
  402. for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
  403. ++count;
  404. }
  405. ASSERT_EQ(2, count);
  406. delete iter;
  407. }
  408. TEST_F(DBSecondaryTest, MissingTableFile) {
  409. int table_files_not_exist = 0;
  410. SyncPoint::GetInstance()->DisableProcessing();
  411. SyncPoint::GetInstance()->ClearAllCallBacks();
  412. SyncPoint::GetInstance()->SetCallBack(
  413. "ReactiveVersionSet::ApplyOneVersionEditToBuilder:AfterLoadTableHandlers",
  414. [&](void* arg) {
  415. Status s = *reinterpret_cast<Status*>(arg);
  416. if (s.IsPathNotFound()) {
  417. ++table_files_not_exist;
  418. } else if (!s.ok()) {
  419. assert(false); // Should not reach here
  420. }
  421. });
  422. SyncPoint::GetInstance()->EnableProcessing();
  423. Options options;
  424. options.env = env_;
  425. options.level0_file_num_compaction_trigger = 4;
  426. Reopen(options);
  427. Options options1;
  428. options1.env = env_;
  429. options1.max_open_files = -1;
  430. OpenSecondary(options1);
  431. for (int i = 0; i != options.level0_file_num_compaction_trigger; ++i) {
  432. ASSERT_OK(Put("foo", "foo_value" + std::to_string(i)));
  433. ASSERT_OK(Put("bar", "bar_value" + std::to_string(i)));
  434. ASSERT_OK(dbfull()->Flush(FlushOptions()));
  435. }
  436. ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
  437. ASSERT_OK(dbfull()->TEST_WaitForCompact());
  438. ASSERT_NE(nullptr, db_secondary_full());
  439. ReadOptions ropts;
  440. ropts.verify_checksums = true;
  441. std::string value;
  442. ASSERT_NOK(db_secondary_->Get(ropts, "foo", &value));
  443. ASSERT_NOK(db_secondary_->Get(ropts, "bar", &value));
  444. ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
  445. ASSERT_EQ(options.level0_file_num_compaction_trigger, table_files_not_exist);
  446. ASSERT_OK(db_secondary_->Get(ropts, "foo", &value));
  447. ASSERT_EQ("foo_value" +
  448. std::to_string(options.level0_file_num_compaction_trigger - 1),
  449. value);
  450. ASSERT_OK(db_secondary_->Get(ropts, "bar", &value));
  451. ASSERT_EQ("bar_value" +
  452. std::to_string(options.level0_file_num_compaction_trigger - 1),
  453. value);
  454. Iterator* iter = db_secondary_->NewIterator(ropts);
  455. ASSERT_NE(nullptr, iter);
  456. iter->Seek("bar");
  457. ASSERT_TRUE(iter->Valid());
  458. ASSERT_EQ("bar", iter->key().ToString());
  459. ASSERT_EQ("bar_value" +
  460. std::to_string(options.level0_file_num_compaction_trigger - 1),
  461. iter->value().ToString());
  462. iter->Seek("foo");
  463. ASSERT_TRUE(iter->Valid());
  464. ASSERT_EQ("foo", iter->key().ToString());
  465. ASSERT_EQ("foo_value" +
  466. std::to_string(options.level0_file_num_compaction_trigger - 1),
  467. iter->value().ToString());
  468. size_t count = 0;
  469. for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
  470. ++count;
  471. }
  472. ASSERT_EQ(2, count);
  473. delete iter;
  474. }
  475. TEST_F(DBSecondaryTest, PrimaryDropColumnFamily) {
  476. Options options;
  477. options.env = env_;
  478. const std::string kCfName1 = "pikachu";
  479. CreateAndReopenWithCF({kCfName1}, options);
  480. Options options1;
  481. options1.env = env_;
  482. options1.max_open_files = -1;
  483. OpenSecondaryWithColumnFamilies({kCfName1}, options1);
  484. ASSERT_EQ(2, handles_secondary_.size());
  485. ASSERT_OK(Put(1 /*cf*/, "foo", "foo_val_1"));
  486. ASSERT_OK(Flush(1 /*cf*/));
  487. ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
  488. ReadOptions ropts;
  489. ropts.verify_checksums = true;
  490. std::string value;
  491. ASSERT_OK(db_secondary_->Get(ropts, handles_secondary_[1], "foo", &value));
  492. ASSERT_EQ("foo_val_1", value);
  493. ASSERT_OK(dbfull()->DropColumnFamily(handles_[1]));
  494. Close();
  495. CheckFileTypeCounts(dbname_, 1, 0, 1);
  496. ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
  497. value.clear();
  498. ASSERT_OK(db_secondary_->Get(ropts, handles_secondary_[1], "foo", &value));
  499. ASSERT_EQ("foo_val_1", value);
  500. }
  501. TEST_F(DBSecondaryTest, SwitchManifest) {
  502. Options options;
  503. options.env = env_;
  504. options.level0_file_num_compaction_trigger = 4;
  505. Reopen(options);
  506. Options options1;
  507. options1.env = env_;
  508. options1.max_open_files = -1;
  509. OpenSecondary(options1);
  510. const int kNumFiles = options.level0_file_num_compaction_trigger - 1;
  511. // Keep it smaller than 10 so that key0, key1, ..., key9 are sorted as 0, 1,
  512. // ..., 9.
  513. const int kNumKeys = 10;
  514. // Create two sst
  515. for (int i = 0; i != kNumFiles; ++i) {
  516. for (int j = 0; j != kNumKeys; ++j) {
  517. ASSERT_OK(Put("key" + std::to_string(j), "value_" + std::to_string(i)));
  518. }
  519. ASSERT_OK(Flush());
  520. }
  521. ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
  522. const auto& range_scan_db = [&]() {
  523. ReadOptions tmp_ropts;
  524. tmp_ropts.total_order_seek = true;
  525. tmp_ropts.verify_checksums = true;
  526. std::unique_ptr<Iterator> iter(db_secondary_->NewIterator(tmp_ropts));
  527. int cnt = 0;
  528. for (iter->SeekToFirst(); iter->Valid(); iter->Next(), ++cnt) {
  529. ASSERT_EQ("key" + std::to_string(cnt), iter->key().ToString());
  530. ASSERT_EQ("value_" + std::to_string(kNumFiles - 1),
  531. iter->value().ToString());
  532. }
  533. };
  534. range_scan_db();
  535. // While secondary instance still keeps old MANIFEST open, we close primary,
  536. // restart primary, performs full compaction, close again, restart again so
  537. // that next time secondary tries to catch up with primary, the secondary
  538. // will skip the MANIFEST in middle.
  539. Reopen(options);
  540. ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  541. ASSERT_OK(dbfull()->TEST_WaitForCompact());
  542. Reopen(options);
  543. ASSERT_OK(dbfull()->SetOptions({{"disable_auto_compactions", "false"}}));
  544. ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
  545. range_scan_db();
  546. }
  547. // Here, "Snapshot" refers to the version edits written by
  548. // VersionSet::WriteSnapshot() at the beginning of the new MANIFEST after
  549. // switching from the old one.
  550. TEST_F(DBSecondaryTest, SkipSnapshotAfterManifestSwitch) {
  551. Options options;
  552. options.env = env_;
  553. options.disable_auto_compactions = true;
  554. Reopen(options);
  555. Options options1;
  556. options1.env = env_;
  557. options1.max_open_files = -1;
  558. OpenSecondary(options1);
  559. ASSERT_OK(Put("0", "value0"));
  560. ASSERT_OK(Flush());
  561. ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
  562. std::string value;
  563. ReadOptions ropts;
  564. ropts.verify_checksums = true;
  565. ASSERT_OK(db_secondary_->Get(ropts, "0", &value));
  566. ASSERT_EQ("value0", value);
  567. Reopen(options);
  568. ASSERT_OK(dbfull()->SetOptions({{"disable_auto_compactions", "false"}}));
  569. ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
  570. }
  571. TEST_F(DBSecondaryTest, SwitchWAL) {
  572. const int kNumKeysPerMemtable = 1;
  573. Options options;
  574. options.env = env_;
  575. options.max_write_buffer_number = 4;
  576. options.min_write_buffer_number_to_merge = 2;
  577. options.memtable_factory.reset(
  578. new SpecialSkipListFactory(kNumKeysPerMemtable));
  579. Reopen(options);
  580. Options options1;
  581. options1.env = env_;
  582. options1.max_open_files = -1;
  583. OpenSecondary(options1);
  584. const auto& verify_db = [](DB* db1, DB* db2) {
  585. ASSERT_NE(nullptr, db1);
  586. ASSERT_NE(nullptr, db2);
  587. ReadOptions read_opts;
  588. read_opts.verify_checksums = true;
  589. std::unique_ptr<Iterator> it1(db1->NewIterator(read_opts));
  590. std::unique_ptr<Iterator> it2(db2->NewIterator(read_opts));
  591. it1->SeekToFirst();
  592. it2->SeekToFirst();
  593. for (; it1->Valid() && it2->Valid(); it1->Next(), it2->Next()) {
  594. ASSERT_EQ(it1->key(), it2->key());
  595. ASSERT_EQ(it1->value(), it2->value());
  596. }
  597. ASSERT_FALSE(it1->Valid());
  598. ASSERT_FALSE(it2->Valid());
  599. for (it1->SeekToFirst(); it1->Valid(); it1->Next()) {
  600. std::string value;
  601. ASSERT_OK(db2->Get(read_opts, it1->key(), &value));
  602. ASSERT_EQ(it1->value(), value);
  603. }
  604. for (it2->SeekToFirst(); it2->Valid(); it2->Next()) {
  605. std::string value;
  606. ASSERT_OK(db1->Get(read_opts, it2->key(), &value));
  607. ASSERT_EQ(it2->value(), value);
  608. }
  609. };
  610. for (int k = 0; k != 16; ++k) {
  611. ASSERT_OK(Put("key" + std::to_string(k), "value" + std::to_string(k)));
  612. ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
  613. verify_db(dbfull(), db_secondary_);
  614. }
  615. }
  // Multi-column-family variant of SwitchWAL: each of the 8 iterations
  // writes one key to column family 0 and one to column family 1, passes
  // through a test sync point, lets the secondary catch up, and then
  // compares primary and secondary column family by column family.
  616. TEST_F(DBSecondaryTest, SwitchWALMultiColumnFamilies) {
  617. const int kNumKeysPerMemtable = 1;
  // NOTE(review): presumably this dependency makes the test's BeforeCatchUp
  // point wait for DBImpl::BackgroundCallFlush:ContextCleanedUp — confirm
  // against the SyncPoint documentation.
  618. SyncPoint::GetInstance()->DisableProcessing();
  619. SyncPoint::GetInstance()->LoadDependency(
  620. {{"DBImpl::BackgroundCallFlush:ContextCleanedUp",
  621. "DBSecondaryTest::SwitchWALMultipleColumnFamilies:BeforeCatchUp"}});
  622. SyncPoint::GetInstance()->EnableProcessing();
  623. const std::string kCFName1 = "pikachu";
  624. Options options;
  625. options.env = env_;
  626. options.max_write_buffer_number = 4;
  627. options.min_write_buffer_number_to_merge = 2;
  628. options.memtable_factory.reset(
  629. new SpecialSkipListFactory(kNumKeysPerMemtable));
  630. CreateAndReopenWithCF({kCFName1}, options);
  631. Options options1;
  632. options1.env = env_;
  633. options1.max_open_files = -1;
  634. OpenSecondaryWithColumnFamilies({kCFName1}, options1);
  635. ASSERT_EQ(2, handles_secondary_.size());
  // Asserts that db1 and db2 hold identical contents in every column family,
  // pairing handles1[i] with handles2[i]: first by walking two iterators in
  // lockstep, then by looking every key of each DB up in the other.
  636. const auto& verify_db = [](DB* db1,
  637. const std::vector<ColumnFamilyHandle*>& handles1,
  638. DB* db2,
  639. const std::vector<ColumnFamilyHandle*>& handles2) {
  640. ASSERT_NE(nullptr, db1);
  641. ASSERT_NE(nullptr, db2);
  642. ReadOptions read_opts;
  643. read_opts.verify_checksums = true;
  644. ASSERT_EQ(handles1.size(), handles2.size());
  645. for (size_t i = 0; i != handles1.size(); ++i) {
  646. std::unique_ptr<Iterator> it1(db1->NewIterator(read_opts, handles1[i]));
  647. std::unique_ptr<Iterator> it2(db2->NewIterator(read_opts, handles2[i]));
  648. it1->SeekToFirst();
  649. it2->SeekToFirst();
  650. for (; it1->Valid() && it2->Valid(); it1->Next(), it2->Next()) {
  651. ASSERT_EQ(it1->key(), it2->key());
  652. ASSERT_EQ(it1->value(), it2->value());
  653. }
  // Both iterators must be exhausted together.
  654. ASSERT_FALSE(it1->Valid());
  655. ASSERT_FALSE(it2->Valid());
  656. for (it1->SeekToFirst(); it1->Valid(); it1->Next()) {
  657. std::string value;
  658. ASSERT_OK(db2->Get(read_opts, handles2[i], it1->key(), &value));
  659. ASSERT_EQ(it1->value(), value);
  660. }
  661. for (it2->SeekToFirst(); it2->Valid(); it2->Next()) {
  662. std::string value;
  663. ASSERT_OK(db1->Get(read_opts, handles1[i], it2->key(), &value));
  664. ASSERT_EQ(it2->value(), value);
  665. }
  666. }
  667. };
  668. for (int k = 0; k != 8; ++k) {
  669. ASSERT_OK(
  670. Put(0 /*cf*/, "key" + std::to_string(k), "value" + std::to_string(k)));
  671. ASSERT_OK(
  672. Put(1 /*cf*/, "key" + std::to_string(k), "value" + std::to_string(k)));
  // Must match the second name of the dependency pair loaded above.
  673. TEST_SYNC_POINT(
  674. "DBSecondaryTest::SwitchWALMultipleColumnFamilies:BeforeCatchUp");
  675. ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
  676. verify_db(dbfull(), handles_, db_secondary_, handles_secondary_);
  // NOTE(review): presumably resets the recorded sync-point trace so the
  // dependency is evaluated afresh in the next iteration — confirm.
  677. SyncPoint::GetInstance()->ClearTrace();
  678. }
  679. }
  680. TEST_F(DBSecondaryTest, CatchUpAfterFlush) {
  681. const int kNumKeysPerMemtable = 16;
  682. Options options;
  683. options.env = env_;
  684. options.max_write_buffer_number = 4;
  685. options.min_write_buffer_number_to_merge = 2;
  686. options.memtable_factory.reset(
  687. new SpecialSkipListFactory(kNumKeysPerMemtable));
  688. Reopen(options);
  689. Options options1;
  690. options1.env = env_;
  691. options1.max_open_files = -1;
  692. OpenSecondary(options1);
  693. WriteOptions write_opts;
  694. WriteBatch wb;
  695. wb.Put("key0", "value0");
  696. wb.Put("key1", "value1");
  697. ASSERT_OK(dbfull()->Write(write_opts, &wb));
  698. ReadOptions read_opts;
  699. std::unique_ptr<Iterator> iter1(db_secondary_->NewIterator(read_opts));
  700. iter1->Seek("key0");
  701. ASSERT_FALSE(iter1->Valid());
  702. iter1->Seek("key1");
  703. ASSERT_FALSE(iter1->Valid());
  704. ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
  705. iter1->Seek("key0");
  706. ASSERT_FALSE(iter1->Valid());
  707. iter1->Seek("key1");
  708. ASSERT_FALSE(iter1->Valid());
  709. std::unique_ptr<Iterator> iter2(db_secondary_->NewIterator(read_opts));
  710. iter2->Seek("key0");
  711. ASSERT_TRUE(iter2->Valid());
  712. ASSERT_EQ("value0", iter2->value());
  713. iter2->Seek("key1");
  714. ASSERT_TRUE(iter2->Valid());
  715. ASSERT_EQ("value1", iter2->value());
  716. {
  717. WriteBatch wb1;
  718. wb1.Put("key0", "value01");
  719. wb1.Put("key1", "value11");
  720. ASSERT_OK(dbfull()->Write(write_opts, &wb1));
  721. }
  722. {
  723. WriteBatch wb2;
  724. wb2.Put("key0", "new_value0");
  725. wb2.Delete("key1");
  726. ASSERT_OK(dbfull()->Write(write_opts, &wb2));
  727. }
  728. ASSERT_OK(Flush());
  729. ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
  730. std::unique_ptr<Iterator> iter3(db_secondary_->NewIterator(read_opts));
  731. // iter3 should not see value01 and value11 at all.
  732. iter3->Seek("key0");
  733. ASSERT_TRUE(iter3->Valid());
  734. ASSERT_EQ("new_value0", iter3->value());
  735. iter3->Seek("key1");
  736. ASSERT_FALSE(iter3->Valid());
  737. }
  // Opens a secondary on a helper thread while the primary runs a full
  // compaction on the test thread. The test passes only if the callback on
  // "DBImplSecondary::CheckConsistency:AfterFirstAttempt" ran and the Status
  // it was handed was not OK.
  738. TEST_F(DBSecondaryTest, CheckConsistencyWhenOpen) {
  // Set by the sync-point callback; asserted at the end of the test.
  739. bool called = false;
  740. Options options;
  741. options.env = env_;
  742. options.disable_auto_compactions = true;
  743. Reopen(options);
  744. SyncPoint::GetInstance()->DisableProcessing();
  745. SyncPoint::GetInstance()->ClearAllCallBacks();
  746. SyncPoint::GetInstance()->SetCallBack(
  747. "DBImplSecondary::CheckConsistency:AfterFirstAttempt", [&](void* arg) {
  748. ASSERT_NE(nullptr, arg);
  749. called = true;
  // The callback argument is treated as a Status*, expected to be non-OK.
  750. auto* s = reinterpret_cast<Status*>(arg);
  751. ASSERT_NOK(*s);
  752. });
  // NOTE(review): presumably each {A, B} pair makes B wait until A has been
  // reached, so that the compaction purges obsolete files between the
  // consistency check's metadata read and its file-size read — confirm
  // against the SyncPoint documentation.
  753. SyncPoint::GetInstance()->LoadDependency(
  754. {{"DBImpl::CheckConsistency:AfterGetLiveFilesMetaData",
  755. "BackgroundCallCompaction:0"},
  756. {"DBImpl::BackgroundCallCompaction:PurgedObsoleteFiles",
  757. "DBImpl::CheckConsistency:BeforeGetFileSize"}});
  758. SyncPoint::GetInstance()->EnableProcessing();
  // Two flushed files for the later full compaction to work on.
  759. ASSERT_OK(Put("a", "value0"));
  760. ASSERT_OK(Put("c", "value0"));
  761. ASSERT_OK(Flush());
  762. ASSERT_OK(Put("b", "value1"));
  763. ASSERT_OK(Put("d", "value1"));
  764. ASSERT_OK(Flush());
  // Helper thread: open the secondary while the primary compacts.
  765. port::Thread thread([this]() {
  766. Options opts;
  767. opts.env = env_;
  768. opts.max_open_files = -1;
  769. OpenSecondary(opts);
  770. });
  771. ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  772. ASSERT_OK(dbfull()->TEST_WaitForCompact());
  773. thread.join();
  774. ASSERT_TRUE(called);
  775. }
  776. #endif //! ROCKSDB_LITE
  777. } // namespace ROCKSDB_NAMESPACE
  778. int main(int argc, char** argv) {
  779. ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  780. ::testing::InitGoogleTest(&argc, argv);
  781. return RUN_ALL_TESTS();
  782. }