Justesse code multi threade

a marqué ce sujet comme résolu.

Bonjour a tous.

Je travaille en ce moment sur osgEarth et en particulier sur le code de threading (header, source) de ce dernier qui deadlock dans l’environnement que j’utilise (MSYS2 sur Windows 10, g++ 8.2), ticket ici.

Selon moi, le code a selon moi des soucis de synchro (race condition sur _done, modification de _done alors qu’utilisée dans un le prédicat de _block.wait sans tenir le mutex correspondant), et de manière générale, le code ne m’inspire pas confiance (euphémisme).

J’ai doc ré-ecrit une classe qui pour offrir une abstraction que je trouve plus propre

	struct SynchronizedPriorityQueuedJob {

	    template<class ...Ts>
	    void emplace(Ts... args) {
		std::lock_guard<Mutex> lock(_queueMutex);
		_queue.emplace(std::forward<Ts>(args)...);
		_block.notify_one();
	    }

	    bool interrupt_pop(QueuedJob& next) {
		std::unique_lock<Mutex> lk(_queueMutex);
		_block.wait(lk,[this]{
		    return !_queue.empty() || _done;});
		if(_done) {
		    return false;
		}
		next = std::move(_queue.top());
		_queue.pop();
		return true;
	    }

	    QueuedJob wait_and_pop() {
		std::unique_lock<Mutex> lk(_queueMutex);
		_block.wait(lk,[this]{return !_queue.empty();});

		QueuedJob next = std::move(_queue.top());
		_queue.pop();
		return next;
	    }

	    bool try_pop(QueuedJob& next) {
		std::unique_lock<Mutex> lk(_queueMutex);
		if(_queue.empty()){
		    return false;
		}

		next = std::move(_queue.top());
		_queue.pop();
		return true;
	    }

	    bool empty() const
	    {
		std::lock_guard<Mutex> lk(_queueMutex);
		return _queue.empty();
	    }
	    int size() const
	    {
		std::lock_guard<Mutex> lk(_queueMutex);
		return _queue.size();
	    }

	    void stop() {
		std::lock_guard<Mutex> lk(_queueMutex);
		_done = true;
	    }
	    bool running() const{
		std::lock_guard<Mutex> lk(_queueMutex);
		return !_done;
	    }
	private:
	    std::priority_queue<QueuedJob> _queue;
	    mutable Mutex _queueMutex;
	    std::condition_variable_any _block;
	    bool _done{false};
	};

L’utilisation ressemble a ceci (je vous passe les details, le commit complet est ici pour les curieux/curieuses)


void JobArena::dispatch(const Job& job, Delegate& delegate)
{
	// autre trucs
	if (_targetConcurrency > 0)
        {
		_queue.emplace(job, delegate, sema);
		// autre trucs
        }
	// divers
}

void ThreadPool::runJobs()
{
    while (_queue.running())
	{
	    // autre trucs
	    QueuedJob next;
	    if (_queue.interrupt_pop(next)) {

		auto t0 = std::chrono::steady_clock::now();
		bool job_executed = next._delegate();
		auto duration = std::chrono::steady_clock::now() - t0;
		// autre trucs
	}
}

void
JobArena::startThreads()
{

    std::cerr << "Arena \"" << _name << "\" concurrency=" << _targetConcurrency << std::endl;
	
    // Not enough? Start up more
    while(_metrics->concurrency < _targetConcurrency)
    {
        _threads.push_back(std::thread([this]
            {
                // concurrency est un atomic<int> 
		_metrics->concurrency++;
                OE_THREAD_NAME(_name.c_str());
		runJobs();
		std::cerr << "Thread " << std::this_thread::get_id() << " end loop" << std::endl;
            }
        ));
    }
}
 

void ThreadPool::stopThreads()
{
    _queue.stop();
    while(!_queue.empty()){
		QueuedJob job;
		if(_queue.try_pop(job)){ 
                       // use job
		}
    }
}

Et apres tout ca, on en arrive a ma question: est ce que c’est juste d’un point de vu threading/synchronisation / utilisation (*) ? C’est pas un domaine dans lequel je travail tous les jours et c’est facile de se rater, alors je prends volontiers vos avis et retours, merci :) .

David

(*) Je sais que la boucle de création des threads est toute bof, il faut aussi la changer.

+0 -0

Bonjour David,

Quand tu fais du std::cerr avec des threads, toujours pousser l’intégralité de ce qui doit être printé en un seul morceau, '\n' compris, avec un std::flush à la fin, sinon tu vas avoir des surprises avec des logs tout emmêlés et des retours à la ligne au mauvais endroit (typiquement tu sauras pas quel thread a printé quoi).

std::cerr << "Thread " + std::to_string(std::this_thread::get_id()) + " end loop\n" << std::flush;

Pour le reste, rien ne me choque au premier coup d’œil…

+1 -0

Salut,

Bonjour David,

Quand tu fais du std::cerr avec des threads, toujours pousser l’intégralité de ce qui doit être printé en un seul morceau, '\n' compris, avec un std::flush à la fin, sinon tu vas avoir des surprises avec des logs tout emmêlés et des retours à la ligne au mauvais endroit (typiquement tu sauras pas quel thread a printé quoi).

std::cerr << "Thread " + std::to_string(std::this_thread::get_id()) + " end loop\n" << std::flush;

Pour le reste, rien ne me choque au premier coup d’œil…

germinolegrand

oui, j’ai 2/3 endroits ou les logs se mélangent a la marge, mais ca va sauter a la fin, donc çà va ^^.

Bonjour,

La gestion parait correcte. Je vois juste un problème avec la fonction stop(), il faut ajouter un _block.notify_all(), sans cela les threads qui sont en attente au moment du stop() ne détecteront pas le changement et ne s’arrêteront pas.

dalfab

Merci. J’ai tellement la tête dans le guidon que j’y ait même pas pense >< … Comment ca se fait que ca ne coince pas alors ? Coup de chance tous les threads sont en train de bosser ?

Connectez-vous pour pouvoir poster un message.
Connexion

Pas encore membre ?

Créez un compte en une minute pour profiter pleinement de toutes les fonctionnalités de Zeste de Savoir. Ici, tout est gratuit et sans publicité.
Créer un compte