// write_batch_internal.h
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  9. #pragma once
  10. #include <vector>
  11. #include "db/flush_scheduler.h"
  12. #include "db/trim_history_scheduler.h"
  13. #include "db/write_thread.h"
  14. #include "rocksdb/db.h"
  15. #include "rocksdb/options.h"
  16. #include "rocksdb/types.h"
  17. #include "rocksdb/write_batch.h"
  18. #include "util/autovector.h"
  19. namespace ROCKSDB_NAMESPACE {
  20. class MemTable;
  21. class FlushScheduler;
  22. class ColumnFamilyData;
// Abstraction mapping a column family id to the memtable that writes for
// that column family should be applied to. WriteBatchInternal::InsertInto()
// drives this interface while replaying a WriteBatch.
class ColumnFamilyMemTables {
 public:
  virtual ~ColumnFamilyMemTables() {}

  // Selects the column family with the given id. Returns false if no such
  // column family exists; the accessors below are only meaningful after a
  // successful Seek().
  virtual bool Seek(uint32_t column_family_id) = 0;

  // returns true if the update to memtable should be ignored
  // (useful when recovering from log whose updates have already
  // been processed)
  // NOTE(review): the comment above predates this declaration and reads like
  // it describes a bool; the method actually returns the log number of the
  // column family selected by the last Seek() — callers compare it against a
  // WAL log number to decide whether to skip updates. Confirm against the
  // implementation before relying on either reading.
  virtual uint64_t GetLogNumber() const = 0;

  // Memtable of the column family selected by the last successful Seek().
  virtual MemTable* GetMemTable() const = 0;

  // Handle of the currently selected column family; implementations may
  // return nullptr when no handle is tracked.
  virtual ColumnFamilyHandle* GetColumnFamilyHandle() = 0;

  // Internal ColumnFamilyData of the current column family, or nullptr if
  // the implementation does not expose one (the default).
  virtual ColumnFamilyData* current() { return nullptr; }
};
  35. class ColumnFamilyMemTablesDefault : public ColumnFamilyMemTables {
  36. public:
  37. explicit ColumnFamilyMemTablesDefault(MemTable* mem)
  38. : ok_(false), mem_(mem) {}
  39. bool Seek(uint32_t column_family_id) override {
  40. ok_ = (column_family_id == 0);
  41. return ok_;
  42. }
  43. uint64_t GetLogNumber() const override { return 0; }
  44. MemTable* GetMemTable() const override {
  45. assert(ok_);
  46. return mem_;
  47. }
  48. ColumnFamilyHandle* GetColumnFamilyHandle() override { return nullptr; }
  49. private:
  50. bool ok_;
  51. MemTable* mem_;
  52. };
// WriteBatchInternal provides static methods for manipulating a
// WriteBatch that we don't want in the public WriteBatch interface.
class WriteBatchInternal {
 public:
  // WriteBatch header has an 8-byte sequence number followed by a 4-byte count.
  static const size_t kHeader = 12;

  // WriteBatch methods with column_family_id instead of ColumnFamilyHandle*
  static Status Put(WriteBatch* batch, uint32_t column_family_id,
                    const Slice& key, const Slice& value);

  static Status Put(WriteBatch* batch, uint32_t column_family_id,
                    const SliceParts& key, const SliceParts& value);

  static Status Delete(WriteBatch* batch, uint32_t column_family_id,
                       const SliceParts& key);

  static Status Delete(WriteBatch* batch, uint32_t column_family_id,
                       const Slice& key);

  static Status SingleDelete(WriteBatch* batch, uint32_t column_family_id,
                             const SliceParts& key);

  static Status SingleDelete(WriteBatch* batch, uint32_t column_family_id,
                             const Slice& key);

  static Status DeleteRange(WriteBatch* b, uint32_t column_family_id,
                            const Slice& begin_key, const Slice& end_key);

  static Status DeleteRange(WriteBatch* b, uint32_t column_family_id,
                            const SliceParts& begin_key,
                            const SliceParts& end_key);

  static Status Merge(WriteBatch* batch, uint32_t column_family_id,
                      const Slice& key, const Slice& value);

  static Status Merge(WriteBatch* batch, uint32_t column_family_id,
                      const SliceParts& key, const SliceParts& value);

  static Status PutBlobIndex(WriteBatch* batch, uint32_t column_family_id,
                             const Slice& key, const Slice& value);

  // Transaction markers keyed by transaction id `xid`; semantics live in the
  // .cc implementation -- presumably these append marker records to the
  // batch for two-phase-commit replay (TODO confirm).
  static Status MarkEndPrepare(WriteBatch* batch, const Slice& xid,
                               const bool write_after_commit = true,
                               const bool unprepared_batch = false);

  static Status MarkRollback(WriteBatch* batch, const Slice& xid);

  static Status MarkCommit(WriteBatch* batch, const Slice& xid);

  static Status InsertNoop(WriteBatch* batch);

  // Return the number of entries in the batch.
  static uint32_t Count(const WriteBatch* batch);

  // Set the count for the number of entries in the batch.
  static void SetCount(WriteBatch* batch, uint32_t n);

  // Return the sequence number for the start of this batch.
  static SequenceNumber Sequence(const WriteBatch* batch);

  // Store the specified number as the sequence number for the start of
  // this batch.
  static void SetSequence(WriteBatch* batch, SequenceNumber seq);

  // Returns the offset of the first entry in the batch.
  // This offset is only valid if the batch is not empty.
  static size_t GetFirstOffset(WriteBatch* batch);

  // Raw serialized representation of the batch (header plus records).
  static Slice Contents(const WriteBatch* batch) {
    return Slice(batch->rep_);
  }

  // Size in bytes of the serialized representation.
  static size_t ByteSize(const WriteBatch* batch) {
    return batch->rep_.size();
  }

  // Replaces the batch's serialized representation wholesale.
  static Status SetContents(WriteBatch* batch, const Slice& contents);

  static Status CheckSlicePartsLength(const SliceParts& key,
                                      const SliceParts& value);

  // Inserts batches[i] into memtable, for i in 0..num_batches-1 inclusive.
  //
  // If ignore_missing_column_families == true, a WriteBatch
  // referencing a non-existing column family will be ignored.
  // If ignore_missing_column_families == false, processing of the
  // batches will be stopped if a reference is found to a non-existing
  // column family and InvalidArgument() will be returned. The writes
  // in batches may be only partially applied at that point.
  //
  // If log_number is non-zero, the memtable will be updated only if
  // memtables->GetLogNumber() >= log_number.
  //
  // If flush_scheduler is non-null, it will be invoked if the memtable
  // should be flushed.
  //
  // Under concurrent use, the caller is responsible for making sure that
  // the memtables object itself is thread-local.
  static Status InsertInto(
      WriteThread::WriteGroup& write_group, SequenceNumber sequence,
      ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler,
      TrimHistoryScheduler* trim_history_scheduler,
      bool ignore_missing_column_families = false, uint64_t log_number = 0,
      DB* db = nullptr, bool concurrent_memtable_writes = false,
      bool seq_per_batch = false, bool batch_per_txn = true);

  // Convenience form of InsertInto when you have only one batch
  // next_seq returns the seq after last sequence number used in MemTable insert
  static Status InsertInto(
      const WriteBatch* batch, ColumnFamilyMemTables* memtables,
      FlushScheduler* flush_scheduler,
      TrimHistoryScheduler* trim_history_scheduler,
      bool ignore_missing_column_families = false, uint64_t log_number = 0,
      DB* db = nullptr, bool concurrent_memtable_writes = false,
      SequenceNumber* next_seq = nullptr, bool* has_valid_writes = nullptr,
      bool seq_per_batch = false, bool batch_per_txn = true);

  // Single-writer form of InsertInto.
  static Status InsertInto(WriteThread::Writer* writer, SequenceNumber sequence,
                           ColumnFamilyMemTables* memtables,
                           FlushScheduler* flush_scheduler,
                           TrimHistoryScheduler* trim_history_scheduler,
                           bool ignore_missing_column_families = false,
                           uint64_t log_number = 0, DB* db = nullptr,
                           bool concurrent_memtable_writes = false,
                           bool seq_per_batch = false, size_t batch_cnt = 0,
                           bool batch_per_txn = true,
                           bool hint_per_batch = false);

  // Appends src to dst. The meaning of WAL_only is defined in the .cc --
  // presumably it restricts the copy to WAL-bound records (TODO confirm).
  static Status Append(WriteBatch* dst, const WriteBatch* src,
                       const bool WAL_only = false);

  // Returns the byte size of appending a WriteBatch with ByteSize
  // leftByteSize and a WriteBatch with ByteSize rightByteSize
  static size_t AppendedByteSize(size_t leftByteSize, size_t rightByteSize);

  // Iterate over [begin, end) range of a write batch
  static Status Iterate(const WriteBatch* wb, WriteBatch::Handler* handler,
                        size_t begin, size_t end);

  // This write batch includes the latest state that should be persisted. Such
  // state meant to be used only during recovery.
  // NOTE(review): "Lastest" is a long-standing typo for "Latest" (see the
  // matching IsLatestPersistentState below); renaming would break callers,
  // so the name is kept as-is.
  static void SetAsLastestPersistentState(WriteBatch* b);
  static bool IsLatestPersistentState(const WriteBatch* b);
};
  167. // LocalSavePoint is similar to a scope guard
  168. class LocalSavePoint {
  169. public:
  170. explicit LocalSavePoint(WriteBatch* batch)
  171. : batch_(batch),
  172. savepoint_(batch->GetDataSize(), batch->Count(),
  173. batch->content_flags_.load(std::memory_order_relaxed))
  174. #ifndef NDEBUG
  175. ,
  176. committed_(false)
  177. #endif
  178. {
  179. }
  180. #ifndef NDEBUG
  181. ~LocalSavePoint() { assert(committed_); }
  182. #endif
  183. Status commit() {
  184. #ifndef NDEBUG
  185. committed_ = true;
  186. #endif
  187. if (batch_->max_bytes_ && batch_->rep_.size() > batch_->max_bytes_) {
  188. batch_->rep_.resize(savepoint_.size);
  189. WriteBatchInternal::SetCount(batch_, savepoint_.count);
  190. batch_->content_flags_.store(savepoint_.content_flags,
  191. std::memory_order_relaxed);
  192. return Status::MemoryLimit();
  193. }
  194. return Status::OK();
  195. }
  196. private:
  197. WriteBatch* batch_;
  198. SavePoint savepoint_;
  199. #ifndef NDEBUG
  200. bool committed_;
  201. #endif
  202. };
  203. } // namespace ROCKSDB_NAMESPACE