block_cache_tier_file_buffer.h 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. // Copyright (c) 2013, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #pragma once
  6. #include <list>
  7. #include <memory>
  8. #include <string>
  9. #include "rocksdb/comparator.h"
  10. #include "memory/arena.h"
  11. #include "util/mutexlock.h"
  12. namespace ROCKSDB_NAMESPACE {
  13. //
  14. // CacheWriteBuffer
  15. //
  16. // Buffer abstraction that can be manipulated via append
  17. // (not thread safe)
  18. class CacheWriteBuffer {
  19. public:
  20. explicit CacheWriteBuffer(const size_t size) : size_(size), pos_(0) {
  21. buf_.reset(new char[size_]);
  22. assert(!pos_);
  23. assert(size_);
  24. }
  25. virtual ~CacheWriteBuffer() {}
  26. void Append(const char* buf, const size_t size) {
  27. assert(pos_ + size <= size_);
  28. memcpy(buf_.get() + pos_, buf, size);
  29. pos_ += size;
  30. assert(pos_ <= size_);
  31. }
  32. void FillTrailingZeros() {
  33. assert(pos_ <= size_);
  34. memset(buf_.get() + pos_, '0', size_ - pos_);
  35. pos_ = size_;
  36. }
  37. void Reset() { pos_ = 0; }
  38. size_t Free() const { return size_ - pos_; }
  39. size_t Capacity() const { return size_; }
  40. size_t Used() const { return pos_; }
  41. char* Data() const { return buf_.get(); }
  42. private:
  43. std::unique_ptr<char[]> buf_;
  44. const size_t size_;
  45. size_t pos_;
  46. };
  47. //
  48. // CacheWriteBufferAllocator
  49. //
  50. // Buffer pool abstraction(not thread safe)
  51. //
  52. class CacheWriteBufferAllocator {
  53. public:
  54. explicit CacheWriteBufferAllocator(const size_t buffer_size,
  55. const size_t buffer_count)
  56. : cond_empty_(&lock_), buffer_size_(buffer_size) {
  57. MutexLock _(&lock_);
  58. buffer_size_ = buffer_size;
  59. for (uint32_t i = 0; i < buffer_count; i++) {
  60. auto* buf = new CacheWriteBuffer(buffer_size_);
  61. assert(buf);
  62. if (buf) {
  63. bufs_.push_back(buf);
  64. cond_empty_.Signal();
  65. }
  66. }
  67. }
  68. virtual ~CacheWriteBufferAllocator() {
  69. MutexLock _(&lock_);
  70. assert(bufs_.size() * buffer_size_ == Capacity());
  71. for (auto* buf : bufs_) {
  72. delete buf;
  73. }
  74. bufs_.clear();
  75. }
  76. CacheWriteBuffer* Allocate() {
  77. MutexLock _(&lock_);
  78. if (bufs_.empty()) {
  79. return nullptr;
  80. }
  81. assert(!bufs_.empty());
  82. CacheWriteBuffer* const buf = bufs_.front();
  83. bufs_.pop_front();
  84. return buf;
  85. }
  86. void Deallocate(CacheWriteBuffer* const buf) {
  87. assert(buf);
  88. MutexLock _(&lock_);
  89. buf->Reset();
  90. bufs_.push_back(buf);
  91. cond_empty_.Signal();
  92. }
  93. void WaitUntilUsable() {
  94. // We are asked to wait till we have buffers available
  95. MutexLock _(&lock_);
  96. while (bufs_.empty()) {
  97. cond_empty_.Wait();
  98. }
  99. }
  100. size_t Capacity() const { return bufs_.size() * buffer_size_; }
  101. size_t Free() const { return bufs_.size() * buffer_size_; }
  102. size_t BufferSize() const { return buffer_size_; }
  103. private:
  104. port::Mutex lock_; // Sync lock
  105. port::CondVar cond_empty_; // Condition var for empty buffers
  106. size_t buffer_size_; // Size of each buffer
  107. std::list<CacheWriteBuffer*> bufs_; // Buffer stash
  108. };
  109. } // namespace ROCKSDB_NAMESPACE