fs_on_demand.cc 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331
  1. // Copyright (c) 2024-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include "env/fs_on_demand.h"
  6. #include <algorithm>
  7. #include <set>
  8. #include "file/filename.h"
  9. #include "port/port.h"
  10. #include "rocksdb/types.h"
  11. namespace ROCKSDB_NAMESPACE {
  12. // Check if the input path is under orig (typically the local directory), and if
  13. // so, change it to the equivalent path under replace (typically the remote
  14. // directory). For example, if orig is "/data/follower", replace is
  15. // "/data/leader", and the given path is "/data/follower/000010.sst", on return
  16. // the path would be changed to
  17. // "/data/leader/000010.sst".
  18. // Return value is true if the path was modified, false otherwise
  19. bool OnDemandFileSystem::CheckPathAndAdjust(const std::string& orig,
  20. const std::string& replace,
  21. std::string& path) {
  22. size_t pos = path.find(orig);
  23. if (pos > 0) {
  24. return false;
  25. }
  26. path.replace(pos, orig.length(), replace);
  27. return true;
  28. }
  29. bool OnDemandFileSystem::LookupFileType(const std::string& name,
  30. FileType* type) {
  31. std::size_t found = name.find_last_of('/');
  32. std::string file_name = name.substr(found);
  33. uint64_t number = 0;
  34. return ParseFileName(file_name, &number, type);
  35. }
  36. // RocksDB opens non-SST files for reading in sequential file mode. This
  37. // includes CURRENT, OPTIONS, MANIFEST etc. For these files, we open them
  38. // in place in the source directory. For files that are appendable or
  39. // can be renamed, which is MANIFEST and CURRENT files, we wrap the
  40. // underlying FSSequentialFile with another class that checks when EOF
  41. // has been reached and re-opens the file to see the latest data. On some
  42. // distributed file systems, this is necessary.
  43. IOStatus OnDemandFileSystem::NewSequentialFile(
  44. const std::string& fname, const FileOptions& file_opts,
  45. std::unique_ptr<FSSequentialFile>* result, IODebugContext* dbg) {
  46. FileType type;
  47. static std::unordered_set<FileType> valid_types(
  48. {kWalFile, kDescriptorFile, kCurrentFile, kIdentityFile, kOptionsFile});
  49. if (!LookupFileType(fname, &type) ||
  50. (valid_types.find(type) == valid_types.end())) {
  51. return IOStatus::NotSupported();
  52. }
  53. IOStatus s;
  54. std::string rname = fname;
  55. if (CheckPathAndAdjust(local_path_, remote_path_, rname)) {
  56. // First clear any local directory cache as it may be out of date
  57. target()->DiscardCacheForDirectory(rname);
  58. std::unique_ptr<FSSequentialFile> inner_file;
  59. s = target()->NewSequentialFile(rname, file_opts, &inner_file, dbg);
  60. if (s.ok() && type == kDescriptorFile) {
  61. result->reset(new OnDemandSequentialFile(std::move(inner_file), this,
  62. file_opts, rname));
  63. } else {
  64. *result = std::move(inner_file);
  65. }
  66. } else {
  67. s = target()->NewSequentialFile(fname, file_opts, result, dbg);
  68. }
  69. return s;
  70. }
  71. // This is only supported for SST files. If the file is present locally,
  72. // i.e in the destination dir, we just open it and return. If its in the
  73. // remote, i.e source dir, we link it locally and open the link.
  74. // TODO: Add support for blob files belonging to the new BlobDB
  75. IOStatus OnDemandFileSystem::NewRandomAccessFile(
  76. const std::string& fname, const FileOptions& file_opts,
  77. std::unique_ptr<FSRandomAccessFile>* result, IODebugContext* dbg) {
  78. FileType type;
  79. if (!LookupFileType(fname, &type) || type != kTableFile) {
  80. return IOStatus::NotSupported();
  81. }
  82. IOStatus s = target()->FileExists(fname, file_opts.io_options, nullptr);
  83. if (s.IsNotFound() || s.IsPathNotFound()) {
  84. std::string rname = fname;
  85. if (CheckPathAndAdjust(local_path_, remote_path_, rname)) {
  86. // First clear any local directory cache as it may be out of date
  87. target()->DiscardCacheForDirectory(rname);
  88. s = target()->LinkFile(rname, fname, IOOptions(), nullptr);
  89. if (!s.ok()) {
  90. return s;
  91. }
  92. }
  93. }
  94. return s.ok() ? target()->NewRandomAccessFile(fname, file_opts, result, dbg)
  95. : s;
  96. }
  97. // We don't expect to create any writable file other than info LOG files.
  98. IOStatus OnDemandFileSystem::NewWritableFile(
  99. const std::string& fname, const FileOptions& file_opts,
  100. std::unique_ptr<FSWritableFile>* result, IODebugContext* dbg) {
  101. FileType type;
  102. if (!LookupFileType(fname, &type) || type != kInfoLogFile) {
  103. return IOStatus::NotSupported();
  104. }
  105. std::string rname = fname;
  106. if (CheckPathAndAdjust(local_path_, remote_path_, rname)) {
  107. // First clear any local directory cache as it may be out of date
  108. target()->DiscardCacheForDirectory(rname);
  109. IOStatus s = target()->FileExists(rname, file_opts.io_options, dbg);
  110. if (s.ok()) {
  111. return IOStatus::InvalidArgument(
  112. "Writing to a file present in the remote directory not supoprted");
  113. }
  114. }
  115. return target()->NewWritableFile(fname, file_opts, result, dbg);
  116. }
  117. // Currently not supported, as there's no need for RocksDB to create a
  118. // directory object for a DB in read-only mode.
  119. IOStatus OnDemandFileSystem::NewDirectory(
  120. const std::string& /*name*/, const IOOptions& /*io_opts*/,
  121. std::unique_ptr<FSDirectory>* /*result*/, IODebugContext* /*dbg*/) {
  122. return IOStatus::NotSupported();
  123. }
  124. // Check if the given file exists, either locally or remote. If the file is an
  125. // SST file, then link it locally. We assume if the file existence is being
  126. // checked, its for verification purposes, for example while replaying the
  127. // MANIFEST. The file will be opened for reading some time in the future.
  128. IOStatus OnDemandFileSystem::FileExists(const std::string& fname,
  129. const IOOptions& options,
  130. IODebugContext* dbg) {
  131. IOStatus s = target()->FileExists(fname, options, dbg);
  132. if (!s.IsNotFound() && !s.IsPathNotFound()) {
  133. return s;
  134. }
  135. std::string rname = fname;
  136. if (CheckPathAndAdjust(local_path_, remote_path_, rname)) {
  137. // First clear any local directory cache as it may be out of date
  138. target()->DiscardCacheForDirectory(rname);
  139. FileType type;
  140. if (LookupFileType(fname, &type) && type == kTableFile) {
  141. s = target()->LinkFile(rname, fname, options, dbg);
  142. } else {
  143. s = target()->FileExists(rname, options, dbg);
  144. }
  145. }
  146. return s;
  147. }
  148. // Doa listing of both the local and remote directories and merge the two.
  149. IOStatus OnDemandFileSystem::GetChildren(const std::string& dir,
  150. const IOOptions& options,
  151. std::vector<std::string>* result,
  152. IODebugContext* dbg) {
  153. std::string rdir = dir;
  154. IOStatus s = target()->GetChildren(dir, options, result, dbg);
  155. if (!s.ok() || !CheckPathAndAdjust(local_path_, remote_path_, rdir)) {
  156. return s;
  157. }
  158. std::vector<std::string> rchildren;
  159. // First clear any local directory cache as it may be out of date
  160. target()->DiscardCacheForDirectory(rdir);
  161. s = target()->GetChildren(rdir, options, &rchildren, dbg);
  162. if (s.ok()) {
  163. std::for_each(rchildren.begin(), rchildren.end(), [&](std::string& name) {
  164. // Adjust name
  165. (void)CheckPathAndAdjust(remote_path_, local_path_, name);
  166. });
  167. std::sort(result->begin(), result->end());
  168. std::sort(rchildren.begin(), rchildren.end());
  169. std::vector<std::string> output;
  170. output.reserve(result->size() + rchildren.size());
  171. std::set_union(result->begin(), result->end(), rchildren.begin(),
  172. rchildren.end(), std::back_inserter(output));
  173. *result = std::move(output);
  174. }
  175. return s;
  176. }
  177. // Doa listing of both the local and remote directories and merge the two.
  178. IOStatus OnDemandFileSystem::GetChildrenFileAttributes(
  179. const std::string& dir, const IOOptions& options,
  180. std::vector<FileAttributes>* result, IODebugContext* dbg) {
  181. std::string rdir = dir;
  182. IOStatus s = target()->GetChildrenFileAttributes(dir, options, result, dbg);
  183. if (!s.ok() || !CheckPathAndAdjust(local_path_, remote_path_, rdir)) {
  184. return s;
  185. }
  186. std::vector<FileAttributes> rchildren;
  187. // First clear any local directory cache as it may be out of date
  188. target()->DiscardCacheForDirectory(rdir);
  189. s = target()->GetChildrenFileAttributes(rdir, options, &rchildren, dbg);
  190. if (s.ok()) {
  191. struct FileAttributeSorter {
  192. bool operator()(const FileAttributes& lhs, const FileAttributes& rhs) {
  193. return lhs.name < rhs.name;
  194. }
  195. } file_attr_sorter;
  196. std::for_each(
  197. rchildren.begin(), rchildren.end(), [&](FileAttributes& file) {
  198. // Adjust name
  199. (void)CheckPathAndAdjust(remote_path_, local_path_, file.name);
  200. });
  201. std::sort(result->begin(), result->end(), file_attr_sorter);
  202. std::sort(rchildren.begin(), rchildren.end(), file_attr_sorter);
  203. std::vector<FileAttributes> output;
  204. output.reserve(result->size() + rchildren.size());
  205. std::set_union(rchildren.begin(), rchildren.end(), result->begin(),
  206. result->end(), std::back_inserter(output), file_attr_sorter);
  207. *result = std::move(output);
  208. }
  209. return s;
  210. }
  211. IOStatus OnDemandFileSystem::GetFileSize(const std::string& fname,
  212. const IOOptions& options,
  213. uint64_t* file_size,
  214. IODebugContext* dbg) {
  215. uint64_t local_size = 0;
  216. IOStatus s = target()->GetFileSize(fname, options, &local_size, dbg);
  217. if (!s.ok() && !s.IsNotFound() && !s.IsPathNotFound()) {
  218. return s;
  219. }
  220. if (s.IsNotFound() || s.IsPathNotFound()) {
  221. std::string rname = fname;
  222. if (CheckPathAndAdjust(local_path_, remote_path_, rname)) {
  223. // First clear any local directory cache as it may be out of date
  224. target()->DiscardCacheForDirectory(rname);
  225. FileType type;
  226. if (LookupFileType(fname, &type) && type == kTableFile) {
  227. s = target()->LinkFile(rname, fname, options, dbg);
  228. if (s.ok()) {
  229. s = target()->GetFileSize(fname, options, &local_size, dbg);
  230. }
  231. } else {
  232. s = target()->GetFileSize(rname, options, &local_size, dbg);
  233. }
  234. }
  235. }
  236. *file_size = local_size;
  237. return s;
  238. }
  239. // An implementation of Read that tracks whether we've reached EOF. If so,
  240. // re-open the file to try to read past the previous EOF offset. After
  241. // re-opening, positing it back to the last read offset.
  242. IOStatus OnDemandSequentialFile::Read(size_t n, const IOOptions& options,
  243. Slice* result, char* scratch,
  244. IODebugContext* dbg) {
  245. IOStatus s;
  246. if (eof_) {
  247. // Reopen the file. With some distributed file systems, this is required
  248. // in order to get the new size
  249. file_.reset();
  250. s = fs_->NewSequentialFile(path_, file_opts_, &file_, dbg);
  251. if (!s.ok()) {
  252. return IOStatus::IOError("While opening file after relinking, got error ",
  253. s.ToString());
  254. }
  255. s = file_->Skip(offset_);
  256. if (!s.ok()) {
  257. return IOStatus::IOError(
  258. "While seeking to offset" + std::to_string(offset_) + "got error",
  259. s.ToString());
  260. }
  261. eof_ = false;
  262. }
  263. s = file_->Read(n, options, result, scratch, dbg);
  264. if (s.ok()) {
  265. offset_ += result->size();
  266. if (result->size() < n) {
  267. // We reached EOF. Mark it so we know to relink next time
  268. eof_ = true;
  269. }
  270. }
  271. return s;
  272. }
  273. IOStatus OnDemandSequentialFile::Skip(uint64_t n) {
  274. IOStatus s = file_->Skip(n);
  275. if (s.ok()) {
  276. offset_ += n;
  277. }
  278. return s;
  279. }
  280. bool OnDemandSequentialFile::use_direct_io() const {
  281. return file_->use_direct_io();
  282. }
  283. size_t OnDemandSequentialFile::GetRequiredBufferAlignment() const {
  284. return file_->GetRequiredBufferAlignment();
  285. }
  286. Temperature OnDemandSequentialFile::GetTemperature() const {
  287. return file_->GetTemperature();
  288. }
  289. std::shared_ptr<FileSystem> NewOnDemandFileSystem(
  290. const std::shared_ptr<FileSystem>& fs, std::string src_path,
  291. std::string dest_path) {
  292. return std::make_shared<OnDemandFileSystem>(fs, src_path, dest_path);
  293. }
  294. } // namespace ROCKSDB_NAMESPACE