env_posix.cc 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors
  9. #include "port/lang.h"
  10. #if !defined(OS_WIN)
  11. #include <dirent.h>
  12. #ifndef ROCKSDB_NO_DYNAMIC_EXTENSION
  13. #include <dlfcn.h>
  14. #endif
  15. #include <fcntl.h>
  16. #include <cerrno>
  17. #if defined(ROCKSDB_IOURING_PRESENT)
  18. #include <liburing.h>
  19. #endif
  20. #include <pthread.h>
  21. #include <sys/mman.h>
  22. #include <sys/stat.h>
  23. #include <csignal>
  24. #include <cstdio>
  25. #include <cstdlib>
  26. #include <cstring>
  27. #if defined(OS_LINUX) || defined(OS_SOLARIS) || defined(OS_ANDROID)
  28. #include <sys/statfs.h>
  29. #endif
  30. #include <sys/statvfs.h>
  31. #include <sys/time.h>
  32. #include <sys/types.h>
  33. #if defined(ROCKSDB_IOURING_PRESENT)
  34. #include <sys/uio.h>
  35. #endif
  36. #include <unistd.h>
  37. #include <algorithm>
  38. #include <ctime>
  39. // Get nano time includes
  40. #if defined(OS_LINUX) || defined(OS_FREEBSD) || defined(OS_GNU_KFREEBSD)
  41. #elif defined(__MACH__)
  42. #include <Availability.h>
  43. #include <mach/clock.h>
  44. #include <mach/mach.h>
  45. #else
  46. #include <chrono>
  47. #endif
  48. #include <deque>
  49. #include <set>
  50. #include <vector>
  51. #include "env/composite_env_wrapper.h"
  52. #include "env/io_posix.h"
  53. #include "monitoring/iostats_context_imp.h"
  54. #include "monitoring/thread_status_updater.h"
  55. #include "port/port.h"
  56. #include "port/sys_time.h"
  57. #include "rocksdb/env.h"
  58. #include "rocksdb/options.h"
  59. #include "rocksdb/slice.h"
  60. #include "rocksdb/system_clock.h"
  61. #include "test_util/sync_point.h"
  62. #include "util/coding.h"
  63. #include "util/compression_context_cache.h"
  64. #include "util/random.h"
  65. #include "util/string_util.h"
  66. #include "util/thread_local.h"
  67. #include "util/threadpool_imp.h"
  // Fallback values for the TMPFS / XFS / EXT4 magic constants. Each one is
  // defined here only when nothing included earlier has already defined it.
#ifndef TMPFS_MAGIC
#define TMPFS_MAGIC 0x01021994
#endif  // TMPFS_MAGIC

#ifndef XFS_SUPER_MAGIC
#define XFS_SUPER_MAGIC 0x58465342
#endif  // XFS_SUPER_MAGIC

#ifndef EXT4_SUPER_MAGIC
#define EXT4_SUPER_MAGIC 0xEF53
#endif  // EXT4_SUPER_MAGIC
  77. namespace ROCKSDB_NAMESPACE {
  // Platform constants used when loading shared libraries:
  //   kPathSeparator - delimiter between directories in a library search path.
  //   kSharedLibExt  - extension appended to a library name that lacks one.
#ifdef OS_WIN
  [[maybe_unused]] static const char kPathSeparator = ';';
  static const std::string kSharedLibExt = ".dll";
#else  // !OS_WIN
  [[maybe_unused]] static const char kPathSeparator = ':';
#ifdef OS_MACOSX
  static const std::string kSharedLibExt = ".dylib";
#else   // !OS_MACOSX
  static const std::string kSharedLibExt = ".so";
#endif  // OS_MACOSX
#endif  // OS_WIN
  89. namespace {
  90. ThreadStatusUpdater* CreateThreadStatusUpdater() {
  91. return new ThreadStatusUpdater();
  92. }
  93. #ifndef ROCKSDB_NO_DYNAMIC_EXTENSION
  94. class PosixDynamicLibrary : public DynamicLibrary {
  95. public:
  96. PosixDynamicLibrary(const std::string& name, void* handle)
  97. : name_(name), handle_(handle) {}
  98. ~PosixDynamicLibrary() override { dlclose(handle_); }
  99. Status LoadSymbol(const std::string& sym_name, void** func) override {
  100. assert(nullptr != func);
  101. dlerror(); // Clear any old error
  102. *func = dlsym(handle_, sym_name.c_str());
  103. if (*func != nullptr) {
  104. return Status::OK();
  105. } else {
  106. char* err = dlerror();
  107. return Status::NotFound("Error finding symbol: " + sym_name, err);
  108. }
  109. }
  110. const char* Name() const override { return name_.c_str(); }
  111. private:
  112. std::string name_;
  113. void* handle_;
  114. };
  115. #endif // !ROCKSDB_NO_DYNAMIC_EXTENSION
  116. class PosixClock : public SystemClock {
  117. public:
  118. static const char* kClassName() { return "PosixClock"; }
  119. const char* Name() const override { return kDefaultName(); }
  120. const char* NickName() const override { return kClassName(); }
  121. uint64_t NowMicros() override {
  122. port::TimeVal tv;
  123. port::GetTimeOfDay(&tv, nullptr);
  124. return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
  125. }
  126. uint64_t NowNanos() override {
  127. #if defined(OS_LINUX) || defined(OS_FREEBSD) || defined(OS_GNU_KFREEBSD) || \
  128. defined(OS_AIX)
  129. struct timespec ts;
  130. clock_gettime(CLOCK_MONOTONIC, &ts);
  131. return static_cast<uint64_t>(ts.tv_sec) * 1000000000 + ts.tv_nsec;
  132. #elif defined(OS_SOLARIS)
  133. return gethrtime();
  134. #elif defined(__MACH__)
  135. clock_serv_t cclock;
  136. mach_timespec_t ts;
  137. host_get_clock_service(mach_host_self(), CALENDAR_CLOCK, &cclock);
  138. clock_get_time(cclock, &ts);
  139. mach_port_deallocate(mach_task_self(), cclock);
  140. return static_cast<uint64_t>(ts.tv_sec) * 1000000000 + ts.tv_nsec;
  141. #else
  142. return std::chrono::duration_cast<std::chrono::nanoseconds>(
  143. std::chrono::steady_clock::now().time_since_epoch())
  144. .count();
  145. #endif
  146. }
  147. uint64_t CPUMicros() override {
  148. #if defined(OS_LINUX) || defined(OS_FREEBSD) || defined(OS_GNU_KFREEBSD) || \
  149. defined(OS_AIX) || (defined(__MACH__) && defined(__MAC_10_12))
  150. struct timespec ts;
  151. clock_gettime(CLOCK_THREAD_CPUTIME_ID, &ts);
  152. return (static_cast<uint64_t>(ts.tv_sec) * 1000000000 + ts.tv_nsec) / 1000;
  153. #else
  154. return 0;
  155. #endif
  156. }
  157. uint64_t CPUNanos() override {
  158. #if defined(OS_LINUX) || defined(OS_FREEBSD) || defined(OS_GNU_KFREEBSD) || \
  159. defined(OS_AIX) || (defined(__MACH__) && defined(__MAC_10_12))
  160. struct timespec ts;
  161. clock_gettime(CLOCK_THREAD_CPUTIME_ID, &ts);
  162. return static_cast<uint64_t>(ts.tv_sec) * 1000000000 + ts.tv_nsec;
  163. #else
  164. return 0;
  165. #endif
  166. }
  167. void SleepForMicroseconds(int micros) override { usleep(micros); }
  168. Status GetCurrentTime(int64_t* unix_time) override {
  169. time_t ret = time(nullptr);
  170. if (ret == (time_t)-1) {
  171. return IOError("GetCurrentTime", "", errno);
  172. }
  173. *unix_time = (int64_t)ret;
  174. return Status::OK();
  175. }
  176. std::string TimeToString(uint64_t secondsSince1970) override {
  177. const time_t seconds = (time_t)secondsSince1970;
  178. struct tm t;
  179. int maxsize = 64;
  180. std::string dummy;
  181. dummy.reserve(maxsize);
  182. dummy.resize(maxsize);
  183. char* p = dummy.data();
  184. port::LocalTimeR(&seconds, &t);
  185. snprintf(p, maxsize, "%04d/%02d/%02d-%02d:%02d:%02d ", t.tm_year + 1900,
  186. t.tm_mon + 1, t.tm_mday, t.tm_hour, t.tm_min, t.tm_sec);
  187. return dummy;
  188. }
  189. };
  190. class PosixEnv : public CompositeEnv {
  191. public:
  192. static const char* kClassName() { return "PosixEnv"; }
  193. const char* Name() const override { return kClassName(); }
  194. const char* NickName() const override { return kDefaultName(); }
  195. struct JoinThreadsOnExit {
  196. explicit JoinThreadsOnExit(PosixEnv& _deflt) : deflt(_deflt) {}
  197. ~JoinThreadsOnExit() {
  198. for (const auto tid : deflt.threads_to_join_) {
  199. pthread_join(tid, nullptr);
  200. }
  201. for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) {
  202. deflt.thread_pools_[pool_id].JoinAllThreads();
  203. }
  204. // Do not delete the thread_status_updater_ in order to avoid the
  205. // free after use when Env::Default() is destructed while some other
  206. // child threads are still trying to update thread status. All
  207. // PosixEnv instances use the same thread_status_updater_, so never
  208. // explicitly delete it.
  209. }
  210. PosixEnv& deflt;
  211. };
  212. void SetFD_CLOEXEC(int fd, const EnvOptions* options) {
  213. if ((options == nullptr || options->set_fd_cloexec) && fd > 0) {
  214. fcntl(fd, F_SETFD, fcntl(fd, F_GETFD) | FD_CLOEXEC);
  215. }
  216. }
  217. #ifndef ROCKSDB_NO_DYNAMIC_EXTENSION
  218. // Loads the named library into the result.
  219. // If the input name is empty, the current executable is loaded
  220. // On *nix systems, a "lib" prefix is added to the name if one is not supplied
  221. // Comparably, the appropriate shared library extension is added to the name
  222. // if not supplied. If search_path is not specified, the shared library will
  223. // be loaded using the default path (LD_LIBRARY_PATH) If search_path is
  224. // specified, the shared library will be searched for in the directories
  225. // provided by the search path
  226. Status LoadLibrary(const std::string& name, const std::string& path,
  227. std::shared_ptr<DynamicLibrary>* result) override {
  228. assert(result != nullptr);
  229. if (name.empty()) {
  230. void* hndl = dlopen(NULL, RTLD_NOW);
  231. if (hndl != nullptr) {
  232. result->reset(new PosixDynamicLibrary(name, hndl));
  233. return Status::OK();
  234. }
  235. } else {
  236. std::string library_name = name;
  237. if (library_name.find(kSharedLibExt) == std::string::npos) {
  238. library_name = library_name + kSharedLibExt;
  239. }
  240. #if !defined(OS_WIN)
  241. if (library_name.find('/') == std::string::npos &&
  242. library_name.compare(0, 3, "lib") != 0) {
  243. library_name = "lib" + library_name;
  244. }
  245. #endif
  246. if (path.empty()) {
  247. void* hndl = dlopen(library_name.c_str(), RTLD_NOW);
  248. if (hndl != nullptr) {
  249. result->reset(new PosixDynamicLibrary(library_name, hndl));
  250. return Status::OK();
  251. }
  252. } else {
  253. std::string local_path;
  254. std::stringstream ss(path);
  255. while (getline(ss, local_path, kPathSeparator)) {
  256. if (!path.empty()) {
  257. std::string full_name = local_path + "/" + library_name;
  258. void* hndl = dlopen(full_name.c_str(), RTLD_NOW);
  259. if (hndl != nullptr) {
  260. result->reset(new PosixDynamicLibrary(full_name, hndl));
  261. return Status::OK();
  262. }
  263. }
  264. }
  265. }
  266. }
  267. return Status::IOError(
  268. IOErrorMsg("Failed to open shared library: xs", name), dlerror());
  269. }
  270. #endif // !ROCKSDB_NO_DYNAMIC_EXTENSION
  271. void Schedule(void (*function)(void* arg1), void* arg, Priority pri = LOW,
  272. void* tag = nullptr,
  273. void (*unschedFunction)(void* arg) = nullptr) override;
  274. int UnSchedule(void* arg, Priority pri) override;
  275. void StartThread(void (*function)(void* arg), void* arg) override;
  276. void WaitForJoin() override;
  277. unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const override;
  278. int ReserveThreads(int threads_to_be_reserved, Priority pri) override;
  279. int ReleaseThreads(int threads_to_be_released, Priority pri) override;
  280. Status GetThreadList(std::vector<ThreadStatus>* thread_list) override {
  281. assert(thread_status_updater_);
  282. return thread_status_updater_->GetThreadList(thread_list);
  283. }
  284. uint64_t GetThreadID() const override {
  285. uint64_t thread_id = 0;
  286. #if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ)
  287. #if __GLIBC_PREREQ(2, 30)
  288. thread_id = ::gettid();
  289. #else // __GLIBC_PREREQ(2, 30)
  290. pthread_t tid = pthread_self();
  291. memcpy(&thread_id, &tid, std::min(sizeof(thread_id), sizeof(tid)));
  292. #endif // __GLIBC_PREREQ(2, 30)
  293. #else // defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ)
  294. pthread_t tid = pthread_self();
  295. memcpy(&thread_id, &tid, std::min(sizeof(thread_id), sizeof(tid)));
  296. #endif // defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ)
  297. return thread_id;
  298. }
  299. Status GetHostName(char* name, uint64_t len) override {
  300. const size_t max_len = static_cast<size_t>(len);
  301. int ret = gethostname(name, max_len);
  302. if (ret < 0) {
  303. if (errno == EFAULT || errno == EINVAL) {
  304. return Status::InvalidArgument(errnoStr(errno).c_str());
  305. } else if (errno == ENAMETOOLONG) {
  306. return IOError("GetHostName", std::string(name, strnlen(name, max_len)),
  307. errno);
  308. } else {
  309. return IOError("GetHostName", "", errno);
  310. }
  311. }
  312. return Status::OK();
  313. }
  314. ThreadStatusUpdater* GetThreadStatusUpdater() const override {
  315. return Env::GetThreadStatusUpdater();
  316. }
  317. std::string GenerateUniqueId() override { return Env::GenerateUniqueId(); }
  318. // Allow increasing the number of worker threads.
  319. void SetBackgroundThreads(int num, Priority pri) override {
  320. assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
  321. thread_pools_[pri].SetBackgroundThreads(num);
  322. }
  323. int GetBackgroundThreads(Priority pri) override {
  324. assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
  325. return thread_pools_[pri].GetBackgroundThreads();
  326. }
  327. Status SetAllowNonOwnerAccess(bool allow_non_owner_access) override {
  328. allow_non_owner_access_ = allow_non_owner_access;
  329. return Status::OK();
  330. }
  331. // Allow increasing the number of worker threads.
  332. void IncBackgroundThreadsIfNeeded(int num, Priority pri) override {
  333. assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
  334. thread_pools_[pri].IncBackgroundThreadsIfNeeded(num);
  335. }
  336. void LowerThreadPoolIOPriority(Priority pool) override {
  337. assert(pool >= Priority::BOTTOM && pool <= Priority::HIGH);
  338. #ifdef OS_LINUX
  339. thread_pools_[pool].LowerIOPriority();
  340. #else
  341. (void)pool;
  342. #endif
  343. }
  344. void LowerThreadPoolCPUPriority(Priority pool) override {
  345. assert(pool >= Priority::BOTTOM && pool <= Priority::HIGH);
  346. thread_pools_[pool].LowerCPUPriority(CpuPriority::kLow);
  347. }
  348. Status LowerThreadPoolCPUPriority(Priority pool, CpuPriority pri) override {
  349. assert(pool >= Priority::BOTTOM && pool <= Priority::HIGH);
  350. thread_pools_[pool].LowerCPUPriority(pri);
  351. return Status::OK();
  352. }
  353. private:
  354. friend Env* Env::Default();
  355. // Constructs the default Env, a singleton
  356. PosixEnv();
  357. // The below 4 members are only used by the default PosixEnv instance.
  358. // Non-default instances simply maintain references to the backing
  359. // members in te default instance
  360. std::vector<ThreadPoolImpl> thread_pools_storage_;
  361. pthread_mutex_t mu_storage_;
  362. std::vector<pthread_t> threads_to_join_storage_;
  363. bool allow_non_owner_access_storage_;
  364. std::vector<ThreadPoolImpl>& thread_pools_;
  365. pthread_mutex_t& mu_;
  366. std::vector<pthread_t>& threads_to_join_;
  367. // If true, allow non owner read access for db files. Otherwise, non-owner
  368. // has no access to db files.
  369. bool& allow_non_owner_access_;
  370. };
  371. PosixEnv::PosixEnv()
  372. : CompositeEnv(FileSystem::Default(), SystemClock::Default()),
  373. thread_pools_storage_(Priority::TOTAL),
  374. allow_non_owner_access_storage_(true),
  375. thread_pools_(thread_pools_storage_),
  376. mu_(mu_storage_),
  377. threads_to_join_(threads_to_join_storage_),
  378. allow_non_owner_access_(allow_non_owner_access_storage_) {
  379. ThreadPoolImpl::PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr));
  380. for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) {
  381. thread_pools_[pool_id].SetThreadPriority(
  382. static_cast<Env::Priority>(pool_id));
  383. // This allows later initializing the thread-local-env of each thread.
  384. thread_pools_[pool_id].SetHostEnv(this);
  385. }
  386. thread_status_updater_ = CreateThreadStatusUpdater();
  387. }
  388. void PosixEnv::Schedule(void (*function)(void* arg1), void* arg, Priority pri,
  389. void* tag, void (*unschedFunction)(void* arg)) {
  390. assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
  391. thread_pools_[pri].Schedule(function, arg, tag, unschedFunction);
  392. }
  393. int PosixEnv::UnSchedule(void* arg, Priority pri) {
  394. return thread_pools_[pri].UnSchedule(arg);
  395. }
  396. unsigned int PosixEnv::GetThreadPoolQueueLen(Priority pri) const {
  397. assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
  398. return thread_pools_[pri].GetQueueLen();
  399. }
  400. int PosixEnv::ReserveThreads(int threads_to_reserved, Priority pri) {
  401. assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
  402. return thread_pools_[pri].ReserveThreads(threads_to_reserved);
  403. }
  404. int PosixEnv::ReleaseThreads(int threads_to_released, Priority pri) {
  405. assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
  406. return thread_pools_[pri].ReleaseThreads(threads_to_released);
  407. }
  // Heap-allocated payload passed through pthread_create(): the user's entry
  // point together with the argument to call it with.
  struct StartThreadState {
    void (*user_function)(void*);
    void* arg;
  };

  // pthread entry point. Interprets `arg` as a StartThreadState, runs the
  // stored function with its stored argument, frees the payload, and returns
  // null.
  static void* StartThreadWrapper(void* arg) {
    auto* const payload = static_cast<StartThreadState*>(arg);
    (*payload->user_function)(payload->arg);
    delete payload;
    return nullptr;
  }
  418. void PosixEnv::StartThread(void (*function)(void* arg), void* arg) {
  419. pthread_t t;
  420. StartThreadState* state = new StartThreadState;
  421. state->user_function = function;
  422. state->arg = arg;
  423. ThreadPoolImpl::PthreadCall(
  424. "start thread", pthread_create(&t, nullptr, &StartThreadWrapper, state));
  425. ThreadPoolImpl::PthreadCall("lock", pthread_mutex_lock(&mu_));
  426. threads_to_join_.push_back(t);
  427. ThreadPoolImpl::PthreadCall("unlock", pthread_mutex_unlock(&mu_));
  428. }
  429. void PosixEnv::WaitForJoin() {
  430. for (const auto tid : threads_to_join_) {
  431. pthread_join(tid, nullptr);
  432. }
  433. threads_to_join_.clear();
  434. }
  435. } // namespace
  436. //
  437. // Default Posix Env
  438. //
  439. Env* Env::Default() {
  440. // The following function call initializes the singletons of ThreadLocalPtr
  441. // right before the static default_env. This guarantees default_env will
  442. // always being destructed before the ThreadLocalPtr singletons get
  443. // destructed as C++ guarantees that the destructions of static variables
  444. // is in the reverse order of their constructions.
  445. //
  446. // Since static members are destructed in the reverse order
  447. // of their construction, having this call here guarantees that
  448. // the destructor of static PosixEnv will go first, then the
  449. // the singletons of ThreadLocalPtr.
  450. ThreadLocalPtr::InitSingletons();
  451. CompressionContextCache::InitSingleton();
  452. INIT_SYNC_POINT_SINGLETONS();
  453. // Avoid problems with accessing most members of Env::Default() during
  454. // static destruction.
  455. STATIC_AVOID_DESTRUCTION(PosixEnv, default_env);
  456. // This destructor must be called on exit
  457. static PosixEnv::JoinThreadsOnExit thread_joiner(default_env);
  458. return &default_env;
  459. }
  460. //
  461. // Default Posix SystemClock
  462. //
  463. const std::shared_ptr<SystemClock>& SystemClock::Default() {
  464. STATIC_AVOID_DESTRUCTION(std::shared_ptr<SystemClock>, instance)
  465. (std::make_shared<PosixClock>());
  466. return instance;
  467. }
  468. } // namespace ROCKSDB_NAMESPACE
  469. #endif