wbwi_memtable.cc

// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include "memtable/wbwi_memtable.h"

#include "db/memtable.h"

namespace ROCKSDB_NAMESPACE {
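
// Maps WBWI record types to the internal value types used in the keys
// exposed by this memtable's iterators.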
const std::unordered_map<WriteType, ValueType>
    WBWIMemTableIterator::WriteTypeToValueTypeMap = {
        {kPutRecord, kTypeValue},
        {kMergeRecord, kTypeMerge},
        {kDeleteRecord, kTypeDeletion},
        {kSingleDeleteRecord, kTypeSingleDeletion},
        {kDeleteRangeRecord, kTypeRangeDeletion},
        {kPutEntityRecord, kTypeWideColumnEntity},
        // Only the above record types are added to WBWI;
        // kLogDataRecord, kXIDRecord and kUnknownRecord are never indexed.
};
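
// Returns an arena-allocated internal iterator over the indexed write
// batch. Entries are exposed with the seqno range assigned to this
// memtable at ingestion time.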
InternalIterator* WBWIMemTable::NewIterator(
    const ReadOptions&, UnownedPtr<const SeqnoToTimeMapping>, Arena* arena,
    const SliceTransform* /* prefix_extractor */, bool for_flush) {
  // Ingested WBWIMemTable should have an assigned seqno
  assert(assigned_seqno_.upper_bound != kMaxSequenceNumber);
  assert(assigned_seqno_.lower_bound != kMaxSequenceNumber);
  assert(arena);
  auto mem = arena->AllocateAligned(sizeof(WBWIMemTableIterator));
  return new (mem) WBWIMemTableIterator(
      std::unique_ptr<WBWIIterator>(wbwi_->NewIterator(cf_id_)),
      assigned_seqno_, comparator_, for_flush);
}
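
// Heap-allocating variant, used for the point lookups in Get() below.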
inline InternalIterator* WBWIMemTable::NewIterator() const {
  assert(assigned_seqno_.upper_bound != kMaxSequenceNumber);
  assert(assigned_seqno_.lower_bound != kMaxSequenceNumber);
  return new WBWIMemTableIterator(
      std::unique_ptr<WBWIIterator>(wbwi_->NewIterator(cf_id_)),
      assigned_seqno_, comparator_, /*for_flush=*/false);
}
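
// Point lookup. Seeks to the lookup key, then scans entries with the same
// user key from newest to oldest until a final value or an error is found.
// Returns true iff the lookup resolved (found_final_value).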
bool WBWIMemTable::Get(const LookupKey& key, std::string* value,
                       PinnableWideColumns* columns, std::string* timestamp,
                       Status* s, MergeContext* merge_context,
                       SequenceNumber* max_covering_tombstone_seq,
                       SequenceNumber* out_seq, const ReadOptions&,
                       bool immutable_memtable, ReadCallback* callback,
                       bool* is_blob_index, bool do_merge) {
  assert(s->ok() || s->IsMergeInProgress());
  (void)immutable_memtable;
  (void)timestamp;
  (void)columns;
  assert(immutable_memtable);
  assert(!timestamp);  // TODO: support UDT
  assert(assigned_seqno_.upper_bound != kMaxSequenceNumber);
  assert(assigned_seqno_.lower_bound != kMaxSequenceNumber);
  // WBWI does not support DeleteRange yet.
  assert(!wbwi_->GetWriteBatch()->HasDeleteRange());
  assert(merge_context);
  *out_seq = kMaxSequenceNumber;
  [[maybe_unused]] SequenceNumber read_seq =
      GetInternalKeySeqno(key.internal_key());
  // This memtable is a single write batch, so no snapshot can be taken
  // within the seqno range assigned to this memtable.
  assert(read_seq >= assigned_seqno_.upper_bound ||
         read_seq < assigned_seqno_.lower_bound);
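
  // Entries with the same user key are visited in descending seqno order
  // (newest first), per internal key ordering.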
  std::unique_ptr<InternalIterator> iter{NewIterator()};
  iter->Seek(key.internal_key());
  const Slice lookup_user_key = key.user_key();
  bool merge_in_progress = s->IsMergeInProgress();
  while (iter->Valid() && comparator_->EqualWithoutTimestamp(
                              ExtractUserKey(iter->key()), lookup_user_key)) {
    uint64_t tag = ExtractInternalKeyFooter(iter->key());
    ValueType type;
    SequenceNumber seq;
    UnPackSequenceAndType(tag, &seq, &type);
    // Unsupported operations.
    assert(type != kTypeBlobIndex);
    assert(type != kTypeWideColumnEntity);
    assert(type != kTypeValuePreferredSeqno);
    assert(type != kTypeDeletionWithTimestamp);
    if (!callback || callback->IsVisible(seq)) {
      if (*out_seq == kMaxSequenceNumber) {
        *out_seq = std::max(seq, *max_covering_tombstone_seq);
      }
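      // A covering range tombstone is newer than this entry; since WBWI has
      // no DeleteRange, the tombstone comes from elsewhere. Treat the key as
      // range-deleted.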
      if (*max_covering_tombstone_seq > seq) {
        type = kTypeRangeDeletion;
      }
      switch (type) {
        case kTypeValue: {
          HandleTypeValue(lookup_user_key, iter->value(),
                          iter->IsValuePinned(), do_merge, merge_in_progress,
                          merge_context, moptions_.merge_operator, clock_,
                          moptions_.statistics, moptions_.info_log, s, value,
                          columns, is_blob_index);
          assert(seq <= read_seq);
          return /*found_final_value=*/true;
        }
        case kTypeDeletion:
        case kTypeSingleDeletion:
        case kTypeRangeDeletion: {
          HandleTypeDeletion(lookup_user_key, merge_in_progress, merge_context,
                             moptions_.merge_operator, clock_,
                             moptions_.statistics, moptions_.info_log, s, value,
                             columns);
          assert(seq <= read_seq);
          return /*found_final_value=*/true;
        }
        case kTypeMerge: {
          merge_in_progress = true;
          if (ReadOnlyMemTable::HandleTypeMerge(
                  lookup_user_key, iter->value(), iter->IsValuePinned(),
                  do_merge, merge_context, moptions_.merge_operator, clock_,
                  moptions_.statistics, moptions_.info_log, s, value,
                  columns)) {
            return true;
          }
          break;
        }
        default: {
          std::string msg(
              "Unrecognized or unsupported value type for "
              "WBWI-based memtable: " +
              std::to_string(static_cast<int>(type)) + ". ");
          msg.append("User key: " +
                     ExtractUserKey(iter->key()).ToString(/*hex=*/true) +
                     ". ");
          msg.append("seq: " + std::to_string(seq) + ".");
          *s = Status::Corruption(msg.c_str());
          return /*found_final_value=*/true;
        }
      }
    }
    // Current key is a merge key or not visible.
    assert(merge_in_progress || (callback && !callback->IsVisible(seq)));
    iter->Next();
  }
  if (!iter->status().ok() &&
      (s->ok() || s->IsMergeInProgress() || s->IsNotFound())) {
    *s = iter->status();
    // Stop further lookup.
    return true;
  }
  if (merge_in_progress) {
    assert(s->ok() || s->IsMergeInProgress());
    *s = Status::MergeInProgress();
  }
  return /*found_final_value=*/false;
}
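
// Batched lookup: runs Get() on each key in `range`, pinning found values
// and aborting the remaining keys once value_size_soft_limit is exceeded.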
void WBWIMemTable::MultiGet(const ReadOptions& read_options,
                            MultiGetRange* range, ReadCallback* callback,
                            bool immutable_memtable) {
  (void)immutable_memtable;
  // Should only be used as an immutable memtable.
  assert(immutable_memtable);
  // TODO: reuse the InternalIterator created in Get().
  for (auto iter = range->begin(); iter != range->end(); ++iter) {
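    // The max-seqno output of Get() is not needed for MultiGet.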
    SequenceNumber dummy_seq = 0;
    bool found_final_value =
        Get(*iter->lkey, iter->value ? iter->value->GetSelf() : nullptr,
            iter->columns, iter->timestamp, iter->s, &(iter->merge_context),
            &(iter->max_covering_tombstone_seq), &dummy_seq, read_options, true,
            callback, nullptr, true);
    if (found_final_value) {
      if (iter->s->ok() || iter->s->IsNotFound()) {
        if (iter->value) {
          iter->value->PinSelf();
          range->AddValueSize(iter->value->size());
        } else {
          assert(iter->columns);
          range->AddValueSize(iter->columns->serialized_size());
        }
      }
      range->MarkKeyDone(iter);
      if (range->GetValueSize() > read_options.value_size_soft_limit) {
        // Set all remaining keys in the range to Aborted.
        for (auto range_iter = range->begin(); range_iter != range->end();
             ++range_iter) {
          range->MarkKeyDone(range_iter);
          *(range_iter->s) = Status::Aborted();
        }
        break;
      }
    }
  }
}
}  // namespace ROCKSDB_NAMESPACE