| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527 |
- // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
- // This source code is licensed under both the GPLv2 (found in the
- // COPYING file in the root directory) and Apache 2.0 License
- // (found in the LICENSE.Apache file in the root directory).
- //
- // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
- // Use of this source code is governed by a BSD-style license that can be
- // found in the LICENSE file. See the AUTHORS file for names of contributors
- #include <dirent.h>
- #ifndef ROCKSDB_NO_DYNAMIC_EXTENSION
- #include <dlfcn.h>
- #endif
- #include <errno.h>
- #include <fcntl.h>
- #if defined(OS_LINUX)
- #include <linux/fs.h>
- #endif
- #if defined(ROCKSDB_IOURING_PRESENT)
- #include <liburing.h>
- #endif
- #include <pthread.h>
- #include <signal.h>
- #include <stdio.h>
- #include <stdlib.h>
- #include <string.h>
- #include <sys/ioctl.h>
- #include <sys/mman.h>
- #include <sys/stat.h>
- #if defined(OS_LINUX) || defined(OS_SOLARIS) || defined(OS_ANDROID)
- #include <sys/statfs.h>
- #include <sys/syscall.h>
- #include <sys/sysmacros.h>
- #endif
- #include <sys/statvfs.h>
- #include <sys/time.h>
- #include <sys/types.h>
- #if defined(ROCKSDB_IOURING_PRESENT)
- #include <sys/uio.h>
- #endif
- #include <time.h>
- #include <algorithm>
- // Get nano time includes
- #if defined(OS_LINUX) || defined(OS_FREEBSD)
- #elif defined(__MACH__)
- #include <Availability.h>
- #include <mach/clock.h>
- #include <mach/mach.h>
- #else
- #include <chrono>
- #endif
- #include <deque>
- #include <set>
- #include <vector>
- #include "env/composite_env_wrapper.h"
- #include "env/io_posix.h"
- #include "logging/logging.h"
- #include "logging/posix_logger.h"
- #include "monitoring/iostats_context_imp.h"
- #include "monitoring/thread_status_updater.h"
- #include "port/port.h"
- #include "rocksdb/options.h"
- #include "rocksdb/slice.h"
- #include "test_util/sync_point.h"
- #include "util/coding.h"
- #include "util/compression_context_cache.h"
- #include "util/random.h"
- #include "util/string_util.h"
- #include "util/thread_local.h"
- #include "util/threadpool_imp.h"
- #if !defined(TMPFS_MAGIC)
- #define TMPFS_MAGIC 0x01021994
- #endif
- #if !defined(XFS_SUPER_MAGIC)
- #define XFS_SUPER_MAGIC 0x58465342
- #endif
- #if !defined(EXT4_SUPER_MAGIC)
- #define EXT4_SUPER_MAGIC 0xEF53
- #endif
- namespace ROCKSDB_NAMESPACE {
// Platform-specific constants used when locating shared libraries:
//   kPathSeparator - delimiter between directories in a library search path.
//   kSharedLibExt  - file-name extension of a shared library.
#if defined(OS_WIN)
static const char kPathSeparator = ';';
static const std::string kSharedLibExt = ".dll";
#elif defined(OS_MACOSX)
static const char kPathSeparator = ':';
static const std::string kSharedLibExt = ".dylib";
#else
static const char kPathSeparator = ':';
static const std::string kSharedLibExt = ".so";
#endif
- namespace {
- ThreadStatusUpdater* CreateThreadStatusUpdater() {
- return new ThreadStatusUpdater();
- }
- #ifndef ROCKSDB_NO_DYNAMIC_EXTENSION
- class PosixDynamicLibrary : public DynamicLibrary {
- public:
- PosixDynamicLibrary(const std::string& name, void* handle)
- : name_(name), handle_(handle) {}
- ~PosixDynamicLibrary() override { dlclose(handle_); }
- Status LoadSymbol(const std::string& sym_name, void** func) override {
- assert(nullptr != func);
- dlerror(); // Clear any old error
- *func = dlsym(handle_, sym_name.c_str());
- if (*func != nullptr) {
- return Status::OK();
- } else {
- char* err = dlerror();
- return Status::NotFound("Error finding symbol: " + sym_name, err);
- }
- }
- const char* Name() const override { return name_.c_str(); }
- private:
- std::string name_;
- void* handle_;
- };
- #endif // !ROCKSDB_NO_DYNAMIC_EXTENSION
- class PosixEnv : public CompositeEnvWrapper {
- public:
- PosixEnv();
- ~PosixEnv() override {
- for (const auto tid : threads_to_join_) {
- pthread_join(tid, nullptr);
- }
- for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) {
- thread_pools_[pool_id].JoinAllThreads();
- }
- // Delete the thread_status_updater_ only when the current Env is not
- // Env::Default(). This is to avoid the free-after-use error when
- // Env::Default() is destructed while some other child threads are
- // still trying to update thread status.
- if (this != Env::Default()) {
- delete thread_status_updater_;
- }
- }
- void SetFD_CLOEXEC(int fd, const EnvOptions* options) {
- if ((options == nullptr || options->set_fd_cloexec) && fd > 0) {
- fcntl(fd, F_SETFD, fcntl(fd, F_GETFD) | FD_CLOEXEC);
- }
- }
- #ifndef ROCKSDB_NO_DYNAMIC_EXTENSION
- // Loads the named library into the result.
- // If the input name is empty, the current executable is loaded
- // On *nix systems, a "lib" prefix is added to the name if one is not supplied
- // Comparably, the appropriate shared library extension is added to the name
- // if not supplied. If search_path is not specified, the shared library will
- // be loaded using the default path (LD_LIBRARY_PATH) If search_path is
- // specified, the shared library will be searched for in the directories
- // provided by the search path
- Status LoadLibrary(const std::string& name, const std::string& path,
- std::shared_ptr<DynamicLibrary>* result) override {
- Status status;
- assert(result != nullptr);
- if (name.empty()) {
- void* hndl = dlopen(NULL, RTLD_NOW);
- if (hndl != nullptr) {
- result->reset(new PosixDynamicLibrary(name, hndl));
- return Status::OK();
- }
- } else {
- std::string library_name = name;
- if (library_name.find(kSharedLibExt) == std::string::npos) {
- library_name = library_name + kSharedLibExt;
- }
- #if !defined(OS_WIN)
- if (library_name.find('/') == std::string::npos &&
- library_name.compare(0, 3, "lib") != 0) {
- library_name = "lib" + library_name;
- }
- #endif
- if (path.empty()) {
- void* hndl = dlopen(library_name.c_str(), RTLD_NOW);
- if (hndl != nullptr) {
- result->reset(new PosixDynamicLibrary(library_name, hndl));
- return Status::OK();
- }
- } else {
- std::string local_path;
- std::stringstream ss(path);
- while (getline(ss, local_path, kPathSeparator)) {
- if (!path.empty()) {
- std::string full_name = local_path + "/" + library_name;
- void* hndl = dlopen(full_name.c_str(), RTLD_NOW);
- if (hndl != nullptr) {
- result->reset(new PosixDynamicLibrary(full_name, hndl));
- return Status::OK();
- }
- }
- }
- }
- }
- return Status::IOError(
- IOErrorMsg("Failed to open shared library: xs", name), dlerror());
- }
- #endif // !ROCKSDB_NO_DYNAMIC_EXTENSION
- void Schedule(void (*function)(void* arg1), void* arg, Priority pri = LOW,
- void* tag = nullptr,
- void (*unschedFunction)(void* arg) = nullptr) override;
- int UnSchedule(void* arg, Priority pri) override;
- void StartThread(void (*function)(void* arg), void* arg) override;
- void WaitForJoin() override;
- unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const override;
- Status GetTestDirectory(std::string* result) override {
- const char* env = getenv("TEST_TMPDIR");
- if (env && env[0] != '\0') {
- *result = env;
- } else {
- char buf[100];
- snprintf(buf, sizeof(buf), "/tmp/rocksdbtest-%d", int(geteuid()));
- *result = buf;
- }
- // Directory may already exist
- CreateDir(*result);
- return Status::OK();
- }
- Status GetThreadList(std::vector<ThreadStatus>* thread_list) override {
- assert(thread_status_updater_);
- return thread_status_updater_->GetThreadList(thread_list);
- }
- static uint64_t gettid(pthread_t tid) {
- uint64_t thread_id = 0;
- memcpy(&thread_id, &tid, std::min(sizeof(thread_id), sizeof(tid)));
- return thread_id;
- }
- static uint64_t gettid() {
- pthread_t tid = pthread_self();
- return gettid(tid);
- }
- uint64_t GetThreadID() const override { return gettid(pthread_self()); }
- Status NewLogger(const std::string& fname,
- std::shared_ptr<Logger>* result) override {
- FILE* f;
- {
- IOSTATS_TIMER_GUARD(open_nanos);
- f = fopen(fname.c_str(),
- "w"
- #ifdef __GLIBC_PREREQ
- #if __GLIBC_PREREQ(2, 7)
- "e" // glibc extension to enable O_CLOEXEC
- #endif
- #endif
- );
- }
- if (f == nullptr) {
- result->reset();
- return IOError("when fopen a file for new logger", fname, errno);
- } else {
- int fd = fileno(f);
- #ifdef ROCKSDB_FALLOCATE_PRESENT
- fallocate(fd, FALLOC_FL_KEEP_SIZE, 0, 4 * 1024);
- #endif
- SetFD_CLOEXEC(fd, nullptr);
- result->reset(new PosixLogger(f, &PosixEnv::gettid, this));
- return Status::OK();
- }
- }
- uint64_t NowMicros() override {
- struct timeval tv;
- gettimeofday(&tv, nullptr);
- return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
- }
- uint64_t NowNanos() override {
- #if defined(OS_LINUX) || defined(OS_FREEBSD) || defined(OS_AIX)
- struct timespec ts;
- clock_gettime(CLOCK_MONOTONIC, &ts);
- return static_cast<uint64_t>(ts.tv_sec) * 1000000000 + ts.tv_nsec;
- #elif defined(OS_SOLARIS)
- return gethrtime();
- #elif defined(__MACH__)
- clock_serv_t cclock;
- mach_timespec_t ts;
- host_get_clock_service(mach_host_self(), CALENDAR_CLOCK, &cclock);
- clock_get_time(cclock, &ts);
- mach_port_deallocate(mach_task_self(), cclock);
- return static_cast<uint64_t>(ts.tv_sec) * 1000000000 + ts.tv_nsec;
- #else
- return std::chrono::duration_cast<std::chrono::nanoseconds>(
- std::chrono::steady_clock::now().time_since_epoch()).count();
- #endif
- }
- uint64_t NowCPUNanos() override {
- #if defined(OS_LINUX) || defined(OS_FREEBSD) || defined(OS_AIX) || \
- (defined(__MACH__) && defined(__MAC_10_12))
- struct timespec ts;
- clock_gettime(CLOCK_THREAD_CPUTIME_ID, &ts);
- return static_cast<uint64_t>(ts.tv_sec) * 1000000000 + ts.tv_nsec;
- #endif
- return 0;
- }
- void SleepForMicroseconds(int micros) override { usleep(micros); }
- Status GetHostName(char* name, uint64_t len) override {
- int ret = gethostname(name, static_cast<size_t>(len));
- if (ret < 0) {
- if (errno == EFAULT || errno == EINVAL) {
- return Status::InvalidArgument(strerror(errno));
- } else {
- return IOError("GetHostName", name, errno);
- }
- }
- return Status::OK();
- }
- Status GetCurrentTime(int64_t* unix_time) override {
- time_t ret = time(nullptr);
- if (ret == (time_t) -1) {
- return IOError("GetCurrentTime", "", errno);
- }
- *unix_time = (int64_t) ret;
- return Status::OK();
- }
- ThreadStatusUpdater* GetThreadStatusUpdater() const override {
- return Env::GetThreadStatusUpdater();
- }
- std::string GenerateUniqueId() override { return Env::GenerateUniqueId(); }
- // Allow increasing the number of worker threads.
- void SetBackgroundThreads(int num, Priority pri) override {
- assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
- thread_pools_[pri].SetBackgroundThreads(num);
- }
- int GetBackgroundThreads(Priority pri) override {
- assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
- return thread_pools_[pri].GetBackgroundThreads();
- }
- Status SetAllowNonOwnerAccess(bool allow_non_owner_access) override {
- allow_non_owner_access_ = allow_non_owner_access;
- return Status::OK();
- }
- // Allow increasing the number of worker threads.
- void IncBackgroundThreadsIfNeeded(int num, Priority pri) override {
- assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
- thread_pools_[pri].IncBackgroundThreadsIfNeeded(num);
- }
- void LowerThreadPoolIOPriority(Priority pool = LOW) override {
- assert(pool >= Priority::BOTTOM && pool <= Priority::HIGH);
- #ifdef OS_LINUX
- thread_pools_[pool].LowerIOPriority();
- #else
- (void)pool;
- #endif
- }
- void LowerThreadPoolCPUPriority(Priority pool = LOW) override {
- assert(pool >= Priority::BOTTOM && pool <= Priority::HIGH);
- #ifdef OS_LINUX
- thread_pools_[pool].LowerCPUPriority();
- #else
- (void)pool;
- #endif
- }
- std::string TimeToString(uint64_t secondsSince1970) override {
- const time_t seconds = (time_t)secondsSince1970;
- struct tm t;
- int maxsize = 64;
- std::string dummy;
- dummy.reserve(maxsize);
- dummy.resize(maxsize);
- char* p = &dummy[0];
- localtime_r(&seconds, &t);
- snprintf(p, maxsize,
- "%04d/%02d/%02d-%02d:%02d:%02d ",
- t.tm_year + 1900,
- t.tm_mon + 1,
- t.tm_mday,
- t.tm_hour,
- t.tm_min,
- t.tm_sec);
- return dummy;
- }
- private:
- std::vector<ThreadPoolImpl> thread_pools_;
- pthread_mutex_t mu_;
- std::vector<pthread_t> threads_to_join_;
- // If true, allow non owner read access for db files. Otherwise, non-owner
- // has no access to db files.
- bool allow_non_owner_access_;
- };
- PosixEnv::PosixEnv()
- : CompositeEnvWrapper(this, FileSystem::Default().get()),
- thread_pools_(Priority::TOTAL),
- allow_non_owner_access_(true) {
- ThreadPoolImpl::PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr));
- for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) {
- thread_pools_[pool_id].SetThreadPriority(
- static_cast<Env::Priority>(pool_id));
- // This allows later initializing the thread-local-env of each thread.
- thread_pools_[pool_id].SetHostEnv(this);
- }
- thread_status_updater_ = CreateThreadStatusUpdater();
- }
- void PosixEnv::Schedule(void (*function)(void* arg1), void* arg, Priority pri,
- void* tag, void (*unschedFunction)(void* arg)) {
- assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
- thread_pools_[pri].Schedule(function, arg, tag, unschedFunction);
- }
- int PosixEnv::UnSchedule(void* arg, Priority pri) {
- return thread_pools_[pri].UnSchedule(arg);
- }
- unsigned int PosixEnv::GetThreadPoolQueueLen(Priority pri) const {
- assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
- return thread_pools_[pri].GetQueueLen();
- }
// Bundle handed to a newly created thread: the function it has to run and
// the single argument to run it with.
struct StartThreadState {
  using UserFunction = void (*)(void*);

  UserFunction user_function;  // entry point supplied by the caller
  void* arg;                   // opaque argument passed to user_function
};
- static void* StartThreadWrapper(void* arg) {
- StartThreadState* state = reinterpret_cast<StartThreadState*>(arg);
- state->user_function(state->arg);
- delete state;
- return nullptr;
- }
- void PosixEnv::StartThread(void (*function)(void* arg), void* arg) {
- pthread_t t;
- StartThreadState* state = new StartThreadState;
- state->user_function = function;
- state->arg = arg;
- ThreadPoolImpl::PthreadCall(
- "start thread", pthread_create(&t, nullptr, &StartThreadWrapper, state));
- ThreadPoolImpl::PthreadCall("lock", pthread_mutex_lock(&mu_));
- threads_to_join_.push_back(t);
- ThreadPoolImpl::PthreadCall("unlock", pthread_mutex_unlock(&mu_));
- }
- void PosixEnv::WaitForJoin() {
- for (const auto tid : threads_to_join_) {
- pthread_join(tid, nullptr);
- }
- threads_to_join_.clear();
- }
- } // namespace
- std::string Env::GenerateUniqueId() {
- std::string uuid_file = "/proc/sys/kernel/random/uuid";
- Status s = FileExists(uuid_file);
- if (s.ok()) {
- std::string uuid;
- s = ReadFileToString(this, uuid_file, &uuid);
- if (s.ok()) {
- return uuid;
- }
- }
- // Could not read uuid_file - generate uuid using "nanos-random"
- Random64 r(time(nullptr));
- uint64_t random_uuid_portion =
- r.Uniform(std::numeric_limits<uint64_t>::max());
- uint64_t nanos_uuid_portion = NowNanos();
- char uuid2[200];
- snprintf(uuid2,
- 200,
- "%lx-%lx",
- (unsigned long)nanos_uuid_portion,
- (unsigned long)random_uuid_portion);
- return uuid2;
- }
- //
- // Default Posix Env
- //
- Env* Env::Default() {
- // The following function call initializes the singletons of ThreadLocalPtr
- // right before the static default_env. This guarantees default_env will
- // always being destructed before the ThreadLocalPtr singletons get
- // destructed as C++ guarantees that the destructions of static variables
- // is in the reverse order of their constructions.
- //
- // Since static members are destructed in the reverse order
- // of their construction, having this call here guarantees that
- // the destructor of static PosixEnv will go first, then the
- // the singletons of ThreadLocalPtr.
- ThreadLocalPtr::InitSingletons();
- CompressionContextCache::InitSingleton();
- INIT_SYNC_POINT_SINGLETONS();
- static PosixEnv default_env;
- static CompositeEnvWrapper composite_env(&default_env,
- FileSystem::Default().get());
- return &composite_env;
- }
- } // namespace ROCKSDB_NAMESPACE
|