blob_file_reader.cc 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include "db/blob/blob_file_reader.h"
  6. #include <cassert>
  7. #include <string>
  8. #include "db/blob/blob_contents.h"
  9. #include "db/blob/blob_log_format.h"
  10. #include "file/file_prefetch_buffer.h"
  11. #include "file/filename.h"
  12. #include "monitoring/statistics_impl.h"
  13. #include "options/cf_options.h"
  14. #include "rocksdb/file_system.h"
  15. #include "rocksdb/slice.h"
  16. #include "rocksdb/status.h"
  17. #include "table/multiget_context.h"
  18. #include "test_util/sync_point.h"
  19. #include "util/compression.h"
  20. #include "util/crc32c.h"
  21. #include "util/stop_watch.h"
  22. namespace ROCKSDB_NAMESPACE {
  23. Status BlobFileReader::Create(
  24. const ImmutableOptions& immutable_options, const ReadOptions& read_options,
  25. const FileOptions& file_options, uint32_t column_family_id,
  26. HistogramImpl* blob_file_read_hist, uint64_t blob_file_number,
  27. const std::shared_ptr<IOTracer>& io_tracer,
  28. std::unique_ptr<BlobFileReader>* blob_file_reader) {
  29. assert(blob_file_reader);
  30. assert(!*blob_file_reader);
  31. uint64_t file_size = 0;
  32. std::unique_ptr<RandomAccessFileReader> file_reader;
  33. {
  34. const Status s =
  35. OpenFile(immutable_options, file_options, blob_file_read_hist,
  36. blob_file_number, io_tracer, &file_size, &file_reader);
  37. if (!s.ok()) {
  38. return s;
  39. }
  40. }
  41. assert(file_reader);
  42. Statistics* const statistics = immutable_options.stats;
  43. CompressionType compression_type = kNoCompression;
  44. {
  45. const Status s =
  46. ReadHeader(file_reader.get(), read_options, column_family_id,
  47. statistics, &compression_type);
  48. if (!s.ok()) {
  49. return s;
  50. }
  51. }
  52. {
  53. const Status s =
  54. ReadFooter(file_reader.get(), read_options, file_size, statistics);
  55. if (!s.ok()) {
  56. return s;
  57. }
  58. }
  59. blob_file_reader->reset(
  60. new BlobFileReader(std::move(file_reader), file_size, compression_type,
  61. immutable_options.clock, statistics));
  62. return Status::OK();
  63. }
  64. Status BlobFileReader::OpenFile(
  65. const ImmutableOptions& immutable_options, const FileOptions& file_opts,
  66. HistogramImpl* blob_file_read_hist, uint64_t blob_file_number,
  67. const std::shared_ptr<IOTracer>& io_tracer, uint64_t* file_size,
  68. std::unique_ptr<RandomAccessFileReader>* file_reader) {
  69. assert(file_size);
  70. assert(file_reader);
  71. const auto& cf_paths = immutable_options.cf_paths;
  72. assert(!cf_paths.empty());
  73. const std::string blob_file_path =
  74. BlobFileName(cf_paths.front().path, blob_file_number);
  75. FileSystem* const fs = immutable_options.fs.get();
  76. assert(fs);
  77. constexpr IODebugContext* dbg = nullptr;
  78. {
  79. TEST_SYNC_POINT("BlobFileReader::OpenFile:GetFileSize");
  80. const Status s =
  81. fs->GetFileSize(blob_file_path, IOOptions(), file_size, dbg);
  82. if (!s.ok()) {
  83. return s;
  84. }
  85. }
  86. if (*file_size < BlobLogHeader::kSize + BlobLogFooter::kSize) {
  87. return Status::Corruption("Malformed blob file");
  88. }
  89. std::unique_ptr<FSRandomAccessFile> file;
  90. {
  91. TEST_SYNC_POINT("BlobFileReader::OpenFile:NewRandomAccessFile");
  92. const Status s =
  93. fs->NewRandomAccessFile(blob_file_path, file_opts, &file, dbg);
  94. if (!s.ok()) {
  95. return s;
  96. }
  97. }
  98. assert(file);
  99. if (immutable_options.advise_random_on_open) {
  100. file->Hint(FSRandomAccessFile::kRandom);
  101. }
  102. file_reader->reset(new RandomAccessFileReader(
  103. std::move(file), blob_file_path, immutable_options.clock, io_tracer,
  104. immutable_options.stats, BLOB_DB_BLOB_FILE_READ_MICROS,
  105. blob_file_read_hist, immutable_options.rate_limiter.get(),
  106. immutable_options.listeners));
  107. return Status::OK();
  108. }
  109. Status BlobFileReader::ReadHeader(const RandomAccessFileReader* file_reader,
  110. const ReadOptions& read_options,
  111. uint32_t column_family_id,
  112. Statistics* statistics,
  113. CompressionType* compression_type) {
  114. assert(file_reader);
  115. assert(compression_type);
  116. Slice header_slice;
  117. Buffer buf;
  118. AlignedBuf aligned_buf;
  119. {
  120. TEST_SYNC_POINT("BlobFileReader::ReadHeader:ReadFromFile");
  121. constexpr uint64_t read_offset = 0;
  122. constexpr size_t read_size = BlobLogHeader::kSize;
  123. const Status s =
  124. ReadFromFile(file_reader, read_options, read_offset, read_size,
  125. statistics, &header_slice, &buf, &aligned_buf);
  126. if (!s.ok()) {
  127. return s;
  128. }
  129. TEST_SYNC_POINT_CALLBACK("BlobFileReader::ReadHeader:TamperWithResult",
  130. &header_slice);
  131. }
  132. BlobLogHeader header;
  133. {
  134. const Status s = header.DecodeFrom(header_slice);
  135. if (!s.ok()) {
  136. return s;
  137. }
  138. }
  139. constexpr ExpirationRange no_expiration_range;
  140. if (header.has_ttl || header.expiration_range != no_expiration_range) {
  141. return Status::Corruption("Unexpected TTL blob file");
  142. }
  143. if (header.column_family_id != column_family_id) {
  144. return Status::Corruption("Column family ID mismatch");
  145. }
  146. *compression_type = header.compression;
  147. return Status::OK();
  148. }
  149. Status BlobFileReader::ReadFooter(const RandomAccessFileReader* file_reader,
  150. const ReadOptions& read_options,
  151. uint64_t file_size, Statistics* statistics) {
  152. assert(file_size >= BlobLogHeader::kSize + BlobLogFooter::kSize);
  153. assert(file_reader);
  154. Slice footer_slice;
  155. Buffer buf;
  156. AlignedBuf aligned_buf;
  157. {
  158. TEST_SYNC_POINT("BlobFileReader::ReadFooter:ReadFromFile");
  159. const uint64_t read_offset = file_size - BlobLogFooter::kSize;
  160. constexpr size_t read_size = BlobLogFooter::kSize;
  161. const Status s =
  162. ReadFromFile(file_reader, read_options, read_offset, read_size,
  163. statistics, &footer_slice, &buf, &aligned_buf);
  164. if (!s.ok()) {
  165. return s;
  166. }
  167. TEST_SYNC_POINT_CALLBACK("BlobFileReader::ReadFooter:TamperWithResult",
  168. &footer_slice);
  169. }
  170. BlobLogFooter footer;
  171. {
  172. const Status s = footer.DecodeFrom(footer_slice);
  173. if (!s.ok()) {
  174. return s;
  175. }
  176. }
  177. constexpr ExpirationRange no_expiration_range;
  178. if (footer.expiration_range != no_expiration_range) {
  179. return Status::Corruption("Unexpected TTL blob file");
  180. }
  181. return Status::OK();
  182. }
  183. Status BlobFileReader::ReadFromFile(const RandomAccessFileReader* file_reader,
  184. const ReadOptions& read_options,
  185. uint64_t read_offset, size_t read_size,
  186. Statistics* statistics, Slice* slice,
  187. Buffer* buf, AlignedBuf* aligned_buf) {
  188. assert(slice);
  189. assert(buf);
  190. assert(aligned_buf);
  191. assert(file_reader);
  192. RecordTick(statistics, BLOB_DB_BLOB_FILE_BYTES_READ, read_size);
  193. Status s;
  194. IOOptions io_options;
  195. IODebugContext dbg;
  196. s = file_reader->PrepareIOOptions(read_options, io_options, &dbg);
  197. if (!s.ok()) {
  198. return s;
  199. }
  200. if (file_reader->use_direct_io()) {
  201. constexpr char* scratch = nullptr;
  202. s = file_reader->Read(io_options, read_offset, read_size, slice, scratch,
  203. aligned_buf, &dbg);
  204. } else {
  205. buf->reset(new char[read_size]);
  206. constexpr AlignedBuf* aligned_scratch = nullptr;
  207. s = file_reader->Read(io_options, read_offset, read_size, slice, buf->get(),
  208. aligned_scratch, &dbg);
  209. }
  210. if (!s.ok()) {
  211. return s;
  212. }
  213. if (slice->size() != read_size) {
  214. return Status::Corruption("Failed to read data from blob file");
  215. }
  216. return Status::OK();
  217. }
  218. BlobFileReader::BlobFileReader(
  219. std::unique_ptr<RandomAccessFileReader>&& file_reader, uint64_t file_size,
  220. CompressionType compression_type, SystemClock* clock,
  221. Statistics* statistics)
  222. : file_reader_(std::move(file_reader)),
  223. file_size_(file_size),
  224. compression_type_(compression_type),
  225. clock_(clock),
  226. statistics_(statistics) {
  227. assert(file_reader_);
  228. }
  229. BlobFileReader::~BlobFileReader() = default;
  230. Status BlobFileReader::GetBlob(
  231. const ReadOptions& read_options, const Slice& user_key, uint64_t offset,
  232. uint64_t value_size, CompressionType compression_type,
  233. FilePrefetchBuffer* prefetch_buffer, MemoryAllocator* allocator,
  234. std::unique_ptr<BlobContents>* result, uint64_t* bytes_read) const {
  235. assert(result);
  236. const uint64_t key_size = user_key.size();
  237. if (!IsValidBlobOffset(offset, key_size, value_size, file_size_)) {
  238. return Status::Corruption("Invalid blob offset");
  239. }
  240. if (compression_type != compression_type_) {
  241. return Status::Corruption("Compression type mismatch when reading blob");
  242. }
  243. // Note: if verify_checksum is set, we read the entire blob record to be able
  244. // to perform the verification; otherwise, we just read the blob itself. Since
  245. // the offset in BlobIndex actually points to the blob value, we need to make
  246. // an adjustment in the former case.
  247. const uint64_t adjustment =
  248. read_options.verify_checksums
  249. ? BlobLogRecord::CalculateAdjustmentForRecordHeader(key_size)
  250. : 0;
  251. assert(offset >= adjustment);
  252. const uint64_t record_offset = offset - adjustment;
  253. const uint64_t record_size = value_size + adjustment;
  254. Slice record_slice;
  255. Buffer buf;
  256. AlignedBuf aligned_buf;
  257. bool prefetched = false;
  258. if (prefetch_buffer) {
  259. Status s;
  260. constexpr bool for_compaction = true;
  261. IOOptions io_options;
  262. IODebugContext dbg;
  263. s = file_reader_->PrepareIOOptions(read_options, io_options, &dbg);
  264. if (!s.ok()) {
  265. return s;
  266. }
  267. prefetched = prefetch_buffer->TryReadFromCache(
  268. io_options, file_reader_.get(), record_offset,
  269. static_cast<size_t>(record_size), &record_slice, &s, for_compaction);
  270. if (!s.ok()) {
  271. return s;
  272. }
  273. }
  274. if (!prefetched) {
  275. TEST_SYNC_POINT("BlobFileReader::GetBlob:ReadFromFile");
  276. PERF_COUNTER_ADD(blob_read_count, 1);
  277. PERF_COUNTER_ADD(blob_read_byte, record_size);
  278. PERF_TIMER_GUARD(blob_read_time);
  279. const Status s =
  280. ReadFromFile(file_reader_.get(), read_options, record_offset,
  281. static_cast<size_t>(record_size), statistics_,
  282. &record_slice, &buf, &aligned_buf);
  283. if (!s.ok()) {
  284. return s;
  285. }
  286. }
  287. TEST_SYNC_POINT_CALLBACK("BlobFileReader::GetBlob:TamperWithResult",
  288. &record_slice);
  289. if (read_options.verify_checksums) {
  290. const Status s = VerifyBlob(record_slice, user_key, value_size);
  291. if (!s.ok()) {
  292. return s;
  293. }
  294. }
  295. const Slice value_slice(record_slice.data() + adjustment, value_size);
  296. {
  297. const Status s = UncompressBlobIfNeeded(
  298. value_slice, compression_type, allocator, clock_, statistics_, result);
  299. if (!s.ok()) {
  300. return s;
  301. }
  302. }
  303. if (bytes_read) {
  304. *bytes_read = record_size;
  305. }
  306. return Status::OK();
  307. }
  308. void BlobFileReader::MultiGetBlob(
  309. const ReadOptions& read_options, MemoryAllocator* allocator,
  310. autovector<std::pair<BlobReadRequest*, std::unique_ptr<BlobContents>>>&
  311. blob_reqs,
  312. uint64_t* bytes_read) const {
  313. const size_t num_blobs = blob_reqs.size();
  314. assert(num_blobs > 0);
  315. assert(num_blobs <= MultiGetContext::MAX_BATCH_SIZE);
  316. #ifndef NDEBUG
  317. for (size_t i = 0; i < num_blobs - 1; ++i) {
  318. assert(blob_reqs[i].first->offset <= blob_reqs[i + 1].first->offset);
  319. }
  320. #endif // !NDEBUG
  321. std::vector<FSReadRequest> read_reqs;
  322. autovector<uint64_t> adjustments;
  323. uint64_t total_len = 0;
  324. read_reqs.reserve(num_blobs);
  325. for (size_t i = 0; i < num_blobs; ++i) {
  326. BlobReadRequest* const req = blob_reqs[i].first;
  327. assert(req);
  328. assert(req->user_key);
  329. assert(req->status);
  330. const size_t key_size = req->user_key->size();
  331. const uint64_t offset = req->offset;
  332. const uint64_t value_size = req->len;
  333. if (!IsValidBlobOffset(offset, key_size, value_size, file_size_)) {
  334. *req->status = Status::Corruption("Invalid blob offset");
  335. continue;
  336. }
  337. if (req->compression != compression_type_) {
  338. *req->status =
  339. Status::Corruption("Compression type mismatch when reading a blob");
  340. continue;
  341. }
  342. const uint64_t adjustment =
  343. read_options.verify_checksums
  344. ? BlobLogRecord::CalculateAdjustmentForRecordHeader(key_size)
  345. : 0;
  346. assert(req->offset >= adjustment);
  347. adjustments.push_back(adjustment);
  348. FSReadRequest read_req;
  349. read_req.offset = req->offset - adjustment;
  350. read_req.len = req->len + adjustment;
  351. total_len += read_req.len;
  352. read_reqs.emplace_back(std::move(read_req));
  353. }
  354. RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_READ, total_len);
  355. Buffer buf;
  356. AlignedBuf aligned_buf;
  357. Status s;
  358. bool direct_io = file_reader_->use_direct_io();
  359. if (direct_io) {
  360. for (size_t i = 0; i < read_reqs.size(); ++i) {
  361. read_reqs[i].scratch = nullptr;
  362. }
  363. } else {
  364. buf.reset(new char[total_len]);
  365. std::ptrdiff_t pos = 0;
  366. for (size_t i = 0; i < read_reqs.size(); ++i) {
  367. read_reqs[i].scratch = buf.get() + pos;
  368. pos += read_reqs[i].len;
  369. }
  370. }
  371. TEST_SYNC_POINT("BlobFileReader::MultiGetBlob:ReadFromFile");
  372. PERF_COUNTER_ADD(blob_read_count, num_blobs);
  373. PERF_COUNTER_ADD(blob_read_byte, total_len);
  374. IOOptions opts;
  375. IODebugContext dbg;
  376. s = file_reader_->PrepareIOOptions(read_options, opts, &dbg);
  377. if (s.ok()) {
  378. s = file_reader_->MultiRead(opts, read_reqs.data(), read_reqs.size(),
  379. direct_io ? &aligned_buf : nullptr, &dbg);
  380. }
  381. if (!s.ok()) {
  382. for (auto& req : read_reqs) {
  383. req.status.PermitUncheckedError();
  384. }
  385. for (auto& blob_req : blob_reqs) {
  386. BlobReadRequest* const req = blob_req.first;
  387. assert(req);
  388. assert(req->status);
  389. if (!req->status->IsCorruption()) {
  390. // Avoid overwriting corruption status.
  391. *req->status = s;
  392. }
  393. }
  394. return;
  395. }
  396. assert(s.ok());
  397. uint64_t total_bytes = 0;
  398. for (size_t i = 0, j = 0; i < num_blobs; ++i) {
  399. BlobReadRequest* const req = blob_reqs[i].first;
  400. assert(req);
  401. assert(req->user_key);
  402. assert(req->status);
  403. if (!req->status->ok()) {
  404. continue;
  405. }
  406. assert(j < read_reqs.size());
  407. auto& read_req = read_reqs[j++];
  408. const auto& record_slice = read_req.result;
  409. if (read_req.status.ok() && record_slice.size() != read_req.len) {
  410. read_req.status =
  411. IOStatus::Corruption("Failed to read data from blob file");
  412. }
  413. *req->status = read_req.status;
  414. if (!req->status->ok()) {
  415. continue;
  416. }
  417. // Verify checksums if enabled
  418. if (read_options.verify_checksums) {
  419. *req->status = VerifyBlob(record_slice, *req->user_key, req->len);
  420. if (!req->status->ok()) {
  421. continue;
  422. }
  423. }
  424. // Uncompress blob if needed
  425. Slice value_slice(record_slice.data() + adjustments[i], req->len);
  426. *req->status =
  427. UncompressBlobIfNeeded(value_slice, compression_type_, allocator,
  428. clock_, statistics_, &blob_reqs[i].second);
  429. if (req->status->ok()) {
  430. total_bytes += record_slice.size();
  431. }
  432. }
  433. if (bytes_read) {
  434. *bytes_read = total_bytes;
  435. }
  436. }
  437. Status BlobFileReader::VerifyBlob(const Slice& record_slice,
  438. const Slice& user_key, uint64_t value_size) {
  439. PERF_TIMER_GUARD(blob_checksum_time);
  440. BlobLogRecord record;
  441. const Slice header_slice(record_slice.data(), BlobLogRecord::kHeaderSize);
  442. {
  443. const Status s = record.DecodeHeaderFrom(header_slice);
  444. if (!s.ok()) {
  445. return s;
  446. }
  447. }
  448. if (record.key_size != user_key.size()) {
  449. return Status::Corruption("Key size mismatch when reading blob");
  450. }
  451. if (record.value_size != value_size) {
  452. return Status::Corruption("Value size mismatch when reading blob");
  453. }
  454. record.key =
  455. Slice(record_slice.data() + BlobLogRecord::kHeaderSize, record.key_size);
  456. if (record.key != user_key) {
  457. return Status::Corruption("Key mismatch when reading blob");
  458. }
  459. record.value = Slice(record.key.data() + record.key_size, value_size);
  460. {
  461. TEST_SYNC_POINT_CALLBACK("BlobFileReader::VerifyBlob:CheckBlobCRC",
  462. &record);
  463. const Status s = record.CheckBlobCRC();
  464. if (!s.ok()) {
  465. return s;
  466. }
  467. }
  468. return Status::OK();
  469. }
  470. Status BlobFileReader::UncompressBlobIfNeeded(
  471. const Slice& value_slice, CompressionType compression_type,
  472. MemoryAllocator* allocator, SystemClock* clock, Statistics* statistics,
  473. std::unique_ptr<BlobContents>* result) {
  474. assert(result);
  475. if (compression_type == kNoCompression) {
  476. BlobContentsCreator::Create(result, nullptr, value_slice, kNoCompression,
  477. allocator);
  478. return Status::OK();
  479. }
  480. UncompressionContext context(compression_type);
  481. UncompressionInfo info(context, UncompressionDict::GetEmptyDict(),
  482. compression_type);
  483. size_t uncompressed_size = 0;
  484. constexpr uint32_t compression_format_version = 2;
  485. CacheAllocationPtr output;
  486. {
  487. PERF_TIMER_GUARD(blob_decompress_time);
  488. StopWatch stop_watch(clock, statistics, BLOB_DB_DECOMPRESSION_MICROS);
  489. output = OLD_UncompressData(info, value_slice.data(), value_slice.size(),
  490. &uncompressed_size, compression_format_version,
  491. allocator);
  492. }
  493. TEST_SYNC_POINT_CALLBACK(
  494. "BlobFileReader::UncompressBlobIfNeeded:TamperWithResult", &output);
  495. if (!output) {
  496. return Status::Corruption("Unable to uncompress blob");
  497. }
  498. result->reset(new BlobContents(std::move(output), uncompressed_size));
  499. return Status::OK();
  500. }
  501. } // namespace ROCKSDB_NAMESPACE