Improve HttpClient, remove unsafe API `sendSync`

This commit is contained in:
halx99 2023-04-03 19:54:58 +08:00
parent 72251dcf86
commit 2f1a4a9921
6 changed files with 133 additions and 138 deletions

View File

@ -123,17 +123,15 @@ HttpClient::HttpClient()
_scheduler = Director::getInstance()->getScheduler(); _scheduler = Director::getInstance()->getScheduler();
_service = new yasio::io_service(HttpClient::MAX_CHANNELS); _service = new yasio::io_service(HttpClient::MAX_CHANNELS);
_service->set_option(yasio::YOPT_S_FORWARD_EVENT, 1); _service->set_option(yasio::YOPT_S_DEFERRED_EVENT, 1);
_service->set_option(yasio::YOPT_S_DNS_QUERIES_TIMEOUT, 3); _service->set_option(yasio::YOPT_S_DNS_QUERIES_TIMEOUT, 3);
_service->set_option(yasio::YOPT_S_DNS_QUERIES_TRIES, 1); _service->set_option(yasio::YOPT_S_DNS_QUERIES_TRIES, 1);
_service->start([this](yasio::event_ptr&& e) { handleNetworkEvent(e.get()); }); _service->start([this](yasio::event_ptr&& e) { handleNetworkEvent(e.get()); });
for (int i = 0; i < HttpClient::MAX_CHANNELS; ++i) for (int i = 0; i < HttpClient::MAX_CHANNELS; ++i)
{
_availChannelQueue.unsafe_emplace_back(i); _availChannelQueue.unsafe_emplace_back(i);
}
_scheduler->schedule([this](float) { dispatchResponseCallbacks(); }, this, 0, false, "#"); setDispatchOnWorkThread(false);
_isInited = true; _isInited = true;
} }
@ -159,7 +157,26 @@ void HttpClient::setDispatchOnWorkThread(bool bVal)
_scheduler->unscheduleAllForTarget(this); _scheduler->unscheduleAllForTarget(this);
_dispatchOnWorkThread = bVal; _dispatchOnWorkThread = bVal;
if (!bVal) if (!bVal)
_scheduler->schedule([this](float) { dispatchResponseCallbacks(); }, this, 0, false, "#"); _scheduler->schedule([this](float) { tickInput(); }, this, 0, false, "#");
}
// Poll and notify main thread if responses exists in queue
void HttpClient::tickInput()
{
_service->dispatch();
if (_finishedResponseQueue.unsafe_empty())
return;
auto lck = _finishedResponseQueue.get_lock();
if (!_finishedResponseQueue.unsafe_empty())
{
HttpResponse* response = _finishedResponseQueue.front();
_finishedResponseQueue.unsafe_pop_front();
lck.unlock();
invokeResposneCallbackAndRelease(response);
}
} }
void HttpClient::handleNetworkStatusChanged() void HttpClient::handleNetworkStatusChanged()
@ -183,19 +200,12 @@ bool HttpClient::send(HttpRequest* request)
return false; return false;
auto response = new HttpResponse(request); auto response = new HttpResponse(request);
processResponse(response, request->getUrl()); response->setLocation(request->getUrl(), false);
processResponse(response, -1);
response->release(); response->release();
return true; return true;
} }
HttpResponse* HttpClient::sendSync(HttpRequest* request)
{
request->setSync(true);
if (this->send(request))
return request->wait();
return nullptr;
}
int HttpClient::tryTakeAvailChannel() int HttpClient::tryTakeAvailChannel()
{ {
auto lck = _availChannelQueue.get_lock(); auto lck = _availChannelQueue.get_lock();
@ -208,18 +218,23 @@ int HttpClient::tryTakeAvailChannel()
return -1; return -1;
} }
void HttpClient::processResponse(HttpResponse* response, std::string_view url) void HttpClient::processResponse(HttpResponse* response, int channelIndex)
{ {
auto channelIndex = tryTakeAvailChannel();
response->retain(); response->retain();
if (channelIndex != -1) if (response->validateUri())
{ {
if (response->prepareForProcess(url)) if (channelIndex == -1)
channelIndex = tryTakeAvailChannel();
if (channelIndex != -1)
{ {
response->_responseHeaders.clear(); // redirect needs clear old response headers auto channelHandle = _service->channel_at(channelIndex);
auto& requestUri = response->getRequestUri();
auto channelHandle = _service->channel_at(channelIndex); auto& requestUri = response->getRequestUri();
ax::print("###### open connection for %s", requestUri.getHostName().data());
channelHandle->ud_.ptr = response; channelHandle->ud_.ptr = response;
_service->set_option(YOPT_C_REMOTE_ENDPOINT, channelIndex, requestUri.getHost().data(), _service->set_option(YOPT_C_REMOTE_ENDPOINT, channelIndex, requestUri.getHost().data(),
(int)requestUri.getPort()); (int)requestUri.getPort());
@ -229,14 +244,10 @@ void HttpClient::processResponse(HttpResponse* response, std::string_view url)
_service->open(channelIndex, YCK_TCP_CLIENT); _service->open(channelIndex, YCK_TCP_CLIENT);
} }
else else
{ _pendingResponseQueue.emplace_back(response);
finishResponse(response);
}
} }
else else
{ finishResponse(response);
_pendingResponseQueue.emplace_back(response);
}
} }
void HttpClient::handleNetworkEvent(yasio::io_event* event) void HttpClient::handleNetworkEvent(yasio::io_event* event)
@ -244,8 +255,7 @@ void HttpClient::handleNetworkEvent(yasio::io_event* event)
int channelIndex = event->cindex(); int channelIndex = event->cindex();
auto channel = _service->channel_at(event->cindex()); auto channel = _service->channel_at(event->cindex());
HttpResponse* response = (HttpResponse*)channel->ud_.ptr; HttpResponse* response = (HttpResponse*)channel->ud_.ptr;
if (!response) assert(response);
return;
bool responseFinished = response->isFinished(); bool responseFinished = response->isFinished();
switch (event->kind()) switch (event->kind())
@ -253,7 +263,7 @@ void HttpClient::handleNetworkEvent(yasio::io_event* event)
case YEK_ON_PACKET: case YEK_ON_PACKET:
if (!responseFinished) if (!responseFinished)
{ {
auto&& pkt = event->packet_view(); auto&& pkt = event->packet();
response->handleInput(pkt.data(), pkt.size()); response->handleInput(pkt.data(), pkt.size());
} }
if (response->isFinished()) if (response->isFinished())
@ -352,7 +362,8 @@ void HttpClient::handleNetworkEvent(yasio::io_event* event)
char strContentLength[128] = {0}; char strContentLength[128] = {0};
auto requestData = request->getRequestData(); auto requestData = request->getRequestData();
auto requestDataSize = request->getRequestDataSize(); auto requestDataSize = request->getRequestDataSize();
snprintf(strContentLength, sizeof(strContentLength), "Content-Length: %d\r\n\r\n", static_cast<int>(requestDataSize)); snprintf(strContentLength, sizeof(strContentLength), "Content-Length: %d\r\n\r\n",
static_cast<int>(requestDataSize));
obs.write_bytes(strContentLength); obs.write_bytes(strContentLength);
if (requestData && requestDataSize > 0) if (requestData && requestDataSize > 0)
@ -387,6 +398,8 @@ void HttpClient::handleNetworkEvent(yasio::io_event* event)
void HttpClient::handleNetworkEOF(HttpResponse* response, yasio::io_channel* channel, int internalErrorCode) void HttpClient::handleNetworkEOF(HttpResponse* response, yasio::io_channel* channel, int internalErrorCode)
{ {
channel->ud_.ptr = nullptr;
channel->get_user_timer().cancel(*_service); channel->get_user_timer().cancel(*_service);
response->updateInternalCode(internalErrorCode); response->updateInternalCode(internalErrorCode);
auto responseCode = response->getResponseCode(); auto responseCode = response->getResponseCode();
@ -395,54 +408,30 @@ void HttpClient::handleNetworkEOF(HttpResponse* response, yasio::io_channel* cha
case 301: case 301:
case 302: case 302:
case 307: case 307:
if (response->increaseRedirectCount() < HttpClient::MAX_REDIRECT_COUNT) if (response->tryRedirect())
{ {
auto iter = response->_responseHeaders.find("location"); processResponse(response, channel->index());
if (iter != response->_responseHeaders.end()) response->release();
{ break;
if (responseCode == 302)
response->getHttpRequest()->setRequestType(HttpRequest::Type::GET);
AXLOG("Process url redirect (%d): %s", responseCode, iter->second.c_str());
_availChannelQueue.push_front(channel->index());
processResponse(response, iter->second);
response->release();
return;
}
} }
} default:
finishResponse(response);
finishResponse(response); // try process pending response
auto lck = _pendingResponseQueue.get_lock();
if (!_pendingResponseQueue.unsafe_empty())
{
auto pendingResponse = _pendingResponseQueue.unsafe_front();
_pendingResponseQueue.unsafe_pop_front();
lck.unlock();
// recycle channel processResponse(pendingResponse, channel->index());
channel->ud_.ptr = nullptr; pendingResponse->release();
_availChannelQueue.push_front(channel->index()); }
else
// try process pending response { // recycle channel
auto lck = _pendingResponseQueue.get_lock(); _availChannelQueue.push_front(channel->index());
if (!_pendingResponseQueue.unsafe_empty()) }
{
auto pendingResponse = _pendingResponseQueue.unsafe_front();
_pendingResponseQueue.unsafe_pop_front();
lck.unlock();
processResponse(pendingResponse, pendingResponse->getHttpRequest()->getUrl());
pendingResponse->release();
}
}
// Poll and notify main thread if responses exists in queue
void HttpClient::dispatchResponseCallbacks()
{
if (_finishedResponseQueue.unsafe_empty())
return;
auto lck = _finishedResponseQueue.get_lock();
if (!_finishedResponseQueue.unsafe_empty())
{
HttpResponse* response = _finishedResponseQueue.front();
_finishedResponseQueue.unsafe_pop_front();
lck.unlock();
invokeResposneCallbackAndRelease(response);
} }
} }

View File

@ -63,7 +63,6 @@ public:
* How many requests could be perform concurrency. * How many requests could be perform concurrency.
*/ */
static const int MAX_CHANNELS = 21; static const int MAX_CHANNELS = 21;
static const int MAX_REDIRECT_COUNT = 3;
/** /**
* Get instance of HttpClient. * Get instance of HttpClient.
@ -120,12 +119,6 @@ public:
*/ */
bool send(HttpRequest* request); bool send(HttpRequest* request);
/**
* Send http request sync, will block caller thread until request finished.
* @remark Caller must call release manually when the response never been used.
*/
HttpResponse* sendSync(HttpRequest* request);
/** /**
* Set the timeout value for connecting. * Set the timeout value for connecting.
* *
@ -205,7 +198,7 @@ private:
HttpClient(); HttpClient();
virtual ~HttpClient(); virtual ~HttpClient();
void processResponse(HttpResponse* response, std::string_view url); void processResponse(HttpResponse* response, int channelIndex);
int tryTakeAvailChannel(); int tryTakeAvailChannel();
@ -213,7 +206,7 @@ private:
void handleNetworkEOF(HttpResponse* response, yasio::io_channel* channel, int internalErrorCode); void handleNetworkEOF(HttpResponse* response, yasio::io_channel* channel, int internalErrorCode);
void dispatchResponseCallbacks(); void tickInput();
void finishResponse(HttpResponse* response); void finishResponse(HttpResponse* response);

View File

@ -65,6 +65,8 @@ class AX_DLL HttpRequest : public Ref
friend class HttpClient; friend class HttpClient;
public: public:
static const int MAX_REDIRECT_COUNT = 3;
/** /**
* The HttpRequest type enum used in the HttpRequest::setRequestType. * The HttpRequest type enum used in the HttpRequest::setRequestType.
*/ */

View File

@ -150,51 +150,77 @@ private:
} }
} }
/** bool tryRedirect()
* Set the response data by the string pointer and the defined size.
* @param value a string pointer that point to response data buffer.
* @param n the defined size that the response data buffer would be copied.
*/
bool prepareForProcess(std::string_view url)
{ {
/* Resets response status */ if ((_redirectCount < HttpRequest::MAX_REDIRECT_COUNT))
_finished = false; {
_responseData.clear(); auto iter = _responseHeaders.find("location");
_currentHeader.clear(); if (iter != _responseHeaders.end())
_responseCode = -1; {
_internalCode = 0; auto redirectUrl = iter->second;
if (_responseCode == 302)
getHttpRequest()->setRequestType(HttpRequest::Type::GET);
AXLOG("Process url redirect (%d): %s", _responseCode, redirectUrl.c_str());
return setLocation(redirectUrl, true);
}
}
return false;
}
Uri uri = Uri::parse(url); /**
if (!uri.isValid()) * Set new request location with url
return false; * @param url the actually url to request
* @param redirect wither redirect
*/
bool setLocation(std::string_view url, bool redirect)
{
if (redirect)
{
++_redirectCount;
_requestUri.invalid();
}
_requestUri = std::move(uri); if (!_requestUri.isValid())
{
Uri uri = Uri::parse(url);
if (!uri.isValid())
return false;
_requestUri = std::move(uri);
/* Initialize user callbacks and settings */ /* Resets response status */
llhttp_settings_init(&_contextSettings); _responseHeaders.clear();
_finished = false;
_responseData.clear();
_currentHeader.clear();
_responseCode = -1;
_internalCode = 0;
/* Initialize the parser in HTTP_BOTH mode, meaning that it will select between /* Initialize user callbacks and settings */
* HTTP_REQUEST and HTTP_RESPONSE parsing automatically while reading the first llhttp_settings_init(&_contextSettings);
* input.
*/
llhttp_init(&_context, HTTP_RESPONSE, &_contextSettings);
_context.data = this; /* Initialize the parser in HTTP_BOTH mode, meaning that it will select between
* HTTP_REQUEST and HTTP_RESPONSE parsing automatically while reading the first
* input.
*/
llhttp_init(&_context, HTTP_RESPONSE, &_contextSettings);
/* Set user callbacks */ _context.data = this;
_contextSettings.on_header_field = on_header_field;
_contextSettings.on_header_field_complete = on_header_field_complete; /* Set user callbacks */
_contextSettings.on_header_value = on_header_value; _contextSettings.on_header_field = on_header_field;
_contextSettings.on_header_value_complete = on_header_value_complete; _contextSettings.on_header_field_complete = on_header_field_complete;
_contextSettings.on_body = on_body; _contextSettings.on_header_value = on_header_value;
_contextSettings.on_message_complete = on_complete; _contextSettings.on_header_value_complete = on_header_value_complete;
_contextSettings.on_body = on_body;
_contextSettings.on_message_complete = on_complete;
}
return true; return true;
} }
const Uri& getRequestUri() const { return _requestUri; } bool validateUri() const { return _requestUri.isValid(); }
int increaseRedirectCount() { return ++_redirectCount; } const Uri& getRequestUri() const { return _requestUri; }
static int on_header_field(llhttp_t* context, const char* at, size_t length) static int on_header_field(llhttp_t* context, const char* at, size_t length)
{ {

View File

@ -84,6 +84,8 @@ public:
/** Checks whether it's a SSL connection */ /** Checks whether it's a SSL connection */
bool isSecure() const { return _isSecure; } bool isSecure() const { return _isSecure; }
void invalid() { _isValid = false; }
/** Gets the scheme name for this URI. */ /** Gets the scheme name for this URI. */
std::string_view getScheme() const { return _scheme; } std::string_view getScheme() const { return _scheme; }

View File

@ -104,24 +104,7 @@ HttpClientTest::~HttpClientTest()
void HttpClientTest::onMenuGetTestClicked(ax::Ref* sender) void HttpClientTest::onMenuGetTestClicked(ax::Ref* sender)
{ {
// test 1(sync request test) // test 1
{
HttpRequest* request = new HttpRequest();
request->setUrl("https://tool.chinaz.com");
request->setRequestType(HttpRequest::Type::GET);
request->setHeaders(std::vector<std::string>{CHROME_UA});
// request->setResponseCallback(AX_CALLBACK_2(HttpClientTest::onHttpRequestCompleted, this));
request->setTag("GET test1");
HttpResponse* response = HttpClient::getInstance()->sendSync(request);
if (response)
{
onHttpRequestCompleted(HttpClient::getInstance(), response);
response->release();
}
request->release();
}
// test 2
{ {
HttpRequest* request = new HttpRequest(); HttpRequest* request = new HttpRequest();
request->setUrl("https://just-make-this-request-failed.com"); request->setUrl("https://just-make-this-request-failed.com");
@ -133,7 +116,7 @@ void HttpClientTest::onMenuGetTestClicked(ax::Ref* sender)
request->release(); request->release();
} }
// test 3 // test 2
{ {
HttpRequest* request = new HttpRequest(); HttpRequest* request = new HttpRequest();
// required fields // required fields
@ -147,7 +130,7 @@ void HttpClientTest::onMenuGetTestClicked(ax::Ref* sender)
request->release(); request->release();
} }
// test 4 // test 3
{ {
HttpRequest* request = new HttpRequest(); HttpRequest* request = new HttpRequest();
request->setUrl("https://httpbin.org/get"); request->setUrl("https://httpbin.org/get");
@ -159,7 +142,7 @@ void HttpClientTest::onMenuGetTestClicked(ax::Ref* sender)
request->release(); request->release();
} }
// test 5 // test 4
{ {
HttpRequest* request = new HttpRequest(); HttpRequest* request = new HttpRequest();
request->setUrl("https://github.com/yasio/yasio"); request->setUrl("https://github.com/yasio/yasio");