// file_reader_writer_test.cc
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. #include <algorithm>
  7. #include <vector>
  8. #include "db/db_test_util.h"
  9. #include "env/mock_env.h"
  10. #include "file/line_file_reader.h"
  11. #include "file/random_access_file_reader.h"
  12. #include "file/read_write_util.h"
  13. #include "file/readahead_raf.h"
  14. #include "file/sequence_file_reader.h"
  15. #include "file/writable_file_writer.h"
  16. #include "rocksdb/file_system.h"
  17. #include "test_util/testharness.h"
  18. #include "test_util/testutil.h"
  19. #include "util/crc32c.h"
  20. #include "util/random.h"
  21. #include "utilities/fault_injection_fs.h"
  22. namespace ROCKSDB_NAMESPACE {
  23. class WritableFileWriterTest : public testing::Test {};
  24. constexpr uint32_t kMb = static_cast<uint32_t>(1) << 20;
  25. TEST_F(WritableFileWriterTest, RangeSync) {
  26. class FakeWF : public FSWritableFile {
  27. public:
  28. explicit FakeWF() : size_(0), last_synced_(0) {}
  29. ~FakeWF() override = default;
  30. using FSWritableFile::Append;
  31. IOStatus Append(const Slice& data, const IOOptions& /*options*/,
  32. IODebugContext* /*dbg*/) override {
  33. size_ += data.size();
  34. return IOStatus::OK();
  35. }
  36. IOStatus Truncate(uint64_t /*size*/, const IOOptions& /*options*/,
  37. IODebugContext* /*dbg*/) override {
  38. return IOStatus::OK();
  39. }
  40. IOStatus Close(const IOOptions& /*options*/,
  41. IODebugContext* /*dbg*/) override {
  42. EXPECT_GE(size_, last_synced_ + kMb);
  43. EXPECT_LT(size_, last_synced_ + 2 * kMb);
  44. // Make sure random writes generated enough writes.
  45. EXPECT_GT(size_, 10 * kMb);
  46. return IOStatus::OK();
  47. }
  48. IOStatus Flush(const IOOptions& /*options*/,
  49. IODebugContext* /*dbg*/) override {
  50. return IOStatus::OK();
  51. }
  52. IOStatus Sync(const IOOptions& /*options*/,
  53. IODebugContext* /*dbg*/) override {
  54. return IOStatus::OK();
  55. }
  56. IOStatus Fsync(const IOOptions& /*options*/,
  57. IODebugContext* /*dbg*/) override {
  58. return IOStatus::OK();
  59. }
  60. void SetIOPriority(Env::IOPriority /*pri*/) override {}
  61. uint64_t GetFileSize(const IOOptions& /*options*/,
  62. IODebugContext* /*dbg*/) override {
  63. return size_;
  64. }
  65. void GetPreallocationStatus(size_t* /*block_size*/,
  66. size_t* /*last_allocated_block*/) override {}
  67. size_t GetUniqueId(char* /*id*/, size_t /*max_size*/) const override {
  68. return 0;
  69. }
  70. IOStatus InvalidateCache(size_t /*offset*/, size_t /*length*/) override {
  71. return IOStatus::OK();
  72. }
  73. protected:
  74. IOStatus Allocate(uint64_t /*offset*/, uint64_t /*len*/,
  75. const IOOptions& /*options*/,
  76. IODebugContext* /*dbg*/) override {
  77. return IOStatus::OK();
  78. }
  79. IOStatus RangeSync(uint64_t offset, uint64_t nbytes,
  80. const IOOptions& /*options*/,
  81. IODebugContext* /*dbg*/) override {
  82. EXPECT_EQ(offset % 4096, 0u);
  83. EXPECT_EQ(nbytes % 4096, 0u);
  84. EXPECT_EQ(offset, last_synced_);
  85. last_synced_ = offset + nbytes;
  86. EXPECT_GE(size_, last_synced_ + kMb);
  87. if (size_ > 2 * kMb) {
  88. EXPECT_LT(size_, last_synced_ + 2 * kMb);
  89. }
  90. return IOStatus::OK();
  91. }
  92. uint64_t size_;
  93. uint64_t last_synced_;
  94. };
  95. EnvOptions env_options;
  96. env_options.bytes_per_sync = kMb;
  97. std::unique_ptr<FakeWF> wf(new FakeWF);
  98. std::unique_ptr<WritableFileWriter> writer(
  99. new WritableFileWriter(std::move(wf), "" /* don't care */, env_options));
  100. Random r(301);
  101. Status s;
  102. std::unique_ptr<char[]> large_buf(new char[10 * kMb]);
  103. for (int i = 0; i < 1000; i++) {
  104. int skew_limit = (i < 700) ? 10 : 15;
  105. uint32_t num = r.Skewed(skew_limit) * 100 + r.Uniform(100);
  106. s = writer->Append(IOOptions(), Slice(large_buf.get(), num));
  107. ASSERT_OK(s);
  108. // Flush in a chance of 1/10.
  109. if (r.Uniform(10) == 0) {
  110. s = writer->Flush(IOOptions());
  111. ASSERT_OK(s);
  112. }
  113. }
  114. s = writer->Close(IOOptions());
  115. ASSERT_OK(s);
  116. }
  117. TEST_F(WritableFileWriterTest, IncrementalBuffer) {
  118. class FakeWF : public FSWritableFile {
  119. public:
  120. explicit FakeWF(std::string* _file_data, bool _use_direct_io,
  121. bool _no_flush)
  122. : file_data_(_file_data),
  123. use_direct_io_(_use_direct_io),
  124. no_flush_(_no_flush) {}
  125. ~FakeWF() override = default;
  126. using FSWritableFile::Append;
  127. IOStatus Append(const Slice& data, const IOOptions& /*options*/,
  128. IODebugContext* /*dbg*/) override {
  129. file_data_->append(data.data(), data.size());
  130. size_ += data.size();
  131. return IOStatus::OK();
  132. }
  133. using FSWritableFile::PositionedAppend;
  134. IOStatus PositionedAppend(const Slice& data, uint64_t pos,
  135. const IOOptions& /*options*/,
  136. IODebugContext* /*dbg*/) override {
  137. EXPECT_TRUE(pos % 512 == 0);
  138. EXPECT_TRUE(data.size() % 512 == 0);
  139. file_data_->resize(pos);
  140. file_data_->append(data.data(), data.size());
  141. size_ += data.size();
  142. return IOStatus::OK();
  143. }
  144. IOStatus Truncate(uint64_t size, const IOOptions& /*options*/,
  145. IODebugContext* /*dbg*/) override {
  146. file_data_->resize(size);
  147. return IOStatus::OK();
  148. }
  149. IOStatus Close(const IOOptions& /*options*/,
  150. IODebugContext* /*dbg*/) override {
  151. return IOStatus::OK();
  152. }
  153. IOStatus Flush(const IOOptions& /*options*/,
  154. IODebugContext* /*dbg*/) override {
  155. return IOStatus::OK();
  156. }
  157. IOStatus Sync(const IOOptions& /*options*/,
  158. IODebugContext* /*dbg*/) override {
  159. return IOStatus::OK();
  160. }
  161. IOStatus Fsync(const IOOptions& /*options*/,
  162. IODebugContext* /*dbg*/) override {
  163. return IOStatus::OK();
  164. }
  165. void SetIOPriority(Env::IOPriority /*pri*/) override {}
  166. uint64_t GetFileSize(const IOOptions& /*options*/,
  167. IODebugContext* /*dbg*/) override {
  168. return size_;
  169. }
  170. void GetPreallocationStatus(size_t* /*block_size*/,
  171. size_t* /*last_allocated_block*/) override {}
  172. size_t GetUniqueId(char* /*id*/, size_t /*max_size*/) const override {
  173. return 0;
  174. }
  175. IOStatus InvalidateCache(size_t /*offset*/, size_t /*length*/) override {
  176. return IOStatus::OK();
  177. }
  178. bool use_direct_io() const override { return use_direct_io_; }
  179. std::string* file_data_;
  180. bool use_direct_io_;
  181. bool no_flush_;
  182. size_t size_ = 0;
  183. };
  184. Random r(301);
  185. const int kNumAttempts = 50;
  186. for (int attempt = 0; attempt < kNumAttempts; attempt++) {
  187. bool no_flush = (attempt % 3 == 0);
  188. EnvOptions env_options;
  189. env_options.writable_file_max_buffer_size =
  190. (attempt < kNumAttempts / 2) ? 512 * 1024 : 700 * 1024;
  191. std::string actual;
  192. std::unique_ptr<FakeWF> wf(new FakeWF(&actual, attempt % 2 == 1, no_flush));
  193. std::unique_ptr<WritableFileWriter> writer(new WritableFileWriter(
  194. std::move(wf), "" /* don't care */, env_options));
  195. std::string target;
  196. for (int i = 0; i < 20; i++) {
  197. uint32_t num = r.Skewed(16) * 100 + r.Uniform(100);
  198. std::string random_string = r.RandomString(num);
  199. ASSERT_OK(writer->Append(IOOptions(), Slice(random_string.c_str(), num)));
  200. target.append(random_string.c_str(), num);
  201. // In some attempts, flush in a chance of 1/10.
  202. if (!no_flush && r.Uniform(10) == 0) {
  203. ASSERT_OK(writer->Flush(IOOptions()));
  204. }
  205. }
  206. ASSERT_OK(writer->Flush(IOOptions()));
  207. ASSERT_OK(writer->Close(IOOptions()));
  208. ASSERT_EQ(target.size(), actual.size());
  209. ASSERT_EQ(target, actual);
  210. }
  211. }
  212. TEST_F(WritableFileWriterTest, AlignedBufferedWrites) {
  213. class FakeWF : public FSWritableFile {
  214. public:
  215. explicit FakeWF(std::string* _file_data) : file_data_(_file_data) {}
  216. ~FakeWF() override = default;
  217. using FSWritableFile::Append;
  218. IOStatus Append(const Slice& data, const IOOptions& /*options*/,
  219. IODebugContext* /*dbg*/) override {
  220. EXPECT_EQ(data.size() & (data.size() - 1), 0);
  221. file_data_->append(data.data(), data.size());
  222. size_ += data.size();
  223. return IOStatus::OK();
  224. }
  225. using FSWritableFile::PositionedAppend;
  226. IOStatus PositionedAppend(const Slice& data, uint64_t pos,
  227. const IOOptions& /*options*/,
  228. IODebugContext* /*dbg*/) override {
  229. EXPECT_TRUE(pos % 512 == 0);
  230. EXPECT_TRUE(data.size() % 512 == 0);
  231. file_data_->resize(pos);
  232. file_data_->append(data.data(), data.size());
  233. size_ += data.size();
  234. return IOStatus::OK();
  235. }
  236. IOStatus Truncate(uint64_t size, const IOOptions& /*options*/,
  237. IODebugContext* /*dbg*/) override {
  238. file_data_->resize(size);
  239. return IOStatus::OK();
  240. }
  241. IOStatus Close(const IOOptions& /*options*/,
  242. IODebugContext* /*dbg*/) override {
  243. return IOStatus::OK();
  244. }
  245. IOStatus Flush(const IOOptions& /*options*/,
  246. IODebugContext* /*dbg*/) override {
  247. return IOStatus::OK();
  248. }
  249. IOStatus Sync(const IOOptions& /*options*/,
  250. IODebugContext* /*dbg*/) override {
  251. return IOStatus::OK();
  252. }
  253. IOStatus Fsync(const IOOptions& /*options*/,
  254. IODebugContext* /*dbg*/) override {
  255. return IOStatus::OK();
  256. }
  257. void SetIOPriority(Env::IOPriority /*pri*/) override {}
  258. uint64_t GetFileSize(const IOOptions& /*options*/,
  259. IODebugContext* /*dbg*/) override {
  260. return size_;
  261. }
  262. void GetPreallocationStatus(size_t* /*block_size*/,
  263. size_t* /*last_allocated_block*/) override {}
  264. size_t GetUniqueId(char* /*id*/, size_t /*max_size*/) const override {
  265. return 0;
  266. }
  267. IOStatus InvalidateCache(size_t /*offset*/, size_t /*length*/) override {
  268. return IOStatus::OK();
  269. }
  270. bool use_direct_io() const override { return false; }
  271. std::string* file_data_;
  272. size_t size_ = 0;
  273. };
  274. Random r(301);
  275. EnvOptions env_options;
  276. env_options.writable_file_max_buffer_size = 64 * 1024 * 1024;
  277. std::string actual;
  278. std::unique_ptr<FakeWF> wf(new FakeWF(&actual));
  279. std::unique_ptr<WritableFileWriter> writer(
  280. new WritableFileWriter(std::move(wf), "" /* don't care */, env_options));
  281. std::string target;
  282. uint32_t left =
  283. static_cast<uint32_t>(2 * env_options.writable_file_max_buffer_size);
  284. ;
  285. while (left > 0) {
  286. uint32_t num = 4096 + r.Uniform(8192);
  287. num = std::min<uint32_t>(num, left);
  288. std::string random_string = r.RandomString(num);
  289. ASSERT_OK(writer->Append(IOOptions(), Slice(random_string.c_str(), num)));
  290. target.append(random_string.c_str(), num);
  291. left -= num;
  292. }
  293. ASSERT_OK(writer->Flush(IOOptions()));
  294. ASSERT_OK(writer->Close(IOOptions()));
  295. ASSERT_EQ(target.size(), actual.size());
  296. ASSERT_EQ(target, actual);
  297. }
  298. TEST_F(WritableFileWriterTest, BufferWithZeroCapacityDirectIO) {
  299. EnvOptions env_opts;
  300. env_opts.use_direct_writes = true;
  301. env_opts.writable_file_max_buffer_size = 0;
  302. {
  303. std::unique_ptr<WritableFileWriter> writer;
  304. const Status s =
  305. WritableFileWriter::Create(FileSystem::Default(), /*fname=*/"dont_care",
  306. FileOptions(env_opts), &writer,
  307. /*dbg=*/nullptr);
  308. ASSERT_TRUE(s.IsInvalidArgument());
  309. }
  310. }
  311. class DBWritableFileWriterTest : public DBTestBase {
  312. public:
  313. DBWritableFileWriterTest()
  314. : DBTestBase("db_secondary_cache_test", /*env_do_fsync=*/true) {
  315. fault_fs_.reset(new FaultInjectionTestFS(env_->GetFileSystem()));
  316. fault_env_.reset(new CompositeEnvWrapper(env_, fault_fs_));
  317. }
  318. std::shared_ptr<FaultInjectionTestFS> fault_fs_;
  319. std::unique_ptr<Env> fault_env_;
  320. };
  321. TEST_F(DBWritableFileWriterTest, AppendWithChecksum) {
  322. FileOptions file_options = FileOptions();
  323. Options options = GetDefaultOptions();
  324. options.create_if_missing = true;
  325. DestroyAndReopen(options);
  326. std::string fname = dbname_ + "/test_file";
  327. std::unique_ptr<FSWritableFile> writable_file_ptr;
  328. ASSERT_OK(fault_fs_->NewWritableFile(fname, file_options, &writable_file_ptr,
  329. /*dbg*/ nullptr));
  330. std::unique_ptr<TestFSWritableFile> file;
  331. file.reset(new TestFSWritableFile(
  332. fname, file_options, std::move(writable_file_ptr), fault_fs_.get()));
  333. std::unique_ptr<WritableFileWriter> file_writer;
  334. ImmutableOptions ioptions(options);
  335. file_writer.reset(new WritableFileWriter(
  336. std::move(file), fname, file_options, SystemClock::Default().get(),
  337. nullptr, ioptions.stats, Histograms::HISTOGRAM_ENUM_MAX /* hist_type */,
  338. ioptions.listeners, ioptions.file_checksum_gen_factory.get(), true,
  339. true));
  340. Random rnd(301);
  341. std::string data = rnd.RandomString(1000);
  342. uint32_t data_crc32c = crc32c::Value(data.c_str(), data.size());
  343. fault_fs_->SetChecksumHandoffFuncType(ChecksumType::kCRC32c);
  344. ASSERT_OK(file_writer->Append(IOOptions(), Slice(data.c_str()), data_crc32c));
  345. ASSERT_OK(file_writer->Flush(IOOptions()));
  346. Random size_r(47);
  347. for (int i = 0; i < 2000; i++) {
  348. data = rnd.RandomString((static_cast<int>(size_r.Next()) % 10000));
  349. data_crc32c = crc32c::Value(data.c_str(), data.size());
  350. ASSERT_OK(
  351. file_writer->Append(IOOptions(), Slice(data.c_str()), data_crc32c));
  352. data = rnd.RandomString((static_cast<int>(size_r.Next()) % 97));
  353. ASSERT_OK(file_writer->Append(IOOptions(), Slice(data.c_str())));
  354. ASSERT_OK(file_writer->Flush(IOOptions()));
  355. }
  356. ASSERT_OK(file_writer->Close(IOOptions()));
  357. Destroy(options);
  358. }
  359. TEST_F(DBWritableFileWriterTest, AppendVerifyNoChecksum) {
  360. FileOptions file_options = FileOptions();
  361. Options options = GetDefaultOptions();
  362. options.create_if_missing = true;
  363. DestroyAndReopen(options);
  364. std::string fname = dbname_ + "/test_file";
  365. std::unique_ptr<FSWritableFile> writable_file_ptr;
  366. ASSERT_OK(fault_fs_->NewWritableFile(fname, file_options, &writable_file_ptr,
  367. /*dbg*/ nullptr));
  368. std::unique_ptr<TestFSWritableFile> file;
  369. file.reset(new TestFSWritableFile(
  370. fname, file_options, std::move(writable_file_ptr), fault_fs_.get()));
  371. std::unique_ptr<WritableFileWriter> file_writer;
  372. ImmutableOptions ioptions(options);
  373. // Enable checksum handoff for this file, but do not enable buffer checksum.
  374. // So Append with checksum logic will not be triggered
  375. file_writer.reset(new WritableFileWriter(
  376. std::move(file), fname, file_options, SystemClock::Default().get(),
  377. nullptr, ioptions.stats, Histograms::HISTOGRAM_ENUM_MAX /* hist_type */,
  378. ioptions.listeners, ioptions.file_checksum_gen_factory.get(), true,
  379. false));
  380. Random rnd(301);
  381. std::string data = rnd.RandomString(1000);
  382. uint32_t data_crc32c = crc32c::Value(data.c_str(), data.size());
  383. fault_fs_->SetChecksumHandoffFuncType(ChecksumType::kCRC32c);
  384. ASSERT_OK(file_writer->Append(IOOptions(), Slice(data.c_str()), data_crc32c));
  385. ASSERT_OK(file_writer->Flush(IOOptions()));
  386. Random size_r(47);
  387. for (int i = 0; i < 1000; i++) {
  388. data = rnd.RandomString((static_cast<int>(size_r.Next()) % 10000));
  389. data_crc32c = crc32c::Value(data.c_str(), data.size());
  390. ASSERT_OK(
  391. file_writer->Append(IOOptions(), Slice(data.c_str()), data_crc32c));
  392. data = rnd.RandomString((static_cast<int>(size_r.Next()) % 97));
  393. ASSERT_OK(file_writer->Append(IOOptions(), Slice(data.c_str())));
  394. ASSERT_OK(file_writer->Flush(IOOptions()));
  395. }
  396. ASSERT_OK(file_writer->Close(IOOptions()));
  397. Destroy(options);
  398. }
  399. TEST_F(DBWritableFileWriterTest, AppendWithChecksumRateLimiter) {
  400. FileOptions file_options = FileOptions();
  401. file_options.rate_limiter = nullptr;
  402. Options options = GetDefaultOptions();
  403. options.create_if_missing = true;
  404. DestroyAndReopen(options);
  405. std::string fname = dbname_ + "/test_file";
  406. std::unique_ptr<FSWritableFile> writable_file_ptr;
  407. ASSERT_OK(fault_fs_->NewWritableFile(fname, file_options, &writable_file_ptr,
  408. /*dbg*/ nullptr));
  409. std::unique_ptr<TestFSWritableFile> file;
  410. file.reset(new TestFSWritableFile(
  411. fname, file_options, std::move(writable_file_ptr), fault_fs_.get()));
  412. std::unique_ptr<WritableFileWriter> file_writer;
  413. ImmutableOptions ioptions(options);
  414. // Enable checksum handoff for this file, but do not enable buffer checksum.
  415. // So Append with checksum logic will not be triggered
  416. file_writer.reset(new WritableFileWriter(
  417. std::move(file), fname, file_options, SystemClock::Default().get(),
  418. nullptr, ioptions.stats, Histograms::HISTOGRAM_ENUM_MAX /* hist_type */,
  419. ioptions.listeners, ioptions.file_checksum_gen_factory.get(), true,
  420. true));
  421. fault_fs_->SetChecksumHandoffFuncType(ChecksumType::kCRC32c);
  422. Random rnd(301);
  423. std::string data;
  424. uint32_t data_crc32c;
  425. uint64_t start = fault_env_->NowMicros();
  426. Random size_r(47);
  427. uint64_t bytes_written = 0;
  428. for (int i = 0; i < 100; i++) {
  429. data = rnd.RandomString((static_cast<int>(size_r.Next()) % 10000));
  430. data_crc32c = crc32c::Value(data.c_str(), data.size());
  431. ASSERT_OK(
  432. file_writer->Append(IOOptions(), Slice(data.c_str()), data_crc32c));
  433. bytes_written += static_cast<uint64_t>(data.size());
  434. data = rnd.RandomString((static_cast<int>(size_r.Next()) % 97));
  435. ASSERT_OK(file_writer->Append(IOOptions(), Slice(data.c_str())));
  436. ASSERT_OK(file_writer->Flush(IOOptions()));
  437. bytes_written += static_cast<uint64_t>(data.size());
  438. }
  439. uint64_t elapsed = fault_env_->NowMicros() - start;
  440. double raw_rate = bytes_written * 1000000.0 / elapsed;
  441. ASSERT_OK(file_writer->Close(IOOptions()));
  442. // Set the rate-limiter
  443. FileOptions file_options1 = FileOptions();
  444. file_options1.rate_limiter =
  445. NewGenericRateLimiter(static_cast<int64_t>(0.5 * raw_rate));
  446. fname = dbname_ + "/test_file_1";
  447. std::unique_ptr<FSWritableFile> writable_file_ptr1;
  448. ASSERT_OK(fault_fs_->NewWritableFile(fname, file_options1,
  449. &writable_file_ptr1,
  450. /*dbg*/ nullptr));
  451. file.reset(new TestFSWritableFile(
  452. fname, file_options1, std::move(writable_file_ptr1), fault_fs_.get()));
  453. // Enable checksum handoff for this file, but do not enable buffer checksum.
  454. // So Append with checksum logic will not be triggered
  455. file_writer.reset(new WritableFileWriter(
  456. std::move(file), fname, file_options1, SystemClock::Default().get(),
  457. nullptr, ioptions.stats, Histograms::HISTOGRAM_ENUM_MAX /* hist_type */,
  458. ioptions.listeners, ioptions.file_checksum_gen_factory.get(), true,
  459. true));
  460. for (int i = 0; i < 1000; i++) {
  461. data = rnd.RandomString((static_cast<int>(size_r.Next()) % 10000));
  462. data_crc32c = crc32c::Value(data.c_str(), data.size());
  463. ASSERT_OK(
  464. file_writer->Append(IOOptions(), Slice(data.c_str()), data_crc32c));
  465. data = rnd.RandomString((static_cast<int>(size_r.Next()) % 97));
  466. ASSERT_OK(file_writer->Append(IOOptions(), Slice(data.c_str())));
  467. ASSERT_OK(file_writer->Flush(IOOptions()));
  468. }
  469. ASSERT_OK(file_writer->Close(IOOptions()));
  470. if (file_options1.rate_limiter != nullptr) {
  471. delete file_options1.rate_limiter;
  472. }
  473. Destroy(options);
  474. }
  475. TEST_F(WritableFileWriterTest, AppendStatusReturn) {
  476. class FakeWF : public FSWritableFile {
  477. public:
  478. explicit FakeWF() : use_direct_io_(false), io_error_(false) {}
  479. bool use_direct_io() const override { return use_direct_io_; }
  480. using FSWritableFile::Append;
  481. IOStatus Append(const Slice& /*data*/, const IOOptions& /*options*/,
  482. IODebugContext* /*dbg*/) override {
  483. if (io_error_) {
  484. return IOStatus::IOError("Fake IO error");
  485. }
  486. return IOStatus::OK();
  487. }
  488. using FSWritableFile::PositionedAppend;
  489. IOStatus PositionedAppend(const Slice& /*data*/, uint64_t,
  490. const IOOptions& /*options*/,
  491. IODebugContext* /*dbg*/) override {
  492. if (io_error_) {
  493. return IOStatus::IOError("Fake IO error");
  494. }
  495. return IOStatus::OK();
  496. }
  497. IOStatus Close(const IOOptions& /*options*/,
  498. IODebugContext* /*dbg*/) override {
  499. return IOStatus::OK();
  500. }
  501. IOStatus Flush(const IOOptions& /*options*/,
  502. IODebugContext* /*dbg*/) override {
  503. return IOStatus::OK();
  504. }
  505. IOStatus Sync(const IOOptions& /*options*/,
  506. IODebugContext* /*dbg*/) override {
  507. return IOStatus::OK();
  508. }
  509. void Setuse_direct_io(bool val) { use_direct_io_ = val; }
  510. void SetIOError(bool val) { io_error_ = val; }
  511. uint64_t GetFileSize(const IOOptions& /*options*/,
  512. IODebugContext* /*dbg*/) override {
  513. return 0;
  514. }
  515. protected:
  516. bool use_direct_io_;
  517. bool io_error_;
  518. };
  519. std::unique_ptr<FakeWF> wf(new FakeWF());
  520. wf->Setuse_direct_io(true);
  521. std::unique_ptr<WritableFileWriter> writer(
  522. new WritableFileWriter(std::move(wf), "" /* don't care */, EnvOptions()));
  523. ASSERT_OK(writer->Append(IOOptions(), std::string(2 * kMb, 'a')));
  524. // Next call to WritableFile::Append() should fail
  525. FakeWF* fwf = static_cast<FakeWF*>(writer->writable_file());
  526. fwf->SetIOError(true);
  527. ASSERT_NOK(writer->Append(IOOptions(), std::string(2 * kMb, 'b')));
  528. }
  529. class ReadaheadRandomAccessFileTest
  530. : public testing::Test,
  531. public testing::WithParamInterface<size_t> {
  532. public:
  533. static std::vector<size_t> GetReadaheadSizeList() {
  534. return {1lu << 12, 1lu << 16};
  535. }
  536. void SetUp() override {
  537. readahead_size_ = GetParam();
  538. scratch_.reset(new char[2 * readahead_size_]);
  539. ResetSourceStr();
  540. }
  541. ReadaheadRandomAccessFileTest() : control_contents_() {}
  542. std::string Read(uint64_t offset, size_t n) {
  543. Slice result;
  544. Status s = test_read_holder_->Read(offset, n, IOOptions(), &result,
  545. scratch_.get(), nullptr);
  546. EXPECT_TRUE(s.ok() || s.IsInvalidArgument());
  547. return std::string(result.data(), result.size());
  548. }
  549. void ResetSourceStr(const std::string& str = "") {
  550. std::unique_ptr<FSWritableFile> sink(
  551. new test::StringSink(&control_contents_));
  552. std::unique_ptr<WritableFileWriter> write_holder(new WritableFileWriter(
  553. std::move(sink), "" /* don't care */, FileOptions()));
  554. Status s = write_holder->Append(IOOptions(), Slice(str));
  555. EXPECT_OK(s);
  556. s = write_holder->Flush(IOOptions());
  557. EXPECT_OK(s);
  558. std::unique_ptr<FSRandomAccessFile> read_holder(
  559. new test::StringSource(control_contents_));
  560. test_read_holder_ =
  561. NewReadaheadRandomAccessFile(std::move(read_holder), readahead_size_);
  562. }
  563. size_t GetReadaheadSize() const { return readahead_size_; }
  564. private:
  565. size_t readahead_size_;
  566. Slice control_contents_;
  567. std::unique_ptr<FSRandomAccessFile> test_read_holder_;
  568. std::unique_ptr<char[]> scratch_;
  569. };
  570. TEST_P(ReadaheadRandomAccessFileTest, EmptySourceStr) {
  571. ASSERT_EQ("", Read(0, 1));
  572. ASSERT_EQ("", Read(0, 0));
  573. ASSERT_EQ("", Read(13, 13));
  574. }
  575. TEST_P(ReadaheadRandomAccessFileTest, SourceStrLenLessThanReadaheadSize) {
  576. std::string str = "abcdefghijklmnopqrs";
  577. ResetSourceStr(str);
  578. ASSERT_EQ(str.substr(3, 4), Read(3, 4));
  579. ASSERT_EQ(str.substr(0, 3), Read(0, 3));
  580. ASSERT_EQ(str, Read(0, str.size()));
  581. ASSERT_EQ(str.substr(7, std::min(static_cast<int>(str.size()) - 7, 30)),
  582. Read(7, 30));
  583. ASSERT_EQ("", Read(100, 100));
  584. }
  585. TEST_P(ReadaheadRandomAccessFileTest, SourceStrLenGreaterThanReadaheadSize) {
  586. Random rng(42);
  587. for (int k = 0; k < 100; ++k) {
  588. size_t strLen = k * GetReadaheadSize() +
  589. rng.Uniform(static_cast<int>(GetReadaheadSize()));
  590. std::string str = rng.HumanReadableString(static_cast<int>(strLen));
  591. ResetSourceStr(str);
  592. for (int test = 1; test <= 100; ++test) {
  593. size_t offset = rng.Uniform(static_cast<int>(strLen));
  594. size_t n = rng.Uniform(static_cast<int>(GetReadaheadSize()));
  595. ASSERT_EQ(str.substr(offset, std::min(n, strLen - offset)),
  596. Read(offset, n));
  597. }
  598. }
  599. }
  600. TEST_P(ReadaheadRandomAccessFileTest, ReadExceedsReadaheadSize) {
  601. Random rng(7);
  602. size_t strLen = 4 * GetReadaheadSize() +
  603. rng.Uniform(static_cast<int>(GetReadaheadSize()));
  604. std::string str = rng.HumanReadableString(static_cast<int>(strLen));
  605. ResetSourceStr(str);
  606. for (int test = 1; test <= 100; ++test) {
  607. size_t offset = rng.Uniform(static_cast<int>(strLen));
  608. size_t n =
  609. GetReadaheadSize() + rng.Uniform(static_cast<int>(GetReadaheadSize()));
  610. ASSERT_EQ(str.substr(offset, std::min(n, strLen - offset)),
  611. Read(offset, n));
  612. }
  613. }
  614. INSTANTIATE_TEST_CASE_P(
  615. EmptySourceStr, ReadaheadRandomAccessFileTest,
  616. ::testing::ValuesIn(ReadaheadRandomAccessFileTest::GetReadaheadSizeList()));
  617. INSTANTIATE_TEST_CASE_P(
  618. SourceStrLenLessThanReadaheadSize, ReadaheadRandomAccessFileTest,
  619. ::testing::ValuesIn(ReadaheadRandomAccessFileTest::GetReadaheadSizeList()));
  620. INSTANTIATE_TEST_CASE_P(
  621. SourceStrLenGreaterThanReadaheadSize, ReadaheadRandomAccessFileTest,
  622. ::testing::ValuesIn(ReadaheadRandomAccessFileTest::GetReadaheadSizeList()));
  623. INSTANTIATE_TEST_CASE_P(
  624. ReadExceedsReadaheadSize, ReadaheadRandomAccessFileTest,
  625. ::testing::ValuesIn(ReadaheadRandomAccessFileTest::GetReadaheadSizeList()));
  626. class ReadaheadSequentialFileTest : public testing::Test,
  627. public testing::WithParamInterface<size_t> {
  628. public:
  629. static std::vector<size_t> GetReadaheadSizeList() {
  630. return {1lu << 8, 1lu << 12, 1lu << 16, 1lu << 18};
  631. }
  632. void SetUp() override {
  633. readahead_size_ = GetParam();
  634. scratch_.reset(new char[2 * readahead_size_]);
  635. ResetSourceStr();
  636. }
  637. ReadaheadSequentialFileTest() = default;
  638. std::string Read(size_t n) {
  639. Slice result;
  640. Status s = test_read_holder_->Read(
  641. n, &result, scratch_.get(), Env::IO_TOTAL /* rate_limiter_priority*/);
  642. EXPECT_TRUE(s.ok() || s.IsInvalidArgument());
  643. return std::string(result.data(), result.size());
  644. }
  645. void Skip(size_t n) { test_read_holder_->Skip(n); }
  646. void ResetSourceStr(const std::string& str = "") {
  647. auto read_holder = std::unique_ptr<FSSequentialFile>(
  648. new test::SeqStringSource(str, &seq_read_count_));
  649. test_read_holder_.reset(new SequentialFileReader(std::move(read_holder),
  650. "test", readahead_size_));
  651. }
  652. size_t GetReadaheadSize() const { return readahead_size_; }
  653. private:
  654. size_t readahead_size_;
  655. std::unique_ptr<SequentialFileReader> test_read_holder_;
  656. std::unique_ptr<char[]> scratch_;
  657. std::atomic<int> seq_read_count_;
  658. };
  659. TEST_P(ReadaheadSequentialFileTest, EmptySourceStr) {
  660. ASSERT_EQ("", Read(0));
  661. ASSERT_EQ("", Read(1));
  662. ASSERT_EQ("", Read(13));
  663. }
  664. TEST_P(ReadaheadSequentialFileTest, SourceStrLenLessThanReadaheadSize) {
  665. std::string str = "abcdefghijklmnopqrs";
  666. ResetSourceStr(str);
  667. ASSERT_EQ(str.substr(0, 3), Read(3));
  668. ASSERT_EQ(str.substr(3, 1), Read(1));
  669. ASSERT_EQ(str.substr(4), Read(str.size()));
  670. ASSERT_EQ("", Read(100));
  671. }
  672. TEST_P(ReadaheadSequentialFileTest, SourceStrLenGreaterThanReadaheadSize) {
  673. Random rng(42);
  674. for (int s = 0; s < 1; ++s) {
  675. for (int k = 0; k < 100; ++k) {
  676. size_t strLen = k * GetReadaheadSize() +
  677. rng.Uniform(static_cast<int>(GetReadaheadSize()));
  678. std::string str = rng.HumanReadableString(static_cast<int>(strLen));
  679. ResetSourceStr(str);
  680. size_t offset = 0;
  681. for (int test = 1; test <= 100; ++test) {
  682. size_t n = rng.Uniform(static_cast<int>(GetReadaheadSize()));
  683. if (s && test % 2) {
  684. Skip(n);
  685. } else {
  686. ASSERT_EQ(str.substr(offset, std::min(n, strLen - offset)), Read(n));
  687. }
  688. offset = std::min(offset + n, strLen);
  689. }
  690. }
  691. }
  692. }
  693. TEST_P(ReadaheadSequentialFileTest, ReadExceedsReadaheadSize) {
  694. Random rng(42);
  695. for (int s = 0; s < 1; ++s) {
  696. for (int k = 0; k < 100; ++k) {
  697. size_t strLen = k * GetReadaheadSize() +
  698. rng.Uniform(static_cast<int>(GetReadaheadSize()));
  699. std::string str = rng.HumanReadableString(static_cast<int>(strLen));
  700. ResetSourceStr(str);
  701. size_t offset = 0;
  702. for (int test = 1; test <= 100; ++test) {
  703. size_t n = GetReadaheadSize() +
  704. rng.Uniform(static_cast<int>(GetReadaheadSize()));
  705. if (s && test % 2) {
  706. Skip(n);
  707. } else {
  708. ASSERT_EQ(str.substr(offset, std::min(n, strLen - offset)), Read(n));
  709. }
  710. offset = std::min(offset + n, strLen);
  711. }
  712. }
  713. }
  714. }
  715. INSTANTIATE_TEST_CASE_P(
  716. EmptySourceStr, ReadaheadSequentialFileTest,
  717. ::testing::ValuesIn(ReadaheadSequentialFileTest::GetReadaheadSizeList()));
  718. INSTANTIATE_TEST_CASE_P(
  719. SourceStrLenLessThanReadaheadSize, ReadaheadSequentialFileTest,
  720. ::testing::ValuesIn(ReadaheadSequentialFileTest::GetReadaheadSizeList()));
  721. INSTANTIATE_TEST_CASE_P(
  722. SourceStrLenGreaterThanReadaheadSize, ReadaheadSequentialFileTest,
  723. ::testing::ValuesIn(ReadaheadSequentialFileTest::GetReadaheadSizeList()));
  724. INSTANTIATE_TEST_CASE_P(
  725. ReadExceedsReadaheadSize, ReadaheadSequentialFileTest,
  726. ::testing::ValuesIn(ReadaheadSequentialFileTest::GetReadaheadSizeList()));
  namespace {
  // Returns a line made of `n` cells of 17 characters each: one decimal digit
  // (the cell index modulo 10) followed by sixteen 'x' characters. The
  // 17-character stride is chosen for likely bad buffer alignment. A
  // non-positive `n` yields an empty string.
  std::string GenerateLine(int n) {
    static const char kFiller[] = "xxxxxxxxxxxxxxxx";
    std::string line;
    int cell = 0;
    while (cell < n) {
      line += static_cast<char>('0' + (cell % 10));
      line += kFiller;
      ++cell;
    }
    return line;
  }
  }  // namespace
  738. TEST(LineFileReaderTest, LineFileReaderTest) {
  739. const int nlines = 1000;
  740. std::unique_ptr<Env> mem_env(MockEnv::Create(Env::Default()));
  741. std::shared_ptr<FileSystem> fs = mem_env->GetFileSystem();
  742. // Create an input file
  743. {
  744. std::unique_ptr<FSWritableFile> file;
  745. ASSERT_OK(
  746. fs->NewWritableFile("testfile", FileOptions(), &file, /*dbg*/ nullptr));
  747. for (int i = 0; i < nlines; ++i) {
  748. std::string line = GenerateLine(i);
  749. line.push_back('\n');
  750. ASSERT_OK(file->Append(line, IOOptions(), /*dbg*/ nullptr));
  751. }
  752. }
  753. // Verify with no I/O errors
  754. {
  755. std::unique_ptr<LineFileReader> reader;
  756. ASSERT_OK(LineFileReader::Create(fs, "testfile", FileOptions(), &reader,
  757. nullptr /* dbg */,
  758. nullptr /* rate_limiter */));
  759. std::string line;
  760. int count = 0;
  761. while (reader->ReadLine(&line, Env::IO_TOTAL /* rate_limiter_priority */)) {
  762. ASSERT_EQ(line, GenerateLine(count));
  763. ++count;
  764. ASSERT_EQ(static_cast<int>(reader->GetLineNumber()), count);
  765. }
  766. ASSERT_OK(reader->GetStatus());
  767. ASSERT_EQ(count, nlines);
  768. ASSERT_EQ(static_cast<int>(reader->GetLineNumber()), count);
  769. // And still
  770. ASSERT_FALSE(
  771. reader->ReadLine(&line, Env::IO_TOTAL /* rate_limiter_priority */));
  772. ASSERT_OK(reader->GetStatus());
  773. ASSERT_EQ(static_cast<int>(reader->GetLineNumber()), count);
  774. }
  775. // Verify with injected I/O error
  776. {
  777. std::unique_ptr<LineFileReader> reader;
  778. ASSERT_OK(LineFileReader::Create(fs, "testfile", FileOptions(), &reader,
  779. nullptr /* dbg */,
  780. nullptr /* rate_limiter */));
  781. std::string line;
  782. int count = 0;
  783. // Read part way through the file
  784. while (count < nlines / 4) {
  785. ASSERT_TRUE(
  786. reader->ReadLine(&line, Env::IO_TOTAL /* rate_limiter_priority */));
  787. ASSERT_EQ(line, GenerateLine(count));
  788. ++count;
  789. ASSERT_EQ(static_cast<int>(reader->GetLineNumber()), count);
  790. }
  791. ASSERT_OK(reader->GetStatus());
  792. // Inject error
  793. int callback_count = 0;
  794. SyncPoint::GetInstance()->SetCallBack(
  795. "MemFile::Read:IOStatus", [&](void* arg) {
  796. IOStatus* status = static_cast<IOStatus*>(arg);
  797. *status = IOStatus::Corruption("test");
  798. ++callback_count;
  799. });
  800. SyncPoint::GetInstance()->EnableProcessing();
  801. while (reader->ReadLine(&line, Env::IO_TOTAL /* rate_limiter_priority */)) {
  802. ASSERT_EQ(line, GenerateLine(count));
  803. ++count;
  804. ASSERT_EQ(static_cast<int>(reader->GetLineNumber()), count);
  805. }
  806. ASSERT_TRUE(reader->GetStatus().IsCorruption());
  807. ASSERT_LT(count, nlines / 2);
  808. ASSERT_EQ(callback_count, 1);
  809. // Still get error & no retry
  810. ASSERT_FALSE(
  811. reader->ReadLine(&line, Env::IO_TOTAL /* rate_limiter_priority */));
  812. ASSERT_TRUE(reader->GetStatus().IsCorruption());
  813. ASSERT_EQ(callback_count, 1);
  814. SyncPoint::GetInstance()->DisableProcessing();
  815. SyncPoint::GetInstance()->ClearAllCallBacks();
  816. }
  817. }
  818. class IOErrorEventListener : public EventListener {
  819. public:
  820. IOErrorEventListener() { notify_error_.store(0); }
  821. void OnIOError(const IOErrorInfo& io_error_info) override {
  822. notify_error_++;
  823. EXPECT_FALSE(io_error_info.file_path.empty());
  824. EXPECT_FALSE(io_error_info.io_status.ok());
  825. }
  826. size_t NotifyErrorCount() { return notify_error_; }
  827. bool ShouldBeNotifiedOnFileIO() override { return true; }
  828. private:
  829. std::atomic<size_t> notify_error_;
  830. };
  831. TEST_F(DBWritableFileWriterTest, IOErrorNotification) {
  832. class FakeWF : public FSWritableFile {
  833. public:
  834. explicit FakeWF() : io_error_(false) {
  835. file_append_errors_.store(0);
  836. file_flush_errors_.store(0);
  837. }
  838. using FSWritableFile::Append;
  839. IOStatus Append(const Slice& /*data*/, const IOOptions& /*options*/,
  840. IODebugContext* /*dbg*/) override {
  841. if (io_error_) {
  842. file_append_errors_++;
  843. return IOStatus::IOError("Fake IO error");
  844. }
  845. return IOStatus::OK();
  846. }
  847. using FSWritableFile::PositionedAppend;
  848. IOStatus PositionedAppend(const Slice& /*data*/, uint64_t,
  849. const IOOptions& /*options*/,
  850. IODebugContext* /*dbg*/) override {
  851. if (io_error_) {
  852. return IOStatus::IOError("Fake IO error");
  853. }
  854. return IOStatus::OK();
  855. }
  856. IOStatus Close(const IOOptions& /*options*/,
  857. IODebugContext* /*dbg*/) override {
  858. return IOStatus::OK();
  859. }
  860. IOStatus Flush(const IOOptions& /*options*/,
  861. IODebugContext* /*dbg*/) override {
  862. if (io_error_) {
  863. file_flush_errors_++;
  864. return IOStatus::IOError("Fake IO error");
  865. }
  866. return IOStatus::OK();
  867. }
  868. IOStatus Sync(const IOOptions& /*options*/,
  869. IODebugContext* /*dbg*/) override {
  870. return IOStatus::OK();
  871. }
  872. void SetIOError(bool val) { io_error_ = val; }
  873. void CheckCounters(int file_append_errors, int file_flush_errors) {
  874. ASSERT_EQ(file_append_errors, file_append_errors_);
  875. ASSERT_EQ(file_flush_errors_, file_flush_errors);
  876. }
  877. uint64_t GetFileSize(const IOOptions& /*options*/,
  878. IODebugContext* /*dbg*/) override {
  879. return 0;
  880. }
  881. protected:
  882. bool io_error_;
  883. std::atomic<size_t> file_append_errors_;
  884. std::atomic<size_t> file_flush_errors_;
  885. };
  886. FileOptions file_options = FileOptions();
  887. Options options = GetDefaultOptions();
  888. options.create_if_missing = true;
  889. IOErrorEventListener* listener = new IOErrorEventListener();
  890. options.listeners.emplace_back(listener);
  891. DestroyAndReopen(options);
  892. ImmutableOptions ioptions(options);
  893. std::string fname = dbname_ + "/test_file";
  894. std::unique_ptr<FakeWF> writable_file_ptr(new FakeWF);
  895. std::unique_ptr<WritableFileWriter> file_writer;
  896. writable_file_ptr->SetIOError(true);
  897. file_writer.reset(new WritableFileWriter(
  898. std::move(writable_file_ptr), fname, file_options,
  899. SystemClock::Default().get(), nullptr, ioptions.stats,
  900. Histograms::HISTOGRAM_ENUM_MAX /* hist_type */, ioptions.listeners,
  901. ioptions.file_checksum_gen_factory.get(), true, true));
  902. FakeWF* fwf = static_cast<FakeWF*>(file_writer->writable_file());
  903. fwf->SetIOError(true);
  904. ASSERT_NOK(file_writer->Append(IOOptions(), std::string(2 * kMb, 'a')));
  905. fwf->CheckCounters(1, 0);
  906. ASSERT_EQ(listener->NotifyErrorCount(), 1);
  907. file_writer->reset_seen_error();
  908. fwf->SetIOError(true);
  909. ASSERT_NOK(file_writer->Flush(IOOptions()));
  910. fwf->CheckCounters(1, 1);
  911. ASSERT_EQ(listener->NotifyErrorCount(), 2);
  912. /* No error generation */
  913. file_writer->reset_seen_error();
  914. fwf->SetIOError(false);
  915. ASSERT_OK(file_writer->Append(IOOptions(), std::string(2 * kMb, 'b')));
  916. ASSERT_EQ(listener->NotifyErrorCount(), 2);
  917. fwf->CheckCounters(1, 1);
  918. }
  919. class WritableFileWriterIOPriorityTest : public testing::Test {
  920. protected:
  921. // This test is to check whether the rate limiter priority can be passed
  922. // correctly from WritableFileWriter functions to FSWritableFile functions.
  923. void SetUp() override {
  924. // When op_rate_limiter_priority parameter in WritableFileWriter functions
  925. // is the default (Env::IO_TOTAL).
  926. std::unique_ptr<FakeWF> wf{new FakeWF(Env::IO_HIGH)};
  927. FileOptions file_options;
  928. writer_.reset(new WritableFileWriter(std::move(wf), "" /* don't care */,
  929. file_options));
  930. }
  931. class FakeWF : public FSWritableFile {
  932. public:
  933. explicit FakeWF(Env::IOPriority io_priority) { SetIOPriority(io_priority); }
  934. ~FakeWF() override = default;
  935. IOStatus Append(const Slice& /*data*/, const IOOptions& options,
  936. IODebugContext* /*dbg*/) override {
  937. EXPECT_EQ(options.rate_limiter_priority, io_priority_);
  938. return IOStatus::OK();
  939. }
  940. IOStatus Append(const Slice& data, const IOOptions& options,
  941. const DataVerificationInfo& /* verification_info */,
  942. IODebugContext* dbg) override {
  943. return Append(data, options, dbg);
  944. }
  945. IOStatus PositionedAppend(const Slice& /*data*/, uint64_t /*offset*/,
  946. const IOOptions& options,
  947. IODebugContext* /*dbg*/) override {
  948. EXPECT_EQ(options.rate_limiter_priority, io_priority_);
  949. return IOStatus::OK();
  950. }
  951. IOStatus PositionedAppend(
  952. const Slice& /* data */, uint64_t /* offset */,
  953. const IOOptions& options,
  954. const DataVerificationInfo& /* verification_info */,
  955. IODebugContext* /*dbg*/) override {
  956. EXPECT_EQ(options.rate_limiter_priority, io_priority_);
  957. return IOStatus::OK();
  958. }
  959. IOStatus Truncate(uint64_t /*size*/, const IOOptions& options,
  960. IODebugContext* /*dbg*/) override {
  961. EXPECT_EQ(options.rate_limiter_priority, io_priority_);
  962. return IOStatus::OK();
  963. }
  964. IOStatus Close(const IOOptions& options, IODebugContext* /*dbg*/) override {
  965. EXPECT_EQ(options.rate_limiter_priority, io_priority_);
  966. return IOStatus::OK();
  967. }
  968. IOStatus Flush(const IOOptions& options, IODebugContext* /*dbg*/) override {
  969. EXPECT_EQ(options.rate_limiter_priority, io_priority_);
  970. return IOStatus::OK();
  971. }
  972. IOStatus Sync(const IOOptions& options, IODebugContext* /*dbg*/) override {
  973. EXPECT_EQ(options.rate_limiter_priority, io_priority_);
  974. return IOStatus::OK();
  975. }
  976. IOStatus Fsync(const IOOptions& options, IODebugContext* /*dbg*/) override {
  977. EXPECT_EQ(options.rate_limiter_priority, io_priority_);
  978. return IOStatus::OK();
  979. }
  980. uint64_t GetFileSize(const IOOptions& options,
  981. IODebugContext* /*dbg*/) override {
  982. EXPECT_EQ(options.rate_limiter_priority, io_priority_);
  983. return 0;
  984. }
  985. void GetPreallocationStatus(size_t* /*block_size*/,
  986. size_t* /*last_allocated_block*/) override {}
  987. size_t GetUniqueId(char* /*id*/, size_t /*max_size*/) const override {
  988. return 0;
  989. }
  990. IOStatus InvalidateCache(size_t /*offset*/, size_t /*length*/) override {
  991. return IOStatus::OK();
  992. }
  993. IOStatus Allocate(uint64_t /*offset*/, uint64_t /*len*/,
  994. const IOOptions& options,
  995. IODebugContext* /*dbg*/) override {
  996. EXPECT_EQ(options.rate_limiter_priority, io_priority_);
  997. return IOStatus::OK();
  998. }
  999. IOStatus RangeSync(uint64_t /*offset*/, uint64_t /*nbytes*/,
  1000. const IOOptions& options,
  1001. IODebugContext* /*dbg*/) override {
  1002. EXPECT_EQ(options.rate_limiter_priority, io_priority_);
  1003. return IOStatus::OK();
  1004. }
  1005. void PrepareWrite(size_t /*offset*/, size_t /*len*/,
  1006. const IOOptions& options,
  1007. IODebugContext* /*dbg*/) override {
  1008. EXPECT_EQ(options.rate_limiter_priority, io_priority_);
  1009. }
  1010. bool IsSyncThreadSafe() const override { return true; }
  1011. };
  1012. std::unique_ptr<WritableFileWriter> writer_;
  1013. };
  1014. TEST_F(WritableFileWriterIOPriorityTest, Append) {
  1015. ASSERT_OK(writer_->Append(IOOptions(), Slice("abc")));
  1016. }
  1017. TEST_F(WritableFileWriterIOPriorityTest, Pad) {
  1018. ASSERT_OK(writer_->Pad(IOOptions(), 500, kDefaultPageSize));
  1019. }
  1020. TEST_F(WritableFileWriterIOPriorityTest, Flush) {
  1021. ASSERT_OK(writer_->Flush(IOOptions()));
  1022. }
  1023. TEST_F(WritableFileWriterIOPriorityTest, Close) {
  1024. ASSERT_OK(writer_->Close(IOOptions()));
  1025. }
  1026. TEST_F(WritableFileWriterIOPriorityTest, Sync) {
  1027. ASSERT_OK(writer_->Sync(IOOptions(), false));
  1028. ASSERT_OK(writer_->Sync(IOOptions(), true));
  1029. }
  1030. TEST_F(WritableFileWriterIOPriorityTest, SyncWithoutFlush) {
  1031. ASSERT_OK(writer_->SyncWithoutFlush(IOOptions(), false));
  1032. ASSERT_OK(writer_->SyncWithoutFlush(IOOptions(), true));
  1033. }
  1034. TEST_F(WritableFileWriterIOPriorityTest, BasicOp) {
  1035. EnvOptions env_options;
  1036. env_options.bytes_per_sync = kMb;
  1037. std::unique_ptr<FakeWF> wf(new FakeWF(Env::IO_HIGH));
  1038. std::unique_ptr<WritableFileWriter> writer(
  1039. new WritableFileWriter(std::move(wf), "" /* don't care */, env_options));
  1040. Random r(301);
  1041. Status s;
  1042. std::unique_ptr<char[]> large_buf(new char[10 * kMb]);
  1043. for (int i = 0; i < 1000; i++) {
  1044. int skew_limit = (i < 700) ? 10 : 15;
  1045. uint32_t num = r.Skewed(skew_limit) * 100 + r.Uniform(100);
  1046. s = writer->Append(IOOptions(), Slice(large_buf.get(), num));
  1047. ASSERT_OK(s);
  1048. // Flush in a chance of 1/10.
  1049. if (r.Uniform(10) == 0) {
  1050. s = writer->Flush(IOOptions());
  1051. ASSERT_OK(s);
  1052. }
  1053. }
  1054. s = writer->Close(IOOptions());
  1055. ASSERT_OK(s);
  1056. }
  1057. } // namespace ROCKSDB_NAMESPACE
  1058. int main(int argc, char** argv) {
  1059. ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  1060. ::testing::InitGoogleTest(&argc, argv);
  1061. return RUN_ALL_TESTS();
  1062. }