//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#pragma once
#include <vector>

#include "db/flush_scheduler.h"
#include "db/trim_history_scheduler.h"
#include "db/write_thread.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/types.h"
#include "rocksdb/write_batch.h"
#include "util/autovector.h"

namespace ROCKSDB_NAMESPACE {

class MemTable;
class FlushScheduler;
class ColumnFamilyData;

class ColumnFamilyMemTables {
 public:
  virtual ~ColumnFamilyMemTables() {}
  virtual bool Seek(uint32_t column_family_id) = 0;
  // Returns the log number of the column family selected by Seek().
  // During recovery, updates read from a log with a smaller number have
  // already been processed and should be ignored.
  virtual uint64_t GetLogNumber() const = 0;
  virtual MemTable* GetMemTable() const = 0;
  virtual ColumnFamilyHandle* GetColumnFamilyHandle() = 0;
  virtual ColumnFamilyData* current() { return nullptr; }
};

class ColumnFamilyMemTablesDefault : public ColumnFamilyMemTables {
 public:
  explicit ColumnFamilyMemTablesDefault(MemTable* mem)
      : ok_(false), mem_(mem) {}

  bool Seek(uint32_t column_family_id) override {
    ok_ = (column_family_id == 0);
    return ok_;
  }

  uint64_t GetLogNumber() const override { return 0; }

  MemTable* GetMemTable() const override {
    assert(ok_);
    return mem_;
  }

  ColumnFamilyHandle* GetColumnFamilyHandle() override { return nullptr; }

 private:
  bool ok_;
  MemTable* mem_;
};

// WriteBatchInternal provides static methods for manipulating a
// WriteBatch that we don't want in the public WriteBatch interface.
class WriteBatchInternal {
 public:
  // WriteBatch header has an 8-byte sequence number followed by a 4-byte count.
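  // Layout sketch (assuming the fixed-width little-endian encoding that the
  // Sequence() and Count() accessors decode): bytes [0, 8) of rep_ hold the
  // starting sequence number and bytes [8, 12) hold the entry count, so
  // kHeader = 8 + 4 = 12 and the first entry begins at offset 12.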
  static const size_t kHeader = 12;

  // WriteBatch methods with column_family_id instead of ColumnFamilyHandle*
  static Status Put(WriteBatch* batch, uint32_t column_family_id,
                    const Slice& key, const Slice& value);

  static Status Put(WriteBatch* batch, uint32_t column_family_id,
                    const SliceParts& key, const SliceParts& value);

  static Status Delete(WriteBatch* batch, uint32_t column_family_id,
                       const SliceParts& key);

  static Status Delete(WriteBatch* batch, uint32_t column_family_id,
                       const Slice& key);

  static Status SingleDelete(WriteBatch* batch, uint32_t column_family_id,
                             const SliceParts& key);

  static Status SingleDelete(WriteBatch* batch, uint32_t column_family_id,
                             const Slice& key);

  static Status DeleteRange(WriteBatch* b, uint32_t column_family_id,
                            const Slice& begin_key, const Slice& end_key);

  static Status DeleteRange(WriteBatch* b, uint32_t column_family_id,
                            const SliceParts& begin_key,
                            const SliceParts& end_key);

  static Status Merge(WriteBatch* batch, uint32_t column_family_id,
                      const Slice& key, const Slice& value);

  static Status Merge(WriteBatch* batch, uint32_t column_family_id,
                      const SliceParts& key, const SliceParts& value);

  static Status PutBlobIndex(WriteBatch* batch, uint32_t column_family_id,
                             const Slice& key, const Slice& value);

  static Status MarkEndPrepare(WriteBatch* batch, const Slice& xid,
                               const bool write_after_commit = true,
                               const bool unprepared_batch = false);

  static Status MarkRollback(WriteBatch* batch, const Slice& xid);

  static Status MarkCommit(WriteBatch* batch, const Slice& xid);

  static Status InsertNoop(WriteBatch* batch);

  // Return the number of entries in the batch.
  static uint32_t Count(const WriteBatch* batch);

  // Set the count for the number of entries in the batch.
  static void SetCount(WriteBatch* batch, uint32_t n);

  // Return the sequence number for the start of this batch.
  static SequenceNumber Sequence(const WriteBatch* batch);

  // Store the specified number as the sequence number for the start of
  // this batch.
  static void SetSequence(WriteBatch* batch, SequenceNumber seq);

  // Returns the offset of the first entry in the batch.
  // This offset is only valid if the batch is not empty.
  static size_t GetFirstOffset(WriteBatch* batch);

  static Slice Contents(const WriteBatch* batch) {
    return Slice(batch->rep_);
  }

  static size_t ByteSize(const WriteBatch* batch) {
    return batch->rep_.size();
  }

  static Status SetContents(WriteBatch* batch, const Slice& contents);

  static Status CheckSlicePartsLength(const SliceParts& key,
                                      const SliceParts& value);

  // Inserts the batches in write_group into the memtables.
  //
  // If ignore_missing_column_families == true, WriteBatch entries
  // referencing a non-existent column family will be ignored.
  // If ignore_missing_column_families == false, processing of the
  // batches will be stopped if a reference is found to a non-existent
  // column family and InvalidArgument() will be returned. The writes
  // in batches may be only partially applied at that point.
  //
  // If log_number is non-zero, the memtable will be updated only if
  // log_number >= memtables->GetLogNumber().
  //
  // If flush_scheduler is non-null, it will be invoked if the memtable
  // should be flushed.
  //
  // Under concurrent use, the caller is responsible for making sure that
  // the memtables object itself is thread-local.
  static Status InsertInto(
      WriteThread::WriteGroup& write_group, SequenceNumber sequence,
      ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler,
      TrimHistoryScheduler* trim_history_scheduler,
      bool ignore_missing_column_families = false, uint64_t log_number = 0,
      DB* db = nullptr, bool concurrent_memtable_writes = false,
      bool seq_per_batch = false, bool batch_per_txn = true);

  // Convenience form of InsertInto when you have only one batch.
  // next_seq returns the sequence number after the last one used in the
  // MemTable insert.
  static Status InsertInto(
      const WriteBatch* batch, ColumnFamilyMemTables* memtables,
      FlushScheduler* flush_scheduler,
      TrimHistoryScheduler* trim_history_scheduler,
      bool ignore_missing_column_families = false, uint64_t log_number = 0,
      DB* db = nullptr, bool concurrent_memtable_writes = false,
      SequenceNumber* next_seq = nullptr, bool* has_valid_writes = nullptr,
      bool seq_per_batch = false, bool batch_per_txn = true);

  static Status InsertInto(WriteThread::Writer* writer, SequenceNumber sequence,
                           ColumnFamilyMemTables* memtables,
                           FlushScheduler* flush_scheduler,
                           TrimHistoryScheduler* trim_history_scheduler,
                           bool ignore_missing_column_families = false,
                           uint64_t log_number = 0, DB* db = nullptr,
                           bool concurrent_memtable_writes = false,
                           bool seq_per_batch = false, size_t batch_cnt = 0,
                           bool batch_per_txn = true,
                           bool hint_per_batch = false);

  static Status Append(WriteBatch* dst, const WriteBatch* src,
                       const bool WAL_only = false);

  // Returns the byte size of the batch that results from appending a
  // WriteBatch of ByteSize rightByteSize to a WriteBatch of ByteSize
  // leftByteSize.
  static size_t AppendedByteSize(size_t leftByteSize, size_t rightByteSize);
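  // Worked example (a sketch, assuming both batches are non-empty and that
  // Append() copies only the entries of the source batch, so the combined
  // batch keeps a single kHeader-byte header): appending a 30-byte batch to
  // a 40-byte batch gives 40 + 30 - kHeader = 40 + 30 - 12 = 58 bytes.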

  // Iterate over the [begin, end) range of a write batch.
  static Status Iterate(const WriteBatch* wb, WriteBatch::Handler* handler,
                        size_t begin, size_t end);

  // This write batch includes the latest state that should be persisted. Such
  // state is meant to be used only during recovery.
  static void SetAsLastestPersistentState(WriteBatch* b);
  static bool IsLatestPersistentState(const WriteBatch* b);
};

// LocalSavePoint is similar to a scope guard: it records the batch's data
// size, entry count, and content flags on construction, and commit() rolls
// the batch back to that state and returns Status::MemoryLimit() if the
// batch has grown beyond max_bytes_. In debug builds, destroying a
// LocalSavePoint without calling commit() fails an assertion.
class LocalSavePoint {
 public:
  explicit LocalSavePoint(WriteBatch* batch)
      : batch_(batch),
        savepoint_(batch->GetDataSize(), batch->Count(),
                   batch->content_flags_.load(std::memory_order_relaxed))
#ifndef NDEBUG
        ,
        committed_(false)
#endif
  {
  }

#ifndef NDEBUG
  ~LocalSavePoint() { assert(committed_); }
#endif
  Status commit() {
#ifndef NDEBUG
    committed_ = true;
#endif
    if (batch_->max_bytes_ && batch_->rep_.size() > batch_->max_bytes_) {
      batch_->rep_.resize(savepoint_.size);
      WriteBatchInternal::SetCount(batch_, savepoint_.count);
      batch_->content_flags_.store(savepoint_.content_flags,
                                   std::memory_order_relaxed);
      return Status::MemoryLimit();
    }
    return Status::OK();
  }

 private:
  WriteBatch* batch_;
  SavePoint savepoint_;
#ifndef NDEBUG
  bool committed_;
#endif
};

}  // namespace ROCKSDB_NAMESPACE