add async task callback dipatcher, call it in the main thread

This commit is contained in:
yangxiao 2014-12-04 18:31:10 +08:00
parent f17df4b0bc
commit bc57e47ddb
5 changed files with 129 additions and 26 deletions

View File

@ -92,25 +92,24 @@ void Sprite3D::createAsync(const std::string &modelPath, const std::string &text
}
sprite->_asyncLoadParam.afterLoadCallback = callback;
sprite->_asyncLoadParam.asyncFuture = AsyncTaskPool::getInstance()->enqueue(AsyncTaskPool::TaskType::TASK_IO, [](Sprite3D* sp, const std::string& path)
sprite->_asyncLoadParam.texPath = texturePath;
sprite->_asyncLoadParam.path = modelPath;
sprite->_asyncLoadParam.materialdatas = new (std::nothrow) MaterialDatas();
sprite->_asyncLoadParam.meshdatas = new (std::nothrow) MeshDatas();
sprite->_asyncLoadParam.nodeDatas = new (std::nothrow) NodeDatas();
AsyncTaskPool::getInstance()->enqueue(AsyncTaskPool::TaskType::TASK_IO, CC_CALLBACK_1(Sprite3D::afterAsyncLoad, sprite), (void*)(&sprite->_asyncLoadParam), [](Sprite3D* sp)
{
sp->_asyncLoadParam.materialdatas = new (std::nothrow) MaterialDatas();
sp->_asyncLoadParam.meshdatas = new (std::nothrow) MeshDatas();
sp->_asyncLoadParam.nodeDatas = new (std::nothrow) NodeDatas();
sp->_asyncLoadParam.path = path;
sp->_asyncLoadParam.result = sp->loadFromFile(path, sp->_asyncLoadParam.nodeDatas, sp->_asyncLoadParam.meshdatas, sp->_asyncLoadParam.materialdatas);
}, sprite, modelPath);
sprite->schedule(SEL_SCHEDULE(&Sprite3D::asyncLoadChecker), 0);
sp->_asyncLoadParam.result = sp->loadFromFile(sp->_asyncLoadParam.path, sp->_asyncLoadParam.nodeDatas, sp->_asyncLoadParam.meshdatas, sp->_asyncLoadParam.materialdatas);
}, sprite);
}
void Sprite3D::asyncLoadChecker(float delta)
void Sprite3D::afterAsyncLoad(void* param)
{
static std::chrono::milliseconds span(0);
if (_asyncLoadParam.asyncFuture.valid() && _asyncLoadParam.asyncFuture.wait_for(span) == std::future_status::ready)
Sprite3D::AsyncLoadParam* asyncParam = (Sprite3D::AsyncLoadParam*)param;
if (asyncParam)
{
if (_asyncLoadParam.result)
if (asyncParam->result)
{
_meshes.clear();
_meshVertexDatas.clear();
@ -118,9 +117,9 @@ void Sprite3D::asyncLoadChecker(float delta)
removeAllAttachNode();
//create in the main thread
auto& meshdatas = _asyncLoadParam.meshdatas;
auto& materialdatas = _asyncLoadParam.materialdatas;
auto& nodeDatas = _asyncLoadParam.nodeDatas;
auto& meshdatas = asyncParam->meshdatas;
auto& materialdatas = asyncParam->materialdatas;
auto& nodeDatas = asyncParam->nodeDatas;
if (initFrom(*nodeDatas, *meshdatas, *materialdatas))
{
//add to cache
@ -132,7 +131,7 @@ void Sprite3D::asyncLoadChecker(float delta)
data->glProgramStates.pushBack(mesh->getGLProgramState());
}
Sprite3DCache::getInstance()->addSprite3DData(_asyncLoadParam.path, data);
Sprite3DCache::getInstance()->addSprite3DData(asyncParam->path, data);
meshdatas = nullptr;
materialdatas = nullptr;
nodeDatas = nullptr;
@ -140,9 +139,13 @@ void Sprite3D::asyncLoadChecker(float delta)
delete meshdatas;
delete materialdatas;
delete nodeDatas;
if (asyncParam->texPath != "")
{
setTexture(asyncParam->texPath);
}
_asyncLoadParam.afterLoadCallback(this);
unschedule(SEL_SCHEDULE(&Sprite3D::asyncLoadChecker));
}
asyncParam->afterLoadCallback(this);
}
}

View File

@ -26,7 +26,6 @@
#define __CCSPRITE3D_H__
#include <unordered_map>
#include <future>
#include "base/CCVector.h"
#include "base/ccTypes.h"
@ -163,7 +162,7 @@ CC_CONSTRUCTOR_ACCESS:
void onAABBDirty() { _aabbDirty = true; }
void asyncLoadChecker(float delta);
void afterAsyncLoad(void* param);
protected:
@ -185,10 +184,10 @@ protected:
struct AsyncLoadParam
{
std::future<void> asyncFuture; // future for load 3d sprite async
std::function<void(Sprite3D*)> afterLoadCallback; // callback after load
bool result; // sprite load result
std::string path;
std::string texPath; //
MeshDatas* meshdatas;
MaterialDatas* materialdatas;
NodeDatas* nodeDatas;

View File

@ -29,6 +29,8 @@ NS_CC_BEGIN
AsyncTaskPool* AsyncTaskPool::s_asyncTaskPool = nullptr;
int AsyncTaskPool::s_maxCallBackPerProcess = -1;
AsyncTaskPool* AsyncTaskPool::getInstance()
{
if (s_asyncTaskPool == nullptr)
@ -44,6 +46,17 @@ void AsyncTaskPool::destoryInstance()
s_asyncTaskPool = nullptr;
}
int AsyncTaskPool::getMaxTaskCallBackPerProcess()
{
return s_maxCallBackPerProcess;
}
void AsyncTaskPool::setMaxTaskCallBackPerProcess(int numTaskCallBack)
{
if (numTaskCallBack > 0 || numTaskCallBack == -1)
s_maxCallBackPerProcess = numTaskCallBack;
}
AsyncTaskPool::AsyncTaskPool()
{

View File

@ -27,6 +27,7 @@ THE SOFTWARE.
#define __CCSYNC_TASK_POOL_H_
#include "platform/CCPlatformMacros.h"
#include "base/CCRef.h"
#include <vector>
#include <queue>
#include <memory>
@ -43,6 +44,8 @@ NS_CC_BEGIN
class AsyncTaskPool
{
public:
typedef std::function<void(void*)> TaskCallBack;
enum class TaskType
{
TASK_IO,
@ -50,12 +53,35 @@ public:
TASK_OTHER,
TASK_MAX_TYPE,
};
/**
* get instance
*/
static AsyncTaskPool* getInstance();
/**
* destroy instance
*/
static void destoryInstance();
/**
* get max number of call back per process, -1 means unlimited (-1 is the default value)
*/
static int getMaxTaskCallBackPerProcess();
/**
* set max number of call back per process, -1 means unlimited
*/
static void setMaxTaskCallBackPerProcess(int numTaskCallBack);
/**
* process the call back after task, this should be called in the main thread
*/
void processTaskCallBack()
{
_taskcallbackDispatcher.update();
}
template<class F, class... Args>
auto enqueue(TaskType type, F&& f, Args&&... args)
auto enqueue(TaskType type, const TaskCallBack& callback, void* callbackParam, F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>;
CC_CONSTRUCTOR_ACCESS:
@ -63,6 +89,12 @@ CC_CONSTRUCTOR_ACCESS:
~AsyncTaskPool();
protected:
struct AsyncTaskCallBack
{
TaskCallBack callback;
void* callbackParam;
};
// thread tasks internally used
class ThreadTasks {
friend class AsyncTaskPool;
@ -76,7 +108,7 @@ protected:
for(;;)
{
std::function<void()> task;
AsyncTaskCallBack callback;
{
std::unique_lock<std::mutex> lock(this->_queue_mutex);
this->_condition.wait(lock,
@ -84,10 +116,14 @@ protected:
if(this->_stop && this->_tasks.empty())
return;
task = std::move(this->_tasks.front());
callback = std::move(this->_taskCallBacks.front());
this->_tasks.pop();
this->_taskCallBacks.pop();
}
task();
AsyncTaskPool::getInstance()->_taskcallbackDispatcher.enqueue(callback);
//task();
}
}
);
@ -100,10 +136,13 @@ protected:
_thread.join();
}
private:
// need to keep track of thread so we can join them
std::thread _thread;
// the task queue
std::queue< std::function<void()> > _tasks;
std::queue<AsyncTaskCallBack> _taskCallBacks;
// std::queue< AsyncTask > _tasks;
// synchronization
std::mutex _queue_mutex;
@ -111,13 +150,55 @@ protected:
bool _stop;
};
class AfterAsyncTaskDispatcher
{
public:
void update()
{
if (_callBacks.empty())
return;
std::unique_lock<std::mutex> lock(_queue_mutex);
int numCallBack = 0;
for (auto& it : _callBacks) {
if (s_maxCallBackPerProcess != -1 && ++numCallBack > s_maxCallBackPerProcess)
break;
if (it.callback)
{
it.callback(it.callbackParam);
}
}
_callBacks.erase(_callBacks.begin(), _callBacks.begin() + numCallBack);
}
AfterAsyncTaskDispatcher() {}
~AfterAsyncTaskDispatcher() {}
void enqueue(const AsyncTaskCallBack& callback)
{
std::unique_lock<std::mutex> lock(_queue_mutex);
_callBacks.push_back(callback);
}
protected:
std::vector<AsyncTaskCallBack> _callBacks;
std::mutex _queue_mutex;
};
//tasks
ThreadTasks _threadTasks[int(TaskType::TASK_MAX_TYPE)];
//deal task call back
AfterAsyncTaskDispatcher _taskcallbackDispatcher;
static AsyncTaskPool* s_asyncTaskPool;
static int s_maxCallBackPerProcess;
};
template<class F, class... Args>
auto AsyncTaskPool::enqueue(AsyncTaskPool::TaskType type, F&& f, Args&&... args)
auto AsyncTaskPool::enqueue(AsyncTaskPool::TaskType type, const TaskCallBack& callback, void* callbackParam, F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>
{
auto& threadTask = _threadTasks[(int)type];
@ -132,6 +213,7 @@ auto AsyncTaskPool::enqueue(AsyncTaskPool::TaskType type, F&& f, Args&&... args)
auto& stop = threadTask._stop;
auto& tasks = threadTask._tasks;
auto& condition = threadTask._condition;
auto& taskcallbacks = threadTask._taskCallBacks;
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex);
@ -143,7 +225,11 @@ auto AsyncTaskPool::enqueue(AsyncTaskPool::TaskType type, F&& f, Args&&... args)
return res;
}
AsyncTaskCallBack taskCallBack;
taskCallBack.callback = callback;
taskCallBack.callbackParam = callbackParam;
tasks.emplace([task](){ (*task)(); });
taskcallbacks.emplace(taskCallBack);
}
condition.notify_one();

View File

@ -57,6 +57,7 @@ THE SOFTWARE.
#include "base/CCConsole.h"
#include "base/CCAutoreleasePool.h"
#include "base/CCConfiguration.h"
#include "base/CCAsyncTaskPool.h"
#include "platform/CCApplication.h"
//#include "platform/CCGLViewImpl.h"
@ -266,6 +267,7 @@ void Director::drawScene()
{
_scheduler->update(_deltaTime);
_eventDispatcher->dispatchEvent(_eventAfterUpdate);
AsyncTaskPool::getInstance()->processTaskCallBack();
}
glClear(GL_COLOR_BUFFER_BIT | GL_DEPTH_BUFFER_BIT);