write_thread.cc

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include "db/write_thread.h"

#include <chrono>
#include <thread>

#include "db/column_family.h"
#include "monitoring/perf_context_imp.h"
#include "port/port.h"
#include "test_util/sync_point.h"
#include "util/random.h"

namespace ROCKSDB_NAMESPACE {

WriteThread::WriteThread(const ImmutableDBOptions& db_options)
    : max_yield_usec_(db_options.enable_write_thread_adaptive_yield
                          ? db_options.write_thread_max_yield_usec
                          : 0),
      slow_yield_usec_(db_options.write_thread_slow_yield_usec),
      allow_concurrent_memtable_write_(
          db_options.allow_concurrent_memtable_write),
      enable_pipelined_write_(db_options.enable_pipelined_write),
      max_write_batch_group_size_bytes(
          db_options.max_write_batch_group_size_bytes),
      newest_writer_(nullptr),
      newest_memtable_writer_(nullptr),
      last_sequence_(0),
      write_stall_dummy_(),
      stall_mu_(),
      stall_cv_(&stall_mu_) {}

uint8_t WriteThread::BlockingAwaitState(Writer* w, uint8_t goal_mask) {
  // We're going to block. Lazily create the mutex. We guarantee
  // propagation of this construction to the waker via the
  // STATE_LOCKED_WAITING state. The waker won't try to touch the mutex
  // or the condvar unless they CAS away the STATE_LOCKED_WAITING that
  // we install below.
  w->CreateMutex();

  auto state = w->state.load(std::memory_order_acquire);
  assert(state != STATE_LOCKED_WAITING);
  if ((state & goal_mask) == 0 &&
      w->state.compare_exchange_strong(state, STATE_LOCKED_WAITING)) {
    // we have permission (and an obligation) to use StateMutex
    std::unique_lock<std::mutex> guard(w->StateMutex());
    w->StateCV().wait(guard, [w] {
      return w->state.load(std::memory_order_relaxed) != STATE_LOCKED_WAITING;
    });
    state = w->state.load(std::memory_order_relaxed);
  }
  // else tricky. Goal is met or CAS failed. In the latter case the waker
  // must have changed the state, and compare_exchange_strong has updated
  // our local variable with the new one. At the moment WriteThread never
  // waits for a transition across intermediate states, so we know that
  // since a state change has occurred the goal must have been met.
  assert((state & goal_mask) != 0);
  return state;
}
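
// Note on the waker side of BlockingAwaitState's handshake (see SetState
// below): if the waiter managed to install STATE_LOCKED_WAITING, the waker's
// compare_exchange_strong in SetState fails, so the waker takes StateMutex(),
// stores the new state, and notifies StateCV(), which releases the wait above.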

uint8_t WriteThread::AwaitState(Writer* w, uint8_t goal_mask,
                                AdaptationContext* ctx) {
  uint8_t state = 0;

  // 1. Busy loop using "pause" for 1 micro sec
  // 2. Else SOMETIMES busy loop using "yield" for 100 micro sec (default)
  // 3. Else blocking wait

  // On a modern Xeon each loop takes about 7 nanoseconds (most of which
  // is the effect of the pause instruction), so 200 iterations is a bit
  // more than a microsecond. This is long enough that waits longer than
  // this can amortize the cost of accessing the clock and yielding.
  for (uint32_t tries = 0; tries < 200; ++tries) {
    state = w->state.load(std::memory_order_acquire);
    if ((state & goal_mask) != 0) {
      return state;
    }
    port::AsmVolatilePause();
  }

  // This is below the fast path, so that the stat is zero when all writes are
  // from the same thread.
  PERF_TIMER_FOR_WAIT_GUARD(write_thread_wait_nanos);

  // If we're only going to end up waiting a short period of time,
  // it can be a lot more efficient to call std::this_thread::yield()
  // in a loop than to block in StateMutex(). For reference, on my 4.0
  // SELinux test server with support for syscall auditing enabled, the
  // minimum latency between FUTEX_WAKE to returning from FUTEX_WAIT is
  // 2.7 usec, and the average is more like 10 usec. That can be a big
  // drag on RocksDB's single-writer design. Of course, spinning is a
  // bad idea if other threads are waiting to run or if we're going to
  // wait for a long time. How do we decide?
  //
  // We break waiting into 3 categories: short-uncontended,
  // short-contended, and long. If we had an oracle, then we would always
  // spin for short-uncontended, always block for long, and our choice for
  // short-contended might depend on whether we were trying to optimize
  // RocksDB throughput or avoid being greedy with system resources.
  //
  // Bucketing into short or long is easy by measuring elapsed time.
  // Differentiating short-uncontended from short-contended is a bit
  // trickier, but not too bad. We could look for involuntary context
  // switches using getrusage(RUSAGE_THREAD, ..), but it's less work
  // (portability code and CPU) to just look for yield calls that take
  // longer than we expect. sched_yield() doesn't actually result in any
  // context switch overhead if there are no other runnable processes
  // on the current core, in which case it usually takes less than
  // a microsecond.
  //
  // There are two primary tunables here: the threshold between "short"
  // and "long" waits, and the threshold at which we suspect that a yield
  // is slow enough to indicate we should probably block. If these
  // thresholds are chosen well then CPU-bound workloads that don't
  // have more threads than cores will experience few context switches
  // (voluntary or involuntary), and the total number of context switches
  // (voluntary and involuntary) will not be dramatically larger (maybe
  // 2x) than the number of voluntary context switches that occur when
  // --max_yield_wait_micros=0.
  //
  // There's another constant, which is the number of slow yields we will
  // tolerate before reversing our previous decision. Solitary slow
  // yields are pretty common (low-priority small jobs ready to run),
  // so this should be at least 2. We set this conservatively to 3 so
  // that we can also immediately schedule a ctx adaptation, rather than
  // waiting for the next update_ctx.

  const size_t kMaxSlowYieldsWhileSpinning = 3;

  // Whether the yield approach has any credit in this context. The credit is
  // added by yield being successful before timing out, and decreased
  // otherwise.
  auto& yield_credit = ctx->value;
  // Update the yield_credit based on sample runs or right after a hard failure
  bool update_ctx = false;
  // Should we reinforce the yield credit
  bool would_spin_again = false;
  // The sampling base for updating the yield credit. The sampling rate would
  // be 1/sampling_base.
  const int sampling_base = 256;

  if (max_yield_usec_ > 0) {
    update_ctx = Random::GetTLSInstance()->OneIn(sampling_base);

    if (update_ctx || yield_credit.load(std::memory_order_relaxed) >= 0) {
      // we're updating the adaptation statistics, or spinning has >
      // 50% chance of being shorter than max_yield_usec_ and causing no
      // involuntary context switches
      auto spin_begin = std::chrono::steady_clock::now();

      // this variable doesn't include the final yield (if any) that
      // causes the goal to be met
      size_t slow_yield_count = 0;

      auto iter_begin = spin_begin;
      while ((iter_begin - spin_begin) <=
             std::chrono::microseconds(max_yield_usec_)) {
        std::this_thread::yield();

        state = w->state.load(std::memory_order_acquire);
        if ((state & goal_mask) != 0) {
          // success
          would_spin_again = true;
          break;
        }

        auto now = std::chrono::steady_clock::now();
        if (now == iter_begin ||
            now - iter_begin >= std::chrono::microseconds(slow_yield_usec_)) {
          // conservatively count it as a slow yield if our clock isn't
          // accurate enough to measure the yield duration
          ++slow_yield_count;
          if (slow_yield_count >= kMaxSlowYieldsWhileSpinning) {
            // Not just one ivcsw, but several. Immediately update
            // yield_credit and fall back to blocking
            update_ctx = true;
            break;
          }
        }
        iter_begin = now;
      }
    }
  }

  if ((state & goal_mask) == 0) {
    TEST_SYNC_POINT_CALLBACK("WriteThread::AwaitState:BlockingWaiting", w);
    state = BlockingAwaitState(w, goal_mask);
  }

  if (update_ctx) {
    // Since our update is sample based, it is ok if a thread overwrites the
    // updates by other threads. Thus the update does not have to be atomic.
    auto v = yield_credit.load(std::memory_order_relaxed);
    // fixed point exponential decay with decay constant 1/1024, with +1
    // and -1 scaled to avoid overflow for int32_t
    //
    // On each update the positive credit is decayed by a factor of 1/1024
    // (i.e., 0.1%). If the sampled yield was successful, the credit is also
    // increased by X. Setting X=2^17 ensures that the credit never exceeds
    // 2^17*2^10=2^27, which is lower than 2^31, the upper bound of int32_t.
    // The same logic applies to negative credits.
    v = v - (v / 1024) + (would_spin_again ? 1 : -1) * 131072;
    yield_credit.store(v, std::memory_order_relaxed);
  }
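
  // Illustrative arithmetic for the update above (not part of the algorithm):
  // starting from v = 0, a run of consistently successful sampled yields
  // converges toward roughly +131072 * 1024 = 2^27, while consistent failures
  // converge toward roughly -2^27; both stay comfortably inside int32_t range.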

  assert((state & goal_mask) != 0);
  return state;
}

void WriteThread::SetState(Writer* w, uint8_t new_state) {
  assert(w);
  auto state = w->state.load(std::memory_order_acquire);
  if (state == STATE_LOCKED_WAITING ||
      !w->state.compare_exchange_strong(state, new_state)) {
    assert(state == STATE_LOCKED_WAITING);

    std::lock_guard<std::mutex> guard(w->StateMutex());
    assert(w->state.load(std::memory_order_relaxed) != new_state);
    w->state.store(new_state, std::memory_order_relaxed);
    w->StateCV().notify_one();
  }
}

bool WriteThread::LinkOne(Writer* w, std::atomic<Writer*>* newest_writer) {
  assert(newest_writer != nullptr);
  assert(w->state == STATE_INIT);
  Writer* writers = newest_writer->load(std::memory_order_relaxed);
  while (true) {
    assert(writers != w);
    // If a write stall is in effect and w->no_slowdown is not true,
    // block here until the stall is cleared. If it is true, then return
    // immediately.
    if (writers == &write_stall_dummy_) {
      if (w->no_slowdown) {
        w->status = Status::Incomplete("Write stall");
        SetState(w, STATE_COMPLETED);
        return false;
      }
      // Since no_slowdown is false, wait here to be notified of the write
      // stall clearing
      {
        MutexLock lock(&stall_mu_);
        writers = newest_writer->load(std::memory_order_relaxed);
        if (writers == &write_stall_dummy_) {
          TEST_SYNC_POINT_CALLBACK("WriteThread::WriteStall::Wait", w);
          stall_cv_.Wait();
          // Load newest_writers_ again since it may have changed
          writers = newest_writer->load(std::memory_order_relaxed);
          continue;
        }
      }
    }
    w->link_older = writers;
    if (newest_writer->compare_exchange_weak(writers, w)) {
      return (writers == nullptr);
    }
  }
}

bool WriteThread::LinkGroup(WriteGroup& write_group,
                            std::atomic<Writer*>* newest_writer) {
  assert(newest_writer != nullptr);
  Writer* leader = write_group.leader;
  Writer* last_writer = write_group.last_writer;
  Writer* w = last_writer;
  while (true) {
    // Unset link_newer pointers to make sure that when we call
    // CreateMissingNewerLinks later it creates all missing links.
    w->link_newer = nullptr;
    w->write_group = nullptr;
    if (w == leader) {
      break;
    }
    w = w->link_older;
  }
  Writer* newest = newest_writer->load(std::memory_order_relaxed);
  while (true) {
    leader->link_older = newest;
    if (newest_writer->compare_exchange_weak(newest, last_writer)) {
      return (newest == nullptr);
    }
  }
}

void WriteThread::CreateMissingNewerLinks(Writer* head) {
  while (true) {
    Writer* next = head->link_older;
    if (next == nullptr || next->link_newer != nullptr) {
      assert(next == nullptr || next->link_newer == head);
      break;
    }
    next->link_newer = head;
    head = next;
  }
}
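
// Illustration (hypothetical writers, not from the code): given a list linked
// only via link_older, head -> c -> b -> a (head being the newest writer),
// CreateMissingNewerLinks(head) fills in c->link_newer = head,
// b->link_newer = c and a->link_newer = b, stopping early if it reaches a
// suffix whose link_newer pointers are already set.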

void WriteThread::CompleteLeader(WriteGroup& write_group) {
  assert(write_group.size > 0);
  Writer* leader = write_group.leader;
  if (write_group.size == 1) {
    write_group.leader = nullptr;
    write_group.last_writer = nullptr;
  } else {
    assert(leader->link_newer != nullptr);
    leader->link_newer->link_older = nullptr;
    write_group.leader = leader->link_newer;
  }
  write_group.size -= 1;
  SetState(leader, STATE_COMPLETED);
}

void WriteThread::CompleteFollower(Writer* w, WriteGroup& write_group) {
  assert(write_group.size > 1);
  assert(w != write_group.leader);
  if (w == write_group.last_writer) {
    w->link_older->link_newer = nullptr;
    write_group.last_writer = w->link_older;
  } else {
    w->link_older->link_newer = w->link_newer;
    w->link_newer->link_older = w->link_older;
  }
  write_group.size -= 1;
  SetState(w, STATE_COMPLETED);
}

void WriteThread::BeginWriteStall() {
  ++stall_begun_count_;
  LinkOne(&write_stall_dummy_, &newest_writer_);

  // Walk the writer list until w->write_group != nullptr. The current write
  // group will not have a mix of slowdown/no_slowdown, so it's ok to stop at
  // that point.
  Writer* w = write_stall_dummy_.link_older;
  Writer* prev = &write_stall_dummy_;
  while (w != nullptr && w->write_group == nullptr) {
    if (w->no_slowdown) {
      prev->link_older = w->link_older;
      w->status = Status::Incomplete("Write stall");
      SetState(w, STATE_COMPLETED);
      // Only update `link_newer` if it's already set.
      // `CreateMissingNewerLinks()` will update the nullptr `link_newer`
      // later, which assumes the first non-nullptr `link_newer` is the last
      // nullptr link in the writer list.
      // If `link_newer` is set here, `CreateMissingNewerLinks()` may stop
      // updating the whole list when it sees the first non-nullptr link.
      if (prev->link_older && prev->link_older->link_newer) {
        prev->link_older->link_newer = prev;
      }
      w = prev->link_older;
    } else {
      prev = w;
      w = w->link_older;
    }
  }
}

void WriteThread::EndWriteStall() {
  MutexLock lock(&stall_mu_);

  // Unlink write_stall_dummy_ from the write queue. This will unblock
  // pending write threads to enqueue themselves
  assert(newest_writer_.load(std::memory_order_relaxed) ==
         &write_stall_dummy_);
  // write_stall_dummy_.link_older can be nullptr only if LockWAL() has been
  // called.
  if (write_stall_dummy_.link_older) {
    write_stall_dummy_.link_older->link_newer = write_stall_dummy_.link_newer;
  }
  newest_writer_.exchange(write_stall_dummy_.link_older);

  ++stall_ended_count_;

  // Wake up writers
  stall_cv_.SignalAll();
}

uint64_t WriteThread::GetBegunCountOfOutstandingStall() {
  if (stall_begun_count_ > stall_ended_count_) {
    // Outstanding stall in queue
    assert(newest_writer_.load(std::memory_order_relaxed) ==
           &write_stall_dummy_);
    return stall_begun_count_;
  } else {
    // No stall in queue
    assert(newest_writer_.load(std::memory_order_relaxed) !=
           &write_stall_dummy_);
    return 0;
  }
}

void WriteThread::WaitForStallEndedCount(uint64_t stall_count) {
  MutexLock lock(&stall_mu_);

  while (stall_ended_count_ < stall_count) {
    stall_cv_.Wait();
  }
}

static WriteThread::AdaptationContext jbg_ctx("JoinBatchGroup");
void WriteThread::JoinBatchGroup(Writer* w) {
  TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Start", w);
  assert(w->batch != nullptr);

  bool linked_as_leader = LinkOne(w, &newest_writer_);

  w->CheckWriteEnqueuedCallback();

  if (linked_as_leader) {
    SetState(w, STATE_GROUP_LEADER);
  }

  TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Wait", w);
  TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Wait2", w);

  if (!linked_as_leader) {
    /**
     * Wait until:
     * 1) An existing leader picks us as the new leader when it finishes
     * 2) An existing leader picks us as its follower and
     *    2.1) finishes the memtable writes on our behalf, or
     *    2.2) tells us to finish the memtable writes in parallel
     * 3) (pipelined write) An existing leader picks us as its follower and
     *    finishes book-keeping and the WAL write for us, enqueues us as a
     *    pending memtable writer, and
     *    3.1) we become memtable writer group leader, or
     *    3.2) an existing memtable writer group leader tells us to finish
     *         memtable writes in parallel.
     */
    TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:BeganWaiting", w);
    AwaitState(w,
               STATE_GROUP_LEADER | STATE_MEMTABLE_WRITER_LEADER |
                   STATE_PARALLEL_MEMTABLE_CALLER |
                   STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED,
               &jbg_ctx);
    TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:DoneWaiting", w);
  }
}

size_t WriteThread::EnterAsBatchGroupLeader(Writer* leader,
                                            WriteGroup* write_group) {
  assert(leader->link_older == nullptr);
  assert(leader->batch != nullptr);
  assert(write_group != nullptr);

  size_t size = WriteBatchInternal::ByteSize(leader->batch);

  // Allow the group to grow up to a maximum size, but if the
  // original write is small, limit the growth so we do not slow
  // down the small write too much.
  size_t max_size = max_write_batch_group_size_bytes;
  const uint64_t min_batch_size_bytes = max_write_batch_group_size_bytes / 8;
  if (size <= min_batch_size_bytes) {
    max_size = size + min_batch_size_bytes;
  }
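
  // For illustration (assuming the default max_write_batch_group_size_bytes
  // of 1 << 20 = 1 MB): min_batch_size_bytes is 128 KB, so a 4 KB leader
  // batch caps the group at roughly 132 KB rather than the full 1 MB, while
  // a 256 KB leader batch keeps the full 1 MB cap.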

  leader->write_group = write_group;
  write_group->leader = leader;
  write_group->last_writer = leader;
  write_group->size = 1;
  Writer* newest_writer = newest_writer_.load(std::memory_order_acquire);

  // This is safe regardless of any db mutex status of the caller. Previous
  // calls to ExitAsGroupLeader either didn't call CreateMissingNewerLinks
  // (they emptied the list and then we added ourself as leader) or had to
  // explicitly wake us up (the list was non-empty when we added ourself,
  // so we have already received our MarkJoined).
  CreateMissingNewerLinks(newest_writer);

  // This comment illustrates how the rest of the function works using an
  // example. Notation:
  //
  // - Items are `Writer`s
  // - Items prefixed by "@" have been included in `write_group`
  // - Items prefixed by "*" have compatible options with `leader`, but have
  //   not been included in `write_group` yet
  // - Items after several spaces are in `r_list`. These have incompatible
  //   options with `leader` and are temporarily separated from the main list.
  //
  // Each line below depicts the state of the linked lists at the beginning
  // of an iteration of the while-loop.
  //
  // @leader, n1, *n2, n3, *newest_writer
  // @leader, *n2, n3, *newest_writer,          n1
  // @leader, @n2, n3, *newest_writer,          n1
  //
  // After the while-loop, the `r_list` is grafted back onto the main list.
  //
  // case A: no new `Writer`s arrived
  // @leader, @n2, @newest_writer,              n1, n3
  // @leader, @n2, @newest_writer, n1, n3
  //
  // case B: a new `Writer` (n4) arrived
  // @leader, @n2, @newest_writer, n4           n1, n3
  // @leader, @n2, @newest_writer, n1, n3, n4

  // Tricky. Iteration start (leader) is exclusive and finish
  // (newest_writer) is inclusive. Iteration goes from old to new.
  Writer* w = leader;
  // write_group end
  Writer* we = leader;
  // declare r_list
  Writer* rb = nullptr;
  Writer* re = nullptr;

  while (w != newest_writer) {
    assert(w->link_newer);
    w = w->link_newer;

    if ((w->sync && !leader->sync) ||
        // Do not include a sync write into a batch handled by a non-sync
        // write.
        (w->no_slowdown != leader->no_slowdown) ||
        // Do not mix writes that are ok with delays with the ones that
        // request failure on delays.
        (w->disable_wal != leader->disable_wal) ||
        // Do not mix writes that enable the WAL with the ones whose WAL is
        // disabled.
        (w->protection_bytes_per_key != leader->protection_bytes_per_key) ||
        // Do not mix writes with different levels of integrity protection.
        (w->rate_limiter_priority != leader->rate_limiter_priority) ||
        // Do not mix writes with different rate limiter priorities.
        (w->batch == nullptr) ||
        // Do not include writes with a nullptr batch. Those are not writes;
        // they are something else and want to be alone.
        (w->callback != nullptr && !w->callback->AllowWriteBatching()) ||
        // Don't batch writes that don't want to be batched.
        (size + WriteBatchInternal::ByteSize(w->batch) > max_size) ||
        // Do not make the batch too big.
        (leader->ingest_wbwi || w->ingest_wbwi)
        // Ingesting a WBWI needs to be its own group.
    ) {
      // remove from list
      w->link_older->link_newer = w->link_newer;
      if (w->link_newer != nullptr) {
        w->link_newer->link_older = w->link_older;
      }
      // insert into r_list
      if (re == nullptr) {
        rb = re = w;
        w->link_older = nullptr;
      } else {
        w->link_older = re;
        re->link_newer = w;
        re = w;
      }
    } else {
      // grow the group
      we = w;
      w->write_group = write_group;
      size += WriteBatchInternal::ByteSize(w->batch);
      write_group->last_writer = w;
      write_group->size++;
    }
  }

  // append r_list after write_group end
  if (rb != nullptr) {
    rb->link_older = we;
    re->link_newer = nullptr;
    we->link_newer = rb;
    if (!newest_writer_.compare_exchange_weak(w, re)) {
      while (w->link_older != newest_writer) {
        w = w->link_older;
      }
      w->link_older = re;
    }
  }

  TEST_SYNC_POINT_CALLBACK("WriteThread::EnterAsBatchGroupLeader:End", w);
  return size;
}

void WriteThread::EnterAsMemTableWriter(Writer* leader,
                                        WriteGroup* write_group) {
  assert(leader != nullptr);
  assert(leader->link_older == nullptr);
  assert(leader->batch != nullptr);
  assert(write_group != nullptr);

  size_t size = WriteBatchInternal::ByteSize(leader->batch);

  // Allow the group to grow up to a maximum size, but if the
  // original write is small, limit the growth so we do not slow
  // down the small write too much.
  size_t max_size = max_write_batch_group_size_bytes;
  const uint64_t min_batch_size_bytes = max_write_batch_group_size_bytes / 8;
  if (size <= min_batch_size_bytes) {
    max_size = size + min_batch_size_bytes;
  }

  leader->write_group = write_group;
  write_group->leader = leader;
  write_group->size = 1;
  Writer* last_writer = leader;

  if (!allow_concurrent_memtable_write_ || !leader->batch->HasMerge()) {
    Writer* newest_writer = newest_memtable_writer_.load();
    CreateMissingNewerLinks(newest_writer);

    Writer* w = leader;
    while (w != newest_writer) {
      assert(w->link_newer);
      w = w->link_newer;

      if (w->batch == nullptr) {
        break;
      }

      if (w->batch->HasMerge()) {
        break;
      }

      if (!allow_concurrent_memtable_write_) {
        auto batch_size = WriteBatchInternal::ByteSize(w->batch);
        if (size + batch_size > max_size) {
          // Do not make the batch too big
          break;
        }
        size += batch_size;
      }

      w->write_group = write_group;
      last_writer = w;
      write_group->size++;
    }
  }

  write_group->last_writer = last_writer;
  write_group->last_sequence =
      last_writer->sequence + WriteBatchInternal::Count(last_writer->batch) - 1;
}
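
// For illustration (hypothetical numbers): if last_writer->sequence is 100 and
// its batch contains 3 operations, the last_sequence computed above is
// 100 + 3 - 1 = 102, i.e. the sequence number consumed by the final operation
// in the group.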

void WriteThread::ExitAsMemTableWriter(Writer* /*self*/,
                                       WriteGroup& write_group) {
  Writer* leader = write_group.leader;
  Writer* last_writer = write_group.last_writer;

  Writer* newest_writer = last_writer;
  if (!newest_memtable_writer_.compare_exchange_strong(newest_writer,
                                                       nullptr)) {
    CreateMissingNewerLinks(newest_writer);
    Writer* next_leader = last_writer->link_newer;
    assert(next_leader != nullptr);
    next_leader->link_older = nullptr;
    SetState(next_leader, STATE_MEMTABLE_WRITER_LEADER);
  }
  Writer* w = leader;
  while (true) {
    if (!write_group.status.ok()) {
      w->status = write_group.status;
    }
    Writer* next = w->link_newer;
    if (w != leader) {
      SetState(w, STATE_COMPLETED);
    }
    if (w == last_writer) {
      break;
    }
    assert(next);
    w = next;
  }
  // Note that the leader has to exit last, since it owns the write group.
  SetState(leader, STATE_COMPLETED);
}

void WriteThread::SetMemWritersEachStride(Writer* w) {
  WriteGroup* write_group = w->write_group;
  Writer* last_writer = write_group->last_writer;

  // The stride is the same for each writer in write_group, so w wakes up the
  // writers whose positions in write_group are congruent to its own position
  // modulo the stride.
  size_t stride = static_cast<size_t>(std::sqrt(write_group->size));
  size_t count = 0;
  while (w) {
    if (count++ % stride == 0) {
      SetState(w, STATE_PARALLEL_MEMTABLE_WRITER);
    }
    w = (w == last_writer) ? nullptr : w->link_newer;
  }
}

void WriteThread::LaunchParallelMemTableWriters(WriteGroup* write_group) {
  assert(write_group != nullptr);
  size_t group_size = write_group->size;
  write_group->running.store(group_size);

  // The minimum group size required to use the parallel caller mode.
  // The number must be no lower than 3.
  const size_t MinParallelSize = 20;

  // The group_size is too small, and there is no need to have
  // the parallel partial callers.
  if (group_size < MinParallelSize) {
    for (auto w : *write_group) {
      SetState(w, STATE_PARALLEL_MEMTABLE_WRITER);
    }
    return;
  }

  // The stride is equal to std::sqrt(group_size), which can minimize
  // the total number of SetState calls made by the leader.
  // Set the leader itself to STATE_PARALLEL_MEMTABLE_WRITER, and set
  // (stride - 1) writers to be STATE_PARALLEL_MEMTABLE_CALLER.
  size_t stride = static_cast<size_t>(std::sqrt(group_size));
  auto w = write_group->leader;
  SetState(w, STATE_PARALLEL_MEMTABLE_WRITER);

  for (size_t i = 1; i < stride; i++) {
    w = w->link_newer;
    SetState(w, STATE_PARALLEL_MEMTABLE_CALLER);
  }

  // After setting all STATE_PARALLEL_MEMTABLE_CALLER, the leader also
  // does the job of a STATE_PARALLEL_MEMTABLE_CALLER.
  w = w->link_newer;
  SetMemWritersEachStride(w);
}
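
// For illustration (hypothetical sizes): with group_size = 25 the stride is 5,
// so the leader wakes itself, marks 4 writers as CALLERs, and then wakes every
// 5th writer in the remainder of the list; the CALLER writers are expected to
// do the same for their own offsets via SetMemWritersEachStride. The leader
// therefore issues on the order of 2 * sqrt(group_size) SetState calls instead
// of group_size.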

static WriteThread::AdaptationContext cpmtw_ctx(
    "CompleteParallelMemTableWriter");
// This method is called by both the leader and parallel followers
bool WriteThread::CompleteParallelMemTableWriter(Writer* w) {
  auto* write_group = w->write_group;
  if (!w->status.ok()) {
    std::lock_guard<std::mutex> guard(write_group->leader->StateMutex());
    write_group->status = w->status;
  }

  if (write_group->running-- > 1) {
    // we're not the last one
    AwaitState(w, STATE_COMPLETED, &cpmtw_ctx);
    return false;
  }
  // else we're the last parallel worker and should perform exit duties.
  w->status = write_group->status;
  // Callers of this function must ensure w->status is checked.
  write_group->status.PermitUncheckedError();
  return true;
}
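
// Note on the decrement above: write_group->running is an atomic counter, so
// exactly one thread (the one that decrements it from 1 to 0) takes the
// "last worker" path and returns true; every other thread parks in AwaitState
// until the exit duties mark it STATE_COMPLETED.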

void WriteThread::ExitAsBatchGroupFollower(Writer* w) {
  auto* write_group = w->write_group;

  assert(w->state == STATE_PARALLEL_MEMTABLE_WRITER);
  assert(write_group->status.ok());
  ExitAsBatchGroupLeader(*write_group, write_group->status);
  assert(w->status.ok());
  assert(w->state == STATE_COMPLETED);
  SetState(write_group->leader, STATE_COMPLETED);
}

static WriteThread::AdaptationContext eabgl_ctx("ExitAsBatchGroupLeader");
void WriteThread::ExitAsBatchGroupLeader(WriteGroup& write_group,
                                         Status& status) {
  TEST_SYNC_POINT_CALLBACK("WriteThread::ExitAsBatchGroupLeader:Start",
                           &write_group);

  Writer* leader = write_group.leader;
  Writer* last_writer = write_group.last_writer;
  assert(leader->link_older == nullptr);

  // If status is non-ok already, then write_group.status won't have the
  // chance of being propagated to the caller.
  if (!status.ok()) {
    write_group.status.PermitUncheckedError();
  }

  // Propagate memtable write error to the whole group.
  if (status.ok() && !write_group.status.ok()) {
    status = write_group.status;
  }

  if (enable_pipelined_write_) {
    // We insert a dummy Writer right before our current write_group. This
    // allows us to unlink our write_group without the risk that a subsequent
    // writer becomes a new leader and might overtake us and add itself to the
    // memtable-writer-list before we can do so. This ensures that writers are
    // added to the memtable-writer-list in the exact same order in which they
    // were in the newest_writer list.
    // This must happen before completing the writers from our group to
    // prevent a race where the owning thread of one of these writers can
    // start a new write operation.
    Writer dummy;
    Writer* head = newest_writer_.load(std::memory_order_acquire);
    if (head != last_writer ||
        !newest_writer_.compare_exchange_strong(head, &dummy)) {
      // Either last_writer wasn't the head during the load(), or it was the
      // head during the load() but somebody else pushed onto the list before
      // we did the compare_exchange_strong (causing it to fail). In the
      // latter case compare_exchange_strong has the effect of re-reading its
      // first param (head). No need to retry a failing CAS, because only a
      // departing leader (which we are at the moment) can remove nodes from
      // the list.
      assert(head != last_writer);

      // After walking link_older starting from head (if not already done) we
      // will be able to traverse w->link_newer below.
      CreateMissingNewerLinks(head);
      assert(last_writer->link_newer != nullptr);
      last_writer->link_newer->link_older = &dummy;
      dummy.link_newer = last_writer->link_newer;
    }

    // Complete writers that don't write to memtable
    for (Writer* w = last_writer; w != leader;) {
      Writer* next = w->link_older;
      w->status = status;
      if (!w->ShouldWriteToMemtable()) {
        CompleteFollower(w, write_group);
      }
      w = next;
    }
    if (!leader->ShouldWriteToMemtable()) {
      CompleteLeader(write_group);
    }

    TEST_SYNC_POINT_CALLBACK(
        "WriteThread::ExitAsBatchGroupLeader:AfterCompleteWriters",
        &write_group);

    // Link the remainder of the group to the memtable writer list.
    //
    // We have to link our group to the memtable writer queue before waking up
    // the next leader or setting newest_writer_ to null, otherwise the next
    // leader can run ahead of us and link to the memtable writer queue before
    // we do.
    if (write_group.size > 0) {
      if (LinkGroup(write_group, &newest_memtable_writer_)) {
        // The leader can now be different from the current writer.
        SetState(write_group.leader, STATE_MEMTABLE_WRITER_LEADER);
      }
    }

    // Unlink the dummy writer from the list and identify the new leader
    head = newest_writer_.load(std::memory_order_acquire);
    if (head != &dummy ||
        !newest_writer_.compare_exchange_strong(head, nullptr)) {
      CreateMissingNewerLinks(head);
      Writer* new_leader = dummy.link_newer;
      assert(new_leader != nullptr);
      new_leader->link_older = nullptr;
      SetState(new_leader, STATE_GROUP_LEADER);
    }

    AwaitState(leader,
               STATE_MEMTABLE_WRITER_LEADER | STATE_PARALLEL_MEMTABLE_CALLER |
                   STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED,
               &eabgl_ctx);
  } else {
    Writer* head = newest_writer_.load(std::memory_order_acquire);
    if (head != last_writer ||
        !newest_writer_.compare_exchange_strong(head, nullptr)) {
      // Either last_writer wasn't the head during the load(), or it was the
      // head during the load() but somebody else pushed onto the list before
      // we did the compare_exchange_strong (causing it to fail). In the
      // latter case compare_exchange_strong has the effect of re-reading
      // its first param (head). No need to retry a failing CAS, because
      // only a departing leader (which we are at the moment) can remove
      // nodes from the list.
      assert(head != last_writer);

      // After walking link_older starting from head (if not already done)
      // we will be able to traverse w->link_newer below. This function
      // can only be called from an active leader, only a leader can
      // clear newest_writer_, we didn't, and only a cleared newest_writer_
      // could cause the next leader to start their work without a call
      // to MarkJoined, so we can definitely conclude that no other leader
      // work is going on here (with or without db mutex).
      CreateMissingNewerLinks(head);
      assert(last_writer->link_newer != nullptr);
      assert(last_writer->link_newer->link_older == last_writer);
      last_writer->link_newer->link_older = nullptr;

      // Next leader didn't self-identify, because newest_writer_ wasn't
      // nullptr when they enqueued (we were definitely enqueued before them
      // and are still in the list). That means leader handoff occurs when
      // we call MarkJoined
      SetState(last_writer->link_newer, STATE_GROUP_LEADER);
    }
    // else nobody else was waiting, although there might already be a new
    // leader now

    while (last_writer != leader) {
      assert(last_writer);
      last_writer->status = status;
      // we need to read link_older before calling SetState, because as soon
      // as it is marked committed the other thread's Await may return and
      // deallocate the Writer.
      auto next = last_writer->link_older;
      SetState(last_writer, STATE_COMPLETED);

      last_writer = next;
    }
  }
}

static WriteThread::AdaptationContext eu_ctx("EnterUnbatched");
void WriteThread::EnterUnbatched(Writer* w, InstrumentedMutex* mu) {
  assert(w != nullptr && w->batch == nullptr);
  mu->Unlock();
  bool linked_as_leader = LinkOne(w, &newest_writer_);
  if (!linked_as_leader) {
    TEST_SYNC_POINT("WriteThread::EnterUnbatched:Wait");
    // The last leader will not pick us as a follower since our batch is
    // nullptr
    AwaitState(w, STATE_GROUP_LEADER, &eu_ctx);
  }
  if (enable_pipelined_write_) {
    WaitForMemTableWriters();
  }
  mu->Lock();
}

void WriteThread::ExitUnbatched(Writer* w) {
  assert(w != nullptr);
  Writer* newest_writer = w;
  if (!newest_writer_.compare_exchange_strong(newest_writer, nullptr)) {
    CreateMissingNewerLinks(newest_writer);
    Writer* next_leader = w->link_newer;
    assert(next_leader != nullptr);
    next_leader->link_older = nullptr;
    SetState(next_leader, STATE_GROUP_LEADER);
  }
}

static WriteThread::AdaptationContext wfmw_ctx("WaitForMemTableWriters");
void WriteThread::WaitForMemTableWriters() {
  assert(enable_pipelined_write_);
  if (newest_memtable_writer_.load() == nullptr) {
    return;
  }
  Writer w;
  if (!LinkOne(&w, &newest_memtable_writer_)) {
    AwaitState(&w, STATE_MEMTABLE_WRITER_LEADER, &wfmw_ctx);
  }
  newest_memtable_writer_.store(nullptr);
}

}  // namespace ROCKSDB_NAMESPACE