cocosim_python/modele/donnees.py

356 lines
13 KiB
Python

import logging
class GraphStructException(Exception):
pass
class Noeud(object):
def __init__(self, donnees):
self.nom = donnees['nom']
self.donnees = donnees
self.type = donnees['type']
self.assoc_connecteurs = {}
def __str__(self):
return f"<Noeud : {self.nom}, {self.type}>"
def set_branche(self, direction, branche):
# Associe la branche correspondant à un connecteur
if direction in self.assoc_connecteurs:
self.assoc_connecteurs[direction].append(branche)
else:
self.assoc_connecteurs[direction] = [branche]
def get_branches(self, direction):
return self.assoc_connecteurs[direction]
def get_branche(self, direction):
c = self.get_branches(direction)
if len(c) == 1:
return c[0]
else:
raise GraphStructException("Invalid number of connectors")
def remove_branche(self, branche, direction):
self.assoc_connecteurs[direction].remove(branche)
class Arete(object):
def __init__(self, amont, amont_dir, aval, aval_dir):
self.amont = amont
self.amont_dir = amont_dir
self.aval = aval
self.aval_dir = aval_dir
def deconnecte(self):
self.amont.remove_branche(self, self.amont_dir)
self.aval.remove_branche(self, self.aval_dir)
self.amont = None
self.aval = None
self.amont_dir = None
self.aval_dir = None
def reconnecte_noeud(self, nsrc, ndir_src, ndest, ndir_dest):
"""
Déconnecte le noeud source, et le remplace par le noeud dest
"""
if nsrc == self.amont and self.amont_dir == ndir_src:
self.amont.remove_branche(self, self.amont_dir)
self.amont = ndest
self.amont_dir = ndir_dest
ndest.set_branche(ndir_dest,self)
elif nsrc == self.aval and self.aval_dir == ndir_src:
self.aval.remove_branche(self, self.aval_dir)
self.aval = ndest
self.aval_dir = ndir_dest
ndest.set_branche(ndir_dest,self)
else:
raise GraphStructException
def get_neighbour(self, noeud, direction):
if self.amont == noeud and self.amont_dir == direction:
return self.aval, self.aval_dir
elif self.aval == noeud and self.aval_dir == direction:
return self.amont, self.amont_dir
else:
raise GraphStructException("No such neighbour")
def get_amont(self):
return self.amont, self.amont_dir
def get_aval (self):
return self.aval, self.aval_dir
class Branche(Arete):
def __init__(self, amont, amont_dir, aval, aval_dir, composants=None):
super().__init__(amont, amont_dir, aval, aval_dir)
self.composants = [] if composants is None else composants
class Condensateur(object):
"""
Un condensateur correspond en fait à l'objet connecteur câblé à deux noeuds amonts et aval
C'est donc "à la fois un noeud et deux branches"
"""
def __init__(self, donnees):
self.nom = donnees['nom']
self.donnees = donnees
self.type = donnees['type']
self.amont = None
self.aval = None
self.amont_dir = None # le connecteur du noeud amont auquel on est raccordé
self.aval_dir = None # le connecteur du noeud aval auquel on est raccordé
def get_amont(self):
return self.amont, self.amont_dir
def get_aval (self):
return self.aval, self.aval_dir
class SchemaReader(object):
"""
A la lecture du graphe, tous les objets sont des noeuds et ils sont raccordés par des branches.
La simplification permet de supprimer les coudes.
La réduction des branches permet de transformer une série de branches et de composants en une seule branche
contentant les composants.
"""
REDUCTIBLE = ["ContactRepos", "Relais", "Tempo", "ContactTravail", "Bouton", "Resistance",
"Diode", "Lampe", "RelaisCote", "Levier"]
RIEN_A_FAIRE = ["P24", "P0", "Pulse", "Noeud", "Condensateur", "ContactBasculeur", "ContactDouble"]
ID_NOM = -1
def __init__(self, d):
self.noeuds = {}
self.condensateurs = {}
self.branches = []
self.composants = None
self.lire_donnees(d)
@classmethod
def nouveau_nom(cls, prefix=""):
"""
Crée un nouveau nom
"""
cls.ID_NOM += 1
return f"@{prefix}{cls.ID_NOM}"
def connecte(self, n1, d1, n2, d2, composants=None):
"""
Crée une branche et connecte deux noeuds avec
"""
a = Branche(n1, d1, n2, d2, composants=composants)
self.branches.append(a)
n1.set_branche(d1, a)
n2.set_branche(d2, a)
def connecte_condensateur_amont(self, cond, n1, d1):
cond.amont = n1
cond.amont_dir = d1
def connecte_condensateur_aval(self, cond, n1, d1):
cond.aval = n1
cond.aval_dir = d1
def remove_branche(self, branche):
self.branches.remove(branche)
branche.deconnecte()
return branche.composants
def remove_noeud(self, noeud):
del self.noeuds[noeud]
def lire_donnees(self, d):
"""
Lit le dictionnaire contenant le schéma
"""
# On commence par lire les noeuds
try:
for nom, donnees in d['blocs'].items():
donnees['nom'] = nom
self.noeuds[nom] = Noeud(donnees)
# On lit (et lie) ensuite les câbles
for amont, dir_amont, aval, dir_aval in d['cables']:
self.connecte(self.noeuds[amont], dir_amont, self.noeuds[aval], dir_aval)
except KeyError:
raise GraphStructException
def traiter_composites(self):
"""
Traite les noeuds composites (contacts doubles, relais basculeurs, etc.) et les transforme en objets simples.
"""
noeuds_a_supprimer = []
noeuds_a_ajouter = []
for nom, noeud in self.noeuds.items():
# Parcoure les noeuds en cherchant les noeuds composites
if noeud.type == "RelaisCote":
# On transforme en relais simple
noeud.type = "Relais"
if 'orientation' not in noeud.donnees or noeud.donnees['orientation'] == 0:
b_hg = noeud.get_branche("hg")
b_bg = noeud.get_branche("bg")
b_hg.reconnecte_noeud(noeud, "hg", noeud, "g")
b_bg.reconnecte_noeud(noeud, "bg", noeud, "d")
elif noeud.donnees['orientation'] == 180:
b_hd = noeud.get_branche("hd")
b_bd = noeud.get_branche("bd")
b_hd.reconnecte_noeud(noeud, "hd", noeud, "g")
b_bd.reconnecte_noeud(noeud, "bd", noeud, "d")
else:
assert False, "Rien à faire là"
elif noeud.type == "RelaisBasculeur":
noeuds_a_supprimer.append(nom)
rel_g = Noeud({'nom': noeud.nom + ".gauche", 'type': 'Relais',
'orientation': noeud.donnees['orientation'], 'basculeur': 'gauche'})
rel_d = Noeud({'nom': noeud.nom + ".droite", 'type': 'Relais',
'orientation': noeud.donnees['orientation'], 'basculeur': 'droite'})
noeuds_a_ajouter.append((noeud.nom + ".gauche", rel_g))
noeuds_a_ajouter.append((noeud.nom + ".droite", rel_d))
# On récupère les branches
br_hg = noeud.get_branche("hg")
br_bg = noeud.get_branche("bg")
br_hd = noeud.get_branche("hd")
br_bd = noeud.get_branche("bd")
# Puis les voisins
vois_hg, vois_hg_dir = br_hg.get_neighbour(noeud, "hg")
vois_bg, vois_bg_dir = br_bg.get_neighbour(noeud, "bg")
vois_hd, vois_hd_dir = br_hd.get_neighbour(noeud, "hd")
vois_bd, vois_bd_dir = br_bd.get_neighbour(noeud, "bd")
# On supprime les branches
cmp_hg = self.remove_branche(br_hg)
cmp_bg = self.remove_branche(br_bg)
cmp_hd = self.remove_branche(br_hd)
cmp_bd = self.remove_branche(br_bd)
# On lie les nouveaux noeuds
self.connecte(rel_g, "g", vois_hg, vois_hg_dir, cmp_hg)
self.connecte(rel_g, "d", vois_bg, vois_bg_dir, cmp_bg)
self.connecte(rel_d, "g", vois_hd, vois_hd_dir, cmp_hd)
self.connecte(rel_d, "d", vois_bd, vois_bd_dir, cmp_bd)
[self.remove_noeud(n) for n in noeuds_a_supprimer]
[self.noeuds.__setitem__(nom, valeur) for nom, valeur in noeuds_a_ajouter]
def simplifier(self):
"""
Simplifie le graphe :
- supprime les coudes
"""
# Supprime les coudes
coudes = []
for n in self.noeuds.values():
if n.type == 'Coude':
try:
a1, a2 = n.get_branches("m")
except ValueError:
raise GraphStructException("Un coude doit avoir deux voisins")
# On récupère les voisins
v1, d1 = a1.get_neighbour(n, "m")
v2, d2 = a2.get_neighbour(n, "m")
comp1 = self.remove_branche(a1)
comp2 = self.remove_branche(a2)
self.connecte(v1, d1, v2, d2, comp1 + comp2)
coudes.append(n)
[self.remove_noeud(n.nom) for n in coudes]
def traiter_condensateurs(self):
cond_list = []
for nom, noeud in self.noeuds.items():
if noeud.type == "Condensateur":
branche_g = noeud.get_branche("g")
branche_d = noeud.get_branche("d")
n1 = self.nouveau_nom()
nn_amont = Noeud({'nom': n1, 'type': 'Noeud'})
cond_list.append(nn_amont)
n2 = self.nouveau_nom()
nn_aval = Noeud({'nom': n2, 'type': 'Noeud'})
cond_list.append(nn_aval)
cond = Condensateur(noeud.donnees)
self.condensateurs[nom] = cond
cond.amont = nn_amont
cond.aval = nn_aval
# On raccorde le nouveau noeud amont au voisin amont initial
voisin_g, dir_g = branche_g.get_neighbour(noeud, "g")
composants_g = self.remove_branche(branche_g)
self.connecte(voisin_g, dir_g, nn_amont, "m", composants_g)
# Le noeud nouveau noeud amont au condensateur
self.connecte_condensateur_amont(cond, nn_amont, "m")
# Le nouveau noeud aval au condensateur
self.connecte_condensateur_aval(cond, nn_aval, "m")
# On raccorde le nouveau noeud aval à la branche aval initiale
voisin_d, dir_d = branche_d.get_neighbour(noeud, "d")
composants_d = self.remove_branche(branche_d)
self.connecte(voisin_d, dir_d, nn_aval, "m", composants_d)
for c in cond_list:
self.noeuds[c.nom] = c
def reduire_branches(self):
noeuds_a_supprimer = []
for nom, noeud in self.noeuds.items():
if noeud.type in self.REDUCTIBLE:
# Le composant est stockable dans la branche (composant simple en série)
b_gauche = noeud.get_branche("g")
b_droite = noeud.get_branche("d")
voisin_g, voisin_g_dir = b_gauche.get_neighbour(noeud, "g")
voisin_d, voisin_d_dir = b_droite.get_neighbour(noeud, "d")
comp_br_gauche = self.remove_branche(b_gauche)
comp_br_droite = self.remove_branche(b_droite)
self.connecte(voisin_g, voisin_g_dir, voisin_d, voisin_d_dir, comp_br_gauche + [noeud] + comp_br_droite)
noeuds_a_supprimer.append(nom)
if noeud.type == "Diode" and noeud.donnees['orientation'] == 180:
noeud.nom = "!" + noeud.nom
elif noeud.type == "Coude":
assert False, "Les coudes auraient dûs être éliminés !"
elif noeud.type in self.RIEN_A_FAIRE:
pass
else:
raise GraphStructException("Type inconnu : " + noeud.type)
for n in noeuds_a_supprimer:
del self.noeuds[n]
def make_liste_composants(self):
self.composants = {}
for b in self.branches:
for c in b.composants:
self.composants[c.nom] = c
def process(self):
"""
Méthode principale de traitement du graphe
"""
try:
self.simplifier()
self.traiter_composites()
self.traiter_condensateurs()
self.reduire_branches()
self.make_liste_composants()
except KeyError:
logging.exception("Erreur de chaînage dans le graphe")
raise GraphStructException