version_set_test.cc
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/version_set.h"

#include "db/db_impl/db_impl.h"
#include "db/log_writer.h"
#include "logging/logging.h"
#include "table/mock_table.h"
#include "test_util/testharness.h"
#include "test_util/testutil.h"
#include "util/string_util.h"

namespace ROCKSDB_NAMESPACE {
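// Exercises DoGenerateLevelFilesBrief(), which flattens a level's file
// metadata into the LevelFilesBrief index used by FindFile() and
// SomeFileOverlapsRange().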
  18. class GenerateLevelFilesBriefTest : public testing::Test {
  19. public:
  20. std::vector<FileMetaData*> files_;
  21. LevelFilesBrief file_level_;
  22. Arena arena_;
  23. GenerateLevelFilesBriefTest() { }
  24. ~GenerateLevelFilesBriefTest() override {
  25. for (size_t i = 0; i < files_.size(); i++) {
  26. delete files_[i];
  27. }
  28. }
  29. void Add(const char* smallest, const char* largest,
  30. SequenceNumber smallest_seq = 100,
  31. SequenceNumber largest_seq = 100) {
  32. FileMetaData* f = new FileMetaData(
  33. files_.size() + 1, 0, 0,
  34. InternalKey(smallest, smallest_seq, kTypeValue),
  35. InternalKey(largest, largest_seq, kTypeValue), smallest_seq,
  36. largest_seq, /* marked_for_compact */ false, kInvalidBlobFileNumber,
  37. kUnknownOldestAncesterTime, kUnknownFileCreationTime,
  38. kUnknownFileChecksum, kUnknownFileChecksumFuncName);
  39. files_.push_back(f);
  40. }
  41. int Compare() {
  42. int diff = 0;
  43. for (size_t i = 0; i < files_.size(); i++) {
  44. if (file_level_.files[i].fd.GetNumber() != files_[i]->fd.GetNumber()) {
  45. diff++;
  46. }
  47. }
  48. return diff;
  49. }
  50. };
  51. TEST_F(GenerateLevelFilesBriefTest, Empty) {
  52. DoGenerateLevelFilesBrief(&file_level_, files_, &arena_);
  53. ASSERT_EQ(0u, file_level_.num_files);
  54. ASSERT_EQ(0, Compare());
  55. }
  56. TEST_F(GenerateLevelFilesBriefTest, Single) {
  57. Add("p", "q");
  58. DoGenerateLevelFilesBrief(&file_level_, files_, &arena_);
  59. ASSERT_EQ(1u, file_level_.num_files);
  60. ASSERT_EQ(0, Compare());
  61. }
  62. TEST_F(GenerateLevelFilesBriefTest, Multiple) {
  63. Add("150", "200");
  64. Add("200", "250");
  65. Add("300", "350");
  66. Add("400", "450");
  67. DoGenerateLevelFilesBrief(&file_level_, files_, &arena_);
  68. ASSERT_EQ(4u, file_level_.num_files);
  69. ASSERT_EQ(0, Compare());
  70. }
  71. class CountingLogger : public Logger {
  72. public:
  73. CountingLogger() : log_count(0) {}
  74. using Logger::Logv;
  75. void Logv(const char* /*format*/, va_list /*ap*/) override { log_count++; }
  76. int log_count;
  77. };
  78. Options GetOptionsWithNumLevels(int num_levels,
  79. std::shared_ptr<CountingLogger> logger) {
  80. Options opt;
  81. opt.num_levels = num_levels;
  82. opt.info_log = logger;
  83. return opt;
  84. }
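// Fixture for VersionStorageInfo tests: six levels, level-style compaction,
// bytewise comparator, and a CountingLogger so tests can check how many
// warnings were emitted.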
  85. class VersionStorageInfoTest : public testing::Test {
  86. public:
  87. const Comparator* ucmp_;
  88. InternalKeyComparator icmp_;
  89. std::shared_ptr<CountingLogger> logger_;
  90. Options options_;
  91. ImmutableCFOptions ioptions_;
  92. MutableCFOptions mutable_cf_options_;
  93. VersionStorageInfo vstorage_;
  94. InternalKey GetInternalKey(const char* ukey,
  95. SequenceNumber smallest_seq = 100) {
  96. return InternalKey(ukey, smallest_seq, kTypeValue);
  97. }
  98. VersionStorageInfoTest()
  99. : ucmp_(BytewiseComparator()),
  100. icmp_(ucmp_),
  101. logger_(new CountingLogger()),
  102. options_(GetOptionsWithNumLevels(6, logger_)),
  103. ioptions_(options_),
  104. mutable_cf_options_(options_),
  105. vstorage_(&icmp_, ucmp_, 6, kCompactionStyleLevel, nullptr, false) {}
  106. ~VersionStorageInfoTest() override {
  107. for (int i = 0; i < vstorage_.num_levels(); i++) {
  108. for (auto* f : vstorage_.LevelFiles(i)) {
  109. if (--f->refs == 0) {
  110. delete f;
  111. }
  112. }
  113. }
  114. }
  115. void Add(int level, uint32_t file_number, const char* smallest,
  116. const char* largest, uint64_t file_size = 0) {
  117. assert(level < vstorage_.num_levels());
  118. FileMetaData* f = new FileMetaData(
  119. file_number, 0, file_size, GetInternalKey(smallest, 0),
  120. GetInternalKey(largest, 0), /* smallest_seq */ 0, /* largest_seq */ 0,
  121. /* marked_for_compact */ false, kInvalidBlobFileNumber,
  122. kUnknownOldestAncesterTime, kUnknownFileCreationTime,
  123. kUnknownFileChecksum, kUnknownFileChecksumFuncName);
  124. f->compensated_file_size = file_size;
  125. vstorage_.AddFile(level, f);
  126. }
  127. void Add(int level, uint32_t file_number, const InternalKey& smallest,
  128. const InternalKey& largest, uint64_t file_size = 0) {
  129. assert(level < vstorage_.num_levels());
  130. FileMetaData* f = new FileMetaData(
  131. file_number, 0, file_size, smallest, largest, /* smallest_seq */ 0,
  132. /* largest_seq */ 0, /* marked_for_compact */ false,
  133. kInvalidBlobFileNumber, kUnknownOldestAncesterTime,
  134. kUnknownFileCreationTime, kUnknownFileChecksum,
  135. kUnknownFileChecksumFuncName);
  136. f->compensated_file_size = file_size;
  137. vstorage_.AddFile(level, f);
  138. }
  139. std::string GetOverlappingFiles(int level, const InternalKey& begin,
  140. const InternalKey& end) {
  141. std::vector<FileMetaData*> inputs;
  142. vstorage_.GetOverlappingInputs(level, &begin, &end, &inputs);
  143. std::string result;
  144. for (size_t i = 0; i < inputs.size(); ++i) {
  145. if (i > 0) {
  146. result += ",";
  147. }
  148. AppendNumberTo(&result, inputs[i]->fd.GetNumber());
  149. }
  150. return result;
  151. }
  152. };
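// With static level sizing, level N's target is simply
// max_bytes_for_level_base * max_bytes_for_level_multiplier^(N - 1),
// regardless of how much data the levels currently hold.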
  153. TEST_F(VersionStorageInfoTest, MaxBytesForLevelStatic) {
  154. ioptions_.level_compaction_dynamic_level_bytes = false;
  155. mutable_cf_options_.max_bytes_for_level_base = 10;
  156. mutable_cf_options_.max_bytes_for_level_multiplier = 5;
  157. Add(4, 100U, "1", "2");
  158. Add(5, 101U, "1", "2");
  159. vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);
  160. ASSERT_EQ(vstorage_.MaxBytesForLevel(1), 10U);
  161. ASSERT_EQ(vstorage_.MaxBytesForLevel(2), 50U);
  162. ASSERT_EQ(vstorage_.MaxBytesForLevel(3), 250U);
  163. ASSERT_EQ(vstorage_.MaxBytesForLevel(4), 1250U);
  164. ASSERT_EQ(0, logger_->log_count);
  165. }
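// With dynamic level sizing, targets are derived top-down from the size of
// the largest non-empty level, so the base level moves up (5 -> 4 -> 3 -> 1
// below) as data appears in lower levels.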
  166. TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamic) {
  167. ioptions_.level_compaction_dynamic_level_bytes = true;
  168. mutable_cf_options_.max_bytes_for_level_base = 1000;
  169. mutable_cf_options_.max_bytes_for_level_multiplier = 5;
  170. Add(5, 1U, "1", "2", 500U);
  171. vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);
  172. ASSERT_EQ(0, logger_->log_count);
  173. ASSERT_EQ(vstorage_.base_level(), 5);
  174. Add(5, 2U, "3", "4", 550U);
  175. vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);
  176. ASSERT_EQ(0, logger_->log_count);
  177. ASSERT_EQ(vstorage_.MaxBytesForLevel(4), 1000U);
  178. ASSERT_EQ(vstorage_.base_level(), 4);
  179. Add(4, 3U, "3", "4", 550U);
  180. vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);
  181. ASSERT_EQ(0, logger_->log_count);
  182. ASSERT_EQ(vstorage_.MaxBytesForLevel(4), 1000U);
  183. ASSERT_EQ(vstorage_.base_level(), 4);
  184. Add(3, 4U, "3", "4", 250U);
  185. Add(3, 5U, "5", "7", 300U);
  186. vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);
  187. ASSERT_EQ(1, logger_->log_count);
  188. ASSERT_EQ(vstorage_.MaxBytesForLevel(4), 1005U);
  189. ASSERT_EQ(vstorage_.MaxBytesForLevel(3), 1000U);
  190. ASSERT_EQ(vstorage_.base_level(), 3);
  191. Add(1, 6U, "3", "4", 5U);
  192. Add(1, 7U, "8", "9", 5U);
  193. logger_->log_count = 0;
  194. vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);
  195. ASSERT_EQ(1, logger_->log_count);
  196. ASSERT_GT(vstorage_.MaxBytesForLevel(4), 1005U);
  197. ASSERT_GT(vstorage_.MaxBytesForLevel(3), 1005U);
  198. ASSERT_EQ(vstorage_.MaxBytesForLevel(2), 1005U);
  199. ASSERT_EQ(vstorage_.MaxBytesForLevel(1), 1000U);
  200. ASSERT_EQ(vstorage_.base_level(), 1);
  201. }
  202. TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamicLotsOfData) {
  203. ioptions_.level_compaction_dynamic_level_bytes = true;
  204. mutable_cf_options_.max_bytes_for_level_base = 100;
  205. mutable_cf_options_.max_bytes_for_level_multiplier = 2;
  206. Add(0, 1U, "1", "2", 50U);
  207. Add(1, 2U, "1", "2", 50U);
  208. Add(2, 3U, "1", "2", 500U);
  209. Add(3, 4U, "1", "2", 500U);
  210. Add(4, 5U, "1", "2", 1700U);
  211. Add(5, 6U, "1", "2", 500U);
  212. vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);
  213. ASSERT_EQ(vstorage_.MaxBytesForLevel(4), 800U);
  214. ASSERT_EQ(vstorage_.MaxBytesForLevel(3), 400U);
  215. ASSERT_EQ(vstorage_.MaxBytesForLevel(2), 200U);
  216. ASSERT_EQ(vstorage_.MaxBytesForLevel(1), 100U);
  217. ASSERT_EQ(vstorage_.base_level(), 1);
  218. ASSERT_EQ(0, logger_->log_count);
  219. }
  220. TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamicLargeLevel) {
  221. uint64_t kOneGB = 1000U * 1000U * 1000U;
  222. ioptions_.level_compaction_dynamic_level_bytes = true;
  223. mutable_cf_options_.max_bytes_for_level_base = 10U * kOneGB;
  224. mutable_cf_options_.max_bytes_for_level_multiplier = 10;
  225. Add(0, 1U, "1", "2", 50U);
  226. Add(3, 4U, "1", "2", 32U * kOneGB);
  227. Add(4, 5U, "1", "2", 500U * kOneGB);
  228. Add(5, 6U, "1", "2", 3000U * kOneGB);
  229. vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);
  230. ASSERT_EQ(vstorage_.MaxBytesForLevel(5), 3000U * kOneGB);
  231. ASSERT_EQ(vstorage_.MaxBytesForLevel(4), 300U * kOneGB);
  232. ASSERT_EQ(vstorage_.MaxBytesForLevel(3), 30U * kOneGB);
  233. ASSERT_EQ(vstorage_.MaxBytesForLevel(2), 10U * kOneGB);
  234. ASSERT_EQ(vstorage_.base_level(), 2);
  235. ASSERT_EQ(0, logger_->log_count);
  236. }
TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamicWithLargeL0_1) {
  ioptions_.level_compaction_dynamic_level_bytes = true;
  mutable_cf_options_.max_bytes_for_level_base = 40000;
  mutable_cf_options_.max_bytes_for_level_multiplier = 5;
  mutable_cf_options_.level0_file_num_compaction_trigger = 2;
  Add(0, 1U, "1", "2", 10000U);
  Add(0, 2U, "1", "2", 10000U);
  Add(0, 3U, "1", "2", 10000U);
  Add(5, 4U, "1", "2", 1286250U);
  Add(4, 5U, "1", "2", 200000U);
  Add(3, 6U, "1", "2", 40000U);
  Add(2, 7U, "1", "2", 8000U);
  vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);
  ASSERT_EQ(0, logger_->log_count);
  ASSERT_EQ(2, vstorage_.base_level());
  // L0 totals 30,000, which does not exceed max_bytes_for_level_base
  // (40,000), so the level multiplier is not adjusted and stays at 5.
  ASSERT_EQ(vstorage_.level_multiplier(), 5.0);
  // The base level's target is clamped up to max_bytes_for_level_base;
  // L3 and L4 follow the multiplier: 51,450 and 257,250.
  ASSERT_EQ(40000U, vstorage_.MaxBytesForLevel(2));
  ASSERT_EQ(51450U, vstorage_.MaxBytesForLevel(3));
  ASSERT_EQ(257250U, vstorage_.MaxBytesForLevel(4));
}
  259. TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamicWithLargeL0_2) {
  260. ioptions_.level_compaction_dynamic_level_bytes = true;
  261. mutable_cf_options_.max_bytes_for_level_base = 10000;
  262. mutable_cf_options_.max_bytes_for_level_multiplier = 5;
  263. mutable_cf_options_.level0_file_num_compaction_trigger = 2;
  264. Add(0, 11U, "1", "2", 10000U);
  265. Add(0, 12U, "1", "2", 10000U);
  266. Add(0, 13U, "1", "2", 10000U);
  267. Add(5, 4U, "1", "2", 1286250U);
  268. Add(4, 5U, "1", "2", 200000U);
  269. Add(3, 6U, "1", "2", 40000U);
  270. Add(2, 7U, "1", "2", 8000U);
  271. vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);
  272. ASSERT_EQ(0, logger_->log_count);
  273. ASSERT_EQ(2, vstorage_.base_level());
  274. // level multiplier should be 3.5
  275. ASSERT_LT(vstorage_.level_multiplier(), 3.6);
  276. ASSERT_GT(vstorage_.level_multiplier(), 3.4);
  277. // Level size should be around 30,000, 105,000, 367,500
  278. ASSERT_EQ(30000U, vstorage_.MaxBytesForLevel(2));
  279. ASSERT_LT(vstorage_.MaxBytesForLevel(3), 110000U);
  280. ASSERT_GT(vstorage_.MaxBytesForLevel(3), 100000U);
  281. ASSERT_LT(vstorage_.MaxBytesForLevel(4), 370000U);
  282. ASSERT_GT(vstorage_.MaxBytesForLevel(4), 360000U);
  283. }
  284. TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamicWithLargeL0_3) {
  285. ioptions_.level_compaction_dynamic_level_bytes = true;
  286. mutable_cf_options_.max_bytes_for_level_base = 10000;
  287. mutable_cf_options_.max_bytes_for_level_multiplier = 5;
  288. mutable_cf_options_.level0_file_num_compaction_trigger = 2;
  289. Add(0, 11U, "1", "2", 5000U);
  290. Add(0, 12U, "1", "2", 5000U);
  291. Add(0, 13U, "1", "2", 5000U);
  292. Add(0, 14U, "1", "2", 5000U);
  293. Add(0, 15U, "1", "2", 5000U);
  294. Add(0, 16U, "1", "2", 5000U);
  295. Add(5, 4U, "1", "2", 1286250U);
  296. Add(4, 5U, "1", "2", 200000U);
  297. Add(3, 6U, "1", "2", 40000U);
  298. Add(2, 7U, "1", "2", 8000U);
  299. vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);
  300. ASSERT_EQ(0, logger_->log_count);
  301. ASSERT_EQ(2, vstorage_.base_level());
  302. // level multiplier should be 3.5
  303. ASSERT_LT(vstorage_.level_multiplier(), 3.6);
  304. ASSERT_GT(vstorage_.level_multiplier(), 3.4);
  305. // Level size should be around 30,000, 105,000, 367,500
  306. ASSERT_EQ(30000U, vstorage_.MaxBytesForLevel(2));
  307. ASSERT_LT(vstorage_.MaxBytesForLevel(3), 110000U);
  308. ASSERT_GT(vstorage_.MaxBytesForLevel(3), 100000U);
  309. ASSERT_LT(vstorage_.MaxBytesForLevel(4), 370000U);
  310. ASSERT_GT(vstorage_.MaxBytesForLevel(4), 360000U);
  311. }
  312. TEST_F(VersionStorageInfoTest, EstimateLiveDataSize) {
  313. // Test whether the overlaps are detected as expected
  314. Add(1, 1U, "4", "7", 1U); // Perfect overlap with last level
  315. Add(2, 2U, "3", "5", 1U); // Partial overlap with last level
  316. Add(2, 3U, "6", "8", 1U); // Partial overlap with last level
  317. Add(3, 4U, "1", "9", 1U); // Contains range of last level
  318. Add(4, 5U, "4", "5", 1U); // Inside range of last level
  319. Add(4, 5U, "6", "7", 1U); // Inside range of last level
  320. Add(5, 6U, "4", "7", 10U);
  321. ASSERT_EQ(10U, vstorage_.EstimateLiveDataSize());
  322. }
  323. TEST_F(VersionStorageInfoTest, EstimateLiveDataSize2) {
  324. Add(0, 1U, "9", "9", 1U); // Level 0 is not ordered
  325. Add(0, 1U, "5", "6", 1U); // Ignored because of [5,6] in l1
  326. Add(1, 1U, "1", "2", 1U); // Ignored because of [2,3] in l2
  327. Add(1, 2U, "3", "4", 1U); // Ignored because of [2,3] in l2
  328. Add(1, 3U, "5", "6", 1U);
  329. Add(2, 4U, "2", "3", 1U);
  330. Add(3, 5U, "7", "8", 1U);
  331. ASSERT_EQ(4U, vstorage_.EstimateLiveDataSize());
  332. }
  333. TEST_F(VersionStorageInfoTest, GetOverlappingInputs) {
  334. // Two files that overlap at the range deletion tombstone sentinel.
  335. Add(1, 1U, {"a", 0, kTypeValue}, {"b", kMaxSequenceNumber, kTypeRangeDeletion}, 1);
  336. Add(1, 2U, {"b", 0, kTypeValue}, {"c", 0, kTypeValue}, 1);
  337. // Two files that overlap at the same user key.
  338. Add(1, 3U, {"d", 0, kTypeValue}, {"e", kMaxSequenceNumber, kTypeValue}, 1);
  339. Add(1, 4U, {"e", 0, kTypeValue}, {"f", 0, kTypeValue}, 1);
  340. // Two files that do not overlap.
  341. Add(1, 5U, {"g", 0, kTypeValue}, {"h", 0, kTypeValue}, 1);
  342. Add(1, 6U, {"i", 0, kTypeValue}, {"j", 0, kTypeValue}, 1);
  343. vstorage_.UpdateNumNonEmptyLevels();
  344. vstorage_.GenerateLevelFilesBrief();
  345. ASSERT_EQ("1,2", GetOverlappingFiles(
  346. 1, {"a", 0, kTypeValue}, {"b", 0, kTypeValue}));
  347. ASSERT_EQ("1", GetOverlappingFiles(
  348. 1, {"a", 0, kTypeValue}, {"b", kMaxSequenceNumber, kTypeRangeDeletion}));
  349. ASSERT_EQ("2", GetOverlappingFiles(
  350. 1, {"b", kMaxSequenceNumber, kTypeValue}, {"c", 0, kTypeValue}));
  351. ASSERT_EQ("3,4", GetOverlappingFiles(
  352. 1, {"d", 0, kTypeValue}, {"e", 0, kTypeValue}));
  353. ASSERT_EQ("3", GetOverlappingFiles(
  354. 1, {"d", 0, kTypeValue}, {"e", kMaxSequenceNumber, kTypeRangeDeletion}));
  355. ASSERT_EQ("3,4", GetOverlappingFiles(
  356. 1, {"e", kMaxSequenceNumber, kTypeValue}, {"f", 0, kTypeValue}));
  357. ASSERT_EQ("3,4", GetOverlappingFiles(
  358. 1, {"e", 0, kTypeValue}, {"f", 0, kTypeValue}));
  359. ASSERT_EQ("5", GetOverlappingFiles(
  360. 1, {"g", 0, kTypeValue}, {"h", 0, kTypeValue}));
  361. ASSERT_EQ("6", GetOverlappingFiles(
  362. 1, {"i", 0, kTypeValue}, {"j", 0, kTypeValue}));
  363. }
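// Exercises FindFile() and SomeFileOverlapsRange() against a hand-built
// LevelFilesBrief.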
  364. class FindLevelFileTest : public testing::Test {
  365. public:
  366. LevelFilesBrief file_level_;
  367. bool disjoint_sorted_files_;
  368. Arena arena_;
  369. FindLevelFileTest() : disjoint_sorted_files_(true) { }
  370. ~FindLevelFileTest() override {}
  371. void LevelFileInit(size_t num = 0) {
  372. char* mem = arena_.AllocateAligned(num * sizeof(FdWithKeyRange));
  373. file_level_.files = new (mem)FdWithKeyRange[num];
  374. file_level_.num_files = 0;
  375. }
  376. void Add(const char* smallest, const char* largest,
  377. SequenceNumber smallest_seq = 100,
  378. SequenceNumber largest_seq = 100) {
  379. InternalKey smallest_key = InternalKey(smallest, smallest_seq, kTypeValue);
  380. InternalKey largest_key = InternalKey(largest, largest_seq, kTypeValue);
  381. Slice smallest_slice = smallest_key.Encode();
  382. Slice largest_slice = largest_key.Encode();
  383. char* mem = arena_.AllocateAligned(
  384. smallest_slice.size() + largest_slice.size());
  385. memcpy(mem, smallest_slice.data(), smallest_slice.size());
  386. memcpy(mem + smallest_slice.size(), largest_slice.data(),
  387. largest_slice.size());
  388. // add to file_level_
  389. size_t num = file_level_.num_files;
  390. auto& file = file_level_.files[num];
  391. file.fd = FileDescriptor(num + 1, 0, 0);
  392. file.smallest_key = Slice(mem, smallest_slice.size());
  393. file.largest_key = Slice(mem + smallest_slice.size(),
  394. largest_slice.size());
  395. file_level_.num_files++;
  396. }
  397. int Find(const char* key) {
  398. InternalKey target(key, 100, kTypeValue);
  399. InternalKeyComparator cmp(BytewiseComparator());
  400. return FindFile(cmp, file_level_, target.Encode());
  401. }
  402. bool Overlaps(const char* smallest, const char* largest) {
  403. InternalKeyComparator cmp(BytewiseComparator());
  404. Slice s(smallest != nullptr ? smallest : "");
  405. Slice l(largest != nullptr ? largest : "");
  406. return SomeFileOverlapsRange(cmp, disjoint_sorted_files_, file_level_,
  407. (smallest != nullptr ? &s : nullptr),
  408. (largest != nullptr ? &l : nullptr));
  409. }
  410. };
  411. TEST_F(FindLevelFileTest, LevelEmpty) {
  412. LevelFileInit(0);
  413. ASSERT_EQ(0, Find("foo"));
  414. ASSERT_TRUE(! Overlaps("a", "z"));
  415. ASSERT_TRUE(! Overlaps(nullptr, "z"));
  416. ASSERT_TRUE(! Overlaps("a", nullptr));
  417. ASSERT_TRUE(! Overlaps(nullptr, nullptr));
  418. }
  419. TEST_F(FindLevelFileTest, LevelSingle) {
  420. LevelFileInit(1);
  421. Add("p", "q");
  422. ASSERT_EQ(0, Find("a"));
  423. ASSERT_EQ(0, Find("p"));
  424. ASSERT_EQ(0, Find("p1"));
  425. ASSERT_EQ(0, Find("q"));
  426. ASSERT_EQ(1, Find("q1"));
  427. ASSERT_EQ(1, Find("z"));
  428. ASSERT_TRUE(! Overlaps("a", "b"));
  429. ASSERT_TRUE(! Overlaps("z1", "z2"));
  430. ASSERT_TRUE(Overlaps("a", "p"));
  431. ASSERT_TRUE(Overlaps("a", "q"));
  432. ASSERT_TRUE(Overlaps("a", "z"));
  433. ASSERT_TRUE(Overlaps("p", "p1"));
  434. ASSERT_TRUE(Overlaps("p", "q"));
  435. ASSERT_TRUE(Overlaps("p", "z"));
  436. ASSERT_TRUE(Overlaps("p1", "p2"));
  437. ASSERT_TRUE(Overlaps("p1", "z"));
  438. ASSERT_TRUE(Overlaps("q", "q"));
  439. ASSERT_TRUE(Overlaps("q", "q1"));
  440. ASSERT_TRUE(! Overlaps(nullptr, "j"));
  441. ASSERT_TRUE(! Overlaps("r", nullptr));
  442. ASSERT_TRUE(Overlaps(nullptr, "p"));
  443. ASSERT_TRUE(Overlaps(nullptr, "p1"));
  444. ASSERT_TRUE(Overlaps("q", nullptr));
  445. ASSERT_TRUE(Overlaps(nullptr, nullptr));
  446. }
  447. TEST_F(FindLevelFileTest, LevelMultiple) {
  448. LevelFileInit(4);
  449. Add("150", "200");
  450. Add("200", "250");
  451. Add("300", "350");
  452. Add("400", "450");
  453. ASSERT_EQ(0, Find("100"));
  454. ASSERT_EQ(0, Find("150"));
  455. ASSERT_EQ(0, Find("151"));
  456. ASSERT_EQ(0, Find("199"));
  457. ASSERT_EQ(0, Find("200"));
  458. ASSERT_EQ(1, Find("201"));
  459. ASSERT_EQ(1, Find("249"));
  460. ASSERT_EQ(1, Find("250"));
  461. ASSERT_EQ(2, Find("251"));
  462. ASSERT_EQ(2, Find("299"));
  463. ASSERT_EQ(2, Find("300"));
  464. ASSERT_EQ(2, Find("349"));
  465. ASSERT_EQ(2, Find("350"));
  466. ASSERT_EQ(3, Find("351"));
  467. ASSERT_EQ(3, Find("400"));
  468. ASSERT_EQ(3, Find("450"));
  469. ASSERT_EQ(4, Find("451"));
  470. ASSERT_TRUE(! Overlaps("100", "149"));
  471. ASSERT_TRUE(! Overlaps("251", "299"));
  472. ASSERT_TRUE(! Overlaps("451", "500"));
  473. ASSERT_TRUE(! Overlaps("351", "399"));
  474. ASSERT_TRUE(Overlaps("100", "150"));
  475. ASSERT_TRUE(Overlaps("100", "200"));
  476. ASSERT_TRUE(Overlaps("100", "300"));
  477. ASSERT_TRUE(Overlaps("100", "400"));
  478. ASSERT_TRUE(Overlaps("100", "500"));
  479. ASSERT_TRUE(Overlaps("375", "400"));
  480. ASSERT_TRUE(Overlaps("450", "450"));
  481. ASSERT_TRUE(Overlaps("450", "500"));
  482. }
  483. TEST_F(FindLevelFileTest, LevelMultipleNullBoundaries) {
  484. LevelFileInit(4);
  485. Add("150", "200");
  486. Add("200", "250");
  487. Add("300", "350");
  488. Add("400", "450");
  489. ASSERT_TRUE(! Overlaps(nullptr, "149"));
  490. ASSERT_TRUE(! Overlaps("451", nullptr));
  491. ASSERT_TRUE(Overlaps(nullptr, nullptr));
  492. ASSERT_TRUE(Overlaps(nullptr, "150"));
  493. ASSERT_TRUE(Overlaps(nullptr, "199"));
  494. ASSERT_TRUE(Overlaps(nullptr, "200"));
  495. ASSERT_TRUE(Overlaps(nullptr, "201"));
  496. ASSERT_TRUE(Overlaps(nullptr, "400"));
  497. ASSERT_TRUE(Overlaps(nullptr, "800"));
  498. ASSERT_TRUE(Overlaps("100", nullptr));
  499. ASSERT_TRUE(Overlaps("200", nullptr));
  500. ASSERT_TRUE(Overlaps("449", nullptr));
  501. ASSERT_TRUE(Overlaps("450", nullptr));
  502. }
  503. TEST_F(FindLevelFileTest, LevelOverlapSequenceChecks) {
  504. LevelFileInit(1);
  505. Add("200", "200", 5000, 3000);
  506. ASSERT_TRUE(! Overlaps("199", "199"));
  507. ASSERT_TRUE(! Overlaps("201", "300"));
  508. ASSERT_TRUE(Overlaps("200", "200"));
  509. ASSERT_TRUE(Overlaps("190", "200"));
  510. ASSERT_TRUE(Overlaps("200", "210"));
  511. }
  512. TEST_F(FindLevelFileTest, LevelOverlappingFiles) {
  513. LevelFileInit(2);
  514. Add("150", "600");
  515. Add("400", "500");
  516. disjoint_sorted_files_ = false;
  517. ASSERT_TRUE(! Overlaps("100", "149"));
  518. ASSERT_TRUE(! Overlaps("601", "700"));
  519. ASSERT_TRUE(Overlaps("100", "150"));
  520. ASSERT_TRUE(Overlaps("100", "200"));
  521. ASSERT_TRUE(Overlaps("100", "300"));
  522. ASSERT_TRUE(Overlaps("100", "400"));
  523. ASSERT_TRUE(Overlaps("100", "500"));
  524. ASSERT_TRUE(Overlaps("375", "400"));
  525. ASSERT_TRUE(Overlaps("450", "450"));
  526. ASSERT_TRUE(Overlaps("450", "500"));
  527. ASSERT_TRUE(Overlaps("450", "700"));
  528. ASSERT_TRUE(Overlaps("600", "700"));
  529. }
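// Shared setup for VersionSet tests: creates the test DB directory, a
// VersionSet and a ReactiveVersionSet, and can write an initial MANIFEST
// holding the default column family plus "alice", "bob" and "charles".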
  530. class VersionSetTestBase {
  531. public:
  532. const static std::string kColumnFamilyName1;
  533. const static std::string kColumnFamilyName2;
  534. const static std::string kColumnFamilyName3;
  535. int num_initial_edits_;
  536. VersionSetTestBase()
  537. : env_(Env::Default()),
  538. fs_(std::make_shared<LegacyFileSystemWrapper>(env_)),
  539. dbname_(test::PerThreadDBPath("version_set_test")),
  540. db_options_(),
  541. mutable_cf_options_(cf_options_),
  542. table_cache_(NewLRUCache(50000, 16)),
  543. write_buffer_manager_(db_options_.db_write_buffer_size),
  544. shutting_down_(false),
  545. mock_table_factory_(std::make_shared<mock::MockTableFactory>()) {
  546. EXPECT_OK(env_->CreateDirIfMissing(dbname_));
  547. db_options_.env = env_;
  548. db_options_.fs = fs_;
    versions_.reset(new VersionSet(dbname_, &db_options_, env_options_,
                                   table_cache_.get(), &write_buffer_manager_,
                                   &write_controller_,
                                   /*block_cache_tracer=*/nullptr));
    reactive_versions_ = std::make_shared<ReactiveVersionSet>(
        dbname_, &db_options_, env_options_, table_cache_.get(),
        &write_buffer_manager_, &write_controller_);
  556. db_options_.db_paths.emplace_back(dbname_,
  557. std::numeric_limits<uint64_t>::max());
  558. }
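// Writes MANIFEST-000001 with the initial column-family edits and returns the
// descriptors, the last sequence number used, and the still-open log writer
// so tests can append more edits.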
  559. void PrepareManifest(std::vector<ColumnFamilyDescriptor>* column_families,
  560. SequenceNumber* last_seqno,
  561. std::unique_ptr<log::Writer>* log_writer) {
  562. assert(column_families != nullptr);
  563. assert(last_seqno != nullptr);
  564. assert(log_writer != nullptr);
  565. VersionEdit new_db;
  566. if (db_options_.write_dbid_to_manifest) {
  567. DBImpl* impl = new DBImpl(DBOptions(), dbname_);
  568. std::string db_id;
  569. impl->GetDbIdentityFromIdentityFile(&db_id);
  570. new_db.SetDBId(db_id);
  571. }
  572. new_db.SetLogNumber(0);
  573. new_db.SetNextFile(2);
  574. new_db.SetLastSequence(0);
  575. const std::vector<std::string> cf_names = {
  576. kDefaultColumnFamilyName, kColumnFamilyName1, kColumnFamilyName2,
  577. kColumnFamilyName3};
  578. const int kInitialNumOfCfs = static_cast<int>(cf_names.size());
  579. autovector<VersionEdit> new_cfs;
  580. uint64_t last_seq = 1;
  581. uint32_t cf_id = 1;
  582. for (int i = 1; i != kInitialNumOfCfs; ++i) {
  583. VersionEdit new_cf;
  584. new_cf.AddColumnFamily(cf_names[i]);
  585. new_cf.SetColumnFamily(cf_id++);
  586. new_cf.SetLogNumber(0);
  587. new_cf.SetNextFile(2);
  588. new_cf.SetLastSequence(last_seq++);
  589. new_cfs.emplace_back(new_cf);
  590. }
  591. *last_seqno = last_seq;
  592. num_initial_edits_ = static_cast<int>(new_cfs.size() + 1);
  593. const std::string manifest = DescriptorFileName(dbname_, 1);
  594. std::unique_ptr<WritableFile> file;
  595. Status s = env_->NewWritableFile(
  596. manifest, &file, env_->OptimizeForManifestWrite(env_options_));
  597. ASSERT_OK(s);
  598. std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
  599. NewLegacyWritableFileWrapper(std::move(file)), manifest, env_options_));
  600. {
  601. log_writer->reset(new log::Writer(std::move(file_writer), 0, false));
  602. std::string record;
  603. new_db.EncodeTo(&record);
  604. s = (*log_writer)->AddRecord(record);
  605. for (const auto& e : new_cfs) {
  606. record.clear();
  607. e.EncodeTo(&record);
  608. s = (*log_writer)->AddRecord(record);
  609. ASSERT_OK(s);
  610. }
  611. }
  612. ASSERT_OK(s);
  613. cf_options_.table_factory = mock_table_factory_;
  614. for (const auto& cf_name : cf_names) {
  615. column_families->emplace_back(cf_name, cf_options_);
  616. }
  617. }
  618. // Create DB with 3 column families.
  619. void NewDB() {
  620. std::vector<ColumnFamilyDescriptor> column_families;
  621. SequenceNumber last_seqno;
  622. std::unique_ptr<log::Writer> log_writer;
  623. SetIdentityFile(env_, dbname_);
  624. PrepareManifest(&column_families, &last_seqno, &log_writer);
  625. log_writer.reset();
  626. // Make "CURRENT" file point to the new manifest file.
  627. Status s = SetCurrentFile(env_, dbname_, 1, nullptr);
  628. ASSERT_OK(s);
  629. EXPECT_OK(versions_->Recover(column_families, false));
  630. EXPECT_EQ(column_families.size(),
  631. versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
  632. }
  633. Env* env_;
  634. std::shared_ptr<FileSystem> fs_;
  635. const std::string dbname_;
  636. EnvOptions env_options_;
  637. ImmutableDBOptions db_options_;
  638. ColumnFamilyOptions cf_options_;
  639. MutableCFOptions mutable_cf_options_;
  640. std::shared_ptr<Cache> table_cache_;
  641. WriteController write_controller_;
  642. WriteBufferManager write_buffer_manager_;
  643. std::shared_ptr<VersionSet> versions_;
  644. std::shared_ptr<ReactiveVersionSet> reactive_versions_;
  645. InstrumentedMutex mutex_;
  646. std::atomic<bool> shutting_down_;
  647. std::shared_ptr<mock::MockTableFactory> mock_table_factory_;
  648. };
  649. const std::string VersionSetTestBase::kColumnFamilyName1 = "alice";
  650. const std::string VersionSetTestBase::kColumnFamilyName2 = "bob";
  651. const std::string VersionSetTestBase::kColumnFamilyName3 = "charles";
  652. class VersionSetTest : public VersionSetTestBase, public testing::Test {
  653. public:
  654. VersionSetTest() : VersionSetTestBase() {}
  655. };
  656. TEST_F(VersionSetTest, SameColumnFamilyGroupCommit) {
  657. NewDB();
  658. const int kGroupSize = 5;
  659. autovector<VersionEdit> edits;
  660. for (int i = 0; i != kGroupSize; ++i) {
  661. edits.emplace_back(VersionEdit());
  662. }
  663. autovector<ColumnFamilyData*> cfds;
  664. autovector<const MutableCFOptions*> all_mutable_cf_options;
  665. autovector<autovector<VersionEdit*>> edit_lists;
  666. for (int i = 0; i != kGroupSize; ++i) {
  667. cfds.emplace_back(versions_->GetColumnFamilySet()->GetDefault());
  668. all_mutable_cf_options.emplace_back(&mutable_cf_options_);
  669. autovector<VersionEdit*> edit_list;
  670. edit_list.emplace_back(&edits[i]);
  671. edit_lists.emplace_back(edit_list);
  672. }
  673. SyncPoint::GetInstance()->DisableProcessing();
  674. SyncPoint::GetInstance()->ClearAllCallBacks();
  675. int count = 0;
  676. SyncPoint::GetInstance()->SetCallBack(
  677. "VersionSet::ProcessManifestWrites:SameColumnFamily", [&](void* arg) {
  678. uint32_t* cf_id = reinterpret_cast<uint32_t*>(arg);
  679. EXPECT_EQ(0u, *cf_id);
  680. ++count;
  681. });
  682. SyncPoint::GetInstance()->EnableProcessing();
  683. mutex_.Lock();
  684. Status s =
  685. versions_->LogAndApply(cfds, all_mutable_cf_options, edit_lists, &mutex_);
  686. mutex_.Unlock();
  687. EXPECT_OK(s);
  688. EXPECT_EQ(kGroupSize - 1, count);
  689. }
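// Covers MANIFEST recovery when version edits form atomic groups: complete,
// incomplete (trailing), corrupted, and incorrectly sized groups, replayed
// through both VersionSet::Recover() and ReactiveVersionSet.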
  690. class VersionSetAtomicGroupTest : public VersionSetTestBase,
  691. public testing::Test {
  692. public:
  693. VersionSetAtomicGroupTest() : VersionSetTestBase() {}
  694. void SetUp() override {
  695. PrepareManifest(&column_families_, &last_seqno_, &log_writer_);
  696. SetupTestSyncPoints();
  697. }
  698. void SetupValidAtomicGroup(int atomic_group_size) {
  699. edits_.resize(atomic_group_size);
  700. int remaining = atomic_group_size;
  701. for (size_t i = 0; i != edits_.size(); ++i) {
  702. edits_[i].SetLogNumber(0);
  703. edits_[i].SetNextFile(2);
  704. edits_[i].MarkAtomicGroup(--remaining);
  705. edits_[i].SetLastSequence(last_seqno_++);
  706. }
  707. ASSERT_OK(SetCurrentFile(env_, dbname_, 1, nullptr));
  708. }
  709. void SetupIncompleteTrailingAtomicGroup(int atomic_group_size) {
  710. edits_.resize(atomic_group_size);
  711. int remaining = atomic_group_size;
  712. for (size_t i = 0; i != edits_.size(); ++i) {
  713. edits_[i].SetLogNumber(0);
  714. edits_[i].SetNextFile(2);
  715. edits_[i].MarkAtomicGroup(--remaining);
  716. edits_[i].SetLastSequence(last_seqno_++);
  717. }
  718. ASSERT_OK(SetCurrentFile(env_, dbname_, 1, nullptr));
  719. }
  720. void SetupCorruptedAtomicGroup(int atomic_group_size) {
  721. edits_.resize(atomic_group_size);
  722. int remaining = atomic_group_size;
  723. for (size_t i = 0; i != edits_.size(); ++i) {
  724. edits_[i].SetLogNumber(0);
  725. edits_[i].SetNextFile(2);
  726. if (i != ((size_t)atomic_group_size / 2)) {
  727. edits_[i].MarkAtomicGroup(--remaining);
  728. }
  729. edits_[i].SetLastSequence(last_seqno_++);
  730. }
  731. ASSERT_OK(SetCurrentFile(env_, dbname_, 1, nullptr));
  732. }
  733. void SetupIncorrectAtomicGroup(int atomic_group_size) {
  734. edits_.resize(atomic_group_size);
  735. int remaining = atomic_group_size;
  736. for (size_t i = 0; i != edits_.size(); ++i) {
  737. edits_[i].SetLogNumber(0);
  738. edits_[i].SetNextFile(2);
  739. if (i != 1) {
  740. edits_[i].MarkAtomicGroup(--remaining);
  741. } else {
  742. edits_[i].MarkAtomicGroup(remaining--);
  743. }
  744. edits_[i].SetLastSequence(last_seqno_++);
  745. }
  746. ASSERT_OK(SetCurrentFile(env_, dbname_, 1, nullptr));
  747. }
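// The sync points below record how recovery consumes the atomic group
// (first/last edit seen, edit counts, and any corrupted or mis-sized edit)
// so the tests can assert on it afterwards.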
  748. void SetupTestSyncPoints() {
  749. SyncPoint::GetInstance()->DisableProcessing();
  750. SyncPoint::GetInstance()->ClearAllCallBacks();
  751. SyncPoint::GetInstance()->SetCallBack(
  752. "AtomicGroupReadBuffer::AddEdit:FirstInAtomicGroup", [&](void* arg) {
  753. VersionEdit* e = reinterpret_cast<VersionEdit*>(arg);
  754. EXPECT_EQ(edits_.front().DebugString(),
  755. e->DebugString()); // compare based on value
  756. first_in_atomic_group_ = true;
  757. });
  758. SyncPoint::GetInstance()->SetCallBack(
  759. "AtomicGroupReadBuffer::AddEdit:LastInAtomicGroup", [&](void* arg) {
  760. VersionEdit* e = reinterpret_cast<VersionEdit*>(arg);
  761. EXPECT_EQ(edits_.back().DebugString(),
  762. e->DebugString()); // compare based on value
  763. EXPECT_TRUE(first_in_atomic_group_);
  764. last_in_atomic_group_ = true;
  765. });
  766. SyncPoint::GetInstance()->SetCallBack(
  767. "VersionSet::ReadAndRecover:RecoveredEdits", [&](void* arg) {
  768. num_recovered_edits_ = *reinterpret_cast<int*>(arg);
  769. });
  770. SyncPoint::GetInstance()->SetCallBack(
  771. "ReactiveVersionSet::ReadAndApply:AppliedEdits",
  772. [&](void* arg) { num_applied_edits_ = *reinterpret_cast<int*>(arg); });
  773. SyncPoint::GetInstance()->SetCallBack(
  774. "AtomicGroupReadBuffer::AddEdit:AtomicGroup",
  775. [&](void* /* arg */) { ++num_edits_in_atomic_group_; });
  776. SyncPoint::GetInstance()->SetCallBack(
  777. "AtomicGroupReadBuffer::AddEdit:AtomicGroupMixedWithNormalEdits",
  778. [&](void* arg) {
  779. corrupted_edit_ = *reinterpret_cast<VersionEdit*>(arg);
  780. });
  781. SyncPoint::GetInstance()->SetCallBack(
  782. "AtomicGroupReadBuffer::AddEdit:IncorrectAtomicGroupSize",
  783. [&](void* arg) {
  784. edit_with_incorrect_group_size_ =
  785. *reinterpret_cast<VersionEdit*>(arg);
  786. });
  787. SyncPoint::GetInstance()->EnableProcessing();
  788. }
  789. void AddNewEditsToLog(int num_edits) {
  790. for (int i = 0; i < num_edits; i++) {
  791. std::string record;
  792. edits_[i].EncodeTo(&record);
  793. ASSERT_OK(log_writer_->AddRecord(record));
  794. }
  795. }
  796. void TearDown() override {
  797. SyncPoint::GetInstance()->DisableProcessing();
  798. SyncPoint::GetInstance()->ClearAllCallBacks();
  799. log_writer_.reset();
  800. }
  801. protected:
  802. std::vector<ColumnFamilyDescriptor> column_families_;
  803. SequenceNumber last_seqno_;
  804. std::vector<VersionEdit> edits_;
  805. bool first_in_atomic_group_ = false;
  806. bool last_in_atomic_group_ = false;
  807. int num_edits_in_atomic_group_ = 0;
  808. int num_recovered_edits_ = 0;
  809. int num_applied_edits_ = 0;
  810. VersionEdit corrupted_edit_;
  811. VersionEdit edit_with_incorrect_group_size_;
  812. std::unique_ptr<log::Writer> log_writer_;
  813. };
  814. TEST_F(VersionSetAtomicGroupTest, HandleValidAtomicGroupWithVersionSetRecover) {
  815. const int kAtomicGroupSize = 3;
  816. SetupValidAtomicGroup(kAtomicGroupSize);
  817. AddNewEditsToLog(kAtomicGroupSize);
  818. EXPECT_OK(versions_->Recover(column_families_, false));
  819. EXPECT_EQ(column_families_.size(),
  820. versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
  821. EXPECT_TRUE(first_in_atomic_group_);
  822. EXPECT_TRUE(last_in_atomic_group_);
  823. EXPECT_EQ(num_initial_edits_ + kAtomicGroupSize, num_recovered_edits_);
  824. EXPECT_EQ(0, num_applied_edits_);
  825. }
  826. TEST_F(VersionSetAtomicGroupTest,
  827. HandleValidAtomicGroupWithReactiveVersionSetRecover) {
  828. const int kAtomicGroupSize = 3;
  829. SetupValidAtomicGroup(kAtomicGroupSize);
  830. AddNewEditsToLog(kAtomicGroupSize);
  831. std::unique_ptr<log::FragmentBufferedReader> manifest_reader;
  832. std::unique_ptr<log::Reader::Reporter> manifest_reporter;
  833. std::unique_ptr<Status> manifest_reader_status;
  834. EXPECT_OK(reactive_versions_->Recover(column_families_, &manifest_reader,
  835. &manifest_reporter,
  836. &manifest_reader_status));
  837. EXPECT_EQ(column_families_.size(),
  838. reactive_versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
  839. EXPECT_TRUE(first_in_atomic_group_);
  840. EXPECT_TRUE(last_in_atomic_group_);
  841. // The recover should clean up the replay buffer.
  842. EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() == 0);
  843. EXPECT_TRUE(reactive_versions_->replay_buffer().size() == 0);
  844. EXPECT_EQ(num_initial_edits_ + kAtomicGroupSize, num_recovered_edits_);
  845. EXPECT_EQ(0, num_applied_edits_);
  846. }
  847. TEST_F(VersionSetAtomicGroupTest,
  848. HandleValidAtomicGroupWithReactiveVersionSetReadAndApply) {
  849. const int kAtomicGroupSize = 3;
  850. SetupValidAtomicGroup(kAtomicGroupSize);
  851. std::unique_ptr<log::FragmentBufferedReader> manifest_reader;
  852. std::unique_ptr<log::Reader::Reporter> manifest_reporter;
  853. std::unique_ptr<Status> manifest_reader_status;
  854. EXPECT_OK(reactive_versions_->Recover(column_families_, &manifest_reader,
  855. &manifest_reporter,
  856. &manifest_reader_status));
  857. AddNewEditsToLog(kAtomicGroupSize);
  858. InstrumentedMutex mu;
  859. std::unordered_set<ColumnFamilyData*> cfds_changed;
  860. mu.Lock();
  861. EXPECT_OK(
  862. reactive_versions_->ReadAndApply(&mu, &manifest_reader, &cfds_changed));
  863. mu.Unlock();
  864. EXPECT_TRUE(first_in_atomic_group_);
  865. EXPECT_TRUE(last_in_atomic_group_);
  866. // The recover should clean up the replay buffer.
  867. EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() == 0);
  868. EXPECT_TRUE(reactive_versions_->replay_buffer().size() == 0);
  869. EXPECT_EQ(num_initial_edits_, num_recovered_edits_);
  870. EXPECT_EQ(kAtomicGroupSize, num_applied_edits_);
  871. }
  872. TEST_F(VersionSetAtomicGroupTest,
  873. HandleIncompleteTrailingAtomicGroupWithVersionSetRecover) {
  874. const int kAtomicGroupSize = 4;
  875. const int kNumberOfPersistedVersionEdits = kAtomicGroupSize - 1;
  876. SetupIncompleteTrailingAtomicGroup(kAtomicGroupSize);
  877. AddNewEditsToLog(kNumberOfPersistedVersionEdits);
  878. EXPECT_OK(versions_->Recover(column_families_, false));
  879. EXPECT_EQ(column_families_.size(),
  880. versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
  881. EXPECT_TRUE(first_in_atomic_group_);
  882. EXPECT_FALSE(last_in_atomic_group_);
  883. EXPECT_EQ(kNumberOfPersistedVersionEdits, num_edits_in_atomic_group_);
  884. EXPECT_EQ(num_initial_edits_, num_recovered_edits_);
  885. EXPECT_EQ(0, num_applied_edits_);
  886. }
  887. TEST_F(VersionSetAtomicGroupTest,
  888. HandleIncompleteTrailingAtomicGroupWithReactiveVersionSetRecover) {
  889. const int kAtomicGroupSize = 4;
  890. const int kNumberOfPersistedVersionEdits = kAtomicGroupSize - 1;
  891. SetupIncompleteTrailingAtomicGroup(kAtomicGroupSize);
  892. AddNewEditsToLog(kNumberOfPersistedVersionEdits);
  893. std::unique_ptr<log::FragmentBufferedReader> manifest_reader;
  894. std::unique_ptr<log::Reader::Reporter> manifest_reporter;
  895. std::unique_ptr<Status> manifest_reader_status;
  896. EXPECT_OK(reactive_versions_->Recover(column_families_, &manifest_reader,
  897. &manifest_reporter,
  898. &manifest_reader_status));
  899. EXPECT_EQ(column_families_.size(),
  900. reactive_versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
  901. EXPECT_TRUE(first_in_atomic_group_);
  902. EXPECT_FALSE(last_in_atomic_group_);
  903. EXPECT_EQ(kNumberOfPersistedVersionEdits, num_edits_in_atomic_group_);
  904. // Reactive version set should store the edits in the replay buffer.
  905. EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() ==
  906. kNumberOfPersistedVersionEdits);
  907. EXPECT_TRUE(reactive_versions_->replay_buffer().size() == kAtomicGroupSize);
  908. // Write the last record. The reactive version set should now apply all
  909. // edits.
  910. std::string last_record;
  911. edits_[kAtomicGroupSize - 1].EncodeTo(&last_record);
  912. EXPECT_OK(log_writer_->AddRecord(last_record));
  913. InstrumentedMutex mu;
  914. std::unordered_set<ColumnFamilyData*> cfds_changed;
  915. mu.Lock();
  916. EXPECT_OK(
  917. reactive_versions_->ReadAndApply(&mu, &manifest_reader, &cfds_changed));
  918. mu.Unlock();
  919. // Reactive version set should be empty now.
  920. EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() == 0);
  921. EXPECT_TRUE(reactive_versions_->replay_buffer().size() == 0);
  922. EXPECT_EQ(num_initial_edits_, num_recovered_edits_);
  923. EXPECT_EQ(kAtomicGroupSize, num_applied_edits_);
  924. }
  925. TEST_F(VersionSetAtomicGroupTest,
  926. HandleIncompleteTrailingAtomicGroupWithReactiveVersionSetReadAndApply) {
  927. const int kAtomicGroupSize = 4;
  928. const int kNumberOfPersistedVersionEdits = kAtomicGroupSize - 1;
  929. SetupIncompleteTrailingAtomicGroup(kAtomicGroupSize);
  930. std::unique_ptr<log::FragmentBufferedReader> manifest_reader;
  931. std::unique_ptr<log::Reader::Reporter> manifest_reporter;
  932. std::unique_ptr<Status> manifest_reader_status;
  933. // No edits in an atomic group.
  934. EXPECT_OK(reactive_versions_->Recover(column_families_, &manifest_reader,
  935. &manifest_reporter,
  936. &manifest_reader_status));
  937. EXPECT_EQ(column_families_.size(),
  938. reactive_versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
  939. // Write a few edits in an atomic group.
  940. AddNewEditsToLog(kNumberOfPersistedVersionEdits);
  941. InstrumentedMutex mu;
  942. std::unordered_set<ColumnFamilyData*> cfds_changed;
  943. mu.Lock();
  944. EXPECT_OK(
  945. reactive_versions_->ReadAndApply(&mu, &manifest_reader, &cfds_changed));
  946. mu.Unlock();
  947. EXPECT_TRUE(first_in_atomic_group_);
  948. EXPECT_FALSE(last_in_atomic_group_);
  949. EXPECT_EQ(kNumberOfPersistedVersionEdits, num_edits_in_atomic_group_);
  950. // Reactive version set should store the edits in the replay buffer.
  951. EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() ==
  952. kNumberOfPersistedVersionEdits);
  953. EXPECT_TRUE(reactive_versions_->replay_buffer().size() == kAtomicGroupSize);
  954. EXPECT_EQ(num_initial_edits_, num_recovered_edits_);
  955. EXPECT_EQ(0, num_applied_edits_);
  956. }
  957. TEST_F(VersionSetAtomicGroupTest,
  958. HandleCorruptedAtomicGroupWithVersionSetRecover) {
  959. const int kAtomicGroupSize = 4;
  960. SetupCorruptedAtomicGroup(kAtomicGroupSize);
  961. AddNewEditsToLog(kAtomicGroupSize);
  962. EXPECT_NOK(versions_->Recover(column_families_, false));
  963. EXPECT_EQ(column_families_.size(),
  964. versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
  965. EXPECT_EQ(edits_[kAtomicGroupSize / 2].DebugString(),
  966. corrupted_edit_.DebugString());
  967. }
  968. TEST_F(VersionSetAtomicGroupTest,
  969. HandleCorruptedAtomicGroupWithReactiveVersionSetRecover) {
  970. const int kAtomicGroupSize = 4;
  971. SetupCorruptedAtomicGroup(kAtomicGroupSize);
  972. AddNewEditsToLog(kAtomicGroupSize);
  973. std::unique_ptr<log::FragmentBufferedReader> manifest_reader;
  974. std::unique_ptr<log::Reader::Reporter> manifest_reporter;
  975. std::unique_ptr<Status> manifest_reader_status;
  976. EXPECT_NOK(reactive_versions_->Recover(column_families_, &manifest_reader,
  977. &manifest_reporter,
  978. &manifest_reader_status));
  979. EXPECT_EQ(column_families_.size(),
  980. reactive_versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
  981. EXPECT_EQ(edits_[kAtomicGroupSize / 2].DebugString(),
  982. corrupted_edit_.DebugString());
  983. }
  984. TEST_F(VersionSetAtomicGroupTest,
  985. HandleCorruptedAtomicGroupWithReactiveVersionSetReadAndApply) {
  986. const int kAtomicGroupSize = 4;
  987. SetupCorruptedAtomicGroup(kAtomicGroupSize);
  988. InstrumentedMutex mu;
  989. std::unordered_set<ColumnFamilyData*> cfds_changed;
  990. std::unique_ptr<log::FragmentBufferedReader> manifest_reader;
  991. std::unique_ptr<log::Reader::Reporter> manifest_reporter;
  992. std::unique_ptr<Status> manifest_reader_status;
  993. EXPECT_OK(reactive_versions_->Recover(column_families_, &manifest_reader,
  994. &manifest_reporter,
  995. &manifest_reader_status));
  996. // Write the corrupted edits.
  997. AddNewEditsToLog(kAtomicGroupSize);
  998. mu.Lock();
  999. EXPECT_OK(
  1000. reactive_versions_->ReadAndApply(&mu, &manifest_reader, &cfds_changed));
  1001. mu.Unlock();
  1002. EXPECT_EQ(edits_[kAtomicGroupSize / 2].DebugString(),
  1003. corrupted_edit_.DebugString());
  1004. }
  1005. TEST_F(VersionSetAtomicGroupTest,
  1006. HandleIncorrectAtomicGroupSizeWithVersionSetRecover) {
  1007. const int kAtomicGroupSize = 4;
  1008. SetupIncorrectAtomicGroup(kAtomicGroupSize);
  1009. AddNewEditsToLog(kAtomicGroupSize);
  1010. EXPECT_NOK(versions_->Recover(column_families_, false));
  1011. EXPECT_EQ(column_families_.size(),
  1012. versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
  1013. EXPECT_EQ(edits_[1].DebugString(),
  1014. edit_with_incorrect_group_size_.DebugString());
  1015. }
  1016. TEST_F(VersionSetAtomicGroupTest,
  1017. HandleIncorrectAtomicGroupSizeWithReactiveVersionSetRecover) {
  1018. const int kAtomicGroupSize = 4;
  1019. SetupIncorrectAtomicGroup(kAtomicGroupSize);
  1020. AddNewEditsToLog(kAtomicGroupSize);
  1021. std::unique_ptr<log::FragmentBufferedReader> manifest_reader;
  1022. std::unique_ptr<log::Reader::Reporter> manifest_reporter;
  1023. std::unique_ptr<Status> manifest_reader_status;
  1024. EXPECT_NOK(reactive_versions_->Recover(column_families_, &manifest_reader,
  1025. &manifest_reporter,
  1026. &manifest_reader_status));
  1027. EXPECT_EQ(column_families_.size(),
  1028. reactive_versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
  1029. EXPECT_EQ(edits_[1].DebugString(),
  1030. edit_with_incorrect_group_size_.DebugString());
  1031. }
  1032. TEST_F(VersionSetAtomicGroupTest,
  1033. HandleIncorrectAtomicGroupSizeWithReactiveVersionSetReadAndApply) {
  1034. const int kAtomicGroupSize = 4;
  1035. SetupIncorrectAtomicGroup(kAtomicGroupSize);
  1036. InstrumentedMutex mu;
  1037. std::unordered_set<ColumnFamilyData*> cfds_changed;
  1038. std::unique_ptr<log::FragmentBufferedReader> manifest_reader;
  1039. std::unique_ptr<log::Reader::Reporter> manifest_reporter;
  1040. std::unique_ptr<Status> manifest_reader_status;
  1041. EXPECT_OK(reactive_versions_->Recover(column_families_, &manifest_reader,
  1042. &manifest_reporter,
  1043. &manifest_reader_status));
  1044. AddNewEditsToLog(kAtomicGroupSize);
  1045. mu.Lock();
  1046. EXPECT_OK(
  1047. reactive_versions_->ReadAndApply(&mu, &manifest_reader, &cfds_changed));
  1048. mu.Unlock();
  1049. EXPECT_EQ(edits_[1].DebugString(),
  1050. edit_with_incorrect_group_size_.DebugString());
  1051. }
  1052. class VersionSetTestDropOneCF : public VersionSetTestBase,
  1053. public testing::TestWithParam<std::string> {
  1054. public:
  1055. VersionSetTestDropOneCF() : VersionSetTestBase() {}
  1056. };
// This test simulates the following execution sequence
// Time  thread1                  bg_flush_thr
//  |                             Prepare version edits (e1,e2,e3) for atomic
//  |                             flush cf1, cf2, cf3
//  |    Enqueue e to drop cfi
//  |    to manifest_writers_
//  |                             Enqueue (e1,e2,e3) to manifest_writers_
//  |
//  |    Apply e,
//  |    cfi.IsDropped() is true
//  |                             Apply (e1,e2,e3),
//  |                             since cfi.IsDropped() == true, we need to
//  |                             drop ei and write the rest to MANIFEST.
//  V
//
// Repeat the test for i = 1, 2, 3 to simulate dropping the first, middle and
// last column family in an atomic group.
  1074. TEST_P(VersionSetTestDropOneCF, HandleDroppedColumnFamilyInAtomicGroup) {
  1075. std::vector<ColumnFamilyDescriptor> column_families;
  1076. SequenceNumber last_seqno;
  1077. std::unique_ptr<log::Writer> log_writer;
  1078. PrepareManifest(&column_families, &last_seqno, &log_writer);
  1079. Status s = SetCurrentFile(env_, dbname_, 1, nullptr);
  1080. ASSERT_OK(s);
  1081. EXPECT_OK(versions_->Recover(column_families, false /* read_only */));
  1082. EXPECT_EQ(column_families.size(),
  1083. versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
  1084. const int kAtomicGroupSize = 3;
  1085. const std::vector<std::string> non_default_cf_names = {
  1086. kColumnFamilyName1, kColumnFamilyName2, kColumnFamilyName3};
  1087. // Drop one column family
  1088. VersionEdit drop_cf_edit;
  1089. drop_cf_edit.DropColumnFamily();
  1090. const std::string cf_to_drop_name(GetParam());
  1091. auto cfd_to_drop =
  1092. versions_->GetColumnFamilySet()->GetColumnFamily(cf_to_drop_name);
  1093. ASSERT_NE(nullptr, cfd_to_drop);
  1094. // Increase its refcount because cfd_to_drop is used later, and we need to
  1095. // prevent it from being deleted.
  1096. cfd_to_drop->Ref();
  1097. drop_cf_edit.SetColumnFamily(cfd_to_drop->GetID());
  1098. mutex_.Lock();
  1099. s = versions_->LogAndApply(cfd_to_drop,
  1100. *cfd_to_drop->GetLatestMutableCFOptions(),
  1101. &drop_cf_edit, &mutex_);
  1102. mutex_.Unlock();
  1103. ASSERT_OK(s);
  1104. std::vector<VersionEdit> edits(kAtomicGroupSize);
  1105. uint32_t remaining = kAtomicGroupSize;
  1106. size_t i = 0;
  1107. autovector<ColumnFamilyData*> cfds;
  1108. autovector<const MutableCFOptions*> mutable_cf_options_list;
  1109. autovector<autovector<VersionEdit*>> edit_lists;
  1110. for (const auto& cf_name : non_default_cf_names) {
  1111. auto cfd = (cf_name != cf_to_drop_name)
  1112. ? versions_->GetColumnFamilySet()->GetColumnFamily(cf_name)
  1113. : cfd_to_drop;
  1114. ASSERT_NE(nullptr, cfd);
  1115. cfds.push_back(cfd);
  1116. mutable_cf_options_list.emplace_back(cfd->GetLatestMutableCFOptions());
  1117. edits[i].SetColumnFamily(cfd->GetID());
  1118. edits[i].SetLogNumber(0);
  1119. edits[i].SetNextFile(2);
  1120. edits[i].MarkAtomicGroup(--remaining);
  1121. edits[i].SetLastSequence(last_seqno++);
  1122. autovector<VersionEdit*> tmp_edits;
  1123. tmp_edits.push_back(&edits[i]);
  1124. edit_lists.emplace_back(tmp_edits);
  1125. ++i;
  1126. }
  1127. int called = 0;
  1128. SyncPoint::GetInstance()->DisableProcessing();
  1129. SyncPoint::GetInstance()->ClearAllCallBacks();
  1130. SyncPoint::GetInstance()->SetCallBack(
  1131. "VersionSet::ProcessManifestWrites:CheckOneAtomicGroup", [&](void* arg) {
  1132. std::vector<VersionEdit*>* tmp_edits =
  1133. reinterpret_cast<std::vector<VersionEdit*>*>(arg);
  1134. EXPECT_EQ(kAtomicGroupSize - 1, tmp_edits->size());
  1135. for (const auto e : *tmp_edits) {
  1136. bool found = false;
  1137. for (const auto& e2 : edits) {
  1138. if (&e2 == e) {
  1139. found = true;
  1140. break;
  1141. }
  1142. }
  1143. ASSERT_TRUE(found);
  1144. }
  1145. ++called;
  1146. });
  1147. SyncPoint::GetInstance()->EnableProcessing();
  1148. mutex_.Lock();
  1149. s = versions_->LogAndApply(cfds, mutable_cf_options_list, edit_lists,
  1150. &mutex_);
  1151. mutex_.Unlock();
  1152. ASSERT_OK(s);
  1153. ASSERT_EQ(1, called);
  1154. if (cfd_to_drop->Unref()) {
  1155. delete cfd_to_drop;
  1156. cfd_to_drop = nullptr;
  1157. }
  1158. }
  1159. INSTANTIATE_TEST_CASE_P(
  1160. AtomicGroup, VersionSetTestDropOneCF,
  1161. testing::Values(VersionSetTestBase::kColumnFamilyName1,
  1162. VersionSetTestBase::kColumnFamilyName2,
  1163. VersionSetTestBase::kColumnFamilyName3));
  1164. } // namespace ROCKSDB_NAMESPACE
  1165. int main(int argc, char** argv) {
  1166. ::testing::InitGoogleTest(&argc, argv);
  1167. return RUN_ALL_TESTS();
  1168. }