| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172 |
- // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
- // This source code is licensed under both the GPLv2 (found in the
- // COPYING file in the root directory) and Apache 2.0 License
- // (found in the LICENSE.Apache file in the root directory).
- //
- #include <algorithm>
- #include <vector>
- #include "db/db_test_util.h"
- #include "env/mock_env.h"
- #include "file/line_file_reader.h"
- #include "file/random_access_file_reader.h"
- #include "file/read_write_util.h"
- #include "file/readahead_raf.h"
- #include "file/sequence_file_reader.h"
- #include "file/writable_file_writer.h"
- #include "rocksdb/file_system.h"
- #include "test_util/testharness.h"
- #include "test_util/testutil.h"
- #include "util/crc32c.h"
- #include "util/random.h"
- #include "utilities/fault_injection_fs.h"
- namespace ROCKSDB_NAMESPACE {
- // Plain gtest fixture used by the WritableFileWriter tests in this file. It
- // adds no state or helpers of its own.
- class WritableFileWriterTest : public testing::Test {};
// Number of bytes in one mebibyte (2^20). Used below for buffer sizes and for
// the bytes_per_sync threshold.
constexpr uint32_t kMb = uint32_t{1} << 20;
- TEST_F(WritableFileWriterTest, RangeSync) {
- class FakeWF : public FSWritableFile {
- public:
- explicit FakeWF() : size_(0), last_synced_(0) {}
- ~FakeWF() override = default;
- using FSWritableFile::Append;
- IOStatus Append(const Slice& data, const IOOptions& /*options*/,
- IODebugContext* /*dbg*/) override {
- size_ += data.size();
- return IOStatus::OK();
- }
- IOStatus Truncate(uint64_t /*size*/, const IOOptions& /*options*/,
- IODebugContext* /*dbg*/) override {
- return IOStatus::OK();
- }
- IOStatus Close(const IOOptions& /*options*/,
- IODebugContext* /*dbg*/) override {
- EXPECT_GE(size_, last_synced_ + kMb);
- EXPECT_LT(size_, last_synced_ + 2 * kMb);
- // Make sure random writes generated enough writes.
- EXPECT_GT(size_, 10 * kMb);
- return IOStatus::OK();
- }
- IOStatus Flush(const IOOptions& /*options*/,
- IODebugContext* /*dbg*/) override {
- return IOStatus::OK();
- }
- IOStatus Sync(const IOOptions& /*options*/,
- IODebugContext* /*dbg*/) override {
- return IOStatus::OK();
- }
- IOStatus Fsync(const IOOptions& /*options*/,
- IODebugContext* /*dbg*/) override {
- return IOStatus::OK();
- }
- void SetIOPriority(Env::IOPriority /*pri*/) override {}
- uint64_t GetFileSize(const IOOptions& /*options*/,
- IODebugContext* /*dbg*/) override {
- return size_;
- }
- void GetPreallocationStatus(size_t* /*block_size*/,
- size_t* /*last_allocated_block*/) override {}
- size_t GetUniqueId(char* /*id*/, size_t /*max_size*/) const override {
- return 0;
- }
- IOStatus InvalidateCache(size_t /*offset*/, size_t /*length*/) override {
- return IOStatus::OK();
- }
- protected:
- IOStatus Allocate(uint64_t /*offset*/, uint64_t /*len*/,
- const IOOptions& /*options*/,
- IODebugContext* /*dbg*/) override {
- return IOStatus::OK();
- }
- IOStatus RangeSync(uint64_t offset, uint64_t nbytes,
- const IOOptions& /*options*/,
- IODebugContext* /*dbg*/) override {
- EXPECT_EQ(offset % 4096, 0u);
- EXPECT_EQ(nbytes % 4096, 0u);
- EXPECT_EQ(offset, last_synced_);
- last_synced_ = offset + nbytes;
- EXPECT_GE(size_, last_synced_ + kMb);
- if (size_ > 2 * kMb) {
- EXPECT_LT(size_, last_synced_ + 2 * kMb);
- }
- return IOStatus::OK();
- }
- uint64_t size_;
- uint64_t last_synced_;
- };
- EnvOptions env_options;
- env_options.bytes_per_sync = kMb;
- std::unique_ptr<FakeWF> wf(new FakeWF);
- std::unique_ptr<WritableFileWriter> writer(
- new WritableFileWriter(std::move(wf), "" /* don't care */, env_options));
- Random r(301);
- Status s;
- std::unique_ptr<char[]> large_buf(new char[10 * kMb]);
- for (int i = 0; i < 1000; i++) {
- int skew_limit = (i < 700) ? 10 : 15;
- uint32_t num = r.Skewed(skew_limit) * 100 + r.Uniform(100);
- s = writer->Append(IOOptions(), Slice(large_buf.get(), num));
- ASSERT_OK(s);
- // Flush in a chance of 1/10.
- if (r.Uniform(10) == 0) {
- s = writer->Flush(IOOptions());
- ASSERT_OK(s);
- }
- }
- s = writer->Close(IOOptions());
- ASSERT_OK(s);
- }
- TEST_F(WritableFileWriterTest, IncrementalBuffer) {
- class FakeWF : public FSWritableFile {
- public:
- explicit FakeWF(std::string* _file_data, bool _use_direct_io,
- bool _no_flush)
- : file_data_(_file_data),
- use_direct_io_(_use_direct_io),
- no_flush_(_no_flush) {}
- ~FakeWF() override = default;
- using FSWritableFile::Append;
- IOStatus Append(const Slice& data, const IOOptions& /*options*/,
- IODebugContext* /*dbg*/) override {
- file_data_->append(data.data(), data.size());
- size_ += data.size();
- return IOStatus::OK();
- }
- using FSWritableFile::PositionedAppend;
- IOStatus PositionedAppend(const Slice& data, uint64_t pos,
- const IOOptions& /*options*/,
- IODebugContext* /*dbg*/) override {
- EXPECT_TRUE(pos % 512 == 0);
- EXPECT_TRUE(data.size() % 512 == 0);
- file_data_->resize(pos);
- file_data_->append(data.data(), data.size());
- size_ += data.size();
- return IOStatus::OK();
- }
- IOStatus Truncate(uint64_t size, const IOOptions& /*options*/,
- IODebugContext* /*dbg*/) override {
- file_data_->resize(size);
- return IOStatus::OK();
- }
- IOStatus Close(const IOOptions& /*options*/,
- IODebugContext* /*dbg*/) override {
- return IOStatus::OK();
- }
- IOStatus Flush(const IOOptions& /*options*/,
- IODebugContext* /*dbg*/) override {
- return IOStatus::OK();
- }
- IOStatus Sync(const IOOptions& /*options*/,
- IODebugContext* /*dbg*/) override {
- return IOStatus::OK();
- }
- IOStatus Fsync(const IOOptions& /*options*/,
- IODebugContext* /*dbg*/) override {
- return IOStatus::OK();
- }
- void SetIOPriority(Env::IOPriority /*pri*/) override {}
- uint64_t GetFileSize(const IOOptions& /*options*/,
- IODebugContext* /*dbg*/) override {
- return size_;
- }
- void GetPreallocationStatus(size_t* /*block_size*/,
- size_t* /*last_allocated_block*/) override {}
- size_t GetUniqueId(char* /*id*/, size_t /*max_size*/) const override {
- return 0;
- }
- IOStatus InvalidateCache(size_t /*offset*/, size_t /*length*/) override {
- return IOStatus::OK();
- }
- bool use_direct_io() const override { return use_direct_io_; }
- std::string* file_data_;
- bool use_direct_io_;
- bool no_flush_;
- size_t size_ = 0;
- };
- Random r(301);
- const int kNumAttempts = 50;
- for (int attempt = 0; attempt < kNumAttempts; attempt++) {
- bool no_flush = (attempt % 3 == 0);
- EnvOptions env_options;
- env_options.writable_file_max_buffer_size =
- (attempt < kNumAttempts / 2) ? 512 * 1024 : 700 * 1024;
- std::string actual;
- std::unique_ptr<FakeWF> wf(new FakeWF(&actual, attempt % 2 == 1, no_flush));
- std::unique_ptr<WritableFileWriter> writer(new WritableFileWriter(
- std::move(wf), "" /* don't care */, env_options));
- std::string target;
- for (int i = 0; i < 20; i++) {
- uint32_t num = r.Skewed(16) * 100 + r.Uniform(100);
- std::string random_string = r.RandomString(num);
- ASSERT_OK(writer->Append(IOOptions(), Slice(random_string.c_str(), num)));
- target.append(random_string.c_str(), num);
- // In some attempts, flush in a chance of 1/10.
- if (!no_flush && r.Uniform(10) == 0) {
- ASSERT_OK(writer->Flush(IOOptions()));
- }
- }
- ASSERT_OK(writer->Flush(IOOptions()));
- ASSERT_OK(writer->Close(IOOptions()));
- ASSERT_EQ(target.size(), actual.size());
- ASSERT_EQ(target, actual);
- }
- }
- TEST_F(WritableFileWriterTest, AlignedBufferedWrites) {
- class FakeWF : public FSWritableFile {
- public:
- explicit FakeWF(std::string* _file_data) : file_data_(_file_data) {}
- ~FakeWF() override = default;
- using FSWritableFile::Append;
- IOStatus Append(const Slice& data, const IOOptions& /*options*/,
- IODebugContext* /*dbg*/) override {
- EXPECT_EQ(data.size() & (data.size() - 1), 0);
- file_data_->append(data.data(), data.size());
- size_ += data.size();
- return IOStatus::OK();
- }
- using FSWritableFile::PositionedAppend;
- IOStatus PositionedAppend(const Slice& data, uint64_t pos,
- const IOOptions& /*options*/,
- IODebugContext* /*dbg*/) override {
- EXPECT_TRUE(pos % 512 == 0);
- EXPECT_TRUE(data.size() % 512 == 0);
- file_data_->resize(pos);
- file_data_->append(data.data(), data.size());
- size_ += data.size();
- return IOStatus::OK();
- }
- IOStatus Truncate(uint64_t size, const IOOptions& /*options*/,
- IODebugContext* /*dbg*/) override {
- file_data_->resize(size);
- return IOStatus::OK();
- }
- IOStatus Close(const IOOptions& /*options*/,
- IODebugContext* /*dbg*/) override {
- return IOStatus::OK();
- }
- IOStatus Flush(const IOOptions& /*options*/,
- IODebugContext* /*dbg*/) override {
- return IOStatus::OK();
- }
- IOStatus Sync(const IOOptions& /*options*/,
- IODebugContext* /*dbg*/) override {
- return IOStatus::OK();
- }
- IOStatus Fsync(const IOOptions& /*options*/,
- IODebugContext* /*dbg*/) override {
- return IOStatus::OK();
- }
- void SetIOPriority(Env::IOPriority /*pri*/) override {}
- uint64_t GetFileSize(const IOOptions& /*options*/,
- IODebugContext* /*dbg*/) override {
- return size_;
- }
- void GetPreallocationStatus(size_t* /*block_size*/,
- size_t* /*last_allocated_block*/) override {}
- size_t GetUniqueId(char* /*id*/, size_t /*max_size*/) const override {
- return 0;
- }
- IOStatus InvalidateCache(size_t /*offset*/, size_t /*length*/) override {
- return IOStatus::OK();
- }
- bool use_direct_io() const override { return false; }
- std::string* file_data_;
- size_t size_ = 0;
- };
- Random r(301);
- EnvOptions env_options;
- env_options.writable_file_max_buffer_size = 64 * 1024 * 1024;
- std::string actual;
- std::unique_ptr<FakeWF> wf(new FakeWF(&actual));
- std::unique_ptr<WritableFileWriter> writer(
- new WritableFileWriter(std::move(wf), "" /* don't care */, env_options));
- std::string target;
- uint32_t left =
- static_cast<uint32_t>(2 * env_options.writable_file_max_buffer_size);
- ;
- while (left > 0) {
- uint32_t num = 4096 + r.Uniform(8192);
- num = std::min<uint32_t>(num, left);
- std::string random_string = r.RandomString(num);
- ASSERT_OK(writer->Append(IOOptions(), Slice(random_string.c_str(), num)));
- target.append(random_string.c_str(), num);
- left -= num;
- }
- ASSERT_OK(writer->Flush(IOOptions()));
- ASSERT_OK(writer->Close(IOOptions()));
- ASSERT_EQ(target.size(), actual.size());
- ASSERT_EQ(target, actual);
- }
- TEST_F(WritableFileWriterTest, BufferWithZeroCapacityDirectIO) {
- EnvOptions env_opts;
- env_opts.use_direct_writes = true;
- env_opts.writable_file_max_buffer_size = 0;
- {
- std::unique_ptr<WritableFileWriter> writer;
- const Status s =
- WritableFileWriter::Create(FileSystem::Default(), /*fname=*/"dont_care",
- FileOptions(env_opts), &writer,
- /*dbg=*/nullptr);
- ASSERT_TRUE(s.IsInvalidArgument());
- }
- }
- class DBWritableFileWriterTest : public DBTestBase {
- public:
- DBWritableFileWriterTest()
- : DBTestBase("db_secondary_cache_test", /*env_do_fsync=*/true) {
- fault_fs_.reset(new FaultInjectionTestFS(env_->GetFileSystem()));
- fault_env_.reset(new CompositeEnvWrapper(env_, fault_fs_));
- }
- std::shared_ptr<FaultInjectionTestFS> fault_fs_;
- std::unique_ptr<Env> fault_env_;
- };
- TEST_F(DBWritableFileWriterTest, AppendWithChecksum) {
- FileOptions file_options = FileOptions();
- Options options = GetDefaultOptions();
- options.create_if_missing = true;
- DestroyAndReopen(options);
- std::string fname = dbname_ + "/test_file";
- std::unique_ptr<FSWritableFile> writable_file_ptr;
- ASSERT_OK(fault_fs_->NewWritableFile(fname, file_options, &writable_file_ptr,
- /*dbg*/ nullptr));
- std::unique_ptr<TestFSWritableFile> file;
- file.reset(new TestFSWritableFile(
- fname, file_options, std::move(writable_file_ptr), fault_fs_.get()));
- std::unique_ptr<WritableFileWriter> file_writer;
- ImmutableOptions ioptions(options);
- file_writer.reset(new WritableFileWriter(
- std::move(file), fname, file_options, SystemClock::Default().get(),
- nullptr, ioptions.stats, Histograms::HISTOGRAM_ENUM_MAX /* hist_type */,
- ioptions.listeners, ioptions.file_checksum_gen_factory.get(), true,
- true));
- Random rnd(301);
- std::string data = rnd.RandomString(1000);
- uint32_t data_crc32c = crc32c::Value(data.c_str(), data.size());
- fault_fs_->SetChecksumHandoffFuncType(ChecksumType::kCRC32c);
- ASSERT_OK(file_writer->Append(IOOptions(), Slice(data.c_str()), data_crc32c));
- ASSERT_OK(file_writer->Flush(IOOptions()));
- Random size_r(47);
- for (int i = 0; i < 2000; i++) {
- data = rnd.RandomString((static_cast<int>(size_r.Next()) % 10000));
- data_crc32c = crc32c::Value(data.c_str(), data.size());
- ASSERT_OK(
- file_writer->Append(IOOptions(), Slice(data.c_str()), data_crc32c));
- data = rnd.RandomString((static_cast<int>(size_r.Next()) % 97));
- ASSERT_OK(file_writer->Append(IOOptions(), Slice(data.c_str())));
- ASSERT_OK(file_writer->Flush(IOOptions()));
- }
- ASSERT_OK(file_writer->Close(IOOptions()));
- Destroy(options);
- }
- TEST_F(DBWritableFileWriterTest, AppendVerifyNoChecksum) {
- FileOptions file_options = FileOptions();
- Options options = GetDefaultOptions();
- options.create_if_missing = true;
- DestroyAndReopen(options);
- std::string fname = dbname_ + "/test_file";
- std::unique_ptr<FSWritableFile> writable_file_ptr;
- ASSERT_OK(fault_fs_->NewWritableFile(fname, file_options, &writable_file_ptr,
- /*dbg*/ nullptr));
- std::unique_ptr<TestFSWritableFile> file;
- file.reset(new TestFSWritableFile(
- fname, file_options, std::move(writable_file_ptr), fault_fs_.get()));
- std::unique_ptr<WritableFileWriter> file_writer;
- ImmutableOptions ioptions(options);
- // Enable checksum handoff for this file, but do not enable buffer checksum.
- // So Append with checksum logic will not be triggered
- file_writer.reset(new WritableFileWriter(
- std::move(file), fname, file_options, SystemClock::Default().get(),
- nullptr, ioptions.stats, Histograms::HISTOGRAM_ENUM_MAX /* hist_type */,
- ioptions.listeners, ioptions.file_checksum_gen_factory.get(), true,
- false));
- Random rnd(301);
- std::string data = rnd.RandomString(1000);
- uint32_t data_crc32c = crc32c::Value(data.c_str(), data.size());
- fault_fs_->SetChecksumHandoffFuncType(ChecksumType::kCRC32c);
- ASSERT_OK(file_writer->Append(IOOptions(), Slice(data.c_str()), data_crc32c));
- ASSERT_OK(file_writer->Flush(IOOptions()));
- Random size_r(47);
- for (int i = 0; i < 1000; i++) {
- data = rnd.RandomString((static_cast<int>(size_r.Next()) % 10000));
- data_crc32c = crc32c::Value(data.c_str(), data.size());
- ASSERT_OK(
- file_writer->Append(IOOptions(), Slice(data.c_str()), data_crc32c));
- data = rnd.RandomString((static_cast<int>(size_r.Next()) % 97));
- ASSERT_OK(file_writer->Append(IOOptions(), Slice(data.c_str())));
- ASSERT_OK(file_writer->Flush(IOOptions()));
- }
- ASSERT_OK(file_writer->Close(IOOptions()));
- Destroy(options);
- }
- TEST_F(DBWritableFileWriterTest, AppendWithChecksumRateLimiter) {
- FileOptions file_options = FileOptions();
- file_options.rate_limiter = nullptr;
- Options options = GetDefaultOptions();
- options.create_if_missing = true;
- DestroyAndReopen(options);
- std::string fname = dbname_ + "/test_file";
- std::unique_ptr<FSWritableFile> writable_file_ptr;
- ASSERT_OK(fault_fs_->NewWritableFile(fname, file_options, &writable_file_ptr,
- /*dbg*/ nullptr));
- std::unique_ptr<TestFSWritableFile> file;
- file.reset(new TestFSWritableFile(
- fname, file_options, std::move(writable_file_ptr), fault_fs_.get()));
- std::unique_ptr<WritableFileWriter> file_writer;
- ImmutableOptions ioptions(options);
- // Enable checksum handoff for this file, but do not enable buffer checksum.
- // So Append with checksum logic will not be triggered
- file_writer.reset(new WritableFileWriter(
- std::move(file), fname, file_options, SystemClock::Default().get(),
- nullptr, ioptions.stats, Histograms::HISTOGRAM_ENUM_MAX /* hist_type */,
- ioptions.listeners, ioptions.file_checksum_gen_factory.get(), true,
- true));
- fault_fs_->SetChecksumHandoffFuncType(ChecksumType::kCRC32c);
- Random rnd(301);
- std::string data;
- uint32_t data_crc32c;
- uint64_t start = fault_env_->NowMicros();
- Random size_r(47);
- uint64_t bytes_written = 0;
- for (int i = 0; i < 100; i++) {
- data = rnd.RandomString((static_cast<int>(size_r.Next()) % 10000));
- data_crc32c = crc32c::Value(data.c_str(), data.size());
- ASSERT_OK(
- file_writer->Append(IOOptions(), Slice(data.c_str()), data_crc32c));
- bytes_written += static_cast<uint64_t>(data.size());
- data = rnd.RandomString((static_cast<int>(size_r.Next()) % 97));
- ASSERT_OK(file_writer->Append(IOOptions(), Slice(data.c_str())));
- ASSERT_OK(file_writer->Flush(IOOptions()));
- bytes_written += static_cast<uint64_t>(data.size());
- }
- uint64_t elapsed = fault_env_->NowMicros() - start;
- double raw_rate = bytes_written * 1000000.0 / elapsed;
- ASSERT_OK(file_writer->Close(IOOptions()));
- // Set the rate-limiter
- FileOptions file_options1 = FileOptions();
- file_options1.rate_limiter =
- NewGenericRateLimiter(static_cast<int64_t>(0.5 * raw_rate));
- fname = dbname_ + "/test_file_1";
- std::unique_ptr<FSWritableFile> writable_file_ptr1;
- ASSERT_OK(fault_fs_->NewWritableFile(fname, file_options1,
- &writable_file_ptr1,
- /*dbg*/ nullptr));
- file.reset(new TestFSWritableFile(
- fname, file_options1, std::move(writable_file_ptr1), fault_fs_.get()));
- // Enable checksum handoff for this file, but do not enable buffer checksum.
- // So Append with checksum logic will not be triggered
- file_writer.reset(new WritableFileWriter(
- std::move(file), fname, file_options1, SystemClock::Default().get(),
- nullptr, ioptions.stats, Histograms::HISTOGRAM_ENUM_MAX /* hist_type */,
- ioptions.listeners, ioptions.file_checksum_gen_factory.get(), true,
- true));
- for (int i = 0; i < 1000; i++) {
- data = rnd.RandomString((static_cast<int>(size_r.Next()) % 10000));
- data_crc32c = crc32c::Value(data.c_str(), data.size());
- ASSERT_OK(
- file_writer->Append(IOOptions(), Slice(data.c_str()), data_crc32c));
- data = rnd.RandomString((static_cast<int>(size_r.Next()) % 97));
- ASSERT_OK(file_writer->Append(IOOptions(), Slice(data.c_str())));
- ASSERT_OK(file_writer->Flush(IOOptions()));
- }
- ASSERT_OK(file_writer->Close(IOOptions()));
- if (file_options1.rate_limiter != nullptr) {
- delete file_options1.rate_limiter;
- }
- Destroy(options);
- }
- TEST_F(WritableFileWriterTest, AppendStatusReturn) {
- class FakeWF : public FSWritableFile {
- public:
- explicit FakeWF() : use_direct_io_(false), io_error_(false) {}
- bool use_direct_io() const override { return use_direct_io_; }
- using FSWritableFile::Append;
- IOStatus Append(const Slice& /*data*/, const IOOptions& /*options*/,
- IODebugContext* /*dbg*/) override {
- if (io_error_) {
- return IOStatus::IOError("Fake IO error");
- }
- return IOStatus::OK();
- }
- using FSWritableFile::PositionedAppend;
- IOStatus PositionedAppend(const Slice& /*data*/, uint64_t,
- const IOOptions& /*options*/,
- IODebugContext* /*dbg*/) override {
- if (io_error_) {
- return IOStatus::IOError("Fake IO error");
- }
- return IOStatus::OK();
- }
- IOStatus Close(const IOOptions& /*options*/,
- IODebugContext* /*dbg*/) override {
- return IOStatus::OK();
- }
- IOStatus Flush(const IOOptions& /*options*/,
- IODebugContext* /*dbg*/) override {
- return IOStatus::OK();
- }
- IOStatus Sync(const IOOptions& /*options*/,
- IODebugContext* /*dbg*/) override {
- return IOStatus::OK();
- }
- void Setuse_direct_io(bool val) { use_direct_io_ = val; }
- void SetIOError(bool val) { io_error_ = val; }
- uint64_t GetFileSize(const IOOptions& /*options*/,
- IODebugContext* /*dbg*/) override {
- return 0;
- }
- protected:
- bool use_direct_io_;
- bool io_error_;
- };
- std::unique_ptr<FakeWF> wf(new FakeWF());
- wf->Setuse_direct_io(true);
- std::unique_ptr<WritableFileWriter> writer(
- new WritableFileWriter(std::move(wf), "" /* don't care */, EnvOptions()));
- ASSERT_OK(writer->Append(IOOptions(), std::string(2 * kMb, 'a')));
- // Next call to WritableFile::Append() should fail
- FakeWF* fwf = static_cast<FakeWF*>(writer->writable_file());
- fwf->SetIOError(true);
- ASSERT_NOK(writer->Append(IOOptions(), std::string(2 * kMb, 'b')));
- }
- class ReadaheadRandomAccessFileTest
- : public testing::Test,
- public testing::WithParamInterface<size_t> {
- public:
- static std::vector<size_t> GetReadaheadSizeList() {
- return {1lu << 12, 1lu << 16};
- }
- void SetUp() override {
- readahead_size_ = GetParam();
- scratch_.reset(new char[2 * readahead_size_]);
- ResetSourceStr();
- }
- ReadaheadRandomAccessFileTest() : control_contents_() {}
- std::string Read(uint64_t offset, size_t n) {
- Slice result;
- Status s = test_read_holder_->Read(offset, n, IOOptions(), &result,
- scratch_.get(), nullptr);
- EXPECT_TRUE(s.ok() || s.IsInvalidArgument());
- return std::string(result.data(), result.size());
- }
- void ResetSourceStr(const std::string& str = "") {
- std::unique_ptr<FSWritableFile> sink(
- new test::StringSink(&control_contents_));
- std::unique_ptr<WritableFileWriter> write_holder(new WritableFileWriter(
- std::move(sink), "" /* don't care */, FileOptions()));
- Status s = write_holder->Append(IOOptions(), Slice(str));
- EXPECT_OK(s);
- s = write_holder->Flush(IOOptions());
- EXPECT_OK(s);
- std::unique_ptr<FSRandomAccessFile> read_holder(
- new test::StringSource(control_contents_));
- test_read_holder_ =
- NewReadaheadRandomAccessFile(std::move(read_holder), readahead_size_);
- }
- size_t GetReadaheadSize() const { return readahead_size_; }
- private:
- size_t readahead_size_;
- Slice control_contents_;
- std::unique_ptr<FSRandomAccessFile> test_read_holder_;
- std::unique_ptr<char[]> scratch_;
- };
- TEST_P(ReadaheadRandomAccessFileTest, EmptySourceStr) {
- ASSERT_EQ("", Read(0, 1));
- ASSERT_EQ("", Read(0, 0));
- ASSERT_EQ("", Read(13, 13));
- }
- TEST_P(ReadaheadRandomAccessFileTest, SourceStrLenLessThanReadaheadSize) {
- std::string str = "abcdefghijklmnopqrs";
- ResetSourceStr(str);
- ASSERT_EQ(str.substr(3, 4), Read(3, 4));
- ASSERT_EQ(str.substr(0, 3), Read(0, 3));
- ASSERT_EQ(str, Read(0, str.size()));
- ASSERT_EQ(str.substr(7, std::min(static_cast<int>(str.size()) - 7, 30)),
- Read(7, 30));
- ASSERT_EQ("", Read(100, 100));
- }
- TEST_P(ReadaheadRandomAccessFileTest, SourceStrLenGreaterThanReadaheadSize) {
- Random rng(42);
- for (int k = 0; k < 100; ++k) {
- size_t strLen = k * GetReadaheadSize() +
- rng.Uniform(static_cast<int>(GetReadaheadSize()));
- std::string str = rng.HumanReadableString(static_cast<int>(strLen));
- ResetSourceStr(str);
- for (int test = 1; test <= 100; ++test) {
- size_t offset = rng.Uniform(static_cast<int>(strLen));
- size_t n = rng.Uniform(static_cast<int>(GetReadaheadSize()));
- ASSERT_EQ(str.substr(offset, std::min(n, strLen - offset)),
- Read(offset, n));
- }
- }
- }
- TEST_P(ReadaheadRandomAccessFileTest, ReadExceedsReadaheadSize) {
- Random rng(7);
- size_t strLen = 4 * GetReadaheadSize() +
- rng.Uniform(static_cast<int>(GetReadaheadSize()));
- std::string str = rng.HumanReadableString(static_cast<int>(strLen));
- ResetSourceStr(str);
- for (int test = 1; test <= 100; ++test) {
- size_t offset = rng.Uniform(static_cast<int>(strLen));
- size_t n =
- GetReadaheadSize() + rng.Uniform(static_cast<int>(GetReadaheadSize()));
- ASSERT_EQ(str.substr(offset, std::min(n, strLen - offset)),
- Read(offset, n));
- }
- }
- // Instantiate the ReadaheadRandomAccessFileTest suite once per prefix below,
- // each time over every readahead size from GetReadaheadSizeList(). The
- // prefixes repeat the test names, but each instantiation applies to the whole
- // fixture, not just the test of the same name.
- INSTANTIATE_TEST_CASE_P(
- EmptySourceStr, ReadaheadRandomAccessFileTest,
- ::testing::ValuesIn(ReadaheadRandomAccessFileTest::GetReadaheadSizeList()));
- INSTANTIATE_TEST_CASE_P(
- SourceStrLenLessThanReadaheadSize, ReadaheadRandomAccessFileTest,
- ::testing::ValuesIn(ReadaheadRandomAccessFileTest::GetReadaheadSizeList()));
- INSTANTIATE_TEST_CASE_P(
- SourceStrLenGreaterThanReadaheadSize, ReadaheadRandomAccessFileTest,
- ::testing::ValuesIn(ReadaheadRandomAccessFileTest::GetReadaheadSizeList()));
- INSTANTIATE_TEST_CASE_P(
- ReadExceedsReadaheadSize, ReadaheadRandomAccessFileTest,
- ::testing::ValuesIn(ReadaheadRandomAccessFileTest::GetReadaheadSizeList()));
- class ReadaheadSequentialFileTest : public testing::Test,
- public testing::WithParamInterface<size_t> {
- public:
- static std::vector<size_t> GetReadaheadSizeList() {
- return {1lu << 8, 1lu << 12, 1lu << 16, 1lu << 18};
- }
- void SetUp() override {
- readahead_size_ = GetParam();
- scratch_.reset(new char[2 * readahead_size_]);
- ResetSourceStr();
- }
- ReadaheadSequentialFileTest() = default;
- std::string Read(size_t n) {
- Slice result;
- Status s = test_read_holder_->Read(
- n, &result, scratch_.get(), Env::IO_TOTAL /* rate_limiter_priority*/);
- EXPECT_TRUE(s.ok() || s.IsInvalidArgument());
- return std::string(result.data(), result.size());
- }
- void Skip(size_t n) { test_read_holder_->Skip(n); }
- void ResetSourceStr(const std::string& str = "") {
- auto read_holder = std::unique_ptr<FSSequentialFile>(
- new test::SeqStringSource(str, &seq_read_count_));
- test_read_holder_.reset(new SequentialFileReader(std::move(read_holder),
- "test", readahead_size_));
- }
- size_t GetReadaheadSize() const { return readahead_size_; }
- private:
- size_t readahead_size_;
- std::unique_ptr<SequentialFileReader> test_read_holder_;
- std::unique_ptr<char[]> scratch_;
- std::atomic<int> seq_read_count_;
- };
- TEST_P(ReadaheadSequentialFileTest, EmptySourceStr) {
- ASSERT_EQ("", Read(0));
- ASSERT_EQ("", Read(1));
- ASSERT_EQ("", Read(13));
- }
- TEST_P(ReadaheadSequentialFileTest, SourceStrLenLessThanReadaheadSize) {
- std::string str = "abcdefghijklmnopqrs";
- ResetSourceStr(str);
- ASSERT_EQ(str.substr(0, 3), Read(3));
- ASSERT_EQ(str.substr(3, 1), Read(1));
- ASSERT_EQ(str.substr(4), Read(str.size()));
- ASSERT_EQ("", Read(100));
- }
- TEST_P(ReadaheadSequentialFileTest, SourceStrLenGreaterThanReadaheadSize) {
- Random rng(42);
- for (int s = 0; s < 1; ++s) {
- for (int k = 0; k < 100; ++k) {
- size_t strLen = k * GetReadaheadSize() +
- rng.Uniform(static_cast<int>(GetReadaheadSize()));
- std::string str = rng.HumanReadableString(static_cast<int>(strLen));
- ResetSourceStr(str);
- size_t offset = 0;
- for (int test = 1; test <= 100; ++test) {
- size_t n = rng.Uniform(static_cast<int>(GetReadaheadSize()));
- if (s && test % 2) {
- Skip(n);
- } else {
- ASSERT_EQ(str.substr(offset, std::min(n, strLen - offset)), Read(n));
- }
- offset = std::min(offset + n, strLen);
- }
- }
- }
- }
- TEST_P(ReadaheadSequentialFileTest, ReadExceedsReadaheadSize) {
- Random rng(42);
- for (int s = 0; s < 1; ++s) {
- for (int k = 0; k < 100; ++k) {
- size_t strLen = k * GetReadaheadSize() +
- rng.Uniform(static_cast<int>(GetReadaheadSize()));
- std::string str = rng.HumanReadableString(static_cast<int>(strLen));
- ResetSourceStr(str);
- size_t offset = 0;
- for (int test = 1; test <= 100; ++test) {
- size_t n = GetReadaheadSize() +
- rng.Uniform(static_cast<int>(GetReadaheadSize()));
- if (s && test % 2) {
- Skip(n);
- } else {
- ASSERT_EQ(str.substr(offset, std::min(n, strLen - offset)), Read(n));
- }
- offset = std::min(offset + n, strLen);
- }
- }
- }
- }
- // Instantiate the ReadaheadSequentialFileTest suite once per prefix below,
- // each time over every readahead size from GetReadaheadSizeList(). The
- // prefixes repeat the test names, but each instantiation applies to the whole
- // fixture, not just the test of the same name.
- INSTANTIATE_TEST_CASE_P(
- EmptySourceStr, ReadaheadSequentialFileTest,
- ::testing::ValuesIn(ReadaheadSequentialFileTest::GetReadaheadSizeList()));
- INSTANTIATE_TEST_CASE_P(
- SourceStrLenLessThanReadaheadSize, ReadaheadSequentialFileTest,
- ::testing::ValuesIn(ReadaheadSequentialFileTest::GetReadaheadSizeList()));
- INSTANTIATE_TEST_CASE_P(
- SourceStrLenGreaterThanReadaheadSize, ReadaheadSequentialFileTest,
- ::testing::ValuesIn(ReadaheadSequentialFileTest::GetReadaheadSizeList()));
- INSTANTIATE_TEST_CASE_P(
- ReadExceedsReadaheadSize, ReadaheadSequentialFileTest,
- ::testing::ValuesIn(ReadaheadSequentialFileTest::GetReadaheadSizeList()));
namespace {

// Returns a line made of `n` groups of 17 characters: the decimal digit
// (group index mod 10) followed by sixteen 'x' characters. The line length is
// therefore a multiple of 17, chosen to make bad buffer alignment likely. The
// result is empty when `n` is zero or negative.
std::string GenerateLine(int n) {
  static const char kFiller[] = "xxxxxxxxxxxxxxxx";
  std::string line;
  int group = 0;
  while (group < n) {
    line += static_cast<char>('0' + (group % 10));
    line += kFiller;
    ++group;
  }
  return line;
}

}  // namespace
- TEST(LineFileReaderTest, LineFileReaderTest) {
- const int nlines = 1000;
- std::unique_ptr<Env> mem_env(MockEnv::Create(Env::Default()));
- std::shared_ptr<FileSystem> fs = mem_env->GetFileSystem();
- // Create an input file
- {
- std::unique_ptr<FSWritableFile> file;
- ASSERT_OK(
- fs->NewWritableFile("testfile", FileOptions(), &file, /*dbg*/ nullptr));
- for (int i = 0; i < nlines; ++i) {
- std::string line = GenerateLine(i);
- line.push_back('\n');
- ASSERT_OK(file->Append(line, IOOptions(), /*dbg*/ nullptr));
- }
- }
- // Verify with no I/O errors
- {
- std::unique_ptr<LineFileReader> reader;
- ASSERT_OK(LineFileReader::Create(fs, "testfile", FileOptions(), &reader,
- nullptr /* dbg */,
- nullptr /* rate_limiter */));
- std::string line;
- int count = 0;
- while (reader->ReadLine(&line, Env::IO_TOTAL /* rate_limiter_priority */)) {
- ASSERT_EQ(line, GenerateLine(count));
- ++count;
- ASSERT_EQ(static_cast<int>(reader->GetLineNumber()), count);
- }
- ASSERT_OK(reader->GetStatus());
- ASSERT_EQ(count, nlines);
- ASSERT_EQ(static_cast<int>(reader->GetLineNumber()), count);
- // And still
- ASSERT_FALSE(
- reader->ReadLine(&line, Env::IO_TOTAL /* rate_limiter_priority */));
- ASSERT_OK(reader->GetStatus());
- ASSERT_EQ(static_cast<int>(reader->GetLineNumber()), count);
- }
- // Verify with injected I/O error
- {
- std::unique_ptr<LineFileReader> reader;
- ASSERT_OK(LineFileReader::Create(fs, "testfile", FileOptions(), &reader,
- nullptr /* dbg */,
- nullptr /* rate_limiter */));
- std::string line;
- int count = 0;
- // Read part way through the file
- while (count < nlines / 4) {
- ASSERT_TRUE(
- reader->ReadLine(&line, Env::IO_TOTAL /* rate_limiter_priority */));
- ASSERT_EQ(line, GenerateLine(count));
- ++count;
- ASSERT_EQ(static_cast<int>(reader->GetLineNumber()), count);
- }
- ASSERT_OK(reader->GetStatus());
- // Inject error
- int callback_count = 0;
- SyncPoint::GetInstance()->SetCallBack(
- "MemFile::Read:IOStatus", [&](void* arg) {
- IOStatus* status = static_cast<IOStatus*>(arg);
- *status = IOStatus::Corruption("test");
- ++callback_count;
- });
- SyncPoint::GetInstance()->EnableProcessing();
- while (reader->ReadLine(&line, Env::IO_TOTAL /* rate_limiter_priority */)) {
- ASSERT_EQ(line, GenerateLine(count));
- ++count;
- ASSERT_EQ(static_cast<int>(reader->GetLineNumber()), count);
- }
- ASSERT_TRUE(reader->GetStatus().IsCorruption());
- ASSERT_LT(count, nlines / 2);
- ASSERT_EQ(callback_count, 1);
- // Still get error & no retry
- ASSERT_FALSE(
- reader->ReadLine(&line, Env::IO_TOTAL /* rate_limiter_priority */));
- ASSERT_TRUE(reader->GetStatus().IsCorruption());
- ASSERT_EQ(callback_count, 1);
- SyncPoint::GetInstance()->DisableProcessing();
- SyncPoint::GetInstance()->ClearAllCallBacks();
- }
- }
- class IOErrorEventListener : public EventListener {
- public:
- IOErrorEventListener() { notify_error_.store(0); }
- void OnIOError(const IOErrorInfo& io_error_info) override {
- notify_error_++;
- EXPECT_FALSE(io_error_info.file_path.empty());
- EXPECT_FALSE(io_error_info.io_status.ok());
- }
- size_t NotifyErrorCount() { return notify_error_; }
- bool ShouldBeNotifiedOnFileIO() override { return true; }
- private:
- std::atomic<size_t> notify_error_;
- };
- TEST_F(DBWritableFileWriterTest, IOErrorNotification) {
- class FakeWF : public FSWritableFile {
- public:
- explicit FakeWF() : io_error_(false) {
- file_append_errors_.store(0);
- file_flush_errors_.store(0);
- }
- using FSWritableFile::Append;
- IOStatus Append(const Slice& /*data*/, const IOOptions& /*options*/,
- IODebugContext* /*dbg*/) override {
- if (io_error_) {
- file_append_errors_++;
- return IOStatus::IOError("Fake IO error");
- }
- return IOStatus::OK();
- }
- using FSWritableFile::PositionedAppend;
- IOStatus PositionedAppend(const Slice& /*data*/, uint64_t,
- const IOOptions& /*options*/,
- IODebugContext* /*dbg*/) override {
- if (io_error_) {
- return IOStatus::IOError("Fake IO error");
- }
- return IOStatus::OK();
- }
- IOStatus Close(const IOOptions& /*options*/,
- IODebugContext* /*dbg*/) override {
- return IOStatus::OK();
- }
- IOStatus Flush(const IOOptions& /*options*/,
- IODebugContext* /*dbg*/) override {
- if (io_error_) {
- file_flush_errors_++;
- return IOStatus::IOError("Fake IO error");
- }
- return IOStatus::OK();
- }
- IOStatus Sync(const IOOptions& /*options*/,
- IODebugContext* /*dbg*/) override {
- return IOStatus::OK();
- }
- void SetIOError(bool val) { io_error_ = val; }
- void CheckCounters(int file_append_errors, int file_flush_errors) {
- ASSERT_EQ(file_append_errors, file_append_errors_);
- ASSERT_EQ(file_flush_errors_, file_flush_errors);
- }
- uint64_t GetFileSize(const IOOptions& /*options*/,
- IODebugContext* /*dbg*/) override {
- return 0;
- }
- protected:
- bool io_error_;
- std::atomic<size_t> file_append_errors_;
- std::atomic<size_t> file_flush_errors_;
- };
- FileOptions file_options = FileOptions();
- Options options = GetDefaultOptions();
- options.create_if_missing = true;
- IOErrorEventListener* listener = new IOErrorEventListener();
- options.listeners.emplace_back(listener);
- DestroyAndReopen(options);
- ImmutableOptions ioptions(options);
- std::string fname = dbname_ + "/test_file";
- std::unique_ptr<FakeWF> writable_file_ptr(new FakeWF);
- std::unique_ptr<WritableFileWriter> file_writer;
- writable_file_ptr->SetIOError(true);
- file_writer.reset(new WritableFileWriter(
- std::move(writable_file_ptr), fname, file_options,
- SystemClock::Default().get(), nullptr, ioptions.stats,
- Histograms::HISTOGRAM_ENUM_MAX /* hist_type */, ioptions.listeners,
- ioptions.file_checksum_gen_factory.get(), true, true));
- FakeWF* fwf = static_cast<FakeWF*>(file_writer->writable_file());
- fwf->SetIOError(true);
- ASSERT_NOK(file_writer->Append(IOOptions(), std::string(2 * kMb, 'a')));
- fwf->CheckCounters(1, 0);
- ASSERT_EQ(listener->NotifyErrorCount(), 1);
- file_writer->reset_seen_error();
- fwf->SetIOError(true);
- ASSERT_NOK(file_writer->Flush(IOOptions()));
- fwf->CheckCounters(1, 1);
- ASSERT_EQ(listener->NotifyErrorCount(), 2);
- /* No error generation */
- file_writer->reset_seen_error();
- fwf->SetIOError(false);
- ASSERT_OK(file_writer->Append(IOOptions(), std::string(2 * kMb, 'b')));
- ASSERT_EQ(listener->NotifyErrorCount(), 2);
- fwf->CheckCounters(1, 1);
- }
- class WritableFileWriterIOPriorityTest : public testing::Test {
- protected:
- // This test is to check whether the rate limiter priority can be passed
- // correctly from WritableFileWriter functions to FSWritableFile functions.
- void SetUp() override {
- // When op_rate_limiter_priority parameter in WritableFileWriter functions
- // is the default (Env::IO_TOTAL).
- std::unique_ptr<FakeWF> wf{new FakeWF(Env::IO_HIGH)};
- FileOptions file_options;
- writer_.reset(new WritableFileWriter(std::move(wf), "" /* don't care */,
- file_options));
- }
- class FakeWF : public FSWritableFile {
- public:
- explicit FakeWF(Env::IOPriority io_priority) { SetIOPriority(io_priority); }
- ~FakeWF() override = default;
- IOStatus Append(const Slice& /*data*/, const IOOptions& options,
- IODebugContext* /*dbg*/) override {
- EXPECT_EQ(options.rate_limiter_priority, io_priority_);
- return IOStatus::OK();
- }
- IOStatus Append(const Slice& data, const IOOptions& options,
- const DataVerificationInfo& /* verification_info */,
- IODebugContext* dbg) override {
- return Append(data, options, dbg);
- }
- IOStatus PositionedAppend(const Slice& /*data*/, uint64_t /*offset*/,
- const IOOptions& options,
- IODebugContext* /*dbg*/) override {
- EXPECT_EQ(options.rate_limiter_priority, io_priority_);
- return IOStatus::OK();
- }
- IOStatus PositionedAppend(
- const Slice& /* data */, uint64_t /* offset */,
- const IOOptions& options,
- const DataVerificationInfo& /* verification_info */,
- IODebugContext* /*dbg*/) override {
- EXPECT_EQ(options.rate_limiter_priority, io_priority_);
- return IOStatus::OK();
- }
- IOStatus Truncate(uint64_t /*size*/, const IOOptions& options,
- IODebugContext* /*dbg*/) override {
- EXPECT_EQ(options.rate_limiter_priority, io_priority_);
- return IOStatus::OK();
- }
- IOStatus Close(const IOOptions& options, IODebugContext* /*dbg*/) override {
- EXPECT_EQ(options.rate_limiter_priority, io_priority_);
- return IOStatus::OK();
- }
- IOStatus Flush(const IOOptions& options, IODebugContext* /*dbg*/) override {
- EXPECT_EQ(options.rate_limiter_priority, io_priority_);
- return IOStatus::OK();
- }
- IOStatus Sync(const IOOptions& options, IODebugContext* /*dbg*/) override {
- EXPECT_EQ(options.rate_limiter_priority, io_priority_);
- return IOStatus::OK();
- }
- IOStatus Fsync(const IOOptions& options, IODebugContext* /*dbg*/) override {
- EXPECT_EQ(options.rate_limiter_priority, io_priority_);
- return IOStatus::OK();
- }
- uint64_t GetFileSize(const IOOptions& options,
- IODebugContext* /*dbg*/) override {
- EXPECT_EQ(options.rate_limiter_priority, io_priority_);
- return 0;
- }
- void GetPreallocationStatus(size_t* /*block_size*/,
- size_t* /*last_allocated_block*/) override {}
- size_t GetUniqueId(char* /*id*/, size_t /*max_size*/) const override {
- return 0;
- }
- IOStatus InvalidateCache(size_t /*offset*/, size_t /*length*/) override {
- return IOStatus::OK();
- }
- IOStatus Allocate(uint64_t /*offset*/, uint64_t /*len*/,
- const IOOptions& options,
- IODebugContext* /*dbg*/) override {
- EXPECT_EQ(options.rate_limiter_priority, io_priority_);
- return IOStatus::OK();
- }
- IOStatus RangeSync(uint64_t /*offset*/, uint64_t /*nbytes*/,
- const IOOptions& options,
- IODebugContext* /*dbg*/) override {
- EXPECT_EQ(options.rate_limiter_priority, io_priority_);
- return IOStatus::OK();
- }
- void PrepareWrite(size_t /*offset*/, size_t /*len*/,
- const IOOptions& options,
- IODebugContext* /*dbg*/) override {
- EXPECT_EQ(options.rate_limiter_priority, io_priority_);
- }
- bool IsSyncThreadSafe() const override { return true; }
- };
- std::unique_ptr<WritableFileWriter> writer_;
- };
- TEST_F(WritableFileWriterIOPriorityTest, Append) {
- ASSERT_OK(writer_->Append(IOOptions(), Slice("abc")));
- }
- TEST_F(WritableFileWriterIOPriorityTest, Pad) {
- ASSERT_OK(writer_->Pad(IOOptions(), 500, kDefaultPageSize));
- }
- TEST_F(WritableFileWriterIOPriorityTest, Flush) {
- ASSERT_OK(writer_->Flush(IOOptions()));
- }
- TEST_F(WritableFileWriterIOPriorityTest, Close) {
- ASSERT_OK(writer_->Close(IOOptions()));
- }
- TEST_F(WritableFileWriterIOPriorityTest, Sync) {
- ASSERT_OK(writer_->Sync(IOOptions(), false));
- ASSERT_OK(writer_->Sync(IOOptions(), true));
- }
- TEST_F(WritableFileWriterIOPriorityTest, SyncWithoutFlush) {
- ASSERT_OK(writer_->SyncWithoutFlush(IOOptions(), false));
- ASSERT_OK(writer_->SyncWithoutFlush(IOOptions(), true));
- }
- TEST_F(WritableFileWriterIOPriorityTest, BasicOp) {
- EnvOptions env_options;
- env_options.bytes_per_sync = kMb;
- std::unique_ptr<FakeWF> wf(new FakeWF(Env::IO_HIGH));
- std::unique_ptr<WritableFileWriter> writer(
- new WritableFileWriter(std::move(wf), "" /* don't care */, env_options));
- Random r(301);
- Status s;
- std::unique_ptr<char[]> large_buf(new char[10 * kMb]);
- for (int i = 0; i < 1000; i++) {
- int skew_limit = (i < 700) ? 10 : 15;
- uint32_t num = r.Skewed(skew_limit) * 100 + r.Uniform(100);
- s = writer->Append(IOOptions(), Slice(large_buf.get(), num));
- ASSERT_OK(s);
- // Flush in a chance of 1/10.
- if (r.Uniform(10) == 0) {
- s = writer->Flush(IOOptions());
- ASSERT_OK(s);
- }
- }
- s = writer->Close(IOOptions());
- ASSERT_OK(s);
- }
- } // namespace ROCKSDB_NAMESPACE
- int main(int argc, char** argv) {
- ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
- ::testing::InitGoogleTest(&argc, argv);
- return RUN_ALL_TESTS();
- }
|