block_based_table_builder.cc

  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  9. #include "table/block_based/block_based_table_builder.h"
  10. #include <atomic>
  11. #include <cassert>
  12. #include <cstdio>
  13. #include <list>
  14. #include <map>
  15. #include <memory>
  16. #include <numeric>
  17. #include <string>
  18. #include <unordered_map>
  19. #include <utility>
  20. #include "block_cache.h"
  21. #include "cache/cache_entry_roles.h"
  22. #include "cache/cache_helpers.h"
  23. #include "cache/cache_key.h"
  24. #include "cache/cache_reservation_manager.h"
  25. #include "db/dbformat.h"
  26. #include "index_builder.h"
  27. #include "logging/logging.h"
  28. #include "memory/memory_allocator_impl.h"
  29. #include "options/options_helper.h"
  30. #include "rocksdb/cache.h"
  31. #include "rocksdb/comparator.h"
  32. #include "rocksdb/env.h"
  33. #include "rocksdb/filter_policy.h"
  34. #include "rocksdb/flush_block_policy.h"
  35. #include "rocksdb/merge_operator.h"
  36. #include "rocksdb/table.h"
  37. #include "rocksdb/types.h"
  38. #include "table/block_based/block.h"
  39. #include "table/block_based/block_based_table_factory.h"
  40. #include "table/block_based/block_based_table_reader.h"
  41. #include "table/block_based/block_builder.h"
  42. #include "table/block_based/filter_block.h"
  43. #include "table/block_based/filter_policy_internal.h"
  44. #include "table/block_based/full_filter_block.h"
  45. #include "table/block_based/partitioned_filter_block.h"
  46. #include "table/block_based/user_defined_index_wrapper.h"
  47. #include "table/format.h"
  48. #include "table/meta_blocks.h"
  49. #include "table/table_builder.h"
  50. #include "util/bit_fields.h"
  51. #include "util/coding.h"
  52. #include "util/compression.h"
  53. #include "util/defer.h"
  54. #include "util/semaphore.h"
  55. #include "util/stop_watch.h"
  56. #include "util/string_util.h"
  57. namespace ROCKSDB_NAMESPACE {
  58. extern const std::string kHashIndexPrefixesBlock;
  59. extern const std::string kHashIndexPrefixesMetadataBlock;
  60. // Without an anonymous namespace here, we trigger the -Wmissing-prototypes warning
  61. namespace {
  62. constexpr size_t kBlockTrailerSize = BlockBasedTable::kBlockTrailerSize;
  63. // Create a filter block builder based on its type.
  64. FilterBlockBuilder* CreateFilterBlockBuilder(
  65. const ImmutableCFOptions& /*opt*/, const MutableCFOptions& mopt,
  66. const FilterBuildingContext& context,
  67. const bool use_delta_encoding_for_index_values,
  68. PartitionedIndexBuilder* const p_index_builder, size_t ts_sz,
  69. const bool persist_user_defined_timestamps) {
  70. const BlockBasedTableOptions& table_opt = context.table_options;
  71. assert(table_opt.filter_policy); // precondition
  72. FilterBitsBuilder* filter_bits_builder =
  73. BloomFilterPolicy::GetBuilderFromContext(context);
  74. if (filter_bits_builder == nullptr) {
  75. return nullptr;
  76. } else {
  77. if (table_opt.partition_filters) {
  78. assert(p_index_builder != nullptr);
  79. // Since it takes time from when the filter builder requests a partition cut
  80. // until the index builder actually cuts the partition (at the end of a
  81. // data block, potentially with many keys), we take the lower bound as the
  82. // partition size.
  83. assert(table_opt.block_size_deviation <= 100);
  84. auto partition_size =
  85. static_cast<uint32_t>(((table_opt.metadata_block_size *
  86. (100 - table_opt.block_size_deviation)) +
  87. 99) /
  88. 100);
  89. partition_size = std::max(partition_size, static_cast<uint32_t>(1));
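// Illustrative arithmetic for the expression above, assuming hypothetical
// values metadata_block_size = 4096 and block_size_deviation = 10:
// partition_size = (4096 * (100 - 10) + 99) / 100 = (368640 + 99) / 100 = 3687,
// i.e. a ceiling division that takes the lower bound of the usable portion of
// a metadata block as the filter partition size.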
  90. return new PartitionedFilterBlockBuilder(
  91. mopt.prefix_extractor.get(), table_opt.whole_key_filtering,
  92. filter_bits_builder, table_opt.index_block_restart_interval,
  93. use_delta_encoding_for_index_values, p_index_builder, partition_size,
  94. ts_sz, persist_user_defined_timestamps,
  95. table_opt.decouple_partitioned_filters);
  96. } else {
  97. return new FullFilterBlockBuilder(mopt.prefix_extractor.get(),
  98. table_opt.whole_key_filtering,
  99. filter_bits_builder);
  100. }
  101. }
  102. }
  103. } // namespace
  104. // kBlockBasedTableMagicNumber was picked by running
  105. // echo rocksdb.table.block_based | sha1sum
  106. // and taking the leading 64 bits.
  107. // Please note that kBlockBasedTableMagicNumber may also be accessed by
  108. // other .cc files.
  109. // For that reason we declare it extern in the header, but to get the
  110. // space allocated,
  111. // it must be defined non-extern in exactly one place.
  112. const uint64_t kBlockBasedTableMagicNumber = 0x88e241b785f4cff7ull;
  113. // We also support reading and writing legacy block based table format (for
  114. // backwards compatibility)
  115. const uint64_t kLegacyBlockBasedTableMagicNumber = 0xdb4775248b80fb57ull;
  116. // A collector that collects properties of interest to block-based table.
  117. // For now this class looks heavy-weight since we only write one additional
  118. // property.
  119. // But in the foreseeable future, we will add more and more properties that are
  120. // specific to block-based tables.
  121. class BlockBasedTableBuilder::BlockBasedTablePropertiesCollector
  122. : public InternalTblPropColl {
  123. public:
  124. explicit BlockBasedTablePropertiesCollector(
  125. BlockBasedTableOptions::IndexType index_type, bool whole_key_filtering,
  126. bool prefix_filtering, bool decoupled_partitioned_filters)
  127. : index_type_(index_type),
  128. whole_key_filtering_(whole_key_filtering),
  129. prefix_filtering_(prefix_filtering),
  130. decoupled_partitioned_filters_(decoupled_partitioned_filters) {}
  131. Status InternalAdd(const Slice& /*key*/, const Slice& /*value*/,
  132. uint64_t /*file_size*/) override {
  133. // Intentionally left blank. Have no interest in collecting stats for
  134. // individual key/value pairs.
  135. return Status::OK();
  136. }
  137. void BlockAdd(uint64_t /* block_uncomp_bytes */,
  138. uint64_t /* block_compressed_bytes_fast */,
  139. uint64_t /* block_compressed_bytes_slow */) override {
  140. // Intentionally left blank. No interest in collecting stats for
  141. // blocks.
  142. }
  143. Status Finish(UserCollectedProperties* properties) override {
  144. std::string val;
  145. PutFixed32(&val, static_cast<uint32_t>(index_type_));
  146. properties->insert({BlockBasedTablePropertyNames::kIndexType, val});
  147. properties->insert({BlockBasedTablePropertyNames::kWholeKeyFiltering,
  148. whole_key_filtering_ ? kPropTrue : kPropFalse});
  149. properties->insert({BlockBasedTablePropertyNames::kPrefixFiltering,
  150. prefix_filtering_ ? kPropTrue : kPropFalse});
  151. if (decoupled_partitioned_filters_) {
  152. properties->insert(
  153. {BlockBasedTablePropertyNames::kDecoupledPartitionedFilters,
  154. kPropTrue});
  155. }
  156. return Status::OK();
  157. }
  158. // The name of the properties collector can be used for debugging purposes.
  159. const char* Name() const override {
  160. return "BlockBasedTablePropertiesCollector";
  161. }
  162. UserCollectedProperties GetReadableProperties() const override {
  163. // Intentionally left blank.
  164. return UserCollectedProperties();
  165. }
  166. private:
  167. BlockBasedTableOptions::IndexType index_type_;
  168. bool whole_key_filtering_;
  169. bool prefix_filtering_;
  170. bool decoupled_partitioned_filters_;
  171. };
  172. struct BlockBasedTableBuilder::WorkingAreaPair {
  173. Compressor::ManagedWorkingArea compress;
  174. Decompressor::ManagedWorkingArea verify;
  175. };
  176. // ParallelCompressionRep essentially defines a framework for parallelizing
  177. // block generation ("emit"), block compression, and block writing to storage.
  178. // The synchronization is lock-free/wait-free, so thread waiting only happens
  179. // when work-order dependencies are unsatisfied, though sleeping/idle threads
  180. // might be kept idle when it seems unlikely they would improve throughput by
  181. // waking them up (essentially auto-tuned parallelism). But because all threads
  182. // are capable of 2 out of 3 kinds of work, in a quasi-work-stealing system,
  183. // running threads can usually expect that compatible work is available.
  184. //
  185. // This is currently activated with CompressionOptions::parallel_threads > 1
  186. // but that is a somewhat crude API that would ideally be adapted along with
  187. // the implementation in the future to allow threads to serve multiple
  188. // flush/compaction jobs, though the available improvement might be small.
  189. // Even within the scope of a single file it might be nice to use a general
  190. // framework for distributing work across threads, but (a) different threads
  191. // are limited to which work they can do because of technical challenges, (b)
  192. // being largely CPU bound on small work units means such a framework would
  193. // likely have big overheads compared to this hand-optimized solution.
  194. struct BlockBasedTableBuilder::ParallelCompressionRep {
  195. // The framework has two kinds of threads: the calling thread from
  196. // flush/compaction/SstFileWriter is called the "emit thread" (kEmitter).
  197. // Other threads cannot generally take over "emit" work because that is
  198. // largely happening up the call stack from BlockBasedTableBuilder.
  199. // The emit thread can also take on compression work in a quasi-work-stealing
  200. // manner when the buffer for emitting new blocks is full.
  201. //
  202. // When parallelism is enabled, there are also "worker" threads that
  203. // can handle compressing blocks and (one worker thread at a time) write them
  204. // to the SST file (and handle other single-threaded wrap-up of each block).
  205. //
  206. // NOTE: when parallelism is enabled, the emit thread is not permitted to
  207. // write to the SST file because that is the potential "output" bottleneck,
  208. // and it's generally bad for parallelism to allow the only thread that can
  209. // serve the "input" bottleneck (emit work) to also spend exclusive time on
  210. // the output bottleneck.
  211. enum class ThreadKind {
  212. kEmitter,
  213. kWorker,
  214. };
  215. // ThreadState allows each thread to track its work assignment. In addition
  216. // to the work states already mentioned (kEmitting, kCompressing, and
  217. // kWriting to the SST file writer):
  218. // * Threads can enter the kIdle state so that they can sleep when no work is
  219. // available for them, to be woken up when appropriate.
  220. // * The kEnd state means the thread is not doing any more work items, which
  221. // for worker threads means they will end soon.
  222. // * The kCompressingAndWriting state means a worker can compress and write a
  223. // block without additional state updates because the same block to be
  224. // compressed is the next to be written.
  225. enum class ThreadState {
  226. /* BEGIN Emitter only states */
  227. kEmitting,
  228. /* END Emitter only states */
  229. /* BEGIN states for emitter and worker */
  230. kIdle,
  231. kCompressing,
  232. kEnd,
  233. /* END states for emitter and worker */
  234. /* BEGIN Worker only states */
  235. kCompressingAndWriting,
  236. kWriting,
  237. /* END Worker only states */
  238. };
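// Rough transition summary, as realized by StateTransition() below: the emit
// thread cycles through kEmitting (when the ring buffer has room),
// kCompressing (when blocks are waiting for compression), and kIdle
// (otherwise), reaching kEnd via SetNoMoreToEmit() or an abort. A worker
// thread prefers kWriting when the next block to write is ready, then
// kCompressing (or kCompressingAndWriting when the block it compresses is
// also the next to write), then kIdle, and reaches kEnd when no more blocks
// will be emitted or on abort.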
  239. // BlockRep instances are used and reused in a ring buffer (below), so that
  240. // many blocks can be in an intermediate state between serialized into
  241. // uncompressed bytes and written to the SST file. Notably, each block is
  242. // "emitted" in uncompressed form into a BlockRep, compressed (at least
  243. // attempted, when configured) for updated BlockRep, and then written from the
  244. // BlockRep to the writer for the SST file bytes.
  245. struct ALIGN_AS(CACHE_LINE_SIZE) BlockRep {
  246. // Uncompressed block contents
  247. std::string uncompressed;
  248. GrowableBuffer compressed;
  249. CompressionType compression_type = kNoCompression;
  250. std::unique_ptr<IndexBuilder::PreparedIndexEntry> prepared_index_entry;
  251. };
  252. // Ring buffer of emitted blocks that may or may not yet be compressed.
  253. std::unique_ptr<BlockRep[]> ring_buffer;
  254. // log_2(ring buffer size), where ring buffer size must be a power of two
  255. const int ring_buffer_nbits;
  256. // ring buffer size - 1, to function as a bit mask for ring buffer positions
  257. // (e.g. given the ordinal number of a block)
  258. const uint32_t ring_buffer_mask;
  259. // Number of threads in worker_threads. (Emit thread doesn't count)
  260. const uint32_t num_worker_threads;
  261. // Rough upper bound on the sst file size contribution from blocks emitted
  262. // into the parallel compression ring buffer but not yet written. Tracks
  263. // uncompressed size, with trailer, until a block is compressed, then
  264. // compressed size until the block is written. (TODO: does not currently
  265. // account for block_align)
  266. RelaxedAtomic<uint64_t> estimated_inflight_size{0};
  267. // Thread objects for worker threads
  268. std::vector<port::Thread> worker_threads;
  269. // Working areas for data_block_compressor for each worker thread
  270. std::vector<WorkingAreaPair> working_areas;
  271. // Semaphores for threads to sleep when there's no available work for them
  272. // and to wake back up when someone determines there is available work (most
  273. // likely). Split between worker threads and emit thread because they can do
  274. // different kinds of work.
  275. CountingSemaphore idle_worker_sem{0};
  276. BinarySemaphore idle_emit_sem{0};
  277. // Primary atomic state of parallel compression, which includes a number of
  278. // state fields that are best updated atomically to avoid locking and/or to
  279. // simplify the interesting interleavings that have to be considered and
  280. // accommodated.
  281. struct State : public BitFields<uint64_t, State> {};
  282. ALIGN_AS(CACHE_LINE_SIZE) AcqRelBitFieldsAtomic<State> atomic_state;
  283. // The first field is a bit for each ring buffer slot (max 32) for whether
  284. // that slot is ready to be claimed for writing by a worker thread. Because
  285. // compressions might finish out-of-order, we need to track individually
  286. // whether they are finished, though this field doesn't differentiate
  287. // "compression completed" from "compression not started" because that can be
  288. // inferred from NextToCompress. A block might not enter this state, because
  289. // the same thread that compresses it can also immediately write the block if
  290. // it notices that the block is next to write.
  291. using NeedsWriter = UnsignedBitField<State, 32, NoPrevBitField>;
  292. // Track how many worker threads are in an idle state because there was no
  293. // available work and haven't been selected to wake back up.
  294. using IdleWorkerCount = UnsignedBitField<State, 5, NeedsWriter>;
  295. // Track whether the emit thread is in an idle state because there was no
  296. // available work and hasn't been triggered to wake back up. The nature of
  297. // available work and atomic CAS assignment of work ensures at least one
  298. // thread is kept out of the idle state.
  299. using IdleEmitFlag = BoolBitField<State, IdleWorkerCount>;
  300. // Track whether threads should end when they finish available work because no
  301. // more blocks will be emitted.
  302. using NoMoreToEmitFlag = BoolBitField<State, IdleEmitFlag>;
  303. // Track whether threads should abort ASAP because of an error.
  304. using AbortFlag = BoolBitField<State, NoMoreToEmitFlag>;
  305. // Track three "NextTo" counters for the positions of the next block to write,
  306. // to start compression, and to emit into the ring buffer. If these counters
  307. // never overflowed / wrapped around, we would have next_to_write <=
  308. // next_to_compress <= next_to_emit because a block must be emitted before
  309. // compressed, and compressed (at least attempted) before writing. We need to
  310. // track more than ring_buffer_nbits of these counters to be able to
  311. // distinguish an empty ring buffer (next_to_write == next_to_emit) from a
  312. // full ring buffer (next_to_write != next_to_emit but equal under
  313. // ring_buffer_mask).
  314. using NextToWrite = UnsignedBitField<State, 8, AbortFlag>;
  315. using NextToCompress = UnsignedBitField<State, 8, NextToWrite>;
  316. using NextToEmit = UnsignedBitField<State, 8, NextToCompress>;
  317. static_assert(NextToEmit::kEndBit == 64);
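// Sketch of the resulting 64-bit State layout, as implied by the field
// declarations above (low bits first): NeedsWriter occupies bits 0-31,
// IdleWorkerCount bits 32-36, IdleEmitFlag bit 37, NoMoreToEmitFlag bit 38,
// AbortFlag bit 39, NextToWrite bits 40-47, NextToCompress bits 48-55, and
// NextToEmit bits 56-63 (32 + 5 + 1 + 1 + 1 + 8 + 8 + 8 = 64). Because the
// "NextTo" counters keep 8 bits while the ring buffer has at most 32 slots,
// NextToWrite == NextToEmit means the ring buffer is empty, while a
// difference equal to the ring buffer size (e.g. 8 when ring_buffer_nbits
// == 3) means it is full.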
  318. // BEGIN fields for use by the emit thread only. These can't live on the stack
  319. // because the emit thread frequently returns out of BlockBasedTableBuilder.
  320. ALIGN_AS(CACHE_LINE_SIZE)
  321. ThreadState emit_thread_state = ThreadState::kEmitting;
  322. // Ring buffer index that emit thread is operating on (for emitting and
  323. // compressing states)
  324. uint32_t emit_slot = 0;
  325. // Including some data to inform when to wake up idle worker threads (see
  326. // implementation for details)
  327. int32_t emit_counter_toward_wake_up = 0;
  328. int32_t emit_counter_for_wake_up = 0;
  329. static constexpr int32_t kMaxWakeupInterval = 8;
  330. // END fields for use by the emit thread only
  331. // TSAN on GCC has bugs that report false positives on this watchdog code.
  332. // Other efforts to work around the bug have failed, so to avoid those false
  333. // positive reports, we simply disable the watchdog when running under GCC
  334. // TSAN.
  335. #if !defined(NDEBUG) && !(defined(__GNUC__) && defined(__SANITIZE_THREAD__))
  336. #define BBTB_PC_WATCHDOG 1
  337. #endif
  338. #ifdef BBTB_PC_WATCHDOG
  339. // These are for an extra "watchdog" thread in DEBUG builds that heuristically
  340. // checks for the most likely deadlock conditions. False positives and false
  341. // negatives are technically possible.
  342. std::thread watchdog_thread;
  343. std::mutex watchdog_mutex;
  344. std::condition_variable watchdog_cv;
  345. bool shutdown_watchdog = false;
  346. RelaxedAtomic<uint32_t> live_workers{0};
  347. RelaxedAtomic<uint32_t> idling_workers{0};
  348. RelaxedAtomic<bool> live_emit{0};
  349. RelaxedAtomic<bool> idling_emit{0};
  350. #endif // BBTB_PC_WATCHDOG
  351. int ComputeRingBufferNbits(uint32_t parallel_threads) {
  352. // Ring buffer size is a power of two not to exceed 32 but otherwise
  353. // at least twice the number of threads.
  354. if (parallel_threads >= 9) {
  355. return 5;
  356. } else if (parallel_threads >= 5) {
  357. return 4;
  358. } else if (parallel_threads >= 3) {
  359. return 3;
  360. } else {
  361. assert(parallel_threads > 1);
  362. return 2;
  363. }
  364. }
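// Illustrative mapping implied by the branches above (assuming
// parallel_threads > 1, as asserted): 2 threads -> 4 slots, 3-4 threads ->
// 8 slots, 5-8 threads -> 16 slots, 9+ threads -> 32 slots. Each size is a
// power of two, at least twice the thread count until the 32-slot cap, which
// keeps per-slot readiness within the 32-bit NeedsWriter field.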
  365. explicit ParallelCompressionRep(uint32_t parallel_threads)
  366. : ring_buffer_nbits(ComputeRingBufferNbits(parallel_threads)),
  367. ring_buffer_mask((uint32_t{1} << ring_buffer_nbits) - 1),
  368. num_worker_threads(std::min(parallel_threads, ring_buffer_mask)) {
  369. assert(num_worker_threads <= IdleWorkerCount::kMask);
  370. ring_buffer = std::make_unique<BlockRep[]>(ring_buffer_mask + 1);
  371. // Start by aggressively waking up idle workers
  372. emit_counter_for_wake_up = -static_cast<int32_t>(num_worker_threads);
  373. }
  374. ~ParallelCompressionRep() {
  375. #ifndef NDEBUG
  376. auto state = atomic_state.Load();
  377. if (state.Get<AbortFlag>() == false) {
  378. // Should be clear / cancelled out with normal shutdown
  379. assert(state.Get<NeedsWriter>() == 0);
  380. // Ring buffer reached empty state
  381. assert(state.Get<NextToWrite>() == state.Get<NextToCompress>());
  382. assert(state.Get<NextToCompress>() == state.Get<NextToEmit>());
  383. // Everything cancels out in inflight size
  384. assert(estimated_inflight_size.LoadRelaxed() == 0);
  385. }
  386. // All idling metadata cleaned up, properly tracked
  387. assert(state.Get<IdleWorkerCount>() == 0);
  388. assert(state.Get<IdleEmitFlag>() == false);
  389. // No excess in semaphores
  390. assert(!idle_emit_sem.TryAcquire());
  391. assert(!idle_worker_sem.TryAcquire());
  392. #endif // !NDEBUG
  393. }
  394. // The primary function for a thread transitioning from one state or work
  395. // assignment to the next. `slot` refers to a position in the ring buffer
  396. // for assigned emit, compression, or write work.
  397. //
  398. // Because both the emit thread and worker threads can work on compression,
  399. // this is a quasi-work-stealing parallel algorithm. (Enabling other threads
  400. // to do emit work would be quite challenging, and allowing the emit thread
  401. // to handle writes could create a bottleneck.)
  402. //
  403. // This function is basically a CAS loop trying to pick the next piece of work
  404. // for this thread and retrying if CAS fails. This function also handles
  405. // thread idling when that's the appropriate assignment, continuing the loop
  406. // looking for productive work when woken from an idle state.
  407. //
  408. // Precondition: thread_state is appropriate for thread_kind and not kEnd. It
  409. // must match the previously returned state for that thread, and is only kIdle
  410. // for the thread on startup (though the kIdle state is also used internally
  411. // by the function).
  412. //
  413. // Postcondition: thread_state is appropriate for thread_kind and not kIdle.
  414. // Except for kEnd state, the calling thread has exclusive access to
  415. // ring_buffer[slot] until next StateTransition().
  416. template <ThreadKind thread_kind>
  417. void StateTransition(
  418. /*in/out*/ ThreadState& thread_state,
  419. /*in/out*/ uint32_t& slot) {
  420. assert(slot <= ring_buffer_mask);
  421. // Last known value for atomic_state
  422. State seen_state = atomic_state.Load();
  423. for (;;) {
  424. if (seen_state.Get<AbortFlag>()) {
  425. thread_state = ThreadState::kEnd;
  426. return;
  427. }
  428. assert(static_cast<uint8_t>(seen_state.Get<NextToEmit>() -
  429. seen_state.Get<NextToCompress>()) <=
  430. ring_buffer_mask + 1);
  431. assert(static_cast<uint8_t>(seen_state.Get<NextToCompress>() -
  432. seen_state.Get<NextToWrite>()) <=
  433. ring_buffer_mask + 1);
  434. assert(static_cast<uint8_t>(seen_state.Get<NextToEmit>() -
  435. seen_state.Get<NextToWrite>()) <=
  436. ring_buffer_mask + 1);
  437. // Draft of the next proposed atomic_state. Start by marking completion of
  438. // the current thread's last work.
  439. State next_state = seen_state;
  440. bool wake_idle = false;
  441. switch (thread_state) {
  442. case ThreadState::kEmitting: {
  443. assert(thread_kind == ThreadKind::kEmitter);
  444. assert(slot == (next_state.Get<NextToEmit>() & ring_buffer_mask));
  445. next_state.Ref<NextToEmit>() += 1;
  446. // Check whether to wake up idle worker thread
  447. if (next_state.Get<IdleWorkerCount>() > 0 &&
  448. // The number of blocks for which compression hasn't started
  449. // is well over the number of active threads.
  450. static_cast<uint8_t>(next_state.Get<NextToEmit>() -
  451. next_state.Get<NextToCompress>()) >=
  452. (ring_buffer_mask + 1) / 4 +
  453. (num_worker_threads -
  454. next_state.Get<IdleWorkerCount>())) {
  455. // At first, emit_counter_for_wake_up is negative to aggressively
  456. // wake up idle worker threads. Then it backs off the interval at
  457. // which we wake up, up to some maximum that attempts to balance
  458. // maximum throughput and minimum CPU overhead.
  459. if (emit_counter_toward_wake_up >= emit_counter_for_wake_up) {
  460. // We reached a threshold to justify a wake-up.
  461. wake_idle = true;
  462. // Adjust idle count assuming we are going to own waking it up,
  463. // so no one else can duplicate that. (The idle count is really
  464. // the number idling for which no one yet owns waking them up.)
  465. next_state.Ref<IdleWorkerCount>() -= 1;
  466. // Reset the counter toward the threshold for wake-up
  467. emit_counter_toward_wake_up = 0;
  468. // Raise the threshold (up to some limit) to stabilize the number
  469. // of active threads after some ramp-up period.
  470. emit_counter_for_wake_up =
  471. std::min(emit_counter_for_wake_up + 1,
  472. static_cast<int32_t>(num_worker_threads +
  473. kMaxWakeupInterval));
  474. } else {
  475. // Advance closer to the threshold for justifying a wake-up
  476. emit_counter_toward_wake_up++;
  477. }
  478. }
  479. break;
  480. }
  481. case ThreadState::kIdle:
  482. // NOTE: the thread that signalled this one to wake up already updated the
  483. // idle count or marker. This is required to avoid overflow on the semaphore,
  484. // especially the binary semaphore for idle_emit_sem, and likely
  485. // desirable to avoid spurious/extra Release().
  486. break;
  487. case ThreadState::kCompressing:
  488. next_state.Ref<NeedsWriter>() |= uint32_t{1} << slot;
  489. if constexpr (thread_kind == ThreadKind::kEmitter) {
  490. if (next_state.Get<IdleWorkerCount>() == num_worker_threads) {
  491. // Work is available for a worker thread and none are running
  492. wake_idle = true;
  493. // Adjust idle count assuming we are going to own waking it up
  494. next_state.Ref<IdleWorkerCount>() -= 1;
  495. }
  496. }
  497. break;
  498. case ThreadState::kEnd:
  499. // Should have already recognized the end state
  500. assert(thread_state != ThreadState::kEnd);
  501. return;
  502. case ThreadState::kCompressingAndWriting:
  503. case ThreadState::kWriting:
  504. assert(thread_kind == ThreadKind::kWorker);
  505. assert((next_state.Get<NextToWrite>() & ring_buffer_mask) == slot);
  506. assert(next_state.Get<NextToCompress>() !=
  507. next_state.Get<NextToWrite>());
  508. assert(next_state.Get<NextToEmit>() != next_state.Get<NextToWrite>());
  509. assert((next_state.Get<NeedsWriter>() & (uint32_t{1} << slot)) == 0);
  510. next_state.Ref<NextToWrite>() += 1;
  511. if (next_state.Get<IdleEmitFlag>()) {
  512. wake_idle = true;
  513. // Clear idle emit flag assuming we are going to own waking it up
  514. next_state.Set<IdleEmitFlag>(false);
  515. }
  516. break;
  517. }
  518. // Find the next state, depending on the kind of thread
  519. ThreadState next_thread_state = ThreadState::kEnd;
  520. uint32_t next_slot = 0;
  521. if constexpr (thread_kind == ThreadKind::kEmitter) {
  522. // First priority is emitting more uncompressed blocks, if there's
  523. // room in the ring buffer.
  524. if (static_cast<uint8_t>(next_state.Get<NextToEmit>() -
  525. next_state.Get<NextToWrite>()) <=
  526. ring_buffer_mask) {
  527. // There is room
  528. next_thread_state = ThreadState::kEmitting;
  529. next_slot = next_state.Get<NextToEmit>() & ring_buffer_mask;
  530. }
  531. }
  532. if constexpr (thread_kind == ThreadKind::kWorker) {
  533. // First priority is writing next block to write, if it needs a writer
  534. // assigned to it
  535. uint32_t next_to_write_slot =
  536. next_state.Get<NextToWrite>() & ring_buffer_mask;
  537. uint32_t needs_writer_bit = uint32_t{1} << next_to_write_slot;
  538. if (next_state.Get<NeedsWriter>() & needs_writer_bit) {
  539. // Clear the "needs writer" marker on the slot
  540. next_state.Ref<NeedsWriter>() &= ~needs_writer_bit;
  541. // Take ownership of writing it
  542. next_thread_state = ThreadState::kWriting;
  543. next_slot = next_to_write_slot;
  544. }
  545. }
  546. // If didn't find higher priority work
  547. if (next_thread_state == ThreadState::kEnd) {
  548. if (next_state.Get<NextToCompress>() != next_state.Get<NextToEmit>()) {
  549. // Compression work is available, select that
  550. if (thread_kind == ThreadKind::kWorker &&
  551. next_state.Get<NextToCompress>() ==
  552. next_state.Get<NextToWrite>()) {
  553. next_thread_state = ThreadState::kCompressingAndWriting;
  554. } else {
  555. next_thread_state = ThreadState::kCompressing;
  556. }
  557. next_slot = next_state.Get<NextToCompress>() & ring_buffer_mask;
  558. next_state.Ref<NextToCompress>() += 1;
  559. } else if constexpr (thread_kind == ThreadKind::kEmitter) {
  560. // Emitter thread goes idle
  561. next_thread_state = ThreadState::kIdle;
  562. assert(next_state.Get<IdleEmitFlag>() == false);
  563. assert(next_state.Get<NoMoreToEmitFlag>() == false);
  564. next_state.Set<IdleEmitFlag>(true);
  565. } else if (next_state.Get<NoMoreToEmitFlag>()) {
  566. // Worker thread shall not idle if we are done emitting. At least
  567. // one worker will remain unblocked to finish writing
  568. next_thread_state = ThreadState::kEnd;
  569. } else {
  570. // Worker thread goes idle
  571. next_thread_state = ThreadState::kIdle;
  572. assert(next_state.Get<IdleWorkerCount>() < IdleWorkerCount::kMask);
  573. next_state.Ref<IdleWorkerCount>() += 1;
  574. }
  575. }
  576. assert(thread_state != ThreadState::kEnd);
  577. // Attempt to atomically apply the desired/computed state transition
  578. if (atomic_state.CasWeak(seen_state, next_state)) {
  579. // Success
  580. thread_state = next_thread_state;
  581. slot = next_slot;
  582. seen_state = next_state;
  583. if (wake_idle) {
  584. if constexpr (thread_kind == ThreadKind::kEmitter) {
  585. idle_worker_sem.Release();
  586. } else {
  587. idle_emit_sem.Release();
  588. }
  589. }
  590. if (thread_state != ThreadState::kIdle) {
  591. // Successfully transitioned to another useful state
  592. return;
  593. }
  594. // Handle idle state
  595. if constexpr (thread_kind == ThreadKind::kEmitter) {
  596. #ifdef BBTB_PC_WATCHDOG
  597. idling_emit.StoreRelaxed(true);
  598. Defer decr{[this]() { idling_emit.StoreRelaxed(false); }};
  599. #endif // BBTB_PC_WATCHDOG
  600. // Likely go to sleep
  601. idle_emit_sem.Acquire();
  602. } else {
  603. #ifdef BBTB_PC_WATCHDOG
  604. // Tracking for watchdog
  605. idling_workers.FetchAddRelaxed(1);
  606. Defer decr{[this]() { idling_workers.FetchSubRelaxed(1); }};
  607. #endif // BBTB_PC_WATCHDOG
  608. // Likely go to sleep
  609. idle_worker_sem.Acquire();
  610. }
  611. // Update state after sleep
  612. seen_state = atomic_state.Load();
  613. }
  614. // else loop and try again
  615. }
  616. }
  617. void EmitterStateTransition(
  618. /*in/out*/ ThreadState& thread_state,
  619. /*in/out*/ uint32_t& slot) {
  620. StateTransition<ThreadKind::kEmitter>(thread_state, slot);
  621. }
  622. void WorkerStateTransition(
  623. /*in/out*/ ThreadState& thread_state,
  624. /*in/out*/ uint32_t& slot) {
  625. StateTransition<ThreadKind::kWorker>(thread_state, slot);
  626. }
  627. // Exactly wake all idling threads (for an end state)
  628. void WakeAllIdle() {
  629. State old_state, new_state;
  630. auto transform =
  631. IdleEmitFlag::ClearTransform() + IdleWorkerCount::ClearTransform();
  632. atomic_state.Apply(transform, &old_state, &new_state);
  633. assert(new_state.Get<IdleEmitFlag>() == false);
  634. assert(new_state.Get<IdleWorkerCount>() == 0);
  635. if (old_state.Get<IdleEmitFlag>()) {
  636. idle_emit_sem.Release();
  637. }
  638. idle_worker_sem.Release(old_state.Get<IdleWorkerCount>());
  639. }
  640. // Called by emit thread if it is decided no more blocks will be emitted into
  641. // this SST file.
  642. void SetNoMoreToEmit(/*in/out*/ ThreadState& thread_state,
  643. /*in/out*/ uint32_t& slot) {
  644. (void)slot;
  645. State old_state;
  646. atomic_state.Apply(NoMoreToEmitFlag::SetTransform(), &old_state);
  647. assert(old_state.Get<NoMoreToEmitFlag>() == false);
  648. assert(slot == BitwiseAnd(old_state.Get<NextToEmit>(), ring_buffer_mask));
  649. assert(thread_state == ThreadState::kEmitting);
  650. thread_state = ThreadState::kEnd;
  651. WakeAllIdle();
  652. }
  653. // Called by any thread to abort parallel compression, etc. because of an
  654. // error.
  655. void SetAbort(/*in/out*/ ThreadState& thread_state) {
  656. State old_state;
  657. atomic_state.Apply(AbortFlag::SetTransform(), &old_state);
  658. if (old_state.Get<AbortFlag>() == false) {
  659. // First to set abort. Wake all workers and emitter
  660. WakeAllIdle();
  661. }
  662. thread_state = ThreadState::kEnd;
  663. }
  664. #ifdef BBTB_PC_WATCHDOG
  665. // Logic for the extra "watchdog" thread in DEBUG builds that heuristically
  666. // checks for the most likely deadlock conditions.
  667. //
  668. // Some ways to manually validate the watchdog:
  669. // * Insert
  670. // if (Random::GetTLSInstance()->OneIn(100)) {
  671. // sleep(100);
  672. // }
  673. // after either of the calls to semaphore Acquire above.
  674. // * Miss some Release()s in WakeAllIdle()
  675. //
  676. // and run table_test unit tests.
  677. void BGWatchdog() {
  678. int count_toward_deadlock_judgment = 0;
  679. for (;;) {
  680. // Check for termination condition: All workers and emit thread have
  681. // completed.
  682. if (live_workers.LoadRelaxed() == 0 && live_emit.LoadRelaxed() == false) {
  683. return;
  684. }
  685. // Check for potential deadlock condition
  686. if (idling_workers.LoadRelaxed() < live_workers.LoadRelaxed() ||
  687. (live_emit.LoadRelaxed() && !idling_emit.LoadRelaxed())) {
  688. // Someone is working, all good
  689. count_toward_deadlock_judgment = 0;
  690. } else {
  691. // Could be a deadlock state, but could also be a transient
  692. // state where someone has woken up but not cleared their idling flag.
  693. // Give it plenty of time and watchdog thread wake-ups before
  694. // declaring deadlock.
  695. count_toward_deadlock_judgment++;
  696. if (count_toward_deadlock_judgment >= 70) {
  697. fprintf(stderr,
  698. "Error: apparent deadlock in parallel compression. "
  699. "Aborting. %u / %u, %d / %d, %llx\n",
  700. (unsigned)idling_workers.LoadRelaxed(),
  701. (unsigned)live_workers.LoadRelaxed(),
  702. (int)idling_emit.LoadRelaxed(), (int)live_emit.LoadRelaxed(),
  703. (long long)atomic_state.Load().underlying);
  704. std::terminate();
  705. }
  706. }
  707. // Sleep for 1s at a time unless we are woken up because other threads
  708. // ended.
  709. std::unique_lock<std::mutex> lock(watchdog_mutex);
  710. if (!shutdown_watchdog) {
  711. watchdog_cv.wait_for(lock, std::chrono::seconds{1});
  712. }
  713. }
  714. }
  715. #endif // BBTB_PC_WATCHDOG
  716. };
  717. struct BlockBasedTableBuilder::Rep {
  718. const ImmutableOptions ioptions;
  719. // BEGIN from MutableCFOptions
  720. std::shared_ptr<const SliceTransform> prefix_extractor;
  721. // END from MutableCFOptions
  722. const WriteOptions write_options;
  723. const BlockBasedTableOptions table_options;
  724. const InternalKeyComparator& internal_comparator;
  725. // Size in bytes for the user-defined timestamps.
  726. size_t ts_sz;
  727. // When `ts_sz` > 0 and this flag is false, the user-defined timestamp in the
  728. // user key will be stripped when creating the block based table. This
  729. // stripping happens for all user keys, including the keys in data block,
  730. // index block for data block, index block for index block (if index type is
  731. // `kTwoLevelIndexSearch`), index for filter blocks (if using partitioned
  732. // filters), the `first_internal_key` in `IndexValue`, the `end_key` for range
  733. // deletion entries.
  734. // As long as the user keys are sorted when added via the `Add` API, their
  735. // logical ordering won't change after timestamps are stripped. However, for
  736. // each user key to be logically equivalent before and after its timestamp is
  737. // stripped, the user key should contain the minimum timestamp.
  738. bool persist_user_defined_timestamps;
  739. WritableFileWriter* file;
  740. // The current offset is only written by the current designated writer thread
  741. // but can be read by other threads to estimate current file size
  742. RelaxedAtomic<uint64_t> offset{0};
  743. size_t alignment;
  744. BlockBuilder data_block;
  745. // Buffers uncompressed data blocks to replay later. Needed when
  746. // compression dictionary is enabled so we can finalize the dictionary before
  747. // compressing any data blocks.
  748. std::vector<std::string> data_block_buffers;
  749. BlockBuilder range_del_block;
  750. InternalKeySliceTransform internal_prefix_transform;
  751. std::unique_ptr<IndexBuilder> index_builder;
  752. std::string index_separator_scratch;
  753. PartitionedIndexBuilder* p_index_builder_ = nullptr;
  754. std::string last_ikey; // Internal key or empty (unset)
  755. bool warm_cache = false;
  756. bool uses_explicit_compression_manager = false;
  757. uint64_t sample_for_compression;
  758. RelaxedAtomic<uint64_t> compressible_input_data_bytes{0};
  759. RelaxedAtomic<uint64_t> uncompressible_input_data_bytes{0};
  760. RelaxedAtomic<uint64_t> sampled_input_data_bytes{0};
  761. RelaxedAtomic<uint64_t> sampled_output_slow_data_bytes{0};
  762. RelaxedAtomic<uint64_t> sampled_output_fast_data_bytes{0};
  763. uint32_t compression_parallel_threads;
  764. int max_compressed_bytes_per_kb;
  765. size_t max_dict_sample_bytes = 0;
  766. // *** Compressors & decompressors - Yes, it seems like a lot here but ***
  767. // *** these are distinct fields to minimize extra conditionals and ***
  768. // *** field reads on hot code paths. ***
  769. // A compressor for blocks in general, without dictionary compression
  770. std::unique_ptr<Compressor> basic_compressor;
  771. // A compressor using dictionary compression (when applicable)
  772. std::unique_ptr<Compressor> compressor_with_dict;
  773. // Once configured/determined, points to one of the above Compressors to
  774. // use on data blocks.
  775. Compressor* data_block_compressor = nullptr;
  776. // A decompressor corresponding to basic_compressor (when non-nullptr).
  777. // Used for verification and cache warming.
  778. std::shared_ptr<Decompressor> basic_decompressor;
  779. // When needed, a decompressor for verifying compression using a
  780. // dictionary sampled/trained from this file.
  781. std::unique_ptr<Decompressor> verify_decompressor_with_dict;
  782. // When non-nullptr, compression should be verified with this corresponding
  783. // decompressor, except for data blocks. (Points to same as basic_decompressor
  784. // when verify_compression is set.)
  785. UnownedPtr<Decompressor> verify_decompressor;
  786. // Once configured/determined, points to one of the above Decompressors to use
  787. // in verifying data blocks.
  788. UnownedPtr<Decompressor> data_block_verify_decompressor;
  789. // Set of compression types used for blocks in this file (mixing compression
  790. // algorithms in a single file is allowed, using a CompressionManager)
  791. SmallEnumSet<CompressionType, kDisableCompressionOption>
  792. compression_types_used;
  793. // Working area for basic_compressor when compression_parallel_threads==1
  794. WorkingAreaPair basic_working_area;
  795. // Working area for data_block_compressor, for emit/compaction thread
  796. WorkingAreaPair data_block_working_area;
  797. size_t data_begin_offset = 0;
  798. TableProperties props;
  799. // States of the builder.
  800. //
  801. // - `kBuffered`: This is the initial state where zero or more data blocks are
  802. // accumulated uncompressed in-memory. From this state, call
  803. // `EnterUnbuffered()` to finalize the compression dictionary if enabled,
  804. // compress/write out any buffered blocks, and proceed to the `kUnbuffered`
  805. // state.
  806. //
  807. // - `kUnbuffered`: This is the state when compression dictionary is finalized
  808. // either because it wasn't enabled in the first place or it's been created
  809. // from sampling previously buffered data. In this state, blocks are simply
  810. // compressed/written out as they fill up. From this state, call `Finish()`
  811. // to complete the file (write meta-blocks, etc.), or `Abandon()` to delete
  812. // the partially created file.
  813. //
  814. // - `kClosed`: This indicates either `Finish()` or `Abandon()` has been
  815. // called, so the table builder is no longer usable. We must be in this
  816. // state by the time the destructor runs.
  817. enum class State {
  818. kBuffered,
  819. kUnbuffered,
  820. kClosed,
  821. };
  822. State state = State::kUnbuffered;
  823. // `kBuffered` state is allowed only as long as the buffering of uncompressed
  824. // data blocks (see `data_block_buffers`) does not exceed `buffer_limit`.
  825. uint64_t buffer_limit = 0;
  826. std::shared_ptr<CacheReservationManager>
  827. compression_dict_buffer_cache_res_mgr;
  828. const bool use_delta_encoding_for_index_values;
  829. std::unique_ptr<FilterBlockBuilder> filter_builder;
  830. OffsetableCacheKey base_cache_key;
  831. const TableFileCreationReason reason;
  832. BlockHandle pending_handle; // Handle to add to index block
  833. GrowableBuffer single_threaded_compressed_output;
  834. std::unique_ptr<FlushBlockPolicy> flush_block_policy;
  835. std::vector<std::unique_ptr<InternalTblPropColl>> table_properties_collectors;
  836. std::unique_ptr<ParallelCompressionRep> pc_rep;
  837. RelaxedAtomic<uint64_t> worker_cpu_micros{0};
  838. BlockCreateContext create_context;
  839. // The size of the "tail" part of a SST file. "Tail" refers to
  840. // all blocks after data blocks till the end of the SST file.
  841. uint64_t tail_size;
  842. // The total size of all blocks in this file before they are compressed.
  843. // This is used for logging compaction stats.
  844. uint64_t pre_compression_size = 0;
  845. // See class Footer
  846. uint32_t base_context_checksum;
  847. uint64_t get_offset() { return offset.LoadRelaxed(); }
  848. void set_offset(uint64_t o) { offset.StoreRelaxed(o); }
  849. bool IsParallelCompressionActive() const { return pc_rep != nullptr; }
  850. Status GetStatus() { return GetIOStatus(); }
  851. bool StatusOk() {
  852. // The OK case is optimized with an atomic. Relaxed is sufficient because
  853. // if a thread other than the emit/compaction thread sets it to non-OK, it
  854. // will synchronize that in aborting parallel compression.
  855. bool ok = io_status_ok.LoadRelaxed();
  856. #ifdef ROCKSDB_ASSERT_STATUS_CHECKED
  857. if (ok) {
  858. std::lock_guard<std::mutex> lock(io_status_mutex);
  859. // Double-check
  860. if (io_status_ok.LoadRelaxed()) {
  861. io_status.PermitUncheckedError();
  862. assert(io_status.ok());
  863. } else {
  864. ok = false;
  865. }
  866. }
  867. #endif // ROCKSDB_ASSERT_STATUS_CHECKED
  868. return ok;
  869. }
  870. IOStatus GetIOStatus() {
  871. // See StatusOk, which is optimized to avoid Status object copies
  872. if (LIKELY(io_status_ok.LoadRelaxed())) {
  873. #ifdef ROCKSDB_ASSERT_STATUS_CHECKED
  874. std::lock_guard<std::mutex> lock(io_status_mutex);
  875. // Double-check
  876. if (io_status_ok.LoadRelaxed()) {
  877. io_status.PermitUncheckedError();
  878. assert(io_status.ok());
  879. } else {
  880. return io_status;
  881. }
  882. #endif // ROCKSDB_ASSERT_STATUS_CHECKED
  883. return IOStatus::OK();
  884. } else {
  885. std::lock_guard<std::mutex> lock(io_status_mutex);
  886. return io_status;
  887. }
  888. }
  889. // Avoid copying Status and IOStatus objects as much as possible.
  890. // Never erase an existing I/O status that is not OK.
  891. void SetStatus(Status&& s) {
  892. if (UNLIKELY(!s.ok()) && io_status_ok.LoadRelaxed()) {
  893. SetFailedIOStatus(status_to_io_status(std::move(s)));
  894. }
  895. }
  896. void SetStatus(const Status& s) {
  897. if (UNLIKELY(!s.ok()) && io_status_ok.LoadRelaxed()) {
  898. SetFailedIOStatus(status_to_io_status(Status(s)));
  899. }
  900. }
  901. void SetIOStatus(IOStatus&& ios) {
  902. if (UNLIKELY(!ios.ok()) && io_status_ok.LoadRelaxed()) {
  903. SetFailedIOStatus(std::move(ios));
  904. }
  905. }
  906. void SetIOStatus(const IOStatus& ios) {
  907. if (UNLIKELY(!ios.ok()) && io_status_ok.LoadRelaxed()) {
  908. SetFailedIOStatus(IOStatus(ios));
  909. }
  910. }
  911. void SetFailedIOStatus(IOStatus&& ios) {
  912. assert(!ios.ok());
  913. // Because !s.ok() is rare, locking is acceptable even in non-parallel case.
  914. std::lock_guard<std::mutex> lock(io_status_mutex);
  915. // Double-check
  916. if (io_status.ok()) {
  917. io_status = std::move(ios);
  918. io_status_ok.StoreRelaxed(false);
  919. }
  920. }
  921. Rep(const BlockBasedTableOptions& table_opt, const TableBuilderOptions& tbo,
  922. WritableFileWriter* f)
  923. : ioptions(tbo.ioptions),
  924. prefix_extractor(tbo.moptions.prefix_extractor),
  925. write_options(tbo.write_options),
  926. table_options(table_opt),
  927. internal_comparator(tbo.internal_comparator),
  928. ts_sz(tbo.internal_comparator.user_comparator()->timestamp_size()),
  929. persist_user_defined_timestamps(
  930. tbo.ioptions.persist_user_defined_timestamps),
  931. file(f),
  932. alignment(table_options.block_align
  933. ? std::min(static_cast<size_t>(table_options.block_size),
  934. kDefaultPageSize)
  935. : 0),
  936. data_block(table_options.block_restart_interval,
  937. table_options.use_delta_encoding,
  938. false /* use_value_delta_encoding */,
  939. tbo.internal_comparator.user_comparator()
  940. ->CanKeysWithDifferentByteContentsBeEqual()
  941. ? BlockBasedTableOptions::kDataBlockBinarySearch
  942. : table_options.data_block_index_type,
  943. table_options.data_block_hash_table_util_ratio, ts_sz,
  944. persist_user_defined_timestamps),
  945. range_del_block(
  946. 1 /* block_restart_interval */, true /* use_delta_encoding */,
  947. false /* use_value_delta_encoding */,
  948. BlockBasedTableOptions::kDataBlockBinarySearch /* index_type */,
  949. 0.75 /* data_block_hash_table_util_ratio */, ts_sz,
  950. persist_user_defined_timestamps),
  951. internal_prefix_transform(prefix_extractor.get()),
  952. sample_for_compression(tbo.moptions.sample_for_compression),
  953. compression_parallel_threads(
  954. ((table_opt.partition_filters &&
  955. !table_opt.decouple_partitioned_filters) ||
  956. table_options.user_defined_index_factory)
  957. ? uint32_t{1}
  958. : tbo.compression_opts.parallel_threads),
  959. max_compressed_bytes_per_kb(
  960. tbo.compression_opts.max_compressed_bytes_per_kb),
  961. use_delta_encoding_for_index_values(table_opt.format_version >= 4 &&
  962. !table_opt.block_align),
  963. reason(tbo.reason),
  964. flush_block_policy(
  965. table_options.flush_block_policy_factory->NewFlushBlockPolicy(
  966. table_options, data_block)),
  967. create_context(&table_options, &ioptions, ioptions.stats,
  968. /*decompressor=*/nullptr,
  969. tbo.moptions.block_protection_bytes_per_key,
  970. tbo.internal_comparator.user_comparator(),
  971. !use_delta_encoding_for_index_values,
  972. table_opt.index_type ==
  973. BlockBasedTableOptions::kBinarySearchWithFirstKey),
  974. tail_size(0) {
  975. FilterBuildingContext filter_context(table_options);
  976. filter_context.info_log = ioptions.logger;
  977. filter_context.column_family_name = tbo.column_family_name;
  978. filter_context.reason = reason;
  979. // Only populate other fields if known to be in LSM rather than
  980. // generating external SST file
  981. if (reason != TableFileCreationReason::kMisc) {
  982. filter_context.compaction_style = ioptions.compaction_style;
  983. filter_context.num_levels = ioptions.num_levels;
  984. filter_context.level_at_creation = tbo.level_at_creation;
  985. filter_context.is_bottommost = tbo.is_bottommost;
  986. assert(filter_context.level_at_creation < filter_context.num_levels);
  987. }
  988. props.compression_options =
  989. CompressionOptionsToString(tbo.compression_opts);
  990. auto* mgr = tbo.moptions.compression_manager.get();
  991. if (mgr == nullptr) {
  992. uses_explicit_compression_manager = false;
  993. mgr = GetBuiltinCompressionManager(
  994. GetCompressFormatForVersion(
  995. static_cast<uint32_t>(table_opt.format_version)))
  996. .get();
  997. } else {
  998. uses_explicit_compression_manager = true;
  999. // Stuff some extra debugging info as extra pseudo-options. Using
  1000. // underscore prefix to indicate they are special.
  1001. props.compression_options.append("_compression_manager=");
  1002. props.compression_options.append(mgr->GetId());
  1003. props.compression_options.append("; ");
  1004. }
1005. // Sanitize to allow compression only when it saves space.
  1006. max_compressed_bytes_per_kb =
  1007. std::min(int{1023}, tbo.compression_opts.max_compressed_bytes_per_kb);
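// (Capping at 1023/1024 is meant to ensure a compressed block is used only
// when it is strictly smaller than the uncompressed block; see the `>> 10`
// scaling of max_compressed_size in CompressAndVerifyBlock below.)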
  1008. basic_compressor = mgr->GetCompressorForSST(
  1009. filter_context, tbo.compression_opts, tbo.compression_type);
  1010. if (basic_compressor) {
  1011. if (table_options.enable_index_compression) {
  1012. basic_working_area.compress = basic_compressor->ObtainWorkingArea();
  1013. }
  1014. max_dict_sample_bytes = basic_compressor->GetMaxSampleSizeIfWantDict(
  1015. CacheEntryRole::kDataBlock);
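// A non-zero sample size indicates the compressor wants to train a
// compression dictionary, so data blocks are buffered (State::kBuffered)
// before being compressed; buffer_limit below caps how many uncompressed
// bytes may be held in memory for that purpose.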
  1016. if (max_dict_sample_bytes > 0) {
  1017. state = State::kBuffered;
  1018. if (tbo.target_file_size == 0) {
  1019. buffer_limit = tbo.compression_opts.max_dict_buffer_bytes;
  1020. } else if (tbo.compression_opts.max_dict_buffer_bytes == 0) {
  1021. buffer_limit = tbo.target_file_size;
  1022. } else {
  1023. buffer_limit = std::min(tbo.target_file_size,
  1024. tbo.compression_opts.max_dict_buffer_bytes);
  1025. }
  1026. } else {
  1027. // No distinct data block compressor using dictionary
  1028. data_block_compressor = basic_compressor.get();
  1029. data_block_working_area.compress =
  1030. data_block_compressor->ObtainWorkingArea();
  1031. }
  1032. basic_decompressor = basic_compressor->GetOptimizedDecompressor();
  1033. if (basic_decompressor == nullptr) {
  1034. // Optimized version not available
  1035. basic_decompressor = mgr->GetDecompressor();
  1036. }
  1037. create_context.decompressor = basic_decompressor.get();
  1038. if (table_options.verify_compression) {
  1039. verify_decompressor = basic_decompressor.get();
  1040. if (table_options.enable_index_compression) {
  1041. basic_working_area.verify = verify_decompressor->ObtainWorkingArea(
  1042. basic_compressor->GetPreferredCompressionType());
  1043. }
  1044. if (state == State::kUnbuffered) {
  1045. assert(data_block_compressor);
  1046. data_block_verify_decompressor = verify_decompressor.get();
  1047. data_block_working_area.verify =
  1048. data_block_verify_decompressor->ObtainWorkingArea(
  1049. data_block_compressor->GetPreferredCompressionType());
  1050. }
  1051. }
  1052. }
  1053. switch (table_options.prepopulate_block_cache) {
  1054. case BlockBasedTableOptions::PrepopulateBlockCache::kFlushOnly:
  1055. warm_cache = (reason == TableFileCreationReason::kFlush);
  1056. break;
  1057. case BlockBasedTableOptions::PrepopulateBlockCache::kDisable:
  1058. warm_cache = false;
  1059. break;
  1060. default:
  1061. // missing case
  1062. assert(false);
  1063. warm_cache = false;
  1064. }
  1065. const auto compress_dict_build_buffer_charged =
  1066. table_options.cache_usage_options.options_overrides
  1067. .at(CacheEntryRole::kCompressionDictionaryBuildingBuffer)
  1068. .charged;
  1069. if (table_options.block_cache &&
  1070. (compress_dict_build_buffer_charged ==
  1071. CacheEntryRoleOptions::Decision::kEnabled ||
  1072. compress_dict_build_buffer_charged ==
  1073. CacheEntryRoleOptions::Decision::kFallback)) {
  1074. compression_dict_buffer_cache_res_mgr =
  1075. std::make_shared<CacheReservationManagerImpl<
  1076. CacheEntryRole::kCompressionDictionaryBuildingBuffer>>(
  1077. table_options.block_cache);
  1078. } else {
  1079. compression_dict_buffer_cache_res_mgr = nullptr;
  1080. }
  1081. if (table_options.index_type ==
  1082. BlockBasedTableOptions::kTwoLevelIndexSearch) {
  1083. p_index_builder_ = PartitionedIndexBuilder::CreateIndexBuilder(
  1084. &internal_comparator, use_delta_encoding_for_index_values,
  1085. table_options, ts_sz, persist_user_defined_timestamps);
  1086. index_builder.reset(p_index_builder_);
  1087. } else {
  1088. index_builder.reset(IndexBuilder::CreateIndexBuilder(
  1089. table_options.index_type, &internal_comparator,
  1090. &this->internal_prefix_transform, use_delta_encoding_for_index_values,
  1091. table_options, ts_sz, persist_user_defined_timestamps));
  1092. }
  1093. // If user_defined_index_factory is provided, wrap the index builder with
1094. // UserDefinedIndexBuilderWrapper
  1095. if (table_options.user_defined_index_factory != nullptr) {
  1096. if (tbo.moptions.compression_opts.parallel_threads > 1 ||
  1097. tbo.moptions.bottommost_compression_opts.parallel_threads > 1) {
  1098. SetStatus(
  1099. Status::InvalidArgument("user_defined_index_factory not supported "
  1100. "with parallel compression"));
  1101. } else {
  1102. std::unique_ptr<UserDefinedIndexBuilder> user_defined_index_builder;
  1103. UserDefinedIndexOption udi_options;
  1104. udi_options.comparator = internal_comparator.user_comparator();
  1105. auto s = table_options.user_defined_index_factory->NewBuilder(
  1106. udi_options, user_defined_index_builder);
  1107. if (!s.ok()) {
  1108. SetStatus(s);
  1109. } else {
  1110. if (user_defined_index_builder != nullptr) {
  1111. index_builder = std::make_unique<UserDefinedIndexBuilderWrapper>(
  1112. std::string(table_options.user_defined_index_factory->Name()),
  1113. std::move(index_builder), std::move(user_defined_index_builder),
  1114. &internal_comparator, ts_sz, persist_user_defined_timestamps);
  1115. }
  1116. }
  1117. }
  1118. }
  1119. if (ioptions.optimize_filters_for_hits && tbo.is_bottommost) {
  1120. // Apply optimize_filters_for_hits setting here when applicable by
  1121. // skipping filter generation
  1122. filter_builder.reset();
  1123. } else if (tbo.skip_filters) {
  1124. // For SstFileWriter skip_filters
  1125. filter_builder.reset();
  1126. } else if (!table_options.filter_policy) {
  1127. // Null filter_policy -> no filter
  1128. filter_builder.reset();
  1129. } else {
  1130. filter_builder.reset(CreateFilterBlockBuilder(
  1131. ioptions, tbo.moptions, filter_context,
  1132. use_delta_encoding_for_index_values, p_index_builder_, ts_sz,
  1133. persist_user_defined_timestamps));
  1134. }
  1135. assert(tbo.internal_tbl_prop_coll_factories);
  1136. for (auto& factory : *tbo.internal_tbl_prop_coll_factories) {
  1137. assert(factory);
  1138. std::unique_ptr<InternalTblPropColl> collector{
  1139. factory->CreateInternalTblPropColl(
  1140. tbo.column_family_id, tbo.level_at_creation,
  1141. tbo.ioptions.num_levels,
  1142. tbo.last_level_inclusive_max_seqno_threshold)};
  1143. if (collector) {
  1144. table_properties_collectors.emplace_back(std::move(collector));
  1145. }
  1146. }
  1147. table_properties_collectors.emplace_back(
  1148. std::make_unique<BlockBasedTablePropertiesCollector>(
  1149. table_options.index_type, table_options.whole_key_filtering,
  1150. prefix_extractor != nullptr,
  1151. table_options.decouple_partitioned_filters));
  1152. if (ts_sz > 0 && persist_user_defined_timestamps) {
  1153. table_properties_collectors.emplace_back(
  1154. std::make_unique<TimestampTablePropertiesCollector>(
  1155. tbo.internal_comparator.user_comparator()));
  1156. }
  1157. // These are only needed for populating table properties
  1158. props.column_family_id = tbo.column_family_id;
  1159. props.column_family_name = tbo.column_family_name;
  1160. props.oldest_key_time = tbo.oldest_key_time;
  1161. props.newest_key_time = tbo.newest_key_time;
  1162. props.file_creation_time = tbo.file_creation_time;
  1163. props.orig_file_number = tbo.cur_file_num;
  1164. props.db_id = tbo.db_id;
  1165. props.db_session_id = tbo.db_session_id;
  1166. props.db_host_id = ioptions.db_host_id;
  1167. props.format_version = table_options.format_version;
  1168. if (!ReifyDbHostIdProperty(ioptions.env, &props.db_host_id).ok()) {
  1169. ROCKS_LOG_INFO(ioptions.logger, "db_host_id property will not be set");
  1170. }
  1171. // Default is UINT64_MAX for unknown. Setting it to 0 here
  1172. // to allow updating it by taking max in BlockBasedTableBuilder::Add().
  1173. props.key_largest_seqno = 0;
  1174. // Default is UINT64_MAX for unknown.
  1175. props.key_smallest_seqno = UINT64_MAX;
  1176. PrePopulateCompressionProperties(mgr);
  1177. if (FormatVersionUsesContextChecksum(table_options.format_version)) {
  1178. // Must be non-zero and semi- or quasi-random
  1179. // TODO: ideally guaranteed different for related files (e.g. use file
  1180. // number and db_session, for benefit of SstFileWriter)
  1181. do {
  1182. base_context_checksum = Random::GetTLSInstance()->Next();
  1183. } while (UNLIKELY(base_context_checksum == 0));
  1184. } else {
  1185. base_context_checksum = 0;
  1186. }
  1187. if (alignment > 0 && basic_compressor) {
  1188. // With better sanitization in `CompactionPicker::CompactFiles()`, we
  1189. // would not need to handle this case here and could change it to an
  1190. // assertion instead.
  1191. SetStatus(Status::InvalidArgument(
  1192. "Enable block_align, but compression enabled"));
  1193. }
  1194. }
  1195. ~Rep() {
  1196. // Must have been cleaned up by StopParallelCompression
  1197. assert(pc_rep == nullptr);
  1198. }
  1199. Rep(const Rep&) = delete;
  1200. Rep& operator=(const Rep&) = delete;
  1201. void PrePopulateCompressionProperties(UnownedPtr<CompressionManager> mgr) {
  1202. if (FormatVersionUsesCompressionManagerName(table_options.format_version)) {
  1203. assert(mgr);
  1204. // Use newer compression_name property
  1205. props.compression_name.reserve(32);
  1206. // If compression is disabled, use empty manager name
  1207. if (basic_compressor) {
  1208. props.compression_name.append(mgr->CompatibilityName());
  1209. }
  1210. props.compression_name.push_back(';');
  1211. // Rest of property to be filled out at the end of building the file
  1212. } else {
  1213. // Use legacy compression_name property, populated at the end of
  1214. // building the file. Not compatible with compression managers using
  1215. // custom algorithms / compression types.
  1216. assert(Slice(mgr->CompatibilityName())
  1217. .compare(GetBuiltinCompressionManager(
  1218. GetCompressFormatForVersion(
  1219. static_cast<uint32_t>(props.format_version)))
  1220. ->CompatibilityName()) == 0);
  1221. }
  1222. }
  1223. void PostPopulateCompressionProperties() {
  1224. // Do not include "no compression" in the set. It's not really useful
  1225. // information whether there are any uncompressed blocks. Some kinds of
  1226. // blocks are never compressed anyway.
  1227. compression_types_used.Remove(kNoCompression);
  1228. size_t ctype_count = compression_types_used.count();
  1229. if (uses_explicit_compression_manager) {
  1230. // Stuff some extra debugging info as extra pseudo-options. Using
  1231. // underscore prefix to indicate they are special.
  1232. std::string& compression_options = props.compression_options;
  1233. compression_options.append("_compressor=");
  1234. compression_options.append(data_block_compressor
  1235. ? data_block_compressor->GetId()
  1236. : std::string{});
  1237. compression_options.append("; ");
  1238. } else {
  1239. // No explicit compression manager
  1240. assert(compression_types_used.count() <= 1);
  1241. }
  1242. std::string& compression_name = props.compression_name;
  1243. if (FormatVersionUsesCompressionManagerName(table_options.format_version)) {
  1244. // Fill in extended field of "compression name" property, which is the
  1245. // set of compression types used, sorted by unsigned byte and then hex
  1246. // encoded with two digits each (so that table properties are human
  1247. // readable).
  1248. assert(*compression_name.rbegin() == ';');
  1249. size_t pos = compression_name.size();
  1250. // Make space for the field contents
  1251. compression_name.append(ctype_count * 2, '\0');
  1252. char* ptr = compression_name.data() + pos;
  1253. // Populate the field contents
  1254. for (CompressionType t : compression_types_used) {
  1255. PutBaseChars<16>(&ptr, /*n=*/2, static_cast<unsigned char>(t),
  1256. /*uppercase=*/true);
  1257. }
  1258. assert(ptr == compression_name.data() + pos + ctype_count * 2);
  1259. // Allow additional fields in the future
  1260. compression_name.push_back(';');
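// For example, if LZ4 (0x04) and ZSTD (0x07) blocks were written, the
// appended field is "0407" and the property ends with "...;0407;".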
  1261. } else {
  1262. // Use legacy compression naming. To adhere to requirements described in
  1263. // TableProperties::compression_name, we might have to replace the name
  1264. // based on the legacy configured compression type.
  1265. assert(compression_name.empty());
  1266. if (ctype_count == 0) {
  1267. // We could get a slight performance boost in the reader by marking
  1268. // the file as "no compression" if compression is configured but
  1269. // consistently rejected, but that would give misleading info for
  1270. // debugging purposes. So instead we record the configured compression
  1271. // type, matching the historical behavior.
  1272. if (data_block_compressor) {
  1273. compression_name = CompressionTypeToString(
  1274. data_block_compressor->GetPreferredCompressionType());
  1275. } else {
  1276. assert(basic_compressor == nullptr);
  1277. compression_name = CompressionTypeToString(kNoCompression);
  1278. }
  1279. } else if (compression_types_used.Contains(kZSTD)) {
  1280. compression_name = CompressionTypeToString(kZSTD);
  1281. } else {
  1282. compression_name =
  1283. CompressionTypeToString(*compression_types_used.begin());
  1284. }
  1285. }
  1286. }
  1287. private:
  1288. // Synchronize io_status to be readable/writable across threads, but
  1289. // optimize for the OK case
  1290. std::mutex io_status_mutex;
  1291. RelaxedAtomic<bool> io_status_ok{true};
  1292. IOStatus io_status;
  1293. };
  1294. BlockBasedTableBuilder::BlockBasedTableBuilder(
  1295. const BlockBasedTableOptions& table_options, const TableBuilderOptions& tbo,
  1296. WritableFileWriter* file) {
  1297. BlockBasedTableOptions sanitized_table_options(table_options);
  1298. auto ucmp = tbo.internal_comparator.user_comparator();
  1299. assert(ucmp);
  1300. (void)ucmp; // avoids unused variable error.
  1301. rep_ = std::make_unique<Rep>(sanitized_table_options, tbo, file);
  1302. TEST_SYNC_POINT_CALLBACK(
  1303. "BlockBasedTableBuilder::BlockBasedTableBuilder:PreSetupBaseCacheKey",
  1304. const_cast<TableProperties*>(&rep_->props));
  1305. BlockBasedTable::SetupBaseCacheKey(&rep_->props, tbo.db_session_id,
  1306. tbo.cur_file_num, &rep_->base_cache_key);
  1307. MaybeStartParallelCompression();
  1308. if (!rep_->IsParallelCompressionActive() && rep_->basic_compressor) {
  1309. rep_->single_threaded_compressed_output.ResetForSize(
  1310. table_options.block_size);
  1311. }
  1312. }
  1313. BlockBasedTableBuilder::~BlockBasedTableBuilder() {
  1314. // Catch errors where caller forgot to call Finish()
  1315. assert(rep_->state == Rep::State::kClosed);
  1316. }
  1317. void BlockBasedTableBuilder::Add(const Slice& ikey, const Slice& value) {
  1318. Rep* r = rep_.get();
  1319. assert(rep_->state != Rep::State::kClosed);
  1320. if (UNLIKELY(!ok())) {
  1321. return;
  1322. }
  1323. ValueType value_type;
  1324. SequenceNumber seq;
  1325. UnPackSequenceAndType(ExtractInternalKeyFooter(ikey), &seq, &value_type);
  1326. r->props.key_largest_seqno = std::max(r->props.key_largest_seqno, seq);
  1327. r->props.key_smallest_seqno = std::min(r->props.key_smallest_seqno, seq);
  1328. if (IsValueType(value_type)) {
  1329. #ifndef NDEBUG
  1330. if (r->props.num_entries > r->props.num_range_deletions) {
  1331. assert(r->internal_comparator.Compare(ikey, Slice(r->last_ikey)) > 0);
  1332. }
  1333. bool skip = false;
  1334. TEST_SYNC_POINT_CALLBACK("BlockBasedTableBuilder::Add::skip", (void*)&skip);
  1335. if (skip) {
  1336. return;
  1337. }
  1338. #endif // !NDEBUG
  1339. auto should_flush = r->flush_block_policy->Update(ikey, value);
  1340. if (should_flush) {
  1341. assert(!r->data_block.empty());
  1342. Flush(/*first_key_in_next_block=*/&ikey);
  1343. }
  1344. // Note: PartitionedFilterBlockBuilder with
1345. // decouple_partitioned_filters=false requires each key to be added to the
1346. // filter builder after being added to and "finished" in the index builder,
1347. // so it forces no parallel compression (logic in Rep constructor).
  1348. if (r->state == Rep::State::kUnbuffered) {
  1349. if (r->filter_builder != nullptr) {
  1350. r->filter_builder->AddWithPrevKey(
  1351. ExtractUserKeyAndStripTimestamp(ikey, r->ts_sz),
  1352. r->last_ikey.empty()
  1353. ? Slice{}
  1354. : ExtractUserKeyAndStripTimestamp(r->last_ikey, r->ts_sz));
  1355. }
  1356. }
  1357. r->data_block.AddWithLastKey(ikey, value, r->last_ikey);
  1358. r->last_ikey.assign(ikey.data(), ikey.size());
  1359. assert(!r->last_ikey.empty());
  1360. if (r->state == Rep::State::kBuffered) {
  1361. // Buffered keys will be replayed from data_block_buffers during
  1362. // `Finish()` once compression dictionary has been finalized.
  1363. } else {
  1364. r->index_builder->OnKeyAdded(ikey, value);
  1365. }
  1366. // TODO offset passed in is not accurate for parallel compression case
  1367. NotifyCollectTableCollectorsOnAdd(ikey, value, r->get_offset(),
  1368. r->table_properties_collectors,
  1369. r->ioptions.logger);
  1370. } else if (value_type == kTypeRangeDeletion) {
  1371. Slice persisted_end = value;
  1372. // When timestamps should not be persisted, we physically strip away range
  1373. // tombstone end key's user timestamp before passing it along to block
  1374. // builder. Physically stripping away start key's user timestamp is
  1375. // handled at the block builder level in the same way as the other data
  1376. // blocks.
  1377. if (r->ts_sz > 0 && !r->persist_user_defined_timestamps) {
  1378. persisted_end = StripTimestampFromUserKey(value, r->ts_sz);
  1379. }
  1380. r->range_del_block.Add(ikey, persisted_end);
  1381. // TODO offset passed in is not accurate for parallel compression case
  1382. NotifyCollectTableCollectorsOnAdd(ikey, value, r->get_offset(),
  1383. r->table_properties_collectors,
  1384. r->ioptions.logger);
  1385. } else {
  1386. assert(false);
  1387. r->SetStatus(Status::InvalidArgument(
  1388. "BlockBasedBuilder::Add() received a key with invalid value type " +
  1389. std::to_string(static_cast<unsigned int>(value_type))));
  1390. return;
  1391. }
  1392. r->props.num_entries++;
  1393. r->props.raw_key_size += ikey.size();
  1394. if (!r->persist_user_defined_timestamps) {
  1395. r->props.raw_key_size -= r->ts_sz;
  1396. }
  1397. r->props.raw_value_size += value.size();
  1398. if (value_type == kTypeDeletion || value_type == kTypeSingleDeletion ||
  1399. value_type == kTypeDeletionWithTimestamp) {
  1400. r->props.num_deletions++;
  1401. } else if (value_type == kTypeRangeDeletion) {
  1402. r->props.num_deletions++;
  1403. r->props.num_range_deletions++;
  1404. } else if (value_type == kTypeMerge) {
  1405. r->props.num_merge_operands++;
  1406. }
  1407. }
  1408. void BlockBasedTableBuilder::Flush(const Slice* first_key_in_next_block) {
  1409. Rep* r = rep_.get();
  1410. assert(rep_->state != Rep::State::kClosed);
  1411. if (UNLIKELY(!ok())) {
  1412. return;
  1413. }
  1414. if (r->data_block.empty()) {
  1415. return;
  1416. }
  1417. Slice uncompressed_block_data = r->data_block.Finish();
  1418. // NOTE: compression sampling is done here in the same thread as building
  1419. // the uncompressed block because of the requirements to call table
  1420. // property collectors:
  1421. // * BlockAdd function expects block_compressed_bytes_{fast,slow} for
  1422. // historical reasons. Probably a hassle to remove.
  1423. // * Collector is not thread safe so calls need to be
  1424. // serialized/synchronized.
  1425. // * Ideally, AddUserKey and BlockAdd calls need to line up such that a
  1426. // reported block corresponds to all the keys reported since the previous
  1427. // block.
1428. // If requested, we sample one in every N blocks with a
1429. // fast and a slow compression algorithm and report the stats.
1430. // Users can use these stats to decide whether it is worthwhile
1431. // enabling compression and also get a hint about which
1432. // compression algorithm will be beneficial.
  1433. if (r->sample_for_compression > 0 &&
  1434. Random::GetTLSInstance()->OneIn(
  1435. static_cast<int>(r->sample_for_compression))) {
  1436. std::string sampled_output_fast;
  1437. std::string sampled_output_slow;
  1438. // Sampling with a fast compression algorithm
  1439. if (LZ4_Supported() || Snappy_Supported()) {
  1440. CompressionType c =
  1441. LZ4_Supported() ? kLZ4Compression : kSnappyCompression;
  1442. CompressionOptions options;
  1443. CompressionContext context(c, options);
  1444. CompressionInfo info_tmp(options, context,
  1445. CompressionDict::GetEmptyDict(), c);
  1446. OLD_CompressData(
  1447. uncompressed_block_data, info_tmp,
  1448. GetCompressFormatForVersion(r->table_options.format_version),
  1449. &sampled_output_fast);
  1450. }
  1451. // Sampling with a slow but high-compression algorithm
  1452. if (ZSTD_Supported() || Zlib_Supported()) {
  1453. CompressionType c = ZSTD_Supported() ? kZSTD : kZlibCompression;
  1454. CompressionOptions options;
  1455. CompressionContext context(c, options);
  1456. CompressionInfo info_tmp(options, context,
  1457. CompressionDict::GetEmptyDict(), c);
  1458. OLD_CompressData(
  1459. uncompressed_block_data, info_tmp,
  1460. GetCompressFormatForVersion(r->table_options.format_version),
  1461. &sampled_output_slow);
  1462. }
  1463. if (sampled_output_slow.size() > 0 || sampled_output_fast.size() > 0) {
1464. // Currently compression sampling is only enabled for data blocks.
  1465. r->sampled_input_data_bytes.FetchAddRelaxed(
  1466. uncompressed_block_data.size());
  1467. r->sampled_output_slow_data_bytes.FetchAddRelaxed(
  1468. sampled_output_slow.size());
  1469. r->sampled_output_fast_data_bytes.FetchAddRelaxed(
  1470. sampled_output_fast.size());
  1471. }
  1472. NotifyCollectTableCollectorsOnBlockAdd(
  1473. r->table_properties_collectors, uncompressed_block_data.size(),
  1474. sampled_output_slow.size(), sampled_output_fast.size());
  1475. } else {
  1476. NotifyCollectTableCollectorsOnBlockAdd(
  1477. r->table_properties_collectors, uncompressed_block_data.size(),
  1478. 0 /*block_compressed_bytes_slow*/, 0 /*block_compressed_bytes_fast*/);
  1479. }
  1480. if (rep_->state == Rep::State::kBuffered) {
  1481. std::string uncompressed_block_holder;
  1482. uncompressed_block_holder.reserve(rep_->table_options.block_size);
  1483. r->data_block.SwapAndReset(uncompressed_block_holder);
  1484. assert(uncompressed_block_data.size() == uncompressed_block_holder.size());
  1485. rep_->data_block_buffers.emplace_back(std::move(uncompressed_block_holder));
  1486. rep_->data_begin_offset += uncompressed_block_data.size();
  1487. MaybeEnterUnbuffered(first_key_in_next_block);
  1488. } else {
  1489. if (r->IsParallelCompressionActive()) {
  1490. EmitBlockForParallel(r->data_block.MutableBuffer(), r->last_ikey,
  1491. first_key_in_next_block);
  1492. } else {
  1493. EmitBlock(r->data_block.MutableBuffer(), r->last_ikey,
  1494. first_key_in_next_block);
  1495. }
  1496. r->data_block.Reset();
  1497. }
  1498. }
  1499. void BlockBasedTableBuilder::EmitBlockForParallel(
  1500. std::string& uncompressed, const Slice& last_key_in_current_block,
  1501. const Slice* first_key_in_next_block) {
  1502. Rep* r = rep_.get();
  1503. assert(r->state == Rep::State::kUnbuffered);
  1504. assert(uncompressed.size() > 0);
  1505. auto& pc_rep = *r->pc_rep;
  1506. // Can emit the uncompressed block into the ring buffer
  1507. assert(pc_rep.emit_thread_state ==
  1508. ParallelCompressionRep::ThreadState::kEmitting);
  1509. auto* block_rep = &pc_rep.ring_buffer[pc_rep.emit_slot];
  1510. pc_rep.estimated_inflight_size.FetchAddRelaxed(uncompressed.size() +
  1511. kBlockTrailerSize);
  1512. std::swap(uncompressed, block_rep->uncompressed);
  1513. r->index_builder->PrepareIndexEntry(last_key_in_current_block,
  1514. first_key_in_next_block,
  1515. block_rep->prepared_index_entry.get());
  1516. block_rep->compressed.Reset();
  1517. block_rep->compression_type = kNoCompression;
  1518. // Might need to take up some compression work before we are able to
  1519. // resume emitting the next uncompressed block.
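// EmitterStateTransition either assigns this thread a slot to compress
// (handled inline below) or returns with the emitting/end state, at which
// point the loop exits.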
  1520. for (;;) {
  1521. pc_rep.EmitterStateTransition(pc_rep.emit_thread_state, pc_rep.emit_slot);
  1522. if (pc_rep.emit_thread_state ==
  1523. ParallelCompressionRep::ThreadState::kCompressing) {
1524. // Took up some compression work to help unblock ourselves
  1525. block_rep = &pc_rep.ring_buffer[pc_rep.emit_slot];
  1526. Status s = CompressAndVerifyBlock(
  1527. block_rep->uncompressed, /*is_data_block=*/true,
  1528. r->data_block_working_area, &block_rep->compressed,
  1529. &block_rep->compression_type);
  1530. if (UNLIKELY(!s.ok())) {
  1531. r->SetStatus(s);
  1532. pc_rep.SetAbort(pc_rep.emit_thread_state);
  1533. break;
  1534. }
  1535. } else {
  1536. assert(pc_rep.emit_thread_state !=
  1537. ParallelCompressionRep::ThreadState::kCompressingAndWriting);
  1538. assert(pc_rep.emit_thread_state !=
  1539. ParallelCompressionRep::ThreadState::kWriting);
  1540. assert(pc_rep.emit_thread_state !=
  1541. ParallelCompressionRep::ThreadState::kIdle);
  1542. // Either emitting or end state.
1543. // Detect whether there is nothing more to emit and record it if so.
  1544. if (first_key_in_next_block == nullptr &&
  1545. pc_rep.emit_thread_state ==
  1546. ParallelCompressionRep::ThreadState::kEmitting) {
  1547. pc_rep.SetNoMoreToEmit(pc_rep.emit_thread_state, pc_rep.emit_slot);
  1548. }
  1549. break;
  1550. }
  1551. }
  1552. }
  1553. void BlockBasedTableBuilder::EmitBlock(std::string& uncompressed,
  1554. const Slice& last_key_in_current_block,
  1555. const Slice* first_key_in_next_block) {
  1556. Rep* r = rep_.get();
  1557. assert(r->state == Rep::State::kUnbuffered);
  1558. // Single-threaded context only
  1559. assert(!r->IsParallelCompressionActive());
  1560. assert(uncompressed.size() > 0);
  1561. // When data blocks are aligned with super block alignment, delta encoding
  1562. // needs to be skipped for the first block after padding.
  1563. bool skip_delta_encoding = false;
  1564. WriteBlock(uncompressed, &r->pending_handle, BlockType::kData,
  1565. &skip_delta_encoding);
  1566. if (LIKELY(ok())) {
  1567. // We do not emit the index entry for a block until we have seen the
  1568. // first key for the next data block. This allows us to use shorter
  1569. // keys in the index block. For example, consider a block boundary
  1570. // between the keys "the quick brown fox" and "the who". We can use
  1571. // "the r" as the key for the index block entry since it is >= all
  1572. // entries in the first block and < all entries in subsequent
  1573. // blocks.
  1574. r->index_builder->AddIndexEntry(
  1575. last_key_in_current_block, first_key_in_next_block, r->pending_handle,
  1576. &r->index_separator_scratch, skip_delta_encoding);
  1577. }
  1578. }
  1579. void BlockBasedTableBuilder::WriteBlock(const Slice& uncompressed_block_data,
  1580. BlockHandle* handle,
  1581. BlockType block_type,
  1582. bool* skip_delta_encoding) {
  1583. Rep* r = rep_.get();
  1584. assert(r->state == Rep::State::kUnbuffered);
  1585. // Single-threaded context only
  1586. assert(!r->IsParallelCompressionActive());
  1587. CompressionType type;
  1588. bool is_data_block = block_type == BlockType::kData;
  1589. Status compress_status = CompressAndVerifyBlock(
  1590. uncompressed_block_data, is_data_block,
  1591. is_data_block ? r->data_block_working_area : r->basic_working_area,
  1592. &r->single_threaded_compressed_output, &type);
  1593. r->SetStatus(compress_status);
  1594. if (UNLIKELY(!ok())) {
  1595. return;
  1596. }
  1597. TEST_SYNC_POINT_CALLBACK(
  1598. "BlockBasedTableBuilder::WriteBlock:TamperWithCompressedData",
  1599. &r->single_threaded_compressed_output);
  1600. WriteMaybeCompressedBlock(
  1601. type == kNoCompression ? uncompressed_block_data
  1602. : Slice(r->single_threaded_compressed_output),
  1603. type, handle, block_type, &uncompressed_block_data, skip_delta_encoding);
  1604. r->single_threaded_compressed_output.Reset();
  1605. if (is_data_block) {
  1606. r->props.data_size = r->get_offset();
  1607. r->props.uncompressed_data_size += uncompressed_block_data.size();
  1608. ++r->props.num_data_blocks;
  1609. }
  1610. }
  1611. uint64_t BlockBasedTableBuilder::GetWorkerCPUMicros() const {
  1612. return rep_->worker_cpu_micros.LoadRelaxed();
  1613. }
  1614. void BlockBasedTableBuilder::BGWorker(WorkingAreaPair& working_area) {
  1615. // Record CPU usage of this thread
  1616. const uint64_t start_cpu_micros =
  1617. rep_->ioptions.env->GetSystemClock()->CPUMicros();
  1618. Defer log_cpu{[this, start_cpu_micros]() {
  1619. rep_->worker_cpu_micros.FetchAddRelaxed(
  1620. rep_->ioptions.env->GetSystemClock()->CPUMicros() - start_cpu_micros);
  1621. }};
  1622. auto& pc_rep = *rep_->pc_rep;
  1623. #ifdef BBTB_PC_WATCHDOG
  1624. pc_rep.live_workers.FetchAddRelaxed(1);
  1625. Defer decr{[&pc_rep]() { pc_rep.live_workers.FetchSubRelaxed(1); }};
  1626. #endif // BBTB_PC_WATCHDOG
  1627. ParallelCompressionRep::ThreadState thread_state =
  1628. ParallelCompressionRep::ThreadState::kIdle;
  1629. uint32_t slot = 0;
  1630. // Workers should avoid checking the shared status (e.g. ok()) to minimize
  1631. // potential data dependencies across threads. If another thread hits an
  1632. // error, we will pick up the kEnd state from the abort.
  1633. IOStatus ios;
  1634. do {
  1635. pc_rep.WorkerStateTransition(thread_state, slot);
  1636. ParallelCompressionRep::BlockRep* block_rep = &pc_rep.ring_buffer[slot];
  1637. auto compress_fn = [this, block_rep, &ios, &working_area]() {
  1638. ios = status_to_io_status(CompressAndVerifyBlock(
  1639. block_rep->uncompressed, /*is_data_block=*/true, working_area,
  1640. &block_rep->compressed, &block_rep->compression_type));
  1641. };
  1642. auto write_fn = [this, block_rep, &ios]() {
  1643. Slice compressed = block_rep->compressed;
  1644. Slice uncompressed = block_rep->uncompressed;
  1645. bool skip_delta_encoding = false;
  1646. ios = WriteMaybeCompressedBlockImpl(
  1647. block_rep->compression_type == kNoCompression ? uncompressed
  1648. : compressed,
  1649. block_rep->compression_type, &rep_->pending_handle, BlockType::kData,
  1650. &uncompressed, &skip_delta_encoding);
  1651. if (LIKELY(ios.ok())) {
  1652. rep_->props.data_size = rep_->get_offset();
  1653. rep_->props.uncompressed_data_size += block_rep->uncompressed.size();
  1654. ++rep_->props.num_data_blocks;
  1655. rep_->index_builder->FinishIndexEntry(
  1656. rep_->pending_handle, block_rep->prepared_index_entry.get(),
  1657. skip_delta_encoding);
  1658. }
  1659. };
  1660. switch (thread_state) {
  1661. case ParallelCompressionRep::ThreadState::kEnd:
  1662. // All done
  1663. assert(ios.ok());
  1664. return;
  1665. case ParallelCompressionRep::ThreadState::kCompressing:
  1666. compress_fn();
  1667. break;
  1668. case ParallelCompressionRep::ThreadState::kCompressingAndWriting:
  1669. compress_fn();
  1670. if (LIKELY(ios.ok())) {
  1671. write_fn();
  1672. }
  1673. break;
  1674. case ParallelCompressionRep::ThreadState::kWriting:
  1675. write_fn();
  1676. break;
  1677. case ParallelCompressionRep::ThreadState::kEmitting:
  1678. // Shouldn't happen
  1679. assert(thread_state != ParallelCompressionRep::ThreadState::kEmitting);
  1680. break;
  1681. case ParallelCompressionRep::ThreadState::kIdle:
  1682. // Shouldn't happen
  1683. assert(thread_state != ParallelCompressionRep::ThreadState::kIdle);
  1684. break;
  1685. default:
  1686. assert(false);
  1687. break;
  1688. }
  1689. } while (LIKELY(ios.ok()));
  1690. // Hit an error, so abort
  1691. rep_->SetIOStatus(ios);
  1692. pc_rep.SetAbort(thread_state);
  1693. }
  1694. Status BlockBasedTableBuilder::CompressAndVerifyBlock(
  1695. const Slice& uncompressed_block_data, bool is_data_block,
  1696. WorkingAreaPair& working_area, GrowableBuffer* compressed_output,
  1697. CompressionType* result_compression_type) {
  1698. Rep* r = rep_.get();
  1699. Status status;
  1700. Compressor* compressor = nullptr;
  1701. Decompressor* verify_decomp = nullptr;
  1702. if (is_data_block) {
  1703. compressor = r->data_block_compressor;
  1704. verify_decomp = r->data_block_verify_decompressor.get();
  1705. } else {
  1706. compressor = r->basic_compressor.get();
  1707. verify_decomp = r->verify_decompressor.get();
  1708. }
  1709. compressed_output->Reset();
  1710. CompressionType type = kNoCompression;
  1711. if (LIKELY(uncompressed_block_data.size() < kCompressionSizeLimit)) {
  1712. if (compressor) {
  1713. StopWatchNano timer(
  1714. r->ioptions.clock,
  1715. ShouldReportDetailedTime(r->ioptions.env, r->ioptions.stats));
  1716. size_t max_compressed_size = static_cast<size_t>(
  1717. (static_cast<uint64_t>(r->max_compressed_bytes_per_kb) *
  1718. uncompressed_block_data.size()) >>
  1719. 10);
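// For example, with max_compressed_bytes_per_kb = 1023 and a 4096-byte
// block, the budget is (1023 * 4096) >> 10 = 4092 bytes for the
// compressed output.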
  1720. compressed_output->ResetForSize(max_compressed_size);
  1721. status = compressor->CompressBlock(
  1722. uncompressed_block_data, compressed_output->data(),
  1723. &compressed_output->MutableSize(), &type, &working_area.compress);
  1724. // Post-condition of Compressor::CompressBlock
  1725. assert(type == kNoCompression || status.ok());
  1726. assert(type == kNoCompression ||
  1727. r->table_options.verify_compression == (verify_decomp != nullptr));
  1728. // Some of the compression algorithms are known to be unreliable. If
  1729. // the verify_compression flag is set then try to de-compress the
  1730. // compressed data and compare to the input.
  1731. if (verify_decomp && type != kNoCompression) {
  1732. BlockContents contents;
  1733. Status uncompress_status = DecompressBlockData(
  1734. compressed_output->data(), compressed_output->size(), type,
  1735. *verify_decomp, &contents, r->ioptions,
  1736. /*allocator=*/nullptr, &working_area.verify);
  1737. if (LIKELY(uncompress_status.ok())) {
  1738. bool data_match = contents.data.compare(uncompressed_block_data) == 0;
  1739. if (!data_match) {
1740. // The result of the compression was invalid; abort.
  1741. const char* const msg =
  1742. "Decompressed block did not match pre-compression block";
  1743. ROCKS_LOG_ERROR(r->ioptions.logger, "%s", msg);
  1744. status = Status::Corruption(msg);
  1745. type = kNoCompression;
  1746. }
  1747. } else {
1748. // Decompression reported an error; abort.
  1749. status = Status::Corruption(std::string("Could not decompress: ") +
  1750. uncompress_status.getState());
  1751. type = kNoCompression;
  1752. }
  1753. }
  1754. if (timer.IsStarted()) {
  1755. RecordTimeToHistogram(r->ioptions.stats, COMPRESSION_TIMES_NANOS,
  1756. timer.ElapsedNanos());
  1757. }
  1758. }
  1759. if (is_data_block) {
  1760. r->compressible_input_data_bytes.FetchAddRelaxed(
  1761. uncompressed_block_data.size());
  1762. r->uncompressible_input_data_bytes.FetchAddRelaxed(kBlockTrailerSize);
  1763. }
  1764. } else {
  1765. // Status is not OK, or block is too big to be compressed.
  1766. if (is_data_block) {
  1767. r->uncompressible_input_data_bytes.FetchAddRelaxed(
  1768. uncompressed_block_data.size() + kBlockTrailerSize);
  1769. }
  1770. }
  1771. // Abort compression if the block is too big, or did not pass
  1772. // verification.
  1773. if (type == kNoCompression) {
  1774. bool compression_attempted = !compressed_output->empty();
  1775. RecordTick(r->ioptions.stats, compression_attempted
  1776. ? NUMBER_BLOCK_COMPRESSION_REJECTED
  1777. : NUMBER_BLOCK_COMPRESSION_BYPASSED);
  1778. RecordTick(r->ioptions.stats,
  1779. compression_attempted ? BYTES_COMPRESSION_REJECTED
  1780. : BYTES_COMPRESSION_BYPASSED,
  1781. uncompressed_block_data.size());
  1782. } else {
  1783. RecordTick(r->ioptions.stats, NUMBER_BLOCK_COMPRESSED);
  1784. RecordTick(r->ioptions.stats, BYTES_COMPRESSED_FROM,
  1785. uncompressed_block_data.size());
  1786. RecordTick(r->ioptions.stats, BYTES_COMPRESSED_TO,
  1787. compressed_output->size());
  1788. if (r->IsParallelCompressionActive() && is_data_block) {
  1789. r->pc_rep->estimated_inflight_size.FetchSubRelaxed(
  1790. uncompressed_block_data.size() - compressed_output->size());
  1791. }
  1792. }
  1793. *result_compression_type = type;
  1794. return status;
  1795. }
  1796. void BlockBasedTableBuilder::WriteMaybeCompressedBlock(
  1797. const Slice& block_contents, CompressionType comp_type, BlockHandle* handle,
  1798. BlockType block_type, const Slice* uncompressed_block_data,
  1799. bool* skip_delta_encoding) {
  1800. rep_->SetIOStatus(WriteMaybeCompressedBlockImpl(
  1801. block_contents, comp_type, handle, block_type, uncompressed_block_data,
  1802. skip_delta_encoding));
  1803. }
  1804. IOStatus BlockBasedTableBuilder::WriteMaybeCompressedBlockImpl(
  1805. const Slice& block_contents, CompressionType comp_type, BlockHandle* handle,
  1806. BlockType block_type, const Slice* uncompressed_block_data,
  1807. bool* skip_delta_encoding) {
  1808. // File format contains a sequence of blocks where each block has:
  1809. // block_data: uint8[n]
  1810. // compression_type: uint8
  1811. // checksum: uint32
  1812. Rep* r = rep_.get();
  1813. bool is_data_block = block_type == BlockType::kData;
1814. // For data blocks, skip_delta_encoding must be non-null
  1815. if (is_data_block) {
  1816. assert(skip_delta_encoding != nullptr);
  1817. }
  1818. if (skip_delta_encoding != nullptr) {
  1819. *skip_delta_encoding = false;
  1820. }
  1821. IOOptions io_options;
  1822. // Always return io_s for NRVO
  1823. IOStatus io_s =
  1824. WritableFileWriter::PrepareIOOptions(r->write_options, io_options);
  1825. if (UNLIKELY(!io_s.ok())) {
  1826. return io_s;
  1827. }
  1828. // Old, misleading name of this function: WriteRawBlock
  1829. StopWatch sw(r->ioptions.clock, r->ioptions.stats, WRITE_RAW_BLOCK_MICROS);
  1830. auto offset = r->get_offset();
1831. // Try to align the data block to the super block alignment size, if enabled
  1832. if ((r->table_options.super_block_alignment_size != 0) && is_data_block) {
  1833. auto super_block_alignment_mask =
  1834. r->table_options.super_block_alignment_size - 1;
  1835. if ((r->table_options.super_block_alignment_space_overhead_ratio != 0) &&
  1836. (offset & (~super_block_alignment_mask)) !=
  1837. ((offset + block_contents.size()) &
  1838. (~super_block_alignment_mask))) {
  1839. auto allowed_max_padding_size =
  1840. r->table_options.super_block_alignment_size /
  1841. r->table_options.super_block_alignment_space_overhead_ratio;
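// For example, a 1 MiB super block alignment with overhead ratio 64 limits
// padding to less than 16 KiB per alignment boundary.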
  1842. // new block would cross the super block boundary
  1843. auto pad_bytes = r->table_options.super_block_alignment_size -
  1844. (offset & super_block_alignment_mask);
  1845. if (pad_bytes < allowed_max_padding_size) {
  1846. io_s = r->file->Pad(io_options, pad_bytes, allowed_max_padding_size);
  1847. if (UNLIKELY(!io_s.ok())) {
  1848. r->SetIOStatus(io_s);
  1849. return io_s;
  1850. }
  1851. r->pre_compression_size += pad_bytes;
  1852. offset += pad_bytes;
  1853. r->set_offset(offset);
  1854. if (skip_delta_encoding != nullptr) {
  1855. // Skip delta encoding in index block builder when a super block
  1856. // alignment padding is added for data block.
  1857. *skip_delta_encoding = true;
  1858. }
  1859. TEST_SYNC_POINT(
  1860. "BlockBasedTableBuilder::WriteMaybeCompressedBlock:"
  1861. "SuperBlockAlignment");
  1862. } else {
  1863. TEST_SYNC_POINT(
  1864. "BlockBasedTableBuilder::WriteMaybeCompressedBlock:"
  1865. "SuperBlockAlignmentPaddingBytesExceedLimit");
  1866. }
  1867. }
  1868. }
  1869. handle->set_offset(offset);
  1870. handle->set_size(block_contents.size());
  1871. assert(status().ok());
  1872. assert(io_status().ok());
  1873. if (uncompressed_block_data == nullptr) {
  1874. uncompressed_block_data = &block_contents;
  1875. assert(comp_type == kNoCompression);
  1876. }
  1877. // TODO: consider a variant of this function that puts the trailer after
  1878. // block_contents (if it comes from a std::string) so we only need one
  1879. // r->file->Append call
  1880. {
  1881. io_s = r->file->Append(io_options, block_contents);
  1882. if (UNLIKELY(!io_s.ok())) {
  1883. return io_s;
  1884. }
  1885. }
  1886. r->compression_types_used.Add(comp_type);
  1887. std::array<char, kBlockTrailerSize> trailer;
  1888. trailer[0] = comp_type;
  1889. uint32_t checksum = ComputeBuiltinChecksumWithLastByte(
  1890. r->table_options.checksum, block_contents.data(), block_contents.size(),
  1891. /*last_byte*/ comp_type);
  1892. checksum += ChecksumModifierForContext(r->base_context_checksum, offset);
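// Folding the per-file base_context_checksum and the block's offset into
// the stored checksum helps detect a block that is read back from the
// wrong file or from the wrong position within the file.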
  1893. if (block_type == BlockType::kFilter) {
  1894. io_s = status_to_io_status(
  1895. r->filter_builder->MaybePostVerifyFilter(block_contents));
  1896. if (UNLIKELY(!io_s.ok())) {
  1897. return io_s;
  1898. }
  1899. }
  1900. EncodeFixed32(trailer.data() + 1, checksum);
  1901. TEST_SYNC_POINT_CALLBACK(
  1902. "BlockBasedTableBuilder::WriteMaybeCompressedBlock:TamperWithChecksum",
  1903. trailer.data());
  1904. {
  1905. io_s = r->file->Append(io_options, Slice(trailer.data(), trailer.size()));
1906. if (UNLIKELY(!io_s.ok())) {
  1907. return io_s;
  1908. }
  1909. }
  1910. if (r->warm_cache) {
  1911. io_s = status_to_io_status(
  1912. InsertBlockInCacheHelper(*uncompressed_block_data, handle, block_type));
  1913. if (UNLIKELY(!io_s.ok())) {
  1914. return io_s;
  1915. }
  1916. }
  1917. r->pre_compression_size +=
  1918. uncompressed_block_data->size() + kBlockTrailerSize;
  1919. r->set_offset(r->get_offset() + block_contents.size() + kBlockTrailerSize);
  1920. if (r->table_options.block_align && is_data_block) {
  1921. size_t pad_bytes =
  1922. (r->alignment -
  1923. ((block_contents.size() + kBlockTrailerSize) & (r->alignment - 1))) &
  1924. (r->alignment - 1);
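// For example, with alignment = 4096 and a block + trailer totaling 5000
// bytes starting on an aligned offset, pad_bytes = (4096 - (5000 & 4095))
// & 4095 = 3192, putting the next block back on a 4096-byte boundary.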
  1925. io_s = r->file->Pad(io_options, pad_bytes, kDefaultPageSize);
  1926. if (LIKELY(io_s.ok())) {
  1927. r->pre_compression_size += pad_bytes;
  1928. r->set_offset(r->get_offset() + pad_bytes);
  1929. } else {
  1930. return io_s;
  1931. }
  1932. }
  1933. if (r->IsParallelCompressionActive() && is_data_block) {
  1934. r->pc_rep->estimated_inflight_size.FetchSubRelaxed(block_contents.size() +
  1935. kBlockTrailerSize);
  1936. }
  1937. return io_s;
  1938. }
  1939. void BlockBasedTableBuilder::MaybeStartParallelCompression() {
  1940. if (rep_->compression_parallel_threads <= 1) {
  1941. return;
  1942. }
  1943. // Although in theory having a separate thread for writing to the SST file
  1944. // could help to hide the latency associated with writing, it is more often
  1945. // the case that the latency comes in large units for rare calls to write that
  1946. // flush downstream buffers, including in WritableFileWriter. The buffering
  1947. // provided by the compression ring buffer is almost negligible for hiding
  1948. // that latency. So even with some optimizations, turning on the parallel
  1949. // framework when compression is disabled just eats more CPU with little-to-no
  1950. // improvement in throughput.
  1951. if (rep_->data_block_compressor == nullptr) {
  1952. // Force the generally best configuration for no compression: no parallelism
  1953. return;
  1954. }
  1955. rep_->pc_rep = std::make_unique<ParallelCompressionRep>(
  1956. rep_->compression_parallel_threads);
  1957. auto& pc_rep = *rep_->pc_rep;
  1958. for (uint32_t i = 0; i <= pc_rep.ring_buffer_mask; i++) {
  1959. pc_rep.ring_buffer[i].prepared_index_entry =
  1960. rep_->index_builder->CreatePreparedIndexEntry();
  1961. }
  1962. pc_rep.worker_threads.reserve(pc_rep.num_worker_threads);
  1963. pc_rep.working_areas.resize(pc_rep.num_worker_threads);
  1964. for (uint32_t i = 0; i < pc_rep.num_worker_threads; i++) {
  1965. auto& wa = pc_rep.working_areas[i];
  1966. if (rep_->data_block_compressor) {
  1967. wa.compress = rep_->data_block_compressor->ObtainWorkingArea();
  1968. }
  1969. if (rep_->data_block_verify_decompressor) {
  1970. wa.verify = rep_->data_block_verify_decompressor->ObtainWorkingArea(
  1971. rep_->data_block_compressor->GetPreferredCompressionType());
  1972. }
  1973. pc_rep.worker_threads.emplace_back([this, &wa] { BGWorker(wa); });
  1974. }
  1975. #ifdef BBTB_PC_WATCHDOG
  1976. // Start watchdog thread
  1977. pc_rep.watchdog_thread = std::thread([&pc_rep] { pc_rep.BGWatchdog(); });
  1978. pc_rep.live_emit.StoreRelaxed(true);
  1979. #endif // BBTB_PC_WATCHDOG
  1980. }
  1981. void BlockBasedTableBuilder::StopParallelCompression(bool abort) {
  1982. auto& pc_rep = *rep_->pc_rep;
  1983. if (abort) {
  1984. pc_rep.SetAbort(pc_rep.emit_thread_state);
  1985. } else if (pc_rep.emit_thread_state !=
  1986. ParallelCompressionRep::ThreadState::kEnd) {
  1987. // In case we didn't do a final flush with no next key
  1988. assert(rep_->props.num_data_blocks == 0);
  1989. pc_rep.SetNoMoreToEmit(pc_rep.emit_thread_state, pc_rep.emit_slot);
  1990. }
  1991. #ifdef BBTB_PC_WATCHDOG
  1992. pc_rep.live_emit.StoreRelaxed(false);
  1993. #endif // BBTB_PC_WATCHDOG
  1994. assert(pc_rep.emit_thread_state == ParallelCompressionRep::ThreadState::kEnd);
  1995. for (auto& thread : pc_rep.worker_threads) {
  1996. thread.join();
  1997. }
  1998. #ifdef BBTB_PC_WATCHDOG
  1999. // Wake & shutdown watchdog thread
  2000. {
  2001. std::unique_lock<std::mutex> lock(pc_rep.watchdog_mutex);
  2002. pc_rep.shutdown_watchdog = true;
  2003. pc_rep.watchdog_cv.notify_all();
  2004. }
  2005. pc_rep.watchdog_thread.join();
  2006. #endif // BBTB_PC_WATCHDOG
  2007. rep_->pc_rep.reset();
  2008. }
  2009. Status BlockBasedTableBuilder::status() const { return rep_->GetStatus(); }
  2010. IOStatus BlockBasedTableBuilder::io_status() const {
  2011. return rep_->GetIOStatus();
  2012. }
  2013. bool BlockBasedTableBuilder::ok() const { return rep_->StatusOk(); }
  2014. Status BlockBasedTableBuilder::InsertBlockInCacheHelper(
  2015. const Slice& block_contents, const BlockHandle* handle,
  2016. BlockType block_type) {
  2017. Cache* block_cache = rep_->table_options.block_cache.get();
  2018. Status s;
  2019. auto helper =
  2020. GetCacheItemHelper(block_type, rep_->ioptions.lowest_used_cache_tier);
  2021. if (block_cache && helper && helper->create_cb) {
  2022. CacheKey key = BlockBasedTable::GetCacheKey(rep_->base_cache_key, *handle);
  2023. size_t charge;
  2024. // NOTE: data blocks (and everything else) will be warmed in decompressed
2025. // state, so they do not need a dictionary-aware decompressor. The only thing
  2026. // needing a decompressor here (in create_context) is warming the
  2027. // (de)compression dictionary, which will clone and save a dict-based
  2028. // decompressor from the corresponding non-dict decompressor.
  2029. s = WarmInCache(block_cache, key.AsSlice(), block_contents,
  2030. &rep_->create_context, helper, Cache::Priority::LOW,
  2031. &charge);
  2032. if (LIKELY(s.ok())) {
  2033. BlockBasedTable::UpdateCacheInsertionMetrics(
  2034. block_type, nullptr /*get_context*/, charge, s.IsOkOverwritten(),
  2035. rep_->ioptions.stats);
  2036. } else {
  2037. RecordTick(rep_->ioptions.stats, BLOCK_CACHE_ADD_FAILURES);
  2038. }
  2039. }
  2040. return s;
  2041. }
  2042. void BlockBasedTableBuilder::WriteFilterBlock(
  2043. MetaIndexBuilder* meta_index_builder) {
  2044. if (rep_->filter_builder == nullptr || rep_->filter_builder->IsEmpty()) {
  2045. // No filter block needed
  2046. return;
  2047. }
  2048. if (!rep_->last_ikey.empty()) {
  2049. // We might have been using AddWithPrevKey, so need PrevKeyBeforeFinish
  2050. // to be safe. And because we are re-synchronized after buffered/parallel
  2051. // operation, rep_->last_ikey is accurate.
  2052. rep_->filter_builder->PrevKeyBeforeFinish(
  2053. ExtractUserKeyAndStripTimestamp(rep_->last_ikey, rep_->ts_sz));
  2054. }
  2055. BlockHandle filter_block_handle;
  2056. bool is_partitioned_filter = rep_->table_options.partition_filters;
  2057. if (LIKELY(ok())) {
  2058. rep_->props.num_filter_entries +=
  2059. rep_->filter_builder->EstimateEntriesAdded();
  2060. Status s = Status::Incomplete();
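// For partitioned filters, Finish() returns Incomplete while more filter
// partitions remain; the final (OK) call yields the top-level partition
// index, which is written as kFilterPartitionIndex below.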
  2061. while (LIKELY(ok()) && s.IsIncomplete()) {
  2062. // filter_data is used to store the transferred filter data payload from
  2063. // FilterBlockBuilder and deallocate the payload by going out of scope.
  2064. // Otherwise, the payload will unnecessarily remain until
  2065. // BlockBasedTableBuilder is deallocated.
  2066. //
  2067. // See FilterBlockBuilder::Finish() for more on the difference in
  2068. // transferred filter data payload among different FilterBlockBuilder
  2069. // subtypes.
  2070. std::unique_ptr<const char[]> filter_owner;
  2071. Slice filter_content;
  2072. s = rep_->filter_builder->Finish(filter_block_handle, &filter_content,
  2073. &filter_owner);
  2074. assert(s.ok() || s.IsIncomplete() || s.IsCorruption());
  2075. if (s.IsCorruption()) {
  2076. rep_->SetStatus(s);
  2077. break;
  2078. }
  2079. rep_->props.filter_size += filter_content.size();
  2080. BlockType btype = is_partitioned_filter && /* last */ s.ok()
  2081. ? BlockType::kFilterPartitionIndex
  2082. : BlockType::kFilter;
  2083. WriteMaybeCompressedBlock(filter_content, kNoCompression,
  2084. &filter_block_handle, btype);
  2085. }
  2086. rep_->filter_builder->ResetFilterBitsBuilder();
  2087. }
  2088. if (LIKELY(ok())) {
  2089. // Add mapping from "<filter_block_prefix>.Name" to location
  2090. // of filter data.
  2091. std::string key;
  2092. key = is_partitioned_filter ? BlockBasedTable::kPartitionedFilterBlockPrefix
  2093. : BlockBasedTable::kFullFilterBlockPrefix;
  2094. key.append(rep_->table_options.filter_policy->CompatibilityName());
  2095. meta_index_builder->Add(key, filter_block_handle);
  2096. }
  2097. }
  2098. void BlockBasedTableBuilder::WriteIndexBlock(
  2099. MetaIndexBuilder* meta_index_builder, BlockHandle* index_block_handle) {
  2100. if (UNLIKELY(!ok())) {
  2101. return;
  2102. }
  2103. IndexBuilder::IndexBlocks index_blocks;
  2104. auto index_builder_status = rep_->index_builder->Finish(&index_blocks);
  2105. if (LIKELY(ok()) && !index_builder_status.ok() &&
  2106. !index_builder_status.IsIncomplete()) {
2107. // If the index builder failed with a non-Incomplete error, we should
2108. // mark the entire builder as having failed with that status. However,
2109. // if the index builder failed with an Incomplete status, we should
2110. // continue writing out any meta blocks that may have been generated.
  2111. rep_->SetStatus(index_builder_status);
  2112. }
  2113. if (LIKELY(ok())) {
  2114. for (const auto& item : index_blocks.meta_blocks) {
  2115. BlockHandle block_handle;
  2116. if (item.second.first == BlockType::kIndex) {
  2117. WriteBlock(item.second.second, &block_handle, item.second.first);
  2118. } else {
  2119. assert(item.second.first == BlockType::kUserDefinedIndex);
  2120. WriteMaybeCompressedBlock(item.second.second, kNoCompression,
  2121. &block_handle, item.second.first);
  2122. }
  2123. if (UNLIKELY(!ok())) {
  2124. break;
  2125. }
  2126. meta_index_builder->Add(item.first, block_handle);
  2127. }
  2128. }
  2129. if (LIKELY(ok())) {
  2130. if (rep_->table_options.enable_index_compression) {
  2131. WriteBlock(index_blocks.index_block_contents, index_block_handle,
  2132. BlockType::kIndex);
  2133. } else {
  2134. WriteMaybeCompressedBlock(index_blocks.index_block_contents,
  2135. kNoCompression, index_block_handle,
  2136. BlockType::kIndex);
  2137. }
  2138. }
  2139. // If there are more index partitions, finish them and write them out
  2140. if (index_builder_status.IsIncomplete()) {
  2141. bool index_building_finished = false;
  2142. while (LIKELY(ok()) && !index_building_finished) {
  2143. Status s =
  2144. rep_->index_builder->Finish(&index_blocks, *index_block_handle);
  2145. if (s.ok()) {
  2146. index_building_finished = true;
  2147. } else if (s.IsIncomplete()) {
  2148. // More partitioned index after this one
  2149. assert(!index_building_finished);
  2150. } else {
  2151. // Error
  2152. rep_->SetStatus(s);
  2153. return;
  2154. }
  2155. if (rep_->table_options.enable_index_compression) {
  2156. WriteBlock(index_blocks.index_block_contents, index_block_handle,
  2157. BlockType::kIndex);
  2158. } else {
  2159. WriteMaybeCompressedBlock(index_blocks.index_block_contents,
  2160. kNoCompression, index_block_handle,
  2161. BlockType::kIndex);
  2162. }
  2163. // The last index_block_handle will be for the partition index block
  2164. }
  2165. }
2166. // If successful and the handle should go in the metaindex rather than the footer...
  2167. if (LIKELY(ok()) && !FormatVersionUsesIndexHandleInFooter(
  2168. rep_->table_options.format_version)) {
  2169. meta_index_builder->Add(kIndexBlockName, *index_block_handle);
  2170. }
  2171. }

void BlockBasedTableBuilder::WritePropertiesBlock(
    MetaIndexBuilder* meta_index_builder) {
  BlockHandle properties_block_handle;
  if (LIKELY(ok())) {
    PropertyBlockBuilder property_block_builder;
    rep_->props.filter_policy_name =
        rep_->table_options.filter_policy != nullptr
            ? rep_->table_options.filter_policy->Name()
            : "";
    rep_->props.index_size =
        rep_->index_builder->IndexSize() + kBlockTrailerSize;
    rep_->props.comparator_name = rep_->ioptions.user_comparator != nullptr
                                      ? rep_->ioptions.user_comparator->Name()
                                      : "nullptr";
    rep_->props.merge_operator_name =
        rep_->ioptions.merge_operator != nullptr
            ? rep_->ioptions.merge_operator->Name()
            : "nullptr";
    rep_->props.prefix_extractor_name =
        rep_->prefix_extractor ? rep_->prefix_extractor->AsString() : "nullptr";

    std::string property_collectors_names = "[";
    for (size_t i = 0;
         i < rep_->ioptions.table_properties_collector_factories.size(); ++i) {
      if (i != 0) {
        property_collectors_names += ",";
      }
      property_collectors_names +=
          rep_->ioptions.table_properties_collector_factories[i]->Name();
    }
    property_collectors_names += "]";
    rep_->props.property_collectors_names = property_collectors_names;

    rep_->PostPopulateCompressionProperties();

    if (rep_->table_options.index_type ==
        BlockBasedTableOptions::kTwoLevelIndexSearch) {
      assert(rep_->p_index_builder_ != nullptr);
      rep_->props.index_partitions = rep_->p_index_builder_->NumPartitions();
      rep_->props.top_level_index_size =
          rep_->p_index_builder_->TopLevelIndexSize(rep_->offset.LoadRelaxed());
    }
    rep_->props.index_key_is_user_key =
        !rep_->index_builder->separator_is_key_plus_seq();
    rep_->props.index_value_is_delta_encoded =
        rep_->use_delta_encoding_for_index_values;
    if (rep_->sampled_input_data_bytes.LoadRelaxed() > 0) {
      rep_->props.slow_compression_estimated_data_size = static_cast<uint64_t>(
          static_cast<double>(
              rep_->sampled_output_slow_data_bytes.LoadRelaxed()) /
              rep_->sampled_input_data_bytes.LoadRelaxed() *
              rep_->compressible_input_data_bytes.LoadRelaxed() +
          rep_->uncompressible_input_data_bytes.LoadRelaxed() + 0.5);
      rep_->props.fast_compression_estimated_data_size = static_cast<uint64_t>(
          static_cast<double>(
              rep_->sampled_output_fast_data_bytes.LoadRelaxed()) /
              rep_->sampled_input_data_bytes.LoadRelaxed() *
              rep_->compressible_input_data_bytes.LoadRelaxed() +
          rep_->uncompressible_input_data_bytes.LoadRelaxed() + 0.5);
    } else if (rep_->sample_for_compression > 0) {
      // We tried to sample but none were found. Assume worst-case
      // (compression ratio 1.0) so data is complete and aggregatable.
      rep_->props.slow_compression_estimated_data_size =
          rep_->compressible_input_data_bytes.LoadRelaxed() +
          rep_->uncompressible_input_data_bytes.LoadRelaxed();
      rep_->props.fast_compression_estimated_data_size =
          rep_->compressible_input_data_bytes.LoadRelaxed() +
          rep_->uncompressible_input_data_bytes.LoadRelaxed();
    }
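    // Illustrative arithmetic only (hypothetical numbers, not from this
    // file): if 1 MB of sampled input compressed to 400 KB with the "slow"
    // algorithm (ratio ~0.4), and the table saw 100 MB of compressible plus
    // 5 MB of uncompressible input, the estimate above works out to about
    //   (0.4 * 100 MB) + 5 MB ~= 45 MB.
    // The sampled ratio is extrapolated over all compressible bytes,
    // uncompressible bytes pass through unchanged, and the trailing +0.5
    // rounds to the nearest integer.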
    rep_->props.user_defined_timestamps_persisted =
        rep_->persist_user_defined_timestamps;

    assert(IsEmpty() || rep_->props.key_largest_seqno != UINT64_MAX);

    // Add basic properties
    property_block_builder.AddTableProperty(rep_->props);

    // Add user-collected properties
    NotifyCollectTableCollectorsOnFinish(
        rep_->table_properties_collectors, rep_->ioptions.logger,
        &property_block_builder, rep_->props.user_collected_properties,
        rep_->props.readable_properties);

    Slice block_data = property_block_builder.Finish();
    TEST_SYNC_POINT_CALLBACK(
        "BlockBasedTableBuilder::WritePropertiesBlock:BlockData", &block_data);
    WriteMaybeCompressedBlock(block_data, kNoCompression,
                              &properties_block_handle, BlockType::kProperties);
  }
  if (LIKELY(ok())) {
#ifndef NDEBUG
    {
      uint64_t props_block_offset = properties_block_handle.offset();
      uint64_t props_block_size = properties_block_handle.size();
      TEST_SYNC_POINT_CALLBACK(
          "BlockBasedTableBuilder::WritePropertiesBlock:GetPropsBlockOffset",
          &props_block_offset);
      TEST_SYNC_POINT_CALLBACK(
          "BlockBasedTableBuilder::WritePropertiesBlock:GetPropsBlockSize",
          &props_block_size);
    }
#endif  // !NDEBUG

    const std::string* properties_block_meta = &kPropertiesBlockName;
    TEST_SYNC_POINT_CALLBACK(
        "BlockBasedTableBuilder::WritePropertiesBlock:Meta",
        &properties_block_meta);
    meta_index_builder->Add(*properties_block_meta, properties_block_handle);
  }
}

void BlockBasedTableBuilder::WriteCompressionDictBlock(
    MetaIndexBuilder* meta_index_builder) {
  Slice compression_dict;
  if (rep_->compressor_with_dict) {
    compression_dict = rep_->compressor_with_dict->GetSerializedDict();
  }
  if (!compression_dict.empty()) {
    BlockHandle compression_dict_block_handle;
    if (LIKELY(ok())) {
      WriteMaybeCompressedBlock(compression_dict, kNoCompression,
                                &compression_dict_block_handle,
                                BlockType::kCompressionDictionary);
      TEST_SYNC_POINT_CALLBACK(
          "BlockBasedTableBuilder::WriteCompressionDictBlock:RawDict",
          &compression_dict);
    }
    if (LIKELY(ok())) {
      meta_index_builder->Add(kCompressionDictBlockName,
                              compression_dict_block_handle);
    }
  }
}

void BlockBasedTableBuilder::WriteRangeDelBlock(
    MetaIndexBuilder* meta_index_builder) {
  if (LIKELY(ok()) && !rep_->range_del_block.empty()) {
    BlockHandle range_del_block_handle;
    WriteMaybeCompressedBlock(rep_->range_del_block.Finish(), kNoCompression,
                              &range_del_block_handle,
                              BlockType::kRangeDeletion);
    meta_index_builder->Add(kRangeDelBlockName, range_del_block_handle);
  }
}

void BlockBasedTableBuilder::WriteFooter(BlockHandle& metaindex_block_handle,
                                         BlockHandle& index_block_handle) {
  assert(LIKELY(ok()));
  Rep* r = rep_.get();
  // this is guaranteed by BlockBasedTableBuilder's constructor
  assert(r->table_options.checksum == kCRC32c ||
         r->table_options.format_version != 0);
  FooterBuilder footer;
  Status s = footer.Build(kBlockBasedTableMagicNumber,
                          r->table_options.format_version, r->get_offset(),
                          r->table_options.checksum, metaindex_block_handle,
                          index_block_handle, r->base_context_checksum);
  if (!s.ok()) {
    r->SetStatus(s);
    return;
  }
  IOOptions io_options;
  IOStatus ios =
      WritableFileWriter::PrepareIOOptions(r->write_options, io_options);
  if (!ios.ok()) {
    r->SetIOStatus(ios);
    return;
  }
  ios = r->file->Append(io_options, footer.GetSlice());
  if (ios.ok()) {
    r->pre_compression_size += footer.GetSlice().size();
    r->set_offset(r->get_offset() + footer.GetSlice().size());
  } else {
    r->SetIOStatus(ios);
  }
}

void BlockBasedTableBuilder::MaybeEnterUnbuffered(
    const Slice* first_key_in_next_block) {
  Rep* r = rep_.get();
  assert(r->state == Rep::State::kBuffered);

  // Don't yet enter unbuffered (early return) if none of the conditions are
  // met
  if (first_key_in_next_block != nullptr) {
    bool exceeds_buffer_limit =
        (r->buffer_limit != 0 && r->data_begin_offset > r->buffer_limit);
    if (!exceeds_buffer_limit) {
      bool exceeds_global_block_cache_limit = false;
      // Increase cache charging for the last buffered data block
      // only if the block is not going to be unbuffered immediately
      // and there exists a cache reservation manager
      if (r->compression_dict_buffer_cache_res_mgr != nullptr) {
        Status s =
            r->compression_dict_buffer_cache_res_mgr->UpdateCacheReservation(
                r->data_begin_offset);
        exceeds_global_block_cache_limit = s.IsMemoryLimit();
      }
      if (!exceeds_global_block_cache_limit) {
        return;
      }
    }
  }

  // Enter Unbuffered state
  r->state = Rep::State::kUnbuffered;
  const size_t kNumBlocksBuffered = r->data_block_buffers.size();
  if (kNumBlocksBuffered == 0) {
    // The below code is neither safe nor necessary for handling zero data
    // blocks.

    // For PostPopulateCompressionProperties()
    r->data_block_compressor = r->basic_compressor.get();
    return;
  }

  // Abstract algebra teaches us that a finite cyclic group (such as the
  // additive group of integers modulo N) can be generated by a number that is
  // coprime with N. Since N is variable (number of buffered data blocks), we
  // must then pick a prime number in order to guarantee coprimeness with any
  // N.
  //
  // One downside of this approach is the spread will be poor when
  // `kPrimeGeneratorRemainder` is close to zero or close to
  // `kNumBlocksBuffered`.
  //
  // Picked a random number between one and one trillion and then chose the
  // next prime number greater than or equal to it.
  const uint64_t kPrimeGenerator = 545055921143ull;
  // Can avoid repeated division by just adding the remainder repeatedly.
  const size_t kPrimeGeneratorRemainder = static_cast<size_t>(
      kPrimeGenerator % static_cast<uint64_t>(kNumBlocksBuffered));
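  // Illustrative walk-through (hypothetical numbers, not from this file):
  // with kNumBlocksBuffered == 10, kPrimeGeneratorRemainder is
  // 545055921143 % 10 == 3, which is coprime with 10. Starting at index
  // 10 / 2 == 5 and repeatedly adding 3 modulo 10 visits
  //   5, 8, 1, 4, 7, 0, 3, 6, 9, 2
  // so every buffered block is sampled at most once, in a spread-out order,
  // unless the byte budget in the loop below cuts sampling short.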
  const size_t kInitSampleIdx = kNumBlocksBuffered / 2;

  Compressor::DictSampleArgs samples;
  size_t buffer_idx = kInitSampleIdx;
  for (size_t i = 0; i < kNumBlocksBuffered &&
                     samples.sample_data.size() < r->max_dict_sample_bytes;
       ++i) {
    size_t copy_len =
        std::min(r->max_dict_sample_bytes - samples.sample_data.size(),
                 r->data_block_buffers[buffer_idx].size());
    samples.sample_data.append(r->data_block_buffers[buffer_idx], 0, copy_len);
    samples.sample_lens.emplace_back(copy_len);

    buffer_idx += kPrimeGeneratorRemainder;
    if (buffer_idx >= kNumBlocksBuffered) {
      buffer_idx -= kNumBlocksBuffered;
    }
  }
  assert(samples.sample_data.size() > 0);

  // final sample data block flushed, now we can generate dictionary
  r->compressor_with_dict = r->basic_compressor->MaybeCloneSpecialized(
      CacheEntryRole::kDataBlock, std::move(samples));
  // The compressor might opt not to use a dictionary, in which case we
  // can use the same compressor as for e.g. index blocks.
  r->data_block_compressor = r->compressor_with_dict
                                 ? r->compressor_with_dict.get()
                                 : r->basic_compressor.get();
  Slice serialized_dict = r->data_block_compressor->GetSerializedDict();

  if (r->verify_decompressor) {
    if (serialized_dict.empty()) {
      // No dictionary
      r->data_block_verify_decompressor = r->verify_decompressor.get();
    } else {
      // Get an updated dictionary-aware decompressor for verification.
      Status s = r->verify_decompressor->MaybeCloneForDict(
          serialized_dict, &r->verify_decompressor_with_dict);
      // Dictionary support must be present on the decompressor side if it's
      // on the compressor side.
      assert(r->verify_decompressor_with_dict);
      if (r->verify_decompressor_with_dict) {
        r->data_block_verify_decompressor =
            r->verify_decompressor_with_dict.get();
        assert(s.ok());
      } else {
        assert(!s.ok());
        r->SetStatus(s);
      }
    }
  }

  auto get_iterator_for_block = [&r](size_t i) {
    auto& data_block = r->data_block_buffers[i];
    assert(!data_block.empty());
    Block reader{BlockContents{data_block}};
    DataBlockIter* iter = reader.NewDataIterator(
        r->internal_comparator.user_comparator(), kDisableGlobalSequenceNumber,
        nullptr /* iter */, nullptr /* stats */,
        false /* block_contents_pinned */, r->persist_user_defined_timestamps);
    iter->SeekToFirst();
    assert(iter->Valid());
    return std::unique_ptr<DataBlockIter>(iter);
  };
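  // The loop below replays each buffered block through a DataBlockIter so
  // that its user keys reach the filter builder, its keys and values reach
  // the index builder, and the block contents are finally emitted for
  // (possibly dictionary-)compressed writing, in parallel when parallel
  // compression is active.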
  std::unique_ptr<DataBlockIter> iter = nullptr, next_block_iter = nullptr;
  for (size_t i = 0; ok() && i < r->data_block_buffers.size(); ++i) {
    if (iter == nullptr) {
      iter = get_iterator_for_block(i);
      assert(iter != nullptr);
    }

    for (; iter->Valid(); iter->Next()) {
      Slice key = iter->key();
      if (r->filter_builder != nullptr) {
        // NOTE: AddWithPrevKey here would only save key copying if prev is
        // pinned (iter->IsKeyPinned()), which is probably rare with delta
        // encoding. OK to go from Add() here to AddWithPrevKey() in
        // unbuffered operation.
        r->filter_builder->Add(ExtractUserKeyAndStripTimestamp(key, r->ts_sz));
      }
      r->index_builder->OnKeyAdded(key, iter->value());
    }

    Slice first_key_in_loop_next_block;
    const Slice* first_key_in_loop_next_block_ptr;
    if (i + 1 < r->data_block_buffers.size()) {
      next_block_iter = get_iterator_for_block(i + 1);
      first_key_in_loop_next_block = next_block_iter->key();
      first_key_in_loop_next_block_ptr = &first_key_in_loop_next_block;
    } else {
      first_key_in_loop_next_block_ptr = first_key_in_next_block;
    }

    auto& data_block = r->data_block_buffers[i];
    iter->SeekToLast();
    assert(iter->Valid());
    if (r->IsParallelCompressionActive()) {
      EmitBlockForParallel(data_block, iter->key(),
                           first_key_in_loop_next_block_ptr);
    } else {
      EmitBlock(data_block, iter->key(), first_key_in_loop_next_block_ptr);
    }
    std::swap(iter, next_block_iter);
  }
  r->data_block_buffers.clear();
  r->data_begin_offset = 0;
  // Release all reserved cache for data block buffers
  if (r->compression_dict_buffer_cache_res_mgr != nullptr) {
    Status s = r->compression_dict_buffer_cache_res_mgr->UpdateCacheReservation(
        r->data_begin_offset);
    s.PermitUncheckedError();
  }
}

Status BlockBasedTableBuilder::Finish() {
  Rep* r = rep_.get();
  assert(r->state != Rep::State::kClosed);
  // To make sure properties block is able to keep the accurate size of index
  // block, we will finish writing all index entries first, in Flush().
  Flush(/*first_key_in_next_block=*/nullptr);
  if (rep_->state == Rep::State::kBuffered) {
    MaybeEnterUnbuffered(nullptr);
  }
  assert(r->state == Rep::State::kUnbuffered);
  if (r->IsParallelCompressionActive()) {
    StopParallelCompression(/*abort=*/false);
  }

  r->props.tail_start_offset = r->offset.LoadRelaxed();

  // Write meta blocks, metaindex block and footer in the following order.
  //    1. [meta block: filter]
  //    2. [meta block: index]
  //    3. [meta block: compression dictionary]
  //    4. [meta block: range deletion tombstone]
  //    5. [meta block: properties]
  //    6. [metaindex block]
  //    7. Footer
  BlockHandle metaindex_block_handle, index_block_handle;
  MetaIndexBuilder meta_index_builder;
  WriteFilterBlock(&meta_index_builder);
  WriteIndexBlock(&meta_index_builder, &index_block_handle);
  WriteCompressionDictBlock(&meta_index_builder);
  WriteRangeDelBlock(&meta_index_builder);
  WritePropertiesBlock(&meta_index_builder);
  if (LIKELY(ok())) {
    // flush the meta index block
    WriteMaybeCompressedBlock(meta_index_builder.Finish(), kNoCompression,
                              &metaindex_block_handle, BlockType::kMetaIndex);
  }
  if (LIKELY(ok())) {
    WriteFooter(metaindex_block_handle, index_block_handle);
  }
  r->state = Rep::State::kClosed;
  r->tail_size = r->offset.LoadRelaxed() - r->props.tail_start_offset;
  return r->GetStatus();
}

void BlockBasedTableBuilder::Abandon() {
  assert(rep_->state != Rep::State::kClosed);
  if (rep_->IsParallelCompressionActive()) {
    StopParallelCompression(/*abort=*/true);
  }
  rep_->state = Rep::State::kClosed;
  rep_->GetIOStatus().PermitUncheckedError();
}
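
// Editorial sketch (not part of the upstream sources): a caller that owns a
// table builder typically drives the lifecycle covered by Finish()/Abandon()
// above roughly as follows, where `builder` and `sorted_entries` are
// hypothetical locals and keys are already internal keys in sorted order:
//
//   for (const auto& [key, value] : sorted_entries) {
//     builder->Add(key, value);
//   }
//   Status s = builder->status();   // any error accumulated so far
//   if (s.ok()) {
//     s = builder->Finish();        // writes meta blocks, metaindex, footer
//   } else {
//     builder->Abandon();           // discard; the output file is incomplete
//   }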

uint64_t BlockBasedTableBuilder::NumEntries() const {
  return rep_->props.num_entries;
}

bool BlockBasedTableBuilder::IsEmpty() const {
  return rep_->props.num_entries == 0 && rep_->props.num_range_deletions == 0;
}

uint64_t BlockBasedTableBuilder::PreCompressionSize() const {
  return rep_->pre_compression_size;
}

uint64_t BlockBasedTableBuilder::FileSize() const {
  return rep_->offset.LoadRelaxed();
}

uint64_t BlockBasedTableBuilder::EstimatedFileSize() const {
  if (rep_->IsParallelCompressionActive()) {
    // Use upper bound on "inflight" data size to estimate
    return FileSize() + rep_->pc_rep->estimated_inflight_size.LoadRelaxed();
  } else {
    return FileSize();
  }
}
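
// Illustrative numbers (hypothetical, not from this file): if 64 MB has been
// written so far and roughly 8 blocks of ~4 KB each are still queued in the
// parallel-compression pipeline, EstimatedFileSize() reports about
// 64 MB + 32 KB, while FileSize() reports only the 64 MB already written.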

uint64_t BlockBasedTableBuilder::GetTailSize() const { return rep_->tail_size; }

bool BlockBasedTableBuilder::NeedCompact() const {
  for (const auto& collector : rep_->table_properties_collectors) {
    if (collector->NeedCompact()) {
      return true;
    }
  }
  return false;
}

TableProperties BlockBasedTableBuilder::GetTableProperties() const {
  return rep_->props;
}

std::string BlockBasedTableBuilder::GetFileChecksum() const {
  if (rep_->file != nullptr) {
    return rep_->file->GetFileChecksum();
  } else {
    return kUnknownFileChecksum;
  }
}

const char* BlockBasedTableBuilder::GetFileChecksumFuncName() const {
  if (rep_->file != nullptr) {
    return rep_->file->GetFileChecksumFuncName();
  } else {
    return kUnknownFileChecksumFuncName;
  }
}

void BlockBasedTableBuilder::SetSeqnoTimeTableProperties(
    const SeqnoToTimeMapping& relevant_mapping, uint64_t oldest_ancestor_time) {
  assert(rep_->props.seqno_to_time_mapping.empty());
  relevant_mapping.EncodeTo(rep_->props.seqno_to_time_mapping);
  rep_->props.creation_time = oldest_ancestor_time;
}

const std::string BlockBasedTable::kObsoleteFilterBlockPrefix = "filter.";
const std::string BlockBasedTable::kFullFilterBlockPrefix = "fullfilter.";
const std::string BlockBasedTable::kPartitionedFilterBlockPrefix =
    "partitionedfilter.";

}  // namespace ROCKSDB_NAMESPACE