// db/memtable_list.cc

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
#include "db/memtable_list.h"

#include <algorithm>
#include <cinttypes>
#include <limits>
#include <queue>
#include <string>

#include "db/db_impl/db_impl.h"
#include "db/memtable.h"
#include "db/range_tombstone_fragmenter.h"
#include "db/version_set.h"
#include "logging/log_buffer.h"
#include "logging/logging.h"
#include "monitoring/thread_status_util.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include "table/merging_iterator.h"
#include "test_util/sync_point.h"
#include "util/coding.h"

namespace ROCKSDB_NAMESPACE {

class InternalKeyComparator;
class Mutex;
class VersionSet;

void MemTableListVersion::AddMemTable(ReadOnlyMemTable* m) {
  if (!memlist_.empty()) {
    // ID can be equal for MemPurge
    assert(m->GetID() >= memlist_.front()->GetID());
  }
  memlist_.push_front(m);
  *parent_memtable_list_memory_usage_ += m->ApproximateMemoryUsage();
}
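
// Drops one reference on m. If that was the last reference, m is handed to
// the caller via to_delete and its size is subtracted from the parent
// list's memory accounting.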
void MemTableListVersion::UnrefMemTable(
    autovector<ReadOnlyMemTable*>* to_delete, ReadOnlyMemTable* m) {
  if (m->Unref()) {
    to_delete->push_back(m);
    assert(*parent_memtable_list_memory_usage_ >= m->ApproximateMemoryUsage());
    *parent_memtable_list_memory_usage_ -= m->ApproximateMemoryUsage();
  }
}

MemTableListVersion::MemTableListVersion(
    size_t* parent_memtable_list_memory_usage, const MemTableListVersion& old)
    : max_write_buffer_size_to_maintain_(
          old.max_write_buffer_size_to_maintain_),
      parent_memtable_list_memory_usage_(parent_memtable_list_memory_usage) {
  memlist_ = old.memlist_;
  for (auto& m : memlist_) {
    m->Ref();
  }
  memlist_history_ = old.memlist_history_;
  for (auto& m : memlist_history_) {
    m->Ref();
  }
}

MemTableListVersion::MemTableListVersion(
    size_t* parent_memtable_list_memory_usage,
    int64_t max_write_buffer_size_to_maintain)
    : max_write_buffer_size_to_maintain_(max_write_buffer_size_to_maintain),
      parent_memtable_list_memory_usage_(parent_memtable_list_memory_usage) {}

void MemTableListVersion::Ref() { ++refs_; }

// called by SuperVersion::Cleanup()
void MemTableListVersion::Unref(autovector<ReadOnlyMemTable*>* to_delete) {
  assert(refs_ >= 1);
  --refs_;
  if (refs_ == 0) {
    // Callers may pass nullptr for to_delete only when they are confident
    // that refs_ will not drop to zero here.
    assert(to_delete != nullptr);
    for (const auto& m : memlist_) {
      UnrefMemTable(to_delete, m);
    }
    for (const auto& m : memlist_history_) {
      UnrefMemTable(to_delete, m);
    }
    delete this;
  }
}

int MemTableList::NumNotFlushed() const {
  int size = current_->NumNotFlushed();
  assert(num_flush_not_started_ <= size);
  return size;
}

int MemTableList::NumFlushed() const { return current_->NumFlushed(); }

// Search all the memtables starting from the most recent one.
// Return the most recent value found, if any.
// Operands stores the list of merge operations to apply so far.
bool MemTableListVersion::Get(const LookupKey& key, std::string* value,
                              PinnableWideColumns* columns,
                              std::string* timestamp, Status* s,
                              MergeContext* merge_context,
                              SequenceNumber* max_covering_tombstone_seq,
                              SequenceNumber* seq, const ReadOptions& read_opts,
                              ReadCallback* callback, bool* is_blob_index) {
  return GetFromList(&memlist_, key, value, columns, timestamp, s,
                     merge_context, max_covering_tombstone_seq, seq, read_opts,
                     callback, is_blob_index);
}
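
// Batched point lookup over the unflushed memtables, newest first. Keys
// resolved by a memtable are removed from *range, so later (older)
// memtables only see the still-unresolved keys.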
void MemTableListVersion::MultiGet(const ReadOptions& read_options,
                                   MultiGetRange* range,
                                   ReadCallback* callback) {
  for (auto memtable : memlist_) {
    memtable->MultiGet(read_options, range, callback,
                       true /* immutable_memtable */);
    if (range->empty()) {
      return;
    }
  }
}
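
// Collects merge operands for key into merge_context without resolving them
// to a final value. Returns true once a memtable fully resolves the lookup.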
bool MemTableListVersion::GetMergeOperands(
    const LookupKey& key, Status* s, MergeContext* merge_context,
    SequenceNumber* max_covering_tombstone_seq, const ReadOptions& read_opts) {
  for (ReadOnlyMemTable* memtable : memlist_) {
    bool done = memtable->Get(
        key, /*value=*/nullptr, /*columns=*/nullptr, /*timestamp=*/nullptr, s,
        merge_context, max_covering_tombstone_seq, read_opts,
        true /* immutable_memtable */, nullptr, nullptr, false);
    if (done) {
      return true;
    }
  }
  return false;
}

bool MemTableListVersion::GetFromHistory(
    const LookupKey& key, std::string* value, PinnableWideColumns* columns,
    std::string* timestamp, Status* s, MergeContext* merge_context,
    SequenceNumber* max_covering_tombstone_seq, SequenceNumber* seq,
    const ReadOptions& read_opts, bool* is_blob_index) {
  return GetFromList(&memlist_history_, key, value, columns, timestamp, s,
                     merge_context, max_covering_tombstone_seq, seq, read_opts,
                     nullptr /*read_callback*/, is_blob_index);
}

bool MemTableListVersion::GetFromList(
    std::list<ReadOnlyMemTable*>* list, const LookupKey& key,
    std::string* value, PinnableWideColumns* columns, std::string* timestamp,
    Status* s, MergeContext* merge_context,
    SequenceNumber* max_covering_tombstone_seq, SequenceNumber* seq,
    const ReadOptions& read_opts, ReadCallback* callback, bool* is_blob_index) {
  *seq = kMaxSequenceNumber;
  for (auto& memtable : *list) {
    assert(memtable->IsFragmentedRangeTombstonesConstructed());
    SequenceNumber current_seq = kMaxSequenceNumber;
    bool done =
        memtable->Get(key, value, columns, timestamp, s, merge_context,
                      max_covering_tombstone_seq, &current_seq, read_opts,
                      true /* immutable_memtable */, callback, is_blob_index);
    if (*seq == kMaxSequenceNumber) {
      // Store the most recent sequence number of any operation on this key.
      // Since we only care about the most recent change, we only need to
      // return the first operation found when searching memtables in
      // reverse-chronological order.
      // current_seq would be equal to kMaxSequenceNumber if the value was to
      // be skipped. This allows seq to be assigned again when the next value
      // is read.
      *seq = current_seq;
    }
    if (done) {
      assert(*seq != kMaxSequenceNumber ||
             (!s->ok() && !s->IsMergeInProgress()));
      return true;
    }
    if (!s->ok() && !s->IsMergeInProgress() && !s->IsNotFound()) {
      return false;
    }
  }
  return false;
}

Status MemTableListVersion::AddRangeTombstoneIterators(
    const ReadOptions& read_opts, Arena* /*arena*/,
    RangeDelAggregator* range_del_agg) {
  assert(range_del_agg != nullptr);
  // Except for snapshot read, using kMaxSequenceNumber is OK because these
  // are immutable memtables.
  SequenceNumber read_seq = read_opts.snapshot != nullptr
                                ? read_opts.snapshot->GetSequenceNumber()
                                : kMaxSequenceNumber;
  for (auto& m : memlist_) {
    assert(m->IsFragmentedRangeTombstonesConstructed());
    std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
        m->NewRangeTombstoneIterator(read_opts, read_seq,
                                     true /* immutable_memtable */));
    range_del_agg->AddTombstones(std::move(range_del_iter));
  }
  return Status::OK();
}

void MemTableListVersion::AddIterators(
    const ReadOptions& options,
    UnownedPtr<const SeqnoToTimeMapping> seqno_to_time_mapping,
    const SliceTransform* prefix_extractor,
    std::vector<InternalIterator*>* iterator_list, Arena* arena) {
  for (auto& m : memlist_) {
    iterator_list->push_back(m->NewIterator(options, seqno_to_time_mapping,
                                            arena, prefix_extractor,
                                            /*for_flush=*/false));
  }
}

void MemTableListVersion::AddIterators(
    const ReadOptions& options,
    UnownedPtr<const SeqnoToTimeMapping> seqno_to_time_mapping,
    const SliceTransform* prefix_extractor,
    MergeIteratorBuilder* merge_iter_builder, bool add_range_tombstone_iter) {
  for (auto& m : memlist_) {
    auto mem_iter =
        m->NewIterator(options, seqno_to_time_mapping,
                       merge_iter_builder->GetArena(), prefix_extractor,
                       /*for_flush=*/false);
    if (!add_range_tombstone_iter || options.ignore_range_deletions) {
      merge_iter_builder->AddIterator(mem_iter);
    } else {
      // Except for snapshot read, using kMaxSequenceNumber is OK because these
      // are immutable memtables.
      SequenceNumber read_seq = options.snapshot != nullptr
                                    ? options.snapshot->GetSequenceNumber()
                                    : kMaxSequenceNumber;
      std::unique_ptr<TruncatedRangeDelIterator> mem_tombstone_iter;
      auto range_del_iter = m->NewRangeTombstoneIterator(
          options, read_seq, true /* immutable_memtable */);
      if (range_del_iter == nullptr || range_del_iter->empty()) {
        delete range_del_iter;
      } else {
        mem_tombstone_iter = std::make_unique<TruncatedRangeDelIterator>(
            std::unique_ptr<FragmentedRangeTombstoneIterator>(range_del_iter),
            &m->GetInternalKeyComparator(), nullptr /* smallest */,
            nullptr /* largest */);
      }
      merge_iter_builder->AddPointAndTombstoneIterator(
          mem_iter, std::move(mem_tombstone_iter));
    }
  }
}

uint64_t MemTableListVersion::GetTotalNumEntries() const {
  uint64_t total_num = 0;
  for (auto& m : memlist_) {
    total_num += m->NumEntries();
  }
  return total_num;
}

ReadOnlyMemTable::MemTableStats MemTableListVersion::ApproximateStats(
    const Slice& start_ikey, const Slice& end_ikey) const {
  ReadOnlyMemTable::MemTableStats total_stats = {0, 0};
  for (auto& m : memlist_) {
    auto mStats = m->ApproximateStats(start_ikey, end_ikey);
    total_stats.size += mStats.size;
    total_stats.count += mStats.count;
  }
  return total_stats;
}

uint64_t MemTableListVersion::GetTotalNumDeletes() const {
  uint64_t total_num = 0;
  for (auto& m : memlist_) {
    total_num += m->NumDeletion();
  }
  return total_num;
}

SequenceNumber MemTableListVersion::GetEarliestSequenceNumber(
    bool include_history) const {
  if (include_history && !memlist_history_.empty()) {
    return memlist_history_.back()->GetEarliestSequenceNumber();
  } else if (!memlist_.empty()) {
    return memlist_.back()->GetEarliestSequenceNumber();
  } else {
    return kMaxSequenceNumber;
  }
}

SequenceNumber MemTableListVersion::GetFirstSequenceNumber() const {
  SequenceNumber min_first_seqno = kMaxSequenceNumber;
  // With mempurge, the first memtable in the list might not be the oldest one.
  for (const auto& m : memlist_) {
    min_first_seqno = std::min(m->GetFirstSequenceNumber(), min_first_seqno);
  }
  return min_first_seqno;
}

// caller is responsible for referencing m
void MemTableListVersion::Add(ReadOnlyMemTable* m,
                              autovector<ReadOnlyMemTable*>* to_delete) {
  assert(refs_ == 1);  // only when refs_ == 1 is MemTableListVersion mutable
  AddMemTable(m);
  // m->MemoryAllocatedBytes() is added in MemoryAllocatedBytesExcludingLast
  TrimHistory(to_delete, 0);
}

// Removes m from list of memtables not flushed. Caller should NOT Unref m.
void MemTableListVersion::Remove(ReadOnlyMemTable* m,
                                 autovector<ReadOnlyMemTable*>* to_delete) {
  assert(refs_ == 1);  // only when refs_ == 1 is MemTableListVersion mutable
  memlist_.remove(m);
  m->MarkFlushed();
  if (max_write_buffer_size_to_maintain_ > 0) {
    memlist_history_.push_front(m);
    // Unable to get size of mutable memtable at this point, pass 0 to
    // TrimHistory as a best effort.
    TrimHistory(to_delete, 0);
  } else {
    UnrefMemTable(to_delete, m);
  }
}

// return the total memory usage assuming the oldest flushed memtable is
// dropped
size_t MemTableListVersion::MemoryAllocatedBytesExcludingLast() const {
  size_t total_memtable_size = 0;
  for (auto& memtable : memlist_) {
    total_memtable_size += memtable->MemoryAllocatedBytes();
  }
  for (auto& memtable : memlist_history_) {
    total_memtable_size += memtable->MemoryAllocatedBytes();
  }
  if (!memlist_history_.empty()) {
    total_memtable_size -= memlist_history_.back()->MemoryAllocatedBytes();
  }
  return total_memtable_size;
}
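
// `usage` is the caller's estimate of the mutable memtable's memory (0 when
// unknown). History should be trimmed once the retained memory, excluding
// the oldest history entry but including `usage`, reaches
// max_write_buffer_size_to_maintain_.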
bool MemTableListVersion::MemtableLimitExceeded(size_t usage) {
  if (max_write_buffer_size_to_maintain_ > 0) {
    // calculate the total memory usage after dropping the oldest flushed
    // memtable, compare with max_write_buffer_size_to_maintain_ to decide
    // whether to trim history
    return MemoryAllocatedBytesExcludingLast() + usage >=
           static_cast<size_t>(max_write_buffer_size_to_maintain_);
  } else {
    return false;
  }
}

bool MemTableListVersion::HistoryShouldBeTrimmed(size_t usage) {
  return MemtableLimitExceeded(usage) && !memlist_history_.empty();
}

// Make sure we don't use up too much space in history
bool MemTableListVersion::TrimHistory(autovector<ReadOnlyMemTable*>* to_delete,
                                      size_t usage) {
  bool ret = false;
  while (HistoryShouldBeTrimmed(usage)) {
    ReadOnlyMemTable* x = memlist_history_.back();
    memlist_history_.pop_back();
    UnrefMemTable(to_delete, x);
    ret = true;
  }
  return ret;
}
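
// Returns the newest user-defined timestamp across the unflushed memtables:
// scan newest-first and return the first non-empty timestamp, skipping
// memtables that are empty.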
const Slice& MemTableListVersion::GetNewestUDT() const {
  static Slice kEmptySlice;
  for (auto it = memlist_.begin(); it != memlist_.end(); ++it) {
    ReadOnlyMemTable* m = *it;
    Slice timestamp = m->GetNewestUDT();
    assert(!timestamp.empty() || m->IsEmpty());
    if (!timestamp.empty()) {
      return m->GetNewestUDT();
    }
  }
  return kEmptySlice;
}

// Returns true if there is at least one memtable on which flush has
// not yet started.
bool MemTableList::IsFlushPending() const {
  if ((flush_requested_ && num_flush_not_started_ > 0) ||
      (num_flush_not_started_ >= min_write_buffer_number_to_merge_)) {
    assert(imm_flush_needed.load(std::memory_order_relaxed));
    return true;
  }
  return false;
}

bool MemTableList::IsFlushPendingOrRunning() const {
  if (current_->memlist_.size() - num_flush_not_started_ > 0) {
    // Flush is already running on at least one memtable
    return true;
  }
  return IsFlushPending();
}

// Returns the memtables that need to be flushed.
void MemTableList::PickMemtablesToFlush(uint64_t max_memtable_id,
                                        autovector<ReadOnlyMemTable*>* ret,
                                        uint64_t* max_next_log_number) {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_PICK_MEMTABLES_TO_FLUSH);
  const auto& memlist = current_->memlist_;
  bool atomic_flush = false;
  // Note: every time MemTableList::Add(mem) is called, it adds the new mem
  // at the FRONT of the memlist (memlist.push_front(mem)). Therefore, by
  // iterating through the memlist starting at the end, the vector<MemTable*>
  // ret is filled with memtables already sorted in increasing MemTable ID.
  // However, when the mempurge feature is activated, new memtables with older
  // IDs will be added to the memlist.
  auto it = memlist.rbegin();
  for (; it != memlist.rend(); ++it) {
    ReadOnlyMemTable* m = *it;
    if (!atomic_flush && m->atomic_flush_seqno_ != kMaxSequenceNumber) {
      atomic_flush = true;
    }
    if (m->GetID() > max_memtable_id) {
      break;
    }
    if (!m->flush_in_progress_) {
      assert(!m->flush_completed_);
      num_flush_not_started_--;
      if (num_flush_not_started_ == 0) {
        imm_flush_needed.store(false, std::memory_order_release);
      }
      m->flush_in_progress_ = true;  // flushing will start very soon
      if (max_next_log_number) {
        *max_next_log_number =
            std::max(m->GetNextLogNumber(), *max_next_log_number);
      }
      ret->push_back(m);
    } else if (!ret->empty()) {
      // This `break` is necessary to prevent picking non-consecutive memtables
      // in case `memlist` has one or more entries with
      // `flush_in_progress_ == true` sandwiched between entries with
      // `flush_in_progress_ == false`. This could happen after parallel flushes
      // are picked and the one flushing older memtables is rolled back.
      break;
    }
  }
  if (!ret->empty() && it != memlist.rend()) {
    // Checks that the first memtable not picked for flush is not an ingested
    // WBWI memtable. An ingested memtable should be flushed together with the
    // memtable before it, since they map to the same WAL and have the same
    // NextLogNumber().
    assert(strcmp((*it)->Name(), "WBWIMemTable") != 0);
  }
  if (!atomic_flush || num_flush_not_started_ == 0) {
    flush_requested_ = false;  // start-flush request is complete
  }
}
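
// Undoes a failed flush: clears the flush state on `mems` so they can be
// picked again. When rollback_succeeding_memtables is true, any newer
// memtables whose (parallel) flushes already completed are rolled back too,
// since flush results must be installed in manifest order, oldest first.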
void MemTableList::RollbackMemtableFlush(
    const autovector<ReadOnlyMemTable*>& mems,
    bool rollback_succeeding_memtables) {
  TEST_SYNC_POINT("RollbackMemtableFlush");
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_MEMTABLE_ROLLBACK);
#ifndef NDEBUG
  for (ReadOnlyMemTable* m : mems) {
    assert(m->flush_in_progress_);
    assert(m->file_number_ == 0);
  }
#endif
  if (rollback_succeeding_memtables && !mems.empty()) {
    std::list<ReadOnlyMemTable*>& memlist = current_->memlist_;
    auto it = memlist.rbegin();
    for (; it != memlist.rend() && *it != mems[0]; ++it) {
    }
    // mems should be in memlist
    assert(*it == mems[0]);
    if (*it == mems[0]) {
      ++it;
    }
    while (it != memlist.rend()) {
      ReadOnlyMemTable* m = *it;
      // Only roll back completed flushes, not in-progress ones: an
      // in-progress flush may still be writing SSTs.
      if (m->flush_completed_) {
        m->flush_in_progress_ = false;
        m->flush_completed_ = false;
        m->edit_.Clear();
        m->file_number_ = 0;
        num_flush_not_started_++;
        ++it;
      } else {
        break;
      }
    }
  }
  for (ReadOnlyMemTable* m : mems) {
    if (m->flush_in_progress_) {
      assert(m->file_number_ == 0);
      m->file_number_ = 0;
      m->flush_in_progress_ = false;
      m->flush_completed_ = false;
      m->edit_.Clear();
      num_flush_not_started_++;
    }
  }
  if (!mems.empty()) {
    imm_flush_needed.store(true, std::memory_order_release);
  }
}

// Try to record a successful flush in the manifest file. It might just
// return Status::OK letting a concurrent flush do the actual recording.
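// Only one thread commits at a time (commit_in_progress_); it batches up all
// completed flushes it finds at the oldest end of the list and commits them
// to the manifest in creation order.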
Status MemTableList::TryInstallMemtableFlushResults(
    ColumnFamilyData* cfd, const autovector<ReadOnlyMemTable*>& mems,
    LogsWithPrepTracker* prep_tracker, VersionSet* vset, InstrumentedMutex* mu,
    uint64_t file_number, autovector<ReadOnlyMemTable*>* to_delete,
    FSDirectory* db_directory, LogBuffer* log_buffer,
    std::list<std::unique_ptr<FlushJobInfo>>* committed_flush_jobs_info,
    bool write_edits) {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_MEMTABLE_INSTALL_FLUSH_RESULTS);
  mu->AssertHeld();
  const ReadOptions read_options(Env::IOActivity::kFlush);
  const WriteOptions write_options(Env::IOActivity::kFlush);
  // Flush was successful.
  // Record the status on the memtable object. Either this call or a call by a
  // concurrent flush thread will read the status and write it to manifest.
  for (size_t i = 0; i < mems.size(); ++i) {
    // All the edits are associated with the first memtable of this batch.
    assert(i == 0 || mems[i]->GetEdits()->NumEntries() == 0);
    mems[i]->flush_completed_ = true;
    mems[i]->file_number_ = file_number;
  }
  // if some other thread is already committing, then return
  Status s;
  if (commit_in_progress_) {
    TEST_SYNC_POINT("MemTableList::TryInstallMemtableFlushResults:InProgress");
    return s;
  }
  // Only a single thread can be executing this piece of code
  commit_in_progress_ = true;
  // Retry until all completed flushes are committed. New flushes can finish
  // while the current thread is writing manifest where mutex is released.
  while (s.ok()) {
    auto& memlist = current_->memlist_;
    // The back is the oldest; if flush_completed_ is not set on it, it means
    // that we were assigned a more recent memtable. The memtables' flushes
    // must be recorded in manifest in order. The concurrent flush thread that
    // is assigned to flush the oldest memtable will later wake up and do all
    // the pending writes to manifest, in order.
    if (memlist.empty() || !memlist.back()->flush_completed_) {
      break;
    }
    // scan all memtables from the earliest, and commit those
    // (in that order) that have finished flushing. Memtables
    // are always committed in the order that they were created.
    uint64_t batch_file_number = 0;
    autovector<VersionEdit*> edit_list;
    autovector<ReadOnlyMemTable*> memtables_to_flush;
    // enumerate from the last (earliest) element to see how many batches have
    // finished
    for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
      ReadOnlyMemTable* m = *it;
      if (!m->flush_completed_) {
        break;
      }
      if (it == memlist.rbegin() || batch_file_number != m->file_number_) {
        // Oldest memtable in a new batch.
        batch_file_number = m->file_number_;
        if (m->edit_.GetBlobFileAdditions().empty()) {
          ROCKS_LOG_BUFFER(log_buffer,
                           "[%s] Level-0 commit flush result of table #%" PRIu64
                           " started",
                           cfd->GetName().c_str(), m->file_number_);
        } else {
          ROCKS_LOG_BUFFER(log_buffer,
                           "[%s] Level-0 commit flush result of table #%" PRIu64
                           " (+%zu blob files) started",
                           cfd->GetName().c_str(), m->file_number_,
                           m->edit_.GetBlobFileAdditions().size());
        }
        edit_list.push_back(&m->edit_);
        std::unique_ptr<FlushJobInfo> info = m->ReleaseFlushJobInfo();
        if (info != nullptr) {
          committed_flush_jobs_info->push_back(std::move(info));
        }
      }
      memtables_to_flush.push_back(m);
    }
    size_t num_mem_to_flush = memtables_to_flush.size();
    // TODO(myabandeh): Not sure how num_mem_to_flush could be 0 here.
    if (num_mem_to_flush > 0) {
      VersionEdit edit;
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
      if (memtables_to_flush.size() == memlist.size()) {
        // TODO(yuzhangyu): remove this testing code once the
        // `GetEditForDroppingCurrentVersion` API is used by the atomic data
        // replacement. This function can get the same edits for wal related
        // fields, and some duplicated fields as contained already in edit_list
        // for column family's recovery.
        edit = GetEditForDroppingCurrentVersion(cfd, vset, prep_tracker);
      } else {
        edit = GetDBRecoveryEditForObsoletingMemTables(
            vset, *cfd, edit_list, memtables_to_flush, prep_tracker);
      }
#else
      edit = GetDBRecoveryEditForObsoletingMemTables(
          vset, *cfd, edit_list, memtables_to_flush, prep_tracker);
#endif  // ROCKSDB_ASSERT_STATUS_CHECKED
      TEST_SYNC_POINT_CALLBACK(
          "MemTableList::TryInstallMemtableFlushResults:"
          "AfterComputeMinWalToKeep",
          nullptr);
      edit_list.push_back(&edit);
      const auto manifest_write_cb = [this, cfd, num_mem_to_flush, log_buffer,
                                      to_delete, mu](const Status& status) {
        RemoveMemTablesOrRestoreFlags(status, cfd, num_mem_to_flush, log_buffer,
                                      to_delete, mu);
      };
      if (write_edits) {
        // this can release and reacquire the mutex.
        s = vset->LogAndApply(cfd, read_options, write_options, edit_list, mu,
                              db_directory, /*new_descriptor_log=*/false,
                              /*column_family_options=*/nullptr,
                              manifest_write_cb);
      } else {
        // If write_edits is false (e.g., a successful mempurge), then remove
        // the old memtables, wake up the manifest write queue threads, and
        // don't commit anything to the manifest file.
        RemoveMemTablesOrRestoreFlags(s, cfd, num_mem_to_flush, log_buffer,
                                      to_delete, mu);
        // Note: cfd->SetLogNumber is only called when a VersionEdit is
        // written to MANIFEST. When mempurge is successful, we skip this
        // step, therefore cfd->GetLogNumber always points to the earliest
        // log with unflushed data.
        // Notify new head of manifest write queue.
        // wake up all the waiting writers
        // TODO(bjlemaire): explain the full reason WakeUpWaitingManifestWriters
        // is needed, or investigate more.
        vset->WakeUpWaitingManifestWriters();
      }
    }
  }
  commit_in_progress_ = false;
  return s;
}

// New memtables are inserted at the front of the list.
void MemTableList::Add(ReadOnlyMemTable* m,
                       autovector<ReadOnlyMemTable*>* to_delete) {
  assert(static_cast<int>(current_->memlist_.size()) >= num_flush_not_started_);
  InstallNewVersion();
  // this method is used to move mutable memtable into an immutable list.
  // since mutable memtable is already refcounted by the DBImpl,
  // and when moving to the immutable list we don't unref it,
  // we don't have to ref the memtable here. we just take over the
  // reference from the DBImpl.
  current_->Add(m, to_delete);
  m->MarkImmutable();
  num_flush_not_started_++;
  if (num_flush_not_started_ == 1) {
    imm_flush_needed.store(true, std::memory_order_release);
  }
  UpdateCachedValuesFromMemTableListVersion();
  ResetTrimHistoryNeeded();
}

bool MemTableList::TrimHistory(autovector<ReadOnlyMemTable*>* to_delete,
                               size_t usage) {
  // Check if history trim is needed first, so that we can avoid installing a
  // new MemTableListVersion without installing a SuperVersion (installed based
  // on return value of this function).
  if (!current_->HistoryShouldBeTrimmed(usage)) {
    ResetTrimHistoryNeeded();
    return false;
  }
  InstallNewVersion();
  bool ret = current_->TrimHistory(to_delete, usage);
  assert(ret);
  UpdateCachedValuesFromMemTableListVersion();
  ResetTrimHistoryNeeded();
  return ret;
}

// Returns an estimate of the number of bytes of data in use.
size_t MemTableList::ApproximateUnflushedMemTablesMemoryUsage() {
  size_t total_size = 0;
  for (auto& memtable : current_->memlist_) {
    total_size += memtable->ApproximateMemoryUsage();
  }
  return total_size;
}

size_t MemTableList::ApproximateMemoryUsage() { return current_memory_usage_; }

size_t MemTableList::MemoryAllocatedBytesExcludingLast() const {
  const size_t usage = current_memory_allocted_bytes_excluding_last_.load(
      std::memory_order_relaxed);
  return usage;
}

bool MemTableList::HasHistory() const {
  const bool has_history = current_has_history_.load(std::memory_order_relaxed);
  return has_history;
}

void MemTableList::UpdateCachedValuesFromMemTableListVersion() {
  const size_t total_memtable_size =
      current_->MemoryAllocatedBytesExcludingLast();
  current_memory_allocted_bytes_excluding_last_.store(
      total_memtable_size, std::memory_order_relaxed);
  const bool has_history = current_->HasHistory();
  current_has_history_.store(has_history, std::memory_order_relaxed);
}

uint64_t MemTableList::ApproximateOldestKeyTime() const {
  if (!current_->memlist_.empty()) {
    return current_->memlist_.back()->ApproximateOldestKeyTime();
  }
  return std::numeric_limits<uint64_t>::max();
}
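
// Copy-on-write on the MemTableListVersion: if another reader still holds a
// reference to the current version, clone it so that reader keeps a
// consistent snapshot; otherwise the current version can be mutated in place.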
void MemTableList::InstallNewVersion() {
  if (current_->refs_ == 1) {
    // we're the only one using the version, just keep using it
  } else {
    // somebody else holds the current version, so we need to create a new one
    MemTableListVersion* version = current_;
    current_ = new MemTableListVersion(&current_memory_usage_, *version);
    current_->SetID(++last_memtable_list_version_id_);
    current_->Ref();
    version->Unref();
  }
}
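
// Runs after a manifest write attempt (or directly, for mempurge): on
// success, retire the `num_mem_to_flush` oldest memtables; on failure,
// restore their flags so the same flush can be retried.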
void MemTableList::RemoveMemTablesOrRestoreFlags(
    const Status& s, ColumnFamilyData* cfd, size_t num_mem_to_flush,
    LogBuffer* log_buffer, autovector<ReadOnlyMemTable*>* to_delete,
    InstrumentedMutex* mu) {
  assert(mu);
  mu->AssertHeld();
  assert(to_delete);
  // we will be changing the version in the next code path,
  // so we'd better create a new one, since versions are immutable
  InstallNewVersion();
  // All the later memtables that have the same filenum
  // are part of the same batch. They can be committed now.
  uint64_t mem_id = 1;  // how many memtables have been flushed.
  // commit new state only if the column family is NOT dropped.
  // The reason is as follows (refer to
  // ColumnFamilyTest.FlushAndDropRaceCondition).
  // If the column family is dropped, then according to LogAndApply, its
  // corresponding flush operation is NOT written to the MANIFEST. This
  // means the DB is not aware of the L0 files generated from the flush.
  // By committing the new state, we remove the memtable from the memtable
  // list. Creating an iterator on this column family will not be able to
  // read full data since the memtable is removed, and the DB is not aware
  // of the L0 files, causing MergingIterator unable to build child
  // iterators. RocksDB contract requires that the iterator can be created
  // on a dropped column family, and we must be able to
  // read full data as long as column family handle is not deleted, even if
  // the column family is dropped.
  if (s.ok() && !cfd->IsDropped()) {  // commit new state
    while (num_mem_to_flush-- > 0) {
      ReadOnlyMemTable* m = current_->memlist_.back();
      // TODO: The logging can be redundant when we flush multiple memtables
      // into one SST file. We should only check the edit_ of the oldest
      // memtable in the group in that case.
      if (m->edit_.GetBlobFileAdditions().empty()) {
        ROCKS_LOG_BUFFER(log_buffer,
                         "[%s] Level-0 commit flush result of table #%" PRIu64
                         ": memtable #%" PRIu64 " done",
                         cfd->GetName().c_str(), m->file_number_, mem_id);
      } else {
        ROCKS_LOG_BUFFER(log_buffer,
                         "[%s] Level-0 commit flush result of table #%" PRIu64
                         " (+%zu blob files)"
                         ": memtable #%" PRIu64 " done",
                         cfd->GetName().c_str(), m->file_number_,
                         m->edit_.GetBlobFileAdditions().size(), mem_id);
      }
      assert(m->file_number_ > 0);
      current_->Remove(m, to_delete);
      UpdateCachedValuesFromMemTableListVersion();
      ResetTrimHistoryNeeded();
      ++mem_id;
    }
  } else {
    for (auto it = current_->memlist_.rbegin(); num_mem_to_flush-- > 0; ++it) {
      ReadOnlyMemTable* m = *it;
      // commit failed. setup state so that we can flush again.
      if (m->edit_.GetBlobFileAdditions().empty()) {
        ROCKS_LOG_BUFFER(log_buffer,
                         "Level-0 commit table #%" PRIu64 ": memtable #%" PRIu64
                         " failed",
                         m->file_number_, mem_id);
      } else {
        ROCKS_LOG_BUFFER(log_buffer,
                         "Level-0 commit table #%" PRIu64
                         " (+%zu blob files)"
                         ": memtable #%" PRIu64 " failed",
                         m->file_number_,
                         m->edit_.GetBlobFileAdditions().size(), mem_id);
      }
      m->flush_completed_ = false;
      m->flush_in_progress_ = false;
      m->edit_.Clear();
      num_flush_not_started_++;
      m->file_number_ = 0;
      imm_flush_needed.store(true, std::memory_order_release);
      ++mem_id;
    }
  }
}
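
// Scans the memtables that will remain after the flush for the minimum WAL
// number still containing an unflushed 2PC prepare section; returns 0 if
// there is none.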
uint64_t MemTableList::PrecomputeMinLogContainingPrepSection(
    const std::unordered_set<ReadOnlyMemTable*>* memtables_to_flush) const {
  uint64_t min_log = 0;
  for (auto& m : current_->memlist_) {
    if (memtables_to_flush && memtables_to_flush->count(m)) {
      continue;
    }
    auto log = m->GetMinLogContainingPrepSection();
    if (log > 0 && (min_log == 0 || log < min_log)) {
      min_log = log;
    }
  }
  return min_log;
}

// Commit a successful atomic flush in the manifest file.
Status InstallMemtableAtomicFlushResults(
    const autovector<MemTableList*>* imm_lists,
    const autovector<ColumnFamilyData*>& cfds,
    const autovector<const autovector<ReadOnlyMemTable*>*>& mems_list,
    VersionSet* vset, LogsWithPrepTracker* prep_tracker, InstrumentedMutex* mu,
    const autovector<FileMetaData*>& file_metas,
    const autovector<std::list<std::unique_ptr<FlushJobInfo>>*>&
        committed_flush_jobs_info,
    autovector<ReadOnlyMemTable*>* to_delete, FSDirectory* db_directory,
    LogBuffer* log_buffer) {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_MEMTABLE_INSTALL_FLUSH_RESULTS);
  mu->AssertHeld();
  const ReadOptions read_options(Env::IOActivity::kFlush);
  const WriteOptions write_options(Env::IOActivity::kFlush);
  size_t num = mems_list.size();
  assert(cfds.size() == num);
  if (imm_lists != nullptr) {
    assert(imm_lists->size() == num);
  }
  if (num == 0) {
    return Status::OK();
  }
  for (size_t k = 0; k != num; ++k) {
#ifndef NDEBUG
    const auto* imm =
        (imm_lists == nullptr) ? cfds[k]->imm() : imm_lists->at(k);
    if (!mems_list[k]->empty()) {
      assert((*mems_list[k])[0]->GetID() == imm->GetEarliestMemTableID());
    }
#endif
    assert(nullptr != file_metas[k]);
    for (size_t i = 0; i != mems_list[k]->size(); ++i) {
      assert(i == 0 || (*mems_list[k])[i]->GetEdits()->NumEntries() == 0);
      (*mems_list[k])[i]->SetFlushCompleted(true);
      (*mems_list[k])[i]->SetFileNumber(file_metas[k]->fd.GetNumber());
    }
    if (committed_flush_jobs_info[k]) {
      assert(!mems_list[k]->empty());
      assert((*mems_list[k])[0]);
      std::unique_ptr<FlushJobInfo> flush_job_info =
          (*mems_list[k])[0]->ReleaseFlushJobInfo();
      committed_flush_jobs_info[k]->push_back(std::move(flush_job_info));
    }
  }
  Status s;
  autovector<autovector<VersionEdit*>> edit_lists;
  uint32_t num_entries = 0;
  for (const auto mems : mems_list) {
    assert(mems != nullptr);
    autovector<VersionEdit*> edits;
    assert(!mems->empty());
    edits.emplace_back((*mems)[0]->GetEdits());
    ++num_entries;
    edit_lists.emplace_back(edits);
  }
  WalNumber min_wal_number_to_keep = 0;
  if (vset->db_options()->allow_2pc) {
    min_wal_number_to_keep = PrecomputeMinLogNumberToKeep2PC(
        vset, cfds, edit_lists, mems_list, prep_tracker);
  } else {
    min_wal_number_to_keep =
        PrecomputeMinLogNumberToKeepNon2PC(vset, cfds, edit_lists);
  }
  VersionEdit wal_deletion;
  wal_deletion.SetMinLogNumberToKeep(min_wal_number_to_keep);
  if (vset->db_options()->track_and_verify_wals_in_manifest &&
      min_wal_number_to_keep > vset->GetWalSet().GetMinWalNumberToKeep()) {
    wal_deletion.DeleteWalsBefore(min_wal_number_to_keep);
  }
  edit_lists.back().push_back(&wal_deletion);
  ++num_entries;
  // Mark the version edits as an atomic group if the number of version edits
  // exceeds 1.
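  // MarkAtomicGroup(n) stores, in each edit, how many edits of the group
  // remain after it, so that recovery can apply the whole group atomically
  // only once the edit with a remaining count of 0 has been read.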
  if (cfds.size() > 1) {
    for (size_t i = 0; i < edit_lists.size(); i++) {
      assert((edit_lists[i].size() == 1) ||
             ((edit_lists[i].size() == 2) && (i == edit_lists.size() - 1)));
      for (auto& e : edit_lists[i]) {
        e->MarkAtomicGroup(--num_entries);
      }
    }
    assert(0 == num_entries);
  }
  // this can release and reacquire the mutex.
  s = vset->LogAndApply(cfds, read_options, write_options, edit_lists, mu,
                        db_directory);
  for (size_t k = 0; k != cfds.size(); ++k) {
    auto* imm = (imm_lists == nullptr) ? cfds[k]->imm() : imm_lists->at(k);
    imm->InstallNewVersion();
  }
  if (s.ok() || s.IsColumnFamilyDropped()) {
    for (size_t i = 0; i != cfds.size(); ++i) {
      if (cfds[i]->IsDropped()) {
        continue;
      }
      auto* imm = (imm_lists == nullptr) ? cfds[i]->imm() : imm_lists->at(i);
      for (auto m : *mems_list[i]) {
        assert(m->GetFileNumber() > 0);
        uint64_t mem_id = m->GetID();
        const VersionEdit* const edit = m->GetEdits();
        assert(edit);
        if (edit->GetBlobFileAdditions().empty()) {
          ROCKS_LOG_BUFFER(log_buffer,
                           "[%s] Level-0 commit table #%" PRIu64
                           ": memtable #%" PRIu64 " done",
                           cfds[i]->GetName().c_str(), m->GetFileNumber(),
                           mem_id);
        } else {
          ROCKS_LOG_BUFFER(log_buffer,
                           "[%s] Level-0 commit table #%" PRIu64
                           " (+%zu blob files)"
                           ": memtable #%" PRIu64 " done",
                           cfds[i]->GetName().c_str(), m->GetFileNumber(),
                           edit->GetBlobFileAdditions().size(), mem_id);
        }
        imm->current_->Remove(m, to_delete);
        imm->UpdateCachedValuesFromMemTableListVersion();
        imm->ResetTrimHistoryNeeded();
      }
    }
  } else {
    for (size_t i = 0; i != cfds.size(); ++i) {
      auto* imm = (imm_lists == nullptr) ? cfds[i]->imm() : imm_lists->at(i);
      for (auto m : *mems_list[i]) {
        uint64_t mem_id = m->GetID();
        const VersionEdit* const edit = m->GetEdits();
        assert(edit);
        if (edit->GetBlobFileAdditions().empty()) {
          ROCKS_LOG_BUFFER(log_buffer,
                           "[%s] Level-0 commit table #%" PRIu64
                           ": memtable #%" PRIu64 " failed",
                           cfds[i]->GetName().c_str(), m->GetFileNumber(),
                           mem_id);
        } else {
          ROCKS_LOG_BUFFER(log_buffer,
                           "[%s] Level-0 commit table #%" PRIu64
                           " (+%zu blob files)"
                           ": memtable #%" PRIu64 " failed",
                           cfds[i]->GetName().c_str(), m->GetFileNumber(),
                           edit->GetBlobFileAdditions().size(), mem_id);
        }
        m->SetFlushCompleted(false);
        m->SetFlushInProgress(false);
        m->GetEdits()->Clear();
        m->SetFileNumber(0);
        imm->num_flush_not_started_++;
      }
      imm->imm_flush_needed.store(true, std::memory_order_release);
    }
  }
  return s;
}
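
// Drops, oldest first, every immutable memtable whose contents are entirely
// contained in WALs up to and including `log_number`
// (mem->GetNextLogNumber() <= log_number).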
void MemTableList::RemoveOldMemTables(
    uint64_t log_number, autovector<ReadOnlyMemTable*>* to_delete) {
  assert(to_delete != nullptr);
  InstallNewVersion();
  auto& memlist = current_->memlist_;
  autovector<ReadOnlyMemTable*> old_memtables;
  for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
    ReadOnlyMemTable* mem = *it;
    if (mem->GetNextLogNumber() > log_number) {
      break;
    }
    old_memtables.push_back(mem);
  }
  for (auto it = old_memtables.begin(); it != old_memtables.end(); ++it) {
    ReadOnlyMemTable* mem = *it;
    current_->Remove(mem, to_delete);
    --num_flush_not_started_;
    if (0 == num_flush_not_started_) {
      imm_flush_needed.store(false, std::memory_order_release);
    }
  }
  UpdateCachedValuesFromMemTableListVersion();
  ResetTrimHistoryNeeded();
}
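
// Builds a VersionEdit that drops the entire current MemTableListVersion:
// all unflushed memtables become obsolete, the column family's log number
// advances to the max NextLogNumber() among them, and WAL-related recovery
// fields are filled in via GetDBRecoveryEditForObsoletingMemTables.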
VersionEdit MemTableList::GetEditForDroppingCurrentVersion(
    const ColumnFamilyData* cfd, VersionSet* vset,
    LogsWithPrepTracker* prep_tracker) const {
  assert(cfd);
  auto& memlist = current_->memlist_;
  if (memlist.empty()) {
    return VersionEdit();
  }
  uint64_t max_next_log_number = 0;
  autovector<VersionEdit*> edit_list;
  autovector<ReadOnlyMemTable*> memtables_to_drop;
  for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
    ReadOnlyMemTable* m = *it;
    memtables_to_drop.push_back(m);
    max_next_log_number = std::max(m->GetNextLogNumber(), max_next_log_number);
  }
  // Account for the obsoleted memtables' impact on the WALs needed for DB
  // recovery (the min log number to keep, and a delta of WAL files to
  // delete).
  VersionEdit edit_with_log_number;
  edit_with_log_number.SetPrevLogNumber(0);
  edit_with_log_number.SetLogNumber(max_next_log_number);
  edit_list.push_back(&edit_with_log_number);
  VersionEdit edit = GetDBRecoveryEditForObsoletingMemTables(
      vset, *cfd, edit_list, memtables_to_drop, prep_tracker);
  // Set fields related to the column family's recovery.
  edit.SetColumnFamily(cfd->GetID());
  edit.SetPrevLogNumber(0);
  edit.SetLogNumber(max_next_log_number);
  return edit;
}

}  // namespace ROCKSDB_NAMESPACE