| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761
77717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762
2772278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545 |
- // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
- // This source code is licensed under both the GPLv2 (found in the
- // COPYING file in the root directory) and Apache 2.0 License
- // (found in the LICENSE.Apache file in the root directory).
- //
- // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
- // Use of this source code is governed by a BSD-style license that can be
- // found in the LICENSE file. See the AUTHORS file for names of contributors.
- #include "db/db_test_util.h"
- #include "port/stack_trace.h"
- #include "rocksdb/perf_context.h"
- #include "rocksdb/utilities/debug.h"
- #include "table/block_based/block_based_table_reader.h"
- #include "table/block_based/block_builder.h"
- #include "test_util/fault_injection_test_env.h"
- #if !defined(ROCKSDB_LITE)
- #include "test_util/sync_point.h"
- #endif
- namespace ROCKSDB_NAMESPACE {
- class DBBasicTest : public DBTestBase {
- public:
- DBBasicTest() : DBTestBase("/db_basic_test") {}
- };
- TEST_F(DBBasicTest, OpenWhenOpen) {
- Options options = CurrentOptions();
- options.env = env_;
- ROCKSDB_NAMESPACE::DB* db2 = nullptr;
- ROCKSDB_NAMESPACE::Status s = DB::Open(options, dbname_, &db2);
- ASSERT_EQ(Status::Code::kIOError, s.code());
- ASSERT_EQ(Status::SubCode::kNone, s.subcode());
- ASSERT_TRUE(strstr(s.getState(), "lock ") != nullptr);
- delete db2;
- }
- #ifndef ROCKSDB_LITE
- TEST_F(DBBasicTest, ReadOnlyDB) {
- ASSERT_OK(Put("foo", "v1"));
- ASSERT_OK(Put("bar", "v2"));
- ASSERT_OK(Put("foo", "v3"));
- Close();
- auto options = CurrentOptions();
- assert(options.env == env_);
- ASSERT_OK(ReadOnlyReopen(options));
- ASSERT_EQ("v3", Get("foo"));
- ASSERT_EQ("v2", Get("bar"));
- Iterator* iter = db_->NewIterator(ReadOptions());
- int count = 0;
- for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
- ASSERT_OK(iter->status());
- ++count;
- }
- ASSERT_EQ(count, 2);
- delete iter;
- Close();
- // Reopen and flush memtable.
- Reopen(options);
- Flush();
- Close();
- // Now check keys in read only mode.
- ASSERT_OK(ReadOnlyReopen(options));
- ASSERT_EQ("v3", Get("foo"));
- ASSERT_EQ("v2", Get("bar"));
- ASSERT_TRUE(db_->SyncWAL().IsNotSupported());
- }
- TEST_F(DBBasicTest, ReadOnlyDBWithWriteDBIdToManifestSet) {
- ASSERT_OK(Put("foo", "v1"));
- ASSERT_OK(Put("bar", "v2"));
- ASSERT_OK(Put("foo", "v3"));
- Close();
- auto options = CurrentOptions();
- options.write_dbid_to_manifest = true;
- assert(options.env == env_);
- ASSERT_OK(ReadOnlyReopen(options));
- std::string db_id1;
- db_->GetDbIdentity(db_id1);
- ASSERT_EQ("v3", Get("foo"));
- ASSERT_EQ("v2", Get("bar"));
- Iterator* iter = db_->NewIterator(ReadOptions());
- int count = 0;
- for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
- ASSERT_OK(iter->status());
- ++count;
- }
- ASSERT_EQ(count, 2);
- delete iter;
- Close();
- // Reopen and flush memtable.
- Reopen(options);
- Flush();
- Close();
- // Now check keys in read only mode.
- ASSERT_OK(ReadOnlyReopen(options));
- ASSERT_EQ("v3", Get("foo"));
- ASSERT_EQ("v2", Get("bar"));
- ASSERT_TRUE(db_->SyncWAL().IsNotSupported());
- std::string db_id2;
- db_->GetDbIdentity(db_id2);
- ASSERT_EQ(db_id1, db_id2);
- }
- TEST_F(DBBasicTest, CompactedDB) {
- const uint64_t kFileSize = 1 << 20;
- Options options = CurrentOptions();
- options.disable_auto_compactions = true;
- options.write_buffer_size = kFileSize;
- options.target_file_size_base = kFileSize;
- options.max_bytes_for_level_base = 1 << 30;
- options.compression = kNoCompression;
- Reopen(options);
- // 1 L0 file, use CompactedDB if max_open_files = -1
- ASSERT_OK(Put("aaa", DummyString(kFileSize / 2, '1')));
- Flush();
- Close();
- ASSERT_OK(ReadOnlyReopen(options));
- Status s = Put("new", "value");
- ASSERT_EQ(s.ToString(),
- "Not implemented: Not supported operation in read only mode.");
- ASSERT_EQ(DummyString(kFileSize / 2, '1'), Get("aaa"));
- Close();
- options.max_open_files = -1;
- ASSERT_OK(ReadOnlyReopen(options));
- s = Put("new", "value");
- ASSERT_EQ(s.ToString(),
- "Not implemented: Not supported in compacted db mode.");
- ASSERT_EQ(DummyString(kFileSize / 2, '1'), Get("aaa"));
- Close();
- Reopen(options);
- // Add more L0 files
- ASSERT_OK(Put("bbb", DummyString(kFileSize / 2, '2')));
- Flush();
- ASSERT_OK(Put("aaa", DummyString(kFileSize / 2, 'a')));
- Flush();
- ASSERT_OK(Put("bbb", DummyString(kFileSize / 2, 'b')));
- ASSERT_OK(Put("eee", DummyString(kFileSize / 2, 'e')));
- Flush();
- Close();
- ASSERT_OK(ReadOnlyReopen(options));
- // Fallback to read-only DB
- s = Put("new", "value");
- ASSERT_EQ(s.ToString(),
- "Not implemented: Not supported operation in read only mode.");
- Close();
- // Full compaction
- Reopen(options);
- // Add more keys
- ASSERT_OK(Put("fff", DummyString(kFileSize / 2, 'f')));
- ASSERT_OK(Put("hhh", DummyString(kFileSize / 2, 'h')));
- ASSERT_OK(Put("iii", DummyString(kFileSize / 2, 'i')));
- ASSERT_OK(Put("jjj", DummyString(kFileSize / 2, 'j')));
- db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
- ASSERT_EQ(3, NumTableFilesAtLevel(1));
- Close();
- // CompactedDB
- ASSERT_OK(ReadOnlyReopen(options));
- s = Put("new", "value");
- ASSERT_EQ(s.ToString(),
- "Not implemented: Not supported in compacted db mode.");
- ASSERT_EQ("NOT_FOUND", Get("abc"));
- ASSERT_EQ(DummyString(kFileSize / 2, 'a'), Get("aaa"));
- ASSERT_EQ(DummyString(kFileSize / 2, 'b'), Get("bbb"));
- ASSERT_EQ("NOT_FOUND", Get("ccc"));
- ASSERT_EQ(DummyString(kFileSize / 2, 'e'), Get("eee"));
- ASSERT_EQ(DummyString(kFileSize / 2, 'f'), Get("fff"));
- ASSERT_EQ("NOT_FOUND", Get("ggg"));
- ASSERT_EQ(DummyString(kFileSize / 2, 'h'), Get("hhh"));
- ASSERT_EQ(DummyString(kFileSize / 2, 'i'), Get("iii"));
- ASSERT_EQ(DummyString(kFileSize / 2, 'j'), Get("jjj"));
- ASSERT_EQ("NOT_FOUND", Get("kkk"));
- // MultiGet
- std::vector<std::string> values;
- std::vector<Status> status_list = dbfull()->MultiGet(
- ReadOptions(),
- std::vector<Slice>({Slice("aaa"), Slice("ccc"), Slice("eee"),
- Slice("ggg"), Slice("iii"), Slice("kkk")}),
- &values);
- ASSERT_EQ(status_list.size(), static_cast<uint64_t>(6));
- ASSERT_EQ(values.size(), static_cast<uint64_t>(6));
- ASSERT_OK(status_list[0]);
- ASSERT_EQ(DummyString(kFileSize / 2, 'a'), values[0]);
- ASSERT_TRUE(status_list[1].IsNotFound());
- ASSERT_OK(status_list[2]);
- ASSERT_EQ(DummyString(kFileSize / 2, 'e'), values[2]);
- ASSERT_TRUE(status_list[3].IsNotFound());
- ASSERT_OK(status_list[4]);
- ASSERT_EQ(DummyString(kFileSize / 2, 'i'), values[4]);
- ASSERT_TRUE(status_list[5].IsNotFound());
- Reopen(options);
- // Add a key
- ASSERT_OK(Put("fff", DummyString(kFileSize / 2, 'f')));
- Close();
- ASSERT_OK(ReadOnlyReopen(options));
- s = Put("new", "value");
- ASSERT_EQ(s.ToString(),
- "Not implemented: Not supported operation in read only mode.");
- }
- TEST_F(DBBasicTest, LevelLimitReopen) {
- Options options = CurrentOptions();
- CreateAndReopenWithCF({"pikachu"}, options);
- const std::string value(1024 * 1024, ' ');
- int i = 0;
- while (NumTableFilesAtLevel(2, 1) == 0) {
- ASSERT_OK(Put(1, Key(i++), value));
- dbfull()->TEST_WaitForFlushMemTable();
- dbfull()->TEST_WaitForCompact();
- }
- options.num_levels = 1;
- options.max_bytes_for_level_multiplier_additional.resize(1, 1);
- Status s = TryReopenWithColumnFamilies({"default", "pikachu"}, options);
- ASSERT_EQ(s.IsInvalidArgument(), true);
- ASSERT_EQ(s.ToString(),
- "Invalid argument: db has more levels than options.num_levels");
- options.num_levels = 10;
- options.max_bytes_for_level_multiplier_additional.resize(10, 1);
- ASSERT_OK(TryReopenWithColumnFamilies({"default", "pikachu"}, options));
- }
- #endif // ROCKSDB_LITE
- TEST_F(DBBasicTest, PutDeleteGet) {
- do {
- CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
- ASSERT_OK(Put(1, "foo", "v1"));
- ASSERT_EQ("v1", Get(1, "foo"));
- ASSERT_OK(Put(1, "foo", "v2"));
- ASSERT_EQ("v2", Get(1, "foo"));
- ASSERT_OK(Delete(1, "foo"));
- ASSERT_EQ("NOT_FOUND", Get(1, "foo"));
- } while (ChangeOptions());
- }
- TEST_F(DBBasicTest, PutSingleDeleteGet) {
- do {
- CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
- ASSERT_OK(Put(1, "foo", "v1"));
- ASSERT_EQ("v1", Get(1, "foo"));
- ASSERT_OK(Put(1, "foo2", "v2"));
- ASSERT_EQ("v2", Get(1, "foo2"));
- ASSERT_OK(SingleDelete(1, "foo"));
- ASSERT_EQ("NOT_FOUND", Get(1, "foo"));
- // Ski FIFO and universal compaction because they do not apply to the test
- // case. Skip MergePut because single delete does not get removed when it
- // encounters a merge.
- } while (ChangeOptions(kSkipFIFOCompaction | kSkipUniversalCompaction |
- kSkipMergePut));
- }
- TEST_F(DBBasicTest, EmptyFlush) {
- // It is possible to produce empty flushes when using single deletes. Tests
- // whether empty flushes cause issues.
- do {
- Random rnd(301);
- Options options = CurrentOptions();
- options.disable_auto_compactions = true;
- CreateAndReopenWithCF({"pikachu"}, options);
- Put(1, "a", Slice());
- SingleDelete(1, "a");
- ASSERT_OK(Flush(1));
- ASSERT_EQ("[ ]", AllEntriesFor("a", 1));
- // Skip FIFO and universal compaction as they do not apply to the test
- // case. Skip MergePut because merges cannot be combined with single
- // deletions.
- } while (ChangeOptions(kSkipFIFOCompaction | kSkipUniversalCompaction |
- kSkipMergePut));
- }
- TEST_F(DBBasicTest, GetFromVersions) {
- do {
- CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
- ASSERT_OK(Put(1, "foo", "v1"));
- ASSERT_OK(Flush(1));
- ASSERT_EQ("v1", Get(1, "foo"));
- ASSERT_EQ("NOT_FOUND", Get(0, "foo"));
- } while (ChangeOptions());
- }
- #ifndef ROCKSDB_LITE
- TEST_F(DBBasicTest, GetSnapshot) {
- anon::OptionsOverride options_override;
- options_override.skip_policy = kSkipNoSnapshot;
- do {
- CreateAndReopenWithCF({"pikachu"}, CurrentOptions(options_override));
- // Try with both a short key and a long key
- for (int i = 0; i < 2; i++) {
- std::string key = (i == 0) ? std::string("foo") : std::string(200, 'x');
- ASSERT_OK(Put(1, key, "v1"));
- const Snapshot* s1 = db_->GetSnapshot();
- ASSERT_OK(Put(1, key, "v2"));
- ASSERT_EQ("v2", Get(1, key));
- ASSERT_EQ("v1", Get(1, key, s1));
- ASSERT_OK(Flush(1));
- ASSERT_EQ("v2", Get(1, key));
- ASSERT_EQ("v1", Get(1, key, s1));
- db_->ReleaseSnapshot(s1);
- }
- } while (ChangeOptions());
- }
- #endif // ROCKSDB_LITE
- TEST_F(DBBasicTest, CheckLock) {
- do {
- DB* localdb;
- Options options = CurrentOptions();
- ASSERT_OK(TryReopen(options));
- // second open should fail
- ASSERT_TRUE(!(DB::Open(options, dbname_, &localdb)).ok());
- } while (ChangeCompactOptions());
- }
- TEST_F(DBBasicTest, FlushMultipleMemtable) {
- do {
- Options options = CurrentOptions();
- WriteOptions writeOpt = WriteOptions();
- writeOpt.disableWAL = true;
- options.max_write_buffer_number = 4;
- options.min_write_buffer_number_to_merge = 3;
- options.max_write_buffer_size_to_maintain = -1;
- CreateAndReopenWithCF({"pikachu"}, options);
- ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v1"));
- ASSERT_OK(Flush(1));
- ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v1"));
- ASSERT_EQ("v1", Get(1, "foo"));
- ASSERT_EQ("v1", Get(1, "bar"));
- ASSERT_OK(Flush(1));
- } while (ChangeCompactOptions());
- }
- TEST_F(DBBasicTest, FlushEmptyColumnFamily) {
- // Block flush thread and disable compaction thread
- env_->SetBackgroundThreads(1, Env::HIGH);
- env_->SetBackgroundThreads(1, Env::LOW);
- test::SleepingBackgroundTask sleeping_task_low;
- env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
- Env::Priority::LOW);
- test::SleepingBackgroundTask sleeping_task_high;
- env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
- &sleeping_task_high, Env::Priority::HIGH);
- Options options = CurrentOptions();
- // disable compaction
- options.disable_auto_compactions = true;
- WriteOptions writeOpt = WriteOptions();
- writeOpt.disableWAL = true;
- options.max_write_buffer_number = 2;
- options.min_write_buffer_number_to_merge = 1;
- options.max_write_buffer_size_to_maintain =
- static_cast<int64_t>(options.write_buffer_size);
- CreateAndReopenWithCF({"pikachu"}, options);
- // Compaction can still go through even if no thread can flush the
- // mem table.
- ASSERT_OK(Flush(0));
- ASSERT_OK(Flush(1));
- // Insert can go through
- ASSERT_OK(dbfull()->Put(writeOpt, handles_[0], "foo", "v1"));
- ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v1"));
- ASSERT_EQ("v1", Get(0, "foo"));
- ASSERT_EQ("v1", Get(1, "bar"));
- sleeping_task_high.WakeUp();
- sleeping_task_high.WaitUntilDone();
- // Flush can still go through.
- ASSERT_OK(Flush(0));
- ASSERT_OK(Flush(1));
- sleeping_task_low.WakeUp();
- sleeping_task_low.WaitUntilDone();
- }
- TEST_F(DBBasicTest, FLUSH) {
- do {
- CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
- WriteOptions writeOpt = WriteOptions();
- writeOpt.disableWAL = true;
- SetPerfLevel(kEnableTime);
- ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v1"));
- // this will now also flush the last 2 writes
- ASSERT_OK(Flush(1));
- ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v1"));
- get_perf_context()->Reset();
- Get(1, "foo");
- ASSERT_TRUE((int)get_perf_context()->get_from_output_files_time > 0);
- ASSERT_EQ(2, (int)get_perf_context()->get_read_bytes);
- ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
- ASSERT_EQ("v1", Get(1, "foo"));
- ASSERT_EQ("v1", Get(1, "bar"));
- writeOpt.disableWAL = true;
- ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v2"));
- ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v2"));
- ASSERT_OK(Flush(1));
- ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
- ASSERT_EQ("v2", Get(1, "bar"));
- get_perf_context()->Reset();
- ASSERT_EQ("v2", Get(1, "foo"));
- ASSERT_TRUE((int)get_perf_context()->get_from_output_files_time > 0);
- writeOpt.disableWAL = false;
- ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v3"));
- ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v3"));
- ASSERT_OK(Flush(1));
- ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
- // 'foo' should be there because its put
- // has WAL enabled.
- ASSERT_EQ("v3", Get(1, "foo"));
- ASSERT_EQ("v3", Get(1, "bar"));
- SetPerfLevel(kDisable);
- } while (ChangeCompactOptions());
- }
- TEST_F(DBBasicTest, ManifestRollOver) {
- do {
- Options options;
- options.max_manifest_file_size = 10; // 10 bytes
- options = CurrentOptions(options);
- CreateAndReopenWithCF({"pikachu"}, options);
- {
- ASSERT_OK(Put(1, "manifest_key1", std::string(1000, '1')));
- ASSERT_OK(Put(1, "manifest_key2", std::string(1000, '2')));
- ASSERT_OK(Put(1, "manifest_key3", std::string(1000, '3')));
- uint64_t manifest_before_flush = dbfull()->TEST_Current_Manifest_FileNo();
- ASSERT_OK(Flush(1)); // This should trigger LogAndApply.
- uint64_t manifest_after_flush = dbfull()->TEST_Current_Manifest_FileNo();
- ASSERT_GT(manifest_after_flush, manifest_before_flush);
- ReopenWithColumnFamilies({"default", "pikachu"}, options);
- ASSERT_GT(dbfull()->TEST_Current_Manifest_FileNo(), manifest_after_flush);
- // check if a new manifest file got inserted or not.
- ASSERT_EQ(std::string(1000, '1'), Get(1, "manifest_key1"));
- ASSERT_EQ(std::string(1000, '2'), Get(1, "manifest_key2"));
- ASSERT_EQ(std::string(1000, '3'), Get(1, "manifest_key3"));
- }
- } while (ChangeCompactOptions());
- }
- TEST_F(DBBasicTest, IdentityAcrossRestarts1) {
- do {
- std::string id1;
- ASSERT_OK(db_->GetDbIdentity(id1));
- Options options = CurrentOptions();
- Reopen(options);
- std::string id2;
- ASSERT_OK(db_->GetDbIdentity(id2));
- // id1 should match id2 because identity was not regenerated
- ASSERT_EQ(id1.compare(id2), 0);
- std::string idfilename = IdentityFileName(dbname_);
- ASSERT_OK(env_->DeleteFile(idfilename));
- Reopen(options);
- std::string id3;
- ASSERT_OK(db_->GetDbIdentity(id3));
- if (options.write_dbid_to_manifest) {
- ASSERT_EQ(id1.compare(id3), 0);
- } else {
- // id1 should NOT match id3 because identity was regenerated
- ASSERT_NE(id1.compare(id3), 0);
- }
- } while (ChangeCompactOptions());
- }
- TEST_F(DBBasicTest, IdentityAcrossRestarts2) {
- do {
- std::string id1;
- ASSERT_OK(db_->GetDbIdentity(id1));
- Options options = CurrentOptions();
- options.write_dbid_to_manifest = true;
- Reopen(options);
- std::string id2;
- ASSERT_OK(db_->GetDbIdentity(id2));
- // id1 should match id2 because identity was not regenerated
- ASSERT_EQ(id1.compare(id2), 0);
- std::string idfilename = IdentityFileName(dbname_);
- ASSERT_OK(env_->DeleteFile(idfilename));
- Reopen(options);
- std::string id3;
- ASSERT_OK(db_->GetDbIdentity(id3));
- // id1 should NOT match id3 because identity was regenerated
- ASSERT_EQ(id1, id3);
- } while (ChangeCompactOptions());
- }
- #ifndef ROCKSDB_LITE
- TEST_F(DBBasicTest, Snapshot) {
- anon::OptionsOverride options_override;
- options_override.skip_policy = kSkipNoSnapshot;
- do {
- CreateAndReopenWithCF({"pikachu"}, CurrentOptions(options_override));
- Put(0, "foo", "0v1");
- Put(1, "foo", "1v1");
- const Snapshot* s1 = db_->GetSnapshot();
- ASSERT_EQ(1U, GetNumSnapshots());
- uint64_t time_snap1 = GetTimeOldestSnapshots();
- ASSERT_GT(time_snap1, 0U);
- ASSERT_EQ(GetSequenceOldestSnapshots(), s1->GetSequenceNumber());
- Put(0, "foo", "0v2");
- Put(1, "foo", "1v2");
- env_->addon_time_.fetch_add(1);
- const Snapshot* s2 = db_->GetSnapshot();
- ASSERT_EQ(2U, GetNumSnapshots());
- ASSERT_EQ(time_snap1, GetTimeOldestSnapshots());
- ASSERT_EQ(GetSequenceOldestSnapshots(), s1->GetSequenceNumber());
- Put(0, "foo", "0v3");
- Put(1, "foo", "1v3");
- {
- ManagedSnapshot s3(db_);
- ASSERT_EQ(3U, GetNumSnapshots());
- ASSERT_EQ(time_snap1, GetTimeOldestSnapshots());
- ASSERT_EQ(GetSequenceOldestSnapshots(), s1->GetSequenceNumber());
- Put(0, "foo", "0v4");
- Put(1, "foo", "1v4");
- ASSERT_EQ("0v1", Get(0, "foo", s1));
- ASSERT_EQ("1v1", Get(1, "foo", s1));
- ASSERT_EQ("0v2", Get(0, "foo", s2));
- ASSERT_EQ("1v2", Get(1, "foo", s2));
- ASSERT_EQ("0v3", Get(0, "foo", s3.snapshot()));
- ASSERT_EQ("1v3", Get(1, "foo", s3.snapshot()));
- ASSERT_EQ("0v4", Get(0, "foo"));
- ASSERT_EQ("1v4", Get(1, "foo"));
- }
- ASSERT_EQ(2U, GetNumSnapshots());
- ASSERT_EQ(time_snap1, GetTimeOldestSnapshots());
- ASSERT_EQ(GetSequenceOldestSnapshots(), s1->GetSequenceNumber());
- ASSERT_EQ("0v1", Get(0, "foo", s1));
- ASSERT_EQ("1v1", Get(1, "foo", s1));
- ASSERT_EQ("0v2", Get(0, "foo", s2));
- ASSERT_EQ("1v2", Get(1, "foo", s2));
- ASSERT_EQ("0v4", Get(0, "foo"));
- ASSERT_EQ("1v4", Get(1, "foo"));
- db_->ReleaseSnapshot(s1);
- ASSERT_EQ("0v2", Get(0, "foo", s2));
- ASSERT_EQ("1v2", Get(1, "foo", s2));
- ASSERT_EQ("0v4", Get(0, "foo"));
- ASSERT_EQ("1v4", Get(1, "foo"));
- ASSERT_EQ(1U, GetNumSnapshots());
- ASSERT_LT(time_snap1, GetTimeOldestSnapshots());
- ASSERT_EQ(GetSequenceOldestSnapshots(), s2->GetSequenceNumber());
- db_->ReleaseSnapshot(s2);
- ASSERT_EQ(0U, GetNumSnapshots());
- ASSERT_EQ(GetSequenceOldestSnapshots(), 0);
- ASSERT_EQ("0v4", Get(0, "foo"));
- ASSERT_EQ("1v4", Get(1, "foo"));
- } while (ChangeOptions());
- }
- #endif // ROCKSDB_LITE
- TEST_F(DBBasicTest, CompactBetweenSnapshots) {
- anon::OptionsOverride options_override;
- options_override.skip_policy = kSkipNoSnapshot;
- do {
- Options options = CurrentOptions(options_override);
- options.disable_auto_compactions = true;
- CreateAndReopenWithCF({"pikachu"}, options);
- Random rnd(301);
- FillLevels("a", "z", 1);
- Put(1, "foo", "first");
- const Snapshot* snapshot1 = db_->GetSnapshot();
- Put(1, "foo", "second");
- Put(1, "foo", "third");
- Put(1, "foo", "fourth");
- const Snapshot* snapshot2 = db_->GetSnapshot();
- Put(1, "foo", "fifth");
- Put(1, "foo", "sixth");
- // All entries (including duplicates) exist
- // before any compaction or flush is triggered.
- ASSERT_EQ(AllEntriesFor("foo", 1),
- "[ sixth, fifth, fourth, third, second, first ]");
- ASSERT_EQ("sixth", Get(1, "foo"));
- ASSERT_EQ("fourth", Get(1, "foo", snapshot2));
- ASSERT_EQ("first", Get(1, "foo", snapshot1));
- // After a flush, "second", "third" and "fifth" should
- // be removed
- ASSERT_OK(Flush(1));
- ASSERT_EQ(AllEntriesFor("foo", 1), "[ sixth, fourth, first ]");
- // after we release the snapshot1, only two values left
- db_->ReleaseSnapshot(snapshot1);
- FillLevels("a", "z", 1);
- dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
- nullptr);
- // We have only one valid snapshot snapshot2. Since snapshot1 is
- // not valid anymore, "first" should be removed by a compaction.
- ASSERT_EQ("sixth", Get(1, "foo"));
- ASSERT_EQ("fourth", Get(1, "foo", snapshot2));
- ASSERT_EQ(AllEntriesFor("foo", 1), "[ sixth, fourth ]");
- // after we release the snapshot2, only one value should be left
- db_->ReleaseSnapshot(snapshot2);
- FillLevels("a", "z", 1);
- dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
- nullptr);
- ASSERT_EQ("sixth", Get(1, "foo"));
- ASSERT_EQ(AllEntriesFor("foo", 1), "[ sixth ]");
- } while (ChangeOptions(kSkipFIFOCompaction));
- }
- TEST_F(DBBasicTest, DBOpen_Options) {
- Options options = CurrentOptions();
- Close();
- Destroy(options);
- // Does not exist, and create_if_missing == false: error
- DB* db = nullptr;
- options.create_if_missing = false;
- Status s = DB::Open(options, dbname_, &db);
- ASSERT_TRUE(strstr(s.ToString().c_str(), "does not exist") != nullptr);
- ASSERT_TRUE(db == nullptr);
- // Does not exist, and create_if_missing == true: OK
- options.create_if_missing = true;
- s = DB::Open(options, dbname_, &db);
- ASSERT_OK(s);
- ASSERT_TRUE(db != nullptr);
- delete db;
- db = nullptr;
- // Does exist, and error_if_exists == true: error
- options.create_if_missing = false;
- options.error_if_exists = true;
- s = DB::Open(options, dbname_, &db);
- ASSERT_TRUE(strstr(s.ToString().c_str(), "exists") != nullptr);
- ASSERT_TRUE(db == nullptr);
- // Does exist, and error_if_exists == false: OK
- options.create_if_missing = true;
- options.error_if_exists = false;
- s = DB::Open(options, dbname_, &db);
- ASSERT_OK(s);
- ASSERT_TRUE(db != nullptr);
- delete db;
- db = nullptr;
- }
- // Verifies which entries for a key survive a memtable flush and a subsequent
- // manual compaction, for several Put/Delete orderings and with snapshots.
- // Every write, flush and compaction status is asserted so that a failing
- // operation is reported at its source instead of as a later mismatch.
- TEST_F(DBBasicTest, CompactOnFlush) {
-   anon::OptionsOverride options_override;
-   options_override.skip_policy = kSkipNoSnapshot;
-   do {
-     Options options = CurrentOptions(options_override);
-     options.disable_auto_compactions = true;
-     CreateAndReopenWithCF({"pikachu"}, options);
-     ASSERT_OK(Put(1, "foo", "v1"));
-     ASSERT_OK(Flush(1));
-     ASSERT_EQ(AllEntriesFor("foo", 1), "[ v1 ]");
-     // Write two new keys
-     ASSERT_OK(Put(1, "a", "begin"));
-     ASSERT_OK(Put(1, "z", "end"));
-     ASSERT_OK(Flush(1));
-     // Case 1: Delete followed by a put
-     ASSERT_OK(Delete(1, "foo"));
-     ASSERT_OK(Put(1, "foo", "v2"));
-     ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2, DEL, v1 ]");
-     // After the current memtable is flushed, the DEL should
-     // have been removed
-     ASSERT_OK(Flush(1));
-     ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2, v1 ]");
-     ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1],
-                                      nullptr, nullptr));
-     ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2 ]");
-     // Case 2: Delete followed by another delete
-     ASSERT_OK(Delete(1, "foo"));
-     ASSERT_OK(Delete(1, "foo"));
-     ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, DEL, v2 ]");
-     ASSERT_OK(Flush(1));
-     ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, v2 ]");
-     ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1],
-                                      nullptr, nullptr));
-     ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]");
-     // Case 3: Put followed by a delete
-     ASSERT_OK(Put(1, "foo", "v3"));
-     ASSERT_OK(Delete(1, "foo"));
-     ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, v3 ]");
-     ASSERT_OK(Flush(1));
-     ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL ]");
-     ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1],
-                                      nullptr, nullptr));
-     ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]");
-     // Case 4: Put followed by another Put
-     ASSERT_OK(Put(1, "foo", "v4"));
-     ASSERT_OK(Put(1, "foo", "v5"));
-     ASSERT_EQ(AllEntriesFor("foo", 1), "[ v5, v4 ]");
-     ASSERT_OK(Flush(1));
-     ASSERT_EQ(AllEntriesFor("foo", 1), "[ v5 ]");
-     ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1],
-                                      nullptr, nullptr));
-     ASSERT_EQ(AllEntriesFor("foo", 1), "[ v5 ]");
-     // clear database
-     ASSERT_OK(Delete(1, "foo"));
-     ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1],
-                                      nullptr, nullptr));
-     ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]");
-     // Case 5: Put followed by snapshot followed by another Put
-     // Both puts should remain.
-     ASSERT_OK(Put(1, "foo", "v6"));
-     const Snapshot* snapshot = db_->GetSnapshot();
-     ASSERT_OK(Put(1, "foo", "v7"));
-     ASSERT_OK(Flush(1));
-     ASSERT_EQ(AllEntriesFor("foo", 1), "[ v7, v6 ]");
-     db_->ReleaseSnapshot(snapshot);
-     // clear database
-     ASSERT_OK(Delete(1, "foo"));
-     ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1],
-                                      nullptr, nullptr));
-     ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]");
-     // Case 6: snapshot followed by a put followed by another Put
-     // Only the last put should remain.
-     const Snapshot* snapshot1 = db_->GetSnapshot();
-     ASSERT_OK(Put(1, "foo", "v8"));
-     ASSERT_OK(Put(1, "foo", "v9"));
-     ASSERT_OK(Flush(1));
-     ASSERT_EQ(AllEntriesFor("foo", 1), "[ v9 ]");
-     db_->ReleaseSnapshot(snapshot1);
-   } while (ChangeCompactOptions());
- }
- // Writes one key into each of eight column families, then flushes them one
- // at a time and checks that each flush adds exactly one table file.
- TEST_F(DBBasicTest, FlushOneColumnFamily) {
-   Options options = CurrentOptions();
-   CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich",
-                          "alyosha", "popovich"},
-                         options);
-   ASSERT_OK(Put(0, "Default", "Default"));
-   ASSERT_OK(Put(1, "pikachu", "pikachu"));
-   ASSERT_OK(Put(2, "ilya", "ilya"));
-   ASSERT_OK(Put(3, "muromec", "muromec"));
-   ASSERT_OK(Put(4, "dobrynia", "dobrynia"));
-   ASSERT_OK(Put(5, "nikitich", "nikitich"));
-   ASSERT_OK(Put(6, "alyosha", "alyosha"));
-   ASSERT_OK(Put(7, "popovich", "popovich"));
-   for (int i = 0; i < 8; ++i) {
-     // Assert the flush status so a failed flush is reported directly rather
-     // than as a confusing table-count mismatch below.
-     ASSERT_OK(Flush(i));
-     auto tables = ListTableFiles(env_, dbname_);
-     ASSERT_EQ(tables.size(), i + 1U);
-   }
- }
- // Looks up a mix of live, deleted and never-written keys with the
- // vector-based MultiGet and checks values, statuses and the
- // multiget_read_bytes perf counter.
- TEST_F(DBBasicTest, MultiGetSimple) {
-   do {
-     CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
-     SetPerfLevel(kEnableCount);
-     // k1..k4 are written first; k4 is deleted right afterwards.
-     for (int n = 1; n <= 4; ++n) {
-       ASSERT_OK(Put(1, "k" + std::to_string(n), "v" + std::to_string(n)));
-     }
-     ASSERT_OK(Delete(1, "k4"));
-     ASSERT_OK(Put(1, "k5", "v5"));
-     ASSERT_OK(Delete(1, "no_key"));
-     std::vector<Slice> keys({"k1", "k2", "k3", "k4", "k5", "no_key"});
-     // Deliberately over-sized and pre-filled; MultiGet is expected to leave
-     // exactly one entry per key.
-     std::vector<std::string> values(20, "Temporary data to be overwritten");
-     std::vector<ColumnFamilyHandle*> cfs(keys.size(), handles_[1]);
-     get_perf_context()->Reset();
-     std::vector<Status> statuses =
-         db_->MultiGet(ReadOptions(), cfs, keys, &values);
-     ASSERT_EQ(values.size(), keys.size());
-     ASSERT_EQ(values[0], "v1");
-     ASSERT_EQ(values[1], "v2");
-     ASSERT_EQ(values[2], "v3");
-     ASSERT_EQ(values[4], "v5");
-     // four kv pairs * two bytes per value
-     ASSERT_EQ(8, static_cast<int>(get_perf_context()->multiget_read_bytes));
-     // Expected lookup outcome per key, in the same order as `keys`.
-     const bool expect_found[] = {true, true, true, false, true, false};
-     for (size_t idx = 0; idx < 6; ++idx) {
-       if (expect_found[idx]) {
-         ASSERT_OK(statuses[idx]);
-       } else {
-         ASSERT_TRUE(statuses[idx].IsNotFound());
-       }
-     }
-     SetPerfLevel(kDisable);
-   } while (ChangeCompactOptions());
- }
- // Exercises the vector-based MultiGet with an empty key set and against an
- // empty database.
- TEST_F(DBBasicTest, MultiGetEmpty) {
-   do {
-     CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
-     std::vector<Slice> keys;
-     std::vector<std::string> values;
-     std::vector<ColumnFamilyHandle*> cfs;
-     // Empty key set.
-     std::vector<Status> statuses =
-         db_->MultiGet(ReadOptions(), cfs, keys, &values);
-     ASSERT_EQ(statuses.size(), 0U);
-     // Empty database, still an empty key set.
-     Options options = CurrentOptions();
-     options.create_if_missing = true;
-     DestroyAndReopen(options);
-     CreateAndReopenWithCF({"pikachu"}, options);
-     statuses = db_->MultiGet(ReadOptions(), cfs, keys, &values);
-     ASSERT_EQ(statuses.size(), 0U);
-     // Empty database, two keys looked up in two different column families.
-     keys.assign({"a", "b"});
-     cfs.assign({handles_[0], handles_[1]});
-     statuses = db_->MultiGet(ReadOptions(), cfs, keys, &values);
-     ASSERT_EQ(static_cast<int>(statuses.size()), 2);
-     ASSERT_TRUE(statuses[0].IsNotFound());
-     ASSERT_TRUE(statuses[1].IsNotFound());
-   } while (ChangeCompactOptions());
- }
- // Writes one table file per checksum type, then reopens the DB under every
- // checksum setting and reads all keys back.
- TEST_F(DBBasicTest, ChecksumTest) {
-   BlockBasedTableOptions table_options;
-   Options options = CurrentOptions();
-   // change when new checksum type added
-   const int last_checksum = static_cast<int>(kxxHash64);
-   const int kNumPerFile = 2;
-   // generate one table with each type of checksum
-   for (int type = 0; type <= last_checksum; ++type) {
-     table_options.checksum = static_cast<ChecksumType>(type);
-     options.table_factory.reset(NewBlockBasedTableFactory(table_options));
-     Reopen(options);
-     const int first_key = type * kNumPerFile;
-     for (int offset = 0; offset < kNumPerFile; ++offset) {
-       ASSERT_OK(Put(Key(first_key + offset), Key(first_key + offset)));
-     }
-     ASSERT_OK(Flush());
-   }
-   // with each valid checksum type setting...
-   const int total_keys = (last_checksum + 1) * kNumPerFile;
-   for (int type = 0; type <= last_checksum; ++type) {
-     table_options.checksum = static_cast<ChecksumType>(type);
-     options.table_factory.reset(NewBlockBasedTableFactory(table_options));
-     Reopen(options);
-     // verify every type of checksum (should be regardless of that setting)
-     for (int k = 0; k < total_keys; ++k) {
-       ASSERT_EQ(Key(k), Get(Key(k)));
-     }
-   }
- }
- // On Windows a file can be opened either memory-mapped or with unbuffered
- // access, so this test would hit an assertion there and does not make
- // sense to run.
- #ifndef OS_WIN
- TEST_F(DBBasicTest, MmapAndBufferOptions) {  // Checks which use_direct_reads / allow_mmap_reads combinations can be reopened.
- if (!IsMemoryMappedAccessSupported()) {
- return;  // Nothing to verify when memory-mapped access is reported as unsupported.
- }
- Options options = CurrentOptions();
- options.use_direct_reads = true;
- options.allow_mmap_reads = true;
- ASSERT_NOK(TryReopen(options));  // Direct reads combined with mmap reads must be rejected.
- // All other combinations are acceptable
- options.use_direct_reads = false;
- ASSERT_OK(TryReopen(options));  // mmap reads only.
- if (IsDirectIOSupported()) {
- options.use_direct_reads = true;
- options.allow_mmap_reads = false;
- ASSERT_OK(TryReopen(options));  // Direct reads only.
- }
- options.use_direct_reads = false;
- ASSERT_OK(TryReopen(options));  // Direct reads off again; allow_mmap_reads keeps whatever value the branch above left.
- }
- #endif
- // Env wrapper whose NewLogger() hands out TestLogger instances and which
- // counts how many times those loggers have been closed.
- class TestEnv : public EnvWrapper {
-  public:
-   explicit TestEnv(Env* base_env) : EnvWrapper(base_env), close_count(0) {}
-   // Logger that discards all output and reports every close to its TestEnv.
-   class TestLogger : public Logger {
-    public:
-     using Logger::Logv;
-     explicit TestLogger(TestEnv* env_ptr) : Logger(), env(env_ptr) {}
-     ~TestLogger() override {
-       // Count the close here unless closed_ is already set.
-       if (closed_) {
-         return;
-       }
-       CloseHelper();
-     }
-     void Logv(const char* /*format*/, va_list /*ap*/) override {}
-    protected:
-     Status CloseImpl() override { return CloseHelper(); }
-    private:
-     // Records the close on the owning TestEnv and always reports an IOError.
-     Status CloseHelper() {
-       env->CloseCountInc();
-       return Status::IOError();
-     }
-     TestEnv* env;
-   };
-   void CloseCountInc() { ++close_count; }
-   int GetCloseCount() { return close_count; }
-   // Ignores the file name and always returns a fresh TestLogger.
-   Status NewLogger(const std::string& /*fname*/,
-                    std::shared_ptr<Logger>* result) override {
-     result->reset(new TestLogger(this));
-     return Status::OK();
-   }
-  private:
-   int close_count;
- };
- TEST_F(DBBasicTest, DBClose) {  // Tracks, via TestEnv's close counter, when the info logger gets closed by DB::Close() and by deleting the DB.
- Options options = GetDefaultOptions();
- std::string dbname = test::PerThreadDBPath("db_close_test");
- ASSERT_OK(DestroyDB(dbname, options));
- DB* db = nullptr;
- TestEnv* env = new TestEnv(env_);
- std::unique_ptr<TestEnv> local_env_guard(env);  // Owns env so it is freed when the test body exits.
- options.create_if_missing = true;
- options.env = env;
- Status s = DB::Open(options, dbname, &db);
- ASSERT_OK(s);
- ASSERT_TRUE(db != nullptr);
- s = db->Close();
- ASSERT_EQ(env->GetCloseCount(), 1);
- ASSERT_EQ(s, Status::IOError());  // TestLogger::CloseHelper() always returns IOError, which Close() is expected to surface.
- delete db;
- ASSERT_EQ(env->GetCloseCount(), 1);  // Deleting after Close() must not close the logger a second time.
- // Do not call DB::Close() and ensure our logger Close() still gets called
- s = DB::Open(options, dbname, &db);
- ASSERT_OK(s);
- ASSERT_TRUE(db != nullptr);
- delete db;
- ASSERT_EQ(env->GetCloseCount(), 2);
- // Provide our own logger and ensure DB::Close() does not close it
- options.info_log.reset(new TestEnv::TestLogger(env));
- options.create_if_missing = false;
- s = DB::Open(options, dbname, &db);
- ASSERT_OK(s);
- ASSERT_TRUE(db != nullptr);
- s = db->Close();
- ASSERT_EQ(s, Status::OK());
- delete db;
- ASSERT_EQ(env->GetCloseCount(), 2);  // Count unchanged: the user-supplied logger is still open.
- options.info_log.reset();
- ASSERT_EQ(env->GetCloseCount(), 3);  // Releasing the logger runs ~TestLogger, which counts the close.
- }
- // Closes the DB while the fault-injection filesystem is inactive and expects
- // Close() to report a non-OK status.
- TEST_F(DBBasicTest, DBCloseFlushError) {
-   std::unique_ptr<FaultInjectionTestEnv> faulty_env(
-       new FaultInjectionTestEnv(env_));
-   Options options = GetDefaultOptions();
-   options.create_if_missing = true;
-   options.manual_wal_flush = true;
-   options.write_buffer_size = 100;
-   options.env = faulty_env.get();
-   Reopen(options);
-   ASSERT_OK(Put("key1", "value1"));
-   ASSERT_OK(Put("key2", "value2"));
-   ASSERT_OK(dbfull()->TEST_SwitchMemtable());
-   ASSERT_OK(Put("key3", "value3"));
-   // Deactivate the filesystem only for the duration of Close().
-   faulty_env->SetFilesystemActive(false);
-   Status close_status = dbfull()->Close();
-   faulty_env->SetFilesystemActive(true);
-   ASSERT_NE(close_status, Status::OK());
-   Destroy(options);
- }
- class DBMultiGetTestWithParam : public DBBasicTest,
- public testing::WithParamInterface<bool> {};  // Bool-parameterized fixture; the tests forward GetParam() as the last argument of the MultiGet() helper.
- // Runs MultiGet across eight column families while a sync-point callback
- // flushes some of them and overwrites every key in the middle of the lookup,
- // then checks that the overwritten values are returned and that no column
- // family is left with its local SuperVersion marked in-use or obsolete.
- TEST_P(DBMultiGetTestWithParam, MultiGetMultiCF) {
-   Options options = CurrentOptions();
-   CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich",
-                          "alyosha", "popovich"},
-                         options);
-   // <CF, key, value> tuples
-   std::vector<std::tuple<int, std::string, std::string>> cf_kv_vec;
-   static const int num_keys = 24;
-   cf_kv_vec.reserve(num_keys);
-   for (int i = 0; i < num_keys; ++i) {
-     int cf = i / 3;
-     // Three distinct keys per column family. This was previously written as
-     // `1 % 3`, a constant, which made all three entries of a column family
-     // share the same key and value.
-     int cf_key = i % 3;
-     cf_kv_vec.emplace_back(std::make_tuple(
-         cf, "cf" + std::to_string(cf) + "_key_" + std::to_string(cf_key),
-         "cf" + std::to_string(cf) + "_val_" + std::to_string(cf_key)));
-     ASSERT_OK(Put(std::get<0>(cf_kv_vec[i]), std::get<1>(cf_kv_vec[i]),
-                   std::get<2>(cf_kv_vec[i])));
-   }
-   int get_sv_count = 0;
-   ROCKSDB_NAMESPACE::DBImpl* db = reinterpret_cast<DBImpl*>(db_);
-   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
-       "DBImpl::MultiGet::AfterRefSV", [&](void* /*arg*/) {
-         if (++get_sv_count == 2) {
-           // After MultiGet refs a couple of CFs, flush some CFs and rewrite
-           // every key with a "_2" suffix so MultiGet is forced to repeat the
-           // process.
-           // NOTE(review): the flush only fires for i = 0, 8 and 16 (CFs 0, 2
-           // and 5), not for every CF. Left unchanged because the
-           // get_sv_count == 11 check below may depend on it; confirm intent.
-           for (int i = 0; i < num_keys; ++i) {
-             int cf = i / 3;
-             int cf_key = i % 8;
-             if (cf_key == 0) {
-               ASSERT_OK(Flush(cf));
-             }
-             ASSERT_OK(Put(std::get<0>(cf_kv_vec[i]), std::get<1>(cf_kv_vec[i]),
-                           std::get<2>(cf_kv_vec[i]) + "_2"));
-           }
-         }
-         if (get_sv_count == 11) {
-           // At this point every CF's local SuperVersion is expected to be
-           // marked as in use.
-           for (int i = 0; i < 8; ++i) {
-             auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(
-                             db->GetColumnFamilyHandle(i))
-                             ->cfd();
-             ASSERT_EQ(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse);
-           }
-         }
-       });
-   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
-   std::vector<int> cfs;
-   std::vector<std::string> keys;
-   std::vector<std::string> values;
-   for (int i = 0; i < num_keys; ++i) {
-     cfs.push_back(std::get<0>(cf_kv_vec[i]));
-     keys.push_back(std::get<1>(cf_kv_vec[i]));
-   }
-   // Look up every key; all of them must reflect the callback's rewrite.
-   values = MultiGet(cfs, keys, nullptr, GetParam());
-   ASSERT_EQ(values.size(), num_keys);
-   for (unsigned int j = 0; j < values.size(); ++j) {
-     ASSERT_EQ(values[j], std::get<2>(cf_kv_vec[j]) + "_2");
-   }
-   // Look up a subset with two keys from the same column family.
-   keys.clear();
-   cfs.clear();
-   cfs.push_back(std::get<0>(cf_kv_vec[0]));
-   keys.push_back(std::get<1>(cf_kv_vec[0]));
-   cfs.push_back(std::get<0>(cf_kv_vec[3]));
-   keys.push_back(std::get<1>(cf_kv_vec[3]));
-   cfs.push_back(std::get<0>(cf_kv_vec[4]));
-   keys.push_back(std::get<1>(cf_kv_vec[4]));
-   values = MultiGet(cfs, keys, nullptr, GetParam());
-   ASSERT_EQ(values[0], std::get<2>(cf_kv_vec[0]) + "_2");
-   ASSERT_EQ(values[1], std::get<2>(cf_kv_vec[3]) + "_2");
-   ASSERT_EQ(values[2], std::get<2>(cf_kv_vec[4]) + "_2");
-   // Look up a subset whose column families are not in ascending order.
-   keys.clear();
-   cfs.clear();
-   cfs.push_back(std::get<0>(cf_kv_vec[7]));
-   keys.push_back(std::get<1>(cf_kv_vec[7]));
-   cfs.push_back(std::get<0>(cf_kv_vec[6]));
-   keys.push_back(std::get<1>(cf_kv_vec[6]));
-   cfs.push_back(std::get<0>(cf_kv_vec[1]));
-   keys.push_back(std::get<1>(cf_kv_vec[1]));
-   values = MultiGet(cfs, keys, nullptr, GetParam());
-   ASSERT_EQ(values[0], std::get<2>(cf_kv_vec[7]) + "_2");
-   ASSERT_EQ(values[1], std::get<2>(cf_kv_vec[6]) + "_2");
-   ASSERT_EQ(values[2], std::get<2>(cf_kv_vec[1]) + "_2");
-   // Once all lookups are done no local SuperVersion may still be marked as
-   // in use or obsolete.
-   for (int cf = 0; cf < 8; ++cf) {
-     auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(
-                     reinterpret_cast<DBImpl*>(db_)->GetColumnFamilyHandle(cf))
-                     ->cfd();
-     ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse);
-     ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVObsolete);
-   }
- }
- TEST_P(DBMultiGetTestWithParam, MultiGetMultiCFMutex) {  // Keeps invalidating MultiGet's view from a sync-point callback until the "LastTry" sync point fires, then checks the final values.
- Options options = CurrentOptions();
- CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich",
- "alyosha", "popovich"},
- options);
- for (int i = 0; i < 8; ++i) {
- ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
- "cf" + std::to_string(i) + "_val"));
- }
- int get_sv_count = 0;
- int retries = 0;
- bool last_try = false;
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
- "DBImpl::MultiGet::LastTry", [&](void* /*arg*/) {
- last_try = true;  // Remember that the last-try path was reached and stop all further sync-point callbacks.
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
- });
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
- "DBImpl::MultiGet::AfterRefSV", [&](void* /*arg*/) {
- if (last_try) {
- return;
- }
- if (++get_sv_count == 2) {  // On every second invocation: flush all eight CFs and rewrite each key with the retry number appended.
- ++retries;
- get_sv_count = 0;
- for (int i = 0; i < 8; ++i) {
- ASSERT_OK(Flush(i));
- ASSERT_OK(Put(
- i, "cf" + std::to_string(i) + "_key",
- "cf" + std::to_string(i) + "_val" + std::to_string(retries)));
- }
- }
- });
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
- std::vector<int> cfs;
- std::vector<std::string> keys;
- std::vector<std::string> values;
- for (int i = 0; i < 8; ++i) {
- cfs.push_back(i);
- keys.push_back("cf" + std::to_string(i) + "_key");
- }
- values = MultiGet(cfs, keys, nullptr, GetParam());
- ASSERT_TRUE(last_try);  // The LastTry sync point must have been hit during the lookup.
- ASSERT_EQ(values.size(), 8);
- for (unsigned int j = 0; j < values.size(); ++j) {
- ASSERT_EQ(values[j],
- "cf" + std::to_string(j) + "_val" + std::to_string(retries));  // Values must come from the most recent rewrite.
- }
- for (int i = 0; i < 8; ++i) {
- auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(
- reinterpret_cast<DBImpl*>(db_)->GetColumnFamilyHandle(i))
- ->cfd();
- ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse);  // No local SuperVersion may remain marked in use afterwards.
- }
- }
- TEST_P(DBMultiGetTestWithParam, MultiGetMultiCFSnapshot) {  // MultiGet with an explicit snapshot must return the pre-snapshot values even though a callback overwrites every key mid-lookup.
- Options options = CurrentOptions();
- CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich",
- "alyosha", "popovich"},
- options);
- for (int i = 0; i < 8; ++i) {
- ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
- "cf" + std::to_string(i) + "_val"));
- }
- int get_sv_count = 0;
- ROCKSDB_NAMESPACE::DBImpl* db = reinterpret_cast<DBImpl*>(db_);
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
- "DBImpl::MultiGet::AfterRefSV", [&](void* /*arg*/) {
- if (++get_sv_count == 2) {  // On the second invocation: flush every CF and overwrite each key with "_val2".
- for (int i = 0; i < 8; ++i) {
- ASSERT_OK(Flush(i));
- ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
- "cf" + std::to_string(i) + "_val2"));
- }
- }
- if (get_sv_count == 8) {  // On the eighth invocation every local SuperVersion must be either in use or obsolete.
- for (int i = 0; i < 8; ++i) {
- auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(
- db->GetColumnFamilyHandle(i))
- ->cfd();
- ASSERT_TRUE(
- (cfd->TEST_GetLocalSV()->Get() == SuperVersion::kSVInUse) ||
- (cfd->TEST_GetLocalSV()->Get() == SuperVersion::kSVObsolete));
- }
- }
- });
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
- std::vector<int> cfs;
- std::vector<std::string> keys;
- std::vector<std::string> values;
- for (int i = 0; i < 8; ++i) {
- cfs.push_back(i);
- keys.push_back("cf" + std::to_string(i) + "_key");
- }
- const Snapshot* snapshot = db_->GetSnapshot();  // Taken before the lookup, hence before the callback's overwrites.
- values = MultiGet(cfs, keys, snapshot, GetParam());
- db_->ReleaseSnapshot(snapshot);
- ASSERT_EQ(values.size(), 8);
- for (unsigned int j = 0; j < values.size(); ++j) {
- ASSERT_EQ(values[j], "cf" + std::to_string(j) + "_val");  // Original values, not the "_val2" rewrites.
- }
- for (int i = 0; i < 8; ++i) {
- auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(
- reinterpret_cast<DBImpl*>(db_)->GetColumnFamilyHandle(i))
- ->cfd();
- ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse);  // No local SuperVersion may remain marked in use afterwards.
- }
- }
- INSTANTIATE_TEST_CASE_P(DBMultiGetTestWithParam, DBMultiGetTestWithParam,
- testing::Bool());  // Runs every DBMultiGetTestWithParam test once with false and once with true.
- // Looks up keys given in descending order through the array-based MultiGet
- // overload with its final (bool) argument set to false, and checks values,
- // statuses and the multiget_read_bytes perf counter.
- TEST_F(DBBasicTest, MultiGetBatchedSimpleUnsorted) {
-   do {
-     CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
-     SetPerfLevel(kEnableCount);
-     ASSERT_OK(Put(1, "k1", "v1"));
-     ASSERT_OK(Put(1, "k2", "v2"));
-     ASSERT_OK(Put(1, "k3", "v3"));
-     ASSERT_OK(Put(1, "k4", "v4"));
-     ASSERT_OK(Delete(1, "k4"));
-     ASSERT_OK(Put(1, "k5", "v5"));
-     ASSERT_OK(Delete(1, "no_key"));
-     get_perf_context()->Reset();
-     std::vector<Slice> keys({"no_key", "k5", "k4", "k3", "k2", "k1"});
-     std::vector<PinnableSlice> values(keys.size());
-     // This overload takes a single column family handle, so no per-key
-     // handle vector is needed (the unused one was removed).
-     std::vector<Status> s(keys.size());
-     db_->MultiGet(ReadOptions(), handles_[1], keys.size(), keys.data(),
-                   values.data(), s.data(), false);
-     ASSERT_EQ(values.size(), keys.size());
-     ASSERT_EQ(std::string(values[5].data(), values[5].size()), "v1");
-     ASSERT_EQ(std::string(values[4].data(), values[4].size()), "v2");
-     ASSERT_EQ(std::string(values[3].data(), values[3].size()), "v3");
-     ASSERT_EQ(std::string(values[1].data(), values[1].size()), "v5");
-     // four kv pairs * two bytes per value
-     ASSERT_EQ(8, (int)get_perf_context()->multiget_read_bytes);
-     ASSERT_TRUE(s[0].IsNotFound());
-     ASSERT_OK(s[1]);
-     ASSERT_TRUE(s[2].IsNotFound());
-     ASSERT_OK(s[3]);
-     ASSERT_OK(s[4]);
-     ASSERT_OK(s[5]);
-     SetPerfLevel(kDisable);
-   } while (ChangeCompactOptions());
- }
- // Looks up keys given in ascending order through the array-based MultiGet
- // overload with its final (bool) argument set to true, and checks values,
- // statuses and the multiget_read_bytes perf counter.
- TEST_F(DBBasicTest, MultiGetBatchedSimpleSorted) {
-   do {
-     CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
-     SetPerfLevel(kEnableCount);
-     ASSERT_OK(Put(1, "k1", "v1"));
-     ASSERT_OK(Put(1, "k2", "v2"));
-     ASSERT_OK(Put(1, "k3", "v3"));
-     ASSERT_OK(Put(1, "k4", "v4"));
-     ASSERT_OK(Delete(1, "k4"));
-     ASSERT_OK(Put(1, "k5", "v5"));
-     ASSERT_OK(Delete(1, "no_key"));
-     get_perf_context()->Reset();
-     std::vector<Slice> keys({"k1", "k2", "k3", "k4", "k5", "no_key"});
-     std::vector<PinnableSlice> values(keys.size());
-     // This overload takes a single column family handle, so no per-key
-     // handle vector is needed (the unused one was removed).
-     std::vector<Status> s(keys.size());
-     db_->MultiGet(ReadOptions(), handles_[1], keys.size(), keys.data(),
-                   values.data(), s.data(), true);
-     ASSERT_EQ(values.size(), keys.size());
-     ASSERT_EQ(std::string(values[0].data(), values[0].size()), "v1");
-     ASSERT_EQ(std::string(values[1].data(), values[1].size()), "v2");
-     ASSERT_EQ(std::string(values[2].data(), values[2].size()), "v3");
-     ASSERT_EQ(std::string(values[4].data(), values[4].size()), "v5");
-     // four kv pairs * two bytes per value
-     ASSERT_EQ(8, (int)get_perf_context()->multiget_read_bytes);
-     ASSERT_OK(s[0]);
-     ASSERT_OK(s[1]);
-     ASSERT_OK(s[2]);
-     ASSERT_TRUE(s[3].IsNotFound());
-     ASSERT_OK(s[4]);
-     ASSERT_TRUE(s[5].IsNotFound());
-     SetPerfLevel(kDisable);
-   } while (ChangeCompactOptions());
- }
- // Spreads overlapping versions of the same keys over level 2, level 1,
- // level 0 and the memtable, then checks that MultiGet returns the value
- // written last for each key. Flush statuses are asserted so that a failed
- // flush is not mistaken for a lookup bug.
- TEST_F(DBBasicTest, MultiGetBatchedMultiLevel) {
-   Options options = CurrentOptions();
-   options.disable_auto_compactions = true;
-   Reopen(options);
-   int num_keys = 0;
-   // Every key gets a "val_l2_" value; flush after each batch of eight keys.
-   for (int i = 0; i < 128; ++i) {
-     ASSERT_OK(Put("key_" + std::to_string(i), "val_l2_" + std::to_string(i)));
-     num_keys++;
-     if (num_keys == 8) {
-       ASSERT_OK(Flush());
-       num_keys = 0;
-     }
-   }
-   if (num_keys > 0) {
-     ASSERT_OK(Flush());
-     num_keys = 0;
-   }
-   MoveFilesToLevel(2);
-   // Every third key is overwritten with a "val_l1_" value.
-   for (int i = 0; i < 128; i += 3) {
-     ASSERT_OK(Put("key_" + std::to_string(i), "val_l1_" + std::to_string(i)));
-     num_keys++;
-     if (num_keys == 8) {
-       ASSERT_OK(Flush());
-       num_keys = 0;
-     }
-   }
-   if (num_keys > 0) {
-     ASSERT_OK(Flush());
-     num_keys = 0;
-   }
-   MoveFilesToLevel(1);
-   // Every fifth key is overwritten with a "val_l0_" value; these files are
-   // flushed but not moved to another level.
-   for (int i = 0; i < 128; i += 5) {
-     ASSERT_OK(Put("key_" + std::to_string(i), "val_l0_" + std::to_string(i)));
-     num_keys++;
-     if (num_keys == 8) {
-       ASSERT_OK(Flush());
-       num_keys = 0;
-     }
-   }
-   if (num_keys > 0) {
-     ASSERT_OK(Flush());
-     num_keys = 0;
-   }
-   ASSERT_EQ(0, num_keys);
-   // Every ninth key is overwritten once more and left unflushed.
-   for (int i = 0; i < 128; i += 9) {
-     ASSERT_OK(Put("key_" + std::to_string(i), "val_mem_" + std::to_string(i)));
-   }
-   std::vector<std::string> keys;
-   std::vector<std::string> values;
-   for (int i = 64; i < 80; ++i) {
-     keys.push_back("key_" + std::to_string(i));
-   }
-   values = MultiGet(keys, nullptr);
-   ASSERT_EQ(values.size(), 16);
-   for (unsigned int j = 0; j < values.size(); ++j) {
-     int key = j + 64;
-     // The most recently written value wins: memtable, then the "l0", "l1"
-     // and "l2" writes in that order.
-     if (key % 9 == 0) {
-       ASSERT_EQ(values[j], "val_mem_" + std::to_string(key));
-     } else if (key % 5 == 0) {
-       ASSERT_EQ(values[j], "val_l0_" + std::to_string(key));
-     } else if (key % 3 == 0) {
-       ASSERT_EQ(values[j], "val_l1_" + std::to_string(key));
-     } else {
-       ASSERT_EQ(values[j], "val_l2_" + std::to_string(key));
-     }
-   }
- }
- // Like MultiGetBatchedMultiLevel, but the newer writes are merge operands
- // for a string-append operator, so MultiGet must combine the operands written
- // in each round into one comma-separated value. Flush statuses are asserted
- // so that a failed flush is not mistaken for a lookup bug.
- TEST_F(DBBasicTest, MultiGetBatchedMultiLevelMerge) {
-   Options options = CurrentOptions();
-   options.disable_auto_compactions = true;
-   options.merge_operator = MergeOperators::CreateStringAppendOperator();
-   BlockBasedTableOptions bbto;
-   bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
-   options.table_factory.reset(NewBlockBasedTableFactory(bbto));
-   Reopen(options);
-   int num_keys = 0;
-   // Base values for every key; flush after each batch of eight keys.
-   for (int i = 0; i < 128; ++i) {
-     ASSERT_OK(Put("key_" + std::to_string(i), "val_l2_" + std::to_string(i)));
-     num_keys++;
-     if (num_keys == 8) {
-       ASSERT_OK(Flush());
-       num_keys = 0;
-     }
-   }
-   if (num_keys > 0) {
-     ASSERT_OK(Flush());
-     num_keys = 0;
-   }
-   MoveFilesToLevel(2);
-   // Merge operand for every third key.
-   for (int i = 0; i < 128; i += 3) {
-     ASSERT_OK(Merge("key_" + std::to_string(i), "val_l1_" + std::to_string(i)));
-     num_keys++;
-     if (num_keys == 8) {
-       ASSERT_OK(Flush());
-       num_keys = 0;
-     }
-   }
-   if (num_keys > 0) {
-     ASSERT_OK(Flush());
-     num_keys = 0;
-   }
-   MoveFilesToLevel(1);
-   // Merge operand for every fifth key; flushed but not moved.
-   for (int i = 0; i < 128; i += 5) {
-     ASSERT_OK(Merge("key_" + std::to_string(i), "val_l0_" + std::to_string(i)));
-     num_keys++;
-     if (num_keys == 8) {
-       ASSERT_OK(Flush());
-       num_keys = 0;
-     }
-   }
-   if (num_keys > 0) {
-     ASSERT_OK(Flush());
-     num_keys = 0;
-   }
-   ASSERT_EQ(0, num_keys);
-   // Merge operand for every ninth key, left unflushed.
-   for (int i = 0; i < 128; i += 9) {
-     ASSERT_OK(Merge("key_" + std::to_string(i), "val_mem_" + std::to_string(i)));
-   }
-   std::vector<std::string> keys;
-   std::vector<std::string> values;
-   for (int i = 32; i < 80; ++i) {
-     keys.push_back("key_" + std::to_string(i));
-   }
-   values = MultiGet(keys, nullptr);
-   ASSERT_EQ(values.size(), keys.size());
-   // Iterate over the actual result size instead of a hard-coded 48 so the
-   // check stays in sync with the key range above.
-   for (unsigned int j = 0; j < values.size(); ++j) {
-     int key = j + 32;
-     // Expected value: base value followed by each applicable operand, oldest
-     // first, joined with ','.
-     std::string value;
-     value.append("val_l2_" + std::to_string(key));
-     if (key % 3 == 0) {
-       value.append(",");
-       value.append("val_l1_" + std::to_string(key));
-     }
-     if (key % 5 == 0) {
-       value.append(",");
-       value.append("val_l0_" + std::to_string(key));
-     }
-     if (key % 9 == 0) {
-       value.append(",");
-       value.append("val_mem_" + std::to_string(key));
-     }
-     ASSERT_EQ(values[j], value);
-   }
- }
- // Test fixture for batched MultiGet with a prefix extractor.
- // Bool parameter: if true, use partitioned filters;
- // if false, use a full filter block.
- class MultiGetPrefixExtractorTest : public DBBasicTest,
- public ::testing::WithParamInterface<bool> {
- };
- TEST_P(MultiGetPrefixExtractorTest, Batched) {  // MultiGet with a 2-byte fixed prefix extractor: checks values and bloom counters, first from the memtable and then after a flush.
- Options options = CurrentOptions();
- options.prefix_extractor.reset(NewFixedPrefixTransform(2));
- options.memtable_prefix_bloom_size_ratio = 10;
- BlockBasedTableOptions bbto;
- if (GetParam()) {  // Parameter true: two-level index search with partitioned filters.
- bbto.index_type = BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
- bbto.partition_filters = true;
- }
- bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
- bbto.whole_key_filtering = false;
- bbto.cache_index_and_filter_blocks = false;
- options.table_factory.reset(NewBlockBasedTableFactory(bbto));
- Reopen(options);
- SetPerfLevel(kEnableCount);
- get_perf_context()->Reset();
- // First key is not in the prefix_extractor domain
- ASSERT_OK(Put("k", "v0"));
- ASSERT_OK(Put("kk1", "v1"));
- ASSERT_OK(Put("kk2", "v2"));
- ASSERT_OK(Put("kk3", "v3"));
- ASSERT_OK(Put("kk4", "v4"));
- std::vector<std::string> mem_keys(
- {"k", "kk1", "kk2", "kk3", "kk4", "rofl", "lmho"});  // Five written keys plus two that were never written.
- std::vector<std::string> inmem_values;
- inmem_values = MultiGet(mem_keys, nullptr);
- ASSERT_EQ(inmem_values[0], "v0");
- ASSERT_EQ(inmem_values[1], "v1");
- ASSERT_EQ(inmem_values[2], "v2");
- ASSERT_EQ(inmem_values[3], "v3");
- ASSERT_EQ(inmem_values[4], "v4");
- ASSERT_EQ(get_perf_context()->bloom_memtable_miss_count, 2);
- ASSERT_EQ(get_perf_context()->bloom_memtable_hit_count, 5);
- ASSERT_OK(Flush());
- std::vector<std::string> keys({"k", "kk1", "kk2", "kk3", "kk4"});
- std::vector<std::string> values;
- get_perf_context()->Reset();
- values = MultiGet(keys, nullptr);
- ASSERT_EQ(values[0], "v0");
- ASSERT_EQ(values[1], "v1");
- ASSERT_EQ(values[2], "v2");
- ASSERT_EQ(values[3], "v3");
- ASSERT_EQ(values[4], "v4");
- // Filter hits for 4 in-domain keys
- ASSERT_EQ(get_perf_context()->bloom_sst_hit_count, 4);
- }
- INSTANTIATE_TEST_CASE_P(MultiGetPrefix, MultiGetPrefixExtractorTest,
- ::testing::Bool());  // Runs the fixture's tests once with false and once with true.
- #ifndef ROCKSDB_LITE
- class DBMultiGetRowCacheTest : public DBBasicTest,
- public ::testing::WithParamInterface<bool> {};  // Bool-parameterized fixture; the MultiGetBatched test reads GetParam() as use_snapshots.
- // Runs the array-based MultiGet twice with the row cache option config and
- // checks values, statuses, multiget_read_bytes and the ROW_CACHE_HIT ticker,
- // either with explicit snapshots (parameter true) or without.
- TEST_P(DBMultiGetRowCacheTest, MultiGetBatched) {
-   do {
-     option_config_ = kRowCache;
-     Options options = CurrentOptions();
-     options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
-     CreateAndReopenWithCF({"pikachu"}, options);
-     SetPerfLevel(kEnableCount);
-     ASSERT_OK(Put(1, "k1", "v1"));
-     ASSERT_OK(Put(1, "k2", "v2"));
-     ASSERT_OK(Put(1, "k3", "v3"));
-     ASSERT_OK(Put(1, "k4", "v4"));
-     ASSERT_OK(Flush(1));
-     ASSERT_OK(Put(1, "k5", "v5"));
-     // snap1 is taken after k5 is written and before k4 is deleted.
-     const Snapshot* snap1 = dbfull()->GetSnapshot();
-     ASSERT_OK(Delete(1, "k4"));
-     ASSERT_OK(Flush(1));
-     // snap2 is taken after all writes and both flushes.
-     const Snapshot* snap2 = dbfull()->GetSnapshot();
-     get_perf_context()->Reset();
-     std::vector<Slice> keys({"no_key", "k5", "k4", "k3", "k1"});
-     std::vector<PinnableSlice> values(keys.size());
-     std::vector<Status> s(keys.size());
-     ReadOptions ro;
-     bool use_snapshots = GetParam();
-     if (use_snapshots) {
-       ro.snapshot = snap2;
-     }
-     db_->MultiGet(ro, handles_[1], keys.size(), keys.data(), values.data(),
-                   s.data(), false);
-     ASSERT_EQ(values.size(), keys.size());
-     ASSERT_EQ(std::string(values[4].data(), values[4].size()), "v1");
-     ASSERT_EQ(std::string(values[3].data(), values[3].size()), "v3");
-     ASSERT_EQ(std::string(values[1].data(), values[1].size()), "v5");
-     // three kv pairs * two bytes per value
-     ASSERT_EQ(6, (int)get_perf_context()->multiget_read_bytes);
-     ASSERT_TRUE(s[0].IsNotFound());
-     ASSERT_OK(s[1]);
-     ASSERT_TRUE(s[2].IsNotFound());
-     ASSERT_OK(s[3]);
-     ASSERT_OK(s[4]);
-     // Call MultiGet() again with some intersection with the previous set of
-     // keys. Those should already be in the row cache.
-     keys.assign({"no_key", "k5", "k3", "k2"});
-     for (size_t i = 0; i < keys.size(); ++i) {
-       values[i].Reset();
-       s[i] = Status::OK();
-     }
-     get_perf_context()->Reset();
-     if (use_snapshots) {
-       ro.snapshot = snap1;
-     }
-     // Pass `ro` so the snapshot selected above is actually used; this call
-     // previously passed a default-constructed ReadOptions, which made the
-     // snap1 assignment dead code.
-     db_->MultiGet(ro, handles_[1], keys.size(), keys.data(), values.data(),
-                   s.data(), false);
-     ASSERT_EQ(std::string(values[3].data(), values[3].size()), "v2");
-     ASSERT_EQ(std::string(values[2].data(), values[2].size()), "v3");
-     ASSERT_EQ(std::string(values[1].data(), values[1].size()), "v5");
-     // three kv pairs * two bytes per value
-     ASSERT_EQ(6, (int)get_perf_context()->multiget_read_bytes);
-     ASSERT_TRUE(s[0].IsNotFound());
-     ASSERT_OK(s[1]);
-     ASSERT_OK(s[2]);
-     ASSERT_OK(s[3]);
-     if (use_snapshots) {
-       // Only reads from the first SST file would have been cached, since
-       // snapshot seq no is > fd.largest_seqno
-       ASSERT_EQ(1, TestGetTickerCount(options, ROW_CACHE_HIT));
-     } else {
-       ASSERT_EQ(2, TestGetTickerCount(options, ROW_CACHE_HIT));
-     }
-     SetPerfLevel(kDisable);
-     dbfull()->ReleaseSnapshot(snap1);
-     dbfull()->ReleaseSnapshot(snap2);
-   } while (ChangeCompactOptions());
- }
- INSTANTIATE_TEST_CASE_P(DBMultiGetRowCacheTest, DBMultiGetRowCacheTest,
- testing::Values(true, false));  // Runs the fixture's tests once with true and once with false.
- // Writes inserts, updates and deletes for the same keys and checks that
- // GetAllKeyVersions() reports one version per write, for the default column
- // family (both overloads) and for a second column family.
- TEST_F(DBBasicTest, GetAllKeyVersions) {
-   Options options = CurrentOptions();
-   options.env = env_;
-   options.create_if_missing = true;
-   options.disable_auto_compactions = true;
-   CreateAndReopenWithCF({"pikachu"}, options);
-   ASSERT_EQ(2, handles_.size());
-   const size_t kNumInserts = 4;
-   const size_t kNumDeletes = 4;
-   const size_t kNumUpdates = 4;
-   const size_t kTotalWrites = kNumInserts + kNumDeletes + kNumUpdates;
-   const size_t kNoLimit = std::numeric_limits<size_t>::max();
-   std::vector<KeyVersion> key_versions;
-   // Check default column family
-   for (size_t n = 0; n < kNumInserts; ++n) {
-     ASSERT_OK(Put(std::to_string(n), "value"));
-   }
-   for (size_t n = 0; n < kNumUpdates; ++n) {
-     ASSERT_OK(Put(std::to_string(n), "value1"));
-   }
-   for (size_t n = 0; n < kNumDeletes; ++n) {
-     ASSERT_OK(Delete(std::to_string(n)));
-   }
-   // Overload without an explicit column family handle.
-   ASSERT_OK(ROCKSDB_NAMESPACE::GetAllKeyVersions(db_, Slice(), Slice(),
-                                                  kNoLimit, &key_versions));
-   ASSERT_EQ(kTotalWrites, key_versions.size());
-   // Overload taking the default column family handle explicitly.
-   ASSERT_OK(ROCKSDB_NAMESPACE::GetAllKeyVersions(
-       db_, handles_[0], Slice(), Slice(), kNoLimit, &key_versions));
-   ASSERT_EQ(kTotalWrites, key_versions.size());
-   // Check non-default column family
-   for (size_t n = 0; n < kNumInserts - 1; ++n) {
-     ASSERT_OK(Put(1, std::to_string(n), "value"));
-   }
-   for (size_t n = 0; n < kNumUpdates - 1; ++n) {
-     ASSERT_OK(Put(1, std::to_string(n), "value1"));
-   }
-   for (size_t n = 0; n < kNumDeletes - 1; ++n) {
-     ASSERT_OK(Delete(1, std::to_string(n)));
-   }
-   ASSERT_OK(ROCKSDB_NAMESPACE::GetAllKeyVersions(
-       db_, handles_[1], Slice(), Slice(), kNoLimit, &key_versions));
-   // One write fewer of each kind than in the default column family.
-   ASSERT_EQ(kTotalWrites - 3, key_versions.size());
- }
- #endif // !ROCKSDB_LITE
- // Issues a batched MultiGet against a table whose block size exceeds
- // BlockBasedTable::kMultiGetReadStackBufSize. The test has no assertions on
- // the results; it only exercises the read path with over-sized blocks.
- TEST_F(DBBasicTest, MultiGetIOBufferOverrun) {
-   Options options = CurrentOptions();
-   Random rnd(301);
-   BlockBasedTableOptions table_options;
-   table_options.pin_l0_filter_and_index_blocks_in_cache = true;
-   table_options.block_size = 16 * 1024;
-   // Side-effect-free sanity check, so plain assert() is fine here.
-   assert(table_options.block_size >
-          BlockBasedTable::kMultiGetReadStackBufSize);
-   options.table_factory.reset(new BlockBasedTableFactory(table_options));
-   Reopen(options);
-   std::string zero_str(128, '\0');
-   for (int i = 0; i < 100; ++i) {
-     // Make the value compressible. A purely random string doesn't compress
-     // and the resultant data block will not be compressed
-     std::string value(RandomString(&rnd, 128) + zero_str);
-     // Use ASSERT_OK instead of assert(): with NDEBUG defined the assert
-     // expression is not evaluated and the Put would never run.
-     ASSERT_OK(Put(Key(i), value));
-   }
-   ASSERT_OK(Flush());
-   // Reserve instead of pre-filling with empty strings: `keys` stores Slices
-   // that point into key_data's elements, so key_data must not reallocate
-   // while keys are appended below.
-   std::vector<std::string> key_data;
-   key_data.reserve(10);
-   std::vector<Slice> keys;
-   // We cannot resize a PinnableSlice vector, so just set initial size to
-   // largest we think we will need
-   std::vector<PinnableSlice> values(10);
-   std::vector<Status> statuses;
-   ReadOptions ro;
-   // Warm up the cache first
-   key_data.emplace_back(Key(0));
-   keys.emplace_back(Slice(key_data.back()));
-   key_data.emplace_back(Key(50));
-   keys.emplace_back(Slice(key_data.back()));
-   statuses.resize(keys.size());
-   dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
-                      keys.data(), values.data(), statuses.data(), true);
- }
- // Parameterized fixture for the MultiGet tests that follow.
- //
- // The four bool parameters are, in order: compressed block cache enabled,
- // uncompressed block cache enabled, data compression enabled, and the value
- // tests use for ReadOptions::fill_cache.
- //
- // The constructor reopens the DB with a flush policy that cuts a data block
- // after every 10 keys, then writes and flushes two batches of 100 keys:
- //   * Key(i)       -> values_[i]: RandomString(&rnd, 128) + 128 zero bytes
- //   * "a" + Key(i) -> uncompressable_values_[i]: RandomString(&rnd, 256) + '\0'
- // Each enabled cache is wrapped in MyBlockCache so that tests can read
- // lookup / hit / insert counters.
- class DBBasicTestWithParallelIO
-     : public DBTestBase,
-       public testing::WithParamInterface<std::tuple<bool, bool, bool, bool>> {
-  public:
-   DBBasicTestWithParallelIO() : DBTestBase("/db_basic_test_with_parallel_io") {
-     bool compressed_cache = std::get<0>(GetParam());
-     bool uncompressed_cache = std::get<1>(GetParam());
-     compression_enabled_ = std::get<2>(GetParam());
-     fill_cache_ = std::get<3>(GetParam());
-     if (compressed_cache) {
-       std::shared_ptr<Cache> cache = NewLRUCache(1048576);
-       compressed_cache_ = std::make_shared<MyBlockCache>(cache);
-     }
-     if (uncompressed_cache) {
-       std::shared_ptr<Cache> cache = NewLRUCache(1048576);
-       uncompressed_cache_ = std::make_shared<MyBlockCache>(cache);
-     }
-     env_->count_random_reads_ = true;
-     Options options = CurrentOptions();
-     Random rnd(301);
-     BlockBasedTableOptions table_options;
- #ifndef ROCKSDB_LITE
-     if (compression_enabled_) {
-       std::vector<CompressionType> compression_types;
-       compression_types = GetSupportedCompressions();
-       // Not every platform may have compression libraries available, so
-       // dynamically pick based on what's available
-       if (compression_types.size() == 0) {
-         compression_enabled_ = false;
-       } else {
-         options.compression = compression_types[0];
-       }
-     }
- #else
-     // GetSupportedCompressions() is not available in LITE build
-     if (!Snappy_Supported()) {
-       compression_enabled_ = false;
-     }
- #endif  // ROCKSDB_LITE
-     table_options.block_cache = uncompressed_cache_;
-     if (table_options.block_cache == nullptr) {
-       table_options.no_block_cache = true;
-     } else {
-       table_options.pin_l0_filter_and_index_blocks_in_cache = true;
-     }
-     table_options.block_cache_compressed = compressed_cache_;
-     table_options.flush_block_policy_factory.reset(
-         new MyFlushBlockPolicyFactory());
-     options.table_factory.reset(new BlockBasedTableFactory(table_options));
-     if (!compression_enabled_) {
-       options.compression = kNoCompression;
-     }
-     Reopen(options);
-     std::string zero_str(128, '\0');
-     for (int i = 0; i < 100; ++i) {
-       // Make the value compressible. A purely random string doesn't compress
-       // and the resultant data block will not be compressed
-       values_.emplace_back(RandomString(&rnd, 128) + zero_str);
-       // The write must not live inside assert(): with NDEBUG defined the
-       // whole expression is compiled out and the DB would stay empty.
-       // ASSERT_* cannot be used in a constructor, so use the non-fatal
-       // EXPECT_OK instead.
-       EXPECT_OK(Put(Key(i), values_[i]));
-     }
-     EXPECT_OK(Flush());
-     for (int i = 0; i < 100; ++i) {
-       // block cannot gain space by compression
-       uncompressable_values_.emplace_back(RandomString(&rnd, 256) + '\0');
-       std::string tmp_key = "a" + Key(i);
-       EXPECT_OK(Put(tmp_key, uncompressable_values_[i]));
-     }
-     EXPECT_OK(Flush());
-   }
-   // Returns true iff `value` equals the compressible value written for Key(i).
-   bool CheckValue(int i, const std::string& value) {
-     return values_[i].compare(value) == 0;
-   }
-   // Returns true iff `value` equals the value written for "a" + Key(i).
-   bool CheckUncompressableValue(int i, const std::string& value) {
-     return uncompressable_values_[i].compare(value) == 0;
-   }
-   // Counter accessors. Each dereferences the corresponding cache wrapper, so
-   // it must only be called when that cache was enabled by the parameters.
-   int num_lookups() { return uncompressed_cache_->num_lookups(); }
-   int num_found() { return uncompressed_cache_->num_found(); }
-   int num_inserts() { return uncompressed_cache_->num_inserts(); }
-   int num_lookups_compressed() { return compressed_cache_->num_lookups(); }
-   int num_found_compressed() { return compressed_cache_->num_found(); }
-   int num_inserts_compressed() { return compressed_cache_->num_inserts(); }
-   bool fill_cache() { return fill_cache_; }
-   bool compression_enabled() { return compression_enabled_; }
-   bool has_compressed_cache() { return compressed_cache_ != nullptr; }
-   bool has_uncompressed_cache() { return uncompressed_cache_ != nullptr; }
-   static void SetUpTestCase() {}
-   static void TearDownTestCase() {}
-  private:
-   // Factory handing out MyFlushBlockPolicy instances to the table builder.
-   class MyFlushBlockPolicyFactory : public FlushBlockPolicyFactory {
-    public:
-     MyFlushBlockPolicyFactory() {}
-     const char* Name() const override { return "MyFlushBlockPolicyFactory"; }
-     FlushBlockPolicy* NewFlushBlockPolicy(
-         const BlockBasedTableOptions& /*table_options*/,
-         const BlockBuilder& data_block_builder) const override {
-       return new MyFlushBlockPolicy(data_block_builder);
-     }
-   };
-   // Flush policy that requests a new data block once the current one already
-   // holds 10 keys, independent of key/value sizes.
-   class MyFlushBlockPolicy : public FlushBlockPolicy {
-    public:
-     explicit MyFlushBlockPolicy(const BlockBuilder& data_block_builder)
-         : num_keys_(0), data_block_builder_(data_block_builder) {}
-     bool Update(const Slice& /*key*/, const Slice& /*value*/) override {
-       if (data_block_builder_.empty()) {
-         // First key in this block
-         num_keys_ = 1;
-         return false;
-       }
-       // Flush every 10 keys
-       if (num_keys_ == 10) {
-         num_keys_ = 1;
-         return true;
-       }
-       num_keys_++;
-       return false;
-     }
-    private:
-     int num_keys_;
-     const BlockBuilder& data_block_builder_;
-   };
-   // Cache wrapper that forwards every operation to `target_` while counting
-   // Insert() calls, Lookup() calls and Lookup() calls that returned a handle.
-   // GetCharge() is the one exception: it always reports 0.
-   class MyBlockCache : public Cache {
-    public:
-     explicit MyBlockCache(std::shared_ptr<Cache>& target)
-         : target_(target), num_lookups_(0), num_found_(0), num_inserts_(0) {}
-     const char* Name() const override { return "MyBlockCache"; }
-     Status Insert(const Slice& key, void* value, size_t charge,
-                   void (*deleter)(const Slice& key, void* value),
-                   Handle** handle = nullptr,
-                   Priority priority = Priority::LOW) override {
-       num_inserts_++;
-       return target_->Insert(key, value, charge, deleter, handle, priority);
-     }
-     Handle* Lookup(const Slice& key, Statistics* stats = nullptr) override {
-       num_lookups_++;
-       Handle* handle = target_->Lookup(key, stats);
-       if (handle != nullptr) {
-         num_found_++;
-       }
-       return handle;
-     }
-     bool Ref(Handle* handle) override { return target_->Ref(handle); }
-     bool Release(Handle* handle, bool force_erase = false) override {
-       return target_->Release(handle, force_erase);
-     }
-     void* Value(Handle* handle) override { return target_->Value(handle); }
-     void Erase(const Slice& key) override { target_->Erase(key); }
-     uint64_t NewId() override { return target_->NewId(); }
-     void SetCapacity(size_t capacity) override {
-       target_->SetCapacity(capacity);
-     }
-     void SetStrictCapacityLimit(bool strict_capacity_limit) override {
-       target_->SetStrictCapacityLimit(strict_capacity_limit);
-     }
-     bool HasStrictCapacityLimit() const override {
-       return target_->HasStrictCapacityLimit();
-     }
-     size_t GetCapacity() const override { return target_->GetCapacity(); }
-     size_t GetUsage() const override { return target_->GetUsage(); }
-     size_t GetUsage(Handle* handle) const override {
-       return target_->GetUsage(handle);
-     }
-     size_t GetPinnedUsage() const override {
-       return target_->GetPinnedUsage();
-     }
-     size_t GetCharge(Handle* /*handle*/) const override { return 0; }
-     void ApplyToAllCacheEntries(void (*callback)(void*, size_t),
-                                 bool thread_safe) override {
-       target_->ApplyToAllCacheEntries(callback, thread_safe);
-     }
-     void EraseUnRefEntries() override { target_->EraseUnRefEntries(); }
-     int num_lookups() { return num_lookups_; }
-     int num_found() { return num_found_; }
-     int num_inserts() { return num_inserts_; }
-    private:
-     std::shared_ptr<Cache> target_;
-     int num_lookups_;
-     int num_found_;
-     int num_inserts_;
-   };
-   std::shared_ptr<MyBlockCache> compressed_cache_;
-   std::shared_ptr<MyBlockCache> uncompressed_cache_;
-   bool compression_enabled_;
-   std::vector<std::string> values_;
-   std::vector<std::string> uncompressable_values_;
-   bool fill_cache_;
- };
- // Checks the values returned by batched MultiGet and the number of random
- // file reads (env_->random_read_counter_) issued for each combination of
- // caches, compression and fill_cache provided by the fixture.
- TEST_P(DBBasicTestWithParallelIO, MultiGet) {
- // key_data starts out with 10 empty strings, so the two emplace_back calls
- // below append at indices 10 and 11; later lookups overwrite indices 0..9.
- std::vector<std::string> key_data(10);
- std::vector<Slice> keys;
- // We cannot resize a PinnableSlice vector, so just set initial size to
- // largest we think we will need
- std::vector<PinnableSlice> values(10);
- std::vector<Status> statuses;
- ReadOptions ro;
- ro.fill_cache = fill_cache();
- // Warm up the cache first
- key_data.emplace_back(Key(0));
- keys.emplace_back(Slice(key_data.back()));
- key_data.emplace_back(Key(50));
- keys.emplace_back(Slice(key_data.back()));
- statuses.resize(keys.size());
- dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
- keys.data(), values.data(), statuses.data(), true);
- ASSERT_TRUE(CheckValue(0, values[0].ToString()));
- ASSERT_TRUE(CheckValue(50, values[1].ToString()));
- // Baseline read count after the warm-up; all later expectations are
- // expressed as increments on top of it.
- int random_reads = env_->random_read_counter_.Read();
- // Second lookup: keys 1 and 51. NOTE(review): with the fixture's
- // 10-keys-per-block flush policy these presumably share data blocks with
- // keys 0 and 50 read above.
- key_data[0] = Key(1);
- key_data[1] = Key(51);
- keys[0] = Slice(key_data[0]);
- keys[1] = Slice(key_data[1]);
- values[0].Reset();
- values[1].Reset();
- dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
- keys.data(), values.data(), statuses.data(), true);
- ASSERT_TRUE(CheckValue(1, values[0].ToString()));
- ASSERT_TRUE(CheckValue(51, values[1].ToString()));
- // The test treats reads as servable from cache only when fill_cache is set
- // and either an uncompressed cache exists, or a compressed cache exists and
- // compression is enabled.
- bool read_from_cache = false;
- if (fill_cache()) {
- if (has_uncompressed_cache()) {
- read_from_cache = true;
- } else if (has_compressed_cache() && compression_enabled()) {
- read_from_cache = true;
- }
- }
- // Without a usable cache the two keys cost two more file reads.
- int expected_reads = random_reads + (read_from_cache ? 0 : 2);
- ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);
- // Third lookup: ten compressible keys taken from the index ranges 0-9,
- // 10-19, 50-59 and 80-89.
- keys.resize(10);
- statuses.resize(10);
- std::vector<int> key_ints{1, 2, 15, 16, 55, 81, 82, 83, 84, 85};
- for (size_t i = 0; i < key_ints.size(); ++i) {
- key_data[i] = Key(key_ints[i]);
- keys[i] = Slice(key_data[i]);
- statuses[i] = Status::OK();
- values[i].Reset();
- }
- dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
- keys.data(), values.data(), statuses.data(), true);
- for (size_t i = 0; i < key_ints.size(); ++i) {
- ASSERT_OK(statuses[i]);
- ASSERT_TRUE(CheckValue(key_ints[i], values[i].ToString()));
- }
- // Expected number of additional reads; it differs when compression is on
- // but no compressed cache is configured.
- if (compression_enabled() && !has_compressed_cache()) {
- expected_reads += (read_from_cache ? 2 : 3);
- } else {
- expected_reads += (read_from_cache ? 2 : 4);
- }
- ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);
- // Fourth lookup: the same ten indices, but for the "a"-prefixed keys whose
- // values were written as uncompressible data.
- keys.resize(10);
- statuses.resize(10);
- std::vector<int> key_uncmp{1, 2, 15, 16, 55, 81, 82, 83, 84, 85};
- for (size_t i = 0; i < key_uncmp.size(); ++i) {
- key_data[i] = "a" + Key(key_uncmp[i]);
- keys[i] = Slice(key_data[i]);
- statuses[i] = Status::OK();
- values[i].Reset();
- }
- dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
- keys.data(), values.data(), statuses.data(), true);
- for (size_t i = 0; i < key_uncmp.size(); ++i) {
- ASSERT_OK(statuses[i]);
- ASSERT_TRUE(CheckUncompressableValue(key_uncmp[i], values[i].ToString()));
- }
- // None of these blocks has been read before, so the expected increment is
- // the same whether or not a cache is usable (both ternary arms are equal).
- if (compression_enabled() && !has_compressed_cache()) {
- expected_reads += (read_from_cache ? 3 : 3);
- } else {
- expected_reads += (read_from_cache ? 4 : 4);
- }
- ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);
- // Fifth lookup: re-read the first five "a"-prefixed keys of the previous
- // batch.
- keys.resize(5);
- statuses.resize(5);
- std::vector<int> key_tr{1, 2, 15, 16, 55};
- for (size_t i = 0; i < key_tr.size(); ++i) {
- key_data[i] = "a" + Key(key_tr[i]);
- keys[i] = Slice(key_data[i]);
- statuses[i] = Status::OK();
- values[i].Reset();
- }
- dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
- keys.data(), values.data(), statuses.data(), true);
- for (size_t i = 0; i < key_tr.size(); ++i) {
- ASSERT_OK(statuses[i]);
- ASSERT_TRUE(CheckUncompressableValue(key_tr[i], values[i].ToString()));
- }
- if (compression_enabled() && !has_compressed_cache()) {
- expected_reads += (read_from_cache ? 0 : 2);
- ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);
- } else {
- if (has_uncompressed_cache()) {
- expected_reads += (read_from_cache ? 0 : 3);
- ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);
- } else {
- // A rare case: even when block compression is enabled, some data blocks
- // are not compressed because of their content. If the user only enables
- // the compressed cache, those uncompressed blocks will not be cached and
- // block reads will be triggered. The number of reads depends on the
- // compression algorithm, so only a lower bound is checked.
- ASSERT_TRUE(env_->random_read_counter_.Read() >= expected_reads);
- }
- }
- }
- // Forces a Corruption status the second time the
- // "RetrieveMultipleBlocks:VerifyChecksum" sync point fires during a two-key
- // MultiGet, and checks that the first key succeeds while the second key
- // reports Corruption.
- TEST_P(DBBasicTestWithParallelIO, MultiGetWithChecksumMismatch) {
-   // key_data starts with 10 empty strings; the emplace_back calls below
-   // append the two lookup keys after them.
-   std::vector<std::string> key_data(10);
-   std::vector<Slice> keys;
-   // We cannot resize a PinnableSlice vector, so just set initial size to
-   // largest we think we will need
-   std::vector<PinnableSlice> values(10);
-   std::vector<Status> statuses;
-   int read_count = 0;
-   ReadOptions ro;
-   ro.fill_cache = fill_cache();
-   SyncPoint::GetInstance()->SetCallBack(
-       "RetrieveMultipleBlocks:VerifyChecksum", [&](void* status) {
-         Status* s = static_cast<Status*>(status);
-         read_count++;
-         if (read_count == 2) {
-           *s = Status::Corruption();
-         }
-       });
-   SyncPoint::GetInstance()->EnableProcessing();
-   // Warm up the cache first
-   key_data.emplace_back(Key(0));
-   keys.emplace_back(Slice(key_data.back()));
-   key_data.emplace_back(Key(50));
-   keys.emplace_back(Slice(key_data.back()));
-   statuses.resize(keys.size());
-   dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
-                      keys.data(), values.data(), statuses.data(), true);
-   // Only the first value is checked: the second key is expected to fail
-   // with Corruption, so values[1] is not inspected.
-   ASSERT_TRUE(CheckValue(0, values[0].ToString()));
-   ASSERT_EQ(statuses[0], Status::OK());
-   ASSERT_EQ(statuses[1], Status::Corruption());
-   SyncPoint::GetInstance()->DisableProcessing();
-   // The callback captures read_count by reference. Remove it so it cannot
-   // be invoked with a dangling reference once a later test re-enables
-   // sync point processing.
-   SyncPoint::GetInstance()->ClearAllCallBacks();
- }
- // Makes the "TableCache::MultiGet:FindTable" sync point report an IOError
- // and checks that both keys of a MultiGet return IOError.
- TEST_P(DBBasicTestWithParallelIO, MultiGetWithMissingFile) {
-   // key_data starts with 10 empty strings; the emplace_back calls below
-   // append the two lookup keys after them.
-   std::vector<std::string> key_data(10);
-   std::vector<Slice> keys;
-   // We cannot resize a PinnableSlice vector, so just set initial size to
-   // largest we think we will need
-   std::vector<PinnableSlice> values(10);
-   std::vector<Status> statuses;
-   ReadOptions ro;
-   ro.fill_cache = fill_cache();
-   SyncPoint::GetInstance()->SetCallBack(
-       "TableCache::MultiGet:FindTable", [](void* status) {
-         Status* s = static_cast<Status*>(status);
-         *s = Status::IOError();
-       });
-   // DB open will create table readers unless we reduce the table cache
-   // capacity.
-   // SanitizeOptions will set max_open_files to minimum of 20. Table cache
-   // is allocated with max_open_files - 10 as capacity. So override
-   // max_open_files to 11 so table cache capacity will become 1. This will
-   // prevent file open during DB open and force the file to be opened
-   // during MultiGet
-   SyncPoint::GetInstance()->SetCallBack(
-       "SanitizeOptions::AfterChangeMaxOpenFiles", [](void* arg) {
-         int* max_open_files = static_cast<int*>(arg);
-         *max_open_files = 11;
-       });
-   SyncPoint::GetInstance()->EnableProcessing();
-   Reopen(CurrentOptions());
-   // Warm up the cache first
-   key_data.emplace_back(Key(0));
-   keys.emplace_back(Slice(key_data.back()));
-   key_data.emplace_back(Key(50));
-   keys.emplace_back(Slice(key_data.back()));
-   statuses.resize(keys.size());
-   dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
-                      keys.data(), values.data(), statuses.data(), true);
-   ASSERT_EQ(statuses[0], Status::IOError());
-   ASSERT_EQ(statuses[1], Status::IOError());
-   SyncPoint::GetInstance()->DisableProcessing();
-   // Remove the fault-injecting callbacks so they cannot affect any later
-   // test that enables sync point processing.
-   SyncPoint::GetInstance()->ClearAllCallBacks();
- }
- // Instantiate every DBBasicTestWithParallelIO test for all 16 combinations
- // of the four boolean parameters.
- INSTANTIATE_TEST_CASE_P(
- ParallelIO, DBBasicTestWithParallelIO,
- // Params are as follows -
- // Param 0 - Compressed cache enabled
- // Param 1 - Uncompressed cache enabled
- // Param 2 - Data compression enabled
- // Param 3 - ReadOptions::fill_cache
- ::testing::Combine(::testing::Bool(), ::testing::Bool(),
- ::testing::Bool(), ::testing::Bool()));
- // Common base for the timestamp tests. Provides a comparator skeleton for
- // user keys that end in a fixed-size timestamp, plus a helper that encodes a
- // (low, high) pair of 64-bit integers as such a timestamp.
- class DBBasicTestWithTimestampBase : public DBTestBase {
-  public:
-   explicit DBBasicTestWithTimestampBase(const std::string& dbname)
-       : DBTestBase(dbname) {}
-  protected:
-   // Comparator that orders keys first by their timestamp-stripped part (via
-   // the subclass-provided CompareImpl) and then by timestamp.
-   class TestComparatorBase : public Comparator {
-    public:
-     explicit TestComparatorBase(size_t ts_sz) : Comparator(ts_sz) {}
-     const char* Name() const override { return "TestComparator"; }
-     void FindShortSuccessor(std::string*) const override {}
-     void FindShortestSeparator(std::string*, const Slice&) const override {}
-     // Full comparison: the key without timestamp decides first; ties are
-     // broken by the trailing timestamp_size() bytes of each key.
-     int Compare(const Slice& a, const Slice& b) const override {
-       int r = CompareWithoutTimestamp(a, b);
-       if (r != 0 || 0 == timestamp_size()) {
-         return r;
-       }
-       return CompareTimestamp(
-           Slice(a.data() + a.size() - timestamp_size(), timestamp_size()),
-           Slice(b.data() + b.size() - timestamp_size(), timestamp_size()));
-     }
-     // Ordering of two keys whose timestamps have already been stripped.
-     virtual int CompareImpl(const Slice& a, const Slice& b) const = 0;
-     int CompareWithoutTimestamp(const Slice& a, const Slice& b) const override {
-       assert(a.size() >= timestamp_size());
-       assert(b.size() >= timestamp_size());
-       Slice k1 = StripTimestampFromUserKey(a, timestamp_size());
-       Slice k2 = StripTimestampFromUserKey(b, timestamp_size());
-       return CompareImpl(k1, k2);
-     }
-     // Compares two encoded timestamps (low word followed by high word, as
-     // written by EncodeTimestamp). A slice with null data orders before one
-     // with data. Otherwise the order is descending: the numerically larger
-     // (high, low) pair compares as smaller, i.e. -1 is returned when ts1 is
-     // the larger timestamp.
-     int CompareTimestamp(const Slice& ts1, const Slice& ts2) const override {
-       if (!ts1.data() && !ts2.data()) {
-         return 0;
-       } else if (ts1.data() && !ts2.data()) {
-         return 1;
-       } else if (!ts1.data() && ts2.data()) {
-         return -1;
-       }
-       assert(ts1.size() == ts2.size());
-       uint64_t low1 = 0;
-       uint64_t low2 = 0;
-       uint64_t high1 = 0;
-       uint64_t high2 = 0;
-       // Decode from local copies. GetFixed64 consumes the slice it reads
-       // from, so decoding through a const_cast of the arguments would
-       // silently modify the caller's const slices.
-       Slice in1 = ts1;
-       Slice in2 = ts2;
-       if (!GetFixed64(&in1, &low1) || !GetFixed64(&in1, &high1) ||
-           !GetFixed64(&in2, &low2) || !GetFixed64(&in2, &high2)) {
-         assert(false);
-       }
-       if (high1 < high2) {
-         return 1;
-       } else if (high1 > high2) {
-         return -1;
-       }
-       if (low1 < low2) {
-         return 1;
-       } else if (low1 > low2) {
-         return -1;
-       }
-       return 0;
-     }
-   };
-   // Overwrites *ts with the fixed-width encoding of `low` followed by `high`
-   // and returns a Slice referring to *ts; the slice is only valid while *ts
-   // is alive and unmodified.
-   Slice EncodeTimestamp(uint64_t low, uint64_t high, std::string* ts) {
-     assert(nullptr != ts);
-     ts->clear();
-     PutFixed64(ts, low);
-     PutFixed64(ts, high);
-     assert(ts->size() == sizeof(low) + sizeof(high));
-     return Slice(*ts);
-   }
- };
- // Fixture for timestamp tests whose keys have the form "key<N>"; keys are
- // ordered by the integer value of <N>.
- class DBBasicTestWithTimestamp : public DBBasicTestWithTimestampBase {
-  public:
-   DBBasicTestWithTimestamp()
-       : DBBasicTestWithTimestampBase("/db_basic_test_with_timestamp") {}
-  protected:
-   class TestComparator : public TestComparatorBase {
-    public:
-     // 3: length of "key" in generated keys ("key" + std::to_string(j))
-     const int kKeyPrefixLength = 3;
-     explicit TestComparator(size_t ts_sz) : TestComparatorBase(ts_sz) {}
-     // Compares two timestamp-stripped keys by the number that follows the
-     // fixed-length prefix.
-     int CompareImpl(const Slice& a, const Slice& b) const override {
-       const auto numeric_suffix = [this](const Slice& key) {
-         const std::string digits(key.data() + kKeyPrefixLength,
-                                  key.size() - kKeyPrefixLength);
-         return atoi(digits.c_str());
-       };
-       const int lhs = numeric_suffix(a);
-       const int rhs = numeric_suffix(b);
-       if (lhs < rhs) {
-         return -1;
-       }
-       if (lhs > rhs) {
-         return 1;
-       }
-       return 0;
-     }
-   };
- };
- #ifndef ROCKSDB_LITE
- // Event listener that records the file path reported by each completed
- // flush. All access to the recorded list is guarded by mutex_.
- class FlushedFileCollector : public EventListener {
-  public:
-   FlushedFileCollector() {}
-   ~FlushedFileCollector() override {}
-   // Appends the path of the file produced by the finished flush.
-   void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& info) override {
-     InstrumentedMutexLock guard(&mutex_);
-     flushed_files_.emplace_back(info.file_path);
-   }
-   // Returns a snapshot (copy) of the paths collected so far.
-   std::vector<std::string> GetFlushedFiles() {
-     InstrumentedMutexLock guard(&mutex_);
-     // The returned copy is made before the guard is released.
-     return flushed_files_;
-   }
-   // Forgets all collected paths.
-   void ClearFlushedFiles() {
-     InstrumentedMutexLock guard(&mutex_);
-     flushed_files_.clear();
-   }
-  private:
-   std::vector<std::string> flushed_files_;
-   InstrumentedMutex mutex_;
- };
- // Writes the same set of keys under kNumTimestamps different timestamps into
- // two column families, flushing each timestamp's keys into two files and
- // compacting them with CompactFiles to output level (kNumTimestamps - i).
- // Afterwards every key is read back at a read timestamp one above each
- // write timestamp and must return the value written for that timestamp.
- TEST_F(DBBasicTestWithTimestamp, PutAndGetWithCompaction) {
-   const int kNumKeysPerFile = 8192;
-   const size_t kNumTimestamps = 2;
-   const size_t kNumKeysPerTimestamp = (kNumKeysPerFile - 1) / kNumTimestamps;
-   const size_t kSplitPosBase = kNumKeysPerTimestamp / 2;
-   Options options = CurrentOptions();
-   options.create_if_missing = true;
-   options.env = env_;
-   options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
-   // The raw pointer stays usable after emplace_back hands the object to
-   // options.listeners.
-   FlushedFileCollector* collector = new FlushedFileCollector();
-   options.listeners.emplace_back(collector);
-   std::string tmp;
-   size_t ts_sz = EncodeTimestamp(0, 0, &tmp).size();
-   TestComparator test_cmp(ts_sz);
-   options.comparator = &test_cmp;
-   BlockBasedTableOptions bbto;
-   bbto.filter_policy.reset(NewBloomFilterPolicy(
-       10 /*bits_per_key*/, false /*use_block_based_builder*/));
-   bbto.whole_key_filtering = true;
-   options.table_factory.reset(NewBlockBasedTableFactory(bbto));
-   DestroyAndReopen(options);
-   CreateAndReopenWithCF({"pikachu"}, options);
-   size_t num_cfs = handles_.size();
-   ASSERT_EQ(2, num_cfs);
-   std::vector<std::string> write_ts_strs(kNumTimestamps);
-   std::vector<std::string> read_ts_strs(kNumTimestamps);
-   std::vector<Slice> write_ts_list;
-   std::vector<Slice> read_ts_list;
-   for (size_t i = 0; i != kNumTimestamps; ++i) {
-     // Write at timestamp 2*i; the matching read timestamp is 2*i + 1.
-     write_ts_list.emplace_back(EncodeTimestamp(i * 2, 0, &write_ts_strs[i]));
-     read_ts_list.emplace_back(EncodeTimestamp(1 + i * 2, 0, &read_ts_strs[i]));
-     const Slice& write_ts = write_ts_list.back();
-     WriteOptions wopts;
-     wopts.timestamp = &write_ts;
-     for (int cf = 0; cf != static_cast<int>(num_cfs); ++cf) {
-       for (size_t j = 0; j != kNumKeysPerTimestamp; ++j) {
-         ASSERT_OK(Put(cf, "key" + std::to_string(j),
-                       "value_" + std::to_string(j) + "_" + std::to_string(i),
-                       wopts));
-         if (j == kSplitPosBase + i || j == kNumKeysPerTimestamp - 1) {
-           // flush all keys with the same timestamp to two sst files, split at
-           // incremental positions such that lowerlevel[1].smallest.userkey ==
-           // higherlevel[0].largest.userkey
-           ASSERT_OK(Flush(cf));
-           // compact files (2 at each level) to a lower level such that all
-           // keys with the same timestamp is at one level, with newer versions
-           // at higher levels.
-           CompactionOptions compact_opt;
-           compact_opt.compression = kNoCompression;
-           // A failed compaction would leave the files in a different layout
-           // than the test intends, so its status must not be ignored.
-           ASSERT_OK(db_->CompactFiles(compact_opt, handles_[cf],
-                                       collector->GetFlushedFiles(),
-                                       static_cast<int>(kNumTimestamps - i)));
-           collector->ClearFlushedFiles();
-         }
-       }
-     }
-   }
-   const auto& verify_db_func = [&]() {
-     for (size_t i = 0; i != kNumTimestamps; ++i) {
-       ReadOptions ropts;
-       ropts.timestamp = &read_ts_list[i];
-       for (int cf = 0; cf != static_cast<int>(num_cfs); ++cf) {
-         ColumnFamilyHandle* cfh = handles_[cf];
-         for (size_t j = 0; j != kNumKeysPerTimestamp; ++j) {
-           std::string value;
-           ASSERT_OK(db_->Get(ropts, cfh, "key" + std::to_string(j), &value));
-           ASSERT_EQ("value_" + std::to_string(j) + "_" + std::to_string(i),
-                     value);
-         }
-       }
-     }
-   };
-   verify_db_func();
- }
- #endif // !ROCKSDB_LITE
- // Bool-parameterized timestamp fixture. Its comparator orders the
- // timestamp-stripped part of the keys with BytewiseComparator().
- class DBBasicTestWithTimestampWithParam
-     : public DBBasicTestWithTimestampBase,
-       public testing::WithParamInterface<bool> {
-  public:
-   DBBasicTestWithTimestampWithParam()
-       : DBBasicTestWithTimestampBase(
-             "/db_basic_test_with_timestamp_with_param") {}
-  protected:
-   class TestComparator : public TestComparatorBase {
-    private:
-     // Comparator applied to keys once their timestamp has been removed.
-     const Comparator* cmp_without_ts_;
-    public:
-     explicit TestComparator(size_t ts_sz)
-         : TestComparatorBase(ts_sz), cmp_without_ts_(BytewiseComparator()) {}
-     int CompareImpl(const Slice& a, const Slice& b) const override {
-       const int order = cmp_without_ts_->Compare(a, b);
-       return order;
-     }
-   };
- };
- // For every available compression type, with and without a compression
- // dictionary, writes the same keys under kNumTimestamps timestamps into two
- // column families and reads each version back at the matching read
- // timestamp. The bool parameter selects whether the data stays in the
- // memtable (true) or is flushed after each timestamp (false).
- TEST_P(DBBasicTestWithTimestampWithParam, PutAndGet) {
-   const int kNumKeysPerFile = 8192;
-   const size_t kNumTimestamps = 6;
-   // Number of distinct keys written per timestamp and column family.
-   const size_t kNumKeysPerTimestamp = (kNumKeysPerFile - 1) / kNumTimestamps;
-   const bool memtable_only = GetParam();
-   Options options = CurrentOptions();
-   options.create_if_missing = true;
-   options.env = env_;
-   options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
-   std::string scratch_ts;
-   const size_t ts_sz = EncodeTimestamp(0, 0, &scratch_ts).size();
-   TestComparator test_cmp(ts_sz);
-   options.comparator = &test_cmp;
-   BlockBasedTableOptions bbto;
-   bbto.filter_policy.reset(NewBloomFilterPolicy(
-       10 /*bits_per_key*/, false /*use_block_based_builder*/));
-   bbto.whole_key_filtering = true;
-   options.table_factory.reset(NewBlockBasedTableFactory(bbto));
-   // Compression types to cover; kNoCompression is always included.
-   std::vector<CompressionType> compression_types{kNoCompression};
-   if (Zlib_Supported()) {
-     compression_types.push_back(kZlibCompression);
-   }
- #if LZ4_VERSION_NUMBER >= 10400  // r124+
-   compression_types.push_back(kLZ4Compression);
-   compression_types.push_back(kLZ4HCCompression);
- #endif  // LZ4_VERSION_NUMBER >= 10400
-   if (ZSTD_Supported()) {
-     compression_types.push_back(kZSTD);
-   }
-   // Switch compression dictionary on/off to check key extraction
-   // correctness in kBuffered state
-   const std::vector<uint32_t> max_dict_bytes_list = {0, 1 << 14};  // 0 or 16KB
-   for (const CompressionType compression_type : compression_types) {
-     for (const uint32_t max_dict_bytes : max_dict_bytes_list) {
-       options.compression = compression_type;
-       options.compression_opts.max_dict_bytes = max_dict_bytes;
-       if (kZSTD == compression_type) {
-         options.compression_opts.zstd_max_train_bytes = max_dict_bytes;
-       }
-       options.target_file_size_base = 1 << 26;  // 64MB
-       DestroyAndReopen(options);
-       CreateAndReopenWithCF({"pikachu"}, options);
-       const size_t num_cfs = handles_.size();
-       ASSERT_EQ(2, num_cfs);
-       const int cf_count = static_cast<int>(num_cfs);
-       std::vector<std::string> write_ts_strs(kNumTimestamps);
-       std::vector<std::string> read_ts_strs(kNumTimestamps);
-       std::vector<Slice> write_ts_list;
-       std::vector<Slice> read_ts_list;
-       for (size_t ts_idx = 0; ts_idx != kNumTimestamps; ++ts_idx) {
-         // Write at timestamp 2*ts_idx; read later at 2*ts_idx + 1.
-         write_ts_list.emplace_back(
-             EncodeTimestamp(ts_idx * 2, 0, &write_ts_strs[ts_idx]));
-         read_ts_list.emplace_back(
-             EncodeTimestamp(1 + ts_idx * 2, 0, &read_ts_strs[ts_idx]));
-         const Slice& write_ts = write_ts_list.back();
-         WriteOptions wopts;
-         wopts.timestamp = &write_ts;
-         for (int cf = 0; cf != cf_count; ++cf) {
-           for (size_t key_idx = 0; key_idx != kNumKeysPerTimestamp;
-                ++key_idx) {
-             const std::string key = "key" + std::to_string(key_idx);
-             const std::string val = "value_" + std::to_string(key_idx) + "_" +
-                                     std::to_string(ts_idx);
-             ASSERT_OK(Put(cf, key, val, wopts));
-           }
-           if (!memtable_only) {
-             ASSERT_OK(Flush(cf));
-           }
-         }
-       }
-       // Kept as a lambda so that a failing ASSERT_* only leaves the
-       // verification step, exactly as before.
-       const auto verify_reads = [&]() {
-         for (size_t ts_idx = 0; ts_idx != kNumTimestamps; ++ts_idx) {
-           ReadOptions ropts;
-           ropts.timestamp = &read_ts_list[ts_idx];
-           for (int cf = 0; cf != cf_count; ++cf) {
-             ColumnFamilyHandle* cfh = handles_[cf];
-             for (size_t key_idx = 0; key_idx != kNumKeysPerTimestamp;
-                  ++key_idx) {
-               std::string value;
-               ASSERT_OK(db_->Get(ropts, cfh, "key" + std::to_string(key_idx),
-                                  &value));
-               ASSERT_EQ("value_" + std::to_string(key_idx) + "_" +
-                             std::to_string(ts_idx),
-                         value);
-             }
-           }
-         }
-       };
-       verify_reads();
-     }
-   }
- }
- // Run each DBBasicTestWithTimestampWithParam test once with the bool
- // parameter false and once with true.
- INSTANTIATE_TEST_CASE_P(Timestamp, DBBasicTestWithTimestampWithParam,
- ::testing::Bool());
- } // namespace ROCKSDB_NAMESPACE
- // RegisterCustomObjects is called by main() before the tests run. When
- // ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS is defined, only a
- // C-linkage declaration is emitted here and the definition has to come from
- // elsewhere; otherwise an empty no-op definition is provided.
- #ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
- extern "C" {
- void RegisterCustomObjects(int argc, char** argv);
- }
- #else
- void RegisterCustomObjects(int /*argc*/, char** /*argv*/) {}
- #endif // !ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
- // Test binary entry point: installs the stack trace handler, lets
- // GoogleTest consume its command-line flags, registers custom objects and
- // then runs all tests, returning their result as the exit code.
- int main(int argc, char** argv) {
-   ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
-   ::testing::InitGoogleTest(&argc, argv);
-   RegisterCustomObjects(argc, argv);
-   const int exit_code = RUN_ALL_TESTS();
-   return exit_code;
- }
|