import logging class GraphStructException(Exception): pass class Noeud(object): def __init__(self, donnees): self.nom = donnees['nom'] self.donnees = donnees self.type = donnees['type'] self.assoc_connecteurs = {} def __str__(self): return f"" def set_branche(self, direction, branche): # Associe la branche correspondant à un connecteur if direction in self.assoc_connecteurs: self.assoc_connecteurs[direction].append(branche) else: self.assoc_connecteurs[direction] = [branche] def get_branches(self, direction): return self.assoc_connecteurs[direction] def get_branche(self, direction): c = self.get_branches(direction) if len(c) == 1: return c[0] else: raise GraphStructException("Invalid number of connectors") def remove_branche(self, branche, direction): self.assoc_connecteurs[direction].remove(branche) class Arete(object): def __init__(self, amont, amont_dir, aval, aval_dir): self.amont = amont self.amont_dir = amont_dir self.aval = aval self.aval_dir = aval_dir def deconnecte(self): self.amont.remove_branche(self, self.amont_dir) self.aval.remove_branche(self, self.aval_dir) self.amont = None self.aval = None self.amont_dir = None self.aval_dir = None def reconnecte_noeud(self, nsrc, ndir_src, ndest, ndir_dest): """ Déconnecte le noeud source, et le remplace par le noeud dest """ if nsrc == self.amont and self.amont_dir == ndir_src: self.amont.remove_branche(self, self.amont_dir) self.amont = ndest self.amont_dir = ndir_dest ndest.set_branche(ndir_dest,self) elif nsrc == self.aval and self.aval_dir == ndir_src: self.aval.remove_branche(self, self.aval_dir) self.aval = ndest self.aval_dir = ndir_dest ndest.set_branche(ndir_dest,self) else: raise GraphStructException def get_neighbour(self, noeud, direction): if self.amont == noeud and self.amont_dir == direction: return self.aval, self.aval_dir elif self.aval == noeud and self.aval_dir == direction: return self.amont, self.amont_dir else: raise GraphStructException("No such neighbour") def get_amont(self): return self.amont, self.amont_dir def get_aval (self): return self.aval, self.aval_dir class Branche(Arete): def __init__(self, amont, amont_dir, aval, aval_dir, composants=None): super().__init__(amont, amont_dir, aval, aval_dir) self.composants = [] if composants is None else composants class Condensateur(object): """ Un condensateur correspond en fait à l'objet connecteur câblé à deux noeuds amonts et aval C'est donc "à la fois un noeud et deux branches" """ def __init__(self, donnees): self.nom = donnees['nom'] self.donnees = donnees self.type = donnees['type'] self.amont = None self.aval = None self.amont_dir = None # le connecteur du noeud amont auquel on est raccordé self.aval_dir = None # le connecteur du noeud aval auquel on est raccordé def get_amont(self): return self.amont, self.amont_dir def get_aval (self): return self.aval, self.aval_dir class SchemaReader(object): """ A la lecture du graphe, tous les objets sont des noeuds et ils sont raccordés par des branches. La simplification permet de supprimer les coudes. La réduction des branches permet de transformer une série de branches et de composants en une seule branche contentant les composants. """ REDUCTIBLE = ["ContactRepos", "Relais", "Tempo", "ContactTravail", "Bouton", "Resistance", "Diode", "Lampe", "RelaisCote", "Levier"] RIEN_A_FAIRE = ["P24", "P0", "Pulse", "Noeud", "Condensateur", "ContactBasculeur", "ContactDouble"] ID_NOM = -1 def __init__(self, d): self.noeuds = {} self.condensateurs = {} self.branches = [] self.composants = None self.lire_donnees(d) @classmethod def nouveau_nom(cls, prefix=""): """ Crée un nouveau nom """ cls.ID_NOM += 1 return f"@{prefix}{cls.ID_NOM}" def connecte(self, n1, d1, n2, d2, composants=None): """ Crée une branche et connecte deux noeuds avec """ a = Branche(n1, d1, n2, d2, composants=composants) self.branches.append(a) n1.set_branche(d1, a) n2.set_branche(d2, a) def connecte_condensateur_amont(self, cond, n1, d1): cond.amont = n1 cond.amont_dir = d1 def connecte_condensateur_aval(self, cond, n1, d1): cond.aval = n1 cond.aval_dir = d1 def remove_branche(self, branche): self.branches.remove(branche) branche.deconnecte() return branche.composants def remove_noeud(self, noeud): del self.noeuds[noeud] def lire_donnees(self, d): """ Lit le dictionnaire contenant le schéma """ # On commence par lire les noeuds try: for nom, donnees in d['blocs'].items(): donnees['nom'] = nom self.noeuds[nom] = Noeud(donnees) # On lit (et lie) ensuite les câbles for amont, dir_amont, aval, dir_aval in d['cables']: self.connecte(self.noeuds[amont], dir_amont, self.noeuds[aval], dir_aval) except KeyError: raise GraphStructException def traiter_composites(self): """ Traite les noeuds composites (contacts doubles, relais basculeurs, etc.) et les transforme en objets simples. """ noeuds_a_supprimer = [] noeuds_a_ajouter = [] for nom, noeud in self.noeuds.items(): # Parcoure les noeuds en cherchant les noeuds composites if noeud.type == "RelaisCote": # On transforme en relais simple noeud.type = "Relais" if 'orientation' not in noeud.donnees or noeud.donnees['orientation'] == 0: b_hg = noeud.get_branche("hg") b_bg = noeud.get_branche("bg") b_hg.reconnecte_noeud(noeud, "hg", noeud, "g") b_bg.reconnecte_noeud(noeud, "bg", noeud, "d") elif noeud.donnees['orientation'] == 180: b_hd = noeud.get_branche("hd") b_bd = noeud.get_branche("bd") b_hd.reconnecte_noeud(noeud, "hd", noeud, "g") b_bd.reconnecte_noeud(noeud, "bd", noeud, "d") else: assert False, "Rien à faire là" elif noeud.type == "RelaisBasculeur": noeuds_a_supprimer.append(nom) rel_g = Noeud({'nom': noeud.nom + ".gauche", 'type': 'Relais', 'orientation': noeud.donnees['orientation'], 'basculeur': 'gauche'}) rel_d = Noeud({'nom': noeud.nom + ".droite", 'type': 'Relais', 'orientation': noeud.donnees['orientation'], 'basculeur': 'droite'}) noeuds_a_ajouter.append((noeud.nom + ".gauche", rel_g)) noeuds_a_ajouter.append((noeud.nom + ".droite", rel_d)) # On récupère les branches br_hg = noeud.get_branche("hg") br_bg = noeud.get_branche("bg") br_hd = noeud.get_branche("hd") br_bd = noeud.get_branche("bd") # Puis les voisins vois_hg, vois_hg_dir = br_hg.get_neighbour(noeud, "hg") vois_bg, vois_bg_dir = br_bg.get_neighbour(noeud, "bg") vois_hd, vois_hd_dir = br_hd.get_neighbour(noeud, "hd") vois_bd, vois_bd_dir = br_bd.get_neighbour(noeud, "bd") # On supprime les branches cmp_hg = self.remove_branche(br_hg) cmp_bg = self.remove_branche(br_bg) cmp_hd = self.remove_branche(br_hd) cmp_bd = self.remove_branche(br_bd) # On lie les nouveaux noeuds self.connecte(rel_g, "g", vois_hg, vois_hg_dir, cmp_hg) self.connecte(rel_g, "d", vois_bg, vois_bg_dir, cmp_bg) self.connecte(rel_d, "g", vois_hd, vois_hd_dir, cmp_hd) self.connecte(rel_d, "d", vois_bd, vois_bd_dir, cmp_bd) [self.remove_noeud(n) for n in noeuds_a_supprimer] [self.noeuds.__setitem__(nom, valeur) for nom, valeur in noeuds_a_ajouter] def simplifier(self): """ Simplifie le graphe : - supprime les coudes """ # Supprime les coudes coudes = [] for n in self.noeuds.values(): if n.type == 'Coude': try: a1, a2 = n.get_branches("m") except ValueError: raise GraphStructException("Un coude doit avoir deux voisins") # On récupère les voisins v1, d1 = a1.get_neighbour(n, "m") v2, d2 = a2.get_neighbour(n, "m") comp1 = self.remove_branche(a1) comp2 = self.remove_branche(a2) self.connecte(v1, d1, v2, d2, comp1 + comp2) coudes.append(n) [self.remove_noeud(n.nom) for n in coudes] def traiter_condensateurs(self): cond_list = [] for nom, noeud in self.noeuds.items(): if noeud.type == "Condensateur": branche_g = noeud.get_branche("g") branche_d = noeud.get_branche("d") n1 = self.nouveau_nom() nn_amont = Noeud({'nom': n1, 'type': 'Noeud'}) cond_list.append(nn_amont) n2 = self.nouveau_nom() nn_aval = Noeud({'nom': n2, 'type': 'Noeud'}) cond_list.append(nn_aval) cond = Condensateur(noeud.donnees) self.condensateurs[nom] = cond cond.amont = nn_amont cond.aval = nn_aval # On raccorde le nouveau noeud amont au voisin amont initial voisin_g, dir_g = branche_g.get_neighbour(noeud, "g") composants_g = self.remove_branche(branche_g) self.connecte(voisin_g, dir_g, nn_amont, "m", composants_g) # Le noeud nouveau noeud amont au condensateur self.connecte_condensateur_amont(cond, nn_amont, "m") # Le nouveau noeud aval au condensateur self.connecte_condensateur_aval(cond, nn_aval, "m") # On raccorde le nouveau noeud aval à la branche aval initiale voisin_d, dir_d = branche_d.get_neighbour(noeud, "d") composants_d = self.remove_branche(branche_d) self.connecte(voisin_d, dir_d, nn_aval, "m", composants_d) for c in cond_list: self.noeuds[c.nom] = c def reduire_branches(self): noeuds_a_supprimer = [] for nom, noeud in self.noeuds.items(): if noeud.type in self.REDUCTIBLE: # Le composant est stockable dans la branche (composant simple en série) b_gauche = noeud.get_branche("g") b_droite = noeud.get_branche("d") voisin_g, voisin_g_dir = b_gauche.get_neighbour(noeud, "g") voisin_d, voisin_d_dir = b_droite.get_neighbour(noeud, "d") comp_br_gauche = self.remove_branche(b_gauche) comp_br_droite = self.remove_branche(b_droite) self.connecte(voisin_g, voisin_g_dir, voisin_d, voisin_d_dir, comp_br_gauche + [noeud] + comp_br_droite) noeuds_a_supprimer.append(nom) if noeud.type == "Diode" and noeud.donnees['orientation'] == 180: noeud.nom = "!" + noeud.nom elif noeud.type == "Coude": assert False, "Les coudes auraient dûs être éliminés !" elif noeud.type in self.RIEN_A_FAIRE: pass else: raise GraphStructException("Type inconnu : " + noeud.type) for n in noeuds_a_supprimer: del self.noeuds[n] def make_liste_composants(self): self.composants = {} for b in self.branches: for c in b.composants: self.composants[c.nom] = c def process(self): """ Méthode principale de traitement du graphe """ try: self.simplifier() self.traiter_composites() self.traiter_condensateurs() self.reduire_branches() self.make_liste_composants() except KeyError: logging.exception("Erreur de chaînage dans le graphe") raise GraphStructException