From f9e8ae0f09f84d9ef84646d2f934af2b39193a8e Mon Sep 17 00:00:00 2001 From: James Chen Date: Thu, 7 Jan 2016 23:15:11 +0800 Subject: [PATCH 01/14] WebSocket refactoring, bug fixes, improvements, and passes most Autobahn Test (The most standard WebSocket Protocol Test). --- cocos/network/WebSocket.cpp | 899 ++++++++++-------- cocos/network/WebSocket.h | 39 +- cocos/platform/CCPlatformMacros.h | 2 +- .../manual/network/jsb_websocket.cpp | 105 +- 4 files changed, 583 insertions(+), 462 deletions(-) diff --git a/cocos/network/WebSocket.cpp b/cocos/network/WebSocket.cpp index ace2f76523..4d788929af 100644 --- a/cocos/network/WebSocket.cpp +++ b/cocos/network/WebSocket.cpp @@ -1,19 +1,19 @@ /**************************************************************************** Copyright (c) 2010-2012 cocos2d-x.org Copyright (c) 2013-2014 Chukong Technologies Inc. - + http://www.cocos2d-x.org - + Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: - + The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. - + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE @@ -40,7 +40,56 @@ #include "libwebsockets.h" -#define WS_WRITE_BUFFER_SIZE 2048 +#define WS_RX_BUFFER_SIZE (65536) +#define WS_RESERVE_RECEIVE_BUFFER_SIZE (4096) + +#define LOG_TAG "WebSocket.cpp" + +// Since CCLOG isn't thread safe, we uses LOGD for multi-thread logging. +#if COCOS2D_DEBUG > 0 + #ifdef ANDROID + #define LOGD(...) __android_log_print(ANDROID_LOG_DEBUG, LOG_TAG,__VA_ARGS__) + #else + #define LOGD(...) printf(__VA_ARGS__) + #endif +#else + #define LOGD(...) +#endif + +static void printWebSocketLog(int level, const char *line) +{ +#if COCOS2D_DEBUG > 0 + static const char * const log_level_names[] = { + "ERR", + "WARN", + "NOTICE", + "INFO", + "DEBUG", + "PARSER", + "HEADER", + "EXTENSION", + "CLIENT", + "LATENCY", + }; + + char buf[30] = {0}; + int n; + + for (n = 0; n < LLL_COUNT; n++) { + if (level != (1 << n)) + continue; + sprintf(buf, "%s: ", log_level_names[n]); + break; + } + +#ifdef ANDROID + __android_log_print(ANDROID_LOG_DEBUG, "libwebsockets", "%s%s", buf, line); +#else + printf("%s%s\n", buf, line); +#endif + +#endif // #if COCOS2D_DEBUG > 0 +} NS_CC_BEGIN @@ -49,45 +98,44 @@ namespace network { class WsMessage { public: - WsMessage() : what(0), obj(nullptr){} + WsMessage() : id(++__id), what(0), obj(nullptr){} + unsigned int id; unsigned int what; // message type void* obj; + +private: + static unsigned int __id; }; +unsigned int WsMessage::__id = 0; + /** * @brief Websocket thread helper, it's used for sending message between UI thread and websocket thread. */ -class WsThreadHelper : public Ref +class WsThreadHelper { public: WsThreadHelper(); ~WsThreadHelper(); - + // Creates a new thread bool createThread(const WebSocket& ws); // Quits sub-thread (websocket thread). void quitSubThread(); - - // Schedule callback function - virtual void update(float dt); - + // Sends message to UI thread. It's needed to be invoked in sub-thread. - void sendMessageToUIThread(WsMessage *msg); - + void sendMessageToUIThread(const std::function& cb); + // Sends message to sub-thread(websocket thread). It's needs to be invoked in UI thread. void sendMessageToSubThread(WsMessage *msg); - + // Waits the sub-thread (websocket thread) to exit, void joinSubThread(); - - + protected: void wsThreadEntryFunc(); - private: - std::list* _UIWsMessageQueue; std::list* _subThreadWsMessageQueue; - std::mutex _UIWsMessageQueueMutex; std::mutex _subThreadWsMessageQueueMutex; std::thread* _subThreadInstance; WebSocket* _ws; @@ -98,17 +146,19 @@ private: // Wrapper for converting websocket callback from static function to member function of WebSocket class. class WebSocketCallbackWrapper { public: - - static int onSocketCallback(struct libwebsocket_context *ctx, - struct libwebsocket *wsi, - enum libwebsocket_callback_reasons reason, - void *user, void *in, size_t len) + + static int onSocketCallback(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) { // Gets the user data from context. We know that it's a 'WebSocket' instance. - WebSocket* wsInstance = (WebSocket*)libwebsocket_context_user(ctx); + if (wsi == nullptr) { + return 0; + } + + lws_context* context = lws_get_context(wsi); + WebSocket* wsInstance = (WebSocket*)lws_context_user(context); if (wsInstance) { - return wsInstance->onSocketCallback(ctx, wsi, reason, user, in, len); + return wsInstance->onSocketCallback(wsi, reason, user, in, len); } return 0; } @@ -120,27 +170,22 @@ WsThreadHelper::WsThreadHelper() , _ws(nullptr) , _needQuit(false) { - _UIWsMessageQueue = new (std::nothrow) std::list(); - _subThreadWsMessageQueue = new (std::nothrow) std::list(); - - Director::getInstance()->getScheduler()->scheduleUpdate(this, 0, false); + _subThreadWsMessageQueue = new std::list(); } WsThreadHelper::~WsThreadHelper() { - Director::getInstance()->getScheduler()->unscheduleAllForTarget(this); joinSubThread(); CC_SAFE_DELETE(_subThreadInstance); - delete _UIWsMessageQueue; delete _subThreadWsMessageQueue; } bool WsThreadHelper::createThread(const WebSocket& ws) { _ws = const_cast(&ws); - + // Creates websocket thread - _subThreadInstance = new (std::nothrow) std::thread(&WsThreadHelper::wsThreadEntryFunc, this); + _subThreadInstance = new std::thread(&WsThreadHelper::wsThreadEntryFunc, this); return true; } @@ -152,21 +197,20 @@ void WsThreadHelper::quitSubThread() void WsThreadHelper::wsThreadEntryFunc() { _ws->onSubThreadStarted(); - + while (!_needQuit) { - if (_ws->onSubThreadLoop()) - { - break; - } + _ws->onSubThreadLoop(); } - + + _ws->onSubThreadEnded(); + + LOGD("Websocket thread exit!\n"); } -void WsThreadHelper::sendMessageToUIThread(WsMessage *msg) +void WsThreadHelper::sendMessageToUIThread(const std::function& cb) { - std::lock_guard lk(_UIWsMessageQueueMutex); - _UIWsMessageQueue->push_back(msg); + Director::getInstance()->getScheduler()->performFunctionInCocosThread(cb); } void WsThreadHelper::sendMessageToSubThread(WsMessage *msg) @@ -183,57 +227,73 @@ void WsThreadHelper::joinSubThread() } } -void WsThreadHelper::update(float dt) +// Define a WebSocket frame +class WebSocketFrame { - /* Avoid locking if, in most cases, the queue is empty. This could be a little faster. - size() is not thread-safe, it might return a strange value, but it should be OK in our scenario. - */ - if (0 == _UIWsMessageQueue->size()) - return; - - // Returns quickly if no message - _UIWsMessageQueueMutex.lock(); - - if (0 == _UIWsMessageQueue->size()) +public: + WebSocketFrame() + : _payload(nullptr) + , _payloadLength(0) + , _frameLength(0) { - _UIWsMessageQueueMutex.unlock(); - return; - } - - // Gets message - // Process all messages in the queue, in case it's piling up faster than being processed - std::list messages; - while (!_UIWsMessageQueue->empty()) { - messages.push_back(_UIWsMessageQueue->front()); - _UIWsMessageQueue->pop_front(); } - _UIWsMessageQueueMutex.unlock(); - - for (auto msg : messages) { - if (_ws) + bool init(unsigned char* buf, ssize_t len) + { + if (buf == nullptr && len > 0) + return false; + + if (!_data.empty()) { - _ws->onUIThreadReceiveMessage(msg); + LOGD("WebSocketFrame was initialized, should not init it again!\n"); + return false; } - CC_SAFE_DELETE(msg); + + _data.reserve(LWS_SEND_BUFFER_PRE_PADDING + len + LWS_SEND_BUFFER_POST_PADDING); + _data.resize(LWS_SEND_BUFFER_PRE_PADDING, 0x00); + if (len > 0) + { + _data.insert(_data.end(), buf, buf + len); + } + + if (LWS_SEND_BUFFER_POST_PADDING > 0) + { + _data.insert(_data.end(), LWS_SEND_BUFFER_POST_PADDING, 0x00); + } + + _payload = _data.data() + LWS_SEND_BUFFER_PRE_PADDING; + _payloadLength = len; + _frameLength = len; + return true; } -} + + void update(ssize_t issued) + { + _payloadLength -= issued; + _payload += issued; + } + + unsigned char* getPayload() const { return _payload; } + ssize_t getPayloadLength() const { return _payloadLength; } + ssize_t getFrameLength() const { return _frameLength; } +private: + unsigned char* _payload; + ssize_t _payloadLength; + + ssize_t _frameLength; + std::vector _data; +}; +// + enum WS_MSG { WS_MSG_TO_SUBTRHEAD_SENDING_STRING = 0, WS_MSG_TO_SUBTRHEAD_SENDING_BINARY, - WS_MSG_TO_UITHREAD_OPEN, - WS_MSG_TO_UITHREAD_MESSAGE, - WS_MSG_TO_UITHREAD_ERROR, - WS_MSG_TO_UITHREAD_CLOSE }; WebSocket::WebSocket() : _readyState(State::CONNECTING) , _port(80) -, _pendingFrameDataLen(0) -, _currentDataLen(0) -, _currentData(nullptr) , _wsHelper(nullptr) , _wsInstance(nullptr) , _wsContext(nullptr) @@ -241,21 +301,22 @@ WebSocket::WebSocket() , _SSLConnection(0) , _wsProtocols(nullptr) { + // reserve data buffer to avoid allocate memory frequently + _receivedData.reserve(WS_RESERVE_RECEIVE_BUFFER_SIZE); } WebSocket::~WebSocket() { - close(); - CC_SAFE_RELEASE_NULL(_wsHelper); - - if(_wsProtocols) + CC_SAFE_DELETE(_wsHelper); + + if (_wsProtocols != nullptr) { for (int i = 0; _wsProtocols[i].callback != nullptr; ++i) { CC_SAFE_DELETE_ARRAY(_wsProtocols[i].name); } } - CC_SAFE_DELETE_ARRAY(_wsProtocols); + CC_SAFE_DELETE_ARRAY(_wsProtocols); } bool WebSocket::init(const Delegate& delegate, @@ -267,40 +328,40 @@ bool WebSocket::init(const Delegate& delegate, std::string host = url; size_t pos = 0; int port = 80; - + _delegate = const_cast(&delegate); - + //ws:// pos = host.find("ws://"); if (pos == 0) host.erase(0,5); - + pos = host.find("wss://"); if (pos == 0) { host.erase(0,6); useSSL = true; } - + pos = host.find(":"); if (pos != std::string::npos) port = atoi(host.substr(pos+1, host.size()).c_str()); - + pos = host.find("/", 0); std::string path = "/"; if (pos != std::string::npos) path += host.substr(pos + 1, host.size()); - + pos = host.find(":"); if(pos != std::string::npos){ host.erase(pos, host.size()); }else if((pos = host.find("/")) != std::string::npos) { - host.erase(pos, host.size()); + host.erase(pos, host.size()); } - + _host = host; _port = port; _path = path; _SSLConnection = useSSL ? 1 : 0; - - CCLOG("[WebSocket::init] _host: %s, _port: %d, _path: %s", _host.c_str(), _port, _path.c_str()); + + LOGD("[WebSocket::init] _host: %s, _port: %d, _path: %s", _host.c_str(), _port, _path.c_str()); size_t protocolCount = 0; if (protocols && protocols->size() > 0) @@ -311,33 +372,35 @@ bool WebSocket::init(const Delegate& delegate, { protocolCount = 1; } - - _wsProtocols = new (std::nothrow) libwebsocket_protocols[protocolCount+1]; - memset(_wsProtocols, 0, sizeof(libwebsocket_protocols)*(protocolCount+1)); + + _wsProtocols = new lws_protocols[protocolCount+1]; + memset(_wsProtocols, 0, sizeof(lws_protocols)*(protocolCount+1)); if (protocols && protocols->size() > 0) { int i = 0; for (std::vector::const_iterator iter = protocols->begin(); iter != protocols->end(); ++iter, ++i) { - char* name = new (std::nothrow) char[(*iter).length()+1]; + char* name = new char[(*iter).length()+1]; strcpy(name, (*iter).c_str()); _wsProtocols[i].name = name; _wsProtocols[i].callback = WebSocketCallbackWrapper::onSocketCallback; + _wsProtocols[i].rx_buffer_size = WS_RX_BUFFER_SIZE; } } else { - char* name = new (std::nothrow) char[20]; + char* name = new char[20]; strcpy(name, "default-protocol"); _wsProtocols[0].name = name; _wsProtocols[0].callback = WebSocketCallbackWrapper::onSocketCallback; + _wsProtocols[0].rx_buffer_size = WS_RX_BUFFER_SIZE; } - + // WebSocket thread needs to be invoked at the end of this method. _wsHelper = new (std::nothrow) WsThreadHelper(); ret = _wsHelper->createThread(*this); - + return ret; } @@ -346,52 +409,57 @@ void WebSocket::send(const std::string& message) if (_readyState == State::OPEN) { // In main thread - WsMessage* msg = new (std::nothrow) WsMessage(); - msg->what = WS_MSG_TO_SUBTRHEAD_SENDING_STRING; Data* data = new (std::nothrow) Data(); - data->bytes = new (std::nothrow) char[message.length()+1]; + data->bytes = (char*)malloc(message.length() + 1); + // Make sure the last byte is '\0' + data->bytes[message.length()] = '\0'; strcpy(data->bytes, message.c_str()); data->len = static_cast(message.length()); + + WsMessage* msg = new (std::nothrow) WsMessage(); + msg->what = WS_MSG_TO_SUBTRHEAD_SENDING_STRING; msg->obj = data; _wsHelper->sendMessageToSubThread(msg); } + else + { + LOGD("Couldn't send message since websocket wasn't opened!\n"); + } } void WebSocket::send(const unsigned char* binaryMsg, unsigned int len) { - CCASSERT(binaryMsg != nullptr && len > 0, "parameter invalid."); - if (_readyState == State::OPEN) { // In main thread + Data* data = new (std::nothrow) Data(); + if (len == 0) + { + // If data length is zero, allocate 1 byte for safe. + data->bytes = (char*)malloc(1); + data->bytes[0] = '\0'; + } + else + { + data->bytes = (char*)malloc(len); + memcpy((void*)data->bytes, (void*)binaryMsg, len); + } + data->len = len; + WsMessage* msg = new (std::nothrow) WsMessage(); msg->what = WS_MSG_TO_SUBTRHEAD_SENDING_BINARY; - Data* data = new (std::nothrow) Data(); - data->bytes = new (std::nothrow) char[len]; - memcpy((void*)data->bytes, (void*)binaryMsg, len); - data->len = len; msg->obj = data; _wsHelper->sendMessageToSubThread(msg); } + else + { + LOGD("Couldn't send message since websocket wasn't opened!\n"); + } } void WebSocket::close() { - Director::getInstance()->getScheduler()->unscheduleAllForTarget(_wsHelper); - - if (_readyState == State::CLOSING || _readyState == State::CLOSED) - { - return; - } - - CCLOG("websocket (%p) connection closed by client", this); - _readyState = State::CLOSED; - - _wsHelper->joinSubThread(); - - // onClose callback needs to be invoked at the end of this method - // since websocket instance may be deleted in 'onClose'. - _delegate->onClose(this); + _wsHelper->quitSubThread(); } WebSocket::State WebSocket::getReadyState() @@ -399,336 +467,365 @@ WebSocket::State WebSocket::getReadyState() return _readyState; } -int WebSocket::onSubThreadLoop() +void WebSocket::onSubThreadLoop() { - if (_readyState == State::CLOSED || _readyState == State::CLOSING) - { - libwebsocket_context_destroy(_wsContext); - // return 1 to exit the loop. - return 1; - } - if (_wsContext && _readyState != State::CLOSED && _readyState != State::CLOSING) { - libwebsocket_service(_wsContext, 0); - } - - // Sleep 50 ms - std::this_thread::sleep_for(std::chrono::milliseconds(50)); + _wsHelper->_subThreadWsMessageQueueMutex.lock(); + bool isEmpty = _wsHelper->_subThreadWsMessageQueue->empty(); + _wsHelper->_subThreadWsMessageQueueMutex.unlock(); + if (!isEmpty) + { + lws_callback_on_writable(_wsInstance); + } - // return 0 to continue the loop. - return 0; + lws_service(_wsContext, 50); + } + else + { + LOGD("Ready state is closing or was closed, code=%d, quit websocket thread!", _readyState); + _wsHelper->quitSubThread(); + } } void WebSocket::onSubThreadStarted() { - struct lws_context_creation_info info; - memset(&info, 0, sizeof info); - - /* - * create the websocket context. This tracks open connections and - * knows how to route any traffic and which protocol version to use, - * and if each connection is client or server side. - * - * For this client-only demo, we tell it to not listen on any port. - */ - - info.port = CONTEXT_PORT_NO_LISTEN; - info.protocols = _wsProtocols; -#ifndef LWS_NO_EXTENSIONS - info.extensions = libwebsocket_get_internal_extensions(); -#endif - info.gid = -1; - info.uid = -1; - info.user = (void*)this; - - _wsContext = libwebsocket_create_context(&info); - - if(nullptr != _wsContext) + struct lws_context_creation_info info; + memset(&info, 0, sizeof info); + /* + * create the websocket context. This tracks open connections and + * knows how to route any traffic and which protocol version to use, + * and if each connection is client or server side. + * + * For this client-only demo, we tell it to not listen on any port. + */ + + info.port = CONTEXT_PORT_NO_LISTEN; + info.protocols = _wsProtocols; + info.extensions = lws_get_internal_extensions(); + + info.gid = -1; + info.uid = -1; + info.options = 0; + info.user = this; + + int log_level = LLL_ERR | LLL_WARN | LLL_NOTICE/* | LLL_INFO | LLL_DEBUG/* | LLL_PARSER*/ | LLL_HEADER | LLL_EXT | LLL_CLIENT | LLL_LATENCY; + lws_set_log_level(log_level, printWebSocketLog); + + _wsContext = lws_create_context(&info); + + if (nullptr != _wsContext) { _readyState = State::CONNECTING; std::string name; for (int i = 0; _wsProtocols[i].callback != nullptr; ++i) { name += (_wsProtocols[i].name); - + if (_wsProtocols[i+1].callback != nullptr) name += ", "; } - _wsInstance = libwebsocket_client_connect(_wsContext, _host.c_str(), _port, _SSLConnection, - _path.c_str(), _host.c_str(), _host.c_str(), - name.c_str(), -1); - - if(nullptr == _wsInstance) { - WsMessage* msg = new (std::nothrow) WsMessage(); - msg->what = WS_MSG_TO_UITHREAD_ERROR; - _readyState = State::CLOSING; - _wsHelper->sendMessageToUIThread(msg); - } - } + char portStr[10]; + sprintf(portStr, "%d", _port); + std::string ads_port = _host + ":" + portStr; + + _wsInstance = lws_client_connect(_wsContext, _host.c_str(), _port, _SSLConnection, + _path.c_str(), ads_port.c_str(), ads_port.c_str(), + name.c_str(), -1); + + if (nullptr == _wsInstance) + { + onConnectionError(); + } + } + else + { + CCLOGERROR("Create websocket context failed!"); + } } void WebSocket::onSubThreadEnded() { - + if (_wsContext != nullptr) + { + lws_context_destroy(_wsContext); + } } -int WebSocket::onSocketCallback(struct libwebsocket_context *ctx, - struct libwebsocket *wsi, +void WebSocket::onClientWritable() +{ + std::lock_guard lk(_wsHelper->_subThreadWsMessageQueueMutex); + + if (_wsHelper->_subThreadWsMessageQueue->empty()) + { + return; + } + + std::list::iterator iter = _wsHelper->_subThreadWsMessageQueue->begin(); + + ssize_t bytesWrite = 0; + if (iter != _wsHelper->_subThreadWsMessageQueue->end()) + { + WsMessage* subThreadMsg = *iter; + Data* data = (Data*)subThreadMsg->obj; + + const size_t c_bufferSize = WS_RX_BUFFER_SIZE; + + const size_t remaining = data->len - data->issued; + const size_t n = std::min(remaining, c_bufferSize ); + + WebSocketFrame* frame = nullptr; + + if (data->ext) + { + frame = (WebSocketFrame*)data->ext; + } + else + { + frame = new WebSocketFrame(); + bool success = frame && frame->init((unsigned char*)(data->bytes + data->issued), n); + if (success) + { + data->ext = frame; + } + else + { // If frame initialization failed, delete the frame and drop the sending data + // These codes should never be called. + LOGD("WebSocketFrame initialization failed, drop the sending data, msg(%d)", (int)subThreadMsg->id); + delete frame; + CC_SAFE_FREE(data->bytes); + CC_SAFE_DELETE(data); + _wsHelper->_subThreadWsMessageQueue->erase(iter); + CC_SAFE_DELETE(subThreadMsg); + return; + } + } + + int writeProtocol; + + if (data->issued == 0) + { + if (WS_MSG_TO_SUBTRHEAD_SENDING_STRING == subThreadMsg->what) + { + writeProtocol = LWS_WRITE_TEXT; + } + else + { + writeProtocol = LWS_WRITE_BINARY; + } + + // If we have more than 1 fragment + if (data->len > c_bufferSize) + writeProtocol |= LWS_WRITE_NO_FIN; + } else { + // we are in the middle of fragments + writeProtocol = LWS_WRITE_CONTINUATION; + // and if not in the last fragment + if (remaining != n) + writeProtocol |= LWS_WRITE_NO_FIN; + } + + bytesWrite = lws_write(_wsInstance, frame->getPayload(), frame->getPayloadLength(), (lws_write_protocol)writeProtocol); + + // Handle the result of lws_write + // Buffer overrun? + if (bytesWrite < 0) + { + LOGD("ERROR: msg(%u), lws_write return: %d, but it should be %d, drop this message.\n", subThreadMsg->id, (int)bytesWrite, (int)n); + // socket error, we need to close the socket connection + CC_SAFE_FREE(data->bytes); + delete ((WebSocketFrame*)data->ext); + data->ext = nullptr; + CC_SAFE_DELETE(data); + _wsHelper->_subThreadWsMessageQueue->erase(iter); + CC_SAFE_DELETE(subThreadMsg); + + close(); + } + else if (bytesWrite < frame->getPayloadLength()) + { + frame->update(bytesWrite); + LOGD("frame wasn't sent completely, bytesWrite: %d, remain: %d\n", (int)bytesWrite, (int)frame->getPayloadLength()); + } + // Do we have another fragments to send? + else if (remaining > frame->getFrameLength() && bytesWrite == frame->getPayloadLength()) + { + // A frame was totally sent, plus data->issued to send next frame + LOGD("msg(%u) append: %d + %d = %d\n", subThreadMsg->id, (int)data->issued, (int)frame->getFrameLength(), (int)(data->issued + frame->getFrameLength())); + data->issued += frame->getFrameLength(); + delete ((WebSocketFrame*)data->ext); + data->ext = nullptr; + } + // Safely done! + else + { + LOGD("Safely done, msg(%d)!\n", subThreadMsg->id); + if (remaining == frame->getFrameLength()) + { + LOGD("msg(%u) append: %d + %d = %d\n", subThreadMsg->id, (int)data->issued, (int)frame->getFrameLength(), (int)(data->issued + frame->getFrameLength())); + LOGD("msg(%u) was totally sent!\n", subThreadMsg->id); + } + else + { + LOGD("ERROR: msg(%u), remaining(%d) < bytesWrite(%d)\n", subThreadMsg->id, (int)remaining, (int)frame->getFrameLength()); + LOGD("Drop the msg(%u)\n", subThreadMsg->id); + close(); + } + + CC_SAFE_FREE(data->bytes); + delete ((WebSocketFrame*)data->ext); + data->ext = nullptr; + CC_SAFE_DELETE(data); + _wsHelper->_subThreadWsMessageQueue->erase(iter); + CC_SAFE_DELETE(subThreadMsg); + + LOGD("-----------------------------------------------------------\n"); + } + } +} + +void WebSocket::onClientReceivedData(void* in, ssize_t len) +{ + // In websocket thread + static int packageIndex = 0; + packageIndex++; + if (in != nullptr && len > 0) + { + LOGD("Receiving data:index:%d, len=%d\n", packageIndex, (int)len); + + unsigned char* inData = (unsigned char*)in; + _receivedData.insert(_receivedData.end(), inData, inData + len); + } + else + { + LOGD("Emtpy message received, index=%d!\n", packageIndex); + } + + // If no more data pending, send it to the client thread + size_t remainingSize = lws_remaining_packet_payload(_wsInstance); + int isFinalFragment = lws_is_final_fragment(_wsInstance); +// LOGD("remainingSize: %d, isFinalFragment: %d\n", (int)remainingSize, isFinalFragment); + + if (remainingSize == 0 && isFinalFragment) + { + std::vector* frameData = new std::vector(std::move(_receivedData)); + + // reset capacity of received data buffer + _receivedData.reserve(WS_RESERVE_RECEIVE_BUFFER_SIZE); + + ssize_t frameSize = frameData->size(); + + bool isBinary = lws_frame_is_binary(_wsInstance); + + if (!isBinary) + { + frameData->push_back('\0'); + } + + _wsHelper->sendMessageToUIThread([this, frameData, frameSize, isBinary](){ + // In UI thread + LOGD("Notify data len %d to Cocos thread.\n", (int)frameSize); + + Data data; + data.isBinary = isBinary; + data.bytes = (char*)frameData->data(); + data.len = frameSize; + + _delegate->onMessage(this, data); + + delete frameData; + }); + } +} + +void WebSocket::onConnectionOpened() +{ + /* + * start the ball rolling, + * LWS_CALLBACK_CLIENT_WRITEABLE will come next service + */ + lws_callback_on_writable(_wsInstance); + + _readyState = State::OPEN; + + _wsHelper->sendMessageToUIThread([this](){ + _delegate->onOpen(this); + }); +} + +void WebSocket::onConnectionError() +{ + _readyState = State::CLOSING; + + _wsHelper->sendMessageToUIThread([this](){ + _delegate->onError(this, ErrorCode::CONNECTION_FAILURE); + }); +} + +void WebSocket::onConnectionClosed() +{ + LOGD("%s", "connection closing..\n"); + if (_readyState == State::CLOSED) + { + LOGD("Websocket %p was closed, no need to close it again!", this); + return; + } + + _readyState = State::CLOSED; + + _wsHelper->quitSubThread(); + _wsHelper->sendMessageToUIThread([this](){ + //Waiting for the subThread safety exit + _wsHelper->joinSubThread(); + _delegate->onClose(this); + }); +} + +int WebSocket::onSocketCallback(struct lws *wsi, int reason, void *user, void *in, ssize_t len) { - //CCLOG("socket callback for %d reason", reason); - CCASSERT(_wsContext == nullptr || ctx == _wsContext, "Invalid context."); - CCASSERT(_wsInstance == nullptr || wsi == nullptr || wsi == _wsInstance, "Invaild websocket instance."); + //LOGD("socket callback for %d reason", reason); - switch (reason) + switch (reason) { case LWS_CALLBACK_DEL_POLL_FD: case LWS_CALLBACK_PROTOCOL_DESTROY: case LWS_CALLBACK_CLIENT_CONNECTION_ERROR: { - WsMessage* msg = nullptr; if (reason == LWS_CALLBACK_CLIENT_CONNECTION_ERROR || (reason == LWS_CALLBACK_PROTOCOL_DESTROY && _readyState == State::CONNECTING) || (reason == LWS_CALLBACK_DEL_POLL_FD && _readyState == State::CONNECTING) ) { - msg = new (std::nothrow) WsMessage(); - msg->what = WS_MSG_TO_UITHREAD_ERROR; - _readyState = State::CLOSING; + onConnectionError(); } else if (reason == LWS_CALLBACK_PROTOCOL_DESTROY && _readyState == State::CLOSING) { - msg = new (std::nothrow) WsMessage(); - msg->what = WS_MSG_TO_UITHREAD_CLOSE; - } - - if (msg) - { - _wsHelper->sendMessageToUIThread(msg); + onConnectionClosed(); } } break; case LWS_CALLBACK_CLIENT_ESTABLISHED: - { - WsMessage* msg = new (std::nothrow) WsMessage(); - msg->what = WS_MSG_TO_UITHREAD_OPEN; - _readyState = State::OPEN; - - /* - * start the ball rolling, - * LWS_CALLBACK_CLIENT_WRITEABLE will come next service - */ - libwebsocket_callback_on_writable(ctx, wsi); - _wsHelper->sendMessageToUIThread(msg); - } + onConnectionOpened(); break; - + case LWS_CALLBACK_CLIENT_WRITEABLE: - { - - std::lock_guard lk(_wsHelper->_subThreadWsMessageQueueMutex); - - auto iter = _wsHelper->_subThreadWsMessageQueue->begin(); - - //To avoid automatically disconnected on Android,send only one WsMessage at a time. - //for (; iter != _wsHelper->_subThreadWsMessageQueue->end();) - if (iter != _wsHelper->_subThreadWsMessageQueue->end()) - { - WsMessage* subThreadMsg = *iter; - - if ( WS_MSG_TO_SUBTRHEAD_SENDING_STRING == subThreadMsg->what - || WS_MSG_TO_SUBTRHEAD_SENDING_BINARY == subThreadMsg->what) - { - Data* data = (Data*)subThreadMsg->obj; - - const size_t c_bufferSize = WS_WRITE_BUFFER_SIZE; - - size_t remaining = data->len - data->issued; - size_t n = std::min(remaining, c_bufferSize ); - - unsigned char* buf = new (std::nothrow) unsigned char[LWS_SEND_BUFFER_PRE_PADDING + n + LWS_SEND_BUFFER_POST_PADDING]; - - memcpy((char*)&buf[LWS_SEND_BUFFER_PRE_PADDING], data->bytes + data->issued, n); - - int writeProtocol; - - if (data->issued == 0) { - if (WS_MSG_TO_SUBTRHEAD_SENDING_STRING == subThreadMsg->what) - { - writeProtocol = LWS_WRITE_TEXT; - } - else - { - writeProtocol = LWS_WRITE_BINARY; - } - - // If we have more than 1 fragment - if (data->len > c_bufferSize) - writeProtocol |= LWS_WRITE_NO_FIN; - } else { - // we are in the middle of fragments - writeProtocol = LWS_WRITE_CONTINUATION; - // and if not in the last fragment - if (remaining != n) - writeProtocol |= LWS_WRITE_NO_FIN; - } - - auto bytesWrite = libwebsocket_write(wsi, &buf[LWS_SEND_BUFFER_PRE_PADDING], n, (libwebsocket_write_protocol)writeProtocol); - - // Buffer overrun? - if (bytesWrite < 0) - { - //break; - } - // Do we have another fragments to send? - else if (remaining != n) - { - data->issued += n; - //break; - } - // Safely done! - else - { - CC_SAFE_DELETE_ARRAY(data->bytes); - CC_SAFE_DELETE(data); - CC_SAFE_DELETE_ARRAY(buf); - _wsHelper->_subThreadWsMessageQueue->erase(iter++); - CC_SAFE_DELETE(subThreadMsg); - } - } - } - - /* get notified as soon as we can write again */ - - libwebsocket_callback_on_writable(ctx, wsi); - } + onClientWritable(); break; - + case LWS_CALLBACK_CLOSED: - { - //fixme: the log is not thread safe -// CCLOG("%s", "connection closing.."); - - _wsHelper->quitSubThread(); - - if (_readyState != State::CLOSED) - { - WsMessage* msg = new (std::nothrow) WsMessage(); - _readyState = State::CLOSED; - msg->what = WS_MSG_TO_UITHREAD_CLOSE; - _wsHelper->sendMessageToUIThread(msg); - } - } + onConnectionClosed(); break; - + case LWS_CALLBACK_CLIENT_RECEIVE: - { - if (in && len > 0) - { - // Accumulate the data (increasing the buffer as we go) - if (_currentDataLen == 0) - { - _currentData = new (std::nothrow) char[len]; - memcpy (_currentData, in, len); - _currentDataLen = len; - } - else - { - char *new_data = new (std::nothrow) char [_currentDataLen + len]; - memcpy (new_data, _currentData, _currentDataLen); - memcpy (new_data + _currentDataLen, in, len); - CC_SAFE_DELETE_ARRAY(_currentData); - _currentData = new_data; - _currentDataLen = _currentDataLen + len; - } - - _pendingFrameDataLen = libwebsockets_remaining_packet_payload (wsi); - - if (_pendingFrameDataLen > 0) - { - //CCLOG("%ld bytes of pending data to receive, consider increasing the libwebsocket rx_buffer_size value.", _pendingFrameDataLen); - } - - // If no more data pending, send it to the client thread - if (_pendingFrameDataLen == 0) - { - WsMessage* msg = new (std::nothrow) WsMessage(); - msg->what = WS_MSG_TO_UITHREAD_MESSAGE; - - char* bytes = nullptr; - Data* data = new (std::nothrow) Data(); - - if (lws_frame_is_binary(wsi)) - { - - bytes = new (std::nothrow) char[_currentDataLen]; - data->isBinary = true; - } - else - { - bytes = new (std::nothrow) char[_currentDataLen+1]; - bytes[_currentDataLen] = '\0'; - data->isBinary = false; - } - - memcpy(bytes, _currentData, _currentDataLen); - - data->bytes = bytes; - data->len = _currentDataLen; - msg->obj = (void*)data; - - CC_SAFE_DELETE_ARRAY(_currentData); - _currentData = nullptr; - _currentDataLen = 0; - - _wsHelper->sendMessageToUIThread(msg); - } - } - } - break; - default: - break; - - } - - return 0; -} - -void WebSocket::onUIThreadReceiveMessage(WsMessage* msg) -{ - switch (msg->what) { - case WS_MSG_TO_UITHREAD_OPEN: - { - _delegate->onOpen(this); - } - break; - case WS_MSG_TO_UITHREAD_MESSAGE: - { - Data* data = (Data*)msg->obj; - _delegate->onMessage(this, *data); - CC_SAFE_DELETE_ARRAY(data->bytes); - CC_SAFE_DELETE(data); - } - break; - case WS_MSG_TO_UITHREAD_CLOSE: - { - //Waiting for the subThread safety exit - _wsHelper->joinSubThread(); - _delegate->onClose(this); - } - break; - case WS_MSG_TO_UITHREAD_ERROR: - { - // FIXME: The exact error needs to be checked. - WebSocket::ErrorCode err = ErrorCode::CONNECTION_FAILURE; - _delegate->onError(this, err); - } + onClientReceivedData(in, len); break; default: break; } + + return 0; } } diff --git a/cocos/network/WebSocket.h b/cocos/network/WebSocket.h index c1f26eaeb6..25775bd31f 100644 --- a/cocos/network/WebSocket.h +++ b/cocos/network/WebSocket.h @@ -36,9 +36,9 @@ #include "platform/CCPlatformMacros.h" #include "platform/CCStdC.h" -struct libwebsocket; -struct libwebsocket_context; -struct libwebsocket_protocols; +struct lws; +struct lws_context; +struct lws_protocols; /** * @addtogroup network @@ -77,10 +77,11 @@ public: */ struct Data { - Data():bytes(nullptr), len(0), issued(0), isBinary(false){} + Data():bytes(nullptr), len(0), issued(0), isBinary(false), ext(nullptr){} char* bytes; ssize_t len, issued; bool isBinary; + void* ext; }; /** @@ -190,17 +191,18 @@ public: State getReadyState(); private: - virtual void onSubThreadStarted(); - virtual int onSubThreadLoop(); - virtual void onSubThreadEnded(); - virtual void onUIThreadReceiveMessage(WsMessage* msg); + void onSubThreadStarted(); + void onSubThreadLoop(); + void onSubThreadEnded(); + // The following callback functions are invoked in websocket thread + int onSocketCallback(struct lws *wsi, int reason, void *user, void *in, ssize_t len); - friend class WebSocketCallbackWrapper; - int onSocketCallback(struct libwebsocket_context *ctx, - struct libwebsocket *wsi, - int reason, - void *user, void *in, ssize_t len); + void onClientWritable(); + void onClientReceivedData(void* in, ssize_t len); + void onConnectionOpened(); + void onConnectionError(); + void onConnectionClosed(); private: State _readyState; @@ -208,18 +210,17 @@ private: unsigned int _port; std::string _path; - ssize_t _pendingFrameDataLen; - ssize_t _currentDataLen; - char *_currentData; + std::vector _receivedData; friend class WsThreadHelper; + friend class WebSocketCallbackWrapper; WsThreadHelper* _wsHelper; - struct libwebsocket* _wsInstance; - struct libwebsocket_context* _wsContext; + struct lws* _wsInstance; + struct lws_context* _wsContext; Delegate* _delegate; int _SSLConnection; - struct libwebsocket_protocols* _wsProtocols; + struct lws_protocols* _wsProtocols; }; } diff --git a/cocos/platform/CCPlatformMacros.h b/cocos/platform/CCPlatformMacros.h index d758e74c13..c0f71214b7 100644 --- a/cocos/platform/CCPlatformMacros.h +++ b/cocos/platform/CCPlatformMacros.h @@ -220,7 +220,7 @@ public: virtual void set##funName(varType var) \ #define CC_BREAK_IF(cond) if(cond) break #define __CCLOGWITHFUNCTION(s, ...) \ - log("%s : %s",__FUNCTION__, StringUtils::format(s, ##__VA_ARGS__).c_str()) + cocos2d::log("%s : %s",__FUNCTION__, cocos2d::StringUtils::format(s, ##__VA_ARGS__).c_str()) /// @name Cocos2d debug /// @{ diff --git a/cocos/scripting/js-bindings/manual/network/jsb_websocket.cpp b/cocos/scripting/js-bindings/manual/network/jsb_websocket.cpp index 69c8eb1242..8b400dcc62 100644 --- a/cocos/scripting/js-bindings/manual/network/jsb_websocket.cpp +++ b/cocos/scripting/js-bindings/manual/network/jsb_websocket.cpp @@ -107,8 +107,11 @@ public: if (data.isBinary) {// data is binary JSObject* buffer = JS_NewArrayBuffer(cx, static_cast(data.len)); - uint8_t* bufdata = JS_GetArrayBufferData(buffer); - memcpy((void*)bufdata, (void*)data.bytes, data.len); + if (data.len > 0) + { + uint8_t* bufdata = JS_GetArrayBufferData(buffer); + memcpy((void*)bufdata, (void*)data.bytes, data.len); + } JS::RootedValue dataVal(cx); dataVal = OBJECT_TO_JSVAL(buffer); JS_SetProperty(cx, jsobj, "data", dataVal); @@ -116,7 +119,20 @@ public: else {// data is string JS::RootedValue dataVal(cx); - dataVal = c_string_to_jsval(cx, data.bytes); + if (strlen(data.bytes) == 0 && data.len > 0) + {// String with 0x00 prefix + dataVal = STRING_TO_JSVAL(JS_NewStringCopyN(cx, data.bytes, data.len)); + } + else + {// Normal string + dataVal = c_string_to_jsval(cx, data.bytes); + } + + if (dataVal.isNullOrUndefined()) + { + ws->close(); + return; + } JS_SetProperty(cx, jsobj, "data", dataVal); } @@ -142,7 +158,10 @@ public: auto copy = &p->obj; JS::RemoveObjectRoot(cx, copy); jsb_remove_proxy(p); + // Delete WebSocket instance CC_SAFE_DELETE(ws); + // Delete self at last while websocket was closed. + delete this; } virtual void onError(WebSocket* ws, const WebSocket::ErrorCode& error) @@ -180,57 +199,61 @@ void js_cocos2dx_WebSocket_finalize(JSFreeOp *fop, JSObject *obj) { bool js_cocos2dx_extension_WebSocket_send(JSContext *cx, uint32_t argc, jsval *vp) { - JS::CallArgs args = JS::CallArgsFromVp(argc, vp); - JS::RootedObject obj(cx, args.thisv().toObjectOrNull()); + JS::CallArgs argv = JS::CallArgsFromVp(argc, vp); + JS::RootedObject obj(cx, argv.thisv().toObjectOrNull()); js_proxy_t *proxy = jsb_get_js_proxy(obj); WebSocket* cobj = (WebSocket *)(proxy ? proxy->ptr : NULL); JSB_PRECONDITION2( cobj, cx, false, "Invalid Native Object"); - if(argc == 1){ - do + if(argc == 1) + { + if (argv[0].isString()) { - if (args.get(0).isString()) + ssize_t len = JS_GetStringLength(argv[0].toString()); + std::string data; + jsval_to_std_string(cx, argv[0], &data); + + if (data.empty() && len > 0) { - std::string data; - jsval_to_std_string(cx, args.get(0), &data); - cobj->send(data); - break; - } - - if (args.get(0).isObject()) - { - uint8_t *bufdata = NULL; - uint32_t len = 0; - - JSObject* jsobj = args.get(0).toObjectOrNull(); - if (JS_IsArrayBufferObject(jsobj)) - { - bufdata = JS_GetArrayBufferData(jsobj); - len = JS_GetArrayBufferByteLength(jsobj); - } - else if (JS_IsArrayBufferViewObject(jsobj)) - { - bufdata = (uint8_t*)JS_GetArrayBufferViewData(jsobj); - len = JS_GetArrayBufferViewByteLength(jsobj); - } - - if (bufdata && len > 0) - { - cobj->send(bufdata, len); - break; - } + CCLOGWARN("Text message to send is empty, but its length is greater than 0!"); + //FIXME: Note that this text message contains '0x00' prefix, so its length calcuted by strlen is 0. + // we need to fix that if there is '0x00' in text message, + // since javascript language could support '0x00' inserted at the beginning or the middle of text message } + cobj->send(data); + } + else if (argv[0].isObject()) + { + uint8_t *bufdata = NULL; + uint32_t len = 0; + + JS::RootedObject jsobj(cx, argv[0].toObjectOrNull()); + if (JS_IsArrayBufferObject(jsobj)) + { + bufdata = JS_GetArrayBufferData(jsobj); + len = JS_GetArrayBufferByteLength(jsobj); + } + else if (JS_IsArrayBufferViewObject(jsobj)) + { + bufdata = (uint8_t*)JS_GetArrayBufferViewData(jsobj); + len = JS_GetArrayBufferViewByteLength(jsobj); + } + + cobj->send(bufdata, len); + } + else + { JS_ReportError(cx, "data type to be sent is unsupported."); - - } while (0); + return false; + } + + argv.rval().setUndefined(); - args.rval().setUndefined(); - return true; } JS_ReportError(cx, "wrong number of arguments: %d, was expecting %d", argc, 0); - return true; + return false; } bool js_cocos2dx_extension_WebSocket_close(JSContext *cx, uint32_t argc, jsval *vp){ From d122784ad4cb997feae8875723983da61bd49fe4 Mon Sep 17 00:00:00 2001 From: James Chen Date: Sat, 9 Jan 2016 18:07:48 +0800 Subject: [PATCH 02/14] Adds WebSocket::closeAllConnections() method. --- cocos/network/WebSocket.cpp | 58 ++++++++++++++++++++++++++++++++----- cocos/network/WebSocket.h | 7 ++++- 2 files changed, 57 insertions(+), 8 deletions(-) diff --git a/cocos/network/WebSocket.cpp b/cocos/network/WebSocket.cpp index 4d788929af..539106de73 100644 --- a/cocos/network/WebSocket.cpp +++ b/cocos/network/WebSocket.cpp @@ -196,6 +196,7 @@ void WsThreadHelper::quitSubThread() void WsThreadHelper::wsThreadEntryFunc() { + LOGD("WebSocket thread start, helper instance: %p\n", this); _ws->onSubThreadStarted(); while (!_needQuit) @@ -204,8 +205,8 @@ void WsThreadHelper::wsThreadEntryFunc() } _ws->onSubThreadEnded(); - - LOGD("Websocket thread exit!\n"); + + LOGD("WebSocket thread exit, helper instance: %p\n", this); } void WsThreadHelper::sendMessageToUIThread(const std::function& cb) @@ -291,6 +292,27 @@ enum WS_MSG { WS_MSG_TO_SUBTRHEAD_SENDING_BINARY, }; +static std::vector* __websocketInstances = nullptr; + +void WebSocket::closeAllConnections() +{ + if (__websocketInstances != nullptr) + { + ssize_t count = __websocketInstances->size(); + for (ssize_t i = count-1; i >=0 ; i--) + { + WebSocket* instance = __websocketInstances->at(i); + LOGD("Waiting WebSocket (%p) to exit!\n", instance); + instance->close(); + // Wait for websocket thread to exit + instance->_wsHelper->joinSubThread(); + } + + __websocketInstances->clear(); + __websocketInstances = nullptr; + } +} + WebSocket::WebSocket() : _readyState(State::CONNECTING) , _port(80) @@ -303,10 +325,17 @@ WebSocket::WebSocket() { // reserve data buffer to avoid allocate memory frequently _receivedData.reserve(WS_RESERVE_RECEIVE_BUFFER_SIZE); + if (__websocketInstances == nullptr) + { + __websocketInstances = new std::vector(); + } + + __websocketInstances->push_back(this); } WebSocket::~WebSocket() { + LOGD("In the destructor of WebSocket (%p)\n", this); CC_SAFE_DELETE(_wsHelper); if (_wsProtocols != nullptr) @@ -317,6 +346,19 @@ WebSocket::~WebSocket() } } CC_SAFE_DELETE_ARRAY(_wsProtocols); + + if (__websocketInstances != nullptr) + { + auto iter = std::find(__websocketInstances->begin(), __websocketInstances->end(), this); + if (iter != __websocketInstances->end()) + { + __websocketInstances->erase(iter); + } + else + { + LOGD("ERROR: WebSocket instance (%p) added to container!\n", this); + } + } } bool WebSocket::init(const Delegate& delegate, @@ -361,7 +403,7 @@ bool WebSocket::init(const Delegate& delegate, _path = path; _SSLConnection = useSSL ? 1 : 0; - LOGD("[WebSocket::init] _host: %s, _port: %d, _path: %s", _host.c_str(), _port, _path.c_str()); + LOGD("[WebSocket::init] _host: %s, _port: %d, _path: %s\n", _host.c_str(), _port, _path.c_str()); size_t protocolCount = 0; if (protocols && protocols->size() > 0) @@ -483,7 +525,7 @@ void WebSocket::onSubThreadLoop() } else { - LOGD("Ready state is closing or was closed, code=%d, quit websocket thread!", _readyState); + LOGD("Ready state is closing or was closed, code=%d, quit websocket thread!\n", _readyState); _wsHelper->quitSubThread(); } } @@ -591,7 +633,7 @@ void WebSocket::onClientWritable() else { // If frame initialization failed, delete the frame and drop the sending data // These codes should never be called. - LOGD("WebSocketFrame initialization failed, drop the sending data, msg(%d)", (int)subThreadMsg->id); + LOGD("WebSocketFrame initialization failed, drop the sending data, msg(%d)\n", (int)subThreadMsg->id); delete frame; CC_SAFE_FREE(data->bytes); CC_SAFE_DELETE(data); @@ -755,6 +797,8 @@ void WebSocket::onConnectionOpened() void WebSocket::onConnectionError() { + LOGD("WebSocket (%p) onConnectionError ...\n", this); + _readyState = State::CLOSING; _wsHelper->sendMessageToUIThread([this](){ @@ -764,10 +808,10 @@ void WebSocket::onConnectionError() void WebSocket::onConnectionClosed() { - LOGD("%s", "connection closing..\n"); + LOGD("WebSocket (%p) onConnectionClosed ...\n", this); if (_readyState == State::CLOSED) { - LOGD("Websocket %p was closed, no need to close it again!", this); + LOGD("WebSocket (%p) was closed, no need to close it again!\n", this); return; } diff --git a/cocos/network/WebSocket.h b/cocos/network/WebSocket.h index 25775bd31f..ea2e4c37f1 100644 --- a/cocos/network/WebSocket.h +++ b/cocos/network/WebSocket.h @@ -50,7 +50,6 @@ NS_CC_BEGIN namespace network { class WsThreadHelper; -class WsMessage; /** * WebSocket is wrapper of the libwebsockets-protocol, let the develop could call the websocket easily. @@ -58,6 +57,12 @@ class WsMessage; class CC_DLL WebSocket { public: + /** + * Close all connections and wait for all websocket threads to exit + * @note This method has to be invoked on Cocos Thread + */ + static void closeAllConnections(); + /** * Constructor of WebSocket. * From 763559cd1e6bb598f060f82bb0812808f6e34e9f Mon Sep 17 00:00:00 2001 From: James Chen Date: Sat, 9 Jan 2016 18:08:26 +0800 Subject: [PATCH 03/14] Invoking WebSocket::closeAllConnection while reseting Director. --- cocos/base/CCDirector.cpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/cocos/base/CCDirector.cpp b/cocos/base/CCDirector.cpp index 1ac33741dc..e11999f911 100644 --- a/cocos/base/CCDirector.cpp +++ b/cocos/base/CCDirector.cpp @@ -61,6 +61,7 @@ THE SOFTWARE. #include "base/CCConfiguration.h" #include "base/CCAsyncTaskPool.h" #include "platform/CCApplication.h" +#include "network/WebSocket.h" #if CC_ENABLE_SCRIPT_BINDING #include "CCScriptSupport.h" @@ -930,6 +931,9 @@ void Director::reset() _runningScene = nullptr; _nextScene = nullptr; + // Close all websocket connection. It has to be invoked before cleaning scheduler + network::WebSocket::closeAllConnections(); + // cleanup scheduler getScheduler()->unscheduleAll(); From 2b5fde79e04a18f45ce14fbdb0048efc75ae9f05 Mon Sep 17 00:00:00 2001 From: James Chen Date: Mon, 11 Jan 2016 09:56:27 +0800 Subject: [PATCH 04/14] Comment fix & function name fix for WsThreadHelper: sendMessageToUIThread -> sendMessageToCocosThread sendMessageToSubThread -> sendMessageToWebSocketThread joinSubThread -> joinWebSocketThread --- cocos/network/WebSocket.cpp | 52 ++++++++++++++++++------------------- 1 file changed, 26 insertions(+), 26 deletions(-) diff --git a/cocos/network/WebSocket.cpp b/cocos/network/WebSocket.cpp index 539106de73..0fca094743 100644 --- a/cocos/network/WebSocket.cpp +++ b/cocos/network/WebSocket.cpp @@ -123,14 +123,14 @@ public: // Quits sub-thread (websocket thread). void quitSubThread(); - // Sends message to UI thread. It's needed to be invoked in sub-thread. - void sendMessageToUIThread(const std::function& cb); + // Sends message to Cocos thread. It's needed to be invoked in Websocket thread. + void sendMessageToCocosThread(const std::function& cb); - // Sends message to sub-thread(websocket thread). It's needs to be invoked in UI thread. - void sendMessageToSubThread(WsMessage *msg); + // Sends message to Websocket thread. It's needs to be invoked in Cocos thread. + void sendMessageToWebSocketThread(WsMessage *msg); // Waits the sub-thread (websocket thread) to exit, - void joinSubThread(); + void joinWebSocketThread(); protected: void wsThreadEntryFunc(); @@ -170,12 +170,12 @@ WsThreadHelper::WsThreadHelper() , _ws(nullptr) , _needQuit(false) { - _subThreadWsMessageQueue = new std::list(); + _subThreadWsMessageQueue = new (std::nothrow) std::list(); } WsThreadHelper::~WsThreadHelper() { - joinSubThread(); + joinWebSocketThread(); CC_SAFE_DELETE(_subThreadInstance); delete _subThreadWsMessageQueue; } @@ -185,7 +185,7 @@ bool WsThreadHelper::createThread(const WebSocket& ws) _ws = const_cast(&ws); // Creates websocket thread - _subThreadInstance = new std::thread(&WsThreadHelper::wsThreadEntryFunc, this); + _subThreadInstance = new (std::nothrow) std::thread(&WsThreadHelper::wsThreadEntryFunc, this); return true; } @@ -209,18 +209,18 @@ void WsThreadHelper::wsThreadEntryFunc() LOGD("WebSocket thread exit, helper instance: %p\n", this); } -void WsThreadHelper::sendMessageToUIThread(const std::function& cb) +void WsThreadHelper::sendMessageToCocosThread(const std::function& cb) { Director::getInstance()->getScheduler()->performFunctionInCocosThread(cb); } -void WsThreadHelper::sendMessageToSubThread(WsMessage *msg) +void WsThreadHelper::sendMessageToWebSocketThread(WsMessage *msg) { std::lock_guard lk(_subThreadWsMessageQueueMutex); _subThreadWsMessageQueue->push_back(msg); } -void WsThreadHelper::joinSubThread() +void WsThreadHelper::joinWebSocketThread() { if (_subThreadInstance->joinable()) { @@ -305,7 +305,7 @@ void WebSocket::closeAllConnections() LOGD("Waiting WebSocket (%p) to exit!\n", instance); instance->close(); // Wait for websocket thread to exit - instance->_wsHelper->joinSubThread(); + instance->_wsHelper->joinWebSocketThread(); } __websocketInstances->clear(); @@ -327,7 +327,7 @@ WebSocket::WebSocket() _receivedData.reserve(WS_RESERVE_RECEIVE_BUFFER_SIZE); if (__websocketInstances == nullptr) { - __websocketInstances = new std::vector(); + __websocketInstances = new (std::nothrow) std::vector(); } __websocketInstances->push_back(this); @@ -356,7 +356,7 @@ WebSocket::~WebSocket() } else { - LOGD("ERROR: WebSocket instance (%p) added to container!\n", this); + LOGD("ERROR: WebSocket instance (%p) wasn't added to the container which saves websocket instances!\n", this); } } } @@ -415,7 +415,7 @@ bool WebSocket::init(const Delegate& delegate, protocolCount = 1; } - _wsProtocols = new lws_protocols[protocolCount+1]; + _wsProtocols = new (std::nothrow) lws_protocols[protocolCount+1]; memset(_wsProtocols, 0, sizeof(lws_protocols)*(protocolCount+1)); if (protocols && protocols->size() > 0) @@ -423,7 +423,7 @@ bool WebSocket::init(const Delegate& delegate, int i = 0; for (std::vector::const_iterator iter = protocols->begin(); iter != protocols->end(); ++iter, ++i) { - char* name = new char[(*iter).length()+1]; + char* name = new (std::nothrow) char[(*iter).length()+1]; strcpy(name, (*iter).c_str()); _wsProtocols[i].name = name; _wsProtocols[i].callback = WebSocketCallbackWrapper::onSocketCallback; @@ -432,7 +432,7 @@ bool WebSocket::init(const Delegate& delegate, } else { - char* name = new char[20]; + char* name = new (std::nothrow) char[20]; strcpy(name, "default-protocol"); _wsProtocols[0].name = name; _wsProtocols[0].callback = WebSocketCallbackWrapper::onSocketCallback; @@ -461,7 +461,7 @@ void WebSocket::send(const std::string& message) WsMessage* msg = new (std::nothrow) WsMessage(); msg->what = WS_MSG_TO_SUBTRHEAD_SENDING_STRING; msg->obj = data; - _wsHelper->sendMessageToSubThread(msg); + _wsHelper->sendMessageToWebSocketThread(msg); } else { @@ -491,7 +491,7 @@ void WebSocket::send(const unsigned char* binaryMsg, unsigned int len) WsMessage* msg = new (std::nothrow) WsMessage(); msg->what = WS_MSG_TO_SUBTRHEAD_SENDING_BINARY; msg->obj = data; - _wsHelper->sendMessageToSubThread(msg); + _wsHelper->sendMessageToWebSocketThread(msg); } else { @@ -624,7 +624,7 @@ void WebSocket::onClientWritable() } else { - frame = new WebSocketFrame(); + frame = new (std::nothrow) WebSocketFrame(); bool success = frame && frame->init((unsigned char*)(data->bytes + data->issued), n); if (success) { @@ -750,7 +750,7 @@ void WebSocket::onClientReceivedData(void* in, ssize_t len) if (remainingSize == 0 && isFinalFragment) { - std::vector* frameData = new std::vector(std::move(_receivedData)); + std::vector* frameData = new (std::nothrow) std::vector(std::move(_receivedData)); // reset capacity of received data buffer _receivedData.reserve(WS_RESERVE_RECEIVE_BUFFER_SIZE); @@ -764,7 +764,7 @@ void WebSocket::onClientReceivedData(void* in, ssize_t len) frameData->push_back('\0'); } - _wsHelper->sendMessageToUIThread([this, frameData, frameSize, isBinary](){ + _wsHelper->sendMessageToCocosThread([this, frameData, frameSize, isBinary](){ // In UI thread LOGD("Notify data len %d to Cocos thread.\n", (int)frameSize); @@ -790,7 +790,7 @@ void WebSocket::onConnectionOpened() _readyState = State::OPEN; - _wsHelper->sendMessageToUIThread([this](){ + _wsHelper->sendMessageToCocosThread([this](){ _delegate->onOpen(this); }); } @@ -801,7 +801,7 @@ void WebSocket::onConnectionError() _readyState = State::CLOSING; - _wsHelper->sendMessageToUIThread([this](){ + _wsHelper->sendMessageToCocosThread([this](){ _delegate->onError(this, ErrorCode::CONNECTION_FAILURE); }); } @@ -818,9 +818,9 @@ void WebSocket::onConnectionClosed() _readyState = State::CLOSED; _wsHelper->quitSubThread(); - _wsHelper->sendMessageToUIThread([this](){ + _wsHelper->sendMessageToCocosThread([this](){ //Waiting for the subThread safety exit - _wsHelper->joinSubThread(); + _wsHelper->joinWebSocketThread(); _delegate->onClose(this); }); } From 5d1e98340e854d568f40537f4ea402fbf6802738 Mon Sep 17 00:00:00 2001 From: James Chen Date: Wed, 13 Jan 2016 18:53:38 +0800 Subject: [PATCH 05/14] Updates WebSocket::onSocketCallback & WebSocketTest.js --- cocos/network/WebSocket.cpp | 36 +++---- .../NetworkTest/WebSocketTest.js | 102 ++++++++++-------- 2 files changed, 71 insertions(+), 67 deletions(-) diff --git a/cocos/network/WebSocket.cpp b/cocos/network/WebSocket.cpp index 0fca094743..9e9484af55 100644 --- a/cocos/network/WebSocket.cpp +++ b/cocos/network/WebSocket.cpp @@ -808,13 +808,13 @@ void WebSocket::onConnectionError() void WebSocket::onConnectionClosed() { - LOGD("WebSocket (%p) onConnectionClosed ...\n", this); if (_readyState == State::CLOSED) { LOGD("WebSocket (%p) was closed, no need to close it again!\n", this); return; } - + + LOGD("WebSocket (%p) onConnectionClosed ...\n", this); _readyState = State::CLOSED; _wsHelper->quitSubThread(); @@ -829,35 +829,19 @@ int WebSocket::onSocketCallback(struct lws *wsi, int reason, void *user, void *in, ssize_t len) { - //LOGD("socket callback for %d reason", reason); + //LOGD("socket callback for %d reason\n", reason); switch (reason) { - case LWS_CALLBACK_DEL_POLL_FD: - case LWS_CALLBACK_PROTOCOL_DESTROY: - case LWS_CALLBACK_CLIENT_CONNECTION_ERROR: - { - if (reason == LWS_CALLBACK_CLIENT_CONNECTION_ERROR - || (reason == LWS_CALLBACK_PROTOCOL_DESTROY && _readyState == State::CONNECTING) - || (reason == LWS_CALLBACK_DEL_POLL_FD && _readyState == State::CONNECTING) - ) - { - onConnectionError(); - } - else if (reason == LWS_CALLBACK_PROTOCOL_DESTROY && _readyState == State::CLOSING) - { - onConnectionClosed(); - } - } - break; case LWS_CALLBACK_CLIENT_ESTABLISHED: onConnectionOpened(); break; - - case LWS_CALLBACK_CLIENT_WRITEABLE: - onClientWritable(); + + case LWS_CALLBACK_CLIENT_CONNECTION_ERROR: + onConnectionError(); break; + case LWS_CALLBACK_PROTOCOL_DESTROY: case LWS_CALLBACK_CLOSED: onConnectionClosed(); break; @@ -865,7 +849,13 @@ int WebSocket::onSocketCallback(struct lws *wsi, case LWS_CALLBACK_CLIENT_RECEIVE: onClientReceivedData(in, len); break; + + case LWS_CALLBACK_CLIENT_WRITEABLE: + onClientWritable(); + break; + default: +// LOGD("Unhandled websocket event: %d\n", reason); break; } diff --git a/tests/js-tests/src/ExtensionsTest/NetworkTest/WebSocketTest.js b/tests/js-tests/src/ExtensionsTest/NetworkTest/WebSocketTest.js index 4455043a34..31f262c578 100644 --- a/tests/js-tests/src/ExtensionsTest/NetworkTest/WebSocketTest.js +++ b/tests/js-tests/src/ExtensionsTest/NetworkTest/WebSocketTest.js @@ -121,7 +121,12 @@ var WebSocketTestLayer = cc.Layer.extend({ }; this._wsiSendText.onerror = function(evt) { - cc.log("sendText Error was fired"); + cc.log("_wsiSendText Error was fired"); + if (cc.sys.isObjectValid(self)) { + self._errorStatus.setString("an error was fired"); + } else { + cc.log("WebSocket test layer was destroyed!"); + } }; this._wsiSendText.onclose = function(evt) { @@ -130,55 +135,64 @@ var WebSocketTestLayer = cc.Layer.extend({ }; - this._wsiSendBinary = new WebSocket("ws://echo.websocket.org"); - this._wsiSendBinary.binaryType = "arraybuffer"; - this._wsiSendBinary.onopen = function(evt) { - self._sendBinaryStatus.setString("Send Binary WS was opened."); - }; + this._wsiSendBinary = new WebSocket("ws://echo.websocket.org"); + this._wsiSendBinary.binaryType = "arraybuffer"; + this._wsiSendBinary.onopen = function(evt) { + self._sendBinaryStatus.setString("Send Binary WS was opened."); + }; - this._wsiSendBinary.onmessage = function(evt) { - self._sendBinaryTimes++; - var binary = new Uint16Array(evt.data); - var binaryStr = "response bin msg: "; + this._wsiSendBinary.onmessage = function(evt) { + self._sendBinaryTimes++; + var binary = new Uint16Array(evt.data); + var binaryStr = "response bin msg: "; - var str = ""; - for (var i = 0; i < binary.length; i++) { - if (binary[i] == 0) - { - str += "\'\\0\'"; - } - else - { - var hexChar = "0x" + binary[i].toString("16").toUpperCase(); - str += String.fromCharCode(hexChar); - } - } + var str = ""; + for (var i = 0; i < binary.length; i++) { + if (binary[i] == 0) + { + str += "\'\\0\'"; + } + else + { + var hexChar = "0x" + binary[i].toString("16").toUpperCase(); + str += String.fromCharCode(hexChar); + } + } - binaryStr += str + ", " + self._sendBinaryTimes; - cc.log(binaryStr); - self._sendBinaryStatus.setString(binaryStr); - }; + binaryStr += str + ", " + self._sendBinaryTimes; + cc.log(binaryStr); + self._sendBinaryStatus.setString(binaryStr); + }; - this._wsiSendBinary.onerror = function(evt) { - cc.log("sendBinary Error was fired"); - }; + this._wsiSendBinary.onerror = function(evt) { + cc.log("_wsiSendBinary Error was fired"); + if (cc.sys.isObjectValid(self)) { + self._errorStatus.setString("an error was fired"); + } else { + cc.log("WebSocket test layer was destroyed!"); + } + }; - this._wsiSendBinary.onclose = function(evt) { - cc.log("_wsiSendBinary websocket instance closed."); - self._wsiSendBinary = null; - }; + this._wsiSendBinary.onclose = function(evt) { + cc.log("_wsiSendBinary websocket instance closed."); + self._wsiSendBinary = null; + }; - this._wsiError = new WebSocket("ws://invalid.url.com"); - this._wsiError.onopen = function(evt) {}; - this._wsiError.onmessage = function(evt) {}; - this._wsiError.onerror = function(evt) { - cc.log("Error was fired"); - self._errorStatus.setString("an error was fired"); - }; - this._wsiError.onclose = function(evt) { - cc.log("_wsiError websocket instance closed."); - self._wsiError = null; - }; + this._wsiError = new WebSocket("ws://invalidurlxxxyyy.com"); + this._wsiError.onopen = function(evt) {}; + this._wsiError.onmessage = function(evt) {}; + this._wsiError.onerror = function(evt) { + cc.log("_wsiError Error was fired"); + if (cc.sys.isObjectValid(self)) { + self._errorStatus.setString("an error was fired"); + } else { + cc.log("WebSocket test layer was destroyed!"); + } + }; + this._wsiError.onclose = function(evt) { + cc.log("_wsiError websocket instance closed."); + self._wsiError = null; + }; return true; }, From fc38857e6b110d5aa2a131b2bf5098d32c5ce1df Mon Sep 17 00:00:00 2001 From: James Chen Date: Wed, 13 Jan 2016 20:48:03 +0800 Subject: [PATCH 06/14] LWS_SEND_BUFFER_PRE_PADDING -> LWS_PRE --- cocos/network/WebSocket.cpp | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/cocos/network/WebSocket.cpp b/cocos/network/WebSocket.cpp index 9e9484af55..edc7537050 100644 --- a/cocos/network/WebSocket.cpp +++ b/cocos/network/WebSocket.cpp @@ -250,19 +250,14 @@ public: return false; } - _data.reserve(LWS_SEND_BUFFER_PRE_PADDING + len + LWS_SEND_BUFFER_POST_PADDING); - _data.resize(LWS_SEND_BUFFER_PRE_PADDING, 0x00); + _data.reserve(LWS_PRE + len); + _data.resize(LWS_PRE, 0x00); if (len > 0) { _data.insert(_data.end(), buf, buf + len); } - if (LWS_SEND_BUFFER_POST_PADDING > 0) - { - _data.insert(_data.end(), LWS_SEND_BUFFER_POST_PADDING, 0x00); - } - - _payload = _data.data() + LWS_SEND_BUFFER_PRE_PADDING; + _payload = _data.data() + LWS_PRE; _payloadLength = len; _frameLength = len; return true; From 649ed6737157a932a569d5815c1957a600dcf822 Mon Sep 17 00:00:00 2001 From: James Chen Date: Wed, 13 Jan 2016 20:59:10 +0800 Subject: [PATCH 07/14] Removes lws_get_internal_extensions, adds permessage-deflate extension. --- cocos/network/WebSocket.cpp | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/cocos/network/WebSocket.cpp b/cocos/network/WebSocket.cpp index edc7537050..86bf349780 100644 --- a/cocos/network/WebSocket.cpp +++ b/cocos/network/WebSocket.cpp @@ -527,6 +527,20 @@ void WebSocket::onSubThreadLoop() void WebSocket::onSubThreadStarted() { + static const struct lws_extension exts[] = { + { + "permessage-deflate", + lws_extension_callback_pm_deflate, + "permessage-deflate; client_no_context_takeover; client_max_window_bits" + }, + { + "deflate-frame", + lws_extension_callback_pm_deflate, + "deflate_frame" + }, + { nullptr, nullptr, nullptr /* terminator */ } + }; + struct lws_context_creation_info info; memset(&info, 0, sizeof info); /* @@ -539,7 +553,7 @@ void WebSocket::onSubThreadStarted() info.port = CONTEXT_PORT_NO_LISTEN; info.protocols = _wsProtocols; - info.extensions = lws_get_internal_extensions(); + info.extensions = exts; info.gid = -1; info.uid = -1; From dacd51c839b5fca029925f66df285fc6fa5540b8 Mon Sep 17 00:00:00 2001 From: James Chen Date: Wed, 13 Jan 2016 21:04:48 +0800 Subject: [PATCH 08/14] More function name renames. createThread -> createWebSocket, quitSubThread -> quitWebSocketThread. --- cocos/network/WebSocket.cpp | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/cocos/network/WebSocket.cpp b/cocos/network/WebSocket.cpp index 86bf349780..333a70c6cc 100644 --- a/cocos/network/WebSocket.cpp +++ b/cocos/network/WebSocket.cpp @@ -119,9 +119,9 @@ public: ~WsThreadHelper(); // Creates a new thread - bool createThread(const WebSocket& ws); - // Quits sub-thread (websocket thread). - void quitSubThread(); + bool createWebSocketThread(const WebSocket& ws); + // Quits websocket thread. + void quitWebSocketThread(); // Sends message to Cocos thread. It's needed to be invoked in Websocket thread. void sendMessageToCocosThread(const std::function& cb); @@ -180,7 +180,7 @@ WsThreadHelper::~WsThreadHelper() delete _subThreadWsMessageQueue; } -bool WsThreadHelper::createThread(const WebSocket& ws) +bool WsThreadHelper::createWebSocketThread(const WebSocket& ws) { _ws = const_cast(&ws); @@ -189,7 +189,7 @@ bool WsThreadHelper::createThread(const WebSocket& ws) return true; } -void WsThreadHelper::quitSubThread() +void WsThreadHelper::quitWebSocketThread() { _needQuit = true; } @@ -436,7 +436,7 @@ bool WebSocket::init(const Delegate& delegate, // WebSocket thread needs to be invoked at the end of this method. _wsHelper = new (std::nothrow) WsThreadHelper(); - ret = _wsHelper->createThread(*this); + ret = _wsHelper->createWebSocketThread(*this); return ret; } @@ -496,7 +496,7 @@ void WebSocket::send(const unsigned char* binaryMsg, unsigned int len) void WebSocket::close() { - _wsHelper->quitSubThread(); + _wsHelper->quitWebSocketThread(); } WebSocket::State WebSocket::getReadyState() @@ -521,7 +521,7 @@ void WebSocket::onSubThreadLoop() else { LOGD("Ready state is closing or was closed, code=%d, quit websocket thread!\n", _readyState); - _wsHelper->quitSubThread(); + _wsHelper->quitWebSocketThread(); } } @@ -826,7 +826,7 @@ void WebSocket::onConnectionClosed() LOGD("WebSocket (%p) onConnectionClosed ...\n", this); _readyState = State::CLOSED; - _wsHelper->quitSubThread(); + _wsHelper->quitWebSocketThread(); _wsHelper->sendMessageToCocosThread([this](){ //Waiting for the subThread safety exit _wsHelper->joinWebSocketThread(); From f0066d03d76bc47db2d4e6a3d4872d0515c604c5 Mon Sep 17 00:00:00 2001 From: James Chen Date: Wed, 13 Jan 2016 21:07:04 +0800 Subject: [PATCH 09/14] whitespace cleanup. --- cocos/network/WebSocket.cpp | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/cocos/network/WebSocket.cpp b/cocos/network/WebSocket.cpp index 333a70c6cc..da2f78b658 100644 --- a/cocos/network/WebSocket.cpp +++ b/cocos/network/WebSocket.cpp @@ -205,7 +205,7 @@ void WsThreadHelper::wsThreadEntryFunc() } _ws->onSubThreadEnded(); - + LOGD("WebSocket thread exit, helper instance: %p\n", this); } @@ -302,12 +302,12 @@ void WebSocket::closeAllConnections() // Wait for websocket thread to exit instance->_wsHelper->joinWebSocketThread(); } - + __websocketInstances->clear(); __websocketInstances = nullptr; } } - + WebSocket::WebSocket() : _readyState(State::CONNECTING) , _port(80) @@ -324,7 +324,7 @@ WebSocket::WebSocket() { __websocketInstances = new (std::nothrow) std::vector(); } - + __websocketInstances->push_back(this); } @@ -341,7 +341,7 @@ WebSocket::~WebSocket() } } CC_SAFE_DELETE_ARRAY(_wsProtocols); - + if (__websocketInstances != nullptr) { auto iter = std::find(__websocketInstances->begin(), __websocketInstances->end(), this); @@ -540,7 +540,7 @@ void WebSocket::onSubThreadStarted() }, { nullptr, nullptr, nullptr /* terminator */ } }; - + struct lws_context_creation_info info; memset(&info, 0, sizeof info); /* @@ -807,7 +807,7 @@ void WebSocket::onConnectionOpened() void WebSocket::onConnectionError() { LOGD("WebSocket (%p) onConnectionError ...\n", this); - + _readyState = State::CLOSING; _wsHelper->sendMessageToCocosThread([this](){ @@ -822,7 +822,7 @@ void WebSocket::onConnectionClosed() LOGD("WebSocket (%p) was closed, no need to close it again!\n", this); return; } - + LOGD("WebSocket (%p) onConnectionClosed ...\n", this); _readyState = State::CLOSED; @@ -845,7 +845,7 @@ int WebSocket::onSocketCallback(struct lws *wsi, case LWS_CALLBACK_CLIENT_ESTABLISHED: onConnectionOpened(); break; - + case LWS_CALLBACK_CLIENT_CONNECTION_ERROR: onConnectionError(); break; @@ -858,11 +858,11 @@ int WebSocket::onSocketCallback(struct lws *wsi, case LWS_CALLBACK_CLIENT_RECEIVE: onClientReceivedData(in, len); break; - + case LWS_CALLBACK_CLIENT_WRITEABLE: onClientWritable(); break; - + default: // LOGD("Unhandled websocket event: %d\n", reason); break; From ed9ed6c92f25dd2f7f9b1b2aab58c18c18d8478c Mon Sep 17 00:00:00 2001 From: James Chen Date: Fri, 22 Jan 2016 14:22:33 +0800 Subject: [PATCH 10/14] Don't change the behavior of WebSocket::close method. WebSocket::close is an synchronous method before, adds a WebSocket::closeAsync method for asynchronous closing. --- cocos/network/WebSocket.cpp | 15 ++++++++++----- cocos/network/WebSocket.h | 11 ++++++++++- .../js-bindings/manual/network/jsb_websocket.cpp | 4 ++-- 3 files changed, 22 insertions(+), 8 deletions(-) diff --git a/cocos/network/WebSocket.cpp b/cocos/network/WebSocket.cpp index da2f78b658..40e7d2b8df 100644 --- a/cocos/network/WebSocket.cpp +++ b/cocos/network/WebSocket.cpp @@ -297,10 +297,7 @@ void WebSocket::closeAllConnections() for (ssize_t i = count-1; i >=0 ; i--) { WebSocket* instance = __websocketInstances->at(i); - LOGD("Waiting WebSocket (%p) to exit!\n", instance); instance->close(); - // Wait for websocket thread to exit - instance->_wsHelper->joinWebSocketThread(); } __websocketInstances->clear(); @@ -495,6 +492,14 @@ void WebSocket::send(const unsigned char* binaryMsg, unsigned int len) } void WebSocket::close() +{ + _wsHelper->quitWebSocketThread(); + // Wait for websocket thread to exit + LOGD("Waiting WebSocket (%p) to exit!\n", this); + _wsHelper->joinWebSocketThread(); +} + +void WebSocket::closeAsync() { _wsHelper->quitWebSocketThread(); } @@ -691,7 +696,7 @@ void WebSocket::onClientWritable() _wsHelper->_subThreadWsMessageQueue->erase(iter); CC_SAFE_DELETE(subThreadMsg); - close(); + closeAsync(); } else if (bytesWrite < frame->getPayloadLength()) { @@ -720,7 +725,7 @@ void WebSocket::onClientWritable() { LOGD("ERROR: msg(%u), remaining(%d) < bytesWrite(%d)\n", subThreadMsg->id, (int)remaining, (int)frame->getFrameLength()); LOGD("Drop the msg(%u)\n", subThreadMsg->id); - close(); + closeAsync(); } CC_SAFE_FREE(data->bytes); diff --git a/cocos/network/WebSocket.h b/cocos/network/WebSocket.h index ea2e4c37f1..2af36fc817 100644 --- a/cocos/network/WebSocket.h +++ b/cocos/network/WebSocket.h @@ -185,9 +185,18 @@ public: void send(const unsigned char* binaryMsg, unsigned int len); /** - * @brief Closes the connection to server. + * @brief Closes the connection to server synchronously. + * @note It's a synchronous method, it will not return until websocket thread exits. */ void close(); + + /** + * @brief Closes the connection to server asynchronously. + * @note It's an asynchronous method, it just notifies websocket thread to exit and returns directly, + * If using 'closeAsync' to close websocket connection, + * be carefull of not using destructed variables in the callback of 'onClose'. + */ + void closeAsync(); /** * @brief Gets current state of connection. diff --git a/cocos/scripting/js-bindings/manual/network/jsb_websocket.cpp b/cocos/scripting/js-bindings/manual/network/jsb_websocket.cpp index 8b400dcc62..686b3d5132 100644 --- a/cocos/scripting/js-bindings/manual/network/jsb_websocket.cpp +++ b/cocos/scripting/js-bindings/manual/network/jsb_websocket.cpp @@ -130,7 +130,7 @@ public: if (dataVal.isNullOrUndefined()) { - ws->close(); + ws->closeAsync(); return; } JS_SetProperty(cx, jsobj, "data", dataVal); @@ -264,7 +264,7 @@ bool js_cocos2dx_extension_WebSocket_close(JSContext *cx, uint32_t argc, jsval * JSB_PRECONDITION2( cobj, cx, false, "Invalid Native Object"); if(argc == 0){ - cobj->close(); + cobj->closeAsync(); args.rval().setUndefined(); return true; } From a64a7893083089a1389bdb1337f16da38468d9d6 Mon Sep 17 00:00:00 2001 From: James Chen Date: Fri, 22 Jan 2016 15:20:43 +0800 Subject: [PATCH 11/14] _readyState must be set to State::CLOSED and _delegate->onClose(this) must be invoked at the end of WebSocket::close. --- cocos/network/WebSocket.cpp | 8 +++++++- cocos/network/WebSocket.h | 1 + 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/cocos/network/WebSocket.cpp b/cocos/network/WebSocket.cpp index 40e7d2b8df..0e0314f51b 100644 --- a/cocos/network/WebSocket.cpp +++ b/cocos/network/WebSocket.cpp @@ -494,9 +494,15 @@ void WebSocket::send(const unsigned char* binaryMsg, unsigned int len) void WebSocket::close() { _wsHelper->quitWebSocketThread(); - // Wait for websocket thread to exit + // Sets the state to 'closed' to make sure 'onConnectionClosed' which is + // invoked by websocket thread don't post 'close' message to Cocos thread since + // WebSocket instance is destroyed at next frame. + _readyState = State::CLOSED; LOGD("Waiting WebSocket (%p) to exit!\n", this); _wsHelper->joinWebSocketThread(); + // Since 'onConnectionClosed' didn't post message to Cocos Thread for invoking 'onClose' callback, do it here. + // onClose must be invoked at the end of this method. + _delegate->onClose(this); } void WebSocket::closeAsync() diff --git a/cocos/network/WebSocket.h b/cocos/network/WebSocket.h index 2af36fc817..5fe4fd1ab1 100644 --- a/cocos/network/WebSocket.h +++ b/cocos/network/WebSocket.h @@ -53,6 +53,7 @@ class WsThreadHelper; /** * WebSocket is wrapper of the libwebsockets-protocol, let the develop could call the websocket easily. + * Please note that all public methods of WebSocket have to be invoked on Cocos Thread. */ class CC_DLL WebSocket { From c3e0edde0664193f64e03b638f52a819ea5343f6 Mon Sep 17 00:00:00 2001 From: James Chen Date: Fri, 22 Jan 2016 16:10:49 +0800 Subject: [PATCH 12/14] 'closed' state has to be set before quit websocket thread. --- cocos/network/WebSocket.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cocos/network/WebSocket.cpp b/cocos/network/WebSocket.cpp index 0e0314f51b..a2a5d7faf1 100644 --- a/cocos/network/WebSocket.cpp +++ b/cocos/network/WebSocket.cpp @@ -493,11 +493,13 @@ void WebSocket::send(const unsigned char* binaryMsg, unsigned int len) void WebSocket::close() { - _wsHelper->quitWebSocketThread(); // Sets the state to 'closed' to make sure 'onConnectionClosed' which is // invoked by websocket thread don't post 'close' message to Cocos thread since // WebSocket instance is destroyed at next frame. + // 'closed' state has to be set before quit websocket thread. _readyState = State::CLOSED; + + _wsHelper->quitWebSocketThread(); LOGD("Waiting WebSocket (%p) to exit!\n", this); _wsHelper->joinWebSocketThread(); // Since 'onConnectionClosed' didn't post message to Cocos Thread for invoking 'onClose' callback, do it here. From b73b16d425a86b223bdb69b2345f1013cfa097bf Mon Sep 17 00:00:00 2001 From: James Chen Date: Fri, 22 Jan 2016 17:43:30 +0800 Subject: [PATCH 13/14] Adds isDestroy std::share_ptr variable to dectect whether websocket instance was destroyed. --- cocos/network/WebSocket.cpp | 78 +++++++++++++++++++++++++++++++------ cocos/network/WebSocket.h | 2 + 2 files changed, 69 insertions(+), 11 deletions(-) diff --git a/cocos/network/WebSocket.cpp b/cocos/network/WebSocket.cpp index a2a5d7faf1..8ea5f727ec 100644 --- a/cocos/network/WebSocket.cpp +++ b/cocos/network/WebSocket.cpp @@ -311,6 +311,7 @@ WebSocket::WebSocket() , _wsHelper(nullptr) , _wsInstance(nullptr) , _wsContext(nullptr) +, _isDestroyed(std::make_shared(false)) , _delegate(nullptr) , _SSLConnection(0) , _wsProtocols(nullptr) @@ -351,6 +352,7 @@ WebSocket::~WebSocket() LOGD("ERROR: WebSocket instance (%p) wasn't added to the container which saves websocket instances!\n", this); } } + *_isDestroyed = true; } bool WebSocket::init(const Delegate& delegate, @@ -493,11 +495,19 @@ void WebSocket::send(const unsigned char* binaryMsg, unsigned int len) void WebSocket::close() { + _readStateMutex.lock(); + if (_readyState == State::CLOSED) + { + LOGD("close: WebSocket (%p) was closed, no need to close it again!\n", this); + _readStateMutex.unlock(); + return; + } // Sets the state to 'closed' to make sure 'onConnectionClosed' which is // invoked by websocket thread don't post 'close' message to Cocos thread since // WebSocket instance is destroyed at next frame. // 'closed' state has to be set before quit websocket thread. _readyState = State::CLOSED; + _readStateMutex.unlock(); _wsHelper->quitWebSocketThread(); LOGD("Waiting WebSocket (%p) to exit!\n", this); @@ -514,13 +524,16 @@ void WebSocket::closeAsync() WebSocket::State WebSocket::getReadyState() { + std::lock_guard lk(_readStateMutex); return _readyState; } void WebSocket::onSubThreadLoop() { + _readStateMutex.lock(); if (_wsContext && _readyState != State::CLOSED && _readyState != State::CLOSING) { + _readStateMutex.unlock(); _wsHelper->_subThreadWsMessageQueueMutex.lock(); bool isEmpty = _wsHelper->_subThreadWsMessageQueue->empty(); _wsHelper->_subThreadWsMessageQueueMutex.unlock(); @@ -534,6 +547,7 @@ void WebSocket::onSubThreadLoop() else { LOGD("Ready state is closing or was closed, code=%d, quit websocket thread!\n", _readyState); + _readStateMutex.unlock(); _wsHelper->quitWebSocketThread(); } } @@ -580,7 +594,10 @@ void WebSocket::onSubThreadStarted() if (nullptr != _wsContext) { + _readStateMutex.lock(); _readyState = State::CONNECTING; + _readStateMutex.unlock(); + std::string name; for (int i = 0; _wsProtocols[i].callback != nullptr; ++i) { @@ -786,7 +803,8 @@ void WebSocket::onClientReceivedData(void* in, ssize_t len) frameData->push_back('\0'); } - _wsHelper->sendMessageToCocosThread([this, frameData, frameSize, isBinary](){ + std::shared_ptr isDestroyed = _isDestroyed; + _wsHelper->sendMessageToCocosThread([this, frameData, frameSize, isBinary, isDestroyed](){ // In UI thread LOGD("Notify data len %d to Cocos thread.\n", (int)frameSize); @@ -795,7 +813,14 @@ void WebSocket::onClientReceivedData(void* in, ssize_t len) data.bytes = (char*)frameData->data(); data.len = frameSize; - _delegate->onMessage(this, data); + if (*isDestroyed) + { + LOGD("WebSocket instance was destroyed!\n"); + } + else + { + _delegate->onMessage(this, data); + } delete frameData; }); @@ -810,10 +835,20 @@ void WebSocket::onConnectionOpened() */ lws_callback_on_writable(_wsInstance); + _readStateMutex.lock(); _readyState = State::OPEN; + _readStateMutex.unlock(); - _wsHelper->sendMessageToCocosThread([this](){ - _delegate->onOpen(this); + std::shared_ptr isDestroyed = _isDestroyed; + _wsHelper->sendMessageToCocosThread([this, isDestroyed](){ + if (*isDestroyed) + { + LOGD("WebSocket instance was destroyed!\n"); + } + else + { + _delegate->onOpen(this); + } }); } @@ -821,29 +856,50 @@ void WebSocket::onConnectionError() { LOGD("WebSocket (%p) onConnectionError ...\n", this); + _readStateMutex.lock(); _readyState = State::CLOSING; + _readStateMutex.unlock(); - _wsHelper->sendMessageToCocosThread([this](){ - _delegate->onError(this, ErrorCode::CONNECTION_FAILURE); + std::shared_ptr isDestroyed = _isDestroyed; + _wsHelper->sendMessageToCocosThread([this, isDestroyed](){ + if (*isDestroyed) + { + LOGD("WebSocket instance was destroyed!\n"); + } + else + { + _delegate->onError(this, ErrorCode::CONNECTION_FAILURE); + } }); } void WebSocket::onConnectionClosed() { + _readStateMutex.lock(); if (_readyState == State::CLOSED) { - LOGD("WebSocket (%p) was closed, no need to close it again!\n", this); + LOGD("onConnectionClosed: WebSocket (%p) was closed, no need to close it again!\n", this); + _readStateMutex.unlock(); return; } LOGD("WebSocket (%p) onConnectionClosed ...\n", this); _readyState = State::CLOSED; + _readStateMutex.unlock(); _wsHelper->quitWebSocketThread(); - _wsHelper->sendMessageToCocosThread([this](){ - //Waiting for the subThread safety exit - _wsHelper->joinWebSocketThread(); - _delegate->onClose(this); + std::shared_ptr isDestroyed = _isDestroyed; + _wsHelper->sendMessageToCocosThread([this, isDestroyed](){ + if (*isDestroyed) + { + LOGD("WebSocket instance was destroyed!\n"); + } + else + { + // Waiting for the subThread safety exit + _wsHelper->joinWebSocketThread(); + _delegate->onClose(this); + } }); } diff --git a/cocos/network/WebSocket.h b/cocos/network/WebSocket.h index 5fe4fd1ab1..a8bee07b61 100644 --- a/cocos/network/WebSocket.h +++ b/cocos/network/WebSocket.h @@ -220,6 +220,7 @@ private: void onConnectionClosed(); private: + std::mutex _readStateMutex; State _readyState; std::string _host; unsigned int _port; @@ -233,6 +234,7 @@ private: struct lws* _wsInstance; struct lws_context* _wsContext; + std::shared_ptr _isDestroyed; Delegate* _delegate; int _SSLConnection; struct lws_protocols* _wsProtocols; From 28a3f78c65ba61c8e05538a702d551562fdb2846 Mon Sep 17 00:00:00 2001 From: James Chen Date: Fri, 22 Jan 2016 18:10:24 +0800 Subject: [PATCH 14/14] Adds missing , include files in WebSocket.h --- cocos/network/WebSocket.h | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cocos/network/WebSocket.h b/cocos/network/WebSocket.h index a8bee07b61..d98a2dbb68 100644 --- a/cocos/network/WebSocket.h +++ b/cocos/network/WebSocket.h @@ -32,6 +32,8 @@ #include #include +#include +#include // for std::shared_ptr #include "platform/CCPlatformMacros.h" #include "platform/CCStdC.h"