log_reader.h 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  9. #pragma once
  10. #include <memory>
  11. #include <stdint.h>
  12. #include "db/log_format.h"
  13. #include "file/sequence_file_reader.h"
  14. #include "rocksdb/options.h"
  15. #include "rocksdb/slice.h"
  16. #include "rocksdb/status.h"
  17. namespace ROCKSDB_NAMESPACE {
  18. class Logger;
  19. namespace log {
  20. /**
  21. * Reader is a general purpose log stream reader implementation. The actual job
  22. * of reading from the device is implemented by the SequentialFile interface.
  23. *
  24. * Please see Writer for details on the file and record layout.
  25. */
  26. class Reader {
  27. public:
  28. // Interface for reporting errors.
  29. class Reporter {
  30. public:
  31. virtual ~Reporter();
  32. // Some corruption was detected. "size" is the approximate number
  33. // of bytes dropped due to the corruption.
  34. virtual void Corruption(size_t bytes, const Status& status) = 0;
  35. };
  36. // Create a reader that will return log records from "*file".
  37. // "*file" must remain live while this Reader is in use.
  38. //
  39. // If "reporter" is non-nullptr, it is notified whenever some data is
  40. // dropped due to a detected corruption. "*reporter" must remain
  41. // live while this Reader is in use.
  42. //
  43. // If "checksum" is true, verify checksums if available.
  44. Reader(std::shared_ptr<Logger> info_log,
  45. // @lint-ignore TXT2 T25377293 Grandfathered in
  46. std::unique_ptr<SequentialFileReader>&& file, Reporter* reporter,
  47. bool checksum, uint64_t log_num);
  48. // No copying allowed
  49. Reader(const Reader&) = delete;
  50. void operator=(const Reader&) = delete;
  51. virtual ~Reader();
  52. // Read the next record into *record. Returns true if read
  53. // successfully, false if we hit end of the input. May use
  54. // "*scratch" as temporary storage. The contents filled in *record
  55. // will only be valid until the next mutating operation on this
  56. // reader or the next mutation to *scratch.
  57. virtual bool ReadRecord(Slice* record, std::string* scratch,
  58. WALRecoveryMode wal_recovery_mode =
  59. WALRecoveryMode::kTolerateCorruptedTailRecords);
  60. // Returns the physical offset of the last record returned by ReadRecord.
  61. //
  62. // Undefined before the first call to ReadRecord.
  63. uint64_t LastRecordOffset();
  64. // returns true if the reader has encountered an eof condition.
  65. bool IsEOF() {
  66. return eof_;
  67. }
  68. // returns true if the reader has encountered read error.
  69. bool hasReadError() const { return read_error_; }
  70. // when we know more data has been written to the file. we can use this
  71. // function to force the reader to look again in the file.
  72. // Also aligns the file position indicator to the start of the next block
  73. // by reading the rest of the data from the EOF position to the end of the
  74. // block that was partially read.
  75. virtual void UnmarkEOF();
  76. SequentialFileReader* file() { return file_.get(); }
  77. Reporter* GetReporter() const { return reporter_; }
  78. uint64_t GetLogNumber() const { return log_number_; }
  79. size_t GetReadOffset() const {
  80. return static_cast<size_t>(end_of_buffer_offset_);
  81. }
  82. protected:
  83. std::shared_ptr<Logger> info_log_;
  84. const std::unique_ptr<SequentialFileReader> file_;
  85. Reporter* const reporter_;
  86. bool const checksum_;
  87. char* const backing_store_;
  88. // Internal state variables used for reading records
  89. Slice buffer_;
  90. bool eof_; // Last Read() indicated EOF by returning < kBlockSize
  91. bool read_error_; // Error occurred while reading from file
  92. // Offset of the file position indicator within the last block when an
  93. // EOF was detected.
  94. size_t eof_offset_;
  95. // Offset of the last record returned by ReadRecord.
  96. uint64_t last_record_offset_;
  97. // Offset of the first location past the end of buffer_.
  98. uint64_t end_of_buffer_offset_;
  99. // which log number this is
  100. uint64_t const log_number_;
  101. // Whether this is a recycled log file
  102. bool recycled_;
  103. // Extend record types with the following special values
  104. enum {
  105. kEof = kMaxRecordType + 1,
  106. // Returned whenever we find an invalid physical record.
  107. // Currently there are three situations in which this happens:
  108. // * The record has an invalid CRC (ReadPhysicalRecord reports a drop)
  109. // * The record is a 0-length record (No drop is reported)
  110. kBadRecord = kMaxRecordType + 2,
  111. // Returned when we fail to read a valid header.
  112. kBadHeader = kMaxRecordType + 3,
  113. // Returned when we read an old record from a previous user of the log.
  114. kOldRecord = kMaxRecordType + 4,
  115. // Returned when we get a bad record length
  116. kBadRecordLen = kMaxRecordType + 5,
  117. // Returned when we get a bad record checksum
  118. kBadRecordChecksum = kMaxRecordType + 6,
  119. };
  120. // Return type, or one of the preceding special values
  121. unsigned int ReadPhysicalRecord(Slice* result, size_t* drop_size);
  122. // Read some more
  123. bool ReadMore(size_t* drop_size, int *error);
  124. void UnmarkEOFInternal();
  125. // Reports dropped bytes to the reporter.
  126. // buffer_ must be updated to remove the dropped bytes prior to invocation.
  127. void ReportCorruption(size_t bytes, const char* reason);
  128. void ReportDrop(size_t bytes, const Status& reason);
  129. };
  130. class FragmentBufferedReader : public Reader {
  131. public:
  132. FragmentBufferedReader(std::shared_ptr<Logger> info_log,
  133. // @lint-ignore TXT2 T25377293 Grandfathered in
  134. std::unique_ptr<SequentialFileReader>&& _file,
  135. Reporter* reporter, bool checksum, uint64_t log_num)
  136. : Reader(info_log, std::move(_file), reporter, checksum, log_num),
  137. fragments_(),
  138. in_fragmented_record_(false) {}
  139. ~FragmentBufferedReader() override {}
  140. bool ReadRecord(Slice* record, std::string* scratch,
  141. WALRecoveryMode wal_recovery_mode =
  142. WALRecoveryMode::kTolerateCorruptedTailRecords) override;
  143. void UnmarkEOF() override;
  144. private:
  145. std::string fragments_;
  146. bool in_fragmented_record_;
  147. bool TryReadFragment(Slice* result, size_t* drop_size,
  148. unsigned int* fragment_type_or_err);
  149. bool TryReadMore(size_t* drop_size, int* error);
  150. // No copy allowed
  151. FragmentBufferedReader(const FragmentBufferedReader&);
  152. void operator=(const FragmentBufferedReader&);
  153. };
  154. } // namespace log
  155. } // namespace ROCKSDB_NAMESPACE