// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "file/writable_file_writer.h"

#include <algorithm>
#include <mutex>

#include "db/version_edit.h"
#include "monitoring/histogram.h"
#include "monitoring/iostats_context_imp.h"
#include "port/port.h"
#include "test_util/sync_point.h"
#include "util/random.h"
#include "util/rate_limiter.h"

namespace ROCKSDB_NAMESPACE {
Status WritableFileWriter::Append(const Slice& data) {
  const char* src = data.data();
  size_t left = data.size();
  Status s;
  pending_sync_ = true;

  TEST_KILL_RANDOM("WritableFileWriter::Append:0",
                   rocksdb_kill_odds * REDUCE_ODDS2);

  {
    IOSTATS_TIMER_GUARD(prepare_write_nanos);
    TEST_SYNC_POINT("WritableFileWriter::Append:BeforePrepareWrite");
    writable_file_->PrepareWrite(static_cast<size_t>(GetFileSize()), left,
                                 IOOptions(), nullptr);
  }

  // See whether we need to enlarge the buffer to avoid the flush
  if (buf_.Capacity() - buf_.CurrentSize() < left) {
    for (size_t cap = buf_.Capacity();
         cap < max_buffer_size_;  // There is still room to increase
         cap *= 2) {
      // See whether the next available size is large enough.
      // Buffer will never be increased to more than max_buffer_size_.
      size_t desired_capacity = std::min(cap * 2, max_buffer_size_);
      if (desired_capacity - buf_.CurrentSize() >= left ||
          (use_direct_io() && desired_capacity == max_buffer_size_)) {
        buf_.AllocateNewBuffer(desired_capacity, true);
        break;
      }
    }
  }
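
  // Illustrative walk-through of the growth loop above (hypothetical
  // numbers, not from any real configuration): with buf_.Capacity() == 64KB,
  // buf_.CurrentSize() == 10KB, left == 150KB and max_buffer_size_ == 1MB,
  // the first iteration proposes desired_capacity = min(128KB, 1MB) = 128KB,
  // which leaves only 118KB of room; the second proposes 256KB, which can
  // hold the 150KB, so the buffer is reallocated once at 256KB and the loop
  // exits.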

  // Flush only when buffered I/O
  if (!use_direct_io() && (buf_.Capacity() - buf_.CurrentSize()) < left) {
    if (buf_.CurrentSize() > 0) {
      s = Flush();
      if (!s.ok()) {
        return s;
      }
    }
    assert(buf_.CurrentSize() == 0);
  }

  // We never write directly to disk with direct I/O on: the buffer is
  // always used. Otherwise the buffer simply serves its original purpose of
  // accumulating many small chunks.
  if (use_direct_io() || (buf_.Capacity() >= left)) {
    while (left > 0) {
      size_t appended = buf_.Append(src, left);
      left -= appended;
      src += appended;

      if (left > 0) {
        s = Flush();
        if (!s.ok()) {
          break;
        }
      }
    }
  } else {
    // Write directly to the file, bypassing the buffer
    assert(buf_.CurrentSize() == 0);
    s = WriteBuffered(src, left);
  }

  TEST_KILL_RANDOM("WritableFileWriter::Append:1", rocksdb_kill_odds);
  if (s.ok()) {
    filesize_ += data.size();
    CalculateFileChecksum(data);
  }
  return s;
}

Status WritableFileWriter::Pad(const size_t pad_bytes) {
  assert(pad_bytes < kDefaultPageSize);
  size_t left = pad_bytes;
  size_t cap = buf_.Capacity() - buf_.CurrentSize();

  // Assume pad_bytes is small compared to buf_ capacity, so we always
  // use buf_ rather than writing directly to the file as Append() does
  // in certain cases.
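  // For example (hypothetical numbers): with 100 bytes of free buffer space
  // and pad_bytes == 300, the loop below pads 100 zero bytes, flushes, then
  // pads the remaining 200 zero bytes into the newly emptied buffer.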
  while (left) {
    size_t append_bytes = std::min(cap, left);
    buf_.PadWith(append_bytes, 0);
    left -= append_bytes;
    if (left > 0) {
      Status s = Flush();
      if (!s.ok()) {
        return s;
      }
    }
    cap = buf_.Capacity() - buf_.CurrentSize();
  }
  pending_sync_ = true;
  filesize_ += pad_bytes;
  return Status::OK();
}

Status WritableFileWriter::Close() {
  // Do not quit immediately on failure; the file MUST be closed.
  Status s;

  // It is now possible to close the file twice, since we MUST close it in
  // the destructor; simply flushing is not enough. On Windows,
  // pre-allocation does not fill the file with zeros, and with unbuffered
  // access we also need to set the end of data.
  if (!writable_file_) {
    return s;
  }

  s = Flush();  // flush cache to OS

  Status interim;
  // In direct I/O mode we write whole pages, so
  // we need to let the file know where data ends.
  if (use_direct_io()) {
    interim = writable_file_->Truncate(filesize_, IOOptions(), nullptr);
    if (interim.ok()) {
      interim = writable_file_->Fsync(IOOptions(), nullptr);
    }
    if (!interim.ok() && s.ok()) {
      s = interim;
    }
  }

  TEST_KILL_RANDOM("WritableFileWriter::Close:0", rocksdb_kill_odds);
  interim = writable_file_->Close(IOOptions(), nullptr);
  if (!interim.ok() && s.ok()) {
    s = interim;
  }

  writable_file_.reset();
  TEST_KILL_RANDOM("WritableFileWriter::Close:1", rocksdb_kill_odds);

  return s;
}

// Write out the cached data to the OS cache, or to storage if direct I/O
// is enabled.
Status WritableFileWriter::Flush() {
  Status s;
  TEST_KILL_RANDOM("WritableFileWriter::Flush:0",
                   rocksdb_kill_odds * REDUCE_ODDS2);

  if (buf_.CurrentSize() > 0) {
    if (use_direct_io()) {
#ifndef ROCKSDB_LITE
      if (pending_sync_) {
        s = WriteDirect();
      }
#endif  // !ROCKSDB_LITE
    } else {
      s = WriteBuffered(buf_.BufferStart(), buf_.CurrentSize());
    }
    if (!s.ok()) {
      return s;
    }
  }

  s = writable_file_->Flush(IOOptions(), nullptr);
  if (!s.ok()) {
    return s;
  }

  // Sync the OS cache to disk for every bytes_per_sync_.
  // TODO: give log files and sst files different options (log
  // files could potentially be cached in the OS for their whole
  // lifetime, so we might not want to flush at all).

  // We avoid syncing the last 1MB of data, for two reasons:
  // (1) to avoid rewriting the same page that is modified later;
  // (2) on older OS versions, a write can block while the page is being
  // written out.
  // XFS does neighbor page flushing outside of the specified ranges, so we
  // need to make sure the sync range is far from the write offset.
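  // Worked example for the range-sync arithmetic below (hypothetical
  // numbers): with filesize_ == 5MB, last_sync_size_ == 1MB and
  // bytes_per_sync_ == 1MB, offset_sync_to starts at 5MB - 1MB = 4MB
  // (already 4KB-aligned), and since 4MB - 1MB >= 1MB we issue
  // RangeSync(1MB, 3MB) and advance last_sync_size_ to 4MB.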
  if (!use_direct_io() && bytes_per_sync_) {
    const uint64_t kBytesNotSyncRange =
        1024 * 1024;  // recent 1MB is not synced.
    const uint64_t kBytesAlignWhenSync = 4 * 1024;  // Align 4KB.
    if (filesize_ > kBytesNotSyncRange) {
      uint64_t offset_sync_to = filesize_ - kBytesNotSyncRange;
      offset_sync_to -= offset_sync_to % kBytesAlignWhenSync;
      assert(offset_sync_to >= last_sync_size_);
      if (offset_sync_to > 0 &&
          offset_sync_to - last_sync_size_ >= bytes_per_sync_) {
        s = RangeSync(last_sync_size_, offset_sync_to - last_sync_size_);
        last_sync_size_ = offset_sync_to;
      }
    }
  }

  return s;
}

const char* WritableFileWriter::GetFileChecksumFuncName() const {
  if (checksum_func_ != nullptr) {
    return checksum_func_->Name();
  } else {
    return kUnknownFileChecksumFuncName.c_str();
  }
}

Status WritableFileWriter::Sync(bool use_fsync) {
  Status s = Flush();
  if (!s.ok()) {
    return s;
  }
  TEST_KILL_RANDOM("WritableFileWriter::Sync:0", rocksdb_kill_odds);
  if (!use_direct_io() && pending_sync_) {
    s = SyncInternal(use_fsync);
    if (!s.ok()) {
      return s;
    }
  }
  TEST_KILL_RANDOM("WritableFileWriter::Sync:1", rocksdb_kill_odds);
  pending_sync_ = false;
  return Status::OK();
}

Status WritableFileWriter::SyncWithoutFlush(bool use_fsync) {
  if (!writable_file_->IsSyncThreadSafe()) {
    return Status::NotSupported(
        "Can't WritableFileWriter::SyncWithoutFlush() because "
        "WritableFile::IsSyncThreadSafe() is false");
  }
  TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:1");
  Status s = SyncInternal(use_fsync);
  TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:2");
  return s;
}

Status WritableFileWriter::SyncInternal(bool use_fsync) {
  Status s;
  IOSTATS_TIMER_GUARD(fsync_nanos);
  TEST_SYNC_POINT("WritableFileWriter::SyncInternal:0");
  auto prev_perf_level = GetPerfLevel();
  IOSTATS_CPU_TIMER_GUARD(cpu_write_nanos, env_);
  if (use_fsync) {
    s = writable_file_->Fsync(IOOptions(), nullptr);
  } else {
    s = writable_file_->Sync(IOOptions(), nullptr);
  }
  SetPerfLevel(prev_perf_level);
  return s;
}

Status WritableFileWriter::RangeSync(uint64_t offset, uint64_t nbytes) {
  IOSTATS_TIMER_GUARD(range_sync_nanos);
  TEST_SYNC_POINT("WritableFileWriter::RangeSync:0");
  return writable_file_->RangeSync(offset, nbytes, IOOptions(), nullptr);
}

// This method writes the specified data to disk and makes use of the rate
// limiter, if one is available.
Status WritableFileWriter::WriteBuffered(const char* data, size_t size) {
  Status s;
  assert(!use_direct_io());
  const char* src = data;
  size_t left = size;

  while (left > 0) {
    size_t allowed;
    if (rate_limiter_ != nullptr) {
      allowed = rate_limiter_->RequestToken(
          left, 0 /* alignment */, writable_file_->GetIOPriority(), stats_,
          RateLimiter::OpType::kWrite);
    } else {
      allowed = left;
    }
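
    // Note: RequestToken() above may grant fewer bytes than requested (it is
    // capped by the rate limiter's burst size), which is why the enclosing
    // loop writes in potentially smaller chunks.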
    {
      IOSTATS_TIMER_GUARD(write_nanos);
      TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend");

#ifndef ROCKSDB_LITE
      FileOperationInfo::TimePoint start_ts;
      uint64_t old_size = writable_file_->GetFileSize(IOOptions(), nullptr);
      if (ShouldNotifyListeners()) {
        start_ts = std::chrono::system_clock::now();
        old_size = next_write_offset_;
      }
#endif
      {
        auto prev_perf_level = GetPerfLevel();
        IOSTATS_CPU_TIMER_GUARD(cpu_write_nanos, env_);
        s = writable_file_->Append(Slice(src, allowed), IOOptions(), nullptr);
        SetPerfLevel(prev_perf_level);
      }
#ifndef ROCKSDB_LITE
      if (ShouldNotifyListeners()) {
        auto finish_ts = std::chrono::system_clock::now();
        NotifyOnFileWriteFinish(old_size, allowed, start_ts, finish_ts, s);
      }
#endif
      if (!s.ok()) {
        return s;
      }
    }

    IOSTATS_ADD(bytes_written, allowed);
    TEST_KILL_RANDOM("WritableFileWriter::WriteBuffered:0", rocksdb_kill_odds);

    left -= allowed;
    src += allowed;
  }
  buf_.Size(0);
  return s;
}

void WritableFileWriter::CalculateFileChecksum(const Slice& data) {
  if (checksum_func_ != nullptr) {
    if (is_first_checksum_) {
      file_checksum_ = checksum_func_->Value(data.data(), data.size());
      is_first_checksum_ = false;
    } else {
      file_checksum_ =
          checksum_func_->Extend(file_checksum_, data.data(), data.size());
    }
  }
}
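
// Illustrative property of the incremental pattern above (an expectation
// about checksum_func_ implementations, not code from this file): appending
// "abc" and then "def" computes Extend(Value("abc", 3), "def", 3), which
// should equal Value("abcdef", 6) for a checksum function that supports
// extension.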

// This flushes the accumulated data in the buffer. We pad the data with
// zeros to a whole page if necessary (during automatic flushes padding is
// not necessary). We always use the RateLimiter if available. Any buffer
// bytes left over beyond a whole number of pages are moved (refit) to the
// beginning of the buffer, to be written again on the next flush, because
// we can only write at aligned offsets.
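//
// Worked example of the alignment math below (hypothetical numbers): with a
// 4KB alignment and buf_.CurrentSize() == 10000, file_advance =
// TruncateToPageBoundary(4096, 10000) = 8192 and leftover_tail = 1808. The
// buffer is padded with zeros to 12288 bytes and written in full; on
// success, RefitTail() moves the 1808 tail bytes to the front of the buffer
// to be rewritten later, and next_write_offset_ advances by only 8192 so
// the next write stays page-aligned.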
#ifndef ROCKSDB_LITE
Status WritableFileWriter::WriteDirect() {
  assert(use_direct_io());
  Status s;
  const size_t alignment = buf_.Alignment();
  assert((next_write_offset_ % alignment) == 0);

  // Calculate the whole-page final file advance if all writes succeed
  size_t file_advance = TruncateToPageBoundary(alignment, buf_.CurrentSize());

  // Calculate the leftover tail; we write it here padded with zeros, BUT we
  // will write it again in the future, either on Close() OR when the
  // current whole page fills out.
  size_t leftover_tail = buf_.CurrentSize() - file_advance;

  // Round up and pad
  buf_.PadToAlignmentWith(0);

  const char* src = buf_.BufferStart();
  uint64_t write_offset = next_write_offset_;
  size_t left = buf_.CurrentSize();

  while (left > 0) {
    // Check how much is allowed
    size_t size;
    if (rate_limiter_ != nullptr) {
      size = rate_limiter_->RequestToken(left, buf_.Alignment(),
                                         writable_file_->GetIOPriority(),
                                         stats_, RateLimiter::OpType::kWrite);
    } else {
      size = left;
    }

    {
      IOSTATS_TIMER_GUARD(write_nanos);
      TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend");
      FileOperationInfo::TimePoint start_ts;
      if (ShouldNotifyListeners()) {
        start_ts = std::chrono::system_clock::now();
      }
      // direct writes must be positional
      s = writable_file_->PositionedAppend(Slice(src, size), write_offset,
                                           IOOptions(), nullptr);
      if (ShouldNotifyListeners()) {
        auto finish_ts = std::chrono::system_clock::now();
        NotifyOnFileWriteFinish(write_offset, size, start_ts, finish_ts, s);
      }
      if (!s.ok()) {
        buf_.Size(file_advance + leftover_tail);
        return s;
      }
    }

    IOSTATS_ADD(bytes_written, size);
    left -= size;
    src += size;
    write_offset += size;
    assert((next_write_offset_ % alignment) == 0);
  }

  if (s.ok()) {
    // Move the tail to the beginning of the buffer.
    // This never happens during normal Append but rather during an
    // explicit call to Flush()/Sync() or Close().
    buf_.RefitTail(file_advance, leftover_tail);
    // This is where we start writing next time, which may or may not be
    // the actual file size on disk. They match if the buffer size
    // is a multiple of whole pages; otherwise filesize_ is leftover_tail
    // bytes behind.
    next_write_offset_ += file_advance;
  }
  return s;
}
#endif  // !ROCKSDB_LITE

}  // namespace ROCKSDB_NAMESPACE