compacted_db_impl.cc 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #ifndef ROCKSDB_LITE
  6. #include "db/compacted_db_impl.h"
  7. #include "db/db_impl/db_impl.h"
  8. #include "db/version_set.h"
  9. #include "table/get_context.h"
  10. namespace ROCKSDB_NAMESPACE {
  11. extern void MarkKeyMayExist(void* arg);
  12. extern bool SaveValue(void* arg, const ParsedInternalKey& parsed_key,
  13. const Slice& v, bool hit_and_return);
  14. CompactedDBImpl::CompactedDBImpl(
  15. const DBOptions& options, const std::string& dbname)
  16. : DBImpl(options, dbname), cfd_(nullptr), version_(nullptr),
  17. user_comparator_(nullptr) {
  18. }
  19. CompactedDBImpl::~CompactedDBImpl() {
  20. }
  21. size_t CompactedDBImpl::FindFile(const Slice& key) {
  22. size_t right = files_.num_files - 1;
  23. auto cmp = [&](const FdWithKeyRange& f, const Slice& k) -> bool {
  24. return user_comparator_->Compare(ExtractUserKey(f.largest_key), k) < 0;
  25. };
  26. return static_cast<size_t>(std::lower_bound(files_.files,
  27. files_.files + right, key, cmp) - files_.files);
  28. }
  29. Status CompactedDBImpl::Get(const ReadOptions& options, ColumnFamilyHandle*,
  30. const Slice& key, PinnableSlice* value) {
  31. GetContext get_context(user_comparator_, nullptr, nullptr, nullptr,
  32. GetContext::kNotFound, key, value, nullptr, nullptr,
  33. true, nullptr, nullptr);
  34. LookupKey lkey(key, kMaxSequenceNumber);
  35. files_.files[FindFile(key)].fd.table_reader->Get(options, lkey.internal_key(),
  36. &get_context, nullptr);
  37. if (get_context.State() == GetContext::kFound) {
  38. return Status::OK();
  39. }
  40. return Status::NotFound();
  41. }
  42. std::vector<Status> CompactedDBImpl::MultiGet(const ReadOptions& options,
  43. const std::vector<ColumnFamilyHandle*>&,
  44. const std::vector<Slice>& keys, std::vector<std::string>* values) {
  45. autovector<TableReader*, 16> reader_list;
  46. for (const auto& key : keys) {
  47. const FdWithKeyRange& f = files_.files[FindFile(key)];
  48. if (user_comparator_->Compare(key, ExtractUserKey(f.smallest_key)) < 0) {
  49. reader_list.push_back(nullptr);
  50. } else {
  51. LookupKey lkey(key, kMaxSequenceNumber);
  52. f.fd.table_reader->Prepare(lkey.internal_key());
  53. reader_list.push_back(f.fd.table_reader);
  54. }
  55. }
  56. std::vector<Status> statuses(keys.size(), Status::NotFound());
  57. values->resize(keys.size());
  58. int idx = 0;
  59. for (auto* r : reader_list) {
  60. if (r != nullptr) {
  61. PinnableSlice pinnable_val;
  62. std::string& value = (*values)[idx];
  63. GetContext get_context(user_comparator_, nullptr, nullptr, nullptr,
  64. GetContext::kNotFound, keys[idx], &pinnable_val,
  65. nullptr, nullptr, true, nullptr, nullptr);
  66. LookupKey lkey(keys[idx], kMaxSequenceNumber);
  67. r->Get(options, lkey.internal_key(), &get_context, nullptr);
  68. value.assign(pinnable_val.data(), pinnable_val.size());
  69. if (get_context.State() == GetContext::kFound) {
  70. statuses[idx] = Status::OK();
  71. }
  72. }
  73. ++idx;
  74. }
  75. return statuses;
  76. }
  77. Status CompactedDBImpl::Init(const Options& options) {
  78. SuperVersionContext sv_context(/* create_superversion */ true);
  79. mutex_.Lock();
  80. ColumnFamilyDescriptor cf(kDefaultColumnFamilyName,
  81. ColumnFamilyOptions(options));
  82. Status s = Recover({cf}, true /* read only */, false, true);
  83. if (s.ok()) {
  84. cfd_ = reinterpret_cast<ColumnFamilyHandleImpl*>(
  85. DefaultColumnFamily())->cfd();
  86. cfd_->InstallSuperVersion(&sv_context, &mutex_);
  87. }
  88. mutex_.Unlock();
  89. sv_context.Clean();
  90. if (!s.ok()) {
  91. return s;
  92. }
  93. NewThreadStatusCfInfo(cfd_);
  94. version_ = cfd_->GetSuperVersion()->current;
  95. user_comparator_ = cfd_->user_comparator();
  96. auto* vstorage = version_->storage_info();
  97. if (vstorage->num_non_empty_levels() == 0) {
  98. return Status::NotSupported("no file exists");
  99. }
  100. const LevelFilesBrief& l0 = vstorage->LevelFilesBrief(0);
  101. // L0 should not have files
  102. if (l0.num_files > 1) {
  103. return Status::NotSupported("L0 contain more than 1 file");
  104. }
  105. if (l0.num_files == 1) {
  106. if (vstorage->num_non_empty_levels() > 1) {
  107. return Status::NotSupported("Both L0 and other level contain files");
  108. }
  109. files_ = l0;
  110. return Status::OK();
  111. }
  112. for (int i = 1; i < vstorage->num_non_empty_levels() - 1; ++i) {
  113. if (vstorage->LevelFilesBrief(i).num_files > 0) {
  114. return Status::NotSupported("Other levels also contain files");
  115. }
  116. }
  117. int level = vstorage->num_non_empty_levels() - 1;
  118. if (vstorage->LevelFilesBrief(level).num_files > 0) {
  119. files_ = vstorage->LevelFilesBrief(level);
  120. return Status::OK();
  121. }
  122. return Status::NotSupported("no file exists");
  123. }
  124. Status CompactedDBImpl::Open(const Options& options,
  125. const std::string& dbname, DB** dbptr) {
  126. *dbptr = nullptr;
  127. if (options.max_open_files != -1) {
  128. return Status::InvalidArgument("require max_open_files = -1");
  129. }
  130. if (options.merge_operator.get() != nullptr) {
  131. return Status::InvalidArgument("merge operator is not supported");
  132. }
  133. DBOptions db_options(options);
  134. std::unique_ptr<CompactedDBImpl> db(new CompactedDBImpl(db_options, dbname));
  135. Status s = db->Init(options);
  136. if (s.ok()) {
  137. db->StartTimedTasks();
  138. ROCKS_LOG_INFO(db->immutable_db_options_.info_log,
  139. "Opened the db as fully compacted mode");
  140. LogFlush(db->immutable_db_options_.info_log);
  141. *dbptr = db.release();
  142. }
  143. return s;
  144. }
  145. } // namespace ROCKSDB_NAMESPACE
  146. #endif // ROCKSDB_LITE