// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#pragma once
#include <atomic>
#include <cassert>
#include <memory>
#include <utility>

#include "memory/allocator.h"
#include "memory/arena.h"
#include "port/lang.h"
#include "port/likely.h"
#include "util/core_local.h"
#include "util/mutexlock.h"
#include "util/thread_local.h"

// Only apply the unused-field attribute to the padding arrays under Clang;
// applying it under GCC 4.8.1 breaks the build.
#ifdef __clang__
#define ROCKSDB_FIELD_UNUSED __attribute__((__unused__))
#else
#define ROCKSDB_FIELD_UNUSED
#endif  // __clang__

namespace ROCKSDB_NAMESPACE {

class Logger;

// ConcurrentArena wraps an Arena. It makes it thread safe using a fast
// inlined spinlock, and adds small per-core allocation caches to avoid
// contention for small allocations. To avoid any memory waste from the
// per-core shards, they are kept small, they are lazily instantiated
// only if ConcurrentArena actually notices concurrent use, and they
// adjust their size so that there is no fragmentation waste when the
// shard blocks are allocated from the underlying main arena.
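//
// A minimal usage sketch (illustrative only; the block size below is an
// arbitrary example, not a RocksDB default):
//
//   ConcurrentArena arena(1 << 20 /* block_size */);
//   char* a = arena.Allocate(100);                       // may hit a shard
//   char* b = arena.AllocateAligned(sizeof(void*) * 8);  // pointer-aligned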
class ConcurrentArena : public Allocator {
 public:
  // block_size and huge_page_size are the same as for Arena (and are
  // in fact just passed to the constructor of arena_). The core-local
  // shards compute their shard_block_size as a fraction of block_size
  // that varies according to the hardware concurrency level.
  explicit ConcurrentArena(size_t block_size = Arena::kMinBlockSize,
                           AllocTracker* tracker = nullptr,
                           size_t huge_page_size = 0);

  char* Allocate(size_t bytes) override {
    return AllocateImpl(bytes, false /*force_arena*/,
                        [this, bytes]() { return arena_.Allocate(bytes); });
  }
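
  // Rounds the request up to a multiple of sizeof(void*) so the returned
  // pointer is pointer-aligned. A non-zero huge_page_size forces the
  // allocation to go to the wrapped arena rather than a per-core shard.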
  char* AllocateAligned(size_t bytes, size_t huge_page_size = 0,
                        Logger* logger = nullptr) override {
    size_t rounded_up = ((bytes - 1) | (sizeof(void*) - 1)) + 1;
    assert(rounded_up >= bytes && rounded_up < bytes + sizeof(void*) &&
           (rounded_up % sizeof(void*)) == 0);

    return AllocateImpl(rounded_up, huge_page_size != 0 /*force_arena*/,
                        [this, rounded_up, huge_page_size, logger]() {
                          return arena_.AllocateAligned(rounded_up,
                                                        huge_page_size, logger);
                        });
  }
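
  // Approximate usage: the wrapped arena's usage minus the bytes the shards
  // have reserved from it but not yet handed out.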
  size_t ApproximateMemoryUsage() const {
    std::unique_lock<SpinMutex> lock(arena_mutex_, std::defer_lock);
    lock.lock();
    return arena_.ApproximateMemoryUsage() - ShardAllocatedAndUnused();
  }
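
  // The counters below are mirrored from the wrapped arena by Fixup() after
  // each arena allocation, so they can be read without taking arena_mutex_.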
  size_t MemoryAllocatedBytes() const {
    return memory_allocated_bytes_.load(std::memory_order_relaxed);
  }

  size_t AllocatedAndUnused() const {
    return arena_allocated_and_unused_.load(std::memory_order_relaxed) +
           ShardAllocatedAndUnused();
  }

  size_t IrregularBlockNum() const {
    return irregular_block_num_.load(std::memory_order_relaxed);
  }

  size_t BlockSize() const override { return arena_.BlockSize(); }

 private:
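  // Per-core allocation cache. The padding keeps each shard's frequently
  // written fields roughly a cache line apart from its neighbors to reduce
  // false sharing; free_begin_ is where the next aligned allocation is carved
  // from the shard's current block, and allocated_and_unused_ is the number
  // of bytes still available in that block.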
  struct Shard {
    char padding[40] ROCKSDB_FIELD_UNUSED;
    mutable SpinMutex mutex;
    char* free_begin_;
    std::atomic<size_t> allocated_and_unused_;

    Shard() : free_begin_(nullptr), allocated_and_unused_(0) {}
  };
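
  // tls_cpuid caches the shard selection made by Repick() for the calling
  // thread; zero means the thread has never needed a shard, which lets
  // AllocateImpl try the uncontended arena fast path. The padding arrays
  // keep the hot atomics below away from unrelated data on the same cache
  // line.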
  static thread_local size_t tls_cpuid;

  char padding0[56] ROCKSDB_FIELD_UNUSED;

  size_t shard_block_size_;

  CoreLocalArray<Shard> shards_;

  Arena arena_;
  mutable SpinMutex arena_mutex_;
  std::atomic<size_t> arena_allocated_and_unused_;
  std::atomic<size_t> memory_allocated_bytes_;
  std::atomic<size_t> irregular_block_num_;

  char padding1[56] ROCKSDB_FIELD_UNUSED;
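
  // Lazily picks a per-core shard for the calling thread and remembers the
  // choice via tls_cpuid; implemented out of line.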
  Shard* Repick();

  size_t ShardAllocatedAndUnused() const {
    size_t total = 0;
    for (size_t i = 0; i < shards_.Size(); ++i) {
      total += shards_.AccessAtCore(i)->allocated_and_unused_.load(
          std::memory_order_relaxed);
    }
    return total;
  }
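
  // Satisfies a request either from the caller's per-core shard or, for
  // large or forced allocations, from the wrapped arena via func (a lambda
  // that performs the actual arena allocation while arena_mutex_ is held).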
  template <typename Func>
  char* AllocateImpl(size_t bytes, bool force_arena, const Func& func) {
    size_t cpu;

    // Go directly to the arena if the allocation is too large, or if
    // we've never needed to Repick() and the arena mutex is available
    // with no waiting. This keeps the fragmentation penalty of
    // concurrency zero unless it might actually confer an advantage.
    std::unique_lock<SpinMutex> arena_lock(arena_mutex_, std::defer_lock);
    if (bytes > shard_block_size_ / 4 || force_arena ||
        ((cpu = tls_cpuid) == 0 &&
         !shards_.AccessAtCore(0)->allocated_and_unused_.load(
             std::memory_order_relaxed) &&
         arena_lock.try_lock())) {
      if (!arena_lock.owns_lock()) {
        arena_lock.lock();
      }
      auto rv = func();
      Fixup();
      return rv;
    }

    // pick a shard from which to allocate
    Shard* s = shards_.AccessAtCore(cpu & (shards_.Size() - 1));
    if (!s->mutex.try_lock()) {
      s = Repick();
      s->mutex.lock();
    }
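    // Adopt the shard lock acquired above so it is released on every return
    // path below.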
    std::unique_lock<SpinMutex> lock(s->mutex, std::adopt_lock);

    size_t avail = s->allocated_and_unused_.load(std::memory_order_relaxed);
    if (avail < bytes) {
      // reload
      std::lock_guard<SpinMutex> reload_lock(arena_mutex_);

      // If the arena's current block is within a factor of 2 of the right
      // size, we adjust our request to avoid arena waste.
      auto exact = arena_allocated_and_unused_.load(std::memory_order_relaxed);
      assert(exact == arena_.AllocatedAndUnused());

      if (exact >= bytes && arena_.IsInInlineBlock()) {
        // If we haven't exhausted arena's inline block yet, allocate from arena
        // directly. This ensures that we'll do the first few small allocations
        // without allocating any blocks.
        // In particular this prevents empty memtables from using a
        // disproportionately large amount of memory: a memtable allocates on
        // the order of 1 KB of memory when created; we wouldn't want to
        // allocate a full arena block (typically a few megabytes) for that,
        // especially if there are thousands of empty memtables.
        auto rv = func();
        Fixup();
        return rv;
      }

      avail = exact >= shard_block_size_ / 2 && exact < shard_block_size_ * 2
                  ? exact
                  : shard_block_size_;
      s->free_begin_ = arena_.AllocateAligned(avail);
      Fixup();
    }
    s->allocated_and_unused_.store(avail - bytes, std::memory_order_relaxed);

    char* rv;
    if ((bytes % sizeof(void*)) == 0) {
      // aligned allocation from the beginning
      rv = s->free_begin_;
      s->free_begin_ += bytes;
    } else {
      // unaligned from the end
      rv = s->free_begin_ + avail - bytes;
    }
    return rv;
  }
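
  // Republishes the arena's counters into the relaxed atomics so the const
  // accessors above can read them without taking arena_mutex_. Called after
  // every allocation performed while arena_mutex_ is held.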
  void Fixup() {
    arena_allocated_and_unused_.store(arena_.AllocatedAndUnused(),
                                      std::memory_order_relaxed);
    memory_allocated_bytes_.store(arena_.MemoryAllocatedBytes(),
                                  std::memory_order_relaxed);
    irregular_block_num_.store(arena_.IrregularBlockNum(),
                               std::memory_order_relaxed);
  }

  ConcurrentArena(const ConcurrentArena&) = delete;
  ConcurrentArena& operator=(const ConcurrentArena&) = delete;
};

}  // namespace ROCKSDB_NAMESPACE