db_ttl_impl.cc 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656
  1. // Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
  2. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  3. // Use of this source code is governed by a BSD-style license that can be
  4. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  5. #include "utilities/ttl/db_ttl_impl.h"
  6. #include "db/write_batch_internal.h"
  7. #include "file/filename.h"
  8. #include "logging/logging.h"
  9. #include "rocksdb/convenience.h"
  10. #include "rocksdb/env.h"
  11. #include "rocksdb/iterator.h"
  12. #include "rocksdb/system_clock.h"
  13. #include "rocksdb/utilities/db_ttl.h"
  14. #include "rocksdb/utilities/object_registry.h"
  15. #include "rocksdb/utilities/options_type.h"
  16. #include "util/coding.h"
  17. namespace ROCKSDB_NAMESPACE {
  18. static std::unordered_map<std::string, OptionTypeInfo> ttl_merge_op_type_info =
  19. {{"user_operator", OptionTypeInfo::AsCustomSharedPtr<MergeOperator>(
  20. 0, OptionVerificationType::kByNameAllowNull,
  21. OptionTypeFlags::kNone)}};
  22. TtlMergeOperator::TtlMergeOperator(
  23. const std::shared_ptr<MergeOperator>& merge_op, SystemClock* clock)
  24. : user_merge_op_(merge_op), clock_(clock) {
  25. RegisterOptions("TtlMergeOptions", &user_merge_op_, &ttl_merge_op_type_info);
  26. }
  27. bool TtlMergeOperator::FullMergeV2(const MergeOperationInput& merge_in,
  28. MergeOperationOutput* merge_out) const {
  29. const uint32_t ts_len = DBWithTTLImpl::kTSLength;
  30. if (merge_in.existing_value && merge_in.existing_value->size() < ts_len) {
  31. ROCKS_LOG_ERROR(merge_in.logger,
  32. "Error: Could not remove timestamp from existing value.");
  33. return false;
  34. }
  35. // Extract time-stamp from each operand to be passed to user_merge_op_
  36. std::vector<Slice> operands_without_ts;
  37. for (const auto& operand : merge_in.operand_list) {
  38. if (operand.size() < ts_len) {
  39. ROCKS_LOG_ERROR(merge_in.logger,
  40. "Error: Could not remove timestamp from operand value.");
  41. return false;
  42. }
  43. operands_without_ts.push_back(operand);
  44. operands_without_ts.back().remove_suffix(ts_len);
  45. }
  46. // Apply the user merge operator (store result in *new_value)
  47. bool good = true;
  48. MergeOperationOutput user_merge_out(merge_out->new_value,
  49. merge_out->existing_operand);
  50. if (merge_in.existing_value) {
  51. Slice existing_value_without_ts(merge_in.existing_value->data(),
  52. merge_in.existing_value->size() - ts_len);
  53. good = user_merge_op_->FullMergeV2(
  54. MergeOperationInput(merge_in.key, &existing_value_without_ts,
  55. operands_without_ts, merge_in.logger),
  56. &user_merge_out);
  57. } else {
  58. good = user_merge_op_->FullMergeV2(
  59. MergeOperationInput(merge_in.key, nullptr, operands_without_ts,
  60. merge_in.logger),
  61. &user_merge_out);
  62. }
  63. merge_out->op_failure_scope = user_merge_out.op_failure_scope;
  64. // Return false if the user merge operator returned false
  65. if (!good) {
  66. return false;
  67. }
  68. if (merge_out->existing_operand.data()) {
  69. merge_out->new_value.assign(merge_out->existing_operand.data(),
  70. merge_out->existing_operand.size());
  71. merge_out->existing_operand = Slice(nullptr, 0);
  72. }
  73. // Augment the *new_value with the ttl time-stamp
  74. int64_t curtime;
  75. if (!clock_->GetCurrentTime(&curtime).ok()) {
  76. ROCKS_LOG_ERROR(
  77. merge_in.logger,
  78. "Error: Could not get current time to be attached internally "
  79. "to the new value.");
  80. return false;
  81. } else {
  82. char ts_string[ts_len];
  83. EncodeFixed32(ts_string, (int32_t)curtime);
  84. merge_out->new_value.append(ts_string, ts_len);
  85. return true;
  86. }
  87. }
  88. bool TtlMergeOperator::PartialMergeMulti(const Slice& key,
  89. const std::deque<Slice>& operand_list,
  90. std::string* new_value,
  91. Logger* logger) const {
  92. const uint32_t ts_len = DBWithTTLImpl::kTSLength;
  93. std::deque<Slice> operands_without_ts;
  94. for (const auto& operand : operand_list) {
  95. if (operand.size() < ts_len) {
  96. ROCKS_LOG_ERROR(logger, "Error: Could not remove timestamp from value.");
  97. return false;
  98. }
  99. operands_without_ts.emplace_back(operand.data(), operand.size() - ts_len);
  100. }
  101. // Apply the user partial-merge operator (store result in *new_value)
  102. assert(new_value);
  103. if (!user_merge_op_->PartialMergeMulti(key, operands_without_ts, new_value,
  104. logger)) {
  105. return false;
  106. }
  107. // Augment the *new_value with the ttl time-stamp
  108. int64_t curtime;
  109. if (!clock_->GetCurrentTime(&curtime).ok()) {
  110. ROCKS_LOG_ERROR(
  111. logger,
  112. "Error: Could not get current time to be attached internally "
  113. "to the new value.");
  114. return false;
  115. } else {
  116. char ts_string[ts_len];
  117. EncodeFixed32(ts_string, (int32_t)curtime);
  118. new_value->append(ts_string, ts_len);
  119. return true;
  120. }
  121. }
  122. Status TtlMergeOperator::PrepareOptions(const ConfigOptions& config_options) {
  123. if (clock_ == nullptr) {
  124. clock_ = config_options.env->GetSystemClock().get();
  125. }
  126. return MergeOperator::PrepareOptions(config_options);
  127. }
  128. Status TtlMergeOperator::ValidateOptions(
  129. const DBOptions& db_opts, const ColumnFamilyOptions& cf_opts) const {
  130. if (user_merge_op_ == nullptr) {
  131. return Status::InvalidArgument(
  132. "UserMergeOperator required by TtlMergeOperator");
  133. } else if (clock_ == nullptr) {
  134. return Status::InvalidArgument("SystemClock required by TtlMergeOperator");
  135. } else {
  136. return MergeOperator::ValidateOptions(db_opts, cf_opts);
  137. }
  138. }
  139. void DBWithTTLImpl::SanitizeOptions(int32_t ttl, ColumnFamilyOptions* options,
  140. SystemClock* clock) {
  141. if (options->compaction_filter) {
  142. options->compaction_filter =
  143. new TtlCompactionFilter(ttl, clock, options->compaction_filter);
  144. } else {
  145. options->compaction_filter_factory =
  146. std::shared_ptr<CompactionFilterFactory>(new TtlCompactionFilterFactory(
  147. ttl, clock, options->compaction_filter_factory));
  148. }
  149. if (options->merge_operator) {
  150. options->merge_operator.reset(
  151. new TtlMergeOperator(options->merge_operator, clock));
  152. }
  153. }
  154. static std::unordered_map<std::string, OptionTypeInfo> ttl_type_info = {
  155. {"ttl", {0, OptionType::kInt32T}},
  156. };
  157. static std::unordered_map<std::string, OptionTypeInfo> ttl_cff_type_info = {
  158. {"user_filter_factory",
  159. OptionTypeInfo::AsCustomSharedPtr<CompactionFilterFactory>(
  160. 0, OptionVerificationType::kByNameAllowFromNull,
  161. OptionTypeFlags::kNone)}};
  162. static std::unordered_map<std::string, OptionTypeInfo> user_cf_type_info = {
  163. {"user_filter",
  164. OptionTypeInfo::AsCustomRawPtr<const CompactionFilter>(
  165. 0, OptionVerificationType::kByName, OptionTypeFlags::kAllowNull)}};
  166. TtlCompactionFilter::TtlCompactionFilter(
  167. int32_t ttl, SystemClock* clock, const CompactionFilter* _user_comp_filter,
  168. std::unique_ptr<const CompactionFilter> _user_comp_filter_from_factory)
  169. : LayeredCompactionFilterBase(_user_comp_filter,
  170. std::move(_user_comp_filter_from_factory)),
  171. ttl_(ttl),
  172. clock_(clock) {
  173. RegisterOptions("TTL", &ttl_, &ttl_type_info);
  174. RegisterOptions("UserFilter", &user_comp_filter_, &user_cf_type_info);
  175. }
  176. bool TtlCompactionFilter::Filter(int level, const Slice& key,
  177. const Slice& old_val, std::string* new_val,
  178. bool* value_changed) const {
  179. if (DBWithTTLImpl::IsStale(old_val, ttl_, clock_)) {
  180. return true;
  181. }
  182. if (user_comp_filter() == nullptr) {
  183. return false;
  184. }
  185. assert(old_val.size() >= DBWithTTLImpl::kTSLength);
  186. Slice old_val_without_ts(old_val.data(),
  187. old_val.size() - DBWithTTLImpl::kTSLength);
  188. if (user_comp_filter()->Filter(level, key, old_val_without_ts, new_val,
  189. value_changed)) {
  190. return true;
  191. }
  192. if (*value_changed) {
  193. new_val->append(old_val.data() + old_val.size() - DBWithTTLImpl::kTSLength,
  194. DBWithTTLImpl::kTSLength);
  195. }
  196. return false;
  197. }
  198. Status TtlCompactionFilter::PrepareOptions(
  199. const ConfigOptions& config_options) {
  200. if (clock_ == nullptr) {
  201. clock_ = config_options.env->GetSystemClock().get();
  202. }
  203. return LayeredCompactionFilterBase::PrepareOptions(config_options);
  204. }
  205. Status TtlCompactionFilter::ValidateOptions(
  206. const DBOptions& db_opts, const ColumnFamilyOptions& cf_opts) const {
  207. if (clock_ == nullptr) {
  208. return Status::InvalidArgument(
  209. "SystemClock required by TtlCompactionFilter");
  210. } else {
  211. return LayeredCompactionFilterBase::ValidateOptions(db_opts, cf_opts);
  212. }
  213. }
  214. TtlCompactionFilterFactory::TtlCompactionFilterFactory(
  215. int32_t ttl, SystemClock* clock,
  216. std::shared_ptr<CompactionFilterFactory> comp_filter_factory)
  217. : ttl_(ttl), clock_(clock), user_comp_filter_factory_(comp_filter_factory) {
  218. RegisterOptions("UserOptions", &user_comp_filter_factory_,
  219. &ttl_cff_type_info);
  220. RegisterOptions("TTL", &ttl_, &ttl_type_info);
  221. }
  222. std::unique_ptr<CompactionFilter>
  223. TtlCompactionFilterFactory::CreateCompactionFilter(
  224. const CompactionFilter::Context& context) {
  225. std::unique_ptr<const CompactionFilter> user_comp_filter_from_factory =
  226. nullptr;
  227. if (user_comp_filter_factory_) {
  228. user_comp_filter_from_factory =
  229. user_comp_filter_factory_->CreateCompactionFilter(context);
  230. }
  231. return std::unique_ptr<TtlCompactionFilter>(new TtlCompactionFilter(
  232. ttl_, clock_, nullptr, std::move(user_comp_filter_from_factory)));
  233. }
  234. Status TtlCompactionFilterFactory::PrepareOptions(
  235. const ConfigOptions& config_options) {
  236. if (clock_ == nullptr) {
  237. clock_ = config_options.env->GetSystemClock().get();
  238. }
  239. return CompactionFilterFactory::PrepareOptions(config_options);
  240. }
  241. Status TtlCompactionFilterFactory::ValidateOptions(
  242. const DBOptions& db_opts, const ColumnFamilyOptions& cf_opts) const {
  243. if (clock_ == nullptr) {
  244. return Status::InvalidArgument(
  245. "SystemClock required by TtlCompactionFilterFactory");
  246. } else {
  247. return CompactionFilterFactory::ValidateOptions(db_opts, cf_opts);
  248. }
  249. }
  250. int RegisterTtlObjects(ObjectLibrary& library, const std::string& /*arg*/) {
  251. library.AddFactory<MergeOperator>(
  252. TtlMergeOperator::kClassName(),
  253. [](const std::string& /*uri*/, std::unique_ptr<MergeOperator>* guard,
  254. std::string* /* errmsg */) {
  255. guard->reset(new TtlMergeOperator(nullptr, nullptr));
  256. return guard->get();
  257. });
  258. library.AddFactory<CompactionFilterFactory>(
  259. TtlCompactionFilterFactory::kClassName(),
  260. [](const std::string& /*uri*/,
  261. std::unique_ptr<CompactionFilterFactory>* guard,
  262. std::string* /* errmsg */) {
  263. guard->reset(new TtlCompactionFilterFactory(0, nullptr, nullptr));
  264. return guard->get();
  265. });
  266. library.AddFactory<CompactionFilter>(
  267. TtlCompactionFilter::kClassName(),
  268. [](const std::string& /*uri*/,
  269. std::unique_ptr<CompactionFilter>* /*guard*/,
  270. std::string* /* errmsg */) {
  271. return new TtlCompactionFilter(0, nullptr, nullptr);
  272. });
  273. size_t num_types;
  274. return static_cast<int>(library.GetFactoryCount(&num_types));
  275. }
  276. // Open the db inside DBWithTTLImpl because options needs pointer to its ttl
  277. DBWithTTLImpl::DBWithTTLImpl(DB* db) : DBWithTTL(db), closed_(false) {}
  278. DBWithTTLImpl::~DBWithTTLImpl() {
  279. if (!closed_) {
  280. Close().PermitUncheckedError();
  281. }
  282. }
  283. Status DBWithTTLImpl::Close() {
  284. Status ret = Status::OK();
  285. if (!closed_) {
  286. Options default_options = GetOptions();
  287. // Need to stop background compaction before getting rid of the filter
  288. CancelAllBackgroundWork(db_, /* wait = */ true);
  289. ret = db_->Close();
  290. delete default_options.compaction_filter;
  291. closed_ = true;
  292. }
  293. return ret;
  294. }
  295. void DBWithTTLImpl::RegisterTtlClasses() {
  296. static std::once_flag once;
  297. std::call_once(once, [&]() {
  298. ObjectRegistry::Default()->AddLibrary("TTL", RegisterTtlObjects, "");
  299. });
  300. }
  301. Status DBWithTTL::Open(const Options& options, const std::string& dbname,
  302. DBWithTTL** dbptr, int32_t ttl, bool read_only) {
  303. DBOptions db_options(options);
  304. ColumnFamilyOptions cf_options(options);
  305. std::vector<ColumnFamilyDescriptor> column_families;
  306. column_families.emplace_back(kDefaultColumnFamilyName, cf_options);
  307. std::vector<ColumnFamilyHandle*> handles;
  308. Status s = DBWithTTL::Open(db_options, dbname, column_families, &handles,
  309. dbptr, {ttl}, read_only);
  310. if (s.ok()) {
  311. assert(handles.size() == 1);
  312. // i can delete the handle since DBImpl is always holding a reference to
  313. // default column family
  314. delete handles[0];
  315. }
  316. return s;
  317. }
  318. Status DBWithTTL::Open(
  319. const DBOptions& db_options, const std::string& dbname,
  320. const std::vector<ColumnFamilyDescriptor>& column_families,
  321. std::vector<ColumnFamilyHandle*>* handles, DBWithTTL** dbptr,
  322. const std::vector<int32_t>& ttls, bool read_only) {
  323. DBWithTTLImpl::RegisterTtlClasses();
  324. if (ttls.size() != column_families.size()) {
  325. return Status::InvalidArgument(
  326. "ttls size has to be the same as number of column families");
  327. }
  328. SystemClock* clock = (db_options.env == nullptr)
  329. ? SystemClock::Default().get()
  330. : db_options.env->GetSystemClock().get();
  331. std::vector<ColumnFamilyDescriptor> column_families_sanitized =
  332. column_families;
  333. for (size_t i = 0; i < column_families_sanitized.size(); ++i) {
  334. DBWithTTLImpl::SanitizeOptions(
  335. ttls[i], &column_families_sanitized[i].options, clock);
  336. }
  337. DB* db;
  338. Status st;
  339. if (read_only) {
  340. st = DB::OpenForReadOnly(db_options, dbname, column_families_sanitized,
  341. handles, &db);
  342. } else {
  343. st = DB::Open(db_options, dbname, column_families_sanitized, handles, &db);
  344. }
  345. if (st.ok()) {
  346. *dbptr = new DBWithTTLImpl(db);
  347. } else {
  348. *dbptr = nullptr;
  349. }
  350. return st;
  351. }
  352. Status DBWithTTLImpl::CreateColumnFamilyWithTtl(
  353. const ColumnFamilyOptions& options, const std::string& column_family_name,
  354. ColumnFamilyHandle** handle, int ttl) {
  355. RegisterTtlClasses();
  356. ColumnFamilyOptions sanitized_options = options;
  357. DBWithTTLImpl::SanitizeOptions(ttl, &sanitized_options,
  358. GetEnv()->GetSystemClock().get());
  359. return DBWithTTL::CreateColumnFamily(sanitized_options, column_family_name,
  360. handle);
  361. }
  362. Status DBWithTTLImpl::CreateColumnFamily(const ColumnFamilyOptions& options,
  363. const std::string& column_family_name,
  364. ColumnFamilyHandle** handle) {
  365. return CreateColumnFamilyWithTtl(options, column_family_name, handle, 0);
  366. }
  367. // Appends the current timestamp to the string.
  368. // Returns false if could not get the current_time, true if append succeeds
  369. Status DBWithTTLImpl::AppendTS(const Slice& val, std::string* val_with_ts,
  370. SystemClock* clock) {
  371. val_with_ts->reserve(kTSLength + val.size());
  372. char ts_string[kTSLength];
  373. int64_t curtime;
  374. Status st = clock->GetCurrentTime(&curtime);
  375. if (!st.ok()) {
  376. return st;
  377. }
  378. EncodeFixed32(ts_string, (int32_t)curtime);
  379. val_with_ts->append(val.data(), val.size());
  380. val_with_ts->append(ts_string, kTSLength);
  381. return st;
  382. }
  383. // Returns corruption if the length of the string is lesser than timestamp, or
  384. // timestamp refers to a time lesser than ttl-feature release time
  385. Status DBWithTTLImpl::SanityCheckTimestamp(const Slice& str) {
  386. if (str.size() < kTSLength) {
  387. return Status::Corruption("Error: value's length less than timestamp's\n");
  388. }
  389. // Checks that TS is not lesser than kMinTimestamp
  390. // Gaurds against corruption & normal database opened incorrectly in ttl mode
  391. int32_t timestamp_value = DecodeFixed32(str.data() + str.size() - kTSLength);
  392. if (timestamp_value < kMinTimestamp) {
  393. return Status::Corruption("Error: Timestamp < ttl feature release time!\n");
  394. }
  395. return Status::OK();
  396. }
  397. // Checks if the string is stale or not according to TTl provided
  398. bool DBWithTTLImpl::IsStale(const Slice& value, int32_t ttl,
  399. SystemClock* clock) {
  400. if (ttl <= 0) { // Data is fresh if TTL is non-positive
  401. return false;
  402. }
  403. int64_t curtime;
  404. if (!clock->GetCurrentTime(&curtime).ok()) {
  405. return false; // Treat the data as fresh if could not get current time
  406. }
  407. /* int32_t may overflow when timestamp_value + ttl
  408. * for example ttl = 86400 * 365 * 15
  409. * convert timestamp_value to int64_t
  410. */
  411. int64_t timestamp_value =
  412. DecodeFixed32(value.data() + value.size() - kTSLength);
  413. return (timestamp_value + ttl) < curtime;
  414. }
  415. // Strips the TS from the end of the slice
  416. Status DBWithTTLImpl::StripTS(PinnableSlice* pinnable_val) {
  417. if (pinnable_val->size() < kTSLength) {
  418. return Status::Corruption("Bad timestamp in key-value");
  419. }
  420. // Erasing characters which hold the TS
  421. pinnable_val->remove_suffix(kTSLength);
  422. return Status::OK();
  423. }
  424. // Strips the TS from the end of the string
  425. Status DBWithTTLImpl::StripTS(std::string* str) {
  426. if (str->length() < kTSLength) {
  427. return Status::Corruption("Bad timestamp in key-value");
  428. }
  429. // Erasing characters which hold the TS
  430. str->erase(str->length() - kTSLength, kTSLength);
  431. return Status::OK();
  432. }
  433. Status DBWithTTLImpl::Put(const WriteOptions& options,
  434. ColumnFamilyHandle* column_family, const Slice& key,
  435. const Slice& val) {
  436. WriteBatch batch;
  437. Status st = batch.Put(column_family, key, val);
  438. if (st.ok()) {
  439. st = Write(options, &batch);
  440. }
  441. return st;
  442. }
  443. Status DBWithTTLImpl::Get(const ReadOptions& options,
  444. ColumnFamilyHandle* column_family, const Slice& key,
  445. PinnableSlice* value, std::string* timestamp) {
  446. if (timestamp) {
  447. return Status::NotSupported(
  448. "Get() that returns timestamp is not supported");
  449. }
  450. Status st = db_->Get(options, column_family, key, value);
  451. if (!st.ok()) {
  452. return st;
  453. }
  454. st = SanityCheckTimestamp(*value);
  455. if (!st.ok()) {
  456. return st;
  457. }
  458. return StripTS(value);
  459. }
  460. void DBWithTTLImpl::MultiGet(const ReadOptions& options, const size_t num_keys,
  461. ColumnFamilyHandle** column_families,
  462. const Slice* keys, PinnableSlice* values,
  463. std::string* timestamps, Status* statuses,
  464. const bool /*sorted_input*/) {
  465. if (timestamps) {
  466. for (size_t i = 0; i < num_keys; ++i) {
  467. statuses[i] = Status::NotSupported(
  468. "MultiGet() returning timestamps not implemented.");
  469. }
  470. return;
  471. }
  472. db_->MultiGet(options, num_keys, column_families, keys, values, timestamps,
  473. statuses);
  474. for (size_t i = 0; i < num_keys; ++i) {
  475. if (!statuses[i].ok()) {
  476. continue;
  477. }
  478. PinnableSlice tmp_val = std::move(values[i]);
  479. values[i].PinSelf(tmp_val);
  480. assert(!values[i].IsPinned());
  481. statuses[i] = SanityCheckTimestamp(values[i]);
  482. if (!statuses[i].ok()) {
  483. continue;
  484. }
  485. statuses[i] = StripTS(&values[i]);
  486. }
  487. }
  488. bool DBWithTTLImpl::KeyMayExist(const ReadOptions& options,
  489. ColumnFamilyHandle* column_family,
  490. const Slice& key, std::string* value,
  491. bool* value_found) {
  492. bool ret = db_->KeyMayExist(options, column_family, key, value, value_found);
  493. if (ret && value != nullptr && value_found != nullptr && *value_found) {
  494. if (!SanityCheckTimestamp(*value).ok() || !StripTS(value).ok()) {
  495. return false;
  496. }
  497. }
  498. return ret;
  499. }
  500. Status DBWithTTLImpl::Merge(const WriteOptions& options,
  501. ColumnFamilyHandle* column_family, const Slice& key,
  502. const Slice& value) {
  503. WriteBatch batch;
  504. Status st = batch.Merge(column_family, key, value);
  505. if (st.ok()) {
  506. st = Write(options, &batch);
  507. }
  508. return st;
  509. }
  510. Status DBWithTTLImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
  511. class Handler : public WriteBatch::Handler {
  512. public:
  513. explicit Handler(SystemClock* clock) : clock_(clock) {}
  514. WriteBatch updates_ttl;
  515. Status PutCF(uint32_t column_family_id, const Slice& key,
  516. const Slice& value) override {
  517. std::string value_with_ts;
  518. Status st = AppendTS(value, &value_with_ts, clock_);
  519. if (!st.ok()) {
  520. return st;
  521. }
  522. return WriteBatchInternal::Put(&updates_ttl, column_family_id, key,
  523. value_with_ts);
  524. }
  525. Status MergeCF(uint32_t column_family_id, const Slice& key,
  526. const Slice& value) override {
  527. std::string value_with_ts;
  528. Status st = AppendTS(value, &value_with_ts, clock_);
  529. if (!st.ok()) {
  530. return st;
  531. }
  532. return WriteBatchInternal::Merge(&updates_ttl, column_family_id, key,
  533. value_with_ts);
  534. }
  535. Status DeleteCF(uint32_t column_family_id, const Slice& key) override {
  536. return WriteBatchInternal::Delete(&updates_ttl, column_family_id, key);
  537. }
  538. Status DeleteRangeCF(uint32_t column_family_id, const Slice& begin_key,
  539. const Slice& end_key) override {
  540. return WriteBatchInternal::DeleteRange(&updates_ttl, column_family_id,
  541. begin_key, end_key);
  542. }
  543. void LogData(const Slice& blob) override { updates_ttl.PutLogData(blob); }
  544. private:
  545. SystemClock* clock_;
  546. };
  547. Handler handler(GetEnv()->GetSystemClock().get());
  548. Status st = updates->Iterate(&handler);
  549. if (!st.ok()) {
  550. return st;
  551. } else {
  552. return db_->Write(opts, &(handler.updates_ttl));
  553. }
  554. }
  555. Iterator* DBWithTTLImpl::NewIterator(const ReadOptions& _read_options,
  556. ColumnFamilyHandle* column_family) {
  557. if (_read_options.io_activity != Env::IOActivity::kUnknown &&
  558. _read_options.io_activity != Env::IOActivity::kDBIterator) {
  559. return NewErrorIterator(Status::InvalidArgument(
  560. "Can only call NewIterator with `ReadOptions::io_activity` is "
  561. "`Env::IOActivity::kUnknown` or `Env::IOActivity::kDBIterator`"));
  562. }
  563. ReadOptions read_options(_read_options);
  564. if (read_options.io_activity == Env::IOActivity::kUnknown) {
  565. read_options.io_activity = Env::IOActivity::kDBIterator;
  566. }
  567. return new TtlIterator(db_->NewIterator(read_options, column_family));
  568. }
  569. void DBWithTTLImpl::SetTtl(ColumnFamilyHandle* h, int32_t ttl) {
  570. std::shared_ptr<TtlCompactionFilterFactory> filter;
  571. Options opts;
  572. opts = GetOptions(h);
  573. filter = std::static_pointer_cast<TtlCompactionFilterFactory>(
  574. opts.compaction_filter_factory);
  575. if (!filter) {
  576. return;
  577. }
  578. filter->SetTtl(ttl);
  579. }
  580. Status DBWithTTLImpl::GetTtl(ColumnFamilyHandle* h, int32_t* ttl) {
  581. if (h == nullptr || ttl == nullptr) {
  582. return Status::InvalidArgument(
  583. "column family handle or ttl cannot be null");
  584. }
  585. std::shared_ptr<TtlCompactionFilterFactory> filter;
  586. Options opts;
  587. opts = GetOptions(h);
  588. filter = std::static_pointer_cast<TtlCompactionFilterFactory>(
  589. opts.compaction_filter_factory);
  590. if (!filter) {
  591. return Status::InvalidArgument(
  592. "TTLCompactionFilterFactory is not set for TTLDB");
  593. }
  594. *ttl = filter->GetTtl();
  595. return Status::OK();
  596. }
  597. } // namespace ROCKSDB_NAMESPACE