| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272 |
- // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
- // This source code is licensed under both the GPLv2 (found in the
- // COPYING file in the root directory) and Apache 2.0 License
- // (found in the LICENSE.Apache file in the root directory).
- /*
- * Copyright (c) 2016-present, Facebook, Inc.
- * All rights reserved.
- *
- * This source code is licensed under both the BSD-style license (found in the
- * LICENSE file in the root directory of this source tree) and the GPLv2 (found
- * in the COPYING file in the root directory of this source tree).
- */
- #include "util/work_queue.h"
- #include <gtest/gtest.h>
- #include <iostream>
- #include <memory>
- #include <mutex>
- #include <thread>
- #include <vector>
- #include "port/stack_trace.h"
- namespace ROCKSDB_NAMESPACE {
- // Unit test for work_queue.h.
- //
- // This file is an excerpt from Facebook's zstd repo at
- // https://github.com/facebook/zstd/. The relevant file is
- // contrib/pzstd/utils/test/WorkQueueTest.cpp.
- struct Popper {
- WorkQueue<int>* queue;
- int* results;
- std::mutex* mutex;
- void operator()() {
- int result;
- while (queue->pop(result)) {
- std::lock_guard<std::mutex> lock(*mutex);
- results[result] = result;
- }
- }
- };
- TEST(WorkQueue, SingleThreaded) {
- WorkQueue<int> queue;
- int result;
- queue.push(5);
- EXPECT_TRUE(queue.pop(result));
- EXPECT_EQ(5, result);
- queue.push(1);
- queue.push(2);
- EXPECT_TRUE(queue.pop(result));
- EXPECT_EQ(1, result);
- EXPECT_TRUE(queue.pop(result));
- EXPECT_EQ(2, result);
- queue.push(1);
- queue.push(2);
- queue.finish();
- EXPECT_TRUE(queue.pop(result));
- EXPECT_EQ(1, result);
- EXPECT_TRUE(queue.pop(result));
- EXPECT_EQ(2, result);
- EXPECT_FALSE(queue.pop(result));
- queue.waitUntilFinished();
- }
- TEST(WorkQueue, SPSC) {
- WorkQueue<int> queue;
- const int max = 100;
- for (int i = 0; i < 10; ++i) {
- queue.push(i);
- }
- std::thread thread([&queue, max] {
- int result;
- for (int i = 0;; ++i) {
- if (!queue.pop(result)) {
- EXPECT_EQ(i, max);
- break;
- }
- EXPECT_EQ(i, result);
- }
- });
- std::this_thread::yield();
- for (int i = 10; i < max; ++i) {
- queue.push(i);
- }
- queue.finish();
- thread.join();
- }
- TEST(WorkQueue, SPMC) {
- WorkQueue<int> queue;
- std::vector<int> results(50, -1);
- std::mutex mutex;
- std::vector<std::thread> threads;
- for (int i = 0; i < 5; ++i) {
- threads.emplace_back(Popper{&queue, results.data(), &mutex});
- }
- for (int i = 0; i < 50; ++i) {
- queue.push(i);
- }
- queue.finish();
- for (auto& thread : threads) {
- thread.join();
- }
- for (int i = 0; i < 50; ++i) {
- EXPECT_EQ(i, results[i]);
- }
- }
- TEST(WorkQueue, MPMC) {
- WorkQueue<int> queue;
- std::vector<int> results(100, -1);
- std::mutex mutex;
- std::vector<std::thread> popperThreads;
- for (int i = 0; i < 4; ++i) {
- popperThreads.emplace_back(Popper{&queue, results.data(), &mutex});
- }
- std::vector<std::thread> pusherThreads;
- for (int i = 0; i < 2; ++i) {
- auto min = i * 50;
- auto max = (i + 1) * 50;
- pusherThreads.emplace_back([&queue, min, max] {
- for (int j = min; j < max; ++j) {
- queue.push(j);
- }
- });
- }
- for (auto& thread : pusherThreads) {
- thread.join();
- }
- queue.finish();
- for (auto& thread : popperThreads) {
- thread.join();
- }
- for (int i = 0; i < 100; ++i) {
- EXPECT_EQ(i, results[i]);
- }
- }
- TEST(WorkQueue, BoundedSizeWorks) {
- WorkQueue<int> queue(1);
- int result;
- queue.push(5);
- queue.pop(result);
- queue.push(5);
- queue.pop(result);
- queue.push(5);
- queue.finish();
- queue.pop(result);
- EXPECT_EQ(5, result);
- }
- TEST(WorkQueue, BoundedSizePushAfterFinish) {
- WorkQueue<int> queue(1);
- int result;
- queue.push(5);
- std::thread pusher([&queue] { queue.push(6); });
- // Dirtily try and make sure that pusher has run.
- std::this_thread::sleep_for(std::chrono::seconds(1));
- queue.finish();
- EXPECT_TRUE(queue.pop(result));
- EXPECT_EQ(5, result);
- EXPECT_FALSE(queue.pop(result));
- pusher.join();
- }
- TEST(WorkQueue, SetMaxSize) {
- WorkQueue<int> queue(2);
- int result;
- queue.push(5);
- queue.push(6);
- queue.setMaxSize(1);
- std::thread pusher([&queue] { queue.push(7); });
- // Dirtily try and make sure that pusher has run.
- std::this_thread::sleep_for(std::chrono::seconds(1));
- queue.finish();
- EXPECT_TRUE(queue.pop(result));
- EXPECT_EQ(5, result);
- EXPECT_TRUE(queue.pop(result));
- EXPECT_EQ(6, result);
- EXPECT_FALSE(queue.pop(result));
- pusher.join();
- }
- TEST(WorkQueue, BoundedSizeMPMC) {
- WorkQueue<int> queue(10);
- std::vector<int> results(200, -1);
- std::mutex mutex;
- std::cerr << "Creating popperThreads" << std::endl;
- std::vector<std::thread> popperThreads;
- for (int i = 0; i < 4; ++i) {
- popperThreads.emplace_back(Popper{&queue, results.data(), &mutex});
- }
- std::cerr << "Creating pusherThreads" << std::endl;
- std::vector<std::thread> pusherThreads;
- for (int i = 0; i < 2; ++i) {
- auto min = i * 100;
- auto max = (i + 1) * 100;
- pusherThreads.emplace_back([&queue, min, max] {
- for (int j = min; j < max; ++j) {
- queue.push(j);
- }
- });
- }
- std::cerr << "Joining pusherThreads" << std::endl;
- for (auto& thread : pusherThreads) {
- thread.join();
- }
- std::cerr << "Finishing queue" << std::endl;
- queue.finish();
- std::cerr << "Joining popperThreads" << std::endl;
- for (auto& thread : popperThreads) {
- thread.join();
- }
- std::cerr << "Inspecting results" << std::endl;
- for (int i = 0; i < 200; ++i) {
- EXPECT_EQ(i, results[i]);
- }
- }
- TEST(WorkQueue, FailedPush) {
- WorkQueue<int> queue;
- EXPECT_TRUE(queue.push(1));
- queue.finish();
- EXPECT_FALSE(queue.push(1));
- }
- TEST(WorkQueue, FailedPop) {
- WorkQueue<int> queue;
- int x = 5;
- EXPECT_TRUE(queue.push(x));
- queue.finish();
- x = 0;
- EXPECT_TRUE(queue.pop(x));
- EXPECT_EQ(5, x);
- EXPECT_FALSE(queue.pop(x));
- EXPECT_EQ(5, x);
- }
- } // namespace ROCKSDB_NAMESPACE
- int main(int argc, char** argv) {
- ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
- ::testing::InitGoogleTest(&argc, argv);
- return RUN_ALL_TESTS();
- }
|