| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368 |
- // Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
- // This source code is licensed under both the GPLv2 (found in the
- // COPYING file in the root directory) and Apache 2.0 License
- // (found in the LICENSE.Apache file in the root directory).
- #pragma once
- #include <unordered_map>
- #include "file/random_access_file_reader.h"
- #include "file/writable_file_writer.h"
- #include "rocksdb/utilities/cache_dump_load.h"
- #include "table/block_based/block.h"
- #include "table/block_based/block_type.h"
- #include "table/block_based/cachable_entry.h"
- #include "table/block_based/parsed_full_filter_block.h"
- #include "table/block_based/reader_common.h"
- #include "util/hash_containers.h"
- namespace ROCKSDB_NAMESPACE {
// Size, in bytes, of the scratch buffer that the default CacheDumpReader
// (FromFileCacheDumpReader) reads through. 1KB.
static constexpr unsigned int kDumpReaderBufferSize = 1024;
// Width, in bytes, of the fixed32 length prefix that precedes every metadata
// record and data packet in a dump file.
static constexpr unsigned int kSizePrefixLen = 4;
// Tag describing what a dump unit contains. The tag is serialized as a single
// byte (see CacheDumperHelper::EncodeDumpUnit), so changing the numeric value
// of an existing enumerator changes how previously written dumps decode.
enum CacheDumpUnitType : unsigned char {
  kHeader = 1,
  kFooter = 2,
  kData = 3,
  kFilter = 4,
  kProperties = 5,
  kCompressionDictionary = 6,
  kRangeDeletion = 7,
  kHashIndexPrefixes = 8,
  kHashIndexMetadata = 9,
  kMetaIndex = 10,
  kIndex = 11,
  kDeprecatedFilterBlock = 12,  // OBSOLETE / DEPRECATED
  kFilterMetaBlock = 13,
  // One past the last real type. DumpUnit::reset() also uses it as the
  // "no type set" marker.
  kBlockTypeMax,
};
// The metadata of a dump unit. After it is serialized (see
// CacheDumperHelper::EncodeDumpUnitMeta) its size is a fixed 16 bytes:
// fixed32 + fixed32 + fixed64.
//
// Every field has a default member initializer, so a default-constructed
// DumpUnitMeta is already zeroed and never exposes indeterminate values, even
// if the caller forgets to call reset().
struct DumpUnitMeta {
  // Sequence number is a monotonically increasing number that indicates the
  // order of the blocks being written. Header is 0.
  uint32_t sequence_num = 0;
  // The Crc32c checksum of its dump unit.
  uint32_t dump_unit_checksum = 0;
  // The dump unit size after the dump unit is serialized to a string.
  uint64_t dump_unit_size = 0;

  // Restores every field to its zero default so the object can be reused.
  void reset() {
    sequence_num = 0;
    dump_unit_checksum = 0;
    dump_unit_size = 0;
  }
};
- // The data structure to hold a block and its information.
- struct DumpUnit {
- // The timestamp when the block is identified, copied, and dumped from block
- // cache
- uint64_t timestamp;
- // The type of the block
- CacheDumpUnitType type;
- // The key of this block when the block is referenced by this Cache
- Slice key;
- // The block size
- size_t value_len;
- // The Crc32c checksum of the block
- uint32_t value_checksum;
- // Pointer to the block. Note that, in the dump process, it points to a memory
- // buffer copied from cache block. The buffer is freed when we process the
- // next block. In the load process, we use an std::string to store the
- // serialized dump_unit read from the reader. So it points to the memory
- // address of the begin of the block in this string.
- void* value;
- DumpUnit() { reset(); }
- void reset() {
- timestamp = 0;
- type = CacheDumpUnitType::kBlockTypeMax;
- key.clear();
- value_len = 0;
- value_checksum = 0;
- value = nullptr;
- }
- };
- // The default implementation of the Cache Dumper
- class CacheDumperImpl : public CacheDumper {
- public:
- CacheDumperImpl(const CacheDumpOptions& dump_options,
- const std::shared_ptr<Cache>& cache,
- std::unique_ptr<CacheDumpWriter>&& writer)
- : options_(dump_options), cache_(cache), writer_(std::move(writer)) {
- dumped_size_bytes_ = 0;
- }
- ~CacheDumperImpl() { writer_.reset(); }
- Status SetDumpFilter(const std::vector<DB*>& db_list) override;
- IOStatus DumpCacheEntriesToWriter() override;
- private:
- IOStatus WriteBlock(CacheDumpUnitType type, const Slice& key,
- const Slice& value, uint64_t timestamp);
- IOStatus WriteHeader();
- IOStatus WriteFooter();
- bool ShouldFilterOut(const Slice& key);
- std::function<void(const Slice&, Cache::ObjectPtr, size_t,
- const Cache::CacheItemHelper*)>
- DumpOneBlockCallBack(std::string& buf);
- CacheDumpOptions options_;
- std::shared_ptr<Cache> cache_;
- std::unique_ptr<CacheDumpWriter> writer_;
- SystemClock* clock_;
- uint32_t sequence_num_;
- // The cache key prefix filter. Currently, we use db_session_id as the prefix,
- // so using std::set to store the prefixes as filter is enough. Further
- // improvement can be applied like BloomFilter or others to speedup the
- // filtering.
- std::set<std::string> prefix_filter_;
- // Deadline for dumper in microseconds.
- std::chrono::microseconds deadline_;
- uint64_t dumped_size_bytes_;
- // dump all keys of cache if user doesn't call SetDumpFilter
- bool dump_all_keys_ = true;
- };
- // The default implementation of CacheDumpedLoader
- class CacheDumpedLoaderImpl : public CacheDumpedLoader {
- public:
- CacheDumpedLoaderImpl(const CacheDumpOptions& dump_options,
- const BlockBasedTableOptions& /*toptions*/,
- const std::shared_ptr<SecondaryCache>& secondary_cache,
- std::unique_ptr<CacheDumpReader>&& reader)
- : options_(dump_options),
- secondary_cache_(secondary_cache),
- reader_(std::move(reader)) {}
- ~CacheDumpedLoaderImpl() {}
- IOStatus RestoreCacheEntriesToSecondaryCache() override;
- private:
- IOStatus ReadDumpUnitMeta(std::string* data, DumpUnitMeta* unit_meta);
- IOStatus ReadDumpUnit(size_t len, std::string* data, DumpUnit* unit);
- IOStatus ReadHeader(std::string* data, DumpUnit* dump_unit);
- IOStatus ReadCacheBlock(std::string* data, DumpUnit* dump_unit);
- CacheDumpOptions options_;
- std::shared_ptr<SecondaryCache> secondary_cache_;
- std::unique_ptr<CacheDumpReader> reader_;
- };
- // The default implementation of CacheDumpWriter. We write the blocks to a file
- // sequentially.
- class ToFileCacheDumpWriter : public CacheDumpWriter {
- public:
- explicit ToFileCacheDumpWriter(
- std::unique_ptr<WritableFileWriter>&& file_writer)
- : file_writer_(std::move(file_writer)) {}
- ~ToFileCacheDumpWriter() { Close().PermitUncheckedError(); }
- // Write the serialized metadata to the file
- IOStatus WriteMetadata(const Slice& metadata) override {
- assert(file_writer_ != nullptr);
- std::string prefix;
- PutFixed32(&prefix, static_cast<uint32_t>(metadata.size()));
- const IOOptions opts;
- IOStatus io_s = file_writer_->Append(opts, Slice(prefix));
- if (!io_s.ok()) {
- return io_s;
- }
- io_s = file_writer_->Append(opts, metadata);
- return io_s;
- }
- // Write the serialized data to the file
- IOStatus WritePacket(const Slice& data) override {
- assert(file_writer_ != nullptr);
- std::string prefix;
- PutFixed32(&prefix, static_cast<uint32_t>(data.size()));
- const IOOptions opts;
- IOStatus io_s = file_writer_->Append(opts, Slice(prefix));
- if (!io_s.ok()) {
- return io_s;
- }
- io_s = file_writer_->Append(opts, data);
- return io_s;
- }
- // Reset the writer
- IOStatus Close() override {
- IOStatus io_s;
- if (file_writer_ != nullptr && !file_writer_->seen_error()) {
- io_s = file_writer_->Sync(IOOptions(), false /* use_fsync */);
- }
- file_writer_.reset();
- return io_s;
- }
- private:
- std::unique_ptr<WritableFileWriter> file_writer_;
- };
- // The default implementation of CacheDumpReader. It is implemented based on
- // RandomAccessFileReader. Note that, we keep an internal variable to remember
- // the current offset.
- class FromFileCacheDumpReader : public CacheDumpReader {
- public:
- explicit FromFileCacheDumpReader(
- std::unique_ptr<RandomAccessFileReader>&& reader)
- : file_reader_(std::move(reader)),
- offset_(0),
- buffer_(new char[kDumpReaderBufferSize]) {}
- ~FromFileCacheDumpReader() { delete[] buffer_; }
- IOStatus ReadMetadata(std::string* metadata) override {
- uint32_t metadata_len = 0;
- IOStatus io_s = ReadSizePrefix(&metadata_len);
- if (!io_s.ok()) {
- return io_s;
- }
- return Read(metadata_len, metadata);
- }
- IOStatus ReadPacket(std::string* data) override {
- uint32_t data_len = 0;
- IOStatus io_s = ReadSizePrefix(&data_len);
- if (!io_s.ok()) {
- return io_s;
- }
- return Read(data_len, data);
- }
- private:
- IOStatus ReadSizePrefix(uint32_t* len) {
- std::string prefix;
- IOStatus io_s = Read(kSizePrefixLen, &prefix);
- if (!io_s.ok()) {
- return io_s;
- }
- Slice encoded_slice(prefix);
- if (!GetFixed32(&encoded_slice, len)) {
- return IOStatus::Corruption("Decode size prefix string failed");
- }
- return IOStatus::OK();
- }
- IOStatus Read(size_t len, std::string* data) {
- assert(file_reader_ != nullptr);
- IOStatus io_s;
- unsigned int bytes_to_read = static_cast<unsigned int>(len);
- unsigned int to_read = bytes_to_read > kDumpReaderBufferSize
- ? kDumpReaderBufferSize
- : bytes_to_read;
- while (to_read > 0) {
- io_s = file_reader_->Read(IOOptions(), offset_, to_read, &result_,
- buffer_, nullptr);
- if (!io_s.ok()) {
- return io_s;
- }
- if (result_.size() < to_read) {
- return IOStatus::Corruption("Corrupted cache dump file.");
- }
- data->append(result_.data(), result_.size());
- offset_ += to_read;
- bytes_to_read -= to_read;
- to_read = bytes_to_read > kDumpReaderBufferSize ? kDumpReaderBufferSize
- : bytes_to_read;
- }
- return io_s;
- }
- std::unique_ptr<RandomAccessFileReader> file_reader_;
- Slice result_;
- size_t offset_;
- char* buffer_;
- };
- // The cache dump and load helper class
- class CacheDumperHelper {
- public:
- // serialize the dump_unit_meta to a string, it is fixed 16 bytes size.
- static void EncodeDumpUnitMeta(const DumpUnitMeta& meta, std::string* data) {
- assert(data);
- PutFixed32(data, static_cast<uint32_t>(meta.sequence_num));
- PutFixed32(data, static_cast<uint32_t>(meta.dump_unit_checksum));
- PutFixed64(data, meta.dump_unit_size);
- }
- // Serialize the dump_unit to a string.
- static void EncodeDumpUnit(const DumpUnit& dump_unit, std::string* data) {
- assert(data);
- PutFixed64(data, dump_unit.timestamp);
- data->push_back(dump_unit.type);
- PutLengthPrefixedSlice(data, dump_unit.key);
- PutFixed32(data, static_cast<uint32_t>(dump_unit.value_len));
- PutFixed32(data, dump_unit.value_checksum);
- PutLengthPrefixedSlice(data,
- Slice((char*)dump_unit.value, dump_unit.value_len));
- }
- // Deserialize the dump_unit_meta from a string
- static Status DecodeDumpUnitMeta(const std::string& encoded_data,
- DumpUnitMeta* unit_meta) {
- assert(unit_meta != nullptr);
- Slice encoded_slice = Slice(encoded_data);
- if (!GetFixed32(&encoded_slice, &(unit_meta->sequence_num))) {
- return Status::Incomplete("Decode dumped unit meta sequence_num failed");
- }
- if (!GetFixed32(&encoded_slice, &(unit_meta->dump_unit_checksum))) {
- return Status::Incomplete(
- "Decode dumped unit meta dump_unit_checksum failed");
- }
- if (!GetFixed64(&encoded_slice, &(unit_meta->dump_unit_size))) {
- return Status::Incomplete(
- "Decode dumped unit meta dump_unit_size failed");
- }
- return Status::OK();
- }
- // Deserialize the dump_unit from a string.
- static Status DecodeDumpUnit(const std::string& encoded_data,
- DumpUnit* dump_unit) {
- assert(dump_unit != nullptr);
- Slice encoded_slice = Slice(encoded_data);
- // Decode timestamp
- if (!GetFixed64(&encoded_slice, &dump_unit->timestamp)) {
- return Status::Incomplete("Decode dumped unit string failed");
- }
- // Decode the block type
- dump_unit->type = static_cast<CacheDumpUnitType>(encoded_slice[0]);
- encoded_slice.remove_prefix(1);
- // Decode the key
- if (!GetLengthPrefixedSlice(&encoded_slice, &(dump_unit->key))) {
- return Status::Incomplete("Decode dumped unit string failed");
- }
- // Decode the value size
- uint32_t value_len;
- if (!GetFixed32(&encoded_slice, &value_len)) {
- return Status::Incomplete("Decode dumped unit string failed");
- }
- dump_unit->value_len = static_cast<size_t>(value_len);
- // Decode the value checksum
- if (!GetFixed32(&encoded_slice, &(dump_unit->value_checksum))) {
- return Status::Incomplete("Decode dumped unit string failed");
- }
- // Decode the block content and copy to the memory space whose pointer
- // will be managed by the cache finally.
- Slice block;
- if (!GetLengthPrefixedSlice(&encoded_slice, &block)) {
- return Status::Incomplete("Decode dumped unit string failed");
- }
- dump_unit->value = (void*)block.data();
- assert(block.size() == dump_unit->value_len);
- return Status::OK();
- }
- };
- } // namespace ROCKSDB_NAMESPACE
|