simulated_hybrid_file_system.cc 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
  1. // Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include "tools/simulated_hybrid_file_system.h"
  6. #include <algorithm>
  7. #include <sstream>
  8. #include <string>
  9. #include "rocksdb/rate_limiter.h"
  10. #include "util/stop_watch.h"
  11. namespace ROCKSDB_NAMESPACE {
  // Number of microseconds in one second.
  const int64_t kUsPerSec = 1000 * 1000;
  // Rate limiter tokens ("dummy bytes") charged for each microsecond of
  // simulated service time.
  const int64_t kDummyBytesPerUs = 1 << 10;
  14. namespace {
  // Estimates how long a simulated HDD needs to serve a read or write of
  // `bytes` bytes. The model is a fixed cost of 12200 microseconds per request
  // plus 0.005215 microseconds per byte transferred; the per-byte part is
  // truncated to whole microseconds.
  //
  // Returns the service time in microseconds.
  int CalculateServeTimeUs(size_t bytes) {
    constexpr int kFixedCostUs = 12200;
    constexpr double kUsPerByte = 0.005215;
    const double transfer_us = static_cast<double>(bytes) * kUsPerByte;
    return kFixedCostUs + static_cast<int>(transfer_us);
  }
  20. // There is a bug in rater limiter that would crash with small requests
  21. // Hack to get it around.
  22. void RateLimiterRequest(RateLimiter* rater_limiter, int64_t amount) {
  23. int64_t left = amount * kDummyBytesPerUs;
  24. const int64_t kMaxToRequest = kDummyBytesPerUs * kUsPerSec / 1024;
  25. while (left > 0) {
  26. int64_t to_request = std::min(kMaxToRequest, left);
  27. rater_limiter->Request(to_request, Env::IOPriority::IO_LOW, nullptr);
  28. left -= to_request;
  29. }
  30. }
  31. } // namespace
  32. // The metadata file format: each line is a full filename of a file which is
  33. // warm
  34. SimulatedHybridFileSystem::SimulatedHybridFileSystem(
  35. const std::shared_ptr<FileSystem>& base,
  36. const std::string& metadata_file_name, int throughput_multiplier,
  37. bool is_full_fs_warm)
  38. : FileSystemWrapper(base),
  39. // Limit to 100 requests per second.
  40. rate_limiter_(NewGenericRateLimiter(
  41. int64_t{throughput_multiplier} * kDummyBytesPerUs *
  42. kUsPerSec /* rate_bytes_per_sec */,
  43. 1000 /* refill_period_us */)),
  44. metadata_file_name_(metadata_file_name),
  45. name_("SimulatedHybridFileSystem: " + std::string(target()->Name())),
  46. is_full_fs_warm_(is_full_fs_warm) {
  47. IOStatus s = base->FileExists(metadata_file_name, IOOptions(), nullptr);
  48. if (s.IsNotFound()) {
  49. return;
  50. }
  51. std::string metadata;
  52. s = ReadFileToString(base.get(), metadata_file_name, &metadata);
  53. if (!s.ok()) {
  54. fprintf(stderr, "Error reading from file %s: %s",
  55. metadata_file_name.c_str(), s.ToString().c_str());
  56. // Exit rather than assert as this file system is built to run with
  57. // benchmarks, which usually run on release mode.
  58. std::exit(1);
  59. }
  60. std::istringstream input;
  61. input.str(metadata);
  62. std::string line;
  63. while (std::getline(input, line)) {
  64. fprintf(stderr, "Warm file %s\n", line.c_str());
  65. warm_file_set_.insert(line);
  66. }
  67. }
  68. // Need to write out the metadata file to file. See comment of
  69. // SimulatedHybridFileSystem::SimulatedHybridFileSystem() for format of the
  70. // file.
  71. SimulatedHybridFileSystem::~SimulatedHybridFileSystem() {
  72. if (metadata_file_name_.empty()) {
  73. return;
  74. }
  75. std::string metadata;
  76. for (const auto& f : warm_file_set_) {
  77. metadata += f;
  78. metadata += "\n";
  79. }
  80. IOOptions opts;
  81. IOStatus s =
  82. WriteStringToFile(target(), metadata, metadata_file_name_, true, opts);
  83. if (!s.ok()) {
  84. fprintf(stderr, "Error writing to file %s: %s", metadata_file_name_.c_str(),
  85. s.ToString().c_str());
  86. }
  87. }
  88. IOStatus SimulatedHybridFileSystem::NewRandomAccessFile(
  89. const std::string& fname, const FileOptions& file_opts,
  90. std::unique_ptr<FSRandomAccessFile>* result, IODebugContext* dbg) {
  91. Temperature temperature = Temperature::kUnknown;
  92. if (is_full_fs_warm_) {
  93. temperature = Temperature::kWarm;
  94. } else {
  95. const std::lock_guard<std::mutex> lock(mutex_);
  96. if (warm_file_set_.find(fname) != warm_file_set_.end()) {
  97. temperature = Temperature::kWarm;
  98. }
  99. assert(temperature == file_opts.temperature);
  100. }
  101. IOStatus s = target()->NewRandomAccessFile(fname, file_opts, result, dbg);
  102. result->reset(
  103. new SimulatedHybridRaf(std::move(*result), rate_limiter_, temperature));
  104. return s;
  105. }
  106. IOStatus SimulatedHybridFileSystem::NewWritableFile(
  107. const std::string& fname, const FileOptions& file_opts,
  108. std::unique_ptr<FSWritableFile>* result, IODebugContext* dbg) {
  109. if (file_opts.temperature == Temperature::kWarm) {
  110. const std::lock_guard<std::mutex> lock(mutex_);
  111. warm_file_set_.insert(fname);
  112. }
  113. IOStatus s = target()->NewWritableFile(fname, file_opts, result, dbg);
  114. if (file_opts.temperature == Temperature::kWarm || is_full_fs_warm_) {
  115. result->reset(new SimulatedWritableFile(std::move(*result), rate_limiter_));
  116. }
  117. return s;
  118. }
  119. IOStatus SimulatedHybridFileSystem::DeleteFile(const std::string& fname,
  120. const IOOptions& options,
  121. IODebugContext* dbg) {
  122. {
  123. const std::lock_guard<std::mutex> lock(mutex_);
  124. warm_file_set_.erase(fname);
  125. }
  126. return target()->DeleteFile(fname, options, dbg);
  127. }
  128. IOStatus SimulatedHybridRaf::Read(uint64_t offset, size_t n,
  129. const IOOptions& options, Slice* result,
  130. char* scratch, IODebugContext* dbg) const {
  131. if (temperature_ == Temperature::kWarm) {
  132. SimulateIOWait(n);
  133. }
  134. return target()->Read(offset, n, options, result, scratch, dbg);
  135. }
  136. IOStatus SimulatedHybridRaf::MultiRead(FSReadRequest* reqs, size_t num_reqs,
  137. const IOOptions& options,
  138. IODebugContext* dbg) {
  139. if (temperature_ == Temperature::kWarm) {
  140. for (size_t i = 0; i < num_reqs; i++) {
  141. SimulateIOWait(reqs[i].len);
  142. }
  143. }
  144. return target()->MultiRead(reqs, num_reqs, options, dbg);
  145. }
  146. IOStatus SimulatedHybridRaf::Prefetch(uint64_t offset, size_t n,
  147. const IOOptions& options,
  148. IODebugContext* dbg) {
  149. if (temperature_ == Temperature::kWarm) {
  150. SimulateIOWait(n);
  151. }
  152. return target()->Prefetch(offset, n, options, dbg);
  153. }
  154. void SimulatedHybridRaf::SimulateIOWait(int64_t bytes) const {
  155. int serve_time = CalculateServeTimeUs(bytes);
  156. {
  157. StopWatchNano stop_watch(Env::Default()->GetSystemClock().get(),
  158. /*auto_start=*/true);
  159. RateLimiterRequest(rate_limiter_.get(), serve_time);
  160. int time_passed_us = static_cast<int>(stop_watch.ElapsedNanos() / 1000);
  161. if (time_passed_us < serve_time) {
  162. Env::Default()->SleepForMicroseconds(serve_time - time_passed_us);
  163. }
  164. }
  165. }
  166. void SimulatedWritableFile::SimulateIOWait(int64_t bytes) const {
  167. int serve_time = CalculateServeTimeUs(bytes);
  168. Env::Default()->SleepForMicroseconds(serve_time);
  169. RateLimiterRequest(rate_limiter_.get(), serve_time);
  170. }
  171. IOStatus SimulatedWritableFile::Append(const Slice& data, const IOOptions& ioo,
  172. IODebugContext* idc) {
  173. if (use_direct_io()) {
  174. SimulateIOWait(data.size());
  175. } else {
  176. unsynced_bytes += data.size();
  177. }
  178. return target()->Append(data, ioo, idc);
  179. }
  180. IOStatus SimulatedWritableFile::Append(
  181. const Slice& data, const IOOptions& options,
  182. const DataVerificationInfo& verification_info, IODebugContext* dbg) {
  183. if (use_direct_io()) {
  184. SimulateIOWait(data.size());
  185. } else {
  186. unsynced_bytes += data.size();
  187. }
  188. return target()->Append(data, options, verification_info, dbg);
  189. }
  190. IOStatus SimulatedWritableFile::PositionedAppend(const Slice& data,
  191. uint64_t offset,
  192. const IOOptions& options,
  193. IODebugContext* dbg) {
  194. if (use_direct_io()) {
  195. SimulateIOWait(data.size());
  196. } else {
  197. // This might be overcalculated, but it's probably OK.
  198. unsynced_bytes += data.size();
  199. }
  200. return target()->PositionedAppend(data, offset, options, dbg);
  201. }
  202. IOStatus SimulatedWritableFile::PositionedAppend(
  203. const Slice& data, uint64_t offset, const IOOptions& options,
  204. const DataVerificationInfo& verification_info, IODebugContext* dbg) {
  205. if (use_direct_io()) {
  206. SimulateIOWait(data.size());
  207. } else {
  208. // This might be overcalculated, but it's probably OK.
  209. unsynced_bytes += data.size();
  210. }
  211. return target()->PositionedAppend(data, offset, options, verification_info,
  212. dbg);
  213. }
  214. IOStatus SimulatedWritableFile::Sync(const IOOptions& options,
  215. IODebugContext* dbg) {
  216. if (unsynced_bytes > 0) {
  217. SimulateIOWait(unsynced_bytes);
  218. unsynced_bytes = 0;
  219. }
  220. return target()->Sync(options, dbg);
  221. }
  222. } // namespace ROCKSDB_NAMESPACE