db_impl_readonly.cc 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include "db/db_impl/db_impl_readonly.h"
  6. #include "db/arena_wrapped_db_iter.h"
  7. #include "db/compacted_db_impl.h"
  8. #include "db/db_impl/db_impl.h"
  9. #include "db/db_iter.h"
  10. #include "db/merge_context.h"
  11. #include "monitoring/perf_context_imp.h"
  12. namespace ROCKSDB_NAMESPACE {
  13. #ifndef ROCKSDB_LITE
  14. DBImplReadOnly::DBImplReadOnly(const DBOptions& db_options,
  15. const std::string& dbname)
  16. : DBImpl(db_options, dbname) {
  17. ROCKS_LOG_INFO(immutable_db_options_.info_log,
  18. "Opening the db in read only mode");
  19. LogFlush(immutable_db_options_.info_log);
  20. }
  21. DBImplReadOnly::~DBImplReadOnly() {}
  22. // Implementations of the DB interface
  23. Status DBImplReadOnly::Get(const ReadOptions& read_options,
  24. ColumnFamilyHandle* column_family, const Slice& key,
  25. PinnableSlice* pinnable_val) {
  26. assert(pinnable_val != nullptr);
  27. // TODO: stopwatch DB_GET needed?, perf timer needed?
  28. PERF_TIMER_GUARD(get_snapshot_time);
  29. Status s;
  30. SequenceNumber snapshot = versions_->LastSequence();
  31. auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  32. auto cfd = cfh->cfd();
  33. if (tracer_) {
  34. InstrumentedMutexLock lock(&trace_mutex_);
  35. if (tracer_) {
  36. tracer_->Get(column_family, key);
  37. }
  38. }
  39. SuperVersion* super_version = cfd->GetSuperVersion();
  40. MergeContext merge_context;
  41. SequenceNumber max_covering_tombstone_seq = 0;
  42. LookupKey lkey(key, snapshot);
  43. PERF_TIMER_STOP(get_snapshot_time);
  44. if (super_version->mem->Get(lkey, pinnable_val->GetSelf(), &s, &merge_context,
  45. &max_covering_tombstone_seq, read_options)) {
  46. pinnable_val->PinSelf();
  47. RecordTick(stats_, MEMTABLE_HIT);
  48. } else {
  49. PERF_TIMER_GUARD(get_from_output_files_time);
  50. super_version->current->Get(read_options, lkey, pinnable_val, &s,
  51. &merge_context, &max_covering_tombstone_seq);
  52. RecordTick(stats_, MEMTABLE_MISS);
  53. }
  54. RecordTick(stats_, NUMBER_KEYS_READ);
  55. size_t size = pinnable_val->size();
  56. RecordTick(stats_, BYTES_READ, size);
  57. RecordInHistogram(stats_, BYTES_PER_READ, size);
  58. PERF_COUNTER_ADD(get_read_bytes, size);
  59. return s;
  60. }
  61. Iterator* DBImplReadOnly::NewIterator(const ReadOptions& read_options,
  62. ColumnFamilyHandle* column_family) {
  63. auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  64. auto cfd = cfh->cfd();
  65. SuperVersion* super_version = cfd->GetSuperVersion()->Ref();
  66. SequenceNumber latest_snapshot = versions_->LastSequence();
  67. SequenceNumber read_seq =
  68. read_options.snapshot != nullptr
  69. ? reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)
  70. ->number_
  71. : latest_snapshot;
  72. ReadCallback* read_callback = nullptr; // No read callback provided.
  73. auto db_iter = NewArenaWrappedDbIterator(
  74. env_, read_options, *cfd->ioptions(), super_version->mutable_cf_options,
  75. read_seq,
  76. super_version->mutable_cf_options.max_sequential_skip_in_iterations,
  77. super_version->version_number, read_callback);
  78. auto internal_iter =
  79. NewInternalIterator(read_options, cfd, super_version, db_iter->GetArena(),
  80. db_iter->GetRangeDelAggregator(), read_seq);
  81. db_iter->SetIterUnderDBIter(internal_iter);
  82. return db_iter;
  83. }
  84. Status DBImplReadOnly::NewIterators(
  85. const ReadOptions& read_options,
  86. const std::vector<ColumnFamilyHandle*>& column_families,
  87. std::vector<Iterator*>* iterators) {
  88. ReadCallback* read_callback = nullptr; // No read callback provided.
  89. if (iterators == nullptr) {
  90. return Status::InvalidArgument("iterators not allowed to be nullptr");
  91. }
  92. iterators->clear();
  93. iterators->reserve(column_families.size());
  94. SequenceNumber latest_snapshot = versions_->LastSequence();
  95. SequenceNumber read_seq =
  96. read_options.snapshot != nullptr
  97. ? reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)
  98. ->number_
  99. : latest_snapshot;
  100. for (auto cfh : column_families) {
  101. auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(cfh)->cfd();
  102. auto* sv = cfd->GetSuperVersion()->Ref();
  103. auto* db_iter = NewArenaWrappedDbIterator(
  104. env_, read_options, *cfd->ioptions(), sv->mutable_cf_options, read_seq,
  105. sv->mutable_cf_options.max_sequential_skip_in_iterations,
  106. sv->version_number, read_callback);
  107. auto* internal_iter =
  108. NewInternalIterator(read_options, cfd, sv, db_iter->GetArena(),
  109. db_iter->GetRangeDelAggregator(), read_seq);
  110. db_iter->SetIterUnderDBIter(internal_iter);
  111. iterators->push_back(db_iter);
  112. }
  113. return Status::OK();
  114. }
  115. Status DB::OpenForReadOnly(const Options& options, const std::string& dbname,
  116. DB** dbptr, bool /*error_if_log_file_exist*/) {
  117. *dbptr = nullptr;
  118. // Try to first open DB as fully compacted DB
  119. Status s;
  120. s = CompactedDBImpl::Open(options, dbname, dbptr);
  121. if (s.ok()) {
  122. return s;
  123. }
  124. DBOptions db_options(options);
  125. ColumnFamilyOptions cf_options(options);
  126. std::vector<ColumnFamilyDescriptor> column_families;
  127. column_families.push_back(
  128. ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
  129. std::vector<ColumnFamilyHandle*> handles;
  130. s = DB::OpenForReadOnly(db_options, dbname, column_families, &handles, dbptr);
  131. if (s.ok()) {
  132. assert(handles.size() == 1);
  133. // i can delete the handle since DBImpl is always holding a
  134. // reference to default column family
  135. delete handles[0];
  136. }
  137. return s;
  138. }
  139. Status DB::OpenForReadOnly(
  140. const DBOptions& db_options, const std::string& dbname,
  141. const std::vector<ColumnFamilyDescriptor>& column_families,
  142. std::vector<ColumnFamilyHandle*>* handles, DB** dbptr,
  143. bool error_if_log_file_exist) {
  144. *dbptr = nullptr;
  145. handles->clear();
  146. SuperVersionContext sv_context(/* create_superversion */ true);
  147. DBImplReadOnly* impl = new DBImplReadOnly(db_options, dbname);
  148. impl->mutex_.Lock();
  149. Status s = impl->Recover(column_families, true /* read only */,
  150. error_if_log_file_exist);
  151. if (s.ok()) {
  152. // set column family handles
  153. for (auto cf : column_families) {
  154. auto cfd =
  155. impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name);
  156. if (cfd == nullptr) {
  157. s = Status::InvalidArgument("Column family not found: ", cf.name);
  158. break;
  159. }
  160. handles->push_back(new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_));
  161. }
  162. }
  163. if (s.ok()) {
  164. for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
  165. sv_context.NewSuperVersion();
  166. cfd->InstallSuperVersion(&sv_context, &impl->mutex_);
  167. }
  168. }
  169. impl->mutex_.Unlock();
  170. sv_context.Clean();
  171. if (s.ok()) {
  172. *dbptr = impl;
  173. for (auto* h : *handles) {
  174. impl->NewThreadStatusCfInfo(
  175. reinterpret_cast<ColumnFamilyHandleImpl*>(h)->cfd());
  176. }
  177. } else {
  178. for (auto h : *handles) {
  179. delete h;
  180. }
  181. handles->clear();
  182. delete impl;
  183. }
  184. return s;
  185. }
  186. #else // !ROCKSDB_LITE
  187. Status DB::OpenForReadOnly(const Options& /*options*/,
  188. const std::string& /*dbname*/, DB** /*dbptr*/,
  189. bool /*error_if_log_file_exist*/) {
  190. return Status::NotSupported("Not supported in ROCKSDB_LITE.");
  191. }
  192. Status DB::OpenForReadOnly(
  193. const DBOptions& /*db_options*/, const std::string& /*dbname*/,
  194. const std::vector<ColumnFamilyDescriptor>& /*column_families*/,
  195. std::vector<ColumnFamilyHandle*>* /*handles*/, DB** /*dbptr*/,
  196. bool /*error_if_log_file_exist*/) {
  197. return Status::NotSupported("Not supported in ROCKSDB_LITE.");
  198. }
  199. #endif // !ROCKSDB_LITE
  200. } // namespace ROCKSDB_NAMESPACE