| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656 |
- // Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
- // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
- // Use of this source code is governed by a BSD-style license that can be
- // found in the LICENSE file. See the AUTHORS file for names of contributors.
- #include "utilities/ttl/db_ttl_impl.h"
- #include "db/write_batch_internal.h"
- #include "file/filename.h"
- #include "logging/logging.h"
- #include "rocksdb/convenience.h"
- #include "rocksdb/env.h"
- #include "rocksdb/iterator.h"
- #include "rocksdb/system_clock.h"
- #include "rocksdb/utilities/db_ttl.h"
- #include "rocksdb/utilities/object_registry.h"
- #include "rocksdb/utilities/options_type.h"
- #include "util/coding.h"
- namespace ROCKSDB_NAMESPACE {
- static std::unordered_map<std::string, OptionTypeInfo> ttl_merge_op_type_info =
- {{"user_operator", OptionTypeInfo::AsCustomSharedPtr<MergeOperator>(
- 0, OptionVerificationType::kByNameAllowNull,
- OptionTypeFlags::kNone)}};
- TtlMergeOperator::TtlMergeOperator(
- const std::shared_ptr<MergeOperator>& merge_op, SystemClock* clock)
- : user_merge_op_(merge_op), clock_(clock) {
- RegisterOptions("TtlMergeOptions", &user_merge_op_, &ttl_merge_op_type_info);
- }
- bool TtlMergeOperator::FullMergeV2(const MergeOperationInput& merge_in,
- MergeOperationOutput* merge_out) const {
- const uint32_t ts_len = DBWithTTLImpl::kTSLength;
- if (merge_in.existing_value && merge_in.existing_value->size() < ts_len) {
- ROCKS_LOG_ERROR(merge_in.logger,
- "Error: Could not remove timestamp from existing value.");
- return false;
- }
- // Extract time-stamp from each operand to be passed to user_merge_op_
- std::vector<Slice> operands_without_ts;
- for (const auto& operand : merge_in.operand_list) {
- if (operand.size() < ts_len) {
- ROCKS_LOG_ERROR(merge_in.logger,
- "Error: Could not remove timestamp from operand value.");
- return false;
- }
- operands_without_ts.push_back(operand);
- operands_without_ts.back().remove_suffix(ts_len);
- }
- // Apply the user merge operator (store result in *new_value)
- bool good = true;
- MergeOperationOutput user_merge_out(merge_out->new_value,
- merge_out->existing_operand);
- if (merge_in.existing_value) {
- Slice existing_value_without_ts(merge_in.existing_value->data(),
- merge_in.existing_value->size() - ts_len);
- good = user_merge_op_->FullMergeV2(
- MergeOperationInput(merge_in.key, &existing_value_without_ts,
- operands_without_ts, merge_in.logger),
- &user_merge_out);
- } else {
- good = user_merge_op_->FullMergeV2(
- MergeOperationInput(merge_in.key, nullptr, operands_without_ts,
- merge_in.logger),
- &user_merge_out);
- }
- merge_out->op_failure_scope = user_merge_out.op_failure_scope;
- // Return false if the user merge operator returned false
- if (!good) {
- return false;
- }
- if (merge_out->existing_operand.data()) {
- merge_out->new_value.assign(merge_out->existing_operand.data(),
- merge_out->existing_operand.size());
- merge_out->existing_operand = Slice(nullptr, 0);
- }
- // Augment the *new_value with the ttl time-stamp
- int64_t curtime;
- if (!clock_->GetCurrentTime(&curtime).ok()) {
- ROCKS_LOG_ERROR(
- merge_in.logger,
- "Error: Could not get current time to be attached internally "
- "to the new value.");
- return false;
- } else {
- char ts_string[ts_len];
- EncodeFixed32(ts_string, (int32_t)curtime);
- merge_out->new_value.append(ts_string, ts_len);
- return true;
- }
- }
- bool TtlMergeOperator::PartialMergeMulti(const Slice& key,
- const std::deque<Slice>& operand_list,
- std::string* new_value,
- Logger* logger) const {
- const uint32_t ts_len = DBWithTTLImpl::kTSLength;
- std::deque<Slice> operands_without_ts;
- for (const auto& operand : operand_list) {
- if (operand.size() < ts_len) {
- ROCKS_LOG_ERROR(logger, "Error: Could not remove timestamp from value.");
- return false;
- }
- operands_without_ts.emplace_back(operand.data(), operand.size() - ts_len);
- }
- // Apply the user partial-merge operator (store result in *new_value)
- assert(new_value);
- if (!user_merge_op_->PartialMergeMulti(key, operands_without_ts, new_value,
- logger)) {
- return false;
- }
- // Augment the *new_value with the ttl time-stamp
- int64_t curtime;
- if (!clock_->GetCurrentTime(&curtime).ok()) {
- ROCKS_LOG_ERROR(
- logger,
- "Error: Could not get current time to be attached internally "
- "to the new value.");
- return false;
- } else {
- char ts_string[ts_len];
- EncodeFixed32(ts_string, (int32_t)curtime);
- new_value->append(ts_string, ts_len);
- return true;
- }
- }
- Status TtlMergeOperator::PrepareOptions(const ConfigOptions& config_options) {
- if (clock_ == nullptr) {
- clock_ = config_options.env->GetSystemClock().get();
- }
- return MergeOperator::PrepareOptions(config_options);
- }
- Status TtlMergeOperator::ValidateOptions(
- const DBOptions& db_opts, const ColumnFamilyOptions& cf_opts) const {
- if (user_merge_op_ == nullptr) {
- return Status::InvalidArgument(
- "UserMergeOperator required by TtlMergeOperator");
- } else if (clock_ == nullptr) {
- return Status::InvalidArgument("SystemClock required by TtlMergeOperator");
- } else {
- return MergeOperator::ValidateOptions(db_opts, cf_opts);
- }
- }
- void DBWithTTLImpl::SanitizeOptions(int32_t ttl, ColumnFamilyOptions* options,
- SystemClock* clock) {
- if (options->compaction_filter) {
- options->compaction_filter =
- new TtlCompactionFilter(ttl, clock, options->compaction_filter);
- } else {
- options->compaction_filter_factory =
- std::shared_ptr<CompactionFilterFactory>(new TtlCompactionFilterFactory(
- ttl, clock, options->compaction_filter_factory));
- }
- if (options->merge_operator) {
- options->merge_operator.reset(
- new TtlMergeOperator(options->merge_operator, clock));
- }
- }
- static std::unordered_map<std::string, OptionTypeInfo> ttl_type_info = {
- {"ttl", {0, OptionType::kInt32T}},
- };
- static std::unordered_map<std::string, OptionTypeInfo> ttl_cff_type_info = {
- {"user_filter_factory",
- OptionTypeInfo::AsCustomSharedPtr<CompactionFilterFactory>(
- 0, OptionVerificationType::kByNameAllowFromNull,
- OptionTypeFlags::kNone)}};
- static std::unordered_map<std::string, OptionTypeInfo> user_cf_type_info = {
- {"user_filter",
- OptionTypeInfo::AsCustomRawPtr<const CompactionFilter>(
- 0, OptionVerificationType::kByName, OptionTypeFlags::kAllowNull)}};
- TtlCompactionFilter::TtlCompactionFilter(
- int32_t ttl, SystemClock* clock, const CompactionFilter* _user_comp_filter,
- std::unique_ptr<const CompactionFilter> _user_comp_filter_from_factory)
- : LayeredCompactionFilterBase(_user_comp_filter,
- std::move(_user_comp_filter_from_factory)),
- ttl_(ttl),
- clock_(clock) {
- RegisterOptions("TTL", &ttl_, &ttl_type_info);
- RegisterOptions("UserFilter", &user_comp_filter_, &user_cf_type_info);
- }
- bool TtlCompactionFilter::Filter(int level, const Slice& key,
- const Slice& old_val, std::string* new_val,
- bool* value_changed) const {
- if (DBWithTTLImpl::IsStale(old_val, ttl_, clock_)) {
- return true;
- }
- if (user_comp_filter() == nullptr) {
- return false;
- }
- assert(old_val.size() >= DBWithTTLImpl::kTSLength);
- Slice old_val_without_ts(old_val.data(),
- old_val.size() - DBWithTTLImpl::kTSLength);
- if (user_comp_filter()->Filter(level, key, old_val_without_ts, new_val,
- value_changed)) {
- return true;
- }
- if (*value_changed) {
- new_val->append(old_val.data() + old_val.size() - DBWithTTLImpl::kTSLength,
- DBWithTTLImpl::kTSLength);
- }
- return false;
- }
- Status TtlCompactionFilter::PrepareOptions(
- const ConfigOptions& config_options) {
- if (clock_ == nullptr) {
- clock_ = config_options.env->GetSystemClock().get();
- }
- return LayeredCompactionFilterBase::PrepareOptions(config_options);
- }
- Status TtlCompactionFilter::ValidateOptions(
- const DBOptions& db_opts, const ColumnFamilyOptions& cf_opts) const {
- if (clock_ == nullptr) {
- return Status::InvalidArgument(
- "SystemClock required by TtlCompactionFilter");
- } else {
- return LayeredCompactionFilterBase::ValidateOptions(db_opts, cf_opts);
- }
- }
- TtlCompactionFilterFactory::TtlCompactionFilterFactory(
- int32_t ttl, SystemClock* clock,
- std::shared_ptr<CompactionFilterFactory> comp_filter_factory)
- : ttl_(ttl), clock_(clock), user_comp_filter_factory_(comp_filter_factory) {
- RegisterOptions("UserOptions", &user_comp_filter_factory_,
- &ttl_cff_type_info);
- RegisterOptions("TTL", &ttl_, &ttl_type_info);
- }
- std::unique_ptr<CompactionFilter>
- TtlCompactionFilterFactory::CreateCompactionFilter(
- const CompactionFilter::Context& context) {
- std::unique_ptr<const CompactionFilter> user_comp_filter_from_factory =
- nullptr;
- if (user_comp_filter_factory_) {
- user_comp_filter_from_factory =
- user_comp_filter_factory_->CreateCompactionFilter(context);
- }
- return std::unique_ptr<TtlCompactionFilter>(new TtlCompactionFilter(
- ttl_, clock_, nullptr, std::move(user_comp_filter_from_factory)));
- }
- Status TtlCompactionFilterFactory::PrepareOptions(
- const ConfigOptions& config_options) {
- if (clock_ == nullptr) {
- clock_ = config_options.env->GetSystemClock().get();
- }
- return CompactionFilterFactory::PrepareOptions(config_options);
- }
- Status TtlCompactionFilterFactory::ValidateOptions(
- const DBOptions& db_opts, const ColumnFamilyOptions& cf_opts) const {
- if (clock_ == nullptr) {
- return Status::InvalidArgument(
- "SystemClock required by TtlCompactionFilterFactory");
- } else {
- return CompactionFilterFactory::ValidateOptions(db_opts, cf_opts);
- }
- }
- int RegisterTtlObjects(ObjectLibrary& library, const std::string& /*arg*/) {
- library.AddFactory<MergeOperator>(
- TtlMergeOperator::kClassName(),
- [](const std::string& /*uri*/, std::unique_ptr<MergeOperator>* guard,
- std::string* /* errmsg */) {
- guard->reset(new TtlMergeOperator(nullptr, nullptr));
- return guard->get();
- });
- library.AddFactory<CompactionFilterFactory>(
- TtlCompactionFilterFactory::kClassName(),
- [](const std::string& /*uri*/,
- std::unique_ptr<CompactionFilterFactory>* guard,
- std::string* /* errmsg */) {
- guard->reset(new TtlCompactionFilterFactory(0, nullptr, nullptr));
- return guard->get();
- });
- library.AddFactory<CompactionFilter>(
- TtlCompactionFilter::kClassName(),
- [](const std::string& /*uri*/,
- std::unique_ptr<CompactionFilter>* /*guard*/,
- std::string* /* errmsg */) {
- return new TtlCompactionFilter(0, nullptr, nullptr);
- });
- size_t num_types;
- return static_cast<int>(library.GetFactoryCount(&num_types));
- }
- // Open the db inside DBWithTTLImpl because options needs pointer to its ttl
- DBWithTTLImpl::DBWithTTLImpl(DB* db) : DBWithTTL(db), closed_(false) {}
- DBWithTTLImpl::~DBWithTTLImpl() {
- if (!closed_) {
- Close().PermitUncheckedError();
- }
- }
- Status DBWithTTLImpl::Close() {
- Status ret = Status::OK();
- if (!closed_) {
- Options default_options = GetOptions();
- // Need to stop background compaction before getting rid of the filter
- CancelAllBackgroundWork(db_, /* wait = */ true);
- ret = db_->Close();
- delete default_options.compaction_filter;
- closed_ = true;
- }
- return ret;
- }
- void DBWithTTLImpl::RegisterTtlClasses() {
- static std::once_flag once;
- std::call_once(once, [&]() {
- ObjectRegistry::Default()->AddLibrary("TTL", RegisterTtlObjects, "");
- });
- }
- Status DBWithTTL::Open(const Options& options, const std::string& dbname,
- DBWithTTL** dbptr, int32_t ttl, bool read_only) {
- DBOptions db_options(options);
- ColumnFamilyOptions cf_options(options);
- std::vector<ColumnFamilyDescriptor> column_families;
- column_families.emplace_back(kDefaultColumnFamilyName, cf_options);
- std::vector<ColumnFamilyHandle*> handles;
- Status s = DBWithTTL::Open(db_options, dbname, column_families, &handles,
- dbptr, {ttl}, read_only);
- if (s.ok()) {
- assert(handles.size() == 1);
- // i can delete the handle since DBImpl is always holding a reference to
- // default column family
- delete handles[0];
- }
- return s;
- }
- Status DBWithTTL::Open(
- const DBOptions& db_options, const std::string& dbname,
- const std::vector<ColumnFamilyDescriptor>& column_families,
- std::vector<ColumnFamilyHandle*>* handles, DBWithTTL** dbptr,
- const std::vector<int32_t>& ttls, bool read_only) {
- DBWithTTLImpl::RegisterTtlClasses();
- if (ttls.size() != column_families.size()) {
- return Status::InvalidArgument(
- "ttls size has to be the same as number of column families");
- }
- SystemClock* clock = (db_options.env == nullptr)
- ? SystemClock::Default().get()
- : db_options.env->GetSystemClock().get();
- std::vector<ColumnFamilyDescriptor> column_families_sanitized =
- column_families;
- for (size_t i = 0; i < column_families_sanitized.size(); ++i) {
- DBWithTTLImpl::SanitizeOptions(
- ttls[i], &column_families_sanitized[i].options, clock);
- }
- DB* db;
- Status st;
- if (read_only) {
- st = DB::OpenForReadOnly(db_options, dbname, column_families_sanitized,
- handles, &db);
- } else {
- st = DB::Open(db_options, dbname, column_families_sanitized, handles, &db);
- }
- if (st.ok()) {
- *dbptr = new DBWithTTLImpl(db);
- } else {
- *dbptr = nullptr;
- }
- return st;
- }
- Status DBWithTTLImpl::CreateColumnFamilyWithTtl(
- const ColumnFamilyOptions& options, const std::string& column_family_name,
- ColumnFamilyHandle** handle, int ttl) {
- RegisterTtlClasses();
- ColumnFamilyOptions sanitized_options = options;
- DBWithTTLImpl::SanitizeOptions(ttl, &sanitized_options,
- GetEnv()->GetSystemClock().get());
- return DBWithTTL::CreateColumnFamily(sanitized_options, column_family_name,
- handle);
- }
- Status DBWithTTLImpl::CreateColumnFamily(const ColumnFamilyOptions& options,
- const std::string& column_family_name,
- ColumnFamilyHandle** handle) {
- return CreateColumnFamilyWithTtl(options, column_family_name, handle, 0);
- }
- // Appends the current timestamp to the string.
- // Returns false if could not get the current_time, true if append succeeds
- Status DBWithTTLImpl::AppendTS(const Slice& val, std::string* val_with_ts,
- SystemClock* clock) {
- val_with_ts->reserve(kTSLength + val.size());
- char ts_string[kTSLength];
- int64_t curtime;
- Status st = clock->GetCurrentTime(&curtime);
- if (!st.ok()) {
- return st;
- }
- EncodeFixed32(ts_string, (int32_t)curtime);
- val_with_ts->append(val.data(), val.size());
- val_with_ts->append(ts_string, kTSLength);
- return st;
- }
- // Returns corruption if the length of the string is lesser than timestamp, or
- // timestamp refers to a time lesser than ttl-feature release time
- Status DBWithTTLImpl::SanityCheckTimestamp(const Slice& str) {
- if (str.size() < kTSLength) {
- return Status::Corruption("Error: value's length less than timestamp's\n");
- }
- // Checks that TS is not lesser than kMinTimestamp
- // Gaurds against corruption & normal database opened incorrectly in ttl mode
- int32_t timestamp_value = DecodeFixed32(str.data() + str.size() - kTSLength);
- if (timestamp_value < kMinTimestamp) {
- return Status::Corruption("Error: Timestamp < ttl feature release time!\n");
- }
- return Status::OK();
- }
- // Checks if the string is stale or not according to TTl provided
- bool DBWithTTLImpl::IsStale(const Slice& value, int32_t ttl,
- SystemClock* clock) {
- if (ttl <= 0) { // Data is fresh if TTL is non-positive
- return false;
- }
- int64_t curtime;
- if (!clock->GetCurrentTime(&curtime).ok()) {
- return false; // Treat the data as fresh if could not get current time
- }
- /* int32_t may overflow when timestamp_value + ttl
- * for example ttl = 86400 * 365 * 15
- * convert timestamp_value to int64_t
- */
- int64_t timestamp_value =
- DecodeFixed32(value.data() + value.size() - kTSLength);
- return (timestamp_value + ttl) < curtime;
- }
- // Strips the TS from the end of the slice
- Status DBWithTTLImpl::StripTS(PinnableSlice* pinnable_val) {
- if (pinnable_val->size() < kTSLength) {
- return Status::Corruption("Bad timestamp in key-value");
- }
- // Erasing characters which hold the TS
- pinnable_val->remove_suffix(kTSLength);
- return Status::OK();
- }
- // Strips the TS from the end of the string
- Status DBWithTTLImpl::StripTS(std::string* str) {
- if (str->length() < kTSLength) {
- return Status::Corruption("Bad timestamp in key-value");
- }
- // Erasing characters which hold the TS
- str->erase(str->length() - kTSLength, kTSLength);
- return Status::OK();
- }
- Status DBWithTTLImpl::Put(const WriteOptions& options,
- ColumnFamilyHandle* column_family, const Slice& key,
- const Slice& val) {
- WriteBatch batch;
- Status st = batch.Put(column_family, key, val);
- if (st.ok()) {
- st = Write(options, &batch);
- }
- return st;
- }
- Status DBWithTTLImpl::Get(const ReadOptions& options,
- ColumnFamilyHandle* column_family, const Slice& key,
- PinnableSlice* value, std::string* timestamp) {
- if (timestamp) {
- return Status::NotSupported(
- "Get() that returns timestamp is not supported");
- }
- Status st = db_->Get(options, column_family, key, value);
- if (!st.ok()) {
- return st;
- }
- st = SanityCheckTimestamp(*value);
- if (!st.ok()) {
- return st;
- }
- return StripTS(value);
- }
- void DBWithTTLImpl::MultiGet(const ReadOptions& options, const size_t num_keys,
- ColumnFamilyHandle** column_families,
- const Slice* keys, PinnableSlice* values,
- std::string* timestamps, Status* statuses,
- const bool /*sorted_input*/) {
- if (timestamps) {
- for (size_t i = 0; i < num_keys; ++i) {
- statuses[i] = Status::NotSupported(
- "MultiGet() returning timestamps not implemented.");
- }
- return;
- }
- db_->MultiGet(options, num_keys, column_families, keys, values, timestamps,
- statuses);
- for (size_t i = 0; i < num_keys; ++i) {
- if (!statuses[i].ok()) {
- continue;
- }
- PinnableSlice tmp_val = std::move(values[i]);
- values[i].PinSelf(tmp_val);
- assert(!values[i].IsPinned());
- statuses[i] = SanityCheckTimestamp(values[i]);
- if (!statuses[i].ok()) {
- continue;
- }
- statuses[i] = StripTS(&values[i]);
- }
- }
- bool DBWithTTLImpl::KeyMayExist(const ReadOptions& options,
- ColumnFamilyHandle* column_family,
- const Slice& key, std::string* value,
- bool* value_found) {
- bool ret = db_->KeyMayExist(options, column_family, key, value, value_found);
- if (ret && value != nullptr && value_found != nullptr && *value_found) {
- if (!SanityCheckTimestamp(*value).ok() || !StripTS(value).ok()) {
- return false;
- }
- }
- return ret;
- }
- Status DBWithTTLImpl::Merge(const WriteOptions& options,
- ColumnFamilyHandle* column_family, const Slice& key,
- const Slice& value) {
- WriteBatch batch;
- Status st = batch.Merge(column_family, key, value);
- if (st.ok()) {
- st = Write(options, &batch);
- }
- return st;
- }
- Status DBWithTTLImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
- class Handler : public WriteBatch::Handler {
- public:
- explicit Handler(SystemClock* clock) : clock_(clock) {}
- WriteBatch updates_ttl;
- Status PutCF(uint32_t column_family_id, const Slice& key,
- const Slice& value) override {
- std::string value_with_ts;
- Status st = AppendTS(value, &value_with_ts, clock_);
- if (!st.ok()) {
- return st;
- }
- return WriteBatchInternal::Put(&updates_ttl, column_family_id, key,
- value_with_ts);
- }
- Status MergeCF(uint32_t column_family_id, const Slice& key,
- const Slice& value) override {
- std::string value_with_ts;
- Status st = AppendTS(value, &value_with_ts, clock_);
- if (!st.ok()) {
- return st;
- }
- return WriteBatchInternal::Merge(&updates_ttl, column_family_id, key,
- value_with_ts);
- }
- Status DeleteCF(uint32_t column_family_id, const Slice& key) override {
- return WriteBatchInternal::Delete(&updates_ttl, column_family_id, key);
- }
- Status DeleteRangeCF(uint32_t column_family_id, const Slice& begin_key,
- const Slice& end_key) override {
- return WriteBatchInternal::DeleteRange(&updates_ttl, column_family_id,
- begin_key, end_key);
- }
- void LogData(const Slice& blob) override { updates_ttl.PutLogData(blob); }
- private:
- SystemClock* clock_;
- };
- Handler handler(GetEnv()->GetSystemClock().get());
- Status st = updates->Iterate(&handler);
- if (!st.ok()) {
- return st;
- } else {
- return db_->Write(opts, &(handler.updates_ttl));
- }
- }
- Iterator* DBWithTTLImpl::NewIterator(const ReadOptions& _read_options,
- ColumnFamilyHandle* column_family) {
- if (_read_options.io_activity != Env::IOActivity::kUnknown &&
- _read_options.io_activity != Env::IOActivity::kDBIterator) {
- return NewErrorIterator(Status::InvalidArgument(
- "Can only call NewIterator with `ReadOptions::io_activity` is "
- "`Env::IOActivity::kUnknown` or `Env::IOActivity::kDBIterator`"));
- }
- ReadOptions read_options(_read_options);
- if (read_options.io_activity == Env::IOActivity::kUnknown) {
- read_options.io_activity = Env::IOActivity::kDBIterator;
- }
- return new TtlIterator(db_->NewIterator(read_options, column_family));
- }
- void DBWithTTLImpl::SetTtl(ColumnFamilyHandle* h, int32_t ttl) {
- std::shared_ptr<TtlCompactionFilterFactory> filter;
- Options opts;
- opts = GetOptions(h);
- filter = std::static_pointer_cast<TtlCompactionFilterFactory>(
- opts.compaction_filter_factory);
- if (!filter) {
- return;
- }
- filter->SetTtl(ttl);
- }
- Status DBWithTTLImpl::GetTtl(ColumnFamilyHandle* h, int32_t* ttl) {
- if (h == nullptr || ttl == nullptr) {
- return Status::InvalidArgument(
- "column family handle or ttl cannot be null");
- }
- std::shared_ptr<TtlCompactionFilterFactory> filter;
- Options opts;
- opts = GetOptions(h);
- filter = std::static_pointer_cast<TtlCompactionFilterFactory>(
- opts.compaction_filter_factory);
- if (!filter) {
- return Status::InvalidArgument(
- "TTLCompactionFilterFactory is not set for TTLDB");
- }
- *ttl = filter->GetTtl();
- return Status::OK();
- }
- } // namespace ROCKSDB_NAMESPACE
|