// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include "cache/compressed_secondary_cache.h"

#include <algorithm>
#include <cstdint>
#include <memory>

#include "memory/memory_allocator_impl.h"
#include "monitoring/perf_context_imp.h"
#include "util/coding.h"
#include "util/compression.h"
#include "util/string_util.h"

namespace ROCKSDB_NAMESPACE {

namespace {
// Format of values in CompressedSecondaryCache:
// If enable_custom_split_merge:
// * A chain of CacheValueChunk representing the sequence of bytes for a tagged
//   value. The overall length of the tagged value is determined by the chain
//   of CacheValueChunks.
// If !enable_custom_split_merge:
// * A LengthPrefixedSlice (starts with varint64 size) of a tagged value.
//
// A tagged value has a 2-byte header before the "saved" or compressed block
// data:
// * 1 byte for "source" CacheTier indicating which tier is responsible for
//   compression/decompression.
// * 1 byte for compression type which is generated/used by
//   CompressedSecondaryCache iff source == CacheTier::kVolatileCompressedTier
//   (original entry passed in was uncompressed). Otherwise, the compression
//   type is preserved from the entry passed in.
constexpr uint32_t kTagSize = 2;

// Size of tag + varint size prefix when applicable
uint32_t GetHeaderSize(size_t data_size, bool enable_split_merge) {
  return (enable_split_merge ? 0 : VarintLength(kTagSize + data_size)) +
         kTagSize;
}
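
// Illustrative layout (editorial sketch; the values are an example, not used
// by the implementation): for a 100-byte saved block with !enable_split_merge,
// the stored value is
//   [varint64 length prefix = 102 : 1 byte]
//   [source CacheTier tag         : 1 byte]
//   [CompressionType tag          : 1 byte]
//   [block data                   : 100 bytes]
// so GetHeaderSize(100, /*enable_split_merge=*/false) == 1 + kTagSize == 3,
// while GetHeaderSize(100, /*enable_split_merge=*/true) == kTagSize == 2
// (no length prefix; the total length is implied by the chunk chain).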

}  // namespace

CompressedSecondaryCache::CompressedSecondaryCache(
    const CompressedSecondaryCacheOptions& opts)
    : cache_(opts.LRUCacheOptions::MakeSharedCache()),
      cache_options_(opts),
      cache_res_mgr_(std::make_shared<ConcurrentCacheReservationManager>(
          std::make_shared<CacheReservationManagerImpl<CacheEntryRole::kMisc>>(
              cache_))),
      disable_cache_(opts.capacity == 0) {
  auto mgr =
      GetBuiltinCompressionManager(cache_options_.compress_format_version);
  compressor_ = mgr->GetCompressor(cache_options_.compression_opts,
                                   cache_options_.compression_type);
  decompressor_ =
      mgr->GetDecompressorOptimizeFor(cache_options_.compression_type);
}

CompressedSecondaryCache::~CompressedSecondaryCache() = default;

std::unique_ptr<SecondaryCacheResultHandle> CompressedSecondaryCache::Lookup(
    const Slice& key, const Cache::CacheItemHelper* helper,
    Cache::CreateContext* create_context, bool /*wait*/, bool advise_erase,
    Statistics* stats, bool& kept_in_sec_cache) {
  assert(helper);
  if (disable_cache_.LoadRelaxed()) {
    return nullptr;
  }
  std::unique_ptr<SecondaryCacheResultHandle> handle;
  kept_in_sec_cache = false;
  Cache::Handle* lru_handle = cache_->Lookup(key);
  if (lru_handle == nullptr) {
    return nullptr;
  }
  void* handle_value = cache_->Value(lru_handle);
  if (handle_value == nullptr) {
    cache_->Release(lru_handle, /*erase_if_last_ref=*/false);
    RecordTick(stats, COMPRESSED_SECONDARY_CACHE_DUMMY_HITS);
    return nullptr;
  }
  std::string merged_value;
  Slice tagged_data;
  if (cache_options_.enable_custom_split_merge) {
    CacheValueChunk* value_chunk_ptr =
        static_cast<CacheValueChunk*>(handle_value);
    merged_value = MergeChunksIntoValue(value_chunk_ptr);
    tagged_data = Slice(merged_value);
  } else {
    tagged_data = GetLengthPrefixedSlice(static_cast<char*>(handle_value));
  }
  auto source = lossless_cast<CacheTier>(tagged_data[0]);
  auto type = lossless_cast<CompressionType>(tagged_data[1]);
  std::unique_ptr<char[]> uncompressed;
  Slice saved(tagged_data.data() + kTagSize, tagged_data.size() - kTagSize);
  if (source == CacheTier::kVolatileCompressedTier) {
    if (type != kNoCompression) {
      // TODO: can we do something to avoid yet another allocation?
      Decompressor::Args args;
      args.compressed_data = saved;
      args.compression_type = type;
      Status s = decompressor_->ExtractUncompressedSize(args);
      assert(s.ok());  // in-memory data
      if (s.ok()) {
        uncompressed = std::make_unique<char[]>(args.uncompressed_size);
        s = decompressor_->DecompressBlock(args, uncompressed.get());
        assert(s.ok());  // in-memory data
      }
      if (!s.ok()) {
        cache_->Release(lru_handle, /*erase_if_last_ref=*/true);
        return nullptr;
      }
      saved = Slice(uncompressed.get(), args.uncompressed_size);
      type = kNoCompression;
      // Free temporary compressed data as early as we can. This could matter
      // for unusually large blocks because we also have
      // * Another compressed copy above (from lru_cache).
      // * The uncompressed copy in `uncompressed`.
      // * Another uncompressed copy in `result_value` below.
      // Let's try to max out at 3 copies instead of 4.
      merged_value = std::string();
    }
    // Reduced as if it came from primary cache
    source = CacheTier::kVolatileTier;
  }
  Cache::ObjectPtr result_value = nullptr;
  size_t result_charge = 0;
  Status s = helper->create_cb(saved, type, source, create_context,
                               cache_options_.memory_allocator.get(),
                               &result_value, &result_charge);
  if (!s.ok()) {
    cache_->Release(lru_handle, /*erase_if_last_ref=*/true);
    return nullptr;
  }
  if (advise_erase) {
    cache_->Release(lru_handle, /*erase_if_last_ref=*/true);
    // Insert a dummy handle.
    cache_
        ->Insert(key, /*obj=*/nullptr,
                 GetHelper(cache_options_.enable_custom_split_merge),
                 /*charge=*/0)
        .PermitUncheckedError();
  } else {
    kept_in_sec_cache = true;
    cache_->Release(lru_handle, /*erase_if_last_ref=*/false);
  }
  handle.reset(
      new CompressedSecondaryCacheResultHandle(result_value, result_charge));
  RecordTick(stats, COMPRESSED_SECONDARY_CACHE_HITS);
  return handle;
}

bool CompressedSecondaryCache::MaybeInsertDummy(const Slice& key) {
  auto internal_helper = GetHelper(cache_options_.enable_custom_split_merge);
  Cache::Handle* lru_handle = cache_->Lookup(key);
  if (lru_handle == nullptr) {
    PERF_COUNTER_ADD(compressed_sec_cache_insert_dummy_count, 1);
    // Insert a dummy handle if the handle is evicted for the first time.
    cache_->Insert(key, /*obj=*/nullptr, internal_helper, /*charge=*/0)
        .PermitUncheckedError();
    return true;
  } else {
    cache_->Release(lru_handle, /*erase_if_last_ref=*/false);
  }
  return false;
}
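
// Editorial note (illustrative, not part of the implementation): together
// with Insert() below, MaybeInsertDummy() acts as a two-touch admission
// filter. A hypothetical call sequence for a key "k":
//   Insert("k", obj, helper, /*force_insert=*/false);  // miss -> dummy only
//   Insert("k", obj, helper, /*force_insert=*/false);  // dummy hit -> real insert
// Only the second call reaches InsertInternal(), so blocks evicted from the
// primary cache just once are not compressed and stored.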

Status CompressedSecondaryCache::InsertInternal(
    const Slice& key, Cache::ObjectPtr value,
    const Cache::CacheItemHelper* helper, CompressionType from_type,
    CacheTier source) {
  bool enable_split_merge = cache_options_.enable_custom_split_merge;
  const Cache::CacheItemHelper* internal_helper = GetHelper(enable_split_merge);
  // TODO: variant of size_cb that also returns a pointer to the data if
  // already available. Saves an allocation if we keep the compressed version.
  const size_t data_size_original = (*helper->size_cb)(value);
  // Allocate enough memory for header/tag + original data because (a) we might
  // not be attempting compression at all, and (b) we might keep the original if
  // compression is insufficient. But we don't need the length prefix with
  // enable_split_merge. TODO: be smarter with CacheValueChunk to save an
  // allocation in the enable_split_merge case.
  size_t header_size = GetHeaderSize(data_size_original, enable_split_merge);
  CacheAllocationPtr allocation = AllocateBlock(
      header_size + data_size_original, cache_options_.memory_allocator.get());
  char* data_ptr = allocation.get() + header_size;
  Slice tagged_data(data_ptr - kTagSize, data_size_original + kTagSize);
  assert(tagged_data.data() >= allocation.get());
  Status s = (*helper->saveto_cb)(value, 0, data_size_original, data_ptr);
  if (!s.ok()) {
    return s;
  }
  std::unique_ptr<char[]> tagged_compressed_data;
  CompressionType to_type = kNoCompression;
  if (compressor_ && from_type == kNoCompression &&
      !cache_options_.do_not_compress_roles.Contains(helper->role)) {
    assert(source == CacheTier::kVolatileCompressedTier);
    // TODO: consider malloc sizes for max acceptable compressed size
    // Or maybe max_compressed_bytes_per_kb
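    // Editorial note: initializing data_size_compressed to one byte less than
    // the original size presumably serves as the maximum acceptable output
    // size, so compression is kept only if it saves at least one byte;
    // otherwise CompressBlock() is expected to report kNoCompression and the
    // uncompressed original is stored below.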
    size_t data_size_compressed = data_size_original - 1;
    tagged_compressed_data =
        std::make_unique<char[]>(data_size_compressed + kTagSize);
    s = compressor_->CompressBlock(Slice(data_ptr, data_size_original),
                                   tagged_compressed_data.get() + kTagSize,
                                   &data_size_compressed, &to_type,
                                   nullptr /*working_area*/);
    if (!s.ok()) {
      return s;
    }
    PERF_COUNTER_ADD(compressed_sec_cache_uncompressed_bytes,
                     data_size_original);
    if (to_type == kNoCompression) {
      // Compression rejected or otherwise aborted/failed
      to_type = kNoCompression;
      tagged_compressed_data.reset();
      // TODO: consider separate counters for rejected compressions
      PERF_COUNTER_ADD(compressed_sec_cache_compressed_bytes,
                       data_size_original);
    } else {
      PERF_COUNTER_ADD(compressed_sec_cache_compressed_bytes,
                       data_size_compressed);
      if (enable_split_merge) {
        // Only need tagged_data for copying into CacheValueChunks.
        tagged_data = Slice(tagged_compressed_data.get(),
                            data_size_compressed + kTagSize);
        allocation.reset();
      } else {
        // Replace allocation with compressed version, copied from string
        header_size = GetHeaderSize(data_size_compressed, enable_split_merge);
        allocation = AllocateBlock(header_size + data_size_compressed,
                                   cache_options_.memory_allocator.get());
        data_ptr = allocation.get() + header_size;
        // Ignore unpopulated tag on tagged_compressed_data; will only be
        // populated on the new allocation.
        std::memcpy(data_ptr, tagged_compressed_data.get() + kTagSize,
                    data_size_compressed);
        tagged_data =
            Slice(data_ptr - kTagSize, data_size_compressed + kTagSize);
        assert(tagged_data.data() >= allocation.get());
      }
    }
  }
  PERF_COUNTER_ADD(compressed_sec_cache_insert_real_count, 1);
  // Save the tag fields
  const_cast<char*>(tagged_data.data())[0] = lossless_cast<char>(source);
  const_cast<char*>(tagged_data.data())[1] = lossless_cast<char>(
      source == CacheTier::kVolatileCompressedTier ? to_type : from_type);
  if (enable_split_merge) {
    size_t split_charge{0};
    CacheValueChunk* value_chunks_head =
        SplitValueIntoChunks(tagged_data, split_charge);
    s = cache_->Insert(key, value_chunks_head, internal_helper, split_charge);
    assert(s.ok());  // LRUCache::Insert() with handle==nullptr always OK
  } else {
    // Save the size prefix
    char* ptr = allocation.get();
    ptr = EncodeVarint64(ptr, tagged_data.size());
    assert(ptr == tagged_data.data());
#ifdef ROCKSDB_MALLOC_USABLE_SIZE
    size_t charge = malloc_usable_size(allocation.get());
#else
    size_t charge = tagged_data.size();
#endif
    s = cache_->Insert(key, allocation.release(), internal_helper, charge);
    assert(s.ok());  // LRUCache::Insert() with handle==nullptr always OK
  }
  return Status::OK();
}

Status CompressedSecondaryCache::Insert(const Slice& key,
                                        Cache::ObjectPtr value,
                                        const Cache::CacheItemHelper* helper,
                                        bool force_insert) {
  if (value == nullptr) {
    return Status::InvalidArgument();
  }
  if (!force_insert && MaybeInsertDummy(key)) {
    return Status::OK();
  }
  return InsertInternal(key, value, helper, kNoCompression,
                        CacheTier::kVolatileCompressedTier);
}

Status CompressedSecondaryCache::InsertSaved(
    const Slice& key, const Slice& saved, CompressionType type = kNoCompression,
    CacheTier source = CacheTier::kVolatileTier) {
  if (source == CacheTier::kVolatileCompressedTier) {
    // Unexpected, would violate InsertInternal preconditions
    assert(source != CacheTier::kVolatileCompressedTier);
    return Status::OK();
  }
  if (type == kNoCompression) {
    // Not currently supported (why?)
    return Status::OK();
  }
  if (cache_options_.enable_custom_split_merge) {
    // We don't support custom split/merge for the tiered case (why?)
    return Status::OK();
  }
  auto slice_helper = &kSliceCacheItemHelper;
  if (MaybeInsertDummy(key)) {
    return Status::OK();
  }
  return InsertInternal(
      key, static_cast<Cache::ObjectPtr>(const_cast<Slice*>(&saved)),
      slice_helper, type, source);
}

void CompressedSecondaryCache::Erase(const Slice& key) { cache_->Erase(key); }

Status CompressedSecondaryCache::SetCapacity(size_t capacity) {
  MutexLock l(&capacity_mutex_);
  cache_options_.capacity = capacity;
  cache_->SetCapacity(capacity);
  disable_cache_.StoreRelaxed(capacity == 0);
  return Status::OK();
}

Status CompressedSecondaryCache::GetCapacity(size_t& capacity) {
  MutexLock l(&capacity_mutex_);
  capacity = cache_options_.capacity;
  return Status::OK();
}

std::string CompressedSecondaryCache::GetPrintableOptions() const {
  std::string ret;
  ret.reserve(20000);
  const int kBufferSize{200};
  char buffer[kBufferSize];
  ret.append(cache_->GetPrintableOptions());
  snprintf(buffer, kBufferSize, " compression_type : %s\n",
           CompressionTypeToString(cache_options_.compression_type).c_str());
  ret.append(buffer);
  snprintf(buffer, kBufferSize, " compression_opts : %s\n",
           CompressionOptionsToString(
               const_cast<CompressionOptions&>(cache_options_.compression_opts))
               .c_str());
  ret.append(buffer);
  snprintf(buffer, kBufferSize, " compress_format_version : %d\n",
           cache_options_.compress_format_version);
  ret.append(buffer);
  return ret;
}

// FIXME: this could use a lot of attention, including:
// * Use allocator
// * We shouldn't be worse than non-split; be more pro-actively aware of
//   internal fragmentation
// * Consider a unified object/chunk structure that may or may not split
// * Optimize size overhead of chunks
CompressedSecondaryCache::CacheValueChunk*
CompressedSecondaryCache::SplitValueIntoChunks(const Slice& value,
                                               size_t& charge) {
  assert(!value.empty());
  const char* src_ptr = value.data();
  size_t src_size{value.size()};
  CacheValueChunk dummy_head = CacheValueChunk();
  CacheValueChunk* current_chunk = &dummy_head;
  // Do not split when value size is large or there is no compression.
  size_t predicted_chunk_size{0};
  size_t actual_chunk_size{0};
  size_t tmp_size{0};
  while (src_size > 0) {
    predicted_chunk_size = sizeof(CacheValueChunk) - 1 + src_size;
    auto upper =
        std::upper_bound(malloc_bin_sizes_.begin(), malloc_bin_sizes_.end(),
                         predicted_chunk_size);
    // Do not split when value size is too small, too large, close to a bin
    // size, or there is no compression.
    if (upper == malloc_bin_sizes_.begin() ||
        upper == malloc_bin_sizes_.end() ||
        *upper - predicted_chunk_size < malloc_bin_sizes_.front()) {
      tmp_size = predicted_chunk_size;
    } else {
      tmp_size = *(--upper);
    }
    CacheValueChunk* new_chunk =
        static_cast<CacheValueChunk*>(static_cast<void*>(new char[tmp_size]));
    current_chunk->next = new_chunk;
    current_chunk = current_chunk->next;
    actual_chunk_size = tmp_size - sizeof(CacheValueChunk) + 1;
    memcpy(current_chunk->data, src_ptr, actual_chunk_size);
    current_chunk->size = actual_chunk_size;
    src_ptr += actual_chunk_size;
    src_size -= actual_chunk_size;
    charge += tmp_size;
  }
  current_chunk->next = nullptr;
  return dummy_head.next;
}
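
// Editorial sketch of the sizing logic above, assuming hypothetical malloc
// bins {32, 64, ..., 4096, 8192} and a hypothetical sizeof(CacheValueChunk)
// of 24 bytes: for src_size == 5000, predicted_chunk_size == 24 - 1 + 5000 ==
// 5023; the upper-bound bin is 8192 and 8192 - 5023 >= 32, so tmp_size falls
// back to the next lower bin (4096). That chunk then carries 4096 - 24 + 1 ==
// 4073 bytes of data and the loop continues with the remaining 927 bytes.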

std::string CompressedSecondaryCache::MergeChunksIntoValue(
    const CacheValueChunk* head) {
  const CacheValueChunk* current_chunk = head;
  size_t total_size = 0;
  while (current_chunk != nullptr) {
    total_size += current_chunk->size;
    current_chunk = current_chunk->next;
  }
  std::string result;
  result.reserve(total_size);
  current_chunk = head;
  while (current_chunk != nullptr) {
    result.append(current_chunk->data, current_chunk->size);
    current_chunk = current_chunk->next;
  }
  assert(result.size() == total_size);
  return result;
}

const Cache::CacheItemHelper* CompressedSecondaryCache::GetHelper(
    bool enable_custom_split_merge) const {
  if (enable_custom_split_merge) {
    static const Cache::CacheItemHelper kHelper{
        CacheEntryRole::kMisc,
        [](Cache::ObjectPtr obj, MemoryAllocator* /*alloc*/) {
          CacheValueChunk* chunks_head = static_cast<CacheValueChunk*>(obj);
          while (chunks_head != nullptr) {
            CacheValueChunk* tmp_chunk = chunks_head;
            chunks_head = chunks_head->next;
            tmp_chunk->Free();
          }
        }};
    return &kHelper;
  } else {
    static const Cache::CacheItemHelper kHelper{
        CacheEntryRole::kMisc,
        [](Cache::ObjectPtr obj, MemoryAllocator* alloc) {
          if (obj != nullptr) {
            CacheAllocationDeleter{alloc}(static_cast<char*>(obj));
          }
        }};
    return &kHelper;
  }
}

size_t CompressedSecondaryCache::TEST_GetCharge(const Slice& key) {
  Cache::Handle* lru_handle = cache_->Lookup(key);
  if (lru_handle == nullptr) {
    return 0;
  }
  size_t charge = cache_->GetCharge(lru_handle);
  cache_->Release(lru_handle, /*erase_if_last_ref=*/false);
  return charge;
}

std::shared_ptr<SecondaryCache>
CompressedSecondaryCacheOptions::MakeSharedSecondaryCache() const {
  return std::make_shared<CompressedSecondaryCache>(*this);
}
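
// Editorial note: Deflate()/Inflate() below adjust the usable capacity of the
// secondary cache indirectly, by growing or shrinking a reservation
// (placeholder charge) held against the underlying LRU cache through
// cache_res_mgr_. Deflate() increases the reservation (leaving less room for
// real entries); Inflate() releases part of it. This appears intended for
// dynamically shifting capacity between cache tiers.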

Status CompressedSecondaryCache::Deflate(size_t decrease) {
  return cache_res_mgr_->UpdateCacheReservation(decrease, /*increase=*/true);
}

Status CompressedSecondaryCache::Inflate(size_t increase) {
  return cache_res_mgr_->UpdateCacheReservation(increase, /*increase=*/false);
}

}  // namespace ROCKSDB_NAMESPACE