// Copyright (c) 2022-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include "util/compression.h"

#include "options/options_helper.h"
#include "rocksdb/convenience.h"
#include "rocksdb/utilities/object_registry.h"

namespace ROCKSDB_NAMESPACE {

StreamingCompress* StreamingCompress::Create(CompressionType compression_type,
    const CompressionOptions& opts,
    uint32_t compress_format_version,
    size_t max_output_len) {
  switch (compression_type) {
    case kZSTD: {
      if (!ZSTD_Streaming_Supported()) {
        return nullptr;
      }
      return new ZSTDStreamingCompress(opts, compress_format_version,
          max_output_len);
    }
    default:
      return nullptr;
  }
}

StreamingUncompress* StreamingUncompress::Create(
    CompressionType compression_type, uint32_t compress_format_version,
    size_t max_output_len) {
  switch (compression_type) {
    case kZSTD: {
      if (!ZSTD_Streaming_Supported()) {
        return nullptr;
      }
      return new ZSTDStreamingUncompress(compress_format_version,
          max_output_len);
    }
    default:
      return nullptr;
  }
}
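
// Compresses `input` (length `input_size`) into `output` (capacity
// `max_output_len_`), setting *output_pos to the number of bytes written.
// Returns a negative value on error (or when ZSTD is not compiled in);
// otherwise returns the number of bytes the compressor still has to flush,
// so 0 means the frame is complete and a positive value means Compress()
// should be called again with the same input to emit the rest.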
int ZSTDStreamingCompress::Compress(const char* input, size_t input_size,
    char* output, size_t* output_pos) {
  assert(input != nullptr && output != nullptr && output_pos != nullptr);
  *output_pos = 0;
  // Don't need to compress an empty input
  if (input_size == 0) {
    return 0;
  }
#ifndef ZSTD
  (void)input;
  (void)input_size;
  (void)output;
  return -1;
#else
  if (input_buffer_.src == nullptr || input_buffer_.src != input) {
    // New input
    // Catch errors where the previous input was not fully compressed.
    assert(input_buffer_.pos == input_buffer_.size);
    input_buffer_ = {input, input_size, /*pos=*/0};
  } else if (input_buffer_.src == input) {
    // Same input, not fully compressed.
  }
  ZSTD_outBuffer output_buffer = {output, max_output_len_, /*pos=*/0};
  const size_t remaining =
      ZSTD_compressStream2(cctx_, &output_buffer, &input_buffer_, ZSTD_e_end);
  if (ZSTD_isError(remaining)) {
    // Failure
    Reset();
    return -1;
  }
  // Success
  *output_pos = output_buffer.pos;
  return (int)remaining;
#endif
}

void ZSTDStreamingCompress::Reset() {
#ifdef ZSTD
  ZSTD_CCtx_reset(cctx_, ZSTD_ResetDirective::ZSTD_reset_session_only);
  input_buffer_ = {/*src=*/nullptr, /*size=*/0, /*pos=*/0};
#endif
}
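
// Decompresses `input` (length `input_size`) into `output` (capacity
// `max_output_len_`), setting *output_pos to the number of bytes written.
// A non-null `input` starts a new input buffer; a null `input` continues
// consuming the previously supplied input. Returns a negative value on
// error (or when ZSTD is not compiled in); otherwise returns the number of
// input bytes not yet consumed (0 once the input is fully consumed).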
int ZSTDStreamingUncompress::Uncompress(const char* input, size_t input_size,
    char* output, size_t* output_pos) {
  assert(output != nullptr && output_pos != nullptr);
  *output_pos = 0;
  // Don't need to uncompress an empty input
  if (input_size == 0) {
    return 0;
  }
#ifdef ZSTD
  if (input) {
    // New input
    input_buffer_ = {input, input_size, /*pos=*/0};
  }
  ZSTD_outBuffer output_buffer = {output, max_output_len_, /*pos=*/0};
  size_t ret = ZSTD_decompressStream(dctx_, &output_buffer, &input_buffer_);
  if (ZSTD_isError(ret)) {
    Reset();
    return -1;
  }
  *output_pos = output_buffer.pos;
  return (int)(input_buffer_.size - input_buffer_.pos);
#else
  (void)input;
  (void)input_size;
  (void)output;
  return -1;
#endif
}

void ZSTDStreamingUncompress::Reset() {
#ifdef ZSTD
  ZSTD_DCtx_reset(dctx_, ZSTD_ResetDirective::ZSTD_reset_session_only);
  input_buffer_ = {/*src=*/nullptr, /*size=*/0, /*pos=*/0};
#endif
}

// ***********************************************************************
// BEGIN built-in implementation of customization interface
// ***********************************************************************

Status Decompressor::ExtractUncompressedSize(Args& args) {
  // Default implementation:
  //
  // Standard format for prepending uncompressed size to the compressed
  // payload. (RocksDB compress_format_version=2 except Snappy)
  //
  // This is historically a varint32, but it is preliminarily generalized
  // to varint64, in case that is supported on the write side for some
  // algorithms.
  if (LIKELY(GetVarint64(&args.compressed_data, &args.uncompressed_size))) {
    if (LIKELY(args.uncompressed_size <= SIZE_MAX)) {
      return Status::OK();
    } else {
      return Status::MemoryLimit("Uncompressed size too large for platform");
    }
  } else {
    return Status::Corruption("Unable to extract uncompressed size");
  }
}

const Slice& Decompressor::GetSerializedDict() const {
  // Default: empty slice => no dictionary
  static Slice kEmptySlice;
  return kEmptySlice;
}

namespace {

class CompressorBase : public Compressor {
 public:
  explicit CompressorBase(const CompressionOptions& opts) : opts_(opts) {}

 protected:
  CompressionOptions opts_;
};
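
// Compressor for the legacy compress_format_version=1 block format,
// implemented by delegating to OLD_CompressData(). Used by
// BuiltinCompressionManagerV1 below.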
class BuiltinCompressorV1 : public CompressorBase {
 public:
  const char* Name() const override { return "BuiltinCompressorV1"; }
  explicit BuiltinCompressorV1(const CompressionOptions& opts,
      CompressionType type)
      : CompressorBase(opts), type_(type) {
    assert(type != kNoCompression);
  }
  CompressionType GetPreferredCompressionType() const override { return type_; }
  Status CompressBlock(Slice uncompressed_data, char* compressed_output,
      size_t* compressed_output_size,
      CompressionType* out_compression_type,
      ManagedWorkingArea* wa) override {
    std::optional<CompressionContext> tmp_ctx;
    CompressionContext* ctx = nullptr;
    if (wa != nullptr && wa->owner() == this) {
      ctx = static_cast<CompressionContext*>(wa->get());
    }
    if (ctx == nullptr) {
      tmp_ctx.emplace(type_, opts_);
      ctx = &*tmp_ctx;
    }
    CompressionInfo info(opts_, *ctx, CompressionDict::GetEmptyDict(), type_);
    std::string str_output;
    str_output.reserve(uncompressed_data.size());
    if (!OLD_CompressData(uncompressed_data, info,
        1 /*compress_format_version*/, &str_output)) {
      // Maybe rejected or bypassed
      *compressed_output_size = str_output.size();
      *out_compression_type = kNoCompression;
      return Status::OK();
    }
    if (str_output.size() > *compressed_output_size) {
      // Compression rejected
      *out_compression_type = kNoCompression;
      return Status::OK();
    }
    std::memcpy(compressed_output, str_output.data(), str_output.size());
    *compressed_output_size = str_output.size();
    *out_compression_type = type_;
    return Status::OK();
  }

 protected:
  const CompressionType type_;
};
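
// Base class for the format_version=2 compressors whose "dictionary" is
// simply the raw sample data (no training step): the serialized dict is
// stored and returned as-is, and CloneForDict() produces a clone that uses
// that dictionary.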
class CompressorWithSimpleDictBase : public CompressorBase {
 public:
  explicit CompressorWithSimpleDictBase(const CompressionOptions& opts,
      std::string&& dict_data = {})
      : CompressorBase(opts), dict_data_(std::move(dict_data)) {}
  size_t GetMaxSampleSizeIfWantDict(
      CacheEntryRole /*block_type*/) const override {
    return opts_.max_dict_bytes;
  }
  // NOTE: empty dict is equivalent to no dict
  Slice GetSerializedDict() const override { return dict_data_; }
  std::unique_ptr<Compressor> MaybeCloneSpecialized(
      CacheEntryRole /*block_type*/,
      DictSampleArgs&& dict_samples) final override {
    assert(dict_samples.Verify());
    if (dict_samples.empty()) {
      // Nothing to specialize on
      return nullptr;
    } else {
      return CloneForDict(std::move(dict_samples.sample_data));
    }
  }
  virtual std::unique_ptr<Compressor> CloneForDict(std::string&& dict_data) = 0;

 protected:
  const std::string dict_data_;
};

// NOTE: the legacy behavior is to pretend to use dictionary compression when
// enabled, including storing a dictionary block, but to ignore it. That is
// matched here.
class BuiltinSnappyCompressorV2 : public CompressorWithSimpleDictBase {
 public:
  using CompressorWithSimpleDictBase::CompressorWithSimpleDictBase;
  const char* Name() const override { return "BuiltinSnappyCompressorV2"; }
  CompressionType GetPreferredCompressionType() const override {
    return kSnappyCompression;
  }
  std::unique_ptr<Compressor> CloneForDict(std::string&& dict_data) override {
    return std::make_unique<BuiltinSnappyCompressorV2>(opts_,
        std::move(dict_data));
  }
  Status CompressBlock(Slice uncompressed_data, char* compressed_output,
      size_t* compressed_output_size,
      CompressionType* out_compression_type,
      ManagedWorkingArea*) override {
#ifdef SNAPPY
    struct MySink : public snappy::Sink {
      MySink(char* output, size_t output_size)
          : output_(output), output_size_(output_size) {}
      char* output_;
      size_t output_size_;
      size_t pos_ = 0;
      void Append(const char* data, size_t n) override {
        if (pos_ + n <= output_size_) {
          std::memcpy(output_ + pos_, data, n);
          pos_ += n;
        } else {
          // Virtual abort
          pos_ = output_size_ + 1;
        }
      }
      char* GetAppendBuffer(size_t length, char* scratch) override {
        if (pos_ + length <= output_size_) {
          return output_ + pos_;
        }
        return scratch;
      }
    };
    MySink sink{compressed_output, *compressed_output_size};
    snappy::ByteArraySource source{uncompressed_data.data(),
        uncompressed_data.size()};
    size_t outlen = snappy::Compress(&source, &sink);
    if (outlen > 0 && sink.pos_ <= sink.output_size_) {
      // Compression kept/successful
      assert(outlen == sink.pos_);
      *compressed_output_size = outlen;
      *out_compression_type = kSnappyCompression;
      return Status::OK();
    }
    // Compression rejected
    *compressed_output_size = 1;
#else
    (void)uncompressed_data;
    (void)compressed_output;
    // Compression bypassed (not supported)
    *compressed_output_size = 0;
#endif
    *out_compression_type = kNoCompression;
    return Status::OK();
  }
  std::shared_ptr<Decompressor> GetOptimizedDecompressor() const override;
};
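
// Writes the compress_format_version=2 header (the uncompressed size encoded
// as a varint32) into `compressed_output` and returns the pointer just past
// the header along with the remaining output capacity available to the
// compression algorithm. Returns {nullptr, 0} when compression should be
// bypassed (input larger than 4GB or output buffer too small for the header).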
[[maybe_unused]]
std::pair<char*, size_t> StartCompressBlockV2(Slice uncompressed_data,
    char* compressed_output,
    size_t compressed_output_size) {
  if ( // Can't compress more than 4GB
      uncompressed_data.size() > std::numeric_limits<uint32_t>::max() ||
      // Need enough output space for encoding uncompressed size
      compressed_output_size <= 5) {
    // Compression bypassed
    return {nullptr, 0};
  }
  // Standard format for prepending uncompressed size to the compressed
  // data in compress_format_version=2
  char* alg_output = EncodeVarint32(
      compressed_output, static_cast<uint32_t>(uncompressed_data.size()));
  size_t alg_max_output_size =
      compressed_output_size - (alg_output - compressed_output);
  return {alg_output, alg_max_output_size};
}

class BuiltinZlibCompressorV2 : public CompressorWithSimpleDictBase {
 public:
  using CompressorWithSimpleDictBase::CompressorWithSimpleDictBase;
  const char* Name() const override { return "BuiltinZlibCompressorV2"; }
  CompressionType GetPreferredCompressionType() const override {
    return kZlibCompression;
  }
  std::unique_ptr<Compressor> CloneForDict(std::string&& dict_data) override {
    return std::make_unique<BuiltinZlibCompressorV2>(opts_,
        std::move(dict_data));
  }
  Status CompressBlock(Slice uncompressed_data, char* compressed_output,
      size_t* compressed_output_size,
      CompressionType* out_compression_type,
      ManagedWorkingArea*) override {
#ifdef ZLIB
    auto [alg_output, alg_max_output_size] = StartCompressBlockV2(
        uncompressed_data, compressed_output, *compressed_output_size);
    if (alg_max_output_size == 0) {
      // Compression bypassed
      *compressed_output_size = 0;
      *out_compression_type = kNoCompression;
      return Status::OK();
    }
    // The memLevel parameter specifies how much memory should be allocated for
    // the internal compression state.
    // memLevel=1 uses minimum memory but is slow and reduces compression ratio.
    // memLevel=9 uses maximum memory for optimal speed.
    // The default value is 8. See zconf.h for more details.
    static const int memLevel = 8;
    int level = opts_.level;
    if (level == CompressionOptions::kDefaultCompressionLevel) {
      level = Z_DEFAULT_COMPRESSION;
    }
    z_stream stream;
    memset(&stream, 0, sizeof(z_stream));
    // Initialize the zlib stream
    int st = deflateInit2(&stream, level, Z_DEFLATED, opts_.window_bits,
        memLevel, opts_.strategy);
    if (st != Z_OK) {
      *compressed_output_size = 0;
      *out_compression_type = kNoCompression;
      return Status::OK();
    }
    // Set dictionary if available
    if (!dict_data_.empty()) {
      st = deflateSetDictionary(
          &stream, reinterpret_cast<const Bytef*>(dict_data_.data()),
          static_cast<unsigned int>(dict_data_.size()));
      if (st != Z_OK) {
        deflateEnd(&stream);
        *compressed_output_size = 0;
        *out_compression_type = kNoCompression;
        return Status::OK();
      }
    }
    // Set up input
    stream.next_in = (Bytef*)uncompressed_data.data();
    stream.avail_in = static_cast<unsigned int>(uncompressed_data.size());
    // Set up output
    stream.next_out = reinterpret_cast<Bytef*>(alg_output);
    stream.avail_out = static_cast<unsigned int>(alg_max_output_size);
    // Compress
    st = deflate(&stream, Z_FINISH);
    size_t outlen = alg_max_output_size - stream.avail_out;
    deflateEnd(&stream);
    if (st == Z_STREAM_END) {
      // Compression kept/successful
      *compressed_output_size =
          outlen + /*header size*/ (alg_output - compressed_output);
      *out_compression_type = kZlibCompression;
      return Status::OK();
    }
    // Compression failed or rejected
    *compressed_output_size = 1;
#else
    (void)uncompressed_data;
    (void)compressed_output;
    // Compression bypassed (not supported)
    *compressed_output_size = 0;
#endif
    *out_compression_type = kNoCompression;
    return Status::OK();
  }
};

class BuiltinBZip2CompressorV2 : public CompressorWithSimpleDictBase {
 public:
  using CompressorWithSimpleDictBase::CompressorWithSimpleDictBase;
  const char* Name() const override { return "BuiltinBZip2CompressorV2"; }
  CompressionType GetPreferredCompressionType() const override {
    return kBZip2Compression;
  }
  std::unique_ptr<Compressor> CloneForDict(std::string&& dict_data) override {
    return std::make_unique<BuiltinBZip2CompressorV2>(opts_,
        std::move(dict_data));
  }
  Status CompressBlock(Slice uncompressed_data, char* compressed_output,
      size_t* compressed_output_size,
      CompressionType* out_compression_type,
      ManagedWorkingArea*) override {
#ifdef BZIP2
    auto [alg_output, alg_max_output_size] = StartCompressBlockV2(
        uncompressed_data, compressed_output, *compressed_output_size);
    if (alg_max_output_size == 0) {
      // Compression bypassed
      *compressed_output_size = 0;
      *out_compression_type = kNoCompression;
      return Status::OK();
    }
    // BZip2 doesn't actually use the dictionary, but we store it for
    // compatibility similar to BuiltinSnappyCompressorV2
    // Initialize the bzip2 stream
    bz_stream stream;
    memset(&stream, 0, sizeof(bz_stream));
    // Block size 1 is 100K.
    // 0 is for silent.
    // 30 is the default workFactor
    int st = BZ2_bzCompressInit(&stream, 1, 0, 30);
    if (st != BZ_OK) {
      *compressed_output_size = 0;
      *out_compression_type = kNoCompression;
      return Status::OK();
    }
    // Set up input
    stream.next_in = const_cast<char*>(uncompressed_data.data());
    stream.avail_in = static_cast<unsigned int>(uncompressed_data.size());
    // Set up output
    stream.next_out = alg_output;
    stream.avail_out = static_cast<unsigned int>(alg_max_output_size);
    // Compress
    st = BZ2_bzCompress(&stream, BZ_FINISH);
    size_t outlen = alg_max_output_size - stream.avail_out;
    BZ2_bzCompressEnd(&stream);
    // Check for success
    if (st == BZ_STREAM_END) {
      // Compression kept/successful
      *compressed_output_size = outlen + (alg_output - compressed_output);
      *out_compression_type = kBZip2Compression;
      return Status::OK();
    }
    // Compression failed or rejected
    *compressed_output_size = 1;
#else
    (void)uncompressed_data;
    (void)compressed_output;
    // Compression bypassed (not supported)
    *compressed_output_size = 0;
#endif
    *out_compression_type = kNoCompression;
    return Status::OK();
  }
};

class BuiltinLZ4CompressorV2WithDict : public CompressorWithSimpleDictBase {
 public:
  using CompressorWithSimpleDictBase::CompressorWithSimpleDictBase;
  const char* Name() const override { return "BuiltinLZ4CompressorV2"; }
  CompressionType GetPreferredCompressionType() const override {
    return kLZ4Compression;
  }
  std::unique_ptr<Compressor> CloneForDict(std::string&& dict_data) override {
    return std::make_unique<BuiltinLZ4CompressorV2WithDict>(
        opts_, std::move(dict_data));
  }
  ManagedWorkingArea ObtainWorkingArea() override {
#ifdef LZ4
    return {reinterpret_cast<WorkingArea*>(LZ4_createStream()), this};
#else
    return {};
#endif
  }
  void ReleaseWorkingArea(WorkingArea* wa) override {
    if (wa) {
#ifdef LZ4
      LZ4_freeStream(reinterpret_cast<LZ4_stream_t*>(wa));
#endif
    }
  }
  Status CompressBlock(Slice uncompressed_data, char* compressed_output,
      size_t* compressed_output_size,
      CompressionType* out_compression_type,
      ManagedWorkingArea* wa) override {
#ifdef LZ4
    auto [alg_output, alg_max_output_size] = StartCompressBlockV2(
        uncompressed_data, compressed_output, *compressed_output_size);
    if (alg_max_output_size == 0) {
      // Compression bypassed
      *compressed_output_size = 0;
      *out_compression_type = kNoCompression;
      return Status::OK();
    }
    ManagedWorkingArea tmp_wa;
    LZ4_stream_t* stream;
    if (wa != nullptr && wa->owner() == this) {
      stream = reinterpret_cast<LZ4_stream_t*>(wa->get());
#if LZ4_VERSION_NUMBER >= 10900 // >= version 1.9.0
      LZ4_resetStream_fast(stream);
#else
      LZ4_resetStream(stream);
#endif
    } else {
      tmp_wa = ObtainWorkingArea();
      stream = reinterpret_cast<LZ4_stream_t*>(tmp_wa.get());
    }
    if (!dict_data_.empty()) {
      // TODO: more optimization possible here?
      LZ4_loadDict(stream, dict_data_.data(),
          static_cast<int>(dict_data_.size()));
    }
    int acceleration;
    if (opts_.level < 0) {
      acceleration = -opts_.level;
    } else {
      acceleration = 1;
    }
    auto outlen = LZ4_compress_fast_continue(
        stream, uncompressed_data.data(), alg_output,
        static_cast<int>(uncompressed_data.size()),
        static_cast<int>(alg_max_output_size), acceleration);
    if (outlen > 0) {
      // Compression kept/successful
      size_t output_size = static_cast<size_t>(
          outlen + /*header size*/ (alg_output - compressed_output));
      assert(output_size <= *compressed_output_size);
      *compressed_output_size = output_size;
      *out_compression_type = kLZ4Compression;
      return Status::OK();
    }
    // Compression rejected
    *compressed_output_size = 1;
#else
    (void)uncompressed_data;
    (void)compressed_output;
    (void)wa;
    // Compression bypassed (not supported)
    *compressed_output_size = 0;
#endif
    *out_compression_type = kNoCompression;
    return Status::OK();
  }
};
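
// LZ4 compressor variant for use without a dictionary. It does not use an
// LZ4_stream_t working area at all; see the note in ObtainWorkingArea below.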
class BuiltinLZ4CompressorV2NoDict : public BuiltinLZ4CompressorV2WithDict {
 public:
  BuiltinLZ4CompressorV2NoDict(const CompressionOptions& opts)
      : BuiltinLZ4CompressorV2WithDict(opts, /*dict_data=*/{}) {}
  ManagedWorkingArea ObtainWorkingArea() override {
    // Using an LZ4_stream_t between compressions and resetting with
    // LZ4_resetStream_fast is actually slower than using a fresh LZ4_stream_t
    // each time, or not involving a stream at all. Similarly, using an extState
    // does not seem to offer a performance boost, perhaps a small regression.
    return {};
  }
  void ReleaseWorkingArea(WorkingArea* wa) override {
    // Should not be called
    (void)wa;
    assert(wa == nullptr);
  }
  Status CompressBlock(Slice uncompressed_data, char* compressed_output,
      size_t* compressed_output_size,
      CompressionType* out_compression_type,
      ManagedWorkingArea* wa) override {
#ifdef LZ4
    (void)wa;
    auto [alg_output, alg_max_output_size] = StartCompressBlockV2(
        uncompressed_data, compressed_output, *compressed_output_size);
    if (alg_max_output_size == 0) {
      // Compression bypassed
      *compressed_output_size = 0;
      *out_compression_type = kNoCompression;
      return Status::OK();
    }
    int acceleration;
    if (opts_.level < 0) {
      acceleration = -opts_.level;
    } else {
      acceleration = 1;
    }
    auto outlen =
        LZ4_compress_fast(uncompressed_data.data(), alg_output,
            static_cast<int>(uncompressed_data.size()),
            static_cast<int>(alg_max_output_size), acceleration);
    if (outlen > 0) {
      // Compression kept/successful
      size_t output_size = static_cast<size_t>(
          outlen + /*header size*/ (alg_output - compressed_output));
      assert(output_size <= *compressed_output_size);
      *compressed_output_size = output_size;
      *out_compression_type = kLZ4Compression;
      return Status::OK();
    }
    // Compression rejected
    *compressed_output_size = 1;
#else
    (void)uncompressed_data;
    (void)compressed_output;
    (void)wa;
    // Compression bypassed (not supported)
    *compressed_output_size = 0;
#endif
    *out_compression_type = kNoCompression;
    return Status::OK();
  }
};

class BuiltinLZ4HCCompressorV2 : public CompressorWithSimpleDictBase {
 public:
  using CompressorWithSimpleDictBase::CompressorWithSimpleDictBase;
  const char* Name() const override { return "BuiltinLZ4HCCompressorV2"; }
  CompressionType GetPreferredCompressionType() const override {
    return kLZ4HCCompression;
  }
  std::unique_ptr<Compressor> CloneForDict(std::string&& dict_data) override {
    return std::make_unique<BuiltinLZ4HCCompressorV2>(opts_,
        std::move(dict_data));
  }
  ManagedWorkingArea ObtainWorkingArea() override {
#ifdef LZ4
    return {reinterpret_cast<WorkingArea*>(LZ4_createStreamHC()), this};
#else
    return {};
#endif
  }
  void ReleaseWorkingArea(WorkingArea* wa) override {
    if (wa) {
#ifdef LZ4
      LZ4_freeStreamHC(reinterpret_cast<LZ4_streamHC_t*>(wa));
#endif
    }
  }
  Status CompressBlock(Slice uncompressed_data, char* compressed_output,
      size_t* compressed_output_size,
      CompressionType* out_compression_type,
      ManagedWorkingArea* wa) override {
#ifdef LZ4
    auto [alg_output, alg_max_output_size] = StartCompressBlockV2(
        uncompressed_data, compressed_output, *compressed_output_size);
    if (alg_max_output_size == 0) {
      // Compression bypassed
      *compressed_output_size = 0;
      *out_compression_type = kNoCompression;
      return Status::OK();
    }
    int level = opts_.level;
    if (level == CompressionOptions::kDefaultCompressionLevel) {
      level = 0; // lz4hc.h says any value < 1 will be sanitized to default
    }
    ManagedWorkingArea tmp_wa;
    LZ4_streamHC_t* stream;
    if (wa != nullptr && wa->owner() == this) {
      stream = reinterpret_cast<LZ4_streamHC_t*>(wa->get());
    } else {
      tmp_wa = ObtainWorkingArea();
      stream = reinterpret_cast<LZ4_streamHC_t*>(tmp_wa.get());
    }
#if LZ4_VERSION_NUMBER >= 10900 // >= version 1.9.0
    LZ4_resetStreamHC_fast(stream, level);
#else
    LZ4_resetStreamHC(stream, level);
#endif
    if (dict_data_.size() > 0) {
      // TODO: more optimization possible here?
      LZ4_loadDictHC(stream, dict_data_.data(),
          static_cast<int>(dict_data_.size()));
    }
    auto outlen =
        LZ4_compress_HC_continue(stream, uncompressed_data.data(), alg_output,
            static_cast<int>(uncompressed_data.size()),
            static_cast<int>(alg_max_output_size));
    if (outlen > 0) {
      // Compression kept/successful
      size_t output_size = static_cast<size_t>(
          outlen + /*header size*/ (alg_output - compressed_output));
      assert(output_size <= *compressed_output_size);
      *compressed_output_size = output_size;
      *out_compression_type = kLZ4HCCompression;
      return Status::OK();
    }
    // Compression rejected
    *compressed_output_size = 1;
#else
    (void)uncompressed_data;
    (void)compressed_output;
    (void)wa;
    // Compression bypassed (not supported)
    *compressed_output_size = 0;
#endif
    *out_compression_type = kNoCompression;
    return Status::OK();
  }
};

class BuiltinXpressCompressorV2 : public CompressorWithSimpleDictBase {
 public:
  using CompressorWithSimpleDictBase::CompressorWithSimpleDictBase;
  const char* Name() const override { return "BuiltinXpressCompressorV2"; }
  CompressionType GetPreferredCompressionType() const override {
    return kXpressCompression;
  }
  std::unique_ptr<Compressor> CloneForDict(std::string&& dict_data) override {
    return std::make_unique<BuiltinXpressCompressorV2>(opts_,
        std::move(dict_data));
  }
  Status CompressBlock(Slice uncompressed_data, char* compressed_output,
      size_t* compressed_output_size,
      CompressionType* out_compression_type,
      ManagedWorkingArea*) override {
#ifdef XPRESS
    // XPRESS doesn't actually use the dictionary, but we store it for
    // compatibility similar to BuiltinSnappyCompressorV2
    // Use the new CompressWithMaxSize function that writes directly to the
    // output buffer
    size_t compressed_size = port::xpress::CompressWithMaxSize(
        uncompressed_data.data(), uncompressed_data.size(), compressed_output,
        *compressed_output_size);
    if (compressed_size > 0) {
      // Compression kept/successful
      *compressed_output_size = compressed_size;
      *out_compression_type = kXpressCompression;
      return Status::OK();
    }
    // Compression rejected or failed
    *compressed_output_size = 1;
#else
    (void)uncompressed_data;
    (void)compressed_output;
    // Compression bypassed (not supported)
    *compressed_output_size = 0;
#endif
    *out_compression_type = kNoCompression;
    return Status::OK();
  }
};
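
// ZSTD compressor for the compress_format_version=2 block format. Unlike the
// simple-dict compressors above, it holds a CompressionDict so it can carry a
// trained (and possibly digested) ZSTD dictionary; see MaybeCloneSpecialized.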
class BuiltinZSTDCompressorV2 : public CompressorBase {
 public:
  explicit BuiltinZSTDCompressorV2(const CompressionOptions& opts,
      CompressionDict&& dict = {})
      : CompressorBase(opts), dict_(std::move(dict)) {}
  const char* Name() const override { return "BuiltinZSTDCompressorV2"; }
  CompressionType GetPreferredCompressionType() const override { return kZSTD; }
  size_t GetMaxSampleSizeIfWantDict(
      CacheEntryRole /*block_type*/) const override {
    if (opts_.max_dict_bytes == 0) {
      // Dictionary compression disabled
      return 0;
    } else {
      return opts_.zstd_max_train_bytes > 0 ? opts_.zstd_max_train_bytes
                                            : opts_.max_dict_bytes;
    }
  }
  // NOTE: empty dict is equivalent to no dict
  Slice GetSerializedDict() const override { return dict_.GetRawDict(); }
  ManagedWorkingArea ObtainWorkingArea() override {
#ifdef ZSTD
    ZSTD_CCtx* ctx =
#ifdef ROCKSDB_ZSTD_CUSTOM_MEM
        ZSTD_createCCtx_advanced(port::GetJeZstdAllocationOverrides());
#else // ROCKSDB_ZSTD_CUSTOM_MEM
        ZSTD_createCCtx();
#endif // ROCKSDB_ZSTD_CUSTOM_MEM
    auto level = opts_.level;
    if (level == CompressionOptions::kDefaultCompressionLevel) {
      // NB: ZSTD_CLEVEL_DEFAULT is historically == 3
      level = ZSTD_CLEVEL_DEFAULT;
    }
    size_t err = ZSTD_CCtx_setParameter(ctx, ZSTD_c_compressionLevel, level);
    if (ZSTD_isError(err)) {
      assert(false);
      ZSTD_freeCCtx(ctx);
      ctx = ZSTD_createCCtx();
    }
    if (opts_.checksum) {
      err = ZSTD_CCtx_setParameter(ctx, ZSTD_c_checksumFlag, 1);
      if (ZSTD_isError(err)) {
        assert(false);
        ZSTD_freeCCtx(ctx);
        ctx = ZSTD_createCCtx();
      }
    }
    return ManagedWorkingArea(reinterpret_cast<WorkingArea*>(ctx), this);
#else
    return {};
#endif // ZSTD
  }
  void ReleaseWorkingArea(WorkingArea* wa) override {
    if (wa) {
#ifdef ZSTD
      ZSTD_freeCCtx(reinterpret_cast<ZSTD_CCtx*>(wa));
#endif // ZSTD
    }
  }
  Status CompressBlock(Slice uncompressed_data, char* compressed_output,
      size_t* compressed_output_size,
      CompressionType* out_compression_type,
      ManagedWorkingArea* wa) override {
#ifdef ZSTD
    auto [alg_output, alg_max_output_size] = StartCompressBlockV2(
        uncompressed_data, compressed_output, *compressed_output_size);
    if (alg_max_output_size == 0) {
      // Compression bypassed
      *compressed_output_size = 0;
      *out_compression_type = kNoCompression;
      return Status::OK();
    }
    ManagedWorkingArea tmp_wa;
    if (wa == nullptr || wa->owner() != this) {
      tmp_wa = ObtainWorkingArea();
      wa = &tmp_wa;
    }
    assert(wa->get() != nullptr);
    ZSTD_CCtx* ctx = reinterpret_cast<ZSTD_CCtx*>(wa->get());
    if (dict_.GetDigestedZstdCDict() != nullptr) {
      ZSTD_CCtx_refCDict(ctx, dict_.GetDigestedZstdCDict());
    } else {
      ZSTD_CCtx_loadDictionary(ctx, dict_.GetRawDict().data(),
          dict_.GetRawDict().size());
    }
    // Compression level is set in `ctx` during ObtainWorkingArea()
    size_t outlen =
        ZSTD_compress2(ctx, alg_output, alg_max_output_size,
            uncompressed_data.data(), uncompressed_data.size());
    if (!ZSTD_isError(outlen)) {
      // Compression kept/successful
      size_t output_size = static_cast<size_t>(
          outlen + /*header size*/ (alg_output - compressed_output));
      assert(output_size <= *compressed_output_size);
      *compressed_output_size = output_size;
      *out_compression_type = kZSTD;
      return Status::OK();
    }
    if (ZSTD_getErrorCode(outlen) != ZSTD_error_dstSize_tooSmall) {
      return Status::Corruption(std::string("ZSTD_compress2 failed: ") +
          ZSTD_getErrorName(outlen));
    }
    // Compression rejected
    *compressed_output_size = 1;
#else
    (void)uncompressed_data;
    (void)compressed_output;
    (void)wa;
    // Compression bypassed (not supported)
    *compressed_output_size = 0;
#endif
    *out_compression_type = kNoCompression;
    return Status::OK();
  }
  std::unique_ptr<Compressor> MaybeCloneSpecialized(
      CacheEntryRole /*block_type*/, DictSampleArgs&& dict_samples) override {
    assert(dict_samples.Verify());
    if (dict_samples.empty()) {
      // Nothing to specialize on
      return nullptr;
    }
    std::string dict_data;
    // Migrated from BlockBasedTableBuilder::EnterUnbuffered()
    if (opts_.zstd_max_train_bytes > 0) {
      assert(dict_samples.sample_data.size() <= opts_.zstd_max_train_bytes);
      if (opts_.use_zstd_dict_trainer) {
        dict_data = ZSTD_TrainDictionary(dict_samples.sample_data,
            dict_samples.sample_lens,
            opts_.max_dict_bytes);
      } else {
        dict_data = ZSTD_FinalizeDictionary(dict_samples.sample_data,
            dict_samples.sample_lens,
            opts_.max_dict_bytes, opts_.level);
      }
    } else {
      assert(dict_samples.sample_data.size() <= opts_.max_dict_bytes);
      // ZSTD "raw content dictionary" - "Any buffer is a valid raw content
      // dictionary." Or similar for other compressions.
      dict_data = std::move(dict_samples.sample_data);
    }
    CompressionDict dict{std::move(dict_data), kZSTD, opts_.level};
    return std::make_unique<BuiltinZSTDCompressorV2>(opts_, std::move(dict));
  }
  std::shared_ptr<Decompressor> GetOptimizedDecompressor() const override;

 protected:
  const CompressionDict dict_;
};

// NOTE: this implementation is intentionally SIMPLE based on existing code
// and NOT EFFICIENT because this is an old/deprecated format.
class BuiltinDecompressorV1 : public Decompressor {
 public:
  const char* Name() const override { return "BuiltinDecompressorV1"; }
  Status ExtractUncompressedSize(Args& args) override {
    CacheAllocationPtr throw_away_output;
    return DoUncompress(args, &throw_away_output, &args.uncompressed_size);
  }
  Status DecompressBlock(const Args& args, char* uncompressed_output) override {
    uint64_t same_uncompressed_size = 0;
    CacheAllocationPtr output;
    Status s = DoUncompress(args, &output, &same_uncompressed_size);
    if (same_uncompressed_size != args.uncompressed_size) {
      s = Status::Corruption("Compressed block size mismatch");
    }
    if (s.ok()) {
      // NOTE: simple but inefficient
      memcpy(uncompressed_output, output.get(), args.uncompressed_size);
    }
    return s;
  }

 protected:
  Status DoUncompress(const Args& args, CacheAllocationPtr* out_data,
      uint64_t* out_uncompressed_size) {
    assert(args.working_area == nullptr);
    assert(*out_uncompressed_size == 0);
    // NOTE: simple but inefficient
    UncompressionContext dummy_ctx{args.compression_type};
    UncompressionInfo info{dummy_ctx, UncompressionDict::GetEmptyDict(),
        args.compression_type};
    const char* error_message = nullptr;
    size_t size_t_uncompressed_size = 0;
    *out_data = OLD_UncompressData(
        info, args.compressed_data.data(), args.compressed_data.size(),
        &size_t_uncompressed_size, 1 /*compress_format_version*/,
        nullptr /*allocator*/, &error_message);
    if (*out_data == nullptr) {
      if (error_message != nullptr) {
        return Status::Corruption(error_message);
      } else {
        return Status::Corruption("Corrupted compressed block contents");
      }
    }
    *out_uncompressed_size = size_t_uncompressed_size;
    assert(*out_uncompressed_size > 0);
    return Status::OK();
  }
};

class BuiltinCompressionManagerV1 : public CompressionManager {
 public:
  BuiltinCompressionManagerV1() = default;
  ~BuiltinCompressionManagerV1() override = default;
  const char* Name() const override { return "BuiltinCompressionManagerV1"; }
  const char* CompatibilityName() const override { return "BuiltinV1"; }
  std::unique_ptr<Compressor> GetCompressor(const CompressionOptions& opts,
      CompressionType type) override {
    // At the time of deprecating the writing of new format_version=1 files,
    // ZSTD was the last supported built-in compression type.
    if (type > kZSTD) {
      // Unrecognized; fall back on default compression
      type = ColumnFamilyOptions{}.compression;
    }
    if (type == kNoCompression) {
      return nullptr;
    } else {
      return std::make_unique<BuiltinCompressorV1>(opts, type);
    }
  }
  std::shared_ptr<Decompressor> GetDecompressor() override {
    return std::shared_ptr<Decompressor>(shared_from_this(), &decompressor_);
  }
  bool SupportsCompressionType(CompressionType type) const override {
    return CompressionTypeSupported(type);
  }

 protected:
  BuiltinDecompressorV1 decompressor_;
};

// Subroutines for BuiltinDecompressorV2
Status Snappy_DecompressBlock(const Decompressor::Args& args,
    char* uncompressed_output) {
#ifdef SNAPPY
  if (!snappy::RawUncompress(args.compressed_data.data(),
      args.compressed_data.size(),
      uncompressed_output)) {
    return Status::Corruption("Error decompressing snappy data");
  }
  return Status::OK();
#else
  (void)args;
  (void)uncompressed_output;
  return Status::NotSupported("Snappy not supported in this build");
#endif
}

Status Zlib_DecompressBlock(const Decompressor::Args& args, Slice dict,
    char* uncompressed_output) {
#ifdef ZLIB
  // NOTE: uses "raw" format
  constexpr int kWindowBits = -14;
  z_stream _stream;
  memset(&_stream, 0, sizeof(z_stream));
  // For raw inflate, the windowBits should be -8..-15.
  // If windowBits is bigger than zero, it will use either zlib
  // header or gzip header. Adding 32 to it will do automatic detection.
  int st = inflateInit2(&_stream, kWindowBits);
  if (UNLIKELY(st != Z_OK)) {
    return Status::Corruption("Failed to initialize zlib inflate: " +
        std::to_string(st));
  }
  if (!dict.empty()) {
    // Initialize the compression library's dictionary
    st = inflateSetDictionary(&_stream,
        reinterpret_cast<const Bytef*>(dict.data()),
        static_cast<unsigned int>(dict.size()));
    if (UNLIKELY(st != Z_OK)) {
      return Status::Corruption("Failed to initialize zlib dictionary: " +
          std::to_string(st));
    }
  }
  _stream.next_in = const_cast<Bytef*>(
      reinterpret_cast<const Bytef*>(args.compressed_data.data()));
  _stream.avail_in = static_cast<unsigned int>(args.compressed_data.size());
  _stream.next_out = reinterpret_cast<Bytef*>(uncompressed_output);
  _stream.avail_out = static_cast<unsigned int>(args.uncompressed_size);
  st = inflate(&_stream, Z_SYNC_FLUSH);
  if (UNLIKELY(st != Z_STREAM_END)) {
    inflateEnd(&_stream);
    // NOTE: Z_OK is still corruption because it means we got the size wrong
    return Status::Corruption("Failed zlib inflate: " + std::to_string(st));
  }
  // We should have no bytes left
  if (_stream.avail_out != 0) {
    inflateEnd(&_stream);
    return Status::Corruption("Size mismatch decompressing zlib data");
  }
  inflateEnd(&_stream);
  return Status::OK();
#else
  (void)args;
  (void)dict;
  (void)uncompressed_output;
  return Status::NotSupported("Zlib not supported in this build");
#endif
}

Status BZip2_DecompressBlock(const Decompressor::Args& args,
    char* uncompressed_output) {
#ifdef BZIP2
  auto uncompressed_size = static_cast<unsigned int>(args.uncompressed_size);
  if (BZ_OK != BZ2_bzBuffToBuffDecompress(
      uncompressed_output, &uncompressed_size,
      const_cast<char*>(args.compressed_data.data()),
      static_cast<unsigned int>(args.compressed_data.size()),
      0 /*small mem*/, 0 /*verbosity*/)) {
    return Status::Corruption("Error decompressing bzip2 data");
  }
  if (uncompressed_size != args.uncompressed_size) {
    return Status::Corruption("Size mismatch decompressing bzip2 data");
  }
  return Status::OK();
#else
  (void)args;
  (void)uncompressed_output;
  return Status::NotSupported("BZip2 not supported in this build");
#endif
}

Status LZ4_DecompressBlock(const Decompressor::Args& args, Slice dict,
    char* uncompressed_output) {
#ifdef LZ4
  int expected_uncompressed_size = static_cast<int>(args.uncompressed_size);
  LZ4_streamDecode_t* stream = LZ4_createStreamDecode();
  if (!dict.empty()) {
    LZ4_setStreamDecode(stream, dict.data(), static_cast<int>(dict.size()));
  }
  int uncompressed_size = LZ4_decompress_safe_continue(
      stream, args.compressed_data.data(), uncompressed_output,
      static_cast<int>(args.compressed_data.size()),
      expected_uncompressed_size);
  LZ4_freeStreamDecode(stream);
  if (uncompressed_size != expected_uncompressed_size) {
    if (uncompressed_size < 0) {
      return Status::Corruption("Error decompressing LZ4 data");
    } else {
      return Status::Corruption("Size mismatch decompressing LZ4 data");
    }
  }
  return Status::OK();
#else
  (void)args;
  (void)dict;
  (void)uncompressed_output;
  return Status::NotSupported("LZ4 not supported in this build");
#endif
}

Status XPRESS_DecompressBlock(const Decompressor::Args& args,
    char* uncompressed_output) {
#ifdef XPRESS
  int64_t actual_uncompressed_size = port::xpress::DecompressToBuffer(
      args.compressed_data.data(), args.compressed_data.size(),
      uncompressed_output, args.uncompressed_size);
  if (actual_uncompressed_size !=
      static_cast<int64_t>(args.uncompressed_size)) {
    if (actual_uncompressed_size < 0) {
      return Status::Corruption("Error decompressing XPRESS data");
    } else {
      return Status::Corruption("Size mismatch decompressing XPRESS data");
    }
  }
  return Status::OK();
#else
  (void)args;
  (void)uncompressed_output;
  return Status::NotSupported("XPRESS not supported in this build");
#endif
}
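
// ZSTD block decompression with an explicit ZSTD context. When
// kIsDigestedDict is true, `dict` is a pre-digested ZSTD_DDict* (requires
// ROCKSDB_ZSTD_DDICT); otherwise `dict` is a Slice of raw dictionary bytes
// (empty means no dictionary).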
template <bool kIsDigestedDict = false>
Status ZSTD_DecompressBlockWithContext(
    const Decompressor::Args& args,
    std::conditional_t<kIsDigestedDict, void*, Slice> dict,
    ZSTDUncompressCachedData::ZSTDNativeContext zstd_context,
    char* uncompressed_output) {
#ifdef ZSTD
  size_t uncompressed_size;
  assert(zstd_context != nullptr);
  if constexpr (kIsDigestedDict) {
#ifdef ROCKSDB_ZSTD_DDICT
    uncompressed_size = ZSTD_decompress_usingDDict(
        zstd_context, uncompressed_output, args.uncompressed_size,
        args.compressed_data.data(), args.compressed_data.size(),
        static_cast<ZSTD_DDict*>(dict));
#else
    static_assert(!kIsDigestedDict,
        "Inconsistent expectation of ZSTD digested dict support");
#endif // ROCKSDB_ZSTD_DDICT
  } else if (dict.empty()) {
    uncompressed_size = ZSTD_decompressDCtx(
        zstd_context, uncompressed_output, args.uncompressed_size,
        args.compressed_data.data(), args.compressed_data.size());
  } else {
    uncompressed_size = ZSTD_decompress_usingDict(
        zstd_context, uncompressed_output, args.uncompressed_size,
        args.compressed_data.data(), args.compressed_data.size(), dict.data(),
        dict.size());
  }
  if (ZSTD_isError(uncompressed_size)) {
    return Status::Corruption(std::string("ZSTD ") +
        ZSTD_getErrorName(uncompressed_size));
  } else if (uncompressed_size != args.uncompressed_size) {
    return Status::Corruption("ZSTD decompression size mismatch");
  } else {
    return Status::OK();
  }
#else
  (void)args;
  (void)dict;
  (void)zstd_context;
  (void)uncompressed_output;
  return Status::NotSupported("ZSTD not supported in this build");
#endif
}

template <bool kIsDigestedDict = false>
Status ZSTD_DecompressBlock(
    const Decompressor::Args& args,
    std::conditional_t<kIsDigestedDict, void*, Slice> dict,
    const Decompressor* decompressor, char* uncompressed_output) {
  if (args.working_area && args.working_area->owner() == decompressor) {
    auto ctx = static_cast<UncompressionContext*>(args.working_area->get());
    assert(ctx != nullptr);
    if (ctx->GetZSTDContext() != nullptr) {
      return ZSTD_DecompressBlockWithContext<kIsDigestedDict>(
          args, dict, ctx->GetZSTDContext(), uncompressed_output);
    }
  }
  UncompressionContext tmp_ctx{kZSTD};
  return ZSTD_DecompressBlockWithContext<kIsDigestedDict>(
      args, dict, tmp_ctx.GetZSTDContext(), uncompressed_output);
}
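
// Decompressor for the compress_format_version=2 block format with no
// dictionary configured. Snappy and XPRESS carry their own uncompressed-size
// encoding; all other built-in types use the varint size prefix handled by
// Decompressor::ExtractUncompressedSize().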
class BuiltinDecompressorV2 : public Decompressor {
 public:
  const char* Name() const override { return "BuiltinDecompressorV2"; }
  Status ExtractUncompressedSize(Args& args) override {
    assert(args.compression_type != kNoCompression);
    if (args.compression_type == kSnappyCompression) {
      // 1st exception to encoding of uncompressed size
#ifdef SNAPPY
      size_t uncompressed_length = 0;
      if (!snappy::GetUncompressedLength(args.compressed_data.data(),
          args.compressed_data.size(),
          &uncompressed_length)) {
        return Status::Corruption("Error reading snappy compressed length");
      }
      args.uncompressed_size = uncompressed_length;
      return Status::OK();
#else
      return Status::NotSupported("Snappy not supported in this build");
#endif
    } else if (args.compression_type == kXpressCompression) {
      // 2nd exception to encoding of uncompressed size
#ifdef XPRESS
      int64_t result = port::xpress::GetDecompressedSize(
          args.compressed_data.data(), args.compressed_data.size());
      if (result < 0) {
        return Status::Corruption("Error reading XPRESS compressed length");
      }
      args.uncompressed_size = static_cast<size_t>(result);
      return Status::OK();
#else
      return Status::NotSupported("XPRESS not supported in this build");
#endif
    } else {
      // Extract encoded uncompressed size
      return Decompressor::ExtractUncompressedSize(args);
    }
  }
  Status DecompressBlock(const Args& args, char* uncompressed_output) override {
    switch (args.compression_type) {
      case kSnappyCompression:
        return Snappy_DecompressBlock(args, uncompressed_output);
      case kZlibCompression:
        return Zlib_DecompressBlock(args, /*dict=*/Slice{},
            uncompressed_output);
      case kBZip2Compression:
        return BZip2_DecompressBlock(args, uncompressed_output);
      case kLZ4Compression:
      case kLZ4HCCompression:
        return LZ4_DecompressBlock(args, /*dict=*/Slice{}, uncompressed_output);
      case kXpressCompression:
        return XPRESS_DecompressBlock(args, uncompressed_output);
      case kZSTD:
        return ZSTD_DecompressBlock(args, /*dict=*/Slice{}, this,
            uncompressed_output);
      default:
        return Status::NotSupported(
            "Compression type not supported or not built-in: " +
            CompressionTypeToString(args.compression_type));
    }
  }
  Status MaybeCloneForDict(const Slice&,
      std::unique_ptr<Decompressor>*) override;
  size_t ApproximateOwnedMemoryUsage() const override {
    return sizeof(BuiltinDecompressorV2);
  }
};

class BuiltinDecompressorV2SnappyOnly : public BuiltinDecompressorV2 {
 public:
  const char* Name() const override {
    return "BuiltinDecompressorV2SnappyOnly";
  }
  Status ExtractUncompressedSize(Args& args) override {
    assert(args.compression_type == kSnappyCompression);
#ifdef SNAPPY
    size_t uncompressed_length = 0;
    if (!snappy::GetUncompressedLength(args.compressed_data.data(),
        args.compressed_data.size(),
        &uncompressed_length)) {
      return Status::Corruption("Error reading snappy compressed length");
    }
    args.uncompressed_size = uncompressed_length;
    return Status::OK();
#else
    return Status::NotSupported("Snappy not supported in this build");
#endif
  }
  Status DecompressBlock(const Args& args, char* uncompressed_output) override {
    assert(args.compression_type == kSnappyCompression);
    return Snappy_DecompressBlock(args, uncompressed_output);
  }
};

class BuiltinDecompressorV2WithDict : public BuiltinDecompressorV2 {
 public:
  explicit BuiltinDecompressorV2WithDict(const Slice& dict) : dict_(dict) {}
  const char* Name() const override { return "BuiltinDecompressorV2WithDict"; }
  Status DecompressBlock(const Args& args, char* uncompressed_output) override {
    switch (args.compression_type) {
      case kSnappyCompression:
        // NOTE: quietly ignores the dictionary (for compatibility)
        return Snappy_DecompressBlock(args, uncompressed_output);
      case kZlibCompression:
        return Zlib_DecompressBlock(args, dict_, uncompressed_output);
      case kBZip2Compression:
        // NOTE: quietly ignores the dictionary (for compatibility)
        return BZip2_DecompressBlock(args, uncompressed_output);
      case kLZ4Compression:
      case kLZ4HCCompression:
        return LZ4_DecompressBlock(args, dict_, uncompressed_output);
      case kXpressCompression:
        // NOTE: quietly ignores the dictionary (for compatibility)
        return XPRESS_DecompressBlock(args, uncompressed_output);
      case kZSTD:
        return ZSTD_DecompressBlock(args, dict_, this, uncompressed_output);
      default:
        return Status::NotSupported(
            "Compression type not supported or not built-in: " +
            CompressionTypeToString(args.compression_type));
    }
  }
  const Slice& GetSerializedDict() const override { return dict_; }
  size_t ApproximateOwnedMemoryUsage() const override {
    return sizeof(BuiltinDecompressorV2WithDict);
  }

 protected:
  const Slice dict_;
};
Status BuiltinDecompressorV2::MaybeCloneForDict(
    const Slice& dict, std::unique_ptr<Decompressor>* out) {
  // Check RocksDB-promised precondition
  assert(dict.size() > 0);
  // Because of unfortunate decisions in handling built-in compression types,
  // all the compression types before ZSTD that do not actually support
  // dictionary compression pretend to support it. Specifically, we have to be
  // able to read files with a compression dictionary block using those
  // compression types even though the compression dictionary is ignored by
  // the compression algorithm. And the Decompressor has to return the
  // configured dictionary from GetSerializedDict() even if it is ignored. This
  // unfortunately means that a new schema version (BuiltinV3?) would be needed
  // to actually support dictionary compression in the future for these
  // algorithms (if the libraries add support).
  // TODO: can we make this a better/cleaner experience?
  *out = std::make_unique<BuiltinDecompressorV2WithDict>(dict);
  return Status::OK();
}
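
// A variant of BuiltinDecompressorV2 optimized for ZSTD: it hands out a ZSTD
// UncompressionContext as an explicit working area and takes a fast path for
// kZSTD blocks, falling back to the general dispatch for other types.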
class BuiltinDecompressorV2OptimizeZstd : public BuiltinDecompressorV2 {
 public:
  const char* Name() const override {
    return "BuiltinDecompressorV2OptimizeZstd";
  }

  ManagedWorkingArea ObtainWorkingArea(CompressionType preferred) override {
    if (preferred == kZSTD) {
      // TODO: evaluate whether it makes sense to use core local cache here.
      // (Perhaps not, because explicit WorkingArea could be long-running.)
      return ManagedWorkingArea(new UncompressionContext(kZSTD), this);
    } else {
      return {};
    }
  }

  void ReleaseWorkingArea(WorkingArea* wa) override {
    delete static_cast<UncompressionContext*>(wa);
  }

  Status DecompressBlock(const Args& args, char* uncompressed_output) override {
    if (LIKELY(args.compression_type == kZSTD)) {
      return ZSTD_DecompressBlock(args, /*dict=*/Slice{}, this,
                                  uncompressed_output);
    } else {
      return BuiltinDecompressorV2::DecompressBlock(args, uncompressed_output);
    }
  }

  Status MaybeCloneForDict(const Slice& /*serialized_dict*/,
                           std::unique_ptr<Decompressor>* /*out*/) override;
};
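
// Combines the ZSTD-optimized path with a configured dictionary. When built
// with ROCKSDB_ZSTD_DDICT, the dictionary is pre-digested into a ZSTD_DDict
// (referencing, not copying, the dictionary bytes) so repeated block
// decompressions do not have to re-process the raw dictionary.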
class BuiltinDecompressorV2OptimizeZstdWithDict
    : public BuiltinDecompressorV2OptimizeZstd {
 public:
  explicit BuiltinDecompressorV2OptimizeZstdWithDict(const Slice& dict)
      :
#ifdef ROCKSDB_ZSTD_DDICT
        dict_(dict),
        ddict_(ZSTD_createDDict_byReference(dict.data(), dict.size())) {
    assert(ddict_ != nullptr);
  }
#else
        dict_(dict) {
  }
#endif  // ROCKSDB_ZSTD_DDICT

  const char* Name() const override {
    return "BuiltinDecompressorV2OptimizeZstdWithDict";
  }

  ~BuiltinDecompressorV2OptimizeZstdWithDict() override {
#ifdef ROCKSDB_ZSTD_DDICT
    size_t res = ZSTD_freeDDict(ddict_);
    assert(res == 0);  // Last I checked they can't fail
    (void)res;         // prevent unused var warning
#endif  // ROCKSDB_ZSTD_DDICT
  }

  const Slice& GetSerializedDict() const override { return dict_; }

  size_t ApproximateOwnedMemoryUsage() const override {
    size_t sz = sizeof(BuiltinDecompressorV2OptimizeZstdWithDict);
#ifdef ROCKSDB_ZSTD_DDICT
    sz += ZSTD_sizeof_DDict(ddict_);
#endif  // ROCKSDB_ZSTD_DDICT
    return sz;
  }

  Status DecompressBlock(const Args& args, char* uncompressed_output) override {
    if (LIKELY(args.compression_type == kZSTD)) {
#ifdef ROCKSDB_ZSTD_DDICT
      return ZSTD_DecompressBlock</*kIsDigestedDict=*/true>(
          args, ddict_, this, uncompressed_output);
#else
      return ZSTD_DecompressBlock(args, dict_, this, uncompressed_output);
#endif  // ROCKSDB_ZSTD_DDICT
    } else {
      return BuiltinDecompressorV2WithDict(dict_).DecompressBlock(
          args, uncompressed_output);
    }
  }

 protected:
  const Slice dict_;
#ifdef ROCKSDB_ZSTD_DDICT
  ZSTD_DDict* const ddict_;
#endif  // ROCKSDB_ZSTD_DDICT
};
Status BuiltinDecompressorV2OptimizeZstd::MaybeCloneForDict(
    const Slice& serialized_dict, std::unique_ptr<Decompressor>* out) {
  *out = std::make_unique<BuiltinDecompressorV2OptimizeZstdWithDict>(
      serialized_dict);
  return Status::OK();
}
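
// CompressionManager for compression_format_version 2 ("BuiltinV2"). It
// creates per-type Compressor instances on demand and exposes its embedded
// Decompressor members through aliasing shared_ptrs that keep the manager
// itself alive (see GetGeneralDecompressor() etc. below).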
class BuiltinCompressionManagerV2 : public CompressionManager {
 public:
  BuiltinCompressionManagerV2() = default;
  ~BuiltinCompressionManagerV2() override = default;

  const char* Name() const override { return "BuiltinCompressionManagerV2"; }
  const char* CompatibilityName() const override { return "BuiltinV2"; }

  std::unique_ptr<Compressor> GetCompressor(const CompressionOptions& opts,
                                            CompressionType type) override {
    if (opts.max_compressed_bytes_per_kb <= 0) {
      // No acceptable compression ratio => no compression
      return nullptr;
    }
    if (!SupportsCompressionType(type)) {
      // Unrecognized or support not compiled in. Fall back on default
      type = ColumnFamilyOptions{}.compression;
    }
    switch (type) {
      case kNoCompression:
      default:
        assert(type == kNoCompression);  // Others should be excluded above
        return nullptr;
      case kSnappyCompression:
        return std::make_unique<BuiltinSnappyCompressorV2>(opts);
      case kZlibCompression:
        return std::make_unique<BuiltinZlibCompressorV2>(opts);
      case kBZip2Compression:
        return std::make_unique<BuiltinBZip2CompressorV2>(opts);
      case kLZ4Compression:
        return std::make_unique<BuiltinLZ4CompressorV2NoDict>(opts);
      case kLZ4HCCompression:
        return std::make_unique<BuiltinLZ4HCCompressorV2>(opts);
      case kXpressCompression:
        return std::make_unique<BuiltinXpressCompressorV2>(opts);
      case kZSTD:
        return std::make_unique<BuiltinZSTDCompressorV2>(opts);
    }
  }

  std::shared_ptr<Decompressor> GetDecompressor() override {
    return GetGeneralDecompressor();
  }

  std::shared_ptr<Decompressor> GetDecompressorOptimizeFor(
      CompressionType optimize_for_type) override {
    if (optimize_for_type == kZSTD) {
      return GetZstdDecompressor();
    } else {
      return GetGeneralDecompressor();
    }
  }

  std::shared_ptr<Decompressor> GetDecompressorForTypes(
      const CompressionType* types_begin,
      const CompressionType* types_end) override {
    if (types_begin == types_end) {
      return nullptr;
    } else if (types_begin + 1 == types_end &&
               *types_begin == kSnappyCompression) {
      return GetSnappyDecompressor();
    } else if (std::find(types_begin, types_end, kZSTD) != types_end) {
      return GetZstdDecompressor();
    } else {
      return GetGeneralDecompressor();
    }
  }

  bool SupportsCompressionType(CompressionType type) const override {
    return CompressionTypeSupported(type);
  }

 protected:
  BuiltinDecompressorV2 decompressor_;
  BuiltinDecompressorV2OptimizeZstd zstd_decompressor_;
  BuiltinDecompressorV2SnappyOnly snappy_decompressor_;

 public:
  inline std::shared_ptr<Decompressor> GetGeneralDecompressor() {
    return std::shared_ptr<Decompressor>(shared_from_this(), &decompressor_);
  }
  inline std::shared_ptr<Decompressor> GetZstdDecompressor() {
    return std::shared_ptr<Decompressor>(shared_from_this(),
                                         &zstd_decompressor_);
  }
  inline std::shared_ptr<Decompressor> GetSnappyDecompressor() {
    return std::shared_ptr<Decompressor>(shared_from_this(),
                                         &snappy_decompressor_);
  }
};
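
// Shared instances of the built-in managers, handed out through
// GetBuiltinCompressionManager() and the GetOptimizedDecompressor()
// implementations below.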
const std::shared_ptr<BuiltinCompressionManagerV1>
    kBuiltinCompressionManagerV1 =
        std::make_shared<BuiltinCompressionManagerV1>();
const std::shared_ptr<BuiltinCompressionManagerV2>
    kBuiltinCompressionManagerV2 =
        std::make_shared<BuiltinCompressionManagerV2>();

std::shared_ptr<Decompressor>
BuiltinZSTDCompressorV2::GetOptimizedDecompressor() const {
  return kBuiltinCompressionManagerV2->GetZstdDecompressor();
}

std::shared_ptr<Decompressor>
BuiltinSnappyCompressorV2::GetOptimizedDecompressor() const {
  return kBuiltinCompressionManagerV2->GetSnappyDecompressor();
}

}  // namespace
Status CompressionManager::CreateFromString(
    const ConfigOptions& config_options, const std::string& value,
    std::shared_ptr<CompressionManager>* result) {
  if (value == kNullptrString || value.empty()) {
    result->reset();
    return Status::OK();
  }
  static std::once_flag loaded;
  std::call_once(loaded, [&]() {
    auto& library = *ObjectLibrary::Default();
    // TODO: try to enhance ObjectLibrary to support singletons
    library.AddFactory<CompressionManager>(
        kBuiltinCompressionManagerV1->CompatibilityName(),
        [](const std::string& /*uri*/,
           std::unique_ptr<CompressionManager>* guard,
           std::string* /*errmsg*/) {
          *guard = std::make_unique<BuiltinCompressionManagerV1>();
          return guard->get();
        });
    library.AddFactory<CompressionManager>(
        kBuiltinCompressionManagerV2->CompatibilityName(),
        [](const std::string& /*uri*/,
           std::unique_ptr<CompressionManager>* guard,
           std::string* /*errmsg*/) {
          *guard = std::make_unique<BuiltinCompressionManagerV2>();
          return guard->get();
        });
  });
  std::string id;
  std::unordered_map<std::string, std::string> opt_map;
  Status status = Customizable::GetOptionsMap(config_options, result->get(),
                                              value, &id, &opt_map);
  if (!status.ok()) {  // GetOptionsMap failed
    return status;
  } else if (id.empty()) {  // We have no Id but have options. Not good
    return Status::NotSupported("Cannot reset object ", id);
  } else {
    status = config_options.registry->NewSharedObject(id, result);
  }
  if (config_options.ignore_unsupported_options && status.IsNotSupported()) {
    return Status::OK();
  } else if (status.ok()) {
    status = Customizable::ConfigureNewObject(config_options, result->get(),
                                              opt_map);
  }
  return status;
}
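
// Example (hypothetical caller, not part of this file): resolving a manager
// by its compatibility name through the Customizable machinery registered
// above:
//
//   std::shared_ptr<CompressionManager> mgr;
//   Status s = CompressionManager::CreateFromString(ConfigOptions(),
//                                                   "BuiltinV2", &mgr);
//   // On success, mgr->CompatibilityName() == "BuiltinV2".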
std::shared_ptr<CompressionManager>
CompressionManager::FindCompatibleCompressionManager(Slice compatibility_name) {
  if (compatibility_name.compare(CompatibilityName()) == 0) {
    return shared_from_this();
  } else {
    std::shared_ptr<CompressionManager> out;
    Status s =
        CreateFromString(ConfigOptions(), compatibility_name.ToString(), &out);
    if (s.ok()) {
      return out;
    } else {
      return nullptr;
    }
  }
}
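
// Maps the legacy compression_format_version option (1 or 2) to the
// corresponding built-in manager; returns an empty shared_ptr for any other
// value.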
const std::shared_ptr<CompressionManager>& GetBuiltinCompressionManager(
    int compression_format_version) {
  static const std::shared_ptr<CompressionManager> v1_as_base =
      kBuiltinCompressionManagerV1;
  static const std::shared_ptr<CompressionManager> v2_as_base =
      kBuiltinCompressionManagerV2;
  static const std::shared_ptr<CompressionManager> none;
  if (compression_format_version == 1) {
    return v1_as_base;
  } else if (compression_format_version == 2) {
    return v2_as_base;
  } else {
    // Unrecognized. In some cases this is unexpected and the caller can
    // rightfully crash.
    return none;
  }
}

const std::shared_ptr<CompressionManager>& GetBuiltinV2CompressionManager() {
  return GetBuiltinCompressionManager(2);
}

// ***********************************************************************
// END built-in implementation of customization interface
// ***********************************************************************

}  // namespace ROCKSDB_NAMESPACE