udt_util.h 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291
  1. // Copyright (c) Meta Platforms, Inc. and affiliates.
  2. //
  3. // This source code is licensed under both the GPLv2 (found in the
  4. // COPYING file in the root directory) and Apache 2.0 License
  5. // (found in the LICENSE.Apache file in the root directory).
  6. #pragma once
  7. #include <memory>
  8. #include <optional>
  9. #include <sstream>
  10. #include <unordered_map>
  11. #include <vector>
  12. #include "db/wide/wide_column_serialization.h"
  13. #include "db/write_batch_internal.h"
  14. #include "rocksdb/slice.h"
  15. #include "rocksdb/status.h"
  16. #include "rocksdb/write_batch.h"
  17. #include "util/coding.h"
  18. #include "util/hash_containers.h"
  19. namespace ROCKSDB_NAMESPACE {
  20. // Dummy record in WAL logs signaling user-defined timestamp sizes for
  21. // subsequent records.
  22. class UserDefinedTimestampSizeRecord {
  23. public:
  24. UserDefinedTimestampSizeRecord() {}
  25. explicit UserDefinedTimestampSizeRecord(
  26. std::vector<std::pair<uint32_t, size_t>>&& cf_to_ts_sz)
  27. : cf_to_ts_sz_(std::move(cf_to_ts_sz)) {}
  28. const std::vector<std::pair<uint32_t, size_t>>& GetUserDefinedTimestampSize()
  29. const {
  30. return cf_to_ts_sz_;
  31. }
  32. inline void EncodeTo(std::string* dst) const {
  33. assert(dst != nullptr);
  34. for (const auto& [cf_id, ts_sz] : cf_to_ts_sz_) {
  35. assert(ts_sz != 0);
  36. PutFixed32(dst, cf_id);
  37. PutFixed16(dst, static_cast<uint16_t>(ts_sz));
  38. }
  39. }
  40. inline Status DecodeFrom(Slice* src) {
  41. const size_t total_size = src->size();
  42. if ((total_size % kSizePerColumnFamily) != 0) {
  43. std::ostringstream oss;
  44. oss << "User-defined timestamp size record length: " << total_size
  45. << " is not a multiple of " << kSizePerColumnFamily << std::endl;
  46. return Status::Corruption(oss.str());
  47. }
  48. int num_of_entries = static_cast<int>(total_size / kSizePerColumnFamily);
  49. for (int i = 0; i < num_of_entries; i++) {
  50. uint32_t cf_id = 0;
  51. uint16_t ts_sz = 0;
  52. if (!GetFixed32(src, &cf_id) || !GetFixed16(src, &ts_sz)) {
  53. return Status::Corruption(
  54. "Error decoding user-defined timestamp size record entry");
  55. }
  56. cf_to_ts_sz_.emplace_back(cf_id, static_cast<size_t>(ts_sz));
  57. }
  58. return Status::OK();
  59. }
  60. inline std::string DebugString() const {
  61. std::ostringstream oss;
  62. for (const auto& [cf_id, ts_sz] : cf_to_ts_sz_) {
  63. oss << "Column family: " << cf_id
  64. << ", user-defined timestamp size: " << ts_sz << std::endl;
  65. }
  66. return oss.str();
  67. }
  68. private:
  69. // 4 bytes for column family id, 2 bytes for user-defined timestamp size.
  70. static constexpr size_t kSizePerColumnFamily = 4 + 2;
  71. std::vector<std::pair<uint32_t, size_t>> cf_to_ts_sz_;
  72. };
  73. // This handler is used to recover a WriteBatch read from WAL logs during
  74. // recovery. It does a best-effort recovery if the column families contained in
  75. // the WriteBatch have inconsistency between the recorded timestamp size and the
  76. // running timestamp size. And creates a new WriteBatch that are consistent with
  77. // the running timestamp size with entries from the original WriteBatch.
  78. //
  79. // Note that for a WriteBatch with no inconsistency, a new WriteBatch is created
  80. // nonetheless, and it should be exactly the same as the original WriteBatch.
  81. //
  82. // To access the new WriteBatch, invoke `TransferNewBatch` after calling
  83. // `Iterate`. The handler becomes invalid afterwards.
  84. //
  85. // For the user key in each entry, the best effort recovery means:
  86. // 1) If recorded timestamp size is 0, running timestamp size is > 0, a min
  87. // timestamp of length running timestamp size is padded to the user key.
  88. // 2) If recorded timestamp size is > 0, running timestamp size is 0, the last
  89. // bytes of length recorded timestamp size is stripped from user key.
  90. // 3) If recorded timestamp size is the same as running timestamp size, no-op.
  91. // 4) If recorded timestamp size and running timestamp size are both non-zero
  92. // but not equal, return Status::InvalidArgument.
  93. class TimestampRecoveryHandler : public WriteBatch::Handler {
  94. public:
  95. TimestampRecoveryHandler(const UnorderedMap<uint32_t, size_t>& running_ts_sz,
  96. const UnorderedMap<uint32_t, size_t>& record_ts_sz,
  97. bool seq_per_batch, bool batch_per_txn);
  98. ~TimestampRecoveryHandler() override {}
  99. // No copy or move.
  100. TimestampRecoveryHandler(const TimestampRecoveryHandler&) = delete;
  101. TimestampRecoveryHandler(TimestampRecoveryHandler&&) = delete;
  102. TimestampRecoveryHandler& operator=(const TimestampRecoveryHandler&) = delete;
  103. TimestampRecoveryHandler& operator=(TimestampRecoveryHandler&&) = delete;
  104. Status PutCF(uint32_t cf, const Slice& key, const Slice& value) override;
  105. Status PutEntityCF(uint32_t cf, const Slice& key,
  106. const Slice& entity) override;
  107. Status TimedPutCF(uint32_t cf, const Slice& key, const Slice& value,
  108. uint64_t write_time) override;
  109. Status DeleteCF(uint32_t cf, const Slice& key) override;
  110. Status SingleDeleteCF(uint32_t cf, const Slice& key) override;
  111. Status DeleteRangeCF(uint32_t cf, const Slice& begin_key,
  112. const Slice& end_key) override;
  113. Status MergeCF(uint32_t cf, const Slice& key, const Slice& value) override;
  114. Status PutBlobIndexCF(uint32_t cf, const Slice& key,
  115. const Slice& value) override;
  116. Status MarkBeginPrepare(bool unprepare) override;
  117. Status MarkEndPrepare(const Slice& name) override;
  118. Status MarkCommit(const Slice& name) override;
  119. Status MarkCommitWithTimestamp(const Slice& name,
  120. const Slice& commit_ts) override;
  121. Status MarkRollback(const Slice& name) override;
  122. Status MarkNoop(bool empty_batch) override;
  123. std::unique_ptr<WriteBatch>&& TransferNewBatch() {
  124. assert(new_batch_diff_from_orig_batch_);
  125. handler_valid_ = false;
  126. return std::move(new_batch_);
  127. }
  128. protected:
  129. Handler::OptionState WriteBeforePrepare() const override {
  130. return write_before_prepare_ ? Handler::OptionState::kEnabled
  131. : Handler::OptionState::kDisabled;
  132. }
  133. Handler::OptionState WriteAfterCommit() const override {
  134. return write_after_commit_ ? Handler::OptionState::kEnabled
  135. : Handler::OptionState::kDisabled;
  136. }
  137. private:
  138. Status ReconcileTimestampDiscrepancy(uint32_t cf, const Slice& key,
  139. std::string* new_key_buf,
  140. Slice* new_key);
  141. // Mapping from column family id to user-defined timestamp size for all
  142. // running column families including the ones with zero timestamp size.
  143. const UnorderedMap<uint32_t, size_t>& running_ts_sz_;
  144. // Mapping from column family id to user-defined timestamp size as recorded
  145. // in the WAL. This only contains non-zero user-defined timestamp size.
  146. const UnorderedMap<uint32_t, size_t>& record_ts_sz_;
  147. bool write_after_commit_;
  148. bool write_before_prepare_;
  149. std::unique_ptr<WriteBatch> new_batch_;
  150. // Handler is valid upon creation and becomes invalid after its `new_batch_`
  151. // is transferred.
  152. bool handler_valid_;
  153. // False upon creation, and become true if at least one user key from the
  154. // original batch is updated when creating the new batch.
  155. bool new_batch_diff_from_orig_batch_;
  156. };
// Mode for checking and handling timestamp size inconsistency encountered in a
// WriteBatch read from a WAL log.
enum class TimestampSizeConsistencyMode {
  // Verifies that the recorded user-defined timestamp size is consistent with
  // the running one for all the column families involved in a WriteBatch.
  // Column families that are referred to in the WriteBatch but have been
  // dropped are ignored.
  kVerifyConsistency,
  // Verifies that any inconsistency in a WriteBatch can be tolerated by a
  // best-effort reconciliation, and optionally creates a new WriteBatch from
  // the original WriteBatch that is consistent with the running timestamp
  // size. Column families that are referred to in the WriteBatch but have been
  // dropped are ignored. If a new WriteBatch is created, such entries are
  // copied over as is.
  kReconcileInconsistency,
};
// Handles the inconsistency between recorded timestamp sizes and running
// timestamp sizes for a WriteBatch. A non-OK returned status indicates there
// is an intolerable inconsistency under the specified `check_mode`.
//
// If `check_mode` is `kVerifyConsistency`, intolerable inconsistency means any
// running column family has an inconsistent user-defined timestamp size.
//
// If `check_mode` is `kReconcileInconsistency`, intolerable inconsistency means
// any running column family has an inconsistent user-defined timestamp size
// that cannot be reconciled with a best-effort recovery. See
// `TimestampRecoveryHandler` for what a best-effort recovery is capable of. In
// this mode, the output argument `new_batch` should be set; a new WriteBatch
// is created on the heap and transferred to `new_batch` if there is a
// tolerable inconsistency.
//
// An invariant that WAL logging ensures is that all timestamp size info is
// logged prior to a WriteBatch that needs this info, and that zero timestamp
// sizes are skipped. So `record_ts_sz` only contains column families with a
// non-zero timestamp size, and a column family id absent from `record_ts_sz`
// is interpreted as that column family having zero timestamp size. On the
// other hand, `running_ts_sz` should contain the timestamp size for all
// running column families, including the ones with zero timestamp size.
Status HandleWriteBatchTimestampSizeDifference(
    const WriteBatch* batch,
    const UnorderedMap<uint32_t, size_t>& running_ts_sz,
    const UnorderedMap<uint32_t, size_t>& record_ts_sz,
    TimestampSizeConsistencyMode check_mode, bool seq_per_batch,
    bool batch_per_txn, std::unique_ptr<WriteBatch>* new_batch = nullptr);
// This util function is used when opening an existing column family and
// processing its VersionEdit. It does a sanity check of the column family's
// old user comparator and persist_user_defined_timestamps flag, as recorded
// in the VersionEdit, against the new settings from the column family's
// ImmutableCFOptions.
//
// Valid settings changes include:
// 1) no user comparator change and no effective
//    persist_user_defined_timestamps flag change.
// 2) switching the user comparator to enable the user-defined timestamps
//    feature, provided the immediately effective
//    persist_user_defined_timestamps flag is false.
// 3) switching the user comparator to disable the user-defined timestamps
//    feature, provided the before-change persist_user_defined_timestamps flag
//    is already false.
//
// Switching the user comparator to disable/enable UDT is only sanity checked
// by a user comparator name comparison. The full check would include enforcing
// that the new user comparator ranks user keys exactly the same as the old
// user comparator and only adds / removes the user-defined timestamp
// comparison. We don't have ways to strictly enforce this, so currently only
// the RocksDB builtin comparator wrapper `ComparatorWithU64TsImpl` is
// supported to enable / disable user-defined timestamps. It formats
// user-defined timestamps as uint64_t.
//
// When the settings indicate a legit change to enable the user-defined
// timestamps feature on a column family, `mark_sst_files_has_no_udt` will be
// set to true to indicate that all existing SST files should be marked as
// having no user-defined timestamps when re-writing the manifest.
Status ValidateUserDefinedTimestampsOptions(
    const Comparator* new_comparator, const std::string& old_comparator_name,
    bool new_persist_udt, bool old_persist_udt,
    bool* mark_sst_files_has_no_udt);
// Given a cutoff user-defined timestamp formatted as uint64_t, get the
// effective `full_history_ts_low` timestamp, which is the next immediately
// bigger timestamp. Used by the UDT-in-memtable-only feature when flushing
// memtables and removing timestamps. This process collapses history and
// increases the effective `full_history_ts_low`.
//
// The result is written to `*full_history_ts_low`.
// NOTE(review): whether `*cutoff_ts` is consumed/advanced while decoding is
// not visible from this declaration; confirm against the definition.
void GetFullHistoryTsLowFromU64CutoffTs(Slice* cutoff_ts,
                                        std::string* full_history_ts_low);
// The reverse of `GetFullHistoryTsLowFromU64CutoffTs`: given an effective
// `full_history_ts_low`, writes the corresponding cutoff timestamp to
// `*cutoff_ts`.
// NOTE(review): presumably both timestamps are formatted as uint64_t, as in
// the forward function; confirm against the definition.
void GetU64CutoffTsFromFullHistoryTsLow(Slice* full_history_ts_low,
                                        std::string* cutoff_ts);
// `start` is the inclusive lower user key bound without user-defined timestamp.
// `end` is the upper user key bound without user-defined timestamp.
// By default, `end` is treated as being exclusive. If `exclusive_end` is set to
// false, it's treated as an inclusive upper bound. For either bound that has no
// value, a "no value" OptSlice is returned for that bound.
//
// `ts_sz` is the user-defined timestamp size.
// NOTE(review): `start_with_ts` and `end_with_ts` are presumably the buffers
// backing the returned bounds when timestamps are added, in which case they
// must outlive the returned OptSlices; confirm against the definition.
std::tuple<OptSlice, OptSlice> MaybeAddTimestampsToRange(
    const OptSlice& start, const OptSlice& end, size_t ts_sz,
    std::string* start_with_ts, std::string* end_with_ts,
    bool exclusive_end = true);
  249. } // namespace ROCKSDB_NAMESPACE