threadpool_imp.cc 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  9. #include "util/threadpool_imp.h"
  10. #include "monitoring/thread_status_util.h"
  11. #include "port/port.h"
  12. #ifndef OS_WIN
  13. # include <unistd.h>
  14. #endif
  15. #ifdef OS_LINUX
  16. # include <sys/syscall.h>
  17. # include <sys/resource.h>
  18. #endif
  19. #include <stdlib.h>
  20. #include <algorithm>
  21. #include <atomic>
  22. #include <condition_variable>
  23. #include <deque>
  24. #include <mutex>
  25. #include <sstream>
  26. #include <thread>
  27. #include <vector>
  28. namespace ROCKSDB_NAMESPACE {
  29. void ThreadPoolImpl::PthreadCall(const char* label, int result) {
  30. if (result != 0) {
  31. fprintf(stderr, "pthread %s: %s\n", label, strerror(result));
  32. abort();
  33. }
  34. }
  35. struct ThreadPoolImpl::Impl {
  36. Impl();
  37. ~Impl();
  38. void JoinThreads(bool wait_for_jobs_to_complete);
  39. void SetBackgroundThreadsInternal(int num, bool allow_reduce);
  40. int GetBackgroundThreads();
  41. unsigned int GetQueueLen() const {
  42. return queue_len_.load(std::memory_order_relaxed);
  43. }
  44. void LowerIOPriority();
  45. void LowerCPUPriority();
  46. void WakeUpAllThreads() {
  47. bgsignal_.notify_all();
  48. }
  49. void BGThread(size_t thread_id);
  50. void StartBGThreads();
  51. void Submit(std::function<void()>&& schedule,
  52. std::function<void()>&& unschedule, void* tag);
  53. int UnSchedule(void* arg);
  54. void SetHostEnv(Env* env) { env_ = env; }
  55. Env* GetHostEnv() const { return env_; }
  56. bool HasExcessiveThread() const {
  57. return static_cast<int>(bgthreads_.size()) > total_threads_limit_;
  58. }
  59. // Return true iff the current thread is the excessive thread to terminate.
  60. // Always terminate the running thread that is added last, even if there are
  61. // more than one thread to terminate.
  62. bool IsLastExcessiveThread(size_t thread_id) const {
  63. return HasExcessiveThread() && thread_id == bgthreads_.size() - 1;
  64. }
  65. bool IsExcessiveThread(size_t thread_id) const {
  66. return static_cast<int>(thread_id) >= total_threads_limit_;
  67. }
  68. // Return the thread priority.
  69. // This would allow its member-thread to know its priority.
  70. Env::Priority GetThreadPriority() const { return priority_; }
  71. // Set the thread priority.
  72. void SetThreadPriority(Env::Priority priority) { priority_ = priority; }
  73. private:
  74. static void BGThreadWrapper(void* arg);
  75. bool low_io_priority_;
  76. bool low_cpu_priority_;
  77. Env::Priority priority_;
  78. Env* env_;
  79. int total_threads_limit_;
  80. std::atomic_uint queue_len_; // Queue length. Used for stats reporting
  81. bool exit_all_threads_;
  82. bool wait_for_jobs_to_complete_;
  83. // Entry per Schedule()/Submit() call
  84. struct BGItem {
  85. void* tag = nullptr;
  86. std::function<void()> function;
  87. std::function<void()> unschedFunction;
  88. };
  89. using BGQueue = std::deque<BGItem>;
  90. BGQueue queue_;
  91. std::mutex mu_;
  92. std::condition_variable bgsignal_;
  93. std::vector<port::Thread> bgthreads_;
  94. };
  95. inline
  96. ThreadPoolImpl::Impl::Impl()
  97. :
  98. low_io_priority_(false),
  99. low_cpu_priority_(false),
  100. priority_(Env::LOW),
  101. env_(nullptr),
  102. total_threads_limit_(0),
  103. queue_len_(),
  104. exit_all_threads_(false),
  105. wait_for_jobs_to_complete_(false),
  106. queue_(),
  107. mu_(),
  108. bgsignal_(),
  109. bgthreads_() {
  110. }
  111. inline
  112. ThreadPoolImpl::Impl::~Impl() { assert(bgthreads_.size() == 0U); }
  113. void ThreadPoolImpl::Impl::JoinThreads(bool wait_for_jobs_to_complete) {
  114. std::unique_lock<std::mutex> lock(mu_);
  115. assert(!exit_all_threads_);
  116. wait_for_jobs_to_complete_ = wait_for_jobs_to_complete;
  117. exit_all_threads_ = true;
  118. // prevent threads from being recreated right after they're joined, in case
  119. // the user is concurrently submitting jobs.
  120. total_threads_limit_ = 0;
  121. lock.unlock();
  122. bgsignal_.notify_all();
  123. for (auto& th : bgthreads_) {
  124. th.join();
  125. }
  126. bgthreads_.clear();
  127. exit_all_threads_ = false;
  128. wait_for_jobs_to_complete_ = false;
  129. }
  130. inline
  131. void ThreadPoolImpl::Impl::LowerIOPriority() {
  132. std::lock_guard<std::mutex> lock(mu_);
  133. low_io_priority_ = true;
  134. }
  135. inline
  136. void ThreadPoolImpl::Impl::LowerCPUPriority() {
  137. std::lock_guard<std::mutex> lock(mu_);
  138. low_cpu_priority_ = true;
  139. }
  140. void ThreadPoolImpl::Impl::BGThread(size_t thread_id) {
  141. bool low_io_priority = false;
  142. bool low_cpu_priority = false;
  143. while (true) {
  144. // Wait until there is an item that is ready to run
  145. std::unique_lock<std::mutex> lock(mu_);
  146. // Stop waiting if the thread needs to do work or needs to terminate.
  147. while (!exit_all_threads_ && !IsLastExcessiveThread(thread_id) &&
  148. (queue_.empty() || IsExcessiveThread(thread_id))) {
  149. bgsignal_.wait(lock);
  150. }
  151. if (exit_all_threads_) { // mechanism to let BG threads exit safely
  152. if (!wait_for_jobs_to_complete_ ||
  153. queue_.empty()) {
  154. break;
  155. }
  156. }
  157. if (IsLastExcessiveThread(thread_id)) {
  158. // Current thread is the last generated one and is excessive.
  159. // We always terminate excessive thread in the reverse order of
  160. // generation time.
  161. auto& terminating_thread = bgthreads_.back();
  162. terminating_thread.detach();
  163. bgthreads_.pop_back();
  164. if (HasExcessiveThread()) {
  165. // There is still at least more excessive thread to terminate.
  166. WakeUpAllThreads();
  167. }
  168. break;
  169. }
  170. auto func = std::move(queue_.front().function);
  171. queue_.pop_front();
  172. queue_len_.store(static_cast<unsigned int>(queue_.size()),
  173. std::memory_order_relaxed);
  174. bool decrease_io_priority = (low_io_priority != low_io_priority_);
  175. bool decrease_cpu_priority = (low_cpu_priority != low_cpu_priority_);
  176. lock.unlock();
  177. #ifdef OS_LINUX
  178. if (decrease_cpu_priority) {
  179. setpriority(
  180. PRIO_PROCESS,
  181. // Current thread.
  182. 0,
  183. // Lowest priority possible.
  184. 19);
  185. low_cpu_priority = true;
  186. }
  187. if (decrease_io_priority) {
  188. #define IOPRIO_CLASS_SHIFT (13)
  189. #define IOPRIO_PRIO_VALUE(class, data) (((class) << IOPRIO_CLASS_SHIFT) | data)
  190. // Put schedule into IOPRIO_CLASS_IDLE class (lowest)
  191. // These system calls only have an effect when used in conjunction
  192. // with an I/O scheduler that supports I/O priorities. As at
  193. // kernel 2.6.17 the only such scheduler is the Completely
  194. // Fair Queuing (CFQ) I/O scheduler.
  195. // To change scheduler:
  196. // echo cfq > /sys/block/<device_name>/queue/schedule
  197. // Tunables to consider:
  198. // /sys/block/<device_name>/queue/slice_idle
  199. // /sys/block/<device_name>/queue/slice_sync
  200. syscall(SYS_ioprio_set, 1, // IOPRIO_WHO_PROCESS
  201. 0, // current thread
  202. IOPRIO_PRIO_VALUE(3, 0));
  203. low_io_priority = true;
  204. }
  205. #else
  206. (void)decrease_io_priority; // avoid 'unused variable' error
  207. (void)decrease_cpu_priority;
  208. #endif
  209. func();
  210. }
  211. }
  212. // Helper struct for passing arguments when creating threads.
  213. struct BGThreadMetadata {
  214. ThreadPoolImpl::Impl* thread_pool_;
  215. size_t thread_id_; // Thread count in the thread.
  216. BGThreadMetadata(ThreadPoolImpl::Impl* thread_pool, size_t thread_id)
  217. : thread_pool_(thread_pool), thread_id_(thread_id) {}
  218. };
  219. void ThreadPoolImpl::Impl::BGThreadWrapper(void* arg) {
  220. BGThreadMetadata* meta = reinterpret_cast<BGThreadMetadata*>(arg);
  221. size_t thread_id = meta->thread_id_;
  222. ThreadPoolImpl::Impl* tp = meta->thread_pool_;
  223. #ifdef ROCKSDB_USING_THREAD_STATUS
  224. // initialize it because compiler isn't good enough to see we don't use it
  225. // uninitialized
  226. ThreadStatus::ThreadType thread_type = ThreadStatus::NUM_THREAD_TYPES;
  227. switch (tp->GetThreadPriority()) {
  228. case Env::Priority::HIGH:
  229. thread_type = ThreadStatus::HIGH_PRIORITY;
  230. break;
  231. case Env::Priority::LOW:
  232. thread_type = ThreadStatus::LOW_PRIORITY;
  233. break;
  234. case Env::Priority::BOTTOM:
  235. thread_type = ThreadStatus::BOTTOM_PRIORITY;
  236. break;
  237. case Env::Priority::USER:
  238. thread_type = ThreadStatus::USER;
  239. break;
  240. case Env::Priority::TOTAL:
  241. assert(false);
  242. return;
  243. }
  244. assert(thread_type != ThreadStatus::NUM_THREAD_TYPES);
  245. ThreadStatusUtil::RegisterThread(tp->GetHostEnv(), thread_type);
  246. #endif
  247. delete meta;
  248. tp->BGThread(thread_id);
  249. #ifdef ROCKSDB_USING_THREAD_STATUS
  250. ThreadStatusUtil::UnregisterThread();
  251. #endif
  252. return;
  253. }
  254. void ThreadPoolImpl::Impl::SetBackgroundThreadsInternal(int num,
  255. bool allow_reduce) {
  256. std::lock_guard<std::mutex> lock(mu_);
  257. if (exit_all_threads_) {
  258. return;
  259. }
  260. if (num > total_threads_limit_ ||
  261. (num < total_threads_limit_ && allow_reduce)) {
  262. total_threads_limit_ = std::max(0, num);
  263. WakeUpAllThreads();
  264. StartBGThreads();
  265. }
  266. }
  267. int ThreadPoolImpl::Impl::GetBackgroundThreads() {
  268. std::unique_lock<std::mutex> lock(mu_);
  269. return total_threads_limit_;
  270. }
  271. void ThreadPoolImpl::Impl::StartBGThreads() {
  272. // Start background thread if necessary
  273. while ((int)bgthreads_.size() < total_threads_limit_) {
  274. port::Thread p_t(&BGThreadWrapper,
  275. new BGThreadMetadata(this, bgthreads_.size()));
  276. // Set the thread name to aid debugging
  277. #if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ)
  278. #if __GLIBC_PREREQ(2, 12)
  279. auto th_handle = p_t.native_handle();
  280. std::string thread_priority = Env::PriorityToString(GetThreadPriority());
  281. std::ostringstream thread_name_stream;
  282. thread_name_stream << "rocksdb:";
  283. for (char c : thread_priority) {
  284. thread_name_stream << static_cast<char>(tolower(c));
  285. }
  286. thread_name_stream << bgthreads_.size();
  287. pthread_setname_np(th_handle, thread_name_stream.str().c_str());
  288. #endif
  289. #endif
  290. bgthreads_.push_back(std::move(p_t));
  291. }
  292. }
  293. void ThreadPoolImpl::Impl::Submit(std::function<void()>&& schedule,
  294. std::function<void()>&& unschedule, void* tag) {
  295. std::lock_guard<std::mutex> lock(mu_);
  296. if (exit_all_threads_) {
  297. return;
  298. }
  299. StartBGThreads();
  300. // Add to priority queue
  301. queue_.push_back(BGItem());
  302. auto& item = queue_.back();
  303. item.tag = tag;
  304. item.function = std::move(schedule);
  305. item.unschedFunction = std::move(unschedule);
  306. queue_len_.store(static_cast<unsigned int>(queue_.size()),
  307. std::memory_order_relaxed);
  308. if (!HasExcessiveThread()) {
  309. // Wake up at least one waiting thread.
  310. bgsignal_.notify_one();
  311. } else {
  312. // Need to wake up all threads to make sure the one woken
  313. // up is not the one to terminate.
  314. WakeUpAllThreads();
  315. }
  316. }
  317. int ThreadPoolImpl::Impl::UnSchedule(void* arg) {
  318. int count = 0;
  319. std::vector<std::function<void()>> candidates;
  320. {
  321. std::lock_guard<std::mutex> lock(mu_);
  322. // Remove from priority queue
  323. BGQueue::iterator it = queue_.begin();
  324. while (it != queue_.end()) {
  325. if (arg == (*it).tag) {
  326. if (it->unschedFunction) {
  327. candidates.push_back(std::move(it->unschedFunction));
  328. }
  329. it = queue_.erase(it);
  330. count++;
  331. } else {
  332. ++it;
  333. }
  334. }
  335. queue_len_.store(static_cast<unsigned int>(queue_.size()),
  336. std::memory_order_relaxed);
  337. }
  338. // Run unschedule functions outside the mutex
  339. for (auto& f : candidates) {
  340. f();
  341. }
  342. return count;
  343. }
  344. ThreadPoolImpl::ThreadPoolImpl() :
  345. impl_(new Impl()) {
  346. }
  347. ThreadPoolImpl::~ThreadPoolImpl() {
  348. }
  349. void ThreadPoolImpl::JoinAllThreads() {
  350. impl_->JoinThreads(false);
  351. }
  352. void ThreadPoolImpl::SetBackgroundThreads(int num) {
  353. impl_->SetBackgroundThreadsInternal(num, true);
  354. }
  355. int ThreadPoolImpl::GetBackgroundThreads() {
  356. return impl_->GetBackgroundThreads();
  357. }
  358. unsigned int ThreadPoolImpl::GetQueueLen() const {
  359. return impl_->GetQueueLen();
  360. }
  361. void ThreadPoolImpl::WaitForJobsAndJoinAllThreads() {
  362. impl_->JoinThreads(true);
  363. }
  364. void ThreadPoolImpl::LowerIOPriority() {
  365. impl_->LowerIOPriority();
  366. }
  370. void ThreadPoolImpl::IncBackgroundThreadsIfNeeded(int num) {
  371. impl_->SetBackgroundThreadsInternal(num, false);
  372. }
  373. void ThreadPoolImpl::SubmitJob(const std::function<void()>& job) {
  374. auto copy(job);
  375. impl_->Submit(std::move(copy), std::function<void()>(), nullptr);
  376. }
  377. void ThreadPoolImpl::SubmitJob(std::function<void()>&& job) {
  378. impl_->Submit(std::move(job), std::function<void()>(), nullptr);
  379. }
  380. void ThreadPoolImpl::Schedule(void(*function)(void* arg1), void* arg,
  381. void* tag, void(*unschedFunction)(void* arg)) {
  382. if (unschedFunction == nullptr) {
  383. impl_->Submit(std::bind(function, arg), std::function<void()>(), tag);
  384. } else {
  385. impl_->Submit(std::bind(function, arg), std::bind(unschedFunction, arg),
  386. tag);
  387. }
  388. }
  389. int ThreadPoolImpl::UnSchedule(void* arg) {
  390. return impl_->UnSchedule(arg);
  391. }
  392. void ThreadPoolImpl::SetHostEnv(Env* env) { impl_->SetHostEnv(env); }
  393. Env* ThreadPoolImpl::GetHostEnv() const { return impl_->GetHostEnv(); }
  394. // Return the thread priority.
  395. // This would allow its member-thread to know its priority.
  396. Env::Priority ThreadPoolImpl::GetThreadPriority() const {
  397. return impl_->GetThreadPriority();
  398. }
  399. // Set the thread priority.
  400. void ThreadPoolImpl::SetThreadPriority(Env::Priority priority) {
  401. impl_->SetThreadPriority(priority);
  402. }
  403. ThreadPool* NewThreadPool(int num_threads) {
  404. ThreadPoolImpl* thread_pool = new ThreadPoolImpl();
  405. thread_pool->SetBackgroundThreads(num_threads);
  406. return thread_pool;
  407. }
  408. } // namespace ROCKSDB_NAMESPACE