- // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
- // This source code is licensed under both the GPLv2 (found in the
- // COPYING file in the root directory) and Apache 2.0 License
- // (found in the LICENSE.Apache file in the root directory).
- #include <algorithm>
- #include <atomic>
- #include <cinttypes>
- #include <functional>
- #include <string>
- #include <thread>
- #include "db/db_impl/db_impl.h"
- #include "db/dbformat.h"
- #include "port/port.h"
- #include "port/stack_trace.h"
- #include "rocksdb/db.h"
- #include "rocksdb/options.h"
- #include "rocksdb/types.h"
- #include "rocksdb/utilities/debug.h"
- #include "rocksdb/utilities/transaction.h"
- #include "rocksdb/utilities/transaction_db.h"
- #include "table/mock_table.h"
- #include "test_util/sync_point.h"
- #include "test_util/testharness.h"
- #include "test_util/testutil.h"
- #include "test_util/transaction_test_util.h"
- #include "util/mutexlock.h"
- #include "util/random.h"
- #include "util/string_util.h"
- #include "utilities/fault_injection_env.h"
- #include "utilities/merge_operators.h"
- #include "utilities/merge_operators/string_append/stringappend.h"
- #include "utilities/transactions/pessimistic_transaction_db.h"
- #include "utilities/transactions/transaction_test.h"
- #include "utilities/transactions/write_prepared_txn_db.h"
- using std::string;
- namespace ROCKSDB_NAMESPACE {
- using CommitEntry = WritePreparedTxnDB::CommitEntry;
- using CommitEntry64b = WritePreparedTxnDB::CommitEntry64b;
- using CommitEntry64bFormat = WritePreparedTxnDB::CommitEntry64bFormat;
- TEST(PreparedHeap, BasicsTest) {
- WritePreparedTxnDB::PreparedHeap heap;
- {
- MutexLock ml(heap.push_pop_mutex());
- heap.push(14l);
- // Test with one element
- ASSERT_EQ(14l, heap.top());
- heap.push(24l);
- heap.push(34l);
- // Test that old min is still on top
- ASSERT_EQ(14l, heap.top());
- heap.push(44l);
- heap.push(54l);
- heap.push(64l);
- heap.push(74l);
- heap.push(84l);
- }
- // Test that old min is still on top
- ASSERT_EQ(14l, heap.top());
- heap.erase(24l);
- // Test that old min is still on top
- ASSERT_EQ(14l, heap.top());
- heap.erase(14l);
- // Test that the next smallest comes to the top after multiple erases
- ASSERT_EQ(34l, heap.top());
- heap.erase(34l);
- // Test that the next smallest comes to the top after a single erase
- ASSERT_EQ(44l, heap.top());
- heap.erase(54l);
- ASSERT_EQ(44l, heap.top());
- heap.pop(); // pop 44l
- // Test that the erased items are ignored after pop
- ASSERT_EQ(64l, heap.top());
- heap.erase(44l);
- // Test that erasing an already-popped item works
- ASSERT_EQ(64l, heap.top());
- heap.erase(84l);
- ASSERT_EQ(64l, heap.top());
- {
- MutexLock ml(heap.push_pop_mutex());
- heap.push(85l);
- heap.push(86l);
- heap.push(87l);
- heap.push(88l);
- heap.push(89l);
- }
- heap.erase(87l);
- heap.erase(85l);
- heap.erase(89l);
- heap.erase(86l);
- heap.erase(88l);
- // Test that the top remains the same after many erases in random order
- ASSERT_EQ(64l, heap.top());
- heap.pop();
- // Test that pop works with a series of random pending erases
- ASSERT_EQ(74l, heap.top());
- ASSERT_FALSE(heap.empty());
- heap.pop();
- // Test that empty works
- ASSERT_TRUE(heap.empty());
- }
- // This is a scenario reconstructed from a buggy trace. Test that the bug does
- // not resurface.
- TEST(PreparedHeap, EmptyAtTheEnd) {
- WritePreparedTxnDB::PreparedHeap heap;
- {
- MutexLock ml(heap.push_pop_mutex());
- heap.push(40l);
- }
- ASSERT_EQ(40l, heap.top());
- // Although not a recommended scenario, we must be resilient against erase
- // without a prior push.
- heap.erase(50l);
- ASSERT_EQ(40l, heap.top());
- {
- MutexLock ml(heap.push_pop_mutex());
- heap.push(60l);
- }
- ASSERT_EQ(40l, heap.top());
- heap.erase(60l);
- ASSERT_EQ(40l, heap.top());
- heap.erase(40l);
- ASSERT_TRUE(heap.empty());
- {
- MutexLock ml(heap.push_pop_mutex());
- heap.push(40l);
- }
- ASSERT_EQ(40l, heap.top());
- heap.erase(50l);
- ASSERT_EQ(40l, heap.top());
- {
- MutexLock ml(heap.push_pop_mutex());
- heap.push(60l);
- }
- ASSERT_EQ(40l, heap.top());
- heap.erase(40l);
- // Test that the erase has not emptied the heap (we had a bug doing that)
- ASSERT_FALSE(heap.empty());
- ASSERT_EQ(60l, heap.top());
- heap.erase(60l);
- ASSERT_TRUE(heap.empty());
- }
- // Generate a random order of PreparedHeap accesses and test that the heap is
- // successfully emptied at the end.
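- // Thread t[0] pushes seqs 1..t_cnt in increasing order (occasionally skipping
- // a push), while threads t[1..t_cnt] each erase one seq once `last` reaches
- // it. The heap must tolerate the interleaving of pushes, erases of seqs that
- // were never pushed, and erases that are deferred until the item is popped.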
- TEST(PreparedHeap, Concurrent) {
- const size_t t_cnt = 10;
- ROCKSDB_NAMESPACE::port::Thread t[t_cnt + 1];
- WritePreparedTxnDB::PreparedHeap heap;
- port::RWMutex prepared_mutex;
- std::atomic<size_t> last;
- for (size_t n = 0; n < 100; n++) {
- last = 0;
- t[0] = ROCKSDB_NAMESPACE::port::Thread([&]() {
- Random rnd(1103);
- for (size_t seq = 1; seq <= t_cnt; seq++) {
- // This is not recommended usage but we should be resilient against it.
- bool skip_push = rnd.OneIn(5);
- if (!skip_push) {
- MutexLock ml(heap.push_pop_mutex());
- std::this_thread::yield();
- heap.push(seq);
- last.store(seq);
- }
- }
- });
- for (size_t i = 1; i <= t_cnt; i++) {
- t[i] =
- ROCKSDB_NAMESPACE::port::Thread([&heap, &prepared_mutex, &last, i]() {
- auto seq = i;
- do {
- std::this_thread::yield();
- } while (last.load() < seq);
- WriteLock wl(&prepared_mutex);
- heap.erase(seq);
- });
- }
- for (size_t i = 0; i <= t_cnt; i++) {
- t[i].join();
- }
- ASSERT_TRUE(heap.empty());
- }
- }
- // Test that WriteBatchWithIndex correctly counts the number of sub-batches
- TEST(WriteBatchWithIndex, SubBatchCnt) {
- ColumnFamilyOptions cf_options;
- std::string cf_name = "two";
- DB* db;
- Options options;
- options.create_if_missing = true;
- const std::string dbname = test::PerThreadDBPath("transaction_testdb");
- EXPECT_OK(DestroyDB(dbname, options));
- ASSERT_OK(DB::Open(options, dbname, &db));
- ColumnFamilyHandle* cf_handle = nullptr;
- ASSERT_OK(db->CreateColumnFamily(cf_options, cf_name, &cf_handle));
- WriteOptions write_options;
- size_t batch_cnt = 1;
- size_t save_points = 0;
- std::vector<size_t> batch_cnt_at;
- WriteBatchWithIndex batch(db->DefaultColumnFamily()->GetComparator(), 0, true,
- 0);
- ASSERT_EQ(batch_cnt, batch.SubBatchCnt());
- batch_cnt_at.push_back(batch_cnt);
- batch.SetSavePoint();
- save_points++;
- ASSERT_OK(batch.Put(Slice("key"), Slice("value")));
- ASSERT_EQ(batch_cnt, batch.SubBatchCnt());
- batch_cnt_at.push_back(batch_cnt);
- batch.SetSavePoint();
- save_points++;
- ASSERT_OK(batch.Put(Slice("key2"), Slice("value2")));
- ASSERT_EQ(batch_cnt, batch.SubBatchCnt());
- // duplicate the keys
- batch_cnt_at.push_back(batch_cnt);
- batch.SetSavePoint();
- save_points++;
- ASSERT_OK(batch.Put(Slice("key"), Slice("value3")));
- batch_cnt++;
- ASSERT_EQ(batch_cnt, batch.SubBatchCnt());
- // duplicate the 2nd key. It should not be counted as a duplicate since a
- // sub-batch is cut after the last duplicate.
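- // For illustration: after Put("key"), Put("key2"), Put("key") the writes form
- // the sub-batches {key, key2} | {key}; the boundary is cut at the second
- // "key", so the following Put("key2") lands in the new sub-batch where "key2"
- // has not been seen yet and therefore is not a duplicate.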
- batch_cnt_at.push_back(batch_cnt);
- batch.SetSavePoint();
- save_points++;
- ASSERT_OK(batch.Put(Slice("key2"), Slice("value4")));
- ASSERT_EQ(batch_cnt, batch.SubBatchCnt());
- // duplicate the key but in a different cf. It should not be counted as a
- // duplicate key
- batch_cnt_at.push_back(batch_cnt);
- batch.SetSavePoint();
- save_points++;
- ASSERT_OK(batch.Put(cf_handle, Slice("key"), Slice("value5")));
- ASSERT_EQ(batch_cnt, batch.SubBatchCnt());
- // Test that the number of sub-batches matches what we count with
- // SubBatchCounter
- std::map<uint32_t, const Comparator*> comparators;
- comparators[0] = db->DefaultColumnFamily()->GetComparator();
- comparators[cf_handle->GetID()] = cf_handle->GetComparator();
- SubBatchCounter counter(comparators);
- ASSERT_OK(batch.GetWriteBatch()->Iterate(&counter));
- ASSERT_EQ(batch_cnt, counter.BatchCount());
- // Test that RollbackToSavePoint properly resets the number of
- // sub-batches
- for (size_t i = save_points; i > 0; i--) {
- ASSERT_OK(batch.RollbackToSavePoint());
- ASSERT_EQ(batch_cnt_at[i - 1], batch.SubBatchCnt());
- }
- // Test the count is right with random batches
- {
- const size_t TOTAL_KEYS = 20; // ~2x the 10 keys per batch so duplicates occur
- Random rnd(1131);
- std::string keys[TOTAL_KEYS];
- for (size_t k = 0; k < TOTAL_KEYS; k++) {
- int len = static_cast<int>(rnd.Uniform(50));
- keys[k] = test::RandomKey(&rnd, len);
- }
- for (size_t i = 0; i < 1000; i++) { // 1000 random batches
- WriteBatchWithIndex rndbatch(db->DefaultColumnFamily()->GetComparator(),
- 0, true, 0);
- for (size_t k = 0; k < 10; k++) { // 10 keys per batch
- size_t ki = static_cast<size_t>(rnd.Uniform(TOTAL_KEYS));
- Slice key = Slice(keys[ki]);
- std::string tmp = rnd.RandomString(16);
- Slice value = Slice(tmp);
- ASSERT_OK(rndbatch.Put(key, value));
- }
- SubBatchCounter batch_counter(comparators);
- ASSERT_OK(rndbatch.GetWriteBatch()->Iterate(&batch_counter));
- ASSERT_EQ(rndbatch.SubBatchCnt(), batch_counter.BatchCount());
- }
- }
- delete cf_handle;
- delete db;
- }
- TEST(CommitEntry64b, BasicTest) {
- const size_t INDEX_BITS = static_cast<size_t>(21);
- const size_t INDEX_SIZE = static_cast<size_t>(1ull << INDEX_BITS);
- const CommitEntry64bFormat FORMAT(static_cast<size_t>(INDEX_BITS));
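- // Roughly, a CommitEntry64b packs the bits of prep_seq above INDEX_BITS (the
- // low INDEX_BITS are implied by the commit-cache slot the entry occupies)
- // together with delta = commit_seq - prep_seq + 1 in the low COMMIT_BITS; a
- // zero delta marks an uninitialized slot, which is why MAX_COMMIT below
- // reserves one extra value.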
- // zero-initialized CommitEntry64b should indicate an empty entry
- CommitEntry64b empty_entry64b;
- uint64_t empty_index = 11ul;
- CommitEntry empty_entry;
- bool ok = empty_entry64b.Parse(empty_index, &empty_entry, FORMAT);
- ASSERT_FALSE(ok);
- // the zero entry is reserved for uninitialized entries
- const size_t MAX_COMMIT = (1 << FORMAT.COMMIT_BITS) - 1 - 1;
- // Samples over the numbers that are covered by that many index bits
- std::array<uint64_t, 4> is = {{0, 1, INDEX_SIZE / 2 + 1, INDEX_SIZE - 1}};
- // Samples over the numbers that are covered by that many commit bits
- std::array<uint64_t, 4> ds = {{0, 1, MAX_COMMIT / 2 + 1, MAX_COMMIT}};
- // Iterate over prepare numbers that i) cover all bits of a sequence
- // number, and ii) include some bits that fall into the range of the index or
- // commit bits
- for (uint64_t base = 1; base < kMaxSequenceNumber; base *= 2) {
- for (uint64_t i : is) {
- for (uint64_t d : ds) {
- uint64_t p = base + i + d;
- for (uint64_t c : {p, p + d / 2, p + d}) {
- uint64_t index = p % INDEX_SIZE;
- CommitEntry before(p, c), after;
- CommitEntry64b entry64b(before, FORMAT);
- ok = entry64b.Parse(index, &after, FORMAT);
- ASSERT_TRUE(ok);
- if (!(before == after)) {
- printf("base %" PRIu64 " i %" PRIu64 " d %" PRIu64 " p %" PRIu64
- " c %" PRIu64 " index %" PRIu64 "\n",
- base, i, d, p, c, index);
- }
- ASSERT_EQ(before, after);
- }
- }
- }
- }
- }
- class WritePreparedTxnDBMock : public WritePreparedTxnDB {
- public:
- WritePreparedTxnDBMock(DBImpl* db_impl, TransactionDBOptions& opt)
- : WritePreparedTxnDB(db_impl, opt) {}
- void SetDBSnapshots(const std::vector<SequenceNumber>& snapshots) {
- snapshots_ = snapshots;
- }
- void TakeSnapshot(SequenceNumber seq) { snapshots_.push_back(seq); }
- protected:
- const std::vector<SequenceNumber> GetSnapshotListFromDB(
- SequenceNumber /* unused */) override {
- return snapshots_;
- }
- private:
- std::vector<SequenceNumber> snapshots_;
- };
- class WritePreparedTransactionTestBase : public TransactionTestBase {
- public:
- WritePreparedTransactionTestBase(bool use_stackable_db, bool two_write_queue,
- TxnDBWritePolicy write_policy,
- WriteOrdering write_ordering,
- bool user_per_key_point_lock_mgr,
- int64_t deadlock_timeout_us)
- : TransactionTestBase(use_stackable_db, two_write_queue, write_policy,
- write_ordering, user_per_key_point_lock_mgr,
- deadlock_timeout_us) {}
- protected:
- void UpdateTransactionDBOptions(size_t snapshot_cache_bits,
- size_t commit_cache_bits) {
- txn_db_options.wp_snapshot_cache_bits = snapshot_cache_bits;
- txn_db_options.wp_commit_cache_bits = commit_cache_bits;
- }
- void UpdateTransactionDBOptions(size_t snapshot_cache_bits) {
- txn_db_options.wp_snapshot_cache_bits = snapshot_cache_bits;
- }
- // If expect_update is set, check if it actually updated old_commit_map_. If
- // it did not and yet suggested not to check the next snapshot, do the
- // opposite to check if it was not a bad suggestion.
- void MaybeUpdateOldCommitMapTestWithNext(uint64_t prepare, uint64_t commit,
- uint64_t snapshot,
- uint64_t next_snapshot,
- bool expect_update) {
- WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
- // reset old_commit_map_empty_ so that its value indicates whether
- // old_commit_map_ was updated
- wp_db->old_commit_map_empty_ = true;
- bool check_next = wp_db->MaybeUpdateOldCommitMap(prepare, commit, snapshot,
- snapshot < next_snapshot);
- if (expect_update == wp_db->old_commit_map_empty_) {
- printf("prepare: %" PRIu64 " commit: %" PRIu64 " snapshot: %" PRIu64
- " next: %" PRIu64 "\n",
- prepare, commit, snapshot, next_snapshot);
- }
- EXPECT_EQ(!expect_update, wp_db->old_commit_map_empty_);
- if (!check_next && wp_db->old_commit_map_empty_) {
- // do the opposite to make sure it was not a bad suggestion
- const bool dont_care_bool = true;
- wp_db->MaybeUpdateOldCommitMap(prepare, commit, next_snapshot,
- dont_care_bool);
- if (!wp_db->old_commit_map_empty_) {
- printf("prepare: %" PRIu64 " commit: %" PRIu64 " snapshot: %" PRIu64
- " next: %" PRIu64 "\n",
- prepare, commit, snapshot, next_snapshot);
- }
- EXPECT_TRUE(wp_db->old_commit_map_empty_);
- }
- }
- // Test that a CheckAgainstSnapshots thread reading old_snapshots will not
- // miss a snapshot because of a concurrent update by UpdateSnapshots that is
- // writing new_snapshots. Each thread is interrupted at two points. The sync
- // points that enforce the interleaving are specified by a1, a2, b1, and b2.
- // CommitEntry entry is expected to be relevant to one of the snapshots that
- // is common between the old and new lists of snapshots.
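- // Schematically, the first LoadDependency below enforces:
- //   CheckAgainstSnapshots runs to a1 -> UpdateSnapshots starts and runs to b1
- //   -> CheckAgainstSnapshots resumes to a2 -> UpdateSnapshots resumes to b2
- //   -> CheckAgainstSnapshots finishes -> UpdateSnapshots finishes.
- // The second LoadDependency enforces the mirror image, starting with
- // UpdateSnapshots.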
- void SnapshotConcurrentAccessTestInternal(
- WritePreparedTxnDB* wp_db,
- const std::vector<SequenceNumber>& old_snapshots,
- const std::vector<SequenceNumber>& new_snapshots, CommitEntry& entry,
- SequenceNumber& version, size_t a1, size_t a2, size_t b1, size_t b2) {
- // First reset the snapshot list
- const std::vector<SequenceNumber> empty_snapshots;
- wp_db->old_commit_map_empty_ = true;
- wp_db->UpdateSnapshots(empty_snapshots, ++version);
- // Then initialize it with the old_snapshots
- wp_db->UpdateSnapshots(old_snapshots, ++version);
- // Starting from the first thread, cut each thread at two points
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
- {"WritePreparedTxnDB::CheckAgainstSnapshots:p:" + std::to_string(a1),
- "WritePreparedTxnDB::UpdateSnapshots:s:start"},
- {"WritePreparedTxnDB::UpdateSnapshots:p:" + std::to_string(b1),
- "WritePreparedTxnDB::CheckAgainstSnapshots:s:" + std::to_string(a1)},
- {"WritePreparedTxnDB::CheckAgainstSnapshots:p:" + std::to_string(a2),
- "WritePreparedTxnDB::UpdateSnapshots:s:" + std::to_string(b1)},
- {"WritePreparedTxnDB::UpdateSnapshots:p:" + std::to_string(b2),
- "WritePreparedTxnDB::CheckAgainstSnapshots:s:" + std::to_string(a2)},
- {"WritePreparedTxnDB::CheckAgainstSnapshots:p:end",
- "WritePreparedTxnDB::UpdateSnapshots:s:" + std::to_string(b2)},
- });
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
- {
- ASSERT_TRUE(wp_db->old_commit_map_empty_);
- ROCKSDB_NAMESPACE::port::Thread t1(
- [&]() { wp_db->UpdateSnapshots(new_snapshots, version); });
- wp_db->CheckAgainstSnapshots(entry);
- t1.join();
- ASSERT_FALSE(wp_db->old_commit_map_empty_);
- }
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
- wp_db->old_commit_map_empty_ = true;
- wp_db->UpdateSnapshots(empty_snapshots, ++version);
- wp_db->UpdateSnapshots(old_snapshots, ++version);
- // Starting from the second thread, cut each thread at two points
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
- {"WritePreparedTxnDB::UpdateSnapshots:p:" + std::to_string(a1),
- "WritePreparedTxnDB::CheckAgainstSnapshots:s:start"},
- {"WritePreparedTxnDB::CheckAgainstSnapshots:p:" + std::to_string(b1),
- "WritePreparedTxnDB::UpdateSnapshots:s:" + std::to_string(a1)},
- {"WritePreparedTxnDB::UpdateSnapshots:p:" + std::to_string(a2),
- "WritePreparedTxnDB::CheckAgainstSnapshots:s:" + std::to_string(b1)},
- {"WritePreparedTxnDB::CheckAgainstSnapshots:p:" + std::to_string(b2),
- "WritePreparedTxnDB::UpdateSnapshots:s:" + std::to_string(a2)},
- {"WritePreparedTxnDB::UpdateSnapshots:p:end",
- "WritePreparedTxnDB::CheckAgainstSnapshots:s:" + std::to_string(b2)},
- });
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
- {
- ASSERT_TRUE(wp_db->old_commit_map_empty_);
- ROCKSDB_NAMESPACE::port::Thread t1(
- [&]() { wp_db->UpdateSnapshots(new_snapshots, version); });
- wp_db->CheckAgainstSnapshots(entry);
- t1.join();
- ASSERT_FALSE(wp_db->old_commit_map_empty_);
- }
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
- }
- // Verify the values of keys.
- void VerifyKeys(const std::unordered_map<std::string, std::string>& data,
- const Snapshot* snapshot = nullptr) {
- std::string value;
- ReadOptions read_options;
- read_options.snapshot = snapshot;
- for (auto& kv : data) {
- auto s = db->Get(read_options, kv.first, &value);
- ASSERT_TRUE(s.ok() || s.IsNotFound());
- if (s.ok()) {
- if (kv.second != value) {
- printf("key = %s\n", kv.first.c_str());
- }
- ASSERT_EQ(kv.second, value);
- } else {
- ASSERT_EQ(kv.second, "NOT_FOUND");
- }
- // Try with MultiGet API too
- std::vector<std::string> values;
- auto s_vec = db->MultiGet(read_options, {db->DefaultColumnFamily()},
- {kv.first}, &values);
- ASSERT_EQ(1, values.size());
- ASSERT_EQ(1, s_vec.size());
- s = s_vec[0];
- ASSERT_TRUE(s.ok() || s.IsNotFound());
- if (s.ok()) {
- ASSERT_TRUE(kv.second == values[0]);
- } else {
- ASSERT_EQ(kv.second, "NOT_FOUND");
- }
- }
- }
- // Verify all versions of keys.
- void VerifyInternalKeys(const std::vector<KeyVersion>& expected_versions) {
- std::vector<KeyVersion> versions;
- const size_t kMaxKeys = 100000;
- ASSERT_OK(GetAllKeyVersions(db, expected_versions.front().user_key,
- expected_versions.back().user_key, kMaxKeys,
- &versions));
- ASSERT_EQ(expected_versions.size(), versions.size());
- for (size_t i = 0; i < versions.size(); i++) {
- ASSERT_EQ(expected_versions[i].user_key, versions[i].user_key);
- ASSERT_EQ(expected_versions[i].sequence, versions[i].sequence);
- ASSERT_EQ(expected_versions[i].type, versions[i].type);
- if (versions[i].type != kTypeDeletion &&
- versions[i].type != kTypeSingleDeletion) {
- ASSERT_EQ(expected_versions[i].value, versions[i].value);
- }
- // Range delete not supported.
- ASSERT_NE(expected_versions[i].type, kTypeRangeDeletion);
- }
- }
- };
- class WritePreparedTransactionTest
- : public WritePreparedTransactionTestBase,
- virtual public ::testing::WithParamInterface<std::tuple<
- bool, bool, TxnDBWritePolicy, WriteOrdering, bool, int64_t>> {
- public:
- WritePreparedTransactionTest()
- : WritePreparedTransactionTestBase(
- std::get<0>(GetParam()), std::get<1>(GetParam()),
- std::get<2>(GetParam()), std::get<3>(GetParam()),
- std::get<4>(GetParam()), std::get<5>(GetParam())) {}
- };
- #if !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
- class SnapshotConcurrentAccessTest
- : public WritePreparedTransactionTestBase,
- virtual public ::testing::WithParamInterface<
- std::tuple<bool, bool, TxnDBWritePolicy, WriteOrdering, size_t,
- size_t, bool, int64_t>> {
- public:
- SnapshotConcurrentAccessTest()
- : WritePreparedTransactionTestBase(
- std::get<0>(GetParam()), std::get<1>(GetParam()),
- std::get<2>(GetParam()), std::get<3>(GetParam()),
- std::get<6>(GetParam()), std::get<7>(GetParam())),
- split_id_(std::get<4>(GetParam())),
- split_cnt_(std::get<5>(GetParam())) {}
- protected:
- // A test is split into split_cnt_ tests, each identified with split_id_ where
- // 0 <= split_id_ < split_cnt_
- size_t split_id_;
- size_t split_cnt_;
- };
- #endif // !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
- class SeqAdvanceConcurrentTest
- : public WritePreparedTransactionTestBase,
- virtual public ::testing::WithParamInterface<
- std::tuple<bool, bool, TxnDBWritePolicy, WriteOrdering, size_t,
- size_t, bool, int64_t>> {
- public:
- SeqAdvanceConcurrentTest()
- : WritePreparedTransactionTestBase(
- std::get<0>(GetParam()), std::get<1>(GetParam()),
- std::get<2>(GetParam()), std::get<3>(GetParam()),
- std::get<6>(GetParam()), std::get<7>(GetParam())),
- split_id_(std::get<4>(GetParam())),
- split_cnt_(std::get<5>(GetParam())) {
- special_env.skip_fsync_ = true;
- };
- protected:
- // A test is split into split_cnt_ tests, each identified with split_id_ where
- // 0 <= split_id_ < split_cnt_
- size_t split_id_;
- size_t split_cnt_;
- };
- constexpr std::array WritePreparedTransactionTest_Params = {
- std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite),
- std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite),
- std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite)};
- INSTANTIATE_TEST_CASE_P(
- WritePreparedTransaction, WritePreparedTransactionTest,
- ::testing::ValuesIn(WRAP_PARAM_WITH_PER_KEY_POINT_LOCK_MANAGER_PARAMS(
- WRAP_PARAM(bool, bool, TxnDBWritePolicy, WriteOrdering),
- WritePreparedTransactionTest_Params)));
- #if !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
- constexpr std::array TwoWriteQueue_SnapshotConcurrentAccessTest_Params = {
- std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 0, 20),
- std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 1, 20),
- std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 2, 20),
- std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 3, 20),
- std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 4, 20),
- std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 5, 20),
- std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 6, 20),
- std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 7, 20),
- std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 8, 20),
- std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 9, 20),
- std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 10, 20),
- std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 11, 20),
- std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 12, 20),
- std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 13, 20),
- std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 14, 20),
- std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 15, 20),
- std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 16, 20),
- std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 17, 20),
- std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 18, 20),
- std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 19, 20),
- std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 0, 20),
- std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 1, 20),
- std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 2, 20),
- std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 3, 20),
- std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 4, 20),
- std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 5, 20),
- std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 6, 20),
- std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 7, 20),
- std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 8, 20),
- std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 9, 20),
- std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 10, 20),
- std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 11, 20),
- std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 12, 20),
- std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 13, 20),
- std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 14, 20),
- std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 15, 20),
- std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 16, 20),
- std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 17, 20),
- std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 18, 20),
- std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 19, 20)};
- INSTANTIATE_TEST_CASE_P(
- TwoWriteQueuesPointLockManager, SnapshotConcurrentAccessTest,
- ::testing::ValuesIn(WRAP_PARAM_WITH_PER_KEY_POINT_LOCK_MANAGER_PARAMS(
- WRAP_PARAM(bool, bool, TxnDBWritePolicy, WriteOrdering, size_t, size_t),
- TwoWriteQueue_SnapshotConcurrentAccessTest_Params)));
- constexpr std::array OneWriteQueue_SnapshotConcurrentAccessTest_Params = {
- std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 0, 20),
- std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 1, 20),
- std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 2, 20),
- std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 3, 20),
- std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 4, 20),
- std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 5, 20),
- std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 6, 20),
- std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 7, 20),
- std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 8, 20),
- std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 9, 20),
- std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 10, 20),
- std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 11, 20),
- std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 12, 20),
- std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 13, 20),
- std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 14, 20),
- std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 15, 20),
- std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 16, 20),
- std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 17, 20),
- std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 18, 20),
- std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 19, 20),
- };
- INSTANTIATE_TEST_CASE_P(
- OneWriteQueue, SnapshotConcurrentAccessTest,
- ::testing::ValuesIn(WRAP_PARAM_WITH_PER_KEY_POINT_LOCK_MANAGER_PARAMS(
- WRAP_PARAM(bool, bool, TxnDBWritePolicy, WriteOrdering, size_t, size_t),
- OneWriteQueue_SnapshotConcurrentAccessTest_Params)));
- constexpr std::array TwoWriteQueues_SeqAdvanceConcurrentTest_Params = {
- std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 0, 10),
- std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 1, 10),
- std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 2, 10),
- std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 3, 10),
- std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 4, 10),
- std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 5, 10),
- std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 6, 10),
- std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 7, 10),
- std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 8, 10),
- std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 9, 10),
- std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 0, 10),
- std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 1, 10),
- std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 2, 10),
- std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 3, 10),
- std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 4, 10),
- std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 5, 10),
- std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 6, 10),
- std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 7, 10),
- std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 8, 10),
- std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 9, 10)};
- INSTANTIATE_TEST_CASE_P(
- TwoWriteQueues, SeqAdvanceConcurrentTest,
- ::testing::ValuesIn(WRAP_PARAM_WITH_PER_KEY_POINT_LOCK_MANAGER_PARAMS(
- WRAP_PARAM(bool, bool, TxnDBWritePolicy, WriteOrdering, size_t, size_t),
- TwoWriteQueues_SeqAdvanceConcurrentTest_Params)));
- constexpr std::array OneWriteQueue_SeqAdvanceConcurrentTest_Params = {
- std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 0, 10),
- std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 1, 10),
- std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 2, 10),
- std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 3, 10),
- std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 4, 10),
- std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 5, 10),
- std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 6, 10),
- std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 7, 10),
- std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 8, 10),
- std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 9, 10)};
- INSTANTIATE_TEST_CASE_P(
- OneWriteQueue, SeqAdvanceConcurrentTest,
- ::testing::ValuesIn(WRAP_PARAM_WITH_PER_KEY_POINT_LOCK_MANAGER_PARAMS(
- WRAP_PARAM(bool, bool, TxnDBWritePolicy, WriteOrdering, size_t, size_t),
- OneWriteQueue_SeqAdvanceConcurrentTest_Params)));
- #endif // !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
- TEST_P(WritePreparedTransactionTest, CommitMap) {
- WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
- ASSERT_NE(wp_db, nullptr);
- ASSERT_NE(wp_db->db_impl_, nullptr);
- size_t size = wp_db->COMMIT_CACHE_SIZE;
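- // The commit cache is effectively a fixed-size table indexed by
- // prep_seq % COMMIT_CACHE_SIZE, so entries whose prep seqs differ by a
- // multiple of `size` map to the same slot; the checks below rely on that to
- // exercise overlapping entries.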
- CommitEntry c = {5, 12}, e;
- bool evicted = wp_db->AddCommitEntry(c.prep_seq % size, c, &e);
- ASSERT_FALSE(evicted);
- // Should be able to read the same value
- CommitEntry64b dont_care;
- bool found = wp_db->GetCommitEntry(c.prep_seq % size, &dont_care, &e);
- ASSERT_TRUE(found);
- ASSERT_EQ(c, e);
- // Should be able to distinguish between overlapping entries
- found = wp_db->GetCommitEntry((c.prep_seq + size) % size, &dont_care, &e);
- ASSERT_TRUE(found);
- ASSERT_NE(c.prep_seq + size, e.prep_seq);
- // Should be able to detect non-existent entry
- found = wp_db->GetCommitEntry((c.prep_seq + 1) % size, &dont_care, &e);
- ASSERT_FALSE(found);
- // Reject an invalid exchange
- CommitEntry e2 = {c.prep_seq + size, c.commit_seq + size};
- CommitEntry64b e2_64b(e2, wp_db->FORMAT);
- bool exchanged = wp_db->ExchangeCommitEntry(e2.prep_seq % size, e2_64b, e);
- ASSERT_FALSE(exchanged);
- // check whether it actually rejected it
- found = wp_db->GetCommitEntry(e2.prep_seq % size, &dont_care, &e);
- ASSERT_TRUE(found);
- ASSERT_EQ(c, e);
- // Accept a valid exchange
- CommitEntry64b c_64b(c, wp_db->FORMAT);
- CommitEntry e3 = {c.prep_seq + size, c.commit_seq + size + 1};
- exchanged = wp_db->ExchangeCommitEntry(c.prep_seq % size, c_64b, e3);
- ASSERT_TRUE(exchanged);
- // check whether it actually accepted it
- found = wp_db->GetCommitEntry(c.prep_seq % size, &dont_care, &e);
- ASSERT_TRUE(found);
- ASSERT_EQ(e3, e);
- // Rewrite an entry
- CommitEntry e4 = {e3.prep_seq + size, e3.commit_seq + size + 1};
- evicted = wp_db->AddCommitEntry(e4.prep_seq % size, e4, &e);
- ASSERT_TRUE(evicted);
- ASSERT_EQ(e3, e);
- found = wp_db->GetCommitEntry(e4.prep_seq % size, &dont_care, &e);
- ASSERT_TRUE(found);
- ASSERT_EQ(e4, e);
- }
- TEST_P(WritePreparedTransactionTest, MaybeUpdateOldCommitMap) {
- // If prepare <= snapshot < commit we should keep the entry around since its
- // nonexistence could be interpreted as committed in the snapshot while it is
- // not true. We keep such entries around by adding them to the
- // old_commit_map_.
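- // For example, with p = 10, c = 25, s = 20 a reader at snapshot 20 must not
- // see the write committed at 25, so the (prepare, commit) pair is recorded in
- // old_commit_map_; with p = 10, c = 15, s = 20 the commit is already visible
- // to the snapshot and nothing needs to be kept.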
- uint64_t p /*prepare*/, c /*commit*/, s /*snapshot*/, ns /*next_snapshot*/;
- p = 10l, c = 15l, s = 20l, ns = 21l;
- MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
- // If we do not expect the old commit map to be updated, try also with a next
- // snapshot that is expected to update the old commit map. This would test
- // that MaybeUpdateOldCommitMap would not prevent us from checking the next
- // snapshot that must be checked.
- p = 10l, c = 15l, s = 20l, ns = 11l;
- MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
- p = 10l, c = 20l, s = 20l, ns = 19l;
- MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
- p = 10l, c = 20l, s = 20l, ns = 21l;
- MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
- p = 20l, c = 20l, s = 20l, ns = 21l;
- MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
- p = 20l, c = 20l, s = 20l, ns = 19l;
- MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
- p = 10l, c = 25l, s = 20l, ns = 21l;
- MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, true);
- p = 20l, c = 25l, s = 20l, ns = 21l;
- MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, true);
- p = 21l, c = 25l, s = 20l, ns = 22l;
- MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
- p = 21l, c = 25l, s = 20l, ns = 19l;
- MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
- }
- // Trigger the condition where some old memtables are skipped when doing
- // TransactionUtil::CheckKey(), and make sure the result is still correct.
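- // Two attempts are exercised: kAttemptHistoryMemtable flushes, so the old
- // memtable is only kept around as history (via
- // max_write_buffer_size_to_maintain); kAttemptImmMemTable switches the
- // memtable while holding off the flush, so the old memtable stays immutable.
- // In both cases conflict checking must still consult the old memtable.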
- TEST_P(WritePreparedTransactionTest, CheckKeySkipOldMemtable) {
- const int kAttemptHistoryMemtable = 0;
- const int kAttemptImmMemTable = 1;
- for (int attempt = kAttemptHistoryMemtable; attempt <= kAttemptImmMemTable;
- attempt++) {
- options.max_write_buffer_size_to_maintain =
- 3 * static_cast<int>(options.write_buffer_size);
- ASSERT_OK(ReOpen());
- WriteOptions write_options;
- ReadOptions read_options;
- TransactionOptions txn_options;
- txn_options.set_snapshot = true;
- string value;
- ASSERT_OK(db->Put(write_options, Slice("foo"), Slice("bar")));
- ASSERT_OK(db->Put(write_options, Slice("foo2"), Slice("bar")));
- Transaction* txn = db->BeginTransaction(write_options, txn_options);
- ASSERT_TRUE(txn != nullptr);
- ASSERT_OK(txn->SetName("txn"));
- Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
- ASSERT_TRUE(txn2 != nullptr);
- ASSERT_OK(txn2->SetName("txn2"));
- // This transaction is created to cause potential conflict.
- Transaction* txn_x = db->BeginTransaction(write_options);
- ASSERT_OK(txn_x->SetName("txn_x"));
- ASSERT_OK(txn_x->Put(Slice("foo"), Slice("bar3")));
- ASSERT_OK(txn_x->Prepare());
- // Create snapshots after the prepare, but there should still
- // be a conflict when trying to read "foo".
- if (attempt == kAttemptImmMemTable) {
- // For the second attempt, hold flush from beginning. The memtable
- // will be switched to immutable after calling TEST_SwitchMemtable()
- // while CheckKey() is called.
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
- {{"WritePreparedTransactionTest.CheckKeySkipOldMemtable",
- "FlushJob::Start"}});
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
- }
- // Force a memtable flush or switch. The old memtable should still be kept
- FlushOptions flush_ops;
- if (attempt == kAttemptHistoryMemtable) {
- ASSERT_OK(db->Flush(flush_ops));
- } else {
- ASSERT_EQ(attempt, kAttemptImmMemTable);
- DBImpl* db_impl = static_cast<DBImpl*>(db->GetRootDB());
- ASSERT_OK(db_impl->TEST_SwitchMemtable());
- }
- uint64_t num_imm_mems;
- ASSERT_TRUE(db->GetIntProperty(DB::Properties::kNumImmutableMemTable,
- &num_imm_mems));
- if (attempt == kAttemptHistoryMemtable) {
- ASSERT_EQ(0, num_imm_mems);
- } else {
- ASSERT_EQ(attempt, kAttemptImmMemTable);
- ASSERT_EQ(1, num_imm_mems);
- }
- // Put something in active memtable
- ASSERT_OK(db->Put(write_options, Slice("foo3"), Slice("bar")));
- // Create txn3 after flushing, but this transaction also needs to
- // check all memtables because they contain uncommitted data.
- Transaction* txn3 = db->BeginTransaction(write_options, txn_options);
- ASSERT_TRUE(txn3 != nullptr);
- ASSERT_OK(txn3->SetName("txn3"));
- // Commit the pending write
- ASSERT_OK(txn_x->Commit());
- // Read for update from txn, txn2 and txn3. txn and txn3 will conflict but
- // txn2 will pass. In all cases, both memtables are queried.
- SetPerfLevel(PerfLevel::kEnableCount);
- get_perf_context()->Reset();
- ASSERT_TRUE(txn3->GetForUpdate(read_options, "foo", &value).IsBusy());
- // We should have checked two memtables, active and either immutable
- // or history memtable, depending on the test case.
- ASSERT_EQ(2, get_perf_context()->get_from_memtable_count);
- get_perf_context()->Reset();
- ASSERT_TRUE(txn->GetForUpdate(read_options, "foo", &value).IsBusy());
- // We should have checked two memtables, active and either immutable
- // or history memtable, depending on the test case.
- ASSERT_EQ(2, get_perf_context()->get_from_memtable_count);
- get_perf_context()->Reset();
- ASSERT_OK(txn2->GetForUpdate(read_options, "foo2", &value));
- ASSERT_EQ(value, "bar");
- // We should have checked two memtables, and since there is no
- // conflict, another Get() will be made to fetch the data from the
- // DB. If the data is in the immutable memtable, two extra memtable reads
- // will be issued. If it is not (it is in the history memtable), only one
- // will be made, which is to the active memtable.
- if (attempt == kAttemptHistoryMemtable) {
- ASSERT_EQ(3, get_perf_context()->get_from_memtable_count);
- } else {
- ASSERT_EQ(attempt, kAttemptImmMemTable);
- ASSERT_EQ(4, get_perf_context()->get_from_memtable_count);
- }
- Transaction* txn4 = db->BeginTransaction(write_options, txn_options);
- ASSERT_TRUE(txn4 != nullptr);
- ASSERT_OK(txn4->SetName("txn4"));
- get_perf_context()->Reset();
- ASSERT_OK(txn4->GetForUpdate(read_options, "foo", &value));
- if (attempt == kAttemptHistoryMemtable) {
- // Active memtable will be checked in snapshot validation and when
- // getting the value.
- ASSERT_EQ(2, get_perf_context()->get_from_memtable_count);
- } else {
- // Only the active memtable will be checked in snapshot validation but
- // both the active and immutable memtables will be queried when
- // getting the value.
- ASSERT_EQ(attempt, kAttemptImmMemTable);
- ASSERT_EQ(3, get_perf_context()->get_from_memtable_count);
- }
- ASSERT_OK(txn2->Commit());
- ASSERT_OK(txn4->Commit());
- TEST_SYNC_POINT("WritePreparedTransactionTest.CheckKeySkipOldMemtable");
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
- SetPerfLevel(PerfLevel::kDisable);
- delete txn;
- delete txn2;
- delete txn3;
- delete txn4;
- delete txn_x;
- }
- }
- // Reproduce the bug with two snapshots with the same sequence number and test
- // that the release of the first snapshot will not affect the reads by the
- // other snapshot
- TEST_P(WritePreparedTransactionTest, DoubleSnapshot) {
- TransactionOptions txn_options;
- Status s;
- // Insert initial value
- ASSERT_OK(db->Put(WriteOptions(), "key", "value1"));
- WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
- Transaction* txn =
- wp_db->BeginTransaction(WriteOptions(), txn_options, nullptr);
- ASSERT_OK(txn->SetName("txn"));
- ASSERT_OK(txn->Put("key", "value2"));
- ASSERT_OK(txn->Prepare());
- // Three snapshots with the same seq number
- const Snapshot* snapshot0 = wp_db->GetSnapshot();
- const Snapshot* snapshot1 = wp_db->GetSnapshot();
- const Snapshot* snapshot2 = wp_db->GetSnapshot();
- ASSERT_OK(txn->Commit());
- SequenceNumber cache_size = wp_db->COMMIT_CACHE_SIZE;
- SequenceNumber overlap_seq = txn->GetId() + cache_size;
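- // Since commit-cache slots are chosen as prep_seq % COMMIT_CACHE_SIZE (see
- // the CommitMap test above), overlap_seq lands in the same slot as the
- // committed txn, so AddCommitted below evicts that entry and advances the max
- // evicted seq past the snapshots.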
- delete txn;
- // 4th snapshot with a larger seq
- const Snapshot* snapshot3 = wp_db->GetSnapshot();
- // Cause an eviction to advance the max evicted seq number.
- // This also fetches the 4 snapshots from the db since their seqs are lower
- // than the new max.
- wp_db->AddCommitted(overlap_seq, overlap_seq);
- ReadOptions ropt;
- // It should see the value before commit
- ropt.snapshot = snapshot2;
- PinnableSlice pinnable_val;
- s = wp_db->Get(ropt, wp_db->DefaultColumnFamily(), "key", &pinnable_val);
- ASSERT_OK(s);
- ASSERT_TRUE(pinnable_val == "value1");
- pinnable_val.Reset();
- wp_db->ReleaseSnapshot(snapshot1);
- // It should still see the value before commit
- s = wp_db->Get(ropt, wp_db->DefaultColumnFamily(), "key", &pinnable_val);
- ASSERT_OK(s);
- ASSERT_TRUE(pinnable_val == "value1");
- pinnable_val.Reset();
- // Cause an eviction to advance max evicted seq number and trigger updating
- // the snapshot list
- overlap_seq += cache_size;
- wp_db->AddCommitted(overlap_seq, overlap_seq);
- // It should still see the value before commit
- s = wp_db->Get(ropt, wp_db->DefaultColumnFamily(), "key", &pinnable_val);
- ASSERT_OK(s);
- ASSERT_TRUE(pinnable_val == "value1");
- pinnable_val.Reset();
- wp_db->ReleaseSnapshot(snapshot0);
- wp_db->ReleaseSnapshot(snapshot2);
- wp_db->ReleaseSnapshot(snapshot3);
- }
- size_t UniqueCnt(std::vector<SequenceNumber> vec) {
- std::set<SequenceNumber> aset;
- for (auto i : vec) {
- aset.insert(i);
- }
- return aset.size();
- }
- // Test that the entries in old_commit_map_ get garbage collected properly
- TEST_P(WritePreparedTransactionTest, OldCommitMapGC) {
- const size_t snapshot_cache_bits = 0;
- const size_t commit_cache_bits = 0;
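- // With commit_cache_bits = 0 the commit cache presumably has a single slot,
- // so every AddCommitted call below evicts the previous entry and advances
- // max_evicted_seq_, which is what pushes evicted entries into old_commit_map_
- // when a live snapshot falls between their prepare and commit seqs.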
- DBImpl* mock_db = new DBImpl(options, dbname);
- UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
- std::unique_ptr<WritePreparedTxnDBMock> wp_db(
- new WritePreparedTxnDBMock(mock_db, txn_db_options));
- SequenceNumber seq = 0;
- // Take the first snapshot that overlaps with two txns
- auto prep_seq = ++seq;
- wp_db->AddPrepared(prep_seq);
- auto prep_seq2 = ++seq;
- wp_db->AddPrepared(prep_seq2);
- auto snap_seq1 = seq;
- wp_db->TakeSnapshot(snap_seq1);
- auto commit_seq = ++seq;
- wp_db->AddCommitted(prep_seq, commit_seq);
- wp_db->RemovePrepared(prep_seq);
- auto commit_seq2 = ++seq;
- wp_db->AddCommitted(prep_seq2, commit_seq2);
- wp_db->RemovePrepared(prep_seq2);
- // Take the 2nd and 3rd snapshots that overlap with the same txn
- prep_seq = ++seq;
- wp_db->AddPrepared(prep_seq);
- auto snap_seq2 = seq;
- wp_db->TakeSnapshot(snap_seq2);
- seq++;
- auto snap_seq3 = seq;
- wp_db->TakeSnapshot(snap_seq3);
- seq++;
- commit_seq = ++seq;
- wp_db->AddCommitted(prep_seq, commit_seq);
- wp_db->RemovePrepared(prep_seq);
- // Make sure max_evicted_seq_ will be larger than 2nd snapshot by evicting the
- // only item in the commit_cache_ via another commit.
- prep_seq = ++seq;
- wp_db->AddPrepared(prep_seq);
- commit_seq = ++seq;
- wp_db->AddCommitted(prep_seq, commit_seq);
- wp_db->RemovePrepared(prep_seq);
- // Verify that the evicted commit entries for all snapshots are in the
- // old_commit_map_
- {
- ASSERT_FALSE(wp_db->old_commit_map_empty_.load());
- ReadLock rl(&wp_db->old_commit_map_mutex_);
- ASSERT_EQ(3, wp_db->old_commit_map_.size());
- ASSERT_EQ(2, UniqueCnt(wp_db->old_commit_map_[snap_seq1]));
- ASSERT_EQ(1, UniqueCnt(wp_db->old_commit_map_[snap_seq2]));
- ASSERT_EQ(1, UniqueCnt(wp_db->old_commit_map_[snap_seq3]));
- }
- // Verify that the 2nd snapshot is cleaned up after the release
- wp_db->ReleaseSnapshotInternal(snap_seq2);
- {
- ASSERT_FALSE(wp_db->old_commit_map_empty_.load());
- ReadLock rl(&wp_db->old_commit_map_mutex_);
- ASSERT_EQ(2, wp_db->old_commit_map_.size());
- ASSERT_EQ(2, UniqueCnt(wp_db->old_commit_map_[snap_seq1]));
- ASSERT_EQ(1, UniqueCnt(wp_db->old_commit_map_[snap_seq3]));
- }
- // Verify that the 1st snapshot is cleaned up after the release
- wp_db->ReleaseSnapshotInternal(snap_seq1);
- {
- ASSERT_FALSE(wp_db->old_commit_map_empty_.load());
- ReadLock rl(&wp_db->old_commit_map_mutex_);
- ASSERT_EQ(1, wp_db->old_commit_map_.size());
- ASSERT_EQ(1, UniqueCnt(wp_db->old_commit_map_[snap_seq3]));
- }
- // Verify that the 3rd snapshot is cleaned up after the release
- wp_db->ReleaseSnapshotInternal(snap_seq3);
- {
- ASSERT_TRUE(wp_db->old_commit_map_empty_.load());
- ReadLock rl(&wp_db->old_commit_map_mutex_);
- ASSERT_EQ(0, wp_db->old_commit_map_.size());
- }
- }
- TEST_P(WritePreparedTransactionTest, CheckAgainstSnapshots) {
- std::vector<SequenceNumber> snapshots = {100l, 200l, 300l, 400l, 500l,
- 600l, 700l, 800l, 900l};
- const size_t snapshot_cache_bits = 2;
- const uint64_t cache_size = 1ul << snapshot_cache_bits;
- // Safety check to express the intended size in the test. Can be adjusted if
- // the snapshots list changes.
- ASSERT_EQ((1ul << snapshot_cache_bits) * 2 + 1, snapshots.size());
- DBImpl* mock_db = new DBImpl(options, dbname);
- UpdateTransactionDBOptions(snapshot_cache_bits);
- std::unique_ptr<WritePreparedTxnDBMock> wp_db(
- new WritePreparedTxnDBMock(mock_db, txn_db_options));
- SequenceNumber version = 1000l;
- ASSERT_EQ(0, wp_db->snapshots_total_);
- wp_db->UpdateSnapshots(snapshots, version);
- ASSERT_EQ(snapshots.size(), wp_db->snapshots_total_);
- // seq numbers are chosen so that we have two of them between every two
- // snapshots. If the diff of two consecutive seqs is more than 5, there is a
- // snapshot between them.
- std::vector<SequenceNumber> seqs = {50l, 55l, 150l, 155l, 250l, 255l, 350l,
- 355l, 450l, 455l, 550l, 555l, 650l, 655l,
- 750l, 755l, 850l, 855l, 950l, 955l};
- ASSERT_GT(seqs.size(), 1);
- for (size_t i = 0; i + 1 < seqs.size(); i++) {
- wp_db->old_commit_map_empty_ = true; // reset
- CommitEntry commit_entry = {seqs[i], seqs[i + 1]};
- wp_db->CheckAgainstSnapshots(commit_entry);
- // Expect update if there is a snapshot between the prepare and commit
- bool expect_update = commit_entry.commit_seq - commit_entry.prep_seq > 5 &&
- commit_entry.commit_seq >= snapshots.front() &&
- commit_entry.prep_seq <= snapshots.back();
- ASSERT_EQ(expect_update, !wp_db->old_commit_map_empty_);
- }
- // Test that search will include multiple snapshots from the snapshot cache
- {
- // exclude first and last item in the cache
- CommitEntry commit_entry = {snapshots.front() + 1,
- snapshots[cache_size - 1] - 1};
- wp_db->old_commit_map_empty_ = true; // reset
- wp_db->old_commit_map_.clear();
- wp_db->CheckAgainstSnapshots(commit_entry);
- ASSERT_EQ(wp_db->old_commit_map_.size(), cache_size - 2);
- }
- // Test that search will include multiple snapshots from the old snapshots list
- {
- // include two in the middle
- CommitEntry commit_entry = {snapshots[cache_size] + 1,
- snapshots[cache_size + 2] + 1};
- wp_db->old_commit_map_empty_ = true; // reset
- wp_db->old_commit_map_.clear();
- wp_db->CheckAgainstSnapshots(commit_entry);
- ASSERT_EQ(wp_db->old_commit_map_.size(), 2);
- }
- // Test that search will include both snapshot cache and old snapshots
- // Case 1: includes all in snapshot cache
- {
- CommitEntry commit_entry = {snapshots.front() - 1, snapshots.back() + 1};
- wp_db->old_commit_map_empty_ = true; // reset
- wp_db->old_commit_map_.clear();
- wp_db->CheckAgainstSnapshots(commit_entry);
- ASSERT_EQ(wp_db->old_commit_map_.size(), snapshots.size());
- }
- // Case 2: includes all of the snapshot cache except the smallest
- {
- CommitEntry commit_entry = {snapshots.front() + 1, snapshots.back() + 1};
- wp_db->old_commit_map_empty_ = true; // reset
- wp_db->old_commit_map_.clear();
- wp_db->CheckAgainstSnapshots(commit_entry);
- ASSERT_EQ(wp_db->old_commit_map_.size(), snapshots.size() - 1);
- }
- // Case 3: includes only the largest entry of the snapshot cache
- {
- CommitEntry commit_entry = {snapshots[cache_size - 1] - 1,
- snapshots.back() + 1};
- wp_db->old_commit_map_empty_ = true; // reset
- wp_db->old_commit_map_.clear();
- wp_db->CheckAgainstSnapshots(commit_entry);
- ASSERT_EQ(wp_db->old_commit_map_.size(), snapshots.size() - cache_size + 1);
- }
- }
- #if !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
- // Test that CheckAgainstSnapshots will not miss a live snapshot if it is run in
- // parallel with UpdateSnapshots.
- TEST_P(SnapshotConcurrentAccessTest, SnapshotConcurrentAccess) {
- // We have a sync point in the method under test after checking each snapshot.
- // If you increase the max number of snapshots in this test, more sync points
- // in the methods must also be added.
- const std::vector<SequenceNumber> snapshots = {10l, 20l, 30l, 40l, 50l,
- 60l, 70l, 80l, 90l, 100l};
- const size_t snapshot_cache_bits = 2;
- // Safety check to express the intended size in the test. Can be adjusted if
- // the snapshots list changes.
- ASSERT_EQ((1ul << snapshot_cache_bits) * 2 + 2, snapshots.size());
- SequenceNumber version = 1000l;
- // Choose the cache size so that the new snapshot list could replace all the
- // existing items in the cache and also have some overflow.
- DBImpl* mock_db = new DBImpl(options, dbname);
- UpdateTransactionDBOptions(snapshot_cache_bits);
- std::unique_ptr<WritePreparedTxnDBMock> wp_db(
- new WritePreparedTxnDBMock(mock_db, txn_db_options));
- const size_t extra = 2;
- size_t loop_id = 0;
- // Add up to extra items that do not fit into the cache
- for (size_t old_size = 1; old_size <= wp_db->SNAPSHOT_CACHE_SIZE + extra;
- old_size++) {
- const std::vector<SequenceNumber> old_snapshots(
- snapshots.begin(), snapshots.begin() + old_size);
- // Each member of the old snapshot list might or might not appear in the new
- // list. We create a common_snapshots list for each combination.
- size_t new_comb_cnt = size_t(1) << old_size;
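- // new_comb is a bitmask over old_snapshots: bit i set means old_snapshots[i]
- // also appears in the new snapshot list.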
- for (size_t new_comb = 0; new_comb < new_comb_cnt; new_comb++, loop_id++) {
- if (loop_id % split_cnt_ != split_id_) {
- continue;
- }
- printf("."); // To signal progress
- fflush(stdout);
- std::vector<SequenceNumber> common_snapshots;
- for (size_t i = 0; i < old_snapshots.size(); i++) {
- if (IsInCombination(i, new_comb)) {
- common_snapshots.push_back(old_snapshots[i]);
- }
- }
- // And add some new snapshots to the common list
- for (size_t added_snapshots = 0;
- added_snapshots <= snapshots.size() - old_snapshots.size();
- added_snapshots++) {
- std::vector<SequenceNumber> new_snapshots = common_snapshots;
- for (size_t i = 0; i < added_snapshots; i++) {
- new_snapshots.push_back(snapshots[old_snapshots.size() + i]);
- }
- for (auto it = common_snapshots.begin(); it != common_snapshots.end();
- ++it) {
- auto snapshot = *it;
- // Create a commit entry that is around the snapshot and thus should
- // not be discarded
- CommitEntry entry = {static_cast<uint64_t>(snapshot - 1),
- snapshot + 1};
- // The critical part is when iterating the snapshot cache. Afterwards,
- // we are operating under the lock
- size_t a_range =
- std::min(old_snapshots.size(), wp_db->SNAPSHOT_CACHE_SIZE) + 1;
- size_t b_range =
- std::min(new_snapshots.size(), wp_db->SNAPSHOT_CACHE_SIZE) + 1;
- // Break each thread at two points
- for (size_t a1 = 1; a1 <= a_range; a1++) {
- for (size_t a2 = a1 + 1; a2 <= a_range; a2++) {
- for (size_t b1 = 1; b1 <= b_range; b1++) {
- for (size_t b2 = b1 + 1; b2 <= b_range; b2++) {
- SnapshotConcurrentAccessTestInternal(
- wp_db.get(), old_snapshots, new_snapshots, entry, version,
- a1, a2, b1, b2);
- }
- }
- }
- }
- }
- }
- }
- }
- printf("\n");
- }
- #endif // !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
- // This test clarifies the contract of AdvanceMaxEvictedSeq method
- TEST_P(WritePreparedTransactionTest, AdvanceMaxEvictedSeqBasic) {
- DBImpl* mock_db = new DBImpl(options, dbname);
- std::unique_ptr<WritePreparedTxnDBMock> wp_db(
- new WritePreparedTxnDBMock(mock_db, txn_db_options));
- // 1. Set the initial values for max, prepared, and snapshots
- SequenceNumber zero_max = 0l;
- // Set the initial list of prepared txns
- const std::vector<SequenceNumber> initial_prepared = {10, 30, 50, 100,
- 150, 200, 250};
- for (auto p : initial_prepared) {
- wp_db->AddPrepared(p);
- }
- // This updates the max value and also moves prepared txns at or below the new
- // max into delayed_prepared_
- SequenceNumber init_max = 100;
- wp_db->AdvanceMaxEvictedSeq(zero_max, init_max);
- const std::vector<SequenceNumber> initial_snapshots = {20, 40};
- wp_db->SetDBSnapshots(initial_snapshots);
- // This will update the internal cache of snapshots from the DB
- wp_db->UpdateSnapshots(initial_snapshots, init_max);
- // 2. Invoke AdvanceMaxEvictedSeq
- const std::vector<SequenceNumber> latest_snapshots = {20, 110, 220, 300};
- wp_db->SetDBSnapshots(latest_snapshots);
- SequenceNumber new_max = 200;
- wp_db->AdvanceMaxEvictedSeq(init_max, new_max);
- // 3. Verify that the state matches with AdvanceMaxEvictedSeq contract
- // a. max should be updated to new_max
- ASSERT_EQ(wp_db->max_evicted_seq_, new_max);
- // b. delayed prepared should contain every txn <= max and prepared should
- // only contain txns > max
- auto it = initial_prepared.begin();
- for (; it != initial_prepared.end() && *it <= new_max; ++it) {
- ASSERT_EQ(1, wp_db->delayed_prepared_.erase(*it));
- }
- ASSERT_TRUE(wp_db->delayed_prepared_.empty());
- for (; it != initial_prepared.end() && !wp_db->prepared_txns_.empty();
- ++it, wp_db->prepared_txns_.pop()) {
- ASSERT_EQ(*it, wp_db->prepared_txns_.top());
- }
- ASSERT_TRUE(it == initial_prepared.end());
- ASSERT_TRUE(wp_db->prepared_txns_.empty());
- // c. snapshots should contain everything below new_max
- auto sit = latest_snapshots.begin();
- for (size_t i = 0; sit != latest_snapshots.end() && *sit <= new_max &&
- i < wp_db->snapshots_total_;
- sit++, i++) {
- ASSERT_TRUE(i < wp_db->snapshots_total_);
- // This test is small scale and the list of snapshots is assumed to be
- // within the cache size limit. This is just a safety check to double-check
- // that assumption.
- ASSERT_TRUE(i < wp_db->SNAPSHOT_CACHE_SIZE);
- ASSERT_EQ(*sit, wp_db->snapshot_cache_[i]);
- }
- }
- // A new snapshot should always be larger than max_evicted_seq_
- // Otherwise the snapshot does not go through AdvanceMaxEvictedSeq
- TEST_P(WritePreparedTransactionTest, NewSnapshotLargerThanMax) {
- WriteOptions woptions;
- TransactionOptions txn_options;
- WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
- Transaction* txn0 = db->BeginTransaction(woptions, txn_options);
- ASSERT_OK(txn0->Put(Slice("key"), Slice("value")));
- ASSERT_OK(txn0->Commit());
- const SequenceNumber seq = txn0->GetId(); // is also prepare seq
- delete txn0;
- std::vector<Transaction*> txns;
- // Inc seq without committing anything
- for (int i = 0; i < 10; i++) {
- Transaction* txn = db->BeginTransaction(woptions, txn_options);
- ASSERT_OK(txn->SetName("xid" + std::to_string(i)));
- ASSERT_OK(txn->Put(Slice("key" + std::to_string(i)), Slice("value")));
- ASSERT_OK(txn->Prepare());
- txns.push_back(txn);
- }
- // The new commit is seq + 10
- ASSERT_OK(db->Put(woptions, "key", "value"));
- auto snap = wp_db->GetSnapshot();
- const SequenceNumber last_seq = snap->GetSequenceNumber();
- wp_db->ReleaseSnapshot(snap);
- ASSERT_LT(seq, last_seq);
- // Otherwise our test is not effective
- ASSERT_LT(last_seq - seq, wp_db->INC_STEP_FOR_MAX_EVICTED);
- // Evict seq out of commit cache
- const SequenceNumber overwrite_seq = seq + wp_db->COMMIT_CACHE_SIZE;
- // Check that the next write could make max go beyond last
- auto last_max = wp_db->max_evicted_seq_.load();
- wp_db->AddCommitted(overwrite_seq, overwrite_seq);
- // Check that eviction has advanced the max
- ASSERT_LT(last_max, wp_db->max_evicted_seq_.load());
- // Check that the new max is still below the last seq
- ASSERT_LT(wp_db->max_evicted_seq_.load(), last_seq);
- for (auto txn : txns) {
- ASSERT_OK(txn->Rollback());
- delete txn;
- }
- }
- // A new snapshot should always be larger than max_evicted_seq_
- // In very rare cases max could be below last published seq. Test that
- // taking a snapshot will wait for max to catch up.
- TEST_P(WritePreparedTransactionTest, MaxCatchupWithNewSnapshot) {
- const size_t snapshot_cache_bits = 7; // same as default
- const size_t commit_cache_bits = 0; // only 1 entry => frequent eviction
- UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
- ASSERT_OK(ReOpen());
- WriteOptions woptions;
- WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
- const int writes = 50;
- const int batch_cnt = 4;
- ROCKSDB_NAMESPACE::port::Thread t1([&]() {
- for (int i = 0; i < writes; i++) {
- WriteBatch batch;
- // The duplicate keys cause 4 commit entries, each evicting an entry that
- // is not published yet, thus causing max evicted seq to go higher than the
- // last published seq.
- for (int b = 0; b < batch_cnt; b++) {
- ASSERT_OK(batch.Put("foo", "foo"));
- }
- ASSERT_OK(db->Write(woptions, &batch));
- }
- });
- ROCKSDB_NAMESPACE::port::Thread t2([&]() {
- while (wp_db->max_evicted_seq_ == 0) { // wait for insert thread
- std::this_thread::yield();
- }
- for (int i = 0; i < 10; i++) {
- SequenceNumber max_lower_bound = wp_db->max_evicted_seq_;
- auto snap = db->GetSnapshot();
- if (snap->GetSequenceNumber() != 0) {
- // The value of max_evicted_seq_ when the snapshot was taken is unknown. We
- // thus compare with the lower bound instead as an approximation.
- ASSERT_LT(max_lower_bound, snap->GetSequenceNumber());
- } // seq 0 is ok to be less than max since nothing is visible to it
- db->ReleaseSnapshot(snap);
- }
- });
- t1.join();
- t2.join();
- // Make sure that the test has worked and seq number has advanced as we
- // thought
- auto snap = db->GetSnapshot();
- ASSERT_GT(snap->GetSequenceNumber(), batch_cnt * writes - 1);
- db->ReleaseSnapshot(snap);
- }
- // Test that reads without snapshots would not hit an undefined state
- TEST_P(WritePreparedTransactionTest, MaxCatchupWithUnbackedSnapshot) {
- const size_t snapshot_cache_bits = 7; // same as default
- const size_t commit_cache_bits = 0; // only 1 entry => frequent eviction
- UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
- ASSERT_OK(ReOpen());
- WriteOptions woptions;
- WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
- const int writes = 50;
- ROCKSDB_NAMESPACE::port::Thread t1([&]() {
- for (int i = 0; i < writes; i++) {
- WriteBatch batch;
- ASSERT_OK(batch.Put("key", "foo"));
- ASSERT_OK(db->Write(woptions, &batch));
- }
- });
- ROCKSDB_NAMESPACE::port::Thread t2([&]() {
- while (wp_db->max_evicted_seq_ == 0) { // wait for insert thread
- std::this_thread::yield();
- }
- ReadOptions ropt;
- PinnableSlice pinnable_val;
- TransactionOptions txn_options;
- for (int i = 0; i < 10; i++) {
- auto s = db->Get(ropt, db->DefaultColumnFamily(), "key", &pinnable_val);
- ASSERT_TRUE(s.ok() || s.IsTryAgain());
- pinnable_val.Reset();
- Transaction* txn = db->BeginTransaction(woptions, txn_options);
- s = txn->Get(ropt, db->DefaultColumnFamily(), "key", &pinnable_val);
- ASSERT_TRUE(s.ok() || s.IsTryAgain());
- pinnable_val.Reset();
- std::vector<std::string> values;
- auto s_vec =
- txn->MultiGet(ropt, {db->DefaultColumnFamily()}, {"key"}, &values);
- ASSERT_EQ(1, values.size());
- ASSERT_EQ(1, s_vec.size());
- s = s_vec[0];
- ASSERT_TRUE(s.ok() || s.IsTryAgain());
- Slice key("key");
- txn->MultiGet(ropt, db->DefaultColumnFamily(), 1, &key, &pinnable_val, &s,
- true);
- ASSERT_TRUE(s.ok() || s.IsTryAgain());
- delete txn;
- }
- });
- t1.join();
- t2.join();
- // Make sure that the test has worked and seq number has advanced as we
- // thought
- auto snap = db->GetSnapshot();
- ASSERT_GT(snap->GetSequenceNumber(), writes - 1);
- db->ReleaseSnapshot(snap);
- }
- // Check that old_commit_map_ cleanup works correctly if the snapshot equals
- // max_evicted_seq_.
- TEST_P(WritePreparedTransactionTest, CleanupSnapshotEqualToMax) {
- const size_t snapshot_cache_bits = 7; // same as default
- const size_t commit_cache_bits = 0; // only 1 entry => frequent eviction
- UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
- ASSERT_OK(ReOpen());
- WriteOptions woptions;
- WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
- // Insert something to increase seq
- ASSERT_OK(db->Put(woptions, "key", "value"));
- auto snap = db->GetSnapshot();
- auto snap_seq = snap->GetSequenceNumber();
- // Another insert should trigger eviction + load snapshot from db
- ASSERT_OK(db->Put(woptions, "key", "value"));
- // This is the scenario that we check against
- ASSERT_EQ(snap_seq, wp_db->max_evicted_seq_);
- // old_commit_map_ now has some data that needs gc
- ASSERT_EQ(1, wp_db->snapshots_total_);
- ASSERT_EQ(1, wp_db->old_commit_map_.size());
- db->ReleaseSnapshot(snap);
- // Another insert should trigger eviction + load snapshot from db
- ASSERT_OK(db->Put(woptions, "key", "value"));
- // the snapshot and related metadata must be properly garbage collected
- ASSERT_EQ(0, wp_db->snapshots_total_);
- ASSERT_TRUE(wp_db->snapshots_all_.empty());
- ASSERT_EQ(0, wp_db->old_commit_map_.size());
- }
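- // Test that AdvanceSeqByOne advances the sequence number observed by new
- // snapshots.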
- TEST_P(WritePreparedTransactionTest, AdvanceSeqByOne) {
- auto snap = db->GetSnapshot();
- auto seq1 = snap->GetSequenceNumber();
- db->ReleaseSnapshot(snap);
- WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
- wp_db->AdvanceSeqByOne();
- snap = db->GetSnapshot();
- auto seq2 = snap->GetSequenceNumber();
- db->ReleaseSnapshot(snap);
- ASSERT_LT(seq1, seq2);
- }
- // Test that the txn Initialize calls the overridden functions
- TEST_P(WritePreparedTransactionTest, TxnInitialize) {
- TransactionOptions txn_options;
- WriteOptions write_options;
- ASSERT_OK(db->Put(write_options, "key", "value"));
- Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
- ASSERT_OK(txn0->SetName("xid"));
- ASSERT_OK(txn0->Put(Slice("key"), Slice("value1")));
- ASSERT_OK(txn0->Prepare());
- // SetSnapshot is overridden to update min_uncommitted_
- txn_options.set_snapshot = true;
- Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
- auto snap = txn1->GetSnapshot();
- auto snap_impl = static_cast<const SnapshotImpl*>(snap);
- // If ::Initialize calls the overridden SetSnapshot, min_uncommitted_ must be
- // updated
- ASSERT_GT(snap_impl->min_uncommitted_, kMinUnCommittedSeq);
- ASSERT_OK(txn0->Rollback());
- ASSERT_OK(txn1->Rollback());
- delete txn0;
- delete txn1;
- }
- // This tests that transactions with duplicate keys perform correctly after max
- // advances past their prepared sequence numbers. This will not be the case if,
- // for example, the txn does not add the prepared seq for the second sub-batch
- // to the PreparedHeap structure.
- TEST_P(WritePreparedTransactionTest, AdvanceMaxEvictedSeqWithDuplicates) {
- const size_t snapshot_cache_bits = 7; // same as default
- const size_t commit_cache_bits = 1; // disable commit cache
- UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
- ASSERT_OK(ReOpen());
- ReadOptions ropt;
- PinnableSlice pinnable_val;
- WriteOptions write_options;
- TransactionOptions txn_options;
- Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
- ASSERT_OK(txn0->SetName("xid"));
- ASSERT_OK(txn0->Put(Slice("key"), Slice("value1")));
- ASSERT_OK(txn0->Put(Slice("key"), Slice("value2")));
- ASSERT_OK(txn0->Prepare());
- ASSERT_OK(db->Put(write_options, "key2", "value"));
- // Will cause max advance due to disabled commit cache
- ASSERT_OK(db->Put(write_options, "key3", "value"));
- auto s = db->Get(ropt, db->DefaultColumnFamily(), "key", &pinnable_val);
- ASSERT_TRUE(s.IsNotFound());
- delete txn0;
- WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
- ASSERT_OK(wp_db->db_impl_->FlushWAL(true));
- wp_db->TEST_Crash();
- ASSERT_OK(ReOpenNoDelete());
- ASSERT_NE(db, nullptr);
- s = db->Get(ropt, db->DefaultColumnFamily(), "key", &pinnable_val);
- ASSERT_TRUE(s.IsNotFound());
- txn0 = db->GetTransactionByName("xid");
- ASSERT_OK(txn0->Rollback());
- delete txn0;
- }
- #if !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
- // Stress SmallestUnCommittedSeq, which reads from both prepared_txns_ and
- // delayed_prepared_, when it is run concurrently with advancing max_evicted_seq,
- // which moves prepared txns from prepared_txns_ to delayed_prepared_.
- TEST_P(WritePreparedTransactionTest, SmallestUnCommittedSeq) {
- const size_t snapshot_cache_bits = 7; // same as default
- const size_t commit_cache_bits = 1; // disable commit cache
- UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
- ASSERT_OK(ReOpen());
- WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
- ReadOptions ropt;
- PinnableSlice pinnable_val;
- WriteOptions write_options;
- TransactionOptions txn_options;
- std::vector<Transaction*> txns, committed_txns;
- const int cnt = 100;
- for (int i = 0; i < cnt; i++) {
- Transaction* txn = db->BeginTransaction(write_options, txn_options);
- ASSERT_OK(txn->SetName("xid" + std::to_string(i)));
- auto key = "key1" + std::to_string(i);
- auto value = "value1" + std::to_string(i);
- ASSERT_OK(txn->Put(Slice(key), Slice(value)));
- ASSERT_OK(txn->Prepare());
- txns.push_back(txn);
- }
- port::Mutex mutex;
- Random rnd(1103);
- ROCKSDB_NAMESPACE::port::Thread commit_thread([&]() {
- for (int i = 0; i < cnt; i++) {
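- // Pick a random remaining prepared txn and commit it.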
- uint32_t index = rnd.Uniform(cnt - i);
- Transaction* txn;
- {
- MutexLock l(&mutex);
- txn = txns[index];
- txns.erase(txns.begin() + index);
- }
- // Since commit cache is practically disabled, commit results in immediate
- // advance in max_evicted_seq_ and subsequently moving some prepared txns
- // to delayed_prepared_.
- ASSERT_OK(txn->Commit());
- committed_txns.push_back(txn);
- }
- });
- ROCKSDB_NAMESPACE::port::Thread read_thread([&]() {
- while (true) {
- MutexLock l(&mutex);
- if (txns.empty()) {
- break;
- }
- auto min_uncommitted = wp_db->SmallestUnCommittedSeq();
- ASSERT_LE(min_uncommitted, (*txns.begin())->GetId());
- }
- });
- commit_thread.join();
- read_thread.join();
- for (auto txn : committed_txns) {
- delete txn;
- }
- }
- #endif // !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
- TEST_P(SeqAdvanceConcurrentTest, SeqAdvanceConcurrent) {
- // Given the sequential run of txns, with this timeout we should never see a
- // deadlock nor a timeout unless we have a key conflict, which should be
- // almost infeasible.
- txn_db_options.transaction_lock_timeout = 1000;
- txn_db_options.default_lock_timeout = 1000;
- ASSERT_OK(ReOpen());
- FlushOptions fopt;
- // Number of different txn types we use in this test
- const size_t type_cnt = 5;
- // The size of the first write group
- // TODO(myabandeh): This should be increased for pre-release tests
- const size_t first_group_size = 2;
- // Total number of txns we run in each test
- // TODO(myabandeh): This should be increased for pre-release tests
- const size_t txn_cnt = first_group_size + 1;
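- // base[i] = type_cnt^i; used below to decode the case number n, digit by
- // digit, into the txn type assigned to each of the txn_cnt threads.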
- size_t base[txn_cnt + 1] = {
- 1,
- };
- for (size_t bi = 1; bi <= txn_cnt; bi++) {
- base[bi] = base[bi - 1] * type_cnt;
- }
- const size_t max_n = static_cast<size_t>(std::pow(type_cnt, txn_cnt));
- printf("Number of cases being tested is %" ROCKSDB_PRIszt "\n", max_n);
- for (size_t n = 0; n < max_n; n++) {
- if (n > 0) {
- ASSERT_OK(ReOpen());
- }
- if (n % split_cnt_ != split_id_) {
- continue;
- }
- if (n % 1000 == 0) {
- printf("Tested %" ROCKSDB_PRIszt " cases so far\n", n);
- }
- DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
- auto seq = db_impl->TEST_GetLastVisibleSequence();
- with_empty_commits = 0;
- exp_seq = seq;
- // This is increased before writing the batch for commit
- commit_writes = 0;
- // This is increased before txn starts linking if it expects to do a commit
- // eventually
- expected_commits = 0;
- std::vector<port::Thread> threads;
- linked.store(0, std::memory_order_release);
- std::atomic<bool> batch_formed(false);
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
- "WriteThread::EnterAsBatchGroupLeader:End",
- [&](void* /*arg*/) { batch_formed = true; });
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
- "WriteThread::JoinBatchGroup:Wait", [&](void* /*arg*/) {
- size_t orig_linked = linked.fetch_add(1, std::memory_order_acq_rel);
- if (orig_linked == 0) {
- // Wait until the others are linked too.
- while (linked.load(std::memory_order_acquire) < first_group_size) {
- }
- } else if (orig_linked == first_group_size) {
- // Make the 2nd batch of the rest of writes plus any followup
- // commits from the first batch
- while (linked.load(std::memory_order_acquire) <
- txn_cnt + commit_writes) {
- }
- }
- // Then we will have one or more batches consisting of follow-up
- // commits from the 2nd batch. There is a bit of non-determinism here
- // but it should be tolerable.
- });
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
- for (size_t bi = 0; bi < txn_cnt; bi++) {
- // get the bi-th digit in number system based on type_cnt
- size_t d = (n % base[bi + 1]) / base[bi];
- switch (d) {
- case 0:
- threads.emplace_back(&TransactionTestBase::TestTxn0, this, bi);
- break;
- case 1:
- threads.emplace_back(&TransactionTestBase::TestTxn1, this, bi);
- break;
- case 2:
- threads.emplace_back(&TransactionTestBase::TestTxn2, this, bi);
- break;
- case 3:
- threads.emplace_back(&TransactionTestBase::TestTxn3, this, bi);
- break;
- case 4:
- threads.emplace_back(&TransactionTestBase::TestTxn3, this, bi);
- break;
- default:
- FAIL();
- }
- // wait to be linked
- while (linked.load(std::memory_order_acquire) <= bi) {
- }
- // after a queue of size first_group_size
- if (bi + 1 == first_group_size) {
- while (!batch_formed) {
- }
- // to make it more deterministic, wait until the commits are linked
- while (linked.load(std::memory_order_acquire) <=
- bi + expected_commits) {
- }
- }
- }
- for (auto& t : threads) {
- t.join();
- }
- if (options.two_write_queues) {
- // In this case none of the above scheduling tricks to deterministically
- // form merged batches work because the writes go to separate queues.
- // This would result in different write groups in each run of the test. We
- // still keep the test since, although non-deterministic and hard to debug,
- // it is still useful to have.
- // TODO(myabandeh): Add a deterministic unit test for two_write_queues
- }
- // Check if memtable inserts advanced seq number as expected
- seq = db_impl->TEST_GetLastVisibleSequence();
- ASSERT_EQ(exp_seq, seq);
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
- // Check if recovery preserves the last sequence number
- ASSERT_OK(db_impl->FlushWAL(true));
- ASSERT_OK(ReOpenNoDelete());
- ASSERT_NE(db, nullptr);
- db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
- seq = db_impl->TEST_GetLastVisibleSequence();
- ASSERT_LE(exp_seq, seq + with_empty_commits);
- // Check if flush preserves the last sequence number
- ASSERT_OK(db_impl->Flush(fopt));
- seq = db_impl->GetLatestSequenceNumber();
- ASSERT_LE(exp_seq, seq + with_empty_commits);
- // Check if recovery after flush preserves the last sequence number
- ASSERT_OK(db_impl->FlushWAL(true));
- ASSERT_OK(ReOpenNoDelete());
- ASSERT_NE(db, nullptr);
- db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
- seq = db_impl->GetLatestSequenceNumber();
- ASSERT_LE(exp_seq, seq + with_empty_commits);
- }
- }
- // Run a couple of different txns, among them some uncommitted. Restart the db
- // at a couple of points to check whether the list of uncommitted txns is
- // recovered properly.
- TEST_P(WritePreparedTransactionTest, BasicRecovery) {
- options.disable_auto_compactions = true;
- ASSERT_OK(ReOpen());
- WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
- TestTxn0(0);
- TransactionOptions txn_options;
- WriteOptions write_options;
- size_t index = 1000;
- Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
- auto istr0 = std::to_string(index);
- auto s = txn0->SetName("xid" + istr0);
- ASSERT_OK(s);
- s = txn0->Put(Slice("foo0" + istr0), Slice("bar0" + istr0));
- ASSERT_OK(s);
- s = txn0->Prepare();
- ASSERT_OK(s);
- auto prep_seq_0 = txn0->GetId();
- TestTxn1(0);
- index++;
- Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
- auto istr1 = std::to_string(index);
- s = txn1->SetName("xid" + istr1);
- ASSERT_OK(s);
- s = txn1->Put(Slice("foo1" + istr1), Slice("bar"));
- ASSERT_OK(s);
- s = txn1->Prepare();
- ASSERT_OK(s);
- auto prep_seq_1 = txn1->GetId();
- TestTxn2(0);
- ReadOptions ropt;
- PinnableSlice pinnable_val;
- // Check the value is not committed before restart
- s = db->Get(ropt, db->DefaultColumnFamily(), "foo0" + istr0, &pinnable_val);
- ASSERT_TRUE(s.IsNotFound());
- pinnable_val.Reset();
- delete txn0;
- delete txn1;
- ASSERT_OK(wp_db->db_impl_->FlushWAL(true));
- wp_db->TEST_Crash();
- ASSERT_OK(ReOpenNoDelete());
- ASSERT_NE(db, nullptr);
- wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
- // After recovery, all the uncommitted txns (0 and 1) should be inserted into
- // delayed_prepared_
- ASSERT_TRUE(wp_db->prepared_txns_.empty());
- ASSERT_FALSE(wp_db->delayed_prepared_empty_);
- ASSERT_LE(prep_seq_0, wp_db->max_evicted_seq_);
- ASSERT_LE(prep_seq_1, wp_db->max_evicted_seq_);
- {
- ReadLock rl(&wp_db->prepared_mutex_);
- ASSERT_EQ(2, wp_db->delayed_prepared_.size());
- ASSERT_TRUE(wp_db->delayed_prepared_.find(prep_seq_0) !=
- wp_db->delayed_prepared_.end());
- ASSERT_TRUE(wp_db->delayed_prepared_.find(prep_seq_1) !=
- wp_db->delayed_prepared_.end());
- }
- // Check the value is still not committed after restart
- s = db->Get(ropt, db->DefaultColumnFamily(), "foo0" + istr0, &pinnable_val);
- ASSERT_TRUE(s.IsNotFound());
- pinnable_val.Reset();
- TestTxn3(0);
- // Test that a recovered txn will be properly marked committed for the next
- // recovery
- txn1 = db->GetTransactionByName("xid" + istr1);
- ASSERT_NE(txn1, nullptr);
- ASSERT_OK(txn1->Commit());
- delete txn1;
- index++;
- Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
- auto istr2 = std::to_string(index);
- s = txn2->SetName("xid" + istr2);
- ASSERT_OK(s);
- s = txn2->Put(Slice("foo2" + istr2), Slice("bar"));
- ASSERT_OK(s);
- s = txn2->Prepare();
- ASSERT_OK(s);
- auto prep_seq_2 = txn2->GetId();
- delete txn2;
- ASSERT_OK(wp_db->db_impl_->FlushWAL(true));
- wp_db->TEST_Crash();
- ASSERT_OK(ReOpenNoDelete());
- ASSERT_NE(db, nullptr);
- wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
- ASSERT_TRUE(wp_db->prepared_txns_.empty());
- ASSERT_FALSE(wp_db->delayed_prepared_empty_);
- // 0 and 2 are prepared and 1 is committed
- {
- ReadLock rl(&wp_db->prepared_mutex_);
- ASSERT_EQ(2, wp_db->delayed_prepared_.size());
- const auto& end = wp_db->delayed_prepared_.end();
- ASSERT_NE(wp_db->delayed_prepared_.find(prep_seq_0), end);
- ASSERT_EQ(wp_db->delayed_prepared_.find(prep_seq_1), end);
- ASSERT_NE(wp_db->delayed_prepared_.find(prep_seq_2), end);
- }
- ASSERT_LE(prep_seq_0, wp_db->max_evicted_seq_);
- ASSERT_LE(prep_seq_2, wp_db->max_evicted_seq_);
- // Commit all the remaining txns
- txn0 = db->GetTransactionByName("xid" + istr0);
- ASSERT_NE(txn0, nullptr);
- ASSERT_OK(txn0->Commit());
- txn2 = db->GetTransactionByName("xid" + istr2);
- ASSERT_NE(txn2, nullptr);
- ASSERT_OK(txn2->Commit());
- // Check the value is committed after commit
- s = db->Get(ropt, db->DefaultColumnFamily(), "foo0" + istr0, &pinnable_val);
- ASSERT_TRUE(s.ok());
- ASSERT_TRUE(pinnable_val == ("bar0" + istr0));
- pinnable_val.Reset();
- delete txn0;
- delete txn2;
- ASSERT_OK(wp_db->db_impl_->FlushWAL(true));
- ASSERT_OK(ReOpenNoDelete());
- ASSERT_NE(db, nullptr);
- wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
- ASSERT_TRUE(wp_db->prepared_txns_.empty());
- ASSERT_TRUE(wp_db->delayed_prepared_empty_);
- // Check the value is still committed after recovery
- s = db->Get(ropt, db->DefaultColumnFamily(), "foo0" + istr0, &pinnable_val);
- ASSERT_TRUE(s.ok());
- ASSERT_TRUE(pinnable_val == ("bar0" + istr0));
- pinnable_val.Reset();
- }
- // After recovery the commit map is empty while the max is set. The code would
- // go through a different path, which requires a separate test. Test that the
- // data committed before the restart is visible to all snapshots.
- TEST_P(WritePreparedTransactionTest, IsInSnapshotEmptyMap) {
- for (bool end_with_prepare : {false, true}) {
- ASSERT_OK(ReOpen());
- WriteOptions woptions;
- ASSERT_OK(db->Put(woptions, "key", "value"));
- ASSERT_OK(db->Put(woptions, "key", "value"));
- ASSERT_OK(db->Put(woptions, "key", "value"));
- SequenceNumber prepare_seq = kMaxSequenceNumber;
- if (end_with_prepare) {
- TransactionOptions txn_options;
- Transaction* txn = db->BeginTransaction(woptions, txn_options);
- ASSERT_OK(txn->SetName("xid0"));
- ASSERT_OK(txn->Prepare());
- prepare_seq = txn->GetId();
- delete txn;
- }
- dynamic_cast<WritePreparedTxnDB*>(db)->TEST_Crash();
- auto db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
- ASSERT_OK(db_impl->FlushWAL(true));
- ASSERT_OK(ReOpenNoDelete());
- WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
- ASSERT_NE(wp_db, nullptr);
- ASSERT_GT(wp_db->max_evicted_seq_, 0); // max after recovery
- // Take a snapshot right after recovery
- const Snapshot* snap = db->GetSnapshot();
- auto snap_seq = snap->GetSequenceNumber();
- ASSERT_GT(snap_seq, 0);
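- // With an empty commit map after recovery, every seq up to max (except the
- // still-uncommitted prepare seq) must be visible to this snapshot.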
- for (SequenceNumber seq = 0;
- seq <= wp_db->max_evicted_seq_ && seq != prepare_seq; seq++) {
- ASSERT_TRUE(wp_db->IsInSnapshot(seq, snap_seq));
- }
- if (end_with_prepare) {
- ASSERT_FALSE(wp_db->IsInSnapshot(prepare_seq, snap_seq));
- }
- // trivial check
- ASSERT_FALSE(wp_db->IsInSnapshot(snap_seq + 1, snap_seq));
- db->ReleaseSnapshot(snap);
- ASSERT_OK(db->Put(woptions, "key", "value"));
- // Take a snapshot after some writes
- snap = db->GetSnapshot();
- snap_seq = snap->GetSequenceNumber();
- for (SequenceNumber seq = 0;
- seq <= wp_db->max_evicted_seq_ && seq != prepare_seq; seq++) {
- ASSERT_TRUE(wp_db->IsInSnapshot(seq, snap_seq));
- }
- if (end_with_prepare) {
- ASSERT_FALSE(wp_db->IsInSnapshot(prepare_seq, snap_seq));
- }
- // trivial check
- ASSERT_FALSE(wp_db->IsInSnapshot(snap_seq + 1, snap_seq));
- db->ReleaseSnapshot(snap);
- }
- }
- // Shows the contract of IsInSnapshot when called on invalid/released snapshots
- TEST_P(WritePreparedTransactionTest, IsInSnapshotReleased) {
- WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
- WriteOptions woptions;
- ASSERT_OK(db->Put(woptions, "key", "value"));
- // snap seq = 1
- const Snapshot* snap1 = db->GetSnapshot();
- ASSERT_OK(db->Put(woptions, "key", "value"));
- ASSERT_OK(db->Put(woptions, "key", "value"));
- // snap seq = 3
- const Snapshot* snap2 = db->GetSnapshot();
- const SequenceNumber seq = 1;
- // Evict seq out of commit cache
- size_t overwrite_seq = wp_db->COMMIT_CACHE_SIZE + seq;
- wp_db->AddCommitted(overwrite_seq, overwrite_seq);
- SequenceNumber snap_seq;
- uint64_t min_uncommitted = kMinUnCommittedSeq;
- bool released;
- released = false;
- snap_seq = snap1->GetSequenceNumber();
- ASSERT_LE(seq, snap_seq);
- // Valid snapshot lower than max
- ASSERT_LE(snap_seq, wp_db->max_evicted_seq_);
- ASSERT_TRUE(wp_db->IsInSnapshot(seq, snap_seq, min_uncommitted, &released));
- ASSERT_FALSE(released);
- released = false;
- snap_seq = snap1->GetSequenceNumber();
- // Invalid snapshot lower than max
- ASSERT_LE(snap_seq + 1, wp_db->max_evicted_seq_);
- ASSERT_TRUE(
- wp_db->IsInSnapshot(seq, snap_seq + 1, min_uncommitted, &released));
- ASSERT_TRUE(released);
- db->ReleaseSnapshot(snap1);
- released = false;
- // Released snapshot lower than max
- ASSERT_TRUE(wp_db->IsInSnapshot(seq, snap_seq, min_uncommitted, &released));
- // The release does not take effect until the next max advance
- ASSERT_FALSE(released);
- released = false;
- // Invalid snapshot lower than max
- ASSERT_TRUE(
- wp_db->IsInSnapshot(seq, snap_seq + 1, min_uncommitted, &released));
- ASSERT_TRUE(released);
- // This causes the snapshot release to be reflected in the txn db structures
- wp_db->AdvanceMaxEvictedSeq(wp_db->max_evicted_seq_,
- wp_db->max_evicted_seq_ + 1);
- released = false;
- // Released snapshot lower than max
- ASSERT_TRUE(wp_db->IsInSnapshot(seq, snap_seq, min_uncommitted, &released));
- ASSERT_TRUE(released);
- released = false;
- // Invalid snapshot lower than max
- ASSERT_TRUE(
- wp_db->IsInSnapshot(seq, snap_seq + 1, min_uncommitted, &released));
- ASSERT_TRUE(released);
- snap_seq = snap2->GetSequenceNumber();
- released = false;
- // Unreleased snapshot lower than max
- ASSERT_TRUE(wp_db->IsInSnapshot(seq, snap_seq, min_uncommitted, &released));
- ASSERT_FALSE(released);
- db->ReleaseSnapshot(snap2);
- }
- // Test WritePreparedTxnDB's IsInSnapshot against different ordering of
- // snapshot, max_committed_seq_, prepared, and commit entries.
- TEST_P(WritePreparedTransactionTest, IsInSnapshot) {
- WriteOptions wo;
- // Use small commit cache to trigger lots of eviction and fast advance of
- // max_evicted_seq_
- const size_t commit_cache_bits = 3;
- // Same for snapshot cache size
- const size_t snapshot_cache_bits = 2;
- // Take some preliminary snapshots first. This is to stress the data structure
- // that holds the old snapshots, as it is designed to be efficient when
- // only a few snapshots are below the max_evicted_seq_.
- for (int max_snapshots = 1; max_snapshots < 20; max_snapshots++) {
- // Leave some gap between the preliminary snapshots and the final snapshot
- // that we check. This should test for also different overlapping scenarios
- // between the last snapshot and the commits.
- for (int max_gap = 1; max_gap < 10; max_gap++) {
- // Since we do not actually write to the db, we mock the seq as it would be
- // increased by the db. The only exception is that we need the db seq to
- // advance for our snapshots, for which we apply a dummy put each time we
- // increase our mock of seq.
- uint64_t seq = 0;
- // At each step we prepare a txn and then we commit it in the next txn.
- // This emulates the consecutive transactions that write to the same key
- uint64_t cur_txn = 0;
- // Number of snapshots taken so far
- int num_snapshots = 0;
- // Number of gaps applied so far
- int gap_cnt = 0;
- // The final snapshot that we will inspect
- uint64_t snapshot = 0;
- bool found_committed = false;
- // To stress the data structure that maintains prepared txns, at each cycle
- // we add a new prepared txn. These are not meant to be committed for
- // snapshot inspection.
- std::set<uint64_t> prepared;
- // We keep the list of txns committed before we take the last snapshot.
- // These should be the only seq numbers that will be found in the snapshot
- std::set<uint64_t> committed_before;
- // The set of commit seq numbers to be excluded from IsInSnapshot queries
- std::set<uint64_t> commit_seqs;
- DBImpl* mock_db = new DBImpl(options, dbname);
- UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
- std::unique_ptr<WritePreparedTxnDBMock> wp_db(
- new WritePreparedTxnDBMock(mock_db, txn_db_options));
- // We continue until max advances a bit beyond the snapshot.
- while (!snapshot || wp_db->max_evicted_seq_ < snapshot + 100) {
- // do prepare for a transaction
- seq++;
- wp_db->AddPrepared(seq);
- prepared.insert(seq);
- // If cur_txn is not started, do prepare for it.
- if (!cur_txn) {
- seq++;
- cur_txn = seq;
- wp_db->AddPrepared(cur_txn);
- } else { // else commit it
- seq++;
- wp_db->AddCommitted(cur_txn, seq);
- wp_db->RemovePrepared(cur_txn);
- commit_seqs.insert(seq);
- if (!snapshot) {
- committed_before.insert(cur_txn);
- }
- cur_txn = 0;
- }
- if (num_snapshots < max_snapshots - 1) {
- // Take preliminary snapshots
- wp_db->TakeSnapshot(seq);
- num_snapshots++;
- } else if (gap_cnt < max_gap) {
- // Wait for some gap before taking the final snapshot
- gap_cnt++;
- } else if (!snapshot) {
- // Take the final snapshot if it is not already taken
- snapshot = seq;
- wp_db->TakeSnapshot(snapshot);
- num_snapshots++;
- }
- // If the snapshot is taken, verify seq numbers visible to it. We redo
- // it at each cycle to test that the system is still sound when
- // max_evicted_seq_ advances.
- if (snapshot) {
- for (uint64_t s = 1;
- s <= seq && commit_seqs.find(s) == commit_seqs.end(); s++) {
- bool was_committed =
- (committed_before.find(s) != committed_before.end());
- bool is_in_snapshot = wp_db->IsInSnapshot(s, snapshot);
- if (was_committed != is_in_snapshot) {
- printf("max_snapshots %d max_gap %d seq %" PRIu64 " max %" PRIu64
- " snapshot %" PRIu64
- " gap_cnt %d num_snapshots %d s %" PRIu64 "\n",
- max_snapshots, max_gap, seq,
- wp_db->max_evicted_seq_.load(), snapshot, gap_cnt,
- num_snapshots, s);
- }
- ASSERT_EQ(was_committed, is_in_snapshot);
- found_committed = found_committed || is_in_snapshot;
- }
- }
- }
- // Safety check to make sure the test actually ran
- ASSERT_TRUE(found_committed);
- // As an extra check, verify that the prepared set is properly emptied after
- // its txns are committed.
- if (cur_txn) {
- wp_db->AddCommitted(cur_txn, seq);
- wp_db->RemovePrepared(cur_txn);
- }
- for (auto p : prepared) {
- wp_db->AddCommitted(p, seq);
- wp_db->RemovePrepared(p);
- }
- ASSERT_TRUE(wp_db->delayed_prepared_.empty());
- ASSERT_TRUE(wp_db->prepared_txns_.empty());
- }
- }
- }
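- // Helper: asserts that reading `key` via Get and MultiGet returns the same
- // status and value as the expected ones captured by the caller.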
- void ASSERT_SAME(ReadOptions roptions, TransactionDB* db, Status exp_s,
- PinnableSlice& exp_v, Slice key) {
- Status s;
- PinnableSlice v;
- s = db->Get(roptions, db->DefaultColumnFamily(), key, &v);
- ASSERT_EQ(exp_s, s);
- ASSERT_TRUE(s.ok() || s.IsNotFound());
- if (s.ok()) {
- ASSERT_TRUE(exp_v == v);
- }
- // Try with MultiGet API too
- std::vector<std::string> values;
- auto s_vec =
- db->MultiGet(roptions, {db->DefaultColumnFamily()}, {key}, &values);
- ASSERT_EQ(1, values.size());
- ASSERT_EQ(1, s_vec.size());
- s = s_vec[0];
- ASSERT_EQ(exp_s, s);
- ASSERT_TRUE(s.ok() || s.IsNotFound());
- if (s.ok()) {
- ASSERT_TRUE(exp_v == values[0]);
- }
- }
- void ASSERT_SAME(TransactionDB* db, Status exp_s, PinnableSlice& exp_v,
- Slice key) {
- ASSERT_SAME(ReadOptions(), db, exp_s, exp_v, key);
- }
- TEST_P(WritePreparedTransactionTest, Rollback) {
- ReadOptions roptions;
- WriteOptions woptions;
- TransactionOptions txn_options;
- const size_t num_keys = 4;
- const size_t num_values = 5;
- for (size_t ikey = 1; ikey <= num_keys; ikey++) {
- for (size_t ivalue = 0; ivalue < num_values; ivalue++) {
- for (bool crash : {false, true}) {
- ASSERT_OK(ReOpen());
- WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
- std::string key_str = "key" + std::to_string(ikey);
- switch (ivalue) {
- case 0:
- break;
- case 1:
- ASSERT_OK(db->Put(woptions, key_str, "initvalue1"));
- break;
- case 2:
- ASSERT_OK(db->Merge(woptions, key_str, "initvalue2"));
- break;
- case 3:
- ASSERT_OK(db->Delete(woptions, key_str));
- break;
- case 4:
- ASSERT_OK(db->SingleDelete(woptions, key_str));
- break;
- default:
- FAIL();
- }
- PinnableSlice v1;
- auto s1 =
- db->Get(roptions, db->DefaultColumnFamily(), Slice("key1"), &v1);
- PinnableSlice v2;
- auto s2 =
- db->Get(roptions, db->DefaultColumnFamily(), Slice("key2"), &v2);
- PinnableSlice v3;
- auto s3 =
- db->Get(roptions, db->DefaultColumnFamily(), Slice("key3"), &v3);
- PinnableSlice v4;
- auto s4 =
- db->Get(roptions, db->DefaultColumnFamily(), Slice("key4"), &v4);
- Transaction* txn = db->BeginTransaction(woptions, txn_options);
- auto s = txn->SetName("xid0");
- ASSERT_OK(s);
- s = txn->Put(Slice("key1"), Slice("value1"));
- ASSERT_OK(s);
- s = txn->Merge(Slice("key2"), Slice("value2"));
- ASSERT_OK(s);
- s = txn->Delete(Slice("key3"));
- ASSERT_OK(s);
- s = txn->SingleDelete(Slice("key4"));
- ASSERT_OK(s);
- s = txn->Prepare();
- ASSERT_OK(s);
- {
- ReadLock rl(&wp_db->prepared_mutex_);
- ASSERT_FALSE(wp_db->prepared_txns_.empty());
- ASSERT_EQ(txn->GetId(), wp_db->prepared_txns_.top());
- }
- ASSERT_SAME(db, s1, v1, "key1");
- ASSERT_SAME(db, s2, v2, "key2");
- ASSERT_SAME(db, s3, v3, "key3");
- ASSERT_SAME(db, s4, v4, "key4");
- if (crash) {
- delete txn;
- auto db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
- ASSERT_OK(db_impl->FlushWAL(true));
- dynamic_cast<WritePreparedTxnDB*>(db)->TEST_Crash();
- ASSERT_OK(ReOpenNoDelete());
- ASSERT_NE(db, nullptr);
- wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
- txn = db->GetTransactionByName("xid0");
- ASSERT_FALSE(wp_db->delayed_prepared_empty_);
- ReadLock rl(&wp_db->prepared_mutex_);
- ASSERT_TRUE(wp_db->prepared_txns_.empty());
- ASSERT_FALSE(wp_db->delayed_prepared_.empty());
- ASSERT_TRUE(wp_db->delayed_prepared_.find(txn->GetId()) !=
- wp_db->delayed_prepared_.end());
- }
- ASSERT_SAME(db, s1, v1, "key1");
- ASSERT_SAME(db, s2, v2, "key2");
- ASSERT_SAME(db, s3, v3, "key3");
- ASSERT_SAME(db, s4, v4, "key4");
- s = txn->Rollback();
- ASSERT_OK(s);
- {
- ASSERT_TRUE(wp_db->delayed_prepared_empty_);
- ReadLock rl(&wp_db->prepared_mutex_);
- ASSERT_TRUE(wp_db->prepared_txns_.empty());
- ASSERT_TRUE(wp_db->delayed_prepared_.empty());
- }
- ASSERT_SAME(db, s1, v1, "key1");
- ASSERT_SAME(db, s2, v2, "key2");
- ASSERT_SAME(db, s3, v3, "key3");
- ASSERT_SAME(db, s4, v4, "key4");
- delete txn;
- }
- }
- }
- }
- TEST_P(WritePreparedTransactionTest, DisableGCDuringRecovery) {
- // Use large buffer to avoid memtable flush after 1024 insertions
- options.write_buffer_size = 1024 * 1024;
- ASSERT_OK(ReOpen());
- std::vector<KeyVersion> versions;
- uint64_t seq = 0;
- for (uint64_t i = 1; i <= 1024; i++) {
- std::string v = "bar" + std::to_string(i);
- ASSERT_OK(db->Put(WriteOptions(), "foo", v));
- VerifyKeys({{"foo", v}});
- seq++; // one for the key/value
- KeyVersion kv = {"foo", v, seq, kTypeValue};
- if (options.two_write_queues) {
- seq++; // one for the commit
- }
- versions.emplace_back(kv);
- }
- std::reverse(std::begin(versions), std::end(versions));
- VerifyInternalKeys(versions);
- DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
- ASSERT_OK(db_impl->FlushWAL(true));
- // Use small buffer to ensure memtable flush during recovery
- options.write_buffer_size = 1024;
- ASSERT_OK(ReOpenNoDelete());
- VerifyInternalKeys(versions);
- }
- TEST_P(WritePreparedTransactionTest, SequenceNumberZero) {
- ASSERT_OK(db->Put(WriteOptions(), "foo", "bar"));
- VerifyKeys({{"foo", "bar"}});
- const Snapshot* snapshot = db->GetSnapshot();
- ASSERT_OK(db->Flush(FlushOptions()));
- // Dummy keys to prevent compaction from trivially moving files and bypassing
- // the actual compaction logic.
- ASSERT_OK(db->Put(WriteOptions(), "a", "dummy"));
- ASSERT_OK(db->Put(WriteOptions(), "z", "dummy"));
- ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
- // Compaction will output keys with sequence number 0 if they are visible to
- // the earliest snapshot. Make sure IsInSnapshot() reports that sequence
- // number 0 is visible to any snapshot.
- VerifyKeys({{"foo", "bar"}});
- VerifyKeys({{"foo", "bar"}}, snapshot);
- VerifyInternalKeys({{"foo", "bar", 0, kTypeValue}});
- db->ReleaseSnapshot(snapshot);
- }
- // Compaction should not remove a key if it is not committed, and should
- // proceed with older versions of the key as if the new version doesn't exist.
- TEST_P(WritePreparedTransactionTest, CompactionShouldKeepUncommittedKeys) {
- options.disable_auto_compactions = true;
- ASSERT_OK(ReOpen());
- DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
- // Snapshots to avoid keys getting evicted.
- std::vector<const Snapshot*> snapshots;
- // Keep track of expected sequence number.
- SequenceNumber expected_seq = 0;
- auto add_key = [&](std::function<Status()> func) {
- ASSERT_OK(func());
- expected_seq++;
- if (options.two_write_queues) {
- expected_seq++; // 1 for commit
- }
- ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence());
- snapshots.push_back(db->GetSnapshot());
- };
- // Each key here represents a standalone test case.
- add_key([&]() { return db->Put(WriteOptions(), "key1", "value1_1"); });
- add_key([&]() { return db->Put(WriteOptions(), "key2", "value2_1"); });
- add_key([&]() { return db->Put(WriteOptions(), "key3", "value3_1"); });
- add_key([&]() { return db->Put(WriteOptions(), "key4", "value4_1"); });
- add_key([&]() { return db->Merge(WriteOptions(), "key5", "value5_1"); });
- add_key([&]() { return db->Merge(WriteOptions(), "key5", "value5_2"); });
- add_key([&]() { return db->Put(WriteOptions(), "key6", "value6_1"); });
- add_key([&]() { return db->Put(WriteOptions(), "key7", "value7_1"); });
- ASSERT_OK(db->Flush(FlushOptions()));
- add_key([&]() { return db->Delete(WriteOptions(), "key6"); });
- add_key([&]() { return db->SingleDelete(WriteOptions(), "key7"); });
- auto* transaction = db->BeginTransaction(WriteOptions());
- ASSERT_OK(transaction->SetName("txn"));
- ASSERT_OK(transaction->Put("key1", "value1_2"));
- ASSERT_OK(transaction->Delete("key2"));
- ASSERT_OK(transaction->SingleDelete("key3"));
- ASSERT_OK(transaction->Merge("key4", "value4_2"));
- ASSERT_OK(transaction->Merge("key5", "value5_3"));
- ASSERT_OK(transaction->Put("key6", "value6_2"));
- ASSERT_OK(transaction->Put("key7", "value7_2"));
- // Prepare but not commit.
- ASSERT_OK(transaction->Prepare());
- ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber());
- ASSERT_OK(db->Flush(FlushOptions()));
- for (auto* s : snapshots) {
- db->ReleaseSnapshot(s);
- }
- // Dummy keys to prevent compaction from trivially moving files and bypassing
- // the actual compaction logic.
- ASSERT_OK(db->Put(WriteOptions(), "a", "dummy"));
- ASSERT_OK(db->Put(WriteOptions(), "z", "dummy"));
- ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
- VerifyKeys({
- {"key1", "value1_1"},
- {"key2", "value2_1"},
- {"key3", "value3_1"},
- {"key4", "value4_1"},
- {"key5", "value5_1,value5_2"},
- {"key6", "NOT_FOUND"},
- {"key7", "NOT_FOUND"},
- });
- VerifyInternalKeys({
- {"key1", "value1_2", expected_seq, kTypeValue},
- {"key1", "value1_1", 0, kTypeValue},
- {"key2", "", expected_seq, kTypeDeletion},
- {"key2", "value2_1", 0, kTypeValue},
- {"key3", "", expected_seq, kTypeSingleDeletion},
- {"key3", "value3_1", 0, kTypeValue},
- {"key4", "value4_2", expected_seq, kTypeMerge},
- {"key4", "value4_1", 0, kTypeValue},
- {"key5", "value5_3", expected_seq, kTypeMerge},
- {"key5", "value5_1,value5_2", 0, kTypeValue},
- {"key6", "value6_2", expected_seq, kTypeValue},
- {"key7", "value7_2", expected_seq, kTypeValue},
- });
- ASSERT_OK(transaction->Commit());
- VerifyKeys({
- {"key1", "value1_2"},
- {"key2", "NOT_FOUND"},
- {"key3", "NOT_FOUND"},
- {"key4", "value4_1,value4_2"},
- {"key5", "value5_1,value5_2,value5_3"},
- {"key6", "value6_2"},
- {"key7", "value7_2"},
- });
- delete transaction;
- }
- // Compaction should keep keys visible to a snapshot based on commit sequence,
- // not just prepare sequence.
- TEST_P(WritePreparedTransactionTest, CompactionShouldKeepSnapshotVisibleKeys) {
- options.disable_auto_compactions = true;
- ASSERT_OK(ReOpen());
- // Keep track of expected sequence number.
- SequenceNumber expected_seq = 0;
- auto* txn1 = db->BeginTransaction(WriteOptions());
- ASSERT_OK(txn1->SetName("txn1"));
- ASSERT_OK(txn1->Put("key1", "value1_1"));
- ASSERT_OK(txn1->Prepare());
- ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber());
- ASSERT_OK(txn1->Commit());
- DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
- ASSERT_EQ(++expected_seq, db_impl->TEST_GetLastVisibleSequence());
- delete txn1;
- // Take a snapshot to avoid keys getting evicted before compaction.
- const Snapshot* snapshot1 = db->GetSnapshot();
- auto* txn2 = db->BeginTransaction(WriteOptions());
- ASSERT_OK(txn2->SetName("txn2"));
- ASSERT_OK(txn2->Put("key2", "value2_1"));
- ASSERT_OK(txn2->Prepare());
- ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber());
- // txn1 committed before snapshot2 and is visible to it.
- // txn2 commits after snapshot2 and is not visible to it.
- const Snapshot* snapshot2 = db->GetSnapshot();
- ASSERT_OK(txn2->Commit());
- ASSERT_EQ(++expected_seq, db_impl->TEST_GetLastVisibleSequence());
- delete txn2;
- // Take a snapshot to avoid keys getting evicted before compaction.
- const Snapshot* snapshot3 = db->GetSnapshot();
- ASSERT_OK(db->Put(WriteOptions(), "key1", "value1_2"));
- expected_seq++; // 1 for write
- SequenceNumber seq1 = expected_seq;
- if (options.two_write_queues) {
- expected_seq++; // 1 for commit
- }
- ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence());
- ASSERT_OK(db->Put(WriteOptions(), "key2", "value2_2"));
- expected_seq++; // 1 for write
- SequenceNumber seq2 = expected_seq;
- if (options.two_write_queues) {
- expected_seq++; // 1 for commit
- }
- ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence());
- ASSERT_OK(db->Flush(FlushOptions()));
- db->ReleaseSnapshot(snapshot1);
- db->ReleaseSnapshot(snapshot3);
- // Dummy keys to prevent compaction from trivially moving files and bypassing
- // the actual compaction logic.
- ASSERT_OK(db->Put(WriteOptions(), "a", "dummy"));
- ASSERT_OK(db->Put(WriteOptions(), "z", "dummy"));
- ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
- VerifyKeys({{"key1", "value1_2"}, {"key2", "value2_2"}});
- VerifyKeys({{"key1", "value1_1"}, {"key2", "NOT_FOUND"}}, snapshot2);
- VerifyInternalKeys({
- {"key1", "value1_2", seq1, kTypeValue},
- // "value1_1" is visible to snapshot2. Also keys at bottom level visible
- // to earliest snapshot will output with seq = 0.
- {"key1", "value1_1", 0, kTypeValue},
- {"key2", "value2_2", seq2, kTypeValue},
- });
- db->ReleaseSnapshot(snapshot2);
- }
- TEST_P(WritePreparedTransactionTest, SmallestUncommittedOptimization) {
- const size_t snapshot_cache_bits = 7; // same as default
- const size_t commit_cache_bits = 0; // disable commit cache
- for (bool has_recent_prepare : {true, false}) {
- UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
- ASSERT_OK(ReOpen());
- ASSERT_OK(db->Put(WriteOptions(), "key1", "value1"));
- auto* transaction =
- db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
- ASSERT_OK(transaction->SetName("txn"));
- ASSERT_OK(transaction->Delete("key1"));
- ASSERT_OK(transaction->Prepare());
- // snapshot1 should get min_uncommitted from prepared_txns_ heap.
- auto snapshot1 = db->GetSnapshot();
- ASSERT_EQ(transaction->GetId(),
- ((SnapshotImpl*)snapshot1)->min_uncommitted_);
- // Add a commit to advance max_evicted_seq and move the prepared transaction
- // into delayed_prepared_ set.
- ASSERT_OK(db->Put(WriteOptions(), "key2", "value2"));
- Transaction* txn2 = nullptr;
- if (has_recent_prepare) {
- txn2 =
- db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
- ASSERT_OK(txn2->SetName("txn2"));
- ASSERT_OK(txn2->Put("key3", "value3"));
- ASSERT_OK(txn2->Prepare());
- }
- // snapshot2 should get min_uncommitted from delayed_prepared_ set.
- auto snapshot2 = db->GetSnapshot();
- ASSERT_EQ(transaction->GetId(),
- ((SnapshotImpl*)snapshot2)->min_uncommitted_);
- ASSERT_OK(transaction->Commit());
- delete transaction;
- if (has_recent_prepare) {
- ASSERT_OK(txn2->Commit());
- delete txn2;
- }
- VerifyKeys({{"key1", "NOT_FOUND"}});
- VerifyKeys({{"key1", "value1"}}, snapshot1);
- VerifyKeys({{"key1", "value1"}}, snapshot2);
- db->ReleaseSnapshot(snapshot1);
- db->ReleaseSnapshot(snapshot2);
- }
- }
- // Insert two values, v1 and v2, for a key. Between prepare and commit of v2
- // take two snapshots, s1 and s2. Release s1 during compaction.
- // Test to make sure compaction doesn't get confused and think s1 can see both
- // values, and thus compact out the older value by mistake.
- TEST_P(WritePreparedTransactionTest, ReleaseSnapshotDuringCompaction) {
- const size_t snapshot_cache_bits = 7; // same as default
- const size_t commit_cache_bits = 0; // minimum commit cache
- UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
- options.disable_auto_compactions = true;
- ASSERT_OK(ReOpen());
- ASSERT_OK(db->Put(WriteOptions(), "key1", "value1_1"));
- auto* transaction =
- db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
- ASSERT_OK(transaction->SetName("txn"));
- ASSERT_OK(transaction->Put("key1", "value1_2"));
- ASSERT_OK(transaction->Prepare());
- auto snapshot1 = db->GetSnapshot();
- // Increment sequence number.
- ASSERT_OK(db->Put(WriteOptions(), "key2", "value2"));
- auto snapshot2 = db->GetSnapshot();
- ASSERT_OK(transaction->Commit());
- delete transaction;
- VerifyKeys({{"key1", "value1_2"}});
- VerifyKeys({{"key1", "value1_1"}}, snapshot1);
- VerifyKeys({{"key1", "value1_1"}}, snapshot2);
- // Add a flush to prevent the compaction from falling back to a trivial move.
- // The callback might be called twice; record the calling state to
- // prevent it from running twice.
- bool callback_finished = false;
- auto callback = [&](void*) {
- if (callback_finished) {
- return;
- }
- // Release snapshot1 after CompactionIterator init.
- // CompactionIterator needs to figure out that the earliest snapshot
- // that can see key1:value1_2 is kMaxSequenceNumber, not
- // snapshot1 or snapshot2.
- db->ReleaseSnapshot(snapshot1);
- // Add some keys to advance max_evicted_seq.
- ASSERT_OK(db->Put(WriteOptions(), "key3", "value3"));
- ASSERT_OK(db->Put(WriteOptions(), "key4", "value4"));
- callback_finished = true;
- };
- SyncPoint::GetInstance()->SetCallBack("CompactionIterator:AfterInit",
- callback);
- SyncPoint::GetInstance()->EnableProcessing();
- ASSERT_OK(db->Flush(FlushOptions()));
- VerifyKeys({{"key1", "value1_2"}});
- VerifyKeys({{"key1", "value1_1"}}, snapshot2);
- db->ReleaseSnapshot(snapshot2);
- SyncPoint::GetInstance()->ClearAllCallBacks();
- }
- // Insert two values, v1 and v2, for a key. Take two snapshots, s1 and s2,
- // after committing v2. Release s1 during compaction, right after compaction
- // processes v2 and before it processes v1. Test to make sure compaction
- // doesn't get confused into believing v1 and v2 are visible to different
- // snapshots (v1 by s2, v2 by s1) and refuse to compact out v1.
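- // Roughly, the callback below hooks CompactionIterator:ProcessKV and counts
- // visits to "key1": the first visit is v2, the second is v1, and s1 is
- // released right as v1 is being processed (see count_value in the callback).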
- TEST_P(WritePreparedTransactionTest, ReleaseSnapshotDuringCompaction2) {
- const size_t snapshot_cache_bits = 7; // same as default
- const size_t commit_cache_bits = 0; // minimum commit cache
- UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
- options.disable_auto_compactions = true;
- ASSERT_OK(ReOpen());
- ASSERT_OK(db->Put(WriteOptions(), "key1", "value1"));
- ASSERT_OK(db->Put(WriteOptions(), "key1", "value2"));
- SequenceNumber v2_seq = db->GetLatestSequenceNumber();
- auto* s1 = db->GetSnapshot();
- // Advance sequence number.
- ASSERT_OK(db->Put(WriteOptions(), "key2", "dummy"));
- auto* s2 = db->GetSnapshot();
- int count_value = 0;
- auto callback = [&](void* arg) {
- auto* ikey = static_cast<ParsedInternalKey*>(arg);
- if (ikey->user_key == "key1") {
- count_value++;
- if (count_value == 2) {
- // Processing v1.
- db->ReleaseSnapshot(s1);
- // Add some keys to advance max_evicted_seq and update
- // old_commit_map.
- ASSERT_OK(db->Put(WriteOptions(), "key3", "dummy"));
- ASSERT_OK(db->Put(WriteOptions(), "key4", "dummy"));
- }
- }
- };
- SyncPoint::GetInstance()->SetCallBack("CompactionIterator:ProcessKV",
- callback);
- SyncPoint::GetInstance()->EnableProcessing();
- ASSERT_OK(db->Flush(FlushOptions()));
- // value1 should be compacted out.
- VerifyInternalKeys({{"key1", "value2", v2_seq, kTypeValue}});
- // cleanup
- db->ReleaseSnapshot(s2);
- SyncPoint::GetInstance()->ClearAllCallBacks();
- }
- // Insert two values, v1 and v2, for a key. Insert another dummy key
- // so as to evict the commit cache entry for v2, while v1 is still in the
- // commit cache. Take two snapshots, s1 and s2. Release s1 during compaction.
- // Since the commit cache entry for v2 is evicted, and old_commit_map doesn't
- // have s1 (it is released), compaction should not get confused and should
- // still compact out v1.
- // TODO(myabandeh): how can we be sure that the v2's commit info is evicted
- // (and not v1's)? Instead of putting a dummy, we can directly call
- // AddCommitted(v2_seq + cache_size, ...) to evict v2's entry from commit cache.
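- // A rough sketch of the eviction relied upon here: the commit cache is a
- // fixed-size array of (1 << commit_cache_bits) slots indexed roughly by
- // prepare_seq % cache_size, so with commit_cache_bits = 1 (two slots) a later
- // commit whose prepare seq maps to v2's slot evicts v2's entry while v1's
- // entry can survive.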
- TEST_P(WritePreparedTransactionTest, ReleaseSnapshotDuringCompaction3) {
- const size_t snapshot_cache_bits = 7; // same as default
- const size_t commit_cache_bits = 1; // commit cache size = 2
- UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
- options.disable_auto_compactions = true;
- ASSERT_OK(ReOpen());
- // Add a dummy key to evict v2's commit cache entry, but keep v1's.
- // It also advances max_evicted_seq and can trigger old_commit_map cleanup.
- auto add_dummy = [&]() {
- auto* txn_dummy =
- db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
- ASSERT_OK(txn_dummy->SetName("txn_dummy"));
- ASSERT_OK(txn_dummy->Put("dummy", "dummy"));
- ASSERT_OK(txn_dummy->Prepare());
- ASSERT_OK(txn_dummy->Commit());
- delete txn_dummy;
- };
- ASSERT_OK(db->Put(WriteOptions(), "key1", "value1"));
- auto* txn =
- db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
- ASSERT_OK(txn->SetName("txn"));
- ASSERT_OK(txn->Put("key1", "value2"));
- ASSERT_OK(txn->Prepare());
- // TODO(myabandeh): replace it with GetId()?
- auto v2_seq = db->GetLatestSequenceNumber();
- ASSERT_OK(txn->Commit());
- delete txn;
- auto* s1 = db->GetSnapshot();
- // Dummy key to advance sequence number.
- add_dummy();
- auto* s2 = db->GetSnapshot();
- // The callback might be called twice; record the calling state to
- // prevent it from running twice.
- bool callback_finished = false;
- auto callback = [&](void*) {
- if (callback_finished) {
- return;
- }
- db->ReleaseSnapshot(s1);
- // Add some dummy entries to trigger cleanup of s1 from old_commit_map.
- add_dummy();
- add_dummy();
- callback_finished = true;
- };
- SyncPoint::GetInstance()->SetCallBack("CompactionIterator:AfterInit",
- callback);
- SyncPoint::GetInstance()->EnableProcessing();
- ASSERT_OK(db->Flush(FlushOptions()));
- // value1 should be compacted out.
- VerifyInternalKeys({{"key1", "value2", v2_seq, kTypeValue}});
- db->ReleaseSnapshot(s2);
- SyncPoint::GetInstance()->ClearAllCallBacks();
- }
- TEST_P(WritePreparedTransactionTest, ReleaseEarliestSnapshotDuringCompaction) {
- const size_t snapshot_cache_bits = 7; // same as default
- const size_t commit_cache_bits = 0; // minimum commit cache
- UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
- options.disable_auto_compactions = true;
- ASSERT_OK(ReOpen());
- ASSERT_OK(db->Put(WriteOptions(), "key1", "value1"));
- SequenceNumber put_seq = db->GetLatestSequenceNumber();
- auto* transaction =
- db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
- ASSERT_OK(transaction->SetName("txn"));
- ASSERT_OK(transaction->Delete("key1"));
- ASSERT_OK(transaction->Prepare());
- SequenceNumber del_seq = db->GetLatestSequenceNumber();
- auto snapshot1 = db->GetSnapshot();
- // Increment sequence number.
- ASSERT_OK(db->Put(WriteOptions(), "key2", "value2"));
- auto snapshot2 = db->GetSnapshot();
- ASSERT_OK(transaction->Commit());
- delete transaction;
- VerifyKeys({{"key1", "NOT_FOUND"}});
- VerifyKeys({{"key1", "value1"}}, snapshot1);
- VerifyKeys({{"key1", "value1"}}, snapshot2);
- ASSERT_OK(db->Flush(FlushOptions()));
- auto callback = [&](void* compaction) {
- // Release snapshot1 after CompactionIterator init.
- // CompactionIterator needs to double-check and find out that snapshot2 is
- // now the earliest existing snapshot.
- if (compaction != nullptr) {
- db->ReleaseSnapshot(snapshot1);
- // Add some keys to advance max_evicted_seq.
- ASSERT_OK(db->Put(WriteOptions(), "key3", "value3"));
- ASSERT_OK(db->Put(WriteOptions(), "key4", "value4"));
- }
- };
- SyncPoint::GetInstance()->SetCallBack("CompactionIterator:AfterInit",
- callback);
- SyncPoint::GetInstance()->EnableProcessing();
- // Dummy keys to prevent the compaction from trivially moving files, so the
- // actual compaction logic is exercised.
- ASSERT_OK(db->Put(WriteOptions(), "a", "dummy"));
- ASSERT_OK(db->Put(WriteOptions(), "z", "dummy"));
- ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
- // Only verify key1. Both the put and the delete for the key should be kept.
- // Since the delete tombstone is not visible to snapshot2, we need to keep
- // at least one version of the key for the write-conflict check.
- VerifyInternalKeys({{"key1", "", del_seq, kTypeDeletion},
- {"key1", "value1", put_seq, kTypeValue}});
- db->ReleaseSnapshot(snapshot2);
- SyncPoint::GetInstance()->ClearAllCallBacks();
- }
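- // Variant of the above that covers the SingleDelete path: the earliest
- // snapshot is released from the
- // CompactionIterator::NextFromInput:SingleDelete:1 sync point, while the
- // compaction is deciding what to do with the SD, and a write afterwards
- // advances max_evicted_seq.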
- TEST_P(WritePreparedTransactionTest,
- ReleaseEarliestSnapshotDuringCompaction_WithSD) {
- constexpr size_t kSnapshotCacheBits = 7; // same as default
- constexpr size_t kCommitCacheBits = 0; // minimum commit cache
- UpdateTransactionDBOptions(kSnapshotCacheBits, kCommitCacheBits);
- options.disable_auto_compactions = true;
- ASSERT_OK(ReOpen());
- ASSERT_OK(db->Put(WriteOptions(), "key", "value"));
- ASSERT_OK(db->Put(WriteOptions(), "foo", "value"));
- ASSERT_OK(db->Flush(FlushOptions()));
- auto* txn = db->BeginTransaction(WriteOptions(), TransactionOptions(),
- /*old_txn=*/nullptr);
- ASSERT_OK(txn->SingleDelete("key"));
- ASSERT_OK(txn->Put("wow", "value"));
- ASSERT_OK(txn->SetName("txn"));
- ASSERT_OK(txn->Prepare());
- ASSERT_OK(db->Flush(FlushOptions()));
- const bool two_write_queues = std::get<1>(GetParam());
- if (two_write_queues) {
- // In the case of two queues, commit another txn just to bump
- // last_published_seq so that a subsequent GetSnapshot() call can return
- // a snapshot with a higher sequence.
- auto* dummy_txn = db->BeginTransaction(WriteOptions(), TransactionOptions(),
- /*old_txn=*/nullptr);
- ASSERT_OK(dummy_txn->Put("haha", "value"));
- ASSERT_OK(dummy_txn->Commit());
- delete dummy_txn;
- }
- auto* snapshot = db->GetSnapshot();
- ASSERT_OK(txn->Commit());
- delete txn;
- SyncPoint::GetInstance()->SetCallBack(
- "CompactionIterator::NextFromInput:SingleDelete:1", [&](void* arg) {
- if (!arg) {
- return;
- }
- db->ReleaseSnapshot(snapshot);
- // Advance max_evicted_seq
- ASSERT_OK(db->Put(WriteOptions(), "bar", "value"));
- });
- SyncPoint::GetInstance()->EnableProcessing();
- ASSERT_OK(db->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
- /*end=*/nullptr));
- SyncPoint::GetInstance()->ClearAllCallBacks();
- }
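- // Another SingleDelete variant: here the snapshot that goes away
- // mid-compaction is the write-conflict snapshot held by dummy_txn, released
- // by rolling the transaction back from the SingleDelete:2 sync point.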
- TEST_P(WritePreparedTransactionTest,
- ReleaseEarliestSnapshotDuringCompaction_WithSD2) {
- constexpr size_t kSnapshotCacheBits = 7; // same as default
- constexpr size_t kCommitCacheBits = 0; // minimum commit cache
- UpdateTransactionDBOptions(kSnapshotCacheBits, kCommitCacheBits);
- options.disable_auto_compactions = true;
- ASSERT_OK(ReOpen());
- ASSERT_OK(db->Put(WriteOptions(), "foo", "value"));
- ASSERT_OK(db->Put(WriteOptions(), "key", "value"));
- ASSERT_OK(db->Flush(FlushOptions()));
- auto* txn = db->BeginTransaction(WriteOptions(), TransactionOptions(),
- /*old_txn=*/nullptr);
- ASSERT_OK(txn->Put("bar", "value"));
- ASSERT_OK(txn->SingleDelete("key"));
- ASSERT_OK(txn->SetName("txn"));
- ASSERT_OK(txn->Prepare());
- ASSERT_OK(db->Flush(FlushOptions()));
- ASSERT_OK(txn->Commit());
- delete txn;
- ASSERT_OK(db->Put(WriteOptions(), "haha", "value"));
- // Create a dummy transaction to take a snapshot for ww-conflict detection.
- TransactionOptions txn_opts;
- txn_opts.set_snapshot = true;
- auto* dummy_txn =
- db->BeginTransaction(WriteOptions(), txn_opts, /*old_txn=*/nullptr);
- SyncPoint::GetInstance()->SetCallBack(
- "CompactionIterator::NextFromInput:SingleDelete:2", [&](void* /*arg*/) {
- ASSERT_OK(dummy_txn->Rollback());
- delete dummy_txn;
- ASSERT_OK(db->Put(WriteOptions(), "dontcare", "value"));
- });
- SyncPoint::GetInstance()->EnableProcessing();
- ASSERT_OK(db->Put(WriteOptions(), "haha2", "value"));
- auto* snapshot = db->GetSnapshot();
- ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
- db->ReleaseSnapshot(snapshot);
- SyncPoint::GetInstance()->ClearAllCallBacks();
- }
- TEST_P(WritePreparedTransactionTest,
- ReleaseEarliestSnapshotDuringCompaction_WithDelete) {
- constexpr size_t kSnapshotCacheBits = 7; // same as default
- constexpr size_t kCommitCacheBits = 0; // minimum commit cache
- UpdateTransactionDBOptions(kSnapshotCacheBits, kCommitCacheBits);
- options.disable_auto_compactions = true;
- ASSERT_OK(ReOpen());
- ASSERT_OK(db->Put(WriteOptions(), "a", "value"));
- ASSERT_OK(db->Put(WriteOptions(), "b", "value"));
- ASSERT_OK(db->Put(WriteOptions(), "c", "value"));
- ASSERT_OK(db->Flush(FlushOptions()));
- auto* txn = db->BeginTransaction(WriteOptions(), TransactionOptions(),
- /*old_txn=*/nullptr);
- ASSERT_OK(txn->Delete("b"));
- ASSERT_OK(txn->SetName("txn"));
- ASSERT_OK(txn->Prepare());
- const bool two_write_queues = std::get<1>(GetParam());
- if (two_write_queues) {
- // In the case of two queues, commit another txn just to bump
- // last_published_seq so that a subsequent GetSnapshot() call can return
- // a snapshot with a higher sequence.
- auto* dummy_txn = db->BeginTransaction(WriteOptions(), TransactionOptions(),
- /*old_txn=*/nullptr);
- ASSERT_OK(dummy_txn->Put("haha", "value"));
- ASSERT_OK(dummy_txn->Commit());
- delete dummy_txn;
- }
- auto* snapshot1 = db->GetSnapshot();
- ASSERT_OK(txn->Commit());
- delete txn;
- auto* snapshot2 = db->GetSnapshot();
- SyncPoint::GetInstance()->SetCallBack(
- "CompactionIterator::NextFromInput:BottommostDelete:1", [&](void* arg) {
- if (!arg) {
- return;
- }
- db->ReleaseSnapshot(snapshot1);
- // Advance max_evicted_seq
- ASSERT_OK(db->Put(WriteOptions(), "dummy1", "value"));
- });
- SyncPoint::GetInstance()->EnableProcessing();
- ASSERT_OK(db->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
- /*end=*/nullptr));
- db->ReleaseSnapshot(snapshot2);
- SyncPoint::GetInstance()->ClearAllCallBacks();
- }
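- // Roughly: snapshot1 is released from the KeepSDForWW sync point, i.e.
- // between the point where the flush decides to keep the SingleDelete for
- // write-conflict checking (forced by dummy_txn's snapshot) and the point
- // where it processes the matching Put underneath it.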
- TEST_P(WritePreparedTransactionTest,
- ReleaseSnapshotBetweenSDAndPutDuringCompaction) {
- constexpr size_t kSnapshotCacheBits = 7; // same as default
- constexpr size_t kCommitCacheBits = 0; // minimum commit cache
- UpdateTransactionDBOptions(kSnapshotCacheBits, kCommitCacheBits);
- options.disable_auto_compactions = true;
- ASSERT_OK(ReOpen());
- // Create a dummy transaction to take a snapshot for ww-conflict detection.
- TransactionOptions txn_opts;
- txn_opts.set_snapshot = true;
- auto* dummy_txn =
- db->BeginTransaction(WriteOptions(), txn_opts, /*old_txn=*/nullptr);
- // Increment seq
- ASSERT_OK(db->Put(WriteOptions(), "bar", "value"));
- ASSERT_OK(db->Put(WriteOptions(), "foo", "value"));
- ASSERT_OK(db->SingleDelete(WriteOptions(), "foo"));
- auto* snapshot1 = db->GetSnapshot();
- // Increment seq
- ASSERT_OK(db->Put(WriteOptions(), "dontcare", "value"));
- auto* snapshot2 = db->GetSnapshot();
- SyncPoint::GetInstance()->SetCallBack(
- "CompactionIterator::NextFromInput:KeepSDForWW", [&](void* /*arg*/) {
- db->ReleaseSnapshot(snapshot1);
- ASSERT_OK(db->Put(WriteOptions(), "dontcare2", "value2"));
- });
- SyncPoint::GetInstance()->EnableProcessing();
- ASSERT_OK(db->Flush(FlushOptions()));
- db->ReleaseSnapshot(snapshot2);
- ASSERT_OK(dummy_txn->Commit());
- delete dummy_txn;
- SyncPoint::GetInstance()->ClearAllCallBacks();
- }
- TEST_P(WritePreparedTransactionTest,
- ReleaseEarliestWriteConflictSnapshot_SingleDelete) {
- constexpr size_t kSnapshotCacheBits = 7; // same as default
- constexpr size_t kCommitCacheBits = 0; // minimum commit cache
- UpdateTransactionDBOptions(kSnapshotCacheBits, kCommitCacheBits);
- options.disable_auto_compactions = true;
- ASSERT_OK(ReOpen());
- ASSERT_OK(db->Put(WriteOptions(), "a", "value"));
- ASSERT_OK(db->Put(WriteOptions(), "b", "value"));
- ASSERT_OK(db->Put(WriteOptions(), "c", "value"));
- ASSERT_OK(db->Flush(FlushOptions()));
- {
- CompactRangeOptions cro;
- cro.change_level = true;
- cro.target_level = 2;
- ASSERT_OK(db->CompactRange(cro, /*begin=*/nullptr, /*end=*/nullptr));
- }
- std::unique_ptr<Transaction> txn;
- txn.reset(db->BeginTransaction(WriteOptions(), TransactionOptions(),
- /*old_txn=*/nullptr));
- ASSERT_OK(txn->SetName("txn1"));
- ASSERT_OK(txn->SingleDelete("b"));
- ASSERT_OK(txn->Prepare());
- ASSERT_OK(txn->Commit());
- auto* snapshot1 = db->GetSnapshot();
- // Bump seq of the db by performing writes so that
- // earliest_snapshot_ < earliest_write_conflict_snapshot_ in
- // CompactionIterator.
- ASSERT_OK(db->Put(WriteOptions(), "z", "dontcare"));
- // Create another snapshot for write conflict checking
- std::unique_ptr<Transaction> txn2;
- {
- TransactionOptions txn_opts;
- txn_opts.set_snapshot = true;
- txn2.reset(
- db->BeginTransaction(WriteOptions(), txn_opts, /*old_txn=*/nullptr));
- }
- // Bump seq so that the subsequent bg flush won't create a snapshot with the
- // same seq as the previous snapshot for conflict checking.
- ASSERT_OK(db->Put(WriteOptions(), "y", "dont"));
- ASSERT_OK(db->Flush(FlushOptions()));
- SyncPoint::GetInstance()->DisableProcessing();
- SyncPoint::GetInstance()->ClearAllCallBacks();
- SyncPoint::GetInstance()->SetCallBack(
- "CompactionIterator::NextFromInput:SingleDelete:1", [&](void* /*arg*/) {
- // Rolling back txn2 should release its snapshot (for ww checking).
- ASSERT_OK(txn2->Rollback());
- txn2.reset();
- // Advance max_evicted_seq
- ASSERT_OK(db->Put(WriteOptions(), "x", "value"));
- });
- SyncPoint::GetInstance()->EnableProcessing();
- ASSERT_OK(db->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
- /*end=*/nullptr));
- SyncPoint::GetInstance()->DisableProcessing();
- SyncPoint::GetInstance()->ClearAllCallBacks();
- db->ReleaseSnapshot(snapshot1);
- }
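- // Exercises the sequence-zeroing path: the earliest snapshot is released from
- // the PrepareOutput:ZeroingSeq sync point, right when compaction is about to
- // output "b" with seq = 0, and a write afterwards bumps max_evicted_seq.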
- TEST_P(WritePreparedTransactionTest, ReleaseEarliestSnapshotAfterSeqZeroing) {
- constexpr size_t kSnapshotCacheBits = 7; // same as default
- constexpr size_t kCommitCacheBits = 0; // minimum commit cache
- UpdateTransactionDBOptions(kSnapshotCacheBits, kCommitCacheBits);
- options.disable_auto_compactions = true;
- ASSERT_OK(ReOpen());
- ASSERT_OK(db->Put(WriteOptions(), "a", "value"));
- ASSERT_OK(db->Put(WriteOptions(), "b", "value"));
- ASSERT_OK(db->Put(WriteOptions(), "c", "value"));
- ASSERT_OK(db->Flush(FlushOptions()));
- {
- CompactRangeOptions cro;
- cro.change_level = true;
- cro.target_level = 2;
- ASSERT_OK(db->CompactRange(cro, /*begin=*/nullptr, /*end=*/nullptr));
- }
- ASSERT_OK(db->SingleDelete(WriteOptions(), "b"));
- // Take a snapshot so that the SD won't be dropped during flush.
- auto* tmp_snapshot = db->GetSnapshot();
- ASSERT_OK(db->Put(WriteOptions(), "b", "value2"));
- auto* snapshot = db->GetSnapshot();
- ASSERT_OK(db->Flush(FlushOptions()));
- db->ReleaseSnapshot(tmp_snapshot);
- // Bump the sequence so that the bg compaction job below takes a snapshot
- // whose seq differs from snapshot's sequence.
- ASSERT_OK(db->Put(WriteOptions(), "z", "foo"));
- SyncPoint::GetInstance()->DisableProcessing();
- SyncPoint::GetInstance()->ClearAllCallBacks();
- SyncPoint::GetInstance()->SetCallBack(
- "CompactionIterator::PrepareOutput:ZeroingSeq", [&](void* arg) {
- const auto* const ikey = static_cast<const ParsedInternalKey*>(arg);
- assert(ikey);
- if (ikey->user_key == "b") {
- assert(ikey->type == kTypeValue);
- db->ReleaseSnapshot(snapshot);
- // Bump max_evicted_seq.
- ASSERT_OK(db->Put(WriteOptions(), "z", "dontcare"));
- }
- });
- SyncPoint::GetInstance()->EnableProcessing();
- ASSERT_OK(db->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
- /*end=*/nullptr));
- SyncPoint::GetInstance()->DisableProcessing();
- SyncPoint::GetInstance()->ClearAllCallBacks();
- }
- TEST_P(WritePreparedTransactionTest, ReleaseEarliestSnapshotAfterSeqZeroing2) {
- constexpr size_t kSnapshotCacheBits = 7; // same as default
- constexpr size_t kCommitCacheBits = 0; // minimum commit cache
- UpdateTransactionDBOptions(kSnapshotCacheBits, kCommitCacheBits);
- options.disable_auto_compactions = true;
- ASSERT_OK(ReOpen());
- // Generate an L0 with only SD for one key "b".
- ASSERT_OK(db->Put(WriteOptions(), "a", "value"));
- ASSERT_OK(db->Put(WriteOptions(), "b", "value"));
- // Take a snapshot so that subsequent flush outputs the SD for "b".
- auto* tmp_snapshot = db->GetSnapshot();
- ASSERT_OK(db->SingleDelete(WriteOptions(), "b"));
- ASSERT_OK(db->Put(WriteOptions(), "c", "value"));
- SyncPoint::GetInstance()->DisableProcessing();
- SyncPoint::GetInstance()->ClearAllCallBacks();
- SyncPoint::GetInstance()->SetCallBack(
- "CompactionIterator::NextFromInput:SingleDelete:3", [&](void* arg) {
- if (!arg) {
- db->ReleaseSnapshot(tmp_snapshot);
- // Bump max_evicted_seq
- ASSERT_OK(db->Put(WriteOptions(), "x", "dontcare"));
- }
- });
- SyncPoint::GetInstance()->EnableProcessing();
- ASSERT_OK(db->Flush(FlushOptions()));
- // Finish generating L0 with only SD for "b".
- SyncPoint::GetInstance()->DisableProcessing();
- SyncPoint::GetInstance()->ClearAllCallBacks();
- // Move the L0 to L2.
- {
- CompactRangeOptions cro;
- cro.change_level = true;
- cro.target_level = 2;
- ASSERT_OK(db->CompactRange(cro, /*begin=*/nullptr, /*end=*/nullptr));
- }
- ASSERT_OK(db->Put(WriteOptions(), "b", "value1"));
- auto* snapshot = db->GetSnapshot();
- // Bump seq so that a subsequent flush/compaction job's snapshot is larger
- // than the above snapshot's seq.
- ASSERT_OK(db->Put(WriteOptions(), "x", "dontcare"));
- // Generate a second L0.
- ASSERT_OK(db->Flush(FlushOptions()));
- SyncPoint::GetInstance()->SetCallBack(
- "CompactionIterator::PrepareOutput:ZeroingSeq", [&](void* arg) {
- const auto* const ikey = static_cast<const ParsedInternalKey*>(arg);
- assert(ikey);
- if (ikey->user_key == "b") {
- assert(ikey->type == kTypeValue);
- db->ReleaseSnapshot(snapshot);
- // Bump max_evicted_seq.
- ASSERT_OK(db->Put(WriteOptions(), "z", "dontcare"));
- }
- });
- SyncPoint::GetInstance()->EnableProcessing();
- ASSERT_OK(db->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
- /*end=*/nullptr));
- SyncPoint::GetInstance()->DisableProcessing();
- SyncPoint::GetInstance()->ClearAllCallBacks();
- }
- // Although the user contract indicates that a SingleDelete can only be issued
- // for a key that exists and has not been overwritten, it is still possible
- // for a Delete to be present when a write-prepared transaction is rolled back.
- TEST_P(WritePreparedTransactionTest, SingleDeleteAfterRollback) {
- constexpr size_t kSnapshotCacheBits = 7; // same as default
- constexpr size_t kCommitCacheBits = 0; // minimum commit cache
- txn_db_options.rollback_deletion_type_callback =
- [](TransactionDB*, ColumnFamilyHandle*, const Slice&) { return true; };
- UpdateTransactionDBOptions(kSnapshotCacheBits, kCommitCacheBits);
- options.disable_auto_compactions = true;
- ASSERT_OK(ReOpen());
- // Get a write conflict snapshot by creating a transaction with
- // set_snapshot=true.
- TransactionOptions txn_opts;
- txn_opts.set_snapshot = true;
- std::unique_ptr<Transaction> dummy_txn(
- db->BeginTransaction(WriteOptions(), txn_opts));
- std::unique_ptr<Transaction> txn0(
- db->BeginTransaction(WriteOptions(), TransactionOptions()));
- ASSERT_OK(txn0->Put("foo", "value"));
- ASSERT_OK(txn0->SetName("xid0"));
- ASSERT_OK(txn0->Prepare());
- // Create an SST with only {"foo": "value"}.
- ASSERT_OK(db->Flush(FlushOptions()));
- // Insert a Delete to cancel out the prior Put by txn0.
- ASSERT_OK(txn0->Rollback());
- txn0.reset();
- // Create a second SST.
- ASSERT_OK(db->Flush(FlushOptions()));
- ASSERT_OK(db->Put(WriteOptions(), "foo", "value1"));
- auto* snapshot = db->GetSnapshot();
- ASSERT_OK(db->SingleDelete(WriteOptions(), "foo"));
- int count = 0;
- SyncPoint::GetInstance()->DisableProcessing();
- SyncPoint::GetInstance()->ClearAllCallBacks();
- SyncPoint::GetInstance()->SetCallBack(
- "CompactionIterator::NextFromInput:SingleDelete:1", [&](void* arg) {
- const auto* const c = static_cast<const Compaction*>(arg);
- assert(!c);
- // Trigger once only for SingleDelete during flush.
- if (0 == count) {
- ++count;
- db->ReleaseSnapshot(snapshot);
- // Bump max_evicted_seq
- ASSERT_OK(db->Put(WriteOptions(), "x", "dontcare"));
- }
- });
- SyncPoint::GetInstance()->EnableProcessing();
- // Create a third SST containing a SD without its matching PUT.
- ASSERT_OK(db->Flush(FlushOptions()));
- SyncPoint::GetInstance()->DisableProcessing();
- SyncPoint::GetInstance()->ClearAllCallBacks();
- SyncPoint::GetInstance()->EnableProcessing();
- DBImpl* dbimpl = static_cast_with_check<DBImpl>(db->GetRootDB());
- assert(dbimpl);
- ASSERT_OK(dbimpl->TEST_CompactRange(
- /*level=*/0, /*begin=*/nullptr, /*end=*/nullptr,
- /*column_family=*/nullptr, /*disallow_trivial_mode=*/true));
- SyncPoint::GetInstance()->DisableProcessing();
- SyncPoint::GetInstance()->ClearAllCallBacks();
- // Release the conflict-checking snapshot.
- ASSERT_OK(dummy_txn->Rollback());
- }
- // A more complex test to verify that compaction/flush keeps keys visible
- // to snapshots.
- TEST_P(WritePreparedTransactionTest,
- CompactionKeepSnapshotVisibleKeysRandomized) {
- constexpr size_t kNumTransactions = 10;
- constexpr size_t kNumIterations = 1000;
- std::vector<Transaction*> transactions(kNumTransactions, nullptr);
- std::vector<size_t> versions(kNumTransactions, 0);
- std::unordered_map<std::string, std::string> current_data;
- std::vector<const Snapshot*> snapshots;
- std::vector<std::unordered_map<std::string, std::string>> snapshot_data;
- Random rnd(1103);
- options.disable_auto_compactions = true;
- ASSERT_OK(ReOpen());
- for (size_t i = 0; i < kNumTransactions; i++) {
- std::string key = "key" + std::to_string(i);
- std::string value = "value0";
- ASSERT_OK(db->Put(WriteOptions(), key, value));
- current_data[key] = value;
- }
- VerifyKeys(current_data);
- for (size_t iter = 0; iter < kNumIterations; iter++) {
- auto r = rnd.Next() % (kNumTransactions + 1);
- if (r < kNumTransactions) {
- std::string key = "key" + std::to_string(r);
- if (transactions[r] == nullptr) {
- std::string value = "value" + std::to_string(versions[r] + 1);
- auto* txn = db->BeginTransaction(WriteOptions());
- ASSERT_OK(txn->SetName("txn" + std::to_string(r)));
- ASSERT_OK(txn->Put(key, value));
- ASSERT_OK(txn->Prepare());
- transactions[r] = txn;
- } else {
- std::string value = "value" + std::to_string(++versions[r]);
- ASSERT_OK(transactions[r]->Commit());
- delete transactions[r];
- transactions[r] = nullptr;
- current_data[key] = value;
- }
- } else {
- auto* snapshot = db->GetSnapshot();
- VerifyKeys(current_data, snapshot);
- snapshots.push_back(snapshot);
- snapshot_data.push_back(current_data);
- }
- VerifyKeys(current_data);
- }
- // Take a last snapshot to test compaction with an uncommitted prepared
- // transaction.
- snapshots.push_back(db->GetSnapshot());
- snapshot_data.push_back(current_data);
- ASSERT_EQ(snapshots.size(), snapshot_data.size());
- for (size_t i = 0; i < snapshots.size(); i++) {
- VerifyKeys(snapshot_data[i], snapshots[i]);
- }
- ASSERT_OK(db->Flush(FlushOptions()));
- for (size_t i = 0; i < snapshots.size(); i++) {
- VerifyKeys(snapshot_data[i], snapshots[i]);
- }
- // Dummy keys to prevent the compaction from trivially moving files, so the
- // actual compaction logic is exercised.
- ASSERT_OK(db->Put(WriteOptions(), "a", "dummy"));
- ASSERT_OK(db->Put(WriteOptions(), "z", "dummy"));
- ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
- for (size_t i = 0; i < snapshots.size(); i++) {
- VerifyKeys(snapshot_data[i], snapshots[i]);
- }
- // cleanup
- for (size_t i = 0; i < kNumTransactions; i++) {
- if (transactions[i] == nullptr) {
- continue;
- }
- ASSERT_OK(transactions[i]->Commit());
- delete transactions[i];
- }
- for (size_t i = 0; i < snapshots.size(); i++) {
- db->ReleaseSnapshot(snapshots[i]);
- }
- }
- // Compaction should not apply the optimization of outputting a key with
- // sequence number 0 if the key is not visible to the earliest snapshot,
- // based on its commit sequence number.
- TEST_P(WritePreparedTransactionTest,
- CompactionShouldKeepSequenceForUncommittedKeys) {
- options.disable_auto_compactions = true;
- ASSERT_OK(ReOpen());
- // Keep track of expected sequence number.
- SequenceNumber expected_seq = 0;
- auto* transaction = db->BeginTransaction(WriteOptions());
- ASSERT_OK(transaction->SetName("txn"));
- ASSERT_OK(transaction->Put("key1", "value1"));
- ASSERT_OK(transaction->Prepare());
- ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber());
- SequenceNumber seq1 = expected_seq;
- ASSERT_OK(db->Put(WriteOptions(), "key2", "value2"));
- DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
- expected_seq++; // one for data
- if (options.two_write_queues) {
- expected_seq++; // one for commit
- }
- ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence());
- ASSERT_OK(db->Flush(FlushOptions()));
- // Dummy keys to prevent the compaction from trivially moving files, so the
- // actual compaction logic is exercised.
- ASSERT_OK(db->Put(WriteOptions(), "a", "dummy"));
- ASSERT_OK(db->Put(WriteOptions(), "z", "dummy"));
- ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
- VerifyKeys({
- {"key1", "NOT_FOUND"},
- {"key2", "value2"},
- });
- VerifyInternalKeys({
- // "key1" has not been committed. It keeps its sequence number.
- {"key1", "value1", seq1, kTypeValue},
- // "key2" is committed and output with seq = 0.
- {"key2", "value2", 0, kTypeValue},
- });
- ASSERT_OK(transaction->Commit());
- VerifyKeys({
- {"key1", "value1"},
- {"key2", "value2"},
- });
- delete transaction;
- }
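- // A prepared txn is committed and a snapshot is taken from within the
- // CompactionIterator:AfterInit callback, i.e. after the flush has started;
- // the flush must still keep "value1" for that late snapshot.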
- TEST_P(WritePreparedTransactionTest, CommitAndSnapshotDuringCompaction) {
- options.disable_auto_compactions = true;
- ASSERT_OK(ReOpen());
- const Snapshot* snapshot = nullptr;
- ASSERT_OK(db->Put(WriteOptions(), "key1", "value1"));
- auto* txn = db->BeginTransaction(WriteOptions());
- ASSERT_OK(txn->SetName("txn"));
- ASSERT_OK(txn->Put("key1", "value2"));
- ASSERT_OK(txn->Prepare());
- auto callback = [&](void*) {
- // The snapshot is taken after the compaction starts. It should be taken into
- // consideration when deciding whether to compact out value1.
- snapshot = db->GetSnapshot();
- ASSERT_OK(txn->Commit());
- delete txn;
- };
- SyncPoint::GetInstance()->SetCallBack("CompactionIterator:AfterInit",
- callback);
- SyncPoint::GetInstance()->EnableProcessing();
- ASSERT_OK(db->Flush(FlushOptions()));
- ASSERT_NE(nullptr, snapshot);
- VerifyKeys({{"key1", "value2"}});
- VerifyKeys({{"key1", "value1"}}, snapshot);
- db->ReleaseSnapshot(snapshot);
- }
- TEST_P(WritePreparedTransactionTest, Iterate) {
- auto verify_state = [](Iterator* iter, const std::string& key,
- const std::string& value) {
- ASSERT_TRUE(iter->Valid());
- ASSERT_OK(iter->status());
- ASSERT_EQ(key, iter->key().ToString());
- ASSERT_EQ(value, iter->value().ToString());
- };
- auto verify_iter = [&](const std::string& expected_val) {
- // Get iterator from a concurrent transaction and make sure it has the
- // same view as an iterator from the DB.
- auto* txn = db->BeginTransaction(WriteOptions());
- for (int i = 0; i < 2; i++) {
- Iterator* iter = (i == 0) ? db->NewIterator(ReadOptions())
- : txn->GetIterator(ReadOptions());
- // Seek
- iter->Seek("foo");
- verify_state(iter, "foo", expected_val);
- // Next
- iter->Seek("a");
- verify_state(iter, "a", "va");
- iter->Next();
- verify_state(iter, "foo", expected_val);
- // SeekForPrev
- iter->SeekForPrev("y");
- verify_state(iter, "foo", expected_val);
- // Prev
- iter->SeekForPrev("z");
- verify_state(iter, "z", "vz");
- iter->Prev();
- verify_state(iter, "foo", expected_val);
- delete iter;
- }
- delete txn;
- };
- ASSERT_OK(db->Put(WriteOptions(), "foo", "v1"));
- auto* transaction = db->BeginTransaction(WriteOptions());
- ASSERT_OK(transaction->SetName("txn"));
- ASSERT_OK(transaction->Put("foo", "v2"));
- ASSERT_OK(transaction->Prepare());
- VerifyKeys({{"foo", "v1"}});
- // dummy keys
- ASSERT_OK(db->Put(WriteOptions(), "a", "va"));
- ASSERT_OK(db->Put(WriteOptions(), "z", "vz"));
- verify_iter("v1");
- ASSERT_OK(transaction->Commit());
- VerifyKeys({{"foo", "v2"}});
- verify_iter("v2");
- delete transaction;
- }
- TEST_P(WritePreparedTransactionTest, IteratorRefreshNotSupported) {
- Iterator* iter = db->NewIterator(ReadOptions());
- ASSERT_OK(iter->status());
- ASSERT_TRUE(iter->Refresh().IsNotSupported());
- delete iter;
- }
- // Committing a delayed prepared txn has two non-atomic steps: update the
- // commit cache, remove the seq from delayed_prepared_. The read in
- // IsInSnapshot also involves two non-atomic steps of checking these two data
- // structures. This test breaks each in the middle to ensure correctness in
- // spite of non-atomic execution.
- // Note: This test is limited to the case where the snapshot is larger than
- // max_evicted_seq_.
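- // A rough sketch of the interleavings exercised below (named after the sync
- // points used in this test):
- //   split_read:   the reader pauses inside IsInSnapshot (either before taking
- //                 prepared_mutex_ or right after reading the commit cache),
- //                 the committer finishes Commit(), then the reader resumes.
- //   split commit: the committer pauses inside RemovePrepared, the reader
- //                 performs the whole Get() under the old snapshot, then the
- //                 committer resumes.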
- TEST_P(WritePreparedTransactionTest, NonAtomicCommitOfDelayedPrepared) {
- const size_t snapshot_cache_bits = 7; // same as default
- const size_t commit_cache_bits = 3; // 8 entries
- for (auto split_read : {true, false}) {
- std::vector<bool> split_options = {false};
- if (split_read) {
- // Also test for break before mutex
- split_options.push_back(true);
- }
- for (auto split_before_mutex : split_options) {
- UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
- ASSERT_OK(ReOpen());
- WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
- DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
- // Fill up the commit cache
- std::string init_value("value1");
- for (int i = 0; i < 10; i++) {
- ASSERT_OK(db->Put(WriteOptions(), Slice("key1"), Slice(init_value)));
- }
- // Prepare a transaction but do not commit it
- Transaction* txn =
- db->BeginTransaction(WriteOptions(), TransactionOptions());
- ASSERT_OK(txn->SetName("xid"));
- ASSERT_OK(txn->Put(Slice("key1"), Slice("value2")));
- ASSERT_OK(txn->Prepare());
- // Commit a bunch of entries to advance max evicted seq and make the
- // prepared a delayed prepared
- for (int i = 0; i < 10; i++) {
- ASSERT_OK(db->Put(WriteOptions(), Slice("key3"), Slice("value3")));
- }
- // The snapshot should not see the delayed prepared entry
- auto snap = db->GetSnapshot();
- if (split_read) {
- if (split_before_mutex) {
- // split before acquiring prepare_mutex_
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
- {{"WritePreparedTxnDB::IsInSnapshot:prepared_mutex_:pause",
- "AtomicCommitOfDelayedPrepared:Commit:before"},
- {"AtomicCommitOfDelayedPrepared:Commit:after",
- "WritePreparedTxnDB::IsInSnapshot:prepared_mutex_:resume"}});
- } else {
- // split right after reading from the commit cache
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
- {{"WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:pause",
- "AtomicCommitOfDelayedPrepared:Commit:before"},
- {"AtomicCommitOfDelayedPrepared:Commit:after",
- "WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:resume"}});
- }
- } else { // split commit
- // split right before removing from delayed_prepared_
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
- {{"WritePreparedTxnDB::RemovePrepared:pause",
- "AtomicCommitOfDelayedPrepared:Read:before"},
- {"AtomicCommitOfDelayedPrepared:Read:after",
- "WritePreparedTxnDB::RemovePrepared:resume"}});
- }
- SyncPoint::GetInstance()->EnableProcessing();
- ROCKSDB_NAMESPACE::port::Thread commit_thread([&]() {
- TEST_SYNC_POINT("AtomicCommitOfDelayedPrepared:Commit:before");
- ASSERT_OK(txn->Commit());
- if (split_before_mutex) {
- // Do a bunch of inserts to evict the commit entry from the cache. This
- // prevents the 2nd look into the commit cache under prepare_mutex_
- // from seeing the commit entry.
- auto seq = db_impl->TEST_GetLastVisibleSequence();
- size_t tries = 0;
- while (wp_db->max_evicted_seq_ < seq && tries < 50) {
- ASSERT_OK(db->Put(WriteOptions(), Slice("key3"), Slice("value3")));
- tries++;
- };
- ASSERT_LT(tries, 50);
- }
- TEST_SYNC_POINT("AtomicCommitOfDelayedPrepared:Commit:after");
- delete txn;
- });
- ROCKSDB_NAMESPACE::port::Thread read_thread([&]() {
- TEST_SYNC_POINT("AtomicCommitOfDelayedPrepared:Read:before");
- ReadOptions roptions;
- roptions.snapshot = snap;
- PinnableSlice value;
- auto s = db->Get(roptions, db->DefaultColumnFamily(), "key1", &value);
- ASSERT_OK(s);
- // It should not see the commit of the delayed prepared txn
- ASSERT_TRUE(value == init_value);
- TEST_SYNC_POINT("AtomicCommitOfDelayedPrepared:Read:after");
- db->ReleaseSnapshot(snap);
- });
- read_thread.join();
- commit_thread.join();
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
- } // for split_before_mutex
- } // for split_read
- }
- // When max evicted seq advances past a prepared seq, it involves two updates:
- // i) adding the prepared seq to delayed_prepared_, ii) updating
- // max_evicted_seq_. ::IsInSnapshot also reads these two values in a
- // non-atomic way. This test ensures correctness if the update occurs after
- // ::IsInSnapshot reads delayed_prepared_empty_ and before it reads
- // max_evicted_seq_.
- // Note: this test focuses on a read snapshot larger than max_evicted_seq_.
- TEST_P(WritePreparedTransactionTest, NonAtomicUpdateOfDelayedPrepared) {
- const size_t snapshot_cache_bits = 7; // same as default
- const size_t commit_cache_bits = 3; // 8 entries
- UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
- ASSERT_OK(ReOpen());
- WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
- // Fill up the commit cache
- std::string init_value("value1");
- for (int i = 0; i < 10; i++) {
- ASSERT_OK(db->Put(WriteOptions(), Slice("key1"), Slice(init_value)));
- }
- // Prepare a transaction but do not commit it
- Transaction* txn = db->BeginTransaction(WriteOptions(), TransactionOptions());
- ASSERT_OK(txn->SetName("xid"));
- ASSERT_OK(txn->Put(Slice("key1"), Slice("value2")));
- ASSERT_OK(txn->Prepare());
- // Create a gap between prepare seq and snapshot seq
- ASSERT_OK(db->Put(WriteOptions(), Slice("key3"), Slice("value3")));
- ASSERT_OK(db->Put(WriteOptions(), Slice("key3"), Slice("value3")));
- // The snapshot should not see the delayed prepared entry
- auto snap = db->GetSnapshot();
- ASSERT_LT(txn->GetId(), snap->GetSequenceNumber());
- // split right after reading delayed_prepared_empty_
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
- {{"WritePreparedTxnDB::IsInSnapshot:delayed_prepared_empty_:pause",
- "AtomicUpdateOfDelayedPrepared:before"},
- {"AtomicUpdateOfDelayedPrepared:after",
- "WritePreparedTxnDB::IsInSnapshot:delayed_prepared_empty_:resume"}});
- SyncPoint::GetInstance()->EnableProcessing();
- ROCKSDB_NAMESPACE::port::Thread commit_thread([&]() {
- TEST_SYNC_POINT("AtomicUpdateOfDelayedPrepared:before");
- // Commit a bunch of entries to advance max evicted seq and make the
- // prepared a delayed prepared
- size_t tries = 0;
- while (wp_db->max_evicted_seq_ < txn->GetId() && tries < 50) {
- ASSERT_OK(db->Put(WriteOptions(), Slice("key3"), Slice("value3")));
- tries++;
- };
- ASSERT_LT(tries, 50);
- // This is the case on which the test focuses
- ASSERT_LT(wp_db->max_evicted_seq_, snap->GetSequenceNumber());
- TEST_SYNC_POINT("AtomicUpdateOfDelayedPrepared:after");
- });
- ROCKSDB_NAMESPACE::port::Thread read_thread([&]() {
- ReadOptions roptions;
- roptions.snapshot = snap;
- PinnableSlice value;
- auto s = db->Get(roptions, db->DefaultColumnFamily(), "key1", &value);
- ASSERT_OK(s);
- // It should not see the uncommitted value of the delayed prepared txn
- ASSERT_TRUE(value == init_value);
- db->ReleaseSnapshot(snap);
- });
- read_thread.join();
- commit_thread.join();
- ASSERT_OK(txn->Commit());
- delete txn;
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
- }
- // Eviction from the commit cache and the update of max evicted seq are two
- // non-atomic steps. Similarly, the read of max_evicted_seq_ in ::IsInSnapshot
- // and the read from the commit cache are two non-atomic steps. This test
- // covers the case where the update occurs after reading max_evicted_seq_ and
- // before reading the commit cache.
- // Note: the test focuses on a snapshot larger than max_evicted_seq_.
- TEST_P(WritePreparedTransactionTest, NonAtomicUpdateOfMaxEvictedSeq) {
- const size_t snapshot_cache_bits = 7; // same as default
- const size_t commit_cache_bits = 3; // 8 entries
- UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
- ASSERT_OK(ReOpen());
- WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
- // Fill up the commit cache
- std::string init_value("value1");
- std::string last_value("value_final");
- for (int i = 0; i < 10; i++) {
- ASSERT_OK(db->Put(WriteOptions(), Slice("key1"), Slice(init_value)));
- }
- // Do an uncommitted write to prevent min_uncommitted optimization
- Transaction* txn1 =
- db->BeginTransaction(WriteOptions(), TransactionOptions());
- ASSERT_OK(txn1->SetName("xid1"));
- ASSERT_OK(txn1->Put(Slice("key0"), last_value));
- ASSERT_OK(txn1->Prepare());
- // Do a write with prepare to get the prepare seq
- Transaction* txn = db->BeginTransaction(WriteOptions(), TransactionOptions());
- ASSERT_OK(txn->SetName("xid"));
- ASSERT_OK(txn->Put(Slice("key1"), last_value));
- ASSERT_OK(txn->Prepare());
- ASSERT_OK(txn->Commit());
- // Create a gap between commit entry and snapshot seq
- ASSERT_OK(db->Put(WriteOptions(), Slice("key3"), Slice("value3")));
- ASSERT_OK(db->Put(WriteOptions(), Slice("key3"), Slice("value3")));
- // The snapshot should see the last commit
- auto snap = db->GetSnapshot();
- ASSERT_LE(txn->GetId(), snap->GetSequenceNumber());
- // split right after reading max_evicted_seq_
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
- {{"WritePreparedTxnDB::IsInSnapshot:max_evicted_seq_:pause",
- "NonAtomicUpdateOfMaxEvictedSeq:before"},
- {"NonAtomicUpdateOfMaxEvictedSeq:after",
- "WritePreparedTxnDB::IsInSnapshot:max_evicted_seq_:resume"}});
- SyncPoint::GetInstance()->EnableProcessing();
- ROCKSDB_NAMESPACE::port::Thread commit_thread([&]() {
- TEST_SYNC_POINT("NonAtomicUpdateOfMaxEvictedSeq:before");
- // Commit a bunch of entries to advance max evicted seq beyond txn->GetId()
- size_t tries = 0;
- while (wp_db->max_evicted_seq_ < txn->GetId() && tries < 50) {
- ASSERT_OK(db->Put(WriteOptions(), Slice("key3"), Slice("value3")));
- tries++;
- };
- ASSERT_LT(tries, 50);
- // This is the case on which the test focuses
- ASSERT_LT(wp_db->max_evicted_seq_, snap->GetSequenceNumber());
- TEST_SYNC_POINT("NonAtomicUpdateOfMaxEvictedSeq:after");
- });
- ROCKSDB_NAMESPACE::port::Thread read_thread([&]() {
- ReadOptions roptions;
- roptions.snapshot = snap;
- PinnableSlice value;
- auto s = db->Get(roptions, db->DefaultColumnFamily(), "key1", &value);
- ASSERT_OK(s);
- // It should see the committed value of the evicted entry
- ASSERT_TRUE(value == last_value);
- db->ReleaseSnapshot(snap);
- });
- read_thread.join();
- commit_thread.join();
- delete txn;
- ASSERT_OK(txn1->Commit());
- delete txn1;
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
- }
- // Test adding a prepared seq when max_evicted_seq_ has already advanced past
- // it. The test focuses on a race condition between the AddPrepared and
- // AdvanceMaxEvictedSeq functions.
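- // A rough sketch of the interleaving enforced by the sync points below:
- //   t1: starts Prepare() and pauses inside AddPrepared
- //   t2: commits txn1/txn2; the 2nd commit evicts an entry, starts
- //       AdvanceMaxEvictedSeq, and pauses before updating max_evicted_seq_
- //   t1: finishes AddPrepared (the prepared seq is now below the new max)
- //   t2: publishes the new max_evicted_seq_
- // A snapshot taken afterwards must still treat the prepared write as
- // uncommitted.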
- TEST_P(WritePreparedTransactionTest, AddPreparedBeforeMax) {
- if (!options.two_write_queues) {
- // This test is only for two write queues
- return;
- }
- const size_t snapshot_cache_bits = 7; // same as default
- // 1 entry to advance max after the 2nd commit
- const size_t commit_cache_bits = 0;
- UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
- ASSERT_OK(ReOpen());
- WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
- std::string some_value("value_some");
- std::string uncommitted_value("value_uncommitted");
- // Prepare two uncommitted transactions
- Transaction* txn1 =
- db->BeginTransaction(WriteOptions(), TransactionOptions());
- ASSERT_OK(txn1->SetName("xid1"));
- ASSERT_OK(txn1->Put(Slice("key1"), some_value));
- ASSERT_OK(txn1->Prepare());
- Transaction* txn2 =
- db->BeginTransaction(WriteOptions(), TransactionOptions());
- ASSERT_OK(txn2->SetName("xid2"));
- ASSERT_OK(txn2->Put(Slice("key2"), some_value));
- ASSERT_OK(txn2->Prepare());
- // Start the txn here so the other thread could get its id
- Transaction* txn = db->BeginTransaction(WriteOptions(), TransactionOptions());
- ASSERT_OK(txn->SetName("xid"));
- ASSERT_OK(txn->Put(Slice("key0"), uncommitted_value));
- port::Mutex txn_mutex_;
- // t1) Insert prepared entry, t2) commit other entries to advance max
- // evicted seq and finish checking the existing prepared entries, t1)
- // AddPrepared, t2) update max_evicted_seq_
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
- {"AddPreparedCallback::AddPrepared::begin:pause",
- "AddPreparedBeforeMax::read_thread:start"},
- {"AdvanceMaxEvictedSeq::update_max:pause",
- "AddPreparedCallback::AddPrepared::begin:resume"},
- {"AddPreparedCallback::AddPrepared::end",
- "AdvanceMaxEvictedSeq::update_max:resume"},
- });
- SyncPoint::GetInstance()->EnableProcessing();
- ROCKSDB_NAMESPACE::port::Thread write_thread([&]() {
- txn_mutex_.Lock();
- ASSERT_OK(txn->Prepare());
- txn_mutex_.Unlock();
- });
- ROCKSDB_NAMESPACE::port::Thread read_thread([&]() {
- TEST_SYNC_POINT("AddPreparedBeforeMax::read_thread:start");
- // Publish seq number with a commit
- ASSERT_OK(txn1->Commit());
- // Since the commit cache size is one, the 2nd commit evicts the 1st one and
- // invokes AdvanceMaxEvictedSeq
- ASSERT_OK(txn2->Commit());
- ReadOptions roptions;
- PinnableSlice value;
- // The snapshot should not see the uncommitted value from write_thread
- auto snap = db->GetSnapshot();
- ASSERT_LT(wp_db->max_evicted_seq_, snap->GetSequenceNumber());
- // This is the scenario that we test for
- txn_mutex_.Lock();
- ASSERT_GT(wp_db->max_evicted_seq_, txn->GetId());
- txn_mutex_.Unlock();
- roptions.snapshot = snap;
- auto s = db->Get(roptions, db->DefaultColumnFamily(), "key0", &value);
- ASSERT_TRUE(s.IsNotFound());
- db->ReleaseSnapshot(snap);
- });
- read_thread.join();
- write_thread.join();
- delete txn1;
- delete txn2;
- ASSERT_OK(txn->Commit());
- delete txn;
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
- }
- // When an old prepared entry gets committed, there is a gap between the time
- // that it is published and when it is cleaned up from delayed_prepared_. This
- // test stresses such cases.
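- // Roughly, the stress below works as follows: right after a prepared txn with
- // the expected prepare seq commits (and publishes), the RemovePrepared:Start
- // callback spawns a thread that takes a snapshot and reads "key2" before
- // delayed_prepared_ is cleaned up; the write thread then re-reads with that
- // snapshot and expects to see the same value.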
- TEST_P(WritePreparedTransactionTest, CommitOfDelayedPrepared) {
- const size_t snapshot_cache_bits = 7; // same as default
- for (const size_t commit_cache_bits : {0, 2, 3}) {
- for (const size_t sub_batch_cnt : {1, 2, 3}) {
- UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
- ASSERT_OK(ReOpen());
- std::atomic<const Snapshot*> snap = {nullptr};
- std::atomic<SequenceNumber> exp_prepare = {0};
- ROCKSDB_NAMESPACE::port::Thread callback_thread;
- // Value is synchronized via snap
- PinnableSlice value;
- // Take a snapshot after publish and before RemovePrepared:Start
- auto snap_callback = [&]() {
- ASSERT_EQ(nullptr, snap.load());
- snap.store(db->GetSnapshot());
- ReadOptions roptions;
- roptions.snapshot = snap.load();
- auto s = db->Get(roptions, db->DefaultColumnFamily(), "key2", &value);
- ASSERT_OK(s);
- };
- auto callback = [&](void* param) {
- SequenceNumber prep_seq = *((SequenceNumber*)param);
- if (prep_seq == exp_prepare.load()) { // only for write_thread
- // We need to spawn a thread to avoid a deadlock since getting a
- // snapshot might end up calling AdvanceSeqByOne, which requires joining
- // the write queue.
- callback_thread = ROCKSDB_NAMESPACE::port::Thread(snap_callback);
- TEST_SYNC_POINT("callback:end");
- }
- };
- // Wait for the first snapshot to be taken in GetSnapshotInternal. Although
- // it might be updated before GetSnapshotInternal finishes, this should
- // cover most of the cases.
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
- {"WritePreparedTxnDB::GetSnapshotInternal:first", "callback:end"},
- });
- SyncPoint::GetInstance()->SetCallBack("RemovePrepared:Start", callback);
- SyncPoint::GetInstance()->EnableProcessing();
- // Thread to cause frequent evictions
- ROCKSDB_NAMESPACE::port::Thread eviction_thread([&]() {
- // Too many txns might cause commit_seq - prepare_seq in another thread
- // to go beyond DELTA_UPPERBOUND
- for (int i = 0; i < 25 * (1 << commit_cache_bits); i++) {
- ASSERT_OK(db->Put(WriteOptions(), Slice("key1"), Slice("value1")));
- }
- });
- ROCKSDB_NAMESPACE::port::Thread write_thread([&]() {
- for (int i = 0; i < 25 * (1 << commit_cache_bits); i++) {
- Transaction* txn =
- db->BeginTransaction(WriteOptions(), TransactionOptions());
- ASSERT_OK(txn->SetName("xid"));
- std::string val_str = "value" + std::to_string(i);
- for (size_t b = 0; b < sub_batch_cnt; b++) {
- ASSERT_OK(txn->Put(Slice("key2"), val_str));
- }
- ASSERT_OK(txn->Prepare());
- // Let an eviction kick in
- std::this_thread::yield();
- exp_prepare.store(txn->GetId());
- ASSERT_OK(txn->Commit());
- delete txn;
- // Wait for the snapshot taking that is triggered by
- // RemovePrepared:Start callback
- callback_thread.join();
- // Read with the snapshot taken before delayed_prepared_ cleanup
- ReadOptions roptions;
- roptions.snapshot = snap.load();
- ASSERT_NE(nullptr, roptions.snapshot);
- PinnableSlice value2;
- auto s =
- db->Get(roptions, db->DefaultColumnFamily(), "key2", &value2);
- ASSERT_OK(s);
- // It should see its own write
- ASSERT_TRUE(val_str == value2);
- // The value read by snapshot should not change
- ASSERT_STREQ(value2.ToString().c_str(), value.ToString().c_str());
- db->ReleaseSnapshot(roptions.snapshot);
- snap.store(nullptr);
- }
- });
- write_thread.join();
- eviction_thread.join();
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
- }
- }
- }
- // Test that updating the commit map will not affect the existing snapshots
- TEST_P(WritePreparedTransactionTest, AtomicCommit) {
- for (bool skip_prepare : {true, false}) {
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
- {"WritePreparedTxnDB::AddCommitted:start",
- "AtomicCommit::GetSnapshot:start"},
- {"AtomicCommit::Get:end",
- "WritePreparedTxnDB::AddCommitted:start:pause"},
- {"WritePreparedTxnDB::AddCommitted:end", "AtomicCommit::Get2:start"},
- {"AtomicCommit::Get2:end",
- "WritePreparedTxnDB::AddCommitted:end:pause:"},
- });
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
- ROCKSDB_NAMESPACE::port::Thread write_thread([&]() {
- if (skip_prepare) {
- ASSERT_OK(db->Put(WriteOptions(), Slice("key"), Slice("value")));
- } else {
- Transaction* txn =
- db->BeginTransaction(WriteOptions(), TransactionOptions());
- ASSERT_OK(txn->SetName("xid"));
- ASSERT_OK(txn->Put(Slice("key"), Slice("value")));
- ASSERT_OK(txn->Prepare());
- ASSERT_OK(txn->Commit());
- delete txn;
- }
- });
- ROCKSDB_NAMESPACE::port::Thread read_thread([&]() {
- ReadOptions roptions;
- TEST_SYNC_POINT("AtomicCommit::GetSnapshot:start");
- roptions.snapshot = db->GetSnapshot();
- PinnableSlice val;
- auto s = db->Get(roptions, db->DefaultColumnFamily(), "key", &val);
- TEST_SYNC_POINT("AtomicCommit::Get:end");
- TEST_SYNC_POINT("AtomicCommit::Get2:start");
- ASSERT_SAME(roptions, db, s, val, "key");
- TEST_SYNC_POINT("AtomicCommit::Get2:end");
- db->ReleaseSnapshot(roptions.snapshot);
- });
- read_thread.join();
- write_thread.join();
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
- }
- }
- TEST_P(WritePreparedTransactionTest, BasicRollbackDeletionTypeCb) {
- options.level0_file_num_compaction_trigger = 2;
- // Always use SingleDelete to roll back Put.
- txn_db_options.rollback_deletion_type_callback =
- [](TransactionDB*, ColumnFamilyHandle*, const Slice&) { return true; };
- const auto write_to_db = [&]() {
- assert(db);
- std::unique_ptr<Transaction> txn0(
- db->BeginTransaction(WriteOptions(), TransactionOptions()));
- ASSERT_OK(txn0->SetName("txn0"));
- ASSERT_OK(txn0->Put("a", "v0"));
- ASSERT_OK(txn0->Prepare());
- // Generate sst1: [PUT('a')]
- ASSERT_OK(db->Flush(FlushOptions()));
- {
- CompactRangeOptions cro;
- cro.change_level = true;
- cro.target_level = options.num_levels - 1;
- cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
- ASSERT_OK(db->CompactRange(cro, /*begin=*/nullptr, /*end=*/nullptr));
- }
- ASSERT_OK(txn0->Rollback());
- txn0.reset();
- ASSERT_OK(db->Put(WriteOptions(), "a", "v1"));
- ASSERT_OK(db->SingleDelete(WriteOptions(), "a"));
- // Generate another SST with a SD to cover the oldest PUT('a')
- ASSERT_OK(db->Flush(FlushOptions()));
- auto* dbimpl = static_cast_with_check<DBImpl>(db->GetRootDB());
- assert(dbimpl);
- ASSERT_OK(dbimpl->TEST_WaitForCompact());
- {
- CompactRangeOptions cro;
- cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
- ASSERT_OK(db->CompactRange(cro, /*begin=*/nullptr, /*end=*/nullptr));
- }
- {
- std::string value;
- const Status s = db->Get(ReadOptions(), "a", &value);
- ASSERT_TRUE(s.IsNotFound());
- }
- };
- // Destroy and reopen
- ASSERT_OK(ReOpen());
- write_to_db();
- }
- // Test that we can change write policy from WriteCommitted to WritePrepared
- // after a clean shutdown (which would empty the WAL)
- TEST_P(WritePreparedTransactionTest, WP_WC_DBBackwardCompatibility) {
- bool empty_wal = true;
- CrossCompatibilityTest(WRITE_COMMITTED, WRITE_PREPARED, empty_wal);
- }
- // Test that we fail fast if WAL is not emptied between changing the write
- // policy from WriteCommitted to WritePrepared
- TEST_P(WritePreparedTransactionTest, WP_WC_WALBackwardIncompatibility) {
- bool empty_wal = true;
- CrossCompatibilityTest(WRITE_COMMITTED, WRITE_PREPARED, !empty_wal);
- }
- // Test that we can change write policy from WritePrepared back to
- // WriteCommitted after a clean shutdown (which would empty the WAL)
- TEST_P(WritePreparedTransactionTest, WC_WP_ForwardCompatibility) {
- bool empty_wal = true;
- CrossCompatibilityTest(WRITE_PREPARED, WRITE_COMMITTED, empty_wal);
- }
- // Test that we fail fast if WAL is not emptied between changing the write
- // policy from WritePrepared to WriteCommitted
- TEST_P(WritePreparedTransactionTest, WC_WP_WALForwardIncompatibility) {
- bool empty_wal = true;
- CrossCompatibilityTest(WRITE_PREPARED, WRITE_COMMITTED, !empty_wal);
- }
- } // namespace ROCKSDB_NAMESPACE
- int main(int argc, char** argv) {
- ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
- ::testing::InitGoogleTest(&argc, argv);
- if (getenv("CIRCLECI") || getenv("GITHUB_ACTIONS")) {
- // Looking for backtrace on "Resource temporarily unavailable" exceptions
- ::testing::FLAGS_gtest_catch_exceptions = false;
- }
- return RUN_ALL_TESTS();
- }
|