auto_tune_compressor.h 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193
  1. // Copyright (c) Meta Platforms, Inc. and affiliates.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
// Defines the auto skip compressor wrapper, which intelligently decides
// whether to bypass compression based on past data.
// Defines CostAwareCompressor, which currently tries to predict the CPU and IO
// cost of compression.
  10. #pragma once
#include <cstddef>
#include <cstdint>
#include <memory>
#include <utility>
#include <vector>

#include "rocksdb/advanced_compression.h"
  13. namespace ROCKSDB_NAMESPACE {
  14. // Auto Skip Compression Components
  15. // Predict rejection probability using a moving window approach
  16. class CompressionRejectionProbabilityPredictor {
  17. public:
  18. explicit CompressionRejectionProbabilityPredictor(int window_size)
  19. : pred_rejection_prob_percentage_(0),
  20. rejected_count_(0),
  21. compressed_count_(0),
  22. window_size_(window_size) {}
  23. int Predict() const;
  24. bool Record(Slice uncompressed_block_data, char* compressed_output,
  25. size_t compressed_output_size, CompressionType compression_type);
  26. size_t attempted_compression_count() const;
  27. protected:
  28. int pred_rejection_prob_percentage_;
  29. size_t rejected_count_;
  30. size_t compressed_count_;
  31. size_t window_size_;
  32. };
  33. class AutoSkipWorkingArea : public Compressor::WorkingArea {
  34. public:
  35. explicit AutoSkipWorkingArea(Compressor::ManagedWorkingArea&& wa)
  36. : wrapped(std::move(wa)),
  37. predictor(
  38. std::make_shared<CompressionRejectionProbabilityPredictor>(10)) {}
  39. ~AutoSkipWorkingArea() {}
  40. AutoSkipWorkingArea(const AutoSkipWorkingArea&) = delete;
  41. AutoSkipWorkingArea& operator=(const AutoSkipWorkingArea&) = delete;
  42. AutoSkipWorkingArea(AutoSkipWorkingArea&& other) noexcept
  43. : wrapped(std::move(other.wrapped)),
  44. predictor(std::move(other.predictor)) {}
  45. AutoSkipWorkingArea& operator=(AutoSkipWorkingArea&& other) noexcept {
  46. if (this != &other) {
  47. wrapped = std::move(other.wrapped);
  48. predictor = std::move(other.predictor);
  49. }
  50. return *this;
  51. }
  52. Compressor::ManagedWorkingArea wrapped;
  53. std::shared_ptr<CompressionRejectionProbabilityPredictor> predictor;
  54. };
  55. class AutoSkipCompressorWrapper : public CompressorWrapper {
  56. public:
  57. const char* Name() const override;
  58. explicit AutoSkipCompressorWrapper(std::unique_ptr<Compressor> compressor,
  59. const CompressionOptions& opts);
  60. Status CompressBlock(Slice uncompressed_data, char* compressed_output,
  61. size_t* compressed_output_size,
  62. CompressionType* out_compression_type,
  63. ManagedWorkingArea* wa) override;
  64. ManagedWorkingArea ObtainWorkingArea() override;
  65. void ReleaseWorkingArea(WorkingArea* wa) override;
  66. private:
  67. Status CompressBlockAndRecord(Slice uncompressed_data,
  68. char* compressed_output,
  69. size_t* compressed_output_size,
  70. CompressionType* out_compression_type,
  71. AutoSkipWorkingArea* wa);
  72. static constexpr int kExplorationPercentage = 10;
  73. static constexpr int kProbabilityCutOff = 50;
  74. const CompressionOptions opts_;
  75. };
  76. class AutoSkipCompressorManager : public CompressionManagerWrapper {
  77. using CompressionManagerWrapper::CompressionManagerWrapper;
  78. const char* Name() const override;
  79. std::unique_ptr<Compressor> GetCompressorForSST(
  80. const FilterBuildingContext& context, const CompressionOptions& opts,
  81. CompressionType preferred) override;
  82. };
  83. // Cost Aware Components
  84. template <typename T>
  85. class WindowAveragePredictor {
  86. public:
  87. explicit WindowAveragePredictor(int window_size)
  88. : sum_(0), prediction_(0), count_(0), kWindowSize(window_size) {}
  89. T Predict() { return prediction_; }
  90. bool Record(T data) {
  91. sum_ += data;
  92. count_++;
  93. if (count_ >= kWindowSize) {
  94. prediction_ = sum_ / count_;
  95. sum_ = 0;
  96. count_ = 0;
  97. }
  98. return true;
  99. }
  100. void SetPrediction(T prediction) { prediction_ = prediction; }
  101. private:
  102. T sum_;
  103. T prediction_;
  104. int count_;
  105. const int kWindowSize;
  106. };
// Window-average predictor over size_t samples; used as the IO member of
// IOCPUCostPredictor.
// NOTE(review): the unit of the samples is not visible here -- confirm
// against the caller.
using IOCostPredictor = WindowAveragePredictor<size_t>;
// Window-average predictor over uint64_t samples; used as the CPU member of
// IOCPUCostPredictor.
// NOTE(review): the unit of the samples is not visible here -- confirm
// against the caller.
using CPUUtilPredictor = WindowAveragePredictor<uint64_t>;
  109. struct IOCPUCostPredictor {
  110. explicit IOCPUCostPredictor(int window_size)
  111. : IOPredictor(window_size), CPUPredictor(window_size) {}
  112. IOCostPredictor IOPredictor;
  113. CPUUtilPredictor CPUPredictor;
  114. };
  115. class CostAwareWorkingArea : public Compressor::WorkingArea {
  116. public:
  117. explicit CostAwareWorkingArea(Compressor::ManagedWorkingArea&& wa)
  118. : wrapped_(std::move(wa)) {}
  119. ~CostAwareWorkingArea() {}
  120. CostAwareWorkingArea(const CostAwareWorkingArea&) = delete;
  121. CostAwareWorkingArea& operator=(const CostAwareWorkingArea&) = delete;
  122. CostAwareWorkingArea(CostAwareWorkingArea&& other) noexcept
  123. : wrapped_(std::move(other.wrapped_)) {}
  124. CostAwareWorkingArea& operator=(CostAwareWorkingArea&& other) noexcept {
  125. if (this != &other) {
  126. wrapped_ = std::move(other.wrapped_);
  127. cost_predictors_ = std::move(other.cost_predictors_);
  128. }
  129. return *this;
  130. }
  131. Compressor::ManagedWorkingArea wrapped_;
  132. std::vector<std::vector<IOCPUCostPredictor*>> cost_predictors_;
  133. };
  134. class CostAwareCompressor : public Compressor {
  135. public:
  136. explicit CostAwareCompressor(const CompressionOptions& opts);
  137. const char* Name() const override;
  138. size_t GetMaxSampleSizeIfWantDict(CacheEntryRole block_type) const override;
  139. Slice GetSerializedDict() const override;
  140. CompressionType GetPreferredCompressionType() const override;
  141. ManagedWorkingArea ObtainWorkingArea() override;
  142. std::unique_ptr<Compressor> MaybeCloneSpecialized(
  143. CacheEntryRole block_type, DictSampleArgs&& dict_samples) override;
  144. Status CompressBlock(Slice uncompressed_data, char* compressed_output,
  145. size_t* compressed_output_size,
  146. CompressionType* out_compression_type,
  147. ManagedWorkingArea* wa) override;
  148. void ReleaseWorkingArea(WorkingArea* wa) override;
  149. private:
  150. Status CompressBlockAndRecord(size_t choosen_compression_type,
  151. size_t compresion_level_ptr,
  152. Slice uncompressed_data,
  153. char* compressed_output,
  154. size_t* compressed_output_size,
  155. CompressionType* out_compression_type,
  156. CostAwareWorkingArea* wa);
  157. static constexpr int kExplorationPercentage = 10;
  158. static constexpr int kProbabilityCutOff = 50;
  159. // This is the vector containing the list of compression levels that
  160. // CostAwareCompressor will use create compressor and predicts the cost
  161. // The vector contains list of compression level for compression algorithm in
  162. // the order defined by enum CompressionType
  163. static const std::vector<std::vector<int>> kCompressionLevels;
  164. const CompressionOptions opts_;
  165. std::vector<std::vector<std::unique_ptr<Compressor>>> allcompressors_;
  166. std::vector<std::pair<size_t, size_t>> allcompressors_index_;
  167. };
  168. class CostAwareCompressorManager : public CompressionManagerWrapper {
  169. using CompressionManagerWrapper::CompressionManagerWrapper;
  170. const char* Name() const override;
  171. std::unique_ptr<Compressor> GetCompressorForSST(
  172. const FilterBuildingContext& context, const CompressionOptions& opts,
  173. CompressionType preferred) override;
  174. };
  175. } // namespace ROCKSDB_NAMESPACE