file_reader_writer_test.cc 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. #include <algorithm>
  7. #include <vector>
  8. #include "env/composite_env_wrapper.h"
  9. #include "file/random_access_file_reader.h"
  10. #include "file/readahead_raf.h"
  11. #include "file/sequence_file_reader.h"
  12. #include "file/writable_file_writer.h"
  13. #include "test_util/testharness.h"
  14. #include "test_util/testutil.h"
  15. #include "util/random.h"
  16. namespace ROCKSDB_NAMESPACE {
  17. class WritableFileWriterTest : public testing::Test {};
  // Number of bytes in one mebibyte (2^20); used for sync thresholds and
  // buffer sizes throughout the tests in this file.
  const uint32_t kMb = 1024u * 1024u;
  19. TEST_F(WritableFileWriterTest, RangeSync) {
  20. class FakeWF : public WritableFile {
  21. public:
  22. explicit FakeWF() : size_(0), last_synced_(0) {}
  23. ~FakeWF() override {}
  24. Status Append(const Slice& data) override {
  25. size_ += data.size();
  26. return Status::OK();
  27. }
  28. Status Truncate(uint64_t /*size*/) override { return Status::OK(); }
  29. Status Close() override {
  30. EXPECT_GE(size_, last_synced_ + kMb);
  31. EXPECT_LT(size_, last_synced_ + 2 * kMb);
  32. // Make sure random writes generated enough writes.
  33. EXPECT_GT(size_, 10 * kMb);
  34. return Status::OK();
  35. }
  36. Status Flush() override { return Status::OK(); }
  37. Status Sync() override { return Status::OK(); }
  38. Status Fsync() override { return Status::OK(); }
  39. void SetIOPriority(Env::IOPriority /*pri*/) override {}
  40. uint64_t GetFileSize() override { return size_; }
  41. void GetPreallocationStatus(size_t* /*block_size*/,
  42. size_t* /*last_allocated_block*/) override {}
  43. size_t GetUniqueId(char* /*id*/, size_t /*max_size*/) const override {
  44. return 0;
  45. }
  46. Status InvalidateCache(size_t /*offset*/, size_t /*length*/) override {
  47. return Status::OK();
  48. }
  49. protected:
  50. Status Allocate(uint64_t /*offset*/, uint64_t /*len*/) override {
  51. return Status::OK();
  52. }
  53. Status RangeSync(uint64_t offset, uint64_t nbytes) override {
  54. EXPECT_EQ(offset % 4096, 0u);
  55. EXPECT_EQ(nbytes % 4096, 0u);
  56. EXPECT_EQ(offset, last_synced_);
  57. last_synced_ = offset + nbytes;
  58. EXPECT_GE(size_, last_synced_ + kMb);
  59. if (size_ > 2 * kMb) {
  60. EXPECT_LT(size_, last_synced_ + 2 * kMb);
  61. }
  62. return Status::OK();
  63. }
  64. uint64_t size_;
  65. uint64_t last_synced_;
  66. };
  67. EnvOptions env_options;
  68. env_options.bytes_per_sync = kMb;
  69. std::unique_ptr<FakeWF> wf(new FakeWF);
  70. std::unique_ptr<WritableFileWriter> writer(
  71. new WritableFileWriter(NewLegacyWritableFileWrapper(std::move(wf)),
  72. "" /* don't care */, env_options));
  73. Random r(301);
  74. std::unique_ptr<char[]> large_buf(new char[10 * kMb]);
  75. for (int i = 0; i < 1000; i++) {
  76. int skew_limit = (i < 700) ? 10 : 15;
  77. uint32_t num = r.Skewed(skew_limit) * 100 + r.Uniform(100);
  78. writer->Append(Slice(large_buf.get(), num));
  79. // Flush in a chance of 1/10.
  80. if (r.Uniform(10) == 0) {
  81. writer->Flush();
  82. }
  83. }
  84. writer->Close();
  85. }
  86. TEST_F(WritableFileWriterTest, IncrementalBuffer) {
  87. class FakeWF : public WritableFile {
  88. public:
  89. explicit FakeWF(std::string* _file_data, bool _use_direct_io,
  90. bool _no_flush)
  91. : file_data_(_file_data),
  92. use_direct_io_(_use_direct_io),
  93. no_flush_(_no_flush) {}
  94. ~FakeWF() override {}
  95. Status Append(const Slice& data) override {
  96. file_data_->append(data.data(), data.size());
  97. size_ += data.size();
  98. return Status::OK();
  99. }
  100. Status PositionedAppend(const Slice& data, uint64_t pos) override {
  101. EXPECT_TRUE(pos % 512 == 0);
  102. EXPECT_TRUE(data.size() % 512 == 0);
  103. file_data_->resize(pos);
  104. file_data_->append(data.data(), data.size());
  105. size_ += data.size();
  106. return Status::OK();
  107. }
  108. Status Truncate(uint64_t size) override {
  109. file_data_->resize(size);
  110. return Status::OK();
  111. }
  112. Status Close() override { return Status::OK(); }
  113. Status Flush() override { return Status::OK(); }
  114. Status Sync() override { return Status::OK(); }
  115. Status Fsync() override { return Status::OK(); }
  116. void SetIOPriority(Env::IOPriority /*pri*/) override {}
  117. uint64_t GetFileSize() override { return size_; }
  118. void GetPreallocationStatus(size_t* /*block_size*/,
  119. size_t* /*last_allocated_block*/) override {}
  120. size_t GetUniqueId(char* /*id*/, size_t /*max_size*/) const override {
  121. return 0;
  122. }
  123. Status InvalidateCache(size_t /*offset*/, size_t /*length*/) override {
  124. return Status::OK();
  125. }
  126. bool use_direct_io() const override { return use_direct_io_; }
  127. std::string* file_data_;
  128. bool use_direct_io_;
  129. bool no_flush_;
  130. size_t size_ = 0;
  131. };
  132. Random r(301);
  133. const int kNumAttempts = 50;
  134. for (int attempt = 0; attempt < kNumAttempts; attempt++) {
  135. bool no_flush = (attempt % 3 == 0);
  136. EnvOptions env_options;
  137. env_options.writable_file_max_buffer_size =
  138. (attempt < kNumAttempts / 2) ? 512 * 1024 : 700 * 1024;
  139. std::string actual;
  140. std::unique_ptr<FakeWF> wf(new FakeWF(&actual,
  141. #ifndef ROCKSDB_LITE
  142. attempt % 2 == 1,
  143. #else
  144. false,
  145. #endif
  146. no_flush));
  147. std::unique_ptr<WritableFileWriter> writer(
  148. new WritableFileWriter(NewLegacyWritableFileWrapper(std::move(wf)),
  149. "" /* don't care */, env_options));
  150. std::string target;
  151. for (int i = 0; i < 20; i++) {
  152. uint32_t num = r.Skewed(16) * 100 + r.Uniform(100);
  153. std::string random_string;
  154. test::RandomString(&r, num, &random_string);
  155. writer->Append(Slice(random_string.c_str(), num));
  156. target.append(random_string.c_str(), num);
  157. // In some attempts, flush in a chance of 1/10.
  158. if (!no_flush && r.Uniform(10) == 0) {
  159. writer->Flush();
  160. }
  161. }
  162. writer->Flush();
  163. writer->Close();
  164. ASSERT_EQ(target.size(), actual.size());
  165. ASSERT_EQ(target, actual);
  166. }
  167. }
  168. #ifndef ROCKSDB_LITE
  169. TEST_F(WritableFileWriterTest, AppendStatusReturn) {
  170. class FakeWF : public WritableFile {
  171. public:
  172. explicit FakeWF() : use_direct_io_(false), io_error_(false) {}
  173. bool use_direct_io() const override { return use_direct_io_; }
  174. Status Append(const Slice& /*data*/) override {
  175. if (io_error_) {
  176. return Status::IOError("Fake IO error");
  177. }
  178. return Status::OK();
  179. }
  180. Status PositionedAppend(const Slice& /*data*/, uint64_t) override {
  181. if (io_error_) {
  182. return Status::IOError("Fake IO error");
  183. }
  184. return Status::OK();
  185. }
  186. Status Close() override { return Status::OK(); }
  187. Status Flush() override { return Status::OK(); }
  188. Status Sync() override { return Status::OK(); }
  189. void Setuse_direct_io(bool val) { use_direct_io_ = val; }
  190. void SetIOError(bool val) { io_error_ = val; }
  191. protected:
  192. bool use_direct_io_;
  193. bool io_error_;
  194. };
  195. std::unique_ptr<FakeWF> wf(new FakeWF());
  196. wf->Setuse_direct_io(true);
  197. std::unique_ptr<WritableFileWriter> writer(
  198. new WritableFileWriter(NewLegacyWritableFileWrapper(std::move(wf)),
  199. "" /* don't care */, EnvOptions()));
  200. ASSERT_OK(writer->Append(std::string(2 * kMb, 'a')));
  201. // Next call to WritableFile::Append() should fail
  202. LegacyWritableFileWrapper* file =
  203. static_cast<LegacyWritableFileWrapper*>(writer->writable_file());
  204. static_cast<FakeWF*>(file->target())->SetIOError(true);
  205. ASSERT_NOK(writer->Append(std::string(2 * kMb, 'b')));
  206. }
  207. #endif
  208. class ReadaheadRandomAccessFileTest
  209. : public testing::Test,
  210. public testing::WithParamInterface<size_t> {
  211. public:
  212. static std::vector<size_t> GetReadaheadSizeList() {
  213. return {1lu << 12, 1lu << 16};
  214. }
  215. void SetUp() override {
  216. readahead_size_ = GetParam();
  217. scratch_.reset(new char[2 * readahead_size_]);
  218. ResetSourceStr();
  219. }
  220. ReadaheadRandomAccessFileTest() : control_contents_() {}
  221. std::string Read(uint64_t offset, size_t n) {
  222. Slice result;
  223. test_read_holder_->Read(offset, n, &result, scratch_.get());
  224. return std::string(result.data(), result.size());
  225. }
  226. void ResetSourceStr(const std::string& str = "") {
  227. auto write_holder =
  228. std::unique_ptr<WritableFileWriter>(test::GetWritableFileWriter(
  229. new test::StringSink(&control_contents_), "" /* don't care */));
  230. write_holder->Append(Slice(str));
  231. write_holder->Flush();
  232. auto read_holder = std::unique_ptr<RandomAccessFile>(
  233. new test::StringSource(control_contents_));
  234. test_read_holder_ =
  235. NewReadaheadRandomAccessFile(std::move(read_holder), readahead_size_);
  236. }
  237. size_t GetReadaheadSize() const { return readahead_size_; }
  238. private:
  239. size_t readahead_size_;
  240. Slice control_contents_;
  241. std::unique_ptr<RandomAccessFile> test_read_holder_;
  242. std::unique_ptr<char[]> scratch_;
  243. };
  244. TEST_P(ReadaheadRandomAccessFileTest, EmptySourceStr) {
  245. ASSERT_EQ("", Read(0, 1));
  246. ASSERT_EQ("", Read(0, 0));
  247. ASSERT_EQ("", Read(13, 13));
  248. }
  249. TEST_P(ReadaheadRandomAccessFileTest, SourceStrLenLessThanReadaheadSize) {
  250. std::string str = "abcdefghijklmnopqrs";
  251. ResetSourceStr(str);
  252. ASSERT_EQ(str.substr(3, 4), Read(3, 4));
  253. ASSERT_EQ(str.substr(0, 3), Read(0, 3));
  254. ASSERT_EQ(str, Read(0, str.size()));
  255. ASSERT_EQ(str.substr(7, std::min(static_cast<int>(str.size()) - 7, 30)),
  256. Read(7, 30));
  257. ASSERT_EQ("", Read(100, 100));
  258. }
  259. TEST_P(ReadaheadRandomAccessFileTest, SourceStrLenGreaterThanReadaheadSize) {
  260. Random rng(42);
  261. for (int k = 0; k < 100; ++k) {
  262. size_t strLen = k * GetReadaheadSize() +
  263. rng.Uniform(static_cast<int>(GetReadaheadSize()));
  264. std::string str =
  265. test::RandomHumanReadableString(&rng, static_cast<int>(strLen));
  266. ResetSourceStr(str);
  267. for (int test = 1; test <= 100; ++test) {
  268. size_t offset = rng.Uniform(static_cast<int>(strLen));
  269. size_t n = rng.Uniform(static_cast<int>(GetReadaheadSize()));
  270. ASSERT_EQ(str.substr(offset, std::min(n, strLen - offset)),
  271. Read(offset, n));
  272. }
  273. }
  274. }
  275. TEST_P(ReadaheadRandomAccessFileTest, ReadExceedsReadaheadSize) {
  276. Random rng(7);
  277. size_t strLen = 4 * GetReadaheadSize() +
  278. rng.Uniform(static_cast<int>(GetReadaheadSize()));
  279. std::string str =
  280. test::RandomHumanReadableString(&rng, static_cast<int>(strLen));
  281. ResetSourceStr(str);
  282. for (int test = 1; test <= 100; ++test) {
  283. size_t offset = rng.Uniform(static_cast<int>(strLen));
  284. size_t n =
  285. GetReadaheadSize() + rng.Uniform(static_cast<int>(GetReadaheadSize()));
  286. ASSERT_EQ(str.substr(offset, std::min(n, strLen - offset)),
  287. Read(offset, n));
  288. }
  289. }
  290. INSTANTIATE_TEST_CASE_P(
  291. EmptySourceStr, ReadaheadRandomAccessFileTest,
  292. ::testing::ValuesIn(ReadaheadRandomAccessFileTest::GetReadaheadSizeList()));
  293. INSTANTIATE_TEST_CASE_P(
  294. SourceStrLenLessThanReadaheadSize, ReadaheadRandomAccessFileTest,
  295. ::testing::ValuesIn(ReadaheadRandomAccessFileTest::GetReadaheadSizeList()));
  296. INSTANTIATE_TEST_CASE_P(
  297. SourceStrLenGreaterThanReadaheadSize, ReadaheadRandomAccessFileTest,
  298. ::testing::ValuesIn(ReadaheadRandomAccessFileTest::GetReadaheadSizeList()));
  299. INSTANTIATE_TEST_CASE_P(
  300. ReadExceedsReadaheadSize, ReadaheadRandomAccessFileTest,
  301. ::testing::ValuesIn(ReadaheadRandomAccessFileTest::GetReadaheadSizeList()));
  302. class ReadaheadSequentialFileTest : public testing::Test,
  303. public testing::WithParamInterface<size_t> {
  304. public:
  305. static std::vector<size_t> GetReadaheadSizeList() {
  306. return {1lu << 8, 1lu << 12, 1lu << 16, 1lu << 18};
  307. }
  308. void SetUp() override {
  309. readahead_size_ = GetParam();
  310. scratch_.reset(new char[2 * readahead_size_]);
  311. ResetSourceStr();
  312. }
  313. ReadaheadSequentialFileTest() {}
  314. std::string Read(size_t n) {
  315. Slice result;
  316. test_read_holder_->Read(n, &result, scratch_.get());
  317. return std::string(result.data(), result.size());
  318. }
  319. void Skip(size_t n) { test_read_holder_->Skip(n); }
  320. void ResetSourceStr(const std::string& str = "") {
  321. auto read_holder = std::unique_ptr<SequentialFile>(
  322. new test::SeqStringSource(str, &seq_read_count_));
  323. test_read_holder_.reset(new SequentialFileReader(
  324. NewLegacySequentialFileWrapper(read_holder), "test", readahead_size_));
  325. }
  326. size_t GetReadaheadSize() const { return readahead_size_; }
  327. private:
  328. size_t readahead_size_;
  329. std::unique_ptr<SequentialFileReader> test_read_holder_;
  330. std::unique_ptr<char[]> scratch_;
  331. std::atomic<int> seq_read_count_;
  332. };
  333. TEST_P(ReadaheadSequentialFileTest, EmptySourceStr) {
  334. ASSERT_EQ("", Read(0));
  335. ASSERT_EQ("", Read(1));
  336. ASSERT_EQ("", Read(13));
  337. }
  338. TEST_P(ReadaheadSequentialFileTest, SourceStrLenLessThanReadaheadSize) {
  339. std::string str = "abcdefghijklmnopqrs";
  340. ResetSourceStr(str);
  341. ASSERT_EQ(str.substr(0, 3), Read(3));
  342. ASSERT_EQ(str.substr(3, 1), Read(1));
  343. ASSERT_EQ(str.substr(4), Read(str.size()));
  344. ASSERT_EQ("", Read(100));
  345. }
  346. TEST_P(ReadaheadSequentialFileTest, SourceStrLenGreaterThanReadaheadSize) {
  347. Random rng(42);
  348. for (int s = 0; s < 1; ++s) {
  349. for (int k = 0; k < 100; ++k) {
  350. size_t strLen = k * GetReadaheadSize() +
  351. rng.Uniform(static_cast<int>(GetReadaheadSize()));
  352. std::string str =
  353. test::RandomHumanReadableString(&rng, static_cast<int>(strLen));
  354. ResetSourceStr(str);
  355. size_t offset = 0;
  356. for (int test = 1; test <= 100; ++test) {
  357. size_t n = rng.Uniform(static_cast<int>(GetReadaheadSize()));
  358. if (s && test % 2) {
  359. Skip(n);
  360. } else {
  361. ASSERT_EQ(str.substr(offset, std::min(n, strLen - offset)), Read(n));
  362. }
  363. offset = std::min(offset + n, strLen);
  364. }
  365. }
  366. }
  367. }
  368. TEST_P(ReadaheadSequentialFileTest, ReadExceedsReadaheadSize) {
  369. Random rng(42);
  370. for (int s = 0; s < 1; ++s) {
  371. for (int k = 0; k < 100; ++k) {
  372. size_t strLen = k * GetReadaheadSize() +
  373. rng.Uniform(static_cast<int>(GetReadaheadSize()));
  374. std::string str =
  375. test::RandomHumanReadableString(&rng, static_cast<int>(strLen));
  376. ResetSourceStr(str);
  377. size_t offset = 0;
  378. for (int test = 1; test <= 100; ++test) {
  379. size_t n = GetReadaheadSize() +
  380. rng.Uniform(static_cast<int>(GetReadaheadSize()));
  381. if (s && test % 2) {
  382. Skip(n);
  383. } else {
  384. ASSERT_EQ(str.substr(offset, std::min(n, strLen - offset)), Read(n));
  385. }
  386. offset = std::min(offset + n, strLen);
  387. }
  388. }
  389. }
  390. }
  391. INSTANTIATE_TEST_CASE_P(
  392. EmptySourceStr, ReadaheadSequentialFileTest,
  393. ::testing::ValuesIn(ReadaheadSequentialFileTest::GetReadaheadSizeList()));
  394. INSTANTIATE_TEST_CASE_P(
  395. SourceStrLenLessThanReadaheadSize, ReadaheadSequentialFileTest,
  396. ::testing::ValuesIn(ReadaheadSequentialFileTest::GetReadaheadSizeList()));
  397. INSTANTIATE_TEST_CASE_P(
  398. SourceStrLenGreaterThanReadaheadSize, ReadaheadSequentialFileTest,
  399. ::testing::ValuesIn(ReadaheadSequentialFileTest::GetReadaheadSizeList()));
  400. INSTANTIATE_TEST_CASE_P(
  401. ReadExceedsReadaheadSize, ReadaheadSequentialFileTest,
  402. ::testing::ValuesIn(ReadaheadSequentialFileTest::GetReadaheadSizeList()));
  403. } // namespace ROCKSDB_NAMESPACE
  404. int main(int argc, char** argv) {
  405. ::testing::InitGoogleTest(&argc, argv);
  406. return RUN_ALL_TESTS();
  407. }