external_sst_file_basic_test.cc 127 KB


  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include <functional>
  6. #include "db/db_test_util.h"
  7. #include "db/version_edit.h"
  8. #include "port/port.h"
  9. #include "port/stack_trace.h"
  10. #include "rocksdb/advanced_options.h"
  11. #include "rocksdb/options.h"
  12. #include "rocksdb/perf_context.h"
  13. #include "rocksdb/sst_file_writer.h"
  14. #include "test_util/testharness.h"
  15. #include "test_util/testutil.h"
  16. #include "util/defer.h"
  17. #include "util/file_checksum_helper.h"
  18. #include "util/random.h"
  19. #include "utilities/fault_injection_env.h"
  20. namespace ROCKSDB_NAMESPACE {
  21. class ExternalSSTFileBasicTest
  22. : public DBTestBase,
  23. public ::testing::WithParamInterface<std::tuple<bool, bool>> {
  24. public:
  25. ExternalSSTFileBasicTest()
  26. : DBTestBase("external_sst_file_basic_test", /*env_do_fsync=*/true) {
  27. sst_files_dir_ = dbname_ + "_sst_files/";
  28. fault_injection_test_env_.reset(new FaultInjectionTestEnv(env_));
  29. DestroyAndRecreateExternalSSTFilesDir();
  30. // Check if the Env supports RandomRWFile
  31. std::string file_path = sst_files_dir_ + "test_random_rw_file";
  32. std::unique_ptr<WritableFile> wfile;
  33. assert(env_->NewWritableFile(file_path, &wfile, EnvOptions()).ok());
  34. wfile.reset();
  35. std::unique_ptr<RandomRWFile> rwfile;
  36. Status s = env_->NewRandomRWFile(file_path, &rwfile, EnvOptions());
  37. if (s.IsNotSupported()) {
  38. random_rwfile_supported_ = false;
  39. } else {
  40. EXPECT_OK(s);
  41. random_rwfile_supported_ = true;
  42. }
  43. rwfile.reset();
  44. EXPECT_OK(env_->DeleteFile(file_path));
  45. }
  46. void DestroyAndRecreateExternalSSTFilesDir() {
  47. ASSERT_OK(DestroyDir(env_, sst_files_dir_));
  48. ASSERT_OK(env_->CreateDir(sst_files_dir_));
  49. }
  50. Status DeprecatedAddFile(const std::vector<std::string>& files,
  51. bool move_files = false,
  52. bool skip_snapshot_check = false) {
  53. IngestExternalFileOptions opts;
  54. opts.move_files = move_files;
  55. opts.snapshot_consistency = !skip_snapshot_check;
  56. opts.allow_global_seqno = false;
  57. opts.allow_blocking_flush = false;
  58. return db_->IngestExternalFile(files, opts);
  59. }
  60. Status AddFileWithFileChecksum(
  61. const std::vector<std::string>& files,
  62. const std::vector<std::string>& files_checksums,
  63. const std::vector<std::string>& files_checksum_func_names,
  64. bool verify_file_checksum = true, bool move_files = false,
  65. bool skip_snapshot_check = false, bool write_global_seqno = true) {
  66. IngestExternalFileOptions opts;
  67. opts.move_files = move_files;
  68. opts.snapshot_consistency = !skip_snapshot_check;
  69. opts.allow_global_seqno = false;
  70. opts.allow_blocking_flush = false;
  71. opts.write_global_seqno = write_global_seqno;
  72. opts.verify_file_checksum = verify_file_checksum;
  73. IngestExternalFileArg arg;
  74. arg.column_family = db_->DefaultColumnFamily();
  75. arg.external_files = files;
  76. arg.options = opts;
  77. arg.files_checksums = files_checksums;
  78. arg.files_checksum_func_names = files_checksum_func_names;
  79. return db_->IngestExternalFiles({arg});
  80. }
  81. Status GenerateAndAddExternalFile(
  82. const Options options, std::vector<int> keys,
  83. const std::vector<ValueType>& value_types,
  84. std::vector<std::pair<int, int>> range_deletions, int file_id,
  85. bool write_global_seqno, bool verify_checksums_before_ingest,
  86. std::map<std::string, std::string>* true_data) {
  87. assert(value_types.size() == 1 || keys.size() == value_types.size());
  88. std::string file_path = sst_files_dir_ + std::to_string(file_id);
  89. SstFileWriter sst_file_writer(EnvOptions(), options);
  90. Status s = sst_file_writer.Open(file_path);
  91. if (!s.ok()) {
  92. return s;
  93. }
  94. for (size_t i = 0; i < range_deletions.size(); i++) {
  95. // Account for the effect of range deletions on true_data before
  96. // all point operators, even though sst_file_writer.DeleteRange
  97. // must be called before other sst_file_writer methods. This is
  98. // because point writes take precedence over range deletions
  99. // in the same ingested sst. This precedence is part of
  100. // `SstFileWriter::DeleteRange()`'s API contract.
  101. std::string start_key = Key(range_deletions[i].first);
  102. std::string end_key = Key(range_deletions[i].second);
  103. s = sst_file_writer.DeleteRange(start_key, end_key);
  104. if (!s.ok()) {
  105. sst_file_writer.Finish();
  106. return s;
  107. }
  108. auto start_key_it = true_data->find(start_key);
  109. if (start_key_it == true_data->end()) {
  110. start_key_it = true_data->upper_bound(start_key);
  111. }
  112. auto end_key_it = true_data->find(end_key);
  113. if (end_key_it == true_data->end()) {
  114. end_key_it = true_data->upper_bound(end_key);
  115. }
  116. true_data->erase(start_key_it, end_key_it);
  117. }
  118. for (size_t i = 0; i < keys.size(); i++) {
  119. std::string key = Key(keys[i]);
  120. std::string value = Key(keys[i]) + std::to_string(file_id);
  121. ValueType value_type =
  122. (value_types.size() == 1 ? value_types[0] : value_types[i]);
  123. switch (value_type) {
  124. case ValueType::kTypeValue:
  125. s = sst_file_writer.Put(key, value);
  126. (*true_data)[key] = value;
  127. break;
  128. case ValueType::kTypeMerge:
  129. s = sst_file_writer.Merge(key, value);
  130. // we only use TestPutOperator in this test
  131. (*true_data)[key] = value;
  132. break;
  133. case ValueType::kTypeDeletion:
  134. s = sst_file_writer.Delete(key);
  135. true_data->erase(key);
  136. break;
  137. default:
  138. return Status::InvalidArgument("Value type is not supported");
  139. }
  140. if (!s.ok()) {
  141. sst_file_writer.Finish();
  142. return s;
  143. }
  144. }
  145. s = sst_file_writer.Finish();
  146. if (s.ok()) {
  147. IngestExternalFileOptions ifo;
  148. ifo.allow_global_seqno = true;
  149. ifo.write_global_seqno = write_global_seqno;
  150. ifo.verify_checksums_before_ingest = verify_checksums_before_ingest;
  151. s = db_->IngestExternalFile({file_path}, ifo);
  152. }
  153. return s;
  154. }
  155. Status GenerateAndAddExternalFile(
  156. const Options options, std::vector<int> keys,
  157. const std::vector<ValueType>& value_types, int file_id,
  158. bool write_global_seqno, bool verify_checksums_before_ingest,
  159. std::map<std::string, std::string>* true_data) {
  160. return GenerateAndAddExternalFile(
  161. options, keys, value_types, {}, file_id, write_global_seqno,
  162. verify_checksums_before_ingest, true_data);
  163. }
  164. Status GenerateAndAddExternalFile(
  165. const Options options, std::vector<int> keys, const ValueType value_type,
  166. int file_id, bool write_global_seqno, bool verify_checksums_before_ingest,
  167. std::map<std::string, std::string>* true_data) {
  168. return GenerateAndAddExternalFile(
  169. options, keys, std::vector<ValueType>(1, value_type), file_id,
  170. write_global_seqno, verify_checksums_before_ingest, true_data);
  171. }
  172. void VerifyInputFilesInternalStatsForOutputLevel(
  173. int output_level, int num_input_files_in_non_output_levels,
  174. int num_input_files_in_output_level,
  175. int num_filtered_input_files_in_non_output_levels,
  176. int num_filtered_input_files_in_output_level,
  177. uint64_t bytes_skipped_non_output_levels,
  178. uint64_t bytes_skipped_output_level) {
  179. ColumnFamilyHandleImpl* cfh =
  180. static_cast<ColumnFamilyHandleImpl*>(dbfull()->DefaultColumnFamily());
  181. ColumnFamilyData* cfd = cfh->cfd();
  182. const InternalStats* internal_stats_ptr = cfd->internal_stats();
  183. const std::vector<InternalStats::CompactionStats>& comp_stats =
  184. internal_stats_ptr->TEST_GetCompactionStats();
  185. EXPECT_EQ(num_input_files_in_non_output_levels,
  186. comp_stats[output_level].num_input_files_in_non_output_levels);
  187. EXPECT_EQ(num_input_files_in_output_level,
  188. comp_stats[output_level].num_input_files_in_output_level);
  189. EXPECT_EQ(
  190. num_filtered_input_files_in_non_output_levels,
  191. comp_stats[output_level].num_filtered_input_files_in_non_output_levels);
  192. EXPECT_EQ(
  193. num_filtered_input_files_in_output_level,
  194. comp_stats[output_level].num_filtered_input_files_in_output_level);
  195. EXPECT_EQ(bytes_skipped_non_output_levels,
  196. comp_stats[output_level].bytes_skipped_non_output_levels);
  197. EXPECT_EQ(bytes_skipped_output_level,
  198. comp_stats[output_level].bytes_skipped_output_level);
  199. }
  200. ~ExternalSSTFileBasicTest() override {
  201. DestroyDir(env_, sst_files_dir_).PermitUncheckedError();
  202. }
  203. protected:
  204. std::string sst_files_dir_;
  205. std::unique_ptr<FaultInjectionTestEnv> fault_injection_test_env_;
  206. bool random_rwfile_supported_;
  207. };
  208. TEST_F(ExternalSSTFileBasicTest, Basic) {
  209. Options options = CurrentOptions();
  210. SstFileWriter sst_file_writer(EnvOptions(), options);
  211. // Current file size should be 0 after sst_file_writer init and before open a
  212. // file.
  213. ASSERT_EQ(sst_file_writer.FileSize(), 0);
  214. // file1.sst (0 => 99)
  215. std::string file1 = sst_files_dir_ + "file1.sst";
  216. ASSERT_OK(sst_file_writer.Open(file1));
  217. for (int k = 0; k < 100; k++) {
  218. ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val"));
  219. }
  220. ExternalSstFileInfo file1_info;
  221. Status s = sst_file_writer.Finish(&file1_info);
  222. ASSERT_OK(s) << s.ToString();
  223. // Current file size should be non-zero after success write.
  224. ASSERT_GT(sst_file_writer.FileSize(), 0);
  225. ASSERT_EQ(file1_info.file_path, file1);
  226. ASSERT_EQ(file1_info.num_entries, 100);
  227. ASSERT_EQ(file1_info.smallest_key, Key(0));
  228. ASSERT_EQ(file1_info.largest_key, Key(99));
  229. ASSERT_EQ(file1_info.num_range_del_entries, 0);
  230. ASSERT_EQ(file1_info.smallest_range_del_key, "");
  231. ASSERT_EQ(file1_info.largest_range_del_key, "");
  232. ASSERT_EQ(file1_info.file_checksum, kUnknownFileChecksum);
  233. ASSERT_EQ(file1_info.file_checksum_func_name, kUnknownFileChecksumFuncName);
  234. // sst_file_writer already finished, cannot add this value
  235. s = sst_file_writer.Put(Key(100), "bad_val");
  236. ASSERT_NOK(s) << s.ToString();
  237. s = sst_file_writer.DeleteRange(Key(100), Key(200));
  238. ASSERT_NOK(s) << s.ToString();
  239. DestroyAndRecreateExternalSSTFilesDir();
  240. }
  241. TEST_F(ExternalSSTFileBasicTest, AlignedBufferedWrite) {
  242. class AlignedWriteFS : public FileSystemWrapper {
  243. public:
  244. explicit AlignedWriteFS(const std::shared_ptr<FileSystem>& _target)
  245. : FileSystemWrapper(_target) {}
  246. ~AlignedWriteFS() override {}
  247. const char* Name() const override { return "AlignedWriteFS"; }
  248. IOStatus NewWritableFile(const std::string& fname, const FileOptions& opts,
  249. std::unique_ptr<FSWritableFile>* result,
  250. IODebugContext* dbg) override {
  251. class AlignedWritableFile : public FSWritableFileOwnerWrapper {
  252. public:
  253. AlignedWritableFile(std::unique_ptr<FSWritableFile>& file)
  254. : FSWritableFileOwnerWrapper(std::move(file)), last_write_(false) {}
  255. using FSWritableFileOwnerWrapper::Append;
  256. IOStatus Append(const Slice& data, const IOOptions& options,
  257. IODebugContext* dbg) override {
  258. EXPECT_FALSE(last_write_);
  259. if ((data.size() & (data.size() - 1)) != 0) {
  260. last_write_ = true;
  261. }
  262. return target()->Append(data, options, dbg);
  263. }
  264. private:
  265. bool last_write_;
  266. };
  267. std::unique_ptr<FSWritableFile> file;
  268. IOStatus s = target()->NewWritableFile(fname, opts, &file, dbg);
  269. if (s.ok()) {
  270. result->reset(new AlignedWritableFile(file));
  271. }
  272. return s;
  273. }
  274. };
  275. Options options = CurrentOptions();
  276. std::shared_ptr<AlignedWriteFS> aligned_fs =
  277. std::make_shared<AlignedWriteFS>(env_->GetFileSystem());
  278. std::unique_ptr<Env> wrap_env(
  279. new CompositeEnvWrapper(options.env, aligned_fs));
  280. options.env = wrap_env.get();
  281. EnvOptions env_options;
  282. env_options.writable_file_max_buffer_size = 64 * 1024 * 1024;
  283. SstFileWriter sst_file_writer(env_options, options);
  284. // Current file size should be 0 after sst_file_writer init and before open a
  285. // file.
  286. ASSERT_EQ(sst_file_writer.FileSize(), 0);
  287. // file1.sst (0 => 99)
  288. std::string file1 = sst_files_dir_ + "file1.sst";
  289. ASSERT_OK(sst_file_writer.Open(file1));
  290. Random r(301);
  291. for (int k = 0; k < 16 * 1024; k++) {
  292. uint32_t num = 4096 + r.Uniform(8192);
  293. std::string random_string = r.RandomString(num);
  294. ASSERT_OK(sst_file_writer.Put(Key(k), random_string));
  295. }
  296. Status s = sst_file_writer.Finish();
  297. ASSERT_OK(s) << s.ToString();
  298. // Current file size should be non-zero after success write.
  299. ASSERT_GT(sst_file_writer.FileSize(), 0);
  300. DestroyAndRecreateExternalSSTFilesDir();
  301. }
  302. class ChecksumVerifyHelper {
  303. private:
  304. Options options_;
  305. public:
  306. ChecksumVerifyHelper(Options& options) : options_(options) {}
  307. ~ChecksumVerifyHelper() = default;
  308. Status GetSingleFileChecksumAndFuncName(
  309. const std::string& file_path, std::string* file_checksum,
  310. std::string* file_checksum_func_name,
  311. const std::string& requested_func_name = {}) {
  312. Status s;
  313. EnvOptions soptions;
  314. std::unique_ptr<SequentialFile> file_reader;
  315. s = options_.env->NewSequentialFile(file_path, &file_reader, soptions);
  316. if (!s.ok()) {
  317. return s;
  318. }
  319. std::unique_ptr<char[]> scratch(new char[2048]);
  320. Slice result;
  321. FileChecksumGenFactory* file_checksum_gen_factory =
  322. options_.file_checksum_gen_factory.get();
  323. if (file_checksum_gen_factory == nullptr) {
  324. *file_checksum = kUnknownFileChecksum;
  325. *file_checksum_func_name = kUnknownFileChecksumFuncName;
  326. return Status::OK();
  327. } else {
  328. FileChecksumGenContext gen_context;
  329. gen_context.file_name = file_path;
  330. gen_context.requested_checksum_func_name = requested_func_name;
  331. std::unique_ptr<FileChecksumGenerator> file_checksum_gen =
  332. file_checksum_gen_factory->CreateFileChecksumGenerator(gen_context);
  333. *file_checksum_func_name = file_checksum_gen->Name();
  334. s = file_reader->Read(2048, &result, scratch.get());
  335. if (!s.ok()) {
  336. return s;
  337. }
  338. while (result.size() != 0) {
  339. file_checksum_gen->Update(scratch.get(), result.size());
  340. s = file_reader->Read(2048, &result, scratch.get());
  341. if (!s.ok()) {
  342. return s;
  343. }
  344. }
  345. file_checksum_gen->Finalize();
  346. *file_checksum = file_checksum_gen->GetChecksum();
  347. }
  348. return Status::OK();
  349. }
  350. };
  351. TEST_F(ExternalSSTFileBasicTest, BasicWithFileChecksumCrc32c) {
  352. Options options = CurrentOptions();
  353. options.file_checksum_gen_factory = GetFileChecksumGenCrc32cFactory();
  354. ChecksumVerifyHelper checksum_helper(options);
  355. SstFileWriter sst_file_writer(EnvOptions(), options);
  356. // Current file size should be 0 after sst_file_writer init and before open a
  357. // file.
  358. ASSERT_EQ(sst_file_writer.FileSize(), 0);
  359. // file1.sst (0 => 99)
  360. std::string file1 = sst_files_dir_ + "file1.sst";
  361. ASSERT_OK(sst_file_writer.Open(file1));
  362. for (int k = 0; k < 100; k++) {
  363. ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val"));
  364. }
  365. ExternalSstFileInfo file1_info;
  366. Status s = sst_file_writer.Finish(&file1_info);
  367. ASSERT_OK(s) << s.ToString();
  368. std::string file_checksum, file_checksum_func_name;
  369. ASSERT_OK(checksum_helper.GetSingleFileChecksumAndFuncName(
  370. file1, &file_checksum, &file_checksum_func_name));
  371. // Current file size should be non-zero after success write.
  372. ASSERT_GT(sst_file_writer.FileSize(), 0);
  373. ASSERT_EQ(file1_info.file_path, file1);
  374. ASSERT_EQ(file1_info.num_entries, 100);
  375. ASSERT_EQ(file1_info.smallest_key, Key(0));
  376. ASSERT_EQ(file1_info.largest_key, Key(99));
  377. ASSERT_EQ(file1_info.num_range_del_entries, 0);
  378. ASSERT_EQ(file1_info.smallest_range_del_key, "");
  379. ASSERT_EQ(file1_info.largest_range_del_key, "");
  380. ASSERT_EQ(file1_info.file_checksum, file_checksum);
  381. ASSERT_EQ(file1_info.file_checksum_func_name, file_checksum_func_name);
  382. // sst_file_writer already finished, cannot add this value
  383. s = sst_file_writer.Put(Key(100), "bad_val");
  384. ASSERT_NOK(s) << s.ToString();
  385. s = sst_file_writer.DeleteRange(Key(100), Key(200));
  386. ASSERT_NOK(s) << s.ToString();
  387. DestroyAndReopen(options);
  388. // Add file using file path
  389. s = DeprecatedAddFile({file1});
  390. ASSERT_OK(s) << s.ToString();
  391. ASSERT_EQ(db_->GetLatestSequenceNumber(), 0U);
  392. for (int k = 0; k < 100; k++) {
  393. ASSERT_EQ(Get(Key(k)), Key(k) + "_val");
  394. }
  395. DestroyAndRecreateExternalSSTFilesDir();
  396. }
  397. namespace {
  398. class VariousFileChecksumGenerator : public FileChecksumGenCrc32c {
  399. public:
  400. explicit VariousFileChecksumGenerator(const std::string& name)
  401. : FileChecksumGenCrc32c({}), name_(name) {}
  402. const char* Name() const override { return name_.c_str(); }
  403. std::string GetChecksum() const override {
  404. return FileChecksumGenCrc32c::GetChecksum() + "_" + name_;
  405. }
  406. private:
  407. const std::string name_;
  408. };
  409. class VariousFileChecksumGenFactory : public FileChecksumGenFactory {
  410. public:
  411. std::unique_ptr<FileChecksumGenerator> CreateFileChecksumGenerator(
  412. const FileChecksumGenContext& context) override {
  413. static RelaxedAtomic<int> counter{0};
  414. if (Slice(context.requested_checksum_func_name).starts_with("Various")) {
  415. return std::make_unique<VariousFileChecksumGenerator>(
  416. context.requested_checksum_func_name);
  417. } else if (context.requested_checksum_func_name.empty()) {
  418. // Lacking a specific request, use a different function name for each
  419. // result.
  420. return std::make_unique<VariousFileChecksumGenerator>(
  421. "Various" + std::to_string(counter.FetchAddRelaxed(1)));
  422. } else {
  423. return nullptr;
  424. }
  425. }
  426. static const char* kClassName() { return "VariousFileChecksumGenFactory"; }
  427. const char* Name() const override { return kClassName(); }
  428. };
  429. } // namespace
  430. TEST_F(ExternalSSTFileBasicTest, IngestFileWithFileChecksum) {
  431. Options old_options = CurrentOptions();
  432. Options options = CurrentOptions();
  433. options.file_checksum_gen_factory =
  434. std::make_shared<VariousFileChecksumGenFactory>();
  435. const ImmutableCFOptions ioptions(options);
  436. ChecksumVerifyHelper checksum_helper(options);
  437. SstFileWriter sst_file_writer(EnvOptions(), options);
  438. // file01.sst (1000 => 1099)
  439. std::string file1 = sst_files_dir_ + "file01.sst";
  440. ASSERT_OK(sst_file_writer.Open(file1));
  441. for (int k = 1000; k < 1100; k++) {
  442. ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val"));
  443. }
  444. ExternalSstFileInfo file1_info;
  445. Status s = sst_file_writer.Finish(&file1_info);
  446. ASSERT_OK(s) << s.ToString();
  447. ASSERT_EQ(file1_info.file_path, file1);
  448. ASSERT_EQ(file1_info.num_entries, 100);
  449. ASSERT_EQ(file1_info.smallest_key, Key(1000));
  450. ASSERT_EQ(file1_info.largest_key, Key(1099));
  451. std::string file_checksum1, file_checksum_func_name1;
  452. ASSERT_OK(checksum_helper.GetSingleFileChecksumAndFuncName(
  453. file1, &file_checksum1, &file_checksum_func_name1,
  454. file1_info.file_checksum_func_name));
  455. ASSERT_EQ(file1_info.file_checksum, file_checksum1);
  456. ASSERT_EQ(file1_info.file_checksum_func_name, file_checksum_func_name1);
  457. // file02.sst (1100 => 1299)
  458. std::string file2 = sst_files_dir_ + "file02.sst";
  459. ASSERT_OK(sst_file_writer.Open(file2));
  460. for (int k = 1100; k < 1300; k++) {
  461. ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val"));
  462. }
  463. ExternalSstFileInfo file2_info;
  464. s = sst_file_writer.Finish(&file2_info);
  465. ASSERT_OK(s) << s.ToString();
  466. ASSERT_EQ(file2_info.file_path, file2);
  467. ASSERT_EQ(file2_info.num_entries, 200);
  468. ASSERT_EQ(file2_info.smallest_key, Key(1100));
  469. ASSERT_EQ(file2_info.largest_key, Key(1299));
  470. std::string file_checksum2, file_checksum_func_name2;
  471. ASSERT_OK(checksum_helper.GetSingleFileChecksumAndFuncName(
  472. file2, &file_checksum2, &file_checksum_func_name2,
  473. file2_info.file_checksum_func_name));
  474. ASSERT_EQ(file2_info.file_checksum, file_checksum2);
  475. ASSERT_EQ(file2_info.file_checksum_func_name, file_checksum_func_name2);
  476. // file03.sst (1300 => 1499)
  477. std::string file3 = sst_files_dir_ + "file03.sst";
  478. ASSERT_OK(sst_file_writer.Open(file3));
  479. for (int k = 1300; k < 1500; k++) {
  480. ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val_overlap"));
  481. }
  482. ExternalSstFileInfo file3_info;
  483. s = sst_file_writer.Finish(&file3_info);
  484. ASSERT_OK(s) << s.ToString();
  485. ASSERT_EQ(file3_info.file_path, file3);
  486. ASSERT_EQ(file3_info.num_entries, 200);
  487. ASSERT_EQ(file3_info.smallest_key, Key(1300));
  488. ASSERT_EQ(file3_info.largest_key, Key(1499));
  489. std::string file_checksum3, file_checksum_func_name3;
  490. ASSERT_OK(checksum_helper.GetSingleFileChecksumAndFuncName(
  491. file3, &file_checksum3, &file_checksum_func_name3,
  492. file3_info.file_checksum_func_name));
  493. ASSERT_EQ(file3_info.file_checksum, file_checksum3);
  494. ASSERT_EQ(file3_info.file_checksum_func_name, file_checksum_func_name3);
  495. // file04.sst (1500 => 1799)
  496. std::string file4 = sst_files_dir_ + "file04.sst";
  497. ASSERT_OK(sst_file_writer.Open(file4));
  498. for (int k = 1500; k < 1800; k++) {
  499. ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val_overlap"));
  500. }
  501. ExternalSstFileInfo file4_info;
  502. s = sst_file_writer.Finish(&file4_info);
  503. ASSERT_OK(s) << s.ToString();
  504. ASSERT_EQ(file4_info.file_path, file4);
  505. ASSERT_EQ(file4_info.num_entries, 300);
  506. ASSERT_EQ(file4_info.smallest_key, Key(1500));
  507. ASSERT_EQ(file4_info.largest_key, Key(1799));
  508. std::string file_checksum4, file_checksum_func_name4;
  509. ASSERT_OK(checksum_helper.GetSingleFileChecksumAndFuncName(
  510. file4, &file_checksum4, &file_checksum_func_name4,
  511. file4_info.file_checksum_func_name));
  512. ASSERT_EQ(file4_info.file_checksum, file_checksum4);
  513. ASSERT_EQ(file4_info.file_checksum_func_name, file_checksum_func_name4);
  514. // file05.sst (1800 => 1899)
  515. std::string file5 = sst_files_dir_ + "file05.sst";
  516. ASSERT_OK(sst_file_writer.Open(file5));
  517. for (int k = 1800; k < 2000; k++) {
  518. ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val_overlap"));
  519. }
  520. ExternalSstFileInfo file5_info;
  521. s = sst_file_writer.Finish(&file5_info);
  522. ASSERT_OK(s) << s.ToString();
  523. ASSERT_EQ(file5_info.file_path, file5);
  524. ASSERT_EQ(file5_info.num_entries, 200);
  525. ASSERT_EQ(file5_info.smallest_key, Key(1800));
  526. ASSERT_EQ(file5_info.largest_key, Key(1999));
  527. std::string file_checksum5, file_checksum_func_name5;
  528. ASSERT_OK(checksum_helper.GetSingleFileChecksumAndFuncName(
  529. file5, &file_checksum5, &file_checksum_func_name5,
  530. file5_info.file_checksum_func_name));
  531. ASSERT_EQ(file5_info.file_checksum, file_checksum5);
  532. ASSERT_EQ(file5_info.file_checksum_func_name, file_checksum_func_name5);
  533. // file06.sst (2000 => 2199)
  534. std::string file6 = sst_files_dir_ + "file06.sst";
  535. ASSERT_OK(sst_file_writer.Open(file6));
  536. for (int k = 2000; k < 2200; k++) {
  537. ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val_overlap"));
  538. }
  539. ExternalSstFileInfo file6_info;
  540. s = sst_file_writer.Finish(&file6_info);
  541. ASSERT_OK(s) << s.ToString();
  542. ASSERT_EQ(file6_info.file_path, file6);
  543. ASSERT_EQ(file6_info.num_entries, 200);
  544. ASSERT_EQ(file6_info.smallest_key, Key(2000));
  545. ASSERT_EQ(file6_info.largest_key, Key(2199));
  546. std::string file_checksum6, file_checksum_func_name6;
  547. ASSERT_OK(checksum_helper.GetSingleFileChecksumAndFuncName(
  548. file6, &file_checksum6, &file_checksum_func_name6,
  549. file6_info.file_checksum_func_name));
  550. ASSERT_EQ(file6_info.file_checksum, file_checksum6);
  551. ASSERT_EQ(file6_info.file_checksum_func_name, file_checksum_func_name6);
  552. s = AddFileWithFileChecksum({file1}, {file_checksum1, "xyz"},
  553. {file_checksum1}, true, false, false, false);
  554. // does not care the checksum input since db does not enable file checksum
  555. ASSERT_OK(s) << s.ToString();
  556. ASSERT_OK(env_->FileExists(file1));
  557. std::vector<LiveFileMetaData> live_files;
  558. dbfull()->GetLiveFilesMetaData(&live_files);
  559. std::set<std::string> set1;
  560. for (const auto& f : live_files) {
  561. set1.insert(f.name);
  562. ASSERT_EQ(f.file_checksum, kUnknownFileChecksum);
  563. ASSERT_EQ(f.file_checksum_func_name, kUnknownFileChecksumFuncName);
  564. }
  565. // check the temperature of the file being ingested
  566. ColumnFamilyMetaData metadata;
  567. db_->GetColumnFamilyMetaData(&metadata);
  568. ASSERT_EQ(1, metadata.file_count);
  569. ASSERT_EQ(Temperature::kUnknown, metadata.levels[6].files[0].temperature);
  570. auto size = GetSstSizeHelper(Temperature::kUnknown);
  571. ASSERT_GT(size, 0);
  572. size = GetSstSizeHelper(Temperature::kWarm);
  573. ASSERT_EQ(size, 0);
  574. size = GetSstSizeHelper(Temperature::kHot);
  575. ASSERT_EQ(size, 0);
  576. size = GetSstSizeHelper(Temperature::kCold);
  577. ASSERT_EQ(size, 0);
  578. // Reopen Db with checksum enabled
  579. Reopen(options);
  580. // Enable verify_file_checksum option
  581. // The checksum vector does not match, fail the ingestion
  582. s = AddFileWithFileChecksum({file2}, {file_checksum2, "xyz"},
  583. {file_checksum_func_name2}, true, false, false,
  584. false);
  585. ASSERT_NOK(s) << s.ToString();
  586. // Enable verify_file_checksum option
  587. // The checksum name does not match, fail the ingestion
  588. s = AddFileWithFileChecksum({file2}, {file_checksum2}, {"xyz"}, true, false,
  589. false, false);
  590. ASSERT_NOK(s) << s.ToString();
  591. // Enable verify_file_checksum option
  592. // The checksum itself does not match, fail the ingestion
  593. s = AddFileWithFileChecksum({file2}, {"xyz"}, {file_checksum_func_name2},
  594. true, false, false, false);
  595. ASSERT_NOK(s) << s.ToString();
  596. // Enable verify_file_checksum option
  597. // All matches, ingestion is successful
  598. s = AddFileWithFileChecksum({file2}, {file_checksum2},
  599. {file_checksum_func_name2}, true, false, false,
  600. false);
  601. ASSERT_OK(s) << s.ToString();
  602. std::vector<LiveFileMetaData> live_files1;
  603. dbfull()->GetLiveFilesMetaData(&live_files1);
  604. for (const auto& f : live_files1) {
  605. if (set1.find(f.name) == set1.end()) {
  606. ASSERT_EQ(f.file_checksum, file_checksum2);
  607. ASSERT_EQ(f.file_checksum_func_name, file_checksum_func_name2);
  608. set1.insert(f.name);
  609. }
  610. }
  611. ASSERT_OK(env_->FileExists(file2));
  612. // Enable verify_file_checksum option. No checksum information is provided,
  613. // so it is generated when ingesting. The configured checksum factory will
  614. // use a different function than before.
  615. s = AddFileWithFileChecksum({file3}, {}, {}, true, false, false, false);
  616. ASSERT_OK(s) << s.ToString();
  617. std::vector<LiveFileMetaData> live_files2;
  618. dbfull()->GetLiveFilesMetaData(&live_files2);
  619. for (const auto& f : live_files2) {
  620. if (set1.find(f.name) == set1.end()) {
  621. // Recomputed checksum, different function
  622. EXPECT_NE(f.file_checksum_func_name, file_checksum_func_name3);
  623. std::string cur_checksum3, cur_checksum_func_name3;
  624. ASSERT_OK(checksum_helper.GetSingleFileChecksumAndFuncName(
  625. dbname_ + f.name, &cur_checksum3, &cur_checksum_func_name3,
  626. f.file_checksum_func_name));
  627. EXPECT_EQ(f.file_checksum, cur_checksum3);
  628. EXPECT_EQ(f.file_checksum_func_name, cur_checksum_func_name3);
  629. set1.insert(f.name);
  630. }
  631. }
  632. ASSERT_OK(s) << s.ToString();
  633. ASSERT_OK(env_->FileExists(file3));
  634. // Does not enable verify_file_checksum options
  635. // The checksum name does not match, fail the ingestion
  636. s = AddFileWithFileChecksum({file4}, {file_checksum4}, {"xyz"}, false, false,
  637. false, false);
  638. ASSERT_NOK(s) << s.ToString();
  639. // Does not enable verify_file_checksum options
  640. // Checksum function name is recognized, so store the checksum being ingested.
  641. std::string file_checksum_func_name4alt = "VariousABCD";
  642. s = AddFileWithFileChecksum({file4}, {"asd"}, {file_checksum_func_name4alt},
  643. false, false, false, false);
  644. ASSERT_OK(s) << s.ToString();
  645. std::vector<LiveFileMetaData> live_files3;
  646. dbfull()->GetLiveFilesMetaData(&live_files3);
  647. for (const auto& f : live_files3) {
  648. if (set1.find(f.name) == set1.end()) {
  649. ASSERT_FALSE(f.file_checksum == file_checksum4);
  650. ASSERT_EQ(f.file_checksum, "asd");
  651. ASSERT_EQ(f.file_checksum_func_name, file_checksum_func_name4alt);
  652. set1.insert(f.name);
  653. }
  654. }
  655. ASSERT_OK(s) << s.ToString();
  656. ASSERT_OK(env_->FileExists(file4));
  657. // enable verify_file_checksum options, DB enable checksum, and enable
  658. // write_global_seq. So the checksum stored is different from the one
  659. // ingested due to the sequence number changes. The checksum function name
  660. // may also change since the checksum is recomputed.
  661. s = AddFileWithFileChecksum({file5}, {file_checksum5},
  662. {file_checksum_func_name5}, true, false, false,
  663. true);
  664. ASSERT_OK(s) << s.ToString();
  665. std::vector<LiveFileMetaData> live_files4;
  666. dbfull()->GetLiveFilesMetaData(&live_files4);
  667. for (const auto& f : live_files4) {
  668. if (set1.find(f.name) == set1.end()) {
  669. // Recomputed checksum, different function
  670. EXPECT_NE(f.file_checksum_func_name, file_checksum_func_name5);
  671. std::string cur_checksum5, cur_checksum_func_name5;
  672. ASSERT_OK(checksum_helper.GetSingleFileChecksumAndFuncName(
  673. dbname_ + f.name, &cur_checksum5, &cur_checksum_func_name5,
  674. f.file_checksum_func_name));
  675. EXPECT_EQ(f.file_checksum, cur_checksum5);
  676. EXPECT_EQ(f.file_checksum_func_name, cur_checksum_func_name5);
  677. set1.insert(f.name);
  678. }
  679. }
  680. ASSERT_OK(s) << s.ToString();
  681. ASSERT_OK(env_->FileExists(file5));
  682. // Does not enable verify_file_checksum options and also the ingested file
  683. // checksum information is empty. DB will generate and store file checksum
  684. // in Manifest, which could be different from the previous invocation.
  685. s = AddFileWithFileChecksum({file6}, {}, {}, false, false, false, false);
  686. ASSERT_OK(s) << s.ToString();
  687. std::vector<LiveFileMetaData> live_files6;
  688. dbfull()->GetLiveFilesMetaData(&live_files6);
  689. for (const auto& f : live_files6) {
  690. if (set1.find(f.name) == set1.end()) {
  691. // Recomputed checksum, different function
  692. EXPECT_NE(f.file_checksum_func_name, file_checksum_func_name6);
  693. std::string cur_checksum6, cur_checksum_func_name6;
  694. ASSERT_OK(checksum_helper.GetSingleFileChecksumAndFuncName(
  695. dbname_ + f.name, &cur_checksum6, &cur_checksum_func_name6,
  696. f.file_checksum_func_name));
  697. EXPECT_EQ(f.file_checksum, cur_checksum6);
  698. EXPECT_EQ(f.file_checksum_func_name, cur_checksum_func_name6);
  699. set1.insert(f.name);
  700. }
  701. }
  702. ASSERT_OK(s) << s.ToString();
  703. ASSERT_OK(env_->FileExists(file6));
  704. db_->GetColumnFamilyMetaData(&metadata);
  705. size = GetSstSizeHelper(Temperature::kUnknown);
  706. ASSERT_GT(size, 0);
  707. size = GetSstSizeHelper(Temperature::kWarm);
  708. ASSERT_EQ(size, 0);
  709. size = GetSstSizeHelper(Temperature::kHot);
  710. ASSERT_EQ(size, 0);
  711. size = GetSstSizeHelper(Temperature::kCold);
  712. ASSERT_EQ(size, 0);
  713. }
  714. TEST_F(ExternalSSTFileBasicTest, NoCopy) {
  715. Options options = CurrentOptions();
  716. const ImmutableCFOptions ioptions(options);
  717. SstFileWriter sst_file_writer(EnvOptions(), options);
  718. // file1.sst (0 => 99)
  719. std::string file1 = sst_files_dir_ + "file1.sst";
  720. ASSERT_OK(sst_file_writer.Open(file1));
  721. for (int k = 0; k < 100; k++) {
  722. ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val"));
  723. }
  724. ExternalSstFileInfo file1_info;
  725. Status s = sst_file_writer.Finish(&file1_info);
  726. ASSERT_OK(s) << s.ToString();
  727. ASSERT_EQ(file1_info.file_path, file1);
  728. ASSERT_EQ(file1_info.num_entries, 100);
  729. ASSERT_EQ(file1_info.smallest_key, Key(0));
  730. ASSERT_EQ(file1_info.largest_key, Key(99));
  731. // file2.sst (100 => 299)
  732. std::string file2 = sst_files_dir_ + "file2.sst";
  733. ASSERT_OK(sst_file_writer.Open(file2));
  734. for (int k = 100; k < 300; k++) {
  735. ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val"));
  736. }
  737. ExternalSstFileInfo file2_info;
  738. s = sst_file_writer.Finish(&file2_info);
  739. ASSERT_OK(s) << s.ToString();
  740. ASSERT_EQ(file2_info.file_path, file2);
  741. ASSERT_EQ(file2_info.num_entries, 200);
  742. ASSERT_EQ(file2_info.smallest_key, Key(100));
  743. ASSERT_EQ(file2_info.largest_key, Key(299));
  744. // file3.sst (110 => 124) .. overlap with file2.sst
  745. std::string file3 = sst_files_dir_ + "file3.sst";
  746. ASSERT_OK(sst_file_writer.Open(file3));
  747. for (int k = 110; k < 125; k++) {
  748. ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val_overlap"));
  749. }
  750. ExternalSstFileInfo file3_info;
  751. s = sst_file_writer.Finish(&file3_info);
  752. ASSERT_OK(s) << s.ToString();
  753. ASSERT_EQ(file3_info.file_path, file3);
  754. ASSERT_EQ(file3_info.num_entries, 15);
  755. ASSERT_EQ(file3_info.smallest_key, Key(110));
  756. ASSERT_EQ(file3_info.largest_key, Key(124));
  757. s = DeprecatedAddFile({file1}, true /* move file */);
  758. ASSERT_OK(s) << s.ToString();
  759. ASSERT_EQ(Status::NotFound(), env_->FileExists(file1));
  760. s = DeprecatedAddFile({file2}, false /* copy file */);
  761. ASSERT_OK(s) << s.ToString();
  762. ASSERT_OK(env_->FileExists(file2));
  763. // This file has overlapping values with the existing data
  764. s = DeprecatedAddFile({file3}, true /* move file */);
  765. ASSERT_NOK(s) << s.ToString();
  766. ASSERT_OK(env_->FileExists(file3));
  767. for (int k = 0; k < 300; k++) {
  768. ASSERT_EQ(Get(Key(k)), Key(k) + "_val");
  769. }
  770. }
// Checks when file ingestion consumes a new sequence number. The latest
// sequence number is expected to advance by one for each ingested file that
// overlaps keys already in the DB, and for every ingested file while a
// snapshot is held. A non-overlapping file ingested with no snapshot leaves it
// unchanged. Runs for each (write_global_seqno, verify_checksums_before_ingest)
// test parameter and for each option set produced by
// ChangeOptionsForFileIngestionTest().
TEST_P(ExternalSSTFileBasicTest, IngestFileWithGlobalSeqnoPickedSeqno) {
  bool write_global_seqno = std::get<0>(GetParam());
  bool verify_checksums_before_ingest = std::get<1>(GetParam());
  do {
    Options options = CurrentOptions();
    options.disable_auto_compactions = true;
    DestroyAndReopen(options);
    // Expected DB contents, checked by VerifyDBFromMap() at the end. It is
    // passed to every GenerateAndAddExternalFile() call (presumably updated
    // there) and updated directly for the normal writes below.
    std::map<std::string, std::string> true_data;
    int file_id = 1;
    ASSERT_OK(GenerateAndAddExternalFile(
        options, {1, 2, 3, 4, 5, 6}, ValueType::kTypeValue, file_id++,
        write_global_seqno, verify_checksums_before_ingest, &true_data));
    // File doesn't overwrite any keys, no seqno needed
    ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 0);
    ASSERT_OK(GenerateAndAddExternalFile(
        options, {10, 11, 12, 13}, ValueType::kTypeValue, file_id++,
        write_global_seqno, verify_checksums_before_ingest, &true_data));
    // File doesn't overwrite any keys, no seqno needed
    ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 0);
    ASSERT_OK(GenerateAndAddExternalFile(
        options, {1, 4, 6}, ValueType::kTypeValue, file_id++,
        write_global_seqno, verify_checksums_before_ingest, &true_data));
    // File overwrites some keys, a seqno will be assigned
    ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 1);
    ASSERT_OK(GenerateAndAddExternalFile(
        options, {11, 15, 19}, ValueType::kTypeValue, file_id++,
        write_global_seqno, verify_checksums_before_ingest, &true_data));
    // File overwrites some keys, a seqno will be assigned
    ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 2);
    ASSERT_OK(GenerateAndAddExternalFile(
        options, {120, 130}, ValueType::kTypeValue, file_id++,
        write_global_seqno, verify_checksums_before_ingest, &true_data));
    // File doesn't overwrite any keys, no seqno needed
    ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 2);
    ASSERT_OK(GenerateAndAddExternalFile(
        options, {1, 130}, ValueType::kTypeValue, file_id++, write_global_seqno,
        verify_checksums_before_ingest, &true_data));
    // File overwrites some keys, a seqno will be assigned
    ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 3);
    // Write some keys through normal write path
    for (int i = 0; i < 50; i++) {
      ASSERT_OK(Put(Key(i), "memtable"));
      true_data[Key(i)] = "memtable";
    }
    // From here on, expectations are relative to the sequence number reached
    // after the normal writes above.
    SequenceNumber last_seqno = dbfull()->GetLatestSequenceNumber();
    ASSERT_OK(GenerateAndAddExternalFile(
        options, {60, 61, 62}, ValueType::kTypeValue, file_id++,
        write_global_seqno, verify_checksums_before_ingest, &true_data));
    // File doesn't overwrite any keys, no seqno needed
    ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno);
    ASSERT_OK(GenerateAndAddExternalFile(
        options, {40, 41, 42}, ValueType::kTypeValue, file_id++,
        write_global_seqno, verify_checksums_before_ingest, &true_data));
    // File overwrites some keys, a seqno will be assigned
    ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 1);
    ASSERT_OK(GenerateAndAddExternalFile(
        options, {20, 30, 40}, ValueType::kTypeValue, file_id++,
        write_global_seqno, verify_checksums_before_ingest, &true_data));
    // File overwrites some keys, a seqno will be assigned
    ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 2);
    const Snapshot* snapshot = db_->GetSnapshot();
    // We will need a seqno for the file regardless of whether it overwrites
    // keys in the DB, because we hold a snapshot
    ASSERT_OK(GenerateAndAddExternalFile(
        options, {1000, 1002}, ValueType::kTypeValue, file_id++,
        write_global_seqno, verify_checksums_before_ingest, &true_data));
    // A global seqno will be assigned anyway because of the snapshot
    ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 3);
    ASSERT_OK(GenerateAndAddExternalFile(
        options, {2000, 3002}, ValueType::kTypeValue, file_id++,
        write_global_seqno, verify_checksums_before_ingest, &true_data));
    // A global seqno will be assigned anyway because of the snapshot
    ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 4);
    ASSERT_OK(GenerateAndAddExternalFile(
        options, {1, 20, 40, 100, 150}, ValueType::kTypeValue, file_id++,
        write_global_seqno, verify_checksums_before_ingest, &true_data));
    // A global seqno will be assigned anyway because of the snapshot
    ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 5);
    db_->ReleaseSnapshot(snapshot);
    ASSERT_OK(GenerateAndAddExternalFile(
        options, {5000, 5001}, ValueType::kTypeValue, file_id++,
        write_global_seqno, verify_checksums_before_ingest, &true_data));
    // No snapshot anymore, no need to assign a seqno
    ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 5);
    size_t kcnt = 0;
    VerifyDBFromMap(true_data, &kcnt, false);
  } while (ChangeOptionsForFileIngestionTest());
}
// Same sequence-number expectations as IngestFileWithGlobalSeqnoPickedSeqno,
// but each ingested file holds a single value type (Put, Merge or Delete), and
// some files also carry range deletions. The latest sequence number is
// expected to advance only for files that overlap existing keys (including via
// a range deletion) or that are ingested while a snapshot is held.
TEST_P(ExternalSSTFileBasicTest, IngestFileWithMultipleValueType) {
  bool write_global_seqno = std::get<0>(GetParam());
  bool verify_checksums_before_ingest = std::get<1>(GetParam());
  do {
    Options options = CurrentOptions();
    options.disable_auto_compactions = true;
    // A merge operator is configured because several files below contain
    // kTypeMerge entries.
    options.merge_operator.reset(new TestPutOperator());
    DestroyAndReopen(options);
    // Expected DB contents, checked by VerifyDBFromMap() at the end. It is
    // passed to every GenerateAndAddExternalFile() call (presumably updated
    // there) and updated directly for the normal writes below.
    std::map<std::string, std::string> true_data;
    int file_id = 1;
    ASSERT_OK(GenerateAndAddExternalFile(
        options, {1, 2, 3, 4, 5, 6}, ValueType::kTypeValue, file_id++,
        write_global_seqno, verify_checksums_before_ingest, &true_data));
    // File doesn't overwrite any keys, no seqno needed
    ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 0);
    ASSERT_OK(GenerateAndAddExternalFile(
        options, {10, 11, 12, 13}, ValueType::kTypeValue, file_id++,
        write_global_seqno, verify_checksums_before_ingest, &true_data));
    // File doesn't overwrite any keys, no seqno needed
    ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 0);
    ASSERT_OK(GenerateAndAddExternalFile(
        options, {1, 4, 6}, ValueType::kTypeMerge, file_id++,
        write_global_seqno, verify_checksums_before_ingest, &true_data));
    // File overwrites some keys, a seqno will be assigned
    ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 1);
    ASSERT_OK(GenerateAndAddExternalFile(
        options, {11, 15, 19}, ValueType::kTypeDeletion, file_id++,
        write_global_seqno, verify_checksums_before_ingest, &true_data));
    // File overwrites some keys, a seqno will be assigned
    ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 2);
    ASSERT_OK(GenerateAndAddExternalFile(
        options, {120, 130}, ValueType::kTypeMerge, file_id++,
        write_global_seqno, verify_checksums_before_ingest, &true_data));
    // File doesn't overwrite any keys, no seqno needed
    ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 2);
    ASSERT_OK(GenerateAndAddExternalFile(
        options, {1, 130}, ValueType::kTypeDeletion, file_id++,
        write_global_seqno, verify_checksums_before_ingest, &true_data));
    // File overwrites some keys, a seqno will be assigned
    ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 3);
    // Key 120 plus a range deletion {120, 135}, both overlapping existing keys.
    ASSERT_OK(GenerateAndAddExternalFile(
        options, {120}, {ValueType::kTypeValue}, {{120, 135}}, file_id++,
        write_global_seqno, verify_checksums_before_ingest, &true_data));
    // File overwrites some keys, a seqno will be assigned
    ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 4);
    // A file containing only a range deletion.
    ASSERT_OK(GenerateAndAddExternalFile(
        options, {}, {}, {{110, 120}}, file_id++, write_global_seqno,
        verify_checksums_before_ingest, &true_data));
    // The range deletion ends on a key, but it doesn't actually delete
    // this key because the largest key in the range is exclusive. Still,
    // it counts as an overlap so a new seqno will be assigned.
    ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 5);
    ASSERT_OK(GenerateAndAddExternalFile(
        options, {}, {}, {{100, 109}}, file_id++, write_global_seqno,
        verify_checksums_before_ingest, &true_data));
    // File doesn't overwrite any keys, no seqno needed
    ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 5);
    // Write some keys through normal write path
    for (int i = 0; i < 50; i++) {
      ASSERT_OK(Put(Key(i), "memtable"));
      true_data[Key(i)] = "memtable";
    }
    // From here on, expectations are relative to the sequence number reached
    // after the normal writes above.
    SequenceNumber last_seqno = dbfull()->GetLatestSequenceNumber();
    ASSERT_OK(GenerateAndAddExternalFile(
        options, {60, 61, 62}, ValueType::kTypeValue, file_id++,
        write_global_seqno, verify_checksums_before_ingest, &true_data));
    // File doesn't overwrite any keys, no seqno needed
    ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno);
    ASSERT_OK(GenerateAndAddExternalFile(
        options, {40, 41, 42}, ValueType::kTypeMerge, file_id++,
        write_global_seqno, verify_checksums_before_ingest, &true_data));
    // File overwrites some keys, a seqno will be assigned
    ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 1);
    ASSERT_OK(GenerateAndAddExternalFile(
        options, {20, 30, 40}, ValueType::kTypeDeletion, file_id++,
        write_global_seqno, verify_checksums_before_ingest, &true_data));
    // File overwrites some keys, a seqno will be assigned
    ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 2);
    const Snapshot* snapshot = db_->GetSnapshot();
    // We will need a seqno for the file regardless of whether it overwrites
    // keys in the DB, because we hold a snapshot
    ASSERT_OK(GenerateAndAddExternalFile(
        options, {1000, 1002}, ValueType::kTypeMerge, file_id++,
        write_global_seqno, verify_checksums_before_ingest, &true_data));
    // A global seqno will be assigned anyway because of the snapshot
    ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 3);
    ASSERT_OK(GenerateAndAddExternalFile(
        options, {2000, 3002}, ValueType::kTypeMerge, file_id++,
        write_global_seqno, verify_checksums_before_ingest, &true_data));
    // A global seqno will be assigned anyway because of the snapshot
    ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 4);
    ASSERT_OK(GenerateAndAddExternalFile(
        options, {1, 20, 40, 100, 150}, ValueType::kTypeMerge, file_id++,
        write_global_seqno, verify_checksums_before_ingest, &true_data));
    // A global seqno will be assigned anyway because of the snapshot
    ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 5);
    db_->ReleaseSnapshot(snapshot);
    ASSERT_OK(GenerateAndAddExternalFile(
        options, {5000, 5001}, ValueType::kTypeValue, file_id++,
        write_global_seqno, verify_checksums_before_ingest, &true_data));
    // No snapshot anymore, no need to assign a seqno
    ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 5);
    size_t kcnt = 0;
    VerifyDBFromMap(true_data, &kcnt, false);
  } while (ChangeOptionsForFileIngestionTest());
}
// Same sequence-number expectations as IngestFileWithMultipleValueType, but
// each ingested file mixes value types per key (one ValueType per key, in
// order), and some files also carry range deletions. The latest sequence
// number is expected to advance only for files that overlap existing keys
// (including via a range deletion) or that are ingested while a snapshot is
// held.
TEST_P(ExternalSSTFileBasicTest, IngestFileWithMixedValueType) {
  bool write_global_seqno = std::get<0>(GetParam());
  bool verify_checksums_before_ingest = std::get<1>(GetParam());
  do {
    Options options = CurrentOptions();
    options.disable_auto_compactions = true;
    // A merge operator is configured because the files below contain
    // kTypeMerge entries.
    options.merge_operator.reset(new TestPutOperator());
    DestroyAndReopen(options);
    // Expected DB contents, checked by VerifyDBFromMap() at the end. It is
    // passed to every GenerateAndAddExternalFile() call (presumably updated
    // there) and updated directly for the normal writes below.
    std::map<std::string, std::string> true_data;
    int file_id = 1;
    ASSERT_OK(GenerateAndAddExternalFile(
        options, {1, 2, 3, 4, 5, 6},
        {ValueType::kTypeValue, ValueType::kTypeMerge, ValueType::kTypeValue,
         ValueType::kTypeMerge, ValueType::kTypeValue, ValueType::kTypeMerge},
        file_id++, write_global_seqno, verify_checksums_before_ingest,
        &true_data));
    // File doesn't overwrite any keys, no seqno needed
    ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 0);
    ASSERT_OK(GenerateAndAddExternalFile(
        options, {10, 11, 12, 13},
        {ValueType::kTypeValue, ValueType::kTypeMerge, ValueType::kTypeValue,
         ValueType::kTypeMerge},
        file_id++, write_global_seqno, verify_checksums_before_ingest,
        &true_data));
    // File doesn't overwrite any keys, no seqno needed
    ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 0);
    ASSERT_OK(GenerateAndAddExternalFile(
        options, {1, 4, 6},
        {ValueType::kTypeDeletion, ValueType::kTypeValue,
         ValueType::kTypeMerge},
        file_id++, write_global_seqno, verify_checksums_before_ingest,
        &true_data));
    // File overwrites some keys, a seqno will be assigned
    ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 1);
    ASSERT_OK(GenerateAndAddExternalFile(
        options, {11, 15, 19},
        {ValueType::kTypeDeletion, ValueType::kTypeMerge,
         ValueType::kTypeValue},
        file_id++, write_global_seqno, verify_checksums_before_ingest,
        &true_data));
    // File overwrites some keys, a seqno will be assigned
    ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 2);
    ASSERT_OK(GenerateAndAddExternalFile(
        options, {120, 130}, {ValueType::kTypeValue, ValueType::kTypeMerge},
        file_id++, write_global_seqno, verify_checksums_before_ingest,
        &true_data));
    // File doesn't overwrite any keys, no seqno needed
    ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 2);
    ASSERT_OK(GenerateAndAddExternalFile(
        options, {1, 130}, {ValueType::kTypeMerge, ValueType::kTypeDeletion},
        file_id++, write_global_seqno, verify_checksums_before_ingest,
        &true_data));
    // File overwrites some keys, a seqno will be assigned
    ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 3);
    // Keys 150..152 are new, and the range deletions {150, 160} and
    // {180, 190} only cover keys not yet in the DB.
    ASSERT_OK(GenerateAndAddExternalFile(
        options, {150, 151, 152},
        {ValueType::kTypeValue, ValueType::kTypeMerge,
         ValueType::kTypeDeletion},
        {{150, 160}, {180, 190}}, file_id++, write_global_seqno,
        verify_checksums_before_ingest, &true_data));
    // File doesn't overwrite any keys, no seqno needed
    ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 3);
    // Rewrites keys 150..152 ingested just above.
    ASSERT_OK(GenerateAndAddExternalFile(
        options, {150, 151, 152},
        {ValueType::kTypeValue, ValueType::kTypeMerge, ValueType::kTypeValue},
        {{200, 250}}, file_id++, write_global_seqno,
        verify_checksums_before_ingest, &true_data));
    // File overwrites some keys, a seqno will be assigned
    ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 4);
    // Point keys 300..302 are new; the overlap comes from the range
    // deletions {1, 2} and {152, 154}.
    ASSERT_OK(GenerateAndAddExternalFile(
        options, {300, 301, 302},
        {ValueType::kTypeValue, ValueType::kTypeMerge,
         ValueType::kTypeDeletion},
        {{1, 2}, {152, 154}}, file_id++, write_global_seqno,
        verify_checksums_before_ingest, &true_data));
    // File overwrites some keys, a seqno will be assigned
    ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 5);
    // Write some keys through normal write path
    for (int i = 0; i < 50; i++) {
      ASSERT_OK(Put(Key(i), "memtable"));
      true_data[Key(i)] = "memtable";
    }
    // From here on, expectations are relative to the sequence number reached
    // after the normal writes above.
    SequenceNumber last_seqno = dbfull()->GetLatestSequenceNumber();
    ASSERT_OK(GenerateAndAddExternalFile(
        options, {60, 61, 62},
        {ValueType::kTypeValue, ValueType::kTypeMerge, ValueType::kTypeValue},
        file_id++, write_global_seqno, verify_checksums_before_ingest,
        &true_data));
    // File doesn't overwrite any keys, no seqno needed
    ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno);
    ASSERT_OK(GenerateAndAddExternalFile(
        options, {40, 41, 42},
        {ValueType::kTypeValue, ValueType::kTypeDeletion,
         ValueType::kTypeDeletion},
        file_id++, write_global_seqno, verify_checksums_before_ingest,
        &true_data));
    // File overwrites some keys, a seqno will be assigned
    ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 1);
    ASSERT_OK(GenerateAndAddExternalFile(
        options, {20, 30, 40},
        {ValueType::kTypeDeletion, ValueType::kTypeDeletion,
         ValueType::kTypeDeletion},
        file_id++, write_global_seqno, verify_checksums_before_ingest,
        &true_data));
    // File overwrites some keys, a seqno will be assigned
    ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 2);
    const Snapshot* snapshot = db_->GetSnapshot();
    // We will need a seqno for the file regardless of whether it overwrites
    // keys in the DB, because we hold a snapshot
    ASSERT_OK(GenerateAndAddExternalFile(
        options, {1000, 1002}, {ValueType::kTypeValue, ValueType::kTypeMerge},
        file_id++, write_global_seqno, verify_checksums_before_ingest,
        &true_data));
    // A global seqno will be assigned anyway because of the snapshot
    ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 3);
    ASSERT_OK(GenerateAndAddExternalFile(
        options, {2000, 3002}, {ValueType::kTypeValue, ValueType::kTypeMerge},
        file_id++, write_global_seqno, verify_checksums_before_ingest,
        &true_data));
    // A global seqno will be assigned anyway because of the snapshot
    ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 4);
    ASSERT_OK(GenerateAndAddExternalFile(
        options, {1, 20, 40, 100, 150},
        {ValueType::kTypeDeletion, ValueType::kTypeDeletion,
         ValueType::kTypeValue, ValueType::kTypeMerge, ValueType::kTypeMerge},
        file_id++, write_global_seqno, verify_checksums_before_ingest,
        &true_data));
    // A global seqno will be assigned anyway because of the snapshot
    ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 5);
    db_->ReleaseSnapshot(snapshot);
    ASSERT_OK(GenerateAndAddExternalFile(
        options, {5000, 5001}, {ValueType::kTypeValue, ValueType::kTypeMerge},
        file_id++, write_global_seqno, verify_checksums_before_ingest,
        &true_data));
    // No snapshot anymore, no need to assign a seqno
    ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 5);
    size_t kcnt = 0;
    VerifyDBFromMap(true_data, &kcnt, false);
  } while (ChangeOptionsForFileIngestionTest());
}
  1105. TEST_F(ExternalSSTFileBasicTest, FadviseTrigger) {
  1106. Options options = CurrentOptions();
  1107. const int kNumKeys = 10000;
  1108. size_t total_fadvised_bytes = 0;
  1109. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  1110. "SstFileWriter::Rep::InvalidatePageCache", [&](void* arg) {
  1111. size_t fadvise_size = *(static_cast<size_t*>(arg));
  1112. total_fadvised_bytes += fadvise_size;
  1113. });
  1114. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  1115. std::unique_ptr<SstFileWriter> sst_file_writer;
  1116. std::string sst_file_path = sst_files_dir_ + "file_fadvise_disable.sst";
  1117. sst_file_writer.reset(
  1118. new SstFileWriter(EnvOptions(), options, nullptr, false));
  1119. ASSERT_OK(sst_file_writer->Open(sst_file_path));
  1120. for (int i = 0; i < kNumKeys; i++) {
  1121. ASSERT_OK(sst_file_writer->Put(Key(i), Key(i)));
  1122. }
  1123. ASSERT_OK(sst_file_writer->Finish());
  1124. // fadvise disabled
  1125. ASSERT_EQ(total_fadvised_bytes, 0);
  1126. sst_file_path = sst_files_dir_ + "file_fadvise_enable.sst";
  1127. sst_file_writer.reset(
  1128. new SstFileWriter(EnvOptions(), options, nullptr, true));
  1129. ASSERT_OK(sst_file_writer->Open(sst_file_path));
  1130. for (int i = 0; i < kNumKeys; i++) {
  1131. ASSERT_OK(sst_file_writer->Put(Key(i), Key(i)));
  1132. }
  1133. ASSERT_OK(sst_file_writer->Finish());
  1134. // fadvise enabled
  1135. ASSERT_EQ(total_fadvised_bytes, sst_file_writer->FileSize());
  1136. ASSERT_GT(total_fadvised_bytes, 0);
  1137. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  1138. }
// Injects a filesystem failure around each sync performed during ingestion.
// For every (before, after) sync-point pair, the fault-injection env's
// filesystem is deactivated at `before` and reactivated at `after`, so the
// ingestion is expected to fail. The exception is when a NotSupported status
// was observed at the Prepare:Reopen (case 0) or NewRandomRWFile (case 2) sync
// point; then `no_sync` is set (presumably the corresponding sync is skipped)
// and the ingestion is expected to succeed.
TEST_F(ExternalSSTFileBasicTest, SyncFailure) {
  Options options;
  options.create_if_missing = true;
  options.env = fault_injection_test_env_.get();
  std::vector<std::pair<std::string, std::string>> test_cases = {
      {"ExternalSstFileIngestionJob::BeforeSyncIngestedFile",
       "ExternalSstFileIngestionJob::AfterSyncIngestedFile"},
      {"ExternalSstFileIngestionJob::BeforeSyncDir",
       "ExternalSstFileIngestionJob::AfterSyncDir"},
      {"ExternalSstFileIngestionJob::BeforeSyncGlobalSeqno",
       "ExternalSstFileIngestionJob::AfterSyncGlobalSeqno"}};
  for (size_t i = 0; i < test_cases.size(); i++) {
    // Set by the callbacks below when a NotSupported status is observed; it
    // flips the expected ingestion outcome from failure to success.
    bool no_sync = false;
    SyncPoint::GetInstance()->SetCallBack(test_cases[i].first, [&](void*) {
      fault_injection_test_env_->SetFilesystemActive(false);
    });
    SyncPoint::GetInstance()->SetCallBack(test_cases[i].second, [&](void*) {
      fault_injection_test_env_->SetFilesystemActive(true);
    });
    if (i == 0) {
      SyncPoint::GetInstance()->SetCallBack(
          "ExternalSstFileIngestionJob::Prepare:Reopen", [&](void* s) {
            Status* status = static_cast<Status*>(s);
            if (status->IsNotSupported()) {
              no_sync = true;
            }
          });
    }
    if (i == 2) {
      SyncPoint::GetInstance()->SetCallBack(
          "ExternalSstFileIngestionJob::NewRandomRWFile", [&](void* s) {
            Status* status = static_cast<Status*>(s);
            if (status->IsNotSupported()) {
              no_sync = true;
            }
          });
    }
    SyncPoint::GetInstance()->EnableProcessing();
    DestroyAndReopen(options);
    if (i == 2) {
      ASSERT_OK(Put("foo", "v1"));
    }
    Options sst_file_writer_options;
    sst_file_writer_options.env = fault_injection_test_env_.get();
    std::unique_ptr<SstFileWriter> sst_file_writer(
        new SstFileWriter(EnvOptions(), sst_file_writer_options));
    std::string file_name =
        sst_files_dir_ + "sync_failure_test_" + std::to_string(i) + ".sst";
    ASSERT_OK(sst_file_writer->Open(file_name));
    ASSERT_OK(sst_file_writer->Put("bar", "v2"));
    ASSERT_OK(sst_file_writer->Finish());
    IngestExternalFileOptions ingest_opt;
    ASSERT_FALSE(ingest_opt.write_global_seqno);  // new default
    if (i == 0) {
      ingest_opt.move_files = true;
    }
    // Hold a snapshot during ingestion (presumably so that a global seqno is
    // assigned; case 2 relies on write_global_seqno below).
    const Snapshot* snapshot = db_->GetSnapshot();
    if (i == 2) {
      ingest_opt.write_global_seqno = true;
    }
    Status s = db_->IngestExternalFile({file_name}, ingest_opt);
    if (no_sync) {
      ASSERT_OK(s);
    } else {
      ASSERT_NOK(s);
    }
    db_->ReleaseSnapshot(snapshot);
    // Callbacks capture this iteration's locals by reference, so they must be
    // cleared before the next iteration.
    SyncPoint::GetInstance()->DisableProcessing();
    SyncPoint::GetInstance()->ClearAllCallBacks();
    Destroy(options);
  }
}
  1211. TEST_F(ExternalSSTFileBasicTest, ReopenNotSupported) {
  1212. Options options;
  1213. options.create_if_missing = true;
  1214. options.env = env_;
  1215. SyncPoint::GetInstance()->SetCallBack(
  1216. "ExternalSstFileIngestionJob::Prepare:Reopen", [&](void* arg) {
  1217. Status* s = static_cast<Status*>(arg);
  1218. *s = Status::NotSupported();
  1219. });
  1220. SyncPoint::GetInstance()->EnableProcessing();
  1221. DestroyAndReopen(options);
  1222. Options sst_file_writer_options;
  1223. sst_file_writer_options.env = env_;
  1224. std::unique_ptr<SstFileWriter> sst_file_writer(
  1225. new SstFileWriter(EnvOptions(), sst_file_writer_options));
  1226. std::string file_name =
  1227. sst_files_dir_ + "reopen_not_supported_test_" + ".sst";
  1228. ASSERT_OK(sst_file_writer->Open(file_name));
  1229. ASSERT_OK(sst_file_writer->Put("bar", "v2"));
  1230. ASSERT_OK(sst_file_writer->Finish());
  1231. IngestExternalFileOptions ingest_opt;
  1232. ingest_opt.move_files = true;
  1233. const Snapshot* snapshot = db_->GetSnapshot();
  1234. ASSERT_OK(db_->IngestExternalFile({file_name}, ingest_opt));
  1235. db_->ReleaseSnapshot(snapshot);
  1236. SyncPoint::GetInstance()->DisableProcessing();
  1237. SyncPoint::GetInstance()->ClearAllCallBacks();
  1238. Destroy(options);
  1239. }
  1240. TEST_F(ExternalSSTFileBasicTest, VerifyChecksumReadahead) {
  1241. Options options;
  1242. options.create_if_missing = true;
  1243. SpecialEnv senv(env_);
  1244. options.env = &senv;
  1245. DestroyAndReopen(options);
  1246. Options sst_file_writer_options;
  1247. sst_file_writer_options.env = env_;
  1248. std::unique_ptr<SstFileWriter> sst_file_writer(
  1249. new SstFileWriter(EnvOptions(), sst_file_writer_options));
  1250. std::string file_name = sst_files_dir_ + "verify_checksum_readahead_test.sst";
  1251. ASSERT_OK(sst_file_writer->Open(file_name));
  1252. Random rnd(301);
  1253. std::string value = rnd.RandomString(4000);
  1254. for (int i = 0; i < 5000; i++) {
  1255. ASSERT_OK(sst_file_writer->Put(DBTestBase::Key(i), value));
  1256. }
  1257. ASSERT_OK(sst_file_writer->Finish());
  1258. // Ingest it once without verifying checksums to see the baseline
  1259. // preads.
  1260. IngestExternalFileOptions ingest_opt;
  1261. ingest_opt.move_files = false;
  1262. senv.count_random_reads_ = true;
  1263. senv.random_read_bytes_counter_ = 0;
  1264. ASSERT_OK(db_->IngestExternalFile({file_name}, ingest_opt));
  1265. auto base_num_reads = senv.random_read_counter_.Read();
  1266. // Make sure the counter is enabled.
  1267. ASSERT_GT(base_num_reads, 0);
  1268. // Ingest again and observe the reads made for for readahead.
  1269. ingest_opt.move_files = false;
  1270. ingest_opt.verify_checksums_before_ingest = true;
  1271. ingest_opt.verify_checksums_readahead_size = size_t{2 * 1024 * 1024};
  1272. senv.count_random_reads_ = true;
  1273. senv.random_read_bytes_counter_ = 0;
  1274. ASSERT_OK(db_->IngestExternalFile({file_name}, ingest_opt));
  1275. // Make sure the counter is enabled.
  1276. ASSERT_GT(senv.random_read_counter_.Read() - base_num_reads, 0);
  1277. // The SST file is about 20MB. Readahead size is 2MB.
  1278. // Give a conservative 15 reads for metadata blocks, the number
  1279. // of random reads should be within 20 MB / 2MB + 15 = 25.
  1280. ASSERT_LE(senv.random_read_counter_.Read() - base_num_reads, 40);
  1281. Destroy(options);
  1282. }
  1283. TEST_F(ExternalSSTFileBasicTest, ReadOldValueOfIngestedKeyBug) {
  1284. Options options = CurrentOptions();
  1285. options.compaction_style = kCompactionStyleUniversal;
  1286. options.disable_auto_compactions = true;
  1287. options.num_levels = 3;
  1288. options.preserve_internal_time_seconds = 36000;
  1289. DestroyAndReopen(options);
  1290. // To create the following LSM tree to trigger the bug:
  1291. // L0
  1292. // L1 with seqno [1, 2]
  1293. // L2 with seqno [3, 4]
  1294. // To create L1 shape
  1295. ASSERT_OK(
  1296. db_->Put(WriteOptions(), db_->DefaultColumnFamily(), "k1", "seqno1"));
  1297. ASSERT_OK(db_->Flush(FlushOptions()));
  1298. ASSERT_OK(
  1299. db_->Put(WriteOptions(), db_->DefaultColumnFamily(), "k1", "seqno2"));
  1300. ASSERT_OK(db_->Flush(FlushOptions()));
  1301. ColumnFamilyMetaData meta_1;
  1302. db_->GetColumnFamilyMetaData(&meta_1);
  1303. auto& files_1 = meta_1.levels[0].files;
  1304. ASSERT_EQ(files_1.size(), 2);
  1305. std::string file1 = files_1[0].db_path + files_1[0].name;
  1306. std::string file2 = files_1[1].db_path + files_1[1].name;
  1307. ASSERT_OK(db_->CompactFiles(CompactionOptions(), {file1, file2}, 1));
  1308. // To confirm L1 shape
  1309. ColumnFamilyMetaData meta_2;
  1310. db_->GetColumnFamilyMetaData(&meta_2);
  1311. ASSERT_EQ(meta_2.levels[0].files.size(), 0);
  1312. ASSERT_EQ(meta_2.levels[1].files.size(), 1);
  1313. // Seqno starts from non-zero due to seqno reservation for
  1314. // preserve_internal_time_seconds greater than 0;
  1315. ASSERT_EQ(meta_2.levels[1].files[0].largest_seqno, 102);
  1316. ASSERT_EQ(meta_2.levels[2].files.size(), 0);
  1317. // To create L2 shape
  1318. ASSERT_OK(db_->Put(WriteOptions(), db_->DefaultColumnFamily(), "k2overlap",
  1319. "old_value"));
  1320. ASSERT_OK(db_->Flush(FlushOptions()));
  1321. ASSERT_OK(db_->Put(WriteOptions(), db_->DefaultColumnFamily(), "k2overlap",
  1322. "old_value"));
  1323. ASSERT_OK(db_->Flush(FlushOptions()));
  1324. ColumnFamilyMetaData meta_3;
  1325. db_->GetColumnFamilyMetaData(&meta_3);
  1326. auto& files_3 = meta_3.levels[0].files;
  1327. std::string file3 = files_3[0].db_path + files_3[0].name;
  1328. std::string file4 = files_3[1].db_path + files_3[1].name;
  1329. ASSERT_OK(db_->CompactFiles(CompactionOptions(), {file3, file4}, 2));
  1330. // To confirm L2 shape
  1331. ColumnFamilyMetaData meta_4;
  1332. db_->GetColumnFamilyMetaData(&meta_4);
  1333. ASSERT_EQ(meta_4.levels[0].files.size(), 0);
  1334. ASSERT_EQ(meta_4.levels[1].files.size(), 1);
  1335. ASSERT_EQ(meta_4.levels[2].files.size(), 1);
  1336. ASSERT_EQ(meta_4.levels[2].files[0].largest_seqno, 104);
  1337. // Ingest a file with new value of the key "k2overlap"
  1338. SstFileWriter sst_file_writer(EnvOptions(), options);
  1339. std::string f = sst_files_dir_ + "f.sst";
  1340. ASSERT_OK(sst_file_writer.Open(f));
  1341. ASSERT_OK(sst_file_writer.Put("k2overlap", "new_value"));
  1342. ExternalSstFileInfo f_info;
  1343. ASSERT_OK(sst_file_writer.Finish(&f_info));
  1344. ASSERT_OK(db_->IngestExternalFile({f}, IngestExternalFileOptions()));
  1345. // To verify new value of the key "k2overlap" is correctly returned
  1346. ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  1347. std::string value;
  1348. ASSERT_OK(db_->Get(ReadOptions(), "k2overlap", &value));
  1349. // Before the fix, the value would be "old_value" and assertion failed
  1350. ASSERT_EQ(value, "new_value");
  1351. }
  1352. TEST_F(ExternalSSTFileBasicTest, IngestRangeDeletionTombstoneWithGlobalSeqno) {
  1353. for (int i = 5; i < 25; i++) {
  1354. ASSERT_OK(db_->Put(WriteOptions(), db_->DefaultColumnFamily(), Key(i),
  1355. Key(i) + "_val"));
  1356. }
  1357. Options options = CurrentOptions();
  1358. options.disable_auto_compactions = true;
  1359. Reopen(options);
  1360. SstFileWriter sst_file_writer(EnvOptions(), options);
  1361. // file.sst (delete 0 => 30)
  1362. std::string file = sst_files_dir_ + "file.sst";
  1363. ASSERT_OK(sst_file_writer.Open(file));
  1364. ASSERT_OK(sst_file_writer.DeleteRange(Key(0), Key(30)));
  1365. ExternalSstFileInfo file_info;
  1366. ASSERT_OK(sst_file_writer.Finish(&file_info));
  1367. ASSERT_EQ(file_info.file_path, file);
  1368. ASSERT_EQ(file_info.num_entries, 0);
  1369. ASSERT_EQ(file_info.smallest_key, "");
  1370. ASSERT_EQ(file_info.largest_key, "");
  1371. ASSERT_EQ(file_info.num_range_del_entries, 1);
  1372. ASSERT_EQ(file_info.smallest_range_del_key, Key(0));
  1373. ASSERT_EQ(file_info.largest_range_del_key, Key(30));
  1374. IngestExternalFileOptions ifo;
  1375. ifo.move_files = true;
  1376. ifo.snapshot_consistency = true;
  1377. ifo.allow_global_seqno = true;
  1378. ifo.write_global_seqno = true;
  1379. ifo.verify_checksums_before_ingest = false;
  1380. ASSERT_OK(db_->IngestExternalFile({file}, ifo));
  1381. for (int i = 5; i < 25; i++) {
  1382. std::string res;
  1383. ASSERT_TRUE(db_->Get(ReadOptions(), Key(i), &res).IsNotFound());
  1384. }
  1385. }
  1386. TEST_P(ExternalSSTFileBasicTest, IngestionWithRangeDeletions) {
  1387. int kNumLevels = 7;
  1388. Options options = CurrentOptions();
  1389. options.disable_auto_compactions = true;
  1390. options.num_levels = kNumLevels;
  1391. Reopen(options);
  1392. std::map<std::string, std::string> true_data;
  1393. int file_id = 1;
  1394. // prevent range deletions from being dropped due to becoming obsolete.
  1395. const Snapshot* snapshot = db_->GetSnapshot();
  1396. // range del [0, 50) in L6 file, [50, 100) in L0 file, [100, 150) in memtable
  1397. for (int i = 0; i < 3; i++) {
  1398. if (i != 0) {
  1399. ASSERT_OK(db_->Flush(FlushOptions()));
  1400. if (i == 1) {
  1401. MoveFilesToLevel(kNumLevels - 1);
  1402. }
  1403. }
  1404. ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
  1405. Key(50 * i), Key(50 * (i + 1))));
  1406. }
  1407. ASSERT_EQ(1, NumTableFilesAtLevel(0));
  1408. ASSERT_EQ(0, NumTableFilesAtLevel(kNumLevels - 2));
  1409. ASSERT_EQ(1, NumTableFilesAtLevel(kNumLevels - 1));
  1410. bool write_global_seqno = std::get<0>(GetParam());
  1411. bool verify_checksums_before_ingest = std::get<1>(GetParam());
  1412. // overlaps with L0 file but not memtable, so flush is skipped and file is
  1413. // ingested into L0
  1414. SequenceNumber last_seqno = dbfull()->GetLatestSequenceNumber();
  1415. ASSERT_OK(GenerateAndAddExternalFile(
  1416. options, {60, 90}, {ValueType::kTypeValue, ValueType::kTypeValue},
  1417. {{65, 70}, {70, 85}}, file_id++, write_global_seqno,
  1418. verify_checksums_before_ingest, &true_data));
  1419. ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), ++last_seqno);
  1420. ASSERT_EQ(2, NumTableFilesAtLevel(0));
  1421. ASSERT_EQ(0, NumTableFilesAtLevel(kNumLevels - 2));
  1422. ASSERT_EQ(1, NumTableFilesAtLevel(options.num_levels - 1));
  1423. // overlaps with L6 file but not memtable or L0 file, so flush is skipped and
  1424. // file is ingested into L5
  1425. ASSERT_OK(GenerateAndAddExternalFile(
  1426. options, {10, 40}, {ValueType::kTypeValue, ValueType::kTypeValue},
  1427. file_id++, write_global_seqno, verify_checksums_before_ingest,
  1428. &true_data));
  1429. ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), ++last_seqno);
  1430. ASSERT_EQ(2, NumTableFilesAtLevel(0));
  1431. ASSERT_EQ(1, NumTableFilesAtLevel(kNumLevels - 2));
  1432. ASSERT_EQ(1, NumTableFilesAtLevel(options.num_levels - 1));
  1433. // overlaps with L5 file but not memtable or L0 file, so flush is skipped and
  1434. // file is ingested into L4
  1435. ASSERT_OK(GenerateAndAddExternalFile(
  1436. options, {}, {}, {{5, 15}}, file_id++, write_global_seqno,
  1437. verify_checksums_before_ingest, &true_data));
  1438. ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), ++last_seqno);
  1439. ASSERT_EQ(2, NumTableFilesAtLevel(0));
  1440. ASSERT_EQ(1, NumTableFilesAtLevel(kNumLevels - 2));
  1441. ASSERT_EQ(1, NumTableFilesAtLevel(options.num_levels - 2));
  1442. ASSERT_EQ(1, NumTableFilesAtLevel(options.num_levels - 1));
  1443. // ingested file overlaps with memtable, so flush is triggered before the file
  1444. // is ingested such that the ingested data is considered newest. So L0 file
  1445. // count increases by two.
  1446. ASSERT_OK(GenerateAndAddExternalFile(
  1447. options, {100, 140}, {ValueType::kTypeValue, ValueType::kTypeValue},
  1448. file_id++, write_global_seqno, verify_checksums_before_ingest,
  1449. &true_data));
  1450. ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), ++last_seqno);
  1451. ASSERT_EQ(4, NumTableFilesAtLevel(0));
  1452. ASSERT_EQ(1, NumTableFilesAtLevel(kNumLevels - 2));
  1453. ASSERT_EQ(1, NumTableFilesAtLevel(options.num_levels - 1));
  1454. // snapshot unneeded now that all range deletions are persisted
  1455. db_->ReleaseSnapshot(snapshot);
  1456. // overlaps with nothing, so places at bottom level and skips incrementing
  1457. // seqnum.
  1458. ASSERT_OK(GenerateAndAddExternalFile(
  1459. options, {151, 175}, {ValueType::kTypeValue, ValueType::kTypeValue},
  1460. {{160, 200}}, file_id++, write_global_seqno,
  1461. verify_checksums_before_ingest, &true_data));
  1462. ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno);
  1463. ASSERT_EQ(4, NumTableFilesAtLevel(0));
  1464. ASSERT_EQ(1, NumTableFilesAtLevel(kNumLevels - 2));
  1465. ASSERT_EQ(2, NumTableFilesAtLevel(options.num_levels - 1));
  1466. VerifyDBFromMap(true_data);
  1467. }
  1468. TEST_F(ExternalSSTFileBasicTest, AdjacentRangeDeletionTombstones) {
  1469. Options options = CurrentOptions();
  1470. SstFileWriter sst_file_writer(EnvOptions(), options);
  1471. // file8.sst (delete 300 => 400)
  1472. std::string file8 = sst_files_dir_ + "file8.sst";
  1473. ASSERT_OK(sst_file_writer.Open(file8));
  1474. ASSERT_OK(sst_file_writer.DeleteRange(Key(300), Key(400)));
  1475. ExternalSstFileInfo file8_info;
  1476. Status s = sst_file_writer.Finish(&file8_info);
  1477. ASSERT_OK(s) << s.ToString();
  1478. ASSERT_EQ(file8_info.file_path, file8);
  1479. ASSERT_EQ(file8_info.num_entries, 0);
  1480. ASSERT_EQ(file8_info.smallest_key, "");
  1481. ASSERT_EQ(file8_info.largest_key, "");
  1482. ASSERT_EQ(file8_info.num_range_del_entries, 1);
  1483. ASSERT_EQ(file8_info.smallest_range_del_key, Key(300));
  1484. ASSERT_EQ(file8_info.largest_range_del_key, Key(400));
  1485. // file9.sst (delete 400 => 500)
  1486. std::string file9 = sst_files_dir_ + "file9.sst";
  1487. ASSERT_OK(sst_file_writer.Open(file9));
  1488. ASSERT_OK(sst_file_writer.DeleteRange(Key(400), Key(500)));
  1489. ExternalSstFileInfo file9_info;
  1490. s = sst_file_writer.Finish(&file9_info);
  1491. ASSERT_OK(s) << s.ToString();
  1492. ASSERT_EQ(file9_info.file_path, file9);
  1493. ASSERT_EQ(file9_info.num_entries, 0);
  1494. ASSERT_EQ(file9_info.smallest_key, "");
  1495. ASSERT_EQ(file9_info.largest_key, "");
  1496. ASSERT_EQ(file9_info.num_range_del_entries, 1);
  1497. ASSERT_EQ(file9_info.smallest_range_del_key, Key(400));
  1498. ASSERT_EQ(file9_info.largest_range_del_key, Key(500));
  1499. // Range deletion tombstones are exclusive on their end key, so these SSTs
  1500. // should not be considered as overlapping.
  1501. s = DeprecatedAddFile({file8, file9});
  1502. ASSERT_OK(s) << s.ToString();
  1503. ASSERT_EQ(db_->GetLatestSequenceNumber(), 0U);
  1504. DestroyAndRecreateExternalSSTFilesDir();
  1505. }
  1506. TEST_F(ExternalSSTFileBasicTest, UnorderedRangeDeletions) {
  1507. int kNumLevels = 7;
  1508. Options options = CurrentOptions();
  1509. options.disable_auto_compactions = true;
  1510. options.num_levels = kNumLevels;
  1511. Reopen(options);
  1512. std::map<std::string, std::string> true_data;
  1513. int file_id = 1;
  1514. // prevent range deletions from being dropped due to becoming obsolete.
  1515. const Snapshot* snapshot = db_->GetSnapshot();
  1516. // Range del [0, 50) in memtable
  1517. ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(0),
  1518. Key(50)));
  1519. // Out of order range del overlaps memtable, so flush is required before file
  1520. // is ingested into L0
  1521. ASSERT_OK(GenerateAndAddExternalFile(
  1522. options, {60, 90}, {ValueType::kTypeValue, ValueType::kTypeValue},
  1523. {{65, 70}, {45, 50}}, file_id++, true /* write_global_seqno */,
  1524. true /* verify_checksums_before_ingest */, &true_data));
  1525. ASSERT_EQ(2, true_data.size());
  1526. ASSERT_EQ(2, NumTableFilesAtLevel(0));
  1527. ASSERT_EQ(0, NumTableFilesAtLevel(kNumLevels - 1));
  1528. VerifyDBFromMap(true_data);
  1529. // Compact to L6
  1530. MoveFilesToLevel(kNumLevels - 1);
  1531. ASSERT_EQ(0, NumTableFilesAtLevel(0));
  1532. ASSERT_EQ(1, NumTableFilesAtLevel(kNumLevels - 1));
  1533. VerifyDBFromMap(true_data);
  1534. // Ingest a file containing out of order range dels that cover nothing
  1535. ASSERT_OK(GenerateAndAddExternalFile(
  1536. options, {151, 175}, {ValueType::kTypeValue, ValueType::kTypeValue},
  1537. {{160, 200}, {120, 180}}, file_id++, true /* write_global_seqno */,
  1538. true /* verify_checksums_before_ingest */, &true_data));
  1539. ASSERT_EQ(4, true_data.size());
  1540. ASSERT_EQ(0, NumTableFilesAtLevel(0));
  1541. ASSERT_EQ(2, NumTableFilesAtLevel(kNumLevels - 1));
  1542. VerifyDBFromMap(true_data);
  1543. // Ingest a file containing out of order range dels that cover keys in L6
  1544. ASSERT_OK(GenerateAndAddExternalFile(
  1545. options, {}, {}, {{190, 200}, {170, 180}, {55, 65}}, file_id++,
  1546. true /* write_global_seqno */, true /* verify_checksums_before_ingest */,
  1547. &true_data));
  1548. ASSERT_EQ(2, true_data.size());
  1549. ASSERT_EQ(1, NumTableFilesAtLevel(kNumLevels - 2));
  1550. ASSERT_EQ(2, NumTableFilesAtLevel(kNumLevels - 1));
  1551. VerifyDBFromMap(true_data);
  1552. db_->ReleaseSnapshot(snapshot);
  1553. }
  1554. TEST_F(ExternalSSTFileBasicTest, RangeDeletionEndComesBeforeStart) {
  1555. Options options = CurrentOptions();
  1556. SstFileWriter sst_file_writer(EnvOptions(), options);
  1557. // "file.sst"
  1558. // Verify attempt to delete 300 => 200 fails.
  1559. // Then, verify attempt to delete 300 => 300 succeeds but writes nothing.
  1560. // Afterwards, verify attempt to delete 300 => 400 works normally.
  1561. std::string file = sst_files_dir_ + "file.sst";
  1562. ASSERT_OK(sst_file_writer.Open(file));
  1563. ASSERT_TRUE(
  1564. sst_file_writer.DeleteRange(Key(300), Key(200)).IsInvalidArgument());
  1565. ASSERT_OK(sst_file_writer.DeleteRange(Key(300), Key(300)));
  1566. ASSERT_OK(sst_file_writer.DeleteRange(Key(300), Key(400)));
  1567. ExternalSstFileInfo file_info;
  1568. Status s = sst_file_writer.Finish(&file_info);
  1569. ASSERT_OK(s) << s.ToString();
  1570. ASSERT_EQ(file_info.file_path, file);
  1571. ASSERT_EQ(file_info.num_entries, 0);
  1572. ASSERT_EQ(file_info.smallest_key, "");
  1573. ASSERT_EQ(file_info.largest_key, "");
  1574. ASSERT_EQ(file_info.num_range_del_entries, 1);
  1575. ASSERT_EQ(file_info.smallest_range_del_key, Key(300));
  1576. ASSERT_EQ(file_info.largest_range_del_key, Key(400));
  1577. }
  1578. TEST_P(ExternalSSTFileBasicTest, IngestFileWithBadBlockChecksum) {
  1579. bool verify_checksums_before_ingest = std::get<1>(GetParam());
  1580. if (!verify_checksums_before_ingest) {
  1581. ROCKSDB_GTEST_BYPASS("Bypassing test when !verify_checksums_before_ingest");
  1582. return;
  1583. }
  1584. bool change_checksum_called = false;
  1585. const auto& change_checksum = [&](void* arg) {
  1586. if (!change_checksum_called) {
  1587. char* buf = static_cast<char*>(arg);
  1588. assert(nullptr != buf);
  1589. buf[0] ^= 0x1;
  1590. change_checksum_called = true;
  1591. }
  1592. };
  1593. SyncPoint::GetInstance()->DisableProcessing();
  1594. SyncPoint::GetInstance()->ClearAllCallBacks();
  1595. SyncPoint::GetInstance()->SetCallBack(
  1596. "BlockBasedTableBuilder::WriteMaybeCompressedBlock:TamperWithChecksum",
  1597. change_checksum);
  1598. SyncPoint::GetInstance()->EnableProcessing();
  1599. int file_id = 0;
  1600. bool write_global_seqno = std::get<0>(GetParam());
  1601. do {
  1602. Options options = CurrentOptions();
  1603. DestroyAndReopen(options);
  1604. std::map<std::string, std::string> true_data;
  1605. Status s = GenerateAndAddExternalFile(
  1606. options, {1, 2, 3, 4, 5, 6}, ValueType::kTypeValue, file_id++,
  1607. write_global_seqno, /*verify_checksums_before_ingest=*/true,
  1608. &true_data);
  1609. ASSERT_NOK(s);
  1610. change_checksum_called = false;
  1611. } while (ChangeOptionsForFileIngestionTest());
  1612. }
  1613. TEST_P(ExternalSSTFileBasicTest, IngestFileWithCorruptedDataBlock) {
  1614. if (!random_rwfile_supported_) {
  1615. ROCKSDB_GTEST_SKIP("Test requires NewRandomRWFile support");
  1616. return;
  1617. }
  1618. SyncPoint::GetInstance()->DisableProcessing();
  1619. int file_id = 0;
  1620. EnvOptions env_options;
  1621. Random rnd(301);
  1622. do {
  1623. Options options = CurrentOptions();
  1624. options.compression = kNoCompression;
  1625. BlockBasedTableOptions table_options;
  1626. table_options.block_size = 4 * 1024;
  1627. options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  1628. std::string file_path = sst_files_dir_ + std::to_string(file_id++);
  1629. SstFileWriter sst_file_writer(env_options, options);
  1630. Status s = sst_file_writer.Open(file_path);
  1631. ASSERT_OK(s);
  1632. // This should write more than 2 data blocks.
  1633. for (int i = 0; i != 100; ++i) {
  1634. std::string key = Key(i);
  1635. std::string value = rnd.RandomString(200);
  1636. ASSERT_OK(sst_file_writer.Put(key, value));
  1637. }
  1638. ASSERT_OK(sst_file_writer.Finish());
  1639. {
  1640. // Get file size
  1641. uint64_t file_size = 0;
  1642. ASSERT_OK(env_->GetFileSize(file_path, &file_size));
  1643. ASSERT_GT(file_size, 8);
  1644. std::unique_ptr<RandomRWFile> rwfile;
  1645. ASSERT_OK(env_->NewRandomRWFile(file_path, &rwfile, EnvOptions()));
  1646. // Corrupt the second data block.
  1647. // We need to corrupt a non-first and non-last data block
  1648. // since we access them to get smallest and largest internal
  1649. // key in the file in GetIngestedFileInfo().
  1650. const uint64_t offset = 5000;
  1651. char scratch[8] = {0};
  1652. Slice buf;
  1653. ASSERT_OK(rwfile->Read(offset, sizeof(scratch), &buf, scratch));
  1654. scratch[0] ^= 0xff; // flip one bit
  1655. ASSERT_OK(rwfile->Write(offset, buf));
  1656. }
  1657. // Ingest file.
  1658. IngestExternalFileOptions ifo;
  1659. ifo.write_global_seqno = std::get<0>(GetParam());
  1660. ifo.verify_checksums_before_ingest = std::get<1>(GetParam());
  1661. s = db_->IngestExternalFile({file_path}, ifo);
  1662. if (ifo.verify_checksums_before_ingest) {
  1663. ASSERT_NOK(s);
  1664. } else {
  1665. ASSERT_OK(s);
  1666. }
  1667. } while (ChangeOptionsForFileIngestionTest());
  1668. }
  1669. TEST_P(ExternalSSTFileBasicTest, IngestExternalFileWithCorruptedPropsBlock) {
  1670. bool verify_checksums_before_ingest = std::get<1>(GetParam());
  1671. if (!verify_checksums_before_ingest) {
  1672. ROCKSDB_GTEST_BYPASS("Bypassing test when !verify_checksums_before_ingest");
  1673. return;
  1674. }
  1675. if (!random_rwfile_supported_) {
  1676. ROCKSDB_GTEST_SKIP("Test requires NewRandomRWFile support");
  1677. return;
  1678. }
  1679. uint64_t props_block_offset = 0;
  1680. size_t props_block_size = 0;
  1681. const auto& get_props_block_offset = [&](void* arg) {
  1682. props_block_offset = *static_cast<uint64_t*>(arg);
  1683. };
  1684. const auto& get_props_block_size = [&](void* arg) {
  1685. props_block_size = *static_cast<uint64_t*>(arg);
  1686. };
  1687. SyncPoint::GetInstance()->DisableProcessing();
  1688. SyncPoint::GetInstance()->ClearAllCallBacks();
  1689. SyncPoint::GetInstance()->SetCallBack(
  1690. "BlockBasedTableBuilder::WritePropertiesBlock:GetPropsBlockOffset",
  1691. get_props_block_offset);
  1692. SyncPoint::GetInstance()->SetCallBack(
  1693. "BlockBasedTableBuilder::WritePropertiesBlock:GetPropsBlockSize",
  1694. get_props_block_size);
  1695. SyncPoint::GetInstance()->EnableProcessing();
  1696. int file_id = 0;
  1697. Random64 rand(time(nullptr));
  1698. do {
  1699. std::string file_path = sst_files_dir_ + std::to_string(file_id++);
  1700. Options options = CurrentOptions();
  1701. SstFileWriter sst_file_writer(EnvOptions(), options);
  1702. Status s = sst_file_writer.Open(file_path);
  1703. ASSERT_OK(s);
  1704. for (int i = 0; i != 100; ++i) {
  1705. std::string key = Key(i);
  1706. std::string value = Key(i) + std::to_string(0);
  1707. ASSERT_OK(sst_file_writer.Put(key, value));
  1708. }
  1709. ASSERT_OK(sst_file_writer.Finish());
  1710. {
  1711. std::unique_ptr<RandomRWFile> rwfile;
  1712. ASSERT_OK(env_->NewRandomRWFile(file_path, &rwfile, EnvOptions()));
  1713. // Manually corrupt the file
  1714. ASSERT_GT(props_block_size, 8);
  1715. uint64_t offset =
  1716. props_block_offset + rand.Next() % (props_block_size - 8);
  1717. char scratch[8] = {0};
  1718. Slice buf;
  1719. ASSERT_OK(rwfile->Read(offset, sizeof(scratch), &buf, scratch));
  1720. scratch[0] ^= 0xff; // flip one bit
  1721. ASSERT_OK(rwfile->Write(offset, buf));
  1722. }
  1723. // Ingest file.
  1724. IngestExternalFileOptions ifo;
  1725. ifo.write_global_seqno = std::get<0>(GetParam());
  1726. ifo.verify_checksums_before_ingest = true;
  1727. s = db_->IngestExternalFile({file_path}, ifo);
  1728. ASSERT_NOK(s);
  1729. } while (ChangeOptionsForFileIngestionTest());
  1730. }
  1731. TEST_F(ExternalSSTFileBasicTest, OverlappingFiles) {
  1732. Options options = CurrentOptions();
  1733. std::vector<std::string> files;
  1734. {
  1735. SstFileWriter sst_file_writer(EnvOptions(), options);
  1736. std::string file1 = sst_files_dir_ + "file1.sst";
  1737. ASSERT_OK(sst_file_writer.Open(file1));
  1738. ASSERT_OK(sst_file_writer.Put("a", "a1"));
  1739. ASSERT_OK(sst_file_writer.Put("i", "i1"));
  1740. ExternalSstFileInfo file1_info;
  1741. ASSERT_OK(sst_file_writer.Finish(&file1_info));
  1742. files.push_back(std::move(file1));
  1743. }
  1744. {
  1745. SstFileWriter sst_file_writer(EnvOptions(), options);
  1746. std::string file2 = sst_files_dir_ + "file2.sst";
  1747. ASSERT_OK(sst_file_writer.Open(file2));
  1748. ASSERT_OK(sst_file_writer.Put("i", "i2"));
  1749. ExternalSstFileInfo file2_info;
  1750. ASSERT_OK(sst_file_writer.Finish(&file2_info));
  1751. files.push_back(std::move(file2));
  1752. }
  1753. {
  1754. SstFileWriter sst_file_writer(EnvOptions(), options);
  1755. std::string file3 = sst_files_dir_ + "file3.sst";
  1756. ASSERT_OK(sst_file_writer.Open(file3));
  1757. ASSERT_OK(sst_file_writer.Put("k", "k1"));
  1758. ASSERT_OK(sst_file_writer.Put("m", "m1"));
  1759. ExternalSstFileInfo file3_info;
  1760. ASSERT_OK(sst_file_writer.Finish(&file3_info));
  1761. files.push_back(std::move(file3));
  1762. }
  1763. // This could be ingested to the same level as file3 and file4, but the
  1764. // greedy/simple overlap check relegates it to a later level
  1765. {
  1766. SstFileWriter sst_file_writer(EnvOptions(), options);
  1767. std::string file4 = sst_files_dir_ + "file4.sst";
  1768. ASSERT_OK(sst_file_writer.Open(file4));
  1769. ASSERT_OK(sst_file_writer.Put("j", "j1"));
  1770. ExternalSstFileInfo file4_info;
  1771. ASSERT_OK(sst_file_writer.Finish(&file4_info));
  1772. files.push_back(std::move(file4));
  1773. }
  1774. {
  1775. SstFileWriter sst_file_writer(EnvOptions(), options);
  1776. std::string file5 = sst_files_dir_ + "file5.sst";
  1777. ASSERT_OK(sst_file_writer.Open(file5));
  1778. ASSERT_OK(sst_file_writer.Put("i", "i3"));
  1779. ExternalSstFileInfo file5_info;
  1780. ASSERT_OK(sst_file_writer.Finish(&file5_info));
  1781. files.push_back(std::move(file5));
  1782. }
  1783. IngestExternalFileOptions ifo;
  1784. ifo.allow_global_seqno = false;
  1785. ASSERT_NOK(db_->IngestExternalFile(files, ifo));
  1786. ifo.allow_global_seqno = true;
  1787. ASSERT_OK(db_->IngestExternalFile(files, ifo));
  1788. ASSERT_EQ(Get("a"), "a1");
  1789. ASSERT_EQ(Get("i"), "i3");
  1790. ASSERT_EQ(Get("j"), "j1");
  1791. ASSERT_EQ(Get("k"), "k1");
  1792. ASSERT_EQ(Get("m"), "m1");
  1793. int total_keys = 0;
  1794. Iterator* iter = db_->NewIterator(ReadOptions());
  1795. for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
  1796. ASSERT_OK(iter->status());
  1797. total_keys++;
  1798. }
  1799. ASSERT_OK(iter->status());
  1800. delete iter;
  1801. ASSERT_EQ(total_keys, 5);
  1802. ASSERT_EQ(1, NumTableFilesAtLevel(6));
  1803. ASSERT_EQ(2, NumTableFilesAtLevel(5));
  1804. ASSERT_EQ(2, NumTableFilesAtLevel(4));
  1805. }
  1806. class CompactionJobStatsCheckerForFilteredFiles : public EventListener {
  1807. public:
  1808. CompactionJobStatsCheckerForFilteredFiles(
  1809. int num_input_files, int num_input_files_at_output_level,
  1810. int num_filtered_input_files,
  1811. int num_filtered_input_files_at_output_level)
  1812. : num_input_files_(num_input_files),
  1813. num_input_files_at_output_level_(num_input_files_at_output_level),
  1814. num_filtered_input_files_(num_filtered_input_files),
  1815. num_filtered_input_files_at_output_level_(
  1816. num_filtered_input_files_at_output_level) {}
  1817. void OnCompactionCompleted(DB* /*db*/, const CompactionJobInfo& ci) override {
  1818. std::lock_guard<std::mutex> lock(mutex_);
  1819. ASSERT_EQ(num_input_files_, ci.stats.num_input_files);
  1820. ASSERT_EQ(num_input_files_at_output_level_,
  1821. ci.stats.num_input_files_at_output_level);
  1822. ASSERT_EQ(num_filtered_input_files_, ci.stats.num_filtered_input_files);
  1823. ASSERT_EQ(num_filtered_input_files_at_output_level_,
  1824. ci.stats.num_filtered_input_files_at_output_level);
  1825. ASSERT_EQ(ci.stats.total_skipped_input_bytes,
  1826. expected_compaction_skipped_file_size_);
  1827. }
  1828. void SetExpectedCompactionSkippedFileSize(uint64_t expected_size) {
  1829. std::lock_guard<std::mutex> lock(mutex_);
  1830. expected_compaction_skipped_file_size_ = expected_size;
  1831. }
  1832. private:
  1833. int num_input_files_ = 0;
  1834. int num_input_files_at_output_level_ = 0;
  1835. int num_filtered_input_files_ = 0;
  1836. int num_filtered_input_files_at_output_level_ = 0;
  1837. std::mutex mutex_;
  1838. uint64_t expected_compaction_skipped_file_size_ = 0;
  1839. };
  1840. TEST_F(ExternalSSTFileBasicTest, AtomicReplaceDataWithStandaloneRangeDeletion) {
  1841. Options options = CurrentOptions();
  1842. options.compaction_style = CompactionStyle::kCompactionStyleUniversal;
  1843. int kCompactionNumInputFiles = 1;
  1844. int kCompactionNumInputFilesAtOutputLevel = 0;
  1845. int kCompactionNumFilteredInputFiles = 2;
  1846. int kCompactionNumFilteredInputFilesAtOutputLevel = 2;
  1847. auto compaction_listener =
  1848. std::make_shared<CompactionJobStatsCheckerForFilteredFiles>(
  1849. kCompactionNumInputFiles, kCompactionNumInputFilesAtOutputLevel,
  1850. kCompactionNumFilteredInputFiles,
  1851. kCompactionNumFilteredInputFilesAtOutputLevel);
  1852. options.listeners.push_back(compaction_listener);
  1853. DestroyAndReopen(options);
  1854. size_t compaction_skipped_file_size = 0;
  1855. std::vector<std::string> files;
  1856. {
  1857. // Writes first version of data in range partitioned files.
  1858. SstFileWriter sst_file_writer(EnvOptions(), options);
  1859. std::string file1 = sst_files_dir_ + "file1.sst";
  1860. ASSERT_OK(sst_file_writer.Open(file1));
  1861. ASSERT_OK(sst_file_writer.Put("a", "a1"));
  1862. ASSERT_OK(sst_file_writer.Put("b", "b1"));
  1863. ExternalSstFileInfo file1_info;
  1864. ASSERT_OK(sst_file_writer.Finish(&file1_info));
  1865. compaction_skipped_file_size += file1_info.file_size;
  1866. files.push_back(std::move(file1));
  1867. std::string file2 = sst_files_dir_ + "file2.sst";
  1868. ASSERT_OK(sst_file_writer.Open(file2));
  1869. ASSERT_OK(sst_file_writer.Put("x", "x1"));
  1870. ASSERT_OK(sst_file_writer.Put("y", "y1"));
  1871. ExternalSstFileInfo file2_info;
  1872. ASSERT_OK(sst_file_writer.Finish(&file2_info));
  1873. compaction_skipped_file_size += file2_info.file_size;
  1874. files.push_back(std::move(file2));
  1875. compaction_listener->SetExpectedCompactionSkippedFileSize(
  1876. compaction_skipped_file_size);
  1877. }
  1878. IngestExternalFileOptions ifo;
  1879. ASSERT_OK(db_->IngestExternalFile(files, ifo));
  1880. ASSERT_EQ(Get("a"), "a1");
  1881. ASSERT_EQ(Get("b"), "b1");
  1882. ASSERT_EQ(Get("x"), "x1");
  1883. ASSERT_EQ(Get("y"), "y1");
  1884. ASSERT_EQ(2, NumTableFilesAtLevel(6));
  1885. {
  1886. // Atomically delete old version of data with one range delete file.
  1887. // And a new batch of range partitioned files with new version of data.
  1888. files.clear();
  1889. SstFileWriter sst_file_writer(EnvOptions(), options);
  1890. std::string file2 = sst_files_dir_ + "file2.sst";
  1891. ASSERT_OK(sst_file_writer.Open(file2));
  1892. ASSERT_OK(sst_file_writer.DeleteRange("a", "z"));
  1893. ExternalSstFileInfo file2_info;
  1894. ASSERT_OK(sst_file_writer.Finish(&file2_info));
  1895. files.push_back(std::move(file2));
  1896. std::string file3 = sst_files_dir_ + "file3.sst";
  1897. ASSERT_OK(sst_file_writer.Open(file3));
  1898. ASSERT_OK(sst_file_writer.Put("a", "a2"));
  1899. ASSERT_OK(sst_file_writer.Put("b", "b2"));
  1900. ExternalSstFileInfo file3_info;
  1901. ASSERT_OK(sst_file_writer.Finish(&file3_info));
  1902. files.push_back(std::move(file3));
  1903. std::string file4 = sst_files_dir_ + "file4.sst";
  1904. ASSERT_OK(sst_file_writer.Open(file4));
  1905. ASSERT_OK(sst_file_writer.Put("x", "x2"));
  1906. ASSERT_OK(sst_file_writer.Put("y", "y2"));
  1907. ExternalSstFileInfo file4_info;
  1908. ASSERT_OK(sst_file_writer.Finish(&file4_info));
  1909. files.push_back(std::move(file4));
  1910. }
  1911. const Snapshot* snapshot = db_->GetSnapshot();
  1912. auto seqno_before_ingestion = db_->GetLatestSequenceNumber();
  1913. ASSERT_OK(db_->IngestExternalFile(files, ifo));
  1914. // Overlapping files each occupy one new sequence number.
  1915. ASSERT_EQ(db_->GetLatestSequenceNumber(), seqno_before_ingestion + 3);
  1916. // Check old version of data, big range deletion, new version of data are
  1917. // on separate levels.
  1918. ASSERT_EQ(2, NumTableFilesAtLevel(4));
  1919. ASSERT_EQ(1, NumTableFilesAtLevel(5));
  1920. ASSERT_EQ(2, NumTableFilesAtLevel(6));
  1921. ASSERT_OK(dbfull()->TEST_WaitForCompact());
  1922. ASSERT_EQ(2, NumTableFilesAtLevel(4));
  1923. ASSERT_EQ(1, NumTableFilesAtLevel(5));
  1924. ASSERT_EQ(2, NumTableFilesAtLevel(6));
  1925. bool compaction_iter_input_checked = false;
  1926. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  1927. "VersionSet::MakeInputIterator:NewCompactionMergingIterator",
  1928. [&](void* arg) {
  1929. size_t* num_input_files = static_cast<size_t*>(arg);
  1930. EXPECT_EQ(1, *num_input_files);
  1931. compaction_iter_input_checked = true;
  1932. });
  1933. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  1934. db_->ReleaseSnapshot(snapshot);
  1935. ASSERT_OK(dbfull()->TEST_WaitForCompact());
  1936. ASSERT_EQ(2, NumTableFilesAtLevel(4));
  1937. ASSERT_EQ(0, NumTableFilesAtLevel(5));
  1938. ASSERT_EQ(0, NumTableFilesAtLevel(6));
  1939. ASSERT_TRUE(compaction_iter_input_checked);
  1940. ASSERT_EQ(Get("a"), "a2");
  1941. ASSERT_EQ(Get("b"), "b2");
  1942. ASSERT_EQ(Get("x"), "x2");
  1943. ASSERT_EQ(Get("y"), "y2");
  1944. VerifyInputFilesInternalStatsForOutputLevel(
  1945. /*output_level*/ 6,
  1946. kCompactionNumInputFiles - kCompactionNumInputFilesAtOutputLevel,
  1947. kCompactionNumInputFilesAtOutputLevel,
  1948. kCompactionNumFilteredInputFiles -
  1949. kCompactionNumFilteredInputFilesAtOutputLevel,
  1950. kCompactionNumFilteredInputFilesAtOutputLevel,
  1951. /*bytes_skipped_non_output_levels*/ 0,
  1952. /*bytes_skipped_output_level*/ compaction_skipped_file_size);
  1953. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  1954. }
  1955. TEST_F(ExternalSSTFileBasicTest,
  1956. PartiallyReplaceDataWithOneStandaloneRangeDeletion) {
  1957. Options options = CurrentOptions();
  1958. options.compaction_style = CompactionStyle::kCompactionStyleUniversal;
  1959. int kCompactionNumInputFiles = 2;
  1960. int kCompactionNumInputFilesAtOutputLevel = 1;
  1961. int kCompactionNumFilteredInputFiles = 1;
  1962. int kCompactionNumFilteredInputFilesAtOutputLevel = 1;
  1963. auto compaction_listener =
  1964. std::make_shared<CompactionJobStatsCheckerForFilteredFiles>(
  1965. kCompactionNumInputFiles, kCompactionNumInputFilesAtOutputLevel,
  1966. kCompactionNumFilteredInputFiles,
  1967. kCompactionNumFilteredInputFilesAtOutputLevel);
  1968. options.listeners.push_back(compaction_listener);
  1969. DestroyAndReopen(options);
  1970. std::vector<std::string> files;
  1971. size_t compaction_skipped_file_size = 0;
  1972. {
  1973. // Writes first version of data in range partitioned files.
  1974. SstFileWriter sst_file_writer(EnvOptions(), options);
  1975. std::string file1 = sst_files_dir_ + "file1.sst";
  1976. ASSERT_OK(sst_file_writer.Open(file1));
  1977. ASSERT_OK(sst_file_writer.Put("a", "a1"));
  1978. ASSERT_OK(sst_file_writer.Put("b", "b1"));
  1979. ExternalSstFileInfo file1_info;
  1980. ASSERT_OK(sst_file_writer.Finish(&file1_info));
  1981. compaction_skipped_file_size += file1_info.file_size;
  1982. files.push_back(std::move(file1));
  1983. compaction_listener->SetExpectedCompactionSkippedFileSize(
  1984. compaction_skipped_file_size);
  1985. std::string file2 = sst_files_dir_ + "file2.sst";
  1986. ASSERT_OK(sst_file_writer.Open(file2));
  1987. ASSERT_OK(sst_file_writer.Put("x", "x1"));
  1988. ASSERT_OK(sst_file_writer.Put("y", "y"));
  1989. ExternalSstFileInfo file2_info;
  1990. ASSERT_OK(sst_file_writer.Finish(&file2_info));
  1991. files.push_back(std::move(file2));
  1992. }
  1993. IngestExternalFileOptions ifo;
  1994. ASSERT_OK(db_->IngestExternalFile(files, ifo));
  1995. ASSERT_EQ(Get("a"), "a1");
  1996. ASSERT_EQ(Get("b"), "b1");
  1997. ASSERT_EQ(Get("x"), "x1");
  1998. ASSERT_EQ(Get("y"), "y");
  1999. ASSERT_EQ(2, NumTableFilesAtLevel(6));
  2000. {
  2001. // Partially delete old version of data with one range delete file. And
  2002. // add new version of data for deleted range.
  2003. files.clear();
  2004. SstFileWriter sst_file_writer(EnvOptions(), options);
  2005. std::string file2 = sst_files_dir_ + "file2.sst";
  2006. ASSERT_OK(sst_file_writer.Open(file2));
  2007. ASSERT_OK(sst_file_writer.DeleteRange("a", "y"));
  2008. ExternalSstFileInfo file2_info;
  2009. ASSERT_OK(sst_file_writer.Finish(&file2_info));
  2010. files.push_back(std::move(file2));
  2011. std::string file3 = sst_files_dir_ + "file3.sst";
  2012. ASSERT_OK(sst_file_writer.Open(file3));
  2013. ASSERT_OK(sst_file_writer.Put("a", "a2"));
  2014. ASSERT_OK(sst_file_writer.Put("b", "b2"));
  2015. ExternalSstFileInfo file3_info;
  2016. ASSERT_OK(sst_file_writer.Finish(&file3_info));
  2017. files.push_back(std::move(file3));
  2018. std::string file4 = sst_files_dir_ + "file4.sst";
  2019. ASSERT_OK(sst_file_writer.Open(file4));
  2020. ASSERT_OK(sst_file_writer.Put("h", "h1"));
  2021. ASSERT_OK(sst_file_writer.Put("x", "x2"));
  2022. ExternalSstFileInfo file4_info;
  2023. ASSERT_OK(sst_file_writer.Finish(&file4_info));
  2024. files.push_back(std::move(file4));
  2025. }
  2026. bool compaction_iter_input_checked = false;
  2027. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  2028. "VersionSet::MakeInputIterator:NewCompactionMergingIterator",
  2029. [&](void* arg) {
  2030. size_t* num_input_files = static_cast<size_t*>(arg);
  2031. EXPECT_EQ(2, *num_input_files);
  2032. compaction_iter_input_checked = true;
  2033. });
  2034. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  2035. ASSERT_OK(db_->IngestExternalFile(files, ifo));
  2036. ASSERT_OK(dbfull()->TEST_WaitForCompact());
  2037. ASSERT_EQ(2, NumTableFilesAtLevel(4));
  2038. ASSERT_EQ(0, NumTableFilesAtLevel(5));
  2039. ASSERT_EQ(1, NumTableFilesAtLevel(6));
  2040. ASSERT_TRUE(compaction_iter_input_checked);
  2041. ASSERT_EQ(Get("a"), "a2");
  2042. ASSERT_EQ(Get("b"), "b2");
  2043. ASSERT_EQ(Get("h"), "h1");
  2044. ASSERT_EQ(Get("x"), "x2");
  2045. ASSERT_EQ(Get("y"), "y");
  2046. VerifyInputFilesInternalStatsForOutputLevel(
  2047. /*output_level*/ 6,
  2048. kCompactionNumInputFiles - kCompactionNumInputFilesAtOutputLevel,
  2049. kCompactionNumInputFilesAtOutputLevel,
  2050. kCompactionNumFilteredInputFiles -
  2051. kCompactionNumFilteredInputFilesAtOutputLevel,
  2052. kCompactionNumFilteredInputFilesAtOutputLevel,
  2053. /*bytes_skipped_non_output_levels*/ 0,
  2054. /*bytes_skipped_output_level*/ compaction_skipped_file_size);
  2055. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  2056. }
  2057. TEST_F(ExternalSSTFileBasicTest,
  2058. PartiallyReplaceDataWithMultipleStandaloneRangeDeletions) {
  2059. Options options = CurrentOptions();
  2060. options.compaction_style = CompactionStyle::kCompactionStyleUniversal;
  2061. int kCompactionNumInputFiles = 2;
  2062. int kCompactionNumInputFilesAtOutputLevel = 0;
  2063. int kCompactionNumFilteredInputFiles = 2;
  2064. int kCompactionNumFilteredInputFilesAtOutputLevel = 2;
  2065. // Two compactions each included on standalone range deletion file that
  2066. // filters input file on the non start level.
  2067. auto compaction_listener =
  2068. std::make_shared<CompactionJobStatsCheckerForFilteredFiles>(
  2069. kCompactionNumInputFiles / 2,
  2070. kCompactionNumInputFilesAtOutputLevel / 2,
  2071. kCompactionNumFilteredInputFiles / 2,
  2072. kCompactionNumFilteredInputFilesAtOutputLevel / 2);
  2073. options.listeners.push_back(compaction_listener);
  2074. DestroyAndReopen(options);
  2075. std::vector<std::string> files;
  2076. ExternalSstFileInfo file1_info;
  2077. ExternalSstFileInfo file3_info;
  2078. {
  2079. SstFileWriter sst_file_writer(EnvOptions(), options);
  2080. std::string file1 = sst_files_dir_ + "file1.sst";
  2081. ASSERT_OK(sst_file_writer.Open(file1));
  2082. ASSERT_OK(sst_file_writer.Put("a", "a1"));
  2083. ASSERT_OK(sst_file_writer.Finish(&file1_info));
  2084. files.push_back(std::move(file1));
  2085. std::string file2 = sst_files_dir_ + "file2.sst";
  2086. ASSERT_OK(sst_file_writer.Open(file2));
  2087. ASSERT_OK(sst_file_writer.Put("h", "h"));
  2088. ExternalSstFileInfo file2_info;
  2089. ASSERT_OK(sst_file_writer.Finish(&file2_info));
  2090. files.push_back(std::move(file2));
  2091. std::string file3 = sst_files_dir_ + "file3.sst";
  2092. ASSERT_OK(sst_file_writer.Open(file3));
  2093. ASSERT_OK(sst_file_writer.Put("x", "x1"));
  2094. ASSERT_OK(sst_file_writer.Finish(&file3_info));
  2095. files.push_back(std::move(file3));
  2096. }
  2097. IngestExternalFileOptions ifo;
  2098. ASSERT_OK(db_->IngestExternalFile(files, ifo));
  2099. ASSERT_EQ(Get("a"), "a1");
  2100. ASSERT_EQ(Get("h"), "h");
  2101. ASSERT_EQ(Get("x"), "x1");
  2102. ASSERT_EQ(3, NumTableFilesAtLevel(6));
  2103. {
  2104. files.clear();
  2105. SstFileWriter sst_file_writer(EnvOptions(), options);
  2106. std::string file4 = sst_files_dir_ + "file4.sst";
  2107. ASSERT_OK(sst_file_writer.Open(file4));
  2108. ASSERT_OK(sst_file_writer.DeleteRange("a", "b"));
  2109. ExternalSstFileInfo file4_info;
  2110. ASSERT_OK(sst_file_writer.Finish(&file4_info));
  2111. files.push_back(std::move(file4));
  2112. std::string file5 = sst_files_dir_ + "file5.sst";
  2113. ASSERT_OK(sst_file_writer.Open(file5));
  2114. ASSERT_OK(sst_file_writer.DeleteRange("x", "y"));
  2115. ExternalSstFileInfo file5_info;
  2116. ASSERT_OK(sst_file_writer.Finish(&file5_info));
  2117. files.push_back(std::move(file5));
  2118. std::string file6 = sst_files_dir_ + "file6.sst";
  2119. ASSERT_OK(sst_file_writer.Open(file6));
  2120. ASSERT_OK(sst_file_writer.Put("a", "a2"));
  2121. ExternalSstFileInfo file6_info;
  2122. ASSERT_OK(sst_file_writer.Finish(&file6_info));
  2123. files.push_back(std::move(file6));
  2124. std::string file7 = sst_files_dir_ + "file7.sst";
  2125. ASSERT_OK(sst_file_writer.Open(file7));
  2126. ASSERT_OK(sst_file_writer.Put("x", "x2"));
  2127. ExternalSstFileInfo file7_info;
  2128. ASSERT_OK(sst_file_writer.Finish(&file7_info));
  2129. files.push_back(std::move(file7));
  2130. }
  2131. int num_compactions = 0;
  2132. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  2133. "VersionSet::MakeInputIterator:NewCompactionMergingIterator",
  2134. [&](void* arg) {
  2135. size_t* num_input_files = static_cast<size_t*>(arg);
  2136. EXPECT_EQ(1, *num_input_files);
  2137. num_compactions += 1;
  2138. if (num_compactions == 2) {
  2139. compaction_listener->SetExpectedCompactionSkippedFileSize(
  2140. file3_info.file_size);
  2141. }
  2142. });
  2143. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  2144. compaction_listener->SetExpectedCompactionSkippedFileSize(
  2145. file1_info.file_size);
  2146. ASSERT_OK(db_->IngestExternalFile(files, ifo));
  2147. ASSERT_OK(dbfull()->TEST_WaitForCompact());
  2148. ASSERT_EQ(2, NumTableFilesAtLevel(4));
  2149. ASSERT_EQ(0, NumTableFilesAtLevel(5));
  2150. ASSERT_EQ(1, NumTableFilesAtLevel(6));
  2151. ASSERT_EQ(2, num_compactions);
  2152. ASSERT_EQ(Get("a"), "a2");
  2153. ASSERT_EQ(Get("h"), "h");
  2154. ASSERT_EQ(Get("x"), "x2");
  2155. VerifyInputFilesInternalStatsForOutputLevel(
  2156. /*output_level*/ 6,
  2157. kCompactionNumInputFiles - kCompactionNumInputFilesAtOutputLevel,
  2158. kCompactionNumInputFilesAtOutputLevel,
  2159. kCompactionNumFilteredInputFiles -
  2160. kCompactionNumFilteredInputFilesAtOutputLevel,
  2161. kCompactionNumFilteredInputFilesAtOutputLevel,
  2162. /*bytes_skipped_non_output_levels*/ 0,
  2163. /*bytes_skipped_output_level*/ file1_info.file_size +
  2164. file3_info.file_size);
  2165. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  2166. }
  2167. TEST_F(ExternalSSTFileBasicTest, StandaloneRangeDeletionEndKeyIsExclusive) {
  2168. Options options = CurrentOptions();
  2169. options.compaction_style = CompactionStyle::kCompactionStyleUniversal;
  2170. int kCompactionNumInputFiles = 2;
  2171. int kCompactionNumInputFilesAtOutputLevel = 1;
  2172. int kCompactionNumFilteredInputFiles = 0;
  2173. int kCompactionNumFilteredInputFilesAtOutputLevel = 0;
  2174. auto compaction_listener =
  2175. std::make_shared<CompactionJobStatsCheckerForFilteredFiles>(
  2176. kCompactionNumInputFiles, kCompactionNumInputFilesAtOutputLevel,
  2177. kCompactionNumFilteredInputFiles,
  2178. kCompactionNumFilteredInputFilesAtOutputLevel);
  2179. options.listeners.push_back(compaction_listener);
  2180. // No compaction input files are filtered because the range deletion file's
  2181. // end is exclusive, so it cannot cover the whole file.
  2182. compaction_listener->SetExpectedCompactionSkippedFileSize(0);
  2183. DestroyAndReopen(options);
  2184. std::vector<std::string> files;
  2185. {
  2186. SstFileWriter sst_file_writer(EnvOptions(), options);
  2187. std::string file1 = sst_files_dir_ + "file1.sst";
  2188. ASSERT_OK(sst_file_writer.Open(file1));
  2189. ASSERT_OK(sst_file_writer.Put("a", "a"));
  2190. ASSERT_OK(sst_file_writer.Put("b", "b"));
  2191. ExternalSstFileInfo file1_info;
  2192. ASSERT_OK(sst_file_writer.Finish(&file1_info));
  2193. files.push_back(std::move(file1));
  2194. }
  2195. IngestExternalFileOptions ifo;
  2196. ASSERT_OK(db_->IngestExternalFile(files, ifo));
  2197. ASSERT_EQ(Get("a"), "a");
  2198. ASSERT_EQ(Get("b"), "b");
  2199. ASSERT_EQ(1, NumTableFilesAtLevel(6));
  2200. {
  2201. // A standalone range deletion with its exclusive end matching the range end
  2202. // of file doesn't fully delete it.
  2203. files.clear();
  2204. SstFileWriter sst_file_writer(EnvOptions(), options);
  2205. std::string file2 = sst_files_dir_ + "file2.sst";
  2206. ASSERT_OK(sst_file_writer.Open(file2));
  2207. ASSERT_OK(sst_file_writer.DeleteRange("a", "b"));
  2208. ExternalSstFileInfo file2_info;
  2209. ASSERT_OK(sst_file_writer.Finish(&file2_info));
  2210. files.push_back(std::move(file2));
  2211. }
  2212. bool compaction_iter_input_checked = false;
  2213. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  2214. "VersionSet::MakeInputIterator:NewCompactionMergingIterator",
  2215. [&](void* arg) {
  2216. size_t* num_input_files = static_cast<size_t*>(arg);
  2217. // Standalone range deletion file for ["a", "b") + file with ["a", "b"].
  2218. EXPECT_EQ(2, *num_input_files);
  2219. compaction_iter_input_checked = true;
  2220. });
  2221. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  2222. ASSERT_OK(db_->IngestExternalFile(files, ifo));
  2223. ASSERT_OK(dbfull()->TEST_WaitForCompact());
  2224. ASSERT_EQ(0, NumTableFilesAtLevel(4));
  2225. ASSERT_EQ(0, NumTableFilesAtLevel(5));
  2226. ASSERT_EQ(1, NumTableFilesAtLevel(6));
  2227. ASSERT_TRUE(compaction_iter_input_checked);
  2228. ASSERT_EQ(Get("a"), "NOT_FOUND");
  2229. ASSERT_EQ(Get("b"), "b");
  2230. VerifyInputFilesInternalStatsForOutputLevel(
  2231. /*output_level*/ 6,
  2232. kCompactionNumInputFiles - kCompactionNumInputFilesAtOutputLevel,
  2233. kCompactionNumInputFilesAtOutputLevel,
  2234. kCompactionNumFilteredInputFiles -
  2235. kCompactionNumFilteredInputFilesAtOutputLevel,
  2236. kCompactionNumFilteredInputFilesAtOutputLevel,
  2237. /*bytes_skipped_non_output_levels*/ 0,
  2238. /*bytes_skipped_output_level*/ 0);
  2239. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  2240. }
  2241. TEST_F(ExternalSSTFileBasicTest, IngestFileAfterDBPut) {
  2242. // Repro https://github.com/facebook/rocksdb/issues/6245.
  2243. // Flush three files to L0. Ingest one more file to trigger L0->L1 compaction
  2244. // via trivial move. The bug happened when L1 files were incorrectly sorted
  2245. // resulting in an old value for "k" returned by `Get()`.
  2246. Options options = CurrentOptions();
  2247. ASSERT_OK(Put("k", "a"));
  2248. ASSERT_OK(Flush());
  2249. ASSERT_OK(Put("k", "a"));
  2250. ASSERT_OK(Flush());
  2251. ASSERT_OK(Put("k", "a"));
  2252. ASSERT_OK(Flush());
  2253. SstFileWriter sst_file_writer(EnvOptions(), options);
  2254. // Current file size should be 0 after sst_file_writer init and before open a
  2255. // file.
  2256. ASSERT_EQ(sst_file_writer.FileSize(), 0);
  2257. std::string file1 = sst_files_dir_ + "file1.sst";
  2258. ASSERT_OK(sst_file_writer.Open(file1));
  2259. ASSERT_OK(sst_file_writer.Put("k", "b"));
  2260. ExternalSstFileInfo file1_info;
  2261. Status s = sst_file_writer.Finish(&file1_info);
  2262. ASSERT_OK(s) << s.ToString();
  2263. // Current file size should be non-zero after success write.
  2264. ASSERT_GT(sst_file_writer.FileSize(), 0);
  2265. IngestExternalFileOptions ifo;
  2266. s = db_->IngestExternalFile({file1}, ifo);
  2267. ASSERT_OK(s);
  2268. ASSERT_OK(dbfull()->TEST_WaitForCompact());
  2269. ASSERT_EQ(Get("k"), "b");
  2270. }
  2271. TEST_F(ExternalSSTFileBasicTest, IngestWithTemperature) {
  2272. // Rather than doubling the running time of this test, this boolean
  2273. // field gets a random starting value and then alternates between
  2274. // true and false.
  2275. bool alternate_hint = Random::GetTLSInstance()->OneIn(2);
  2276. Destroy(CurrentOptions());
  2277. for (std::string mode : {"ingest_behind", "fail_if_not", "neither"}) {
  2278. SCOPED_TRACE("Mode: " + mode);
  2279. Options options = CurrentOptions();
  2280. auto test_fs =
  2281. std::make_shared<FileTemperatureTestFS>(options.env->GetFileSystem());
  2282. std::unique_ptr<Env> env(new CompositeEnvWrapper(options.env, test_fs));
  2283. options.env = env.get();
  2284. const ImmutableCFOptions ioptions(options);
  2285. options.last_level_temperature = Temperature::kCold;
  2286. options.default_write_temperature = Temperature::kHot;
  2287. SstFileWriter sst_file_writer(EnvOptions(), options);
  2288. options.level0_file_num_compaction_trigger = 2;
  2289. bool cf_option = Random::GetTLSInstance()->OneIn(2);
  2290. SCOPED_TRACE(std::string("Use ") + (cf_option ? "CF" : "DB") +
  2291. " option for ingest behind");
  2292. if (cf_option) {
  2293. options.cf_allow_ingest_behind = (mode == "ingest_behind");
  2294. } else {
  2295. options.allow_ingest_behind = (mode == "ingest_behind");
  2296. }
  2297. Reopen(options);
  2298. Defer destroyer([&]() { Destroy(options); });
  2299. #define VERIFY_SST_COUNT(temp, expected_count_in_db, \
  2300. expected_count_outside_db) \
  2301. { \
  2302. /* Partially verify against FileSystem */ \
  2303. ASSERT_EQ( \
  2304. test_fs->CountCurrentSstFilesWithTemperature(temp), \
  2305. size_t{expected_count_in_db} + size_t{expected_count_outside_db}); \
  2306. /* Partially verify against DB manifest */ \
  2307. if (expected_count_in_db == 0) { \
  2308. ASSERT_EQ(GetSstSizeHelper(temp), 0); \
  2309. } else { \
  2310. ASSERT_GE(GetSstSizeHelper(temp), 1); \
  2311. } \
  2312. }
  2313. size_t ex_unknown_in_db = 0;
  2314. size_t ex_hot_in_db = 0;
  2315. size_t ex_warm_in_db = 0;
  2316. size_t ex_cold_in_db = 0;
  2317. size_t ex_unknown_outside_db = 0;
  2318. size_t ex_hot_outside_db = 0;
  2319. size_t ex_warm_outside_db = 0;
  2320. size_t ex_cold_outside_db = 0;
  2321. #define VERIFY_SST_COUNTS() \
  2322. { \
  2323. VERIFY_SST_COUNT(Temperature::kUnknown, ex_unknown_in_db, \
  2324. ex_unknown_outside_db); \
  2325. VERIFY_SST_COUNT(Temperature::kHot, ex_hot_in_db, ex_hot_outside_db); \
  2326. VERIFY_SST_COUNT(Temperature::kWarm, ex_warm_in_db, ex_warm_outside_db); \
  2327. VERIFY_SST_COUNT(Temperature::kCold, ex_cold_in_db, ex_cold_outside_db); \
  2328. }
  2329. // Create sst file, using a name recognized by FileTemperatureTestFS and
  2330. // specified temperature
  2331. std::string file1 = sst_files_dir_ + "9000000.sst";
  2332. ASSERT_OK(sst_file_writer.Open(file1, Temperature::kWarm));
  2333. for (int k = 1000; k < 1100; k++) {
  2334. ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val"));
  2335. }
  2336. ExternalSstFileInfo file1_info;
  2337. Status s = sst_file_writer.Finish(&file1_info);
  2338. ASSERT_OK(s);
  2339. ex_warm_outside_db++;
  2340. VERIFY_SST_COUNTS();
  2341. ASSERT_EQ(file1_info.file_path, file1);
  2342. ASSERT_EQ(file1_info.num_entries, 100);
  2343. ASSERT_EQ(file1_info.smallest_key, Key(1000));
  2344. ASSERT_EQ(file1_info.largest_key, Key(1099));
  2345. std::vector<std::string> files;
  2346. std::vector<std::string> files_checksums;
  2347. std::vector<std::string> files_checksum_func_names;
  2348. files.push_back(file1);
  2349. IngestExternalFileOptions in_opts;
  2350. in_opts.move_files = false;
  2351. in_opts.snapshot_consistency = true;
  2352. in_opts.allow_global_seqno = false;
  2353. in_opts.allow_blocking_flush = false;
  2354. in_opts.write_global_seqno = true;
  2355. in_opts.verify_file_checksum = false;
  2356. in_opts.ingest_behind = (mode == "ingest_behind");
  2357. in_opts.fail_if_not_bottommost_level = (mode == "fail_if_not");
  2358. IngestExternalFileArg arg;
  2359. arg.column_family = db_->DefaultColumnFamily();
  2360. arg.external_files = files;
  2361. arg.options = in_opts;
  2362. arg.files_checksums = files_checksums;
  2363. arg.files_checksum_func_names = files_checksum_func_names;
  2364. alternate_hint = !alternate_hint;
  2365. if (alternate_hint) {
  2366. // Provide correct hint (for optimal file open performance)
  2367. arg.file_temperature = Temperature::kWarm;
  2368. } else {
  2369. // No hint (also works because ingestion will read the temperature
  2370. // according to storage)
  2371. arg.file_temperature = Temperature::kUnknown;
  2372. }
  2373. s = db_->IngestExternalFiles({arg});
  2374. ASSERT_OK(s);
  2375. // check the temperature of the file ingested (copied)
  2376. ColumnFamilyMetaData metadata;
  2377. db_->GetColumnFamilyMetaData(&metadata);
  2378. ASSERT_EQ(1, metadata.file_count);
  2379. if (mode != "neither") {
  2380. ASSERT_EQ(Temperature::kCold, metadata.levels[6].files[0].temperature);
  2381. ex_cold_in_db++;
  2382. } else {
  2383. // Currently, we are only able to use last_level_temperature for ingestion
  2384. // when using an ingestion option that guarantees ingestion to last level.
  2385. ASSERT_EQ(Temperature::kHot, metadata.levels[6].files[0].temperature);
  2386. ex_hot_in_db++;
  2387. }
  2388. VERIFY_SST_COUNTS();
  2389. // non-bottommost file still has kHot temperature
  2390. ASSERT_OK(Put("foo", "bar"));
  2391. ASSERT_OK(Put("bar", "bar"));
  2392. ASSERT_OK(Flush());
  2393. db_->GetColumnFamilyMetaData(&metadata);
  2394. ASSERT_EQ(2, metadata.file_count);
  2395. ASSERT_EQ(Temperature::kHot, metadata.levels[0].files[0].temperature);
  2396. ex_hot_in_db++;
  2397. VERIFY_SST_COUNTS();
  2398. // reopen and check the information is persisted
  2399. Reopen(options);
  2400. db_->GetColumnFamilyMetaData(&metadata);
  2401. ASSERT_EQ(2, metadata.file_count);
  2402. ASSERT_EQ(Temperature::kHot, metadata.levels[0].files[0].temperature);
  2403. if (mode != "neither") {
  2404. ASSERT_EQ(Temperature::kCold, metadata.levels[6].files[0].temperature);
  2405. } else {
  2406. ASSERT_EQ(Temperature::kHot, metadata.levels[6].files[0].temperature);
  2407. }
  2408. // (no change)
  2409. VERIFY_SST_COUNTS();
  2410. // check invalid temperature with DB property. Not sure why the original
  2411. // author is testing this case, but perhaps so that downgrading DB with
  2412. // new GetProperty code using a new Temperature will report something
  2413. // reasonable and not an error.
  2414. std::string prop;
  2415. ASSERT_TRUE(dbfull()->GetProperty(
  2416. DB::Properties::kLiveSstFilesSizeAtTemperature + std::to_string(22),
  2417. &prop));
  2418. ASSERT_EQ(std::atoi(prop.c_str()), 0);
  2419. #undef VERIFY_SST_COUNT
  2420. }
  2421. }
  2422. // This tests an internal user's exact usage and expectation of the
  2423. // IngestExternalFiles APIs to bulk load and replace files.
  2424. TEST_F(ExternalSSTFileBasicTest,
  2425. AtomicReplaceColumnFamilyWithIngestedVersionKey) {
  2426. Options options = GetDefaultOptions();
  2427. options.create_if_missing = true;
  2428. options.compaction_style = CompactionStyle::kCompactionStyleUniversal;
  2429. options.num_levels = 7;
  2430. options.disallow_memtable_writes = false;
  2431. DestroyAndReopen(options);
  2432. SstFileWriter sst_file_writer(EnvOptions(), options);
  2433. std::string data_file_original = sst_files_dir_ + "data_original";
  2434. ASSERT_OK(sst_file_writer.Open(data_file_original));
  2435. ASSERT_OK(sst_file_writer.Put("ukey1", "uval1_orig"));
  2436. ASSERT_OK(sst_file_writer.Put("ukey2", "uval2_orig"));
  2437. ASSERT_OK(sst_file_writer.Finish());
  2438. ASSERT_OK(db_->IngestExternalFile(db_->DefaultColumnFamily(),
  2439. {data_file_original},
  2440. IngestExternalFileOptions()));
  2441. ASSERT_OK(Put("data_version", "v_original"));
  2442. ASSERT_OK(Flush());
  2443. std::string value;
  2444. ASSERT_OK(db_->Get(ReadOptions(), "data_version", &value));
  2445. ASSERT_EQ(value, "v_original");
  2446. ASSERT_OK(db_->Get(ReadOptions(), "ukey1", &value));
  2447. ASSERT_EQ(value, "uval1_orig");
  2448. ASSERT_OK(db_->Get(ReadOptions(), "ukey2", &value));
  2449. ASSERT_EQ(value, "uval2_orig");
  2450. // Set up a 1) data version key file on L0, and 2) a user data file on L6
  2451. // to test the initial transitioning to use `atomic_replace_range`.
  2452. ASSERT_EQ("1,0,0,0,0,0,1", FilesPerLevel());
  2453. // Test multiple cycles of replacing by atomically ingest a data file and a
  2454. // version key file while replace the whole range in the column family.
  2455. for (int i = 0; i < 10; i++) {
  2456. std::string version_file_path =
  2457. sst_files_dir_ + "version" + std::to_string(i);
  2458. ASSERT_OK(sst_file_writer.Open(version_file_path));
  2459. ASSERT_OK(sst_file_writer.Put("data_version", "v" + std::to_string(i)));
  2460. ASSERT_OK(sst_file_writer.Finish());
  2461. std::string file_path = sst_files_dir_ + std::to_string(i);
  2462. ASSERT_OK(sst_file_writer.Open(file_path));
  2463. ASSERT_OK(sst_file_writer.Put("ukey1", "uval1" + std::to_string(i)));
  2464. ASSERT_OK(sst_file_writer.Put("ukey2", "uval2" + std::to_string(i)));
  2465. ASSERT_OK(sst_file_writer.Finish());
  2466. IngestExternalFileArg arg;
  2467. arg.column_family = db_->DefaultColumnFamily();
  2468. arg.external_files = {version_file_path, file_path};
  2469. arg.atomic_replace_range = {{nullptr, nullptr}};
  2470. // Test both fail_if_not_bottomost_level: true and false
  2471. arg.options.fail_if_not_bottommost_level = i % 2 == 0;
  2472. arg.options.snapshot_consistency = false;
  2473. // Ingest 1) a new data version file and 2) a new user data file while erase
  2474. // the whole column family
  2475. Status s = db_->IngestExternalFiles({arg});
  2476. ASSERT_OK(s);
  2477. // Check ingestion result and the expected LSM shape:
  2478. // Two files on L6, 1) a data version file 2) a user data file.
  2479. ASSERT_OK(db_->Get(ReadOptions(), "ukey1", &value));
  2480. ASSERT_EQ(value, "uval1" + std::to_string(i));
  2481. ASSERT_OK(db_->Get(ReadOptions(), "ukey2", &value));
  2482. ASSERT_EQ(value, "uval2" + std::to_string(i));
  2483. ASSERT_OK(db_->Get(ReadOptions(), "data_version", &value));
  2484. ASSERT_EQ(value, "v" + std::to_string(i));
  2485. ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel());
  2486. }
  2487. Close();
  2488. }
  2489. TEST_F(ExternalSSTFileBasicTest, FailIfNotBottommostLevelAndDisallowMemtable) {
  2490. for (bool disallow_memtable : {false, true}) {
  2491. Options options = GetDefaultOptions();
  2492. // First test with universal compaction
  2493. options.create_if_missing = true;
  2494. options.compaction_style = CompactionStyle::kCompactionStyleUniversal;
  2495. DestroyAndReopen(options);
  2496. // And a CF potentially disallowing memtable write
  2497. options.disallow_memtable_writes = disallow_memtable;
  2498. CreateColumnFamilies({"cf0"}, options);
  2499. ASSERT_EQ(db_->GetOptions(handles_[0]).disallow_memtable_writes,
  2500. disallow_memtable);
  2501. // Ingest with snapshot consistency
  2502. std::string file_path = sst_files_dir_ + std::to_string(1);
  2503. std::string file_path2 = sst_files_dir_ + std::to_string(2);
  2504. SstFileWriter sfw(EnvOptions(), options);
  2505. ASSERT_OK(sfw.Open(file_path));
  2506. ASSERT_OK(sfw.Put("b", "0"));
  2507. ASSERT_OK(sfw.Finish());
  2508. {
  2509. const Snapshot* snapshot = db_->GetSnapshot();
  2510. ManagedSnapshot snapshot_guard(db_, snapshot);
  2511. IngestExternalFileOptions ifo;
  2512. ifo.fail_if_not_bottommost_level = true;
  2513. ifo.snapshot_consistency = true;
  2514. ASSERT_OK(db_->IngestExternalFile(handles_[0], {file_path}, ifo));
  2515. }
  2516. ASSERT_EQ(Get(0, "b"), "0");
  2517. // Test level compaction
  2518. options.compaction_style = CompactionStyle::kCompactionStyleLevel;
  2519. options.num_levels = 2;
  2520. CreateColumnFamilies({"cf1"}, options);
  2521. ASSERT_EQ(db_->GetOptions(handles_[1]).disallow_memtable_writes,
  2522. disallow_memtable);
  2523. if (!disallow_memtable) {
  2524. ASSERT_OK(Put(1, "a", "1"));
  2525. ASSERT_OK(Put(1, "c", "3"));
  2526. ASSERT_OK(Flush(1));
  2527. ASSERT_OK(Put(1, "b", "2"));
  2528. ASSERT_OK(Put(1, "d", "4"));
  2529. ASSERT_OK(Flush(1));
  2530. } else {
  2531. // Memtable write disallowed
  2532. EXPECT_EQ(Put(1, "a", "1").code(), Status::Code::kInvalidArgument);
  2533. // Use ingestion to get to the same state as above
  2534. ASSERT_OK(sfw.Open(file_path2));
  2535. ASSERT_OK(sfw.Put("a", "1"));
  2536. ASSERT_OK(sfw.Put("c", "3"));
  2537. ASSERT_OK(sfw.Finish());
  2538. ASSERT_OK(db_->IngestExternalFile(handles_[1], {file_path2}, {}));
  2539. ASSERT_OK(sfw.Open(file_path2));
  2540. ASSERT_OK(sfw.Put("b", "2"));
  2541. ASSERT_OK(sfw.Put("d", "4"));
  2542. ASSERT_OK(sfw.Finish());
  2543. ASSERT_OK(db_->IngestExternalFile(handles_[1], {file_path2}, {}));
  2544. }
  2545. ASSERT_EQ(Get(1, "a"), "1");
  2546. ASSERT_EQ(Get(1, "b"), "2");
  2547. ASSERT_EQ(Get(1, "c"), "3");
  2548. ASSERT_EQ(Get(1, "d"), "4");
  2549. {
  2550. // Test fail_if_not_bottommost_level, which fails if there's any overlap
  2551. // anywhere, even with snapshot_consistency=false
  2552. IngestExternalFileOptions ifo;
  2553. ASSERT_FALSE(ifo.fail_if_not_bottommost_level);
  2554. ifo.fail_if_not_bottommost_level = true;
  2555. ifo.snapshot_consistency = false;
  2556. // Fails with overlap on earlier level
  2557. Status s = db_->IngestExternalFile(handles_[1], {file_path}, ifo);
  2558. ASSERT_EQ(s.code(), Status::Code::kTryAgain);
  2559. CompactRangeOptions cro;
  2560. cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
  2561. ASSERT_OK(db_->CompactRange(cro, handles_[1], nullptr, nullptr));
  2562. // Fails with overlap on last level
  2563. s = db_->IngestExternalFile(handles_[1], {file_path}, ifo);
  2564. ASSERT_EQ(s.code(), Status::Code::kTryAgain);
  2565. // No change to data
  2566. ASSERT_EQ(Get(1, "a"), "1");
  2567. ASSERT_EQ(Get(1, "b"), "2");
  2568. ASSERT_EQ(Get(1, "c"), "3");
  2569. ASSERT_EQ(Get(1, "d"), "4");
  2570. }
  2571. if (!disallow_memtable) {
  2572. // Test allow_blocking_flush=false (fail because of memtable overlap)
  2573. IngestExternalFileOptions ifo;
  2574. ASSERT_TRUE(ifo.allow_blocking_flush);
  2575. ifo.allow_blocking_flush = false;
  2576. ASSERT_OK(Put(1, "b", "42"));
  2577. Status s = db_->IngestExternalFile(handles_[1], {file_path}, ifo);
  2578. ASSERT_EQ(s.code(), Status::Code::kInvalidArgument);
  2579. ASSERT_EQ(Get(1, "a"), "1");
  2580. ASSERT_EQ(Get(1, "b"), "42");
  2581. ASSERT_EQ(Get(1, "c"), "3");
  2582. ASSERT_EQ(Get(1, "d"), "4");
  2583. // Revert state
  2584. ASSERT_OK(Put(1, "b", "2"));
  2585. ASSERT_OK(Flush(1));
  2586. }
  2587. {
  2588. // Test atomic_replace_range
  2589. IngestExternalFileArg arg;
  2590. arg.column_family = handles_[1];
  2591. arg.external_files = {file_path};
  2592. arg.atomic_replace_range = {{"a", "zzz"}};
  2593. // start with some failure cases
  2594. // TODO: support snapshot consistency with tombstone file
  2595. ASSERT_TRUE(arg.options.snapshot_consistency);
  2596. Status s = db_->IngestExternalFiles({arg});
  2597. ASSERT_EQ(s.code(), Status::Code::kNotSupported);
  2598. ASSERT_EQ(Get(1, "a"), "1");
  2599. ASSERT_EQ(Get(1, "b"), "2");
  2600. ASSERT_EQ(Get(1, "c"), "3");
  2601. ASSERT_EQ(Get(1, "d"), "4");
  2602. arg.options.snapshot_consistency = false;
  2603. // Can usually be used with atomic_replace_range and
  2604. // snapshot_consistency=false, except it requires no input overlap
  2605. arg.options.fail_if_not_bottommost_level = true;
  2606. // one-sided ranges not yet supported
  2607. arg.atomic_replace_range = {{{}, "zzz"}};
  2608. s = db_->IngestExternalFiles({arg});
  2609. ASSERT_EQ(s.code(), Status::Code::kNotSupported);
  2610. arg.atomic_replace_range = {{"a", {}}};
  2611. s = db_->IngestExternalFiles({arg});
  2612. ASSERT_EQ(s.code(), Status::Code::kNotSupported);
  2613. // rejected because doesn't cover ingested file
  2614. arg.atomic_replace_range = {{"x", "z"}};
  2615. s = db_->IngestExternalFiles({arg});
  2616. ASSERT_EQ(s.code(), Status::Code::kInvalidArgument);
  2617. // rejected because of partial file overlap
  2618. arg.atomic_replace_range = {{"a", "c"}};
  2619. s = db_->IngestExternalFiles({arg});
  2620. ASSERT_EQ(s.code(), Status::Code::kInvalidArgument);
  2621. if (!disallow_memtable) {
  2622. // memtable overlap with replace range
  2623. ASSERT_OK(Put(1, "e", "5"));
  2624. arg.options.allow_blocking_flush = false;
  2625. // rejected because of memtable overlap
  2626. arg.atomic_replace_range = {{"a", "z"}};
  2627. s = db_->IngestExternalFiles({arg});
  2628. ASSERT_EQ(s.code(), Status::Code::kInvalidArgument);
  2629. // rejected because of memtable overlap
  2630. arg.atomic_replace_range = {{nullptr, nullptr}};
  2631. s = db_->IngestExternalFiles({arg});
  2632. ASSERT_EQ(s.code(), Status::Code::kInvalidArgument);
  2633. // FIXME: upper bound should be exclusive (DeleteRange semantics).
  2634. // currently rejected because of documented bug
  2635. arg.atomic_replace_range = {{"a", "e"}};
  2636. s = db_->IngestExternalFiles({arg});
  2637. ASSERT_EQ(s.code(), Status::Code::kInvalidArgument);
  2638. // work-around ensuring no memtable overlap
  2639. arg.atomic_replace_range = {{"a", "d2"}};
  2640. ASSERT_OK(db_->IngestExternalFiles({arg}));
  2641. ASSERT_EQ(Get(1, "e"), "5");
  2642. } else {
  2643. // rejected because of partial file overlap
  2644. arg.atomic_replace_range = {{"b", "z"}};
  2645. s = db_->IngestExternalFiles({arg});
  2646. ASSERT_EQ(s.code(), Status::Code::kInvalidArgument);
  2647. // no memtable complications
  2648. arg.atomic_replace_range = {{"a", "z"}};
  2649. ASSERT_OK(db_->IngestExternalFiles({arg}));
  2650. ASSERT_EQ(Get(1, "e"), "NOT_FOUND");
  2651. }
  2652. ASSERT_EQ(Get(1, "a"), "NOT_FOUND");
  2653. ASSERT_EQ(Get(1, "b"), "0");
  2654. ASSERT_EQ(Get(1, "c"), "NOT_FOUND");
  2655. ASSERT_EQ(Get(1, "d"), "NOT_FOUND");
  2656. // The single ingested file replaced everything (except perhaps memtable)
  2657. std::vector<LiveFileMetaData> live_files;
  2658. db_->GetLiveFilesMetaData(&live_files);
  2659. // One file in each CF
  2660. ASSERT_EQ(live_files.size(), 2);
  2661. ASSERT_OK(sfw.Open(file_path));
  2662. ASSERT_OK(sfw.Put("f", "6"));
  2663. ASSERT_OK(sfw.Finish());
  2664. // Another file
  2665. ASSERT_OK(sfw.Open(file_path2));
  2666. ASSERT_OK(sfw.Put("f", "7"));
  2667. ASSERT_OK(sfw.Put("g", "8"));
  2668. ASSERT_OK(sfw.Finish());
  2669. if (!disallow_memtable) {
  2670. // rejected because of memtable overlap with range
  2671. arg.atomic_replace_range = {{"e", "z"}};
  2672. s = db_->IngestExternalFiles({arg});
  2673. ASSERT_EQ(s.code(), Status::Code::kInvalidArgument);
  2674. // allow blocking flush of "e" (which is then replaced), and the file
  2675. // with just "b" is not replaced
  2676. arg.options.allow_blocking_flush = true;
  2677. ASSERT_OK(db_->IngestExternalFiles({arg}));
  2678. ASSERT_EQ(Get(1, "b"), "0");
  2679. ASSERT_EQ(Get(1, "e"), "NOT_FOUND");
  2680. ASSERT_EQ(Get(1, "f"), "6");
  2681. ASSERT_EQ(Get(1, "g"), "NOT_FOUND");
  2682. // memtable overlap with replace range
  2683. ASSERT_OK(Put(1, "e", "5"));
  2684. arg.options.allow_blocking_flush = false;
  2685. arg.external_files = {file_path2};
  2686. // rejected because of memtable overlap
  2687. arg.atomic_replace_range = {{nullptr, nullptr}};
  2688. s = db_->IngestExternalFiles({arg});
  2689. ASSERT_EQ(s.code(), Status::Code::kInvalidArgument);
  2690. // Replace everything, including with memtable flush
  2691. arg.options.allow_blocking_flush = true;
  2692. ASSERT_OK(db_->IngestExternalFiles({arg}));
  2693. ASSERT_EQ(Get(1, "b"), "NOT_FOUND");
  2694. ASSERT_EQ(Get(1, "e"), "NOT_FOUND");
  2695. ASSERT_EQ(Get(1, "f"), "7");
  2696. ASSERT_EQ(Get(1, "g"), "8");
  2697. } else {
  2698. arg.external_files = {file_path2, file_path};
  2699. // rejected because of overlap in files to ingest with fail_if_ = true
  2700. arg.atomic_replace_range = {{"e", "z"}};
  2701. s = db_->IngestExternalFiles({arg});
  2702. ASSERT_EQ(s.code(), Status::Code::kTryAgain);
  2703. arg.options.fail_if_not_bottommost_level = false;
  2704. // rejected because range doesn't cover ingested files
  2705. // FIXME: upper bound should be exclusive "g" instead
  2706. arg.atomic_replace_range = {{"e", "f2"}};
  2707. s = db_->IngestExternalFiles({arg});
  2708. ASSERT_EQ(s.code(), Status::Code::kInvalidArgument);
  2709. // Loaded into different levels, and the file with just "b" is not
  2710. // replaced
  2711. arg.atomic_replace_range = {{"e", "z"}};
  2712. ASSERT_OK(db_->IngestExternalFiles({arg}));
  2713. ASSERT_EQ(Get(1, "b"), "0");
  2714. ASSERT_EQ(Get(1, "f"), "6"); // earlier file listed later to ingest
  2715. ASSERT_EQ(Get(1, "g"), "8");
  2716. }
  2717. }
  2718. }
  2719. }
  2720. TEST_F(ExternalSSTFileBasicTest, VerifyChecksum) {
  2721. const std::string kPutVal = "put_val";
  2722. const std::string kIngestedVal = "ingested_val";
  2723. ASSERT_OK(Put("k", kPutVal, WriteOptions()));
  2724. ASSERT_OK(Flush());
  2725. std::string external_file = sst_files_dir_ + "/file_to_ingest.sst";
  2726. {
  2727. SstFileWriter sst_file_writer{EnvOptions(), CurrentOptions()};
  2728. ASSERT_OK(sst_file_writer.Open(external_file));
  2729. ASSERT_OK(sst_file_writer.Put("k", kIngestedVal));
  2730. ASSERT_OK(sst_file_writer.Finish());
  2731. }
  2732. ASSERT_OK(db_->IngestExternalFile(db_->DefaultColumnFamily(), {external_file},
  2733. IngestExternalFileOptions()));
  2734. ASSERT_OK(db_->VerifyChecksum());
  2735. }
  2736. TEST_F(ExternalSSTFileBasicTest, VerifySstUniqueId) {
  2737. const std::string kPutVal = "put_val";
  2738. const std::string kIngestedVal = "ingested_val";
  2739. ASSERT_OK(Put("k", kPutVal, WriteOptions()));
  2740. ASSERT_OK(Flush());
  2741. std::string external_file = sst_files_dir_ + "/file_to_ingest.sst";
  2742. {
  2743. SstFileWriter sst_file_writer{EnvOptions(), CurrentOptions()};
  2744. ASSERT_OK(sst_file_writer.Open(external_file));
  2745. ASSERT_OK(sst_file_writer.Put("k", kIngestedVal));
  2746. ASSERT_OK(sst_file_writer.Finish());
  2747. }
  2748. ASSERT_OK(db_->IngestExternalFile(db_->DefaultColumnFamily(), {external_file},
  2749. IngestExternalFileOptions()));
  2750. // Test ingest file without session_id and db_id (for example generated by an
  2751. // older version of sst_writer)
  2752. SyncPoint::GetInstance()->SetCallBack(
  2753. "PropertyBlockBuilder::AddTableProperty:Start", [&](void* props_vs) {
  2754. auto props = static_cast<TableProperties*>(props_vs);
  2755. // update table property session_id to a different one
  2756. props->db_session_id = "";
  2757. props->db_id = "";
  2758. });
  2759. std::atomic_int skipped = 0, passed = 0;
  2760. SyncPoint::GetInstance()->SetCallBack(
  2761. "BlockBasedTable::Open::SkippedVerifyUniqueId",
  2762. [&](void* /*arg*/) { skipped++; });
  2763. SyncPoint::GetInstance()->SetCallBack(
  2764. "BlockBasedTable::Open::PassedVerifyUniqueId",
  2765. [&](void* /*arg*/) { passed++; });
  2766. SyncPoint::GetInstance()->EnableProcessing();
  2767. auto options = CurrentOptions();
  2768. ASSERT_TRUE(options.verify_sst_unique_id_in_manifest);
  2769. Reopen(options);
  2770. ASSERT_EQ(skipped, 0);
  2771. ASSERT_EQ(passed, 2); // one flushed + one ingested
  2772. external_file = sst_files_dir_ + "/file_to_ingest2.sst";
  2773. {
  2774. SstFileWriter sst_file_writer{EnvOptions(), CurrentOptions()};
  2775. ASSERT_OK(sst_file_writer.Open(external_file));
  2776. ASSERT_OK(sst_file_writer.Put("k", kIngestedVal));
  2777. ASSERT_OK(sst_file_writer.Finish());
  2778. }
  2779. ASSERT_OK(db_->IngestExternalFile(db_->DefaultColumnFamily(), {external_file},
  2780. IngestExternalFileOptions()));
  2781. // Two table file opens skipping verification:
  2782. // * ExternalSstFileIngestionJob::GetIngestedFileInfo
  2783. // * TableCache::GetTableReader
  2784. ASSERT_EQ(skipped, 2);
  2785. ASSERT_EQ(passed, 2);
  2786. // Check same after re-open (except no GetIngestedFileInfo)
  2787. skipped = 0;
  2788. passed = 0;
  2789. Reopen(options);
  2790. ASSERT_EQ(skipped, 1);
  2791. ASSERT_EQ(passed, 2);
  2792. }
  2793. TEST_F(ExternalSSTFileBasicTest, StableSnapshotWhileLoggingToManifest) {
  2794. const std::string kPutVal = "put_val";
  2795. const std::string kIngestedVal = "ingested_val";
  2796. ASSERT_OK(Put("k", kPutVal, WriteOptions()));
  2797. ASSERT_OK(Flush());
  2798. std::string external_file = sst_files_dir_ + "/file_to_ingest.sst";
  2799. {
  2800. SstFileWriter sst_file_writer{EnvOptions(), CurrentOptions()};
  2801. ASSERT_OK(sst_file_writer.Open(external_file));
  2802. ASSERT_OK(sst_file_writer.Put("k", kIngestedVal));
  2803. ASSERT_OK(sst_file_writer.Finish());
  2804. }
  2805. const Snapshot* snapshot = nullptr;
  2806. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  2807. "VersionSet::LogAndApply:WriteManifest", [&](void* /* arg */) {
  2808. // prevent background compaction job to call this callback
  2809. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  2810. snapshot = db_->GetSnapshot();
  2811. ReadOptions read_opts;
  2812. read_opts.snapshot = snapshot;
  2813. std::string value;
  2814. ASSERT_OK(db_->Get(read_opts, "k", &value));
  2815. ASSERT_EQ(kPutVal, value);
  2816. });
  2817. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  2818. ASSERT_OK(db_->IngestExternalFile(db_->DefaultColumnFamily(), {external_file},
  2819. IngestExternalFileOptions()));
  2820. auto ingested_file_seqno = db_->GetLatestSequenceNumber();
  2821. ASSERT_NE(nullptr, snapshot);
  2822. // snapshot is taken before SST ingestion is done
  2823. ASSERT_EQ(ingested_file_seqno, snapshot->GetSequenceNumber() + 1);
  2824. ReadOptions read_opts;
  2825. read_opts.snapshot = snapshot;
  2826. std::string value;
  2827. ASSERT_OK(db_->Get(read_opts, "k", &value));
  2828. ASSERT_EQ(kPutVal, value);
  2829. db_->ReleaseSnapshot(snapshot);
  2830. // After reopen, sequence number should be up current such that
  2831. // ingested value is read
  2832. Reopen(CurrentOptions());
  2833. ASSERT_OK(db_->Get(ReadOptions(), "k", &value));
  2834. ASSERT_EQ(kIngestedVal, value);
  2835. // New write should get higher seqno compared to ingested file
  2836. ASSERT_OK(Put("k", kPutVal, WriteOptions()));
  2837. ASSERT_EQ(db_->GetLatestSequenceNumber(), ingested_file_seqno + 1);
  2838. }
  2839. TEST_F(ExternalSSTFileBasicTest, ConcurrentIngestionAndDropColumnFamily) {
  2840. int kNumCFs = 10;
  2841. Options options = CurrentOptions();
  2842. CreateColumnFamilies({"cf_0", "cf_1", "cf_2", "cf_3", "cf_4", "cf_5", "cf_6",
  2843. "cf_7", "cf_8", "cf_9"},
  2844. options);
  2845. IngestExternalFileArg ingest_arg;
  2846. IngestExternalFileOptions ifo;
  2847. std::string external_file = sst_files_dir_ + "/file_to_ingest.sst";
  2848. SstFileWriter sst_file_writer{EnvOptions(), CurrentOptions()};
  2849. ASSERT_OK(sst_file_writer.Open(external_file));
  2850. ASSERT_OK(sst_file_writer.Put("key", "value"));
  2851. ASSERT_OK(sst_file_writer.Finish());
  2852. ifo.move_files = false;
  2853. ingest_arg.external_files = {external_file};
  2854. ingest_arg.options = ifo;
  2855. std::vector<std::thread> threads;
  2856. threads.reserve(2 * kNumCFs);
  2857. std::atomic<int> success_ingestion_count = 0;
  2858. std::atomic<int> failed_ingestion_count = 0;
  2859. for (int i = 0; i < kNumCFs; i++) {
  2860. threads.emplace_back(
  2861. [this, i]() { ASSERT_OK(db_->DropColumnFamily(handles_[i])); });
  2862. threads.emplace_back([this, i, ingest_arg, &success_ingestion_count,
  2863. &failed_ingestion_count]() {
  2864. IngestExternalFileArg arg_copy = ingest_arg;
  2865. arg_copy.column_family = handles_[i];
  2866. Status s = db_->IngestExternalFiles({arg_copy});
  2867. ReadOptions ropts;
  2868. std::string value;
  2869. if (s.ok()) {
  2870. ASSERT_OK(db_->Get(ropts, handles_[i], "key", &value));
  2871. ASSERT_EQ("value", value);
  2872. success_ingestion_count.fetch_add(1);
  2873. } else {
  2874. ASSERT_TRUE(db_->Get(ropts, handles_[i], "key", &value).IsNotFound());
  2875. failed_ingestion_count.fetch_add(1);
  2876. }
  2877. });
  2878. }
  2879. for (auto& t : threads) {
  2880. t.join();
  2881. }
  2882. ASSERT_EQ(kNumCFs, success_ingestion_count + failed_ingestion_count);
  2883. Close();
  2884. }
  2885. INSTANTIATE_TEST_CASE_P(ExternalSSTFileBasicTest, ExternalSSTFileBasicTest,
  2886. testing::Values(std::make_tuple(true, true),
  2887. std::make_tuple(true, false),
  2888. std::make_tuple(false, true),
  2889. std::make_tuple(false, false)));
  2890. } // namespace ROCKSDB_NAMESPACE
  2891. int main(int argc, char** argv) {
  2892. ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  2893. ::testing::InitGoogleTest(&argc, argv);
  2894. RegisterCustomObjects(argc, argv);
  2895. return RUN_ALL_TESTS();
  2896. }