blob_file_builder.cc 13 KB


  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include "db/blob/blob_file_builder.h"
  6. #include <cassert>
  7. #include "db/blob/blob_contents.h"
  8. #include "db/blob/blob_file_addition.h"
  9. #include "db/blob/blob_file_completion_callback.h"
  10. #include "db/blob/blob_index.h"
  11. #include "db/blob/blob_log_format.h"
  12. #include "db/blob/blob_log_writer.h"
  13. #include "db/blob/blob_source.h"
  14. #include "db/event_helpers.h"
  15. #include "db/version_set.h"
  16. #include "file/filename.h"
  17. #include "file/read_write_util.h"
  18. #include "file/writable_file_writer.h"
  19. #include "logging/logging.h"
  20. #include "options/cf_options.h"
  21. #include "options/options_helper.h"
  22. #include "rocksdb/slice.h"
  23. #include "rocksdb/status.h"
  24. #include "test_util/sync_point.h"
  25. #include "trace_replay/io_tracer.h"
  26. #include "util/compression.h"
  27. namespace ROCKSDB_NAMESPACE {
  28. BlobFileBuilder::BlobFileBuilder(
  29. VersionSet* versions, FileSystem* fs,
  30. const ImmutableOptions* immutable_options,
  31. const MutableCFOptions* mutable_cf_options, const FileOptions* file_options,
  32. const WriteOptions* write_options, std::string db_id,
  33. std::string db_session_id, int job_id, uint32_t column_family_id,
  34. const std::string& column_family_name, Env::WriteLifeTimeHint write_hint,
  35. const std::shared_ptr<IOTracer>& io_tracer,
  36. BlobFileCompletionCallback* blob_callback,
  37. BlobFileCreationReason creation_reason,
  38. std::vector<std::string>* blob_file_paths,
  39. std::vector<BlobFileAddition>* blob_file_additions)
  40. : BlobFileBuilder([versions]() { return versions->NewFileNumber(); }, fs,
  41. immutable_options, mutable_cf_options, file_options,
  42. write_options, db_id, db_session_id, job_id,
  43. column_family_id, column_family_name, write_hint,
  44. io_tracer, blob_callback, creation_reason,
  45. blob_file_paths, blob_file_additions) {}
  46. BlobFileBuilder::BlobFileBuilder(
  47. std::function<uint64_t()> file_number_generator, FileSystem* fs,
  48. const ImmutableOptions* immutable_options,
  49. const MutableCFOptions* mutable_cf_options, const FileOptions* file_options,
  50. const WriteOptions* write_options, std::string db_id,
  51. std::string db_session_id, int job_id, uint32_t column_family_id,
  52. const std::string& column_family_name, Env::WriteLifeTimeHint write_hint,
  53. const std::shared_ptr<IOTracer>& io_tracer,
  54. BlobFileCompletionCallback* blob_callback,
  55. BlobFileCreationReason creation_reason,
  56. std::vector<std::string>* blob_file_paths,
  57. std::vector<BlobFileAddition>* blob_file_additions)
  58. : file_number_generator_(std::move(file_number_generator)),
  59. fs_(fs),
  60. immutable_options_(immutable_options),
  61. min_blob_size_(mutable_cf_options->min_blob_size),
  62. blob_file_size_(mutable_cf_options->blob_file_size),
  63. blob_compression_type_(mutable_cf_options->blob_compression_type),
  64. prepopulate_blob_cache_(mutable_cf_options->prepopulate_blob_cache),
  65. file_options_(file_options),
  66. write_options_(write_options),
  67. db_id_(std::move(db_id)),
  68. db_session_id_(std::move(db_session_id)),
  69. job_id_(job_id),
  70. column_family_id_(column_family_id),
  71. column_family_name_(column_family_name),
  72. write_hint_(write_hint),
  73. io_tracer_(io_tracer),
  74. blob_callback_(blob_callback),
  75. creation_reason_(creation_reason),
  76. blob_file_paths_(blob_file_paths),
  77. blob_file_additions_(blob_file_additions),
  78. blob_count_(0),
  79. blob_bytes_(0) {
  80. assert(file_number_generator_);
  81. assert(fs_);
  82. assert(immutable_options_);
  83. assert(file_options_);
  84. assert(write_options_);
  85. assert(blob_file_paths_);
  86. assert(blob_file_paths_->empty());
  87. assert(blob_file_additions_);
  88. assert(blob_file_additions_->empty());
  89. }
  90. BlobFileBuilder::~BlobFileBuilder() = default;
  91. Status BlobFileBuilder::Add(const Slice& key, const Slice& value,
  92. std::string* blob_index) {
  93. assert(blob_index);
  94. assert(blob_index->empty());
  95. if (value.size() < min_blob_size_) {
  96. return Status::OK();
  97. }
  98. {
  99. const Status s = OpenBlobFileIfNeeded();
  100. if (!s.ok()) {
  101. return s;
  102. }
  103. }
  104. Slice blob = value;
  105. std::string compressed_blob;
  106. {
  107. const Status s = CompressBlobIfNeeded(&blob, &compressed_blob);
  108. if (!s.ok()) {
  109. return s;
  110. }
  111. }
  112. uint64_t blob_file_number = 0;
  113. uint64_t blob_offset = 0;
  114. {
  115. const Status s =
  116. WriteBlobToFile(key, blob, &blob_file_number, &blob_offset);
  117. if (!s.ok()) {
  118. return s;
  119. }
  120. }
  121. {
  122. const Status s = CloseBlobFileIfNeeded();
  123. if (!s.ok()) {
  124. return s;
  125. }
  126. }
  127. {
  128. const Status s =
  129. PutBlobIntoCacheIfNeeded(value, blob_file_number, blob_offset);
  130. if (!s.ok()) {
  131. ROCKS_LOG_WARN(immutable_options_->info_log,
  132. "Failed to pre-populate the blob into blob cache: %s",
  133. s.ToString().c_str());
  134. }
  135. }
  136. BlobIndex::EncodeBlob(blob_index, blob_file_number, blob_offset, blob.size(),
  137. blob_compression_type_);
  138. return Status::OK();
  139. }
  140. Status BlobFileBuilder::Finish() {
  141. if (!IsBlobFileOpen()) {
  142. return Status::OK();
  143. }
  144. return CloseBlobFile();
  145. }
  146. bool BlobFileBuilder::IsBlobFileOpen() const { return !!writer_; }
  147. Status BlobFileBuilder::OpenBlobFileIfNeeded() {
  148. if (IsBlobFileOpen()) {
  149. return Status::OK();
  150. }
  151. assert(!blob_count_);
  152. assert(!blob_bytes_);
  153. assert(file_number_generator_);
  154. const uint64_t blob_file_number = file_number_generator_();
  155. assert(immutable_options_);
  156. assert(!immutable_options_->cf_paths.empty());
  157. std::string blob_file_path =
  158. BlobFileName(immutable_options_->cf_paths.front().path, blob_file_number);
  159. if (blob_callback_) {
  160. blob_callback_->OnBlobFileCreationStarted(
  161. blob_file_path, column_family_name_, job_id_, creation_reason_);
  162. }
  163. std::unique_ptr<FSWritableFile> file;
  164. FileOptions fo_copy;
  165. {
  166. assert(file_options_);
  167. fo_copy = *file_options_;
  168. fo_copy.write_hint = write_hint_;
  169. Status s = NewWritableFile(fs_, blob_file_path, &file, fo_copy);
  170. TEST_SYNC_POINT_CALLBACK(
  171. "BlobFileBuilder::OpenBlobFileIfNeeded:NewWritableFile", &s);
  172. if (!s.ok()) {
  173. return s;
  174. }
  175. }
  176. // Note: files get added to blob_file_paths_ right after the open, so they
  177. // can be cleaned up upon failure. Contrast this with blob_file_additions_,
  178. // which only contains successfully written files.
  179. assert(blob_file_paths_);
  180. blob_file_paths_->emplace_back(std::move(blob_file_path));
  181. assert(file);
  182. file->SetIOPriority(write_options_->rate_limiter_priority);
  183. // Subsequent attempts to override the hint via SetWriteLifeTimeHint
  184. // with the very same value will be ignored by the fs.
  185. file->SetWriteLifeTimeHint(fo_copy.write_hint);
  186. FileTypeSet tmp_set = immutable_options_->checksum_handoff_file_types;
  187. Statistics* const statistics = immutable_options_->stats;
  188. std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
  189. std::move(file), blob_file_paths_->back(), *file_options_,
  190. immutable_options_->clock, io_tracer_, statistics,
  191. Histograms::BLOB_DB_BLOB_FILE_WRITE_MICROS, immutable_options_->listeners,
  192. immutable_options_->file_checksum_gen_factory.get(),
  193. tmp_set.Contains(FileType::kBlobFile), false));
  194. constexpr bool do_flush = false;
  195. std::unique_ptr<BlobLogWriter> blob_log_writer(new BlobLogWriter(
  196. std::move(file_writer), immutable_options_->clock, statistics,
  197. blob_file_number, immutable_options_->use_fsync, do_flush));
  198. constexpr bool has_ttl = false;
  199. constexpr ExpirationRange expiration_range;
  200. BlobLogHeader header(column_family_id_, blob_compression_type_, has_ttl,
  201. expiration_range);
  202. {
  203. Status s = blob_log_writer->WriteHeader(*write_options_, header);
  204. TEST_SYNC_POINT_CALLBACK(
  205. "BlobFileBuilder::OpenBlobFileIfNeeded:WriteHeader", &s);
  206. if (!s.ok()) {
  207. return s;
  208. }
  209. }
  210. writer_ = std::move(blob_log_writer);
  211. assert(IsBlobFileOpen());
  212. return Status::OK();
  213. }
  214. Status BlobFileBuilder::CompressBlobIfNeeded(
  215. Slice* blob, std::string* compressed_blob) const {
  216. assert(blob);
  217. assert(compressed_blob);
  218. assert(compressed_blob->empty());
  219. assert(immutable_options_);
  220. if (blob_compression_type_ == kNoCompression) {
  221. return Status::OK();
  222. }
  223. // TODO: allow user CompressionOptions, including max_compressed_bytes_per_kb
  224. CompressionOptions opts;
  225. CompressionContext context(blob_compression_type_, opts);
  226. CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(),
  227. blob_compression_type_);
  228. constexpr uint32_t compression_format_version = 2;
  229. bool success = false;
  230. {
  231. StopWatch stop_watch(immutable_options_->clock, immutable_options_->stats,
  232. BLOB_DB_COMPRESSION_MICROS);
  233. success = OLD_CompressData(*blob, info, compression_format_version,
  234. compressed_blob);
  235. }
  236. if (!success) {
  237. return Status::Corruption("Error compressing blob");
  238. }
  239. *blob = Slice(*compressed_blob);
  240. return Status::OK();
  241. }
  242. Status BlobFileBuilder::WriteBlobToFile(const Slice& key, const Slice& blob,
  243. uint64_t* blob_file_number,
  244. uint64_t* blob_offset) {
  245. assert(IsBlobFileOpen());
  246. assert(blob_file_number);
  247. assert(blob_offset);
  248. uint64_t key_offset = 0;
  249. Status s =
  250. writer_->AddRecord(*write_options_, key, blob, &key_offset, blob_offset);
  251. TEST_SYNC_POINT_CALLBACK("BlobFileBuilder::WriteBlobToFile:AddRecord", &s);
  252. if (!s.ok()) {
  253. return s;
  254. }
  255. *blob_file_number = writer_->get_log_number();
  256. ++blob_count_;
  257. blob_bytes_ += BlobLogRecord::kHeaderSize + key.size() + blob.size();
  258. return Status::OK();
  259. }
  260. Status BlobFileBuilder::CloseBlobFile() {
  261. assert(IsBlobFileOpen());
  262. BlobLogFooter footer;
  263. footer.blob_count = blob_count_;
  264. std::string checksum_method;
  265. std::string checksum_value;
  266. Status s = writer_->AppendFooter(*write_options_, footer, &checksum_method,
  267. &checksum_value);
  268. TEST_SYNC_POINT_CALLBACK("BlobFileBuilder::WriteBlobToFile:AppendFooter", &s);
  269. if (!s.ok()) {
  270. return s;
  271. }
  272. const uint64_t blob_file_number = writer_->get_log_number();
  273. if (blob_callback_) {
  274. s = blob_callback_->OnBlobFileCompleted(
  275. blob_file_paths_->back(), column_family_name_, job_id_,
  276. blob_file_number, creation_reason_, s, checksum_value, checksum_method,
  277. blob_count_, blob_bytes_);
  278. }
  279. assert(blob_file_additions_);
  280. blob_file_additions_->emplace_back(blob_file_number, blob_count_, blob_bytes_,
  281. std::move(checksum_method),
  282. std::move(checksum_value));
  283. assert(immutable_options_);
  284. ROCKS_LOG_INFO(immutable_options_->logger,
  285. "[%s] [JOB %d] Generated blob file #%" PRIu64 ": %" PRIu64
  286. " total blobs, %" PRIu64 " total bytes",
  287. column_family_name_.c_str(), job_id_, blob_file_number,
  288. blob_count_, blob_bytes_);
  289. writer_.reset();
  290. blob_count_ = 0;
  291. blob_bytes_ = 0;
  292. return s;
  293. }
  294. Status BlobFileBuilder::CloseBlobFileIfNeeded() {
  295. assert(IsBlobFileOpen());
  296. const WritableFileWriter* const file_writer = writer_->file();
  297. assert(file_writer);
  298. if (file_writer->GetFileSize() < blob_file_size_) {
  299. return Status::OK();
  300. }
  301. return CloseBlobFile();
  302. }
  303. void BlobFileBuilder::Abandon(const Status& s) {
  304. if (!IsBlobFileOpen()) {
  305. return;
  306. }
  307. if (blob_callback_) {
  308. // BlobFileBuilder::Abandon() is called because of error while writing to
  309. // Blob files. So we can ignore the below error.
  310. blob_callback_
  311. ->OnBlobFileCompleted(blob_file_paths_->back(), column_family_name_,
  312. job_id_, writer_->get_log_number(),
  313. creation_reason_, s, "", "", blob_count_,
  314. blob_bytes_)
  315. .PermitUncheckedError();
  316. }
  317. writer_.reset();
  318. blob_count_ = 0;
  319. blob_bytes_ = 0;
  320. }
  321. Status BlobFileBuilder::PutBlobIntoCacheIfNeeded(const Slice& blob,
  322. uint64_t blob_file_number,
  323. uint64_t blob_offset) const {
  324. Status s = Status::OK();
  325. BlobSource::SharedCacheInterface blob_cache{immutable_options_->blob_cache};
  326. auto statistics = immutable_options_->statistics.get();
  327. bool warm_cache =
  328. prepopulate_blob_cache_ == PrepopulateBlobCache::kFlushOnly &&
  329. creation_reason_ == BlobFileCreationReason::kFlush;
  330. if (blob_cache && warm_cache) {
  331. const OffsetableCacheKey base_cache_key(db_id_, db_session_id_,
  332. blob_file_number);
  333. const CacheKey cache_key = base_cache_key.WithOffset(blob_offset);
  334. const Slice key = cache_key.AsSlice();
  335. const Cache::Priority priority = Cache::Priority::BOTTOM;
  336. s = blob_cache.InsertSaved(key, blob, nullptr /*context*/, priority,
  337. immutable_options_->lowest_used_cache_tier);
  338. if (s.ok()) {
  339. RecordTick(statistics, BLOB_DB_CACHE_ADD);
  340. RecordTick(statistics, BLOB_DB_CACHE_BYTES_WRITE, blob.size());
  341. } else {
  342. RecordTick(statistics, BLOB_DB_CACHE_ADD_FAILURES);
  343. }
  344. }
  345. return s;
  346. }
  347. } // namespace ROCKSDB_NAMESPACE