external_sst_file_test.cc 101 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #ifndef ROCKSDB_LITE
  #include <functional>
  #include <utility>

  #include "db/db_test_util.h"
  #include "file/filename.h"
  #include "port/port.h"
  #include "port/stack_trace.h"
  #include "rocksdb/sst_file_writer.h"
  #include "test_util/fault_injection_test_env.h"
  #include "test_util/testutil.h"
  14. namespace ROCKSDB_NAMESPACE {
  15. // A test environment that can be configured to fail the Link operation.
  16. class ExternalSSTTestEnv : public EnvWrapper {
  17. public:
  18. ExternalSSTTestEnv(Env* t, bool fail_link)
  19. : EnvWrapper(t), fail_link_(fail_link) {}
  20. Status LinkFile(const std::string& s, const std::string& t) override {
  21. if (fail_link_) {
  22. return Status::NotSupported("Link failed");
  23. }
  24. return target()->LinkFile(s, t);
  25. }
  26. void set_fail_link(bool fail_link) { fail_link_ = fail_link; }
  27. private:
  28. bool fail_link_;
  29. };
  30. class ExternSSTFileLinkFailFallbackTest
  31. : public DBTestBase,
  32. public ::testing::WithParamInterface<std::tuple<bool, bool>> {
  33. public:
  34. ExternSSTFileLinkFailFallbackTest()
  35. : DBTestBase("/external_sst_file_test"),
  36. test_env_(new ExternalSSTTestEnv(env_, true)) {
  37. sst_files_dir_ = dbname_ + "/sst_files/";
  38. test::DestroyDir(env_, sst_files_dir_);
  39. env_->CreateDir(sst_files_dir_);
  40. options_ = CurrentOptions();
  41. options_.disable_auto_compactions = true;
  42. options_.env = test_env_;
  43. }
  44. void TearDown() override {
  45. delete db_;
  46. db_ = nullptr;
  47. ASSERT_OK(DestroyDB(dbname_, options_));
  48. delete test_env_;
  49. test_env_ = nullptr;
  50. }
  51. protected:
  52. std::string sst_files_dir_;
  53. Options options_;
  54. ExternalSSTTestEnv* test_env_;
  55. };
  56. class ExternalSSTFileTest
  57. : public DBTestBase,
  58. public ::testing::WithParamInterface<std::tuple<bool, bool>> {
  59. public:
  60. ExternalSSTFileTest() : DBTestBase("/external_sst_file_test") {
  61. sst_files_dir_ = dbname_ + "/sst_files/";
  62. DestroyAndRecreateExternalSSTFilesDir();
  63. }
  64. void DestroyAndRecreateExternalSSTFilesDir() {
  65. test::DestroyDir(env_, sst_files_dir_);
  66. env_->CreateDir(sst_files_dir_);
  67. }
  68. Status GenerateOneExternalFile(
  69. const Options& options, ColumnFamilyHandle* cfh,
  70. std::vector<std::pair<std::string, std::string>>& data, int file_id,
  71. bool sort_data, std::string* external_file_path,
  72. std::map<std::string, std::string>* true_data) {
  73. // Generate a file id if not provided
  74. if (-1 == file_id) {
  75. file_id = (++last_file_id_);
  76. }
  77. // Sort data if asked to do so
  78. if (sort_data) {
  79. std::sort(data.begin(), data.end(),
  80. [&](const std::pair<std::string, std::string>& e1,
  81. const std::pair<std::string, std::string>& e2) {
  82. return options.comparator->Compare(e1.first, e2.first) < 0;
  83. });
  84. auto uniq_iter = std::unique(
  85. data.begin(), data.end(),
  86. [&](const std::pair<std::string, std::string>& e1,
  87. const std::pair<std::string, std::string>& e2) {
  88. return options.comparator->Compare(e1.first, e2.first) == 0;
  89. });
  90. data.resize(uniq_iter - data.begin());
  91. }
  92. std::string file_path = sst_files_dir_ + ToString(file_id);
  93. SstFileWriter sst_file_writer(EnvOptions(), options, cfh);
  94. Status s = sst_file_writer.Open(file_path);
  95. if (!s.ok()) {
  96. return s;
  97. }
  98. for (const auto& entry : data) {
  99. s = sst_file_writer.Put(entry.first, entry.second);
  100. if (!s.ok()) {
  101. sst_file_writer.Finish();
  102. return s;
  103. }
  104. }
  105. s = sst_file_writer.Finish();
  106. if (s.ok() && external_file_path != nullptr) {
  107. *external_file_path = file_path;
  108. }
  109. if (s.ok() && nullptr != true_data) {
  110. for (const auto& entry : data) {
  111. true_data->insert({entry.first, entry.second});
  112. }
  113. }
  114. return s;
  115. }
  116. Status GenerateAndAddExternalFile(
  117. const Options options,
  118. std::vector<std::pair<std::string, std::string>> data, int file_id = -1,
  119. bool allow_global_seqno = false, bool write_global_seqno = false,
  120. bool verify_checksums_before_ingest = true, bool ingest_behind = false,
  121. bool sort_data = false,
  122. std::map<std::string, std::string>* true_data = nullptr,
  123. ColumnFamilyHandle* cfh = nullptr) {
  124. // Generate a file id if not provided
  125. if (file_id == -1) {
  126. file_id = last_file_id_ + 1;
  127. last_file_id_++;
  128. }
  129. // Sort data if asked to do so
  130. if (sort_data) {
  131. std::sort(data.begin(), data.end(),
  132. [&](const std::pair<std::string, std::string>& e1,
  133. const std::pair<std::string, std::string>& e2) {
  134. return options.comparator->Compare(e1.first, e2.first) < 0;
  135. });
  136. auto uniq_iter = std::unique(
  137. data.begin(), data.end(),
  138. [&](const std::pair<std::string, std::string>& e1,
  139. const std::pair<std::string, std::string>& e2) {
  140. return options.comparator->Compare(e1.first, e2.first) == 0;
  141. });
  142. data.resize(uniq_iter - data.begin());
  143. }
  144. std::string file_path = sst_files_dir_ + ToString(file_id);
  145. SstFileWriter sst_file_writer(EnvOptions(), options, cfh);
  146. Status s = sst_file_writer.Open(file_path);
  147. if (!s.ok()) {
  148. return s;
  149. }
  150. for (auto& entry : data) {
  151. s = sst_file_writer.Put(entry.first, entry.second);
  152. if (!s.ok()) {
  153. sst_file_writer.Finish();
  154. return s;
  155. }
  156. }
  157. s = sst_file_writer.Finish();
  158. if (s.ok()) {
  159. IngestExternalFileOptions ifo;
  160. ifo.allow_global_seqno = allow_global_seqno;
  161. ifo.write_global_seqno = allow_global_seqno ? write_global_seqno : false;
  162. ifo.verify_checksums_before_ingest = verify_checksums_before_ingest;
  163. ifo.ingest_behind = ingest_behind;
  164. if (cfh) {
  165. s = db_->IngestExternalFile(cfh, {file_path}, ifo);
  166. } else {
  167. s = db_->IngestExternalFile({file_path}, ifo);
  168. }
  169. }
  170. if (s.ok() && true_data) {
  171. for (auto& entry : data) {
  172. (*true_data)[entry.first] = entry.second;
  173. }
  174. }
  175. return s;
  176. }
  177. Status GenerateAndAddExternalFiles(
  178. const Options& options,
  179. const std::vector<ColumnFamilyHandle*>& column_families,
  180. const std::vector<IngestExternalFileOptions>& ifos,
  181. std::vector<std::vector<std::pair<std::string, std::string>>>& data,
  182. int file_id, bool sort_data,
  183. std::vector<std::map<std::string, std::string>>& true_data) {
  184. if (-1 == file_id) {
  185. file_id = (++last_file_id_);
  186. }
  187. // Generate external SST files, one for each column family
  188. size_t num_cfs = column_families.size();
  189. assert(ifos.size() == num_cfs);
  190. assert(data.size() == num_cfs);
  191. Status s;
  192. std::vector<IngestExternalFileArg> args(num_cfs);
  193. for (size_t i = 0; i != num_cfs; ++i) {
  194. std::string external_file_path;
  195. s = GenerateOneExternalFile(
  196. options, column_families[i], data[i], file_id, sort_data,
  197. &external_file_path,
  198. true_data.size() == num_cfs ? &true_data[i] : nullptr);
  199. if (!s.ok()) {
  200. return s;
  201. }
  202. ++file_id;
  203. args[i].column_family = column_families[i];
  204. args[i].external_files.push_back(external_file_path);
  205. args[i].options = ifos[i];
  206. }
  207. s = db_->IngestExternalFiles(args);
  208. return s;
  209. }
  210. Status GenerateAndAddExternalFile(
  211. const Options options, std::vector<std::pair<int, std::string>> data,
  212. int file_id = -1, bool allow_global_seqno = false,
  213. bool write_global_seqno = false,
  214. bool verify_checksums_before_ingest = true, bool ingest_behind = false,
  215. bool sort_data = false,
  216. std::map<std::string, std::string>* true_data = nullptr,
  217. ColumnFamilyHandle* cfh = nullptr) {
  218. std::vector<std::pair<std::string, std::string>> file_data;
  219. for (auto& entry : data) {
  220. file_data.emplace_back(Key(entry.first), entry.second);
  221. }
  222. return GenerateAndAddExternalFile(options, file_data, file_id,
  223. allow_global_seqno, write_global_seqno,
  224. verify_checksums_before_ingest,
  225. ingest_behind, sort_data, true_data, cfh);
  226. }
  227. Status GenerateAndAddExternalFile(
  228. const Options options, std::vector<int> keys, int file_id = -1,
  229. bool allow_global_seqno = false, bool write_global_seqno = false,
  230. bool verify_checksums_before_ingest = true, bool ingest_behind = false,
  231. bool sort_data = false,
  232. std::map<std::string, std::string>* true_data = nullptr,
  233. ColumnFamilyHandle* cfh = nullptr) {
  234. std::vector<std::pair<std::string, std::string>> file_data;
  235. for (auto& k : keys) {
  236. file_data.emplace_back(Key(k), Key(k) + ToString(file_id));
  237. }
  238. return GenerateAndAddExternalFile(options, file_data, file_id,
  239. allow_global_seqno, write_global_seqno,
  240. verify_checksums_before_ingest,
  241. ingest_behind, sort_data, true_data, cfh);
  242. }
  243. Status DeprecatedAddFile(const std::vector<std::string>& files,
  244. bool move_files = false,
  245. bool skip_snapshot_check = false,
  246. bool skip_write_global_seqno = false) {
  247. IngestExternalFileOptions opts;
  248. opts.move_files = move_files;
  249. opts.snapshot_consistency = !skip_snapshot_check;
  250. opts.allow_global_seqno = false;
  251. opts.allow_blocking_flush = false;
  252. opts.write_global_seqno = !skip_write_global_seqno;
  253. return db_->IngestExternalFile(files, opts);
  254. }
  255. ~ExternalSSTFileTest() override { test::DestroyDir(env_, sst_files_dir_); }
  256. protected:
  257. int last_file_id_ = 0;
  258. std::string sst_files_dir_;
  259. };
  260. TEST_F(ExternalSSTFileTest, Basic) {
  261. do {
  262. Options options = CurrentOptions();
  263. SstFileWriter sst_file_writer(EnvOptions(), options);
  264. // Current file size should be 0 after sst_file_writer init and before open a file.
  265. ASSERT_EQ(sst_file_writer.FileSize(), 0);
  266. // file1.sst (0 => 99)
  267. std::string file1 = sst_files_dir_ + "file1.sst";
  268. ASSERT_OK(sst_file_writer.Open(file1));
  269. for (int k = 0; k < 100; k++) {
  270. ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val"));
  271. }
  272. ExternalSstFileInfo file1_info;
  273. Status s = sst_file_writer.Finish(&file1_info);
  274. ASSERT_TRUE(s.ok()) << s.ToString();
  275. // Current file size should be non-zero after success write.
  276. ASSERT_GT(sst_file_writer.FileSize(), 0);
  277. ASSERT_EQ(file1_info.file_path, file1);
  278. ASSERT_EQ(file1_info.num_entries, 100);
  279. ASSERT_EQ(file1_info.smallest_key, Key(0));
  280. ASSERT_EQ(file1_info.largest_key, Key(99));
  281. ASSERT_EQ(file1_info.num_range_del_entries, 0);
  282. ASSERT_EQ(file1_info.smallest_range_del_key, "");
  283. ASSERT_EQ(file1_info.largest_range_del_key, "");
  284. // sst_file_writer already finished, cannot add this value
  285. s = sst_file_writer.Put(Key(100), "bad_val");
  286. ASSERT_FALSE(s.ok()) << s.ToString();
  287. // file2.sst (100 => 199)
  288. std::string file2 = sst_files_dir_ + "file2.sst";
  289. ASSERT_OK(sst_file_writer.Open(file2));
  290. for (int k = 100; k < 200; k++) {
  291. ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val"));
  292. }
  293. // Cannot add this key because it's not after last added key
  294. s = sst_file_writer.Put(Key(99), "bad_val");
  295. ASSERT_FALSE(s.ok()) << s.ToString();
  296. ExternalSstFileInfo file2_info;
  297. s = sst_file_writer.Finish(&file2_info);
  298. ASSERT_TRUE(s.ok()) << s.ToString();
  299. ASSERT_EQ(file2_info.file_path, file2);
  300. ASSERT_EQ(file2_info.num_entries, 100);
  301. ASSERT_EQ(file2_info.smallest_key, Key(100));
  302. ASSERT_EQ(file2_info.largest_key, Key(199));
  303. // file3.sst (195 => 299)
  304. // This file values overlap with file2 values
  305. std::string file3 = sst_files_dir_ + "file3.sst";
  306. ASSERT_OK(sst_file_writer.Open(file3));
  307. for (int k = 195; k < 300; k++) {
  308. ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val_overlap"));
  309. }
  310. ExternalSstFileInfo file3_info;
  311. s = sst_file_writer.Finish(&file3_info);
  312. ASSERT_TRUE(s.ok()) << s.ToString();
  313. // Current file size should be non-zero after success finish.
  314. ASSERT_GT(sst_file_writer.FileSize(), 0);
  315. ASSERT_EQ(file3_info.file_path, file3);
  316. ASSERT_EQ(file3_info.num_entries, 105);
  317. ASSERT_EQ(file3_info.smallest_key, Key(195));
  318. ASSERT_EQ(file3_info.largest_key, Key(299));
  319. // file4.sst (30 => 39)
  320. // This file values overlap with file1 values
  321. std::string file4 = sst_files_dir_ + "file4.sst";
  322. ASSERT_OK(sst_file_writer.Open(file4));
  323. for (int k = 30; k < 40; k++) {
  324. ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val_overlap"));
  325. }
  326. ExternalSstFileInfo file4_info;
  327. s = sst_file_writer.Finish(&file4_info);
  328. ASSERT_TRUE(s.ok()) << s.ToString();
  329. ASSERT_EQ(file4_info.file_path, file4);
  330. ASSERT_EQ(file4_info.num_entries, 10);
  331. ASSERT_EQ(file4_info.smallest_key, Key(30));
  332. ASSERT_EQ(file4_info.largest_key, Key(39));
  333. // file5.sst (400 => 499)
  334. std::string file5 = sst_files_dir_ + "file5.sst";
  335. ASSERT_OK(sst_file_writer.Open(file5));
  336. for (int k = 400; k < 500; k++) {
  337. ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val"));
  338. }
  339. ExternalSstFileInfo file5_info;
  340. s = sst_file_writer.Finish(&file5_info);
  341. ASSERT_TRUE(s.ok()) << s.ToString();
  342. ASSERT_EQ(file5_info.file_path, file5);
  343. ASSERT_EQ(file5_info.num_entries, 100);
  344. ASSERT_EQ(file5_info.smallest_key, Key(400));
  345. ASSERT_EQ(file5_info.largest_key, Key(499));
  346. // file6.sst (delete 400 => 500)
  347. std::string file6 = sst_files_dir_ + "file6.sst";
  348. ASSERT_OK(sst_file_writer.Open(file6));
  349. sst_file_writer.DeleteRange(Key(400), Key(500));
  350. ExternalSstFileInfo file6_info;
  351. s = sst_file_writer.Finish(&file6_info);
  352. ASSERT_TRUE(s.ok()) << s.ToString();
  353. ASSERT_EQ(file6_info.file_path, file6);
  354. ASSERT_EQ(file6_info.num_entries, 0);
  355. ASSERT_EQ(file6_info.smallest_key, "");
  356. ASSERT_EQ(file6_info.largest_key, "");
  357. ASSERT_EQ(file6_info.num_range_del_entries, 1);
  358. ASSERT_EQ(file6_info.smallest_range_del_key, Key(400));
  359. ASSERT_EQ(file6_info.largest_range_del_key, Key(500));
  360. // file7.sst (delete 500 => 570, put 520 => 599 divisible by 2)
  361. std::string file7 = sst_files_dir_ + "file7.sst";
  362. ASSERT_OK(sst_file_writer.Open(file7));
  363. sst_file_writer.DeleteRange(Key(500), Key(550));
  364. for (int k = 520; k < 560; k += 2) {
  365. ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val"));
  366. }
  367. sst_file_writer.DeleteRange(Key(525), Key(575));
  368. for (int k = 560; k < 600; k += 2) {
  369. ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val"));
  370. }
  371. ExternalSstFileInfo file7_info;
  372. s = sst_file_writer.Finish(&file7_info);
  373. ASSERT_TRUE(s.ok()) << s.ToString();
  374. ASSERT_EQ(file7_info.file_path, file7);
  375. ASSERT_EQ(file7_info.num_entries, 40);
  376. ASSERT_EQ(file7_info.smallest_key, Key(520));
  377. ASSERT_EQ(file7_info.largest_key, Key(598));
  378. ASSERT_EQ(file7_info.num_range_del_entries, 2);
  379. ASSERT_EQ(file7_info.smallest_range_del_key, Key(500));
  380. ASSERT_EQ(file7_info.largest_range_del_key, Key(575));
  381. // file8.sst (delete 600 => 700)
  382. std::string file8 = sst_files_dir_ + "file8.sst";
  383. ASSERT_OK(sst_file_writer.Open(file8));
  384. sst_file_writer.DeleteRange(Key(600), Key(700));
  385. ExternalSstFileInfo file8_info;
  386. s = sst_file_writer.Finish(&file8_info);
  387. ASSERT_TRUE(s.ok()) << s.ToString();
  388. ASSERT_EQ(file8_info.file_path, file8);
  389. ASSERT_EQ(file8_info.num_entries, 0);
  390. ASSERT_EQ(file8_info.smallest_key, "");
  391. ASSERT_EQ(file8_info.largest_key, "");
  392. ASSERT_EQ(file8_info.num_range_del_entries, 1);
  393. ASSERT_EQ(file8_info.smallest_range_del_key, Key(600));
  394. ASSERT_EQ(file8_info.largest_range_del_key, Key(700));
  395. // Cannot create an empty sst file
  396. std::string file_empty = sst_files_dir_ + "file_empty.sst";
  397. ExternalSstFileInfo file_empty_info;
  398. s = sst_file_writer.Finish(&file_empty_info);
  399. ASSERT_NOK(s);
  400. DestroyAndReopen(options);
  401. // Add file using file path
  402. s = DeprecatedAddFile({file1});
  403. ASSERT_TRUE(s.ok()) << s.ToString();
  404. ASSERT_EQ(db_->GetLatestSequenceNumber(), 0U);
  405. for (int k = 0; k < 100; k++) {
  406. ASSERT_EQ(Get(Key(k)), Key(k) + "_val");
  407. }
  408. // Add file while holding a snapshot will fail
  409. const Snapshot* s1 = db_->GetSnapshot();
  410. if (s1 != nullptr) {
  411. ASSERT_NOK(DeprecatedAddFile({file2}));
  412. db_->ReleaseSnapshot(s1);
  413. }
  414. // We can add the file after releaseing the snapshot
  415. ASSERT_OK(DeprecatedAddFile({file2}));
  416. ASSERT_EQ(db_->GetLatestSequenceNumber(), 0U);
  417. for (int k = 0; k < 200; k++) {
  418. ASSERT_EQ(Get(Key(k)), Key(k) + "_val");
  419. }
  420. // This file has overlapping values with the existing data
  421. s = DeprecatedAddFile({file3});
  422. ASSERT_FALSE(s.ok()) << s.ToString();
  423. // This file has overlapping values with the existing data
  424. s = DeprecatedAddFile({file4});
  425. ASSERT_FALSE(s.ok()) << s.ToString();
  426. // Overwrite values of keys divisible by 5
  427. for (int k = 0; k < 200; k += 5) {
  428. ASSERT_OK(Put(Key(k), Key(k) + "_val_new"));
  429. }
  430. ASSERT_NE(db_->GetLatestSequenceNumber(), 0U);
  431. // Key range of file5 (400 => 499) dont overlap with any keys in DB
  432. ASSERT_OK(DeprecatedAddFile({file5}));
  433. // This file has overlapping values with the existing data
  434. s = DeprecatedAddFile({file6});
  435. ASSERT_FALSE(s.ok()) << s.ToString();
  436. // Key range of file7 (500 => 598) dont overlap with any keys in DB
  437. ASSERT_OK(DeprecatedAddFile({file7}));
  438. // Key range of file7 (600 => 700) dont overlap with any keys in DB
  439. ASSERT_OK(DeprecatedAddFile({file8}));
  440. // Make sure values are correct before and after flush/compaction
  441. for (int i = 0; i < 2; i++) {
  442. for (int k = 0; k < 200; k++) {
  443. std::string value = Key(k) + "_val";
  444. if (k % 5 == 0) {
  445. value += "_new";
  446. }
  447. ASSERT_EQ(Get(Key(k)), value);
  448. }
  449. for (int k = 400; k < 500; k++) {
  450. std::string value = Key(k) + "_val";
  451. ASSERT_EQ(Get(Key(k)), value);
  452. }
  453. for (int k = 500; k < 600; k++) {
  454. std::string value = Key(k) + "_val";
  455. if (k < 520 || k % 2 == 1) {
  456. value = "NOT_FOUND";
  457. }
  458. ASSERT_EQ(Get(Key(k)), value);
  459. }
  460. ASSERT_OK(Flush());
  461. ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  462. }
  463. Close();
  464. options.disable_auto_compactions = true;
  465. Reopen(options);
  466. // Delete keys in range (400 => 499)
  467. for (int k = 400; k < 500; k++) {
  468. ASSERT_OK(Delete(Key(k)));
  469. }
  470. // We deleted range (400 => 499) but cannot add file5 because
  471. // of the range tombstones
  472. ASSERT_NOK(DeprecatedAddFile({file5}));
  473. // Compacting the DB will remove the tombstones
  474. ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  475. // Now we can add the file
  476. ASSERT_OK(DeprecatedAddFile({file5}));
  477. // Verify values of file5 in DB
  478. for (int k = 400; k < 500; k++) {
  479. std::string value = Key(k) + "_val";
  480. ASSERT_EQ(Get(Key(k)), value);
  481. }
  482. DestroyAndRecreateExternalSSTFilesDir();
  483. } while (ChangeOptions(kSkipPlainTable | kSkipFIFOCompaction |
  484. kRangeDelSkipConfigs));
  485. }
  486. class SstFileWriterCollector : public TablePropertiesCollector {
  487. public:
  488. explicit SstFileWriterCollector(const std::string prefix) : prefix_(prefix) {
  489. name_ = prefix_ + "_SstFileWriterCollector";
  490. }
  491. const char* Name() const override { return name_.c_str(); }
  492. Status Finish(UserCollectedProperties* properties) override {
  493. std::string count = std::to_string(count_);
  494. *properties = UserCollectedProperties{
  495. {prefix_ + "_SstFileWriterCollector", "YES"},
  496. {prefix_ + "_Count", count},
  497. };
  498. return Status::OK();
  499. }
  500. Status AddUserKey(const Slice& /*user_key*/, const Slice& /*value*/,
  501. EntryType /*type*/, SequenceNumber /*seq*/,
  502. uint64_t /*file_size*/) override {
  503. ++count_;
  504. return Status::OK();
  505. }
  506. UserCollectedProperties GetReadableProperties() const override {
  507. return UserCollectedProperties{};
  508. }
  509. private:
  510. uint32_t count_ = 0;
  511. std::string prefix_;
  512. std::string name_;
  513. };
  514. class SstFileWriterCollectorFactory : public TablePropertiesCollectorFactory {
  515. public:
  516. explicit SstFileWriterCollectorFactory(std::string prefix)
  517. : prefix_(prefix), num_created_(0) {}
  518. TablePropertiesCollector* CreateTablePropertiesCollector(
  519. TablePropertiesCollectorFactory::Context /*context*/) override {
  520. num_created_++;
  521. return new SstFileWriterCollector(prefix_);
  522. }
  523. const char* Name() const override { return "SstFileWriterCollectorFactory"; }
  524. std::string prefix_;
  525. uint32_t num_created_;
  526. };
  527. TEST_F(ExternalSSTFileTest, AddList) {
  528. do {
  529. Options options = CurrentOptions();
  530. auto abc_collector = std::make_shared<SstFileWriterCollectorFactory>("abc");
  531. auto xyz_collector = std::make_shared<SstFileWriterCollectorFactory>("xyz");
  532. options.table_properties_collector_factories.emplace_back(abc_collector);
  533. options.table_properties_collector_factories.emplace_back(xyz_collector);
  534. SstFileWriter sst_file_writer(EnvOptions(), options);
  535. // file1.sst (0 => 99)
  536. std::string file1 = sst_files_dir_ + "file1.sst";
  537. ASSERT_OK(sst_file_writer.Open(file1));
  538. for (int k = 0; k < 100; k++) {
  539. ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val"));
  540. }
  541. ExternalSstFileInfo file1_info;
  542. Status s = sst_file_writer.Finish(&file1_info);
  543. ASSERT_TRUE(s.ok()) << s.ToString();
  544. ASSERT_EQ(file1_info.file_path, file1);
  545. ASSERT_EQ(file1_info.num_entries, 100);
  546. ASSERT_EQ(file1_info.smallest_key, Key(0));
  547. ASSERT_EQ(file1_info.largest_key, Key(99));
  548. // sst_file_writer already finished, cannot add this value
  549. s = sst_file_writer.Put(Key(100), "bad_val");
  550. ASSERT_FALSE(s.ok()) << s.ToString();
  551. // file2.sst (100 => 199)
  552. std::string file2 = sst_files_dir_ + "file2.sst";
  553. ASSERT_OK(sst_file_writer.Open(file2));
  554. for (int k = 100; k < 200; k++) {
  555. ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val"));
  556. }
  557. // Cannot add this key because it's not after last added key
  558. s = sst_file_writer.Put(Key(99), "bad_val");
  559. ASSERT_FALSE(s.ok()) << s.ToString();
  560. ExternalSstFileInfo file2_info;
  561. s = sst_file_writer.Finish(&file2_info);
  562. ASSERT_TRUE(s.ok()) << s.ToString();
  563. ASSERT_EQ(file2_info.file_path, file2);
  564. ASSERT_EQ(file2_info.num_entries, 100);
  565. ASSERT_EQ(file2_info.smallest_key, Key(100));
  566. ASSERT_EQ(file2_info.largest_key, Key(199));
  567. // file3.sst (195 => 199)
  568. // This file values overlap with file2 values
  569. std::string file3 = sst_files_dir_ + "file3.sst";
  570. ASSERT_OK(sst_file_writer.Open(file3));
  571. for (int k = 195; k < 200; k++) {
  572. ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val_overlap"));
  573. }
  574. ExternalSstFileInfo file3_info;
  575. s = sst_file_writer.Finish(&file3_info);
  576. ASSERT_TRUE(s.ok()) << s.ToString();
  577. ASSERT_EQ(file3_info.file_path, file3);
  578. ASSERT_EQ(file3_info.num_entries, 5);
  579. ASSERT_EQ(file3_info.smallest_key, Key(195));
  580. ASSERT_EQ(file3_info.largest_key, Key(199));
  581. // file4.sst (30 => 39)
  582. // This file values overlap with file1 values
  583. std::string file4 = sst_files_dir_ + "file4.sst";
  584. ASSERT_OK(sst_file_writer.Open(file4));
  585. for (int k = 30; k < 40; k++) {
  586. ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val_overlap"));
  587. }
  588. ExternalSstFileInfo file4_info;
  589. s = sst_file_writer.Finish(&file4_info);
  590. ASSERT_TRUE(s.ok()) << s.ToString();
  591. ASSERT_EQ(file4_info.file_path, file4);
  592. ASSERT_EQ(file4_info.num_entries, 10);
  593. ASSERT_EQ(file4_info.smallest_key, Key(30));
  594. ASSERT_EQ(file4_info.largest_key, Key(39));
  595. // file5.sst (200 => 299)
  596. std::string file5 = sst_files_dir_ + "file5.sst";
  597. ASSERT_OK(sst_file_writer.Open(file5));
  598. for (int k = 200; k < 300; k++) {
  599. ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val"));
  600. }
  601. ExternalSstFileInfo file5_info;
  602. s = sst_file_writer.Finish(&file5_info);
  603. ASSERT_TRUE(s.ok()) << s.ToString();
  604. ASSERT_EQ(file5_info.file_path, file5);
  605. ASSERT_EQ(file5_info.num_entries, 100);
  606. ASSERT_EQ(file5_info.smallest_key, Key(200));
  607. ASSERT_EQ(file5_info.largest_key, Key(299));
  608. // file6.sst (delete 0 => 100)
  609. std::string file6 = sst_files_dir_ + "file6.sst";
  610. ASSERT_OK(sst_file_writer.Open(file6));
  611. ASSERT_OK(sst_file_writer.DeleteRange(Key(0), Key(75)));
  612. ASSERT_OK(sst_file_writer.DeleteRange(Key(25), Key(100)));
  613. ExternalSstFileInfo file6_info;
  614. s = sst_file_writer.Finish(&file6_info);
  615. ASSERT_TRUE(s.ok()) << s.ToString();
  616. ASSERT_EQ(file6_info.file_path, file6);
  617. ASSERT_EQ(file6_info.num_entries, 0);
  618. ASSERT_EQ(file6_info.smallest_key, "");
  619. ASSERT_EQ(file6_info.largest_key, "");
  620. ASSERT_EQ(file6_info.num_range_del_entries, 2);
  621. ASSERT_EQ(file6_info.smallest_range_del_key, Key(0));
  622. ASSERT_EQ(file6_info.largest_range_del_key, Key(100));
  623. // file7.sst (delete 99 => 201)
  624. std::string file7 = sst_files_dir_ + "file7.sst";
  625. ASSERT_OK(sst_file_writer.Open(file7));
  626. ASSERT_OK(sst_file_writer.DeleteRange(Key(99), Key(201)));
  627. ExternalSstFileInfo file7_info;
  628. s = sst_file_writer.Finish(&file7_info);
  629. ASSERT_TRUE(s.ok()) << s.ToString();
  630. ASSERT_EQ(file7_info.file_path, file7);
  631. ASSERT_EQ(file7_info.num_entries, 0);
  632. ASSERT_EQ(file7_info.smallest_key, "");
  633. ASSERT_EQ(file7_info.largest_key, "");
  634. ASSERT_EQ(file7_info.num_range_del_entries, 1);
  635. ASSERT_EQ(file7_info.smallest_range_del_key, Key(99));
  636. ASSERT_EQ(file7_info.largest_range_del_key, Key(201));
  637. // list 1 has internal key range conflict
  638. std::vector<std::string> file_list0({file1, file2});
  639. std::vector<std::string> file_list1({file3, file2, file1});
  640. std::vector<std::string> file_list2({file5});
  641. std::vector<std::string> file_list3({file3, file4});
  642. std::vector<std::string> file_list4({file5, file7});
  643. std::vector<std::string> file_list5({file6, file7});
  644. DestroyAndReopen(options);
  645. // These lists of files have key ranges that overlap with each other
  646. s = DeprecatedAddFile(file_list1);
  647. ASSERT_FALSE(s.ok()) << s.ToString();
  648. // Both of the following overlap on the range deletion tombstone.
  649. s = DeprecatedAddFile(file_list4);
  650. ASSERT_FALSE(s.ok()) << s.ToString();
  651. s = DeprecatedAddFile(file_list5);
  652. ASSERT_FALSE(s.ok()) << s.ToString();
  653. // Add files using file path list
  654. s = DeprecatedAddFile(file_list0);
  655. ASSERT_TRUE(s.ok()) << s.ToString();
  656. ASSERT_EQ(db_->GetLatestSequenceNumber(), 0U);
  657. for (int k = 0; k < 200; k++) {
  658. ASSERT_EQ(Get(Key(k)), Key(k) + "_val");
  659. }
  660. TablePropertiesCollection props;
  661. ASSERT_OK(db_->GetPropertiesOfAllTables(&props));
  662. ASSERT_EQ(props.size(), 2);
  663. for (auto file_props : props) {
  664. auto user_props = file_props.second->user_collected_properties;
  665. ASSERT_EQ(user_props["abc_SstFileWriterCollector"], "YES");
  666. ASSERT_EQ(user_props["xyz_SstFileWriterCollector"], "YES");
  667. ASSERT_EQ(user_props["abc_Count"], "100");
  668. ASSERT_EQ(user_props["xyz_Count"], "100");
  669. }
  670. // Add file while holding a snapshot will fail
  671. const Snapshot* s1 = db_->GetSnapshot();
  672. if (s1 != nullptr) {
  673. ASSERT_NOK(DeprecatedAddFile(file_list2));
  674. db_->ReleaseSnapshot(s1);
  675. }
  676. // We can add the file after releaseing the snapshot
  677. ASSERT_OK(DeprecatedAddFile(file_list2));
  678. ASSERT_EQ(db_->GetLatestSequenceNumber(), 0U);
  679. for (int k = 0; k < 300; k++) {
  680. ASSERT_EQ(Get(Key(k)), Key(k) + "_val");
  681. }
  682. ASSERT_OK(db_->GetPropertiesOfAllTables(&props));
  683. ASSERT_EQ(props.size(), 3);
  684. for (auto file_props : props) {
  685. auto user_props = file_props.second->user_collected_properties;
  686. ASSERT_EQ(user_props["abc_SstFileWriterCollector"], "YES");
  687. ASSERT_EQ(user_props["xyz_SstFileWriterCollector"], "YES");
  688. ASSERT_EQ(user_props["abc_Count"], "100");
  689. ASSERT_EQ(user_props["xyz_Count"], "100");
  690. }
  691. // This file list has overlapping values with the existing data
  692. s = DeprecatedAddFile(file_list3);
  693. ASSERT_FALSE(s.ok()) << s.ToString();
  694. // Overwrite values of keys divisible by 5
  695. for (int k = 0; k < 200; k += 5) {
  696. ASSERT_OK(Put(Key(k), Key(k) + "_val_new"));
  697. }
  698. ASSERT_NE(db_->GetLatestSequenceNumber(), 0U);
  699. // Make sure values are correct before and after flush/compaction
  700. for (int i = 0; i < 2; i++) {
  701. for (int k = 0; k < 200; k++) {
  702. std::string value = Key(k) + "_val";
  703. if (k % 5 == 0) {
  704. value += "_new";
  705. }
  706. ASSERT_EQ(Get(Key(k)), value);
  707. }
  708. for (int k = 200; k < 300; k++) {
  709. std::string value = Key(k) + "_val";
  710. ASSERT_EQ(Get(Key(k)), value);
  711. }
  712. ASSERT_OK(Flush());
  713. ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  714. }
  715. // Delete keys in range (200 => 299)
  716. for (int k = 200; k < 300; k++) {
  717. ASSERT_OK(Delete(Key(k)));
  718. }
  719. // We deleted range (200 => 299) but cannot add file5 because
  720. // of the range tombstones
  721. ASSERT_NOK(DeprecatedAddFile(file_list2));
  722. // Compacting the DB will remove the tombstones
  723. ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  724. // Now we can add the file
  725. ASSERT_OK(DeprecatedAddFile(file_list2));
  726. // Verify values of file5 in DB
  727. for (int k = 200; k < 300; k++) {
  728. std::string value = Key(k) + "_val";
  729. ASSERT_EQ(Get(Key(k)), value);
  730. }
  731. DestroyAndRecreateExternalSSTFilesDir();
  732. } while (ChangeOptions(kSkipPlainTable | kSkipFIFOCompaction |
  733. kRangeDelSkipConfigs));
  734. }
  735. TEST_F(ExternalSSTFileTest, AddListAtomicity) {
  736. do {
  737. Options options = CurrentOptions();
  738. SstFileWriter sst_file_writer(EnvOptions(), options);
  739. // files[0].sst (0 => 99)
  740. // files[1].sst (100 => 199)
  741. // ...
  742. // file[8].sst (800 => 899)
  743. int n = 9;
  744. std::vector<std::string> files(n);
  745. std::vector<ExternalSstFileInfo> files_info(n);
  746. for (int i = 0; i < n; i++) {
  747. files[i] = sst_files_dir_ + "file" + std::to_string(i) + ".sst";
  748. ASSERT_OK(sst_file_writer.Open(files[i]));
  749. for (int k = i * 100; k < (i + 1) * 100; k++) {
  750. ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val"));
  751. }
  752. Status s = sst_file_writer.Finish(&files_info[i]);
  753. ASSERT_TRUE(s.ok()) << s.ToString();
  754. ASSERT_EQ(files_info[i].file_path, files[i]);
  755. ASSERT_EQ(files_info[i].num_entries, 100);
  756. ASSERT_EQ(files_info[i].smallest_key, Key(i * 100));
  757. ASSERT_EQ(files_info[i].largest_key, Key((i + 1) * 100 - 1));
  758. }
  759. files.push_back(sst_files_dir_ + "file" + std::to_string(n) + ".sst");
  760. auto s = DeprecatedAddFile(files);
  761. ASSERT_NOK(s) << s.ToString();
  762. for (int k = 0; k < n * 100; k++) {
  763. ASSERT_EQ("NOT_FOUND", Get(Key(k)));
  764. }
  765. files.pop_back();
  766. ASSERT_OK(DeprecatedAddFile(files));
  767. for (int k = 0; k < n * 100; k++) {
  768. std::string value = Key(k) + "_val";
  769. ASSERT_EQ(Get(Key(k)), value);
  770. }
  771. DestroyAndRecreateExternalSSTFilesDir();
  772. } while (ChangeOptions(kSkipPlainTable | kSkipFIFOCompaction));
  773. }
  774. // This test reporduce a bug that can happen in some cases if the DB started
  775. // purging obsolete files when we are adding an external sst file.
  776. // This situation may result in deleting the file while it's being added.
  777. TEST_F(ExternalSSTFileTest, PurgeObsoleteFilesBug) {
  778. Options options = CurrentOptions();
  779. SstFileWriter sst_file_writer(EnvOptions(), options);
  780. // file1.sst (0 => 500)
  781. std::string sst_file_path = sst_files_dir_ + "file1.sst";
  782. Status s = sst_file_writer.Open(sst_file_path);
  783. ASSERT_OK(s);
  784. for (int i = 0; i < 500; i++) {
  785. std::string k = Key(i);
  786. s = sst_file_writer.Put(k, k + "_val");
  787. ASSERT_OK(s);
  788. }
  789. ExternalSstFileInfo sst_file_info;
  790. s = sst_file_writer.Finish(&sst_file_info);
  791. ASSERT_OK(s);
  792. options.delete_obsolete_files_period_micros = 0;
  793. options.disable_auto_compactions = true;
  794. DestroyAndReopen(options);
  795. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  796. "ExternalSstFileIngestionJob::Prepare:FileAdded", [&](void* /* arg */) {
  797. ASSERT_OK(Put("aaa", "bbb"));
  798. ASSERT_OK(Flush());
  799. ASSERT_OK(Put("aaa", "xxx"));
  800. ASSERT_OK(Flush());
  801. db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
  802. });
  803. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  804. s = DeprecatedAddFile({sst_file_path});
  805. ASSERT_OK(s);
  806. for (int i = 0; i < 500; i++) {
  807. std::string k = Key(i);
  808. std::string v = k + "_val";
  809. ASSERT_EQ(Get(k), v);
  810. }
  811. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  812. }
  813. TEST_F(ExternalSSTFileTest, SkipSnapshot) {
  814. Options options = CurrentOptions();
  815. SstFileWriter sst_file_writer(EnvOptions(), options);
  816. // file1.sst (0 => 99)
  817. std::string file1 = sst_files_dir_ + "file1.sst";
  818. ASSERT_OK(sst_file_writer.Open(file1));
  819. for (int k = 0; k < 100; k++) {
  820. ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val"));
  821. }
  822. ExternalSstFileInfo file1_info;
  823. Status s = sst_file_writer.Finish(&file1_info);
  824. ASSERT_TRUE(s.ok()) << s.ToString();
  825. ASSERT_EQ(file1_info.file_path, file1);
  826. ASSERT_EQ(file1_info.num_entries, 100);
  827. ASSERT_EQ(file1_info.smallest_key, Key(0));
  828. ASSERT_EQ(file1_info.largest_key, Key(99));
  829. // file2.sst (100 => 299)
  830. std::string file2 = sst_files_dir_ + "file2.sst";
  831. ASSERT_OK(sst_file_writer.Open(file2));
  832. for (int k = 100; k < 300; k++) {
  833. ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val"));
  834. }
  835. ExternalSstFileInfo file2_info;
  836. s = sst_file_writer.Finish(&file2_info);
  837. ASSERT_TRUE(s.ok()) << s.ToString();
  838. ASSERT_EQ(file2_info.file_path, file2);
  839. ASSERT_EQ(file2_info.num_entries, 200);
  840. ASSERT_EQ(file2_info.smallest_key, Key(100));
  841. ASSERT_EQ(file2_info.largest_key, Key(299));
  842. ASSERT_OK(DeprecatedAddFile({file1}));
  843. // Add file will fail when holding snapshot and use the default
  844. // skip_snapshot_check to false
  845. const Snapshot* s1 = db_->GetSnapshot();
  846. if (s1 != nullptr) {
  847. ASSERT_NOK(DeprecatedAddFile({file2}));
  848. }
  849. // Add file will success when set skip_snapshot_check to true even db holding
  850. // snapshot
  851. if (s1 != nullptr) {
  852. ASSERT_OK(DeprecatedAddFile({file2}, false, true));
  853. db_->ReleaseSnapshot(s1);
  854. }
  855. // file3.sst (300 => 399)
  856. std::string file3 = sst_files_dir_ + "file3.sst";
  857. ASSERT_OK(sst_file_writer.Open(file3));
  858. for (int k = 300; k < 400; k++) {
  859. ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val"));
  860. }
  861. ExternalSstFileInfo file3_info;
  862. s = sst_file_writer.Finish(&file3_info);
  863. ASSERT_TRUE(s.ok()) << s.ToString();
  864. ASSERT_EQ(file3_info.file_path, file3);
  865. ASSERT_EQ(file3_info.num_entries, 100);
  866. ASSERT_EQ(file3_info.smallest_key, Key(300));
  867. ASSERT_EQ(file3_info.largest_key, Key(399));
  868. // check that we have change the old key
  869. ASSERT_EQ(Get(Key(300)), "NOT_FOUND");
  870. const Snapshot* s2 = db_->GetSnapshot();
  871. ASSERT_OK(DeprecatedAddFile({file3}, false, true));
  872. ASSERT_EQ(Get(Key(300)), Key(300) + ("_val"));
  873. ASSERT_EQ(Get(Key(300), s2), Key(300) + ("_val"));
  874. db_->ReleaseSnapshot(s2);
  875. }
  876. TEST_F(ExternalSSTFileTest, MultiThreaded) {
  877. // Bulk load 10 files every file contain 1000 keys
  878. int num_files = 10;
  879. int keys_per_file = 1000;
  880. // Generate file names
  881. std::vector<std::string> file_names;
  882. for (int i = 0; i < num_files; i++) {
  883. std::string file_name = "file_" + ToString(i) + ".sst";
  884. file_names.push_back(sst_files_dir_ + file_name);
  885. }
  886. do {
  887. Options options = CurrentOptions();
  888. std::atomic<int> thread_num(0);
  889. std::function<void()> write_file_func = [&]() {
  890. int file_idx = thread_num.fetch_add(1);
  891. int range_start = file_idx * keys_per_file;
  892. int range_end = range_start + keys_per_file;
  893. SstFileWriter sst_file_writer(EnvOptions(), options);
  894. ASSERT_OK(sst_file_writer.Open(file_names[file_idx]));
  895. for (int k = range_start; k < range_end; k++) {
  896. ASSERT_OK(sst_file_writer.Put(Key(k), Key(k)));
  897. }
  898. Status s = sst_file_writer.Finish();
  899. ASSERT_TRUE(s.ok()) << s.ToString();
  900. };
  901. // Write num_files files in parallel
  902. std::vector<port::Thread> sst_writer_threads;
  903. for (int i = 0; i < num_files; ++i) {
  904. sst_writer_threads.emplace_back(write_file_func);
  905. }
  906. for (auto& t : sst_writer_threads) {
  907. t.join();
  908. }
  909. fprintf(stderr, "Wrote %d files (%d keys)\n", num_files,
  910. num_files * keys_per_file);
  911. thread_num.store(0);
  912. std::atomic<int> files_added(0);
  913. // Thread 0 -> Load {f0,f1}
  914. // Thread 1 -> Load {f0,f1}
  915. // Thread 2 -> Load {f2,f3}
  916. // Thread 3 -> Load {f2,f3}
  917. // Thread 4 -> Load {f4,f5}
  918. // Thread 5 -> Load {f4,f5}
  919. // ...
  920. std::function<void()> load_file_func = [&]() {
  921. // We intentionally add every file twice, and assert that it was added
  922. // only once and the other add failed
  923. int thread_id = thread_num.fetch_add(1);
  924. int file_idx = (thread_id / 2) * 2;
  925. // sometimes we use copy, sometimes link .. the result should be the same
  926. bool move_file = (thread_id % 3 == 0);
  927. std::vector<std::string> files_to_add;
  928. files_to_add = {file_names[file_idx]};
  929. if (static_cast<size_t>(file_idx + 1) < file_names.size()) {
  930. files_to_add.push_back(file_names[file_idx + 1]);
  931. }
  932. Status s = DeprecatedAddFile(files_to_add, move_file);
  933. if (s.ok()) {
  934. files_added += static_cast<int>(files_to_add.size());
  935. }
  936. };
  937. // Bulk load num_files files in parallel
  938. std::vector<port::Thread> add_file_threads;
  939. DestroyAndReopen(options);
  940. for (int i = 0; i < num_files; ++i) {
  941. add_file_threads.emplace_back(load_file_func);
  942. }
  943. for (auto& t : add_file_threads) {
  944. t.join();
  945. }
  946. ASSERT_EQ(files_added.load(), num_files);
  947. fprintf(stderr, "Loaded %d files (%d keys)\n", num_files,
  948. num_files * keys_per_file);
  949. // Overwrite values of keys divisible by 100
  950. for (int k = 0; k < num_files * keys_per_file; k += 100) {
  951. std::string key = Key(k);
  952. Status s = Put(key, key + "_new");
  953. ASSERT_TRUE(s.ok());
  954. }
  955. for (int i = 0; i < 2; i++) {
  956. // Make sure the values are correct before and after flush/compaction
  957. for (int k = 0; k < num_files * keys_per_file; ++k) {
  958. std::string key = Key(k);
  959. std::string value = (k % 100 == 0) ? (key + "_new") : key;
  960. ASSERT_EQ(Get(key), value);
  961. }
  962. ASSERT_OK(Flush());
  963. ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  964. }
  965. fprintf(stderr, "Verified %d values\n", num_files * keys_per_file);
  966. DestroyAndRecreateExternalSSTFilesDir();
  967. } while (ChangeOptions(kSkipPlainTable | kSkipFIFOCompaction));
  968. }
  969. TEST_F(ExternalSSTFileTest, OverlappingRanges) {
  970. Random rnd(301);
  971. SequenceNumber assigned_seqno = 0;
  972. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  973. "ExternalSstFileIngestionJob::Run", [&assigned_seqno](void* arg) {
  974. ASSERT_TRUE(arg != nullptr);
  975. assigned_seqno = *(static_cast<SequenceNumber*>(arg));
  976. });
  977. bool need_flush = false;
  978. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  979. "DBImpl::IngestExternalFile:NeedFlush", [&need_flush](void* arg) {
  980. ASSERT_TRUE(arg != nullptr);
  981. need_flush = *(static_cast<bool*>(arg));
  982. });
  983. bool overlap_with_db = false;
  984. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  985. "ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile",
  986. [&overlap_with_db](void* arg) {
  987. ASSERT_TRUE(arg != nullptr);
  988. overlap_with_db = *(static_cast<bool*>(arg));
  989. });
  990. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  991. do {
  992. Options options = CurrentOptions();
  993. DestroyAndReopen(options);
  994. SstFileWriter sst_file_writer(EnvOptions(), options);
  995. printf("Option config = %d\n", option_config_);
  996. std::vector<std::pair<int, int>> key_ranges;
  997. for (int i = 0; i < 100; i++) {
  998. int range_start = rnd.Uniform(20000);
  999. int keys_per_range = 10 + rnd.Uniform(41);
  1000. key_ranges.emplace_back(range_start, range_start + keys_per_range);
  1001. }
  1002. int memtable_add = 0;
  1003. int success_add_file = 0;
  1004. int failed_add_file = 0;
  1005. std::map<std::string, std::string> true_data;
  1006. for (size_t i = 0; i < key_ranges.size(); i++) {
  1007. int range_start = key_ranges[i].first;
  1008. int range_end = key_ranges[i].second;
  1009. Status s;
  1010. std::string range_val = "range_" + ToString(i);
  1011. // For 20% of ranges we use DB::Put, for 80% we use DB::AddFile
  1012. if (i && i % 5 == 0) {
  1013. // Use DB::Put to insert range (insert into memtable)
  1014. range_val += "_put";
  1015. for (int k = range_start; k <= range_end; k++) {
  1016. s = Put(Key(k), range_val);
  1017. ASSERT_OK(s);
  1018. }
  1019. memtable_add++;
  1020. } else {
  1021. // Use DB::AddFile to insert range
  1022. range_val += "_add_file";
  1023. // Generate the file containing the range
  1024. std::string file_name = sst_files_dir_ + env_->GenerateUniqueId();
  1025. ASSERT_OK(sst_file_writer.Open(file_name));
  1026. for (int k = range_start; k <= range_end; k++) {
  1027. s = sst_file_writer.Put(Key(k), range_val);
  1028. ASSERT_OK(s);
  1029. }
  1030. ExternalSstFileInfo file_info;
  1031. s = sst_file_writer.Finish(&file_info);
  1032. ASSERT_OK(s);
  1033. // Insert the generated file
  1034. s = DeprecatedAddFile({file_name});
  1035. auto it = true_data.lower_bound(Key(range_start));
  1036. if (option_config_ != kUniversalCompaction &&
  1037. option_config_ != kUniversalCompactionMultiLevel &&
  1038. option_config_ != kUniversalSubcompactions) {
  1039. if (it != true_data.end() && it->first <= Key(range_end)) {
  1040. // This range overlap with data already exist in DB
  1041. ASSERT_NOK(s);
  1042. failed_add_file++;
  1043. } else {
  1044. ASSERT_OK(s);
  1045. success_add_file++;
  1046. }
  1047. } else {
  1048. if ((it != true_data.end() && it->first <= Key(range_end)) ||
  1049. need_flush || assigned_seqno > 0 || overlap_with_db) {
  1050. // This range overlap with data already exist in DB
  1051. ASSERT_NOK(s);
  1052. failed_add_file++;
  1053. } else {
  1054. ASSERT_OK(s);
  1055. success_add_file++;
  1056. }
  1057. }
  1058. }
  1059. if (s.ok()) {
  1060. // Update true_data map to include the new inserted data
  1061. for (int k = range_start; k <= range_end; k++) {
  1062. true_data[Key(k)] = range_val;
  1063. }
  1064. }
  1065. // Flush / Compact the DB
  1066. if (i && i % 50 == 0) {
  1067. Flush();
  1068. }
  1069. if (i && i % 75 == 0) {
  1070. db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
  1071. }
  1072. }
  1073. printf("Total: %" ROCKSDB_PRIszt
  1074. " ranges\n"
  1075. "AddFile()|Success: %d ranges\n"
  1076. "AddFile()|RangeConflict: %d ranges\n"
  1077. "Put(): %d ranges\n",
  1078. key_ranges.size(), success_add_file, failed_add_file, memtable_add);
  1079. // Verify the correctness of the data
  1080. for (const auto& kv : true_data) {
  1081. ASSERT_EQ(Get(kv.first), kv.second);
  1082. }
  1083. printf("keys/values verified\n");
  1084. DestroyAndRecreateExternalSSTFilesDir();
  1085. } while (ChangeOptions(kSkipPlainTable | kSkipFIFOCompaction));
  1086. }
  1087. TEST_P(ExternalSSTFileTest, PickedLevel) {
  1088. Options options = CurrentOptions();
  1089. options.disable_auto_compactions = false;
  1090. options.level0_file_num_compaction_trigger = 4;
  1091. options.num_levels = 4;
  1092. DestroyAndReopen(options);
  1093. std::map<std::string, std::string> true_data;
  1094. // File 0 will go to last level (L3)
  1095. ASSERT_OK(GenerateAndAddExternalFile(options, {1, 10}, -1, false, false, true,
  1096. false, false, &true_data));
  1097. EXPECT_EQ(FilesPerLevel(), "0,0,0,1");
  1098. // File 1 will go to level L2 (since it overlap with file 0 in L3)
  1099. ASSERT_OK(GenerateAndAddExternalFile(options, {2, 9}, -1, false, false, true,
  1100. false, false, &true_data));
  1101. EXPECT_EQ(FilesPerLevel(), "0,0,1,1");
  1102. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
  1103. {"ExternalSSTFileTest::PickedLevel:0", "BackgroundCallCompaction:0"},
  1104. {"DBImpl::BackgroundCompaction:Start",
  1105. "ExternalSSTFileTest::PickedLevel:1"},
  1106. {"ExternalSSTFileTest::PickedLevel:2",
  1107. "DBImpl::BackgroundCompaction:NonTrivial:AfterRun"},
  1108. });
  1109. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  1110. // Flush 4 files containing the same keys
  1111. for (int i = 0; i < 4; i++) {
  1112. ASSERT_OK(Put(Key(3), Key(3) + "put"));
  1113. ASSERT_OK(Put(Key(8), Key(8) + "put"));
  1114. true_data[Key(3)] = Key(3) + "put";
  1115. true_data[Key(8)] = Key(8) + "put";
  1116. ASSERT_OK(Flush());
  1117. }
  1118. // Wait for BackgroundCompaction() to be called
  1119. TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevel:0");
  1120. TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevel:1");
  1121. EXPECT_EQ(FilesPerLevel(), "4,0,1,1");
  1122. // This file overlaps with file 0 (L3), file 1 (L2) and the
  1123. // output of compaction going to L1
  1124. ASSERT_OK(GenerateAndAddExternalFile(options, {4, 7}, -1, false, false, true,
  1125. false, false, &true_data));
  1126. EXPECT_EQ(FilesPerLevel(), "5,0,1,1");
  1127. // This file does not overlap with any file or with the running compaction
  1128. ASSERT_OK(GenerateAndAddExternalFile(options, {9000, 9001}, -1, false, false,
  1129. false, false, false, &true_data));
  1130. EXPECT_EQ(FilesPerLevel(), "5,0,1,2");
  1131. // Hold compaction from finishing
  1132. TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevel:2");
  1133. dbfull()->TEST_WaitForCompact();
  1134. EXPECT_EQ(FilesPerLevel(), "1,1,1,2");
  1135. size_t kcnt = 0;
  1136. VerifyDBFromMap(true_data, &kcnt, false);
  1137. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  1138. }
  1139. TEST_F(ExternalSSTFileTest, PickedLevelBug) {
  1140. Options options = CurrentOptions();
  1141. options.disable_auto_compactions = false;
  1142. options.level0_file_num_compaction_trigger = 3;
  1143. options.num_levels = 2;
  1144. DestroyAndReopen(options);
  1145. std::vector<int> file_keys;
  1146. // file #1 in L0
  1147. file_keys = {0, 5, 7};
  1148. for (int k : file_keys) {
  1149. ASSERT_OK(Put(Key(k), Key(k)));
  1150. }
  1151. ASSERT_OK(Flush());
  1152. // file #2 in L0
  1153. file_keys = {4, 6, 8, 9};
  1154. for (int k : file_keys) {
  1155. ASSERT_OK(Put(Key(k), Key(k)));
  1156. }
  1157. ASSERT_OK(Flush());
  1158. // We have 2 overlapping files in L0
  1159. EXPECT_EQ(FilesPerLevel(), "2");
  1160. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
  1161. {{"DBImpl::AddFile:MutexLock", "ExternalSSTFileTest::PickedLevelBug:0"},
  1162. {"ExternalSSTFileTest::PickedLevelBug:1", "DBImpl::AddFile:MutexUnlock"},
  1163. {"ExternalSSTFileTest::PickedLevelBug:2",
  1164. "DBImpl::RunManualCompaction:0"},
  1165. {"ExternalSSTFileTest::PickedLevelBug:3",
  1166. "DBImpl::RunManualCompaction:1"}});
  1167. std::atomic<bool> bg_compact_started(false);
  1168. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  1169. "DBImpl::BackgroundCompaction:Start",
  1170. [&](void* /*arg*/) { bg_compact_started.store(true); });
  1171. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  1172. // While writing the MANIFEST start a thread that will ask for compaction
  1173. ROCKSDB_NAMESPACE::port::Thread bg_compact([&]() {
  1174. ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  1175. });
  1176. TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelBug:2");
  1177. // Start a thread that will ingest a new file
  1178. ROCKSDB_NAMESPACE::port::Thread bg_addfile([&]() {
  1179. file_keys = {1, 2, 3};
  1180. ASSERT_OK(GenerateAndAddExternalFile(options, file_keys, 1));
  1181. });
  1182. // Wait for AddFile to start picking levels and writing MANIFEST
  1183. TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelBug:0");
  1184. TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelBug:3");
  1185. // We need to verify that no compactions can run while AddFile is
  1186. // ingesting the files into the levels it find suitable. So we will
  1187. // wait for 2 seconds to give a chance for compactions to run during
  1188. // this period, and then make sure that no compactions where able to run
  1189. env_->SleepForMicroseconds(1000000 * 2);
  1190. ASSERT_FALSE(bg_compact_started.load());
  1191. // Hold AddFile from finishing writing the MANIFEST
  1192. TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelBug:1");
  1193. bg_addfile.join();
  1194. bg_compact.join();
  1195. dbfull()->TEST_WaitForCompact();
  1196. int total_keys = 0;
  1197. Iterator* iter = db_->NewIterator(ReadOptions());
  1198. for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
  1199. ASSERT_OK(iter->status());
  1200. total_keys++;
  1201. }
  1202. ASSERT_EQ(total_keys, 10);
  1203. delete iter;
  1204. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  1205. }
  1206. TEST_F(ExternalSSTFileTest, IngestNonExistingFile) {
  1207. Options options = CurrentOptions();
  1208. DestroyAndReopen(options);
  1209. Status s = db_->IngestExternalFile({"non_existing_file"},
  1210. IngestExternalFileOptions());
  1211. ASSERT_NOK(s);
  1212. // Verify file deletion is not impacted (verify a bug fix)
  1213. ASSERT_OK(Put(Key(1), Key(1)));
  1214. ASSERT_OK(Put(Key(9), Key(9)));
  1215. ASSERT_OK(Flush());
  1216. ASSERT_OK(Put(Key(1), Key(1)));
  1217. ASSERT_OK(Put(Key(9), Key(9)));
  1218. ASSERT_OK(Flush());
  1219. ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  1220. ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
  1221. // After full compaction, there should be only 1 file.
  1222. std::vector<std::string> files;
  1223. env_->GetChildren(dbname_, &files);
  1224. int num_sst_files = 0;
  1225. for (auto& f : files) {
  1226. uint64_t number;
  1227. FileType type;
  1228. if (ParseFileName(f, &number, &type) && type == kTableFile) {
  1229. num_sst_files++;
  1230. }
  1231. }
  1232. ASSERT_EQ(1, num_sst_files);
  1233. }
  1234. TEST_F(ExternalSSTFileTest, CompactDuringAddFileRandom) {
  1235. Options options = CurrentOptions();
  1236. options.disable_auto_compactions = false;
  1237. options.level0_file_num_compaction_trigger = 2;
  1238. options.num_levels = 2;
  1239. DestroyAndReopen(options);
  1240. std::function<void()> bg_compact = [&]() {
  1241. ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  1242. };
  1243. int range_id = 0;
  1244. std::vector<int> file_keys;
  1245. std::function<void()> bg_addfile = [&]() {
  1246. ASSERT_OK(GenerateAndAddExternalFile(options, file_keys, range_id));
  1247. };
  1248. const int num_of_ranges = 1000;
  1249. std::vector<port::Thread> threads;
  1250. while (range_id < num_of_ranges) {
  1251. int range_start = range_id * 10;
  1252. int range_end = range_start + 10;
  1253. file_keys.clear();
  1254. for (int k = range_start + 1; k < range_end; k++) {
  1255. file_keys.push_back(k);
  1256. }
  1257. ASSERT_OK(Put(Key(range_start), Key(range_start)));
  1258. ASSERT_OK(Put(Key(range_end), Key(range_end)));
  1259. ASSERT_OK(Flush());
  1260. if (range_id % 10 == 0) {
  1261. threads.emplace_back(bg_compact);
  1262. }
  1263. threads.emplace_back(bg_addfile);
  1264. for (auto& t : threads) {
  1265. t.join();
  1266. }
  1267. threads.clear();
  1268. range_id++;
  1269. }
  1270. for (int rid = 0; rid < num_of_ranges; rid++) {
  1271. int range_start = rid * 10;
  1272. int range_end = range_start + 10;
  1273. ASSERT_EQ(Get(Key(range_start)), Key(range_start)) << rid;
  1274. ASSERT_EQ(Get(Key(range_end)), Key(range_end)) << rid;
  1275. for (int k = range_start + 1; k < range_end; k++) {
  1276. std::string v = Key(k) + ToString(rid);
  1277. ASSERT_EQ(Get(Key(k)), v) << rid;
  1278. }
  1279. }
  1280. }
  1281. TEST_F(ExternalSSTFileTest, PickedLevelDynamic) {
  1282. Options options = CurrentOptions();
  1283. options.disable_auto_compactions = false;
  1284. options.level0_file_num_compaction_trigger = 4;
  1285. options.level_compaction_dynamic_level_bytes = true;
  1286. options.num_levels = 4;
  1287. DestroyAndReopen(options);
  1288. std::map<std::string, std::string> true_data;
  1289. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
  1290. {"ExternalSSTFileTest::PickedLevelDynamic:0",
  1291. "BackgroundCallCompaction:0"},
  1292. {"DBImpl::BackgroundCompaction:Start",
  1293. "ExternalSSTFileTest::PickedLevelDynamic:1"},
  1294. {"ExternalSSTFileTest::PickedLevelDynamic:2",
  1295. "DBImpl::BackgroundCompaction:NonTrivial:AfterRun"},
  1296. });
  1297. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  1298. // Flush 4 files containing the same keys
  1299. for (int i = 0; i < 4; i++) {
  1300. for (int k = 20; k <= 30; k++) {
  1301. ASSERT_OK(Put(Key(k), Key(k) + "put"));
  1302. true_data[Key(k)] = Key(k) + "put";
  1303. }
  1304. for (int k = 50; k <= 60; k++) {
  1305. ASSERT_OK(Put(Key(k), Key(k) + "put"));
  1306. true_data[Key(k)] = Key(k) + "put";
  1307. }
  1308. ASSERT_OK(Flush());
  1309. }
  1310. // Wait for BackgroundCompaction() to be called
  1311. TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelDynamic:0");
  1312. TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelDynamic:1");
  1313. // This file overlaps with the output of the compaction (going to L3)
  1314. // so the file will be added to L0 since L3 is the base level
  1315. ASSERT_OK(GenerateAndAddExternalFile(options, {31, 32, 33, 34}, -1, false,
  1316. false, true, false, false, &true_data));
  1317. EXPECT_EQ(FilesPerLevel(), "5");
  1318. // This file does not overlap with the current running compactiong
  1319. ASSERT_OK(GenerateAndAddExternalFile(options, {9000, 9001}, -1, false, false,
  1320. true, false, false, &true_data));
  1321. EXPECT_EQ(FilesPerLevel(), "5,0,0,1");
  1322. // Hold compaction from finishing
  1323. TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelDynamic:2");
  1324. // Output of the compaction will go to L3
  1325. dbfull()->TEST_WaitForCompact();
  1326. EXPECT_EQ(FilesPerLevel(), "1,0,0,2");
  1327. Close();
  1328. options.disable_auto_compactions = true;
  1329. Reopen(options);
  1330. ASSERT_OK(GenerateAndAddExternalFile(options, {1, 15, 19}, -1, false, false,
  1331. true, false, false, &true_data));
  1332. ASSERT_EQ(FilesPerLevel(), "1,0,0,3");
  1333. ASSERT_OK(GenerateAndAddExternalFile(options, {1000, 1001, 1002}, -1, false,
  1334. false, true, false, false, &true_data));
  1335. ASSERT_EQ(FilesPerLevel(), "1,0,0,4");
  1336. ASSERT_OK(GenerateAndAddExternalFile(options, {500, 600, 700}, -1, false,
  1337. false, true, false, false, &true_data));
  1338. ASSERT_EQ(FilesPerLevel(), "1,0,0,5");
  1339. // File 5 overlaps with file 2 (L3 / base level)
  1340. ASSERT_OK(GenerateAndAddExternalFile(options, {2, 10}, -1, false, false, true,
  1341. false, false, &true_data));
  1342. ASSERT_EQ(FilesPerLevel(), "2,0,0,5");
  1343. // File 6 overlaps with file 2 (L3 / base level) and file 5 (L0)
  1344. ASSERT_OK(GenerateAndAddExternalFile(options, {3, 9}, -1, false, false, true,
  1345. false, false, &true_data));
  1346. ASSERT_EQ(FilesPerLevel(), "3,0,0,5");
  1347. // Verify data in files
  1348. size_t kcnt = 0;
  1349. VerifyDBFromMap(true_data, &kcnt, false);
  1350. // Write range [5 => 10] to L0
  1351. for (int i = 5; i <= 10; i++) {
  1352. std::string k = Key(i);
  1353. std::string v = k + "put";
  1354. ASSERT_OK(Put(k, v));
  1355. true_data[k] = v;
  1356. }
  1357. ASSERT_OK(Flush());
  1358. ASSERT_EQ(FilesPerLevel(), "4,0,0,5");
  1359. // File 7 overlaps with file 4 (L3)
  1360. ASSERT_OK(GenerateAndAddExternalFile(options, {650, 651, 652}, -1, false,
  1361. false, true, false, false, &true_data));
  1362. ASSERT_EQ(FilesPerLevel(), "5,0,0,5");
  1363. VerifyDBFromMap(true_data, &kcnt, false);
  1364. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  1365. }
  1366. TEST_F(ExternalSSTFileTest, AddExternalSstFileWithCustomCompartor) {
  1367. Options options = CurrentOptions();
  1368. options.comparator = ReverseBytewiseComparator();
  1369. DestroyAndReopen(options);
  1370. SstFileWriter sst_file_writer(EnvOptions(), options);
  1371. // Generate files with these key ranges
  1372. // {14 -> 0}
  1373. // {24 -> 10}
  1374. // {34 -> 20}
  1375. // {44 -> 30}
  1376. // ..
  1377. std::vector<std::string> generated_files;
  1378. for (int i = 0; i < 10; i++) {
  1379. std::string file_name = sst_files_dir_ + env_->GenerateUniqueId();
  1380. ASSERT_OK(sst_file_writer.Open(file_name));
  1381. int range_end = i * 10;
  1382. int range_start = range_end + 15;
  1383. for (int k = (range_start - 1); k >= range_end; k--) {
  1384. ASSERT_OK(sst_file_writer.Put(Key(k), Key(k)));
  1385. }
  1386. ExternalSstFileInfo file_info;
  1387. ASSERT_OK(sst_file_writer.Finish(&file_info));
  1388. generated_files.push_back(file_name);
  1389. }
  1390. std::vector<std::string> in_files;
  1391. // These 2nd and 3rd files overlap with each other
  1392. in_files = {generated_files[0], generated_files[4], generated_files[5],
  1393. generated_files[7]};
  1394. ASSERT_NOK(DeprecatedAddFile(in_files));
  1395. // These 2 files dont overlap with each other
  1396. in_files = {generated_files[0], generated_files[2]};
  1397. ASSERT_OK(DeprecatedAddFile(in_files));
  1398. // These 2 files dont overlap with each other but overlap with keys in DB
  1399. in_files = {generated_files[3], generated_files[7]};
  1400. ASSERT_NOK(DeprecatedAddFile(in_files));
  1401. // Files dont overlap and dont overlap with DB key range
  1402. in_files = {generated_files[4], generated_files[6], generated_files[8]};
  1403. ASSERT_OK(DeprecatedAddFile(in_files));
  1404. for (int i = 0; i < 100; i++) {
  1405. if (i % 20 <= 14) {
  1406. ASSERT_EQ(Get(Key(i)), Key(i));
  1407. } else {
  1408. ASSERT_EQ(Get(Key(i)), "NOT_FOUND");
  1409. }
  1410. }
  1411. }
  1412. TEST_F(ExternalSSTFileTest, AddFileTrivialMoveBug) {
  1413. Options options = CurrentOptions();
  1414. options.num_levels = 3;
  1415. options.IncreaseParallelism(20);
  1416. DestroyAndReopen(options);
  1417. ASSERT_OK(GenerateAndAddExternalFile(options, {1, 4}, 1)); // L3
  1418. ASSERT_OK(GenerateAndAddExternalFile(options, {2, 3}, 2)); // L2
  1419. ASSERT_OK(GenerateAndAddExternalFile(options, {10, 14}, 3)); // L3
  1420. ASSERT_OK(GenerateAndAddExternalFile(options, {12, 13}, 4)); // L2
  1421. ASSERT_OK(GenerateAndAddExternalFile(options, {20, 24}, 5)); // L3
  1422. ASSERT_OK(GenerateAndAddExternalFile(options, {22, 23}, 6)); // L2
  1423. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  1424. "CompactionJob::Run():Start", [&](void* /*arg*/) {
  1425. // fit in L3 but will overlap with compaction so will be added
  1426. // to L2 but a compaction will trivially move it to L3
  1427. // and break LSM consistency
  1428. static std::atomic<bool> called = {false};
  1429. if (!called) {
  1430. called = true;
  1431. ASSERT_OK(dbfull()->SetOptions({{"max_bytes_for_level_base", "1"}}));
  1432. ASSERT_OK(GenerateAndAddExternalFile(options, {15, 16}, 7));
  1433. }
  1434. });
  1435. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  1436. CompactRangeOptions cro;
  1437. cro.exclusive_manual_compaction = false;
  1438. ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
  1439. dbfull()->TEST_WaitForCompact();
  1440. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  1441. }
  1442. TEST_F(ExternalSSTFileTest, CompactAddedFiles) {
  1443. Options options = CurrentOptions();
  1444. options.num_levels = 3;
  1445. DestroyAndReopen(options);
  1446. ASSERT_OK(GenerateAndAddExternalFile(options, {1, 10}, 1)); // L3
  1447. ASSERT_OK(GenerateAndAddExternalFile(options, {2, 9}, 2)); // L2
  1448. ASSERT_OK(GenerateAndAddExternalFile(options, {3, 8}, 3)); // L1
  1449. ASSERT_OK(GenerateAndAddExternalFile(options, {4, 7}, 4)); // L0
  1450. ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  1451. }
  1452. TEST_F(ExternalSSTFileTest, SstFileWriterNonSharedKeys) {
  1453. Options options = CurrentOptions();
  1454. DestroyAndReopen(options);
  1455. std::string file_path = sst_files_dir_ + "/not_shared";
  1456. SstFileWriter sst_file_writer(EnvOptions(), options);
  1457. std::string suffix(100, 'X');
  1458. ASSERT_OK(sst_file_writer.Open(file_path));
  1459. ASSERT_OK(sst_file_writer.Put("A" + suffix, "VAL"));
  1460. ASSERT_OK(sst_file_writer.Put("BB" + suffix, "VAL"));
  1461. ASSERT_OK(sst_file_writer.Put("CC" + suffix, "VAL"));
  1462. ASSERT_OK(sst_file_writer.Put("CXD" + suffix, "VAL"));
  1463. ASSERT_OK(sst_file_writer.Put("CZZZ" + suffix, "VAL"));
  1464. ASSERT_OK(sst_file_writer.Put("ZAAAX" + suffix, "VAL"));
  1465. ASSERT_OK(sst_file_writer.Finish());
  1466. ASSERT_OK(DeprecatedAddFile({file_path}));
  1467. }
  1468. TEST_F(ExternalSSTFileTest, WithUnorderedWrite) {
  1469. SyncPoint::GetInstance()->DisableProcessing();
  1470. SyncPoint::GetInstance()->LoadDependency(
  1471. {{"DBImpl::WriteImpl:UnorderedWriteAfterWriteWAL",
  1472. "ExternalSSTFileTest::WithUnorderedWrite:WaitWriteWAL"},
  1473. {"DBImpl::WaitForPendingWrites:BeforeBlock",
  1474. "DBImpl::WriteImpl:BeforeUnorderedWriteMemtable"}});
  1475. SyncPoint::GetInstance()->SetCallBack(
  1476. "DBImpl::IngestExternalFile:NeedFlush", [&](void* need_flush) {
  1477. ASSERT_TRUE(*reinterpret_cast<bool*>(need_flush));
  1478. });
  1479. Options options = CurrentOptions();
  1480. options.unordered_write = true;
  1481. DestroyAndReopen(options);
  1482. Put("foo", "v1");
  1483. SyncPoint::GetInstance()->EnableProcessing();
  1484. port::Thread writer([&]() { Put("bar", "v2"); });
  1485. TEST_SYNC_POINT("ExternalSSTFileTest::WithUnorderedWrite:WaitWriteWAL");
  1486. ASSERT_OK(GenerateAndAddExternalFile(options, {{"bar", "v3"}}, -1,
  1487. true /* allow_global_seqno */));
  1488. ASSERT_EQ(Get("bar"), "v3");
  1489. writer.join();
  1490. SyncPoint::GetInstance()->DisableProcessing();
  1491. SyncPoint::GetInstance()->ClearAllCallBacks();
  1492. }
  1493. TEST_P(ExternalSSTFileTest, IngestFileWithGlobalSeqnoRandomized) {
  1494. Options options = CurrentOptions();
  1495. options.IncreaseParallelism(20);
  1496. options.level0_slowdown_writes_trigger = 256;
  1497. options.level0_stop_writes_trigger = 256;
  1498. bool write_global_seqno = std::get<0>(GetParam());
  1499. bool verify_checksums_before_ingest = std::get<1>(GetParam());
  1500. for (int iter = 0; iter < 2; iter++) {
  1501. bool write_to_memtable = (iter == 0);
  1502. DestroyAndReopen(options);
  1503. Random rnd(301);
  1504. std::map<std::string, std::string> true_data;
  1505. for (int i = 0; i < 500; i++) {
  1506. std::vector<std::pair<std::string, std::string>> random_data;
  1507. for (int j = 0; j < 100; j++) {
  1508. std::string k;
  1509. std::string v;
  1510. test::RandomString(&rnd, rnd.Next() % 20, &k);
  1511. test::RandomString(&rnd, rnd.Next() % 50, &v);
  1512. random_data.emplace_back(k, v);
  1513. }
  1514. if (write_to_memtable && rnd.OneIn(4)) {
  1515. // 25% of writes go through memtable
  1516. for (auto& entry : random_data) {
  1517. ASSERT_OK(Put(entry.first, entry.second));
  1518. true_data[entry.first] = entry.second;
  1519. }
  1520. } else {
  1521. ASSERT_OK(GenerateAndAddExternalFile(
  1522. options, random_data, -1, true, write_global_seqno,
  1523. verify_checksums_before_ingest, false, true, &true_data));
  1524. }
  1525. }
  1526. size_t kcnt = 0;
  1527. VerifyDBFromMap(true_data, &kcnt, false);
  1528. db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
  1529. VerifyDBFromMap(true_data, &kcnt, false);
  1530. }
  1531. }
  1532. TEST_P(ExternalSSTFileTest, IngestFileWithGlobalSeqnoAssignedLevel) {
  1533. Options options = CurrentOptions();
  1534. options.num_levels = 5;
  1535. options.disable_auto_compactions = true;
  1536. DestroyAndReopen(options);
  1537. std::vector<std::pair<std::string, std::string>> file_data;
  1538. std::map<std::string, std::string> true_data;
  1539. // Insert 100 -> 200 into the memtable
  1540. for (int i = 100; i <= 200; i++) {
  1541. ASSERT_OK(Put(Key(i), "memtable"));
  1542. true_data[Key(i)] = "memtable";
  1543. }
  1544. // Insert 0 -> 20 using AddFile
  1545. file_data.clear();
  1546. for (int i = 0; i <= 20; i++) {
  1547. file_data.emplace_back(Key(i), "L4");
  1548. }
  1549. bool write_global_seqno = std::get<0>(GetParam());
  1550. bool verify_checksums_before_ingest = std::get<1>(GetParam());
  1551. ASSERT_OK(GenerateAndAddExternalFile(
  1552. options, file_data, -1, true, write_global_seqno,
  1553. verify_checksums_before_ingest, false, false, &true_data));
  1554. // This file dont overlap with anything in the DB, will go to L4
  1555. ASSERT_EQ("0,0,0,0,1", FilesPerLevel());
  1556. // Insert 80 -> 130 using AddFile
  1557. file_data.clear();
  1558. for (int i = 80; i <= 130; i++) {
  1559. file_data.emplace_back(Key(i), "L0");
  1560. }
  1561. ASSERT_OK(GenerateAndAddExternalFile(
  1562. options, file_data, -1, true, write_global_seqno,
  1563. verify_checksums_before_ingest, false, false, &true_data));
  1564. // This file overlap with the memtable, so it will flush it and add
  1565. // it self to L0
  1566. ASSERT_EQ("2,0,0,0,1", FilesPerLevel());
  1567. // Insert 30 -> 50 using AddFile
  1568. file_data.clear();
  1569. for (int i = 30; i <= 50; i++) {
  1570. file_data.emplace_back(Key(i), "L4");
  1571. }
  1572. ASSERT_OK(GenerateAndAddExternalFile(
  1573. options, file_data, -1, true, write_global_seqno,
  1574. verify_checksums_before_ingest, false, false, &true_data));
  1575. // This file dont overlap with anything in the DB and fit in L4 as well
  1576. ASSERT_EQ("2,0,0,0,2", FilesPerLevel());
  1577. // Insert 10 -> 40 using AddFile
  1578. file_data.clear();
  1579. for (int i = 10; i <= 40; i++) {
  1580. file_data.emplace_back(Key(i), "L3");
  1581. }
  1582. ASSERT_OK(GenerateAndAddExternalFile(
  1583. options, file_data, -1, true, write_global_seqno,
  1584. verify_checksums_before_ingest, false, false, &true_data));
  1585. // This file overlap with files in L4, we will ingest it in L3
  1586. ASSERT_EQ("2,0,0,1,2", FilesPerLevel());
  1587. size_t kcnt = 0;
  1588. VerifyDBFromMap(true_data, &kcnt, false);
  1589. }
  1590. TEST_P(ExternalSSTFileTest, IngestFileWithGlobalSeqnoMemtableFlush) {
  1591. Options options = CurrentOptions();
  1592. DestroyAndReopen(options);
  1593. uint64_t entries_in_memtable;
  1594. std::map<std::string, std::string> true_data;
  1595. for (int k : {10, 20, 40, 80}) {
  1596. ASSERT_OK(Put(Key(k), "memtable"));
  1597. true_data[Key(k)] = "memtable";
  1598. }
  1599. db_->GetIntProperty(DB::Properties::kNumEntriesActiveMemTable,
  1600. &entries_in_memtable);
  1601. ASSERT_GE(entries_in_memtable, 1);
  1602. bool write_global_seqno = std::get<0>(GetParam());
  1603. bool verify_checksums_before_ingest = std::get<1>(GetParam());
  1604. // No need for flush
  1605. ASSERT_OK(GenerateAndAddExternalFile(
  1606. options, {90, 100, 110}, -1, true, write_global_seqno,
  1607. verify_checksums_before_ingest, false, false, &true_data));
  1608. db_->GetIntProperty(DB::Properties::kNumEntriesActiveMemTable,
  1609. &entries_in_memtable);
  1610. ASSERT_GE(entries_in_memtable, 1);
  1611. // This file will flush the memtable
  1612. ASSERT_OK(GenerateAndAddExternalFile(
  1613. options, {19, 20, 21}, -1, true, write_global_seqno,
  1614. verify_checksums_before_ingest, false, false, &true_data));
  1615. db_->GetIntProperty(DB::Properties::kNumEntriesActiveMemTable,
  1616. &entries_in_memtable);
  1617. ASSERT_EQ(entries_in_memtable, 0);
  1618. for (int k : {200, 201, 205, 206}) {
  1619. ASSERT_OK(Put(Key(k), "memtable"));
  1620. true_data[Key(k)] = "memtable";
  1621. }
  1622. db_->GetIntProperty(DB::Properties::kNumEntriesActiveMemTable,
  1623. &entries_in_memtable);
  1624. ASSERT_GE(entries_in_memtable, 1);
  1625. // No need for flush, this file keys fit between the memtable keys
  1626. ASSERT_OK(GenerateAndAddExternalFile(
  1627. options, {202, 203, 204}, -1, true, write_global_seqno,
  1628. verify_checksums_before_ingest, false, false, &true_data));
  1629. db_->GetIntProperty(DB::Properties::kNumEntriesActiveMemTable,
  1630. &entries_in_memtable);
  1631. ASSERT_GE(entries_in_memtable, 1);
  1632. // This file will flush the memtable
  1633. ASSERT_OK(GenerateAndAddExternalFile(
  1634. options, {206, 207}, -1, true, write_global_seqno,
  1635. verify_checksums_before_ingest, false, false, &true_data));
  1636. db_->GetIntProperty(DB::Properties::kNumEntriesActiveMemTable,
  1637. &entries_in_memtable);
  1638. ASSERT_EQ(entries_in_memtable, 0);
  1639. size_t kcnt = 0;
  1640. VerifyDBFromMap(true_data, &kcnt, false);
  1641. }
  1642. TEST_P(ExternalSSTFileTest, L0SortingIssue) {
  1643. Options options = CurrentOptions();
  1644. options.num_levels = 2;
  1645. DestroyAndReopen(options);
  1646. std::map<std::string, std::string> true_data;
  1647. ASSERT_OK(Put(Key(1), "memtable"));
  1648. ASSERT_OK(Put(Key(10), "memtable"));
  1649. bool write_global_seqno = std::get<0>(GetParam());
  1650. bool verify_checksums_before_ingest = std::get<1>(GetParam());
  1651. // No Flush needed, No global seqno needed, Ingest in L1
  1652. ASSERT_OK(
  1653. GenerateAndAddExternalFile(options, {7, 8}, -1, true, write_global_seqno,
  1654. verify_checksums_before_ingest, false, false));
  1655. // No Flush needed, but need a global seqno, Ingest in L0
  1656. ASSERT_OK(
  1657. GenerateAndAddExternalFile(options, {7, 8}, -1, true, write_global_seqno,
  1658. verify_checksums_before_ingest, false, false));
  1659. printf("%s\n", FilesPerLevel().c_str());
  1660. // Overwrite what we added using external files
  1661. ASSERT_OK(Put(Key(7), "memtable"));
  1662. ASSERT_OK(Put(Key(8), "memtable"));
  1663. // Read values from memtable
  1664. ASSERT_EQ(Get(Key(7)), "memtable");
  1665. ASSERT_EQ(Get(Key(8)), "memtable");
  1666. // Flush and read from L0
  1667. ASSERT_OK(Flush());
  1668. printf("%s\n", FilesPerLevel().c_str());
  1669. ASSERT_EQ(Get(Key(7)), "memtable");
  1670. ASSERT_EQ(Get(Key(8)), "memtable");
  1671. }
  1672. TEST_F(ExternalSSTFileTest, CompactionDeadlock) {
  1673. Options options = CurrentOptions();
  1674. options.num_levels = 2;
  1675. options.level0_file_num_compaction_trigger = 4;
  1676. options.level0_slowdown_writes_trigger = 4;
  1677. options.level0_stop_writes_trigger = 4;
  1678. DestroyAndReopen(options);
  1679. // atomic conter of currently running bg threads
  1680. std::atomic<int> running_threads(0);
  1681. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
  1682. {"DBImpl::DelayWrite:Wait", "ExternalSSTFileTest::DeadLock:0"},
  1683. {"ExternalSSTFileTest::DeadLock:1", "DBImpl::AddFile:Start"},
  1684. {"DBImpl::AddFile:MutexLock", "ExternalSSTFileTest::DeadLock:2"},
  1685. {"ExternalSSTFileTest::DeadLock:3", "BackgroundCallCompaction:0"},
  1686. });
  1687. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  1688. // Start ingesting and extrnal file in the background
  1689. ROCKSDB_NAMESPACE::port::Thread bg_ingest_file([&]() {
  1690. running_threads += 1;
  1691. ASSERT_OK(GenerateAndAddExternalFile(options, {5, 6}));
  1692. running_threads -= 1;
  1693. });
  1694. ASSERT_OK(Put(Key(1), "memtable"));
  1695. ASSERT_OK(Flush());
  1696. ASSERT_OK(Put(Key(2), "memtable"));
  1697. ASSERT_OK(Flush());
  1698. ASSERT_OK(Put(Key(3), "memtable"));
  1699. ASSERT_OK(Flush());
  1700. ASSERT_OK(Put(Key(4), "memtable"));
  1701. ASSERT_OK(Flush());
  1702. // This thread will try to insert into the memtable but since we have 4 L0
  1703. // files this thread will be blocked and hold the writer thread
  1704. ROCKSDB_NAMESPACE::port::Thread bg_block_put([&]() {
  1705. running_threads += 1;
  1706. ASSERT_OK(Put(Key(10), "memtable"));
  1707. running_threads -= 1;
  1708. });
  1709. // Make sure DelayWrite is called first
  1710. TEST_SYNC_POINT("ExternalSSTFileTest::DeadLock:0");
  1711. // `DBImpl::AddFile:Start` will wait until we be here
  1712. TEST_SYNC_POINT("ExternalSSTFileTest::DeadLock:1");
  1713. // Wait for IngestExternalFile() to start and aquire mutex
  1714. TEST_SYNC_POINT("ExternalSSTFileTest::DeadLock:2");
  1715. // Now let compaction start
  1716. TEST_SYNC_POINT("ExternalSSTFileTest::DeadLock:3");
  1717. // Wait for max 5 seconds, if we did not finish all bg threads
  1718. // then we hit the deadlock bug
  1719. for (int i = 0; i < 10; i++) {
  1720. if (running_threads.load() == 0) {
  1721. break;
  1722. }
  1723. env_->SleepForMicroseconds(500000);
  1724. }
  1725. ASSERT_EQ(running_threads.load(), 0);
  1726. bg_ingest_file.join();
  1727. bg_block_put.join();
  1728. }
  1729. TEST_F(ExternalSSTFileTest, DirtyExit) {
  1730. Options options = CurrentOptions();
  1731. DestroyAndReopen(options);
  1732. std::string file_path = sst_files_dir_ + "/dirty_exit";
  1733. std::unique_ptr<SstFileWriter> sst_file_writer;
  1734. // Destruct SstFileWriter without calling Finish()
  1735. sst_file_writer.reset(new SstFileWriter(EnvOptions(), options));
  1736. ASSERT_OK(sst_file_writer->Open(file_path));
  1737. sst_file_writer.reset();
  1738. // Destruct SstFileWriter with a failing Finish
  1739. sst_file_writer.reset(new SstFileWriter(EnvOptions(), options));
  1740. ASSERT_OK(sst_file_writer->Open(file_path));
  1741. ASSERT_NOK(sst_file_writer->Finish());
  1742. }
  1743. TEST_F(ExternalSSTFileTest, FileWithCFInfo) {
  1744. Options options = CurrentOptions();
  1745. CreateAndReopenWithCF({"koko", "toto"}, options);
  1746. SstFileWriter sfw_default(EnvOptions(), options, handles_[0]);
  1747. SstFileWriter sfw_cf1(EnvOptions(), options, handles_[1]);
  1748. SstFileWriter sfw_cf2(EnvOptions(), options, handles_[2]);
  1749. SstFileWriter sfw_unknown(EnvOptions(), options);
  1750. // default_cf.sst
  1751. const std::string cf_default_sst = sst_files_dir_ + "/default_cf.sst";
  1752. ASSERT_OK(sfw_default.Open(cf_default_sst));
  1753. ASSERT_OK(sfw_default.Put("K1", "V1"));
  1754. ASSERT_OK(sfw_default.Put("K2", "V2"));
  1755. ASSERT_OK(sfw_default.Finish());
  1756. // cf1.sst
  1757. const std::string cf1_sst = sst_files_dir_ + "/cf1.sst";
  1758. ASSERT_OK(sfw_cf1.Open(cf1_sst));
  1759. ASSERT_OK(sfw_cf1.Put("K3", "V1"));
  1760. ASSERT_OK(sfw_cf1.Put("K4", "V2"));
  1761. ASSERT_OK(sfw_cf1.Finish());
  1762. // cf_unknown.sst
  1763. const std::string unknown_sst = sst_files_dir_ + "/cf_unknown.sst";
  1764. ASSERT_OK(sfw_unknown.Open(unknown_sst));
  1765. ASSERT_OK(sfw_unknown.Put("K5", "V1"));
  1766. ASSERT_OK(sfw_unknown.Put("K6", "V2"));
  1767. ASSERT_OK(sfw_unknown.Finish());
  1768. IngestExternalFileOptions ifo;
  1769. // SST CF dont match
  1770. ASSERT_NOK(db_->IngestExternalFile(handles_[0], {cf1_sst}, ifo));
  1771. // SST CF dont match
  1772. ASSERT_NOK(db_->IngestExternalFile(handles_[2], {cf1_sst}, ifo));
  1773. // SST CF match
  1774. ASSERT_OK(db_->IngestExternalFile(handles_[1], {cf1_sst}, ifo));
  1775. // SST CF dont match
  1776. ASSERT_NOK(db_->IngestExternalFile(handles_[1], {cf_default_sst}, ifo));
  1777. // SST CF dont match
  1778. ASSERT_NOK(db_->IngestExternalFile(handles_[2], {cf_default_sst}, ifo));
  1779. // SST CF match
  1780. ASSERT_OK(db_->IngestExternalFile(handles_[0], {cf_default_sst}, ifo));
  1781. // SST CF unknown
  1782. ASSERT_OK(db_->IngestExternalFile(handles_[1], {unknown_sst}, ifo));
  1783. // SST CF unknown
  1784. ASSERT_OK(db_->IngestExternalFile(handles_[2], {unknown_sst}, ifo));
  1785. // SST CF unknown
  1786. ASSERT_OK(db_->IngestExternalFile(handles_[0], {unknown_sst}, ifo));
  1787. // Cannot ingest a file into a dropped CF
  1788. ASSERT_OK(db_->DropColumnFamily(handles_[1]));
  1789. ASSERT_NOK(db_->IngestExternalFile(handles_[1], {unknown_sst}, ifo));
  1790. // CF was not dropped, ok to Ingest
  1791. ASSERT_OK(db_->IngestExternalFile(handles_[2], {unknown_sst}, ifo));
  1792. }
  1793. /*
  1794. * Test and verify the functionality of ingestion_options.move_files and
  1795. * ingestion_options.failed_move_fall_back_to_copy
  1796. */
  1797. TEST_P(ExternSSTFileLinkFailFallbackTest, LinkFailFallBackExternalSst) {
  1798. const bool fail_link = std::get<0>(GetParam());
  1799. const bool failed_move_fall_back_to_copy = std::get<1>(GetParam());
  1800. test_env_->set_fail_link(fail_link);
  1801. const EnvOptions env_options;
  1802. DestroyAndReopen(options_);
  1803. const int kNumKeys = 10000;
  1804. IngestExternalFileOptions ifo;
  1805. ifo.move_files = true;
  1806. ifo.failed_move_fall_back_to_copy = failed_move_fall_back_to_copy;
  1807. std::string file_path = sst_files_dir_ + "file1.sst";
  1808. // Create SstFileWriter for default column family
  1809. SstFileWriter sst_file_writer(env_options, options_);
  1810. ASSERT_OK(sst_file_writer.Open(file_path));
  1811. for (int i = 0; i < kNumKeys; i++) {
  1812. ASSERT_OK(sst_file_writer.Put(Key(i), Key(i) + "_value"));
  1813. }
  1814. ASSERT_OK(sst_file_writer.Finish());
  1815. uint64_t file_size = 0;
  1816. ASSERT_OK(env_->GetFileSize(file_path, &file_size));
  1817. bool copyfile = false;
  1818. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  1819. "ExternalSstFileIngestionJob::Prepare:CopyFile",
  1820. [&](void* /* arg */) { copyfile = true; });
  1821. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  1822. const Status s = db_->IngestExternalFile({file_path}, ifo);
  1823. ColumnFamilyHandleImpl* cfh =
  1824. static_cast<ColumnFamilyHandleImpl*>(dbfull()->DefaultColumnFamily());
  1825. ColumnFamilyData* cfd = cfh->cfd();
  1826. const InternalStats* internal_stats_ptr = cfd->internal_stats();
  1827. const std::vector<InternalStats::CompactionStats>& comp_stats =
  1828. internal_stats_ptr->TEST_GetCompactionStats();
  1829. uint64_t bytes_copied = 0;
  1830. uint64_t bytes_moved = 0;
  1831. for (const auto& stats : comp_stats) {
  1832. bytes_copied += stats.bytes_written;
  1833. bytes_moved += stats.bytes_moved;
  1834. }
  1835. if (!fail_link) {
  1836. // Link operation succeeds. External SST should be moved.
  1837. ASSERT_OK(s);
  1838. ASSERT_EQ(0, bytes_copied);
  1839. ASSERT_EQ(file_size, bytes_moved);
  1840. ASSERT_FALSE(copyfile);
  1841. } else {
  1842. // Link operation fails.
  1843. ASSERT_EQ(0, bytes_moved);
  1844. if (failed_move_fall_back_to_copy) {
  1845. ASSERT_OK(s);
  1846. // Copy file is true since a failed link falls back to copy file.
  1847. ASSERT_TRUE(copyfile);
  1848. ASSERT_EQ(file_size, bytes_copied);
  1849. } else {
  1850. ASSERT_TRUE(s.IsNotSupported());
  1851. // Copy file is false since a failed link does not fall back to copy file.
  1852. ASSERT_FALSE(copyfile);
  1853. ASSERT_EQ(0, bytes_copied);
  1854. }
  1855. }
  1856. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  1857. }
  1858. class TestIngestExternalFileListener : public EventListener {
  1859. public:
  1860. void OnExternalFileIngested(DB* /*db*/,
  1861. const ExternalFileIngestionInfo& info) override {
  1862. ingested_files.push_back(info);
  1863. }
  1864. std::vector<ExternalFileIngestionInfo> ingested_files;
  1865. };
  1866. TEST_P(ExternalSSTFileTest, IngestionListener) {
  1867. Options options = CurrentOptions();
  1868. TestIngestExternalFileListener* listener =
  1869. new TestIngestExternalFileListener();
  1870. options.listeners.emplace_back(listener);
  1871. CreateAndReopenWithCF({"koko", "toto"}, options);
  1872. bool write_global_seqno = std::get<0>(GetParam());
  1873. bool verify_checksums_before_ingest = std::get<1>(GetParam());
  1874. // Ingest into default cf
  1875. ASSERT_OK(GenerateAndAddExternalFile(
  1876. options, {1, 2}, -1, true, write_global_seqno,
  1877. verify_checksums_before_ingest, false, true, nullptr, handles_[0]));
  1878. ASSERT_EQ(listener->ingested_files.size(), 1);
  1879. ASSERT_EQ(listener->ingested_files.back().cf_name, "default");
  1880. ASSERT_EQ(listener->ingested_files.back().global_seqno, 0);
  1881. ASSERT_EQ(listener->ingested_files.back().table_properties.column_family_id,
  1882. 0);
  1883. ASSERT_EQ(listener->ingested_files.back().table_properties.column_family_name,
  1884. "default");
  1885. // Ingest into cf1
  1886. ASSERT_OK(GenerateAndAddExternalFile(
  1887. options, {1, 2}, -1, true, write_global_seqno,
  1888. verify_checksums_before_ingest, false, true, nullptr, handles_[1]));
  1889. ASSERT_EQ(listener->ingested_files.size(), 2);
  1890. ASSERT_EQ(listener->ingested_files.back().cf_name, "koko");
  1891. ASSERT_EQ(listener->ingested_files.back().global_seqno, 0);
  1892. ASSERT_EQ(listener->ingested_files.back().table_properties.column_family_id,
  1893. 1);
  1894. ASSERT_EQ(listener->ingested_files.back().table_properties.column_family_name,
  1895. "koko");
  1896. // Ingest into cf2
  1897. ASSERT_OK(GenerateAndAddExternalFile(
  1898. options, {1, 2}, -1, true, write_global_seqno,
  1899. verify_checksums_before_ingest, false, true, nullptr, handles_[2]));
  1900. ASSERT_EQ(listener->ingested_files.size(), 3);
  1901. ASSERT_EQ(listener->ingested_files.back().cf_name, "toto");
  1902. ASSERT_EQ(listener->ingested_files.back().global_seqno, 0);
  1903. ASSERT_EQ(listener->ingested_files.back().table_properties.column_family_id,
  1904. 2);
  1905. ASSERT_EQ(listener->ingested_files.back().table_properties.column_family_name,
  1906. "toto");
  1907. }
  1908. TEST_F(ExternalSSTFileTest, SnapshotInconsistencyBug) {
  1909. Options options = CurrentOptions();
  1910. DestroyAndReopen(options);
  1911. const int kNumKeys = 10000;
  1912. // Insert keys using normal path and take a snapshot
  1913. for (int i = 0; i < kNumKeys; i++) {
  1914. ASSERT_OK(Put(Key(i), Key(i) + "_V1"));
  1915. }
  1916. const Snapshot* snap = db_->GetSnapshot();
  1917. // Overwrite all keys using IngestExternalFile
  1918. std::string sst_file_path = sst_files_dir_ + "file1.sst";
  1919. SstFileWriter sst_file_writer(EnvOptions(), options);
  1920. ASSERT_OK(sst_file_writer.Open(sst_file_path));
  1921. for (int i = 0; i < kNumKeys; i++) {
  1922. ASSERT_OK(sst_file_writer.Put(Key(i), Key(i) + "_V2"));
  1923. }
  1924. ASSERT_OK(sst_file_writer.Finish());
  1925. IngestExternalFileOptions ifo;
  1926. ifo.move_files = true;
  1927. ASSERT_OK(db_->IngestExternalFile({sst_file_path}, ifo));
  1928. for (int i = 0; i < kNumKeys; i++) {
  1929. ASSERT_EQ(Get(Key(i), snap), Key(i) + "_V1");
  1930. ASSERT_EQ(Get(Key(i)), Key(i) + "_V2");
  1931. }
  1932. db_->ReleaseSnapshot(snap);
  1933. }
  1934. TEST_P(ExternalSSTFileTest, IngestBehind) {
  1935. Options options = CurrentOptions();
  1936. options.compaction_style = kCompactionStyleUniversal;
  1937. options.num_levels = 3;
  1938. options.disable_auto_compactions = false;
  1939. DestroyAndReopen(options);
  1940. std::vector<std::pair<std::string, std::string>> file_data;
  1941. std::map<std::string, std::string> true_data;
  1942. // Insert 100 -> 200 into the memtable
  1943. for (int i = 100; i <= 200; i++) {
  1944. ASSERT_OK(Put(Key(i), "memtable"));
  1945. true_data[Key(i)] = "memtable";
  1946. }
  1947. // Insert 100 -> 200 using IngestExternalFile
  1948. file_data.clear();
  1949. for (int i = 0; i <= 20; i++) {
  1950. file_data.emplace_back(Key(i), "ingest_behind");
  1951. }
  1952. bool allow_global_seqno = true;
  1953. bool ingest_behind = true;
  1954. bool write_global_seqno = std::get<0>(GetParam());
  1955. bool verify_checksums_before_ingest = std::get<1>(GetParam());
  1956. // Can't ingest behind since allow_ingest_behind isn't set to true
  1957. ASSERT_NOK(GenerateAndAddExternalFile(
  1958. options, file_data, -1, allow_global_seqno, write_global_seqno,
  1959. verify_checksums_before_ingest, ingest_behind, false /*sort_data*/,
  1960. &true_data));
  1961. options.allow_ingest_behind = true;
  1962. // check that we still can open the DB, as num_levels should be
  1963. // sanitized to 3
  1964. options.num_levels = 2;
  1965. DestroyAndReopen(options);
  1966. options.num_levels = 3;
  1967. DestroyAndReopen(options);
  1968. // Insert 100 -> 200 into the memtable
  1969. for (int i = 100; i <= 200; i++) {
  1970. ASSERT_OK(Put(Key(i), "memtable"));
  1971. true_data[Key(i)] = "memtable";
  1972. }
  1973. db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
  1974. // Universal picker should go at second from the bottom level
  1975. ASSERT_EQ("0,1", FilesPerLevel());
  1976. ASSERT_OK(GenerateAndAddExternalFile(
  1977. options, file_data, -1, allow_global_seqno, write_global_seqno,
  1978. verify_checksums_before_ingest, true /*ingest_behind*/,
  1979. false /*sort_data*/, &true_data));
  1980. ASSERT_EQ("0,1,1", FilesPerLevel());
  1981. // this time ingest should fail as the file doesn't fit to the bottom level
  1982. ASSERT_NOK(GenerateAndAddExternalFile(
  1983. options, file_data, -1, allow_global_seqno, write_global_seqno,
  1984. verify_checksums_before_ingest, true /*ingest_behind*/,
  1985. false /*sort_data*/, &true_data));
  1986. ASSERT_EQ("0,1,1", FilesPerLevel());
  1987. db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
  1988. // bottom level should be empty
  1989. ASSERT_EQ("0,1", FilesPerLevel());
  1990. size_t kcnt = 0;
  1991. VerifyDBFromMap(true_data, &kcnt, false);
  1992. }
  1993. TEST_F(ExternalSSTFileTest, SkipBloomFilter) {
  1994. Options options = CurrentOptions();
  1995. BlockBasedTableOptions table_options;
  1996. table_options.filter_policy.reset(NewBloomFilterPolicy(10));
  1997. table_options.cache_index_and_filter_blocks = true;
  1998. options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  1999. // Create external SST file and include bloom filters
  2000. options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
  2001. DestroyAndReopen(options);
  2002. {
  2003. std::string file_path = sst_files_dir_ + "sst_with_bloom.sst";
  2004. SstFileWriter sst_file_writer(EnvOptions(), options);
  2005. ASSERT_OK(sst_file_writer.Open(file_path));
  2006. ASSERT_OK(sst_file_writer.Put("Key1", "Value1"));
  2007. ASSERT_OK(sst_file_writer.Finish());
  2008. ASSERT_OK(
  2009. db_->IngestExternalFile({file_path}, IngestExternalFileOptions()));
  2010. ASSERT_EQ(Get("Key1"), "Value1");
  2011. ASSERT_GE(
  2012. options.statistics->getTickerCount(Tickers::BLOCK_CACHE_FILTER_ADD), 1);
  2013. }
  2014. // Create external SST file but skip bloom filters
  2015. options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
  2016. DestroyAndReopen(options);
  2017. {
  2018. std::string file_path = sst_files_dir_ + "sst_with_no_bloom.sst";
  2019. SstFileWriter sst_file_writer(EnvOptions(), options, nullptr, true,
  2020. Env::IOPriority::IO_TOTAL,
  2021. true /* skip_filters */);
  2022. ASSERT_OK(sst_file_writer.Open(file_path));
  2023. ASSERT_OK(sst_file_writer.Put("Key1", "Value1"));
  2024. ASSERT_OK(sst_file_writer.Finish());
  2025. ASSERT_OK(
  2026. db_->IngestExternalFile({file_path}, IngestExternalFileOptions()));
  2027. ASSERT_EQ(Get("Key1"), "Value1");
  2028. ASSERT_EQ(
  2029. options.statistics->getTickerCount(Tickers::BLOCK_CACHE_FILTER_ADD), 0);
  2030. }
  2031. }
  2032. TEST_F(ExternalSSTFileTest, IngestFileWrittenWithCompressionDictionary) {
  2033. if (!ZSTD_Supported()) {
  2034. return;
  2035. }
  2036. const int kNumEntries = 1 << 10;
  2037. const int kNumBytesPerEntry = 1 << 10;
  2038. Options options = CurrentOptions();
  2039. options.compression = kZSTD;
  2040. options.compression_opts.max_dict_bytes = 1 << 14; // 16KB
  2041. options.compression_opts.zstd_max_train_bytes = 1 << 18; // 256KB
  2042. DestroyAndReopen(options);
  2043. std::atomic<int> num_compression_dicts(0);
  2044. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  2045. "BlockBasedTableBuilder::WriteCompressionDictBlock:RawDict",
  2046. [&](void* /* arg */) { ++num_compression_dicts; });
  2047. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  2048. Random rnd(301);
  2049. std::vector<std::pair<std::string, std::string>> random_data;
  2050. for (int i = 0; i < kNumEntries; i++) {
  2051. std::string val;
  2052. test::RandomString(&rnd, kNumBytesPerEntry, &val);
  2053. random_data.emplace_back(Key(i), std::move(val));
  2054. }
  2055. ASSERT_OK(GenerateAndAddExternalFile(options, std::move(random_data)));
  2056. ASSERT_EQ(1, num_compression_dicts);
  2057. }
  2058. TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_Success) {
  2059. std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(
  2060. new FaultInjectionTestEnv(env_));
  2061. Options options = CurrentOptions();
  2062. options.env = fault_injection_env.get();
  2063. CreateAndReopenWithCF({"pikachu", "eevee"}, options);
  2064. std::vector<ColumnFamilyHandle*> column_families;
  2065. column_families.push_back(handles_[0]);
  2066. column_families.push_back(handles_[1]);
  2067. column_families.push_back(handles_[2]);
  2068. std::vector<IngestExternalFileOptions> ifos(column_families.size());
  2069. for (auto& ifo : ifos) {
  2070. ifo.allow_global_seqno = true; // Always allow global_seqno
  2071. // May or may not write global_seqno
  2072. ifo.write_global_seqno = std::get<0>(GetParam());
  2073. // Whether to verify checksums before ingestion
  2074. ifo.verify_checksums_before_ingest = std::get<1>(GetParam());
  2075. }
  2076. std::vector<std::vector<std::pair<std::string, std::string>>> data;
  2077. data.push_back(
  2078. {std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")});
  2079. data.push_back(
  2080. {std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")});
  2081. data.push_back(
  2082. {std::make_pair("bar3", "bv3"), std::make_pair("bar4", "bv4")});
  2083. // Resize the true_data vector upon construction to avoid re-alloc
  2084. std::vector<std::map<std::string, std::string>> true_data(
  2085. column_families.size());
  2086. Status s = GenerateAndAddExternalFiles(options, column_families, ifos, data,
  2087. -1, true, true_data);
  2088. ASSERT_OK(s);
  2089. Close();
  2090. ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu", "eevee"},
  2091. options);
  2092. ASSERT_EQ(3, handles_.size());
  2093. int cf = 0;
  2094. for (const auto& verify_map : true_data) {
  2095. for (const auto& elem : verify_map) {
  2096. const std::string& key = elem.first;
  2097. const std::string& value = elem.second;
  2098. ASSERT_EQ(value, Get(cf, key));
  2099. }
  2100. ++cf;
  2101. }
  2102. Close();
  2103. Destroy(options, true /* delete_cf_paths */);
  2104. }
  2105. TEST_P(ExternalSSTFileTest,
  2106. IngestFilesIntoMultipleColumnFamilies_NoMixedStateWithSnapshot) {
  2107. std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(
  2108. new FaultInjectionTestEnv(env_));
  2109. SyncPoint::GetInstance()->DisableProcessing();
  2110. SyncPoint::GetInstance()->ClearAllCallBacks();
  2111. SyncPoint::GetInstance()->LoadDependency({
  2112. {"DBImpl::IngestExternalFiles:InstallSVForFirstCF:0",
  2113. "ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_MixedState:"
  2114. "BeforeRead"},
  2115. {"ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_MixedState:"
  2116. "AfterRead",
  2117. "DBImpl::IngestExternalFiles:InstallSVForFirstCF:1"},
  2118. });
  2119. SyncPoint::GetInstance()->EnableProcessing();
  2120. Options options = CurrentOptions();
  2121. options.env = fault_injection_env.get();
  2122. CreateAndReopenWithCF({"pikachu", "eevee"}, options);
  2123. const std::vector<std::map<std::string, std::string>> data_before_ingestion =
  2124. {{{"foo1", "fv1_0"}, {"foo2", "fv2_0"}, {"foo3", "fv3_0"}},
  2125. {{"bar1", "bv1_0"}, {"bar2", "bv2_0"}, {"bar3", "bv3_0"}},
  2126. {{"bar4", "bv4_0"}, {"bar5", "bv5_0"}, {"bar6", "bv6_0"}}};
  2127. for (size_t i = 0; i != handles_.size(); ++i) {
  2128. int cf = static_cast<int>(i);
  2129. const auto& orig_data = data_before_ingestion[i];
  2130. for (const auto& kv : orig_data) {
  2131. ASSERT_OK(Put(cf, kv.first, kv.second));
  2132. }
  2133. ASSERT_OK(Flush(cf));
  2134. }
  2135. std::vector<ColumnFamilyHandle*> column_families;
  2136. column_families.push_back(handles_[0]);
  2137. column_families.push_back(handles_[1]);
  2138. column_families.push_back(handles_[2]);
  2139. std::vector<IngestExternalFileOptions> ifos(column_families.size());
  2140. for (auto& ifo : ifos) {
  2141. ifo.allow_global_seqno = true; // Always allow global_seqno
  2142. // May or may not write global_seqno
  2143. ifo.write_global_seqno = std::get<0>(GetParam());
  2144. // Whether to verify checksums before ingestion
  2145. ifo.verify_checksums_before_ingest = std::get<1>(GetParam());
  2146. }
  2147. std::vector<std::vector<std::pair<std::string, std::string>>> data;
  2148. data.push_back(
  2149. {std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")});
  2150. data.push_back(
  2151. {std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")});
  2152. data.push_back(
  2153. {std::make_pair("bar3", "bv3"), std::make_pair("bar4", "bv4")});
  2154. // Resize the true_data vector upon construction to avoid re-alloc
  2155. std::vector<std::map<std::string, std::string>> true_data(
  2156. column_families.size());
  2157. // Take snapshot before ingestion starts
  2158. ReadOptions read_opts;
  2159. read_opts.total_order_seek = true;
  2160. read_opts.snapshot = dbfull()->GetSnapshot();
  2161. std::vector<Iterator*> iters(handles_.size());
  2162. // Range scan checks first kv of each CF before ingestion starts.
  2163. for (size_t i = 0; i != handles_.size(); ++i) {
  2164. iters[i] = dbfull()->NewIterator(read_opts, handles_[i]);
  2165. iters[i]->SeekToFirst();
  2166. ASSERT_TRUE(iters[i]->Valid());
  2167. const std::string& key = iters[i]->key().ToString();
  2168. const std::string& value = iters[i]->value().ToString();
  2169. const std::map<std::string, std::string>& orig_data =
  2170. data_before_ingestion[i];
  2171. std::map<std::string, std::string>::const_iterator it = orig_data.find(key);
  2172. ASSERT_NE(orig_data.end(), it);
  2173. ASSERT_EQ(it->second, value);
  2174. iters[i]->Next();
  2175. }
  2176. port::Thread ingest_thread([&]() {
  2177. ASSERT_OK(GenerateAndAddExternalFiles(options, column_families, ifos, data,
  2178. -1, true, true_data));
  2179. });
  2180. TEST_SYNC_POINT(
  2181. "ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_MixedState:"
  2182. "BeforeRead");
  2183. // Should see only data before ingestion
  2184. for (size_t i = 0; i != handles_.size(); ++i) {
  2185. const auto& orig_data = data_before_ingestion[i];
  2186. for (; iters[i]->Valid(); iters[i]->Next()) {
  2187. const std::string& key = iters[i]->key().ToString();
  2188. const std::string& value = iters[i]->value().ToString();
  2189. std::map<std::string, std::string>::const_iterator it =
  2190. orig_data.find(key);
  2191. ASSERT_NE(orig_data.end(), it);
  2192. ASSERT_EQ(it->second, value);
  2193. }
  2194. }
  2195. TEST_SYNC_POINT(
  2196. "ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_MixedState:"
  2197. "AfterRead");
  2198. ingest_thread.join();
  2199. for (auto* iter : iters) {
  2200. delete iter;
  2201. }
  2202. iters.clear();
  2203. dbfull()->ReleaseSnapshot(read_opts.snapshot);
  2204. Close();
  2205. ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu", "eevee"},
  2206. options);
  2207. // Should see consistent state after ingestion for all column families even
  2208. // without snapshot.
  2209. ASSERT_EQ(3, handles_.size());
  2210. int cf = 0;
  2211. for (const auto& verify_map : true_data) {
  2212. for (const auto& elem : verify_map) {
  2213. const std::string& key = elem.first;
  2214. const std::string& value = elem.second;
  2215. ASSERT_EQ(value, Get(cf, key));
  2216. }
  2217. ++cf;
  2218. }
  2219. Close();
  2220. Destroy(options, true /* delete_cf_paths */);
  2221. }
  2222. TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_PrepareFail) {
  2223. std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(
  2224. new FaultInjectionTestEnv(env_));
  2225. Options options = CurrentOptions();
  2226. options.env = fault_injection_env.get();
  2227. SyncPoint::GetInstance()->DisableProcessing();
  2228. SyncPoint::GetInstance()->ClearAllCallBacks();
  2229. SyncPoint::GetInstance()->LoadDependency({
  2230. {"DBImpl::IngestExternalFiles:BeforeLastJobPrepare:0",
  2231. "ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_PrepareFail:"
  2232. "0"},
  2233. {"ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies:PrepareFail:"
  2234. "1",
  2235. "DBImpl::IngestExternalFiles:BeforeLastJobPrepare:1"},
  2236. });
  2237. SyncPoint::GetInstance()->EnableProcessing();
  2238. CreateAndReopenWithCF({"pikachu", "eevee"}, options);
  2239. std::vector<ColumnFamilyHandle*> column_families;
  2240. column_families.push_back(handles_[0]);
  2241. column_families.push_back(handles_[1]);
  2242. column_families.push_back(handles_[2]);
  2243. std::vector<IngestExternalFileOptions> ifos(column_families.size());
  2244. for (auto& ifo : ifos) {
  2245. ifo.allow_global_seqno = true; // Always allow global_seqno
  2246. // May or may not write global_seqno
  2247. ifo.write_global_seqno = std::get<0>(GetParam());
  2248. // Whether to verify block checksums before ingest
  2249. ifo.verify_checksums_before_ingest = std::get<1>(GetParam());
  2250. }
  2251. std::vector<std::vector<std::pair<std::string, std::string>>> data;
  2252. data.push_back(
  2253. {std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")});
  2254. data.push_back(
  2255. {std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")});
  2256. data.push_back(
  2257. {std::make_pair("bar3", "bv3"), std::make_pair("bar4", "bv4")});
  2258. // Resize the true_data vector upon construction to avoid re-alloc
  2259. std::vector<std::map<std::string, std::string>> true_data(
  2260. column_families.size());
  2261. port::Thread ingest_thread([&]() {
  2262. Status s = GenerateAndAddExternalFiles(options, column_families, ifos, data,
  2263. -1, true, true_data);
  2264. ASSERT_NOK(s);
  2265. });
  2266. TEST_SYNC_POINT(
  2267. "ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_PrepareFail:"
  2268. "0");
  2269. fault_injection_env->SetFilesystemActive(false);
  2270. TEST_SYNC_POINT(
  2271. "ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies:PrepareFail:"
  2272. "1");
  2273. ingest_thread.join();
  2274. fault_injection_env->SetFilesystemActive(true);
  2275. Close();
  2276. ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu", "eevee"},
  2277. options);
  2278. ASSERT_EQ(3, handles_.size());
  2279. int cf = 0;
  2280. for (const auto& verify_map : true_data) {
  2281. for (const auto& elem : verify_map) {
  2282. const std::string& key = elem.first;
  2283. ASSERT_EQ("NOT_FOUND", Get(cf, key));
  2284. }
  2285. ++cf;
  2286. }
  2287. Close();
  2288. Destroy(options, true /* delete_cf_paths */);
  2289. }
  2290. TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_CommitFail) {
  2291. std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(
  2292. new FaultInjectionTestEnv(env_));
  2293. Options options = CurrentOptions();
  2294. options.env = fault_injection_env.get();
  2295. SyncPoint::GetInstance()->DisableProcessing();
  2296. SyncPoint::GetInstance()->ClearAllCallBacks();
  2297. SyncPoint::GetInstance()->LoadDependency({
  2298. {"DBImpl::IngestExternalFiles:BeforeJobsRun:0",
  2299. "ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_CommitFail:"
  2300. "0"},
  2301. {"ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_CommitFail:"
  2302. "1",
  2303. "DBImpl::IngestExternalFiles:BeforeJobsRun:1"},
  2304. });
  2305. SyncPoint::GetInstance()->EnableProcessing();
  2306. CreateAndReopenWithCF({"pikachu", "eevee"}, options);
  2307. std::vector<ColumnFamilyHandle*> column_families;
  2308. column_families.push_back(handles_[0]);
  2309. column_families.push_back(handles_[1]);
  2310. column_families.push_back(handles_[2]);
  2311. std::vector<IngestExternalFileOptions> ifos(column_families.size());
  2312. for (auto& ifo : ifos) {
  2313. ifo.allow_global_seqno = true; // Always allow global_seqno
  2314. // May or may not write global_seqno
  2315. ifo.write_global_seqno = std::get<0>(GetParam());
  2316. // Whether to verify block checksums before ingestion
  2317. ifo.verify_checksums_before_ingest = std::get<1>(GetParam());
  2318. }
  2319. std::vector<std::vector<std::pair<std::string, std::string>>> data;
  2320. data.push_back(
  2321. {std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")});
  2322. data.push_back(
  2323. {std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")});
  2324. data.push_back(
  2325. {std::make_pair("bar3", "bv3"), std::make_pair("bar4", "bv4")});
  2326. // Resize the true_data vector upon construction to avoid re-alloc
  2327. std::vector<std::map<std::string, std::string>> true_data(
  2328. column_families.size());
  2329. port::Thread ingest_thread([&]() {
  2330. Status s = GenerateAndAddExternalFiles(options, column_families, ifos, data,
  2331. -1, true, true_data);
  2332. ASSERT_NOK(s);
  2333. });
  2334. TEST_SYNC_POINT(
  2335. "ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_CommitFail:"
  2336. "0");
  2337. fault_injection_env->SetFilesystemActive(false);
  2338. TEST_SYNC_POINT(
  2339. "ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_CommitFail:"
  2340. "1");
  2341. ingest_thread.join();
  2342. fault_injection_env->SetFilesystemActive(true);
  2343. Close();
  2344. ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu", "eevee"},
  2345. options);
  2346. ASSERT_EQ(3, handles_.size());
  2347. int cf = 0;
  2348. for (const auto& verify_map : true_data) {
  2349. for (const auto& elem : verify_map) {
  2350. const std::string& key = elem.first;
  2351. ASSERT_EQ("NOT_FOUND", Get(cf, key));
  2352. }
  2353. ++cf;
  2354. }
  2355. Close();
  2356. Destroy(options, true /* delete_cf_paths */);
  2357. }
  2358. TEST_P(ExternalSSTFileTest,
  2359. IngestFilesIntoMultipleColumnFamilies_PartialManifestWriteFail) {
  2360. std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(
  2361. new FaultInjectionTestEnv(env_));
  2362. Options options = CurrentOptions();
  2363. options.env = fault_injection_env.get();
  2364. CreateAndReopenWithCF({"pikachu", "eevee"}, options);
  2365. SyncPoint::GetInstance()->ClearTrace();
  2366. SyncPoint::GetInstance()->DisableProcessing();
  2367. SyncPoint::GetInstance()->ClearAllCallBacks();
  2368. SyncPoint::GetInstance()->LoadDependency({
  2369. {"VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:0",
  2370. "ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_"
  2371. "PartialManifestWriteFail:0"},
  2372. {"ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_"
  2373. "PartialManifestWriteFail:1",
  2374. "VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:1"},
  2375. });
  2376. SyncPoint::GetInstance()->EnableProcessing();
  2377. std::vector<ColumnFamilyHandle*> column_families;
  2378. column_families.push_back(handles_[0]);
  2379. column_families.push_back(handles_[1]);
  2380. column_families.push_back(handles_[2]);
  2381. std::vector<IngestExternalFileOptions> ifos(column_families.size());
  2382. for (auto& ifo : ifos) {
  2383. ifo.allow_global_seqno = true; // Always allow global_seqno
  2384. // May or may not write global_seqno
  2385. ifo.write_global_seqno = std::get<0>(GetParam());
  2386. // Whether to verify block checksums before ingestion
  2387. ifo.verify_checksums_before_ingest = std::get<1>(GetParam());
  2388. }
  2389. std::vector<std::vector<std::pair<std::string, std::string>>> data;
  2390. data.push_back(
  2391. {std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")});
  2392. data.push_back(
  2393. {std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")});
  2394. data.push_back(
  2395. {std::make_pair("bar3", "bv3"), std::make_pair("bar4", "bv4")});
  2396. // Resize the true_data vector upon construction to avoid re-alloc
  2397. std::vector<std::map<std::string, std::string>> true_data(
  2398. column_families.size());
  2399. port::Thread ingest_thread([&]() {
  2400. Status s = GenerateAndAddExternalFiles(options, column_families, ifos, data,
  2401. -1, true, true_data);
  2402. ASSERT_NOK(s);
  2403. });
  2404. TEST_SYNC_POINT(
  2405. "ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_"
  2406. "PartialManifestWriteFail:0");
  2407. fault_injection_env->SetFilesystemActive(false);
  2408. TEST_SYNC_POINT(
  2409. "ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_"
  2410. "PartialManifestWriteFail:1");
  2411. ingest_thread.join();
  2412. fault_injection_env->DropUnsyncedFileData();
  2413. fault_injection_env->SetFilesystemActive(true);
  2414. Close();
  2415. ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu", "eevee"},
  2416. options);
  2417. ASSERT_EQ(3, handles_.size());
  2418. int cf = 0;
  2419. for (const auto& verify_map : true_data) {
  2420. for (const auto& elem : verify_map) {
  2421. const std::string& key = elem.first;
  2422. ASSERT_EQ("NOT_FOUND", Get(cf, key));
  2423. }
  2424. ++cf;
  2425. }
  2426. Close();
  2427. Destroy(options, true /* delete_cf_paths */);
  2428. }
  2429. TEST_P(ExternalSSTFileTest, IngestFilesTriggerFlushingWithTwoWriteQueue) {
  2430. Options options = CurrentOptions();
  2431. // Use large buffer to avoid memtable flush
  2432. options.write_buffer_size = 1024 * 1024;
  2433. options.two_write_queues = true;
  2434. DestroyAndReopen(options);
  2435. ASSERT_OK(dbfull()->Put(WriteOptions(), "1000", "v1"));
  2436. ASSERT_OK(dbfull()->Put(WriteOptions(), "1001", "v1"));
  2437. ASSERT_OK(dbfull()->Put(WriteOptions(), "9999", "v1"));
  2438. // Put one key which is overlap with keys in memtable.
  2439. // It will trigger flushing memtable and require this thread is
  2440. // currently at the front of the 2nd writer queue. We must make
  2441. // sure that it won't enter the 2nd writer queue for the second time.
  2442. std::vector<std::pair<std::string, std::string>> data;
  2443. data.push_back(std::make_pair("1001", "v2"));
  2444. GenerateAndAddExternalFile(options, data);
  2445. }
  2446. INSTANTIATE_TEST_CASE_P(ExternalSSTFileTest, ExternalSSTFileTest,
  2447. testing::Values(std::make_tuple(false, false),
  2448. std::make_tuple(false, true),
  2449. std::make_tuple(true, false),
  2450. std::make_tuple(true, true)));
  2451. INSTANTIATE_TEST_CASE_P(ExternSSTFileLinkFailFallbackTest,
  2452. ExternSSTFileLinkFailFallbackTest,
  2453. testing::Values(std::make_tuple(true, false),
  2454. std::make_tuple(true, true),
  2455. std::make_tuple(false, false)));
  2456. } // namespace ROCKSDB_NAMESPACE
  2457. int main(int argc, char** argv) {
  2458. ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  2459. ::testing::InitGoogleTest(&argc, argv);
  2460. return RUN_ALL_TESTS();
  2461. }
  2462. #else
  2463. #include <stdio.h>
  2464. int main(int /*argc*/, char** /*argv*/) {
  2465. fprintf(stderr,
  2466. "SKIPPED as External SST File Writer and Ingestion are not supported "
  2467. "in ROCKSDB_LITE\n");
  2468. return 0;
  2469. }
  2470. #endif // !ROCKSDB_LITE