write_thread.h
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#pragma once

#include <assert.h>
#include <stdint.h>

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <type_traits>
#include <vector>

#include "db/dbformat.h"
#include "db/pre_release_callback.h"
#include "db/write_callback.h"
#include "monitoring/instrumented_mutex.h"
#include "rocksdb/options.h"
#include "rocksdb/status.h"
#include "rocksdb/types.h"
#include "rocksdb/write_batch.h"
#include "util/autovector.h"

namespace ROCKSDB_NAMESPACE {
class WriteThread {
 public:
  enum State : uint8_t {
    // The initial state of a writer. This is a Writer that is
    // waiting in JoinBatchGroup. This state can be left when another
    // thread informs the waiter that it has become a group leader
    // (-> STATE_GROUP_LEADER), when a leader that has chosen to be
    // non-parallel informs a follower that its writes have been committed
    // (-> STATE_COMPLETED), or when a leader that has chosen to perform
    // updates in parallel needs this Writer to apply its batch
    // (-> STATE_PARALLEL_MEMTABLE_WRITER).
    STATE_INIT = 1,

    // The state used to inform a waiting Writer that it has become the
    // leader, and it should now build a write batch group. Tricky:
    // this state is not used if newest_writer_ is empty when a writer
    // enqueues itself, because there is no need to wait (or even to
    // create the mutex and condvar used to wait) in that case. This is
    // a terminal state unless the leader chooses to make this a parallel
    // batch, in which case the last parallel worker to finish will move
    // the leader to STATE_COMPLETED.
    STATE_GROUP_LEADER = 2,

    // The state used to inform a waiting writer that it has become the
    // leader of a memtable writer group. The leader will either write
    // the memtable for the whole group, or launch a parallel group write
    // to the memtable by calling LaunchParallelMemTableWriters.
    STATE_MEMTABLE_WRITER_LEADER = 4,

    // The state used to inform a waiting writer that it has become a
    // parallel memtable writer. It can be the group leader who launched
    // the parallel writer group, or one of the followers. The writer
    // should then apply its batch to the memtable concurrently and call
    // CompleteParallelMemTableWriter.
    STATE_PARALLEL_MEMTABLE_WRITER = 8,

    // A follower whose writes have been applied, or a parallel leader
    // whose followers have all finished their work. This is a terminal
    // state.
    STATE_COMPLETED = 16,

    // A state indicating that the thread may be waiting using StateMutex()
    // and StateCondVar().
    STATE_LOCKED_WAITING = 32,
  };
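
  // The states above are distinct powers of two so that several of them
  // can be OR-ed into a single goal mask and waited on at once. A minimal
  // illustrative sketch (the mask below is an example, not a value taken
  // from the implementation):
  //
  //   uint8_t goal_mask = STATE_GROUP_LEADER | STATE_COMPLETED;
  //   // AwaitState(w, goal_mask, ctx) returns whichever of the two
  //   // states w reached first.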
  struct Writer;

  struct WriteGroup {
    Writer* leader = nullptr;
    Writer* last_writer = nullptr;
    SequenceNumber last_sequence;
    // Before `running` drops to zero, reading or writing `status` requires
    // holding leader->StateMutex().
    Status status;
    std::atomic<size_t> running;
    size_t size = 0;

    struct Iterator {
      Writer* writer;
      Writer* last_writer;

      explicit Iterator(Writer* w, Writer* last)
          : writer(w), last_writer(last) {}

      Writer* operator*() const { return writer; }

      Iterator& operator++() {
        assert(writer != nullptr);
        if (writer == last_writer) {
          writer = nullptr;
        } else {
          writer = writer->link_newer;
        }
        return *this;
      }

      bool operator!=(const Iterator& other) const {
        return writer != other.writer;
      }
    };

    Iterator begin() const { return Iterator(leader, last_writer); }
    Iterator end() const { return Iterator(nullptr, nullptr); }
  };
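
  // Iterator walks from the leader towards last_writer along link_newer,
  // so a leader can visit the whole group with a range-for. A minimal
  // illustrative sketch:
  //
  //   for (Writer* writer : write_group) {
  //     if (writer->ShouldWriteToMemtable()) {
  //       // ... apply writer->batch to the memtable ...
  //     }
  //   }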
  // Information kept for every waiting writer.
  struct Writer {
    WriteBatch* batch;
    bool sync;
    bool no_slowdown;
    bool disable_wal;
    bool disable_memtable;
    size_t batch_cnt;  // if non-zero, number of sub-batches in the write batch
    PreReleaseCallback* pre_release_callback;
    uint64_t log_used;  // log number that this batch was inserted into
    uint64_t log_ref;   // log number that memtable insert should reference
    WriteCallback* callback;
    bool made_waitable;          // records lazy construction of mutex and cv
    std::atomic<uint8_t> state;  // write under StateMutex() or pre-link
    WriteGroup* write_group;
    SequenceNumber sequence;  // the sequence number to use for the first key
    Status status;
    Status callback_status;  // status returned by callback->Callback()
    std::aligned_storage<sizeof(std::mutex)>::type state_mutex_bytes;
    std::aligned_storage<sizeof(std::condition_variable)>::type state_cv_bytes;
    Writer* link_older;  // read/write only before linking, or as leader
    Writer* link_newer;  // lazy, read/write only before linking, or as leader
    Writer()
        : batch(nullptr),
          sync(false),
          no_slowdown(false),
          disable_wal(false),
          disable_memtable(false),
          batch_cnt(0),
          pre_release_callback(nullptr),
          log_used(0),
          log_ref(0),
          callback(nullptr),
          made_waitable(false),
          state(STATE_INIT),
          write_group(nullptr),
          sequence(kMaxSequenceNumber),
          link_older(nullptr),
          link_newer(nullptr) {}

    Writer(const WriteOptions& write_options, WriteBatch* _batch,
           WriteCallback* _callback, uint64_t _log_ref, bool _disable_memtable,
           size_t _batch_cnt = 0,
           PreReleaseCallback* _pre_release_callback = nullptr)
        : batch(_batch),
          sync(write_options.sync),
          no_slowdown(write_options.no_slowdown),
          disable_wal(write_options.disableWAL),
          disable_memtable(_disable_memtable),
          batch_cnt(_batch_cnt),
          pre_release_callback(_pre_release_callback),
          log_used(0),
          log_ref(_log_ref),
          callback(_callback),
          made_waitable(false),
          state(STATE_INIT),
          write_group(nullptr),
          sequence(kMaxSequenceNumber),
          link_older(nullptr),
          link_newer(nullptr) {}
    ~Writer() {
      if (made_waitable) {
        StateMutex().~mutex();
        StateCV().~condition_variable();
      }
    }

    bool CheckCallback(DB* db) {
      if (callback != nullptr) {
        callback_status = callback->Callback(db);
      }
      return callback_status.ok();
    }

    void CreateMutex() {
      if (!made_waitable) {
        // Note that made_waitable is tracked separately from state
        // transitions, because we can't atomically create the mutex and
        // link into the list.
        made_waitable = true;
        new (&state_mutex_bytes) std::mutex;
        new (&state_cv_bytes) std::condition_variable;
      }
    }
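
    // A writer only pays for a mutex and condvar if it actually has to
    // block. A simplified sketch of how the blocking path uses the pieces
    // above (the real logic lives in WriteThread::BlockingAwaitState in
    // write_thread.cc and also transitions state to STATE_LOCKED_WAITING):
    //
    //   w->CreateMutex();  // placement-new on first use only
    //   std::unique_lock<std::mutex> guard(w->StateMutex());
    //   w->StateCV().wait(guard, [w, goal_mask] {
    //     return (w->state.load(std::memory_order_acquire) & goal_mask) != 0;
    //   });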
    // Returns the aggregate status of this Writer.
    Status FinalStatus() {
      if (!status.ok()) {
        // a non-ok memtable write status takes precedence
        assert(callback == nullptr || callback_status.ok());
        return status;
      } else if (!callback_status.ok()) {
        // if the callback failed then that is the status we want
        // because a memtable insert should not have been attempted
        assert(callback != nullptr);
        assert(status.ok());
        return callback_status;
      } else {
        // if there is no callback then we only care about
        // the memtable insert status
        assert(callback == nullptr || callback_status.ok());
        return status;
      }
    }

    bool CallbackFailed() {
      return (callback != nullptr) && !callback_status.ok();
    }

    bool ShouldWriteToMemtable() {
      return status.ok() && !CallbackFailed() && !disable_memtable;
    }

    bool ShouldWriteToWAL() {
      return status.ok() && !CallbackFailed() && !disable_wal;
    }

    // No other mutexes may be acquired while holding StateMutex(); it is
    // always last in the lock order.
    std::mutex& StateMutex() {
      assert(made_waitable);
      return *static_cast<std::mutex*>(static_cast<void*>(&state_mutex_bytes));
    }

    std::condition_variable& StateCV() {
      assert(made_waitable);
      return *static_cast<std::condition_variable*>(
          static_cast<void*>(&state_cv_bytes));
    }
  };
  struct AdaptationContext {
    const char* name;
    std::atomic<int32_t> value;

    explicit AdaptationContext(const char* name0) : name(name0), value(0) {}
  };

  explicit WriteThread(const ImmutableDBOptions& db_options);

  virtual ~WriteThread() = default;

  // IMPORTANT: None of the methods in this class rely on the db mutex
  // for correctness. All of the methods except JoinBatchGroup and
  // EnterUnbatched may be called either with or without the db mutex held.
  // Correctness is maintained by ensuring that only a single thread is
  // a leader at a time.
  // Registers w as ready to become part of a batch group, waits until the
  // caller should perform some work, and returns the current state of the
  // writer. If w has become the leader of a write batch group, returns
  // STATE_GROUP_LEADER. If w has been made part of a sequential batch
  // group and the leader has performed the write, returns STATE_COMPLETED.
  // If w has been made part of a parallel batch group and is responsible
  // for updating the memtable, returns STATE_PARALLEL_MEMTABLE_WRITER.
  //
  // The db mutex SHOULD NOT be held when calling this function, because
  // it will block.
  //
  // Writer* w: Writer to be executed as part of a batch group
  void JoinBatchGroup(Writer* w);
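
  // A condensed sketch of how a caller (e.g. DBImpl::WriteImpl) drives the
  // non-pipelined leader path; illustrative only, error handling omitted:
  //
  //   WriteThread::Writer w(write_options, my_batch, callback, log_ref,
  //                         disable_memtable);
  //   write_thread_.JoinBatchGroup(&w);
  //   if (w.state == WriteThread::STATE_GROUP_LEADER) {
  //     WriteThread::WriteGroup write_group;
  //     write_thread_.EnterAsBatchGroupLeader(&w, &write_group);
  //     // ... write the WAL and memtable for the whole group ...
  //     write_thread_.ExitAsBatchGroupLeader(write_group, status);
  //   }
  //   // After the group completes, w.FinalStatus() holds this write's
  //   // result.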
  // Constructs a write batch group led by leader, which should be a
  // Writer passed to JoinBatchGroup on the current thread.
  //
  // Writer* leader: Writer that is STATE_GROUP_LEADER
  // WriteGroup* write_group: Out-param of group members
  // returns: Total batch group byte size
  size_t EnterAsBatchGroupLeader(Writer* leader, WriteGroup* write_group);

  // Unlinks the Writer-s in a batch group, wakes up the non-leaders,
  // and wakes up the next leader (if any).
  //
  // WriteGroup* write_group: the write group
  // Status status: Status of write operation
  void ExitAsBatchGroupLeader(WriteGroup& write_group, Status status);

  // Exit batch group on behalf of batch group leader.
  void ExitAsBatchGroupFollower(Writer* w);

  // Constructs a write batch group led by leader from the
  // newest_memtable_writers_ list. The leader should either write the
  // memtable for the whole group and call ExitAsMemTableWriter, or launch
  // a parallel memtable write through LaunchParallelMemTableWriters.
  void EnterAsMemTableWriter(Writer* leader, WriteGroup* write_group);
  // Called by the memtable writer group leader, or by the last finished
  // writer in a parallel write group, to exit from the
  // newest_memtable_writers_ list and wake up the next leader if needed.
  void ExitAsMemTableWriter(Writer* self, WriteGroup& write_group);

  // Causes JoinBatchGroup to return STATE_PARALLEL_MEMTABLE_WRITER for all
  // of the non-leader members of this write batch group. Sets
  // Writer::sequence before waking them up.
  //
  // WriteGroup* write_group: Extra state used to coordinate the parallel add
  void LaunchParallelMemTableWriters(WriteGroup* write_group);

  // Reports the completion of w's batch to the parallel group leader, and
  // waits for the rest of the parallel batch to complete. Returns true
  // if this thread is the last to complete, and hence should advance
  // the sequence number and then exit on behalf of the whole group (via
  // ExitAsBatchGroupFollower, or ExitAsMemTableWriter when pipelined write
  // is enabled); returns false if someone else has already taken
  // responsibility for that.
  bool CompleteParallelMemTableWriter(Writer* w);
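
  // A minimal sketch of the parallel-follower side, as seen from the
  // caller (illustrative only):
  //
  //   if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) {
  //     // ... apply w.batch to the memtable, starting at w.sequence ...
  //     if (write_thread_.CompleteParallelMemTableWriter(&w)) {
  //       // Last writer to finish: exit on behalf of the whole group.
  //       write_thread_.ExitAsBatchGroupFollower(&w);
  //     }
  //   }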
  // Waits for all preceding writers (unlocking mu while waiting), then
  // registers w as the currently proceeding writer.
  //
  // Writer* w: A Writer not eligible for batching
  // InstrumentedMutex* mu: The db mutex, to unlock while waiting
  // REQUIRES: db mutex held
  void EnterUnbatched(Writer* w, InstrumentedMutex* mu);

  // Completes a Writer begun with EnterUnbatched, unblocking subsequent
  // writers.
  void ExitUnbatched(Writer* w);
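
  // A minimal sketch of the unbatched path, useful for operations that
  // must exclude all other writers (illustrative only):
  //
  //   WriteThread::Writer w;
  //   write_thread_.EnterUnbatched(&w, &mutex_);  // db mutex must be held
  //   // ... perform the operation that cannot be batched ...
  //   write_thread_.ExitUnbatched(&w);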
  // Wait for all parallel memtable writers to finish, in case pipelined
  // write is enabled.
  void WaitForMemTableWriters();

  SequenceNumber UpdateLastSequence(SequenceNumber sequence) {
    if (sequence > last_sequence_) {
      last_sequence_ = sequence;
    }
    return last_sequence_;
  }

  // Insert a dummy writer at the tail of the write queue to indicate a write
  // stall, and fail any writers in the queue with no_slowdown set to true.
  void BeginWriteStall();

  // Remove the dummy writer and wake up waiting writers.
  void EndWriteStall();
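
  // A minimal sketch of how the stall pair is presumably used from the
  // write path (illustrative only; the calls are made while waiting for
  // flush/compaction pressure to clear):
  //
  //   write_thread_.BeginWriteStall();
  //   // ... wait until the stall condition clears ...
  //   write_thread_.EndWriteStall();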
 private:
  // See AwaitState.
  const uint64_t max_yield_usec_;
  const uint64_t slow_yield_usec_;

  // Allow multiple writers to write to the memtable concurrently.
  const bool allow_concurrent_memtable_write_;

  // Enable pipelined write to WAL and memtable.
  const bool enable_pipelined_write_;

  // The maximum number of bytes that may be written in a single batch of
  // WAL or memtable writes. The full limit applies only when the leader's
  // own write is larger than 1/8 of this limit; smaller leaders are capped
  // more tightly so that small writes are not slowed down too much.
  const uint64_t max_write_batch_group_size_bytes;

  // Points to the newest pending writer. Only the leader can remove
  // elements; adding can be done lock-free by anybody.
  std::atomic<Writer*> newest_writer_;

  // Points to the newest pending memtable writer. Used only when pipelined
  // write is enabled.
  std::atomic<Writer*> newest_memtable_writer_;

  // The last sequence that has been consumed by a writer. The sequence
  // is not necessarily visible to reads because the writer can be ongoing.
  SequenceNumber last_sequence_;

  // A dummy writer to indicate a write stall condition. This will be
  // inserted at the tail of the writer queue by the leader, so newer
  // writers can just check for this and bail.
  Writer write_stall_dummy_;

  // Mutex and condvar for writers to block on during a write stall.
  // Writers with no_slowdown set to false will wait on these rather than
  // on the writer queue.
  port::Mutex stall_mu_;
  port::CondVar stall_cv_;
  // Waits for w->state & goal_mask using w->StateMutex(). Returns
  // the state that satisfies goal_mask.
  uint8_t BlockingAwaitState(Writer* w, uint8_t goal_mask);

  // Blocks until w->state & goal_mask, returning the state value
  // that satisfied the predicate. Uses ctx to adaptively use
  // std::this_thread::yield() to avoid mutex overheads. ctx should be
  // a context-dependent static.
  uint8_t AwaitState(Writer* w, uint8_t goal_mask, AdaptationContext* ctx);
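
  // "Context-dependent static" means each call site passes its own
  // static AdaptationContext, so the spin-vs-block adaptation state is
  // tracked per call site. An illustrative sketch:
  //
  //   static WriteThread::AdaptationContext jbg_ctx("JoinBatchGroup");
  //   ...
  //   uint8_t state = AwaitState(w, goal_mask, &jbg_ctx);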
  // Set writer state and wake the writer up if it is waiting.
  void SetState(Writer* w, uint8_t new_state);

  // Links w into the newest_writer list. Returns true if w was linked
  // directly into the leader position. Safe to call from multiple threads
  // without external locking.
  bool LinkOne(Writer* w, std::atomic<Writer*>* newest_writer);

  // Links a write group into the newest_writer list as a whole, while
  // keeping the order of the writers unchanged. Returns true if the group
  // was linked directly into the leader position.
  bool LinkGroup(WriteGroup& write_group, std::atomic<Writer*>* newest_writer);

  // Computes any missing link_newer links. Should not be called
  // concurrently with itself.
  void CreateMissingNewerLinks(Writer* head);

  // Starting from a pending writer, follow link_older to search for the
  // next leader, until we hit the boundary.
  Writer* FindNextLeader(Writer* pending_writer, Writer* boundary);

  // Set the leader in write_group to the completed state and remove it
  // from the write group.
  void CompleteLeader(WriteGroup& write_group);

  // Set a follower in write_group to the completed state and remove it
  // from the write group.
  void CompleteFollower(Writer* w, WriteGroup& write_group);
};

}  // namespace ROCKSDB_NAMESPACE