sharded_cache.h

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#pragma once

#include <algorithm>   // std::max
#include <atomic>
#include <cstdint>
#include <functional>  // std::function
#include <memory>      // std::unique_ptr
#include <string>

#include "port/lang.h"
#include "port/port.h"
#include "rocksdb/advanced_cache.h"
#include "util/hash.h"
#include "util/mutexlock.h"

namespace ROCKSDB_NAMESPACE {
// Optional base class for classes implementing the CacheShard concept
class CacheShardBase {
 public:
  explicit CacheShardBase(CacheMetadataChargePolicy metadata_charge_policy)
      : metadata_charge_policy_(metadata_charge_policy) {}

  using DeleterFn = Cache::DeleterFn;

  // Expected by concept CacheShard (TODO with C++20 support)
  // Some defaults
  std::string GetPrintableOptions() const { return ""; }

  using HashVal = uint64_t;
  using HashCref = uint64_t;
  static inline HashVal ComputeHash(const Slice& key, uint32_t seed) {
    return GetSliceNPHash64(key, seed);
  }
  static inline uint32_t HashPieceForSharding(HashCref hash) {
    return Lower32of64(hash);
  }

  void AppendPrintableOptions(std::string& /*str*/) const {}

  // Must be provided for concept CacheShard (TODO with C++20 support)
  /*
  struct HandleImpl {  // for concept HandleImpl
    HashVal hash;
    HashCref GetHash() const;
    ...
  };
  Status Insert(const Slice& key, HashCref hash, Cache::ObjectPtr value,
                const Cache::CacheItemHelper* helper, size_t charge,
                HandleImpl** handle, Cache::Priority priority) = 0;
  HandleImpl* CreateStandalone(const Slice& key, HashCref hash, ObjectPtr obj,
                               const CacheItemHelper* helper, size_t charge,
                               bool allow_uncharged) = 0;
  HandleImpl* Lookup(const Slice& key, HashCref hash,
                     const Cache::CacheItemHelper* helper,
                     Cache::CreateContext* create_context,
                     Cache::Priority priority, Statistics* stats) = 0;
  bool Release(HandleImpl* handle, bool useful, bool erase_if_last_ref) = 0;
  bool Ref(HandleImpl* handle) = 0;
  void Erase(const Slice& key, HashCref hash) = 0;
  void SetCapacity(size_t capacity) = 0;
  void SetStrictCapacityLimit(bool strict_capacity_limit) = 0;
  size_t GetUsage() const = 0;
  size_t GetPinnedUsage() const = 0;
  size_t GetOccupancyCount() const = 0;
  size_t GetTableAddressCount() const = 0;
  // Handles iterating over roughly `average_entries_per_lock` entries, using
  // `state` to somehow record where it last ended up. Caller initially uses
  // *state == 0 and implementation sets *state = SIZE_MAX to indicate
  // completion.
  void ApplyToSomeEntries(
      const std::function<void(const Slice& key, ObjectPtr value,
                               size_t charge,
                               const Cache::CacheItemHelper* helper)>& callback,
      size_t average_entries_per_lock, size_t* state) = 0;
  void EraseUnRefEntries() = 0;
  */

 protected:
  const CacheMetadataChargePolicy metadata_charge_policy_;
};
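
// Illustrative sketch (not part of this header): the rough shape of a class
// satisfying the CacheShard concept above. `ToyShard` and its members are
// hypothetical; see the concrete shard classes in lru_cache.h and
// clock_cache.h for real implementations.
/*
class ToyShard : public CacheShardBase {
 public:
  explicit ToyShard(CacheMetadataChargePolicy policy)
      : CacheShardBase(policy) {}
  struct HandleImpl {
    HashVal hash;
    HashCref GetHash() const { return hash; }
    // ... value, charge, helper, reference count, etc.
  };
  Status Insert(const Slice& key, HashCref hash, Cache::ObjectPtr value,
                const Cache::CacheItemHelper* helper, size_t charge,
                HandleImpl** handle, Cache::Priority priority) {
    // Each shard guards its own table with its own mutex, so operations
    // on different shards never contend with each other.
    MutexLock l(&mutex_);
    // ... insert into this shard's table, evicting to respect capacity ...
    return Status::OK();
  }
  // ... remaining functions from the concept comment above ...
 private:
  mutable port::Mutex mutex_;
};
*/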
// Portions of ShardedCache that do not depend on the template parameter
class ShardedCacheBase : public Cache {
 public:
  explicit ShardedCacheBase(const ShardedCacheOptions& opts);
  virtual ~ShardedCacheBase() = default;

  int GetNumShardBits() const;
  uint32_t GetNumShards() const;

  uint64_t NewId() override;

  bool HasStrictCapacityLimit() const override;
  size_t GetCapacity() const override;
  Status GetSecondaryCacheCapacity(size_t& size) const override;
  Status GetSecondaryCachePinnedUsage(size_t& size) const override;

  using Cache::GetUsage;
  size_t GetUsage(Handle* handle) const override;
  std::string GetPrintableOptions() const override;

  uint32_t GetHashSeed() const override { return hash_seed_; }

 protected:  // fns
  virtual void AppendPrintableOptions(std::string& str) const = 0;
  size_t GetPerShardCapacity() const;
  size_t ComputePerShardCapacity(size_t capacity) const;

 protected:  // data
  std::atomic<uint64_t> last_id_;  // For NewId
  const uint32_t shard_mask_;
  const uint32_t hash_seed_;
  // Dynamic configuration parameters, guarded by config_mutex_
  bool strict_capacity_limit_;
  size_t capacity_;
  mutable port::Mutex config_mutex_;
};
// Generic cache interface that shards cache by hash of keys. 2^num_shard_bits
// shards will be created, with capacity split evenly to each of the shards.
// Keys are typically sharded by the lowest num_shard_bits bits of hash value
// so that the upper bits of the hash value can keep a stable ordering of
// table entries even as the table grows (using more upper hash bits).
// See CacheShardBase above for what is expected of the CacheShard parameter.
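// For example, with num_shard_bits == 4 there are 16 shards and
// shard_mask_ == 0xF, so a key whose hash has low 32 bits 0x...AB25 lands
// in shard 0xAB25 & 0xF == 0x5, regardless of its upper hash bits.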
template <class CacheShard>
class ShardedCache : public ShardedCacheBase {
 public:
  using HashVal = typename CacheShard::HashVal;
  using HashCref = typename CacheShard::HashCref;
  using HandleImpl = typename CacheShard::HandleImpl;

  explicit ShardedCache(const ShardedCacheOptions& opts)
      : ShardedCacheBase(opts),
        shards_(static_cast<CacheShard*>(port::cacheline_aligned_alloc(
            sizeof(CacheShard) * GetNumShards()))),
        destroy_shards_in_dtor_(false) {}

  virtual ~ShardedCache() {
    if (destroy_shards_in_dtor_) {
      ForEachShard([](CacheShard* cs) { cs->~CacheShard(); });
    }
    port::cacheline_aligned_free(shards_);
  }

  CacheShard& GetShard(HashCref hash) {
    return shards_[CacheShard::HashPieceForSharding(hash) & shard_mask_];
  }
  const CacheShard& GetShard(HashCref hash) const {
    return shards_[CacheShard::HashPieceForSharding(hash) & shard_mask_];
  }

  void SetCapacity(size_t capacity) override {
    MutexLock l(&config_mutex_);
    capacity_ = capacity;
    auto per_shard = ComputePerShardCapacity(capacity);
    ForEachShard([=](CacheShard* cs) { cs->SetCapacity(per_shard); });
  }

  void SetStrictCapacityLimit(bool s_c_l) override {
    MutexLock l(&config_mutex_);
    strict_capacity_limit_ = s_c_l;
    ForEachShard(
        [s_c_l](CacheShard* cs) { cs->SetStrictCapacityLimit(s_c_l); });
  }

  Status Insert(
      const Slice& key, ObjectPtr obj, const CacheItemHelper* helper,
      size_t charge, Handle** handle = nullptr,
      Priority priority = Priority::LOW,
      const Slice& /*compressed_value*/ = Slice(),
      CompressionType /*type*/ = CompressionType::kNoCompression) override {
    assert(helper);
    HashVal hash = CacheShard::ComputeHash(key, hash_seed_);
    auto h_out = reinterpret_cast<HandleImpl**>(handle);
    return GetShard(hash).Insert(key, hash, obj, helper, charge, h_out,
                                 priority);
  }

  Handle* CreateStandalone(const Slice& key, ObjectPtr obj,
                           const CacheItemHelper* helper, size_t charge,
                           bool allow_uncharged) override {
    assert(helper);
    HashVal hash = CacheShard::ComputeHash(key, hash_seed_);
    HandleImpl* result = GetShard(hash).CreateStandalone(
        key, hash, obj, helper, charge, allow_uncharged);
    return static_cast<Handle*>(result);
  }

  Handle* Lookup(const Slice& key, const CacheItemHelper* helper = nullptr,
                 CreateContext* create_context = nullptr,
                 Priority priority = Priority::LOW,
                 Statistics* stats = nullptr) override {
    HashVal hash = CacheShard::ComputeHash(key, hash_seed_);
    HandleImpl* result = GetShard(hash).Lookup(key, hash, helper,
                                               create_context, priority,
                                               stats);
    return static_cast<Handle*>(result);
  }

  void Erase(const Slice& key) override {
    HashVal hash = CacheShard::ComputeHash(key, hash_seed_);
    GetShard(hash).Erase(key, hash);
  }

  bool Release(Handle* handle, bool useful,
               bool erase_if_last_ref = false) override {
    auto h = static_cast<HandleImpl*>(handle);
    return GetShard(h->GetHash()).Release(h, useful, erase_if_last_ref);
  }
  bool Ref(Handle* handle) override {
    auto h = static_cast<HandleImpl*>(handle);
    return GetShard(h->GetHash()).Ref(h);
  }
  bool Release(Handle* handle, bool erase_if_last_ref = false) override {
    return Release(handle, true /*useful*/, erase_if_last_ref);
  }

  using ShardedCacheBase::GetUsage;
  size_t GetUsage() const override {
    return SumOverShards2(&CacheShard::GetUsage);
  }
  size_t GetPinnedUsage() const override {
    return SumOverShards2(&CacheShard::GetPinnedUsage);
  }
  size_t GetOccupancyCount() const override {
    return SumOverShards2(&CacheShard::GetOccupancyCount);
  }
  size_t GetTableAddressCount() const override {
    return SumOverShards2(&CacheShard::GetTableAddressCount);
  }
  void ApplyToAllEntries(
      const std::function<void(const Slice& key, ObjectPtr value,
                               size_t charge,
                               const CacheItemHelper* helper)>& callback,
      const ApplyToAllEntriesOptions& opts) override {
    uint32_t num_shards = GetNumShards();
    // Iterate over part of each shard, rotating between shards, to
    // minimize impact on latency of concurrent operations.
    std::unique_ptr<size_t[]> states(new size_t[num_shards]{});  // zeroed
    size_t aepl = opts.average_entries_per_lock;
    // Enforce a lower bound of one entry per lock acquisition (std::max,
    // not std::min, which would cap every batch at a single entry).
    aepl = std::max(aepl, size_t{1});
    bool remaining_work;
    do {
      remaining_work = false;
      for (uint32_t i = 0; i < num_shards; i++) {
        if (states[i] != SIZE_MAX) {
          shards_[i].ApplyToSomeEntries(callback, aepl, &states[i]);
          remaining_work |= states[i] != SIZE_MAX;
        }
      }
    } while (remaining_work);
  }
  void EraseUnRefEntries() override {
    ForEachShard([](CacheShard* cs) { cs->EraseUnRefEntries(); });
  }

  void DisownData() override {
    // Leak data only if that won't generate an ASAN/valgrind warning.
    if (!kMustFreeHeapAllocations) {
      destroy_shards_in_dtor_ = false;
    }
  }

 protected:
  inline void ForEachShard(const std::function<void(CacheShard*)>& fn) {
    uint32_t num_shards = GetNumShards();
    for (uint32_t i = 0; i < num_shards; i++) {
      fn(shards_ + i);
    }
  }
  inline void ForEachShard(
      const std::function<void(const CacheShard*)>& fn) const {
    uint32_t num_shards = GetNumShards();
    for (uint32_t i = 0; i < num_shards; i++) {
      fn(shards_ + i);
    }
  }
  inline size_t SumOverShards(
      const std::function<size_t(CacheShard&)>& fn) const {
    uint32_t num_shards = GetNumShards();
    size_t result = 0;
    for (uint32_t i = 0; i < num_shards; i++) {
      result += fn(shards_[i]);
    }
    return result;
  }
  inline size_t SumOverShards2(size_t (CacheShard::*fn)() const) const {
    return SumOverShards([fn](CacheShard& cs) { return (cs.*fn)(); });
  }

  // Must be called exactly once by derived class constructor
  void InitShards(const std::function<void(CacheShard*)>& placement_new) {
    ForEachShard(placement_new);
    destroy_shards_in_dtor_ = true;
  }

  void AppendPrintableOptions(std::string& str) const override {
    shards_[0].AppendPrintableOptions(str);
  }

 private:
  CacheShard* const shards_;
  bool destroy_shards_in_dtor_;
};
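
// Illustrative sketch (not part of this header): how a derived cache is
// expected to use InitShards(). The constructor placement-constructs one
// CacheShard into each slot of the preallocated, cacheline-aligned shards_
// array. `MyShard` and its constructor arguments are hypothetical; compare
// the concrete caches in lru_cache.h for a real example.
/*
class MyCache : public ShardedCache<MyShard> {
 public:
  explicit MyCache(const ShardedCacheOptions& opts) : ShardedCache(opts) {
    size_t per_shard = GetPerShardCapacity();
    InitShards([&](MyShard* cs) {
      new (cs) MyShard(per_shard, opts.strict_capacity_limit,
                       opts.metadata_charge_policy);
    });
  }
};
*/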
// 512KB is traditional minimum shard size.
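// For example, with the default min_shard_size, a 16 MiB capacity allows
// 16 MiB / 512 KiB = 32 shards, i.e. 5 shard bits. The implementation has
// historically also capped the result at 6 bits (64 shards).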
int GetDefaultCacheShardBits(size_t capacity,
                             size_t min_shard_size = 512U * 1024U);

}  // namespace ROCKSDB_NAMESPACE