// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
#include "db/memtable_list.h"

#include <cinttypes>
#include <limits>
#include <queue>
#include <string>

#include "db/db_impl/db_impl.h"
#include "db/memtable.h"
#include "db/range_tombstone_fragmenter.h"
#include "db/version_set.h"
#include "logging/log_buffer.h"
#include "monitoring/thread_status_util.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include "table/merging_iterator.h"
#include "test_util/sync_point.h"
#include "util/coding.h"

namespace ROCKSDB_NAMESPACE {

class InternalKeyComparator;
class Mutex;
class VersionSet;
void MemTableListVersion::AddMemTable(MemTable* m) {
  memlist_.push_front(m);
  *parent_memtable_list_memory_usage_ += m->ApproximateMemoryUsage();
}

void MemTableListVersion::UnrefMemTable(autovector<MemTable*>* to_delete,
                                        MemTable* m) {
  if (m->Unref()) {
    to_delete->push_back(m);
    assert(*parent_memtable_list_memory_usage_ >= m->ApproximateMemoryUsage());
    *parent_memtable_list_memory_usage_ -= m->ApproximateMemoryUsage();
  }
}
MemTableListVersion::MemTableListVersion(
    size_t* parent_memtable_list_memory_usage, MemTableListVersion* old)
    : max_write_buffer_number_to_maintain_(
          old->max_write_buffer_number_to_maintain_),
      max_write_buffer_size_to_maintain_(
          old->max_write_buffer_size_to_maintain_),
      parent_memtable_list_memory_usage_(parent_memtable_list_memory_usage) {
  if (old != nullptr) {
    memlist_ = old->memlist_;
    for (auto& m : memlist_) {
      m->Ref();
    }

    memlist_history_ = old->memlist_history_;
    for (auto& m : memlist_history_) {
      m->Ref();
    }
  }
}

MemTableListVersion::MemTableListVersion(
    size_t* parent_memtable_list_memory_usage,
    int max_write_buffer_number_to_maintain,
    int64_t max_write_buffer_size_to_maintain)
    : max_write_buffer_number_to_maintain_(max_write_buffer_number_to_maintain),
      max_write_buffer_size_to_maintain_(max_write_buffer_size_to_maintain),
      parent_memtable_list_memory_usage_(parent_memtable_list_memory_usage) {}
void MemTableListVersion::Ref() { ++refs_; }

// called by superversion::clean()
void MemTableListVersion::Unref(autovector<MemTable*>* to_delete) {
  assert(refs_ >= 1);
  --refs_;
  if (refs_ == 0) {
    // A nullptr to_delete is only legal when the caller is certain that
    // refs_ cannot drop to zero here.
    assert(to_delete != nullptr);
    for (const auto& m : memlist_) {
      UnrefMemTable(to_delete, m);
    }
    for (const auto& m : memlist_history_) {
      UnrefMemTable(to_delete, m);
    }
    delete this;
  }
}
int MemTableList::NumNotFlushed() const {
  int size = static_cast<int>(current_->memlist_.size());
  assert(num_flush_not_started_ <= size);
  return size;
}

int MemTableList::NumFlushed() const {
  return static_cast<int>(current_->memlist_history_.size());
}
// Search all the memtables starting from the most recent one.
// Return the most recent value found, if any.
// merge_context accumulates the list of merge operations seen so far.
bool MemTableListVersion::Get(const LookupKey& key, std::string* value,
                              Status* s, MergeContext* merge_context,
                              SequenceNumber* max_covering_tombstone_seq,
                              SequenceNumber* seq, const ReadOptions& read_opts,
                              ReadCallback* callback, bool* is_blob_index) {
  return GetFromList(&memlist_, key, value, s, merge_context,
                     max_covering_tombstone_seq, seq, read_opts, callback,
                     is_blob_index);
}
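
// Batched counterpart of Get(): probes each memtable, newest first, for the
// keys in *range, stopping early once every key in the batch is resolved.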
void MemTableListVersion::MultiGet(const ReadOptions& read_options,
                                   MultiGetRange* range, ReadCallback* callback,
                                   bool* is_blob) {
  for (auto memtable : memlist_) {
    memtable->MultiGet(read_options, range, callback, is_blob);
    if (range->empty()) {
      return;
    }
  }
}
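
// Collect merge operands for `key` from each memtable, newest first.
// Returns true as soon as one memtable fully resolves the lookup.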
bool MemTableListVersion::GetMergeOperands(
    const LookupKey& key, Status* s, MergeContext* merge_context,
    SequenceNumber* max_covering_tombstone_seq, const ReadOptions& read_opts) {
  for (MemTable* memtable : memlist_) {
    bool done = memtable->Get(key, nullptr, s, merge_context,
                              max_covering_tombstone_seq, read_opts, nullptr,
                              nullptr, false);
    if (done) {
      return true;
    }
  }
  return false;
}
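
// Same as Get(), but searches the flushed-but-retained memtables in
// memlist_history_ instead of the unflushed ones.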
bool MemTableListVersion::GetFromHistory(
    const LookupKey& key, std::string* value, Status* s,
    MergeContext* merge_context, SequenceNumber* max_covering_tombstone_seq,
    SequenceNumber* seq, const ReadOptions& read_opts, bool* is_blob_index) {
  return GetFromList(&memlist_history_, key, value, s, merge_context,
                     max_covering_tombstone_seq, seq, read_opts,
                     nullptr /*read_callback*/, is_blob_index);
}
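
// Shared lookup implementation for Get() and GetFromHistory(): walks the
// given list from newest to oldest and reports, via *seq, the sequence
// number of the first operation found for the key.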
bool MemTableListVersion::GetFromList(
    std::list<MemTable*>* list, const LookupKey& key, std::string* value,
    Status* s, MergeContext* merge_context,
    SequenceNumber* max_covering_tombstone_seq, SequenceNumber* seq,
    const ReadOptions& read_opts, ReadCallback* callback, bool* is_blob_index) {
  *seq = kMaxSequenceNumber;

  for (auto& memtable : *list) {
    SequenceNumber current_seq = kMaxSequenceNumber;

    bool done =
        memtable->Get(key, value, s, merge_context, max_covering_tombstone_seq,
                      &current_seq, read_opts, callback, is_blob_index);
    if (*seq == kMaxSequenceNumber) {
      // Store the most recent sequence number of any operation on this key.
      // Since we only care about the most recent change, we only need to
      // return the first operation found when searching memtables in
      // reverse-chronological order.
      // current_seq would be equal to kMaxSequenceNumber if the value was to
      // be skipped. This allows seq to be assigned again when the next value
      // is read.
      *seq = current_seq;
    }

    if (done) {
      assert(*seq != kMaxSequenceNumber || s->IsNotFound());
      return true;
    }
    if (!done && !s->ok() && !s->IsMergeInProgress() && !s->IsNotFound()) {
      return false;
    }
  }
  return false;
}
Status MemTableListVersion::AddRangeTombstoneIterators(
    const ReadOptions& read_opts, Arena* /*arena*/,
    RangeDelAggregator* range_del_agg) {
  assert(range_del_agg != nullptr);
  // Except for snapshot read, using kMaxSequenceNumber is OK because these
  // are immutable memtables.
  SequenceNumber read_seq = read_opts.snapshot != nullptr
                                ? read_opts.snapshot->GetSequenceNumber()
                                : kMaxSequenceNumber;
  for (auto& m : memlist_) {
    std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
        m->NewRangeTombstoneIterator(read_opts, read_seq));
    range_del_agg->AddTombstones(std::move(range_del_iter));
  }
  return Status::OK();
}
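
// Append one iterator per unflushed memtable, newest first, to the output:
// a plain vector here, or a MergeIteratorBuilder in the overload below.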
void MemTableListVersion::AddIterators(
    const ReadOptions& options, std::vector<InternalIterator*>* iterator_list,
    Arena* arena) {
  for (auto& m : memlist_) {
    iterator_list->push_back(m->NewIterator(options, arena));
  }
}

void MemTableListVersion::AddIterators(
    const ReadOptions& options, MergeIteratorBuilder* merge_iter_builder) {
  for (auto& m : memlist_) {
    merge_iter_builder->AddIterator(
        m->NewIterator(options, merge_iter_builder->GetArena()));
  }
}
uint64_t MemTableListVersion::GetTotalNumEntries() const {
  uint64_t total_num = 0;
  for (auto& m : memlist_) {
    total_num += m->num_entries();
  }
  return total_num;
}
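
// Sum the approximate size and entry-count statistics for the given internal
// key range across all unflushed memtables.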
MemTable::MemTableStats MemTableListVersion::ApproximateStats(
    const Slice& start_ikey, const Slice& end_ikey) {
  MemTable::MemTableStats total_stats = {0, 0};
  for (auto& m : memlist_) {
    auto mStats = m->ApproximateStats(start_ikey, end_ikey);
    total_stats.size += mStats.size;
    total_stats.count += mStats.count;
  }
  return total_stats;
}
uint64_t MemTableListVersion::GetTotalNumDeletes() const {
  uint64_t total_num = 0;
  for (auto& m : memlist_) {
    total_num += m->num_deletes();
  }
  return total_num;
}
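
// Return the earliest sequence number held by any memtable in this version
// (optionally including the flushed history); kMaxSequenceNumber if empty.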
SequenceNumber MemTableListVersion::GetEarliestSequenceNumber(
    bool include_history) const {
  if (include_history && !memlist_history_.empty()) {
    return memlist_history_.back()->GetEarliestSequenceNumber();
  } else if (!memlist_.empty()) {
    return memlist_.back()->GetEarliestSequenceNumber();
  } else {
    return kMaxSequenceNumber;
  }
}
// caller is responsible for referencing m
void MemTableListVersion::Add(MemTable* m, autovector<MemTable*>* to_delete) {
  assert(refs_ == 1);  // only when refs_ == 1 is MemTableListVersion mutable
  AddMemTable(m);

  TrimHistory(to_delete, m->ApproximateMemoryUsage());
}

// Removes m from list of memtables not flushed. Caller should NOT Unref m.
void MemTableListVersion::Remove(MemTable* m,
                                 autovector<MemTable*>* to_delete) {
  assert(refs_ == 1);  // only when refs_ == 1 is MemTableListVersion mutable
  memlist_.remove(m);

  m->MarkFlushed();
  if (max_write_buffer_size_to_maintain_ > 0 ||
      max_write_buffer_number_to_maintain_ > 0) {
    memlist_history_.push_front(m);
    // The size of the mutable memtable is unavailable at this point, so pass
    // 0 to TrimHistory as a best effort.
    TrimHistory(to_delete, 0);
  } else {
    UnrefMemTable(to_delete, m);
  }
}
// Return the total memory usage assuming the oldest flushed memtable is
// dropped.
size_t MemTableListVersion::ApproximateMemoryUsageExcludingLast() const {
  size_t total_memtable_size = 0;
  for (auto& memtable : memlist_) {
    total_memtable_size += memtable->ApproximateMemoryUsage();
  }
  for (auto& memtable : memlist_history_) {
    total_memtable_size += memtable->ApproximateMemoryUsage();
  }
  if (!memlist_history_.empty()) {
    total_memtable_size -= memlist_history_.back()->ApproximateMemoryUsage();
  }
  return total_memtable_size;
}
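
// Decide whether history needs trimming, given `usage` extra bytes that the
// caller wants accounted for (the new memtable's size in the Add() path,
// 0 in the Remove() path).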
bool MemTableListVersion::MemtableLimitExceeded(size_t usage) {
  if (max_write_buffer_size_to_maintain_ > 0) {
    // calculate the total memory usage after dropping the oldest flushed
    // memtable, compare with max_write_buffer_size_to_maintain_ to decide
    // whether to trim history
    return ApproximateMemoryUsageExcludingLast() + usage >=
           static_cast<size_t>(max_write_buffer_size_to_maintain_);
  } else if (max_write_buffer_number_to_maintain_ > 0) {
    return memlist_.size() + memlist_history_.size() >
           static_cast<size_t>(max_write_buffer_number_to_maintain_);
  } else {
    return false;
  }
}
// Make sure we don't use up too much space in history
void MemTableListVersion::TrimHistory(autovector<MemTable*>* to_delete,
                                      size_t usage) {
  while (MemtableLimitExceeded(usage) && !memlist_history_.empty()) {
    MemTable* x = memlist_history_.back();
    memlist_history_.pop_back();

    UnrefMemTable(to_delete, x);
  }
}
// Returns true if there is at least one memtable on which flush has
// not yet started.
bool MemTableList::IsFlushPending() const {
  if ((flush_requested_ && num_flush_not_started_ > 0) ||
      (num_flush_not_started_ >= min_write_buffer_number_to_merge_)) {
    assert(imm_flush_needed.load(std::memory_order_relaxed));
    return true;
  }
  return false;
}
// Returns the memtables that need to be flushed.
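// When max_memtable_id is non-null, only memtables with an ID no greater than
// *max_memtable_id are considered; the scan stops at the first newer one.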
void MemTableList::PickMemtablesToFlush(const uint64_t* max_memtable_id,
                                        autovector<MemTable*>* ret) {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_PICK_MEMTABLES_TO_FLUSH);
  const auto& memlist = current_->memlist_;
  bool atomic_flush = false;
  for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
    MemTable* m = *it;
    if (!atomic_flush && m->atomic_flush_seqno_ != kMaxSequenceNumber) {
      atomic_flush = true;
    }
    if (max_memtable_id != nullptr && m->GetID() > *max_memtable_id) {
      break;
    }
    if (!m->flush_in_progress_) {
      assert(!m->flush_completed_);
      num_flush_not_started_--;
      if (num_flush_not_started_ == 0) {
        imm_flush_needed.store(false, std::memory_order_release);
      }
      m->flush_in_progress_ = true;  // flushing will start very soon
      ret->push_back(m);
    }
  }
  if (!atomic_flush || num_flush_not_started_ == 0) {
    flush_requested_ = false;  // start-flush request is complete
  }
}
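
// Reset the flush state of `mems` after a failed flush so that a later
// attempt can pick these memtables up again.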
void MemTableList::RollbackMemtableFlush(const autovector<MemTable*>& mems,
                                         uint64_t /*file_number*/) {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_MEMTABLE_ROLLBACK);
  assert(!mems.empty());

  // If the flush was not successful, then just reset state.
  // Maybe a succeeding attempt to flush will be successful.
  for (MemTable* m : mems) {
    assert(m->flush_in_progress_);
    assert(m->file_number_ == 0);

    m->flush_in_progress_ = false;
    m->flush_completed_ = false;
    m->edit_.Clear();
    num_flush_not_started_++;
  }
  imm_flush_needed.store(true, std::memory_order_release);
}
// Try to record a successful flush in the manifest file. It might just return
// Status::OK, letting a concurrent flush do the actual recording.
Status MemTableList::TryInstallMemtableFlushResults(
    ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
    const autovector<MemTable*>& mems, LogsWithPrepTracker* prep_tracker,
    VersionSet* vset, InstrumentedMutex* mu, uint64_t file_number,
    autovector<MemTable*>* to_delete, Directory* db_directory,
    LogBuffer* log_buffer,
    std::list<std::unique_ptr<FlushJobInfo>>* committed_flush_jobs_info) {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_MEMTABLE_INSTALL_FLUSH_RESULTS);
  mu->AssertHeld();

  // Flush was successful.
  // Record the status on the memtable object. Either this call or a call by a
  // concurrent flush thread will read the status and write it to the manifest.
  for (size_t i = 0; i < mems.size(); ++i) {
    // All the edits are associated with the first memtable of this batch.
    assert(i == 0 || mems[i]->GetEdits()->NumEntries() == 0);

    mems[i]->flush_completed_ = true;
    mems[i]->file_number_ = file_number;
  }

  // If some other thread is already committing, then return.
  Status s;
  if (commit_in_progress_) {
    TEST_SYNC_POINT("MemTableList::TryInstallMemtableFlushResults:InProgress");
    return s;
  }

  // Only a single thread can be executing this piece of code.
  commit_in_progress_ = true;

  // Retry until all completed flushes are committed. New flushes can finish
  // while the current thread is writing the manifest, during which the mutex
  // is released.
  while (s.ok()) {
    auto& memlist = current_->memlist_;
    // The back is the oldest; if flush_completed_ is not set on it, it means
    // that we were assigned a more recent memtable. The memtables' flushes
    // must be recorded in the manifest in order. A concurrent flush thread,
    // which is assigned to flush the oldest memtable, will later wake up and
    // do all the pending writes to the manifest, in order.
    if (memlist.empty() || !memlist.back()->flush_completed_) {
      break;
    }
    // scan all memtables from the earliest, and commit those
    // (in that order) that have finished flushing. Memtables
    // are always committed in the order that they were created.
    uint64_t batch_file_number = 0;
    size_t batch_count = 0;
    autovector<VersionEdit*> edit_list;
    autovector<MemTable*> memtables_to_flush;
    // enumerate from the last (earliest) element to see how many batches have
    // finished
    for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
      MemTable* m = *it;
      if (!m->flush_completed_) {
        break;
      }
      if (it == memlist.rbegin() || batch_file_number != m->file_number_) {
        batch_file_number = m->file_number_;
        ROCKS_LOG_BUFFER(log_buffer,
                         "[%s] Level-0 commit table #%" PRIu64 " started",
                         cfd->GetName().c_str(), m->file_number_);
        edit_list.push_back(&m->edit_);
        memtables_to_flush.push_back(m);
#ifndef ROCKSDB_LITE
        std::unique_ptr<FlushJobInfo> info = m->ReleaseFlushJobInfo();
        if (info != nullptr) {
          committed_flush_jobs_info->push_back(std::move(info));
        }
#else
        (void)committed_flush_jobs_info;
#endif  // !ROCKSDB_LITE
      }
      batch_count++;
    }

    // TODO(myabandeh): Not sure how batch_count could be 0 here.
    if (batch_count > 0) {
      if (vset->db_options()->allow_2pc) {
        assert(edit_list.size() > 0);
        // We piggyback the information of the earliest log file to keep in
        // the manifest entry for the last file flushed.
        edit_list.back()->SetMinLogNumberToKeep(PrecomputeMinLogNumberToKeep(
            vset, *cfd, edit_list, memtables_to_flush, prep_tracker));
      }

      // this can release and reacquire the mutex.
      s = vset->LogAndApply(cfd, mutable_cf_options, edit_list, mu,
                            db_directory);

      // We will be changing the version in the next code path,
      // so we better create a new one, since versions are immutable.
      InstallNewVersion();

      // All the later memtables that have the same filenum
      // are part of the same batch. They can be committed now.
      uint64_t mem_id = 1;  // how many memtables have been flushed.

      // commit new state only if the column family is NOT dropped.
      // The reason is as follows (refer to
      // ColumnFamilyTest.FlushAndDropRaceCondition).
      // If the column family is dropped, then according to LogAndApply, its
      // corresponding flush operation is NOT written to the MANIFEST. This
      // means the DB is not aware of the L0 files generated from the flush.
      // By committing the new state, we remove the memtable from the memtable
      // list. Creating an iterator on this column family will not be able to
      // read full data since the memtable is removed, and the DB is not aware
      // of the L0 files, leaving MergingIterator unable to build child
      // iterators. The RocksDB contract requires that an iterator can be
      // created on a dropped column family, and that we must be able to
      // read full data as long as the column family handle is not deleted,
      // even if the column family is dropped.
      if (s.ok() && !cfd->IsDropped()) {  // commit new state
        while (batch_count-- > 0) {
          MemTable* m = current_->memlist_.back();
          ROCKS_LOG_BUFFER(log_buffer, "[%s] Level-0 commit table #%" PRIu64
                                       ": memtable #%" PRIu64 " done",
                           cfd->GetName().c_str(), m->file_number_, mem_id);
          assert(m->file_number_ > 0);
          current_->Remove(m, to_delete);
          UpdateCachedValuesFromMemTableListVersion();
          ResetTrimHistoryNeeded();
          ++mem_id;
        }
      } else {
        for (auto it = current_->memlist_.rbegin(); batch_count-- > 0; ++it) {
          MemTable* m = *it;
          // commit failed. setup state so that we can flush again.
          ROCKS_LOG_BUFFER(log_buffer, "Level-0 commit table #%" PRIu64
                                       ": memtable #%" PRIu64 " failed",
                           m->file_number_, mem_id);
          m->flush_completed_ = false;
          m->flush_in_progress_ = false;
          m->edit_.Clear();
          num_flush_not_started_++;
          m->file_number_ = 0;
          imm_flush_needed.store(true, std::memory_order_release);
          ++mem_id;
        }
      }
    }
  }
  commit_in_progress_ = false;
  return s;
}
// New memtables are inserted at the front of the list.
void MemTableList::Add(MemTable* m, autovector<MemTable*>* to_delete) {
  assert(static_cast<int>(current_->memlist_.size()) >=
         num_flush_not_started_);
  InstallNewVersion();
  // this method is used to move mutable memtable into an immutable list.
  // since mutable memtable is already refcounted by the DBImpl,
  // and when moving to the immutable list we don't unref it,
  // we don't have to ref the memtable here. we just take over the
  // reference from the DBImpl.
  current_->Add(m, to_delete);
  m->MarkImmutable();
  num_flush_not_started_++;
  if (num_flush_not_started_ == 1) {
    imm_flush_needed.store(true, std::memory_order_release);
  }
  UpdateCachedValuesFromMemTableListVersion();
  ResetTrimHistoryNeeded();
}
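
// Install a fresh (mutable) version if needed, then trim memtables from the
// history until the configured history limits are respected again.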
void MemTableList::TrimHistory(autovector<MemTable*>* to_delete, size_t usage) {
  InstallNewVersion();
  current_->TrimHistory(to_delete, usage);
  UpdateCachedValuesFromMemTableListVersion();
  ResetTrimHistoryNeeded();
}
// Returns an estimate of the number of bytes of data in use.
size_t MemTableList::ApproximateUnflushedMemTablesMemoryUsage() {
  size_t total_size = 0;
  for (auto& memtable : current_->memlist_) {
    total_size += memtable->ApproximateMemoryUsage();
  }
  return total_size;
}
size_t MemTableList::ApproximateMemoryUsage() { return current_memory_usage_; }

size_t MemTableList::ApproximateMemoryUsageExcludingLast() const {
  const size_t usage =
      current_memory_usage_excluding_last_.load(std::memory_order_relaxed);
  return usage;
}

bool MemTableList::HasHistory() const {
  const bool has_history = current_has_history_.load(std::memory_order_relaxed);
  return has_history;
}
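
// Refresh the atomically cached memory-usage and history flags so that the
// accessors above can serve readers without taking the DB mutex.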
void MemTableList::UpdateCachedValuesFromMemTableListVersion() {
  const size_t total_memtable_size =
      current_->ApproximateMemoryUsageExcludingLast();
  current_memory_usage_excluding_last_.store(total_memtable_size,
                                             std::memory_order_relaxed);

  const bool has_history = current_->HasHistory();
  current_has_history_.store(has_history, std::memory_order_relaxed);
}
uint64_t MemTableList::ApproximateOldestKeyTime() const {
  if (!current_->memlist_.empty()) {
    return current_->memlist_.back()->ApproximateOldestKeyTime();
  }
  return std::numeric_limits<uint64_t>::max();
}
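
// Make sure current_ can be mutated: if any other reader still holds a
// reference to the current version, replace it copy-on-write style.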
void MemTableList::InstallNewVersion() {
  if (current_->refs_ == 1) {
    // we're the only one using the version, just keep using it
  } else {
    // somebody else holds the current version, we need to create new one
    MemTableListVersion* version = current_;
    current_ = new MemTableListVersion(&current_memory_usage_, current_);
    current_->Ref();
    version->Unref();
  }
}
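
// Among the memtables that will remain after `memtables_to_flush` are gone,
// find the minimum WAL number still containing a prepared (2PC) section.
// Returns 0 when no such log exists.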
uint64_t MemTableList::PrecomputeMinLogContainingPrepSection(
    const autovector<MemTable*>& memtables_to_flush) {
  uint64_t min_log = 0;

  for (auto& m : current_->memlist_) {
    // Assume the list is very short, so we can live with O(m*n). We can
    // optimize if performance ever becomes a problem.
    bool should_skip = false;
    for (MemTable* m_to_flush : memtables_to_flush) {
      if (m == m_to_flush) {
        should_skip = true;
        break;
      }
    }
    if (should_skip) {
      continue;
    }

    auto log = m->GetMinLogContainingPrepSection();

    if (log > 0 && (min_log == 0 || log < min_log)) {
      min_log = log;
    }
  }

  return min_log;
}
// Commit a successful atomic flush in the manifest file.
Status InstallMemtableAtomicFlushResults(
    const autovector<MemTableList*>* imm_lists,
    const autovector<ColumnFamilyData*>& cfds,
    const autovector<const MutableCFOptions*>& mutable_cf_options_list,
    const autovector<const autovector<MemTable*>*>& mems_list, VersionSet* vset,
    InstrumentedMutex* mu, const autovector<FileMetaData*>& file_metas,
    autovector<MemTable*>* to_delete, Directory* db_directory,
    LogBuffer* log_buffer) {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_MEMTABLE_INSTALL_FLUSH_RESULTS);
  mu->AssertHeld();

  size_t num = mems_list.size();
  assert(cfds.size() == num);
  if (imm_lists != nullptr) {
    assert(imm_lists->size() == num);
  }
  for (size_t k = 0; k != num; ++k) {
#ifndef NDEBUG
    const auto* imm =
        (imm_lists == nullptr) ? cfds[k]->imm() : imm_lists->at(k);
    if (!mems_list[k]->empty()) {
      assert((*mems_list[k])[0]->GetID() == imm->GetEarliestMemTableID());
    }
#endif
    assert(nullptr != file_metas[k]);
    for (size_t i = 0; i != mems_list[k]->size(); ++i) {
      assert(i == 0 || (*mems_list[k])[i]->GetEdits()->NumEntries() == 0);
      (*mems_list[k])[i]->SetFlushCompleted(true);
      (*mems_list[k])[i]->SetFileNumber(file_metas[k]->fd.GetNumber());
    }
  }

  Status s;

  autovector<autovector<VersionEdit*>> edit_lists;
  uint32_t num_entries = 0;
  for (const auto mems : mems_list) {
    assert(mems != nullptr);
    autovector<VersionEdit*> edits;
    assert(!mems->empty());
    edits.emplace_back((*mems)[0]->GetEdits());
    ++num_entries;
    edit_lists.emplace_back(edits);
  }
  // Mark the version edits as an atomic group if the number of version edits
  // exceeds 1.
  if (cfds.size() > 1) {
    for (auto& edits : edit_lists) {
      assert(edits.size() == 1);
      edits[0]->MarkAtomicGroup(--num_entries);
    }
    assert(0 == num_entries);
  }

  // this can release and reacquire the mutex.
  s = vset->LogAndApply(cfds, mutable_cf_options_list, edit_lists, mu,
                        db_directory);

  for (size_t k = 0; k != cfds.size(); ++k) {
    auto* imm = (imm_lists == nullptr) ? cfds[k]->imm() : imm_lists->at(k);
    imm->InstallNewVersion();
  }

  if (s.ok() || s.IsColumnFamilyDropped()) {
    for (size_t i = 0; i != cfds.size(); ++i) {
      if (cfds[i]->IsDropped()) {
        continue;
      }
      auto* imm = (imm_lists == nullptr) ? cfds[i]->imm() : imm_lists->at(i);
      for (auto m : *mems_list[i]) {
        assert(m->GetFileNumber() > 0);
        uint64_t mem_id = m->GetID();
        ROCKS_LOG_BUFFER(log_buffer,
                         "[%s] Level-0 commit table #%" PRIu64
                         ": memtable #%" PRIu64 " done",
                         cfds[i]->GetName().c_str(), m->GetFileNumber(),
                         mem_id);
        imm->current_->Remove(m, to_delete);
        imm->UpdateCachedValuesFromMemTableListVersion();
        imm->ResetTrimHistoryNeeded();
      }
    }
  } else {
    for (size_t i = 0; i != cfds.size(); ++i) {
      auto* imm = (imm_lists == nullptr) ? cfds[i]->imm() : imm_lists->at(i);
      for (auto m : *mems_list[i]) {
        uint64_t mem_id = m->GetID();
        ROCKS_LOG_BUFFER(log_buffer,
                         "[%s] Level-0 commit table #%" PRIu64
                         ": memtable #%" PRIu64 " failed",
                         cfds[i]->GetName().c_str(), m->GetFileNumber(),
                         mem_id);
        m->SetFlushCompleted(false);
        m->SetFlushInProgress(false);
        m->GetEdits()->Clear();
        m->SetFileNumber(0);
        imm->num_flush_not_started_++;
      }
      imm->imm_flush_needed.store(true, std::memory_order_release);
    }
  }

  return s;
}
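
// Remove, oldest first, the immutable memtables whose NextLogNumber is no
// greater than log_number; stop at the first memtable that still depends on
// a newer WAL.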
void MemTableList::RemoveOldMemTables(uint64_t log_number,
                                      autovector<MemTable*>* to_delete) {
  assert(to_delete != nullptr);
  InstallNewVersion();
  auto& memlist = current_->memlist_;
  autovector<MemTable*> old_memtables;
  for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
    MemTable* mem = *it;
    if (mem->GetNextLogNumber() > log_number) {
      break;
    }
    old_memtables.push_back(mem);
  }

  for (auto it = old_memtables.begin(); it != old_memtables.end(); ++it) {
    MemTable* mem = *it;
    current_->Remove(mem, to_delete);
    --num_flush_not_started_;
    if (0 == num_flush_not_started_) {
      imm_flush_needed.store(false, std::memory_order_release);
    }
  }

  UpdateCachedValuesFromMemTableListVersion();
  ResetTrimHistoryNeeded();
}
}  // namespace ROCKSDB_NAMESPACE