work_queue_test.cc 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. /*
  6. * Copyright (c) 2016-present, Facebook, Inc.
  7. * All rights reserved.
  8. *
  9. * This source code is licensed under both the BSD-style license (found in the
  10. * LICENSE file in the root directory of this source tree) and the GPLv2 (found
  11. * in the COPYING file in the root directory of this source tree).
  12. */
  13. #include "util/work_queue.h"
  14. #include <gtest/gtest.h>
  15. #include <iostream>
  16. #include <memory>
  17. #include <mutex>
  18. #include <thread>
  19. #include <vector>
  20. #include "port/stack_trace.h"
  21. namespace ROCKSDB_NAMESPACE {
  22. // Unit test for work_queue.h.
  23. //
  24. // This file is an excerpt from Facebook's zstd repo at
  25. // https://github.com/facebook/zstd/. The relevant file is
  26. // contrib/pzstd/utils/test/WorkQueueTest.cpp.
  27. struct Popper {
  28. WorkQueue<int>* queue;
  29. int* results;
  30. std::mutex* mutex;
  31. void operator()() {
  32. int result;
  33. while (queue->pop(result)) {
  34. std::lock_guard<std::mutex> lock(*mutex);
  35. results[result] = result;
  36. }
  37. }
  38. };
  39. TEST(WorkQueue, SingleThreaded) {
  40. WorkQueue<int> queue;
  41. int result;
  42. queue.push(5);
  43. EXPECT_TRUE(queue.pop(result));
  44. EXPECT_EQ(5, result);
  45. queue.push(1);
  46. queue.push(2);
  47. EXPECT_TRUE(queue.pop(result));
  48. EXPECT_EQ(1, result);
  49. EXPECT_TRUE(queue.pop(result));
  50. EXPECT_EQ(2, result);
  51. queue.push(1);
  52. queue.push(2);
  53. queue.finish();
  54. EXPECT_TRUE(queue.pop(result));
  55. EXPECT_EQ(1, result);
  56. EXPECT_TRUE(queue.pop(result));
  57. EXPECT_EQ(2, result);
  58. EXPECT_FALSE(queue.pop(result));
  59. queue.waitUntilFinished();
  60. }
  61. TEST(WorkQueue, SPSC) {
  62. WorkQueue<int> queue;
  63. const int max = 100;
  64. for (int i = 0; i < 10; ++i) {
  65. queue.push(i);
  66. }
  67. std::thread thread([&queue, max] {
  68. int result;
  69. for (int i = 0;; ++i) {
  70. if (!queue.pop(result)) {
  71. EXPECT_EQ(i, max);
  72. break;
  73. }
  74. EXPECT_EQ(i, result);
  75. }
  76. });
  77. std::this_thread::yield();
  78. for (int i = 10; i < max; ++i) {
  79. queue.push(i);
  80. }
  81. queue.finish();
  82. thread.join();
  83. }
  84. TEST(WorkQueue, SPMC) {
  85. WorkQueue<int> queue;
  86. std::vector<int> results(50, -1);
  87. std::mutex mutex;
  88. std::vector<std::thread> threads;
  89. for (int i = 0; i < 5; ++i) {
  90. threads.emplace_back(Popper{&queue, results.data(), &mutex});
  91. }
  92. for (int i = 0; i < 50; ++i) {
  93. queue.push(i);
  94. }
  95. queue.finish();
  96. for (auto& thread : threads) {
  97. thread.join();
  98. }
  99. for (int i = 0; i < 50; ++i) {
  100. EXPECT_EQ(i, results[i]);
  101. }
  102. }
  103. TEST(WorkQueue, MPMC) {
  104. WorkQueue<int> queue;
  105. std::vector<int> results(100, -1);
  106. std::mutex mutex;
  107. std::vector<std::thread> popperThreads;
  108. for (int i = 0; i < 4; ++i) {
  109. popperThreads.emplace_back(Popper{&queue, results.data(), &mutex});
  110. }
  111. std::vector<std::thread> pusherThreads;
  112. for (int i = 0; i < 2; ++i) {
  113. auto min = i * 50;
  114. auto max = (i + 1) * 50;
  115. pusherThreads.emplace_back([&queue, min, max] {
  116. for (int j = min; j < max; ++j) {
  117. queue.push(j);
  118. }
  119. });
  120. }
  121. for (auto& thread : pusherThreads) {
  122. thread.join();
  123. }
  124. queue.finish();
  125. for (auto& thread : popperThreads) {
  126. thread.join();
  127. }
  128. for (int i = 0; i < 100; ++i) {
  129. EXPECT_EQ(i, results[i]);
  130. }
  131. }
  132. TEST(WorkQueue, BoundedSizeWorks) {
  133. WorkQueue<int> queue(1);
  134. int result;
  135. queue.push(5);
  136. queue.pop(result);
  137. queue.push(5);
  138. queue.pop(result);
  139. queue.push(5);
  140. queue.finish();
  141. queue.pop(result);
  142. EXPECT_EQ(5, result);
  143. }
  144. TEST(WorkQueue, BoundedSizePushAfterFinish) {
  145. WorkQueue<int> queue(1);
  146. int result;
  147. queue.push(5);
  148. std::thread pusher([&queue] { queue.push(6); });
  149. // Dirtily try and make sure that pusher has run.
  150. std::this_thread::sleep_for(std::chrono::seconds(1));
  151. queue.finish();
  152. EXPECT_TRUE(queue.pop(result));
  153. EXPECT_EQ(5, result);
  154. EXPECT_FALSE(queue.pop(result));
  155. pusher.join();
  156. }
  157. TEST(WorkQueue, SetMaxSize) {
  158. WorkQueue<int> queue(2);
  159. int result;
  160. queue.push(5);
  161. queue.push(6);
  162. queue.setMaxSize(1);
  163. std::thread pusher([&queue] { queue.push(7); });
  164. // Dirtily try and make sure that pusher has run.
  165. std::this_thread::sleep_for(std::chrono::seconds(1));
  166. queue.finish();
  167. EXPECT_TRUE(queue.pop(result));
  168. EXPECT_EQ(5, result);
  169. EXPECT_TRUE(queue.pop(result));
  170. EXPECT_EQ(6, result);
  171. EXPECT_FALSE(queue.pop(result));
  172. pusher.join();
  173. }
  174. TEST(WorkQueue, BoundedSizeMPMC) {
  175. WorkQueue<int> queue(10);
  176. std::vector<int> results(200, -1);
  177. std::mutex mutex;
  178. std::cerr << "Creating popperThreads" << std::endl;
  179. std::vector<std::thread> popperThreads;
  180. for (int i = 0; i < 4; ++i) {
  181. popperThreads.emplace_back(Popper{&queue, results.data(), &mutex});
  182. }
  183. std::cerr << "Creating pusherThreads" << std::endl;
  184. std::vector<std::thread> pusherThreads;
  185. for (int i = 0; i < 2; ++i) {
  186. auto min = i * 100;
  187. auto max = (i + 1) * 100;
  188. pusherThreads.emplace_back([&queue, min, max] {
  189. for (int j = min; j < max; ++j) {
  190. queue.push(j);
  191. }
  192. });
  193. }
  194. std::cerr << "Joining pusherThreads" << std::endl;
  195. for (auto& thread : pusherThreads) {
  196. thread.join();
  197. }
  198. std::cerr << "Finishing queue" << std::endl;
  199. queue.finish();
  200. std::cerr << "Joining popperThreads" << std::endl;
  201. for (auto& thread : popperThreads) {
  202. thread.join();
  203. }
  204. std::cerr << "Inspecting results" << std::endl;
  205. for (int i = 0; i < 200; ++i) {
  206. EXPECT_EQ(i, results[i]);
  207. }
  208. }
  209. TEST(WorkQueue, FailedPush) {
  210. WorkQueue<int> queue;
  211. EXPECT_TRUE(queue.push(1));
  212. queue.finish();
  213. EXPECT_FALSE(queue.push(1));
  214. }
  215. TEST(WorkQueue, FailedPop) {
  216. WorkQueue<int> queue;
  217. int x = 5;
  218. EXPECT_TRUE(queue.push(x));
  219. queue.finish();
  220. x = 0;
  221. EXPECT_TRUE(queue.pop(x));
  222. EXPECT_EQ(5, x);
  223. EXPECT_FALSE(queue.pop(x));
  224. EXPECT_EQ(5, x);
  225. }
  226. } // namespace ROCKSDB_NAMESPACE
  227. int main(int argc, char** argv) {
  228. ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  229. ::testing::InitGoogleTest(&argc, argv);
  230. return RUN_ALL_TESTS();
  231. }