auto_tune_compressor.cc 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317
  1. // Copyright (c) Meta Platforms, Inc. and affiliates.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. #include "util/auto_tune_compressor.h"
  7. #include "options/options_helper.h"
  8. #include "rocksdb/advanced_compression.h"
  9. #include "test_util/sync_point.h"
  10. #include "util/random.h"
  11. #include "util/stop_watch.h"
  12. namespace ROCKSDB_NAMESPACE {
// Candidate compression levels probed per compression type, indexed by
// (CompressionType - 1). An empty inner vector means that type is not
// managed by CostAwareCompressor.
const std::vector<std::vector<int>> CostAwareCompressor::kCompressionLevels{
    {0},         // kSnappyCompression
    {},          // kZlibCompression
    {},          // kBZip2Compression
    {1, 4, 9},   // kLZ4Compression
    {1, 4, 9},   // kLZ4HCCompression
    {},          // kXpressCompression
    {1, 15, 22}  // kZSTD
};
// Returns the predicted probability (0-100) that compressing the next block
// will be rejected, as computed from the last completed observation window.
int CompressionRejectionProbabilityPredictor::Predict() const {
  return pred_rejection_prob_percentage_;
}
// Total compression attempts observed in the current window, i.e. blocks
// that were rejected plus blocks that were actually compressed.
size_t CompressionRejectionProbabilityPredictor::attempted_compression_count()
    const {
  return rejected_count_ + compressed_count_;
}
  29. bool CompressionRejectionProbabilityPredictor::Record(
  30. Slice /*uncompressed_block_data*/, char* /*compressed_output*/,
  31. size_t /*compressed_output_size*/, CompressionType compression_type) {
  32. if (compression_type == kNoCompression) {
  33. rejected_count_++;
  34. } else {
  35. compressed_count_++;
  36. }
  37. auto attempted = attempted_compression_count();
  38. if (attempted >= window_size_) {
  39. pred_rejection_prob_percentage_ =
  40. static_cast<int>(rejected_count_ * 100 / attempted);
  41. compressed_count_ = 0;
  42. rejected_count_ = 0;
  43. assert(attempted_compression_count() == 0);
  44. }
  45. return true;
  46. }
// Wraps an existing compressor, taking ownership of it, and keeps a copy of
// the compression options for later use.
AutoSkipCompressorWrapper::AutoSkipCompressorWrapper(
    std::unique_ptr<Compressor> compressor, const CompressionOptions& opts)
    : CompressorWrapper::CompressorWrapper(std::move(compressor)),
      opts_(opts) {}
// Identifies this wrapper (not the wrapped compressor).
const char* AutoSkipCompressorWrapper::Name() const {
  return "AutoSkipCompressorWrapper";
}
  54. Status AutoSkipCompressorWrapper::CompressBlock(
  55. Slice uncompressed_data, char* compressed_output,
  56. size_t* compressed_output_size, CompressionType* out_compression_type,
  57. ManagedWorkingArea* wa) {
  58. // Check if the managed working area is provided or owned by this object.
  59. // If not, bypass auto-skip logic since the working area lacks a predictor to
  60. // record or make necessary decisions to compress or bypass compression of the
  61. // block
  62. if (wa == nullptr || wa->owner() != this) {
  63. return wrapped_->CompressBlock(uncompressed_data, compressed_output,
  64. compressed_output_size, out_compression_type,
  65. wa);
  66. }
  67. bool exploration =
  68. Random::GetTLSInstance()->PercentTrue(kExplorationPercentage);
  69. TEST_SYNC_POINT_CALLBACK(
  70. "AutoSkipCompressorWrapper::CompressBlock::exploitOrExplore",
  71. &exploration);
  72. auto autoskip_wa = static_cast<AutoSkipWorkingArea*>(wa->get());
  73. if (exploration) {
  74. return CompressBlockAndRecord(uncompressed_data, compressed_output,
  75. compressed_output_size, out_compression_type,
  76. autoskip_wa);
  77. } else {
  78. auto predictor_ptr = autoskip_wa->predictor;
  79. auto prediction = predictor_ptr->Predict();
  80. if (prediction <= kProbabilityCutOff) {
  81. // decide to compress
  82. return CompressBlockAndRecord(uncompressed_data, compressed_output,
  83. compressed_output_size,
  84. out_compression_type, autoskip_wa);
  85. } else {
  86. // decide to bypass compression
  87. *out_compression_type = kNoCompression;
  88. *compressed_output_size = 0;
  89. return Status::OK();
  90. }
  91. }
  92. return Status::OK();
  93. }
// Obtains a working area from the wrapped compressor and wraps it in an
// AutoSkipWorkingArea (which carries the predictor), owned by this object.
Compressor::ManagedWorkingArea AutoSkipCompressorWrapper::ObtainWorkingArea() {
  auto wrap_wa = wrapped_->ObtainWorkingArea();
  return ManagedWorkingArea(new AutoSkipWorkingArea(std::move(wrap_wa)), this);
}
// Frees a working area previously created by ObtainWorkingArea(). The cast
// matches the concrete type allocated there.
void AutoSkipCompressorWrapper::ReleaseWorkingArea(WorkingArea* wa) {
  delete static_cast<AutoSkipWorkingArea*>(wa);
}
  101. Status AutoSkipCompressorWrapper::CompressBlockAndRecord(
  102. Slice uncompressed_data, char* compressed_output,
  103. size_t* compressed_output_size, CompressionType* out_compression_type,
  104. AutoSkipWorkingArea* wa) {
  105. Status status = wrapped_->CompressBlock(uncompressed_data, compressed_output,
  106. compressed_output_size,
  107. out_compression_type, &(wa->wrapped));
  108. // determine if it was rejected or compressed
  109. auto predictor_ptr = wa->predictor;
  110. predictor_ptr->Record(uncompressed_data, compressed_output,
  111. *compressed_output_size, *out_compression_type);
  112. return status;
  113. }
const char* AutoSkipCompressorManager::Name() const {
  // should have returned "AutoSkipCompressorManager" but we currently have an
  // error so for now returning name of the wrapped container
  return wrapped_->Name();
}
// Returns the wrapped manager's compressor for this SST, wrapped in an
// AutoSkipCompressorWrapper so blocks can adaptively bypass compression.
std::unique_ptr<Compressor> AutoSkipCompressorManager::GetCompressorForSST(
    const FilterBuildingContext& context, const CompressionOptions& opts,
    CompressionType preferred) {
  assert(GetSupportedCompressions().size() > 1);
  // Auto-skip only makes sense when compression is actually requested.
  assert(preferred != kNoCompression);
  return std::make_unique<AutoSkipCompressorWrapper>(
      wrapped_->GetCompressorForSST(context, opts, preferred), opts);
}
CostAwareCompressor::CostAwareCompressor(const CompressionOptions& opts)
    : opts_(opts) {
  // Creates compressor supporting all the compression types and levels as per
  // the compression levels set in vector CompressionLevels
  auto builtInManager = GetBuiltinV2CompressionManager();
  const auto& compressions = GetSupportedCompressions();
  for (size_t i = 0; i < kCompressionLevels.size(); i++) {
    // kCompressionLevels is indexed by (CompressionType - 1), so slot i maps
    // to compression type i + 1; type 0 (kNoCompression) is never visited.
    CompressionType type = static_cast<CompressionType>(i + 1);
    if (type == kNoCompression) {
      continue;
    }
    if (kCompressionLevels[type - 1].size() == 0) {
      // No levels configured for this type: keep an empty slot so
      // allcompressors_ stays index-aligned with kCompressionLevels.
      allcompressors_.emplace_back();
      continue;
    } else {
      // if the compression type is not supported, then skip and remove
      // compression levels from the supported compression level list
      if (std::find(compressions.begin(), compressions.end(), type) ==
          compressions.end()) {
        allcompressors_.emplace_back();
        continue;
      }
      std::vector<std::unique_ptr<Compressor>> compressors_diff_levels;
      for (size_t j = 0; j < kCompressionLevels[type - 1].size(); j++) {
        auto level = kCompressionLevels[type - 1][j];
        // One compressor per configured level, built from a copy of the
        // options with only the level overridden.
        CompressionOptions new_opts = opts;
        new_opts.level = level;
        compressors_diff_levels.push_back(
            builtInManager->GetCompressor(new_opts, type));
        // allcompressors_index_ records (type slot, level slot) for every
        // compressor actually instantiated, in creation order.
        allcompressors_index_.emplace_back(i, j);
      }
      allcompressors_.push_back(std::move(compressors_diff_levels));
    }
  }
}
// Identifies this compressor implementation.
const char* CostAwareCompressor::Name() const { return "CostAwareCompressor"; }
// Delegates to the last instantiated compressor (highest level of the last
// supported type). NOTE(review): assumes allcompressors_index_ is non-empty;
// back() on an empty vector is undefined behavior — confirm callers never
// reach this when no compression type is supported.
size_t CostAwareCompressor::GetMaxSampleSizeIfWantDict(
    CacheEntryRole block_type) const {
  auto idx = allcompressors_index_.back();
  return allcompressors_[idx.first][idx.second]->GetMaxSampleSizeIfWantDict(
      block_type);
}
// Delegates to the last instantiated compressor. NOTE(review): same
// non-empty allcompressors_index_ assumption as GetMaxSampleSizeIfWantDict.
Slice CostAwareCompressor::GetSerializedDict() const {
  auto idx = allcompressors_index_.back();
  return allcompressors_[idx.first][idx.second]->GetSerializedDict();
}
// This compressor always advertises ZSTD as its preferred type.
CompressionType CostAwareCompressor::GetPreferredCompressionType() const {
  return kZSTD;
}
// Delegates dictionary specialization to the last instantiated compressor.
std::unique_ptr<Compressor> CostAwareCompressor::MaybeCloneSpecialized(
    CacheEntryRole block_type, DictSampleArgs&& dict_samples) {
  // TODO: full dictionary compression support. Currently this just falls
  // back on a non-multi compressor when asked to use a dictionary.
  auto idx = allcompressors_index_.back();
  return allcompressors_[idx.first][idx.second]->MaybeCloneSpecialized(
      block_type, std::move(dict_samples));
}
  184. Status CostAwareCompressor::CompressBlock(Slice uncompressed_data,
  185. char* compressed_output,
  186. size_t* compressed_output_size,
  187. CompressionType* out_compression_type,
  188. ManagedWorkingArea* wa) {
  189. // Check if the managed working area is provided or owned by this object.
  190. // If not, bypass compressor logic since the working area lacks a predictor
  191. if (allcompressors_.size() == 0) {
  192. return Status::NotSupported("No compression type supported");
  193. }
  194. if (wa == nullptr || wa->owner() != this) {
  195. // highest compression level of Zstd
  196. size_t choosen_compression_type = 6;
  197. size_t compression_level_ptr = 2;
  198. return allcompressors_[choosen_compression_type][compression_level_ptr]
  199. ->CompressBlock(uncompressed_data, compressed_output,
  200. compressed_output_size, out_compression_type, wa);
  201. }
  202. auto local_wa = static_cast<CostAwareWorkingArea*>(wa->get());
  203. std::pair<size_t, size_t> choosen_index(6, 2);
  204. size_t choosen_compression_type = choosen_index.first;
  205. size_t compresion_level_ptr = choosen_index.second;
  206. return CompressBlockAndRecord(choosen_compression_type, compresion_level_ptr,
  207. uncompressed_data, compressed_output,
  208. compressed_output_size, out_compression_type,
  209. local_wa);
  210. }
  211. Compressor::ManagedWorkingArea CostAwareCompressor::ObtainWorkingArea() {
  212. auto wrap_wa = allcompressors_.back().back()->ObtainWorkingArea();
  213. auto wa = new CostAwareWorkingArea(std::move(wrap_wa));
  214. // Create cost predictors for each compression type and level
  215. wa->cost_predictors_.reserve(allcompressors_.size());
  216. for (size_t i = 0; i < allcompressors_.size(); i++) {
  217. CompressionType type = static_cast<CompressionType>(i + 1);
  218. if (allcompressors_[type - 1].size() == 0) {
  219. wa->cost_predictors_.emplace_back();
  220. continue;
  221. } else {
  222. std::vector<IOCPUCostPredictor*> predictors_diff_levels;
  223. predictors_diff_levels.reserve(kCompressionLevels[type - 1].size());
  224. for (size_t j = 0; j < kCompressionLevels[type - 1].size(); j++) {
  225. predictors_diff_levels.emplace_back(new IOCPUCostPredictor(10));
  226. }
  227. wa->cost_predictors_.emplace_back(std::move(predictors_diff_levels));
  228. }
  229. }
  230. return ManagedWorkingArea(wa, this);
  231. }
  232. void CostAwareCompressor::ReleaseWorkingArea(WorkingArea* wa) {
  233. // remove all created cost predictors
  234. for (auto& prdictors_diff_levels :
  235. static_cast<CostAwareWorkingArea*>(wa)->cost_predictors_) {
  236. for (auto& predictor : prdictors_diff_levels) {
  237. delete predictor;
  238. }
  239. }
  240. delete static_cast<CostAwareWorkingArea*>(wa);
  241. }
  242. Status CostAwareCompressor::CompressBlockAndRecord(
  243. size_t choosen_compression_type, size_t compression_level_ptr,
  244. Slice uncompressed_data, char* compressed_output,
  245. size_t* compressed_output_size, CompressionType* out_compression_type,
  246. CostAwareWorkingArea* wa) {
  247. assert(choosen_compression_type < allcompressors_.size());
  248. assert(compression_level_ptr <
  249. allcompressors_[choosen_compression_type].size());
  250. assert(choosen_compression_type < wa->cost_predictors_.size());
  251. assert(compression_level_ptr <
  252. wa->cost_predictors_[choosen_compression_type].size());
  253. StopWatchNano<> timer(Env::Default()->GetSystemClock().get(), true);
  254. Status status =
  255. allcompressors_[choosen_compression_type][compression_level_ptr]
  256. ->CompressBlock(uncompressed_data, compressed_output,
  257. compressed_output_size, out_compression_type,
  258. &(wa->wrapped_));
  259. std::pair<size_t, size_t> measured_data(timer.ElapsedMicros(),
  260. *compressed_output_size);
  261. auto predictor =
  262. wa->cost_predictors_[choosen_compression_type][compression_level_ptr];
  263. auto output_length = measured_data.second;
  264. auto cpu_time = measured_data.first;
  265. predictor->CPUPredictor.Record(cpu_time);
  266. predictor->IOPredictor.Record(output_length);
  267. TEST_SYNC_POINT_CALLBACK(
  268. "CostAwareCompressor::CompressBlockAndRecord::GetPredictor",
  269. wa->cost_predictors_[choosen_compression_type][compression_level_ptr]);
  270. return status;
  271. }
  272. std::shared_ptr<CompressionManagerWrapper> CreateAutoSkipCompressionManager(
  273. std::shared_ptr<CompressionManager> wrapped) {
  274. return std::make_shared<AutoSkipCompressorManager>(
  275. wrapped == nullptr ? GetBuiltinV2CompressionManager() : wrapped);
  276. }
const char* CostAwareCompressorManager::Name() const {
  // should have returned "CostAwareCompressorManager" but we currently have an
  // error so for now returning name of the wrapped container
  return wrapped_->Name();
}
// Builds a fresh CostAwareCompressor from the options. Unlike the auto-skip
// manager, the wrapped manager's compressor is not consulted; `context` and
// `preferred` are intentionally unused.
std::unique_ptr<Compressor> CostAwareCompressorManager::GetCompressorForSST(
    const FilterBuildingContext& context, const CompressionOptions& opts,
    CompressionType preferred) {
  assert(GetSupportedCompressions().size() > 1);
  (void)context;
  (void)preferred;
  return std::make_unique<CostAwareCompressor>(opts);
}
  290. std::shared_ptr<CompressionManagerWrapper> CreateCostAwareCompressionManager(
  291. std::shared_ptr<CompressionManager> wrapped) {
  292. return std::make_shared<CostAwareCompressorManager>(
  293. wrapped == nullptr ? GetBuiltinV2CompressionManager() : wrapped);
  294. }
  295. } // namespace ROCKSDB_NAMESPACE