version_edit.cc 44 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  9. #include "db/version_edit.h"
  10. #include "db/blob/blob_index.h"
  11. #include "db/version_set.h"
  12. #include "logging/event_logger.h"
  13. #include "rocksdb/slice.h"
  14. #include "table/unique_id_impl.h"
  15. #include "test_util/sync_point.h"
  16. #include "util/coding.h"
  17. #include "util/string_util.h"
  18. namespace ROCKSDB_NAMESPACE {
  19. namespace {} // anonymous namespace
  20. uint64_t PackFileNumberAndPathId(uint64_t number, uint64_t path_id) {
  21. assert(number <= kFileNumberMask);
  22. return number | (path_id * (kFileNumberMask + 1));
  23. }
  24. Status FileMetaData::UpdateBoundaries(const Slice& key, const Slice& value,
  25. SequenceNumber seqno,
  26. ValueType value_type) {
  27. if (value_type == kTypeBlobIndex) {
  28. BlobIndex blob_index;
  29. const Status s = blob_index.DecodeFrom(value);
  30. if (!s.ok()) {
  31. return s;
  32. }
  33. if (!blob_index.IsInlined() && !blob_index.HasTTL()) {
  34. if (blob_index.file_number() == kInvalidBlobFileNumber) {
  35. return Status::Corruption("Invalid blob file number");
  36. }
  37. if (oldest_blob_file_number == kInvalidBlobFileNumber ||
  38. oldest_blob_file_number > blob_index.file_number()) {
  39. oldest_blob_file_number = blob_index.file_number();
  40. }
  41. }
  42. }
  43. if (smallest.size() == 0) {
  44. smallest.DecodeFrom(key);
  45. }
  46. largest.DecodeFrom(key);
  47. fd.smallest_seqno = std::min(fd.smallest_seqno, seqno);
  48. fd.largest_seqno = std::max(fd.largest_seqno, seqno);
  49. return Status::OK();
  50. }
  51. void VersionEdit::Clear() { *this = VersionEdit(); }
  52. bool VersionEdit::EncodeTo(std::string* dst,
  53. std::optional<size_t> ts_sz) const {
  54. assert(!IsNoManifestWriteDummy());
  55. if (has_db_id_) {
  56. PutVarint32(dst, kDbId);
  57. PutLengthPrefixedSlice(dst, db_id_);
  58. }
  59. if (has_comparator_) {
  60. assert(has_persist_user_defined_timestamps_);
  61. PutVarint32(dst, kComparator);
  62. PutLengthPrefixedSlice(dst, comparator_);
  63. }
  64. if (has_log_number_) {
  65. PutVarint32Varint64(dst, kLogNumber, log_number_);
  66. }
  67. if (has_prev_log_number_) {
  68. PutVarint32Varint64(dst, kPrevLogNumber, prev_log_number_);
  69. }
  70. if (has_next_file_number_) {
  71. PutVarint32Varint64(dst, kNextFileNumber, next_file_number_);
  72. }
  73. if (has_max_column_family_) {
  74. PutVarint32Varint32(dst, kMaxColumnFamily, max_column_family_);
  75. }
  76. if (has_min_log_number_to_keep_) {
  77. PutVarint32Varint64(dst, kMinLogNumberToKeep, min_log_number_to_keep_);
  78. }
  79. if (has_last_sequence_) {
  80. PutVarint32Varint64(dst, kLastSequence, last_sequence_);
  81. }
  82. for (size_t i = 0; i < compact_cursors_.size(); i++) {
  83. if (compact_cursors_[i].second.Valid()) {
  84. PutVarint32(dst, kCompactCursor);
  85. PutVarint32(dst, compact_cursors_[i].first); // level
  86. PutLengthPrefixedSlice(dst, compact_cursors_[i].second.Encode());
  87. }
  88. }
  89. for (const auto& deleted : deleted_files_) {
  90. PutVarint32Varint32Varint64(dst, kDeletedFile, deleted.first /* level */,
  91. deleted.second /* file number */);
  92. }
  93. bool min_log_num_written = false;
  94. assert(new_files_.empty() || ts_sz.has_value());
  95. for (size_t i = 0; i < new_files_.size(); i++) {
  96. const FileMetaData& f = new_files_[i].second;
  97. if (!f.smallest.Valid() || !f.largest.Valid() ||
  98. f.epoch_number == kUnknownEpochNumber) {
  99. return false;
  100. }
  101. EncodeToNewFile4(f, new_files_[i].first, ts_sz.value(),
  102. has_min_log_number_to_keep_, min_log_number_to_keep_,
  103. min_log_num_written, dst);
  104. }
  105. for (const auto& blob_file_addition : blob_file_additions_) {
  106. PutVarint32(dst, kBlobFileAddition);
  107. blob_file_addition.EncodeTo(dst);
  108. }
  109. for (const auto& blob_file_garbage : blob_file_garbages_) {
  110. PutVarint32(dst, kBlobFileGarbage);
  111. blob_file_garbage.EncodeTo(dst);
  112. }
  113. for (const auto& wal_addition : wal_additions_) {
  114. PutVarint32(dst, kWalAddition2);
  115. std::string encoded;
  116. wal_addition.EncodeTo(&encoded);
  117. PutLengthPrefixedSlice(dst, encoded);
  118. }
  119. if (!wal_deletion_.IsEmpty()) {
  120. PutVarint32(dst, kWalDeletion2);
  121. std::string encoded;
  122. wal_deletion_.EncodeTo(&encoded);
  123. PutLengthPrefixedSlice(dst, encoded);
  124. }
  125. // 0 is default and does not need to be explicitly written
  126. if (column_family_ != 0) {
  127. PutVarint32Varint32(dst, kColumnFamily, column_family_);
  128. }
  129. if (is_column_family_add_) {
  130. PutVarint32(dst, kColumnFamilyAdd);
  131. PutLengthPrefixedSlice(dst, Slice(column_family_name_));
  132. }
  133. if (is_column_family_drop_) {
  134. PutVarint32(dst, kColumnFamilyDrop);
  135. }
  136. if (is_in_atomic_group_) {
  137. PutVarint32(dst, kInAtomicGroup);
  138. PutVarint32(dst, remaining_entries_);
  139. }
  140. if (HasFullHistoryTsLow()) {
  141. PutVarint32(dst, kFullHistoryTsLow);
  142. PutLengthPrefixedSlice(dst, full_history_ts_low_);
  143. }
  144. if (HasPersistUserDefinedTimestamps()) {
  145. // persist_user_defined_timestamps flag should be logged in the same
  146. // VersionEdit as the user comparator name.
  147. assert(has_comparator_);
  148. PutVarint32(dst, kPersistUserDefinedTimestamps);
  149. char p = static_cast<char>(persist_user_defined_timestamps_);
  150. PutLengthPrefixedSlice(dst, Slice(&p, 1));
  151. }
  152. if (HasSubcompactionProgress()) {
  153. PutVarint32(dst, kSubcompactionProgress);
  154. std::string progress_data;
  155. subcompaction_progress_.EncodeTo(&progress_data);
  156. PutLengthPrefixedSlice(dst, progress_data);
  157. }
  158. return true;
  159. }
  160. void VersionEdit::EncodeToNewFile4(const FileMetaData& f, int level,
  161. size_t ts_sz,
  162. bool has_min_log_number_to_keep,
  163. uint64_t min_log_number_to_keep,
  164. bool& min_log_num_written,
  165. std::string* dst) {
  166. PutVarint32(dst, kNewFile4);
  167. PutVarint32Varint64(dst, level, f.fd.GetNumber());
  168. PutVarint64(dst, f.fd.GetFileSize());
  169. EncodeFileBoundaries(dst, f, ts_sz);
  170. PutVarint64Varint64(dst, f.fd.smallest_seqno, f.fd.largest_seqno);
  171. // Customized fields' format:
  172. // +-----------------------------+
  173. // | 1st field's tag (varint32) |
  174. // +-----------------------------+
  175. // | 1st field's size (varint32) |
  176. // +-----------------------------+
  177. // | bytes for 1st field |
  178. // | (based on size decoded) |
  179. // +-----------------------------+
  180. // | |
  181. // | ...... |
  182. // | |
  183. // +-----------------------------+
  184. // | last field's size (varint32)|
  185. // +-----------------------------+
  186. // | bytes for last field |
  187. // | (based on size decoded) |
  188. // +-----------------------------+
  189. // | terminating tag (varint32) |
  190. // +-----------------------------+
  191. //
  192. // Customized encoding for fields:
  193. // tag kPathId: 1 byte as path_id
  194. // tag kNeedCompaction:
  195. // now only can take one char value 1 indicating need-compaction
  196. //
  197. PutVarint32(dst, NewFileCustomTag::kOldestAncesterTime);
  198. std::string varint_oldest_ancester_time;
  199. PutVarint64(&varint_oldest_ancester_time, f.oldest_ancester_time);
  200. TEST_SYNC_POINT_CALLBACK("VersionEdit::EncodeTo:VarintOldestAncesterTime",
  201. &varint_oldest_ancester_time);
  202. PutLengthPrefixedSlice(dst, Slice(varint_oldest_ancester_time));
  203. PutVarint32(dst, NewFileCustomTag::kFileCreationTime);
  204. std::string varint_file_creation_time;
  205. PutVarint64(&varint_file_creation_time, f.file_creation_time);
  206. TEST_SYNC_POINT_CALLBACK("VersionEdit::EncodeTo:VarintFileCreationTime",
  207. &varint_file_creation_time);
  208. PutLengthPrefixedSlice(dst, Slice(varint_file_creation_time));
  209. PutVarint32(dst, NewFileCustomTag::kEpochNumber);
  210. std::string varint_epoch_number;
  211. PutVarint64(&varint_epoch_number, f.epoch_number);
  212. PutLengthPrefixedSlice(dst, Slice(varint_epoch_number));
  213. if (f.file_checksum_func_name != kUnknownFileChecksumFuncName) {
  214. PutVarint32(dst, NewFileCustomTag::kFileChecksum);
  215. PutLengthPrefixedSlice(dst, Slice(f.file_checksum));
  216. PutVarint32(dst, NewFileCustomTag::kFileChecksumFuncName);
  217. PutLengthPrefixedSlice(dst, Slice(f.file_checksum_func_name));
  218. }
  219. if (f.fd.GetPathId() != 0) {
  220. PutVarint32(dst, NewFileCustomTag::kPathId);
  221. char p = static_cast<char>(f.fd.GetPathId());
  222. PutLengthPrefixedSlice(dst, Slice(&p, 1));
  223. }
  224. if (f.temperature != Temperature::kUnknown) {
  225. PutVarint32(dst, NewFileCustomTag::kTemperature);
  226. char p = static_cast<char>(f.temperature);
  227. PutLengthPrefixedSlice(dst, Slice(&p, 1));
  228. }
  229. if (f.marked_for_compaction) {
  230. PutVarint32(dst, NewFileCustomTag::kNeedCompaction);
  231. char p = static_cast<char>(1);
  232. PutLengthPrefixedSlice(dst, Slice(&p, 1));
  233. }
  234. if (has_min_log_number_to_keep && !min_log_num_written) {
  235. PutVarint32(dst, NewFileCustomTag::kMinLogNumberToKeepHack);
  236. std::string varint_log_number;
  237. PutFixed64(&varint_log_number, min_log_number_to_keep);
  238. PutLengthPrefixedSlice(dst, Slice(varint_log_number));
  239. min_log_num_written = true;
  240. }
  241. if (f.oldest_blob_file_number != kInvalidBlobFileNumber) {
  242. PutVarint32(dst, NewFileCustomTag::kOldestBlobFileNumber);
  243. std::string oldest_blob_file_number;
  244. PutVarint64(&oldest_blob_file_number, f.oldest_blob_file_number);
  245. PutLengthPrefixedSlice(dst, Slice(oldest_blob_file_number));
  246. }
  247. UniqueId64x2 unique_id = f.unique_id;
  248. TEST_SYNC_POINT_CALLBACK("VersionEdit::EncodeTo:UniqueId", &unique_id);
  249. if (unique_id != kNullUniqueId64x2) {
  250. PutVarint32(dst, NewFileCustomTag::kUniqueId);
  251. std::string unique_id_str = EncodeUniqueIdBytes(&unique_id);
  252. PutLengthPrefixedSlice(dst, Slice(unique_id_str));
  253. }
  254. if (f.compensated_range_deletion_size) {
  255. PutVarint32(dst, NewFileCustomTag::kCompensatedRangeDeletionSize);
  256. std::string compensated_range_deletion_size;
  257. PutVarint64(&compensated_range_deletion_size,
  258. f.compensated_range_deletion_size);
  259. PutLengthPrefixedSlice(dst, Slice(compensated_range_deletion_size));
  260. }
  261. if (f.tail_size) {
  262. PutVarint32(dst, NewFileCustomTag::kTailSize);
  263. std::string varint_tail_size;
  264. PutVarint64(&varint_tail_size, f.tail_size);
  265. PutLengthPrefixedSlice(dst, Slice(varint_tail_size));
  266. }
  267. if (!f.user_defined_timestamps_persisted) {
  268. // The default value for the flag is true, it's only explicitly persisted
  269. // when it's false. We are putting 0 as the value here to signal false
  270. // (i.e. UDTS not persisted).
  271. PutVarint32(dst, NewFileCustomTag::kUserDefinedTimestampsPersisted);
  272. char p = static_cast<char>(0);
  273. PutLengthPrefixedSlice(dst, Slice(&p, 1));
  274. }
  275. TEST_SYNC_POINT_CALLBACK("VersionEdit::EncodeTo:NewFile4:CustomizeFields",
  276. dst);
  277. PutVarint32(dst, NewFileCustomTag::kTerminate);
  278. }
  279. static bool GetInternalKey(Slice* input, InternalKey* dst) {
  280. Slice str;
  281. if (GetLengthPrefixedSlice(input, &str)) {
  282. dst->DecodeFrom(str);
  283. return dst->Valid();
  284. } else {
  285. return false;
  286. }
  287. }
  288. bool VersionEdit::GetLevel(Slice* input, int* level, int& max_level) {
  289. uint32_t v = 0;
  290. if (GetVarint32(input, &v)) {
  291. *level = v;
  292. if (max_level < *level) {
  293. max_level = *level;
  294. }
  295. return true;
  296. } else {
  297. return false;
  298. }
  299. }
  300. const char* VersionEdit::DecodeNewFile4From(Slice* input, int& max_level,
  301. uint64_t& min_log_number_to_keep,
  302. bool& has_min_log_number_to_keep,
  303. NewFiles& new_files,
  304. FileMetaData& f) {
  305. int level = 0;
  306. uint64_t number = 0;
  307. uint32_t path_id = 0;
  308. uint64_t file_size = 0;
  309. SequenceNumber smallest_seqno = 0;
  310. SequenceNumber largest_seqno = kMaxSequenceNumber;
  311. if (GetLevel(input, &level, max_level) && GetVarint64(input, &number) &&
  312. GetVarint64(input, &file_size) && GetInternalKey(input, &f.smallest) &&
  313. GetInternalKey(input, &f.largest) &&
  314. GetVarint64(input, &smallest_seqno) &&
  315. GetVarint64(input, &largest_seqno)) {
  316. // See comments in VersionEdit::EncodeTo() for format of customized fields
  317. while (true) {
  318. uint32_t custom_tag = 0;
  319. Slice field;
  320. if (!GetVarint32(input, &custom_tag)) {
  321. return "new-file4 custom field";
  322. }
  323. if (custom_tag == kTerminate) {
  324. break;
  325. }
  326. if (!GetLengthPrefixedSlice(input, &field)) {
  327. return "new-file4 custom field length prefixed slice error";
  328. }
  329. switch (custom_tag) {
  330. case kPathId:
  331. if (field.size() != 1) {
  332. return "path_id field wrong size";
  333. }
  334. path_id = field[0];
  335. if (path_id > 3) {
  336. return "path_id wrong vaue";
  337. }
  338. break;
  339. case kOldestAncesterTime:
  340. if (!GetVarint64(&field, &f.oldest_ancester_time)) {
  341. return "invalid oldest ancester time";
  342. }
  343. break;
  344. case kFileCreationTime:
  345. if (!GetVarint64(&field, &f.file_creation_time)) {
  346. return "invalid file creation time";
  347. }
  348. break;
  349. case kEpochNumber:
  350. if (!GetVarint64(&field, &f.epoch_number)) {
  351. return "invalid epoch number";
  352. }
  353. break;
  354. case kFileChecksum:
  355. f.file_checksum = field.ToString();
  356. break;
  357. case kFileChecksumFuncName:
  358. f.file_checksum_func_name = field.ToString();
  359. break;
  360. case kNeedCompaction:
  361. if (field.size() != 1) {
  362. return "need_compaction field wrong size";
  363. }
  364. f.marked_for_compaction = (field[0] == 1);
  365. break;
  366. case kMinLogNumberToKeepHack:
  367. // This is a hack to encode kMinLogNumberToKeep in a
  368. // forward-compatible fashion.
  369. if (!GetFixed64(&field, &min_log_number_to_keep)) {
  370. return "deleted log number malformatted";
  371. }
  372. has_min_log_number_to_keep = true;
  373. break;
  374. case kOldestBlobFileNumber:
  375. if (!GetVarint64(&field, &f.oldest_blob_file_number)) {
  376. return "invalid oldest blob file number";
  377. }
  378. break;
  379. case kTemperature:
  380. if (field.size() != 1) {
  381. return "temperature field wrong size";
  382. } else {
  383. Temperature casted_field = static_cast<Temperature>(field[0]);
  384. if (casted_field < Temperature::kLastTemperature) {
  385. f.temperature = casted_field;
  386. }
  387. }
  388. break;
  389. case kUniqueId:
  390. if (!DecodeUniqueIdBytes(field.ToString(), &f.unique_id).ok()) {
  391. f.unique_id = kNullUniqueId64x2;
  392. return "invalid unique id";
  393. }
  394. break;
  395. case kCompensatedRangeDeletionSize:
  396. if (!GetVarint64(&field, &f.compensated_range_deletion_size)) {
  397. return "Invalid compensated range deletion size";
  398. }
  399. break;
  400. case kTailSize:
  401. if (!GetVarint64(&field, &f.tail_size)) {
  402. return "invalid tail start offset";
  403. }
  404. break;
  405. case kUserDefinedTimestampsPersisted:
  406. if (field.size() != 1) {
  407. return "user-defined timestamps persisted field wrong size";
  408. }
  409. f.user_defined_timestamps_persisted = (field[0] == 1);
  410. break;
  411. default:
  412. if ((custom_tag & kCustomTagNonSafeIgnoreMask) != 0) {
  413. // Should not proceed if cannot understand it
  414. return "new-file4 custom field not supported";
  415. }
  416. break;
  417. }
  418. }
  419. } else {
  420. return "new-file4 entry";
  421. }
  422. f.fd =
  423. FileDescriptor(number, path_id, file_size, smallest_seqno, largest_seqno);
  424. new_files.emplace_back(level, f);
  425. return nullptr;
  426. }
  427. void VersionEdit::EncodeFileBoundaries(std::string* dst,
  428. const FileMetaData& meta, size_t ts_sz) {
  429. if (ts_sz == 0 || meta.user_defined_timestamps_persisted) {
  430. PutLengthPrefixedSlice(dst, meta.smallest.Encode());
  431. PutLengthPrefixedSlice(dst, meta.largest.Encode());
  432. return;
  433. }
  434. std::string smallest_buf;
  435. std::string largest_buf;
  436. StripTimestampFromInternalKey(&smallest_buf, meta.smallest.Encode(), ts_sz);
  437. StripTimestampFromInternalKey(&largest_buf, meta.largest.Encode(), ts_sz);
  438. PutLengthPrefixedSlice(dst, smallest_buf);
  439. PutLengthPrefixedSlice(dst, largest_buf);
  440. }
  441. Status VersionEdit::DecodeFrom(const Slice& src) {
  442. Clear();
  443. #ifndef NDEBUG
  444. bool ignore_ignorable_tags = false;
  445. TEST_SYNC_POINT_CALLBACK("VersionEdit::EncodeTo:IgnoreIgnorableTags",
  446. &ignore_ignorable_tags);
  447. #endif
  448. Slice input = src;
  449. const char* msg = nullptr;
  450. uint32_t tag = 0;
  451. // Temporary storage for parsing
  452. int level = 0;
  453. FileMetaData f;
  454. Slice str;
  455. InternalKey key;
  456. while (msg == nullptr && GetVarint32(&input, &tag)) {
  457. #ifndef NDEBUG
  458. if (ignore_ignorable_tags && tag > kTagSafeIgnoreMask) {
  459. tag = kTagSafeIgnoreMask;
  460. }
  461. #endif
  462. switch (tag) {
  463. case kDbId:
  464. if (GetLengthPrefixedSlice(&input, &str)) {
  465. db_id_ = str.ToString();
  466. has_db_id_ = true;
  467. } else {
  468. msg = "db id";
  469. }
  470. break;
  471. case kComparator:
  472. if (GetLengthPrefixedSlice(&input, &str)) {
  473. comparator_ = str.ToString();
  474. has_comparator_ = true;
  475. } else {
  476. msg = "comparator name";
  477. }
  478. break;
  479. case kLogNumber:
  480. if (GetVarint64(&input, &log_number_)) {
  481. has_log_number_ = true;
  482. } else {
  483. msg = "log number";
  484. }
  485. break;
  486. case kPrevLogNumber:
  487. if (GetVarint64(&input, &prev_log_number_)) {
  488. has_prev_log_number_ = true;
  489. } else {
  490. msg = "previous log number";
  491. }
  492. break;
  493. case kNextFileNumber:
  494. if (GetVarint64(&input, &next_file_number_)) {
  495. has_next_file_number_ = true;
  496. } else {
  497. msg = "next file number";
  498. }
  499. break;
  500. case kMaxColumnFamily:
  501. if (GetVarint32(&input, &max_column_family_)) {
  502. has_max_column_family_ = true;
  503. } else {
  504. msg = "max column family";
  505. }
  506. break;
  507. case kMinLogNumberToKeep:
  508. if (GetVarint64(&input, &min_log_number_to_keep_)) {
  509. has_min_log_number_to_keep_ = true;
  510. } else {
  511. msg = "min log number to kee";
  512. }
  513. break;
  514. case kLastSequence:
  515. if (GetVarint64(&input, &last_sequence_)) {
  516. has_last_sequence_ = true;
  517. } else {
  518. msg = "last sequence number";
  519. }
  520. break;
  521. case kCompactCursor:
  522. if (GetLevel(&input, &level, max_level_) &&
  523. GetInternalKey(&input, &key)) {
  524. // Here we re-use the output format of compact pointer in LevelDB
  525. // to persist compact_cursors_
  526. compact_cursors_.push_back(std::make_pair(level, key));
  527. } else {
  528. if (!msg) {
  529. msg = "compaction cursor";
  530. }
  531. }
  532. break;
  533. case kDeletedFile: {
  534. uint64_t number = 0;
  535. if (GetLevel(&input, &level, max_level_) &&
  536. GetVarint64(&input, &number)) {
  537. deleted_files_.insert(std::make_pair(level, number));
  538. } else {
  539. if (!msg) {
  540. msg = "deleted file";
  541. }
  542. }
  543. break;
  544. }
  545. case kNewFile: {
  546. uint64_t number = 0;
  547. uint64_t file_size = 0;
  548. if (GetLevel(&input, &level, max_level_) &&
  549. GetVarint64(&input, &number) && GetVarint64(&input, &file_size) &&
  550. GetInternalKey(&input, &f.smallest) &&
  551. GetInternalKey(&input, &f.largest)) {
  552. f.fd = FileDescriptor(number, 0, file_size);
  553. new_files_.push_back(std::make_pair(level, f));
  554. } else {
  555. if (!msg) {
  556. msg = "new-file entry";
  557. }
  558. }
  559. break;
  560. }
  561. case kNewFile2: {
  562. uint64_t number = 0;
  563. uint64_t file_size = 0;
  564. SequenceNumber smallest_seqno = 0;
  565. SequenceNumber largest_seqno = kMaxSequenceNumber;
  566. if (GetLevel(&input, &level, max_level_) &&
  567. GetVarint64(&input, &number) && GetVarint64(&input, &file_size) &&
  568. GetInternalKey(&input, &f.smallest) &&
  569. GetInternalKey(&input, &f.largest) &&
  570. GetVarint64(&input, &smallest_seqno) &&
  571. GetVarint64(&input, &largest_seqno)) {
  572. f.fd = FileDescriptor(number, 0, file_size, smallest_seqno,
  573. largest_seqno);
  574. new_files_.push_back(std::make_pair(level, f));
  575. } else {
  576. if (!msg) {
  577. msg = "new-file2 entry";
  578. }
  579. }
  580. break;
  581. }
  582. case kNewFile3: {
  583. uint64_t number = 0;
  584. uint32_t path_id = 0;
  585. uint64_t file_size = 0;
  586. SequenceNumber smallest_seqno = 0;
  587. SequenceNumber largest_seqno = kMaxSequenceNumber;
  588. if (GetLevel(&input, &level, max_level_) &&
  589. GetVarint64(&input, &number) && GetVarint32(&input, &path_id) &&
  590. GetVarint64(&input, &file_size) &&
  591. GetInternalKey(&input, &f.smallest) &&
  592. GetInternalKey(&input, &f.largest) &&
  593. GetVarint64(&input, &smallest_seqno) &&
  594. GetVarint64(&input, &largest_seqno)) {
  595. f.fd = FileDescriptor(number, path_id, file_size, smallest_seqno,
  596. largest_seqno);
  597. new_files_.push_back(std::make_pair(level, f));
  598. } else {
  599. if (!msg) {
  600. msg = "new-file3 entry";
  601. }
  602. }
  603. break;
  604. }
  605. case kNewFile4: {
  606. FileMetaData ignored_file;
  607. msg = DecodeNewFile4From(&input, max_level_, min_log_number_to_keep_,
  608. has_min_log_number_to_keep_, new_files_,
  609. ignored_file);
  610. break;
  611. }
  612. case kBlobFileAddition:
  613. case kBlobFileAddition_DEPRECATED: {
  614. BlobFileAddition blob_file_addition;
  615. const Status s = blob_file_addition.DecodeFrom(&input);
  616. if (!s.ok()) {
  617. return s;
  618. }
  619. AddBlobFile(std::move(blob_file_addition));
  620. break;
  621. }
  622. case kBlobFileGarbage:
  623. case kBlobFileGarbage_DEPRECATED: {
  624. BlobFileGarbage blob_file_garbage;
  625. const Status s = blob_file_garbage.DecodeFrom(&input);
  626. if (!s.ok()) {
  627. return s;
  628. }
  629. AddBlobFileGarbage(std::move(blob_file_garbage));
  630. break;
  631. }
  632. case kWalAddition: {
  633. WalAddition wal_addition;
  634. const Status s = wal_addition.DecodeFrom(&input);
  635. if (!s.ok()) {
  636. return s;
  637. }
  638. wal_additions_.emplace_back(std::move(wal_addition));
  639. break;
  640. }
  641. case kWalAddition2: {
  642. Slice encoded;
  643. if (!GetLengthPrefixedSlice(&input, &encoded)) {
  644. msg = "WalAddition not prefixed by length";
  645. break;
  646. }
  647. WalAddition wal_addition;
  648. const Status s = wal_addition.DecodeFrom(&encoded);
  649. if (!s.ok()) {
  650. return s;
  651. }
  652. wal_additions_.emplace_back(std::move(wal_addition));
  653. break;
  654. }
  655. case kWalDeletion: {
  656. WalDeletion wal_deletion;
  657. const Status s = wal_deletion.DecodeFrom(&input);
  658. if (!s.ok()) {
  659. return s;
  660. }
  661. wal_deletion_ = std::move(wal_deletion);
  662. break;
  663. }
  664. case kWalDeletion2: {
  665. Slice encoded;
  666. if (!GetLengthPrefixedSlice(&input, &encoded)) {
  667. msg = "WalDeletion not prefixed by length";
  668. break;
  669. }
  670. WalDeletion wal_deletion;
  671. const Status s = wal_deletion.DecodeFrom(&encoded);
  672. if (!s.ok()) {
  673. return s;
  674. }
  675. wal_deletion_ = std::move(wal_deletion);
  676. break;
  677. }
  678. case kColumnFamily:
  679. if (!GetVarint32(&input, &column_family_)) {
  680. if (!msg) {
  681. msg = "set column family id";
  682. }
  683. }
  684. break;
  685. case kColumnFamilyAdd:
  686. if (GetLengthPrefixedSlice(&input, &str)) {
  687. is_column_family_add_ = true;
  688. column_family_name_ = str.ToString();
  689. } else {
  690. if (!msg) {
  691. msg = "column family add";
  692. }
  693. }
  694. break;
  695. case kColumnFamilyDrop:
  696. is_column_family_drop_ = true;
  697. break;
  698. case kInAtomicGroup:
  699. is_in_atomic_group_ = true;
  700. if (!GetVarint32(&input, &remaining_entries_)) {
  701. if (!msg) {
  702. msg = "remaining entries";
  703. }
  704. }
  705. break;
  706. case kFullHistoryTsLow:
  707. if (!GetLengthPrefixedSlice(&input, &str)) {
  708. msg = "full_history_ts_low";
  709. } else if (str.empty()) {
  710. msg = "full_history_ts_low: empty";
  711. } else {
  712. full_history_ts_low_.assign(str.data(), str.size());
  713. }
  714. break;
  715. case kPersistUserDefinedTimestamps:
  716. if (!GetLengthPrefixedSlice(&input, &str)) {
  717. msg = "persist_user_defined_timestamps";
  718. } else if (str.size() != 1) {
  719. msg = "persist_user_defined_timestamps field wrong size";
  720. } else {
  721. persist_user_defined_timestamps_ = (str[0] == 1);
  722. has_persist_user_defined_timestamps_ = true;
  723. }
  724. break;
  725. case kSubcompactionProgress: {
  726. Slice encoded;
  727. if (!GetLengthPrefixedSlice(&input, &encoded)) {
  728. msg = "SubcompactionProgress not prefixed by length";
  729. break;
  730. }
  731. SubcompactionProgress progress;
  732. Status s = progress.DecodeFrom(&encoded);
  733. if (!s.ok()) {
  734. return s;
  735. }
  736. SetSubcompactionProgress(progress);
  737. break;
  738. }
  739. default:
  740. if (tag & kTagSafeIgnoreMask) {
  741. // Tag from future which can be safely ignored.
  742. // The next field must be the length of the entry.
  743. uint32_t field_len;
  744. if (!GetVarint32(&input, &field_len) ||
  745. static_cast<size_t>(field_len) > input.size()) {
  746. if (!msg) {
  747. msg = "safely ignoreable tag length error";
  748. }
  749. } else {
  750. input.remove_prefix(static_cast<size_t>(field_len));
  751. }
  752. } else {
  753. msg = "unknown tag";
  754. }
  755. break;
  756. }
  757. }
  758. if (msg == nullptr && !input.empty()) {
  759. msg = "invalid tag";
  760. }
  761. Status result;
  762. if (msg != nullptr) {
  763. result = Status::Corruption("VersionEdit", msg);
  764. }
  765. return result;
  766. }
  767. std::string VersionEdit::DebugString(bool hex_key) const {
  768. std::string r;
  769. r.append("VersionEdit {");
  770. if (has_db_id_) {
  771. r.append("\n DB ID: ");
  772. r.append(db_id_);
  773. }
  774. if (has_comparator_) {
  775. r.append("\n Comparator: ");
  776. r.append(comparator_);
  777. }
  778. if (has_persist_user_defined_timestamps_) {
  779. r.append("\n PersistUserDefinedTimestamps: ");
  780. r.append(persist_user_defined_timestamps_ ? "true" : "false");
  781. }
  782. if (has_log_number_) {
  783. r.append("\n LogNumber: ");
  784. AppendNumberTo(&r, log_number_);
  785. }
  786. if (has_prev_log_number_) {
  787. r.append("\n PrevLogNumber: ");
  788. AppendNumberTo(&r, prev_log_number_);
  789. }
  790. if (has_next_file_number_) {
  791. r.append("\n NextFileNumber: ");
  792. AppendNumberTo(&r, next_file_number_);
  793. }
  794. if (has_max_column_family_) {
  795. r.append("\n MaxColumnFamily: ");
  796. AppendNumberTo(&r, max_column_family_);
  797. }
  798. if (has_min_log_number_to_keep_) {
  799. r.append("\n MinLogNumberToKeep: ");
  800. AppendNumberTo(&r, min_log_number_to_keep_);
  801. }
  802. if (has_last_sequence_) {
  803. r.append("\n LastSeq: ");
  804. AppendNumberTo(&r, last_sequence_);
  805. }
  806. for (const auto& level_and_compact_cursor : compact_cursors_) {
  807. r.append("\n CompactCursor: ");
  808. AppendNumberTo(&r, level_and_compact_cursor.first);
  809. r.append(" ");
  810. r.append(level_and_compact_cursor.second.DebugString(hex_key));
  811. }
  812. for (const auto& deleted_file : deleted_files_) {
  813. r.append("\n DeleteFile: ");
  814. AppendNumberTo(&r, deleted_file.first);
  815. r.append(" ");
  816. AppendNumberTo(&r, deleted_file.second);
  817. }
  818. for (size_t i = 0; i < new_files_.size(); i++) {
  819. const FileMetaData& f = new_files_[i].second;
  820. r.append("\n AddFile: ");
  821. AppendNumberTo(&r, new_files_[i].first);
  822. r.append(" ");
  823. AppendNumberTo(&r, f.fd.GetNumber());
  824. r.append(" ");
  825. AppendNumberTo(&r, f.fd.GetFileSize());
  826. r.append(" ");
  827. r.append(f.smallest.DebugString(hex_key));
  828. r.append(" .. ");
  829. r.append(f.largest.DebugString(hex_key));
  830. if (f.oldest_blob_file_number != kInvalidBlobFileNumber) {
  831. r.append(" blob_file:");
  832. AppendNumberTo(&r, f.oldest_blob_file_number);
  833. }
  834. r.append(" oldest_ancester_time:");
  835. AppendNumberTo(&r, f.oldest_ancester_time);
  836. r.append(" file_creation_time:");
  837. AppendNumberTo(&r, f.file_creation_time);
  838. r.append(" epoch_number:");
  839. AppendNumberTo(&r, f.epoch_number);
  840. r.append(" file_checksum:");
  841. r.append(Slice(f.file_checksum).ToString(true));
  842. r.append(" file_checksum_func_name: ");
  843. r.append(f.file_checksum_func_name);
  844. if (f.temperature != Temperature::kUnknown) {
  845. r.append(" temperature: ");
  846. // Maybe change to human readable format whenthe feature becomes
  847. // permanent
  848. r.append(std::to_string(static_cast<int>(f.temperature)));
  849. }
  850. if (f.unique_id != kNullUniqueId64x2) {
  851. r.append(" unique_id(internal): ");
  852. UniqueId64x2 id = f.unique_id;
  853. r.append(InternalUniqueIdToHumanString(&id));
  854. r.append(" public_unique_id: ");
  855. InternalUniqueIdToExternal(&id);
  856. r.append(UniqueIdToHumanString(EncodeUniqueIdBytes(&id)));
  857. }
  858. r.append(" tail size: ");
  859. AppendNumberTo(&r, f.tail_size);
  860. r.append(" User-defined timestamps persisted: ");
  861. r.append(f.user_defined_timestamps_persisted ? "true" : "false");
  862. }
  863. for (const auto& blob_file_addition : blob_file_additions_) {
  864. r.append("\n BlobFileAddition: ");
  865. r.append(blob_file_addition.DebugString());
  866. }
  867. for (const auto& blob_file_garbage : blob_file_garbages_) {
  868. r.append("\n BlobFileGarbage: ");
  869. r.append(blob_file_garbage.DebugString());
  870. }
  871. for (const auto& wal_addition : wal_additions_) {
  872. r.append("\n WalAddition: ");
  873. r.append(wal_addition.DebugString());
  874. }
  875. if (!wal_deletion_.IsEmpty()) {
  876. r.append("\n WalDeletion: ");
  877. r.append(wal_deletion_.DebugString());
  878. }
  879. r.append("\n ColumnFamily: ");
  880. AppendNumberTo(&r, column_family_);
  881. if (is_column_family_add_) {
  882. r.append("\n ColumnFamilyAdd: ");
  883. r.append(column_family_name_);
  884. }
  885. if (is_column_family_drop_) {
  886. r.append("\n ColumnFamilyDrop");
  887. }
  888. if (is_in_atomic_group_) {
  889. r.append("\n AtomicGroup: ");
  890. AppendNumberTo(&r, remaining_entries_);
  891. r.append(" entries remains");
  892. }
  893. if (HasFullHistoryTsLow()) {
  894. r.append("\n FullHistoryTsLow: ");
  895. r.append(Slice(full_history_ts_low_).ToString(hex_key));
  896. }
  897. if (HasSubcompactionProgress()) {
  898. r.append("\n SubcompactionProgress: ");
  899. r.append(subcompaction_progress_.ToString());
  900. }
  901. r.append("\n}\n");
  902. return r;
  903. }
  904. std::string VersionEdit::DebugJSON(int edit_num, bool hex_key) const {
  905. JSONWriter jw;
  906. jw << "EditNumber" << edit_num;
  907. if (has_db_id_) {
  908. jw << "DB ID" << db_id_;
  909. }
  910. if (has_comparator_) {
  911. jw << "Comparator" << comparator_;
  912. }
  913. if (has_log_number_) {
  914. jw << "LogNumber" << log_number_;
  915. }
  916. if (has_prev_log_number_) {
  917. jw << "PrevLogNumber" << prev_log_number_;
  918. }
  919. if (has_next_file_number_) {
  920. jw << "NextFileNumber" << next_file_number_;
  921. }
  922. if (has_max_column_family_) {
  923. jw << "MaxColumnFamily" << max_column_family_;
  924. }
  925. if (has_min_log_number_to_keep_) {
  926. jw << "MinLogNumberToKeep" << min_log_number_to_keep_;
  927. }
  928. if (has_last_sequence_) {
  929. jw << "LastSeq" << last_sequence_;
  930. }
  931. if (!deleted_files_.empty()) {
  932. jw << "DeletedFiles";
  933. jw.StartArray();
  934. for (const auto& deleted_file : deleted_files_) {
  935. jw.StartArrayedObject();
  936. jw << "Level" << deleted_file.first;
  937. jw << "FileNumber" << deleted_file.second;
  938. jw.EndArrayedObject();
  939. }
  940. jw.EndArray();
  941. }
  942. if (!new_files_.empty()) {
  943. jw << "AddedFiles";
  944. jw.StartArray();
  945. for (size_t i = 0; i < new_files_.size(); i++) {
  946. jw.StartArrayedObject();
  947. jw << "Level" << new_files_[i].first;
  948. const FileMetaData& f = new_files_[i].second;
  949. jw << "FileNumber" << f.fd.GetNumber();
  950. jw << "FileSize" << f.fd.GetFileSize();
  951. jw << "SmallestIKey" << f.smallest.DebugString(hex_key);
  952. jw << "LargestIKey" << f.largest.DebugString(hex_key);
  953. jw << "OldestAncesterTime" << f.oldest_ancester_time;
  954. jw << "FileCreationTime" << f.file_creation_time;
  955. jw << "EpochNumber" << f.epoch_number;
  956. jw << "FileChecksum" << Slice(f.file_checksum).ToString(true);
  957. jw << "FileChecksumFuncName" << f.file_checksum_func_name;
  958. if (f.temperature != Temperature::kUnknown) {
  959. jw << "temperature" << std::to_string(static_cast<int>(f.temperature));
  960. }
  961. if (f.oldest_blob_file_number != kInvalidBlobFileNumber) {
  962. jw << "OldestBlobFile" << f.oldest_blob_file_number;
  963. }
  964. if (f.temperature != Temperature::kUnknown) {
  965. // Maybe change to human readable format whenthe feature becomes
  966. // permanent
  967. jw << "Temperature" << static_cast<int>(f.temperature);
  968. }
  969. jw << "TailSize" << f.tail_size;
  970. jw << "UserDefinedTimestampsPersisted"
  971. << f.user_defined_timestamps_persisted;
  972. jw.EndArrayedObject();
  973. }
  974. jw.EndArray();
  975. }
  976. if (!blob_file_additions_.empty()) {
  977. jw << "BlobFileAdditions";
  978. jw.StartArray();
  979. for (const auto& blob_file_addition : blob_file_additions_) {
  980. jw.StartArrayedObject();
  981. jw << blob_file_addition;
  982. jw.EndArrayedObject();
  983. }
  984. jw.EndArray();
  985. }
  986. if (!blob_file_garbages_.empty()) {
  987. jw << "BlobFileGarbages";
  988. jw.StartArray();
  989. for (const auto& blob_file_garbage : blob_file_garbages_) {
  990. jw.StartArrayedObject();
  991. jw << blob_file_garbage;
  992. jw.EndArrayedObject();
  993. }
  994. jw.EndArray();
  995. }
  996. if (!wal_additions_.empty()) {
  997. jw << "WalAdditions";
  998. jw.StartArray();
  999. for (const auto& wal_addition : wal_additions_) {
  1000. jw.StartArrayedObject();
  1001. jw << wal_addition;
  1002. jw.EndArrayedObject();
  1003. }
  1004. jw.EndArray();
  1005. }
  1006. if (!wal_deletion_.IsEmpty()) {
  1007. jw << "WalDeletion";
  1008. jw.StartObject();
  1009. jw << wal_deletion_;
  1010. jw.EndObject();
  1011. }
  1012. jw << "ColumnFamily" << column_family_;
  1013. if (is_column_family_add_) {
  1014. jw << "ColumnFamilyAdd" << column_family_name_;
  1015. }
  1016. if (is_column_family_drop_) {
  1017. jw << "ColumnFamilyDrop" << column_family_name_;
  1018. }
  1019. if (is_in_atomic_group_) {
  1020. jw << "AtomicGroup" << remaining_entries_;
  1021. }
  1022. if (HasFullHistoryTsLow()) {
  1023. jw << "FullHistoryTsLow" << Slice(full_history_ts_low_).ToString(hex_key);
  1024. }
  1025. if (HasSubcompactionProgress()) {
  1026. jw << "SubcompactionProgress" << subcompaction_progress_.ToString();
  1027. }
  1028. jw.EndObject();
  1029. return jw.Get();
  1030. }
  1031. void SubcompactionProgressPerLevel::EncodeTo(std::string* dst) const {
  1032. if (num_processed_output_records_ > 0) {
  1033. PutVarint32(
  1034. dst,
  1035. SubcompactionProgressPerLevelCustomTag::kNumProcessedOutputRecords);
  1036. std::string varint_records;
  1037. PutVarint64(&varint_records, num_processed_output_records_);
  1038. PutLengthPrefixedSlice(dst, varint_records);
  1039. }
  1040. if (!output_files_.empty()) {
  1041. PutVarint32(dst, SubcompactionProgressPerLevelCustomTag::kOutputFilesDelta);
  1042. std::string files_data;
  1043. EncodeOutputFiles(&files_data);
  1044. PutLengthPrefixedSlice(dst, files_data);
  1045. }
  1046. PutVarint32(dst, SubcompactionProgressPerLevelCustomTag::
  1047. kSubcompactionProgressPerLevelTerminate);
  1048. }
  1049. Status SubcompactionProgressPerLevel::DecodeFrom(Slice* input) {
  1050. Clear();
  1051. while (true) {
  1052. uint32_t tag = 0;
  1053. if (!GetVarint32(input, &tag)) {
  1054. return Status::Corruption("SubcompactionProgressPerLevel", "tag error");
  1055. }
  1056. if (tag == SubcompactionProgressPerLevelCustomTag::
  1057. kSubcompactionProgressPerLevelTerminate) {
  1058. break;
  1059. }
  1060. Slice field;
  1061. if (!GetLengthPrefixedSlice(input, &field)) {
  1062. return Status::Corruption("SubcompactionProgressPerLevel",
  1063. "field length prefixed slice error");
  1064. }
  1065. switch (tag) {
  1066. case SubcompactionProgressPerLevelCustomTag::kNumProcessedOutputRecords: {
  1067. if (!GetVarint64(&field, &num_processed_output_records_)) {
  1068. return Status::Corruption("SubcompactionProgressPerLevel",
  1069. "invalid num_processed_output_records_");
  1070. }
  1071. break;
  1072. }
  1073. case SubcompactionProgressPerLevelCustomTag::kOutputFilesDelta: {
  1074. Status s = DecodeOutputFiles(&field, output_files_);
  1075. if (!s.ok()) {
  1076. return s;
  1077. }
  1078. break;
  1079. }
  1080. default:
  1081. // Forward compatibility: Handle unknown tags
  1082. if ((tag & SubcompactionProgressPerLevelCustomTag::
  1083. kSubcompactionProgressPerLevelCustomTagSafeIgnoreMask) !=
  1084. 0) {
  1085. break;
  1086. } else {
  1087. return Status::NotSupported("SubcompactionProgress",
  1088. "unsupported critical custom field");
  1089. }
  1090. }
  1091. }
  1092. return Status::OK();
  1093. }
  1094. void SubcompactionProgressPerLevel::EncodeOutputFiles(std::string* dst) const {
  1095. size_t new_files_count =
  1096. output_files_.size() > last_persisted_output_files_count_
  1097. ? output_files_.size() - last_persisted_output_files_count_
  1098. : 0;
  1099. assert(new_files_count > 0);
  1100. PutVarint32(dst, static_cast<uint32_t>(new_files_count));
  1101. for (size_t i = last_persisted_output_files_count_; i < output_files_.size();
  1102. ++i) {
  1103. std::string file_dst;
  1104. bool ignored_min_log_written = false;
  1105. VersionEdit::EncodeToNewFile4(
  1106. output_files_[i], -1 /* level */, 0 /* ts_sz */,
  1107. false /* has_min_log_number_to_keep */, 0 /* min_log_number_to_keep */,
  1108. ignored_min_log_written, &file_dst);
  1109. PutLengthPrefixedSlice(dst, file_dst);
  1110. }
  1111. }
  1112. Status SubcompactionProgressPerLevel::DecodeOutputFiles(
  1113. Slice* input, autovector<FileMetaData>& output_files) {
  1114. uint32_t new_file_count = 0;
  1115. if (!GetVarint32(input, &new_file_count)) {
  1116. return Status::Corruption("SubcompactionProgressPerLevel",
  1117. "new output file count");
  1118. }
  1119. assert(output_files.size() == 0);
  1120. output_files.reserve(new_file_count);
  1121. for (uint32_t i = 0; i < new_file_count; ++i) {
  1122. Slice file_input;
  1123. if (!GetLengthPrefixedSlice(input, &file_input)) {
  1124. return Status::Corruption("SubcompactionProgressPerLevel",
  1125. "output file metadata");
  1126. }
  1127. uint32_t tag = 0;
  1128. if (!GetVarint32(&file_input, &tag) || tag != kNewFile4) {
  1129. return Status::Corruption("SubcompactionProgressPerLevel",
  1130. "expected kNewFile4 tag");
  1131. }
  1132. int ignored_max_level = -1;
  1133. uint64_t ignored_min_log_number_to_keep = 0;
  1134. bool ignored_has_min_log_number_to_keep = false;
  1135. VersionEdit::NewFiles ignored_new_files;
  1136. FileMetaData file;
  1137. const char* err = VersionEdit::DecodeNewFile4From(
  1138. &file_input, ignored_max_level, ignored_min_log_number_to_keep,
  1139. ignored_has_min_log_number_to_keep, ignored_new_files, file);
  1140. if (err != nullptr) {
  1141. return Status::Corruption("SubcompactionProgressPerLevel", err);
  1142. }
  1143. output_files.push_back(std::move(file));
  1144. }
  1145. return Status::OK();
  1146. }
  1147. void SubcompactionProgress::EncodeTo(std::string* dst) const {
  1148. if (!next_internal_key_to_compact.empty()) {
  1149. PutVarint32(dst, SubcompactionProgressCustomTag::kNextInternalKeyToCompact);
  1150. PutLengthPrefixedSlice(dst, next_internal_key_to_compact);
  1151. }
  1152. PutVarint32(dst, SubcompactionProgressCustomTag::kNumProcessedInputRecords);
  1153. std::string varint_records;
  1154. PutVarint64(&varint_records, num_processed_input_records);
  1155. PutLengthPrefixedSlice(dst, varint_records);
  1156. if (output_level_progress.GetOutputFiles().size() >
  1157. output_level_progress.GetLastPersistedOutputFilesCount()) {
  1158. PutVarint32(dst, SubcompactionProgressCustomTag::kOutputLevelProgress);
  1159. std::string level_progress_data;
  1160. output_level_progress.EncodeTo(&level_progress_data);
  1161. PutLengthPrefixedSlice(dst, level_progress_data);
  1162. }
  1163. if (proximal_output_level_progress.GetOutputFiles().size() >
  1164. proximal_output_level_progress.GetLastPersistedOutputFilesCount()) {
  1165. PutVarint32(dst,
  1166. SubcompactionProgressCustomTag::kProximalOutputLevelProgress);
  1167. std::string level_progress_data;
  1168. proximal_output_level_progress.EncodeTo(&level_progress_data);
  1169. PutLengthPrefixedSlice(dst, level_progress_data);
  1170. }
  1171. PutVarint32(dst,
  1172. SubcompactionProgressCustomTag::kSubcompactionProgressTerminate);
  1173. }
  1174. Status SubcompactionProgress::DecodeFrom(Slice* input) {
  1175. Clear();
  1176. while (true) {
  1177. uint32_t custom_tag = 0;
  1178. if (!GetVarint32(input, &custom_tag)) {
  1179. return Status::Corruption("SubcompactionProgress",
  1180. "custom field tag error");
  1181. }
  1182. if (custom_tag ==
  1183. SubcompactionProgressCustomTag::kSubcompactionProgressTerminate) {
  1184. break;
  1185. }
  1186. Slice field;
  1187. if (!GetLengthPrefixedSlice(input, &field)) {
  1188. return Status::Corruption("SubcompactionProgress",
  1189. "custom field length prefixed slice error");
  1190. }
  1191. switch (custom_tag) {
  1192. case SubcompactionProgressCustomTag::kNextInternalKeyToCompact:
  1193. next_internal_key_to_compact = field.ToString();
  1194. break;
  1195. case SubcompactionProgressCustomTag::kNumProcessedInputRecords:
  1196. if (!GetVarint64(&field, &num_processed_input_records)) {
  1197. return Status::Corruption("SubcompactionProgress",
  1198. "invalid num_processed_input_records");
  1199. }
  1200. break;
  1201. case SubcompactionProgressCustomTag::kOutputLevelProgress: {
  1202. Status s = output_level_progress.DecodeFrom(&field);
  1203. if (!s.ok()) {
  1204. return s;
  1205. }
  1206. break;
  1207. }
  1208. case SubcompactionProgressCustomTag::kProximalOutputLevelProgress: {
  1209. Status s = proximal_output_level_progress.DecodeFrom(&field);
  1210. if (!s.ok()) {
  1211. return s;
  1212. }
  1213. break;
  1214. }
  1215. default:
  1216. if ((custom_tag & SubcompactionProgressCustomTag::
  1217. kSubcompactionProgressCustomTagSafeIgnoreMask) !=
  1218. 0) {
  1219. break;
  1220. } else {
  1221. return Status::NotSupported("SubcompactionProgress",
  1222. "unsupported critical custom field");
  1223. }
  1224. }
  1225. }
  1226. return Status::OK();
  1227. }
  1228. bool SubcompactionProgressBuilder::ProcessVersionEdit(const VersionEdit& edit) {
  1229. if (!edit.HasSubcompactionProgress()) {
  1230. return false;
  1231. }
  1232. const SubcompactionProgress& progress = edit.GetSubcompactionProgress();
  1233. MergeDeltaProgress(progress);
  1234. has_subcompaction_progress_ = true;
  1235. return true;
  1236. }
  1237. void SubcompactionProgressBuilder::MergeDeltaProgress(
  1238. const SubcompactionProgress& delta_progress) {
  1239. accumulated_subcompaction_progress_.next_internal_key_to_compact =
  1240. delta_progress.next_internal_key_to_compact;
  1241. accumulated_subcompaction_progress_.num_processed_input_records =
  1242. delta_progress.num_processed_input_records;
  1243. MaybeMergeDeltaProgressPerLevel(
  1244. accumulated_subcompaction_progress_.output_level_progress,
  1245. delta_progress.output_level_progress);
  1246. MaybeMergeDeltaProgressPerLevel(
  1247. accumulated_subcompaction_progress_.proximal_output_level_progress,
  1248. delta_progress.proximal_output_level_progress);
  1249. }
  1250. void SubcompactionProgressBuilder::MaybeMergeDeltaProgressPerLevel(
  1251. SubcompactionProgressPerLevel& accumulated_level_progress,
  1252. const SubcompactionProgressPerLevel& delta_level_progress) {
  1253. const auto& delta_files = delta_level_progress.GetOutputFiles();
  1254. if (delta_files.empty()) {
  1255. return;
  1256. }
  1257. for (const FileMetaData& file : delta_files) {
  1258. accumulated_level_progress.AddToOutputFiles(file); // Stored as copy
  1259. }
  1260. accumulated_level_progress.SetNumProcessedOutputRecords(
  1261. delta_level_progress.GetNumProcessedOutputRecords());
  1262. }
  1263. void SubcompactionProgressBuilder::Clear() {
  1264. accumulated_subcompaction_progress_.Clear();
  1265. has_subcompaction_progress_ = false;
  1266. }
  1267. } // namespace ROCKSDB_NAMESPACE