////////////////////////////////////////////////////////////////////////////////////////// // A multi-platform support c++11 library with focus on asynchronous socket I/O for any // client application. ////////////////////////////////////////////////////////////////////////////////////////// /* The MIT License (MIT) Copyright (c) 2012-2023 HALX99 Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ #ifndef YASIO__IO_SERVICE_HPP #define YASIO__IO_SERVICE_HPP #include #include #include #include #include #include #include #include #include "yasio/core/sz.hpp" #include "yasio/config.hpp" #include "yasio/core/singleton.hpp" #include "yasio/core/concurrent_queue.hpp" #include "yasio/core/utils.hpp" #include "yasio/core/errc.hpp" #include "yasio/stl/memory.hpp" #include "yasio/stl/string_view.hpp" #include "yasio/core/object_pool.hpp" #include "yasio/core/byte_buffer.hpp" #include "yasio/core/xxsocket.hpp" #include "yasio/core/io_watcher.hpp" #if !defined(YASIO_USE_CARES) # include "yasio/stl/shared_mutex.hpp" #endif #if defined(YASIO_SSL_BACKEND) typedef struct ssl_ctx_st yssl_ctx_st; struct yssl_st; #endif #if defined(YASIO_USE_CARES) typedef struct ares_channeldata* ares_channel; typedef struct ares_addrinfo ares_addrinfo; #endif #define yasio__find(cont, v) ::std::find(cont.begin(), cont.end(), v) #define yasio__find_if(cont, pred) ::std::find_if(cont.begin(), cont.end(), pred) namespace yasio { YASIO__NS_INLINE namespace inet { // options enum { // lgtm [cpp/irregular-enum-init] // Set whether disable internal dispatch, if yes // user must invoke dispatch on thread which care about // network events, it's useful for game engine update ui // when recv network events. // params: no_dispatch:int(0) YOPT_S_NO_DISPATCH = 1, // Set custom resolve function, native C++ ONLY. // params: func:resolv_fn_t* // remarks: you must ensure thread safe of it. YOPT_S_RESOLV_FN, // Set custom print function, native C++ ONLY. // parmas: func:print_fn_t // remarks: you must ensure thread safe of it. YOPT_S_PRINT_FN, // Set custom print function, native C++ ONLY. // parmas: func:print_fn2_t // remarks: you must ensure thread safe of it. YOPT_S_PRINT_FN2, // Set event callback // params: func:event_cb_t* // remarks: this callback will be invoke at io_service::dispatch caller thread YOPT_S_EVENT_CB, // Sets callback before enque event to defer queue. // params: func:defer_event_cb_t* // remarks: this callback invoke at io_service thread YOPT_S_DEFER_EVENT_CB, // Set tcp keepalive in seconds, probes is tries. // params: idle:int(7200), interal:int(75), probes:int(10) YOPT_S_TCP_KEEPALIVE, // Don't start a new thread to run event loop // params: value:int(0) YOPT_S_NO_NEW_THREAD, // Sets ssl verification cert, if empty, don't verify // params: path:const char* YOPT_S_SSL_CACERT, // Set connect timeout in seconds // params: connect_timeout:int(10) YOPT_S_CONNECT_TIMEOUT, // Set connect timeout in milliseconds // params: connect_timeout : int(10000), YOPT_S_CONNECT_TIMEOUTMS, // Set dns cache timeout in seconds // params: dns_cache_timeout : int(600), YOPT_S_DNS_CACHE_TIMEOUT, // Set dns cache timeout in milliseconds // params: dns_cache_timeout : int(600000), YOPT_S_DNS_CACHE_TIMEOUTMS, // Set dns queries timeout in seconds, default is: 5 // params: dns_queries_timeout : int(5) // remarks: // a. this option must be set before 'io_service::start' // b. only works when have c-ares // c. the timeout algorithm of c-ares is complicated, usually, by default, dns queries // will failed with timeout after more than 75 seconds. // d. for more detail, please see: // https://c-ares.haxx.se/ares_init_options.html YOPT_S_DNS_QUERIES_TIMEOUT, // Set dns queries timeout in milliseconds, default is: 5000 // see also: YOPT_S_DNS_QUERIES_TIMEOUT YOPT_S_DNS_QUERIES_TIMEOUTMS, // Set dns queries tries when timeout reached, default is: 4 // params: dns_queries_tries : int(4) // remarks: // a. this option must be set before 'io_service::start' // b. relative option: YOPT_S_DNS_QUERIES_TIMEOUT YOPT_S_DNS_QUERIES_TRIES, // Set dns server dirty // params: reserved : int(1) // remarks: you should set this option after your device network changed YOPT_S_DNS_DIRTY, // Set custom dns servers // params: servers: const char* // remarks: // a. IPv4 address is 8.8.8.8 or 8.8.8.8:53, the port is optional // b. IPv6 addresses with ports require square brackets [fe80::1%lo0]:53 YOPT_S_DNS_LIST, // Set ssl server cert and private key file // params: // crtfile: const char* // keyfile: const char* YOPT_S_SSL_CERT, // Set whether forward packet without GC alloc // params: forward: int(0) // reamrks: // when forward packet enabled, the packet will always dispach when recv data from OS kernel immediately YOPT_S_FORWARD_PACKET, // Sets channel length field based frame decode function, native C++ ONLY // params: index:int, func:decode_len_fn_t* YOPT_C_UNPACK_FN = 101, YOPT_C_LFBFD_FN = YOPT_C_UNPACK_FN, // Sets channel length field based frame decode params // params: // index:int, // max_frame_length:int(10MBytes), // length_field_offset:int(-1), // length_field_length:int(4), // length_adjustment:int(0), YOPT_C_UNPACK_PARAMS, YOPT_C_LFBFD_PARAMS = YOPT_C_UNPACK_PARAMS, // Sets channel length field based frame decode initial bytes to strip // params: // index:int, // initial_bytes_to_strip:int(0) YOPT_C_UNPACK_STRIP, YOPT_C_LFBFD_IBTS = YOPT_C_UNPACK_STRIP, // Sets channel remote host // params: index:int, ip:const char* YOPT_C_REMOTE_HOST, // Sets channel remote port // params: index:int, port:int YOPT_C_REMOTE_PORT, // Sets channel remote endpoint // params: index:int, ip:const char*, port:int YOPT_C_REMOTE_ENDPOINT, // Sets local host for client channel only // params: index:int, ip:const char* YOPT_C_LOCAL_HOST, // Sets local port for client channel only // params: index:int, port:int YOPT_C_LOCAL_PORT, // Sets local endpoint for client channel only // params: index:int, ip:const char*, port:int YOPT_C_LOCAL_ENDPOINT, // Mods channl flags // params: index:int, flagsToAdd:int, flagsToRemove:int YOPT_C_MOD_FLAGS, // Sets channel multicast interface, required on BSD-like system // params: index:int, multi_ifaddr:const char* // remarks: // a. On BSD-like(APPLE, etc...) system: ipv6 addr must be "::1%lo0" or "::%en0" YOPT_C_MCAST_IF, // Enable channel multicast mode // params: index:int, multi_addr:const char*, loopback:int, // remarks: // a. On BSD-like(APPLE, etc...) system: ipv6 addr must be: "ff02::1%lo0" or "ff02::1%en0" // refer to: https://www.tldp.org/HOWTO/Multicast-HOWTO-2.html YOPT_C_ENABLE_MCAST, // Disable channel multicast mode // params: index:int YOPT_C_DISABLE_MCAST, // The kcp conv id, must equal in two endpoint from the same connection // params: index:int, conv:int YOPT_C_KCP_CONV, // Whether never perform bswap for length field // params: index:int, no_bswap:int(0) YOPT_C_UNPACK_NO_BSWAP, // Change 4-tuple association for io_transport_udp // params: transport:transport_handle_t // remarks: only works for udp client transport YOPT_T_CONNECT, // Dissolve 4-tuple association for io_transport_udp // params: transport:transport_handle_t // remarks: only works for udp client transport YOPT_T_DISCONNECT, // Sets io_base sockopt // params: io_base*,level:int,optname:int,optval:int,optlen:int YOPT_B_SOCKOPT = 201, }; // channel masks: only for internal use, not for user enum { YCM_CLIENT = 1, YCM_SERVER = 1 << 1, YCM_TCP = 1 << 2, YCM_UDP = 1 << 3, YCM_KCP = 1 << 4, YCM_SSL = 1 << 5, YCM_UDS = 1 << 6, // IPC: posix domain socket }; // channel kinds: for user to call io_service::open enum { YCK_TCP_CLIENT = YCM_TCP | YCM_CLIENT, YCK_TCP_SERVER = YCM_TCP | YCM_SERVER, YCK_UDP_CLIENT = YCM_UDP | YCM_CLIENT, YCK_UDP_SERVER = YCM_UDP | YCM_SERVER, YCK_KCP_CLIENT = YCM_KCP | YCM_CLIENT | YCM_UDP, YCK_KCP_SERVER = YCM_KCP | YCM_SERVER | YCM_UDP, YCK_SSL_CLIENT = YCK_TCP_CLIENT | YCM_SSL, YCK_SSL_SERVER = YCK_TCP_SERVER | YCM_SSL, }; // channel flags enum { /* Whether setsockopt SO_REUSEADDR and SO_REUSEPORT */ YCF_REUSEADDR = 1 << 9, /* For winsock security issue, see: https://docs.microsoft.com/en-us/windows/win32/winsock/using-so-reuseaddr-and-so-exclusiveaddruse */ YCF_EXCLUSIVEADDRUSE = 1 << 10, }; // event kinds enum { YEK_ON_OPEN = 1, YEK_ON_CLOSE, YEK_ON_PACKET, YEK_CONNECT_RESPONSE = YEK_ON_OPEN, // implicit deprecated alias YEK_CONNECTION_LOST = YEK_ON_CLOSE, // implicit deprecated alias YEK_PACKET = YEK_ON_PACKET, // implicit deprecated alias }; // the network core service log level enum { YLOG_V, YLOG_D, YLOG_I, YLOG_W, YLOG_E, }; // class fwds class highp_timer; class io_send_op; class io_sendto_op; class io_event; class io_channel; class io_transport; class io_transport_tcp; // tcp client/server class io_transport_ssl; // ssl client class io_transport_udp; // udp client/server class io_service; // recommand user always use transport_handle_t, in the future, it's maybe void* or intptr_t typedef io_transport* transport_handle_t; // typedefs typedef std::shared_ptr xxsocket_ptr; typedef std::unique_ptr send_op_ptr; typedef std::unique_ptr event_ptr; typedef std::shared_ptr highp_timer_ptr; typedef std::function timer_cb_t; typedef std::function timerv_cb_t; typedef std::function event_cb_t; typedef std::function defer_event_cb_t; typedef std::function completion_cb_t; typedef std::function decode_len_fn_t; typedef std::function&, const char*, unsigned short)> resolv_fn_t; typedef std::function print_fn_t; typedef std::function print_fn2_t; typedef std::pair timer_impl_t; // alias, for compatible purpose only typedef highp_timer deadline_timer; typedef highp_timer_ptr deadline_timer_ptr; typedef event_cb_t io_event_cb_t; typedef completion_cb_t io_completion_cb_t; // the ssl role enum ssl_role { YSSL_CLIENT, YSSL_SERVER, }; struct io_hostent { io_hostent() = default; io_hostent(cxx17::string_view ip, u_short port) : host_(cxx17::svtos(ip)), port_(port) {} io_hostent(io_hostent&& rhs) YASIO__NOEXCEPT : host_(std::move(rhs.host_)), port_(rhs.port_) {} io_hostent(const io_hostent& rhs) : host_(rhs.host_), port_(rhs.port_) {} void set_ip(cxx17::string_view ip) { cxx17::assign(host_, ip); } const std::string& get_ip() const { return host_; } void set_port(u_short port) { port_ = port; } u_short get_port() const { return port_; } std::string host_; u_short port_ = 0; }; class YASIO_API highp_timer { public: highp_timer(io_service& service) : service_(service){}; highp_timer(const highp_timer&) = delete; highp_timer(highp_timer&&) = delete; void expires_from_now(const std::chrono::microseconds& duration) { this->duration_ = duration; this->expire_time_ = yasio::steady_clock_t::now() + this->duration_; } void expires_from_now() { this->expire_time_ = yasio::steady_clock_t::now() + this->duration_; } // Wait timer timeout once. void async_wait_once(timerv_cb_t cb) { #if YASIO__HAS_CXX14 this->async_wait([cb = std::move(cb)](io_service& service) { #else this->async_wait([cb](io_service& service) { #endif cb(service); return true; }); } // Wait timer timeout // @retval of timer_cb_t: // true: wait once // false: wait again after expired YASIO__DECL void async_wait(timer_cb_t); // Cancel the timer YASIO__DECL void cancel(); // Check if timer is expired? bool expired() const { return this->wait_duration().count() <= 0; } // Gets wait duration of timer. YASIO__DECL std::chrono::microseconds wait_duration() const; io_service& service_; std::chrono::microseconds duration_ = {}; std::chrono::time_point expire_time_ = {}; }; struct YASIO_API io_base { enum class state : uint8_t { CLOSED, RESOLVING, // for client only CONNECTING, // for client only OPENED, }; enum class error_stage : uint8_t { NONE, READ, WRITE, OPEN_SOCKET, BIND_SOCKET, LISTEN_SOCKET }; io_base() : error_(0), state_(state::CLOSED), opmask_(0) { static unsigned int s_object_id = 0; this->id_ = ++s_object_id; } virtual ~io_base() {} void set_last_errno(int error, error_stage stage = error_stage::NONE) { error_ = error; error_stage_ = stage; } #if !defined(YASIO_MINIFY_EVENT) // usedata union { void* ptr; int ival; } ud_{}; #endif xxsocket_ptr socket_; int error_; // socket error(>= -1), application error(< -1) // 0: none, 1: read, 2: write error_stage error_stage_ = error_stage::NONE; // mark whether pollout event registerred. bool pollout_registerred_ = false; std::atomic state_; uint8_t opmask_; unsigned int id_; public: unsigned int id() const { return id_; } }; class YASIO_API io_channel : public io_base { friend class io_service; friend class io_transport; friend class io_transport_tcp; friend class io_transport_ssl; friend class io_transport_udp; public: io_service& get_service() const { return service_; } #if defined(YASIO_SSL_BACKEND) YASIO__DECL yssl_ctx_st* get_ssl_context(bool client) const; #endif int index() const { return index_; } u_short remote_port() const { return remote_port_; } YASIO__DECL std::string format_destination() const; long long bytes_transferred() const { return bytes_transferred_; } unsigned int connect_id() const { return connect_id_; } #if !defined(YASIO_NO_USER_TIMER) highp_timer& get_user_timer() { return this->user_timer_; } #endif protected: YASIO__DECL void enable_multicast(const char* addr, int loopback); YASIO__DECL void disable_multicast(); YASIO__DECL void join_multicast_group(); YASIO__DECL int configure_multicast_group(bool onoff); // For log macro only YASIO__DECL const print_fn2_t& __get_cprint() const; private: YASIO__DECL io_channel(io_service& service, int index); void set_address(cxx17::string_view host, u_short port) { set_host(host); set_port(port); } YASIO__DECL void set_host(cxx17::string_view host); YASIO__DECL void set_port(u_short port); void clear_mutable_flags() { properties_ &= 0x00ffffff; } // -1 indicate failed, connection will be closed YASIO__DECL int __builtin_decode_len(void* d, int n); io_service& service_; /* Since v3.33.0 mask,kind,flags,private_flags are stored to this field ** bit[1-8] mask & kinds ** bit[9-16] flags ** bit[17-24] byte1 of private flags ** bit[25~32] byte2 of private flags (mutable) */ uint32_t properties_ = 0; /* ** !!! tcp/udp client only, if not zero, will use it as fixed port, ** !!! otherwise system will generate a random port for local socket. */ u_short local_port_ = 0; /* ** !!! tcp/udp client, the port to connect ** !!! tcp/udp server, the port to listening */ u_short remote_port_ = 0; // The last query success time in microseconds for dns cache support highp_time_t query_success_time_ = 0; #if defined(YASIO_ENABLE_ARES_PROFILER) highp_time_t query_start_time_; #endif int index_; int socktype_ = 0; // The timer for check resolve & connect timeout highp_timer timer_; #if !defined(YASIO_NO_USER_TIMER) // The timer for user highp_timer user_timer_; #endif // The stream mode application protocol (based on tcp/udp/kcp) unpack params struct __unnamed01 { int max_frame_length = YASIO_SZ(10, M); // 10MBytes int length_field_offset = -1; // -1: directly, >= 0: store as 1~4bytes integer int length_field_length = 4; // 1,2,3,4 int length_adjustment = 0; int initial_bytes_to_strip = 0; int no_bswap = 0; } uparams_; decode_len_fn_t decode_len_; /* !!! for tcp/udp client to bind local specific network adapter, empty for any */ std::string local_host_; /* !!! for tcp/udp client to connect remote host. !!! for tcp/udp server to bind local specific network adapter, empty for any !!! for multicast, it's used as multicast address, doesn't connect even through recvfrom on packet from remote */ std::string remote_host_; std::vector remote_eps_; ip::endpoint multiaddr_, multiif_; // Current it's only for UDP sbyte_buffer buffer_; // The bytes transferred from socket low layer, currently, only works for client channel long long bytes_transferred_ = 0; unsigned int connect_id_ = 0; }; class io_send_buffer { public: explicit io_send_buffer(yasio::sbyte_buffer&& mutable_buffer) { mutable_buffer_ = std::move(mutable_buffer); data_ = mutable_buffer_.data(); size_ = mutable_buffer_.size(); } io_send_buffer(const char* const_buffer, size_t const_buffer_size) { data_ = const_buffer; size_ = const_buffer_size; } io_send_buffer(const io_send_buffer&) = delete; io_send_buffer(io_send_buffer&& rhs) YASIO__NOEXCEPT { mutable_buffer_ = std::move(rhs.mutable_buffer_); data_ = rhs.data_; size_ = rhs.size_; } bool empty() const { return size_ == 0; } const char* data() const { return data_; } size_t size() const { return size_; } private: yasio::sbyte_buffer mutable_buffer_; const char* data_; size_t size_; }; // for tcp transport only class YASIO_API io_send_op { public: io_send_op(io_send_buffer&& buffer, completion_cb_t&& handler) : offset_(0), buffer_(std::move(buffer)), handler_(std::move(handler)) {} virtual ~io_send_op() {} size_t offset_; // read pos from sending buffer io_send_buffer buffer_; // sending data buffer completion_cb_t handler_; YASIO__DECL virtual int perform(transport_handle_t transport, const void* buf, int n, int& error); #if !defined(YASIO_DISABLE_OBJECT_POOL) DEFINE_CONCURRENT_OBJECT_POOL_ALLOCATION(io_send_op, 128) #endif }; // for udp transport only class YASIO_API io_sendto_op : public io_send_op { public: io_sendto_op(io_send_buffer&& buffer, completion_cb_t&& handler, const ip::endpoint& destination) : io_send_op(std::move(buffer), std::move(handler)), destination_(destination) {} YASIO__DECL int perform(transport_handle_t transport, const void* buf, int n, int& error) override; #if !defined(YASIO_DISABLE_OBJECT_POOL) DEFINE_CONCURRENT_OBJECT_POOL_ALLOCATION(io_sendto_op, 128) #endif ip::endpoint destination_; }; class io_transport : public io_base { friend class io_service; friend class io_send_op; friend class io_sendto_op; friend class io_event; io_transport(const io_transport&) = delete; public: int cindex() const { return ctx_->index_; } ip::endpoint local_endpoint() const { return socket_->local_endpoint(); } virtual ip::endpoint remote_endpoint() const { return socket_->peer_endpoint(); } io_channel* get_context() const { return ctx_; } virtual ~io_transport() { ctx_ = nullptr; send_queue_.clear(); } protected: io_service& get_service() const { return ctx_->get_service(); } bool is_open() const { return state_ == state::OPENED && socket_ && socket_->is_open(); } sbyte_buffer fetch_packet() { expected_size_ = -1; return std::move(expected_packet_); } // For log macro only YASIO__DECL const print_fn2_t& __get_cprint() const; // Call at user thread YASIO__DECL virtual int write(io_send_buffer&&, completion_cb_t&&); // Call at user thread virtual int write_to(io_send_buffer&&, const ip::endpoint&, completion_cb_t&&) { YASIO_LOG("[warning] io_transport doesn't support 'write_to' operation!"); return 0; } YASIO__DECL int call_read(void* data, int size, int revent, int& error); YASIO__DECL int call_write(io_send_op*, int& error); YASIO__DECL void complete_op(io_send_op*, int error); // Call at io_service YASIO__DECL virtual int do_read(int revent, int& error, highp_time_t& wait_duration); // Call at io_service, try flush pending packet YASIO__DECL virtual bool do_write(highp_time_t& wait_duration); // Sets the underlying layer socket io primitives. YASIO__DECL virtual void set_primitives(); YASIO__DECL io_transport(io_channel* ctx, xxsocket_ptr&& s); bool is_valid() const { return ctx_ != nullptr; } char buffer_[YASIO_INET_BUFFER_SIZE]; // recv buffer, 64K int offset_ = 0; // recv buffer offset int expected_size_ = -1; sbyte_buffer expected_packet_; io_channel* ctx_; std::function write_cb_; std::function read_cb_; privacy::concurrent_queue send_queue_; }; class YASIO_API io_transport_tcp : public io_transport { friend class io_service; public: io_transport_tcp(io_channel* ctx, xxsocket_ptr&& s); }; #if defined(YASIO_SSL_BACKEND) class io_transport_ssl : public io_transport_tcp { public: YASIO__DECL io_transport_ssl(io_channel* ctx, xxsocket_ptr&& s); YASIO__DECL void set_primitives() override; YASIO__DECL void do_ssl_shutdown(); protected: YASIO__DECL int do_ssl_handshake(int& error); // always invoke at do_read yssl_st* ssl_ = nullptr; }; #else class io_transport_ssl {}; #endif class YASIO_API io_transport_udp : public io_transport { friend class io_service; public: YASIO__DECL io_transport_udp(io_channel* ctx, xxsocket_ptr&& s); YASIO__DECL ~io_transport_udp(); YASIO__DECL ip::endpoint remote_endpoint() const override; protected: YASIO__DECL void connect(); YASIO__DECL void disconnect(); YASIO__DECL int write(io_send_buffer&&, completion_cb_t&&) override; YASIO__DECL int write_to(io_send_buffer&&, const ip::endpoint&, completion_cb_t&&) override; YASIO__DECL void set_primitives() override; // ensure destination for sendto valid, if not, assign from ctx_->remote_eps_[0] YASIO__DECL const ip::endpoint& ensure_destination() const; // configure remote with specific endpoint YASIO__DECL void confgure_remote(const ip::endpoint& peer); // process received data from low level YASIO__DECL virtual int handle_input(const char* data, int bytes_transferred, int& error, highp_time_t& wait_duration); ip::endpoint peer_; // for recv only, unstable mutable ip::endpoint destination_; // for sendto only, stable bool connected_ = false; }; using io_packet = sbyte_buffer; #if !defined(YASIO_USE_SHARED_PACKET) using packet_t = io_packet; inline packet_t wrap_packet(io_packet& raw_packet) { return std::move(raw_packet); } inline bool is_packet_empty(packet_t& pkt) { return pkt.empty(); } inline io_packet& forward_packet(packet_t& pkt) { return pkt; } inline io_packet&& forward_packet(packet_t&& pkt) { return std::move(pkt); } inline io_packet::pointer packet_data(packet_t& pkt) { return pkt.data(); } inline io_packet::size_type packet_len(packet_t& pkt) { return pkt.size(); } #else using packet_t = std::shared_ptr; inline packet_t wrap_packet(io_packet& raw_packet) { return std::make_shared(std::move(raw_packet)); } inline bool is_packet_empty(packet_t& pkt) { return !pkt; } inline io_packet& forward_packet(packet_t& pkt) { return *pkt; } inline io_packet&& forward_packet(packet_t&& pkt) { return std::move(*pkt); } inline io_packet::pointer packet_data(packet_t& pkt) { return pkt->data(); } inline io_packet::size_type packet_len(packet_t& pkt) { return pkt->size(); } #endif class io_packet_view { public: io_packet_view() = default; io_packet_view(char* d, int n) : data_(d), size_(n) {} char* data() { return this->data_; } const char* data() const { return this->data_; } size_t size() const { return this->size_; } private: char* data_ = nullptr; size_t size_ = 0; }; /* * Notes: store some properties of event source to make sure user can safe get them deferred */ class io_event final { public: io_event(int cidx, int kind, int status, io_channel* source /*not nullable*/, int passive = 0) : kind_(kind), writable_(0), passive_(passive), status_(status), cindex_(cidx), source_id_(source->id_), source_(source) { #if !defined(YASIO_MINIFY_EVENT) source_ud_ = source_->ud_.ptr; #endif } io_event(int cidx, int kind, int status, io_transport* source /*not nullable*/) : kind_(kind), writable_(1), passive_(0), status_(status), cindex_(cidx), source_id_(source->id_), source_(source) { #if !defined(YASIO_MINIFY_EVENT) source_ud_ = source_->ud_.ptr; #endif } io_event(int cidx, io_packet&& pkt, io_transport* source /*not nullable*/) : kind_(YEK_ON_PACKET), writable_(1), passive_(0), status_(0), cindex_(cidx), source_id_(source->id_), source_(source), packet_(wrap_packet(pkt)) { #if !defined(YASIO_MINIFY_EVENT) source_ud_ = source_->ud_.ptr; #endif } io_event(int cidx, io_packet_view pkt, io_transport* source /*not nullable*/) : kind_(YEK_ON_PACKET), writable_(1), passive_(0), status_(0), cindex_(cidx), source_id_(source->id_), source_(source), packet_view_(pkt) { #if !defined(YASIO_MINIFY_EVENT) source_ud_ = source_->ud_.ptr; #endif } io_event(const io_event&) = delete; io_event(io_event&& rhs) = delete; ~io_event() {} public: int cindex() const { return cindex_; } int status() const { return status_; } int kind() const { return kind_; } // whether the event triggered by server channel int passive() const { return passive_; } packet_t& packet() { return packet_; } io_packet_view packet_view() const { return packet_view_; } /*[nullable]*/ transport_handle_t transport() const { return writable_ ? static_cast(source_) : nullptr; } io_base* source() const { return source_; } unsigned int source_id() const { return source_id_; } #if !defined(YASIO_MINIFY_EVENT) /* Gets to transport user data when process this event */ template _Uty transport_ud() const { return (_Uty)(uintptr_t)source_ud_; } /* Sets trasnport user data when process this event */ template void transport_ud(_Uty uval) { source_ud_ = (void*)(uintptr_t)uval; auto t = this->transport(); if (t) t->ud_.ptr = (void*)(uintptr_t)uval; } highp_time_t timestamp() const { return timestamp_; } #endif #if !defined(YASIO_DISABLE_OBJECT_POOL) DEFINE_CONCURRENT_OBJECT_POOL_ALLOCATION(io_event, 128) #endif private: unsigned int kind_ : 30; unsigned int writable_ : 1; unsigned int passive_ : 1; int status_; int cindex_; unsigned int source_id_; io_base* source_; packet_t packet_; io_packet_view packet_view_; #if !defined(YASIO_MINIFY_EVENT) void* source_ud_; highp_time_t timestamp_ = highp_clock(); #endif }; class YASIO_API io_service // lgtm [cpp/class-many-fields] { friend class highp_timer; friend class io_transport; friend class io_transport_tcp; friend class io_transport_udp; #if defined(YASIO_SSL_BACKEND) friend class io_transport_ssl; #endif friend class io_channel; public: enum class state { UNINITIALIZED, IDLE, RUNNING, AT_EXITING, }; /* ** summary: init global state with custom print function, you must ensure thread safe of it. ** remark: ** a. this function is not required, if you don't want print init log to custom console. ** b. this function only works once ** c. you should call once before call any 'io_servic::start' */ YASIO__DECL static void init_globals(const yasio::inet::print_fn2_t&); YASIO__DECL static void cleanup_globals(); // the additional API to get rtt of tcp transport YASIO__DECL static unsigned int tcp_rtt(transport_handle_t); public: YASIO__DECL io_service(); YASIO__DECL io_service(int channel_count); YASIO__DECL io_service(const io_hostent& channel_eps); YASIO__DECL io_service(const std::vector& channel_eps); YASIO__DECL io_service(const io_hostent* channel_eps, int channel_count); YASIO__DECL ~io_service(); YASIO__DECL void start(event_cb_t cb); /* summary: stop the io_service ** ** remark: ** a. IF caller thread isn't service worker thread, the service state will be IDLE. ** b. IF caller thread is service worker thread, the service state will be STOPPING, ** then you needs to invoke this API again at other thread ** c. Strong recommend invoke this API at your event dispatch thread. */ YASIO__DECL void stop(); bool is_running() const { return this->state_ == io_service::state::RUNNING; } bool is_stopping() const { return !!this->stop_flag_; } // should call at the thread who care about async io // events(CONNECT_RESPONSE,CONNECTION_LOST,PACKET), such cocos2d-x opengl or // any other game engines' render thread. // returns: The remain events in queue YASIO__DECL size_t dispatch(int max_count = 128); // set option, see enum YOPT_XXX YASIO__DECL void set_option(int opt, ...); YASIO__DECL void set_option_internal(int opt, va_list args); // open a channel, default: YCK_TCP_CLIENT YASIO__DECL bool open(size_t index, int kind = YCK_TCP_CLIENT); // check whether the channel is open YASIO__DECL bool is_open(int index) const; // check whether the transport is open YASIO__DECL bool is_open(transport_handle_t) const; // close transport YASIO__DECL void close(transport_handle_t); // close channel YASIO__DECL void close(int index); /* ** summary: Write data to a TCP or connected UDP transport with last peer address ** retval: < 0: failed ** params: ** 'thandle': the transport to write, could be tcp/udp/kcp ** 'buf': the data to write ** 'len': the data len ** 'handler': send finish callback ** remark: ** + TCP/UDP: Use queue to store user message, flush at io_service thread ** + KCP: Use queue provided by kcp internal, flush at io_service thread */ int write(transport_handle_t thandle, const void* buf, size_t len, completion_cb_t completion_handler = nullptr) { return write(thandle, sbyte_buffer{(const char*)buf, (const char*)buf + len, std::true_type{}}, std::move(completion_handler)); } YASIO__DECL int write(transport_handle_t thandle, sbyte_buffer buffer, completion_cb_t completion_handler = nullptr); YASIO__DECL int forward(transport_handle_t thandle, const void* buf, size_t len, completion_cb_t completion_handler); /* ** Summary: Write data to unconnected UDP transport with specified address. ** retval: < 0: failed ** remark: This function only for UDP like transport (UDP or KCP) ** + UDP: Use queue to store user message, flush at io_service thread ** + KCP: Use the queue provided by kcp internal, flush at io_service thread */ int write_to(transport_handle_t thandle, const void* buf, size_t len, const ip::endpoint& to, completion_cb_t completion_handler = nullptr) { return write_to(thandle, sbyte_buffer{(const char*)buf, (const char*)buf + len, std::true_type{}}, to, std::move(completion_handler)); } YASIO__DECL int write_to(transport_handle_t thandle, sbyte_buffer buffer, const ip::endpoint& to, completion_cb_t completion_handler = nullptr); YASIO__DECL int forward_to(transport_handle_t thandle, const void* buf, size_t len, const ip::endpoint& to, completion_cb_t completion_handler); // The highp_timer support, !important, the callback is called on the thread of io_service YASIO__DECL highp_timer_ptr schedule(const std::chrono::microseconds& duration, timer_cb_t); YASIO__DECL int resolve(std::vector& endpoints, const char* hostname, unsigned short port = 0); // Gets channel by index YASIO__DECL io_channel* channel_at(size_t index) const; YASIO__DECL static const char* strerror(int error); private: YASIO__DECL void do_stop(uint8_t flags); YASIO__DECL void schedule_timer(highp_timer*, timer_cb_t&&); YASIO__DECL void remove_timer(highp_timer*); std::vector::iterator find_timer(highp_timer* key) { return yasio__find_if(timer_queue_, [=](const timer_impl_t& timer) { return timer.first == key; }); } void sort_timers() { std::sort(this->timer_queue_.begin(), this->timer_queue_.end(), [](const timer_impl_t& lhs, const timer_impl_t& rhs) { return lhs.first->expire_time_ > rhs.first->expire_time_; }); } // Start a async domain name query YASIO__DECL void start_query(io_channel*); YASIO__DECL void initialize(const io_hostent* channel_eps /* could be nullptr */, int channel_count); YASIO__DECL void finalize(); // Try to dispose thread and other resources, service state will be IDLE when succeed YASIO__DECL void handle_stop(); YASIO__DECL bool open_internal(io_channel*); YASIO__DECL void process_transports(); YASIO__DECL void process_channels(); YASIO__DECL void process_timers(); YASIO__DECL void process_deferred_events(); YASIO__DECL void wakeup(); YASIO__DECL highp_time_t get_timeout(highp_time_t usec); YASIO__DECL int do_resolve(io_channel* ctx); YASIO__DECL void do_connect(io_channel*); YASIO__DECL void do_connect_completion(io_channel*); #if defined(YASIO_SSL_BACKEND) YASIO__DECL yssl_ctx_st* init_ssl_context(ssl_role role); YASIO__DECL void cleanup_ssl_context(ssl_role role); #endif #if defined(YASIO_USE_CARES) YASIO__DECL static void ares_getaddrinfo_cb(void* data, int status, int timeouts, ares_addrinfo* answerlist); YASIO__DECL static void ares_sock_state_cb(void* data, socket_native_type socket_fd, int readable, int writable); YASIO__DECL void ares_work_started(); YASIO__DECL void ares_work_finished(); YASIO__DECL int ares_get_fds(socket_native_type* socks, highp_time_t& waitd_usec); YASIO__DECL void do_ares_process_fds(socket_native_type* socks, int count); YASIO__DECL void recreate_ares_channel(); YASIO__DECL void config_ares_name_servers(); YASIO__DECL void destroy_ares_channel(); #endif void handle_connect_succeed(io_channel* ctx, xxsocket_ptr s) { handle_connect_succeed(allocate_transport(ctx, std::move(s))); } YASIO__DECL void handle_connect_succeed(transport_handle_t); YASIO__DECL void handle_connect_failed(io_channel*, int ec); YASIO__DECL void active_transport(transport_handle_t); YASIO__DECL transport_handle_t allocate_transport(io_channel*, xxsocket_ptr&&); YASIO__DECL void deallocate_transport(transport_handle_t); // The major non-blocking event-loop YASIO__DECL void run(void); YASIO__DECL bool do_read(transport_handle_t); bool do_write(transport_handle_t transport) { return transport->do_write(this->wait_duration_); } YASIO__DECL void unpack(transport_handle_t, int bytes_expected, int bytes_transferred, int bytes_to_strip); YASIO__DECL bool cleanup_channel(io_channel* channel, bool clear_mask = true); YASIO__DECL bool cleanup_io(io_base* obj, bool clear_mask = true); YASIO__DECL void handle_close(transport_handle_t); template inline void fire_event(_Types&&... args) { auto event = cxx14::make_unique(std::forward<_Types>(args)...); if (options_.on_defer_event_ && options_.on_defer_event_(event)) return; events_.emplace(std::move(event)); } template inline void forward_packet(_Types&&... args) { options_.on_event_(cxx14::make_unique(std::forward<_Types>(args)...)); } // new/delete client socket connection channel // please call this at initialization, don't new channel at runtime // dynmaically: because this API is not thread safe. YASIO__DECL void create_channels(const io_hostent* eps, int count); YASIO__DECL void destroy_channels(); // destroy all channels YASIO__DECL void clear_transports(); // clear all transports YASIO__DECL bool close_internal(io_channel*); // supporting server YASIO__DECL void do_accept(io_channel*); YASIO__DECL void do_accept_completion(io_channel*); /* ** summary: For udp-server only, make dgram handle to communicate with client */ YASIO__DECL transport_handle_t do_dgram_accept(io_channel*, const ip::endpoint& peer, int& error); int local_address_family() const { return ((ipsv_ & ipsv_ipv4) || !ipsv_) ? AF_INET : AF_INET6; } YASIO__DECL void update_dns_status(); bool address_expired(io_channel* ctx) const { return (highp_clock() - ctx->query_success_time_) > options_.dns_cache_timeout_; } /* For log macro only */ inline const print_fn2_t& __get_cprint() const { return options_.print_; } void update_time() { this->time_ = yasio::steady_clock_t::now(); } private: state state_ = state::UNINITIALIZED; // The service state std::thread worker_; std::thread::id worker_id_; /* The current time according to the event loop. in msecs. */ std::chrono::time_point time_; privacy::concurrent_queue events_; std::vector channels_; std::recursive_mutex channel_ops_mtx_; std::vector channel_ops_; std::vector transports_; std::vector tpool_; // timer support timer_pair, back is earliest expire timer std::vector timer_queue_; std::recursive_mutex timer_queue_mtx_; // the next wait duration for socket.select highp_time_t wait_duration_; io_watcher io_watcher_; // options struct __unnamed_options { highp_time_t connect_timeout_ = 10LL * std::micro::den; highp_time_t dns_cache_timeout_ = 600LL * std::micro::den; highp_time_t dns_queries_timeout_ = 5LL * std::micro::den; int dns_queries_tries_ = 5; bool dns_dirty_ = false; bool deferred_event_ = true; defer_event_cb_t on_defer_event_; bool no_dispatch_ = false; // since v4.0.0 bool forward_packet_ = false; // since v3.39.8 // tcp keepalive settings struct __unnamed01 { int onoff = 0; int idle = 7200; int interval = 75; int probs = 10; } tcp_keepalive_; bool no_new_thread_ = false; // The resolve function resolv_fn_t resolv_; // the event callback event_cb_t on_event_; // The custom debug print function print_fn2_t print_; #if defined(YASIO_SSL_BACKEND) // SSL client, the full path cacert(.pem) file for ssl verifaction std::string cafile_; // SSL server std::string crtfile_; std::string keyfile_; #endif #if defined(YASIO_USE_CARES) std::string name_servers_; #endif } options_; // The ip stack version supported by localhost u_short ipsv_ = 0; // The stop flag to notify all transports needs close uint8_t stop_flag_ = 0; #if defined(YASIO_SSL_BACKEND) yssl_ctx_st* ssl_roles_[2]; #endif #if defined(YASIO_USE_CARES) ares_channel ares_ = nullptr; // the ares handle for non blocking io dns resolve support int ares_outstanding_work_ = 0; #else // we need life_token + life_mutex struct life_token {}; std::shared_ptr life_token_; std::shared_ptr life_mutex_; #endif }; // io_service } // namespace inet #if !YASIO__HAS_CXX11 using namespace yasio::inet; #endif } /* namespace yasio */ #define yasio_shared_service yasio::singleton::instance #if defined(YASIO_HEADER_ONLY) # include "yasio/core/io_service.cpp" // lgtm [cpp/include-non-header] #endif #endif