block_based_table_builder.cc

  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  9. #include "table/block_based/block_based_table_builder.h"
  10. #include <assert.h>
  11. #include <stdio.h>
  12. #include <list>
  13. #include <map>
  14. #include <memory>
  15. #include <string>
  16. #include <unordered_map>
  17. #include <utility>
  18. #include "db/dbformat.h"
  19. #include "index_builder.h"
  20. #include "rocksdb/cache.h"
  21. #include "rocksdb/comparator.h"
  22. #include "rocksdb/env.h"
  23. #include "rocksdb/flush_block_policy.h"
  24. #include "rocksdb/merge_operator.h"
  25. #include "rocksdb/table.h"
  26. #include "table/block_based/block.h"
  27. #include "table/block_based/block_based_filter_block.h"
  28. #include "table/block_based/block_based_table_factory.h"
  29. #include "table/block_based/block_based_table_reader.h"
  30. #include "table/block_based/block_builder.h"
  31. #include "table/block_based/filter_block.h"
  32. #include "table/block_based/filter_policy_internal.h"
  33. #include "table/block_based/full_filter_block.h"
  34. #include "table/block_based/partitioned_filter_block.h"
  35. #include "table/format.h"
  36. #include "table/table_builder.h"
  37. #include "memory/memory_allocator.h"
  38. #include "util/coding.h"
  39. #include "util/compression.h"
  40. #include "util/crc32c.h"
  41. #include "util/stop_watch.h"
  42. #include "util/string_util.h"
  43. #include "util/xxhash.h"
  44. namespace ROCKSDB_NAMESPACE {
  45. extern const std::string kHashIndexPrefixesBlock;
  46. extern const std::string kHashIndexPrefixesMetadataBlock;
  47. typedef BlockBasedTableOptions::IndexType IndexType;
  48. // Without an anonymous namespace here, we would trip the -Wmissing-prototypes warning
  49. namespace {
  50. // Create a filter block builder based on its type.
  51. FilterBlockBuilder* CreateFilterBlockBuilder(
  52. const ImmutableCFOptions& /*opt*/, const MutableCFOptions& mopt,
  53. const FilterBuildingContext& context,
  54. const bool use_delta_encoding_for_index_values,
  55. PartitionedIndexBuilder* const p_index_builder) {
  56. const BlockBasedTableOptions& table_opt = context.table_options;
  57. if (table_opt.filter_policy == nullptr) return nullptr;
  58. FilterBitsBuilder* filter_bits_builder =
  59. BloomFilterPolicy::GetBuilderFromContext(context);
  60. if (filter_bits_builder == nullptr) {
  61. return new BlockBasedFilterBlockBuilder(mopt.prefix_extractor.get(),
  62. table_opt);
  63. } else {
  64. if (table_opt.partition_filters) {
  65. assert(p_index_builder != nullptr);
  66. // Since it takes time from the filter builder's partition-cut request
  67. // until the index builder actually cuts the partition, we take the lower
  68. // bound as the partition size.
  69. assert(table_opt.block_size_deviation <= 100);
  70. auto partition_size =
  71. static_cast<uint32_t>(((table_opt.metadata_block_size *
  72. (100 - table_opt.block_size_deviation)) +
  73. 99) /
  74. 100);
  75. partition_size = std::max(partition_size, static_cast<uint32_t>(1));
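// Worked example (illustrative values, not necessarily the defaults): with
// metadata_block_size = 4096 and block_size_deviation = 10, the lower bound is
// ceil(4096 * 90 / 100) = (368640 + 99) / 100 = 3687 bytes.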
  76. return new PartitionedFilterBlockBuilder(
  77. mopt.prefix_extractor.get(), table_opt.whole_key_filtering,
  78. filter_bits_builder, table_opt.index_block_restart_interval,
  79. use_delta_encoding_for_index_values, p_index_builder, partition_size);
  80. } else {
  81. return new FullFilterBlockBuilder(mopt.prefix_extractor.get(),
  82. table_opt.whole_key_filtering,
  83. filter_bits_builder);
  84. }
  85. }
  86. }
  87. bool GoodCompressionRatio(size_t compressed_size, size_t raw_size) {
  88. // Check whether compression saved more than 12.5% of the raw size
  89. return compressed_size < raw_size - (raw_size / 8u);
  90. }
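// For example, with raw_size = 4096 the cutoff is 4096 - 4096 / 8 = 3584
// bytes, so the compressed form is used only if it is smaller than that,
// i.e. only if compression saved more than 12.5%.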
  91. bool CompressBlockInternal(const Slice& raw,
  92. const CompressionInfo& compression_info,
  93. uint32_t format_version,
  94. std::string* compressed_output) {
  95. // Will return compressed block contents if (1) the compression method is
  96. // supported in this platform and (2) the compression rate is "good enough".
  97. switch (compression_info.type()) {
  98. case kSnappyCompression:
  99. return Snappy_Compress(compression_info, raw.data(), raw.size(),
  100. compressed_output);
  101. case kZlibCompression:
  102. return Zlib_Compress(
  103. compression_info,
  104. GetCompressFormatForVersion(kZlibCompression, format_version),
  105. raw.data(), raw.size(), compressed_output);
  106. case kBZip2Compression:
  107. return BZip2_Compress(
  108. compression_info,
  109. GetCompressFormatForVersion(kBZip2Compression, format_version),
  110. raw.data(), raw.size(), compressed_output);
  111. case kLZ4Compression:
  112. return LZ4_Compress(
  113. compression_info,
  114. GetCompressFormatForVersion(kLZ4Compression, format_version),
  115. raw.data(), raw.size(), compressed_output);
  116. case kLZ4HCCompression:
  117. return LZ4HC_Compress(
  118. compression_info,
  119. GetCompressFormatForVersion(kLZ4HCCompression, format_version),
  120. raw.data(), raw.size(), compressed_output);
  121. case kXpressCompression:
  122. return XPRESS_Compress(raw.data(), raw.size(), compressed_output);
  123. case kZSTD:
  124. case kZSTDNotFinalCompression:
  125. return ZSTD_Compress(compression_info, raw.data(), raw.size(),
  126. compressed_output);
  127. default:
  128. // Do not recognize this compression type
  129. return false;
  130. }
  131. }
  132. } // namespace
  133. // format_version is the block format as defined in include/rocksdb/table.h
  134. Slice CompressBlock(const Slice& raw, const CompressionInfo& info,
  135. CompressionType* type, uint32_t format_version,
  136. bool do_sample, std::string* compressed_output,
  137. std::string* sampled_output_fast,
  138. std::string* sampled_output_slow) {
  139. *type = info.type();
  140. if (info.type() == kNoCompression && !info.SampleForCompression()) {
  141. return raw;
  142. }
  143. // If requested, we sample one in every N blocks with a
  144. // fast and slow compression algorithm and report the stats.
  145. // The users can use these stats to decide if it is worthwhile
  146. // enabling compression and they also get a hint about which
  147. // compression algorithm will be beneficial.
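// For example, with sample_for_compression = 20, roughly one block in twenty
// is additionally compressed with LZ4/Snappy and with ZSTD/Zlib purely to
// produce these statistics; the sampled output itself is never written to the
// file.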
  148. if (do_sample && info.SampleForCompression() &&
  149. Random::GetTLSInstance()->OneIn((int)info.SampleForCompression()) &&
  150. sampled_output_fast && sampled_output_slow) {
  151. // Sampling with a fast compression algorithm
  152. if (LZ4_Supported() || Snappy_Supported()) {
  153. CompressionType c =
  154. LZ4_Supported() ? kLZ4Compression : kSnappyCompression;
  155. CompressionContext context(c);
  156. CompressionOptions options;
  157. CompressionInfo info_tmp(options, context,
  158. CompressionDict::GetEmptyDict(), c,
  159. info.SampleForCompression());
  160. CompressBlockInternal(raw, info_tmp, format_version, sampled_output_fast);
  161. }
  162. // Sampling with a slow but high-compression algorithm
  163. if (ZSTD_Supported() || Zlib_Supported()) {
  164. CompressionType c = ZSTD_Supported() ? kZSTD : kZlibCompression;
  165. CompressionContext context(c);
  166. CompressionOptions options;
  167. CompressionInfo info_tmp(options, context,
  168. CompressionDict::GetEmptyDict(), c,
  169. info.SampleForCompression());
  170. CompressBlockInternal(raw, info_tmp, format_version, sampled_output_slow);
  171. }
  172. }
  173. // Actually compress the data
  174. if (*type != kNoCompression) {
  175. if (CompressBlockInternal(raw, info, format_version, compressed_output) &&
  176. GoodCompressionRatio(compressed_output->size(), raw.size())) {
  177. return *compressed_output;
  178. }
  179. }
  180. // Compression method is not supported, or not good
  181. // compression ratio, so just fall back to uncompressed form.
  182. *type = kNoCompression;
  183. return raw;
  184. }
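// Minimal usage sketch (the helper below is illustrative only and is not part
// of RocksDB): it shows the calling convention used by WriteBlock() further
// below. The caller owns the scratch buffer and must treat the result as
// uncompressed whenever *out_type comes back as kNoCompression. Passing
// nullptr for the sampling outputs simply disables sampling.
namespace {
inline Slice CompressBlockForExample(const Slice& raw,
                                     const CompressionInfo& info,
                                     uint32_t format_version,
                                     std::string* scratch,
                                     CompressionType* out_type) {
  return CompressBlock(raw, info, out_type, format_version,
                       false /* do_sample */, scratch,
                       nullptr /* sampled_output_fast */,
                       nullptr /* sampled_output_slow */);
}
}  // namespace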
  185. // kBlockBasedTableMagicNumber was picked by running
  186. // echo rocksdb.table.block_based | sha1sum
  187. // and taking the leading 64 bits.
  188. // Please note that kBlockBasedTableMagicNumber may also be accessed by other
  189. // .cc files,
  190. // so we declare it extern in the header; but to get the space
  191. // allocated,
  192. // it must be defined without extern in exactly one place.
  193. const uint64_t kBlockBasedTableMagicNumber = 0x88e241b785f4cff7ull;
  194. // We also support reading and writing legacy block based table format (for
  195. // backwards compatibility)
  196. const uint64_t kLegacyBlockBasedTableMagicNumber = 0xdb4775248b80fb57ull;
  197. // A collector that collects properties of interest to block-based tables.
  198. // For now this class looks heavy-weight since we only write one additional
  199. // property.
  200. // But in the foreseeable future, we will add more and more properties that are
  201. // specific to block-based tables.
  202. class BlockBasedTableBuilder::BlockBasedTablePropertiesCollector
  203. : public IntTblPropCollector {
  204. public:
  205. explicit BlockBasedTablePropertiesCollector(
  206. BlockBasedTableOptions::IndexType index_type, bool whole_key_filtering,
  207. bool prefix_filtering)
  208. : index_type_(index_type),
  209. whole_key_filtering_(whole_key_filtering),
  210. prefix_filtering_(prefix_filtering) {}
  211. Status InternalAdd(const Slice& /*key*/, const Slice& /*value*/,
  212. uint64_t /*file_size*/) override {
  213. // Intentionally left blank. Have no interest in collecting stats for
  214. // individual key/value pairs.
  215. return Status::OK();
  216. }
  217. virtual void BlockAdd(uint64_t /* blockRawBytes */,
  218. uint64_t /* blockCompressedBytesFast */,
  219. uint64_t /* blockCompressedBytesSlow */) override {
  220. // Intentionally left blank. No interest in collecting stats for
  221. // blocks.
  222. return;
  223. }
  224. Status Finish(UserCollectedProperties* properties) override {
  225. std::string val;
  226. PutFixed32(&val, static_cast<uint32_t>(index_type_));
  227. properties->insert({BlockBasedTablePropertyNames::kIndexType, val});
  228. properties->insert({BlockBasedTablePropertyNames::kWholeKeyFiltering,
  229. whole_key_filtering_ ? kPropTrue : kPropFalse});
  230. properties->insert({BlockBasedTablePropertyNames::kPrefixFiltering,
  231. prefix_filtering_ ? kPropTrue : kPropFalse});
  232. return Status::OK();
  233. }
  234. // The name of the properties collector can be used for debugging purposes.
  235. const char* Name() const override {
  236. return "BlockBasedTablePropertiesCollector";
  237. }
  238. UserCollectedProperties GetReadableProperties() const override {
  239. // Intentionally left blank.
  240. return UserCollectedProperties();
  241. }
  242. private:
  243. BlockBasedTableOptions::IndexType index_type_;
  244. bool whole_key_filtering_;
  245. bool prefix_filtering_;
  246. };
  247. struct BlockBasedTableBuilder::Rep {
  248. const ImmutableCFOptions ioptions;
  249. const MutableCFOptions moptions;
  250. const BlockBasedTableOptions table_options;
  251. const InternalKeyComparator& internal_comparator;
  252. WritableFileWriter* file;
  253. uint64_t offset = 0;
  254. Status status;
  255. size_t alignment;
  256. BlockBuilder data_block;
  257. // Buffers uncompressed data blocks and keys to replay later. Needed when
  258. // compression dictionary is enabled so we can finalize the dictionary before
  259. // compressing any data blocks.
  260. // TODO(ajkr): ideally we don't buffer all keys and all uncompressed data
  261. // blocks as it's redundant, but it's easier to implement for now.
  262. std::vector<std::pair<std::string, std::vector<std::string>>>
  263. data_block_and_keys_buffers;
  264. BlockBuilder range_del_block;
  265. InternalKeySliceTransform internal_prefix_transform;
  266. std::unique_ptr<IndexBuilder> index_builder;
  267. PartitionedIndexBuilder* p_index_builder_ = nullptr;
  268. std::string last_key;
  269. CompressionType compression_type;
  270. uint64_t sample_for_compression;
  271. CompressionOptions compression_opts;
  272. std::unique_ptr<CompressionDict> compression_dict;
  273. CompressionContext compression_ctx;
  274. std::unique_ptr<UncompressionContext> verify_ctx;
  275. std::unique_ptr<UncompressionDict> verify_dict;
  276. size_t data_begin_offset = 0;
  277. TableProperties props;
  278. // States of the builder.
  279. //
  280. // - `kBuffered`: This is the initial state where zero or more data blocks are
  281. // accumulated uncompressed in-memory. From this state, call
  282. // `EnterUnbuffered()` to finalize the compression dictionary if enabled,
  283. // compress/write out any buffered blocks, and proceed to the `kUnbuffered`
  284. // state.
  285. //
  286. // - `kUnbuffered`: This is the state when compression dictionary is finalized
  287. // either because it wasn't enabled in the first place or it's been created
  288. // from sampling previously buffered data. In this state, blocks are simply
  289. // compressed/written out as they fill up. From this state, call `Finish()`
  290. // to complete the file (write meta-blocks, etc.), or `Abandon()` to delete
  291. // the partially created file.
  292. //
  293. // - `kClosed`: This indicates either `Finish()` or `Abandon()` has been
  294. // called, so the table builder is no longer usable. We must be in this
  295. // state by the time the destructor runs.
  296. enum class State {
  297. kBuffered,
  298. kUnbuffered,
  299. kClosed,
  300. };
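// In other words, the expected lifecycle is
//   kBuffered --EnterUnbuffered()--> kUnbuffered --Finish()/Abandon()--> kClosed,
// and builders constructed without a compression dictionary
// (max_dict_bytes == 0) start directly in kUnbuffered.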
  301. State state;
  302. const bool use_delta_encoding_for_index_values;
  303. std::unique_ptr<FilterBlockBuilder> filter_builder;
  304. char compressed_cache_key_prefix[BlockBasedTable::kMaxCacheKeyPrefixSize];
  305. size_t compressed_cache_key_prefix_size;
  306. BlockHandle pending_handle; // Handle to add to index block
  307. std::string compressed_output;
  308. std::unique_ptr<FlushBlockPolicy> flush_block_policy;
  309. int level_at_creation;
  310. uint32_t column_family_id;
  311. const std::string& column_family_name;
  312. uint64_t creation_time = 0;
  313. uint64_t oldest_key_time = 0;
  314. const uint64_t target_file_size;
  315. uint64_t file_creation_time = 0;
  316. std::vector<std::unique_ptr<IntTblPropCollector>> table_properties_collectors;
  317. Rep(const ImmutableCFOptions& _ioptions, const MutableCFOptions& _moptions,
  318. const BlockBasedTableOptions& table_opt,
  319. const InternalKeyComparator& icomparator,
  320. const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
  321. int_tbl_prop_collector_factories,
  322. uint32_t _column_family_id, WritableFileWriter* f,
  323. const CompressionType _compression_type,
  324. const uint64_t _sample_for_compression,
  325. const CompressionOptions& _compression_opts, const bool skip_filters,
  326. const int _level_at_creation, const std::string& _column_family_name,
  327. const uint64_t _creation_time, const uint64_t _oldest_key_time,
  328. const uint64_t _target_file_size, const uint64_t _file_creation_time)
  329. : ioptions(_ioptions),
  330. moptions(_moptions),
  331. table_options(table_opt),
  332. internal_comparator(icomparator),
  333. file(f),
  334. alignment(table_options.block_align
  335. ? std::min(table_options.block_size, kDefaultPageSize)
  336. : 0),
  337. data_block(table_options.block_restart_interval,
  338. table_options.use_delta_encoding,
  339. false /* use_value_delta_encoding */,
  340. icomparator.user_comparator()
  341. ->CanKeysWithDifferentByteContentsBeEqual()
  342. ? BlockBasedTableOptions::kDataBlockBinarySearch
  343. : table_options.data_block_index_type,
  344. table_options.data_block_hash_table_util_ratio),
  345. range_del_block(1 /* block_restart_interval */),
  346. internal_prefix_transform(_moptions.prefix_extractor.get()),
  347. compression_type(_compression_type),
  348. sample_for_compression(_sample_for_compression),
  349. compression_opts(_compression_opts),
  350. compression_dict(),
  351. compression_ctx(_compression_type),
  352. verify_dict(),
  353. state((_compression_opts.max_dict_bytes > 0) ? State::kBuffered
  354. : State::kUnbuffered),
  355. use_delta_encoding_for_index_values(table_opt.format_version >= 4 &&
  356. !table_opt.block_align),
  357. compressed_cache_key_prefix_size(0),
  358. flush_block_policy(
  359. table_options.flush_block_policy_factory->NewFlushBlockPolicy(
  360. table_options, data_block)),
  361. level_at_creation(_level_at_creation),
  362. column_family_id(_column_family_id),
  363. column_family_name(_column_family_name),
  364. creation_time(_creation_time),
  365. oldest_key_time(_oldest_key_time),
  366. target_file_size(_target_file_size),
  367. file_creation_time(_file_creation_time) {
  368. if (table_options.index_type ==
  369. BlockBasedTableOptions::kTwoLevelIndexSearch) {
  370. p_index_builder_ = PartitionedIndexBuilder::CreateIndexBuilder(
  371. &internal_comparator, use_delta_encoding_for_index_values,
  372. table_options);
  373. index_builder.reset(p_index_builder_);
  374. } else {
  375. index_builder.reset(IndexBuilder::CreateIndexBuilder(
  376. table_options.index_type, &internal_comparator,
  377. &this->internal_prefix_transform, use_delta_encoding_for_index_values,
  378. table_options));
  379. }
  380. if (skip_filters) {
  381. filter_builder = nullptr;
  382. } else {
  383. FilterBuildingContext context(table_options);
  384. context.column_family_name = column_family_name;
  385. context.compaction_style = ioptions.compaction_style;
  386. context.level_at_creation = level_at_creation;
  387. context.info_log = ioptions.info_log;
  388. filter_builder.reset(CreateFilterBlockBuilder(
  389. ioptions, moptions, context, use_delta_encoding_for_index_values,
  390. p_index_builder_));
  391. }
  392. for (auto& collector_factories : *int_tbl_prop_collector_factories) {
  393. table_properties_collectors.emplace_back(
  394. collector_factories->CreateIntTblPropCollector(column_family_id));
  395. }
  396. table_properties_collectors.emplace_back(
  397. new BlockBasedTablePropertiesCollector(
  398. table_options.index_type, table_options.whole_key_filtering,
  399. _moptions.prefix_extractor != nullptr));
  400. if (table_options.verify_compression) {
  401. verify_ctx.reset(new UncompressionContext(UncompressionContext::NoCache(),
  402. compression_type));
  403. }
  404. }
  405. Rep(const Rep&) = delete;
  406. Rep& operator=(const Rep&) = delete;
  407. ~Rep() {}
  408. };
  409. BlockBasedTableBuilder::BlockBasedTableBuilder(
  410. const ImmutableCFOptions& ioptions, const MutableCFOptions& moptions,
  411. const BlockBasedTableOptions& table_options,
  412. const InternalKeyComparator& internal_comparator,
  413. const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
  414. int_tbl_prop_collector_factories,
  415. uint32_t column_family_id, WritableFileWriter* file,
  416. const CompressionType compression_type,
  417. const uint64_t sample_for_compression,
  418. const CompressionOptions& compression_opts, const bool skip_filters,
  419. const std::string& column_family_name, const int level_at_creation,
  420. const uint64_t creation_time, const uint64_t oldest_key_time,
  421. const uint64_t target_file_size, const uint64_t file_creation_time) {
  422. BlockBasedTableOptions sanitized_table_options(table_options);
  423. if (sanitized_table_options.format_version == 0 &&
  424. sanitized_table_options.checksum != kCRC32c) {
  425. ROCKS_LOG_WARN(
  426. ioptions.info_log,
  427. "Silently converting format_version to 1 because checksum is "
  428. "non-default");
  429. // silently convert format_version to 1 to keep consistent with current
  430. // behavior
  431. sanitized_table_options.format_version = 1;
  432. }
  433. rep_ = new Rep(ioptions, moptions, sanitized_table_options,
  434. internal_comparator, int_tbl_prop_collector_factories,
  435. column_family_id, file, compression_type,
  436. sample_for_compression, compression_opts, skip_filters,
  437. level_at_creation, column_family_name, creation_time,
  438. oldest_key_time, target_file_size, file_creation_time);
  439. if (rep_->filter_builder != nullptr) {
  440. rep_->filter_builder->StartBlock(0);
  441. }
  442. if (table_options.block_cache_compressed.get() != nullptr) {
  443. BlockBasedTable::GenerateCachePrefix(
  444. table_options.block_cache_compressed.get(), file->writable_file(),
  445. &rep_->compressed_cache_key_prefix[0],
  446. &rep_->compressed_cache_key_prefix_size);
  447. }
  448. }
  449. BlockBasedTableBuilder::~BlockBasedTableBuilder() {
  450. // Catch errors where caller forgot to call Finish()
  451. assert(rep_->state == Rep::State::kClosed);
  452. delete rep_;
  453. }
  454. void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
  455. Rep* r = rep_;
  456. assert(rep_->state != Rep::State::kClosed);
  457. if (!ok()) return;
  458. ValueType value_type = ExtractValueType(key);
  459. if (IsValueType(value_type)) {
  460. #ifndef NDEBUG
  461. if (r->props.num_entries > r->props.num_range_deletions) {
  462. assert(r->internal_comparator.Compare(key, Slice(r->last_key)) > 0);
  463. }
  464. #endif // NDEBUG
  465. auto should_flush = r->flush_block_policy->Update(key, value);
  466. if (should_flush) {
  467. assert(!r->data_block.empty());
  468. Flush();
  469. if (r->state == Rep::State::kBuffered &&
  470. r->data_begin_offset > r->target_file_size) {
  471. EnterUnbuffered();
  472. }
  473. // Add item to index block.
  474. // We do not emit the index entry for a block until we have seen the
  475. // first key for the next data block. This allows us to use shorter
  476. // keys in the index block. For example, consider a block boundary
  477. // between the keys "the quick brown fox" and "the who". We can use
  478. // "the r" as the key for the index block entry since it is >= all
  479. // entries in the first block and < all entries in subsequent
  480. // blocks.
  481. if (ok() && r->state == Rep::State::kUnbuffered) {
  482. r->index_builder->AddIndexEntry(&r->last_key, &key, r->pending_handle);
  483. }
  484. }
  485. // Note: PartitionedFilterBlockBuilder requires keys to be added to the
  486. // filter builder after they have been added to the index builder.
  487. if (r->state == Rep::State::kUnbuffered && r->filter_builder != nullptr) {
  488. size_t ts_sz = r->internal_comparator.user_comparator()->timestamp_size();
  489. r->filter_builder->Add(ExtractUserKeyAndStripTimestamp(key, ts_sz));
  490. }
  491. r->last_key.assign(key.data(), key.size());
  492. r->data_block.Add(key, value);
  493. if (r->state == Rep::State::kBuffered) {
  494. // Buffer keys to be replayed during `Finish()` once compression
  495. // dictionary has been finalized.
  496. if (r->data_block_and_keys_buffers.empty() || should_flush) {
  497. r->data_block_and_keys_buffers.emplace_back();
  498. }
  499. r->data_block_and_keys_buffers.back().second.emplace_back(key.ToString());
  500. } else {
  501. r->index_builder->OnKeyAdded(key);
  502. }
  503. NotifyCollectTableCollectorsOnAdd(key, value, r->offset,
  504. r->table_properties_collectors,
  505. r->ioptions.info_log);
  506. } else if (value_type == kTypeRangeDeletion) {
  507. r->range_del_block.Add(key, value);
  508. NotifyCollectTableCollectorsOnAdd(key, value, r->offset,
  509. r->table_properties_collectors,
  510. r->ioptions.info_log);
  511. } else {
  512. assert(false);
  513. }
  514. r->props.num_entries++;
  515. r->props.raw_key_size += key.size();
  516. r->props.raw_value_size += value.size();
  517. if (value_type == kTypeDeletion || value_type == kTypeSingleDeletion) {
  518. r->props.num_deletions++;
  519. } else if (value_type == kTypeRangeDeletion) {
  520. r->props.num_deletions++;
  521. r->props.num_range_deletions++;
  522. } else if (value_type == kTypeMerge) {
  523. r->props.num_merge_operands++;
  524. }
  525. }
  526. void BlockBasedTableBuilder::Flush() {
  527. Rep* r = rep_;
  528. assert(rep_->state != Rep::State::kClosed);
  529. if (!ok()) return;
  530. if (r->data_block.empty()) return;
  531. WriteBlock(&r->data_block, &r->pending_handle, true /* is_data_block */);
  532. }
  533. void BlockBasedTableBuilder::WriteBlock(BlockBuilder* block,
  534. BlockHandle* handle,
  535. bool is_data_block) {
  536. WriteBlock(block->Finish(), handle, is_data_block);
  537. block->Reset();
  538. }
  539. void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
  540. BlockHandle* handle,
  541. bool is_data_block) {
  542. // File format contains a sequence of blocks where each block has:
  543. // block_data: uint8[n]
  544. // type: uint8
  545. // crc: uint32
  546. assert(ok());
  547. Rep* r = rep_;
  548. auto type = r->compression_type;
  549. uint64_t sample_for_compression = r->sample_for_compression;
  550. Slice block_contents;
  551. bool abort_compression = false;
  552. StopWatchNano timer(
  553. r->ioptions.env,
  554. ShouldReportDetailedTime(r->ioptions.env, r->ioptions.statistics));
  555. if (r->state == Rep::State::kBuffered) {
  556. assert(is_data_block);
  557. assert(!r->data_block_and_keys_buffers.empty());
  558. r->data_block_and_keys_buffers.back().first = raw_block_contents.ToString();
  559. r->data_begin_offset += r->data_block_and_keys_buffers.back().first.size();
  560. return;
  561. }
  562. if (raw_block_contents.size() < kCompressionSizeLimit) {
  563. const CompressionDict* compression_dict;
  564. if (!is_data_block || r->compression_dict == nullptr) {
  565. compression_dict = &CompressionDict::GetEmptyDict();
  566. } else {
  567. compression_dict = r->compression_dict.get();
  568. }
  569. assert(compression_dict != nullptr);
  570. CompressionInfo compression_info(r->compression_opts, r->compression_ctx,
  571. *compression_dict, type,
  572. sample_for_compression);
  573. std::string sampled_output_fast;
  574. std::string sampled_output_slow;
  575. block_contents = CompressBlock(
  576. raw_block_contents, compression_info, &type,
  577. r->table_options.format_version, is_data_block /* do_sample */,
  578. &r->compressed_output, &sampled_output_fast, &sampled_output_slow);
  579. // notify collectors on block add
  580. NotifyCollectTableCollectorsOnBlockAdd(
  581. r->table_properties_collectors, raw_block_contents.size(),
  582. sampled_output_fast.size(), sampled_output_slow.size());
  583. // Some of the compression algorithms are known to be unreliable. If
  584. // the verify_compression flag is set, then try to decompress the
  585. // compressed data and compare it to the input.
  586. if (type != kNoCompression && r->table_options.verify_compression) {
  587. // Retrieve the uncompressed contents into a new buffer
  588. const UncompressionDict* verify_dict;
  589. if (!is_data_block || r->verify_dict == nullptr) {
  590. verify_dict = &UncompressionDict::GetEmptyDict();
  591. } else {
  592. verify_dict = r->verify_dict.get();
  593. }
  594. assert(verify_dict != nullptr);
  595. BlockContents contents;
  596. UncompressionInfo uncompression_info(*r->verify_ctx, *verify_dict,
  597. r->compression_type);
  598. Status stat = UncompressBlockContentsForCompressionType(
  599. uncompression_info, block_contents.data(), block_contents.size(),
  600. &contents, r->table_options.format_version, r->ioptions);
  601. if (stat.ok()) {
  602. bool compressed_ok = contents.data.compare(raw_block_contents) == 0;
  603. if (!compressed_ok) {
  604. // The result of the compression was invalid. Abort.
  605. abort_compression = true;
  606. ROCKS_LOG_ERROR(r->ioptions.info_log,
  607. "Decompressed block did not match raw block");
  608. r->status =
  609. Status::Corruption("Decompressed block did not match raw block");
  610. }
  611. } else {
  612. // Decompression reported an error. Abort.
  613. r->status = Status::Corruption("Could not decompress");
  614. abort_compression = true;
  615. }
  616. }
  617. } else {
  618. // Block is too big to be compressed.
  619. abort_compression = true;
  620. }
  621. // Abort compression if the block is too big, or did not pass
  622. // verification.
  623. if (abort_compression) {
  624. RecordTick(r->ioptions.statistics, NUMBER_BLOCK_NOT_COMPRESSED);
  625. type = kNoCompression;
  626. block_contents = raw_block_contents;
  627. } else if (type != kNoCompression) {
  628. if (ShouldReportDetailedTime(r->ioptions.env, r->ioptions.statistics)) {
  629. RecordTimeToHistogram(r->ioptions.statistics, COMPRESSION_TIMES_NANOS,
  630. timer.ElapsedNanos());
  631. }
  632. RecordInHistogram(r->ioptions.statistics, BYTES_COMPRESSED,
  633. raw_block_contents.size());
  634. RecordTick(r->ioptions.statistics, NUMBER_BLOCK_COMPRESSED);
  635. } else if (type != r->compression_type) {
  636. RecordTick(r->ioptions.statistics, NUMBER_BLOCK_NOT_COMPRESSED);
  637. }
  638. WriteRawBlock(block_contents, type, handle, is_data_block);
  639. r->compressed_output.clear();
  640. if (is_data_block) {
  641. if (r->filter_builder != nullptr) {
  642. r->filter_builder->StartBlock(r->offset);
  643. }
  644. r->props.data_size = r->offset;
  645. ++r->props.num_data_blocks;
  646. }
  647. }
  648. void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,
  649. CompressionType type,
  650. BlockHandle* handle,
  651. bool is_data_block) {
  652. Rep* r = rep_;
  653. StopWatch sw(r->ioptions.env, r->ioptions.statistics, WRITE_RAW_BLOCK_MICROS);
  654. handle->set_offset(r->offset);
  655. handle->set_size(block_contents.size());
  656. assert(r->status.ok());
  657. r->status = r->file->Append(block_contents);
  658. if (r->status.ok()) {
  659. char trailer[kBlockTrailerSize];
  660. trailer[0] = type;
  661. char* trailer_without_type = trailer + 1;
  662. switch (r->table_options.checksum) {
  663. case kNoChecksum:
  664. EncodeFixed32(trailer_without_type, 0);
  665. break;
  666. case kCRC32c: {
  667. auto crc = crc32c::Value(block_contents.data(), block_contents.size());
  668. crc = crc32c::Extend(crc, trailer, 1); // Extend to cover block type
  669. EncodeFixed32(trailer_without_type, crc32c::Mask(crc));
  670. break;
  671. }
  672. case kxxHash: {
  673. XXH32_state_t* const state = XXH32_createState();
  674. XXH32_reset(state, 0);
  675. XXH32_update(state, block_contents.data(),
  676. static_cast<uint32_t>(block_contents.size()));
  677. XXH32_update(state, trailer, 1); // Extend to cover block type
  678. EncodeFixed32(trailer_without_type, XXH32_digest(state));
  679. XXH32_freeState(state);
  680. break;
  681. }
  682. case kxxHash64: {
  683. XXH64_state_t* const state = XXH64_createState();
  684. XXH64_reset(state, 0);
  685. XXH64_update(state, block_contents.data(),
  686. static_cast<uint32_t>(block_contents.size()));
  687. XXH64_update(state, trailer, 1); // Extend to cover block type
  688. EncodeFixed32(
  689. trailer_without_type,
  690. static_cast<uint32_t>(XXH64_digest(state) & // lower 32 bits
  691. uint64_t{0xffffffff}));
  692. XXH64_freeState(state);
  693. break;
  694. }
  695. }
  696. assert(r->status.ok());
  697. TEST_SYNC_POINT_CALLBACK(
  698. "BlockBasedTableBuilder::WriteRawBlock:TamperWithChecksum",
  699. static_cast<char*>(trailer));
  700. r->status = r->file->Append(Slice(trailer, kBlockTrailerSize));
  701. if (r->status.ok()) {
  702. r->status = InsertBlockInCache(block_contents, type, handle);
  703. }
  704. if (r->status.ok()) {
  705. r->offset += block_contents.size() + kBlockTrailerSize;
  706. if (r->table_options.block_align && is_data_block) {
  707. size_t pad_bytes =
  708. (r->alignment - ((block_contents.size() + kBlockTrailerSize) &
  709. (r->alignment - 1))) &
  710. (r->alignment - 1);
  711. r->status = r->file->Pad(pad_bytes);
  712. if (r->status.ok()) {
  713. r->offset += pad_bytes;
  714. }
  715. }
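// Worked example (illustrative numbers): with alignment = 4096 and a block
// plus trailer of 5000 bytes, pad_bytes = (4096 - (5000 & 4095)) & 4095 =
// 3192, so the next block starts at the aligned offset 8192.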
  716. }
  717. }
  718. }
  719. Status BlockBasedTableBuilder::status() const { return rep_->status; }
  720. static void DeleteCachedBlockContents(const Slice& /*key*/, void* value) {
  721. BlockContents* bc = reinterpret_cast<BlockContents*>(value);
  722. delete bc;
  723. }
  724. //
  725. // Make a copy of the block contents and insert into compressed block cache
  726. //
  727. Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents,
  728. const CompressionType type,
  729. const BlockHandle* handle) {
  730. Rep* r = rep_;
  731. Cache* block_cache_compressed = r->table_options.block_cache_compressed.get();
  732. if (type != kNoCompression && block_cache_compressed != nullptr) {
  733. size_t size = block_contents.size();
  734. auto ubuf =
  735. AllocateBlock(size + 1, block_cache_compressed->memory_allocator());
  736. memcpy(ubuf.get(), block_contents.data(), size);
  737. ubuf[size] = type;
  738. BlockContents* block_contents_to_cache =
  739. new BlockContents(std::move(ubuf), size);
  740. #ifndef NDEBUG
  741. block_contents_to_cache->is_raw_block = true;
  742. #endif // NDEBUG
  743. // make cache key by appending the file offset to the cache prefix id
  744. char* end = EncodeVarint64(
  745. r->compressed_cache_key_prefix + r->compressed_cache_key_prefix_size,
  746. handle->offset());
  747. Slice key(r->compressed_cache_key_prefix,
  748. static_cast<size_t>(end - r->compressed_cache_key_prefix));
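// The resulting cache key is <file cache prefix><varint64 block offset>, so
// blocks from different files sharing the compressed block cache stay
// distinct as long as their prefixes differ.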
  749. // Insert into compressed block cache.
  750. block_cache_compressed->Insert(
  751. key, block_contents_to_cache,
  752. block_contents_to_cache->ApproximateMemoryUsage(),
  753. &DeleteCachedBlockContents);
  754. // Invalidate OS cache.
  755. r->file->InvalidateCache(static_cast<size_t>(r->offset), size);
  756. }
  757. return Status::OK();
  758. }
  759. void BlockBasedTableBuilder::WriteFilterBlock(
  760. MetaIndexBuilder* meta_index_builder) {
  761. BlockHandle filter_block_handle;
  762. bool empty_filter_block = (rep_->filter_builder == nullptr ||
  763. rep_->filter_builder->NumAdded() == 0);
  764. if (ok() && !empty_filter_block) {
  765. Status s = Status::Incomplete();
  766. while (ok() && s.IsIncomplete()) {
  767. Slice filter_content =
  768. rep_->filter_builder->Finish(filter_block_handle, &s);
  769. assert(s.ok() || s.IsIncomplete());
  770. rep_->props.filter_size += filter_content.size();
  771. WriteRawBlock(filter_content, kNoCompression, &filter_block_handle);
  772. }
  773. }
  774. if (ok() && !empty_filter_block) {
  775. // Add mapping from "<filter_block_prefix>.Name" to location
  776. // of filter data.
  777. std::string key;
  778. if (rep_->filter_builder->IsBlockBased()) {
  779. key = BlockBasedTable::kFilterBlockPrefix;
  780. } else {
  781. key = rep_->table_options.partition_filters
  782. ? BlockBasedTable::kPartitionedFilterBlockPrefix
  783. : BlockBasedTable::kFullFilterBlockPrefix;
  784. }
  785. key.append(rep_->table_options.filter_policy->Name());
  786. meta_index_builder->Add(key, filter_block_handle);
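// For example (assuming the built-in Bloom policy, whose Name() is
// "rocksdb.BuiltinBloomFilter"), the metaindex key would be
// "fullfilter.rocksdb.BuiltinBloomFilter", "filter.rocksdb.BuiltinBloomFilter"
// or "partitionedfilter.rocksdb.BuiltinBloomFilter", depending on the mode.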
  787. }
  788. }
  789. void BlockBasedTableBuilder::WriteIndexBlock(
  790. MetaIndexBuilder* meta_index_builder, BlockHandle* index_block_handle) {
  791. IndexBuilder::IndexBlocks index_blocks;
  792. auto index_builder_status = rep_->index_builder->Finish(&index_blocks);
  793. if (index_builder_status.IsIncomplete()) {
  794. // When we have more than one index partition, meta_blocks are not
  795. // supported for the index. Currently meta_blocks are used only by
  796. // HashIndexBuilder which is not multi-partition.
  797. assert(index_blocks.meta_blocks.empty());
  798. } else if (ok() && !index_builder_status.ok()) {
  799. rep_->status = index_builder_status;
  800. }
  801. if (ok()) {
  802. for (const auto& item : index_blocks.meta_blocks) {
  803. BlockHandle block_handle;
  804. WriteBlock(item.second, &block_handle, false /* is_data_block */);
  805. if (!ok()) {
  806. break;
  807. }
  808. meta_index_builder->Add(item.first, block_handle);
  809. }
  810. }
  811. if (ok()) {
  812. if (rep_->table_options.enable_index_compression) {
  813. WriteBlock(index_blocks.index_block_contents, index_block_handle, false);
  814. } else {
  815. WriteRawBlock(index_blocks.index_block_contents, kNoCompression,
  816. index_block_handle);
  817. }
  818. }
  819. // If there are more index partitions, finish them and write them out
  820. Status s = index_builder_status;
  821. while (ok() && s.IsIncomplete()) {
  822. s = rep_->index_builder->Finish(&index_blocks, *index_block_handle);
  823. if (!s.ok() && !s.IsIncomplete()) {
  824. rep_->status = s;
  825. return;
  826. }
  827. if (rep_->table_options.enable_index_compression) {
  828. WriteBlock(index_blocks.index_block_contents, index_block_handle, false);
  829. } else {
  830. WriteRawBlock(index_blocks.index_block_contents, kNoCompression,
  831. index_block_handle);
  832. }
  833. // The last index_block_handle will be for the partition index block
  834. }
  835. }
  836. void BlockBasedTableBuilder::WritePropertiesBlock(
  837. MetaIndexBuilder* meta_index_builder) {
  838. BlockHandle properties_block_handle;
  839. if (ok()) {
  840. PropertyBlockBuilder property_block_builder;
  841. rep_->props.column_family_id = rep_->column_family_id;
  842. rep_->props.column_family_name = rep_->column_family_name;
  843. rep_->props.filter_policy_name =
  844. rep_->table_options.filter_policy != nullptr
  845. ? rep_->table_options.filter_policy->Name()
  846. : "";
  847. rep_->props.index_size =
  848. rep_->index_builder->IndexSize() + kBlockTrailerSize;
  849. rep_->props.comparator_name = rep_->ioptions.user_comparator != nullptr
  850. ? rep_->ioptions.user_comparator->Name()
  851. : "nullptr";
  852. rep_->props.merge_operator_name =
  853. rep_->ioptions.merge_operator != nullptr
  854. ? rep_->ioptions.merge_operator->Name()
  855. : "nullptr";
  856. rep_->props.compression_name =
  857. CompressionTypeToString(rep_->compression_type);
  858. rep_->props.compression_options =
  859. CompressionOptionsToString(rep_->compression_opts);
  860. rep_->props.prefix_extractor_name =
  861. rep_->moptions.prefix_extractor != nullptr
  862. ? rep_->moptions.prefix_extractor->Name()
  863. : "nullptr";
  864. std::string property_collectors_names = "[";
  865. for (size_t i = 0;
  866. i < rep_->ioptions.table_properties_collector_factories.size(); ++i) {
  867. if (i != 0) {
  868. property_collectors_names += ",";
  869. }
  870. property_collectors_names +=
  871. rep_->ioptions.table_properties_collector_factories[i]->Name();
  872. }
  873. property_collectors_names += "]";
  874. rep_->props.property_collectors_names = property_collectors_names;
  875. if (rep_->table_options.index_type ==
  876. BlockBasedTableOptions::kTwoLevelIndexSearch) {
  877. assert(rep_->p_index_builder_ != nullptr);
  878. rep_->props.index_partitions = rep_->p_index_builder_->NumPartitions();
  879. rep_->props.top_level_index_size =
  880. rep_->p_index_builder_->TopLevelIndexSize(rep_->offset);
  881. }
  882. rep_->props.index_key_is_user_key =
  883. !rep_->index_builder->seperator_is_key_plus_seq();
  884. rep_->props.index_value_is_delta_encoded =
  885. rep_->use_delta_encoding_for_index_values;
  886. rep_->props.creation_time = rep_->creation_time;
  887. rep_->props.oldest_key_time = rep_->oldest_key_time;
  888. rep_->props.file_creation_time = rep_->file_creation_time;
  889. // Add basic properties
  890. property_block_builder.AddTableProperty(rep_->props);
  891. // Add user collected properties
  892. NotifyCollectTableCollectorsOnFinish(rep_->table_properties_collectors,
  893. rep_->ioptions.info_log,
  894. &property_block_builder);
  895. WriteRawBlock(property_block_builder.Finish(), kNoCompression,
  896. &properties_block_handle);
  897. }
  898. if (ok()) {
  899. #ifndef NDEBUG
  900. {
  901. uint64_t props_block_offset = properties_block_handle.offset();
  902. uint64_t props_block_size = properties_block_handle.size();
  903. TEST_SYNC_POINT_CALLBACK(
  904. "BlockBasedTableBuilder::WritePropertiesBlock:GetPropsBlockOffset",
  905. &props_block_offset);
  906. TEST_SYNC_POINT_CALLBACK(
  907. "BlockBasedTableBuilder::WritePropertiesBlock:GetPropsBlockSize",
  908. &props_block_size);
  909. }
  910. #endif // !NDEBUG
  911. meta_index_builder->Add(kPropertiesBlock, properties_block_handle);
  912. }
  913. }
  914. void BlockBasedTableBuilder::WriteCompressionDictBlock(
  915. MetaIndexBuilder* meta_index_builder) {
  916. if (rep_->compression_dict != nullptr &&
  917. rep_->compression_dict->GetRawDict().size()) {
  918. BlockHandle compression_dict_block_handle;
  919. if (ok()) {
  920. WriteRawBlock(rep_->compression_dict->GetRawDict(), kNoCompression,
  921. &compression_dict_block_handle);
  922. #ifndef NDEBUG
  923. Slice compression_dict = rep_->compression_dict->GetRawDict();
  924. TEST_SYNC_POINT_CALLBACK(
  925. "BlockBasedTableBuilder::WriteCompressionDictBlock:RawDict",
  926. &compression_dict);
  927. #endif // NDEBUG
  928. }
  929. if (ok()) {
  930. meta_index_builder->Add(kCompressionDictBlock,
  931. compression_dict_block_handle);
  932. }
  933. }
  934. }
  935. void BlockBasedTableBuilder::WriteRangeDelBlock(
  936. MetaIndexBuilder* meta_index_builder) {
  937. if (ok() && !rep_->range_del_block.empty()) {
  938. BlockHandle range_del_block_handle;
  939. WriteRawBlock(rep_->range_del_block.Finish(), kNoCompression,
  940. &range_del_block_handle);
  941. meta_index_builder->Add(kRangeDelBlock, range_del_block_handle);
  942. }
  943. }
  944. void BlockBasedTableBuilder::WriteFooter(BlockHandle& metaindex_block_handle,
  945. BlockHandle& index_block_handle) {
  946. Rep* r = rep_;
  947. // No need to write out a new-format footer if we're using the default checksum.
  948. // We're writing the legacy magic number because we want old versions of RocksDB
  949. // to be able to read files generated with a new release (just in case
  950. // somebody wants to roll back after an upgrade).
  951. // TODO(icanadi) at some point in the future, when we're absolutely sure
  952. // nobody will roll back to RocksDB 2.x versions, retire the legacy magic
  953. // number and always write new table files with new magic number
  954. bool legacy = (r->table_options.format_version == 0);
  955. // this is guaranteed by BlockBasedTableBuilder's constructor
  956. assert(r->table_options.checksum == kCRC32c ||
  957. r->table_options.format_version != 0);
  958. Footer footer(
  959. legacy ? kLegacyBlockBasedTableMagicNumber : kBlockBasedTableMagicNumber,
  960. r->table_options.format_version);
  961. footer.set_metaindex_handle(metaindex_block_handle);
  962. footer.set_index_handle(index_block_handle);
  963. footer.set_checksum(r->table_options.checksum);
  964. std::string footer_encoding;
  965. footer.EncodeTo(&footer_encoding);
  966. assert(r->status.ok());
  967. r->status = r->file->Append(footer_encoding);
  968. if (r->status.ok()) {
  969. r->offset += footer_encoding.size();
  970. }
  971. }
  972. void BlockBasedTableBuilder::EnterUnbuffered() {
  973. Rep* r = rep_;
  974. assert(r->state == Rep::State::kBuffered);
  975. r->state = Rep::State::kUnbuffered;
  976. const size_t kSampleBytes = r->compression_opts.zstd_max_train_bytes > 0
  977. ? r->compression_opts.zstd_max_train_bytes
  978. : r->compression_opts.max_dict_bytes;
  979. Random64 generator{r->creation_time};
  980. std::string compression_dict_samples;
  981. std::vector<size_t> compression_dict_sample_lens;
  982. if (!r->data_block_and_keys_buffers.empty()) {
  983. while (compression_dict_samples.size() < kSampleBytes) {
  984. size_t rand_idx =
  985. static_cast<size_t>(
  986. generator.Uniform(r->data_block_and_keys_buffers.size()));
  987. size_t copy_len =
  988. std::min(kSampleBytes - compression_dict_samples.size(),
  989. r->data_block_and_keys_buffers[rand_idx].first.size());
  990. compression_dict_samples.append(
  991. r->data_block_and_keys_buffers[rand_idx].first, 0, copy_len);
  992. compression_dict_sample_lens.emplace_back(copy_len);
  993. }
  994. }
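// Illustrative example: with zstd_max_train_bytes = 1 << 17 (128 KB), the
// loop above keeps appending prefixes of randomly chosen buffered blocks
// until 128 KB of sample bytes (and their lengths) have been collected.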
  995. // Final data block flushed; now we can generate the dictionary from the samples.
  996. // OK if compression_dict_samples is empty; we'll just get an empty dictionary.
  997. std::string dict;
  998. if (r->compression_opts.zstd_max_train_bytes > 0) {
  999. dict = ZSTD_TrainDictionary(compression_dict_samples,
  1000. compression_dict_sample_lens,
  1001. r->compression_opts.max_dict_bytes);
  1002. } else {
  1003. dict = std::move(compression_dict_samples);
  1004. }
  1005. r->compression_dict.reset(new CompressionDict(dict, r->compression_type,
  1006. r->compression_opts.level));
  1007. r->verify_dict.reset(new UncompressionDict(
  1008. dict, r->compression_type == kZSTD ||
  1009. r->compression_type == kZSTDNotFinalCompression));
  1010. for (size_t i = 0; ok() && i < r->data_block_and_keys_buffers.size(); ++i) {
  1011. const auto& data_block = r->data_block_and_keys_buffers[i].first;
  1012. auto& keys = r->data_block_and_keys_buffers[i].second;
  1013. assert(!data_block.empty());
  1014. assert(!keys.empty());
  1015. for (const auto& key : keys) {
  1016. if (r->filter_builder != nullptr) {
  1017. size_t ts_sz =
  1018. r->internal_comparator.user_comparator()->timestamp_size();
  1019. r->filter_builder->Add(ExtractUserKeyAndStripTimestamp(key, ts_sz));
  1020. }
  1021. r->index_builder->OnKeyAdded(key);
  1022. }
  1023. WriteBlock(Slice(data_block), &r->pending_handle, true /* is_data_block */);
  1024. if (ok() && i + 1 < r->data_block_and_keys_buffers.size()) {
  1025. Slice first_key_in_next_block =
  1026. r->data_block_and_keys_buffers[i + 1].second.front();
  1027. Slice* first_key_in_next_block_ptr = &first_key_in_next_block;
  1028. r->index_builder->AddIndexEntry(&keys.back(), first_key_in_next_block_ptr,
  1029. r->pending_handle);
  1030. }
  1031. }
  1032. r->data_block_and_keys_buffers.clear();
  1033. }
  1034. Status BlockBasedTableBuilder::Finish() {
  1035. Rep* r = rep_;
  1036. assert(r->state != Rep::State::kClosed);
  1037. bool empty_data_block = r->data_block.empty();
  1038. Flush();
  1039. if (r->state == Rep::State::kBuffered) {
  1040. EnterUnbuffered();
  1041. }
  1042. // To make sure the properties block records the accurate size of the index
  1043. // block, we finish writing all index entries first.
  1044. if (ok() && !empty_data_block) {
  1045. r->index_builder->AddIndexEntry(
  1046. &r->last_key, nullptr /* no next data block */, r->pending_handle);
  1047. }
  1048. // Write meta blocks, metaindex block and footer in the following order.
  1049. // 1. [meta block: filter]
  1050. // 2. [meta block: index]
  1051. // 3. [meta block: compression dictionary]
  1052. // 4. [meta block: range deletion tombstone]
  1053. // 5. [meta block: properties]
  1054. // 6. [metaindex block]
  1055. // 7. Footer
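// Together with the data blocks already emitted via Flush(), the finished
// file therefore looks like:
//   [data block 1] ... [data block N]
//   [filter] [index] [compression dict] [range-del] [properties]
//   [metaindex block] [footer]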
  1056. BlockHandle metaindex_block_handle, index_block_handle;
  1057. MetaIndexBuilder meta_index_builder;
  1058. WriteFilterBlock(&meta_index_builder);
  1059. WriteIndexBlock(&meta_index_builder, &index_block_handle);
  1060. WriteCompressionDictBlock(&meta_index_builder);
  1061. WriteRangeDelBlock(&meta_index_builder);
  1062. WritePropertiesBlock(&meta_index_builder);
  1063. if (ok()) {
  1064. // flush the meta index block
  1065. WriteRawBlock(meta_index_builder.Finish(), kNoCompression,
  1066. &metaindex_block_handle);
  1067. }
  1068. if (ok()) {
  1069. WriteFooter(metaindex_block_handle, index_block_handle);
  1070. }
  1071. if (r->file != nullptr) {
  1072. file_checksum_ = r->file->GetFileChecksum();
  1073. }
  1074. r->state = Rep::State::kClosed;
  1075. return r->status;
  1076. }
  1077. void BlockBasedTableBuilder::Abandon() {
  1078. assert(rep_->state != Rep::State::kClosed);
  1079. rep_->state = Rep::State::kClosed;
  1080. }
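// Illustrative sketch of the expected call pattern (the helper below is
// hypothetical and not used anywhere in RocksDB): keys must be added in
// increasing internal-key order, and exactly one of Finish() or Abandon()
// must run before the builder is destroyed.
namespace {
Status BuildTableForExample(
    BlockBasedTableBuilder* builder,
    const std::vector<std::pair<std::string, std::string>>& sorted_entries) {
  for (const auto& kv : sorted_entries) {
    builder->Add(kv.first, kv.second);
    if (!builder->status().ok()) {
      break;
    }
  }
  if (builder->status().ok()) {
    // Finish() writes the meta blocks, metaindex block and footer.
    return builder->Finish();
  }
  // Abandon() closes the builder without finalizing the partially written file.
  builder->Abandon();
  return builder->status();
}
}  // namespace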
  1081. uint64_t BlockBasedTableBuilder::NumEntries() const {
  1082. return rep_->props.num_entries;
  1083. }
  1084. uint64_t BlockBasedTableBuilder::FileSize() const { return rep_->offset; }
  1085. bool BlockBasedTableBuilder::NeedCompact() const {
  1086. for (const auto& collector : rep_->table_properties_collectors) {
  1087. if (collector->NeedCompact()) {
  1088. return true;
  1089. }
  1090. }
  1091. return false;
  1092. }
  1093. TableProperties BlockBasedTableBuilder::GetTableProperties() const {
  1094. TableProperties ret = rep_->props;
  1095. for (const auto& collector : rep_->table_properties_collectors) {
  1096. for (const auto& prop : collector->GetReadableProperties()) {
  1097. ret.readable_properties.insert(prop);
  1098. }
  1099. collector->Finish(&ret.user_collected_properties);
  1100. }
  1101. return ret;
  1102. }
  1103. const char* BlockBasedTableBuilder::GetFileChecksumFuncName() const {
  1104. if (rep_->file != nullptr) {
  1105. return rep_->file->GetFileChecksumFuncName();
  1106. } else {
  1107. return kUnknownFileChecksumFuncName.c_str();
  1108. }
  1109. }
  1110. const std::string BlockBasedTable::kFilterBlockPrefix = "filter.";
  1111. const std::string BlockBasedTable::kFullFilterBlockPrefix = "fullfilter.";
  1112. const std::string BlockBasedTable::kPartitionedFilterBlockPrefix =
  1113. "partitionedfilter.";
  1114. } // namespace ROCKSDB_NAMESPACE