vectorrep.cc

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
#include <algorithm>
#include <memory>
#include <set>
#include <type_traits>
#include <unordered_map>
#include <unordered_set>

#include "db/memtable.h"
#include "memory/arena.h"
#include "memtable/stl_wrappers.h"
#include "port/port.h"
#include "rocksdb/memtablerep.h"
#include "rocksdb/utilities/options_type.h"
#include "util/atomic.h"
#include "util/mutexlock.h"
#include "util/thread_local.h"

namespace ROCKSDB_NAMESPACE {
namespace {
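// VectorRep stores entries in an (initially unsorted) std::vector. Writes
// are O(1) appends; the vector is sorted lazily, the first time an iterator
// needs ordered access. This trades iteration cost for very cheap insertion,
// which suits bulk-load workloads.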
class VectorRep : public MemTableRep {
 public:
  VectorRep(const KeyComparator& compare, Allocator* allocator, size_t count);

  // Insert key into the collection. (The caller will pack key and value into a
  // single buffer and pass that in as the parameter to Insert)
  // REQUIRES: nothing that compares equal to key is currently in the
  // collection.
  void Insert(KeyHandle handle) override;

  void InsertConcurrently(KeyHandle handle) override;

  // Returns true iff an entry that compares equal to key is in the collection.
  bool Contains(const char* key) const override;

  void MarkReadOnly() override;

  size_t ApproximateMemoryUsage() override;

  void Get(const LookupKey& k, void* callback_args,
           bool (*callback_func)(void* arg, const char* entry)) override;

  void BatchPostProcess() override;

  ~VectorRep() override = default;

  class Iterator : public MemTableRep::Iterator {
    class VectorRep* vrep_;
    std::shared_ptr<std::vector<const char*>> bucket_;
    std::vector<const char*>::const_iterator mutable cit_;
    const KeyComparator& compare_;
    std::string tmp_;  // For passing to EncodeKey
    bool mutable sorted_;
    void DoSort() const;

   public:
    explicit Iterator(class VectorRep* vrep,
                      std::shared_ptr<std::vector<const char*>> bucket,
                      const KeyComparator& compare);

    // Initialize an iterator over the specified collection.
    // The returned iterator is not valid.
    // explicit Iterator(const MemTableRep* collection);
    ~Iterator() override = default;

    // Returns true iff the iterator is positioned at a valid node.
    bool Valid() const override;

    // Returns the key at the current position.
    // REQUIRES: Valid()
    const char* key() const override;

    // Advances to the next position.
    // REQUIRES: Valid()
    void Next() override;

    // Advances to the previous position.
    // REQUIRES: Valid()
    void Prev() override;

    // Advance to the first entry with a key >= target
    void Seek(const Slice& user_key, const char* memtable_key) override;

    // Seek and do some memory validation
    Status SeekAndValidate(const Slice& internal_key, const char* memtable_key,
                           bool allow_data_in_errors,
                           bool detect_key_out_of_order,
                           const std::function<Status(const char*, bool)>&
                               key_validation_callback) override;

    // Advance to the first entry with a key <= target
    void SeekForPrev(const Slice& user_key, const char* memtable_key) override;

    // Position at the first entry in collection.
    // Final state of iterator is Valid() iff collection is not empty.
    void SeekToFirst() override;

    // Position at the last entry in collection.
    // Final state of iterator is Valid() iff collection is not empty.
    void SeekToLast() override;
  };

  // Return an iterator over the keys in this representation.
  MemTableRep::Iterator* GetIterator(Arena* arena) override;

 private:
  friend class Iterator;
  ALIGN_AS(CACHE_LINE_SIZE) RelaxedAtomic<size_t> bucket_size_;
  using Bucket = std::vector<const char*>;
  std::shared_ptr<Bucket> bucket_;
  mutable port::RWMutex rwlock_;
  bool immutable_;
  bool sorted_;
  const KeyComparator& compare_;

  // Thread-local vector to buffer concurrent writes.
  using TlBucket = std::vector<const char*>;
  ThreadLocalPtr tl_writes_;
  static void DeleteTlBucket(void* ptr) {
    auto* v = static_cast<TlBucket*>(ptr);
    delete v;
  }
};

void VectorRep::Insert(KeyHandle handle) {
  auto* key = static_cast<char*>(handle);
  {
    WriteLock l(&rwlock_);
    assert(!immutable_);
    bucket_->push_back(key);
  }
  bucket_size_.FetchAddRelaxed(1);
}
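
// Concurrent writers append to a private, thread-local buffer instead of
// contending on rwlock_. The buffered entries become visible to readers only
// after the owning thread calls BatchPostProcess(), which merges them into
// the shared bucket.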
void VectorRep::InsertConcurrently(KeyHandle handle) {
  auto* v = static_cast<TlBucket*>(tl_writes_.Get());
  if (!v) {
    v = new TlBucket();
    tl_writes_.Reset(v);
  }
  v->push_back(static_cast<char*>(handle));
}

// Returns true iff an entry that compares equal to key is in the collection.
bool VectorRep::Contains(const char* key) const {
  ReadLock l(&rwlock_);
  return std::find(bucket_->begin(), bucket_->end(), key) != bucket_->end();
}

void VectorRep::MarkReadOnly() {
  WriteLock l(&rwlock_);
  immutable_ = true;
}
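
// Only the vector's pointer storage is counted here; the key/value payloads
// themselves live in the arena and are accounted for separately.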
size_t VectorRep::ApproximateMemoryUsage() {
  return bucket_size_.LoadRelaxed() *
         sizeof(std::remove_reference<decltype(*bucket_)>::type::value_type);
}
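
// Publishes this thread's buffered concurrent inserts: the thread-local
// vector is drained into the shared bucket under the write lock, the size
// counter is bumped once for the whole batch, and the buffer is freed.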
void VectorRep::BatchPostProcess() {
  auto* v = static_cast<TlBucket*>(tl_writes_.Get());
  if (v) {
    {
      WriteLock l(&rwlock_);
      assert(!immutable_);
      for (auto& key : *v) {
        bucket_->push_back(key);
      }
    }
    bucket_size_.FetchAddRelaxed(v->size());
    delete v;
    tl_writes_.Reset(nullptr);
  }
}

VectorRep::VectorRep(const KeyComparator& compare, Allocator* allocator,
                     size_t count)
    : MemTableRep(allocator),
      bucket_size_(0),
      bucket_(new Bucket()),
      immutable_(false),
      sorted_(false),
      compare_(compare),
      tl_writes_(DeleteTlBucket) {
  bucket_->reserve(count);
}

VectorRep::Iterator::Iterator(class VectorRep* vrep,
                              std::shared_ptr<std::vector<const char*>> bucket,
                              const KeyComparator& compare)
    : vrep_(vrep),
      bucket_(bucket),
      cit_(bucket_->end()),
      compare_(compare),
      sorted_(false) {}
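
// Sorts the bucket on first use. Two cases:
// - vrep_ != nullptr: the iterator shares the immutable memtable's bucket,
//   so the sort happens once under vrep_'s write lock and is recorded in
//   vrep_->sorted_ for the benefit of all subsequent iterators.
// - vrep_ == nullptr: the iterator owns a private snapshot of a still-mutable
//   memtable and can sort it without any locking.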
void VectorRep::Iterator::DoSort() const {
  // vrep is non-null means that we are working on an immutable memtable
  if (!sorted_ && vrep_ != nullptr) {
    WriteLock l(&vrep_->rwlock_);
    if (!vrep_->sorted_) {
      std::sort(bucket_->begin(), bucket_->end(),
                stl_wrappers::Compare(compare_));
      cit_ = bucket_->begin();
      vrep_->sorted_ = true;
    }
    sorted_ = true;
  }
  if (!sorted_) {
    std::sort(bucket_->begin(), bucket_->end(),
              stl_wrappers::Compare(compare_));
    cit_ = bucket_->begin();
    sorted_ = true;
  }
  assert(sorted_);
  assert(vrep_ == nullptr || vrep_->sorted_);
}

// Returns true iff the iterator is positioned at a valid node.
bool VectorRep::Iterator::Valid() const {
  DoSort();
  return cit_ != bucket_->end();
}

// Returns the key at the current position.
// REQUIRES: Valid()
const char* VectorRep::Iterator::key() const {
  assert(sorted_);
  return *cit_;
}

// Advances to the next position.
// REQUIRES: Valid()
void VectorRep::Iterator::Next() {
  assert(sorted_);
  if (cit_ == bucket_->end()) {
    return;
  }
  ++cit_;
}

// Advances to the previous position.
// REQUIRES: Valid()
void VectorRep::Iterator::Prev() {
  assert(sorted_);
  if (cit_ == bucket_->begin()) {
    // If you try to go back from the first element, the iterator should be
    // invalidated. So we set it to past-the-end. This means that you can
    // treat the container circularly.
    cit_ = bucket_->end();
  } else {
    --cit_;
  }
}

// Advance to the first entry with a key >= target
void VectorRep::Iterator::Seek(const Slice& user_key,
                               const char* memtable_key) {
  DoSort();
  // Do binary search to find first value not less than the target
  const char* encoded_key =
      (memtable_key != nullptr) ? memtable_key : EncodeKey(&tmp_, user_key);
  cit_ = std::equal_range(bucket_->begin(), bucket_->end(), encoded_key,
                          [this](const char* a, const char* b) {
                            return compare_(a, b) < 0;
                          })
             .first;
}

Status VectorRep::Iterator::SeekAndValidate(
    const Slice& /* internal_key */, const char* /* memtable_key */,
    bool /* allow_data_in_errors */, bool /* detect_key_out_of_order */,
    const std::function<Status(const char*, bool)>&
    /* key_validation_callback */) {
  if (vrep_) {
    WriteLock l(&vrep_->rwlock_);
    if (bucket_->begin() == bucket_->end()) {
      // Memtable is empty
      return Status::OK();
    } else {
      return Status::NotSupported("SeekAndValidate() not implemented");
    }
  } else {
    return Status::NotSupported("SeekAndValidate() not implemented");
  }
}

// Advance to the first entry with a key <= target.
// Not supported by VectorRep: asserts in debug builds and otherwise does
// nothing.
void VectorRep::Iterator::SeekForPrev(const Slice& /*user_key*/,
                                      const char* /*memtable_key*/) {
  assert(false);
}

// Position at the first entry in collection.
// Final state of iterator is Valid() iff collection is not empty.
void VectorRep::Iterator::SeekToFirst() {
  DoSort();
  cit_ = bucket_->begin();
}

// Position at the last entry in collection.
// Final state of iterator is Valid() iff collection is not empty.
void VectorRep::Iterator::SeekToLast() {
  DoSort();
  cit_ = bucket_->end();
  if (bucket_->size() != 0) {
    --cit_;
  }
}
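
// Point lookup. While the memtable is still mutable, the bucket is copied
// under the read lock so the temporary iterator can sort its own snapshot
// without blocking or racing against writers. Once immutable, the shared
// bucket is used directly and sorted at most once (see DoSort()).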
void VectorRep::Get(const LookupKey& k, void* callback_args,
                    bool (*callback_func)(void* arg, const char* entry)) {
  rwlock_.ReadLock();
  VectorRep* vector_rep;
  std::shared_ptr<Bucket> bucket;
  if (immutable_) {
    vector_rep = this;
  } else {
    vector_rep = nullptr;
    bucket.reset(new Bucket(*bucket_));  // make a copy
  }
  VectorRep::Iterator iter(vector_rep, immutable_ ? bucket_ : bucket, compare_);
  rwlock_.ReadUnlock();

  for (iter.Seek(k.user_key(), k.memtable_key().data());
       iter.Valid() && callback_func(callback_args, iter.key()); iter.Next()) {
  }
}
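
// Same snapshot rule as Get(): a still-mutable memtable hands the iterator a
// private copy of the bucket, while an immutable one shares its bucket. When
// an arena is supplied, the iterator is placement-new'd into arena memory and
// must not be deleted by the caller.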
MemTableRep::Iterator* VectorRep::GetIterator(Arena* arena) {
  char* mem = nullptr;
  if (arena != nullptr) {
    mem = arena->AllocateAligned(sizeof(Iterator));
  }
  ReadLock l(&rwlock_);
  // Do not sort here. The sorting would be done the first time
  // a Seek is performed on the iterator.
  if (immutable_) {
    if (arena == nullptr) {
      return new Iterator(this, bucket_, compare_);
    } else {
      return new (mem) Iterator(this, bucket_, compare_);
    }
  } else {
    std::shared_ptr<Bucket> tmp;
    tmp.reset(new Bucket(*bucket_));  // make a copy
    if (arena == nullptr) {
      return new Iterator(nullptr, tmp, compare_);
    } else {
      return new (mem) Iterator(nullptr, tmp, compare_);
    }
  }
}
}  // namespace
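
// Maps the factory's single "count" option (the initial vector reservation)
// onto count_ at offset 0, so it can be configured through RocksDB's
// string-based options machinery.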
static std::unordered_map<std::string, OptionTypeInfo> vector_rep_table_info = {
    {"count",
     {0, OptionType::kSizeT, OptionVerificationType::kNormal,
      OptionTypeFlags::kNone}},
};

VectorRepFactory::VectorRepFactory(size_t count) : count_(count) {
  RegisterOptions("VectorRepFactoryOptions", &count_, &vector_rep_table_info);
}

MemTableRep* VectorRepFactory::CreateMemTableRep(
    const MemTableRep::KeyComparator& compare, Allocator* allocator,
    const SliceTransform*, Logger* /*logger*/) {
  return new VectorRep(compare, allocator, count_);
}
}  // namespace ROCKSDB_NAMESPACE