tiered_compaction_test.cc

// Copyright (c) Meta Platforms, Inc. and affiliates.
//
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/db_test_util.h"
#include "options/cf_options.h"
#include "port/stack_trace.h"
#include "rocksdb/iostats_context.h"
#include "rocksdb/listener.h"
#include "rocksdb/utilities/debug.h"
#include "rocksdb/utilities/table_properties_collectors.h"
#include "test_util/mock_time_env.h"
#include "util/defer.h"
#include "utilities/merge_operators.h"

namespace ROCKSDB_NAMESPACE {

namespace {
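// Strict option parsing: unknown or unsupported options are treated as
// errors instead of being silently ignored.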
ConfigOptions GetStrictConfigOptions() {
  ConfigOptions config_options;
  config_options.ignore_unknown_options = false;
  config_options.ignore_unsupported_options = false;
  config_options.input_strings_escaped = false;
  return config_options;
}

}  // namespace

class TieredCompactionTest : public DBTestBase {
 public:
  TieredCompactionTest()
      : DBTestBase("tiered_compaction_test", /*env_do_fsync=*/true) {}

 protected:
  std::atomic_bool enable_per_key_placement = true;
  CompactionJobStats job_stats;
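  // Hook the per-key placement support check so individual tests can toggle
  // it at runtime via enable_per_key_placement.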
  void SetUp() override {
    SyncPoint::GetInstance()->SetCallBack(
        "Compaction::SupportsPerKeyPlacement:Enabled", [&](void* arg) {
          auto supports_per_key_placement = static_cast<bool*>(arg);
          *supports_per_key_placement = enable_per_key_placement;
        });
    SyncPoint::GetInstance()->EnableProcessing();
  }

  const std::vector<InternalStats::CompactionStats>& GetCompactionStats() {
    VersionSet* const versions = dbfull()->GetVersionSet();
    assert(versions);
    assert(versions->GetColumnFamilySet());
    ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
    assert(cfd);
    const InternalStats* const internal_stats = cfd->internal_stats();
    assert(internal_stats);
    return internal_stats->TEST_GetCompactionStats();
  }
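  // Stats for data written to the proximal (penultimate) level through
  // per-key placement, tracked separately from the per-level stats above.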
  const InternalStats::CompactionStats& GetPerKeyPlacementCompactionStats() {
    VersionSet* const versions = dbfull()->GetVersionSet();
    assert(versions);
    assert(versions->GetColumnFamilySet());
    ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
    assert(cfd);
    const InternalStats* const internal_stats = cfd->internal_stats();
    assert(internal_stats);
    return internal_stats->TEST_GetPerKeyPlacementCompactionStats();
  }

  // Verify the compaction stats; byte counts are compared approximately.
  void VerifyCompactionStats(
      const std::vector<InternalStats::CompactionStats>& expected_stats,
      const InternalStats::CompactionStats& expected_pl_stats,
      size_t output_level, uint64_t num_input_range_del = 0) {
    const std::vector<InternalStats::CompactionStats>& stats =
        GetCompactionStats();
    const size_t kLevels = expected_stats.size();
    ASSERT_EQ(kLevels, stats.size());
    ASSERT_TRUE(output_level < kLevels);
    for (size_t level = 0; level < kLevels; level++) {
      VerifyCompactionStats(stats[level], expected_stats[level]);
    }
    const InternalStats::CompactionStats& pl_stats =
        GetPerKeyPlacementCompactionStats();
    VerifyCompactionStats(pl_stats, expected_pl_stats);

    const auto& output_level_stats = stats[output_level];
    CompactionJobStats expected_job_stats;
    expected_job_stats.cpu_micros = output_level_stats.cpu_micros;
    expected_job_stats.num_input_files =
        output_level_stats.num_input_files_in_output_level +
        output_level_stats.num_input_files_in_non_output_levels;
    expected_job_stats.num_input_records =
        output_level_stats.num_input_records - num_input_range_del;
    expected_job_stats.num_output_files =
        output_level_stats.num_output_files + pl_stats.num_output_files;
    expected_job_stats.num_output_records =
        output_level_stats.num_output_records + pl_stats.num_output_records;
    VerifyCompactionJobStats(job_stats, expected_job_stats);
  }
  void ResetAllStats(std::vector<InternalStats::CompactionStats>& stats,
                     InternalStats::CompactionStats& pl_stats) {
    ASSERT_OK(dbfull()->ResetStats());
    for (auto& level_stats : stats) {
      level_stats.Clear();
    }
    pl_stats.Clear();
  }

  void SetColdTemperature(Options& options) {
    options.last_level_temperature = Temperature::kCold;
  }

 private:
  void VerifyCompactionStats(
      const InternalStats::CompactionStats& stats,
      const InternalStats::CompactionStats& expect_stats) {
    ASSERT_EQ(stats.micros > 0, expect_stats.micros > 0);
    ASSERT_EQ(stats.cpu_micros > 0, expect_stats.cpu_micros > 0);
    // Hard to get consistent byte sizes of SST files.
    // Use ASSERT_NEAR for comparison.
    ASSERT_NEAR(stats.bytes_read_non_output_levels * 1.0f,
                expect_stats.bytes_read_non_output_levels * 1.0f,
                stats.bytes_read_non_output_levels * 0.5f);
    ASSERT_NEAR(stats.bytes_read_output_level * 1.0f,
                expect_stats.bytes_read_output_level * 1.0f,
                stats.bytes_read_output_level * 0.5f);
    ASSERT_NEAR(stats.bytes_read_blob * 1.0f,
                expect_stats.bytes_read_blob * 1.0f,
                stats.bytes_read_blob * 0.5f);
    ASSERT_NEAR(stats.bytes_written * 1.0f, expect_stats.bytes_written * 1.0f,
                stats.bytes_written * 0.5f);
    ASSERT_EQ(stats.bytes_moved, expect_stats.bytes_moved);
    ASSERT_EQ(stats.num_input_files_in_non_output_levels,
              expect_stats.num_input_files_in_non_output_levels);
    ASSERT_EQ(stats.num_input_files_in_output_level,
              expect_stats.num_input_files_in_output_level);
    ASSERT_EQ(stats.num_output_files, expect_stats.num_output_files);
    ASSERT_EQ(stats.num_output_files_blob, expect_stats.num_output_files_blob);
    ASSERT_EQ(stats.num_input_records, expect_stats.num_input_records);
    ASSERT_EQ(stats.num_dropped_records, expect_stats.num_dropped_records);
    ASSERT_EQ(stats.num_output_records, expect_stats.num_output_records);
    ASSERT_EQ(stats.count, expect_stats.count);
    for (int i = 0; i < static_cast<int>(CompactionReason::kNumOfReasons);
         i++) {
      ASSERT_EQ(stats.counts[i], expect_stats.counts[i]);
    }
  }
  void VerifyCompactionJobStats(const CompactionJobStats& stats,
                                const CompactionJobStats& expected_stats) {
    ASSERT_EQ(stats.cpu_micros, expected_stats.cpu_micros);
    ASSERT_EQ(stats.num_input_files, expected_stats.num_input_files);
    ASSERT_EQ(stats.num_input_records, expected_stats.num_input_records);
    ASSERT_EQ(stats.num_output_files, expected_stats.num_output_files);
    ASSERT_EQ(stats.num_output_records, expected_stats.num_output_records);
  }
};
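// Sequence-number-based tiering under universal compaction: a sync point
// overrides the cold-sequence cutoff, and the test verifies file placement
// (hot vs. cold temperature) along with the resulting compaction stats.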
TEST_F(TieredCompactionTest, SequenceBasedTieredStorageUniversal) {
  const int kNumTrigger = 4;
  const int kNumLevels = 7;
  const int kNumKeys = 100;
  const int kLastLevel = kNumLevels - 1;

  auto options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  SetColdTemperature(options);
  options.level0_file_num_compaction_trigger = kNumTrigger;
  options.statistics = CreateDBStatistics();
  options.max_subcompactions = 10;
  DestroyAndReopen(options);

  std::atomic_uint64_t latest_cold_seq = 0;
  std::vector<SequenceNumber> seq_history;

  SyncPoint::GetInstance()->SetCallBack(
      "CompactionJob::PrepareTimes():preclude_last_level_min_seqno",
      [&](void* arg) {
        *static_cast<SequenceNumber*>(arg) = latest_cold_seq.load();
      });
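  // With the cutoff above, keys with a sequence number at or above
  // latest_cold_seq are treated as hot and precluded from the last level;
  // older keys become cold.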
  SyncPoint::GetInstance()->SetCallBack(
      "CompactionJob::Install:AfterUpdateCompactionJobStats", [&](void* arg) {
        job_stats.Reset();
        job_stats.Add(*(static_cast<CompactionJobStats*>(arg)));
      });
  SyncPoint::GetInstance()->EnableProcessing();

  std::vector<InternalStats::CompactionStats> expect_stats(kNumLevels);
  InternalStats::CompactionStats expect_pl_stats;

  // Put keys in the following way to create overlaps:
  // First file from 0 ~ 99
  // Second file from 10 ~ 109
  // ...
  size_t bytes_per_file = 1952;
  uint64_t total_input_key_count = kNumTrigger * kNumKeys;
  uint64_t total_output_key_count = 130;  // 0 ~ 129
  for (int i = 0; i < kNumTrigger; i++) {
    for (int j = 0; j < kNumKeys; j++) {
      ASSERT_OK(Put(Key(i * 10 + j), "value" + std::to_string(i)));
    }
    ASSERT_OK(Flush());
    seq_history.emplace_back(dbfull()->GetLatestSequenceNumber());

    InternalStats::CompactionStats flush_stats(CompactionReason::kFlush, 1);
    flush_stats.cpu_micros = 1;
    flush_stats.micros = 1;
    flush_stats.bytes_written = bytes_per_file;
    flush_stats.num_output_files = 1;
    flush_stats.num_input_records = kNumKeys;
    flush_stats.num_output_records = kNumKeys;
    expect_stats[0].Add(flush_stats);
  }
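  // The fourth flush reaches level0_file_num_compaction_trigger, kicking off
  // a universal compaction over all four L0 files.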
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  // The penultimate-level file temperature is not cold, so all data is
  // output to the penultimate level.
  ASSERT_EQ("0,0,0,0,0,1", FilesPerLevel());
  ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
  ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0);

  uint64_t bytes_written_penultimate_level =
      GetPerKeyPlacementCompactionStats().bytes_written;

  // TODO - use designated initializers when C++20 support is required
  {
    InternalStats::CompactionStats last_level_compaction_stats(
        CompactionReason::kUniversalSizeAmplification, 1);
    last_level_compaction_stats.cpu_micros = 1;
    last_level_compaction_stats.micros = 1;
    last_level_compaction_stats.bytes_written = 0;
    last_level_compaction_stats.bytes_read_non_output_levels =
        bytes_per_file * kNumTrigger;
    last_level_compaction_stats.num_input_files_in_non_output_levels =
        kNumTrigger;
    last_level_compaction_stats.num_input_records = total_input_key_count;
    last_level_compaction_stats.num_dropped_records =
        total_input_key_count - total_output_key_count;
    last_level_compaction_stats.num_output_records = 0;
    last_level_compaction_stats.num_output_files = 0;
    expect_stats[kLastLevel].Add(last_level_compaction_stats);
  }
  {
    InternalStats::CompactionStats penultimate_level_compaction_stats(
        CompactionReason::kUniversalSizeAmplification, 1);
    penultimate_level_compaction_stats.cpu_micros = 1;
    penultimate_level_compaction_stats.micros = 1;
    penultimate_level_compaction_stats.bytes_written =
        bytes_written_penultimate_level;
    penultimate_level_compaction_stats.num_output_files = 1;
    penultimate_level_compaction_stats.num_output_records =
        total_output_key_count;
    expect_pl_stats.Add(penultimate_level_compaction_stats);
  }
  VerifyCompactionStats(expect_stats, expect_pl_stats, kLastLevel);

  ResetAllStats(expect_stats, expect_pl_stats);
  // Move the cold_seq forward to split the data into 2 levels, so we should
  // see both last-level and penultimate-level stats.
  latest_cold_seq = seq_history[0];
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ASSERT_EQ("0,0,0,0,0,1,1", FilesPerLevel());
  ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
  ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);

  // The input count is now the output count of the previous compaction.
  total_input_key_count = total_output_key_count;
  uint64_t moved_to_last_level_key_count = 10;

  // Bytes read in non-output levels = bytes written to the penultimate level
  // by the previous compaction.
  uint64_t bytes_read_in_non_output_level = bytes_written_penultimate_level;
  uint64_t bytes_written_output_level =
      GetCompactionStats()[kLastLevel].bytes_written;

  // Get the new bytes written in the penultimate level.
  bytes_written_penultimate_level =
      GetPerKeyPlacementCompactionStats().bytes_written;
  {
    InternalStats::CompactionStats last_level_compaction_stats(
        CompactionReason::kManualCompaction, 1);
    last_level_compaction_stats.cpu_micros = 1;
    last_level_compaction_stats.micros = 1;
    last_level_compaction_stats.bytes_written = bytes_written_output_level;
    last_level_compaction_stats.bytes_read_non_output_levels =
        bytes_read_in_non_output_level;
    last_level_compaction_stats.num_input_files_in_non_output_levels = 1;
    last_level_compaction_stats.num_input_records = total_input_key_count;
    last_level_compaction_stats.num_dropped_records =
        total_input_key_count - total_output_key_count;
    last_level_compaction_stats.num_output_records =
        moved_to_last_level_key_count;
    last_level_compaction_stats.num_output_files = 1;
    expect_stats[kLastLevel].Add(last_level_compaction_stats);
  }
  {
    InternalStats::CompactionStats penultimate_level_compaction_stats(
        CompactionReason::kManualCompaction, 1);
    penultimate_level_compaction_stats.cpu_micros = 1;
    penultimate_level_compaction_stats.micros = 1;
    penultimate_level_compaction_stats.bytes_written =
        bytes_written_penultimate_level;
    penultimate_level_compaction_stats.num_output_files = 1;
    penultimate_level_compaction_stats.num_output_records =
        total_output_key_count - moved_to_last_level_key_count;
    expect_pl_stats.Add(penultimate_level_compaction_stats);
  }
  VerifyCompactionStats(expect_stats, expect_pl_stats, kLastLevel);
  // Delete all cold data, so all data will be on the proximal level.
  for (int i = 0; i < 10; i++) {
    ASSERT_OK(Delete(Key(i)));
  }
  ASSERT_OK(Flush());

  ResetAllStats(expect_stats, expect_pl_stats);

  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ASSERT_EQ("0,0,0,0,0,1", FilesPerLevel());
  ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
  ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0);

  // 10 tombstones added
  total_input_key_count = total_input_key_count + 10;
  total_output_key_count = total_output_key_count - 10;

  auto last_level_stats = GetCompactionStats()[kLastLevel];
  bytes_written_penultimate_level =
      GetPerKeyPlacementCompactionStats().bytes_written;
  ASSERT_LT(bytes_written_penultimate_level,
            last_level_stats.bytes_read_non_output_levels +
                last_level_stats.bytes_read_output_level);
  {
    InternalStats::CompactionStats last_level_compaction_stats(
        CompactionReason::kManualCompaction, 1);
    last_level_compaction_stats.cpu_micros = 1;
    last_level_compaction_stats.micros = 1;
    last_level_compaction_stats.bytes_written = 0;
    last_level_compaction_stats.bytes_read_non_output_levels =
        last_level_stats.bytes_read_non_output_levels;
    last_level_compaction_stats.bytes_read_output_level =
        last_level_stats.bytes_read_output_level;
    last_level_compaction_stats.num_input_files_in_non_output_levels = 2;
    last_level_compaction_stats.num_input_files_in_output_level = 1;
    last_level_compaction_stats.num_input_records = total_input_key_count;
    last_level_compaction_stats.num_dropped_records =
        total_input_key_count - total_output_key_count;
    last_level_compaction_stats.num_output_records = 0;
    last_level_compaction_stats.num_output_files = 0;
    expect_stats[kLastLevel].Add(last_level_compaction_stats);
  }
  {
    InternalStats::CompactionStats penultimate_level_compaction_stats(
        CompactionReason::kManualCompaction, 1);
    penultimate_level_compaction_stats.cpu_micros = 1;
    penultimate_level_compaction_stats.micros = 1;
    penultimate_level_compaction_stats.bytes_written =
        bytes_written_penultimate_level;
    penultimate_level_compaction_stats.num_output_files = 1;
    penultimate_level_compaction_stats.num_output_records =
        total_output_key_count;
    expect_pl_stats.Add(penultimate_level_compaction_stats);
  }
  VerifyCompactionStats(expect_stats, expect_pl_stats, kLastLevel);
  // Move the cold_seq forward again, this time with a range delete; take a
  // snapshot to keep the range dels in both the cold and hot SSTs.
  auto snap = db_->GetSnapshot();
  latest_cold_seq = seq_history[2];
  std::string start = Key(25), end = Key(35);
  ASSERT_OK(
      db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), start, end));
  ASSERT_OK(Flush());
  uint64_t num_input_range_del = 1;

  ResetAllStats(expect_stats, expect_pl_stats);
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));

  ASSERT_EQ("0,0,0,0,0,1,1", FilesPerLevel());
  ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
  ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);

  // Previous output + one delete-range tombstone
  total_input_key_count = total_output_key_count + num_input_range_del;
  moved_to_last_level_key_count = 20;

  last_level_stats = GetCompactionStats()[kLastLevel];
  bytes_written_penultimate_level =
      GetPerKeyPlacementCompactionStats().bytes_written;
  // Expect more bytes written to the penultimate level than to the last
  // level.
  ASSERT_GT(bytes_written_penultimate_level, last_level_stats.bytes_written);
  {
    InternalStats::CompactionStats last_level_compaction_stats(
        CompactionReason::kManualCompaction, 1);
    last_level_compaction_stats.cpu_micros = 1;
    last_level_compaction_stats.micros = 1;
    last_level_compaction_stats.bytes_written = last_level_stats.bytes_written;
    last_level_compaction_stats.bytes_read_non_output_levels =
        last_level_stats.bytes_read_non_output_levels;
    last_level_compaction_stats.bytes_read_output_level = 0;
    last_level_compaction_stats.num_input_files_in_non_output_levels = 2;
    last_level_compaction_stats.num_input_files_in_output_level = 0;
    last_level_compaction_stats.num_input_records = total_input_key_count;
    last_level_compaction_stats.num_dropped_records =
        num_input_range_del;  // the delete-range tombstone
    last_level_compaction_stats.num_output_records =
        moved_to_last_level_key_count;
    last_level_compaction_stats.num_output_files = 1;
    expect_stats[kLastLevel].Add(last_level_compaction_stats);
  }
  {
    InternalStats::CompactionStats penultimate_level_compaction_stats(
        CompactionReason::kManualCompaction, 1);
    penultimate_level_compaction_stats.cpu_micros = 1;
    penultimate_level_compaction_stats.micros = 1;
    penultimate_level_compaction_stats.bytes_written =
        bytes_written_penultimate_level;
    penultimate_level_compaction_stats.num_output_files = 1;
    penultimate_level_compaction_stats.num_output_records =
        total_input_key_count - moved_to_last_level_key_count -
        num_input_range_del;
    expect_pl_stats.Add(penultimate_level_compaction_stats);
  }
  VerifyCompactionStats(expect_stats, expect_pl_stats, kLastLevel,
                        num_input_range_del);
  // Verify data: keys 0~9 were deleted, keys 25~34 were range-deleted.
  std::string value;
  for (int i = 0; i < kNumKeys; i++) {
    if (i < 10 || (i >= 25 && i < 35)) {
      ASSERT_TRUE(db_->Get(ReadOptions(), Key(i), &value).IsNotFound());
    } else {
      ASSERT_OK(db_->Get(ReadOptions(), Key(i), &value));
    }
  }

  // Range delete all hot data.
  start = Key(30);
  end = Key(130);
  ASSERT_OK(
      db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), start, end));
  ASSERT_OK(Flush());
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  ASSERT_EQ("0,0,0,0,0,1,1", FilesPerLevel());
  ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
  ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);

  // No range del is dropped because of the snapshot.
  ASSERT_EQ(
      options.statistics->getTickerCount(COMPACTION_RANGE_DEL_DROP_OBSOLETE),
      0);

  // Releasing the snapshot and compacting again should remove all hot data.
  db_->ReleaseSnapshot(snap);
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel());
  ASSERT_EQ(GetSstSizeHelper(Temperature::kUnknown), 0);
  ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);

  // All obsolete range dels are now dropped (3 tombstone entries in total).
  ASSERT_EQ(
      options.statistics->getTickerCount(COMPACTION_RANGE_DEL_DROP_OBSOLETE),
      3);

  // Move the cold_seq backward: for example, the user may change the hot/cold
  // setting. It won't impact the existing cold data, as its sequence numbers
  // have been zeroed out.
  latest_cold_seq = seq_history[1];
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel());
  ASSERT_EQ(GetSstSizeHelper(Temperature::kUnknown), 0);
  ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);
}
// This test was essentially for a hacked-up version of future functionality.
// It can be resurrected if/when a form of range-based tiering is properly
// implemented.
// TODO - add stats verification when adding this test back
TEST_F(TieredCompactionTest, DISABLED_RangeBasedTieredStorageUniversal) {
  const int kNumTrigger = 4;
  const int kNumLevels = 7;
  const int kNumKeys = 100;

  auto options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  SetColdTemperature(options);
  options.level0_file_num_compaction_trigger = kNumTrigger;
  options.statistics = CreateDBStatistics();
  options.max_subcompactions = 10;
  DestroyAndReopen(options);
  auto cmp = options.comparator;

  port::Mutex mutex;
  std::string hot_start = Key(10);
  std::string hot_end = Key(50);

  SyncPoint::GetInstance()->SetCallBack(
      "CompactionIterator::PrepareOutput.context", [&](void* arg) {
        auto context = static_cast<PerKeyPlacementContext*>(arg);
        MutexLock l(&mutex);
        context->output_to_proximal_level =
            cmp->Compare(context->key, hot_start) >= 0 &&
            cmp->Compare(context->key, hot_end) < 0;
      });
  SyncPoint::GetInstance()->EnableProcessing();
  std::vector<InternalStats::CompactionStats> expect_stats(kNumLevels);
  InternalStats::CompactionStats expect_pl_stats;

  for (int i = 0; i < kNumTrigger; i++) {
    for (int j = 0; j < kNumKeys; j++) {
      ASSERT_OK(Put(Key(j), "value" + std::to_string(j)));
    }
    ASSERT_OK(Flush());
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ASSERT_EQ("0,0,0,0,0,1,1", FilesPerLevel());
  ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
  ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);

  ResetAllStats(expect_stats, expect_pl_stats);

  // Change to all cold: no output_to_proximal_level output.
  {
    MutexLock l(&mutex);
    hot_start = Key(100);
    hot_end = Key(200);
  }
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel());
  ASSERT_EQ(GetSstSizeHelper(Temperature::kUnknown), 0);
  ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);

  // Change to all hot: universal compaction supports moving data up a level
  // if it is within the compaction's level range.
  {
    MutexLock l(&mutex);
    hot_start = Key(0);
    hot_end = Key(100);
  }
  // No data is moved from the cold tier to the hot tier, because with no
  // input files from L5 or higher it's not safe to move data to the
  // output_to_proximal_level level.
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  ASSERT_EQ("0,0,0,0,0,1", FilesPerLevel());

  // Add 2 keys at a higher level, in separate files; all hot keys can then
  // be moved up.
  ASSERT_OK(Put(Key(0), "value" + std::to_string(0)));
  ASSERT_OK(Flush());
  ASSERT_OK(Put(Key(50), "value" + std::to_string(0)));
  ASSERT_OK(Flush());
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  ASSERT_EQ("0,0,0,0,0,1", FilesPerLevel());
  ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
  ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0);

  // Change to only 1 key cold, to test that compaction can stop even when it
  // matches the size-amplification compaction threshold.
  {
    MutexLock l(&mutex);
    hot_start = Key(1);
    hot_end = Key(300);
  }

  // Generate just enough files to trigger compaction.
  for (int i = 0; i < kNumTrigger - 1; i++) {
    for (int j = 0; j < 300; j++) {
      ASSERT_OK(Put(Key(j), "value" + std::to_string(j)));
    }
    ASSERT_OK(Flush());
  }

  // Make sure the compaction is able to finish.
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ASSERT_EQ("0,0,0,0,0,1,1", FilesPerLevel());
  ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
  ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);
  auto opts = db_->GetOptions();
  auto max_size_amp =
      opts.compaction_options_universal.max_size_amplification_percent / 100;
  ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown),
            GetSstSizeHelper(Temperature::kCold) * max_size_amp);
  // Delete all cold data.
  ASSERT_OK(Delete(Key(0)));
  ASSERT_OK(Flush());
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  ASSERT_EQ("0,0,0,0,0,1", FilesPerLevel());
  ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
  ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0);

  // Range delete overlapping both hot and cold data, with a snapshot to make
  // sure the range del is preserved.
  auto snap = db_->GetSnapshot();
  {
    MutexLock l(&mutex);
    hot_start = Key(50);
    hot_end = Key(100);
  }
  std::string start = Key(1), end = Key(70);
  ASSERT_OK(
      db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), start, end));
  ASSERT_OK(Flush());
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  ASSERT_EQ("0,0,0,0,0,1,1", FilesPerLevel());
  ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
  ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);

  // No range del is dropped until the snapshot is released.
  ASSERT_EQ(
      options.statistics->getTickerCount(COMPACTION_RANGE_DEL_DROP_OBSOLETE),
      0);

  // Verify data
  std::string value;
  for (int i = 0; i < kNumKeys; i++) {
    SCOPED_TRACE(Key(i));
    if (i < 70) {
      ASSERT_TRUE(db_->Get(ReadOptions(), Key(i), &value).IsNotFound());
    } else {
      ASSERT_OK(db_->Get(ReadOptions(), Key(i), &value));
    }
  }

  db_->ReleaseSnapshot(snap);
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  ASSERT_EQ("0,0,0,0,0,1,1", FilesPerLevel());
  ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
  ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);

  // The range del is dropped.
  ASSERT_EQ(
      options.statistics->getTickerCount(COMPACTION_RANGE_DEL_DROP_OBSOLETE),
      1);
}
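// Level compaction: a range delete sits above data that later turns cold.
// Hot keys written under the tombstone must be kept out of the cold tier,
// or the range del on the proximal level would shadow them.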
TEST_F(TieredCompactionTest, LevelColdRangeDelete) {
  const int kNumTrigger = 4;
  const int kNumLevels = 7;
  const int kNumKeys = 100;
  const int kLastLevel = kNumLevels - 1;

  auto options = CurrentOptions();
  SetColdTemperature(options);
  options.level0_file_num_compaction_trigger = kNumTrigger;
  options.num_levels = kNumLevels;
  options.statistics = CreateDBStatistics();
  options.max_subcompactions = 10;
  DestroyAndReopen(options);

  // Initially, let everything go to the cold tier.
  std::atomic_uint64_t latest_cold_seq = kMaxSequenceNumber;
  SyncPoint::GetInstance()->SetCallBack(
      "CompactionJob::PrepareTimes():preclude_last_level_min_seqno",
      [&](void* arg) {
        *static_cast<SequenceNumber*>(arg) = latest_cold_seq.load();
      });
  SyncPoint::GetInstance()->EnableProcessing();

  for (int i = 0; i < kNumKeys; i++) {
    ASSERT_OK(Put(Key(i), "value" + std::to_string(i)));
  }
  ASSERT_OK(Flush());

  CompactRangeOptions cro;
  cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
  ASSERT_EQ("0,1",
            FilesPerLevel());  // bottommost but not last-level file is hot
  ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
  ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0);

  // Explicitly move the data to the last level.
  MoveFilesToLevel(kLastLevel);
  ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel());

  auto snap = db_->GetSnapshot();

  std::string start = Key(10);
  std::string end = Key(50);
  ASSERT_OK(
      db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), start, end));

  // Keys 20~29 will be marked as cold data, but they cannot be placed in the
  // cold tier (bottommost); otherwise they would be "deleted" by the range
  // del in the proximal level. Verify that these keys remain queryable.
  for (int i = 20; i < 30; i++) {
    ASSERT_OK(Put(Key(i), "value" + std::to_string(i)));
  }

  // Make the range tombstone and the data after it cold.
  latest_cold_seq = dbfull()->GetLatestSequenceNumber();

  // Add some hot data, just for the test.
  for (int i = 30; i < 40; i++) {
    ASSERT_OK(Put(Key(i), "value" + std::to_string(i)));
  }

  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));

  std::string value;
  for (int i = 0; i < kNumKeys; i++) {
    auto s = db_->Get(ReadOptions(), Key(i), &value);
    if ((i >= 10 && i < 20) || (i >= 40 && i < 50)) {
      ASSERT_TRUE(s.IsNotFound());
    } else {
      ASSERT_OK(s);
    }
  }
  db_->ReleaseSnapshot(snap);
}
// Test SST partitioner that cuts after every single key.
class SingleKeySstPartitioner : public SstPartitioner {
 public:
  const char* Name() const override { return "SingleKeySstPartitioner"; }

  PartitionerResult ShouldPartition(
      const PartitionerRequest& /*request*/) override {
    return kRequired;
  }

  bool CanDoTrivialMove(const Slice& /*smallest_user_key*/,
                        const Slice& /*largest_user_key*/) override {
    return false;
  }
};

class SingleKeySstPartitionerFactory : public SstPartitionerFactory {
 public:
  static const char* kClassName() { return "SingleKeySstPartitionerFactory"; }
  const char* Name() const override { return kClassName(); }

  std::unique_ptr<SstPartitioner> CreatePartitioner(
      const SstPartitioner::Context& /* context */) const override {
    return std::unique_ptr<SstPartitioner>(new SingleKeySstPartitioner());
  }
};
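// With the single-key partitioner, each output SST holds exactly one key
// (plus any range dels), so FilesPerLevel() counts map almost one-to-one to
// live keys per level in the tests below.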
TEST_F(TieredCompactionTest, LevelOutofBoundaryRangeDelete) {
  const int kNumTrigger = 4;
  const int kNumLevels = 3;
  const int kNumKeys = 10;
  auto factory = std::make_shared<SingleKeySstPartitionerFactory>();

  auto options = CurrentOptions();
  SetColdTemperature(options);
  options.level0_file_num_compaction_trigger = kNumTrigger;
  options.num_levels = kNumLevels;
  options.statistics = CreateDBStatistics();
  options.sst_partitioner_factory = factory;
  options.max_subcompactions = 10;
  DestroyAndReopen(options);

  // Initially, let everything go to the cold tier.
  std::atomic_uint64_t latest_cold_seq = kMaxSequenceNumber;
  SyncPoint::GetInstance()->SetCallBack(
      "CompactionJob::PrepareTimes():preclude_last_level_min_seqno",
      [&](void* arg) {
        *static_cast<SequenceNumber*>(arg) = latest_cold_seq.load();
      });
  SyncPoint::GetInstance()->EnableProcessing();

  for (int i = 0; i < kNumKeys; i++) {
    ASSERT_OK(Put(Key(i), "value" + std::to_string(i)));
  }
  ASSERT_OK(Flush());

  MoveFilesToLevel(kNumLevels - 1);
  ASSERT_EQ(GetSstSizeHelper(Temperature::kUnknown), 0);
  ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);
  ASSERT_EQ("0,0,10", FilesPerLevel());

  // Stop admitting to the cold tier.
  latest_cold_seq = dbfull()->GetLatestSequenceNumber();
  auto snap = db_->GetSnapshot();

  // Only a range delete, no point data.
  std::string start = Key(3);
  std::string end = Key(5);
  ASSERT_OK(
      db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), start, end));
  ASSERT_OK(Flush());

  CompactRangeOptions cro;
  cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));

  // The range tombstone is not in the cold tier.
  ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
  std::vector<std::vector<FileMetaData>> level_to_files;
  dbfull()->TEST_GetFilesMetaData(dbfull()->DefaultColumnFamily(),
                                  &level_to_files);
  // The range tombstone is in the proximal level.
  const int proximal_level = kNumLevels - 2;
  ASSERT_EQ(level_to_files[proximal_level].size(), 1);
  ASSERT_EQ(level_to_files[proximal_level][0].num_entries, 1);
  ASSERT_EQ(level_to_files[proximal_level][0].num_deletions, 1);
  ASSERT_EQ(level_to_files[proximal_level][0].temperature,
            Temperature::kUnknown);
  ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);
  ASSERT_EQ("0,1,10",
            FilesPerLevel());  // one file at the proximal level, which
                               // contains only a range delete

  // Add 2 hot keys, each in a new SST. They will be placed in the same level
  // as the range del, but they don't overlap with it; make sure the range del
  // still stays there.
  latest_cold_seq = dbfull()->GetLatestSequenceNumber();
  ASSERT_OK(Put(Key(0), "new value" + std::to_string(0)));
  auto snap2 = db_->GetSnapshot();
  ASSERT_OK(Put(Key(6), "new value" + std::to_string(6)));
  ASSERT_OK(Flush());

  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
  ASSERT_EQ("0,2,10",
            FilesPerLevel());  // one file at the proximal level contains
                               // only a range delete

  std::vector<LiveFileMetaData> live_file_meta;
  db_->GetLiveFilesMetaData(&live_file_meta);
  bool found_sst_with_del = false;
  uint64_t sst_with_del_num = 0;
  for (const auto& meta : live_file_meta) {
    if (meta.num_deletions > 0) {
      // Found the SST with the del, which has 2 entries: one for data and
      // one for the range del.
      ASSERT_EQ(meta.level,
                kNumLevels - 2);  // output to proximal level
      ASSERT_EQ(meta.num_entries, 2);
      ASSERT_EQ(meta.num_deletions, 1);
      found_sst_with_del = true;
      sst_with_del_num = meta.file_number;
    }
  }
  ASSERT_TRUE(found_sst_with_del);

  // Release the first snapshot and compact, which should compact away the
  // range del; the newly inserted keys `0` and `6` are still hot data and
  // stay on the proximal level.
  db_->ReleaseSnapshot(snap);
  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
  ASSERT_EQ("0,2,7", FilesPerLevel());
  db_->GetLiveFilesMetaData(&live_file_meta);
  found_sst_with_del = false;
  for (const auto& meta : live_file_meta) {
    // Check for a new SST with del (the old one may not yet be deleted after
    // compaction).
    if (meta.num_deletions > 0 && meta.file_number != sst_with_del_num) {
      found_sst_with_del = true;
    }
  }
  ASSERT_FALSE(found_sst_with_del);

  // Now make all data cold. Key 0 will be moved to the last level, but key 6
  // is still protected by snap2, so it is kept at the proximal level.
  latest_cold_seq = dbfull()->GetLatestSequenceNumber();
  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
  ASSERT_EQ("0,1,8", FilesPerLevel());
  ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
  ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);

  db_->ReleaseSnapshot(snap2);
  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
  ASSERT_EQ("0,0,8", FilesPerLevel());
  ASSERT_EQ(GetSstSizeHelper(Temperature::kUnknown), 0);
  ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);
}
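// Same range-delete scenarios as above, but under universal compaction, where
// the proximal level is the penultimate one (L5 of 7).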
TEST_F(TieredCompactionTest, UniversalRangeDelete) {
  const int kNumTrigger = 4;
  const int kNumLevels = 7;
  const int kNumKeys = 10;
  auto factory = std::make_shared<SingleKeySstPartitionerFactory>();

  auto options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  SetColdTemperature(options);
  options.level0_file_num_compaction_trigger = kNumTrigger;
  options.statistics = CreateDBStatistics();
  options.sst_partitioner_factory = factory;
  options.max_subcompactions = 10;
  DestroyAndReopen(options);

  std::atomic_uint64_t latest_cold_seq = 0;
  SyncPoint::GetInstance()->SetCallBack(
      "CompactionJob::PrepareTimes():preclude_last_level_min_seqno",
      [&](void* arg) {
        *static_cast<SequenceNumber*>(arg) = latest_cold_seq.load();
      });
  SyncPoint::GetInstance()->EnableProcessing();

  for (int i = 0; i < kNumKeys; i++) {
    ASSERT_OK(Put(Key(i), "value" + std::to_string(i)));
  }
  ASSERT_OK(Flush());

  // Compact to the proximal level with 10 files.
  CompactRangeOptions cro;
  cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
  ASSERT_EQ("0,0,0,0,0,10", FilesPerLevel());
  ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
  ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0);

  // Make all data cold.
  latest_cold_seq = dbfull()->GetLatestSequenceNumber();
  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
  ASSERT_EQ("0,0,0,0,0,0,10", FilesPerLevel());
  ASSERT_EQ(GetSstSizeHelper(Temperature::kUnknown), 0);
  ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);

  // The range del is considered hot data, but it gets merged with, and
  // deletes, the last-level data.
  std::string start = Key(3);
  std::string end = Key(5);
  ASSERT_OK(
      db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), start, end));
  ASSERT_OK(Flush());
  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
  ASSERT_EQ("0,0,0,0,0,0,8", FilesPerLevel());

  // A range del under a snapshot should be preserved in the proximal level.
  auto snap = db_->GetSnapshot();
  start = Key(6);
  end = Key(8);
  ASSERT_OK(
      db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), start, end));
  ASSERT_OK(Flush());
  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
  ASSERT_EQ("0,0,0,0,0,1,8", FilesPerLevel());

  // Add 2 hot keys, each in a new SST. They will be placed in the same level
  // as the range del, but they don't overlap with it.
  latest_cold_seq = dbfull()->GetLatestSequenceNumber();
  ASSERT_OK(Put(Key(4), "new value" + std::to_string(0)));
  auto snap2 = db_->GetSnapshot();
  ASSERT_OK(Put(Key(9), "new value" + std::to_string(6)));
  ASSERT_OK(Flush());
  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
  ASSERT_EQ("0,0,0,0,0,2,8", FilesPerLevel());

  // Find the SST with the range del.
  std::vector<LiveFileMetaData> live_file_meta;
  db_->GetLiveFilesMetaData(&live_file_meta);
  bool found_sst_with_del = false;
  uint64_t sst_with_del_num = 0;
  for (const auto& meta : live_file_meta) {
    if (meta.num_deletions > 0) {
      // Found the SST with the del, which has 2 entries: one for data and
      // one for the range del.
      ASSERT_EQ(meta.level,
                kNumLevels - 2);  // output_to_proximal_level level
      ASSERT_EQ(meta.num_entries, 2);
      ASSERT_EQ(meta.num_deletions, 1);
      found_sst_with_del = true;
      sst_with_del_num = meta.file_number;
    }
  }
  ASSERT_TRUE(found_sst_with_del);

  // Release the first snapshot, which should compact away the range del, but
  // the data on the same level is still hot.
  db_->ReleaseSnapshot(snap);
  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
  ASSERT_EQ("0,0,0,0,0,2,6", FilesPerLevel());
  db_->GetLiveFilesMetaData(&live_file_meta);
  // No range del should be found in any SST.
  found_sst_with_del = false;
  for (const auto& meta : live_file_meta) {
    // Check for a new SST with del (the old one may not yet be deleted after
    // compaction).
    if (meta.num_deletions > 0 && meta.file_number != sst_with_del_num) {
      found_sst_with_del = true;
    }
  }
  ASSERT_FALSE(found_sst_with_del);

  // Make all data cold, but key 6 is still protected by snap2.
  latest_cold_seq = dbfull()->GetLatestSequenceNumber();
  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
  ASSERT_EQ("0,0,0,0,0,1,7", FilesPerLevel());
  ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
  ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);

  // Release the snapshot; everything goes to the bottommost level.
  db_->ReleaseSnapshot(snap2);
  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
  ASSERT_EQ("0,0,0,0,0,0,7", FilesPerLevel());
  ASSERT_EQ(GetSstSizeHelper(Temperature::kUnknown), 0);
  ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);
}
TEST_F(TieredCompactionTest, SequenceBasedTieredStorageLevel) {
  const int kNumTrigger = 4;
  const int kNumLevels = 7;
  const int kNumKeys = 100;
  const int kLastLevel = kNumLevels - 1;
  int output_level = 0;

  auto options = CurrentOptions();
  SetColdTemperature(options);
  options.level0_file_num_compaction_trigger = kNumTrigger;
  options.num_levels = kNumLevels;
  options.statistics = CreateDBStatistics();
  options.max_subcompactions = 10;
  DestroyAndReopen(options);

  std::atomic_uint64_t latest_cold_seq = kMaxSequenceNumber;
  std::vector<SequenceNumber> seq_history;

  SyncPoint::GetInstance()->SetCallBack(
      "CompactionJob::PrepareTimes():preclude_last_level_min_seqno",
      [&](void* arg) {
        *static_cast<SequenceNumber*>(arg) = latest_cold_seq.load();
      });
  SyncPoint::GetInstance()->SetCallBack(
      "CompactionJob::Install:AfterUpdateCompactionJobStats", [&](void* arg) {
        job_stats.Reset();
        job_stats.Add(*(static_cast<CompactionJobStats*>(arg)));
      });
  SyncPoint::GetInstance()->SetCallBack(
      "CompactionJob::ProcessKeyValueCompaction()::Processing",
      [&](void* arg) {
        auto compaction = static_cast<Compaction*>(arg);
        output_level = compaction->output_level();
      });
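  // The callback above records which level the compaction actually targets,
  // so expected stats can be indexed by the real output level.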
  920. SyncPoint::GetInstance()->EnableProcessing();
  921. std::vector<InternalStats::CompactionStats> expect_stats(kNumLevels);
  922. InternalStats::CompactionStats expect_pl_stats;
  923. // Put keys in the following way to create overlaps
  924. // First file from 0 ~ 99
  925. // Second file from 10 ~ 109
  926. // ...
  927. size_t bytes_per_file = 1952;
  928. uint64_t total_input_key_count = kNumTrigger * kNumKeys;
  929. uint64_t total_output_key_count = 130; // 0 ~ 129
  930. for (int i = 0; i < kNumTrigger; i++) {
  931. for (int j = 0; j < kNumKeys; j++) {
  932. ASSERT_OK(Put(Key(i * 10 + j), "value" + std::to_string(i)));
  933. }
  934. ASSERT_OK(Flush());
  935. InternalStats::CompactionStats flush_stats(CompactionReason::kFlush, 1);
  936. flush_stats.cpu_micros = 1;
  937. flush_stats.micros = 1;
  938. flush_stats.bytes_written = bytes_per_file;
  939. flush_stats.num_output_files = 1;
  940. flush_stats.num_input_records = kNumKeys;
  941. flush_stats.num_output_records = kNumKeys;
  942. expect_stats[0].Add(flush_stats);
  943. }
  944. ASSERT_OK(dbfull()->TEST_WaitForCompact());
  // the non-last level is hot
  ASSERT_EQ("0,1", FilesPerLevel());
  ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
  ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0);

  uint64_t bytes_written_output_level =
      GetCompactionStats()[output_level].bytes_written;
  ASSERT_GT(bytes_written_output_level, 0);
  {
    InternalStats::CompactionStats output_level_compaction_stats(
        CompactionReason::kLevelL0FilesNum, 1);
    output_level_compaction_stats.cpu_micros = 1;
    output_level_compaction_stats.micros = 1;
    output_level_compaction_stats.bytes_written = bytes_written_output_level;
    output_level_compaction_stats.bytes_read_non_output_levels =
        bytes_per_file * kNumTrigger;
    output_level_compaction_stats.bytes_read_output_level = 0;
    output_level_compaction_stats.num_input_files_in_non_output_levels =
        kNumTrigger;
    output_level_compaction_stats.num_input_files_in_output_level = 0;
    output_level_compaction_stats.num_input_records = total_input_key_count;
    output_level_compaction_stats.num_dropped_records =
        total_input_key_count - total_output_key_count;
    output_level_compaction_stats.num_output_records = total_output_key_count;
    output_level_compaction_stats.num_output_files = 1;
    expect_stats[output_level].Add(output_level_compaction_stats);
  }
  VerifyCompactionStats(expect_stats, expect_pl_stats, output_level);

  // move all data to the last level
  MoveFilesToLevel(kLastLevel);
  ResetAllStats(expect_stats, expect_pl_stats);

  // The compaction won't move the data up
  CompactRangeOptions cro;
  cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
  ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel());
  ASSERT_EQ(GetSstSizeHelper(Temperature::kUnknown), 0);
  ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);

  total_input_key_count = total_output_key_count;
  {
    InternalStats::CompactionStats output_level_compaction_stats(
        CompactionReason::kManualCompaction, 1);
    output_level_compaction_stats.cpu_micros = 1;
    output_level_compaction_stats.micros = 1;
    output_level_compaction_stats.bytes_written = bytes_written_output_level;
    output_level_compaction_stats.bytes_read_non_output_levels = 0;
    output_level_compaction_stats.bytes_read_output_level =
        bytes_written_output_level;
    output_level_compaction_stats.num_input_files_in_non_output_levels = 0;
    output_level_compaction_stats.num_input_files_in_output_level = 1;
    output_level_compaction_stats.num_input_records = total_input_key_count;
    output_level_compaction_stats.num_dropped_records =
        total_input_key_count - total_output_key_count;
    output_level_compaction_stats.num_output_records = total_output_key_count;
    output_level_compaction_stats.num_output_files = 1;
    expect_stats[output_level].Add(output_level_compaction_stats);
  }
  VerifyCompactionStats(expect_stats, expect_pl_stats, output_level);
  // Add new data, which is all hot and overrides all existing data
  latest_cold_seq = dbfull()->GetLatestSequenceNumber();
  for (int i = 0; i < kNumTrigger; i++) {
    for (int j = 0; j < kNumKeys; j++) {
      ASSERT_OK(Put(Key(i * 10 + j), "value" + std::to_string(i)));
    }
    ASSERT_OK(Flush());
    seq_history.emplace_back(dbfull()->GetLatestSequenceNumber());
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ASSERT_EQ("0,1,0,0,0,0,1", FilesPerLevel());
  ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
  ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);

  ResetAllStats(expect_stats, expect_pl_stats);

  // after compaction, all data is hot
  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
  ASSERT_EQ("0,0,0,0,0,1", FilesPerLevel());
  ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
  ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0);

  uint64_t bytes_written_in_proximal_level =
      GetPerKeyPlacementCompactionStats().bytes_written;
  for (int level = 2; level < kNumLevels - 1; level++) {
    expect_stats[level].bytes_moved = bytes_written_in_proximal_level;
  }

  // Another set of 130 keys on top of the 130 from the previous round
  total_input_key_count = total_output_key_count + 130;
  // Merged into 130
  total_output_key_count = 130;
  {
    InternalStats::CompactionStats output_level_compaction_stats(
        CompactionReason::kManualCompaction, 1);
    output_level_compaction_stats.cpu_micros = 1;
    output_level_compaction_stats.micros = 1;
    output_level_compaction_stats.bytes_written = 0;
    output_level_compaction_stats.bytes_read_non_output_levels =
        bytes_written_in_proximal_level;
    output_level_compaction_stats.bytes_read_output_level =
        bytes_written_output_level;
    output_level_compaction_stats.num_input_files_in_non_output_levels = 1;
    output_level_compaction_stats.num_input_files_in_output_level = 1;
    output_level_compaction_stats.num_input_records = total_input_key_count;
    output_level_compaction_stats.num_dropped_records =
        total_input_key_count - total_output_key_count;
    output_level_compaction_stats.num_output_records = 0;
    output_level_compaction_stats.num_output_files = 0;
    expect_stats[output_level].Add(output_level_compaction_stats);
  }
  {
    InternalStats::CompactionStats proximal_level_compaction_stats(
        CompactionReason::kManualCompaction, 1);
    expect_pl_stats.cpu_micros = 1;
    expect_pl_stats.micros = 1;
    expect_pl_stats.bytes_written = bytes_written_in_proximal_level;
    expect_pl_stats.num_output_files = 1;
    expect_pl_stats.num_output_records = total_output_key_count;
    expect_pl_stats.Add(proximal_level_compaction_stats);
  }
  VerifyCompactionStats(expect_stats, expect_pl_stats, output_level);
  // Move the cold_seq forward and try to split the data into cold and hot.
  // In this case it's unsafe to split the data because it's in a
  // non-last-level but bottommost file: its sequence numbers are zeroed out,
  // which loses the time information (with
  // `level_compaction_dynamic_level_bytes` or Universal Compaction, this
  // should be rare).
  // TODO(zjay): ideally we should avoid zeroing out non-last-level bottommost
  // files
  latest_cold_seq = seq_history[1];
  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
  ASSERT_EQ("0,0,0,0,0,1", FilesPerLevel());
  ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
  ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0);

  seq_history.clear();

  // manually move all (cold) data to the last level
  MoveFilesToLevel(kLastLevel);
  seq_history.clear();
  // Add new data once again
  for (int i = 0; i < kNumTrigger; i++) {
    for (int j = 0; j < kNumKeys; j++) {
      ASSERT_OK(Put(Key(i * 10 + j), "value" + std::to_string(i)));
    }
    ASSERT_OK(Flush());
    seq_history.emplace_back(dbfull()->GetLatestSequenceNumber());
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  latest_cold_seq = seq_history[0];
  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
  ASSERT_EQ("0,0,0,0,0,1,1", FilesPerLevel());
  ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
  ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);

  // delete all cold data
  for (int i = 0; i < 10; i++) {
    ASSERT_OK(Delete(Key(i)));
  }
  ASSERT_OK(Flush());
  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
  ASSERT_EQ("0,0,0,0,0,1", FilesPerLevel());
  ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
  ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0);

  latest_cold_seq = seq_history[2];
  MoveFilesToLevel(kLastLevel);

  // Move the cold_seq forward again, with a range delete this time; take a
  // snapshot to keep the range dels in the bottommost level
  auto snap = db_->GetSnapshot();
  std::string start = Key(25), end = Key(35);
  ASSERT_OK(
      db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), start, end));
  // add one small key and one large key to the input level, to make sure hot
  // data can be moved to the input level within that range
  ASSERT_OK(Put(Key(0), "value" + std::to_string(0)));
  ASSERT_OK(Put(Key(100), "value" + std::to_string(0)));
  ASSERT_OK(Flush());

  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
  ASSERT_EQ("0,0,0,0,0,1,1", FilesPerLevel());
  ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
  ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);

  // verify data
  std::string value;
  for (int i = 1; i < 130; i++) {
    if (i < 10 || (i >= 25 && i < 35)) {
      ASSERT_TRUE(db_->Get(ReadOptions(), Key(i), &value).IsNotFound());
    } else {
      ASSERT_OK(db_->Get(ReadOptions(), Key(i), &value));
    }
  }

  // delete all hot data
  ASSERT_OK(Delete(Key(0)));
  start = Key(30);
  // range [101, 130] is cold, because it was not in the input range of the
  // previous compaction
  end = Key(101);
  ASSERT_OK(
      db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), start, end));
  ASSERT_OK(Flush());
  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
  ASSERT_EQ("0,0,0,0,0,1,1", FilesPerLevel());
  ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
  ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);
  // no range del is dropped because of the snapshot
  ASSERT_EQ(
      options.statistics->getTickerCount(COMPACTION_RANGE_DEL_DROP_OBSOLETE),
      0);

  db_->ReleaseSnapshot(snap);

  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
  ASSERT_EQ("0,0,0,0,0,1,1", FilesPerLevel());
  ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
  ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);
  // 3 range dels are dropped; the first one is double-counted, as expected,
  // because it is spread across 2 SST files
  ASSERT_EQ(
      options.statistics->getTickerCount(COMPACTION_RANGE_DEL_DROP_OBSOLETE),
      3);

  // Move the cold_seq backward, which might happen when the user changes the
  // setting. The hot data won't move up; this just makes sure it still runs
  // fine, because:
  // 1. sequence numbers are zeroed out, so there's no time information
  // 2. leveled compaction only supports moving data up within the higher
  //    level's input range
  latest_cold_seq = seq_history[1];
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  ASSERT_EQ("0,0,0,0,0,1,1", FilesPerLevel());
  ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
  ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);
}
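
// A note on the FilesPerLevel() assertions used throughout this file: the
// string lists the number of SST files per level starting from L0, so
// "0,0,0,0,0,1,1" means one file on L5 (the proximal level) and one file on
// L6 (the last level).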
// This test was essentially for a hacked-up version of future functionality.
// It can be resurrected if/when a form of range-based tiering is properly
// implemented.
// FIXME: aside from that, this test reproduces a near-endless compaction
// cycle that needs to be reproduced independently and fixed before leveled
// compaction can be used with the preclude feature in production.
TEST_F(TieredCompactionTest, DISABLED_RangeBasedTieredStorageLevel) {
  const int kNumTrigger = 4;
  const int kNumLevels = 7;
  const int kNumKeys = 100;

  auto options = CurrentOptions();
  SetColdTemperature(options);
  options.level0_file_num_compaction_trigger = kNumTrigger;
  options.level_compaction_dynamic_level_bytes = true;
  options.num_levels = kNumLevels;
  options.statistics = CreateDBStatistics();
  options.max_subcompactions = 10;
  options.preclude_last_level_data_seconds = 10000;
  DestroyAndReopen(options);
  auto cmp = options.comparator;

  port::Mutex mutex;
  std::string hot_start = Key(10);
  std::string hot_end = Key(50);

  SyncPoint::GetInstance()->SetCallBack(
      "CompactionIterator::PrepareOutput.context", [&](void* arg) {
        auto context = static_cast<PerKeyPlacementContext*>(arg);
        MutexLock l(&mutex);
        context->output_to_proximal_level =
            cmp->Compare(context->key, hot_start) >= 0 &&
            cmp->Compare(context->key, hot_end) < 0;
      });
  SyncPoint::GetInstance()->EnableProcessing();

  for (int i = 0; i < kNumTrigger; i++) {
    for (int j = 0; j < kNumKeys; j++) {
      ASSERT_OK(Put(Key(j), "value" + std::to_string(j)));
    }
    ASSERT_OK(Flush());
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ASSERT_EQ("0,0,0,0,0,1,1", FilesPerLevel());
  ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
  ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);

  // change to all cold
  {
    MutexLock l(&mutex);
    hot_start = Key(100);
    hot_end = Key(200);
  }
  CompactRangeOptions cro;
  cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
  ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel());
  ASSERT_EQ(GetSstSizeHelper(Temperature::kUnknown), 0);
  ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);

  // change to all hot, but leveled compaction only supports moving cold data
  // to hot within its higher-level input range.
  {
    MutexLock l(&mutex);
    hot_start = Key(0);
    hot_end = Key(100);
  }
  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
  ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel());
  ASSERT_EQ(GetSstSizeHelper(Temperature::kUnknown), 0);
  ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);

  // with mixed hot/cold data
  {
    MutexLock l(&mutex);
    hot_start = Key(50);
    hot_end = Key(100);
  }
  ASSERT_OK(Put(Key(0), "value" + std::to_string(0)));
  ASSERT_OK(Put(Key(100), "value" + std::to_string(100)));
  ASSERT_OK(Flush());
  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
  ASSERT_EQ("0,0,0,0,0,1,1", FilesPerLevel());
  ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
  ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);

  // delete all hot data, but with a snapshot to keep the range del
  auto snap = db_->GetSnapshot();
  std::string start = Key(50);
  std::string end = Key(100);
  ASSERT_OK(
      db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), start, end));
  ASSERT_OK(Flush());
  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
  ASSERT_EQ("0,0,0,0,0,1,1", FilesPerLevel());
  ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
  ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);
  // no range del is dropped because of the snapshot
  ASSERT_EQ(
      options.statistics->getTickerCount(COMPACTION_RANGE_DEL_DROP_OBSOLETE),
      0);

  // releasing the snapshot and compacting again should remove all hot data
  db_->ReleaseSnapshot(snap);
  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
  ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel());
  ASSERT_EQ(GetSstSizeHelper(Temperature::kUnknown), 0);
  ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);
  ASSERT_EQ(
      options.statistics->getTickerCount(COMPACTION_RANGE_DEL_DROP_OBSOLETE),
      1);

  // Tests that we only compact keys up to the proximal level that are within
  // the proximal level input's internal key range.
  // UPDATE: this functionality has changed. With proximal-enabled compaction,
  // the expanded potential output range in the proximal level is reserved, so
  // it should be safe to use.
  {
    MutexLock l(&mutex);
    hot_start = Key(0);
    hot_end = Key(100);
  }
  const Snapshot* temp_snap = db_->GetSnapshot();
  // Key(0) and Key(1) here are inserted with higher sequence numbers than the
  // Key(0) and Key(1) inserted above.
  ASSERT_OK(Put(Key(0), "value" + std::to_string(0)));
  ASSERT_OK(Put(Key(1), "value" + std::to_string(100)));
  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
  ASSERT_EQ("0,0,0,0,0,1,1", FilesPerLevel());
  {
    std::vector<LiveFileMetaData> metas;
    db_->GetLiveFilesMetaData(&metas);
    for (const auto& f : metas) {
      if (f.temperature == Temperature::kUnknown) {
        // UPDATED (was 3 entries, Key0..Key1)
        ASSERT_EQ(f.num_entries, 52);
        ASSERT_EQ(f.smallestkey, Key(0));
        ASSERT_EQ(f.largestkey, Key(49));
      } else {
        ASSERT_EQ(f.temperature, Temperature::kCold);
        // UPDATED (was 50 entries, Key0..Key49)
        // Key(100) is outside the hot range
        ASSERT_EQ(f.num_entries, 1);
        ASSERT_EQ(f.smallestkey, Key(100));
        ASSERT_EQ(f.largestkey, Key(100));
      }
    }
  }
  db_->ReleaseSnapshot(temp_snap);
}
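
// The fixtures below drive all time-based behavior through a MockSystemClock:
// SetUp() re-points the periodic seqno-to-time recording task at the mock
// clock, and tests advance time explicitly. A minimal sketch of the pattern
// used by the tests below (illustrative only):
//
//   // advance mock time and let the periodic task observe the new time
//   dbfull()->TEST_WaitForPeriodicTaskRun(
//       [&] { mock_clock_->MockSleepForSeconds(1000); });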
class PrecludeLastLevelTestBase : public DBTestBase {
 public:
  PrecludeLastLevelTestBase(std::string test_name = "preclude_last_level_test")
      : DBTestBase(test_name, /*env_do_fsync=*/false) {
    mock_clock_ = std::make_shared<MockSystemClock>(env_->GetSystemClock());
    mock_clock_->SetCurrentTime(kMockStartTime);
    mock_env_ = std::make_unique<CompositeEnvWrapper>(env_, mock_clock_);
  }

 protected:
  std::unique_ptr<Env> mock_env_;
  std::shared_ptr<MockSystemClock> mock_clock_;

  // A starting time large enough that preserved time doesn't underflow into
  // pre-history
  static constexpr uint32_t kMockStartTime = 10000000;

  void SetUp() override {
    mock_clock_->InstallTimedWaitFixCallback();
    SyncPoint::GetInstance()->SetCallBack(
        "DBImpl::StartPeriodicTaskScheduler:Init", [&](void* arg) {
          auto periodic_task_scheduler_ptr =
              static_cast<PeriodicTaskScheduler*>(arg);
          periodic_task_scheduler_ptr->TEST_OverrideTimer(mock_clock_.get());
        });
    mock_clock_->SetCurrentTime(kMockStartTime);
  }

  void ApplyConfigChangeImpl(
      bool dynamic, Options* options,
      const std::unordered_map<std::string, std::string>& config_change,
      const std::unordered_map<std::string, std::string>& db_config_change) {
    if (dynamic) {
      if (config_change.size() > 0) {
        ASSERT_OK(db_->SetOptions(config_change));
      }
      if (db_config_change.size() > 0) {
        ASSERT_OK(db_->SetDBOptions(db_config_change));
      }
    } else {
      if (config_change.size() > 0) {
        ASSERT_OK(GetColumnFamilyOptionsFromMap(
            GetStrictConfigOptions(), *options, config_change, options));
      }
      if (db_config_change.size() > 0) {
        ASSERT_OK(GetDBOptionsFromMap(GetStrictConfigOptions(), *options,
                                      db_config_change, options));
      }
      Reopen(*options);
    }
  }
};
class PrecludeLastLevelTest : public PrecludeLastLevelTestBase,
                              public testing::WithParamInterface<bool> {
 public:
  using PrecludeLastLevelTestBase::PrecludeLastLevelTestBase;

  bool UseDynamicConfig() const { return GetParam(); }

  void ApplyConfigChange(
      Options* options,
      const std::unordered_map<std::string, std::string>& config_change,
      const std::unordered_map<std::string, std::string>& db_config_change =
          {}) {
    ApplyConfigChangeImpl(UseDynamicConfig(), options, config_change,
                          db_config_change);
  }
};
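
// Each PrecludeLastLevelTest case runs twice (see INSTANTIATE_TEST_CASE_P
// below): with UseDynamicConfig() == true, the change is applied to the live
// DB via SetOptions()/SetDBOptions(); with false, the Options are rebuilt
// from the same map and the DB is reopened. A typical call looks like:
//
//   ApplyConfigChange(&options,
//                     {{"preclude_last_level_data_seconds", "10000"},
//                      {"last_level_temperature", "kCold"}});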
TEST_P(PrecludeLastLevelTest, MigrationFromPreserveTimeManualCompaction) {
  const int kNumTrigger = 4;
  const int kNumLevels = 7;
  const int kNumKeys = 100;
  const int kKeyPerSec = 10;

  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.preserve_internal_time_seconds = 10000;
  options.env = mock_env_.get();
  options.level0_file_num_compaction_trigger = kNumTrigger;
  options.num_levels = kNumLevels;
  DestroyAndReopen(options);

  int sst_num = 0;
  // Write overlapping files, enough to trigger compaction
  for (; sst_num < kNumTrigger; sst_num++) {
    for (int i = 0; i < kNumKeys; i++) {
      ASSERT_OK(Put(Key(sst_num * (kNumKeys - 1) + i), "value"));
      dbfull()->TEST_WaitForPeriodicTaskRun([&] {
        mock_clock_->MockSleepForSeconds(static_cast<int>(kKeyPerSec));
      });
    }
    ASSERT_OK(Flush());
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  // all data is pushed to the last level
  ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel());

  // enable the preclude feature
  ApplyConfigChange(&options, {{"preclude_last_level_data_seconds", "10000"},
                               {"last_level_temperature", "kCold"}});

  // all data is hot, even though it's in the last level
  ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0);
  ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);

  // Generate an sstable and trigger manual compaction
  ASSERT_OK(Put(Key(10), "value"));
  ASSERT_OK(Flush());

  CompactRangeOptions cro;
  cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));

  // all data is moved up to the proximal level
  ASSERT_EQ("0,0,0,0,0,1", FilesPerLevel());
  ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0);
  ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);

  // Close explicitly, because the env is a local variable that will be
  // released first.
  Close();
}
TEST_P(PrecludeLastLevelTest, MigrationFromPreserveTimeAutoCompaction) {
  const int kNumTrigger = 4;
  const int kNumLevels = 7;
  const int kNumKeys = 100;
  const int kKeyPerSec = 10;

  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.preserve_internal_time_seconds = 10000;
  options.env = mock_env_.get();
  options.level0_file_num_compaction_trigger = kNumTrigger;
  options.num_levels = kNumLevels;
  DestroyAndReopen(options);

  int sst_num = 0;
  // Write overlapping files, enough to trigger compaction
  for (; sst_num < kNumTrigger; sst_num++) {
    for (int i = 0; i < kNumKeys; i++) {
      ASSERT_OK(Put(Key(sst_num * (kNumKeys - 1) + i), "value"));
      dbfull()->TEST_WaitForPeriodicTaskRun([&] {
        mock_clock_->MockSleepForSeconds(static_cast<int>(kKeyPerSec));
      });
    }
    ASSERT_OK(Flush());
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  // all data is pushed to the last level
  ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel());

  // Enable the preclude feature, and also make sure it won't trigger Size Amp
  // compaction. Normal Size Amp compaction is typically a last level
  // compaction, but when tiered storage ("preclude_last_level") is enabled,
  // size amp won't include the last level: the last level would be in the
  // cold tier, where its size is not a problem, and excluding it also avoids
  // frequent hot-to-cold storage compactions.
  ApplyConfigChange(
      &options,
      {{"preclude_last_level_data_seconds", "10000"},
       {"last_level_temperature", "kCold"},
       {"compaction_options_universal.max_size_amplification_percent", "400"}});

  // all data is hot, even though it's in the last level
  ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0);
  ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);

  // Write more data, but it's still all hot until the 10th SST: a key is
  // written every 10 seconds, there are 100 keys per SST, so each SST takes
  // 1000 seconds, and preclude_last_level_data_seconds is 10k.
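  // (Worked arithmetic for the comment above, from this test's constants: one
  // SST covers roughly kNumKeys * kKeyPerSec = 100 * 10 = 1000 mock seconds,
  // so the 10000-second preclude window keeps about 10000 / 1000 = 10 SSTs'
  // worth of the newest data hot.)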
  Random rnd(301);
  for (; sst_num < kNumTrigger * 2 - 1; sst_num++) {
    for (int i = 0; i < kNumKeys; i++) {
      // the value needs to be big enough to trigger full compaction
      ASSERT_OK(Put(Key(sst_num * (kNumKeys - 1) + i), rnd.RandomString(100)));
      dbfull()->TEST_WaitForPeriodicTaskRun([&] {
        mock_clock_->MockSleepForSeconds(static_cast<int>(kKeyPerSec));
      });
    }
    ASSERT_OK(Flush());
    ASSERT_OK(dbfull()->TEST_WaitForCompact());
  }

  // all data is moved up to the proximal level
  ASSERT_EQ("0,0,0,0,0,1", FilesPerLevel());
  ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0);
  ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);

  // Close explicitly, because the env is a local variable that will be
  // released first.
  Close();
}
TEST_P(PrecludeLastLevelTest, MigrationFromPreserveTimePartial) {
  const int kNumTrigger = 4;
  const int kNumLevels = 7;
  const int kNumKeys = 100;
  const int kKeyPerSec = 10;

  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.preserve_internal_time_seconds = 2000;
  options.env = mock_env_.get();
  options.level0_file_num_compaction_trigger = kNumTrigger;
  options.num_levels = kNumLevels;
  DestroyAndReopen(options);

  int sst_num = 0;
  // Write overlapping files, enough to trigger compaction
  for (; sst_num < kNumTrigger; sst_num++) {
    for (int i = 0; i < kNumKeys; i++) {
      ASSERT_OK(Put(Key(sst_num * (kNumKeys - 1) + i), "value"));
      dbfull()->TEST_WaitForPeriodicTaskRun([&] {
        mock_clock_->MockSleepForSeconds(static_cast<int>(kKeyPerSec));
      });
    }
    ASSERT_OK(Flush());
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  // all data is pushed to the last level
  ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel());

  std::vector<KeyVersion> key_versions;
  ASSERT_OK(GetAllKeyVersions(db_, {}, {}, std::numeric_limits<size_t>::max(),
                              &key_versions));

  // make sure there are more than 300 keys, the first 100 keys have their
  // seqnos zeroed out, and the last 100 keys' seqnos are not zeroed out
  ASSERT_GT(key_versions.size(), 300);
  for (int i = 0; i < 100; i++) {
    ASSERT_EQ(key_versions[i].sequence, 0);
  }
  auto rit = key_versions.rbegin();
  for (int i = 0; i < 100; i++) {
    ASSERT_GT(rit->sequence, 0);
    rit++;
  }

  // enable the preclude feature
  ApplyConfigChange(&options, {{"preclude_last_level_data_seconds", "2000"},
                               {"last_level_temperature", "kCold"}});

  // Generate an sstable and trigger manual compaction
  ASSERT_OK(Put(Key(10), "value"));
  ASSERT_OK(Flush());

  CompactRangeOptions cro;
  cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));

  // some data is moved up, some is not
  ASSERT_EQ("0,0,0,0,0,1,1", FilesPerLevel());
  ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);
  ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);

  Close();
}
TEST_P(PrecludeLastLevelTest, SmallPrecludeTime) {
  const int kNumTrigger = 4;
  const int kNumLevels = 7;
  const int kNumKeys = 100;

  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.preclude_last_level_data_seconds = 60;
  options.preserve_internal_time_seconds = 0;
  options.env = mock_env_.get();
  options.level0_file_num_compaction_trigger = kNumTrigger;
  options.num_levels = kNumLevels;
  // This existing test was chosen to also cover various temperatures for
  // last_level_temperature, which is not interesting enough to exercise
  // across many/all test cases
  options.last_level_temperature = RandomKnownTemperature();
  DestroyAndReopen(options);

  Random rnd(301);

  dbfull()->TEST_WaitForPeriodicTaskRun([&] {
    mock_clock_->MockSleepForSeconds(static_cast<int>(rnd.Uniform(10) + 1));
  });

  for (int i = 0; i < kNumKeys; i++) {
    ASSERT_OK(Put(Key(i), rnd.RandomString(100)));
    dbfull()->TEST_WaitForPeriodicTaskRun([&] {
      mock_clock_->MockSleepForSeconds(static_cast<int>(rnd.Uniform(2)));
    });
  }
  ASSERT_OK(Flush());

  TablePropertiesCollection tables_props;
  ASSERT_OK(dbfull()->GetPropertiesOfAllTables(&tables_props));
  ASSERT_EQ(tables_props.size(), 1);
  ASSERT_FALSE(tables_props.begin()->second->seqno_to_time_mapping.empty());
  SeqnoToTimeMapping tp_mapping;
  ASSERT_OK(tp_mapping.DecodeFrom(
      tables_props.begin()->second->seqno_to_time_mapping));
  ASSERT_FALSE(tp_mapping.Empty());
  auto seqs = tp_mapping.TEST_GetInternalMapping();
  ASSERT_FALSE(seqs.empty());

  ASSERT_GE(GetSstSizeHelper(Temperature::kUnknown), 1);
  for (auto t : kKnownTemperatures) {
    ASSERT_EQ(GetSstSizeHelper(t), 0);
  }

  // Wait longer than the preclude_last_level time, then make sure all the
  // data is compacted to the last level even though there are no writes (so
  // no new seqno -> time information was flushed to any SST).
  mock_clock_->MockSleepForSeconds(100);

  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel());
  for (auto t : kKnownTemperatures) {
    if (t == options.last_level_temperature) {
      ASSERT_GT(GetSstSizeHelper(t), 0);
    } else {
      ASSERT_EQ(GetSstSizeHelper(t), 0);
    }
  }
  Close();
}
TEST_P(PrecludeLastLevelTest, CheckInternalKeyRange) {
  // When compacting keys from the last level to the proximal level, the
  // output to the proximal level should be within the internal key range of
  // the input files from the proximal level.
  // Set up:
  // L5:
  //   File 1: DeleteRange[1, 3)@4, File 2: [3@5, 100@6]
  // L6:
  //   File 3: [2@1, 3@2], File 4: [50@3]
  //
  // When File 1 and File 3 are being compacted, Key(3) cannot be compacted
  // up; otherwise it causes an inconsistency where File 3's Key(3) has a
  // lower sequence number than File 2's Key(3).
  const int kNumLevels = 7;
  auto options = CurrentOptions();
  options.env = mock_env_.get();
  options.last_level_temperature = Temperature::kCold;
  options.level_compaction_dynamic_level_bytes = true;
  options.num_levels = kNumLevels;
  options.statistics = CreateDBStatistics();
  options.max_subcompactions = 10;
  options.preserve_internal_time_seconds = 10000;
  DestroyAndReopen(options);

  // File 3
  ASSERT_OK(Put(Key(2), "val2"));
  ASSERT_OK(Put(Key(3), "val3"));
  ASSERT_OK(Flush());
  MoveFilesToLevel(6);
  // File 4
  ASSERT_OK(Put(Key(50), "val50"));
  ASSERT_OK(Flush());
  MoveFilesToLevel(6);

  ApplyConfigChange(&options, {{"preclude_last_level_data_seconds", "10000"}});

  const Snapshot* snapshot = db_->GetSnapshot();

  // File 1
  std::string start = Key(1);
  std::string end = Key(3);
  ASSERT_OK(db_->DeleteRange({}, db_->DefaultColumnFamily(), start, end));
  ASSERT_OK(Flush());
  MoveFilesToLevel(5);
  // File 2
  ASSERT_OK(Put(Key(3), "vall"));
  ASSERT_OK(Put(Key(100), "val100"));
  ASSERT_OK(Flush());
  MoveFilesToLevel(5);

  ASSERT_EQ("0,0,0,0,0,2,2", FilesPerLevel());

  auto VerifyLogicalState = [&](int line) {
    SCOPED_TRACE("Called from line " + std::to_string(line));

    // First with snapshot
    ASSERT_EQ("val2", Get(Key(2), snapshot));
    ASSERT_EQ("val3", Get(Key(3), snapshot));
    ASSERT_EQ("val50", Get(Key(50), snapshot));
    ASSERT_EQ("NOT_FOUND", Get(Key(100), snapshot));

    // Then without snapshot
    ASSERT_EQ("NOT_FOUND", Get(Key(2)));
    ASSERT_EQ("vall", Get(Key(3)));
    ASSERT_EQ("val50", Get(Key(50)));
    ASSERT_EQ("val100", Get(Key(100)));
  };

  VerifyLogicalState(__LINE__);

  // Try to compact keys up
  CompactRangeOptions cro;
  cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
  // Without internal key range checking, we would get the following error:
  // Corruption: force_consistency_checks(DEBUG): VersionBuilder: L5 has
  // overlapping ranges: file #18 largest key: '6B6579303030303033' seq:102,
  // type:1 vs. file #15 smallest key: '6B6579303030303033' seq:104, type:1
  ASSERT_OK(CompactRange(cro, Key(1), Key(2)));

  VerifyLogicalState(__LINE__);

  db_->ReleaseSnapshot(snapshot);
  Close();
}
INSTANTIATE_TEST_CASE_P(PrecludeLastLevelTest, PrecludeLastLevelTest,
                        ::testing::Bool());

class PrecludeWithCompactStyleTest : public PrecludeLastLevelTestBase,
                                     public testing::WithParamInterface<bool> {
 public:
  void ApplyConfigChange(
      Options* options,
      const std::unordered_map<std::string, std::string>& config_change,
      const std::unordered_map<std::string, std::string>& db_config_change =
          {}) {
    // Depends on dynamic config change while holding a snapshot
    ApplyConfigChangeImpl(true /*dynamic*/, options, config_change,
                          db_config_change);
  }
};
TEST_P(PrecludeWithCompactStyleTest, RangeTombstoneSnapshotMigrateFromLast) {
  // Reproducer for the issue originally described in
  // https://github.com/facebook/rocksdb/pull/9964/files#r1024449523
  const bool universal = GetParam();
  const int kNumLevels = 7;
  auto options = CurrentOptions();
  options.env = mock_env_.get();
  options.last_level_temperature = Temperature::kCold;
  options.compaction_style =
      universal ? kCompactionStyleUniversal : kCompactionStyleLevel;
  options.compaction_options_universal.allow_trivial_move = true;
  options.num_levels = kNumLevels;
  options.statistics = CreateDBStatistics();
  options.max_subcompactions = 10;
  options.preserve_internal_time_seconds = 30000;
  DestroyAndReopen(options);

  // Entries with a much older write time
  ASSERT_OK(Put(Key(2), "val2"));
  ASSERT_OK(Put(Key(6), "val6"));
  for (int i = 0; i < 10; i++) {
    dbfull()->TEST_WaitForPeriodicTaskRun(
        [&] { mock_clock_->MockSleepForSeconds(static_cast<int>(1000)); });
  }
  const Snapshot* snapshot = db_->GetSnapshot();
  ASSERT_OK(db_->DeleteRange({}, db_->DefaultColumnFamily(), Key(1), Key(5)));
  ASSERT_OK(Put(Key(1), "val1"));
  ASSERT_OK(Flush());
  // Send to the last level
  if (universal) {
    ASSERT_OK(CompactRange({}, {}, {}));
  } else {
    MoveFilesToLevel(6);
  }
  ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel());

  ApplyConfigChange(&options, {{"preclude_last_level_data_seconds", "10000"}});

  // To exercise the WithinProximalLevelOutputRange feature, we want the files
  // around the middle file to be compacted on the proximal level
  ASSERT_OK(Put(Key(0), "val0"));
  ASSERT_OK(Flush());
  ASSERT_OK(Put(Key(3), "val3"));
  ASSERT_OK(Flush());
  ASSERT_OK(Put(Key(7), "val7"));
  // FIXME: ideally this wouldn't be necessary to get a seqno-to-time entry
  // into a later compaction to get data into the last level
  dbfull()->TEST_WaitForPeriodicTaskRun(
      [&] { mock_clock_->MockSleepForSeconds(static_cast<int>(1000)); });
  ASSERT_OK(Flush());

  // Send three files to the next-to-last level (if explicitly needed)
  if (universal) {
    ASSERT_OK(dbfull()->TEST_WaitForCompact());
  } else {
    MoveFilesToLevel(5);
  }
  ASSERT_EQ("0,0,0,0,0,3,1", FilesPerLevel());

  auto VerifyLogicalState = [&](int line) {
    SCOPED_TRACE("Called from line " + std::to_string(line));

    // First with snapshot
    if (snapshot) {
      ASSERT_EQ("NOT_FOUND", Get(Key(0), snapshot));
      ASSERT_EQ("NOT_FOUND", Get(Key(1), snapshot));
      ASSERT_EQ("val2", Get(Key(2), snapshot));
      ASSERT_EQ("NOT_FOUND", Get(Key(3), snapshot));
      ASSERT_EQ("val6", Get(Key(6), snapshot));
      ASSERT_EQ("NOT_FOUND", Get(Key(7), snapshot));
    }

    // Then without snapshot
    ASSERT_EQ("val0", Get(Key(0)));
    ASSERT_EQ("val1", Get(Key(1)));
    ASSERT_EQ("NOT_FOUND", Get(Key(2)));
    ASSERT_EQ("val3", Get(Key(3)));
    ASSERT_EQ("val6", Get(Key(6)));
    ASSERT_EQ("val7", Get(Key(7)));
  };

  VerifyLogicalState(__LINE__);

  // Try a limited-range compaction
  // (These would previously hit "Unsafe to store Seq later than snapshot")
  if (universal) {
    uint64_t middle_l5 = GetLevelFileMetadatas(5)[1]->fd.GetNumber();
    ASSERT_OK(db_->CompactFiles({}, {MakeTableFileName(middle_l5)}, 6));
  } else {
    ASSERT_OK(CompactRange({}, Key(3), Key(4)));
  }
  EXPECT_EQ("0,0,0,0,0,3,1", FilesPerLevel());
  VerifyLogicalState(__LINE__);

  // Compact everything, but some data still goes to both the proximal and
  // last levels. A full-range compaction should be safe to "migrate" data
  // from the last level to the proximal level (because of the preclude
  // setting change).
  ASSERT_OK(CompactRange({}, {}, {}));
  EXPECT_EQ("0,0,0,0,0,1,1", FilesPerLevel());
  VerifyLogicalState(__LINE__);
  // Key1 should have been migrated out of the last level
  // FIXME: doesn't yet work with leveled compaction
  if (universal) {
    auto& meta = *GetLevelFileMetadatas(6)[0];
    ASSERT_LT(Key(1), meta.smallest.user_key().ToString());
  }

  // Make the data eligible for the last level
  db_->ReleaseSnapshot(snapshot);
  snapshot = nullptr;
  mock_clock_->MockSleepForSeconds(static_cast<int>(10000));
  ASSERT_OK(CompactRange({}, {}, {}));
  EXPECT_EQ("0,0,0,0,0,0,1", FilesPerLevel());
  VerifyLogicalState(__LINE__);

  Close();
}
INSTANTIATE_TEST_CASE_P(PrecludeWithCompactStyleTest,
                        PrecludeWithCompactStyleTest, ::testing::Bool());

class TimedPutPrecludeLastLevelTest
    : public PrecludeLastLevelTestBase,
      public testing::WithParamInterface<size_t> {
 public:
  TimedPutPrecludeLastLevelTest()
      : PrecludeLastLevelTestBase("timed_put_preclude_last_level_test") {}

  size_t ProtectionBytesPerKey() const { return GetParam(); }
};
TEST_P(TimedPutPrecludeLastLevelTest, FastTrackTimedPutToLastLevel) {
  const int kNumTrigger = 4;
  const int kNumLevels = 7;
  const int kNumKeys = 100;

  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.preclude_last_level_data_seconds = 60;
  options.preserve_internal_time_seconds = 0;
  options.env = mock_env_.get();
  options.level0_file_num_compaction_trigger = kNumTrigger;
  options.num_levels = kNumLevels;
  options.last_level_temperature = Temperature::kCold;
  DestroyAndReopen(options);
  WriteOptions wo;
  wo.protection_bytes_per_key = ProtectionBytesPerKey();

  Random rnd(301);

  dbfull()->TEST_WaitForPeriodicTaskRun([&] {
    mock_clock_->MockSleepForSeconds(static_cast<int>(rnd.Uniform(10) + 1));
  });

  for (int i = 0; i < kNumKeys / 2; i++) {
    ASSERT_OK(Put(Key(i), rnd.RandomString(100), wo));
    dbfull()->TEST_WaitForPeriodicTaskRun([&] {
      mock_clock_->MockSleepForSeconds(static_cast<int>(rnd.Uniform(2)));
    });
  }
  // Create one file with regular Put.
  ASSERT_OK(Flush());

  // Create one file with TimedPut.
  // With the above mock clock operations, write_unix_time 50 should be before
  // current_time - preclude_last_level_data_seconds.
  // This data is eligible to be put on the last level as soon as it is
  // written to the db, and compaction will fast-track it to the last level.
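  // (As used throughout this file, TimedPut's fourth argument is the entry's
  // claimed write_unix_time; the mock clock started at kMockStartTime =
  // 10000000, so unix time 50 is far in the past.)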
  for (int i = kNumKeys / 2; i < kNumKeys; i++) {
    ASSERT_OK(TimedPut(0, Key(i), rnd.RandomString(100), 50, wo));
  }
  ASSERT_OK(Flush());

  // TimedPut file is moved to the last level immediately.
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  ASSERT_EQ("0,0,0,0,0,1,1", FilesPerLevel());

  // Wait longer than the preclude_last_level time; the Put file is eventually
  // moved to the last level too.
  mock_clock_->MockSleepForSeconds(100);
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel());
  ASSERT_EQ(GetSstSizeHelper(Temperature::kUnknown), 0);
  ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);
  Close();
}
TEST_P(TimedPutPrecludeLastLevelTest, InterleavedTimedPutAndPut) {
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.disable_auto_compactions = true;
  options.preclude_last_level_data_seconds = 1 * 24 * 60 * 60;
  options.env = mock_env_.get();
  options.num_levels = 7;
  options.last_level_temperature = Temperature::kCold;
  options.default_write_temperature = Temperature::kHot;
  DestroyAndReopen(options);
  WriteOptions wo;
  wo.protection_bytes_per_key = ProtectionBytesPerKey();

  // Start time: kMockStartTime = 10000000;
  ASSERT_OK(TimedPut(0, Key(0), "v0", kMockStartTime - 1 * 24 * 60 * 60, wo));
  ASSERT_OK(Put(Key(1), "v1", wo));
  ASSERT_OK(Flush());

  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  ASSERT_EQ("0,0,0,0,0,1,1", FilesPerLevel());
  ASSERT_GT(GetSstSizeHelper(Temperature::kHot), 0);
  ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);
  Close();
}
TEST_P(TimedPutPrecludeLastLevelTest, PreserveTimedPutOnProximalLevel) {
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.disable_auto_compactions = true;
  options.preclude_last_level_data_seconds = 3 * 24 * 60 * 60;
  int seconds_between_recording = (3 * 24 * 60 * 60) / kMaxSeqnoTimePairsPerCF;
  options.env = mock_env_.get();
  options.num_levels = 7;
  options.last_level_temperature = Temperature::kCold;
  options.default_write_temperature = Temperature::kHot;
  DestroyAndReopen(options);
  WriteOptions wo;
  wo.protection_bytes_per_key = ProtectionBytesPerKey();

  // Create a snapshot to manually control when the preferred sequence number
  // is swapped in. An entry's preferred seqno won't get swapped in until it's
  // visible to the earliest snapshot. With this, we can test that the seqno
  // to time mapping recorded in the SST file also covers the preferred seqno,
  // not just the seqnos of the internal keys.
  auto* snap1 = db_->GetSnapshot();
  // Start time: kMockStartTime = 10000000;
  ASSERT_OK(TimedPut(0, Key(0), "v0", kMockStartTime - 1 * 24 * 60 * 60, wo));
  ASSERT_OK(TimedPut(0, Key(1), "v1", kMockStartTime - 1 * 24 * 60 * 60, wo));
  ASSERT_OK(TimedPut(0, Key(2), "v2", kMockStartTime - 1 * 24 * 60 * 60, wo));
  ASSERT_OK(Flush());

  // Should still be in the proximal level.
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  ASSERT_EQ("0,0,0,0,0,1", FilesPerLevel());
  ASSERT_GT(GetSstSizeHelper(Temperature::kHot), 0);
  ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0);

  // Wait one more day and release the snapshot. The data's preferred seqno
  // should be swapped in, but the data should still stay in the proximal
  // level. The SST file's seqno to time mapping should continue to cover the
  // preferred seqno after compaction.
  db_->ReleaseSnapshot(snap1);
  mock_clock_->MockSleepForSeconds(1 * 24 * 60 * 60);
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  ASSERT_EQ("0,0,0,0,0,1", FilesPerLevel());
  ASSERT_GT(GetSstSizeHelper(Temperature::kHot), 0);
  ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0);

  // Wait one more day and the data becomes eligible to be placed on the last
  // level. Instead of waiting exactly one more day, we wait
  // `seconds_between_recording` fewer seconds to show that the cutoff is not
  // precise: data can start to be placed on the cold tier one recording
  // interval before it exactly becomes cold based on the setting. For this
  // one-column-family setting preserving 3 days of recording, that's about
  // 43 minutes.
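  // (Worked out, assuming kMaxSeqnoTimePairsPerCF == 100: preserving
  // 3 * 24 * 60 * 60 = 259200 seconds across that many recording slots gives
  // seconds_between_recording = 259200 / 100 = 2592 seconds, about 43
  // minutes.)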
  mock_clock_->MockSleepForSeconds(1 * 24 * 60 * 60 -
                                   seconds_between_recording);
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel());
  ASSERT_EQ(GetSstSizeHelper(Temperature::kHot), 0);
  ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);
  Close();
}
TEST_P(TimedPutPrecludeLastLevelTest, AutoTriggerCompaction) {
  const int kNumTrigger = 10;
  const int kNumLevels = 7;
  const int kNumKeys = 200;

  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.preclude_last_level_data_seconds = 60;
  options.preserve_internal_time_seconds = 0;
  options.env = mock_env_.get();
  options.level0_file_num_compaction_trigger = kNumTrigger;
  options.num_levels = kNumLevels;
  options.last_level_temperature = Temperature::kCold;

  ConfigOptions config_options;
  config_options.ignore_unsupported_options = false;
  std::shared_ptr<TablePropertiesCollectorFactory> factory;
  std::string id = CompactForTieringCollectorFactory::kClassName();
  ASSERT_OK(TablePropertiesCollectorFactory::CreateFromString(
      config_options, "compaction_trigger_ratio=0.4; id=" + id, &factory));
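  // The collector marks a file for compaction once the fraction of its
  // entries eligible for the last level reaches compaction_trigger_ratio. A
  // ratio above 1 can never be reached, and 0 disables the trigger; both
  // cases are exercised below via SetCompactionTriggerRatio().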
  auto collector_factory =
      factory->CheckedCast<CompactForTieringCollectorFactory>();
  options.table_properties_collector_factories.push_back(factory);
  DestroyAndReopen(options);

  WriteOptions wo;
  wo.protection_bytes_per_key = ProtectionBytesPerKey();

  Random rnd(301);
  dbfull()->TEST_WaitForPeriodicTaskRun([&] {
    mock_clock_->MockSleepForSeconds(static_cast<int>(rnd.Uniform(10) + 1));
  });

  for (int i = 0; i < kNumKeys / 4; i++) {
    ASSERT_OK(Put(Key(i), rnd.RandomString(100), wo));
    dbfull()->TEST_WaitForPeriodicTaskRun([&] {
      mock_clock_->MockSleepForSeconds(static_cast<int>(rnd.Uniform(2)));
    });
  }
  // Create one file with regular Put.
  ASSERT_OK(Flush());

  // Create one file with TimedPut.
  // This data is eligible to be put on the last level as soon as it is
  // written to the db, and compaction will fast-track it to the last level.
  for (int i = kNumKeys / 4; i < kNumKeys / 2; i++) {
    ASSERT_OK(TimedPut(0, Key(i), rnd.RandomString(100), 50, wo));
  }
  ASSERT_OK(Flush());

  // TimedPut file is moved to the last level via auto-triggered compaction.
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ASSERT_EQ("1,0,0,0,0,0,1", FilesPerLevel());
  ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
  ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);

  collector_factory->SetCompactionTriggerRatio(1.1);
  for (int i = kNumKeys / 2; i < kNumKeys * 3 / 4; i++) {
    ASSERT_OK(TimedPut(0, Key(i), rnd.RandomString(100), 50, wo));
  }
  ASSERT_OK(Flush());
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ASSERT_EQ("2,0,0,0,0,0,1", FilesPerLevel());

  collector_factory->SetCompactionTriggerRatio(0);
  for (int i = kNumKeys * 3 / 4; i < kNumKeys; i++) {
    ASSERT_OK(TimedPut(0, Key(i), rnd.RandomString(100), 50, wo));
  }
  ASSERT_OK(Flush());
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ASSERT_EQ("3,0,0,0,0,0,1", FilesPerLevel());
  Close();
}
INSTANTIATE_TEST_CASE_P(TimedPutPrecludeLastLevelTest,
                        TimedPutPrecludeLastLevelTest, ::testing::Values(0, 8));

TEST_P(PrecludeLastLevelTest, LastLevelOnlyCompactionPartial) {
  const int kNumTrigger = 4;
  const int kNumLevels = 7;
  const int kNumKeys = 100;
  const int kKeyPerSec = 10;

  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.preserve_internal_time_seconds = 2000;
  options.env = mock_env_.get();
  options.level0_file_num_compaction_trigger = kNumTrigger;
  options.num_levels = kNumLevels;
  DestroyAndReopen(options);

  int sst_num = 0;
  // Write overlapping files, enough to trigger compaction
  for (; sst_num < kNumTrigger; sst_num++) {
    for (int i = 0; i < kNumKeys; i++) {
      ASSERT_OK(Put(Key(sst_num * (kNumKeys - 1) + i), "value"));
      dbfull()->TEST_WaitForPeriodicTaskRun([&] {
        mock_clock_->MockSleepForSeconds(static_cast<int>(kKeyPerSec));
      });
    }
    ASSERT_OK(Flush());
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  // all data is pushed to the last level
  ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel());

  // enable the preclude feature
  ApplyConfigChange(&options, {{"preclude_last_level_data_seconds", "2000"},
                               {"last_level_temperature", "kCold"}});

  CompactRangeOptions cro;
  cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));

  // some data is moved up, some is not
  ASSERT_EQ("0,0,0,0,0,1,1", FilesPerLevel());
  ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);
  ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);

  std::vector<KeyVersion> key_versions;
  ASSERT_OK(GetAllKeyVersions(db_, {}, {}, std::numeric_limits<size_t>::max(),
                              &key_versions));

  // make sure there are more than 300 keys, the first 100 keys have their
  // seqnos zeroed out, and the last 100 keys' seqnos are not zeroed out
  ASSERT_GT(key_versions.size(), 300);
  for (int i = 0; i < 100; i++) {
    ASSERT_EQ(key_versions[i].sequence, 0);
  }
  auto rit = key_versions.rbegin();
  for (int i = 0; i < 100; i++) {
    ASSERT_GT(rit->sequence, 0);
    rit++;
  }

  Close();
}
class PrecludeLastLevelOptionalTest
    : public PrecludeLastLevelTestBase,
      public testing::WithParamInterface<std::tuple<bool, bool>> {
 public:
  bool UseDynamicConfig() const { return std::get<0>(GetParam()); }

  void ApplyConfigChange(
      Options* options,
      const std::unordered_map<std::string, std::string>& config_change,
      const std::unordered_map<std::string, std::string>& db_config_change =
          {}) {
    ApplyConfigChangeImpl(UseDynamicConfig(), options, config_change,
                          db_config_change);
  }

  bool EnablePrecludeLastLevel() const { return std::get<1>(GetParam()); }
};
TEST_P(PrecludeLastLevelOptionalTest, LastLevelOnlyCompactionNoPreclude) {
  const int kNumTrigger = 4;
  const int kNumLevels = 7;
  const int kNumKeys = 100;
  const int kKeyPerSec = 10;

  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.preserve_internal_time_seconds = 2000;
  options.env = mock_env_.get();
  options.level0_file_num_compaction_trigger = kNumTrigger;
  options.num_levels = kNumLevels;
  DestroyAndReopen(options);

  Random rnd(301);
  int sst_num = 0;
  // Write overlapping files, enough to trigger compaction
  for (; sst_num < kNumTrigger; sst_num++) {
    for (int i = 0; i < kNumKeys; i++) {
      ASSERT_OK(Put(Key(sst_num * (kNumKeys - 1) + i), rnd.RandomString(100)));
      dbfull()->TEST_WaitForPeriodicTaskRun([&] {
        mock_clock_->MockSleepForSeconds(static_cast<int>(kKeyPerSec));
      });
    }
    ASSERT_OK(Flush());
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  // all data is pushed to the last level
  ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel());

  std::atomic_bool is_manual_compaction_running = false;
  std::atomic_bool verified_compaction_order = false;

  // Make sure the manual compaction is in progress, and try to trigger a
  // SizeRatio compaction by flushing 4 files to L0. The compaction will try
  // to compact 4 files at L0 to L5 (the last empty level).
  // If the preclude_last_level feature is enabled, the auto-triggered
  // compaction cannot be picked. Otherwise, the auto-triggered compaction can
  // run in parallel with the last level compaction.
  // L0: [a] [b] [c] [d]
  // L5: (locked if preclude_last_level is enabled)
  // L6: [z] (locked: manual compaction in progress)
  // TODO: in this case, L0 files should just be compacted to L4, so the 2
  // compactions won't overlap.
  SyncPoint::GetInstance()->SetCallBack(
      "CompactionJob::ProcessKeyValueCompaction()::Processing",
      [&](void* arg) {
        auto compaction = static_cast<Compaction*>(arg);
        if (compaction->is_manual_compaction()) {
          is_manual_compaction_running = true;
          TEST_SYNC_POINT(
              "PrecludeLastLevelTest::LastLevelOnlyCompactionConflit:"
              "ManualCompaction1");
          TEST_SYNC_POINT(
              "PrecludeLastLevelTest::LastLevelOnlyCompactionConflit:"
              "ManualCompaction2");
          is_manual_compaction_running = false;
        }
      });

  SyncPoint::GetInstance()->SetCallBack(
      "UniversalCompactionBuilder::PickCompaction:Return", [&](void* arg) {
        auto compaction = static_cast<Compaction*>(arg);
        if (EnablePrecludeLastLevel() && is_manual_compaction_running) {
          ASSERT_TRUE(compaction == nullptr);
          verified_compaction_order = true;
        } else {
          ASSERT_TRUE(compaction != nullptr);
          verified_compaction_order = true;
        }
        if (!compaction || !compaction->is_manual_compaction()) {
          TEST_SYNC_POINT(
              "PrecludeLastLevelTest::LastLevelOnlyCompactionConflit:"
              "AutoCompactionPicked");
        }
      });

  SyncPoint::GetInstance()->LoadDependency({
      {"PrecludeLastLevelTest::LastLevelOnlyCompactionConflit:"
       "ManualCompaction1",
       "PrecludeLastLevelTest::LastLevelOnlyCompactionConflit:StartWrite"},
      {"PrecludeLastLevelTest::LastLevelOnlyCompactionConflit:"
       "AutoCompactionPicked",
       "PrecludeLastLevelTest::LastLevelOnlyCompactionConflit:"
       "ManualCompaction2"},
  });
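
  // Resulting order: the manual compaction blocks at ManualCompaction1, which
  // releases StartWrite so the flush loop below can run; once an auto
  // compaction has been picked (AutoCompactionPicked), ManualCompaction2 is
  // released and the manual compaction can finish.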
  SyncPoint::GetInstance()->EnableProcessing();

  // only enable preclude_last_level if the test parameter is true
  ApplyConfigChange(&options,
                    {{"preclude_last_level_data_seconds",
                      EnablePrecludeLastLevel() ? "2000" : "0"},
                     {"last_level_temperature", "kCold"}},
                    {{"max_background_jobs", "8"}});

  auto manual_compaction_thread = port::Thread([this]() {
    CompactRangeOptions cro;
    cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
    cro.exclusive_manual_compaction = false;
    ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
  });

  TEST_SYNC_POINT(
      "PrecludeLastLevelTest::LastLevelOnlyCompactionConflit:StartWrite");
  auto stop_token =
      dbfull()->TEST_write_controler().GetCompactionPressureToken();

  for (; sst_num < kNumTrigger * 2; sst_num++) {
    for (int i = 0; i < kNumKeys; i++) {
      // the value needs to be big enough to trigger full compaction
      ASSERT_OK(Put(Key(sst_num * (kNumKeys - 1) + i), "value"));
      dbfull()->TEST_WaitForPeriodicTaskRun([&] {
        mock_clock_->MockSleepForSeconds(static_cast<int>(kKeyPerSec));
      });
    }
    ASSERT_OK(Flush());
  }

  manual_compaction_thread.join();
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  if (EnablePrecludeLastLevel()) {
    ASSERT_NE("0,0,0,0,0,1,1", FilesPerLevel());
  } else {
    ASSERT_EQ("0,0,0,0,0,1,1", FilesPerLevel());
  }
  ASSERT_TRUE(verified_compaction_order);

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  stop_token.reset();

  Close();
}

TEST_P(PrecludeLastLevelOptionalTest, PeriodicCompactionToProximalLevel) {
  // Test that a last-level-only periodic compaction is blocked by an ongoing
  // compaction into the proximal level if tiered compaction is enabled;
  // otherwise, the periodic compaction just runs on the last level.
  const int kNumTrigger = 4;
  const int kNumLevels = 7;
  const int kProximalLevel = kNumLevels - 2;
  const int kKeyPerSec = 1;
  const int kNumKeys = 100;

  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.preserve_internal_time_seconds = 20000;
  options.env = mock_env_.get();
  options.level0_file_num_compaction_trigger = kNumTrigger;
  options.num_levels = kNumLevels;
  options.periodic_compaction_seconds = 10000;
  DestroyAndReopen(options);

  Random rnd(301);

  for (int i = 0; i < 3 * kNumKeys; i++) {
    ASSERT_OK(Put(Key(i), rnd.RandomString(100)));
    dbfull()->TEST_WaitForPeriodicTaskRun(
        [&] { mock_clock_->MockSleepForSeconds(kKeyPerSec); });
  }
  ASSERT_OK(Flush());

  CompactRangeOptions cro;
  cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));

  // make sure all data is compacted to the last level
  ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel());

  // enable the preclude feature only if the test parameter is true
  ApplyConfigChange(&options,
                    {{"preclude_last_level_data_seconds",
                      EnablePrecludeLastLevel() ? "2000" : "0"},
                     {"last_level_temperature", "kCold"}},
                    {{"max_background_jobs", "8"}});

  std::atomic_bool is_size_ratio_compaction_running = false;
  std::atomic_bool verified_last_level_compaction = false;
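  // Pause any compaction whose output level is the proximal level between two
  // sync points, so a periodic compaction can be attempted while it runs.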
  SyncPoint::GetInstance()->SetCallBack(
      "CompactionJob::ProcessKeyValueCompaction()::Processing", [&](void* arg) {
        auto compaction = static_cast<Compaction*>(arg);
        if (compaction->output_level() == kProximalLevel) {
          is_size_ratio_compaction_running = true;
          TEST_SYNC_POINT(
              "PrecludeLastLevelTest::PeriodicCompactionToProximalLevel:"
              "SizeRatioCompaction1");
          TEST_SYNC_POINT(
              "PrecludeLastLevelTest::PeriodicCompactionToProximalLevel:"
              "SizeRatioCompaction2");
          is_size_ratio_compaction_running = false;
        }
      });
  SyncPoint::GetInstance()->SetCallBack(
      "UniversalCompactionBuilder::PickCompaction:Return", [&](void* arg) {
        auto compaction = static_cast<Compaction*>(arg);
        if (is_size_ratio_compaction_running) {
          if (EnablePrecludeLastLevel()) {
            ASSERT_TRUE(compaction == nullptr);
          } else {
            ASSERT_TRUE(compaction != nullptr);
            ASSERT_EQ(compaction->compaction_reason(),
                      CompactionReason::kPeriodicCompaction);
            ASSERT_EQ(compaction->start_level(), kNumLevels - 1);
          }
          verified_last_level_compaction = true;
        }
        TEST_SYNC_POINT(
            "PrecludeLastLevelTest::PeriodicCompactionToProximalLevel:"
            "AutoCompactionPicked");
      });
  SyncPoint::GetInstance()->LoadDependency({
      {"PrecludeLastLevelTest::PeriodicCompactionToProximalLevel:"
       "SizeRatioCompaction1",
       "PrecludeLastLevelTest::PeriodicCompactionToProximalLevel:DoneWrite"},
      {"PrecludeLastLevelTest::PeriodicCompactionToProximalLevel:"
       "AutoCompactionPicked",
       "PrecludeLastLevelTest::PeriodicCompactionToProximalLevel:"
       "SizeRatioCompaction2"},
  });
  SyncPoint::GetInstance()->EnableProcessing();
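  // The dependencies above enforce: (1) the flushes below finish while the
  // proximal-level compaction is paused, and (2) that compaction does not
  // resume until a periodic compaction pick has been attempted.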

  auto stop_token =
      dbfull()->TEST_write_controler().GetCompactionPressureToken();

  for (int i = 0; i < kNumTrigger - 1; i++) {
    for (int j = 0; j < kNumKeys; j++) {
      ASSERT_OK(Put(Key(i * (kNumKeys - 1) + j), rnd.RandomString(10)));
      dbfull()->TEST_WaitForPeriodicTaskRun(
          [&] { mock_clock_->MockSleepForSeconds(kKeyPerSec); });
    }
    ASSERT_OK(Flush());
  }

  TEST_SYNC_POINT(
      "PrecludeLastLevelTest::PeriodicCompactionToProximalLevel:DoneWrite");

  // wait for the periodic compaction time to elapse and flush to trigger the
  // periodic compaction, which should be blocked by the ongoing compaction in
  // the proximal level
  mock_clock_->MockSleepForSeconds(10000);
  for (int i = 0; i < 3 * kNumKeys; i++) {
    ASSERT_OK(Put(Key(i), rnd.RandomString(10)));
    dbfull()->TEST_WaitForPeriodicTaskRun(
        [&] { mock_clock_->MockSleepForSeconds(kKeyPerSec); });
  }
  ASSERT_OK(Flush());
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ASSERT_TRUE(verified_last_level_compaction);

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  stop_token.reset();
  Close();
}

INSTANTIATE_TEST_CASE_P(PrecludeLastLevelOptionalTest,
                        PrecludeLastLevelOptionalTest,
                        ::testing::Combine(::testing::Bool(),
                                           ::testing::Bool()));

// partition the SST into 3 ranges [0, 19] [20, 39] [40, ...]
class ThreeRangesPartitioner : public SstPartitioner {
 public:
  const char* Name() const override { return "SingleKeySstPartitioner"; }

  PartitionerResult ShouldPartition(
      const PartitionerRequest& request) override {
    if ((cmp->CompareWithoutTimestamp(*request.current_user_key,
                                      DBTestBase::Key(20)) >= 0 &&
         cmp->CompareWithoutTimestamp(*request.prev_user_key,
                                      DBTestBase::Key(20)) < 0) ||
        (cmp->CompareWithoutTimestamp(*request.current_user_key,
                                      DBTestBase::Key(40)) >= 0 &&
         cmp->CompareWithoutTimestamp(*request.prev_user_key,
                                      DBTestBase::Key(40)) < 0)) {
      return kRequired;
    } else {
      return kNotRequired;
    }
  }

  bool CanDoTrivialMove(const Slice& /*smallest_user_key*/,
                        const Slice& /*largest_user_key*/) override {
    return false;
  }

  const Comparator* cmp = BytewiseComparator();
};
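
// The partitioner cuts an output file whenever the keys cross Key(20) or
// Key(40); PartialProximalLevelCompaction below relies on it to produce
// exactly three L5 files covering [0,19], [20,39], and [40,...].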
class ThreeRangesPartitionerFactory : public SstPartitionerFactory {
 public:
  static const char* kClassName() {
    return "TombstoneTestSstPartitionerFactory";
  }
  const char* Name() const override { return kClassName(); }

  std::unique_ptr<SstPartitioner> CreatePartitioner(
      const SstPartitioner::Context& /* context */) const override {
    return std::unique_ptr<SstPartitioner>(new ThreeRangesPartitioner());
  }
};

TEST_P(PrecludeLastLevelTest, PartialProximalLevelCompaction) {
  const int kNumTrigger = 4;
  const int kNumLevels = 7;
  const int kKeyPerSec = 10;

  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.env = mock_env_.get();
  options.level0_file_num_compaction_trigger = kNumTrigger;
  options.preserve_internal_time_seconds = 10000;
  options.num_levels = kNumLevels;
  options.statistics = CreateDBStatistics();
  DestroyAndReopen(options);

  Random rnd(301);

  for (int i = 0; i < 300; i++) {
    ASSERT_OK(Put(Key(i), rnd.RandomString(100)));
    dbfull()->TEST_WaitForPeriodicTaskRun(
        [&] { mock_clock_->MockSleepForSeconds(kKeyPerSec); });
  }
  ASSERT_OK(Flush());

  CompactRangeOptions cro;
  cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));

  // make sure all data is compacted to the last level
  ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel());

  // Create 3 L5 files
  auto factory = std::make_shared<ThreeRangesPartitionerFactory>();
  options.sst_partitioner_factory = factory;
  Reopen(options);

  for (int i = 0; i < kNumTrigger - 1; i++) {
    for (int j = 0; j < 100; j++) {
      ASSERT_OK(Put(Key(i * 100 + j), rnd.RandomString(10)));
    }
    ASSERT_OK(Flush());
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  // L5: [0,19] [20,39] [40,299]
  // L6: [0, 299]
  ASSERT_EQ("0,0,0,0,0,3,1", FilesPerLevel());

  ASSERT_OK(options.statistics->Reset());

  // enable tiered storage feature
  ApplyConfigChange(&options, {{"preclude_last_level_data_seconds", "10000"},
                               {"last_level_temperature", "kCold"}});

  ColumnFamilyMetaData meta;
  db_->GetColumnFamilyMetaData(&meta);
  ASSERT_EQ(meta.levels[5].files.size(), 3);
  ASSERT_EQ(meta.levels[6].files.size(), 1);
  ASSERT_EQ(meta.levels[6].files[0].smallestkey, Key(0));
  ASSERT_EQ(meta.levels[6].files[0].largestkey, Key(299));

  std::string file_path = meta.levels[5].files[1].db_path;
  std::vector<std::string> files;
  // pick the 3rd file @L5 + the file @L6 for compaction
  files.push_back(file_path + "/" + meta.levels[5].files[2].name);
  files.push_back(file_path + "/" + meta.levels[6].files[0].name);
  ASSERT_OK(db_->CompactFiles(CompactionOptions(), files, 6));

  // The compaction moved only part of the hot data to the hot tier: range
  // [0,39] is unsafe to move up, otherwise it would overlap with the existing
  // files @L5.
  // The output should be:
  // L5: [0,19] [20,39] [40,299] <-- Temperature::kUnknown
  // L6: [0,19] [20,39]          <-- Temperature::kCold
  // The L6 output is split into two files because of the custom partitioner
  ASSERT_EQ("0,0,0,0,0,3,2", FilesPerLevel());

  // Even though all the data is hot, not all of it is moved to the hot tier
  ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
  ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);

  db_->GetColumnFamilyMetaData(&meta);
  ASSERT_EQ(meta.levels[5].files.size(), 3);
  ASSERT_EQ(meta.levels[6].files.size(), 2);
  for (const auto& file : meta.levels[5].files) {
    ASSERT_EQ(file.temperature, Temperature::kUnknown);
  }
  for (const auto& file : meta.levels[6].files) {
    ASSERT_EQ(file.temperature, Temperature::kCold);
  }
  ASSERT_EQ(meta.levels[6].files[0].smallestkey, Key(0));
  ASSERT_EQ(meta.levels[6].files[0].largestkey, Key(19));
  ASSERT_EQ(meta.levels[6].files[1].smallestkey, Key(20));
  ASSERT_EQ(meta.levels[6].files[1].largestkey, Key(39));

  Close();
}

TEST_P(PrecludeLastLevelTest, RangeDelsCauseFileEndpointsToOverlap) {
  const int kNumLevels = 7;
  const int kSecondsPerKey = 10;
  const int kNumFiles = 3;
  const int kValueBytes = 4 << 10;
  const int kFileBytes = 4 * kValueBytes;
  // `kNumKeysPerFile == 5` is determined by the current file cutting
  // heuristics for this choice of `kValueBytes` and `kFileBytes`.
  const int kNumKeysPerFile = 5;
  const int kNumKeys = kNumFiles * kNumKeysPerFile;

  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.env = mock_env_.get();
  options.last_level_temperature = Temperature::kCold;
  options.preserve_internal_time_seconds = 600;
  options.preclude_last_level_data_seconds = 1;
  options.num_levels = kNumLevels;
  options.target_file_size_base = kFileBytes;
  DestroyAndReopen(options);

  // Flush an L0 file with the following contents (new to old):
  //
  // Range deletions [4, 6) [7, 8) [9, 11)
  // --- snap2 ---
  // Key(0) .. Key(14)
  // --- snap1 ---
  // Key(3) .. Key(17)
  const auto verify_db = [&]() {
    for (int i = 0; i < kNumKeys; i++) {
      std::string value;
      auto s = db_->Get(ReadOptions(), Key(i), &value);
      if (i == 4 || i == 5 || i == 7 || i == 9 || i == 10) {
        ASSERT_TRUE(s.IsNotFound());
      } else {
        ASSERT_OK(s);
      }
    }
  };
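  // verify_db() expects exactly the keys covered by the range deletions above
  // (4, 5, 7, 9, 10) to be absent, regardless of which levels the data has
  // been compacted to.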

  Random rnd(301);
  for (int i = 0; i < kNumKeys; i++) {
    ASSERT_OK(Put(Key(i + 3), rnd.RandomString(kValueBytes)));
    dbfull()->TEST_WaitForPeriodicTaskRun(
        [&] { mock_clock_->MockSleepForSeconds(kSecondsPerKey); });
  }
  auto* snap1 = db_->GetSnapshot();
  for (int i = 0; i < kNumKeys; i++) {
    ASSERT_OK(Put(Key(i), rnd.RandomString(kValueBytes)));
    dbfull()->TEST_WaitForPeriodicTaskRun(
        [&] { mock_clock_->MockSleepForSeconds(kSecondsPerKey); });
  }
  auto* snap2 = db_->GetSnapshot();
  ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
                             Key(kNumKeysPerFile - 1),
                             Key(kNumKeysPerFile + 1)));
  ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
                             Key(kNumKeysPerFile + 2),
                             Key(kNumKeysPerFile + 3)));
  ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
                             Key(2 * kNumKeysPerFile - 1),
                             Key(2 * kNumKeysPerFile + 1)));
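  // With kNumKeysPerFile == 5, the three DeleteRange calls above cover
  // [4, 6), [7, 8), and [9, 11), matching the diagram at the top of the test.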
  ASSERT_OK(Flush());

  dbfull()->TEST_WaitForPeriodicTaskRun(
      [&] { mock_clock_->MockSleepForSeconds(kSecondsPerKey); });
  verify_db();

  // Count compactions supporting per-key placement
  std::atomic_int per_key_comp_num = 0;
  SyncPoint::GetInstance()->SetCallBack(
      "UniversalCompactionBuilder::PickCompaction:Return", [&](void* arg) {
        auto compaction = static_cast<Compaction*>(arg);
        if (compaction->SupportsPerKeyPlacement()) {
          ASSERT_EQ(compaction->GetProximalOutputRangeType(),
                    Compaction::ProximalOutputRangeType::kNonLastRange);
          per_key_comp_num++;
        }
      });
  SyncPoint::GetInstance()->EnableProcessing();

  // The `CompactRange()` writes the following files to L5.
  //
  // [key000000#16,kTypeValue,
  //  key000005#kMaxSequenceNumber,kTypeRangeDeletion]
  // [key000005#21,kTypeValue,
  //  key000010#kMaxSequenceNumber,kTypeRangeDeletion]
  // [key000010#26,kTypeValue, key000014#30,kTypeValue]
  //
  // And it writes the following files to L6.
  //
  // [key000003#1,kTypeValue, key000007#5,kTypeValue]
  // [key000008#6,kTypeValue, key000012#10,kTypeValue]
  // [key000013#11,kTypeValue, key000017#15,kTypeValue]
  CompactRangeOptions cro;
  cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
  ASSERT_EQ("0,0,0,0,0,3,3", FilesPerLevel());
  verify_db();

  // Rewrite the middle file only. File endpoints should not change.
  std::string begin_key_buf = Key(kNumKeysPerFile + 1),
              end_key_buf = Key(kNumKeysPerFile + 2);
  Slice begin_key(begin_key_buf), end_key(end_key_buf);
  ASSERT_OK(db_->SuggestCompactRange(db_->DefaultColumnFamily(), &begin_key,
                                     &end_key));
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ASSERT_EQ("0,0,0,0,0,3,3", FilesPerLevel());
  ASSERT_EQ(1, per_key_comp_num);
  verify_db();

  // Rewrite the middle file again after releasing snap2. Still, file
  // endpoints should not change.
  db_->ReleaseSnapshot(snap2);
  ASSERT_OK(db_->SuggestCompactRange(db_->DefaultColumnFamily(), &begin_key,
                                     &end_key));
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ASSERT_EQ("0,0,0,0,0,3,3", FilesPerLevel());
  ASSERT_EQ(2, per_key_comp_num);
  verify_db();

  // Rewrite the middle file once more after releasing snap1. This time the
  // data in the middle L5 file can all be compacted to the last level.
  db_->ReleaseSnapshot(snap1);
  ASSERT_OK(db_->SuggestCompactRange(db_->DefaultColumnFamily(), &begin_key,
                                     &end_key));
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ASSERT_EQ("0,0,0,0,0,2,3", FilesPerLevel());
  ASSERT_EQ(3, per_key_comp_num);
  verify_db();

  // Finish off the proximal level.
  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
  ASSERT_EQ("0,0,0,0,0,0,3", FilesPerLevel());
  verify_db();

  Close();
}

// Tests that DBIter::GetProperty("rocksdb.iterator.write-time") returns an
// entry's approximate write unix time.
class IteratorWriteTimeTest
    : public PrecludeLastLevelTestBase,
      public testing::WithParamInterface<std::tuple<bool, bool>> {
 public:
  IteratorWriteTimeTest()
      : PrecludeLastLevelTestBase("iterator_write_time_test") {}

  bool UseTailingIterator() const { return std::get<0>(GetParam()); }
  bool UseDynamicConfig() const { return std::get<1>(GetParam()); }

  void ApplyConfigChange(
      Options* options,
      const std::unordered_map<std::string, std::string>& config_change,
      const std::unordered_map<std::string, std::string>& db_config_change =
          {}) {
    ApplyConfigChangeImpl(UseDynamicConfig(), options, config_change,
                          db_config_change);
  }

  uint64_t VerifyKeyAndGetWriteTime(Iterator* iter,
                                    const std::string& expected_key) {
    std::string prop;
    uint64_t write_time = 0;
    EXPECT_TRUE(iter->Valid());
    EXPECT_EQ(expected_key, iter->key());
    EXPECT_OK(iter->GetProperty("rocksdb.iterator.write-time", &prop));
    Slice prop_slice = prop;
    EXPECT_TRUE(GetFixed64(&prop_slice, &write_time));
    return write_time;
  }

  void VerifyKeyAndWriteTime(Iterator* iter, const std::string& expected_key,
                             uint64_t expected_write_time) {
    std::string prop;
    uint64_t write_time = 0;
    EXPECT_TRUE(iter->Valid());
    EXPECT_EQ(expected_key, iter->key());
    EXPECT_OK(iter->GetProperty("rocksdb.iterator.write-time", &prop));
    Slice prop_slice = prop;
    EXPECT_TRUE(GetFixed64(&prop_slice, &write_time));
    EXPECT_EQ(expected_write_time, write_time);
  }
};

TEST_P(IteratorWriteTimeTest, ReadFromMemtables) {
  const int kNumTrigger = 4;
  const int kNumLevels = 7;
  const int kNumKeys = 100;
  const int kSecondsPerRecording = 101;
  const int kKeyWithWriteTime = 25;
  const uint64_t kUserSpecifiedWriteTime =
      kMockStartTime + kSecondsPerRecording * 15;

  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.env = mock_env_.get();
  options.level0_file_num_compaction_trigger = kNumTrigger;
  options.num_levels = kNumLevels;
  DestroyAndReopen(options);

  // Bump the sequence number, since there are issues with tracking seqno 0
  ASSERT_OK(Delete("something_to_bump_seqno"));

  ApplyConfigChange(&options, {{"preserve_internal_time_seconds", "10000"}});

  Random rnd(301);
  for (int i = 0; i < kNumKeys; i++) {
    dbfull()->TEST_WaitForPeriodicTaskRun(
        [&] { mock_clock_->MockSleepForSeconds(kSecondsPerRecording); });
    if (i == kKeyWithWriteTime) {
      ASSERT_OK(
          TimedPut(Key(i), rnd.RandomString(100), kUserSpecifiedWriteTime));
    } else {
      ASSERT_OK(Put(Key(i), rnd.RandomString(100)));
    }
  }

  ReadOptions ropts;
  ropts.tailing = UseTailingIterator();
  int i;

  // Forward iteration
  uint64_t start_time = 0;
  {
    std::unique_ptr<Iterator> iter(dbfull()->NewIterator(ropts));
    for (iter->SeekToFirst(), i = 0; iter->Valid(); iter->Next(), i++) {
      if (start_time == 0) {
        start_time = VerifyKeyAndGetWriteTime(iter.get(), Key(i));
      } else if (i == kKeyWithWriteTime) {
        VerifyKeyAndWriteTime(iter.get(), Key(i), kUserSpecifiedWriteTime);
      } else {
        VerifyKeyAndWriteTime(iter.get(), Key(i),
                              start_time + kSecondsPerRecording * (i + 1));
      }
    }
    ASSERT_EQ(kNumKeys, i);
    ASSERT_OK(iter->status());
  }

  // Backward iteration
  {
    ropts.tailing = false;
    std::unique_ptr<Iterator> iter(dbfull()->NewIterator(ropts));
    for (iter->SeekToLast(), i = kNumKeys - 1; iter->Valid();
         iter->Prev(), i--) {
      if (i == 0) {
        VerifyKeyAndWriteTime(iter.get(), Key(i), start_time);
      } else if (i == kKeyWithWriteTime) {
        VerifyKeyAndWriteTime(iter.get(), Key(i), kUserSpecifiedWriteTime);
      } else {
        VerifyKeyAndWriteTime(iter.get(), Key(i),
                              start_time + kSecondsPerRecording * (i + 1));
      }
    }
    ASSERT_OK(iter->status());
    ASSERT_EQ(-1, i);
  }

  // Disable the seqno to time recording. Data with a user-specified write
  // time can still get a write time before it's flushed.
  ApplyConfigChange(&options, {{"preserve_internal_time_seconds", "0"}});

  ASSERT_OK(TimedPut(Key(kKeyWithWriteTime), rnd.RandomString(100),
                     kUserSpecifiedWriteTime));
  {
    std::unique_ptr<Iterator> iter(dbfull()->NewIterator(ropts));
    iter->Seek(Key(kKeyWithWriteTime));
    VerifyKeyAndWriteTime(iter.get(), Key(kKeyWithWriteTime),
                          kUserSpecifiedWriteTime);
    ASSERT_OK(iter->status());
  }
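  // Once flushed without an active seqno-to-time mapping, the write time
  // becomes unknown; the property reports the uint64 max sentinel.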
  ASSERT_OK(Flush());
  {
    std::unique_ptr<Iterator> iter(dbfull()->NewIterator(ropts));
    iter->Seek(Key(kKeyWithWriteTime));
    VerifyKeyAndWriteTime(iter.get(), Key(kKeyWithWriteTime),
                          std::numeric_limits<uint64_t>::max());
    ASSERT_OK(iter->status());
  }
  Close();
}

TEST_P(IteratorWriteTimeTest, ReadFromSstFile) {
  const int kNumTrigger = 4;
  const int kNumLevels = 7;
  const int kNumKeys = 100;
  const int kSecondsPerRecording = 101;
  const int kKeyWithWriteTime = 25;
  const uint64_t kUserSpecifiedWriteTime =
      kMockStartTime + kSecondsPerRecording * 15;

  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.env = mock_env_.get();
  options.level0_file_num_compaction_trigger = kNumTrigger;
  options.num_levels = kNumLevels;
  DestroyAndReopen(options);

  // Bump the sequence number, since there are issues with tracking seqno 0
  ASSERT_OK(Delete("something_to_bump_seqno"));

  ApplyConfigChange(&options, {{"preserve_internal_time_seconds", "10000"}});

  Random rnd(301);
  for (int i = 0; i < kNumKeys; i++) {
    dbfull()->TEST_WaitForPeriodicTaskRun(
        [&] { mock_clock_->MockSleepForSeconds(kSecondsPerRecording); });
    if (i == kKeyWithWriteTime) {
      ASSERT_OK(
          TimedPut(Key(i), rnd.RandomString(100), kUserSpecifiedWriteTime));
    } else {
      ASSERT_OK(Put(Key(i), rnd.RandomString(100)));
    }
  }

  ASSERT_OK(Flush());
  ReadOptions ropts;
  ropts.tailing = UseTailingIterator();
  std::string prop;
  int i;

  // Forward iteration
  uint64_t start_time = 0;
  {
    std::unique_ptr<Iterator> iter(dbfull()->NewIterator(ropts));
    for (iter->SeekToFirst(), i = 0; iter->Valid(); iter->Next(), i++) {
      if (start_time == 0) {
        start_time = VerifyKeyAndGetWriteTime(iter.get(), Key(i));
      } else if (i == kKeyWithWriteTime) {
        // It's not precisely kUserSpecifiedWriteTime; there is a margin of
        // error of one recording interval because the write time is converted
        // to a sequence number and then back to a write time.
        VerifyKeyAndWriteTime(iter.get(), Key(i),
                              kUserSpecifiedWriteTime - kSecondsPerRecording);
      } else {
        VerifyKeyAndWriteTime(iter.get(), Key(i),
                              start_time + kSecondsPerRecording * (i + 1));
      }
    }
    ASSERT_OK(iter->status());
    ASSERT_EQ(kNumKeys, i);
  }

  // Backward iteration
  {
    ropts.tailing = false;
    std::unique_ptr<Iterator> iter(dbfull()->NewIterator(ropts));
    for (iter->SeekToLast(), i = kNumKeys - 1; iter->Valid();
         iter->Prev(), i--) {
      if (i == 0) {
        VerifyKeyAndWriteTime(iter.get(), Key(i), start_time);
      } else if (i == kKeyWithWriteTime) {
        VerifyKeyAndWriteTime(iter.get(), Key(i),
                              kUserSpecifiedWriteTime - kSecondsPerRecording);
      } else {
        VerifyKeyAndWriteTime(iter.get(), Key(i),
                              start_time + kSecondsPerRecording * (i + 1));
      }
    }
    ASSERT_OK(iter->status());
    ASSERT_EQ(-1, i);
  }

  // Disable the seqno to time recording. Data retrieved from SST files still
  // has its write time available.
  ApplyConfigChange(&options, {{"preserve_internal_time_seconds", "0"}});

  dbfull()->TEST_WaitForPeriodicTaskRun(
      [&] { mock_clock_->MockSleepForSeconds(kSecondsPerRecording); });
  ASSERT_OK(Put("a", "val"));
  ASSERT_TRUE(dbfull()->TEST_GetSeqnoToTimeMapping().Empty());

  {
    std::unique_ptr<Iterator> iter(dbfull()->NewIterator(ropts));
    iter->SeekToFirst();
    ASSERT_TRUE(iter->Valid());
    // "a" is retrieved from the memtable; its write time is unknown because
    // the seqno to time mapping recording is not available.
    VerifyKeyAndWriteTime(iter.get(), "a",
                          std::numeric_limits<uint64_t>::max());
    for (iter->Next(), i = 0; iter->Valid(); iter->Next(), i++) {
      if (i == 0) {
        VerifyKeyAndWriteTime(iter.get(), Key(i), start_time);
      } else if (i == kKeyWithWriteTime) {
        VerifyKeyAndWriteTime(iter.get(), Key(i),
                              kUserSpecifiedWriteTime - kSecondsPerRecording);
      } else {
        VerifyKeyAndWriteTime(iter.get(), Key(i),
                              start_time + kSecondsPerRecording * (i + 1));
      }
    }
    ASSERT_EQ(kNumKeys, i);
    ASSERT_OK(iter->status());
  }

  // There is no write time info for "a" after it's flushed to an SST file
  // either.
  ASSERT_OK(Flush());
  {
    std::unique_ptr<Iterator> iter(dbfull()->NewIterator(ropts));
    iter->SeekToFirst();
    ASSERT_TRUE(iter->Valid());
    VerifyKeyAndWriteTime(iter.get(), "a",
                          std::numeric_limits<uint64_t>::max());
  }

  // Sequence numbers are zeroed out after compaction to the last level, so
  // all write times become zero.
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  {
    std::unique_ptr<Iterator> iter(dbfull()->NewIterator(ropts));
    iter->SeekToFirst();
    for (iter->Next(), i = 0; iter->Valid(); iter->Next(), i++) {
      VerifyKeyAndWriteTime(iter.get(), Key(i), 0);
    }
    ASSERT_OK(iter->status());
    ASSERT_EQ(kNumKeys, i);
  }
  Close();
}

TEST_P(IteratorWriteTimeTest, MergeReturnsBaseValueWriteTime) {
  const int kNumTrigger = 4;
  const int kNumLevels = 7;
  const int kSecondsPerRecording = 101;

  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.env = mock_env_.get();
  options.level0_file_num_compaction_trigger = kNumTrigger;
  options.num_levels = kNumLevels;
  options.merge_operator = MergeOperators::CreateStringAppendOperator();
  DestroyAndReopen(options);

  ApplyConfigChange(&options, {{"preserve_internal_time_seconds", "10000"}});

  dbfull()->TEST_WaitForPeriodicTaskRun(
      [&] { mock_clock_->MockSleepForSeconds(kSecondsPerRecording); });
  ASSERT_OK(Put("foo", "fv1"));

  dbfull()->TEST_WaitForPeriodicTaskRun(
      [&] { mock_clock_->MockSleepForSeconds(kSecondsPerRecording); });
  ASSERT_OK(Put("bar", "bv1"));
  ASSERT_OK(Merge("foo", "bv1"));
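  // "foo" now has a base value ("fv1") plus one merge operand; the iterator
  // is expected to report the write time of the base value, which predates
  // the write of "bar".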

  ReadOptions ropts;
  ropts.tailing = UseTailingIterator();
  {
    std::unique_ptr<Iterator> iter(dbfull()->NewIterator(ropts));
    iter->SeekToFirst();
    uint64_t bar_time = VerifyKeyAndGetWriteTime(iter.get(), "bar");
    iter->Next();
    uint64_t foo_time = VerifyKeyAndGetWriteTime(iter.get(), "foo");
    // "foo" has an older write time because its base value's write time is
    // used
    ASSERT_GT(bar_time, foo_time);
    iter->Next();
    ASSERT_FALSE(iter->Valid());
    ASSERT_OK(iter->status());
  }
  Close();
}

INSTANTIATE_TEST_CASE_P(IteratorWriteTimeTest, IteratorWriteTimeTest,
                        testing::Combine(testing::Bool(), testing::Bool()));

}  // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}