blob_log_reader.cc 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. #ifndef ROCKSDB_LITE
  7. #include "utilities/blob_db/blob_log_reader.h"
  8. #include <algorithm>
  9. #include "file/random_access_file_reader.h"
  10. #include "monitoring/statistics.h"
  11. #include "util/stop_watch.h"
  12. namespace ROCKSDB_NAMESPACE {
  13. namespace blob_db {
  14. Reader::Reader(std::unique_ptr<RandomAccessFileReader>&& file_reader, Env* env,
  15. Statistics* statistics)
  16. : file_(std::move(file_reader)),
  17. env_(env),
  18. statistics_(statistics),
  19. buffer_(),
  20. next_byte_(0) {}
  21. Status Reader::ReadSlice(uint64_t size, Slice* slice, char* buf) {
  22. StopWatch read_sw(env_, statistics_, BLOB_DB_BLOB_FILE_READ_MICROS);
  23. Status s = file_->Read(next_byte_, static_cast<size_t>(size), slice, buf);
  24. next_byte_ += size;
  25. if (!s.ok()) {
  26. return s;
  27. }
  28. RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_READ, slice->size());
  29. if (slice->size() != size) {
  30. return Status::Corruption("EOF reached while reading record");
  31. }
  32. return s;
  33. }
  34. Status Reader::ReadHeader(BlobLogHeader* header) {
  35. assert(file_.get() != nullptr);
  36. assert(next_byte_ == 0);
  37. Status s = ReadSlice(BlobLogHeader::kSize, &buffer_, header_buf_);
  38. if (!s.ok()) {
  39. return s;
  40. }
  41. if (buffer_.size() != BlobLogHeader::kSize) {
  42. return Status::Corruption("EOF reached before file header");
  43. }
  44. return header->DecodeFrom(buffer_);
  45. }
  46. Status Reader::ReadRecord(BlobLogRecord* record, ReadLevel level,
  47. uint64_t* blob_offset) {
  48. Status s = ReadSlice(BlobLogRecord::kHeaderSize, &buffer_, header_buf_);
  49. if (!s.ok()) {
  50. return s;
  51. }
  52. if (buffer_.size() != BlobLogRecord::kHeaderSize) {
  53. return Status::Corruption("EOF reached before record header");
  54. }
  55. s = record->DecodeHeaderFrom(buffer_);
  56. if (!s.ok()) {
  57. return s;
  58. }
  59. uint64_t kb_size = record->key_size + record->value_size;
  60. if (blob_offset != nullptr) {
  61. *blob_offset = next_byte_ + record->key_size;
  62. }
  63. switch (level) {
  64. case kReadHeader:
  65. next_byte_ += kb_size;
  66. break;
  67. case kReadHeaderKey:
  68. record->key_buf.reset(new char[record->key_size]);
  69. s = ReadSlice(record->key_size, &record->key, record->key_buf.get());
  70. next_byte_ += record->value_size;
  71. break;
  72. case kReadHeaderKeyBlob:
  73. record->key_buf.reset(new char[record->key_size]);
  74. s = ReadSlice(record->key_size, &record->key, record->key_buf.get());
  75. if (s.ok()) {
  76. record->value_buf.reset(new char[record->value_size]);
  77. s = ReadSlice(record->value_size, &record->value,
  78. record->value_buf.get());
  79. }
  80. if (s.ok()) {
  81. s = record->CheckBlobCRC();
  82. }
  83. break;
  84. }
  85. return s;
  86. }
  87. } // namespace blob_db
  88. } // namespace ROCKSDB_NAMESPACE
  89. #endif // ROCKSDB_LITE