Update yasio

This commit is contained in:
halx99 2021-11-08 14:43:24 +08:00
parent ac6636641b
commit 10f2220b47
2 changed files with 51 additions and 73 deletions

View File

@ -79,10 +79,7 @@ SOFTWARE.
#define yasio__setbits(x, m) ((x) |= (m))
#define yasio__clearbits(x, m) ((x) &= ~(m))
#define yasio__testbits(x, m) ((x) & (m))
#define yasio__testlobyte(x, v) (((x) & (uint16_t)0x00ff) == (v))
#define yasio__setlobyte(x, v) ((x) = ((x) & ~((decltype(x))0xff)) | (v))
#define yasio__lobyte(x) ((x) & (uint16_t)0x00ff)
#if defined(_MSC_VER)
# pragma warning(push)
@ -118,11 +115,11 @@ enum
/* whether multicast loopback, if 1, local machine can recv self multicast packet */
YCPF_MCAST_LOOPBACK = 1 << 18,
/* whether host modified */
YCPF_HOST_MOD = 1 << 19,
/* whether host dirty */
YCPF_HOST_DIRTY = 1 << 19,
/* whether port modified */
YCPF_PORT_MOD = 1 << 20,
/* whether port dirty */
YCPF_PORT_DIRTY = 1 << 20,
/* host is domain name, needs resolve */
YCPF_NEEDS_RESOLVE = 1 << 21,
@ -273,8 +270,7 @@ void io_channel::join_multicast_group()
// ttl
socket_->set_optval(multiaddr_.af() == AF_INET ? IPPROTO_IP : IPPROTO_IPV6, multiaddr_.af() == AF_INET ? IP_MULTICAST_TTL : IPV6_MULTICAST_HOPS,
YASIO_DEFAULT_MULTICAST_TTL);
int ret = configure_multicast_group(true);
if (yasio__unlikely(ret != 0))
if (configure_multicast_group(true) != 0)
{
int ec = xxsocket::get_last_errno();
YASIO_KLOGE("[index: %d] join to multicast group %s failed, ec=%d, detail:%s", this->index_, multiaddr_.to_string().c_str(), ec, xxsocket::strerror(ec));
@ -311,7 +307,7 @@ void io_channel::set_host(cxx17::string_view host)
if (this->remote_host_ != host)
{
cxx17::assign(this->remote_host_, host);
yasio__setbits(properties_, YCPF_HOST_MOD);
yasio__setbits(properties_, YCPF_HOST_DIRTY);
}
}
void io_channel::set_port(u_short port)
@ -321,7 +317,7 @@ void io_channel::set_port(u_short port)
if (this->remote_port_ != port)
{
this->remote_port_ = port;
yasio__setbits(properties_, YCPF_PORT_MOD);
yasio__setbits(properties_, YCPF_PORT_DIRTY);
}
}
int io_channel::__builtin_decode_len(void* d, int n)
@ -755,7 +751,6 @@ void io_service::start(event_cb_t cb)
{
if (state_ == io_service::state::IDLE)
{
this->stop_flag_ = 0;
auto& global_state = yasio__shared_globals();
if (!this->options_.print_)
this->options_.print_ = global_state.cprint_;
@ -827,7 +822,7 @@ void io_service::initialize(const io_hostent* channel_eps, int channel_count)
options_.resolv_ = [=](std::vector<ip::endpoint>& eps, const char* host, unsigned short port) { return this->resolve(eps, host, port); };
register_descriptor(interrupter_.read_descriptor(), YEM_POLLIN);
// Create channels
// create channels
create_channels(channel_eps, channel_count);
#if !defined(YASIO_HAVE_CARES)
@ -844,15 +839,12 @@ void io_service::finalize()
std::unique_lock<cxx17::shared_mutex> lck(*life_mutex_);
life_token_.reset();
#endif
destroy_channels();
unregister_descriptor(interrupter_.read_descriptor(), YEM_POLLIN);
options_.on_event_ = nullptr;
options_.resolv_ = nullptr;
/// purge transport pool memory
for (auto o : tpool_)
::operator delete(o);
tpool_.clear();
@ -997,7 +989,7 @@ void io_service::process_channels(fd_set* fds_array)
bool finish = true;
if (yasio__testbits(ctx->properties_, YCM_CLIENT))
{
if (yasio__unlikely(yasio__testbits(ctx->opmask_, YOPM_OPEN)))
if (yasio__testbits(ctx->opmask_, YOPM_OPEN))
{
yasio__clearbits(ctx->opmask_, YOPM_OPEN);
ctx->state_ = io_base::state::RESOLVING;
@ -1005,7 +997,7 @@ void io_service::process_channels(fd_set* fds_array)
switch (static_cast<io_base::state>(ctx->state_))
{
case io_base::state::OPENING:
case io_base::state::CONNECTING:
do_connect_completion(ctx, fds_array);
break;
case io_base::state::RESOLVING:
@ -1014,7 +1006,6 @@ void io_service::process_channels(fd_set* fds_array)
else if (ctx->error_ != EINPROGRESS)
handle_connect_failed(ctx, ctx->error_);
break;
default:;
}
finish = ctx->error_ != EINPROGRESS;
}
@ -1071,10 +1062,9 @@ bool io_service::is_open(int index) const
auto ctx = channel_at(index);
return ctx != nullptr && ctx->state_ == io_base::state::OPENED;
}
void io_service::open(size_t index, int kind)
bool io_service::open(size_t index, int kind)
{
assert((kind > 0 && kind <= 0xff) && ((kind & (kind - 1)) != 0));
auto ctx = channel_at(index);
if (ctx != nullptr)
{
@ -1083,9 +1073,9 @@ void io_service::open(size_t index, int kind)
ctx->socktype_ = SOCK_STREAM;
else if (yasio__testbits(kind, YCM_UDP))
ctx->socktype_ = SOCK_DGRAM;
open_internal(ctx);
return open_internal(ctx);
}
return false;
}
io_channel* io_service::channel_at(size_t index) const { return (index < channels_.size()) ? channels_[index] : nullptr; }
void io_service::handle_close(transport_handle_t thandle)
@ -1170,7 +1160,7 @@ void io_service::do_connect(io_channel* ctx)
if (ctx->socket_->is_open())
cleanup_io(ctx);
ctx->state_ = io_base::state::OPENING;
ctx->state_ = io_base::state::CONNECTING;
auto& ep = ctx->remote_eps_[0];
YASIO_KLOGD("[index: %d] connecting server %s(%s):%u...", ctx->index_, ctx->remote_host_.c_str(), ep.ip().c_str(), ctx->remote_port_);
if (ctx->socket_->open(ep.af(), ctx->socktype_))
@ -1182,7 +1172,7 @@ void io_service::do_connect(io_channel* ctx)
ctx->socket_->exclusive_address(true);
if (ctx->local_port_ != 0 || !ctx->local_host_.empty() || yasio__testbits(ctx->properties_, YCM_UDP))
{
if (yasio__likely(!yasio__testbits(ctx->properties_, YCM_UDS)))
if (!yasio__testbits(ctx->properties_, YCM_UDS))
{
auto ifaddr = ctx->local_host_.empty() ? YASIO_ADDR_ANY(ep.af()) : ctx->local_host_.c_str();
ret = ctx->socket_->bind(ifaddr, ctx->local_port_);
@ -1230,11 +1220,11 @@ void io_service::do_connect(io_channel* ctx)
void io_service::do_connect_completion(io_channel* ctx, fd_set* fds_array)
{
assert(ctx->state_ == io_base::state::OPENING && yasio__testbits(ctx->properties_, YCM_TCP) && yasio__testbits(ctx->properties_, YCM_CLIENT));
if (ctx->state_ == io_base::state::OPENING)
assert(ctx->state_ == io_base::state::CONNECTING);
if (ctx->state_ == io_base::state::CONNECTING)
{
#if !defined(YASIO_SSL_BACKEND)
int error = -1;
#if !defined(YASIO_SSL_BACKEND)
if (FD_ISSET(ctx->socket_->native_handle(), &fds_array[write_op]) || FD_ISSET(ctx->socket_->native_handle(), &fds_array[read_op]))
{
if (ctx->socket_->get_optval(SOL_SOCKET, SO_ERROR, error) >= 0 && error == 0)
@ -1245,13 +1235,11 @@ void io_service::do_connect_completion(io_channel* ctx, fd_set* fds_array)
}
else
handle_connect_failed(ctx, error);
ctx->timer_.cancel(*this);
}
#else
if (!yasio__testbits(ctx->properties_, YCPF_SSL_HANDSHAKING))
{
int error = -1;
if (FD_ISSET(ctx->socket_->native_handle(), &fds_array[write_op]) || FD_ISSET(ctx->socket_->native_handle(), &fds_array[read_op]))
{
if (ctx->socket_->get_optval(SOL_SOCKET, SO_ERROR, error) >= 0 && error == 0)
@ -1269,8 +1257,7 @@ void io_service::do_connect_completion(io_channel* ctx, fd_set* fds_array)
}
else
do_ssl_handshake(ctx);
if (ctx->state_ != io_base::state::OPENING)
if (ctx->state_ != io_base::state::CONNECTING)
ctx->timer_.cancel(*this);
#endif
}
@ -1441,26 +1428,22 @@ void io_service::ares_getaddrinfo_cb(void* arg, int status, int /*timeouts*/, ar
auto ctx = (io_channel*)arg;
auto& current_service = ctx->get_service();
current_service.ares_work_finished();
if (status == ARES_SUCCESS && answerlist != nullptr)
{
for (auto ai = answerlist->nodes; ai != nullptr; ai = ai->ai_next)
{
if (ai->ai_family == AF_INET6 || ai->ai_family == AF_INET)
{
ctx->remote_eps_.push_back(ip::endpoint(ai->ai_addr));
break;
}
}
}
auto __get_cprint = [&]() -> const print_fn2_t& { return current_service.options_.print_; };
if (!ctx->remote_eps_.empty())
{
ctx->resolved_time_ = highp_clock();
ctx->last_resolved_time_ = highp_clock();
# if defined(YASIO_ENABLE_ARES_PROFILER)
YASIO_KLOGD("[index: %d] ares_getaddrinfo_cb: resolve %s succeed, cost:%g(ms)", ctx->index_, ctx->remote_host_.c_str(),
(ctx->resolved_time_ - ctx->ares_start_time_) / 1000.0);
(ctx->last_resolved_time_ - ctx->ares_start_time_) / 1000.0);
# endif
}
else
@ -1469,7 +1452,6 @@ void io_service::ares_getaddrinfo_cb(void* arg, int status, int /*timeouts*/, ar
YASIO_KLOGE("[index: %d] ares_getaddrinfo_cb: resolve %s failed, status=%d, detail:%s", ctx->index_, ctx->remote_host_.c_str(), status,
::ares_strerror(status));
}
current_service.interrupt();
}
void io_service::process_ares_requests(fd_set* fds_array)
@ -1553,7 +1535,7 @@ void io_service::do_accept(io_channel* ctx)
cleanup_channel(ctx);
ip::endpoint ep;
if (yasio__likely(!yasio__testbits(ctx->properties_, YCM_UDS)))
if (!yasio__testbits(ctx->properties_, YCM_UDS))
{
// server: don't need resolve, don't use remote_eps_
auto ifaddr = ctx->remote_host_.empty() ? YASIO_ADDR_ANY(local_address_family()) : ctx->remote_host_.c_str();
@ -1771,7 +1753,7 @@ transport_handle_t io_service::allocate_transport(io_channel* ctx, std::shared_p
if (yasio__testbits(ctx->properties_, YCM_TCP))
{ // tcp like transport
#if defined(YASIO_SSL_BACKEND)
if (yasio__unlikely(yasio__testbits(ctx->properties_, YCM_SSL)))
if (yasio__testbits(ctx->properties_, YCM_SSL))
{
transport = new (vp) io_transport_ssl(ctx, socket);
break;
@ -1782,7 +1764,7 @@ transport_handle_t io_service::allocate_transport(io_channel* ctx, std::shared_p
else // udp like transport
{
#if defined(YASIO_HAVE_KCP)
if (yasio__unlikely(yasio__testbits(ctx->properties_, YCM_KCP)))
if (yasio__testbits(ctx->properties_, YCM_KCP))
{
transport = new (vp) io_transport_kcp(ctx, socket);
break;
@ -1841,10 +1823,8 @@ bool io_service::do_read(transport_handle_t transport, fd_set* fds_array)
break;
}
}
else
{ // process incompleted pdu
else // process incompleted pdu
unpack(transport, transport->expected_size_ - static_cast<int>(transport->expected_packet_.size()), n, 0);
}
}
else
{ // n < 0, regard as connection should close
@ -1921,12 +1901,12 @@ void io_service::remove_timer(highp_timer* timer)
}
}
}
void io_service::open_internal(io_channel* ctx)
bool io_service::open_internal(io_channel* ctx)
{
if (ctx->state_ == io_base::state::OPENING)
{ // in-opening, do nothing
YASIO_KLOGD("[index: %d] the channel is in opening!", ctx->index_);
return;
if (ctx->state_ == io_base::state::CONNECTING || ctx->state_ == io_base::state::RESOLVING)
{
YASIO_KLOGD("[index: %d] the channel open operation is in progress!", ctx->index_);
return false;
}
yasio__clearbits(ctx->opmask_, YOPM_CLOSE);
@ -1940,6 +1920,7 @@ void io_service::open_internal(io_channel* ctx)
this->channel_ops_mtx_.unlock();
this->interrupt();
return true;
}
bool io_service::shutdown_internal(transport_handle_t transport)
{
@ -2005,7 +1986,6 @@ int io_service::do_select(fd_set* fdsa, highp_time_t wait_duration)
YASIO_KLOGV("[core] socket.select max_nfds_:%d waiting... %ld milliseconds", max_nfds_, waitd_tv.tv_sec * 1000 + waitd_tv.tv_usec / 1000);
int retval = ::select(this->max_nfds_, &(fdsa[read_op]), &(fdsa[write_op]), nullptr, &waitd_tv);
YASIO_KLOGV("[core] socket.select waked up, retval=%d", retval);
return retval;
}
highp_time_t io_service::get_timeout(highp_time_t usec)
@ -2053,13 +2033,13 @@ bool io_service::cleanup_io(io_base* obj, bool clear_mask)
int io_service::do_resolve(io_channel* ctx)
{
if (yasio__unlikely(yasio__testbits(ctx->properties_, YCPF_HOST_MOD)))
if (yasio__testbits(ctx->properties_, YCPF_HOST_DIRTY))
{
yasio__clearbits(ctx->properties_, YCPF_HOST_MOD);
yasio__clearbits(ctx->properties_, YCPF_HOST_DIRTY);
ctx->remote_eps_.clear();
ip::endpoint ep;
#if defined(YASIO_ENABLE_UDS) && YASIO__HAS_UDS
if (yasio__unlikely(yasio__testbits(ctx->properties_, YCM_UDS)))
if (yasio__testbits(ctx->properties_, YCM_UDS))
{
ep.as_un(ctx->remote_host_.c_str());
ctx->remote_eps_.push_back(ep);
@ -2072,37 +2052,35 @@ int io_service::do_resolve(io_channel* ctx)
yasio__setbits(ctx->properties_, YCPF_NEEDS_RESOLVE);
}
if (yasio__unlikely(yasio__testbits(ctx->properties_, YCPF_PORT_MOD)))
if (yasio__testbits(ctx->properties_, YCPF_PORT_DIRTY))
{
yasio__clearbits(ctx->properties_, YCPF_PORT_MOD);
yasio__clearbits(ctx->properties_, YCPF_PORT_DIRTY);
if (!ctx->remote_eps_.empty())
for (auto& ep : ctx->remote_eps_)
ep.port(ctx->remote_port_);
}
if (yasio__likely(!ctx->remote_eps_.empty()))
if (!ctx->remote_eps_.empty())
{
if (!yasio__testbits(ctx->properties_, YCPF_NEEDS_RESOLVE))
return 0;
update_dns_status();
if (yasio__likely((highp_clock() - ctx->resolved_time_) < options_.dns_cache_timeout_))
if ((highp_clock() - ctx->last_resolved_time_) < options_.dns_cache_timeout_)
return 0;
ctx->remote_eps_.clear();
}
if (yasio__unlikely(!yasio__testbits(ctx->properties_, YCPF_NAME_RESOLVING)))
if (!ctx->remote_host_.empty())
{
if (yasio__likely(!ctx->remote_host_.empty()))
if (!yasio__testbits(ctx->properties_, YCPF_NAME_RESOLVING))
start_resolve(ctx);
else
ctx->error_ = yasio::errc::no_available_address;
}
else
ctx->error_ = yasio::errc::no_available_address;
return -1;
}
void io_service::start_resolve(io_channel* ctx)
{ // Only call at event-loop thread, so
// no need to consider thread safe.
{
yasio__setbits(ctx->properties_, YCPF_NAME_RESOLVING);
ctx->set_last_errno(EINPROGRESS);
YASIO_KLOGD("[index: %d] resolving %s", ctx->index_, ctx->remote_host_.c_str());
@ -2136,8 +2114,8 @@ void io_service::start_resolve(io_channel* ctx)
return;
if (error == 0)
{
ctx->remote_eps_ = std::move(remote_eps);
ctx->resolved_time_ = highp_clock();
ctx->remote_eps_ = std::move(remote_eps);
ctx->last_resolved_time_ = highp_clock();
# if defined(YASIO_ENABLE_ARES_PROFILER)
YASIO_KLOGD("[index: %d] resolve %s succeed, cost: %g(ms)", ctx->index_, ctx->remote_host_.c_str(),
(ctx->resolved_time_ - ctx->ares_start_time_) / 1000.0);
@ -2172,7 +2150,7 @@ void io_service::update_dns_status()
recreate_ares_channel();
#endif
for (auto channel : this->channels_)
channel->resolved_time_ = 0;
channel->last_resolved_time_ = 0;
}
}
int io_service::resolve(std::vector<ip::endpoint>& endpoints, const char* hostname, unsigned short port)

View File

@ -415,8 +415,8 @@ struct YASIO_API io_base {
enum class state : uint8_t
{
CLOSED,
RESOLVING,
OPENING,
RESOLVING, // for client only
CONNECTING, // for client only
OPENED,
};
enum class error_stage : uint8_t
@ -563,7 +563,7 @@ private:
u_short remote_port_ = 0;
// The last domain name resolved time in microseconds for dns cache support
highp_time_t resolved_time_ = 0;
highp_time_t last_resolved_time_ = 0;
int index_;
int socktype_ = 0;
@ -969,7 +969,7 @@ public:
YASIO__DECL void set_option_internal(int opt, va_list args);
// open a channel, default: YCK_TCP_CLIENT
YASIO__DECL void open(size_t index, int kind = YCK_TCP_CLIENT);
YASIO__DECL bool open(size_t index, int kind = YCK_TCP_CLIENT);
// check whether the channel is open
YASIO__DECL bool is_open(int index) const;
@ -1043,7 +1043,7 @@ private:
// Try to dispose thread and other resources, service state will be IDLE when succeed
YASIO__DECL void handle_stop();
YASIO__DECL void open_internal(io_channel*);
YASIO__DECL bool open_internal(io_channel*);
YASIO__DECL void process_transports(fd_set* fds_array);
YASIO__DECL void process_channels(fd_set* fds_array);