Bonjour a tous.
Je travaille en ce moment sur osgEarth et en particulier sur le code de threading (header, source) de ce dernier qui deadlock dans l’environnement que j’utilise (MSYS2 sur Windows 10, g++ 8.2), ticket ici.
Selon moi, le code a selon moi des soucis de synchro (race condition sur _done
, modification de _done
alors qu’utilisée dans un le prédicat de _block.wait
sans tenir le mutex correspondant), et de manière générale, le code ne m’inspire pas confiance (euphémisme).
J’ai doc ré-ecrit une classe qui pour offrir une abstraction que je trouve plus propre
struct SynchronizedPriorityQueuedJob {
template<class ...Ts>
void emplace(Ts... args) {
std::lock_guard<Mutex> lock(_queueMutex);
_queue.emplace(std::forward<Ts>(args)...);
_block.notify_one();
}
bool interrupt_pop(QueuedJob& next) {
std::unique_lock<Mutex> lk(_queueMutex);
_block.wait(lk,[this]{
return !_queue.empty() || _done;});
if(_done) {
return false;
}
next = std::move(_queue.top());
_queue.pop();
return true;
}
QueuedJob wait_and_pop() {
std::unique_lock<Mutex> lk(_queueMutex);
_block.wait(lk,[this]{return !_queue.empty();});
QueuedJob next = std::move(_queue.top());
_queue.pop();
return next;
}
bool try_pop(QueuedJob& next) {
std::unique_lock<Mutex> lk(_queueMutex);
if(_queue.empty()){
return false;
}
next = std::move(_queue.top());
_queue.pop();
return true;
}
bool empty() const
{
std::lock_guard<Mutex> lk(_queueMutex);
return _queue.empty();
}
int size() const
{
std::lock_guard<Mutex> lk(_queueMutex);
return _queue.size();
}
void stop() {
std::lock_guard<Mutex> lk(_queueMutex);
_done = true;
}
bool running() const{
std::lock_guard<Mutex> lk(_queueMutex);
return !_done;
}
private:
std::priority_queue<QueuedJob> _queue;
mutable Mutex _queueMutex;
std::condition_variable_any _block;
bool _done{false};
};
L’utilisation ressemble a ceci (je vous passe les details, le commit complet est ici pour les curieux/curieuses)
void JobArena::dispatch(const Job& job, Delegate& delegate)
{
// autre trucs
if (_targetConcurrency > 0)
{
_queue.emplace(job, delegate, sema);
// autre trucs
}
// divers
}
void ThreadPool::runJobs()
{
while (_queue.running())
{
// autre trucs
QueuedJob next;
if (_queue.interrupt_pop(next)) {
auto t0 = std::chrono::steady_clock::now();
bool job_executed = next._delegate();
auto duration = std::chrono::steady_clock::now() - t0;
// autre trucs
}
}
void
JobArena::startThreads()
{
std::cerr << "Arena \"" << _name << "\" concurrency=" << _targetConcurrency << std::endl;
// Not enough? Start up more
while(_metrics->concurrency < _targetConcurrency)
{
_threads.push_back(std::thread([this]
{
// concurrency est un atomic<int>
_metrics->concurrency++;
OE_THREAD_NAME(_name.c_str());
runJobs();
std::cerr << "Thread " << std::this_thread::get_id() << " end loop" << std::endl;
}
));
}
}
void ThreadPool::stopThreads()
{
_queue.stop();
while(!_queue.empty()){
QueuedJob job;
if(_queue.try_pop(job)){
// use job
}
}
}
Et apres tout ca, on en arrive a ma question: est ce que c’est juste d’un point de vu threading/synchronisation / utilisation (*) ? C’est pas un domaine dans lequel je travail tous les jours et c’est facile de se rater, alors je prends volontiers vos avis et retours, merci .
David
(*) Je sais que la boucle de création des threads est toute bof, il faut aussi la changer.