log_reader.h 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  9. #pragma once
#include <stdint.h>

#include <cstddef>
#include <cstdint>
#include <limits>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

#include "db/log_format.h"
#include "file/sequence_file_reader.h"
#include "rocksdb/options.h"
#include "rocksdb/slice.h"
#include "rocksdb/status.h"
#include "util/compression.h"
#include "util/hash_containers.h"
#include "util/udt_util.h"
#include "util/xxhash.h"
  24. namespace ROCKSDB_NAMESPACE {
  25. class Logger;
  26. namespace log {
  27. /**
  28. * Reader is a general purpose log stream reader implementation. The actual job
  29. * of reading from the device is implemented by the SequentialFile interface.
  30. *
  31. * Please see Writer for details on the file and record layout.
  32. */
  33. class Reader {
  34. public:
  35. // Interface for reporting errors.
  36. class Reporter {
  37. public:
  38. virtual ~Reporter();
  39. // Some corruption was detected. "size" is the approximate number
  40. // of bytes dropped due to the corruption.
  41. virtual void Corruption(size_t bytes, const Status& status,
  42. uint64_t log_number = kMaxSequenceNumber) = 0;
  43. virtual void OldLogRecord(size_t /*bytes*/) {}
  44. };
  45. // Create a reader that will return log records from "*file".
  46. // "*file" must remain live while this Reader is in use.
  47. //
  48. // If "reporter" is non-nullptr, it is notified whenever some data is
  49. // dropped due to a detected corruption. "*reporter" must remain
  50. // live while this Reader is in use.
  51. //
  52. // If "checksum" is true, verify checksums if available.
  53. // TODO(hx235): separate WAL related parameters from general `Reader`
  54. // parameters
  55. Reader(std::shared_ptr<Logger> info_log,
  56. std::unique_ptr<SequentialFileReader>&& file, Reporter* reporter,
  57. bool checksum, uint64_t log_num, bool track_and_verify_wals = false,
  58. bool stop_replay_for_corruption = false,
  59. uint64_t min_wal_number_to_keep = std::numeric_limits<uint64_t>::max(),
  60. const PredecessorWALInfo& observed_predecessor_wal_info =
  61. PredecessorWALInfo());
  62. // No copying allowed
  63. Reader(const Reader&) = delete;
  64. void operator=(const Reader&) = delete;
  65. virtual ~Reader();
  66. // Read the next record into *record. Returns true if read
  67. // successfully, false if we hit end of the input. May use
  68. // "*scratch" as temporary storage. The contents filled in *record
  69. // will only be valid until the next mutating operation on this
  70. // reader or the next mutation to *scratch.
  71. // If record_checksum is not nullptr, then this function will calculate the
  72. // checksum of the record read and set record_checksum to it. The checksum is
  73. // calculated from the original buffers that contain the contents of the
  74. // record.
  75. virtual bool ReadRecord(Slice* record, std::string* scratch,
  76. WALRecoveryMode wal_recovery_mode =
  77. WALRecoveryMode::kTolerateCorruptedTailRecords,
  78. uint64_t* record_checksum = nullptr);
  79. // Return the recorded user-defined timestamp size that have been read so
  80. // far. This only applies to WAL logs.
  81. const UnorderedMap<uint32_t, size_t>& GetRecordedTimestampSize() const {
  82. return recorded_cf_to_ts_sz_;
  83. }
  84. // Returns the physical offset of the last record returned by ReadRecord.
  85. //
  86. // Undefined before the first call to ReadRecord.
  87. uint64_t LastRecordOffset();
  88. // Returns the first physical offset after the last record returned by
  89. // ReadRecord, or zero before first call to ReadRecord. This can also be
  90. // thought of as the "current" position in processing the file bytes.
  91. uint64_t LastRecordEnd();
  92. // returns true if the reader has encountered an eof condition.
  93. bool IsEOF() { return eof_; }
  94. // returns true if the reader has encountered read error.
  95. bool hasReadError() const { return read_error_; }
  96. // when we know more data has been written to the file. we can use this
  97. // function to force the reader to look again in the file.
  98. // Also aligns the file position indicator to the start of the next block
  99. // by reading the rest of the data from the EOF position to the end of the
  100. // block that was partially read.
  101. virtual void UnmarkEOF();
  102. SequentialFileReader* file() { return file_.get(); }
  103. Reporter* GetReporter() const { return reporter_; }
  104. uint64_t GetLogNumber() const { return log_number_; }
  105. size_t GetReadOffset() const {
  106. return static_cast<size_t>(end_of_buffer_offset_);
  107. }
  108. bool IsCompressedAndEmptyFile() {
  109. return !first_record_read_ && compression_type_record_read_;
  110. }
  111. protected:
  112. std::shared_ptr<Logger> info_log_;
  113. const std::unique_ptr<SequentialFileReader> file_;
  114. Reporter* const reporter_;
  115. bool const checksum_;
  116. char* const backing_store_;
  117. // Internal state variables used for reading records
  118. Slice buffer_;
  119. bool eof_; // Last Read() indicated EOF by returning < kBlockSize
  120. bool read_error_; // Error occurred while reading from file
  121. // Offset of the file position indicator within the last block when an
  122. // EOF was detected.
  123. size_t eof_offset_;
  124. // Offset of the last record returned by ReadRecord.
  125. uint64_t last_record_offset_;
  126. // Offset of the first location past the end of buffer_.
  127. uint64_t end_of_buffer_offset_;
  128. // which log number this is
  129. uint64_t const log_number_;
  130. // See `Options::track_and_verify_wals`
  131. bool track_and_verify_wals_;
  132. // Below variables are used for WAL verification
  133. // TODO(hx235): To revise `stop_replay_for_corruption_` inside `LogReader`
  134. // since we have `observed_predecessor_wal_info_` to verify against the
  135. // `recorded_predecessor_wal_info_` recorded in current WAL. If there is no
  136. // WAL hole, we can revise `stop_replay_for_corruption_` to be false.
  137. bool stop_replay_for_corruption_;
  138. uint64_t min_wal_number_to_keep_;
  139. PredecessorWALInfo observed_predecessor_wal_info_;
  140. // Whether this is a recycled log file
  141. bool recycled_;
  142. // Whether the first record has been read or not.
  143. bool first_record_read_;
  144. // Type of compression used
  145. CompressionType compression_type_;
  146. // Track whether the compression type record has been read or not.
  147. bool compression_type_record_read_;
  148. StreamingUncompress* uncompress_;
  149. // Reusable uncompressed output buffer
  150. std::unique_ptr<char[]> uncompressed_buffer_;
  151. // Reusable uncompressed record
  152. std::string uncompressed_record_;
  153. // Used for stream hashing fragment content in ReadRecord()
  154. XXH3_state_t* hash_state_;
  155. // Used for stream hashing uncompressed buffer in ReadPhysicalRecord()
  156. XXH3_state_t* uncompress_hash_state_;
  157. // The recorded user-defined timestamp sizes that have been read so far. This
  158. // is only for WAL logs.
  159. UnorderedMap<uint32_t, size_t> recorded_cf_to_ts_sz_;
  160. // Extend record types with the following special values
  161. enum : uint8_t {
  162. kEof = kMaxRecordType + 1,
  163. // Returned whenever we find an invalid physical record.
  164. // Currently there are three situations in which this happens:
  165. // * The record has an invalid CRC (ReadPhysicalRecord reports a drop)
  166. // * The record is a 0-length record (No drop is reported)
  167. kBadRecord = kMaxRecordType + 2,
  168. // Returned when we fail to read a valid header.
  169. kBadHeader = kMaxRecordType + 3,
  170. // Returned when we read an old record from a previous user of the log.
  171. kOldRecord = kMaxRecordType + 4,
  172. // Returned when we get a bad record length
  173. kBadRecordLen = kMaxRecordType + 5,
  174. // Returned when we get a bad record checksum
  175. kBadRecordChecksum = kMaxRecordType + 6,
  176. };
  177. // Return type, or one of the preceding special values
  178. // If WAL compression is enabled, fragment_checksum is the checksum of the
  179. // fragment computed from the original buffer containing uncompressed
  180. // fragment.
  181. uint8_t ReadPhysicalRecord(Slice* result, size_t* drop_size,
  182. uint64_t* fragment_checksum = nullptr);
  183. // Read some more
  184. bool ReadMore(size_t* drop_size, uint8_t* error);
  185. void UnmarkEOFInternal();
  186. // Reports dropped bytes to the reporter.
  187. // buffer_ must be updated to remove the dropped bytes prior to invocation.
  188. void ReportCorruption(size_t bytes, const char* reason,
  189. uint64_t log_number = kMaxSequenceNumber);
  190. void ReportDrop(size_t bytes, const Status& reason,
  191. uint64_t log_number = kMaxSequenceNumber);
  192. void ReportOldLogRecord(size_t bytes);
  193. void InitCompression(const CompressionTypeRecord& compression_record);
  194. Status UpdateRecordedTimestampSize(
  195. const std::vector<std::pair<uint32_t, size_t>>& cf_to_ts_sz);
  196. void MaybeVerifyPredecessorWALInfo(
  197. WALRecoveryMode wal_recovery_mode, Slice fragment,
  198. const PredecessorWALInfo& recorded_predecessor_wal_info);
  199. };
  200. class FragmentBufferedReader : public Reader {
  201. public:
  202. FragmentBufferedReader(std::shared_ptr<Logger> info_log,
  203. std::unique_ptr<SequentialFileReader>&& _file,
  204. Reporter* reporter, bool checksum, uint64_t log_num)
  205. : Reader(info_log, std::move(_file), reporter, checksum, log_num,
  206. false /*verify_and_track_wals*/,
  207. false /*stop_replay_for_corruption*/,
  208. std::numeric_limits<uint64_t>::max() /*min_wal_number_to_keep*/,
  209. PredecessorWALInfo() /*observed_predecessor_wal_info*/),
  210. fragments_(),
  211. in_fragmented_record_(false) {}
  212. ~FragmentBufferedReader() override {}
  213. bool ReadRecord(Slice* record, std::string* scratch,
  214. WALRecoveryMode wal_recovery_mode =
  215. WALRecoveryMode::kTolerateCorruptedTailRecords,
  216. uint64_t* record_checksum = nullptr) override;
  217. void UnmarkEOF() override;
  218. private:
  219. std::string fragments_;
  220. bool in_fragmented_record_;
  221. bool TryReadFragment(Slice* result, size_t* drop_size,
  222. uint8_t* fragment_type_or_err);
  223. bool TryReadMore(size_t* drop_size, uint8_t* error);
  224. // No copy allowed
  225. FragmentBufferedReader(const FragmentBufferedReader&);
  226. void operator=(const FragmentBufferedReader&);
  227. };
  228. } // namespace log
  229. } // namespace ROCKSDB_NAMESPACE