// write_prepared_transaction_test.cc
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include <algorithm>
  6. #include <atomic>
  7. #include <cinttypes>
  8. #include <functional>
  9. #include <string>
  10. #include <thread>
  11. #include "db/db_impl/db_impl.h"
  12. #include "db/dbformat.h"
  13. #include "port/port.h"
  14. #include "port/stack_trace.h"
  15. #include "rocksdb/db.h"
  16. #include "rocksdb/options.h"
  17. #include "rocksdb/types.h"
  18. #include "rocksdb/utilities/debug.h"
  19. #include "rocksdb/utilities/transaction.h"
  20. #include "rocksdb/utilities/transaction_db.h"
  21. #include "table/mock_table.h"
  22. #include "test_util/sync_point.h"
  23. #include "test_util/testharness.h"
  24. #include "test_util/testutil.h"
  25. #include "test_util/transaction_test_util.h"
  26. #include "util/mutexlock.h"
  27. #include "util/random.h"
  28. #include "util/string_util.h"
  29. #include "utilities/fault_injection_env.h"
  30. #include "utilities/merge_operators.h"
  31. #include "utilities/merge_operators/string_append/stringappend.h"
  32. #include "utilities/transactions/pessimistic_transaction_db.h"
  33. #include "utilities/transactions/transaction_test.h"
  34. #include "utilities/transactions/write_prepared_txn_db.h"
  35. using std::string;
  36. namespace ROCKSDB_NAMESPACE {
  37. using CommitEntry = WritePreparedTxnDB::CommitEntry;
  38. using CommitEntry64b = WritePreparedTxnDB::CommitEntry64b;
  39. using CommitEntry64bFormat = WritePreparedTxnDB::CommitEntry64bFormat;
// Exercises the core PreparedHeap contract: push/top/pop plus "lazy" erase of
// entries that may still be buried in the heap or may already have been
// popped. The heap must always report the smallest non-erased entry as top().
TEST(PreparedHeap, BasicsTest) {
  WritePreparedTxnDB::PreparedHeap heap;
  {
    // Callers must hold push_pop_mutex() while pushing.
    MutexLock ml(heap.push_pop_mutex());
    heap.push(14l);
    // Test with one element
    ASSERT_EQ(14l, heap.top());
    heap.push(24l);
    heap.push(34l);
    // Test that old min is still on top
    ASSERT_EQ(14l, heap.top());
    heap.push(44l);
    heap.push(54l);
    heap.push(64l);
    heap.push(74l);
    heap.push(84l);
  }
  // Test that old min is still on top
  ASSERT_EQ(14l, heap.top());
  heap.erase(24l);
  // Test that old min is still on top
  ASSERT_EQ(14l, heap.top());
  heap.erase(14l);
  // Test that the new min comes to the top after multiple erases
  ASSERT_EQ(34l, heap.top());
  heap.erase(34l);
  // Test that the new min comes to the top after a single erase
  ASSERT_EQ(44l, heap.top());
  heap.erase(54l);  // erase an entry that is not at the top
  ASSERT_EQ(44l, heap.top());
  heap.pop();  // pop 44l
  // Test that the erased items (54l) are ignored after pop
  ASSERT_EQ(64l, heap.top());
  heap.erase(44l);
  // Test that erasing an already popped item would work
  ASSERT_EQ(64l, heap.top());
  heap.erase(84l);
  ASSERT_EQ(64l, heap.top());
  {
    MutexLock ml(heap.push_pop_mutex());
    heap.push(85l);
    heap.push(86l);
    heap.push(87l);
    heap.push(88l);
    heap.push(89l);
  }
  // Erase the new batch in a shuffled order.
  heap.erase(87l);
  heap.erase(85l);
  heap.erase(89l);
  heap.erase(86l);
  heap.erase(88l);
  // Test top remains the same after a random order of many erases
  ASSERT_EQ(64l, heap.top());
  heap.pop();
  // Test that pop works with a series of random pending erases
  ASSERT_EQ(74l, heap.top());
  ASSERT_FALSE(heap.empty());
  heap.pop();
  // Test that empty works
  ASSERT_TRUE(heap.empty());
}
// This is a scenario reconstructed from a buggy trace. Test that the bug does
// not resurface again.
TEST(PreparedHeap, EmptyAtTheEnd) {
  WritePreparedTxnDB::PreparedHeap heap;
  {
    MutexLock ml(heap.push_pop_mutex());
    heap.push(40l);
  }
  ASSERT_EQ(40l, heap.top());
  // Although not a recommended scenario, we must be resilient against erase
  // without a prior push.
  heap.erase(50l);
  ASSERT_EQ(40l, heap.top());
  {
    MutexLock ml(heap.push_pop_mutex());
    heap.push(60l);
  }
  ASSERT_EQ(40l, heap.top());
  // First round: erase larger entry first, then the min; heap must empty out.
  heap.erase(60l);
  ASSERT_EQ(40l, heap.top());
  heap.erase(40l);
  ASSERT_TRUE(heap.empty());
  // Second round: same setup, but erase the min (40) while a larger entry
  // (60) is still pending — this is the order that triggered the bug.
  {
    MutexLock ml(heap.push_pop_mutex());
    heap.push(40l);
  }
  ASSERT_EQ(40l, heap.top());
  heap.erase(50l);
  ASSERT_EQ(40l, heap.top());
  {
    MutexLock ml(heap.push_pop_mutex());
    heap.push(60l);
  }
  ASSERT_EQ(40l, heap.top());
  heap.erase(40l);
  // Test that the erase has not emptied the heap (we had a bug doing that)
  ASSERT_FALSE(heap.empty());
  ASSERT_EQ(60l, heap.top());
  heap.erase(60l);
  ASSERT_TRUE(heap.empty());
}
// Generate random order of PreparedHeap access and test that the heap will be
// successfully emptied at the end.
TEST(PreparedHeap, Concurrent) {
  const size_t t_cnt = 10;
  ROCKSDB_NAMESPACE::port::Thread t[t_cnt + 1];
  WritePreparedTxnDB::PreparedHeap heap;
  port::RWMutex prepared_mutex;
  // Highest sequence number pushed so far; erasers spin on it (set to 0 at
  // the start of each round, before any thread is spawned).
  std::atomic<size_t> last;
  for (size_t n = 0; n < 100; n++) {
    last = 0;
    // t[0] is the single pusher: sequence numbers go in ascending order, but
    // a push is randomly skipped to also exercise erase-without-push.
    t[0] = ROCKSDB_NAMESPACE::port::Thread([&]() {
      Random rnd(1103);
      for (size_t seq = 1; seq <= t_cnt; seq++) {
        // This is not recommended usage but we should be resilient against it.
        bool skip_push = rnd.OneIn(5);
        if (!skip_push) {
          MutexLock ml(heap.push_pop_mutex());
          // yield inside the lock to widen the race window
          std::this_thread::yield();
          heap.push(seq);
          last.store(seq);
        }
      }
    });
    // One eraser thread per sequence number; each waits until the pusher has
    // reached (or passed) its sequence before erasing it.
    for (size_t i = 1; i <= t_cnt; i++) {
      t[i] =
          ROCKSDB_NAMESPACE::port::Thread([&heap, &prepared_mutex, &last, i]() {
            auto seq = i;
            do {
              std::this_thread::yield();
            } while (last.load() < seq);
            WriteLock wl(&prepared_mutex);
            heap.erase(seq);
          });
    }
    for (size_t i = 0; i <= t_cnt; i++) {
      t[i].join();
    }
    // Regardless of interleaving, every pushed entry must have been erased.
    ASSERT_TRUE(heap.empty());
  }
}
// Test that WriteBatchWithIndex correctly counts the number of sub-batches.
// A new sub-batch is expected whenever a key is written twice in the same
// column family (duplicate keys must go to separate sub-batches).
TEST(WriteBatchWithIndex, SubBatchCnt) {
  ColumnFamilyOptions cf_options;
  std::string cf_name = "two";
  DB* db;
  Options options;
  options.create_if_missing = true;
  const std::string dbname = test::PerThreadDBPath("transaction_testdb");
  EXPECT_OK(DestroyDB(dbname, options));
  ASSERT_OK(DB::Open(options, dbname, &db));
  ColumnFamilyHandle* cf_handle = nullptr;
  ASSERT_OK(db->CreateColumnFamily(cf_options, cf_name, &cf_handle));
  WriteOptions write_options;
  // batch_cnt tracks the expected sub-batch count; batch_cnt_at records the
  // expected count at each save point so rollbacks can be verified below.
  size_t batch_cnt = 1;
  size_t save_points = 0;
  std::vector<size_t> batch_cnt_at;
  WriteBatchWithIndex batch(db->DefaultColumnFamily()->GetComparator(), 0, true,
                            0);
  ASSERT_EQ(batch_cnt, batch.SubBatchCnt());
  batch_cnt_at.push_back(batch_cnt);
  batch.SetSavePoint();
  save_points++;
  ASSERT_OK(batch.Put(Slice("key"), Slice("value")));
  ASSERT_EQ(batch_cnt, batch.SubBatchCnt());
  batch_cnt_at.push_back(batch_cnt);
  batch.SetSavePoint();
  save_points++;
  ASSERT_OK(batch.Put(Slice("key2"), Slice("value2")));
  ASSERT_EQ(batch_cnt, batch.SubBatchCnt());
  // duplicate the keys
  batch_cnt_at.push_back(batch_cnt);
  batch.SetSavePoint();
  save_points++;
  ASSERT_OK(batch.Put(Slice("key"), Slice("value3")));
  // The duplicate of "key" forces a new sub-batch.
  batch_cnt++;
  ASSERT_EQ(batch_cnt, batch.SubBatchCnt());
  // duplicate the 2nd key. It should not be counted duplicate since a
  // sub-patch is cut after the last duplicate.
  batch_cnt_at.push_back(batch_cnt);
  batch.SetSavePoint();
  save_points++;
  ASSERT_OK(batch.Put(Slice("key2"), Slice("value4")));
  ASSERT_EQ(batch_cnt, batch.SubBatchCnt());
  // duplicate the keys but in a different cf. It should not be counted as
  // duplicate keys
  batch_cnt_at.push_back(batch_cnt);
  batch.SetSavePoint();
  save_points++;
  ASSERT_OK(batch.Put(cf_handle, Slice("key"), Slice("value5")));
  ASSERT_EQ(batch_cnt, batch.SubBatchCnt());
  // Test that the number of sub-batches matches what we count with
  // SubBatchCounter
  std::map<uint32_t, const Comparator*> comparators;
  comparators[0] = db->DefaultColumnFamily()->GetComparator();
  comparators[cf_handle->GetID()] = cf_handle->GetComparator();
  SubBatchCounter counter(comparators);
  ASSERT_OK(batch.GetWriteBatch()->Iterate(&counter));
  ASSERT_EQ(batch_cnt, counter.BatchCount());
  // Test that RollbackToSavePoint properly resets the number of sub-batches
  for (size_t i = save_points; i > 0; i--) {
    ASSERT_OK(batch.RollbackToSavePoint());
    ASSERT_EQ(batch_cnt_at[i - 1], batch.SubBatchCnt());
  }
  // Test the count is right with random batches
  {
    const size_t TOTAL_KEYS = 20;  // 20 ~= 10 to cause a few randoms
    Random rnd(1131);
    std::string keys[TOTAL_KEYS];
    for (size_t k = 0; k < TOTAL_KEYS; k++) {
      int len = static_cast<int>(rnd.Uniform(50));
      keys[k] = test::RandomKey(&rnd, len);
    }
    for (size_t i = 0; i < 1000; i++) {  // 1000 random batches
      WriteBatchWithIndex rndbatch(db->DefaultColumnFamily()->GetComparator(),
                                   0, true, 0);
      for (size_t k = 0; k < 10; k++) {  // 10 key per batch
        size_t ki = static_cast<size_t>(rnd.Uniform(TOTAL_KEYS));
        Slice key = Slice(keys[ki]);
        std::string tmp = rnd.RandomString(16);
        Slice value = Slice(tmp);
        ASSERT_OK(rndbatch.Put(key, value));
      }
      // Cross-check the batch's own count against an independent recount.
      SubBatchCounter batch_counter(comparators);
      ASSERT_OK(rndbatch.GetWriteBatch()->Iterate(&batch_counter));
      ASSERT_EQ(rndbatch.SubBatchCnt(), batch_counter.BatchCount());
    }
  }
  delete cf_handle;
  delete db;
}
  273. TEST(CommitEntry64b, BasicTest) {
  274. const size_t INDEX_BITS = static_cast<size_t>(21);
  275. const size_t INDEX_SIZE = static_cast<size_t>(1ull << INDEX_BITS);
  276. const CommitEntry64bFormat FORMAT(static_cast<size_t>(INDEX_BITS));
  277. // zero-initialized CommitEntry64b should indicate an empty entry
  278. CommitEntry64b empty_entry64b;
  279. uint64_t empty_index = 11ul;
  280. CommitEntry empty_entry;
  281. bool ok = empty_entry64b.Parse(empty_index, &empty_entry, FORMAT);
  282. ASSERT_FALSE(ok);
  283. // the zero entry is reserved for un-initialized entries
  284. const size_t MAX_COMMIT = (1 << FORMAT.COMMIT_BITS) - 1 - 1;
  285. // Samples over the numbers that are covered by that many index bits
  286. std::array<uint64_t, 4> is = {{0, 1, INDEX_SIZE / 2 + 1, INDEX_SIZE - 1}};
  287. // Samples over the numbers that are covered by that many commit bits
  288. std::array<uint64_t, 4> ds = {{0, 1, MAX_COMMIT / 2 + 1, MAX_COMMIT}};
  289. // Iterate over prepare numbers that have i) cover all bits of a sequence
  290. // number, and ii) include some bits that fall into the range of index or
  291. // commit bits
  292. for (uint64_t base = 1; base < kMaxSequenceNumber; base *= 2) {
  293. for (uint64_t i : is) {
  294. for (uint64_t d : ds) {
  295. uint64_t p = base + i + d;
  296. for (uint64_t c : {p, p + d / 2, p + d}) {
  297. uint64_t index = p % INDEX_SIZE;
  298. CommitEntry before(p, c), after;
  299. CommitEntry64b entry64b(before, FORMAT);
  300. ok = entry64b.Parse(index, &after, FORMAT);
  301. ASSERT_TRUE(ok);
  302. if (!(before == after)) {
  303. printf("base %" PRIu64 " i %" PRIu64 " d %" PRIu64 " p %" PRIu64
  304. " c %" PRIu64 " index %" PRIu64 "\n",
  305. base, i, d, p, c, index);
  306. }
  307. ASSERT_EQ(before, after);
  308. }
  309. }
  310. }
  311. }
  312. }
  313. class WritePreparedTxnDBMock : public WritePreparedTxnDB {
  314. public:
  315. WritePreparedTxnDBMock(DBImpl* db_impl, TransactionDBOptions& opt)
  316. : WritePreparedTxnDB(db_impl, opt) {}
  317. void SetDBSnapshots(const std::vector<SequenceNumber>& snapshots) {
  318. snapshots_ = snapshots;
  319. }
  320. void TakeSnapshot(SequenceNumber seq) { snapshots_.push_back(seq); }
  321. protected:
  322. const std::vector<SequenceNumber> GetSnapshotListFromDB(
  323. SequenceNumber /* unused */) override {
  324. return snapshots_;
  325. }
  326. private:
  327. std::vector<SequenceNumber> snapshots_;
  328. };
  329. class WritePreparedTransactionTestBase : public TransactionTestBase {
  330. public:
  // Forwards every configuration knob unchanged to TransactionTestBase; this
  // class only adds write-prepared-specific test helpers.
  WritePreparedTransactionTestBase(bool use_stackable_db, bool two_write_queue,
                                   TxnDBWritePolicy write_policy,
                                   WriteOrdering write_ordering,
                                   bool user_per_key_point_lock_mgr,
                                   int64_t deadlock_timeout_us)
      : TransactionTestBase(use_stackable_db, two_write_queue, write_policy,
                            write_ordering, user_per_key_point_lock_mgr,
                            deadlock_timeout_us) {}
  339. protected:
  // Sets both the write-prepared snapshot-cache and commit-cache sizes
  // (expressed in bits) on txn_db_options.
  void UpdateTransactionDBOptions(size_t snapshot_cache_bits,
                                  size_t commit_cache_bits) {
    txn_db_options.wp_snapshot_cache_bits = snapshot_cache_bits;
    txn_db_options.wp_commit_cache_bits = commit_cache_bits;
  }
  // Sets only the write-prepared snapshot-cache size (in bits), leaving the
  // commit-cache size untouched.
  void UpdateTransactionDBOptions(size_t snapshot_cache_bits) {
    txn_db_options.wp_snapshot_cache_bits = snapshot_cache_bits;
  }
  // If expect_update is set, check if it actually updated old_commit_map_. If
  // it did not and yet suggested not to check the next snapshot, do the
  // opposite to check if it was not a bad suggestion.
  void MaybeUpdateOldCommitMapTestWithNext(uint64_t prepare, uint64_t commit,
                                           uint64_t snapshot,
                                           uint64_t next_snapshot,
                                           bool expect_update) {
    WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
    // reset old_commit_map_empty_ so that its value indicates whether
    // old_commit_map_ was updated
    wp_db->old_commit_map_empty_ = true;
    bool check_next = wp_db->MaybeUpdateOldCommitMap(prepare, commit, snapshot,
                                                     snapshot < next_snapshot);
    if (expect_update == wp_db->old_commit_map_empty_) {
      // Expectation mismatch: dump the inputs before the EXPECT below fails.
      printf("prepare: %" PRIu64 " commit: %" PRIu64 " snapshot: %" PRIu64
             " next: %" PRIu64 "\n",
             prepare, commit, snapshot, next_snapshot);
    }
    EXPECT_EQ(!expect_update, wp_db->old_commit_map_empty_);
    if (!check_next && wp_db->old_commit_map_empty_) {
      // do the opposite to make sure it was not a bad suggestion
      const bool dont_care_bool = true;
      wp_db->MaybeUpdateOldCommitMap(prepare, commit, next_snapshot,
                                     dont_care_bool);
      if (!wp_db->old_commit_map_empty_) {
        // The "skip next" suggestion was wrong: the next snapshot did need
        // an old_commit_map_ entry. Dump inputs for debugging.
        printf("prepare: %" PRIu64 " commit: %" PRIu64 " snapshot: %" PRIu64
               " next: %" PRIu64 "\n",
               prepare, commit, snapshot, next_snapshot);
      }
      EXPECT_TRUE(wp_db->old_commit_map_empty_);
    }
  }
  380. // Test that a CheckAgainstSnapshots thread reading old_snapshots will not
  381. // miss a snapshot because of a concurrent update by UpdateSnapshots that is
  382. // writing new_snapshots. Both threads are broken at two points. The sync
  383. // points to enforce them are specified by a1, a2, b1, and b2. CommitEntry
  384. // entry is expected to be vital for one of the snapshots that is common
  385. // between the old and new list of snapshots.
void SnapshotConcurrentAccessTestInternal(
    WritePreparedTxnDB* wp_db,
    const std::vector<SequenceNumber>& old_snapshots,
    const std::vector<SequenceNumber>& new_snapshots, CommitEntry& entry,
    SequenceNumber& version, size_t a1, size_t a2, size_t b1, size_t b2) {
  // First reset the snapshot list
  const std::vector<SequenceNumber> empty_snapshots;
  wp_db->old_commit_map_empty_ = true;
  wp_db->UpdateSnapshots(empty_snapshots, ++version);
  // Then initialize it with the old_snapshots
  wp_db->UpdateSnapshots(old_snapshots, ++version);
  // Starting from the first thread, cut each thread at two points
  // The dependency chain below enforces this interleaving (reader =
  // CheckAgainstSnapshots, writer = UpdateSnapshots):
  //   reader reaches a1 -> writer starts; writer reaches b1 -> reader resumes
  //   past a1; reader reaches a2 -> writer resumes past b1; writer reaches b2
  //   -> reader runs to its end; reader ends -> writer resumes past b2.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
      {"WritePreparedTxnDB::CheckAgainstSnapshots:p:" + std::to_string(a1),
       "WritePreparedTxnDB::UpdateSnapshots:s:start"},
      {"WritePreparedTxnDB::UpdateSnapshots:p:" + std::to_string(b1),
       "WritePreparedTxnDB::CheckAgainstSnapshots:s:" + std::to_string(a1)},
      {"WritePreparedTxnDB::CheckAgainstSnapshots:p:" + std::to_string(a2),
       "WritePreparedTxnDB::UpdateSnapshots:s:" + std::to_string(b1)},
      {"WritePreparedTxnDB::UpdateSnapshots:p:" + std::to_string(b2),
       "WritePreparedTxnDB::CheckAgainstSnapshots:s:" + std::to_string(a2)},
      {"WritePreparedTxnDB::CheckAgainstSnapshots:p:end",
       "WritePreparedTxnDB::UpdateSnapshots:s:" + std::to_string(b2)},
  });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  {
    ASSERT_TRUE(wp_db->old_commit_map_empty_);
    ROCKSDB_NAMESPACE::port::Thread t1(
        [&]() { wp_db->UpdateSnapshots(new_snapshots, version); });
    wp_db->CheckAgainstSnapshots(entry);
    t1.join();
    // The vital commit entry must have been recorded despite the concurrent
    // snapshot-list update.
    ASSERT_FALSE(wp_db->old_commit_map_empty_);
  }
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  // Reset and repeat with the two roles swapped in the dependency chain.
  wp_db->old_commit_map_empty_ = true;
  wp_db->UpdateSnapshots(empty_snapshots, ++version);
  wp_db->UpdateSnapshots(old_snapshots, ++version);
  // Starting from the second thread, cut each thread at two points
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
      {"WritePreparedTxnDB::UpdateSnapshots:p:" + std::to_string(a1),
       "WritePreparedTxnDB::CheckAgainstSnapshots:s:start"},
      {"WritePreparedTxnDB::CheckAgainstSnapshots:p:" + std::to_string(b1),
       "WritePreparedTxnDB::UpdateSnapshots:s:" + std::to_string(a1)},
      {"WritePreparedTxnDB::UpdateSnapshots:p:" + std::to_string(a2),
       "WritePreparedTxnDB::CheckAgainstSnapshots:s:" + std::to_string(b1)},
      {"WritePreparedTxnDB::CheckAgainstSnapshots:p:" + std::to_string(b2),
       "WritePreparedTxnDB::UpdateSnapshots:s:" + std::to_string(a2)},
      {"WritePreparedTxnDB::UpdateSnapshots:p:end",
       "WritePreparedTxnDB::CheckAgainstSnapshots:s:" + std::to_string(b2)},
  });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  {
    ASSERT_TRUE(wp_db->old_commit_map_empty_);
    ROCKSDB_NAMESPACE::port::Thread t1(
        [&]() { wp_db->UpdateSnapshots(new_snapshots, version); });
    wp_db->CheckAgainstSnapshots(entry);
    t1.join();
    ASSERT_FALSE(wp_db->old_commit_map_empty_);
  }
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
  447. // Verify value of keys.
  448. void VerifyKeys(const std::unordered_map<std::string, std::string>& data,
  449. const Snapshot* snapshot = nullptr) {
  450. std::string value;
  451. ReadOptions read_options;
  452. read_options.snapshot = snapshot;
  453. for (auto& kv : data) {
  454. auto s = db->Get(read_options, kv.first, &value);
  455. ASSERT_TRUE(s.ok() || s.IsNotFound());
  456. if (s.ok()) {
  457. if (kv.second != value) {
  458. printf("key = %s\n", kv.first.c_str());
  459. }
  460. ASSERT_EQ(kv.second, value);
  461. } else {
  462. ASSERT_EQ(kv.second, "NOT_FOUND");
  463. }
  464. // Try with MultiGet API too
  465. std::vector<std::string> values;
  466. auto s_vec = db->MultiGet(read_options, {db->DefaultColumnFamily()},
  467. {kv.first}, &values);
  468. ASSERT_EQ(1, values.size());
  469. ASSERT_EQ(1, s_vec.size());
  470. s = s_vec[0];
  471. ASSERT_TRUE(s.ok() || s.IsNotFound());
  472. if (s.ok()) {
  473. ASSERT_TRUE(kv.second == values[0]);
  474. } else {
  475. ASSERT_EQ(kv.second, "NOT_FOUND");
  476. }
  477. }
  478. }
  479. // Verify all versions of keys.
  480. void VerifyInternalKeys(const std::vector<KeyVersion>& expected_versions) {
  481. std::vector<KeyVersion> versions;
  482. const size_t kMaxKeys = 100000;
  483. ASSERT_OK(GetAllKeyVersions(db, expected_versions.front().user_key,
  484. expected_versions.back().user_key, kMaxKeys,
  485. &versions));
  486. ASSERT_EQ(expected_versions.size(), versions.size());
  487. for (size_t i = 0; i < versions.size(); i++) {
  488. ASSERT_EQ(expected_versions[i].user_key, versions[i].user_key);
  489. ASSERT_EQ(expected_versions[i].sequence, versions[i].sequence);
  490. ASSERT_EQ(expected_versions[i].type, versions[i].type);
  491. if (versions[i].type != kTypeDeletion &&
  492. versions[i].type != kTypeSingleDeletion) {
  493. ASSERT_EQ(expected_versions[i].value, versions[i].value);
  494. }
  495. // Range delete not supported.
  496. ASSERT_NE(expected_versions[i].type, kTypeRangeDeletion);
  497. }
  498. }
  499. };
// Parameterized fixture that simply forwards all six tuple parameters to
// WritePreparedTransactionTestBase; values come from the
// INSTANTIATE_TEST_CASE_P calls below.
class WritePreparedTransactionTest
    : public WritePreparedTransactionTestBase,
      virtual public ::testing::WithParamInterface<std::tuple<
          bool, bool, TxnDBWritePolicy, WriteOrdering, bool, int64_t>> {
 public:
  WritePreparedTransactionTest()
      : WritePreparedTransactionTestBase(
            std::get<0>(GetParam()), std::get<1>(GetParam()),
            std::get<2>(GetParam()), std::get<3>(GetParam()),
            std::get<4>(GetParam()), std::get<5>(GetParam())) {}
};
  511. #if !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
// Fixture for the snapshot concurrent-access tests. Tuple fields <4> and <5>
// select which slice of the test to run (see split_id_/split_cnt_); the
// remaining fields are forwarded to WritePreparedTransactionTestBase.
class SnapshotConcurrentAccessTest
    : public WritePreparedTransactionTestBase,
      virtual public ::testing::WithParamInterface<
          std::tuple<bool, bool, TxnDBWritePolicy, WriteOrdering, size_t,
                     size_t, bool, int64_t>> {
 public:
  SnapshotConcurrentAccessTest()
      : WritePreparedTransactionTestBase(
            std::get<0>(GetParam()), std::get<1>(GetParam()),
            std::get<2>(GetParam()), std::get<3>(GetParam()),
            std::get<6>(GetParam()), std::get<7>(GetParam())),
        split_id_(std::get<4>(GetParam())),
        split_cnt_(std::get<5>(GetParam())) {}
 protected:
  // A test is split into split_cnt_ tests, each identified with split_id_ where
  // 0 <= split_id_ < split_cnt_
  size_t split_id_;
  size_t split_cnt_;
};
  531. #endif // !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
  532. class SeqAdvanceConcurrentTest
  533. : public WritePreparedTransactionTestBase,
  534. virtual public ::testing::WithParamInterface<
  535. std::tuple<bool, bool, TxnDBWritePolicy, WriteOrdering, size_t,
  536. size_t, bool, int64_t>> {
  537. public:
  538. SeqAdvanceConcurrentTest()
  539. : WritePreparedTransactionTestBase(
  540. std::get<0>(GetParam()), std::get<1>(GetParam()),
  541. std::get<2>(GetParam()), std::get<3>(GetParam()),
  542. std::get<6>(GetParam()), std::get<7>(GetParam())),
  543. split_id_(std::get<4>(GetParam())),
  544. split_cnt_(std::get<5>(GetParam())) {
  545. special_env.skip_fsync_ = true;
  546. };
  547. protected:
  548. // A test is split into split_cnt_ tests, each identified with split_id_ where
  549. // 0 <= split_id_ < split_cnt_
  550. size_t split_id_;
  551. size_t split_cnt_;
  552. };
// Base parameter tuples: (<0>, <1>) bool flags, <2> write policy, <3> write
// ordering. The WRAP_* macros presumably expand each tuple with the remaining
// (bool, int64_t) fixture parameters -- confirm against the macro definitions.
constexpr std::array WritePreparedTransactionTest_Params = {
    std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite),
    std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite),
    std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite)};
INSTANTIATE_TEST_CASE_P(
    WritePreparedTransaction, WritePreparedTransactionTest,
    ::testing::ValuesIn(WRAP_PARAM_WITH_PER_KEY_POINT_LOCK_MANAGER_PARAMS(
        WRAP_PARAM(bool, bool, TxnDBWritePolicy, WriteOrdering),
        WritePreparedTransactionTest_Params)));
  562. #if !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
// Two-write-queue variant: every split_id in [0, 20) is listed once per write
// ordering so the 20-way-split test is covered exhaustively.
constexpr std::array TwoWriteQueue_SnapshotConcurrentAccessTest_Params = {
    std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 0, 20),
    std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 1, 20),
    std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 2, 20),
    std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 3, 20),
    std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 4, 20),
    std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 5, 20),
    std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 6, 20),
    std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 7, 20),
    std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 8, 20),
    std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 9, 20),
    std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 10, 20),
    std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 11, 20),
    std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 12, 20),
    std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 13, 20),
    std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 14, 20),
    std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 15, 20),
    std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 16, 20),
    std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 17, 20),
    std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 18, 20),
    std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 19, 20),
    std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 0, 20),
    std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 1, 20),
    std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 2, 20),
    std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 3, 20),
    std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 4, 20),
    std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 5, 20),
    std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 6, 20),
    std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 7, 20),
    std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 8, 20),
    std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 9, 20),
    std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 10, 20),
    std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 11, 20),
    std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 12, 20),
    std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 13, 20),
    std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 14, 20),
    std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 15, 20),
    std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 16, 20),
    std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 17, 20),
    std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 18, 20),
    std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 19, 20)};
INSTANTIATE_TEST_CASE_P(
    TwoWriteQueuesPointLockManager, SnapshotConcurrentAccessTest,
    ::testing::ValuesIn(WRAP_PARAM_WITH_PER_KEY_POINT_LOCK_MANAGER_PARAMS(
        WRAP_PARAM(bool, bool, TxnDBWritePolicy, WriteOrdering, size_t, size_t),
        TwoWriteQueue_SnapshotConcurrentAccessTest_Params)));
// Single-write-queue variant: only kOrderedWrite applies; every split_id in
// [0, 20) is listed once.
constexpr std::array OneWriteQueue_SnapshotConcurrentAccessTest_Params = {
    std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 0, 20),
    std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 1, 20),
    std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 2, 20),
    std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 3, 20),
    std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 4, 20),
    std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 5, 20),
    std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 6, 20),
    std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 7, 20),
    std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 8, 20),
    std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 9, 20),
    std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 10, 20),
    std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 11, 20),
    std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 12, 20),
    std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 13, 20),
    std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 14, 20),
    std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 15, 20),
    std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 16, 20),
    std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 17, 20),
    std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 18, 20),
    std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 19, 20),
};
INSTANTIATE_TEST_CASE_P(
    OneWriteQueue, SnapshotConcurrentAccessTest,
    ::testing::ValuesIn(WRAP_PARAM_WITH_PER_KEY_POINT_LOCK_MANAGER_PARAMS(
        WRAP_PARAM(bool, bool, TxnDBWritePolicy, WriteOrdering, size_t, size_t),
        OneWriteQueue_SnapshotConcurrentAccessTest_Params)));
// Two-write-queue variant for SeqAdvanceConcurrentTest: every split_id in
// [0, 10) is listed once per write ordering.
constexpr std::array TwoWriteQueues_SeqAdvanceConcurrentTest_Params = {
    std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 0, 10),
    std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 1, 10),
    std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 2, 10),
    std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 3, 10),
    std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 4, 10),
    std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 5, 10),
    std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 6, 10),
    std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 7, 10),
    std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 8, 10),
    std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 9, 10),
    std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 0, 10),
    std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 1, 10),
    std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 2, 10),
    std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 3, 10),
    std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 4, 10),
    std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 5, 10),
    std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 6, 10),
    std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 7, 10),
    std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 8, 10),
    std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 9, 10)};
INSTANTIATE_TEST_CASE_P(
    TwoWriteQueues, SeqAdvanceConcurrentTest,
    ::testing::ValuesIn(WRAP_PARAM_WITH_PER_KEY_POINT_LOCK_MANAGER_PARAMS(
        WRAP_PARAM(bool, bool, TxnDBWritePolicy, WriteOrdering, size_t, size_t),
        TwoWriteQueues_SeqAdvanceConcurrentTest_Params)));
// Single-write-queue variant for SeqAdvanceConcurrentTest: every split_id in
// [0, 10) is listed once.
constexpr std::array OneWriteQueue_SeqAdvanceConcurrentTest_Params = {
    std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 0, 10),
    std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 1, 10),
    std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 2, 10),
    std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 3, 10),
    std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 4, 10),
    std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 5, 10),
    std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 6, 10),
    std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 7, 10),
    std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 8, 10),
    std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 9, 10)};
INSTANTIATE_TEST_CASE_P(
    OneWriteQueue, SeqAdvanceConcurrentTest,
    ::testing::ValuesIn(WRAP_PARAM_WITH_PER_KEY_POINT_LOCK_MANAGER_PARAMS(
        WRAP_PARAM(bool, bool, TxnDBWritePolicy, WriteOrdering, size_t, size_t),
        OneWriteQueue_SeqAdvanceConcurrentTest_Params)));
  678. #endif // !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
// Unit-tests the commit cache primitives: Add/Get/ExchangeCommitEntry.
TEST_P(WritePreparedTransactionTest, CommitMap) {
  WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
  ASSERT_NE(wp_db, nullptr);
  ASSERT_NE(wp_db->db_impl_, nullptr);
  size_t size = wp_db->COMMIT_CACHE_SIZE;
  CommitEntry c = {5, 12}, e;
  // Adding to an empty slot must not evict anything.
  bool evicted = wp_db->AddCommitEntry(c.prep_seq % size, c, &e);
  ASSERT_FALSE(evicted);
  // Should be able to read the same value
  CommitEntry64b dont_care;
  bool found = wp_db->GetCommitEntry(c.prep_seq % size, &dont_care, &e);
  ASSERT_TRUE(found);
  ASSERT_EQ(c, e);
  // Should be able to distinguish between overlapping entries
  found = wp_db->GetCommitEntry((c.prep_seq + size) % size, &dont_care, &e);
  ASSERT_TRUE(found);
  ASSERT_NE(c.prep_seq + size, e.prep_seq);
  // Should be able to detect non-existent entry
  found = wp_db->GetCommitEntry((c.prep_seq + 1) % size, &dont_care, &e);
  ASSERT_FALSE(found);
  // Reject an invalid exchange: e2 was never stored at this slot, so the
  // compare part of the compare-and-swap must fail.
  CommitEntry e2 = {c.prep_seq + size, c.commit_seq + size};
  CommitEntry64b e2_64b(e2, wp_db->FORMAT);
  bool exchanged = wp_db->ExchangeCommitEntry(e2.prep_seq % size, e2_64b, e);
  ASSERT_FALSE(exchanged);
  // Check that it did actually reject it: the slot still holds c.
  found = wp_db->GetCommitEntry(e2.prep_seq % size, &dont_care, &e);
  ASSERT_TRUE(found);
  ASSERT_EQ(c, e);
  // Accept a valid exchange
  CommitEntry64b c_64b(c, wp_db->FORMAT);
  CommitEntry e3 = {c.prep_seq + size, c.commit_seq + size + 1};
  exchanged = wp_db->ExchangeCommitEntry(c.prep_seq % size, c_64b, e3);
  ASSERT_TRUE(exchanged);
  // Check that it was actually accepted: the slot now holds e3.
  found = wp_db->GetCommitEntry(c.prep_seq % size, &dont_care, &e);
  ASSERT_TRUE(found);
  ASSERT_EQ(e3, e);
  // Rewrite an entry: overwriting an occupied slot must report the eviction
  // and return the previous occupant.
  CommitEntry e4 = {e3.prep_seq + size, e3.commit_seq + size + 1};
  evicted = wp_db->AddCommitEntry(e4.prep_seq % size, e4, &e);
  ASSERT_TRUE(evicted);
  ASSERT_EQ(e3, e);
  found = wp_db->GetCommitEntry(e4.prep_seq % size, &dont_care, &e);
  ASSERT_TRUE(found);
  ASSERT_EQ(e4, e);
}
TEST_P(WritePreparedTransactionTest, MaybeUpdateOldCommitMap) {
  // If prepare <= snapshot < commit we should keep the entry around since its
  // nonexistence could be interpreted as committed in the snapshot while it is
  // not true. We keep such entries around by adding them to the
  // old_commit_map_.
  uint64_t p /*prepare*/, c /*commit*/, s /*snapshot*/, ns /*next_snapshot*/;
  // Committed before the snapshot was taken: no entry needed.
  p = 10l, c = 15l, s = 20l, ns = 21l;
  MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
  // If we do not expect the old commit map to be updated, try also with a next
  // snapshot that is expected to update the old commit map. This would test
  // that MaybeUpdateOldCommitMap would not prevent us from checking the next
  // snapshot that must be checked.
  p = 10l, c = 15l, s = 20l, ns = 11l;
  MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
  // commit == snapshot counts as committed at the snapshot: no entry needed.
  p = 10l, c = 20l, s = 20l, ns = 19l;
  MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
  p = 10l, c = 20l, s = 20l, ns = 21l;
  MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
  p = 20l, c = 20l, s = 20l, ns = 21l;
  MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
  p = 20l, c = 20l, s = 20l, ns = 19l;
  MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
  // prepare <= snapshot < commit: the entry must be kept.
  p = 10l, c = 25l, s = 20l, ns = 21l;
  MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, true);
  p = 20l, c = 25l, s = 20l, ns = 21l;
  MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, true);
  // Prepared after the snapshot was taken: invisible to it, no entry needed.
  p = 21l, c = 25l, s = 20l, ns = 22l;
  MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
  p = 21l, c = 25l, s = 20l, ns = 19l;
  MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
}
  757. // Trigger the condition where some old memtables are skipped when doing
  758. // TransactionUtil::CheckKey(), and make sure the result is still correct.
TEST_P(WritePreparedTransactionTest, CheckKeySkipOldMemtable) {
  // Attempt 0: the switched memtable moves to the history list (flushed);
  // attempt 1: it stays in the immutable list (flush held by a sync point).
  const int kAttemptHistoryMemtable = 0;
  const int kAttemptImmMemTable = 1;
  for (int attempt = kAttemptHistoryMemtable; attempt <= kAttemptImmMemTable;
       attempt++) {
    // Keep enough write history so CheckKey can consult old memtables.
    options.max_write_buffer_size_to_maintain =
        3 * static_cast<int>(options.write_buffer_size);
    ASSERT_OK(ReOpen());
    WriteOptions write_options;
    ReadOptions read_options;
    TransactionOptions txn_options;
    txn_options.set_snapshot = true;
    string value;
    ASSERT_OK(db->Put(write_options, Slice("foo"), Slice("bar")));
    ASSERT_OK(db->Put(write_options, Slice("foo2"), Slice("bar")));
    Transaction* txn = db->BeginTransaction(write_options, txn_options);
    ASSERT_TRUE(txn != nullptr);
    ASSERT_OK(txn->SetName("txn"));
    Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
    ASSERT_TRUE(txn2 != nullptr);
    ASSERT_OK(txn2->SetName("txn2"));
    // This transaction is created to cause potential conflict.
    Transaction* txn_x = db->BeginTransaction(write_options);
    ASSERT_OK(txn_x->SetName("txn_x"));
    ASSERT_OK(txn_x->Put(Slice("foo"), Slice("bar3")));
    ASSERT_OK(txn_x->Prepare());
    // Create snapshots after the prepare, but there should still
    // be a conflict when trying to read "foo".
    if (attempt == kAttemptImmMemTable) {
      // For the second attempt, hold flush from beginning. The memtable
      // will be switched to immutable after calling TEST_SwitchMemtable()
      // while CheckKey() is called.
      ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
          {{"WritePreparedTransactionTest.CheckKeySkipOldMemtable",
            "FlushJob::Start"}});
      ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
    }
    // force a memtable flush. The memtable should still be kept
    FlushOptions flush_ops;
    if (attempt == kAttemptHistoryMemtable) {
      ASSERT_OK(db->Flush(flush_ops));
    } else {
      ASSERT_EQ(attempt, kAttemptImmMemTable);
      DBImpl* db_impl = static_cast<DBImpl*>(db->GetRootDB());
      ASSERT_OK(db_impl->TEST_SwitchMemtable());
    }
    // Sanity-check the expected memtable state for this attempt.
    uint64_t num_imm_mems;
    ASSERT_TRUE(db->GetIntProperty(DB::Properties::kNumImmutableMemTable,
                                   &num_imm_mems));
    if (attempt == kAttemptHistoryMemtable) {
      ASSERT_EQ(0, num_imm_mems);
    } else {
      ASSERT_EQ(attempt, kAttemptImmMemTable);
      ASSERT_EQ(1, num_imm_mems);
    }
    // Put something in active memtable
    ASSERT_OK(db->Put(write_options, Slice("foo3"), Slice("bar")));
    // Create txn3 after flushing, but this transaction also needs to
    // check all memtables because they contain uncommitted data.
    Transaction* txn3 = db->BeginTransaction(write_options, txn_options);
    ASSERT_TRUE(txn3 != nullptr);
    ASSERT_OK(txn3->SetName("txn3"));
    // Commit the pending write
    ASSERT_OK(txn_x->Commit());
    // Commit txn, txn2 and tx3. txn and tx3 will conflict but txn2 will
    // pass. In all cases, both memtables are queried.
    SetPerfLevel(PerfLevel::kEnableCount);
    get_perf_context()->Reset();
    ASSERT_TRUE(txn3->GetForUpdate(read_options, "foo", &value).IsBusy());
    // We should have checked two memtables, active and either immutable
    // or history memtable, depending on the test case.
    ASSERT_EQ(2, get_perf_context()->get_from_memtable_count);
    get_perf_context()->Reset();
    ASSERT_TRUE(txn->GetForUpdate(read_options, "foo", &value).IsBusy());
    // We should have checked two memtables, active and either immutable
    // or history memtable, depending on the test case.
    ASSERT_EQ(2, get_perf_context()->get_from_memtable_count);
    get_perf_context()->Reset();
    ASSERT_OK(txn2->GetForUpdate(read_options, "foo2", &value));
    ASSERT_EQ(value, "bar");
    // We should have checked two memtables, and since there is no
    // conflict, another Get() will be made and fetch the data from
    // DB. If it is in immutable memtable, two extra memtable reads
    // will be issued. If it is not (in history), only one will
    // be made, which is to the active memtable.
    if (attempt == kAttemptHistoryMemtable) {
      ASSERT_EQ(3, get_perf_context()->get_from_memtable_count);
    } else {
      ASSERT_EQ(attempt, kAttemptImmMemTable);
      ASSERT_EQ(4, get_perf_context()->get_from_memtable_count);
    }
    Transaction* txn4 = db->BeginTransaction(write_options, txn_options);
    ASSERT_TRUE(txn4 != nullptr);
    ASSERT_OK(txn4->SetName("txn4"));
    get_perf_context()->Reset();
    ASSERT_OK(txn4->GetForUpdate(read_options, "foo", &value));
    if (attempt == kAttemptHistoryMemtable) {
      // Active memtable will be checked in snapshot validation and when
      // getting the value.
      ASSERT_EQ(2, get_perf_context()->get_from_memtable_count);
    } else {
      // Only active memtable will be checked in snapshot validation but
      // both of active and immutable snapshot will be queried when
      // getting the value.
      ASSERT_EQ(attempt, kAttemptImmMemTable);
      ASSERT_EQ(3, get_perf_context()->get_from_memtable_count);
    }
    ASSERT_OK(txn2->Commit());
    ASSERT_OK(txn4->Commit());
    // Unblock the held flush (second attempt) before tearing down.
    TEST_SYNC_POINT("WritePreparedTransactionTest.CheckKeySkipOldMemtable");
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
    SetPerfLevel(PerfLevel::kDisable);
    delete txn;
    delete txn2;
    delete txn3;
    delete txn4;
    delete txn_x;
  }
}
// Reproduce the bug with two snapshots with the same sequence number and test
// that the release of the first snapshot will not affect the reads by the other
// snapshot
TEST_P(WritePreparedTransactionTest, DoubleSnapshot) {
  TransactionOptions txn_options;
  Status s;
  // Insert initial value
  ASSERT_OK(db->Put(WriteOptions(), "key", "value1"));
  WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
  Transaction* txn =
      wp_db->BeginTransaction(WriteOptions(), txn_options, nullptr);
  ASSERT_OK(txn->SetName("txn"));
  ASSERT_OK(txn->Put("key", "value2"));
  ASSERT_OK(txn->Prepare());
  // Three snapshots with the same seq number
  const Snapshot* snapshot0 = wp_db->GetSnapshot();
  const Snapshot* snapshot1 = wp_db->GetSnapshot();
  const Snapshot* snapshot2 = wp_db->GetSnapshot();
  ASSERT_OK(txn->Commit());
  SequenceNumber cache_size = wp_db->COMMIT_CACHE_SIZE;
  // A commit at txn_seq + cache size maps to the same commit-cache slot as the
  // txn's own entry and therefore evicts it.
  SequenceNumber overlap_seq = txn->GetId() + cache_size;
  delete txn;
  // 4th snapshot with a larger seq
  const Snapshot* snapshot3 = wp_db->GetSnapshot();
  // Cause an eviction to advance max evicted seq number
  // This also fetches the 4 snapshots from db since their seq is lower than the
  // new max
  wp_db->AddCommitted(overlap_seq, overlap_seq);
  ReadOptions ropt;
  // It should see the value before commit
  ropt.snapshot = snapshot2;
  PinnableSlice pinnable_val;
  s = wp_db->Get(ropt, wp_db->DefaultColumnFamily(), "key", &pinnable_val);
  ASSERT_OK(s);
  ASSERT_TRUE(pinnable_val == "value1");
  pinnable_val.Reset();
  // Release one of the snapshots that shares snapshot2's sequence number.
  wp_db->ReleaseSnapshot(snapshot1);
  // It should still see the value before commit
  s = wp_db->Get(ropt, wp_db->DefaultColumnFamily(), "key", &pinnable_val);
  ASSERT_OK(s);
  ASSERT_TRUE(pinnable_val == "value1");
  pinnable_val.Reset();
  // Cause an eviction to advance max evicted seq number and trigger updating
  // the snapshot list
  overlap_seq += cache_size;
  wp_db->AddCommitted(overlap_seq, overlap_seq);
  // It should still see the value before commit
  s = wp_db->Get(ropt, wp_db->DefaultColumnFamily(), "key", &pinnable_val);
  ASSERT_OK(s);
  ASSERT_TRUE(pinnable_val == "value1");
  pinnable_val.Reset();
  wp_db->ReleaseSnapshot(snapshot0);
  wp_db->ReleaseSnapshot(snapshot2);
  wp_db->ReleaseSnapshot(snapshot3);
}
  933. size_t UniqueCnt(std::vector<SequenceNumber> vec) {
  934. std::set<SequenceNumber> aset;
  935. for (auto i : vec) {
  936. aset.insert(i);
  937. }
  938. return aset.size();
  939. }
  940. // Test that the entries in old_commit_map_ get garbage collected properly
TEST_P(WritePreparedTransactionTest, OldCommitMapGC) {
  // Minimal caches so that a single extra commit forces an eviction and
  // advances max_evicted_seq_.
  const size_t snapshot_cache_bits = 0;
  const size_t commit_cache_bits = 0;
  // NOTE(review): mock_db is presumably owned (and deleted) by
  // WritePreparedTxnDBMock via its TransactionDB base -- confirm; otherwise
  // this would leak.
  DBImpl* mock_db = new DBImpl(options, dbname);
  UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
  std::unique_ptr<WritePreparedTxnDBMock> wp_db(
      new WritePreparedTxnDBMock(mock_db, txn_db_options));
  SequenceNumber seq = 0;
  // Take the first snapshot that overlaps with two txn
  auto prep_seq = ++seq;
  wp_db->AddPrepared(prep_seq);
  auto prep_seq2 = ++seq;
  wp_db->AddPrepared(prep_seq2);
  auto snap_seq1 = seq;
  wp_db->TakeSnapshot(snap_seq1);
  auto commit_seq = ++seq;
  wp_db->AddCommitted(prep_seq, commit_seq);
  wp_db->RemovePrepared(prep_seq);
  auto commit_seq2 = ++seq;
  wp_db->AddCommitted(prep_seq2, commit_seq2);
  wp_db->RemovePrepared(prep_seq2);
  // Take the 2nd and 3rd snapshot that overlap with the same txn
  prep_seq = ++seq;
  wp_db->AddPrepared(prep_seq);
  auto snap_seq2 = seq;
  wp_db->TakeSnapshot(snap_seq2);
  seq++;
  auto snap_seq3 = seq;
  wp_db->TakeSnapshot(snap_seq3);
  seq++;
  commit_seq = ++seq;
  wp_db->AddCommitted(prep_seq, commit_seq);
  wp_db->RemovePrepared(prep_seq);
  // Make sure max_evicted_seq_ will be larger than 2nd snapshot by evicting the
  // only item in the commit_cache_ via another commit.
  prep_seq = ++seq;
  wp_db->AddPrepared(prep_seq);
  commit_seq = ++seq;
  wp_db->AddCommitted(prep_seq, commit_seq);
  wp_db->RemovePrepared(prep_seq);
  // Verify that the evicted commit entries for all snapshots are in the
  // old_commit_map_
  {
    ASSERT_FALSE(wp_db->old_commit_map_empty_.load());
    ReadLock rl(&wp_db->old_commit_map_mutex_);
    ASSERT_EQ(3, wp_db->old_commit_map_.size());
    ASSERT_EQ(2, UniqueCnt(wp_db->old_commit_map_[snap_seq1]));
    ASSERT_EQ(1, UniqueCnt(wp_db->old_commit_map_[snap_seq2]));
    ASSERT_EQ(1, UniqueCnt(wp_db->old_commit_map_[snap_seq3]));
  }
  // Verify that the 2nd snapshot is cleaned up after the release
  wp_db->ReleaseSnapshotInternal(snap_seq2);
  {
    ASSERT_FALSE(wp_db->old_commit_map_empty_.load());
    ReadLock rl(&wp_db->old_commit_map_mutex_);
    ASSERT_EQ(2, wp_db->old_commit_map_.size());
    ASSERT_EQ(2, UniqueCnt(wp_db->old_commit_map_[snap_seq1]));
    ASSERT_EQ(1, UniqueCnt(wp_db->old_commit_map_[snap_seq3]));
  }
  // Verify that the 1st snapshot is cleaned up after the release
  wp_db->ReleaseSnapshotInternal(snap_seq1);
  {
    ASSERT_FALSE(wp_db->old_commit_map_empty_.load());
    ReadLock rl(&wp_db->old_commit_map_mutex_);
    ASSERT_EQ(1, wp_db->old_commit_map_.size());
    ASSERT_EQ(1, UniqueCnt(wp_db->old_commit_map_[snap_seq3]));
  }
  // Verify that the 3rd snapshot is cleaned up after the release
  wp_db->ReleaseSnapshotInternal(snap_seq3);
  {
    // With every snapshot released, the map must be fully garbage collected.
    ASSERT_TRUE(wp_db->old_commit_map_empty_.load());
    ReadLock rl(&wp_db->old_commit_map_mutex_);
    ASSERT_EQ(0, wp_db->old_commit_map_.size());
  }
}
// Verifies CheckAgainstSnapshots: an evicted commit entry must be recorded in
// old_commit_map_ for exactly the snapshots whose seq lies between the entry's
// prepare and commit seq. Covers snapshots held in the snapshot cache, in the
// overflow list (snapshots_all_), and spans crossing both.
TEST_P(WritePreparedTransactionTest, CheckAgainstSnapshots) {
  std::vector<SequenceNumber> snapshots = {100l, 200l, 300l, 400l, 500l,
                                           600l, 700l, 800l, 900l};
  const size_t snapshot_cache_bits = 2;
  const uint64_t cache_size = 1ul << snapshot_cache_bits;
  // Safety check to express the intended size in the test. Can be adjusted if
  // the snapshots lists changed.
  ASSERT_EQ((1ul << snapshot_cache_bits) * 2 + 1, snapshots.size());
  DBImpl* mock_db = new DBImpl(options, dbname);
  UpdateTransactionDBOptions(snapshot_cache_bits);
  // wp_db takes ownership of mock_db.
  std::unique_ptr<WritePreparedTxnDBMock> wp_db(
      new WritePreparedTxnDBMock(mock_db, txn_db_options));
  SequenceNumber version = 1000l;
  ASSERT_EQ(0, wp_db->snapshots_total_);
  wp_db->UpdateSnapshots(snapshots, version);
  ASSERT_EQ(snapshots.size(), wp_db->snapshots_total_);
  // seq numbers are chosen so that we have two of them between each two
  // snapshots. If the diff of two consecutive seq is more than 5, there is a
  // snapshot between them.
  std::vector<SequenceNumber> seqs = {50l,  55l,  150l, 155l, 250l, 255l, 350l,
                                      355l, 450l, 455l, 550l, 555l, 650l, 655l,
                                      750l, 755l, 850l, 855l, 950l, 955l};
  ASSERT_GT(seqs.size(), 1);
  for (size_t i = 0; i + 1 < seqs.size(); i++) {
    wp_db->old_commit_map_empty_ = true;  // reset
    CommitEntry commit_entry = {seqs[i], seqs[i + 1]};
    wp_db->CheckAgainstSnapshots(commit_entry);
    // Expect update if there is snapshot in between the prepare and commit
    bool expect_update = commit_entry.commit_seq - commit_entry.prep_seq > 5 &&
                         commit_entry.commit_seq >= snapshots.front() &&
                         commit_entry.prep_seq <= snapshots.back();
    ASSERT_EQ(expect_update, !wp_db->old_commit_map_empty_);
  }
  // Test that search will include multiple snapshot from snapshot cache
  {
    // exclude first and last item in the cache
    CommitEntry commit_entry = {snapshots.front() + 1,
                                snapshots[cache_size - 1] - 1};
    wp_db->old_commit_map_empty_ = true;  // reset
    wp_db->old_commit_map_.clear();
    wp_db->CheckAgainstSnapshots(commit_entry);
    ASSERT_EQ(wp_db->old_commit_map_.size(), cache_size - 2);
  }
  // Test that search will include multiple snapshot from old snapshots
  {
    // include two in the middle
    CommitEntry commit_entry = {snapshots[cache_size] + 1,
                                snapshots[cache_size + 2] + 1};
    wp_db->old_commit_map_empty_ = true;  // reset
    wp_db->old_commit_map_.clear();
    wp_db->CheckAgainstSnapshots(commit_entry);
    ASSERT_EQ(wp_db->old_commit_map_.size(), 2);
  }
  // Test that search will include both snapshot cache and old snapshots
  // Case 1: includes all in snapshot cache
  {
    CommitEntry commit_entry = {snapshots.front() - 1, snapshots.back() + 1};
    wp_db->old_commit_map_empty_ = true;  // reset
    wp_db->old_commit_map_.clear();
    wp_db->CheckAgainstSnapshots(commit_entry);
    ASSERT_EQ(wp_db->old_commit_map_.size(), snapshots.size());
  }
  // Case 2: includes all snapshot caches except the smallest
  {
    CommitEntry commit_entry = {snapshots.front() + 1, snapshots.back() + 1};
    wp_db->old_commit_map_empty_ = true;  // reset
    wp_db->old_commit_map_.clear();
    wp_db->CheckAgainstSnapshots(commit_entry);
    ASSERT_EQ(wp_db->old_commit_map_.size(), snapshots.size() - 1);
  }
  // Case 3: includes only the largest of snapshot cache
  {
    CommitEntry commit_entry = {snapshots[cache_size - 1] - 1,
                                snapshots.back() + 1};
    wp_db->old_commit_map_empty_ = true;  // reset
    wp_db->old_commit_map_.clear();
    wp_db->CheckAgainstSnapshots(commit_entry);
    ASSERT_EQ(wp_db->old_commit_map_.size(), snapshots.size() - cache_size + 1);
  }
}
  1096. #if !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
// Test that CheckAgainstSnapshots will not miss a live snapshot if it is run in
// parallel with UpdateSnapshots.
// The test enumerates every combination of (old snapshot list, new snapshot
// list, pause point a1/a2 in one thread, pause point b1/b2 in the other) and
// delegates the actual two-thread run to SnapshotConcurrentAccessTestInternal.
TEST_P(SnapshotConcurrentAccessTest, SnapshotConcurrentAccess) {
  // We have a sync point in the method under test after checking each snapshot.
  // If you increase the max number of snapshots in this test, more sync points
  // in the methods must also be added.
  const std::vector<SequenceNumber> snapshots = {10l, 20l, 30l,  40l, 50l,
                                                 60l, 70l, 80l,  90l, 100l};
  const size_t snapshot_cache_bits = 2;
  // Safety check to express the intended size in the test. Can be adjusted if
  // the snapshots lists changed.
  ASSERT_EQ((1ul << snapshot_cache_bits) * 2 + 2, snapshots.size());
  SequenceNumber version = 1000l;
  // Choose the cache size so that the new snapshot list could replace all the
  // existing items in the cache and also have some overflow.
  DBImpl* mock_db = new DBImpl(options, dbname);
  UpdateTransactionDBOptions(snapshot_cache_bits);
  std::unique_ptr<WritePreparedTxnDBMock> wp_db(
      new WritePreparedTxnDBMock(mock_db, txn_db_options));
  const size_t extra = 2;
  size_t loop_id = 0;
  // Add up to extra items that do not fit into the cache
  for (size_t old_size = 1; old_size <= wp_db->SNAPSHOT_CACHE_SIZE + extra;
       old_size++) {
    const std::vector<SequenceNumber> old_snapshots(
        snapshots.begin(), snapshots.begin() + old_size);
    // Each member of old snapshot might or might not appear in the new list. We
    // create a common_snapshots for each combination.
    size_t new_comb_cnt = size_t(1) << old_size;
    for (size_t new_comb = 0; new_comb < new_comb_cnt; new_comb++, loop_id++) {
      // Shard the (large) combination space across parallel test instances.
      if (loop_id % split_cnt_ != split_id_) {
        continue;
      }
      printf(".");  // To signal progress
      fflush(stdout);
      std::vector<SequenceNumber> common_snapshots;
      for (size_t i = 0; i < old_snapshots.size(); i++) {
        if (IsInCombination(i, new_comb)) {
          common_snapshots.push_back(old_snapshots[i]);
        }
      }
      // And add some new snapshots to the common list
      for (size_t added_snapshots = 0;
           added_snapshots <= snapshots.size() - old_snapshots.size();
           added_snapshots++) {
        std::vector<SequenceNumber> new_snapshots = common_snapshots;
        for (size_t i = 0; i < added_snapshots; i++) {
          new_snapshots.push_back(snapshots[old_snapshots.size() + i]);
        }
        for (auto it = common_snapshots.begin(); it != common_snapshots.end();
             ++it) {
          auto snapshot = *it;
          // Create a commit entry that is around the snapshot and thus should
          // be not be discarded
          CommitEntry entry = {static_cast<uint64_t>(snapshot - 1),
                               snapshot + 1};
          // The critical part is when iterating the snapshot cache. Afterwards,
          // we are operating under the lock
          size_t a_range =
              std::min(old_snapshots.size(), wp_db->SNAPSHOT_CACHE_SIZE) + 1;
          size_t b_range =
              std::min(new_snapshots.size(), wp_db->SNAPSHOT_CACHE_SIZE) + 1;
          // Break each thread at two points
          for (size_t a1 = 1; a1 <= a_range; a1++) {
            for (size_t a2 = a1 + 1; a2 <= a_range; a2++) {
              for (size_t b1 = 1; b1 <= b_range; b1++) {
                for (size_t b2 = b1 + 1; b2 <= b_range; b2++) {
                  SnapshotConcurrentAccessTestInternal(
                      wp_db.get(), old_snapshots, new_snapshots, entry, version,
                      a1, a2, b1, b2);
                }
              }
            }
          }
        }
      }
    }
  }
  printf("\n");
}
  1177. #endif // !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
// This test clarifies the contract of AdvanceMaxEvictedSeq method: after the
// call, (a) max_evicted_seq_ equals the new max, (b) prepared txns <= max have
// moved from prepared_txns_ to delayed_prepared_, and (c) the snapshot cache
// has been refreshed from the DB's current snapshot list.
TEST_P(WritePreparedTransactionTest, AdvanceMaxEvictedSeqBasic) {
  DBImpl* mock_db = new DBImpl(options, dbname);
  std::unique_ptr<WritePreparedTxnDBMock> wp_db(
      new WritePreparedTxnDBMock(mock_db, txn_db_options));
  // 1. Set the initial values for max, prepared, and snapshots
  SequenceNumber zero_max = 0l;
  // Set the initial list of prepared txns
  const std::vector<SequenceNumber> initial_prepared = {10,  30,  50, 100,
                                                        150, 200, 250};
  for (auto p : initial_prepared) {
    wp_db->AddPrepared(p);
  }
  // This updates the max value and also set old prepared
  SequenceNumber init_max = 100;
  wp_db->AdvanceMaxEvictedSeq(zero_max, init_max);
  const std::vector<SequenceNumber> initial_snapshots = {20, 40};
  wp_db->SetDBSnapshots(initial_snapshots);
  // This will update the internal cache of snapshots from the DB
  wp_db->UpdateSnapshots(initial_snapshots, init_max);
  // 2. Invoke AdvanceMaxEvictedSeq
  const std::vector<SequenceNumber> latest_snapshots = {20, 110, 220, 300};
  wp_db->SetDBSnapshots(latest_snapshots);
  SequenceNumber new_max = 200;
  wp_db->AdvanceMaxEvictedSeq(init_max, new_max);
  // 3. Verify that the state matches with AdvanceMaxEvictedSeq contract
  // a. max should be updated to new_max
  ASSERT_EQ(wp_db->max_evicted_seq_, new_max);
  // b. delayed prepared should contain every txn <= max and prepared should
  // only contain txns > max
  auto it = initial_prepared.begin();
  // erase() returning 1 proves each expected seq was present exactly once.
  for (; it != initial_prepared.end() && *it <= new_max; ++it) {
    ASSERT_EQ(1, wp_db->delayed_prepared_.erase(*it));
  }
  ASSERT_TRUE(wp_db->delayed_prepared_.empty());
  // Drain prepared_txns_ and check it holds exactly the remaining (> max) seqs
  // in order.
  for (; it != initial_prepared.end() && !wp_db->prepared_txns_.empty();
       ++it, wp_db->prepared_txns_.pop()) {
    ASSERT_EQ(*it, wp_db->prepared_txns_.top());
  }
  ASSERT_TRUE(it == initial_prepared.end());
  ASSERT_TRUE(wp_db->prepared_txns_.empty());
  // c. snapshots should contain everything below new_max
  auto sit = latest_snapshots.begin();
  for (size_t i = 0; sit != latest_snapshots.end() && *sit <= new_max &&
                     i < wp_db->snapshots_total_;
       sit++, i++) {
    ASSERT_TRUE(i < wp_db->snapshots_total_);
    // This test is in small scale and the list of snapshots are assumed to be
    // within the cache size limit. This is just a safety check to double check
    // that assumption.
    ASSERT_TRUE(i < wp_db->SNAPSHOT_CACHE_SIZE);
    ASSERT_EQ(*sit, wp_db->snapshot_cache_[i]);
  }
}
// A new snapshot should always be larger than max_evicted_seq_
// Otherwise the snapshot does not go through AdvanceMaxEvictedSeq
TEST_P(WritePreparedTransactionTest, NewSnapshotLargerThanMax) {
  WriteOptions woptions;
  TransactionOptions txn_options;
  WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
  Transaction* txn0 = db->BeginTransaction(woptions, txn_options);
  ASSERT_OK(txn0->Put(Slice("key"), Slice("value")));
  ASSERT_OK(txn0->Commit());
  const SequenceNumber seq = txn0->GetId();  // is also prepare seq
  delete txn0;
  std::vector<Transaction*> txns;
  // Inc seq without committing anything
  for (int i = 0; i < 10; i++) {
    Transaction* txn = db->BeginTransaction(woptions, txn_options);
    ASSERT_OK(txn->SetName("xid" + std::to_string(i)));
    ASSERT_OK(txn->Put(Slice("key" + std::to_string(i)), Slice("value")));
    ASSERT_OK(txn->Prepare());
    txns.push_back(txn);
  }
  // The new commit is seq + 10
  ASSERT_OK(db->Put(woptions, "key", "value"));
  auto snap = wp_db->GetSnapshot();
  const SequenceNumber last_seq = snap->GetSequenceNumber();
  wp_db->ReleaseSnapshot(snap);
  ASSERT_LT(seq, last_seq);
  // Otherwise our test is not effective
  ASSERT_LT(last_seq - seq, wp_db->INC_STEP_FOR_MAX_EVICTED);
  // Evict seq out of commit cache
  const SequenceNumber overwrite_seq = seq + wp_db->COMMIT_CACHE_SIZE;
  // Check that the next write could make max go beyond last
  auto last_max = wp_db->max_evicted_seq_.load();
  wp_db->AddCommitted(overwrite_seq, overwrite_seq);
  // Check that eviction has advanced the max
  ASSERT_LT(last_max, wp_db->max_evicted_seq_.load());
  // Check that the new max has not advanced the last seq
  ASSERT_LT(wp_db->max_evicted_seq_.load(), last_seq);
  // Rollback the still-prepared txns so the db can shut down cleanly.
  for (auto txn : txns) {
    ASSERT_OK(txn->Rollback());
    delete txn;
  }
}
// A new snapshot should always be larger than max_evicted_seq_
// In very rare cases max could be below last published seq. Test that
// taking snapshot will wait for max to catch up.
TEST_P(WritePreparedTransactionTest, MaxCatchupWithNewSnapshot) {
  const size_t snapshot_cache_bits = 7;  // same as default
  const size_t commit_cache_bits = 0;    // only 1 entry => frequent eviction
  UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
  ASSERT_OK(ReOpen());
  WriteOptions woptions;
  WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
  const int writes = 50;
  const int batch_cnt = 4;
  // Writer thread: each batch carries batch_cnt duplicate keys.
  ROCKSDB_NAMESPACE::port::Thread t1([&]() {
    for (int i = 0; i < writes; i++) {
      WriteBatch batch;
      // For duplicate keys cause 4 commit entries, each evicting an entry that
      // is not published yet, thus causing max evicted seq go higher than last
      // published.
      for (int b = 0; b < batch_cnt; b++) {
        ASSERT_OK(batch.Put("foo", "foo"));
      }
      ASSERT_OK(db->Write(woptions, &batch));
    }
  });
  // Snapshot thread: every snapshot taken must be above the max at the time.
  ROCKSDB_NAMESPACE::port::Thread t2([&]() {
    while (wp_db->max_evicted_seq_ == 0) {  // wait for insert thread
      std::this_thread::yield();
    }
    for (int i = 0; i < 10; i++) {
      SequenceNumber max_lower_bound = wp_db->max_evicted_seq_;
      auto snap = db->GetSnapshot();
      if (snap->GetSequenceNumber() != 0) {
        // Value of max_evicted_seq_ when snapshot was taken is unknown. We thus
        // compare with the lower bound instead as an approximation.
        ASSERT_LT(max_lower_bound, snap->GetSequenceNumber());
      }  // seq 0 is ok to be less than max since nothing is visible to it
      db->ReleaseSnapshot(snap);
    }
  });
  t1.join();
  t2.join();
  // Make sure that the test has worked and seq number has advanced as we
  // thought
  auto snap = db->GetSnapshot();
  ASSERT_GT(snap->GetSequenceNumber(), batch_cnt * writes - 1);
  db->ReleaseSnapshot(snap);
}
// Test that reads without snapshots would not hit an undefined state
// (all Get/MultiGet variants must either succeed or return TryAgain while max
// is racing ahead of the last published seq).
TEST_P(WritePreparedTransactionTest, MaxCatchupWithUnbackedSnapshot) {
  const size_t snapshot_cache_bits = 7;  // same as default
  const size_t commit_cache_bits = 0;    // only 1 entry => frequent eviction
  UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
  ASSERT_OK(ReOpen());
  WriteOptions woptions;
  WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
  const int writes = 50;
  // Writer thread keeps the seq number (and evictions) moving.
  ROCKSDB_NAMESPACE::port::Thread t1([&]() {
    for (int i = 0; i < writes; i++) {
      WriteBatch batch;
      ASSERT_OK(batch.Put("key", "foo"));
      ASSERT_OK(db->Write(woptions, &batch));
    }
  });
  // Reader thread exercises snapshot-less reads through every read API.
  ROCKSDB_NAMESPACE::port::Thread t2([&]() {
    while (wp_db->max_evicted_seq_ == 0) {  // wait for insert thread
      std::this_thread::yield();
    }
    ReadOptions ropt;
    PinnableSlice pinnable_val;
    TransactionOptions txn_options;
    for (int i = 0; i < 10; i++) {
      auto s = db->Get(ropt, db->DefaultColumnFamily(), "key", &pinnable_val);
      ASSERT_TRUE(s.ok() || s.IsTryAgain());
      pinnable_val.Reset();
      Transaction* txn = db->BeginTransaction(woptions, txn_options);
      s = txn->Get(ropt, db->DefaultColumnFamily(), "key", &pinnable_val);
      ASSERT_TRUE(s.ok() || s.IsTryAgain());
      pinnable_val.Reset();
      std::vector<std::string> values;
      auto s_vec =
          txn->MultiGet(ropt, {db->DefaultColumnFamily()}, {"key"}, &values);
      ASSERT_EQ(1, values.size());
      ASSERT_EQ(1, s_vec.size());
      s = s_vec[0];
      ASSERT_TRUE(s.ok() || s.IsTryAgain());
      Slice key("key");
      txn->MultiGet(ropt, db->DefaultColumnFamily(), 1, &key, &pinnable_val, &s,
                    true);
      ASSERT_TRUE(s.ok() || s.IsTryAgain());
      delete txn;
    }
  });
  t1.join();
  t2.join();
  // Make sure that the test has worked and seq number has advanced as we
  // thought
  auto snap = db->GetSnapshot();
  ASSERT_GT(snap->GetSequenceNumber(), writes - 1);
  db->ReleaseSnapshot(snap);
}
// Check that old_commit_map_ cleanup works correctly if the snapshot equals
// max_evicted_seq_.
TEST_P(WritePreparedTransactionTest, CleanupSnapshotEqualToMax) {
  const size_t snapshot_cache_bits = 7;  // same as default
  const size_t commit_cache_bits = 0;    // only 1 entry => frequent eviction
  UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
  ASSERT_OK(ReOpen());
  WriteOptions woptions;
  WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
  // Insert something to increase seq
  ASSERT_OK(db->Put(woptions, "key", "value"));
  auto snap = db->GetSnapshot();
  auto snap_seq = snap->GetSequenceNumber();
  // Another insert should trigger eviction + load snapshot from db
  ASSERT_OK(db->Put(woptions, "key", "value"));
  // This is the scenario that we check against
  ASSERT_EQ(snap_seq, wp_db->max_evicted_seq_);
  // old_commit_map_ now has some data that needs gc
  ASSERT_EQ(1, wp_db->snapshots_total_);
  ASSERT_EQ(1, wp_db->old_commit_map_.size());
  db->ReleaseSnapshot(snap);
  // Another insert should trigger eviction + load snapshot from db
  ASSERT_OK(db->Put(woptions, "key", "value"));
  // the snapshot and related metadata must be properly garbage collected
  ASSERT_EQ(0, wp_db->snapshots_total_);
  ASSERT_TRUE(wp_db->snapshots_all_.empty());
  ASSERT_EQ(0, wp_db->old_commit_map_.size());
}
  1402. TEST_P(WritePreparedTransactionTest, AdvanceSeqByOne) {
  1403. auto snap = db->GetSnapshot();
  1404. auto seq1 = snap->GetSequenceNumber();
  1405. db->ReleaseSnapshot(snap);
  1406. WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
  1407. wp_db->AdvanceSeqByOne();
  1408. snap = db->GetSnapshot();
  1409. auto seq2 = snap->GetSequenceNumber();
  1410. db->ReleaseSnapshot(snap);
  1411. ASSERT_LT(seq1, seq2);
  1412. }
// Test that the txn Initialize calls the overridden functions
TEST_P(WritePreparedTransactionTest, TxnInitialize) {
  TransactionOptions txn_options;
  WriteOptions write_options;
  ASSERT_OK(db->Put(write_options, "key", "value"));
  // Keep a txn in the prepared state so that min_uncommitted_ has a
  // non-trivial value when the second txn takes its snapshot.
  Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
  ASSERT_OK(txn0->SetName("xid"));
  ASSERT_OK(txn0->Put(Slice("key"), Slice("value1")));
  ASSERT_OK(txn0->Prepare());
  // SetSnapshot is overridden to update min_uncommitted_
  txn_options.set_snapshot = true;
  Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
  auto snap = txn1->GetSnapshot();
  auto snap_impl = static_cast<const SnapshotImpl*>(snap);
  // If ::Initialize calls the overridden SetSnapshot, min_uncommitted_ must be
  // updated
  ASSERT_GT(snap_impl->min_uncommitted_, kMinUnCommittedSeq);
  ASSERT_OK(txn0->Rollback());
  ASSERT_OK(txn1->Rollback());
  delete txn0;
  delete txn1;
}
// This tests that transactions with duplicate keys perform correctly after max
// is advancing their prepared sequence numbers. This will not be the case if
// for example the txn does not add the prepared seq for the second sub-batch to
// the PreparedHeap structure.
TEST_P(WritePreparedTransactionTest, AdvanceMaxEvictedSeqWithDuplicates) {
  const size_t snapshot_cache_bits = 7;  // same as default
  const size_t commit_cache_bits = 1;    // disable commit cache
  UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
  ASSERT_OK(ReOpen());
  ReadOptions ropt;
  PinnableSlice pinnable_val;
  WriteOptions write_options;
  TransactionOptions txn_options;
  // Duplicate key in the same txn => two sub-batches => two prepared seqs.
  Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
  ASSERT_OK(txn0->SetName("xid"));
  ASSERT_OK(txn0->Put(Slice("key"), Slice("value1")));
  ASSERT_OK(txn0->Put(Slice("key"), Slice("value2")));
  ASSERT_OK(txn0->Prepare());
  ASSERT_OK(db->Put(write_options, "key2", "value"));
  // Will cause max advance due to disabled commit cache
  ASSERT_OK(db->Put(write_options, "key3", "value"));
  // The prepared (uncommitted) value must remain invisible.
  auto s = db->Get(ropt, db->DefaultColumnFamily(), "key", &pinnable_val);
  ASSERT_TRUE(s.IsNotFound());
  delete txn0;
  WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
  // Simulate a crash and verify the prepared txn stays invisible after
  // recovery as well.
  ASSERT_OK(wp_db->db_impl_->FlushWAL(true));
  wp_db->TEST_Crash();
  ASSERT_OK(ReOpenNoDelete());
  ASSERT_NE(db, nullptr);
  s = db->Get(ropt, db->DefaultColumnFamily(), "key", &pinnable_val);
  ASSERT_TRUE(s.IsNotFound());
  txn0 = db->GetTransactionByName("xid");
  ASSERT_OK(txn0->Rollback());
  delete txn0;
}
  1470. #if !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
// Stress SmallestUnCommittedSeq, which reads from both prepared_txns_ and
// delayed_prepared_, when is run concurrently with advancing max_evicted_seq,
// which moves prepared txns from prepared_txns_ to delayed_prepared_.
TEST_P(WritePreparedTransactionTest, SmallestUnCommittedSeq) {
  const size_t snapshot_cache_bits = 7;  // same as default
  const size_t commit_cache_bits = 1;    // disable commit cache
  UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
  ASSERT_OK(ReOpen());
  WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
  ReadOptions ropt;
  PinnableSlice pinnable_val;
  WriteOptions write_options;
  TransactionOptions txn_options;
  std::vector<Transaction*> txns, committed_txns;
  // Prepare a pool of txns; they will be committed in random order below.
  const int cnt = 100;
  for (int i = 0; i < cnt; i++) {
    Transaction* txn = db->BeginTransaction(write_options, txn_options);
    ASSERT_OK(txn->SetName("xid" + std::to_string(i)));
    auto key = "key1" + std::to_string(i);
    auto value = "value1" + std::to_string(i);
    ASSERT_OK(txn->Put(Slice(key), Slice(value)));
    ASSERT_OK(txn->Prepare());
    txns.push_back(txn);
  }
  // mutex guards txns, which both threads access.
  port::Mutex mutex;
  Random rnd(1103);
  ROCKSDB_NAMESPACE::port::Thread commit_thread([&]() {
    for (int i = 0; i < cnt; i++) {
      uint32_t index = rnd.Uniform(cnt - i);
      Transaction* txn;
      {
        MutexLock l(&mutex);
        txn = txns[index];
        txns.erase(txns.begin() + index);
      }
      // Since commit cache is practically disabled, commit results in immediate
      // advance in max_evicted_seq_ and subsequently moving some prepared txns
      // to delayed_prepared_.
      ASSERT_OK(txn->Commit());
      committed_txns.push_back(txn);
    }
  });
  ROCKSDB_NAMESPACE::port::Thread read_thread([&]() {
    while (true) {
      MutexLock l(&mutex);
      if (txns.empty()) {
        break;
      }
      // The smallest uncommitted seq can never exceed the oldest txn still in
      // the (sorted-by-creation) txns list.
      auto min_uncommitted = wp_db->SmallestUnCommittedSeq();
      ASSERT_LE(min_uncommitted, (*txns.begin())->GetId());
    }
  });
  commit_thread.join();
  read_thread.join();
  for (auto txn : committed_txns) {
    delete txn;
  }
}
  1529. #endif // !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
  1530. TEST_P(SeqAdvanceConcurrentTest, SeqAdvanceConcurrent) {
  1531. // Given the sequential run of txns, with this timeout we should never see a
  1532. // deadlock nor a timeout unless we have a key conflict, which should be
  1533. // almost infeasible.
  1534. txn_db_options.transaction_lock_timeout = 1000;
  1535. txn_db_options.default_lock_timeout = 1000;
  1536. ASSERT_OK(ReOpen());
  1537. FlushOptions fopt;
  1538. // Number of different txn types we use in this test
  1539. const size_t type_cnt = 5;
  1540. // The size of the first write group
  1541. // TODO(myabandeh): This should be increase for pre-release tests
  1542. const size_t first_group_size = 2;
  1543. // Total number of txns we run in each test
  1544. // TODO(myabandeh): This should be increase for pre-release tests
  1545. const size_t txn_cnt = first_group_size + 1;
  1546. size_t base[txn_cnt + 1] = {
  1547. 1,
  1548. };
  1549. for (size_t bi = 1; bi <= txn_cnt; bi++) {
  1550. base[bi] = base[bi - 1] * type_cnt;
  1551. }
  1552. const size_t max_n = static_cast<size_t>(std::pow(type_cnt, txn_cnt));
  1553. printf("Number of cases being tested is %" ROCKSDB_PRIszt "\n", max_n);
  1554. for (size_t n = 0; n < max_n; n++) {
  1555. if (n > 0) {
  1556. ASSERT_OK(ReOpen());
  1557. }
  1558. if (n % split_cnt_ != split_id_) {
  1559. continue;
  1560. }
  1561. if (n % 1000 == 0) {
  1562. printf("Tested %" ROCKSDB_PRIszt " cases so far\n", n);
  1563. }
  1564. DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
  1565. auto seq = db_impl->TEST_GetLastVisibleSequence();
  1566. with_empty_commits = 0;
  1567. exp_seq = seq;
  1568. // This is increased before writing the batch for commit
  1569. commit_writes = 0;
  1570. // This is increased before txn starts linking if it expects to do a commit
  1571. // eventually
  1572. expected_commits = 0;
  1573. std::vector<port::Thread> threads;
  1574. linked.store(0, std::memory_order_release);
  1575. std::atomic<bool> batch_formed(false);
  1576. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  1577. "WriteThread::EnterAsBatchGroupLeader:End",
  1578. [&](void* /*arg*/) { batch_formed = true; });
  1579. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  1580. "WriteThread::JoinBatchGroup:Wait", [&](void* /*arg*/) {
  1581. size_t orig_linked = linked.fetch_add(1, std::memory_order_acq_rel);
  1582. if (orig_linked == 0) {
  1583. // Wait until the others are linked too.
  1584. while (linked.load(std::memory_order_acquire) < first_group_size) {
  1585. }
  1586. } else if (orig_linked == first_group_size) {
  1587. // Make the 2nd batch of the rest of writes plus any followup
  1588. // commits from the first batch
  1589. while (linked.load(std::memory_order_acquire) <
  1590. txn_cnt + commit_writes) {
  1591. }
  1592. }
  1593. // Then we will have one or more batches consisting of follow-up
  1594. // commits from the 2nd batch. There is a bit of non-determinism here
  1595. // but it should be tolerable.
  1596. });
  1597. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  1598. for (size_t bi = 0; bi < txn_cnt; bi++) {
  1599. // get the bi-th digit in number system based on type_cnt
  1600. size_t d = (n % base[bi + 1]) / base[bi];
  1601. switch (d) {
  1602. case 0:
  1603. threads.emplace_back(&TransactionTestBase::TestTxn0, this, bi);
  1604. break;
  1605. case 1:
  1606. threads.emplace_back(&TransactionTestBase::TestTxn1, this, bi);
  1607. break;
  1608. case 2:
  1609. threads.emplace_back(&TransactionTestBase::TestTxn2, this, bi);
  1610. break;
  1611. case 3:
  1612. threads.emplace_back(&TransactionTestBase::TestTxn3, this, bi);
  1613. break;
  1614. case 4:
  1615. threads.emplace_back(&TransactionTestBase::TestTxn3, this, bi);
  1616. break;
  1617. default:
  1618. FAIL();
  1619. }
  1620. // wait to be linked
  1621. while (linked.load(std::memory_order_acquire) <= bi) {
  1622. }
  1623. // after a queue of size first_group_size
  1624. if (bi + 1 == first_group_size) {
  1625. while (!batch_formed) {
  1626. }
  1627. // to make it more deterministic, wait until the commits are linked
  1628. while (linked.load(std::memory_order_acquire) <=
  1629. bi + expected_commits) {
  1630. }
  1631. }
  1632. }
  1633. for (auto& t : threads) {
  1634. t.join();
  1635. }
  1636. if (options.two_write_queues) {
  1637. // In this case none of the above scheduling tricks to deterministically
  1638. // form merged batches works because the writes go to separate queues.
  1639. // This would result in different write groups in each run of the test. We
  1640. // still keep the test since although non-deterministic and hard to debug,
  1641. // it is still useful to have.
  1642. // TODO(myabandeh): Add a deterministic unit test for two_write_queues
  1643. }
  1644. // Check if memtable inserts advanced seq number as expected
  1645. seq = db_impl->TEST_GetLastVisibleSequence();
  1646. ASSERT_EQ(exp_seq, seq);
  1647. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  1648. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
  1649. // Check if recovery preserves the last sequence number
  1650. ASSERT_OK(db_impl->FlushWAL(true));
  1651. ASSERT_OK(ReOpenNoDelete());
  1652. ASSERT_NE(db, nullptr);
  1653. db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
  1654. seq = db_impl->TEST_GetLastVisibleSequence();
  1655. ASSERT_LE(exp_seq, seq + with_empty_commits);
  1656. // Check if flush preserves the last sequence number
  1657. ASSERT_OK(db_impl->Flush(fopt));
  1658. seq = db_impl->GetLatestSequenceNumber();
  1659. ASSERT_LE(exp_seq, seq + with_empty_commits);
  1660. // Check if recovery after flush preserves the last sequence number
  1661. ASSERT_OK(db_impl->FlushWAL(true));
  1662. ASSERT_OK(ReOpenNoDelete());
  1663. ASSERT_NE(db, nullptr);
  1664. db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
  1665. seq = db_impl->GetLatestSequenceNumber();
  1666. ASSERT_LE(exp_seq, seq + with_empty_commits);
  1667. }
  1668. }
  1669. // Run a couple of different txns among them some uncommitted. Restart the db at
  1670. // a couple points to check whether the list of uncommitted txns are recovered
  1671. // properly.
TEST_P(WritePreparedTransactionTest, BasicRecovery) {
  options.disable_auto_compactions = true;
  ASSERT_OK(ReOpen());
  WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);

  TestTxn0(0);

  // Prepare txn0 but do not commit it; it must survive the crash below.
  TransactionOptions txn_options;
  WriteOptions write_options;
  size_t index = 1000;
  Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
  auto istr0 = std::to_string(index);
  auto s = txn0->SetName("xid" + istr0);
  ASSERT_OK(s);
  s = txn0->Put(Slice("foo0" + istr0), Slice("bar0" + istr0));
  ASSERT_OK(s);
  s = txn0->Prepare();
  ASSERT_OK(s);
  auto prep_seq_0 = txn0->GetId();

  TestTxn1(0);

  // Prepare a second uncommitted txn (txn1).
  index++;
  Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
  auto istr1 = std::to_string(index);
  s = txn1->SetName("xid" + istr1);
  ASSERT_OK(s);
  s = txn1->Put(Slice("foo1" + istr1), Slice("bar"));
  ASSERT_OK(s);
  s = txn1->Prepare();
  ASSERT_OK(s);
  auto prep_seq_1 = txn1->GetId();

  TestTxn2(0);

  ReadOptions ropt;
  PinnableSlice pinnable_val;
  // Check the value is not committed before restart
  s = db->Get(ropt, db->DefaultColumnFamily(), "foo0" + istr0, &pinnable_val);
  ASSERT_TRUE(s.IsNotFound());
  pinnable_val.Reset();

  // Simulate a crash: flush the WAL, drop in-memory state, and reopen.
  delete txn0;
  delete txn1;
  ASSERT_OK(wp_db->db_impl_->FlushWAL(true));
  wp_db->TEST_Crash();
  ASSERT_OK(ReOpenNoDelete());
  ASSERT_NE(db, nullptr);
  wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
  // After recovery, all the uncommitted txns (0 and 1) should be inserted into
  // delayed_prepared_
  ASSERT_TRUE(wp_db->prepared_txns_.empty());
  ASSERT_FALSE(wp_db->delayed_prepared_empty_);
  ASSERT_LE(prep_seq_0, wp_db->max_evicted_seq_);
  ASSERT_LE(prep_seq_1, wp_db->max_evicted_seq_);
  {
    // delayed_prepared_ is guarded by prepared_mutex_.
    ReadLock rl(&wp_db->prepared_mutex_);
    ASSERT_EQ(2, wp_db->delayed_prepared_.size());
    ASSERT_TRUE(wp_db->delayed_prepared_.find(prep_seq_0) !=
                wp_db->delayed_prepared_.end());
    ASSERT_TRUE(wp_db->delayed_prepared_.find(prep_seq_1) !=
                wp_db->delayed_prepared_.end());
  }

  // Check the value is still not committed after restart
  s = db->Get(ropt, db->DefaultColumnFamily(), "foo0" + istr0, &pinnable_val);
  ASSERT_TRUE(s.IsNotFound());
  pinnable_val.Reset();

  TestTxn3(0);

  // Test that a recovered txns will be properly marked committed for the next
  // recovery
  txn1 = db->GetTransactionByName("xid" + istr1);
  ASSERT_NE(txn1, nullptr);
  ASSERT_OK(txn1->Commit());
  delete txn1;

  // Prepare a fresh txn (txn2) and crash again with txn0 and txn2 pending.
  index++;
  Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
  auto istr2 = std::to_string(index);
  s = txn2->SetName("xid" + istr2);
  ASSERT_OK(s);
  s = txn2->Put(Slice("foo2" + istr2), Slice("bar"));
  ASSERT_OK(s);
  s = txn2->Prepare();
  ASSERT_OK(s);
  auto prep_seq_2 = txn2->GetId();

  delete txn2;
  ASSERT_OK(wp_db->db_impl_->FlushWAL(true));
  wp_db->TEST_Crash();
  ASSERT_OK(ReOpenNoDelete());
  ASSERT_NE(db, nullptr);
  wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
  ASSERT_TRUE(wp_db->prepared_txns_.empty());
  ASSERT_FALSE(wp_db->delayed_prepared_empty_);

  // 0 and 2 are prepared and 1 is committed
  {
    ReadLock rl(&wp_db->prepared_mutex_);
    ASSERT_EQ(2, wp_db->delayed_prepared_.size());
    const auto& end = wp_db->delayed_prepared_.end();
    ASSERT_NE(wp_db->delayed_prepared_.find(prep_seq_0), end);
    ASSERT_EQ(wp_db->delayed_prepared_.find(prep_seq_1), end);
    ASSERT_NE(wp_db->delayed_prepared_.find(prep_seq_2), end);
  }
  ASSERT_LE(prep_seq_0, wp_db->max_evicted_seq_);
  ASSERT_LE(prep_seq_2, wp_db->max_evicted_seq_);

  // Commit all the remaining txns
  txn0 = db->GetTransactionByName("xid" + istr0);
  ASSERT_NE(txn0, nullptr);
  ASSERT_OK(txn0->Commit());
  txn2 = db->GetTransactionByName("xid" + istr2);
  ASSERT_NE(txn2, nullptr);
  ASSERT_OK(txn2->Commit());

  // Check the value is committed after commit
  s = db->Get(ropt, db->DefaultColumnFamily(), "foo0" + istr0, &pinnable_val);
  ASSERT_TRUE(s.ok());
  ASSERT_TRUE(pinnable_val == ("bar0" + istr0));
  pinnable_val.Reset();

  // Final restart: with everything committed, no prepared txn should remain.
  delete txn0;
  delete txn2;
  ASSERT_OK(wp_db->db_impl_->FlushWAL(true));
  ASSERT_OK(ReOpenNoDelete());
  ASSERT_NE(db, nullptr);
  wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
  ASSERT_TRUE(wp_db->prepared_txns_.empty());
  ASSERT_TRUE(wp_db->delayed_prepared_empty_);

  // Check the value is still committed after recovery
  s = db->Get(ropt, db->DefaultColumnFamily(), "foo0" + istr0, &pinnable_val);
  ASSERT_TRUE(s.ok());
  ASSERT_TRUE(pinnable_val == ("bar0" + istr0));
  pinnable_val.Reset();
}
  1794. // After recovery the commit map is empty while the max is set. The code would
  1795. // go through a different path which requires a separate test. Test that the
  1796. // committed data before the restart is visible to all snapshots.
TEST_P(WritePreparedTransactionTest, IsInSnapshotEmptyMap) {
  // Run once ending with committed writes only, and once with a pending
  // prepared txn at crash time.
  for (bool end_with_prepare : {false, true}) {
    ASSERT_OK(ReOpen());
    WriteOptions woptions;
    ASSERT_OK(db->Put(woptions, "key", "value"));
    ASSERT_OK(db->Put(woptions, "key", "value"));
    ASSERT_OK(db->Put(woptions, "key", "value"));
    SequenceNumber prepare_seq = kMaxSequenceNumber;
    if (end_with_prepare) {
      TransactionOptions txn_options;
      Transaction* txn = db->BeginTransaction(woptions, txn_options);
      ASSERT_OK(txn->SetName("xid0"));
      ASSERT_OK(txn->Prepare());
      prepare_seq = txn->GetId();
      delete txn;
    }
    // Crash and recover; recovery leaves the commit map empty but advances
    // max_evicted_seq_.
    dynamic_cast<WritePreparedTxnDB*>(db)->TEST_Crash();
    auto db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
    ASSERT_OK(db_impl->FlushWAL(true));
    ASSERT_OK(ReOpenNoDelete());
    WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
    ASSERT_NE(wp_db, nullptr);
    ASSERT_GT(wp_db->max_evicted_seq_, 0);  // max after recovery
    // Take a snapshot right after recovery
    const Snapshot* snap = db->GetSnapshot();
    auto snap_seq = snap->GetSequenceNumber();
    ASSERT_GT(snap_seq, 0);

    // Every committed seq at or below max must be visible; the prepared seq
    // (if any) must not be.
    for (SequenceNumber seq = 0;
         seq <= wp_db->max_evicted_seq_ && seq != prepare_seq; seq++) {
      ASSERT_TRUE(wp_db->IsInSnapshot(seq, snap_seq));
    }
    if (end_with_prepare) {
      ASSERT_FALSE(wp_db->IsInSnapshot(prepare_seq, snap_seq));
    }
    // trivial check
    ASSERT_FALSE(wp_db->IsInSnapshot(snap_seq + 1, snap_seq));

    db->ReleaseSnapshot(snap);

    ASSERT_OK(db->Put(woptions, "key", "value"));
    // Take a snapshot after some writes
    snap = db->GetSnapshot();
    snap_seq = snap->GetSequenceNumber();
    for (SequenceNumber seq = 0;
         seq <= wp_db->max_evicted_seq_ && seq != prepare_seq; seq++) {
      ASSERT_TRUE(wp_db->IsInSnapshot(seq, snap_seq));
    }
    if (end_with_prepare) {
      ASSERT_FALSE(wp_db->IsInSnapshot(prepare_seq, snap_seq));
    }
    // trivial check
    ASSERT_FALSE(wp_db->IsInSnapshot(snap_seq + 1, snap_seq));

    db->ReleaseSnapshot(snap);
  }
}
  1850. // Shows the contract of IsInSnapshot when called on invalid/released snapshots
TEST_P(WritePreparedTransactionTest, IsInSnapshotReleased) {
  WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
  WriteOptions woptions;
  ASSERT_OK(db->Put(woptions, "key", "value"));
  // snap seq = 1
  const Snapshot* snap1 = db->GetSnapshot();
  ASSERT_OK(db->Put(woptions, "key", "value"));
  ASSERT_OK(db->Put(woptions, "key", "value"));
  // snap seq = 3
  const Snapshot* snap2 = db->GetSnapshot();
  const SequenceNumber seq = 1;
  // Evict seq out of commit cache
  size_t overwrite_seq = wp_db->COMMIT_CACHE_SIZE + seq;
  wp_db->AddCommitted(overwrite_seq, overwrite_seq);
  SequenceNumber snap_seq;
  uint64_t min_uncommitted = kMinUnCommittedSeq;
  bool released;

  released = false;
  snap_seq = snap1->GetSequenceNumber();
  ASSERT_LE(seq, snap_seq);
  // Valid snapshot lower than max
  ASSERT_LE(snap_seq, wp_db->max_evicted_seq_);
  ASSERT_TRUE(wp_db->IsInSnapshot(seq, snap_seq, min_uncommitted, &released));
  ASSERT_FALSE(released);

  released = false;
  snap_seq = snap1->GetSequenceNumber();
  // Invalid snapshot lower than max: snap_seq + 1 was never taken as a
  // snapshot, so IsInSnapshot reports it as released.
  ASSERT_LE(snap_seq + 1, wp_db->max_evicted_seq_);
  ASSERT_TRUE(
      wp_db->IsInSnapshot(seq, snap_seq + 1, min_uncommitted, &released));
  ASSERT_TRUE(released);

  db->ReleaseSnapshot(snap1);
  released = false;
  // Released snapshot lower than max
  ASSERT_TRUE(wp_db->IsInSnapshot(seq, snap_seq, min_uncommitted, &released));
  // The release does not take effect until the next max advance
  ASSERT_FALSE(released);

  released = false;
  // Invalid snapshot lower than max
  ASSERT_TRUE(
      wp_db->IsInSnapshot(seq, snap_seq + 1, min_uncommitted, &released));
  ASSERT_TRUE(released);

  // This make the snapshot release to reflect in txn db structures
  wp_db->AdvanceMaxEvictedSeq(wp_db->max_evicted_seq_,
                              wp_db->max_evicted_seq_ + 1);
  released = false;
  // Released snapshot lower than max: now the release is visible.
  ASSERT_TRUE(wp_db->IsInSnapshot(seq, snap_seq, min_uncommitted, &released));
  ASSERT_TRUE(released);

  released = false;
  // Invalid snapshot lower than max
  ASSERT_TRUE(
      wp_db->IsInSnapshot(seq, snap_seq + 1, min_uncommitted, &released));
  ASSERT_TRUE(released);

  snap_seq = snap2->GetSequenceNumber();

  released = false;
  // Unreleased snapshot lower than max
  ASSERT_TRUE(wp_db->IsInSnapshot(seq, snap_seq, min_uncommitted, &released));
  ASSERT_FALSE(released);

  db->ReleaseSnapshot(snap2);
}
  1912. // Test WritePreparedTxnDB's IsInSnapshot against different ordering of
  1913. // snapshot, max_committed_seq_, prepared, and commit entries.
TEST_P(WritePreparedTransactionTest, IsInSnapshot) {
  WriteOptions wo;
  // Use small commit cache to trigger lots of eviction and fast advance of
  // max_evicted_seq_
  const size_t commit_cache_bits = 3;
  // Same for snapshot cache size
  const size_t snapshot_cache_bits = 2;

  // Take some preliminary snapshots first. This is to stress the data structure
  // that holds the old snapshots as it will be designed to be efficient when
  // only a few snapshots are below the max_evicted_seq_.
  for (int max_snapshots = 1; max_snapshots < 20; max_snapshots++) {
    // Leave some gap between the preliminary snapshots and the final snapshot
    // that we check. This should test for also different overlapping scenarios
    // between the last snapshot and the commits.
    for (int max_gap = 1; max_gap < 10; max_gap++) {
      // Since we do not actually write to db, we mock the seq as it would be
      // increased by the db. The only exception is that we need db seq to
      // advance for our snapshots. for which we apply a dummy put each time we
      // increase our mock of seq.
      uint64_t seq = 0;
      // At each step we prepare a txn and then we commit it in the next txn.
      // This emulates the consecutive transactions that write to the same key
      uint64_t cur_txn = 0;
      // Number of snapshots taken so far
      int num_snapshots = 0;
      // Number of gaps applied so far
      int gap_cnt = 0;
      // The final snapshot that we will inspect
      uint64_t snapshot = 0;
      bool found_committed = false;
      // To stress the data structure that maintain prepared txns, at each cycle
      // we add a new prepare txn. These do not mean to be committed for
      // snapshot inspection.
      std::set<uint64_t> prepared;
      // We keep the list of txns committed before we take the last snapshot.
      // These should be the only seq numbers that will be found in the snapshot
      std::set<uint64_t> committed_before;
      // The set of commit seq numbers to be excluded from IsInSnapshot queries
      std::set<uint64_t> commit_seqs;
      // NOTE(review): mock_db is a raw new; presumably ownership passes to
      // WritePreparedTxnDBMock below — confirm against the mock's dtor.
      DBImpl* mock_db = new DBImpl(options, dbname);
      UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
      std::unique_ptr<WritePreparedTxnDBMock> wp_db(
          new WritePreparedTxnDBMock(mock_db, txn_db_options));
      // We continue until max advances a bit beyond the snapshot.
      while (!snapshot || wp_db->max_evicted_seq_ < snapshot + 100) {
        // do prepare for a transaction
        seq++;
        wp_db->AddPrepared(seq);
        prepared.insert(seq);

        // If cur_txn is not started, do prepare for it.
        if (!cur_txn) {
          seq++;
          cur_txn = seq;
          wp_db->AddPrepared(cur_txn);
        } else {  // else commit it
          seq++;
          wp_db->AddCommitted(cur_txn, seq);
          wp_db->RemovePrepared(cur_txn);
          commit_seqs.insert(seq);
          if (!snapshot) {
            // Only commits before the final snapshot should be visible to it.
            committed_before.insert(cur_txn);
          }
          cur_txn = 0;
        }

        if (num_snapshots < max_snapshots - 1) {
          // Take preliminary snapshots
          wp_db->TakeSnapshot(seq);
          num_snapshots++;
        } else if (gap_cnt < max_gap) {
          // Wait for some gap before taking the final snapshot
          gap_cnt++;
        } else if (!snapshot) {
          // Take the final snapshot if it is not already taken
          snapshot = seq;
          wp_db->TakeSnapshot(snapshot);
          num_snapshots++;
        }

        // If the snapshot is taken, verify seq numbers visible to it. We redo
        // it at each cycle to test that the system is still sound when
        // max_evicted_seq_ advances.
        if (snapshot) {
          for (uint64_t s = 1;
               s <= seq && commit_seqs.find(s) == commit_seqs.end(); s++) {
            bool was_committed =
                (committed_before.find(s) != committed_before.end());
            bool is_in_snapshot = wp_db->IsInSnapshot(s, snapshot);
            if (was_committed != is_in_snapshot) {
              // Dump the full state before the assertion below fails, to help
              // debugging this randomized-ish scenario.
              printf("max_snapshots %d max_gap %d seq %" PRIu64 " max %" PRIu64
                     " snapshot %" PRIu64
                     " gap_cnt %d num_snapshots %d s %" PRIu64 "\n",
                     max_snapshots, max_gap, seq,
                     wp_db->max_evicted_seq_.load(), snapshot, gap_cnt,
                     num_snapshots, s);
            }
            ASSERT_EQ(was_committed, is_in_snapshot);
            found_committed = found_committed || is_in_snapshot;
          }
        }
      }
      // Safety check to make sure the test actually ran
      ASSERT_TRUE(found_committed);
      // As an extra check, check if prepared set will be properly empty after
      // they are committed.
      if (cur_txn) {
        wp_db->AddCommitted(cur_txn, seq);
        wp_db->RemovePrepared(cur_txn);
      }
      for (auto p : prepared) {
        wp_db->AddCommitted(p, seq);
        wp_db->RemovePrepared(p);
      }
      ASSERT_TRUE(wp_db->delayed_prepared_.empty());
      ASSERT_TRUE(wp_db->prepared_txns_.empty());
    }
  }
}
  2030. void ASSERT_SAME(ReadOptions roptions, TransactionDB* db, Status exp_s,
  2031. PinnableSlice& exp_v, Slice key) {
  2032. Status s;
  2033. PinnableSlice v;
  2034. s = db->Get(roptions, db->DefaultColumnFamily(), key, &v);
  2035. ASSERT_EQ(exp_s, s);
  2036. ASSERT_TRUE(s.ok() || s.IsNotFound());
  2037. if (s.ok()) {
  2038. ASSERT_TRUE(exp_v == v);
  2039. }
  2040. // Try with MultiGet API too
  2041. std::vector<std::string> values;
  2042. auto s_vec =
  2043. db->MultiGet(roptions, {db->DefaultColumnFamily()}, {key}, &values);
  2044. ASSERT_EQ(1, values.size());
  2045. ASSERT_EQ(1, s_vec.size());
  2046. s = s_vec[0];
  2047. ASSERT_EQ(exp_s, s);
  2048. ASSERT_TRUE(s.ok() || s.IsNotFound());
  2049. if (s.ok()) {
  2050. ASSERT_TRUE(exp_v == values[0]);
  2051. }
  2052. }
  2053. void ASSERT_SAME(TransactionDB* db, Status exp_s, PinnableSlice& exp_v,
  2054. Slice key) {
  2055. ASSERT_SAME(ReadOptions(), db, exp_s, exp_v, key);
  2056. }
TEST_P(WritePreparedTransactionTest, Rollback) {
  ReadOptions roptions;
  WriteOptions woptions;
  TransactionOptions txn_options;
  const size_t num_keys = 4;
  const size_t num_values = 5;
  // Cross product: which key gets an initial write, which kind of initial
  // write (none/Put/Merge/Delete/SingleDelete), and whether we crash+recover
  // between prepare and rollback.
  for (size_t ikey = 1; ikey <= num_keys; ikey++) {
    for (size_t ivalue = 0; ivalue < num_values; ivalue++) {
      for (bool crash : {false, true}) {
        ASSERT_OK(ReOpen());
        WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
        std::string key_str = "key" + std::to_string(ikey);
        // Seed the chosen key with the chosen kind of pre-existing write.
        switch (ivalue) {
          case 0:
            break;
          case 1:
            ASSERT_OK(db->Put(woptions, key_str, "initvalue1"));
            break;
          case 2:
            ASSERT_OK(db->Merge(woptions, key_str, "initvalue2"));
            break;
          case 3:
            ASSERT_OK(db->Delete(woptions, key_str));
            break;
          case 4:
            ASSERT_OK(db->SingleDelete(woptions, key_str));
            break;
          default:
            FAIL();
        }

        // Record the pre-transaction state of all four keys; it must be
        // unchanged while the txn is prepared and after rollback.
        PinnableSlice v1;
        auto s1 =
            db->Get(roptions, db->DefaultColumnFamily(), Slice("key1"), &v1);
        PinnableSlice v2;
        auto s2 =
            db->Get(roptions, db->DefaultColumnFamily(), Slice("key2"), &v2);
        PinnableSlice v3;
        auto s3 =
            db->Get(roptions, db->DefaultColumnFamily(), Slice("key3"), &v3);
        PinnableSlice v4;
        auto s4 =
            db->Get(roptions, db->DefaultColumnFamily(), Slice("key4"), &v4);
        // Prepare a txn that touches every key with a different write type.
        Transaction* txn = db->BeginTransaction(woptions, txn_options);
        auto s = txn->SetName("xid0");
        ASSERT_OK(s);
        s = txn->Put(Slice("key1"), Slice("value1"));
        ASSERT_OK(s);
        s = txn->Merge(Slice("key2"), Slice("value2"));
        ASSERT_OK(s);
        s = txn->Delete(Slice("key3"));
        ASSERT_OK(s);
        s = txn->SingleDelete(Slice("key4"));
        ASSERT_OK(s);
        s = txn->Prepare();
        ASSERT_OK(s);
        {
          ReadLock rl(&wp_db->prepared_mutex_);
          ASSERT_FALSE(wp_db->prepared_txns_.empty());
          ASSERT_EQ(txn->GetId(), wp_db->prepared_txns_.top());
        }
        // Prepared-but-uncommitted data must not be visible.
        ASSERT_SAME(db, s1, v1, "key1");
        ASSERT_SAME(db, s2, v2, "key2");
        ASSERT_SAME(db, s3, v3, "key3");
        ASSERT_SAME(db, s4, v4, "key4");

        if (crash) {
          // Crash and recover; the prepared txn must reappear in
          // delayed_prepared_.
          delete txn;
          auto db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
          ASSERT_OK(db_impl->FlushWAL(true));
          dynamic_cast<WritePreparedTxnDB*>(db)->TEST_Crash();
          ASSERT_OK(ReOpenNoDelete());
          ASSERT_NE(db, nullptr);
          wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
          txn = db->GetTransactionByName("xid0");
          ASSERT_FALSE(wp_db->delayed_prepared_empty_);
          ReadLock rl(&wp_db->prepared_mutex_);
          ASSERT_TRUE(wp_db->prepared_txns_.empty());
          ASSERT_FALSE(wp_db->delayed_prepared_.empty());
          ASSERT_TRUE(wp_db->delayed_prepared_.find(txn->GetId()) !=
                      wp_db->delayed_prepared_.end());
        }
        ASSERT_SAME(db, s1, v1, "key1");
        ASSERT_SAME(db, s2, v2, "key2");
        ASSERT_SAME(db, s3, v3, "key3");
        ASSERT_SAME(db, s4, v4, "key4");
        // Roll back; all prepared-txn bookkeeping must be cleared and the
        // keys must read exactly as before the txn.
        s = txn->Rollback();
        ASSERT_OK(s);
        {
          ASSERT_TRUE(wp_db->delayed_prepared_empty_);
          ReadLock rl(&wp_db->prepared_mutex_);
          ASSERT_TRUE(wp_db->prepared_txns_.empty());
          ASSERT_TRUE(wp_db->delayed_prepared_.empty());
        }
        ASSERT_SAME(db, s1, v1, "key1");
        ASSERT_SAME(db, s2, v2, "key2");
        ASSERT_SAME(db, s3, v3, "key3");
        ASSERT_SAME(db, s4, v4, "key4");
        delete txn;
      }
    }
  }
}
TEST_P(WritePreparedTransactionTest, DisableGCDuringRecovery) {
  // Use large buffer to avoid memtable flush after 1024 insertions
  options.write_buffer_size = 1024 * 1024;
  ASSERT_OK(ReOpen());
  std::vector<KeyVersion> versions;
  uint64_t seq = 0;
  // Write 1024 versions of the same key and record the expected internal
  // key/seq for each.
  for (uint64_t i = 1; i <= 1024; i++) {
    std::string v = "bar" + std::to_string(i);
    ASSERT_OK(db->Put(WriteOptions(), "foo", v));
    VerifyKeys({{"foo", v}});
    seq++;  // one for the key/value
    KeyVersion kv = {"foo", v, seq, kTypeValue};
    if (options.two_write_queues) {
      seq++;  // one for the commit
    }
    versions.emplace_back(kv);
  }
  // VerifyInternalKeys expects newest-first order.
  std::reverse(std::begin(versions), std::end(versions));
  VerifyInternalKeys(versions);
  DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
  ASSERT_OK(db_impl->FlushWAL(true));
  // Use small buffer to ensure memtable flush during recovery
  options.write_buffer_size = 1024;
  ASSERT_OK(ReOpenNoDelete());
  // All versions must survive the recovery-time flush (GC disabled).
  VerifyInternalKeys(versions);
}
TEST_P(WritePreparedTransactionTest, SequenceNumberZero) {
  ASSERT_OK(db->Put(WriteOptions(), "foo", "bar"));
  VerifyKeys({{"foo", "bar"}});
  const Snapshot* snapshot = db->GetSnapshot();
  ASSERT_OK(db->Flush(FlushOptions()));
  // Dummy keys to avoid compaction trivially move files and get around actual
  // compaction logic.
  ASSERT_OK(db->Put(WriteOptions(), "a", "dummy"));
  ASSERT_OK(db->Put(WriteOptions(), "z", "dummy"));
  ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  // Compaction will output keys with sequence number 0, if it is visible to
  // earliest snapshot. Make sure IsInSnapshot() report sequence number 0 is
  // visible to any snapshot.
  VerifyKeys({{"foo", "bar"}});
  VerifyKeys({{"foo", "bar"}}, snapshot);
  // The zeroed-out internal key must still carry the original value.
  VerifyInternalKeys({{"foo", "bar", 0, kTypeValue}});
  db->ReleaseSnapshot(snapshot);
}
  2202. // Compaction should not remove a key if it is not committed, and should
  2203. // proceed with older versions of the key as-if the new version doesn't exist.
TEST_P(WritePreparedTransactionTest, CompactionShouldKeepUncommittedKeys) {
  options.disable_auto_compactions = true;
  ASSERT_OK(ReOpen());
  DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
  // Snapshots to avoid keys get evicted.
  std::vector<const Snapshot*> snapshots;
  // Keep track of expected sequence number.
  SequenceNumber expected_seq = 0;
  // Run one committed write, bump the expected seq accordingly, and pin a
  // snapshot on it.
  auto add_key = [&](std::function<Status()> func) {
    ASSERT_OK(func());
    expected_seq++;
    if (options.two_write_queues) {
      expected_seq++;  // 1 for commit
    }
    ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence());
    snapshots.push_back(db->GetSnapshot());
  };

  // Each key here represent a standalone test case.
  add_key([&]() { return db->Put(WriteOptions(), "key1", "value1_1"); });
  add_key([&]() { return db->Put(WriteOptions(), "key2", "value2_1"); });
  add_key([&]() { return db->Put(WriteOptions(), "key3", "value3_1"); });
  add_key([&]() { return db->Put(WriteOptions(), "key4", "value4_1"); });
  add_key([&]() { return db->Merge(WriteOptions(), "key5", "value5_1"); });
  add_key([&]() { return db->Merge(WriteOptions(), "key5", "value5_2"); });
  add_key([&]() { return db->Put(WriteOptions(), "key6", "value6_1"); });
  add_key([&]() { return db->Put(WriteOptions(), "key7", "value7_1"); });
  ASSERT_OK(db->Flush(FlushOptions()));
  add_key([&]() { return db->Delete(WriteOptions(), "key6"); });
  add_key([&]() { return db->SingleDelete(WriteOptions(), "key7"); });

  // Overwrite every key inside a transaction that is prepared but not
  // committed; compaction must neither drop these entries nor let them hide
  // the committed versions.
  auto* transaction = db->BeginTransaction(WriteOptions());
  ASSERT_OK(transaction->SetName("txn"));
  ASSERT_OK(transaction->Put("key1", "value1_2"));
  ASSERT_OK(transaction->Delete("key2"));
  ASSERT_OK(transaction->SingleDelete("key3"));
  ASSERT_OK(transaction->Merge("key4", "value4_2"));
  ASSERT_OK(transaction->Merge("key5", "value5_3"));
  ASSERT_OK(transaction->Put("key6", "value6_2"));
  ASSERT_OK(transaction->Put("key7", "value7_2"));
  // Prepare but not commit.
  ASSERT_OK(transaction->Prepare());
  ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber());
  ASSERT_OK(db->Flush(FlushOptions()));
  for (auto* s : snapshots) {
    db->ReleaseSnapshot(s);
  }
  // Dummy keys to avoid compaction trivially move files and get around actual
  // compaction logic.
  ASSERT_OK(db->Put(WriteOptions(), "a", "dummy"));
  ASSERT_OK(db->Put(WriteOptions(), "z", "dummy"));
  ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  // Reads must still see only the committed (pre-transaction) versions.
  VerifyKeys({
      {"key1", "value1_1"},
      {"key2", "value2_1"},
      {"key3", "value3_1"},
      {"key4", "value4_1"},
      {"key5", "value5_1,value5_2"},
      {"key6", "NOT_FOUND"},
      {"key7", "NOT_FOUND"},
  });
  // Internally, both the uncommitted entries (at the prepare seq) and the
  // committed versions (compacted to seq 0) must be retained.
  VerifyInternalKeys({
      {"key1", "value1_2", expected_seq, kTypeValue},
      {"key1", "value1_1", 0, kTypeValue},
      {"key2", "", expected_seq, kTypeDeletion},
      {"key2", "value2_1", 0, kTypeValue},
      {"key3", "", expected_seq, kTypeSingleDeletion},
      {"key3", "value3_1", 0, kTypeValue},
      {"key4", "value4_2", expected_seq, kTypeMerge},
      {"key4", "value4_1", 0, kTypeValue},
      {"key5", "value5_3", expected_seq, kTypeMerge},
      {"key5", "value5_1,value5_2", 0, kTypeValue},
      {"key6", "value6_2", expected_seq, kTypeValue},
      {"key7", "value7_2", expected_seq, kTypeValue},
  });
  // After committing, the transaction's versions become the visible ones.
  ASSERT_OK(transaction->Commit());
  VerifyKeys({
      {"key1", "value1_2"},
      {"key2", "NOT_FOUND"},
      {"key3", "NOT_FOUND"},
      {"key4", "value4_1,value4_2"},
      {"key5", "value5_1,value5_2,value5_3"},
      {"key6", "value6_2"},
      {"key7", "value7_2"},
  });
  delete transaction;
}
  2289. // Compaction should keep keys visible to a snapshot based on commit sequence,
  2290. // not just prepare sequence.
TEST_P(WritePreparedTransactionTest, CompactionShouldKeepSnapshotVisibleKeys) {
  options.disable_auto_compactions = true;
  ASSERT_OK(ReOpen());
  // Keep track of expected sequence number.
  SequenceNumber expected_seq = 0;
  auto* txn1 = db->BeginTransaction(WriteOptions());
  ASSERT_OK(txn1->SetName("txn1"));
  ASSERT_OK(txn1->Put("key1", "value1_1"));
  ASSERT_OK(txn1->Prepare());
  ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber());
  ASSERT_OK(txn1->Commit());
  DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
  ASSERT_EQ(++expected_seq, db_impl->TEST_GetLastVisibleSequence());
  delete txn1;
  // Take a snapshots to avoid keys get evicted before compaction.
  const Snapshot* snapshot1 = db->GetSnapshot();
  auto* txn2 = db->BeginTransaction(WriteOptions());
  ASSERT_OK(txn2->SetName("txn2"));
  ASSERT_OK(txn2->Put("key2", "value2_1"));
  ASSERT_OK(txn2->Prepare());
  ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber());
  // txn1 commit before snapshot2 and it is visible to snapshot2.
  // txn2 commit after snapshot2 and it is not visible.
  const Snapshot* snapshot2 = db->GetSnapshot();
  ASSERT_OK(txn2->Commit());
  ASSERT_EQ(++expected_seq, db_impl->TEST_GetLastVisibleSequence());
  delete txn2;
  // Take a snapshots to avoid keys get evicted before compaction.
  const Snapshot* snapshot3 = db->GetSnapshot();
  // Overwrite both keys with non-transactional Puts.
  ASSERT_OK(db->Put(WriteOptions(), "key1", "value1_2"));
  expected_seq++;  // 1 for write
  SequenceNumber seq1 = expected_seq;
  if (options.two_write_queues) {
    expected_seq++;  // 1 for commit
  }
  ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence());
  ASSERT_OK(db->Put(WriteOptions(), "key2", "value2_2"));
  expected_seq++;  // 1 for write
  SequenceNumber seq2 = expected_seq;
  if (options.two_write_queues) {
    expected_seq++;  // 1 for commit
  }
  ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence());
  ASSERT_OK(db->Flush(FlushOptions()));
  db->ReleaseSnapshot(snapshot1);
  db->ReleaseSnapshot(snapshot3);
  // Dummy keys to avoid compaction trivially move files and get around actual
  // compaction logic.
  ASSERT_OK(db->Put(WriteOptions(), "a", "dummy"));
  ASSERT_OK(db->Put(WriteOptions(), "z", "dummy"));
  ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  // Latest reads see the overwrites; snapshot2 still sees txn1's value but
  // not txn2's (which committed after snapshot2 was taken).
  VerifyKeys({{"key1", "value1_2"}, {"key2", "value2_2"}});
  VerifyKeys({{"key1", "value1_1"}, {"key2", "NOT_FOUND"}}, snapshot2);
  VerifyInternalKeys({
      {"key1", "value1_2", seq1, kTypeValue},
      // "value1_1" is visible to snapshot2. Also keys at bottom level visible
      // to earliest snapshot will output with seq = 0.
      {"key1", "value1_1", 0, kTypeValue},
      {"key2", "value2_2", seq2, kTypeValue},
  });
  db->ReleaseSnapshot(snapshot2);
}
  2353. TEST_P(WritePreparedTransactionTest, SmallestUncommittedOptimization) {
  2354. const size_t snapshot_cache_bits = 7; // same as default
  2355. const size_t commit_cache_bits = 0; // disable commit cache
  2356. for (bool has_recent_prepare : {true, false}) {
  2357. UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
  2358. ASSERT_OK(ReOpen());
  2359. ASSERT_OK(db->Put(WriteOptions(), "key1", "value1"));
  2360. auto* transaction =
  2361. db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
  2362. ASSERT_OK(transaction->SetName("txn"));
  2363. ASSERT_OK(transaction->Delete("key1"));
  2364. ASSERT_OK(transaction->Prepare());
  2365. // snapshot1 should get min_uncommitted from prepared_txns_ heap.
  2366. auto snapshot1 = db->GetSnapshot();
  2367. ASSERT_EQ(transaction->GetId(),
  2368. ((SnapshotImpl*)snapshot1)->min_uncommitted_);
  2369. // Add a commit to advance max_evicted_seq and move the prepared transaction
  2370. // into delayed_prepared_ set.
  2371. ASSERT_OK(db->Put(WriteOptions(), "key2", "value2"));
  2372. Transaction* txn2 = nullptr;
  2373. if (has_recent_prepare) {
  2374. txn2 =
  2375. db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
  2376. ASSERT_OK(txn2->SetName("txn2"));
  2377. ASSERT_OK(txn2->Put("key3", "value3"));
  2378. ASSERT_OK(txn2->Prepare());
  2379. }
  2380. // snapshot2 should get min_uncommitted from delayed_prepared_ set.
  2381. auto snapshot2 = db->GetSnapshot();
  2382. ASSERT_EQ(transaction->GetId(),
  2383. ((SnapshotImpl*)snapshot1)->min_uncommitted_);
  2384. ASSERT_OK(transaction->Commit());
  2385. delete transaction;
  2386. if (has_recent_prepare) {
  2387. ASSERT_OK(txn2->Commit());
  2388. delete txn2;
  2389. }
  2390. VerifyKeys({{"key1", "NOT_FOUND"}});
  2391. VerifyKeys({{"key1", "value1"}}, snapshot1);
  2392. VerifyKeys({{"key1", "value1"}}, snapshot2);
  2393. db->ReleaseSnapshot(snapshot1);
  2394. db->ReleaseSnapshot(snapshot2);
  2395. }
  2396. }
  2397. // Insert two values, v1 and v2, for a key. Between prepare and commit of v2
  2398. // take two snapshots, s1 and s2. Release s1 during compaction.
  2399. // Test to make sure compaction doesn't get confused and think s1 can see both
  2400. // values, and thus compact out the older value by mistake.
TEST_P(WritePreparedTransactionTest, ReleaseSnapshotDuringCompaction) {
  const size_t snapshot_cache_bits = 7;  // same as default
  const size_t commit_cache_bits = 0;    // minimum commit cache
  UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
  options.disable_auto_compactions = true;
  ASSERT_OK(ReOpen());

  ASSERT_OK(db->Put(WriteOptions(), "key1", "value1_1"));
  // Prepare v2 of key1, then take snapshot1 while the txn is still
  // uncommitted; snapshot2 is taken before the commit as well.
  auto* transaction =
      db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
  ASSERT_OK(transaction->SetName("txn"));
  ASSERT_OK(transaction->Put("key1", "value1_2"));
  ASSERT_OK(transaction->Prepare());
  auto snapshot1 = db->GetSnapshot();
  // Increment sequence number.
  ASSERT_OK(db->Put(WriteOptions(), "key2", "value2"));
  auto snapshot2 = db->GetSnapshot();
  ASSERT_OK(transaction->Commit());
  delete transaction;
  VerifyKeys({{"key1", "value1_2"}});
  VerifyKeys({{"key1", "value1_1"}}, snapshot1);
  VerifyKeys({{"key1", "value1_1"}}, snapshot2);
  // Add a flush to avoid compaction to fallback to trivial move.

  // The callback might be called twice, record the calling state to
  // prevent double calling.
  bool callback_finished = false;
  auto callback = [&](void*) {
    if (callback_finished) {
      return;
    }
    // Release snapshot1 after CompactionIterator init.
    // CompactionIterator need to figure out the earliest snapshot
    // that can see key1:value1_2 is kMaxSequenceNumber, not
    // snapshot1 or snapshot2.
    db->ReleaseSnapshot(snapshot1);
    // Add some keys to advance max_evicted_seq.
    ASSERT_OK(db->Put(WriteOptions(), "key3", "value3"));
    ASSERT_OK(db->Put(WriteOptions(), "key4", "value4"));
    callback_finished = true;
  };
  SyncPoint::GetInstance()->SetCallBack("CompactionIterator:AfterInit",
                                        callback);
  SyncPoint::GetInstance()->EnableProcessing();

  // The flush triggers the sync point above mid-compaction-iterator.
  ASSERT_OK(db->Flush(FlushOptions()));
  // snapshot2 must still see the old value; the latest read sees v2.
  VerifyKeys({{"key1", "value1_2"}});
  VerifyKeys({{"key1", "value1_1"}}, snapshot2);
  db->ReleaseSnapshot(snapshot2);
  SyncPoint::GetInstance()->ClearAllCallBacks();
}
// Insert two values, v1 and v2, for a key. Take two snapshots, s1 and s2,
// after committing v2. Release s1 during compaction, right after compaction
// processes v2 and before processes v1. Test to make sure compaction doesn't
// get confused and believe v1 and v2 are visible to different snapshot
// (v1 by s2, v2 by s1) and refuse to compact out v1.
TEST_P(WritePreparedTransactionTest, ReleaseSnapshotDuringCompaction2) {
  const size_t snapshot_cache_bits = 7;  // same as default
  const size_t commit_cache_bits = 0;    // minimum commit cache
  UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
  options.disable_auto_compactions = true;
  ASSERT_OK(ReOpen());
  ASSERT_OK(db->Put(WriteOptions(), "key1", "value1"));
  ASSERT_OK(db->Put(WriteOptions(), "key1", "value2"));
  SequenceNumber v2_seq = db->GetLatestSequenceNumber();
  auto* s1 = db->GetSnapshot();
  // Advance sequence number.
  ASSERT_OK(db->Put(WriteOptions(), "key2", "dummy"));
  auto* s2 = db->GetSnapshot();
  int count_value = 0;
  // ProcessKV fires for each key-value the compaction iterator processes.
  // Entries for the same user key arrive newest-first, so the second "key1"
  // hit below is the older version v1.
  auto callback = [&](void* arg) {
    auto* ikey = static_cast<ParsedInternalKey*>(arg);
    if (ikey->user_key == "key1") {
      count_value++;
      if (count_value == 2) {
        // Processing v1.
        db->ReleaseSnapshot(s1);
        // Add some keys to advance max_evicted_seq and update
        // old_commit_map.
        ASSERT_OK(db->Put(WriteOptions(), "key3", "dummy"));
        ASSERT_OK(db->Put(WriteOptions(), "key4", "dummy"));
      }
    }
  };
  SyncPoint::GetInstance()->SetCallBack("CompactionIterator:ProcessKV",
                                        callback);
  SyncPoint::GetInstance()->EnableProcessing();
  // Flush also runs through CompactionIterator, which is why the compaction
  // sync point above is exercised here.
  ASSERT_OK(db->Flush(FlushOptions()));
  // value1 should be compact out.
  VerifyInternalKeys({{"key1", "value2", v2_seq, kTypeValue}});
  // cleanup
  db->ReleaseSnapshot(s2);
  SyncPoint::GetInstance()->ClearAllCallBacks();
}
// Insert two values, v1 and v2, for a key. Insert another dummy key
// so to evict the commit cache for v2, while v1 is still in commit cache.
// Take two snapshots, s1 and s2. Release s1 during compaction.
// Since commit cache for v2 is evicted, and old_commit_map don't have
// s1 (it is released),
// TODO(myabandeh): how can we be sure that the v2's commit info is evicted
// (and not v1's)? Instead of putting a dummy, we can directly call
// AddCommitted(v2_seq + cache_size, ...) to evict v2's entry from commit cache.
TEST_P(WritePreparedTransactionTest, ReleaseSnapshotDuringCompaction3) {
  const size_t snapshot_cache_bits = 7;  // same as default
  const size_t commit_cache_bits = 1;    // commit cache size = 2
  UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
  options.disable_auto_compactions = true;
  ASSERT_OK(ReOpen());
  // Add a dummy key to evict v2 commit cache, but keep v1 commit cache.
  // It also advance max_evicted_seq and can trigger old_commit_map cleanup.
  // NOTE(review): uses a prepared txn (prepare + commit) rather than a plain
  // Put, presumably to consume two sequence numbers per call -- TODO confirm.
  auto add_dummy = [&]() {
    auto* txn_dummy =
        db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
    ASSERT_OK(txn_dummy->SetName("txn_dummy"));
    ASSERT_OK(txn_dummy->Put("dummy", "dummy"));
    ASSERT_OK(txn_dummy->Prepare());
    ASSERT_OK(txn_dummy->Commit());
    delete txn_dummy;
  };
  // v1: committed via a plain write.
  ASSERT_OK(db->Put(WriteOptions(), "key1", "value1"));
  // v2: written through a prepared transaction.
  auto* txn =
      db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
  ASSERT_OK(txn->SetName("txn"));
  ASSERT_OK(txn->Put("key1", "value2"));
  ASSERT_OK(txn->Prepare());
  // TODO(myabandeh): replace it with GetId()?
  auto v2_seq = db->GetLatestSequenceNumber();
  ASSERT_OK(txn->Commit());
  delete txn;
  auto* s1 = db->GetSnapshot();
  // Dummy key to advance sequence number.
  add_dummy();
  auto* s2 = db->GetSnapshot();
  // The callback might be called twice, record the calling state to
  // prevent double calling.
  bool callback_finished = false;
  auto callback = [&](void*) {
    if (callback_finished) {
      return;
    }
    db->ReleaseSnapshot(s1);
    // Add some dummy entries to trigger s1 being cleanup from old_commit_map.
    add_dummy();
    add_dummy();
    callback_finished = true;
  };
  SyncPoint::GetInstance()->SetCallBack("CompactionIterator:AfterInit",
                                        callback);
  SyncPoint::GetInstance()->EnableProcessing();
  // Flush also runs through CompactionIterator, triggering the sync point.
  ASSERT_OK(db->Flush(FlushOptions()));
  // value1 should be compact out.
  VerifyInternalKeys({{"key1", "value2", v2_seq, kTypeValue}});
  db->ReleaseSnapshot(s2);
  SyncPoint::GetInstance()->ClearAllCallBacks();
}
// Release the earliest snapshot (snapshot1) while a compaction is running.
// The compaction must re-discover that snapshot2 is now the earliest snapshot
// and keep both the delete tombstone and the old value of key1 for it.
TEST_P(WritePreparedTransactionTest, ReleaseEarliestSnapshotDuringCompaction) {
  const size_t snapshot_cache_bits = 7;  // same as default
  const size_t commit_cache_bits = 0;    // minimum commit cache
  UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
  options.disable_auto_compactions = true;
  ASSERT_OK(ReOpen());
  ASSERT_OK(db->Put(WriteOptions(), "key1", "value1"));
  SequenceNumber put_seq = db->GetLatestSequenceNumber();
  auto* transaction =
      db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
  ASSERT_OK(transaction->SetName("txn"));
  ASSERT_OK(transaction->Delete("key1"));
  ASSERT_OK(transaction->Prepare());
  SequenceNumber del_seq = db->GetLatestSequenceNumber();
  auto snapshot1 = db->GetSnapshot();
  // Increment sequence number.
  ASSERT_OK(db->Put(WriteOptions(), "key2", "value2"));
  auto snapshot2 = db->GetSnapshot();
  ASSERT_OK(transaction->Commit());
  delete transaction;
  VerifyKeys({{"key1", "NOT_FOUND"}});
  VerifyKeys({{"key1", "value1"}}, snapshot1);
  VerifyKeys({{"key1", "value1"}}, snapshot2);
  ASSERT_OK(db->Flush(FlushOptions()));
  // NOTE(review): the sync point argument appears to be the Compaction*,
  // non-null only for a real compaction job (so the flush above, done before
  // callbacks are enabled, and any non-compaction invocation are skipped) --
  // TODO confirm against CompactionIterator.
  auto callback = [&](void* compaction) {
    // Release snapshot1 after CompactionIterator init.
    // CompactionIterator need to double check and find out snapshot2 is now
    // the earliest existing snapshot.
    if (compaction != nullptr) {
      db->ReleaseSnapshot(snapshot1);
      // Add some keys to advance max_evicted_seq.
      ASSERT_OK(db->Put(WriteOptions(), "key3", "value3"));
      ASSERT_OK(db->Put(WriteOptions(), "key4", "value4"));
    }
  };
  SyncPoint::GetInstance()->SetCallBack("CompactionIterator:AfterInit",
                                        callback);
  SyncPoint::GetInstance()->EnableProcessing();
  // Dummy keys to avoid compaction trivially move files and get around actual
  // compaction logic.
  ASSERT_OK(db->Put(WriteOptions(), "a", "dummy"));
  ASSERT_OK(db->Put(WriteOptions(), "z", "dummy"));
  ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  // Only verify for key1. Both the put and delete for the key should be kept.
  // Since the delete tombstone is not visible to snapshot2, we need to keep
  // at least one version of the key, for write-conflict check.
  VerifyInternalKeys({{"key1", "", del_seq, kTypeDeletion},
                      {"key1", "value1", put_seq, kTypeValue}});
  db->ReleaseSnapshot(snapshot2);
  SyncPoint::GetInstance()->ClearAllCallBacks();
}
// Like ReleaseEarliestSnapshotDuringCompaction, but the key is covered by a
// SingleDelete and the earliest snapshot is released while the compaction is
// inside its SingleDelete handling (NextFromInput:SingleDelete:1).
TEST_P(WritePreparedTransactionTest,
       ReleaseEarliestSnapshotDuringCompaction_WithSD) {
  constexpr size_t kSnapshotCacheBits = 7;  // same as default
  constexpr size_t kCommitCacheBits = 0;    // minimum commit cache
  UpdateTransactionDBOptions(kSnapshotCacheBits, kCommitCacheBits);
  options.disable_auto_compactions = true;
  ASSERT_OK(ReOpen());
  ASSERT_OK(db->Put(WriteOptions(), "key", "value"));
  ASSERT_OK(db->Put(WriteOptions(), "foo", "value"));
  ASSERT_OK(db->Flush(FlushOptions()));
  auto* txn = db->BeginTransaction(WriteOptions(), TransactionOptions(),
                                   /*old_txn=*/nullptr);
  ASSERT_OK(txn->SingleDelete("key"));
  ASSERT_OK(txn->Put("wow", "value"));
  ASSERT_OK(txn->SetName("txn"));
  ASSERT_OK(txn->Prepare());
  // Second SST: written while txn is still only prepared.
  ASSERT_OK(db->Flush(FlushOptions()));
  const bool two_write_queues = std::get<1>(GetParam());
  if (two_write_queues) {
    // In the case of two queues, commit another txn just to bump
    // last_published_seq so that a subsequent GetSnapshot() call can return
    // a snapshot with higher sequence.
    auto* dummy_txn = db->BeginTransaction(WriteOptions(), TransactionOptions(),
                                           /*old_txn=*/nullptr);
    ASSERT_OK(dummy_txn->Put("haha", "value"));
    ASSERT_OK(dummy_txn->Commit());
    delete dummy_txn;
  }
  auto* snapshot = db->GetSnapshot();
  ASSERT_OK(txn->Commit());
  delete txn;
  SyncPoint::GetInstance()->SetCallBack(
      "CompactionIterator::NextFromInput:SingleDelete:1", [&](void* arg) {
        // Only react when the sync point passes a non-null argument;
        // NOTE(review): which invocations pass nullptr is determined inside
        // CompactionIterator -- confirm there before changing this guard.
        if (!arg) {
          return;
        }
        db->ReleaseSnapshot(snapshot);
        // Advance max_evicted_seq
        ASSERT_OK(db->Put(WriteOptions(), "bar", "value"));
      });
  SyncPoint::GetInstance()->EnableProcessing();
  ASSERT_OK(db->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
                             /*end=*/nullptr));
  SyncPoint::GetInstance()->ClearAllCallBacks();
}
// Variant of the _WithSD test: the write-conflict snapshot (held by a
// set_snapshot transaction) is released, via Rollback, from inside the
// compaction's SingleDelete handling (NextFromInput:SingleDelete:2).
TEST_P(WritePreparedTransactionTest,
       ReleaseEarliestSnapshotDuringCompaction_WithSD2) {
  constexpr size_t kSnapshotCacheBits = 7;  // same as default
  constexpr size_t kCommitCacheBits = 0;    // minimum commit cache
  UpdateTransactionDBOptions(kSnapshotCacheBits, kCommitCacheBits);
  options.disable_auto_compactions = true;
  ASSERT_OK(ReOpen());
  ASSERT_OK(db->Put(WriteOptions(), "foo", "value"));
  ASSERT_OK(db->Put(WriteOptions(), "key", "value"));
  ASSERT_OK(db->Flush(FlushOptions()));
  auto* txn = db->BeginTransaction(WriteOptions(), TransactionOptions(),
                                   /*old_txn=*/nullptr);
  ASSERT_OK(txn->Put("bar", "value"));
  ASSERT_OK(txn->SingleDelete("key"));
  ASSERT_OK(txn->SetName("txn"));
  ASSERT_OK(txn->Prepare());
  // Second SST: written while txn is still only prepared.
  ASSERT_OK(db->Flush(FlushOptions()));
  ASSERT_OK(txn->Commit());
  delete txn;
  ASSERT_OK(db->Put(WriteOptions(), "haha", "value"));
  // Create a dummy transaction to take a snapshot for ww-conflict detection.
  TransactionOptions txn_opts;
  txn_opts.set_snapshot = true;
  auto* dummy_txn =
      db->BeginTransaction(WriteOptions(), txn_opts, /*old_txn=*/nullptr);
  SyncPoint::GetInstance()->SetCallBack(
      "CompactionIterator::NextFromInput:SingleDelete:2", [&](void* /*arg*/) {
        // Rolling back releases the ww-conflict snapshot held by dummy_txn.
        // NOTE(review): assumes this sync point fires at most once during the
        // compaction below; a second invocation would double-delete dummy_txn.
        ASSERT_OK(dummy_txn->Rollback());
        delete dummy_txn;
        // Advance max_evicted_seq.
        ASSERT_OK(db->Put(WriteOptions(), "dontcare", "value"));
      });
  SyncPoint::GetInstance()->EnableProcessing();
  ASSERT_OK(db->Put(WriteOptions(), "haha2", "value"));
  auto* snapshot = db->GetSnapshot();
  ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  db->ReleaseSnapshot(snapshot);
  SyncPoint::GetInstance()->ClearAllCallBacks();
}
// Release the earliest snapshot while the compaction is handling a Delete
// tombstone at the bottommost level (NextFromInput:BottommostDelete:1).
TEST_P(WritePreparedTransactionTest,
       ReleaseEarliestSnapshotDuringCompaction_WithDelete) {
  constexpr size_t kSnapshotCacheBits = 7;  // same as default
  constexpr size_t kCommitCacheBits = 0;    // minimum commit cache
  UpdateTransactionDBOptions(kSnapshotCacheBits, kCommitCacheBits);
  options.disable_auto_compactions = true;
  ASSERT_OK(ReOpen());
  ASSERT_OK(db->Put(WriteOptions(), "a", "value"));
  ASSERT_OK(db->Put(WriteOptions(), "b", "value"));
  ASSERT_OK(db->Put(WriteOptions(), "c", "value"));
  ASSERT_OK(db->Flush(FlushOptions()));
  auto* txn = db->BeginTransaction(WriteOptions(), TransactionOptions(),
                                   /*old_txn=*/nullptr);
  ASSERT_OK(txn->Delete("b"));
  ASSERT_OK(txn->SetName("txn"));
  ASSERT_OK(txn->Prepare());
  const bool two_write_queues = std::get<1>(GetParam());
  if (two_write_queues) {
    // In the case of two queues, commit another txn just to bump
    // last_published_seq so that a subsequent GetSnapshot() call can return
    // a snapshot with higher sequence.
    auto* dummy_txn = db->BeginTransaction(WriteOptions(), TransactionOptions(),
                                           /*old_txn=*/nullptr);
    ASSERT_OK(dummy_txn->Put("haha", "value"));
    ASSERT_OK(dummy_txn->Commit());
    delete dummy_txn;
  }
  // snapshot1 is taken while the Delete is still only prepared; snapshot2
  // after it commits.
  auto* snapshot1 = db->GetSnapshot();
  ASSERT_OK(txn->Commit());
  delete txn;
  auto* snapshot2 = db->GetSnapshot();
  SyncPoint::GetInstance()->SetCallBack(
      "CompactionIterator::NextFromInput:BottommostDelete:1", [&](void* arg) {
        // Only react when the sync point passes a non-null argument.
        if (!arg) {
          return;
        }
        db->ReleaseSnapshot(snapshot1);
        // Advance max_evicted_seq
        ASSERT_OK(db->Put(WriteOptions(), "dummy1", "value"));
      });
  SyncPoint::GetInstance()->EnableProcessing();
  ASSERT_OK(db->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
                             /*end=*/nullptr));
  db->ReleaseSnapshot(snapshot2);
  SyncPoint::GetInstance()->ClearAllCallBacks();
}
// Release a snapshot while the flush is inside the path that keeps a
// SingleDelete for write-write conflict checking
// (NextFromInput:KeepSDForWW).
TEST_P(WritePreparedTransactionTest,
       ReleaseSnapshotBetweenSDAndPutDuringCompaction) {
  constexpr size_t kSnapshotCacheBits = 7;  // same as default
  constexpr size_t kCommitCacheBits = 0;    // minimum commit cache
  UpdateTransactionDBOptions(kSnapshotCacheBits, kCommitCacheBits);
  options.disable_auto_compactions = true;
  ASSERT_OK(ReOpen());
  // Create a dummy transaction to take a snapshot for ww-conflict detection.
  TransactionOptions txn_opts;
  txn_opts.set_snapshot = true;
  auto* dummy_txn =
      db->BeginTransaction(WriteOptions(), txn_opts, /*old_txn=*/nullptr);
  // Increment seq
  ASSERT_OK(db->Put(WriteOptions(), "bar", "value"));
  ASSERT_OK(db->Put(WriteOptions(), "foo", "value"));
  ASSERT_OK(db->SingleDelete(WriteOptions(), "foo"));
  // snapshot1 is the one released from inside the flush below.
  auto* snapshot1 = db->GetSnapshot();
  // Increment seq
  ASSERT_OK(db->Put(WriteOptions(), "dontcare", "value"));
  auto* snapshot2 = db->GetSnapshot();
  SyncPoint::GetInstance()->SetCallBack(
      "CompactionIterator::NextFromInput:KeepSDForWW", [&](void* /*arg*/) {
        db->ReleaseSnapshot(snapshot1);
        // The extra write advances the sequence after the release.
        ASSERT_OK(db->Put(WriteOptions(), "dontcare2", "value2"));
      });
  SyncPoint::GetInstance()->EnableProcessing();
  ASSERT_OK(db->Flush(FlushOptions()));
  db->ReleaseSnapshot(snapshot2);
  // Committing dummy_txn releases its ww-conflict snapshot.
  ASSERT_OK(dummy_txn->Commit());
  delete dummy_txn;
  SyncPoint::GetInstance()->ClearAllCallBacks();
}
// Release the earliest write-conflict snapshot (held by txn2) while a
// compaction is processing a SingleDelete; the compaction must not act on a
// stale earliest write-conflict snapshot.
TEST_P(WritePreparedTransactionTest,
       ReleaseEarliestWriteConflictSnapshot_SingleDelete) {
  constexpr size_t kSnapshotCacheBits = 7;  // same as default
  constexpr size_t kCommitCacheBits = 0;    // minimum commit cache
  UpdateTransactionDBOptions(kSnapshotCacheBits, kCommitCacheBits);
  options.disable_auto_compactions = true;
  ASSERT_OK(ReOpen());
  ASSERT_OK(db->Put(WriteOptions(), "a", "value"));
  ASSERT_OK(db->Put(WriteOptions(), "b", "value"));
  ASSERT_OK(db->Put(WriteOptions(), "c", "value"));
  ASSERT_OK(db->Flush(FlushOptions()));
  // Push the initial data down to L2 so the later compaction has to rewrite
  // it rather than trivially move files.
  {
    CompactRangeOptions cro;
    cro.change_level = true;
    cro.target_level = 2;
    ASSERT_OK(db->CompactRange(cro, /*begin=*/nullptr, /*end=*/nullptr));
  }
  std::unique_ptr<Transaction> txn;
  txn.reset(db->BeginTransaction(WriteOptions(), TransactionOptions(),
                                 /*old_txn=*/nullptr));
  ASSERT_OK(txn->SetName("txn1"));
  ASSERT_OK(txn->SingleDelete("b"));
  ASSERT_OK(txn->Prepare());
  ASSERT_OK(txn->Commit());
  auto* snapshot1 = db->GetSnapshot();
  // Bump seq of the db by performing writes so that
  // earliest_snapshot_ < earliest_write_conflict_snapshot_ in
  // CompactionIterator.
  ASSERT_OK(db->Put(WriteOptions(), "z", "dontcare"));
  // Create another snapshot for write conflict checking
  std::unique_ptr<Transaction> txn2;
  {
    TransactionOptions txn_opts;
    txn_opts.set_snapshot = true;
    txn2.reset(
        db->BeginTransaction(WriteOptions(), txn_opts, /*old_txn=*/nullptr));
  }
  // Bump seq so that the subsequent bg flush won't create a snapshot with the
  // same seq as the previous snapshot for conflict checking.
  ASSERT_OK(db->Put(WriteOptions(), "y", "dont"));
  ASSERT_OK(db->Flush(FlushOptions()));
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->SetCallBack(
      "CompactionIterator::NextFromInput:SingleDelete:1", [&](void* /*arg*/) {
        // Rolling back txn2 should release its snapshot(for ww checking).
        ASSERT_OK(txn2->Rollback());
        txn2.reset();
        // Advance max_evicted_seq
        ASSERT_OK(db->Put(WriteOptions(), "x", "value"));
      });
  SyncPoint::GetInstance()->EnableProcessing();
  ASSERT_OK(db->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
                             /*end=*/nullptr));
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  db->ReleaseSnapshot(snapshot1);
}
// Release the earliest snapshot while the compaction is inside
// PrepareOutput's sequence-zeroing step for key "b"; the zeroing decision
// must not be based on the just-released snapshot.
TEST_P(WritePreparedTransactionTest, ReleaseEarliestSnapshotAfterSeqZeroing) {
  constexpr size_t kSnapshotCacheBits = 7;  // same as default
  constexpr size_t kCommitCacheBits = 0;    // minimum commit cache
  UpdateTransactionDBOptions(kSnapshotCacheBits, kCommitCacheBits);
  options.disable_auto_compactions = true;
  ASSERT_OK(ReOpen());
  ASSERT_OK(db->Put(WriteOptions(), "a", "value"));
  ASSERT_OK(db->Put(WriteOptions(), "b", "value"));
  ASSERT_OK(db->Put(WriteOptions(), "c", "value"));
  ASSERT_OK(db->Flush(FlushOptions()));
  // Push the initial data down to L2 so the later compaction rewrites it.
  {
    CompactRangeOptions cro;
    cro.change_level = true;
    cro.target_level = 2;
    ASSERT_OK(db->CompactRange(cro, /*begin=*/nullptr, /*end=*/nullptr));
  }
  ASSERT_OK(db->SingleDelete(WriteOptions(), "b"));
  // Take a snapshot so that the SD won't be dropped during flush.
  auto* tmp_snapshot = db->GetSnapshot();
  ASSERT_OK(db->Put(WriteOptions(), "b", "value2"));
  auto* snapshot = db->GetSnapshot();
  ASSERT_OK(db->Flush(FlushOptions()));
  db->ReleaseSnapshot(tmp_snapshot);
  // Bump the sequence so that the below bg compaction job's snapshot will be
  // different from snapshot's sequence.
  ASSERT_OK(db->Put(WriteOptions(), "z", "foo"));
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->SetCallBack(
      "CompactionIterator::PrepareOutput:ZeroingSeq", [&](void* arg) {
        const auto* const ikey = static_cast<const ParsedInternalKey*>(arg);
        assert(ikey);
        if (ikey->user_key == "b") {
          assert(ikey->type == kTypeValue);
          db->ReleaseSnapshot(snapshot);
          // Bump max_evicted_seq.
          ASSERT_OK(db->Put(WriteOptions(), "z", "dontcare"));
        }
      });
  SyncPoint::GetInstance()->EnableProcessing();
  ASSERT_OK(db->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
                             /*end=*/nullptr));
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
}
// Variant of ReleaseEarliestSnapshotAfterSeqZeroing: the bottommost level
// contains only an SD for "b" (without its matching Put), and the snapshot is
// released during seq zeroing for a newer value of "b".
TEST_P(WritePreparedTransactionTest, ReleaseEarliestSnapshotAfterSeqZeroing2) {
  constexpr size_t kSnapshotCacheBits = 7;  // same as default
  constexpr size_t kCommitCacheBits = 0;    // minimum commit cache
  UpdateTransactionDBOptions(kSnapshotCacheBits, kCommitCacheBits);
  options.disable_auto_compactions = true;
  ASSERT_OK(ReOpen());
  // Generate an L0 with only SD for one key "b".
  ASSERT_OK(db->Put(WriteOptions(), "a", "value"));
  ASSERT_OK(db->Put(WriteOptions(), "b", "value"));
  // Take a snapshot so that subsequent flush outputs the SD for "b".
  auto* tmp_snapshot = db->GetSnapshot();
  ASSERT_OK(db->SingleDelete(WriteOptions(), "b"));
  ASSERT_OK(db->Put(WriteOptions(), "c", "value"));
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->SetCallBack(
      "CompactionIterator::NextFromInput:SingleDelete:3", [&](void* arg) {
        // NOTE(review): here the action runs only when arg is null -- the
        // opposite polarity of the SingleDelete:1 callbacks above; confirm
        // against the sync point in CompactionIterator before changing.
        if (!arg) {
          db->ReleaseSnapshot(tmp_snapshot);
          // Bump max_evicted_seq
          ASSERT_OK(db->Put(WriteOptions(), "x", "dontcare"));
        }
      });
  SyncPoint::GetInstance()->EnableProcessing();
  ASSERT_OK(db->Flush(FlushOptions()));
  // Finish generating L0 with only SD for "b".
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  // Move the L0 to L2.
  {
    CompactRangeOptions cro;
    cro.change_level = true;
    cro.target_level = 2;
    ASSERT_OK(db->CompactRange(cro, /*begin=*/nullptr, /*end=*/nullptr));
  }
  ASSERT_OK(db->Put(WriteOptions(), "b", "value1"));
  auto* snapshot = db->GetSnapshot();
  // Bump seq so that a subsequent flush/compaction job's snapshot is larger
  // than the above snapshot's seq.
  ASSERT_OK(db->Put(WriteOptions(), "x", "dontcare"));
  // Generate a second L0.
  ASSERT_OK(db->Flush(FlushOptions()));
  SyncPoint::GetInstance()->SetCallBack(
      "CompactionIterator::PrepareOutput:ZeroingSeq", [&](void* arg) {
        const auto* const ikey = static_cast<const ParsedInternalKey*>(arg);
        assert(ikey);
        if (ikey->user_key == "b") {
          assert(ikey->type == kTypeValue);
          db->ReleaseSnapshot(snapshot);
          // Bump max_evicted_seq.
          ASSERT_OK(db->Put(WriteOptions(), "z", "dontcare"));
        }
      });
  SyncPoint::GetInstance()->EnableProcessing();
  ASSERT_OK(db->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
                             /*end=*/nullptr));
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
}
// Although the user-contract indicates that a SD can only be issued for a key
// that exists and has not been overwritten, it is still possible for a Delete
// to be present when write-prepared transaction is rolled back.
TEST_P(WritePreparedTransactionTest, SingleDeleteAfterRollback) {
  constexpr size_t kSnapshotCacheBits = 7;  // same as default
  constexpr size_t kCommitCacheBits = 0;    // minimum commit cache
  // Controls the deletion type used when rolling back a prepared write;
  // NOTE(review): returning true here selects the behavior under test --
  // confirm the exact polarity against TransactionDBOptions documentation.
  txn_db_options.rollback_deletion_type_callback =
      [](TransactionDB*, ColumnFamilyHandle*, const Slice&) { return true; };
  UpdateTransactionDBOptions(kSnapshotCacheBits, kCommitCacheBits);
  options.disable_auto_compactions = true;
  ASSERT_OK(ReOpen());
  // Get a write conflict snapshot by creating a transaction with
  // set_snapshot=true.
  TransactionOptions txn_opts;
  txn_opts.set_snapshot = true;
  std::unique_ptr<Transaction> dummy_txn(
      db->BeginTransaction(WriteOptions(), txn_opts));
  std::unique_ptr<Transaction> txn0(
      db->BeginTransaction(WriteOptions(), TransactionOptions()));
  ASSERT_OK(txn0->Put("foo", "value"));
  ASSERT_OK(txn0->SetName("xid0"));
  ASSERT_OK(txn0->Prepare());
  // Create an SST with only {"foo": "value"}.
  ASSERT_OK(db->Flush(FlushOptions()));
  // Insert a Delete to cancel out the prior Put by txn0.
  ASSERT_OK(txn0->Rollback());
  txn0.reset();
  // Create a second SST.
  ASSERT_OK(db->Flush(FlushOptions()));
  ASSERT_OK(db->Put(WriteOptions(), "foo", "value1"));
  auto* snapshot = db->GetSnapshot();
  ASSERT_OK(db->SingleDelete(WriteOptions(), "foo"));
  int count = 0;
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->SetCallBack(
      "CompactionIterator::NextFromInput:SingleDelete:1", [&](void* arg) {
        // arg is the Compaction*; it is expected to be null because only the
        // flush below has callbacks enabled at this point.
        const auto* const c = static_cast<const Compaction*>(arg);
        assert(!c);
        // Trigger once only for SingleDelete during flush.
        if (0 == count) {
          ++count;
          db->ReleaseSnapshot(snapshot);
          // Bump max_evicted_seq
          ASSERT_OK(db->Put(WriteOptions(), "x", "dontcare"));
        }
      });
  SyncPoint::GetInstance()->EnableProcessing();
  // Create a third SST containing a SD without its matching PUT.
  ASSERT_OK(db->Flush(FlushOptions()));
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->EnableProcessing();
  DBImpl* dbimpl = static_cast_with_check<DBImpl>(db->GetRootDB());
  assert(dbimpl);
  ASSERT_OK(dbimpl->TEST_CompactRange(
      /*level=*/0, /*begin=*/nullptr, /*end=*/nullptr,
      /*column_family=*/nullptr, /*disallow_trivial_mode=*/true));
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  // Release the conflict-checking snapshot.
  ASSERT_OK(dummy_txn->Rollback());
}
  2990. // A more complex test to verify compaction/flush should keep keys visible
  2991. // to snapshots.
  2992. TEST_P(WritePreparedTransactionTest,
  2993. CompactionKeepSnapshotVisibleKeysRandomized) {
  2994. constexpr size_t kNumTransactions = 10;
  2995. constexpr size_t kNumIterations = 1000;
  2996. std::vector<Transaction*> transactions(kNumTransactions, nullptr);
  2997. std::vector<size_t> versions(kNumTransactions, 0);
  2998. std::unordered_map<std::string, std::string> current_data;
  2999. std::vector<const Snapshot*> snapshots;
  3000. std::vector<std::unordered_map<std::string, std::string>> snapshot_data;
  3001. Random rnd(1103);
  3002. options.disable_auto_compactions = true;
  3003. ASSERT_OK(ReOpen());
  3004. for (size_t i = 0; i < kNumTransactions; i++) {
  3005. std::string key = "key" + std::to_string(i);
  3006. std::string value = "value0";
  3007. ASSERT_OK(db->Put(WriteOptions(), key, value));
  3008. current_data[key] = value;
  3009. }
  3010. VerifyKeys(current_data);
  3011. for (size_t iter = 0; iter < kNumIterations; iter++) {
  3012. auto r = rnd.Next() % (kNumTransactions + 1);
  3013. if (r < kNumTransactions) {
  3014. std::string key = "key" + std::to_string(r);
  3015. if (transactions[r] == nullptr) {
  3016. std::string value = "value" + std::to_string(versions[r] + 1);
  3017. auto* txn = db->BeginTransaction(WriteOptions());
  3018. ASSERT_OK(txn->SetName("txn" + std::to_string(r)));
  3019. ASSERT_OK(txn->Put(key, value));
  3020. ASSERT_OK(txn->Prepare());
  3021. transactions[r] = txn;
  3022. } else {
  3023. std::string value = "value" + std::to_string(++versions[r]);
  3024. ASSERT_OK(transactions[r]->Commit());
  3025. delete transactions[r];
  3026. transactions[r] = nullptr;
  3027. current_data[key] = value;
  3028. }
  3029. } else {
  3030. auto* snapshot = db->GetSnapshot();
  3031. VerifyKeys(current_data, snapshot);
  3032. snapshots.push_back(snapshot);
  3033. snapshot_data.push_back(current_data);
  3034. }
  3035. VerifyKeys(current_data);
  3036. }
  3037. // Take a last snapshot to test compaction with uncommitted prepared
  3038. // transaction.
  3039. snapshots.push_back(db->GetSnapshot());
  3040. snapshot_data.push_back(current_data);
  3041. ASSERT_EQ(snapshots.size(), snapshot_data.size());
  3042. for (size_t i = 0; i < snapshots.size(); i++) {
  3043. VerifyKeys(snapshot_data[i], snapshots[i]);
  3044. }
  3045. ASSERT_OK(db->Flush(FlushOptions()));
  3046. for (size_t i = 0; i < snapshots.size(); i++) {
  3047. VerifyKeys(snapshot_data[i], snapshots[i]);
  3048. }
  3049. // Dummy keys to avoid compaction trivially move files and get around actual
  3050. // compaction logic.
  3051. ASSERT_OK(db->Put(WriteOptions(), "a", "dummy"));
  3052. ASSERT_OK(db->Put(WriteOptions(), "z", "dummy"));
  3053. ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  3054. for (size_t i = 0; i < snapshots.size(); i++) {
  3055. VerifyKeys(snapshot_data[i], snapshots[i]);
  3056. }
  3057. // cleanup
  3058. for (size_t i = 0; i < kNumTransactions; i++) {
  3059. if (transactions[i] == nullptr) {
  3060. continue;
  3061. }
  3062. ASSERT_OK(transactions[i]->Commit());
  3063. delete transactions[i];
  3064. }
  3065. for (size_t i = 0; i < snapshots.size(); i++) {
  3066. db->ReleaseSnapshot(snapshots[i]);
  3067. }
  3068. }
// Compaction should not apply the optimization to output key with sequence
// number equal to 0 if the key is not visible to earliest snapshot, based on
// commit sequence number.
TEST_P(WritePreparedTransactionTest,
       CompactionShouldKeepSequenceForUncommittedKeys) {
  options.disable_auto_compactions = true;
  ASSERT_OK(ReOpen());
  // Keep track of expected sequence number.
  SequenceNumber expected_seq = 0;
  auto* transaction = db->BeginTransaction(WriteOptions());
  ASSERT_OK(transaction->SetName("txn"));
  ASSERT_OK(transaction->Put("key1", "value1"));
  ASSERT_OK(transaction->Prepare());
  // Prepare consumes exactly one sequence number here.
  ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber());
  SequenceNumber seq1 = expected_seq;
  ASSERT_OK(db->Put(WriteOptions(), "key2", "value2"));
  DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
  expected_seq++;  // one for data
  if (options.two_write_queues) {
    expected_seq++;  // one for commit
  }
  ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence());
  ASSERT_OK(db->Flush(FlushOptions()));
  // Dummy keys to avoid compaction trivially move files and get around actual
  // compaction logic.
  ASSERT_OK(db->Put(WriteOptions(), "a", "dummy"));
  ASSERT_OK(db->Put(WriteOptions(), "z", "dummy"));
  ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  VerifyKeys({
      {"key1", "NOT_FOUND"},
      {"key2", "value2"},
  });
  VerifyInternalKeys({
      // "key1" has not been committed. It keeps its sequence number.
      {"key1", "value1", seq1, kTypeValue},
      // "key2" is committed and output with seq = 0.
      {"key2", "value2", 0, kTypeValue},
  });
  ASSERT_OK(transaction->Commit());
  VerifyKeys({
      {"key1", "value1"},
      {"key2", "value2"},
  });
  delete transaction;
}
// A snapshot taken (and a commit performed) after a compaction/flush has
// started must still be honored by the compaction's visibility decisions.
TEST_P(WritePreparedTransactionTest, CommitAndSnapshotDuringCompaction) {
  options.disable_auto_compactions = true;
  ASSERT_OK(ReOpen());
  const Snapshot* snapshot = nullptr;
  ASSERT_OK(db->Put(WriteOptions(), "key1", "value1"));
  auto* txn = db->BeginTransaction(WriteOptions());
  ASSERT_OK(txn->SetName("txn"));
  ASSERT_OK(txn->Put("key1", "value2"));
  ASSERT_OK(txn->Prepare());
  auto callback = [&](void*) {
    // Snapshot is taken after compaction start. It should be taken into
    // consideration for whether to compact out value1.
    snapshot = db->GetSnapshot();
    ASSERT_OK(txn->Commit());
    delete txn;
  };
  SyncPoint::GetInstance()->SetCallBack("CompactionIterator:AfterInit",
                                        callback);
  SyncPoint::GetInstance()->EnableProcessing();
  // Flush runs the compaction-iterator path, which fires the callback above.
  ASSERT_OK(db->Flush(FlushOptions()));
  ASSERT_NE(nullptr, snapshot);
  // Latest committed value is visible without a snapshot...
  VerifyKeys({{"key1", "value2"}});
  // ...while the snapshot taken mid-flush must still see the old value.
  VerifyKeys({{"key1", "value1"}}, snapshot);
  db->ReleaseSnapshot(snapshot);
}
  3139. TEST_P(WritePreparedTransactionTest, Iterate) {
  3140. auto verify_state = [](Iterator* iter, const std::string& key,
  3141. const std::string& value) {
  3142. ASSERT_TRUE(iter->Valid());
  3143. ASSERT_OK(iter->status());
  3144. ASSERT_EQ(key, iter->key().ToString());
  3145. ASSERT_EQ(value, iter->value().ToString());
  3146. };
  3147. auto verify_iter = [&](const std::string& expected_val) {
  3148. // Get iterator from a concurrent transaction and make sure it has the
  3149. // same view as an iterator from the DB.
  3150. auto* txn = db->BeginTransaction(WriteOptions());
  3151. for (int i = 0; i < 2; i++) {
  3152. Iterator* iter = (i == 0) ? db->NewIterator(ReadOptions())
  3153. : txn->GetIterator(ReadOptions());
  3154. // Seek
  3155. iter->Seek("foo");
  3156. verify_state(iter, "foo", expected_val);
  3157. // Next
  3158. iter->Seek("a");
  3159. verify_state(iter, "a", "va");
  3160. iter->Next();
  3161. verify_state(iter, "foo", expected_val);
  3162. // SeekForPrev
  3163. iter->SeekForPrev("y");
  3164. verify_state(iter, "foo", expected_val);
  3165. // Prev
  3166. iter->SeekForPrev("z");
  3167. verify_state(iter, "z", "vz");
  3168. iter->Prev();
  3169. verify_state(iter, "foo", expected_val);
  3170. delete iter;
  3171. }
  3172. delete txn;
  3173. };
  3174. ASSERT_OK(db->Put(WriteOptions(), "foo", "v1"));
  3175. auto* transaction = db->BeginTransaction(WriteOptions());
  3176. ASSERT_OK(transaction->SetName("txn"));
  3177. ASSERT_OK(transaction->Put("foo", "v2"));
  3178. ASSERT_OK(transaction->Prepare());
  3179. VerifyKeys({{"foo", "v1"}});
  3180. // dummy keys
  3181. ASSERT_OK(db->Put(WriteOptions(), "a", "va"));
  3182. ASSERT_OK(db->Put(WriteOptions(), "z", "vz"));
  3183. verify_iter("v1");
  3184. ASSERT_OK(transaction->Commit());
  3185. VerifyKeys({{"foo", "v2"}});
  3186. verify_iter("v2");
  3187. delete transaction;
  3188. }
  3189. TEST_P(WritePreparedTransactionTest, IteratorRefreshNotSupported) {
  3190. Iterator* iter = db->NewIterator(ReadOptions());
  3191. ASSERT_OK(iter->status());
  3192. ASSERT_TRUE(iter->Refresh().IsNotSupported());
  3193. delete iter;
  3194. }
// Committing a delayed prepared has two non-atomic steps: update commit cache,
// remove seq from delayed_prepared_. The read in IsInSnapshot also involves two
// non-atomic steps of checking these two data structures. This test breaks each
// in the middle to ensure correctness in spite of non-atomic execution.
// Note: This test is limited to the case where snapshot is larger than the
// max_evicted_seq_.
TEST_P(WritePreparedTransactionTest, NonAtomicCommitOfDelayedPrepared) {
  const size_t snapshot_cache_bits = 7;  // same as default
  const size_t commit_cache_bits = 3;    // 8 entries
  // Outer loops choose where to break the read/commit into two halves.
  for (auto split_read : {true, false}) {
    std::vector<bool> split_options = {false};
    if (split_read) {
      // Also test for break before mutex
      split_options.push_back(true);
    }
    for (auto split_before_mutex : split_options) {
      UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
      ASSERT_OK(ReOpen());
      WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
      DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
      // Fill up the commit cache
      std::string init_value("value1");
      for (int i = 0; i < 10; i++) {
        ASSERT_OK(db->Put(WriteOptions(), Slice("key1"), Slice(init_value)));
      }
      // Prepare a transaction but do not commit it
      Transaction* txn =
          db->BeginTransaction(WriteOptions(), TransactionOptions());
      ASSERT_OK(txn->SetName("xid"));
      ASSERT_OK(txn->Put(Slice("key1"), Slice("value2")));
      ASSERT_OK(txn->Prepare());
      // Commit a bunch of entries to advance max evicted seq and make the
      // prepared a delayed prepared
      for (int i = 0; i < 10; i++) {
        ASSERT_OK(db->Put(WriteOptions(), Slice("key3"), Slice("value3")));
      }
      // The snapshot should not see the delayed prepared entry
      auto snap = db->GetSnapshot();
      // Use sync points to pause one side of the race at the chosen split
      // point until the other side signals it is in the critical window.
      if (split_read) {
        if (split_before_mutex) {
          // split before acquiring prepare_mutex_
          ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
              {{"WritePreparedTxnDB::IsInSnapshot:prepared_mutex_:pause",
                "AtomicCommitOfDelayedPrepared:Commit:before"},
               {"AtomicCommitOfDelayedPrepared:Commit:after",
                "WritePreparedTxnDB::IsInSnapshot:prepared_mutex_:resume"}});
        } else {
          // split right after reading from the commit cache
          ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
              {{"WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:pause",
                "AtomicCommitOfDelayedPrepared:Commit:before"},
               {"AtomicCommitOfDelayedPrepared:Commit:after",
                "WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:resume"}});
        }
      } else {  // split commit
        // split right before removing from delayed_prepared_
        ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
            {{"WritePreparedTxnDB::RemovePrepared:pause",
              "AtomicCommitOfDelayedPrepared:Read:before"},
             {"AtomicCommitOfDelayedPrepared:Read:after",
              "WritePreparedTxnDB::RemovePrepared:resume"}});
      }
      SyncPoint::GetInstance()->EnableProcessing();
      ROCKSDB_NAMESPACE::port::Thread commit_thread([&]() {
        TEST_SYNC_POINT("AtomicCommitOfDelayedPrepared:Commit:before");
        ASSERT_OK(txn->Commit());
        if (split_before_mutex) {
          // Do bunch of inserts to evict the commit entry from the cache. This
          // would prevent the 2nd look into commit cache under prepare_mutex_
          // to see the commit entry.
          auto seq = db_impl->TEST_GetLastVisibleSequence();
          size_t tries = 0;
          while (wp_db->max_evicted_seq_ < seq && tries < 50) {
            ASSERT_OK(db->Put(WriteOptions(), Slice("key3"), Slice("value3")));
            tries++;
          };
          // The eviction must succeed within the bounded retries.
          ASSERT_LT(tries, 50);
        }
        TEST_SYNC_POINT("AtomicCommitOfDelayedPrepared:Commit:after");
        delete txn;
      });
      ROCKSDB_NAMESPACE::port::Thread read_thread([&]() {
        TEST_SYNC_POINT("AtomicCommitOfDelayedPrepared:Read:before");
        ReadOptions roptions;
        roptions.snapshot = snap;
        PinnableSlice value;
        auto s = db->Get(roptions, db->DefaultColumnFamily(), "key1", &value);
        ASSERT_OK(s);
        // It should not see the commit of delayed prepared
        ASSERT_TRUE(value == init_value);
        TEST_SYNC_POINT("AtomicCommitOfDelayedPrepared:Read:after");
        db->ReleaseSnapshot(snap);
      });
      read_thread.join();
      commit_thread.join();
      ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
      ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
    }  // for split_before_mutex
  }    // for split_read
}
// When max evicted seq advances a prepared seq, it involves two updates: i)
// adding prepared seq to delayed_prepared_, ii) updating max_evicted_seq_.
// ::IsInSnapshot also reads these two values in a non-atomic way. This test
// ensures correctness if the update occurs after ::IsInSnapshot reads
// delayed_prepared_empty_ and before it reads max_evicted_seq_.
// Note: this test focuses on read snapshot larger than max_evicted_seq_.
TEST_P(WritePreparedTransactionTest, NonAtomicUpdateOfDelayedPrepared) {
  const size_t snapshot_cache_bits = 7;  // same as default
  const size_t commit_cache_bits = 3;    // 8 entries
  UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
  ASSERT_OK(ReOpen());
  WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
  // Fill up the commit cache
  std::string init_value("value1");
  for (int i = 0; i < 10; i++) {
    ASSERT_OK(db->Put(WriteOptions(), Slice("key1"), Slice(init_value)));
  }
  // Prepare a transaction but do not commit it
  Transaction* txn = db->BeginTransaction(WriteOptions(), TransactionOptions());
  ASSERT_OK(txn->SetName("xid"));
  ASSERT_OK(txn->Put(Slice("key1"), Slice("value2")));
  ASSERT_OK(txn->Prepare());
  // Create a gap between prepare seq and snapshot seq
  ASSERT_OK(db->Put(WriteOptions(), Slice("key3"), Slice("value3")));
  ASSERT_OK(db->Put(WriteOptions(), Slice("key3"), Slice("value3")));
  // The snapshot should not see the delayed prepared entry
  auto snap = db->GetSnapshot();
  ASSERT_LT(txn->GetId(), snap->GetSequenceNumber());
  // split right after reading delayed_prepared_empty_
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"WritePreparedTxnDB::IsInSnapshot:delayed_prepared_empty_:pause",
        "AtomicUpdateOfDelayedPrepared:before"},
       {"AtomicUpdateOfDelayedPrepared:after",
        "WritePreparedTxnDB::IsInSnapshot:delayed_prepared_empty_:resume"}});
  SyncPoint::GetInstance()->EnableProcessing();
  ROCKSDB_NAMESPACE::port::Thread commit_thread([&]() {
    TEST_SYNC_POINT("AtomicUpdateOfDelayedPrepared:before");
    // Commit a bunch of entries to advance max evicted seq and make the
    // prepared a delayed prepared
    size_t tries = 0;
    while (wp_db->max_evicted_seq_ < txn->GetId() && tries < 50) {
      ASSERT_OK(db->Put(WriteOptions(), Slice("key3"), Slice("value3")));
      tries++;
    };
    // max_evicted_seq_ must pass the prepare seq within the retry budget.
    ASSERT_LT(tries, 50);
    // This is the case on which the test focuses
    ASSERT_LT(wp_db->max_evicted_seq_, snap->GetSequenceNumber());
    TEST_SYNC_POINT("AtomicUpdateOfDelayedPrepared:after");
  });
  ROCKSDB_NAMESPACE::port::Thread read_thread([&]() {
    ReadOptions roptions;
    roptions.snapshot = snap;
    PinnableSlice value;
    auto s = db->Get(roptions, db->DefaultColumnFamily(), "key1", &value);
    ASSERT_OK(s);
    // It should not see the uncommitted value of delayed prepared
    ASSERT_TRUE(value == init_value);
    db->ReleaseSnapshot(snap);
  });
  read_thread.join();
  commit_thread.join();
  // cleanup: commit the prepared txn so shutdown is clean
  ASSERT_OK(txn->Commit());
  delete txn;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
}
// Eviction from commit cache and update of max evicted seq are two non-atomic
// steps. Similarly the read of max_evicted_seq_ in ::IsInSnapshot and reading
// from commit cache are two non-atomic steps. This tests if the update occurs
// after reading max_evicted_seq_ and before reading the commit cache.
// Note: the test focuses on snapshot larger than max_evicted_seq_
TEST_P(WritePreparedTransactionTest, NonAtomicUpdateOfMaxEvictedSeq) {
  const size_t snapshot_cache_bits = 7;  // same as default
  const size_t commit_cache_bits = 3;    // 8 entries
  UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
  ASSERT_OK(ReOpen());
  WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
  // Fill up the commit cache
  std::string init_value("value1");
  std::string last_value("value_final");
  for (int i = 0; i < 10; i++) {
    ASSERT_OK(db->Put(WriteOptions(), Slice("key1"), Slice(init_value)));
  }
  // Do an uncommitted write to prevent min_uncommitted optimization
  Transaction* txn1 =
      db->BeginTransaction(WriteOptions(), TransactionOptions());
  ASSERT_OK(txn1->SetName("xid1"));
  ASSERT_OK(txn1->Put(Slice("key0"), last_value));
  ASSERT_OK(txn1->Prepare());
  // Do a write with prepare to get the prepare seq
  Transaction* txn = db->BeginTransaction(WriteOptions(), TransactionOptions());
  ASSERT_OK(txn->SetName("xid"));
  ASSERT_OK(txn->Put(Slice("key1"), last_value));
  ASSERT_OK(txn->Prepare());
  ASSERT_OK(txn->Commit());
  // Create a gap between commit entry and snapshot seq
  ASSERT_OK(db->Put(WriteOptions(), Slice("key3"), Slice("value3")));
  ASSERT_OK(db->Put(WriteOptions(), Slice("key3"), Slice("value3")));
  // The snapshot should see the last commit
  auto snap = db->GetSnapshot();
  ASSERT_LE(txn->GetId(), snap->GetSequenceNumber());
  // split right after reading max_evicted_seq_
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"WritePreparedTxnDB::IsInSnapshot:max_evicted_seq_:pause",
        "NonAtomicUpdateOfMaxEvictedSeq:before"},
       {"NonAtomicUpdateOfMaxEvictedSeq:after",
        "WritePreparedTxnDB::IsInSnapshot:max_evicted_seq_:resume"}});
  SyncPoint::GetInstance()->EnableProcessing();
  ROCKSDB_NAMESPACE::port::Thread commit_thread([&]() {
    TEST_SYNC_POINT("NonAtomicUpdateOfMaxEvictedSeq:before");
    // Commit a bunch of entries to advance max evicted seq beyond txn->GetId()
    size_t tries = 0;
    while (wp_db->max_evicted_seq_ < txn->GetId() && tries < 50) {
      ASSERT_OK(db->Put(WriteOptions(), Slice("key3"), Slice("value3")));
      tries++;
    };
    // max_evicted_seq_ must pass the commit entry within the retry budget.
    ASSERT_LT(tries, 50);
    // This is the case on which the test focuses
    ASSERT_LT(wp_db->max_evicted_seq_, snap->GetSequenceNumber());
    TEST_SYNC_POINT("NonAtomicUpdateOfMaxEvictedSeq:after");
  });
  ROCKSDB_NAMESPACE::port::Thread read_thread([&]() {
    ReadOptions roptions;
    roptions.snapshot = snap;
    PinnableSlice value;
    auto s = db->Get(roptions, db->DefaultColumnFamily(), "key1", &value);
    ASSERT_OK(s);
    // It should see the committed value of the evicted entry
    ASSERT_TRUE(value == last_value);
    db->ReleaseSnapshot(snap);
  });
  read_thread.join();
  commit_thread.join();
  // cleanup: txn already committed; commit txn1 so shutdown is clean
  delete txn;
  ASSERT_OK(txn1->Commit());
  delete txn1;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
}
// Test when we add a prepared seq when the max_evicted_seq_ already goes beyond
// that. The test focuses on a race condition between AddPrepared and
// AdvanceMaxEvictedSeq functions.
TEST_P(WritePreparedTransactionTest, AddPreparedBeforeMax) {
  if (!options.two_write_queues) {
    // This test is only for two write queues
    return;
  }
  const size_t snapshot_cache_bits = 7;  // same as default
  // 1 entry to advance max after the 2nd commit
  const size_t commit_cache_bits = 0;
  UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
  ASSERT_OK(ReOpen());
  WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
  std::string some_value("value_some");
  std::string uncommitted_value("value_uncommitted");
  // Prepare two uncommitted transactions
  Transaction* txn1 =
      db->BeginTransaction(WriteOptions(), TransactionOptions());
  ASSERT_OK(txn1->SetName("xid1"));
  ASSERT_OK(txn1->Put(Slice("key1"), some_value));
  ASSERT_OK(txn1->Prepare());
  Transaction* txn2 =
      db->BeginTransaction(WriteOptions(), TransactionOptions());
  ASSERT_OK(txn2->SetName("xid2"));
  ASSERT_OK(txn2->Put(Slice("key2"), some_value));
  ASSERT_OK(txn2->Prepare());
  // Start the txn here so the other thread could get its id
  Transaction* txn = db->BeginTransaction(WriteOptions(), TransactionOptions());
  ASSERT_OK(txn->SetName("xid"));
  ASSERT_OK(txn->Put(Slice("key0"), uncommitted_value));
  // Guards reads of txn->GetId() in the read thread against the concurrent
  // Prepare in the write thread.
  port::Mutex txn_mutex_;
  // t1) Insert prepared entry, t2) commit other entries to advance max
  // evicted seq and finish checking the existing prepared entries, t1)
  // AddPrepared, t2) update max_evicted_seq_
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
      {"AddPreparedCallback::AddPrepared::begin:pause",
       "AddPreparedBeforeMax::read_thread:start"},
      {"AdvanceMaxEvictedSeq::update_max:pause",
       "AddPreparedCallback::AddPrepared::begin:resume"},
      {"AddPreparedCallback::AddPrepared::end",
       "AdvanceMaxEvictedSeq::update_max:resume"},
  });
  SyncPoint::GetInstance()->EnableProcessing();
  ROCKSDB_NAMESPACE::port::Thread write_thread([&]() {
    txn_mutex_.Lock();
    ASSERT_OK(txn->Prepare());
    txn_mutex_.Unlock();
  });
  ROCKSDB_NAMESPACE::port::Thread read_thread([&]() {
    TEST_SYNC_POINT("AddPreparedBeforeMax::read_thread:start");
    // Publish seq number with a commit
    ASSERT_OK(txn1->Commit());
    // Since the commit cache size is one the 2nd commit evict the 1st one and
    // invokes AdvanceMaxEvictedSeq
    ASSERT_OK(txn2->Commit());
    ReadOptions roptions;
    PinnableSlice value;
    // The snapshot should not see the uncommitted value from write_thread
    auto snap = db->GetSnapshot();
    ASSERT_LT(wp_db->max_evicted_seq_, snap->GetSequenceNumber());
    // This is the scenario that we test for
    txn_mutex_.Lock();
    ASSERT_GT(wp_db->max_evicted_seq_, txn->GetId());
    txn_mutex_.Unlock();
    roptions.snapshot = snap;
    auto s = db->Get(roptions, db->DefaultColumnFamily(), "key0", &value);
    ASSERT_TRUE(s.IsNotFound());
    db->ReleaseSnapshot(snap);
  });
  read_thread.join();
  write_thread.join();
  // cleanup
  delete txn1;
  delete txn2;
  ASSERT_OK(txn->Commit());
  delete txn;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
}
// When an old prepared entry gets committed, there is a gap between the time
// that it is published and when it is cleaned up from delayed_prepared_. This
// test stresses such cases.
TEST_P(WritePreparedTransactionTest, CommitOfDelayedPrepared) {
  const size_t snapshot_cache_bits = 7;  // same as default
  for (const size_t commit_cache_bits : {0, 2, 3}) {
    for (const size_t sub_batch_cnt : {1, 2, 3}) {
      UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
      ASSERT_OK(ReOpen());
      std::atomic<const Snapshot*> snap = {nullptr};
      std::atomic<SequenceNumber> exp_prepare = {0};
      ROCKSDB_NAMESPACE::port::Thread callback_thread;
      // Value is synchronized via snap
      PinnableSlice value;
      // Take a snapshot after publish and before RemovePrepared:Start
      auto snap_callback = [&]() {
        ASSERT_EQ(nullptr, snap.load());
        snap.store(db->GetSnapshot());
        ReadOptions roptions;
        roptions.snapshot = snap.load();
        auto s = db->Get(roptions, db->DefaultColumnFamily(), "key2", &value);
        ASSERT_OK(s);
      };
      auto callback = [&](void* param) {
        SequenceNumber prep_seq = *((SequenceNumber*)param);
        if (prep_seq == exp_prepare.load()) {  // only for write_thread
          // We need to spawn a thread to avoid deadlock since getting a
          // snapshot might end up calling AdvanceSeqByOne which needs joining
          // the write queue.
          callback_thread = ROCKSDB_NAMESPACE::port::Thread(snap_callback);
          TEST_SYNC_POINT("callback:end");
        }
      };
      // Wait for the first snapshot be taken in GetSnapshotInternal. Although
      // it might be updated before GetSnapshotInternal finishes but this should
      // cover most of the cases.
      ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
          {"WritePreparedTxnDB::GetSnapshotInternal:first", "callback:end"},
      });
      SyncPoint::GetInstance()->SetCallBack("RemovePrepared:Start", callback);
      SyncPoint::GetInstance()->EnableProcessing();
      // Thread to cause frequent evictions
      ROCKSDB_NAMESPACE::port::Thread eviction_thread([&]() {
        // Too many txns might cause commit_seq - prepare_seq in another thread
        // to go beyond DELTA_UPPERBOUND
        for (int i = 0; i < 25 * (1 << commit_cache_bits); i++) {
          ASSERT_OK(db->Put(WriteOptions(), Slice("key1"), Slice("value1")));
        }
      });
      ROCKSDB_NAMESPACE::port::Thread write_thread([&]() {
        for (int i = 0; i < 25 * (1 << commit_cache_bits); i++) {
          Transaction* txn =
              db->BeginTransaction(WriteOptions(), TransactionOptions());
          ASSERT_OK(txn->SetName("xid"));
          std::string val_str = "value" + std::to_string(i);
          for (size_t b = 0; b < sub_batch_cnt; b++) {
            ASSERT_OK(txn->Put(Slice("key2"), val_str));
          }
          ASSERT_OK(txn->Prepare());
          // Let an eviction to kick in
          std::this_thread::yield();
          exp_prepare.store(txn->GetId());
          ASSERT_OK(txn->Commit());
          delete txn;
          // Wait for the snapshot taking that is triggered by
          // RemovePrepared:Start callback
          callback_thread.join();
          // Read with the snapshot taken before delayed_prepared_ cleanup
          ReadOptions roptions;
          roptions.snapshot = snap.load();
          ASSERT_NE(nullptr, roptions.snapshot);
          PinnableSlice value2;
          auto s =
              db->Get(roptions, db->DefaultColumnFamily(), "key2", &value2);
          ASSERT_OK(s);
          // It should see its own write
          ASSERT_TRUE(val_str == value2);
          // The value read by snapshot should not change
          ASSERT_STREQ(value2.ToString().c_str(), value.ToString().c_str());
          db->ReleaseSnapshot(roptions.snapshot);
          // Reset for the next iteration's callback invocation.
          snap.store(nullptr);
        }
      });
      write_thread.join();
      eviction_thread.join();
      ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
      ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
    }
  }
}
// Test that updating the commit map will not affect the existing snapshots
TEST_P(WritePreparedTransactionTest, AtomicCommit) {
  for (bool skip_prepare : {true, false}) {
    // Interleave the reader's GetSnapshot/Get calls with AddCommitted so the
    // reads run while the commit map is mid-update.
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
        {"WritePreparedTxnDB::AddCommitted:start",
         "AtomicCommit::GetSnapshot:start"},
        {"AtomicCommit::Get:end",
         "WritePreparedTxnDB::AddCommitted:start:pause"},
        {"WritePreparedTxnDB::AddCommitted:end", "AtomicCommit::Get2:start"},
        {"AtomicCommit::Get2:end",
         "WritePreparedTxnDB::AddCommitted:end:pause:"},
    });
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
    ROCKSDB_NAMESPACE::port::Thread write_thread([&]() {
      if (skip_prepare) {
        // Plain Put: commit without a prepare phase.
        ASSERT_OK(db->Put(WriteOptions(), Slice("key"), Slice("value")));
      } else {
        Transaction* txn =
            db->BeginTransaction(WriteOptions(), TransactionOptions());
        ASSERT_OK(txn->SetName("xid"));
        ASSERT_OK(txn->Put(Slice("key"), Slice("value")));
        ASSERT_OK(txn->Prepare());
        ASSERT_OK(txn->Commit());
        delete txn;
      }
    });
    ROCKSDB_NAMESPACE::port::Thread read_thread([&]() {
      ReadOptions roptions;
      TEST_SYNC_POINT("AtomicCommit::GetSnapshot:start");
      roptions.snapshot = db->GetSnapshot();
      PinnableSlice val;
      auto s = db->Get(roptions, db->DefaultColumnFamily(), "key", &val);
      TEST_SYNC_POINT("AtomicCommit::Get:end");
      TEST_SYNC_POINT("AtomicCommit::Get2:start");
      // NOTE(review): ASSERT_SAME is a test helper defined elsewhere in this
      // file; presumably it checks a re-read under the same snapshot matches
      // the first result — verify against its definition.
      ASSERT_SAME(roptions, db, s, val, "key");
      TEST_SYNC_POINT("AtomicCommit::Get2:end");
      db->ReleaseSnapshot(roptions.snapshot);
    });
    read_thread.join();
    write_thread.join();
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  }
}
// Rollback of a prepared Put is performed with a SingleDelete (as selected by
// rollback_deletion_type_callback); verify that after compaction the key is
// fully gone rather than resurrecting the rolled-back Put.
TEST_P(WritePreparedTransactionTest, BasicRollbackDeletionTypeCb) {
  options.level0_file_num_compaction_trigger = 2;
  // Always use SingleDelete to rollback Put.
  txn_db_options.rollback_deletion_type_callback =
      [](TransactionDB*, ColumnFamilyHandle*, const Slice&) { return true; };
  const auto write_to_db = [&]() {
    assert(db);
    std::unique_ptr<Transaction> txn0(
        db->BeginTransaction(WriteOptions(), TransactionOptions()));
    ASSERT_OK(txn0->SetName("txn0"));
    ASSERT_OK(txn0->Put("a", "v0"));
    ASSERT_OK(txn0->Prepare());
    // Generate sst1: [PUT('a')]
    ASSERT_OK(db->Flush(FlushOptions()));
    {
      // Push sst1 down to the bottommost level so the uncommitted PUT('a')
      // sits below the data written next.
      CompactRangeOptions cro;
      cro.change_level = true;
      cro.target_level = options.num_levels - 1;
      cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
      ASSERT_OK(db->CompactRange(cro, /*begin=*/nullptr, /*end=*/nullptr));
    }
    // Rollback emits a SingleDelete for 'a' (per the callback above).
    ASSERT_OK(txn0->Rollback());
    txn0.reset();
    ASSERT_OK(db->Put(WriteOptions(), "a", "v1"));
    ASSERT_OK(db->SingleDelete(WriteOptions(), "a"));
    // Generate another SST with a SD to cover the oldest PUT('a')
    ASSERT_OK(db->Flush(FlushOptions()));
    auto* dbimpl = static_cast_with_check<DBImpl>(db->GetRootDB());
    assert(dbimpl);
    ASSERT_OK(dbimpl->TEST_WaitForCompact());
    {
      CompactRangeOptions cro;
      cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
      ASSERT_OK(db->CompactRange(cro, /*begin=*/nullptr, /*end=*/nullptr));
    }
    {
      // After full compaction 'a' must not be found.
      std::string value;
      const Status s = db->Get(ReadOptions(), "a", &value);
      ASSERT_TRUE(s.IsNotFound());
    }
  };
  // Destroy and reopen
  ASSERT_OK(ReOpen());
  write_to_db();
}
  3691. // Test that we can change write policy from WriteCommitted to WritePrepared
  3692. // after a clean shutdown (which would empty the WAL)
  3693. TEST_P(WritePreparedTransactionTest, WP_WC_DBBackwardCompatibility) {
  3694. bool empty_wal = true;
  3695. CrossCompatibilityTest(WRITE_COMMITTED, WRITE_PREPARED, empty_wal);
  3696. }
  3697. // Test that we fail fast if WAL is not emptied between changing the write
  3698. // policy from WriteCommitted to WritePrepared
  3699. TEST_P(WritePreparedTransactionTest, WP_WC_WALBackwardIncompatibility) {
  3700. bool empty_wal = true;
  3701. CrossCompatibilityTest(WRITE_COMMITTED, WRITE_PREPARED, !empty_wal);
  3702. }
  3703. // Test that we can change write policy from WritePrepare back to WriteCommitted
  3704. // after a clean shutdown (which would empty the WAL)
  3705. TEST_P(WritePreparedTransactionTest, WC_WP_ForwardCompatibility) {
  3706. bool empty_wal = true;
  3707. CrossCompatibilityTest(WRITE_PREPARED, WRITE_COMMITTED, empty_wal);
  3708. }
// Test that we fail fast if WAL is not emptied between changing the write
// policy from WritePrepared back to WriteCommitted (the previous comment said
// "WriteCommitted to WritePrepared", which contradicts the arguments below).
TEST_P(WritePreparedTransactionTest, WC_WP_WALForwardIncompatibility) {
  bool empty_wal = true;
  CrossCompatibilityTest(WRITE_PREPARED, WRITE_COMMITTED, !empty_wal);
}
  3715. } // namespace ROCKSDB_NAMESPACE
  3716. int main(int argc, char** argv) {
  3717. ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  3718. ::testing::InitGoogleTest(&argc, argv);
  3719. if (getenv("CIRCLECI") || getenv("GITHUB_ACTIONS")) {
  3720. // Looking for backtrace on "Resource temporarily unavailable" exceptions
  3721. ::testing::FLAGS_gtest_catch_exceptions = false;
  3722. }
  3723. return RUN_ALL_TESTS();
  3724. }