db_ttl_impl.cc 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335
  1. // Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
  2. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  3. // Use of this source code is governed by a BSD-style license that can be
  4. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  5. #ifndef ROCKSDB_LITE
  6. #include "utilities/ttl/db_ttl_impl.h"
  7. #include "db/write_batch_internal.h"
  8. #include "file/filename.h"
  9. #include "rocksdb/convenience.h"
  10. #include "rocksdb/env.h"
  11. #include "rocksdb/iterator.h"
  12. #include "rocksdb/utilities/db_ttl.h"
  13. #include "util/coding.h"
  14. namespace ROCKSDB_NAMESPACE {
  15. void DBWithTTLImpl::SanitizeOptions(int32_t ttl, ColumnFamilyOptions* options,
  16. Env* env) {
  17. if (options->compaction_filter) {
  18. options->compaction_filter =
  19. new TtlCompactionFilter(ttl, env, options->compaction_filter);
  20. } else {
  21. options->compaction_filter_factory =
  22. std::shared_ptr<CompactionFilterFactory>(new TtlCompactionFilterFactory(
  23. ttl, env, options->compaction_filter_factory));
  24. }
  25. if (options->merge_operator) {
  26. options->merge_operator.reset(
  27. new TtlMergeOperator(options->merge_operator, env));
  28. }
  29. }
  30. // Open the db inside DBWithTTLImpl because options needs pointer to its ttl
  31. DBWithTTLImpl::DBWithTTLImpl(DB* db) : DBWithTTL(db), closed_(false) {}
  32. DBWithTTLImpl::~DBWithTTLImpl() {
  33. if (!closed_) {
  34. Close();
  35. }
  36. }
  37. Status DBWithTTLImpl::Close() {
  38. Status ret = Status::OK();
  39. if (!closed_) {
  40. Options default_options = GetOptions();
  41. // Need to stop background compaction before getting rid of the filter
  42. CancelAllBackgroundWork(db_, /* wait = */ true);
  43. ret = db_->Close();
  44. delete default_options.compaction_filter;
  45. closed_ = true;
  46. }
  47. return ret;
  48. }
  49. Status UtilityDB::OpenTtlDB(const Options& options, const std::string& dbname,
  50. StackableDB** dbptr, int32_t ttl, bool read_only) {
  51. DBWithTTL* db;
  52. Status s = DBWithTTL::Open(options, dbname, &db, ttl, read_only);
  53. if (s.ok()) {
  54. *dbptr = db;
  55. } else {
  56. *dbptr = nullptr;
  57. }
  58. return s;
  59. }
  60. Status DBWithTTL::Open(const Options& options, const std::string& dbname,
  61. DBWithTTL** dbptr, int32_t ttl, bool read_only) {
  62. DBOptions db_options(options);
  63. ColumnFamilyOptions cf_options(options);
  64. std::vector<ColumnFamilyDescriptor> column_families;
  65. column_families.push_back(
  66. ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
  67. std::vector<ColumnFamilyHandle*> handles;
  68. Status s = DBWithTTL::Open(db_options, dbname, column_families, &handles,
  69. dbptr, {ttl}, read_only);
  70. if (s.ok()) {
  71. assert(handles.size() == 1);
  72. // i can delete the handle since DBImpl is always holding a reference to
  73. // default column family
  74. delete handles[0];
  75. }
  76. return s;
  77. }
  78. Status DBWithTTL::Open(
  79. const DBOptions& db_options, const std::string& dbname,
  80. const std::vector<ColumnFamilyDescriptor>& column_families,
  81. std::vector<ColumnFamilyHandle*>* handles, DBWithTTL** dbptr,
  82. std::vector<int32_t> ttls, bool read_only) {
  83. if (ttls.size() != column_families.size()) {
  84. return Status::InvalidArgument(
  85. "ttls size has to be the same as number of column families");
  86. }
  87. std::vector<ColumnFamilyDescriptor> column_families_sanitized =
  88. column_families;
  89. for (size_t i = 0; i < column_families_sanitized.size(); ++i) {
  90. DBWithTTLImpl::SanitizeOptions(
  91. ttls[i], &column_families_sanitized[i].options,
  92. db_options.env == nullptr ? Env::Default() : db_options.env);
  93. }
  94. DB* db;
  95. Status st;
  96. if (read_only) {
  97. st = DB::OpenForReadOnly(db_options, dbname, column_families_sanitized,
  98. handles, &db);
  99. } else {
  100. st = DB::Open(db_options, dbname, column_families_sanitized, handles, &db);
  101. }
  102. if (st.ok()) {
  103. *dbptr = new DBWithTTLImpl(db);
  104. } else {
  105. *dbptr = nullptr;
  106. }
  107. return st;
  108. }
  109. Status DBWithTTLImpl::CreateColumnFamilyWithTtl(
  110. const ColumnFamilyOptions& options, const std::string& column_family_name,
  111. ColumnFamilyHandle** handle, int ttl) {
  112. ColumnFamilyOptions sanitized_options = options;
  113. DBWithTTLImpl::SanitizeOptions(ttl, &sanitized_options, GetEnv());
  114. return DBWithTTL::CreateColumnFamily(sanitized_options, column_family_name,
  115. handle);
  116. }
  117. Status DBWithTTLImpl::CreateColumnFamily(const ColumnFamilyOptions& options,
  118. const std::string& column_family_name,
  119. ColumnFamilyHandle** handle) {
  120. return CreateColumnFamilyWithTtl(options, column_family_name, handle, 0);
  121. }
  122. // Appends the current timestamp to the string.
  123. // Returns false if could not get the current_time, true if append succeeds
  124. Status DBWithTTLImpl::AppendTS(const Slice& val, std::string* val_with_ts,
  125. Env* env) {
  126. val_with_ts->reserve(kTSLength + val.size());
  127. char ts_string[kTSLength];
  128. int64_t curtime;
  129. Status st = env->GetCurrentTime(&curtime);
  130. if (!st.ok()) {
  131. return st;
  132. }
  133. EncodeFixed32(ts_string, (int32_t)curtime);
  134. val_with_ts->append(val.data(), val.size());
  135. val_with_ts->append(ts_string, kTSLength);
  136. return st;
  137. }
  138. // Returns corruption if the length of the string is lesser than timestamp, or
  139. // timestamp refers to a time lesser than ttl-feature release time
  140. Status DBWithTTLImpl::SanityCheckTimestamp(const Slice& str) {
  141. if (str.size() < kTSLength) {
  142. return Status::Corruption("Error: value's length less than timestamp's\n");
  143. }
  144. // Checks that TS is not lesser than kMinTimestamp
  145. // Gaurds against corruption & normal database opened incorrectly in ttl mode
  146. int32_t timestamp_value = DecodeFixed32(str.data() + str.size() - kTSLength);
  147. if (timestamp_value < kMinTimestamp) {
  148. return Status::Corruption("Error: Timestamp < ttl feature release time!\n");
  149. }
  150. return Status::OK();
  151. }
  152. // Checks if the string is stale or not according to TTl provided
  153. bool DBWithTTLImpl::IsStale(const Slice& value, int32_t ttl, Env* env) {
  154. if (ttl <= 0) { // Data is fresh if TTL is non-positive
  155. return false;
  156. }
  157. int64_t curtime;
  158. if (!env->GetCurrentTime(&curtime).ok()) {
  159. return false; // Treat the data as fresh if could not get current time
  160. }
  161. int32_t timestamp_value =
  162. DecodeFixed32(value.data() + value.size() - kTSLength);
  163. return (timestamp_value + ttl) < curtime;
  164. }
  165. // Strips the TS from the end of the slice
  166. Status DBWithTTLImpl::StripTS(PinnableSlice* pinnable_val) {
  167. Status st;
  168. if (pinnable_val->size() < kTSLength) {
  169. return Status::Corruption("Bad timestamp in key-value");
  170. }
  171. // Erasing characters which hold the TS
  172. pinnable_val->remove_suffix(kTSLength);
  173. return st;
  174. }
  175. // Strips the TS from the end of the string
  176. Status DBWithTTLImpl::StripTS(std::string* str) {
  177. Status st;
  178. if (str->length() < kTSLength) {
  179. return Status::Corruption("Bad timestamp in key-value");
  180. }
  181. // Erasing characters which hold the TS
  182. str->erase(str->length() - kTSLength, kTSLength);
  183. return st;
  184. }
  185. Status DBWithTTLImpl::Put(const WriteOptions& options,
  186. ColumnFamilyHandle* column_family, const Slice& key,
  187. const Slice& val) {
  188. WriteBatch batch;
  189. batch.Put(column_family, key, val);
  190. return Write(options, &batch);
  191. }
  192. Status DBWithTTLImpl::Get(const ReadOptions& options,
  193. ColumnFamilyHandle* column_family, const Slice& key,
  194. PinnableSlice* value) {
  195. Status st = db_->Get(options, column_family, key, value);
  196. if (!st.ok()) {
  197. return st;
  198. }
  199. st = SanityCheckTimestamp(*value);
  200. if (!st.ok()) {
  201. return st;
  202. }
  203. return StripTS(value);
  204. }
  205. std::vector<Status> DBWithTTLImpl::MultiGet(
  206. const ReadOptions& options,
  207. const std::vector<ColumnFamilyHandle*>& column_family,
  208. const std::vector<Slice>& keys, std::vector<std::string>* values) {
  209. auto statuses = db_->MultiGet(options, column_family, keys, values);
  210. for (size_t i = 0; i < keys.size(); ++i) {
  211. if (!statuses[i].ok()) {
  212. continue;
  213. }
  214. statuses[i] = SanityCheckTimestamp((*values)[i]);
  215. if (!statuses[i].ok()) {
  216. continue;
  217. }
  218. statuses[i] = StripTS(&(*values)[i]);
  219. }
  220. return statuses;
  221. }
  222. bool DBWithTTLImpl::KeyMayExist(const ReadOptions& options,
  223. ColumnFamilyHandle* column_family,
  224. const Slice& key, std::string* value,
  225. bool* value_found) {
  226. bool ret = db_->KeyMayExist(options, column_family, key, value, value_found);
  227. if (ret && value != nullptr && value_found != nullptr && *value_found) {
  228. if (!SanityCheckTimestamp(*value).ok() || !StripTS(value).ok()) {
  229. return false;
  230. }
  231. }
  232. return ret;
  233. }
  234. Status DBWithTTLImpl::Merge(const WriteOptions& options,
  235. ColumnFamilyHandle* column_family, const Slice& key,
  236. const Slice& value) {
  237. WriteBatch batch;
  238. batch.Merge(column_family, key, value);
  239. return Write(options, &batch);
  240. }
  241. Status DBWithTTLImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
  242. class Handler : public WriteBatch::Handler {
  243. public:
  244. explicit Handler(Env* env) : env_(env) {}
  245. WriteBatch updates_ttl;
  246. Status batch_rewrite_status;
  247. Status PutCF(uint32_t column_family_id, const Slice& key,
  248. const Slice& value) override {
  249. std::string value_with_ts;
  250. Status st = AppendTS(value, &value_with_ts, env_);
  251. if (!st.ok()) {
  252. batch_rewrite_status = st;
  253. } else {
  254. WriteBatchInternal::Put(&updates_ttl, column_family_id, key,
  255. value_with_ts);
  256. }
  257. return Status::OK();
  258. }
  259. Status MergeCF(uint32_t column_family_id, const Slice& key,
  260. const Slice& value) override {
  261. std::string value_with_ts;
  262. Status st = AppendTS(value, &value_with_ts, env_);
  263. if (!st.ok()) {
  264. batch_rewrite_status = st;
  265. } else {
  266. WriteBatchInternal::Merge(&updates_ttl, column_family_id, key,
  267. value_with_ts);
  268. }
  269. return Status::OK();
  270. }
  271. Status DeleteCF(uint32_t column_family_id, const Slice& key) override {
  272. WriteBatchInternal::Delete(&updates_ttl, column_family_id, key);
  273. return Status::OK();
  274. }
  275. void LogData(const Slice& blob) override { updates_ttl.PutLogData(blob); }
  276. private:
  277. Env* env_;
  278. };
  279. Handler handler(GetEnv());
  280. updates->Iterate(&handler);
  281. if (!handler.batch_rewrite_status.ok()) {
  282. return handler.batch_rewrite_status;
  283. } else {
  284. return db_->Write(opts, &(handler.updates_ttl));
  285. }
  286. }
  287. Iterator* DBWithTTLImpl::NewIterator(const ReadOptions& opts,
  288. ColumnFamilyHandle* column_family) {
  289. return new TtlIterator(db_->NewIterator(opts, column_family));
  290. }
  291. void DBWithTTLImpl::SetTtl(ColumnFamilyHandle *h, int32_t ttl) {
  292. std::shared_ptr<TtlCompactionFilterFactory> filter;
  293. Options opts;
  294. opts = GetOptions(h);
  295. filter = std::static_pointer_cast<TtlCompactionFilterFactory>(
  296. opts.compaction_filter_factory);
  297. if (!filter)
  298. return;
  299. filter->SetTtl(ttl);
  300. }
  301. } // namespace ROCKSDB_NAMESPACE
  302. #endif // ROCKSDB_LITE