writable_file_writer.cc

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "file/writable_file_writer.h"

#include <algorithm>
#include <mutex>

#include "db/version_edit.h"
#include "monitoring/histogram.h"
#include "monitoring/iostats_context_imp.h"
#include "port/port.h"
#include "test_util/sync_point.h"
#include "util/random.h"
#include "util/rate_limiter.h"

namespace ROCKSDB_NAMESPACE {
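
// Buffer the incoming data in buf_, growing the buffer up to
// max_buffer_size_ if needed, and flush to the underlying file whenever the
// buffer cannot hold the remaining input. Large writes bypass the buffer in
// buffered-I/O mode; with direct I/O everything goes through the buffer so
// that writes stay aligned.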
Status WritableFileWriter::Append(const Slice& data) {
  const char* src = data.data();
  size_t left = data.size();
  Status s;
  pending_sync_ = true;

  TEST_KILL_RANDOM("WritableFileWriter::Append:0",
                   rocksdb_kill_odds * REDUCE_ODDS2);

  {
    IOSTATS_TIMER_GUARD(prepare_write_nanos);
    TEST_SYNC_POINT("WritableFileWriter::Append:BeforePrepareWrite");
    writable_file_->PrepareWrite(static_cast<size_t>(GetFileSize()), left,
                                 IOOptions(), nullptr);
  }

  // See whether we need to enlarge the buffer to avoid the flush
  if (buf_.Capacity() - buf_.CurrentSize() < left) {
    for (size_t cap = buf_.Capacity();
         cap < max_buffer_size_;  // There is still room to increase
         cap *= 2) {
      // See whether the next available size is large enough.
      // Buffer will never be increased to more than max_buffer_size_.
      size_t desired_capacity = std::min(cap * 2, max_buffer_size_);
      if (desired_capacity - buf_.CurrentSize() >= left ||
          (use_direct_io() && desired_capacity == max_buffer_size_)) {
        buf_.AllocateNewBuffer(desired_capacity, true);
        break;
      }
    }
  }
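  // For example, with a 64 KB buffer, a 150 KB append and
  // max_buffer_size_ = 1 MB, the loop above tries 128 KB and then settles on
  // 256 KB, the first doubled capacity that can hold the pending data on top
  // of what is already buffered.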

  // Flush only when buffered I/O
  if (!use_direct_io() && (buf_.Capacity() - buf_.CurrentSize()) < left) {
    if (buf_.CurrentSize() > 0) {
      s = Flush();
      if (!s.ok()) {
        return s;
      }
    }
    assert(buf_.CurrentSize() == 0);
  }

  // With direct I/O we never write directly to storage: everything goes
  // through the buffer. Otherwise the buffer serves its original purpose of
  // accumulating many small chunks before a single larger write.
  if (use_direct_io() || (buf_.Capacity() >= left)) {
    while (left > 0) {
      size_t appended = buf_.Append(src, left);
      left -= appended;
      src += appended;
      if (left > 0) {
        s = Flush();
        if (!s.ok()) {
          break;
        }
      }
    }
  } else {
    // Writing directly to file bypassing the buffer
    assert(buf_.CurrentSize() == 0);
    s = WriteBuffered(src, left);
  }

  TEST_KILL_RANDOM("WritableFileWriter::Append:1", rocksdb_kill_odds);
  if (s.ok()) {
    filesize_ += data.size();
    CalculateFileChecksum(data);
  }
  return s;
}
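
// Pad the file with pad_bytes zero bytes, going through buf_ so that the
// padding is flushed together with the rest of the buffered data.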
Status WritableFileWriter::Pad(const size_t pad_bytes) {
  assert(pad_bytes < kDefaultPageSize);
  size_t left = pad_bytes;
  size_t cap = buf_.Capacity() - buf_.CurrentSize();

  // Assume pad_bytes is small compared to buf_ capacity, so we always pad
  // through buf_ rather than writing directly to the file as Append()
  // sometimes does.
  while (left) {
    size_t append_bytes = std::min(cap, left);
    buf_.PadWith(append_bytes, 0);
    left -= append_bytes;
    if (left > 0) {
      Status s = Flush();
      if (!s.ok()) {
        return s;
      }
    }
    cap = buf_.Capacity() - buf_.CurrentSize();
  }
  pending_sync_ = true;
  filesize_ += pad_bytes;
  return Status::OK();
}

Status WritableFileWriter::Close() {
  // Do not quit immediately on failure; the file MUST be closed.
  Status s;

  // It is now possible to close the file twice, since we MUST close it in
  // the destructor and simply flushing is not enough. On Windows,
  // pre-allocating does not fill with zeros, and with unbuffered access we
  // also need to set the end of the data.
  if (!writable_file_) {
    return s;
  }

  s = Flush();  // flush cache to OS

  Status interim;
  // In direct I/O mode we write whole pages so
  // we need to let the file know where data ends.
  if (use_direct_io()) {
    interim = writable_file_->Truncate(filesize_, IOOptions(), nullptr);
    if (interim.ok()) {
      interim = writable_file_->Fsync(IOOptions(), nullptr);
    }
    if (!interim.ok() && s.ok()) {
      s = interim;
    }
  }

  TEST_KILL_RANDOM("WritableFileWriter::Close:0", rocksdb_kill_odds);
  interim = writable_file_->Close(IOOptions(), nullptr);
  if (!interim.ok() && s.ok()) {
    s = interim;
  }

  writable_file_.reset();
  TEST_KILL_RANDOM("WritableFileWriter::Close:1", rocksdb_kill_odds);

  return s;
}

// Write out the cached data to the OS page cache, or to storage if direct
// I/O is enabled.
Status WritableFileWriter::Flush() {
  Status s;
  TEST_KILL_RANDOM("WritableFileWriter::Flush:0",
                   rocksdb_kill_odds * REDUCE_ODDS2);

  if (buf_.CurrentSize() > 0) {
    if (use_direct_io()) {
#ifndef ROCKSDB_LITE
      if (pending_sync_) {
        s = WriteDirect();
      }
#endif  // !ROCKSDB_LITE
    } else {
      s = WriteBuffered(buf_.BufferStart(), buf_.CurrentSize());
    }
    if (!s.ok()) {
      return s;
    }
  }

  s = writable_file_->Flush(IOOptions(), nullptr);
  if (!s.ok()) {
    return s;
  }

  // Sync the OS cache to disk for every bytes_per_sync_ bytes.
  // TODO: give log files and SST files different options (log files could
  // potentially be cached in the OS for their whole lifetime, so we might
  // not want to flush them at all).
  // We avoid syncing the last 1MB of data for two reasons:
  // (1) to avoid rewriting the same page if it is modified later;
  // (2) on older OS versions, the write can block while the page is being
  //     written out.
  // XFS flushes neighboring pages outside the specified range, so we need
  // to keep the sync range well away from the write offset.
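  // For example, with bytes_per_sync_ = 1MB and last_sync_size_ = 0, the
  // first range sync happens once filesize_ exceeds roughly 2MB:
  // offset_sync_to = filesize_ - 1MB, rounded down to a 4KB boundary, and
  // the sync covers [last_sync_size_, offset_sync_to).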
  if (!use_direct_io() && bytes_per_sync_) {
    const uint64_t kBytesNotSyncRange =
        1024 * 1024;  // recent 1MB is not synced.
    const uint64_t kBytesAlignWhenSync = 4 * 1024;  // Align 4KB.
    if (filesize_ > kBytesNotSyncRange) {
      uint64_t offset_sync_to = filesize_ - kBytesNotSyncRange;
      offset_sync_to -= offset_sync_to % kBytesAlignWhenSync;
      assert(offset_sync_to >= last_sync_size_);
      if (offset_sync_to > 0 &&
          offset_sync_to - last_sync_size_ >= bytes_per_sync_) {
        s = RangeSync(last_sync_size_, offset_sync_to - last_sync_size_);
        last_sync_size_ = offset_sync_to;
      }
    }
  }

  return s;
}
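
// Return the name of the file checksum function, or the "unknown" name if
// no checksum function is configured.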
const char* WritableFileWriter::GetFileChecksumFuncName() const {
  if (checksum_func_ != nullptr) {
    return checksum_func_->Name();
  } else {
    return kUnknownFileChecksumFuncName.c_str();
  }
}
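
// Flush any buffered data and, for buffered I/O, sync it to storage using
// fsync() or sync() depending on use_fsync. With direct I/O the sync step
// is skipped here since writes bypass the OS cache.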
Status WritableFileWriter::Sync(bool use_fsync) {
  Status s = Flush();
  if (!s.ok()) {
    return s;
  }
  TEST_KILL_RANDOM("WritableFileWriter::Sync:0", rocksdb_kill_odds);
  if (!use_direct_io() && pending_sync_) {
    s = SyncInternal(use_fsync);
    if (!s.ok()) {
      return s;
    }
  }
  TEST_KILL_RANDOM("WritableFileWriter::Sync:1", rocksdb_kill_odds);
  pending_sync_ = false;
  return Status::OK();
}
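
// Sync the file without flushing the buffer first; only allowed when the
// underlying WritableFile reports that Sync() is thread safe.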
Status WritableFileWriter::SyncWithoutFlush(bool use_fsync) {
  if (!writable_file_->IsSyncThreadSafe()) {
    return Status::NotSupported(
        "Can't WritableFileWriter::SyncWithoutFlush() because "
        "WritableFile::IsSyncThreadSafe() is false");
  }
  TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:1");
  Status s = SyncInternal(use_fsync);
  TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:2");
  return s;
}
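
// Issue the actual fsync()/sync() call on the underlying file, timing it
// with the iostats fsync and CPU write timers.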
Status WritableFileWriter::SyncInternal(bool use_fsync) {
  Status s;
  IOSTATS_TIMER_GUARD(fsync_nanos);
  TEST_SYNC_POINT("WritableFileWriter::SyncInternal:0");
  auto prev_perf_level = GetPerfLevel();
  IOSTATS_CPU_TIMER_GUARD(cpu_write_nanos, env_);
  if (use_fsync) {
    s = writable_file_->Fsync(IOOptions(), nullptr);
  } else {
    s = writable_file_->Sync(IOOptions(), nullptr);
  }
  SetPerfLevel(prev_perf_level);
  return s;
}
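
// Ask the file system to sync only the given byte range of the file.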
Status WritableFileWriter::RangeSync(uint64_t offset, uint64_t nbytes) {
  IOSTATS_TIMER_GUARD(range_sync_nanos);
  TEST_SYNC_POINT("WritableFileWriter::RangeSync:0");
  return writable_file_->RangeSync(offset, nbytes, IOOptions(), nullptr);
}

// This method writes the specified data through the OS buffer cache and
// makes use of the rate limiter if one is available.
Status WritableFileWriter::WriteBuffered(const char* data, size_t size) {
  Status s;
  assert(!use_direct_io());
  const char* src = data;
  size_t left = size;

  while (left > 0) {
    size_t allowed;
    if (rate_limiter_ != nullptr) {
      allowed = rate_limiter_->RequestToken(
          left, 0 /* alignment */, writable_file_->GetIOPriority(), stats_,
          RateLimiter::OpType::kWrite);
    } else {
      allowed = left;
    }

    {
      IOSTATS_TIMER_GUARD(write_nanos);
      TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend");

#ifndef ROCKSDB_LITE
      FileOperationInfo::TimePoint start_ts;
      uint64_t old_size = writable_file_->GetFileSize(IOOptions(), nullptr);
      if (ShouldNotifyListeners()) {
        start_ts = std::chrono::system_clock::now();
        old_size = next_write_offset_;
      }
#endif
      {
        auto prev_perf_level = GetPerfLevel();
        IOSTATS_CPU_TIMER_GUARD(cpu_write_nanos, env_);
        s = writable_file_->Append(Slice(src, allowed), IOOptions(), nullptr);
        SetPerfLevel(prev_perf_level);
      }
#ifndef ROCKSDB_LITE
      if (ShouldNotifyListeners()) {
        auto finish_ts = std::chrono::system_clock::now();
        NotifyOnFileWriteFinish(old_size, allowed, start_ts, finish_ts, s);
      }
#endif
      if (!s.ok()) {
        return s;
      }
    }

    IOSTATS_ADD(bytes_written, allowed);
    TEST_KILL_RANDOM("WritableFileWriter::WriteBuffered:0", rocksdb_kill_odds);

    left -= allowed;
    src += allowed;
  }
  buf_.Size(0);
  return s;
}
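
// Fold the newly appended data into the running file checksum, if a
// checksum function is configured.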
void WritableFileWriter::CalculateFileChecksum(const Slice& data) {
  if (checksum_func_ != nullptr) {
    if (is_first_checksum_) {
      file_checksum_ = checksum_func_->Value(data.data(), data.size());
      is_first_checksum_ = false;
    } else {
      file_checksum_ =
          checksum_func_->Extend(file_checksum_, data.data(), data.size());
    }
  }
}

// This flushes the accumulated data in the buffer. We pad the data with
// zeros up to a whole page if necessary; during automatic flushes padding is
// not needed. We always use the RateLimiter if available. Any buffer bytes
// left over beyond the whole number of pages are moved (refit) to the start
// of the buffer, to be written again on the next flush, because we can only
// write at aligned offsets.
#ifndef ROCKSDB_LITE
Status WritableFileWriter::WriteDirect() {
  assert(use_direct_io());
  Status s;
  const size_t alignment = buf_.Alignment();
  assert((next_write_offset_ % alignment) == 0);

  // Calculate whole page final file advance if all writes succeed
  size_t file_advance = TruncateToPageBoundary(alignment, buf_.CurrentSize());

  // Calculate the leftover tail: we write it here padded with zeros, BUT we
  // will write it again in the future, either on Close() OR when the
  // current whole page fills out.
  size_t leftover_tail = buf_.CurrentSize() - file_advance;
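  // For example, with alignment = 4096 and buf_.CurrentSize() = 10000,
  // file_advance is 8192 and leftover_tail is 1808: two full pages are
  // written and become permanent, while the last 1808 bytes are padded,
  // written now, and rewritten on the next flush.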

  // Round up and pad
  buf_.PadToAlignmentWith(0);

  const char* src = buf_.BufferStart();
  uint64_t write_offset = next_write_offset_;
  size_t left = buf_.CurrentSize();

  while (left > 0) {
    // Check how much is allowed
    size_t size;
    if (rate_limiter_ != nullptr) {
      size = rate_limiter_->RequestToken(left, buf_.Alignment(),
                                         writable_file_->GetIOPriority(),
                                         stats_, RateLimiter::OpType::kWrite);
    } else {
      size = left;
    }

    {
      IOSTATS_TIMER_GUARD(write_nanos);
      TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend");
      FileOperationInfo::TimePoint start_ts;
      if (ShouldNotifyListeners()) {
        start_ts = std::chrono::system_clock::now();
      }
      // direct writes must be positional
      s = writable_file_->PositionedAppend(Slice(src, size), write_offset,
                                           IOOptions(), nullptr);
      if (ShouldNotifyListeners()) {
        auto finish_ts = std::chrono::system_clock::now();
        NotifyOnFileWriteFinish(write_offset, size, start_ts, finish_ts, s);
      }
      if (!s.ok()) {
        buf_.Size(file_advance + leftover_tail);
        return s;
      }
    }

    IOSTATS_ADD(bytes_written, size);
    left -= size;
    src += size;
    write_offset += size;
    assert((next_write_offset_ % alignment) == 0);
  }

  if (s.ok()) {
    // Move the tail to the beginning of the buffer. This never happens
    // during normal Append but rather during an explicit call to
    // Flush()/Sync() or Close().
    buf_.RefitTail(file_advance, leftover_tail);
    // This is where we start writing next time, which may or may not be the
    // actual file size on disk. They match if the buffer size is a multiple
    // of whole pages; otherwise filesize_ is leftover_tail bytes behind.
    next_write_offset_ += file_advance;
  }
  return s;
}
#endif  // !ROCKSDB_LITE

}  // namespace ROCKSDB_NAMESPACE