write_batch_internal.h 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  9. #pragma once
#include <array>
#include <cassert>
#include <cstring>
#include <limits>
#include <tuple>
#include <utility>
#include <vector>

#include "db/flush_scheduler.h"
#include "db/kv_checksum.h"
#include "db/trim_history_scheduler.h"
#include "db/write_thread.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/types.h"
#include "rocksdb/write_batch.h"
#include "util/autovector.h"
#include "util/cast_util.h"
  22. namespace ROCKSDB_NAMESPACE {
  23. class MemTable;
  24. class FlushScheduler;
  25. class ColumnFamilyData;
  26. class ColumnFamilyMemTables {
  27. public:
  28. virtual ~ColumnFamilyMemTables() {}
  29. virtual bool Seek(uint32_t column_family_id) = 0;
  30. // returns true if the update to memtable should be ignored
  31. // (useful when recovering from log whose updates have already
  32. // been processed)
  33. virtual uint64_t GetLogNumber() const = 0;
  34. virtual MemTable* GetMemTable() const = 0;
  35. virtual ColumnFamilyHandle* GetColumnFamilyHandle() = 0;
  36. virtual ColumnFamilyData* current() { return nullptr; }
  37. };
  38. class ColumnFamilyMemTablesDefault : public ColumnFamilyMemTables {
  39. public:
  40. explicit ColumnFamilyMemTablesDefault(MemTable* mem)
  41. : ok_(false), mem_(mem) {}
  42. bool Seek(uint32_t column_family_id) override {
  43. ok_ = (column_family_id == 0);
  44. return ok_;
  45. }
  46. uint64_t GetLogNumber() const override { return 0; }
  47. MemTable* GetMemTable() const override {
  48. assert(ok_);
  49. return mem_;
  50. }
  51. ColumnFamilyHandle* GetColumnFamilyHandle() override { return nullptr; }
  52. private:
  53. bool ok_;
  54. MemTable* mem_;
  55. };
  56. struct WriteBatch::ProtectionInfo {
  57. // `WriteBatch` usually doesn't contain a huge number of keys so protecting
  58. // with a fixed, non-configurable eight bytes per key may work well enough.
  59. autovector<ProtectionInfoKVOC64> entries_;
  60. size_t GetBytesPerKey() const { return 8; }
  61. };
  62. // WriteBatchInternal provides static methods for manipulating a
  63. // WriteBatch that we don't want in the public WriteBatch interface.
  64. class WriteBatchInternal {
  65. public:
  66. // WriteBatch header has an 8-byte sequence number followed by a 4-byte count.
  67. static constexpr size_t kHeader = 12;
  68. // WriteBatch methods with column_family_id instead of ColumnFamilyHandle*
  69. static Status Put(WriteBatch* batch, uint32_t column_family_id,
  70. const Slice& key, const Slice& value);
  71. static Status Put(WriteBatch* batch, uint32_t column_family_id,
  72. const SliceParts& key, const SliceParts& value);
  73. static Status TimedPut(WriteBatch* batch, uint32_t column_family_id,
  74. const Slice& key, const Slice& value,
  75. uint64_t unix_write_time);
  76. static Status PutEntity(WriteBatch* batch, uint32_t column_family_id,
  77. const Slice& key, const WideColumns& columns);
  78. static Status Delete(WriteBatch* batch, uint32_t column_family_id,
  79. const SliceParts& key);
  80. static Status Delete(WriteBatch* batch, uint32_t column_family_id,
  81. const Slice& key);
  82. static Status SingleDelete(WriteBatch* batch, uint32_t column_family_id,
  83. const SliceParts& key);
  84. static Status SingleDelete(WriteBatch* batch, uint32_t column_family_id,
  85. const Slice& key);
  86. static Status DeleteRange(WriteBatch* b, uint32_t column_family_id,
  87. const Slice& begin_key, const Slice& end_key);
  88. static Status DeleteRange(WriteBatch* b, uint32_t column_family_id,
  89. const SliceParts& begin_key,
  90. const SliceParts& end_key);
  91. static Status Merge(WriteBatch* batch, uint32_t column_family_id,
  92. const Slice& key, const Slice& value);
  93. static Status Merge(WriteBatch* batch, uint32_t column_family_id,
  94. const SliceParts& key, const SliceParts& value);
  95. static Status PutBlobIndex(WriteBatch* batch, uint32_t column_family_id,
  96. const Slice& key, const Slice& value);
  97. static ValueType GetBeginPrepareType(bool write_after_commit,
  98. bool unprepared_batch);
  99. static Status InsertBeginPrepare(WriteBatch* batch,
  100. const bool write_after_commit = true,
  101. bool unprepared_batch = false);
  102. static Status InsertEndPrepare(WriteBatch* batch, const Slice& xid);
  103. static Status MarkEndPrepare(WriteBatch* batch, const Slice& xid,
  104. const bool write_after_commit = true,
  105. const bool unprepared_batch = false);
  106. static Status MarkRollback(WriteBatch* batch, const Slice& xid);
  107. static Status MarkCommit(WriteBatch* batch, const Slice& xid);
  108. static Status MarkCommitWithTimestamp(WriteBatch* batch, const Slice& xid,
  109. const Slice& commit_ts);
  110. static Status InsertNoop(WriteBatch* batch);
  111. // Return the number of entries in the batch.
  112. static uint32_t Count(const WriteBatch* batch);
  113. // Set the count for the number of entries in the batch.
  114. static void SetCount(WriteBatch* batch, uint32_t n);
  115. // Return the sequence number for the start of this batch.
  116. static SequenceNumber Sequence(const WriteBatch* batch);
  117. // Store the specified number as the sequence number for the start of
  118. // this batch.
  119. static void SetSequence(WriteBatch* batch, SequenceNumber seq);
  120. // Returns the offset of the first entry in the batch.
  121. // This offset is only valid if the batch is not empty.
  122. static size_t GetFirstOffset(WriteBatch* batch);
  123. static Slice Contents(const WriteBatch* batch) { return Slice(batch->rep_); }
  124. static size_t ByteSize(const WriteBatch* batch) { return batch->rep_.size(); }
  125. static Status SetContents(WriteBatch* batch, const Slice& contents);
  126. static Status CheckSlicePartsLength(const SliceParts& key,
  127. const SliceParts& value);
  128. // Inserts batches[i] into memtable, for i in 0..num_batches-1 inclusive.
  129. //
  130. // If ignore_missing_column_families == true. WriteBatch
  131. // referencing non-existing column family will be ignored.
  132. // If ignore_missing_column_families == false, processing of the
  133. // batches will be stopped if a reference is found to a non-existing
  134. // column family and InvalidArgument() will be returned. The writes
  135. // in batches may be only partially applied at that point.
  136. //
  137. // If log_number is non-zero, the memtable will be updated only if
  138. // memtables->GetLogNumber() >= log_number.
  139. //
  140. // If flush_scheduler is non-null, it will be invoked if the memtable
  141. // should be flushed.
  142. //
  143. // This overload is for non-concurrent insertion only.
  144. static Status InsertInto(
  145. WriteThread::WriteGroup& write_group, SequenceNumber sequence,
  146. ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler,
  147. TrimHistoryScheduler* trim_history_scheduler,
  148. bool ignore_missing_column_families = false, uint64_t log_number = 0,
  149. DB* db = nullptr, bool seq_per_batch = false, bool batch_per_txn = true);
  150. // Convenience form of InsertInto when you have only one batch
  151. // next_seq returns the seq after last sequence number used in MemTable insert
  152. //
  153. // Under concurrent use, the caller is responsible for making sure that
  154. // the memtables object itself is thread-local.
  155. static Status InsertInto(
  156. const WriteBatch* batch, ColumnFamilyMemTables* memtables,
  157. FlushScheduler* flush_scheduler,
  158. TrimHistoryScheduler* trim_history_scheduler,
  159. bool ignore_missing_column_families = false, uint64_t log_number = 0,
  160. DB* db = nullptr, bool concurrent_memtable_writes = false,
  161. SequenceNumber* next_seq = nullptr, bool* has_valid_writes = nullptr,
  162. bool seq_per_batch = false, bool batch_per_txn = true);
  163. static Status InsertInto(WriteThread::Writer* writer, SequenceNumber sequence,
  164. ColumnFamilyMemTables* memtables,
  165. FlushScheduler* flush_scheduler,
  166. TrimHistoryScheduler* trim_history_scheduler,
  167. bool ignore_missing_column_families = false,
  168. uint64_t log_number = 0, DB* db = nullptr,
  169. bool concurrent_memtable_writes = false,
  170. bool seq_per_batch = false, size_t batch_cnt = 0,
  171. bool batch_per_txn = true,
  172. bool hint_per_batch = false);
  173. // Appends src write batch to dst write batch and updates count in dst
  174. // write batch. Returns OK if the append is successful. Checks number of
  175. // checksum against count in dst and src write batches, and returns Corruption
  176. // if the count is inconsistent.
  177. static Status Append(WriteBatch* dst, const WriteBatch* src,
  178. const bool WAL_only = false);
  179. // Returns the byte size of appending a WriteBatch with ByteSize
  180. // leftByteSize and a WriteBatch with ByteSize rightByteSize
  181. static size_t AppendedByteSize(size_t leftByteSize, size_t rightByteSize);
  182. // Iterate over [begin, end) range of a write batch
  183. static Status Iterate(const WriteBatch* wb, WriteBatch::Handler* handler,
  184. size_t begin, size_t end);
  185. // This write batch includes the latest state that should be persisted. Such
  186. // state meant to be used only during recovery.
  187. static void SetAsLatestPersistentState(WriteBatch* b);
  188. static bool IsLatestPersistentState(const WriteBatch* b);
  189. static void SetDefaultColumnFamilyTimestampSize(WriteBatch* wb,
  190. size_t default_cf_ts_sz);
  191. static std::tuple<Status, uint32_t, size_t> GetColumnFamilyIdAndTimestampSize(
  192. WriteBatch* b, ColumnFamilyHandle* column_family);
  193. static bool TimestampsUpdateNeeded(const WriteBatch& wb) {
  194. return wb.needs_in_place_update_ts_;
  195. }
  196. static bool HasKeyWithTimestamp(const WriteBatch& wb) {
  197. return wb.has_key_with_ts_;
  198. }
  199. // Update per-key value protection information on this write batch.
  200. // If checksum is provided, the batch content is verfied against the checksum.
  201. static Status UpdateProtectionInfo(WriteBatch* wb, size_t bytes_per_key,
  202. uint64_t* checksum = nullptr);
  203. };
  204. // LocalSavePoint is similar to a scope guard
  205. class LocalSavePoint {
  206. public:
  207. explicit LocalSavePoint(WriteBatch* batch)
  208. : batch_(batch),
  209. savepoint_(batch->GetDataSize(), batch->Count(),
  210. batch->content_flags_.load(std::memory_order_relaxed))
  211. #ifndef NDEBUG
  212. ,
  213. committed_(false)
  214. #endif
  215. {
  216. }
  217. #ifndef NDEBUG
  218. ~LocalSavePoint() { assert(committed_); }
  219. #endif
  220. Status commit() {
  221. #ifndef NDEBUG
  222. committed_ = true;
  223. #endif
  224. if (batch_->max_bytes_ && batch_->rep_.size() > batch_->max_bytes_) {
  225. batch_->rep_.resize(savepoint_.size);
  226. WriteBatchInternal::SetCount(batch_, savepoint_.count);
  227. if (batch_->prot_info_ != nullptr) {
  228. batch_->prot_info_->entries_.resize(savepoint_.count);
  229. }
  230. batch_->content_flags_.store(savepoint_.content_flags,
  231. std::memory_order_relaxed);
  232. return Status::MemoryLimit();
  233. }
  234. return Status::OK();
  235. }
  236. private:
  237. WriteBatch* batch_;
  238. SavePoint savepoint_;
  239. #ifndef NDEBUG
  240. bool committed_;
  241. #endif
  242. };
  243. template <typename TimestampSizeFuncType>
  244. class TimestampUpdater : public WriteBatch::Handler {
  245. public:
  246. explicit TimestampUpdater(WriteBatch::ProtectionInfo* prot_info,
  247. TimestampSizeFuncType&& ts_sz_func, const Slice& ts)
  248. : prot_info_(prot_info),
  249. ts_sz_func_(std::move(ts_sz_func)),
  250. timestamp_(ts) {
  251. assert(!timestamp_.empty());
  252. }
  253. ~TimestampUpdater() override {}
  254. Status PutCF(uint32_t cf, const Slice& key, const Slice&) override {
  255. return UpdateTimestamp(cf, key);
  256. }
  257. Status DeleteCF(uint32_t cf, const Slice& key) override {
  258. return UpdateTimestamp(cf, key);
  259. }
  260. Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
  261. return UpdateTimestamp(cf, key);
  262. }
  263. Status DeleteRangeCF(uint32_t cf, const Slice& begin_key,
  264. const Slice& end_key) override {
  265. Status s = UpdateTimestamp(cf, begin_key, true /* is_key */);
  266. if (s.ok()) {
  267. s = UpdateTimestamp(cf, end_key, false /* is_key */);
  268. }
  269. return s;
  270. }
  271. Status MergeCF(uint32_t cf, const Slice& key, const Slice&) override {
  272. return UpdateTimestamp(cf, key);
  273. }
  274. Status PutBlobIndexCF(uint32_t cf, const Slice& key, const Slice&) override {
  275. return UpdateTimestamp(cf, key);
  276. }
  277. Status MarkBeginPrepare(bool) override { return Status::OK(); }
  278. Status MarkEndPrepare(const Slice&) override { return Status::OK(); }
  279. Status MarkCommit(const Slice&) override { return Status::OK(); }
  280. Status MarkCommitWithTimestamp(const Slice&, const Slice&) override {
  281. return Status::OK();
  282. }
  283. Status MarkRollback(const Slice&) override { return Status::OK(); }
  284. Status MarkNoop(bool /*empty_batch*/) override { return Status::OK(); }
  285. private:
  286. // @param is_key specifies whether the update is for key or value.
  287. Status UpdateTimestamp(uint32_t cf, const Slice& buf, bool is_key = true) {
  288. Status s = UpdateTimestampImpl(cf, buf, idx_, is_key);
  289. ++idx_;
  290. return s;
  291. }
  292. Status UpdateTimestampImpl(uint32_t cf, const Slice& buf, size_t /*idx*/,
  293. bool is_key) {
  294. if (timestamp_.empty()) {
  295. return Status::InvalidArgument("Timestamp is empty");
  296. }
  297. size_t cf_ts_sz = ts_sz_func_(cf);
  298. if (0 == cf_ts_sz) {
  299. // Skip this column family.
  300. return Status::OK();
  301. } else if (std::numeric_limits<size_t>::max() == cf_ts_sz) {
  302. // Column family timestamp info not found.
  303. return Status::NotFound();
  304. } else if (cf_ts_sz != timestamp_.size()) {
  305. return Status::InvalidArgument("timestamp size mismatch");
  306. }
  307. UpdateProtectionInformationIfNeeded(buf, timestamp_, is_key);
  308. char* ptr = const_cast<char*>(buf.data() + buf.size() - cf_ts_sz);
  309. assert(ptr);
  310. memcpy(ptr, timestamp_.data(), timestamp_.size());
  311. return Status::OK();
  312. }
  313. void UpdateProtectionInformationIfNeeded(const Slice& buf, const Slice& ts,
  314. bool is_key) {
  315. if (prot_info_ != nullptr) {
  316. const size_t ts_sz = ts.size();
  317. SliceParts old(&buf, 1);
  318. Slice old_no_ts(buf.data(), buf.size() - ts_sz);
  319. std::array<Slice, 2> new_key_cmpts{{old_no_ts, ts}};
  320. SliceParts new_parts(new_key_cmpts.data(), 2);
  321. if (is_key) {
  322. prot_info_->entries_[idx_].UpdateK(old, new_parts);
  323. } else {
  324. prot_info_->entries_[idx_].UpdateV(old, new_parts);
  325. }
  326. }
  327. }
  328. // No copy or move.
  329. TimestampUpdater(const TimestampUpdater&) = delete;
  330. TimestampUpdater(TimestampUpdater&&) = delete;
  331. TimestampUpdater& operator=(const TimestampUpdater&) = delete;
  332. TimestampUpdater& operator=(TimestampUpdater&&) = delete;
  333. WriteBatch::ProtectionInfo* const prot_info_ = nullptr;
  334. const TimestampSizeFuncType ts_sz_func_{};
  335. const Slice timestamp_;
  336. size_t idx_ = 0;
  337. };
  338. } // namespace ROCKSDB_NAMESPACE