123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118 |
- /*
- tests/test_iostream.cpp -- Usage of scoped_output_redirect
- Copyright (c) 2017 Henry F. Schreiner
- All rights reserved. Use of this source code is governed by a
- BSD-style license that can be found in the LICENSE file.
- */
- #if defined(_MSC_VER) && _MSC_VER < 1910 // VS 2015's MSVC
- # pragma warning(disable: 4702) // unreachable code in system header (xatomic.h(382))
- #endif
- #include <pybind11/iostream.h>
- #include "pybind11_tests.h"
- #include <atomic>
- #include <iostream>
- #include <thread>
// Writes `msg` to std::cout; the stream is flushed afterwards only when
// `flush` is true.
void noisy_function(std::string msg, bool flush) {
    std::cout << msg;
    if (!flush) {
        return;
    }
    std::cout.flush();
}
// Writes `msg` to std::cout and then `emsg` to std::cerr. Neither stream is
// flushed explicitly here.
void noisy_funct_dual(std::string msg, std::string emsg) {
    std::ostream &out = std::cout;
    std::ostream &err = std::cerr;
    out << msg;
    err << emsg;
}
- // object to manage C++ thread
- // simply repeatedly write to std::cerr until stopped
- // redirect is called at some point to test the safety of scoped_estream_redirect
- struct TestThread {
- TestThread() : t_{nullptr}, stop_{false} {
- auto thread_f = [this] {
- while (!stop_) {
- std::cout << "x" << std::flush;
- std::this_thread::sleep_for(std::chrono::microseconds(50));
- } };
- t_ = new std::thread(std::move(thread_f));
- }
- ~TestThread() {
- delete t_;
- }
- void stop() { stop_ = true; }
- void join() {
- py::gil_scoped_release gil_lock;
- t_->join();
- }
- void sleep() {
- py::gil_scoped_release gil_lock;
- std::this_thread::sleep_for(std::chrono::milliseconds(50));
- }
- std::thread * t_;
- std::atomic<bool> stop_;
- };
- TEST_SUBMODULE(iostream, m) {
- add_ostream_redirect(m);
- // test_evals
- m.def("captured_output_default", [](std::string msg) {
- py::scoped_ostream_redirect redir;
- std::cout << msg << std::flush;
- });
- m.def("captured_output", [](std::string msg) {
- py::scoped_ostream_redirect redir(std::cout, py::module_::import("sys").attr("stdout"));
- std::cout << msg << std::flush;
- });
- m.def("guard_output", &noisy_function,
- py::call_guard<py::scoped_ostream_redirect>(),
- py::arg("msg"), py::arg("flush")=true);
- m.def("captured_err", [](std::string msg) {
- py::scoped_ostream_redirect redir(std::cerr, py::module_::import("sys").attr("stderr"));
- std::cerr << msg << std::flush;
- });
- m.def("noisy_function", &noisy_function, py::arg("msg"), py::arg("flush") = true);
- m.def("dual_guard", &noisy_funct_dual,
- py::call_guard<py::scoped_ostream_redirect, py::scoped_estream_redirect>(),
- py::arg("msg"), py::arg("emsg"));
- m.def("raw_output", [](std::string msg) {
- std::cout << msg << std::flush;
- });
- m.def("raw_err", [](std::string msg) {
- std::cerr << msg << std::flush;
- });
- m.def("captured_dual", [](std::string msg, std::string emsg) {
- py::scoped_ostream_redirect redirout(std::cout, py::module_::import("sys").attr("stdout"));
- py::scoped_ostream_redirect redirerr(std::cerr, py::module_::import("sys").attr("stderr"));
- std::cout << msg << std::flush;
- std::cerr << emsg << std::flush;
- });
- py::class_<TestThread>(m, "TestThread")
- .def(py::init<>())
- .def("stop", &TestThread::stop)
- .def("join", &TestThread::join)
- .def("sleep", &TestThread::sleep);
- }
|