blob_log_writer.cc 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #ifndef ROCKSDB_LITE
  6. #include "utilities/blob_db/blob_log_writer.h"
  7. #include <cstdint>
  8. #include <string>
  9. #include "file/writable_file_writer.h"
  10. #include "monitoring/statistics.h"
  11. #include "rocksdb/env.h"
  12. #include "util/coding.h"
  13. #include "util/stop_watch.h"
  14. #include "utilities/blob_db/blob_log_format.h"
  15. namespace ROCKSDB_NAMESPACE {
  16. namespace blob_db {
  17. Writer::Writer(std::unique_ptr<WritableFileWriter>&& dest, Env* env,
  18. Statistics* statistics, uint64_t log_number, uint64_t bpsync,
  19. bool use_fs, uint64_t boffset)
  20. : dest_(std::move(dest)),
  21. env_(env),
  22. statistics_(statistics),
  23. log_number_(log_number),
  24. block_offset_(boffset),
  25. bytes_per_sync_(bpsync),
  26. next_sync_offset_(0),
  27. use_fsync_(use_fs),
  28. last_elem_type_(kEtNone) {}
  29. Status Writer::Sync() {
  30. StopWatch sync_sw(env_, statistics_, BLOB_DB_BLOB_FILE_SYNC_MICROS);
  31. Status s = dest_->Sync(use_fsync_);
  32. RecordTick(statistics_, BLOB_DB_BLOB_FILE_SYNCED);
  33. return s;
  34. }
  35. Status Writer::WriteHeader(BlobLogHeader& header) {
  36. assert(block_offset_ == 0);
  37. assert(last_elem_type_ == kEtNone);
  38. std::string str;
  39. header.EncodeTo(&str);
  40. Status s = dest_->Append(Slice(str));
  41. if (s.ok()) {
  42. block_offset_ += str.size();
  43. s = dest_->Flush();
  44. }
  45. last_elem_type_ = kEtFileHdr;
  46. RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_WRITTEN,
  47. BlobLogHeader::kSize);
  48. return s;
  49. }
  50. Status Writer::AppendFooter(BlobLogFooter& footer) {
  51. assert(block_offset_ != 0);
  52. assert(last_elem_type_ == kEtFileHdr || last_elem_type_ == kEtRecord);
  53. std::string str;
  54. footer.EncodeTo(&str);
  55. Status s = dest_->Append(Slice(str));
  56. if (s.ok()) {
  57. block_offset_ += str.size();
  58. s = dest_->Close();
  59. dest_.reset();
  60. }
  61. last_elem_type_ = kEtFileFooter;
  62. RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_WRITTEN,
  63. BlobLogFooter::kSize);
  64. return s;
  65. }
  66. Status Writer::AddRecord(const Slice& key, const Slice& val,
  67. uint64_t expiration, uint64_t* key_offset,
  68. uint64_t* blob_offset) {
  69. assert(block_offset_ != 0);
  70. assert(last_elem_type_ == kEtFileHdr || last_elem_type_ == kEtRecord);
  71. std::string buf;
  72. ConstructBlobHeader(&buf, key, val, expiration);
  73. Status s = EmitPhysicalRecord(buf, key, val, key_offset, blob_offset);
  74. return s;
  75. }
  76. Status Writer::AddRecord(const Slice& key, const Slice& val,
  77. uint64_t* key_offset, uint64_t* blob_offset) {
  78. assert(block_offset_ != 0);
  79. assert(last_elem_type_ == kEtFileHdr || last_elem_type_ == kEtRecord);
  80. std::string buf;
  81. ConstructBlobHeader(&buf, key, val, 0);
  82. Status s = EmitPhysicalRecord(buf, key, val, key_offset, blob_offset);
  83. return s;
  84. }
  85. void Writer::ConstructBlobHeader(std::string* buf, const Slice& key,
  86. const Slice& val, uint64_t expiration) {
  87. BlobLogRecord record;
  88. record.key = key;
  89. record.value = val;
  90. record.expiration = expiration;
  91. record.EncodeHeaderTo(buf);
  92. }
  93. Status Writer::EmitPhysicalRecord(const std::string& headerbuf,
  94. const Slice& key, const Slice& val,
  95. uint64_t* key_offset, uint64_t* blob_offset) {
  96. StopWatch write_sw(env_, statistics_, BLOB_DB_BLOB_FILE_WRITE_MICROS);
  97. Status s = dest_->Append(Slice(headerbuf));
  98. if (s.ok()) {
  99. s = dest_->Append(key);
  100. }
  101. if (s.ok()) {
  102. s = dest_->Append(val);
  103. }
  104. if (s.ok()) {
  105. s = dest_->Flush();
  106. }
  107. *key_offset = block_offset_ + BlobLogRecord::kHeaderSize;
  108. *blob_offset = *key_offset + key.size();
  109. block_offset_ = *blob_offset + val.size();
  110. last_elem_type_ = kEtRecord;
  111. RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_WRITTEN,
  112. BlobLogRecord::kHeaderSize + key.size() + val.size());
  113. return s;
  114. }
  115. } // namespace blob_db
  116. } // namespace ROCKSDB_NAMESPACE
  117. #endif // ROCKSDB_LITE