diff --git a/thirdparty/yasio/yasio.cpp b/thirdparty/yasio/yasio.cpp index edaca2a1f0..b7e527b688 100644 --- a/thirdparty/yasio/yasio.cpp +++ b/thirdparty/yasio/yasio.cpp @@ -79,10 +79,7 @@ SOFTWARE. #define yasio__setbits(x, m) ((x) |= (m)) #define yasio__clearbits(x, m) ((x) &= ~(m)) #define yasio__testbits(x, m) ((x) & (m)) - -#define yasio__testlobyte(x, v) (((x) & (uint16_t)0x00ff) == (v)) #define yasio__setlobyte(x, v) ((x) = ((x) & ~((decltype(x))0xff)) | (v)) -#define yasio__lobyte(x) ((x) & (uint16_t)0x00ff) #if defined(_MSC_VER) # pragma warning(push) @@ -118,11 +115,11 @@ enum /* whether multicast loopback, if 1, local machine can recv self multicast packet */ YCPF_MCAST_LOOPBACK = 1 << 18, - /* whether host modified */ - YCPF_HOST_MOD = 1 << 19, + /* whether host dirty */ + YCPF_HOST_DIRTY = 1 << 19, - /* whether port modified */ - YCPF_PORT_MOD = 1 << 20, + /* whether port dirty */ + YCPF_PORT_DIRTY = 1 << 20, /* host is domain name, needs resolve */ YCPF_NEEDS_RESOLVE = 1 << 21, @@ -273,8 +270,7 @@ void io_channel::join_multicast_group() // ttl socket_->set_optval(multiaddr_.af() == AF_INET ? IPPROTO_IP : IPPROTO_IPV6, multiaddr_.af() == AF_INET ? IP_MULTICAST_TTL : IPV6_MULTICAST_HOPS, YASIO_DEFAULT_MULTICAST_TTL); - int ret = configure_multicast_group(true); - if (yasio__unlikely(ret != 0)) + if (configure_multicast_group(true) != 0) { int ec = xxsocket::get_last_errno(); YASIO_KLOGE("[index: %d] join to multicast group %s failed, ec=%d, detail:%s", this->index_, multiaddr_.to_string().c_str(), ec, xxsocket::strerror(ec)); @@ -311,7 +307,7 @@ void io_channel::set_host(cxx17::string_view host) if (this->remote_host_ != host) { cxx17::assign(this->remote_host_, host); - yasio__setbits(properties_, YCPF_HOST_MOD); + yasio__setbits(properties_, YCPF_HOST_DIRTY); } } void io_channel::set_port(u_short port) @@ -321,7 +317,7 @@ void io_channel::set_port(u_short port) if (this->remote_port_ != port) { this->remote_port_ = port; - yasio__setbits(properties_, YCPF_PORT_MOD); + yasio__setbits(properties_, YCPF_PORT_DIRTY); } } int io_channel::__builtin_decode_len(void* d, int n) @@ -755,7 +751,6 @@ void io_service::start(event_cb_t cb) { if (state_ == io_service::state::IDLE) { - this->stop_flag_ = 0; auto& global_state = yasio__shared_globals(); if (!this->options_.print_) this->options_.print_ = global_state.cprint_; @@ -827,7 +822,7 @@ void io_service::initialize(const io_hostent* channel_eps, int channel_count) options_.resolv_ = [=](std::vector& eps, const char* host, unsigned short port) { return this->resolve(eps, host, port); }; register_descriptor(interrupter_.read_descriptor(), YEM_POLLIN); - // Create channels + // create channels create_channels(channel_eps, channel_count); #if !defined(YASIO_HAVE_CARES) @@ -844,15 +839,12 @@ void io_service::finalize() std::unique_lock lck(*life_mutex_); life_token_.reset(); #endif - destroy_channels(); - unregister_descriptor(interrupter_.read_descriptor(), YEM_POLLIN); options_.on_event_ = nullptr; options_.resolv_ = nullptr; - /// purge transport pool memory for (auto o : tpool_) ::operator delete(o); tpool_.clear(); @@ -997,7 +989,7 @@ void io_service::process_channels(fd_set* fds_array) bool finish = true; if (yasio__testbits(ctx->properties_, YCM_CLIENT)) { - if (yasio__unlikely(yasio__testbits(ctx->opmask_, YOPM_OPEN))) + if (yasio__testbits(ctx->opmask_, YOPM_OPEN)) { yasio__clearbits(ctx->opmask_, YOPM_OPEN); ctx->state_ = io_base::state::RESOLVING; @@ -1005,7 +997,7 @@ void io_service::process_channels(fd_set* fds_array) switch (static_cast(ctx->state_)) { - case io_base::state::OPENING: + case io_base::state::CONNECTING: do_connect_completion(ctx, fds_array); break; case io_base::state::RESOLVING: @@ -1014,7 +1006,6 @@ void io_service::process_channels(fd_set* fds_array) else if (ctx->error_ != EINPROGRESS) handle_connect_failed(ctx, ctx->error_); break; - default:; } finish = ctx->error_ != EINPROGRESS; } @@ -1071,10 +1062,9 @@ bool io_service::is_open(int index) const auto ctx = channel_at(index); return ctx != nullptr && ctx->state_ == io_base::state::OPENED; } -void io_service::open(size_t index, int kind) +bool io_service::open(size_t index, int kind) { assert((kind > 0 && kind <= 0xff) && ((kind & (kind - 1)) != 0)); - auto ctx = channel_at(index); if (ctx != nullptr) { @@ -1083,9 +1073,9 @@ void io_service::open(size_t index, int kind) ctx->socktype_ = SOCK_STREAM; else if (yasio__testbits(kind, YCM_UDP)) ctx->socktype_ = SOCK_DGRAM; - - open_internal(ctx); + return open_internal(ctx); } + return false; } io_channel* io_service::channel_at(size_t index) const { return (index < channels_.size()) ? channels_[index] : nullptr; } void io_service::handle_close(transport_handle_t thandle) @@ -1170,7 +1160,7 @@ void io_service::do_connect(io_channel* ctx) if (ctx->socket_->is_open()) cleanup_io(ctx); - ctx->state_ = io_base::state::OPENING; + ctx->state_ = io_base::state::CONNECTING; auto& ep = ctx->remote_eps_[0]; YASIO_KLOGD("[index: %d] connecting server %s(%s):%u...", ctx->index_, ctx->remote_host_.c_str(), ep.ip().c_str(), ctx->remote_port_); if (ctx->socket_->open(ep.af(), ctx->socktype_)) @@ -1182,7 +1172,7 @@ void io_service::do_connect(io_channel* ctx) ctx->socket_->exclusive_address(true); if (ctx->local_port_ != 0 || !ctx->local_host_.empty() || yasio__testbits(ctx->properties_, YCM_UDP)) { - if (yasio__likely(!yasio__testbits(ctx->properties_, YCM_UDS))) + if (!yasio__testbits(ctx->properties_, YCM_UDS)) { auto ifaddr = ctx->local_host_.empty() ? YASIO_ADDR_ANY(ep.af()) : ctx->local_host_.c_str(); ret = ctx->socket_->bind(ifaddr, ctx->local_port_); @@ -1230,11 +1220,11 @@ void io_service::do_connect(io_channel* ctx) void io_service::do_connect_completion(io_channel* ctx, fd_set* fds_array) { - assert(ctx->state_ == io_base::state::OPENING && yasio__testbits(ctx->properties_, YCM_TCP) && yasio__testbits(ctx->properties_, YCM_CLIENT)); - if (ctx->state_ == io_base::state::OPENING) + assert(ctx->state_ == io_base::state::CONNECTING); + if (ctx->state_ == io_base::state::CONNECTING) { -#if !defined(YASIO_SSL_BACKEND) int error = -1; +#if !defined(YASIO_SSL_BACKEND) if (FD_ISSET(ctx->socket_->native_handle(), &fds_array[write_op]) || FD_ISSET(ctx->socket_->native_handle(), &fds_array[read_op])) { if (ctx->socket_->get_optval(SOL_SOCKET, SO_ERROR, error) >= 0 && error == 0) @@ -1245,13 +1235,11 @@ void io_service::do_connect_completion(io_channel* ctx, fd_set* fds_array) } else handle_connect_failed(ctx, error); - ctx->timer_.cancel(*this); } #else if (!yasio__testbits(ctx->properties_, YCPF_SSL_HANDSHAKING)) { - int error = -1; if (FD_ISSET(ctx->socket_->native_handle(), &fds_array[write_op]) || FD_ISSET(ctx->socket_->native_handle(), &fds_array[read_op])) { if (ctx->socket_->get_optval(SOL_SOCKET, SO_ERROR, error) >= 0 && error == 0) @@ -1269,8 +1257,7 @@ void io_service::do_connect_completion(io_channel* ctx, fd_set* fds_array) } else do_ssl_handshake(ctx); - - if (ctx->state_ != io_base::state::OPENING) + if (ctx->state_ != io_base::state::CONNECTING) ctx->timer_.cancel(*this); #endif } @@ -1441,26 +1428,22 @@ void io_service::ares_getaddrinfo_cb(void* arg, int status, int /*timeouts*/, ar auto ctx = (io_channel*)arg; auto& current_service = ctx->get_service(); current_service.ares_work_finished(); - if (status == ARES_SUCCESS && answerlist != nullptr) { for (auto ai = answerlist->nodes; ai != nullptr; ai = ai->ai_next) { if (ai->ai_family == AF_INET6 || ai->ai_family == AF_INET) - { ctx->remote_eps_.push_back(ip::endpoint(ai->ai_addr)); - break; - } } } auto __get_cprint = [&]() -> const print_fn2_t& { return current_service.options_.print_; }; if (!ctx->remote_eps_.empty()) { - ctx->resolved_time_ = highp_clock(); + ctx->last_resolved_time_ = highp_clock(); # if defined(YASIO_ENABLE_ARES_PROFILER) YASIO_KLOGD("[index: %d] ares_getaddrinfo_cb: resolve %s succeed, cost:%g(ms)", ctx->index_, ctx->remote_host_.c_str(), - (ctx->resolved_time_ - ctx->ares_start_time_) / 1000.0); + (ctx->last_resolved_time_ - ctx->ares_start_time_) / 1000.0); # endif } else @@ -1469,7 +1452,6 @@ void io_service::ares_getaddrinfo_cb(void* arg, int status, int /*timeouts*/, ar YASIO_KLOGE("[index: %d] ares_getaddrinfo_cb: resolve %s failed, status=%d, detail:%s", ctx->index_, ctx->remote_host_.c_str(), status, ::ares_strerror(status)); } - current_service.interrupt(); } void io_service::process_ares_requests(fd_set* fds_array) @@ -1553,7 +1535,7 @@ void io_service::do_accept(io_channel* ctx) cleanup_channel(ctx); ip::endpoint ep; - if (yasio__likely(!yasio__testbits(ctx->properties_, YCM_UDS))) + if (!yasio__testbits(ctx->properties_, YCM_UDS)) { // server: don't need resolve, don't use remote_eps_ auto ifaddr = ctx->remote_host_.empty() ? YASIO_ADDR_ANY(local_address_family()) : ctx->remote_host_.c_str(); @@ -1771,7 +1753,7 @@ transport_handle_t io_service::allocate_transport(io_channel* ctx, std::shared_p if (yasio__testbits(ctx->properties_, YCM_TCP)) { // tcp like transport #if defined(YASIO_SSL_BACKEND) - if (yasio__unlikely(yasio__testbits(ctx->properties_, YCM_SSL))) + if (yasio__testbits(ctx->properties_, YCM_SSL)) { transport = new (vp) io_transport_ssl(ctx, socket); break; @@ -1782,7 +1764,7 @@ transport_handle_t io_service::allocate_transport(io_channel* ctx, std::shared_p else // udp like transport { #if defined(YASIO_HAVE_KCP) - if (yasio__unlikely(yasio__testbits(ctx->properties_, YCM_KCP))) + if (yasio__testbits(ctx->properties_, YCM_KCP)) { transport = new (vp) io_transport_kcp(ctx, socket); break; @@ -1841,10 +1823,8 @@ bool io_service::do_read(transport_handle_t transport, fd_set* fds_array) break; } } - else - { // process incompleted pdu + else // process incompleted pdu unpack(transport, transport->expected_size_ - static_cast(transport->expected_packet_.size()), n, 0); - } } else { // n < 0, regard as connection should close @@ -1921,12 +1901,12 @@ void io_service::remove_timer(highp_timer* timer) } } } -void io_service::open_internal(io_channel* ctx) +bool io_service::open_internal(io_channel* ctx) { - if (ctx->state_ == io_base::state::OPENING) - { // in-opening, do nothing - YASIO_KLOGD("[index: %d] the channel is in opening!", ctx->index_); - return; + if (ctx->state_ == io_base::state::CONNECTING || ctx->state_ == io_base::state::RESOLVING) + { + YASIO_KLOGD("[index: %d] the channel open operation is in progress!", ctx->index_); + return false; } yasio__clearbits(ctx->opmask_, YOPM_CLOSE); @@ -1940,6 +1920,7 @@ void io_service::open_internal(io_channel* ctx) this->channel_ops_mtx_.unlock(); this->interrupt(); + return true; } bool io_service::shutdown_internal(transport_handle_t transport) { @@ -2005,7 +1986,6 @@ int io_service::do_select(fd_set* fdsa, highp_time_t wait_duration) YASIO_KLOGV("[core] socket.select max_nfds_:%d waiting... %ld milliseconds", max_nfds_, waitd_tv.tv_sec * 1000 + waitd_tv.tv_usec / 1000); int retval = ::select(this->max_nfds_, &(fdsa[read_op]), &(fdsa[write_op]), nullptr, &waitd_tv); YASIO_KLOGV("[core] socket.select waked up, retval=%d", retval); - return retval; } highp_time_t io_service::get_timeout(highp_time_t usec) @@ -2053,13 +2033,13 @@ bool io_service::cleanup_io(io_base* obj, bool clear_mask) int io_service::do_resolve(io_channel* ctx) { - if (yasio__unlikely(yasio__testbits(ctx->properties_, YCPF_HOST_MOD))) + if (yasio__testbits(ctx->properties_, YCPF_HOST_DIRTY)) { - yasio__clearbits(ctx->properties_, YCPF_HOST_MOD); + yasio__clearbits(ctx->properties_, YCPF_HOST_DIRTY); ctx->remote_eps_.clear(); ip::endpoint ep; #if defined(YASIO_ENABLE_UDS) && YASIO__HAS_UDS - if (yasio__unlikely(yasio__testbits(ctx->properties_, YCM_UDS))) + if (yasio__testbits(ctx->properties_, YCM_UDS)) { ep.as_un(ctx->remote_host_.c_str()); ctx->remote_eps_.push_back(ep); @@ -2072,37 +2052,35 @@ int io_service::do_resolve(io_channel* ctx) yasio__setbits(ctx->properties_, YCPF_NEEDS_RESOLVE); } - if (yasio__unlikely(yasio__testbits(ctx->properties_, YCPF_PORT_MOD))) + if (yasio__testbits(ctx->properties_, YCPF_PORT_DIRTY)) { - yasio__clearbits(ctx->properties_, YCPF_PORT_MOD); + yasio__clearbits(ctx->properties_, YCPF_PORT_DIRTY); if (!ctx->remote_eps_.empty()) for (auto& ep : ctx->remote_eps_) ep.port(ctx->remote_port_); } - if (yasio__likely(!ctx->remote_eps_.empty())) + if (!ctx->remote_eps_.empty()) { if (!yasio__testbits(ctx->properties_, YCPF_NEEDS_RESOLVE)) return 0; - update_dns_status(); - if (yasio__likely((highp_clock() - ctx->resolved_time_) < options_.dns_cache_timeout_)) + if ((highp_clock() - ctx->last_resolved_time_) < options_.dns_cache_timeout_) return 0; ctx->remote_eps_.clear(); } - if (yasio__unlikely(!yasio__testbits(ctx->properties_, YCPF_NAME_RESOLVING))) + if (!ctx->remote_host_.empty()) { - if (yasio__likely(!ctx->remote_host_.empty())) + if (!yasio__testbits(ctx->properties_, YCPF_NAME_RESOLVING)) start_resolve(ctx); - else - ctx->error_ = yasio::errc::no_available_address; } + else + ctx->error_ = yasio::errc::no_available_address; return -1; } void io_service::start_resolve(io_channel* ctx) -{ // Only call at event-loop thread, so - // no need to consider thread safe. +{ yasio__setbits(ctx->properties_, YCPF_NAME_RESOLVING); ctx->set_last_errno(EINPROGRESS); YASIO_KLOGD("[index: %d] resolving %s", ctx->index_, ctx->remote_host_.c_str()); @@ -2136,8 +2114,8 @@ void io_service::start_resolve(io_channel* ctx) return; if (error == 0) { - ctx->remote_eps_ = std::move(remote_eps); - ctx->resolved_time_ = highp_clock(); + ctx->remote_eps_ = std::move(remote_eps); + ctx->last_resolved_time_ = highp_clock(); # if defined(YASIO_ENABLE_ARES_PROFILER) YASIO_KLOGD("[index: %d] resolve %s succeed, cost: %g(ms)", ctx->index_, ctx->remote_host_.c_str(), (ctx->resolved_time_ - ctx->ares_start_time_) / 1000.0); @@ -2172,7 +2150,7 @@ void io_service::update_dns_status() recreate_ares_channel(); #endif for (auto channel : this->channels_) - channel->resolved_time_ = 0; + channel->last_resolved_time_ = 0; } } int io_service::resolve(std::vector& endpoints, const char* hostname, unsigned short port) diff --git a/thirdparty/yasio/yasio.hpp b/thirdparty/yasio/yasio.hpp index 2926a3fb9c..8834ab17c5 100644 --- a/thirdparty/yasio/yasio.hpp +++ b/thirdparty/yasio/yasio.hpp @@ -415,8 +415,8 @@ struct YASIO_API io_base { enum class state : uint8_t { CLOSED, - RESOLVING, - OPENING, + RESOLVING, // for client only + CONNECTING, // for client only OPENED, }; enum class error_stage : uint8_t @@ -563,7 +563,7 @@ private: u_short remote_port_ = 0; // The last domain name resolved time in microseconds for dns cache support - highp_time_t resolved_time_ = 0; + highp_time_t last_resolved_time_ = 0; int index_; int socktype_ = 0; @@ -969,7 +969,7 @@ public: YASIO__DECL void set_option_internal(int opt, va_list args); // open a channel, default: YCK_TCP_CLIENT - YASIO__DECL void open(size_t index, int kind = YCK_TCP_CLIENT); + YASIO__DECL bool open(size_t index, int kind = YCK_TCP_CLIENT); // check whether the channel is open YASIO__DECL bool is_open(int index) const; @@ -1043,7 +1043,7 @@ private: // Try to dispose thread and other resources, service state will be IDLE when succeed YASIO__DECL void handle_stop(); - YASIO__DECL void open_internal(io_channel*); + YASIO__DECL bool open_internal(io_channel*); YASIO__DECL void process_transports(fd_set* fds_array); YASIO__DECL void process_channels(fd_set* fds_array);