line_file_reader.cc 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. // Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include "file/line_file_reader.h"
  6. #include <cstring>
  7. #include "monitoring/iostats_context_imp.h"
  8. namespace ROCKSDB_NAMESPACE {
  9. IOStatus LineFileReader::Create(const std::shared_ptr<FileSystem>& fs,
  10. const std::string& fname,
  11. const FileOptions& file_opts,
  12. std::unique_ptr<LineFileReader>* reader,
  13. IODebugContext* dbg,
  14. RateLimiter* rate_limiter) {
  15. std::unique_ptr<FSSequentialFile> file;
  16. IOStatus io_s = fs->NewSequentialFile(fname, file_opts, &file, dbg);
  17. if (io_s.ok()) {
  18. reader->reset(new LineFileReader(
  19. std::move(file), fname, nullptr,
  20. std::vector<std::shared_ptr<EventListener>>{}, rate_limiter));
  21. }
  22. return io_s;
  23. }
  24. bool LineFileReader::ReadLine(std::string* out,
  25. Env::IOPriority rate_limiter_priority) {
  26. assert(out);
  27. if (!io_status_.ok()) {
  28. // Status should be checked (or permit unchecked) any time we return false.
  29. io_status_.MustCheck();
  30. return false;
  31. }
  32. out->clear();
  33. for (;;) {
  34. // Look for line delimiter
  35. const char* found = static_cast<const char*>(
  36. std::memchr(buf_begin_, '\n', buf_end_ - buf_begin_));
  37. if (found) {
  38. size_t len = found - buf_begin_;
  39. out->append(buf_begin_, len);
  40. buf_begin_ += len + /*delim*/ 1;
  41. ++line_number_;
  42. return true;
  43. }
  44. if (at_eof_) {
  45. io_status_.MustCheck();
  46. return false;
  47. }
  48. // else flush and reload buffer
  49. out->append(buf_begin_, buf_end_ - buf_begin_);
  50. Slice result;
  51. io_status_ =
  52. sfr_.Read(buf_.size(), &result, buf_.data(), rate_limiter_priority);
  53. IOSTATS_ADD(bytes_read, result.size());
  54. if (!io_status_.ok()) {
  55. io_status_.MustCheck();
  56. return false;
  57. }
  58. if (result.size() != buf_.size()) {
  59. // The obscure way of indicating EOF
  60. at_eof_ = true;
  61. }
  62. buf_begin_ = result.data();
  63. buf_end_ = result.data() + result.size();
  64. }
  65. }
  66. } // namespace ROCKSDB_NAMESPACE