| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149 | //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.//  This source code is licensed under both the GPLv2 (found in the//  COPYING file in the root directory) and Apache 2.0 License//  (found in the LICENSE.Apache file in the root directory).#pragma once#include <functional>#include <string>#include "port/port.h"#include "rocksdb/env.h"#include "test_util/mock_time_env.h"#include "util/mutexlock.h"namespace ROCKSDB_NAMESPACE {// Simple wrapper around port::Thread that supports calling a callback every// X seconds. If you pass in 0, then it will call your callback repeatedly// without delay.class RepeatableThread { public:  RepeatableThread(std::function<void()> function,                   const std::string& thread_name, Env* env, uint64_t delay_us,                   uint64_t initial_delay_us = 0)      : function_(function),        thread_name_("rocksdb:" + thread_name),        env_(env),        delay_us_(delay_us),        initial_delay_us_(initial_delay_us),        mutex_(env),        cond_var_(&mutex_),        running_(true),#ifndef NDEBUG        waiting_(false),        run_count_(0),#endif        thread_([this] { thread(); }) {  }  void cancel() {    {      InstrumentedMutexLock l(&mutex_);      if (!running_) {        return;      }      running_ = false;      cond_var_.SignalAll();    }    thread_.join();  }  bool IsRunning() { return running_; }  ~RepeatableThread() { cancel(); }#ifndef NDEBUG  // Wait until RepeatableThread starting waiting, call the optional callback,  // then wait for one run of RepeatableThread. Tests can use provide a  // custom env object to mock time, and use the callback here to bump current  // time and trigger RepeatableThread. See repeatable_thread_test for example.  //  // Note: only support one caller of this method.  void TEST_WaitForRun(std::function<void()> callback = nullptr) {    InstrumentedMutexLock l(&mutex_);    while (!waiting_) {      cond_var_.Wait();    }    uint64_t prev_count = run_count_;    if (callback != nullptr) {      callback();    }    cond_var_.SignalAll();    while (!(run_count_ > prev_count)) {      cond_var_.Wait();    }  }#endif private:  bool wait(uint64_t delay) {    InstrumentedMutexLock l(&mutex_);    if (running_ && delay > 0) {      uint64_t wait_until = env_->NowMicros() + delay;#ifndef NDEBUG      waiting_ = true;      cond_var_.SignalAll();#endif      while (running_) {        cond_var_.TimedWait(wait_until);        if (env_->NowMicros() >= wait_until) {          break;        }      }#ifndef NDEBUG      waiting_ = false;#endif    }    return running_;  }  void thread() {#if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ)#if __GLIBC_PREREQ(2, 12)    // Set thread name.    auto thread_handle = thread_.native_handle();    int ret __attribute__((__unused__)) =        pthread_setname_np(thread_handle, thread_name_.c_str());    assert(ret == 0);#endif#endif    assert(delay_us_ > 0);    if (!wait(initial_delay_us_)) {      return;    }    do {      function_();#ifndef NDEBUG      {        InstrumentedMutexLock l(&mutex_);        run_count_++;        cond_var_.SignalAll();      }#endif    } while (wait(delay_us_));  }  const std::function<void()> function_;  const std::string thread_name_;  Env* const env_;  const uint64_t delay_us_;  const uint64_t initial_delay_us_;  // Mutex lock should be held when accessing running_, waiting_  // and run_count_.  InstrumentedMutex mutex_;  InstrumentedCondVar cond_var_;  bool running_;#ifndef NDEBUG  // RepeatableThread waiting for timeout.  bool waiting_;  // Times function_ had run.  uint64_t run_count_;#endif  port::Thread thread_;};}  // namespace ROCKSDB_NAMESPACE
 |