compacted_db_impl.cc 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include "db/db_impl/compacted_db_impl.h"
  6. #include "db/db_impl/db_impl.h"
  7. #include "db/version_set.h"
  8. #include "logging/logging.h"
  9. #include "table/get_context.h"
  10. #include "util/cast_util.h"
  11. namespace ROCKSDB_NAMESPACE {
  12. CompactedDBImpl::CompactedDBImpl(const DBOptions& options,
  13. const std::string& dbname)
  14. : DBImpl(options, dbname, /*seq_per_batch*/ false, +/*batch_per_txn*/ true,
  15. /*read_only*/ true),
  16. cfd_(nullptr),
  17. version_(nullptr),
  18. user_comparator_(nullptr) {}
  19. CompactedDBImpl::~CompactedDBImpl() = default;
  20. size_t CompactedDBImpl::FindFile(const Slice& key) {
  21. size_t right = files_.num_files - 1;
  22. auto cmp = [&](const FdWithKeyRange& f, const Slice& k) -> bool {
  23. return user_comparator_->Compare(ExtractUserKey(f.largest_key), k) < 0;
  24. };
  25. return static_cast<size_t>(
  26. std::lower_bound(files_.files, files_.files + right, key, cmp) -
  27. files_.files);
  28. }
  29. Status CompactedDBImpl::Get(const ReadOptions& _read_options,
  30. ColumnFamilyHandle*, const Slice& key,
  31. PinnableSlice* value, std::string* timestamp) {
  32. if (_read_options.io_activity != Env::IOActivity::kUnknown &&
  33. _read_options.io_activity != Env::IOActivity::kGet) {
  34. return Status::InvalidArgument(
  35. "Can only call Get with `ReadOptions::io_activity` is "
  36. "`Env::IOActivity::kUnknown` or `Env::IOActivity::kGet`");
  37. }
  38. ReadOptions read_options(_read_options);
  39. if (read_options.io_activity == Env::IOActivity::kUnknown) {
  40. read_options.io_activity = Env::IOActivity::kGet;
  41. }
  42. assert(user_comparator_);
  43. if (read_options.timestamp) {
  44. Status s =
  45. FailIfTsMismatchCf(DefaultColumnFamily(), *(read_options.timestamp));
  46. if (!s.ok()) {
  47. return s;
  48. }
  49. if (read_options.timestamp->size() > 0) {
  50. s = FailIfReadCollapsedHistory(cfd_, cfd_->GetSuperVersion(),
  51. *(read_options.timestamp));
  52. if (!s.ok()) {
  53. return s;
  54. }
  55. }
  56. } else {
  57. const Status s = FailIfCfHasTs(DefaultColumnFamily());
  58. if (!s.ok()) {
  59. return s;
  60. }
  61. }
  62. // Clear the timestamps for returning results so that we can distinguish
  63. // between tombstone or key that has never been written
  64. if (timestamp) {
  65. timestamp->clear();
  66. }
  67. GetWithTimestampReadCallback read_cb(kMaxSequenceNumber);
  68. std::string* ts =
  69. user_comparator_->timestamp_size() > 0 ? timestamp : nullptr;
  70. LookupKey lkey(key, kMaxSequenceNumber, read_options.timestamp);
  71. GetContext get_context(user_comparator_, nullptr, nullptr, nullptr,
  72. GetContext::kNotFound, lkey.user_key(), value,
  73. /*columns=*/nullptr, ts, nullptr, nullptr, true,
  74. nullptr, nullptr, nullptr, nullptr, &read_cb);
  75. const FdWithKeyRange& f = files_.files[FindFile(lkey.user_key())];
  76. if (user_comparator_->CompareWithoutTimestamp(
  77. key, /*a_has_ts=*/false,
  78. ExtractUserKeyAndStripTimestamp(f.smallest_key,
  79. user_comparator_->timestamp_size()),
  80. /*b_has_ts=*/false) < 0) {
  81. return Status::NotFound();
  82. }
  83. Status s = f.fd.table_reader->Get(read_options, lkey.internal_key(),
  84. &get_context, nullptr);
  85. if (!s.ok() && !s.IsNotFound()) {
  86. return s;
  87. }
  88. if (get_context.State() == GetContext::kFound) {
  89. return Status::OK();
  90. }
  91. return Status::NotFound();
  92. }
  93. void CompactedDBImpl::MultiGet(const ReadOptions& _read_options,
  94. size_t num_keys,
  95. ColumnFamilyHandle** /*column_families*/,
  96. const Slice* keys, PinnableSlice* values,
  97. std::string* timestamps, Status* statuses,
  98. const bool /*sorted_input*/) {
  99. assert(user_comparator_);
  100. Status s;
  101. if (_read_options.io_activity != Env::IOActivity::kUnknown &&
  102. _read_options.io_activity != Env::IOActivity::kMultiGet) {
  103. s = Status::InvalidArgument(
  104. "Can only call MultiGet with `ReadOptions::io_activity` is "
  105. "`Env::IOActivity::kUnknown` or `Env::IOActivity::kMultiGet`");
  106. }
  107. ReadOptions read_options(_read_options);
  108. if (s.ok()) {
  109. if (read_options.io_activity == Env::IOActivity::kUnknown) {
  110. read_options.io_activity = Env::IOActivity::kMultiGet;
  111. }
  112. if (read_options.timestamp) {
  113. s = FailIfTsMismatchCf(DefaultColumnFamily(), *(read_options.timestamp));
  114. if (s.ok()) {
  115. if (read_options.timestamp->size() > 0) {
  116. s = FailIfReadCollapsedHistory(cfd_, cfd_->GetSuperVersion(),
  117. *(read_options.timestamp));
  118. }
  119. }
  120. } else {
  121. s = FailIfCfHasTs(DefaultColumnFamily());
  122. }
  123. }
  124. if (!s.ok()) {
  125. for (size_t i = 0; i < num_keys; ++i) {
  126. statuses[i] = s;
  127. }
  128. return;
  129. }
  130. // Clear the timestamps for returning results so that we can distinguish
  131. // between tombstone or key that has never been written
  132. if (timestamps) {
  133. for (size_t i = 0; i < num_keys; ++i) {
  134. timestamps[i].clear();
  135. }
  136. }
  137. GetWithTimestampReadCallback read_cb(kMaxSequenceNumber);
  138. autovector<TableReader*, 16> reader_list;
  139. for (size_t i = 0; i < num_keys; ++i) {
  140. const Slice& key = keys[i];
  141. LookupKey lkey(key, kMaxSequenceNumber, read_options.timestamp);
  142. const FdWithKeyRange& f = files_.files[FindFile(lkey.user_key())];
  143. if (user_comparator_->CompareWithoutTimestamp(
  144. key, /*a_has_ts=*/false,
  145. ExtractUserKeyAndStripTimestamp(f.smallest_key,
  146. user_comparator_->timestamp_size()),
  147. /*b_has_ts=*/false) < 0) {
  148. reader_list.push_back(nullptr);
  149. } else {
  150. f.fd.table_reader->Prepare(lkey.internal_key());
  151. reader_list.push_back(f.fd.table_reader);
  152. }
  153. }
  154. for (size_t i = 0; i < num_keys; ++i) {
  155. statuses[i] = Status::NotFound();
  156. }
  157. int idx = 0;
  158. for (auto* r : reader_list) {
  159. if (r != nullptr) {
  160. PinnableSlice& pinnable_val = values[idx];
  161. LookupKey lkey(keys[idx], kMaxSequenceNumber, read_options.timestamp);
  162. std::string* timestamp = timestamps ? &timestamps[idx] : nullptr;
  163. GetContext get_context(
  164. user_comparator_, nullptr, nullptr, nullptr, GetContext::kNotFound,
  165. lkey.user_key(), &pinnable_val, /*columns=*/nullptr,
  166. user_comparator_->timestamp_size() > 0 ? timestamp : nullptr, nullptr,
  167. nullptr, true, nullptr, nullptr, nullptr, nullptr, &read_cb);
  168. Status status =
  169. r->Get(read_options, lkey.internal_key(), &get_context, nullptr);
  170. assert(static_cast<size_t>(idx) < num_keys);
  171. if (!status.ok() && !status.IsNotFound()) {
  172. statuses[idx] = status;
  173. } else {
  174. if (get_context.State() == GetContext::kFound) {
  175. statuses[idx] = Status::OK();
  176. }
  177. }
  178. }
  179. ++idx;
  180. }
  181. }
  182. Status CompactedDBImpl::Init(const Options& options) {
  183. SuperVersionContext sv_context(/* create_superversion */ true);
  184. mutex_.Lock();
  185. ColumnFamilyDescriptor cf(kDefaultColumnFamilyName,
  186. ColumnFamilyOptions(options));
  187. Status s = Recover({cf}, true /* read only */, false, true);
  188. if (s.ok()) {
  189. cfd_ = static_cast_with_check<ColumnFamilyHandleImpl>(DefaultColumnFamily())
  190. ->cfd();
  191. cfd_->InstallSuperVersion(&sv_context, &mutex_);
  192. }
  193. mutex_.Unlock();
  194. sv_context.Clean();
  195. if (!s.ok()) {
  196. return s;
  197. }
  198. NewThreadStatusCfInfo(cfd_);
  199. version_ = cfd_->GetSuperVersion()->current;
  200. user_comparator_ = cfd_->user_comparator();
  201. auto* vstorage = version_->storage_info();
  202. if (vstorage->num_non_empty_levels() == 0) {
  203. return Status::NotSupported("no file exists");
  204. }
  205. const LevelFilesBrief& l0 = vstorage->LevelFilesBrief(0);
  206. // L0 should not have files
  207. if (l0.num_files > 1) {
  208. return Status::NotSupported("L0 contain more than 1 file");
  209. }
  210. if (l0.num_files == 1) {
  211. if (vstorage->num_non_empty_levels() > 1) {
  212. return Status::NotSupported("Both L0 and other level contain files");
  213. }
  214. files_ = l0;
  215. return Status::OK();
  216. }
  217. for (int i = 1; i < vstorage->num_non_empty_levels() - 1; ++i) {
  218. if (vstorage->LevelFilesBrief(i).num_files > 0) {
  219. return Status::NotSupported("Other levels also contain files");
  220. }
  221. }
  222. int level = vstorage->num_non_empty_levels() - 1;
  223. if (vstorage->LevelFilesBrief(level).num_files > 0) {
  224. files_ = vstorage->LevelFilesBrief(level);
  225. return Status::OK();
  226. }
  227. return Status::NotSupported("no file exists");
  228. }
  229. Status CompactedDBImpl::Open(const Options& options, const std::string& dbname,
  230. std::unique_ptr<DB>* dbptr) {
  231. *dbptr = nullptr;
  232. if (options.max_open_files != -1) {
  233. return Status::InvalidArgument("require max_open_files = -1");
  234. }
  235. if (options.merge_operator.get() != nullptr) {
  236. return Status::InvalidArgument("merge operator is not supported");
  237. }
  238. DBOptions db_options(options);
  239. std::unique_ptr<CompactedDBImpl> db(new CompactedDBImpl(db_options, dbname));
  240. Status s = db->Init(options);
  241. if (s.ok()) {
  242. s = db->StartPeriodicTaskScheduler();
  243. }
  244. if (s.ok()) {
  245. ROCKS_LOG_INFO(db->immutable_db_options_.info_log,
  246. "Opened the db as fully compacted mode");
  247. LogFlush(db->immutable_db_options_.info_log);
  248. *dbptr = std::move(db);
  249. }
  250. return s;
  251. }
  252. } // namespace ROCKSDB_NAMESPACE