log_writer.h 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  9. #pragma once
  10. #include <cstdint>
  11. #include <memory>
  12. #include <unordered_map>
  13. #include <vector>
  14. #include "db/dbformat.h"
  15. #include "db/log_format.h"
  16. #include "rocksdb/compression_type.h"
  17. #include "rocksdb/env.h"
  18. #include "rocksdb/io_status.h"
  19. #include "rocksdb/slice.h"
  20. #include "rocksdb/status.h"
  21. #include "util/compression.h"
  22. #include "util/hash_containers.h"
  23. namespace ROCKSDB_NAMESPACE {
  24. class WritableFileWriter;
  25. namespace log {
  26. /**
  27. * Writer is a general purpose log stream writer. It provides an append-only
  28. * abstraction for writing data. The details of the how the data is written is
  29. * handled by the WritableFile sub-class implementation.
  30. *
  31. * File format:
  32. *
  33. * File is broken down into variable sized records. The format of each record
  34. * is described below.
  35. * +-----+-------------+--+----+----------+------+-- ... ----+
  36. * File | r0 | r1 |P | r2 | r3 | r4 | |
  37. * +-----+-------------+--+----+----------+------+-- ... ----+
  38. * <--- kBlockSize ------>|<-- kBlockSize ------>|
  39. * rn = variable size records
  40. * P = Padding
  41. *
  42. * Data is written out in kBlockSize chunks. If next record does not fit
  43. * into the space left, the leftover space will be padded with \0.
  44. *
  45. * Legacy record format:
  46. *
  47. * +---------+-----------+-----------+--- ... ---+
  48. * |CRC (4B) | Size (2B) | Type (1B) | Payload |
  49. * +---------+-----------+-----------+--- ... ---+
  50. *
  51. * CRC = 32bit hash computed over the record type and payload using CRC
  52. * Size = Length of the payload data
  53. * Type = Type of record
  54. * (kZeroType, kFullType, kFirstType, kLastType, kMiddleType )
  55. * The type is used to group a bunch of records together to represent
  56. * blocks that are larger than kBlockSize
  57. * Payload = Byte stream as long as specified by the payload size
  58. *
  59. * Recyclable record format:
  60. *
  61. * +---------+-----------+-----------+----------------+--- ... ---+
  62. * |CRC (4B) | Size (2B) | Type (1B) | Log number (4B)| Payload |
  63. * +---------+-----------+-----------+----------------+--- ... ---+
  64. *
  65. * Same as above, with the addition of
  66. * Log number = 32bit log file number, so that we can distinguish between
  67. * records written by the most recent log writer vs a previous one.
  68. */
  69. class Writer {
  70. public:
  71. // Create a writer that will append data to "*dest".
  72. // "*dest" must be initially empty.
  73. // "*dest" must remain live while this Writer is in use.
  74. // TODO(hx235): separate WAL related parameters from general `Reader`
  75. // parameters
  76. explicit Writer(std::unique_ptr<WritableFileWriter>&& dest,
  77. uint64_t log_number, bool recycle_log_files,
  78. bool manual_flush = false,
  79. CompressionType compressionType = kNoCompression,
  80. bool track_and_verify_wals = false);
  81. // No copying allowed
  82. Writer(const Writer&) = delete;
  83. void operator=(const Writer&) = delete;
  84. ~Writer();
  85. IOStatus AddRecord(const WriteOptions& write_options, const Slice& slice,
  86. const SequenceNumber& seqno = 0);
  87. IOStatus AddCompressionTypeRecord(const WriteOptions& write_options);
  88. IOStatus MaybeAddPredecessorWALInfo(const WriteOptions& write_options,
  89. const PredecessorWALInfo& info);
  90. // If there are column families in `cf_to_ts_sz` not included in
  91. // `recorded_cf_to_ts_sz_` and its user-defined timestamp size is non-zero,
  92. // adds a record of type kUserDefinedTimestampSizeType or
  93. // kRecyclableUserDefinedTimestampSizeType for these column families.
  94. // This timestamp size record applies to all subsequent records.
  95. IOStatus MaybeAddUserDefinedTimestampSizeRecord(
  96. const WriteOptions& write_options,
  97. const UnorderedMap<uint32_t, size_t>& cf_to_ts_sz);
  98. WritableFileWriter* file() { return dest_.get(); }
  99. const WritableFileWriter* file() const { return dest_.get(); }
  100. uint64_t get_log_number() const { return log_number_; }
  101. IOStatus WriteBuffer(const WriteOptions& write_options);
  102. IOStatus Close(const WriteOptions& write_options);
  103. // If closing the writer through file(), call this afterwards to modify
  104. // this object's state to reflect that. Returns true if the destination file
  105. // has been closed. If it hasn't been closed, returns false with no change.
  106. bool PublishIfClosed();
  107. bool BufferIsEmpty();
  108. size_t TEST_block_offset() const { return block_offset_; }
  109. SequenceNumber GetLastSeqnoRecorded() const { return last_seqno_recorded_; };
  110. private:
  111. std::unique_ptr<WritableFileWriter> dest_;
  112. size_t block_offset_; // Current offset in block
  113. uint64_t log_number_;
  114. bool recycle_log_files_;
  115. int header_size_;
  116. // crc32c values for all supported record types. These are
  117. // pre-computed to reduce the overhead of computing the crc of the
  118. // record type stored in the header.
  119. uint32_t type_crc_[kMaxRecordType + 1];
  120. IOStatus EmitPhysicalRecord(const WriteOptions& write_options,
  121. RecordType type, const char* ptr, size_t length);
  122. IOStatus MaybeHandleSeenFileWriterError();
  123. IOStatus MaybeSwitchToNewBlock(const WriteOptions& write_options,
  124. const std::string& content_to_write);
  125. // If true, it does not flush after each write. Instead it relies on the upper
  126. // layer to manually does the flush by calling ::WriteBuffer()
  127. bool manual_flush_;
  128. // Compression Type
  129. CompressionType compression_type_;
  130. StreamingCompress* compress_;
  131. // Reusable compressed output buffer
  132. std::unique_ptr<char[]> compressed_buffer_;
  133. // The recorded user-defined timestamp size that have been written so far.
  134. // Since the user-defined timestamp size cannot be changed while the DB is
  135. // running, existing entry in this map cannot be updated.
  136. UnorderedMap<uint32_t, size_t> recorded_cf_to_ts_sz_;
  137. // See `Options::track_and_verify_wals`
  138. bool track_and_verify_wals_;
  139. SequenceNumber last_seqno_recorded_;
  140. };
  141. } // namespace log
  142. } // namespace ROCKSDB_NAMESPACE