test_iostream.cpp 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. /*
  2. tests/test_iostream.cpp -- Usage of scoped_output_redirect
  3. Copyright (c) 2017 Henry F. Schreiner
  4. All rights reserved. Use of this source code is governed by a
  5. BSD-style license that can be found in the LICENSE file.
  6. */
  7. #if defined(_MSC_VER) && _MSC_VER < 1910 // VS 2015's MSVC
  8. # pragma warning(disable: 4702) // unreachable code in system header (xatomic.h(382))
  9. #endif
  10. #include <pybind11/iostream.h>
  11. #include "pybind11_tests.h"
  12. #include <atomic>
  13. #include <iostream>
  14. #include <thread>
  /// Write `msg` to std::cout, optionally flushing afterwards.
  ///
  /// @param msg   Text streamed to std::cout exactly as given (no newline is appended).
  /// @param flush When true, std::cout is flushed right after the write; when false
  ///              the text stays in whatever buffer std::cout is currently using.
  void noisy_function(const std::string &msg, bool flush) {
      std::cout << msg;
      if (flush) {
          std::cout << std::flush;
      }
  }
  /// Write `msg` to std::cout and `emsg` to std::cerr.
  ///
  /// Neither stream is flushed explicitly by this function.
  ///
  /// @param msg  Text streamed to std::cout.
  /// @param emsg Text streamed to std::cerr.
  void noisy_funct_dual(const std::string &msg, const std::string &emsg) {
      std::cout << msg;
      std::cerr << emsg;
  }
  24. // object to manage C++ thread
  25. // simply repeatedly write to std::cerr until stopped
  26. // redirect is called at some point to test the safety of scoped_estream_redirect
  27. struct TestThread {
  28. TestThread() : t_{nullptr}, stop_{false} {
  29. auto thread_f = [this] {
  30. while (!stop_) {
  31. std::cout << "x" << std::flush;
  32. std::this_thread::sleep_for(std::chrono::microseconds(50));
  33. } };
  34. t_ = new std::thread(std::move(thread_f));
  35. }
  36. ~TestThread() {
  37. delete t_;
  38. }
  39. void stop() { stop_ = true; }
  40. void join() {
  41. py::gil_scoped_release gil_lock;
  42. t_->join();
  43. }
  44. void sleep() {
  45. py::gil_scoped_release gil_lock;
  46. std::this_thread::sleep_for(std::chrono::milliseconds(50));
  47. }
  48. std::thread * t_;
  49. std::atomic<bool> stop_;
  50. };
  51. TEST_SUBMODULE(iostream, m) {
  52. add_ostream_redirect(m);
  53. // test_evals
  54. m.def("captured_output_default", [](std::string msg) {
  55. py::scoped_ostream_redirect redir;
  56. std::cout << msg << std::flush;
  57. });
  58. m.def("captured_output", [](std::string msg) {
  59. py::scoped_ostream_redirect redir(std::cout, py::module_::import("sys").attr("stdout"));
  60. std::cout << msg << std::flush;
  61. });
  62. m.def("guard_output", &noisy_function,
  63. py::call_guard<py::scoped_ostream_redirect>(),
  64. py::arg("msg"), py::arg("flush")=true);
  65. m.def("captured_err", [](std::string msg) {
  66. py::scoped_ostream_redirect redir(std::cerr, py::module_::import("sys").attr("stderr"));
  67. std::cerr << msg << std::flush;
  68. });
  69. m.def("noisy_function", &noisy_function, py::arg("msg"), py::arg("flush") = true);
  70. m.def("dual_guard", &noisy_funct_dual,
  71. py::call_guard<py::scoped_ostream_redirect, py::scoped_estream_redirect>(),
  72. py::arg("msg"), py::arg("emsg"));
  73. m.def("raw_output", [](std::string msg) {
  74. std::cout << msg << std::flush;
  75. });
  76. m.def("raw_err", [](std::string msg) {
  77. std::cerr << msg << std::flush;
  78. });
  79. m.def("captured_dual", [](std::string msg, std::string emsg) {
  80. py::scoped_ostream_redirect redirout(std::cout, py::module_::import("sys").attr("stdout"));
  81. py::scoped_ostream_redirect redirerr(std::cerr, py::module_::import("sys").attr("stderr"));
  82. std::cout << msg << std::flush;
  83. std::cerr << emsg << std::flush;
  84. });
  85. py::class_<TestThread>(m, "TestThread")
  86. .def(py::init<>())
  87. .def("stop", &TestThread::stop)
  88. .def("join", &TestThread::join)
  89. .def("sleep", &TestThread::sleep);
  90. }