multi_processes_example.cc 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. // How to use this example
  6. // Open two terminals, in one of them, run `./multi_processes_example 0` to
  7. // start a process running the primary instance. This will create a new DB in
  8. // kDBPath. The process will run for a while inserting keys to the normal
  9. // RocksDB database.
  10. // Next, go to the other terminal and run `./multi_processes_example 1` to
  11. // start a process running the secondary instance. This will create a secondary
  12. // instance following the aforementioned primary instance. This process will
  13. // run for a while, tailing the logs of the primary. After process with primary
  14. // instance exits, this process will keep running until you hit 'CTRL+C'.
#include <atomic>
#include <cassert>
#include <chrono>
#include <cinttypes>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <ctime>
#include <memory>
#include <string>
#include <thread>
#include <vector>
#if defined(OS_LINUX)
#include <dirent.h>
#include <signal.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#endif // !OS_LINUX
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/slice.h"
  34. using ROCKSDB_NAMESPACE::ColumnFamilyDescriptor;
  35. using ROCKSDB_NAMESPACE::ColumnFamilyHandle;
  36. using ROCKSDB_NAMESPACE::ColumnFamilyOptions;
  37. using ROCKSDB_NAMESPACE::DB;
  38. using ROCKSDB_NAMESPACE::FlushOptions;
  39. using ROCKSDB_NAMESPACE::Iterator;
  40. using ROCKSDB_NAMESPACE::Options;
  41. using ROCKSDB_NAMESPACE::ReadOptions;
  42. using ROCKSDB_NAMESPACE::Slice;
  43. using ROCKSDB_NAMESPACE::Status;
  44. using ROCKSDB_NAMESPACE::WriteOptions;
  45. const std::string kDBPath = "/tmp/rocksdb_multi_processes_example";
  46. const std::string kPrimaryStatusFile =
  47. "/tmp/rocksdb_multi_processes_example_primary_status";
  48. const uint64_t kMaxKey = 600000;
  49. const size_t kMaxValueLength = 256;
  50. const size_t kNumKeysPerFlush = 1000;
  51. const std::vector<std::string>& GetColumnFamilyNames() {
  52. static std::vector<std::string> column_family_names = {
  53. ROCKSDB_NAMESPACE::kDefaultColumnFamilyName, "pikachu"};
  54. return column_family_names;
  55. }
  56. inline bool IsLittleEndian() {
  57. uint32_t x = 1;
  58. return *reinterpret_cast<char*>(&x) != 0;
  59. }
  60. static std::atomic<int>& ShouldSecondaryWait() {
  61. static std::atomic<int> should_secondary_wait{1};
  62. return should_secondary_wait;
  63. }
  64. static std::string Key(uint64_t k) {
  65. std::string ret;
  66. if (IsLittleEndian()) {
  67. ret.append(reinterpret_cast<char*>(&k), sizeof(k));
  68. } else {
  69. char buf[sizeof(k)];
  70. buf[0] = k & 0xff;
  71. buf[1] = (k >> 8) & 0xff;
  72. buf[2] = (k >> 16) & 0xff;
  73. buf[3] = (k >> 24) & 0xff;
  74. buf[4] = (k >> 32) & 0xff;
  75. buf[5] = (k >> 40) & 0xff;
  76. buf[6] = (k >> 48) & 0xff;
  77. buf[7] = (k >> 56) & 0xff;
  78. ret.append(buf, sizeof(k));
  79. }
  80. size_t i = 0, j = ret.size() - 1;
  81. while (i < j) {
  82. char tmp = ret[i];
  83. ret[i] = ret[j];
  84. ret[j] = tmp;
  85. ++i;
  86. --j;
  87. }
  88. return ret;
  89. }
  90. static uint64_t Key(std::string key) {
  91. assert(key.size() == sizeof(uint64_t));
  92. size_t i = 0, j = key.size() - 1;
  93. while (i < j) {
  94. char tmp = key[i];
  95. key[i] = key[j];
  96. key[j] = tmp;
  97. ++i;
  98. --j;
  99. }
  100. uint64_t ret = 0;
  101. if (IsLittleEndian()) {
  102. memcpy(&ret, key.c_str(), sizeof(uint64_t));
  103. } else {
  104. const char* buf = key.c_str();
  105. ret |= static_cast<uint64_t>(buf[0]);
  106. ret |= (static_cast<uint64_t>(buf[1]) << 8);
  107. ret |= (static_cast<uint64_t>(buf[2]) << 16);
  108. ret |= (static_cast<uint64_t>(buf[3]) << 24);
  109. ret |= (static_cast<uint64_t>(buf[4]) << 32);
  110. ret |= (static_cast<uint64_t>(buf[5]) << 40);
  111. ret |= (static_cast<uint64_t>(buf[6]) << 48);
  112. ret |= (static_cast<uint64_t>(buf[7]) << 56);
  113. }
  114. return ret;
  115. }
  116. static Slice GenerateRandomValue(const size_t max_length, char scratch[]) {
  117. size_t sz = 1 + (std::rand() % max_length);
  118. int rnd = std::rand();
  119. for (size_t i = 0; i != sz; ++i) {
  120. scratch[i] = static_cast<char>(rnd ^ i);
  121. }
  122. return Slice(scratch, sz);
  123. }
  124. static bool ShouldCloseDB() { return true; }
  125. // TODO: port this example to other systems. It should be straightforward for
  126. // POSIX-compliant systems.
  127. #if defined(OS_LINUX)
  128. void CreateDB() {
  129. long my_pid = static_cast<long>(getpid());
  130. Options options;
  131. Status s = ROCKSDB_NAMESPACE::DestroyDB(kDBPath, options);
  132. if (!s.ok()) {
  133. fprintf(stderr, "[process %ld] Failed to destroy DB: %s\n", my_pid,
  134. s.ToString().c_str());
  135. assert(false);
  136. }
  137. options.create_if_missing = true;
  138. DB* db = nullptr;
  139. s = DB::Open(options, kDBPath, &db);
  140. if (!s.ok()) {
  141. fprintf(stderr, "[process %ld] Failed to open DB: %s\n", my_pid,
  142. s.ToString().c_str());
  143. assert(false);
  144. }
  145. std::vector<ColumnFamilyHandle*> handles;
  146. ColumnFamilyOptions cf_opts(options);
  147. for (const auto& cf_name : GetColumnFamilyNames()) {
  148. if (ROCKSDB_NAMESPACE::kDefaultColumnFamilyName != cf_name) {
  149. ColumnFamilyHandle* handle = nullptr;
  150. s = db->CreateColumnFamily(cf_opts, cf_name, &handle);
  151. if (!s.ok()) {
  152. fprintf(stderr, "[process %ld] Failed to create CF %s: %s\n", my_pid,
  153. cf_name.c_str(), s.ToString().c_str());
  154. assert(false);
  155. }
  156. handles.push_back(handle);
  157. }
  158. }
  159. fprintf(stdout, "[process %ld] Column families created\n", my_pid);
  160. for (auto h : handles) {
  161. delete h;
  162. }
  163. handles.clear();
  164. delete db;
  165. }
  166. void RunPrimary() {
  167. long my_pid = static_cast<long>(getpid());
  168. fprintf(stdout, "[process %ld] Primary instance starts\n", my_pid);
  169. CreateDB();
  170. std::srand(time(nullptr));
  171. DB* db = nullptr;
  172. Options options;
  173. options.create_if_missing = false;
  174. std::vector<ColumnFamilyDescriptor> column_families;
  175. for (const auto& cf_name : GetColumnFamilyNames()) {
  176. column_families.push_back(ColumnFamilyDescriptor(cf_name, options));
  177. }
  178. std::vector<ColumnFamilyHandle*> handles;
  179. WriteOptions write_opts;
  180. char val_buf[kMaxValueLength] = {0};
  181. uint64_t curr_key = 0;
  182. while (curr_key < kMaxKey) {
  183. Status s;
  184. if (nullptr == db) {
  185. s = DB::Open(options, kDBPath, column_families, &handles, &db);
  186. if (!s.ok()) {
  187. fprintf(stderr, "[process %ld] Failed to open DB: %s\n", my_pid,
  188. s.ToString().c_str());
  189. assert(false);
  190. }
  191. }
  192. assert(nullptr != db);
  193. assert(handles.size() == GetColumnFamilyNames().size());
  194. for (auto h : handles) {
  195. assert(nullptr != h);
  196. for (size_t i = 0; i != kNumKeysPerFlush; ++i) {
  197. Slice key = Key(curr_key + static_cast<uint64_t>(i));
  198. Slice value = GenerateRandomValue(kMaxValueLength, val_buf);
  199. s = db->Put(write_opts, h, key, value);
  200. if (!s.ok()) {
  201. fprintf(stderr, "[process %ld] Failed to insert\n", my_pid);
  202. assert(false);
  203. }
  204. }
  205. s = db->Flush(FlushOptions(), h);
  206. if (!s.ok()) {
  207. fprintf(stderr, "[process %ld] Failed to flush\n", my_pid);
  208. assert(false);
  209. }
  210. }
  211. curr_key += static_cast<uint64_t>(kNumKeysPerFlush);
  212. if (ShouldCloseDB()) {
  213. for (auto h : handles) {
  214. delete h;
  215. }
  216. handles.clear();
  217. delete db;
  218. db = nullptr;
  219. }
  220. }
  221. if (nullptr != db) {
  222. for (auto h : handles) {
  223. delete h;
  224. }
  225. handles.clear();
  226. delete db;
  227. db = nullptr;
  228. }
  229. fprintf(stdout, "[process %ld] Finished adding keys\n", my_pid);
  230. }
  231. void secondary_instance_sigint_handler(int signal) {
  232. ShouldSecondaryWait().store(0, std::memory_order_relaxed);
  233. fprintf(stdout, "\n");
  234. fflush(stdout);
  235. };
  236. void RunSecondary() {
  237. ::signal(SIGINT, secondary_instance_sigint_handler);
  238. long my_pid = static_cast<long>(getpid());
  239. const std::string kSecondaryPath =
  240. "/tmp/rocksdb_multi_processes_example_secondary";
  241. // Create directory if necessary
  242. if (nullptr == opendir(kSecondaryPath.c_str())) {
  243. int ret =
  244. mkdir(kSecondaryPath.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
  245. if (ret < 0) {
  246. perror("failed to create directory for secondary instance");
  247. exit(0);
  248. }
  249. }
  250. DB* db = nullptr;
  251. Options options;
  252. options.create_if_missing = false;
  253. options.max_open_files = -1;
  254. Status s = DB::OpenAsSecondary(options, kDBPath, kSecondaryPath, &db);
  255. if (!s.ok()) {
  256. fprintf(stderr, "[process %ld] Failed to open in secondary mode: %s\n",
  257. my_pid, s.ToString().c_str());
  258. assert(false);
  259. } else {
  260. fprintf(stdout, "[process %ld] Secondary instance starts\n", my_pid);
  261. }
  262. ReadOptions ropts;
  263. ropts.verify_checksums = true;
  264. ropts.total_order_seek = true;
  265. std::vector<std::thread> test_threads;
  266. test_threads.emplace_back([&]() {
  267. while (1 == ShouldSecondaryWait().load(std::memory_order_relaxed)) {
  268. std::unique_ptr<Iterator> iter(db->NewIterator(ropts));
  269. iter->SeekToFirst();
  270. size_t count = 0;
  271. for (; iter->Valid(); iter->Next()) {
  272. ++count;
  273. }
  274. }
  275. fprintf(stdout, "[process %ld] Range_scan thread finished\n", my_pid);
  276. });
  277. test_threads.emplace_back([&]() {
  278. std::srand(time(nullptr));
  279. while (1 == ShouldSecondaryWait().load(std::memory_order_relaxed)) {
  280. Slice key = Key(std::rand() % kMaxKey);
  281. std::string value;
  282. db->Get(ropts, key, &value);
  283. }
  284. fprintf(stdout, "[process %ld] Point lookup thread finished\n");
  285. });
  286. uint64_t curr_key = 0;
  287. while (1 == ShouldSecondaryWait().load(std::memory_order_relaxed)) {
  288. s = db->TryCatchUpWithPrimary();
  289. if (!s.ok()) {
  290. fprintf(stderr,
  291. "[process %ld] error while trying to catch up with "
  292. "primary %s\n",
  293. my_pid, s.ToString().c_str());
  294. assert(false);
  295. }
  296. {
  297. std::unique_ptr<Iterator> iter(db->NewIterator(ropts));
  298. if (!iter) {
  299. fprintf(stderr, "[process %ld] Failed to create iterator\n", my_pid);
  300. assert(false);
  301. }
  302. iter->SeekToLast();
  303. if (iter->Valid()) {
  304. uint64_t curr_max_key = Key(iter->key().ToString());
  305. if (curr_max_key != curr_key) {
  306. fprintf(stdout, "[process %ld] Observed key %" PRIu64 "\n", my_pid,
  307. curr_key);
  308. curr_key = curr_max_key;
  309. }
  310. }
  311. }
  312. std::this_thread::sleep_for(std::chrono::seconds(1));
  313. }
  314. s = db->TryCatchUpWithPrimary();
  315. if (!s.ok()) {
  316. fprintf(stderr,
  317. "[process %ld] error while trying to catch up with "
  318. "primary %s\n",
  319. my_pid, s.ToString().c_str());
  320. assert(false);
  321. }
  322. std::vector<ColumnFamilyDescriptor> column_families;
  323. for (const auto& cf_name : GetColumnFamilyNames()) {
  324. column_families.push_back(ColumnFamilyDescriptor(cf_name, options));
  325. }
  326. std::vector<ColumnFamilyHandle*> handles;
  327. DB* verification_db = nullptr;
  328. s = DB::OpenForReadOnly(options, kDBPath, column_families, &handles,
  329. &verification_db);
  330. assert(s.ok());
  331. Iterator* iter1 = verification_db->NewIterator(ropts);
  332. iter1->SeekToFirst();
  333. Iterator* iter = db->NewIterator(ropts);
  334. iter->SeekToFirst();
  335. for (; iter->Valid() && iter1->Valid(); iter->Next(), iter1->Next()) {
  336. if (iter->key().ToString() != iter1->key().ToString()) {
  337. fprintf(stderr, "%" PRIu64 "!= %" PRIu64 "\n",
  338. Key(iter->key().ToString()), Key(iter1->key().ToString()));
  339. assert(false);
  340. } else if (iter->value().ToString() != iter1->value().ToString()) {
  341. fprintf(stderr, "Value mismatch\n");
  342. assert(false);
  343. }
  344. }
  345. fprintf(stdout, "[process %ld] Verification succeeded\n", my_pid);
  346. for (auto& thr : test_threads) {
  347. thr.join();
  348. }
  349. delete iter;
  350. delete iter1;
  351. delete db;
  352. delete verification_db;
  353. }
  354. int main(int argc, char** argv) {
  355. if (argc < 2) {
  356. fprintf(stderr, "%s <0 for primary, 1 for secondary>\n", argv[0]);
  357. return 0;
  358. }
  359. if (atoi(argv[1]) == 0) {
  360. RunPrimary();
  361. } else {
  362. RunSecondary();
  363. }
  364. return 0;
  365. }
  366. #else // OS_LINUX
  367. int main() {
  368. fpritnf(stderr, "Not implemented.\n");
  369. return 0;
  370. }
  371. #endif // !OS_LINUX