compression.h 64 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  9. //
  10. #pragma once
  11. #include <algorithm>
  12. #include <limits>
  13. #ifdef ROCKSDB_MALLOC_USABLE_SIZE
  14. #ifdef OS_FREEBSD
  15. #include <malloc_np.h>
  16. #else // OS_FREEBSD
  17. #include <malloc.h>
  18. #endif // OS_FREEBSD
  19. #endif // ROCKSDB_MALLOC_USABLE_SIZE
  20. #include <string>
  21. #include "memory/memory_allocator_impl.h"
  22. #include "port/likely.h"
  23. #include "rocksdb/advanced_compression.h"
  24. #include "rocksdb/options.h"
  25. #include "table/block_based/block_type.h"
  26. #include "test_util/sync_point.h"
  27. #include "util/atomic.h"
  28. #include "util/cast_util.h"
  29. #include "util/coding.h"
  30. #include "util/compression_context_cache.h"
  31. #include "util/string_util.h"
  32. #ifdef SNAPPY
  33. #include <snappy-sinksource.h>
  34. #include <snappy.h>
  35. #endif
  36. #ifdef ZLIB
  37. #include <zlib.h>
  38. #endif
  39. #ifdef BZIP2
  40. #include <bzlib.h>
  41. #endif
  42. #if defined(LZ4)
  43. #include <lz4.h>
  44. #include <lz4hc.h>
  45. #if LZ4_VERSION_NUMBER < 10700 // < r129
  46. #error "LZ4 support requires version >= 1.7.0 (lz4-devel)"
  47. #endif
  48. #endif
  49. #ifdef ZSTD
  50. #include <zstd.h>
  51. #include <zstd_errors.h>
  52. // ZSTD_Compress2(), ZSTD_compressStream2() and frame parameters all belong to
  53. // advanced APIs and require v1.4.0+, which is from April 2019.
  54. // https://github.com/facebook/zstd/blob/eb9f881eb810f2242f1ef36b3f3e7014eecb8fa6/lib/zstd.h#L297C40-L297C45
  55. // To avoid a rat's nest of #ifdefs, we now require v1.4.0+ for ZSTD support.
  56. #if ZSTD_VERSION_NUMBER < 10400
  57. #error "ZSTD support requires version >= 1.4.0 (libzstd-devel)"
  58. #endif // ZSTD_VERSION_NUMBER
  59. // The above release also includes digested dictionary support, but some
  60. // required functions (ZSTD_createDDict_byReference) are still only exported
  61. // with ZSTD_STATIC_LINKING_ONLY defined.
  62. #if defined(ZSTD_STATIC_LINKING_ONLY)
  63. #define ROCKSDB_ZSTD_DDICT
  64. #endif // defined(ZSTD_STATIC_LINKING_ONLY)
  65. // For ZDICT_* functions
  66. #include <zdict.h>
  67. // ZDICT_finalizeDictionary API is exported and stable since v1.4.5
  68. #if ZSTD_VERSION_NUMBER >= 10405
  69. #define ROCKSDB_ZDICT_FINALIZE
  70. #endif // ZSTD_VERSION_NUMBER >= 10405
  71. #endif // ZSTD
  72. namespace ROCKSDB_NAMESPACE {
  73. // Need this for the context allocation override
  74. // On windows we need to do this explicitly
  75. #if defined(ZSTD) && defined(ROCKSDB_JEMALLOC) && defined(OS_WIN) && \
  76. defined(ZSTD_STATIC_LINKING_ONLY)
  77. #define ROCKSDB_ZSTD_CUSTOM_MEM
  78. namespace port {
  79. ZSTD_customMem GetJeZstdAllocationOverrides();
  80. } // namespace port
  81. #endif // defined(ZSTD) && defined(ROCKSDB_JEMALLOC) && defined(OS_WIN) &&
  82. // defined(ZSTD_STATIC_LINKING_ONLY)
// Cached data represents a portion that can be re-used
// If, in the future we have more than one native context to
// cache we can arrange this as a tuple
class ZSTDUncompressCachedData {
 public:
#if defined(ZSTD)
  using ZSTDNativeContext = ZSTD_DCtx*;
#else
  // Placeholder type when compiled without ZSTD support; never dereferenced.
  using ZSTDNativeContext = void*;
#endif  // ZSTD
  ZSTDUncompressCachedData() {}
  // Movable but not copyable: at most one instance may own the native context.
  ZSTDUncompressCachedData(const ZSTDUncompressCachedData& o) = delete;
  ZSTDUncompressCachedData& operator=(const ZSTDUncompressCachedData&) = delete;
  ZSTDUncompressCachedData(ZSTDUncompressCachedData&& o) noexcept
      : ZSTDUncompressCachedData() {
    *this = std::move(o);
  }
  // Move-assign by swapping; asserts *this holds no context yet, so the
  // moved-from object ends up empty rather than owning ours.
  ZSTDUncompressCachedData& operator=(ZSTDUncompressCachedData&& o) noexcept {
    assert(zstd_ctx_ == nullptr);
    std::swap(zstd_ctx_, o.zstd_ctx_);
    std::swap(cache_idx_, o.cache_idx_);
    return *this;
  }
  // Native context handle; may be nullptr if never created/initialized.
  ZSTDNativeContext Get() const { return zstd_ctx_; }
  // Index into the context cache, or -1 when this instance owns the context.
  int64_t GetCacheIndex() const { return cache_idx_; }
  // Lazily creates an owned context (cache_idx_ set to -1). No-op if a
  // context is already present.
  void CreateIfNeeded() {
    if (zstd_ctx_ == nullptr) {
#if !defined(ZSTD)
      zstd_ctx_ = nullptr;
#elif defined(ROCKSDB_ZSTD_CUSTOM_MEM)
      // Use jemalloc-backed allocation overrides (Windows + jemalloc builds).
      zstd_ctx_ =
          ZSTD_createDCtx_advanced(port::GetJeZstdAllocationOverrides());
#else  // ZSTD && !ROCKSDB_ZSTD_CUSTOM_MEM
      zstd_ctx_ = ZSTD_createDCtx();
#endif
      cache_idx_ = -1;
    }
  }
  // Init from cache: borrow a context owned elsewhere; recording idx != -1
  // means the destructor will NOT free it (the borrower returns the index to
  // the cache instead — see UncompressionContext's destructor).
  void InitFromCache(const ZSTDUncompressCachedData& o, int64_t idx) {
    zstd_ctx_ = o.zstd_ctx_;
    cache_idx_ = idx;
  }
  ~ZSTDUncompressCachedData() {
#if defined(ZSTD)
    // Free only contexts this instance owns; cached ones are shared.
    if (zstd_ctx_ != nullptr && cache_idx_ == -1) {
      ZSTD_freeDCtx(zstd_ctx_);
    }
#endif  // ZSTD
  }

 private:
  ZSTDNativeContext zstd_ctx_ = nullptr;
  int64_t cache_idx_ = -1;  // -1 means this instance owns the context
};
  137. } // namespace ROCKSDB_NAMESPACE
  138. #if defined(XPRESS)
  139. #include "port/xpress.h"
  140. #endif
  141. namespace ROCKSDB_NAMESPACE {
  142. class FailureDecompressor : public Decompressor {
  143. public:
  144. explicit FailureDecompressor(Status&& status) : status_(std::move(status)) {
  145. assert(!status_.ok());
  146. }
  147. ~FailureDecompressor() override { status_.PermitUncheckedError(); }
  148. const char* Name() const override { return "FailureDecompressor"; }
  149. Status ExtractUncompressedSize(Args& /*args*/) override { return status_; }
  150. Status DecompressBlock(const Args& /*args*/,
  151. char* /*uncompressed_output*/) override {
  152. return status_;
  153. }
  154. protected:
  155. Status status_;
  156. };
  157. // Owns a decompression dictionary, and associated Decompressor, for storing
  158. // in the block cache.
  159. //
  160. // Justification: for a "processed" dictionary to be saved in block cache, we
  161. // also need a reference to the decompressor that processed it, to ensure it
  162. // is recognized properly. At that point, we might as well have the dictionary
  163. // part of the decompressor identity and track an associated decompressor along
  164. // with a decompression dictionary in the block cache, and the decompressor
  165. // hides potential details of processing the dictionary.
  166. struct DecompressorDict {
  167. // Block containing the data for the compression dictionary in case the
  168. // constructor that takes a string parameter is used.
  169. std::string dict_str_;
  170. // Block containing the data for the compression dictionary in case the
  171. // constructor that takes a Slice parameter is used and the passed in
  172. // CacheAllocationPtr is not nullptr.
  173. CacheAllocationPtr dict_allocation_;
  174. // A Decompressor referencing and using the dictionary owned by this.
  175. std::unique_ptr<Decompressor> decompressor_;
  176. // Approximate owned memory usage
  177. size_t memory_usage_;
  178. DecompressorDict(std::string&& dict, Decompressor& from_decompressor)
  179. : dict_str_(std::move(dict)) {
  180. Populate(from_decompressor, dict_str_);
  181. }
  182. DecompressorDict(Slice slice, CacheAllocationPtr&& allocation,
  183. Decompressor& from_decompressor)
  184. : dict_allocation_(std::move(allocation)) {
  185. Populate(from_decompressor, slice);
  186. }
  187. DecompressorDict(DecompressorDict&& rhs) noexcept
  188. : dict_str_(std::move(rhs.dict_str_)),
  189. dict_allocation_(std::move(rhs.dict_allocation_)),
  190. decompressor_(std::move(rhs.decompressor_)),
  191. memory_usage_(std::move(rhs.memory_usage_)) {}
  192. DecompressorDict& operator=(DecompressorDict&& rhs) noexcept {
  193. if (this == &rhs) {
  194. return *this;
  195. }
  196. dict_str_ = std::move(rhs.dict_str_);
  197. dict_allocation_ = std::move(rhs.dict_allocation_);
  198. decompressor_ = std::move(rhs.decompressor_);
  199. return *this;
  200. }
  201. // Disable copy
  202. DecompressorDict(const DecompressorDict&) = delete;
  203. DecompressorDict& operator=(const DecompressorDict&) = delete;
  204. // The object is self-contained if the string constructor is used, or the
  205. // Slice constructor is invoked with a non-null allocation. Otherwise, it
  206. // is the caller's responsibility to ensure that the underlying storage
  207. // outlives this object.
  208. bool own_bytes() const { return !dict_str_.empty() || dict_allocation_; }
  209. const Slice& GetRawDict() const { return decompressor_->GetSerializedDict(); }
  210. // For TypedCacheInterface
  211. const Slice& ContentSlice() const { return GetRawDict(); }
  212. static constexpr CacheEntryRole kCacheEntryRole = CacheEntryRole::kOtherBlock;
  213. static constexpr BlockType kBlockType = BlockType::kCompressionDictionary;
  214. size_t ApproximateMemoryUsage() const { return memory_usage_; }
  215. private:
  216. void Populate(Decompressor& from_decompressor, Slice dict) {
  217. if (UNLIKELY(dict.empty())) {
  218. dict_str_ = {};
  219. dict_allocation_ = {};
  220. // Appropriately reject bad files with empty dictionary block.
  221. // It is longstanding not to write an empty dictionary block:
  222. // https://github.com/facebook/rocksdb/blame/10.2.fb/table/block_based/block_based_table_builder.cc#L1841
  223. decompressor_ = std::make_unique<FailureDecompressor>(
  224. Status::Corruption("Decompression dictionary is empty"));
  225. } else {
  226. Status s = from_decompressor.MaybeCloneForDict(dict, &decompressor_);
  227. if (decompressor_ == nullptr) {
  228. dict_str_ = {};
  229. dict_allocation_ = {};
  230. assert(!s.ok());
  231. decompressor_ = std::make_unique<FailureDecompressor>(std::move(s));
  232. } else {
  233. assert(s.ok());
  234. assert(decompressor_->GetSerializedDict() == dict);
  235. }
  236. }
  237. memory_usage_ = sizeof(struct DecompressorDict);
  238. memory_usage_ += dict_str_.size();
  239. if (dict_allocation_) {
  240. auto allocator = dict_allocation_.get_deleter().allocator;
  241. if (allocator) {
  242. memory_usage_ +=
  243. allocator->UsableSize(dict_allocation_.get(), GetRawDict().size());
  244. } else {
  245. memory_usage_ += GetRawDict().size();
  246. }
  247. }
  248. memory_usage_ += decompressor_->ApproximateOwnedMemoryUsage();
  249. }
  250. };
  251. // Holds dictionary and related data, like ZSTD's digested compression
  252. // dictionary.
  253. struct CompressionDict {
  254. #ifdef ZSTD
  255. ZSTD_CDict* zstd_cdict_ = nullptr;
  256. #endif // ZSTD
  257. std::string dict_;
  258. public:
  259. CompressionDict() = default;
  260. CompressionDict(std::string&& dict, CompressionType type, int level) {
  261. dict_ = std::move(dict);
  262. #ifdef ZSTD
  263. zstd_cdict_ = nullptr;
  264. if (!dict_.empty() && type == kZSTD) {
  265. if (level == CompressionOptions::kDefaultCompressionLevel) {
  266. // NB: ZSTD_CLEVEL_DEFAULT is historically == 3
  267. level = ZSTD_CLEVEL_DEFAULT;
  268. }
  269. // Should be safe (but slower) if below call fails as we'll use the
  270. // raw dictionary to compress.
  271. zstd_cdict_ = ZSTD_createCDict(dict_.data(), dict_.size(), level);
  272. assert(zstd_cdict_ != nullptr);
  273. }
  274. #else
  275. (void)type;
  276. (void)level;
  277. #endif // ZSTD
  278. }
  279. CompressionDict(CompressionDict&& other) {
  280. #ifdef ZSTD
  281. zstd_cdict_ = other.zstd_cdict_;
  282. other.zstd_cdict_ = nullptr;
  283. #endif // ZSTD
  284. dict_ = std::move(other.dict_);
  285. }
  286. CompressionDict& operator=(CompressionDict&& other) {
  287. if (this == &other) {
  288. return *this;
  289. }
  290. #ifdef ZSTD
  291. zstd_cdict_ = other.zstd_cdict_;
  292. other.zstd_cdict_ = nullptr;
  293. #endif // ZSTD
  294. dict_ = std::move(other.dict_);
  295. return *this;
  296. }
  297. ~CompressionDict() {
  298. #ifdef ZSTD
  299. size_t res = 0;
  300. if (zstd_cdict_ != nullptr) {
  301. res = ZSTD_freeCDict(zstd_cdict_);
  302. }
  303. assert(res == 0); // Last I checked they can't fail
  304. (void)res; // prevent unused var warning
  305. #endif // ZSTD
  306. }
  307. #ifdef ZSTD
  308. const ZSTD_CDict* GetDigestedZstdCDict() const { return zstd_cdict_; }
  309. #endif // ZSTD
  310. Slice GetRawDict() const { return dict_; }
  311. bool empty() const { return dict_.empty(); }
  312. static const CompressionDict& GetEmptyDict() {
  313. static CompressionDict empty_dict{};
  314. return empty_dict;
  315. }
  316. // Disable copy
  317. CompressionDict(const CompressionDict&) = delete;
  318. CompressionDict& operator=(const CompressionDict&) = delete;
  319. };
  320. // Holds dictionary and related data, like ZSTD's digested uncompression
  321. // dictionary.
  322. struct UncompressionDict {
  323. // Block containing the data for the compression dictionary in case the
  324. // constructor that takes a string parameter is used.
  325. std::string dict_;
  326. // Block containing the data for the compression dictionary in case the
  327. // constructor that takes a Slice parameter is used and the passed in
  328. // CacheAllocationPtr is not nullptr.
  329. CacheAllocationPtr allocation_;
  330. // Slice pointing to the compression dictionary data. Can point to
  331. // dict_, allocation_, or some other memory location, depending on how
  332. // the object was constructed.
  333. Slice slice_;
  334. #ifdef ROCKSDB_ZSTD_DDICT
  335. // Processed version of the contents of slice_ for ZSTD compression.
  336. ZSTD_DDict* zstd_ddict_ = nullptr;
  337. #endif // ROCKSDB_ZSTD_DDICT
  338. UncompressionDict(std::string&& dict, bool using_zstd)
  339. : dict_(std::move(dict)), slice_(dict_) {
  340. #ifdef ROCKSDB_ZSTD_DDICT
  341. if (!slice_.empty() && using_zstd) {
  342. zstd_ddict_ = ZSTD_createDDict_byReference(slice_.data(), slice_.size());
  343. assert(zstd_ddict_ != nullptr);
  344. }
  345. #else
  346. (void)using_zstd;
  347. #endif // ROCKSDB_ZSTD_DDICT
  348. }
  349. UncompressionDict(Slice slice, CacheAllocationPtr&& allocation,
  350. bool using_zstd)
  351. : allocation_(std::move(allocation)), slice_(std::move(slice)) {
  352. #ifdef ROCKSDB_ZSTD_DDICT
  353. if (!slice_.empty() && using_zstd) {
  354. zstd_ddict_ = ZSTD_createDDict_byReference(slice_.data(), slice_.size());
  355. assert(zstd_ddict_ != nullptr);
  356. }
  357. #else
  358. (void)using_zstd;
  359. #endif // ROCKSDB_ZSTD_DDICT
  360. }
  361. UncompressionDict(UncompressionDict&& rhs)
  362. : dict_(std::move(rhs.dict_)),
  363. allocation_(std::move(rhs.allocation_)),
  364. slice_(std::move(rhs.slice_))
  365. #ifdef ROCKSDB_ZSTD_DDICT
  366. ,
  367. zstd_ddict_(rhs.zstd_ddict_)
  368. #endif
  369. {
  370. #ifdef ROCKSDB_ZSTD_DDICT
  371. rhs.zstd_ddict_ = nullptr;
  372. #endif
  373. }
  374. ~UncompressionDict() {
  375. #ifdef ROCKSDB_ZSTD_DDICT
  376. size_t res = 0;
  377. if (zstd_ddict_ != nullptr) {
  378. res = ZSTD_freeDDict(zstd_ddict_);
  379. }
  380. assert(res == 0); // Last I checked they can't fail
  381. (void)res; // prevent unused var warning
  382. #endif // ROCKSDB_ZSTD_DDICT
  383. }
  384. UncompressionDict& operator=(UncompressionDict&& rhs) {
  385. if (this == &rhs) {
  386. return *this;
  387. }
  388. dict_ = std::move(rhs.dict_);
  389. allocation_ = std::move(rhs.allocation_);
  390. slice_ = std::move(rhs.slice_);
  391. #ifdef ROCKSDB_ZSTD_DDICT
  392. zstd_ddict_ = rhs.zstd_ddict_;
  393. rhs.zstd_ddict_ = nullptr;
  394. #endif
  395. return *this;
  396. }
  397. // The object is self-contained if the string constructor is used, or the
  398. // Slice constructor is invoked with a non-null allocation. Otherwise, it
  399. // is the caller's responsibility to ensure that the underlying storage
  400. // outlives this object.
  401. bool own_bytes() const { return !dict_.empty() || allocation_; }
  402. const Slice& GetRawDict() const { return slice_; }
  403. // For TypedCacheInterface
  404. const Slice& ContentSlice() const { return slice_; }
  405. static constexpr CacheEntryRole kCacheEntryRole = CacheEntryRole::kOtherBlock;
  406. static constexpr BlockType kBlockType = BlockType::kCompressionDictionary;
  407. #ifdef ROCKSDB_ZSTD_DDICT
  408. const ZSTD_DDict* GetDigestedZstdDDict() const { return zstd_ddict_; }
  409. #endif // ROCKSDB_ZSTD_DDICT
  410. static const UncompressionDict& GetEmptyDict() {
  411. static UncompressionDict empty_dict{};
  412. return empty_dict;
  413. }
  414. size_t ApproximateMemoryUsage() const {
  415. size_t usage = sizeof(struct UncompressionDict);
  416. usage += dict_.size();
  417. if (allocation_) {
  418. auto allocator = allocation_.get_deleter().allocator;
  419. if (allocator) {
  420. usage += allocator->UsableSize(allocation_.get(), slice_.size());
  421. } else {
  422. usage += slice_.size();
  423. }
  424. }
  425. #ifdef ROCKSDB_ZSTD_DDICT
  426. usage += ZSTD_sizeof_DDict(zstd_ddict_);
  427. #endif // ROCKSDB_ZSTD_DDICT
  428. return usage;
  429. }
  430. UncompressionDict() = default;
  431. // Disable copy
  432. UncompressionDict(const CompressionDict&) = delete;
  433. UncompressionDict& operator=(const CompressionDict&) = delete;
  434. };
// Reusable per-thread working area for compression; for ZSTD it owns a
// pre-configured ZSTD_CCtx (compression level and optional checksum flag).
class CompressionContext : public Compressor::WorkingArea {
 private:
#ifdef ZSTD
  ZSTD_CCtx* zstd_ctx_ = nullptr;

  // Creates a raw CCtx, using jemalloc allocation overrides when configured.
  ZSTD_CCtx* CreateZSTDContext() {
#ifdef ROCKSDB_ZSTD_CUSTOM_MEM
    return ZSTD_createCCtx_advanced(port::GetJeZstdAllocationOverrides());
#else  // ROCKSDB_ZSTD_CUSTOM_MEM
    return ZSTD_createCCtx();
#endif  // ROCKSDB_ZSTD_CUSTOM_MEM
  }

 public:
  // callable inside ZSTD_Compress
  ZSTD_CCtx* ZSTDPreallocCtx() const {
    assert(zstd_ctx_ != nullptr);
    return zstd_ctx_;
  }

 private:
#endif  // ZSTD
  // Creates and configures the native context for `type` (only ZSTD has one).
  void CreateNativeContext(CompressionType type, int level, bool checksum) {
#ifdef ZSTD
    if (type == kZSTD) {
      zstd_ctx_ = CreateZSTDContext();
      if (level == CompressionOptions::kDefaultCompressionLevel) {
        // NB: ZSTD_CLEVEL_DEFAULT is historically == 3
        level = ZSTD_CLEVEL_DEFAULT;
      }
      size_t err =
          ZSTD_CCtx_setParameter(zstd_ctx_, ZSTD_c_compressionLevel, level);
      if (ZSTD_isError(err)) {
        // Setting a parameter should not fail with an already-validated
        // level; recover in release builds by falling back to a fresh
        // default-configured context.
        assert(false);
        ZSTD_freeCCtx(zstd_ctx_);
        zstd_ctx_ = CreateZSTDContext();
      }
      if (checksum) {
        err = ZSTD_CCtx_setParameter(zstd_ctx_, ZSTD_c_checksumFlag, 1);
        if (ZSTD_isError(err)) {
          // NOTE(review): the recreated context loses the compression level
          // set above (falls back to ZSTD defaults) — presumably acceptable
          // for this should-never-happen path; confirm intent.
          assert(false);
          ZSTD_freeCCtx(zstd_ctx_);
          zstd_ctx_ = CreateZSTDContext();
        }
      }
    }
#else
    (void)type;
    (void)level;
    (void)checksum;
#endif  // ZSTD
  }
  void DestroyNativeContext() {
#ifdef ZSTD
    if (zstd_ctx_ != nullptr) {
      ZSTD_freeCCtx(zstd_ctx_);
    }
#endif  // ZSTD
  }

 public:
  explicit CompressionContext(CompressionType type,
                              const CompressionOptions& options) {
    CreateNativeContext(type, options.level, options.checksum);
  }
  ~CompressionContext() { DestroyNativeContext(); }
  CompressionContext(const CompressionContext&) = delete;
  CompressionContext& operator=(const CompressionContext&) = delete;
};
  500. // TODO: rename
  501. class CompressionInfo {
  502. const CompressionOptions& opts_;
  503. const CompressionContext& context_;
  504. const CompressionDict& dict_;
  505. const CompressionType type_;
  506. public:
  507. CompressionInfo(const CompressionOptions& _opts,
  508. const CompressionContext& _context,
  509. const CompressionDict& _dict, CompressionType _type)
  510. : opts_(_opts), context_(_context), dict_(_dict), type_(_type) {}
  511. const CompressionOptions& options() const { return opts_; }
  512. const CompressionContext& context() const { return context_; }
  513. const CompressionDict& dict() const { return dict_; }
  514. CompressionType type() const { return type_; }
  515. };
  516. // This is like a working area, reusable for different dicts, etc.
  517. // TODO: refactor / consolidate
  518. class UncompressionContext : public Decompressor::WorkingArea {
  519. private:
  520. CompressionContextCache* ctx_cache_ = nullptr;
  521. ZSTDUncompressCachedData uncomp_cached_data_;
  522. public:
  523. explicit UncompressionContext(CompressionType type) {
  524. if (type == kZSTD) {
  525. ctx_cache_ = CompressionContextCache::Instance();
  526. uncomp_cached_data_ = ctx_cache_->GetCachedZSTDUncompressData();
  527. }
  528. }
  529. ~UncompressionContext() {
  530. if (uncomp_cached_data_.GetCacheIndex() != -1) {
  531. assert(ctx_cache_ != nullptr);
  532. ctx_cache_->ReturnCachedZSTDUncompressData(
  533. uncomp_cached_data_.GetCacheIndex());
  534. }
  535. }
  536. UncompressionContext(const UncompressionContext&) = delete;
  537. UncompressionContext& operator=(const UncompressionContext&) = delete;
  538. ZSTDUncompressCachedData::ZSTDNativeContext GetZSTDContext() const {
  539. return uncomp_cached_data_.Get();
  540. }
  541. };
  542. class UncompressionInfo {
  543. const UncompressionContext& context_;
  544. const UncompressionDict& dict_;
  545. const CompressionType type_;
  546. public:
  547. UncompressionInfo(const UncompressionContext& _context,
  548. const UncompressionDict& _dict, CompressionType _type)
  549. : context_(_context), dict_(_dict), type_(_type) {}
  550. const UncompressionContext& context() const { return context_; }
  551. const UncompressionDict& dict() const { return dict_; }
  552. CompressionType type() const { return type_; }
  553. };
  554. inline bool Snappy_Supported() {
  555. #ifdef SNAPPY
  556. return true;
  557. #else
  558. return false;
  559. #endif
  560. }
  561. inline bool Zlib_Supported() {
  562. #ifdef ZLIB
  563. return true;
  564. #else
  565. return false;
  566. #endif
  567. }
  568. inline bool BZip2_Supported() {
  569. #ifdef BZIP2
  570. return true;
  571. #else
  572. return false;
  573. #endif
  574. }
  575. inline bool LZ4_Supported() {
  576. #ifdef LZ4
  577. return true;
  578. #else
  579. return false;
  580. #endif
  581. }
  582. inline bool XPRESS_Supported() {
  583. #ifdef XPRESS
  584. return true;
  585. #else
  586. return false;
  587. #endif
  588. }
  589. inline bool ZSTD_Supported() {
  590. #ifdef ZSTD
  591. // NB: ZSTD format is finalized since version 0.8.0. See ZSTD_VERSION_NUMBER
  592. // check above.
  593. return true;
  594. #else
  595. return false;
  596. #endif
  597. }
  598. inline bool ZSTD_Streaming_Supported() {
  599. #if defined(ZSTD)
  600. return true;
  601. #else
  602. return false;
  603. #endif
  604. }
  605. inline bool StreamingCompressionTypeSupported(
  606. CompressionType compression_type) {
  607. switch (compression_type) {
  608. case kNoCompression:
  609. return true;
  610. case kZSTD:
  611. return ZSTD_Streaming_Supported();
  612. default:
  613. return false;
  614. }
  615. }
  616. inline bool CompressionTypeSupported(CompressionType compression_type) {
  617. switch (compression_type) {
  618. case kNoCompression:
  619. return true;
  620. case kSnappyCompression:
  621. return Snappy_Supported();
  622. case kZlibCompression:
  623. return Zlib_Supported();
  624. case kBZip2Compression:
  625. return BZip2_Supported();
  626. case kLZ4Compression:
  627. return LZ4_Supported();
  628. case kLZ4HCCompression:
  629. return LZ4_Supported();
  630. case kXpressCompression:
  631. return XPRESS_Supported();
  632. case kZSTD:
  633. return ZSTD_Supported();
  634. default: // Including custom compression types
  635. return false;
  636. }
  637. }
  638. inline bool DictCompressionTypeSupported(CompressionType compression_type) {
  639. switch (compression_type) {
  640. case kNoCompression:
  641. return false;
  642. case kSnappyCompression:
  643. return false;
  644. case kZlibCompression:
  645. return Zlib_Supported();
  646. case kBZip2Compression:
  647. return false;
  648. case kLZ4Compression:
  649. case kLZ4HCCompression:
  650. #if LZ4_VERSION_NUMBER >= 10400 // r124+
  651. return LZ4_Supported();
  652. #else
  653. return false;
  654. #endif
  655. case kXpressCompression:
  656. return false;
  657. case kZSTD:
  658. // NB: dictionary supported since 0.5.0. See ZSTD_VERSION_NUMBER check
  659. // above.
  660. return ZSTD_Supported();
  661. default: // Including custom compression types
  662. return false;
  663. }
  664. }
// WART: does not match OptionsHelper::compression_type_string_map
// Renders a CompressionType as a short human-readable name. Values outside
// the built-in set are rendered as "Custom<hex>" (user-registered range) or
// "Reserved<hex>", using two uppercase hex digits of the enum value.
inline std::string CompressionTypeToString(CompressionType compression_type) {
  switch (compression_type) {
    case kNoCompression:
      return "NoCompression";
    case kSnappyCompression:
      return "Snappy";
    case kZlibCompression:
      return "Zlib";
    case kBZip2Compression:
      return "BZip2";
    case kLZ4Compression:
      return "LZ4";
    case kLZ4HCCompression:
      return "LZ4HC";
    case kXpressCompression:
      return "Xpress";
    case kZSTD:
      return "ZSTD";
    case kDisableCompressionOption:
      return "DisableOption";
    default: {
      // Distinguish user-registered custom types from reserved values.
      bool is_custom = compression_type >= kFirstCustomCompression &&
                       compression_type <= kLastCustomCompression;
      unsigned char c = lossless_cast<unsigned char>(compression_type);
      return (is_custom ? "Custom" : "Reserved") +
             ToBaseCharsString<16>(2, c, /*uppercase=*/true);
    }
  }
}
  695. // WART: does not match OptionsHelper::compression_type_string_map
  696. inline CompressionType CompressionTypeFromString(
  697. std::string compression_type_str) {
  698. if (!compression_type_str.empty()) {
  699. switch (compression_type_str[0]) {
  700. case 'N':
  701. if (compression_type_str == "NoCompression") {
  702. return kNoCompression;
  703. }
  704. break;
  705. case 'S':
  706. if (compression_type_str == "Snappy") {
  707. return kSnappyCompression;
  708. }
  709. break;
  710. case 'Z':
  711. if (compression_type_str == "ZSTD") {
  712. return kZSTD;
  713. }
  714. if (compression_type_str == "Zlib") {
  715. return kZlibCompression;
  716. }
  717. break;
  718. case 'B':
  719. if (compression_type_str == "BZip2") {
  720. return kBZip2Compression;
  721. }
  722. break;
  723. case 'L':
  724. if (compression_type_str == "LZ4") {
  725. return kLZ4Compression;
  726. }
  727. if (compression_type_str == "LZ4HC") {
  728. return kLZ4HCCompression;
  729. }
  730. break;
  731. case 'X':
  732. if (compression_type_str == "Xpress") {
  733. return kXpressCompression;
  734. }
  735. break;
  736. default:;
  737. }
  738. }
  739. // unrecognized
  740. return kDisableCompressionOption;
  741. }
  742. inline std::string CompressionOptionsToString(
  743. const CompressionOptions& compression_options) {
  744. std::string result;
  745. result.reserve(512);
  746. result.append("window_bits=")
  747. .append(std::to_string(compression_options.window_bits))
  748. .append("; ");
  749. result.append("level=")
  750. .append(std::to_string(compression_options.level))
  751. .append("; ");
  752. result.append("strategy=")
  753. .append(std::to_string(compression_options.strategy))
  754. .append("; ");
  755. result.append("max_dict_bytes=")
  756. .append(std::to_string(compression_options.max_dict_bytes))
  757. .append("; ");
  758. result.append("zstd_max_train_bytes=")
  759. .append(std::to_string(compression_options.zstd_max_train_bytes))
  760. .append("; ");
  761. // NOTE: parallel_threads is skipped because it doesn't really affect the file
  762. // contents written, arguably doesn't belong in CompressionOptions
  763. result.append("enabled=")
  764. .append(std::to_string(compression_options.enabled))
  765. .append("; ");
  766. result.append("max_dict_buffer_bytes=")
  767. .append(std::to_string(compression_options.max_dict_buffer_bytes))
  768. .append("; ");
  769. result.append("use_zstd_dict_trainer=")
  770. .append(std::to_string(compression_options.use_zstd_dict_trainer))
  771. .append("; ");
  772. result.append("max_compressed_bytes_per_kb=")
  773. .append(std::to_string(compression_options.max_compressed_bytes_per_kb))
  774. .append("; ");
  775. result.append("checksum=")
  776. .append(std::to_string(compression_options.checksum))
  777. .append("; ");
  778. return result;
  779. }
// compress_format_version can have two values:
// 1 -- decompressed sizes for BZip2 and Zlib are not included in the compressed
// block. Also, decompressed sizes for LZ4 are encoded in platform-dependent
// way.
// 2 -- Zlib, BZip2 and LZ4 encode decompressed size as Varint32 just before the
// start of compressed block. Snappy and XPRESS instead extract the decompressed
// size from the compressed block itself, same as version 1.

// Compresses `input` into *output with Snappy. Returns false when Snappy is
// not compiled in. No size header is written here: Snappy embeds the
// decompressed length in its own framing (hence the unused info/version).
inline bool Snappy_Compress(const CompressionInfo& /*info*/, const char* input,
                            size_t length, ::std::string* output) {
#ifdef SNAPPY
  // Allocate the worst-case size up front, then shrink to the actual size.
  output->resize(snappy::MaxCompressedLength(length));
  size_t outlen;
  snappy::RawCompress(input, length, &(*output)[0], &outlen);
  output->resize(outlen);
  return true;
#else
  (void)input;
  (void)length;
  (void)output;
  return false;
#endif
}
// Decompresses a Snappy-compressed block. On success stores the decoded
// length in *uncompressed_size and returns the buffer; returns nullptr on
// corrupt input or when Snappy is not compiled in.
inline CacheAllocationPtr Snappy_Uncompress(
    const char* input, size_t length, size_t* uncompressed_size,
    MemoryAllocator* allocator = nullptr) {
#ifdef SNAPPY
  size_t uncompressed_length = 0;
  // Snappy records the decompressed length in its stream preamble.
  if (!snappy::GetUncompressedLength(input, length, &uncompressed_length)) {
    return nullptr;
  }
  CacheAllocationPtr output = AllocateBlock(uncompressed_length, allocator);
  if (!snappy::RawUncompress(input, length, output.get())) {
    return nullptr;
  }
  *uncompressed_size = uncompressed_length;
  return output;
#else
  (void)input;
  (void)length;
  (void)uncompressed_size;
  (void)allocator;
  return nullptr;
#endif
}
  824. namespace compression {
  825. // returns size
  826. inline size_t PutDecompressedSizeInfo(std::string* output, uint32_t length) {
  827. PutVarint32(output, length);
  828. return output->size();
  829. }
  830. inline bool GetDecompressedSizeInfo(const char** input_data,
  831. size_t* input_length,
  832. uint32_t* output_len) {
  833. auto new_input_data =
  834. GetVarint32Ptr(*input_data, *input_data + *input_length, output_len);
  835. if (new_input_data == nullptr) {
  836. return false;
  837. }
  838. *input_length -= (new_input_data - *input_data);
  839. *input_data = new_input_data;
  840. return true;
  841. }
  842. } // namespace compression
// compress_format_version == 1 -- decompressed size is not included in the
// block header
// compress_format_version == 2 -- decompressed size is included in the block
// header in varint32 format
// @param compression_dict Data for presetting the compression library's
// dictionary.
// Returns true on success. Returns false when zlib is not compiled in, the
// input exceeds 4GB, library initialization fails, or the "compressed"
// result would not fit in deflateBound() space (i.e. compression expanded
// the data).
inline bool Zlib_Compress(const CompressionInfo& info,
                          uint32_t compress_format_version, const char* input,
                          size_t length, ::std::string* output) {
#ifdef ZLIB
  if (length > std::numeric_limits<uint32_t>::max()) {
    // Can't compress more than 4GB
    return false;
  }

  size_t output_header_len = 0;
  if (compress_format_version == 2) {
    output_header_len = compression::PutDecompressedSizeInfo(
        output, static_cast<uint32_t>(length));
  }

  // The memLevel parameter specifies how much memory should be allocated for
  // the internal compression state.
  // memLevel=1 uses minimum memory but is slow and reduces compression ratio.
  // memLevel=9 uses maximum memory for optimal speed.
  // The default value is 8. See zconf.h for more details.
  static const int memLevel = 8;
  int level;
  if (info.options().level == CompressionOptions::kDefaultCompressionLevel) {
    level = Z_DEFAULT_COMPRESSION;
  } else {
    level = info.options().level;
  }
  z_stream _stream;
  memset(&_stream, 0, sizeof(z_stream));
  int st = deflateInit2(&_stream, level, Z_DEFLATED, info.options().window_bits,
                        memLevel, info.options().strategy);
  if (st != Z_OK) {
    return false;
  }

  Slice compression_dict = info.dict().GetRawDict();
  if (compression_dict.size()) {
    // Initialize the compression library's dictionary
    st = deflateSetDictionary(
        &_stream, reinterpret_cast<const Bytef*>(compression_dict.data()),
        static_cast<unsigned int>(compression_dict.size()));
    if (st != Z_OK) {
      deflateEnd(&_stream);
      return false;
    }
  }

  // Get an upper bound on the compressed size.
  size_t upper_bound =
      deflateBound(&_stream, static_cast<unsigned long>(length));
  output->resize(output_header_len + upper_bound);

  // Compress the input, and put compressed data in output.
  _stream.next_in = (Bytef*)input;
  _stream.avail_in = static_cast<unsigned int>(length);

  // Initialize the output size.
  _stream.avail_out = static_cast<unsigned int>(upper_bound);
  _stream.next_out = reinterpret_cast<Bytef*>(&(*output)[output_header_len]);

  bool compressed = false;
  st = deflate(&_stream, Z_FINISH);
  if (st == Z_STREAM_END) {
    compressed = true;
    output->resize(output->size() - _stream.avail_out);
  }
  // The only return value we really care about is Z_STREAM_END.
  // Z_OK means insufficient output space. This means the compression is
  // bigger than decompressed size. Just fail the compression in that case.
  deflateEnd(&_stream);
  return compressed;
#else
  (void)info;
  (void)compress_format_version;
  (void)input;
  (void)length;
  (void)output;
  return false;
#endif
}
  922. // compress_format_version == 1 -- decompressed size is not included in the
  923. // block header
  924. // compress_format_version == 2 -- decompressed size is included in the block
  925. // header in varint32 format
  926. // @param compression_dict Data for presetting the compression library's
  927. // dictionary.
  928. inline CacheAllocationPtr Zlib_Uncompress(
  929. const UncompressionInfo& info, const char* input_data, size_t input_length,
  930. size_t* uncompressed_size, uint32_t compress_format_version,
  931. MemoryAllocator* allocator = nullptr, int windowBits = -14) {
  932. #ifdef ZLIB
  933. uint32_t output_len = 0;
  934. if (compress_format_version == 2) {
  935. if (!compression::GetDecompressedSizeInfo(&input_data, &input_length,
  936. &output_len)) {
  937. return nullptr;
  938. }
  939. } else {
  940. // Assume the decompressed data size will 5x of compressed size, but round
  941. // to the page size
  942. size_t proposed_output_len = ((input_length * 5) & (~(4096 - 1))) + 4096;
  943. output_len = static_cast<uint32_t>(
  944. std::min(proposed_output_len,
  945. static_cast<size_t>(std::numeric_limits<uint32_t>::max())));
  946. }
  947. z_stream _stream;
  948. memset(&_stream, 0, sizeof(z_stream));
  949. // For raw inflate, the windowBits should be -8..-15.
  950. // If windowBits is bigger than zero, it will use either zlib
  951. // header or gzip header. Adding 32 to it will do automatic detection.
  952. int st =
  953. inflateInit2(&_stream, windowBits > 0 ? windowBits + 32 : windowBits);
  954. if (st != Z_OK) {
  955. return nullptr;
  956. }
  957. const Slice& compression_dict = info.dict().GetRawDict();
  958. if (compression_dict.size()) {
  959. // Initialize the compression library's dictionary
  960. st = inflateSetDictionary(
  961. &_stream, reinterpret_cast<const Bytef*>(compression_dict.data()),
  962. static_cast<unsigned int>(compression_dict.size()));
  963. if (st != Z_OK) {
  964. return nullptr;
  965. }
  966. }
  967. _stream.next_in = (Bytef*)input_data;
  968. _stream.avail_in = static_cast<unsigned int>(input_length);
  969. auto output = AllocateBlock(output_len, allocator);
  970. _stream.next_out = (Bytef*)output.get();
  971. _stream.avail_out = static_cast<unsigned int>(output_len);
  972. bool done = false;
  973. while (!done) {
  974. st = inflate(&_stream, Z_SYNC_FLUSH);
  975. switch (st) {
  976. case Z_STREAM_END:
  977. done = true;
  978. break;
  979. case Z_OK: {
  980. // No output space. Increase the output space by 20%.
  981. // We should never run out of output space if
  982. // compress_format_version == 2
  983. assert(compress_format_version != 2);
  984. size_t old_sz = output_len;
  985. uint32_t output_len_delta = output_len / 5;
  986. output_len += output_len_delta < 10 ? 10 : output_len_delta;
  987. auto tmp = AllocateBlock(output_len, allocator);
  988. memcpy(tmp.get(), output.get(), old_sz);
  989. output = std::move(tmp);
  990. // Set more output.
  991. _stream.next_out = (Bytef*)(output.get() + old_sz);
  992. _stream.avail_out = static_cast<unsigned int>(output_len - old_sz);
  993. break;
  994. }
  995. case Z_BUF_ERROR:
  996. default:
  997. inflateEnd(&_stream);
  998. return nullptr;
  999. }
  1000. }
  1001. // If we encoded decompressed block size, we should have no bytes left
  1002. assert(compress_format_version != 2 || _stream.avail_out == 0);
  1003. assert(output_len >= _stream.avail_out);
  1004. *uncompressed_size = output_len - _stream.avail_out;
  1005. inflateEnd(&_stream);
  1006. return output;
  1007. #else
  1008. (void)info;
  1009. (void)input_data;
  1010. (void)input_length;
  1011. (void)uncompressed_size;
  1012. (void)compress_format_version;
  1013. (void)allocator;
  1014. (void)windowBits;
  1015. return nullptr;
  1016. #endif
  1017. }
// compress_format_version == 1 -- decompressed size is not included in the
// block header
// compress_format_version == 2 -- decompressed size is included in the block
// header in varint32 format
// Returns false when BZip2 is not compiled in, the input exceeds 4GB, or
// the compressed result would be larger than the input (output buffer is
// sized to the input length on purpose, so expansion fails compression).
inline bool BZip2_Compress(const CompressionInfo& /*info*/,
                           uint32_t compress_format_version, const char* input,
                           size_t length, ::std::string* output) {
#ifdef BZIP2
  if (length > std::numeric_limits<uint32_t>::max()) {
    // Can't compress more than 4GB
    return false;
  }
  size_t output_header_len = 0;
  if (compress_format_version == 2) {
    output_header_len = compression::PutDecompressedSizeInfo(
        output, static_cast<uint32_t>(length));
  }
  // Resize output to be the plain data length.
  // This may not be big enough if the compression actually expands data.
  output->resize(output_header_len + length);

  bz_stream _stream;
  memset(&_stream, 0, sizeof(bz_stream));

  // Block size 1 is 100K.
  // 0 is for silent.
  // 30 is the default workFactor
  int st = BZ2_bzCompressInit(&_stream, 1, 0, 30);
  if (st != BZ_OK) {
    return false;
  }

  // Compress the input, and put compressed data in output.
  _stream.next_in = (char*)input;
  _stream.avail_in = static_cast<unsigned int>(length);

  // Initialize the output size.
  _stream.avail_out = static_cast<unsigned int>(length);
  _stream.next_out = output->data() + output_header_len;

  bool compressed = false;
  st = BZ2_bzCompress(&_stream, BZ_FINISH);
  if (st == BZ_STREAM_END) {
    compressed = true;
    output->resize(output->size() - _stream.avail_out);
  }
  // The only return value we really care about is BZ_STREAM_END.
  // BZ_FINISH_OK means insufficient output space. This means the compression
  // is bigger than decompressed size. Just fail the compression in that case.
  BZ2_bzCompressEnd(&_stream);
  return compressed;
#else
  (void)compress_format_version;
  (void)input;
  (void)length;
  (void)output;
  return false;
#endif
}
  1072. // compress_format_version == 1 -- decompressed size is not included in the
  1073. // block header
  1074. // compress_format_version == 2 -- decompressed size is included in the block
  1075. // header in varint32 format
  1076. inline CacheAllocationPtr BZip2_Uncompress(
  1077. const char* input_data, size_t input_length, size_t* uncompressed_size,
  1078. uint32_t compress_format_version, MemoryAllocator* allocator = nullptr) {
  1079. #ifdef BZIP2
  1080. uint32_t output_len = 0;
  1081. if (compress_format_version == 2) {
  1082. if (!compression::GetDecompressedSizeInfo(&input_data, &input_length,
  1083. &output_len)) {
  1084. return nullptr;
  1085. }
  1086. } else {
  1087. // Assume the decompressed data size will 5x of compressed size, but round
  1088. // to the next page size
  1089. size_t proposed_output_len = ((input_length * 5) & (~(4096 - 1))) + 4096;
  1090. output_len = static_cast<uint32_t>(
  1091. std::min(proposed_output_len,
  1092. static_cast<size_t>(std::numeric_limits<uint32_t>::max())));
  1093. }
  1094. bz_stream _stream;
  1095. memset(&_stream, 0, sizeof(bz_stream));
  1096. int st = BZ2_bzDecompressInit(&_stream, 0, 0);
  1097. if (st != BZ_OK) {
  1098. return nullptr;
  1099. }
  1100. _stream.next_in = (char*)input_data;
  1101. _stream.avail_in = static_cast<unsigned int>(input_length);
  1102. auto output = AllocateBlock(output_len, allocator);
  1103. _stream.next_out = (char*)output.get();
  1104. _stream.avail_out = static_cast<unsigned int>(output_len);
  1105. bool done = false;
  1106. while (!done) {
  1107. st = BZ2_bzDecompress(&_stream);
  1108. switch (st) {
  1109. case BZ_STREAM_END:
  1110. done = true;
  1111. break;
  1112. case BZ_OK: {
  1113. // No output space. Increase the output space by 20%.
  1114. // We should never run out of output space if
  1115. // compress_format_version == 2
  1116. assert(compress_format_version != 2);
  1117. uint32_t old_sz = output_len;
  1118. output_len = output_len * 1.2;
  1119. auto tmp = AllocateBlock(output_len, allocator);
  1120. memcpy(tmp.get(), output.get(), old_sz);
  1121. output = std::move(tmp);
  1122. // Set more output.
  1123. _stream.next_out = (char*)(output.get() + old_sz);
  1124. _stream.avail_out = static_cast<unsigned int>(output_len - old_sz);
  1125. break;
  1126. }
  1127. default:
  1128. BZ2_bzDecompressEnd(&_stream);
  1129. return nullptr;
  1130. }
  1131. }
  1132. // If we encoded decompressed block size, we should have no bytes left
  1133. assert(compress_format_version != 2 || _stream.avail_out == 0);
  1134. assert(output_len >= _stream.avail_out);
  1135. *uncompressed_size = output_len - _stream.avail_out;
  1136. BZ2_bzDecompressEnd(&_stream);
  1137. return output;
  1138. #else
  1139. (void)input_data;
  1140. (void)input_length;
  1141. (void)uncompressed_size;
  1142. (void)compress_format_version;
  1143. (void)allocator;
  1144. return nullptr;
  1145. #endif
  1146. }
// compress_format_version == 1 -- decompressed size is included in the
// block header using memcpy, which makes database non-portable)
// compress_format_version == 2 -- decompressed size is included in the block
// header in varint32 format
// @param compression_dict Data for presetting the compression library's
// dictionary.
// Returns false when LZ4 is not compiled in, the input exceeds 4GB, or the
// compressor reports failure (outlen == 0).
inline bool LZ4_Compress(const CompressionInfo& info,
                         uint32_t compress_format_version, const char* input,
                         size_t length, ::std::string* output) {
#ifdef LZ4
  if (length > std::numeric_limits<uint32_t>::max()) {
    // Can't compress more than 4GB
    return false;
  }

  size_t output_header_len = 0;
  if (compress_format_version == 2) {
    // new encoding, using varint32 to store size information
    output_header_len = compression::PutDecompressedSizeInfo(
        output, static_cast<uint32_t>(length));
  } else {
    // legacy encoding, which is not really portable (depends on big/little
    // endianness)
    output_header_len = 8;
    output->resize(output_header_len);
    char* p = const_cast<char*>(output->c_str());
    memcpy(p, &length, sizeof(length));
  }

  int compress_bound = LZ4_compressBound(static_cast<int>(length));
  output->resize(static_cast<size_t>(output_header_len + compress_bound));

  int outlen;
#if LZ4_VERSION_NUMBER >= 10400  // r124+
  // The streaming API (r124+) is needed so a preset dictionary can be loaded.
  LZ4_stream_t* stream = LZ4_createStream();
  Slice compression_dict = info.dict().GetRawDict();
  if (compression_dict.size()) {
    LZ4_loadDict(stream, compression_dict.data(),
                 static_cast<int>(compression_dict.size()));
  }
#if LZ4_VERSION_NUMBER >= 10700  // r129+
  // Negative option levels select LZ4 "fast" mode; the magnitude is the
  // acceleration factor.
  int acceleration;
  if (info.options().level < 0) {
    acceleration = -info.options().level;
  } else {
    acceleration = 1;
  }
  outlen = LZ4_compress_fast_continue(
      stream, input, &(*output)[output_header_len], static_cast<int>(length),
      compress_bound, acceleration);
#else  // up to r128
  outlen = LZ4_compress_limitedOutput_continue(
      stream, input, &(*output)[output_header_len], static_cast<int>(length),
      compress_bound);
#endif
  LZ4_freeStream(stream);
#else   // up to r123
  outlen = LZ4_compress_limitedOutput(input, &(*output)[output_header_len],
                                      static_cast<int>(length), compress_bound);
#endif  // LZ4_VERSION_NUMBER >= 10400

  if (outlen == 0) {
    return false;
  }
  output->resize(static_cast<size_t>(output_header_len + outlen));
  return true;
#else  // LZ4
  (void)info;
  (void)compress_format_version;
  (void)input;
  (void)length;
  (void)output;
  return false;
#endif
}
// compress_format_version == 1 -- decompressed size is included in the
// block header using memcpy, which makes database non-portable)
// compress_format_version == 2 -- decompressed size is included in the block
// header in varint32 format
// @param compression_dict Data for presetting the compression library's
// dictionary.
// Returns the decompressed block (length in *uncompressed_size) or nullptr
// on corruption/failure or when LZ4 is not compiled in.
inline CacheAllocationPtr LZ4_Uncompress(const UncompressionInfo& info,
                                         const char* input_data,
                                         size_t input_length,
                                         size_t* uncompressed_size,
                                         uint32_t compress_format_version,
                                         MemoryAllocator* allocator = nullptr) {
#ifdef LZ4
  uint32_t output_len = 0;
  if (compress_format_version == 2) {
    // new encoding, using varint32 to store size information
    if (!compression::GetDecompressedSizeInfo(&input_data, &input_length,
                                              &output_len)) {
      return nullptr;
    }
  } else {
    // legacy encoding, which is not really portable (depends on big/little
    // endianness)
    if (input_length < 8) {
      return nullptr;
    }
    // The writer stored a size_t; on big-endian platforms the low 32 bits
    // of an 8-byte value are at offset 4.
    if (port::kLittleEndian) {
      memcpy(&output_len, input_data, sizeof(output_len));
    } else {
      memcpy(&output_len, input_data + 4, sizeof(output_len));
    }
    input_length -= 8;
    input_data += 8;
  }

  auto output = AllocateBlock(output_len, allocator);

  int decompress_bytes = 0;

#if LZ4_VERSION_NUMBER >= 10400  // r124+
  // Streaming API needed so a preset dictionary can be attached.
  LZ4_streamDecode_t* stream = LZ4_createStreamDecode();
  const Slice& compression_dict = info.dict().GetRawDict();
  if (compression_dict.size()) {
    LZ4_setStreamDecode(stream, compression_dict.data(),
                        static_cast<int>(compression_dict.size()));
  }
  decompress_bytes = LZ4_decompress_safe_continue(
      stream, input_data, output.get(), static_cast<int>(input_length),
      static_cast<int>(output_len));
  LZ4_freeStreamDecode(stream);
#else   // up to r123
  decompress_bytes = LZ4_decompress_safe(input_data, output.get(),
                                         static_cast<int>(input_length),
                                         static_cast<int>(output_len));
#endif  // LZ4_VERSION_NUMBER >= 10400

  if (decompress_bytes < 0) {
    return nullptr;
  }
  assert(decompress_bytes == static_cast<int>(output_len));
  *uncompressed_size = decompress_bytes;
  return output;
#else  // LZ4
  (void)info;
  (void)input_data;
  (void)input_length;
  (void)uncompressed_size;
  (void)compress_format_version;
  (void)allocator;
  return nullptr;
#endif
}
// compress_format_version == 1 -- decompressed size is included in the
// block header using memcpy, which makes database non-portable)
// compress_format_version == 2 -- decompressed size is included in the block
// header in varint32 format
// @param compression_dict Data for presetting the compression library's
// dictionary.
// High-compression LZ4 variant; decompressed with the same LZ4_Uncompress.
// Returns false when LZ4 is not compiled in, the input exceeds 4GB, or the
// compressor reports failure (outlen == 0).
inline bool LZ4HC_Compress(const CompressionInfo& info,
                           uint32_t compress_format_version, const char* input,
                           size_t length, ::std::string* output) {
#ifdef LZ4
  if (length > std::numeric_limits<uint32_t>::max()) {
    // Can't compress more than 4GB
    return false;
  }

  size_t output_header_len = 0;
  if (compress_format_version == 2) {
    // new encoding, using varint32 to store size information
    output_header_len = compression::PutDecompressedSizeInfo(
        output, static_cast<uint32_t>(length));
  } else {
    // legacy encoding, which is not really portable (depends on big/little
    // endianness)
    output_header_len = 8;
    output->resize(output_header_len);
    char* p = const_cast<char*>(output->c_str());
    memcpy(p, &length, sizeof(length));
  }

  int compress_bound = LZ4_compressBound(static_cast<int>(length));
  output->resize(static_cast<size_t>(output_header_len + compress_bound));

  int outlen;
  int level;
  if (info.options().level == CompressionOptions::kDefaultCompressionLevel) {
    level = 0;  // lz4hc.h says any value < 1 will be sanitized to default
  } else {
    level = info.options().level;
  }
#if LZ4_VERSION_NUMBER >= 10400  // r124+
  LZ4_streamHC_t* stream = LZ4_createStreamHC();
  LZ4_resetStreamHC(stream, level);
  Slice compression_dict = info.dict().GetRawDict();
  const char* compression_dict_data =
      compression_dict.size() > 0 ? compression_dict.data() : nullptr;
  size_t compression_dict_size = compression_dict.size();
  if (compression_dict_data != nullptr) {
    LZ4_loadDictHC(stream, compression_dict_data,
                   static_cast<int>(compression_dict_size));
  }

#if LZ4_VERSION_NUMBER >= 10700  // r129+
  outlen =
      LZ4_compress_HC_continue(stream, input, &(*output)[output_header_len],
                               static_cast<int>(length), compress_bound);
#else   // r124-r128
  outlen = LZ4_compressHC_limitedOutput_continue(
      stream, input, &(*output)[output_header_len], static_cast<int>(length),
      compress_bound);
#endif  // LZ4_VERSION_NUMBER >= 10700
  LZ4_freeStreamHC(stream);

#elif LZ4_VERSION_MAJOR  // r113-r123
  outlen = LZ4_compressHC2_limitedOutput(input, &(*output)[output_header_len],
                                         static_cast<int>(length),
                                         compress_bound, level);
#else                    // up to r112
  outlen =
      LZ4_compressHC_limitedOutput(input, &(*output)[output_header_len],
                                   static_cast<int>(length), compress_bound);
#endif                   // LZ4_VERSION_NUMBER >= 10400

  if (outlen == 0) {
    return false;
  }
  output->resize(static_cast<size_t>(output_header_len + outlen));
  return true;
#else  // LZ4
  (void)info;
  (void)compress_format_version;
  (void)input;
  (void)length;
  (void)output;
  return false;
#endif
}
  1366. #ifdef XPRESS
  1367. inline bool XPRESS_Compress(const char* input, size_t length,
  1368. std::string* output) {
  1369. return port::xpress::Compress(input, length, output);
  1370. }
  1371. #else
  1372. inline bool XPRESS_Compress(const char* /*input*/, size_t /*length*/,
  1373. std::string* /*output*/) {
  1374. return false;
  1375. }
  1376. #endif
  1377. #ifdef XPRESS
  1378. inline char* XPRESS_Uncompress(const char* input_data, size_t input_length,
  1379. size_t* uncompressed_size) {
  1380. return port::xpress::Decompress(input_data, input_length, uncompressed_size);
  1381. }
  1382. #else
  1383. inline char* XPRESS_Uncompress(const char* /*input_data*/,
  1384. size_t /*input_length*/,
  1385. size_t* /*uncompressed_size*/) {
  1386. return nullptr;
  1387. }
  1388. #endif
  1389. inline bool ZSTD_Compress(const CompressionInfo& info, const char* input,
  1390. size_t length, ::std::string* output) {
  1391. #ifdef ZSTD
  1392. if (length > std::numeric_limits<uint32_t>::max()) {
  1393. // Can't compress more than 4GB
  1394. return false;
  1395. }
  1396. size_t output_header_len = compression::PutDecompressedSizeInfo(
  1397. output, static_cast<uint32_t>(length));
  1398. size_t compressBound = ZSTD_compressBound(length);
  1399. // TODO: use resize_and_overwrite with c++23
  1400. output->resize(static_cast<size_t>(output_header_len + compressBound));
  1401. size_t outlen = 0;
  1402. ZSTD_CCtx* context = info.context().ZSTDPreallocCtx();
  1403. assert(context != nullptr);
  1404. if (info.dict().GetDigestedZstdCDict() != nullptr) {
  1405. ZSTD_CCtx_refCDict(context, info.dict().GetDigestedZstdCDict());
  1406. } else {
  1407. ZSTD_CCtx_loadDictionary(context, info.dict().GetRawDict().data(),
  1408. info.dict().GetRawDict().size());
  1409. }
  1410. // Compression level is set in `contex` during CreateNativeContext()
  1411. outlen = ZSTD_compress2(context, &(*output)[output_header_len], compressBound,
  1412. input, length);
  1413. if (outlen == 0) {
  1414. return false;
  1415. }
  1416. output->resize(output_header_len + outlen);
  1417. return true;
  1418. #else // ZSTD
  1419. (void)info;
  1420. (void)input;
  1421. (void)length;
  1422. (void)output;
  1423. return false;
  1424. #endif
  1425. }
// @param compression_dict Data for presetting the compression library's
// dictionary.
// @param error_message If not null, will be set if decompression fails.
//
// Returns nullptr if decompression fails.
inline CacheAllocationPtr ZSTD_Uncompress(
    const UncompressionInfo& info, const char* input_data, size_t input_length,
    size_t* uncompressed_size, MemoryAllocator* allocator = nullptr,
    const char** error_message = nullptr) {
#ifdef ZSTD
  static const char* const kErrorDecodeOutputSize =
      "Cannot decode output size.";
  static const char* const kErrorOutputLenMismatch =
      "Decompressed size does not match header.";

  // ZSTD_Compress always prepends the decompressed size as a varint32.
  uint32_t output_len = 0;
  if (!compression::GetDecompressedSizeInfo(&input_data, &input_length,
                                            &output_len)) {
    if (error_message) {
      *error_message = kErrorDecodeOutputSize;
    }
    return nullptr;
  }

  CacheAllocationPtr output = AllocateBlock(output_len, allocator);
  size_t actual_output_length = 0;
  ZSTD_DCtx* context = info.context().GetZSTDContext();
  assert(context != nullptr);
#ifdef ROCKSDB_ZSTD_DDICT
  // Prefer the pre-digested dictionary when present: it skips per-block
  // dictionary processing.
  if (info.dict().GetDigestedZstdDDict() != nullptr) {
    actual_output_length = ZSTD_decompress_usingDDict(
        context, output.get(), output_len, input_data, input_length,
        info.dict().GetDigestedZstdDDict());
  } else {
#endif  // ROCKSDB_ZSTD_DDICT
    actual_output_length = ZSTD_decompress_usingDict(
        context, output.get(), output_len, input_data, input_length,
        info.dict().GetRawDict().data(), info.dict().GetRawDict().size());
#ifdef ROCKSDB_ZSTD_DDICT
  }
#endif  // ROCKSDB_ZSTD_DDICT
  if (ZSTD_isError(actual_output_length)) {
    if (error_message) {
      *error_message = ZSTD_getErrorName(actual_output_length);
    }
    return nullptr;
  } else if (actual_output_length != output_len) {
    // Header lied about the size; treat as corruption.
    if (error_message) {
      *error_message = kErrorOutputLenMismatch;
    }
    return nullptr;
  }

  *uncompressed_size = actual_output_length;
  return output;
#else  // ZSTD
  (void)info;
  (void)input_data;
  (void)input_length;
  (void)uncompressed_size;
  (void)allocator;
  (void)error_message;
  return nullptr;
#endif
}
  1488. inline bool ZSTD_TrainDictionarySupported() {
  1489. #ifdef ZSTD
  1490. // NB: Dictionary trainer is available since v0.6.1 for static linking, but
  1491. // not available for dynamic linking until v1.1.3. See ZSTD_VERSION_NUMBER
  1492. // check above.
  1493. return true;
  1494. #else
  1495. return false;
  1496. #endif
  1497. }
  1498. inline std::string ZSTD_TrainDictionary(const std::string& samples,
  1499. const std::vector<size_t>& sample_lens,
  1500. size_t max_dict_bytes) {
  1501. #ifdef ZSTD
  1502. assert(samples.empty() == sample_lens.empty());
  1503. if (samples.empty()) {
  1504. return "";
  1505. }
  1506. std::string dict_data(max_dict_bytes, '\0');
  1507. size_t dict_len = ZDICT_trainFromBuffer(
  1508. &dict_data[0], max_dict_bytes, &samples[0], &sample_lens[0],
  1509. static_cast<unsigned>(sample_lens.size()));
  1510. if (ZDICT_isError(dict_len)) {
  1511. return "";
  1512. }
  1513. assert(dict_len <= max_dict_bytes);
  1514. dict_data.resize(dict_len);
  1515. return dict_data;
  1516. #else
  1517. assert(false);
  1518. (void)samples;
  1519. (void)sample_lens;
  1520. (void)max_dict_bytes;
  1521. return "";
  1522. #endif // ZSTD
  1523. }
  1524. inline std::string ZSTD_TrainDictionary(const std::string& samples,
  1525. size_t sample_len_shift,
  1526. size_t max_dict_bytes) {
  1527. #ifdef ZSTD
  1528. // skips potential partial sample at the end of "samples"
  1529. size_t num_samples = samples.size() >> sample_len_shift;
  1530. std::vector<size_t> sample_lens(num_samples, size_t(1) << sample_len_shift);
  1531. return ZSTD_TrainDictionary(samples, sample_lens, max_dict_bytes);
  1532. #else
  1533. assert(false);
  1534. (void)samples;
  1535. (void)sample_len_shift;
  1536. (void)max_dict_bytes;
  1537. return "";
  1538. #endif // ZSTD
  1539. }
  1540. inline bool ZSTD_FinalizeDictionarySupported() {
  1541. #ifdef ROCKSDB_ZDICT_FINALIZE
  1542. return true;
  1543. #else
  1544. return false;
  1545. #endif
  1546. }
// Builds a ZSTD dictionary of at most max_dict_bytes by "finalizing" raw
// content with entropy tables derived from the samples. Returns an empty
// string when there are no samples or on ZDICT error.
inline std::string ZSTD_FinalizeDictionary(
    const std::string& samples, const std::vector<size_t>& sample_lens,
    size_t max_dict_bytes, int level) {
#ifdef ROCKSDB_ZDICT_FINALIZE
  // Sample bytes and per-sample lengths must be provided together.
  assert(samples.empty() == sample_lens.empty());
  if (samples.empty()) {
    return "";
  }
  if (level == CompressionOptions::kDefaultCompressionLevel) {
    // NB: ZSTD_CLEVEL_DEFAULT is historically == 3
    level = ZSTD_CLEVEL_DEFAULT;
  }
  std::string dict_data(max_dict_bytes, '\0');
  // The dictionary "content" passed to ZDICT_finalizeDictionary is a prefix
  // of the concatenated samples, capped at max_dict_bytes; the samples are
  // then passed again for statistics/entropy-table construction.
  size_t dict_len = ZDICT_finalizeDictionary(
      dict_data.data(), max_dict_bytes, samples.data(),
      std::min(static_cast<size_t>(samples.size()), max_dict_bytes),
      samples.data(), sample_lens.data(),
      static_cast<unsigned>(sample_lens.size()),
      {level, 0 /* notificationLevel */, 0 /* dictID */});
  if (ZDICT_isError(dict_len)) {
    return "";
  } else {
    assert(dict_len <= max_dict_bytes);
    // Shrink to the actual finalized dictionary size.
    dict_data.resize(dict_len);
    return dict_data;
  }
#else
  // Callers must check ZSTD_FinalizeDictionarySupported() first.
  assert(false);
  (void)samples;
  (void)sample_lens;
  (void)max_dict_bytes;
  (void)level;
  return "";
#endif  // ROCKSDB_ZDICT_FINALIZE
}
  1582. inline bool OLD_CompressData(const Slice& raw,
  1583. const CompressionInfo& compression_info,
  1584. uint32_t compress_format_version,
  1585. std::string* compressed_output) {
  1586. bool ret = false;
  1587. // Will return compressed block contents if (1) the compression method is
  1588. // supported in this platform and (2) the compression rate is "good enough".
  1589. switch (compression_info.type()) {
  1590. case kSnappyCompression:
  1591. ret = Snappy_Compress(compression_info, raw.data(), raw.size(),
  1592. compressed_output);
  1593. break;
  1594. case kZlibCompression:
  1595. ret = Zlib_Compress(compression_info, compress_format_version, raw.data(),
  1596. raw.size(), compressed_output);
  1597. break;
  1598. case kBZip2Compression:
  1599. ret = BZip2_Compress(compression_info, compress_format_version,
  1600. raw.data(), raw.size(), compressed_output);
  1601. break;
  1602. case kLZ4Compression:
  1603. ret = LZ4_Compress(compression_info, compress_format_version, raw.data(),
  1604. raw.size(), compressed_output);
  1605. break;
  1606. case kLZ4HCCompression:
  1607. ret = LZ4HC_Compress(compression_info, compress_format_version,
  1608. raw.data(), raw.size(), compressed_output);
  1609. break;
  1610. case kXpressCompression:
  1611. ret = XPRESS_Compress(raw.data(), raw.size(), compressed_output);
  1612. break;
  1613. case kZSTD:
  1614. ret = ZSTD_Compress(compression_info, raw.data(), raw.size(),
  1615. compressed_output);
  1616. break;
  1617. default:
  1618. // Do not recognize this compression type
  1619. break;
  1620. }
  1621. TEST_SYNC_POINT_CALLBACK("CompressData:TamperWithReturnValue",
  1622. static_cast<void*>(&ret));
  1623. return ret;
  1624. }
// Dispatches decompression of `data` (length `n`) to the routine for
// uncompression_info.type(). On success returns the allocated output and
// sets *uncompressed_size; returns an empty CacheAllocationPtr on failure or
// for unrecognized types. `error_message` is only populated by the ZSTD
// path (see TODO below).
inline CacheAllocationPtr OLD_UncompressData(
    const UncompressionInfo& uncompression_info, const char* data, size_t n,
    size_t* uncompressed_size, uint32_t compress_format_version,
    MemoryAllocator* allocator = nullptr,
    const char** error_message = nullptr) {
  switch (uncompression_info.type()) {
    case kSnappyCompression:
      // No format-version parameter for the Snappy path.
      return Snappy_Uncompress(data, n, uncompressed_size, allocator);
    case kZlibCompression:
      return Zlib_Uncompress(uncompression_info, data, n, uncompressed_size,
                             compress_format_version, allocator);
    case kBZip2Compression:
      return BZip2_Uncompress(data, n, uncompressed_size,
                              compress_format_version, allocator);
    case kLZ4Compression:
    case kLZ4HCCompression:
      // LZ4 and LZ4HC share a single decompression routine.
      return LZ4_Uncompress(uncompression_info, data, n, uncompressed_size,
                            compress_format_version, allocator);
    case kXpressCompression:
      // XPRESS allocates memory internally, thus no support for custom
      // allocator.
      return CacheAllocationPtr(XPRESS_Uncompress(data, n, uncompressed_size));
    case kZSTD:
      // TODO(cbi): error message handling for other compression algorithms.
      return ZSTD_Uncompress(uncompression_info, data, n, uncompressed_size,
                             allocator, error_message);
    default:
      return CacheAllocationPtr();
  }
}
  1655. // ***********************************************************************
  1656. // BEGIN built-in implementation of customization interface
  1657. // ***********************************************************************
  1658. // NOTE: to avoid compression API depending on block-based table API, uses
  1659. // its own format version. See internal function GetCompressFormatForVersion()
  1660. const std::shared_ptr<CompressionManager>& GetBuiltinCompressionManager(
  1661. int compression_format_version);
  1662. // ***********************************************************************
  1663. // END built-in implementation of customization interface
  1664. // ***********************************************************************
  1665. // Records the compression type for subsequent WAL records.
  1666. class CompressionTypeRecord {
  1667. public:
  1668. explicit CompressionTypeRecord(CompressionType compression_type)
  1669. : compression_type_(compression_type) {}
  1670. CompressionType GetCompressionType() const { return compression_type_; }
  1671. inline void EncodeTo(std::string* dst) const {
  1672. assert(dst != nullptr);
  1673. PutFixed32(dst, compression_type_);
  1674. }
  1675. inline Status DecodeFrom(Slice* src) {
  1676. constexpr char class_name[] = "CompressionTypeRecord";
  1677. uint32_t val;
  1678. if (!GetFixed32(src, &val)) {
  1679. return Status::Corruption(class_name,
  1680. "Error decoding WAL compression type");
  1681. }
  1682. CompressionType compression_type = static_cast<CompressionType>(val);
  1683. if (!StreamingCompressionTypeSupported(compression_type)) {
  1684. return Status::Corruption(class_name,
  1685. "WAL compression type not supported");
  1686. }
  1687. compression_type_ = compression_type;
  1688. return Status::OK();
  1689. }
  1690. inline std::string DebugString() const {
  1691. return "compression_type: " + CompressionTypeToString(compression_type_);
  1692. }
  1693. private:
  1694. CompressionType compression_type_;
  1695. };
  1696. // Base class to implement compression for a stream of buffers.
  1697. // Instantiate an implementation of the class using Create() with the
  1698. // compression type and use Compress() repeatedly.
  1699. // The output buffer needs to be at least max_output_len.
  1700. // Call Reset() in between frame boundaries or in case of an error.
  1701. // NOTE: This class is not thread safe.
class StreamingCompress {
 public:
  StreamingCompress(CompressionType compression_type,
                    const CompressionOptions& opts,
                    uint32_t compress_format_version, size_t max_output_len)
      : compression_type_(compression_type),
        opts_(opts),
        compress_format_version_(compress_format_version),
        max_output_len_(max_output_len) {}
  virtual ~StreamingCompress() = default;
  // Compress should be called repeatedly with the same input till the method
  // returns 0.
  // Parameters:
  // input - buffer to compress
  // input_size - size of input buffer
  // output - compressed buffer allocated by caller, should be at least
  //   max_output_len
  // output_pos - out parameter. NOTE(review): the original comment described
  //   this as "output_size - size of the output buffer", which does not match
  //   the parameter name; the name suggests it receives the number of bytes
  //   written to output - confirm against the concrete implementations.
  // Returns -1 for errors, otherwise the remaining size of the input buffer
  // that needs to be compressed.
  virtual int Compress(const char* input, size_t input_size, char* output,
                       size_t* output_pos) = 0;
  // Static factory method to create an object of a class inherited from
  // StreamingCompress based on the actual compression type.
  static StreamingCompress* Create(CompressionType compression_type,
                                   const CompressionOptions& opts,
                                   uint32_t compress_format_version,
                                   size_t max_output_len);
  // Resets internal state; call between frame boundaries or after an error.
  virtual void Reset() = 0;

 protected:
  const CompressionType compression_type_;
  const CompressionOptions opts_;
  const uint32_t compress_format_version_;
  const size_t max_output_len_;
};
  1737. // Base class to uncompress a stream of compressed buffers.
  1738. // Instantiate an implementation of the class using Create() with the
  1739. // compression type and use Uncompress() repeatedly.
  1740. // The output buffer needs to be at least max_output_len.
  1741. // Call Reset() in between frame boundaries or in case of an error.
  1742. // NOTE: This class is not thread safe.
class StreamingUncompress {
 public:
  StreamingUncompress(CompressionType compression_type,
                      uint32_t compress_format_version, size_t max_output_len)
      : compression_type_(compression_type),
        compress_format_version_(compress_format_version),
        max_output_len_(max_output_len) {}
  virtual ~StreamingUncompress() = default;
  // Uncompress can be called repeatedly to progressively process the same
  // input buffer, or can be called with a new input buffer. When the input
  // buffer is not fully consumed, the return value is > 0 or output_size
  // == max_output_len. When calling uncompress to continue processing the
  // same input buffer, the input argument should be nullptr.
  // Parameters:
  // input - buffer to uncompress
  // input_size - size of input buffer
  // output - uncompressed buffer allocated by caller, should be at least
  //   max_output_len
  // output_pos - out parameter. NOTE(review): the original comment described
  //   this as "output_size - size of the output buffer", which does not match
  //   the parameter name; the name suggests it receives the number of bytes
  //   written to output - confirm against the concrete implementations.
  // Returns -1 for errors, remaining input to be processed otherwise.
  virtual int Uncompress(const char* input, size_t input_size, char* output,
                         size_t* output_pos) = 0;
  // Static factory method to create an object of a class inherited from
  // StreamingUncompress based on the actual compression type.
  static StreamingUncompress* Create(CompressionType compression_type,
                                     uint32_t compress_format_version,
                                     size_t max_output_len);
  // Resets internal state; call between frame boundaries or after an error.
  virtual void Reset() = 0;

 protected:
  CompressionType compression_type_;
  uint32_t compress_format_version_;
  size_t max_output_len_;
};
  1774. class ZSTDStreamingCompress final : public StreamingCompress {
  1775. public:
  1776. explicit ZSTDStreamingCompress(const CompressionOptions& opts,
  1777. uint32_t compress_format_version,
  1778. size_t max_output_len)
  1779. : StreamingCompress(kZSTD, opts, compress_format_version,
  1780. max_output_len) {
  1781. #ifdef ZSTD
  1782. cctx_ = ZSTD_createCCtx();
  1783. // Each compressed frame will have a checksum
  1784. ZSTD_CCtx_setParameter(cctx_, ZSTD_c_checksumFlag, 1);
  1785. assert(cctx_ != nullptr);
  1786. input_buffer_ = {/*src=*/nullptr, /*size=*/0, /*pos=*/0};
  1787. #endif
  1788. }
  1789. ~ZSTDStreamingCompress() override {
  1790. #ifdef ZSTD
  1791. ZSTD_freeCCtx(cctx_);
  1792. #endif
  1793. }
  1794. int Compress(const char* input, size_t input_size, char* output,
  1795. size_t* output_pos) override;
  1796. void Reset() override;
  1797. #ifdef ZSTD
  1798. ZSTD_CCtx* cctx_;
  1799. ZSTD_inBuffer input_buffer_;
  1800. #endif
  1801. };
// ZSTD implementation of StreamingUncompress; Uncompress()/Reset() are
// defined out of line.
class ZSTDStreamingUncompress final : public StreamingUncompress {
 public:
  explicit ZSTDStreamingUncompress(uint32_t compress_format_version,
                                   size_t max_output_len)
      : StreamingUncompress(kZSTD, compress_format_version, max_output_len) {
#ifdef ZSTD
    dctx_ = ZSTD_createDCtx();
    assert(dctx_ != nullptr);
    // Start with an empty input buffer; the out-of-line implementation
    // manages it across Uncompress() calls.
    input_buffer_ = {/*src=*/nullptr, /*size=*/0, /*pos=*/0};
#endif
  }
  ~ZSTDStreamingUncompress() override {
#ifdef ZSTD
    ZSTD_freeDCtx(dctx_);
#endif
  }
  // See StreamingUncompress::Uncompress for the contract.
  int Uncompress(const char* input, size_t input_size, char* output,
                 size_t* output_size) override;
  void Reset() override;

 private:
#ifdef ZSTD
  // Decompression context reused across calls; freed in the destructor.
  ZSTD_DCtx* dctx_;
  ZSTD_inBuffer input_buffer_;
#endif
};
  1827. } // namespace ROCKSDB_NAMESPACE