multi_processes_example.cc 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  // How to use this example
  // Open two terminals. In one of them, run `./multi_processes_example 0` to
  // start a process running the primary instance. This destroys any existing
  // DB at kDBPath and creates a new one there. The process then runs for a
  // while, inserting keys into this regular (primary) RocksDB database.
  // Next, go to the other terminal and run `./multi_processes_example 1` to
  // start a process running the secondary instance. This creates a secondary
  // instance that follows the primary instance above. This process runs for a
  // while, tailing the logs of the primary. After the process running the
  // primary instance exits, this process keeps running until you press CTRL+C.
  #include <atomic>
  #include <cassert>
  #include <chrono>
  #include <cinttypes>
  #include <cstdio>
  #include <cstdlib>
  #include <cstring>
  #include <ctime>
  #include <memory>
  #include <string>
  #include <thread>
  #include <vector>
  23. // TODO: port this example to other systems. It should be straightforward for
  24. // POSIX-compliant systems.
  25. #if defined(OS_LINUX)
  26. #include <dirent.h>
  27. #include <signal.h>
  28. #include <sys/stat.h>
  29. #include <sys/types.h>
  30. #include <sys/wait.h>
  31. #include <unistd.h>
  32. #include "rocksdb/db.h"
  33. #include "rocksdb/options.h"
  34. #include "rocksdb/slice.h"
  35. using ROCKSDB_NAMESPACE::ColumnFamilyDescriptor;
  36. using ROCKSDB_NAMESPACE::ColumnFamilyHandle;
  37. using ROCKSDB_NAMESPACE::ColumnFamilyOptions;
  38. using ROCKSDB_NAMESPACE::DB;
  39. using ROCKSDB_NAMESPACE::FlushOptions;
  40. using ROCKSDB_NAMESPACE::Iterator;
  41. using ROCKSDB_NAMESPACE::Options;
  42. using ROCKSDB_NAMESPACE::ReadOptions;
  43. using ROCKSDB_NAMESPACE::Slice;
  44. using ROCKSDB_NAMESPACE::Status;
  45. using ROCKSDB_NAMESPACE::WriteOptions;
  // Location of the primary instance's DB; the secondary opens the same path.
  const std::string kDBPath = "/tmp/rocksdb_multi_processes_example";
  // NOTE(review): not referenced anywhere else in this file.
  const std::string kPrimaryStatusFile =
      "/tmp/rocksdb_multi_processes_example_primary_status";
  // The primary keeps writing rounds of keys until it reaches this key; the
  // secondary's point lookups pick keys in [0, kMaxKey).
  constexpr uint64_t kMaxKey = 600000;
  // Upper bound (inclusive) on generated value lengths; also the size of the
  // scratch buffer the primary passes to GenerateRandomValue().
  constexpr size_t kMaxValueLength = 256;
  // Keys written to each column family before that column family is flushed.
  constexpr size_t kNumKeysPerFlush = 1000;
  // GenerateRandomValue() computes rand() % kMaxValueLength, and RunPrimary()
  // advances its key cursor by kNumKeysPerFlush per round, so neither may be 0.
  static_assert(kMaxValueLength > 0, "kMaxValueLength must be positive");
  static_assert(kNumKeysPerFlush > 0, "kNumKeysPerFlush must be positive");
  52. const std::vector<std::string>& GetColumnFamilyNames() {
  53. static std::vector<std::string> column_family_names = {
  54. ROCKSDB_NAMESPACE::kDefaultColumnFamilyName, "pikachu"};
  55. return column_family_names;
  56. }
  // Returns true when the host stores integers least-significant byte first.
  // The lowest-addressed byte of the integer 1 is read through an unsigned char
  // pointer, which is the permitted way to inspect an object's bytes; a
  // static_cast between unrelated pointer types (uint32_t* -> char*) is
  // ill-formed and does not compile.
  inline bool IsLittleEndian() {
    const uint32_t probe = 1;
    return *reinterpret_cast<const unsigned char*>(&probe) != 0;
  }
  // Process-wide run flag for the secondary instance. It starts at 1; the SIGINT
  // handler stores 0 to ask the worker threads and the catch-up loop to stop.
  // The same atomic is returned on every call.
  static std::atomic<int>& ShouldSecondaryWait() {
    static std::atomic<int> keep_waiting(1);
    return keep_waiting;
  }
  // Encodes k as exactly 8 bytes, most significant byte first (big-endian), so
  // that bytewise comparison of encoded keys matches numeric order of k.
  // The bytes are produced with shifts, so the result does not depend on host
  // endianness (the previous version also used an ill-formed static_cast from
  // uint64_t* to char*, which does not compile).
  static std::string Key(uint64_t k) {
    std::string ret(sizeof(k), '\0');
    for (size_t i = 0; i < sizeof(k); ++i) {
      // Byte 0 takes bits 56..63, byte 7 takes bits 0..7.
      const unsigned shift = static_cast<unsigned>(8 * (sizeof(k) - 1 - i));
      ret[i] = static_cast<char>((k >> shift) & 0xff);
    }
    return ret;
  }
  // Decodes an 8-byte big-endian key (as produced by Key(uint64_t)) back into
  // its integer value. Each byte is widened through unsigned char so bytes
  // >= 0x80 are not sign-extended (the old big-endian branch cast plain char
  // straight to uint64_t), and only key.size() bytes are ever read, so a short
  // key cannot cause an over-read when asserts are compiled out.
  static uint64_t Key(const std::string& key) {
    assert(key.size() == sizeof(uint64_t));
    uint64_t ret = 0;
    for (const char c : key) {
      ret = (ret << 8) | static_cast<unsigned char>(c);
    }
    return ret;
  }
  117. static Slice GenerateRandomValue(const size_t max_length, char scratch[]) {
  118. size_t sz = 1 + (std::rand() % max_length);
  119. int rnd = std::rand();
  120. for (size_t i = 0; i != sz; ++i) {
  121. scratch[i] = static_cast<char>(rnd ^ i);
  122. }
  123. return Slice(scratch, sz);
  124. }
  // Whether RunPrimary() closes the DB after every round of writes (and so
  // reopens it at the start of the next round). Always true in this example.
  static bool ShouldCloseDB() {
    constexpr bool kReopenEachRound = true;
    return kReopenEachRound;
  }
  126. void CreateDB() {
  127. long my_pid = static_cast<long>(getpid());
  128. Options options;
  129. Status s = ROCKSDB_NAMESPACE::DestroyDB(kDBPath, options);
  130. if (!s.ok()) {
  131. fprintf(stderr, "[process %ld] Failed to destroy DB: %s\n", my_pid,
  132. s.ToString().c_str());
  133. assert(false);
  134. }
  135. options.create_if_missing = true;
  136. DB* db = nullptr;
  137. s = DB::Open(options, kDBPath, &db);
  138. if (!s.ok()) {
  139. fprintf(stderr, "[process %ld] Failed to open DB: %s\n", my_pid,
  140. s.ToString().c_str());
  141. assert(false);
  142. }
  143. std::vector<ColumnFamilyHandle*> handles;
  144. ColumnFamilyOptions cf_opts(options);
  145. for (const auto& cf_name : GetColumnFamilyNames()) {
  146. if (ROCKSDB_NAMESPACE::kDefaultColumnFamilyName != cf_name) {
  147. ColumnFamilyHandle* handle = nullptr;
  148. s = db->CreateColumnFamily(cf_opts, cf_name, &handle);
  149. if (!s.ok()) {
  150. fprintf(stderr, "[process %ld] Failed to create CF %s: %s\n", my_pid,
  151. cf_name.c_str(), s.ToString().c_str());
  152. assert(false);
  153. }
  154. handles.push_back(handle);
  155. }
  156. }
  157. fprintf(stdout, "[process %ld] Column families created\n", my_pid);
  158. for (auto h : handles) {
  159. delete h;
  160. }
  161. handles.clear();
  162. delete db;
  163. }
  164. void RunPrimary() {
  165. long my_pid = static_cast<long>(getpid());
  166. fprintf(stdout, "[process %ld] Primary instance starts\n", my_pid);
  167. CreateDB();
  168. std::srand(time(nullptr));
  169. DB* db = nullptr;
  170. Options options;
  171. options.create_if_missing = false;
  172. std::vector<ColumnFamilyDescriptor> column_families;
  173. for (const auto& cf_name : GetColumnFamilyNames()) {
  174. column_families.push_back(ColumnFamilyDescriptor(cf_name, options));
  175. }
  176. std::vector<ColumnFamilyHandle*> handles;
  177. WriteOptions write_opts;
  178. char val_buf[kMaxValueLength] = {0};
  179. uint64_t curr_key = 0;
  180. while (curr_key < kMaxKey) {
  181. Status s;
  182. if (nullptr == db) {
  183. s = DB::Open(options, kDBPath, column_families, &handles, &db);
  184. if (!s.ok()) {
  185. fprintf(stderr, "[process %ld] Failed to open DB: %s\n", my_pid,
  186. s.ToString().c_str());
  187. assert(false);
  188. }
  189. }
  190. assert(nullptr != db);
  191. assert(handles.size() == GetColumnFamilyNames().size());
  192. for (auto h : handles) {
  193. assert(nullptr != h);
  194. for (size_t i = 0; i != kNumKeysPerFlush; ++i) {
  195. Slice key = Key(curr_key + static_cast<uint64_t>(i));
  196. Slice value = GenerateRandomValue(kMaxValueLength, val_buf);
  197. s = db->Put(write_opts, h, key, value);
  198. if (!s.ok()) {
  199. fprintf(stderr, "[process %ld] Failed to insert\n", my_pid);
  200. assert(false);
  201. }
  202. }
  203. s = db->Flush(FlushOptions(), h);
  204. if (!s.ok()) {
  205. fprintf(stderr, "[process %ld] Failed to flush\n", my_pid);
  206. assert(false);
  207. }
  208. }
  209. curr_key += static_cast<uint64_t>(kNumKeysPerFlush);
  210. if (ShouldCloseDB()) {
  211. for (auto h : handles) {
  212. delete h;
  213. }
  214. handles.clear();
  215. delete db;
  216. db = nullptr;
  217. }
  218. }
  219. if (nullptr != db) {
  220. for (auto h : handles) {
  221. delete h;
  222. }
  223. handles.clear();
  224. delete db;
  225. db = nullptr;
  226. }
  227. fprintf(stdout, "[process %ld] Finished adding keys\n", my_pid);
  228. }
  229. void secondary_instance_sigint_handler(int signal) {
  230. ShouldSecondaryWait().store(0, std::memory_order_relaxed);
  231. fprintf(stdout, "\n");
  232. fflush(stdout);
  233. };
  234. void RunSecondary() {
  235. ::signal(SIGINT, secondary_instance_sigint_handler);
  236. long my_pid = static_cast<long>(getpid());
  237. const std::string kSecondaryPath =
  238. "/tmp/rocksdb_multi_processes_example_secondary";
  239. // Create directory if necessary
  240. if (nullptr == opendir(kSecondaryPath.c_str())) {
  241. int ret =
  242. mkdir(kSecondaryPath.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
  243. if (ret < 0) {
  244. perror("failed to create directory for secondary instance");
  245. exit(0);
  246. }
  247. }
  248. DB* db = nullptr;
  249. Options options;
  250. options.create_if_missing = false;
  251. options.max_open_files = -1;
  252. Status s = DB::OpenAsSecondary(options, kDBPath, kSecondaryPath, &db);
  253. if (!s.ok()) {
  254. fprintf(stderr, "[process %ld] Failed to open in secondary mode: %s\n",
  255. my_pid, s.ToString().c_str());
  256. assert(false);
  257. } else {
  258. fprintf(stdout, "[process %ld] Secondary instance starts\n", my_pid);
  259. }
  260. ReadOptions ropts;
  261. ropts.verify_checksums = true;
  262. ropts.total_order_seek = true;
  263. std::vector<std::thread> test_threads;
  264. test_threads.emplace_back([&]() {
  265. while (1 == ShouldSecondaryWait().load(std::memory_order_relaxed)) {
  266. std::unique_ptr<Iterator> iter(db->NewIterator(ropts));
  267. iter->SeekToFirst();
  268. size_t count = 0;
  269. for (; iter->Valid(); iter->Next()) {
  270. ++count;
  271. }
  272. }
  273. fprintf(stdout, "[process %ld] Range_scan thread finished\n", my_pid);
  274. });
  275. test_threads.emplace_back([&]() {
  276. std::srand(time(nullptr));
  277. while (1 == ShouldSecondaryWait().load(std::memory_order_relaxed)) {
  278. Slice key = Key(std::rand() % kMaxKey);
  279. std::string value;
  280. db->Get(ropts, key, &value);
  281. }
  282. fprintf(stdout, "[process %ld] Point lookup thread finished\n", my_pid);
  283. });
  284. uint64_t curr_key = 0;
  285. while (1 == ShouldSecondaryWait().load(std::memory_order_relaxed)) {
  286. s = db->TryCatchUpWithPrimary();
  287. if (!s.ok()) {
  288. fprintf(stderr,
  289. "[process %ld] error while trying to catch up with "
  290. "primary %s\n",
  291. my_pid, s.ToString().c_str());
  292. assert(false);
  293. }
  294. {
  295. std::unique_ptr<Iterator> iter(db->NewIterator(ropts));
  296. if (!iter) {
  297. fprintf(stderr, "[process %ld] Failed to create iterator\n", my_pid);
  298. assert(false);
  299. }
  300. iter->SeekToLast();
  301. if (iter->Valid()) {
  302. uint64_t curr_max_key = Key(iter->key().ToString());
  303. if (curr_max_key != curr_key) {
  304. fprintf(stdout, "[process %ld] Observed key %" PRIu64 "\n", my_pid,
  305. curr_key);
  306. curr_key = curr_max_key;
  307. }
  308. }
  309. }
  310. std::this_thread::sleep_for(std::chrono::seconds(1));
  311. }
  312. s = db->TryCatchUpWithPrimary();
  313. if (!s.ok()) {
  314. fprintf(stderr,
  315. "[process %ld] error while trying to catch up with "
  316. "primary %s\n",
  317. my_pid, s.ToString().c_str());
  318. assert(false);
  319. }
  320. std::vector<ColumnFamilyDescriptor> column_families;
  321. for (const auto& cf_name : GetColumnFamilyNames()) {
  322. column_families.push_back(ColumnFamilyDescriptor(cf_name, options));
  323. }
  324. std::vector<ColumnFamilyHandle*> handles;
  325. DB* verification_db = nullptr;
  326. s = DB::OpenForReadOnly(options, kDBPath, column_families, &handles,
  327. &verification_db);
  328. assert(s.ok());
  329. Iterator* iter1 = verification_db->NewIterator(ropts);
  330. iter1->SeekToFirst();
  331. Iterator* iter = db->NewIterator(ropts);
  332. iter->SeekToFirst();
  333. for (; iter->Valid() && iter1->Valid(); iter->Next(), iter1->Next()) {
  334. if (iter->key().ToString() != iter1->key().ToString()) {
  335. fprintf(stderr, "%" PRIu64 "!= %" PRIu64 "\n",
  336. Key(iter->key().ToString()), Key(iter1->key().ToString()));
  337. assert(false);
  338. } else if (iter->value().ToString() != iter1->value().ToString()) {
  339. fprintf(stderr, "Value mismatch\n");
  340. assert(false);
  341. }
  342. }
  343. fprintf(stdout, "[process %ld] Verification succeeded\n", my_pid);
  344. for (auto& thr : test_threads) {
  345. thr.join();
  346. }
  347. delete iter;
  348. delete iter1;
  349. delete db;
  350. delete verification_db;
  351. }
  352. int main(int argc, char** argv) {
  353. if (argc < 2) {
  354. fprintf(stderr, "%s <0 for primary, 1 for secondary>\n", argv[0]);
  355. return 0;
  356. }
  357. if (atoi(argv[1]) == 0) {
  358. RunPrimary();
  359. } else {
  360. RunSecondary();
  361. }
  362. return 0;
  363. }
  364. #else // OS_LINUX
  // Fallback entry point for non-Linux builds: the example has not been ported
  // yet (see the TODO above the platform includes), so it only reports that
  // and exits with status 0.
  int main() {
    std::fputs("Not implemented.\n", stderr);
    return 0;
  }
  369. #endif // !OS_LINUX