| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360 |
- // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
- // This source code is licensed under both the GPLv2 (found in the
- // COPYING file in the root directory) and Apache 2.0 License
- // (found in the LICENSE.Apache file in the root directory).
- #include "db/db_test_util.h"
- #include "file/file_util.h"
- #include "port/stack_trace.h"
- #include "rocksdb/utilities/options_util.h"
- #include "table/unique_id_impl.h"
- #include "utilities/merge_operators/string_append/stringappend.h"
- namespace ROCKSDB_NAMESPACE {
- class MyTestCompactionService : public CompactionService {
- public:
- MyTestCompactionService(
- std::string db_path, Options& options,
- std::shared_ptr<Statistics>& statistics,
- std::vector<std::shared_ptr<EventListener>> listeners,
- std::vector<std::shared_ptr<TablePropertiesCollectorFactory>>
- table_properties_collector_factories)
- : db_path_(std::move(db_path)),
- statistics_(statistics),
- options_(options),
- start_info_("na", "na", "na", 0, "na", 0, Env::TOTAL,
- CompactionReason::kUnknown, false, false, false, -1, -1),
- wait_info_("na", "na", "na", 0, "na", 0, Env::TOTAL,
- CompactionReason::kUnknown, false, false, false, -1, -1),
- listeners_(std::move(listeners)),
- table_properties_collector_factories_(
- std::move(table_properties_collector_factories)) {}
- static const char* kClassName() { return "MyTestCompactionService"; }
- const char* Name() const override { return kClassName(); }
- CompactionServiceScheduleResponse Schedule(
- const CompactionServiceJobInfo& info,
- const std::string& compaction_service_input) override {
- InstrumentedMutexLock l(&mutex_);
- start_info_ = info;
- assert(info.db_name == db_path_);
- std::string unique_id = Env::Default()->GenerateUniqueId();
- jobs_.emplace(unique_id, compaction_service_input);
- infos_.emplace(unique_id, info);
- CompactionServiceScheduleResponse response(
- unique_id, is_override_start_status_
- ? override_start_status_
- : CompactionServiceJobStatus::kSuccess);
- return response;
- }
- CompactionServiceJobStatus Wait(const std::string& scheduled_job_id,
- std::string* result) override {
- std::string compaction_input;
- {
- InstrumentedMutexLock l(&mutex_);
- auto job_index = jobs_.find(scheduled_job_id);
- if (job_index == jobs_.end()) {
- return CompactionServiceJobStatus::kFailure;
- }
- compaction_input = std::move(job_index->second);
- jobs_.erase(job_index);
- auto info_index = infos_.find(scheduled_job_id);
- if (info_index == infos_.end()) {
- return CompactionServiceJobStatus::kFailure;
- }
- wait_info_ = std::move(info_index->second);
- infos_.erase(info_index);
- }
- if (is_override_wait_status_) {
- return override_wait_status_;
- }
- CompactionServiceOptionsOverride options_override = GetOptionsOverride();
- OpenAndCompactOptions options;
- options.canceled = &canceled_;
- Status s =
- DB::OpenAndCompact(options, db_path_, GetOutputPath(scheduled_job_id),
- compaction_input, result, options_override);
- {
- InstrumentedMutexLock l(&mutex_);
- if (is_override_wait_result_) {
- *result = override_wait_result_;
- }
- result_ = *result;
- }
- compaction_num_.fetch_add(1);
- if (s.ok()) {
- return CompactionServiceJobStatus::kSuccess;
- } else {
- return CompactionServiceJobStatus::kFailure;
- }
- }
- CompactionServiceOptionsOverride GetOptionsOverride() {
- CompactionServiceOptionsOverride options_override;
- options_override.env = options_.env;
- options_override.file_checksum_gen_factory =
- options_.file_checksum_gen_factory;
- options_override.comparator = options_.comparator;
- options_override.merge_operator = options_.merge_operator;
- options_override.compaction_filter = options_.compaction_filter;
- options_override.compaction_filter_factory =
- options_.compaction_filter_factory;
- options_override.prefix_extractor = options_.prefix_extractor;
- options_override.table_factory = options_.table_factory;
- options_override.sst_partitioner_factory = options_.sst_partitioner_factory;
- options_override.statistics = statistics_;
- options_override.info_log = options_.info_log;
- if (!listeners_.empty()) {
- options_override.listeners = listeners_;
- }
- if (!table_properties_collector_factories_.empty()) {
- options_override.table_properties_collector_factories =
- table_properties_collector_factories_;
- }
- return options_override;
- }
- void CancelAwaitingJobs() override { canceled_ = true; }
- void OnInstallation(const std::string& /*scheduled_job_id*/,
- CompactionServiceJobStatus status) override {
- final_updated_status_ = status;
- }
- int GetCompactionNum() { return compaction_num_.load(); }
- CompactionServiceJobInfo GetCompactionInfoForStart() { return start_info_; }
- CompactionServiceJobInfo GetCompactionInfoForWait() { return wait_info_; }
- void OverrideStartStatus(CompactionServiceJobStatus s) {
- is_override_start_status_ = true;
- override_start_status_ = s;
- }
- void OverrideWaitStatus(CompactionServiceJobStatus s) {
- is_override_wait_status_ = true;
- override_wait_status_ = s;
- }
- void OverrideWaitResult(std::string str) {
- is_override_wait_result_ = true;
- override_wait_result_ = std::move(str);
- }
- void ResetOverride() {
- is_override_wait_result_ = false;
- is_override_start_status_ = false;
- is_override_wait_status_ = false;
- }
- void SetCanceled(bool canceled) { canceled_ = canceled; }
- bool GetCanceled() { return canceled_; }
- void GetResult(CompactionServiceResult* deserialized) {
- CompactionServiceResult::Read(result_, deserialized).PermitUncheckedError();
- }
- CompactionServiceJobStatus GetFinalCompactionServiceJobStatus() {
- return final_updated_status_.load();
- }
- protected:
- InstrumentedMutex mutex_;
- const std::string db_path_;
- std::shared_ptr<Statistics> statistics_;
- std::map<std::string, std::string> jobs_;
- std::map<std::string, CompactionServiceJobInfo> infos_;
- std::string result_;
- std::string GetOutputPath(const std::string& scheduled_job_id) {
- return db_path_ + "/" + scheduled_job_id;
- }
- private:
- std::atomic_int compaction_num_{0};
- Options options_;
- CompactionServiceJobInfo start_info_;
- CompactionServiceJobInfo wait_info_;
- bool is_override_start_status_ = false;
- CompactionServiceJobStatus override_start_status_ =
- CompactionServiceJobStatus::kFailure;
- bool is_override_wait_status_ = false;
- CompactionServiceJobStatus override_wait_status_ =
- CompactionServiceJobStatus::kFailure;
- bool is_override_wait_result_ = false;
- std::string override_wait_result_;
- std::vector<std::shared_ptr<EventListener>> listeners_;
- std::vector<std::shared_ptr<TablePropertiesCollectorFactory>>
- table_properties_collector_factories_;
- std::atomic_bool canceled_{false};
- std::atomic<CompactionServiceJobStatus> final_updated_status_{
- CompactionServiceJobStatus::kUseLocal};
- };
- class CompactionServiceTest : public DBTestBase {
- public:
- explicit CompactionServiceTest()
- : DBTestBase("compaction_service_test", true) {}
- protected:
- void ReopenWithCompactionService(Options* options) {
- options->env = env_;
- primary_statistics_ = CreateDBStatistics();
- options->statistics = primary_statistics_;
- compactor_statistics_ = CreateDBStatistics();
- auto my_cs = std::make_shared<MyTestCompactionService>(
- dbname_, *options, compactor_statistics_, remote_listeners,
- remote_table_properties_collector_factories);
- compaction_service_ = my_cs;
- options->compaction_service = compaction_service_;
- DestroyAndReopen(*options);
- CreateAndReopenWithCF({"cf_1", "cf_2", "cf_3"}, *options);
- my_cs->SetCanceled(false);
- }
- Statistics* GetCompactorStatistics() { return compactor_statistics_.get(); }
- Statistics* GetPrimaryStatistics() { return primary_statistics_.get(); }
- MyTestCompactionService* GetCompactionService() {
- CompactionService* cs = compaction_service_.get();
- return static_cast_with_check<MyTestCompactionService>(cs);
- }
- void GenerateTestData(bool move_files_manually = false) {
- // Generate 20 files @ L2 Per CF
- for (int cf_id = 0; cf_id < static_cast<int>(handles_.size()); cf_id++) {
- for (int i = 0; i < 20; i++) {
- for (int j = 0; j < 10; j++) {
- int key_id = i * 10 + j;
- ASSERT_OK(Put(cf_id, Key(key_id), "value" + std::to_string(key_id)));
- }
- ASSERT_OK(Flush(cf_id));
- }
- if (move_files_manually) {
- MoveFilesToLevel(2, cf_id);
- }
- // Generate 10 files @ L1 overlap with all 20 files @ L2
- for (int i = 0; i < 10; i++) {
- for (int j = 0; j < 10; j++) {
- int key_id = i * 20 + j * 2;
- ASSERT_OK(
- Put(cf_id, Key(key_id), "value_new" + std::to_string(key_id)));
- }
- ASSERT_OK(Flush(cf_id));
- }
- if (move_files_manually) {
- MoveFilesToLevel(1, cf_id);
- ASSERT_EQ(FilesPerLevel(cf_id), "0,10,20");
- }
- }
- }
- void VerifyTestData() {
- for (int cf_id = 0; cf_id < static_cast<int>(handles_.size()); cf_id++) {
- for (int i = 0; i < 200; i++) {
- auto result = Get(cf_id, Key(i));
- if (i % 2) {
- ASSERT_EQ(result, "value" + std::to_string(i));
- } else {
- ASSERT_EQ(result, "value_new" + std::to_string(i));
- }
- }
- }
- }
- std::vector<std::shared_ptr<EventListener>> remote_listeners;
- std::vector<std::shared_ptr<TablePropertiesCollectorFactory>>
- remote_table_properties_collector_factories;
- private:
- std::shared_ptr<Statistics> compactor_statistics_;
- std::shared_ptr<Statistics> primary_statistics_;
- std::shared_ptr<CompactionService> compaction_service_;
- };
- TEST_F(CompactionServiceTest, BasicCompactions) {
- Options options = CurrentOptions();
- ReopenWithCompactionService(&options);
- Statistics* primary_statistics = GetPrimaryStatistics();
- Statistics* compactor_statistics = GetCompactorStatistics();
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
- "BlockBasedTable::PrefetchTail::TaiSizeNotRecorded",
- [&](void* /* arg */) {
- // Trigger assertion to verify precise tail prefetch size calculation
- assert(false);
- });
- SyncPoint::GetInstance()->EnableProcessing();
- GenerateTestData();
- ASSERT_OK(dbfull()->TEST_WaitForCompact());
- SyncPoint::GetInstance()->DisableProcessing();
- VerifyTestData();
- auto my_cs = GetCompactionService();
- ASSERT_GE(my_cs->GetCompactionNum(), 1);
- ASSERT_EQ(CompactionServiceJobStatus::kSuccess,
- my_cs->GetFinalCompactionServiceJobStatus());
- // make sure the compaction statistics is only recorded on the remote side
- ASSERT_GE(compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES), 1);
- ASSERT_GE(compactor_statistics->getTickerCount(COMPACT_READ_BYTES), 1);
- ASSERT_EQ(primary_statistics->getTickerCount(COMPACT_WRITE_BYTES), 0);
- // even with remote compaction, primary host still needs to read SST files to
- // `verify_table()`.
- ASSERT_GE(primary_statistics->getTickerCount(COMPACT_READ_BYTES), 1);
- // all the compaction write happens on the remote side
- ASSERT_EQ(primary_statistics->getTickerCount(REMOTE_COMPACT_WRITE_BYTES),
- compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES));
- ASSERT_GE(primary_statistics->getTickerCount(REMOTE_COMPACT_READ_BYTES), 1);
- ASSERT_GT(primary_statistics->getTickerCount(COMPACT_READ_BYTES),
- primary_statistics->getTickerCount(REMOTE_COMPACT_READ_BYTES));
- // compactor is already the remote side, which doesn't have remote
- ASSERT_EQ(compactor_statistics->getTickerCount(REMOTE_COMPACT_READ_BYTES), 0);
- ASSERT_EQ(compactor_statistics->getTickerCount(REMOTE_COMPACT_WRITE_BYTES),
- 0);
- // Test failed compaction
- SyncPoint::GetInstance()->SetCallBack(
- "DBImplSecondary::CompactWithoutInstallation::End", [&](void* status) {
- // override job status
- auto s = static_cast<Status*>(status);
- *s = Status::Aborted("MyTestCompactionService failed to compact!");
- });
- SyncPoint::GetInstance()->EnableProcessing();
- Status s;
- for (int i = 0; i < 10; i++) {
- for (int j = 0; j < 10; j++) {
- int key_id = i * 20 + j * 2;
- s = Put(Key(key_id), "value_new" + std::to_string(key_id));
- if (s.IsAborted()) {
- break;
- }
- }
- if (s.IsAborted()) {
- break;
- }
- s = Flush();
- if (s.IsAborted()) {
- break;
- }
- s = dbfull()->TEST_WaitForCompact();
- if (s.IsAborted()) {
- break;
- }
- }
- ASSERT_TRUE(s.IsAborted());
- // Test re-open and successful unique id verification
- std::atomic_int verify_passed{0};
- SyncPoint::GetInstance()->SetCallBack(
- "BlockBasedTable::Open::PassedVerifyUniqueId", [&](void* arg) {
- // override job status
- auto id = static_cast<UniqueId64x2*>(arg);
- assert(*id != kNullUniqueId64x2);
- verify_passed++;
- });
- Close();
- my_cs->SetCanceled(false);
- ReopenWithColumnFamilies({kDefaultColumnFamilyName, "cf_1", "cf_2", "cf_3"},
- options);
- ASSERT_GT(verify_passed, 0);
- CompactionServiceResult result;
- my_cs->GetResult(&result);
- if (s.IsAborted()) {
- ASSERT_NOK(result.status);
- } else {
- ASSERT_OK(result.status);
- }
- ASSERT_GE(result.internal_stats.output_level_stats.micros, 1);
- ASSERT_GE(result.internal_stats.output_level_stats.cpu_micros, 1);
- ASSERT_EQ(20, result.internal_stats.output_level_stats.num_output_records);
- ASSERT_EQ(result.output_files.size(),
- result.internal_stats.output_level_stats.num_output_files);
- uint64_t total_size = 0;
- for (auto output_file : result.output_files) {
- std::string file_name = result.output_path + "/" + output_file.file_name;
- uint64_t file_size = 0;
- ASSERT_OK(options.env->GetFileSize(file_name, &file_size));
- ASSERT_GT(file_size, 0);
- total_size += file_size;
- }
- ASSERT_EQ(total_size, result.internal_stats.TotalBytesWritten());
- ASSERT_TRUE(result.stats.is_remote_compaction);
- ASSERT_TRUE(result.stats.is_manual_compaction);
- ASSERT_FALSE(result.stats.is_full_compaction);
- Close();
- SyncPoint::GetInstance()->DisableProcessing();
- }
- TEST_F(CompactionServiceTest, ManualCompaction) {
- Options options = CurrentOptions();
- options.disable_auto_compactions = true;
- ReopenWithCompactionService(&options);
- GenerateTestData();
- auto my_cs = GetCompactionService();
- std::string start_str = Key(15);
- std::string end_str = Key(45);
- Slice start(start_str);
- Slice end(end_str);
- uint64_t comp_num = my_cs->GetCompactionNum();
- ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, &end));
- ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
- VerifyTestData();
- start_str = Key(120);
- start = start_str;
- comp_num = my_cs->GetCompactionNum();
- ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, nullptr));
- ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
- VerifyTestData();
- end_str = Key(92);
- end = end_str;
- comp_num = my_cs->GetCompactionNum();
- ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, &end));
- ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
- VerifyTestData();
- comp_num = my_cs->GetCompactionNum();
- ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
- ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
- VerifyTestData();
- CompactionServiceResult result;
- my_cs->GetResult(&result);
- ASSERT_OK(result.status);
- ASSERT_TRUE(result.stats.is_manual_compaction);
- ASSERT_TRUE(result.stats.is_remote_compaction);
- auto info = my_cs->GetCompactionInfoForStart();
- ASSERT_EQ(0, info.cf_id);
- ASSERT_EQ(kDefaultColumnFamilyName, info.cf_name);
- info = my_cs->GetCompactionInfoForWait();
- ASSERT_EQ(0, info.cf_id);
- ASSERT_EQ(kDefaultColumnFamilyName, info.cf_name);
- // Test non-default CF
- ASSERT_OK(
- db_->CompactRange(CompactRangeOptions(), handles_[1], nullptr, nullptr));
- my_cs->GetResult(&result);
- ASSERT_OK(result.status);
- ASSERT_TRUE(result.stats.is_manual_compaction);
- ASSERT_TRUE(result.stats.is_remote_compaction);
- info = my_cs->GetCompactionInfoForStart();
- ASSERT_EQ(handles_[1]->GetID(), info.cf_id);
- ASSERT_EQ(handles_[1]->GetName(), info.cf_name);
- info = my_cs->GetCompactionInfoForWait();
- ASSERT_EQ(handles_[1]->GetID(), info.cf_id);
- ASSERT_EQ(handles_[1]->GetName(), info.cf_name);
- }
- TEST_F(CompactionServiceTest, StandaloneDeleteRangeTombstoneOptimization) {
- Options options = CurrentOptions();
- size_t num_files_after_filtered = 0;
- SyncPoint::GetInstance()->SetCallBack(
- "VersionSet::MakeInputIterator:NewCompactionMergingIterator",
- [&](void* arg) {
- num_files_after_filtered = *static_cast<size_t*>(arg);
- });
- SyncPoint::GetInstance()->EnableProcessing();
- for (auto compaction_style : {CompactionStyle::kCompactionStyleLevel,
- CompactionStyle::kCompactionStyleUniversal}) {
- SCOPED_TRACE("Style: " + std::to_string(compaction_style));
- options.compaction_style = compaction_style;
- ReopenWithCompactionService(&options);
- num_files_after_filtered = 0;
- std::vector<std::string> files;
- {
- // Writes first version of data in range partitioned files.
- SstFileWriter sst_file_writer(EnvOptions(), options);
- std::string file1 = dbname_ + "file1.sst";
- ASSERT_OK(sst_file_writer.Open(file1));
- ASSERT_OK(sst_file_writer.Put("a", "a1"));
- ASSERT_OK(sst_file_writer.Put("b", "b1"));
- ExternalSstFileInfo file1_info;
- ASSERT_OK(sst_file_writer.Finish(&file1_info));
- files.push_back(std::move(file1));
- std::string file2 = dbname_ + "file2.sst";
- ASSERT_OK(sst_file_writer.Open(file2));
- ASSERT_OK(sst_file_writer.Put("x", "x1"));
- ASSERT_OK(sst_file_writer.Put("y", "y1"));
- ExternalSstFileInfo file2_info;
- ASSERT_OK(sst_file_writer.Finish(&file2_info));
- files.push_back(std::move(file2));
- }
- IngestExternalFileOptions ifo;
- ASSERT_OK(db_->IngestExternalFile(files, ifo));
- ASSERT_EQ(Get("a"), "a1");
- ASSERT_EQ(Get("b"), "b1");
- ASSERT_EQ(Get("x"), "x1");
- ASSERT_EQ(Get("y"), "y1");
- ASSERT_EQ(2, NumTableFilesAtLevel(6));
- auto my_cs = GetCompactionService();
- uint64_t comp_num = my_cs->GetCompactionNum();
- {
- // Atomically delete old version of data with one range delete file.
- // And a new batch of range partitioned files with new version of data.
- files.clear();
- SstFileWriter sst_file_writer(EnvOptions(), options);
- std::string file2 = dbname_ + "file2.sst";
- ASSERT_OK(sst_file_writer.Open(file2));
- ASSERT_OK(sst_file_writer.DeleteRange("a", "z"));
- ExternalSstFileInfo file2_info;
- ASSERT_OK(sst_file_writer.Finish(&file2_info));
- files.push_back(std::move(file2));
- std::string file3 = dbname_ + "file3.sst";
- ASSERT_OK(sst_file_writer.Open(file3));
- ASSERT_OK(sst_file_writer.Put("a", "a2"));
- ASSERT_OK(sst_file_writer.Put("b", "b2"));
- ExternalSstFileInfo file3_info;
- ASSERT_OK(sst_file_writer.Finish(&file3_info));
- files.push_back(std::move(file3));
- std::string file4 = dbname_ + "file4.sst";
- ASSERT_OK(sst_file_writer.Open(file4));
- ASSERT_OK(sst_file_writer.Put("x", "x2"));
- ASSERT_OK(sst_file_writer.Put("y", "y2"));
- ExternalSstFileInfo file4_info;
- ASSERT_OK(sst_file_writer.Finish(&file4_info));
- files.push_back(std::move(file4));
- }
- ASSERT_OK(db_->IngestExternalFile(files, ifo));
- ASSERT_OK(db_->WaitForCompact(WaitForCompactOptions()));
- ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
- CompactionServiceResult result;
- my_cs->GetResult(&result);
- ASSERT_OK(result.status);
- ASSERT_TRUE(result.stats.is_manual_compaction);
- ASSERT_TRUE(result.stats.is_remote_compaction);
- if (compaction_style == kCompactionStyleUniversal) {
- ASSERT_EQ(num_files_after_filtered, 1);
- } else {
- // Not filtered
- ASSERT_EQ(num_files_after_filtered, 3);
- }
- Close();
- }
- SyncPoint::GetInstance()->DisableProcessing();
- }
- TEST_F(CompactionServiceTest, CompactionOutputFileIOError) {
- Options options = CurrentOptions();
- options.disable_auto_compactions = true;
- ReopenWithCompactionService(&options);
- GenerateTestData();
- auto my_cs = GetCompactionService();
- SyncPoint::GetInstance()->SetCallBack(
- "CompactionJob::FinishCompactionOutputFile()::AfterFinish",
- [&](void* status) {
- // override status
- auto s = static_cast<Status*>(status);
- *s = Status::IOError("Injected IOError!");
- });
- SyncPoint::GetInstance()->EnableProcessing();
- std::string start_str = Key(15);
- std::string end_str = Key(45);
- Slice start(start_str);
- Slice end(end_str);
- uint64_t comp_num = my_cs->GetCompactionNum();
- ASSERT_NOK(db_->CompactRange(CompactRangeOptions(), &start, &end));
- ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
- CompactionServiceResult result;
- my_cs->GetResult(&result);
- ASSERT_NOK(result.status);
- ASSERT_TRUE(result.stats.is_manual_compaction);
- ASSERT_TRUE(result.stats.is_remote_compaction);
- }
- TEST_F(CompactionServiceTest, PreservedOptionsLocalCompaction) {
- Options options = CurrentOptions();
- options.level0_file_num_compaction_trigger = 2;
- options.disable_auto_compactions = true;
- DestroyAndReopen(options);
- Random rnd(301);
- for (auto i = 0; i < 2; ++i) {
- for (auto j = 0; j < 10; ++j) {
- ASSERT_OK(
- Put("foo" + std::to_string(i * 10 + j), rnd.RandomString(1024)));
- }
- ASSERT_OK(Flush());
- }
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
- "CompactionJob::ProcessKeyValueCompaction()::Processing", [&](void* arg) {
- auto compaction = static_cast<Compaction*>(arg);
- std::string options_file_name = OptionsFileName(
- dbname_,
- compaction->input_version()->version_set()->options_file_number());
- // Change option twice to make sure the very first OPTIONS file gets
- // purged
- ASSERT_OK(dbfull()->SetOptions(
- {{"level0_file_num_compaction_trigger", "4"}}));
- ASSERT_EQ(4, dbfull()->GetOptions().level0_file_num_compaction_trigger);
- ASSERT_OK(dbfull()->SetOptions(
- {{"level0_file_num_compaction_trigger", "6"}}));
- ASSERT_EQ(6, dbfull()->GetOptions().level0_file_num_compaction_trigger);
- dbfull()->TEST_DeleteObsoleteFiles();
- // For non-remote compactions, OPTIONS file can be deleted while
- // using option at the start of the compaction
- Status s = env_->FileExists(options_file_name);
- ASSERT_NOK(s);
- ASSERT_TRUE(s.IsNotFound());
- // Should be old value
- ASSERT_EQ(2, compaction->mutable_cf_options()
- .level0_file_num_compaction_trigger);
- ASSERT_TRUE(dbfull()->min_options_file_numbers_.empty());
- });
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
- Status s = dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
- ASSERT_TRUE(s.ok());
- }
- TEST_F(CompactionServiceTest, PreservedOptionsRemoteCompaction) {
- // For non-remote compaction do not preserve options file
- Options options = CurrentOptions();
- options.level0_file_num_compaction_trigger = 2;
- options.disable_auto_compactions = true;
- ReopenWithCompactionService(&options);
- GenerateTestData();
- auto my_cs = GetCompactionService();
- Random rnd(301);
- for (auto i = 0; i < 2; ++i) {
- for (auto j = 0; j < 10; ++j) {
- ASSERT_OK(
- Put("foo" + std::to_string(i * 10 + j), rnd.RandomString(1024)));
- }
- ASSERT_OK(Flush());
- }
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
- {{"CompactionServiceTest::OptionsFileChanged",
- "DBImplSecondary::OpenAndCompact::BeforeLoadingOptions:1"}});
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
- "DBImplSecondary::OpenAndCompact::BeforeLoadingOptions:0",
- [&](void* arg) {
- auto options_file_number = static_cast<uint64_t*>(arg);
- // Change the option twice before the compaction run
- ASSERT_OK(dbfull()->SetOptions(
- {{"level0_file_num_compaction_trigger", "4"}}));
- ASSERT_EQ(4, dbfull()->GetOptions().level0_file_num_compaction_trigger);
- ASSERT_TRUE(dbfull()->versions_->options_file_number() >
- *options_file_number);
- // Change the option twice before the compaction run
- ASSERT_OK(dbfull()->SetOptions(
- {{"level0_file_num_compaction_trigger", "5"}}));
- ASSERT_EQ(5, dbfull()->GetOptions().level0_file_num_compaction_trigger);
- ASSERT_TRUE(dbfull()->versions_->options_file_number() >
- *options_file_number);
- TEST_SYNC_POINT("CompactionServiceTest::OptionsFileChanged");
- });
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
- "CompactionServiceJob::ProcessKeyValueCompactionWithCompactionService",
- [&](void* arg) {
- auto input = static_cast<CompactionServiceInput*>(arg);
- std::string options_file_name =
- OptionsFileName(dbname_, input->options_file_number);
- ASSERT_OK(env_->FileExists(options_file_name));
- ASSERT_FALSE(dbfull()->min_options_file_numbers_.empty());
- ASSERT_EQ(dbfull()->min_options_file_numbers_.front(),
- input->options_file_number);
- DBOptions db_options;
- ConfigOptions config_options;
- std::vector<ColumnFamilyDescriptor> all_column_families;
- config_options.env = env_;
- ASSERT_OK(LoadOptionsFromFile(config_options, options_file_name,
- &db_options, &all_column_families));
- bool has_cf = false;
- for (auto& cf : all_column_families) {
- if (cf.name == input->cf_name) {
- // Should be old value
- ASSERT_EQ(2, cf.options.level0_file_num_compaction_trigger);
- has_cf = true;
- }
- }
- ASSERT_TRUE(has_cf);
- });
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
- "CompactionJob::ProcessKeyValueCompaction()::Processing", [&](void* arg) {
- auto compaction = static_cast<Compaction*>(arg);
- ASSERT_EQ(2, compaction->mutable_cf_options()
- .level0_file_num_compaction_trigger);
- });
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
- Status s = dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
- ASSERT_TRUE(s.ok());
- CompactionServiceResult result;
- my_cs->GetResult(&result);
- ASSERT_OK(result.status);
- ASSERT_TRUE(result.stats.is_manual_compaction);
- ASSERT_TRUE(result.stats.is_remote_compaction);
- }
- class EventVerifier : public EventListener {
- public:
- explicit EventVerifier(uint64_t expected_num_input_records,
- size_t expected_num_input_files,
- uint64_t expected_num_output_records,
- size_t expected_num_output_files,
- const std::string& expected_smallest_output_key_prefix,
- const std::string& expected_largest_output_key_prefix,
- bool expected_is_remote_compaction_on_begin,
- bool expected_is_remote_compaction_on_complete)
- : expected_num_input_records_(expected_num_input_records),
- expected_num_input_files_(expected_num_input_files),
- expected_num_output_records_(expected_num_output_records),
- expected_num_output_files_(expected_num_output_files),
- expected_smallest_output_key_prefix_(
- expected_smallest_output_key_prefix),
- expected_largest_output_key_prefix_(expected_largest_output_key_prefix),
- expected_is_remote_compaction_on_begin_(
- expected_is_remote_compaction_on_begin),
- expected_is_remote_compaction_on_complete_(
- expected_is_remote_compaction_on_complete) {}
- void OnCompactionBegin(DB* /*db*/, const CompactionJobInfo& ci) override {
- ASSERT_EQ(expected_num_input_files_, ci.input_files.size());
- ASSERT_EQ(expected_num_input_files_, ci.input_file_infos.size());
- ASSERT_EQ(expected_is_remote_compaction_on_begin_,
- ci.stats.is_remote_compaction);
- ASSERT_TRUE(ci.stats.is_manual_compaction);
- ASSERT_FALSE(ci.stats.is_full_compaction);
- }
- void OnCompactionCompleted(DB* /*db*/, const CompactionJobInfo& ci) override {
- ASSERT_GT(ci.stats.elapsed_micros, 0);
- ASSERT_GT(ci.stats.cpu_micros, 0);
- ASSERT_EQ(expected_num_input_records_, ci.stats.num_input_records);
- ASSERT_EQ(expected_num_input_files_, ci.stats.num_input_files);
- ASSERT_EQ(expected_num_output_records_, ci.stats.num_output_records);
- ASSERT_EQ(expected_num_output_files_, ci.stats.num_output_files);
- ASSERT_EQ(expected_smallest_output_key_prefix_,
- ci.stats.smallest_output_key_prefix);
- ASSERT_EQ(expected_largest_output_key_prefix_,
- ci.stats.largest_output_key_prefix);
- ASSERT_GT(ci.stats.total_input_bytes, 0);
- ASSERT_GT(ci.stats.total_output_bytes, 0);
- ASSERT_EQ(ci.stats.num_input_records,
- ci.stats.num_output_records + ci.stats.num_records_replaced);
- ASSERT_EQ(expected_is_remote_compaction_on_complete_,
- ci.stats.is_remote_compaction);
- ASSERT_TRUE(ci.stats.is_manual_compaction);
- ASSERT_FALSE(ci.stats.is_full_compaction);
- }
- private:
- uint64_t expected_num_input_records_;
- size_t expected_num_input_files_;
- uint64_t expected_num_output_records_;
- size_t expected_num_output_files_;
- std::string expected_smallest_output_key_prefix_;
- std::string expected_largest_output_key_prefix_;
- bool expected_is_remote_compaction_on_begin_;
- bool expected_is_remote_compaction_on_complete_;
- };
- TEST_F(CompactionServiceTest, VerifyStats) {
- Options options = CurrentOptions();
- options.disable_auto_compactions = true;
- auto event_verifier = std::make_shared<EventVerifier>(
- 30 /* expected_num_input_records */, 3 /* expected_num_input_files */,
- 20 /* expected_num_output_records */, 1 /* expected_num_output_files */,
- "key00000" /* expected_smallest_output_key_prefix */,
- "key00001" /* expected_largest_output_key_prefix */,
- true /* expected_is_remote_compaction_on_begin */,
- true /* expected_is_remote_compaction_on_complete */);
- options.listeners.push_back(event_verifier);
- ReopenWithCompactionService(&options);
- GenerateTestData();
- auto my_cs = GetCompactionService();
- std::string start_str = Key(0);
- std::string end_str = Key(1);
- Slice start(start_str);
- Slice end(end_str);
- uint64_t comp_num = my_cs->GetCompactionNum();
- ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, &end));
- ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
- VerifyTestData();
- CompactionServiceResult result;
- my_cs->GetResult(&result);
- ASSERT_OK(result.status);
- ASSERT_TRUE(result.stats.is_manual_compaction);
- ASSERT_TRUE(result.stats.is_remote_compaction);
- }
- TEST_F(CompactionServiceTest, VerifyStatsLocalFallback) {
- Options options = CurrentOptions();
- options.disable_auto_compactions = true;
- auto event_verifier = std::make_shared<EventVerifier>(
- 30 /* expected_num_input_records */, 3 /* expected_num_input_files */,
- 20 /* expected_num_output_records */, 1 /* expected_num_output_files */,
- "key00000" /* expected_smallest_output_key_prefix */,
- "key00001" /* expected_largest_output_key_prefix */,
- true /* expected_is_remote_compaction_on_begin */,
- false /* expected_is_remote_compaction_on_complete */);
- options.listeners.push_back(event_verifier);
- ReopenWithCompactionService(&options);
- GenerateTestData();
- auto my_cs = GetCompactionService();
- my_cs->OverrideStartStatus(CompactionServiceJobStatus::kUseLocal);
- std::string start_str = Key(0);
- std::string end_str = Key(1);
- Slice start(start_str);
- Slice end(end_str);
- uint64_t comp_num = my_cs->GetCompactionNum();
- ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, &end));
- // Remote Compaction did not happen
- ASSERT_EQ(my_cs->GetCompactionNum(), comp_num);
- VerifyTestData();
- }
- TEST_F(CompactionServiceTest, VerifyInputRecordCount) {
- Options options = CurrentOptions();
- options.disable_auto_compactions = true;
- ReopenWithCompactionService(&options);
- GenerateTestData();
- auto my_cs = GetCompactionService();
- std::string start_str = Key(15);
- std::string end_str = Key(45);
- Slice start(start_str);
- Slice end(end_str);
- uint64_t comp_num = my_cs->GetCompactionNum();
- // Only iterator through 10 keys and force compaction to finish.
- int num_iter = 0;
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
- "CompactionJob::ProcessKeyValueCompaction()::stop", [&](void* stop_ptr) {
- num_iter++;
- if (num_iter == 10) {
- *(bool*)stop_ptr = true;
- }
- });
- SyncPoint::GetInstance()->EnableProcessing();
- // CompactRange() should fail
- Status s = db_->CompactRange(CompactRangeOptions(), &start, &end);
- ASSERT_NOK(s);
- ASSERT_TRUE(s.IsCorruption());
- const char* expected_message =
- "Compaction number of input keys does not match number of keys "
- "processed.";
- ASSERT_TRUE(std::strstr(s.getState(), expected_message));
- ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
- SyncPoint::GetInstance()->DisableProcessing();
- SyncPoint::GetInstance()->ClearAllCallBacks();
- }
- TEST_F(CompactionServiceTest, EmptyResult) {
- Options options = CurrentOptions();
- options.disable_auto_compactions = true;
- ReopenWithCompactionService(&options);
- GenerateTestData();
- auto my_cs = GetCompactionService();
- uint64_t comp_num = my_cs->GetCompactionNum();
- ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
- ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
- // Delete range to cover entire range
- ASSERT_OK(db_->DeleteRange(WriteOptions(), "key", "keyz"));
- ASSERT_OK(Flush());
- // In this unit test, both remote compaction and primary db instance are
- // running in the same process, so NewFileNumber will never have a collision.
- // In the real-world remote compactions, when the compaction is indeed running
- // in another process, this is not going to be the case.
- // To simulate the SST file with the same name created in the tmp directory,
- // override the file number in remote compaction to re-use old SST file
- // number.
- bool need_to_override_file_number = false;
- SyncPoint::GetInstance()->SetCallBack(
- "DBImplSecondary::OpenAndCompact::BeforeLoadingOptions:0",
- [&](void*) { need_to_override_file_number = true; });
- SyncPoint::GetInstance()->SetCallBack(
- "CompactionJob::OpenCompactionOutputFile::NewFileNumber",
- [&](void* file_number) {
- if (need_to_override_file_number) {
- auto n = static_cast<uint64_t*>(file_number);
- ColumnFamilyMetaData cf_meta;
- db_->GetColumnFamilyMetaData(&cf_meta);
- for (const auto& level : cf_meta.levels) {
- for (const auto& file : level.files) {
- // Use one of the existing file name
- *n = test::GetFileNumber(file.name);
- need_to_override_file_number = false;
- return;
- }
- }
- }
- });
- // Inject failure, so that the remote compaction fails after
- // ProcessKeyValueCompaction()
- SyncPoint::GetInstance()->SetCallBack(
- "DBImplSecondary::CompactWithoutInstallation::End", [&](void* status) {
- // override job status
- auto s = static_cast<Status*>(status);
- *s = Status::Aborted("MyTestCompactionService failed to compact!");
- });
- SyncPoint::GetInstance()->EnableProcessing();
- // Compaction should fail and SST files in the primary db should exist
- {
- ASSERT_NOK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
- ColumnFamilyMetaData meta;
- db_->GetColumnFamilyMetaData(&meta);
- for (const auto& level : meta.levels) {
- for (const auto& file : level.files) {
- std::string fname = file.db_path + "/" + file.name;
- ASSERT_OK(db_->GetEnv()->FileExists(fname));
- }
- }
- }
- Close();
- SyncPoint::GetInstance()->DisableProcessing();
- SyncPoint::GetInstance()->ClearAllCallBacks();
- }
- TEST_F(CompactionServiceTest, CorruptedOutput) {
- Options options = CurrentOptions();
- options.disable_auto_compactions = true;
- ReopenWithCompactionService(&options);
- GenerateTestData();
- auto my_cs = GetCompactionService();
- std::string start_str = Key(15);
- std::string end_str = Key(45);
- Slice start(start_str);
- Slice end(end_str);
- uint64_t comp_num = my_cs->GetCompactionNum();
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
- "CompactionServiceCompactionJob::Run:0", [&](void* arg) {
- CompactionServiceResult* compaction_result =
- *(static_cast<CompactionServiceResult**>(arg));
- ASSERT_TRUE(compaction_result != nullptr &&
- !compaction_result->output_files.empty());
- // Corrupt files here
- for (const auto& output_file : compaction_result->output_files) {
- std::string file_name =
- compaction_result->output_path + "/" + output_file.file_name;
- uint64_t file_size = 0;
- Status s = options.env->GetFileSize(file_name, &file_size);
- ASSERT_OK(s);
- ASSERT_GT(file_size, 0);
- ASSERT_OK(test::CorruptFile(env_, file_name, 0,
- static_cast<int>(file_size),
- true /* verifyChecksum */));
- }
- });
- SyncPoint::GetInstance()->EnableProcessing();
- // CompactRange() should fail
- Status s = db_->CompactRange(CompactRangeOptions(), &start, &end);
- ASSERT_NOK(s);
- ASSERT_TRUE(s.IsCorruption());
- ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
- SyncPoint::GetInstance()->DisableProcessing();
- SyncPoint::GetInstance()->ClearAllCallBacks();
- // On the worker side, the compaction is considered success
- // Verification is done on the primary side
- CompactionServiceResult result;
- my_cs->GetResult(&result);
- ASSERT_OK(result.status);
- ASSERT_TRUE(result.stats.is_manual_compaction);
- ASSERT_TRUE(result.stats.is_remote_compaction);
- }
- TEST_F(CompactionServiceTest, CorruptedOutputParanoidFileCheck) {
- for (bool paranoid_file_check_enabled : {false, true}) {
- SCOPED_TRACE("paranoid_file_check_enabled=" +
- std::to_string(paranoid_file_check_enabled));
- Options options = CurrentOptions();
- Destroy(options);
- options.disable_auto_compactions = true;
- options.paranoid_file_checks = paranoid_file_check_enabled;
- ReopenWithCompactionService(&options);
- GenerateTestData();
- auto my_cs = GetCompactionService();
- std::string start_str = Key(15);
- std::string end_str = Key(45);
- Slice start(start_str);
- Slice end(end_str);
- uint64_t comp_num = my_cs->GetCompactionNum();
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
- "CompactionServiceCompactionJob::Run:0", [&](void* arg) {
- CompactionServiceResult* compaction_result =
- *(static_cast<CompactionServiceResult**>(arg));
- ASSERT_TRUE(compaction_result != nullptr &&
- !compaction_result->output_files.empty());
- // Corrupt files here
- for (const auto& output_file : compaction_result->output_files) {
- std::string file_name =
- compaction_result->output_path + "/" + output_file.file_name;
- // Corrupt very small range of bytes. This corruption is so small
- // that this isn't caught by default light-weight check
- ASSERT_OK(test::CorruptFile(env_, file_name, 0, 1,
- false /* verifyChecksum */));
- }
- });
- SyncPoint::GetInstance()->EnableProcessing();
- Status s = db_->CompactRange(CompactRangeOptions(), &start, &end);
- if (paranoid_file_check_enabled) {
- ASSERT_NOK(s);
- ASSERT_EQ(Status::Corruption("Paranoid checksums do not match"), s);
- } else {
- // CompactRange() goes through if paranoid file check is not enabled
- ASSERT_OK(s);
- }
- ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
- SyncPoint::GetInstance()->DisableProcessing();
- SyncPoint::GetInstance()->ClearAllCallBacks();
- // On the worker side, the compaction is considered success
- // Verification is done on the primary side
- CompactionServiceResult result;
- my_cs->GetResult(&result);
- ASSERT_OK(result.status);
- ASSERT_TRUE(result.stats.is_manual_compaction);
- ASSERT_TRUE(result.stats.is_remote_compaction);
- }
- }
- TEST_F(CompactionServiceTest, TruncatedOutput) {
- Options options = CurrentOptions();
- options.disable_auto_compactions = true;
- ReopenWithCompactionService(&options);
- GenerateTestData();
- auto my_cs = GetCompactionService();
- std::string start_str = Key(15);
- std::string end_str = Key(45);
- Slice start(start_str);
- Slice end(end_str);
- uint64_t comp_num = my_cs->GetCompactionNum();
- // Skip calculating tail size to avoid crashing due to truncated file size
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
- "FileMetaData::CalculateTailSize", [&](void* arg) {
- bool* skip = static_cast<bool*>(arg);
- *skip = true;
- });
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
- "CompactionServiceCompactionJob::Run:0", [&](void* arg) {
- CompactionServiceResult* compaction_result =
- *(static_cast<CompactionServiceResult**>(arg));
- ASSERT_TRUE(compaction_result != nullptr &&
- !compaction_result->output_files.empty());
- // Truncate files here
- for (const auto& output_file : compaction_result->output_files) {
- std::string file_name =
- compaction_result->output_path + "/" + output_file.file_name;
- uint64_t file_size = 0;
- Status s = options.env->GetFileSize(file_name, &file_size);
- ASSERT_OK(s);
- ASSERT_GT(file_size, 0);
- ASSERT_OK(test::TruncateFile(env_, file_name, file_size / 4));
- }
- });
- SyncPoint::GetInstance()->EnableProcessing();
- // CompactRange() should fail
- Status s = db_->CompactRange(CompactRangeOptions(), &start, &end);
- ASSERT_NOK(s);
- ASSERT_TRUE(s.IsCorruption());
- ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
- SyncPoint::GetInstance()->DisableProcessing();
- SyncPoint::GetInstance()->ClearAllCallBacks();
- // On the worker side, the compaction is considered success
- // Verification is done on the primary side
- CompactionServiceResult result;
- my_cs->GetResult(&result);
- ASSERT_OK(result.status);
- ASSERT_TRUE(result.stats.is_manual_compaction);
- ASSERT_TRUE(result.stats.is_remote_compaction);
- }
- TEST_F(CompactionServiceTest, CustomFileChecksum) {
- Options options = CurrentOptions();
- options.file_checksum_gen_factory = GetFileChecksumGenCrc32cFactory();
- ReopenWithCompactionService(&options);
- GenerateTestData();
- auto my_cs = GetCompactionService();
- std::string start_str = Key(15);
- std::string end_str = Key(45);
- Slice start(start_str);
- Slice end(end_str);
- uint64_t comp_num = my_cs->GetCompactionNum();
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
- "CompactionServiceCompactionJob::Run:0", [&](void* arg) {
- CompactionServiceResult* compaction_result =
- *(static_cast<CompactionServiceResult**>(arg));
- ASSERT_TRUE(compaction_result != nullptr &&
- !compaction_result->output_files.empty());
- // Validate Checksum files here
- for (const auto& output_file : compaction_result->output_files) {
- std::string file_name =
- compaction_result->output_path + "/" + output_file.file_name;
- FileChecksumGenContext gen_context;
- gen_context.file_name = file_name;
- std::unique_ptr<FileChecksumGenerator> file_checksum_gen =
- options.file_checksum_gen_factory->CreateFileChecksumGenerator(
- gen_context);
- std::unique_ptr<SequentialFile> file_reader;
- uint64_t file_size = 0;
- Status s = options.env->GetFileSize(file_name, &file_size);
- ASSERT_OK(s);
- ASSERT_GT(file_size, 0);
- s = options.env->NewSequentialFile(file_name, &file_reader,
- EnvOptions());
- ASSERT_OK(s);
- Slice result;
- std::unique_ptr<char[]> scratch(new char[file_size]);
- s = file_reader->Read(file_size, &result, scratch.get());
- ASSERT_OK(s);
- file_checksum_gen->Update(scratch.get(), result.size());
- file_checksum_gen->Finalize();
- // Verify actual checksum and the func name
- ASSERT_EQ(file_checksum_gen->Name(),
- output_file.file_checksum_func_name);
- ASSERT_EQ(file_checksum_gen->GetChecksum(),
- output_file.file_checksum);
- }
- });
- SyncPoint::GetInstance()->EnableProcessing();
- ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, &end));
- ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
- SyncPoint::GetInstance()->DisableProcessing();
- SyncPoint::GetInstance()->ClearAllCallBacks();
- CompactionServiceResult result;
- my_cs->GetResult(&result);
- ASSERT_OK(result.status);
- ASSERT_TRUE(result.stats.is_manual_compaction);
- ASSERT_TRUE(result.stats.is_remote_compaction);
- }
- TEST_F(CompactionServiceTest, CancelCompactionOnRemoteSide) {
- Options options = CurrentOptions();
- options.disable_auto_compactions = true;
- ReopenWithCompactionService(&options);
- GenerateTestData();
- auto my_cs = GetCompactionService();
- std::string start_str = Key(15);
- std::string end_str = Key(45);
- Slice start(start_str);
- Slice end(end_str);
- uint64_t comp_num = my_cs->GetCompactionNum();
- // Test cancel compaction at the beginning
- my_cs->SetCanceled(true);
- auto s = db_->CompactRange(CompactRangeOptions(), &start, &end);
- ASSERT_TRUE(s.IsIncomplete());
- // compaction number is not increased
- ASSERT_GE(my_cs->GetCompactionNum(), comp_num);
- VerifyTestData();
- // Test cancel compaction in progress
- ReopenWithCompactionService(&options);
- GenerateTestData();
- my_cs = GetCompactionService();
- my_cs->SetCanceled(false);
- std::atomic_bool cancel_issued{false};
- SyncPoint::GetInstance()->SetCallBack("CompactionJob::Run():Inprogress",
- [&](void* /*arg*/) {
- cancel_issued = true;
- my_cs->SetCanceled(true);
- });
- SyncPoint::GetInstance()->EnableProcessing();
- s = db_->CompactRange(CompactRangeOptions(), &start, &end);
- ASSERT_TRUE(s.IsIncomplete());
- ASSERT_TRUE(cancel_issued);
- // compaction number is not increased
- ASSERT_GE(my_cs->GetCompactionNum(), comp_num);
- VerifyTestData();
- }
- TEST_F(CompactionServiceTest, CancelCompactionOnPrimarySide) {
- Options options = CurrentOptions();
- options.disable_auto_compactions = true;
- ReopenWithCompactionService(&options);
- GenerateTestData();
- auto my_cs = GetCompactionService();
- std::string start_str = Key(15);
- std::string end_str = Key(45);
- Slice start(start_str);
- Slice end(end_str);
- uint64_t comp_num = my_cs->GetCompactionNum();
- ReopenWithCompactionService(&options);
- GenerateTestData();
- my_cs = GetCompactionService();
- // Primary DB calls CancelAllBackgroundWork() while the compaction is running
- SyncPoint::GetInstance()->SetCallBack(
- "CompactionJob::Run():Inprogress",
- [&](void* /*arg*/) { CancelAllBackgroundWork(db_, false /*wait*/); });
- SyncPoint::GetInstance()->EnableProcessing();
- Status s = db_->CompactRange(CompactRangeOptions(), &start, &end);
- ASSERT_TRUE(s.IsIncomplete());
- // Check canceled_ was set to true by CancelAwaitingJobs()
- ASSERT_TRUE(my_cs->GetCanceled());
- // compaction number is not increased
- ASSERT_GE(my_cs->GetCompactionNum(), comp_num);
- }
- TEST_F(CompactionServiceTest, FailedToStart) {
- Options options = CurrentOptions();
- options.disable_auto_compactions = true;
- ReopenWithCompactionService(&options);
- GenerateTestData();
- auto my_cs = GetCompactionService();
- my_cs->OverrideStartStatus(CompactionServiceJobStatus::kFailure);
- std::string start_str = Key(15);
- std::string end_str = Key(45);
- Slice start(start_str);
- Slice end(end_str);
- Status s = db_->CompactRange(CompactRangeOptions(), &start, &end);
- ASSERT_TRUE(s.IsIncomplete());
- }
- TEST_F(CompactionServiceTest, InvalidResult) {
- Options options = CurrentOptions();
- options.disable_auto_compactions = true;
- ReopenWithCompactionService(&options);
- GenerateTestData();
- auto my_cs = GetCompactionService();
- my_cs->OverrideWaitResult("Invalid Str");
- std::string start_str = Key(15);
- std::string end_str = Key(45);
- Slice start(start_str);
- Slice end(end_str);
- Status s = db_->CompactRange(CompactRangeOptions(), &start, &end);
- ASSERT_FALSE(s.ok());
- ASSERT_EQ(CompactionServiceJobStatus::kFailure,
- my_cs->GetFinalCompactionServiceJobStatus());
- }
- TEST_F(CompactionServiceTest, SubCompaction) {
- Options options = CurrentOptions();
- options.max_subcompactions = 10;
- options.target_file_size_base = 1 << 10; // 1KB
- options.disable_auto_compactions = true;
- ReopenWithCompactionService(&options);
- GenerateTestData();
- VerifyTestData();
- auto my_cs = GetCompactionService();
- int compaction_num_before = my_cs->GetCompactionNum();
- auto cro = CompactRangeOptions();
- cro.max_subcompactions = 10;
- Status s = db_->CompactRange(cro, nullptr, nullptr);
- ASSERT_OK(s);
- VerifyTestData();
- int compaction_num = my_cs->GetCompactionNum() - compaction_num_before;
- // make sure there's sub-compaction by checking the compaction number
- ASSERT_GE(compaction_num, 2);
- }
- class PartialDeleteCompactionFilter : public CompactionFilter {
- public:
- CompactionFilter::Decision FilterV2(
- int /*level*/, const Slice& key, ValueType /*value_type*/,
- const Slice& /*existing_value*/, std::string* /*new_value*/,
- std::string* /*skip_until*/) const override {
- int i = std::stoi(key.ToString().substr(3));
- if (i > 5 && i <= 105) {
- return CompactionFilter::Decision::kRemove;
- }
- return CompactionFilter::Decision::kKeep;
- }
- const char* Name() const override { return "PartialDeleteCompactionFilter"; }
- };
- TEST_F(CompactionServiceTest, CompactionFilter) {
- Options options = CurrentOptions();
- std::unique_ptr<CompactionFilter> delete_comp_filter(
- new PartialDeleteCompactionFilter());
- options.compaction_filter = delete_comp_filter.get();
- ReopenWithCompactionService(&options);
- GenerateTestData();
- ASSERT_OK(dbfull()->TEST_WaitForCompact());
- ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
- // verify result
- for (int i = 0; i < 200; i++) {
- auto result = Get(Key(i));
- if (i > 5 && i <= 105) {
- ASSERT_EQ(result, "NOT_FOUND");
- } else if (i % 2) {
- ASSERT_EQ(result, "value" + std::to_string(i));
- } else {
- ASSERT_EQ(result, "value_new" + std::to_string(i));
- }
- }
- auto my_cs = GetCompactionService();
- ASSERT_GE(my_cs->GetCompactionNum(), 1);
- }
- TEST_F(CompactionServiceTest, MergeOperator) {
- Options options = CurrentOptions();
- options.merge_operator.reset(new StringAppendOperator(','));
- ReopenWithCompactionService(&options);
- GenerateTestData();
- ASSERT_OK(dbfull()->TEST_WaitForCompact());
- for (int i = 0; i < 200; i++) {
- ASSERT_OK(db_->Merge(WriteOptions(), Key(i),
- "merge_op_append_" + std::to_string(i)));
- }
- ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
- // verify result
- for (int i = 0; i < 200; i++) {
- auto result = Get(Key(i));
- if (i % 2) {
- ASSERT_EQ(result, "value" + std::to_string(i) + ",merge_op_append_" +
- std::to_string(i));
- } else {
- ASSERT_EQ(result, "value_new" + std::to_string(i) + ",merge_op_append_" +
- std::to_string(i));
- }
- }
- auto my_cs = GetCompactionService();
- ASSERT_GE(my_cs->GetCompactionNum(), 1);
- }
- TEST_F(CompactionServiceTest, Snapshot) {
- Options options = CurrentOptions();
- ReopenWithCompactionService(&options);
- ASSERT_OK(Put(Key(1), "value1"));
- ASSERT_OK(Put(Key(2), "value1"));
- const Snapshot* s1 = db_->GetSnapshot();
- ASSERT_OK(Flush());
- ASSERT_OK(Put(Key(1), "value2"));
- ASSERT_OK(Put(Key(3), "value2"));
- ASSERT_OK(Flush());
- ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
- auto my_cs = GetCompactionService();
- ASSERT_GE(my_cs->GetCompactionNum(), 1);
- ASSERT_EQ("value1", Get(Key(1), s1));
- ASSERT_EQ("value2", Get(Key(1)));
- db_->ReleaseSnapshot(s1);
- }
- TEST_F(CompactionServiceTest, PrecludeLastLevel) {
- const int kNumTrigger = 4;
- const int kNumLevels = 7;
- const int kNumKeys = 100;
- Options options = CurrentOptions();
- options.compaction_style = kCompactionStyleUniversal;
- options.last_level_temperature = Temperature::kCold;
- options.level0_file_num_compaction_trigger = 4;
- options.max_subcompactions = 10;
- options.num_levels = kNumLevels;
- ReopenWithCompactionService(&options);
- // Alternate for comparison: DestroyAndReopen(options);
- // This is simpler than setting up mock time to make the user option work,
- // but is not as direct as testing with preclude option itself.
- SyncPoint::GetInstance()->SetCallBack(
- "Compaction::SupportsPerKeyPlacement:Enabled",
- [&](void* arg) { *static_cast<bool*>(arg) = true; });
- SyncPoint::GetInstance()->SetCallBack(
- "CompactionJob::PrepareTimes():preclude_last_level_min_seqno",
- [&](void* arg) { *static_cast<SequenceNumber*>(arg) = 100; });
- SyncPoint::GetInstance()->EnableProcessing();
- for (int i = 0; i < kNumTrigger; i++) {
- for (int j = 0; j < kNumKeys; j++) {
- ASSERT_OK(Put(Key(j * kNumTrigger + i), "v" + std::to_string(i)));
- }
- ASSERT_OK(Flush());
- }
- ASSERT_OK(dbfull()->TEST_WaitForCompact());
- // Data split between proximal (kUnknown) and last (kCold) levels
- ASSERT_EQ("0,0,0,0,0,1,1", FilesPerLevel());
- ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
- ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);
- // TODO: Check FileSystem temperatures with FileTemperatureTestFS
- for (int i = 0; i < kNumTrigger; i++) {
- for (int j = 0; j < kNumKeys; j++) {
- ASSERT_EQ(Get(Key(j * kNumTrigger + i)), "v" + std::to_string(i));
- }
- }
- // Verify Output Stats
- auto my_cs = GetCompactionService();
- {
- CompactionServiceResult result;
- my_cs->GetResult(&result);
- ASSERT_OK(result.status);
- ASSERT_GT(result.internal_stats.output_level_stats.cpu_micros, 0);
- ASSERT_GT(result.internal_stats.output_level_stats.micros, 0);
- ASSERT_EQ(result.internal_stats.output_level_stats.num_output_records +
- result.internal_stats.proximal_level_stats.num_output_records,
- kNumTrigger * kNumKeys);
- ASSERT_EQ(result.internal_stats.output_level_stats.num_output_files +
- result.internal_stats.proximal_level_stats.num_output_files,
- 2);
- CompactionServiceJobInfo info = my_cs->GetCompactionInfoForStart();
- ASSERT_EQ(0, info.base_input_level);
- ASSERT_EQ(kNumLevels - 1, info.output_level);
- }
- SyncPoint::GetInstance()->DisableProcessing();
- // Disable Preclude feature and run full compaction to the bottommost level
- {
- ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
- CompactionServiceJobInfo info = my_cs->GetCompactionInfoForStart();
- ASSERT_EQ(kNumLevels - 2, info.base_input_level);
- ASSERT_EQ(kNumLevels - 1, info.output_level);
- }
- }
- TEST_F(CompactionServiceTest, ConcurrentCompaction) {
- Options options = CurrentOptions();
- options.level0_file_num_compaction_trigger = 100;
- options.max_background_jobs = 20;
- ReopenWithCompactionService(&options);
- GenerateTestData(true);
- ColumnFamilyMetaData meta;
- db_->GetColumnFamilyMetaData(&meta);
- std::vector<std::thread> threads;
- for (const auto& file : meta.levels[1].files) {
- threads.emplace_back([&]() {
- std::string fname = file.db_path + "/" + file.name;
- ASSERT_OK(db_->CompactFiles(CompactionOptions(), {fname}, 2));
- });
- }
- for (auto& thread : threads) {
- thread.join();
- }
- ASSERT_OK(dbfull()->TEST_WaitForCompact());
- // verify result
- VerifyTestData();
- auto my_cs = GetCompactionService();
- ASSERT_EQ(my_cs->GetCompactionNum(), 10);
- ASSERT_EQ(FilesPerLevel(), "0,0,10");
- }
- TEST_F(CompactionServiceTest, CompactionInfo) {
- Options options = CurrentOptions();
- ReopenWithCompactionService(&options);
- GenerateTestData();
- ASSERT_OK(dbfull()->TEST_WaitForCompact());
- auto my_cs =
- static_cast_with_check<MyTestCompactionService>(GetCompactionService());
- uint64_t comp_num = my_cs->GetCompactionNum();
- ASSERT_GE(comp_num, 1);
- CompactionServiceJobInfo info = my_cs->GetCompactionInfoForStart();
- ASSERT_EQ(dbname_, info.db_name);
- std::string db_id, db_session_id;
- ASSERT_OK(db_->GetDbIdentity(db_id));
- ASSERT_EQ(db_id, info.db_id);
- ASSERT_OK(db_->GetDbSessionId(db_session_id));
- ASSERT_EQ(db_session_id, info.db_session_id);
- ASSERT_EQ(Env::LOW, info.priority);
- info = my_cs->GetCompactionInfoForWait();
- ASSERT_EQ(dbname_, info.db_name);
- ASSERT_EQ(db_id, info.db_id);
- ASSERT_EQ(db_session_id, info.db_session_id);
- ASSERT_EQ(Env::LOW, info.priority);
- // Test priority USER
- ColumnFamilyMetaData meta;
- db_->GetColumnFamilyMetaData(&meta);
- SstFileMetaData file = meta.levels[1].files[0];
- ASSERT_OK(db_->CompactFiles(CompactionOptions(),
- {file.db_path + "/" + file.name}, 2));
- info = my_cs->GetCompactionInfoForStart();
- ASSERT_EQ(Env::USER, info.priority);
- ASSERT_EQ(CompactionReason::kManualCompaction, info.compaction_reason);
- ASSERT_EQ(true, info.is_manual_compaction);
- ASSERT_EQ(false, info.is_full_compaction);
- ASSERT_EQ(true, info.bottommost_level);
- ASSERT_EQ(1, info.base_input_level);
- ASSERT_EQ(2, info.output_level);
- info = my_cs->GetCompactionInfoForWait();
- ASSERT_EQ(Env::USER, info.priority);
- ASSERT_EQ(CompactionReason::kManualCompaction, info.compaction_reason);
- ASSERT_EQ(true, info.is_manual_compaction);
- ASSERT_EQ(false, info.is_full_compaction);
- ASSERT_EQ(true, info.bottommost_level);
- ASSERT_EQ(1, info.base_input_level);
- ASSERT_EQ(2, info.output_level);
- ASSERT_EQ(kDefaultColumnFamilyName, info.cf_name);
- // Test priority BOTTOM
- env_->SetBackgroundThreads(1, Env::BOTTOM);
- // This will set bottommost_level = true but is_full_compaction = false
- options.num_levels = 2;
- ReopenWithCompactionService(&options);
- my_cs =
- static_cast_with_check<MyTestCompactionService>(GetCompactionService());
- for (int i = 0; i < 20; i++) {
- for (int j = 0; j < 10; j++) {
- int key_id = i * 10 + j;
- ASSERT_OK(Put(Key(key_id), "value" + std::to_string(key_id)));
- }
- ASSERT_OK(Flush());
- }
- for (int i = 0; i < 4; i++) {
- for (int j = 0; j < 10; j++) {
- int key_id = i * 20 + j * 2;
- ASSERT_OK(Put(Key(key_id), "value_new" + std::to_string(key_id)));
- }
- ASSERT_OK(Flush());
- }
- ASSERT_OK(dbfull()->TEST_WaitForCompact());
- info = my_cs->GetCompactionInfoForStart();
- ASSERT_EQ(CompactionReason::kLevelL0FilesNum, info.compaction_reason);
- ASSERT_EQ(false, info.is_manual_compaction);
- ASSERT_EQ(false, info.is_full_compaction);
- ASSERT_EQ(true, info.bottommost_level);
- ASSERT_EQ(Env::BOTTOM, info.priority);
- ASSERT_EQ(0, info.base_input_level);
- ASSERT_EQ(db_->NumberLevels() - 1, info.output_level);
- info = my_cs->GetCompactionInfoForWait();
- ASSERT_EQ(Env::BOTTOM, info.priority);
- ASSERT_EQ(CompactionReason::kLevelL0FilesNum, info.compaction_reason);
- ASSERT_EQ(false, info.is_manual_compaction);
- ASSERT_EQ(false, info.is_full_compaction);
- ASSERT_EQ(true, info.bottommost_level);
- ASSERT_EQ(0, info.base_input_level);
- ASSERT_EQ(db_->NumberLevels() - 1, info.output_level);
- // Test Non-Bottommost Level
- options.num_levels = 4;
- ReopenWithCompactionService(&options);
- my_cs =
- static_cast_with_check<MyTestCompactionService>(GetCompactionService());
- int compaction_num = my_cs->GetCompactionNum();
- ASSERT_EQ(0, compaction_num);
- for (int i = 0; i < options.level0_file_num_compaction_trigger; i++) {
- for (int j = 0; j < 10; j++) {
- int key_id = i * 10 + j;
- ASSERT_OK(Put(Key(key_id), "value_new_new" + std::to_string(key_id)));
- }
- ASSERT_OK(Flush());
- }
- ASSERT_OK(dbfull()->TEST_WaitForCompact());
- // This is trivial move. Done locally.
- ASSERT_EQ(0, my_cs->GetCompactionNum());
- info = my_cs->GetCompactionInfoForStart();
- ASSERT_EQ(false, info.is_manual_compaction);
- ASSERT_EQ(false, info.is_full_compaction);
- ASSERT_EQ(false, info.bottommost_level);
- ASSERT_EQ(-1, info.base_input_level);
- ASSERT_EQ(-1, info.output_level);
- info = my_cs->GetCompactionInfoForWait();
- ASSERT_EQ(false, info.is_manual_compaction);
- ASSERT_EQ(false, info.is_full_compaction);
- ASSERT_EQ(false, info.bottommost_level);
- ASSERT_EQ(-1, info.base_input_level);
- ASSERT_EQ(-1, info.output_level);
- // Test Full Compaction + Bottommost Level
- options.num_levels = 6;
- ReopenWithCompactionService(&options);
- my_cs =
- static_cast_with_check<MyTestCompactionService>(GetCompactionService());
- for (int i = 0; i < 20; i++) {
- for (int j = 0; j < 10; j++) {
- int key_id = i * 10 + j;
- ASSERT_OK(Put(Key(key_id), "value_new_new" + std::to_string(key_id)));
- }
- ASSERT_OK(Flush());
- }
- MoveFilesToLevel(options.num_levels - 1);
- // Force final level compaction
- // base_input_level == output_level == last_level
- CompactRangeOptions cro;
- cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
- ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
- ASSERT_OK(dbfull()->TEST_WaitForCompact());
- info = my_cs->GetCompactionInfoForStart();
- ASSERT_EQ(true, info.is_manual_compaction);
- ASSERT_EQ(true, info.is_full_compaction);
- ASSERT_EQ(true, info.bottommost_level);
- ASSERT_EQ(CompactionReason::kManualCompaction, info.compaction_reason);
- info = my_cs->GetCompactionInfoForWait();
- ASSERT_EQ(options.num_levels - 1, info.base_input_level);
- ASSERT_EQ(options.num_levels - 1, info.output_level);
- ASSERT_EQ(true, info.is_manual_compaction);
- ASSERT_EQ(true, info.is_full_compaction);
- ASSERT_EQ(true, info.bottommost_level);
- ASSERT_EQ(CompactionReason::kManualCompaction, info.compaction_reason);
- ASSERT_EQ(options.num_levels - 1, info.base_input_level);
- ASSERT_EQ(options.num_levels - 1, info.output_level);
- ASSERT_EQ("0,0,0,0,0,1", FilesPerLevel());
- }
- TEST_F(CompactionServiceTest, FallbackLocalAuto) {
- Options options = CurrentOptions();
- ReopenWithCompactionService(&options);
- auto my_cs = GetCompactionService();
- Statistics* compactor_statistics = GetCompactorStatistics();
- Statistics* primary_statistics = GetPrimaryStatistics();
- uint64_t compactor_write_bytes =
- compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES);
- uint64_t primary_write_bytes =
- primary_statistics->getTickerCount(COMPACT_WRITE_BYTES);
- my_cs->OverrideStartStatus(CompactionServiceJobStatus::kUseLocal);
- GenerateTestData();
- ASSERT_OK(dbfull()->TEST_WaitForCompact());
- VerifyTestData();
- ASSERT_EQ(my_cs->GetCompactionNum(), 0);
- // make sure the compaction statistics is only recorded on the local side
- ASSERT_EQ(compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES),
- compactor_write_bytes);
- ASSERT_GT(primary_statistics->getTickerCount(COMPACT_WRITE_BYTES),
- primary_write_bytes);
- ASSERT_EQ(primary_statistics->getTickerCount(REMOTE_COMPACT_READ_BYTES), 0);
- ASSERT_EQ(primary_statistics->getTickerCount(REMOTE_COMPACT_WRITE_BYTES), 0);
- }
- TEST_F(CompactionServiceTest, FallbackLocalManual) {
- Options options = CurrentOptions();
- options.disable_auto_compactions = true;
- ReopenWithCompactionService(&options);
- GenerateTestData();
- VerifyTestData();
- auto my_cs = GetCompactionService();
- Statistics* compactor_statistics = GetCompactorStatistics();
- Statistics* primary_statistics = GetPrimaryStatistics();
- uint64_t compactor_write_bytes =
- compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES);
- uint64_t primary_write_bytes =
- primary_statistics->getTickerCount(COMPACT_WRITE_BYTES);
- // re-enable remote compaction
- my_cs->ResetOverride();
- std::string start_str = Key(15);
- std::string end_str = Key(45);
- Slice start(start_str);
- Slice end(end_str);
- uint64_t comp_num = my_cs->GetCompactionNum();
- ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, &end));
- ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
- // make sure the compaction statistics is only recorded on the remote side
- ASSERT_GT(compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES),
- compactor_write_bytes);
- ASSERT_EQ(primary_statistics->getTickerCount(REMOTE_COMPACT_WRITE_BYTES),
- compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES));
- ASSERT_EQ(primary_statistics->getTickerCount(COMPACT_WRITE_BYTES),
- primary_write_bytes);
- // return run local again with API WaitForComplete
- my_cs->OverrideWaitStatus(CompactionServiceJobStatus::kUseLocal);
- start_str = Key(120);
- start = start_str;
- comp_num = my_cs->GetCompactionNum();
- compactor_write_bytes =
- compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES);
- primary_write_bytes = primary_statistics->getTickerCount(COMPACT_WRITE_BYTES);
- ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, nullptr));
- ASSERT_EQ(my_cs->GetCompactionNum(),
- comp_num); // no remote compaction is run
- // make sure the compaction statistics is only recorded on the local side
- ASSERT_EQ(compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES),
- compactor_write_bytes);
- ASSERT_GT(primary_statistics->getTickerCount(COMPACT_WRITE_BYTES),
- primary_write_bytes);
- ASSERT_EQ(primary_statistics->getTickerCount(REMOTE_COMPACT_WRITE_BYTES),
- compactor_write_bytes);
- // verify result after 2 manual compactions
- VerifyTestData();
- }
- TEST_F(CompactionServiceTest, AbortedWhileWait) {
- Options options = CurrentOptions();
- options.disable_auto_compactions = true;
- ReopenWithCompactionService(&options);
- GenerateTestData();
- VerifyTestData();
- auto my_cs = GetCompactionService();
- Statistics* compactor_statistics = GetCompactorStatistics();
- Statistics* primary_statistics = GetPrimaryStatistics();
- my_cs->ResetOverride();
- std::string start_str = Key(15);
- std::string end_str = Key(45);
- Slice start(start_str);
- Slice end(end_str);
- // Override Wait() result with kAborted
- my_cs->OverrideWaitStatus(CompactionServiceJobStatus::kAborted);
- start_str = Key(120);
- start = start_str;
- Status s = db_->CompactRange(CompactRangeOptions(), &start, nullptr);
- ASSERT_NOK(s);
- ASSERT_TRUE(s.IsAborted());
- // no remote compaction is run
- ASSERT_EQ(my_cs->GetCompactionNum(), 0);
- // make sure the compaction statistics is not recorded any side
- ASSERT_EQ(primary_statistics->getTickerCount(COMPACT_WRITE_BYTES), 0);
- ASSERT_EQ(primary_statistics->getTickerCount(REMOTE_COMPACT_WRITE_BYTES), 0);
- ASSERT_EQ(compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES), 0);
- }
- TEST_F(CompactionServiceTest, RemoteEventListener) {
- class RemoteEventListenerTest : public EventListener {
- public:
- const char* Name() const override { return "RemoteEventListenerTest"; }
- void OnSubcompactionBegin(const SubcompactionJobInfo& info) override {
- auto result = on_going_compactions.emplace(info.job_id);
- ASSERT_TRUE(result.second); // make sure there's no duplication
- compaction_num++;
- EventListener::OnSubcompactionBegin(info);
- }
- void OnSubcompactionCompleted(const SubcompactionJobInfo& info) override {
- auto num = on_going_compactions.erase(info.job_id);
- ASSERT_TRUE(num == 1); // make sure the compaction id exists
- EventListener::OnSubcompactionCompleted(info);
- }
- void OnTableFileCreated(const TableFileCreationInfo& info) override {
- ASSERT_EQ(on_going_compactions.count(info.job_id), 1);
- file_created++;
- EventListener::OnTableFileCreated(info);
- }
- void OnTableFileCreationStarted(
- const TableFileCreationBriefInfo& info) override {
- ASSERT_EQ(on_going_compactions.count(info.job_id), 1);
- file_creation_started++;
- EventListener::OnTableFileCreationStarted(info);
- }
- bool ShouldBeNotifiedOnFileIO() override {
- file_io_notified++;
- return EventListener::ShouldBeNotifiedOnFileIO();
- }
- std::atomic_uint64_t file_io_notified{0};
- std::atomic_uint64_t file_creation_started{0};
- std::atomic_uint64_t file_created{0};
- std::set<int> on_going_compactions; // store the job_id
- std::atomic_uint64_t compaction_num{0};
- };
- auto listener = new RemoteEventListenerTest();
- remote_listeners.emplace_back(listener);
- Options options = CurrentOptions();
- ReopenWithCompactionService(&options);
- for (int i = 0; i < 20; i++) {
- for (int j = 0; j < 10; j++) {
- int key_id = i * 10 + j;
- ASSERT_OK(Put(Key(key_id), "value" + std::to_string(key_id)));
- }
- ASSERT_OK(Flush());
- }
- for (int i = 0; i < 10; i++) {
- for (int j = 0; j < 10; j++) {
- int key_id = i * 20 + j * 2;
- ASSERT_OK(Put(Key(key_id), "value_new" + std::to_string(key_id)));
- }
- ASSERT_OK(Flush());
- }
- ASSERT_OK(dbfull()->TEST_WaitForCompact());
- // check the events are triggered
- ASSERT_TRUE(listener->file_io_notified > 0);
- ASSERT_TRUE(listener->file_creation_started > 0);
- ASSERT_TRUE(listener->file_created > 0);
- ASSERT_TRUE(listener->compaction_num > 0);
- ASSERT_TRUE(listener->on_going_compactions.empty());
- // verify result
- for (int i = 0; i < 200; i++) {
- auto result = Get(Key(i));
- if (i % 2) {
- ASSERT_EQ(result, "value" + std::to_string(i));
- } else {
- ASSERT_EQ(result, "value_new" + std::to_string(i));
- }
- }
- }
- TEST_F(CompactionServiceTest, TablePropertiesCollector) {
- const static std::string kUserPropertyName = "TestCount";
- class TablePropertiesCollectorTest : public TablePropertiesCollector {
- public:
- Status Finish(UserCollectedProperties* properties) override {
- *properties = UserCollectedProperties{
- {kUserPropertyName, std::to_string(count_)},
- };
- return Status::OK();
- }
- UserCollectedProperties GetReadableProperties() const override {
- return UserCollectedProperties();
- }
- const char* Name() const override { return "TablePropertiesCollectorTest"; }
- Status AddUserKey(const Slice& /*user_key*/, const Slice& /*value*/,
- EntryType /*type*/, SequenceNumber /*seq*/,
- uint64_t /*file_size*/) override {
- count_++;
- return Status::OK();
- }
- private:
- uint32_t count_ = 0;
- };
- class TablePropertiesCollectorFactoryTest
- : public TablePropertiesCollectorFactory {
- public:
- TablePropertiesCollector* CreateTablePropertiesCollector(
- TablePropertiesCollectorFactory::Context /*context*/) override {
- return new TablePropertiesCollectorTest();
- }
- const char* Name() const override {
- return "TablePropertiesCollectorFactoryTest";
- }
- };
- auto factory = new TablePropertiesCollectorFactoryTest();
- remote_table_properties_collector_factories.emplace_back(factory);
- const int kNumSst = 3;
- const int kLevel0Trigger = 4;
- Options options = CurrentOptions();
- options.level0_file_num_compaction_trigger = kLevel0Trigger;
- ReopenWithCompactionService(&options);
- // generate a few SSTs locally which should not have user property
- for (int i = 0; i < kNumSst; i++) {
- for (int j = 0; j < 100; j++) {
- ASSERT_OK(Put(Key(i * 10 + j), "value"));
- }
- ASSERT_OK(Flush());
- }
- TablePropertiesCollection fname_to_props;
- ASSERT_OK(db_->GetPropertiesOfAllTables(&fname_to_props));
- for (const auto& file_props : fname_to_props) {
- auto properties = file_props.second->user_collected_properties;
- auto it = properties.find(kUserPropertyName);
- ASSERT_EQ(it, properties.end());
- }
- // trigger compaction
- for (int i = kNumSst; i < kLevel0Trigger; i++) {
- for (int j = 0; j < 100; j++) {
- ASSERT_OK(Put(Key(i * 10 + j), "value"));
- }
- ASSERT_OK(Flush());
- }
- ASSERT_OK(dbfull()->TEST_WaitForCompact());
- ASSERT_OK(db_->GetPropertiesOfAllTables(&fname_to_props));
- bool has_user_property = false;
- for (const auto& file_props : fname_to_props) {
- auto properties = file_props.second->user_collected_properties;
- auto it = properties.find(kUserPropertyName);
- if (it != properties.end()) {
- has_user_property = true;
- ASSERT_GT(std::stoi(it->second), 0);
- }
- }
- ASSERT_TRUE(has_user_property);
- }
- class ResumableCompactionService : public MyTestCompactionService {
- public:
- enum class TestScenario {
- // Test scenario 1: Two-phase compaction with resumption
- // - Phase 1: Cancel the compaction running with resumption enabled (saves
- // progress)
- // - Phase 2: Resume from saved progress and complete
- // Validates: Resumption reduces redundant work
- kCancelThenResume,
- // Test scenario 2: Two-phase compaction without resumption
- // - Phase 1: Cancel the compaction running with resumption enabled (saves
- // progress)
- // - Phase 2: Start fresh without resumption (ignores saved progress) and
- // complete
- // Validates: Disabling resumption causes full reprocessing
- kCancelThenFreshStart,
- // Test scenario 3: Three-phase compaction toggling resumption on/off/on
- // - Phase 1: Cancel the compaction running with resumption enabled (saves
- // progress)
- // - Phase 2: Start fresh wtihout resumption (ignores saved progress) and
- // cancel agains
- // - Phase 3: Resume with resumption support (loads Phase 1's progress) and
- // complete
- // Validates: Resumption state can be toggled;
- kMultipleCancelToggleResumption
- };
- ResumableCompactionService(const std::string& db_path, Options& options,
- std::shared_ptr<Statistics> statistics,
- TestScenario scenario)
- : MyTestCompactionService(db_path, options, statistics,
- {} /* listeners */,
- {} /* table_properties_collector_factories */),
- scenario_(scenario) {}
- CompactionServiceJobStatus Wait(const std::string& scheduled_job_id,
- std::string* result) override {
- std::string compaction_input = ExtractCompactionInput(scheduled_job_id);
- EXPECT_FALSE(compaction_input.empty());
- OpenAndCompactOptions open_and_compaction_options;
- auto override_options = GetOptionsOverride();
- // Force creation of one key per output file for test simplicity.
- // ASSUMPTION: This makes stats.count directly proportional to keys
- // processed.
- SyncPoint::GetInstance()->SetCallBack(
- "CompactionOutputs::ShouldStopBefore::manual_decision", [](void* p) {
- auto* pair = static_cast<std::pair<bool*, const Slice>*>(p);
- *(pair->first) = true;
- });
- // Simulate cancelled compaction by overriding status at completion. So
- // compaction processes all keys before this point to make stats.count
- // comparison straightforward.
- SyncPoint::GetInstance()->SetCallBack(
- "DBImplSecondary::CompactWithoutInstallation::End", [&](void* status) {
- auto s = static_cast<Status*>(status);
- *s = Status::Incomplete(Status::SubCode::kManualCompactionPaused);
- });
- SyncPoint::GetInstance()->EnableProcessing();
- // Phase 1: Run compaction with resumption enabled and cancel it
- // - Processes all input keys
- // - Creates output files and saves progress
- // - Status overridden to "paused"
- open_and_compaction_options.allow_resumption = true;
- auto phase1_stats =
- RunCancelledCompaction(open_and_compaction_options, scheduled_job_id,
- compaction_input, override_options);
- HistogramData phase2_stats;
- if (scenario_ == TestScenario::kMultipleCancelToggleResumption) {
- // Phase 2: Run compaction WITHOUT resumption (fresh start) and cancel it
- // - Delete all files left behind Phase 1 before calling OpenAndCompact()
- // - Processes all input keys again from scratch
- // - Creates output files but does NOT save progress
- // - Status overridden to "paused"
- open_and_compaction_options.allow_resumption = false;
- // Clean up output folder for fresh start
- std::string output_dir = GetOutputPath(scheduled_job_id);
- Status cleanup_status = DestroyDir(override_options.env, output_dir);
- EXPECT_TRUE(cleanup_status.ok());
- EXPECT_OK(override_options.env->CreateDir(output_dir));
- phase2_stats =
- RunCancelledCompaction(open_and_compaction_options, scheduled_job_id,
- compaction_input, override_options);
- // Validation: Phase 2 starts from scratch, so it processes the same
- // input keys as Phase 1.
- // ASSUMPTION: With fixed input (10 keys) and deterministic cancellation
- // (after processing), both phases create the same number of output files.
- EXPECT_EQ(phase2_stats.count, phase1_stats.count);
- }
- SyncPoint::GetInstance()->ClearCallBack(
- "DBImplSecondary::CompactWithoutInstallation::End");
- // Final phase: Run compaction to completion (no cancellation)
- if (scenario_ == TestScenario::kMultipleCancelToggleResumption) {
- // Attempt to resume but it ends up starting fresh
- open_and_compaction_options.allow_resumption = true;
- } else if (scenario_ == TestScenario::kCancelThenResume) {
- // Resume from Phase 1's saved progress
- open_and_compaction_options.allow_resumption = true;
- } else { // kCancelThenFreshStart
- // Start fresh without resumption
- open_and_compaction_options.allow_resumption = false;
- // Clean up output folder for fresh start
- std::string output_dir = GetOutputPath(scheduled_job_id);
- Status cleanup_status = DestroyDir(override_options.env, output_dir);
- EXPECT_TRUE(cleanup_status.ok());
- EXPECT_OK(override_options.env->CreateDir(output_dir));
- }
- auto final_phase_stats =
- RunCompaction(open_and_compaction_options, scheduled_job_id,
- compaction_input, override_options, result);
- SyncPoint::GetInstance()->DisableProcessing();
- SyncPoint::GetInstance()->ClearAllCallBacks();
- // Validate statistics based on scenario
- if (scenario_ == TestScenario::kMultipleCancelToggleResumption) {
- // ASSUMPTION: Phase 1 processes all keys before cancellation
- EXPECT_GT(phase1_stats.count, 0);
- // ASSUMPTION: Phase 2 runs with allow_resumption=false and an empty
- // folder. Phase 2 then creates its own output files (but doesn't save
- // progress). When Phase 3 starts with allow_resumption=true, it finds no
- // progress file exists, so it cannot resume and must start from scratch,
- // processing all input keys again.
- // Result: Phase 3 does the same amount of work as Phase 1.
- EXPECT_EQ(final_phase_stats.count, phase1_stats.count);
- } else if (scenario_ == TestScenario::kCancelThenResume) {
- // ASSUMPTION: Phase 1 processes all keys before cancellation
- EXPECT_GT(phase1_stats.count, 0);
- // ASSUMPTION: Phase 1 processes all keys and saves progress before
- // cancellation. Final phase resumes from Phase 1's saved progress.
- // Since Phase 1 completed all processing before being cancelled, the
- // final phase should do less work than Phase 1.
- EXPECT_LT(final_phase_stats.count, phase1_stats.count);
- } else { // kCancelThenFreshStart
- // ASSUMPTION: Phase 1 processes all keys before cancellation
- EXPECT_GT(phase1_stats.count, 0);
- // ASSUMPTION: Final phase starts fresh without resumption, so it
- // processes all input keys again and creates the same number of files
- EXPECT_EQ(final_phase_stats.count, phase1_stats.count);
- }
- StoreResult(*result);
- return CompactionServiceJobStatus::kSuccess;
- }
- private:
- std::string ExtractCompactionInput(const std::string& scheduled_job_id) {
- InstrumentedMutexLock l(&mutex_);
- auto job_index = jobs_.find(scheduled_job_id);
- if (job_index == jobs_.end()) {
- return "";
- }
- std::string compaction_input = std::move(job_index->second);
- jobs_.erase(job_index);
- auto info_index = infos_.find(scheduled_job_id);
- if (info_index == infos_.end()) {
- return "";
- }
- infos_.erase(info_index);
- return compaction_input;
- }
- HistogramData RunCancelledCompaction(
- const OpenAndCompactOptions& options, const std::string& scheduled_job_id,
- const std::string& compaction_input,
- const CompactionServiceOptionsOverride& override_options) {
- std::string temp_result;
- EXPECT_OK(statistics_->Reset());
- Status s =
- DB::OpenAndCompact(options, db_path_, GetOutputPath(scheduled_job_id),
- compaction_input, &temp_result, override_options);
- EXPECT_TRUE(s.IsManualCompactionPaused());
- HistogramData stats;
- statistics_->histogramData(FILE_WRITE_COMPACTION_MICROS, &stats);
- return stats;
- }
- HistogramData RunCompaction(
- const OpenAndCompactOptions& options, const std::string& scheduled_job_id,
- const std::string& compaction_input,
- const CompactionServiceOptionsOverride& override_options,
- std::string* result) {
- EXPECT_OK(statistics_->Reset());
- Status s =
- DB::OpenAndCompact(options, db_path_, GetOutputPath(scheduled_job_id),
- compaction_input, result, override_options);
- EXPECT_TRUE(s.ok());
- HistogramData stats;
- statistics_->histogramData(FILE_WRITE_COMPACTION_MICROS, &stats);
- return stats;
- }
- void StoreResult(const std::string& result) {
- InstrumentedMutexLock l(&mutex_);
- result_ = result;
- }
- TestScenario scenario_;
- };
- class ResumableCompactionServiceTest : public CompactionServiceTest {
- public:
- explicit ResumableCompactionServiceTest() : CompactionServiceTest() {}
- void RunCompactionCancelTest(
- ResumableCompactionService::TestScenario scenario) {
- Options options = CurrentOptions();
- options.disable_auto_compactions = true;
- std::shared_ptr<Statistics> statistics = CreateDBStatistics();
- options.file_checksum_gen_factory = GetFileChecksumGenCrc32cFactory();
- BlockBasedTableOptions table_options;
- table_options.verify_compression = true;
- options.table_factory.reset(NewBlockBasedTableFactory(table_options));
- auto resume_cs = std::make_shared<ResumableCompactionService>(
- dbname_, options, statistics, scenario);
- options.compaction_service = resume_cs;
- DestroyAndReopen(options);
- GenerateTestData();
- ASSERT_OK(statistics->Reset());
- CompactRangeOptions cro;
- cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
- Status s = db_->CompactRange(cro, nullptr, nullptr);
- ASSERT_OK(s);
- VerifyTestData();
- s = db_->VerifyChecksum();
- ASSERT_OK(s);
- s = db_->VerifyFileChecksums(ReadOptions());
- ASSERT_OK(s);
- CompactionServiceResult result;
- resume_cs->GetResult(&result);
- ASSERT_OK(result.status);
- ASSERT_TRUE(result.stats.is_manual_compaction);
- ASSERT_TRUE(result.stats.is_remote_compaction);
- ASSERT_GT(result.output_files.size(), 0);
- uint64_t resumed_bytes =
- statistics->getTickerCount(REMOTE_COMPACT_RESUMED_BYTES);
- if (scenario ==
- ResumableCompactionService::TestScenario::kCancelThenResume) {
- // When resuming compaction, some bytes should be resumed from previous
- // progress
- ASSERT_GT(resumed_bytes, 0);
- } else if (scenario == ResumableCompactionService::TestScenario::
- kCancelThenFreshStart) {
- // When starting fresh (ignoring existing progress), no bytes should be
- // resumed
- ASSERT_EQ(resumed_bytes, 0);
- } else { // kMultipleCancelToggleResumption
- // Phase 2 ran without resumption (fresh start), so Phase 3 has no
- // progress to resume from. It processes all keys again from scratch.
- ASSERT_EQ(resumed_bytes, 0);
- }
- }
- void GenerateTestData() {
- for (int i = 0; i < kNumKeys; ++i) {
- ASSERT_OK(Put(Key(i), "value"));
- ASSERT_OK(Flush());
- if (i % 2 == 0) {
- ASSERT_OK(Delete(Key(i)));
- ASSERT_OK(Flush());
- }
- }
- }
- void VerifyTestData() {
- for (int i = 0; i < kNumKeys; ++i) {
- if (i % 2 == 0) {
- ASSERT_EQ("NOT_FOUND", Get((Key(i))));
- } else {
- ASSERT_EQ("value", Get((Key(i))));
- }
- }
- }
- private:
- static constexpr int kNumKeys = 10;
- };
- TEST_F(ResumableCompactionServiceTest, CompactionCancelThenResume) {
- RunCompactionCancelTest(
- ResumableCompactionService::TestScenario::kCancelThenResume);
- }
- TEST_F(ResumableCompactionServiceTest, CompactionCancelThenFreshStart) {
- RunCompactionCancelTest(
- ResumableCompactionService::TestScenario::kCancelThenFreshStart);
- }
- TEST_F(ResumableCompactionServiceTest,
- CompactionMultipleCancelToggleResumption) {
- RunCompactionCancelTest(ResumableCompactionService::TestScenario::
- kMultipleCancelToggleResumption);
- }
- } // namespace ROCKSDB_NAMESPACE
- int main(int argc, char** argv) {
- ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
- ::testing::InitGoogleTest(&argc, argv);
- RegisterCustomObjects(argc, argv);
- return RUN_ALL_TESTS();
- }
|