//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "util/threadpool_imp.h"

#include "monitoring/thread_status_util.h"
#include "port/port.h"

#ifndef OS_WIN
#  include <unistd.h>
#endif

#ifdef OS_LINUX
#  include <sys/syscall.h>
#  include <sys/resource.h>
#endif

#include <stdlib.h>

#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <sstream>
#include <thread>
#include <vector>

namespace ROCKSDB_NAMESPACE {

void ThreadPoolImpl::PthreadCall(const char* label, int result) {
  if (result != 0) {
    fprintf(stderr, "pthread %s: %s\n", label, strerror(result));
    abort();
  }
}

struct ThreadPoolImpl::Impl {
  Impl();
  ~Impl();

  void JoinThreads(bool wait_for_jobs_to_complete);

  void SetBackgroundThreadsInternal(int num, bool allow_reduce);
  int GetBackgroundThreads();

  unsigned int GetQueueLen() const {
    return queue_len_.load(std::memory_order_relaxed);
  }

  void LowerIOPriority();

  void LowerCPUPriority();

  void WakeUpAllThreads() {
    bgsignal_.notify_all();
  }

  void BGThread(size_t thread_id);

  void StartBGThreads();

  void Submit(std::function<void()>&& schedule,
    std::function<void()>&& unschedule, void* tag);

  int UnSchedule(void* arg);

  void SetHostEnv(Env* env) { env_ = env; }

  Env* GetHostEnv() const { return env_; }

  bool HasExcessiveThread() const {
    return static_cast<int>(bgthreads_.size()) > total_threads_limit_;
  }

  // Return true iff the current thread is the excessive thread to terminate.
  // Always terminate the running thread that was added last, even if there is
  // more than one thread to terminate.
  bool IsLastExcessiveThread(size_t thread_id) const {
    return HasExcessiveThread() && thread_id == bgthreads_.size() - 1;
  }

  bool IsExcessiveThread(size_t thread_id) const {
    return static_cast<int>(thread_id) >= total_threads_limit_;
  }

  // Return the thread priority.
  // This would allow its member-thread to know its priority.
  Env::Priority GetThreadPriority() const { return priority_; }

  // Set the thread priority.
  void SetThreadPriority(Env::Priority priority) { priority_ = priority; }

 private:
  static void BGThreadWrapper(void* arg);

  bool low_io_priority_;
  bool low_cpu_priority_;
  Env::Priority priority_;
  Env* env_;

  int total_threads_limit_;
  std::atomic_uint queue_len_;  // Queue length. Used for stats reporting
  bool exit_all_threads_;
  bool wait_for_jobs_to_complete_;

  // Entry per Schedule()/Submit() call
  struct BGItem {
    void* tag = nullptr;
    std::function<void()> function;
    std::function<void()> unschedFunction;
  };

  using BGQueue = std::deque<BGItem>;
  BGQueue       queue_;

  std::mutex               mu_;
  std::condition_variable  bgsignal_;
  std::vector<port::Thread> bgthreads_;
};

inline
ThreadPoolImpl::Impl::Impl()
    :
      low_io_priority_(false),
      low_cpu_priority_(false),
      priority_(Env::LOW),
      env_(nullptr),
      total_threads_limit_(0),
      queue_len_(),
      exit_all_threads_(false),
      wait_for_jobs_to_complete_(false),
      queue_(),
      mu_(),
      bgsignal_(),
      bgthreads_() {}

inline
ThreadPoolImpl::Impl::~Impl() { assert(bgthreads_.size() == 0U); }

void ThreadPoolImpl::Impl::JoinThreads(bool wait_for_jobs_to_complete) {
  std::unique_lock<std::mutex> lock(mu_);
  assert(!exit_all_threads_);

  wait_for_jobs_to_complete_ = wait_for_jobs_to_complete;
  exit_all_threads_ = true;
  // prevent threads from being recreated right after they're joined, in case
  // the user is concurrently submitting jobs.
  total_threads_limit_ = 0;

  lock.unlock();

  bgsignal_.notify_all();

  for (auto& th : bgthreads_) {
    th.join();
  }

  bgthreads_.clear();

  exit_all_threads_ = false;
  wait_for_jobs_to_complete_ = false;
}

inline
void ThreadPoolImpl::Impl::LowerIOPriority() {
  std::lock_guard<std::mutex> lock(mu_);
  low_io_priority_ = true;
}

inline
void ThreadPoolImpl::Impl::LowerCPUPriority() {
  std::lock_guard<std::mutex> lock(mu_);
  low_cpu_priority_ = true;
}

void ThreadPoolImpl::Impl::BGThread(size_t thread_id) {
  bool low_io_priority = false;
  bool low_cpu_priority = false;

  while (true) {
    // Wait until there is an item that is ready to run
    std::unique_lock<std::mutex> lock(mu_);
    // Stop waiting if the thread needs to do work or needs to terminate.
    while (!exit_all_threads_ && !IsLastExcessiveThread(thread_id) &&
           (queue_.empty() || IsExcessiveThread(thread_id))) {
      bgsignal_.wait(lock);
    }

    if (exit_all_threads_) {  // mechanism to let BG threads exit safely
      if (!wait_for_jobs_to_complete_ ||
          queue_.empty()) {
        break;
      }
    }

    if (IsLastExcessiveThread(thread_id)) {
      // Current thread is the last generated one and is excessive.
      // We always terminate excessive threads in the reverse order of
      // generation time.
      auto& terminating_thread = bgthreads_.back();
      terminating_thread.detach();
      bgthreads_.pop_back();

      if (HasExcessiveThread()) {
        // There is still at least one more excessive thread to terminate.
        WakeUpAllThreads();
      }
      break;
    }

    auto func = std::move(queue_.front().function);
    queue_.pop_front();

    queue_len_.store(static_cast<unsigned int>(queue_.size()),
                     std::memory_order_relaxed);

    bool decrease_io_priority = (low_io_priority != low_io_priority_);
    bool decrease_cpu_priority = (low_cpu_priority != low_cpu_priority_);
    lock.unlock();

#ifdef OS_LINUX
    if (decrease_cpu_priority) {
      setpriority(
          PRIO_PROCESS,
          // Current thread.
          0,
          // Lowest priority possible.
          19);
      low_cpu_priority = true;
    }
    if (decrease_io_priority) {
#define IOPRIO_CLASS_SHIFT (13)
#define IOPRIO_PRIO_VALUE(class, data) (((class) << IOPRIO_CLASS_SHIFT) | data)
      // Put schedule into IOPRIO_CLASS_IDLE class (lowest)
      // These system calls only have an effect when used in conjunction
      // with an I/O scheduler that supports I/O priorities. As of
      // kernel 2.6.17 the only such scheduler is the Completely
      // Fair Queuing (CFQ) I/O scheduler.
      // To change scheduler:
      //  echo cfq > /sys/block/<device_name>/queue/scheduler
      // Tunables to consider:
      //  /sys/block/<device_name>/queue/slice_idle
      //  /sys/block/<device_name>/queue/slice_sync
      syscall(SYS_ioprio_set, 1,  // IOPRIO_WHO_PROCESS
              0,                  // current thread
              IOPRIO_PRIO_VALUE(3, 0));
      low_io_priority = true;
    }
#else
    (void)decrease_io_priority;  // avoid 'unused variable' error
    (void)decrease_cpu_priority;
#endif
    func();
  }
}

// Helper struct for passing arguments when creating threads.
struct BGThreadMetadata {
  ThreadPoolImpl::Impl* thread_pool_;
  size_t thread_id_;  // Index of this thread within the pool.
  BGThreadMetadata(ThreadPoolImpl::Impl* thread_pool, size_t thread_id)
      : thread_pool_(thread_pool), thread_id_(thread_id) {}
};

void ThreadPoolImpl::Impl::BGThreadWrapper(void* arg) {
  BGThreadMetadata* meta = reinterpret_cast<BGThreadMetadata*>(arg);
  size_t thread_id = meta->thread_id_;
  ThreadPoolImpl::Impl* tp = meta->thread_pool_;
#ifdef ROCKSDB_USING_THREAD_STATUS
  // initialize it because compiler isn't good enough to see we don't use it
  // uninitialized
  ThreadStatus::ThreadType thread_type = ThreadStatus::NUM_THREAD_TYPES;
  switch (tp->GetThreadPriority()) {
    case Env::Priority::HIGH:
      thread_type = ThreadStatus::HIGH_PRIORITY;
      break;
    case Env::Priority::LOW:
      thread_type = ThreadStatus::LOW_PRIORITY;
      break;
    case Env::Priority::BOTTOM:
      thread_type = ThreadStatus::BOTTOM_PRIORITY;
      break;
    case Env::Priority::USER:
      thread_type = ThreadStatus::USER;
      break;
    case Env::Priority::TOTAL:
      assert(false);
      return;
  }
  assert(thread_type != ThreadStatus::NUM_THREAD_TYPES);
  ThreadStatusUtil::RegisterThread(tp->GetHostEnv(), thread_type);
#endif
  delete meta;
  tp->BGThread(thread_id);
#ifdef ROCKSDB_USING_THREAD_STATUS
  ThreadStatusUtil::UnregisterThread();
#endif
  return;
}

void ThreadPoolImpl::Impl::SetBackgroundThreadsInternal(int num,
  bool allow_reduce) {
  std::lock_guard<std::mutex> lock(mu_);
  if (exit_all_threads_) {
    return;
  }
  if (num > total_threads_limit_ ||
      (num < total_threads_limit_ && allow_reduce)) {
    total_threads_limit_ = std::max(0, num);
    WakeUpAllThreads();
    StartBGThreads();
  }
}

int ThreadPoolImpl::Impl::GetBackgroundThreads() {
  std::unique_lock<std::mutex> lock(mu_);
  return total_threads_limit_;
}

void ThreadPoolImpl::Impl::StartBGThreads() {
  // Start background thread if necessary
  while ((int)bgthreads_.size() < total_threads_limit_) {
    port::Thread p_t(&BGThreadWrapper,
      new BGThreadMetadata(this, bgthreads_.size()));

// Set the thread name to aid debugging
#if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ)
#if __GLIBC_PREREQ(2, 12)
    auto th_handle = p_t.native_handle();
    std::string thread_priority = Env::PriorityToString(GetThreadPriority());
    std::ostringstream thread_name_stream;
    thread_name_stream << "rocksdb:";
    for (char c : thread_priority) {
      thread_name_stream << static_cast<char>(tolower(c));
    }
    thread_name_stream << bgthreads_.size();
    pthread_setname_np(th_handle, thread_name_stream.str().c_str());
#endif
#endif
    bgthreads_.push_back(std::move(p_t));
  }
}

void ThreadPoolImpl::Impl::Submit(std::function<void()>&& schedule,
  std::function<void()>&& unschedule, void* tag) {

  std::lock_guard<std::mutex> lock(mu_);

  if (exit_all_threads_) {
    return;
  }

  StartBGThreads();

  // Add to the queue
  queue_.push_back(BGItem());

  auto& item = queue_.back();
  item.tag = tag;
  item.function = std::move(schedule);
  item.unschedFunction = std::move(unschedule);

  queue_len_.store(static_cast<unsigned int>(queue_.size()),
    std::memory_order_relaxed);

  if (!HasExcessiveThread()) {
    // Wake up at least one waiting thread.
    bgsignal_.notify_one();
  } else {
    // Need to wake up all threads to make sure the one woken
    // up is not the one to terminate.
    WakeUpAllThreads();
  }
}

int ThreadPoolImpl::Impl::UnSchedule(void* arg) {
  int count = 0;

  std::vector<std::function<void()>> candidates;
  {
    std::lock_guard<std::mutex> lock(mu_);

    // Remove from the queue
    BGQueue::iterator it = queue_.begin();
    while (it != queue_.end()) {
      if (arg == (*it).tag) {
        if (it->unschedFunction) {
          candidates.push_back(std::move(it->unschedFunction));
        }
        it = queue_.erase(it);
        count++;
      } else {
        ++it;
      }
    }
    queue_len_.store(static_cast<unsigned int>(queue_.size()),
      std::memory_order_relaxed);
  }

  // Run unschedule functions outside the mutex
  for (auto& f : candidates) {
    f();
  }

  return count;
}

ThreadPoolImpl::ThreadPoolImpl() :
  impl_(new Impl()) {
}

ThreadPoolImpl::~ThreadPoolImpl() {
}

void ThreadPoolImpl::JoinAllThreads() {
  impl_->JoinThreads(false);
}

void ThreadPoolImpl::SetBackgroundThreads(int num) {
  impl_->SetBackgroundThreadsInternal(num, true);
}

int ThreadPoolImpl::GetBackgroundThreads() {
  return impl_->GetBackgroundThreads();
}

unsigned int ThreadPoolImpl::GetQueueLen() const {
  return impl_->GetQueueLen();
}

void ThreadPoolImpl::WaitForJobsAndJoinAllThreads() {
  impl_->JoinThreads(true);
}

void ThreadPoolImpl::LowerIOPriority() {
  impl_->LowerIOPriority();
}

void ThreadPoolImpl::LowerCPUPriority() {
  impl_->LowerCPUPriority();
}

void ThreadPoolImpl::IncBackgroundThreadsIfNeeded(int num) {
  impl_->SetBackgroundThreadsInternal(num, false);
}

void ThreadPoolImpl::SubmitJob(const std::function<void()>& job) {
  auto copy(job);
  impl_->Submit(std::move(copy), std::function<void()>(), nullptr);
}

void ThreadPoolImpl::SubmitJob(std::function<void()>&& job) {
  impl_->Submit(std::move(job), std::function<void()>(), nullptr);
}

void ThreadPoolImpl::Schedule(void(*function)(void* arg1), void* arg,
  void* tag, void(*unschedFunction)(void* arg)) {
  if (unschedFunction == nullptr) {
    impl_->Submit(std::bind(function, arg), std::function<void()>(), tag);
  } else {
    impl_->Submit(std::bind(function, arg), std::bind(unschedFunction, arg),
                  tag);
  }
}

int ThreadPoolImpl::UnSchedule(void* arg) {
  return impl_->UnSchedule(arg);
}

void ThreadPoolImpl::SetHostEnv(Env* env) { impl_->SetHostEnv(env); }

Env* ThreadPoolImpl::GetHostEnv() const { return impl_->GetHostEnv(); }

// Return the thread priority.
// This would allow its member-thread to know its priority.
Env::Priority ThreadPoolImpl::GetThreadPriority() const {
  return impl_->GetThreadPriority();
}

// Set the thread priority.
void ThreadPoolImpl::SetThreadPriority(Env::Priority priority) {
  impl_->SetThreadPriority(priority);
}

ThreadPool* NewThreadPool(int num_threads) {
  ThreadPoolImpl* thread_pool = new ThreadPoolImpl();
  thread_pool->SetBackgroundThreads(num_threads);
  return thread_pool;
}

}  // namespace ROCKSDB_NAMESPACE
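
// Usage sketch (illustrative only, not compiled as part of this file). It
// assumes the public ThreadPool interface declared in
// include/rocksdb/threadpool.h together with the NewThreadPool() factory
// defined above; the job lambda and thread count are hypothetical.
//
//   #include "rocksdb/threadpool.h"
//
//   std::unique_ptr<ROCKSDB_NAMESPACE::ThreadPool> pool(
//       ROCKSDB_NAMESPACE::NewThreadPool(/*num_threads=*/4));
//   for (int i = 0; i < 16; ++i) {
//     pool->SubmitJob([i] { /* background work for item i */ });
//   }
//   // Waits for the queue to drain, then joins the worker threads; the pool
//   // must be drained/joined like this before it is destroyed.
//   pool->WaitForJobsAndJoinAllThreads();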