db_stress_compaction_service.h 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #ifdef GFLAGS
  6. #pragma once
  7. #include "db/compaction/compaction_job.h"
  8. #include "db_stress_shared_state.h"
  9. #include "rocksdb/options.h"
  10. #include "utilities/fault_injection_fs.h"
  11. namespace ROCKSDB_NAMESPACE {
  12. // Service to simulate Remote Compaction in Stress Test
  13. class DbStressCompactionService : public CompactionService {
  14. public:
  15. explicit DbStressCompactionService(SharedState* shared,
  16. bool failure_should_fall_back_to_local)
  17. : shared_(shared),
  18. aborted_(false),
  19. failure_should_fall_back_to_local_(failure_should_fall_back_to_local) {}
  20. static const char* kClassName() { return "DbStressCompactionService"; }
  21. const char* Name() const override { return kClassName(); }
  22. static constexpr uint64_t kWaitIntervalInMicros = 10 * 1000; // 10ms
  23. static constexpr const char* kTempOutputDirectoryPrefix = "tmp_output_";
  24. CompactionServiceScheduleResponse Schedule(
  25. const CompactionServiceJobInfo& info,
  26. const std::string& compaction_service_input) override {
  27. std::string job_id = info.db_id + "_" + info.db_session_id + "_" +
  28. std::to_string(info.job_id);
  29. if (aborted_.load()) {
  30. return CompactionServiceScheduleResponse(
  31. job_id, CompactionServiceJobStatus::kUseLocal);
  32. }
  33. std::string output_directory = info.db_name + "/" +
  34. kTempOutputDirectoryPrefix +
  35. Env::Default()->GenerateUniqueId();
  36. shared_->EnqueueRemoteCompaction(
  37. job_id, info, compaction_service_input, output_directory,
  38. false /* was_cancelled */); // Not canceled initially
  39. CompactionServiceScheduleResponse response(
  40. job_id, CompactionServiceJobStatus::kSuccess);
  41. return response;
  42. }
  43. CompactionServiceJobStatus Wait(const std::string& scheduled_job_id,
  44. std::string* result) override;
  45. void OnInstallation(const std::string& scheduled_job_id,
  46. CompactionServiceJobStatus /*status*/) override {
  47. // Clean up tmp directory
  48. std::string serialized;
  49. CompactionServiceResult result;
  50. if (shared_->GetRemoteCompactionResult(scheduled_job_id, &serialized)
  51. .has_value()) {
  52. if (CompactionServiceResult::Read(serialized, &result).ok()) {
  53. std::vector<std::string> filenames;
  54. Status s = Env::Default()->GetChildren(result.output_path, &filenames);
  55. for (size_t i = 0; s.ok() && i < filenames.size(); ++i) {
  56. s = Env::Default()->DeleteFile(result.output_path + "/" +
  57. filenames[i]);
  58. if (!s.ok()) {
  59. // TODO - Handle clean up failure?
  60. break;
  61. }
  62. }
  63. if (s.ok()) {
  64. Env::Default()->DeleteDir(result.output_path).PermitUncheckedError();
  65. }
  66. }
  67. shared_->RemoveRemoteCompactionResult(scheduled_job_id);
  68. }
  69. }
  70. void CancelAwaitingJobs() override { aborted_.store(true); }
  71. private:
  72. SharedState* shared_;
  73. std::atomic_bool aborted_{false};
  74. bool failure_should_fall_back_to_local_;
  75. };
  76. } // namespace ROCKSDB_NAMESPACE
  77. #endif // GFLAGS