udt_util.cc 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477
  1. // Copyright (c) Meta Platforms, Inc. and affiliates.
  2. //
  3. // This source code is licensed under both the GPLv2 (found in the
  4. // COPYING file in the root directory) and Apache 2.0 License
  5. // (found in the LICENSE.Apache file in the root directory).
  6. #include "util/udt_util.h"
  7. #include "db/dbformat.h"
  8. #include "rocksdb/types.h"
  9. #include "util/coding.h"
  10. #include "util/write_batch_util.h"
  11. namespace ROCKSDB_NAMESPACE {
  12. namespace {
  // Action required to make a key recorded for a column family agree with the
  // timestamp size that column family is currently running with.
  enum class RecoveryType {
    // Recorded and running timestamp sizes agree; the key is used as is.
    kNoop,
    // Running size is non-zero and differs from the recorded size.
    kUnrecoverable,
    // Running size is zero but a size was recorded: drop the timestamp.
    kStripTimestamp,
    // Running size is non-zero but nothing was recorded: add a timestamp.
    kPadTimestamp,
  };
  19. RecoveryType GetRecoveryType(const size_t running_ts_sz,
  20. const std::optional<size_t>& recorded_ts_sz) {
  21. if (running_ts_sz == 0) {
  22. if (!recorded_ts_sz.has_value()) {
  23. // A column family id not recorded is equivalent to that column family has
  24. // zero timestamp size.
  25. return RecoveryType::kNoop;
  26. }
  27. return RecoveryType::kStripTimestamp;
  28. }
  29. assert(running_ts_sz != 0);
  30. if (!recorded_ts_sz.has_value()) {
  31. return RecoveryType::kPadTimestamp;
  32. }
  33. if (running_ts_sz != *recorded_ts_sz) {
  34. return RecoveryType::kUnrecoverable;
  35. }
  36. return RecoveryType::kNoop;
  37. }
  38. bool AllRunningColumnFamiliesConsistent(
  39. const UnorderedMap<uint32_t, size_t>& running_ts_sz,
  40. const UnorderedMap<uint32_t, size_t>& record_ts_sz) {
  41. for (const auto& [cf_id, ts_sz] : running_ts_sz) {
  42. auto record_it = record_ts_sz.find(cf_id);
  43. RecoveryType recovery_type =
  44. GetRecoveryType(ts_sz, record_it != record_ts_sz.end()
  45. ? std::optional<size_t>(record_it->second)
  46. : std::nullopt);
  47. if (recovery_type != RecoveryType::kNoop) {
  48. return false;
  49. }
  50. }
  51. return true;
  52. }
  53. Status CheckWriteBatchTimestampSizeConsistency(
  54. const WriteBatch* batch,
  55. const UnorderedMap<uint32_t, size_t>& running_ts_sz,
  56. const UnorderedMap<uint32_t, size_t>& record_ts_sz,
  57. TimestampSizeConsistencyMode check_mode, bool* ts_need_recovery) {
  58. std::vector<uint32_t> column_family_ids;
  59. Status status =
  60. CollectColumnFamilyIdsFromWriteBatch(*batch, &column_family_ids);
  61. if (!status.ok()) {
  62. return status;
  63. }
  64. for (const auto& cf_id : column_family_ids) {
  65. auto running_iter = running_ts_sz.find(cf_id);
  66. if (running_iter == running_ts_sz.end()) {
  67. // Ignore dropped column family referred to in a WriteBatch regardless of
  68. // its consistency.
  69. continue;
  70. }
  71. auto record_iter = record_ts_sz.find(cf_id);
  72. RecoveryType recovery_type = GetRecoveryType(
  73. running_iter->second, record_iter != record_ts_sz.end()
  74. ? std::optional<size_t>(record_iter->second)
  75. : std::nullopt);
  76. if (recovery_type != RecoveryType::kNoop) {
  77. if (check_mode == TimestampSizeConsistencyMode::kVerifyConsistency) {
  78. return Status::InvalidArgument(
  79. "WriteBatch contains timestamp size inconsistency.");
  80. }
  81. if (recovery_type == RecoveryType::kUnrecoverable) {
  82. return Status::InvalidArgument(
  83. "WriteBatch contains unrecoverable timestamp size inconsistency.");
  84. }
  85. // If any column family needs reconciliation, it will mark the whole
  86. // WriteBatch to need recovery and rebuilt.
  87. *ts_need_recovery = true;
  88. }
  89. }
  90. return Status::OK();
  91. }
  // Outcome of comparing a new comparator against the previously recorded
  // comparator name.
  enum class ToggleUDT {
    // Both comparator names are identical.
    kUnchanged,
    // The new name equals the old name plus the ".u64ts" suffix.
    kEnableUDT,
    // The old name equals the new name plus the ".u64ts" suffix.
    kDisableUDT,
    // The names differ in any other way.
    kInvalidChange,
  };
  98. ToggleUDT CompareComparator(const Comparator* new_comparator,
  99. const std::string& old_comparator_name) {
  100. static const char* kUDTSuffix = ".u64ts";
  101. static const Slice kSuffixSlice = kUDTSuffix;
  102. static const size_t kSuffixSize = 6;
  103. size_t ts_sz = new_comparator->timestamp_size();
  104. (void)ts_sz;
  105. Slice new_ucmp_name(new_comparator->Name());
  106. Slice old_ucmp_name(old_comparator_name);
  107. if (new_ucmp_name.compare(old_ucmp_name) == 0) {
  108. return ToggleUDT::kUnchanged;
  109. }
  110. if (new_ucmp_name.size() == old_ucmp_name.size() + kSuffixSize &&
  111. new_ucmp_name.starts_with(old_ucmp_name) &&
  112. new_ucmp_name.ends_with(kSuffixSlice)) {
  113. assert(ts_sz == 8);
  114. return ToggleUDT::kEnableUDT;
  115. }
  116. if (old_ucmp_name.size() == new_ucmp_name.size() + kSuffixSize &&
  117. old_ucmp_name.starts_with(new_ucmp_name) &&
  118. old_ucmp_name.ends_with(kSuffixSlice)) {
  119. assert(ts_sz == 0);
  120. return ToggleUDT::kDisableUDT;
  121. }
  122. return ToggleUDT::kInvalidChange;
  123. }
  124. } // namespace
  125. TimestampRecoveryHandler::TimestampRecoveryHandler(
  126. const UnorderedMap<uint32_t, size_t>& running_ts_sz,
  127. const UnorderedMap<uint32_t, size_t>& record_ts_sz, bool seq_per_batch,
  128. bool batch_per_txn)
  129. : running_ts_sz_(running_ts_sz),
  130. record_ts_sz_(record_ts_sz),
  131. // Write after commit currently uses one seq per key (instead of per
  132. // batch). So seq_per_batch being false indicates write_after_commit
  133. // approach.
  134. write_after_commit_(!seq_per_batch),
  135. // WriteUnprepared can write multiple WriteBatches per transaction, so
  136. // batch_per_txn being false indicates write_before_prepare.
  137. write_before_prepare_(!batch_per_txn),
  138. new_batch_(new WriteBatch()),
  139. handler_valid_(true),
  140. new_batch_diff_from_orig_batch_(false) {}
  141. Status TimestampRecoveryHandler::PutCF(uint32_t cf, const Slice& key,
  142. const Slice& value) {
  143. std::string new_key_buf;
  144. Slice new_key;
  145. Status status =
  146. ReconcileTimestampDiscrepancy(cf, key, &new_key_buf, &new_key);
  147. if (!status.ok()) {
  148. return status;
  149. }
  150. return WriteBatchInternal::Put(new_batch_.get(), cf, new_key, value);
  151. }
  152. Status TimestampRecoveryHandler::PutEntityCF(uint32_t cf, const Slice& key,
  153. const Slice& entity) {
  154. std::string new_key_buf;
  155. Slice new_key;
  156. Status status = TimestampRecoveryHandler::ReconcileTimestampDiscrepancy(
  157. cf, key, &new_key_buf, &new_key);
  158. if (!status.ok()) {
  159. return status;
  160. }
  161. Slice entity_copy = entity;
  162. WideColumns columns;
  163. if (!WideColumnSerialization::Deserialize(entity_copy, columns).ok()) {
  164. return Status::Corruption("Unable to deserialize entity",
  165. entity.ToString(/* hex */ true));
  166. }
  167. return WriteBatchInternal::PutEntity(new_batch_.get(), cf, new_key, columns);
  168. }
  169. Status TimestampRecoveryHandler::TimedPutCF(uint32_t cf, const Slice& key,
  170. const Slice& value,
  171. uint64_t write_time) {
  172. std::string new_key_buf;
  173. Slice new_key;
  174. Status status =
  175. ReconcileTimestampDiscrepancy(cf, key, &new_key_buf, &new_key);
  176. if (!status.ok()) {
  177. return status;
  178. }
  179. return WriteBatchInternal::TimedPut(new_batch_.get(), cf, new_key, value,
  180. write_time);
  181. }
  182. Status TimestampRecoveryHandler::DeleteCF(uint32_t cf, const Slice& key) {
  183. std::string new_key_buf;
  184. Slice new_key;
  185. Status status =
  186. ReconcileTimestampDiscrepancy(cf, key, &new_key_buf, &new_key);
  187. if (!status.ok()) {
  188. return status;
  189. }
  190. return WriteBatchInternal::Delete(new_batch_.get(), cf, new_key);
  191. }
  192. Status TimestampRecoveryHandler::SingleDeleteCF(uint32_t cf, const Slice& key) {
  193. std::string new_key_buf;
  194. Slice new_key;
  195. Status status =
  196. ReconcileTimestampDiscrepancy(cf, key, &new_key_buf, &new_key);
  197. if (!status.ok()) {
  198. return status;
  199. }
  200. return WriteBatchInternal::SingleDelete(new_batch_.get(), cf, new_key);
  201. }
  202. Status TimestampRecoveryHandler::DeleteRangeCF(uint32_t cf,
  203. const Slice& begin_key,
  204. const Slice& end_key) {
  205. std::string new_begin_key_buf;
  206. Slice new_begin_key;
  207. std::string new_end_key_buf;
  208. Slice new_end_key;
  209. Status status = ReconcileTimestampDiscrepancy(
  210. cf, begin_key, &new_begin_key_buf, &new_begin_key);
  211. if (!status.ok()) {
  212. return status;
  213. }
  214. status = ReconcileTimestampDiscrepancy(cf, end_key, &new_end_key_buf,
  215. &new_end_key);
  216. if (!status.ok()) {
  217. return status;
  218. }
  219. return WriteBatchInternal::DeleteRange(new_batch_.get(), cf, new_begin_key,
  220. new_end_key);
  221. }
  222. Status TimestampRecoveryHandler::MergeCF(uint32_t cf, const Slice& key,
  223. const Slice& value) {
  224. std::string new_key_buf;
  225. Slice new_key;
  226. Status status =
  227. ReconcileTimestampDiscrepancy(cf, key, &new_key_buf, &new_key);
  228. if (!status.ok()) {
  229. return status;
  230. }
  231. return WriteBatchInternal::Merge(new_batch_.get(), cf, new_key, value);
  232. }
  233. Status TimestampRecoveryHandler::PutBlobIndexCF(uint32_t cf, const Slice& key,
  234. const Slice& value) {
  235. std::string new_key_buf;
  236. Slice new_key;
  237. Status status =
  238. ReconcileTimestampDiscrepancy(cf, key, &new_key_buf, &new_key);
  239. if (!status.ok()) {
  240. return status;
  241. }
  242. return WriteBatchInternal::PutBlobIndex(new_batch_.get(), cf, new_key, value);
  243. }
  244. Status TimestampRecoveryHandler::MarkBeginPrepare(bool unprepare) {
  245. // Transaction policy change requires empty WAL and User-defined timestamp is
  246. // only supported for write committed txns.
  247. // WriteBatch::Iterate has will handle this based on
  248. // handler->WriteAfterCommit() and handler->WriteBeforePrepare().
  249. if (unprepare) {
  250. return Status::InvalidArgument(
  251. "Handle user defined timestamp setting change is not supported for"
  252. "write unprepared policy. The WAL must be emptied.");
  253. }
  254. return WriteBatchInternal::InsertBeginPrepare(new_batch_.get(),
  255. write_after_commit_,
  256. /* unprepared_batch */ false);
  257. }
  258. Status TimestampRecoveryHandler::MarkEndPrepare(const Slice& name) {
  259. return WriteBatchInternal::InsertEndPrepare(new_batch_.get(), name);
  260. }
  261. Status TimestampRecoveryHandler::MarkCommit(const Slice& name) {
  262. return WriteBatchInternal::MarkCommit(new_batch_.get(), name);
  263. }
  264. Status TimestampRecoveryHandler::MarkCommitWithTimestamp(
  265. const Slice& name, const Slice& commit_ts) {
  266. return WriteBatchInternal::MarkCommitWithTimestamp(new_batch_.get(), name,
  267. commit_ts);
  268. }
  269. Status TimestampRecoveryHandler::MarkRollback(const Slice& name) {
  270. return WriteBatchInternal::MarkRollback(new_batch_.get(), name);
  271. }
  272. Status TimestampRecoveryHandler::MarkNoop(bool /*empty_batch*/) {
  273. return WriteBatchInternal::InsertNoop(new_batch_.get());
  274. }
  275. Status TimestampRecoveryHandler::ReconcileTimestampDiscrepancy(
  276. uint32_t cf, const Slice& key, std::string* new_key_buf, Slice* new_key) {
  277. assert(handler_valid_);
  278. auto running_iter = running_ts_sz_.find(cf);
  279. if (running_iter == running_ts_sz_.end()) {
  280. // The column family referred to by the WriteBatch is no longer running.
  281. // Copy over the entry as is to the new WriteBatch.
  282. *new_key = key;
  283. return Status::OK();
  284. }
  285. size_t running_ts_sz = running_iter->second;
  286. auto record_iter = record_ts_sz_.find(cf);
  287. std::optional<size_t> record_ts_sz =
  288. record_iter != record_ts_sz_.end()
  289. ? std::optional<size_t>(record_iter->second)
  290. : std::nullopt;
  291. RecoveryType recovery_type = GetRecoveryType(running_ts_sz, record_ts_sz);
  292. switch (recovery_type) {
  293. case RecoveryType::kNoop:
  294. *new_key = key;
  295. break;
  296. case RecoveryType::kStripTimestamp:
  297. assert(record_ts_sz.has_value());
  298. *new_key = StripTimestampFromUserKey(key, *record_ts_sz);
  299. new_batch_diff_from_orig_batch_ = true;
  300. break;
  301. case RecoveryType::kPadTimestamp:
  302. AppendKeyWithMinTimestamp(new_key_buf, key, running_ts_sz);
  303. *new_key = *new_key_buf;
  304. new_batch_diff_from_orig_batch_ = true;
  305. break;
  306. case RecoveryType::kUnrecoverable:
  307. return Status::InvalidArgument(
  308. "Unrecoverable timestamp size inconsistency encountered by "
  309. "TimestampRecoveryHandler.");
  310. default:
  311. assert(false);
  312. }
  313. return Status::OK();
  314. }
  315. Status HandleWriteBatchTimestampSizeDifference(
  316. const WriteBatch* batch,
  317. const UnorderedMap<uint32_t, size_t>& running_ts_sz,
  318. const UnorderedMap<uint32_t, size_t>& record_ts_sz,
  319. TimestampSizeConsistencyMode check_mode, bool seq_per_batch,
  320. bool batch_per_txn, std::unique_ptr<WriteBatch>* new_batch) {
  321. // Quick path to bypass checking the WriteBatch.
  322. if (AllRunningColumnFamiliesConsistent(running_ts_sz, record_ts_sz)) {
  323. return Status::OK();
  324. }
  325. bool need_recovery = false;
  326. Status status = CheckWriteBatchTimestampSizeConsistency(
  327. batch, running_ts_sz, record_ts_sz, check_mode, &need_recovery);
  328. if (!status.ok()) {
  329. return status;
  330. } else if (need_recovery) {
  331. assert(new_batch);
  332. SequenceNumber sequence = WriteBatchInternal::Sequence(batch);
  333. TimestampRecoveryHandler recovery_handler(running_ts_sz, record_ts_sz,
  334. seq_per_batch, batch_per_txn);
  335. status = batch->Iterate(&recovery_handler);
  336. if (!status.ok()) {
  337. return status;
  338. } else {
  339. *new_batch = recovery_handler.TransferNewBatch();
  340. WriteBatchInternal::SetSequence(new_batch->get(), sequence);
  341. }
  342. }
  343. return Status::OK();
  344. }
  345. Status ValidateUserDefinedTimestampsOptions(
  346. const Comparator* new_comparator, const std::string& old_comparator_name,
  347. bool new_persist_udt, bool old_persist_udt,
  348. bool* mark_sst_files_has_no_udt) {
  349. size_t ts_sz = new_comparator->timestamp_size();
  350. ToggleUDT res = CompareComparator(new_comparator, old_comparator_name);
  351. switch (res) {
  352. case ToggleUDT::kUnchanged:
  353. if (old_persist_udt == new_persist_udt) {
  354. return Status::OK();
  355. }
  356. if (ts_sz == 0) {
  357. return Status::OK();
  358. }
  359. return Status::InvalidArgument(
  360. "Cannot toggle the persist_user_defined_timestamps flag for a column "
  361. "family with user-defined timestamps feature enabled.");
  362. case ToggleUDT::kEnableUDT:
  363. if (!new_persist_udt) {
  364. *mark_sst_files_has_no_udt = true;
  365. return Status::OK();
  366. }
  367. return Status::InvalidArgument(
  368. "Cannot open a column family and enable user-defined timestamps "
  369. "feature without setting persist_user_defined_timestamps flag to "
  370. "false.");
  371. case ToggleUDT::kDisableUDT:
  372. if (!old_persist_udt) {
  373. return Status::OK();
  374. }
  375. return Status::InvalidArgument(
  376. "Cannot open a column family and disable user-defined timestamps "
  377. "feature if its existing persist_user_defined_timestamps flag is not "
  378. "false.");
  379. case ToggleUDT::kInvalidChange:
  380. return Status::InvalidArgument(
  381. new_comparator->Name(),
  382. "does not match existing comparator " + old_comparator_name);
  383. default:
  384. break;
  385. }
  386. return Status::InvalidArgument(
  387. "Unsupported user defined timestamps settings change.");
  388. }
  389. void GetFullHistoryTsLowFromU64CutoffTs(Slice* cutoff_ts,
  390. std::string* full_history_ts_low) {
  391. uint64_t cutoff_udt_ts = 0;
  392. [[maybe_unused]] bool format_res = GetFixed64(cutoff_ts, &cutoff_udt_ts);
  393. assert(format_res);
  394. PutFixed64(full_history_ts_low, cutoff_udt_ts + 1);
  395. }
  396. void GetU64CutoffTsFromFullHistoryTsLow(Slice* full_history_ts_low,
  397. std::string* cutoff_ts) {
  398. uint64_t full_history_ts_low_int = 0;
  399. [[maybe_unused]] bool format_res =
  400. GetFixed64(full_history_ts_low, &full_history_ts_low_int);
  401. assert(format_res);
  402. assert(full_history_ts_low_int > 0);
  403. if (full_history_ts_low_int > 0) {
  404. PutFixed64(cutoff_ts, full_history_ts_low_int - 1);
  405. } else {
  406. PutFixed64(cutoff_ts, 0);
  407. }
  408. }
  409. std::tuple<OptSlice, OptSlice> MaybeAddTimestampsToRange(
  410. const OptSlice& start, const OptSlice& end, size_t ts_sz,
  411. std::string* start_with_ts, std::string* end_with_ts, bool exclusive_end) {
  412. OptSlice ret_start, ret_end;
  413. if (start) {
  414. if (ts_sz == 0) {
  415. ret_start = *start;
  416. } else {
  417. // Maximum timestamp means including all keys with any timestamp for start
  418. AppendKeyWithMaxTimestamp(start_with_ts, *start, ts_sz);
  419. ret_start = Slice(*start_with_ts);
  420. }
  421. }
  422. if (end) {
  423. if (ts_sz == 0) {
  424. ret_end = *end;
  425. } else {
  426. if (exclusive_end) {
  427. // Append a maximum timestamp as the range limit is exclusive:
  428. // [start, end)
  429. AppendKeyWithMaxTimestamp(end_with_ts, *end, ts_sz);
  430. } else {
  431. // Append a minimum timestamp to end so the range limit is inclusive:
  432. // [start, end]
  433. AppendKeyWithMinTimestamp(end_with_ts, *end, ts_sz);
  434. }
  435. ret_end = Slice(*end_with_ts);
  436. }
  437. }
  438. return std::make_tuple(ret_start, ret_end);
  439. }
  440. } // namespace ROCKSDB_NAMESPACE