| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384 | //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.//  This source code is licensed under both the GPLv2 (found in the//  COPYING file in the root directory) and Apache 2.0 License//  (found in the LICENSE.Apache file in the root directory).//#pragma once#include <algorithm>#include <stdio.h>#include <time.h>#include <iostream>#include "port/sys_time.h"#include "rocksdb/env.h"#include "rocksdb/status.h"#ifdef USE_HDFS#include <hdfs.h>namespace ROCKSDB_NAMESPACE {// Thrown during execution when there is an issue with the supplied// arguments.class HdfsUsageException : public std::exception { };// A simple exception that indicates something went wrong that is not// recoverable.  The intention is for the message to be printed (with// nothing else) and the process terminate.class HdfsFatalException : public std::exception {public:  explicit HdfsFatalException(const std::string& s) : what_(s) { }  virtual ~HdfsFatalException() throw() { }  virtual const char* what() const throw() {    return what_.c_str();  }private:  const std::string what_;};//// The HDFS environment for rocksdb. This class overrides all the// file/dir access methods and delegates the thread-mgmt methods to the// default posix environment.//class HdfsEnv : public Env { public:  explicit HdfsEnv(const std::string& fsname) : fsname_(fsname) {    posixEnv = Env::Default();    fileSys_ = connectToPath(fsname_);  }  virtual ~HdfsEnv() {    fprintf(stderr, "Destroying HdfsEnv::Default()\n");    hdfsDisconnect(fileSys_);  }  Status NewSequentialFile(const std::string& fname,                           std::unique_ptr<SequentialFile>* result,                           const EnvOptions& options) override;  Status NewRandomAccessFile(const std::string& fname,                             std::unique_ptr<RandomAccessFile>* result,                             const EnvOptions& options) override;  Status NewWritableFile(const std::string& fname,                         std::unique_ptr<WritableFile>* result,                         const EnvOptions& options) override;  Status NewDirectory(const std::string& name,                      std::unique_ptr<Directory>* result) override;  Status FileExists(const std::string& fname) override;  Status GetChildren(const std::string& path,                     std::vector<std::string>* result) override;  Status DeleteFile(const std::string& fname) override;  Status CreateDir(const std::string& name) override;  Status CreateDirIfMissing(const std::string& name) override;  Status DeleteDir(const std::string& name) override;  Status GetFileSize(const std::string& fname, uint64_t* size) override;  Status GetFileModificationTime(const std::string& fname,                                 uint64_t* file_mtime) override;  Status RenameFile(const std::string& src, const std::string& target) override;  Status LinkFile(const std::string& /*src*/,                  const std::string& /*target*/) override {    return Status::NotSupported(); // not supported  }  Status LockFile(const std::string& fname, FileLock** lock) override;  Status UnlockFile(FileLock* lock) override;  Status NewLogger(const std::string& fname,                   std::shared_ptr<Logger>* result) override;  void Schedule(void (*function)(void* arg), void* arg, Priority pri = LOW,                void* tag = nullptr,                void (*unschedFunction)(void* arg) = 0) override {    posixEnv->Schedule(function, arg, pri, tag, unschedFunction);  }  int UnSchedule(void* tag, Priority pri) override {    return posixEnv->UnSchedule(tag, pri);  }  void StartThread(void (*function)(void* arg), void* arg) override {    posixEnv->StartThread(function, arg);  }  void WaitForJoin() override { posixEnv->WaitForJoin(); }  unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const override {    return posixEnv->GetThreadPoolQueueLen(pri);  }  Status GetTestDirectory(std::string* path) override {    return posixEnv->GetTestDirectory(path);  }  uint64_t NowMicros() override { return posixEnv->NowMicros(); }  void SleepForMicroseconds(int micros) override {    posixEnv->SleepForMicroseconds(micros);  }  Status GetHostName(char* name, uint64_t len) override {    return posixEnv->GetHostName(name, len);  }  Status GetCurrentTime(int64_t* unix_time) override {    return posixEnv->GetCurrentTime(unix_time);  }  Status GetAbsolutePath(const std::string& db_path,                         std::string* output_path) override {    return posixEnv->GetAbsolutePath(db_path, output_path);  }  void SetBackgroundThreads(int number, Priority pri = LOW) override {    posixEnv->SetBackgroundThreads(number, pri);  }  int GetBackgroundThreads(Priority pri = LOW) override {    return posixEnv->GetBackgroundThreads(pri);  }  void IncBackgroundThreadsIfNeeded(int number, Priority pri) override {    posixEnv->IncBackgroundThreadsIfNeeded(number, pri);  }  std::string TimeToString(uint64_t number) override {    return posixEnv->TimeToString(number);  }  static uint64_t gettid() {    assert(sizeof(pthread_t) <= sizeof(uint64_t));    return (uint64_t)pthread_self();  }  uint64_t GetThreadID() const override { return HdfsEnv::gettid(); } private:  std::string fsname_;  // string of the form "hdfs://hostname:port/"  hdfsFS fileSys_;      //  a single FileSystem object for all files  Env*  posixEnv;       // This object is derived from Env, but not from                        // posixEnv. We have posixnv as an encapsulated                        // object here so that we can use posix timers,                        // posix threads, etc.  static const std::string kProto;  static const std::string pathsep;  /**   * If the URI is specified of the form hdfs://server:port/path,   * then connect to the specified cluster   * else connect to default.   */  hdfsFS connectToPath(const std::string& uri) {    if (uri.empty()) {      return nullptr;    }    if (uri.find(kProto) != 0) {      // uri doesn't start with hdfs:// -> use default:0, which is special      // to libhdfs.      return hdfsConnectNewInstance("default", 0);    }    const std::string hostport = uri.substr(kProto.length());    std::vector <std::string> parts;    split(hostport, ':', parts);    if (parts.size() != 2) {      throw HdfsFatalException("Bad uri for hdfs " + uri);    }    // parts[0] = hosts, parts[1] = port/xxx/yyy    std::string host(parts[0]);    std::string remaining(parts[1]);    int rem = static_cast<int>(remaining.find(pathsep));    std::string portStr = (rem == 0 ? remaining :                           remaining.substr(0, rem));    tPort port;    port = atoi(portStr.c_str());    if (port == 0) {      throw HdfsFatalException("Bad host-port for hdfs " + uri);    }    hdfsFS fs = hdfsConnectNewInstance(host.c_str(), port);    return fs;  }  void split(const std::string &s, char delim,             std::vector<std::string> &elems) {    elems.clear();    size_t prev = 0;    size_t pos = s.find(delim);    while (pos != std::string::npos) {      elems.push_back(s.substr(prev, pos));      prev = pos + 1;      pos = s.find(delim, prev);    }    elems.push_back(s.substr(prev, s.size()));  }};}  // namespace ROCKSDB_NAMESPACE#else // USE_HDFSnamespace ROCKSDB_NAMESPACE {static const Status notsup;class HdfsEnv : public Env { public:  explicit HdfsEnv(const std::string& /*fsname*/) {    fprintf(stderr, "You have not build rocksdb with HDFS support\n");    fprintf(stderr, "Please see hdfs/README for details\n");    abort();  }  virtual ~HdfsEnv() {  }  virtual Status NewSequentialFile(const std::string& fname,                                   std::unique_ptr<SequentialFile>* result,                                   const EnvOptions& options) override;  virtual Status NewRandomAccessFile(      const std::string& /*fname*/,      std::unique_ptr<RandomAccessFile>* /*result*/,      const EnvOptions& /*options*/) override {    return notsup;  }  virtual Status NewWritableFile(const std::string& /*fname*/,                                 std::unique_ptr<WritableFile>* /*result*/,                                 const EnvOptions& /*options*/) override {    return notsup;  }  virtual Status NewDirectory(const std::string& /*name*/,                              std::unique_ptr<Directory>* /*result*/) override {    return notsup;  }  virtual Status FileExists(const std::string& /*fname*/) override {    return notsup;  }  virtual Status GetChildren(const std::string& /*path*/,                             std::vector<std::string>* /*result*/) override {    return notsup;  }  virtual Status DeleteFile(const std::string& /*fname*/) override {    return notsup;  }  virtual Status CreateDir(const std::string& /*name*/) override {    return notsup;  }  virtual Status CreateDirIfMissing(const std::string& /*name*/) override {    return notsup;  }  virtual Status DeleteDir(const std::string& /*name*/) override {    return notsup;  }  virtual Status GetFileSize(const std::string& /*fname*/,                             uint64_t* /*size*/) override {    return notsup;  }  virtual Status GetFileModificationTime(const std::string& /*fname*/,                                         uint64_t* /*time*/) override {    return notsup;  }  virtual Status RenameFile(const std::string& /*src*/,                            const std::string& /*target*/) override {    return notsup;  }  virtual Status LinkFile(const std::string& /*src*/,                          const std::string& /*target*/) override {    return notsup;  }  virtual Status LockFile(const std::string& /*fname*/,                          FileLock** /*lock*/) override {    return notsup;  }  virtual Status UnlockFile(FileLock* /*lock*/) override { return notsup; }  virtual Status NewLogger(const std::string& /*fname*/,                           std::shared_ptr<Logger>* /*result*/) override {    return notsup;  }  virtual void Schedule(void (* /*function*/)(void* arg), void* /*arg*/,                        Priority /*pri*/ = LOW, void* /*tag*/ = nullptr,                        void (* /*unschedFunction*/)(void* arg) = 0) override {}  virtual int UnSchedule(void* /*tag*/, Priority /*pri*/) override { return 0; }  virtual void StartThread(void (* /*function*/)(void* arg),                           void* /*arg*/) override {}  virtual void WaitForJoin() override {}  virtual unsigned int GetThreadPoolQueueLen(      Priority /*pri*/ = LOW) const override {    return 0;  }  virtual Status GetTestDirectory(std::string* /*path*/) override {    return notsup;  }  virtual uint64_t NowMicros() override { return 0; }  virtual void SleepForMicroseconds(int /*micros*/) override {}  virtual Status GetHostName(char* /*name*/, uint64_t /*len*/) override {    return notsup;  }  virtual Status GetCurrentTime(int64_t* /*unix_time*/) override {    return notsup;  }  virtual Status GetAbsolutePath(const std::string& /*db_path*/,                                 std::string* /*outputpath*/) override {    return notsup;  }  virtual void SetBackgroundThreads(int /*number*/,                                    Priority /*pri*/ = LOW) override {}  virtual int GetBackgroundThreads(Priority /*pri*/ = LOW) override {    return 0;  }  virtual void IncBackgroundThreadsIfNeeded(int /*number*/,                                            Priority /*pri*/) override {}  virtual std::string TimeToString(uint64_t /*number*/) override { return ""; }  virtual uint64_t GetThreadID() const override {    return 0;  }};}  // namespace ROCKSDB_NAMESPACE#endif // USE_HDFS
 |