12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197 |
- /*
- * Copyright (c) 2015, Peter Thorson. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above copyright
- * notice, this list of conditions and the following disclaimer in the
- * documentation and/or other materials provided with the distribution.
- * * Neither the name of the WebSocket++ Project nor the
- * names of its contributors may be used to endorse or promote products
- * derived from this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL PETER THORSON BE LIABLE FOR ANY
- * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
- * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
- * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
- * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
- * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
- #ifndef WEBSOCKETPP_TRANSPORT_ASIO_CON_HPP
- #define WEBSOCKETPP_TRANSPORT_ASIO_CON_HPP
- #include <websocketpp/transport/asio/base.hpp>
- #include <websocketpp/transport/base/connection.hpp>
- #include <websocketpp/logger/levels.hpp>
- #include <websocketpp/http/constants.hpp>
- #include <websocketpp/base64/base64.hpp>
- #include <websocketpp/error.hpp>
- #include <websocketpp/uri.hpp>
- #include <websocketpp/common/asio.hpp>
- #include <websocketpp/common/chrono.hpp>
- #include <websocketpp/common/cpp11.hpp>
- #include <websocketpp/common/memory.hpp>
- #include <websocketpp/common/functional.hpp>
- #include <websocketpp/common/connection_hdl.hpp>
- #include <istream>
- #include <sstream>
- #include <string>
- #include <vector>
- namespace websocketpp {
- namespace transport {
- namespace asio {
- typedef lib::function<void(connection_hdl)> tcp_init_handler;
- /// Asio based connection transport component
- /**
- * transport::asio::connection implements a connection transport component using
- * Asio that works with the transport::asio::endpoint endpoint transport
- * component.
- */
- template <typename config>
- class connection : public config::socket_type::socket_con_type {
- public:
- /// Type of this connection transport component
- typedef connection<config> type;
- /// Type of a shared pointer to this connection transport component
- typedef lib::shared_ptr<type> ptr;
- /// Type of the socket connection component
- typedef typename config::socket_type::socket_con_type socket_con_type;
- /// Type of a shared pointer to the socket connection component
- typedef typename socket_con_type::ptr socket_con_ptr;
- /// Type of this transport's access logging policy
- typedef typename config::alog_type alog_type;
- /// Type of this transport's error logging policy
- typedef typename config::elog_type elog_type;
- typedef typename config::request_type request_type;
- typedef typename request_type::ptr request_ptr;
- typedef typename config::response_type response_type;
- typedef typename response_type::ptr response_ptr;
- /// Type of a pointer to the Asio io_service being used
- typedef lib::asio::io_service * io_service_ptr;
- /// Type of a pointer to the Asio io_service::strand being used
- typedef lib::shared_ptr<lib::asio::io_service::strand> strand_ptr;
- /// Type of a pointer to the Asio timer class
- typedef lib::shared_ptr<lib::asio::steady_timer> timer_ptr;
- // connection is friends with its associated endpoint to allow the endpoint
- // to call private/protected utility methods that we don't want to expose
- // to the public api.
- friend class endpoint<config>;
- // construct the transport component; the io_service is attached later by init_asio
- explicit connection(bool is_server,
-     const lib::shared_ptr<alog_type> & alog,
-     const lib::shared_ptr<elog_type> & elog)
-   : m_is_server(is_server), m_alog(alog), m_elog(elog)
- {
-     // Only the role flag and the two loggers are captured here; the sole
-     // other action of the constructor is a devel-level trace message.
-     m_alog->write(log::alevel::devel, "asio con transport constructor");
- }
- /// Get a shared pointer to this component
- ptr get_shared() {
-     // Cast the shared pointer supplied by the socket base class to this
-     // connection type.
-     return lib::static_pointer_cast<type>(
-         socket_con_type::get_shared()
-     );
- }
- /// Report whether the socket layer considers this connection secure
- /**
- * @return The value reported by socket_con_type::is_secure()
- */
- bool is_secure() const {
-     bool const secure = socket_con_type::is_secure();
-     return secure;
- }
- /// Set uri hook
- /**
- * Called by the endpoint as a connection is being established to provide
- * the uri being connected to to the transport layer.
- *
- * This transport policy doesn't use the uri except to forward it to the
- * socket layer.
- *
- * @since 0.6.0
- *
- * @param u The uri to set
- */
- void set_uri(uri_ptr u) {
-     // Pure pass-through: this layer keeps no copy of the uri, it is only
-     // handed on to the socket component.
-     socket_con_type::set_uri(u);
- }
- /// Sets the tcp pre init handler
- /**
- * The tcp pre init handler is called after the raw tcp connection has been
- * established but before any additional wrappers (proxy connects, TLS
- * handshakes, etc) have been performed.
- *
- * @since 0.3.0
- *
- * @param h The handler to call on tcp pre init.
- */
- void set_tcp_pre_init_handler(tcp_init_handler h) {
-     // Replaces any previously stored pre init handler.
-     m_tcp_pre_init_handler = h;
- }
- /// Sets the tcp pre init handler (deprecated)
- /**
- * The tcp pre init handler is called after the raw tcp connection has been
- * established but before any additional wrappers (proxy connects, TLS
- * handshakes, etc) have been performed.
- *
- * @deprecated Use set_tcp_pre_init_handler instead
- *
- * @param h The handler to call on tcp pre init.
- */
- void set_tcp_init_handler(tcp_init_handler h) {
-     // Legacy spelling; forwards unchanged to the pre init setter.
-     this->set_tcp_pre_init_handler(h);
- }
- /// Sets the tcp post init handler
- /**
- * The tcp post init handler is called after the tcp connection has been
- * established and all additional wrappers (proxy connects, TLS handshakes,
- * etc) have been performed. This is fired before any bytes are read or any
- * WebSocket specific handshake logic has been performed.
- *
- * @since 0.3.0
- *
- * @param h The handler to call on tcp post init.
- */
- void set_tcp_post_init_handler(tcp_init_handler h) {
-     // Replaces any previously stored post init handler.
-     m_tcp_post_init_handler = h;
- }
- /// Set the proxy to connect through (exception free)
- /**
- * The URI passed should be a complete URI including scheme. For example:
- * http://proxy.example.com:8080/
- *
- * The proxy must be set up as an explicit (CONNECT) proxy allowed to
- * connect to the port you specify. Traffic to the proxy is not encrypted.
- *
- * @param uri The full URI of the proxy to connect to.
- *
- * @param ec A status value
- */
- void set_proxy(std::string const & uri, lib::error_code & ec) {
-     // TODO: return errors for illegal URIs here?
-     // TODO: should https urls be illegal for the moment?
-     // Remember the proxy address as given (no validation is performed).
-     m_proxy = uri;
-     // A fresh proxy_data object is allocated on every call, replacing any
-     // state stored by an earlier call.
-     m_proxy_data = lib::make_shared<proxy_data>();
-     // This overload currently always reports success.
-     ec = lib::error_code();
- }
- /// Set the proxy to connect through (exception)
- void set_proxy(std::string const & uri) {
-     // Delegate to the exception free overload and convert a failure status
-     // into a thrown exception.
-     lib::error_code status;
-     set_proxy(uri, status);
-     if (!status) {
-         return;
-     }
-     throw exception(status);
- }
- /// Set the basic auth credentials to use (exception free)
- /**
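- * Stores an HTTP Basic `Proxy-Authorization` header, built from the given
- * username and password, on the request that will be sent to the proxy.
- *
- * A proxy must already have been set (see set_proxy); otherwise ec is set to
- * error::invalid_state. The proxy must be set up as an explicit proxy.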
- *
- * @param username The username to send
- *
- * @param password The password to send
- *
- * @param ec A status value
- */
- void set_proxy_basic_auth(std::string const & username,
-     std::string const & password, lib::error_code & ec)
- {
-     if (m_proxy_data) {
-         // TODO: username can't contain ':'
-         // Credentials are joined as "user:pass", base64 encoded and
-         // prefixed with the "Basic " scheme token.
-         std::string const credentials = username + ":" + password;
-         std::string const header_value = "Basic " + base64_encode(credentials);
-         m_proxy_data->req.replace_header("Proxy-Authorization", header_value);
-         ec = lib::error_code();
-     } else {
-         // No proxy state exists to attach the header to.
-         ec = make_error_code(websocketpp::error::invalid_state);
-     }
- }
- /// Set the basic auth credentials to use (exception)
- void set_proxy_basic_auth(std::string const & username,
-     std::string const & password)
- {
-     // Delegate to the exception free overload and throw on failure.
-     lib::error_code status;
-     set_proxy_basic_auth(username, password, status);
-     if (!status) {
-         return;
-     }
-     throw exception(status);
- }
- /// Set the proxy timeout duration (exception free)
- /**
- * Duration is in milliseconds. Default value is based on the transport
- * config
- *
- * @param duration The number of milliseconds to wait before aborting the
- * proxy connection.
- *
- * @param ec A status value
- */
- void set_proxy_timeout(long duration, lib::error_code & ec) {
-     if (m_proxy_data) {
-         // The value is stored as given; no range check is applied here.
-         m_proxy_data->timeout_proxy = duration;
-         ec = lib::error_code();
-     } else {
-         // No proxy state exists to store the timeout on.
-         ec = make_error_code(websocketpp::error::invalid_state);
-     }
- }
- /// Set the proxy timeout duration (exception)
- void set_proxy_timeout(long duration) {
-     // Delegate to the exception free overload and throw on failure.
-     lib::error_code status;
-     set_proxy_timeout(duration, status);
-     if (!status) {
-         return;
-     }
-     throw exception(status);
- }
- /// Get the proxy URI string
- /**
- * @return A reference to the stored proxy string, exactly as last passed to
- * set_proxy
- */
- std::string const & get_proxy() const {
-     return m_proxy;
- }
- /// Get the remote endpoint address
- /**
- * Asks the socket layer for the remote endpoint. If the socket layer
- * reports an error, the string it returned is written to the error log and
- * "Unknown" is returned instead.
- *
- * TODO: allow user settable remote endpoint addresses if this seems useful
- *
- * @return A string identifying the address of the remote endpoint
- */
- std::string get_remote_endpoint() const {
-     lib::error_code lookup_ec;
-     std::string endpoint = socket_con_type::get_remote_endpoint(lookup_ec);
-     if (!lookup_ec) {
-         return endpoint;
-     }
-     // On failure the string produced by the socket layer is logged rather
-     // than returned to the caller.
-     m_elog->write(log::elevel::info, endpoint);
-     return "Unknown";
- }
- /// Get the connection handle
- connection_hdl get_handle() const {
-     // Returns a copy of the handle stored by set_handle.
-     return m_connection_hdl;
- }
- /// Call back a function after a period of time.
- /**
- * Sets a timer that calls back a function after the specified period of
- * milliseconds. Returns a handle that can be used to cancel the timer.
- * A cancelled timer will return the error code error::operation_aborted
- * A timer that expired will return no error.
- *
- * @param duration Length of time to wait in milliseconds
- *
- * @param callback The function to call back when the timer has expired
- *
- * @return A handle that can be used to cancel the timer if it is no longer
- * needed.
- */
- timer_ptr set_timer(long duration, timer_handler callback) {
-     // The timer is constructed on this connection's io_service with an
-     // expiry of `duration` milliseconds.
-     timer_ptr timer(
-         new lib::asio::steady_timer(
-             *m_io_service,
-             lib::asio::milliseconds(duration)
-         )
-     );
-     // A copy of the timer pointer is bound into the wait handler so the
-     // timer object stays alive until handle_timer has run.
-     if (!config::enable_multithreading) {
-         timer->async_wait(lib::bind(
-             &type::handle_timer, get_shared(),
-             timer,
-             callback,
-             lib::placeholders::_1
-         ));
-         return timer;
-     }
-     // Multithreaded configuration: route the handler through the strand.
-     timer->async_wait(m_strand->wrap(lib::bind(
-         &type::handle_timer, get_shared(),
-         timer,
-         callback,
-         lib::placeholders::_1
-     )));
-     return timer;
- }
- /// Timer callback
- /**
- * The timer pointer is included to ensure the timer isn't destroyed until
- * after it has expired.
- *
- * TODO: candidate for protected status
- *
- * @param post_timer Pointer to the timer in question
- * @param callback The function to call back
- * @param ec The status code
- */
- void handle_timer(timer_ptr, timer_handler callback,
-     lib::asio::error_code const & ec)
- {
-     // Normal expiry: report success.
-     if (!ec) {
-         callback(lib::error_code());
-         return;
-     }
-     // Cancellation is translated to the transport level aborted code.
-     if (ec == lib::asio::error::operation_aborted) {
-         callback(make_error_code(transport::error::operation_aborted));
-         return;
-     }
-     // Any other asio error is logged and reported as pass_through.
-     log_err(log::elevel::info,"asio handle_timer",ec);
-     callback(make_error_code(error::pass_through));
- }
- /// Get a pointer to this connection's strand
- strand_ptr get_strand() {
-     // The pointer is only assigned by init_asio when multithreading is
-     // enabled in the config.
-     return m_strand;
- }
- /// Get the internal transport error code for a closed/failed connection
- /**
- * Retrieves a machine readable detailed error code indicating the reason
- * that the connection was closed or failed. Valid only after the close or
- * fail handler is called.
- *
- * Primarily used if you are using mismatched asio / system_error
- * implementations such as `boost::asio` with `std::system_error`. In these
- * cases the transport error type is different than the library error type
- * and some WebSocket++ functions that return transport errors via the
- * library error code type will be coerced into a catch all `pass_through`
- * or `tls_error` error. This method will return the original machine
- * readable transport error in the native type.
- *
- * @since 0.7.0
- *
- * @return Error code indicating the reason the connection was closed or
- * failed
- */
- lib::asio::error_code get_transport_ec() const {
-     // m_tec holds the untranslated asio error recorded by the read handler.
-     return m_tec;
- }
- /// Initialize transport for reading
- /**
- * init starts the transport initialization sequence and reports the result
- * through the supplied callback.
- *
- * The transport initialization sequence consists of the following steps:
- * - Pre-init: the underlying socket is initialized to the point where
- * bytes may be written. No bytes are actually written in this stage
- * - Proxy negotiation: if a proxy is set, a request is made to it to start
- * a tunnel to the final destination. This stage ends when the proxy is
- * ready to forward the
- * next byte to the remote endpoint.
- * - Post-init: Perform any i/o with the remote endpoint, such as setting up
- * tunnels for encryption. This stage ends when the connection is ready to
- * read or write the WebSocket handshakes. At this point the original
- * callback function is called.
- */
- protected:
- void init(init_handler callback) {
-     if (m_alog->static_test(log::alevel::devel)) {
-         m_alog->write(log::alevel::devel,"asio connection init");
-     }
-     // TODO: pre-init timeout. Right now no implemented socket policies
-     // actually have an asynchronous pre-init
-     //
-     // Start with the socket layer's pre-init; handle_pre_init continues the
-     // sequence once it completes.
-     socket_con_type::pre_init(lib::bind(
-         &type::handle_pre_init,
-         get_shared(),
-         callback,
-         lib::placeholders::_1
-     ));
- }
- /// initialize the proxy buffers and http parsers
- /**
- *
- * @param authority The address of the server we want the proxy to tunnel to
- * in the format of a URI authority (host:port)
- *
- * @return Status code indicating what errors occurred, if any
- */
- lib::error_code proxy_init(std::string const & authority) {
-     lib::error_code status;
-     if (m_proxy_data) {
-         // Fill in an HTTP/1.1 CONNECT request whose target and Host header
-         // are both the supplied authority.
-         m_proxy_data->req.set_version("HTTP/1.1");
-         m_proxy_data->req.set_method("CONNECT");
-         m_proxy_data->req.set_uri(authority);
-         m_proxy_data->req.replace_header("Host",authority);
-     } else {
-         // No proxy state has been allocated.
-         status = websocketpp::error::make_error_code(
-             websocketpp::error::invalid_state);
-     }
-     return status;
- }
- /// Finish constructing the transport
- /**
- * init_asio is called once immediately after construction to initialize
- * Asio components to the io_service.
- *
- * @param io_service A pointer to the io_service to register with this
- * connection
- *
- * @return Status code for the success or failure of the initialization
- */
- lib::error_code init_asio (io_service_ptr io_service) {
-     m_io_service = io_service;
-     // A strand is only created in the multithreaded configuration;
-     // otherwise m_strand is passed on as it is.
-     if (config::enable_multithreading) {
-         m_strand.reset(new lib::asio::io_service::strand(*io_service));
-     }
-     // The socket layer's status is the result of this call.
-     return socket_con_type::init_asio(io_service, m_strand, m_is_server);
- }
- /// Pre init callback
- /**
- * Runs after the socket layer's pre-init step. Fires the user supplied tcp
- * pre init handler (if one is set) and then either reports a pre-init
- * failure or continues with proxy negotiation / post-init.
- *
- * @param callback The function to call back once initialization completes
- * or fails
- * @param ec The status code reported by the socket layer's pre-init
- */
- void handle_pre_init(init_handler callback, lib::error_code const & ec) {
-     if (m_alog->static_test(log::alevel::devel)) {
-         m_alog->write(log::alevel::devel,"asio connection handle pre_init");
-     }
-     if (m_tcp_pre_init_handler) {
-         m_tcp_pre_init_handler(m_connection_hdl);
-     }
-     if (ec) {
-         // Pre-init failed: report it and stop. Falling through would carry
-         // on with the init sequence and invoke the callback a second time.
-         callback(ec);
-         return;
-     }
-     // If we have a proxy set issue a proxy connect, otherwise skip to
-     // post_init
-     if (!m_proxy.empty()) {
-         proxy_write(callback);
-     } else {
-         post_init(callback);
-     }
- }
- /// Start the post-init stage
- /**
- * Optionally arms a timeout timer (when config::timeout_socket_post_init is
- * positive) and then starts the socket layer's post-init step.
- *
- * @param callback The function to call back when post-init completes,
- * fails or times out
- */
- void post_init(init_handler callback) {
-     if (m_alog->static_test(log::alevel::devel)) {
-         m_alog->write(log::alevel::devel,"asio connection post_init");
-     }
-     timer_ptr post_timer;
-     if (config::timeout_socket_post_init > 0) {
-         // The pointer bound into the timeout handler is empty: post_timer
-         // has not been assigned yet when the bind expression is built.
-         post_timer = set_timer(
-             config::timeout_socket_post_init,
-             lib::bind(
-                 &type::handle_post_init_timeout,
-                 get_shared(),
-                 timer_ptr(),
-                 callback,
-                 lib::placeholders::_1
-             )
-         );
-     }
-     // handle_post_init receives the (possibly empty) timer pointer.
-     socket_con_type::post_init(lib::bind(
-         &type::handle_post_init,
-         get_shared(),
-         post_timer,
-         callback,
-         lib::placeholders::_1
-     ));
- }
- /// Post init timeout callback
- /**
- * The timer pointer is included to ensure the timer isn't destroyed until
- * after it has expired.
- *
- * @param post_timer Pointer to the timer in question
- * @param callback The function to call back
- * @param ec The status code
- */
- void handle_post_init_timeout(timer_ptr, init_handler callback,
-     lib::error_code const & ec)
- {
-     // A cancelled timer means post-init finished first; nothing to report.
-     if (ec == transport::error::operation_aborted) {
-         m_alog->write(log::alevel::devel,
-             "asio post init timer cancelled");
-         return;
-     }
-     lib::error_code result;
-     if (ec) {
-         // Some other timer failure: log it and pass it on unchanged.
-         log_err(log::elevel::devel,"asio handle_post_init_timeout",ec);
-         result = ec;
-     } else if (socket_con_type::get_ec()) {
-         // Timer expired and the socket layer has an error recorded; that
-         // error takes precedence over the generic timeout code.
-         result = socket_con_type::get_ec();
-     } else {
-         result = make_error_code(transport::error::timeout);
-     }
-     m_alog->write(log::alevel::devel, "Asio transport post-init timed out");
-     cancel_socket_checked();
-     callback(result);
- }
- /// Post init callback
- /**
- * The timer pointer is used to detect whether the post init timeout has
- * already expired and, if not, to cancel it.
- *
- * @param post_timer Pointer to the timer in question
- * @param callback The function to call back
- * @param ec The status code
- */
- void handle_post_init(timer_ptr post_timer, init_handler callback,
-     lib::error_code const & ec)
- {
-     // Either the operation was aborted or the timeout timer has already
-     // expired; in both cases this handler returns without calling back.
-     bool const aborted = (ec == transport::error::operation_aborted);
-     bool const expired = !aborted && post_timer &&
-         lib::asio::is_neg(post_timer->expires_from_now());
-     if (aborted || expired) {
-         m_alog->write(log::alevel::devel,"post_init cancelled");
-         return;
-     }
-     // Post-init finished in time, so the timeout is no longer needed.
-     if (post_timer) {
-         post_timer->cancel();
-     }
-     if (m_alog->static_test(log::alevel::devel)) {
-         m_alog->write(log::alevel::devel,"asio connection handle_post_init");
-     }
-     if (m_tcp_post_init_handler) {
-         m_tcp_post_init_handler(m_connection_hdl);
-     }
-     callback(ec);
- }
- /// Send the stored proxy request
- /**
- * Serialises the proxy request into the write buffer, arms the proxy
- * timeout timer and starts an asynchronous write on the next layer.
- *
- * @param callback The function to call back if the proxy stage fails or
- * times out; it is also handed on to the later proxy stages
- */
- void proxy_write(init_handler callback) {
-     if (m_alog->static_test(log::alevel::devel)) {
-         m_alog->write(log::alevel::devel,"asio connection proxy_write");
-     }
-     if (!m_proxy_data) {
-         m_elog->write(log::elevel::library,
-             "assertion failed: !m_proxy_data in asio::connection::proxy_write");
-         callback(make_error_code(error::general));
-         return;
-     }
-     // The raw request text is kept in write_buf, and m_bufs refers to that
-     // storage for the duration of the write.
-     m_proxy_data->write_buf = m_proxy_data->req.raw();
-     m_bufs.push_back(lib::asio::buffer(
-         m_proxy_data->write_buf.data(),
-         m_proxy_data->write_buf.size()
-     ));
-     m_alog->write(log::alevel::devel,m_proxy_data->write_buf);
-     // Set a timer so we don't wait forever for the proxy to respond
-     m_proxy_data->timer = this->set_timer(
-         m_proxy_data->timeout_proxy,
-         lib::bind(
-             &type::handle_proxy_timeout,
-             get_shared(),
-             callback,
-             lib::placeholders::_1
-         )
-     );
-     // Send proxy request
-     if (!config::enable_multithreading) {
-         lib::asio::async_write(
-             socket_con_type::get_next_layer(),
-             m_bufs,
-             lib::bind(
-                 &type::handle_proxy_write, get_shared(),
-                 callback,
-                 lib::placeholders::_1
-             )
-         );
-         return;
-     }
-     // Multithreaded configuration: route the handler through the strand.
-     lib::asio::async_write(
-         socket_con_type::get_next_layer(),
-         m_bufs,
-         m_strand->wrap(lib::bind(
-             &type::handle_proxy_write, get_shared(),
-             callback,
-             lib::placeholders::_1
-         ))
-     );
- }
- /// Proxy timeout callback
- /**
- * @param callback The function to call back on timer error or expiry
- * @param ec The status code reported by the timer
- */
- void handle_proxy_timeout(init_handler callback, lib::error_code const & ec)
- {
-     // Timer was cancelled: nothing to report from here.
-     if (ec == transport::error::operation_aborted) {
-         m_alog->write(log::alevel::devel,
-             "asio handle_proxy_write timer cancelled");
-         return;
-     }
-     // Any other timer failure is logged and passed on unchanged.
-     if (ec) {
-         log_err(log::elevel::devel,"asio handle_proxy_write",ec);
-         callback(ec);
-         return;
-     }
-     // The timer really expired: cancel socket operations and report timeout.
-     m_alog->write(log::alevel::devel,
-         "asio handle_proxy_write timer expired");
-     cancel_socket_checked();
-     callback(make_error_code(transport::error::timeout));
- }
- /// Proxy write callback
- /**
- * @param callback The function to call back on failure; handed on to
- * proxy_read on success
- * @param ec The status code of the write
- */
- void handle_proxy_write(init_handler callback,
-     lib::asio::error_code const & ec)
- {
-     if (m_alog->static_test(log::alevel::devel)) {
-         m_alog->write(log::alevel::devel,
-             "asio connection handle_proxy_write");
-     }
-     // The write is over either way, so the buffer list can be emptied.
-     m_bufs.clear();
-     // Timer expired or the operation was aborted for some reason.
-     // Whatever aborted it will be issuing the callback so we are safe to
-     // return
-     bool const aborted = (ec == lib::asio::error::operation_aborted);
-     if (aborted ||
-         lib::asio::is_neg(m_proxy_data->timer->expires_from_now()))
-     {
-         m_elog->write(log::elevel::devel,"write operation aborted");
-         return;
-     }
-     // Success: move on to reading the proxy's response.
-     if (!ec) {
-         proxy_read(callback);
-         return;
-     }
-     // Write failed: log it, stop the timeout timer and report pass_through.
-     log_err(log::elevel::info,"asio handle_proxy_write",ec);
-     m_proxy_data->timer->cancel();
-     callback(make_error_code(error::pass_through));
- }
- /// Start reading the proxy's response
- /**
- * Issues an asynchronous read on the next layer that completes once the
- * end-of-headers delimiter ("\r\n\r\n") has been received.
- *
- * @param callback The function to call back on failure; handed on to
- * handle_proxy_read otherwise
- */
- void proxy_read(init_handler callback) {
-     if (m_alog->static_test(log::alevel::devel)) {
-         m_alog->write(log::alevel::devel,"asio connection proxy_read");
-     }
-     if (!m_proxy_data) {
-         // The proxy timer is owned by m_proxy_data, so with no proxy data
-         // there is no timer to cancel; dereferencing the null pointer here
-         // would be undefined behaviour.
-         m_elog->write(log::elevel::library,
-             "assertion failed: !m_proxy_data in asio::connection::proxy_read");
-         callback(make_error_code(error::general));
-         return;
-     }
-     if (config::enable_multithreading) {
-         // Multithreaded configuration: route the handler through the strand.
-         lib::asio::async_read_until(
-             socket_con_type::get_next_layer(),
-             m_proxy_data->read_buf,
-             "\r\n\r\n",
-             m_strand->wrap(lib::bind(
-                 &type::handle_proxy_read, get_shared(),
-                 callback,
-                 lib::placeholders::_1, lib::placeholders::_2
-             ))
-         );
-     } else {
-         lib::asio::async_read_until(
-             socket_con_type::get_next_layer(),
-             m_proxy_data->read_buf,
-             "\r\n\r\n",
-             lib::bind(
-                 &type::handle_proxy_read, get_shared(),
-                 callback,
-                 lib::placeholders::_1, lib::placeholders::_2
-             )
-         );
-     }
- }
- /// Proxy read callback
- /**
- * @param callback The function to call back
- * @param ec The status code
- * @param bytes_transferred The number of bytes read
- */
- void handle_proxy_read(init_handler callback,
-     lib::asio::error_code const & ec, size_t)
- {
-     if (m_alog->static_test(log::alevel::devel)) {
-         m_alog->write(log::alevel::devel,
-             "asio connection handle_proxy_read");
-     }
-     // Every step below dereferences m_proxy_data (its timer first), so the
-     // sanity check has to happen before anything else, not after.
-     if (!m_proxy_data) {
-         m_elog->write(log::elevel::library,
-             "assertion failed: !m_proxy_data in asio::connection::handle_proxy_read");
-         callback(make_error_code(error::general));
-         return;
-     }
-     // Timer expired or the operation was aborted for some reason.
-     // Whatever aborted it will be issuing the callback so we are safe to
-     // return
-     if (ec == lib::asio::error::operation_aborted ||
-         lib::asio::is_neg(m_proxy_data->timer->expires_from_now()))
-     {
-         m_elog->write(log::elevel::devel,"read operation aborted");
-         return;
-     }
-     // At this point there is no need to wait for the timer anymore
-     m_proxy_data->timer->cancel();
-     if (ec) {
-         m_elog->write(log::elevel::info,
-             "asio handle_proxy_read error: "+ec.message());
-         callback(make_error_code(error::pass_through));
-         return;
-     }
-     // Feed everything buffered so far into the response parser.
-     std::istream input(&m_proxy_data->read_buf);
-     m_proxy_data->res.consume(input);
-     if (!m_proxy_data->res.headers_ready()) {
-         // we read until the headers were done in theory but apparently
-         // they aren't. Internal endpoint error.
-         callback(make_error_code(error::general));
-         return;
-     }
-     m_alog->write(log::alevel::devel,m_proxy_data->res.raw());
-     if (m_proxy_data->res.get_status_code() != http::status_code::ok) {
-         // got an error response back
-         // TODO: expose this error in a programmatically accessible way?
-         // if so, see below for an option on how to do this.
-         std::stringstream s;
-         s << "Proxy connection error: "
-           << m_proxy_data->res.get_status_code()
-           << " ("
-           << m_proxy_data->res.get_status_msg()
-           << ")";
-         m_elog->write(log::elevel::info,s.str());
-         callback(make_error_code(error::proxy_failed));
-         return;
-     }
-     // we have successfully established a connection to the proxy, now
-     // we can continue and the proxy will transparently forward the
-     // WebSocket connection.
-     // TODO: decide if we want an on_proxy callback that would allow
-     // access to the proxy response.
-     // free the proxy buffers and req/res objects as they aren't needed
-     // anymore
-     m_proxy_data.reset();
-     // Continue with post proxy initialization
-     post_init(callback);
- }
- /// read at least num_bytes bytes into buf and then call handler.
- void async_read_at_least(size_t num_bytes, char *buf, size_t len,
-     read_handler handler)
- {
-     if (m_alog->static_test(log::alevel::devel)) {
-         std::stringstream msg;
-         msg << "asio async_read_at_least: " << num_bytes;
-         m_alog->write(log::alevel::devel,msg.str());
-     }
-     // TODO: safety vs speed ?
-     // maybe move into an if devel block
-     // A guard for num_bytes > len is intentionally left disabled:
-     /*if (num_bytes > len) {
-     m_elog->write(log::elevel::devel,
-     "asio async_read_at_least error::invalid_num_bytes");
-     handler(make_error_code(transport::error::invalid_num_bytes),
-     size_t(0));
-     return;
-     }*/
-     if (!config::enable_multithreading) {
-         lib::asio::async_read(
-             socket_con_type::get_socket(),
-             lib::asio::buffer(buf,len),
-             lib::asio::transfer_at_least(num_bytes),
-             make_custom_alloc_handler(
-                 m_read_handler_allocator,
-                 lib::bind(
-                     &type::handle_async_read, get_shared(),
-                     handler,
-                     lib::placeholders::_1, lib::placeholders::_2
-                 )
-             )
-         );
-         return;
-     }
-     // Multithreaded configuration: route the handler through the strand.
-     lib::asio::async_read(
-         socket_con_type::get_socket(),
-         lib::asio::buffer(buf,len),
-         lib::asio::transfer_at_least(num_bytes),
-         m_strand->wrap(make_custom_alloc_handler(
-             m_read_handler_allocator,
-             lib::bind(
-                 &type::handle_async_read, get_shared(),
-                 handler,
-                 lib::placeholders::_1, lib::placeholders::_2
-             )
-         ))
-     );
- }
- /// Async read callback
- /**
- * @param handler The function to call back with the translated status
- * @param ec The status code of the read
- * @param bytes_transferred The number of bytes read
- */
- void handle_async_read(read_handler handler, lib::asio::error_code const & ec,
-     size_t bytes_transferred)
- {
-     m_alog->write(log::alevel::devel, "asio con handle_async_read");
-     // translate asio error codes into more lib::error_codes
-     lib::error_code translated;
-     if (ec == lib::asio::error::eof) {
-         translated = make_error_code(transport::error::eof);
-     } else if (ec) {
-         // We don't know much more about the error at this point. Ask our
-         // socket/security policy if it knows more:
-         translated = socket_con_type::translate_ec(ec);
-         // Keep the untranslated code available for get_transport_ec().
-         m_tec = ec;
-         // These are aggregate/catch all errors. Log some human readable
-         // information to the info channel to give library users some
-         // more details about why the upstream method may have failed.
-         bool const catch_all =
-             translated == transport::error::tls_error ||
-             translated == transport::error::pass_through;
-         if (catch_all) {
-             log_err(log::elevel::info,"asio async_read_at_least",ec);
-         }
-     }
-     if (!handler) {
-         // This can happen in cases where the connection is terminated while
-         // the transport is waiting on a read.
-         m_alog->write(log::alevel::devel,
-             "handle_async_read called with null read handler");
-         return;
-     }
-     handler(translated,bytes_transferred);
- }
- /// Initiate a potentially asynchronous write of the given buffer
- void async_write(const char* buf, size_t len, write_handler handler) {
-     // The buffer is appended to m_bufs; handle_async_write clears the list.
-     m_bufs.push_back(lib::asio::buffer(buf,len));
-     if (!config::enable_multithreading) {
-         lib::asio::async_write(
-             socket_con_type::get_socket(),
-             m_bufs,
-             make_custom_alloc_handler(
-                 m_write_handler_allocator,
-                 lib::bind(
-                     &type::handle_async_write, get_shared(),
-                     handler,
-                     lib::placeholders::_1, lib::placeholders::_2
-                 )
-             )
-         );
-         return;
-     }
-     // Multithreaded configuration: route the handler through the strand.
-     lib::asio::async_write(
-         socket_con_type::get_socket(),
-         m_bufs,
-         m_strand->wrap(make_custom_alloc_handler(
-             m_write_handler_allocator,
-             lib::bind(
-                 &type::handle_async_write, get_shared(),
-                 handler,
-                 lib::placeholders::_1, lib::placeholders::_2
-             )
-         ))
-     );
- }
- /// Initiate a potentially asynchronous write of the given buffers
- void async_write(std::vector<buffer> const & bufs, write_handler handler) {
-     // Append every supplied buffer, in order, to m_bufs;
-     // handle_async_write clears the list.
-     for (std::vector<buffer>::size_type i = 0; i < bufs.size(); ++i) {
-         m_bufs.push_back(lib::asio::buffer(bufs[i].buf,bufs[i].len));
-     }
-     if (!config::enable_multithreading) {
-         lib::asio::async_write(
-             socket_con_type::get_socket(),
-             m_bufs,
-             make_custom_alloc_handler(
-                 m_write_handler_allocator,
-                 lib::bind(
-                     &type::handle_async_write, get_shared(),
-                     handler,
-                     lib::placeholders::_1, lib::placeholders::_2
-                 )
-             )
-         );
-         return;
-     }
-     // Multithreaded configuration: route the handler through the strand.
-     lib::asio::async_write(
-         socket_con_type::get_socket(),
-         m_bufs,
-         m_strand->wrap(make_custom_alloc_handler(
-             m_write_handler_allocator,
-             lib::bind(
-                 &type::handle_async_write, get_shared(),
-                 handler,
-                 lib::placeholders::_1, lib::placeholders::_2
-             )
-         ))
-     );
- }
- /// Async write callback
- /**
- * @param handler The function to call back with the translated status
- * @param ec The status code
- * @param bytes_transferred The number of bytes written (unused)
- */
- void handle_async_write(write_handler handler, lib::asio::error_code const & ec, size_t) {
-     // The write is over either way, so the buffer list can be emptied.
-     m_bufs.clear();
-     lib::error_code translated;
-     if (ec) {
-         // Every asio write failure is logged and reported as pass_through.
-         log_err(log::elevel::info,"asio async_write",ec);
-         translated = make_error_code(transport::error::pass_through);
-     }
-     if (!handler) {
-         // This can happen in cases where the connection is terminated while
-         // the transport is waiting on a write.
-         m_alog->write(log::alevel::devel,
-             "handle_async_write called with null write handler");
-         return;
-     }
-     handler(translated);
- }
- /// Set Connection Handle
- /**
- * See common/connection_hdl.hpp for information
- *
- * @param hdl A connection_hdl that the transport will use to refer
- * to itself
- */
- void set_handle(connection_hdl hdl) {
- m_connection_hdl = hdl;
- socket_con_type::set_handle(hdl);
- }
- /// Trigger the on_interrupt handler
- /**
- * This needs to be thread safe
- */
- lib::error_code interrupt(interrupt_handler handler) {
- if (config::enable_multithreading) {
- m_io_service->post(m_strand->wrap(handler));
- } else {
- m_io_service->post(handler);
- }
- return lib::error_code();
- }
- lib::error_code dispatch(dispatch_handler handler) {
- if (config::enable_multithreading) {
- m_io_service->post(m_strand->wrap(handler));
- } else {
- m_io_service->post(handler);
- }
- return lib::error_code();
- }
- /*void handle_interrupt(interrupt_handler handler) {
- handler();
- }*/
- /// close and clean up the underlying socket
- void async_shutdown(shutdown_handler callback) {
- if (m_alog->static_test(log::alevel::devel)) {
- m_alog->write(log::alevel::devel,"asio connection async_shutdown");
- }
- timer_ptr shutdown_timer;
- shutdown_timer = set_timer(
- config::timeout_socket_shutdown,
- lib::bind(
- &type::handle_async_shutdown_timeout,
- get_shared(),
- shutdown_timer,
- callback,
- lib::placeholders::_1
- )
- );
- socket_con_type::async_shutdown(
- lib::bind(
- &type::handle_async_shutdown,
- get_shared(),
- shutdown_timer,
- callback,
- lib::placeholders::_1
- )
- );
- }
- /// Async shutdown timeout handler
- /**
- * @param shutdown_timer A pointer to the timer to keep it in scope
- * @param callback The function to call back
- * @param ec The status code
- */
- void handle_async_shutdown_timeout(timer_ptr, init_handler callback,
- lib::error_code const & ec)
- {
- lib::error_code ret_ec;
- if (ec) {
- if (ec == transport::error::operation_aborted) {
- m_alog->write(log::alevel::devel,
- "asio socket shutdown timer cancelled");
- return;
- }
- log_err(log::elevel::devel,"asio handle_async_shutdown_timeout",ec);
- ret_ec = ec;
- } else {
- ret_ec = make_error_code(transport::error::timeout);
- }
- m_alog->write(log::alevel::devel,
- "Asio transport socket shutdown timed out");
- cancel_socket_checked();
- callback(ret_ec);
- }
- void handle_async_shutdown(timer_ptr shutdown_timer, shutdown_handler
- callback, lib::asio::error_code const & ec)
- {
- if (ec == lib::asio::error::operation_aborted ||
- lib::asio::is_neg(shutdown_timer->expires_from_now()))
- {
- m_alog->write(log::alevel::devel,"async_shutdown cancelled");
- return;
- }
- shutdown_timer->cancel();
- lib::error_code tec;
- if (ec) {
- if (ec == lib::asio::error::not_connected) {
- // The socket was already closed when we tried to close it. This
- // happens periodically (usually if a read or write fails
- // earlier and if it is a real error will be caught at another
- // level of the stack.
- } else {
- // We don't know anything more about this error, give our
- // socket/security policy a crack at it.
- tec = socket_con_type::translate_ec(ec);
- m_tec = ec;
- // all other errors are effectively pass through errors of
- // some sort so print some detail on the info channel for
- // library users to look up if needed.
- log_err(log::elevel::info,"asio async_shutdown",ec);
- }
- } else {
- if (m_alog->static_test(log::alevel::devel)) {
- m_alog->write(log::alevel::devel,
- "asio con handle_async_shutdown");
- }
- }
- callback(tec);
- }
- /// Cancel the underlying socket and log any errors
- void cancel_socket_checked() {
- lib::asio::error_code cec = socket_con_type::cancel_socket();
- if (cec) {
- if (cec == lib::asio::error::operation_not_supported) {
- // cancel not supported on this OS, ignore and log at dev level
- m_alog->write(log::alevel::devel, "socket cancel not supported");
- } else {
- log_err(log::elevel::warn, "socket cancel failed", cec);
- }
- }
- }
- private:
- /// Convenience method for logging the code and message for an error_code
- template <typename error_type>
- void log_err(log::level l, const char * msg, const error_type & ec) {
- std::stringstream s;
- s << msg << " error: " << ec << " (" << ec.message() << ")";
- m_elog->write(l,s.str());
- }
- // static settings
- const bool m_is_server;
- lib::shared_ptr<alog_type> m_alog;
- lib::shared_ptr<elog_type> m_elog;
- struct proxy_data {
- proxy_data() : timeout_proxy(config::timeout_proxy) {}
- request_type req;
- response_type res;
- std::string write_buf;
- lib::asio::streambuf read_buf;
- long timeout_proxy;
- timer_ptr timer;
- };
- std::string m_proxy;
- lib::shared_ptr<proxy_data> m_proxy_data;
- // transport resources
- io_service_ptr m_io_service;
- strand_ptr m_strand;
- connection_hdl m_connection_hdl;
- std::vector<lib::asio::const_buffer> m_bufs;
- /// Detailed internal error code
- lib::asio::error_code m_tec;
- // Handlers
- tcp_init_handler m_tcp_pre_init_handler;
- tcp_init_handler m_tcp_post_init_handler;
- handler_allocator m_read_handler_allocator;
- handler_allocator m_write_handler_allocator;
- };
- } // namespace asio
- } // namespace transport
- } // namespace websocketpp
- #endif // WEBSOCKETPP_TRANSPORT_ASIO_CON_HPP
|