// blob_log_writer.cc (6.2 KB)
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include "db/blob/blob_log_writer.h"
  6. #include <cstdint>
  7. #include <string>
  8. #include "db/blob/blob_log_format.h"
  9. #include "file/writable_file_writer.h"
  10. #include "monitoring/statistics_impl.h"
  11. #include "rocksdb/system_clock.h"
  12. #include "test_util/sync_point.h"
  13. #include "util/coding.h"
  14. #include "util/stop_watch.h"
  15. namespace ROCKSDB_NAMESPACE {
  16. BlobLogWriter::BlobLogWriter(std::unique_ptr<WritableFileWriter>&& dest,
  17. SystemClock* clock, Statistics* statistics,
  18. uint64_t log_number, bool use_fs, bool do_flush,
  19. uint64_t boffset)
  20. : dest_(std::move(dest)),
  21. clock_(clock),
  22. statistics_(statistics),
  23. log_number_(log_number),
  24. block_offset_(boffset),
  25. use_fsync_(use_fs),
  26. do_flush_(do_flush),
  27. last_elem_type_(kEtNone) {}
  28. BlobLogWriter::~BlobLogWriter() = default;
  29. Status BlobLogWriter::Sync(const WriteOptions& write_options) {
  30. TEST_SYNC_POINT("BlobLogWriter::Sync");
  31. StopWatch sync_sw(clock_, statistics_, BLOB_DB_BLOB_FILE_SYNC_MICROS);
  32. IOOptions opts;
  33. Status s = WritableFileWriter::PrepareIOOptions(write_options, opts);
  34. if (s.ok()) {
  35. s = dest_->Sync(opts, use_fsync_);
  36. }
  37. if (s.ok()) {
  38. RecordTick(statistics_, BLOB_DB_BLOB_FILE_SYNCED);
  39. }
  40. return s;
  41. }
  42. Status BlobLogWriter::WriteHeader(const WriteOptions& write_options,
  43. BlobLogHeader& header) {
  44. assert(block_offset_ == 0);
  45. assert(last_elem_type_ == kEtNone);
  46. std::string str;
  47. header.EncodeTo(&str);
  48. IOOptions opts;
  49. Status s = WritableFileWriter::PrepareIOOptions(write_options, opts);
  50. if (s.ok()) {
  51. s = dest_->Append(opts, Slice(str));
  52. }
  53. if (s.ok()) {
  54. block_offset_ += str.size();
  55. if (do_flush_) {
  56. s = dest_->Flush(opts);
  57. }
  58. }
  59. last_elem_type_ = kEtFileHdr;
  60. if (s.ok()) {
  61. RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_WRITTEN,
  62. BlobLogHeader::kSize);
  63. }
  64. return s;
  65. }
  66. Status BlobLogWriter::AppendFooter(const WriteOptions& write_options,
  67. BlobLogFooter& footer,
  68. std::string* checksum_method,
  69. std::string* checksum_value) {
  70. assert(block_offset_ != 0);
  71. assert(last_elem_type_ == kEtFileHdr || last_elem_type_ == kEtRecord);
  72. std::string str;
  73. footer.EncodeTo(&str);
  74. Status s;
  75. if (dest_->seen_error()) {
  76. s.PermitUncheckedError();
  77. return Status::IOError("Seen Error. Skip closing.");
  78. } else {
  79. IOOptions opts;
  80. s = WritableFileWriter::PrepareIOOptions(write_options, opts);
  81. if (s.ok()) {
  82. s = dest_->Append(opts, Slice(str));
  83. }
  84. if (s.ok()) {
  85. block_offset_ += str.size();
  86. s = Sync(write_options);
  87. if (s.ok()) {
  88. s = dest_->Close(opts);
  89. if (s.ok()) {
  90. assert(!!checksum_method == !!checksum_value);
  91. if (checksum_method) {
  92. assert(checksum_method->empty());
  93. std::string method = dest_->GetFileChecksumFuncName();
  94. if (method != kUnknownFileChecksumFuncName) {
  95. *checksum_method = std::move(method);
  96. }
  97. }
  98. if (checksum_value) {
  99. assert(checksum_value->empty());
  100. std::string value = dest_->GetFileChecksum();
  101. if (value != kUnknownFileChecksum) {
  102. *checksum_value = std::move(value);
  103. }
  104. }
  105. }
  106. }
  107. }
  108. dest_.reset();
  109. }
  110. last_elem_type_ = kEtFileFooter;
  111. if (s.ok()) {
  112. RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_WRITTEN,
  113. BlobLogFooter::kSize);
  114. }
  115. return s;
  116. }
  117. Status BlobLogWriter::AddRecord(const WriteOptions& write_options,
  118. const Slice& key, const Slice& val,
  119. uint64_t expiration, uint64_t* key_offset,
  120. uint64_t* blob_offset) {
  121. assert(block_offset_ != 0);
  122. assert(last_elem_type_ == kEtFileHdr || last_elem_type_ == kEtRecord);
  123. std::string buf;
  124. ConstructBlobHeader(&buf, key, val, expiration);
  125. Status s =
  126. EmitPhysicalRecord(write_options, buf, key, val, key_offset, blob_offset);
  127. return s;
  128. }
  129. Status BlobLogWriter::AddRecord(const WriteOptions& write_options,
  130. const Slice& key, const Slice& val,
  131. uint64_t* key_offset, uint64_t* blob_offset) {
  132. assert(block_offset_ != 0);
  133. assert(last_elem_type_ == kEtFileHdr || last_elem_type_ == kEtRecord);
  134. std::string buf;
  135. ConstructBlobHeader(&buf, key, val, 0);
  136. Status s =
  137. EmitPhysicalRecord(write_options, buf, key, val, key_offset, blob_offset);
  138. return s;
  139. }
  140. void BlobLogWriter::ConstructBlobHeader(std::string* buf, const Slice& key,
  141. const Slice& val, uint64_t expiration) {
  142. BlobLogRecord record;
  143. record.key = key;
  144. record.value = val;
  145. record.expiration = expiration;
  146. record.EncodeHeaderTo(buf);
  147. }
  148. Status BlobLogWriter::EmitPhysicalRecord(const WriteOptions& write_options,
  149. const std::string& headerbuf,
  150. const Slice& key, const Slice& val,
  151. uint64_t* key_offset,
  152. uint64_t* blob_offset) {
  153. IOOptions opts;
  154. Status s = WritableFileWriter::PrepareIOOptions(write_options, opts);
  155. if (s.ok()) {
  156. s = dest_->Append(opts, Slice(headerbuf));
  157. }
  158. if (s.ok()) {
  159. s = dest_->Append(opts, key);
  160. }
  161. if (s.ok()) {
  162. s = dest_->Append(opts, val);
  163. }
  164. if (do_flush_ && s.ok()) {
  165. s = dest_->Flush(opts);
  166. }
  167. *key_offset = block_offset_ + BlobLogRecord::kHeaderSize;
  168. *blob_offset = *key_offset + key.size();
  169. block_offset_ = *blob_offset + val.size();
  170. last_elem_type_ = kEtRecord;
  171. if (s.ok()) {
  172. RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_WRITTEN,
  173. BlobLogRecord::kHeaderSize + key.size() + val.size());
  174. }
  175. return s;
  176. }
  177. } // namespace ROCKSDB_NAMESPACE