| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774 | //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.//  This source code is licensed under both the GPLv2 (found in the//  COPYING file in the root directory) and Apache 2.0 License//  (found in the LICENSE.Apache file in the root directory).//// Copyright (c) 2011 The LevelDB Authors. All rights reserved.// Use of this source code is governed by a BSD-style license that can be// found in the LICENSE file. See the AUTHORS file for names of contributors.#include "env/mock_env.h"#include <algorithm>#include <chrono>#include "port/sys_time.h"#include "util/cast_util.h"#include "util/murmurhash.h"#include "util/random.h"#include "util/rate_limiter.h"namespace ROCKSDB_NAMESPACE {class MemFile { public:  explicit MemFile(Env* env, const std::string& fn, bool _is_lock_file = false)      : env_(env),        fn_(fn),        refs_(0),        is_lock_file_(_is_lock_file),        locked_(false),        size_(0),        modified_time_(Now()),        rnd_(static_cast<uint32_t>(            MurmurHash(fn.data(), static_cast<int>(fn.size()), 0))),        fsynced_bytes_(0) {}  // No copying allowed.  MemFile(const MemFile&) = delete;  void operator=(const MemFile&) = delete;  void Ref() {    MutexLock lock(&mutex_);    ++refs_;  }  bool is_lock_file() const { return is_lock_file_; }  bool Lock() {    assert(is_lock_file_);    MutexLock lock(&mutex_);    if (locked_) {      return false;    } else {      locked_ = true;      return true;    }  }  void Unlock() {    assert(is_lock_file_);    MutexLock lock(&mutex_);    locked_ = false;  }  void Unref() {    bool do_delete = false;    {      MutexLock lock(&mutex_);      --refs_;      assert(refs_ >= 0);      if (refs_ <= 0) {        do_delete = true;      }    }    if (do_delete) {      delete this;    }  }  uint64_t Size() const { return size_; }  void Truncate(size_t size) {    MutexLock lock(&mutex_);    if (size < size_) {      data_.resize(size);      size_ = size;    }  }  void CorruptBuffer() {    if (fsynced_bytes_ >= size_) {      return;    }    uint64_t buffered_bytes = size_ - fsynced_bytes_;    uint64_t start =        fsynced_bytes_ + rnd_.Uniform(static_cast<int>(buffered_bytes));    uint64_t end = std::min(start + 512, size_.load());    MutexLock lock(&mutex_);    for (uint64_t pos = start; pos < end; ++pos) {      data_[static_cast<size_t>(pos)] = static_cast<char>(rnd_.Uniform(256));    }  }  Status Read(uint64_t offset, size_t n, Slice* result, char* scratch) const {    MutexLock lock(&mutex_);    const uint64_t available = Size() - std::min(Size(), offset);    size_t offset_ = static_cast<size_t>(offset);    if (n > available) {      n = static_cast<size_t>(available);    }    if (n == 0) {      *result = Slice();      return Status::OK();    }    if (scratch) {      memcpy(scratch, &(data_[offset_]), n);      *result = Slice(scratch, n);    } else {      *result = Slice(&(data_[offset_]), n);    }    return Status::OK();  }  Status Write(uint64_t offset, const Slice& data) {    MutexLock lock(&mutex_);    size_t offset_ = static_cast<size_t>(offset);    if (offset + data.size() > data_.size()) {      data_.resize(offset_ + data.size());    }    data_.replace(offset_, data.size(), data.data(), data.size());    size_ = data_.size();    modified_time_ = Now();    return Status::OK();  }  Status Append(const Slice& data) {    MutexLock lock(&mutex_);    data_.append(data.data(), data.size());    size_ = data_.size();    modified_time_ = Now();    return Status::OK();  }  Status Fsync() {    fsynced_bytes_ = size_.load();    return Status::OK();  }  uint64_t ModifiedTime() const { return modified_time_; } private:  uint64_t Now() {    int64_t unix_time = 0;    auto s = env_->GetCurrentTime(&unix_time);    assert(s.ok());    return static_cast<uint64_t>(unix_time);  }  // Private since only Unref() should be used to delete it.  ~MemFile() { assert(refs_ == 0); }  Env* env_;  const std::string fn_;  mutable port::Mutex mutex_;  int refs_;  bool is_lock_file_;  bool locked_;  // Data written into this file, all bytes before fsynced_bytes are  // persistent.  std::string data_;  std::atomic<uint64_t> size_;  std::atomic<uint64_t> modified_time_;  Random rnd_;  std::atomic<uint64_t> fsynced_bytes_;};namespace {class MockSequentialFile : public SequentialFile { public:  explicit MockSequentialFile(MemFile* file) : file_(file), pos_(0) {    file_->Ref();  }  ~MockSequentialFile() override { file_->Unref(); }  Status Read(size_t n, Slice* result, char* scratch) override {    Status s = file_->Read(pos_, n, result, scratch);    if (s.ok()) {      pos_ += result->size();    }    return s;  }  Status Skip(uint64_t n) override {    if (pos_ > file_->Size()) {      return Status::IOError("pos_ > file_->Size()");    }    const uint64_t available = file_->Size() - pos_;    if (n > available) {      n = available;    }    pos_ += static_cast<size_t>(n);    return Status::OK();  } private:  MemFile* file_;  size_t pos_;};class MockRandomAccessFile : public RandomAccessFile { public:  explicit MockRandomAccessFile(MemFile* file) : file_(file) { file_->Ref(); }  ~MockRandomAccessFile() override { file_->Unref(); }  Status Read(uint64_t offset, size_t n, Slice* result,              char* scratch) const override {    return file_->Read(offset, n, result, scratch);  } private:  MemFile* file_;};class MockRandomRWFile : public RandomRWFile { public:  explicit MockRandomRWFile(MemFile* file) : file_(file) { file_->Ref(); }  ~MockRandomRWFile() override { file_->Unref(); }  Status Write(uint64_t offset, const Slice& data) override {    return file_->Write(offset, data);  }  Status Read(uint64_t offset, size_t n, Slice* result,              char* scratch) const override {    return file_->Read(offset, n, result, scratch);  }  Status Close() override { return file_->Fsync(); }  Status Flush() override { return Status::OK(); }  Status Sync() override { return file_->Fsync(); } private:  MemFile* file_;};class MockWritableFile : public WritableFile { public:  MockWritableFile(MemFile* file, RateLimiter* rate_limiter)      : file_(file), rate_limiter_(rate_limiter) {    file_->Ref();  }  ~MockWritableFile() override { file_->Unref(); }  Status Append(const Slice& data) override {    size_t bytes_written = 0;    while (bytes_written < data.size()) {      auto bytes = RequestToken(data.size() - bytes_written);      Status s = file_->Append(Slice(data.data() + bytes_written, bytes));      if (!s.ok()) {        return s;      }      bytes_written += bytes;    }    return Status::OK();  }  Status Truncate(uint64_t size) override {    file_->Truncate(static_cast<size_t>(size));    return Status::OK();  }  Status Close() override { return file_->Fsync(); }  Status Flush() override { return Status::OK(); }  Status Sync() override { return file_->Fsync(); }  uint64_t GetFileSize() override { return file_->Size(); } private:  inline size_t RequestToken(size_t bytes) {    if (rate_limiter_ && io_priority_ < Env::IO_TOTAL) {      bytes = std::min(          bytes, static_cast<size_t>(rate_limiter_->GetSingleBurstBytes()));      rate_limiter_->Request(bytes, io_priority_);    }    return bytes;  }  MemFile* file_;  RateLimiter* rate_limiter_;};class MockEnvDirectory : public Directory { public:  Status Fsync() override { return Status::OK(); }};class MockEnvFileLock : public FileLock { public:  explicit MockEnvFileLock(const std::string& fname) : fname_(fname) {}  std::string FileName() const { return fname_; } private:  const std::string fname_;};class TestMemLogger : public Logger { private:  std::unique_ptr<WritableFile> file_;  std::atomic_size_t log_size_;  static const uint64_t flush_every_seconds_ = 5;  std::atomic_uint_fast64_t last_flush_micros_;  Env* env_;  std::atomic<bool> flush_pending_; public:  TestMemLogger(std::unique_ptr<WritableFile> f, Env* env,                const InfoLogLevel log_level = InfoLogLevel::ERROR_LEVEL)      : Logger(log_level),        file_(std::move(f)),        log_size_(0),        last_flush_micros_(0),        env_(env),        flush_pending_(false) {}  ~TestMemLogger() override {}  void Flush() override {    if (flush_pending_) {      flush_pending_ = false;    }    last_flush_micros_ = env_->NowMicros();  }  using Logger::Logv;  void Logv(const char* format, va_list ap) override {    // We try twice: the first time with a fixed-size stack allocated buffer,    // and the second time with a much larger dynamically allocated buffer.    char buffer[500];    for (int iter = 0; iter < 2; iter++) {      char* base;      int bufsize;      if (iter == 0) {        bufsize = sizeof(buffer);        base = buffer;      } else {        bufsize = 30000;        base = new char[bufsize];      }      char* p = base;      char* limit = base + bufsize;      struct timeval now_tv;      gettimeofday(&now_tv, nullptr);      const time_t seconds = now_tv.tv_sec;      struct tm t;      memset(&t, 0, sizeof(t));      struct tm* ret __attribute__((__unused__));      ret = localtime_r(&seconds, &t);      assert(ret);      p += snprintf(p, limit - p, "%04d/%02d/%02d-%02d:%02d:%02d.%06d ",                    t.tm_year + 1900, t.tm_mon + 1, t.tm_mday, t.tm_hour,                    t.tm_min, t.tm_sec, static_cast<int>(now_tv.tv_usec));      // Print the message      if (p < limit) {        va_list backup_ap;        va_copy(backup_ap, ap);        p += vsnprintf(p, limit - p, format, backup_ap);        va_end(backup_ap);      }      // Truncate to available space if necessary      if (p >= limit) {        if (iter == 0) {          continue;  // Try again with larger buffer        } else {          p = limit - 1;        }      }      // Add newline if necessary      if (p == base || p[-1] != '\n') {        *p++ = '\n';      }      assert(p <= limit);      const size_t write_size = p - base;      file_->Append(Slice(base, write_size));      flush_pending_ = true;      log_size_ += write_size;      uint64_t now_micros =          static_cast<uint64_t>(now_tv.tv_sec) * 1000000 + now_tv.tv_usec;      if (now_micros - last_flush_micros_ >= flush_every_seconds_ * 1000000) {        flush_pending_ = false;        last_flush_micros_ = now_micros;      }      if (base != buffer) {        delete[] base;      }      break;    }  }  size_t GetLogFileSize() const override { return log_size_; }};}  // Anonymous namespaceMockEnv::MockEnv(Env* base_env) : EnvWrapper(base_env), fake_sleep_micros_(0) {}MockEnv::~MockEnv() {  for (FileSystem::iterator i = file_map_.begin(); i != file_map_.end(); ++i) {    i->second->Unref();  }}// Partial implementation of the Env interface.Status MockEnv::NewSequentialFile(const std::string& fname,                                  std::unique_ptr<SequentialFile>* result,                                  const EnvOptions& /*soptions*/) {  auto fn = NormalizePath(fname);  MutexLock lock(&mutex_);  if (file_map_.find(fn) == file_map_.end()) {    *result = nullptr;    return Status::IOError(fn, "File not found");  }  auto* f = file_map_[fn];  if (f->is_lock_file()) {    return Status::InvalidArgument(fn, "Cannot open a lock file.");  }  result->reset(new MockSequentialFile(f));  return Status::OK();}Status MockEnv::NewRandomAccessFile(const std::string& fname,                                    std::unique_ptr<RandomAccessFile>* result,                                    const EnvOptions& /*soptions*/) {  auto fn = NormalizePath(fname);  MutexLock lock(&mutex_);  if (file_map_.find(fn) == file_map_.end()) {    *result = nullptr;    return Status::IOError(fn, "File not found");  }  auto* f = file_map_[fn];  if (f->is_lock_file()) {    return Status::InvalidArgument(fn, "Cannot open a lock file.");  }  result->reset(new MockRandomAccessFile(f));  return Status::OK();}Status MockEnv::NewRandomRWFile(const std::string& fname,                                std::unique_ptr<RandomRWFile>* result,                                const EnvOptions& /*soptions*/) {  auto fn = NormalizePath(fname);  MutexLock lock(&mutex_);  if (file_map_.find(fn) == file_map_.end()) {    *result = nullptr;    return Status::IOError(fn, "File not found");  }  auto* f = file_map_[fn];  if (f->is_lock_file()) {    return Status::InvalidArgument(fn, "Cannot open a lock file.");  }  result->reset(new MockRandomRWFile(f));  return Status::OK();}Status MockEnv::ReuseWritableFile(const std::string& fname,                                  const std::string& old_fname,                                  std::unique_ptr<WritableFile>* result,                                  const EnvOptions& options) {  auto s = RenameFile(old_fname, fname);  if (!s.ok()) {    return s;  }  result->reset();  return NewWritableFile(fname, result, options);}Status MockEnv::NewWritableFile(const std::string& fname,                                std::unique_ptr<WritableFile>* result,                                const EnvOptions& env_options) {  auto fn = NormalizePath(fname);  MutexLock lock(&mutex_);  if (file_map_.find(fn) != file_map_.end()) {    DeleteFileInternal(fn);  }  MemFile* file = new MemFile(this, fn, false);  file->Ref();  file_map_[fn] = file;  result->reset(new MockWritableFile(file, env_options.rate_limiter));  return Status::OK();}Status MockEnv::NewDirectory(const std::string& /*name*/,                             std::unique_ptr<Directory>* result) {  result->reset(new MockEnvDirectory());  return Status::OK();}Status MockEnv::FileExists(const std::string& fname) {  auto fn = NormalizePath(fname);  MutexLock lock(&mutex_);  if (file_map_.find(fn) != file_map_.end()) {    // File exists    return Status::OK();  }  // Now also check if fn exists as a dir  for (const auto& iter : file_map_) {    const std::string& filename = iter.first;    if (filename.size() >= fn.size() + 1 && filename[fn.size()] == '/' &&        Slice(filename).starts_with(Slice(fn))) {      return Status::OK();    }  }  return Status::NotFound();}Status MockEnv::GetChildren(const std::string& dir,                            std::vector<std::string>* result) {  auto d = NormalizePath(dir);  bool found_dir = false;  {    MutexLock lock(&mutex_);    result->clear();    for (const auto& iter : file_map_) {      const std::string& filename = iter.first;      if (filename == d) {        found_dir = true;      } else if (filename.size() >= d.size() + 1 && filename[d.size()] == '/' &&                 Slice(filename).starts_with(Slice(d))) {        found_dir = true;        size_t next_slash = filename.find('/', d.size() + 1);        if (next_slash != std::string::npos) {          result->push_back(              filename.substr(d.size() + 1, next_slash - d.size() - 1));        } else {          result->push_back(filename.substr(d.size() + 1));        }      }    }  }  result->erase(std::unique(result->begin(), result->end()), result->end());  return found_dir ? Status::OK() : Status::NotFound();}void MockEnv::DeleteFileInternal(const std::string& fname) {  assert(fname == NormalizePath(fname));  const auto& pair = file_map_.find(fname);  if (pair != file_map_.end()) {    pair->second->Unref();    file_map_.erase(fname);  }}Status MockEnv::DeleteFile(const std::string& fname) {  auto fn = NormalizePath(fname);  MutexLock lock(&mutex_);  if (file_map_.find(fn) == file_map_.end()) {    return Status::IOError(fn, "File not found");  }  DeleteFileInternal(fn);  return Status::OK();}Status MockEnv::Truncate(const std::string& fname, size_t size) {  auto fn = NormalizePath(fname);  MutexLock lock(&mutex_);  auto iter = file_map_.find(fn);  if (iter == file_map_.end()) {    return Status::IOError(fn, "File not found");  }  iter->second->Truncate(size);  return Status::OK();}Status MockEnv::CreateDir(const std::string& dirname) {  auto dn = NormalizePath(dirname);  if (file_map_.find(dn) == file_map_.end()) {    MemFile* file = new MemFile(this, dn, false);    file->Ref();    file_map_[dn] = file;  } else {    return Status::IOError();  }  return Status::OK();}Status MockEnv::CreateDirIfMissing(const std::string& dirname) {  CreateDir(dirname);  return Status::OK();}Status MockEnv::DeleteDir(const std::string& dirname) {  return DeleteFile(dirname);}Status MockEnv::GetFileSize(const std::string& fname, uint64_t* file_size) {  auto fn = NormalizePath(fname);  MutexLock lock(&mutex_);  auto iter = file_map_.find(fn);  if (iter == file_map_.end()) {    return Status::IOError(fn, "File not found");  }  *file_size = iter->second->Size();  return Status::OK();}Status MockEnv::GetFileModificationTime(const std::string& fname,                                        uint64_t* time) {  auto fn = NormalizePath(fname);  MutexLock lock(&mutex_);  auto iter = file_map_.find(fn);  if (iter == file_map_.end()) {    return Status::IOError(fn, "File not found");  }  *time = iter->second->ModifiedTime();  return Status::OK();}Status MockEnv::RenameFile(const std::string& src, const std::string& dest) {  auto s = NormalizePath(src);  auto t = NormalizePath(dest);  MutexLock lock(&mutex_);  if (file_map_.find(s) == file_map_.end()) {    return Status::IOError(s, "File not found");  }  DeleteFileInternal(t);  file_map_[t] = file_map_[s];  file_map_.erase(s);  return Status::OK();}Status MockEnv::LinkFile(const std::string& src, const std::string& dest) {  auto s = NormalizePath(src);  auto t = NormalizePath(dest);  MutexLock lock(&mutex_);  if (file_map_.find(s) == file_map_.end()) {    return Status::IOError(s, "File not found");  }  DeleteFileInternal(t);  file_map_[t] = file_map_[s];  file_map_[t]->Ref();  // Otherwise it might get deleted when noone uses s  return Status::OK();}Status MockEnv::NewLogger(const std::string& fname,                          std::shared_ptr<Logger>* result) {  auto fn = NormalizePath(fname);  MutexLock lock(&mutex_);  auto iter = file_map_.find(fn);  MemFile* file = nullptr;  if (iter == file_map_.end()) {    file = new MemFile(this, fn, false);    file->Ref();    file_map_[fn] = file;  } else {    file = iter->second;  }  std::unique_ptr<WritableFile> f(new MockWritableFile(file, nullptr));  result->reset(new TestMemLogger(std::move(f), this));  return Status::OK();}Status MockEnv::LockFile(const std::string& fname, FileLock** flock) {  auto fn = NormalizePath(fname);  {    MutexLock lock(&mutex_);    if (file_map_.find(fn) != file_map_.end()) {      if (!file_map_[fn]->is_lock_file()) {        return Status::InvalidArgument(fname, "Not a lock file.");      }      if (!file_map_[fn]->Lock()) {        return Status::IOError(fn, "Lock is already held.");      }    } else {      auto* file = new MemFile(this, fn, true);      file->Ref();      file->Lock();      file_map_[fn] = file;    }  }  *flock = new MockEnvFileLock(fn);  return Status::OK();}Status MockEnv::UnlockFile(FileLock* flock) {  std::string fn =      static_cast_with_check<MockEnvFileLock, FileLock>(flock)->FileName();  {    MutexLock lock(&mutex_);    if (file_map_.find(fn) != file_map_.end()) {      if (!file_map_[fn]->is_lock_file()) {        return Status::InvalidArgument(fn, "Not a lock file.");      }      file_map_[fn]->Unlock();    }  }  delete flock;  return Status::OK();}Status MockEnv::GetTestDirectory(std::string* path) {  *path = "/test";  return Status::OK();}Status MockEnv::GetCurrentTime(int64_t* unix_time) {  auto s = EnvWrapper::GetCurrentTime(unix_time);  if (s.ok()) {    *unix_time += fake_sleep_micros_.load() / (1000 * 1000);  }  return s;}uint64_t MockEnv::NowMicros() {  return EnvWrapper::NowMicros() + fake_sleep_micros_.load();}uint64_t MockEnv::NowNanos() {  return EnvWrapper::NowNanos() + fake_sleep_micros_.load() * 1000;}Status MockEnv::CorruptBuffer(const std::string& fname) {  auto fn = NormalizePath(fname);  MutexLock lock(&mutex_);  auto iter = file_map_.find(fn);  if (iter == file_map_.end()) {    return Status::IOError(fn, "File not found");  }  iter->second->CorruptBuffer();  return Status::OK();}std::string MockEnv::NormalizePath(const std::string path) {  std::string dst;  for (auto c : path) {    if (!dst.empty() && c == '/' && dst.back() == '/') {      continue;    }    dst.push_back(c);  }  return dst;}void MockEnv::FakeSleepForMicroseconds(int64_t micros) {  fake_sleep_micros_.fetch_add(micros);}#ifndef ROCKSDB_LITE// This is to maintain the behavior before swithcing from InMemoryEnv to MockEnvEnv* NewMemEnv(Env* base_env) { return new MockEnv(base_env); }#else  // ROCKSDB_LITEEnv* NewMemEnv(Env* /*base_env*/) { return nullptr; }#endif  // !ROCKSDB_LITE}  // namespace ROCKSDB_NAMESPACE
 |