// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "util/threadpool_imp.h"

#ifndef OS_WIN
#include <unistd.h>
#endif

#ifdef OS_LINUX
#include <sys/resource.h>
#include <sys/syscall.h>
#endif

#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <cstdlib>
#include <deque>
#include <mutex>
#include <sstream>
#include <thread>
#include <vector>

#include "monitoring/thread_status_util.h"
#include "port/port.h"
#include "test_util/sync_point.h"
#include "util/string_util.h"

namespace ROCKSDB_NAMESPACE {

void ThreadPoolImpl::PthreadCall(const char* label, int result) {
  if (result != 0) {
    fprintf(stderr, "pthread %s: %s\n", label, errnoStr(result).c_str());
    abort();
  }
}

struct ThreadPoolImpl::Impl {
  Impl();
  ~Impl();

  void JoinThreads(bool wait_for_jobs_to_complete);

  void SetBackgroundThreadsInternal(int num, bool allow_reduce);
  int GetBackgroundThreads();

  unsigned int GetQueueLen() const {
    return queue_len_.load(std::memory_order_relaxed);
  }

  void LowerIOPriority();

  void LowerCPUPriority(CpuPriority pri);

  void WakeUpAllThreads() { bgsignal_.notify_all(); }

  void BGThread(size_t thread_id);

  void StartBGThreads();

  void Submit(std::function<void()>&& schedule,
              std::function<void()>&& unschedule, void* tag);

  int UnSchedule(void* arg);

  void SetHostEnv(Env* env) { env_ = env; }

  Env* GetHostEnv() const { return env_; }

  bool HasExcessiveThread() const {
    return static_cast<int>(bgthreads_.size()) > total_threads_limit_;
  }

  // Return true iff the current thread is the excessive thread to terminate.
  // Always terminate the running thread that was added last, even if there is
  // more than one thread to terminate.
  bool IsLastExcessiveThread(size_t thread_id) const {
    return HasExcessiveThread() && thread_id == bgthreads_.size() - 1;
  }

  bool IsExcessiveThread(size_t thread_id) const {
    return static_cast<int>(thread_id) >= total_threads_limit_;
  }

  // Return the thread priority.
  // This allows its member threads to know their priority.
  Env::Priority GetThreadPriority() const { return priority_; }

  // Set the thread priority.
  void SetThreadPriority(Env::Priority priority) { priority_ = priority; }

  int ReserveThreads(int threads_to_be_reserved) {
    std::unique_lock<std::mutex> lock(mu_);
    // We can reserve at most num_waiting_threads_ in total, so the number of
    // threads actually reserved might be fewer than desired. In rare cases,
    // num_waiting_threads_ could be less than reserved_threads_ due to
    // SetBackgroundThreadsInternal or the last excessive threads. If that
    // happens, we cannot reserve any other threads.
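    // Example: with 4 waiting threads and 1 already reserved, a request to
    // reserve 5 yields std::min(std::max(4 - 1, 0), 5) = 3 newly reserved
    // threads, bringing reserved_threads_ to 4.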
    int reserved_threads_in_success =
        std::min(std::max(num_waiting_threads_ - reserved_threads_, 0),
                 threads_to_be_reserved);
    reserved_threads_ += reserved_threads_in_success;
    return reserved_threads_in_success;
  }

  int ReleaseThreads(int threads_to_be_released) {
    std::unique_lock<std::mutex> lock(mu_);
    // We cannot release more than reserved_threads_
    int released_threads_in_success =
        std::min(reserved_threads_, threads_to_be_released);
    reserved_threads_ -= released_threads_in_success;
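    // Wake all threads: lowering reserved_threads_ may satisfy the wait
    // condition in BGThread() for threads blocked by the reservation.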
    WakeUpAllThreads();
    return released_threads_in_success;
  }

 private:
  static void BGThreadWrapper(void* arg);

  bool low_io_priority_;
  CpuPriority cpu_priority_;
  Env::Priority priority_;
  Env* env_;

  int total_threads_limit_;
  std::atomic_uint queue_len_;  // Queue length. Used for stats reporting
  // Number of reserved threads, managed by ReserveThreads(..) and
  // ReleaseThreads(..). If num_waiting_threads_ is no larger than
  // reserved_threads_, waiting threads stay blocked to honor the
  // reservation mechanism.
  int reserved_threads_;
  // Number of waiting threads (the maximum number of threads that can be
  // reserved). In rare cases, num_waiting_threads_ could be less than
  // reserved_threads_ due to SetBackgroundThreadsInternal or the last
  // excessive threads.
  int num_waiting_threads_;
  bool exit_all_threads_;
  bool wait_for_jobs_to_complete_;

  // Entry per Schedule()/Submit() call
  struct BGItem {
    void* tag = nullptr;
    std::function<void()> function;
    std::function<void()> unschedFunction;
  };

  using BGQueue = std::deque<BGItem>;
  BGQueue queue_;

  std::mutex mu_;
  std::condition_variable bgsignal_;
  std::vector<port::Thread> bgthreads_;
};

inline ThreadPoolImpl::Impl::Impl()
    : low_io_priority_(false),
      cpu_priority_(CpuPriority::kNormal),
      priority_(Env::LOW),
      env_(nullptr),
      total_threads_limit_(0),
      queue_len_(),
      reserved_threads_(0),
      num_waiting_threads_(0),
      exit_all_threads_(false),
      wait_for_jobs_to_complete_(false),
      queue_(),
      mu_(),
      bgsignal_(),
      bgthreads_() {}

inline ThreadPoolImpl::Impl::~Impl() { assert(bgthreads_.size() == 0U); }

void ThreadPoolImpl::Impl::JoinThreads(bool wait_for_jobs_to_complete) {
  std::unique_lock<std::mutex> lock(mu_);
  assert(!exit_all_threads_);

  wait_for_jobs_to_complete_ = wait_for_jobs_to_complete;
  exit_all_threads_ = true;
  // prevent threads from being recreated right after they're joined, in case
  // the user is concurrently submitting jobs.
  total_threads_limit_ = 0;
  reserved_threads_ = 0;
  num_waiting_threads_ = 0;
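  // Unlock before notifying so woken threads can acquire mu_ without
  // immediately blocking on it again.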
  lock.unlock();
  bgsignal_.notify_all();
  for (auto& th : bgthreads_) {
    th.join();
  }

  bgthreads_.clear();

  exit_all_threads_ = false;
  wait_for_jobs_to_complete_ = false;
}

inline void ThreadPoolImpl::Impl::LowerIOPriority() {
  std::lock_guard<std::mutex> lock(mu_);
  low_io_priority_ = true;
}

inline void ThreadPoolImpl::Impl::LowerCPUPriority(CpuPriority pri) {
  std::lock_guard<std::mutex> lock(mu_);
  cpu_priority_ = pri;
}

void ThreadPoolImpl::Impl::BGThread(size_t thread_id) {
  bool low_io_priority = false;
  CpuPriority current_cpu_priority = CpuPriority::kNormal;

  while (true) {
    // Wait until there is an item that is ready to run
    std::unique_lock<std::mutex> lock(mu_);
    // Stop waiting if the thread needs to do work or needs to terminate.
    // Increase num_waiting_threads_ once this thread has started waiting
    num_waiting_threads_++;

    TEST_SYNC_POINT("ThreadPoolImpl::BGThread::WaitingThreadsInc");
    TEST_IDX_SYNC_POINT("ThreadPoolImpl::BGThread::Start:th", thread_id);
    // When exit_all_threads_ is false and the current thread is not the last
    // excessive thread, it may block for any of three reasons:
    // 1) the queue is empty;
    // 2) it is an excessive thread (but not the last one);
    // 3) the number of waiting threads is not greater than the number of
    //    reserved threads (i.e., no threads are available due to full
    //    reservation).
    while (!exit_all_threads_ && !IsLastExcessiveThread(thread_id) &&
           (queue_.empty() || IsExcessiveThread(thread_id) ||
            num_waiting_threads_ <= reserved_threads_)) {
      bgsignal_.wait(lock);
    }
    // Decrease num_waiting_threads_ once the thread stops waiting
    num_waiting_threads_--;

    if (exit_all_threads_) {  // mechanism to let BG threads exit safely
      if (!wait_for_jobs_to_complete_ || queue_.empty()) {
        break;
      }
    } else if (IsLastExcessiveThread(thread_id)) {
      // Current thread is the last generated one and is excessive.
      // We always terminate excessive threads in the reverse order of
      // generation time. But not when `exit_all_threads_ == true`,
      // otherwise `JoinThreads()` could try to `join()` a `detach()`ed
      // thread.
      auto& terminating_thread = bgthreads_.back();
      terminating_thread.detach();
      bgthreads_.pop_back();
      if (HasExcessiveThread()) {
        // There is still at least one more excessive thread to terminate.
        WakeUpAllThreads();
      }
      TEST_IDX_SYNC_POINT("ThreadPoolImpl::BGThread::Termination:th",
                          thread_id);
      TEST_SYNC_POINT("ThreadPoolImpl::BGThread::Termination");
      break;
    }

    auto func = std::move(queue_.front().function);
    queue_.pop_front();

    queue_len_.store(static_cast<unsigned int>(queue_.size()),
                     std::memory_order_relaxed);

    bool decrease_io_priority = (low_io_priority != low_io_priority_);
    CpuPriority cpu_priority = cpu_priority_;
    lock.unlock();
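
    // The CPU priority is only ever lowered here (a smaller CpuPriority
    // value means a lower priority); it is never raised back in this loop.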
    if (cpu_priority < current_cpu_priority) {
      TEST_SYNC_POINT_CALLBACK("ThreadPoolImpl::BGThread::BeforeSetCpuPriority",
                               &current_cpu_priority);
      // 0 means current thread.
      port::SetCpuPriority(0, cpu_priority);
      current_cpu_priority = cpu_priority;
      TEST_SYNC_POINT_CALLBACK("ThreadPoolImpl::BGThread::AfterSetCpuPriority",
                               &current_cpu_priority);
    }

#ifdef OS_LINUX
    if (decrease_io_priority) {
#define IOPRIO_CLASS_SHIFT (13)
#define IOPRIO_PRIO_VALUE(class, data) (((class) << IOPRIO_CLASS_SHIFT) | data)
      // Put the thread into the IOPRIO_CLASS_IDLE class (lowest)
      // These system calls only have an effect when used in conjunction
      // with an I/O scheduler that supports I/O priorities. As at
      // kernel 2.6.17 the only such scheduler is the Completely
      // Fair Queuing (CFQ) I/O scheduler.
      // To change scheduler:
      // echo cfq > /sys/block/<device_name>/queue/scheduler
      // Tunables to consider:
      // /sys/block/<device_name>/queue/slice_idle
      // /sys/block/<device_name>/queue/slice_sync
      syscall(SYS_ioprio_set, 1,  // IOPRIO_WHO_PROCESS
              0,                  // current thread
              IOPRIO_PRIO_VALUE(3, 0));
      low_io_priority = true;
    }
#else
    (void)decrease_io_priority;  // avoid 'unused variable' error
#endif

    TEST_SYNC_POINT_CALLBACK("ThreadPoolImpl::Impl::BGThread:BeforeRun",
                             &priority_);

    func();
  }
}

// Helper struct for passing arguments when creating threads.
struct BGThreadMetadata {
  ThreadPoolImpl::Impl* thread_pool_;
  size_t thread_id_;  // Index of the thread within the pool.
  BGThreadMetadata(ThreadPoolImpl::Impl* thread_pool, size_t thread_id)
      : thread_pool_(thread_pool), thread_id_(thread_id) {}
};

void ThreadPoolImpl::Impl::BGThreadWrapper(void* arg) {
  BGThreadMetadata* meta = static_cast<BGThreadMetadata*>(arg);
  size_t thread_id = meta->thread_id_;
  ThreadPoolImpl::Impl* tp = meta->thread_pool_;
#ifdef ROCKSDB_USING_THREAD_STATUS
  // initialize it because the compiler isn't good enough to see we don't use
  // it uninitialized
  ThreadStatus::ThreadType thread_type = ThreadStatus::NUM_THREAD_TYPES;
  switch (tp->GetThreadPriority()) {
    case Env::Priority::HIGH:
      thread_type = ThreadStatus::HIGH_PRIORITY;
      break;
    case Env::Priority::LOW:
      thread_type = ThreadStatus::LOW_PRIORITY;
      break;
    case Env::Priority::BOTTOM:
      thread_type = ThreadStatus::BOTTOM_PRIORITY;
      break;
    case Env::Priority::USER:
      thread_type = ThreadStatus::USER;
      break;
    case Env::Priority::TOTAL:
      assert(false);
      return;
  }
  assert(thread_type != ThreadStatus::NUM_THREAD_TYPES);
  ThreadStatusUtil::RegisterThread(tp->GetHostEnv(), thread_type);
#endif
  delete meta;
  tp->BGThread(thread_id);
#ifdef ROCKSDB_USING_THREAD_STATUS
  ThreadStatusUtil::UnregisterThread();
#endif
  return;
}

void ThreadPoolImpl::Impl::SetBackgroundThreadsInternal(int num,
                                                        bool allow_reduce) {
  std::lock_guard<std::mutex> lock(mu_);
  if (exit_all_threads_) {
    return;
  }
  if (num > total_threads_limit_ ||
      (num < total_threads_limit_ && allow_reduce)) {
    total_threads_limit_ = std::max(0, num);
    WakeUpAllThreads();
    StartBGThreads();
  }
}

int ThreadPoolImpl::Impl::GetBackgroundThreads() {
  std::unique_lock<std::mutex> lock(mu_);
  return total_threads_limit_;
}

void ThreadPoolImpl::Impl::StartBGThreads() {
  // Start background threads if necessary
  while (static_cast<int>(bgthreads_.size()) < total_threads_limit_) {
    port::Thread p_t(&BGThreadWrapper,
                     new BGThreadMetadata(this, bgthreads_.size()));

    // Set the thread name to aid debugging
#if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ)
#if __GLIBC_PREREQ(2, 12)
    auto th_handle = p_t.native_handle();
    std::string thread_priority = Env::PriorityToString(GetThreadPriority());
    std::ostringstream thread_name_stream;
    thread_name_stream << "rocksdb:";
    for (char c : thread_priority) {
      thread_name_stream << static_cast<char>(tolower(c));
    }
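    // Linux limits thread names to 15 characters plus the NUL terminator;
    // "rocksdb:" plus the lowercased priority fits within that.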
    pthread_setname_np(th_handle, thread_name_stream.str().c_str());
#endif
#endif
    bgthreads_.push_back(std::move(p_t));
  }
}

void ThreadPoolImpl::Impl::Submit(std::function<void()>&& schedule,
                                  std::function<void()>&& unschedule,
                                  void* tag) {
  std::lock_guard<std::mutex> lock(mu_);

  if (exit_all_threads_) {
    return;
  }

  StartBGThreads();

  // Add to the job queue
  queue_.push_back(BGItem());
  TEST_SYNC_POINT("ThreadPoolImpl::Submit::Enqueue");
  auto& item = queue_.back();
  item.tag = tag;
  item.function = std::move(schedule);
  item.unschedFunction = std::move(unschedule);

  queue_len_.store(static_cast<unsigned int>(queue_.size()),
                   std::memory_order_relaxed);

  if (!HasExcessiveThread()) {
    // Wake up at least one waiting thread.
    bgsignal_.notify_one();
  } else {
    // Need to wake up all threads to make sure the one woken
    // up is not the one to terminate.
    WakeUpAllThreads();
  }
}

int ThreadPoolImpl::Impl::UnSchedule(void* arg) {
  int count = 0;

  std::vector<std::function<void()>> candidates;
  {
    std::lock_guard<std::mutex> lock(mu_);

    // Remove from the job queue
    BGQueue::iterator it = queue_.begin();
    while (it != queue_.end()) {
      if (arg == (*it).tag) {
        if (it->unschedFunction) {
          candidates.push_back(std::move(it->unschedFunction));
        }
        it = queue_.erase(it);
        count++;
      } else {
        ++it;
      }
    }
    queue_len_.store(static_cast<unsigned int>(queue_.size()),
                     std::memory_order_relaxed);
  }

  // Run unschedule functions outside the mutex, so that a callback that
  // re-enters the thread pool cannot deadlock on mu_.
  for (auto& f : candidates) {
    f();
  }

  return count;
}

ThreadPoolImpl::ThreadPoolImpl() : impl_(new Impl()) {}

ThreadPoolImpl::~ThreadPoolImpl() = default;

void ThreadPoolImpl::JoinAllThreads() { impl_->JoinThreads(false); }

void ThreadPoolImpl::SetBackgroundThreads(int num) {
  impl_->SetBackgroundThreadsInternal(num, true);
}

int ThreadPoolImpl::GetBackgroundThreads() {
  return impl_->GetBackgroundThreads();
}

unsigned int ThreadPoolImpl::GetQueueLen() const {
  return impl_->GetQueueLen();
}

void ThreadPoolImpl::WaitForJobsAndJoinAllThreads() {
  impl_->JoinThreads(true);
}

void ThreadPoolImpl::LowerIOPriority() { impl_->LowerIOPriority(); }

void ThreadPoolImpl::LowerCPUPriority(CpuPriority pri) {
  impl_->LowerCPUPriority(pri);
}

void ThreadPoolImpl::IncBackgroundThreadsIfNeeded(int num) {
  impl_->SetBackgroundThreadsInternal(num, false);
}
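
// The const& overload must copy the job, since Impl::Submit() takes
// rvalue std::function arguments only.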
void ThreadPoolImpl::SubmitJob(const std::function<void()>& job) {
  auto copy(job);
  impl_->Submit(std::move(copy), std::function<void()>(), nullptr);
}

void ThreadPoolImpl::SubmitJob(std::function<void()>&& job) {
  impl_->Submit(std::move(job), std::function<void()>(), nullptr);
}

void ThreadPoolImpl::Schedule(void (*function)(void* arg1), void* arg,
                              void* tag, void (*unschedFunction)(void* arg)) {
  if (unschedFunction == nullptr) {
    impl_->Submit(std::bind(function, arg), std::function<void()>(), tag);
  } else {
    impl_->Submit(std::bind(function, arg), std::bind(unschedFunction, arg),
                  tag);
  }
}

int ThreadPoolImpl::UnSchedule(void* arg) { return impl_->UnSchedule(arg); }

void ThreadPoolImpl::SetHostEnv(Env* env) { impl_->SetHostEnv(env); }

Env* ThreadPoolImpl::GetHostEnv() const { return impl_->GetHostEnv(); }

// Return the thread priority.
// This allows its member threads to know their priority.
Env::Priority ThreadPoolImpl::GetThreadPriority() const {
  return impl_->GetThreadPriority();
}

// Set the thread priority.
void ThreadPoolImpl::SetThreadPriority(Env::Priority priority) {
  impl_->SetThreadPriority(priority);
}

// Reserve a specific number of threads, preventing them from running other
// functions. The number of threads actually reserved could be fewer than
// requested.
int ThreadPoolImpl::ReserveThreads(int threads_to_be_reserved) {
  return impl_->ReserveThreads(threads_to_be_reserved);
}

// Release a specific number of threads
int ThreadPoolImpl::ReleaseThreads(int threads_to_be_released) {
  return impl_->ReleaseThreads(threads_to_be_released);
}
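
// Example (hypothetical caller, not part of this file): reserve capacity,
// then return it when done. ReserveThreads() may grant fewer than requested.
//   int reserved = pool.ReserveThreads(2);  // 0 <= reserved <= 2
//   ... reserved threads will not pick up queued jobs ...
//   pool.ReleaseThreads(reserved);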

ThreadPool* NewThreadPool(int num_threads) {
  ThreadPoolImpl* thread_pool = new ThreadPoolImpl();
  thread_pool->SetBackgroundThreads(num_threads);
  return thread_pool;
}
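
// Example (hypothetical caller, not part of this file):
//   std::unique_ptr<ThreadPool> pool(NewThreadPool(4));
//   pool->SubmitJob([] { /* background work */ });
//   // Threads must be joined before the pool is destroyed.
//   pool->WaitForJobsAndJoinAllThreads();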

}  // namespace ROCKSDB_NAMESPACE