io_posix.h 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  9. #pragma once
#include <errno.h>
#if defined(ROCKSDB_IOURING_PRESENT)
#include <liburing.h>
#include <sys/uio.h>
#endif
#include <unistd.h>

#include <atomic>
#include <functional>
#include <map>
#include <string>
#include <utility>
#include <vector>

#include "port/port.h"
#include "rocksdb/env.h"
#include "rocksdb/file_system.h"
#include "rocksdb/io_status.h"
#include "test_util/sync_point.h"
#include "util/mutexlock.h"
#include "util/thread_local.h"
  27. // For non linux platform, the following macros are used only as place
  28. // holder.
  29. #if !(defined OS_LINUX) && !(defined OS_FREEBSD) && !(defined CYGWIN) && \
  30. !(defined OS_AIX) && !(defined OS_ANDROID)
  31. #define POSIX_FADV_NORMAL 0 /* [MC1] no further special treatment */
  32. #define POSIX_FADV_RANDOM 1 /* [MC1] expect random page refs */
  33. #define POSIX_FADV_SEQUENTIAL 2 /* [MC1] expect sequential page refs */
  34. #define POSIX_FADV_WILLNEED 3 /* [MC1] will need these pages */
  35. #define POSIX_FADV_DONTNEED 4 /* [MC1] don't need these pages */
  36. #define POSIX_MADV_NORMAL 0 /* [MC1] no further special treatment */
  37. #define POSIX_MADV_RANDOM 1 /* [MC1] expect random page refs */
  38. #define POSIX_MADV_SEQUENTIAL 2 /* [MC1] expect sequential page refs */
  39. #define POSIX_MADV_WILLNEED 3 /* [MC1] will need these pages */
  40. #define POSIX_MADV_DONTNEED 4 /* [MC1] don't need these pages */
  41. #endif
  42. namespace ROCKSDB_NAMESPACE {
  43. std::string IOErrorMsg(const std::string& context,
  44. const std::string& file_name);
  45. // file_name can be left empty if it is not unkown.
  46. IOStatus IOError(const std::string& context, const std::string& file_name,
  47. int err_number);
  48. class PosixHelper {
  49. public:
  50. static const std::string& GetLogicalBlockSizeFileName() {
  51. static const std::string kLogicalBlockSizeFileName = "logical_block_size";
  52. return kLogicalBlockSizeFileName;
  53. }
  54. static const std::string& GetMaxSectorsKBFileName() {
  55. static const std::string kMaxSectorsKBFileName = "max_sectors_kb";
  56. return kMaxSectorsKBFileName;
  57. }
  58. static size_t GetUniqueIdFromFile(int fd, char* id, size_t max_size);
  59. static size_t GetLogicalBlockSizeOfFd(int fd);
  60. static Status GetLogicalBlockSizeOfDirectory(const std::string& directory,
  61. size_t* size);
  62. static Status GetMaxSectorsKBOfDirectory(const std::string& directory,
  63. size_t* kb);
  64. private:
  65. static const size_t kDefaultMaxSectorsKB = 2 * 1024;
  66. static size_t GetMaxSectorsKBOfFd(int fd);
  67. // Return the value in the specified `file_name` under
  68. // `/sys/block/xxx/queue/` for the device where the file of `fd` is on.
  69. // If not found, then return the specified `default_return_value`
  70. static size_t GetQueueSysfsFileValueOfFd(int fd, const std::string& file_name,
  71. size_t default_return_value);
  72. /// Return the value in the specified `file_name` under
  73. // `/sys/block/xxx/queue/` for the device where `directory` is on.
  74. // If not found, then return the specified `default_return_value`
  75. static Status GetQueueSysfsFileValueofDirectory(const std::string& directory,
  76. const std::string& file_name,
  77. size_t* value);
  78. };
  79. /*
  80. * DirectIOHelper
  81. */
  82. inline bool IsSectorAligned(const size_t off, size_t sector_size) {
  83. assert((sector_size & (sector_size - 1)) == 0);
  84. return (off & (sector_size - 1)) == 0;
  85. }
  86. #ifndef NDEBUG
  87. inline bool IsSectorAligned(const void* ptr, size_t sector_size) {
  88. return uintptr_t(ptr) % sector_size == 0;
  89. }
  90. #endif
  91. #if defined(ROCKSDB_IOURING_PRESENT)
  92. struct Posix_IOHandle {
  93. Posix_IOHandle(struct io_uring* _iu,
  94. std::function<void(FSReadRequest&, void*)> _cb, void* _cb_arg,
  95. uint64_t _offset, size_t _len, char* _scratch,
  96. bool _use_direct_io, size_t _alignment)
  97. : iu(_iu),
  98. cb(_cb),
  99. cb_arg(_cb_arg),
  100. offset(_offset),
  101. len(_len),
  102. scratch(_scratch),
  103. use_direct_io(_use_direct_io),
  104. alignment(_alignment),
  105. is_finished(false),
  106. req_count(0) {}
  107. struct iovec iov;
  108. struct io_uring* iu;
  109. std::function<void(FSReadRequest&, void*)> cb;
  110. void* cb_arg;
  111. uint64_t offset;
  112. size_t len;
  113. char* scratch;
  114. bool use_direct_io;
  115. size_t alignment;
  116. bool is_finished;
  117. // req_count is used by AbortIO API to keep track of number of requests.
  118. uint32_t req_count;
  119. };
  120. inline void UpdateResult(struct io_uring_cqe* cqe, const std::string& file_name,
  121. size_t len, size_t iov_len, bool async_read,
  122. bool use_direct_io, size_t alignment,
  123. size_t& finished_len, FSReadRequest* req,
  124. size_t& bytes_read, bool& read_again) {
  125. read_again = false;
  126. if (cqe->res < 0) {
  127. req->result = Slice(req->scratch, 0);
  128. req->status = IOError("Req failed", file_name, cqe->res);
  129. } else {
  130. bytes_read = static_cast<size_t>(cqe->res);
  131. TEST_SYNC_POINT_CALLBACK("UpdateResults::io_uring_result", &bytes_read);
  132. if (bytes_read == iov_len) {
  133. req->result = Slice(req->scratch, req->len);
  134. req->status = IOStatus::OK();
  135. } else if (bytes_read == 0) {
  136. /// cqe->res == 0 can means EOF, or can mean partial results. See
  137. // comment
  138. // https://github.com/facebook/rocksdb/pull/6441#issuecomment-589843435
  139. // Fall back to pread in this case.
  140. if (use_direct_io && !IsSectorAligned(finished_len, alignment)) {
  141. // Bytes reads don't fill sectors. Should only happen at the end
  142. // of the file.
  143. req->result = Slice(req->scratch, finished_len);
  144. req->status = IOStatus::OK();
  145. } else {
  146. if (async_read) {
  147. // No bytes read. It can means EOF. In case of partial results, it's
  148. // caller responsibility to call read/readasync again.
  149. req->result = Slice(req->scratch, 0);
  150. req->status = IOStatus::OK();
  151. } else {
  152. read_again = true;
  153. }
  154. }
  155. } else if (bytes_read < iov_len) {
  156. assert(bytes_read > 0);
  157. if (async_read) {
  158. req->result = Slice(req->scratch, bytes_read);
  159. req->status = IOStatus::OK();
  160. } else {
  161. assert(bytes_read + finished_len < len);
  162. finished_len += bytes_read;
  163. }
  164. } else {
  165. req->result = Slice(req->scratch, 0);
  166. req->status = IOError("Req returned more bytes than requested", file_name,
  167. cqe->res);
  168. }
  169. }
  170. #ifdef NDEBUG
  171. (void)len;
  172. #endif
  173. }
  174. #endif
  175. #ifdef OS_LINUX
  176. // Files under a specific directory have the same logical block size.
  177. // This class caches the logical block size for the specified directories to
  178. // save the CPU cost of computing the size.
  179. // Safe for concurrent access from multiple threads without any external
  180. // synchronization.
  181. class LogicalBlockSizeCache {
  182. public:
  183. LogicalBlockSizeCache(
  184. std::function<size_t(int)> get_logical_block_size_of_fd =
  185. PosixHelper::GetLogicalBlockSizeOfFd,
  186. std::function<Status(const std::string&, size_t*)>
  187. get_logical_block_size_of_directory =
  188. PosixHelper::GetLogicalBlockSizeOfDirectory)
  189. : get_logical_block_size_of_fd_(get_logical_block_size_of_fd),
  190. get_logical_block_size_of_directory_(
  191. get_logical_block_size_of_directory) {}
  192. // Takes the following actions:
  193. // 1. Increases reference count of the directories;
  194. // 2. If the directory's logical block size is not cached,
  195. // compute the buffer size and cache the result.
  196. Status RefAndCacheLogicalBlockSize(
  197. const std::vector<std::string>& directories);
  198. // Takes the following actions:
  199. // 1. Decreases reference count of the directories;
  200. // 2. If the reference count of a directory reaches 0, remove the directory
  201. // from the cache.
  202. void UnrefAndTryRemoveCachedLogicalBlockSize(
  203. const std::vector<std::string>& directories);
  204. // Returns the logical block size for the file.
  205. //
  206. // If the file is under a cached directory, return the cached size.
  207. // Otherwise, the size is computed.
  208. size_t GetLogicalBlockSize(const std::string& fname, int fd);
  209. int GetRefCount(const std::string& dir) {
  210. ReadLock lock(&cache_mutex_);
  211. auto it = cache_.find(dir);
  212. if (it == cache_.end()) {
  213. return 0;
  214. }
  215. return it->second.ref;
  216. }
  217. size_t Size() const { return cache_.size(); }
  218. bool Contains(const std::string& dir) {
  219. ReadLock lock(&cache_mutex_);
  220. return cache_.find(dir) != cache_.end();
  221. }
  222. private:
  223. struct CacheValue {
  224. CacheValue() : size(0), ref(0) {}
  225. // Logical block size of the directory.
  226. size_t size;
  227. // Reference count of the directory.
  228. int ref;
  229. };
  230. std::function<size_t(int)> get_logical_block_size_of_fd_;
  231. std::function<Status(const std::string&, size_t*)>
  232. get_logical_block_size_of_directory_;
  233. std::map<std::string, CacheValue> cache_;
  234. port::RWMutex cache_mutex_;
  235. };
  236. #endif
  237. class PosixSequentialFile : public FSSequentialFile {
  238. private:
  239. std::string filename_;
  240. FILE* file_;
  241. int fd_;
  242. bool use_direct_io_;
  243. size_t logical_sector_size_;
  244. public:
  245. PosixSequentialFile(const std::string& fname, FILE* file, int fd,
  246. size_t logical_block_size, const EnvOptions& options);
  247. virtual ~PosixSequentialFile();
  248. IOStatus Read(size_t n, const IOOptions& opts, Slice* result, char* scratch,
  249. IODebugContext* dbg) override;
  250. IOStatus PositionedRead(uint64_t offset, size_t n, const IOOptions& opts,
  251. Slice* result, char* scratch,
  252. IODebugContext* dbg) override;
  253. IOStatus Skip(uint64_t n) override;
  254. IOStatus InvalidateCache(size_t offset, size_t length) override;
  255. bool use_direct_io() const override { return use_direct_io_; }
  256. size_t GetRequiredBufferAlignment() const override {
  257. return logical_sector_size_;
  258. }
  259. };
  260. #if defined(ROCKSDB_IOURING_PRESENT)
  261. // io_uring instance queue depth
  262. const unsigned int kIoUringDepth = 256;
  263. inline void DeleteIOUring(void* p) {
  264. struct io_uring* iu = static_cast<struct io_uring*>(p);
  265. delete iu;
  266. }
  267. inline struct io_uring* CreateIOUring() {
  268. struct io_uring* new_io_uring = new struct io_uring;
  269. int ret = io_uring_queue_init(kIoUringDepth, new_io_uring, 0);
  270. if (ret) {
  271. delete new_io_uring;
  272. new_io_uring = nullptr;
  273. }
  274. return new_io_uring;
  275. }
  276. #endif // defined(ROCKSDB_IOURING_PRESENT)
  277. class PosixRandomAccessFile : public FSRandomAccessFile {
  278. protected:
  279. std::string filename_;
  280. int fd_;
  281. bool use_direct_io_;
  282. size_t logical_sector_size_;
  283. #if defined(ROCKSDB_IOURING_PRESENT)
  284. ThreadLocalPtr* thread_local_io_urings_;
  285. #endif
  286. public:
  287. PosixRandomAccessFile(const std::string& fname, int fd,
  288. size_t logical_block_size, const EnvOptions& options
  289. #if defined(ROCKSDB_IOURING_PRESENT)
  290. ,
  291. ThreadLocalPtr* thread_local_io_urings
  292. #endif
  293. );
  294. virtual ~PosixRandomAccessFile();
  295. IOStatus Read(uint64_t offset, size_t n, const IOOptions& opts, Slice* result,
  296. char* scratch, IODebugContext* dbg) const override;
  297. IOStatus MultiRead(FSReadRequest* reqs, size_t num_reqs,
  298. const IOOptions& options, IODebugContext* dbg) override;
  299. IOStatus Prefetch(uint64_t offset, size_t n, const IOOptions& opts,
  300. IODebugContext* dbg) override;
  301. #if defined(OS_LINUX) || defined(OS_MACOSX) || defined(OS_AIX)
  302. size_t GetUniqueId(char* id, size_t max_size) const override;
  303. #endif
  304. void Hint(AccessPattern pattern) override;
  305. IOStatus InvalidateCache(size_t offset, size_t length) override;
  306. bool use_direct_io() const override { return use_direct_io_; }
  307. size_t GetRequiredBufferAlignment() const override {
  308. return logical_sector_size_;
  309. }
  310. virtual IOStatus ReadAsync(FSReadRequest& req, const IOOptions& opts,
  311. std::function<void(FSReadRequest&, void*)> cb,
  312. void* cb_arg, void** io_handle,
  313. IOHandleDeleter* del_fn,
  314. IODebugContext* dbg) override;
  315. virtual IOStatus GetFileSize(uint64_t* result) override;
  316. };
  317. class PosixWritableFile : public FSWritableFile {
  318. protected:
  319. const std::string filename_;
  320. const bool use_direct_io_;
  321. int fd_;
  322. uint64_t filesize_;
  323. size_t logical_sector_size_;
  324. #ifdef ROCKSDB_FALLOCATE_PRESENT
  325. bool allow_fallocate_;
  326. bool fallocate_with_keep_size_;
  327. #endif
  328. #ifdef ROCKSDB_RANGESYNC_PRESENT
  329. // Even if the syscall is present, the filesystem may still not properly
  330. // support it, so we need to do a dynamic check too.
  331. bool sync_file_range_supported_;
  332. #endif // ROCKSDB_RANGESYNC_PRESENT
  333. public:
  334. explicit PosixWritableFile(const std::string& fname, int fd,
  335. size_t logical_block_size,
  336. const EnvOptions& options,
  337. uint64_t initial_file_size);
  338. virtual ~PosixWritableFile();
  339. // Need to implement this so the file is truncated correctly
  340. // with direct I/O
  341. IOStatus Truncate(uint64_t size, const IOOptions& opts,
  342. IODebugContext* dbg) override;
  343. IOStatus Close(const IOOptions& opts, IODebugContext* dbg) override;
  344. IOStatus Append(const Slice& data, const IOOptions& opts,
  345. IODebugContext* dbg) override;
  346. IOStatus Append(const Slice& data, const IOOptions& opts,
  347. const DataVerificationInfo& /* verification_info */,
  348. IODebugContext* dbg) override {
  349. return Append(data, opts, dbg);
  350. }
  351. IOStatus PositionedAppend(const Slice& data, uint64_t offset,
  352. const IOOptions& opts,
  353. IODebugContext* dbg) override;
  354. IOStatus PositionedAppend(const Slice& data, uint64_t offset,
  355. const IOOptions& opts,
  356. const DataVerificationInfo& /* verification_info */,
  357. IODebugContext* dbg) override {
  358. return PositionedAppend(data, offset, opts, dbg);
  359. }
  360. IOStatus Flush(const IOOptions& opts, IODebugContext* dbg) override;
  361. IOStatus Sync(const IOOptions& opts, IODebugContext* dbg) override;
  362. IOStatus Fsync(const IOOptions& opts, IODebugContext* dbg) override;
  363. bool IsSyncThreadSafe() const override;
  364. bool use_direct_io() const override { return use_direct_io_; }
  365. void SetWriteLifeTimeHint(Env::WriteLifeTimeHint hint) override;
  366. uint64_t GetFileSize(const IOOptions& opts, IODebugContext* dbg) override;
  367. IOStatus InvalidateCache(size_t offset, size_t length) override;
  368. size_t GetRequiredBufferAlignment() const override {
  369. return logical_sector_size_;
  370. }
  371. #ifdef ROCKSDB_FALLOCATE_PRESENT
  372. IOStatus Allocate(uint64_t offset, uint64_t len, const IOOptions& opts,
  373. IODebugContext* dbg) override;
  374. #endif
  375. IOStatus RangeSync(uint64_t offset, uint64_t nbytes, const IOOptions& opts,
  376. IODebugContext* dbg) override;
  377. #ifdef OS_LINUX
  378. size_t GetUniqueId(char* id, size_t max_size) const override;
  379. #endif
  380. };
  381. // mmap() based random-access
  382. class PosixMmapReadableFile : public FSRandomAccessFile {
  383. private:
  384. int fd_;
  385. std::string filename_;
  386. void* mmapped_region_;
  387. size_t length_;
  388. public:
  389. PosixMmapReadableFile(const int fd, const std::string& fname, void* base,
  390. size_t length, const EnvOptions& options);
  391. virtual ~PosixMmapReadableFile();
  392. IOStatus Read(uint64_t offset, size_t n, const IOOptions& opts, Slice* result,
  393. char* scratch, IODebugContext* dbg) const override;
  394. void Hint(AccessPattern pattern) override;
  395. IOStatus InvalidateCache(size_t offset, size_t length) override;
  396. virtual IOStatus GetFileSize(uint64_t* result) override;
  397. };
  398. class PosixMmapFile : public FSWritableFile {
  399. private:
  400. std::string filename_;
  401. int fd_;
  402. size_t page_size_;
  403. size_t map_size_; // How much extra memory to map at a time
  404. char* base_; // The mapped region
  405. char* limit_; // Limit of the mapped region
  406. char* dst_; // Where to write next (in range [base_,limit_])
  407. char* last_sync_; // Where have we synced up to
  408. uint64_t file_offset_; // Offset of base_ in file
  409. #ifdef ROCKSDB_FALLOCATE_PRESENT
  410. bool allow_fallocate_; // If false, fallocate calls are bypassed
  411. bool fallocate_with_keep_size_;
  412. #endif
  413. // Roundup x to a multiple of y
  414. static size_t Roundup(size_t x, size_t y) { return ((x + y - 1) / y) * y; }
  415. size_t TruncateToPageBoundary(size_t s) {
  416. s -= (s & (page_size_ - 1));
  417. assert((s % page_size_) == 0);
  418. return s;
  419. }
  420. IOStatus MapNewRegion();
  421. IOStatus UnmapCurrentRegion();
  422. IOStatus Msync();
  423. public:
  424. PosixMmapFile(const std::string& fname, int fd, size_t page_size,
  425. const EnvOptions& options, uint64_t initial_file_size);
  426. ~PosixMmapFile();
  427. // Means Close() will properly take care of truncate
  428. // and it does not need any additional information
  429. IOStatus Truncate(uint64_t /*size*/, const IOOptions& /*opts*/,
  430. IODebugContext* /*dbg*/) override {
  431. return IOStatus::OK();
  432. }
  433. IOStatus Close(const IOOptions& opts, IODebugContext* dbg) override;
  434. IOStatus Append(const Slice& data, const IOOptions& opts,
  435. IODebugContext* dbg) override;
  436. IOStatus Append(const Slice& data, const IOOptions& opts,
  437. const DataVerificationInfo& /* verification_info */,
  438. IODebugContext* dbg) override {
  439. return Append(data, opts, dbg);
  440. }
  441. IOStatus Flush(const IOOptions& opts, IODebugContext* dbg) override;
  442. IOStatus Sync(const IOOptions& opts, IODebugContext* dbg) override;
  443. IOStatus Fsync(const IOOptions& opts, IODebugContext* dbg) override;
  444. uint64_t GetFileSize(const IOOptions& opts, IODebugContext* dbg) override;
  445. IOStatus InvalidateCache(size_t offset, size_t length) override;
  446. #ifdef ROCKSDB_FALLOCATE_PRESENT
  447. IOStatus Allocate(uint64_t offset, uint64_t len, const IOOptions& opts,
  448. IODebugContext* dbg) override;
  449. #endif
  450. };
  451. class PosixRandomRWFile : public FSRandomRWFile {
  452. public:
  453. explicit PosixRandomRWFile(const std::string& fname, int fd,
  454. const EnvOptions& options);
  455. virtual ~PosixRandomRWFile();
  456. IOStatus Write(uint64_t offset, const Slice& data, const IOOptions& opts,
  457. IODebugContext* dbg) override;
  458. IOStatus Read(uint64_t offset, size_t n, const IOOptions& opts, Slice* result,
  459. char* scratch, IODebugContext* dbg) const override;
  460. IOStatus Flush(const IOOptions& opts, IODebugContext* dbg) override;
  461. IOStatus Sync(const IOOptions& opts, IODebugContext* dbg) override;
  462. IOStatus Fsync(const IOOptions& opts, IODebugContext* dbg) override;
  463. IOStatus Close(const IOOptions& opts, IODebugContext* dbg) override;
  464. private:
  465. const std::string filename_;
  466. int fd_;
  467. };
  468. struct PosixMemoryMappedFileBuffer : public MemoryMappedFileBuffer {
  469. PosixMemoryMappedFileBuffer(void* _base, size_t _length)
  470. : MemoryMappedFileBuffer(_base, _length) {}
  471. virtual ~PosixMemoryMappedFileBuffer();
  472. };
  473. class PosixDirectory : public FSDirectory {
  474. public:
  475. explicit PosixDirectory(int fd, const std::string& directory_name);
  476. ~PosixDirectory();
  477. IOStatus Fsync(const IOOptions& opts, IODebugContext* dbg) override;
  478. IOStatus Close(const IOOptions& opts, IODebugContext* dbg) override;
  479. IOStatus FsyncWithDirOptions(
  480. const IOOptions&, IODebugContext*,
  481. const DirFsyncOptions& dir_fsync_options) override;
  482. private:
  483. int fd_;
  484. bool is_btrfs_;
  485. const std::string directory_name_;
  486. };
  487. } // namespace ROCKSDB_NAMESPACE