| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608 |
- // Copyright (c) 2013, Facebook, Inc. All rights reserved.
- // This source code is licensed under both the GPLv2 (found in the
- // COPYING file in the root directory) and Apache 2.0 License
- // (found in the LICENSE.Apache file in the root directory).
- #ifndef ROCKSDB_LITE
- #include "utilities/persistent_cache/block_cache_tier_file.h"
- #ifndef OS_WIN
- #include <unistd.h>
- #endif
- #include <functional>
- #include <memory>
- #include <vector>
- #include "env/composite_env_wrapper.h"
- #include "logging/logging.h"
- #include "port/port.h"
- #include "util/crc32c.h"
- namespace ROCKSDB_NAMESPACE {
- //
- // File creation factories
- //
- Status NewWritableCacheFile(Env* const env, const std::string& filepath,
- std::unique_ptr<WritableFile>* file,
- const bool use_direct_writes = false) {
- EnvOptions opt;
- opt.use_direct_writes = use_direct_writes;
- Status s = env->NewWritableFile(filepath, file, opt);
- return s;
- }
- Status NewRandomAccessCacheFile(Env* const env, const std::string& filepath,
- std::unique_ptr<RandomAccessFile>* file,
- const bool use_direct_reads = true) {
- assert(env);
- EnvOptions opt;
- opt.use_direct_reads = use_direct_reads;
- Status s = env->NewRandomAccessFile(filepath, file, opt);
- return s;
- }
- //
- // BlockCacheFile
- //
- Status BlockCacheFile::Delete(uint64_t* size) {
- assert(env_);
- Status status = env_->GetFileSize(Path(), size);
- if (!status.ok()) {
- return status;
- }
- return env_->DeleteFile(Path());
- }
- //
- // CacheRecord
- //
- // Cache record represents the record on disk
- //
- // +--------+---------+----------+------------+---------------+-------------+
- // | magic | crc | key size | value size | key data | value data |
- // +--------+---------+----------+------------+---------------+-------------+
- // <-- 4 --><-- 4 --><-- 4 --><-- 4 --><-- key size --><-- v-size -->
- //
// On-disk header that precedes every cache record. The header is memcpy'd
// to/from disk verbatim, so the declaration order of the fields defines the
// on-disk layout and must not change.
struct CacheRecordHeader {
  CacheRecordHeader() = default;
  CacheRecordHeader(const uint32_t magic, const uint32_t key_size,
                    const uint32_t val_size)
      : magic_(magic), key_size_(key_size), val_size_(val_size) {}

  uint32_t magic_ = 0;     // sanity marker, expected to be CacheRecord::MAGIC
  uint32_t crc_ = 0;       // checksum over header (with crc_=0), key, value
  uint32_t key_size_ = 0;  // length of the key payload in bytes
  uint32_t val_size_ = 0;  // length of the value payload in bytes
};
// In-memory view of one cache record: a header followed by key and value
// payloads. key_ and val_ are non-owning Slices — they point into memory
// owned by the caller (or, after Deserialize, into the deserialized buffer).
struct CacheRecord {
  CacheRecord() {}
  // Build a record for (key, val) and stamp the checksum into the header.
  CacheRecord(const Slice& key, const Slice& val)
      : hdr_(MAGIC, static_cast<uint32_t>(key.size()),
             static_cast<uint32_t>(val.size())),
        key_(key),
        val_(val) {
    hdr_.crc_ = ComputeCRC();
  }
  // CRC32C over the header (with crc_ zeroed), then key bytes, then value
  // bytes.
  uint32_t ComputeCRC() const;
  // Write header+key+value into the buffer chain starting at index *woff;
  // returns false if the buffers run out of space.
  bool Serialize(std::vector<CacheWriteBuffer*>* bufs, size_t* woff);
  // Parse a contiguous serialized record; returns false on size/magic/crc
  // mismatch.
  bool Deserialize(const Slice& buf);
  // Total serialized size of a (key, val) record in bytes.
  static uint32_t CalcSize(const Slice& key, const Slice& val) {
    return static_cast<uint32_t>(sizeof(CacheRecordHeader) + key.size() +
                                 val.size());
  }
  static const uint32_t MAGIC = 0xfefa;
  // Copy `size` bytes of `data` into the buffer chain, advancing *woff past
  // buffers as they fill up.
  bool Append(std::vector<CacheWriteBuffer*>* bufs, size_t* woff,
              const char* data, const size_t size);
  CacheRecordHeader hdr_;
  Slice key_;
  Slice val_;
};
- static_assert(sizeof(CacheRecordHeader) == 16, "DataHeader is not aligned");
- uint32_t CacheRecord::ComputeCRC() const {
- uint32_t crc = 0;
- CacheRecordHeader tmp = hdr_;
- tmp.crc_ = 0;
- crc = crc32c::Extend(crc, reinterpret_cast<const char*>(&tmp), sizeof(tmp));
- crc = crc32c::Extend(crc, reinterpret_cast<const char*>(key_.data()),
- key_.size());
- crc = crc32c::Extend(crc, reinterpret_cast<const char*>(val_.data()),
- val_.size());
- return crc;
- }
- bool CacheRecord::Serialize(std::vector<CacheWriteBuffer*>* bufs,
- size_t* woff) {
- assert(bufs->size());
- return Append(bufs, woff, reinterpret_cast<const char*>(&hdr_),
- sizeof(hdr_)) &&
- Append(bufs, woff, reinterpret_cast<const char*>(key_.data()),
- key_.size()) &&
- Append(bufs, woff, reinterpret_cast<const char*>(val_.data()),
- val_.size());
- }
- bool CacheRecord::Append(std::vector<CacheWriteBuffer*>* bufs, size_t* woff,
- const char* data, const size_t data_size) {
- assert(*woff < bufs->size());
- const char* p = data;
- size_t size = data_size;
- while (size && *woff < bufs->size()) {
- CacheWriteBuffer* buf = (*bufs)[*woff];
- const size_t free = buf->Free();
- if (size <= free) {
- buf->Append(p, size);
- size = 0;
- } else {
- buf->Append(p, free);
- p += free;
- size -= free;
- assert(!buf->Free());
- assert(buf->Used() == buf->Capacity());
- }
- if (!buf->Free()) {
- *woff += 1;
- }
- }
- assert(!size);
- return !size;
- }
// Parse a serialized record out of `data` (header + key + value laid out
// contiguously) and point key_/val_ into it. Returns false on a size
// mismatch, bad magic number or checksum failure.
//
// NOTE: key_ and val_ alias `data` after a successful parse; the caller's
// buffer must outlive any use of them.
bool CacheRecord::Deserialize(const Slice& data) {
  assert(data.size() >= sizeof(CacheRecordHeader));
  if (data.size() < sizeof(CacheRecordHeader)) {
    return false;
  }
  // Copy the header out of the buffer (memcpy avoids any alignment issues
  // with casting the raw buffer to a struct pointer).
  memcpy(&hdr_, data.data(), sizeof(hdr_));
  // The buffer must hold exactly header + key + value.
  assert(hdr_.key_size_ + hdr_.val_size_ + sizeof(hdr_) == data.size());
  if (hdr_.key_size_ + hdr_.val_size_ + sizeof(hdr_) != data.size()) {
    return false;
  }
  key_ = Slice(data.data_ + sizeof(hdr_), hdr_.key_size_);
  val_ = Slice(key_.data_ + hdr_.key_size_, hdr_.val_size_);
  if (!(hdr_.magic_ == MAGIC && ComputeCRC() == hdr_.crc_)) {
    // Corruption diagnostics: dump the decoded header and payload to stderr
    // before failing the read.
    fprintf(stderr, "** magic %d ** \n", hdr_.magic_);
    fprintf(stderr, "** key_size %d ** \n", hdr_.key_size_);
    fprintf(stderr, "** val_size %d ** \n", hdr_.val_size_);
    fprintf(stderr, "** key %s ** \n", key_.ToString().c_str());
    fprintf(stderr, "** val %s ** \n", val_.ToString().c_str());
    for (size_t i = 0; i < hdr_.val_size_; ++i) {
      fprintf(stderr, "%d.", (uint8_t)val_.data()[i]);
    }
    fprintf(stderr, "\n** cksum %d != %d **", hdr_.crc_, ComputeCRC());
  }
  assert(hdr_.magic_ == MAGIC && ComputeCRC() == hdr_.crc_);
  return hdr_.magic_ == MAGIC && ComputeCRC() == hdr_.crc_;
}
- //
- // RandomAccessFile
- //
- bool RandomAccessCacheFile::Open(const bool enable_direct_reads) {
- WriteLock _(&rwlock_);
- return OpenImpl(enable_direct_reads);
- }
- bool RandomAccessCacheFile::OpenImpl(const bool enable_direct_reads) {
- rwlock_.AssertHeld();
- ROCKS_LOG_DEBUG(log_, "Opening cache file %s", Path().c_str());
- std::unique_ptr<RandomAccessFile> file;
- Status status =
- NewRandomAccessCacheFile(env_, Path(), &file, enable_direct_reads);
- if (!status.ok()) {
- Error(log_, "Error opening random access file %s. %s", Path().c_str(),
- status.ToString().c_str());
- return false;
- }
- freader_.reset(new RandomAccessFileReader(
- NewLegacyRandomAccessFileWrapper(file), Path(), env_));
- return true;
- }
- bool RandomAccessCacheFile::Read(const LBA& lba, Slice* key, Slice* val,
- char* scratch) {
- ReadLock _(&rwlock_);
- assert(lba.cache_id_ == cache_id_);
- if (!freader_) {
- return false;
- }
- Slice result;
- Status s = freader_->Read(lba.off_, lba.size_, &result, scratch);
- if (!s.ok()) {
- Error(log_, "Error reading from file %s. %s", Path().c_str(),
- s.ToString().c_str());
- return false;
- }
- assert(result.data() == scratch);
- return ParseRec(lba, key, val, scratch);
- }
- bool RandomAccessCacheFile::ParseRec(const LBA& lba, Slice* key, Slice* val,
- char* scratch) {
- Slice data(scratch, lba.size_);
- CacheRecord rec;
- if (!rec.Deserialize(data)) {
- assert(!"Error deserializing data");
- Error(log_, "Error de-serializing record from file %s off %d",
- Path().c_str(), lba.off_);
- return false;
- }
- *key = Slice(rec.key_);
- *val = Slice(rec.val_);
- return true;
- }
- //
- // WriteableCacheFile
- //
// Tear down the write-side file. If eof_ was never reached the file was
// never fully flushed; shutdown takes priority over persisting the pending
// data, so the write reference taken in Create() is dropped without
// flushing.
WriteableCacheFile::~WriteableCacheFile() {
  WriteLock _(&rwlock_);
  if (!eof_) {
    // This file never flushed. We give priority to shutdown since this is a
    // cache
    // TODO(krad): Figure a way to flush the pending data
    if (file_) {
      assert(refs_ == 1);
      --refs_;
    }
  }
  assert(!refs_);
  ClearBuffers();
}
// Create the on-disk file backing this cache file and take the initial
// write reference. enable_direct_writes is currently ignored. Returns
// false if the file could not be created.
bool WriteableCacheFile::Create(const bool /*enable_direct_writes*/,
                                const bool enable_direct_reads) {
  WriteLock _(&rwlock_);
  enable_direct_reads_ = enable_direct_reads;
  // NOTE(review): max_size_ is printed with %d — confirm the type matches
  // the format specifier.
  ROCKS_LOG_DEBUG(log_, "Creating new cache %s (max size is %d B)",
                  Path().c_str(), max_size_);
  assert(env_);
  // An existing file is only warned about; it is overwritten below.
  Status s = env_->FileExists(Path());
  if (s.ok()) {
    ROCKS_LOG_WARN(log_, "File %s already exists. %s", Path().c_str(),
                   s.ToString().c_str());
  }
  s = NewWritableCacheFile(env_, Path(), &file_);
  if (!s.ok()) {
    ROCKS_LOG_WARN(log_, "Unable to create file %s. %s", Path().c_str(),
                   s.ToString().c_str());
    return false;
  }
  assert(!refs_);
  ++refs_;
  return true;
}
// Append a (key, value) record to the in-memory buffers and return its
// disk address through *lba. Returns false when the file is already full
// or buffer space cannot be obtained. The actual disk write happens
// asynchronously via DispatchBuffer().
bool WriteableCacheFile::Append(const Slice& key, const Slice& val, LBA* lba) {
  WriteLock _(&rwlock_);
  if (eof_) {
    // We can't append since the file is full
    return false;
  }
  // estimate the space required to store the (key, val)
  uint32_t rec_size = CacheRecord::CalcSize(key, val);
  if (!ExpandBuffer(rec_size)) {
    // unable to expand the buffer
    ROCKS_LOG_DEBUG(log_, "Error expanding buffers. size=%d", rec_size);
    return false;
  }
  // The record's logical block address: this file, at the current write
  // offset, spanning rec_size bytes.
  lba->cache_id_ = cache_id_;
  lba->off_ = disk_woff_;
  lba->size_ = rec_size;
  CacheRecord rec(key, val);
  if (!rec.Serialize(&bufs_, &buf_woff_)) {
    // unexpected error: unable to serialize the data
    assert(!"Error serializing record");
    return false;
  }
  disk_woff_ += rec_size;
  // the file is full once the write offset reaches max_size_
  eof_ = disk_woff_ >= max_size_;
  // dispatch buffer for flush
  DispatchBuffer();
  return true;
}
- bool WriteableCacheFile::ExpandBuffer(const size_t size) {
- rwlock_.AssertHeld();
- assert(!eof_);
- // determine if there is enough space
- size_t free = 0; // compute the free space left in buffer
- for (size_t i = buf_woff_; i < bufs_.size(); ++i) {
- free += bufs_[i]->Free();
- if (size <= free) {
- // we have enough space in the buffer
- return true;
- }
- }
- // expand the buffer until there is enough space to write `size` bytes
- assert(free < size);
- assert(alloc_);
- while (free < size) {
- CacheWriteBuffer* const buf = alloc_->Allocate();
- if (!buf) {
- ROCKS_LOG_DEBUG(log_, "Unable to allocate buffers");
- return false;
- }
- size_ += static_cast<uint32_t>(buf->Free());
- free += buf->Free();
- bufs_.push_back(buf);
- }
- assert(free >= size);
- return true;
- }
// Hand the next completed buffer to the background writer. At most one
// buffer is in flight at a time (pending_ios_ gates dispatch); completion
// re-enters through BufferWriteDone(), which dispatches the next buffer.
// Caller must hold rwlock_.
void WriteableCacheFile::DispatchBuffer() {
  rwlock_.AssertHeld();
  assert(bufs_.size());
  assert(buf_doff_ <= buf_woff_);
  assert(buf_woff_ <= bufs_.size());
  if (pending_ios_) {
    // a write is already in flight; BufferWriteDone will re-dispatch
    return;
  }
  if (!eof_ && buf_doff_ == buf_woff_) {
    // dispatch buffer is pointing to write buffer and we haven't hit eof
    return;
  }
  assert(eof_ || buf_doff_ < buf_woff_);
  assert(buf_doff_ < bufs_.size());
  assert(file_);
  assert(alloc_);
  auto* buf = bufs_[buf_doff_];
  // buffers map to consecutive fixed-size slots in the file, so the file
  // offset is the buffer index times the buffer size
  const uint64_t file_off = buf_doff_ * alloc_->BufferSize();
  assert(!buf->Free() ||
         (eof_ && buf_doff_ == buf_woff_ && buf_woff_ < bufs_.size()));
  // we have reached end of file, and there is space in the last buffer
  // pad it with zero for direct IO
  buf->FillTrailingZeros();
  assert(buf->Used() % kFileAlignmentSize == 0);
  writer_->Write(file_.get(), buf, file_off,
                 std::bind(&WriteableCacheFile::BufferWriteDone, this));
  pending_ios_++;
  buf_doff_++;
}
- void WriteableCacheFile::BufferWriteDone() {
- WriteLock _(&rwlock_);
- assert(bufs_.size());
- pending_ios_--;
- if (buf_doff_ < bufs_.size()) {
- DispatchBuffer();
- }
- if (eof_ && buf_doff_ >= bufs_.size() && !pending_ios_) {
- // end-of-file reached, move to read mode
- CloseAndOpenForReading();
- }
- }
// Switch the fully-flushed file from write mode to read mode.
void WriteableCacheFile::CloseAndOpenForReading() {
  // Our env abstraction do not allow reading from a file opened for appending
  // We need close the file and re-open it for reading
  Close();
  // NOTE(review): the return value is ignored. If the re-open fails,
  // freader_ stays unset and subsequent Read() calls return false.
  RandomAccessCacheFile::OpenImpl(enable_direct_reads_);
}
- bool WriteableCacheFile::ReadBuffer(const LBA& lba, Slice* key, Slice* block,
- char* scratch) {
- rwlock_.AssertHeld();
- if (!ReadBuffer(lba, scratch)) {
- Error(log_, "Error reading from buffer. cache=%d off=%d", cache_id_,
- lba.off_);
- return false;
- }
- return ParseRec(lba, key, block, scratch);
- }
- bool WriteableCacheFile::ReadBuffer(const LBA& lba, char* data) {
- rwlock_.AssertHeld();
- assert(lba.off_ < disk_woff_);
- assert(alloc_);
- // we read from the buffers like reading from a flat file. The list of buffers
- // are treated as contiguous stream of data
- char* tmp = data;
- size_t pending_nbytes = lba.size_;
- // start buffer
- size_t start_idx = lba.off_ / alloc_->BufferSize();
- // offset into the start buffer
- size_t start_off = lba.off_ % alloc_->BufferSize();
- assert(start_idx <= buf_woff_);
- for (size_t i = start_idx; pending_nbytes && i < bufs_.size(); ++i) {
- assert(i <= buf_woff_);
- auto* buf = bufs_[i];
- assert(i == buf_woff_ || !buf->Free());
- // bytes to write to the buffer
- size_t nbytes = pending_nbytes > (buf->Used() - start_off)
- ? (buf->Used() - start_off)
- : pending_nbytes;
- memcpy(tmp, buf->Data() + start_off, nbytes);
- // left over to be written
- pending_nbytes -= nbytes;
- start_off = 0;
- tmp += nbytes;
- }
- assert(!pending_nbytes);
- if (pending_nbytes) {
- return false;
- }
- assert(tmp == data + lba.size_);
- return true;
- }
// Close the write-side file. Only legal once the file is completely
// written and flushed (enforced by the asserts); releases the buffers,
// the file handle and the write reference taken in Create(). Caller must
// hold rwlock_.
void WriteableCacheFile::Close() {
  rwlock_.AssertHeld();
  assert(size_ >= max_size_);
  assert(disk_woff_ >= max_size_);
  assert(buf_doff_ == bufs_.size());
  assert(bufs_.size() - buf_woff_ <= 1);
  assert(!pending_ios_);
  // NOTE(review): size_/disk_woff_ are printed with %d — confirm the types
  // match the format specifiers.
  Info(log_, "Closing file %s. size=%d written=%d", Path().c_str(), size_,
       disk_woff_);
  ClearBuffers();
  file_.reset();
  assert(refs_);
  --refs_;
}
- void WriteableCacheFile::ClearBuffers() {
- assert(alloc_);
- for (size_t i = 0; i < bufs_.size(); ++i) {
- alloc_->Deallocate(bufs_[i]);
- }
- bufs_.clear();
- }
- //
- // ThreadedFileWriter implementation
- //
- ThreadedWriter::ThreadedWriter(PersistentCacheTier* const cache,
- const size_t qdepth, const size_t io_size)
- : Writer(cache), io_size_(io_size) {
- for (size_t i = 0; i < qdepth; ++i) {
- port::Thread th(&ThreadedWriter::ThreadMain, this);
- threads_.push_back(std::move(th));
- }
- }
- void ThreadedWriter::Stop() {
- // notify all threads to exit
- for (size_t i = 0; i < threads_.size(); ++i) {
- q_.Push(IO(/*signal=*/true));
- }
- // wait for all threads to exit
- for (auto& th : threads_) {
- th.join();
- assert(!th.joinable());
- }
- threads_.clear();
- }
// Queue a buffer write for the background threads; `callback` is invoked
// by the worker after the data has been appended to `file`.
void ThreadedWriter::Write(WritableFile* const file, CacheWriteBuffer* buf,
                           const uint64_t file_off,
                           const std::function<void()> callback) {
  q_.Push(IO(file, buf, file_off, callback));
}
// Worker loop: pop IOs from the queue and write them out. An IO with
// signal_ set is the shutdown sentinel pushed by Stop().
void ThreadedWriter::ThreadMain() {
  while (true) {
    // Fetch the IO to process
    IO io(q_.Pop());
    if (io.signal_) {
      // that's secret signal to exit
      break;
    }
    // Reserve space for writing the buffer; retry with a 1s backoff until
    // the cache can accommodate it.
    while (!cache_->Reserve(io.buf_->Used())) {
      // We can fail to reserve space if every file in the system
      // is being currently accessed
      /* sleep override */
      Env::Default()->SleepForMicroseconds(1000000);
    }
    DispatchIO(io);
    io.callback_();
  }
}
- void ThreadedWriter::DispatchIO(const IO& io) {
- size_t written = 0;
- while (written < io.buf_->Used()) {
- Slice data(io.buf_->Data() + written, io_size_);
- Status s = io.file_->Append(data);
- assert(s.ok());
- if (!s.ok()) {
- // That is definite IO error to device. There is not much we can
- // do but ignore the failure. This can lead to corruption of data on
- // disk, but the cache will skip while reading
- fprintf(stderr, "Error writing data to file. %s\n", s.ToString().c_str());
- }
- written += io_size_;
- }
- }
- } // namespace ROCKSDB_NAMESPACE
- #endif
|