// Copyright (c) Meta Platforms, Inc. and affiliates. // This source code is licensed under both the GPLv2 (found in the // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). #include "memtable/wbwi_memtable.h" #include "db/memtable.h" namespace ROCKSDB_NAMESPACE { const std::unordered_map WBWIMemTableIterator::WriteTypeToValueTypeMap = { {kPutRecord, kTypeValue}, {kMergeRecord, kTypeMerge}, {kDeleteRecord, kTypeDeletion}, {kSingleDeleteRecord, kTypeSingleDeletion}, {kDeleteRangeRecord, kTypeRangeDeletion}, {kPutEntityRecord, kTypeWideColumnEntity}, // Only the above record types are added to WBWI. // kLogDataRecord, kXIDRecord, kUnknownRecord }; InternalIterator* WBWIMemTable::NewIterator( const ReadOptions&, UnownedPtr, Arena* arena, const SliceTransform* /* prefix_extractor */, bool for_flush) { // Ingested WBWIMemTable should have an assigned seqno assert(assigned_seqno_.upper_bound != kMaxSequenceNumber); assert(assigned_seqno_.lower_bound != kMaxSequenceNumber); assert(arena); auto mem = arena->AllocateAligned(sizeof(WBWIMemTableIterator)); return new (mem) WBWIMemTableIterator( std::unique_ptr(wbwi_->NewIterator(cf_id_)), assigned_seqno_, comparator_, for_flush); } inline InternalIterator* WBWIMemTable::NewIterator() const { assert(assigned_seqno_.upper_bound != kMaxSequenceNumber); assert(assigned_seqno_.lower_bound != kMaxSequenceNumber); return new WBWIMemTableIterator( std::unique_ptr(wbwi_->NewIterator(cf_id_)), assigned_seqno_, comparator_, /*for_flush=*/false); } bool WBWIMemTable::Get(const LookupKey& key, std::string* value, PinnableWideColumns* columns, std::string* timestamp, Status* s, MergeContext* merge_context, SequenceNumber* max_covering_tombstone_seq, SequenceNumber* out_seq, const ReadOptions&, bool immutable_memtable, ReadCallback* callback, bool* is_blob_index, bool do_merge) { assert(s->ok() || s->IsMergeInProgress()); (void)immutable_memtable; (void)timestamp; (void)columns; assert(immutable_memtable); assert(!timestamp); // TODO: support UDT assert(assigned_seqno_.upper_bound != kMaxSequenceNumber); assert(assigned_seqno_.lower_bound != kMaxSequenceNumber); // WBWI does not support DeleteRange yet. assert(!wbwi_->GetWriteBatch()->HasDeleteRange()); assert(merge_context); *out_seq = kMaxSequenceNumber; [[maybe_unused]] SequenceNumber read_seq = GetInternalKeySeqno(key.internal_key()); // This is memtable is a single write batch, no snapshot can be taken within // assigned seqnos for this memtable. assert(read_seq >= assigned_seqno_.upper_bound || read_seq < assigned_seqno_.lower_bound); std::unique_ptr iter{NewIterator()}; iter->Seek(key.internal_key()); const Slice lookup_user_key = key.user_key(); bool merge_in_progress = s->IsMergeInProgress(); while (iter->Valid() && comparator_->EqualWithoutTimestamp( ExtractUserKey(iter->key()), lookup_user_key)) { uint64_t tag = ExtractInternalKeyFooter(iter->key()); ValueType type; SequenceNumber seq; UnPackSequenceAndType(tag, &seq, &type); // Unsupported operations. assert(type != kTypeBlobIndex); assert(type != kTypeWideColumnEntity); assert(type != kTypeValuePreferredSeqno); assert(type != kTypeDeletionWithTimestamp); if (!callback || callback->IsVisible(seq)) { if (*out_seq == kMaxSequenceNumber) { *out_seq = std::max(seq, *max_covering_tombstone_seq); } if (*max_covering_tombstone_seq > seq) { type = kTypeRangeDeletion; } switch (type) { case kTypeValue: { HandleTypeValue(lookup_user_key, iter->value(), iter->IsValuePinned(), do_merge, merge_in_progress, merge_context, moptions_.merge_operator, clock_, moptions_.statistics, moptions_.info_log, s, value, columns, is_blob_index); assert(seq <= read_seq); return /*found_final_value=*/true; } case kTypeDeletion: case kTypeSingleDeletion: case kTypeRangeDeletion: { HandleTypeDeletion(lookup_user_key, merge_in_progress, merge_context, moptions_.merge_operator, clock_, moptions_.statistics, moptions_.info_log, s, value, columns); assert(seq <= read_seq); return /*found_final_value=*/true; } case kTypeMerge: { merge_in_progress = true; if (ReadOnlyMemTable::HandleTypeMerge( lookup_user_key, iter->value(), iter->IsValuePinned(), do_merge, merge_context, moptions_.merge_operator, clock_, moptions_.statistics, moptions_.info_log, s, value, columns)) { return true; } break; } default: { std::string msg( "Unrecognized or unsupported value type for " "WBWI-based memtable: " + std::to_string(static_cast(type)) + ". "); msg.append("User key: " + ExtractUserKey(iter->key()).ToString(/*hex=*/true) + ". "); msg.append("seq: " + std::to_string(seq) + "."); *s = Status::Corruption(msg.c_str()); return /*found_final_value=*/true; } } } // Current key is a merge key or not visible assert(merge_in_progress || (callback && !callback->IsVisible(seq))); iter->Next(); } if (!iter->status().ok() && (s->ok() || s->IsMergeInProgress() || s->IsNotFound())) { *s = iter->status(); // stop further look up return true; } if (merge_in_progress) { assert(s->ok() || s->IsMergeInProgress()); *s = Status::MergeInProgress(); } return /*found_final_value=*/false; } void WBWIMemTable::MultiGet(const ReadOptions& read_options, MultiGetRange* range, ReadCallback* callback, bool immutable_memtable) { (void)immutable_memtable; // Should only be used as immutable memtable. assert(immutable_memtable); // TODO: reuse the InternalIterator created in Get(). for (auto iter = range->begin(); iter != range->end(); ++iter) { SequenceNumber dummy_seq = 0; bool found_final_value = Get(*iter->lkey, iter->value ? iter->value->GetSelf() : nullptr, iter->columns, iter->timestamp, iter->s, &(iter->merge_context), &(iter->max_covering_tombstone_seq), &dummy_seq, read_options, true, callback, nullptr, true); if (found_final_value) { if (iter->s->ok() || iter->s->IsNotFound()) { if (iter->value) { iter->value->PinSelf(); range->AddValueSize(iter->value->size()); } else { assert(iter->columns); range->AddValueSize(iter->columns->serialized_size()); } } range->MarkKeyDone(iter); if (range->GetValueSize() > read_options.value_size_soft_limit) { // Set all remaining keys in range to Abort for (auto range_iter = range->begin(); range_iter != range->end(); ++range_iter) { range->MarkKeyDone(range_iter); *(range_iter->s) = Status::Aborted(); } break; } } } } } // namespace ROCKSDB_NAMESPACE