| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144 |
- // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
- // This source code is licensed under both the GPLv2 (found in the
- // COPYING file in the root directory) and Apache 2.0 License
- // (found in the LICENSE.Apache file in the root directory).
- #pragma once
- #include <stdint.h>
- #include <atomic>
- #include <memory>
- #include "rocksdb/rate_limiter.h"
- namespace ROCKSDB_NAMESPACE {
- class Env;
- class WriteControllerToken;
- // WriteController is controlling write stalls in our write code-path. Write
- // stalls happen when compaction can't keep up with write rate.
- // All of the methods here (including WriteControllerToken's destructors) need
- // to be called while holding DB mutex
- class WriteController {
- public:
- explicit WriteController(uint64_t _delayed_write_rate = 1024u * 1024u * 32u,
- int64_t low_pri_rate_bytes_per_sec = 1024 * 1024)
- : total_stopped_(0),
- total_delayed_(0),
- total_compaction_pressure_(0),
- bytes_left_(0),
- last_refill_time_(0),
- low_pri_rate_limiter_(
- NewGenericRateLimiter(low_pri_rate_bytes_per_sec)) {
- set_max_delayed_write_rate(_delayed_write_rate);
- }
- ~WriteController() = default;
- // When an actor (column family) requests a stop token, all writes will be
- // stopped until the stop token is released (deleted)
- std::unique_ptr<WriteControllerToken> GetStopToken();
- // When an actor (column family) requests a delay token, total delay for all
- // writes to the DB will be controlled under the delayed write rate. Every
- // write needs to call GetDelay() with number of bytes writing to the DB,
- // which returns number of microseconds to sleep.
- std::unique_ptr<WriteControllerToken> GetDelayToken(
- uint64_t delayed_write_rate);
- // When an actor (column family) requests a moderate token, compaction
- // threads will be increased
- std::unique_ptr<WriteControllerToken> GetCompactionPressureToken();
- // these three metods are querying the state of the WriteController
- bool IsStopped() const;
- bool NeedsDelay() const { return total_delayed_.load() > 0; }
- bool NeedSpeedupCompaction() const {
- return IsStopped() || NeedsDelay() || total_compaction_pressure_ > 0;
- }
- // return how many microseconds the caller needs to sleep after the call
- // num_bytes: how many number of bytes to put into the DB.
- // Prerequisite: DB mutex held.
- uint64_t GetDelay(Env* env, uint64_t num_bytes);
- void set_delayed_write_rate(uint64_t write_rate) {
- // avoid divide 0
- if (write_rate == 0) {
- write_rate = 1u;
- } else if (write_rate > max_delayed_write_rate()) {
- write_rate = max_delayed_write_rate();
- }
- delayed_write_rate_ = write_rate;
- }
- void set_max_delayed_write_rate(uint64_t write_rate) {
- // avoid divide 0
- if (write_rate == 0) {
- write_rate = 1u;
- }
- max_delayed_write_rate_ = write_rate;
- // update delayed_write_rate_ as well
- delayed_write_rate_ = write_rate;
- }
- uint64_t delayed_write_rate() const { return delayed_write_rate_; }
- uint64_t max_delayed_write_rate() const { return max_delayed_write_rate_; }
- RateLimiter* low_pri_rate_limiter() { return low_pri_rate_limiter_.get(); }
- private:
- uint64_t NowMicrosMonotonic(Env* env);
- friend class WriteControllerToken;
- friend class StopWriteToken;
- friend class DelayWriteToken;
- friend class CompactionPressureToken;
- std::atomic<int> total_stopped_;
- std::atomic<int> total_delayed_;
- std::atomic<int> total_compaction_pressure_;
- uint64_t bytes_left_;
- uint64_t last_refill_time_;
- // write rate set when initialization or by `DBImpl::SetDBOptions`
- uint64_t max_delayed_write_rate_;
- // current write rate
- uint64_t delayed_write_rate_;
- std::unique_ptr<RateLimiter> low_pri_rate_limiter_;
- };
- class WriteControllerToken {
- public:
- explicit WriteControllerToken(WriteController* controller)
- : controller_(controller) {}
- virtual ~WriteControllerToken() {}
- protected:
- WriteController* controller_;
- private:
- // no copying allowed
- WriteControllerToken(const WriteControllerToken&) = delete;
- void operator=(const WriteControllerToken&) = delete;
- };
- class StopWriteToken : public WriteControllerToken {
- public:
- explicit StopWriteToken(WriteController* controller)
- : WriteControllerToken(controller) {}
- virtual ~StopWriteToken();
- };
- class DelayWriteToken : public WriteControllerToken {
- public:
- explicit DelayWriteToken(WriteController* controller)
- : WriteControllerToken(controller) {}
- virtual ~DelayWriteToken();
- };
- class CompactionPressureToken : public WriteControllerToken {
- public:
- explicit CompactionPressureToken(WriteController* controller)
- : WriteControllerToken(controller) {}
- virtual ~CompactionPressureToken();
- };
- } // namespace ROCKSDB_NAMESPACE
|