[C++] Optimisation d'un algorithme de fusion de variables

Le problème exposé dans ce sujet a été résolu.

Bonjour,

J’utilise dans mon code un algorithme destiné à fusionner deux bordures sur une carte se situant sur la même case (même position en X et en Y).

Pour ce faire, j’utilise un vecteur de variables Vector3 : vector<sf::Vector3i> myVec (qui constituent mes bordures) et le code suivant:

for(int i = 0; i < myVec.size(); i++) {
        for(int j = 0; j < myVec.size(); j++) {
            if(i != j && myVec.at(i).position == myVec.at(j).position){
                FusionnerBordures(&myVec, i,j);
            }
        }
}

En gros, pour chaque élement du vector, je parcours l’ensemble du vector et si les deux éléments sont sur la même position, je les fusionne.

Le problème, c’est que pour une grande carte, l’algorithme met beaucoup de temps à s’exécuter, de l’ordre de la seconde…

Ma question : Est-il possible d’optimiser l’algorithme pour réduire significativement le temps d’exécution, ou dois-je limiter les appels à cette fonction pour éviter de faire lagger le programme ?

Merci d’avance pour votre aide.

Humm, tu ne peux pas significativement améliorer ces deux boucles imbriquées. L’ordre de grandeur restera le même.

Par-contre, si on suppose que myVec.at(i).position == myVec.at(j).position == myVec.at(j).position == myVec.at(i).position (La comparaison est commutative, quasiment toujours si tu ne l’as pas explicitement définie autrement) alors tu peux diviser par deux comparaisons.

En effet, dans ce cas là, tu n’as qu’à tester les j supérieurs à i.

Sinon, peut-être que c’est ta fonction FusionnerBordures qui prend du temps.

+0 -0

Salut, comme l’a dit @ache, l’ordre de grandeur restera le même. Cependant, tu peux toujours booster les performances en utilisant l’hyper threading et le parallélisme. Tu peux faire des recherches là dessus sur google, mais pour avoir une idée, ça te donnera quelque chose qui ressemble à ça:

#pragma omp parallel for
for(int i = 0; i < myVec.size(); i++) {
    for(int j = 0; j < myVec.size(); j++) {
        if(i != j && myVec.at(i).position == myVec.at(j).position){
            FusionnerBordures(&myVec, i,j);
        }
    }
}

Si ma mémoire est bonne il faut ensuite compiler avec l’option -fopenmp. Quand c’est correctement utilisé et que ta machine le permet, les gains sont énormes

+0 -0

Tu peux séparer les secondes boucle en 2: ce qu’il y a avant j et ce qu’il y a après pour enlever la condition i != j.

Ensuite, ne surtout pas utiliser at() et passer par []. At() n’a aucun sens, c’est une fonction qui lance une exception lorsque l’indice est en dehors de la borne.

Utiliser des itérateurs et une boucle for-range à sans doute plus de sens ici. Le compilateur a aussi plus d’opportunités d’optimiser un parcours avec itérateur qu’avec indice. Mais cela va dépendre beaucoup de FusionnerBordures.

À quoi ressemble la fonction FusionnerBordures ? Pourquoi donner un pointeur sur myVec ?

Merci pour vos réponses très réactives !

Tu peux séparer les secondes boucle en 2: ce qu’il y a avant j et ce qu’il y a après pour enlever la condition i != j.

Ensuite, ne surtout pas utiliser at() et passer par []. At() n’a aucun sens, c’est une fonction qui lance une exception lorsque l’indice est en dehors de la borne.

Utiliser des itérateurs et une boucle for-range à sans doute plus de sens ici. Le compilateur a aussi plus d’opportunités d’optimiser un parcours avec itérateur qu’avec indice. Mais cela va dépendre beaucoup de FusionnerBordures.

À quoi ressemble la fonction FusionnerBordures ? Pourquoi donner un pointeur sur myVec ?

jo_link_noir

Alors j’ai écrit la fonction pour faire un code minimaliste, en réalité cette "fonction" n’existe pas et le code tient sur deux lignes :

cible->at(bo).z += cible->at(i).z % 100;
cible->erase(cible->begin() + i);

La première ligne fusionne les bordures en additionnant les deux dernières décimales du vecteur z. La seconde ligne supprime la deuxième bordure "fusionnée".

Du coup effectivement je pourrai déjà optimiser en ne regardant que les valeurs de j > i, merci.

Pour le at(), il me semblait que j’étais obligé du fait que j’utilise un pointeur sur mon vecteur, mais je vais voir pour modifier ça, merci.

Pour les itérateurs et la boucle for-range, je n’ai aucune idée de comment ça marche, mais je vais faire mes recherches, merci également.

Salut, comme l’a dit @ache, l’ordre de grandeur restera le même. Cependant, tu peux toujours booster les performances en utilisant l’hyper threading et le parallélisme. Tu peux faire des recherches là dessus sur google, mais pour avoir une idée, ça te donnera quelque chose qui ressemble à ça:

#pragma omp parallel for
for(int i = 0; i < myVec.size(); i++) {
    for(int j = 0; j < myVec.size(); j++) {
        if(i != j && myVec.at(i).position == myVec.at(j).position){
            FusionnerBordures(&myVec, i,j);
        }
    }
}

Si ma mémoire est bonne il faut ensuite compiler avec l’option -fopenmp. Quand c’est correctement utilisé et que ta machine le permet, les gains sont énormes

thibsc

Merci pour cette info je vais creuser, mais cela implique donc que cela ne marchera pas sur toutes les machines ? Cela ne résout donc pas mon problème si un nombre significatif de personnes aura toujours du lag.

Humm, tu ne peux pas significativement améliorer ces deux boucles imbriquées. L’ordre de grandeur restera le même.

Par-contre, si on suppose que myVec.at(i).position == myVec.at(j).position == myVec.at(j).position == myVec.at(i).position (La comparaison est commutative, quasiment toujours si tu ne l’as pas explicitement définie autrement) alors tu peux diviser par deux comparaisons.

En effet, dans ce cas là, tu n’as qu’à tester les j supérieurs à i.

Sinon, peut-être que c’est ta fonction FusionnerBordures qui prend du temps.

ache

Merci beaucoup, comme je disais à jo_link_noir plus haut, j’ai testé en changeant simplement int j = 0 par int j = i, et j’obtiens 3300 bordures fusionnées en 120ms contre 246ms avec l’ancien algo !

Merci beaucoup, comme je disais à jo_link_noir plus haut, j’ai testé en changeant simplement int j = 0 par int j = i, et j’obtiens 3300 bordures fusionnées en 120ms contre 246ms avec l’ancien algo !

TioLouis

L’idée c’est de faire int j = i + 1. Afin de ne pas avoir à faire le test i != j.

C’est effectivement le gain espéré. Une division par 2 du temps de calcul. Comme prévu quoi.


Edit: Si tu modifies la taille du vecteur pendant que tu le parcours, alors tu devrais parcourir ce tableau à l’envers la seconde fois.

+0 -0

L’idée c’est de faire int j = i + 1. Afin de ne pas avoir à faire le test i != j.

C’est effectivement le gain espéré. Une division par 2 du temps de calcul. Comme prévu quoi.


Edit: Si tu modifies la taille du vecteur pendant que tu le parcours, alors tu devrais parcourir ce tableau à l’envers la seconde fois.

ache

Merci pour les précisions, on est passé à 112ms ;)

Non, openMP vient avec le compilateur, si tu en utilise un digne de ce nom, tu l’auras et donc ça devrait fonctionner partout

thibsc

Ok merci pour les infos !

Les problèmes que je vois:

  • at() est mauvais pour les perfs. Même si le compilo voit qu’on est dans les clous, il ne va pas savoir vectoriser — maintenant, on n’est pas vraiment dans des cas où il y beaucoup de place pour bien vectoriser (taille qui change, array of structure).
  • Tous les erase vont provoquer des déplacements, dans le pire des cas un algo va se retrouver en O(N²). C’est pour cela, que l’on dit toujours de préférer l’idiome erase-remove qui fait passer en O(N). Je passerai la boucle interne en partition + remove de fait. (contrairement au erase remove, on a besoin de garder les positions équivalentes pour accumuler sur la 2e passe)
  • A propos, le erase peut faire passer la boucle externe en hors bornes. J’imagine que c’est pour cela que at() a été choisi au lieu de corriger l’algo.
  • Le modulo, il faut le faire avant ou après l’addition? Car là il est fait après.

BTW: pour qu’openmp serve, IIRC, il faut que la quantité d’itérations ne change pas en cours de route: il faut la connaître avant de commencer.

Du coup, quelque chose comme ça (pas testé)

// Pas parallélisable car la taille, et les éléments!, changent
for(std::size_t i = 0; i < myVec.size(); ++i) {
    // Par construction, on peut supposer que la condition ne sera jamais remplie pour i>j vu que l'on a déjà gommé ces positions
    auto const sep = std::partition(&myVec[i+1], &myVec[myVec.size()],
         [vi=myVec[i]](auto const& vj){ return vi.position != vj.position; });
    auto const acc = std::accumulate(sep, myVec.end(), myVec[i].z /* à caster éventuellement en long */, 
         [](auto acc, auto const& vj){ return acc + vj.z; });
    myVec[i].z = acc % 100; // mon hyppothèse est qu'il y avait un bug et que le modulo ne devrait être fait qu'une seule fois; par contre attention si le type de z est de 8 bits
    myVec.erase(sep, myVec.end());
}

42e EDIT: autre possibilité: faire un std::sort() sur les positions et écrire notre propre erase-remove qui écrase et fusionne à la volée (sachant qu’en plus les données sont triées) pour conclure chaque sous séquence de positions identiques par le modulo 100, et faire un seul v.erase(posdernier, v.end()) à la toute fin.

D’un point de vue de la complexité algorithmique, comparer tous les éléments pair à pair est en O(n²)O(n²). Mais il est possible de réduire ça en O(nlog(n))O(n * \log(n)) en triant la liste puisque tous les éléments à fusionner se retrouves les uns à côté des autres. Finalement, il est même possible d’utiliser une hash map pour identifier tous les éléments à fusionner en O(n)O(n).

En revanche, pour trouver la meilleur solution dans ton cas, il faut un peu plus d’information. Est-ce que les éléments de myVec sont coûteux à copier ou déplacer? Est-ce que l’ordre des éléments de myVec est important? Est-ce qu’il y a typiquement beaucoup d’éléments à fusionner en comparaison de la taille de myVec? Est-ce que tu préfères optimiser le cas courant au détriment potentiel de certains cas particuliers (par exemple faire en sorte que ce soit rapide s’il y a peu de fusions au risque que ce soit lent s’il y en a beaucoup à faire)?

+0 -0

Déjà, est-ce que déplacer les éléments de myVec posent problème ? Comme le dit @Berdes en triant les données (O(nlog(n))O(n*log(n))), tu peux ensuite les traités en O(n)O(n). Si déplacer les éléments posent problèmes, ce n’est pas forcément grave si tu peux les copier.

Et c’est tout à fait exacte, tu devrais tout à fait pouvoir utiliser une table de hashage, c’est même la solution que je privilégirais. Mais vu les points que @lmghs et moi avons pointé je pense que tu devrais étudier un peu plus le language avant de te lancer dans une implémentation utilisant une table de hashage.

Globalement, l’idée c’est que position soit une clé de la table de hashage et la valeur serait soit des références (au sens français, pas au sens C++, une liste d’indice) vers les éléments du tableau.
Ensuite comment tu l’utilises dépends de comment tu vois les choses.

Enfin, tu avais tout à fait raison en disant que l’Hyper-threading ne fonctionne pas sur toutes les machines puisque c’est une technologie Intel. Pour la parallélisation avec OpenMP, le gain apporté est d’un certain facteur (donc pas un gain algorithmique) qui peut être en fait une perte sur les ordinateurs ne pouvant pas paralléliser l’action. Pour finir @lmghs a fait remarqué qu’ici étant donné l’action, je doute que OpenMP arrive à paralléliser sans faire d’erreur.

Le modulo, il faut le faire avant ou après l’addition? Car là il est fait après.

C’est le contraire justement non ? Il est fait avant la somme, du coup, il faudrait le faire après.

+0 -0

Les problèmes que je vois:

  • at() est mauvais pour les perfs. Même si le compilo voit qu’on est dans les clous, il ne va pas savoir vectoriser — maintenant, on n’est pas vraiment dans des cas où il y beaucoup de place pour bien vectoriser (taille qui change, array of structure).
  • Tous les erase vont provoquer des déplacements, dans le pire des cas un algo va se retrouver en O(N²). C’est pour cela, que l’on dit toujours de préférer l’idiome erase-remove qui fait passer en O(N). Je passerai la boucle interne en partition + remove de fait. (contrairement au erase remove, on a besoin de garder les positions équivalentes pour accumuler sur la 2e passe)
  • A propos, le erase peut faire passer la boucle externe en hors bornes. J’imagine que c’est pour cela que at() a été choisi au lieu de corriger l’algo.
  • Le modulo, il faut le faire avant ou après l’addition? Car là il est fait après.

BTW: pour qu’openmp serve, IIRC, il faut que la quantité d’itérations ne change pas en cours de route: il faut la connaître avant de commencer.

Du coup, quelque chose comme ça (pas testé)

// Pas parallélisable car la taille, et les éléments!, changent
for(std::size_t i = 0; i < myVec.size(); ++i) {
    // Par construction, on peut supposer que la condition ne sera jamais remplie pour i>j vu que l'on a déjà gommé ces positions
    auto const sep = std::partition(&myVec[i+1], &myVec[myVec.size()],
         [vi=myVec[i]](auto const& vj){ return vi.position != vj.position; });
    auto const acc = std::accumulate(sep, myVec.end(), myVec[i].z /* à caster éventuellement en long */, 
         [](auto acc, auto const& vj){ return acc + vj.z; });
    myVec[i].z = acc % 100; // mon hyppothèse est qu'il y avait un bug et que le modulo ne devrait être fait qu'une seule fois; par contre attention si le type de z est de 8 bits
    myVec.erase(sep, myVec.end());
}

42e EDIT: autre possibilité: faire un std::sort() sur les positions et écrire notre propre erase-remove qui écrase et fusionne à la volée (sachant qu’en plus les données sont triées) pour conclure chaque sous séquence de positions identiques par le modulo 100, et faire un seul v.erase(posdernier, v.end()) à la toute fin.

lmghs

Merci pour ta réponse :

  1. J’utilise .at() tout simplement parce que je pensais que c’était le seul moyen d’accéder à la valeur d’un élément depuis un pointeur… j’utilise donc myVec->at(i)

  2. Je n’avais pas compris cette subtilité. Mais je n’ai jamais réussi à utiliser toutes ces fonctions (accumulate, remove…), c’est du charabia pour moi (comme le code que tu me proposes d’ailleurs, et que je n’arrive pas à compiler). Je vais essayer de prendre le temps pour comprendre ces fonctions mais je coince au niveau des [] return ...

  3. Le modulo est bien à sa place, puisque je cherche à additionner les deux derniers chiffres du .Z, qui correspondent à la bordure :

L’idée est tout simplement d’additionner les 4 bordures de base possibles (1,2,4,8) et d’obtenir la bordure finale (ex : 1 + 2 +8 = 11)

En revanche, pour trouver la meilleur solution dans ton cas, il faut un peu plus d’information. Est-ce que les éléments de myVec sont coûteux à copier ou déplacer? Est-ce que l’ordre des éléments de myVec est important? Est-ce qu’il y a typiquement beaucoup d’éléments à fusionner en comparaison de la taille de myVec? Est-ce que tu préfères optimiser le cas courant au détriment potentiel de certains cas particuliers (par exemple faire en sorte que ce soit rapide s’il y a peu de fusions au risque que ce soit lent s’il y en a beaucoup à faire)?

Berdes

Les éléments sont de simples Vector3 contenant en x la position x, y la position y, et z l’information de la bordure, au format AABBCC, AABB contenant des informations sur la bordure, et CC le type de bordure tel que décrit plus haut, entre 0 et 15. Sur une carte test que j’utilise, j’ai 4586 bordures et l’algorithme procède à 1319 fusions, donc environ 30% des éléments sont fusionnés. L’idée serait de réduire le temps d’exécution pour les grosses cartes ( > 2000 bordures), puisque sur les petites cartes le temps d’exécution est négligeable.

Déjà, est-ce que déplacer les éléments de myVec posent problème ? Comme le dit @Berdes en triant les données (O(nlog(n))O(n*log(n))), tu peux ensuite les traités en O(n)O(n). Si déplacer les éléments posent problèmes, ce n’est pas forcément grave si tu peux les copier.

Et c’est tout à fait exacte, tu devrais tout à fait pouvoir utiliser une table de hashage, c’est même la solution que je privilégirais. Mais vu les points que @lmghs et moi avons pointé je pense que tu devrais étudier un peu plus le language avant de te lancer dans une implémentation utilisant une table de hashage.

Globalement, l’idée c’est que position soit une clé de la table de hashage et la valeur serait soit des références (au sens français, pas au sens C++, une liste d’indice) vers les éléments du tableau.
Ensuite comment tu l’utilises dépends de comment tu vois les choses.

Enfin, tu avais tout à fait raison en disant que l’Hyper-threading ne fonctionne pas sur toutes les machines puisque c’est une technologie Intel. Pour la parallélisation avec OpenMP, le gain apporté est d’un certain facteur (donc pas un gain algorithmique) qui peut être en fait une perte sur les ordinateurs ne pouvant pas paralléliser l’action. Pour finir @lmghs a fait remarqué qu’ici étant donné l’action, je doute que OpenMP arrive à paralléliser sans faire d’erreur.

Le modulo, il faut le faire avant ou après l’addition? Car là il est fait après.

C’est le contraire justement non ? Il est fait avant la somme, du coup, il faudrait le faire après.

ache

L’idée serait donc de trier d’abord les bordures selon leurs positions ? Mais cet algorithme de tri ne va-t-il pas prendre autant de temps ? Est-il possible d’avoir une "clé" à deux éléments ? Puisque pour ma part "position" est en fait constitué par .x et .y

EDIT : Voici un export du vector bordures pour ceux qui ça intéresse : https://we.tl/t-W0o5J4MUVv (Pour tester l’algo il y a une dernière condition que je n’ai pas mentionné : Les 4 premiers chiffres du .z doivent être égaux - cad vec[i].z / 100 == vec[j].z / 100

+0 -0

J’utilise .at() tout simplement parce que je pensais que c’était le seul moyen d’accéder à la valeur d’un élément depuis un pointeur… j’utilise donc myVec->at(i)

Ce sera (*myVec)[i] donc, mais sache que manipuler des vecteurs via des pointeurs est totalement contre nature. Un vecteur est déjà une abstraction de pointeur, et en C++ moins on manipule directement les pointeurs, mieux on se porte en général.

Si c’est pour du passage de paramètre, utilise alors des références. Les vecteurs étant eux crées directement dans leur destination (sur la pile, ou comme membres), sans le moindre appel à new.

(comme le code que tu me proposes d’ailleurs, et que je n’arrive pas à compiler).

Vu la syntaxe que j’ai employée, il faut compiler en C++14. Je vérifie sur godbolt… et pardon c’est ma faute, j’avais mélangé itérateurs avec pointeurs. Cela deviendrait donc https://godbolt.org/z/x693v7xfq Mais ce ne serait pas ce qu’il te faut, cf plus bas.

Je vais essayer de prendre le temps pour comprendre ces fonctions mais je coince au niveau des [] return …

C’est des lambdas, introduites en C++11 (mais que j’utilise en syntaxe 2014 pour me simplifier la vie). Elle sont expliquées sur le tuto de ce site que je ne peux que recommander.

Le modulo est bien à sa place, puisque je cherche à additionner les deux derniers chiffres du .Z

Et sur le Z courant, tu n’appliques pas le modulo? (EDIT: effectivement non!)

Du coup dans mon code il faut virer le modulo final et bouger le modulo dans la lambda d'accumulate. —> https://godbolt.org/z/EWY95db9v

Mais cet algorithme de tri ne va-t-il pas prendre autant de temps ?

Il pendra bien moins de temps AMA que les N² déplacements en moyenne que ton algo actuel déclenche.

Mais… je regarde ton fichier et les points sont déjà triés en X… Comme je disais la fusion peut alors se faire linéairement par adaptation de l’algo std::remove() pour qu’il merge les Z à la volée en plus de seulement bouger le curseur de lecture en cas de valeur équivalente. Je regarde ça sur godbolt et j’éditerai mon message.

EDIT: hop: https://godbolt.org/z/ojvje9nbK (Après je ne l’ai pas testé sur des vraies données; il faudrait des tests unitaires et perfs ensuite)

EDIT2: correction: https://godbolt.org/z/rKeTThqd8

EDIT3: https://godbolt.org/z/oahMWPrro

EDIT4: On va y arriver: https://godbolt.org/z/M78sYcPff et j’avais oublié un +1 pour la partition

Alors là je suis impressionné… L’algorithme s’exécute en moins de 2ms pour 3300 bordures et tout semble fonctionner parfaitement.

Merci beaucoup je vais essayer de bien comprendre ce code, tes différents edits me permettent déjà de mieux comprendre les erreurs commises et donc le fonctionnement de tout ça ! Honnêtement je n’aurais jamais imaginé qu’il était possible d’optimiser à ce point, bravo…

Si c’est pour du passage de paramètre, utilise alors des références. Les vecteurs étant eux crées directement dans leur destination (sur la pile, ou comme membres), sans le moindre appel à new.

Merci, je pensais qu’il s’agissait plus ou moins de la même chose, j’aurais appris beaucoup aujourd’hui :)

{EDIT} Accessoirement, si tu as la garantie que les positions identiques sont contiguës dans le vecteur, nul besoin de trier. {/}

{EDIT2}Il devrait être encore possible d’aller un micro chouilla plus vite en n’encodant pas les informations dans une base 100, mais une base puissance de 2. Genre, réserve un octet entier pour la partie que tu fusionnes. Ainsi, au lieu d’un modulo par 100 (qui est cher), on peut faire un modulo 256, que le compilation traduira par un simple masque avec 0xff qui aura un prix négligeable.) Tu le sentiras potentiellement plus ailleurs qu’ici. {/}

Pour info, et montrer d’autres outils encore: Avec un passe de google.benchmark, cela donne sur ma machine (-O3 -march=native), code sur le dernier godbolt

-----------------------------------------------------------
Benchmark                 Time             CPU   Iterations
-----------------------------------------------------------
InitialVersion      5195473 ns      5195690 ns          108
WithPartition        581241 ns       581261 ns         1178
WithSortAndMerge      21154 ns        21155 ns        33352

(Soit un x33 par rapport à l’algo du départ sur les données que tu avais partagées, sur ma machine.)

Avec le code de bench supplémentaire:

auto position = [](vect_t const& v, std::size_t i) {
    return v.at(i).x * 256 + v.at(i).y;
};

void v0(vect_t & myVec) {
    for(int i = 0; i < myVec.size(); i++) {
        for(int j = 0; j < myVec.size(); j++) {
            if(i != j && position(myVec, i) == position(myVec, j)){
                myVec.at(i).z += myVec.at(j).z % 100;
                myVec.erase(myVec.begin()+j);
            }
        }
    }
}

static void InitialVersion(benchmark::State& state) {
  auto vect = V;
  for (auto _ : state) {
    v0(vect);
    // Make sure the variable is not optimized away by compiler
    benchmark::DoNotOptimize(vect);
  }
}
BENCHMARK(InitialVersion);


static void WithPartition(benchmark::State& state) {
  auto vect = V;
  for (auto _ : state) {
    v1(vect);
    // Make sure the variable is not optimized away by compiler
    benchmark::DoNotOptimize(vect);
  }
}
BENCHMARK(WithPartition);

static void WithSortAndMerge(benchmark::State& state) {
  auto vect = V;
  for (auto _ : state) {
    v2(vect);
    // Make sure the variable is not optimized away by compiler
    benchmark::DoNotOptimize(vect);
  }
}
BENCHMARK(WithSortAndMerge);

BENCHMARK_MAIN();
Connectez-vous pour pouvoir poster un message.
Connexion

Pas encore membre ?

Créez un compte en une minute pour profiter pleinement de toutes les fonctionnalités de Zeste de Savoir. Ici, tout est gratuit et sans publicité.
Créer un compte