| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375 |
- // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
- // This source code is licensed under both the GPLv2 (found in the
- // COPYING file in the root directory) and Apache 2.0 License
- // (found in the LICENSE.Apache file in the root directory).
- #include "rocksdb/utilities/sim_cache.h"
- #include <atomic>
- #include <iomanip>
- #include "file/writable_file_writer.h"
- #include "monitoring/statistics_impl.h"
- #include "port/port.h"
- #include "rocksdb/env.h"
- #include "rocksdb/file_system.h"
- #include "util/mutexlock.h"
- namespace ROCKSDB_NAMESPACE {
- namespace {
- class CacheActivityLogger {
- public:
- CacheActivityLogger()
- : activity_logging_enabled_(false), max_logging_size_(0) {}
- ~CacheActivityLogger() {
- MutexLock l(&mutex_);
- StopLoggingInternal();
- bg_status_.PermitUncheckedError();
- }
- Status StartLogging(const std::string& activity_log_file, Env* env,
- uint64_t max_logging_size = 0) {
- assert(activity_log_file != "");
- assert(env != nullptr);
- Status status;
- FileOptions file_opts;
- MutexLock l(&mutex_);
- // Stop existing logging if any
- StopLoggingInternal();
- // Open log file
- status = WritableFileWriter::Create(env->GetFileSystem(), activity_log_file,
- file_opts, &file_writer_, nullptr);
- if (!status.ok()) {
- return status;
- }
- max_logging_size_ = max_logging_size;
- activity_logging_enabled_.store(true);
- return status;
- }
- void StopLogging() {
- MutexLock l(&mutex_);
- StopLoggingInternal();
- }
- void ReportLookup(const Slice& key) {
- if (activity_logging_enabled_.load() == false) {
- return;
- }
- std::ostringstream oss;
- // line format: "LOOKUP - <KEY>"
- oss << "LOOKUP - " << key.ToString(true) << std::endl;
- MutexLock l(&mutex_);
- Status s = file_writer_->Append(IOOptions(), oss.str());
- if (!s.ok() && bg_status_.ok()) {
- bg_status_ = s;
- }
- if (MaxLoggingSizeReached() || !bg_status_.ok()) {
- // Stop logging if we have reached the max file size or
- // encountered an error
- StopLoggingInternal();
- }
- }
- void ReportAdd(const Slice& key, size_t size) {
- if (activity_logging_enabled_.load() == false) {
- return;
- }
- std::ostringstream oss;
- // line format: "ADD - <KEY> - <KEY-SIZE>"
- oss << "ADD - " << key.ToString(true) << " - " << size << std::endl;
- MutexLock l(&mutex_);
- Status s = file_writer_->Append(IOOptions(), oss.str());
- if (!s.ok() && bg_status_.ok()) {
- bg_status_ = s;
- }
- if (MaxLoggingSizeReached() || !bg_status_.ok()) {
- // Stop logging if we have reached the max file size or
- // encountered an error
- StopLoggingInternal();
- }
- }
- Status& bg_status() {
- MutexLock l(&mutex_);
- return bg_status_;
- }
- private:
- bool MaxLoggingSizeReached() {
- mutex_.AssertHeld();
- return (max_logging_size_ > 0 &&
- file_writer_->GetFileSize() >= max_logging_size_);
- }
- void StopLoggingInternal() {
- mutex_.AssertHeld();
- if (!activity_logging_enabled_) {
- return;
- }
- activity_logging_enabled_.store(false);
- Status s = file_writer_->Close(IOOptions());
- if (!s.ok() && bg_status_.ok()) {
- bg_status_ = s;
- }
- }
- // Mutex to sync writes to file_writer, and all following
- // class data members
- port::Mutex mutex_;
- // Indicates if logging is currently enabled
- // atomic to allow reads without mutex
- std::atomic<bool> activity_logging_enabled_;
- // When reached, we will stop logging and close the file
- // Value of 0 means unlimited
- uint64_t max_logging_size_;
- std::unique_ptr<WritableFileWriter> file_writer_;
- Status bg_status_;
- };
- // SimCacheImpl definition
- class SimCacheImpl : public SimCache {
- public:
- // capacity for real cache (ShardedLRUCache)
- // test_capacity for key only cache
- SimCacheImpl(std::shared_ptr<Cache> sim_cache, std::shared_ptr<Cache> cache)
- : SimCache(cache),
- key_only_cache_(sim_cache),
- miss_times_(0),
- hit_times_(0),
- stats_(nullptr) {}
- ~SimCacheImpl() override = default;
- const char* Name() const override { return "SimCache"; }
- void SetCapacity(size_t capacity) override { target_->SetCapacity(capacity); }
- void SetStrictCapacityLimit(bool strict_capacity_limit) override {
- target_->SetStrictCapacityLimit(strict_capacity_limit);
- }
- Status Insert(const Slice& key, Cache::ObjectPtr value,
- const CacheItemHelper* helper, size_t charge, Handle** handle,
- Priority priority, const Slice& compressed = {},
- CompressionType type = kNoCompression) override {
- // The handle and value passed in are for real cache, so we pass nullptr
- // to key_only_cache_ for both instead. Also, the deleter function pointer
- // will be called by user to perform some external operation which should
- // be applied only once. Thus key_only_cache accepts an empty function.
- // *Lambda function without capture can be assgined to a function pointer
- Handle* h = key_only_cache_->Lookup(key);
- if (h == nullptr) {
- // TODO: Check for error here?
- auto s =
- key_only_cache_->Insert(key, nullptr, &kNoopCacheItemHelper, charge,
- nullptr, priority, compressed, type);
- s.PermitUncheckedError();
- } else {
- key_only_cache_->Release(h);
- }
- cache_activity_logger_.ReportAdd(key, charge);
- if (!target_) {
- return Status::OK();
- }
- return target_->Insert(key, value, helper, charge, handle, priority,
- compressed, type);
- }
- Handle* Lookup(const Slice& key, const CacheItemHelper* helper,
- CreateContext* create_context,
- Priority priority = Priority::LOW,
- Statistics* stats = nullptr) override {
- HandleLookup(key, stats);
- if (!target_) {
- return nullptr;
- }
- return target_->Lookup(key, helper, create_context, priority, stats);
- }
- void StartAsyncLookup(AsyncLookupHandle& async_handle) override {
- HandleLookup(async_handle.key, async_handle.stats);
- if (target_) {
- target_->StartAsyncLookup(async_handle);
- }
- }
- bool Ref(Handle* handle) override { return target_->Ref(handle); }
- using Cache::Release;
- bool Release(Handle* handle, bool erase_if_last_ref = false) override {
- return target_->Release(handle, erase_if_last_ref);
- }
- void Erase(const Slice& key) override {
- target_->Erase(key);
- key_only_cache_->Erase(key);
- }
- Cache::ObjectPtr Value(Handle* handle) override {
- return target_->Value(handle);
- }
- uint64_t NewId() override { return target_->NewId(); }
- size_t GetCapacity() const override { return target_->GetCapacity(); }
- bool HasStrictCapacityLimit() const override {
- return target_->HasStrictCapacityLimit();
- }
- size_t GetUsage() const override { return target_->GetUsage(); }
- size_t GetUsage(Handle* handle) const override {
- return target_->GetUsage(handle);
- }
- size_t GetCharge(Handle* handle) const override {
- return target_->GetCharge(handle);
- }
- const CacheItemHelper* GetCacheItemHelper(Handle* handle) const override {
- return target_->GetCacheItemHelper(handle);
- }
- size_t GetPinnedUsage() const override { return target_->GetPinnedUsage(); }
- void DisownData() override {
- target_->DisownData();
- key_only_cache_->DisownData();
- }
- void ApplyToAllEntries(
- const std::function<void(const Slice& key, ObjectPtr value, size_t charge,
- const CacheItemHelper* helper)>& callback,
- const ApplyToAllEntriesOptions& opts) override {
- target_->ApplyToAllEntries(callback, opts);
- }
- void EraseUnRefEntries() override {
- target_->EraseUnRefEntries();
- key_only_cache_->EraseUnRefEntries();
- }
- size_t GetSimCapacity() const override {
- return key_only_cache_->GetCapacity();
- }
- size_t GetSimUsage() const override { return key_only_cache_->GetUsage(); }
- void SetSimCapacity(size_t capacity) override {
- key_only_cache_->SetCapacity(capacity);
- }
- uint64_t get_miss_counter() const override {
- return miss_times_.load(std::memory_order_relaxed);
- }
- uint64_t get_hit_counter() const override {
- return hit_times_.load(std::memory_order_relaxed);
- }
- void reset_counter() override {
- miss_times_.store(0, std::memory_order_relaxed);
- hit_times_.store(0, std::memory_order_relaxed);
- SetTickerCount(stats_, SIM_BLOCK_CACHE_HIT, 0);
- SetTickerCount(stats_, SIM_BLOCK_CACHE_MISS, 0);
- }
- std::string ToString() const override {
- std::ostringstream oss;
- oss << "SimCache MISSes: " << get_miss_counter() << std::endl;
- oss << "SimCache HITs: " << get_hit_counter() << std::endl;
- auto lookups = get_miss_counter() + get_hit_counter();
- oss << "SimCache HITRATE: " << std::fixed << std::setprecision(2)
- << (lookups == 0 ? 0 : get_hit_counter() * 100.0f / lookups)
- << std::endl;
- return oss.str();
- }
- std::string GetPrintableOptions() const override {
- std::ostringstream oss;
- oss << " cache_options:" << std::endl;
- oss << target_->GetPrintableOptions();
- oss << " sim_cache_options:" << std::endl;
- oss << key_only_cache_->GetPrintableOptions();
- return oss.str();
- }
- Status StartActivityLogging(const std::string& activity_log_file, Env* env,
- uint64_t max_logging_size = 0) override {
- return cache_activity_logger_.StartLogging(activity_log_file, env,
- max_logging_size);
- }
- void StopActivityLogging() override { cache_activity_logger_.StopLogging(); }
- Status GetActivityLoggingStatus() override {
- return cache_activity_logger_.bg_status();
- }
- private:
- std::shared_ptr<Cache> key_only_cache_;
- std::atomic<uint64_t> miss_times_;
- std::atomic<uint64_t> hit_times_;
- Statistics* stats_;
- CacheActivityLogger cache_activity_logger_;
- void inc_miss_counter() {
- miss_times_.fetch_add(1, std::memory_order_relaxed);
- }
- void inc_hit_counter() { hit_times_.fetch_add(1, std::memory_order_relaxed); }
- void HandleLookup(const Slice& key, Statistics* stats) {
- Handle* h = key_only_cache_->Lookup(key);
- if (h != nullptr) {
- key_only_cache_->Release(h);
- inc_hit_counter();
- RecordTick(stats, SIM_BLOCK_CACHE_HIT);
- } else {
- inc_miss_counter();
- RecordTick(stats, SIM_BLOCK_CACHE_MISS);
- }
- cache_activity_logger_.ReportLookup(key);
- }
- };
- } // end anonymous namespace
- // For instrumentation purpose, use NewSimCache instead
- std::shared_ptr<SimCache> NewSimCache(std::shared_ptr<Cache> cache,
- size_t sim_capacity, int num_shard_bits) {
- LRUCacheOptions co;
- co.capacity = sim_capacity;
- co.num_shard_bits = num_shard_bits;
- co.metadata_charge_policy = kDontChargeCacheMetadata;
- return NewSimCache(NewLRUCache(co), cache, num_shard_bits);
- }
- std::shared_ptr<SimCache> NewSimCache(std::shared_ptr<Cache> sim_cache,
- std::shared_ptr<Cache> cache,
- int num_shard_bits) {
- if (num_shard_bits >= 20) {
- return nullptr; // the cache cannot be sharded into too many fine pieces
- }
- return std::make_shared<SimCacheImpl>(sim_cache, cache);
- }
- } // namespace ROCKSDB_NAMESPACE
|