log_reader.cc

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/log_reader.h"

#include <stdio.h>

#include "file/sequence_file_reader.h"
#include "rocksdb/env.h"
#include "test_util/sync_point.h"
#include "util/coding.h"
#include "util/crc32c.h"
#include "util/util.h"

namespace ROCKSDB_NAMESPACE {
namespace log {

Reader::Reporter::~Reporter() {}

Reader::Reader(std::shared_ptr<Logger> info_log,
               std::unique_ptr<SequentialFileReader>&& _file,
               Reporter* reporter, bool checksum, uint64_t log_num)
    : info_log_(info_log),
      file_(std::move(_file)),
      reporter_(reporter),
      checksum_(checksum),
      backing_store_(new char[kBlockSize]),
      buffer_(),
      eof_(false),
      read_error_(false),
      eof_offset_(0),
      last_record_offset_(0),
      end_of_buffer_offset_(0),
      log_number_(log_num),
      recycled_(false) {}

Reader::~Reader() { delete[] backing_store_; }
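
// A logical record written through log::Writer may be split into several
// physical fragments (kFirstType / kMiddleType / kLastType) whenever it
// crosses a kBlockSize boundary. ReadRecord() below reassembles those
// fragments into *record, using *scratch as the backing store for
// multi-fragment records.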
// For kAbsoluteConsistency, on clean shutdown we don't expect any error
// in the log files. For other modes, we can ignore only incomplete records
// in the last log file, which are presumably due to a write in progress
// during restart (or from log recycling).
//
// TODO krad: Evaluate if we need to move to a more strict mode where we
// restrict the inconsistency to only the last log
bool Reader::ReadRecord(Slice* record, std::string* scratch,
                        WALRecoveryMode wal_recovery_mode) {
  scratch->clear();
  record->clear();
  bool in_fragmented_record = false;
  // Record offset of the logical record that we're reading
  // 0 is a dummy value to make compilers happy
  uint64_t prospective_record_offset = 0;

  Slice fragment;
  while (true) {
    uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size();
    size_t drop_size = 0;
    const unsigned int record_type = ReadPhysicalRecord(&fragment, &drop_size);
    switch (record_type) {
      case kFullType:
      case kRecyclableFullType:
        if (in_fragmented_record && !scratch->empty()) {
          // Handle bug in earlier versions of log::Writer where
          // it could emit an empty kFirstType record at the tail end
          // of a block followed by a kFullType or kFirstType record
          // at the beginning of the next block.
          ReportCorruption(scratch->size(), "partial record without end(1)");
        }
        prospective_record_offset = physical_record_offset;
        scratch->clear();
        *record = fragment;
        last_record_offset_ = prospective_record_offset;
        return true;

      case kFirstType:
      case kRecyclableFirstType:
        if (in_fragmented_record && !scratch->empty()) {
          // Handle bug in earlier versions of log::Writer where
          // it could emit an empty kFirstType record at the tail end
          // of a block followed by a kFullType or kFirstType record
          // at the beginning of the next block.
          ReportCorruption(scratch->size(), "partial record without end(2)");
        }
        prospective_record_offset = physical_record_offset;
        scratch->assign(fragment.data(), fragment.size());
        in_fragmented_record = true;
        break;

      case kMiddleType:
      case kRecyclableMiddleType:
        if (!in_fragmented_record) {
          ReportCorruption(fragment.size(),
                           "missing start of fragmented record(1)");
        } else {
          scratch->append(fragment.data(), fragment.size());
        }
        break;

      case kLastType:
      case kRecyclableLastType:
        if (!in_fragmented_record) {
          ReportCorruption(fragment.size(),
                           "missing start of fragmented record(2)");
        } else {
          scratch->append(fragment.data(), fragment.size());
          *record = Slice(*scratch);
          last_record_offset_ = prospective_record_offset;
          return true;
        }
        break;

      case kBadHeader:
        if (wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency) {
          // in clean shutdown we don't expect any error in the log files
          ReportCorruption(drop_size, "truncated header");
        }
        FALLTHROUGH_INTENDED;

      case kEof:
        if (in_fragmented_record) {
          if (wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency) {
            // in clean shutdown we don't expect any error in the log files
            ReportCorruption(scratch->size(), "error reading trailing data");
          }
          // This can be caused by the writer dying immediately after
          // writing a physical record but before completing the next; don't
          // treat it as a corruption, just ignore the entire logical record.
          scratch->clear();
        }
        return false;

      case kOldRecord:
        if (wal_recovery_mode != WALRecoveryMode::kSkipAnyCorruptedRecords) {
          // Treat a record from a previous instance of the log as EOF.
          if (in_fragmented_record) {
            if (wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency) {
              // in clean shutdown we don't expect any error in the log files
              ReportCorruption(scratch->size(), "error reading trailing data");
            }
            // This can be caused by the writer dying immediately after
            // writing a physical record but before completing the next; don't
            // treat it as a corruption, just ignore the entire logical record.
            scratch->clear();
          }
          return false;
        }
        FALLTHROUGH_INTENDED;

      case kBadRecord:
        if (in_fragmented_record) {
          ReportCorruption(scratch->size(), "error in middle of record");
          in_fragmented_record = false;
          scratch->clear();
        }
        break;
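
      // A recycled log file can legitimately end in a record whose length or
      // checksum check fails: it may simply be leftover data from before the
      // file was recycled. Under kTolerateCorruptedTailRecords, treat it as a
      // normal end of file instead of reporting corruption.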
      case kBadRecordLen:
      case kBadRecordChecksum:
        if (recycled_ && wal_recovery_mode ==
                             WALRecoveryMode::kTolerateCorruptedTailRecords) {
          scratch->clear();
          return false;
        }
        if (record_type == kBadRecordLen) {
          ReportCorruption(drop_size, "bad record length");
        } else {
          ReportCorruption(drop_size, "checksum mismatch");
        }
        if (in_fragmented_record) {
          ReportCorruption(scratch->size(), "error in middle of record");
          in_fragmented_record = false;
          scratch->clear();
        }
        break;

      default: {
        char buf[40];
        snprintf(buf, sizeof(buf), "unknown record type %u", record_type);
        ReportCorruption(
            (fragment.size() + (in_fragmented_record ? scratch->size() : 0)),
            buf);
        in_fragmented_record = false;
        scratch->clear();
        break;
      }
    }
  }
  return false;
}
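
// A minimal sketch of how callers typically drive this reader during WAL
// recovery (the hypothetical ApplyBatch stands in for replay logic; the
// real loop lives in the DB recovery code):
//
//   Slice record;
//   std::string scratch;
//   log::Reader reader(info_log, std::move(file), &reporter,
//                      true /* checksum */, log_number);
//   while (reader.ReadRecord(&record, &scratch, recovery_mode)) {
//     ApplyBatch(record);  // hypothetical: decode and replay the contents
//   }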
uint64_t Reader::LastRecordOffset() { return last_record_offset_; }

void Reader::UnmarkEOF() {
  if (read_error_) {
    return;
  }

  eof_ = false;

  if (eof_offset_ == 0) {
    return;
  }
  UnmarkEOFInternal();
}

void Reader::UnmarkEOFInternal() {
  // If the EOF was in the middle of a block (a partial block was read) we have
  // to read the rest of the block as ReadPhysicalRecord can only read full
  // blocks and expects the file position indicator to be aligned to the start
  // of a block.
  //
  //      consumed_bytes + buffer_size() + remaining == kBlockSize
  size_t consumed_bytes = eof_offset_ - buffer_.size();
  size_t remaining = kBlockSize - eof_offset_;

  // backing_store_ is used to concatenate what is left in buffer_ and
  // the remainder of the block. If buffer_ already uses backing_store_,
  // we just append the new data.
  if (buffer_.data() != backing_store_ + consumed_bytes) {
    // Buffer_ does not use backing_store_ for storage.
    // Copy what is left in buffer_ to backing_store.
    memmove(backing_store_ + consumed_bytes, buffer_.data(), buffer_.size());
  }

  Slice read_buffer;
  Status status =
      file_->Read(remaining, &read_buffer, backing_store_ + eof_offset_);

  size_t added = read_buffer.size();
  end_of_buffer_offset_ += added;

  if (!status.ok()) {
    if (added > 0) {
      ReportDrop(added, status);
    }
    read_error_ = true;
    return;
  }

  if (read_buffer.data() != backing_store_ + eof_offset_) {
    // Read did not write to backing_store_
    memmove(backing_store_ + eof_offset_, read_buffer.data(),
            read_buffer.size());
  }

  buffer_ = Slice(backing_store_ + consumed_bytes,
                  eof_offset_ + added - consumed_bytes);

  if (added < remaining) {
    eof_ = true;
    eof_offset_ += added;
  } else {
    eof_offset_ = 0;
  }
}

void Reader::ReportCorruption(size_t bytes, const char* reason) {
  ReportDrop(bytes, Status::Corruption(reason));
}

void Reader::ReportDrop(size_t bytes, const Status& reason) {
  if (reporter_ != nullptr) {
    reporter_->Corruption(bytes, reason);
  }
}
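
// Reads the next kBlockSize block from the file into buffer_. Returns true
// if the parser can keep going; on false, *error is set to kEof (no more
// data, or a read error) or kBadHeader (a truncated header was left at the
// end of the file).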
bool Reader::ReadMore(size_t* drop_size, int* error) {
  if (!eof_ && !read_error_) {
    // Last read was a full read, so this is a trailer to skip
    buffer_.clear();
    Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
    end_of_buffer_offset_ += buffer_.size();
    if (!status.ok()) {
      buffer_.clear();
      ReportDrop(kBlockSize, status);
      read_error_ = true;
      *error = kEof;
      return false;
    } else if (buffer_.size() < static_cast<size_t>(kBlockSize)) {
      eof_ = true;
      eof_offset_ = buffer_.size();
    }
    return true;
  } else {
    // Note that if buffer_ is non-empty, we have a truncated header at the
    // end of the file, which can be caused by the writer crashing in the
    // middle of writing the header. Unless explicitly requested, we don't
    // consider this an error, just report EOF.
    if (buffer_.size()) {
      *drop_size = buffer_.size();
      buffer_.clear();
      *error = kBadHeader;
      return false;
    }
    buffer_.clear();
    *error = kEof;
    return false;
  }
}

unsigned int Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size) {
  while (true) {
    // We need at least the minimum header size
    if (buffer_.size() < static_cast<size_t>(kHeaderSize)) {
      // the default value of r is meaningless because ReadMore will overwrite
      // it if it returns false; in case it returns true, the return value will
      // not be used anyway
      int r = kEof;
      if (!ReadMore(drop_size, &r)) {
        return r;
      }
      continue;
    }
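
    // Physical record layout (see log_format.h):
    //   checksum (4 bytes) | length (2 bytes) | type (1 byte) | payload
    // kHeaderSize is these 7 bytes; recyclable record types carry an extra
    // 4-byte log number after the type byte (kRecyclableHeaderSize).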
    // Parse the header
    const char* header = buffer_.data();
    const uint32_t a = static_cast<uint32_t>(header[4]) & 0xff;
    const uint32_t b = static_cast<uint32_t>(header[5]) & 0xff;
    const unsigned int type = header[6];
    const uint32_t length = a | (b << 8);
    int header_size = kHeaderSize;
    if (type >= kRecyclableFullType && type <= kRecyclableLastType) {
      if (end_of_buffer_offset_ - buffer_.size() == 0) {
        recycled_ = true;
      }
      header_size = kRecyclableHeaderSize;
      // We need enough for the larger header
      if (buffer_.size() < static_cast<size_t>(kRecyclableHeaderSize)) {
        int r = kEof;
        if (!ReadMore(drop_size, &r)) {
          return r;
        }
        continue;
      }
      const uint32_t log_num = DecodeFixed32(header + 7);
      if (log_num != log_number_) {
        return kOldRecord;
      }
    }
    if (header_size + length > buffer_.size()) {
      *drop_size = buffer_.size();
      buffer_.clear();
      if (!eof_) {
        return kBadRecordLen;
      }
      // If the end of the file has been reached without reading |length|
      // bytes of payload, assume the writer died in the middle of writing the
      // record. Don't report a corruption unless requested.
      if (*drop_size) {
        return kBadHeader;
      }
      return kEof;
    }

    if (type == kZeroType && length == 0) {
      // Skip zero length record without reporting any drops since
      // such records are produced by the mmap based writing code in
      // env_posix.cc that preallocates file regions.
      // NOTE: this should never happen in DBs written by new RocksDB versions,
      // since we turn off mmap writes to manifest and log files
      buffer_.clear();
      return kBadRecord;
    }

    // Check crc
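    // The stored value is masked (see crc32c::Mask) and covers the type
    // byte, the log number for recyclable records, and the payload, i.e.
    // bytes [6, header_size + length) of the record.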
    if (checksum_) {
      uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header));
      uint32_t actual_crc = crc32c::Value(header + 6, length + header_size - 6);
      if (actual_crc != expected_crc) {
        // Drop the rest of the buffer since "length" itself may have
        // been corrupted and if we trust it, we could find some
        // fragment of a real log record that just happens to look
        // like a valid log record.
        *drop_size = buffer_.size();
        buffer_.clear();
        return kBadRecordChecksum;
      }
    }

    buffer_.remove_prefix(header_size + length);

    *result = Slice(header + header_size, length);
    return type;
  }
}
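
// FragmentBufferedReader differs from Reader in that incomplete fragments
// persist across calls (fragments_ / in_fragmented_record_ are member
// state), so a record that is only partially flushed can be resumed once
// more data is appended to the log, e.g. when tailing the WAL of a live
// writer.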
bool FragmentBufferedReader::ReadRecord(Slice* record, std::string* scratch,
                                        WALRecoveryMode /*unused*/) {
  assert(record != nullptr);
  assert(scratch != nullptr);
  record->clear();
  scratch->clear();
  uint64_t prospective_record_offset = 0;
  uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size();
  size_t drop_size = 0;
  unsigned int fragment_type_or_err = 0;  // Initialize to make compiler happy
  Slice fragment;
  while (TryReadFragment(&fragment, &drop_size, &fragment_type_or_err)) {
    switch (fragment_type_or_err) {
      case kFullType:
      case kRecyclableFullType:
        if (in_fragmented_record_ && !fragments_.empty()) {
          ReportCorruption(fragments_.size(), "partial record without end(1)");
        }
        fragments_.clear();
        *record = fragment;
        prospective_record_offset = physical_record_offset;
        last_record_offset_ = prospective_record_offset;
        in_fragmented_record_ = false;
        return true;

      case kFirstType:
      case kRecyclableFirstType:
        if (in_fragmented_record_ || !fragments_.empty()) {
          ReportCorruption(fragments_.size(), "partial record without end(2)");
        }
        prospective_record_offset = physical_record_offset;
        fragments_.assign(fragment.data(), fragment.size());
        in_fragmented_record_ = true;
        break;

      case kMiddleType:
      case kRecyclableMiddleType:
        if (!in_fragmented_record_) {
          ReportCorruption(fragment.size(),
                           "missing start of fragmented record(1)");
        } else {
          fragments_.append(fragment.data(), fragment.size());
        }
        break;

      case kLastType:
      case kRecyclableLastType:
        if (!in_fragmented_record_) {
          ReportCorruption(fragment.size(),
                           "missing start of fragmented record(2)");
        } else {
          fragments_.append(fragment.data(), fragment.size());
          scratch->assign(fragments_.data(), fragments_.size());
          fragments_.clear();
          *record = Slice(*scratch);
          last_record_offset_ = prospective_record_offset;
          in_fragmented_record_ = false;
          return true;
        }
        break;

      case kBadHeader:
      case kBadRecord:
      case kEof:
      case kOldRecord:
        if (in_fragmented_record_) {
          ReportCorruption(fragments_.size(), "error in middle of record");
          in_fragmented_record_ = false;
          fragments_.clear();
        }
        break;
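
      // For a recycled log, a checksum mismatch at the tail may just be
      // stale pre-recycle data; stop reading without reporting corruption.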
      case kBadRecordChecksum:
        if (recycled_) {
          fragments_.clear();
          return false;
        }
        ReportCorruption(drop_size, "checksum mismatch");
        if (in_fragmented_record_) {
          ReportCorruption(fragments_.size(), "error in middle of record");
          in_fragmented_record_ = false;
          fragments_.clear();
        }
        break;

      default: {
        char buf[40];
        snprintf(buf, sizeof(buf), "unknown record type %u",
                 fragment_type_or_err);
        ReportCorruption(
            fragment.size() + (in_fragmented_record_ ? fragments_.size() : 0),
            buf);
        in_fragmented_record_ = false;
        fragments_.clear();
        break;
      }
    }
  }
  return false;
}
void FragmentBufferedReader::UnmarkEOF() {
  if (read_error_) {
    return;
  }
  eof_ = false;
  UnmarkEOFInternal();
}
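
// Unlike Reader::ReadMore, TryReadMore retries after hitting EOF: it calls
// UnmarkEOF() to pick up any data a live writer appended since the last
// read. Returning true with an unchanged buffer_ tells the caller that no
// complete fragment is available yet and it should try again later.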
bool FragmentBufferedReader::TryReadMore(size_t* drop_size, int* error) {
  if (!eof_ && !read_error_) {
    // Last read was a full read, so this is a trailer to skip
    buffer_.clear();
    Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
    end_of_buffer_offset_ += buffer_.size();
    if (!status.ok()) {
      buffer_.clear();
      ReportDrop(kBlockSize, status);
      read_error_ = true;
      *error = kEof;
      return false;
    } else if (buffer_.size() < static_cast<size_t>(kBlockSize)) {
      eof_ = true;
      eof_offset_ = buffer_.size();
      TEST_SYNC_POINT_CALLBACK(
          "FragmentBufferedLogReader::TryReadMore:FirstEOF", nullptr);
    }
    return true;
  } else if (!read_error_) {
    UnmarkEOF();
  }
  if (!read_error_) {
    return true;
  }
  *error = kEof;
  *drop_size = buffer_.size();
  if (buffer_.size() > 0) {
    *error = kBadHeader;
  }
  buffer_.clear();
  return false;
}
// return true if the caller should process the fragment_type_or_err.
bool FragmentBufferedReader::TryReadFragment(
    Slice* fragment, size_t* drop_size, unsigned int* fragment_type_or_err) {
  assert(fragment != nullptr);
  assert(drop_size != nullptr);
  assert(fragment_type_or_err != nullptr);

  while (buffer_.size() < static_cast<size_t>(kHeaderSize)) {
    size_t old_size = buffer_.size();
    int error = kEof;
    if (!TryReadMore(drop_size, &error)) {
      *fragment_type_or_err = error;
      return false;
    } else if (old_size == buffer_.size()) {
      return false;
    }
  }
  const char* header = buffer_.data();
  const uint32_t a = static_cast<uint32_t>(header[4]) & 0xff;
  const uint32_t b = static_cast<uint32_t>(header[5]) & 0xff;
  const unsigned int type = header[6];
  const uint32_t length = a | (b << 8);
  int header_size = kHeaderSize;
  if (type >= kRecyclableFullType && type <= kRecyclableLastType) {
    if (end_of_buffer_offset_ - buffer_.size() == 0) {
      recycled_ = true;
    }
    header_size = kRecyclableHeaderSize;
    while (buffer_.size() < static_cast<size_t>(kRecyclableHeaderSize)) {
      size_t old_size = buffer_.size();
      int error = kEof;
      if (!TryReadMore(drop_size, &error)) {
        *fragment_type_or_err = error;
        return false;
      } else if (old_size == buffer_.size()) {
        return false;
      }
    }
    const uint32_t log_num = DecodeFixed32(header + 7);
    if (log_num != log_number_) {
      *fragment_type_or_err = kOldRecord;
      return true;
    }
  }

  while (header_size + length > buffer_.size()) {
    size_t old_size = buffer_.size();
    int error = kEof;
    if (!TryReadMore(drop_size, &error)) {
      *fragment_type_or_err = error;
      return false;
    } else if (old_size == buffer_.size()) {
      return false;
    }
  }

  if (type == kZeroType && length == 0) {
    buffer_.clear();
    *fragment_type_or_err = kBadRecord;
    return true;
  }

  if (checksum_) {
    uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header));
    uint32_t actual_crc = crc32c::Value(header + 6, length + header_size - 6);
    if (actual_crc != expected_crc) {
      *drop_size = buffer_.size();
      buffer_.clear();
      *fragment_type_or_err = kBadRecordChecksum;
      return true;
    }
  }

  buffer_.remove_prefix(header_size + length);
  *fragment = Slice(header + header_size, length);
  *fragment_type_or_err = type;
  return true;
}

}  // namespace log
}  // namespace ROCKSDB_NAMESPACE