persistent_cache_bench.cc

// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
#ifndef ROCKSDB_LITE
#ifndef GFLAGS
#include <cstdio>
int main() { fprintf(stderr, "Please install gflags to run tools\n"); }
#else
#include <atomic>
#include <functional>
#include <limits>
#include <list>
#include <memory>
#include <sstream>
#include <unordered_map>

#include "rocksdb/env.h"
#include "utilities/persistent_cache/block_cache_tier.h"
#include "utilities/persistent_cache/persistent_cache_tier.h"
#include "utilities/persistent_cache/volatile_tier_impl.h"

#include "monitoring/histogram.h"
#include "port/port.h"
#include "table/block_based/block_builder.h"
#include "util/gflags_compat.h"
#include "util/mutexlock.h"
#include "util/stop_watch.h"
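//
// persistent_cache_bench: micro-benchmark driver for the RocksDB persistent
// cache tiers. It spawns writer threads that insert fixed-size blocks and
// reader threads that look them up, then reports latency and throughput
// histograms for the run.
//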
DEFINE_int32(nsec, 10, "Number of seconds to run the benchmark");
DEFINE_int32(nthread_write, 1, "Insert threads");
DEFINE_int32(nthread_read, 1, "Lookup threads");
DEFINE_string(path, "/tmp/microbench/blkcache", "Path for cachefile");
DEFINE_string(log_path, "/tmp/log", "Path for the log file");
DEFINE_uint64(cache_size, std::numeric_limits<uint64_t>::max(), "Cache size");
DEFINE_int32(iosize, 4 * 1024, "Read IO size");
DEFINE_int32(writer_iosize, 4 * 1024, "File writer IO size");
DEFINE_int32(writer_qdepth, 1, "File writer qdepth");
DEFINE_bool(enable_pipelined_writes, false, "Enable async writes");
DEFINE_string(cache_type, "block_cache",
              "Cache type. (block_cache, volatile, tiered)");
DEFINE_bool(benchmark, false, "Benchmark mode");
DEFINE_int32(volatile_cache_pct, 10, "Percentage of cache in memory tier.");
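// Example invocation (a sketch; the binary name and paths depend on your
// build and environment, only the flags are defined by this file):
//
//   ./persistent_cache_bench --cache_type=tiered \
//       --cache_size=1073741824 --volatile_cache_pct=10 \
//       --path=/tmp/microbench/blkcache --log_path=/tmp/log \
//       --nthread_write=4 --nthread_read=4 --nsec=30 --benchmark
//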
namespace ROCKSDB_NAMESPACE {

// create a volatile (in-memory) cache tier
std::unique_ptr<PersistentCacheTier> NewVolatileCache() {
  assert(FLAGS_cache_size != std::numeric_limits<uint64_t>::max());
  std::unique_ptr<PersistentCacheTier> pcache(
      new VolatileCacheTier(FLAGS_cache_size));
  return pcache;
}
// create a block (on-disk) cache tier
std::unique_ptr<PersistentCacheTier> NewBlockCache() {
  std::shared_ptr<Logger> log;
  if (!Env::Default()->NewLogger(FLAGS_log_path, &log).ok()) {
    fprintf(stderr, "Error creating log %s \n", FLAGS_log_path.c_str());
    return nullptr;
  }

  PersistentCacheConfig opt(Env::Default(), FLAGS_path, FLAGS_cache_size, log);
  opt.writer_dispatch_size = FLAGS_writer_iosize;
  opt.writer_qdepth = FLAGS_writer_qdepth;
  opt.pipeline_writes = FLAGS_enable_pipelined_writes;
  opt.max_write_pipeline_backlog_size = std::numeric_limits<uint64_t>::max();
  std::unique_ptr<PersistentCacheTier> cache(new BlockCacheTier(opt));
  Status status = cache->Open();
  assert(status.ok());
  return cache;
}
// construct a tiered RAM+Block cache from an explicit memory budget and a
// block cache config
std::unique_ptr<PersistentTieredCache> NewTieredCache(
    const size_t mem_size, const PersistentCacheConfig& opt) {
  std::unique_ptr<PersistentTieredCache> tcache(new PersistentTieredCache());
  // create primary tier
  assert(mem_size);
  auto pcache =
      std::shared_ptr<PersistentCacheTier>(new VolatileCacheTier(mem_size));
  tcache->AddTier(pcache);
  // create secondary tier
  auto scache = std::shared_ptr<PersistentCacheTier>(new BlockCacheTier(opt));
  tcache->AddTier(scache);

  Status s = tcache->Open();
  assert(s.ok());
  return tcache;
}
// construct a tiered cache, splitting FLAGS_cache_size between the tiers
// according to FLAGS_volatile_cache_pct
std::unique_ptr<PersistentTieredCache> NewTieredCache() {
  std::shared_ptr<Logger> log;
  if (!Env::Default()->NewLogger(FLAGS_log_path, &log).ok()) {
    fprintf(stderr, "Error creating log %s \n", FLAGS_log_path.c_str());
    abort();
  }

  auto pct = FLAGS_volatile_cache_pct / static_cast<double>(100);
  PersistentCacheConfig opt(Env::Default(), FLAGS_path,
                            (1 - pct) * FLAGS_cache_size, log);
  opt.writer_dispatch_size = FLAGS_writer_iosize;
  opt.writer_qdepth = FLAGS_writer_qdepth;
  opt.pipeline_writes = FLAGS_enable_pipelined_writes;
  opt.max_write_pipeline_backlog_size = std::numeric_limits<uint64_t>::max();
  return NewTieredCache(FLAGS_cache_size * pct, opt);
}
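// Worked example of the split above: with --cache_size=1073741824 (1 GiB)
// and the default --volatile_cache_pct=10, pct = 0.10, so the volatile tier
// gets ~102 MiB (1 GiB * 0.10) and the block tier gets the remaining
// ~922 MiB (1 GiB * 0.90).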
//
// Benchmark driver
//
class CacheTierBenchmark {
 public:
  explicit CacheTierBenchmark(std::shared_ptr<PersistentCacheTier>&& cache)
      : cache_(std::move(cache)) {
    if (FLAGS_nthread_read) {
      fprintf(stdout, "Pre-populating\n");
      Prepop();
      fprintf(stdout, "Pre-population completed\n");
    }

    stats_.Clear();

    // Start IO threads
    std::list<port::Thread> threads;
    Spawn(FLAGS_nthread_write, &threads,
          std::bind(&CacheTierBenchmark::Write, this));
    Spawn(FLAGS_nthread_read, &threads,
          std::bind(&CacheTierBenchmark::Read, this));

    // Wait till FLAGS_nsec and then signal to quit
    StopWatchNano t(Env::Default(), /*auto_start=*/true);
    size_t sec = t.ElapsedNanos() / 1000000000ULL;
    while (!quit_) {
      sec = t.ElapsedNanos() / 1000000000ULL;
      quit_ = sec > size_t(FLAGS_nsec);
      /* sleep override */ sleep(1);
    }

    // Wait for threads to exit
    Join(&threads);
    // Print stats
    PrintStats(sec);
    // Close the cache
    cache_->TEST_Flush();
    cache_->Close();
  }
 private:
  void PrintStats(const size_t sec) {
    std::ostringstream msg;
    msg << "Test stats" << std::endl
        << "* Elapsed: " << sec << " s" << std::endl
        << "* Write Latency:" << std::endl
        << stats_.write_latency_.ToString() << std::endl
        << "* Read Latency:" << std::endl
        << stats_.read_latency_.ToString() << std::endl
        << "* Bytes written:" << std::endl
        << stats_.bytes_written_.ToString() << std::endl
        << "* Bytes read:" << std::endl
        << stats_.bytes_read_.ToString() << std::endl
        << "Cache stats:" << std::endl
        << cache_->PrintStats() << std::endl;
    fprintf(stderr, "%s\n", msg.str().c_str());
  }
  //
  // Insert implementation and corresponding helper functions
  //
  void Prepop() {
    for (uint64_t i = 0; i < 1024 * 1024; ++i) {
      InsertKey(i);
      insert_key_limit_++;
      read_key_limit_++;
    }

    // Wait until data is flushed
    cache_->TEST_Flush();

    // warm up the cache by reading back every pre-populated key once
    for (uint64_t i = 0; i < 1024 * 1024; ++i) {
      ReadKey(i);
    }
  }
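  // Note on sizing: Prepop() inserts 2^20 keys of FLAGS_iosize bytes each,
  // so with the default --iosize of 4 KiB the pre-population phase writes
  // about 4 GiB into the cache before the timed run starts.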
  void Write() {
    while (!quit_) {
      InsertKey(insert_key_limit_++);
    }
  }

  void InsertKey(const uint64_t key) {
    // construct key
    uint64_t k[3];
    Slice block_key = FillKey(k, key);

    // construct value
    auto block = NewBlock(key);

    // insert
    StopWatchNano timer(Env::Default(), /*auto_start=*/true);
    while (true) {
      Status status = cache_->Insert(block_key, block.get(), FLAGS_iosize);
      if (status.ok()) {
        break;
      }
      // transient error is possible if we run without pipelining
      assert(!FLAGS_enable_pipelined_writes);
    }

    // adjust stats
    const size_t elapsed_micro = timer.ElapsedNanos() / 1000;
    stats_.write_latency_.Add(elapsed_micro);
    stats_.bytes_written_.Add(FLAGS_iosize);
  }
  //
  // Read implementation
  //
  void Read() {
    while (!quit_) {
      ReadKey(random() % read_key_limit_);
    }
  }

  void ReadKey(const uint64_t val) {
    // construct key
    uint64_t k[3];
    Slice key = FillKey(k, val);

    // Lookup in cache
    StopWatchNano timer(Env::Default(), /*auto_start=*/true);
    std::unique_ptr<char[]> block;
    size_t size;
    Status status = cache_->Lookup(key, &block, &size);
    if (!status.ok()) {
      fprintf(stderr, "%s\n", status.ToString().c_str());
    }
    assert(status.ok());
    assert(size == static_cast<size_t>(FLAGS_iosize));

    // adjust stats
    const size_t elapsed_micro = timer.ElapsedNanos() / 1000;
    stats_.read_latency_.Add(elapsed_micro);
    stats_.bytes_read_.Add(FLAGS_iosize);

    // verify content
    if (!FLAGS_benchmark) {
      auto expected_block = NewBlock(val);
      assert(memcmp(block.get(), expected_block.get(), FLAGS_iosize) == 0);
    }
  }
  // create data for a key by filling with a certain pattern
  std::unique_ptr<char[]> NewBlock(const uint64_t val) {
    std::unique_ptr<char[]> data(new char[FLAGS_iosize]);
    memset(data.get(), val % 255, FLAGS_iosize);
    return data;
  }

  // spawn threads
  void Spawn(const size_t n, std::list<port::Thread>* threads,
             const std::function<void()>& fn) {
    for (size_t i = 0; i < n; ++i) {
      threads->emplace_back(fn);
    }
  }

  // join threads
  void Join(std::list<port::Thread>* threads) {
    for (auto& th : *threads) {
      th.join();
    }
  }

  // construct a fixed-size 24-byte key: two zero words followed by the
  // 8-byte key counter
  Slice FillKey(uint64_t (&k)[3], const uint64_t val) {
    k[0] = k[1] = 0;
    k[2] = val;
    void* p = static_cast<void*>(&k);
    return Slice(static_cast<char*>(p), sizeof(k));
  }
  // benchmark stats
  struct Stats {
    void Clear() {
      bytes_written_.Clear();
      bytes_read_.Clear();
      read_latency_.Clear();
      write_latency_.Clear();
    }

    HistogramImpl bytes_written_;
    HistogramImpl bytes_read_;
    HistogramImpl read_latency_;
    HistogramImpl write_latency_;
  };

  std::shared_ptr<PersistentCacheTier> cache_;  // cache implementation
  std::atomic<uint64_t> insert_key_limit_{0};   // data inserted up to
  std::atomic<uint64_t> read_key_limit_{0};     // data can be read safely up to
  std::atomic<bool> quit_{false};               // ask IO threads to quit
  mutable Stats stats_;                         // Stats
};
}  // namespace ROCKSDB_NAMESPACE
//
// main
//
int main(int argc, char** argv) {
  GFLAGS_NAMESPACE::SetUsageMessage(std::string("\nUSAGE:\n") +
                                    std::string(argv[0]) + " [OPTIONS]...");
  GFLAGS_NAMESPACE::ParseCommandLineFlags(&argc, &argv, false);

  std::ostringstream msg;
  msg << "Config" << std::endl
      << "======" << std::endl
      << "* nsec=" << FLAGS_nsec << std::endl
      << "* nthread_write=" << FLAGS_nthread_write << std::endl
      << "* path=" << FLAGS_path << std::endl
      << "* cache_size=" << FLAGS_cache_size << std::endl
      << "* iosize=" << FLAGS_iosize << std::endl
      << "* writer_iosize=" << FLAGS_writer_iosize << std::endl
      << "* writer_qdepth=" << FLAGS_writer_qdepth << std::endl
      << "* enable_pipelined_writes=" << FLAGS_enable_pipelined_writes
      << std::endl
      << "* cache_type=" << FLAGS_cache_type << std::endl
      << "* benchmark=" << FLAGS_benchmark << std::endl
      << "* volatile_cache_pct=" << FLAGS_volatile_cache_pct << std::endl;
  fprintf(stderr, "%s\n", msg.str().c_str());

  std::shared_ptr<ROCKSDB_NAMESPACE::PersistentCacheTier> cache;
  if (FLAGS_cache_type == "block_cache") {
    fprintf(stderr, "Using block cache implementation\n");
    cache = ROCKSDB_NAMESPACE::NewBlockCache();
  } else if (FLAGS_cache_type == "volatile") {
    fprintf(stderr, "Using volatile cache implementation\n");
    cache = ROCKSDB_NAMESPACE::NewVolatileCache();
  } else if (FLAGS_cache_type == "tiered") {
    fprintf(stderr, "Using tiered cache implementation\n");
    cache = ROCKSDB_NAMESPACE::NewTieredCache();
  } else {
    fprintf(stderr, "Unknown option for cache\n");
  }

  assert(cache);
  if (!cache) {
    fprintf(stderr, "Error creating cache\n");
    abort();
  }

  std::unique_ptr<ROCKSDB_NAMESPACE::CacheTierBenchmark> benchmark(
      new ROCKSDB_NAMESPACE::CacheTierBenchmark(std::move(cache)));

  return 0;
}
#endif  // #ifndef GFLAGS
#else
int main(int, char**) { return 0; }
#endif  // ROCKSDB_LITE