- // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
- // This source code is licensed under both the GPLv2 (found in the
- // COPYING file in the root directory) and Apache 2.0 License
- // (found in the LICENSE.Apache file in the root directory).
- //
- #include "rocksdb/env.h"
- #include "hdfs/env_hdfs.h"
- #ifdef USE_HDFS
- #ifndef ROCKSDB_HDFS_FILE_C
- #define ROCKSDB_HDFS_FILE_C
- #include <stdio.h>
- #include <sys/time.h>
- #include <time.h>
- #include <algorithm>
- #include <iostream>
- #include <sstream>
- #include "logging/logging.h"
- #include "rocksdb/status.h"
- #include "util/string_util.h"
- #define HDFS_EXISTS 0
- #define HDFS_DOESNT_EXIST -1
- #define HDFS_SUCCESS 0
- //
- // This file defines an HDFS environment for rocksdb. It uses the libhdfs
- // API to access HDFS. All HDFS files created by one instance of rocksdb
- // will reside on the same HDFS cluster.
- //
- namespace ROCKSDB_NAMESPACE {
- namespace {
- // Map an errno value onto a rocksdb Status
- static Status IOError(const std::string& context, int err_number) {
- return (err_number == ENOSPC)
- ? Status::NoSpace(context, strerror(err_number))
- : (err_number == ENOENT)
- ? Status::PathNotFound(context, strerror(err_number))
- : Status::IOError(context, strerror(err_number));
- }
- // Assume that there is one global logger for now. It is not thread-safe,
- // but it need not be because the logger is initialized at db-open time.
- static Logger* mylog = nullptr;
- // Used for reading a file from HDFS. It implements both the sequential
- // and the random read access methods.
- class HdfsReadableFile : virtual public SequentialFile,
- virtual public RandomAccessFile {
- private:
- hdfsFS fileSys_;
- std::string filename_;
- hdfsFile hfile_;
- public:
- HdfsReadableFile(hdfsFS fileSys, const std::string& fname)
- : fileSys_(fileSys), filename_(fname), hfile_(nullptr) {
- ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile opening file %s\n",
- filename_.c_str());
- hfile_ = hdfsOpenFile(fileSys_, filename_.c_str(), O_RDONLY, 0, 0, 0);
- ROCKS_LOG_DEBUG(mylog,
- "[hdfs] HdfsReadableFile opened file %s hfile_=0x%p\n",
- filename_.c_str(), hfile_);
- }
- virtual ~HdfsReadableFile() {
- ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile closing file %s\n",
- filename_.c_str());
- hdfsCloseFile(fileSys_, hfile_);
- ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile closed file %s\n",
- filename_.c_str());
- hfile_ = nullptr;
- }
- bool isValid() {
- return hfile_ != nullptr;
- }
- // sequential access, read data at current offset in file
- virtual Status Read(size_t n, Slice* result, char* scratch) {
- Status s;
- ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile reading %s %ld\n",
- filename_.c_str(), n);
- char* buffer = scratch;
- size_t total_bytes_read = 0;
- tSize bytes_read = 0;
- tSize remaining_bytes = (tSize)n;
- // Read a total of n bytes repeatedly until we hit error or eof
- while (remaining_bytes > 0) {
- bytes_read = hdfsRead(fileSys_, hfile_, buffer, remaining_bytes);
- if (bytes_read <= 0) {
- break;
- }
- assert(bytes_read <= remaining_bytes);
- total_bytes_read += bytes_read;
- remaining_bytes -= bytes_read;
- buffer += bytes_read;
- }
- assert(total_bytes_read <= n);
- ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile read %s\n",
- filename_.c_str());
- if (bytes_read < 0) {
- s = IOError(filename_, errno);
- } else {
- *result = Slice(scratch, total_bytes_read);
- }
- return s;
- }
- // random access, read data from specified offset in file
- virtual Status Read(uint64_t offset, size_t n, Slice* result,
- char* scratch) const {
- Status s;
- ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile preading %s\n",
- filename_.c_str());
- ssize_t bytes_read = hdfsPread(fileSys_, hfile_, offset,
- (void*)scratch, (tSize)n);
- ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile pread %s\n",
- filename_.c_str());
- *result = Slice(scratch, (bytes_read < 0) ? 0 : bytes_read);
- if (bytes_read < 0) {
- // An error: return a non-ok status
- s = IOError(filename_, errno);
- }
- return s;
- }
- virtual Status Skip(uint64_t n) {
- ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile skip %s\n",
- filename_.c_str());
- // get current offset from file
- tOffset current = hdfsTell(fileSys_, hfile_);
- if (current < 0) {
- return IOError(filename_, errno);
- }
- // seek to new offset in file
- tOffset newoffset = current + n;
- int val = hdfsSeek(fileSys_, hfile_, newoffset);
- if (val < 0) {
- return IOError(filename_, errno);
- }
- return Status::OK();
- }
- private:
- // returns true if we are at the end of file, false otherwise
- bool feof() {
- ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile feof %s\n",
- filename_.c_str());
- if (hdfsTell(fileSys_, hfile_) == fileSize()) {
- return true;
- }
- return false;
- }
- // the current size of the file
- tOffset fileSize() {
- ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile fileSize %s\n",
- filename_.c_str());
- hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, filename_.c_str());
- tOffset size = 0L;
- if (pFileInfo != nullptr) {
- size = pFileInfo->mSize;
- hdfsFreeFileInfo(pFileInfo, 1);
- } else {
- throw HdfsFatalException("fileSize on unknown file " + filename_);
- }
- return size;
- }
- };
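- // Usage sketch (illustrative only; `env`, `path`, `kBufSize`, and
- // `Consume` are assumed names): reading a file sequentially through the
- // interface above. Read returns OK with an empty result at eof.
- //
- //   std::unique_ptr<SequentialFile> file;
- //   Status s = env->NewSequentialFile(path, &file, EnvOptions());
- //   char scratch[kBufSize];
- //   Slice chunk;
- //   while (s.ok()) {
- //     s = file->Read(kBufSize, &chunk, scratch);
- //     if (!s.ok() || chunk.empty()) break;  // error or eof
- //     Consume(chunk);  // hypothetical consumer of the bytes read
- //   }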
- // Appends to an existing file in HDFS.
- class HdfsWritableFile: public WritableFile {
- private:
- hdfsFS fileSys_;
- std::string filename_;
- hdfsFile hfile_;
- public:
- HdfsWritableFile(hdfsFS fileSys, const std::string& fname,
- const EnvOptions& options)
- : WritableFile(options),
- fileSys_(fileSys),
- filename_(fname),
- hfile_(nullptr) {
- ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile opening %s\n",
- filename_.c_str());
- hfile_ = hdfsOpenFile(fileSys_, filename_.c_str(), O_WRONLY, 0, 0, 0);
- ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile opened %s\n",
- filename_.c_str());
- assert(hfile_ != nullptr);
- }
- virtual ~HdfsWritableFile() {
- if (hfile_ != nullptr) {
- ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile closing %s\n",
- filename_.c_str());
- hdfsCloseFile(fileSys_, hfile_);
- ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile closed %s\n",
- filename_.c_str());
- hfile_ = nullptr;
- }
- }
- // Returns true if the file was successfully created, false otherwise.
- bool isValid() {
- return hfile_ != nullptr;
- }
- // The name of the file, mostly needed for debug logging.
- const std::string& getName() {
- return filename_;
- }
- virtual Status Append(const Slice& data) {
- ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Append %s\n",
- filename_.c_str());
- const char* src = data.data();
- size_t left = data.size();
- tSize ret = hdfsWrite(fileSys_, hfile_, src, static_cast<tSize>(left));
- ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Appended %s\n",
- filename_.c_str());
- if (ret != static_cast<tSize>(left)) {
- return IOError(filename_, errno);
- }
- return Status::OK();
- }
- virtual Status Flush() {
- return Status::OK();
- }
- virtual Status Sync() {
- Status s;
- ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Sync %s\n",
- filename_.c_str());
- if (hdfsFlush(fileSys_, hfile_) == -1) {
- return IOError(filename_, errno);
- }
- if (hdfsHSync(fileSys_, hfile_) == -1) {
- return IOError(filename_, errno);
- }
- ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Synced %s\n",
- filename_.c_str());
- return Status::OK();
- }
- // This is used by HdfsLogger to write data to the debug log file
- virtual Status Append(const char* src, size_t size) {
- if (hdfsWrite(fileSys_, hfile_, src, static_cast<tSize>(size)) !=
- static_cast<tSize>(size)) {
- return IOError(filename_, errno);
- }
- return Status::OK();
- }
- virtual Status Close() {
- ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile closing %s\n",
- filename_.c_str());
- if (hdfsCloseFile(fileSys_, hfile_) != 0) {
- return IOError(filename_, errno);
- }
- ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile closed %s\n",
- filename_.c_str());
- hfile_ = nullptr;
- return Status::OK();
- }
- };
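- // Usage sketch (illustrative only; `env` and the file name are assumed):
- // the typical write path against the interface above. Append writes the
- // bytes, Sync forces hdfsFlush/hdfsHSync, and Close releases the handle.
- //
- //   std::unique_ptr<WritableFile> file;
- //   Status s = env->NewWritableFile("/a/file", &file, EnvOptions());
- //   if (s.ok()) s = file->Append(Slice("some record"));
- //   if (s.ok()) s = file->Sync();
- //   if (s.ok()) s = file->Close();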
- // Logger implementation that writes the debug logs to HDFS.
- class HdfsLogger : public Logger {
- private:
- HdfsWritableFile* file_;
- uint64_t (*gettid_)(); // Return the thread id for the current thread
- Status HdfsCloseHelper() {
- ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsLogger closed %s\n",
- file_->getName().c_str());
- if (mylog != nullptr && mylog == this) {
- mylog = nullptr;
- }
- return Status::OK();
- }
- protected:
- virtual Status CloseImpl() override { return HdfsCloseHelper(); }
- public:
- HdfsLogger(HdfsWritableFile* f, uint64_t (*gettid)())
- : file_(f), gettid_(gettid) {
- ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsLogger opened %s\n",
- file_->getName().c_str());
- }
- ~HdfsLogger() override {
- if (!closed_) {
- closed_ = true;
- HdfsCloseHelper();
- }
- }
- using Logger::Logv;
- void Logv(const char* format, va_list ap) override {
- const uint64_t thread_id = (*gettid_)();
- // We try twice: the first time with a fixed-size stack allocated buffer,
- // and the second time with a much larger dynamically allocated buffer.
- char buffer[500];
- for (int iter = 0; iter < 2; iter++) {
- char* base;
- int bufsize;
- if (iter == 0) {
- bufsize = sizeof(buffer);
- base = buffer;
- } else {
- bufsize = 30000;
- base = new char[bufsize];
- }
- char* p = base;
- char* limit = base + bufsize;
- struct timeval now_tv;
- gettimeofday(&now_tv, nullptr);
- const time_t seconds = now_tv.tv_sec;
- struct tm t;
- localtime_r(&seconds, &t);
- p += snprintf(p, limit - p,
- "%04d/%02d/%02d-%02d:%02d:%02d.%06d %llx ",
- t.tm_year + 1900,
- t.tm_mon + 1,
- t.tm_mday,
- t.tm_hour,
- t.tm_min,
- t.tm_sec,
- static_cast<int>(now_tv.tv_usec),
- static_cast<long long unsigned int>(thread_id));
- // Print the message
- if (p < limit) {
- va_list backup_ap;
- va_copy(backup_ap, ap);
- p += vsnprintf(p, limit - p, format, backup_ap);
- va_end(backup_ap);
- }
- // Truncate to available space if necessary
- if (p >= limit) {
- if (iter == 0) {
- continue; // Try again with larger buffer
- } else {
- p = limit - 1;
- }
- }
- // Add newline if necessary
- if (p == base || p[-1] != '\n') {
- *p++ = '\n';
- }
- assert(p <= limit);
- file_->Append(base, p-base);
- file_->Flush();
- if (base != buffer) {
- delete[] base;
- }
- break;
- }
- }
- };
- } // namespace
- // Finally, the hdfs environment
- const std::string HdfsEnv::kProto = "hdfs://";
- const std::string HdfsEnv::pathsep = "/";
- // open a file for sequential reading
- Status HdfsEnv::NewSequentialFile(const std::string& fname,
- std::unique_ptr<SequentialFile>* result,
- const EnvOptions& /*options*/) {
- result->reset();
- HdfsReadableFile* f = new HdfsReadableFile(fileSys_, fname);
- if (f == nullptr || !f->isValid()) {
- delete f;
- *result = nullptr;
- return IOError(fname, errno);
- }
- result->reset(dynamic_cast<SequentialFile*>(f));
- return Status::OK();
- }
- // open a file for random reading
- Status HdfsEnv::NewRandomAccessFile(const std::string& fname,
- std::unique_ptr<RandomAccessFile>* result,
- const EnvOptions& /*options*/) {
- result->reset();
- HdfsReadableFile* f = new HdfsReadableFile(fileSys_, fname);
- if (f == nullptr || !f->isValid()) {
- delete f;
- *result = nullptr;
- return IOError(fname, errno);
- }
- result->reset(dynamic_cast<RandomAccessFile*>(f));
- return Status::OK();
- }
- // create a new file for writing
- Status HdfsEnv::NewWritableFile(const std::string& fname,
- std::unique_ptr<WritableFile>* result,
- const EnvOptions& options) {
- result->reset();
- Status s;
- HdfsWritableFile* f = new HdfsWritableFile(fileSys_, fname, options);
- if (f == nullptr || !f->isValid()) {
- delete f;
- *result = nullptr;
- return IOError(fname, errno);
- }
- result->reset(dynamic_cast<WritableFile*>(f));
- return Status::OK();
- }
- class HdfsDirectory : public Directory {
- public:
- explicit HdfsDirectory(int fd) : fd_(fd) {}
- ~HdfsDirectory() {}
- Status Fsync() override { return Status::OK(); }
- int GetFd() const { return fd_; }
- private:
- int fd_;
- };
- Status HdfsEnv::NewDirectory(const std::string& name,
- std::unique_ptr<Directory>* result) {
- int value = hdfsExists(fileSys_, name.c_str());
- switch (value) {
- case HDFS_EXISTS:
- result->reset(new HdfsDirectory(0));
- return Status::OK();
- default: // fail if the directory doesn't exist
- ROCKS_LOG_FATAL(mylog, "NewDirectory hdfsExists call failed");
- throw HdfsFatalException("hdfsExists call failed with error " +
- ToString(value) + " on path " + name +
- ".\n");
- }
- }
- Status HdfsEnv::FileExists(const std::string& fname) {
- int value = hdfsExists(fileSys_, fname.c_str());
- switch (value) {
- case HDFS_EXISTS:
- return Status::OK();
- case HDFS_DOESNT_EXIST:
- return Status::NotFound();
- default: // anything else should be an error
- ROCKS_LOG_FATAL(mylog, "FileExists hdfsExists call failed");
- return Status::IOError("hdfsExists call failed with error " +
- ToString(value) + " on path " + fname + ".\n");
- }
- }
- Status HdfsEnv::GetChildren(const std::string& path,
- std::vector<std::string>* result) {
- int value = hdfsExists(fileSys_, path.c_str());
- switch (value) {
- case HDFS_EXISTS: { // directory exists
- int numEntries = 0;
- hdfsFileInfo* pHdfsFileInfo =
- hdfsListDirectory(fileSys_, path.c_str(), &numEntries);
- if (numEntries >= 0) {
- for (int i = 0; i < numEntries; i++) {
- std::string pathname(pHdfsFileInfo[i].mName);
- size_t pos = pathname.rfind("/");
- if (std::string::npos != pos) {
- result->push_back(pathname.substr(pos + 1));
- }
- }
- if (pHdfsFileInfo != nullptr) {
- hdfsFreeFileInfo(pHdfsFileInfo, numEntries);
- }
- } else {
- // numEntries < 0 indicates error
- ROCKS_LOG_FATAL(mylog, "hdfsListDirectory call failed with error ");
- throw HdfsFatalException(
- "hdfsListDirectory call failed negative error.\n");
- }
- break;
- }
- case HDFS_DOESNT_EXIST: // directory does not exist, exit
- return Status::NotFound();
- default: // anything else should be an error
- ROCKS_LOG_FATAL(mylog, "GetChildren hdfsExists call failed");
- throw HdfsFatalException("hdfsExists call failed with error " +
- ToString(value) + ".\n");
- }
- return Status::OK();
- }
- Status HdfsEnv::DeleteFile(const std::string& fname) {
- if (hdfsDelete(fileSys_, fname.c_str(), 1) == 0) {
- return Status::OK();
- }
- return IOError(fname, errno);
- }
- Status HdfsEnv::CreateDir(const std::string& name) {
- if (hdfsCreateDirectory(fileSys_, name.c_str()) == 0) {
- return Status::OK();
- }
- return IOError(name, errno);
- }
- Status HdfsEnv::CreateDirIfMissing(const std::string& name) {
- const int value = hdfsExists(fileSys_, name.c_str());
- // Not atomic. State might change between hdfsExists and CreateDir.
- switch (value) {
- case HDFS_EXISTS:
- return Status::OK();
- case HDFS_DOESNT_EXIST:
- return CreateDir(name);
- default: // anything else should be an error
- ROCKS_LOG_FATAL(mylog, "CreateDirIfMissing hdfsExists call failed");
- throw HdfsFatalException("hdfsExists call failed with error " +
- ToString(value) + ".\n");
- }
- }
- Status HdfsEnv::DeleteDir(const std::string& name) {
- return DeleteFile(name);
- }
- Status HdfsEnv::GetFileSize(const std::string& fname, uint64_t* size) {
- *size = 0L;
- hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, fname.c_str());
- if (pFileInfo != nullptr) {
- *size = pFileInfo->mSize;
- hdfsFreeFileInfo(pFileInfo, 1);
- return Status::OK();
- }
- return IOError(fname, errno);
- }
- Status HdfsEnv::GetFileModificationTime(const std::string& fname,
- uint64_t* time) {
- hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, fname.c_str());
- if (pFileInfo != nullptr) {
- *time = static_cast<uint64_t>(pFileInfo->mLastMod);
- hdfsFreeFileInfo(pFileInfo, 1);
- return Status::OK();
- }
- return IOError(fname, errno);
- }
- // The rename is not atomic. HDFS does not allow a rename if the
- // target already exists. So, we delete the target before attempting the
- // rename.
- Status HdfsEnv::RenameFile(const std::string& src, const std::string& target) {
- hdfsDelete(fileSys_, target.c_str(), 1);
- if (hdfsRename(fileSys_, src.c_str(), target.c_str()) == 0) {
- return Status::OK();
- }
- return IOError(src, errno);
- }
- Status HdfsEnv::LockFile(const std::string& /*fname*/, FileLock** lock) {
- // There isn't a very good way to atomically check and create
- // a file via libhdfs.
- *lock = nullptr;
- return Status::OK();
- }
- Status HdfsEnv::UnlockFile(FileLock* /*lock*/) { return Status::OK(); }
- Status HdfsEnv::NewLogger(const std::string& fname,
- std::shared_ptr<Logger>* result) {
- // EnvOptions is used exclusively for its `strict_bytes_per_sync` value. That
- // option is only intended for WAL/flush/compaction writes, so turn it off in
- // the logger.
- EnvOptions options;
- options.strict_bytes_per_sync = false;
- HdfsWritableFile* f = new HdfsWritableFile(fileSys_, fname, options);
- if (f == nullptr || !f->isValid()) {
- delete f;
- *result = nullptr;
- return IOError(fname, errno);
- }
- HdfsLogger* h = new HdfsLogger(f, &HdfsEnv::gettid);
- result->reset(h);
- if (mylog == nullptr) {
- // mylog = h; // uncomment this for detailed logging
- }
- return Status::OK();
- }
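- // Usage sketch (illustrative only; the log path is assumed): obtaining a
- // Logger backed by HdfsWritableFile and writing to it through the
- // ROCKS_LOG_* macros from logging/logging.h.
- //
- //   std::shared_ptr<Logger> info_log;
- //   Status s = hdfs_env->NewLogger("/a/LOG", &info_log);
- //   if (s.ok()) {
- //     ROCKS_LOG_INFO(info_log.get(), "hdfs env logger is ready");
- //   }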
- // The factory method for creating an HDFS Env
- Status NewHdfsEnv(Env** hdfs_env, const std::string& fsname) {
- *hdfs_env = new HdfsEnv(fsname);
- return Status::OK();
- }
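- // Usage sketch (illustrative only; the cluster address and db path are
- // assumed): routing all of a DB's file I/O through HDFS via Options::env.
- //
- //   Env* hdfs = nullptr;
- //   Status s = NewHdfsEnv(&hdfs, "hdfs://localhost:9000/");
- //   Options options;
- //   options.env = hdfs;
- //   DB* db = nullptr;
- //   if (s.ok()) s = DB::Open(options, "/rocksdb_testdb", &db);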
- } // namespace ROCKSDB_NAMESPACE
- #endif // ROCKSDB_HDFS_FILE_C
- #else // USE_HDFS
- // dummy placeholders used when HDFS is not available
- namespace ROCKSDB_NAMESPACE {
- Status HdfsEnv::NewSequentialFile(const std::string& /*fname*/,
- std::unique_ptr<SequentialFile>* /*result*/,
- const EnvOptions& /*options*/) {
- return Status::NotSupported("Not compiled with hdfs support");
- }
- Status NewHdfsEnv(Env** /*hdfs_env*/, const std::string& /*fsname*/) {
- return Status::NotSupported("Not compiled with hdfs support");
- }
- } // namespace ROCKSDB_NAMESPACE
- #endif