env_posix.cc 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors
  9. #include <dirent.h>
  10. #ifndef ROCKSDB_NO_DYNAMIC_EXTENSION
  11. #include <dlfcn.h>
  12. #endif
  13. #include <errno.h>
  14. #include <fcntl.h>
  15. #if defined(OS_LINUX)
  16. #include <linux/fs.h>
  17. #endif
  18. #if defined(ROCKSDB_IOURING_PRESENT)
  19. #include <liburing.h>
  20. #endif
  21. #include <pthread.h>
  22. #include <signal.h>
  23. #include <stdio.h>
  24. #include <stdlib.h>
  25. #include <string.h>
  26. #include <sys/ioctl.h>
  27. #include <sys/mman.h>
  28. #include <sys/stat.h>
  29. #if defined(OS_LINUX) || defined(OS_SOLARIS) || defined(OS_ANDROID)
  30. #include <sys/statfs.h>
  31. #include <sys/syscall.h>
  32. #include <sys/sysmacros.h>
  33. #endif
  34. #include <sys/statvfs.h>
  35. #include <sys/time.h>
  36. #include <sys/types.h>
  37. #if defined(ROCKSDB_IOURING_PRESENT)
  38. #include <sys/uio.h>
  39. #endif
  40. #include <time.h>
  41. #include <algorithm>
  42. // Get nano time includes
  43. #if defined(OS_LINUX) || defined(OS_FREEBSD)
  44. #elif defined(__MACH__)
  45. #include <Availability.h>
  46. #include <mach/clock.h>
  47. #include <mach/mach.h>
  48. #else
  49. #include <chrono>
  50. #endif
  51. #include <deque>
  52. #include <set>
  53. #include <vector>
  54. #include "env/composite_env_wrapper.h"
  55. #include "env/io_posix.h"
  56. #include "logging/logging.h"
  57. #include "logging/posix_logger.h"
  58. #include "monitoring/iostats_context_imp.h"
  59. #include "monitoring/thread_status_updater.h"
  60. #include "port/port.h"
  61. #include "rocksdb/options.h"
  62. #include "rocksdb/slice.h"
  63. #include "test_util/sync_point.h"
  64. #include "util/coding.h"
  65. #include "util/compression_context_cache.h"
  66. #include "util/random.h"
  67. #include "util/string_util.h"
  68. #include "util/thread_local.h"
  69. #include "util/threadpool_imp.h"
  70. #if !defined(TMPFS_MAGIC)
  71. #define TMPFS_MAGIC 0x01021994
  72. #endif
  73. #if !defined(XFS_SUPER_MAGIC)
  74. #define XFS_SUPER_MAGIC 0x58465342
  75. #endif
  76. #if !defined(EXT4_SUPER_MAGIC)
  77. #define EXT4_SUPER_MAGIC 0xEF53
  78. #endif
  79. namespace ROCKSDB_NAMESPACE {
  // Platform-dependent constants used when locating shared libraries:
  //   kSharedLibExt  - file-name extension of a shared library
  //   kPathSeparator - character separating entries of a library search path
  #if defined(OS_WIN)
  static const char kPathSeparator = ';';
  static const std::string kSharedLibExt = ".dll";
  #elif defined(OS_MACOSX)
  static const char kPathSeparator = ':';
  static const std::string kSharedLibExt = ".dylib";
  #else
  static const char kPathSeparator = ':';
  static const std::string kSharedLibExt = ".so";
  #endif
  91. namespace {
  92. ThreadStatusUpdater* CreateThreadStatusUpdater() {
  93. return new ThreadStatusUpdater();
  94. }
  95. #ifndef ROCKSDB_NO_DYNAMIC_EXTENSION
  96. class PosixDynamicLibrary : public DynamicLibrary {
  97. public:
  98. PosixDynamicLibrary(const std::string& name, void* handle)
  99. : name_(name), handle_(handle) {}
  100. ~PosixDynamicLibrary() override { dlclose(handle_); }
  101. Status LoadSymbol(const std::string& sym_name, void** func) override {
  102. assert(nullptr != func);
  103. dlerror(); // Clear any old error
  104. *func = dlsym(handle_, sym_name.c_str());
  105. if (*func != nullptr) {
  106. return Status::OK();
  107. } else {
  108. char* err = dlerror();
  109. return Status::NotFound("Error finding symbol: " + sym_name, err);
  110. }
  111. }
  112. const char* Name() const override { return name_.c_str(); }
  113. private:
  114. std::string name_;
  115. void* handle_;
  116. };
  117. #endif // !ROCKSDB_NO_DYNAMIC_EXTENSION
  118. class PosixEnv : public CompositeEnvWrapper {
  119. public:
  120. PosixEnv();
  121. ~PosixEnv() override {
  122. for (const auto tid : threads_to_join_) {
  123. pthread_join(tid, nullptr);
  124. }
  125. for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) {
  126. thread_pools_[pool_id].JoinAllThreads();
  127. }
  128. // Delete the thread_status_updater_ only when the current Env is not
  129. // Env::Default(). This is to avoid the free-after-use error when
  130. // Env::Default() is destructed while some other child threads are
  131. // still trying to update thread status.
  132. if (this != Env::Default()) {
  133. delete thread_status_updater_;
  134. }
  135. }
  136. void SetFD_CLOEXEC(int fd, const EnvOptions* options) {
  137. if ((options == nullptr || options->set_fd_cloexec) && fd > 0) {
  138. fcntl(fd, F_SETFD, fcntl(fd, F_GETFD) | FD_CLOEXEC);
  139. }
  140. }
  141. #ifndef ROCKSDB_NO_DYNAMIC_EXTENSION
  142. // Loads the named library into the result.
  143. // If the input name is empty, the current executable is loaded
  144. // On *nix systems, a "lib" prefix is added to the name if one is not supplied
  145. // Comparably, the appropriate shared library extension is added to the name
  146. // if not supplied. If search_path is not specified, the shared library will
  147. // be loaded using the default path (LD_LIBRARY_PATH) If search_path is
  148. // specified, the shared library will be searched for in the directories
  149. // provided by the search path
  150. Status LoadLibrary(const std::string& name, const std::string& path,
  151. std::shared_ptr<DynamicLibrary>* result) override {
  152. Status status;
  153. assert(result != nullptr);
  154. if (name.empty()) {
  155. void* hndl = dlopen(NULL, RTLD_NOW);
  156. if (hndl != nullptr) {
  157. result->reset(new PosixDynamicLibrary(name, hndl));
  158. return Status::OK();
  159. }
  160. } else {
  161. std::string library_name = name;
  162. if (library_name.find(kSharedLibExt) == std::string::npos) {
  163. library_name = library_name + kSharedLibExt;
  164. }
  165. #if !defined(OS_WIN)
  166. if (library_name.find('/') == std::string::npos &&
  167. library_name.compare(0, 3, "lib") != 0) {
  168. library_name = "lib" + library_name;
  169. }
  170. #endif
  171. if (path.empty()) {
  172. void* hndl = dlopen(library_name.c_str(), RTLD_NOW);
  173. if (hndl != nullptr) {
  174. result->reset(new PosixDynamicLibrary(library_name, hndl));
  175. return Status::OK();
  176. }
  177. } else {
  178. std::string local_path;
  179. std::stringstream ss(path);
  180. while (getline(ss, local_path, kPathSeparator)) {
  181. if (!path.empty()) {
  182. std::string full_name = local_path + "/" + library_name;
  183. void* hndl = dlopen(full_name.c_str(), RTLD_NOW);
  184. if (hndl != nullptr) {
  185. result->reset(new PosixDynamicLibrary(full_name, hndl));
  186. return Status::OK();
  187. }
  188. }
  189. }
  190. }
  191. }
  192. return Status::IOError(
  193. IOErrorMsg("Failed to open shared library: xs", name), dlerror());
  194. }
  195. #endif // !ROCKSDB_NO_DYNAMIC_EXTENSION
  196. void Schedule(void (*function)(void* arg1), void* arg, Priority pri = LOW,
  197. void* tag = nullptr,
  198. void (*unschedFunction)(void* arg) = nullptr) override;
  199. int UnSchedule(void* arg, Priority pri) override;
  200. void StartThread(void (*function)(void* arg), void* arg) override;
  201. void WaitForJoin() override;
  202. unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const override;
  203. Status GetTestDirectory(std::string* result) override {
  204. const char* env = getenv("TEST_TMPDIR");
  205. if (env && env[0] != '\0') {
  206. *result = env;
  207. } else {
  208. char buf[100];
  209. snprintf(buf, sizeof(buf), "/tmp/rocksdbtest-%d", int(geteuid()));
  210. *result = buf;
  211. }
  212. // Directory may already exist
  213. CreateDir(*result);
  214. return Status::OK();
  215. }
  216. Status GetThreadList(std::vector<ThreadStatus>* thread_list) override {
  217. assert(thread_status_updater_);
  218. return thread_status_updater_->GetThreadList(thread_list);
  219. }
  220. static uint64_t gettid(pthread_t tid) {
  221. uint64_t thread_id = 0;
  222. memcpy(&thread_id, &tid, std::min(sizeof(thread_id), sizeof(tid)));
  223. return thread_id;
  224. }
  225. static uint64_t gettid() {
  226. pthread_t tid = pthread_self();
  227. return gettid(tid);
  228. }
  229. uint64_t GetThreadID() const override { return gettid(pthread_self()); }
  230. Status NewLogger(const std::string& fname,
  231. std::shared_ptr<Logger>* result) override {
  232. FILE* f;
  233. {
  234. IOSTATS_TIMER_GUARD(open_nanos);
  235. f = fopen(fname.c_str(),
  236. "w"
  237. #ifdef __GLIBC_PREREQ
  238. #if __GLIBC_PREREQ(2, 7)
  239. "e" // glibc extension to enable O_CLOEXEC
  240. #endif
  241. #endif
  242. );
  243. }
  244. if (f == nullptr) {
  245. result->reset();
  246. return IOError("when fopen a file for new logger", fname, errno);
  247. } else {
  248. int fd = fileno(f);
  249. #ifdef ROCKSDB_FALLOCATE_PRESENT
  250. fallocate(fd, FALLOC_FL_KEEP_SIZE, 0, 4 * 1024);
  251. #endif
  252. SetFD_CLOEXEC(fd, nullptr);
  253. result->reset(new PosixLogger(f, &PosixEnv::gettid, this));
  254. return Status::OK();
  255. }
  256. }
  257. uint64_t NowMicros() override {
  258. struct timeval tv;
  259. gettimeofday(&tv, nullptr);
  260. return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
  261. }
  262. uint64_t NowNanos() override {
  263. #if defined(OS_LINUX) || defined(OS_FREEBSD) || defined(OS_AIX)
  264. struct timespec ts;
  265. clock_gettime(CLOCK_MONOTONIC, &ts);
  266. return static_cast<uint64_t>(ts.tv_sec) * 1000000000 + ts.tv_nsec;
  267. #elif defined(OS_SOLARIS)
  268. return gethrtime();
  269. #elif defined(__MACH__)
  270. clock_serv_t cclock;
  271. mach_timespec_t ts;
  272. host_get_clock_service(mach_host_self(), CALENDAR_CLOCK, &cclock);
  273. clock_get_time(cclock, &ts);
  274. mach_port_deallocate(mach_task_self(), cclock);
  275. return static_cast<uint64_t>(ts.tv_sec) * 1000000000 + ts.tv_nsec;
  276. #else
  277. return std::chrono::duration_cast<std::chrono::nanoseconds>(
  278. std::chrono::steady_clock::now().time_since_epoch()).count();
  279. #endif
  280. }
  281. uint64_t NowCPUNanos() override {
  282. #if defined(OS_LINUX) || defined(OS_FREEBSD) || defined(OS_AIX) || \
  283. (defined(__MACH__) && defined(__MAC_10_12))
  284. struct timespec ts;
  285. clock_gettime(CLOCK_THREAD_CPUTIME_ID, &ts);
  286. return static_cast<uint64_t>(ts.tv_sec) * 1000000000 + ts.tv_nsec;
  287. #endif
  288. return 0;
  289. }
  290. void SleepForMicroseconds(int micros) override { usleep(micros); }
  291. Status GetHostName(char* name, uint64_t len) override {
  292. int ret = gethostname(name, static_cast<size_t>(len));
  293. if (ret < 0) {
  294. if (errno == EFAULT || errno == EINVAL) {
  295. return Status::InvalidArgument(strerror(errno));
  296. } else {
  297. return IOError("GetHostName", name, errno);
  298. }
  299. }
  300. return Status::OK();
  301. }
  302. Status GetCurrentTime(int64_t* unix_time) override {
  303. time_t ret = time(nullptr);
  304. if (ret == (time_t) -1) {
  305. return IOError("GetCurrentTime", "", errno);
  306. }
  307. *unix_time = (int64_t) ret;
  308. return Status::OK();
  309. }
  310. ThreadStatusUpdater* GetThreadStatusUpdater() const override {
  311. return Env::GetThreadStatusUpdater();
  312. }
  313. std::string GenerateUniqueId() override { return Env::GenerateUniqueId(); }
  314. // Allow increasing the number of worker threads.
  315. void SetBackgroundThreads(int num, Priority pri) override {
  316. assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
  317. thread_pools_[pri].SetBackgroundThreads(num);
  318. }
  319. int GetBackgroundThreads(Priority pri) override {
  320. assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
  321. return thread_pools_[pri].GetBackgroundThreads();
  322. }
  323. Status SetAllowNonOwnerAccess(bool allow_non_owner_access) override {
  324. allow_non_owner_access_ = allow_non_owner_access;
  325. return Status::OK();
  326. }
  327. // Allow increasing the number of worker threads.
  328. void IncBackgroundThreadsIfNeeded(int num, Priority pri) override {
  329. assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
  330. thread_pools_[pri].IncBackgroundThreadsIfNeeded(num);
  331. }
  332. void LowerThreadPoolIOPriority(Priority pool = LOW) override {
  333. assert(pool >= Priority::BOTTOM && pool <= Priority::HIGH);
  334. #ifdef OS_LINUX
  335. thread_pools_[pool].LowerIOPriority();
  336. #else
  337. (void)pool;
  338. #endif
  339. }
  340. void LowerThreadPoolCPUPriority(Priority pool = LOW) override {
  341. assert(pool >= Priority::BOTTOM && pool <= Priority::HIGH);
  342. #ifdef OS_LINUX
  343. thread_pools_[pool].LowerCPUPriority();
  344. #else
  345. (void)pool;
  346. #endif
  347. }
  348. std::string TimeToString(uint64_t secondsSince1970) override {
  349. const time_t seconds = (time_t)secondsSince1970;
  350. struct tm t;
  351. int maxsize = 64;
  352. std::string dummy;
  353. dummy.reserve(maxsize);
  354. dummy.resize(maxsize);
  355. char* p = &dummy[0];
  356. localtime_r(&seconds, &t);
  357. snprintf(p, maxsize,
  358. "%04d/%02d/%02d-%02d:%02d:%02d ",
  359. t.tm_year + 1900,
  360. t.tm_mon + 1,
  361. t.tm_mday,
  362. t.tm_hour,
  363. t.tm_min,
  364. t.tm_sec);
  365. return dummy;
  366. }
  367. private:
  368. std::vector<ThreadPoolImpl> thread_pools_;
  369. pthread_mutex_t mu_;
  370. std::vector<pthread_t> threads_to_join_;
  371. // If true, allow non owner read access for db files. Otherwise, non-owner
  372. // has no access to db files.
  373. bool allow_non_owner_access_;
  374. };
  375. PosixEnv::PosixEnv()
  376. : CompositeEnvWrapper(this, FileSystem::Default().get()),
  377. thread_pools_(Priority::TOTAL),
  378. allow_non_owner_access_(true) {
  379. ThreadPoolImpl::PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr));
  380. for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) {
  381. thread_pools_[pool_id].SetThreadPriority(
  382. static_cast<Env::Priority>(pool_id));
  383. // This allows later initializing the thread-local-env of each thread.
  384. thread_pools_[pool_id].SetHostEnv(this);
  385. }
  386. thread_status_updater_ = CreateThreadStatusUpdater();
  387. }
  388. void PosixEnv::Schedule(void (*function)(void* arg1), void* arg, Priority pri,
  389. void* tag, void (*unschedFunction)(void* arg)) {
  390. assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
  391. thread_pools_[pri].Schedule(function, arg, tag, unschedFunction);
  392. }
  393. int PosixEnv::UnSchedule(void* arg, Priority pri) {
  394. return thread_pools_[pri].UnSchedule(arg);
  395. }
  396. unsigned int PosixEnv::GetThreadPoolQueueLen(Priority pri) const {
  397. assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
  398. return thread_pools_[pri].GetQueueLen();
  399. }
  // Heap-allocated bundle passed to a new thread: the function to run and the
  // argument to run it with.
  struct StartThreadState {
    void (*user_function)(void*);
    void* arg;
  };

  // pthread entry point. Interprets `arg` as a StartThreadState*, invokes the
  // stored function with the stored argument, frees the state, and returns
  // nullptr.
  static void* StartThreadWrapper(void* arg) {
    StartThreadState* const thread_state = static_cast<StartThreadState*>(arg);
    void (*const entry_point)(void*) = thread_state->user_function;
    entry_point(thread_state->arg);
    // The state is released only after the user function has returned.
    delete thread_state;
    return nullptr;
  }
  410. void PosixEnv::StartThread(void (*function)(void* arg), void* arg) {
  411. pthread_t t;
  412. StartThreadState* state = new StartThreadState;
  413. state->user_function = function;
  414. state->arg = arg;
  415. ThreadPoolImpl::PthreadCall(
  416. "start thread", pthread_create(&t, nullptr, &StartThreadWrapper, state));
  417. ThreadPoolImpl::PthreadCall("lock", pthread_mutex_lock(&mu_));
  418. threads_to_join_.push_back(t);
  419. ThreadPoolImpl::PthreadCall("unlock", pthread_mutex_unlock(&mu_));
  420. }
  421. void PosixEnv::WaitForJoin() {
  422. for (const auto tid : threads_to_join_) {
  423. pthread_join(tid, nullptr);
  424. }
  425. threads_to_join_.clear();
  426. }
  427. } // namespace
  428. std::string Env::GenerateUniqueId() {
  429. std::string uuid_file = "/proc/sys/kernel/random/uuid";
  430. Status s = FileExists(uuid_file);
  431. if (s.ok()) {
  432. std::string uuid;
  433. s = ReadFileToString(this, uuid_file, &uuid);
  434. if (s.ok()) {
  435. return uuid;
  436. }
  437. }
  438. // Could not read uuid_file - generate uuid using "nanos-random"
  439. Random64 r(time(nullptr));
  440. uint64_t random_uuid_portion =
  441. r.Uniform(std::numeric_limits<uint64_t>::max());
  442. uint64_t nanos_uuid_portion = NowNanos();
  443. char uuid2[200];
  444. snprintf(uuid2,
  445. 200,
  446. "%lx-%lx",
  447. (unsigned long)nanos_uuid_portion,
  448. (unsigned long)random_uuid_portion);
  449. return uuid2;
  450. }
  451. //
  452. // Default Posix Env
  453. //
  454. Env* Env::Default() {
  455. // The following function call initializes the singletons of ThreadLocalPtr
  456. // right before the static default_env. This guarantees default_env will
  457. // always being destructed before the ThreadLocalPtr singletons get
  458. // destructed as C++ guarantees that the destructions of static variables
  459. // is in the reverse order of their constructions.
  460. //
  461. // Since static members are destructed in the reverse order
  462. // of their construction, having this call here guarantees that
  463. // the destructor of static PosixEnv will go first, then the
  464. // the singletons of ThreadLocalPtr.
  465. ThreadLocalPtr::InitSingletons();
  466. CompressionContextCache::InitSingleton();
  467. INIT_SYNC_POINT_SINGLETONS();
  468. static PosixEnv default_env;
  469. static CompositeEnvWrapper composite_env(&default_env,
  470. FileSystem::Default().get());
  471. return &composite_env;
  472. }
  473. } // namespace ROCKSDB_NAMESPACE