| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761
77717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010 |
- // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
- // This source code is licensed under both the GPLv2 (found in the
- // COPYING file in the root directory) and Apache 2.0 License
- // (found in the LICENSE.Apache file in the root directory).
- //
- // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
- // Use of this source code is governed by a BSD-style license that can be
- // found in the LICENSE file. See the AUTHORS file for names of contributors.
- //
- #pragma once
- #include <algorithm>
- #include <limits>
- #ifdef ROCKSDB_MALLOC_USABLE_SIZE
- #ifdef OS_FREEBSD
- #include <malloc_np.h>
- #else // OS_FREEBSD
- #include <malloc.h>
- #endif // OS_FREEBSD
- #endif // ROCKSDB_MALLOC_USABLE_SIZE
- #include <string>
- #include "memory/memory_allocator_impl.h"
- #include "port/likely.h"
- #include "rocksdb/advanced_compression.h"
- #include "rocksdb/options.h"
- #include "table/block_based/block_type.h"
- #include "test_util/sync_point.h"
- #include "util/atomic.h"
- #include "util/cast_util.h"
- #include "util/coding.h"
- #include "util/compression_context_cache.h"
- #include "util/string_util.h"
- #ifdef SNAPPY
- #include <snappy-sinksource.h>
- #include <snappy.h>
- #endif
- #ifdef ZLIB
- #include <zlib.h>
- #endif
- #ifdef BZIP2
- #include <bzlib.h>
- #endif
- #if defined(LZ4)
- #include <lz4.h>
- #include <lz4hc.h>
- #if LZ4_VERSION_NUMBER < 10700 // < r129
- #error "LZ4 support requires version >= 1.7.0 (lz4-devel)"
- #endif
- #endif
- #ifdef ZSTD
- #include <zstd.h>
- #include <zstd_errors.h>
- // ZSTD_Compress2(), ZSTD_compressStream2() and frame parameters all belong to
- // advanced APIs and require v1.4.0+, which is from April 2019.
- // https://github.com/facebook/zstd/blob/eb9f881eb810f2242f1ef36b3f3e7014eecb8fa6/lib/zstd.h#L297C40-L297C45
- // To avoid a rat's nest of #ifdefs, we now require v1.4.0+ for ZSTD support.
- #if ZSTD_VERSION_NUMBER < 10400
- #error "ZSTD support requires version >= 1.4.0 (libzstd-devel)"
- #endif // ZSTD_VERSION_NUMBER
- // The above release also includes digested dictionary support, but some
- // required functions (ZSTD_createDDict_byReference) are still only exported
- // with ZSTD_STATIC_LINKING_ONLY defined.
- #if defined(ZSTD_STATIC_LINKING_ONLY)
- #define ROCKSDB_ZSTD_DDICT
- #endif // defined(ZSTD_STATIC_LINKING_ONLY)
- // For ZDICT_* functions
- #include <zdict.h>
- // ZDICT_finalizeDictionary API is exported and stable since v1.4.5
- #if ZSTD_VERSION_NUMBER >= 10405
- #define ROCKSDB_ZDICT_FINALIZE
- #endif // ZSTD_VERSION_NUMBER >= 10405
- #endif // ZSTD
- namespace ROCKSDB_NAMESPACE {
- // Need this for the context allocation override
- // On windows we need to do this explicitly
- #if defined(ZSTD) && defined(ROCKSDB_JEMALLOC) && defined(OS_WIN) && \
- defined(ZSTD_STATIC_LINKING_ONLY)
- #define ROCKSDB_ZSTD_CUSTOM_MEM
- namespace port {
- ZSTD_customMem GetJeZstdAllocationOverrides();
- } // namespace port
- #endif // defined(ZSTD) && defined(ROCKSDB_JEMALLOC) && defined(OS_WIN) &&
- // defined(ZSTD_STATIC_LINKING_ONLY)
// Wraps a native ZSTD decompression context that is either created (and
// later freed) by this object, or shared from a cache slot identified by an
// index. If more than one kind of native context ever needs to be cached,
// the cached state could be arranged as a tuple.
class ZSTDUncompressCachedData {
 public:
#if defined(ZSTD)
  using ZSTDNativeContext = ZSTD_DCtx*;
#else
  using ZSTDNativeContext = void*;
#endif  // ZSTD

  // Starts with a null context and cache index -1 (see member initializers).
  ZSTDUncompressCachedData() = default;

  // Copying is disabled; use InitFromCache() to share a cached context.
  ZSTDUncompressCachedData(const ZSTDUncompressCachedData& o) = delete;
  ZSTDUncompressCachedData& operator=(const ZSTDUncompressCachedData&) = delete;

  // Move construction: begin from the default (empty) state and then take
  // over the state of `o` through move assignment.
  ZSTDUncompressCachedData(ZSTDUncompressCachedData&& o) noexcept {
    *this = std::move(o);
  }

  // Move assignment exchanges state with `o`. The target is expected to hold
  // no context yet (checked by the assert in debug builds).
  ZSTDUncompressCachedData& operator=(ZSTDUncompressCachedData&& o) noexcept {
    assert(zstd_ctx_ == nullptr);
    using std::swap;
    swap(zstd_ctx_, o.zstd_ctx_);
    swap(cache_idx_, o.cache_idx_);
    return *this;
  }

  // Returns the native context; may be null.
  ZSTDNativeContext Get() const { return zstd_ctx_; }

  // Returns the cache slot index, or -1 when the context is owned here.
  int64_t GetCacheIndex() const { return cache_idx_; }

  // Creates an owned native context when none is held yet. Without ZSTD
  // support there is nothing to create and the context stays null.
  void CreateIfNeeded() {
    if (zstd_ctx_ != nullptr) {
      return;
    }
#if defined(ZSTD) && defined(ROCKSDB_ZSTD_CUSTOM_MEM)
    zstd_ctx_ = ZSTD_createDCtx_advanced(port::GetJeZstdAllocationOverrides());
#elif defined(ZSTD)
    zstd_ctx_ = ZSTD_createDCtx();
#endif
    cache_idx_ = -1;
  }

  // Shares the context held by `o` and records the cache slot `idx`.
  void InitFromCache(const ZSTDUncompressCachedData& o, int64_t idx) {
    zstd_ctx_ = o.zstd_ctx_;
    cache_idx_ = idx;
  }

  // Frees the native context only when it is owned (cache index -1).
  ~ZSTDUncompressCachedData() {
#if defined(ZSTD)
    const bool owns_context = (cache_idx_ == -1);
    if (owns_context && zstd_ctx_ != nullptr) {
      ZSTD_freeDCtx(zstd_ctx_);
    }
#endif  // ZSTD
  }

 private:
  ZSTDNativeContext zstd_ctx_ = nullptr;
  int64_t cache_idx_ = -1;  // -1 means this instance owns the context
};
- } // namespace ROCKSDB_NAMESPACE
- #if defined(XPRESS)
- #include "port/xpress.h"
- #endif
- namespace ROCKSDB_NAMESPACE {
- class FailureDecompressor : public Decompressor {
- public:
- explicit FailureDecompressor(Status&& status) : status_(std::move(status)) {
- assert(!status_.ok());
- }
- ~FailureDecompressor() override { status_.PermitUncheckedError(); }
- const char* Name() const override { return "FailureDecompressor"; }
- Status ExtractUncompressedSize(Args& /*args*/) override { return status_; }
- Status DecompressBlock(const Args& /*args*/,
- char* /*uncompressed_output*/) override {
- return status_;
- }
- protected:
- Status status_;
- };
- // Owns a decompression dictionary, and associated Decompressor, for storing
- // in the block cache.
- //
- // Justification: for a "processed" dictionary to be saved in block cache, we
- // also need a reference to the decompressor that processed it, to ensure it
- // is recognized properly. At that point, we might as well have the dictionary
- // part of the decompressor identity and track an associated decompressor along
- // with a decompression dictionary in the block cache, and the decompressor
- // hides potential details of processing the dictionary.
- struct DecompressorDict {
- // Block containing the data for the compression dictionary in case the
- // constructor that takes a string parameter is used.
- std::string dict_str_;
- // Block containing the data for the compression dictionary in case the
- // constructor that takes a Slice parameter is used and the passed in
- // CacheAllocationPtr is not nullptr.
- CacheAllocationPtr dict_allocation_;
- // A Decompressor referencing and using the dictionary owned by this.
- std::unique_ptr<Decompressor> decompressor_;
- // Approximate owned memory usage
- size_t memory_usage_;
- DecompressorDict(std::string&& dict, Decompressor& from_decompressor)
- : dict_str_(std::move(dict)) {
- Populate(from_decompressor, dict_str_);
- }
- DecompressorDict(Slice slice, CacheAllocationPtr&& allocation,
- Decompressor& from_decompressor)
- : dict_allocation_(std::move(allocation)) {
- Populate(from_decompressor, slice);
- }
- DecompressorDict(DecompressorDict&& rhs) noexcept
- : dict_str_(std::move(rhs.dict_str_)),
- dict_allocation_(std::move(rhs.dict_allocation_)),
- decompressor_(std::move(rhs.decompressor_)),
- memory_usage_(std::move(rhs.memory_usage_)) {}
- DecompressorDict& operator=(DecompressorDict&& rhs) noexcept {
- if (this == &rhs) {
- return *this;
- }
- dict_str_ = std::move(rhs.dict_str_);
- dict_allocation_ = std::move(rhs.dict_allocation_);
- decompressor_ = std::move(rhs.decompressor_);
- return *this;
- }
- // Disable copy
- DecompressorDict(const DecompressorDict&) = delete;
- DecompressorDict& operator=(const DecompressorDict&) = delete;
- // The object is self-contained if the string constructor is used, or the
- // Slice constructor is invoked with a non-null allocation. Otherwise, it
- // is the caller's responsibility to ensure that the underlying storage
- // outlives this object.
- bool own_bytes() const { return !dict_str_.empty() || dict_allocation_; }
- const Slice& GetRawDict() const { return decompressor_->GetSerializedDict(); }
- // For TypedCacheInterface
- const Slice& ContentSlice() const { return GetRawDict(); }
- static constexpr CacheEntryRole kCacheEntryRole = CacheEntryRole::kOtherBlock;
- static constexpr BlockType kBlockType = BlockType::kCompressionDictionary;
- size_t ApproximateMemoryUsage() const { return memory_usage_; }
- private:
- void Populate(Decompressor& from_decompressor, Slice dict) {
- if (UNLIKELY(dict.empty())) {
- dict_str_ = {};
- dict_allocation_ = {};
- // Appropriately reject bad files with empty dictionary block.
- // It is longstanding not to write an empty dictionary block:
- // https://github.com/facebook/rocksdb/blame/10.2.fb/table/block_based/block_based_table_builder.cc#L1841
- decompressor_ = std::make_unique<FailureDecompressor>(
- Status::Corruption("Decompression dictionary is empty"));
- } else {
- Status s = from_decompressor.MaybeCloneForDict(dict, &decompressor_);
- if (decompressor_ == nullptr) {
- dict_str_ = {};
- dict_allocation_ = {};
- assert(!s.ok());
- decompressor_ = std::make_unique<FailureDecompressor>(std::move(s));
- } else {
- assert(s.ok());
- assert(decompressor_->GetSerializedDict() == dict);
- }
- }
- memory_usage_ = sizeof(struct DecompressorDict);
- memory_usage_ += dict_str_.size();
- if (dict_allocation_) {
- auto allocator = dict_allocation_.get_deleter().allocator;
- if (allocator) {
- memory_usage_ +=
- allocator->UsableSize(dict_allocation_.get(), GetRawDict().size());
- } else {
- memory_usage_ += GetRawDict().size();
- }
- }
- memory_usage_ += decompressor_->ApproximateOwnedMemoryUsage();
- }
- };
- // Holds dictionary and related data, like ZSTD's digested compression
- // dictionary.
- struct CompressionDict {
- #ifdef ZSTD
- ZSTD_CDict* zstd_cdict_ = nullptr;
- #endif // ZSTD
- std::string dict_;
- public:
- CompressionDict() = default;
- CompressionDict(std::string&& dict, CompressionType type, int level) {
- dict_ = std::move(dict);
- #ifdef ZSTD
- zstd_cdict_ = nullptr;
- if (!dict_.empty() && type == kZSTD) {
- if (level == CompressionOptions::kDefaultCompressionLevel) {
- // NB: ZSTD_CLEVEL_DEFAULT is historically == 3
- level = ZSTD_CLEVEL_DEFAULT;
- }
- // Should be safe (but slower) if below call fails as we'll use the
- // raw dictionary to compress.
- zstd_cdict_ = ZSTD_createCDict(dict_.data(), dict_.size(), level);
- assert(zstd_cdict_ != nullptr);
- }
- #else
- (void)type;
- (void)level;
- #endif // ZSTD
- }
- CompressionDict(CompressionDict&& other) {
- #ifdef ZSTD
- zstd_cdict_ = other.zstd_cdict_;
- other.zstd_cdict_ = nullptr;
- #endif // ZSTD
- dict_ = std::move(other.dict_);
- }
- CompressionDict& operator=(CompressionDict&& other) {
- if (this == &other) {
- return *this;
- }
- #ifdef ZSTD
- zstd_cdict_ = other.zstd_cdict_;
- other.zstd_cdict_ = nullptr;
- #endif // ZSTD
- dict_ = std::move(other.dict_);
- return *this;
- }
- ~CompressionDict() {
- #ifdef ZSTD
- size_t res = 0;
- if (zstd_cdict_ != nullptr) {
- res = ZSTD_freeCDict(zstd_cdict_);
- }
- assert(res == 0); // Last I checked they can't fail
- (void)res; // prevent unused var warning
- #endif // ZSTD
- }
- #ifdef ZSTD
- const ZSTD_CDict* GetDigestedZstdCDict() const { return zstd_cdict_; }
- #endif // ZSTD
- Slice GetRawDict() const { return dict_; }
- bool empty() const { return dict_.empty(); }
- static const CompressionDict& GetEmptyDict() {
- static CompressionDict empty_dict{};
- return empty_dict;
- }
- // Disable copy
- CompressionDict(const CompressionDict&) = delete;
- CompressionDict& operator=(const CompressionDict&) = delete;
- };
- // Holds dictionary and related data, like ZSTD's digested uncompression
- // dictionary.
- struct UncompressionDict {
- // Block containing the data for the compression dictionary in case the
- // constructor that takes a string parameter is used.
- std::string dict_;
- // Block containing the data for the compression dictionary in case the
- // constructor that takes a Slice parameter is used and the passed in
- // CacheAllocationPtr is not nullptr.
- CacheAllocationPtr allocation_;
- // Slice pointing to the compression dictionary data. Can point to
- // dict_, allocation_, or some other memory location, depending on how
- // the object was constructed.
- Slice slice_;
- #ifdef ROCKSDB_ZSTD_DDICT
- // Processed version of the contents of slice_ for ZSTD compression.
- ZSTD_DDict* zstd_ddict_ = nullptr;
- #endif // ROCKSDB_ZSTD_DDICT
- UncompressionDict(std::string&& dict, bool using_zstd)
- : dict_(std::move(dict)), slice_(dict_) {
- #ifdef ROCKSDB_ZSTD_DDICT
- if (!slice_.empty() && using_zstd) {
- zstd_ddict_ = ZSTD_createDDict_byReference(slice_.data(), slice_.size());
- assert(zstd_ddict_ != nullptr);
- }
- #else
- (void)using_zstd;
- #endif // ROCKSDB_ZSTD_DDICT
- }
- UncompressionDict(Slice slice, CacheAllocationPtr&& allocation,
- bool using_zstd)
- : allocation_(std::move(allocation)), slice_(std::move(slice)) {
- #ifdef ROCKSDB_ZSTD_DDICT
- if (!slice_.empty() && using_zstd) {
- zstd_ddict_ = ZSTD_createDDict_byReference(slice_.data(), slice_.size());
- assert(zstd_ddict_ != nullptr);
- }
- #else
- (void)using_zstd;
- #endif // ROCKSDB_ZSTD_DDICT
- }
- UncompressionDict(UncompressionDict&& rhs)
- : dict_(std::move(rhs.dict_)),
- allocation_(std::move(rhs.allocation_)),
- slice_(std::move(rhs.slice_))
- #ifdef ROCKSDB_ZSTD_DDICT
- ,
- zstd_ddict_(rhs.zstd_ddict_)
- #endif
- {
- #ifdef ROCKSDB_ZSTD_DDICT
- rhs.zstd_ddict_ = nullptr;
- #endif
- }
- ~UncompressionDict() {
- #ifdef ROCKSDB_ZSTD_DDICT
- size_t res = 0;
- if (zstd_ddict_ != nullptr) {
- res = ZSTD_freeDDict(zstd_ddict_);
- }
- assert(res == 0); // Last I checked they can't fail
- (void)res; // prevent unused var warning
- #endif // ROCKSDB_ZSTD_DDICT
- }
- UncompressionDict& operator=(UncompressionDict&& rhs) {
- if (this == &rhs) {
- return *this;
- }
- dict_ = std::move(rhs.dict_);
- allocation_ = std::move(rhs.allocation_);
- slice_ = std::move(rhs.slice_);
- #ifdef ROCKSDB_ZSTD_DDICT
- zstd_ddict_ = rhs.zstd_ddict_;
- rhs.zstd_ddict_ = nullptr;
- #endif
- return *this;
- }
- // The object is self-contained if the string constructor is used, or the
- // Slice constructor is invoked with a non-null allocation. Otherwise, it
- // is the caller's responsibility to ensure that the underlying storage
- // outlives this object.
- bool own_bytes() const { return !dict_.empty() || allocation_; }
- const Slice& GetRawDict() const { return slice_; }
- // For TypedCacheInterface
- const Slice& ContentSlice() const { return slice_; }
- static constexpr CacheEntryRole kCacheEntryRole = CacheEntryRole::kOtherBlock;
- static constexpr BlockType kBlockType = BlockType::kCompressionDictionary;
- #ifdef ROCKSDB_ZSTD_DDICT
- const ZSTD_DDict* GetDigestedZstdDDict() const { return zstd_ddict_; }
- #endif // ROCKSDB_ZSTD_DDICT
- static const UncompressionDict& GetEmptyDict() {
- static UncompressionDict empty_dict{};
- return empty_dict;
- }
- size_t ApproximateMemoryUsage() const {
- size_t usage = sizeof(struct UncompressionDict);
- usage += dict_.size();
- if (allocation_) {
- auto allocator = allocation_.get_deleter().allocator;
- if (allocator) {
- usage += allocator->UsableSize(allocation_.get(), slice_.size());
- } else {
- usage += slice_.size();
- }
- }
- #ifdef ROCKSDB_ZSTD_DDICT
- usage += ZSTD_sizeof_DDict(zstd_ddict_);
- #endif // ROCKSDB_ZSTD_DDICT
- return usage;
- }
- UncompressionDict() = default;
- // Disable copy
- UncompressionDict(const CompressionDict&) = delete;
- UncompressionDict& operator=(const CompressionDict&) = delete;
- };
- class CompressionContext : public Compressor::WorkingArea {
- private:
- #ifdef ZSTD
- ZSTD_CCtx* zstd_ctx_ = nullptr;
- ZSTD_CCtx* CreateZSTDContext() {
- #ifdef ROCKSDB_ZSTD_CUSTOM_MEM
- return ZSTD_createCCtx_advanced(port::GetJeZstdAllocationOverrides());
- #else // ROCKSDB_ZSTD_CUSTOM_MEM
- return ZSTD_createCCtx();
- #endif // ROCKSDB_ZSTD_CUSTOM_MEM
- }
- public:
- // callable inside ZSTD_Compress
- ZSTD_CCtx* ZSTDPreallocCtx() const {
- assert(zstd_ctx_ != nullptr);
- return zstd_ctx_;
- }
- private:
- #endif // ZSTD
- void CreateNativeContext(CompressionType type, int level, bool checksum) {
- #ifdef ZSTD
- if (type == kZSTD) {
- zstd_ctx_ = CreateZSTDContext();
- if (level == CompressionOptions::kDefaultCompressionLevel) {
- // NB: ZSTD_CLEVEL_DEFAULT is historically == 3
- level = ZSTD_CLEVEL_DEFAULT;
- }
- size_t err =
- ZSTD_CCtx_setParameter(zstd_ctx_, ZSTD_c_compressionLevel, level);
- if (ZSTD_isError(err)) {
- assert(false);
- ZSTD_freeCCtx(zstd_ctx_);
- zstd_ctx_ = CreateZSTDContext();
- }
- if (checksum) {
- err = ZSTD_CCtx_setParameter(zstd_ctx_, ZSTD_c_checksumFlag, 1);
- if (ZSTD_isError(err)) {
- assert(false);
- ZSTD_freeCCtx(zstd_ctx_);
- zstd_ctx_ = CreateZSTDContext();
- }
- }
- }
- #else
- (void)type;
- (void)level;
- (void)checksum;
- #endif // ZSTD
- }
- void DestroyNativeContext() {
- #ifdef ZSTD
- if (zstd_ctx_ != nullptr) {
- ZSTD_freeCCtx(zstd_ctx_);
- }
- #endif // ZSTD
- }
- public:
- explicit CompressionContext(CompressionType type,
- const CompressionOptions& options) {
- CreateNativeContext(type, options.level, options.checksum);
- }
- ~CompressionContext() { DestroyNativeContext(); }
- CompressionContext(const CompressionContext&) = delete;
- CompressionContext& operator=(const CompressionContext&) = delete;
- };
- // TODO: rename
- class CompressionInfo {
- const CompressionOptions& opts_;
- const CompressionContext& context_;
- const CompressionDict& dict_;
- const CompressionType type_;
- public:
- CompressionInfo(const CompressionOptions& _opts,
- const CompressionContext& _context,
- const CompressionDict& _dict, CompressionType _type)
- : opts_(_opts), context_(_context), dict_(_dict), type_(_type) {}
- const CompressionOptions& options() const { return opts_; }
- const CompressionContext& context() const { return context_; }
- const CompressionDict& dict() const { return dict_; }
- CompressionType type() const { return type_; }
- };
- // This is like a working area, reusable for different dicts, etc.
- // TODO: refactor / consolidate
- class UncompressionContext : public Decompressor::WorkingArea {
- private:
- CompressionContextCache* ctx_cache_ = nullptr;
- ZSTDUncompressCachedData uncomp_cached_data_;
- public:
- explicit UncompressionContext(CompressionType type) {
- if (type == kZSTD) {
- ctx_cache_ = CompressionContextCache::Instance();
- uncomp_cached_data_ = ctx_cache_->GetCachedZSTDUncompressData();
- }
- }
- ~UncompressionContext() {
- if (uncomp_cached_data_.GetCacheIndex() != -1) {
- assert(ctx_cache_ != nullptr);
- ctx_cache_->ReturnCachedZSTDUncompressData(
- uncomp_cached_data_.GetCacheIndex());
- }
- }
- UncompressionContext(const UncompressionContext&) = delete;
- UncompressionContext& operator=(const UncompressionContext&) = delete;
- ZSTDUncompressCachedData::ZSTDNativeContext GetZSTDContext() const {
- return uncomp_cached_data_.Get();
- }
- };
- class UncompressionInfo {
- const UncompressionContext& context_;
- const UncompressionDict& dict_;
- const CompressionType type_;
- public:
- UncompressionInfo(const UncompressionContext& _context,
- const UncompressionDict& _dict, CompressionType _type)
- : context_(_context), dict_(_dict), type_(_type) {}
- const UncompressionContext& context() const { return context_; }
- const UncompressionDict& dict() const { return dict_; }
- CompressionType type() const { return type_; }
- };
// Reports whether this build was compiled with Snappy (SNAPPY defined).
inline bool Snappy_Supported() {
#if defined(SNAPPY)
  constexpr bool kCompiledIn = true;
#else
  constexpr bool kCompiledIn = false;
#endif
  return kCompiledIn;
}
// Reports whether this build was compiled with zlib (ZLIB defined).
inline bool Zlib_Supported() {
#if defined(ZLIB)
  constexpr bool kCompiledIn = true;
#else
  constexpr bool kCompiledIn = false;
#endif
  return kCompiledIn;
}
// Reports whether this build was compiled with bzip2 (BZIP2 defined).
inline bool BZip2_Supported() {
#if defined(BZIP2)
  constexpr bool kCompiledIn = true;
#else
  constexpr bool kCompiledIn = false;
#endif
  return kCompiledIn;
}
// Reports whether this build was compiled with LZ4 (LZ4 defined).
inline bool LZ4_Supported() {
#if defined(LZ4)
  constexpr bool kCompiledIn = true;
#else
  constexpr bool kCompiledIn = false;
#endif
  return kCompiledIn;
}
// Reports whether this build was compiled with XPRESS (XPRESS defined).
inline bool XPRESS_Supported() {
#if defined(XPRESS)
  constexpr bool kCompiledIn = true;
#else
  constexpr bool kCompiledIn = false;
#endif
  return kCompiledIn;
}
// Reports whether this build was compiled with ZSTD (ZSTD defined).
inline bool ZSTD_Supported() {
#if defined(ZSTD)
  // NB: ZSTD format is finalized since version 0.8.0. See ZSTD_VERSION_NUMBER
  // check above.
  constexpr bool kCompiledIn = true;
#else
  constexpr bool kCompiledIn = false;
#endif
  return kCompiledIn;
}
// Reports whether ZSTD streaming is available, which here is the case
// whenever the build was compiled with ZSTD (ZSTD defined).
inline bool ZSTD_Streaming_Supported() {
#ifdef ZSTD
  constexpr bool kCompiledIn = true;
#else
  constexpr bool kCompiledIn = false;
#endif
  return kCompiledIn;
}
- inline bool StreamingCompressionTypeSupported(
- CompressionType compression_type) {
- switch (compression_type) {
- case kNoCompression:
- return true;
- case kZSTD:
- return ZSTD_Streaming_Supported();
- default:
- return false;
- }
- }
- inline bool CompressionTypeSupported(CompressionType compression_type) {
- switch (compression_type) {
- case kNoCompression:
- return true;
- case kSnappyCompression:
- return Snappy_Supported();
- case kZlibCompression:
- return Zlib_Supported();
- case kBZip2Compression:
- return BZip2_Supported();
- case kLZ4Compression:
- return LZ4_Supported();
- case kLZ4HCCompression:
- return LZ4_Supported();
- case kXpressCompression:
- return XPRESS_Supported();
- case kZSTD:
- return ZSTD_Supported();
- default: // Including custom compression types
- return false;
- }
- }
- inline bool DictCompressionTypeSupported(CompressionType compression_type) {
- switch (compression_type) {
- case kNoCompression:
- return false;
- case kSnappyCompression:
- return false;
- case kZlibCompression:
- return Zlib_Supported();
- case kBZip2Compression:
- return false;
- case kLZ4Compression:
- case kLZ4HCCompression:
- #if LZ4_VERSION_NUMBER >= 10400 // r124+
- return LZ4_Supported();
- #else
- return false;
- #endif
- case kXpressCompression:
- return false;
- case kZSTD:
- // NB: dictionary supported since 0.5.0. See ZSTD_VERSION_NUMBER check
- // above.
- return ZSTD_Supported();
- default: // Including custom compression types
- return false;
- }
- }
- // WART: does not match OptionsHelper::compression_type_string_map
- inline std::string CompressionTypeToString(CompressionType compression_type) {
- switch (compression_type) {
- case kNoCompression:
- return "NoCompression";
- case kSnappyCompression:
- return "Snappy";
- case kZlibCompression:
- return "Zlib";
- case kBZip2Compression:
- return "BZip2";
- case kLZ4Compression:
- return "LZ4";
- case kLZ4HCCompression:
- return "LZ4HC";
- case kXpressCompression:
- return "Xpress";
- case kZSTD:
- return "ZSTD";
- case kDisableCompressionOption:
- return "DisableOption";
- default: {
- bool is_custom = compression_type >= kFirstCustomCompression &&
- compression_type <= kLastCustomCompression;
- unsigned char c = lossless_cast<unsigned char>(compression_type);
- return (is_custom ? "Custom" : "Reserved") +
- ToBaseCharsString<16>(2, c, /*uppercase=*/true);
- }
- }
- }
- // WART: does not match OptionsHelper::compression_type_string_map
- inline CompressionType CompressionTypeFromString(
- std::string compression_type_str) {
- if (!compression_type_str.empty()) {
- switch (compression_type_str[0]) {
- case 'N':
- if (compression_type_str == "NoCompression") {
- return kNoCompression;
- }
- break;
- case 'S':
- if (compression_type_str == "Snappy") {
- return kSnappyCompression;
- }
- break;
- case 'Z':
- if (compression_type_str == "ZSTD") {
- return kZSTD;
- }
- if (compression_type_str == "Zlib") {
- return kZlibCompression;
- }
- break;
- case 'B':
- if (compression_type_str == "BZip2") {
- return kBZip2Compression;
- }
- break;
- case 'L':
- if (compression_type_str == "LZ4") {
- return kLZ4Compression;
- }
- if (compression_type_str == "LZ4HC") {
- return kLZ4HCCompression;
- }
- break;
- case 'X':
- if (compression_type_str == "Xpress") {
- return kXpressCompression;
- }
- break;
- default:;
- }
- }
- // unrecognized
- return kDisableCompressionOption;
- }
- inline std::string CompressionOptionsToString(
- const CompressionOptions& compression_options) {
- std::string result;
- result.reserve(512);
- result.append("window_bits=")
- .append(std::to_string(compression_options.window_bits))
- .append("; ");
- result.append("level=")
- .append(std::to_string(compression_options.level))
- .append("; ");
- result.append("strategy=")
- .append(std::to_string(compression_options.strategy))
- .append("; ");
- result.append("max_dict_bytes=")
- .append(std::to_string(compression_options.max_dict_bytes))
- .append("; ");
- result.append("zstd_max_train_bytes=")
- .append(std::to_string(compression_options.zstd_max_train_bytes))
- .append("; ");
- // NOTE: parallel_threads is skipped because it doesn't really affect the file
- // contents written, arguably doesn't belong in CompressionOptions
- result.append("enabled=")
- .append(std::to_string(compression_options.enabled))
- .append("; ");
- result.append("max_dict_buffer_bytes=")
- .append(std::to_string(compression_options.max_dict_buffer_bytes))
- .append("; ");
- result.append("use_zstd_dict_trainer=")
- .append(std::to_string(compression_options.use_zstd_dict_trainer))
- .append("; ");
- result.append("max_compressed_bytes_per_kb=")
- .append(std::to_string(compression_options.max_compressed_bytes_per_kb))
- .append("; ");
- result.append("checksum=")
- .append(std::to_string(compression_options.checksum))
- .append("; ");
- return result;
- }
- // compress_format_version can have two values:
- // 1 -- decompressed sizes for BZip2 and Zlib are not included in the compressed
- // block. Also, decompressed sizes for LZ4 are encoded in a platform-dependent
- // way.
- // 2 -- Zlib, BZip2 and LZ4 encode decompressed size as Varint32 just before the
- // start of compressed block. Snappy and XPRESS instead extract the decompressed
- // size from the compressed block itself, same as version 1.
- // Compresses `length` bytes starting at `input` with Snappy and stores the
- // result in `*output`. The CompressionInfo argument is not used. Returns true
- // when built with SNAPPY; otherwise does nothing and returns false.
- inline bool Snappy_Compress(const CompressionInfo& /*info*/, const char* input,
- size_t length, ::std::string* output) {
- #ifndef SNAPPY
- (void)input;
- (void)length;
- (void)output;
- return false;
- #else
- // Size the buffer for the worst case, then trim to what was actually written.
- size_t compressed_len;
- output->resize(snappy::MaxCompressedLength(length));
- char* const dst = &(*output)[0];
- snappy::RawCompress(input, length, dst, &compressed_len);
- output->resize(compressed_len);
- return true;
- #endif
- }
- // Decompresses a Snappy block of `length` bytes at `input` into a buffer
- // obtained from AllocateBlock(…, allocator). On success the decompressed
- // length is written to `*uncompressed_size` and the buffer is returned.
- // Returns nullptr if the length cannot be read, if decompression fails, or
- // when built without SNAPPY.
- inline CacheAllocationPtr Snappy_Uncompress(
- const char* input, size_t length, size_t* uncompressed_size,
- MemoryAllocator* allocator = nullptr) {
- #ifndef SNAPPY
- (void)input;
- (void)length;
- (void)uncompressed_size;
- (void)allocator;
- return nullptr;
- #else
- size_t decoded_len = 0;
- const bool have_len =
- snappy::GetUncompressedLength(input, length, &decoded_len);
- if (!have_len) {
- return nullptr;
- }
- CacheAllocationPtr block = AllocateBlock(decoded_len, allocator);
- if (snappy::RawUncompress(input, length, block.get())) {
- *uncompressed_size = decoded_len;
- return block;
- }
- return nullptr;
- #endif
- }
- namespace compression {
- // Appends `length` to `*output` as a varint32 and returns the size of
- // `*output` after the append.
- inline size_t PutDecompressedSizeInfo(std::string* output, uint32_t length) {
- PutVarint32(output, length);
- const size_t size_after_append = output->size();
- return size_after_append;
- }
- // Reads a varint32 from the front of [*input_data, *input_data + *input_length)
- // into `*output_len`. On success advances `*input_data` past the varint,
- // shrinks `*input_length` by the bytes consumed and returns true. On failure
- // returns false and leaves `*input_data` / `*input_length` untouched.
- inline bool GetDecompressedSizeInfo(const char** input_data,
- size_t* input_length,
- uint32_t* output_len) {
- const char* const begin = *input_data;
- const char* const limit = begin + *input_length;
- const auto after_varint = GetVarint32Ptr(begin, limit, output_len);
- if (after_varint != nullptr) {
- *input_length -= (after_varint - begin);
- *input_data = after_varint;
- return true;
- }
- return false;
- }
- } // namespace compression
- // Deflates `length` bytes at `input` into `*output` using zlib.
- //
- // compress_format_version == 1 -- decompressed size is not included in the
- // block header
- // compress_format_version == 2 -- decompressed size is included in the block
- // header in varint32 format
- //
- // Level, window bits and strategy are read from info.options(); a non-empty
- // info.dict().GetRawDict() is installed as the preset dictionary. Returns
- // true only when deflate() finishes the stream; always false without ZLIB.
- inline bool Zlib_Compress(const CompressionInfo& info,
- uint32_t compress_format_version, const char* input,
- size_t length, ::std::string* output) {
- #ifndef ZLIB
- (void)info;
- (void)compress_format_version;
- (void)input;
- (void)length;
- (void)output;
- return false;
- #else
- // Can't compress more than 4GB
- if (length > std::numeric_limits<uint32_t>::max()) {
- return false;
- }
- const size_t header_len =
- (compress_format_version == 2)
- ? compression::PutDecompressedSizeInfo(output,
- static_cast<uint32_t>(length))
- : 0;
- // memLevel selects how much memory zlib allocates for its internal
- // compression state: 1 is the minimum (slow, worse ratio), 9 the maximum
- // (fastest). 8 is zlib's default; see zconf.h for more details.
- constexpr int kMemLevel = 8;
- const int requested_level = info.options().level;
- const int zlib_level =
- (requested_level == CompressionOptions::kDefaultCompressionLevel)
- ? Z_DEFAULT_COMPRESSION
- : requested_level;
- z_stream zs;
- memset(&zs, 0, sizeof(zs));
- if (deflateInit2(&zs, zlib_level, Z_DEFLATED, info.options().window_bits,
- kMemLevel, info.options().strategy) != Z_OK) {
- return false;
- }
- const Slice raw_dict = info.dict().GetRawDict();
- if (raw_dict.size() > 0) {
- // Preset the compression library's dictionary.
- const int dict_rc = deflateSetDictionary(
- &zs, reinterpret_cast<const Bytef*>(raw_dict.data()),
- static_cast<unsigned int>(raw_dict.size()));
- if (dict_rc != Z_OK) {
- deflateEnd(&zs);
- return false;
- }
- }
- // Reserve room for the worst-case compressed size after the header.
- const size_t bound = deflateBound(&zs, static_cast<unsigned long>(length));
- output->resize(header_len + bound);
- zs.next_in = reinterpret_cast<Bytef*>(const_cast<char*>(input));
- zs.avail_in = static_cast<unsigned int>(length);
- zs.next_out = reinterpret_cast<Bytef*>(&(*output)[header_len]);
- zs.avail_out = static_cast<unsigned int>(bound);
- // Only Z_STREAM_END counts as success. Z_OK would mean the output space was
- // insufficient, i.e. the compressed form is larger than expected; that case
- // is simply reported as a failed compression.
- const bool finished = (deflate(&zs, Z_FINISH) == Z_STREAM_END);
- if (finished) {
- // Drop the unused tail of the reserved space.
- output->resize(output->size() - zs.avail_out);
- }
- deflateEnd(&zs);
- return finished;
- #endif
- }
- // Inflates a zlib-compressed block into a buffer obtained from
- // AllocateBlock(…, allocator).
- //
- // compress_format_version == 1 -- decompressed size is not included in the
- // block header; the output buffer starts from an estimate and is grown as
- // needed
- // compress_format_version == 2 -- decompressed size is included in the block
- // header in varint32 format
- //
- // A non-empty info.dict().GetRawDict() is installed as the preset dictionary.
- // `windowBits` is passed to inflateInit2() (positive values get +32 added).
- // On success stores the number of decompressed bytes in `*uncompressed_size`
- // and returns the buffer. Returns nullptr on any failure, and always when
- // built without ZLIB. The inflate state is released on every exit path that
- // follows a successful inflateInit2().
- inline CacheAllocationPtr Zlib_Uncompress(
- const UncompressionInfo& info, const char* input_data, size_t input_length,
- size_t* uncompressed_size, uint32_t compress_format_version,
- MemoryAllocator* allocator = nullptr, int windowBits = -14) {
- #ifdef ZLIB
- uint32_t output_len = 0;
- if (compress_format_version == 2) {
- if (!compression::GetDecompressedSizeInfo(&input_data, &input_length,
- &output_len)) {
- return nullptr;
- }
- } else {
- // Assume the decompressed data size will 5x of compressed size, but round
- // to the page size
- size_t proposed_output_len = ((input_length * 5) & (~(4096 - 1))) + 4096;
- output_len = static_cast<uint32_t>(
- std::min(proposed_output_len,
- static_cast<size_t>(std::numeric_limits<uint32_t>::max())));
- }
- z_stream _stream;
- memset(&_stream, 0, sizeof(z_stream));
- // For raw inflate, the windowBits should be -8..-15.
- // If windowBits is bigger than zero, it will use either zlib
- // header or gzip header. Adding 32 to it will do automatic detection.
- int st =
- inflateInit2(&_stream, windowBits > 0 ? windowBits + 32 : windowBits);
- if (st != Z_OK) {
- return nullptr;
- }
- const Slice& compression_dict = info.dict().GetRawDict();
- if (compression_dict.size()) {
- // Initialize the compression library's dictionary
- st = inflateSetDictionary(
- &_stream, reinterpret_cast<const Bytef*>(compression_dict.data()),
- static_cast<unsigned int>(compression_dict.size()));
- if (st != Z_OK) {
- // inflateInit2() succeeded, so its state must be freed before bailing
- // out; otherwise it is leaked.
- inflateEnd(&_stream);
- return nullptr;
- }
- }
- _stream.next_in = (Bytef*)input_data;
- _stream.avail_in = static_cast<unsigned int>(input_length);
- auto output = AllocateBlock(output_len, allocator);
- _stream.next_out = (Bytef*)output.get();
- _stream.avail_out = static_cast<unsigned int>(output_len);
- bool done = false;
- while (!done) {
- st = inflate(&_stream, Z_SYNC_FLUSH);
- switch (st) {
- case Z_STREAM_END:
- done = true;
- break;
- case Z_OK: {
- // No output space. Increase the output space by 20% (at least 10
- // bytes). We should never run out of output space if
- // compress_format_version == 2
- assert(compress_format_version != 2);
- size_t old_sz = output_len;
- uint32_t output_len_delta = output_len / 5;
- output_len += output_len_delta < 10 ? 10 : output_len_delta;
- auto tmp = AllocateBlock(output_len, allocator);
- memcpy(tmp.get(), output.get(), old_sz);
- output = std::move(tmp);
- // Set more output.
- _stream.next_out = (Bytef*)(output.get() + old_sz);
- _stream.avail_out = static_cast<unsigned int>(output_len - old_sz);
- break;
- }
- case Z_BUF_ERROR:
- default:
- inflateEnd(&_stream);
- return nullptr;
- }
- }
- // If we encoded decompressed block size, we should have no bytes left
- assert(compress_format_version != 2 || _stream.avail_out == 0);
- assert(output_len >= _stream.avail_out);
- *uncompressed_size = output_len - _stream.avail_out;
- inflateEnd(&_stream);
- return output;
- #else
- (void)info;
- (void)input_data;
- (void)input_length;
- (void)uncompressed_size;
- (void)compress_format_version;
- (void)allocator;
- (void)windowBits;
- return nullptr;
- #endif
- }
- // Compresses `length` bytes at `input` into `*output` using bzip2. The
- // CompressionInfo argument is not used.
- //
- // compress_format_version == 1 -- decompressed size is not included in the
- // block header
- // compress_format_version == 2 -- decompressed size is included in the block
- // header in varint32 format
- //
- // Returns true only when the whole stream fits into `length` output bytes;
- // always false when built without BZIP2.
- inline bool BZip2_Compress(const CompressionInfo& /*info*/,
- uint32_t compress_format_version, const char* input,
- size_t length, ::std::string* output) {
- #ifndef BZIP2
- (void)compress_format_version;
- (void)input;
- (void)length;
- (void)output;
- return false;
- #else
- // Can't compress more than 4GB
- if (length > std::numeric_limits<uint32_t>::max()) {
- return false;
- }
- const size_t header_len =
- (compress_format_version == 2)
- ? compression::PutDecompressedSizeInfo(output,
- static_cast<uint32_t>(length))
- : 0;
- // Give the compressor exactly as much room as the plain data occupies. This
- // may be too small if compression actually expands the data.
- output->resize(header_len + length);
- bz_stream bzs;
- memset(&bzs, 0, sizeof(bzs));
- // Arguments: block size 1 (100K), verbosity 0 (silent), workFactor 30 (the
- // default).
- if (BZ2_bzCompressInit(&bzs, 1, 0, 30) != BZ_OK) {
- return false;
- }
- bzs.next_in = const_cast<char*>(input);
- bzs.avail_in = static_cast<unsigned int>(length);
- bzs.next_out = &(*output)[header_len];
- bzs.avail_out = static_cast<unsigned int>(length);
- // Only BZ_STREAM_END counts as success. BZ_FINISH_OK would mean the output
- // space ran out, i.e. the compressed form is bigger than the input; that is
- // reported as a failed compression.
- const bool finished = (BZ2_bzCompress(&bzs, BZ_FINISH) == BZ_STREAM_END);
- if (finished) {
- output->resize(output->size() - bzs.avail_out);
- }
- BZ2_bzCompressEnd(&bzs);
- return finished;
- #endif
- }
- // Decompresses a bzip2 block into a buffer obtained from
- // AllocateBlock(…, allocator).
- //
- // compress_format_version == 1 -- decompressed size is not included in the
- // block header; the output buffer starts from an estimate and is grown as
- // needed
- // compress_format_version == 2 -- decompressed size is included in the block
- // header in varint32 format
- //
- // On success stores the number of decompressed bytes in `*uncompressed_size`
- // and returns the buffer. Returns nullptr on any failure, and always when
- // built without BZIP2.
- inline CacheAllocationPtr BZip2_Uncompress(
- const char* input_data, size_t input_length, size_t* uncompressed_size,
- uint32_t compress_format_version, MemoryAllocator* allocator = nullptr) {
- #ifdef BZIP2
- uint32_t output_len = 0;
- if (compress_format_version == 2) {
- if (!compression::GetDecompressedSizeInfo(&input_data, &input_length,
- &output_len)) {
- return nullptr;
- }
- } else {
- // Assume the decompressed data size will 5x of compressed size, but round
- // to the next page size
- size_t proposed_output_len = ((input_length * 5) & (~(4096 - 1))) + 4096;
- output_len = static_cast<uint32_t>(
- std::min(proposed_output_len,
- static_cast<size_t>(std::numeric_limits<uint32_t>::max())));
- }
- bz_stream _stream;
- memset(&_stream, 0, sizeof(bz_stream));
- int st = BZ2_bzDecompressInit(&_stream, 0, 0);
- if (st != BZ_OK) {
- return nullptr;
- }
- _stream.next_in = (char*)input_data;
- _stream.avail_in = static_cast<unsigned int>(input_length);
- auto output = AllocateBlock(output_len, allocator);
- _stream.next_out = (char*)output.get();
- _stream.avail_out = static_cast<unsigned int>(output_len);
- bool done = false;
- while (!done) {
- st = BZ2_bzDecompress(&_stream);
- switch (st) {
- case BZ_STREAM_END:
- done = true;
- break;
- case BZ_OK: {
- // No output space. Increase the output space by 20%.
- // We should never run out of output space if
- // compress_format_version == 2
- assert(compress_format_version != 2);
- uint32_t old_sz = output_len;
- // Grow with integer arithmetic and a 10-byte minimum step, the same
- // policy Zlib_Uncompress uses. A plain `output_len * 1.2` truncates
- // back to the old size when output_len < 5, so the buffer would never
- // grow and this loop would not terminate.
- uint32_t output_len_delta = output_len / 5;
- output_len += output_len_delta < 10 ? 10 : output_len_delta;
- auto tmp = AllocateBlock(output_len, allocator);
- memcpy(tmp.get(), output.get(), old_sz);
- output = std::move(tmp);
- // Set more output.
- _stream.next_out = (char*)(output.get() + old_sz);
- _stream.avail_out = static_cast<unsigned int>(output_len - old_sz);
- break;
- }
- default:
- BZ2_bzDecompressEnd(&_stream);
- return nullptr;
- }
- }
- // If we encoded decompressed block size, we should have no bytes left
- assert(compress_format_version != 2 || _stream.avail_out == 0);
- assert(output_len >= _stream.avail_out);
- *uncompressed_size = output_len - _stream.avail_out;
- BZ2_bzDecompressEnd(&_stream);
- return output;
- #else
- (void)input_data;
- (void)input_length;
- (void)uncompressed_size;
- (void)compress_format_version;
- (void)allocator;
- return nullptr;
- #endif
- }
- // Compresses `length` bytes at `input` into `*output` using LZ4.
- //
- // compress_format_version == 2 -- decompressed size is included in the block
- // header in varint32 format
- // any other compress_format_version -- decompressed size is included in an
- // 8-byte block header using memcpy, which makes the database non-portable
- //
- // A non-empty info.dict().GetRawDict() is loaded as the dictionary when the
- // LZ4 streaming API is available. Returns false when `length` exceeds 4GB,
- // when the compressor reports 0 bytes, or when built without LZ4.
- inline bool LZ4_Compress(const CompressionInfo& info,
- uint32_t compress_format_version, const char* input,
- size_t length, ::std::string* output) {
- #ifndef LZ4
- (void)info;
- (void)compress_format_version;
- (void)input;
- (void)length;
- (void)output;
- return false;
- #else // LZ4
- // Can't compress more than 4GB
- if (length > std::numeric_limits<uint32_t>::max()) {
- return false;
- }
- size_t header_len = 0;
- if (compress_format_version != 2) {
- // legacy encoding, which is not really portable (depends on big/little
- // endianness)
- header_len = 8;
- output->resize(header_len);
- memcpy(&(*output)[0], &length, sizeof(length));
- } else {
- // new encoding, using varint32 to store size information
- header_len = compression::PutDecompressedSizeInfo(
- output, static_cast<uint32_t>(length));
- }
- const int src_len = static_cast<int>(length);
- const int bound = LZ4_compressBound(src_len);
- output->resize(static_cast<size_t>(header_len + bound));
- char* const dst = &(*output)[header_len];
- int written;
- #if LZ4_VERSION_NUMBER >= 10400 // r124+
- LZ4_stream_t* const lz4 = LZ4_createStream();
- const Slice raw_dict = info.dict().GetRawDict();
- if (raw_dict.size() > 0) {
- LZ4_loadDict(lz4, raw_dict.data(), static_cast<int>(raw_dict.size()));
- }
- #if LZ4_VERSION_NUMBER >= 10700 // r129+
- // A negative configured level is used as the acceleration factor; any other
- // level means acceleration 1.
- const int configured_level = info.options().level;
- const int acceleration = (configured_level < 0) ? -configured_level : 1;
- written = LZ4_compress_fast_continue(lz4, input, dst, src_len, bound,
- acceleration);
- #else // up to r128
- written =
- LZ4_compress_limitedOutput_continue(lz4, input, dst, src_len, bound);
- #endif
- LZ4_freeStream(lz4);
- #else // up to r123
- written = LZ4_compress_limitedOutput(input, dst, src_len, bound);
- #endif // LZ4_VERSION_NUMBER >= 10400
- if (written == 0) {
- return false;
- }
- output->resize(static_cast<size_t>(header_len + written));
- return true;
- #endif
- }
- // Decompresses an LZ4 block into a buffer obtained from
- // AllocateBlock(…, allocator).
- //
- // compress_format_version == 2 -- decompressed size is read from the block
- // header in varint32 format
- // any other compress_format_version -- decompressed size is read with memcpy
- // from an 8-byte block header, which makes the database non-portable
- //
- // A non-empty info.dict().GetRawDict() is set as the dictionary when the LZ4
- // streaming API is available. On success stores the decompressed byte count
- // in `*uncompressed_size` and returns the buffer. Returns nullptr on failure
- // or when built without LZ4.
- inline CacheAllocationPtr LZ4_Uncompress(const UncompressionInfo& info,
- const char* input_data,
- size_t input_length,
- size_t* uncompressed_size,
- uint32_t compress_format_version,
- MemoryAllocator* allocator = nullptr) {
- #ifndef LZ4
- (void)info;
- (void)input_data;
- (void)input_length;
- (void)uncompressed_size;
- (void)compress_format_version;
- (void)allocator;
- return nullptr;
- #else // LZ4
- uint32_t expected_len = 0;
- if (compress_format_version != 2) {
- // legacy encoding, which is not really portable (depends on big/little
- // endianness)
- if (input_length < 8) {
- return nullptr;
- }
- // The 4-byte size sits in the first half of the 8-byte header on
- // little-endian builds and in the second half otherwise.
- const size_t size_offset = port::kLittleEndian ? 0 : 4;
- memcpy(&expected_len, input_data + size_offset, sizeof(expected_len));
- input_data += 8;
- input_length -= 8;
- } else if (!compression::GetDecompressedSizeInfo(&input_data, &input_length,
- &expected_len)) {
- // new encoding, using varint32 to store size information
- return nullptr;
- }
- auto block = AllocateBlock(expected_len, allocator);
- const int src_len = static_cast<int>(input_length);
- const int dst_capacity = static_cast<int>(expected_len);
- int produced = 0;
- #if LZ4_VERSION_NUMBER >= 10400 // r124+
- LZ4_streamDecode_t* const decoder = LZ4_createStreamDecode();
- const Slice& raw_dict = info.dict().GetRawDict();
- if (raw_dict.size() > 0) {
- LZ4_setStreamDecode(decoder, raw_dict.data(),
- static_cast<int>(raw_dict.size()));
- }
- produced = LZ4_decompress_safe_continue(decoder, input_data, block.get(),
- src_len, dst_capacity);
- LZ4_freeStreamDecode(decoder);
- #else // up to r123
- produced =
- LZ4_decompress_safe(input_data, block.get(), src_len, dst_capacity);
- #endif // LZ4_VERSION_NUMBER >= 10400
- if (produced < 0) {
- return nullptr;
- }
- assert(produced == dst_capacity);
- *uncompressed_size = produced;
- return block;
- #endif
- }
- // Compresses `input` into `*output` with the LZ4 HC API; returns false if length > 4GB, if the compressor returns 0, or without LZ4.
- // compress_format_version == 2 -- decompressed size is included in the block header in varint32 format.
- // Any other compress_format_version -- decompressed size is included in an 8-byte block header using
- // memcpy, which makes the database non-portable.
- // The dictionary (streaming API only) comes from info.dict().GetRawDict(); the level comes from
- // info.options().level, with kDefaultCompressionLevel mapped to 0.
- inline bool LZ4HC_Compress(const CompressionInfo& info,
- uint32_t compress_format_version, const char* input,
- size_t length, ::std::string* output) {
- #ifdef LZ4
- if (length > std::numeric_limits<uint32_t>::max()) {
- // Can't compress more than 4GB
- return false;
- }
- size_t output_header_len = 0;
- if (compress_format_version == 2) {
- // new encoding, using varint32 to store size information
- output_header_len = compression::PutDecompressedSizeInfo(
- output, static_cast<uint32_t>(length));
- } else {
- // legacy encoding, which is not really portable (depends on big/little
- // endianness)
- output_header_len = 8;
- output->resize(output_header_len);
- char* p = const_cast<char*>(output->c_str());
- memcpy(p, &length, sizeof(length));  // copies sizeof(size_t) bytes into the 8-byte header
- }
- int compress_bound = LZ4_compressBound(static_cast<int>(length));
- output->resize(static_cast<size_t>(output_header_len + compress_bound));  // header + worst-case payload
- int outlen;
- int level;
- if (info.options().level == CompressionOptions::kDefaultCompressionLevel) {
- level = 0; // lz4hc.h says any value < 1 will be sanitized to default
- } else {
- level = info.options().level;
- }
- #if LZ4_VERSION_NUMBER >= 10400 // r124+
- LZ4_streamHC_t* stream = LZ4_createStreamHC();
- LZ4_resetStreamHC(stream, level);
- Slice compression_dict = info.dict().GetRawDict();
- const char* compression_dict_data =
- compression_dict.size() > 0 ? compression_dict.data() : nullptr;
- size_t compression_dict_size = compression_dict.size();
- if (compression_dict_data != nullptr) {  // only load a dictionary when one was supplied
- LZ4_loadDictHC(stream, compression_dict_data,
- static_cast<int>(compression_dict_size));
- }
- #if LZ4_VERSION_NUMBER >= 10700 // r129+
- outlen =
- LZ4_compress_HC_continue(stream, input, &(*output)[output_header_len],
- static_cast<int>(length), compress_bound);
- #else // r124-r128
- outlen = LZ4_compressHC_limitedOutput_continue(
- stream, input, &(*output)[output_header_len], static_cast<int>(length),
- compress_bound);
- #endif // LZ4_VERSION_NUMBER >= 10700
- LZ4_freeStreamHC(stream);
- #elif LZ4_VERSION_MAJOR // r113-r123
- outlen = LZ4_compressHC2_limitedOutput(input, &(*output)[output_header_len],
- static_cast<int>(length),
- compress_bound, level);
- #else // up to r112
- outlen =
- LZ4_compressHC_limitedOutput(input, &(*output)[output_header_len],
- static_cast<int>(length), compress_bound);
- #endif // LZ4_VERSION_NUMBER >= 10400
- if (outlen == 0) {  // a zero return from the compressor is treated as failure
- return false;
- }
- output->resize(static_cast<size_t>(output_header_len + outlen));  // trim to header + bytes written
- return true;
- #else // LZ4
- (void)info;
- (void)compress_format_version;
- (void)input;
- (void)length;
- (void)output;
- return false;
- #endif
- }
- // Compresses `length` bytes at `input` into `*output` by forwarding to
- // port::xpress::Compress. When built without XPRESS the arguments are
- // ignored and false is returned.
- inline bool XPRESS_Compress(const char* input, size_t length,
- std::string* output) {
- #ifdef XPRESS
- return port::xpress::Compress(input, length, output);
- #else
- (void)input;
- (void)length;
- (void)output;
- return false;
- #endif
- }
- // Decompresses `input_length` bytes at `input_data` by forwarding to
- // port::xpress::Decompress and returns its result. When built without XPRESS
- // the arguments are ignored and nullptr is returned.
- inline char* XPRESS_Uncompress(const char* input_data, size_t input_length,
- size_t* uncompressed_size) {
- #ifdef XPRESS
- return port::xpress::Decompress(input_data, input_length, uncompressed_size);
- #else
- (void)input_data;
- (void)input_length;
- (void)uncompressed_size;
- return nullptr;
- #endif
- }
- // Compresses `length` bytes at `input` into `*output` using ZSTD. The block
- // starts with the decompressed size in varint32 format, followed by the
- // compressed payload.
- //
- // The compression context comes from info.context().ZSTDPreallocCtx(). The
- // digested dictionary from info.dict() is used when present, otherwise the
- // raw dictionary is loaded.
- //
- // Returns false when `length` exceeds 4GB, when attaching the dictionary or
- // compressing reports a ZSTD error, when no bytes are produced, or when built
- // without ZSTD.
- inline bool ZSTD_Compress(const CompressionInfo& info, const char* input,
- size_t length, ::std::string* output) {
- #ifdef ZSTD
- if (length > std::numeric_limits<uint32_t>::max()) {
- // Can't compress more than 4GB
- return false;
- }
- size_t output_header_len = compression::PutDecompressedSizeInfo(
- output, static_cast<uint32_t>(length));
- size_t compressBound = ZSTD_compressBound(length);
- // TODO: use resize_and_overwrite with c++23
- output->resize(static_cast<size_t>(output_header_len + compressBound));
- ZSTD_CCtx* context = info.context().ZSTDPreallocCtx();
- assert(context != nullptr);
- // Attach the dictionary. These calls return an error code (checked with
- // ZSTD_isError) on failure; compressing anyway could use a different
- // dictionary than the reader expects, so fail instead.
- size_t dict_status = 0;
- if (info.dict().GetDigestedZstdCDict() != nullptr) {
- dict_status =
- ZSTD_CCtx_refCDict(context, info.dict().GetDigestedZstdCDict());
- } else {
- dict_status =
- ZSTD_CCtx_loadDictionary(context, info.dict().GetRawDict().data(),
- info.dict().GetRawDict().size());
- }
- if (ZSTD_isError(dict_status)) {
- return false;
- }
- // Compression level is set in `context` during CreateNativeContext()
- const size_t outlen = ZSTD_compress2(
- context, &(*output)[output_header_len], compressBound, input, length);
- // ZSTD_compress2 signals failure with an error code, not with 0, so it has
- // to be tested with ZSTD_isError before being used as a size.
- if (ZSTD_isError(outlen) || outlen == 0) {
- return false;
- }
- output->resize(output_header_len + outlen);
- return true;
- #else // ZSTD
- (void)info;
- (void)input;
- (void)length;
- (void)output;
- return false;
- #endif
- }
- // Decompresses a ZSTD block whose header carries the decompressed size in
- // varint32 format. The output buffer is obtained from
- // AllocateBlock(…, allocator); the dictionary is taken from info.dict().
- //
- // @param error_message If not null, will be set if decompression fails.
- //
- // Returns nullptr if decompression fails, or when built without ZSTD. On
- // success the decompressed byte count is stored in `*uncompressed_size`.
- inline CacheAllocationPtr ZSTD_Uncompress(
- const UncompressionInfo& info, const char* input_data, size_t input_length,
- size_t* uncompressed_size, MemoryAllocator* allocator = nullptr,
- const char** error_message = nullptr) {
- #ifndef ZSTD
- (void)info;
- (void)input_data;
- (void)input_length;
- (void)uncompressed_size;
- (void)allocator;
- (void)error_message;
- return nullptr;
- #else // ZSTD
- static const char* const kErrorDecodeOutputSize =
- "Cannot decode output size.";
- static const char* const kErrorOutputLenMismatch =
- "Decompressed size does not match header.";
- uint32_t expected_len = 0;
- const bool have_len = compression::GetDecompressedSizeInfo(
- &input_data, &input_length, &expected_len);
- if (!have_len) {
- if (error_message) {
- *error_message = kErrorDecodeOutputSize;
- }
- return nullptr;
- }
- CacheAllocationPtr block = AllocateBlock(expected_len, allocator);
- ZSTD_DCtx* const dctx = info.context().GetZSTDContext();
- assert(dctx != nullptr);
- size_t produced = 0;
- // Prefer the digested dictionary when that API is compiled in and one is
- // available; otherwise fall back to the raw dictionary bytes.
- #ifdef ROCKSDB_ZSTD_DDICT
- if (info.dict().GetDigestedZstdDDict() != nullptr) {
- produced = ZSTD_decompress_usingDDict(dctx, block.get(), expected_len,
- input_data, input_length,
- info.dict().GetDigestedZstdDDict());
- } else
- #endif // ROCKSDB_ZSTD_DDICT
- {
- produced = ZSTD_decompress_usingDict(
- dctx, block.get(), expected_len, input_data, input_length,
- info.dict().GetRawDict().data(), info.dict().GetRawDict().size());
- }
- if (ZSTD_isError(produced)) {
- if (error_message) {
- *error_message = ZSTD_getErrorName(produced);
- }
- return nullptr;
- }
- if (produced != expected_len) {
- if (error_message) {
- *error_message = kErrorOutputLenMismatch;
- }
- return nullptr;
- }
- *uncompressed_size = produced;
- return block;
- #endif
- }
- // Reports whether this build can train ZSTD dictionaries: true when built
- // with ZSTD, false otherwise.
- inline bool ZSTD_TrainDictionarySupported() {
- #ifndef ZSTD
- return false;
- #else
- // NB: Dictionary trainer is available since v0.6.1 for static linking, but
- // not available for dynamic linking until v1.1.3. See ZSTD_VERSION_NUMBER
- // check above.
- return true;
- #endif
- }
- // Trains a ZSTD dictionary of at most `max_dict_bytes` bytes from `samples`,
- // a concatenation of samples whose individual lengths are in `sample_lens`.
- // Returns the dictionary, or an empty string when `samples` is empty or the
- // trainer reports an error. Must not be called when built without ZSTD (that
- // path asserts).
- inline std::string ZSTD_TrainDictionary(const std::string& samples,
- const std::vector<size_t>& sample_lens,
- size_t max_dict_bytes) {
- #ifndef ZSTD
- assert(false);
- (void)samples;
- (void)sample_lens;
- (void)max_dict_bytes;
- return "";
- #else
- assert(samples.empty() == sample_lens.empty());
- if (samples.empty()) {
- return std::string();
- }
- // Let the trainer write into a buffer of the maximum size, then trim it to
- // the length it reports.
- std::string dict(max_dict_bytes, '\0');
- const size_t trained_len = ZDICT_trainFromBuffer(
- &dict[0], max_dict_bytes, &samples[0], &sample_lens[0],
- static_cast<unsigned>(sample_lens.size()));
- if (ZDICT_isError(trained_len)) {
- return std::string();
- }
- assert(trained_len <= max_dict_bytes);
- dict.resize(trained_len);
- return dict;
- #endif // ZSTD
- }
- // Convenience overload for equally sized samples: `samples` is treated as a
- // run of samples of (1 << sample_len_shift) bytes each, and the work is
- // delegated to the overload taking explicit sample lengths. Must not be
- // called when built without ZSTD (that path asserts).
- inline std::string ZSTD_TrainDictionary(const std::string& samples,
- size_t sample_len_shift,
- size_t max_dict_bytes) {
- #ifndef ZSTD
- assert(false);
- (void)samples;
- (void)sample_len_shift;
- (void)max_dict_bytes;
- return "";
- #else
- const size_t sample_len = size_t(1) << sample_len_shift;
- // skips potential partial sample at the end of "samples"
- const size_t full_samples = samples.size() >> sample_len_shift;
- std::vector<size_t> lens(full_samples, sample_len);
- return ZSTD_TrainDictionary(samples, lens, max_dict_bytes);
- #endif // ZSTD
- }
- // Reports whether this build can finalize ZSTD dictionaries: true when
- // ROCKSDB_ZDICT_FINALIZE is defined, false otherwise.
- inline bool ZSTD_FinalizeDictionarySupported() {
- #ifndef ROCKSDB_ZDICT_FINALIZE
- return false;
- #else
- return true;
- #endif
- }
- // Builds a finalized ZSTD dictionary of at most `max_dict_bytes` bytes. The
- // dictionary content is the leading min(samples.size(), max_dict_bytes) bytes
- // of `samples`; `samples` / `sample_lens` are also passed as the sample set.
- // A `level` equal to CompressionOptions::kDefaultCompressionLevel is replaced
- // by ZSTD_CLEVEL_DEFAULT. Returns an empty string when `samples` is empty or
- // ZDICT reports an error. Must not be called when ROCKSDB_ZDICT_FINALIZE is
- // not defined (that path asserts).
- inline std::string ZSTD_FinalizeDictionary(
- const std::string& samples, const std::vector<size_t>& sample_lens,
- size_t max_dict_bytes, int level) {
- #ifndef ROCKSDB_ZDICT_FINALIZE
- assert(false);
- (void)samples;
- (void)sample_lens;
- (void)max_dict_bytes;
- (void)level;
- return "";
- #else
- assert(samples.empty() == sample_lens.empty());
- if (samples.empty()) {
- return std::string();
- }
- // NB: ZSTD_CLEVEL_DEFAULT is historically == 3
- const int effective_level =
- (level == CompressionOptions::kDefaultCompressionLevel)
- ? ZSTD_CLEVEL_DEFAULT
- : level;
- const size_t content_size =
- std::min(static_cast<size_t>(samples.size()), max_dict_bytes);
- std::string dict(max_dict_bytes, '\0');
- const size_t final_len = ZDICT_finalizeDictionary(
- dict.data(), max_dict_bytes, samples.data(), content_size,
- samples.data(), sample_lens.data(),
- static_cast<unsigned>(sample_lens.size()),
- {effective_level, 0 /* notificationLevel */, 0 /* dictID */});
- if (ZDICT_isError(final_len)) {
- return std::string();
- }
- assert(final_len <= max_dict_bytes);
- dict.resize(final_len);
- return dict;
- #endif // ROCKSDB_ZDICT_FINALIZE
- }
- // Compresses `raw` into `*compressed_output` with the algorithm selected by
- // compression_info.type(), dispatching to the per-algorithm helper. Returns
- // that helper's result, or false for an unrecognized type. The result can be
- // overridden through the "CompressData:TamperWithReturnValue" sync point.
- inline bool OLD_CompressData(const Slice& raw,
- const CompressionInfo& compression_info,
- uint32_t compress_format_version,
- std::string* compressed_output) {
- const char* const src = raw.data();
- const size_t src_len = raw.size();
- bool compressed = false;
- // A helper succeeds only if (1) the compression method is supported in this
- // platform and (2) the compression rate is "good enough".
- switch (compression_info.type()) {
- case kSnappyCompression:
- compressed =
- Snappy_Compress(compression_info, src, src_len, compressed_output);
- break;
- case kZlibCompression:
- compressed = Zlib_Compress(compression_info, compress_format_version, src,
- src_len, compressed_output);
- break;
- case kBZip2Compression:
- compressed = BZip2_Compress(compression_info, compress_format_version, src,
- src_len, compressed_output);
- break;
- case kLZ4Compression:
- compressed = LZ4_Compress(compression_info, compress_format_version, src,
- src_len, compressed_output);
- break;
- case kLZ4HCCompression:
- compressed = LZ4HC_Compress(compression_info, compress_format_version, src,
- src_len, compressed_output);
- break;
- case kXpressCompression:
- compressed = XPRESS_Compress(src, src_len, compressed_output);
- break;
- case kZSTD:
- compressed =
- ZSTD_Compress(compression_info, src, src_len, compressed_output);
- break;
- default:
- // Do not recognize this compression type
- break;
- }
- TEST_SYNC_POINT_CALLBACK("CompressData:TamperWithReturnValue",
- static_cast<void*>(&compressed));
- return compressed;
- }
- // Decompresses the `n` bytes at `data` with the algorithm selected by
- // uncompression_info.type(), dispatching to the per-algorithm helper and
- // returning its result. An unrecognized type yields an empty
- // CacheAllocationPtr. `error_message` is only forwarded to the ZSTD helper.
- inline CacheAllocationPtr OLD_UncompressData(
- const UncompressionInfo& uncompression_info, const char* data, size_t n,
- size_t* uncompressed_size, uint32_t compress_format_version,
- MemoryAllocator* allocator = nullptr,
- const char** error_message = nullptr) {
- const auto type = uncompression_info.type();
- if (type == kSnappyCompression) {
- return Snappy_Uncompress(data, n, uncompressed_size, allocator);
- }
- if (type == kZlibCompression) {
- return Zlib_Uncompress(uncompression_info, data, n, uncompressed_size,
- compress_format_version, allocator);
- }
- if (type == kBZip2Compression) {
- return BZip2_Uncompress(data, n, uncompressed_size,
- compress_format_version, allocator);
- }
- // LZ4 and LZ4HC blocks are handled by the same decompressor.
- if (type == kLZ4Compression || type == kLZ4HCCompression) {
- return LZ4_Uncompress(uncompression_info, data, n, uncompressed_size,
- compress_format_version, allocator);
- }
- if (type == kXpressCompression) {
- // XPRESS allocates memory internally, thus no support for custom
- // allocator.
- return CacheAllocationPtr(XPRESS_Uncompress(data, n, uncompressed_size));
- }
- if (type == kZSTD) {
- // TODO(cbi): error message handling for other compression algorithms.
- return ZSTD_Uncompress(uncompression_info, data, n, uncompressed_size,
- allocator, error_message);
- }
- return CacheAllocationPtr();
- }
- // ***********************************************************************
- // BEGIN built-in implementation of customization interface
- // ***********************************************************************
- // Declaration only (no body in this part of the file): returns a reference to a shared CompressionManager handle for the given version.
- // NOTE: to avoid the compression API depending on the block-based table API, it uses its own format version. See internal function GetCompressFormatForVersion()
- const std::shared_ptr<CompressionManager>& GetBuiltinCompressionManager(
- int compression_format_version);
- // ***********************************************************************
- // END built-in implementation of customization interface
- // ***********************************************************************
- // Records the compression type for subsequent WAL records. The type is
- // encoded as a fixed 32-bit value.
- class CompressionTypeRecord {
- public:
- explicit CompressionTypeRecord(CompressionType compression_type)
- : compression_type_(compression_type) {}
- // Returns the compression type currently held by this record.
- CompressionType GetCompressionType() const { return compression_type_; }
- // Appends the compression type to `*dst` as a fixed32. `dst` must not be
- // null.
- inline void EncodeTo(std::string* dst) const {
- assert(dst != nullptr);
- PutFixed32(dst, compression_type_);
- }
- // Reads a fixed32 from `*src` and adopts it as the compression type.
- // Returns Corruption if the value cannot be read or names a type that
- // StreamingCompressionTypeSupported() rejects; in that case the stored type
- // is left unchanged.
- inline Status DecodeFrom(Slice* src) {
- constexpr char class_name[] = "CompressionTypeRecord";
- uint32_t raw_type;
- const bool decoded = GetFixed32(src, &raw_type);
- if (!decoded) {
- return Status::Corruption(class_name,
- "Error decoding WAL compression type");
- }
- const CompressionType parsed_type = static_cast<CompressionType>(raw_type);
- if (StreamingCompressionTypeSupported(parsed_type)) {
- compression_type_ = parsed_type;
- return Status::OK();
- }
- return Status::Corruption(class_name,
- "WAL compression type not supported");
- }
- // Returns a human-readable description of the record.
- inline std::string DebugString() const {
- return "compression_type: " + CompressionTypeToString(compression_type_);
- }
- private:
- CompressionType compression_type_;
- };
- // Abstract base class for compressing a stream of buffers.
- // Obtain a concrete implementation from Create() with the desired
- // compression type, then call Compress() repeatedly.
- // The output buffer needs to be at least max_output_len.
- // Call Reset() in between frame boundaries or in case of an error.
- // NOTE: This class is not thread safe.
- class StreamingCompress {
- public:
- StreamingCompress(CompressionType compression_type,
- const CompressionOptions& opts,
- uint32_t compress_format_version, size_t max_output_len)
- : compression_type_(compression_type),
- opts_(opts),  // stored by value (see member declaration below)
- compress_format_version_(compress_format_version),
- max_output_len_(max_output_len) {}
- virtual ~StreamingCompress() = default;
- // Compress() should be called repeatedly with the same input until the
- // method returns 0.
- // Parameters:
- // input - buffer to compress
- // input_size - size of input buffer
- // output - compressed buffer allocated by caller, should be at least
- // max_output_len
- // output_pos - out-parameter, earlier described as "size of the output buffer" (NOTE(review): confirm exact meaning in implementations)
- // Returns -1 for errors; otherwise the remaining size of the input buffer
- // that still needs to be compressed.
- virtual int Compress(const char* input, size_t input_size, char* output,
- size_t* output_pos) = 0;
- // Static factory that creates an object of a class inherited from
- // StreamingCompress based on the actual compression type. NOTE(review): returns a raw pointer; ownership presumably passes to the caller, confirm.
- static StreamingCompress* Create(CompressionType compression_type,
- const CompressionOptions& opts,
- uint32_t compress_format_version,
- size_t max_output_len);
- virtual void Reset() = 0;  // see class comment: call between frames or after an error
- protected:
- const CompressionType compression_type_;  // set once by the constructor
- const CompressionOptions opts_;  // copy of the options passed to the constructor
- const uint32_t compress_format_version_;  // set once by the constructor
- const size_t max_output_len_;  // minimum size required of the caller's output buffer
- };
- // Base class to uncompress a stream of compressed buffers.
- // Instantiate an implementation of the class using Create() with the
- // compression type and use Uncompress() repeatedly.
- // The output buffer needs to be at least max_output_len.
- // Call Reset() in between frame boundaries or in case of an error.
- // NOTE: This class is not thread safe.
- class StreamingUncompress {
- public:
- StreamingUncompress(CompressionType compression_type,
- uint32_t compress_format_version, size_t max_output_len)
- : compression_type_(compression_type),
- compress_format_version_(compress_format_version),
- max_output_len_(max_output_len) {}
- virtual ~StreamingUncompress() = default;
- // Uncompress can be called repeatedly to progressively process the same
- // input buffer, or can be called with a new input buffer. When the input
- // buffer is not fully consumed, the return value is > 0 or output_size
- // == max_output_len. When calling uncompress to continue processing the
- // same input buffer, the input argument should be nullptr.
- // Parameters:
- // input - buffer to uncompress
- // input_size - size of input buffer
- // output - uncompressed buffer allocated by caller, should be at least
- // max_output_len
- // output_size - size of the output buffer
- // Returns -1 for errors, remaining input to be processed otherwise.
- virtual int Uncompress(const char* input, size_t input_size, char* output,
- size_t* output_pos) = 0;
- static StreamingUncompress* Create(CompressionType compression_type,
- uint32_t compress_format_version,
- size_t max_output_len);
- virtual void Reset() = 0;
- protected:
- CompressionType compression_type_;
- uint32_t compress_format_version_;
- size_t max_output_len_;
- };
- class ZSTDStreamingCompress final : public StreamingCompress {
- public:
- explicit ZSTDStreamingCompress(const CompressionOptions& opts,
- uint32_t compress_format_version,
- size_t max_output_len)
- : StreamingCompress(kZSTD, opts, compress_format_version,
- max_output_len) {
- #ifdef ZSTD
- cctx_ = ZSTD_createCCtx();
- // Each compressed frame will have a checksum
- ZSTD_CCtx_setParameter(cctx_, ZSTD_c_checksumFlag, 1);
- assert(cctx_ != nullptr);
- input_buffer_ = {/*src=*/nullptr, /*size=*/0, /*pos=*/0};
- #endif
- }
- ~ZSTDStreamingCompress() override {
- #ifdef ZSTD
- ZSTD_freeCCtx(cctx_);
- #endif
- }
- int Compress(const char* input, size_t input_size, char* output,
- size_t* output_pos) override;
- void Reset() override;
- #ifdef ZSTD
- ZSTD_CCtx* cctx_;
- ZSTD_inBuffer input_buffer_;
- #endif
- };
- class ZSTDStreamingUncompress final : public StreamingUncompress {
- public:
- explicit ZSTDStreamingUncompress(uint32_t compress_format_version,
- size_t max_output_len)
- : StreamingUncompress(kZSTD, compress_format_version, max_output_len) {
- #ifdef ZSTD
- dctx_ = ZSTD_createDCtx();
- assert(dctx_ != nullptr);
- input_buffer_ = {/*src=*/nullptr, /*size=*/0, /*pos=*/0};
- #endif
- }
- ~ZSTDStreamingUncompress() override {
- #ifdef ZSTD
- ZSTD_freeDCtx(dctx_);
- #endif
- }
- int Uncompress(const char* input, size_t input_size, char* output,
- size_t* output_size) override;
- void Reset() override;
- private:
- #ifdef ZSTD
- ZSTD_DCtx* dctx_;
- ZSTD_inBuffer input_buffer_;
- #endif
- };
- } // namespace ROCKSDB_NAMESPACE
|