Always decompress ASTC in parallel for the software decoder [ci build]

a. Improve task lifecycle management.
b. Fix CI.
This commit is contained in:
halx99 2021-07-02 14:20:54 +08:00
parent 26c9365a04
commit eb11fd9a61
1 changed file with 93 additions and 104 deletions

View File

@ -15,6 +15,9 @@
#include <thread> #include <thread>
#include <mutex> #include <mutex>
#include <memory>
#include <vector>
#include <deque>
#include "astc/astcenc.h" #include "astc/astcenc.h"
#include "astc/astcenc_internal.h" #include "astc/astcenc_internal.h"
#include "yasio/detail/utils.hpp" #include "yasio/detail/utils.hpp"
@ -25,49 +28,47 @@
typedef std::mutex astc_decompress_mutex_t; typedef std::mutex astc_decompress_mutex_t;
struct astc_decompress_task { struct astc_decompress_task {
astc_decompress_task( static std::shared_ptr<astc_decompress_task> allocate(const uint8_t* in, unsigned int inlen, uint8_t* out,
const uint8_t* in, uint8_t* out, unsigned int dim_x, unsigned int dim_y, int block_x, int block_y); unsigned int dim_x, unsigned int dim_y, int block_x, int block_y);
astc_decompress_task() {}
~astc_decompress_task(); ~astc_decompress_task() {
#if !ASTCDEC_NO_CONTEXT
void retain() { if (_context)
++_nref; astcenc_context_free(this->_context);
#endif
} }
void release() { void wait_done() {
if (--_nref == 0) #if ASTCDEC_NO_CONTEXT
delete this; _decompress_pm.wait();
#else
_context->manage_decompress.wait();
#endif
} }
astcenc_config _config{};
const uint8_t* _in_texels = nullptr; const uint8_t* _in_texels = nullptr;
void* _out_texels[1]{}; void* _out_texels[1]{};
uint32_t _out_len = 0; unsigned int _xblocks, _yblocks;
#if ASTCDEC_NO_CONTEXT #if ASTCDEC_NO_CONTEXT
unsigned int _block_x, _block_y;
ParallelManager _decompress_pm{}; ParallelManager _decompress_pm{};
block_size_descriptor _bsd{}; block_size_descriptor _bsd{};
#else #else
astcenc_config _config{};
astcenc_context* _context = nullptr; astcenc_context* _context = nullptr;
#endif #endif
astcenc_image _image_out{}; astcenc_image _image_out{};
std::atomic<int> _nref;
}; };
class astc_decompress_job_manager { class astc_decompress_job_manager {
public: public:
static int s_thread_count;
static astc_decompress_job_manager* get_instance() { static astc_decompress_job_manager* get_instance() {
static astc_decompress_job_manager s_instance; static astc_decompress_job_manager s_instance;
return &s_instance; return &s_instance;
} }
astc_decompress_job_manager() { astc_decompress_job_manager() {
s_thread_count = std::thread::hardware_concurrency(); int thread_count = std::thread::hardware_concurrency();
for (int i = 0; i < thread_count; ++i) {
for (int i = 0; i < s_thread_count; ++i) {
_threads.push_back(std::thread{&astc_decompress_job_manager::run, this}); _threads.push_back(std::thread{&astc_decompress_job_manager::run, this});
} }
} }
@ -75,12 +76,10 @@ public:
~astc_decompress_job_manager() { ~astc_decompress_job_manager() {
_stopped = true; _stopped = true;
_taskQueueMtx.lock(); _task_queue_mtx.lock();
for (auto task : _taskQueue) _task_queue.clear();
task->release(); _task_queue_cv.notify_all();
_taskQueue.clear(); _task_queue_mtx.unlock();
_taskQueueCV.notify_all();
_taskQueueMtx.unlock();
for (auto& t : _threads) { for (auto& t : _threads) {
if (t.joinable()) if (t.joinable())
@ -91,6 +90,31 @@ public:
int decompress_parallel_sync(const uint8_t* in, uint32_t inlen, uint8_t* out, unsigned int dim_x, int decompress_parallel_sync(const uint8_t* in, uint32_t inlen, uint8_t* out, unsigned int dim_x,
unsigned int dim_y, int block_x, int block_y) { unsigned int dim_y, int block_x, int block_y) {
auto task = make_task(in, inlen, out, dim_x, dim_y, block_x, block_y);
if (!task)
return ASTCENC_ERR_OUT_OF_MEM;
_task_queue_mtx.lock();
_task_queue.push_back(task);
_task_queue_mtx.unlock();
_task_queue_cv.notify_all(); // notify all thread to process the single decompress task parallel
task->wait_done();
_task_queue_mtx.lock();
assert(!_task_queue.empty());
auto t = _task_queue.front();
assert(t.get() == task.get());
_task_queue.pop_front();
_task_queue_mtx.unlock();
return ASTCENC_SUCCESS;
}
private:
std::shared_ptr<astc_decompress_task> make_task(const uint8_t* in, unsigned int inlen,
uint8_t* out, unsigned int dim_x, unsigned int dim_y, int block_x, int block_y) {
unsigned int xblocks = (dim_x + block_x - 1) / block_x; unsigned int xblocks = (dim_x + block_x - 1) / block_x;
unsigned int yblocks = (dim_y + block_y - 1) / block_y; unsigned int yblocks = (dim_y + block_y - 1) / block_y;
unsigned int zblocks = 1; // (dim_z + block_z - 1) / block_z; unsigned int zblocks = 1; // (dim_z + block_z - 1) / block_z;
@ -98,86 +122,73 @@ public:
// Check we have enough output space (16 bytes per block) // Check we have enough output space (16 bytes per block)
auto total_blocks = xblocks * yblocks * zblocks; auto total_blocks = xblocks * yblocks * zblocks;
size_t size_needed = total_blocks * 16; size_t size_needed = total_blocks * 16;
if (inlen < size_needed) { if (inlen < size_needed)
return ASTCENC_ERR_OUT_OF_MEM; return nullptr;
}
auto task = std::make_shared<astc_decompress_task>();
task->_in_texels = in;
task->_out_texels[0] = out;
task->_image_out = {dim_x, dim_y, 1, ASTCENC_TYPE_U8, task->_out_texels};
task->_xblocks = xblocks;
task->_yblocks = yblocks;
#if ASTCDEC_NO_CONTEXT #if ASTCDEC_NO_CONTEXT
static std::once_flag once_flag; static std::once_flag once_flag;
std::call_once(once_flag, init_quant_mode_table); std::call_once(once_flag, init_quant_mode_table);
#endif
auto task = new astc_decompress_task(in, out, dim_x, dim_y, block_x, block_y); task->_block_x = block_x;
#if ASTCDEC_NO_CONTEXT task->_block_y = block_y;
init_block_size_descriptor(block_x, block_y, 1, false, 0 /*unused for decompress*/, task->_bsd);
task->_decompress_pm.init(total_blocks); task->_decompress_pm.init(total_blocks);
#else #else
(void) astcenc_config_init(
ASTCENC_PRF_LDR, block_x, block_y, 1, 0, ASTCENC_FLG_DECOMPRESS_ONLY, &task->_config);
(void) astcenc_context_alloc(&task->_config, (unsigned int)_threads.size(), &task->_context);
task->_context->manage_decompress.init(total_blocks); task->_context->manage_decompress.init(total_blocks);
#endif #endif
_taskQueueMtx.lock(); return task;
_taskQueue.push_back(task);
_taskQueueMtx.unlock();
_taskQueueCV.notify_all(); // notify all thread to process the single decompress task parallel
#if ASTCDEC_NO_CONTEXT
task->_decompress_pm.wait();
#else
task->_context->manage_decompress.wait();
#endif
_taskQueueMtx.lock();
assert(!_taskQueue.empty());
auto t = _taskQueue.front();
assert(t == task);
_taskQueue.pop_front();
_taskQueueMtx.unlock();
task->release();
return ASTCENC_SUCCESS;
} }
private:
void run() { void run() {
const astcenc_swizzle swz_decode{ASTCENC_SWZ_R, ASTCENC_SWZ_G, ASTCENC_SWZ_B, ASTCENC_SWZ_A}; const astcenc_swizzle swz_decode{ASTCENC_SWZ_R, ASTCENC_SWZ_G, ASTCENC_SWZ_B, ASTCENC_SWZ_A};
bool no_task_count = false; bool no_task_count = false;
for (;;) { for (;;) {
std::unique_lock<astc_decompress_mutex_t> lck(_taskQueueMtx); std::unique_lock<astc_decompress_mutex_t> lck(_task_queue_mtx);
if (!_stopped && (_taskQueue.empty() || no_task_count)) if (!_stopped && (_task_queue.empty() || no_task_count))
_taskQueueCV.wait(lck); _task_queue_cv.wait(lck);
if (_stopped) if (_stopped)
break; break;
if (_taskQueue.empty()) if (_task_queue.empty())
continue; continue;
auto task = _task_queue.front();
lck.unlock(); // unlock make sure other thread can work for the task lck.unlock(); // unlock make sure other thread can work for the task
auto task = _taskQueue.front();
auto& image_out = task->_image_out; auto& image_out = task->_image_out;
#if ASTCDEC_NO_CONTEXT
unsigned int block_x = task->_block_x;
unsigned int block_y = task->_block_y;
unsigned int block_z = 1; // task->block_z;
auto& bsd = task->_bsd;
auto& decompress_pm = task->_decompress_pm;
#else
unsigned int block_x = task->_config.block_x; unsigned int block_x = task->_config.block_x;
unsigned int block_y = task->_config.block_y; unsigned int block_y = task->_config.block_y;
unsigned int block_z = task->_config.block_z; unsigned int block_z = 1; // task->_config.block_z;
auto& bsd = *task->_context->bsd;
unsigned int xblocks = (image_out.dim_x + block_x - 1) / block_x;
unsigned int yblocks = (image_out.dim_y + block_y - 1) / block_y;
unsigned int zblocks = 1; // (image_out.dim_z + block_z - 1) / block_z;
const auto profile = task->_config.profile;
#if ASTCDEC_NO_CONTEXT
auto& bsd = task->_bsd;
auto& decompress_pm = task->_decompress_pm;
#else
auto& bsd = *task->_context->bsd;
auto& decompress_pm = task->_context->manage_decompress; auto& decompress_pm = task->_context->manage_decompress;
#endif #endif
unsigned int xblocks = task->_xblocks;
unsigned int yblocks = task->_yblocks;
unsigned int zblocks = 1; // (image_out.dim_z + block_z - 1) / block_z;
int row_blocks = xblocks; int row_blocks = xblocks;
int plane_blocks = xblocks * yblocks; int plane_blocks = xblocks * yblocks;
image_block blk; image_block blk;
auto data = task->_in_texels; auto data = task->_in_texels;
for (;;) { // process the task for (;;) { // process the task
unsigned int count = 0; unsigned int count = 0;
@ -202,7 +213,7 @@ private:
physical_to_symbolic(bsd, pcb, scb); physical_to_symbolic(bsd, pcb, scb);
decompress_symbolic_block(profile, bsd, x * block_x, y * block_y, z * block_z, scb, blk); decompress_symbolic_block(ASTCENC_PRF_LDR, bsd, x * block_x, y * block_y, z * block_z, scb, blk);
write_image_block(image_out, blk, bsd, x * block_x, y * block_y, z * block_z, swz_decode); write_image_block(image_out, blk, bsd, x * block_x, y * block_y, z * block_z, swz_decode);
} }
@ -212,36 +223,14 @@ private:
} }
} }
std::deque<astc_decompress_task*> _taskQueue;
astc_decompress_mutex_t _taskQueueMtx;
std::condition_variable_any _taskQueueCV;
std::vector<std::thread> _threads; std::vector<std::thread> _threads;
std::deque<std::shared_ptr<astc_decompress_task>> _task_queue;
astc_decompress_mutex_t _task_queue_mtx;
std::condition_variable_any _task_queue_cv;
bool _stopped = false; bool _stopped = false;
}; };
int astc_decompress_job_manager::s_thread_count = 1;
astc_decompress_task::astc_decompress_task(
const uint8_t* in, uint8_t* out, unsigned int dim_x, unsigned int dim_y, int block_x, int block_y) {
_nref = 1;
this->_in_texels = in;
this->_out_texels[0] = out;
this->_image_out = {dim_x, dim_y, 1, ASTCENC_TYPE_U8, this->_out_texels};
(void) astcenc_config_init(ASTCENC_PRF_LDR, block_x, block_y, 1, 0, ASTCENC_FLG_DECOMPRESS_ONLY, &this->_config);
#if ASTCDEC_NO_CONTEXT
init_block_size_descriptor(block_x, block_y, 1, false, 0 /*unused for decompress*/, this->_bsd);
#else
(void) astcenc_context_alloc(&this->_config, astc_decompress_job_manager::s_thread_count, &this->_context);
#endif
}
astc_decompress_task::~astc_decompress_task() {
#if !ASTCDEC_NO_CONTEXT
astcenc_context_free(this->_context);
#endif
}
int astc_decompress_image(const uint8_t* in, uint32_t inlen, uint8_t* out, uint32_t dim_x, uint32_t dim_y, int astc_decompress_image(const uint8_t* in, uint32_t inlen, uint8_t* out, uint32_t dim_x, uint32_t dim_y,
uint32_t block_x, uint32_t block_y) { uint32_t block_x, uint32_t block_y) {