| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156 |
- // Copyright (c) 2013, Facebook, Inc. All rights reserved.
- // This source code is licensed under both the GPLv2 (found in the
- // COPYING file in the root directory) and Apache 2.0 License
- // (found in the LICENSE.Apache file in the root directory).
- #pragma once
- #ifndef ROCKSDB_LITE
- #ifndef OS_WIN
- #include <unistd.h>
- #endif // ! OS_WIN
- #include <atomic>
- #include <list>
- #include <memory>
- #include <set>
- #include <sstream>
- #include <stdexcept>
- #include <string>
- #include <thread>
- #include "rocksdb/cache.h"
- #include "rocksdb/comparator.h"
- #include "rocksdb/persistent_cache.h"
- #include "utilities/persistent_cache/block_cache_tier_file.h"
- #include "utilities/persistent_cache/block_cache_tier_metadata.h"
- #include "utilities/persistent_cache/persistent_cache_util.h"
- #include "memory/arena.h"
- #include "memtable/skiplist.h"
- #include "monitoring/histogram.h"
- #include "port/port.h"
- #include "util/coding.h"
- #include "util/crc32c.h"
- #include "util/mutexlock.h"
- namespace ROCKSDB_NAMESPACE {
- //
- // Block cache tier implementation
- //
- class BlockCacheTier : public PersistentCacheTier {
- public:
- explicit BlockCacheTier(const PersistentCacheConfig& opt)
- : opt_(opt),
- insert_ops_(static_cast<size_t>(opt_.max_write_pipeline_backlog_size)),
- buffer_allocator_(opt.write_buffer_size, opt.write_buffer_count()),
- writer_(this, opt_.writer_qdepth, static_cast<size_t>(opt_.writer_dispatch_size)) {
- Info(opt_.log, "Initializing allocator. size=%d B count=%" ROCKSDB_PRIszt,
- opt_.write_buffer_size, opt_.write_buffer_count());
- }
- virtual ~BlockCacheTier() {
- // Close is re-entrant so we can call close even if it is already closed
- Close();
- assert(!insert_th_.joinable());
- }
- Status Insert(const Slice& key, const char* data, const size_t size) override;
- Status Lookup(const Slice& key, std::unique_ptr<char[]>* data,
- size_t* size) override;
- Status Open() override;
- Status Close() override;
- bool Erase(const Slice& key) override;
- bool Reserve(const size_t size) override;
- bool IsCompressed() override { return opt_.is_compressed; }
- std::string GetPrintableOptions() const override { return opt_.ToString(); }
- PersistentCache::StatsType Stats() override;
- void TEST_Flush() override {
- while (insert_ops_.Size()) {
- /* sleep override */
- Env::Default()->SleepForMicroseconds(1000000);
- }
- }
- private:
- // Percentage of cache to be evicted when the cache is full
- static const size_t kEvictPct = 10;
- // Max attempts to insert key, value to cache in pipelined mode
- static const size_t kMaxRetry = 3;
- // Pipelined operation
- struct InsertOp {
- explicit InsertOp(const bool signal) : signal_(signal) {}
- explicit InsertOp(std::string&& key, const std::string& data)
- : key_(std::move(key)), data_(data) {}
- ~InsertOp() {}
- InsertOp() = delete;
- InsertOp(InsertOp&& /*rhs*/) = default;
- InsertOp& operator=(InsertOp&& rhs) = default;
- // used for estimating size by bounded queue
- size_t Size() { return data_.size() + key_.size(); }
- std::string key_;
- std::string data_;
- bool signal_ = false; // signal to request processing thread to exit
- };
- // entry point for insert thread
- void InsertMain();
- // insert implementation
- Status InsertImpl(const Slice& key, const Slice& data);
- // Create a new cache file
- Status NewCacheFile();
- // Get cache directory path
- std::string GetCachePath() const { return opt_.path + "/cache"; }
- // Cleanup folder
- Status CleanupCacheFolder(const std::string& folder);
- // Statistics
- struct Statistics {
- HistogramImpl bytes_pipelined_;
- HistogramImpl bytes_written_;
- HistogramImpl bytes_read_;
- HistogramImpl read_hit_latency_;
- HistogramImpl read_miss_latency_;
- HistogramImpl write_latency_;
- std::atomic<uint64_t> cache_hits_{0};
- std::atomic<uint64_t> cache_misses_{0};
- std::atomic<uint64_t> cache_errors_{0};
- std::atomic<uint64_t> insert_dropped_{0};
- double CacheHitPct() const {
- const auto lookups = cache_hits_ + cache_misses_;
- return lookups ? 100 * cache_hits_ / static_cast<double>(lookups) : 0.0;
- }
- double CacheMissPct() const {
- const auto lookups = cache_hits_ + cache_misses_;
- return lookups ? 100 * cache_misses_ / static_cast<double>(lookups) : 0.0;
- }
- };
- port::RWMutex lock_; // Synchronization
- const PersistentCacheConfig opt_; // BlockCache options
- BoundedQueue<InsertOp> insert_ops_; // Ops waiting for insert
- ROCKSDB_NAMESPACE::port::Thread insert_th_; // Insert thread
- uint32_t writer_cache_id_ = 0; // Current cache file identifier
- WriteableCacheFile* cache_file_ = nullptr; // Current cache file reference
- CacheWriteBufferAllocator buffer_allocator_; // Buffer provider
- ThreadedWriter writer_; // Writer threads
- BlockCacheTierMetadata metadata_; // Cache meta data manager
- std::atomic<uint64_t> size_{0}; // Size of the cache
- Statistics stats_; // Statistics
- };
- } // namespace ROCKSDB_NAMESPACE
- #endif
|