block_based_table_iterator.cc

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "table/block_based/block_based_table_iterator.h"

namespace ROCKSDB_NAMESPACE {

void BlockBasedTableIterator::SeekToFirst() { SeekImpl(nullptr, false); }

void BlockBasedTableIterator::Seek(const Slice& target) {
  SeekImpl(&target, true);
}

void BlockBasedTableIterator::SeekSecondPass(const Slice* target) {
  AsyncInitDataBlock(/*is_first_pass=*/false);

  if (target) {
    block_iter_.Seek(*target);
  } else {
    block_iter_.SeekToFirst();
  }
  FindKeyForward();

  CheckOutOfBound();

  if (target) {
    assert(!Valid() || icomp_.Compare(*target, key()) <= 0);
  }
}
void BlockBasedTableIterator::SeekImpl(const Slice* target,
                                       bool async_prefetch) {
  // TODO(hx235): set `seek_key_prefix_for_readahead_trimming_`
  // even when `target == nullptr`, that is, when `SeekToFirst()` is called
  if (!multi_scan_status_.ok()) {
    return;
  }
  if (multi_scan_) {
    SeekMultiScan(target);
    return;
  }
  if (target != nullptr && prefix_extractor_ &&
      read_options_.prefix_same_as_start) {
    const Slice& seek_user_key = ExtractUserKey(*target);
    seek_key_prefix_for_readahead_trimming_ =
        prefix_extractor_->InDomain(seek_user_key)
            ? prefix_extractor_->Transform(seek_user_key).ToString()
            : "";
  }

  bool is_first_pass = !async_read_in_progress_;

  if (!is_first_pass) {
    SeekSecondPass(target);
    return;
  }

  ResetBlockCacheLookupVar();

  bool autotune_readaheadsize =
      read_options_.auto_readahead_size &&
      (read_options_.iterate_upper_bound || read_options_.prefix_same_as_start);

  if (autotune_readaheadsize &&
      table_->get_rep()->table_options.block_cache.get() &&
      direction_ == IterDirection::kForward) {
    readahead_cache_lookup_ = true;
  }

  is_out_of_bound_ = false;
  is_at_first_key_from_index_ = false;
  seek_stat_state_ = kNone;
  bool filter_checked = false;
  if (target &&
      !CheckPrefixMayMatch(*target, IterDirection::kForward, &filter_checked)) {
    ResetDataIter();
    RecordTick(table_->GetStatistics(), is_last_level_
                                            ? LAST_LEVEL_SEEK_FILTERED
                                            : NON_LAST_LEVEL_SEEK_FILTERED);
    return;
  }
  if (filter_checked) {
    seek_stat_state_ = kFilterUsed;
    RecordTick(table_->GetStatistics(), is_last_level_
                                            ? LAST_LEVEL_SEEK_FILTER_MATCH
                                            : NON_LAST_LEVEL_SEEK_FILTER_MATCH);
  }

  bool need_seek_index = true;

  // In case of readahead_cache_lookup_, index_iter_ could change to find the
  // readahead size in BlockCacheLookupForReadAheadSize, so it needs to
  // reseek.
  if (IsIndexAtCurr() && block_iter_points_to_real_block_ &&
      block_iter_.Valid()) {
    // Reseek.
    prev_block_offset_ = index_iter_->value().handle.offset();

    if (target) {
      // We can avoid an index seek if:
      // 1. The new seek key is larger than the current key
      // 2. The new seek key is within the upper bound of the block
      // Since we don't necessarily know the internal key for either
      // the current key or the upper bound, we check user keys and
      // exclude the equality case. Considering internal keys can
      // improve for the boundary cases, but it would complicate the
      // code.
      if (user_comparator_.Compare(ExtractUserKey(*target),
                                   block_iter_.user_key()) > 0 &&
          user_comparator_.Compare(ExtractUserKey(*target),
                                   index_iter_->user_key()) < 0) {
        need_seek_index = false;
      }
    }
  }

  if (need_seek_index) {
    if (target) {
      index_iter_->Seek(*target);
    } else {
      index_iter_->SeekToFirst();
    }
    is_index_at_curr_block_ = true;
    if (!index_iter_->Valid()) {
      ResetDataIter();
      return;
    }
  }

  // After the reseek, index_iter_ points to the right key, i.e. target, in
  // case of readahead_cache_lookup_. So index_iter_ can be used directly.
  IndexValue v = index_iter_->value();
  const bool same_block = block_iter_points_to_real_block_ &&
                          v.handle.offset() == prev_block_offset_;

  if (!v.first_internal_key.empty() && !same_block &&
      (!target || icomp_.Compare(*target, v.first_internal_key) <= 0) &&
      allow_unprepared_value_) {
    // Index contains the first key of the block, and it's >= target.
    // We can defer reading the block.
    is_at_first_key_from_index_ = true;
    // ResetDataIter() will invalidate block_iter_. Thus, there is no need to
    // call CheckDataBlockWithinUpperBound() to check for iterate_upper_bound
    // as that will be done later when the data block is actually read.
    ResetDataIter();
  } else {
    // Need to use the data block.
    if (!same_block) {
      if (read_options_.async_io && async_prefetch) {
        AsyncInitDataBlock(/*is_first_pass=*/true);
        if (async_read_in_progress_) {
          // Status::TryAgain indicates that an asynchronous request for
          // retrieval of data blocks has been submitted. So it should return
          // at this point and Seek should be called again to retrieve the
          // requested block and execute the remaining code.
          return;
        }
      } else {
        InitDataBlock();
      }
    } else {
      // When the user does a reseek, the iterate_upper_bound might have
      // changed. CheckDataBlockWithinUpperBound() needs to be called
      // explicitly if the reseek ends up in the same data block.
      // If the reseek ends up in a different block, InitDataBlock() will do
      // the iterator upper bound check.
      CheckDataBlockWithinUpperBound();
    }
    if (target) {
      block_iter_.Seek(*target);
    } else {
      block_iter_.SeekToFirst();
    }
    FindKeyForward();
  }

  CheckOutOfBound();

  if (target) {
    assert(!Valid() || icomp_.Compare(*target, key()) <= 0);
  }
}
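// Illustrative sketch (not part of this file): how a caller might drive the
// two-pass async Seek above. `ProcessEntry`, the iterator construction, and
// the exact retry mechanism in the surrounding iterator stack are assumptions
// for illustration only.
//
//   ReadOptions ro;
//   ro.async_io = true;
//   std::unique_ptr<InternalIterator> iter(
//       table_reader->NewIterator(ro, /*...*/));
//   iter->Seek(target);
//   // First pass: AsyncInitDataBlock() may submit the block read and return
//   // early (async_read_in_progress_ == true). A subsequent Seek(target) on
//   // this iterator takes the SeekSecondPass() branch, polls for the
//   // requested block, and positions on it.
//   if (iter->Valid()) {
//     ProcessEntry(iter->key(), iter->value());
//   }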
void BlockBasedTableIterator::SeekForPrev(const Slice& target) {
  multi_scan_.reset();
  direction_ = IterDirection::kBackward;
  ResetBlockCacheLookupVar();
  is_out_of_bound_ = false;
  is_at_first_key_from_index_ = false;
  seek_stat_state_ = kNone;
  bool filter_checked = false;
  // For now, totally disable prefix seek in auto prefix mode because we don't
  // have the logic for it.
  if (!CheckPrefixMayMatch(target, IterDirection::kBackward, &filter_checked)) {
    ResetDataIter();
    RecordTick(table_->GetStatistics(), is_last_level_
                                            ? LAST_LEVEL_SEEK_FILTERED
                                            : NON_LAST_LEVEL_SEEK_FILTERED);
    return;
  }
  if (filter_checked) {
    seek_stat_state_ = kFilterUsed;
    RecordTick(table_->GetStatistics(), is_last_level_
                                            ? LAST_LEVEL_SEEK_FILTER_MATCH
                                            : NON_LAST_LEVEL_SEEK_FILTER_MATCH);
  }

  SavePrevIndexValue();

  // Call Seek() rather than SeekForPrev() in the index block, because the
  // target data block will likely contain the position for `target`, the
  // same as Seek(), rather than the one before it.
  // For example, if we have three data blocks, each containing two keys:
  //   [2, 4]  [6, 8]  [10, 12]
  //   (the keys in the index block would be [4, 8, 12])
  // and the user calls SeekForPrev(7), we need to go to the second block,
  // just like if they call Seek(7).
  // The only case where the block is different is when they seek to a position
  // at the boundary. For example, if they SeekForPrev(5), we should go to the
  // first block, rather than the second. However, we don't have the information
  // to distinguish the two unless we read the second block. In this case, we'll
  // end up reading two blocks.
  index_iter_->Seek(target);
  is_index_at_curr_block_ = true;

  if (!index_iter_->Valid()) {
    auto seek_status = index_iter_->status();
    // Check for IO error
    if (!seek_status.IsNotFound() && !seek_status.ok()) {
      ResetDataIter();
      return;
    }

    // With prefix index, Seek() returns NotFound if the prefix doesn't exist
    if (seek_status.IsNotFound()) {
      // Any key less than the target is fine for prefix seek
      ResetDataIter();
      return;
    } else {
      index_iter_->SeekToLast();
    }
    // Check for IO error
    if (!index_iter_->Valid()) {
      ResetDataIter();
      return;
    }
  }

  InitDataBlock();

  block_iter_.SeekForPrev(target);

  FindKeyBackward();
  CheckDataBlockWithinUpperBound();
  assert(!block_iter_.Valid() ||
         icomp_.Compare(target, block_iter_.key()) >= 0);
}
void BlockBasedTableIterator::SeekToLast() {
  multi_scan_.reset();
  direction_ = IterDirection::kBackward;
  ResetBlockCacheLookupVar();
  is_out_of_bound_ = false;
  is_at_first_key_from_index_ = false;
  seek_stat_state_ = kNone;
  SavePrevIndexValue();
  index_iter_->SeekToLast();
  is_index_at_curr_block_ = true;
  if (!index_iter_->Valid()) {
    ResetDataIter();
    return;
  }
  InitDataBlock();
  block_iter_.SeekToLast();
  FindKeyBackward();
  CheckDataBlockWithinUpperBound();
}

void BlockBasedTableIterator::Next() {
  assert(Valid());
  if (is_at_first_key_from_index_ && !MaterializeCurrentBlock()) {
    assert(!multi_scan_);
    return;
  }
  assert(block_iter_points_to_real_block_);
  block_iter_.Next();
  FindKeyForward();
  CheckOutOfBound();
}

bool BlockBasedTableIterator::NextAndGetResult(IterateResult* result) {
  Next();
  bool is_valid = Valid();
  if (is_valid) {
    result->key = key();
    result->bound_check_result = UpperBoundCheckResult();
    result->value_prepared = !is_at_first_key_from_index_;
  }
  return is_valid;
}

void BlockBasedTableIterator::Prev() {
  assert(!multi_scan_);
  if ((readahead_cache_lookup_ && !IsIndexAtCurr()) || multi_scan_) {
    multi_scan_.reset();
    // In case of readahead_cache_lookup_, index_iter_ has moved forward. So we
    // need to reseek the index_iter_ to point to current block by using
    // block_iter_'s key.
    if (Valid()) {
      ResetBlockCacheLookupVar();
      direction_ = IterDirection::kBackward;
      Slice last_key = key();

      index_iter_->Seek(last_key);
      is_index_at_curr_block_ = true;

      // Check for IO error.
      if (!index_iter_->Valid()) {
        ResetDataIter();
        return;
      }
    }

    if (!Valid()) {
      ResetDataIter();
      return;
    }
  }

  ResetBlockCacheLookupVar();
  if (is_at_first_key_from_index_) {
    is_at_first_key_from_index_ = false;

    index_iter_->Prev();
    if (!index_iter_->Valid()) {
      return;
    }

    InitDataBlock();
    block_iter_.SeekToLast();
  } else {
    assert(block_iter_points_to_real_block_);
    block_iter_.Prev();
  }

  FindKeyBackward();
}
void BlockBasedTableIterator::InitDataBlock() {
  BlockHandle data_block_handle;
  bool is_in_cache = false;
  bool use_block_cache_for_lookup = true;

  if (DoesContainBlockHandles()) {
    data_block_handle = block_handles_->front().handle_;
    is_in_cache = block_handles_->front().is_cache_hit_;
    use_block_cache_for_lookup = false;
  } else {
    data_block_handle = index_iter_->value().handle;
  }

  if (!block_iter_points_to_real_block_ ||
      data_block_handle.offset() != prev_block_offset_ ||
      // if previous attempt of reading the block missed cache, try again
      block_iter_.status().IsIncomplete()) {
    if (block_iter_points_to_real_block_) {
      ResetDataIter();
    }

    bool is_for_compaction =
        lookup_context_.caller == TableReaderCaller::kCompaction;

    // Initialize Data Block From CacheableEntry.
    if (is_in_cache) {
      Status s;
      block_iter_.Invalidate(Status::OK());
      table_->NewDataBlockIterator<DataBlockIter>(
          read_options_, (block_handles_->front().cachable_entry_).As<Block>(),
          &block_iter_, s);
    } else {
      auto* rep = table_->get_rep();
      std::function<void(bool, uint64_t&, uint64_t&)> readaheadsize_cb =
          nullptr;
      if (readahead_cache_lookup_) {
        readaheadsize_cb = std::bind(
            &BlockBasedTableIterator::BlockCacheLookupForReadAheadSize, this,
            std::placeholders::_1, std::placeholders::_2,
            std::placeholders::_3);
      }

      // Prefetch additional data for range scans (iterators).
      // Implicit auto readahead:
      //   Enabled after 2 sequential IOs when ReadOptions.readahead_size == 0.
      // Explicit user requested readahead:
      //   Enabled from the very first IO when ReadOptions.readahead_size is
      //   set.
      block_prefetcher_.PrefetchIfNeeded(
          rep, data_block_handle, read_options_.readahead_size,
          is_for_compaction,
          /*no_sequential_checking=*/false, read_options_, readaheadsize_cb,
          read_options_.async_io);

      Status s;
      table_->NewDataBlockIterator<DataBlockIter>(
          read_options_, data_block_handle, &block_iter_, BlockType::kData,
          /*get_context=*/nullptr, &lookup_context_,
          block_prefetcher_.prefetch_buffer(),
          /*for_compaction=*/is_for_compaction, /*async_read=*/false, s,
          use_block_cache_for_lookup);
    }
    block_iter_points_to_real_block_ = true;
    CheckDataBlockWithinUpperBound();
    if (!is_for_compaction &&
        (seek_stat_state_ & kDataBlockReadSinceLastSeek) == 0) {
      RecordTick(table_->GetStatistics(), is_last_level_
                                              ? LAST_LEVEL_SEEK_DATA
                                              : NON_LAST_LEVEL_SEEK_DATA);
      seek_stat_state_ = static_cast<SeekStatState>(
          seek_stat_state_ | kDataBlockReadSinceLastSeek | kReportOnUseful);
    }
  }
}
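// Note (illustrative): the std::bind in InitDataBlock() above (also used in
// AsyncInitDataBlock() below) is equivalent to the following lambda, shown
// here only to make the callback's shape explicit: a read_curr_block flag plus
// in/out start and end offsets.
//
//   readaheadsize_cb = [this](bool read_curr_block, uint64_t& start_offset,
//                             uint64_t& end_offset) {
//     BlockCacheLookupForReadAheadSize(read_curr_block, start_offset,
//                                      end_offset);
//   };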
void BlockBasedTableIterator::AsyncInitDataBlock(bool is_first_pass) {
  BlockHandle data_block_handle;
  bool is_for_compaction =
      lookup_context_.caller == TableReaderCaller::kCompaction;
  if (is_first_pass) {
    data_block_handle = index_iter_->value().handle;
    if (!block_iter_points_to_real_block_ ||
        data_block_handle.offset() != prev_block_offset_ ||
        // if previous attempt of reading the block missed cache, try again
        block_iter_.status().IsIncomplete()) {
      if (block_iter_points_to_real_block_) {
        ResetDataIter();
      }
      auto* rep = table_->get_rep();
      std::function<void(bool, uint64_t&, uint64_t&)> readaheadsize_cb =
          nullptr;
      if (readahead_cache_lookup_) {
        readaheadsize_cb = std::bind(
            &BlockBasedTableIterator::BlockCacheLookupForReadAheadSize, this,
            std::placeholders::_1, std::placeholders::_2,
            std::placeholders::_3);
      }

      // Prefetch additional data for range scans (iterators).
      // Implicit auto readahead:
      //   Enabled after 2 sequential IOs when ReadOptions.readahead_size == 0.
      // Explicit user requested readahead:
      //   Enabled from the very first IO when ReadOptions.readahead_size is
      //   set.
      // In case of async_io with implicit readahead, block_prefetcher_ will
      // always create the prefetch buffer by setting
      // no_sequential_checking = true.
      block_prefetcher_.PrefetchIfNeeded(
          rep, data_block_handle, read_options_.readahead_size,
          is_for_compaction, /*no_sequential_checking=*/read_options_.async_io,
          read_options_, readaheadsize_cb, read_options_.async_io);

      Status s;
      table_->NewDataBlockIterator<DataBlockIter>(
          read_options_, data_block_handle, &block_iter_, BlockType::kData,
          /*get_context=*/nullptr, &lookup_context_,
          block_prefetcher_.prefetch_buffer(),
          /*for_compaction=*/is_for_compaction, /*async_read=*/true, s,
          /*use_block_cache_for_lookup=*/true);

      if (s.IsTryAgain()) {
        async_read_in_progress_ = true;
        return;
      }
    }
  } else {
    // Second pass will call Poll to get the data block which has been
    // requested asynchronously.
    bool is_in_cache = false;

    if (DoesContainBlockHandles()) {
      data_block_handle = block_handles_->front().handle_;
      is_in_cache = block_handles_->front().is_cache_hit_;
    } else {
      data_block_handle = index_iter_->value().handle;
    }

    Status s;
    // Initialize Data Block From CacheableEntry.
    if (is_in_cache) {
      block_iter_.Invalidate(Status::OK());
      table_->NewDataBlockIterator<DataBlockIter>(
          read_options_, (block_handles_->front().cachable_entry_).As<Block>(),
          &block_iter_, s);
    } else {
      table_->NewDataBlockIterator<DataBlockIter>(
          read_options_, data_block_handle, &block_iter_, BlockType::kData,
          /*get_context=*/nullptr, &lookup_context_,
          block_prefetcher_.prefetch_buffer(),
          /*for_compaction=*/is_for_compaction, /*async_read=*/false, s,
          /*use_block_cache_for_lookup=*/false);
    }
  }
  block_iter_points_to_real_block_ = true;
  CheckDataBlockWithinUpperBound();

  if (!is_for_compaction &&
      (seek_stat_state_ & kDataBlockReadSinceLastSeek) == 0) {
    RecordTick(table_->GetStatistics(), is_last_level_
                                            ? LAST_LEVEL_SEEK_DATA
                                            : NON_LAST_LEVEL_SEEK_DATA);
    seek_stat_state_ = static_cast<SeekStatState>(
        seek_stat_state_ | kDataBlockReadSinceLastSeek | kReportOnUseful);
  }
  async_read_in_progress_ = false;
}
bool BlockBasedTableIterator::MaterializeCurrentBlock() {
  assert(is_at_first_key_from_index_);
  assert(!block_iter_points_to_real_block_);
  assert(index_iter_->Valid());

  is_at_first_key_from_index_ = false;
  InitDataBlock();
  assert(block_iter_points_to_real_block_);

  if (!block_iter_.status().ok()) {
    return false;
  }

  block_iter_.SeekToFirst();

  // MaterializeCurrentBlock is called when the block is actually read by
  // calling InitDataBlock. is_at_first_key_from_index_ will be false for block
  // handles placed in block_handles_, so index_iter_ will be pointing to the
  // current block.
  // After InitDataBlock, index_iter_ can point to a different block if
  // BlockCacheLookupForReadAheadSize is called.
  Slice first_internal_key;
  if (DoesContainBlockHandles()) {
    first_internal_key = block_handles_->front().first_internal_key_;
  } else {
    first_internal_key = index_iter_->value().first_internal_key;
  }

  if (!block_iter_.Valid() ||
      icomp_.Compare(block_iter_.key(), first_internal_key) != 0) {
    block_iter_.Invalidate(Status::Corruption(
        "first key in index doesn't match first key in block"));
    return false;
  }
  return true;
}
void BlockBasedTableIterator::FindKeyForward() {
  // This method's code is kept short to make it likely to be inlined.

  assert(!is_out_of_bound_);
  assert(block_iter_points_to_real_block_);

  if (!block_iter_.Valid()) {
    // This is the only call site of FindBlockForward(), but it's extracted into
    // a separate method to keep FindKeyForward() short and likely to be
    // inlined. When transitioning to a different block, we call
    // FindBlockForward(), which is much longer and is probably not inlined.
    FindBlockForward();
  } else {
    // This is the fast path that avoids a function call.
  }
}
void BlockBasedTableIterator::FindBlockForward() {
  if (multi_scan_) {
    FindBlockForwardInMultiScan();
    return;
  }
  // TODO the while loop inherits from two-level-iterator. We don't know
  // whether a block can be empty so it can be replaced by an "if".
  do {
    if (!block_iter_.status().ok()) {
      return;
    }
    // Whether the next data block is out of upper bound, if there is one.
    // index_iter_ can point to a different block in case of
    // readahead_cache_lookup_. readahead_cache_lookup_ will handle the
    // upper_bound check.
    bool next_block_is_out_of_bound =
        IsIndexAtCurr() && read_options_.iterate_upper_bound != nullptr &&
        block_iter_points_to_real_block_ &&
        block_upper_bound_check_ == BlockUpperBound::kUpperBoundInCurBlock;

    assert(!next_block_is_out_of_bound ||
           user_comparator_.CompareWithoutTimestamp(
               *read_options_.iterate_upper_bound, /*a_has_ts=*/false,
               index_iter_->user_key(), /*b_has_ts=*/true) <= 0);

    ResetDataIter();

    if (DoesContainBlockHandles()) {
      // Advance and point to the next block handle to make that block handle
      // current.
      block_handles_->pop_front();
    }

    if (!DoesContainBlockHandles()) {
      // For the readahead_cache_lookup_ enabled scenario -
      // 1. In case of Seek, block_handles_ will be empty and it should follow
      //    the usual path of doing index_iter_->Next().
      // 2. If block_handles_ is empty and the index is not at the current
      //    block because of lookup (during Next), it should skip doing
      //    index_iter_->Next(), as it's already pointing to the next block.
      // 3. The last block could be out of bound and it won't be iterated over
      //    during BlockCacheLookup. We need to set the out-of-bound state for
      //    that block here.
      if (IsIndexAtCurr() || is_index_out_of_bound_) {
        index_iter_->Next();
        if (is_index_out_of_bound_) {
          next_block_is_out_of_bound = is_index_out_of_bound_;
          is_index_out_of_bound_ = false;
        }
      } else {
        // Skip Next as index_iter_ already points to the correct index when it
        // iterates in BlockCacheLookupForReadAheadSize.
        is_index_at_curr_block_ = true;
      }

      if (next_block_is_out_of_bound) {
        // The next block is out of bound. No need to read it.
        TEST_SYNC_POINT_CALLBACK("BlockBasedTableIterator:out_of_bound",
                                 nullptr);
        // We need to make sure this is not the last data block before setting
        // is_out_of_bound_, since the index key for the last data block can be
        // larger than smallest key of the next file on the same level.
        if (index_iter_->Valid()) {
          is_out_of_bound_ = true;
        }
        return;
      }

      if (!index_iter_->Valid()) {
        return;
      }

      IndexValue v = index_iter_->value();

      if (!v.first_internal_key.empty() && allow_unprepared_value_) {
        // Index contains the first key of the block. Defer reading the block.
        is_at_first_key_from_index_ = true;
        return;
      }
    }
    InitDataBlock();
    block_iter_.SeekToFirst();
  } while (!block_iter_.Valid());
}
void BlockBasedTableIterator::FindKeyBackward() {
  while (!block_iter_.Valid()) {
    if (!block_iter_.status().ok()) {
      return;
    }

    ResetDataIter();
    index_iter_->Prev();

    if (index_iter_->Valid()) {
      InitDataBlock();
      block_iter_.SeekToLast();
    } else {
      return;
    }
  }

  // We could have checked the lower bound here too, but we opt not to do it
  // for code simplicity.
}
void BlockBasedTableIterator::CheckOutOfBound() {
  if (read_options_.iterate_upper_bound != nullptr &&
      block_upper_bound_check_ != BlockUpperBound::kUpperBoundBeyondCurBlock &&
      Valid()) {
    is_out_of_bound_ =
        user_comparator_.CompareWithoutTimestamp(
            *read_options_.iterate_upper_bound, /*a_has_ts=*/false, user_key(),
            /*b_has_ts=*/true) <= 0;
  }
}

void BlockBasedTableIterator::CheckDataBlockWithinUpperBound() {
  if (IsIndexAtCurr() && read_options_.iterate_upper_bound != nullptr &&
      block_iter_points_to_real_block_) {
    block_upper_bound_check_ = (user_comparator_.CompareWithoutTimestamp(
                                    *read_options_.iterate_upper_bound,
                                    /*a_has_ts=*/false, index_iter_->user_key(),
                                    /*b_has_ts=*/true) > 0)
                                   ? BlockUpperBound::kUpperBoundBeyondCurBlock
                                   : BlockUpperBound::kUpperBoundInCurBlock;
  }
}
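// Example (illustrative, with made-up keys): suppose
// read_options_.iterate_upper_bound is "k5" and the index key for the current
// data block (an upper bound for its last user key) is "k3". Then the upper
// bound lies beyond this block (kUpperBoundBeyondCurBlock) and CheckOutOfBound
// can skip per-key comparisons until the block boundary. If the index key were
// "k7", the upper bound may fall inside the block (kUpperBoundInCurBlock), so
// CheckOutOfBound compares each key against the bound.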
void BlockBasedTableIterator::InitializeStartAndEndOffsets(
    bool read_curr_block, bool& found_first_miss_block,
    uint64_t& start_updated_offset, uint64_t& end_updated_offset,
    size_t& prev_handles_size) {
  assert(block_handles_ != nullptr);
  prev_handles_size = block_handles_->size();
  size_t footer = table_->get_rep()->footer.GetBlockTrailerSize();

  // It initializes the start and end offsets, which is covered by the
  // following scenarios
  if (read_curr_block) {
    if (!DoesContainBlockHandles()) {
      // Scenario 1 : read_curr_block (callback made on miss block which caller
      //              was reading) and it has no existing handles in the queue,
      //              i.e. index_iter_ is pointing to the block that is being
      //              read by the caller.
      //
      // Add current block here as it doesn't need any lookup.
      BlockHandleInfo block_handle_info;
      block_handle_info.handle_ = index_iter_->value().handle;
      block_handle_info.SetFirstInternalKey(
          index_iter_->value().first_internal_key);

      end_updated_offset = block_handle_info.handle_.offset() + footer +
                           block_handle_info.handle_.size();
      block_handles_->emplace_back(std::move(block_handle_info));

      index_iter_->Next();
      is_index_at_curr_block_ = false;
      found_first_miss_block = true;
    } else {
      // Scenario 2 : read_curr_block (callback made on miss block which caller
      //              was reading) but the queue already has some handles.
      //
      // It can be due to a reading error in the second buffer in
      // FilePrefetchBuffer. BlockHandles were already added to the queue but
      // there was an error in fetching those data blocks. So in this call they
      // need to be read again.
      found_first_miss_block = true;
      // Initialize prev_handles_size to 0 as all those handles need to be read
      // again.
      prev_handles_size = 0;
      start_updated_offset = block_handles_->front().handle_.offset();
      end_updated_offset = block_handles_->back().handle_.offset() + footer +
                           block_handles_->back().handle_.size();
    }
  } else {
    // Scenario 3 : read_curr_block is false (callback made to do additional
    //              prefetching in buffers) and the queue already has some
    //              handles from the first buffer.
    if (DoesContainBlockHandles()) {
      start_updated_offset = block_handles_->back().handle_.offset() + footer +
                             block_handles_->back().handle_.size();
      end_updated_offset = start_updated_offset;
    } else {
      // Scenario 4 : read_curr_block is false (callback made to do additional
      //              prefetching in buffers) but the queue has no handle
      //              from the first buffer.
      //
      // It can be when Reseek is from the block cache (which doesn't clear the
      // buffers in FilePrefetchBuffer but clears block handles from the queue)
      // and the reseek also lies within the buffer. So Next will get data from
      // existing buffers until this callback is made to prefetch additional
      // data. All handles need to be added to the queue starting from
      // index_iter_.
      assert(index_iter_->Valid());
      start_updated_offset = index_iter_->value().handle.offset();
      end_updated_offset = start_updated_offset;
    }
  }
}
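// Summary of the four scenarios above (for reference; the offsets shown are
// the values of start_updated_offset / end_updated_offset on return):
//
//   read_curr_block | queue state | start_updated_offset      | end_updated_offset
//   ----------------+-------------+---------------------------+----------------------------
//   true            | empty       | unchanged (caller's)      | end of current index block
//   true            | non-empty   | front handle's offset     | end of back handle
//   false           | non-empty   | end of back handle        | same as start
//   false           | empty       | index_iter_ handle offset | same as start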
// BlockCacheLookupForReadAheadSize looks up blocks in the block cache and
// tries to reduce the start and end offsets passed in.
//
// Implementation -
// This function looks into the block cache for the blocks between start_offset
// and end_offset and adds all the handles to the queue.
// It then iterates from the end to find the first missed block and updates the
// end offset to that block.
// It also iterates from the start to find the first missed block and updates
// the start offset to that block.
//
// Arguments -
// start_offset    : Offset from which the caller wants to read.
// end_offset      : End offset till which the caller wants to read.
// read_curr_block : True if this call was due to a miss in the cache and the
//                   caller wants to read that block.
//                   False if the current call is to prefetch additional data
//                   in extra buffers.
void BlockBasedTableIterator::BlockCacheLookupForReadAheadSize(
    bool read_curr_block, uint64_t& start_offset, uint64_t& end_offset) {
  uint64_t start_updated_offset = start_offset;

  // readahead_cache_lookup_ can be set to false if, after Seek and Next,
  // there is a SeekForPrev or any other backward operation.
  if (!readahead_cache_lookup_) {
    return;
  }

  size_t footer = table_->get_rep()->footer.GetBlockTrailerSize();
  if (read_curr_block && !DoesContainBlockHandles() &&
      IsNextBlockOutOfReadaheadBound()) {
    end_offset = index_iter_->value().handle.offset() + footer +
                 index_iter_->value().handle.size();
    return;
  }

  uint64_t end_updated_offset = start_updated_offset;
  bool found_first_miss_block = false;
  size_t prev_handles_size;

  // Initialize start and end offsets based on existing handles in the queue
  // and the read_curr_block argument passed.
  if (block_handles_ == nullptr) {
    block_handles_.reset(new std::deque<BlockHandleInfo>());
  }
  InitializeStartAndEndOffsets(read_curr_block, found_first_miss_block,
                               start_updated_offset, end_updated_offset,
                               prev_handles_size);

  while (index_iter_->Valid() && !is_index_out_of_bound_) {
    BlockHandle block_handle = index_iter_->value().handle;

    // Adding this data block would exceed the end offset, so this data
    // block won't be added.
    // There can be a case where the passed end offset is smaller than
    // block_handle.size() + footer because of readahead_size truncated to
    // upper_bound. So we prefer to read the block rather than skip it to avoid
    // sync read calls in case of async_io.
    if (start_updated_offset != end_updated_offset &&
        (end_updated_offset + block_handle.size() + footer > end_offset)) {
      break;
    }

    // For the current data block, do the lookup in the cache. Lookup should
    // pin the data block in cache.
    BlockHandleInfo block_handle_info;
    block_handle_info.handle_ = index_iter_->value().handle;
    block_handle_info.SetFirstInternalKey(
        index_iter_->value().first_internal_key);
    end_updated_offset += footer + block_handle_info.handle_.size();

    Status s = table_->LookupAndPinBlocksInCache<Block_kData>(
        read_options_, block_handle,
        &(block_handle_info.cachable_entry_).As<Block_kData>());
    if (!s.ok()) {
#ifndef NDEBUG
      // To allow fault injection verification to pass since a non-okay status
      // in `BlockCacheLookupForReadAheadSize()` won't fail the read but will
      // lead to less or no readahead
      IGNORE_STATUS_IF_ERROR(s);
#endif
      break;
    }

    block_handle_info.is_cache_hit_ =
        (block_handle_info.cachable_entry_.GetValue() ||
         block_handle_info.cachable_entry_.GetCacheHandle());

    // If this is the first miss block, update start offset to this block.
    if (!found_first_miss_block && !block_handle_info.is_cache_hit_) {
      found_first_miss_block = true;
      start_updated_offset = block_handle_info.handle_.offset();
    }

    // Add the handle to the queue.
    block_handles_->emplace_back(std::move(block_handle_info));

    // Can't figure out for the current block if the current block
    // is out of bound. But for the next block we can find that.
    // If the current block's index key >= iterate_upper_bound, it
    // means all the keys in the next block or above are out of
    // bound.
    if (IsNextBlockOutOfReadaheadBound()) {
      is_index_out_of_bound_ = true;
      break;
    }
    index_iter_->Next();
    is_index_at_curr_block_ = false;
  }

#ifndef NDEBUG
  // To allow fault injection verification to pass since a non-okay status in
  // `BlockCacheLookupForReadAheadSize()` won't fail the read but will lead to
  // less or no readahead
  if (!index_iter_->status().ok()) {
    IGNORE_STATUS_IF_ERROR(index_iter_->status());
  }
#endif

  if (found_first_miss_block) {
    // Iterate over the cache-hit block handles from the end until a miss is
    // found, to truncate and update the end offset to that miss.
    auto it = block_handles_->rbegin();
    auto it_end =
        block_handles_->rbegin() + (block_handles_->size() - prev_handles_size);

    while (it != it_end && (*it).is_cache_hit_ &&
           start_updated_offset != (*it).handle_.offset()) {
      it++;
    }
    end_updated_offset = (*it).handle_.offset() + footer + (*it).handle_.size();
  } else {
    // Nothing to read. Can be because of an IOError in index_iter_->Next() or
    // because the upper_bound was reached.
    end_updated_offset = start_updated_offset;
  }
  end_offset = end_updated_offset;
  start_offset = start_updated_offset;
  ResetPreviousBlockOffset();
}
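// Worked example (illustrative; offsets and block size are made up): assume
// four 4K data blocks at offsets 0, 4K, 8K and 12K (ignoring the block
// trailer), start_offset = 0 and end_offset = 16K. If the cache lookup finds
// the blocks at 0 and 12K cached but the ones at 4K and 8K missing, the first
// miss moves start_offset up to 4K, and the backward scan over trailing cache
// hits pulls end_offset down to 12K (the end of the last missed block). The
// readahead IO then only covers [4K, 12K), while the cached blocks are served
// from the entries pinned in block_handles_.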
BlockBasedTableIterator::MultiScanState::~MultiScanState() {
  // Abort any pending async IO operations to prevent callback being called
  // after async read states are destructed.
  if (!async_states.empty()) {
    std::vector<void*> io_handles_to_abort;
    std::vector<AsyncReadState*> states_to_cleanup;

    // Collect all pending IO handles
    for (size_t i = 0; i < async_states.size(); ++i) {
      auto& async_read = async_states[i];
      if (async_read.io_handle != nullptr) {
        assert(!async_read.finished);
        io_handles_to_abort.push_back(async_read.io_handle);
        states_to_cleanup.push_back(&async_read);
      }
    }

    if (!io_handles_to_abort.empty()) {
      IOStatus abort_status = fs->AbortIO(io_handles_to_abort);
      if (!abort_status.ok()) {
#ifndef NDEBUG
        fprintf(stderr, "Error aborting async IO operations: %s\n",
                abort_status.ToString().c_str());
#endif
        assert(false);
      }
      (void)abort_status;  // Suppress unused variable warning
    }

    for (auto async_read : states_to_cleanup) {
      async_read->CleanUpIOHandle();
    }
  }
}
// Note:
// - Iterator should not be reused for multiple multiscans or for mixing
//   multiscan with regular iterator usage.
// - Scan ranges should be non-overlapping and have increasing start keys.
//   If a scan range's limit is not set, then there should only be one scan
//   range.
// - After Prepare(), the iterator expects Seek to be called on the start key
//   of each ScanOption in order. If any other Seek is done, an error status is
//   returned.
// - Whenever all blocks of a scan opt are exhausted, the iterator will become
//   invalid and UpperBoundCheckResult() will return kOutOfBound, so that the
//   upper layer (LevelIterator) will stop scanning instead of thinking EOF is
//   reached and continuing into the next file. The only exception is for the
//   last scan opt. If we reach the end of the last scan opt,
//   UpperBoundCheckResult() will return kUnknown instead of kOutOfBound. This
//   mechanism requires that scan opts are properly pruned such that there is
//   no scan opt that is after this file's key range.
// FIXME: DBIter and MergingIterator may
// internally do Seek() on child iterators, e.g. due to
// ReadOptions::max_skippable_internal_keys or reseeking into range deletion
// end key. These Seeks will be handled properly, as long as the target is
// moving forward.
void BlockBasedTableIterator::Prepare(const MultiScanArgs* multiscan_opts) {
  assert(!multi_scan_);
  if (!index_iter_->status().ok()) {
    multi_scan_status_ = index_iter_->status();
    return;
  }
  if (multi_scan_) {
    multi_scan_.reset();
    multi_scan_status_ = Status::InvalidArgument("Prepare already called");
    return;
  }
  index_iter_->Prepare(multiscan_opts);

  std::vector<BlockHandle> scan_block_handles;
  std::vector<std::string> data_block_separators;
  std::vector<std::tuple<size_t, size_t>> block_index_ranges_per_scan;
  const std::vector<ScanOptions>& scan_opts = multiscan_opts->GetScanRanges();
  multi_scan_status_ =
      CollectBlockHandles(scan_opts, &scan_block_handles,
                          &block_index_ranges_per_scan, &data_block_separators);
  if (!multi_scan_status_.ok()) {
    return;
  }

  // Pin already cached blocks, collect remaining blocks to read
  std::vector<size_t> block_indices_to_read;
  std::vector<CachableEntry<Block>> pinned_data_blocks_guard(
      scan_block_handles.size());
  size_t prefetched_max_idx;
  multi_scan_status_ = FilterAndPinCachedBlocks(
      scan_block_handles, multiscan_opts, &block_indices_to_read,
      &pinned_data_blocks_guard, &prefetched_max_idx);
  if (!multi_scan_status_.ok()) {
    return;
  }

  std::vector<AsyncReadState> async_states;
  // Maps from block index into async read request (index into async_states[])
  UnorderedMap<size_t, size_t> block_idx_to_readreq_idx;
  if (!block_indices_to_read.empty()) {
    std::vector<FSReadRequest> read_reqs;
    std::vector<std::vector<size_t>> coalesced_block_indices;
    PrepareIORequests(block_indices_to_read, scan_block_handles, multiscan_opts,
                      &read_reqs, &block_idx_to_readreq_idx,
                      &coalesced_block_indices);

    multi_scan_status_ =
        ExecuteIO(scan_block_handles, multiscan_opts, coalesced_block_indices,
                  &read_reqs, &async_states, &pinned_data_blocks_guard);
    if (!multi_scan_status_.ok()) {
      return;
    }
  }

  // Successful Prepare, init related states so the iterator reads from prepared
  // blocks.
  multi_scan_ = std::make_unique<MultiScanState>(
      table_->get_rep()->ioptions.env->GetFileSystem(), multiscan_opts,
      std::move(pinned_data_blocks_guard), std::move(data_block_separators),
      std::move(block_index_ranges_per_scan),
      std::move(block_idx_to_readreq_idx), std::move(async_states),
      prefetched_max_idx);
  is_index_at_curr_block_ = false;
  block_iter_points_to_real_block_ = false;
}
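// Usage sketch (illustrative; the exact construction of MultiScanArgs and
// ScanOptions below is an assumption, not the precise API):
//
//   MultiScanArgs multiscan_opts = MakeNonOverlappingRanges(/*...*/);
//   iter->Prepare(&multiscan_opts);
//   for (const ScanOptions& scan : multiscan_opts.GetScanRanges()) {
//     iter->Seek(scan.range.start.value());  // must follow Prepare()'s order
//     while (iter->Valid()) {
//       Consume(iter->key(), iter->value());
//       iter->Next();
//     }
//     // When a scan's prepared blocks are exhausted, the iterator becomes
//     // invalid and UpperBoundCheckResult() reports kOutOfBound (kUnknown for
//     // the last scan), per the Note above Prepare().
//   }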
void BlockBasedTableIterator::SeekMultiScan(const Slice* seek_target) {
  assert(multi_scan_ && multi_scan_status_.ok());
  // This is a MultiScan and Prepare() has been called.
  // Reset out of bound on seek; if it is out of bound again, it will be set
  // properly later in the code path
  is_out_of_bound_ = false;
  // Validate seek key with scan options
  if (!seek_target) {
    // start key must be set for multi-scan
    multi_scan_status_ = Status::InvalidArgument("No seek key for MultiScan");
    return;
  }

  // Check the case where there is no range prepared on this table
  if (multi_scan_->scan_opts->size() == 0) {
    // out of bound
    MarkPreparedRangeExhausted();
    return;
  }

  // Check whether the seek key is moving forward.
  if (multi_scan_->prev_seek_key_.empty() ||
      icomp_.Compare(*seek_target, multi_scan_->prev_seek_key_) > 0) {
    // If the previous seek key is empty or the seek key is larger than the
    // previous seek key, update the previous seek key. Otherwise use the
    // previous seek key as the adjusted seek target moving forward. This
    // prevents the seek target going backward, which would visit pages that
    // have been unpinned.
    // This issue is caused by sub-optimal range delete handling inside merge
    // iterator.
    // TODO xingbo issues:14068 : Optimize the handling of range delete iterator
    // inside merge iterator, so that it doesn't move seek key backward. After
    // that we could return error if the key moves backward here.
    multi_scan_->prev_seek_key_ = seek_target->ToString();
  } else {
    // Seek key is adjusted to the previous one, we can return here directly.
    return;
  }

  // There are 3 different cases we need to handle:
  // The following diagram explains different seek targets seeking at various
  // positions on the table, while next_scan_idx points to PreparedRange 2.
  //
  // next_scan_idx: -------------------┐
  //                                   ▼
  // table:       : __[PreparedRange 1]__[PreparedRange 2]__[PreparedRange 3]__
  // Seek target:  <----- Case 1 ----->▲<------------- Case 2 -------------->
  //                                   │
  //                                 Case 3
  //
  // Case 1: seek before the start of the next prepared range. This could happen
  //   due to too many delete tombstones triggering reseek, or delete range.
  // Case 2: seek after the start of the next prepared range.
  //   This could happen due to seek key adjustment from a delete range file.
  //   E.g. LSM has 3 levels, each level has only 1 file:
  //     L1 : key : 0---10
  //     L2 : Delete range key : 0-5
  //     L3 : key : 0---10
  //   When a range 2-8 was prepared, the prepared key would be 2 on the L3
  //   file, but the seek key would be 5, as the seek key was updated by the
  //   largest key of the delete range. This causes all of the cases above to
  //   be possible, when the ranges are adjusted in the above examples.
  // Case 3: seek at the beginning of a prepared range (expected case)

  // Allow reseek on the start of the last prepared range due to too many
  // tombstones
  multi_scan_->next_scan_idx =
      std::min(multi_scan_->next_scan_idx,
               multi_scan_->block_index_ranges_per_scan.size() - 1);
  auto user_seek_target = ExtractUserKey(*seek_target);
  auto compare_next_scan_start_result =
      user_comparator_.CompareWithoutTimestamp(
          user_seek_target, /*a_has_ts=*/true,
          multi_scan_->scan_opts->GetScanRanges()[multi_scan_->next_scan_idx]
              .range.start.value(),
          /*b_has_ts=*/false);
  if (compare_next_scan_start_result != 0) {
    // The seek target is not exactly the same as what was prepared.
    if (compare_next_scan_start_result < 0) {
      // Case 1:
      if (multi_scan_->next_scan_idx == 0) {
        // This should not happen, even when the seek target is adjusted by
        // delete range. The reason is that if the seek target is before the
        // start key of the first prepared range, its end key needs to be >= the
        // smallest key of this file, otherwise it is skipped in the level
        // iterator. If its end key is >= the smallest key of this file, then
        // this range will be prepared for this file. As delete range can only
        // adjust the seek target forward, it would never be before the start
        // key of the first prepared range.
        assert(false && "Seek target before the first prepared range");
        MarkPreparedRangeExhausted();
        return;
      }
      auto seek_target_before_previous_prepared_range =
          user_comparator_.CompareWithoutTimestamp(
              user_seek_target, /*a_has_ts=*/true,
              multi_scan_->scan_opts
                  ->GetScanRanges()[multi_scan_->next_scan_idx - 1]
                  .range.start.value(),
              /*b_has_ts=*/false) < 0;
      // Not expected to happen.
      // This should never happen: next_scan_idx is only set to a non-zero
      // value because a seek with a target larger than or equal to the start
      // key of scan next_scan_idx - 1 happened earlier. If a seek happens
      // before the start key of scan next_scan_idx - 1, it would be seeking a
      // key that is less than what was sought before.
      assert(!seek_target_before_previous_prepared_range);
      if (seek_target_before_previous_prepared_range) {
        multi_scan_status_ = Status::InvalidArgument(
            "Seek target is before the previous prepared range at index " +
            std::to_string(multi_scan_->next_scan_idx));
        return;
      }
      // It should only be possible to seek a key between the start of the
      // current prepared scan and the start of the next prepared range.
      MultiScanUnexpectedSeekTarget(
          seek_target, &user_seek_target,
          std::get<0>(multi_scan_->block_index_ranges_per_scan
                          [multi_scan_->next_scan_idx - 1]));
    } else {
      // Case 2:
      MultiScanUnexpectedSeekTarget(
          seek_target, &user_seek_target,
          std::get<0>(
              multi_scan_
                  ->block_index_ranges_per_scan[multi_scan_->next_scan_idx]));
    }
  } else {
    // Case 3:
    assert(multi_scan_->next_scan_idx <
           multi_scan_->block_index_ranges_per_scan.size());
    auto [cur_scan_start_idx, cur_scan_end_idx] =
        multi_scan_->block_index_ranges_per_scan[multi_scan_->next_scan_idx];
    // We should have the data block already loaded
    ++multi_scan_->next_scan_idx;
    if (cur_scan_start_idx >= cur_scan_end_idx) {
      // No blocks are prepared for this range at the current file.
      MarkPreparedRangeExhausted();
      return;
    }
    MultiScanSeekTargetFromBlock(seek_target, cur_scan_start_idx);
  }
}
void BlockBasedTableIterator::MultiScanUnexpectedSeekTarget(
    const Slice* seek_target, const Slice* user_seek_target, size_t block_idx) {
  // Linear search for the block that contains the seek target, and unpin
  // blocks that are before it.
  // The logic here could be confusing when there is a delete range involved.
  // E.g. we have an LSM with 3 levels, each level has only 1 file:
  //   L1: data file    : 0---10
  //   L2: Delete range : 0-5
  //   L3: data file    : 0---10
  //
  // MultiScan on ranges 1-2, 3-4, and 5-6.
  // When the user first does Seek(1), on level 2, due to delete range 0-5, the
  // seek key is adjusted to 5 at level 3. Therefore, we will internally do
  // Seek(5) and unpin all blocks until 5 at level 3. Then the next scan's
  // blocks from 3-4 are unpinned at level 3. It is confusing that maybe blocks
  // 3-4 should not be unpinned, as the next scan would need them. But Seek(5)
  // implies that these keys are all covered by some range deletion, so the next
  // Seek(3) will also do Seek(5) internally, so the blocks from 3-4 can be
  // safely unpinned.

  // advance to the right prepared range
  while (
      multi_scan_->next_scan_idx <
          multi_scan_->block_index_ranges_per_scan.size() &&
      (user_comparator_.CompareWithoutTimestamp(
           *user_seek_target, /*a_has_ts=*/true,
           multi_scan_->scan_opts->GetScanRanges()[multi_scan_->next_scan_idx]
               .range.start.value(),
           /*b_has_ts=*/false) >= 0)) {
    multi_scan_->next_scan_idx++;
  }
  // next_scan_idx is guaranteed to be higher than 0. If the seek key is before
  // the start key of the first prepared range, it is already handled by the
  // caller SeekMultiScan. If it is equal, the caller would not call this
  // function. If it is after, next_scan_idx would be advanced by the loop
  // above.
  assert(multi_scan_->next_scan_idx > 0);
  // Get the current range
  auto cur_scan_idx = multi_scan_->next_scan_idx - 1;
  auto [cur_scan_start_idx, cur_scan_end_idx] =
      multi_scan_->block_index_ranges_per_scan[cur_scan_idx];
  if (cur_scan_start_idx >= cur_scan_end_idx) {
    // No blocks are prepared for this range at the current file.
    MarkPreparedRangeExhausted();
    return;
  }

  // Unpin all the blocks from multi_scan_->cur_data_block_idx to
  // cur_scan_start_idx
  for (auto unpin_block_idx = multi_scan_->cur_data_block_idx;
       unpin_block_idx < cur_scan_start_idx; unpin_block_idx++) {
    if (!multi_scan_->pinned_data_blocks[unpin_block_idx].IsEmpty()) {
      multi_scan_->pinned_data_blocks[unpin_block_idx].Reset();
    }
  }

  // Find the right block_idx
  block_idx = cur_scan_start_idx;
  auto const& data_block_separators = multi_scan_->data_block_separators;
  while (block_idx < data_block_separators.size() &&
         (user_comparator_.CompareWithoutTimestamp(
              *user_seek_target, /*a_has_ts=*/true,
              data_block_separators[block_idx],
              /*b_has_ts=*/false) > 0)) {
    // Unpin the blocks that are passed
    if (!multi_scan_->pinned_data_blocks[block_idx].IsEmpty()) {
      multi_scan_->pinned_data_blocks[block_idx].Reset();
    }
    block_idx++;
  }
  if (block_idx >= data_block_separators.size()) {
    // All of the prepared blocks for this file are exhausted.
    MarkPreparedRangeExhausted();
    return;
  }
  // The current block may contain the data for the target key
  MultiScanSeekTargetFromBlock(seek_target, block_idx);
}
void BlockBasedTableIterator::MultiScanSeekTargetFromBlock(
    const Slice* seek_target, size_t block_idx) {
  assert(multi_scan_->cur_data_block_idx <= block_idx);
  if (!block_iter_points_to_real_block_ ||
      multi_scan_->cur_data_block_idx != block_idx) {
    if (block_iter_points_to_real_block_) {
      // Scans should be in increasing key order.
      // All blocks before cur_data_block_idx are not pinned anymore.
      assert(multi_scan_->cur_data_block_idx < block_idx);
    }
    ResetDataIter();
    if (MultiScanLoadDataBlock(block_idx)) {
      return;
    }
  }
  // Move the current data block index forward to block_idx and, along the
  // way, unpin all the blocks in between.
  while (multi_scan_->cur_data_block_idx < block_idx) {
    // Unpin the block.
    if (!multi_scan_->pinned_data_blocks[multi_scan_->cur_data_block_idx]
             .IsEmpty()) {
      multi_scan_->pinned_data_blocks[multi_scan_->cur_data_block_idx].Reset();
    }
    multi_scan_->cur_data_block_idx++;
  }
  block_iter_points_to_real_block_ = true;
  block_iter_.Seek(*seek_target);
  FindKeyForward();
  CheckOutOfBound();
}

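// Advance to the next prepared data block of the current scan once the
// in-block iterator is exhausted, unpinning the blocks that have been passed.
// When the current scan's prepared range has no block left, the range is
// marked exhausted rather than setting is_out_of_bound_ (see the comment
// inside the loop below).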
void BlockBasedTableIterator::FindBlockForwardInMultiScan() {
  assert(multi_scan_);
  assert(multi_scan_->next_scan_idx >= 1);
  const auto cur_scan_end_idx = std::get<1>(
      multi_scan_->block_index_ranges_per_scan[multi_scan_->next_scan_idx - 1]);
  do {
    if (!block_iter_.status().ok()) {
      return;
    }
    // If is_out_of_bound_ is true, the upper layer (LevelIterator) considers
    // that this level has reached iterate_upper_bound_ and will not continue
    // to iterate into the next file. When we are doing the last scan within a
    // MultiScan for this file, it may need to continue scanning into the next
    // file, so we do not set is_out_of_bound_ in this case.
    if (multi_scan_->cur_data_block_idx + 1 >= cur_scan_end_idx) {
      MarkPreparedRangeExhausted();
      return;
    }
    // Move to the next pinned data block.
    ResetDataIter();
    // Unpin the previous block if it was not reset by the data iterator.
    if (!multi_scan_->pinned_data_blocks[multi_scan_->cur_data_block_idx]
             .IsEmpty()) {
      multi_scan_->pinned_data_blocks[multi_scan_->cur_data_block_idx].Reset();
    }
    ++multi_scan_->cur_data_block_idx;
    if (MultiScanLoadDataBlock(multi_scan_->cur_data_block_idx)) {
      return;
    }
    block_iter_points_to_real_block_ = true;
    block_iter_.SeekToFirst();
  } while (!block_iter_.Valid());
}

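// Make sure the prepared block at `idx` is ready to use. Blocks found in the
// block cache (or read synchronously) are already pinned at this point;
// blocks issued as async reads are polled for completion here and then
// created and pinned from the read buffer.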
Status BlockBasedTableIterator::PollForBlock(size_t idx) {
  assert(multi_scan_);
  const auto async_idx = multi_scan_->block_idx_to_readreq_idx.find(idx);
  if (async_idx == multi_scan_->block_idx_to_readreq_idx.end()) {
    // Did not require async read, should already be pinned.
    assert(multi_scan_->pinned_data_blocks[idx].GetValue());
    return Status::OK();
  }
  AsyncReadState& async_read = multi_scan_->async_states[async_idx->second];
  if (async_read.finished) {
    assert(async_read.io_handle == nullptr);
    assert(async_read.status.ok());
    return async_read.status;
  }
  {
    std::vector<void*> handles = {async_read.io_handle};
    Status poll_s =
        table_->get_rep()->ioptions.env->GetFileSystem()->Poll(handles, 1);
    if (!poll_s.ok()) {
      return poll_s;
    }
  }
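  // After a successful Poll(), the ReadAsync completion callback is expected
  // to have populated async_read (status, result and buffers) for this
  // request; the code below relies on that.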
  assert(async_read.status.ok());
  if (!async_read.status.ok()) {
    return async_read.status;
  }
  async_read.CleanUpIOHandle();
  // Initialize and pin blocks from the async read result.
  for (size_t i = 0; i < async_read.blocks.size(); ++i) {
    const auto& block = async_read.blocks[i];
    Status s = CreateAndPinBlockFromBuffer(
        block, async_read.offset, async_read.result,
        multi_scan_->pinned_data_blocks[async_read.block_indices[i]]);
    if (!s.ok()) {
      return s;
    }
    assert(multi_scan_->pinned_data_blocks[async_read.block_indices[i]]
               .GetValue());
  }
  assert(multi_scan_->pinned_data_blocks[idx].GetValue());
  return Status::OK();
}

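// Build a Block from a slice of a (possibly coalesced) read buffer and pin it
// in the block cache. `buffer_start_offset` is the file offset at which
// `buffer_data` starts, so the block's bytes begin at
// `block.offset() - buffer_start_offset` within the buffer.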
Status BlockBasedTableIterator::CreateAndPinBlockFromBuffer(
    const BlockHandle& block, uint64_t buffer_start_offset,
    const Slice& buffer_data, CachableEntry<Block>& pinned_block_entry) {
  // Get decompressor and handle dictionary loading.
  UnownedPtr<Decompressor> decompressor = table_->get_rep()->decompressor.get();
  CachableEntry<DecompressorDict> cached_dict;
  if (table_->get_rep()->uncompression_dict_reader) {
    {
      Status s =
          table_->get_rep()
              ->uncompression_dict_reader->GetOrReadUncompressionDictionary(
                  /* prefetch_buffer= */ nullptr, read_options_,
                  /* get_context= */ nullptr, /* lookup_context= */ nullptr,
                  &cached_dict);
      if (!s.ok()) {
#ifndef NDEBUG
        fprintf(stdout, "Prepare dictionary loading failed with %s\n",
                s.ToString().c_str());
#endif
        return s;
      }
    }
    if (!cached_dict.GetValue()) {
#ifndef NDEBUG
      fprintf(stdout, "Success but no dictionary read\n");
#endif
      return Status::InvalidArgument("No dictionary found");
    }
    decompressor = cached_dict.GetValue()->decompressor_.get();
  }
  // Create block from buffer data.
  const auto block_size_with_trailer =
      BlockBasedTable::BlockSizeWithTrailer(block);
  const auto block_offset_in_buffer = block.offset() - buffer_start_offset;
  CacheAllocationPtr data =
      AllocateBlock(block_size_with_trailer,
                    GetMemoryAllocator(table_->get_rep()->table_options));
  memcpy(data.get(), buffer_data.data() + block_offset_in_buffer,
         block_size_with_trailer);
  BlockContents tmp_contents(std::move(data), block.size());
#ifndef NDEBUG
  tmp_contents.has_trailer =
      table_->get_rep()->footer.GetBlockTrailerSize() > 0;
#endif
  return table_->CreateAndPinBlockInCache<Block_kData>(
      read_options_, block, decompressor, &tmp_contents,
      &pinned_block_entry.As<Block_kData>());
}

constexpr auto kVerbose = false;
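
// Walk the index for each scan range and collect the data block handles the
// range covers. For every scan, a [start, end) range of indices into
// `scan_block_handles` is recorded in `block_index_ranges_per_scan`, and the
// index separator (user key) of each collected block is appended to
// `data_block_separators`. When consecutive scans share a data block, the
// handle is added only once, but both scans' index ranges cover it.
// Illustrative example (hypothetical blocks): if scan 0 covers blocks
// {B0, B1} and scan 1 covers {B1, B2}, the collected handles are {B0, B1, B2}
// with per-scan index ranges [0, 2) and [1, 3).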
Status BlockBasedTableIterator::CollectBlockHandles(
    const std::vector<ScanOptions>& scan_opts,
    std::vector<BlockHandle>* scan_block_handles,
    std::vector<std::tuple<size_t, size_t>>* block_index_ranges_per_scan,
    std::vector<std::string>* data_block_separators) {
  // Print the file name and level.
  if (UNLIKELY(kVerbose)) {
    auto file_name = table_->get_rep()->file->file_name();
    auto level = table_->get_rep()->level;
    printf("file name : %s, level %d\n", file_name.c_str(), level);
  }
  for (const auto& scan_opt : scan_opts) {
    size_t num_blocks = 0;
    bool check_overlap = !scan_block_handles->empty();
    InternalKey start_key;
    const size_t timestamp_size =
        user_comparator_.user_comparator()->timestamp_size();
    if (timestamp_size == 0) {
      start_key = InternalKey(scan_opt.range.start.value(), kMaxSequenceNumber,
                              kValueTypeForSeek);
    } else {
      std::string seek_key;
      AppendKeyWithMaxTimestamp(&seek_key, scan_opt.range.start.value(),
                                timestamp_size);
      start_key = InternalKey(seek_key, kMaxSequenceNumber, kValueTypeForSeek);
    }
    index_iter_->Seek(start_key.Encode());
    while (index_iter_->status().ok() && index_iter_->Valid() &&
           (!scan_opt.range.limit.has_value() ||
            user_comparator_.CompareWithoutTimestamp(index_iter_->user_key(),
                                                     /*a_has_ts*/ true,
                                                     *scan_opt.range.limit,
                                                     /*b_has_ts=*/false) < 0)) {
      // Only add the block if the index separator is smaller than the limit.
      // When it is equal or larger, it is handled below after the loop.
      if (check_overlap &&
          scan_block_handles->back() == index_iter_->value().handle) {
        // Skip the current block since it's already in the list.
      } else {
        scan_block_handles->push_back(index_iter_->value().handle);
        // Clone the Slice into a std::string to avoid lifetime issues.
        data_block_separators->push_back(index_iter_->user_key().ToString());
      }
      ++num_blocks;
      index_iter_->Next();
      check_overlap = false;
    }
    if (!index_iter_->status().ok()) {
      // Abort: index iterator error.
      return index_iter_->status();
    }
    if (index_iter_->Valid()) {
      // Handle the last block, whose separator is equal to or larger than the
      // limit.
      if (check_overlap &&
          scan_block_handles->back() == index_iter_->value().handle) {
        // Skip adding the current block since it's already in the list.
      } else {
        scan_block_handles->push_back(index_iter_->value().handle);
        data_block_separators->push_back(index_iter_->user_key().ToString());
      }
      ++num_blocks;
    }
    block_index_ranges_per_scan->emplace_back(
        scan_block_handles->size() - num_blocks, scan_block_handles->size());
    if (UNLIKELY(kVerbose)) {
      printf("separators :");
      for (const auto& separator : *data_block_separators) {
        printf("%s, ", separator.c_str());
      }
      printf("\nblock_index_ranges_per_scan :");
      for (auto const& block_index_range : *block_index_ranges_per_scan) {
        printf("[%zu, %zu], ", std::get<0>(block_index_range),
               std::get<1>(block_index_range));
      }
      printf("\n");
    }
  }
  return Status::OK();
}

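// For each collected block handle, look the block up in the block cache and
// pin it if present; cache misses are recorded in `block_indices_to_read` so
// they can be read from the file. When multiscan_opts->max_prefetch_size is
// set, preparation stops once the accumulated block size would exceed it and
// `prefetched_max_idx` records where it stopped.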
Status BlockBasedTableIterator::FilterAndPinCachedBlocks(
    const std::vector<BlockHandle>& scan_block_handles,
    const MultiScanArgs* multiscan_opts,
    std::vector<size_t>* block_indices_to_read,
    std::vector<CachableEntry<Block>>* pinned_data_blocks_guard,
    size_t* prefetched_max_idx) {
  uint64_t total_prefetch_size = 0;
  *prefetched_max_idx = scan_block_handles.size();
  for (size_t i = 0; i < scan_block_handles.size(); ++i) {
    const auto& data_block_handle = scan_block_handles[i];
    total_prefetch_size +=
        BlockBasedTable::BlockSizeWithTrailer(data_block_handle);
    if (multiscan_opts->max_prefetch_size > 0 &&
        total_prefetch_size > multiscan_opts->max_prefetch_size) {
      for (size_t j = i; j < scan_block_handles.size(); ++j) {
        assert((*pinned_data_blocks_guard)[j].IsEmpty());
      }
      *prefetched_max_idx = i;
      break;
    }
    Status s = table_->LookupAndPinBlocksInCache<Block_kData>(
        read_options_, data_block_handle,
        &(*pinned_data_blocks_guard)[i].As<Block_kData>());
    if (!s.ok()) {
      // Abort: block cache lookup failed.
      return s;
    }
    if (!(*pinned_data_blocks_guard)[i].GetValue()) {
      // Block not in cache.
      block_indices_to_read->emplace_back(i);
    }
  }
  return Status::OK();
}

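// Turn the cache-miss block list into coalesced FSReadRequests. Two
// consecutive missing blocks end up in the same request when the gap between
// the end of the previous block and the start of the next one is at most
// multiscan_opts->io_coalesce_threshold.
// Illustrative example (hypothetical offsets, threshold 4096): missing blocks
// spanning [0, 8192) and [10240, 14336) leave a 2048-byte gap, so one request
// covering [0, 14336) is issued; a following block starting at 20480 leaves a
// 6144-byte gap and starts a new request.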
void BlockBasedTableIterator::PrepareIORequests(
    const std::vector<size_t>& block_indices_to_read,
    const std::vector<BlockHandle>& scan_block_handles,
    const MultiScanArgs* multiscan_opts, std::vector<FSReadRequest>* read_reqs,
    UnorderedMap<size_t, size_t>* block_idx_to_readreq_idx,
    std::vector<std::vector<size_t>>* coalesced_block_indices) {
  assert(coalesced_block_indices->empty());
  coalesced_block_indices->resize(1);
  for (const auto& block_idx : block_indices_to_read) {
    if (!coalesced_block_indices->back().empty()) {
      // Check if we can coalesce.
      const auto& last_block_handle =
          scan_block_handles[coalesced_block_indices->back().back()];
      uint64_t last_block_end =
          last_block_handle.offset() +
          BlockBasedTable::BlockSizeWithTrailer(last_block_handle);
      uint64_t current_start = scan_block_handles[block_idx].offset();
      if (current_start >
          last_block_end + multiscan_opts->io_coalesce_threshold) {
        // Start a new IO.
        coalesced_block_indices->emplace_back();
      }
    }
    coalesced_block_indices->back().emplace_back(block_idx);
  }
  assert(read_reqs->empty());
  read_reqs->reserve(coalesced_block_indices->size());
  for (const auto& block_indices : *coalesced_block_indices) {
    assert(block_indices.size());
    const auto& first_block_handle = scan_block_handles[block_indices[0]];
    const auto& last_block_handle = scan_block_handles[block_indices.back()];
    const auto start_offset = first_block_handle.offset();
    const auto end_offset =
        last_block_handle.offset() +
        BlockBasedTable::BlockSizeWithTrailer(last_block_handle);
#ifndef NDEBUG
    // Debug print in case the assertion below fails.
    if (start_offset >= end_offset) {
      fprintf(stderr, "scan_block_handles: ");
      for (const auto& block : scan_block_handles) {
        fprintf(stderr, "offset: %" PRIu64 ", size: %" PRIu64 "; ",
                block.offset(), block.size());
      }
      fprintf(stderr,
              "\nfirst block - offset: %" PRIu64 ", size: %" PRIu64 "\n",
              first_block_handle.offset(), first_block_handle.size());
      fprintf(stderr, "last block - offset: %" PRIu64 ", size: %" PRIu64 "\n",
              last_block_handle.offset(), last_block_handle.size());
      fprintf(stderr, "coalesced_block_indices: ");
      for (const auto& b : *coalesced_block_indices) {
        fprintf(stderr, "[");
        for (const auto& block_idx : b) {
          fprintf(stderr, "%zu ", block_idx);
        }
        fprintf(stderr, "] ");
      }
      fprintf(stderr, "\ncurrent blocks: ");
      for (const auto& block_idx : block_indices) {
        fprintf(stderr, "offset: %" PRIu64 ", size: %" PRIu64 "; ",
                scan_block_handles[block_idx].offset(),
                scan_block_handles[block_idx].size());
      }
      fprintf(stderr, "\n");
    }
#endif  // NDEBUG
    assert(end_offset > start_offset);
    read_reqs->emplace_back();
    read_reqs->back().offset = start_offset;
    read_reqs->back().len = end_offset - start_offset;
    if (multiscan_opts->use_async_io) {
      for (const auto& block_idx : block_indices) {
        (*block_idx_to_readreq_idx)[block_idx] = read_reqs->size() - 1;
      }
    }
  }
}

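// Issue the coalesced read requests: either asynchronously via ReadAsync (one
// AsyncReadState per request, completed later in PollForBlock) or
// synchronously via MultiRead, in which case the blocks are created and
// pinned in the block cache right away from the returned buffers.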
Status BlockBasedTableIterator::ExecuteIO(
    const std::vector<BlockHandle>& scan_block_handles,
    const MultiScanArgs* multiscan_opts,
    const std::vector<std::vector<size_t>>& coalesced_block_indices,
    std::vector<FSReadRequest>* read_reqs,
    std::vector<AsyncReadState>* async_states,
    std::vector<CachableEntry<Block>>* pinned_data_blocks_guard) {
  IOOptions io_opts;
  Status s;
  s = table_->get_rep()->file->PrepareIOOptions(read_options_, io_opts);
  if (!s.ok()) {
    // Abort: PrepareIOOptions failed.
    return s;
  }
  const bool direct_io = table_->get_rep()->file->use_direct_io();
  if (multiscan_opts->use_async_io) {
    async_states->resize(read_reqs->size());
    for (size_t i = 0; i < read_reqs->size(); ++i) {
      auto& read_req = (*read_reqs)[i];
      auto& async_read = (*async_states)[i];
      async_read.finished = false;
      async_read.offset = read_req.offset;
      async_read.block_indices = coalesced_block_indices[i];
      for (const auto idx : coalesced_block_indices[i]) {
        async_read.blocks.emplace_back(scan_block_handles[idx]);
      }
      if (direct_io) {
        read_req.scratch = nullptr;
      } else {
        async_read.buf.reset(new char[read_req.len]);
        read_req.scratch = async_read.buf.get();
      }
      auto cb = std::bind(&BlockBasedTableIterator::PrepareReadAsyncCallBack,
                          this, std::placeholders::_1, std::placeholders::_2);
      // TODO: for mmap, io_handle will not be set but the callback will
      // already have been called.
      s = table_->get_rep()->file.get()->ReadAsync(
          read_req, io_opts, cb, &async_read, &async_read.io_handle,
          &async_read.del_fn, direct_io ? &async_read.aligned_buf : nullptr);
      if (!s.ok()) {
#ifndef NDEBUG
        fprintf(stderr, "ReadAsync failed with %s\n", s.ToString().c_str());
#endif
        assert(false);
        return s;
      }
      assert(async_read.io_handle);
      for (auto& req : *read_reqs) {
        if (!req.status.ok()) {
          assert(false);
          // Silence compiler warning about NRVO.
          s = req.status;
          return s;
        }
      }
    }
  } else {
    // Synchronous IO using MultiRead.
    std::unique_ptr<char[]> buf;
    if (direct_io) {
      for (auto& read_req : *read_reqs) {
        read_req.scratch = nullptr;
      }
    } else {
      // TODO: optimize if FSSupportedOps::kFSBuffer is supported.
      size_t total_len = 0;
      for (const auto& req : *read_reqs) {
        total_len += req.len;
      }
      buf.reset(new char[total_len]);
      size_t offset = 0;
      for (auto& read_req : *read_reqs) {
        read_req.scratch = buf.get() + offset;
        offset += read_req.len;
      }
    }
    AlignedBuf aligned_buf;
    s = table_->get_rep()->file->MultiRead(io_opts, read_reqs->data(),
                                           read_reqs->size(),
                                           direct_io ? &aligned_buf : nullptr);
    if (!s.ok()) {
      return s;
    }
    for (auto& req : *read_reqs) {
      if (!req.status.ok()) {
        // Silence compiler warning about NRVO.
        s = req.status;
        return s;
      }
    }
    // Init blocks and pin them in the block cache.
    assert(read_reqs->size() == coalesced_block_indices.size());
    for (size_t i = 0; i < coalesced_block_indices.size(); i++) {
      const auto& read_req = (*read_reqs)[i];
      for (const auto& block_idx : coalesced_block_indices[i]) {
        const auto& block = scan_block_handles[block_idx];
        assert((*pinned_data_blocks_guard)[block_idx].IsEmpty());
        s = CreateAndPinBlockFromBuffer(
            block, read_req.offset, read_req.result,
            (*pinned_data_blocks_guard)[block_idx]);
        if (!s.ok()) {
          assert(false);
          // Abort: failed to create and pin block in cache.
          return s;
        }
        assert((*pinned_data_blocks_guard)[block_idx].GetValue());
      }
    }
  }
  return s;
}

}  // namespace ROCKSDB_NAMESPACE