
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
#include "rocksdb/env.h"
#include "hdfs/env_hdfs.h"

#ifdef USE_HDFS
#ifndef ROCKSDB_HDFS_FILE_C
#define ROCKSDB_HDFS_FILE_C

#include <stdio.h>
#include <sys/time.h>
#include <time.h>

#include <algorithm>
#include <iostream>
#include <sstream>

#include "logging/logging.h"
#include "rocksdb/status.h"
#include "util/string_util.h"

#define HDFS_EXISTS 0
#define HDFS_DOESNT_EXIST -1
#define HDFS_SUCCESS 0

//
// This file defines an HDFS environment for rocksdb. It uses the libhdfs
// api to access HDFS. All HDFS files created by one instance of rocksdb
// will reside on the same HDFS cluster.
//
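// A minimal usage sketch (the namenode URI below is illustrative; pass
// whatever "hdfs://host:port/" address your cluster accepts):
//
//   Env* hdfs_env = nullptr;
//   Status s = NewHdfsEnv(&hdfs_env, "hdfs://namenode:9000/");
//   if (s.ok()) {
//     Options options;
//     options.env = hdfs_env;
//     // ... open the DB with these options as usual ...
//   }
//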
namespace ROCKSDB_NAMESPACE {

namespace {

// Build a Status from an errno value, mapping the common cases to
// specific status codes.
static Status IOError(const std::string& context, int err_number) {
  if (err_number == ENOSPC) {
    return Status::NoSpace(context, strerror(err_number));
  } else if (err_number == ENOENT) {
    return Status::PathNotFound(context, strerror(err_number));
  } else {
    return Status::IOError(context, strerror(err_number));
  }
}

// Assume that there is one global logger for now. It is not thread-safe,
// but need not be because the logger is initialized at db-open time.
static Logger* mylog = nullptr;
// Used for reading a file from HDFS. It implements both sequential-read
// access methods as well as random read access methods.
class HdfsReadableFile : virtual public SequentialFile,
                         virtual public RandomAccessFile {
 private:
  hdfsFS fileSys_;
  std::string filename_;
  hdfsFile hfile_;

 public:
  HdfsReadableFile(hdfsFS fileSys, const std::string& fname)
      : fileSys_(fileSys), filename_(fname), hfile_(nullptr) {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile opening file %s\n",
                    filename_.c_str());
    hfile_ = hdfsOpenFile(fileSys_, filename_.c_str(), O_RDONLY, 0, 0, 0);
    ROCKS_LOG_DEBUG(mylog,
                    "[hdfs] HdfsReadableFile opened file %s hfile_=0x%p\n",
                    filename_.c_str(), hfile_);
  }

  virtual ~HdfsReadableFile() {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile closing file %s\n",
                    filename_.c_str());
    hdfsCloseFile(fileSys_, hfile_);
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile closed file %s\n",
                    filename_.c_str());
    hfile_ = nullptr;
  }

  bool isValid() { return hfile_ != nullptr; }

  // sequential access, read data at current offset in file
  virtual Status Read(size_t n, Slice* result, char* scratch) {
    Status s;
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile reading %s %ld\n",
                    filename_.c_str(), n);
    char* buffer = scratch;
    size_t total_bytes_read = 0;
    tSize bytes_read = 0;
    tSize remaining_bytes = (tSize)n;

    // Read a total of n bytes repeatedly until we hit error or eof
    while (remaining_bytes > 0) {
      bytes_read = hdfsRead(fileSys_, hfile_, buffer, remaining_bytes);
      if (bytes_read <= 0) {
        break;
      }
      assert(bytes_read <= remaining_bytes);

      total_bytes_read += bytes_read;
      remaining_bytes -= bytes_read;
      buffer += bytes_read;
    }
    assert(total_bytes_read <= n);

    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile read %s\n",
                    filename_.c_str());
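    // A negative bytes_read from the final hdfsRead call signals an error;
    // zero simply means we hit eof before filling n bytes, which is reported
    // as a short (possibly empty) result, not as an error.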
    if (bytes_read < 0) {
      s = IOError(filename_, errno);
    } else {
      *result = Slice(scratch, total_bytes_read);
    }
    return s;
  }
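  // hdfsPread reads from an absolute offset and does not disturb the
  // handle's current read position, which is why this overload can be
  // declared const.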
  // random access, read data from specified offset in file
  virtual Status Read(uint64_t offset, size_t n, Slice* result,
                      char* scratch) const {
    Status s;
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile preading %s\n",
                    filename_.c_str());
    ssize_t bytes_read =
        hdfsPread(fileSys_, hfile_, offset, (void*)scratch, (tSize)n);
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile pread %s\n",
                    filename_.c_str());
    *result = Slice(scratch, (bytes_read < 0) ? 0 : bytes_read);
    if (bytes_read < 0) {
      // An error: return a non-ok status
      s = IOError(filename_, errno);
    }
    return s;
  }
  virtual Status Skip(uint64_t n) {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile skip %s\n",
                    filename_.c_str());
    // get current offset from file
    tOffset current = hdfsTell(fileSys_, hfile_);
    if (current < 0) {
      return IOError(filename_, errno);
    }
    // seek to new offset in file
    tOffset newoffset = current + n;
    int val = hdfsSeek(fileSys_, hfile_, newoffset);
    if (val < 0) {
      return IOError(filename_, errno);
    }
    return Status::OK();
  }

 private:
  // returns true if we are at the end of file, false otherwise
  bool feof() {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile feof %s\n",
                    filename_.c_str());
    return hdfsTell(fileSys_, hfile_) == fileSize();
  }
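  // Note that this issues a fresh hdfsGetPathInfo metadata RPC on every
  // call rather than caching the size.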
  // the current size of the file
  tOffset fileSize() {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile fileSize %s\n",
                    filename_.c_str());
    hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, filename_.c_str());
    tOffset size = 0L;
    if (pFileInfo != nullptr) {
      size = pFileInfo->mSize;
      hdfsFreeFileInfo(pFileInfo, 1);
    } else {
      throw HdfsFatalException("fileSize on unknown file " + filename_);
    }
    return size;
  }
};
// Appends to an existing file in HDFS.
class HdfsWritableFile : public WritableFile {
 private:
  hdfsFS fileSys_;
  std::string filename_;
  hdfsFile hfile_;

 public:
  HdfsWritableFile(hdfsFS fileSys, const std::string& fname,
                   const EnvOptions& options)
      : WritableFile(options),
        fileSys_(fileSys),
        filename_(fname),
        hfile_(nullptr) {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile opening %s\n",
                    filename_.c_str());
    hfile_ = hdfsOpenFile(fileSys_, filename_.c_str(), O_WRONLY, 0, 0, 0);
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile opened %s\n",
                    filename_.c_str());
    assert(hfile_ != nullptr);
  }

  virtual ~HdfsWritableFile() {
    if (hfile_ != nullptr) {
      ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile closing %s\n",
                      filename_.c_str());
      hdfsCloseFile(fileSys_, hfile_);
      ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile closed %s\n",
                      filename_.c_str());
      hfile_ = nullptr;
    }
  }

  // If the file was successfully created, then this returns true.
  // Otherwise returns false.
  bool isValid() { return hfile_ != nullptr; }

  // The name of the file, mostly needed for debug logging.
  const std::string& getName() { return filename_; }
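  // hdfsWrite returns the number of bytes written, or -1 on error. Either a
  // short write or a -1 (which, converted to size_t, will not equal `left`
  // in practice) therefore falls through to the IOError path below.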
  virtual Status Append(const Slice& data) {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Append %s\n",
                    filename_.c_str());
    const char* src = data.data();
    size_t left = data.size();
    size_t ret = hdfsWrite(fileSys_, hfile_, src, static_cast<tSize>(left));
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Appended %s\n",
                    filename_.c_str());
    if (ret != left) {
      return IOError(filename_, errno);
    }
    return Status::OK();
  }

  virtual Status Flush() { return Status::OK(); }
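  // hdfsFlush pushes client-buffered bytes out to the datanode pipeline;
  // hdfsHSync additionally asks the datanodes to persist them, roughly the
  // HDFS analogue of fsync. (Exact durability guarantees vary with the
  // Hadoop version and pipeline configuration.)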
  virtual Status Sync() {
    Status s;
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Sync %s\n",
                    filename_.c_str());
    if (hdfsFlush(fileSys_, hfile_) == -1) {
      return IOError(filename_, errno);
    }
    if (hdfsHSync(fileSys_, hfile_) == -1) {
      return IOError(filename_, errno);
    }
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Synced %s\n",
                    filename_.c_str());
    return Status::OK();
  }

  // This is used by HdfsLogger to write data to the debug log file
  virtual Status Append(const char* src, size_t size) {
    if (hdfsWrite(fileSys_, hfile_, src, static_cast<tSize>(size)) !=
        static_cast<tSize>(size)) {
      return IOError(filename_, errno);
    }
    return Status::OK();
  }

  virtual Status Close() {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile closing %s\n",
                    filename_.c_str());
    if (hdfsCloseFile(fileSys_, hfile_) != 0) {
      return IOError(filename_, errno);
    }
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile closed %s\n",
                    filename_.c_str());
    hfile_ = nullptr;
    return Status::OK();
  }
};
// The object that implements the debug logs to reside in HDFS.
class HdfsLogger : public Logger {
 private:
  HdfsWritableFile* file_;
  uint64_t (*gettid_)();  // Return the thread id for the current thread

  Status HdfsCloseHelper() {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsLogger closed %s\n",
                    file_->getName().c_str());
    if (mylog != nullptr && mylog == this) {
      mylog = nullptr;
    }
    return Status::OK();
  }

 protected:
  virtual Status CloseImpl() override { return HdfsCloseHelper(); }

 public:
  HdfsLogger(HdfsWritableFile* f, uint64_t (*gettid)())
      : file_(f), gettid_(gettid) {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsLogger opened %s\n",
                    file_->getName().c_str());
  }

  ~HdfsLogger() override {
    if (!closed_) {
      closed_ = true;
      HdfsCloseHelper();
    }
  }

  using Logger::Logv;
  void Logv(const char* format, va_list ap) override {
    const uint64_t thread_id = (*gettid_)();

    // We try twice: the first time with a fixed-size stack allocated buffer,
    // and the second time with a much larger dynamically allocated buffer.
    char buffer[500];
    for (int iter = 0; iter < 2; iter++) {
      char* base;
      int bufsize;
      if (iter == 0) {
        bufsize = sizeof(buffer);
        base = buffer;
      } else {
        bufsize = 30000;
        base = new char[bufsize];
      }
      char* p = base;
      char* limit = base + bufsize;

      struct timeval now_tv;
      gettimeofday(&now_tv, nullptr);
      const time_t seconds = now_tv.tv_sec;
      struct tm t;
      localtime_r(&seconds, &t);
      p += snprintf(p, limit - p, "%04d/%02d/%02d-%02d:%02d:%02d.%06d %llx ",
                    t.tm_year + 1900, t.tm_mon + 1, t.tm_mday, t.tm_hour,
                    t.tm_min, t.tm_sec, static_cast<int>(now_tv.tv_usec),
                    static_cast<long long unsigned int>(thread_id));
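      // va_copy is required here: a va_list may be consumed by vsnprintf,
      // and this loop can format the arguments a second time when the stack
      // buffer turns out to be too small.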
      // Print the message
      if (p < limit) {
        va_list backup_ap;
        va_copy(backup_ap, ap);
        p += vsnprintf(p, limit - p, format, backup_ap);
        va_end(backup_ap);
      }

      // Truncate to available space if necessary
      if (p >= limit) {
        if (iter == 0) {
          continue;  // Try again with larger buffer
        } else {
          p = limit - 1;
        }
      }

      // Add newline if necessary
      if (p == base || p[-1] != '\n') {
        *p++ = '\n';
      }

      assert(p <= limit);
      file_->Append(base, p - base);
      file_->Flush();
      if (base != buffer) {
        delete[] base;
      }
      break;
    }
  }
};
}  // namespace

// Finally, the hdfs environment

const std::string HdfsEnv::kProto = "hdfs://";
const std::string HdfsEnv::pathsep = "/";

// open a file for sequential reading
Status HdfsEnv::NewSequentialFile(const std::string& fname,
                                  std::unique_ptr<SequentialFile>* result,
                                  const EnvOptions& /*options*/) {
  result->reset();
  HdfsReadableFile* f = new HdfsReadableFile(fileSys_, fname);
  if (f == nullptr || !f->isValid()) {
    delete f;
    *result = nullptr;
    return IOError(fname, errno);
  }
  result->reset(dynamic_cast<SequentialFile*>(f));
  return Status::OK();
}
// open a file for random reading
Status HdfsEnv::NewRandomAccessFile(const std::string& fname,
                                    std::unique_ptr<RandomAccessFile>* result,
                                    const EnvOptions& /*options*/) {
  result->reset();
  HdfsReadableFile* f = new HdfsReadableFile(fileSys_, fname);
  if (f == nullptr || !f->isValid()) {
    delete f;
    *result = nullptr;
    return IOError(fname, errno);
  }
  result->reset(dynamic_cast<RandomAccessFile*>(f));
  return Status::OK();
}

// create a new file for writing
Status HdfsEnv::NewWritableFile(const std::string& fname,
                                std::unique_ptr<WritableFile>* result,
                                const EnvOptions& options) {
  result->reset();
  Status s;
  HdfsWritableFile* f = new HdfsWritableFile(fileSys_, fname, options);
  if (f == nullptr || !f->isValid()) {
    delete f;
    *result = nullptr;
    return IOError(fname, errno);
  }
  result->reset(dynamic_cast<WritableFile*>(f));
  return Status::OK();
}
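// HDFS has no per-directory sync primitive (directory entries are namenode
// metadata), so this directory handle only needs to exist and its Fsync can
// be a no-op.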
class HdfsDirectory : public Directory {
 public:
  explicit HdfsDirectory(int fd) : fd_(fd) {}
  ~HdfsDirectory() {}

  Status Fsync() override { return Status::OK(); }

  int GetFd() const { return fd_; }

 private:
  int fd_;
};

Status HdfsEnv::NewDirectory(const std::string& name,
                             std::unique_ptr<Directory>* result) {
  int value = hdfsExists(fileSys_, name.c_str());
  switch (value) {
    case HDFS_EXISTS:
      result->reset(new HdfsDirectory(0));
      return Status::OK();
    default:  // fail if the directory doesn't exist
      ROCKS_LOG_FATAL(mylog, "NewDirectory hdfsExists call failed");
      throw HdfsFatalException("hdfsExists call failed with error " +
                               ToString(value) + " on path " + name + ".\n");
  }
}
Status HdfsEnv::FileExists(const std::string& fname) {
  int value = hdfsExists(fileSys_, fname.c_str());
  switch (value) {
    case HDFS_EXISTS:
      return Status::OK();
    case HDFS_DOESNT_EXIST:
      return Status::NotFound();
    default:  // anything else should be an error
      ROCKS_LOG_FATAL(mylog, "FileExists hdfsExists call failed");
      return Status::IOError("hdfsExists call failed with error " +
                             ToString(value) + " on path " + fname + ".\n");
  }
}

Status HdfsEnv::GetChildren(const std::string& path,
                            std::vector<std::string>* result) {
  int value = hdfsExists(fileSys_, path.c_str());
  switch (value) {
    case HDFS_EXISTS: {  // directory exists
      int numEntries = 0;
      hdfsFileInfo* pHdfsFileInfo = nullptr;
      pHdfsFileInfo = hdfsListDirectory(fileSys_, path.c_str(), &numEntries);
      if (numEntries >= 0) {
        for (int i = 0; i < numEntries; i++) {
          std::string pathname(pHdfsFileInfo[i].mName);
          size_t pos = pathname.rfind("/");
          if (std::string::npos != pos) {
            result->push_back(pathname.substr(pos + 1));
          }
        }
        if (pHdfsFileInfo != nullptr) {
          hdfsFreeFileInfo(pHdfsFileInfo, numEntries);
        }
      } else {
        // numEntries < 0 indicates error
        ROCKS_LOG_FATAL(mylog, "hdfsListDirectory call failed with error ");
        throw HdfsFatalException(
            "hdfsListDirectory call failed with a negative error.\n");
      }
      break;
    }
    case HDFS_DOESNT_EXIST:  // directory does not exist, exit
      return Status::NotFound();
    default:  // anything else should be an error
      ROCKS_LOG_FATAL(mylog, "GetChildren hdfsExists call failed");
      throw HdfsFatalException("hdfsExists call failed with error " +
                               ToString(value) + ".\n");
  }
  return Status::OK();
}
Status HdfsEnv::DeleteFile(const std::string& fname) {
  if (hdfsDelete(fileSys_, fname.c_str(), 1) == 0) {
    return Status::OK();
  }
  return IOError(fname, errno);
}

Status HdfsEnv::CreateDir(const std::string& name) {
  if (hdfsCreateDirectory(fileSys_, name.c_str()) == 0) {
    return Status::OK();
  }
  return IOError(name, errno);
}

Status HdfsEnv::CreateDirIfMissing(const std::string& name) {
  const int value = hdfsExists(fileSys_, name.c_str());
  // Not atomic: the state might change between hdfsExists and CreateDir.
  switch (value) {
    case HDFS_EXISTS:
      return Status::OK();
    case HDFS_DOESNT_EXIST:
      return CreateDir(name);
    default:  // anything else should be an error
      ROCKS_LOG_FATAL(mylog, "CreateDirIfMissing hdfsExists call failed");
      throw HdfsFatalException("hdfsExists call failed with error " +
                               ToString(value) + ".\n");
  }
}

Status HdfsEnv::DeleteDir(const std::string& name) { return DeleteFile(name); }
Status HdfsEnv::GetFileSize(const std::string& fname, uint64_t* size) {
  *size = 0L;
  hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, fname.c_str());
  if (pFileInfo != nullptr) {
    *size = pFileInfo->mSize;
    hdfsFreeFileInfo(pFileInfo, 1);
    return Status::OK();
  }
  return IOError(fname, errno);
}

Status HdfsEnv::GetFileModificationTime(const std::string& fname,
                                        uint64_t* time) {
  hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, fname.c_str());
  if (pFileInfo != nullptr) {
    *time = static_cast<uint64_t>(pFileInfo->mLastMod);
    hdfsFreeFileInfo(pFileInfo, 1);
    return Status::OK();
  }
  return IOError(fname, errno);
}

// The rename is not atomic. HDFS does not allow a renaming if the
// target already exists. So, we delete the target before attempting the
// rename.
Status HdfsEnv::RenameFile(const std::string& src, const std::string& target) {
  hdfsDelete(fileSys_, target.c_str(), 1);
  if (hdfsRename(fileSys_, src.c_str(), target.c_str()) == 0) {
    return Status::OK();
  }
  return IOError(src, errno);
}
Status HdfsEnv::LockFile(const std::string& /*fname*/, FileLock** lock) {
  // There isn't a very good way to atomically check and create a file
  // via libhdfs, so file locking is a no-op here.
  *lock = nullptr;
  return Status::OK();
}

Status HdfsEnv::UnlockFile(FileLock* /*lock*/) { return Status::OK(); }
Status HdfsEnv::NewLogger(const std::string& fname,
                          std::shared_ptr<Logger>* result) {
  // EnvOptions is used exclusively for its `strict_bytes_per_sync` value. That
  // option is only intended for WAL/flush/compaction writes, so turn it off in
  // the logger.
  EnvOptions options;
  options.strict_bytes_per_sync = false;
  HdfsWritableFile* f = new HdfsWritableFile(fileSys_, fname, options);
  if (f == nullptr || !f->isValid()) {
    delete f;
    *result = nullptr;
    return IOError(fname, errno);
  }
  HdfsLogger* h = new HdfsLogger(f, &HdfsEnv::gettid);
  result->reset(h);
  if (mylog == nullptr) {
    // mylog = h; // uncomment this for detailed logging
  }
  return Status::OK();
}
// The factory method for creating an HDFS Env
Status NewHdfsEnv(Env** hdfs_env, const std::string& fsname) {
  *hdfs_env = new HdfsEnv(fsname);
  return Status::OK();
}
}  // namespace ROCKSDB_NAMESPACE

#endif  // ROCKSDB_HDFS_FILE_C

#else  // USE_HDFS

// dummy placeholders used when HDFS is not available
namespace ROCKSDB_NAMESPACE {
Status HdfsEnv::NewSequentialFile(const std::string& /*fname*/,
                                  std::unique_ptr<SequentialFile>* /*result*/,
                                  const EnvOptions& /*options*/) {
  return Status::NotSupported("Not compiled with hdfs support");
}

Status NewHdfsEnv(Env** /*hdfs_env*/, const std::string& /*fsname*/) {
  return Status::NotSupported("Not compiled with hdfs support");
}
}  // namespace ROCKSDB_NAMESPACE
#endif