db_impl_write.cc

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/db_impl/db_impl.h"

#include <cinttypes>

#include "db/error_handler.h"
#include "db/event_helpers.h"
#include "monitoring/perf_context_imp.h"
#include "options/options_helper.h"
#include "test_util/sync_point.h"

namespace ROCKSDB_NAMESPACE {
// Convenience methods
Status DBImpl::Put(const WriteOptions& o, ColumnFamilyHandle* column_family,
                   const Slice& key, const Slice& val) {
  return DB::Put(o, column_family, key, val);
}

Status DBImpl::Merge(const WriteOptions& o, ColumnFamilyHandle* column_family,
                     const Slice& key, const Slice& val) {
  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  if (!cfh->cfd()->ioptions()->merge_operator) {
    return Status::NotSupported("Provide a merge_operator when opening DB");
  } else {
    return DB::Merge(o, column_family, key, val);
  }
}

Status DBImpl::Delete(const WriteOptions& write_options,
                      ColumnFamilyHandle* column_family, const Slice& key) {
  return DB::Delete(write_options, column_family, key);
}

Status DBImpl::SingleDelete(const WriteOptions& write_options,
                            ColumnFamilyHandle* column_family,
                            const Slice& key) {
  return DB::SingleDelete(write_options, column_family, key);
}

void DBImpl::SetRecoverableStatePreReleaseCallback(
    PreReleaseCallback* callback) {
  recoverable_state_pre_release_callback_.reset(callback);
}

Status DBImpl::Write(const WriteOptions& write_options, WriteBatch* my_batch) {
  return WriteImpl(write_options, my_batch, nullptr, nullptr);
}

#ifndef ROCKSDB_LITE
Status DBImpl::WriteWithCallback(const WriteOptions& write_options,
                                 WriteBatch* my_batch,
                                 WriteCallback* callback) {
  return WriteImpl(write_options, my_batch, callback, nullptr);
}
#endif  // ROCKSDB_LITE

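// Illustrative usage (not part of this file's logic): callers typically build
// a WriteBatch and hand it to DB::Write(), which routes into DBImpl::WriteImpl
// below. `cf_handle` stands for whatever ColumnFamilyHandle the caller owns.
//
//   WriteBatch batch;
//   batch.Put(cf_handle, "key", "value");
//   batch.Delete(cf_handle, "stale_key");
//   Status s = db->Write(WriteOptions(), &batch);
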
// The main write queue. This is the only write queue that updates
// LastSequence. When using one write queue, the same sequence also indicates
// the last published sequence.
Status DBImpl::WriteImpl(const WriteOptions& write_options,
                         WriteBatch* my_batch, WriteCallback* callback,
                         uint64_t* log_used, uint64_t log_ref,
                         bool disable_memtable, uint64_t* seq_used,
                         size_t batch_cnt,
                         PreReleaseCallback* pre_release_callback) {
  assert(!seq_per_batch_ || batch_cnt != 0);
  if (my_batch == nullptr) {
    return Status::Corruption("Batch is nullptr!");
  }
  if (tracer_) {
    InstrumentedMutexLock lock(&trace_mutex_);
    if (tracer_) {
      tracer_->Write(my_batch);
    }
  }
  if (write_options.sync && write_options.disableWAL) {
    return Status::InvalidArgument("Sync writes has to enable WAL.");
  }
  if (two_write_queues_ && immutable_db_options_.enable_pipelined_write) {
    return Status::NotSupported(
        "pipelined_writes is not compatible with concurrent prepares");
  }
  if (seq_per_batch_ && immutable_db_options_.enable_pipelined_write) {
    // TODO(yiwu): update pipeline write with seq_per_batch and batch_cnt
    return Status::NotSupported(
        "pipelined_writes is not compatible with seq_per_batch");
  }
  if (immutable_db_options_.unordered_write &&
      immutable_db_options_.enable_pipelined_write) {
    return Status::NotSupported(
        "pipelined_writes is not compatible with unordered_write");
  }
  // Otherwise IsLatestPersistentState optimization does not make sense
  assert(!WriteBatchInternal::IsLatestPersistentState(my_batch) ||
         disable_memtable);

  Status status;
  if (write_options.low_pri) {
    status = ThrottleLowPriWritesIfNeeded(write_options, my_batch);
    if (!status.ok()) {
      return status;
    }
  }

  if (two_write_queues_ && disable_memtable) {
    AssignOrder assign_order =
        seq_per_batch_ ? kDoAssignOrder : kDontAssignOrder;
    // Otherwise it is WAL-only Prepare batches in WriteCommitted policy and
    // they don't consume sequence.
    return WriteImplWALOnly(&nonmem_write_thread_, write_options, my_batch,
                            callback, log_used, log_ref, seq_used, batch_cnt,
                            pre_release_callback, assign_order,
                            kDontPublishLastSeq, disable_memtable);
  }

  if (immutable_db_options_.unordered_write) {
    const size_t sub_batch_cnt = batch_cnt != 0
                                     ? batch_cnt
                                     // every key is a sub-batch consuming a seq
                                     : WriteBatchInternal::Count(my_batch);
    uint64_t seq;
    // Use a write thread to i) optimize for WAL write, ii) publish last
    // sequence in increasing order, iii) call pre_release_callback serially
    status = WriteImplWALOnly(&write_thread_, write_options, my_batch, callback,
                              log_used, log_ref, &seq, sub_batch_cnt,
                              pre_release_callback, kDoAssignOrder,
                              kDoPublishLastSeq, disable_memtable);
    TEST_SYNC_POINT("DBImpl::WriteImpl:UnorderedWriteAfterWriteWAL");
    if (!status.ok()) {
      return status;
    }
    if (seq_used) {
      *seq_used = seq;
    }
    if (!disable_memtable) {
      TEST_SYNC_POINT("DBImpl::WriteImpl:BeforeUnorderedWriteMemtable");
      status = UnorderedWriteMemtable(write_options, my_batch, callback,
                                      log_ref, seq, sub_batch_cnt);
    }
    return status;
  }

  if (immutable_db_options_.enable_pipelined_write) {
    return PipelinedWriteImpl(write_options, my_batch, callback, log_used,
                              log_ref, disable_memtable, seq_used);
  }

  PERF_TIMER_GUARD(write_pre_and_post_process_time);
  WriteThread::Writer w(write_options, my_batch, callback, log_ref,
                        disable_memtable, batch_cnt, pre_release_callback);

  if (!write_options.disableWAL) {
    RecordTick(stats_, WRITE_WITH_WAL);
  }

  StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE);

  write_thread_.JoinBatchGroup(&w);
  if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) {
    // we are a non-leader in a parallel group
    if (w.ShouldWriteToMemtable()) {
      PERF_TIMER_STOP(write_pre_and_post_process_time);
      PERF_TIMER_GUARD(write_memtable_time);

      ColumnFamilyMemTablesImpl column_family_memtables(
          versions_->GetColumnFamilySet());
      w.status = WriteBatchInternal::InsertInto(
          &w, w.sequence, &column_family_memtables, &flush_scheduler_,
          &trim_history_scheduler_,
          write_options.ignore_missing_column_families, 0 /*log_number*/, this,
          true /*concurrent_memtable_writes*/, seq_per_batch_, w.batch_cnt,
          batch_per_txn_, write_options.memtable_insert_hint_per_batch);

      PERF_TIMER_START(write_pre_and_post_process_time);
    }

    if (write_thread_.CompleteParallelMemTableWriter(&w)) {
      // we're responsible for exit batch group
      // TODO(myabandeh): propagate status to write_group
      auto last_sequence = w.write_group->last_sequence;
      versions_->SetLastSequence(last_sequence);
      MemTableInsertStatusCheck(w.status);
      write_thread_.ExitAsBatchGroupFollower(&w);
    }
    assert(w.state == WriteThread::STATE_COMPLETED);
    // STATE_COMPLETED conditional below handles exit

    status = w.FinalStatus();
  }
  if (w.state == WriteThread::STATE_COMPLETED) {
    if (log_used != nullptr) {
      *log_used = w.log_used;
    }
    if (seq_used != nullptr) {
      *seq_used = w.sequence;
    }
    // write is complete and leader has updated sequence
    return w.FinalStatus();
  }
  // else we are the leader of the write batch group
  assert(w.state == WriteThread::STATE_GROUP_LEADER);

  // Once reaches this point, the current writer "w" will try to do its write
  // job. It may also pick up some of the remaining writers in the "writers_"
  // when it finds suitable, and finish them in the same write batch.
  // This is how a write job could be done by the other writer.
  WriteContext write_context;
  WriteThread::WriteGroup write_group;
  bool in_parallel_group = false;
  uint64_t last_sequence = kMaxSequenceNumber;

  mutex_.Lock();

  bool need_log_sync = write_options.sync;
  bool need_log_dir_sync = need_log_sync && !log_dir_synced_;
  if (!two_write_queues_ || !disable_memtable) {
    // With concurrent writes we do preprocess only in the write thread that
    // also does write to memtable to avoid sync issue on shared data structure
    // with the other thread

    // PreprocessWrite does its own perf timing.
    PERF_TIMER_STOP(write_pre_and_post_process_time);

    status = PreprocessWrite(write_options, &need_log_sync, &write_context);
    if (!two_write_queues_) {
      // Assign it after ::PreprocessWrite since the sequence might advance
      // inside it by WriteRecoverableState
      last_sequence = versions_->LastSequence();
    }

    PERF_TIMER_START(write_pre_and_post_process_time);
  }
  log::Writer* log_writer = logs_.back().writer;

  mutex_.Unlock();

  // Add to log and apply to memtable. We can release the lock
  // during this phase since &w is currently responsible for logging
  // and protects against concurrent loggers and concurrent writes
  // into memtables

  TEST_SYNC_POINT("DBImpl::WriteImpl:BeforeLeaderEnters");
  last_batch_group_size_ =
      write_thread_.EnterAsBatchGroupLeader(&w, &write_group);

  if (status.ok()) {
    // Rules for when we can update the memtable concurrently
    // 1. supported by memtable
    // 2. Puts are not okay if inplace_update_support
    // 3. Merges are not okay
    //
    // Rules 1..2 are enforced by checking the options
    // during startup (CheckConcurrentWritesSupported), so if
    // options.allow_concurrent_memtable_write is true then they can be
    // assumed to be true. Rule 3 is checked for each batch. We could
    // relax rules 2 if we could prevent write batches from referring
    // more than once to a particular key.
    bool parallel = immutable_db_options_.allow_concurrent_memtable_write &&
                    write_group.size > 1;
    size_t total_count = 0;
    size_t valid_batches = 0;
    size_t total_byte_size = 0;
    size_t pre_release_callback_cnt = 0;
    for (auto* writer : write_group) {
      if (writer->CheckCallback(this)) {
        valid_batches += writer->batch_cnt;
        if (writer->ShouldWriteToMemtable()) {
          total_count += WriteBatchInternal::Count(writer->batch);
          parallel = parallel && !writer->batch->HasMerge();
        }
        total_byte_size = WriteBatchInternal::AppendedByteSize(
            total_byte_size, WriteBatchInternal::ByteSize(writer->batch));
        if (writer->pre_release_callback) {
          pre_release_callback_cnt++;
        }
      }
    }
    // Note about seq_per_batch_: either disableWAL is set for the entire write
    // group or not. In either case we inc seq for each write batch with no
    // failed callback. This means that there could be a batch with
    // disable_memtable in between; although we do not write this batch to
    // memtable it still consumes a seq. Otherwise, if !seq_per_batch_, we inc
    // the seq per valid written key to mem.
    size_t seq_inc = seq_per_batch_ ? valid_batches : total_count;

    const bool concurrent_update = two_write_queues_;
    // Update stats while we are an exclusive group leader, so we know
    // that nobody else can be writing to these particular stats.
    // We're optimistic, updating the stats before we successfully
    // commit. That lets us release our leader status early.
    auto stats = default_cf_internal_stats_;
    stats->AddDBStats(InternalStats::kIntStatsNumKeysWritten, total_count,
                      concurrent_update);
    RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count);
    stats->AddDBStats(InternalStats::kIntStatsBytesWritten, total_byte_size,
                      concurrent_update);
    RecordTick(stats_, BYTES_WRITTEN, total_byte_size);
    stats->AddDBStats(InternalStats::kIntStatsWriteDoneBySelf, 1,
                      concurrent_update);
    RecordTick(stats_, WRITE_DONE_BY_SELF);
    auto write_done_by_other = write_group.size - 1;
    if (write_done_by_other > 0) {
      stats->AddDBStats(InternalStats::kIntStatsWriteDoneByOther,
                        write_done_by_other, concurrent_update);
      RecordTick(stats_, WRITE_DONE_BY_OTHER, write_done_by_other);
    }
    RecordInHistogram(stats_, BYTES_PER_WRITE, total_byte_size);

    if (write_options.disableWAL) {
      has_unpersisted_data_.store(true, std::memory_order_relaxed);
    }

    PERF_TIMER_STOP(write_pre_and_post_process_time);

    if (!two_write_queues_) {
      if (status.ok() && !write_options.disableWAL) {
        PERF_TIMER_GUARD(write_wal_time);
        status = WriteToWAL(write_group, log_writer, log_used, need_log_sync,
                            need_log_dir_sync, last_sequence + 1);
      }
    } else {
      if (status.ok() && !write_options.disableWAL) {
        PERF_TIMER_GUARD(write_wal_time);
        // LastAllocatedSequence is increased inside WriteToWAL under
        // wal_write_mutex_ to ensure ordered events in WAL
        status = ConcurrentWriteToWAL(write_group, log_used, &last_sequence,
                                      seq_inc);
      } else {
        // Otherwise we inc seq number for memtable writes
        last_sequence = versions_->FetchAddLastAllocatedSequence(seq_inc);
      }
    }
    assert(last_sequence != kMaxSequenceNumber);
    const SequenceNumber current_sequence = last_sequence + 1;
    last_sequence += seq_inc;

    // PreReleaseCallback is called after WAL write and before memtable write
    if (status.ok()) {
      SequenceNumber next_sequence = current_sequence;
      size_t index = 0;
      // Note: the logic for advancing seq here must be consistent with the
      // logic in WriteBatchInternal::InsertInto(write_group...) as well as
      // with WriteBatchInternal::InsertInto(write_batch...) that is called on
      // the merged batch during recovery from the WAL.
      for (auto* writer : write_group) {
        if (writer->CallbackFailed()) {
          continue;
        }
        writer->sequence = next_sequence;
        if (writer->pre_release_callback) {
          Status ws = writer->pre_release_callback->Callback(
              writer->sequence, disable_memtable, writer->log_used, index++,
              pre_release_callback_cnt);
          if (!ws.ok()) {
            status = ws;
            break;
          }
        }
        if (seq_per_batch_) {
          assert(writer->batch_cnt);
          next_sequence += writer->batch_cnt;
        } else if (writer->ShouldWriteToMemtable()) {
          next_sequence += WriteBatchInternal::Count(writer->batch);
        }
      }
    }

    if (status.ok()) {
      PERF_TIMER_GUARD(write_memtable_time);

      if (!parallel) {
        // w.sequence will be set inside InsertInto
        w.status = WriteBatchInternal::InsertInto(
            write_group, current_sequence, column_family_memtables_.get(),
            &flush_scheduler_, &trim_history_scheduler_,
            write_options.ignore_missing_column_families,
            0 /*recovery_log_number*/, this, parallel, seq_per_batch_,
            batch_per_txn_);
      } else {
        write_group.last_sequence = last_sequence;
        write_thread_.LaunchParallelMemTableWriters(&write_group);
        in_parallel_group = true;

        // Each parallel follower is doing its own writes. The leader should
        // also do its own.
        if (w.ShouldWriteToMemtable()) {
          ColumnFamilyMemTablesImpl column_family_memtables(
              versions_->GetColumnFamilySet());
          assert(w.sequence == current_sequence);
          w.status = WriteBatchInternal::InsertInto(
              &w, w.sequence, &column_family_memtables, &flush_scheduler_,
              &trim_history_scheduler_,
              write_options.ignore_missing_column_families, 0 /*log_number*/,
              this, true /*concurrent_memtable_writes*/, seq_per_batch_,
              w.batch_cnt, batch_per_txn_,
              write_options.memtable_insert_hint_per_batch);
        }
      }
      if (seq_used != nullptr) {
        *seq_used = w.sequence;
      }
    }
  }
  PERF_TIMER_START(write_pre_and_post_process_time);

  if (!w.CallbackFailed()) {
    WriteStatusCheck(status);
  }

  if (need_log_sync) {
    mutex_.Lock();
    MarkLogsSynced(logfile_number_, need_log_dir_sync, status);
    mutex_.Unlock();
    // Requesting sync with two_write_queues_ is expected to be very rare. We
    // hence provide a simple implementation that is not necessarily efficient.
    if (two_write_queues_) {
      if (manual_wal_flush_) {
        status = FlushWAL(true);
      } else {
        status = SyncWAL();
      }
    }
  }

  bool should_exit_batch_group = true;
  if (in_parallel_group) {
    // CompleteParallelWorker returns true if this thread should
    // handle exit, false means somebody else did
    should_exit_batch_group = write_thread_.CompleteParallelMemTableWriter(&w);
  }
  if (should_exit_batch_group) {
    if (status.ok()) {
      // Note: if we are to resume after non-OK statuses we need to revisit how
      // we react to non-OK statuses here.
      versions_->SetLastSequence(last_sequence);
    }
    MemTableInsertStatusCheck(w.status);
    write_thread_.ExitAsBatchGroupLeader(write_group, status);
  }

  if (status.ok()) {
    status = w.FinalStatus();
  }
  return status;
}

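// Pipelined write path (enable_pipelined_write == true): WAL writes and
// memtable writes are handled by separate writer groups. The WAL group leader
// preprocesses, assigns sequence numbers and writes the merged batch to the
// WAL; writers then move on to a memtable-writer group (possibly in parallel),
// so the next WAL group can start while memtable inserts from the previous
// group are still in flight.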
Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,
                                  WriteBatch* my_batch, WriteCallback* callback,
                                  uint64_t* log_used, uint64_t log_ref,
                                  bool disable_memtable, uint64_t* seq_used) {
  PERF_TIMER_GUARD(write_pre_and_post_process_time);
  StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE);

  WriteContext write_context;

  WriteThread::Writer w(write_options, my_batch, callback, log_ref,
                        disable_memtable);
  write_thread_.JoinBatchGroup(&w);
  if (w.state == WriteThread::STATE_GROUP_LEADER) {
    WriteThread::WriteGroup wal_write_group;
    if (w.callback && !w.callback->AllowWriteBatching()) {
      write_thread_.WaitForMemTableWriters();
    }
    mutex_.Lock();
    bool need_log_sync = !write_options.disableWAL && write_options.sync;
    bool need_log_dir_sync = need_log_sync && !log_dir_synced_;
    // PreprocessWrite does its own perf timing.
    PERF_TIMER_STOP(write_pre_and_post_process_time);
    w.status = PreprocessWrite(write_options, &need_log_sync, &write_context);
    PERF_TIMER_START(write_pre_and_post_process_time);
    log::Writer* log_writer = logs_.back().writer;
    mutex_.Unlock();

    // This can set non-OK status if a callback fails.
    last_batch_group_size_ =
        write_thread_.EnterAsBatchGroupLeader(&w, &wal_write_group);
    const SequenceNumber current_sequence =
        write_thread_.UpdateLastSequence(versions_->LastSequence()) + 1;
    size_t total_count = 0;
    size_t total_byte_size = 0;

    if (w.status.ok()) {
      SequenceNumber next_sequence = current_sequence;
      for (auto writer : wal_write_group) {
        if (writer->CheckCallback(this)) {
          if (writer->ShouldWriteToMemtable()) {
            writer->sequence = next_sequence;
            size_t count = WriteBatchInternal::Count(writer->batch);
            next_sequence += count;
            total_count += count;
          }
          total_byte_size = WriteBatchInternal::AppendedByteSize(
              total_byte_size, WriteBatchInternal::ByteSize(writer->batch));
        }
      }
      if (w.disable_wal) {
        has_unpersisted_data_.store(true, std::memory_order_relaxed);
      }
      write_thread_.UpdateLastSequence(current_sequence + total_count - 1);
    }

    auto stats = default_cf_internal_stats_;
    stats->AddDBStats(InternalStats::kIntStatsNumKeysWritten, total_count);
    RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count);
    stats->AddDBStats(InternalStats::kIntStatsBytesWritten, total_byte_size);
    RecordTick(stats_, BYTES_WRITTEN, total_byte_size);
    RecordInHistogram(stats_, BYTES_PER_WRITE, total_byte_size);

    PERF_TIMER_STOP(write_pre_and_post_process_time);

    if (w.status.ok() && !write_options.disableWAL) {
      PERF_TIMER_GUARD(write_wal_time);
      stats->AddDBStats(InternalStats::kIntStatsWriteDoneBySelf, 1);
      RecordTick(stats_, WRITE_DONE_BY_SELF, 1);
      if (wal_write_group.size > 1) {
        stats->AddDBStats(InternalStats::kIntStatsWriteDoneByOther,
                          wal_write_group.size - 1);
        RecordTick(stats_, WRITE_DONE_BY_OTHER, wal_write_group.size - 1);
      }
      w.status = WriteToWAL(wal_write_group, log_writer, log_used,
                            need_log_sync, need_log_dir_sync, current_sequence);
    }

    if (!w.CallbackFailed()) {
      WriteStatusCheck(w.status);
    }

    if (need_log_sync) {
      mutex_.Lock();
      MarkLogsSynced(logfile_number_, need_log_dir_sync, w.status);
      mutex_.Unlock();
    }

    write_thread_.ExitAsBatchGroupLeader(wal_write_group, w.status);
  }

  WriteThread::WriteGroup memtable_write_group;
  if (w.state == WriteThread::STATE_MEMTABLE_WRITER_LEADER) {
    PERF_TIMER_GUARD(write_memtable_time);
    assert(w.ShouldWriteToMemtable());
    write_thread_.EnterAsMemTableWriter(&w, &memtable_write_group);
    if (memtable_write_group.size > 1 &&
        immutable_db_options_.allow_concurrent_memtable_write) {
      write_thread_.LaunchParallelMemTableWriters(&memtable_write_group);
    } else {
      memtable_write_group.status = WriteBatchInternal::InsertInto(
          memtable_write_group, w.sequence, column_family_memtables_.get(),
          &flush_scheduler_, &trim_history_scheduler_,
          write_options.ignore_missing_column_families, 0 /*log_number*/, this,
          false /*concurrent_memtable_writes*/, seq_per_batch_, batch_per_txn_);
      versions_->SetLastSequence(memtable_write_group.last_sequence);
      write_thread_.ExitAsMemTableWriter(&w, memtable_write_group);
    }
  }

  if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) {
    assert(w.ShouldWriteToMemtable());
    ColumnFamilyMemTablesImpl column_family_memtables(
        versions_->GetColumnFamilySet());
    w.status = WriteBatchInternal::InsertInto(
        &w, w.sequence, &column_family_memtables, &flush_scheduler_,
        &trim_history_scheduler_, write_options.ignore_missing_column_families,
        0 /*log_number*/, this, true /*concurrent_memtable_writes*/,
        false /*seq_per_batch*/, 0 /*batch_cnt*/, true /*batch_per_txn*/,
        write_options.memtable_insert_hint_per_batch);
    if (write_thread_.CompleteParallelMemTableWriter(&w)) {
      MemTableInsertStatusCheck(w.status);
      versions_->SetLastSequence(w.write_group->last_sequence);
      write_thread_.ExitAsMemTableWriter(&w, *w.write_group);
    }
  }
  if (seq_used != nullptr) {
    *seq_used = w.sequence;
  }

  assert(w.state == WriteThread::STATE_COMPLETED);
  return w.FinalStatus();
}

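// With unordered_write, a batch that has already been written to the WAL (by
// WriteImplWALOnly) is applied to the memtables here, concurrently with other
// writers. pending_memtable_writes_ is decremented when done, and switch_cv_
// is notified once it drops to zero so that memtable switches can proceed.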
Status DBImpl::UnorderedWriteMemtable(const WriteOptions& write_options,
                                      WriteBatch* my_batch,
                                      WriteCallback* callback, uint64_t log_ref,
                                      SequenceNumber seq,
                                      const size_t sub_batch_cnt) {
  PERF_TIMER_GUARD(write_pre_and_post_process_time);
  StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE);

  WriteThread::Writer w(write_options, my_batch, callback, log_ref,
                        false /*disable_memtable*/);

  if (w.CheckCallback(this) && w.ShouldWriteToMemtable()) {
    w.sequence = seq;
    size_t total_count = WriteBatchInternal::Count(my_batch);
    InternalStats* stats = default_cf_internal_stats_;
    stats->AddDBStats(InternalStats::kIntStatsNumKeysWritten, total_count);
    RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count);

    ColumnFamilyMemTablesImpl column_family_memtables(
        versions_->GetColumnFamilySet());
    w.status = WriteBatchInternal::InsertInto(
        &w, w.sequence, &column_family_memtables, &flush_scheduler_,
        &trim_history_scheduler_, write_options.ignore_missing_column_families,
        0 /*log_number*/, this, true /*concurrent_memtable_writes*/,
        seq_per_batch_, sub_batch_cnt, true /*batch_per_txn*/,
        write_options.memtable_insert_hint_per_batch);

    WriteStatusCheck(w.status);
    if (write_options.disableWAL) {
      has_unpersisted_data_.store(true, std::memory_order_relaxed);
    }
  }

  size_t pending_cnt = pending_memtable_writes_.fetch_sub(1) - 1;
  if (pending_cnt == 0) {
    // switch_cv_ waits until pending_memtable_writes_ = 0. Locking its mutex
    // before notify ensures that cv is in waiting state when it is notified
    // thus not missing the update to pending_memtable_writes_ even though it
    // is not modified under the mutex.
    std::lock_guard<std::mutex> lck(switch_mutex_);
    switch_cv_.notify_all();
  }

  if (!w.FinalStatus().ok()) {
    return w.FinalStatus();
  }

  return Status::OK();
}

// The 2nd write queue. If enabled it will be used only for WAL-only writes.
// This is the only queue that updates LastPublishedSequence which is only
// applicable in a two-queue setting.
Status DBImpl::WriteImplWALOnly(
    WriteThread* write_thread, const WriteOptions& write_options,
    WriteBatch* my_batch, WriteCallback* callback, uint64_t* log_used,
    const uint64_t log_ref, uint64_t* seq_used, const size_t sub_batch_cnt,
    PreReleaseCallback* pre_release_callback, const AssignOrder assign_order,
    const PublishLastSeq publish_last_seq, const bool disable_memtable) {
  Status status;
  PERF_TIMER_GUARD(write_pre_and_post_process_time);
  WriteThread::Writer w(write_options, my_batch, callback, log_ref,
                        disable_memtable, sub_batch_cnt, pre_release_callback);
  RecordTick(stats_, WRITE_WITH_WAL);
  StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE);

  write_thread->JoinBatchGroup(&w);
  assert(w.state != WriteThread::STATE_PARALLEL_MEMTABLE_WRITER);
  if (w.state == WriteThread::STATE_COMPLETED) {
    if (log_used != nullptr) {
      *log_used = w.log_used;
    }
    if (seq_used != nullptr) {
      *seq_used = w.sequence;
    }
    return w.FinalStatus();
  }
  // else we are the leader of the write batch group
  assert(w.state == WriteThread::STATE_GROUP_LEADER);

  if (publish_last_seq == kDoPublishLastSeq) {
    // Currently we only use kDoPublishLastSeq in unordered_write
    assert(immutable_db_options_.unordered_write);
    WriteContext write_context;
    if (error_handler_.IsDBStopped()) {
      status = error_handler_.GetBGError();
    }
    // TODO(myabandeh): Make preliminary checks thread-safe so we could do them
    // without paying the cost of obtaining the mutex.
    if (status.ok()) {
      InstrumentedMutexLock l(&mutex_);
      bool need_log_sync = false;
      status = PreprocessWrite(write_options, &need_log_sync, &write_context);
      WriteStatusCheck(status);
    }
    if (!status.ok()) {
      WriteThread::WriteGroup write_group;
      write_thread->EnterAsBatchGroupLeader(&w, &write_group);
      write_thread->ExitAsBatchGroupLeader(write_group, status);
      return status;
    }
  }

  WriteThread::WriteGroup write_group;
  uint64_t last_sequence;
  write_thread->EnterAsBatchGroupLeader(&w, &write_group);
  // Note: no need to update last_batch_group_size_ here since the batch writes
  // to WAL only
  size_t pre_release_callback_cnt = 0;
  size_t total_byte_size = 0;
  for (auto* writer : write_group) {
    if (writer->CheckCallback(this)) {
      total_byte_size = WriteBatchInternal::AppendedByteSize(
          total_byte_size, WriteBatchInternal::ByteSize(writer->batch));
      if (writer->pre_release_callback) {
        pre_release_callback_cnt++;
      }
    }
  }

  const bool concurrent_update = true;
  // Update stats while we are an exclusive group leader, so we know
  // that nobody else can be writing to these particular stats.
  // We're optimistic, updating the stats before we successfully
  // commit. That lets us release our leader status early.
  auto stats = default_cf_internal_stats_;
  stats->AddDBStats(InternalStats::kIntStatsBytesWritten, total_byte_size,
                    concurrent_update);
  RecordTick(stats_, BYTES_WRITTEN, total_byte_size);
  stats->AddDBStats(InternalStats::kIntStatsWriteDoneBySelf, 1,
                    concurrent_update);
  RecordTick(stats_, WRITE_DONE_BY_SELF);
  auto write_done_by_other = write_group.size - 1;
  if (write_done_by_other > 0) {
    stats->AddDBStats(InternalStats::kIntStatsWriteDoneByOther,
                      write_done_by_other, concurrent_update);
    RecordTick(stats_, WRITE_DONE_BY_OTHER, write_done_by_other);
  }
  RecordInHistogram(stats_, BYTES_PER_WRITE, total_byte_size);

  PERF_TIMER_STOP(write_pre_and_post_process_time);

  PERF_TIMER_GUARD(write_wal_time);
  // LastAllocatedSequence is increased inside WriteToWAL under
  // wal_write_mutex_ to ensure ordered events in WAL
  size_t seq_inc = 0 /* total_count */;
  if (assign_order == kDoAssignOrder) {
    size_t total_batch_cnt = 0;
    for (auto* writer : write_group) {
      assert(writer->batch_cnt || !seq_per_batch_);
      if (!writer->CallbackFailed()) {
        total_batch_cnt += writer->batch_cnt;
      }
    }
    seq_inc = total_batch_cnt;
  }
  if (!write_options.disableWAL) {
    status =
        ConcurrentWriteToWAL(write_group, log_used, &last_sequence, seq_inc);
  } else {
    // Otherwise we inc seq number to do solely the seq allocation
    last_sequence = versions_->FetchAddLastAllocatedSequence(seq_inc);
  }

  size_t memtable_write_cnt = 0;
  auto curr_seq = last_sequence + 1;
  for (auto* writer : write_group) {
    if (writer->CallbackFailed()) {
      continue;
    }
    writer->sequence = curr_seq;
    if (assign_order == kDoAssignOrder) {
      assert(writer->batch_cnt || !seq_per_batch_);
      curr_seq += writer->batch_cnt;
    }
    if (!writer->disable_memtable) {
      memtable_write_cnt++;
    }
    // else seq advances only by memtable writes
  }
  if (status.ok() && write_options.sync) {
    assert(!write_options.disableWAL);
    // Requesting sync with two_write_queues_ is expected to be very rare. We
    // hence provide a simple implementation that is not necessarily efficient.
    if (manual_wal_flush_) {
      status = FlushWAL(true);
    } else {
      status = SyncWAL();
    }
  }
  PERF_TIMER_START(write_pre_and_post_process_time);

  if (!w.CallbackFailed()) {
    WriteStatusCheck(status);
  }
  if (status.ok()) {
    size_t index = 0;
    for (auto* writer : write_group) {
      if (!writer->CallbackFailed() && writer->pre_release_callback) {
        assert(writer->sequence != kMaxSequenceNumber);
        Status ws = writer->pre_release_callback->Callback(
            writer->sequence, disable_memtable, writer->log_used, index++,
            pre_release_callback_cnt);
        if (!ws.ok()) {
          status = ws;
          break;
        }
      }
    }
  }
  if (publish_last_seq == kDoPublishLastSeq) {
    versions_->SetLastSequence(last_sequence + seq_inc);
    // Currently we only use kDoPublishLastSeq in unordered_write
    assert(immutable_db_options_.unordered_write);
  }
  if (immutable_db_options_.unordered_write && status.ok()) {
    pending_memtable_writes_ += memtable_write_cnt;
  }
  write_thread->ExitAsBatchGroupLeader(write_group, status);
  if (status.ok()) {
    status = w.FinalStatus();
  }
  if (seq_used != nullptr) {
    *seq_used = w.sequence;
  }
  return status;
}

void DBImpl::WriteStatusCheck(const Status& status) {
  // Is setting bg_error_ enough here? This will at least stop
  // compaction and fail any further writes.
  if (immutable_db_options_.paranoid_checks && !status.ok() &&
      !status.IsBusy() && !status.IsIncomplete()) {
    mutex_.Lock();
    error_handler_.SetBGError(status, BackgroundErrorReason::kWriteCallback);
    mutex_.Unlock();
  }
}

void DBImpl::MemTableInsertStatusCheck(const Status& status) {
  // A non-OK status here indicates that the state implied by the
  // WAL has diverged from the in-memory state. This could be
  // because of a corrupt write_batch (very bad), or because the
  // client specified an invalid column family and didn't specify
  // ignore_missing_column_families.
  if (!status.ok()) {
    mutex_.Lock();
    assert(!error_handler_.IsBGWorkStopped());
    error_handler_.SetBGError(status, BackgroundErrorReason::kMemTable);
    mutex_.Unlock();
  }
}

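// Called by the write-group leader with mutex_ held, before the WAL write.
// Depending on the DB state this may switch the WAL, react to a full write
// buffer, trim memtable history, schedule pending flushes, delay the write
// for rate limiting, and mark the current logs as getting_synced when a log
// sync was requested.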
Status DBImpl::PreprocessWrite(const WriteOptions& write_options,
                               bool* need_log_sync,
                               WriteContext* write_context) {
  mutex_.AssertHeld();
  assert(write_context != nullptr && need_log_sync != nullptr);
  Status status;

  if (error_handler_.IsDBStopped()) {
    status = error_handler_.GetBGError();
  }

  PERF_TIMER_GUARD(write_scheduling_flushes_compactions_time);

  assert(!single_column_family_mode_ ||
         versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1);
  if (UNLIKELY(status.ok() && !single_column_family_mode_ &&
               total_log_size_ > GetMaxTotalWalSize())) {
    WaitForPendingWrites();
    status = SwitchWAL(write_context);
  }

  if (UNLIKELY(status.ok() && write_buffer_manager_->ShouldFlush())) {
    // Before a new memtable is added in SwitchMemtable(),
    // write_buffer_manager_->ShouldFlush() will keep returning true. If
    // another thread is writing to another DB with the same write buffer,
    // they may also be flushed. We may end up with flushing much more DBs
    // than needed. It's suboptimal but still correct.
    WaitForPendingWrites();
    status = HandleWriteBufferFull(write_context);
  }

  if (UNLIKELY(status.ok() && !trim_history_scheduler_.Empty())) {
    status = TrimMemtableHistory(write_context);
  }

  if (UNLIKELY(status.ok() && !flush_scheduler_.Empty())) {
    WaitForPendingWrites();
    status = ScheduleFlushes(write_context);
  }

  PERF_TIMER_STOP(write_scheduling_flushes_compactions_time);
  PERF_TIMER_GUARD(write_pre_and_post_process_time);

  if (UNLIKELY(status.ok() && (write_controller_.IsStopped() ||
                               write_controller_.NeedsDelay()))) {
    PERF_TIMER_STOP(write_pre_and_post_process_time);
    PERF_TIMER_GUARD(write_delay_time);
    // We don't know the size of the current batch so we always use the size
    // of the previous one. It might create a fairness issue in that
    // expiration might happen for smaller writes but larger writes can go
    // through. Can optimize it if it is an issue.
    status = DelayWrite(last_batch_group_size_, write_options);
    PERF_TIMER_START(write_pre_and_post_process_time);
  }

  if (status.ok() && *need_log_sync) {
    // Wait until the parallel syncs are finished. Any sync process has to sync
    // the front log too so it is enough to check the status of front()
    // We do a while loop since log_sync_cv_ is signalled when any sync is
    // finished
    // Note: there does not seem to be a reason to wait for parallel sync at
    // this early step but it is not important since parallel sync (SyncWAL)
    // and need_log_sync are usually not used together.
    while (logs_.front().getting_synced) {
      log_sync_cv_.Wait();
    }
    for (auto& log : logs_) {
      assert(!log.getting_synced);
      // This is just to prevent the logs from being synced by a parallel
      // SyncWAL call. We will do the actual syncing later, after we write to
      // the WAL.
      // Note: there does not seem to be a reason to set this early before we
      // actually write to the WAL
      log.getting_synced = true;
    }
  } else {
    *need_log_sync = false;
  }

  return status;
}

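// Merge the batches of a write group into a single batch for the WAL. If the
// group has a single writer whose batch does not need truncation, that batch
// is returned directly; otherwise the non-failed batches are appended into
// *tmp_batch. *write_with_wal is set to the number of batches that go to the
// WAL, and *to_be_cached_state to the last batch flagged as the latest
// persistent state, if any.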
WriteBatch* DBImpl::MergeBatch(const WriteThread::WriteGroup& write_group,
                               WriteBatch* tmp_batch, size_t* write_with_wal,
                               WriteBatch** to_be_cached_state) {
  assert(write_with_wal != nullptr);
  assert(tmp_batch != nullptr);
  assert(*to_be_cached_state == nullptr);
  WriteBatch* merged_batch = nullptr;
  *write_with_wal = 0;
  auto* leader = write_group.leader;
  assert(!leader->disable_wal);  // Same holds for all in the batch group
  if (write_group.size == 1 && !leader->CallbackFailed() &&
      leader->batch->GetWalTerminationPoint().is_cleared()) {
    // we simply write the first WriteBatch to WAL if the group only
    // contains one batch, that batch should be written to the WAL,
    // and the batch does not need to be truncated
    merged_batch = leader->batch;
    if (WriteBatchInternal::IsLatestPersistentState(merged_batch)) {
      *to_be_cached_state = merged_batch;
    }
    *write_with_wal = 1;
  } else {
    // WAL needs all of the batches flattened into a single batch.
    // We could avoid copying here with an iov-like AddRecord
    // interface
    merged_batch = tmp_batch;
    for (auto writer : write_group) {
      if (!writer->CallbackFailed()) {
        WriteBatchInternal::Append(merged_batch, writer->batch,
                                   /*WAL_only*/ true);
        if (WriteBatchInternal::IsLatestPersistentState(writer->batch)) {
          // We only need to cache the last of such write batch
          *to_be_cached_state = writer->batch;
        }
        (*write_with_wal)++;
      }
    }
  }
  return merged_batch;
}

// When two_write_queues_ is disabled, this function is called from the only
// write thread. Otherwise this must be called holding log_write_mutex_.
Status DBImpl::WriteToWAL(const WriteBatch& merged_batch,
                          log::Writer* log_writer, uint64_t* log_used,
                          uint64_t* log_size) {
  assert(log_size != nullptr);
  Slice log_entry = WriteBatchInternal::Contents(&merged_batch);
  *log_size = log_entry.size();
  // With two_write_queues_, WriteToWAL has to be protected from concurrent
  // calls from the two queues anyway and log_write_mutex_ is already held.
  // Otherwise if manual_wal_flush_ is enabled we need to protect
  // log_writer->AddRecord from possible concurrent calls via FlushWAL by the
  // application.
  const bool needs_locking = manual_wal_flush_ && !two_write_queues_;
  // Due to performance concerns of missed branch prediction penalize the new
  // manual_wal_flush_ feature (by UNLIKELY) instead of the more common case
  // when we do not need any locking.
  if (UNLIKELY(needs_locking)) {
    log_write_mutex_.Lock();
  }
  Status status = log_writer->AddRecord(log_entry);
  if (UNLIKELY(needs_locking)) {
    log_write_mutex_.Unlock();
  }
  if (log_used != nullptr) {
    *log_used = logfile_number_;
  }
  total_log_size_ += log_entry.size();
  // TODO(myabandeh): it might be unsafe to access alive_log_files_.back() here
  // since alive_log_files_ might be modified concurrently
  alive_log_files_.back().AddSize(log_entry.size());
  log_empty_ = false;
  return status;
}

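// Write an entire write group to the WAL: merge the group's batches, stamp
// the starting sequence number, append the result to the current log file
// and, if requested, fsync all live logs and (once) the WAL directory.
// WAL-related stats are updated on success.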
Status DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group,
                          log::Writer* log_writer, uint64_t* log_used,
                          bool need_log_sync, bool need_log_dir_sync,
                          SequenceNumber sequence) {
  Status status;

  assert(!write_group.leader->disable_wal);
  // Same holds for all in the batch group
  size_t write_with_wal = 0;
  WriteBatch* to_be_cached_state = nullptr;
  WriteBatch* merged_batch = MergeBatch(write_group, &tmp_batch_,
                                        &write_with_wal, &to_be_cached_state);
  if (merged_batch == write_group.leader->batch) {
    write_group.leader->log_used = logfile_number_;
  } else if (write_with_wal > 1) {
    for (auto writer : write_group) {
      writer->log_used = logfile_number_;
    }
  }

  WriteBatchInternal::SetSequence(merged_batch, sequence);

  uint64_t log_size;
  status = WriteToWAL(*merged_batch, log_writer, log_used, &log_size);
  if (to_be_cached_state) {
    cached_recoverable_state_ = *to_be_cached_state;
    cached_recoverable_state_empty_ = false;
  }

  if (status.ok() && need_log_sync) {
    StopWatch sw(env_, stats_, WAL_FILE_SYNC_MICROS);
    // It's safe to access logs_ with unlocked mutex_ here because:
    //  - we've set getting_synced=true for all logs,
    //    so other threads won't pop from logs_ while we're here,
    //  - only writer thread can push to logs_, and we're in
    //    writer thread, so no one will push to logs_,
    //  - as long as other threads don't modify it, it's safe to read
    //    from std::deque from multiple threads concurrently.
    for (auto& log : logs_) {
      status = log.writer->file()->Sync(immutable_db_options_.use_fsync);
      if (!status.ok()) {
        break;
      }
    }
    if (status.ok() && need_log_dir_sync) {
      // We only sync WAL directory the first time WAL syncing is
      // requested, so that in case users never turn on WAL sync,
      // we can avoid the disk I/O in the write code path.
      status = directories_.GetWalDir()->Fsync();
    }
  }

  if (merged_batch == &tmp_batch_) {
    tmp_batch_.Clear();
  }
  if (status.ok()) {
    auto stats = default_cf_internal_stats_;
    if (need_log_sync) {
      stats->AddDBStats(InternalStats::kIntStatsWalFileSynced, 1);
      RecordTick(stats_, WAL_FILE_SYNCED);
    }
    stats->AddDBStats(InternalStats::kIntStatsWalFileBytes, log_size);
    RecordTick(stats_, WAL_FILE_BYTES, log_size);
    stats->AddDBStats(InternalStats::kIntStatsWriteWithWal, write_with_wal);
    RecordTick(stats_, WRITE_WITH_WAL, write_with_wal);
  }
  return status;
}

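// Variant of WriteToWAL used when two_write_queues_ is enabled: it allocates
// sequence numbers and appends to the log while holding log_write_mutex_, so
// that the two write queues produce ordered events in the WAL.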
Status DBImpl::ConcurrentWriteToWAL(const WriteThread::WriteGroup& write_group,
                                    uint64_t* log_used,
                                    SequenceNumber* last_sequence,
                                    size_t seq_inc) {
  Status status;

  assert(!write_group.leader->disable_wal);
  // Same holds for all in the batch group
  WriteBatch tmp_batch;
  size_t write_with_wal = 0;
  WriteBatch* to_be_cached_state = nullptr;
  WriteBatch* merged_batch =
      MergeBatch(write_group, &tmp_batch, &write_with_wal, &to_be_cached_state);

  // We need to lock log_write_mutex_ since logs_ and alive_log_files_ might be
  // pushed back concurrently
  log_write_mutex_.Lock();
  if (merged_batch == write_group.leader->batch) {
    write_group.leader->log_used = logfile_number_;
  } else if (write_with_wal > 1) {
    for (auto writer : write_group) {
      writer->log_used = logfile_number_;
    }
  }
  *last_sequence = versions_->FetchAddLastAllocatedSequence(seq_inc);
  auto sequence = *last_sequence + 1;
  WriteBatchInternal::SetSequence(merged_batch, sequence);

  log::Writer* log_writer = logs_.back().writer;
  uint64_t log_size;
  status = WriteToWAL(*merged_batch, log_writer, log_used, &log_size);
  if (to_be_cached_state) {
    cached_recoverable_state_ = *to_be_cached_state;
    cached_recoverable_state_empty_ = false;
  }
  log_write_mutex_.Unlock();

  if (status.ok()) {
    const bool concurrent = true;
    auto stats = default_cf_internal_stats_;
    stats->AddDBStats(InternalStats::kIntStatsWalFileBytes, log_size,
                      concurrent);
    RecordTick(stats_, WAL_FILE_BYTES, log_size);
    stats->AddDBStats(InternalStats::kIntStatsWriteWithWal, write_with_wal,
                      concurrent);
    RecordTick(stats_, WRITE_WITH_WAL, write_with_wal);
  }
  return status;
}

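// Apply the cached recoverable state (the latest persistent-state batch
// remembered by MergeBatch) to the memtables, advance the allocated and
// published sequence numbers accordingly, and invoke the registered
// pre-release callback for each sub-batch. Called with mutex_ held.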
Status DBImpl::WriteRecoverableState() {
  mutex_.AssertHeld();
  if (!cached_recoverable_state_empty_) {
    bool dont_care_bool;
    SequenceNumber next_seq;
    if (two_write_queues_) {
      log_write_mutex_.Lock();
    }
    SequenceNumber seq;
    if (two_write_queues_) {
      seq = versions_->FetchAddLastAllocatedSequence(0);
    } else {
      seq = versions_->LastSequence();
    }
    WriteBatchInternal::SetSequence(&cached_recoverable_state_, seq + 1);
    auto status = WriteBatchInternal::InsertInto(
        &cached_recoverable_state_, column_family_memtables_.get(),
        &flush_scheduler_, &trim_history_scheduler_, true,
        0 /*recovery_log_number*/, this, false /* concurrent_memtable_writes */,
        &next_seq, &dont_care_bool, seq_per_batch_);
    auto last_seq = next_seq - 1;
    if (two_write_queues_) {
      versions_->FetchAddLastAllocatedSequence(last_seq - seq);
      versions_->SetLastPublishedSequence(last_seq);
    }
    versions_->SetLastSequence(last_seq);
    if (two_write_queues_) {
      log_write_mutex_.Unlock();
    }
    if (status.ok() && recoverable_state_pre_release_callback_) {
      const bool DISABLE_MEMTABLE = true;
      for (uint64_t sub_batch_seq = seq + 1;
           sub_batch_seq < next_seq && status.ok(); sub_batch_seq++) {
        uint64_t const no_log_num = 0;
        // Unlock it since the callback might end up locking mutex. e.g.,
        // AddCommitted -> AdvanceMaxEvictedSeq -> GetSnapshotListFromDB
        mutex_.Unlock();
        status = recoverable_state_pre_release_callback_->Callback(
            sub_batch_seq, !DISABLE_MEMTABLE, no_log_num, 0, 1);
        mutex_.Lock();
      }
    }
    if (status.ok()) {
      cached_recoverable_state_.Clear();
      cached_recoverable_state_empty_ = true;
    }
    return status;
  }
  return Status::OK();
}

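// For atomic flush, collect every live column family that has unflushed data
// in its mutable or immutable memtables (or while recoverable state is
// pending), so they can all be flushed together.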
void DBImpl::SelectColumnFamiliesForAtomicFlush(
    autovector<ColumnFamilyData*>* cfds) {
  for (ColumnFamilyData* cfd : *versions_->GetColumnFamilySet()) {
    if (cfd->IsDropped()) {
      continue;
    }
    if (cfd->imm()->NumNotFlushed() != 0 || !cfd->mem()->IsEmpty() ||
        !cached_recoverable_state_empty_.load()) {
      cfds->push_back(cfd);
    }
  }
}

// Assign sequence number for atomic flush.
void DBImpl::AssignAtomicFlushSeq(const autovector<ColumnFamilyData*>& cfds) {
  assert(immutable_db_options_.atomic_flush);
  auto seq = versions_->LastSequence();
  for (auto cfd : cfds) {
    cfd->imm()->AssignAtomicFlushSeq(seq);
  }
}

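// Called when the total WAL size exceeds max_total_wal_size: switch the
// memtables of the column families still holding data from the oldest live
// WAL and schedule flushes so that log can eventually be released. With 2PC,
// the oldest log is left alone while it still contains uncommitted prepared
// transactions.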
Status DBImpl::SwitchWAL(WriteContext* write_context) {
  mutex_.AssertHeld();
  assert(write_context != nullptr);
  Status status;

  if (alive_log_files_.begin()->getting_flushed) {
    return status;
  }

  auto oldest_alive_log = alive_log_files_.begin()->number;
  bool flush_wont_release_oldest_log = false;
  if (allow_2pc()) {
    auto oldest_log_with_uncommitted_prep =
        logs_with_prep_tracker_.FindMinLogContainingOutstandingPrep();

    assert(oldest_log_with_uncommitted_prep == 0 ||
           oldest_log_with_uncommitted_prep >= oldest_alive_log);
    if (oldest_log_with_uncommitted_prep > 0 &&
        oldest_log_with_uncommitted_prep == oldest_alive_log) {
      if (unable_to_release_oldest_log_) {
        // we already attempted to flush all column families dependent on
        // the oldest alive log but the log still contained uncommitted
        // transactions so there is still nothing that we can do.
        return status;
      } else {
        ROCKS_LOG_WARN(
            immutable_db_options_.info_log,
            "Unable to release oldest log due to uncommitted transaction");
        unable_to_release_oldest_log_ = true;
        flush_wont_release_oldest_log = true;
      }
    }
  }
  if (!flush_wont_release_oldest_log) {
    // we only mark this log as getting flushed if we have successfully
    // flushed all data in this log. If this log contains outstanding prepared
    // transactions then we cannot flush this log until those transactions are
    // committed.
    unable_to_release_oldest_log_ = false;
    alive_log_files_.begin()->getting_flushed = true;
  }

  ROCKS_LOG_INFO(
      immutable_db_options_.info_log,
      "Flushing all column families with data in WAL number %" PRIu64
      ". Total log size is %" PRIu64 " while max_total_wal_size is %" PRIu64,
      oldest_alive_log, total_log_size_.load(), GetMaxTotalWalSize());
  // no need to refcount because drop is happening in write thread, so can't
  // happen while we're in the write thread
  autovector<ColumnFamilyData*> cfds;
  if (immutable_db_options_.atomic_flush) {
    SelectColumnFamiliesForAtomicFlush(&cfds);
  } else {
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      if (cfd->IsDropped()) {
        continue;
      }
      if (cfd->OldestLogToKeep() <= oldest_alive_log) {
        cfds.push_back(cfd);
      }
    }
    MaybeFlushStatsCF(&cfds);
  }
  WriteThread::Writer nonmem_w;
  if (two_write_queues_) {
    nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
  }

  for (const auto cfd : cfds) {
    cfd->Ref();
    status = SwitchMemtable(cfd, write_context);
    cfd->UnrefAndTryDelete();
    if (!status.ok()) {
      break;
    }
  }
  if (two_write_queues_) {
    nonmem_write_thread_.ExitUnbatched(&nonmem_w);
  }

  if (status.ok()) {
    if (immutable_db_options_.atomic_flush) {
      AssignAtomicFlushSeq(cfds);
    }
    for (auto cfd : cfds) {
      cfd->imm()->FlushRequested();
    }
    FlushRequest flush_req;
    GenerateFlushRequest(cfds, &flush_req);
    SchedulePendingFlush(flush_req, FlushReason::kWriteBufferManager);
    MaybeScheduleFlushOrCompaction();
  }
  return status;
}


Status DBImpl::HandleWriteBufferFull(WriteContext* write_context) {
  mutex_.AssertHeld();
  assert(write_context != nullptr);
  Status status;

  // Before a new memtable is added in SwitchMemtable(),
  // write_buffer_manager_->ShouldFlush() will keep returning true. If another
  // thread is writing to another DB with the same write buffer, they may also
  // be flushed. We may end up flushing many more DBs than needed. It's
  // suboptimal but still correct.
  ROCKS_LOG_INFO(
      immutable_db_options_.info_log,
      "Flushing column family with oldest memtable entry. Write buffer is "
      "using %" ROCKSDB_PRIszt " bytes out of a total of %" ROCKSDB_PRIszt ".",
      write_buffer_manager_->memory_usage(),
      write_buffer_manager_->buffer_size());
  // No need to refcount because a column family drop happens in the write
  // thread, so it can't happen while we're in the write thread.
  autovector<ColumnFamilyData*> cfds;
  if (immutable_db_options_.atomic_flush) {
    SelectColumnFamiliesForAtomicFlush(&cfds);
  } else {
    ColumnFamilyData* cfd_picked = nullptr;
    SequenceNumber seq_num_for_cf_picked = kMaxSequenceNumber;

    for (auto cfd : *versions_->GetColumnFamilySet()) {
      if (cfd->IsDropped()) {
        continue;
      }
      if (!cfd->mem()->IsEmpty()) {
        // We only consider the active memtable, hoping the immutable memtable
        // is already in the process of flushing.
        uint64_t seq = cfd->mem()->GetCreationSeq();
        if (cfd_picked == nullptr || seq < seq_num_for_cf_picked) {
          cfd_picked = cfd;
          seq_num_for_cf_picked = seq;
        }
      }
    }
    if (cfd_picked != nullptr) {
      cfds.push_back(cfd_picked);
    }
    MaybeFlushStatsCF(&cfds);
  }

  WriteThread::Writer nonmem_w;
  if (two_write_queues_) {
    nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
  }
  for (const auto cfd : cfds) {
    if (cfd->mem()->IsEmpty()) {
      continue;
    }
    cfd->Ref();
    status = SwitchMemtable(cfd, write_context);
    cfd->UnrefAndTryDelete();
    if (!status.ok()) {
      break;
    }
  }
  if (two_write_queues_) {
    nonmem_write_thread_.ExitUnbatched(&nonmem_w);
  }

  if (status.ok()) {
    if (immutable_db_options_.atomic_flush) {
      AssignAtomicFlushSeq(cfds);
    }
    for (const auto cfd : cfds) {
      cfd->imm()->FlushRequested();
    }
    FlushRequest flush_req;
    GenerateFlushRequest(cfds, &flush_req);
    SchedulePendingFlush(flush_req, FlushReason::kWriteBufferFull);
    MaybeScheduleFlushOrCompaction();
  }
  return status;
}

uint64_t DBImpl::GetMaxTotalWalSize() const {
  mutex_.AssertHeld();
  return mutable_db_options_.max_total_wal_size == 0
             ? 4 * max_total_in_memory_state_
             : mutable_db_options_.max_total_wal_size;
}
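
// Worked example (illustrative numbers, not part of the original file): with
// max_total_wal_size left at its default of 0 and a memtable budget of, say,
// max_total_in_memory_state_ == 192 MB across all column families, the
// effective WAL size limit above works out to 4 * 192 MB = 768 MB.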

// REQUIRES: mutex_ is held
// REQUIRES: this thread is currently at the front of the writer queue
Status DBImpl::DelayWrite(uint64_t num_bytes,
                          const WriteOptions& write_options) {
  uint64_t time_delayed = 0;
  bool delayed = false;
  {
    StopWatch sw(env_, stats_, WRITE_STALL, &time_delayed);
    uint64_t delay = write_controller_.GetDelay(env_, num_bytes);
    if (delay > 0) {
      if (write_options.no_slowdown) {
        return Status::Incomplete("Write stall");
      }
      TEST_SYNC_POINT("DBImpl::DelayWrite:Sleep");

      // Notify write_thread_ about the stall so it can set up a barrier and
      // fail any pending writers with no_slowdown
      write_thread_.BeginWriteStall();
      TEST_SYNC_POINT("DBImpl::DelayWrite:BeginWriteStallDone");
      mutex_.Unlock();
      // We will delay the write until we have slept for `delay` microseconds
      // or we don't need a delay anymore
      const uint64_t kDelayInterval = 1000;
      uint64_t stall_end = sw.start_time() + delay;
      while (write_controller_.NeedsDelay()) {
        if (env_->NowMicros() >= stall_end) {
          // We already delayed this write `delay` microseconds
          break;
        }

        delayed = true;
        // Sleep for 0.001 seconds
        env_->SleepForMicroseconds(kDelayInterval);
      }
      mutex_.Lock();
      write_thread_.EndWriteStall();
    }

    // Don't wait if there's a background error, even if it's a soft error. We
    // might wait here indefinitely as the background compaction may never
    // finish successfully, resulting in the stall condition lasting
    // indefinitely
    while (error_handler_.GetBGError().ok() && write_controller_.IsStopped()) {
      if (write_options.no_slowdown) {
        return Status::Incomplete("Write stall");
      }
      delayed = true;

      // Notify write_thread_ about the stall so it can set up a barrier and
      // fail any pending writers with no_slowdown
      write_thread_.BeginWriteStall();
      TEST_SYNC_POINT("DBImpl::DelayWrite:Wait");
      bg_cv_.Wait();
      write_thread_.EndWriteStall();
    }
  }
  assert(!delayed || !write_options.no_slowdown);
  if (delayed) {
    default_cf_internal_stats_->AddDBStats(
        InternalStats::kIntStatsWriteStallMicros, time_delayed);
    RecordTick(stats_, STALL_MICROS, time_delayed);
  }

  // If DB is not in read-only mode and write_controller is not stopping
  // writes, we can ignore any background errors and allow the write to
  // proceed
  Status s;
  if (write_controller_.IsStopped()) {
    // If writes are still stopped, it means we bailed due to a background
    // error
    s = Status::Incomplete(error_handler_.GetBGError().ToString());
  }
  if (error_handler_.IsDBStopped()) {
    s = error_handler_.GetBGError();
  }
  return s;
}
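
// Illustrative sketch (not part of the original file): how a caller opts out
// of the stall behavior implemented above. With no_slowdown set, a delayed or
// stopped write returns Status::Incomplete("Write stall") instead of sleeping
// or blocking, so the caller can fail fast and retry later. The function name
// is hypothetical.
Status ExamplePutWithoutStalling(DB* db, const Slice& key,
                                 const Slice& value) {
  WriteOptions wo;
  wo.no_slowdown = true;  // return Status::Incomplete() rather than stall
  Status s = db->Put(wo, db->DefaultColumnFamily(), key, value);
  if (s.IsIncomplete()) {
    // The DB is currently delaying or stopping writes; back off and retry.
  }
  return s;
}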

Status DBImpl::ThrottleLowPriWritesIfNeeded(const WriteOptions& write_options,
                                            WriteBatch* my_batch) {
  assert(write_options.low_pri);
  // This is called outside the DB mutex. Although it is safe to make the call,
  // the consistency condition is not guaranteed to hold. It's OK to live with
  // it in this case.
  // If we need to speed up compaction, it means compaction is falling behind,
  // so we start rate-limiting low-pri writes.
  if (write_controller_.NeedSpeedupCompaction()) {
    if (allow_2pc() && (my_batch->HasCommit() || my_batch->HasRollback())) {
      // For 2PC, we only rate limit prepare, not commit.
      return Status::OK();
    }
    if (write_options.no_slowdown) {
      return Status::Incomplete("Low priority write stall");
    } else {
      assert(my_batch != nullptr);
      // Rate limit those writes. The reason we don't wait outright is that,
      // under a heavy write load, low-pri writes might otherwise never get a
      // chance to run. This way we still guarantee slow but steady progress.
      PERF_TIMER_GUARD(write_delay_time);
      write_controller_.low_pri_rate_limiter()->Request(
          my_batch->GetDataSize(), Env::IO_HIGH, nullptr /* stats */,
          RateLimiter::OpType::kWrite);
    }
  }
  return Status::OK();
}
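
// Illustrative sketch (not part of the original file): how a background or
// bulk-load writer marks itself as low priority so the throttling above can
// apply to it. The function name is hypothetical.
Status ExampleLowPriorityPut(DB* db, const Slice& key, const Slice& value) {
  WriteOptions wo;
  wo.low_pri = true;  // eligible for rate limiting when compaction lags behind
  return db->Put(wo, db->DefaultColumnFamily(), key, value);
}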

void DBImpl::MaybeFlushStatsCF(autovector<ColumnFamilyData*>* cfds) {
  assert(cfds != nullptr);
  if (!cfds->empty() && immutable_db_options_.persist_stats_to_disk) {
    ColumnFamilyData* cfd_stats =
        versions_->GetColumnFamilySet()->GetColumnFamily(
            kPersistentStatsColumnFamilyName);
    if (cfd_stats != nullptr && !cfd_stats->mem()->IsEmpty()) {
      for (ColumnFamilyData* cfd : *cfds) {
        if (cfd == cfd_stats) {
          // stats CF already included in cfds
          return;
        }
      }
      // force flush stats CF when its log number is less than all other CF's
      // log numbers
      bool force_flush_stats_cf = true;
      for (auto* loop_cfd : *versions_->GetColumnFamilySet()) {
        if (loop_cfd == cfd_stats) {
          continue;
        }
        if (loop_cfd->GetLogNumber() <= cfd_stats->GetLogNumber()) {
          force_flush_stats_cf = false;
        }
      }
      if (force_flush_stats_cf) {
        cfds->push_back(cfd_stats);
        ROCKS_LOG_INFO(immutable_db_options_.info_log,
                       "Force flushing stats CF with automated flush "
                       "to avoid holding old logs");
      }
    }
  }
}

Status DBImpl::TrimMemtableHistory(WriteContext* context) {
  autovector<ColumnFamilyData*> cfds;
  ColumnFamilyData* tmp_cfd;
  while ((tmp_cfd = trim_history_scheduler_.TakeNextColumnFamily()) !=
         nullptr) {
    cfds.push_back(tmp_cfd);
  }
  for (auto& cfd : cfds) {
    autovector<MemTable*> to_delete;
    cfd->imm()->TrimHistory(&to_delete, cfd->mem()->ApproximateMemoryUsage());
    if (!to_delete.empty()) {
      for (auto m : to_delete) {
        delete m;
      }
      context->superversion_context.NewSuperVersion();
      assert(context->superversion_context.new_superversion.get() != nullptr);
      cfd->InstallSuperVersion(&context->superversion_context, &mutex_);
    }

    if (cfd->UnrefAndTryDelete()) {
      cfd = nullptr;
    }
  }
  return Status::OK();
}

Status DBImpl::ScheduleFlushes(WriteContext* context) {
  autovector<ColumnFamilyData*> cfds;
  if (immutable_db_options_.atomic_flush) {
    SelectColumnFamiliesForAtomicFlush(&cfds);
    for (auto cfd : cfds) {
      cfd->Ref();
    }
    flush_scheduler_.Clear();
  } else {
    ColumnFamilyData* tmp_cfd;
    while ((tmp_cfd = flush_scheduler_.TakeNextColumnFamily()) != nullptr) {
      cfds.push_back(tmp_cfd);
    }
    MaybeFlushStatsCF(&cfds);
  }
  Status status;
  WriteThread::Writer nonmem_w;
  if (two_write_queues_) {
    nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
  }

  for (auto& cfd : cfds) {
    if (!cfd->mem()->IsEmpty()) {
      status = SwitchMemtable(cfd, context);
    }
    if (cfd->UnrefAndTryDelete()) {
      cfd = nullptr;
    }
    if (!status.ok()) {
      break;
    }
  }

  if (two_write_queues_) {
    nonmem_write_thread_.ExitUnbatched(&nonmem_w);
  }

  if (status.ok()) {
    if (immutable_db_options_.atomic_flush) {
      AssignAtomicFlushSeq(cfds);
    }
    FlushRequest flush_req;
    GenerateFlushRequest(cfds, &flush_req);
    SchedulePendingFlush(flush_req, FlushReason::kWriteBufferFull);
    MaybeScheduleFlushOrCompaction();
  }
  return status;
}

#ifndef ROCKSDB_LITE
void DBImpl::NotifyOnMemTableSealed(ColumnFamilyData* /*cfd*/,
                                    const MemTableInfo& mem_table_info) {
  if (immutable_db_options_.listeners.size() == 0U) {
    return;
  }
  if (shutting_down_.load(std::memory_order_acquire)) {
    return;
  }

  for (auto listener : immutable_db_options_.listeners) {
    listener->OnMemTableSealed(mem_table_info);
  }
}
#endif  // ROCKSDB_LITE
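
#ifndef ROCKSDB_LITE
// Illustrative sketch (not part of the original file): a minimal listener a
// client could register via DBOptions::listeners to observe the callback
// issued above. The class name is hypothetical and the bookkeeping is kept
// deliberately trivial.
class ExampleMemTableSealListener : public EventListener {
 public:
  void OnMemTableSealed(const MemTableInfo& info) override {
    // MemTableInfo carries cf_name, first_seqno, earliest_seqno, num_entries
    // and num_deletes, as populated in SwitchMemtable() below.
    (void)info;
    sealed_memtables_.fetch_add(1, std::memory_order_relaxed);
  }
  std::atomic<uint64_t> sealed_memtables_{0};
};
#endif  // ROCKSDB_LITE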

// REQUIRES: mutex_ is held
// REQUIRES: this thread is currently at the front of the writer queue
// REQUIRES: this thread is currently at the front of the 2nd writer queue if
// two_write_queues_ is true (This is to simplify the reasoning.)
Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
  mutex_.AssertHeld();
  WriteThread::Writer nonmem_w;
  std::unique_ptr<WritableFile> lfile;
  log::Writer* new_log = nullptr;
  MemTable* new_mem = nullptr;

  // Recoverable state is persisted in WAL. After memtable switch, WAL might
  // be deleted, so we write the state to memtable to be persisted as well.
  Status s = WriteRecoverableState();
  if (!s.ok()) {
    return s;
  }

  // Attempt to switch to a new memtable and trigger flush of old.
  // Do this without holding the dbmutex lock.
  assert(versions_->prev_log_number() == 0);
  if (two_write_queues_) {
    log_write_mutex_.Lock();
  }
  bool creating_new_log = !log_empty_;
  if (two_write_queues_) {
    log_write_mutex_.Unlock();
  }
  uint64_t recycle_log_number = 0;
  if (creating_new_log && immutable_db_options_.recycle_log_file_num &&
      !log_recycle_files_.empty()) {
    recycle_log_number = log_recycle_files_.front();
  }
  uint64_t new_log_number =
      creating_new_log ? versions_->NewFileNumber() : logfile_number_;
  const MutableCFOptions mutable_cf_options = *cfd->GetLatestMutableCFOptions();

  // Set memtable_info for memtable sealed callback
#ifndef ROCKSDB_LITE
  MemTableInfo memtable_info;
  memtable_info.cf_name = cfd->GetName();
  memtable_info.first_seqno = cfd->mem()->GetFirstSequenceNumber();
  memtable_info.earliest_seqno = cfd->mem()->GetEarliestSequenceNumber();
  memtable_info.num_entries = cfd->mem()->num_entries();
  memtable_info.num_deletes = cfd->mem()->num_deletes();
#endif  // ROCKSDB_LITE
  // Log this later after lock release. It may be outdated, e.g., if background
  // flush happens before logging, but that should be ok.
  int num_imm_unflushed = cfd->imm()->NumNotFlushed();
  const auto preallocate_block_size =
      GetWalPreallocateBlockSize(mutable_cf_options.write_buffer_size);
  mutex_.Unlock();
  if (creating_new_log) {
    // TODO: Write buffer size passed in should be max of all CF's instead
    // of mutable_cf_options.write_buffer_size.
    s = CreateWAL(new_log_number, recycle_log_number, preallocate_block_size,
                  &new_log);
  }
  if (s.ok()) {
    SequenceNumber seq = versions_->LastSequence();
    new_mem = cfd->ConstructNewMemtable(mutable_cf_options, seq);
    context->superversion_context.NewSuperVersion();
  }
  ROCKS_LOG_INFO(immutable_db_options_.info_log,
                 "[%s] New memtable created with log file: #%" PRIu64
                 ". Immutable memtables: %d.\n",
                 cfd->GetName().c_str(), new_log_number, num_imm_unflushed);
  mutex_.Lock();
  if (recycle_log_number != 0) {
    // Since renaming the file is done outside DB mutex, we need to ensure
    // concurrent full purges don't delete the file while we're recycling it.
    // To achieve that we hold the old log number in the recyclable list until
    // after it has been renamed.
    assert(log_recycle_files_.front() == recycle_log_number);
    log_recycle_files_.pop_front();
  }
  if (s.ok() && creating_new_log) {
    log_write_mutex_.Lock();
    assert(new_log != nullptr);
    if (!logs_.empty()) {
      // Always flush the buffer of the last log before switching to a new one
      log::Writer* cur_log_writer = logs_.back().writer;
      s = cur_log_writer->WriteBuffer();
      if (!s.ok()) {
        ROCKS_LOG_WARN(immutable_db_options_.info_log,
                       "[%s] Failed to switch from #%" PRIu64 " to #%" PRIu64
                       " WAL file\n",
                       cfd->GetName().c_str(), cur_log_writer->get_log_number(),
                       new_log_number);
      }
    }
    if (s.ok()) {
      logfile_number_ = new_log_number;
      log_empty_ = true;
      log_dir_synced_ = false;
      logs_.emplace_back(logfile_number_, new_log);
      alive_log_files_.push_back(LogFileNumberSize(logfile_number_));
    }
    log_write_mutex_.Unlock();
  }

  if (!s.ok()) {
    // how do we fail if we're not creating new log?
    assert(creating_new_log);
    if (new_mem) {
      delete new_mem;
    }
    if (new_log) {
      delete new_log;
    }
    SuperVersion* new_superversion =
        context->superversion_context.new_superversion.release();
    if (new_superversion != nullptr) {
      delete new_superversion;
    }
    // We may have lost data from the WritableFileBuffer in-memory buffer for
    // the current log, so treat it as a fatal error and set bg_error
    error_handler_.SetBGError(s, BackgroundErrorReason::kMemTable);
    // Read back bg_error in order to get the right severity
    s = error_handler_.GetBGError();
    return s;
  }

  for (auto loop_cfd : *versions_->GetColumnFamilySet()) {
    // all this is just optimization to delete logs that
    // are no longer needed -- if CF is empty, that means it
    // doesn't need that particular log to stay alive, so we just
    // advance the log number. no need to persist this in the manifest
    if (loop_cfd->mem()->GetFirstSequenceNumber() == 0 &&
        loop_cfd->imm()->NumNotFlushed() == 0) {
      if (creating_new_log) {
        loop_cfd->SetLogNumber(logfile_number_);
      }
      loop_cfd->mem()->SetCreationSeq(versions_->LastSequence());
    }
  }

  cfd->mem()->SetNextLogNumber(logfile_number_);
  cfd->imm()->Add(cfd->mem(), &context->memtables_to_free_);
  new_mem->Ref();
  cfd->SetMemtable(new_mem);
  InstallSuperVersionAndScheduleWork(cfd, &context->superversion_context,
                                     mutable_cf_options);
#ifndef ROCKSDB_LITE
  mutex_.Unlock();
  // Notify client that memtable is sealed, now that we have successfully
  // installed a new memtable
  NotifyOnMemTableSealed(cfd, memtable_info);
  mutex_.Lock();
#endif  // ROCKSDB_LITE
  return s;
}

size_t DBImpl::GetWalPreallocateBlockSize(uint64_t write_buffer_size) const {
  mutex_.AssertHeld();
  size_t bsize =
      static_cast<size_t>(write_buffer_size / 10 + write_buffer_size);
  // Some users might set very high write_buffer_size and rely on
  // max_total_wal_size or other parameters to control the WAL size.
  if (mutable_db_options_.max_total_wal_size > 0) {
    bsize = std::min<size_t>(
        bsize, static_cast<size_t>(mutable_db_options_.max_total_wal_size));
  }
  if (immutable_db_options_.db_write_buffer_size > 0) {
    bsize = std::min<size_t>(bsize, immutable_db_options_.db_write_buffer_size);
  }
  if (immutable_db_options_.write_buffer_manager &&
      immutable_db_options_.write_buffer_manager->enabled()) {
    bsize = std::min<size_t>(
        bsize, immutable_db_options_.write_buffer_manager->buffer_size());
  }

  return bsize;
}
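
// Worked example (illustrative numbers, not part of the original file): with
// write_buffer_size = 64 MB and none of the caps above configured, WAL space
// is preallocated in blocks of 64 MB + 64 MB / 10, roughly 70 MB. If, say,
// max_total_wal_size were set to 32 MB, the block size would be capped at
// 32 MB instead.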

// Default implementations of convenience methods that subclasses of DB
// can call if they wish
Status DB::Put(const WriteOptions& opt, ColumnFamilyHandle* column_family,
               const Slice& key, const Slice& value) {
  if (nullptr == opt.timestamp) {
    // Pre-allocate size of write batch conservatively.
    // 8 bytes are taken by header, 4 bytes for count, 1 byte for type,
    // and we allocate 11 extra bytes for key length, as well as value length.
    WriteBatch batch(key.size() + value.size() + 24);
    Status s = batch.Put(column_family, key, value);
    if (!s.ok()) {
      return s;
    }
    return Write(opt, &batch);
  }
  const Slice* ts = opt.timestamp;
  assert(nullptr != ts);
  size_t ts_sz = ts->size();
  WriteBatch batch(key.size() + ts_sz + value.size() + 24, /*max_bytes=*/0,
                   ts_sz);
  Status s = batch.Put(column_family, key, value);
  if (!s.ok()) {
    return s;
  }
  s = batch.AssignTimestamp(*ts);
  if (!s.ok()) {
    return s;
  }
  return Write(opt, &batch);
}
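
// Illustrative sketch (not part of the original file): the convenience method
// above is equivalent to building a single-entry WriteBatch and handing it to
// Write(), which is all it does internally. The function name is hypothetical.
Status ExamplePutViaExplicitBatch(DB* db, ColumnFamilyHandle* cf,
                                  const Slice& key, const Slice& value) {
  WriteBatch batch;
  Status s = batch.Put(cf, key, value);
  if (!s.ok()) {
    return s;
  }
  return db->Write(WriteOptions(), &batch);
}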

Status DB::Delete(const WriteOptions& opt, ColumnFamilyHandle* column_family,
                  const Slice& key) {
  WriteBatch batch;
  batch.Delete(column_family, key);
  return Write(opt, &batch);
}

Status DB::SingleDelete(const WriteOptions& opt,
                        ColumnFamilyHandle* column_family, const Slice& key) {
  WriteBatch batch;
  batch.SingleDelete(column_family, key);
  return Write(opt, &batch);
}

Status DB::DeleteRange(const WriteOptions& opt,
                       ColumnFamilyHandle* column_family,
                       const Slice& begin_key, const Slice& end_key) {
  WriteBatch batch;
  batch.DeleteRange(column_family, begin_key, end_key);
  return Write(opt, &batch);
}

Status DB::Merge(const WriteOptions& opt, ColumnFamilyHandle* column_family,
                 const Slice& key, const Slice& value) {
  WriteBatch batch;
  Status s = batch.Merge(column_family, key, value);
  if (!s.ok()) {
    return s;
  }
  return Write(opt, &batch);
}
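
// Illustrative sketch (not part of the original file): unlike the per-call
// wrappers above, several operations can be grouped into one WriteBatch so
// that a single Write() applies them atomically. The function name and keys
// are hypothetical.
Status ExampleAtomicMultiWrite(DB* db, ColumnFamilyHandle* cf) {
  WriteBatch batch;
  Status s = batch.Put(cf, "k1", "v1");
  if (s.ok()) {
    s = batch.Delete(cf, "k2");
  }
  if (s.ok()) {
    s = batch.Merge(cf, "counter", "1");
  }
  if (!s.ok()) {
    return s;
  }
  return db->Write(WriteOptions(), &batch);
}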

}  // namespace ROCKSDB_NAMESPACE