| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384 |
- // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
- // This source code is licensed under both the GPLv2 (found in the
- // COPYING file in the root directory) and Apache 2.0 License
- // (found in the LICENSE.Apache file in the root directory).
- //
- #pragma once
- #include <algorithm>
- #include <stdio.h>
- #include <time.h>
- #include <iostream>
- #include "port/sys_time.h"
- #include "rocksdb/env.h"
- #include "rocksdb/status.h"
- #ifdef USE_HDFS
- #include <hdfs.h>
- namespace ROCKSDB_NAMESPACE {
// Marker exception raised at run time when the arguments handed to the
// HDFS environment are unusable. It carries no message of its own.
class HdfsUsageException : public std::exception {
};
// A simple exception that indicates something went wrong that is not
// recoverable. The intention is for the message to be printed (with
// nothing else) and the process to terminate.
class HdfsFatalException : public std::exception {
 public:
  // Stores a private copy of `s`; it is the text later returned by what().
  explicit HdfsFatalException(const std::string& s) : what_(s) {}

  // Returns the message supplied at construction. The pointer stays valid
  // for as long as this exception object is alive.
  const char* what() const noexcept override { return what_.c_str(); }

 private:
  const std::string what_;  // message reported through what()
};
- //
- // The HDFS environment for rocksdb. This class overrides all the
- // file/dir access methods and delegates the thread-mgmt methods to the
- // default posix environment.
- //
- class HdfsEnv : public Env {
- public:
- explicit HdfsEnv(const std::string& fsname) : fsname_(fsname) {
- posixEnv = Env::Default();
- fileSys_ = connectToPath(fsname_);
- }
- virtual ~HdfsEnv() {
- fprintf(stderr, "Destroying HdfsEnv::Default()\n");
- hdfsDisconnect(fileSys_);
- }
- Status NewSequentialFile(const std::string& fname,
- std::unique_ptr<SequentialFile>* result,
- const EnvOptions& options) override;
- Status NewRandomAccessFile(const std::string& fname,
- std::unique_ptr<RandomAccessFile>* result,
- const EnvOptions& options) override;
- Status NewWritableFile(const std::string& fname,
- std::unique_ptr<WritableFile>* result,
- const EnvOptions& options) override;
- Status NewDirectory(const std::string& name,
- std::unique_ptr<Directory>* result) override;
- Status FileExists(const std::string& fname) override;
- Status GetChildren(const std::string& path,
- std::vector<std::string>* result) override;
- Status DeleteFile(const std::string& fname) override;
- Status CreateDir(const std::string& name) override;
- Status CreateDirIfMissing(const std::string& name) override;
- Status DeleteDir(const std::string& name) override;
- Status GetFileSize(const std::string& fname, uint64_t* size) override;
- Status GetFileModificationTime(const std::string& fname,
- uint64_t* file_mtime) override;
- Status RenameFile(const std::string& src, const std::string& target) override;
- Status LinkFile(const std::string& /*src*/,
- const std::string& /*target*/) override {
- return Status::NotSupported(); // not supported
- }
- Status LockFile(const std::string& fname, FileLock** lock) override;
- Status UnlockFile(FileLock* lock) override;
- Status NewLogger(const std::string& fname,
- std::shared_ptr<Logger>* result) override;
- void Schedule(void (*function)(void* arg), void* arg, Priority pri = LOW,
- void* tag = nullptr,
- void (*unschedFunction)(void* arg) = 0) override {
- posixEnv->Schedule(function, arg, pri, tag, unschedFunction);
- }
- int UnSchedule(void* tag, Priority pri) override {
- return posixEnv->UnSchedule(tag, pri);
- }
- void StartThread(void (*function)(void* arg), void* arg) override {
- posixEnv->StartThread(function, arg);
- }
- void WaitForJoin() override { posixEnv->WaitForJoin(); }
- unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const override {
- return posixEnv->GetThreadPoolQueueLen(pri);
- }
- Status GetTestDirectory(std::string* path) override {
- return posixEnv->GetTestDirectory(path);
- }
- uint64_t NowMicros() override { return posixEnv->NowMicros(); }
- void SleepForMicroseconds(int micros) override {
- posixEnv->SleepForMicroseconds(micros);
- }
- Status GetHostName(char* name, uint64_t len) override {
- return posixEnv->GetHostName(name, len);
- }
- Status GetCurrentTime(int64_t* unix_time) override {
- return posixEnv->GetCurrentTime(unix_time);
- }
- Status GetAbsolutePath(const std::string& db_path,
- std::string* output_path) override {
- return posixEnv->GetAbsolutePath(db_path, output_path);
- }
- void SetBackgroundThreads(int number, Priority pri = LOW) override {
- posixEnv->SetBackgroundThreads(number, pri);
- }
- int GetBackgroundThreads(Priority pri = LOW) override {
- return posixEnv->GetBackgroundThreads(pri);
- }
- void IncBackgroundThreadsIfNeeded(int number, Priority pri) override {
- posixEnv->IncBackgroundThreadsIfNeeded(number, pri);
- }
- std::string TimeToString(uint64_t number) override {
- return posixEnv->TimeToString(number);
- }
- static uint64_t gettid() {
- assert(sizeof(pthread_t) <= sizeof(uint64_t));
- return (uint64_t)pthread_self();
- }
- uint64_t GetThreadID() const override { return HdfsEnv::gettid(); }
- private:
- std::string fsname_; // string of the form "hdfs://hostname:port/"
- hdfsFS fileSys_; // a single FileSystem object for all files
- Env* posixEnv; // This object is derived from Env, but not from
- // posixEnv. We have posixnv as an encapsulated
- // object here so that we can use posix timers,
- // posix threads, etc.
- static const std::string kProto;
- static const std::string pathsep;
- /**
- * If the URI is specified of the form hdfs://server:port/path,
- * then connect to the specified cluster
- * else connect to default.
- */
- hdfsFS connectToPath(const std::string& uri) {
- if (uri.empty()) {
- return nullptr;
- }
- if (uri.find(kProto) != 0) {
- // uri doesn't start with hdfs:// -> use default:0, which is special
- // to libhdfs.
- return hdfsConnectNewInstance("default", 0);
- }
- const std::string hostport = uri.substr(kProto.length());
- std::vector <std::string> parts;
- split(hostport, ':', parts);
- if (parts.size() != 2) {
- throw HdfsFatalException("Bad uri for hdfs " + uri);
- }
- // parts[0] = hosts, parts[1] = port/xxx/yyy
- std::string host(parts[0]);
- std::string remaining(parts[1]);
- int rem = static_cast<int>(remaining.find(pathsep));
- std::string portStr = (rem == 0 ? remaining :
- remaining.substr(0, rem));
- tPort port;
- port = atoi(portStr.c_str());
- if (port == 0) {
- throw HdfsFatalException("Bad host-port for hdfs " + uri);
- }
- hdfsFS fs = hdfsConnectNewInstance(host.c_str(), port);
- return fs;
- }
- void split(const std::string &s, char delim,
- std::vector<std::string> &elems) {
- elems.clear();
- size_t prev = 0;
- size_t pos = s.find(delim);
- while (pos != std::string::npos) {
- elems.push_back(s.substr(prev, pos));
- prev = pos + 1;
- pos = s.find(delim, prev);
- }
- elems.push_back(s.substr(prev, s.size()));
- }
- };
- } // namespace ROCKSDB_NAMESPACE
- #else // USE_HDFS
- namespace ROCKSDB_NAMESPACE {
- static const Status notsup;
- class HdfsEnv : public Env {
- public:
- explicit HdfsEnv(const std::string& /*fsname*/) {
- fprintf(stderr, "You have not build rocksdb with HDFS support\n");
- fprintf(stderr, "Please see hdfs/README for details\n");
- abort();
- }
- virtual ~HdfsEnv() {
- }
- virtual Status NewSequentialFile(const std::string& fname,
- std::unique_ptr<SequentialFile>* result,
- const EnvOptions& options) override;
- virtual Status NewRandomAccessFile(
- const std::string& /*fname*/,
- std::unique_ptr<RandomAccessFile>* /*result*/,
- const EnvOptions& /*options*/) override {
- return notsup;
- }
- virtual Status NewWritableFile(const std::string& /*fname*/,
- std::unique_ptr<WritableFile>* /*result*/,
- const EnvOptions& /*options*/) override {
- return notsup;
- }
- virtual Status NewDirectory(const std::string& /*name*/,
- std::unique_ptr<Directory>* /*result*/) override {
- return notsup;
- }
- virtual Status FileExists(const std::string& /*fname*/) override {
- return notsup;
- }
- virtual Status GetChildren(const std::string& /*path*/,
- std::vector<std::string>* /*result*/) override {
- return notsup;
- }
- virtual Status DeleteFile(const std::string& /*fname*/) override {
- return notsup;
- }
- virtual Status CreateDir(const std::string& /*name*/) override {
- return notsup;
- }
- virtual Status CreateDirIfMissing(const std::string& /*name*/) override {
- return notsup;
- }
- virtual Status DeleteDir(const std::string& /*name*/) override {
- return notsup;
- }
- virtual Status GetFileSize(const std::string& /*fname*/,
- uint64_t* /*size*/) override {
- return notsup;
- }
- virtual Status GetFileModificationTime(const std::string& /*fname*/,
- uint64_t* /*time*/) override {
- return notsup;
- }
- virtual Status RenameFile(const std::string& /*src*/,
- const std::string& /*target*/) override {
- return notsup;
- }
- virtual Status LinkFile(const std::string& /*src*/,
- const std::string& /*target*/) override {
- return notsup;
- }
- virtual Status LockFile(const std::string& /*fname*/,
- FileLock** /*lock*/) override {
- return notsup;
- }
- virtual Status UnlockFile(FileLock* /*lock*/) override { return notsup; }
- virtual Status NewLogger(const std::string& /*fname*/,
- std::shared_ptr<Logger>* /*result*/) override {
- return notsup;
- }
- virtual void Schedule(void (* /*function*/)(void* arg), void* /*arg*/,
- Priority /*pri*/ = LOW, void* /*tag*/ = nullptr,
- void (* /*unschedFunction*/)(void* arg) = 0) override {}
- virtual int UnSchedule(void* /*tag*/, Priority /*pri*/) override { return 0; }
- virtual void StartThread(void (* /*function*/)(void* arg),
- void* /*arg*/) override {}
- virtual void WaitForJoin() override {}
- virtual unsigned int GetThreadPoolQueueLen(
- Priority /*pri*/ = LOW) const override {
- return 0;
- }
- virtual Status GetTestDirectory(std::string* /*path*/) override {
- return notsup;
- }
- virtual uint64_t NowMicros() override { return 0; }
- virtual void SleepForMicroseconds(int /*micros*/) override {}
- virtual Status GetHostName(char* /*name*/, uint64_t /*len*/) override {
- return notsup;
- }
- virtual Status GetCurrentTime(int64_t* /*unix_time*/) override {
- return notsup;
- }
- virtual Status GetAbsolutePath(const std::string& /*db_path*/,
- std::string* /*outputpath*/) override {
- return notsup;
- }
- virtual void SetBackgroundThreads(int /*number*/,
- Priority /*pri*/ = LOW) override {}
- virtual int GetBackgroundThreads(Priority /*pri*/ = LOW) override {
- return 0;
- }
- virtual void IncBackgroundThreadsIfNeeded(int /*number*/,
- Priority /*pri*/) override {}
- virtual std::string TimeToString(uint64_t /*number*/) override { return ""; }
- virtual uint64_t GetThreadID() const override {
- return 0;
- }
- };
- } // namespace ROCKSDB_NAMESPACE
- #endif // USE_HDFS
|