composite_env.cc 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538
  1. // Copyright (c) 2019-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. #include "env/composite_env_wrapper.h"
  7. #include "rocksdb/utilities/options_type.h"
  8. #include "util/string_util.h"
  9. namespace ROCKSDB_NAMESPACE {
  10. namespace {
  11. // The CompositeEnvWrapper class provides an interface that is compatible
  12. // with the old monolithic Env API, and an implementation that wraps around
  13. // the new Env that provides threading and other OS related functionality, and
  14. // the new FileSystem API that provides storage functionality. By
  15. // providing the old Env interface, it allows the rest of RocksDB code to
  16. // be agnostic of whether the underlying Env implementation is a monolithic
  17. // Env or an Env + FileSystem. In the former case, the user will specify
  18. // Options::env only, whereas in the latter case, the user will specify
  19. // Options::env and Options::file_system.
  20. class CompositeSequentialFileWrapper : public SequentialFile {
  21. public:
  22. explicit CompositeSequentialFileWrapper(
  23. std::unique_ptr<FSSequentialFile>& target)
  24. : target_(std::move(target)) {}
  25. Status Read(size_t n, Slice* result, char* scratch) override {
  26. IOOptions io_opts;
  27. IODebugContext dbg;
  28. return target_->Read(n, io_opts, result, scratch, &dbg);
  29. }
  30. Status Skip(uint64_t n) override { return target_->Skip(n); }
  31. bool use_direct_io() const override { return target_->use_direct_io(); }
  32. size_t GetRequiredBufferAlignment() const override {
  33. return target_->GetRequiredBufferAlignment();
  34. }
  35. Status InvalidateCache(size_t offset, size_t length) override {
  36. return target_->InvalidateCache(offset, length);
  37. }
  38. Status PositionedRead(uint64_t offset, size_t n, Slice* result,
  39. char* scratch) override {
  40. IOOptions io_opts;
  41. IODebugContext dbg;
  42. return target_->PositionedRead(offset, n, io_opts, result, scratch, &dbg);
  43. }
  44. private:
  45. std::unique_ptr<FSSequentialFile> target_;
  46. };
  47. class CompositeRandomAccessFileWrapper : public RandomAccessFile {
  48. public:
  49. explicit CompositeRandomAccessFileWrapper(
  50. std::unique_ptr<FSRandomAccessFile>& target)
  51. : target_(std::move(target)) {}
  52. Status Read(uint64_t offset, size_t n, Slice* result,
  53. char* scratch) const override {
  54. IOOptions io_opts;
  55. IODebugContext dbg;
  56. return target_->Read(offset, n, io_opts, result, scratch, &dbg);
  57. }
  58. Status MultiRead(ReadRequest* reqs, size_t num_reqs) override {
  59. IOOptions io_opts;
  60. IODebugContext dbg;
  61. std::vector<FSReadRequest> fs_reqs;
  62. Status status;
  63. fs_reqs.resize(num_reqs);
  64. for (size_t i = 0; i < num_reqs; ++i) {
  65. fs_reqs[i].offset = reqs[i].offset;
  66. fs_reqs[i].len = reqs[i].len;
  67. fs_reqs[i].scratch = reqs[i].scratch;
  68. fs_reqs[i].status = IOStatus::OK();
  69. }
  70. status = target_->MultiRead(fs_reqs.data(), num_reqs, io_opts, &dbg);
  71. for (size_t i = 0; i < num_reqs; ++i) {
  72. reqs[i].result = fs_reqs[i].result;
  73. reqs[i].status = fs_reqs[i].status;
  74. }
  75. return status;
  76. }
  77. Status Prefetch(uint64_t offset, size_t n) override {
  78. IOOptions io_opts;
  79. IODebugContext dbg;
  80. return target_->Prefetch(offset, n, io_opts, &dbg);
  81. }
  82. size_t GetUniqueId(char* id, size_t max_size) const override {
  83. return target_->GetUniqueId(id, max_size);
  84. }
  85. void Hint(AccessPattern pattern) override {
  86. target_->Hint((FSRandomAccessFile::AccessPattern)pattern);
  87. }
  88. bool use_direct_io() const override { return target_->use_direct_io(); }
  89. size_t GetRequiredBufferAlignment() const override {
  90. return target_->GetRequiredBufferAlignment();
  91. }
  92. Status InvalidateCache(size_t offset, size_t length) override {
  93. return target_->InvalidateCache(offset, length);
  94. }
  95. Status GetFileSize(uint64_t* size) override {
  96. return target_->GetFileSize(size);
  97. }
  98. private:
  99. std::unique_ptr<FSRandomAccessFile> target_;
  100. };
  101. class CompositeWritableFileWrapper : public WritableFile {
  102. public:
  103. explicit CompositeWritableFileWrapper(std::unique_ptr<FSWritableFile>& t)
  104. : target_(std::move(t)) {}
  105. Status Append(const Slice& data) override {
  106. IOOptions io_opts;
  107. IODebugContext dbg;
  108. return target_->Append(data, io_opts, &dbg);
  109. }
  110. Status Append(const Slice& data,
  111. const DataVerificationInfo& verification_info) override {
  112. IOOptions io_opts;
  113. IODebugContext dbg;
  114. return target_->Append(data, io_opts, verification_info, &dbg);
  115. }
  116. Status PositionedAppend(const Slice& data, uint64_t offset) override {
  117. IOOptions io_opts;
  118. IODebugContext dbg;
  119. return target_->PositionedAppend(data, offset, io_opts, &dbg);
  120. }
  121. Status PositionedAppend(
  122. const Slice& data, uint64_t offset,
  123. const DataVerificationInfo& verification_info) override {
  124. IOOptions io_opts;
  125. IODebugContext dbg;
  126. return target_->PositionedAppend(data, offset, io_opts, verification_info,
  127. &dbg);
  128. }
  129. Status Truncate(uint64_t size) override {
  130. IOOptions io_opts;
  131. IODebugContext dbg;
  132. return target_->Truncate(size, io_opts, &dbg);
  133. }
  134. Status Close() override {
  135. IOOptions io_opts;
  136. IODebugContext dbg;
  137. return target_->Close(io_opts, &dbg);
  138. }
  139. Status Flush() override {
  140. IOOptions io_opts;
  141. IODebugContext dbg;
  142. return target_->Flush(io_opts, &dbg);
  143. }
  144. Status Sync() override {
  145. IOOptions io_opts;
  146. IODebugContext dbg;
  147. return target_->Sync(io_opts, &dbg);
  148. }
  149. Status Fsync() override {
  150. IOOptions io_opts;
  151. IODebugContext dbg;
  152. return target_->Fsync(io_opts, &dbg);
  153. }
  154. bool IsSyncThreadSafe() const override { return target_->IsSyncThreadSafe(); }
  155. bool use_direct_io() const override { return target_->use_direct_io(); }
  156. size_t GetRequiredBufferAlignment() const override {
  157. return target_->GetRequiredBufferAlignment();
  158. }
  159. void SetWriteLifeTimeHint(Env::WriteLifeTimeHint hint) override {
  160. target_->SetWriteLifeTimeHint(hint);
  161. }
  162. Env::WriteLifeTimeHint GetWriteLifeTimeHint() override {
  163. return target_->GetWriteLifeTimeHint();
  164. }
  165. uint64_t GetFileSize() override {
  166. IOOptions io_opts;
  167. IODebugContext dbg;
  168. return target_->GetFileSize(io_opts, &dbg);
  169. }
  170. void SetPreallocationBlockSize(size_t size) override {
  171. target_->SetPreallocationBlockSize(size);
  172. }
  173. void GetPreallocationStatus(size_t* block_size,
  174. size_t* last_allocated_block) override {
  175. target_->GetPreallocationStatus(block_size, last_allocated_block);
  176. }
  177. size_t GetUniqueId(char* id, size_t max_size) const override {
  178. return target_->GetUniqueId(id, max_size);
  179. }
  180. Status InvalidateCache(size_t offset, size_t length) override {
  181. return target_->InvalidateCache(offset, length);
  182. }
  183. Status RangeSync(uint64_t offset, uint64_t nbytes) override {
  184. IOOptions io_opts;
  185. IODebugContext dbg;
  186. return target_->RangeSync(offset, nbytes, io_opts, &dbg);
  187. }
  188. void PrepareWrite(size_t offset, size_t len) override {
  189. IOOptions io_opts;
  190. IODebugContext dbg;
  191. target_->PrepareWrite(offset, len, io_opts, &dbg);
  192. }
  193. Status Allocate(uint64_t offset, uint64_t len) override {
  194. IOOptions io_opts;
  195. IODebugContext dbg;
  196. return target_->Allocate(offset, len, io_opts, &dbg);
  197. }
  198. std::unique_ptr<FSWritableFile>* target() { return &target_; }
  199. private:
  200. std::unique_ptr<FSWritableFile> target_;
  201. };
  202. class CompositeRandomRWFileWrapper : public RandomRWFile {
  203. public:
  204. explicit CompositeRandomRWFileWrapper(std::unique_ptr<FSRandomRWFile>& target)
  205. : target_(std::move(target)) {}
  206. bool use_direct_io() const override { return target_->use_direct_io(); }
  207. size_t GetRequiredBufferAlignment() const override {
  208. return target_->GetRequiredBufferAlignment();
  209. }
  210. Status Write(uint64_t offset, const Slice& data) override {
  211. IOOptions io_opts;
  212. IODebugContext dbg;
  213. return target_->Write(offset, data, io_opts, &dbg);
  214. }
  215. Status Read(uint64_t offset, size_t n, Slice* result,
  216. char* scratch) const override {
  217. IOOptions io_opts;
  218. IODebugContext dbg;
  219. return target_->Read(offset, n, io_opts, result, scratch, &dbg);
  220. }
  221. Status Flush() override {
  222. IOOptions io_opts;
  223. IODebugContext dbg;
  224. return target_->Flush(io_opts, &dbg);
  225. }
  226. Status Sync() override {
  227. IOOptions io_opts;
  228. IODebugContext dbg;
  229. return target_->Sync(io_opts, &dbg);
  230. }
  231. Status Fsync() override {
  232. IOOptions io_opts;
  233. IODebugContext dbg;
  234. return target_->Fsync(io_opts, &dbg);
  235. }
  236. Status Close() override {
  237. IOOptions io_opts;
  238. IODebugContext dbg;
  239. return target_->Close(io_opts, &dbg);
  240. }
  241. private:
  242. std::unique_ptr<FSRandomRWFile> target_;
  243. };
  244. class CompositeDirectoryWrapper : public Directory {
  245. public:
  246. explicit CompositeDirectoryWrapper(std::unique_ptr<FSDirectory>& target)
  247. : target_(std::move(target)) {}
  248. Status Fsync() override {
  249. IOOptions io_opts;
  250. IODebugContext dbg;
  251. return target_->FsyncWithDirOptions(io_opts, &dbg, DirFsyncOptions());
  252. }
  253. Status Close() override {
  254. IOOptions io_opts;
  255. IODebugContext dbg;
  256. return target_->Close(io_opts, &dbg);
  257. }
  258. size_t GetUniqueId(char* id, size_t max_size) const override {
  259. return target_->GetUniqueId(id, max_size);
  260. }
  261. private:
  262. std::unique_ptr<FSDirectory> target_;
  263. };
  264. } // namespace
  265. Status CompositeEnv::NewSequentialFile(const std::string& f,
  266. std::unique_ptr<SequentialFile>* r,
  267. const EnvOptions& options) {
  268. IODebugContext dbg;
  269. std::unique_ptr<FSSequentialFile> file;
  270. Status status;
  271. status =
  272. file_system_->NewSequentialFile(f, FileOptions(options), &file, &dbg);
  273. if (status.ok()) {
  274. r->reset(new CompositeSequentialFileWrapper(file));
  275. }
  276. return status;
  277. }
  278. Status CompositeEnv::NewRandomAccessFile(const std::string& f,
  279. std::unique_ptr<RandomAccessFile>* r,
  280. const EnvOptions& options) {
  281. IODebugContext dbg;
  282. std::unique_ptr<FSRandomAccessFile> file;
  283. Status status;
  284. status =
  285. file_system_->NewRandomAccessFile(f, FileOptions(options), &file, &dbg);
  286. if (status.ok()) {
  287. r->reset(new CompositeRandomAccessFileWrapper(file));
  288. }
  289. return status;
  290. }
  291. Status CompositeEnv::NewWritableFile(const std::string& f,
  292. std::unique_ptr<WritableFile>* r,
  293. const EnvOptions& options) {
  294. IODebugContext dbg;
  295. std::unique_ptr<FSWritableFile> file;
  296. Status status;
  297. status = file_system_->NewWritableFile(f, FileOptions(options), &file, &dbg);
  298. if (status.ok()) {
  299. r->reset(new CompositeWritableFileWrapper(file));
  300. }
  301. return status;
  302. }
  303. Status CompositeEnv::ReopenWritableFile(const std::string& fname,
  304. std::unique_ptr<WritableFile>* result,
  305. const EnvOptions& options) {
  306. IODebugContext dbg;
  307. Status status;
  308. std::unique_ptr<FSWritableFile> file;
  309. status = file_system_->ReopenWritableFile(fname, FileOptions(options), &file,
  310. &dbg);
  311. if (status.ok()) {
  312. result->reset(new CompositeWritableFileWrapper(file));
  313. }
  314. return status;
  315. }
  316. Status CompositeEnv::ReuseWritableFile(const std::string& fname,
  317. const std::string& old_fname,
  318. std::unique_ptr<WritableFile>* r,
  319. const EnvOptions& options) {
  320. IODebugContext dbg;
  321. Status status;
  322. std::unique_ptr<FSWritableFile> file;
  323. status = file_system_->ReuseWritableFile(fname, old_fname,
  324. FileOptions(options), &file, &dbg);
  325. if (status.ok()) {
  326. r->reset(new CompositeWritableFileWrapper(file));
  327. }
  328. return status;
  329. }
  330. Status CompositeEnv::NewRandomRWFile(const std::string& fname,
  331. std::unique_ptr<RandomRWFile>* result,
  332. const EnvOptions& options) {
  333. IODebugContext dbg;
  334. std::unique_ptr<FSRandomRWFile> file;
  335. Status status;
  336. status =
  337. file_system_->NewRandomRWFile(fname, FileOptions(options), &file, &dbg);
  338. if (status.ok()) {
  339. result->reset(new CompositeRandomRWFileWrapper(file));
  340. }
  341. return status;
  342. }
  343. Status CompositeEnv::NewDirectory(const std::string& name,
  344. std::unique_ptr<Directory>* result) {
  345. IOOptions io_opts;
  346. IODebugContext dbg;
  347. std::unique_ptr<FSDirectory> dir;
  348. Status status;
  349. status = file_system_->NewDirectory(name, io_opts, &dir, &dbg);
  350. if (status.ok()) {
  351. result->reset(new CompositeDirectoryWrapper(dir));
  352. }
  353. return status;
  354. }
  355. namespace {
  356. static std::unordered_map<std::string, OptionTypeInfo> env_wrapper_type_info = {
  357. {"target",
  358. OptionTypeInfo(0, OptionType::kUnknown, OptionVerificationType::kByName,
  359. OptionTypeFlags::kDontSerialize)
  360. .SetParseFunc([](const ConfigOptions& opts,
  361. const std::string& /*name*/, const std::string& value,
  362. void* addr) {
  363. auto target = static_cast<EnvWrapper::Target*>(addr);
  364. return Env::CreateFromString(opts, value, &(target->env),
  365. &(target->guard));
  366. })
  367. .SetEqualsFunc([](const ConfigOptions& opts,
  368. const std::string& /*name*/, const void* addr1,
  369. const void* addr2, std::string* mismatch) {
  370. const auto target1 = static_cast<const EnvWrapper::Target*>(addr1);
  371. const auto target2 = static_cast<const EnvWrapper::Target*>(addr2);
  372. if (target1->env != nullptr) {
  373. return target1->env->AreEquivalent(opts, target2->env, mismatch);
  374. } else {
  375. return (target2->env == nullptr);
  376. }
  377. })
  378. .SetPrepareFunc([](const ConfigOptions& opts,
  379. const std::string& /*name*/, void* addr) {
  380. auto target = static_cast<EnvWrapper::Target*>(addr);
  381. if (target->guard.get() != nullptr) {
  382. target->env = target->guard.get();
  383. } else if (target->env == nullptr) {
  384. target->env = Env::Default();
  385. }
  386. return target->env->PrepareOptions(opts);
  387. })
  388. .SetValidateFunc([](const DBOptions& db_opts,
  389. const ColumnFamilyOptions& cf_opts,
  390. const std::string& /*name*/, const void* addr) {
  391. const auto target = static_cast<const EnvWrapper::Target*>(addr);
  392. if (target->env == nullptr) {
  393. return Status::InvalidArgument("Target Env not specified");
  394. } else {
  395. return target->env->ValidateOptions(db_opts, cf_opts);
  396. }
  397. })},
  398. };
  399. static std::unordered_map<std::string, OptionTypeInfo>
  400. composite_fs_wrapper_type_info = {
  401. {"file_system",
  402. OptionTypeInfo::AsCustomSharedPtr<FileSystem>(
  403. 0, OptionVerificationType::kByName, OptionTypeFlags::kNone)},
  404. };
  405. static std::unordered_map<std::string, OptionTypeInfo>
  406. composite_clock_wrapper_type_info = {
  407. {"clock",
  408. OptionTypeInfo::AsCustomSharedPtr<SystemClock>(
  409. 0, OptionVerificationType::kByName, OptionTypeFlags::kNone)},
  410. };
  411. } // namespace
  412. std::unique_ptr<Env> NewCompositeEnv(const std::shared_ptr<FileSystem>& fs) {
  413. return std::unique_ptr<Env>(new CompositeEnvWrapper(Env::Default(), fs));
  414. }
  415. CompositeEnvWrapper::CompositeEnvWrapper(Env* env,
  416. const std::shared_ptr<FileSystem>& fs,
  417. const std::shared_ptr<SystemClock>& sc)
  418. : CompositeEnv(fs, sc), target_(env) {
  419. RegisterOptions("", &target_, &env_wrapper_type_info);
  420. RegisterOptions("", &file_system_, &composite_fs_wrapper_type_info);
  421. RegisterOptions("", &system_clock_, &composite_clock_wrapper_type_info);
  422. }
  423. CompositeEnvWrapper::CompositeEnvWrapper(const std::shared_ptr<Env>& env,
  424. const std::shared_ptr<FileSystem>& fs,
  425. const std::shared_ptr<SystemClock>& sc)
  426. : CompositeEnv(fs, sc), target_(env) {
  427. RegisterOptions("", &target_, &env_wrapper_type_info);
  428. RegisterOptions("", &file_system_, &composite_fs_wrapper_type_info);
  429. RegisterOptions("", &system_clock_, &composite_clock_wrapper_type_info);
  430. }
  431. Status CompositeEnvWrapper::PrepareOptions(const ConfigOptions& options) {
  432. target_.Prepare();
  433. if (file_system_ == nullptr) {
  434. file_system_ = target_.env->GetFileSystem();
  435. }
  436. if (system_clock_ == nullptr) {
  437. system_clock_ = target_.env->GetSystemClock();
  438. }
  439. return Env::PrepareOptions(options);
  440. }
  441. std::string CompositeEnvWrapper::SerializeOptions(
  442. const ConfigOptions& config_options, const std::string& header) const {
  443. auto options = CompositeEnv::SerializeOptions(config_options, header);
  444. if (target_.env != nullptr && target_.env != Env::Default()) {
  445. options.append("target=");
  446. options.append(target_.env->ToString(config_options));
  447. }
  448. return options;
  449. }
  450. EnvWrapper::EnvWrapper(Env* t) : target_(t) {
  451. RegisterOptions("", &target_, &env_wrapper_type_info);
  452. }
  453. EnvWrapper::EnvWrapper(std::unique_ptr<Env>&& t) : target_(std::move(t)) {
  454. RegisterOptions("", &target_, &env_wrapper_type_info);
  455. }
  456. EnvWrapper::EnvWrapper(const std::shared_ptr<Env>& t) : target_(t) {
  457. RegisterOptions("", &target_, &env_wrapper_type_info);
  458. }
  459. EnvWrapper::~EnvWrapper() = default;
  460. Status EnvWrapper::PrepareOptions(const ConfigOptions& options) {
  461. target_.Prepare();
  462. return Env::PrepareOptions(options);
  463. }
  464. std::string EnvWrapper::SerializeOptions(const ConfigOptions& config_options,
  465. const std::string& header) const {
  466. auto parent = Env::SerializeOptions(config_options, "");
  467. if (config_options.IsShallow() || target_.env == nullptr ||
  468. target_.env == Env::Default()) {
  469. return parent;
  470. } else {
  471. std::string result = header;
  472. if (!StartsWith(parent, OptionTypeInfo::kIdPropName())) {
  473. result.append(OptionTypeInfo::kIdPropName()).append("=");
  474. }
  475. result.append(parent);
  476. if (!EndsWith(result, config_options.delimiter)) {
  477. result.append(config_options.delimiter);
  478. }
  479. result.append("target=").append(target_.env->ToString(config_options));
  480. return result;
  481. }
  482. }
  483. } // namespace ROCKSDB_NAMESPACE