writable_file_writer.h 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  9. #pragma once
  10. #include <atomic>
  11. #include <string>
  12. #include "db/version_edit.h"
  13. #include "port/port.h"
  14. #include "rocksdb/env.h"
  15. #include "rocksdb/file_checksum.h"
  16. #include "rocksdb/file_system.h"
  17. #include "rocksdb/listener.h"
  18. #include "rocksdb/rate_limiter.h"
  19. #include "test_util/sync_point.h"
  20. #include "util/aligned_buffer.h"
  21. namespace ROCKSDB_NAMESPACE {
  22. class Statistics;
  23. // WritableFileWriter is a wrapper on top of Env::WritableFile. It provides
  24. // facilities to:
  25. // - Handle Buffered and Direct writes.
  26. // - Rate limit writes.
  27. // - Flush and Sync the data to the underlying filesystem.
  28. // - Notify any interested listeners on the completion of a write.
  29. // - Update IO stats.
  30. class WritableFileWriter {
  31. private:
  32. #ifndef ROCKSDB_LITE
  33. void NotifyOnFileWriteFinish(uint64_t offset, size_t length,
  34. const FileOperationInfo::TimePoint& start_ts,
  35. const FileOperationInfo::TimePoint& finish_ts,
  36. const Status& status) {
  37. FileOperationInfo info(file_name_, start_ts, finish_ts);
  38. info.offset = offset;
  39. info.length = length;
  40. info.status = status;
  41. for (auto& listener : listeners_) {
  42. listener->OnFileWriteFinish(info);
  43. }
  44. }
  45. #endif // ROCKSDB_LITE
  46. bool ShouldNotifyListeners() const { return !listeners_.empty(); }
  47. void CalculateFileChecksum(const Slice& data);
  48. std::unique_ptr<FSWritableFile> writable_file_;
  49. std::string file_name_;
  50. Env* env_;
  51. AlignedBuffer buf_;
  52. size_t max_buffer_size_;
  53. // Actually written data size can be used for truncate
  54. // not counting padding data
  55. uint64_t filesize_;
  56. #ifndef ROCKSDB_LITE
  57. // This is necessary when we use unbuffered access
  58. // and writes must happen on aligned offsets
  59. // so we need to go back and write that page again
  60. uint64_t next_write_offset_;
  61. #endif // ROCKSDB_LITE
  62. bool pending_sync_;
  63. uint64_t last_sync_size_;
  64. uint64_t bytes_per_sync_;
  65. RateLimiter* rate_limiter_;
  66. Statistics* stats_;
  67. std::vector<std::shared_ptr<EventListener>> listeners_;
  68. FileChecksumFunc* checksum_func_;
  69. std::string file_checksum_ = kUnknownFileChecksum;
  70. bool is_first_checksum_ = true;
  71. public:
  72. WritableFileWriter(
  73. std::unique_ptr<FSWritableFile>&& file, const std::string& _file_name,
  74. const FileOptions& options, Env* env = nullptr,
  75. Statistics* stats = nullptr,
  76. const std::vector<std::shared_ptr<EventListener>>& listeners = {},
  77. FileChecksumFunc* checksum_func = nullptr)
  78. : writable_file_(std::move(file)),
  79. file_name_(_file_name),
  80. env_(env),
  81. buf_(),
  82. max_buffer_size_(options.writable_file_max_buffer_size),
  83. filesize_(0),
  84. #ifndef ROCKSDB_LITE
  85. next_write_offset_(0),
  86. #endif // ROCKSDB_LITE
  87. pending_sync_(false),
  88. last_sync_size_(0),
  89. bytes_per_sync_(options.bytes_per_sync),
  90. rate_limiter_(options.rate_limiter),
  91. stats_(stats),
  92. listeners_(),
  93. checksum_func_(checksum_func) {
  94. TEST_SYNC_POINT_CALLBACK("WritableFileWriter::WritableFileWriter:0",
  95. reinterpret_cast<void*>(max_buffer_size_));
  96. buf_.Alignment(writable_file_->GetRequiredBufferAlignment());
  97. buf_.AllocateNewBuffer(std::min((size_t)65536, max_buffer_size_));
  98. #ifndef ROCKSDB_LITE
  99. std::for_each(listeners.begin(), listeners.end(),
  100. [this](const std::shared_ptr<EventListener>& e) {
  101. if (e->ShouldBeNotifiedOnFileIO()) {
  102. listeners_.emplace_back(e);
  103. }
  104. });
  105. #else // !ROCKSDB_LITE
  106. (void)listeners;
  107. #endif
  108. }
  109. WritableFileWriter(const WritableFileWriter&) = delete;
  110. WritableFileWriter& operator=(const WritableFileWriter&) = delete;
  111. ~WritableFileWriter() { Close(); }
  112. std::string file_name() const { return file_name_; }
  113. Status Append(const Slice& data);
  114. Status Pad(const size_t pad_bytes);
  115. Status Flush();
  116. Status Close();
  117. Status Sync(bool use_fsync);
  118. // Sync only the data that was already Flush()ed. Safe to call concurrently
  119. // with Append() and Flush(). If !writable_file_->IsSyncThreadSafe(),
  120. // returns NotSupported status.
  121. Status SyncWithoutFlush(bool use_fsync);
  122. uint64_t GetFileSize() const { return filesize_; }
  123. Status InvalidateCache(size_t offset, size_t length) {
  124. return writable_file_->InvalidateCache(offset, length);
  125. }
  126. FSWritableFile* writable_file() const { return writable_file_.get(); }
  127. bool use_direct_io() { return writable_file_->use_direct_io(); }
  128. bool TEST_BufferIsEmpty() { return buf_.CurrentSize() == 0; }
  129. void TEST_SetFileChecksumFunc(FileChecksumFunc* checksum_func) {
  130. checksum_func_ = checksum_func;
  131. }
  132. const std::string& GetFileChecksum() const { return file_checksum_; }
  133. const char* GetFileChecksumFuncName() const;
  134. private:
  135. // Used when os buffering is OFF and we are writing
  136. // DMA such as in Direct I/O mode
  137. #ifndef ROCKSDB_LITE
  138. Status WriteDirect();
  139. #endif // !ROCKSDB_LITE
  140. // Normal write
  141. Status WriteBuffered(const char* data, size_t size);
  142. Status RangeSync(uint64_t offset, uint64_t nbytes);
  143. Status SyncInternal(bool use_fsync);
  144. };
  145. } // namespace ROCKSDB_NAMESPACE