write_thread.h

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#pragma once

#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <type_traits>
#include <vector>

#include "db/dbformat.h"
#include "db/post_memtable_callback.h"
#include "db/pre_release_callback.h"
#include "db/write_callback.h"
#include "monitoring/instrumented_mutex.h"
#include "rocksdb/options.h"
#include "rocksdb/status.h"
#include "rocksdb/types.h"
#include "rocksdb/user_write_callback.h"
#include "rocksdb/write_batch.h"
#include "util/aligned_storage.h"
#include "util/autovector.h"

namespace ROCKSDB_NAMESPACE {

class WriteThread {
 public:
  enum State : uint8_t {
    // The initial state of a writer. This is a Writer that is
    // waiting in JoinBatchGroup. This state can be left when another
    // thread informs the waiter that it has become a group leader
    // (-> STATE_GROUP_LEADER), when a leader that has chosen to be
    // non-parallel informs a follower that its writes have been committed
    // (-> STATE_COMPLETED), or when a leader that has chosen to perform
    // updates in parallel needs this Writer to apply its batch
    // (-> STATE_PARALLEL_MEMTABLE_WRITER).
    STATE_INIT = 1,

    // The state used to inform a waiting Writer that it has become the
    // leader, and it should now build a write batch group. Tricky:
    // this state is not used if newest_writer_ is empty when a writer
    // enqueues itself, because there is no need to wait (or even to
    // create the mutex and condvar used to wait) in that case. This is
    // a terminal state unless the leader chooses to make this a parallel
    // batch, in which case the last parallel worker to finish will move
    // the leader to STATE_COMPLETED.
    STATE_GROUP_LEADER = 2,

    // The state used to inform a waiting writer that it has become the
    // leader of a memtable writer group. The leader will either write
    // the memtable for the whole group, or launch a parallel group write
    // to the memtable by calling LaunchParallelMemTableWriters.
    STATE_MEMTABLE_WRITER_LEADER = 4,

    // The state used to inform a waiting writer that it has become a
    // parallel memtable writer. It can be the group leader who launched
    // the parallel writer group, or one of the followers. The writer
    // should then apply its batch to the memtable concurrently and call
    // CompleteParallelMemTableWriter.
    STATE_PARALLEL_MEMTABLE_WRITER = 8,

    // A follower whose writes have been applied, or a parallel leader
    // whose followers have all finished their work. This is a terminal
    // state.
    STATE_COMPLETED = 16,

    // A state indicating that the thread may be waiting using StateMutex()
    // and StateCV().
    STATE_LOCKED_WAITING = 32,

    // The state used to inform a waiting writer that it has become a
    // caller that wakes some other waiting writers to write to the
    // memtable, by calling SetMemWritersEachStride. After doing this,
    // it will also write to the memtable itself.
    STATE_PARALLEL_MEMTABLE_CALLER = 64,
  };
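
  // State values are powers of two so that a waiter can block on a bitmask
  // of acceptable states. A sketch of how a goal mask is composed for the
  // internal AwaitState helper (the exact mask depends on the call site):
  //
  //   uint8_t goal_mask = STATE_GROUP_LEADER | STATE_MEMTABLE_WRITER_LEADER |
  //                       STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED;
  //   uint8_t state = AwaitState(w, goal_mask, &ctx);
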
  struct Writer;

  struct WriteGroup {
    Writer* leader = nullptr;
    Writer* last_writer = nullptr;
    SequenceNumber last_sequence;
    // Before `running` goes to zero, writes to `status` need
    // leader->StateMutex().
    Status status;
    std::atomic<size_t> running;
    size_t size = 0;

    struct Iterator {
      Writer* writer;
      Writer* last_writer;

      explicit Iterator(Writer* w, Writer* last)
          : writer(w), last_writer(last) {}

      Writer* operator*() const { return writer; }

      Iterator& operator++() {
        assert(writer != nullptr);
        if (writer == last_writer) {
          writer = nullptr;
        } else {
          writer = writer->link_newer;
        }
        return *this;
      }

      bool operator!=(const Iterator& other) const {
        return writer != other.writer;
      }
    };

    Iterator begin() const { return Iterator(leader, last_writer); }
    Iterator end() const { return Iterator(nullptr, nullptr); }
  };
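
  // WriteGroup supports range-based iteration from the leader to
  // last_writer. A sketch (assuming `group` was populated by
  // EnterAsBatchGroupLeader):
  //
  //   for (WriteThread::Writer* member : group) {
  //     // Visits each Writer in order, leader first.
  //   }
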
  // Information kept for every waiting writer.
  struct Writer {
    WriteBatch* batch;
    bool sync;
    bool no_slowdown;
    bool disable_wal;
    Env::IOPriority rate_limiter_priority;
    bool disable_memtable;
    size_t batch_cnt;  // if non-zero, number of sub-batches in the write batch
    size_t protection_bytes_per_key;
    PreReleaseCallback* pre_release_callback;
    PostMemTableCallback* post_memtable_callback;
    uint64_t wal_used;  // log number that this batch was inserted into
    uint64_t log_ref;   // log number that memtable insert should reference
    WriteCallback* callback;
    UserWriteCallback* user_write_cb;
    bool made_waitable;          // records lazy construction of mutex and cv
    std::atomic<uint8_t> state;  // write under StateMutex() or pre-link
    WriteGroup* write_group;
    SequenceNumber sequence;  // the sequence number to use for the first key
    Status status;
    Status callback_status;  // status returned by callback->Callback()

    aligned_storage<std::mutex>::type state_mutex_bytes;
    aligned_storage<std::condition_variable>::type state_cv_bytes;
    Writer* link_older;  // read/write only before linking, or as leader
    Writer* link_newer;  // lazy, read/write only before linking, or as leader
    bool ingest_wbwi;

    Writer()
        : batch(nullptr),
          sync(false),
          no_slowdown(false),
          disable_wal(false),
          rate_limiter_priority(Env::IOPriority::IO_TOTAL),
          disable_memtable(false),
          batch_cnt(0),
          protection_bytes_per_key(0),
          pre_release_callback(nullptr),
          post_memtable_callback(nullptr),
          wal_used(0),
          log_ref(0),
          callback(nullptr),
          user_write_cb(nullptr),
          made_waitable(false),
          state(STATE_INIT),
          write_group(nullptr),
          sequence(kMaxSequenceNumber),
          link_older(nullptr),
          link_newer(nullptr),
          ingest_wbwi(false) {}

    Writer(const WriteOptions& write_options, WriteBatch* _batch,
           WriteCallback* _callback, UserWriteCallback* _user_write_cb,
           uint64_t _log_ref, bool _disable_memtable, size_t _batch_cnt = 0,
           PreReleaseCallback* _pre_release_callback = nullptr,
           PostMemTableCallback* _post_memtable_callback = nullptr,
           bool _ingest_wbwi = false)
        : batch(_batch),
          // TODO: store a copy of WriteOptions instead of its separated data
          // members
          sync(write_options.sync),
          no_slowdown(write_options.no_slowdown),
          disable_wal(write_options.disableWAL),
          rate_limiter_priority(write_options.rate_limiter_priority),
          disable_memtable(_disable_memtable),
          batch_cnt(_batch_cnt),
          protection_bytes_per_key(_batch->GetProtectionBytesPerKey()),
          pre_release_callback(_pre_release_callback),
          post_memtable_callback(_post_memtable_callback),
          wal_used(0),
          log_ref(_log_ref),
          callback(_callback),
          user_write_cb(_user_write_cb),
          made_waitable(false),
          state(STATE_INIT),
          write_group(nullptr),
          sequence(kMaxSequenceNumber),
          link_older(nullptr),
          link_newer(nullptr),
          ingest_wbwi(_ingest_wbwi) {}

    ~Writer() {
      if (made_waitable) {
        StateMutex().~mutex();
        StateCV().~condition_variable();
      }
      status.PermitUncheckedError();
      callback_status.PermitUncheckedError();
    }

    bool CheckCallback(DB* db) {
      if (callback != nullptr) {
        callback_status = callback->Callback(db);
      }
      return callback_status.ok();
    }

    void CheckWriteEnqueuedCallback() {
      if (user_write_cb != nullptr) {
        user_write_cb->OnWriteEnqueued();
      }
    }

    void CheckPostWalWriteCallback() {
      if (user_write_cb != nullptr) {
        user_write_cb->OnWalWriteFinish();
      }
    }

    void CreateMutex() {
      if (!made_waitable) {
        // Note that made_waitable is tracked separately from state
        // transitions, because we can't atomically create the mutex and
        // link into the list.
        made_waitable = true;
        new (&state_mutex_bytes) std::mutex;
        new (&state_cv_bytes) std::condition_variable;
      }
    }

    // Returns the aggregate status of this Writer.
    Status FinalStatus() {
      if (!status.ok()) {
        // a non-ok memtable write status takes precedence
        assert(callback == nullptr || callback_status.ok());
        return status;
      } else if (!callback_status.ok()) {
        // if the callback failed then that is the status we want
        // because a memtable insert should not have been attempted
        assert(callback != nullptr);
        assert(status.ok());
        return callback_status;
      } else {
        // if there is no callback then we only care about
        // the memtable insert status
        assert(callback == nullptr || callback_status.ok());
        return status;
      }
    }

    bool CallbackFailed() {
      return (callback != nullptr) && !callback_status.ok();
    }

    bool ShouldWriteToMemtable() {
      return status.ok() && !CallbackFailed() && !disable_memtable;
    }

    bool ShouldWriteToWAL() {
      return status.ok() && !CallbackFailed() && !disable_wal;
    }

    // No other mutexes may be acquired while holding StateMutex(); it is
    // always last in the lock order.
    std::mutex& StateMutex() {
      assert(made_waitable);
      return *static_cast<std::mutex*>(static_cast<void*>(&state_mutex_bytes));
    }

    std::condition_variable& StateCV() {
      assert(made_waitable);
      return *static_cast<std::condition_variable*>(
          static_cast<void*>(&state_cv_bytes));
    }
  };
  struct AdaptationContext {
    const char* name;
    std::atomic<int32_t> value;

    explicit AdaptationContext(const char* name0) : name(name0), value(0) {}
  };
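
  // AwaitState (declared below) uses an AdaptationContext to decide, per
  // call site, whether spinning or blocking is likely to pay off. A sketch
  // (the context is typically a function-scoped static shared by all
  // threads that hit the same call site; the name here is illustrative):
  //
  //   static WriteThread::AdaptationContext jbg_ctx("JoinBatchGroup");
  //   uint8_t state = AwaitState(w, goal_mask, &jbg_ctx);
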
  explicit WriteThread(const ImmutableDBOptions& db_options);

  virtual ~WriteThread() = default;

  // IMPORTANT: None of the methods in this class rely on the db mutex
  // for correctness. All of the methods except JoinBatchGroup and
  // EnterUnbatched may be called either with or without the db mutex held.
  // Correctness is maintained by ensuring that only a single thread is
  // a leader at a time.

  // Registers w as ready to become part of a batch group, waits until the
  // caller should perform some work, and returns the current state of the
  // writer. If w has become the leader of a write batch group, returns
  // STATE_GROUP_LEADER. If w has been made part of a sequential batch
  // group and the leader has performed the write, returns STATE_COMPLETED.
  // If w has been made part of a parallel batch group and is responsible
  // for updating the memtable, returns STATE_PARALLEL_MEMTABLE_WRITER.
  //
  // The db mutex SHOULD NOT be held when calling this function, because
  // it will block.
  //
  // Writer* w: Writer to be executed as part of a batch group
  void JoinBatchGroup(Writer* w);
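
  // A minimal sketch of the leader path (simplified; WAL writing, error
  // handling, and the pipelined/parallel variants are omitted):
  //
  //   WriteThread::Writer w(write_options, my_batch, /*_callback=*/nullptr,
  //                         /*_user_write_cb=*/nullptr, /*_log_ref=*/0,
  //                         /*_disable_memtable=*/false);
  //   write_thread.JoinBatchGroup(&w);
  //   if (w.state == WriteThread::STATE_GROUP_LEADER) {
  //     WriteThread::WriteGroup group;
  //     write_thread.EnterAsBatchGroupLeader(&w, &group);
  //     Status status;  // ... write WAL and memtable for the group ...
  //     write_thread.ExitAsBatchGroupLeader(group, status);
  //   }
  //   // Otherwise w.state is STATE_COMPLETED (or a parallel-writer state)
  //   // and w.FinalStatus() holds the result.
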
  // Constructs a write batch group led by leader, which should be a
  // Writer passed to JoinBatchGroup on the current thread.
  //
  // Writer* leader: Writer that is STATE_GROUP_LEADER
  // WriteGroup* write_group: Out-param of group members
  // returns: Total batch group byte size
  size_t EnterAsBatchGroupLeader(Writer* leader, WriteGroup* write_group);

  // Unlinks the Writer-s in a batch group, wakes up the non-leaders,
  // and wakes up the next leader (if any).
  //
  // WriteGroup* write_group: the write group
  // Status status: Status of write operation
  void ExitAsBatchGroupLeader(WriteGroup& write_group, Status& status);

  // Exit batch group on behalf of batch group leader.
  void ExitAsBatchGroupFollower(Writer* w);

  // Constructs a write batch group led by leader from the
  // newest_memtable_writers_ list. The leader should either write the
  // memtable for the whole group and call ExitAsMemTableWriter, or launch
  // a parallel memtable write through LaunchParallelMemTableWriters.
  void EnterAsMemTableWriter(Writer* leader, WriteGroup* write_group);

  // The memtable writer group leader, or the last finished writer in a
  // parallel write group, exits from the newest_memtable_writers_ list
  // and wakes up the next leader if needed.
  void ExitAsMemTableWriter(Writer* self, WriteGroup& write_group);

  // Causes JoinBatchGroup to return STATE_PARALLEL_MEMTABLE_WRITER for all of
  // the non-leader members of this write batch group. Sets Writer::sequence
  // before waking them up.
  // If the size n of the write_group is not small, the leader designates
  // n^0.5 members as STATE_PARALLEL_MEMTABLE_CALLER to help set the other
  // writers' states in parallel. This ensures that the sequential cost of
  // the SetState calls does not exceed about 2 * n^0.5.
  //
  // WriteGroup* write_group: Extra state used to coordinate the parallel add
  void LaunchParallelMemTableWriters(WriteGroup* write_group);

  // Starting from w, sets every N-th writer in the WriteGroup to be a
  // parallel memtable writer, where the stride N equals the square root of
  // the total size of the write_group; all of these writers then write to
  // the memtable.
  void SetMemWritersEachStride(Writer* w);

  // Reports the completion of w's batch to the parallel group leader, and
  // waits for the rest of the parallel batch to complete. Returns true
  // if this thread is the last to complete, and hence should advance
  // the sequence number and then call ExitAsBatchGroupFollower, false if
  // someone else has already taken responsibility for that.
  bool CompleteParallelMemTableWriter(Writer* w);
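
  // Sketch of the parallel memtable path (simplified; assumes the leader
  // already wrote the WAL for the whole group):
  //
  //   // Leader:
  //   write_thread.LaunchParallelMemTableWriters(&group);
  //   // Every writer that observes STATE_PARALLEL_MEMTABLE_WRITER (the
  //   // leader included) applies its own batch to the memtable, then:
  //   if (write_thread.CompleteParallelMemTableWriter(&w)) {
  //     // Last one out exits on behalf of the whole group.
  //     write_thread.ExitAsBatchGroupFollower(&w);
  //   }
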
  // Waits for all preceding writers (unlocking mu while waiting), then
  // registers w as the currently proceeding writer.
  //
  // Writer* w: A Writer not eligible for batching
  // InstrumentedMutex* mu: The db mutex, to unlock while waiting
  // REQUIRES: db mutex held
  void EnterUnbatched(Writer* w, InstrumentedMutex* mu);

  // Completes a Writer begun with EnterUnbatched, unblocking subsequent
  // writers.
  void ExitUnbatched(Writer* w);
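
  // Sketch (simplified): used for operations that must exclude all other
  // writers, e.g. while holding the db mutex:
  //
  //   WriteThread::Writer w;
  //   write_thread.EnterUnbatched(&w, &mutex_);
  //   // ... perform work that must not race with other writers ...
  //   write_thread.ExitUnbatched(&w);
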
  // Wait for all parallel memtable writers to finish, in case pipelined
  // write is enabled.
  void WaitForMemTableWriters();

  SequenceNumber UpdateLastSequence(SequenceNumber sequence) {
    if (sequence > last_sequence_) {
      last_sequence_ = sequence;
    }
    return last_sequence_;
  }

  // Insert a dummy writer at the tail of the write queue to indicate a write
  // stall, and fail any writers in the queue with no_slowdown set to true.
  // REQUIRES: db mutex held, no other stall on this queue outstanding
  void BeginWriteStall();

  // Remove the dummy writer and wake up waiting writers.
  // REQUIRES: db mutex held
  void EndWriteStall();

  // Returns the count of BeginWriteStall() calls if a stall is outstanding,
  // or 0 if there is no active stall in the write queue.
  // REQUIRES: db mutex held
  uint64_t GetBegunCountOfOutstandingStall();

  // Wait for the number of completed EndWriteStall() calls to reach
  // >= `stall_count`, which will generally have come from
  // GetBegunCountOfOutstandingStall().
  // (Does not require db mutex held)
  void WaitForStallEndedCount(uint64_t stall_count);
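
  // Sketch (simplified): detect an outstanding stall under the db mutex,
  // then wait for that particular stall to clear without holding it:
  //
  //   uint64_t stall_count = write_thread.GetBegunCountOfOutstandingStall();
  //   if (stall_count > 0) {
  //     mutex_.Unlock();
  //     write_thread.WaitForStallEndedCount(stall_count);
  //     mutex_.Lock();
  //   }
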
 private:
  // See AwaitState.
  const uint64_t max_yield_usec_;
  const uint64_t slow_yield_usec_;

  // Allow multiple writers to write to the memtable concurrently.
  const bool allow_concurrent_memtable_write_;

  // Enable pipelined write to WAL and memtable.
  const bool enable_pipelined_write_;

  // The maximum limit on the number of bytes written in a single batch of
  // WAL or memtable writes. It is only enforced when the leader's write
  // size is larger than 1/8 of this limit.
  const uint64_t max_write_batch_group_size_bytes;

  // Points to the newest pending writer. Only the leader can remove
  // elements; adding can be done lock-free by anybody.
  std::atomic<Writer*> newest_writer_;

  // Points to the newest pending memtable writer. Used only when pipelined
  // write is enabled.
  std::atomic<Writer*> newest_memtable_writer_;

  // The last sequence number that has been consumed by a writer. The
  // sequence is not necessarily visible to reads because the write can
  // still be ongoing.
  SequenceNumber last_sequence_;

  // A dummy writer to indicate a write stall condition. This will be inserted
  // at the tail of the writer queue by the leader, so newer writers can just
  // check for this and bail.
  Writer write_stall_dummy_;

  // Mutex and condvar for writers to block on a write stall. During a write
  // stall, writers with no_slowdown set to false will wait on this rather
  // than on the writer queue.
  port::Mutex stall_mu_;
  port::CondVar stall_cv_;

  // Count the number of stalls begun, so that we can check whether
  // a particular stall has cleared (even if caught in another stall).
  // Controlled by DB mutex.
  // Because of the contract on BeginWriteStall() / EndWriteStall(),
  // stall_ended_count_ <= stall_begun_count_ <= stall_ended_count_ + 1.
  uint64_t stall_begun_count_ = 0;

  // Count the number of stalls ended, so that we can check whether
  // a particular stall has cleared (even if caught in another stall).
  // Writes controlled by DB mutex + stall_mu_, signalled by stall_cv_.
  // Read with stall_mu_ or DB mutex.
  uint64_t stall_ended_count_ = 0;

  // Waits for w->state & goal_mask using w->StateMutex(). Returns
  // the state that satisfies goal_mask.
  uint8_t BlockingAwaitState(Writer* w, uint8_t goal_mask);

  // Blocks until w->state & goal_mask, returning the state value
  // that satisfied the predicate. Uses ctx to adaptively use
  // std::this_thread::yield() to avoid mutex overheads. ctx should be
  // a context-dependent static.
  uint8_t AwaitState(Writer* w, uint8_t goal_mask, AdaptationContext* ctx);

  // Set the writer state and wake the writer up if it is waiting.
  void SetState(Writer* w, uint8_t new_state);

  // Links w into the newest_writer list. Returns true if w was linked
  // directly into the leader position. Safe to call from multiple threads
  // without external locking.
  bool LinkOne(Writer* w, std::atomic<Writer*>* newest_writer);

  // Links a write group into the newest_writer list as a whole, while
  // keeping the order of the writers unchanged. Returns true if the group
  // was linked directly into the leader position.
  bool LinkGroup(WriteGroup& write_group, std::atomic<Writer*>* newest_writer);

  // Computes any missing link_newer links. Should not be called
  // concurrently with itself.
  void CreateMissingNewerLinks(Writer* head);

  // Set the leader in write_group to completed state and remove it from the
  // write group.
  void CompleteLeader(WriteGroup& write_group);

  // Set a follower in write_group to completed state and remove it from the
  // write group.
  void CompleteFollower(Writer* w, WriteGroup& write_group);
};

}  // namespace ROCKSDB_NAMESPACE