//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//

#include "rocksdb/env.h"
#include "hdfs/env_hdfs.h"

#ifdef USE_HDFS
#ifndef ROCKSDB_HDFS_FILE_C
#define ROCKSDB_HDFS_FILE_C

#include <stdio.h>
#include <sys/time.h>
#include <time.h>
#include <algorithm>
#include <iostream>
#include <sstream>
#include "logging/logging.h"
#include "rocksdb/status.h"
#include "util/string_util.h"

#define HDFS_EXISTS 0
#define HDFS_DOESNT_EXIST -1
#define HDFS_SUCCESS 0

//
// This file defines an HDFS environment for rocksdb. It uses the libhdfs
// api to access HDFS. All HDFS files created by one instance of rocksdb
// will reside on the same HDFS cluster.
//

namespace ROCKSDB_NAMESPACE {

namespace {

// Map an errno value to the most specific Status it corresponds to.
static Status IOError(const std::string& context, int err_number) {
  return (err_number == ENOSPC)
             ? Status::NoSpace(context, strerror(err_number))
             : (err_number == ENOENT)
                   ? Status::PathNotFound(context, strerror(err_number))
                   : Status::IOError(context, strerror(err_number));
}

// Assume that there is one global logger for now. It is not thread-safe,
// but need not be because the logger is initialized at db-open time.
static Logger* mylog = nullptr;
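// A minimal usage sketch of this Env (illustrative only; kept in a comment
// because it is not part of this translation unit). The namenode URI and DB
// path below are assumptions; NewHdfsEnv() and Options::env are the real
// integration points:
//
//   Env* hdfs_env = nullptr;
//   Status s = NewHdfsEnv(&hdfs_env, "hdfs://namenode:9000/");
//   if (s.ok()) {
//     Options options;                 // requires rocksdb/options.h
//     options.env = hdfs_env;
//     DB* db = nullptr;
//     s = DB::Open(options, "/rocksdb/testdb", &db);  // requires rocksdb/db.h
//   }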
// Used for reading a file from HDFS. It implements both sequential-read
// access methods as well as random read access methods.
class HdfsReadableFile : virtual public SequentialFile,
                         virtual public RandomAccessFile {
 private:
  hdfsFS fileSys_;
  std::string filename_;
  hdfsFile hfile_;

 public:
  HdfsReadableFile(hdfsFS fileSys, const std::string& fname)
      : fileSys_(fileSys), filename_(fname), hfile_(nullptr) {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile opening file %s\n",
                    filename_.c_str());
    hfile_ = hdfsOpenFile(fileSys_, filename_.c_str(), O_RDONLY, 0, 0, 0);
    ROCKS_LOG_DEBUG(mylog,
                    "[hdfs] HdfsReadableFile opened file %s hfile_=0x%p\n",
                    filename_.c_str(), hfile_);
  }

  virtual ~HdfsReadableFile() {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile closing file %s\n",
                    filename_.c_str());
    hdfsCloseFile(fileSys_, hfile_);
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile closed file %s\n",
                    filename_.c_str());
    hfile_ = nullptr;
  }

  bool isValid() { return hfile_ != nullptr; }

  // sequential access, read data at current offset in file
  virtual Status Read(size_t n, Slice* result, char* scratch) {
    Status s;
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile reading %s %ld\n",
                    filename_.c_str(), n);
    char* buffer = scratch;
    size_t total_bytes_read = 0;
    tSize bytes_read = 0;
    tSize remaining_bytes = (tSize)n;

    // Read a total of n bytes repeatedly until we hit error or eof
    while (remaining_bytes > 0) {
      bytes_read = hdfsRead(fileSys_, hfile_, buffer, remaining_bytes);
      if (bytes_read <= 0) {
        break;
      }
      assert(bytes_read <= remaining_bytes);

      total_bytes_read += bytes_read;
      remaining_bytes -= bytes_read;
      buffer += bytes_read;
    }
    assert(total_bytes_read <= n);

    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile read %s\n",
                    filename_.c_str());

    if (bytes_read < 0) {
      s = IOError(filename_, errno);
    } else {
      *result = Slice(scratch, total_bytes_read);
    }

    return s;
  }

  // random access, read data from specified offset in file
  virtual Status Read(uint64_t offset, size_t n, Slice* result,
                      char* scratch) const {
    Status s;
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile preading %s\n",
                    filename_.c_str());
    ssize_t bytes_read =
        hdfsPread(fileSys_, hfile_, offset, (void*)scratch, (tSize)n);
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile pread %s\n",
                    filename_.c_str());
    *result = Slice(scratch, (bytes_read < 0) ? 0 : bytes_read);
    if (bytes_read < 0) {
      // An error: return a non-ok status
      s = IOError(filename_, errno);
    }
    return s;
  }

  virtual Status Skip(uint64_t n) {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile skip %s\n",
                    filename_.c_str());
    // get current offset from file
    tOffset current = hdfsTell(fileSys_, hfile_);
    if (current < 0) {
      return IOError(filename_, errno);
    }
    // seek to new offset in file
    tOffset newoffset = current + n;
    int val = hdfsSeek(fileSys_, hfile_, newoffset);
    if (val < 0) {
      return IOError(filename_, errno);
    }
    return Status::OK();
  }

 private:
  // returns true if we are at the end of file, false otherwise
  bool feof() {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile feof %s\n",
                    filename_.c_str());
    return hdfsTell(fileSys_, hfile_) == fileSize();
  }

  // the current size of the file
  tOffset fileSize() {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile fileSize %s\n",
                    filename_.c_str());
    hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, filename_.c_str());
    tOffset size = 0L;
    if (pFileInfo != nullptr) {
      size = pFileInfo->mSize;
      hdfsFreeFileInfo(pFileInfo, 1);
    } else {
      throw HdfsFatalException("fileSize on unknown file " + filename_);
    }
    return size;
  }
};
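// Illustrative sketch of how the two read paths above are reached through
// the public Env interface (hypothetical path and sizes; "env" is assumed
// to be an HdfsEnv created via NewHdfsEnv):
//
//   std::unique_ptr<SequentialFile> seq;
//   env->NewSequentialFile("/dir/file", &seq, EnvOptions());
//   char scratch[4096];
//   Slice result;
//   seq->Read(sizeof(scratch), &result, scratch);   // sequential Read()
//
//   std::unique_ptr<RandomAccessFile> ra;
//   env->NewRandomAccessFile("/dir/file", &ra, EnvOptions());
//   ra->Read(/*offset=*/1024, sizeof(scratch), &result, scratch);  // hdfsPread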
// Appends to an existing file in HDFS.
class HdfsWritableFile : public WritableFile {
 private:
  hdfsFS fileSys_;
  std::string filename_;
  hdfsFile hfile_;

 public:
  HdfsWritableFile(hdfsFS fileSys, const std::string& fname,
                   const EnvOptions& options)
      : WritableFile(options),
        fileSys_(fileSys),
        filename_(fname),
        hfile_(nullptr) {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile opening %s\n",
                    filename_.c_str());
    hfile_ = hdfsOpenFile(fileSys_, filename_.c_str(), O_WRONLY, 0, 0, 0);
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile opened %s\n",
                    filename_.c_str());
    assert(hfile_ != nullptr);
  }

  virtual ~HdfsWritableFile() {
    if (hfile_ != nullptr) {
      ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile closing %s\n",
                      filename_.c_str());
      hdfsCloseFile(fileSys_, hfile_);
      ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile closed %s\n",
                      filename_.c_str());
      hfile_ = nullptr;
    }
  }

  // If the file was successfully created, then this returns true.
  // Otherwise returns false.
  bool isValid() { return hfile_ != nullptr; }

  // The name of the file, mostly needed for debug logging.
  const std::string& getName() { return filename_; }

  virtual Status Append(const Slice& data) {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Append %s\n",
                    filename_.c_str());
    const char* src = data.data();
    size_t left = data.size();
    // hdfsWrite returns a signed tSize (-1 on error), so keep it signed
    // instead of assigning it to a size_t.
    tSize ret = hdfsWrite(fileSys_, hfile_, src, static_cast<tSize>(left));
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Appended %s\n",
                    filename_.c_str());
    if (ret < 0 || static_cast<size_t>(ret) != left) {
      return IOError(filename_, errno);
    }
    return Status::OK();
  }

  virtual Status Flush() { return Status::OK(); }

  virtual Status Sync() {
    Status s;
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Sync %s\n",
                    filename_.c_str());
    if (hdfsFlush(fileSys_, hfile_) == -1) {
      return IOError(filename_, errno);
    }
    if (hdfsHSync(fileSys_, hfile_) == -1) {
      return IOError(filename_, errno);
    }
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Synced %s\n",
                    filename_.c_str());
    return Status::OK();
  }

  // This is used by HdfsLogger to write data to the debug log file
  virtual Status Append(const char* src, size_t size) {
    if (hdfsWrite(fileSys_, hfile_, src, static_cast<tSize>(size)) !=
        static_cast<tSize>(size)) {
      return IOError(filename_, errno);
    }
    return Status::OK();
  }

  virtual Status Close() {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile closing %s\n",
                    filename_.c_str());
    if (hdfsCloseFile(fileSys_, hfile_) != 0) {
      return IOError(filename_, errno);
    }
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile closed %s\n",
                    filename_.c_str());
    hfile_ = nullptr;
    return Status::OK();
  }
};
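// Illustrative sketch of the write path above (hypothetical path; "env" is
// assumed to be an HdfsEnv). Append() buffers inside libhdfs, so durability
// requires the explicit Sync(), which maps to hdfsFlush + hdfsHSync:
//
//   std::unique_ptr<WritableFile> wf;
//   env->NewWritableFile("/dir/file", &wf, EnvOptions());
//   wf->Append(Slice("some data"));
//   wf->Sync();
//   wf->Close();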
// The object that implements the debug logs to reside in HDFS.
class HdfsLogger : public Logger {
 private:
  HdfsWritableFile* file_;
  uint64_t (*gettid_)();  // Return the thread id for the current thread

  Status HdfsCloseHelper() {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsLogger closed %s\n",
                    file_->getName().c_str());
    if (mylog != nullptr && mylog == this) {
      mylog = nullptr;
    }
    return Status::OK();
  }

 protected:
  virtual Status CloseImpl() override { return HdfsCloseHelper(); }

 public:
  HdfsLogger(HdfsWritableFile* f, uint64_t (*gettid)())
      : file_(f), gettid_(gettid) {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsLogger opened %s\n",
                    file_->getName().c_str());
  }

  ~HdfsLogger() override {
    if (!closed_) {
      closed_ = true;
      HdfsCloseHelper();
    }
  }

  using Logger::Logv;
  void Logv(const char* format, va_list ap) override {
    const uint64_t thread_id = (*gettid_)();

    // We try twice: the first time with a fixed-size stack allocated buffer,
    // and the second time with a much larger dynamically allocated buffer.
    char buffer[500];
    for (int iter = 0; iter < 2; iter++) {
      char* base;
      int bufsize;
      if (iter == 0) {
        bufsize = sizeof(buffer);
        base = buffer;
      } else {
        bufsize = 30000;
        base = new char[bufsize];
      }
      char* p = base;
      char* limit = base + bufsize;

      struct timeval now_tv;
      gettimeofday(&now_tv, nullptr);
      const time_t seconds = now_tv.tv_sec;
      struct tm t;
      localtime_r(&seconds, &t);
      p += snprintf(p, limit - p, "%04d/%02d/%02d-%02d:%02d:%02d.%06d %llx ",
                    t.tm_year + 1900, t.tm_mon + 1, t.tm_mday, t.tm_hour,
                    t.tm_min, t.tm_sec, static_cast<int>(now_tv.tv_usec),
                    static_cast<long long unsigned int>(thread_id));

      // Print the message
      if (p < limit) {
        va_list backup_ap;
        va_copy(backup_ap, ap);
        p += vsnprintf(p, limit - p, format, backup_ap);
        va_end(backup_ap);
      }

      // Truncate to available space if necessary
      if (p >= limit) {
        if (iter == 0) {
          continue;  // Try again with larger buffer
        } else {
          p = limit - 1;
        }
      }

      // Add newline if necessary
      if (p == base || p[-1] != '\n') {
        *p++ = '\n';
      }

      assert(p <= limit);
      file_->Append(base, p - base);
      file_->Flush();
      if (base != buffer) {
        delete[] base;
      }
      break;
    }
  }
};

}  // namespace
// Finally, the hdfs environment

const std::string HdfsEnv::kProto = "hdfs://";
const std::string HdfsEnv::pathsep = "/";

// open a file for sequential reading
Status HdfsEnv::NewSequentialFile(const std::string& fname,
                                  std::unique_ptr<SequentialFile>* result,
                                  const EnvOptions& /*options*/) {
  result->reset();
  HdfsReadableFile* f = new HdfsReadableFile(fileSys_, fname);
  if (f == nullptr || !f->isValid()) {
    delete f;
    *result = nullptr;
    return IOError(fname, errno);
  }
  result->reset(dynamic_cast<SequentialFile*>(f));
  return Status::OK();
}

// open a file for random reading
Status HdfsEnv::NewRandomAccessFile(const std::string& fname,
                                    std::unique_ptr<RandomAccessFile>* result,
                                    const EnvOptions& /*options*/) {
  result->reset();
  HdfsReadableFile* f = new HdfsReadableFile(fileSys_, fname);
  if (f == nullptr || !f->isValid()) {
    delete f;
    *result = nullptr;
    return IOError(fname, errno);
  }
  result->reset(dynamic_cast<RandomAccessFile*>(f));
  return Status::OK();
}

// create a new file for writing
Status HdfsEnv::NewWritableFile(const std::string& fname,
                                std::unique_ptr<WritableFile>* result,
                                const EnvOptions& options) {
  result->reset();
  Status s;
  HdfsWritableFile* f = new HdfsWritableFile(fileSys_, fname, options);
  if (f == nullptr || !f->isValid()) {
    delete f;
    *result = nullptr;
    return IOError(fname, errno);
  }
  result->reset(dynamic_cast<WritableFile*>(f));
  return Status::OK();
}

class HdfsDirectory : public Directory {
 public:
  explicit HdfsDirectory(int fd) : fd_(fd) {}
  ~HdfsDirectory() {}

  Status Fsync() override { return Status::OK(); }

  int GetFd() const { return fd_; }

 private:
  int fd_;
};

Status HdfsEnv::NewDirectory(const std::string& name,
                             std::unique_ptr<Directory>* result) {
  int value = hdfsExists(fileSys_, name.c_str());
  switch (value) {
    case HDFS_EXISTS:
      result->reset(new HdfsDirectory(0));
      return Status::OK();
    default:  // fail if the directory doesn't exist
      ROCKS_LOG_FATAL(mylog, "NewDirectory hdfsExists call failed");
      throw HdfsFatalException("hdfsExists call failed with error " +
                               ToString(value) + " on path " + name + ".\n");
  }
}

Status HdfsEnv::FileExists(const std::string& fname) {
  int value = hdfsExists(fileSys_, fname.c_str());
  switch (value) {
    case HDFS_EXISTS:
      return Status::OK();
    case HDFS_DOESNT_EXIST:
      return Status::NotFound();
    default:  // anything else should be an error
      ROCKS_LOG_FATAL(mylog, "FileExists hdfsExists call failed");
      return Status::IOError("hdfsExists call failed with error " +
                             ToString(value) + " on path " + fname + ".\n");
  }
}
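// Illustrative: how a caller is expected to distinguish the three outcomes
// of FileExists() above (hypothetical path):
//
//   Status s = env->FileExists("/dir/file");
//   if (s.ok()) {                 // HDFS_EXISTS
//     ...
//   } else if (s.IsNotFound()) {  // HDFS_DOESNT_EXIST
//     ...
//   } else {                      // unexpected hdfsExists error -> IOError
//     ...
//   }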
Status HdfsEnv::GetChildren(const std::string& path,
                            std::vector<std::string>* result) {
  int value = hdfsExists(fileSys_, path.c_str());
  switch (value) {
    case HDFS_EXISTS: {  // directory exists
      int numEntries = 0;
      hdfsFileInfo* pHdfsFileInfo = nullptr;
      pHdfsFileInfo = hdfsListDirectory(fileSys_, path.c_str(), &numEntries);
      if (numEntries >= 0) {
        for (int i = 0; i < numEntries; i++) {
          // hdfsListDirectory returns full paths; keep only the basename.
          std::string pathname(pHdfsFileInfo[i].mName);
          size_t pos = pathname.rfind("/");
          if (std::string::npos != pos) {
            result->push_back(pathname.substr(pos + 1));
          }
        }
        if (pHdfsFileInfo != nullptr) {
          hdfsFreeFileInfo(pHdfsFileInfo, numEntries);
        }
      } else {
        // numEntries < 0 indicates error
        ROCKS_LOG_FATAL(mylog, "hdfsListDirectory call failed with error ");
        throw HdfsFatalException(
            "hdfsListDirectory call failed with a negative error code.\n");
      }
      break;
    }
    case HDFS_DOESNT_EXIST:  // directory does not exist, exit
      return Status::NotFound();
    default:  // anything else should be an error
      ROCKS_LOG_FATAL(mylog, "GetChildren hdfsExists call failed");
      throw HdfsFatalException("hdfsExists call failed with error " +
                               ToString(value) + ".\n");
  }
  return Status::OK();
}

Status HdfsEnv::DeleteFile(const std::string& fname) {
  if (hdfsDelete(fileSys_, fname.c_str(), 1) == 0) {
    return Status::OK();
  }
  return IOError(fname, errno);
}

Status HdfsEnv::CreateDir(const std::string& name) {
  if (hdfsCreateDirectory(fileSys_, name.c_str()) == 0) {
    return Status::OK();
  }
  return IOError(name, errno);
}
Status HdfsEnv::CreateDirIfMissing(const std::string& name) {
  const int value = hdfsExists(fileSys_, name.c_str());
  // Not atomic. The state might change between hdfsExists and CreateDir.
  switch (value) {
    case HDFS_EXISTS:
      return Status::OK();
    case HDFS_DOESNT_EXIST:
      return CreateDir(name);
    default:  // anything else should be an error
      ROCKS_LOG_FATAL(mylog, "CreateDirIfMissing hdfsExists call failed");
      throw HdfsFatalException("hdfsExists call failed with error " +
                               ToString(value) + ".\n");
  }
}

Status HdfsEnv::DeleteDir(const std::string& name) { return DeleteFile(name); }

Status HdfsEnv::GetFileSize(const std::string& fname, uint64_t* size) {
  *size = 0L;
  hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, fname.c_str());
  if (pFileInfo != nullptr) {
    *size = pFileInfo->mSize;
    hdfsFreeFileInfo(pFileInfo, 1);
    return Status::OK();
  }
  return IOError(fname, errno);
}

Status HdfsEnv::GetFileModificationTime(const std::string& fname,
                                        uint64_t* time) {
  hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, fname.c_str());
  if (pFileInfo != nullptr) {
    *time = static_cast<uint64_t>(pFileInfo->mLastMod);
    hdfsFreeFileInfo(pFileInfo, 1);
    return Status::OK();
  }
  return IOError(fname, errno);
}

// The rename is not atomic. HDFS does not allow a renaming if the
// target already exists. So, we delete the target before attempting the
// rename.
Status HdfsEnv::RenameFile(const std::string& src, const std::string& target) {
  hdfsDelete(fileSys_, target.c_str(), 1);
  if (hdfsRename(fileSys_, src.c_str(), target.c_str()) == 0) {
    return Status::OK();
  }
  return IOError(src, errno);
}

Status HdfsEnv::LockFile(const std::string& /*fname*/, FileLock** lock) {
  // there isn't a very good way to atomically check and create
  // a file via libhdfs
  *lock = nullptr;
  return Status::OK();
}

Status HdfsEnv::UnlockFile(FileLock* /*lock*/) { return Status::OK(); }

Status HdfsEnv::NewLogger(const std::string& fname,
                          std::shared_ptr<Logger>* result) {
  // EnvOptions is used exclusively for its `strict_bytes_per_sync` value. That
  // option is only intended for WAL/flush/compaction writes, so turn it off in
  // the logger.
  EnvOptions options;
  options.strict_bytes_per_sync = false;
  HdfsWritableFile* f = new HdfsWritableFile(fileSys_, fname, options);
  if (f == nullptr || !f->isValid()) {
    delete f;
    *result = nullptr;
    return IOError(fname, errno);
  }
  HdfsLogger* h = new HdfsLogger(f, &HdfsEnv::gettid);
  result->reset(h);
  if (mylog == nullptr) {
    // mylog = h; // uncomment this for detailed logging
  }
  return Status::OK();
}

// The factory method for creating an HDFS Env
Status NewHdfsEnv(Env** hdfs_env, const std::string& fsname) {
  *hdfs_env = new HdfsEnv(fsname);
  return Status::OK();
}

}  // namespace ROCKSDB_NAMESPACE

#endif  // ROCKSDB_HDFS_FILE_C

#else  // USE_HDFS

// dummy placeholders used when HDFS is not available
namespace ROCKSDB_NAMESPACE {

Status HdfsEnv::NewSequentialFile(const std::string& /*fname*/,
                                  std::unique_ptr<SequentialFile>* /*result*/,
                                  const EnvOptions& /*options*/) {
  return Status::NotSupported("Not compiled with hdfs support");
}

Status NewHdfsEnv(Env** /*hdfs_env*/, const std::string& /*fsname*/) {
  return Status::NotSupported("Not compiled with hdfs support");
}

}  // namespace ROCKSDB_NAMESPACE

#endif