blob_file.cc 9.7 KB


  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #ifndef ROCKSDB_LITE
  6. #include "utilities/blob_db/blob_file.h"
  7. #include <stdio.h>
  8. #include <cinttypes>
  9. #include <algorithm>
  10. #include <memory>
  11. #include "db/column_family.h"
  12. #include "db/db_impl/db_impl.h"
  13. #include "db/dbformat.h"
  14. #include "env/composite_env_wrapper.h"
  15. #include "file/filename.h"
  16. #include "file/readahead_raf.h"
  17. #include "logging/logging.h"
  18. #include "utilities/blob_db/blob_db_impl.h"
  19. namespace ROCKSDB_NAMESPACE {
  20. namespace blob_db {
  21. BlobFile::BlobFile(const BlobDBImpl* p, const std::string& bdir, uint64_t fn,
  22. Logger* info_log)
  23. : parent_(p), path_to_dir_(bdir), file_number_(fn), info_log_(info_log) {}
  24. BlobFile::BlobFile(const BlobDBImpl* p, const std::string& bdir, uint64_t fn,
  25. Logger* info_log, uint32_t column_family_id,
  26. CompressionType compression, bool has_ttl,
  27. const ExpirationRange& expiration_range)
  28. : parent_(p),
  29. path_to_dir_(bdir),
  30. file_number_(fn),
  31. info_log_(info_log),
  32. column_family_id_(column_family_id),
  33. compression_(compression),
  34. has_ttl_(has_ttl),
  35. expiration_range_(expiration_range),
  36. header_(column_family_id, compression, has_ttl, expiration_range),
  37. header_valid_(true) {}
  38. BlobFile::~BlobFile() {
  39. if (obsolete_) {
  40. std::string pn(PathName());
  41. Status s = Env::Default()->DeleteFile(PathName());
  42. if (!s.ok()) {
  43. // ROCKS_LOG_INFO(db_options_.info_log,
  44. // "File could not be deleted %s", pn.c_str());
  45. }
  46. }
  47. }
  48. uint32_t BlobFile::GetColumnFamilyId() const { return column_family_id_; }
  49. std::string BlobFile::PathName() const {
  50. return BlobFileName(path_to_dir_, file_number_);
  51. }
  52. std::shared_ptr<Reader> BlobFile::OpenRandomAccessReader(
  53. Env* env, const DBOptions& db_options,
  54. const EnvOptions& env_options) const {
  55. constexpr size_t kReadaheadSize = 2 * 1024 * 1024;
  56. std::unique_ptr<RandomAccessFile> sfile;
  57. std::string path_name(PathName());
  58. Status s = env->NewRandomAccessFile(path_name, &sfile, env_options);
  59. if (!s.ok()) {
  60. // report something here.
  61. return nullptr;
  62. }
  63. sfile = NewReadaheadRandomAccessFile(std::move(sfile), kReadaheadSize);
  64. std::unique_ptr<RandomAccessFileReader> sfile_reader;
  65. sfile_reader.reset(new RandomAccessFileReader(
  66. NewLegacyRandomAccessFileWrapper(sfile), path_name));
  67. std::shared_ptr<Reader> log_reader = std::make_shared<Reader>(
  68. std::move(sfile_reader), db_options.env, db_options.statistics.get());
  69. return log_reader;
  70. }
  71. std::string BlobFile::DumpState() const {
  72. char str[1000];
  73. snprintf(
  74. str, sizeof(str),
  75. "path: %s fn: %" PRIu64 " blob_count: %" PRIu64 " file_size: %" PRIu64
  76. " closed: %d obsolete: %d expiration_range: (%" PRIu64 ", %" PRIu64
  77. "), writer: %d reader: %d",
  78. path_to_dir_.c_str(), file_number_, blob_count_.load(), file_size_.load(),
  79. closed_.load(), obsolete_.load(), expiration_range_.first,
  80. expiration_range_.second, (!!log_writer_), (!!ra_file_reader_));
  81. return str;
  82. }
  83. void BlobFile::MarkObsolete(SequenceNumber sequence) {
  84. assert(Immutable());
  85. obsolete_sequence_ = sequence;
  86. obsolete_.store(true);
  87. }
  88. bool BlobFile::NeedsFsync(bool hard, uint64_t bytes_per_sync) const {
  89. assert(last_fsync_ <= file_size_);
  90. return (hard) ? file_size_ > last_fsync_
  91. : (file_size_ - last_fsync_) >= bytes_per_sync;
  92. }
  93. Status BlobFile::WriteFooterAndCloseLocked(SequenceNumber sequence) {
  94. BlobLogFooter footer;
  95. footer.blob_count = blob_count_;
  96. if (HasTTL()) {
  97. footer.expiration_range = expiration_range_;
  98. }
  99. // this will close the file and reset the Writable File Pointer.
  100. Status s = log_writer_->AppendFooter(footer);
  101. if (s.ok()) {
  102. closed_ = true;
  103. immutable_sequence_ = sequence;
  104. file_size_ += BlobLogFooter::kSize;
  105. }
  106. // delete the sequential writer
  107. log_writer_.reset();
  108. return s;
  109. }
  110. Status BlobFile::ReadFooter(BlobLogFooter* bf) {
  111. if (file_size_ < (BlobLogHeader::kSize + BlobLogFooter::kSize)) {
  112. return Status::IOError("File does not have footer", PathName());
  113. }
  114. uint64_t footer_offset = file_size_ - BlobLogFooter::kSize;
  115. // assume that ra_file_reader_ is valid before we enter this
  116. assert(ra_file_reader_);
  117. Slice result;
  118. char scratch[BlobLogFooter::kSize + 10];
  119. Status s = ra_file_reader_->Read(footer_offset, BlobLogFooter::kSize, &result,
  120. scratch);
  121. if (!s.ok()) return s;
  122. if (result.size() != BlobLogFooter::kSize) {
  123. // should not happen
  124. return Status::IOError("EOF reached before footer");
  125. }
  126. s = bf->DecodeFrom(result);
  127. return s;
  128. }
  129. Status BlobFile::SetFromFooterLocked(const BlobLogFooter& footer) {
  130. // assume that file has been fully fsync'd
  131. last_fsync_.store(file_size_);
  132. blob_count_ = footer.blob_count;
  133. expiration_range_ = footer.expiration_range;
  134. closed_ = true;
  135. return Status::OK();
  136. }
  137. Status BlobFile::Fsync() {
  138. Status s;
  139. if (log_writer_.get()) {
  140. s = log_writer_->Sync();
  141. last_fsync_.store(file_size_.load());
  142. }
  143. return s;
  144. }
  145. void BlobFile::CloseRandomAccessLocked() {
  146. ra_file_reader_.reset();
  147. last_access_ = -1;
  148. }
  149. Status BlobFile::GetReader(Env* env, const EnvOptions& env_options,
  150. std::shared_ptr<RandomAccessFileReader>* reader,
  151. bool* fresh_open) {
  152. assert(reader != nullptr);
  153. assert(fresh_open != nullptr);
  154. *fresh_open = false;
  155. int64_t current_time = 0;
  156. env->GetCurrentTime(&current_time);
  157. last_access_.store(current_time);
  158. Status s;
  159. {
  160. ReadLock lockbfile_r(&mutex_);
  161. if (ra_file_reader_) {
  162. *reader = ra_file_reader_;
  163. return s;
  164. }
  165. }
  166. WriteLock lockbfile_w(&mutex_);
  167. // Double check.
  168. if (ra_file_reader_) {
  169. *reader = ra_file_reader_;
  170. return s;
  171. }
  172. std::unique_ptr<RandomAccessFile> rfile;
  173. s = env->NewRandomAccessFile(PathName(), &rfile, env_options);
  174. if (!s.ok()) {
  175. ROCKS_LOG_ERROR(info_log_,
  176. "Failed to open blob file for random-read: %s status: '%s'"
  177. " exists: '%s'",
  178. PathName().c_str(), s.ToString().c_str(),
  179. env->FileExists(PathName()).ToString().c_str());
  180. return s;
  181. }
  182. ra_file_reader_ = std::make_shared<RandomAccessFileReader>(
  183. NewLegacyRandomAccessFileWrapper(rfile), PathName());
  184. *reader = ra_file_reader_;
  185. *fresh_open = true;
  186. return s;
  187. }
  188. Status BlobFile::ReadMetadata(Env* env, const EnvOptions& env_options) {
  189. assert(Immutable());
  190. // Get file size.
  191. uint64_t file_size = 0;
  192. Status s = env->GetFileSize(PathName(), &file_size);
  193. if (s.ok()) {
  194. file_size_ = file_size;
  195. } else {
  196. ROCKS_LOG_ERROR(info_log_,
  197. "Failed to get size of blob file %" PRIu64
  198. ", status: %s",
  199. file_number_, s.ToString().c_str());
  200. return s;
  201. }
  202. if (file_size < BlobLogHeader::kSize) {
  203. ROCKS_LOG_ERROR(info_log_,
  204. "Incomplete blob file blob file %" PRIu64
  205. ", size: %" PRIu64,
  206. file_number_, file_size);
  207. return Status::Corruption("Incomplete blob file header.");
  208. }
  209. // Create file reader.
  210. std::unique_ptr<RandomAccessFile> file;
  211. s = env->NewRandomAccessFile(PathName(), &file, env_options);
  212. if (!s.ok()) {
  213. ROCKS_LOG_ERROR(info_log_,
  214. "Failed to open blob file %" PRIu64 ", status: %s",
  215. file_number_, s.ToString().c_str());
  216. return s;
  217. }
  218. std::unique_ptr<RandomAccessFileReader> file_reader(
  219. new RandomAccessFileReader(NewLegacyRandomAccessFileWrapper(file),
  220. PathName()));
  221. // Read file header.
  222. char header_buf[BlobLogHeader::kSize];
  223. Slice header_slice;
  224. s = file_reader->Read(0, BlobLogHeader::kSize, &header_slice, header_buf);
  225. if (!s.ok()) {
  226. ROCKS_LOG_ERROR(info_log_,
  227. "Failed to read header of blob file %" PRIu64
  228. ", status: %s",
  229. file_number_, s.ToString().c_str());
  230. return s;
  231. }
  232. BlobLogHeader header;
  233. s = header.DecodeFrom(header_slice);
  234. if (!s.ok()) {
  235. ROCKS_LOG_ERROR(info_log_,
  236. "Failed to decode header of blob file %" PRIu64
  237. ", status: %s",
  238. file_number_, s.ToString().c_str());
  239. return s;
  240. }
  241. column_family_id_ = header.column_family_id;
  242. compression_ = header.compression;
  243. has_ttl_ = header.has_ttl;
  244. if (has_ttl_) {
  245. expiration_range_ = header.expiration_range;
  246. }
  247. header_valid_ = true;
  248. // Read file footer.
  249. if (file_size_ < BlobLogHeader::kSize + BlobLogFooter::kSize) {
  250. // OK not to have footer.
  251. assert(!footer_valid_);
  252. return Status::OK();
  253. }
  254. char footer_buf[BlobLogFooter::kSize];
  255. Slice footer_slice;
  256. s = file_reader->Read(file_size - BlobLogFooter::kSize, BlobLogFooter::kSize,
  257. &footer_slice, footer_buf);
  258. if (!s.ok()) {
  259. ROCKS_LOG_ERROR(info_log_,
  260. "Failed to read footer of blob file %" PRIu64
  261. ", status: %s",
  262. file_number_, s.ToString().c_str());
  263. return s;
  264. }
  265. BlobLogFooter footer;
  266. s = footer.DecodeFrom(footer_slice);
  267. if (!s.ok()) {
  268. // OK not to have footer.
  269. assert(!footer_valid_);
  270. return Status::OK();
  271. }
  272. blob_count_ = footer.blob_count;
  273. if (has_ttl_) {
  274. assert(header.expiration_range.first <= footer.expiration_range.first);
  275. assert(header.expiration_range.second >= footer.expiration_range.second);
  276. expiration_range_ = footer.expiration_range;
  277. }
  278. footer_valid_ = true;
  279. return Status::OK();
  280. }
  281. } // namespace blob_db
  282. } // namespace ROCKSDB_NAMESPACE
  283. #endif // ROCKSDB_LITE