block_cache_tier_file.cc

// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include "utilities/persistent_cache/block_cache_tier_file.h"

#ifndef OS_WIN
#include <unistd.h>
#endif

#include <functional>
#include <memory>
#include <vector>

#include "env/composite_env_wrapper.h"
#include "logging/logging.h"
#include "port/port.h"
#include "rocksdb/system_clock.h"
#include "util/crc32c.h"

namespace ROCKSDB_NAMESPACE {

//
// File creation factories
//
Status NewWritableCacheFile(Env* const env, const std::string& filepath,
                            std::unique_ptr<WritableFile>* file,
                            const bool use_direct_writes = false) {
  EnvOptions opt;
  opt.use_direct_writes = use_direct_writes;
  Status s = env->NewWritableFile(filepath, file, opt);
  return s;
}

Status NewRandomAccessCacheFile(const std::shared_ptr<FileSystem>& fs,
                                const std::string& filepath,
                                std::unique_ptr<FSRandomAccessFile>* file,
                                const bool use_direct_reads = true) {
  assert(fs.get());

  FileOptions opt;
  opt.use_direct_reads = use_direct_reads;
  return fs->NewRandomAccessFile(filepath, opt, file, nullptr);
}

//
// BlockCacheFile
//
Status BlockCacheFile::Delete(uint64_t* size) {
  assert(env_);

  Status status = env_->GetFileSize(Path(), size);
  if (!status.ok()) {
    return status;
  }
  return env_->DeleteFile(Path());
}

//
// CacheRecord
//
// Cache record represents the record on disk
//
// +--------+---------+----------+------------+---------------+-------------+
// | magic  | crc     | key size | value size | key data      | value data  |
// +--------+---------+----------+------------+---------------+-------------+
// <-- 4 --><-- 4 --><-- 4 --><-- 4 --><-- key size --><-- v-size -->
//
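// Illustrative size arithmetic (hypothetical values): a 16-byte key with a
// 4096-byte value occupies
//   sizeof(CacheRecordHeader) + 16 + 4096 = 16 + 16 + 4096 = 4128 bytes
// on disk.
//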
struct CacheRecordHeader {
  CacheRecordHeader() : magic_(0), crc_(0), key_size_(0), val_size_(0) {}
  CacheRecordHeader(const uint32_t magic, const uint32_t key_size,
                    const uint32_t val_size)
      : magic_(magic), crc_(0), key_size_(key_size), val_size_(val_size) {}

  uint32_t magic_;
  uint32_t crc_;
  uint32_t key_size_;
  uint32_t val_size_;
};

struct CacheRecord {
  CacheRecord() = default;
  CacheRecord(const Slice& key, const Slice& val)
      : hdr_(MAGIC, static_cast<uint32_t>(key.size()),
             static_cast<uint32_t>(val.size())),
        key_(key),
        val_(val) {
    hdr_.crc_ = ComputeCRC();
  }

  uint32_t ComputeCRC() const;
  bool Serialize(std::vector<CacheWriteBuffer*>* bufs, size_t* woff);
  bool Deserialize(const Slice& buf);

  static uint32_t CalcSize(const Slice& key, const Slice& val) {
    return static_cast<uint32_t>(sizeof(CacheRecordHeader) + key.size() +
                                 val.size());
  }

  static const uint32_t MAGIC = 0xfefa;

  bool Append(std::vector<CacheWriteBuffer*>* bufs, size_t* woff,
              const char* data, const size_t size);

  CacheRecordHeader hdr_;
  Slice key_;
  Slice val_;
};

static_assert(sizeof(CacheRecordHeader) == 16,
              "CacheRecordHeader is not aligned");

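// The CRC covers the header (with its crc_ field zeroed out) followed by the
// key and value bytes, so a mismatch detects corruption in any part of the
// record.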
uint32_t CacheRecord::ComputeCRC() const {
  uint32_t crc = 0;
  CacheRecordHeader tmp = hdr_;
  tmp.crc_ = 0;

  crc = crc32c::Extend(crc, reinterpret_cast<const char*>(&tmp), sizeof(tmp));
  crc = crc32c::Extend(crc, reinterpret_cast<const char*>(key_.data()),
                       key_.size());
  crc = crc32c::Extend(crc, reinterpret_cast<const char*>(val_.data()),
                       val_.size());
  return crc;
}

bool CacheRecord::Serialize(std::vector<CacheWriteBuffer*>* bufs,
                            size_t* woff) {
  assert(bufs->size());
  return Append(bufs, woff, reinterpret_cast<const char*>(&hdr_),
                sizeof(hdr_)) &&
         Append(bufs, woff, reinterpret_cast<const char*>(key_.data()),
                key_.size()) &&
         Append(bufs, woff, reinterpret_cast<const char*>(val_.data()),
                val_.size());
}

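// Copy `data` into the buffers starting at write cursor *woff, spilling into
// subsequent buffers as each one fills up, and advancing *woff past every
// buffer that becomes full. Illustrative walk-through (hypothetical sizes):
// with 16 KB buffers, appending a 20 KB payload to an empty buffer fills
// bufs[*woff] completely, advances *woff, and writes the remaining 4 KB into
// the next buffer.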
bool CacheRecord::Append(std::vector<CacheWriteBuffer*>* bufs, size_t* woff,
                         const char* data, const size_t data_size) {
  assert(*woff < bufs->size());

  const char* p = data;
  size_t size = data_size;

  while (size && *woff < bufs->size()) {
    CacheWriteBuffer* buf = (*bufs)[*woff];
    const size_t free = buf->Free();
    if (size <= free) {
      buf->Append(p, size);
      size = 0;
    } else {
      buf->Append(p, free);
      p += free;
      size -= free;
      assert(!buf->Free());
      assert(buf->Used() == buf->Capacity());
    }

    if (!buf->Free()) {
      *woff += 1;
    }
  }

  assert(!size);
  return !size;
}

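// Parse a raw record out of `data`. The resulting key_ and val_ are zero-copy
// slices into the caller's buffer, so they remain valid only as long as that
// buffer does. Validation proceeds in order: the buffer is large enough for a
// header, the sizes in the header match the buffer, and finally the magic
// number and CRC check out.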
bool CacheRecord::Deserialize(const Slice& data) {
  assert(data.size() >= sizeof(CacheRecordHeader));
  if (data.size() < sizeof(CacheRecordHeader)) {
    return false;
  }

  memcpy(&hdr_, data.data(), sizeof(hdr_));

  assert(hdr_.key_size_ + hdr_.val_size_ + sizeof(hdr_) == data.size());
  if (hdr_.key_size_ + hdr_.val_size_ + sizeof(hdr_) != data.size()) {
    return false;
  }

  key_ = Slice(data.data_ + sizeof(hdr_), hdr_.key_size_);
  val_ = Slice(key_.data_ + hdr_.key_size_, hdr_.val_size_);

  if (!(hdr_.magic_ == MAGIC && ComputeCRC() == hdr_.crc_)) {
    fprintf(stderr, "** magic %d ** \n", hdr_.magic_);
    fprintf(stderr, "** key_size %d ** \n", hdr_.key_size_);
    fprintf(stderr, "** val_size %d ** \n", hdr_.val_size_);
    fprintf(stderr, "** key %s ** \n", key_.ToString().c_str());
    fprintf(stderr, "** val %s ** \n", val_.ToString().c_str());
    for (size_t i = 0; i < hdr_.val_size_; ++i) {
      fprintf(stderr, "%d.", (uint8_t)val_.data()[i]);
    }
    fprintf(stderr, "\n** cksum %d != %d **", hdr_.crc_, ComputeCRC());
  }

  assert(hdr_.magic_ == MAGIC && ComputeCRC() == hdr_.crc_);
  return hdr_.magic_ == MAGIC && ComputeCRC() == hdr_.crc_;
}

//
// RandomAccessFile
//
bool RandomAccessCacheFile::Open(const bool enable_direct_reads) {
  WriteLock _(&rwlock_);
  return OpenImpl(enable_direct_reads);
}

bool RandomAccessCacheFile::OpenImpl(const bool enable_direct_reads) {
  rwlock_.AssertHeld();

  ROCKS_LOG_DEBUG(log_, "Opening cache file %s", Path().c_str());
  assert(env_);

  std::unique_ptr<FSRandomAccessFile> file;
  Status status = NewRandomAccessCacheFile(env_->GetFileSystem(), Path(),
                                           &file, enable_direct_reads);
  if (!status.ok()) {
    Error(log_, "Error opening random access file %s. %s", Path().c_str(),
          status.ToString().c_str());
    return false;
  }
  freader_.reset(new RandomAccessFileReader(std::move(file), Path(),
                                            env_->GetSystemClock().get()));

  return true;
}

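// Look up a record by its logical block address (LBA): read lba.size_ raw
// bytes at lba.off_ into the caller-provided scratch buffer, then parse the
// record out of it. The returned key/val slices alias scratch.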
bool RandomAccessCacheFile::Read(const LBA& lba, Slice* key, Slice* val,
                                 char* scratch) {
  ReadLock _(&rwlock_);

  assert(lba.cache_id_ == cache_id_);

  if (!freader_) {
    return false;
  }

  Slice result;
  Status s = freader_->Read(IOOptions(), lba.off_, lba.size_, &result, scratch,
                            nullptr);
  if (!s.ok()) {
    Error(log_, "Error reading from file %s. %s", Path().c_str(),
          s.ToString().c_str());
    return false;
  }

  assert(result.data() == scratch);

  return ParseRec(lba, key, val, scratch);
}

bool RandomAccessCacheFile::ParseRec(const LBA& lba, Slice* key, Slice* val,
                                     char* scratch) {
  Slice data(scratch, lba.size_);

  CacheRecord rec;
  if (!rec.Deserialize(data)) {
    assert(!"Error deserializing data");
    Error(log_, "Error de-serializing record from file %s off %d",
          Path().c_str(), lba.off_);
    return false;
  }

  *key = Slice(rec.key_);
  *val = Slice(rec.val_);

  return true;
}

//
// WriteableCacheFile
//
WriteableCacheFile::~WriteableCacheFile() {
  WriteLock _(&rwlock_);
  if (!eof_) {
    // This file was never flushed. We give priority to shutdown since this is
    // a cache.
    // TODO(krad): Figure a way to flush the pending data
    if (file_) {
      assert(refs_ == 1);
      --refs_;
    }
  }
  assert(!refs_);
  ClearBuffers();
}

bool WriteableCacheFile::Create(const bool /*enable_direct_writes*/,
                                const bool enable_direct_reads) {
  WriteLock _(&rwlock_);

  enable_direct_reads_ = enable_direct_reads;

  ROCKS_LOG_DEBUG(log_, "Creating new cache %s (max size is %d B)",
                  Path().c_str(), max_size_);

  assert(env_);

  Status s = env_->FileExists(Path());
  if (s.ok()) {
    ROCKS_LOG_WARN(log_, "File %s already exists. %s", Path().c_str(),
                   s.ToString().c_str());
  }

  s = NewWritableCacheFile(env_, Path(), &file_);
  if (!s.ok()) {
    ROCKS_LOG_WARN(log_, "Unable to create file %s. %s", Path().c_str(),
                   s.ToString().c_str());
    return false;
  }

  assert(!refs_);
  ++refs_;

  return true;
}

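// Append a (key, value) record to the file's in-memory buffers and hand the
// caller its logical block address. The flow, as implemented below: make sure
// enough buffer space exists (ExpandBuffer), record where the bytes will land
// on disk (disk_woff_), serialize the record into the buffers, then kick off
// an asynchronous flush (DispatchBuffer). Once disk_woff_ crosses max_size_
// the file is marked full (eof_) and further appends are rejected.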
bool WriteableCacheFile::Append(const Slice& key, const Slice& val, LBA* lba) {
  WriteLock _(&rwlock_);

  if (eof_) {
    // We can't append since the file is full
    return false;
  }

  // estimate the space required to store the (key, val)
  uint32_t rec_size = CacheRecord::CalcSize(key, val);

  if (!ExpandBuffer(rec_size)) {
    // unable to expand the buffer
    ROCKS_LOG_DEBUG(log_, "Error expanding buffers. size=%d", rec_size);
    return false;
  }

  lba->cache_id_ = cache_id_;
  lba->off_ = disk_woff_;
  lba->size_ = rec_size;

  CacheRecord rec(key, val);
  if (!rec.Serialize(&bufs_, &buf_woff_)) {
    // unexpected error: unable to serialize the data
    assert(!"Error serializing record");
    return false;
  }

  disk_woff_ += rec_size;
  eof_ = disk_woff_ >= max_size_;

  // dispatch buffer for flush
  DispatchBuffer();

  return true;
}

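// Make sure at least `size` bytes of free buffer space are available from the
// current write cursor onward, allocating fixed-size buffers from alloc_ as
// needed. Fails only if the allocator runs out of buffers.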
bool WriteableCacheFile::ExpandBuffer(const size_t size) {
  rwlock_.AssertHeld();
  assert(!eof_);

  // determine if there is enough space
  size_t free = 0;  // compute the free space left in buffer
  for (size_t i = buf_woff_; i < bufs_.size(); ++i) {
    free += bufs_[i]->Free();
    if (size <= free) {
      // we have enough space in the buffer
      return true;
    }
  }

  // expand the buffer until there is enough space to write `size` bytes
  assert(free < size);
  assert(alloc_);

  while (free < size) {
    CacheWriteBuffer* const buf = alloc_->Allocate();
    if (!buf) {
      ROCKS_LOG_DEBUG(log_, "Unable to allocate buffers");
      return false;
    }

    size_ += static_cast<uint32_t>(buf->Free());
    free += buf->Free();
    bufs_.push_back(buf);
  }

  assert(free >= size);
  return true;
}

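// Hand the next filled buffer to the background writer. Two cursors index
// bufs_: buf_woff_ is where new records are being written, buf_doff_ is the
// next buffer to dispatch to disk. At most one IO is kept in flight at a
// time; BufferWriteDone() re-invokes this method to dispatch the next buffer.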
void WriteableCacheFile::DispatchBuffer() {
  rwlock_.AssertHeld();

  assert(bufs_.size());
  assert(buf_doff_ <= buf_woff_);
  assert(buf_woff_ <= bufs_.size());

  if (pending_ios_) {
    return;
  }

  if (!eof_ && buf_doff_ == buf_woff_) {
    // the dispatch cursor has caught up with the write cursor and we have not
    // hit eof, so there is nothing complete to flush yet
    return;
  }

  assert(eof_ || buf_doff_ < buf_woff_);
  assert(buf_doff_ < bufs_.size());
  assert(file_);
  assert(alloc_);

  auto* buf = bufs_[buf_doff_];
  const uint64_t file_off = buf_doff_ * alloc_->BufferSize();

  assert(!buf->Free() ||
         (eof_ && buf_doff_ == buf_woff_ && buf_woff_ < bufs_.size()));
  // we have reached the end of file, and there may be space left in the last
  // buffer; pad it with zeros for direct IO
  buf->FillTrailingZeros();

  assert(buf->Used() % kFileAlignmentSize == 0);

  writer_->Write(file_.get(), buf, file_off,
                 std::bind(&WriteableCacheFile::BufferWriteDone, this));
  pending_ios_++;
  buf_doff_++;
}

void WriteableCacheFile::BufferWriteDone() {
  WriteLock _(&rwlock_);

  assert(bufs_.size());

  pending_ios_--;

  if (buf_doff_ < bufs_.size()) {
    DispatchBuffer();
  }

  if (eof_ && buf_doff_ >= bufs_.size() && !pending_ios_) {
    // end-of-file reached, move to read mode
    CloseAndOpenForReading();
  }
}

void WriteableCacheFile::CloseAndOpenForReading() {
  // Our env abstraction does not allow reading from a file opened for
  // appending. We need to close the file and re-open it for reading.
  Close();
  RandomAccessCacheFile::OpenImpl(enable_direct_reads_);
}

bool WriteableCacheFile::ReadBuffer(const LBA& lba, Slice* key, Slice* block,
                                    char* scratch) {
  rwlock_.AssertHeld();

  if (!ReadBuffer(lba, scratch)) {
    Error(log_, "Error reading from buffer. cache=%d off=%d", cache_id_,
          lba.off_);
    return false;
  }

  return ParseRec(lba, key, block, scratch);
}

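// Serve a read directly from the in-memory buffers while the file is still
// being written. The buffers form one contiguous stream, so a file offset
// maps to (buffer index, offset within buffer) by dividing by the buffer
// size. Illustrative example (hypothetical 16 KB = 16384-byte buffers):
// lba.off_ = 20000 maps to start_idx = 20000 / 16384 = 1 and
// start_off = 20000 % 16384 = 3616.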
bool WriteableCacheFile::ReadBuffer(const LBA& lba, char* data) {
  rwlock_.AssertHeld();

  assert(lba.off_ < disk_woff_);
  assert(alloc_);

  // we read from the buffers as if reading from a flat file. The list of
  // buffers is treated as a contiguous stream of data
  char* tmp = data;
  size_t pending_nbytes = lba.size_;
  // start buffer
  size_t start_idx = lba.off_ / alloc_->BufferSize();
  // offset into the start buffer
  size_t start_off = lba.off_ % alloc_->BufferSize();

  assert(start_idx <= buf_woff_);

  for (size_t i = start_idx; pending_nbytes && i < bufs_.size(); ++i) {
    assert(i <= buf_woff_);

    auto* buf = bufs_[i];
    assert(i == buf_woff_ || !buf->Free());

    // bytes to read from this buffer
    size_t nbytes = pending_nbytes > (buf->Used() - start_off)
                        ? (buf->Used() - start_off)
                        : pending_nbytes;
    memcpy(tmp, buf->Data() + start_off, nbytes);

    // bytes still left to read
    pending_nbytes -= nbytes;
    start_off = 0;
    tmp += nbytes;
  }

  assert(!pending_nbytes);
  if (pending_nbytes) {
    return false;
  }

  assert(tmp == data + lba.size_);
  return true;
}

void WriteableCacheFile::Close() {
  rwlock_.AssertHeld();

  assert(size_ >= max_size_);
  assert(disk_woff_ >= max_size_);
  assert(buf_doff_ == bufs_.size());
  assert(bufs_.size() - buf_woff_ <= 1);
  assert(!pending_ios_);

  Info(log_, "Closing file %s. size=%d written=%d", Path().c_str(), size_,
       disk_woff_);

  ClearBuffers();
  file_.reset();

  assert(refs_);
  --refs_;
}

void WriteableCacheFile::ClearBuffers() {
  assert(alloc_);

  for (size_t i = 0; i < bufs_.size(); ++i) {
    alloc_->Deallocate(bufs_[i]);
  }

  bufs_.clear();
}

//
// ThreadedWriter implementation
//
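// Spin up `qdepth` identical worker threads, each consuming write requests
// from the shared queue q_ in ThreadMain().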
ThreadedWriter::ThreadedWriter(PersistentCacheTier* const cache,
                               const size_t qdepth, const size_t io_size)
    : Writer(cache), io_size_(io_size) {
  for (size_t i = 0; i < qdepth; ++i) {
    port::Thread th(&ThreadedWriter::ThreadMain, this);
    threads_.push_back(std::move(th));
  }
}

void ThreadedWriter::Stop() {
  // notify all threads to exit
  for (size_t i = 0; i < threads_.size(); ++i) {
    q_.Push(IO(/*signal=*/true));
  }

  // wait for all threads to exit
  for (auto& th : threads_) {
    th.join();
    assert(!th.joinable());
  }
  threads_.clear();
}

void ThreadedWriter::Write(WritableFile* const file, CacheWriteBuffer* buf,
                           const uint64_t file_off,
                           const std::function<void()> callback) {
  q_.Push(IO(file, buf, file_off, callback));
}

void ThreadedWriter::ThreadMain() {
  while (true) {
    // Fetch the IO to process
    IO io(q_.Pop());
    if (io.signal_) {
      // that's the secret signal to exit
      break;
    }

    // Reserve space for writing the buffer
    while (!cache_->Reserve(io.buf_->Used())) {
      // We can fail to reserve space if every file in the system
      // is currently being accessed
      /* sleep override */
      SystemClock::Default()->SleepForMicroseconds(1000000);
    }

    DispatchIO(io);

    io.callback_();
  }
}

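// Write the buffer to the file in io_size_-sized chunks. This relies on the
// buffer having been zero-padded to an aligned length (see the
// FillTrailingZeros() call in DispatchBuffer()); presumably buf->Used() is a
// multiple of io_size_, otherwise the final chunk would read past the end of
// the buffer.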
void ThreadedWriter::DispatchIO(const IO& io) {
  size_t written = 0;
  while (written < io.buf_->Used()) {
    Slice data(io.buf_->Data() + written, io_size_);
    Status s = io.file_->Append(data);
    assert(s.ok());
    if (!s.ok()) {
      // This is a definite IO error to the device. There is not much we can
      // do but ignore the failure. This can lead to corrupted data on disk,
      // but the cache will skip it while reading
      fprintf(stderr, "Error writing data to file. %s\n", s.ToString().c_str());
    }

    written += io_size_;
  }
}

}  // namespace ROCKSDB_NAMESPACE