// writable_file_writer.h
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstring>
#include <memory>
#include <sstream>
#include <string>
#include <vector>

#include "db/version_edit.h"
#include "env/file_system_tracer.h"
#include "monitoring/thread_status_util.h"
#include "port/port.h"
#include "rocksdb/file_checksum.h"
#include "rocksdb/file_system.h"
#include "rocksdb/io_status.h"
#include "rocksdb/listener.h"
#include "rocksdb/rate_limiter.h"
#include "test_util/sync_point.h"
#include "util/aligned_buffer.h"

#ifndef NDEBUG
#include "utilities/fault_injection_fs.h"
#endif  // NDEBUG
  26. namespace ROCKSDB_NAMESPACE {
  27. class Statistics;
  28. class SystemClock;
  29. // WritableFileWriter is a wrapper on top of Env::WritableFile. It provides
  30. // facilities to:
  31. // - Handle Buffered and Direct writes.
  32. // - Rate limit writes.
  33. // - Flush and Sync the data to the underlying filesystem.
  34. // - Notify any interested listeners on the completion of a write.
  35. // - Update IO stats.
  36. class WritableFileWriter {
  37. private:
  38. void NotifyOnFileWriteFinish(
  39. uint64_t offset, size_t length,
  40. const FileOperationInfo::StartTimePoint& start_ts,
  41. const FileOperationInfo::FinishTimePoint& finish_ts,
  42. const IOStatus& io_status) {
  43. FileOperationInfo info(FileOperationType::kWrite, file_name_, start_ts,
  44. finish_ts, io_status, temperature_);
  45. info.offset = offset;
  46. info.length = length;
  47. for (auto& listener : listeners_) {
  48. listener->OnFileWriteFinish(info);
  49. }
  50. info.status.PermitUncheckedError();
  51. }
  52. void NotifyOnFileFlushFinish(
  53. const FileOperationInfo::StartTimePoint& start_ts,
  54. const FileOperationInfo::FinishTimePoint& finish_ts,
  55. const IOStatus& io_status) {
  56. FileOperationInfo info(FileOperationType::kFlush, file_name_, start_ts,
  57. finish_ts, io_status, temperature_);
  58. for (auto& listener : listeners_) {
  59. listener->OnFileFlushFinish(info);
  60. }
  61. info.status.PermitUncheckedError();
  62. }
  63. void NotifyOnFileSyncFinish(
  64. const FileOperationInfo::StartTimePoint& start_ts,
  65. const FileOperationInfo::FinishTimePoint& finish_ts,
  66. const IOStatus& io_status,
  67. FileOperationType type = FileOperationType::kSync) {
  68. FileOperationInfo info(type, file_name_, start_ts, finish_ts, io_status,
  69. temperature_);
  70. for (auto& listener : listeners_) {
  71. listener->OnFileSyncFinish(info);
  72. }
  73. info.status.PermitUncheckedError();
  74. }
  75. void NotifyOnFileRangeSyncFinish(
  76. uint64_t offset, size_t length,
  77. const FileOperationInfo::StartTimePoint& start_ts,
  78. const FileOperationInfo::FinishTimePoint& finish_ts,
  79. const IOStatus& io_status) {
  80. FileOperationInfo info(FileOperationType::kRangeSync, file_name_, start_ts,
  81. finish_ts, io_status, temperature_);
  82. info.offset = offset;
  83. info.length = length;
  84. for (auto& listener : listeners_) {
  85. listener->OnFileRangeSyncFinish(info);
  86. }
  87. info.status.PermitUncheckedError();
  88. }
  89. void NotifyOnFileTruncateFinish(
  90. const FileOperationInfo::StartTimePoint& start_ts,
  91. const FileOperationInfo::FinishTimePoint& finish_ts,
  92. const IOStatus& io_status) {
  93. FileOperationInfo info(FileOperationType::kTruncate, file_name_, start_ts,
  94. finish_ts, io_status, temperature_);
  95. for (auto& listener : listeners_) {
  96. listener->OnFileTruncateFinish(info);
  97. }
  98. info.status.PermitUncheckedError();
  99. }
  100. void NotifyOnFileCloseFinish(
  101. const FileOperationInfo::StartTimePoint& start_ts,
  102. const FileOperationInfo::FinishTimePoint& finish_ts,
  103. const IOStatus& io_status) {
  104. FileOperationInfo info(FileOperationType::kClose, file_name_, start_ts,
  105. finish_ts, io_status, temperature_);
  106. for (auto& listener : listeners_) {
  107. listener->OnFileCloseFinish(info);
  108. }
  109. info.status.PermitUncheckedError();
  110. }
  111. void NotifyOnIOError(const IOStatus& io_status, FileOperationType operation,
  112. const std::string& file_path, size_t length = 0,
  113. uint64_t offset = 0) {
  114. if (listeners_.empty()) {
  115. return;
  116. }
  117. IOErrorInfo io_error_info(io_status, operation, file_path, length, offset);
  118. for (auto& listener : listeners_) {
  119. listener->OnIOError(io_error_info);
  120. }
  121. io_error_info.io_status.PermitUncheckedError();
  122. }
  123. bool ShouldNotifyListeners() const { return !listeners_.empty(); }
  124. void UpdateFileChecksum(const Slice& data);
  125. void Crc32cHandoffChecksumCalculation(const char* data, size_t size,
  126. char* buf);
  127. std::string file_name_;
  128. FSWritableFilePtr writable_file_;
  129. SystemClock* clock_;
  130. AlignedBuffer buf_;
  131. size_t max_buffer_size_;
  132. // Actually written data size can be used for truncate
  133. // not counting padding data
  134. std::atomic<uint64_t> filesize_;
  135. std::atomic<uint64_t> flushed_size_;
  136. // This is necessary when we use unbuffered access
  137. // and writes must happen on aligned offsets
  138. // so we need to go back and write that page again
  139. uint64_t next_write_offset_;
  140. bool pending_sync_;
  141. std::atomic<bool> seen_error_;
  142. #ifndef NDEBUG
  143. std::atomic<bool> seen_injected_error_;
  144. #endif // NDEBUG
  145. uint64_t last_sync_size_;
  146. uint64_t bytes_per_sync_;
  147. RateLimiter* rate_limiter_;
  148. Statistics* stats_;
  149. Histograms hist_type_;
  150. std::vector<std::shared_ptr<EventListener>> listeners_;
  151. std::unique_ptr<FileChecksumGenerator> checksum_generator_;
  152. bool checksum_finalized_;
  153. bool perform_data_verification_;
  154. uint32_t buffered_data_crc32c_checksum_;
  155. bool buffered_data_with_checksum_;
  156. Temperature temperature_;
  157. public:
  158. WritableFileWriter(
  159. std::unique_ptr<FSWritableFile>&& file, const std::string& _file_name,
  160. const FileOptions& options, SystemClock* clock = nullptr,
  161. const std::shared_ptr<IOTracer>& io_tracer = nullptr,
  162. Statistics* stats = nullptr,
  163. Histograms hist_type = Histograms::HISTOGRAM_ENUM_MAX,
  164. const std::vector<std::shared_ptr<EventListener>>& listeners = {},
  165. FileChecksumGenFactory* file_checksum_gen_factory = nullptr,
  166. bool perform_data_verification = false,
  167. bool buffered_data_with_checksum = false)
  168. : file_name_(_file_name),
  169. writable_file_(std::move(file), io_tracer, _file_name),
  170. clock_(clock),
  171. buf_(),
  172. max_buffer_size_(options.writable_file_max_buffer_size),
  173. filesize_(0),
  174. flushed_size_(0),
  175. next_write_offset_(0),
  176. pending_sync_(false),
  177. seen_error_(false),
  178. #ifndef NDEBUG
  179. seen_injected_error_(false),
  180. #endif // NDEBUG
  181. last_sync_size_(0),
  182. bytes_per_sync_(options.bytes_per_sync),
  183. rate_limiter_(options.rate_limiter),
  184. stats_(stats),
  185. hist_type_(hist_type),
  186. listeners_(),
  187. checksum_generator_(nullptr),
  188. checksum_finalized_(false),
  189. perform_data_verification_(perform_data_verification),
  190. buffered_data_crc32c_checksum_(0),
  191. buffered_data_with_checksum_(buffered_data_with_checksum) {
  192. temperature_ = options.temperature;
  193. assert(!use_direct_io() || max_buffer_size_ > 0);
  194. TEST_SYNC_POINT_CALLBACK("WritableFileWriter::WritableFileWriter:0",
  195. reinterpret_cast<void*>(max_buffer_size_));
  196. buf_.Alignment(writable_file_->GetRequiredBufferAlignment());
  197. buf_.AllocateNewBuffer(std::min((size_t)65536, max_buffer_size_));
  198. std::for_each(listeners.begin(), listeners.end(),
  199. [this](const std::shared_ptr<EventListener>& e) {
  200. if (e->ShouldBeNotifiedOnFileIO()) {
  201. listeners_.emplace_back(e);
  202. }
  203. });
  204. if (file_checksum_gen_factory != nullptr) {
  205. FileChecksumGenContext checksum_gen_context;
  206. checksum_gen_context.file_name = _file_name;
  207. checksum_generator_ =
  208. file_checksum_gen_factory->CreateFileChecksumGenerator(
  209. checksum_gen_context);
  210. }
  211. }
  212. static IOStatus Create(const std::shared_ptr<FileSystem>& fs,
  213. const std::string& fname, const FileOptions& file_opts,
  214. std::unique_ptr<WritableFileWriter>* writer,
  215. IODebugContext* dbg);
  216. static IOStatus PrepareIOOptions(const WriteOptions& wo, IOOptions& opts);
  217. WritableFileWriter(const WritableFileWriter&) = delete;
  218. WritableFileWriter& operator=(const WritableFileWriter&) = delete;
  219. ~WritableFileWriter() {
  220. IOOptions io_options;
  221. #ifndef NDEBUG
  222. // This is needed to pass the IOActivity related checks in stress test.
  223. // See DbStressWritableFileWrapper.
  224. ThreadStatus::OperationType op_type =
  225. ThreadStatusUtil::GetThreadOperation();
  226. io_options.io_activity =
  227. ThreadStatusUtil::TEST_GetExpectedIOActivity(op_type);
  228. #endif
  229. auto s = Close(io_options);
  230. s.PermitUncheckedError();
  231. }
  232. std::string file_name() const { return file_name_; }
  233. // When this Append API is called, if the crc32c_checksum is not provided, we
  234. // will calculate the checksum internally.
  235. IOStatus Append(const IOOptions& opts, const Slice& data,
  236. uint32_t crc32c_checksum = 0);
  237. IOStatus Pad(const IOOptions& opts, const size_t pad_bytes,
  238. const size_t max_pad_size);
  239. IOStatus Flush(const IOOptions& opts);
  240. IOStatus Close(const IOOptions& opts);
  241. IOStatus Sync(const IOOptions& opts, bool use_fsync);
  242. // Sync only the data that was already Flush()ed. Safe to call concurrently
  243. // with Append() and Flush(). If !writable_file_->IsSyncThreadSafe(),
  244. // returns NotSupported status.
  245. IOStatus SyncWithoutFlush(const IOOptions& opts, bool use_fsync);
  246. // Size including unflushed data written to this writer. If the next op is
  247. // a successful Close, the file size will be this.
  248. uint64_t GetFileSize() const {
  249. return filesize_.load(std::memory_order_acquire);
  250. }
  251. // Returns the size of data flushed to the underlying `FSWritableFile`.
  252. // Expected to match `writable_file()->GetFileSize()`.
  253. // The return value can serve as a lower-bound for the amount of data synced
  254. // by a future call to `SyncWithoutFlush()`.
  255. uint64_t GetFlushedSize() const {
  256. return flushed_size_.load(std::memory_order_acquire);
  257. }
  258. IOStatus InvalidateCache(size_t offset, size_t length) {
  259. return writable_file_->InvalidateCache(offset, length);
  260. }
  261. FSWritableFile* writable_file() const { return writable_file_.get(); }
  262. bool use_direct_io() { return writable_file_->use_direct_io(); }
  263. bool BufferIsEmpty() const { return buf_.CurrentSize() == 0; }
  264. bool IsClosed() const { return writable_file_.get() == nullptr; }
  265. void TEST_SetFileChecksumGenerator(
  266. FileChecksumGenerator* checksum_generator) {
  267. checksum_generator_.reset(checksum_generator);
  268. }
  269. std::string GetFileChecksum();
  270. const char* GetFileChecksumFuncName() const;
  271. bool seen_error() const {
  272. return seen_error_.load(std::memory_order_relaxed);
  273. }
  274. // For options of relaxed consistency, users might hope to continue
  275. // operating on the file after an error happens.
  276. void reset_seen_error() {
  277. seen_error_.store(false, std::memory_order_relaxed);
  278. #ifndef NDEBUG
  279. seen_injected_error_.store(false, std::memory_order_relaxed);
  280. #endif // NDEBUG
  281. }
  282. void set_seen_error(const Status& s) {
  283. seen_error_.store(true, std::memory_order_relaxed);
  284. (void)s;
  285. #ifndef NDEBUG
  286. if (s.getState() && std::strstr(s.getState(), "inject")) {
  287. seen_injected_error_.store(true, std::memory_order_relaxed);
  288. }
  289. #endif // NDEBUG
  290. }
  291. #ifndef NDEBUG
  292. bool seen_injected_error() const {
  293. return seen_injected_error_.load(std::memory_order_relaxed);
  294. }
  295. #endif // NDEBUG
  296. // TODO(hx235): store the actual previous error status and return it here
  297. IOStatus GetWriterHasPreviousErrorStatus() {
  298. #ifndef NDEBUG
  299. if (seen_injected_error_.load(std::memory_order_relaxed)) {
  300. std::stringstream msg;
  301. msg << "Writer has previous " << FaultInjectionTestFS::kInjected
  302. << " error.";
  303. return IOStatus::IOError(msg.str());
  304. }
  305. #endif // NDEBUG
  306. return IOStatus::IOError("Writer has previous error.");
  307. }
  308. private:
  309. // Decide the Rate Limiter priority.
  310. static Env::IOPriority DecideRateLimiterPriority(
  311. Env::IOPriority writable_file_io_priority,
  312. Env::IOPriority op_rate_limiter_priority);
  313. // Used when os buffering is OFF and we are writing
  314. // DMA such as in Direct I/O mode
  315. // `opts` should've been called with `FinalizeIOOptions()` before passing in
  316. IOStatus WriteDirect(const IOOptions& opts);
  317. // `opts` should've been called with `FinalizeIOOptions()` before passing in
  318. IOStatus WriteDirectWithChecksum(const IOOptions& opts);
  319. // Normal write.
  320. // `opts` should've been called with `FinalizeIOOptions()` before passing in
  321. IOStatus WriteBuffered(const IOOptions& opts, const char* data, size_t size);
  322. // `opts` should've been called with `FinalizeIOOptions()` before passing in
  323. IOStatus WriteBufferedWithChecksum(const IOOptions& opts, const char* data,
  324. size_t size);
  325. // `opts` should've been called with `FinalizeIOOptions()` before passing in
  326. IOStatus RangeSync(const IOOptions& opts, uint64_t offset, uint64_t nbytes);
  327. // `opts` should've been called with `FinalizeIOOptions()` before passing in
  328. IOStatus SyncInternal(const IOOptions& opts, bool use_fsync);
  329. IOOptions FinalizeIOOptions(const IOOptions& opts) const;
  330. };
  331. } // namespace ROCKSDB_NAMESPACE