// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#pragma once
#include <atomic>
#include <cassert>
#include <memory>
#include <utility>

#include "memory/allocator.h"
#include "memory/arena.h"
#include "port/likely.h"
#include "util/core_local.h"
#include "util/mutexlock.h"
#include "util/thread_local.h"

// Only mark the padding arrays as unused under Clang; applying the attribute
// under GCC 4.8.1 makes the build fail.
#ifdef __clang__
#define ROCKSDB_FIELD_UNUSED __attribute__((__unused__))
#else
#define ROCKSDB_FIELD_UNUSED
#endif  // __clang__

namespace ROCKSDB_NAMESPACE {

class Logger;

// ConcurrentArena wraps an Arena. It makes the wrapped arena thread safe
// using a fast inlined spinlock, and adds small per-core allocation caches
// to avoid contention for small allocations. To avoid any memory waste from
// the per-core shards, they are kept small, they are lazily instantiated
// only if ConcurrentArena actually notices concurrent use, and they adjust
// their size so that there is no fragmentation waste when the shard blocks
// are allocated from the underlying main arena.
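//
// A minimal usage sketch (illustrative only; the block size and allocation
// sizes below are placeholders, not recommended values):
//
//   ConcurrentArena arena(256 << 10 /* block_size */);
//   char* p = arena.Allocate(64);          // safe to call from many threads
//   char* q = arena.AllocateAligned(100);  // rounded up to pointer alignment
//   size_t approx = arena.ApproximateMemoryUsage();
//   // All returned memory stays valid until `arena` is destroyed.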
class ConcurrentArena : public Allocator {
 public:
  // block_size and huge_page_size are the same as for Arena (and are in
  // fact just passed to the constructor of arena_). The core-local shards
  // compute their shard_block_size as a fraction of block_size that varies
  // according to the hardware concurrency level.
  explicit ConcurrentArena(size_t block_size = Arena::kMinBlockSize,
                           AllocTracker* tracker = nullptr,
                           size_t huge_page_size = 0);

  char* Allocate(size_t bytes) override {
    return AllocateImpl(bytes, false /*force_arena*/,
                        [=]() { return arena_.Allocate(bytes); });
  }

  char* AllocateAligned(size_t bytes, size_t huge_page_size = 0,
                        Logger* logger = nullptr) override {
    // Round the request up to the next multiple of sizeof(void*).
    size_t rounded_up = ((bytes - 1) | (sizeof(void*) - 1)) + 1;
    assert(rounded_up >= bytes && rounded_up < bytes + sizeof(void*) &&
           (rounded_up % sizeof(void*)) == 0);
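
    // Worked example of the rounding above (assumes 8-byte pointers, which is
    // an illustration rather than a requirement): bytes == 13 gives
    // rounded_up == ((13 - 1) | 7) + 1 == 15 + 1 == 16, the next multiple of
    // sizeof(void*); a request that is already a multiple, e.g. 16, maps to
    // itself.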
    return AllocateImpl(rounded_up, huge_page_size != 0 /*force_arena*/, [=]() {
      return arena_.AllocateAligned(rounded_up, huge_page_size, logger);
    });
  }

  size_t ApproximateMemoryUsage() const {
    std::unique_lock<SpinMutex> lock(arena_mutex_, std::defer_lock);
    lock.lock();
    return arena_.ApproximateMemoryUsage() - ShardAllocatedAndUnused();
  }

  size_t MemoryAllocatedBytes() const {
    return memory_allocated_bytes_.load(std::memory_order_relaxed);
  }

  size_t AllocatedAndUnused() const {
    return arena_allocated_and_unused_.load(std::memory_order_relaxed) +
           ShardAllocatedAndUnused();
  }

  size_t IrregularBlockNum() const {
    return irregular_block_num_.load(std::memory_order_relaxed);
  }

  size_t BlockSize() const override { return arena_.BlockSize(); }
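
  // Reading the statistics above needs no synchronization beyond the methods
  // themselves, but the counters are maintained with relaxed atomics and may
  // lag slightly behind concurrent allocations. A small sketch (illustrative
  // sizes only):
  //
  //   ConcurrentArena arena;
  //   arena.Allocate(100);
  //   size_t allocated = arena.MemoryAllocatedBytes();  // bytes the arena holds
  //   size_t unused = arena.AllocatedAndUnused();       // held, not handed out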

 private:
  struct Shard {
    // Padding so that each Shard occupies its own cache lines and the shards
    // do not false-share with one another.
    char padding[40] ROCKSDB_FIELD_UNUSED;
    mutable SpinMutex mutex;
    char* free_begin_;
    std::atomic<size_t> allocated_and_unused_;

    Shard() : free_begin_(nullptr), allocated_and_unused_(0) {}
  };

#ifdef ROCKSDB_SUPPORT_THREAD_LOCAL
  // Per-thread shard hint. Zero until the calling thread first needs
  // Repick(); after that it encodes the thread's preferred core/shard.
  static __thread size_t tls_cpuid;
#else
  enum ZeroFirstEnum : size_t { tls_cpuid = 0 };
#endif

  char padding0[56] ROCKSDB_FIELD_UNUSED;

  size_t shard_block_size_;

  CoreLocalArray<Shard> shards_;

  Arena arena_;
  mutable SpinMutex arena_mutex_;
  std::atomic<size_t> arena_allocated_and_unused_;
  std::atomic<size_t> memory_allocated_bytes_;
  std::atomic<size_t> irregular_block_num_;

  char padding1[56] ROCKSDB_FIELD_UNUSED;

  Shard* Repick();

  size_t ShardAllocatedAndUnused() const {
    size_t total = 0;
    for (size_t i = 0; i < shards_.Size(); ++i) {
      total += shards_.AccessAtCore(i)->allocated_and_unused_.load(
          std::memory_order_relaxed);
    }
    return total;
  }
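
  // A summary of how AllocateImpl below routes a request (descriptive only,
  // no behavior beyond the code that follows):
  //   1. Large requests (more than shard_block_size_ / 4), forced-arena
  //      requests, and requests from a thread that has never needed Repick()
  //      while shard 0 is empty and arena_mutex_ is free all go straight to
  //      arena_ under arena_mutex_.
  //   2. Otherwise a core-local shard is chosen (Repick() if its mutex is
  //      contended) and the request is carved out of the shard's cached
  //      block without touching arena_mutex_.
  //   3. If the shard's cache cannot satisfy the request, arena_mutex_ is
  //      taken: either the request is served directly from the arena's
  //      inline block, or the shard is refilled from arena_ and path 2's
  //      carve-out proceeds.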
  template <typename Func>
  char* AllocateImpl(size_t bytes, bool force_arena, const Func& func) {
    size_t cpu;

    // Go directly to the arena if the allocation is too large, or if
    // we've never needed to Repick() and the arena mutex is available
    // with no waiting. This keeps the fragmentation penalty of
    // concurrency zero unless it might actually confer an advantage.
    std::unique_lock<SpinMutex> arena_lock(arena_mutex_, std::defer_lock);
    if (bytes > shard_block_size_ / 4 || force_arena ||
        ((cpu = tls_cpuid) == 0 &&
         !shards_.AccessAtCore(0)->allocated_and_unused_.load(
             std::memory_order_relaxed) &&
         arena_lock.try_lock())) {
      if (!arena_lock.owns_lock()) {
        arena_lock.lock();
      }
      auto rv = func();
      Fixup();
      return rv;
    }

    // Pick a shard from which to allocate.
    Shard* s = shards_.AccessAtCore(cpu & (shards_.Size() - 1));
    if (!s->mutex.try_lock()) {
      s = Repick();
      s->mutex.lock();
    }
    std::unique_lock<SpinMutex> lock(s->mutex, std::adopt_lock);

    size_t avail = s->allocated_and_unused_.load(std::memory_order_relaxed);
    if (avail < bytes) {
      // Reload the shard's cache from the arena.
      std::lock_guard<SpinMutex> reload_lock(arena_mutex_);

      // If the arena's current block is within a factor of 2 of the right
      // size, we adjust our request to avoid arena waste.
      auto exact = arena_allocated_and_unused_.load(std::memory_order_relaxed);
      assert(exact == arena_.AllocatedAndUnused());

      if (exact >= bytes && arena_.IsInInlineBlock()) {
        // If we haven't exhausted the arena's inline block yet, allocate from
        // the arena directly. This ensures that we'll do the first few small
        // allocations without allocating any blocks.
        // In particular this prevents empty memtables from using a
        // disproportionately large amount of memory: a memtable allocates on
        // the order of 1 KB of memory when created; we wouldn't want to
        // allocate a full arena block (typically a few megabytes) for that,
        // especially if there are thousands of empty memtables.
        auto rv = func();
        Fixup();
        return rv;
      }

      avail = exact >= shard_block_size_ / 2 && exact < shard_block_size_ * 2
                  ? exact
                  : shard_block_size_;
      s->free_begin_ = arena_.AllocateAligned(avail);
      Fixup();
    }
    s->allocated_and_unused_.store(avail - bytes, std::memory_order_relaxed);

    char* rv;
    if ((bytes % sizeof(void*)) == 0) {
      // aligned allocation from the beginning
      rv = s->free_begin_;
      s->free_begin_ += bytes;
    } else {
      // unaligned from the end
      rv = s->free_begin_ + avail - bytes;
    }
    return rv;
  }

  // Refresh the cached arena statistics. Callers hold arena_mutex_.
  void Fixup() {
    arena_allocated_and_unused_.store(arena_.AllocatedAndUnused(),
                                      std::memory_order_relaxed);
    memory_allocated_bytes_.store(arena_.MemoryAllocatedBytes(),
                                  std::memory_order_relaxed);
    irregular_block_num_.store(arena_.IrregularBlockNum(),
                               std::memory_order_relaxed);
  }

  ConcurrentArena(const ConcurrentArena&) = delete;
  ConcurrentArena& operator=(const ConcurrentArena&) = delete;
};

}  // namespace ROCKSDB_NAMESPACE