env_librados.cc 36 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497
  1. // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
  2. // vim: ts=8 sw=2 smarttab
  3. // Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
  4. #include "rocksdb/utilities/env_librados.h"
  5. #include "util/random.h"
  6. #include <mutex>
  7. #include <cstdlib>
  8. namespace ROCKSDB_NAMESPACE {
  9. /* GLOBAL DIFINE */
  10. // #define DEBUG
  11. #ifdef DEBUG
  12. #include <cstdio>
  13. #include <sys/syscall.h>
  14. #include <unistd.h>
  15. #define LOG_DEBUG(...) do{\
  16. printf("[%ld:%s:%i:%s]", syscall(SYS_gettid), __FILE__, __LINE__, __FUNCTION__);\
  17. printf(__VA_ARGS__);\
  18. }while(0)
  19. #else
  20. #define LOG_DEBUG(...)
  21. #endif
  22. /* GLOBAL CONSTANT */
  23. const char *default_db_name = "default_envlibrados_db";
  24. const char *default_pool_name = "default_envlibrados_pool";
  25. const char *default_config_path = "CEPH_CONFIG_PATH"; // the env variable name of ceph configure file
  26. // maximum dir/file that can store in the fs
  27. const int MAX_ITEMS_IN_FS = 1 << 30;
  28. // root dir tag
  29. const std::string ROOT_DIR_KEY = "/";
  30. const std::string DIR_ID_VALUE = "<DIR>";
  31. /**
  32. * @brief convert error code to status
  33. * @details Convert internal linux error code to Status
  34. *
  35. * @param r [description]
  36. * @return [description]
  37. */
  38. Status err_to_status(int r)
  39. {
  40. switch (r) {
  41. case 0:
  42. return Status::OK();
  43. case -ENOENT:
  44. return Status::IOError();
  45. case -ENODATA:
  46. case -ENOTDIR:
  47. return Status::NotFound(Status::kNone);
  48. case -EINVAL:
  49. return Status::InvalidArgument(Status::kNone);
  50. case -EIO:
  51. return Status::IOError(Status::kNone);
  52. default:
  53. // FIXME :(
  54. assert(0 == "unrecognized error code");
  55. return Status::NotSupported(Status::kNone);
  56. }
  57. }
  58. /**
  59. * @brief split file path into dir path and file name
  60. * @details
  61. * Because rocksdb only need a 2-level structure (dir/file), all input path will be shortened to dir/file format
  62. * For example:
  63. * b/c => dir '/b', file 'c'
  64. * /a/b/c => dir '/b', file 'c'
  65. *
  66. * @param fn [description]
  67. * @param dir [description]
  68. * @param file [description]
  69. */
  70. void split(const std::string &fn, std::string *dir, std::string *file) {
  71. LOG_DEBUG("[IN]%s\n", fn.c_str());
  72. int pos = fn.size() - 1;
  73. while ('/' == fn[pos]) --pos;
  74. size_t fstart = fn.rfind('/', pos);
  75. *file = fn.substr(fstart + 1, pos - fstart);
  76. pos = fstart;
  77. while (pos >= 0 && '/' == fn[pos]) --pos;
  78. if (pos < 0) {
  79. *dir = "/";
  80. } else {
  81. size_t dstart = fn.rfind('/', pos);
  82. *dir = fn.substr(dstart + 1, pos - dstart);
  83. *dir = std::string("/") + *dir;
  84. }
  85. LOG_DEBUG("[OUT]%s | %s\n", dir->c_str(), file->c_str());
  86. }
  87. // A file abstraction for reading sequentially through a file
  88. class LibradosSequentialFile : public SequentialFile {
  89. librados::IoCtx * _io_ctx;
  90. std::string _fid;
  91. std::string _hint;
  92. int _offset;
  93. public:
  94. LibradosSequentialFile(librados::IoCtx * io_ctx, std::string fid, std::string hint):
  95. _io_ctx(io_ctx), _fid(fid), _hint(hint), _offset(0) {}
  96. ~LibradosSequentialFile() {}
  97. /**
  98. * @brief read file
  99. * @details
  100. * Read up to "n" bytes from the file. "scratch[0..n-1]" may be
  101. * written by this routine. Sets "*result" to the data that was
  102. * read (including if fewer than "n" bytes were successfully read).
  103. * May set "*result" to point at data in "scratch[0..n-1]", so
  104. * "scratch[0..n-1]" must be live when "*result" is used.
  105. * If an error was encountered, returns a non-OK status.
  106. *
  107. * REQUIRES: External synchronization
  108. *
  109. * @param n [description]
  110. * @param result [description]
  111. * @param scratch [description]
  112. * @return [description]
  113. */
  114. Status Read(size_t n, Slice* result, char* scratch) {
  115. LOG_DEBUG("[IN]%i\n", (int)n);
  116. librados::bufferlist buffer;
  117. Status s;
  118. int r = _io_ctx->read(_fid, buffer, n, _offset);
  119. if (r >= 0) {
  120. buffer.begin().copy(r, scratch);
  121. *result = Slice(scratch, r);
  122. _offset += r;
  123. s = Status::OK();
  124. } else {
  125. s = err_to_status(r);
  126. if (s == Status::IOError()) {
  127. *result = Slice();
  128. s = Status::OK();
  129. }
  130. }
  131. LOG_DEBUG("[OUT]%s, %i, %s\n", s.ToString().c_str(), (int)r, buffer.c_str());
  132. return s;
  133. }
  134. /**
  135. * @brief skip "n" bytes from the file
  136. * @details
  137. * Skip "n" bytes from the file. This is guaranteed to be no
  138. * slower that reading the same data, but may be faster.
  139. *
  140. * If end of file is reached, skipping will stop at the end of the
  141. * file, and Skip will return OK.
  142. *
  143. * REQUIRES: External synchronization
  144. *
  145. * @param n [description]
  146. * @return [description]
  147. */
  148. Status Skip(uint64_t n) {
  149. _offset += n;
  150. return Status::OK();
  151. }
  152. /**
  153. * @brief noop
  154. * @details
  155. * rocksdb has it's own caching capabilities that we should be able to use,
  156. * without relying on a cache here. This can safely be a no-op.
  157. *
  158. * @param offset [description]
  159. * @param length [description]
  160. *
  161. * @return [description]
  162. */
  163. Status InvalidateCache(size_t offset, size_t length) {
  164. return Status::OK();
  165. }
  166. };
  167. // A file abstraction for randomly reading the contents of a file.
  168. class LibradosRandomAccessFile : public RandomAccessFile {
  169. librados::IoCtx * _io_ctx;
  170. std::string _fid;
  171. std::string _hint;
  172. public:
  173. LibradosRandomAccessFile(librados::IoCtx * io_ctx, std::string fid, std::string hint):
  174. _io_ctx(io_ctx), _fid(fid), _hint(hint) {}
  175. ~LibradosRandomAccessFile() {}
  176. /**
  177. * @brief read file
  178. * @details similar to LibradosSequentialFile::Read
  179. *
  180. * @param offset [description]
  181. * @param n [description]
  182. * @param result [description]
  183. * @param scratch [description]
  184. * @return [description]
  185. */
  186. Status Read(uint64_t offset, size_t n, Slice* result,
  187. char* scratch) const {
  188. LOG_DEBUG("[IN]%i\n", (int)n);
  189. librados::bufferlist buffer;
  190. Status s;
  191. int r = _io_ctx->read(_fid, buffer, n, offset);
  192. if (r >= 0) {
  193. buffer.begin().copy(r, scratch);
  194. *result = Slice(scratch, r);
  195. s = Status::OK();
  196. } else {
  197. s = err_to_status(r);
  198. if (s == Status::IOError()) {
  199. *result = Slice();
  200. s = Status::OK();
  201. }
  202. }
  203. LOG_DEBUG("[OUT]%s, %i, %s\n", s.ToString().c_str(), (int)r, buffer.c_str());
  204. return s;
  205. }
  206. /**
  207. * @brief [brief description]
  208. * @details Get unique id for each file and guarantee this id is different for each file
  209. *
  210. * @param id [description]
  211. * @param max_size max size of id, it shoud be larger than 16
  212. *
  213. * @return [description]
  214. */
  215. size_t GetUniqueId(char* id, size_t max_size) const {
  216. // All fid has the same db_id prefix, so we need to ignore db_id prefix
  217. size_t s = std::min(max_size, _fid.size());
  218. strncpy(id, _fid.c_str() + (_fid.size() - s), s);
  219. id[s - 1] = '\0';
  220. return s;
  221. };
  222. //enum AccessPattern { NORMAL, RANDOM, SEQUENTIAL, WILLNEED, DONTNEED };
  223. void Hint(AccessPattern pattern) {
  224. /* Do nothing */
  225. }
  226. /**
  227. * @brief noop
  228. * @details [long description]
  229. *
  230. * @param offset [description]
  231. * @param length [description]
  232. *
  233. * @return [description]
  234. */
  235. Status InvalidateCache(size_t offset, size_t length) {
  236. return Status::OK();
  237. }
  238. };
  239. // A file abstraction for sequential writing. The implementation
  240. // must provide buffering since callers may append small fragments
  241. // at a time to the file.
  242. class LibradosWritableFile : public WritableFile {
  243. librados::IoCtx * _io_ctx;
  244. std::string _fid;
  245. std::string _hint;
  246. const EnvLibrados * const _env;
  247. std::mutex _mutex; // used to protect modification of all following variables
  248. librados::bufferlist _buffer; // write buffer
  249. uint64_t _buffer_size; // write buffer size
  250. uint64_t _file_size; // this file size doesn't include buffer size
  251. /**
  252. * @brief assuming caller holds lock
  253. * @details [long description]
  254. * @return [description]
  255. */
  256. int _SyncLocked() {
  257. // 1. sync append data to RADOS
  258. int r = _io_ctx->append(_fid, _buffer, _buffer_size);
  259. assert(r >= 0);
  260. // 2. update local variables
  261. if (0 == r) {
  262. _buffer.clear();
  263. _file_size += _buffer_size;
  264. _buffer_size = 0;
  265. }
  266. return r;
  267. }
  268. public:
  269. LibradosWritableFile(librados::IoCtx* io_ctx, std::string fid,
  270. std::string hint, const EnvLibrados* const env,
  271. const EnvOptions& options)
  272. : WritableFile(options),
  273. _io_ctx(io_ctx),
  274. _fid(fid),
  275. _hint(hint),
  276. _env(env),
  277. _buffer(),
  278. _buffer_size(0),
  279. _file_size(0) {
  280. int ret = _io_ctx->stat(_fid, &_file_size, nullptr);
  281. // if file not exist
  282. if (ret < 0) {
  283. _file_size = 0;
  284. }
  285. }
  286. ~LibradosWritableFile() {
  287. // sync before closeing writable file
  288. Sync();
  289. }
  290. /**
  291. * @brief append data to file
  292. * @details
  293. * Append will save all written data in buffer util buffer size
  294. * reaches buffer max size. Then, it will write buffer into rados
  295. *
  296. * @param data [description]
  297. * @return [description]
  298. */
  299. Status Append(const Slice& data) {
  300. // append buffer
  301. LOG_DEBUG("[IN] %i | %s\n", (int)data.size(), data.data());
  302. int r = 0;
  303. std::lock_guard<std::mutex> lock(_mutex);
  304. _buffer.append(data.data(), data.size());
  305. _buffer_size += data.size();
  306. if (_buffer_size > _env->_write_buffer_size) {
  307. r = _SyncLocked();
  308. }
  309. LOG_DEBUG("[OUT] %i\n", r);
  310. return err_to_status(r);
  311. }
  312. /**
  313. * @brief not supported
  314. * @details [long description]
  315. * @return [description]
  316. */
  317. Status PositionedAppend(
  318. const Slice& /* data */,
  319. uint64_t /* offset */) {
  320. return Status::NotSupported();
  321. }
  322. /**
  323. * @brief truncate file to assigned size
  324. * @details [long description]
  325. *
  326. * @param size [description]
  327. * @return [description]
  328. */
  329. Status Truncate(uint64_t size) {
  330. LOG_DEBUG("[IN]%lld|%lld|%lld\n", (long long)size, (long long)_file_size, (long long)_buffer_size);
  331. int r = 0;
  332. std::lock_guard<std::mutex> lock(_mutex);
  333. if (_file_size > size) {
  334. r = _io_ctx->trunc(_fid, size);
  335. if (r == 0) {
  336. _buffer.clear();
  337. _buffer_size = 0;
  338. _file_size = size;
  339. }
  340. } else if (_file_size == size) {
  341. _buffer.clear();
  342. _buffer_size = 0;
  343. } else {
  344. librados::bufferlist tmp;
  345. tmp.claim(_buffer);
  346. _buffer.substr_of(tmp, 0, size - _file_size);
  347. _buffer_size = size - _file_size;
  348. }
  349. LOG_DEBUG("[OUT] %i\n", r);
  350. return err_to_status(r);
  351. }
  352. /**
  353. * @brief close file
  354. * @details [long description]
  355. * @return [description]
  356. */
  357. Status Close() {
  358. LOG_DEBUG("%s | %lld | %lld\n", _hint.c_str(), (long long)_buffer_size, (long long)_file_size);
  359. return Sync();
  360. }
  361. /**
  362. * @brief flush file,
  363. * @details initiate an aio write and not wait
  364. *
  365. * @return [description]
  366. */
  367. Status Flush() {
  368. librados::AioCompletion *write_completion = librados::Rados::aio_create_completion();
  369. int r = 0;
  370. std::lock_guard<std::mutex> lock(_mutex);
  371. r = _io_ctx->aio_append(_fid, write_completion, _buffer, _buffer_size);
  372. if (0 == r) {
  373. _file_size += _buffer_size;
  374. _buffer.clear();
  375. _buffer_size = 0;
  376. }
  377. write_completion->release();
  378. return err_to_status(r);
  379. }
  380. /**
  381. * @brief write buffer data to rados
  382. * @details initiate an aio write and wait for result
  383. * @return [description]
  384. */
  385. Status Sync() { // sync data
  386. int r = 0;
  387. std::lock_guard<std::mutex> lock(_mutex);
  388. if (_buffer_size > 0) {
  389. r = _SyncLocked();
  390. }
  391. return err_to_status(r);
  392. }
  393. /**
  394. * @brief [brief description]
  395. * @details [long description]
  396. * @return true if Sync() and Fsync() are safe to call concurrently with Append()and Flush().
  397. */
  398. bool IsSyncThreadSafe() const {
  399. return true;
  400. }
  401. /**
  402. * @brief Indicates the upper layers if the current WritableFile implementation uses direct IO.
  403. * @details [long description]
  404. * @return [description]
  405. */
  406. bool use_direct_io() const {
  407. return false;
  408. }
  409. /**
  410. * @brief Get file size
  411. * @details
  412. * This API will use cached file_size.
  413. * @return [description]
  414. */
  415. uint64_t GetFileSize() {
  416. LOG_DEBUG("%lld|%lld\n", (long long)_buffer_size, (long long)_file_size);
  417. std::lock_guard<std::mutex> lock(_mutex);
  418. int file_size = _file_size + _buffer_size;
  419. return file_size;
  420. }
  421. /**
  422. * @brief For documentation, refer to RandomAccessFile::GetUniqueId()
  423. * @details [long description]
  424. *
  425. * @param id [description]
  426. * @param max_size [description]
  427. *
  428. * @return [description]
  429. */
  430. size_t GetUniqueId(char* id, size_t max_size) const {
  431. // All fid has the same db_id prefix, so we need to ignore db_id prefix
  432. size_t s = std::min(max_size, _fid.size());
  433. strncpy(id, _fid.c_str() + (_fid.size() - s), s);
  434. id[s - 1] = '\0';
  435. return s;
  436. }
  437. /**
  438. * @brief noop
  439. * @details [long description]
  440. *
  441. * @param offset [description]
  442. * @param length [description]
  443. *
  444. * @return [description]
  445. */
  446. Status InvalidateCache(size_t offset, size_t length) {
  447. return Status::OK();
  448. }
  449. using WritableFile::RangeSync;
  450. /**
  451. * @brief No RangeSync support, just call Sync()
  452. * @details [long description]
  453. *
  454. * @param offset [description]
  455. * @param nbytes [description]
  456. *
  457. * @return [description]
  458. */
  459. Status RangeSync(off_t offset, off_t nbytes) {
  460. return Sync();
  461. }
  462. protected:
  463. using WritableFile::Allocate;
  464. /**
  465. * @brief noop
  466. * @details [long description]
  467. *
  468. * @param offset [description]
  469. * @param len [description]
  470. *
  471. * @return [description]
  472. */
  473. Status Allocate(off_t offset, off_t len) {
  474. return Status::OK();
  475. }
  476. };
  477. // Directory object represents collection of files and implements
  478. // filesystem operations that can be executed on directories.
  479. class LibradosDirectory : public Directory {
  480. librados::IoCtx * _io_ctx;
  481. std::string _fid;
  482. public:
  483. explicit LibradosDirectory(librados::IoCtx * io_ctx, std::string fid):
  484. _io_ctx(io_ctx), _fid(fid) {}
  485. // Fsync directory. Can be called concurrently from multiple threads.
  486. Status Fsync() {
  487. return Status::OK();
  488. }
  489. };
  490. // Identifies a locked file.
  491. // This is exclusive lock and can't nested lock by same thread
  492. class LibradosFileLock : public FileLock {
  493. librados::IoCtx * _io_ctx;
  494. const std::string _obj_name;
  495. const std::string _lock_name;
  496. const std::string _cookie;
  497. int lock_state;
  498. public:
  499. LibradosFileLock(
  500. librados::IoCtx * io_ctx,
  501. const std::string obj_name):
  502. _io_ctx(io_ctx),
  503. _obj_name(obj_name),
  504. _lock_name("lock_name"),
  505. _cookie("cookie") {
  506. // TODO: the lock will never expire. It may cause problem if the process crash or abnormally exit.
  507. while (!_io_ctx->lock_exclusive(
  508. _obj_name,
  509. _lock_name,
  510. _cookie,
  511. "description", nullptr, 0));
  512. }
  513. ~LibradosFileLock() {
  514. _io_ctx->unlock(_obj_name, _lock_name, _cookie);
  515. }
  516. };
  517. // --------------------
  518. // --- EnvLibrados ----
  519. // --------------------
  520. /**
  521. * @brief EnvLibrados ctor
  522. * @details [long description]
  523. *
  524. * @param db_name unique database name
  525. * @param config_path the configure file path for rados
  526. */
  527. EnvLibrados::EnvLibrados(const std::string& db_name,
  528. const std::string& config_path,
  529. const std::string& db_pool)
  530. : EnvLibrados("client.admin",
  531. "ceph",
  532. 0,
  533. db_name,
  534. config_path,
  535. db_pool,
  536. "/wal",
  537. db_pool,
  538. 1 << 20) {}
  539. /**
  540. * @brief EnvLibrados ctor
  541. * @details [long description]
  542. *
  543. * @param client_name first 3 parameters is for RADOS client init
  544. * @param cluster_name
  545. * @param flags
  546. * @param db_name unique database name, used as db_id key
  547. * @param config_path the configure file path for rados
  548. * @param db_pool the pool for db data
  549. * @param wal_pool the pool for WAL data
  550. * @param write_buffer_size WritableFile buffer max size
  551. */
  552. EnvLibrados::EnvLibrados(const std::string& client_name,
  553. const std::string& cluster_name,
  554. const uint64_t flags,
  555. const std::string& db_name,
  556. const std::string& config_path,
  557. const std::string& db_pool,
  558. const std::string& wal_dir,
  559. const std::string& wal_pool,
  560. const uint64_t write_buffer_size)
  561. : EnvWrapper(Env::Default()),
  562. _client_name(client_name),
  563. _cluster_name(cluster_name),
  564. _flags(flags),
  565. _db_name(db_name),
  566. _config_path(config_path),
  567. _db_pool_name(db_pool),
  568. _wal_dir(wal_dir),
  569. _wal_pool_name(wal_pool),
  570. _write_buffer_size(write_buffer_size) {
  571. int ret = 0;
  572. // 1. create a Rados object and initialize it
  573. ret = _rados.init2(_client_name.c_str(), _cluster_name.c_str(), _flags); // just use the client.admin keyring
  574. if (ret < 0) { // let's handle any error that might have come back
  575. std::cerr << "couldn't initialize rados! error " << ret << std::endl;
  576. ret = EXIT_FAILURE;
  577. goto out;
  578. }
  579. // 2. read configure file
  580. ret = _rados.conf_read_file(_config_path.c_str());
  581. if (ret < 0) {
  582. // This could fail if the config file is malformed, but it'd be hard.
  583. std::cerr << "failed to parse config file " << _config_path
  584. << "! error" << ret << std::endl;
  585. ret = EXIT_FAILURE;
  586. goto out;
  587. }
  588. // 3. we actually connect to the cluster
  589. ret = _rados.connect();
  590. if (ret < 0) {
  591. std::cerr << "couldn't connect to cluster! error " << ret << std::endl;
  592. ret = EXIT_FAILURE;
  593. goto out;
  594. }
  595. // 4. create db_pool if not exist
  596. ret = _rados.pool_create(_db_pool_name.c_str());
  597. if (ret < 0 && ret != -EEXIST && ret != -EPERM) {
  598. std::cerr << "couldn't create pool! error " << ret << std::endl;
  599. goto out;
  600. }
  601. // 5. create db_pool_ioctx
  602. ret = _rados.ioctx_create(_db_pool_name.c_str(), _db_pool_ioctx);
  603. if (ret < 0) {
  604. std::cerr << "couldn't set up ioctx! error " << ret << std::endl;
  605. ret = EXIT_FAILURE;
  606. goto out;
  607. }
  608. // 6. create wal_pool if not exist
  609. ret = _rados.pool_create(_wal_pool_name.c_str());
  610. if (ret < 0 && ret != -EEXIST && ret != -EPERM) {
  611. std::cerr << "couldn't create pool! error " << ret << std::endl;
  612. goto out;
  613. }
  614. // 7. create wal_pool_ioctx
  615. ret = _rados.ioctx_create(_wal_pool_name.c_str(), _wal_pool_ioctx);
  616. if (ret < 0) {
  617. std::cerr << "couldn't set up ioctx! error " << ret << std::endl;
  618. ret = EXIT_FAILURE;
  619. goto out;
  620. }
  621. // 8. add root dir
  622. _AddFid(ROOT_DIR_KEY, DIR_ID_VALUE);
  623. out:
  624. LOG_DEBUG("rados connect result code : %i\n", ret);
  625. }
  626. /****************************************************
  627. private functions to handle fid operation.
  628. Dir also have fid, but the value is DIR_ID_VALUE
  629. ****************************************************/
  630. /**
  631. * @brief generate a new fid
  632. * @details [long description]
  633. * @return [description]
  634. */
  635. std::string EnvLibrados::_CreateFid() {
  636. return _db_name + "." + GenerateUniqueId();
  637. }
  638. /**
  639. * @brief get fid
  640. * @details [long description]
  641. *
  642. * @param fname [description]
  643. * @param fid [description]
  644. *
  645. * @return
  646. * Status::OK()
  647. * Status::NotFound()
  648. */
  649. Status EnvLibrados::_GetFid(
  650. const std::string &fname,
  651. std::string& fid) {
  652. std::set<std::string> keys;
  653. std::map<std::string, librados::bufferlist> kvs;
  654. keys.insert(fname);
  655. int r = _db_pool_ioctx.omap_get_vals_by_keys(_db_name, keys, &kvs);
  656. if (0 == r && 0 == kvs.size()) {
  657. return Status::NotFound();
  658. } else if (0 == r && 0 != kvs.size()) {
  659. fid.assign(kvs[fname].c_str(), kvs[fname].length());
  660. return Status::OK();
  661. } else {
  662. return err_to_status(r);
  663. }
  664. }
  665. /**
  666. * @brief rename fid
  667. * @details Only modify object in rados once,
  668. * so this rename operation is atomic in term of rados
  669. *
  670. * @param old_fname [description]
  671. * @param new_fname [description]
  672. *
  673. * @return [description]
  674. */
  675. Status EnvLibrados::_RenameFid(const std::string& old_fname,
  676. const std::string& new_fname) {
  677. std::string fid;
  678. Status s = _GetFid(old_fname, fid);
  679. if (Status::OK() != s) {
  680. return s;
  681. }
  682. librados::bufferlist bl;
  683. std::set<std::string> keys;
  684. std::map<std::string, librados::bufferlist> kvs;
  685. librados::ObjectWriteOperation o;
  686. bl.append(fid);
  687. keys.insert(old_fname);
  688. kvs[new_fname] = bl;
  689. o.omap_rm_keys(keys);
  690. o.omap_set(kvs);
  691. int r = _db_pool_ioctx.operate(_db_name, &o);
  692. return err_to_status(r);
  693. }
  694. /**
  695. * @brief add <file path, fid> to metadata object. It may overwrite exist key.
  696. * @details [long description]
  697. *
  698. * @param fname [description]
  699. * @param fid [description]
  700. *
  701. * @return [description]
  702. */
  703. Status EnvLibrados::_AddFid(
  704. const std::string& fname,
  705. const std::string& fid) {
  706. std::map<std::string, librados::bufferlist> kvs;
  707. librados::bufferlist value;
  708. value.append(fid);
  709. kvs[fname] = value;
  710. int r = _db_pool_ioctx.omap_set(_db_name, kvs);
  711. return err_to_status(r);
  712. }
  713. /**
  714. * @brief return subfile names of dir.
  715. * @details
  716. * RocksDB has a 2-level structure, so all keys
  717. * that have dir as prefix are subfiles of dir.
  718. * So we can just return these files' name.
  719. *
  720. * @param dir [description]
  721. * @param result [description]
  722. *
  723. * @return [description]
  724. */
  725. Status EnvLibrados::_GetSubFnames(
  726. const std::string& dir,
  727. std::vector<std::string> * result
  728. ) {
  729. std::string start_after(dir);
  730. std::string filter_prefix(dir);
  731. std::map<std::string, librados::bufferlist> kvs;
  732. _db_pool_ioctx.omap_get_vals(_db_name,
  733. start_after, filter_prefix,
  734. MAX_ITEMS_IN_FS, &kvs);
  735. result->clear();
  736. for (auto i = kvs.begin(); i != kvs.end(); i++) {
  737. result->push_back(i->first.substr(dir.size() + 1));
  738. }
  739. return Status::OK();
  740. }
  741. /**
  742. * @brief delete key fname from metadata object
  743. * @details [long description]
  744. *
  745. * @param fname [description]
  746. * @return [description]
  747. */
  748. Status EnvLibrados::_DelFid(
  749. const std::string& fname) {
  750. std::set<std::string> keys;
  751. keys.insert(fname);
  752. int r = _db_pool_ioctx.omap_rm_keys(_db_name, keys);
  753. return err_to_status(r);
  754. }
  755. /**
  756. * @brief get match IoCtx from _prefix_pool_map
  757. * @details [long description]
  758. *
  759. * @param prefix [description]
  760. * @return [description]
  761. *
  762. */
  763. librados::IoCtx* EnvLibrados::_GetIoctx(const std::string& fpath) {
  764. auto is_prefix = [](const std::string & s1, const std::string & s2) {
  765. auto it1 = s1.begin(), it2 = s2.begin();
  766. while (it1 != s1.end() && it2 != s2.end() && *it1 == *it2) ++it1, ++it2;
  767. return it1 == s1.end();
  768. };
  769. if (is_prefix(_wal_dir, fpath)) {
  770. return &_wal_pool_ioctx;
  771. } else {
  772. return &_db_pool_ioctx;
  773. }
  774. }
  775. /************************************************************
  776. public functions
  777. ************************************************************/
  778. /**
  779. * @brief generate unique id
  780. * @details Combine system time and random number.
  781. * @return [description]
  782. */
  783. std::string EnvLibrados::GenerateUniqueId() {
  784. Random64 r(time(nullptr));
  785. uint64_t random_uuid_portion =
  786. r.Uniform(std::numeric_limits<uint64_t>::max());
  787. uint64_t nanos_uuid_portion = NowNanos();
  788. char uuid2[200];
  789. snprintf(uuid2,
  790. 200,
  791. "%16lx-%16lx",
  792. (unsigned long)nanos_uuid_portion,
  793. (unsigned long)random_uuid_portion);
  794. return uuid2;
  795. }
  796. /**
  797. * @brief create a new sequential read file handler
  798. * @details it will check the existence of fname
  799. *
  800. * @param fname [description]
  801. * @param result [description]
  802. * @param options [description]
  803. * @return [description]
  804. */
  805. Status EnvLibrados::NewSequentialFile(
  806. const std::string& fname,
  807. std::unique_ptr<SequentialFile>* result,
  808. const EnvOptions& options)
  809. {
  810. LOG_DEBUG("[IN]%s\n", fname.c_str());
  811. std::string dir, file, fid;
  812. split(fname, &dir, &file);
  813. Status s;
  814. std::string fpath = dir + "/" + file;
  815. do {
  816. s = _GetFid(dir, fid);
  817. if (!s.ok() || fid != DIR_ID_VALUE) {
  818. if (fid != DIR_ID_VALUE) s = Status::IOError();
  819. break;
  820. }
  821. s = _GetFid(fpath, fid);
  822. if (Status::NotFound() == s) {
  823. s = Status::IOError();
  824. errno = ENOENT;
  825. break;
  826. }
  827. result->reset(new LibradosSequentialFile(_GetIoctx(fpath), fid, fpath));
  828. s = Status::OK();
  829. } while (0);
  830. LOG_DEBUG("[OUT]%s\n", s.ToString().c_str());
  831. return s;
  832. }
  833. /**
  834. * @brief create a new random access file handler
  835. * @details it will check the existence of fname
  836. *
  837. * @param fname [description]
  838. * @param result [description]
  839. * @param options [description]
  840. * @return [description]
  841. */
  842. Status EnvLibrados::NewRandomAccessFile(
  843. const std::string& fname,
  844. std::unique_ptr<RandomAccessFile>* result,
  845. const EnvOptions& options)
  846. {
  847. LOG_DEBUG("[IN]%s\n", fname.c_str());
  848. std::string dir, file, fid;
  849. split(fname, &dir, &file);
  850. Status s;
  851. std::string fpath = dir + "/" + file;
  852. do {
  853. s = _GetFid(dir, fid);
  854. if (!s.ok() || fid != DIR_ID_VALUE) {
  855. s = Status::IOError();
  856. break;
  857. }
  858. s = _GetFid(fpath, fid);
  859. if (Status::NotFound() == s) {
  860. s = Status::IOError();
  861. errno = ENOENT;
  862. break;
  863. }
  864. result->reset(new LibradosRandomAccessFile(_GetIoctx(fpath), fid, fpath));
  865. s = Status::OK();
  866. } while (0);
  867. LOG_DEBUG("[OUT]%s\n", s.ToString().c_str());
  868. return s;
  869. }
  870. /**
  871. * @brief create a new write file handler
  872. * @details it will check the existence of fname
  873. *
  874. * @param fname [description]
  875. * @param result [description]
  876. * @param options [description]
  877. * @return [description]
  878. */
  879. Status EnvLibrados::NewWritableFile(
  880. const std::string& fname,
  881. std::unique_ptr<WritableFile>* result,
  882. const EnvOptions& options)
  883. {
  884. LOG_DEBUG("[IN]%s\n", fname.c_str());
  885. std::string dir, file, fid;
  886. split(fname, &dir, &file);
  887. Status s;
  888. std::string fpath = dir + "/" + file;
  889. do {
  890. // 1. check if dir exist
  891. s = _GetFid(dir, fid);
  892. if (!s.ok()) {
  893. break;
  894. }
  895. if (fid != DIR_ID_VALUE) {
  896. s = Status::IOError();
  897. break;
  898. }
  899. // 2. check if file exist.
  900. // 2.1 exist, use it
  901. // 2.2 not exist, create it
  902. s = _GetFid(fpath, fid);
  903. if (Status::NotFound() == s) {
  904. fid = _CreateFid();
  905. _AddFid(fpath, fid);
  906. }
  907. result->reset(
  908. new LibradosWritableFile(_GetIoctx(fpath), fid, fpath, this, options));
  909. s = Status::OK();
  910. } while (0);
  911. LOG_DEBUG("[OUT]%s\n", s.ToString().c_str());
  912. return s;
  913. }
  914. /**
  915. * @brief reuse write file handler
  916. * @details
  917. * This function will rename old_fname to new_fname,
  918. * then return the handler of new_fname
  919. *
  920. * @param new_fname [description]
  921. * @param old_fname [description]
  922. * @param result [description]
  923. * @param options [description]
  924. * @return [description]
  925. */
  926. Status EnvLibrados::ReuseWritableFile(
  927. const std::string& new_fname,
  928. const std::string& old_fname,
  929. std::unique_ptr<WritableFile>* result,
  930. const EnvOptions& options)
  931. {
  932. LOG_DEBUG("[IN]%s => %s\n", old_fname.c_str(), new_fname.c_str());
  933. std::string src_fid, tmp_fid, src_dir, src_file, dst_dir, dst_file;
  934. split(old_fname, &src_dir, &src_file);
  935. split(new_fname, &dst_dir, &dst_file);
  936. std::string src_fpath = src_dir + "/" + src_file;
  937. std::string dst_fpath = dst_dir + "/" + dst_file;
  938. Status r = Status::OK();
  939. do {
  940. r = _RenameFid(src_fpath,
  941. dst_fpath);
  942. if (!r.ok()) {
  943. break;
  944. }
  945. result->reset(new LibradosWritableFile(_GetIoctx(dst_fpath), src_fid,
  946. dst_fpath, this, options));
  947. } while (0);
  948. LOG_DEBUG("[OUT]%s\n", r.ToString().c_str());
  949. return r;
  950. }
  951. /**
  952. * @brief create a new directory handler
  953. * @details [long description]
  954. *
  955. * @param name [description]
  956. * @param result [description]
  957. *
  958. * @return [description]
  959. */
  960. Status EnvLibrados::NewDirectory(
  961. const std::string& name,
  962. std::unique_ptr<Directory>* result)
  963. {
  964. LOG_DEBUG("[IN]%s\n", name.c_str());
  965. std::string fid, dir, file;
  966. /* just want to get dir name */
  967. split(name + "/tmp", &dir, &file);
  968. Status s;
  969. do {
  970. s = _GetFid(dir, fid);
  971. if (!s.ok() || DIR_ID_VALUE != fid) {
  972. s = Status::IOError(name, strerror(-ENOENT));
  973. break;
  974. }
  975. if (Status::NotFound() == s) {
  976. s = _AddFid(dir, DIR_ID_VALUE);
  977. if (!s.ok()) break;
  978. } else if (!s.ok()) {
  979. break;
  980. }
  981. result->reset(new LibradosDirectory(_GetIoctx(dir), dir));
  982. s = Status::OK();
  983. } while (0);
  984. LOG_DEBUG("[OUT]%s\n", s.ToString().c_str());
  985. return s;
  986. }
  987. /**
  988. * @brief check if fname is exist
  989. * @details [long description]
  990. *
  991. * @param fname [description]
  992. * @return [description]
  993. */
  994. Status EnvLibrados::FileExists(const std::string& fname)
  995. {
  996. LOG_DEBUG("[IN]%s\n", fname.c_str());
  997. std::string fid, dir, file;
  998. split(fname, &dir, &file);
  999. Status s = _GetFid(dir + "/" + file, fid);
  1000. if (s.ok() && fid != DIR_ID_VALUE) {
  1001. s = Status::OK();
  1002. }
  1003. LOG_DEBUG("[OUT]%s\n", s.ToString().c_str());
  1004. return s;
  1005. }
  1006. /**
  1007. * @brief get subfile name of dir_in
  1008. * @details [long description]
  1009. *
  1010. * @param dir_in [description]
  1011. * @param result [description]
  1012. *
  1013. * @return [description]
  1014. */
  1015. Status EnvLibrados::GetChildren(
  1016. const std::string& dir_in,
  1017. std::vector<std::string>* result)
  1018. {
  1019. LOG_DEBUG("[IN]%s\n", dir_in.c_str());
  1020. std::string fid, dir, file;
  1021. split(dir_in + "/temp", &dir, &file);
  1022. Status s;
  1023. do {
  1024. s = _GetFid(dir, fid);
  1025. if (!s.ok()) {
  1026. break;
  1027. }
  1028. if (fid != DIR_ID_VALUE) {
  1029. s = Status::IOError();
  1030. break;
  1031. }
  1032. s = _GetSubFnames(dir, result);
  1033. } while (0);
  1034. LOG_DEBUG("[OUT]%s\n", s.ToString().c_str());
  1035. return s;
  1036. }
  1037. /**
  1038. * @brief delete fname
  1039. * @details [long description]
  1040. *
  1041. * @param fname [description]
  1042. * @return [description]
  1043. */
  1044. Status EnvLibrados::DeleteFile(const std::string& fname)
  1045. {
  1046. LOG_DEBUG("[IN]%s\n", fname.c_str());
  1047. std::string fid, dir, file;
  1048. split(fname, &dir, &file);
  1049. Status s = _GetFid(dir + "/" + file, fid);
  1050. if (s.ok() && DIR_ID_VALUE != fid) {
  1051. s = _DelFid(dir + "/" + file);
  1052. } else {
  1053. s = Status::NotFound();
  1054. }
  1055. LOG_DEBUG("[OUT]%s\n", s.ToString().c_str());
  1056. return s;
  1057. }
  1058. /**
  1059. * @brief create new dir
  1060. * @details [long description]
  1061. *
  1062. * @param dirname [description]
  1063. * @return [description]
  1064. */
  1065. Status EnvLibrados::CreateDir(const std::string& dirname)
  1066. {
  1067. LOG_DEBUG("[IN]%s\n", dirname.c_str());
  1068. std::string fid, dir, file;
  1069. split(dirname + "/temp", &dir, &file);
  1070. Status s = _GetFid(dir + "/" + file, fid);
  1071. do {
  1072. if (Status::NotFound() != s && fid != DIR_ID_VALUE) {
  1073. break;
  1074. } else if (Status::OK() == s && fid == DIR_ID_VALUE) {
  1075. break;
  1076. }
  1077. s = _AddFid(dir, DIR_ID_VALUE);
  1078. } while (0);
  1079. LOG_DEBUG("[OUT]%s\n", s.ToString().c_str());
  1080. return s;
  1081. }
  1082. /**
  1083. * @brief create dir if missing
  1084. * @details [long description]
  1085. *
  1086. * @param dirname [description]
  1087. * @return [description]
  1088. */
  1089. Status EnvLibrados::CreateDirIfMissing(const std::string& dirname)
  1090. {
  1091. LOG_DEBUG("[IN]%s\n", dirname.c_str());
  1092. std::string fid, dir, file;
  1093. split(dirname + "/temp", &dir, &file);
  1094. Status s = Status::OK();
  1095. do {
  1096. s = _GetFid(dir, fid);
  1097. if (Status::NotFound() != s) {
  1098. break;
  1099. }
  1100. s = _AddFid(dir, DIR_ID_VALUE);
  1101. } while (0);
  1102. LOG_DEBUG("[OUT]%s\n", s.ToString().c_str());
  1103. return s;
  1104. }
  1105. /**
  1106. * @brief delete dir
  1107. * @details
  1108. *
  1109. * @param dirname [description]
  1110. * @return [description]
  1111. */
  1112. Status EnvLibrados::DeleteDir(const std::string& dirname)
  1113. {
  1114. LOG_DEBUG("[IN]%s\n", dirname.c_str());
  1115. std::string fid, dir, file;
  1116. split(dirname + "/temp", &dir, &file);
  1117. Status s = Status::OK();
  1118. s = _GetFid(dir, fid);
  1119. if (s.ok() && DIR_ID_VALUE == fid) {
  1120. std::vector<std::string> subs;
  1121. s = _GetSubFnames(dir, &subs);
  1122. // if subfiles exist, can't delete dir
  1123. if (subs.size() > 0) {
  1124. s = Status::IOError();
  1125. } else {
  1126. s = _DelFid(dir);
  1127. }
  1128. } else {
  1129. s = Status::NotFound();
  1130. }
  1131. LOG_DEBUG("[OUT]%s\n", s.ToString().c_str());
  1132. return s;
  1133. }
  1134. /**
  1135. * @brief return file size
  1136. * @details [long description]
  1137. *
  1138. * @param fname [description]
  1139. * @param file_size [description]
  1140. *
  1141. * @return [description]
  1142. */
  1143. Status EnvLibrados::GetFileSize(
  1144. const std::string& fname,
  1145. uint64_t* file_size)
  1146. {
  1147. LOG_DEBUG("[IN]%s\n", fname.c_str());
  1148. std::string fid, dir, file;
  1149. split(fname, &dir, &file);
  1150. time_t mtime;
  1151. Status s;
  1152. do {
  1153. std::string fpath = dir + "/" + file;
  1154. s = _GetFid(fpath, fid);
  1155. if (!s.ok()) {
  1156. break;
  1157. }
  1158. int ret = _GetIoctx(fpath)->stat(fid, file_size, &mtime);
  1159. if (ret < 0) {
  1160. LOG_DEBUG("%i\n", ret);
  1161. if (-ENOENT == ret) {
  1162. *file_size = 0;
  1163. s = Status::OK();
  1164. } else {
  1165. s = err_to_status(ret);
  1166. }
  1167. } else {
  1168. s = Status::OK();
  1169. }
  1170. } while (0);
  1171. LOG_DEBUG("[OUT]%s|%lld\n", s.ToString().c_str(), (long long)*file_size);
  1172. return s;
  1173. }
  1174. /**
  1175. * @brief get file modification time
  1176. * @details [long description]
  1177. *
  1178. * @param fname [description]
  1179. * @param file_mtime [description]
  1180. *
  1181. * @return [description]
  1182. */
  1183. Status EnvLibrados::GetFileModificationTime(const std::string& fname,
  1184. uint64_t* file_mtime)
  1185. {
  1186. LOG_DEBUG("[IN]%s\n", fname.c_str());
  1187. std::string fid, dir, file;
  1188. split(fname, &dir, &file);
  1189. time_t mtime;
  1190. uint64_t file_size;
  1191. Status s = Status::OK();
  1192. do {
  1193. std::string fpath = dir + "/" + file;
  1194. s = _GetFid(dir + "/" + file, fid);
  1195. if (!s.ok()) {
  1196. break;
  1197. }
  1198. int ret = _GetIoctx(fpath)->stat(fid, &file_size, &mtime);
  1199. if (ret < 0) {
  1200. if (Status::NotFound() == err_to_status(ret)) {
  1201. *file_mtime = static_cast<uint64_t>(mtime);
  1202. s = Status::OK();
  1203. } else {
  1204. s = err_to_status(ret);
  1205. }
  1206. } else {
  1207. s = Status::OK();
  1208. }
  1209. } while (0);
  1210. LOG_DEBUG("[OUT]%s\n", s.ToString().c_str());
  1211. return s;
  1212. }
  1213. /**
  1214. * @brief rename file
  1215. * @details
  1216. *
  1217. * @param src [description]
  1218. * @param target_in [description]
  1219. *
  1220. * @return [description]
  1221. */
  1222. Status EnvLibrados::RenameFile(
  1223. const std::string& src,
  1224. const std::string& target_in)
  1225. {
  1226. LOG_DEBUG("[IN]%s => %s\n", src.c_str(), target_in.c_str());
  1227. std::string src_fid, tmp_fid, src_dir, src_file, dst_dir, dst_file;
  1228. split(src, &src_dir, &src_file);
  1229. split(target_in, &dst_dir, &dst_file);
  1230. auto s = _RenameFid(src_dir + "/" + src_file,
  1231. dst_dir + "/" + dst_file);
  1232. LOG_DEBUG("[OUT]%s\n", s.ToString().c_str());
  1233. return s;
  1234. }
  1235. /**
  1236. * @brief not support
  1237. * @details [long description]
  1238. *
  1239. * @param src [description]
  1240. * @param target_in [description]
  1241. *
  1242. * @return [description]
  1243. */
  1244. Status EnvLibrados::LinkFile(
  1245. const std::string& src,
  1246. const std::string& target_in)
  1247. {
  1248. LOG_DEBUG("[IO]%s => %s\n", src.c_str(), target_in.c_str());
  1249. return Status::NotSupported();
  1250. }
  1251. /**
  1252. * @brief lock file. create if missing.
  1253. * @details [long description]
  1254. *
  1255. * It seems that LockFile is used for preventing other instance of RocksDB
  1256. * from opening up the database at the same time. From RocksDB source code,
  1257. * the invokes of LockFile are at following locations:
  1258. *
  1259. * ./db/db_impl.cc:1159: s = env_->LockFile(LockFileName(dbname_), &db_lock_); // DBImpl::Recover
  1260. * ./db/db_impl.cc:5839: Status result = env->LockFile(lockname, &lock); // Status DestroyDB
  1261. *
  1262. * When db recovery and db destroy, RocksDB will call LockFile
  1263. *
  1264. * @param fname [description]
  1265. * @param lock [description]
  1266. *
  1267. * @return [description]
  1268. */
  1269. Status EnvLibrados::LockFile(
  1270. const std::string& fname,
  1271. FileLock** lock)
  1272. {
  1273. LOG_DEBUG("[IN]%s\n", fname.c_str());
  1274. std::string fid, dir, file;
  1275. split(fname, &dir, &file);
  1276. Status s = Status::OK();
  1277. do {
  1278. std::string fpath = dir + "/" + file;
  1279. s = _GetFid(fpath, fid);
  1280. if (Status::OK() != s &&
  1281. Status::NotFound() != s) {
  1282. break;
  1283. } else if (Status::NotFound() == s) {
  1284. s = _AddFid(fpath, _CreateFid());
  1285. if (!s.ok()) {
  1286. break;
  1287. }
  1288. } else if (Status::OK() == s && DIR_ID_VALUE == fid) {
  1289. s = Status::IOError();
  1290. break;
  1291. }
  1292. *lock = new LibradosFileLock(_GetIoctx(fpath), fpath);
  1293. } while (0);
  1294. LOG_DEBUG("[OUT]%s\n", s.ToString().c_str());
  1295. return s;
  1296. }
  1297. /**
  1298. * @brief unlock file
  1299. * @details [long description]
  1300. *
  1301. * @param lock [description]
  1302. * @return [description]
  1303. */
  1304. Status EnvLibrados::UnlockFile(FileLock* lock)
  1305. {
  1306. LOG_DEBUG("[IO]%p\n", lock);
  1307. if (nullptr != lock) {
  1308. delete lock;
  1309. }
  1310. return Status::OK();
  1311. }
  1312. /**
  1313. * @brief not support
  1314. * @details [long description]
  1315. *
  1316. * @param db_path [description]
  1317. * @param output_path [description]
  1318. *
  1319. * @return [description]
  1320. */
  1321. Status EnvLibrados::GetAbsolutePath(
  1322. const std::string& db_path,
  1323. std::string* output_path)
  1324. {
  1325. LOG_DEBUG("[IO]%s\n", db_path.c_str());
  1326. return Status::NotSupported();
  1327. }
  1328. /**
  1329. * @brief Get default EnvLibrados
  1330. * @details [long description]
  1331. * @return [description]
  1332. */
  1333. EnvLibrados* EnvLibrados::Default() {
  1334. static EnvLibrados default_env(default_db_name,
  1335. std::getenv(default_config_path),
  1336. default_pool_name);
  1337. return &default_env;
  1338. }
  1339. } // namespace ROCKSDB_NAMESPACE