blob_log_sequential_reader.cc 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. #include "db/blob/blob_log_sequential_reader.h"
  7. #include "file/random_access_file_reader.h"
  8. #include "monitoring/statistics_impl.h"
  9. #include "util/stop_watch.h"
  10. namespace ROCKSDB_NAMESPACE {
  11. BlobLogSequentialReader::BlobLogSequentialReader(
  12. std::unique_ptr<RandomAccessFileReader>&& file_reader, SystemClock* clock,
  13. Statistics* statistics)
  14. : file_(std::move(file_reader)),
  15. clock_(clock),
  16. statistics_(statistics),
  17. next_byte_(0) {}
  18. BlobLogSequentialReader::~BlobLogSequentialReader() = default;
  19. Status BlobLogSequentialReader::ReadSlice(uint64_t size, Slice* slice,
  20. char* buf) {
  21. assert(slice);
  22. assert(file_);
  23. StopWatch read_sw(clock_, statistics_, BLOB_DB_BLOB_FILE_READ_MICROS);
  24. // TODO: rate limit `BlobLogSequentialReader` reads (it appears unused?)
  25. Status s = file_->Read(IOOptions(), next_byte_, static_cast<size_t>(size),
  26. slice, buf, nullptr);
  27. next_byte_ += size;
  28. if (!s.ok()) {
  29. return s;
  30. }
  31. RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_READ, slice->size());
  32. if (slice->size() != size) {
  33. return Status::Corruption("EOF reached while reading record");
  34. }
  35. return s;
  36. }
  37. Status BlobLogSequentialReader::ReadHeader(BlobLogHeader* header) {
  38. assert(header);
  39. assert(next_byte_ == 0);
  40. static_assert(BlobLogHeader::kSize <= sizeof(header_buf_),
  41. "Buffer is smaller than BlobLogHeader::kSize");
  42. Status s = ReadSlice(BlobLogHeader::kSize, &buffer_, header_buf_);
  43. if (!s.ok()) {
  44. return s;
  45. }
  46. if (buffer_.size() != BlobLogHeader::kSize) {
  47. return Status::Corruption("EOF reached before file header");
  48. }
  49. return header->DecodeFrom(buffer_);
  50. }
  51. Status BlobLogSequentialReader::ReadRecord(BlobLogRecord* record,
  52. ReadLevel level,
  53. uint64_t* blob_offset) {
  54. assert(record);
  55. static_assert(BlobLogRecord::kHeaderSize <= sizeof(header_buf_),
  56. "Buffer is smaller than BlobLogRecord::kHeaderSize");
  57. Status s = ReadSlice(BlobLogRecord::kHeaderSize, &buffer_, header_buf_);
  58. if (!s.ok()) {
  59. return s;
  60. }
  61. if (buffer_.size() != BlobLogRecord::kHeaderSize) {
  62. return Status::Corruption("EOF reached before record header");
  63. }
  64. s = record->DecodeHeaderFrom(buffer_);
  65. if (!s.ok()) {
  66. return s;
  67. }
  68. uint64_t kb_size = record->key_size + record->value_size;
  69. if (blob_offset != nullptr) {
  70. *blob_offset = next_byte_ + record->key_size;
  71. }
  72. switch (level) {
  73. case kReadHeader:
  74. next_byte_ += kb_size;
  75. break;
  76. case kReadHeaderKey:
  77. record->key_buf.reset(new char[record->key_size]);
  78. s = ReadSlice(record->key_size, &record->key, record->key_buf.get());
  79. next_byte_ += record->value_size;
  80. break;
  81. case kReadHeaderKeyBlob:
  82. record->key_buf.reset(new char[record->key_size]);
  83. s = ReadSlice(record->key_size, &record->key, record->key_buf.get());
  84. if (s.ok()) {
  85. record->value_buf.reset(new char[record->value_size]);
  86. s = ReadSlice(record->value_size, &record->value,
  87. record->value_buf.get());
  88. }
  89. if (s.ok()) {
  90. s = record->CheckBlobCRC();
  91. }
  92. break;
  93. }
  94. return s;
  95. }
  96. Status BlobLogSequentialReader::ReadFooter(BlobLogFooter* footer) {
  97. assert(footer);
  98. static_assert(BlobLogFooter::kSize <= sizeof(header_buf_),
  99. "Buffer is smaller than BlobLogFooter::kSize");
  100. Status s = ReadSlice(BlobLogFooter::kSize, &buffer_, header_buf_);
  101. if (!s.ok()) {
  102. return s;
  103. }
  104. if (buffer_.size() != BlobLogFooter::kSize) {
  105. return Status::Corruption("EOF reached before file footer");
  106. }
  107. return footer->DecodeFrom(buffer_);
  108. }
  109. } // namespace ROCKSDB_NAMESPACE