| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421 | //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.//  This source code is licensed under both the GPLv2 (found in the//  COPYING file in the root directory) and Apache 2.0 License//  (found in the LICENSE.Apache file in the root directory).#ifndef ROCKSDB_LITE#include <mutex>#include <string>#include <thread>#include <vector>#include "db/db_impl/db_impl.h"#include "port/port.h"#include "rocksdb/db.h"#include "rocksdb/env.h"#include "test_util/sync_point.h"#include "test_util/testharness.h"#include "util/string_util.h"namespace ROCKSDB_NAMESPACE {class CompactFilesTest : public testing::Test { public:  CompactFilesTest() {    env_ = Env::Default();    db_name_ = test::PerThreadDBPath("compact_files_test");  }  std::string db_name_;  Env* env_;};// A class which remembers the name of each flushed file.class FlushedFileCollector : public EventListener { public:  FlushedFileCollector() {}  ~FlushedFileCollector() override {}  void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& info) override {    std::lock_guard<std::mutex> lock(mutex_);    flushed_files_.push_back(info.file_path);  }  std::vector<std::string> GetFlushedFiles() {    std::lock_guard<std::mutex> lock(mutex_);    std::vector<std::string> result;    for (auto fname : flushed_files_) {      result.push_back(fname);    }    return result;  }  void ClearFlushedFiles() {    std::lock_guard<std::mutex> lock(mutex_);    flushed_files_.clear();  } private:  std::vector<std::string> flushed_files_;  std::mutex mutex_;};TEST_F(CompactFilesTest, L0ConflictsFiles) {  Options options;  // to trigger compaction more easily  const int kWriteBufferSize = 10000;  const int kLevel0Trigger = 2;  options.create_if_missing = true;  options.compaction_style = kCompactionStyleLevel;  // Small slowdown and stop trigger for experimental purpose.  options.level0_slowdown_writes_trigger = 20;  options.level0_stop_writes_trigger = 20;  options.level0_stop_writes_trigger = 20;  options.write_buffer_size = kWriteBufferSize;  options.level0_file_num_compaction_trigger = kLevel0Trigger;  options.compression = kNoCompression;  DB* db = nullptr;  DestroyDB(db_name_, options);  Status s = DB::Open(options, db_name_, &db);  assert(s.ok());  assert(db);  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({      {"CompactFilesImpl:0", "BackgroundCallCompaction:0"},      {"BackgroundCallCompaction:1", "CompactFilesImpl:1"},  });  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();  // create couple files  // Background compaction starts and waits in BackgroundCallCompaction:0  for (int i = 0; i < kLevel0Trigger * 4; ++i) {    db->Put(WriteOptions(), ToString(i), "");    db->Put(WriteOptions(), ToString(100 - i), "");    db->Flush(FlushOptions());  }  ROCKSDB_NAMESPACE::ColumnFamilyMetaData meta;  db->GetColumnFamilyMetaData(&meta);  std::string file1;  for (auto& file : meta.levels[0].files) {    ASSERT_EQ(0, meta.levels[0].level);    if (file1 == "") {      file1 = file.db_path + "/" + file.name;    } else {      std::string file2 = file.db_path + "/" + file.name;      // Another thread starts a compact files and creates an L0 compaction      // The background compaction then notices that there is an L0 compaction      // already in progress and doesn't do an L0 compaction      // Once the background compaction finishes, the compact files finishes      ASSERT_OK(db->CompactFiles(ROCKSDB_NAMESPACE::CompactionOptions(),                                 {file1, file2}, 0));      break;    }  }  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();  delete db;}TEST_F(CompactFilesTest, ObsoleteFiles) {  Options options;  // to trigger compaction more easily  const int kWriteBufferSize = 65536;  options.create_if_missing = true;  // Disable RocksDB background compaction.  options.compaction_style = kCompactionStyleNone;  options.level0_slowdown_writes_trigger = (1 << 30);  options.level0_stop_writes_trigger = (1 << 30);  options.write_buffer_size = kWriteBufferSize;  options.max_write_buffer_number = 2;  options.compression = kNoCompression;  // Add listener  FlushedFileCollector* collector = new FlushedFileCollector();  options.listeners.emplace_back(collector);  DB* db = nullptr;  DestroyDB(db_name_, options);  Status s = DB::Open(options, db_name_, &db);  assert(s.ok());  assert(db);  // create couple files  for (int i = 1000; i < 2000; ++i) {    db->Put(WriteOptions(), ToString(i),            std::string(kWriteBufferSize / 10, 'a' + (i % 26)));  }  auto l0_files = collector->GetFlushedFiles();  ASSERT_OK(db->CompactFiles(CompactionOptions(), l0_files, 1));  reinterpret_cast<DBImpl*>(db)->TEST_WaitForCompact();  // verify all compaction input files are deleted  for (auto fname : l0_files) {    ASSERT_EQ(Status::NotFound(), env_->FileExists(fname));  }  delete db;}TEST_F(CompactFilesTest, NotCutOutputOnLevel0) {  Options options;  options.create_if_missing = true;  // Disable RocksDB background compaction.  options.compaction_style = kCompactionStyleNone;  options.level0_slowdown_writes_trigger = 1000;  options.level0_stop_writes_trigger = 1000;  options.write_buffer_size = 65536;  options.max_write_buffer_number = 2;  options.compression = kNoCompression;  options.max_compaction_bytes = 5000;  // Add listener  FlushedFileCollector* collector = new FlushedFileCollector();  options.listeners.emplace_back(collector);  DB* db = nullptr;  DestroyDB(db_name_, options);  Status s = DB::Open(options, db_name_, &db);  assert(s.ok());  assert(db);  // create couple files  for (int i = 0; i < 500; ++i) {    db->Put(WriteOptions(), ToString(i), std::string(1000, 'a' + (i % 26)));  }  reinterpret_cast<DBImpl*>(db)->TEST_WaitForFlushMemTable();  auto l0_files_1 = collector->GetFlushedFiles();  collector->ClearFlushedFiles();  for (int i = 0; i < 500; ++i) {    db->Put(WriteOptions(), ToString(i), std::string(1000, 'a' + (i % 26)));  }  reinterpret_cast<DBImpl*>(db)->TEST_WaitForFlushMemTable();  auto l0_files_2 = collector->GetFlushedFiles();  ASSERT_OK(db->CompactFiles(CompactionOptions(), l0_files_1, 0));  ASSERT_OK(db->CompactFiles(CompactionOptions(), l0_files_2, 0));  // no assertion failure  delete db;}TEST_F(CompactFilesTest, CapturingPendingFiles) {  Options options;  options.create_if_missing = true;  // Disable RocksDB background compaction.  options.compaction_style = kCompactionStyleNone;  // Always do full scans for obsolete files (needed to reproduce the issue).  options.delete_obsolete_files_period_micros = 0;  // Add listener.  FlushedFileCollector* collector = new FlushedFileCollector();  options.listeners.emplace_back(collector);  DB* db = nullptr;  DestroyDB(db_name_, options);  Status s = DB::Open(options, db_name_, &db);  assert(s.ok());  assert(db);  // Create 5 files.  for (int i = 0; i < 5; ++i) {    db->Put(WriteOptions(), "key" + ToString(i), "value");    db->Flush(FlushOptions());  }  auto l0_files = collector->GetFlushedFiles();  EXPECT_EQ(5, l0_files.size());  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({      {"CompactFilesImpl:2", "CompactFilesTest.CapturingPendingFiles:0"},      {"CompactFilesTest.CapturingPendingFiles:1", "CompactFilesImpl:3"},  });  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();  // Start compacting files.  ROCKSDB_NAMESPACE::port::Thread compaction_thread(      [&] { EXPECT_OK(db->CompactFiles(CompactionOptions(), l0_files, 1)); });  // In the meantime flush another file.  TEST_SYNC_POINT("CompactFilesTest.CapturingPendingFiles:0");  db->Put(WriteOptions(), "key5", "value");  db->Flush(FlushOptions());  TEST_SYNC_POINT("CompactFilesTest.CapturingPendingFiles:1");  compaction_thread.join();  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();  delete db;  // Make sure we can reopen the DB.  s = DB::Open(options, db_name_, &db);  ASSERT_TRUE(s.ok());  assert(db);  delete db;}TEST_F(CompactFilesTest, CompactionFilterWithGetSv) {  class FilterWithGet : public CompactionFilter {   public:    bool Filter(int /*level*/, const Slice& /*key*/, const Slice& /*value*/,                std::string* /*new_value*/,                bool* /*value_changed*/) const override {      if (db_ == nullptr) {        return true;      }      std::string res;      db_->Get(ReadOptions(), "", &res);      return true;    }    void SetDB(DB* db) {      db_ = db;    }    const char* Name() const override { return "FilterWithGet"; }   private:    DB* db_;  };  std::shared_ptr<FilterWithGet> cf(new FilterWithGet());  Options options;  options.create_if_missing = true;  options.compaction_filter = cf.get();  DB* db = nullptr;  DestroyDB(db_name_, options);  Status s = DB::Open(options, db_name_, &db);  ASSERT_OK(s);  cf->SetDB(db);  // Write one L0 file  db->Put(WriteOptions(), "K1", "V1");  db->Flush(FlushOptions());  // Compact all L0 files using CompactFiles  ROCKSDB_NAMESPACE::ColumnFamilyMetaData meta;  db->GetColumnFamilyMetaData(&meta);  for (auto& file : meta.levels[0].files) {    std::string fname = file.db_path + "/" + file.name;    ASSERT_OK(        db->CompactFiles(ROCKSDB_NAMESPACE::CompactionOptions(), {fname}, 0));  }  delete db;}TEST_F(CompactFilesTest, SentinelCompressionType) {  if (!Zlib_Supported()) {    fprintf(stderr, "zlib compression not supported, skip this test\n");    return;  }  if (!Snappy_Supported()) {    fprintf(stderr, "snappy compression not supported, skip this test\n");    return;  }  // Check that passing `CompressionType::kDisableCompressionOption` to  // `CompactFiles` causes it to use the column family compression options.  for (auto compaction_style :       {CompactionStyle::kCompactionStyleLevel,        CompactionStyle::kCompactionStyleUniversal,        CompactionStyle::kCompactionStyleNone}) {    DestroyDB(db_name_, Options());    Options options;    options.compaction_style = compaction_style;    // L0: Snappy, L1: ZSTD, L2: Snappy    options.compression_per_level = {CompressionType::kSnappyCompression,                                     CompressionType::kZlibCompression,                                     CompressionType::kSnappyCompression};    options.create_if_missing = true;    FlushedFileCollector* collector = new FlushedFileCollector();    options.listeners.emplace_back(collector);    DB* db = nullptr;    ASSERT_OK(DB::Open(options, db_name_, &db));    db->Put(WriteOptions(), "key", "val");    db->Flush(FlushOptions());    auto l0_files = collector->GetFlushedFiles();    ASSERT_EQ(1, l0_files.size());    // L0->L1 compaction, so output should be ZSTD-compressed    CompactionOptions compaction_opts;    compaction_opts.compression = CompressionType::kDisableCompressionOption;    ASSERT_OK(db->CompactFiles(compaction_opts, l0_files, 1));    ROCKSDB_NAMESPACE::TablePropertiesCollection all_tables_props;    ASSERT_OK(db->GetPropertiesOfAllTables(&all_tables_props));    for (const auto& name_and_table_props : all_tables_props) {      ASSERT_EQ(CompressionTypeToString(CompressionType::kZlibCompression),                name_and_table_props.second->compression_name);    }    delete db;  }}TEST_F(CompactFilesTest, GetCompactionJobInfo) {  Options options;  options.create_if_missing = true;  // Disable RocksDB background compaction.  options.compaction_style = kCompactionStyleNone;  options.level0_slowdown_writes_trigger = 1000;  options.level0_stop_writes_trigger = 1000;  options.write_buffer_size = 65536;  options.max_write_buffer_number = 2;  options.compression = kNoCompression;  options.max_compaction_bytes = 5000;  // Add listener  FlushedFileCollector* collector = new FlushedFileCollector();  options.listeners.emplace_back(collector);  DB* db = nullptr;  DestroyDB(db_name_, options);  Status s = DB::Open(options, db_name_, &db);  assert(s.ok());  assert(db);  // create couple files  for (int i = 0; i < 500; ++i) {    db->Put(WriteOptions(), ToString(i), std::string(1000, 'a' + (i % 26)));  }  reinterpret_cast<DBImpl*>(db)->TEST_WaitForFlushMemTable();  auto l0_files_1 = collector->GetFlushedFiles();  CompactionOptions co;  co.compression = CompressionType::kLZ4Compression;  CompactionJobInfo compaction_job_info{};  ASSERT_OK(      db->CompactFiles(co, l0_files_1, 0, -1, nullptr, &compaction_job_info));  ASSERT_EQ(compaction_job_info.base_input_level, 0);  ASSERT_EQ(compaction_job_info.cf_id, db->DefaultColumnFamily()->GetID());  ASSERT_EQ(compaction_job_info.cf_name, db->DefaultColumnFamily()->GetName());  ASSERT_EQ(compaction_job_info.compaction_reason,            CompactionReason::kManualCompaction);  ASSERT_EQ(compaction_job_info.compression, CompressionType::kLZ4Compression);  ASSERT_EQ(compaction_job_info.output_level, 0);  ASSERT_OK(compaction_job_info.status);  // no assertion failure  delete db;}}  // namespace ROCKSDB_NAMESPACEint main(int argc, char** argv) {  ::testing::InitGoogleTest(&argc, argv);  return RUN_ALL_TESTS();}#else#include <stdio.h>int main(int /*argc*/, char** /*argv*/) {  fprintf(stderr,          "SKIPPED as DBImpl::CompactFiles is not supported in ROCKSDB_LITE\n");  return 0;}#endif  // !ROCKSDB_LITE
 |