log_writer.cc 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  9. #include "db/log_writer.h"
  10. #include <cstdint>
  11. #include "file/writable_file_writer.h"
  12. #include "rocksdb/env.h"
  13. #include "rocksdb/io_status.h"
  14. #include "util/coding.h"
  15. #include "util/crc32c.h"
  16. #include "util/udt_util.h"
  17. namespace ROCKSDB_NAMESPACE::log {
  18. Writer::Writer(std::unique_ptr<WritableFileWriter>&& dest, uint64_t log_number,
  19. bool recycle_log_files, bool manual_flush,
  20. CompressionType compression_type, bool track_and_verify_wals)
  21. : dest_(std::move(dest)),
  22. block_offset_(0),
  23. log_number_(log_number),
  24. recycle_log_files_(recycle_log_files),
  25. // Header size varies depending on whether we are recycling or not.
  26. header_size_(recycle_log_files ? kRecyclableHeaderSize : kHeaderSize),
  27. manual_flush_(manual_flush),
  28. compression_type_(compression_type),
  29. compress_(nullptr),
  30. track_and_verify_wals_(track_and_verify_wals),
  31. last_seqno_recorded_(0) {
  32. for (uint8_t i = 0; i <= kMaxRecordType; i++) {
  33. char t = static_cast<char>(i);
  34. type_crc_[i] = crc32c::Value(&t, 1);
  35. }
  36. }
  37. Writer::~Writer() {
  38. ThreadStatus::OperationType cur_op_type =
  39. ThreadStatusUtil::GetThreadOperation();
  40. ThreadStatusUtil::SetThreadOperation(ThreadStatus::OperationType::OP_UNKNOWN);
  41. if (dest_) {
  42. WriteBuffer(WriteOptions()).PermitUncheckedError();
  43. }
  44. if (compress_) {
  45. delete compress_;
  46. }
  47. ThreadStatusUtil::SetThreadOperation(cur_op_type);
  48. }
  49. IOStatus Writer::WriteBuffer(const WriteOptions& write_options) {
  50. IOStatus s = MaybeHandleSeenFileWriterError();
  51. if (!s.ok()) {
  52. return s;
  53. }
  54. IOOptions opts;
  55. s = WritableFileWriter::PrepareIOOptions(write_options, opts);
  56. if (!s.ok()) {
  57. return s;
  58. }
  59. return dest_->Flush(opts);
  60. }
  61. IOStatus Writer::Close(const WriteOptions& write_options) {
  62. IOStatus s;
  63. IOOptions opts;
  64. s = WritableFileWriter::PrepareIOOptions(write_options, opts);
  65. if (s.ok() && dest_) {
  66. s = dest_->Close(opts);
  67. dest_.reset();
  68. }
  69. return s;
  70. }
  71. bool Writer::PublishIfClosed() {
  72. if (dest_->IsClosed()) {
  73. dest_.reset();
  74. return true;
  75. } else {
  76. return false;
  77. }
  78. }
  79. IOStatus Writer::AddRecord(const WriteOptions& write_options,
  80. const Slice& slice, const SequenceNumber& seqno) {
  81. IOStatus s = MaybeHandleSeenFileWriterError();
  82. if (!s.ok()) {
  83. return s;
  84. }
  85. const char* ptr = slice.data();
  86. size_t left = slice.size();
  87. // Fragment the record if necessary and emit it. Note that if slice
  88. // is empty, we still want to iterate once to emit a single
  89. // zero-length record
  90. bool begin = true;
  91. int compress_remaining = 0;
  92. bool compress_start = false;
  93. if (compress_) {
  94. compress_->Reset();
  95. compress_start = true;
  96. }
  97. IOOptions opts;
  98. s = WritableFileWriter::PrepareIOOptions(write_options, opts);
  99. if (s.ok()) {
  100. do {
  101. const int64_t leftover = kBlockSize - block_offset_;
  102. assert(leftover >= 0);
  103. if (leftover < header_size_) {
  104. // Switch to a new block
  105. if (leftover > 0) {
  106. // Fill the trailer (literal below relies on kHeaderSize and
  107. // kRecyclableHeaderSize being <= 11)
  108. assert(header_size_ <= 11);
  109. s = dest_->Append(opts,
  110. Slice("\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00",
  111. static_cast<size_t>(leftover)),
  112. 0 /* crc32c_checksum */);
  113. if (!s.ok()) {
  114. break;
  115. }
  116. }
  117. block_offset_ = 0;
  118. }
  119. // Invariant: we never leave < header_size bytes in a block.
  120. assert(static_cast<int64_t>(kBlockSize - block_offset_) >= header_size_);
  121. const size_t avail = kBlockSize - block_offset_ - header_size_;
  122. // Compress the record if compression is enabled.
  123. // Compress() is called at least once (compress_start=true) and after the
  124. // previous generated compressed chunk is written out as one or more
  125. // physical records (left=0).
  126. if (compress_ && (compress_start || left == 0)) {
  127. compress_remaining = compress_->Compress(
  128. slice.data(), slice.size(), compressed_buffer_.get(), &left);
  129. if (compress_remaining < 0) {
  130. // Set failure status
  131. s = IOStatus::IOError("Unexpected WAL compression error");
  132. s.SetDataLoss(true);
  133. break;
  134. } else if (left == 0) {
  135. // Nothing left to compress
  136. if (!compress_start) {
  137. break;
  138. }
  139. }
  140. compress_start = false;
  141. ptr = compressed_buffer_.get();
  142. }
  143. const size_t fragment_length = (left < avail) ? left : avail;
  144. RecordType type;
  145. const bool end = (left == fragment_length && compress_remaining == 0);
  146. if (begin && end) {
  147. type = recycle_log_files_ ? kRecyclableFullType : kFullType;
  148. } else if (begin) {
  149. type = recycle_log_files_ ? kRecyclableFirstType : kFirstType;
  150. } else if (end) {
  151. type = recycle_log_files_ ? kRecyclableLastType : kLastType;
  152. } else {
  153. type = recycle_log_files_ ? kRecyclableMiddleType : kMiddleType;
  154. }
  155. s = EmitPhysicalRecord(write_options, type, ptr, fragment_length);
  156. ptr += fragment_length;
  157. left -= fragment_length;
  158. begin = false;
  159. } while (s.ok() && (left > 0 || compress_remaining > 0));
  160. }
  161. if (s.ok()) {
  162. if (!manual_flush_) {
  163. s = dest_->Flush(opts);
  164. }
  165. }
  166. if (s.ok()) {
  167. last_seqno_recorded_ = std::max(last_seqno_recorded_, seqno);
  168. }
  169. return s;
  170. }
  171. IOStatus Writer::AddCompressionTypeRecord(const WriteOptions& write_options) {
  172. // Should be the first record
  173. assert(block_offset_ == 0);
  174. if (compression_type_ == kNoCompression) {
  175. // No need to add a record
  176. return IOStatus::OK();
  177. }
  178. IOStatus s = MaybeHandleSeenFileWriterError();
  179. if (!s.ok()) {
  180. return s;
  181. }
  182. CompressionTypeRecord record(compression_type_);
  183. std::string encode;
  184. record.EncodeTo(&encode);
  185. s = EmitPhysicalRecord(write_options, kSetCompressionType, encode.data(),
  186. encode.size());
  187. if (s.ok()) {
  188. if (!manual_flush_) {
  189. IOOptions io_opts;
  190. s = WritableFileWriter::PrepareIOOptions(write_options, io_opts);
  191. if (s.ok()) {
  192. s = dest_->Flush(io_opts);
  193. }
  194. }
  195. // Initialize fields required for compression
  196. const size_t max_output_buffer_len = kBlockSize - header_size_;
  197. CompressionOptions opts;
  198. constexpr uint32_t compression_format_version = 2;
  199. compress_ = StreamingCompress::Create(compression_type_, opts,
  200. compression_format_version,
  201. max_output_buffer_len);
  202. assert(compress_ != nullptr);
  203. compressed_buffer_ =
  204. std::unique_ptr<char[]>(new char[max_output_buffer_len]);
  205. assert(compressed_buffer_);
  206. } else {
  207. // Disable compression if the record could not be added.
  208. compression_type_ = kNoCompression;
  209. }
  210. return s;
  211. }
  212. IOStatus Writer::MaybeAddPredecessorWALInfo(const WriteOptions& write_options,
  213. const PredecessorWALInfo& info) {
  214. IOStatus s = MaybeHandleSeenFileWriterError();
  215. if (!s.ok()) {
  216. return s;
  217. }
  218. if (!track_and_verify_wals_ || !info.IsInitialized()) {
  219. return IOStatus::OK();
  220. }
  221. std::string encode;
  222. info.EncodeTo(&encode);
  223. s = MaybeSwitchToNewBlock(write_options, encode);
  224. if (!s.ok()) {
  225. return s;
  226. }
  227. RecordType type = recycle_log_files_ ? kRecyclePredecessorWALInfoType
  228. : kPredecessorWALInfoType;
  229. s = EmitPhysicalRecord(write_options, type, encode.data(), encode.size());
  230. if (!s.ok()) {
  231. return s;
  232. }
  233. if (!manual_flush_) {
  234. IOOptions io_opts;
  235. s = WritableFileWriter::PrepareIOOptions(write_options, io_opts);
  236. if (s.ok()) {
  237. s = dest_->Flush(io_opts);
  238. }
  239. }
  240. return s;
  241. }
  242. IOStatus Writer::MaybeAddUserDefinedTimestampSizeRecord(
  243. const WriteOptions& write_options,
  244. const UnorderedMap<uint32_t, size_t>& cf_to_ts_sz) {
  245. std::vector<std::pair<uint32_t, size_t>> ts_sz_to_record;
  246. for (const auto& [cf_id, ts_sz] : cf_to_ts_sz) {
  247. if (recorded_cf_to_ts_sz_.count(cf_id) != 0) {
  248. // A column family's user-defined timestamp size should not be
  249. // updated while DB is running.
  250. assert(recorded_cf_to_ts_sz_[cf_id] == ts_sz);
  251. } else if (ts_sz != 0) {
  252. ts_sz_to_record.emplace_back(cf_id, ts_sz);
  253. recorded_cf_to_ts_sz_.insert(std::make_pair(cf_id, ts_sz));
  254. }
  255. }
  256. if (ts_sz_to_record.empty()) {
  257. return IOStatus::OK();
  258. }
  259. UserDefinedTimestampSizeRecord record(std::move(ts_sz_to_record));
  260. std::string encoded;
  261. record.EncodeTo(&encoded);
  262. RecordType type = recycle_log_files_ ? kRecyclableUserDefinedTimestampSizeType
  263. : kUserDefinedTimestampSizeType;
  264. IOStatus s = MaybeSwitchToNewBlock(write_options, encoded);
  265. if (!s.ok()) {
  266. return s;
  267. }
  268. return EmitPhysicalRecord(write_options, type, encoded.data(),
  269. encoded.size());
  270. }
  271. bool Writer::BufferIsEmpty() { return dest_->BufferIsEmpty(); }
  272. IOStatus Writer::EmitPhysicalRecord(const WriteOptions& write_options,
  273. RecordType t, const char* ptr, size_t n) {
  274. assert(n <= 0xffff); // Must fit in two bytes
  275. size_t header_size;
  276. char buf[kRecyclableHeaderSize];
  277. // Format the header
  278. buf[4] = static_cast<char>(n & 0xff);
  279. buf[5] = static_cast<char>(n >> 8);
  280. buf[6] = static_cast<char>(t);
  281. uint32_t crc = type_crc_[t];
  282. if (t < kRecyclableFullType || t == kSetCompressionType ||
  283. t == kPredecessorWALInfoType || t == kUserDefinedTimestampSizeType) {
  284. // Legacy record format
  285. assert(block_offset_ + kHeaderSize + n <= kBlockSize);
  286. header_size = kHeaderSize;
  287. } else {
  288. // Recyclable record format
  289. assert(block_offset_ + kRecyclableHeaderSize + n <= kBlockSize);
  290. header_size = kRecyclableHeaderSize;
  291. // Only encode low 32-bits of the 64-bit log number. This means
  292. // we will fail to detect an old record if we recycled a log from
  293. // ~4 billion logs ago, but that is effectively impossible, and
  294. // even if it were we'dbe far more likely to see a false positive
  295. // on the 32-bit CRC.
  296. EncodeFixed32(buf + 7, static_cast<uint32_t>(log_number_));
  297. crc = crc32c::Extend(crc, buf + 7, 4);
  298. }
  299. // Compute the crc of the record type and the payload.
  300. uint32_t payload_crc = crc32c::Value(ptr, n);
  301. crc = crc32c::Crc32cCombine(crc, payload_crc, n);
  302. crc = crc32c::Mask(crc); // Adjust for storage
  303. TEST_SYNC_POINT_CALLBACK("LogWriter::EmitPhysicalRecord:BeforeEncodeChecksum",
  304. &crc);
  305. EncodeFixed32(buf, crc);
  306. // Write the header and the payload
  307. IOOptions opts;
  308. IOStatus s = WritableFileWriter::PrepareIOOptions(write_options, opts);
  309. if (s.ok()) {
  310. s = dest_->Append(opts, Slice(buf, header_size), 0 /* crc32c_checksum */);
  311. }
  312. if (s.ok()) {
  313. s = dest_->Append(opts, Slice(ptr, n), payload_crc);
  314. }
  315. block_offset_ += header_size + n;
  316. return s;
  317. }
  318. IOStatus Writer::MaybeHandleSeenFileWriterError() {
  319. if (dest_->seen_error()) {
  320. #ifndef NDEBUG
  321. if (dest_->seen_injected_error()) {
  322. std::stringstream msg;
  323. msg << "Seen " << FaultInjectionTestFS::kInjected
  324. << " error. Skip writing buffer.";
  325. return IOStatus::IOError(msg.str());
  326. }
  327. #endif // NDEBUG
  328. return IOStatus::IOError("Seen error. Skip writing buffer.");
  329. }
  330. return IOStatus::OK();
  331. }
  332. IOStatus Writer::MaybeSwitchToNewBlock(const WriteOptions& write_options,
  333. const std::string& content_to_write) {
  334. IOStatus s;
  335. const int64_t leftover = kBlockSize - block_offset_;
  336. // If there's not enough space for this record, switch to a new block.
  337. if (leftover < header_size_ + (int)content_to_write.size()) {
  338. IOOptions opts;
  339. s = WritableFileWriter::PrepareIOOptions(write_options, opts);
  340. if (!s.ok()) {
  341. return s;
  342. }
  343. std::vector<char> trailer(leftover, '\x00');
  344. s = dest_->Append(opts, Slice(trailer.data(), trailer.size()));
  345. if (!s.ok()) {
  346. return s;
  347. }
  348. block_offset_ = 0;
  349. }
  350. return s;
  351. }
  352. } // namespace ROCKSDB_NAMESPACE::log