// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/log_reader.h"

#include <cstdio>

#include "file/sequence_file_reader.h"
#include "port/lang.h"
#include "rocksdb/env.h"
#include "test_util/sync_point.h"
#include "util/coding.h"
#include "util/crc32c.h"

namespace ROCKSDB_NAMESPACE::log {

Reader::Reporter::~Reporter() = default;

Reader::Reader(std::shared_ptr<Logger> info_log,
               std::unique_ptr<SequentialFileReader>&& _file,
               Reporter* reporter, bool checksum, uint64_t log_num,
               bool track_and_verify_wals, bool stop_replay_for_corruption,
               uint64_t min_wal_number_to_keep,
               const PredecessorWALInfo& observed_predecessor_wal_info)
    : info_log_(info_log),
      file_(std::move(_file)),
      reporter_(reporter),
      checksum_(checksum),
      backing_store_(new char[kBlockSize]),
      buffer_(),
      eof_(false),
      read_error_(false),
      eof_offset_(0),
      last_record_offset_(0),
      end_of_buffer_offset_(0),
      log_number_(log_num),
      track_and_verify_wals_(track_and_verify_wals),
      stop_replay_for_corruption_(stop_replay_for_corruption),
      min_wal_number_to_keep_(min_wal_number_to_keep),
      observed_predecessor_wal_info_(observed_predecessor_wal_info),
      recycled_(false),
      first_record_read_(false),
      compression_type_(kNoCompression),
      compression_type_record_read_(false),
      uncompress_(nullptr),
      hash_state_(nullptr),
      uncompress_hash_state_(nullptr) {}

Reader::~Reader() {
  delete[] backing_store_;
  if (uncompress_) {
    delete uncompress_;
  }
  if (hash_state_) {
    XXH3_freeState(hash_state_);
  }
  if (uncompress_hash_state_) {
    XXH3_freeState(uncompress_hash_state_);
  }
}

// For kAbsoluteConsistency, on clean shutdown we don't expect any error
// in the log files. For other modes, we can ignore only incomplete records
// in the last log file, which are presumably due to a write in progress
// during restart (or from log recycling).
//
// TODO krad: Evaluate if we need to move to a more strict mode where we
// restrict the inconsistency to only the last log
// TODO (hx235): move `wal_recovery_mode` to be a member data like other
// information (e.g., `stop_replay_for_corruption`) to decide whether to
// check for and surface corruption in `ReadRecord()`
bool Reader::ReadRecord(Slice* record, std::string* scratch,
                        WALRecoveryMode wal_recovery_mode,
                        uint64_t* record_checksum) {
  scratch->clear();
  record->clear();
  if (record_checksum != nullptr) {
    if (hash_state_ == nullptr) {
      hash_state_ = XXH3_createState();
    }
    XXH3_64bits_reset(hash_state_);
  }
  if (uncompress_) {
    uncompress_->Reset();
  }
  bool in_fragmented_record = false;
  // Record offset of the logical record that we're reading
  // 0 is a dummy value to make compilers happy
  uint64_t prospective_record_offset = 0;

  Slice fragment;
  for (;;) {
    uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size();
    size_t drop_size = 0;
    const uint8_t record_type =
        ReadPhysicalRecord(&fragment, &drop_size, record_checksum);
    switch (record_type) {
      case kFullType:
      case kRecyclableFullType:
        if (in_fragmented_record && !scratch->empty()) {
          // Handle bug in earlier versions of log::Writer where
          // it could emit an empty kFirstType record at the tail end
          // of a block followed by a kFullType or kFirstType record
          // at the beginning of the next block.
          ReportCorruption(scratch->size(), "partial record without end(1)");
        }
        // No need to compute record_checksum since the record
        // consists of a single fragment and the checksum is computed
        // in ReadPhysicalRecord() if WAL compression is enabled
        if (record_checksum != nullptr && uncompress_ == nullptr) {
          // No need to stream since the record is a single fragment
          *record_checksum = XXH3_64bits(fragment.data(), fragment.size());
        }
        prospective_record_offset = physical_record_offset;
        scratch->clear();
        *record = fragment;
        last_record_offset_ = prospective_record_offset;
        first_record_read_ = true;
        return true;
      case kFirstType:
      case kRecyclableFirstType:
        if (in_fragmented_record && !scratch->empty()) {
          // Handle bug in earlier versions of log::Writer where
          // it could emit an empty kFirstType record at the tail end
          // of a block followed by a kFullType or kFirstType record
          // at the beginning of the next block.
          ReportCorruption(scratch->size(), "partial record without end(2)");
          XXH3_64bits_reset(hash_state_);
        }
        if (record_checksum != nullptr) {
          XXH3_64bits_update(hash_state_, fragment.data(), fragment.size());
        }
        prospective_record_offset = physical_record_offset;
        scratch->assign(fragment.data(), fragment.size());
        in_fragmented_record = true;
        break;  // switch
      case kMiddleType:
      case kRecyclableMiddleType:
        if (!in_fragmented_record) {
          ReportCorruption(fragment.size(),
                           "missing start of fragmented record(1)");
        } else {
          if (record_checksum != nullptr) {
            XXH3_64bits_update(hash_state_, fragment.data(), fragment.size());
          }
          scratch->append(fragment.data(), fragment.size());
        }
        break;  // switch
      case kLastType:
      case kRecyclableLastType:
        if (!in_fragmented_record) {
          ReportCorruption(fragment.size(),
                           "missing start of fragmented record(2)");
        } else {
          if (record_checksum != nullptr) {
            XXH3_64bits_update(hash_state_, fragment.data(), fragment.size());
            *record_checksum = XXH3_64bits_digest(hash_state_);
          }
          scratch->append(fragment.data(), fragment.size());
          *record = Slice(*scratch);
          last_record_offset_ = prospective_record_offset;
          first_record_read_ = true;
          return true;
        }
        break;  // switch
      case kSetCompressionType: {
        if (compression_type_record_read_) {
          ReportCorruption(fragment.size(),
                           "read multiple SetCompressionType records");
        }
        if (first_record_read_) {
          ReportCorruption(fragment.size(),
                           "SetCompressionType not the first record");
        }
        prospective_record_offset = physical_record_offset;
        scratch->clear();
        last_record_offset_ = prospective_record_offset;
        CompressionTypeRecord compression_record(kNoCompression);
        Status s = compression_record.DecodeFrom(&fragment);
        if (!s.ok()) {
          ReportCorruption(fragment.size(),
                           "could not decode SetCompressionType record");
        } else {
          InitCompression(compression_record);
        }
        break;  // switch
      }
      case kPredecessorWALInfoType:
      case kRecyclePredecessorWALInfoType: {
        prospective_record_offset = physical_record_offset;
        scratch->clear();
        last_record_offset_ = prospective_record_offset;
        PredecessorWALInfo recorded_predecessor_wal_info;
        Status s = recorded_predecessor_wal_info.DecodeFrom(&fragment);
        if (!s.ok()) {
          ReportCorruption(fragment.size(),
                           "could not decode PredecessorWALInfoType record");
        } else {
          MaybeVerifyPredecessorWALInfo(wal_recovery_mode, fragment,
                                        recorded_predecessor_wal_info);
        }
        break;  // switch
      }
      case kUserDefinedTimestampSizeType:
      case kRecyclableUserDefinedTimestampSizeType: {
        if (in_fragmented_record && !scratch->empty()) {
          ReportCorruption(
              scratch->size(),
              "user-defined timestamp size record interspersed partial record");
        }
        prospective_record_offset = physical_record_offset;
        scratch->clear();
        last_record_offset_ = prospective_record_offset;
        UserDefinedTimestampSizeRecord ts_record;
        Status s = ts_record.DecodeFrom(&fragment);
        if (!s.ok()) {
          ReportCorruption(
              fragment.size(),
              "could not decode user-defined timestamp size record");
        } else {
          s = UpdateRecordedTimestampSize(
              ts_record.GetUserDefinedTimestampSize());
          if (!s.ok()) {
            ReportCorruption(fragment.size(), s.getState());
          }
        }
        break;  // switch
      }
      case kBadHeader:
        if (wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency ||
            wal_recovery_mode == WALRecoveryMode::kPointInTimeRecovery) {
          // In clean shutdown we don't expect any error in the log files.
          // In point-in-time recovery an incomplete record at the end could
          // produce a hole in the recovered data. Report an error here, which
          // higher layers can choose to ignore when it's provable there is no
          // hole.
          ReportCorruption(drop_size, "truncated header");
        }
        FALLTHROUGH_INTENDED;
      case kEof:
        if (in_fragmented_record) {
          if (wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency ||
              wal_recovery_mode == WALRecoveryMode::kPointInTimeRecovery) {
            // In clean shutdown we don't expect any error in the log files.
            // In point-in-time recovery an incomplete record at the end could
            // produce a hole in the recovered data. Report an error here,
            // which higher layers can choose to ignore when it's provable
            // there is no hole.
            ReportCorruption(
                scratch->size(),
                "error reading trailing data due to encountering EOF");
          }
          // This can be caused by the writer dying immediately after
          // writing a physical record but before completing the next; don't
          // treat it as a corruption, just ignore the entire logical record.
          scratch->clear();
        }
        return false;
      case kOldRecord:
        if (wal_recovery_mode != WALRecoveryMode::kSkipAnyCorruptedRecords) {
          // Treat a record from a previous instance of the log as EOF.
          if (in_fragmented_record) {
            if (wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency ||
                wal_recovery_mode == WALRecoveryMode::kPointInTimeRecovery) {
              // In clean shutdown we don't expect any error in the log files.
              // In point-in-time recovery an incomplete record at the end
              // could produce a hole in the recovered data. Report an error
              // here, which higher layers can choose to ignore when it's
              // provable there is no hole.
              ReportCorruption(
                  scratch->size(),
                  "error reading trailing data due to encountering old record");
            }
            // This can be caused by the writer dying immediately after
            // writing a physical record but before completing the next; don't
            // treat it as a corruption, just ignore the entire logical record.
            scratch->clear();
          } else {
            if (wal_recovery_mode == WALRecoveryMode::kPointInTimeRecovery) {
              ReportOldLogRecord(scratch->size());
            }
          }
          return false;
        }
        FALLTHROUGH_INTENDED;
      case kBadRecord:
        if (in_fragmented_record) {
          ReportCorruption(scratch->size(), "error in middle of record");
          in_fragmented_record = false;
          scratch->clear();
        }
        break;  // switch
      case kBadRecordLen:
        if (eof_) {
          if (wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency ||
              wal_recovery_mode == WALRecoveryMode::kPointInTimeRecovery) {
            // In clean shutdown we don't expect any error in the log files.
            // In point-in-time recovery an incomplete record at the end could
            // produce a hole in the recovered data. Report an error here,
            // which higher layers can choose to ignore when it's provable
            // there is no hole.
            ReportCorruption(drop_size, "truncated record body");
          }
          return false;
        }
        FALLTHROUGH_INTENDED;
      case kBadRecordChecksum:
        if (recycled_ && wal_recovery_mode ==
                             WALRecoveryMode::kTolerateCorruptedTailRecords) {
          scratch->clear();
          return false;
        }
        if (record_type == kBadRecordLen) {
          ReportCorruption(drop_size, "bad record length");
        } else {
          ReportCorruption(drop_size, "checksum mismatch");
        }
        if (in_fragmented_record) {
          ReportCorruption(scratch->size(), "error in middle of record");
          in_fragmented_record = false;
          scratch->clear();
        }
        break;  // switch
      default: {
        if ((record_type & kRecordTypeSafeIgnoreMask) == 0) {
          std::string reason =
              "unknown record type " + std::to_string(record_type);
          ReportCorruption(
              (fragment.size() + (in_fragmented_record ? scratch->size() : 0)),
              reason.c_str());
        }
        in_fragmented_record = false;
        scratch->clear();
        break;  // switch
      }
    }
  }
  // unreachable
}
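
// Example use (a minimal sketch, not part of RocksDB's recovery code): given
// a Reader constructed as above, with its Reporter wired to collect
// corruption reports, replay one WAL record by record.
//
//   Slice record;
//   std::string scratch;
//   while (reader.ReadRecord(&record, &scratch,
//                            WALRecoveryMode::kPointInTimeRecovery)) {
//     // In a RocksDB WAL, each logical record is a serialized WriteBatch.
//     // Consume `record` here; the Slice may be invalidated by the next
//     // ReadRecord() call.
//   }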

void Reader::MaybeVerifyPredecessorWALInfo(
    WALRecoveryMode wal_recovery_mode, Slice fragment,
    const PredecessorWALInfo& recorded_predecessor_wal_info) {
  if (!track_and_verify_wals_ ||
      wal_recovery_mode == WALRecoveryMode::kSkipAnyCorruptedRecords ||
      stop_replay_for_corruption_) {
    return;
  }
  assert(recorded_predecessor_wal_info.IsInitialized());
  uint64_t recorded_predecessor_log_number =
      recorded_predecessor_wal_info.GetLogNumber();

  // This is the first WAL recovered, so no predecessor WAL info has been
  // initialized yet.
  if (!observed_predecessor_wal_info_.IsInitialized()) {
    if (recorded_predecessor_log_number >= min_wal_number_to_keep_) {
      std::string reason = "Missing WAL of log number " +
                           std::to_string(recorded_predecessor_log_number);
      ReportCorruption(fragment.size(), reason.c_str(),
                       recorded_predecessor_log_number);
    }
  } else {
    if (observed_predecessor_wal_info_.GetLogNumber() !=
        recorded_predecessor_log_number) {
      std::string reason =
          "Mismatched predecessor log number of WAL file " +
          file_->file_name() + ". Recorded " +
          std::to_string(recorded_predecessor_log_number) + ". Observed " +
          std::to_string(observed_predecessor_wal_info_.GetLogNumber());
      ReportCorruption(fragment.size(), reason.c_str(),
                       recorded_predecessor_log_number);
    } else if (observed_predecessor_wal_info_.GetLastSeqnoRecorded() !=
               recorded_predecessor_wal_info.GetLastSeqnoRecorded()) {
      std::string reason =
          "Mismatched last sequence number recorded in the WAL of log number " +
          std::to_string(recorded_predecessor_log_number) + ". Recorded " +
          std::to_string(recorded_predecessor_wal_info.GetLastSeqnoRecorded()) +
          ". Observed " +
          std::to_string(
              observed_predecessor_wal_info_.GetLastSeqnoRecorded()) +
          ". (Last sequence number equal to 0 indicates no WAL records)";
      ReportCorruption(fragment.size(), reason.c_str(),
                       recorded_predecessor_log_number);
    } else if (observed_predecessor_wal_info_.GetSizeBytes() !=
               recorded_predecessor_wal_info.GetSizeBytes()) {
      std::string reason =
          "Mismatched size of the WAL of log number " +
          std::to_string(recorded_predecessor_log_number) + ". Recorded " +
          std::to_string(recorded_predecessor_wal_info.GetSizeBytes()) +
          " bytes. Observed " +
          std::to_string(observed_predecessor_wal_info_.GetSizeBytes()) +
          " bytes.";
      ReportCorruption(fragment.size(), reason.c_str(),
                       recorded_predecessor_log_number);
    }
  }
}

uint64_t Reader::LastRecordOffset() { return last_record_offset_; }

uint64_t Reader::LastRecordEnd() {
  return end_of_buffer_offset_ - buffer_.size();
}

void Reader::UnmarkEOF() {
  if (read_error_) {
    return;
  }
  eof_ = false;
  if (eof_offset_ == 0) {
    return;
  }
  UnmarkEOFInternal();
}

void Reader::UnmarkEOFInternal() {
  // If the EOF was in the middle of a block (a partial block was read) we have
  // to read the rest of the block as ReadPhysicalRecord can only read full
  // blocks and expects the file position indicator to be aligned to the start
  // of a block.
  //
  //   consumed_bytes + buffer_.size() + remaining == kBlockSize
  size_t consumed_bytes = eof_offset_ - buffer_.size();
  size_t remaining = kBlockSize - eof_offset_;
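  // Illustrative numbers (assuming the usual 32 KiB block size,
  // kBlockSize = 32768): if EOF was hit after reading 100 bytes of a block
  // (eof_offset_ = 100) and 40 of those bytes are still unconsumed in
  // buffer_, then consumed_bytes = 60, remaining = 32668, and
  // 60 + 40 + 32668 == 32768.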

  // backing_store_ is used to concatenate what is left in buffer_ and
  // the remainder of the block. If buffer_ already uses backing_store_,
  // we just append the new data.
  if (buffer_.data() != backing_store_ + consumed_bytes) {
    // buffer_ does not use backing_store_ for storage.
    // Copy what is left in buffer_ to backing_store_.
    memmove(backing_store_ + consumed_bytes, buffer_.data(), buffer_.size());
  }

  Slice read_buffer;
  // TODO: rate limit log reader with appropriate priority.
  // TODO: avoid overcharging rate limiter:
  // Note that the Read here might overcharge SequentialFileReader's internal
  // rate limiter if priority is not IO_TOTAL, e.g., when there is not enough
  // content left until EOF to read.
  Status status =
      file_->Read(remaining, &read_buffer, backing_store_ + eof_offset_,
                  Env::IO_TOTAL /* rate_limiter_priority */);

  size_t added = read_buffer.size();
  end_of_buffer_offset_ += added;
  if (!status.ok()) {
    if (added > 0) {
      ReportDrop(added, status);
    }
    read_error_ = true;
    return;
  }

  if (read_buffer.data() != backing_store_ + eof_offset_) {
    // Read did not write to backing_store_
    memmove(backing_store_ + eof_offset_, read_buffer.data(),
            read_buffer.size());
  }

  buffer_ = Slice(backing_store_ + consumed_bytes,
                  eof_offset_ + added - consumed_bytes);

  if (added < remaining) {
    eof_ = true;
    eof_offset_ += added;
  } else {
    eof_offset_ = 0;
  }
}

void Reader::ReportCorruption(size_t bytes, const char* reason,
                              uint64_t log_number) {
  ReportDrop(bytes, Status::Corruption(reason), log_number);
}

void Reader::ReportDrop(size_t bytes, const Status& reason,
                        uint64_t log_number) {
  if (reporter_ != nullptr) {
    reporter_->Corruption(bytes, reason, log_number);
  }
}

void Reader::ReportOldLogRecord(size_t bytes) {
  if (reporter_ != nullptr) {
    reporter_->OldLogRecord(bytes);
  }
}

bool Reader::ReadMore(size_t* drop_size, uint8_t* error) {
  if (!eof_ && !read_error_) {
    // Last read was a full read, so this is a trailer to skip
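    // (The writer never starts a header in the final few bytes of a block;
    // it zero-fills them instead, so any leftover bytes here are padding,
    // not record data.)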
    buffer_.clear();
    // TODO: rate limit log reader with appropriate priority.
    // TODO: avoid overcharging rate limiter:
    // Note that the Read here might overcharge SequentialFileReader's internal
    // rate limiter if priority is not IO_TOTAL, e.g., when there is not enough
    // content left until EOF to read.
    Status status = file_->Read(kBlockSize, &buffer_, backing_store_,
                                Env::IO_TOTAL /* rate_limiter_priority */);
    TEST_SYNC_POINT_CALLBACK("LogReader::ReadMore:AfterReadFile", &status);
    end_of_buffer_offset_ += buffer_.size();
    if (!status.ok()) {
      buffer_.clear();
      ReportDrop(kBlockSize, status);
      read_error_ = true;
      *error = kEof;
      return false;
    } else if (buffer_.size() < static_cast<size_t>(kBlockSize)) {
      eof_ = true;
      eof_offset_ = buffer_.size();
    }
    return true;
  } else {
    // Note that if buffer_ is non-empty, we have a truncated header at the
    // end of the file, which can be caused by the writer crashing in the
    // middle of writing the header. Unless explicitly requested, we don't
    // consider this an error, just report EOF.
    if (buffer_.size()) {
      *drop_size = buffer_.size();
      buffer_.clear();
      *error = kBadHeader;
      return false;
    }
    buffer_.clear();
    *error = kEof;
    return false;
  }
}

uint8_t Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size,
                                   uint64_t* fragment_checksum) {
  while (true) {
    // We need at least the minimum header size
    if (buffer_.size() < static_cast<size_t>(kHeaderSize)) {
      // the default value of r is meaningless because ReadMore will overwrite
      // it if it returns false; in case it returns true, the return value will
      // not be used anyway
      uint8_t r = kEof;
      if (!ReadMore(drop_size, &r)) {
        return r;
      }
      continue;
    }

    // Parse the header
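    // Physical record layout (little-endian), per the decoding below:
    //   legacy header (kHeaderSize bytes):
    //     checksum: uint32  // masked crc32c of everything after the length
    //                       // field (type byte, optional log number, payload)
    //     length:   uint16  // payload size
    //     type:     uint8   // full/first/middle/last or a metadata type
    //   recyclable header (kRecyclableHeaderSize bytes) additionally ends
    //   with:
    //     log_number: uint32  // detects stale records in recycled files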
    const char* header = buffer_.data();
    const uint32_t a = static_cast<uint32_t>(header[4]) & 0xff;
    const uint32_t b = static_cast<uint32_t>(header[5]) & 0xff;
    const uint8_t type = static_cast<uint8_t>(header[6]);
    const uint32_t length = a | (b << 8);
    int header_size = kHeaderSize;
    const bool is_recyclable_type =
        ((type >= kRecyclableFullType && type <= kRecyclableLastType) ||
         type == kRecyclableUserDefinedTimestampSizeType ||
         type == kRecyclePredecessorWALInfoType);
    if (is_recyclable_type) {
      header_size = kRecyclableHeaderSize;
      if (first_record_read_ && !recycled_) {
        // A recycled log should have started with a recycled record
        return kBadRecord;
      }
      recycled_ = true;
      // We need enough for the larger header
      if (buffer_.size() < static_cast<size_t>(kRecyclableHeaderSize)) {
        uint8_t r = kEof;
        if (!ReadMore(drop_size, &r)) {
          return r;
        }
        continue;
      }
    }
    if (header_size + length > buffer_.size()) {
      assert(buffer_.size() >= static_cast<size_t>(header_size));
      *drop_size = buffer_.size();
      buffer_.clear();
      // If the end of the read has been reached without seeing
      // `header_size + length` bytes of payload, report a corruption. The
      // higher layers can decide how to handle it based on the recovery mode,
      // whether this occurred at EOF, whether this is the final WAL, etc.
      return kBadRecordLen;
    }
    if (is_recyclable_type) {
      const uint32_t log_num = DecodeFixed32(header + 7);
      if (log_num != log_number_) {
        buffer_.remove_prefix(header_size + length);
        return kOldRecord;
      }
    }
    if (type == kZeroType && length == 0) {
      // Skip zero length record without reporting any drops since
      // such records are produced by the mmap based writing code in
      // env_posix.cc that preallocates file regions.
      // NOTE: this should never happen in DB written by new RocksDB versions,
      // since we turn off mmap writes to manifest and log files
      buffer_.clear();
      return kBadRecord;
    }

    // Check crc
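    // Stored checksums are masked (see crc32c::Mask in util/crc32c.h) so
    // that computing the crc32c of data that itself embeds crc32c values
    // stays well-behaved; unmask before comparing.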
    if (checksum_) {
      uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header));
      uint32_t actual_crc =
          crc32c::Value(header + 6, length + header_size - 6);
      if (actual_crc != expected_crc) {
        // Drop the rest of the buffer since "length" itself may have
        // been corrupted and if we trust it, we could find some
        // fragment of a real log record that just happens to look
        // like a valid log record.
        *drop_size = buffer_.size();
        buffer_.clear();
        return kBadRecordChecksum;
      }
    }
    buffer_.remove_prefix(header_size + length);

    if (!uncompress_ || type == kSetCompressionType ||
        type == kPredecessorWALInfoType ||
        type == kRecyclePredecessorWALInfoType ||
        type == kUserDefinedTimestampSizeType ||
        type == kRecyclableUserDefinedTimestampSizeType) {
      *result = Slice(header + header_size, length);
      return type;
    } else {
      // Uncompress compressed records
      uncompressed_record_.clear();
      if (fragment_checksum != nullptr) {
        if (uncompress_hash_state_ == nullptr) {
          uncompress_hash_state_ = XXH3_createState();
        }
        XXH3_64bits_reset(uncompress_hash_state_);
      }
      size_t uncompressed_size = 0;
      int remaining = 0;
      const char* input = header + header_size;
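      // Stream in block-sized chunks: keep draining while the uncompressor
      // reports buffered input remaining, or while the last output chunk
      // exactly filled the buffer (more output may still be pending).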
      do {
        remaining = uncompress_->Uncompress(
            input, length, uncompressed_buffer_.get(), &uncompressed_size);
        input = nullptr;
        if (remaining < 0) {
          buffer_.clear();
          return kBadRecord;
        }
        if (uncompressed_size > 0) {
          if (fragment_checksum != nullptr) {
            XXH3_64bits_update(uncompress_hash_state_,
                               uncompressed_buffer_.get(), uncompressed_size);
          }
          uncompressed_record_.append(uncompressed_buffer_.get(),
                                      uncompressed_size);
        }
      } while (remaining > 0 || uncompressed_size == kBlockSize);
      if (fragment_checksum != nullptr) {
        // We can remove this check by updating hash_state_ directly,
        // but that requires resetting hash_state_ for full and first types
        // for edge cases like consecutive first-type records.
        // Leaving the check as is since it is cleaner, and we can revert to
        // the above approach if it causes a performance impact.
        *fragment_checksum = XXH3_64bits_digest(uncompress_hash_state_);
        uint64_t actual_checksum = XXH3_64bits(uncompressed_record_.data(),
                                               uncompressed_record_.size());
        if (*fragment_checksum != actual_checksum) {
          // uncompressed_record_ contains bad content that does not match
          // actual decompressed content
          return kBadRecord;
        }
      }
      *result = Slice(uncompressed_record_);
      return type;
    }
  }
}

// Initialize uncompress related fields
void Reader::InitCompression(const CompressionTypeRecord& compression_record) {
  compression_type_ = compression_record.GetCompressionType();
  compression_type_record_read_ = true;
  constexpr uint32_t compression_format_version = 2;
  uncompress_ = StreamingUncompress::Create(
      compression_type_, compression_format_version, kBlockSize);
  assert(uncompress_ != nullptr);
  uncompressed_buffer_ = std::unique_ptr<char[]>(new char[kBlockSize]);
  assert(uncompressed_buffer_);
}

Status Reader::UpdateRecordedTimestampSize(
    const std::vector<std::pair<uint32_t, size_t>>& cf_to_ts_sz) {
  for (const auto& [cf, ts_sz] : cf_to_ts_sz) {
    // A zero user-defined timestamp size is not recorded.
    if (ts_sz == 0) {
      return Status::Corruption(
          "User-defined timestamp size record contains zero timestamp size.");
    }
    // The user-defined timestamp size record for a column family should not
    // be updated in the same log file.
    if (recorded_cf_to_ts_sz_.count(cf) != 0) {
      return Status::Corruption(
          "User-defined timestamp size record contains update to "
          "recorded column family.");
    }
    recorded_cf_to_ts_sz_.insert(std::make_pair(cf, ts_sz));
  }
  return Status::OK();
}

bool FragmentBufferedReader::ReadRecord(Slice* record, std::string* scratch,
                                        WALRecoveryMode wal_recovery_mode,
                                        uint64_t* /* checksum */) {
  assert(record != nullptr);
  assert(scratch != nullptr);
  record->clear();
  scratch->clear();
  if (uncompress_) {
    uncompress_->Reset();
  }
  uint64_t prospective_record_offset = 0;
  uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size();
  size_t drop_size = 0;
  uint8_t fragment_type_or_err = 0;  // Initialize to make compiler happy
  Slice fragment;
  while (TryReadFragment(&fragment, &drop_size, &fragment_type_or_err)) {
    switch (fragment_type_or_err) {
      case kFullType:
      case kRecyclableFullType:
        if (in_fragmented_record_ && !fragments_.empty()) {
          ReportCorruption(fragments_.size(), "partial record without end(1)");
        }
        fragments_.clear();
        *record = fragment;
        prospective_record_offset = physical_record_offset;
        last_record_offset_ = prospective_record_offset;
        first_record_read_ = true;
        in_fragmented_record_ = false;
        return true;
      case kFirstType:
      case kRecyclableFirstType:
        if (in_fragmented_record_ || !fragments_.empty()) {
          ReportCorruption(fragments_.size(), "partial record without end(2)");
        }
        prospective_record_offset = physical_record_offset;
        fragments_.assign(fragment.data(), fragment.size());
        in_fragmented_record_ = true;
        break;
      case kMiddleType:
      case kRecyclableMiddleType:
        if (!in_fragmented_record_) {
          ReportCorruption(fragment.size(),
                           "missing start of fragmented record(1)");
        } else {
          fragments_.append(fragment.data(), fragment.size());
        }
        break;
      case kLastType:
      case kRecyclableLastType:
        if (!in_fragmented_record_) {
          ReportCorruption(fragment.size(),
                           "missing start of fragmented record(2)");
        } else {
          fragments_.append(fragment.data(), fragment.size());
          scratch->assign(fragments_.data(), fragments_.size());
          fragments_.clear();
          *record = Slice(*scratch);
          last_record_offset_ = prospective_record_offset;
          first_record_read_ = true;
          in_fragmented_record_ = false;
          return true;
        }
        break;
      case kSetCompressionType: {
        if (compression_type_record_read_) {
          ReportCorruption(fragment.size(),
                           "read multiple SetCompressionType records");
        }
        if (first_record_read_) {
          ReportCorruption(fragment.size(),
                           "SetCompressionType not the first record");
        }
        fragments_.clear();
        prospective_record_offset = physical_record_offset;
        last_record_offset_ = prospective_record_offset;
        in_fragmented_record_ = false;
        CompressionTypeRecord compression_record(kNoCompression);
        Status s = compression_record.DecodeFrom(&fragment);
        if (!s.ok()) {
          ReportCorruption(fragment.size(),
                           "could not decode SetCompressionType record");
        } else {
          InitCompression(compression_record);
        }
        break;
      }
      case kPredecessorWALInfoType:
      case kRecyclePredecessorWALInfoType: {
        fragments_.clear();
        prospective_record_offset = physical_record_offset;
        last_record_offset_ = prospective_record_offset;
        in_fragmented_record_ = false;
        PredecessorWALInfo recorded_predecessor_wal_info;
        Status s = recorded_predecessor_wal_info.DecodeFrom(&fragment);
        if (!s.ok()) {
          ReportCorruption(fragment.size(),
                           "could not decode PredecessorWALInfoType record");
        } else {
          MaybeVerifyPredecessorWALInfo(wal_recovery_mode, fragment,
                                        recorded_predecessor_wal_info);
        }
        break;
      }
      case kUserDefinedTimestampSizeType:
      case kRecyclableUserDefinedTimestampSizeType: {
        if (in_fragmented_record_ && !scratch->empty()) {
          ReportCorruption(
              scratch->size(),
              "user-defined timestamp size record interspersed partial record");
        }
        fragments_.clear();
        prospective_record_offset = physical_record_offset;
        last_record_offset_ = prospective_record_offset;
        in_fragmented_record_ = false;
        UserDefinedTimestampSizeRecord ts_record;
        Status s = ts_record.DecodeFrom(&fragment);
        if (!s.ok()) {
          ReportCorruption(
              fragment.size(),
              "could not decode user-defined timestamp size record");
        } else {
          s = UpdateRecordedTimestampSize(
              ts_record.GetUserDefinedTimestampSize());
          if (!s.ok()) {
            ReportCorruption(fragment.size(), s.getState());
          }
        }
        break;
      }
      case kBadHeader:
      case kBadRecord:
      case kEof:
      case kOldRecord:
        if (in_fragmented_record_) {
          ReportCorruption(fragments_.size(), "error in middle of record");
          in_fragmented_record_ = false;
          fragments_.clear();
        }
        break;
      case kBadRecordChecksum:
        if (recycled_) {
          fragments_.clear();
          return false;
        }
        ReportCorruption(drop_size, "checksum mismatch");
        if (in_fragmented_record_) {
          ReportCorruption(fragments_.size(), "error in middle of record");
          in_fragmented_record_ = false;
          fragments_.clear();
        }
        break;
      default: {
        if ((fragment_type_or_err & kRecordTypeSafeIgnoreMask) == 0) {
          std::string reason =
              "unknown record type " + std::to_string(fragment_type_or_err);
          ReportCorruption(
              fragment.size() + (in_fragmented_record_ ? fragments_.size() : 0),
              reason.c_str());
        }
        in_fragmented_record_ = false;
        fragments_.clear();
        break;
      }
    }
  }
  return false;
}

void FragmentBufferedReader::UnmarkEOF() {
  if (read_error_) {
    return;
  }
  eof_ = false;
  UnmarkEOFInternal();
}

bool FragmentBufferedReader::TryReadMore(size_t* drop_size, uint8_t* error) {
  if (!eof_ && !read_error_) {
    // Last read was a full read, so this is a trailer to skip
    buffer_.clear();
    // TODO: rate limit log reader with appropriate priority.
    // TODO: avoid overcharging rate limiter:
    // Note that the Read here might overcharge SequentialFileReader's internal
    // rate limiter if priority is not IO_TOTAL, e.g., when there is not enough
    // content left until EOF to read.
    Status status = file_->Read(kBlockSize, &buffer_, backing_store_,
                                Env::IO_TOTAL /* rate_limiter_priority */);
    end_of_buffer_offset_ += buffer_.size();
    if (!status.ok()) {
      buffer_.clear();
      ReportDrop(kBlockSize, status);
      read_error_ = true;
      *error = kEof;
      return false;
    } else if (buffer_.size() < static_cast<size_t>(kBlockSize)) {
      eof_ = true;
      eof_offset_ = buffer_.size();
      TEST_SYNC_POINT_CALLBACK(
          "FragmentBufferedLogReader::TryReadMore:FirstEOF", nullptr);
    }
    return true;
  } else if (!read_error_) {
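    // Reached EOF earlier without a read error: the file may have grown
    // since then (e.g., when tailing a live WAL), so clear the EOF marker
    // and retry.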
    UnmarkEOF();
  }
  if (!read_error_) {
    return true;
  }
  *error = kEof;
  *drop_size = buffer_.size();
  if (buffer_.size() > 0) {
    *error = kBadHeader;
  }
  buffer_.clear();
  return false;
}

// return true if the caller should process the fragment_type_or_err.
bool FragmentBufferedReader::TryReadFragment(Slice* fragment,
                                             size_t* drop_size,
                                             uint8_t* fragment_type_or_err) {
  assert(fragment != nullptr);
  assert(drop_size != nullptr);
  assert(fragment_type_or_err != nullptr);

  while (buffer_.size() < static_cast<size_t>(kHeaderSize)) {
    size_t old_size = buffer_.size();
    uint8_t error = kEof;
    if (!TryReadMore(drop_size, &error)) {
      *fragment_type_or_err = error;
      return false;
    } else if (old_size == buffer_.size()) {
      return false;
    }
  }
  const char* header = buffer_.data();
  const uint32_t a = static_cast<uint32_t>(header[4]) & 0xff;
  const uint32_t b = static_cast<uint32_t>(header[5]) & 0xff;
  const uint8_t type = static_cast<uint8_t>(header[6]);
  const uint32_t length = a | (b << 8);
  int header_size = kHeaderSize;
  if ((type >= kRecyclableFullType && type <= kRecyclableLastType) ||
      type == kRecyclableUserDefinedTimestampSizeType ||
      type == kRecyclePredecessorWALInfoType) {
    if (first_record_read_ && !recycled_) {
      // A recycled log should have started with a recycled record
      *fragment_type_or_err = kBadRecord;
      return true;
    }
    recycled_ = true;
    header_size = kRecyclableHeaderSize;
    while (buffer_.size() < static_cast<size_t>(kRecyclableHeaderSize)) {
      size_t old_size = buffer_.size();
      uint8_t error = kEof;
      if (!TryReadMore(drop_size, &error)) {
        *fragment_type_or_err = error;
        return false;
      } else if (old_size == buffer_.size()) {
        return false;
      }
    }
    const uint32_t log_num = DecodeFixed32(header + 7);
    if (log_num != log_number_) {
      *fragment_type_or_err = kOldRecord;
      return true;
    }
  }

  while (header_size + length > buffer_.size()) {
    size_t old_size = buffer_.size();
    uint8_t error = kEof;
    if (!TryReadMore(drop_size, &error)) {
      *fragment_type_or_err = error;
      return false;
    } else if (old_size == buffer_.size()) {
      return false;
    }
  }

  if (type == kZeroType && length == 0) {
    buffer_.clear();
    *fragment_type_or_err = kBadRecord;
    return true;
  }

  if (checksum_) {
    uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header));
    uint32_t actual_crc = crc32c::Value(header + 6, length + header_size - 6);
    if (actual_crc != expected_crc) {
      *drop_size = buffer_.size();
      buffer_.clear();
      *fragment_type_or_err = kBadRecordChecksum;
      return true;
    }
  }

  buffer_.remove_prefix(header_size + length);

  if (!uncompress_ || type == kSetCompressionType ||
      type == kPredecessorWALInfoType ||
      type == kRecyclePredecessorWALInfoType ||
      type == kUserDefinedTimestampSizeType ||
      type == kRecyclableUserDefinedTimestampSizeType) {
    *fragment = Slice(header + header_size, length);
    *fragment_type_or_err = type;
    return true;
  } else {
    // Uncompress compressed records
    uncompressed_record_.clear();
    size_t uncompressed_size = 0;
    int remaining = 0;
    const char* input = header + header_size;
    do {
      remaining = uncompress_->Uncompress(
          input, length, uncompressed_buffer_.get(), &uncompressed_size);
      input = nullptr;
      if (remaining < 0) {
        buffer_.clear();
        *fragment_type_or_err = kBadRecord;
        return true;
      }
      if (uncompressed_size > 0) {
        uncompressed_record_.append(uncompressed_buffer_.get(),
                                    uncompressed_size);
      }
    } while (remaining > 0 || uncompressed_size == kBlockSize);
    *fragment = Slice(std::move(uncompressed_record_));
    *fragment_type_or_err = type;
    return true;
  }
}

}  // namespace ROCKSDB_NAMESPACE::log