// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include "db/blob/blob_source.h"

#include <algorithm>
#include <cassert>
#include <string>

#include "cache/cache_reservation_manager.h"
#include "cache/charged_cache.h"
#include "db/blob/blob_contents.h"
#include "db/blob/blob_file_reader.h"
#include "db/blob/blob_log_format.h"
#include "monitoring/statistics_impl.h"
#include "options/cf_options.h"
#include "table/get_context.h"
#include "table/multiget_context.h"

namespace ROCKSDB_NAMESPACE {

BlobSource::BlobSource(const ImmutableOptions& immutable_options,
                       const MutableCFOptions& mutable_cf_options,
                       const std::string& db_id,
                       const std::string& db_session_id,
                       BlobFileCache* blob_file_cache)
    : db_id_(db_id),
      db_session_id_(db_session_id),
      statistics_(immutable_options.statistics.get()),
      blob_file_cache_(blob_file_cache),
      blob_cache_(immutable_options.blob_cache),
      lowest_used_cache_tier_(immutable_options.lowest_used_cache_tier) {
  auto bbto =
      mutable_cf_options.table_factory->GetOptions<BlockBasedTableOptions>();
  if (bbto &&
      bbto->cache_usage_options.options_overrides.at(CacheEntryRole::kBlobCache)
              .charged == CacheEntryRoleOptions::Decision::kEnabled) {
    blob_cache_ = SharedCacheInterface{std::make_shared<ChargedCache>(
        immutable_options.blob_cache, bbto->block_cache)};
  }
}

BlobSource::~BlobSource() = default;

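// Looks up the blob identified by cache_key in the blob cache. On a hit, the
// returned handle is wrapped in *cached_blob and cache hit statistics are
// recorded; on a miss, the miss is recorded and Status::NotFound is returned.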
Status BlobSource::GetBlobFromCache(
    const Slice& cache_key, CacheHandleGuard<BlobContents>* cached_blob) const {
  assert(blob_cache_);
  assert(!cache_key.empty());
  assert(cached_blob);
  assert(cached_blob->IsEmpty());

  TypedHandle* cache_handle = GetEntryFromCache(cache_key);
  if (cache_handle != nullptr) {
    *cached_blob =
        CacheHandleGuard<BlobContents>(blob_cache_.get(), cache_handle);

    assert(cached_blob->GetValue());

    PERF_COUNTER_ADD(blob_cache_hit_count, 1);
    RecordTick(statistics_, BLOB_DB_CACHE_HIT);
    RecordTick(statistics_, BLOB_DB_CACHE_BYTES_READ,
               cached_blob->GetValue()->size());

    return Status::OK();
  }

  RecordTick(statistics_, BLOB_DB_CACHE_MISS);

  return Status::NotFound("Blob not found in cache");
}

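// Inserts *blob into the blob cache under cache_key with bottom priority. On
// success, ownership of the contents passes to the cache and *cached_blob
// holds a handle to the inserted entry; failures are counted in
// BLOB_DB_CACHE_ADD_FAILURES.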
Status BlobSource::PutBlobIntoCache(
    const Slice& cache_key, std::unique_ptr<BlobContents>* blob,
    CacheHandleGuard<BlobContents>* cached_blob) const {
  assert(blob_cache_);
  assert(!cache_key.empty());
  assert(blob);
  assert(*blob);
  assert(cached_blob);
  assert(cached_blob->IsEmpty());

  TypedHandle* cache_handle = nullptr;
  const Status s = InsertEntryIntoCache(cache_key, blob->get(), &cache_handle,
                                        Cache::Priority::BOTTOM);
  if (s.ok()) {
    blob->release();

    assert(cache_handle != nullptr);
    *cached_blob =
        CacheHandleGuard<BlobContents>(blob_cache_.get(), cache_handle);

    assert(cached_blob->GetValue());

    RecordTick(statistics_, BLOB_DB_CACHE_ADD);
    RecordTick(statistics_, BLOB_DB_CACHE_BYTES_WRITE,
               cached_blob->GetValue()->size());
  } else {
    RecordTick(statistics_, BLOB_DB_CACHE_ADD_FAILURES);
  }

  return s;
}

BlobSource::TypedHandle* BlobSource::GetEntryFromCache(const Slice& key) const {
  return blob_cache_.LookupFull(key, nullptr /* context */,
                                Cache::Priority::BOTTOM, statistics_,
                                lowest_used_cache_tier_);
}

void BlobSource::PinCachedBlob(CacheHandleGuard<BlobContents>* cached_blob,
                               PinnableSlice* value) {
  assert(cached_blob);
  assert(cached_blob->GetValue());
  assert(value);

  // To avoid copying the cached blob into the buffer provided by the
  // application, we can simply transfer ownership of the cache handle to
  // the target PinnableSlice. This has the potential to save a lot of
  // CPU, especially with large blob values.
  value->Reset();

  constexpr Cleanable* cleanable = nullptr;
  value->PinSlice(cached_blob->GetValue()->data(), cleanable);

  cached_blob->TransferTo(value);
}

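// Pins a blob that is not backed by the cache: ownership of the heap-allocated
// BlobContents is transferred to the PinnableSlice, which deletes it via the
// cleanup function when it is reset or destroyed, so no copy is made.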
void BlobSource::PinOwnedBlob(std::unique_ptr<BlobContents>* owned_blob,
                              PinnableSlice* value) {
  assert(owned_blob);
  assert(*owned_blob);
  assert(value);

  BlobContents* const blob = owned_blob->release();
  assert(blob);

  value->Reset();
  value->PinSlice(
      blob->data(),
      [](void* arg1, void* /* arg2 */) {
        delete static_cast<BlobContents*>(arg1);
      },
      blob, nullptr);
}

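// Inserts value into the blob cache, charging its approximate memory usage.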
Status BlobSource::InsertEntryIntoCache(const Slice& key, BlobContents* value,
                                        TypedHandle** cache_handle,
                                        Cache::Priority priority) const {
  return blob_cache_.InsertFull(key, value, value->ApproximateMemoryUsage(),
                                cache_handle, priority,
                                lowest_used_cache_tier_);
}

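// Reads a single blob: the blob cache is consulted first, and if the blob is
// not cached (and I/O is allowed), it is read from the blob file via
// BlobFileCache and optionally inserted into the blob cache.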
Status BlobSource::GetBlob(const ReadOptions& read_options,
                           const Slice& user_key, uint64_t file_number,
                           uint64_t offset, uint64_t file_size,
                           uint64_t value_size,
                           CompressionType compression_type,
                           FilePrefetchBuffer* prefetch_buffer,
                           PinnableSlice* value, uint64_t* bytes_read) {
  assert(value);

  Status s;

  const CacheKey cache_key = GetCacheKey(file_number, file_size, offset);

  CacheHandleGuard<BlobContents> blob_handle;

  // First, try to get the blob from the cache
  //
  // If blob cache is enabled, we'll try to read from it.
  if (blob_cache_) {
    Slice key = cache_key.AsSlice();
    s = GetBlobFromCache(key, &blob_handle);
    if (s.ok()) {
      PinCachedBlob(&blob_handle, value);

      // For consistency, the size of on-disk (possibly compressed) blob record
      // is assigned to bytes_read.
      uint64_t adjustment =
          read_options.verify_checksums
              ? BlobLogRecord::CalculateAdjustmentForRecordHeader(
                    user_key.size())
              : 0;
      assert(offset >= adjustment);

      uint64_t record_size = value_size + adjustment;
      if (bytes_read) {
        *bytes_read = record_size;
      }
      return s;
    }
  }

  assert(blob_handle.IsEmpty());

  const bool no_io = read_options.read_tier == kBlockCacheTier;
  if (no_io) {
    s = Status::Incomplete("Cannot read blob(s): no disk I/O allowed");
    return s;
  }

  // Can't find the blob from the cache. Since I/O is allowed, read from the
  // file.
  std::unique_ptr<BlobContents> blob_contents;

  {
    CacheHandleGuard<BlobFileReader> blob_file_reader;
    s = blob_file_cache_->GetBlobFileReader(read_options, file_number,
                                            &blob_file_reader);
    if (!s.ok()) {
      return s;
    }

    assert(blob_file_reader.GetValue());

    if (compression_type != blob_file_reader.GetValue()->GetCompressionType()) {
      return Status::Corruption("Compression type mismatch when reading blob");
    }

    MemoryAllocator* const allocator =
        (blob_cache_ && read_options.fill_cache)
            ? blob_cache_.get()->memory_allocator()
            : nullptr;

    uint64_t read_size = 0;
    s = blob_file_reader.GetValue()->GetBlob(
        read_options, user_key, offset, value_size, compression_type,
        prefetch_buffer, allocator, &blob_contents, &read_size);
    if (!s.ok()) {
      return s;
    }
    if (bytes_read) {
      *bytes_read = read_size;
    }
  }

  if (blob_cache_ && read_options.fill_cache) {
    // If filling cache is allowed and a cache is configured, try to put the
    // blob to the cache.
    Slice key = cache_key.AsSlice();
    s = PutBlobIntoCache(key, &blob_contents, &blob_handle);
    if (!s.ok()) {
      return s;
    }

    PinCachedBlob(&blob_handle, value);
  } else {
    PinOwnedBlob(&blob_contents, value);
  }

  assert(s.ok());
  return s;
}

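// Reads multiple blobs, possibly spanning several blob files. For each file,
// the requests are sorted by offset and handled by MultiGetBlobFromOneFile;
// *bytes_read, if non-null, is set to the total number of bytes read across
// all files.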
void BlobSource::MultiGetBlob(const ReadOptions& read_options,
                              autovector<BlobFileReadRequests>& blob_reqs,
                              uint64_t* bytes_read) {
  assert(blob_reqs.size() > 0);

  uint64_t total_bytes_read = 0;
  uint64_t bytes_read_in_file = 0;

  for (auto& [file_number, file_size, blob_reqs_in_file] : blob_reqs) {
    // sort blob_reqs_in_file by file offset.
    std::sort(
        blob_reqs_in_file.begin(), blob_reqs_in_file.end(),
        [](const BlobReadRequest& lhs, const BlobReadRequest& rhs) -> bool {
          return lhs.offset < rhs.offset;
        });

    MultiGetBlobFromOneFile(read_options, file_number, file_size,
                            blob_reqs_in_file, &bytes_read_in_file);

    total_bytes_read += bytes_read_in_file;
  }

  if (bytes_read) {
    *bytes_read = total_bytes_read;
  }
}

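// Reads a batch of blobs from a single blob file. Requests must be sorted by
// offset. Blobs already in the blob cache are served from it; the remaining
// requests are read from the file with a single MultiGetBlob call on the blob
// file reader and, when filling the cache is allowed, inserted into the cache.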
void BlobSource::MultiGetBlobFromOneFile(const ReadOptions& read_options,
                                         uint64_t file_number,
                                         uint64_t /*file_size*/,
                                         autovector<BlobReadRequest>& blob_reqs,
                                         uint64_t* bytes_read) {
  const size_t num_blobs = blob_reqs.size();
  assert(num_blobs > 0);
  assert(num_blobs <= MultiGetContext::MAX_BATCH_SIZE);

#ifndef NDEBUG
  for (size_t i = 0; i < num_blobs - 1; ++i) {
    assert(blob_reqs[i].offset <= blob_reqs[i + 1].offset);
  }
#endif  // !NDEBUG

  using Mask = uint64_t;
  Mask cache_hit_mask = 0;

  uint64_t total_bytes = 0;
  const OffsetableCacheKey base_cache_key(db_id_, db_session_id_, file_number);

  if (blob_cache_) {
    size_t cached_blob_count = 0;

    for (size_t i = 0; i < num_blobs; ++i) {
      auto& req = blob_reqs[i];

      CacheHandleGuard<BlobContents> blob_handle;
      const CacheKey cache_key = base_cache_key.WithOffset(req.offset);
      const Slice key = cache_key.AsSlice();

      const Status s = GetBlobFromCache(key, &blob_handle);

      if (s.ok()) {
        assert(req.status);
        *req.status = s;

        PinCachedBlob(&blob_handle, req.result);

        // Update the counter for the number of valid blobs read from the
        // cache.
        ++cached_blob_count;

        // For consistency, the size of each on-disk (possibly compressed) blob
        // record is accumulated to total_bytes.
        uint64_t adjustment =
            read_options.verify_checksums
                ? BlobLogRecord::CalculateAdjustmentForRecordHeader(
                      req.user_key->size())
                : 0;
        assert(req.offset >= adjustment);
        total_bytes += req.len + adjustment;
        cache_hit_mask |= (Mask{1} << i);  // cache hit
      }
    }

    // All blobs were read from the cache.
    if (cached_blob_count == num_blobs) {
      if (bytes_read) {
        *bytes_read = total_bytes;
      }
      return;
    }
  }

  const bool no_io = read_options.read_tier == kBlockCacheTier;
  if (no_io) {
    for (size_t i = 0; i < num_blobs; ++i) {
      if (!(cache_hit_mask & (Mask{1} << i))) {
        BlobReadRequest& req = blob_reqs[i];

        assert(req.status);
        *req.status =
            Status::Incomplete("Cannot read blob(s): no disk I/O allowed");
      }
    }
    return;
  }

  {
    // Find the rest of blobs from the file since I/O is allowed.
    autovector<std::pair<BlobReadRequest*, std::unique_ptr<BlobContents>>>
        _blob_reqs;
    uint64_t _bytes_read = 0;

    for (size_t i = 0; i < num_blobs; ++i) {
      if (!(cache_hit_mask & (Mask{1} << i))) {
        _blob_reqs.emplace_back(&blob_reqs[i], std::unique_ptr<BlobContents>());
      }
    }

    CacheHandleGuard<BlobFileReader> blob_file_reader;
    Status s = blob_file_cache_->GetBlobFileReader(read_options, file_number,
                                                   &blob_file_reader);
    if (!s.ok()) {
      for (size_t i = 0; i < _blob_reqs.size(); ++i) {
        BlobReadRequest* const req = _blob_reqs[i].first;

        assert(req);
        assert(req->status);

        *req->status = s;
      }
      return;
    }

    assert(blob_file_reader.GetValue());

    MemoryAllocator* const allocator =
        (blob_cache_ && read_options.fill_cache)
            ? blob_cache_.get()->memory_allocator()
            : nullptr;

    blob_file_reader.GetValue()->MultiGetBlob(read_options, allocator,
                                              _blob_reqs, &_bytes_read);

    if (blob_cache_ && read_options.fill_cache) {
      // If filling cache is allowed and a cache is configured, try to put
      // the blob(s) to the cache.
      for (auto& [req, blob_contents] : _blob_reqs) {
        assert(req);

        if (req->status->ok()) {
          CacheHandleGuard<BlobContents> blob_handle;
          const CacheKey cache_key = base_cache_key.WithOffset(req->offset);
          const Slice key = cache_key.AsSlice();
          s = PutBlobIntoCache(key, &blob_contents, &blob_handle);
          if (!s.ok()) {
            *req->status = s;
          } else {
            PinCachedBlob(&blob_handle, req->result);
          }
        }
      }
    } else {
      for (auto& [req, blob_contents] : _blob_reqs) {
        assert(req);

        if (req->status->ok()) {
          PinOwnedBlob(&blob_contents, req->result);
        }
      }
    }

    total_bytes += _bytes_read;
    if (bytes_read) {
      *bytes_read = total_bytes;
    }
  }
}

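// Test-only helper: returns true if the blob identified by (file_number,
// file_size, offset) is present in the blob cache, optionally reporting the
// cache charge of the entry via *charge.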
bool BlobSource::TEST_BlobInCache(uint64_t file_number, uint64_t file_size,
                                  uint64_t offset, size_t* charge) const {
  const CacheKey cache_key = GetCacheKey(file_number, file_size, offset);
  const Slice key = cache_key.AsSlice();

  CacheHandleGuard<BlobContents> blob_handle;
  const Status s = GetBlobFromCache(key, &blob_handle);

  if (s.ok() && blob_handle.GetValue() != nullptr) {
    if (charge) {
      const Cache* const cache = blob_handle.GetCache();
      assert(cache);

      Cache::Handle* const handle = blob_handle.GetCacheHandle();
      assert(handle);

      *charge = cache->GetUsage(handle);
    }

    return true;
  }

  return false;
}

}  // namespace ROCKSDB_NAMESPACE