| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193 |
- // Copyright (c) Meta Platforms, Inc. and affiliates.
- // This source code is licensed under both the GPLv2 (found in the
- // COPYING file in the root directory) and Apache 2.0 License
- // (found in the LICENSE.Apache file in the root directory).
- //
- // Defines auto skip compressor wrapper which intelligently decides bypassing
- // compression based on past data
- // Defines CostAwareCompressor which currently tries to predict the cpu and io
- // cost of the compression
- #pragma once
- #include <memory>
- #include "rocksdb/advanced_compression.h"
- namespace ROCKSDB_NAMESPACE {
- // Auto Skip Compression Components
- // Predict rejection probability using a moving window approach
- class CompressionRejectionProbabilityPredictor {
- public:
- explicit CompressionRejectionProbabilityPredictor(int window_size)
- : pred_rejection_prob_percentage_(0),
- rejected_count_(0),
- compressed_count_(0),
- window_size_(window_size) {}
- int Predict() const;
- bool Record(Slice uncompressed_block_data, char* compressed_output,
- size_t compressed_output_size, CompressionType compression_type);
- size_t attempted_compression_count() const;
- protected:
- int pred_rejection_prob_percentage_;
- size_t rejected_count_;
- size_t compressed_count_;
- size_t window_size_;
- };
- class AutoSkipWorkingArea : public Compressor::WorkingArea {
- public:
- explicit AutoSkipWorkingArea(Compressor::ManagedWorkingArea&& wa)
- : wrapped(std::move(wa)),
- predictor(
- std::make_shared<CompressionRejectionProbabilityPredictor>(10)) {}
- ~AutoSkipWorkingArea() {}
- AutoSkipWorkingArea(const AutoSkipWorkingArea&) = delete;
- AutoSkipWorkingArea& operator=(const AutoSkipWorkingArea&) = delete;
- AutoSkipWorkingArea(AutoSkipWorkingArea&& other) noexcept
- : wrapped(std::move(other.wrapped)),
- predictor(std::move(other.predictor)) {}
- AutoSkipWorkingArea& operator=(AutoSkipWorkingArea&& other) noexcept {
- if (this != &other) {
- wrapped = std::move(other.wrapped);
- predictor = std::move(other.predictor);
- }
- return *this;
- }
- Compressor::ManagedWorkingArea wrapped;
- std::shared_ptr<CompressionRejectionProbabilityPredictor> predictor;
- };
- class AutoSkipCompressorWrapper : public CompressorWrapper {
- public:
- const char* Name() const override;
- explicit AutoSkipCompressorWrapper(std::unique_ptr<Compressor> compressor,
- const CompressionOptions& opts);
- Status CompressBlock(Slice uncompressed_data, char* compressed_output,
- size_t* compressed_output_size,
- CompressionType* out_compression_type,
- ManagedWorkingArea* wa) override;
- ManagedWorkingArea ObtainWorkingArea() override;
- void ReleaseWorkingArea(WorkingArea* wa) override;
- private:
- Status CompressBlockAndRecord(Slice uncompressed_data,
- char* compressed_output,
- size_t* compressed_output_size,
- CompressionType* out_compression_type,
- AutoSkipWorkingArea* wa);
- static constexpr int kExplorationPercentage = 10;
- static constexpr int kProbabilityCutOff = 50;
- const CompressionOptions opts_;
- };
- class AutoSkipCompressorManager : public CompressionManagerWrapper {
- using CompressionManagerWrapper::CompressionManagerWrapper;
- const char* Name() const override;
- std::unique_ptr<Compressor> GetCompressorForSST(
- const FilterBuildingContext& context, const CompressionOptions& opts,
- CompressionType preferred) override;
- };
- // Cost Aware Components
- template <typename T>
- class WindowAveragePredictor {
- public:
- explicit WindowAveragePredictor(int window_size)
- : sum_(0), prediction_(0), count_(0), kWindowSize(window_size) {}
- T Predict() { return prediction_; }
- bool Record(T data) {
- sum_ += data;
- count_++;
- if (count_ >= kWindowSize) {
- prediction_ = sum_ / count_;
- sum_ = 0;
- count_ = 0;
- }
- return true;
- }
- void SetPrediction(T prediction) { prediction_ = prediction; }
- private:
- T sum_;
- T prediction_;
- int count_;
- const int kWindowSize;
- };
- using IOCostPredictor = WindowAveragePredictor<size_t>;
- using CPUUtilPredictor = WindowAveragePredictor<uint64_t>;
- struct IOCPUCostPredictor {
- explicit IOCPUCostPredictor(int window_size)
- : IOPredictor(window_size), CPUPredictor(window_size) {}
- IOCostPredictor IOPredictor;
- CPUUtilPredictor CPUPredictor;
- };
- class CostAwareWorkingArea : public Compressor::WorkingArea {
- public:
- explicit CostAwareWorkingArea(Compressor::ManagedWorkingArea&& wa)
- : wrapped_(std::move(wa)) {}
- ~CostAwareWorkingArea() {}
- CostAwareWorkingArea(const CostAwareWorkingArea&) = delete;
- CostAwareWorkingArea& operator=(const CostAwareWorkingArea&) = delete;
- CostAwareWorkingArea(CostAwareWorkingArea&& other) noexcept
- : wrapped_(std::move(other.wrapped_)) {}
- CostAwareWorkingArea& operator=(CostAwareWorkingArea&& other) noexcept {
- if (this != &other) {
- wrapped_ = std::move(other.wrapped_);
- cost_predictors_ = std::move(other.cost_predictors_);
- }
- return *this;
- }
- Compressor::ManagedWorkingArea wrapped_;
- std::vector<std::vector<IOCPUCostPredictor*>> cost_predictors_;
- };
- class CostAwareCompressor : public Compressor {
- public:
- explicit CostAwareCompressor(const CompressionOptions& opts);
- const char* Name() const override;
- size_t GetMaxSampleSizeIfWantDict(CacheEntryRole block_type) const override;
- Slice GetSerializedDict() const override;
- CompressionType GetPreferredCompressionType() const override;
- ManagedWorkingArea ObtainWorkingArea() override;
- std::unique_ptr<Compressor> MaybeCloneSpecialized(
- CacheEntryRole block_type, DictSampleArgs&& dict_samples) override;
- Status CompressBlock(Slice uncompressed_data, char* compressed_output,
- size_t* compressed_output_size,
- CompressionType* out_compression_type,
- ManagedWorkingArea* wa) override;
- void ReleaseWorkingArea(WorkingArea* wa) override;
- private:
- Status CompressBlockAndRecord(size_t choosen_compression_type,
- size_t compresion_level_ptr,
- Slice uncompressed_data,
- char* compressed_output,
- size_t* compressed_output_size,
- CompressionType* out_compression_type,
- CostAwareWorkingArea* wa);
- static constexpr int kExplorationPercentage = 10;
- static constexpr int kProbabilityCutOff = 50;
- // This is the vector containing the list of compression levels that
- // CostAwareCompressor will use create compressor and predicts the cost
- // The vector contains list of compression level for compression algorithm in
- // the order defined by enum CompressionType
- static const std::vector<std::vector<int>> kCompressionLevels;
- const CompressionOptions opts_;
- std::vector<std::vector<std::unique_ptr<Compressor>>> allcompressors_;
- std::vector<std::pair<size_t, size_t>> allcompressors_index_;
- };
- class CostAwareCompressorManager : public CompressionManagerWrapper {
- using CompressionManagerWrapper::CompressionManagerWrapper;
- const char* Name() const override;
- std::unique_ptr<Compressor> GetCompressorForSST(
- const FilterBuildingContext& context, const CompressionOptions& opts,
- CompressionType preferred) override;
- };
- } // namespace ROCKSDB_NAMESPACE
|