// env_hdfs.h 12 KB
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. #pragma once
  7. #include <algorithm>
  8. #include <stdio.h>
  9. #include <time.h>
  10. #include <iostream>
  11. #include "port/sys_time.h"
  12. #include "rocksdb/env.h"
  13. #include "rocksdb/status.h"
  14. #ifdef USE_HDFS
  15. #include <hdfs.h>
  16. namespace ROCKSDB_NAMESPACE {
  // Exception type for problems with caller-supplied arguments that are
  // detected at run time. It adds nothing to std::exception: there is no
  // message member, so what() is whatever the standard base class provides.
  class HdfsUsageException : public std::exception {
  };
  // A simple exception that indicates something went wrong that is not
  // recoverable. The intention is for the message to be printed (with
  // nothing else) and the process to terminate.
  //
  // The message passed to the constructor is copied into the exception and
  // returned verbatim by what().
  class HdfsFatalException : public std::exception {
   public:
    // s: human-readable description of the failure; copied, so the caller's
    // string does not need to outlive the exception.
    explicit HdfsFatalException(const std::string& s) : what_(s) {}

    ~HdfsFatalException() override = default;

    // Returns the stored message as a NUL-terminated C string. The pointer
    // refers to storage owned by this exception object.
    const char* what() const noexcept override { return what_.c_str(); }

   private:
    const std::string what_;  // message supplied at construction
  };
  33. //
  34. // The HDFS environment for rocksdb. This class overrides all the
  35. // file/dir access methods and delegates the thread-mgmt methods to the
  36. // default posix environment.
  37. //
  38. class HdfsEnv : public Env {
  39. public:
  40. explicit HdfsEnv(const std::string& fsname) : fsname_(fsname) {
  41. posixEnv = Env::Default();
  42. fileSys_ = connectToPath(fsname_);
  43. }
  44. virtual ~HdfsEnv() {
  45. fprintf(stderr, "Destroying HdfsEnv::Default()\n");
  46. hdfsDisconnect(fileSys_);
  47. }
  48. Status NewSequentialFile(const std::string& fname,
  49. std::unique_ptr<SequentialFile>* result,
  50. const EnvOptions& options) override;
  51. Status NewRandomAccessFile(const std::string& fname,
  52. std::unique_ptr<RandomAccessFile>* result,
  53. const EnvOptions& options) override;
  54. Status NewWritableFile(const std::string& fname,
  55. std::unique_ptr<WritableFile>* result,
  56. const EnvOptions& options) override;
  57. Status NewDirectory(const std::string& name,
  58. std::unique_ptr<Directory>* result) override;
  59. Status FileExists(const std::string& fname) override;
  60. Status GetChildren(const std::string& path,
  61. std::vector<std::string>* result) override;
  62. Status DeleteFile(const std::string& fname) override;
  63. Status CreateDir(const std::string& name) override;
  64. Status CreateDirIfMissing(const std::string& name) override;
  65. Status DeleteDir(const std::string& name) override;
  66. Status GetFileSize(const std::string& fname, uint64_t* size) override;
  67. Status GetFileModificationTime(const std::string& fname,
  68. uint64_t* file_mtime) override;
  69. Status RenameFile(const std::string& src, const std::string& target) override;
  70. Status LinkFile(const std::string& /*src*/,
  71. const std::string& /*target*/) override {
  72. return Status::NotSupported(); // not supported
  73. }
  74. Status LockFile(const std::string& fname, FileLock** lock) override;
  75. Status UnlockFile(FileLock* lock) override;
  76. Status NewLogger(const std::string& fname,
  77. std::shared_ptr<Logger>* result) override;
  78. void Schedule(void (*function)(void* arg), void* arg, Priority pri = LOW,
  79. void* tag = nullptr,
  80. void (*unschedFunction)(void* arg) = 0) override {
  81. posixEnv->Schedule(function, arg, pri, tag, unschedFunction);
  82. }
  83. int UnSchedule(void* tag, Priority pri) override {
  84. return posixEnv->UnSchedule(tag, pri);
  85. }
  86. void StartThread(void (*function)(void* arg), void* arg) override {
  87. posixEnv->StartThread(function, arg);
  88. }
  89. void WaitForJoin() override { posixEnv->WaitForJoin(); }
  90. unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const override {
  91. return posixEnv->GetThreadPoolQueueLen(pri);
  92. }
  93. Status GetTestDirectory(std::string* path) override {
  94. return posixEnv->GetTestDirectory(path);
  95. }
  96. uint64_t NowMicros() override { return posixEnv->NowMicros(); }
  97. void SleepForMicroseconds(int micros) override {
  98. posixEnv->SleepForMicroseconds(micros);
  99. }
  100. Status GetHostName(char* name, uint64_t len) override {
  101. return posixEnv->GetHostName(name, len);
  102. }
  103. Status GetCurrentTime(int64_t* unix_time) override {
  104. return posixEnv->GetCurrentTime(unix_time);
  105. }
  106. Status GetAbsolutePath(const std::string& db_path,
  107. std::string* output_path) override {
  108. return posixEnv->GetAbsolutePath(db_path, output_path);
  109. }
  110. void SetBackgroundThreads(int number, Priority pri = LOW) override {
  111. posixEnv->SetBackgroundThreads(number, pri);
  112. }
  113. int GetBackgroundThreads(Priority pri = LOW) override {
  114. return posixEnv->GetBackgroundThreads(pri);
  115. }
  116. void IncBackgroundThreadsIfNeeded(int number, Priority pri) override {
  117. posixEnv->IncBackgroundThreadsIfNeeded(number, pri);
  118. }
  119. std::string TimeToString(uint64_t number) override {
  120. return posixEnv->TimeToString(number);
  121. }
  122. static uint64_t gettid() {
  123. assert(sizeof(pthread_t) <= sizeof(uint64_t));
  124. return (uint64_t)pthread_self();
  125. }
  126. uint64_t GetThreadID() const override { return HdfsEnv::gettid(); }
  127. private:
  128. std::string fsname_; // string of the form "hdfs://hostname:port/"
  129. hdfsFS fileSys_; // a single FileSystem object for all files
  130. Env* posixEnv; // This object is derived from Env, but not from
  131. // posixEnv. We have posixnv as an encapsulated
  132. // object here so that we can use posix timers,
  133. // posix threads, etc.
  134. static const std::string kProto;
  135. static const std::string pathsep;
  136. /**
  137. * If the URI is specified of the form hdfs://server:port/path,
  138. * then connect to the specified cluster
  139. * else connect to default.
  140. */
  141. hdfsFS connectToPath(const std::string& uri) {
  142. if (uri.empty()) {
  143. return nullptr;
  144. }
  145. if (uri.find(kProto) != 0) {
  146. // uri doesn't start with hdfs:// -> use default:0, which is special
  147. // to libhdfs.
  148. return hdfsConnectNewInstance("default", 0);
  149. }
  150. const std::string hostport = uri.substr(kProto.length());
  151. std::vector <std::string> parts;
  152. split(hostport, ':', parts);
  153. if (parts.size() != 2) {
  154. throw HdfsFatalException("Bad uri for hdfs " + uri);
  155. }
  156. // parts[0] = hosts, parts[1] = port/xxx/yyy
  157. std::string host(parts[0]);
  158. std::string remaining(parts[1]);
  159. int rem = static_cast<int>(remaining.find(pathsep));
  160. std::string portStr = (rem == 0 ? remaining :
  161. remaining.substr(0, rem));
  162. tPort port;
  163. port = atoi(portStr.c_str());
  164. if (port == 0) {
  165. throw HdfsFatalException("Bad host-port for hdfs " + uri);
  166. }
  167. hdfsFS fs = hdfsConnectNewInstance(host.c_str(), port);
  168. return fs;
  169. }
  170. void split(const std::string &s, char delim,
  171. std::vector<std::string> &elems) {
  172. elems.clear();
  173. size_t prev = 0;
  174. size_t pos = s.find(delim);
  175. while (pos != std::string::npos) {
  176. elems.push_back(s.substr(prev, pos));
  177. prev = pos + 1;
  178. pos = s.find(delim, prev);
  179. }
  180. elems.push_back(s.substr(prev, s.size()));
  181. }
  182. };
  183. } // namespace ROCKSDB_NAMESPACE
  184. #else // USE_HDFS
  185. namespace ROCKSDB_NAMESPACE {
  186. static const Status notsup;
  187. class HdfsEnv : public Env {
  188. public:
  189. explicit HdfsEnv(const std::string& /*fsname*/) {
  190. fprintf(stderr, "You have not build rocksdb with HDFS support\n");
  191. fprintf(stderr, "Please see hdfs/README for details\n");
  192. abort();
  193. }
  194. virtual ~HdfsEnv() {
  195. }
  196. virtual Status NewSequentialFile(const std::string& fname,
  197. std::unique_ptr<SequentialFile>* result,
  198. const EnvOptions& options) override;
  199. virtual Status NewRandomAccessFile(
  200. const std::string& /*fname*/,
  201. std::unique_ptr<RandomAccessFile>* /*result*/,
  202. const EnvOptions& /*options*/) override {
  203. return notsup;
  204. }
  205. virtual Status NewWritableFile(const std::string& /*fname*/,
  206. std::unique_ptr<WritableFile>* /*result*/,
  207. const EnvOptions& /*options*/) override {
  208. return notsup;
  209. }
  210. virtual Status NewDirectory(const std::string& /*name*/,
  211. std::unique_ptr<Directory>* /*result*/) override {
  212. return notsup;
  213. }
  214. virtual Status FileExists(const std::string& /*fname*/) override {
  215. return notsup;
  216. }
  217. virtual Status GetChildren(const std::string& /*path*/,
  218. std::vector<std::string>* /*result*/) override {
  219. return notsup;
  220. }
  221. virtual Status DeleteFile(const std::string& /*fname*/) override {
  222. return notsup;
  223. }
  224. virtual Status CreateDir(const std::string& /*name*/) override {
  225. return notsup;
  226. }
  227. virtual Status CreateDirIfMissing(const std::string& /*name*/) override {
  228. return notsup;
  229. }
  230. virtual Status DeleteDir(const std::string& /*name*/) override {
  231. return notsup;
  232. }
  233. virtual Status GetFileSize(const std::string& /*fname*/,
  234. uint64_t* /*size*/) override {
  235. return notsup;
  236. }
  237. virtual Status GetFileModificationTime(const std::string& /*fname*/,
  238. uint64_t* /*time*/) override {
  239. return notsup;
  240. }
  241. virtual Status RenameFile(const std::string& /*src*/,
  242. const std::string& /*target*/) override {
  243. return notsup;
  244. }
  245. virtual Status LinkFile(const std::string& /*src*/,
  246. const std::string& /*target*/) override {
  247. return notsup;
  248. }
  249. virtual Status LockFile(const std::string& /*fname*/,
  250. FileLock** /*lock*/) override {
  251. return notsup;
  252. }
  253. virtual Status UnlockFile(FileLock* /*lock*/) override { return notsup; }
  254. virtual Status NewLogger(const std::string& /*fname*/,
  255. std::shared_ptr<Logger>* /*result*/) override {
  256. return notsup;
  257. }
  258. virtual void Schedule(void (* /*function*/)(void* arg), void* /*arg*/,
  259. Priority /*pri*/ = LOW, void* /*tag*/ = nullptr,
  260. void (* /*unschedFunction*/)(void* arg) = 0) override {}
  261. virtual int UnSchedule(void* /*tag*/, Priority /*pri*/) override { return 0; }
  262. virtual void StartThread(void (* /*function*/)(void* arg),
  263. void* /*arg*/) override {}
  264. virtual void WaitForJoin() override {}
  265. virtual unsigned int GetThreadPoolQueueLen(
  266. Priority /*pri*/ = LOW) const override {
  267. return 0;
  268. }
  269. virtual Status GetTestDirectory(std::string* /*path*/) override {
  270. return notsup;
  271. }
  272. virtual uint64_t NowMicros() override { return 0; }
  273. virtual void SleepForMicroseconds(int /*micros*/) override {}
  274. virtual Status GetHostName(char* /*name*/, uint64_t /*len*/) override {
  275. return notsup;
  276. }
  277. virtual Status GetCurrentTime(int64_t* /*unix_time*/) override {
  278. return notsup;
  279. }
  280. virtual Status GetAbsolutePath(const std::string& /*db_path*/,
  281. std::string* /*outputpath*/) override {
  282. return notsup;
  283. }
  284. virtual void SetBackgroundThreads(int /*number*/,
  285. Priority /*pri*/ = LOW) override {}
  286. virtual int GetBackgroundThreads(Priority /*pri*/ = LOW) override {
  287. return 0;
  288. }
  289. virtual void IncBackgroundThreadsIfNeeded(int /*number*/,
  290. Priority /*pri*/) override {}
  291. virtual std::string TimeToString(uint64_t /*number*/) override { return ""; }
  292. virtual uint64_t GetThreadID() const override {
  293. return 0;
  294. }
  295. };
  296. } // namespace ROCKSDB_NAMESPACE
  297. #endif // USE_HDFS