// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Testing various compression features
#include <cstdlib>
#include <memory>

#include "db/db_test_util.h"
#include "port/stack_trace.h"
#include "rocksdb/flush_block_policy.h"
#include "rocksdb/utilities/object_registry.h"
#include "table/block_based/block_builder.h"
#include "test_util/testutil.h"
#include "util/auto_tune_compressor.h"
#include "util/random.h"
#include "util/simple_mixed_compressor.h"

namespace ROCKSDB_NAMESPACE {

class DBCompressionTest : public DBTestBase {
 public:
  DBCompressionTest()
      : DBTestBase("compression_test", /*env_do_fsync=*/true) {}
};

TEST_F(DBCompressionTest, PresetCompressionDict) {
  // Verifies that compression ratio improves when dictionary is enabled, and
  // improves even further when the dictionary is trained by ZSTD.
  const size_t kBlockSizeBytes = 4 << 10;
  const size_t kL0FileBytes = 128 << 10;
  const size_t kApproxPerBlockOverheadBytes = 50;
  const int kNumL0Files = 5;

  Options options;
  // Make sure to use any custom env that the test is configured with.
  options.env = CurrentOptions().env;
  options.allow_concurrent_memtable_write = false;
  options.arena_block_size = kBlockSizeBytes;
  options.create_if_missing = true;
  options.disable_auto_compactions = true;
  options.level0_file_num_compaction_trigger = kNumL0Files;
  options.memtable_factory.reset(
      test::NewSpecialSkipListFactory(kL0FileBytes / kBlockSizeBytes));
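  // Note: the special skiplist memtable signals a flush after
  // kL0FileBytes / kBlockSizeBytes entries, i.e. roughly one L0 file's worth
  // of block-sized values per memtable.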
  options.num_levels = 2;
  options.target_file_size_base = kL0FileBytes;
  options.target_file_size_multiplier = 2;
  options.write_buffer_size = kL0FileBytes;

  BlockBasedTableOptions table_options;
  table_options.block_size = kBlockSizeBytes;

  std::vector<CompressionType> compression_types;
  if (Zlib_Supported()) {
    compression_types.push_back(kZlibCompression);
  }
#if LZ4_VERSION_NUMBER >= 10400  // r124+
  compression_types.push_back(kLZ4Compression);
  compression_types.push_back(kLZ4HCCompression);
#endif  // LZ4_VERSION_NUMBER >= 10400
  if (ZSTD_Supported()) {
    compression_types.push_back(kZSTD);
  }

  enum DictionaryTypes : int {
    kWithoutDict,
    kWithDict,
    kWithZSTDfinalizeDict,
    kWithZSTDTrainedDict,
    kDictEnd,
  };

  for (auto compression_type : compression_types) {
    options.compression = compression_type;
    size_t bytes_without_dict = 0;
    size_t bytes_with_dict = 0;
    size_t bytes_with_zstd_finalize_dict = 0;
    size_t bytes_with_zstd_trained_dict = 0;
    for (int i = kWithoutDict; i < kDictEnd; i++) {
      // First iteration: compress without preset dictionary
      // Second iteration: compress with preset dictionary
      // Third iteration (zstd only): compress with zstd-trained dictionary
      //
      // To make sure the compression dictionary has the intended effect, we
      // verify the compressed size is smaller in successive iterations. Also
      // in the non-first iterations, verify the data we get out is the same
      // data we put in.
      switch (i) {
        case kWithoutDict:
          options.compression_opts.max_dict_bytes = 0;
          options.compression_opts.zstd_max_train_bytes = 0;
          break;
        case kWithDict:
          options.compression_opts.max_dict_bytes = kBlockSizeBytes;
          options.compression_opts.zstd_max_train_bytes = 0;
          break;
        case kWithZSTDfinalizeDict:
          if (compression_type != kZSTD ||
              !ZSTD_FinalizeDictionarySupported()) {
            continue;
          }
          options.compression_opts.max_dict_bytes = kBlockSizeBytes;
          options.compression_opts.zstd_max_train_bytes = kL0FileBytes;
          options.compression_opts.use_zstd_dict_trainer = false;
          break;
        case kWithZSTDTrainedDict:
          if (compression_type != kZSTD || !ZSTD_TrainDictionarySupported()) {
            continue;
          }
          options.compression_opts.max_dict_bytes = kBlockSizeBytes;
          options.compression_opts.zstd_max_train_bytes = kL0FileBytes;
          options.compression_opts.use_zstd_dict_trainer = true;
          break;
        default:
          assert(false);
      }
      options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
      options.table_factory.reset(NewBlockBasedTableFactory(table_options));
      CreateAndReopenWithCF({"pikachu"}, options);
      Random rnd(301);
      std::string seq_datas[10];
      for (int j = 0; j < 10; ++j) {
        seq_datas[j] =
            rnd.RandomString(kBlockSizeBytes - kApproxPerBlockOverheadBytes);
      }
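      // Each key written below reuses one of these ten block-sized values, so
      // separate blocks share content that a preset dictionary can exploit.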
      ASSERT_EQ(0, NumTableFilesAtLevel(0, 1));
      for (int j = 0; j < kNumL0Files; ++j) {
        for (size_t k = 0; k < kL0FileBytes / kBlockSizeBytes + 1; ++k) {
          auto key_num = j * (kL0FileBytes / kBlockSizeBytes) + k;
          ASSERT_OK(Put(1, Key(static_cast<int>(key_num)),
                        seq_datas[(key_num / 10) % 10]));
        }
        ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[1]));
        ASSERT_EQ(j + 1, NumTableFilesAtLevel(0, 1));
      }
      ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1],
                                            true /* disallow_trivial_move */));
      ASSERT_EQ(0, NumTableFilesAtLevel(0, 1));
      ASSERT_GT(NumTableFilesAtLevel(1, 1), 0);

      // Get the live sst files size
      size_t total_sst_bytes = TotalSize(1);
      if (i == kWithoutDict) {
        bytes_without_dict = total_sst_bytes;
      } else if (i == kWithDict) {
        bytes_with_dict = total_sst_bytes;
      } else if (i == kWithZSTDfinalizeDict) {
        bytes_with_zstd_finalize_dict = total_sst_bytes;
      } else if (i == kWithZSTDTrainedDict) {
        bytes_with_zstd_trained_dict = total_sst_bytes;
      }
      for (size_t j = 0; j < kNumL0Files * (kL0FileBytes / kBlockSizeBytes);
           j++) {
        ASSERT_EQ(seq_datas[(j / 10) % 10], Get(1, Key(static_cast<int>(j))));
      }
      if (i == kWithDict) {
        ASSERT_GT(bytes_without_dict, bytes_with_dict);
      } else if (i == kWithZSTDfinalizeDict) {
        // In zstd compression, it is sometimes possible that using a finalized
        // dictionary does not get as good a compression ratio as a raw content
        // dictionary. But using a dictionary should always get a better
        // compression ratio than not using one.
        ASSERT_TRUE(bytes_with_dict > bytes_with_zstd_finalize_dict ||
                    bytes_without_dict > bytes_with_zstd_finalize_dict);
      } else if (i == kWithZSTDTrainedDict) {
        // In zstd compression, it is sometimes possible that using a trained
        // dictionary does not get as good a compression ratio as without
        // training. But using a dictionary (with or without training) should
        // always get a better compression ratio than not using one.
        ASSERT_TRUE(bytes_with_dict > bytes_with_zstd_trained_dict ||
                    bytes_without_dict > bytes_with_zstd_trained_dict);
      }
      DestroyAndReopen(options);
    }
  }
}

TEST_F(DBCompressionTest, PresetCompressionDictLocality) {
  if (!ZSTD_Supported()) {
    return;
  }
  // Verifies that compression dictionary is generated from local data. The
  // verification simply checks all output SSTs have different compression
  // dictionaries. We do not verify effectiveness as that'd likely be flaky in
  // the future.
  const int kNumEntriesPerFile = 1 << 10;  // 1024 entries
  const int kNumBytesPerEntry = 1 << 10;   // 1KB
  const int kNumFiles = 4;
  Options options = CurrentOptions();
  options.compression = kZSTD;
  options.compression_opts.max_dict_bytes = 1 << 14;        // 16KB
  options.compression_opts.zstd_max_train_bytes = 1 << 18;  // 256KB
  options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
  options.target_file_size_base = kNumEntriesPerFile * kNumBytesPerEntry;
  BlockBasedTableOptions table_options;
  table_options.cache_index_and_filter_blocks = true;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  Reopen(options);

  Random rnd(301);
  for (int i = 0; i < kNumFiles; ++i) {
    for (int j = 0; j < kNumEntriesPerFile; ++j) {
      ASSERT_OK(Put(Key(i * kNumEntriesPerFile + j),
                    rnd.RandomString(kNumBytesPerEntry)));
    }
    ASSERT_OK(Flush());
    MoveFilesToLevel(1);
    ASSERT_EQ(NumTableFilesAtLevel(1), i + 1);
  }

  // Store all the dictionaries generated during a full compaction.
  std::vector<std::string> compression_dicts;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "BlockBasedTableBuilder::WriteCompressionDictBlock:RawDict",
      [&](void* arg) {
        compression_dicts.emplace_back(static_cast<Slice*>(arg)->ToString());
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  CompactRangeOptions compact_range_opts;
  compact_range_opts.bottommost_level_compaction =
      BottommostLevelCompaction::kForceOptimized;
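  // Forcing bottommost-level compaction ensures the L1 files are actually
  // rewritten, so each output SST must build its dictionary from its own data.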
  ASSERT_OK(db_->CompactRange(compact_range_opts, nullptr, nullptr));
  // Dictionary compression should not be so good as to compress four totally
  // random files into one. If it does then there's probably something wrong
  // with the test.
  ASSERT_GT(NumTableFilesAtLevel(1), 1);
  // Furthermore, there should be one compression dictionary generated per
  // file. And they should all be different from each other.
  ASSERT_EQ(NumTableFilesAtLevel(1),
            static_cast<int>(compression_dicts.size()));
  for (size_t i = 1; i < compression_dicts.size(); ++i) {
    std::string& a = compression_dicts[i - 1];
    std::string& b = compression_dicts[i];
    size_t alen = a.size();
    size_t blen = b.size();
    ASSERT_TRUE(alen != blen || memcmp(a.data(), b.data(), alen) != 0);
  }
}
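
// Returns a string of the given length that compresses to roughly 80% of its
// original size (see test::CompressibleString).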
static std::string CompressibleString(Random* rnd, int len) {
  std::string r;
  test::CompressibleString(rnd, 0.8, len, &r);
  return r;
}

TEST_F(DBCompressionTest, DynamicLevelCompressionPerLevel) {
  if (!Snappy_Supported()) {
    return;
  }
  const int kNKeys = 120;
  int keys[kNKeys];
  for (int i = 0; i < kNKeys; i++) {
    keys[i] = i;
  }
  Random rnd(301);

  Options options;
  options.env = env_;
  options.create_if_missing = true;
  options.db_write_buffer_size = 20480;
  options.write_buffer_size = 20480;
  options.max_write_buffer_number = 2;
  options.level0_file_num_compaction_trigger = 2;
  options.level0_slowdown_writes_trigger = 2;
  options.level0_stop_writes_trigger = 2;
  options.target_file_size_base = 20480;
  options.level_compaction_dynamic_level_bytes = true;
  options.max_bytes_for_level_base = 102400;
  options.max_bytes_for_level_multiplier = 4;
  options.max_background_compactions = 1;
  options.num_levels = 5;
  options.statistics = CreateDBStatistics();

  options.compression_per_level.resize(3);
  // No compression for L0
  options.compression_per_level[0] = kNoCompression;
  // No compression for the level that L0 is compacted into
  options.compression_per_level[1] = kNoCompression;
  // Snappy compression for Ln+1
  options.compression_per_level[2] = kSnappyCompression;

  OnFileDeletionListener* listener = new OnFileDeletionListener();
  options.listeners.emplace_back(listener);

  DestroyAndReopen(options);

  // Insert more than 80K. L4 should be base level. Neither L0 nor L4 should
  // be compressed, so there shouldn't be any compression.
  for (int i = 0; i < 20; i++) {
    ASSERT_OK(Put(Key(keys[i]), CompressibleString(&rnd, 4000)));
    ASSERT_OK(dbfull()->TEST_WaitForBackgroundWork());
  }
  ASSERT_OK(Flush());
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ASSERT_EQ(NumTableFilesAtLevel(1), 0);
  ASSERT_EQ(NumTableFilesAtLevel(2), 0);
  ASSERT_EQ(NumTableFilesAtLevel(3), 0);
  ASSERT_TRUE(NumTableFilesAtLevel(0) > 0 || NumTableFilesAtLevel(4) > 0);

  // Verify there was no compression
  auto num_block_compressed =
      options.statistics->getTickerCount(NUMBER_BLOCK_COMPRESSED);
  ASSERT_EQ(num_block_compressed, 0);
  // Insert 400KB and some files will end up in L3. According to the above
  // compression settings for each level, there will be some compression.
  ASSERT_OK(options.statistics->Reset());
  ASSERT_EQ(num_block_compressed, 0);
  for (int i = 20; i < 120; i++) {
    ASSERT_OK(Put(Key(keys[i]), CompressibleString(&rnd, 4000)));
    ASSERT_OK(dbfull()->TEST_WaitForBackgroundWork());
  }
  ASSERT_OK(Flush());
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ASSERT_EQ(NumTableFilesAtLevel(1), 0);
  ASSERT_EQ(NumTableFilesAtLevel(2), 0);
  ASSERT_GE(NumTableFilesAtLevel(3), 1);
  ASSERT_GE(NumTableFilesAtLevel(4), 1);

  // Verify there was compression
  num_block_compressed =
      options.statistics->getTickerCount(NUMBER_BLOCK_COMPRESSED);
  ASSERT_GT(num_block_compressed, 0);

  // Make sure data in files in L3 is not compacted by removing all files
  // in L4 and calculate number of rows
  ASSERT_OK(dbfull()->SetOptions({
      {"disable_auto_compactions", "true"},
  }));
  ColumnFamilyMetaData cf_meta;
  db_->GetColumnFamilyMetaData(&cf_meta);
  // Ensure that L1+ files are non-overlapping and together with L0 encompass
  // the full key range between smallestkey and largestkey from CF file
  // metadata.
  int largestkey_in_prev_level = -1;
  int keys_found = 0;
  for (int level = (int)cf_meta.levels.size() - 1; level >= 0; level--) {
    int files_in_level = (int)cf_meta.levels[level].files.size();
    int largestkey_in_prev_file = -1;
    for (int j = 0; j < files_in_level; j++) {
      int smallestkey = IdFromKey(cf_meta.levels[level].files[j].smallestkey);
      int largestkey = IdFromKey(cf_meta.levels[level].files[j].largestkey);
      int num_entries = (int)cf_meta.levels[level].files[j].num_entries;
      ASSERT_EQ(num_entries, largestkey - smallestkey + 1);
      keys_found += num_entries;
      if (level > 0) {
        if (j == 0) {
          ASSERT_GT(smallestkey, largestkey_in_prev_level);
        }
        if (j > 0) {
          ASSERT_GT(smallestkey, largestkey_in_prev_file);
        }
        if (j == files_in_level - 1) {
          largestkey_in_prev_level = largestkey;
        }
      }
      largestkey_in_prev_file = largestkey;
    }
  }
  ASSERT_EQ(keys_found, kNKeys);
  for (const auto& file : cf_meta.levels[4].files) {
    listener->SetExpectedFileName(dbname_ + file.name);
    const RangeOpt ranges(file.smallestkey, file.largestkey);
    // Given the verification above, we're guaranteed that by deleting all the
    // files in [<smallestkey>, <largestkey>] range, we're effectively deleting
    // that very single file and nothing more.
    EXPECT_OK(dbfull()->DeleteFilesInRanges(dbfull()->DefaultColumnFamily(),
                                            &ranges, true /* include_end */));
  }
  listener->VerifyMatchedCount(cf_meta.levels[4].files.size());

  int num_keys = 0;
  std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions()));
  for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
    num_keys++;
  }
  ASSERT_OK(iter->status());
  ASSERT_EQ(NumTableFilesAtLevel(1), 0);
  ASSERT_EQ(NumTableFilesAtLevel(2), 0);
  ASSERT_GE(NumTableFilesAtLevel(3), 1);
  ASSERT_EQ(NumTableFilesAtLevel(4), 0);
  ASSERT_GT(SizeAtLevel(0) + SizeAtLevel(3), num_keys * 4000U + num_keys * 10U);
}

TEST_F(DBCompressionTest, DynamicLevelCompressionPerLevel2) {
  if (!Snappy_Supported() || !LZ4_Supported() || !Zlib_Supported()) {
    return;
  }
  const int kNKeys = 500;
  int keys[kNKeys];
  for (int i = 0; i < kNKeys; i++) {
    keys[i] = i;
  }
  RandomShuffle(std::begin(keys), std::end(keys));

  Random rnd(301);
  Options options;
  options.create_if_missing = true;
  options.db_write_buffer_size = 6000000;
  options.write_buffer_size = 600000;
  options.max_write_buffer_number = 2;
  options.level0_file_num_compaction_trigger = 2;
  options.level0_slowdown_writes_trigger = 2;
  options.level0_stop_writes_trigger = 2;
  options.soft_pending_compaction_bytes_limit = 1024 * 1024;
  options.target_file_size_base = 20;
  options.env = env_;
  options.level_compaction_dynamic_level_bytes = true;
  options.max_bytes_for_level_base = 200;
  options.max_bytes_for_level_multiplier = 8;
  options.max_background_compactions = 1;
  options.num_levels = 5;
  std::shared_ptr<mock::MockTableFactory> mtf(new mock::MockTableFactory);
  options.table_factory = mtf;

  options.compression_per_level.resize(3);
  options.compression_per_level[0] = kNoCompression;
  options.compression_per_level[1] = kLZ4Compression;
  options.compression_per_level[2] = kZlibCompression;
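  // With level_compaction_dynamic_level_bytes and num_levels = 5, the base
  // level starts at L4 (mapped to LZ4 above) and later shifts to L3, at which
  // point the last entry (Zlib) applies to L4.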
  DestroyAndReopen(options);
  // When base level is L4, L4 is LZ4.
  std::atomic<int> num_zlib(0);
  std::atomic<int> num_lz4(0);
  std::atomic<int> num_no(0);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
        Compaction* compaction = static_cast<Compaction*>(arg);
        if (compaction->output_level() == 4) {
          ASSERT_TRUE(compaction->output_compression() == kLZ4Compression);
          num_lz4.fetch_add(1);
        }
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "FlushJob::WriteLevel0Table:output_compression", [&](void* arg) {
        auto* compression = static_cast<CompressionType*>(arg);
        ASSERT_TRUE(*compression == kNoCompression);
        num_no.fetch_add(1);
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  for (int i = 0; i < 100; i++) {
    std::string value = rnd.RandomString(200);
    ASSERT_OK(Put(Key(keys[i]), value));
    if (i % 25 == 24) {
      ASSERT_OK(Flush());
      ASSERT_OK(dbfull()->TEST_WaitForCompact());
    }
  }
  ASSERT_OK(Flush());
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();

  ASSERT_EQ(NumTableFilesAtLevel(1), 0);
  ASSERT_EQ(NumTableFilesAtLevel(2), 0);
  ASSERT_EQ(NumTableFilesAtLevel(3), 0);
  ASSERT_GT(NumTableFilesAtLevel(4), 0);
  ASSERT_GT(num_no.load(), 2);
  ASSERT_GT(num_lz4.load(), 0);
  int prev_num_files_l4 = NumTableFilesAtLevel(4);

  // After the base level turns from L4 to L3, L3 becomes LZ4 and L4 becomes
  // Zlib.
  num_lz4.store(0);
  num_no.store(0);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
        Compaction* compaction = static_cast<Compaction*>(arg);
        if (compaction->output_level() == 4 && compaction->start_level() == 3) {
          ASSERT_TRUE(compaction->output_compression() == kZlibCompression);
          num_zlib.fetch_add(1);
        } else {
          ASSERT_TRUE(compaction->output_compression() == kLZ4Compression);
          num_lz4.fetch_add(1);
        }
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "FlushJob::WriteLevel0Table:output_compression", [&](void* arg) {
        auto* compression = static_cast<CompressionType*>(arg);
        ASSERT_TRUE(*compression == kNoCompression);
        num_no.fetch_add(1);
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  for (int i = 101; i < 500; i++) {
    std::string value = rnd.RandomString(200);
    ASSERT_OK(Put(Key(keys[i]), value));
    if (i % 100 == 99) {
      ASSERT_OK(Flush());
      ASSERT_OK(dbfull()->TEST_WaitForCompact());
    }
  }
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();

  ASSERT_EQ(NumTableFilesAtLevel(1), 0);
  ASSERT_EQ(NumTableFilesAtLevel(2), 0);
  ASSERT_GT(NumTableFilesAtLevel(3), 0);
  ASSERT_GT(NumTableFilesAtLevel(4), prev_num_files_l4);
  ASSERT_GT(num_no.load(), 2);
  ASSERT_GT(num_lz4.load(), 0);
  ASSERT_GT(num_zlib.load(), 0);
}

class PresetCompressionDictTest
    : public DBTestBase,
      public testing::WithParamInterface<std::tuple<CompressionType, bool>> {
 public:
  PresetCompressionDictTest()
      : DBTestBase("db_test2", false /* env_do_fsync */),
        compression_type_(std::get<0>(GetParam())),
        bottommost_(std::get<1>(GetParam())) {}

 protected:
  const CompressionType compression_type_;
  const bool bottommost_;
};

INSTANTIATE_TEST_CASE_P(
    DBCompressionTest, PresetCompressionDictTest,
    ::testing::Combine(::testing::ValuesIn(GetSupportedDictCompressions()),
                       ::testing::Bool()));
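// Each instance runs one dictionary-capable compression type, configured
// either through `ColumnFamilyOptions::compression` or, when the bool
// parameter is set, through `ColumnFamilyOptions::bottommost_compression`.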

TEST_P(PresetCompressionDictTest, Flush) {
  // Verifies that dictionary is generated and written during flush only when
  // `ColumnFamilyOptions::compression` enables dictionary. Also verifies the
  // size of the dictionary is within expectations according to the limit on
  // buffering set by `CompressionOptions::max_dict_buffer_bytes`.
  const size_t kValueLen = 256;
  const size_t kKeysPerFile = 1 << 10;
  const size_t kDictLen = 16 << 10;
  const size_t kBlockLen = 4 << 10;
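  // The dictionary may be up to 16KB (kDictLen), but sampling is buffered at
  // only about one block (kBlockLen), which bounds the dictionary sizes
  // asserted below.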
  Options options = CurrentOptions();
  if (bottommost_) {
    options.bottommost_compression = compression_type_;
    options.bottommost_compression_opts.enabled = true;
    options.bottommost_compression_opts.max_dict_bytes = kDictLen;
    options.bottommost_compression_opts.max_dict_buffer_bytes = kBlockLen;
  } else {
    options.compression = compression_type_;
    options.compression_opts.max_dict_bytes = kDictLen;
    options.compression_opts.max_dict_buffer_bytes = kBlockLen;
  }
  options.memtable_factory.reset(test::NewSpecialSkipListFactory(kKeysPerFile));
  options.statistics = CreateDBStatistics();
  BlockBasedTableOptions bbto;
  bbto.block_size = kBlockLen;
  bbto.cache_index_and_filter_blocks = true;
  options.table_factory.reset(NewBlockBasedTableFactory(bbto));
  Reopen(options);

  Random rnd(301);
  for (size_t i = 0; i <= kKeysPerFile; ++i) {
    ASSERT_OK(Put(Key(static_cast<int>(i)), rnd.RandomString(kValueLen)));
  }
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());

  // We can use `BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT` to detect whether a
  // compression dictionary exists since dictionaries would be preloaded when
  // the flush finishes.
  if (bottommost_) {
    // Flush is never considered bottommost. This should change in the future
    // since flushed files may have nothing underneath them, like the one in
    // this test case.
    ASSERT_EQ(
        TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
        0);
  } else {
    ASSERT_GT(
        TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
        0);
    // TODO(ajkr): fix the below assertion to work with ZSTD. The expectation
    // on number of bytes needs to be adjusted in case the cached block is in
    // ZSTD's digested dictionary format.
    if (compression_type_ != kZSTD) {
      // Although we limited buffering to `kBlockLen`, there may be up to two
      // blocks of data included in the dictionary since we only check the
      // limit after each block is built.
      ASSERT_LE(TestGetTickerCount(options,
                                   BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
                2 * kBlockLen);
    }
  }
}

TEST_P(PresetCompressionDictTest, CompactNonBottommost) {
  // Verifies that dictionary is generated and written during compaction to
  // non-bottommost level only when `ColumnFamilyOptions::compression` enables
  // dictionary. Also verifies the size of the dictionary is within
  // expectations according to the limit on buffering set by
  // `CompressionOptions::max_dict_buffer_bytes`.
  const size_t kValueLen = 256;
  const size_t kKeysPerFile = 1 << 10;
  const size_t kDictLen = 16 << 10;
  const size_t kBlockLen = 4 << 10;
  Options options = CurrentOptions();
  if (bottommost_) {
    options.bottommost_compression = compression_type_;
    options.bottommost_compression_opts.enabled = true;
    options.bottommost_compression_opts.max_dict_bytes = kDictLen;
    options.bottommost_compression_opts.max_dict_buffer_bytes = kBlockLen;
  } else {
    options.compression = compression_type_;
    options.compression_opts.max_dict_bytes = kDictLen;
    options.compression_opts.max_dict_buffer_bytes = kBlockLen;
  }
  options.disable_auto_compactions = true;
  options.statistics = CreateDBStatistics();
  BlockBasedTableOptions bbto;
  bbto.block_size = kBlockLen;
  bbto.cache_index_and_filter_blocks = true;
  options.table_factory.reset(NewBlockBasedTableFactory(bbto));
  Reopen(options);

  Random rnd(301);
  for (size_t j = 0; j <= kKeysPerFile; ++j) {
    ASSERT_OK(Put(Key(static_cast<int>(j)), rnd.RandomString(kValueLen)));
  }
  ASSERT_OK(Flush());
  MoveFilesToLevel(2);
  for (int i = 0; i < 2; ++i) {
    for (size_t j = 0; j <= kKeysPerFile; ++j) {
      ASSERT_OK(Put(Key(static_cast<int>(j)), rnd.RandomString(kValueLen)));
    }
    ASSERT_OK(Flush());
  }
  ASSERT_EQ("2,0,1", FilesPerLevel(0));

  uint64_t prev_compression_dict_bytes_inserted =
      TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT);
  // This L0->L1 compaction merges the two L0 files into L1. The produced L1
  // file is not bottommost due to the existing L2 file covering the same key-
  // range.
  ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr));
  ASSERT_EQ("0,1,1", FilesPerLevel(0));
  // We can use `BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT` to detect whether a
  // compression dictionary exists since dictionaries would be preloaded when
  // the compaction finishes.
  if (bottommost_) {
    ASSERT_EQ(
        TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
        prev_compression_dict_bytes_inserted);
  } else {
    ASSERT_GT(
        TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
        prev_compression_dict_bytes_inserted);
    // TODO(ajkr): fix the below assertion to work with ZSTD. The expectation
    // on number of bytes needs to be adjusted in case the cached block is in
    // ZSTD's digested dictionary format.
    if (compression_type_ != kZSTD) {
      // Although we limited buffering to `kBlockLen`, there may be up to two
      // blocks of data included in the dictionary since we only check the
      // limit after each block is built.
      ASSERT_LE(TestGetTickerCount(options,
                                   BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
                prev_compression_dict_bytes_inserted + 2 * kBlockLen);
    }
  }
}

TEST_P(PresetCompressionDictTest, CompactBottommost) {
  // Verifies that dictionary is generated and written during compaction to the
  // bottommost level only when either `ColumnFamilyOptions::compression` or
  // `ColumnFamilyOptions::bottommost_compression` enables dictionary. Also
  // verifies the size of the dictionary is within expectations according to
  // the limit on buffering set by `CompressionOptions::max_dict_buffer_bytes`.
  const size_t kValueLen = 256;
  const size_t kKeysPerFile = 1 << 10;
  const size_t kDictLen = 16 << 10;
  const size_t kBlockLen = 4 << 10;
  Options options = CurrentOptions();
  if (bottommost_) {
    options.bottommost_compression = compression_type_;
    options.bottommost_compression_opts.enabled = true;
    options.bottommost_compression_opts.max_dict_bytes = kDictLen;
    options.bottommost_compression_opts.max_dict_buffer_bytes = kBlockLen;
  } else {
    options.compression = compression_type_;
    options.compression_opts.max_dict_bytes = kDictLen;
    options.compression_opts.max_dict_buffer_bytes = kBlockLen;
  }
  options.disable_auto_compactions = true;
  options.statistics = CreateDBStatistics();
  BlockBasedTableOptions bbto;
  bbto.block_size = kBlockLen;
  bbto.cache_index_and_filter_blocks = true;
  options.table_factory.reset(NewBlockBasedTableFactory(bbto));
  Reopen(options);

  Random rnd(301);
  for (int i = 0; i < 2; ++i) {
    for (size_t j = 0; j <= kKeysPerFile; ++j) {
      ASSERT_OK(Put(Key(static_cast<int>(j)), rnd.RandomString(kValueLen)));
    }
    ASSERT_OK(Flush());
  }
  ASSERT_EQ("2", FilesPerLevel(0));

  uint64_t prev_compression_dict_bytes_inserted =
      TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT);
  CompactRangeOptions cro;
  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
  ASSERT_EQ("0,1", FilesPerLevel(0));
  ASSERT_GT(
      TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
      prev_compression_dict_bytes_inserted);
  // TODO(ajkr): fix the below assertion to work with ZSTD. The expectation on
  // number of bytes needs to be adjusted in case the cached block is in ZSTD's
  // digested dictionary format.
  if (compression_type_ != kZSTD) {
    // Although we limited buffering to `kBlockLen`, there may be up to two
    // blocks of data included in the dictionary since we only check the limit
    // after each block is built.
    ASSERT_LE(
        TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
        prev_compression_dict_bytes_inserted + 2 * kBlockLen);
  }
}

class CompactionCompressionListener : public EventListener {
 public:
  explicit CompactionCompressionListener(Options* db_options)
      : db_options_(db_options) {}

  void OnCompactionCompleted(DB* db, const CompactionJobInfo& ci) override {
    // Figure out the last level with files
    int bottommost_level = 0;
    for (int level = 0; level < db->NumberLevels(); level++) {
      std::string files_at_level;
      ASSERT_TRUE(
          db->GetProperty("rocksdb.num-files-at-level" + std::to_string(level),
                          &files_at_level));
      if (files_at_level != "0") {
        bottommost_level = level;
      }
    }
    if (db_options_->bottommost_compression != kDisableCompressionOption &&
        ci.output_level == bottommost_level) {
      ASSERT_EQ(ci.compression, db_options_->bottommost_compression);
    } else if (db_options_->compression_per_level.size() != 0) {
      ASSERT_EQ(ci.compression,
                db_options_->compression_per_level[ci.output_level]);
    } else {
      ASSERT_EQ(ci.compression, db_options_->compression);
    }
    max_level_checked = std::max(max_level_checked, ci.output_level);
  }

  int max_level_checked = 0;
  const Options* db_options_;
};
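
// Three failure modes are exercised below: compression reporting failure (the
// block is stored uncompressed and contents must survive), decompression
// returning an error status, and decompression silently producing corrupted
// output (caught because `verify_compression` is enabled).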
enum CompressionFailureType {
  kTestCompressionFail,
  kTestDecompressionFail,
  kTestDecompressionCorruption
};

class CompressionFailuresTest
    : public DBCompressionTest,
      public testing::WithParamInterface<std::tuple<
          CompressionFailureType, CompressionType, uint32_t, uint32_t>> {
 public:
  CompressionFailuresTest() {
    std::tie(compression_failure_type_, compression_type_,
             compression_max_dict_bytes_, compression_parallel_threads_) =
        GetParam();
  }

  CompressionFailureType compression_failure_type_ = kTestCompressionFail;
  CompressionType compression_type_ = kNoCompression;
  uint32_t compression_max_dict_bytes_ = 0;
  uint32_t compression_parallel_threads_ = 0;
};

INSTANTIATE_TEST_CASE_P(
    DBCompressionTest, CompressionFailuresTest,
    ::testing::Combine(::testing::Values(kTestCompressionFail,
                                         kTestDecompressionFail,
                                         kTestDecompressionCorruption),
                       ::testing::ValuesIn(GetSupportedCompressions()),
                       ::testing::Values(0, 10), ::testing::Values(1, 4)));
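// Covers the full grid: failure mode x supported compression type x
// max_dict_bytes in {0, 10} x parallel_threads in {1, 4}.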

TEST_P(CompressionFailuresTest, CompressionFailures) {
  if (compression_type_ == kNoCompression) {
    return;
  }
  Options options = CurrentOptions();
  options.level0_file_num_compaction_trigger = 2;
  options.max_bytes_for_level_base = 1024;
  options.max_bytes_for_level_multiplier = 2;
  options.num_levels = 7;
  options.max_background_compactions = 1;
  options.target_file_size_base = 512;

  BlockBasedTableOptions table_options;
  table_options.block_size = 512;
  table_options.verify_compression = true;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));

  options.compression = compression_type_;
  options.compression_opts.parallel_threads = compression_parallel_threads_;
  options.compression_opts.max_dict_bytes = compression_max_dict_bytes_;
  options.bottommost_compression_opts.parallel_threads =
      compression_parallel_threads_;
  options.bottommost_compression_opts.max_dict_bytes =
      compression_max_dict_bytes_;

  if (compression_failure_type_ == kTestCompressionFail) {
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
        "CompressData:TamperWithReturnValue", [](void* arg) {
          bool* ret = static_cast<bool*>(arg);
          *ret = false;
        });
  } else if (compression_failure_type_ == kTestDecompressionFail) {
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
        "DecompressBlockData:TamperWithReturnValue", [](void* arg) {
          Status* ret = static_cast<Status*>(arg);
          ASSERT_OK(*ret);
          *ret = Status::Corruption("kTestDecompressionFail");
        });
  } else if (compression_failure_type_ == kTestDecompressionCorruption) {
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
        "DecompressBlockData:TamperWithDecompressionOutput", [](void* arg) {
          BlockContents* contents = static_cast<BlockContents*>(arg);
          // Ensure uncompressed data != original data
          const size_t len = contents->data.size() + 1;
          std::unique_ptr<char[]> fake_data(new char[len]());
          *contents = BlockContents(std::move(fake_data), len);
        });
  }

  std::map<std::string, std::string> key_value_written;
  const int kKeySize = 5;
  const int kValUnitSize = 16;
  const int kValSize = 256;
  Random rnd(405);
  Status s = Status::OK();

  DestroyAndReopen(options);
  // Write 10 random files
  for (int i = 0; i < 10; i++) {
    for (int j = 0; j < 5; j++) {
      std::string key = rnd.RandomString(kKeySize);
      // Ensure a good compression ratio
      std::string valueUnit = rnd.RandomString(kValUnitSize);
      std::string value;
      for (int k = 0; k < kValSize; k += kValUnitSize) {
        value += valueUnit;
      }
      s = Put(key, value);
      if (compression_failure_type_ == kTestCompressionFail) {
        key_value_written[key] = value;
        ASSERT_OK(s);
      }
    }
    s = Flush();
    if (compression_failure_type_ == kTestCompressionFail) {
      ASSERT_OK(s);
    }
    s = dbfull()->TEST_WaitForCompact();
    if (compression_failure_type_ == kTestCompressionFail) {
      ASSERT_OK(s);
    }
    if (i == 4) {
      // Make compression fail in the middle of table building
      ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
    }
  }
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();

  if (compression_failure_type_ == kTestCompressionFail) {
    // Should be kNoCompression, check content consistency
    std::unique_ptr<Iterator> db_iter(db_->NewIterator(ReadOptions()));
    for (db_iter->SeekToFirst(); db_iter->Valid(); db_iter->Next()) {
      std::string key = db_iter->key().ToString();
      std::string value = db_iter->value().ToString();
      ASSERT_NE(key_value_written.find(key), key_value_written.end());
      ASSERT_EQ(key_value_written[key], value);
      key_value_written.erase(key);
    }
    ASSERT_OK(db_iter->status());
    ASSERT_EQ(0, key_value_written.size());
  } else if (compression_failure_type_ == kTestDecompressionFail) {
    ASSERT_EQ(std::string(s.getState()),
              "Could not decompress: kTestDecompressionFail");
  } else if (compression_failure_type_ == kTestDecompressionCorruption) {
    ASSERT_EQ(std::string(s.getState()),
              "Decompressed block did not match pre-compression block");
  }
}

TEST_F(DBCompressionTest, CompressionOptions) {
  if (!Zlib_Supported() || !Snappy_Supported()) {
    return;
  }
  Options options = CurrentOptions();
  options.level0_file_num_compaction_trigger = 2;
  options.max_bytes_for_level_base = 100;
  options.max_bytes_for_level_multiplier = 2;
  options.num_levels = 7;
  options.max_background_compactions = 1;

  CompactionCompressionListener* listener =
      new CompactionCompressionListener(&options);
  options.listeners.emplace_back(listener);

  const int kKeySize = 5;
  const int kValSize = 20;
  Random rnd(301);
  std::vector<uint32_t> compression_parallel_threads = {1, 4};
  std::map<std::string, std::string> key_value_written;

  for (int iter = 0; iter <= 2; iter++) {
    listener->max_level_checked = 0;
    if (iter == 0) {
      // Use different compression algorithms for different levels but
      // always use Zlib for the bottommost level
      options.compression_per_level = {kNoCompression, kNoCompression,
                                       kNoCompression, kSnappyCompression,
                                       kSnappyCompression, kSnappyCompression,
                                       kZlibCompression};
      options.compression = kNoCompression;
      options.bottommost_compression = kZlibCompression;
    } else if (iter == 1) {
      // Use Snappy everywhere except the bottommost level, which uses Zlib
      options.compression_per_level = {};
      options.compression = kSnappyCompression;
      options.bottommost_compression = kZlibCompression;
    } else if (iter == 2) {
      // Use Snappy everywhere
      options.compression_per_level = {};
      options.compression = kSnappyCompression;
      options.bottommost_compression = kDisableCompressionOption;
    }

    for (auto num_threads : compression_parallel_threads) {
      options.compression_opts.parallel_threads = num_threads;
      options.bottommost_compression_opts.parallel_threads = num_threads;
      DestroyAndReopen(options);
      // Write 10 random files
      for (int i = 0; i < 10; i++) {
        for (int j = 0; j < 5; j++) {
          std::string key = rnd.RandomString(kKeySize);
          std::string value = rnd.RandomString(kValSize);
          key_value_written[key] = value;
          ASSERT_OK(Put(key, value));
        }
        ASSERT_OK(Flush());
        ASSERT_OK(dbfull()->TEST_WaitForCompact());
      }
      // Make sure that we wrote enough to check all 7 levels
      ASSERT_EQ(listener->max_level_checked, 6);
      // Make sure database content is the same as key_value_written
      std::unique_ptr<Iterator> db_iter(db_->NewIterator(ReadOptions()));
      for (db_iter->SeekToFirst(); db_iter->Valid(); db_iter->Next()) {
        std::string key = db_iter->key().ToString();
        std::string value = db_iter->value().ToString();
        ASSERT_NE(key_value_written.find(key), key_value_written.end());
        ASSERT_EQ(key_value_written[key], value);
        key_value_written.erase(key);
      }
      ASSERT_OK(db_iter->status());
      ASSERT_EQ(0, key_value_written.size());
    }
  }
}

TEST_F(DBCompressionTest, RoundRobinManager) {
  if (ZSTD_Supported()) {
    auto mgr =
        std::make_shared<RoundRobinManager>(GetBuiltinV2CompressionManager());
    std::vector<std::string> values;
    for (bool use_wrapper : {true}) {
      SCOPED_TRACE((use_wrapper ? "With " : "No ") + std::string("wrapper"));
      Options options = CurrentOptions();
      options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
      options.statistics->set_stats_level(StatsLevel::kExceptTimeForMutex);
      BlockBasedTableOptions bbto;
      bbto.enable_index_compression = false;
      options.table_factory.reset(NewBlockBasedTableFactory(bbto));
      options.compression_manager = use_wrapper ? mgr : nullptr;
      DestroyAndReopen(options);
      Random rnd(301);
      constexpr int kCount = 13;
      // Highly compressible blocks, except one non-compressible. Values are
      // large enough to ensure just one k-v per block.
      for (int i = 0; i < kCount; ++i) {
        std::string value;
        if (i == 6) {
          // One non-compressible block
          value = rnd.RandomBinaryString(20000);
        } else {
          test::CompressibleString(&rnd, 0.1, 20000, &value);
        }
        values.push_back(value);
        ASSERT_OK(Put(Key(i), value));
        ASSERT_EQ(Get(Key(i)), value);
      }
      ASSERT_OK(Flush());
      // Ensure well-formed for reads
      for (int i = 0; i < kCount; ++i) {
        ASSERT_NE(Get(Key(i)), "NOT_FOUND");
        ASSERT_EQ(Get(Key(i)), values[i]);
      }
      ASSERT_EQ(Get(Key(kCount)), "NOT_FOUND");
    }
  }
}

TEST_F(DBCompressionTest, RandomMixedCompressionManager) {
  if (ZSTD_Supported()) {
    auto mgr = std::make_shared<RandomMixedCompressionManager>(
        GetBuiltinV2CompressionManager());
    std::vector<std::string> values;
    for (bool use_wrapper : {true}) {
      SCOPED_TRACE((use_wrapper ? "With " : "No ") + std::string("wrapper"));
      Options options = CurrentOptions();
      options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
      options.statistics->set_stats_level(StatsLevel::kExceptTimeForMutex);
      BlockBasedTableOptions bbto;
      bbto.enable_index_compression = false;
      options.table_factory.reset(NewBlockBasedTableFactory(bbto));
      options.compression_manager = use_wrapper ? mgr : nullptr;
      DestroyAndReopen(options);
      Random rnd(301);
      constexpr int kCount = 13;
      // Highly compressible blocks, except one non-compressible. Values are
      // large enough to ensure just one k-v per block.
      for (int i = 0; i < kCount; ++i) {
        std::string value;
        if (i == 6) {
          // One non-compressible block
          value = rnd.RandomBinaryString(20000);
        } else {
          test::CompressibleString(&rnd, 0.1, 20000, &value);
        }
        values.push_back(value);
        ASSERT_OK(Put(Key(i), value));
        ASSERT_EQ(Get(Key(i)), value);
      }
      ASSERT_OK(Flush());
      // Ensure well-formed for reads
      for (int i = 0; i < kCount; ++i) {
        ASSERT_NE(Get(Key(i)), "NOT_FOUND");
        ASSERT_EQ(Get(Key(i)), values[i]);
      }
      ASSERT_EQ(Get(Key(kCount)), "NOT_FOUND");
    }
  }
}

TEST_F(DBCompressionTest, CompressionManagerWrapper) {
  // Test that we can use a custom CompressionManager to wrap the built-in
  // CompressionManager, thus adopting a custom *strategy* based on existing
  // algorithms. This will "mark" some blocks (in their contents) as "do not
  // compress", i.e. no attempt to compress, and some blocks as "reject
  // compression", i.e. compression attempted but rejected because of ratio
  // or otherwise. These cases are distinguishable for statistics that
  // approximate "wasted effort".
  static std::string kDoNotCompress = "do_not_compress";
  static std::string kRejectCompression = "reject_compression";

  struct MyCompressor : public CompressorWrapper {
    using CompressorWrapper::CompressorWrapper;

    const char* Name() const override { return "MyCompressor"; }

    Status CompressBlock(Slice uncompressed_data, char* compressed_output,
                         size_t* compressed_output_size,
                         CompressionType* out_compression_type,
                         ManagedWorkingArea* working_area) override {
      auto begin = uncompressed_data.data();
      auto end = uncompressed_data.data() + uncompressed_data.size();
      if (std::search(begin, end, kDoNotCompress.begin(),
                      kDoNotCompress.end()) != end) {
        // Do not attempt compression
        *compressed_output_size = 0;
        EXPECT_EQ(*out_compression_type, kNoCompression);
        return Status::OK();
      } else if (std::search(begin, end, kRejectCompression.begin(),
                             kRejectCompression.end()) != end) {
        // Simulate attempted & rejected compression
        *compressed_output_size = 1;
        EXPECT_EQ(*out_compression_type, kNoCompression);
        return Status::OK();
      } else {
        return wrapped_->CompressBlock(uncompressed_data, compressed_output,
                                       compressed_output_size,
                                       out_compression_type, working_area);
      }
    }

    // Also check WorkingArea handling
    struct MyWorkingArea : public WorkingArea {
      explicit MyWorkingArea(ManagedWorkingArea&& wrapped)
          : wrapped_(std::move(wrapped)) {}
      ManagedWorkingArea wrapped_;
    };

    ManagedWorkingArea ObtainWorkingArea() override {
      ManagedWorkingArea rv{
          new MyWorkingArea{CompressorWrapper::ObtainWorkingArea()}, this};
      if (GetPreferredCompressionType() == kZSTD) {
        // ZSTD should always use a WorkingArea, so this is our chance to
        // ensure CompressorWrapper::ObtainWorkingArea() is properly connected
        assert(rv.get() != nullptr);
      }
      return rv;
    }

    void ReleaseWorkingArea(WorkingArea* wa) override {
      delete static_cast<MyWorkingArea*>(wa);
    }
  };

  struct MyManager : public CompressionManagerWrapper {
    using CompressionManagerWrapper::CompressionManagerWrapper;

    const char* Name() const override { return "MyManager"; }

    std::unique_ptr<Compressor> GetCompressorForSST(
        const FilterBuildingContext& context, const CompressionOptions& opts,
        CompressionType preferred) override {
      return std::make_unique<MyCompressor>(
          wrapped_->GetCompressorForSST(context, opts, preferred));
    }
  };

  auto mgr = std::make_shared<MyManager>(GetBuiltinV2CompressionManager());
  for (CompressionType type : GetSupportedCompressions()) {
    for (bool use_wrapper : {false, true}) {
      if (type == kNoCompression) {
        continue;
      }
      SCOPED_TRACE("Compression type: " + std::to_string(type) +
                   (use_wrapper ? " with " : " no ") + "wrapper");
      Options options = CurrentOptions();
      options.compression = type;
      options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
      options.statistics->set_stats_level(StatsLevel::kExceptTimeForMutex);
      BlockBasedTableOptions bbto;
      bbto.enable_index_compression = false;
      options.table_factory.reset(NewBlockBasedTableFactory(bbto));
      options.compression_manager = use_wrapper ? mgr : nullptr;
      DestroyAndReopen(options);
      auto PopStat = [&](Tickers t) -> uint64_t {
        return options.statistics->getAndResetTickerCount(t);
      };
      Random rnd(301);
      constexpr int kCount = 13;
      // Highly compressible blocks, except one non-compressible. Half of the
      // compressible blocks are marked for bypass and one is marked for
      // rejection. Values are large enough to ensure just one k-v per block.
      for (int i = 0; i < kCount; ++i) {
        std::string value;
        if (i == 6) {
          // One non-compressible block
          value = rnd.RandomBinaryString(20000);
        } else {
          test::CompressibleString(&rnd, 0.1, 20000, &value);
          if ((i % 2) == 0) {
            // Half for bypass
            value += kDoNotCompress;
          } else if (i == 7) {
            // One for rejection
            value += kRejectCompression;
          }
        }
        ASSERT_OK(Put(Key(i), value));
      }
      ASSERT_OK(Flush());
      if (use_wrapper) {
        EXPECT_EQ(kCount / 2 - 1, PopStat(NUMBER_BLOCK_COMPRESSED));
        EXPECT_EQ(kCount / 2, PopStat(NUMBER_BLOCK_COMPRESSION_BYPASSED));
        EXPECT_EQ(1 + 1, PopStat(NUMBER_BLOCK_COMPRESSION_REJECTED));
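        // The two rejections counted above: the block explicitly marked for
        // rejection, plus the non-compressible random block rejected for
        // insufficient compression ratio.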
  1102. } else {
  1103. EXPECT_EQ(kCount - 1, PopStat(NUMBER_BLOCK_COMPRESSED));
  1104. EXPECT_EQ(0, PopStat(NUMBER_BLOCK_COMPRESSION_BYPASSED));
  1105. EXPECT_EQ(1, PopStat(NUMBER_BLOCK_COMPRESSION_REJECTED));
  1106. }
  1107. // Ensure well-formed for reads
  1108. for (int i = 0; i < kCount; ++i) {
  1109. ASSERT_NE(Get(Key(i)), "NOT_FOUND");
  1110. }
  1111. ASSERT_EQ(Get(Key(kCount)), "NOT_FOUND");
  1112. }
  1113. }
  1114. }
TEST_F(DBCompressionTest, CompressionManagerCustomCompression) {
  // Test that we can use a custom CompressionManager to implement custom
  // compression algorithms, and that there are appropriate schema guard rails
  // to ensure data is not processed by the wrong algorithm.
  using Compressor8A = test::CompressorCustomAlg<kCustomCompression8A>;
  using Compressor8B = test::CompressorCustomAlg<kCustomCompression8B>;
  using Compressor8C = test::CompressorCustomAlg<kCustomCompression8C>;
  if (!Compressor8A::Supported() || !LZ4_Supported()) {
    fprintf(stderr,
            "Prerequisite compression library not supported. Skipping\n");
    return;
  }
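  // A self-contained CompressionManager supporting three custom algorithms
  // (kCustomCompression8A/8B/8C) in addition to the built-in ones, with a
  // configurable CompatibilityName and a "friends" map for locating other
  // managers by compatibility name.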
  class MyManager : public CompressionManager {
   public:
    // Derive name_ from the compatibility name so each instance is
    // identifiable (and Name() never returns an empty string)
    explicit MyManager(const char* compat_name)
        : compat_name_(compat_name),
          name_(std::string("MyManager") + compat_name) {}
    const char* Name() const override { return name_.c_str(); }
    const char* CompatibilityName() const override { return compat_name_; }
    bool SupportsCompressionType(CompressionType type) const override {
      return type == kCustomCompression8A || type == kCustomCompression8B ||
             type == kCustomCompression8C ||
             GetBuiltinV2CompressionManager()->SupportsCompressionType(type);
    }
    int used_compressor8A_count_ = 0;
    int used_compressor8B_count_ = 0;
    int used_compressor8C_count_ = 0;
    std::unique_ptr<Compressor> GetCompressor(const CompressionOptions& opts,
                                              CompressionType type) override {
      switch (static_cast<unsigned char>(type)) {
        case kCustomCompression8A:
          used_compressor8A_count_++;
          return std::make_unique<Compressor8A>();
        case kCustomCompression8B:
          used_compressor8B_count_++;
          return std::make_unique<Compressor8B>();
        case kCustomCompression8C:
          used_compressor8C_count_++;
          return std::make_unique<Compressor8C>();
        // Also support built-in compression algorithms
        default:
          return GetBuiltinV2CompressionManager()->GetCompressor(opts, type);
      }
    }
    std::shared_ptr<Decompressor> GetDecompressor() override {
      return std::make_shared<test::DecompressorCustomAlg>();
    }
    RelaxedAtomic<CompressionType> last_specific_decompressor_type_{
        kNoCompression};
    std::shared_ptr<Decompressor> GetDecompressorForTypes(
        const CompressionType* types_begin,
        const CompressionType* types_end) override {
      assert(types_end > types_begin);
      last_specific_decompressor_type_.StoreRelaxed(*types_begin);
      auto decomp = std::make_shared<test::DecompressorCustomAlg>();
      decomp->SetAllowedTypes(types_begin, types_end);
      return decomp;
    }
    void AddFriend(const std::shared_ptr<CompressionManager>& mgr) {
      friends_[mgr->CompatibilityName()] = mgr;
    }
    std::shared_ptr<CompressionManager> FindCompatibleCompressionManager(
        Slice compatibility_name) override {
      std::shared_ptr<CompressionManager> rv =
          CompressionManager::FindCompatibleCompressionManager(
              compatibility_name);
      if (!rv) {
        auto it = friends_.find(compatibility_name.ToString());
        if (it != friends_.end()) {
          return it->second.lock();
        }
      }
      return rv;
    }
   private:
    const char* compat_name_;
    std::string name_;
    // weak_ptr to avoid cycles
    std::map<std::string, std::weak_ptr<CompressionManager>> friends_;
  };
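  // Run the whole scenario with and without a compression dictionary, since
  // dictionary compression goes through distinct compressor and decompressor
  // paths.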
  for (bool use_dict : {false, true}) {
    SCOPED_TRACE(use_dict ? "With dict" : "No dict");
    // Although these compression managers are actually compatible, we must
    // respect their distinct compatibility names and treat them as
    // incompatible (or else risk processing data incorrectly).
    // NOTE: these are not registered in ObjectRegistry, to test what happens
    // when the original CompressionManager might not be available, but
    // mgr_bar will be registered during the test, with different names to
    // prevent interference between iterations.
    auto mgr_foo = std::make_shared<MyManager>("Foo");
    auto mgr_bar = std::make_shared<MyManager>(use_dict ? "Bar1" : "Bar2");
    // And this one claims to be fully compatible with the built-in
    // compression manager even though it is not (for custom CompressionTypes)
    auto mgr_claim_compatible = std::make_shared<MyManager>("BuiltinV2");
    constexpr uint16_t kValueSize = 10000;
    Options options = CurrentOptions();
    options.level0_file_num_compaction_trigger = 20;
    BlockBasedTableOptions bbto;
    bbto.enable_index_compression = false;
    bbto.format_version = 6;  // Before custom compression alg support
    options.table_factory.reset(NewBlockBasedTableFactory(bbto));
    // Claims not to use custom compression (and doesn't, unless a custom
    // CompressionType is set)
    options.compression_manager = mgr_claim_compatible;
    // Use a built-in compression type with dictionary support
    options.compression = kLZ4Compression;
    // Only enable dictionary compression in the use_dict iteration
    options.compression_opts.max_dict_bytes = use_dict ? kValueSize / 2 : 0;
    DestroyAndReopen(options);
    Random rnd(404);
    std::string value;
    ASSERT_OK(
        Put("a", test::CompressibleString(&rnd, 0.1, kValueSize, &value)));
    ASSERT_OK(Flush());
    // That data should be readable without access to the original compression
    // manager, because it used the built-in CompatibilityName and a built-in
    // CompressionType
    options.compression_manager = nullptr;
    Reopen(options);
    ASSERT_EQ(Get("a"), value);
    // Verify it was compressed
    Range r = {"a", "a0"};
    TablePropertiesCollection tables_properties;
    ASSERT_OK(db_->GetPropertiesOfTablesInRange(db_->DefaultColumnFamily(), &r,
                                                1, &tables_properties));
    ASSERT_EQ(tables_properties.size(), 1U);
    EXPECT_LT(tables_properties.begin()->second->data_size, kValueSize / 2);
    EXPECT_EQ(tables_properties.begin()->second->compression_name, "LZ4");
    // Disallow setting a custom CompressionType with a CompressionManager
    // claiming to be built-in compatible.
    options.compression_manager = mgr_claim_compatible;
    options.compression = kCustomCompression8A;
    ASSERT_EQ(TryReopen(options).code(), Status::Code::kInvalidArgument);
    options.compression_manager = nullptr;
    options.compression = kCustomCompressionFE;
    ASSERT_EQ(TryReopen(options).code(), Status::Code::kInvalidArgument);
    options.compression =
        static_cast<CompressionType>(kLastBuiltinCompression + 1);
    ASSERT_EQ(TryReopen(options).code(), Status::Code::kInvalidArgument);
    // Custom compression schema (different CompatibilityName) not supported
    // before format_version=7
    options.compression_manager = mgr_foo;
    options.compression = kLZ4Compression;
    ASSERT_EQ(TryReopen(options).code(), Status::Code::kInvalidArgument);
    // Set format version supporting custom compression
    bbto.format_version = 7;
    options.table_factory.reset(NewBlockBasedTableFactory(bbto));
    // Custom compression type not supported with built-in schema name, even
    // with format_version=7
    options.compression_manager = mgr_claim_compatible;
    options.compression = kCustomCompression8B;
    ASSERT_EQ(TryReopen(options).code(), Status::Code::kInvalidArgument);
    // Custom compression schema, but specifying a custom compression type it
    // doesn't support.
    options.compression_manager = mgr_foo;
    options.compression = kCustomCompressionF0;
    ASSERT_EQ(TryReopen(options).code(), Status::Code::kNotSupported);
    // Using a built-in compression type with format_version=7 but a custom
    // schema name
    options.compression = kLZ4Compression;
    Reopen(options);
    ASSERT_OK(
        Put("b", test::CompressibleString(&rnd, 0.1, kValueSize, &value)));
    ASSERT_OK(Flush());
    ASSERT_EQ(NumTableFilesAtLevel(0), 2);
    ASSERT_EQ(Get("b"), value);
    // Verify it was compressed with LZ4
    r = {"b", "b0"};
    tables_properties.clear();
    ASSERT_OK(db_->GetPropertiesOfTablesInRange(db_->DefaultColumnFamily(), &r,
                                                1, &tables_properties));
    ASSERT_EQ(tables_properties.size(), 1U);
    EXPECT_LT(tables_properties.begin()->second->data_size, kValueSize / 2);
    // Uses the new "<CompatibilityName>;<type byte in hex>;" format for the
    // "compression_name" property (kLZ4Compression is 0x04)
    EXPECT_EQ(tables_properties.begin()->second->compression_name, "Foo;04;");
    EXPECT_EQ(mgr_foo->last_specific_decompressor_type_.LoadRelaxed(),
              kLZ4Compression);
    // Custom compression type
    options.compression = kCustomCompression8A;
    Reopen(options);
    ASSERT_OK(
        Put("c", test::CompressibleString(&rnd, 0.1, kValueSize, &value)));
    EXPECT_EQ(mgr_foo->used_compressor8A_count_, 0);
    ASSERT_OK(Flush());
    ASSERT_EQ(NumTableFilesAtLevel(0), 3);
    ASSERT_EQ(Get("c"), value);
    EXPECT_EQ(mgr_foo->used_compressor8A_count_, 1);
    // Verify it was compressed with custom format
    r = {"c", "c0"};
    tables_properties.clear();
    ASSERT_OK(db_->GetPropertiesOfTablesInRange(db_->DefaultColumnFamily(), &r,
                                                1, &tables_properties));
    ASSERT_EQ(tables_properties.size(), 1U);
    EXPECT_LT(tables_properties.begin()->second->data_size, kValueSize / 2);
    EXPECT_EQ(tables_properties.begin()->second->compression_name, "Foo;8A;");
    EXPECT_EQ(mgr_foo->last_specific_decompressor_type_.LoadRelaxed(),
              kCustomCompression8A);
    // Also dynamically changeable, because the compression manager will
    // respect the current setting as reported under the legacy logic
    ASSERT_OK(dbfull()->SetOptions({{"compression", "kLZ4Compression"}}));
    ASSERT_OK(
        Put("d", test::CompressibleString(&rnd, 0.1, kValueSize, &value)));
    ASSERT_OK(Flush());
    ASSERT_EQ(NumTableFilesAtLevel(0), 4);
    ASSERT_EQ(Get("d"), value);
    // Verify it was compressed with LZ4
    r = {"d", "d0"};
    tables_properties.clear();
    ASSERT_OK(db_->GetPropertiesOfTablesInRange(db_->DefaultColumnFamily(), &r,
                                                1, &tables_properties));
    ASSERT_EQ(tables_properties.size(), 1U);
    EXPECT_LT(tables_properties.begin()->second->data_size, kValueSize / 2);
    EXPECT_EQ(tables_properties.begin()->second->compression_name, "Foo;04;");
    EXPECT_EQ(mgr_foo->last_specific_decompressor_type_.LoadRelaxed(),
              kLZ4Compression);
    // Dynamically changeable to custom compressions also
    ASSERT_OK(dbfull()->SetOptions({{"compression", "kCustomCompression8B"}}));
    ASSERT_OK(
        Put("e", test::CompressibleString(&rnd, 0.1, kValueSize, &value)));
    ASSERT_OK(Flush());
    ASSERT_EQ(NumTableFilesAtLevel(0), 5);
    ASSERT_EQ(Get("e"), value);
    // Verify it was compressed with custom format
    r = {"e", "e0"};
    tables_properties.clear();
    ASSERT_OK(db_->GetPropertiesOfTablesInRange(db_->DefaultColumnFamily(), &r,
                                                1, &tables_properties));
    ASSERT_EQ(tables_properties.size(), 1U);
    EXPECT_LT(tables_properties.begin()->second->data_size, kValueSize / 2);
    EXPECT_EQ(tables_properties.begin()->second->compression_name, "Foo;8B;");
    EXPECT_EQ(mgr_foo->last_specific_decompressor_type_.LoadRelaxed(),
              kCustomCompression8B);
    // Fails to re-open with incompatible compression manager (can't find
    // compression manager Foo because it's not registered nor known by Bar)
    options.compression_manager = mgr_bar;
    options.compression = kLZ4Compression;
    ASSERT_EQ(TryReopen(options).code(), Status::Code::kNotSupported);
    // But should re-open if we make Bar aware of the Foo compression manager
    mgr_bar->AddFriend(mgr_foo);
    Reopen(options);
    // Can still read everything
    ASSERT_EQ(Get("a").size(), kValueSize);
    ASSERT_EQ(Get("b").size(), kValueSize);
    ASSERT_EQ(Get("c").size(), kValueSize);
    ASSERT_EQ(Get("d").size(), kValueSize);
    ASSERT_EQ(Get("e").size(), kValueSize);
    // Add a file using mgr_bar
    ASSERT_OK(
        Put("f", test::CompressibleString(&rnd, 0.1, kValueSize, &value)));
    ASSERT_OK(Flush());
    ASSERT_EQ(NumTableFilesAtLevel(0), 6);
    ASSERT_EQ(Get("f"), value);
    // Verify it was compressed appropriately
    r = {"f", "f0"};
    tables_properties.clear();
    ASSERT_OK(db_->GetPropertiesOfTablesInRange(db_->DefaultColumnFamily(), &r,
                                                1, &tables_properties));
    ASSERT_EQ(tables_properties.size(), 1U);
    EXPECT_LT(tables_properties.begin()->second->data_size, kValueSize / 2);
    EXPECT_EQ(mgr_bar->last_specific_decompressor_type_.LoadRelaxed(),
              kLZ4Compression);
    // Fails to re-open with incompatible compression manager (can't find
    // compression manager Bar because it's not registered nor known by Foo)
    options.compression_manager = mgr_foo;
    ASSERT_EQ(TryReopen(options).code(), Status::Code::kNotSupported);
    // Register and re-open
    auto& library = *ObjectLibrary::Default();
    library.AddFactory<CompressionManager>(
        mgr_bar->CompatibilityName(),
        [mgr_bar](const std::string& /*uri*/,
                  std::unique_ptr<CompressionManager>* guard,
                  std::string* /*errmsg*/) {
          *guard = std::make_unique<MyManager>(mgr_bar->CompatibilityName());
          return guard->get();
        });
    Reopen(options);
    // Can still read everything
    ASSERT_EQ(Get("a").size(), kValueSize);
    ASSERT_EQ(Get("b").size(), kValueSize);
    ASSERT_EQ(Get("c").size(), kValueSize);
    ASSERT_EQ(Get("d").size(), kValueSize);
    ASSERT_EQ(Get("e").size(), kValueSize);
    ASSERT_EQ(Get("f").size(), kValueSize);
    // TODO: test old version of a compression manager unable to read a
    // compression type
  }
}

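// Opening a DB or creating a column family with a compression type whose
// library is not compiled in should fail cleanly.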
TEST_F(DBCompressionTest, FailWhenCompressionNotSupportedTest) {
  CompressionType compressions[] = {kZlibCompression, kBZip2Compression,
                                    kLZ4Compression, kLZ4HCCompression,
                                    kXpressCompression};
  for (auto comp : compressions) {
    if (!CompressionTypeSupported(comp)) {
      // Not supported, so we should fail the Open()
      Options options = CurrentOptions();
      options.compression = comp;
      ASSERT_TRUE(!TryReopen(options).ok());
      // Check that CreateColumnFamily also fails
      options.compression = kNoCompression;
      ASSERT_OK(TryReopen(options));
      ColumnFamilyOptions cf_options(options);
      cf_options.compression = comp;
      ColumnFamilyHandle* handle;
      ASSERT_TRUE(!db_->CreateColumnFamily(cf_options, "name", &handle).ok());
    }
  }
}

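// Flush block policy that cuts a block after every window_ keys and, at each
// window boundary, installs sync point callbacks to force the AutoSkip
// compressor into exploration (even-numbered windows) or exploitation
// (odd-numbered windows), then checks the compression statistics accumulated
// over the window that just ended.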
class AutoSkipTestFlushBlockPolicy : public FlushBlockPolicy {
 public:
  explicit AutoSkipTestFlushBlockPolicy(const int window,
                                        const BlockBuilder& data_block_builder,
                                        std::shared_ptr<Statistics> statistics)
      : window_(window),
        num_keys_(0),
        data_block_builder_(data_block_builder),
        statistics_(statistics) {}
  bool Update(const Slice& /*key*/, const Slice& /*value*/) override {
    auto nth_window = num_keys_ / window_;
    if (data_block_builder_.empty()) {
      // First key in this block
      return false;
    }
    // Check every window
    if (num_keys_ % window_ == 0) {
      auto set_exploration = [&](void* arg) {
        bool* exploration = static_cast<bool*>(arg);
        *exploration = true;
      };
      auto unset_exploration = [&](void* arg) {
        bool* exploration = static_cast<bool*>(arg);
        *exploration = false;
      };
      SyncPoint::GetInstance()->DisableProcessing();
      SyncPoint::GetInstance()->ClearAllCallBacks();
      // We force exploration in even-numbered windows to establish the
      // predicted rejection ratio, then verify in the following odd-numbered
      // window that the prediction is exploited
      if (nth_window % 2 == 0) {
        SyncPoint::GetInstance()->SetCallBack(
            "AutoSkipCompressorWrapper::CompressBlock::exploitOrExplore",
            set_exploration);
      } else {
        SyncPoint::GetInstance()->SetCallBack(
            "AutoSkipCompressorWrapper::CompressBlock::exploitOrExplore",
            unset_exploration);
      }
      SyncPoint::GetInstance()->EnableProcessing();
      auto compressed_count = PopStat(NUMBER_BLOCK_COMPRESSED);
      auto bypassed_count = PopStat(NUMBER_BLOCK_COMPRESSION_BYPASSED);
      auto rejected_count = PopStat(NUMBER_BLOCK_COMPRESSION_REJECTED);
      auto total = compressed_count + rejected_count + bypassed_count;
      int rejection_percentage, bypassed_percentage, compressed_percentage;
      if (total != 0) {
        rejection_percentage = static_cast<int>(rejected_count * 100 / total);
        bypassed_percentage = static_cast<int>(bypassed_count * 100 / total);
        compressed_percentage =
            static_cast<int>(compressed_count * 100 / total);
        // Use nth_window to identify the test case and check the expected
        // statistics
        switch (nth_window) {
          case 1:
            // In the first window we only explore, so here we verify that the
            // correct prediction has been made by the end of that window.
            // Since 6 of 10 blocks are compression unfriendly, the predicted
            // rejection ratio should be 60%
            EXPECT_EQ(rejection_percentage, 60);
            EXPECT_EQ(bypassed_percentage, 0);
            EXPECT_EQ(compressed_percentage, 40);
            break;
          case 2:
            // With the rejection ratio set to 0.6, all the blocks should be
            // bypassed in the next window
            EXPECT_EQ(rejection_percentage, 0);
            EXPECT_EQ(bypassed_percentage, 100);
            EXPECT_EQ(compressed_percentage, 0);
            break;
          case 3:
            // In the third window we only explore, and verify that the
            // correct prediction has been made by the end of that window.
            // Since 4 of 10 blocks are compression unfriendly, the predicted
            // rejection ratio should be 40%
            EXPECT_EQ(rejection_percentage, 40);
            EXPECT_EQ(bypassed_percentage, 0);
            EXPECT_EQ(compressed_percentage, 60);
            break;
          case 4:
            // With the rejection ratio set to 0.4, compression should be
            // attempted on all the blocks:
            // 6 of 10 blocks are compression unfriendly and thus should be
            // rejected; 4 of 10 blocks are compression friendly and thus
            // should be compressed
            EXPECT_EQ(rejection_percentage, 60);
            EXPECT_EQ(bypassed_percentage, 0);
            EXPECT_EQ(compressed_percentage, 40);
            break;
        }
      }
    }
    num_keys_++;
    return true;
  }
  uint64_t PopStat(Tickers t) {
    return statistics_->getAndResetTickerCount(t);
  }

 private:
  int window_;
  int num_keys_;
  const BlockBuilder& data_block_builder_;
  std::shared_ptr<Statistics> statistics_;
};

class AutoSkipTestFlushBlockPolicyFactory : public FlushBlockPolicyFactory {
 public:
  explicit AutoSkipTestFlushBlockPolicyFactory(
      const int window, std::shared_ptr<Statistics> statistics)
      : window_(window), statistics_(statistics) {}
  const char* Name() const override {
    return "AutoSkipTestFlushBlockPolicyFactory";
  }
  FlushBlockPolicy* NewFlushBlockPolicy(
      const BlockBasedTableOptions& /*table_options*/,
      const BlockBuilder& data_block_builder) const override {
    return new AutoSkipTestFlushBlockPolicy(window_, data_block_builder,
                                            statistics_);
  }

 private:
  int window_;
  std::shared_ptr<Statistics> statistics_;
};

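// Test fixture preconfigured with the auto-skip compression manager, DB
// statistics, and the windowed flush block policy above (window of 10 keys).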
class DBAutoSkip : public DBTestBase {
 public:
  Options options;
  Random rnd_;
  int key_index_;
  DBAutoSkip()
      : DBTestBase("db_auto_skip", /*env_do_fsync=*/true),
        options(CurrentOptions()),
        rnd_(231),
        key_index_(0) {
    options.compression_manager = CreateAutoSkipCompressionManager();
    auto statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
    options.statistics = statistics;
    options.statistics->set_stats_level(StatsLevel::kExceptTimeForMutex);
    BlockBasedTableOptions bbto;
    bbto.enable_index_compression = false;
    bbto.flush_block_policy_factory.reset(
        new AutoSkipTestFlushBlockPolicyFactory(10, statistics));
    options.table_factory.reset(NewBlockBasedTableFactory(bbto));
  }
  bool CompressionFriendlyPut(const int no_of_kvs, const int size_of_value) {
    // Trivially compressible values
    auto value = std::string(size_of_value, 'A');
    for (int i = 0; i < no_of_kvs; ++i) {
      EXPECT_OK(Put(Key(key_index_), value));
      key_index_++;
    }
    return true;
  }
  bool CompressionUnfriendlyPut(const int no_of_kvs, const int size_of_value) {
    // Incompressible random values
    auto value = rnd_.RandomBinaryString(size_of_value);
    for (int i = 0; i < no_of_kvs; ++i) {
      EXPECT_OK(Put(Key(key_index_), value));
      key_index_++;
    }
    return true;
  }
};

TEST_F(DBAutoSkip, AutoSkipCompressionManager) {
  for (auto type : GetSupportedCompressions()) {
    if (type == kNoCompression) {
      continue;
    }
    options.compression = type;
    options.bottommost_compression = type;
    DestroyAndReopen(options);
    const int kValueSize = 20000;
    // This will set the predicted rejection ratio to 60%
    CompressionUnfriendlyPut(6, kValueSize);
    CompressionFriendlyPut(4, kValueSize);
    // This will verify that all the data block compressions are bypassed
    // based on the previous prediction
    CompressionUnfriendlyPut(6, kValueSize);
    CompressionFriendlyPut(4, kValueSize);
    // This will set the predicted rejection ratio to 40%
    CompressionUnfriendlyPut(4, kValueSize);
    CompressionFriendlyPut(6, kValueSize);
    // This will verify that all the data block compressions are attempted
    // based on the previous prediction:
    // compression will be rejected for the 6 compression-unfriendly blocks
    // and accepted for the 4 compression-friendly blocks
    CompressionUnfriendlyPut(6, kValueSize);
    CompressionFriendlyPut(4, kValueSize);
    // Extra window of writes to ensure that all the above cases are checked
    CompressionFriendlyPut(6, kValueSize);
    CompressionFriendlyPut(4, kValueSize);
    ASSERT_OK(Flush());
  }
}

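// Flush block policy that cuts a block after every window_ keys and uses a
// sync point to capture the CostAwareCompressor's predictor, injecting mocked
// CPU and IO costs in the first window and verifying the predictions at the
// next window boundary.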
class CostAwareTestFlushBlockPolicy : public FlushBlockPolicy {
 public:
  explicit CostAwareTestFlushBlockPolicy(const int window,
                                         const BlockBuilder& data_block_builder)
      : window_(window),
        num_keys_(0),
        data_block_builder_(data_block_builder) {}
  bool Update(const Slice& /*key*/, const Slice& /*value*/) override {
    auto nth_window = num_keys_ / window_;
    if (data_block_builder_.empty()) {
      // First key in this block
      return false;
    }
    // Check every window
    if (num_keys_ % window_ == 0) {
      auto get_predictor = [&](void* arg) {
        // Gets the predictor and sets the mocked CPU and IO costs
        predictor_ = static_cast<IOCPUCostPredictor*>(arg);
        predictor_->CPUPredictor.SetPrediction(1000);
        predictor_->IOPredictor.SetPrediction(100);
      };
      SyncPoint::GetInstance()->DisableProcessing();
      SyncPoint::GetInstance()->ClearAllCallBacks();
      // Add a sync point callback to get the CPU and IO cost predictor
      SyncPoint::GetInstance()->SetCallBack(
          "CostAwareCompressor::CompressBlockAndRecord::GetPredictor",
          get_predictor);
      SyncPoint::GetInstance()->EnableProcessing();
      // Use nth_window to identify the test case and check the expected
      // predictions
      switch (nth_window) {
        case 0:
          // Nothing to verify at the first window boundary
          break;
        case 1: {
          // Verify that the mocked CPU and IO costs are predicted correctly
          auto predicted_cpu_time = predictor_->CPUPredictor.Predict();
          auto predicted_io_bytes = predictor_->IOPredictor.Predict();
          EXPECT_EQ(predicted_io_bytes, 100);
          EXPECT_EQ(predicted_cpu_time, 1000);
          break;
        }
      }
    }
    num_keys_++;
    return true;
  }

 private:
  int window_;
  int num_keys_;
  const BlockBuilder& data_block_builder_;
  IOCPUCostPredictor* predictor_ = nullptr;
};

class CostAwareTestFlushBlockPolicyFactory : public FlushBlockPolicyFactory {
 public:
  explicit CostAwareTestFlushBlockPolicyFactory(const int window)
      : window_(window) {}
  const char* Name() const override {
    return "CostAwareTestFlushBlockPolicyFactory";
  }
  FlushBlockPolicy* NewFlushBlockPolicy(
      const BlockBasedTableOptions& /*table_options*/,
      const BlockBuilder& data_block_builder) const override {
    return new CostAwareTestFlushBlockPolicy(window_, data_block_builder);
  }

 private:
  int window_;
};

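// Test fixture preconfigured with the cost-aware compression manager, DB
// statistics, and the windowed flush block policy above (window of 10 keys).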
class DBCompressionCostPredictor : public DBTestBase {
 public:
  Options options;
  DBCompressionCostPredictor()
      : DBTestBase("db_cpuio_skip", /*env_do_fsync=*/true),
        options(CurrentOptions()) {
    options.compression_manager = CreateCostAwareCompressionManager();
    auto statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
    options.statistics = statistics;
    options.statistics->set_stats_level(StatsLevel::kExceptTimeForMutex);
    BlockBasedTableOptions bbto;
    bbto.enable_index_compression = false;
    bbto.flush_block_policy_factory.reset(
        new CostAwareTestFlushBlockPolicyFactory(10));
    options.table_factory.reset(NewBlockBasedTableFactory(bbto));
    DestroyAndReopen(options);
  }
};

TEST_F(DBCompressionCostPredictor, CostAwareCompressorManager) {
  // Make sure the required compression library is supported
  if (!ZSTD_Supported()) {
    fprintf(stderr, "ZSTD not supported. Skipping\n");
    return;
  }
  const int kValueSize = 20000;
  int next_key = 0;
  Random rnd(231);
  auto value = rnd.RandomBinaryString(kValueSize);
  int window_size = 10;
  auto WindowWrite = [&]() {
    for (auto i = 0; i < window_size; ++i) {
      EXPECT_OK(Put(Key(next_key), value));
      next_key++;
    }
  };
  // First window: mocked (via the sync point) to have specific CPU
  // utilization and IO cost
  WindowWrite();
  // Second window: check that the predictor predicts the mocked CPU and IO
  // costs
  WindowWrite();
  ASSERT_OK(Flush());
}

}  // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  RegisterCustomObjects(argc, argv);
  return RUN_ALL_TESTS();
}