wbwi_memtable.h 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413
  1. // Copyright (c) Meta Platforms, Inc. and affiliates.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #pragma once
  6. #include "db/memtable.h"
  7. #include "rocksdb/utilities/write_batch_with_index.h"
  8. namespace ROCKSDB_NAMESPACE {
  9. // An implementation of the ReadOnlyMemTable interface based on the content
  10. // of the given write batch with index (WBWI) object. This can be used to ingest
  11. // a transaction (which is based on WBWI) into the DB as an immutable memtable.
  12. //
  13. // REQUIRES: overwrite_key to be true for the WBWI
  14. // Since the keys in WBWI do not have sequence number, this class is responsible
  15. // for assigning sequence numbers to the keys. This memtable needs to be
  16. // assigned a range of sequence numbers through AssignSequenceNumbers(seqno)
  17. // before being available for reads.
  18. //
  19. // The sequence number assignment uses the update count for each key
  20. // tracked in WBWI (see WBWIIterator::GetUpdateCount()). For each key, the
  21. // sequence number assigned is seqno.lower_bound + update_count - 1. So more
  22. // recent updates will have higher sequence number.
  23. //
  24. // Since WBWI with overwrite mode keeps track of the most recent update for
  25. // each key, this memtable contains one update per key usually. However, there
  26. // are two exceptions:
// 1. Merge operations: a Merge operation does not overwrite existing entries,
// so if a user uses Merge, multiple entries for the same key may be kept.
// 2. Overwritten SingleDelete: this memtable needs to emit an extra
// SingleDelete even when the SD is overwritten by another update.
  31. // Consider the following scenario:
  32. // - WBWI has SD(k) then PUT(k, v1)
  33. // - DB has PUT(k, v2) in L1
  34. // - flush WBWI adds PUT(k, v1) into L0
  35. // - live memtable contains SD(k)
  36. // - flush live memtable and compact it with L0 will drop SD(k) and PUT(k, v1)
  37. // - the PUT(k, v2) in L1 incorrectly becomes visible
// So during flush, the iterator from this memtable needs to emit the
// overwritten single deletion. This SD will be assigned seqno.lower_bound.
  40. class WBWIMemTable final : public ReadOnlyMemTable {
  41. public:
  42. struct SeqnoRange {
  43. SequenceNumber lower_bound = kMaxSequenceNumber;
  44. SequenceNumber upper_bound = kMaxSequenceNumber;
  45. };
  46. WBWIMemTable(const std::shared_ptr<WriteBatchWithIndex>& wbwi,
  47. const Comparator* cmp, uint32_t cf_id,
  48. const ImmutableOptions* immutable_options,
  49. const MutableCFOptions* cf_options,
  50. const WriteBatchWithIndex::CFStat& stat)
  51. : wbwi_(wbwi),
  52. comparator_(cmp),
  53. ikey_comparator_(comparator_),
  54. moptions_(*immutable_options, *cf_options),
  55. clock_(immutable_options->clock),
  56. // We need to include overwritten_sd_count in num_entries_ since flush
  57. // verifies number of entries processed and that iterator for this
  58. // memtable will emit overwritten SingleDelete entries during flush, See
  59. // comment above WBWIMemTableIterator for more detail.
  60. num_entries_(stat.entry_count + stat.overwritten_sd_count),
  61. cf_id_(cf_id) {
  62. assert(wbwi->GetOverwriteKey());
  63. }
  64. // No copying allowed
  65. WBWIMemTable(const WBWIMemTable&) = delete;
  66. WBWIMemTable& operator=(const WBWIMemTable&) = delete;
  67. ~WBWIMemTable() override { assert(refs_ == 0); }
  68. const char* Name() const override { return "WBWIMemTable"; }
  69. size_t ApproximateMemoryUsage() override {
  70. // FIXME: we can calculate for each CF or just divide evenly among CFs
  71. // Used in ReportFlushInputSize(), MemPurgeDecider, flush job event logging,
  72. // and InternalStats::HandleCurSizeAllMemTables
  73. return 0;
  74. }
  75. size_t MemoryAllocatedBytes() const override {
  76. // FIXME: similar to ApproximateMemoryUsage().
  77. // Used in MemTableList to trim memtable history.
  78. return 0;
  79. }
  80. void UniqueRandomSample(
  81. const uint64_t& /* target_sample_size */,
  82. std::unordered_set<const char*>* /* entries */) override {
  83. // TODO: support mempurge
  84. assert(false);
  85. }
  86. InternalIterator* NewIterator(const ReadOptions&,
  87. UnownedPtr<const SeqnoToTimeMapping>,
  88. Arena* arena,
  89. const SliceTransform* /* prefix_extractor */,
  90. bool for_flush) override;
  91. // Returns an iterator that wraps a MemTableIterator and logically strips the
  92. // user-defined timestamp of each key. This API is only used by flush when
  93. // user-defined timestamps in MemTable only feature is enabled.
  94. InternalIterator* NewTimestampStrippingIterator(
  95. const ReadOptions&, UnownedPtr<const SeqnoToTimeMapping>, Arena* arena,
  96. const SliceTransform*, size_t) override {
  97. // TODO: support UDT
  98. assert(false);
  99. return NewErrorInternalIterator(
  100. Status::NotSupported(
  101. "WBWIMemTable does not support NewTimestampStrippingIterator."),
  102. arena);
  103. }
  104. FragmentedRangeTombstoneIterator* NewRangeTombstoneIterator(
  105. const ReadOptions&, SequenceNumber, bool) override {
  106. // TODO: support DeleteRange
  107. assert(!wbwi_->GetWriteBatch()->HasDeleteRange());
  108. return nullptr;
  109. }
  110. FragmentedRangeTombstoneIterator* NewTimestampStrippingRangeTombstoneIterator(
  111. const ReadOptions&, SequenceNumber, size_t) override {
  112. // TODO: support UDT
  113. assert(false);
  114. return nullptr;
  115. }
  116. // FIXME: not a good practice to use default parameter with virtual function
  117. using ReadOnlyMemTable::Get;
  118. bool Get(const LookupKey& key, std::string* value,
  119. PinnableWideColumns* columns, std::string* timestamp, Status* s,
  120. MergeContext* merge_context,
  121. SequenceNumber* max_covering_tombstone_seq, SequenceNumber* seq,
  122. const ReadOptions& read_opts, bool immutable_memtable,
  123. ReadCallback* callback = nullptr, bool* is_blob_index = nullptr,
  124. bool do_merge = true) override;
  125. void MultiGet(const ReadOptions& read_options, MultiGetRange* range,
  126. ReadCallback* callback, bool immutable_memtable) override;
  127. uint64_t NumEntries() const override { return num_entries_; }
  128. uint64_t NumDeletion() const override {
  129. // FIXME: this is used for stats and event logging
  130. return 0;
  131. }
  132. uint64_t NumRangeDeletion() const override {
  133. // FIXME
  134. assert(!wbwi_->GetWriteBatch()->HasDeleteRange());
  135. return 0;
  136. }
  137. uint64_t GetDataSize() const override {
  138. // FIXME: used in event logging in flush_job
  139. return 0;
  140. }
  141. SequenceNumber GetEarliestSequenceNumber() override {
  142. return assigned_seqno_.lower_bound;
  143. }
  144. bool IsEmpty() const override {
  145. // Ideally also check that wbwi contains updates from this CF. For now, we
  146. // only create WBWIMemTable for CFs with updates in wbwi.
  147. return wbwi_->GetWriteBatch()->Count() == 0;
  148. }
  149. SequenceNumber GetFirstSequenceNumber() override {
  150. return assigned_seqno_.lower_bound;
  151. }
  152. uint64_t GetMinLogContainingPrepSection() override {
  153. // FIXME: used to retain WAL with pending Prepare
  154. return min_prep_log_referenced_;
  155. }
  156. void MarkImmutable() override {}
  157. void MarkFlushed() override {}
  158. MemTableStats ApproximateStats(const Slice&, const Slice&) override {
  159. // FIXME: used for query planning
  160. return {};
  161. }
  162. const InternalKeyComparator& GetInternalKeyComparator() const override {
  163. return ikey_comparator_;
  164. }
  165. uint64_t ApproximateOldestKeyTime() const override {
  166. // FIXME: can use the time when this is added to the DB.
  167. return kUnknownOldestAncesterTime;
  168. }
  169. bool IsFragmentedRangeTombstonesConstructed() const override {
  170. assert(!wbwi_->GetWriteBatch()->HasDeleteRange());
  171. return true;
  172. }
  173. const Slice& GetNewestUDT() const override {
  174. // FIXME: support UDT
  175. assert(false);
  176. return newest_udt_;
  177. }
  178. // Assign a sequence number to the entries in this memtable.
  179. void AssignSequenceNumbers(const SeqnoRange& seqno_range) {
  180. // Not expecting to assign seqno multiple times.
  181. assert(assigned_seqno_.lower_bound == kMaxSequenceNumber);
  182. assert(assigned_seqno_.upper_bound == kMaxSequenceNumber);
  183. assigned_seqno_ = seqno_range;
  184. assert(assigned_seqno_.lower_bound <= assigned_seqno_.upper_bound);
  185. assert(assigned_seqno_.upper_bound != kMaxSequenceNumber);
  186. }
  187. void SetMinPrepLog(uint64_t min_prep_log) {
  188. min_prep_log_referenced_ = min_prep_log;
  189. }
  190. private:
  191. inline InternalIterator* NewIterator() const;
  192. Slice newest_udt_;
  193. std::shared_ptr<WriteBatchWithIndex> wbwi_;
  194. const Comparator* comparator_;
  195. InternalKeyComparator ikey_comparator_;
  196. SeqnoRange assigned_seqno_;
  197. const ImmutableMemTableOptions moptions_;
  198. SystemClock* clock_;
  199. uint64_t min_prep_log_referenced_{0};
  200. uint64_t num_entries_;
  201. // WBWI can contains updates to multiple CFs. `cf_id_` determines which CF
  202. // this memtable is for.
  203. const uint32_t cf_id_;
  204. };
  205. class WBWIMemTableIterator final : public InternalIterator {
  206. public:
  207. WBWIMemTableIterator(std::unique_ptr<WBWIIterator>&& it,
  208. const WBWIMemTable::SeqnoRange& assigned_seqno,
  209. const Comparator* comparator, bool for_flush)
  210. : it_(std::move(it)),
  211. assigned_seqno_(assigned_seqno),
  212. comparator_(comparator),
  213. emit_overwritten_single_del_(for_flush) {
  214. assert(assigned_seqno_.lower_bound <= assigned_seqno_.upper_bound);
  215. assert(assigned_seqno_.upper_bound < kMaxSequenceNumber);
  216. s_.PermitUncheckedError();
  217. }
  218. // No copying allowed
  219. WBWIMemTableIterator(const WBWIMemTableIterator&) = delete;
  220. WBWIMemTableIterator& operator=(const WBWIMemTableIterator&) = delete;
  221. bool Valid() const override { return valid_; }
  222. void SeekToFirst() override {
  223. it_->SeekToFirst();
  224. UpdateKey();
  225. }
  226. void SeekToLast() override {
  227. assert(!emit_overwritten_single_del_);
  228. it_->SeekToLast();
  229. UpdateKey();
  230. }
  231. void Seek(const Slice& target) override {
  232. // `emit_overwritten_single_del_` is only used for flush, which does
  233. // sequential forward scan from the beginning.
  234. assert(!emit_overwritten_single_del_);
  235. Slice target_user_key = ExtractUserKey(target);
  236. // Moves to first update >= target_user_key
  237. it_->Seek(target_user_key);
  238. SequenceNumber target_seqno = GetInternalKeySeqno(target);
  239. // Move to the first entry with seqno <= target_seqno for the same
  240. // user key or a different user key.
  241. while (it_->Valid() &&
  242. comparator_->Compare(it_->Entry().key, target_user_key) == 0 &&
  243. target_seqno < CurrentKeySeqno()) {
  244. it_->Next();
  245. }
  246. UpdateKey();
  247. }
  248. void SeekForPrev(const Slice& target) override {
  249. assert(!emit_overwritten_single_del_);
  250. Slice target_user_key = ExtractUserKey(target);
  251. // Moves to last update <= target_user_key
  252. it_->SeekForPrev(target_user_key);
  253. SequenceNumber target_seqno = GetInternalKeySeqno(target);
  254. // Move to the first entry with seqno >= target_seqno for the same
  255. // user key or a different user key.
  256. while (it_->Valid() &&
  257. comparator_->Compare(it_->Entry().key, target_user_key) == 0 &&
  258. CurrentKeySeqno() < target_seqno) {
  259. it_->Prev();
  260. }
  261. UpdateKey();
  262. }
  263. void Next() override {
  264. assert(Valid());
  265. if (emit_overwritten_single_del_) {
  266. if (it_->HasOverWrittenSingleDel() && !at_overwritten_single_del_) {
  267. // Merge and SingleDelete on the same key is undefined behavior.
  268. assert(it_->Entry().type != kMergeRecord);
  269. UpdateSingleDeleteKey();
  270. return;
  271. }
  272. at_overwritten_single_del_ = false;
  273. }
  274. it_->Next();
  275. UpdateKey();
  276. }
  277. bool NextAndGetResult(IterateResult* result) override {
  278. assert(Valid());
  279. Next();
  280. bool is_valid = Valid();
  281. if (is_valid) {
  282. result->key = key();
  283. result->bound_check_result = IterBoundCheck::kUnknown;
  284. result->value_prepared = true;
  285. }
  286. return is_valid;
  287. }
  288. void Prev() override {
  289. assert(!emit_overwritten_single_del_);
  290. assert(Valid());
  291. it_->Prev();
  292. UpdateKey();
  293. }
  294. Slice key() const override {
  295. assert(Valid());
  296. return key_;
  297. }
  298. Slice value() const override {
  299. assert(Valid());
  300. return it_->Entry().value;
  301. }
  302. Status status() const override {
  303. assert(it_->status().ok());
  304. return s_;
  305. }
  306. bool IsValuePinned() const override { return true; }
  307. private:
  308. static const std::unordered_map<WriteType, ValueType> WriteTypeToValueTypeMap;
  309. SequenceNumber CurrentKeySeqno() {
  310. assert(it_->Valid());
  311. assert(it_->GetUpdateCount() >= 1);
  312. auto seq = assigned_seqno_.lower_bound + it_->GetUpdateCount() - 1;
  313. assert(seq <= assigned_seqno_.upper_bound);
  314. return seq;
  315. }
  316. // If it_ is valid, udate key_ to an internal key containing it_ current
  317. // key, CurrentKeySeqno() and a type corresponding to it_ current entry type.
  318. void UpdateKey() {
  319. valid_ = it_->Valid();
  320. if (!Valid()) {
  321. key_.clear();
  322. return;
  323. }
  324. auto t = WriteTypeToValueTypeMap.find(it_->Entry().type);
  325. assert(t != WriteTypeToValueTypeMap.end());
  326. if (t == WriteTypeToValueTypeMap.end()) {
  327. key_.clear();
  328. valid_ = false;
  329. s_ = Status::Corruption("Unexpected write_batch_with_index entry type " +
  330. std::to_string(it_->Entry().type));
  331. return;
  332. }
  333. key_buf_.SetInternalKey(it_->Entry().key, CurrentKeySeqno(), t->second);
  334. key_ = key_buf_.GetInternalKey();
  335. }
  336. void UpdateSingleDeleteKey() {
  337. assert(it_->Valid());
  338. assert(Valid());
  339. // The key that overwrites this SingleDelete will be assigned at least
  340. // seqno lower_bound + 1 (see CurrentKeySeqno()).
  341. key_buf_.SetInternalKey(it_->Entry().key, assigned_seqno_.lower_bound,
  342. kTypeSingleDeletion);
  343. key_ = key_buf_.GetInternalKey();
  344. at_overwritten_single_del_ = true;
  345. }
  346. std::unique_ptr<WBWIIterator> it_;
  347. const WBWIMemTable::SeqnoRange assigned_seqno_;
  348. const Comparator* comparator_;
  349. IterKey key_buf_;
  350. // The current internal key.
  351. Slice key_;
  352. Status s_;
  353. bool valid_ = false;
  354. bool at_overwritten_single_del_ = false;
  355. bool emit_overwritten_single_del_ = false;
  356. };
  357. } // namespace ROCKSDB_NAMESPACE