// blob_file_reader_test.cc
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include "db/blob/blob_file_reader.h"
  6. #include <cassert>
  7. #include <string>
  8. #include "db/blob/blob_contents.h"
  9. #include "db/blob/blob_log_format.h"
  10. #include "db/blob/blob_log_writer.h"
  11. #include "env/mock_env.h"
  12. #include "file/filename.h"
  13. #include "file/read_write_util.h"
  14. #include "file/writable_file_writer.h"
  15. #include "options/cf_options.h"
  16. #include "rocksdb/env.h"
  17. #include "rocksdb/file_system.h"
  18. #include "rocksdb/options.h"
  19. #include "test_util/sync_point.h"
  20. #include "test_util/testharness.h"
  21. #include "util/compression.h"
  22. #include "utilities/fault_injection_env.h"
  23. namespace ROCKSDB_NAMESPACE {
  24. namespace {
  25. // Creates a test blob file with `num` blobs in it.
  26. void WriteBlobFile(const ImmutableOptions& immutable_options,
  27. uint32_t column_family_id, bool has_ttl,
  28. const ExpirationRange& expiration_range_header,
  29. const ExpirationRange& expiration_range_footer,
  30. uint64_t blob_file_number, const std::vector<Slice>& keys,
  31. const std::vector<Slice>& blobs, CompressionType compression,
  32. std::vector<uint64_t>& blob_offsets,
  33. std::vector<uint64_t>& blob_sizes) {
  34. assert(!immutable_options.cf_paths.empty());
  35. size_t num = keys.size();
  36. assert(num == blobs.size());
  37. assert(num == blob_offsets.size());
  38. assert(num == blob_sizes.size());
  39. const std::string blob_file_path =
  40. BlobFileName(immutable_options.cf_paths.front().path, blob_file_number);
  41. std::unique_ptr<FSWritableFile> file;
  42. ASSERT_OK(NewWritableFile(immutable_options.fs.get(), blob_file_path, &file,
  43. FileOptions()));
  44. std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
  45. std::move(file), blob_file_path, FileOptions(), immutable_options.clock));
  46. constexpr Statistics* statistics = nullptr;
  47. constexpr bool use_fsync = false;
  48. constexpr bool do_flush = false;
  49. BlobLogWriter blob_log_writer(std::move(file_writer), immutable_options.clock,
  50. statistics, blob_file_number, use_fsync,
  51. do_flush);
  52. BlobLogHeader header(column_family_id, compression, has_ttl,
  53. expiration_range_header);
  54. ASSERT_OK(blob_log_writer.WriteHeader(WriteOptions(), header));
  55. std::vector<std::string> compressed_blobs(num);
  56. std::vector<Slice> blobs_to_write(num);
  57. if (kNoCompression == compression) {
  58. for (size_t i = 0; i < num; ++i) {
  59. blobs_to_write[i] = blobs[i];
  60. blob_sizes[i] = blobs[i].size();
  61. }
  62. } else {
  63. CompressionOptions opts;
  64. CompressionContext context(compression, opts);
  65. CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(),
  66. compression);
  67. constexpr uint32_t compression_format_version = 2;
  68. for (size_t i = 0; i < num; ++i) {
  69. ASSERT_TRUE(OLD_CompressData(blobs[i], info, compression_format_version,
  70. &compressed_blobs[i]));
  71. blobs_to_write[i] = compressed_blobs[i];
  72. blob_sizes[i] = compressed_blobs[i].size();
  73. }
  74. }
  75. for (size_t i = 0; i < num; ++i) {
  76. uint64_t key_offset = 0;
  77. ASSERT_OK(blob_log_writer.AddRecord(WriteOptions(), keys[i],
  78. blobs_to_write[i], &key_offset,
  79. &blob_offsets[i]));
  80. }
  81. BlobLogFooter footer;
  82. footer.blob_count = num;
  83. footer.expiration_range = expiration_range_footer;
  84. std::string checksum_method;
  85. std::string checksum_value;
  86. ASSERT_OK(blob_log_writer.AppendFooter(WriteOptions(), footer,
  87. &checksum_method, &checksum_value));
  88. }
  89. // Creates a test blob file with a single blob in it. Note: this method
  90. // makes it possible to test various corner cases by allowing the caller
  91. // to specify the contents of various blob file header/footer fields.
  92. void WriteBlobFile(const ImmutableOptions& immutable_options,
  93. uint32_t column_family_id, bool has_ttl,
  94. const ExpirationRange& expiration_range_header,
  95. const ExpirationRange& expiration_range_footer,
  96. uint64_t blob_file_number, const Slice& key,
  97. const Slice& blob, CompressionType compression,
  98. uint64_t* blob_offset, uint64_t* blob_size) {
  99. std::vector<Slice> keys{key};
  100. std::vector<Slice> blobs{blob};
  101. std::vector<uint64_t> blob_offsets{0};
  102. std::vector<uint64_t> blob_sizes{0};
  103. WriteBlobFile(immutable_options, column_family_id, has_ttl,
  104. expiration_range_header, expiration_range_footer,
  105. blob_file_number, keys, blobs, compression, blob_offsets,
  106. blob_sizes);
  107. if (blob_offset) {
  108. *blob_offset = blob_offsets[0];
  109. }
  110. if (blob_size) {
  111. *blob_size = blob_sizes[0];
  112. }
  113. }
  114. } // anonymous namespace
  115. class BlobFileReaderTest : public testing::Test {
  116. protected:
  117. BlobFileReaderTest() { mock_env_.reset(MockEnv::Create(Env::Default())); }
  118. std::unique_ptr<Env> mock_env_;
  119. };
  120. TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) {
  121. Options options;
  122. options.env = mock_env_.get();
  123. options.cf_paths.emplace_back(
  124. test::PerThreadDBPath(mock_env_.get(),
  125. "BlobFileReaderTest_CreateReaderAndGetBlob"),
  126. 0);
  127. options.enable_blob_files = true;
  128. ImmutableOptions immutable_options(options);
  129. constexpr uint32_t column_family_id = 1;
  130. constexpr bool has_ttl = false;
  131. constexpr ExpirationRange expiration_range;
  132. constexpr uint64_t blob_file_number = 1;
  133. constexpr size_t num_blobs = 3;
  134. const std::vector<std::string> key_strs = {"key1", "key2", "key3"};
  135. const std::vector<std::string> blob_strs = {"blob1", "blob2", "blob3"};
  136. const std::vector<Slice> keys = {key_strs[0], key_strs[1], key_strs[2]};
  137. const std::vector<Slice> blobs = {blob_strs[0], blob_strs[1], blob_strs[2]};
  138. std::vector<uint64_t> blob_offsets(keys.size());
  139. std::vector<uint64_t> blob_sizes(keys.size());
  140. WriteBlobFile(immutable_options, column_family_id, has_ttl, expiration_range,
  141. expiration_range, blob_file_number, keys, blobs, kNoCompression,
  142. blob_offsets, blob_sizes);
  143. constexpr HistogramImpl* blob_file_read_hist = nullptr;
  144. std::unique_ptr<BlobFileReader> reader;
  145. ReadOptions read_options;
  146. ASSERT_OK(BlobFileReader::Create(
  147. immutable_options, read_options, FileOptions(), column_family_id,
  148. blob_file_read_hist, blob_file_number, nullptr /*IOTracer*/, &reader));
  149. // Make sure the blob can be retrieved with and without checksum verification
  150. read_options.verify_checksums = false;
  151. constexpr FilePrefetchBuffer* prefetch_buffer = nullptr;
  152. constexpr MemoryAllocator* allocator = nullptr;
  153. {
  154. std::unique_ptr<BlobContents> value;
  155. uint64_t bytes_read = 0;
  156. ASSERT_OK(reader->GetBlob(read_options, keys[0], blob_offsets[0],
  157. blob_sizes[0], kNoCompression, prefetch_buffer,
  158. allocator, &value, &bytes_read));
  159. ASSERT_NE(value, nullptr);
  160. ASSERT_EQ(value->data(), blobs[0]);
  161. ASSERT_EQ(bytes_read, blob_sizes[0]);
  162. // MultiGetBlob
  163. bytes_read = 0;
  164. size_t total_size = 0;
  165. std::array<Status, num_blobs> statuses_buf;
  166. std::array<BlobReadRequest, num_blobs> requests_buf;
  167. autovector<std::pair<BlobReadRequest*, std::unique_ptr<BlobContents>>>
  168. blob_reqs;
  169. for (size_t i = 0; i < num_blobs; ++i) {
  170. requests_buf[i] =
  171. BlobReadRequest(keys[i], blob_offsets[i], blob_sizes[i],
  172. kNoCompression, nullptr, &statuses_buf[i]);
  173. blob_reqs.emplace_back(&requests_buf[i], std::unique_ptr<BlobContents>());
  174. }
  175. reader->MultiGetBlob(read_options, allocator, blob_reqs, &bytes_read);
  176. for (size_t i = 0; i < num_blobs; ++i) {
  177. const auto& result = blob_reqs[i].second;
  178. ASSERT_OK(statuses_buf[i]);
  179. ASSERT_NE(result, nullptr);
  180. ASSERT_EQ(result->data(), blobs[i]);
  181. total_size += blob_sizes[i];
  182. }
  183. ASSERT_EQ(bytes_read, total_size);
  184. }
  185. read_options.verify_checksums = true;
  186. {
  187. std::unique_ptr<BlobContents> value;
  188. uint64_t bytes_read = 0;
  189. ASSERT_OK(reader->GetBlob(read_options, keys[1], blob_offsets[1],
  190. blob_sizes[1], kNoCompression, prefetch_buffer,
  191. allocator, &value, &bytes_read));
  192. ASSERT_NE(value, nullptr);
  193. ASSERT_EQ(value->data(), blobs[1]);
  194. const uint64_t key_size = keys[1].size();
  195. ASSERT_EQ(bytes_read,
  196. BlobLogRecord::CalculateAdjustmentForRecordHeader(key_size) +
  197. blob_sizes[1]);
  198. }
  199. // Invalid offset (too close to start of file)
  200. {
  201. std::unique_ptr<BlobContents> value;
  202. uint64_t bytes_read = 0;
  203. ASSERT_TRUE(reader
  204. ->GetBlob(read_options, keys[0], blob_offsets[0] - 1,
  205. blob_sizes[0], kNoCompression, prefetch_buffer,
  206. allocator, &value, &bytes_read)
  207. .IsCorruption());
  208. ASSERT_EQ(value, nullptr);
  209. ASSERT_EQ(bytes_read, 0);
  210. }
  211. // Invalid offset (too close to end of file)
  212. {
  213. std::unique_ptr<BlobContents> value;
  214. uint64_t bytes_read = 0;
  215. ASSERT_TRUE(reader
  216. ->GetBlob(read_options, keys[2], blob_offsets[2] + 1,
  217. blob_sizes[2], kNoCompression, prefetch_buffer,
  218. allocator, &value, &bytes_read)
  219. .IsCorruption());
  220. ASSERT_EQ(value, nullptr);
  221. ASSERT_EQ(bytes_read, 0);
  222. }
  223. // Incorrect compression type
  224. {
  225. std::unique_ptr<BlobContents> value;
  226. uint64_t bytes_read = 0;
  227. ASSERT_TRUE(reader
  228. ->GetBlob(read_options, keys[0], blob_offsets[0],
  229. blob_sizes[0], kZSTD, prefetch_buffer, allocator,
  230. &value, &bytes_read)
  231. .IsCorruption());
  232. ASSERT_EQ(value, nullptr);
  233. ASSERT_EQ(bytes_read, 0);
  234. }
  235. // Incorrect key size
  236. {
  237. constexpr char shorter_key[] = "k";
  238. std::unique_ptr<BlobContents> value;
  239. uint64_t bytes_read = 0;
  240. ASSERT_TRUE(reader
  241. ->GetBlob(read_options, shorter_key,
  242. blob_offsets[0] -
  243. (keys[0].size() - sizeof(shorter_key) + 1),
  244. blob_sizes[0], kNoCompression, prefetch_buffer,
  245. allocator, &value, &bytes_read)
  246. .IsCorruption());
  247. ASSERT_EQ(value, nullptr);
  248. ASSERT_EQ(bytes_read, 0);
  249. // MultiGetBlob
  250. autovector<std::reference_wrapper<const Slice>> key_refs;
  251. for (const auto& key_ref : keys) {
  252. key_refs.emplace_back(std::cref(key_ref));
  253. }
  254. Slice shorter_key_slice(shorter_key, sizeof(shorter_key) - 1);
  255. key_refs[1] = std::cref(shorter_key_slice);
  256. autovector<uint64_t> offsets{
  257. blob_offsets[0],
  258. blob_offsets[1] - (keys[1].size() - key_refs[1].get().size()),
  259. blob_offsets[2]};
  260. std::array<Status, num_blobs> statuses_buf;
  261. std::array<BlobReadRequest, num_blobs> requests_buf;
  262. autovector<std::pair<BlobReadRequest*, std::unique_ptr<BlobContents>>>
  263. blob_reqs;
  264. for (size_t i = 0; i < num_blobs; ++i) {
  265. requests_buf[i] =
  266. BlobReadRequest(key_refs[i], offsets[i], blob_sizes[i],
  267. kNoCompression, nullptr, &statuses_buf[i]);
  268. blob_reqs.emplace_back(&requests_buf[i], std::unique_ptr<BlobContents>());
  269. }
  270. reader->MultiGetBlob(read_options, allocator, blob_reqs, &bytes_read);
  271. for (size_t i = 0; i < num_blobs; ++i) {
  272. if (i == 1) {
  273. ASSERT_TRUE(statuses_buf[i].IsCorruption());
  274. } else {
  275. ASSERT_OK(statuses_buf[i]);
  276. }
  277. }
  278. }
  279. // Incorrect key
  280. {
  281. constexpr char incorrect_key[] = "foo1";
  282. std::unique_ptr<BlobContents> value;
  283. uint64_t bytes_read = 0;
  284. ASSERT_TRUE(reader
  285. ->GetBlob(read_options, incorrect_key, blob_offsets[0],
  286. blob_sizes[0], kNoCompression, prefetch_buffer,
  287. allocator, &value, &bytes_read)
  288. .IsCorruption());
  289. ASSERT_EQ(value, nullptr);
  290. ASSERT_EQ(bytes_read, 0);
  291. // MultiGetBlob
  292. autovector<std::reference_wrapper<const Slice>> key_refs;
  293. for (const auto& key_ref : keys) {
  294. key_refs.emplace_back(std::cref(key_ref));
  295. }
  296. Slice wrong_key_slice(incorrect_key, sizeof(incorrect_key) - 1);
  297. key_refs[2] = std::cref(wrong_key_slice);
  298. std::array<Status, num_blobs> statuses_buf;
  299. std::array<BlobReadRequest, num_blobs> requests_buf;
  300. autovector<std::pair<BlobReadRequest*, std::unique_ptr<BlobContents>>>
  301. blob_reqs;
  302. for (size_t i = 0; i < num_blobs; ++i) {
  303. requests_buf[i] =
  304. BlobReadRequest(key_refs[i], blob_offsets[i], blob_sizes[i],
  305. kNoCompression, nullptr, &statuses_buf[i]);
  306. blob_reqs.emplace_back(&requests_buf[i], std::unique_ptr<BlobContents>());
  307. }
  308. reader->MultiGetBlob(read_options, allocator, blob_reqs, &bytes_read);
  309. for (size_t i = 0; i < num_blobs; ++i) {
  310. if (i == num_blobs - 1) {
  311. ASSERT_TRUE(statuses_buf[i].IsCorruption());
  312. } else {
  313. ASSERT_OK(statuses_buf[i]);
  314. }
  315. }
  316. }
  317. // Incorrect value size
  318. {
  319. std::unique_ptr<BlobContents> value;
  320. uint64_t bytes_read = 0;
  321. ASSERT_TRUE(reader
  322. ->GetBlob(read_options, keys[1], blob_offsets[1],
  323. blob_sizes[1] + 1, kNoCompression,
  324. prefetch_buffer, allocator, &value, &bytes_read)
  325. .IsCorruption());
  326. ASSERT_EQ(value, nullptr);
  327. ASSERT_EQ(bytes_read, 0);
  328. // MultiGetBlob
  329. autovector<std::reference_wrapper<const Slice>> key_refs;
  330. for (const auto& key_ref : keys) {
  331. key_refs.emplace_back(std::cref(key_ref));
  332. }
  333. std::array<Status, num_blobs> statuses_buf;
  334. std::array<BlobReadRequest, num_blobs> requests_buf;
  335. requests_buf[0] =
  336. BlobReadRequest(key_refs[0], blob_offsets[0], blob_sizes[0],
  337. kNoCompression, nullptr, statuses_buf.data());
  338. requests_buf[1] =
  339. BlobReadRequest(key_refs[1], blob_offsets[1], blob_sizes[1] + 1,
  340. kNoCompression, nullptr, &statuses_buf[1]);
  341. requests_buf[2] =
  342. BlobReadRequest(key_refs[2], blob_offsets[2], blob_sizes[2],
  343. kNoCompression, nullptr, &statuses_buf[2]);
  344. autovector<std::pair<BlobReadRequest*, std::unique_ptr<BlobContents>>>
  345. blob_reqs;
  346. for (size_t i = 0; i < num_blobs; ++i) {
  347. blob_reqs.emplace_back(&requests_buf[i], std::unique_ptr<BlobContents>());
  348. }
  349. reader->MultiGetBlob(read_options, allocator, blob_reqs, &bytes_read);
  350. for (size_t i = 0; i < num_blobs; ++i) {
  351. if (i != 1) {
  352. ASSERT_OK(statuses_buf[i]);
  353. } else {
  354. ASSERT_TRUE(statuses_buf[i].IsCorruption());
  355. }
  356. }
  357. }
  358. }
  359. TEST_F(BlobFileReaderTest, Malformed) {
  360. // Write a blob file consisting of nothing but a header, and make sure we
  361. // detect the error when we open it for reading
  362. Options options;
  363. options.env = mock_env_.get();
  364. options.cf_paths.emplace_back(
  365. test::PerThreadDBPath(mock_env_.get(), "BlobFileReaderTest_Malformed"),
  366. 0);
  367. options.enable_blob_files = true;
  368. ImmutableOptions immutable_options(options);
  369. constexpr uint32_t column_family_id = 1;
  370. constexpr uint64_t blob_file_number = 1;
  371. {
  372. constexpr bool has_ttl = false;
  373. constexpr ExpirationRange expiration_range;
  374. const std::string blob_file_path =
  375. BlobFileName(immutable_options.cf_paths.front().path, blob_file_number);
  376. std::unique_ptr<FSWritableFile> file;
  377. ASSERT_OK(NewWritableFile(immutable_options.fs.get(), blob_file_path, &file,
  378. FileOptions()));
  379. std::unique_ptr<WritableFileWriter> file_writer(
  380. new WritableFileWriter(std::move(file), blob_file_path, FileOptions(),
  381. immutable_options.clock));
  382. constexpr Statistics* statistics = nullptr;
  383. constexpr bool use_fsync = false;
  384. constexpr bool do_flush = false;
  385. BlobLogWriter blob_log_writer(std::move(file_writer),
  386. immutable_options.clock, statistics,
  387. blob_file_number, use_fsync, do_flush);
  388. BlobLogHeader header(column_family_id, kNoCompression, has_ttl,
  389. expiration_range);
  390. ASSERT_OK(blob_log_writer.WriteHeader(WriteOptions(), header));
  391. }
  392. constexpr HistogramImpl* blob_file_read_hist = nullptr;
  393. std::unique_ptr<BlobFileReader> reader;
  394. const ReadOptions read_options;
  395. ASSERT_TRUE(BlobFileReader::Create(immutable_options, read_options,
  396. FileOptions(), column_family_id,
  397. blob_file_read_hist, blob_file_number,
  398. nullptr /*IOTracer*/, &reader)
  399. .IsCorruption());
  400. }
  401. TEST_F(BlobFileReaderTest, TTL) {
  402. Options options;
  403. options.env = mock_env_.get();
  404. options.cf_paths.emplace_back(
  405. test::PerThreadDBPath(mock_env_.get(), "BlobFileReaderTest_TTL"), 0);
  406. options.enable_blob_files = true;
  407. ImmutableOptions immutable_options(options);
  408. constexpr uint32_t column_family_id = 1;
  409. constexpr bool has_ttl = true;
  410. constexpr ExpirationRange expiration_range;
  411. constexpr uint64_t blob_file_number = 1;
  412. constexpr char key[] = "key";
  413. constexpr char blob[] = "blob";
  414. uint64_t blob_offset = 0;
  415. uint64_t blob_size = 0;
  416. WriteBlobFile(immutable_options, column_family_id, has_ttl, expiration_range,
  417. expiration_range, blob_file_number, key, blob, kNoCompression,
  418. &blob_offset, &blob_size);
  419. constexpr HistogramImpl* blob_file_read_hist = nullptr;
  420. std::unique_ptr<BlobFileReader> reader;
  421. const ReadOptions read_options;
  422. ASSERT_TRUE(BlobFileReader::Create(immutable_options, read_options,
  423. FileOptions(), column_family_id,
  424. blob_file_read_hist, blob_file_number,
  425. nullptr /*IOTracer*/, &reader)
  426. .IsCorruption());
  427. }
  428. TEST_F(BlobFileReaderTest, ExpirationRangeInHeader) {
  429. Options options;
  430. options.env = mock_env_.get();
  431. options.cf_paths.emplace_back(
  432. test::PerThreadDBPath(mock_env_.get(),
  433. "BlobFileReaderTest_ExpirationRangeInHeader"),
  434. 0);
  435. options.enable_blob_files = true;
  436. ImmutableOptions immutable_options(options);
  437. constexpr uint32_t column_family_id = 1;
  438. constexpr bool has_ttl = false;
  439. const ExpirationRange expiration_range_header(
  440. 1, 2); // can be made constexpr when we adopt C++14
  441. constexpr ExpirationRange expiration_range_footer;
  442. constexpr uint64_t blob_file_number = 1;
  443. constexpr char key[] = "key";
  444. constexpr char blob[] = "blob";
  445. uint64_t blob_offset = 0;
  446. uint64_t blob_size = 0;
  447. WriteBlobFile(immutable_options, column_family_id, has_ttl,
  448. expiration_range_header, expiration_range_footer,
  449. blob_file_number, key, blob, kNoCompression, &blob_offset,
  450. &blob_size);
  451. constexpr HistogramImpl* blob_file_read_hist = nullptr;
  452. std::unique_ptr<BlobFileReader> reader;
  453. const ReadOptions read_options;
  454. ASSERT_TRUE(BlobFileReader::Create(immutable_options, read_options,
  455. FileOptions(), column_family_id,
  456. blob_file_read_hist, blob_file_number,
  457. nullptr /*IOTracer*/, &reader)
  458. .IsCorruption());
  459. }
  460. TEST_F(BlobFileReaderTest, ExpirationRangeInFooter) {
  461. Options options;
  462. options.env = mock_env_.get();
  463. options.cf_paths.emplace_back(
  464. test::PerThreadDBPath(mock_env_.get(),
  465. "BlobFileReaderTest_ExpirationRangeInFooter"),
  466. 0);
  467. options.enable_blob_files = true;
  468. ImmutableOptions immutable_options(options);
  469. constexpr uint32_t column_family_id = 1;
  470. constexpr bool has_ttl = false;
  471. constexpr ExpirationRange expiration_range_header;
  472. const ExpirationRange expiration_range_footer(
  473. 1, 2); // can be made constexpr when we adopt C++14
  474. constexpr uint64_t blob_file_number = 1;
  475. constexpr char key[] = "key";
  476. constexpr char blob[] = "blob";
  477. uint64_t blob_offset = 0;
  478. uint64_t blob_size = 0;
  479. WriteBlobFile(immutable_options, column_family_id, has_ttl,
  480. expiration_range_header, expiration_range_footer,
  481. blob_file_number, key, blob, kNoCompression, &blob_offset,
  482. &blob_size);
  483. constexpr HistogramImpl* blob_file_read_hist = nullptr;
  484. std::unique_ptr<BlobFileReader> reader;
  485. const ReadOptions read_options;
  486. ASSERT_TRUE(BlobFileReader::Create(immutable_options, read_options,
  487. FileOptions(), column_family_id,
  488. blob_file_read_hist, blob_file_number,
  489. nullptr /*IOTracer*/, &reader)
  490. .IsCorruption());
  491. }
  492. TEST_F(BlobFileReaderTest, IncorrectColumnFamily) {
  493. Options options;
  494. options.env = mock_env_.get();
  495. options.cf_paths.emplace_back(
  496. test::PerThreadDBPath(mock_env_.get(),
  497. "BlobFileReaderTest_IncorrectColumnFamily"),
  498. 0);
  499. options.enable_blob_files = true;
  500. ImmutableOptions immutable_options(options);
  501. constexpr uint32_t column_family_id = 1;
  502. constexpr bool has_ttl = false;
  503. constexpr ExpirationRange expiration_range;
  504. constexpr uint64_t blob_file_number = 1;
  505. constexpr char key[] = "key";
  506. constexpr char blob[] = "blob";
  507. uint64_t blob_offset = 0;
  508. uint64_t blob_size = 0;
  509. WriteBlobFile(immutable_options, column_family_id, has_ttl, expiration_range,
  510. expiration_range, blob_file_number, key, blob, kNoCompression,
  511. &blob_offset, &blob_size);
  512. constexpr HistogramImpl* blob_file_read_hist = nullptr;
  513. std::unique_ptr<BlobFileReader> reader;
  514. constexpr uint32_t incorrect_column_family_id = 2;
  515. const ReadOptions read_options;
  516. ASSERT_TRUE(BlobFileReader::Create(immutable_options, read_options,
  517. FileOptions(), incorrect_column_family_id,
  518. blob_file_read_hist, blob_file_number,
  519. nullptr /*IOTracer*/, &reader)
  520. .IsCorruption());
  521. }
  522. TEST_F(BlobFileReaderTest, BlobCRCError) {
  523. Options options;
  524. options.env = mock_env_.get();
  525. options.cf_paths.emplace_back(
  526. test::PerThreadDBPath(mock_env_.get(), "BlobFileReaderTest_BlobCRCError"),
  527. 0);
  528. options.enable_blob_files = true;
  529. ImmutableOptions immutable_options(options);
  530. constexpr uint32_t column_family_id = 1;
  531. constexpr bool has_ttl = false;
  532. constexpr ExpirationRange expiration_range;
  533. constexpr uint64_t blob_file_number = 1;
  534. constexpr char key[] = "key";
  535. constexpr char blob[] = "blob";
  536. uint64_t blob_offset = 0;
  537. uint64_t blob_size = 0;
  538. WriteBlobFile(immutable_options, column_family_id, has_ttl, expiration_range,
  539. expiration_range, blob_file_number, key, blob, kNoCompression,
  540. &blob_offset, &blob_size);
  541. constexpr HistogramImpl* blob_file_read_hist = nullptr;
  542. std::unique_ptr<BlobFileReader> reader;
  543. const ReadOptions read_options;
  544. ASSERT_OK(BlobFileReader::Create(
  545. immutable_options, read_options, FileOptions(), column_family_id,
  546. blob_file_read_hist, blob_file_number, nullptr /*IOTracer*/, &reader));
  547. SyncPoint::GetInstance()->SetCallBack(
  548. "BlobFileReader::VerifyBlob:CheckBlobCRC", [](void* arg) {
  549. BlobLogRecord* const record = static_cast<BlobLogRecord*>(arg);
  550. assert(record);
  551. record->blob_crc = 0xfaceb00c;
  552. });
  553. SyncPoint::GetInstance()->EnableProcessing();
  554. constexpr FilePrefetchBuffer* prefetch_buffer = nullptr;
  555. constexpr MemoryAllocator* allocator = nullptr;
  556. std::unique_ptr<BlobContents> value;
  557. uint64_t bytes_read = 0;
  558. ASSERT_TRUE(reader
  559. ->GetBlob(ReadOptions(), key, blob_offset, blob_size,
  560. kNoCompression, prefetch_buffer, allocator, &value,
  561. &bytes_read)
  562. .IsCorruption());
  563. ASSERT_EQ(value, nullptr);
  564. ASSERT_EQ(bytes_read, 0);
  565. SyncPoint::GetInstance()->DisableProcessing();
  566. SyncPoint::GetInstance()->ClearAllCallBacks();
  567. }
  568. TEST_F(BlobFileReaderTest, Compression) {
  569. if (!Snappy_Supported()) {
  570. return;
  571. }
  572. Options options;
  573. options.env = mock_env_.get();
  574. options.cf_paths.emplace_back(
  575. test::PerThreadDBPath(mock_env_.get(), "BlobFileReaderTest_Compression"),
  576. 0);
  577. options.enable_blob_files = true;
  578. ImmutableOptions immutable_options(options);
  579. constexpr uint32_t column_family_id = 1;
  580. constexpr bool has_ttl = false;
  581. constexpr ExpirationRange expiration_range;
  582. constexpr uint64_t blob_file_number = 1;
  583. constexpr char key[] = "key";
  584. constexpr char blob[] = "blob";
  585. uint64_t blob_offset = 0;
  586. uint64_t blob_size = 0;
  587. WriteBlobFile(immutable_options, column_family_id, has_ttl, expiration_range,
  588. expiration_range, blob_file_number, key, blob,
  589. kSnappyCompression, &blob_offset, &blob_size);
  590. constexpr HistogramImpl* blob_file_read_hist = nullptr;
  591. std::unique_ptr<BlobFileReader> reader;
  592. ReadOptions read_options;
  593. ASSERT_OK(BlobFileReader::Create(
  594. immutable_options, read_options, FileOptions(), column_family_id,
  595. blob_file_read_hist, blob_file_number, nullptr /*IOTracer*/, &reader));
  596. // Make sure the blob can be retrieved with and without checksum verification
  597. read_options.verify_checksums = false;
  598. constexpr FilePrefetchBuffer* prefetch_buffer = nullptr;
  599. constexpr MemoryAllocator* allocator = nullptr;
  600. {
  601. std::unique_ptr<BlobContents> value;
  602. uint64_t bytes_read = 0;
  603. ASSERT_OK(reader->GetBlob(read_options, key, blob_offset, blob_size,
  604. kSnappyCompression, prefetch_buffer, allocator,
  605. &value, &bytes_read));
  606. ASSERT_NE(value, nullptr);
  607. ASSERT_EQ(value->data(), blob);
  608. ASSERT_EQ(bytes_read, blob_size);
  609. }
  610. read_options.verify_checksums = true;
  611. {
  612. std::unique_ptr<BlobContents> value;
  613. uint64_t bytes_read = 0;
  614. ASSERT_OK(reader->GetBlob(read_options, key, blob_offset, blob_size,
  615. kSnappyCompression, prefetch_buffer, allocator,
  616. &value, &bytes_read));
  617. ASSERT_NE(value, nullptr);
  618. ASSERT_EQ(value->data(), blob);
  619. constexpr uint64_t key_size = sizeof(key) - 1;
  620. ASSERT_EQ(bytes_read,
  621. BlobLogRecord::CalculateAdjustmentForRecordHeader(key_size) +
  622. blob_size);
  623. }
  624. }
  625. TEST_F(BlobFileReaderTest, UncompressionError) {
  626. if (!Snappy_Supported()) {
  627. return;
  628. }
  629. Options options;
  630. options.env = mock_env_.get();
  631. options.cf_paths.emplace_back(
  632. test::PerThreadDBPath(mock_env_.get(),
  633. "BlobFileReaderTest_UncompressionError"),
  634. 0);
  635. options.enable_blob_files = true;
  636. ImmutableOptions immutable_options(options);
  637. constexpr uint32_t column_family_id = 1;
  638. constexpr bool has_ttl = false;
  639. constexpr ExpirationRange expiration_range;
  640. constexpr uint64_t blob_file_number = 1;
  641. constexpr char key[] = "key";
  642. constexpr char blob[] = "blob";
  643. uint64_t blob_offset = 0;
  644. uint64_t blob_size = 0;
  645. WriteBlobFile(immutable_options, column_family_id, has_ttl, expiration_range,
  646. expiration_range, blob_file_number, key, blob,
  647. kSnappyCompression, &blob_offset, &blob_size);
  648. constexpr HistogramImpl* blob_file_read_hist = nullptr;
  649. std::unique_ptr<BlobFileReader> reader;
  650. const ReadOptions read_options;
  651. ASSERT_OK(BlobFileReader::Create(
  652. immutable_options, read_options, FileOptions(), column_family_id,
  653. blob_file_read_hist, blob_file_number, nullptr /*IOTracer*/, &reader));
  654. SyncPoint::GetInstance()->SetCallBack(
  655. "BlobFileReader::UncompressBlobIfNeeded:TamperWithResult", [](void* arg) {
  656. CacheAllocationPtr* const output =
  657. static_cast<CacheAllocationPtr*>(arg);
  658. assert(output);
  659. output->reset();
  660. });
  661. SyncPoint::GetInstance()->EnableProcessing();
  662. constexpr FilePrefetchBuffer* prefetch_buffer = nullptr;
  663. constexpr MemoryAllocator* allocator = nullptr;
  664. std::unique_ptr<BlobContents> value;
  665. uint64_t bytes_read = 0;
  666. ASSERT_TRUE(reader
  667. ->GetBlob(ReadOptions(), key, blob_offset, blob_size,
  668. kSnappyCompression, prefetch_buffer, allocator,
  669. &value, &bytes_read)
  670. .IsCorruption());
  671. ASSERT_EQ(value, nullptr);
  672. ASSERT_EQ(bytes_read, 0);
  673. SyncPoint::GetInstance()->DisableProcessing();
  674. SyncPoint::GetInstance()->ClearAllCallBacks();
  675. }
  676. class BlobFileReaderIOErrorTest
  677. : public testing::Test,
  678. public testing::WithParamInterface<std::string> {
  679. protected:
  680. BlobFileReaderIOErrorTest() : sync_point_(GetParam()) {
  681. mock_env_.reset(MockEnv::Create(Env::Default()));
  682. fault_injection_env_.reset(new FaultInjectionTestEnv(mock_env_.get()));
  683. }
  684. std::unique_ptr<Env> mock_env_;
  685. std::unique_ptr<FaultInjectionTestEnv> fault_injection_env_;
  686. std::string sync_point_;
  687. };
  688. INSTANTIATE_TEST_CASE_P(BlobFileReaderTest, BlobFileReaderIOErrorTest,
  689. ::testing::ValuesIn(std::vector<std::string>{
  690. "BlobFileReader::OpenFile:GetFileSize",
  691. "BlobFileReader::OpenFile:NewRandomAccessFile",
  692. "BlobFileReader::ReadHeader:ReadFromFile",
  693. "BlobFileReader::ReadFooter:ReadFromFile",
  694. "BlobFileReader::GetBlob:ReadFromFile"}));
  695. TEST_P(BlobFileReaderIOErrorTest, IOError) {
  696. // Simulates an I/O error during the specified step
  697. Options options;
  698. options.env = fault_injection_env_.get();
  699. options.cf_paths.emplace_back(
  700. test::PerThreadDBPath(fault_injection_env_.get(),
  701. "BlobFileReaderIOErrorTest_IOError"),
  702. 0);
  703. options.enable_blob_files = true;
  704. ImmutableOptions immutable_options(options);
  705. constexpr uint32_t column_family_id = 1;
  706. constexpr bool has_ttl = false;
  707. constexpr ExpirationRange expiration_range;
  708. constexpr uint64_t blob_file_number = 1;
  709. constexpr char key[] = "key";
  710. constexpr char blob[] = "blob";
  711. uint64_t blob_offset = 0;
  712. uint64_t blob_size = 0;
  713. WriteBlobFile(immutable_options, column_family_id, has_ttl, expiration_range,
  714. expiration_range, blob_file_number, key, blob, kNoCompression,
  715. &blob_offset, &blob_size);
  716. SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* /* arg */) {
  717. fault_injection_env_->SetFilesystemActive(false,
  718. Status::IOError(sync_point_));
  719. });
  720. SyncPoint::GetInstance()->EnableProcessing();
  721. constexpr HistogramImpl* blob_file_read_hist = nullptr;
  722. std::unique_ptr<BlobFileReader> reader;
  723. const ReadOptions read_options;
  724. const Status s = BlobFileReader::Create(
  725. immutable_options, read_options, FileOptions(), column_family_id,
  726. blob_file_read_hist, blob_file_number, nullptr /*IOTracer*/, &reader);
  727. const bool fail_during_create =
  728. (sync_point_ != "BlobFileReader::GetBlob:ReadFromFile");
  729. if (fail_during_create) {
  730. ASSERT_TRUE(s.IsIOError());
  731. } else {
  732. ASSERT_OK(s);
  733. constexpr FilePrefetchBuffer* prefetch_buffer = nullptr;
  734. constexpr MemoryAllocator* allocator = nullptr;
  735. std::unique_ptr<BlobContents> value;
  736. uint64_t bytes_read = 0;
  737. ASSERT_TRUE(reader
  738. ->GetBlob(ReadOptions(), key, blob_offset, blob_size,
  739. kNoCompression, prefetch_buffer, allocator,
  740. &value, &bytes_read)
  741. .IsIOError());
  742. ASSERT_EQ(value, nullptr);
  743. ASSERT_EQ(bytes_read, 0);
  744. }
  745. SyncPoint::GetInstance()->DisableProcessing();
  746. SyncPoint::GetInstance()->ClearAllCallBacks();
  747. }
  748. class BlobFileReaderDecodingErrorTest
  749. : public testing::Test,
  750. public testing::WithParamInterface<std::string> {
  751. protected:
  752. BlobFileReaderDecodingErrorTest() : sync_point_(GetParam()) {
  753. mock_env_.reset(MockEnv::Create(Env::Default()));
  754. }
  755. std::unique_ptr<Env> mock_env_;
  756. std::string sync_point_;
  757. };
  758. INSTANTIATE_TEST_CASE_P(BlobFileReaderTest, BlobFileReaderDecodingErrorTest,
  759. ::testing::ValuesIn(std::vector<std::string>{
  760. "BlobFileReader::ReadHeader:TamperWithResult",
  761. "BlobFileReader::ReadFooter:TamperWithResult",
  762. "BlobFileReader::GetBlob:TamperWithResult"}));
  763. TEST_P(BlobFileReaderDecodingErrorTest, DecodingError) {
  764. Options options;
  765. options.env = mock_env_.get();
  766. options.cf_paths.emplace_back(
  767. test::PerThreadDBPath(mock_env_.get(),
  768. "BlobFileReaderDecodingErrorTest_DecodingError"),
  769. 0);
  770. options.enable_blob_files = true;
  771. ImmutableOptions immutable_options(options);
  772. constexpr uint32_t column_family_id = 1;
  773. constexpr bool has_ttl = false;
  774. constexpr ExpirationRange expiration_range;
  775. constexpr uint64_t blob_file_number = 1;
  776. constexpr char key[] = "key";
  777. constexpr char blob[] = "blob";
  778. uint64_t blob_offset = 0;
  779. uint64_t blob_size = 0;
  780. WriteBlobFile(immutable_options, column_family_id, has_ttl, expiration_range,
  781. expiration_range, blob_file_number, key, blob, kNoCompression,
  782. &blob_offset, &blob_size);
  783. SyncPoint::GetInstance()->SetCallBack(sync_point_, [](void* arg) {
  784. Slice* const slice = static_cast<Slice*>(arg);
  785. assert(slice);
  786. assert(!slice->empty());
  787. slice->remove_prefix(1);
  788. });
  789. SyncPoint::GetInstance()->EnableProcessing();
  790. constexpr HistogramImpl* blob_file_read_hist = nullptr;
  791. std::unique_ptr<BlobFileReader> reader;
  792. const ReadOptions read_options;
  793. const Status s = BlobFileReader::Create(
  794. immutable_options, read_options, FileOptions(), column_family_id,
  795. blob_file_read_hist, blob_file_number, nullptr /*IOTracer*/, &reader);
  796. const bool fail_during_create =
  797. sync_point_ != "BlobFileReader::GetBlob:TamperWithResult";
  798. if (fail_during_create) {
  799. ASSERT_TRUE(s.IsCorruption());
  800. } else {
  801. ASSERT_OK(s);
  802. constexpr FilePrefetchBuffer* prefetch_buffer = nullptr;
  803. constexpr MemoryAllocator* allocator = nullptr;
  804. std::unique_ptr<BlobContents> value;
  805. uint64_t bytes_read = 0;
  806. ASSERT_TRUE(reader
  807. ->GetBlob(ReadOptions(), key, blob_offset, blob_size,
  808. kNoCompression, prefetch_buffer, allocator,
  809. &value, &bytes_read)
  810. .IsCorruption());
  811. ASSERT_EQ(value, nullptr);
  812. ASSERT_EQ(bytes_read, 0);
  813. }
  814. SyncPoint::GetInstance()->DisableProcessing();
  815. SyncPoint::GetInstance()->ClearAllCallBacks();
  816. }
  817. } // namespace ROCKSDB_NAMESPACE
  818. int main(int argc, char** argv) {
  819. ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  820. ::testing::InitGoogleTest(&argc, argv);
  821. return RUN_ALL_TESTS();
  822. }