diff --git a/src/Model/Network/Graph.py b/src/Model/Network/Graph.py index c96345b1242d6911ffb2de58b6151e6ccb7bfc09..13e6d66c879c0989f7c652e670850c7bc73a22d2 100644 --- a/src/Model/Network/Graph.py +++ b/src/Model/Network/Graph.py @@ -48,10 +48,14 @@ class Graph(object): def enable_edges(self): return list( - filter( - lambda e: e.is_enable(), - self._edges - ) + self._enable_edges() + ) + + def _enable_edges(self): + """Return a generator""" + return filter( + lambda e: e.is_enable(), + self._edges ) def edges_names(self): @@ -138,8 +142,20 @@ class Graph(object): self._nodes ) ) + self._remove_associated_edge(node_name) self._status.modified() + def _remove_associated_edge(self, node_name: str): + edges = list( + filter( + lambda e: (e.node1.name == node_name or + e.node2.name == node_name), + self._edges, + ) + ) + for edge in edges: + self.remove_edge(edge.name) + def create_node(self, x: float = 0.0, y: float = 0.0): node = self._create_node(x, y) return node @@ -186,14 +202,14 @@ class Graph(object): def is_upstream_node(self, node): return reduce( lambda acc, e: (acc and (e.node2 != node or not e.enable)), - self._edges, + self._enable_edges(), True ) def is_downstream_node(self, node): return reduce( lambda acc, e: (acc and (e.node1 != node or not e.enable)), - self._edges, + self._enable_edges(), True ) @@ -203,10 +219,9 @@ class Graph(object): acc or ( (e.node1 == node or e.node2 == node) - and e._enable ) ), - self._edges, + self._enable_edges(), False ) diff --git a/src/Model/Network/__init__.py b/src/Model/Network/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..89e3fbe8b5c9d03daa6dcdb50838fb054f67a946 --- /dev/null +++ b/src/Model/Network/__init__.py @@ -0,0 +1,17 @@ +# __init__.py -- Pamhyr +# Copyright (C) 2023 INRAE +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <https://www.gnu.org/licenses/>. + +# -*- coding: utf-8 -*- diff --git a/src/Model/Network/test_network.py b/src/Model/Network/test_network.py new file mode 100644 index 0000000000000000000000000000000000000000..a232e9ca7986a26fd772f6b21e2af627fae2dde6 --- /dev/null +++ b/src/Model/Network/test_network.py @@ -0,0 +1,182 @@ +# test_Network.py -- Pamhyr +# Copyright (C) 2023 INRAE +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <https://www.gnu.org/licenses/>. + +# -*- coding: utf-8 -*- + +import os +import unittest +import tempfile + +from Model.Saved import SavedStatus + +from Model.Network.Graph import Graph +from Model.Network.Edge import Edge +from Model.Network.Node import Node + + +def new_graph(): + status = SavedStatus() + g = Graph(status=status) + + return g + + +class GraphTestCase(unittest.TestCase): + def test_graph_0(self): + g = new_graph() + + self.assertEqual(g.nodes_counts(), 0) + self.assertEqual(g.edges_counts(), 0) + + def test_graph_1(self): + g = new_graph() + + n0 = g.add_node() + n1 = g.add_node() + + e0 = g.add_edge(n0, n1) + + self.assertEqual(g.nodes_counts(), 2) + self.assertEqual(g.edges_counts(), 1) + + def test_graph_2(self): + g = new_graph() + + n0 = g.add_node() + n1 = g.add_node() + + e0 = g.add_edge(n0, n1) + + g.remove_edge(e0.name) + + self.assertEqual(g.nodes_counts(), 2) + self.assertEqual(g.edges_counts(), 0) + + def test_graph_3(self): + g = new_graph() + + n0 = g.add_node() + n1 = g.add_node() + + e0 = g.add_edge(n0, n1) + + g.remove_node(n0.name) + + self.assertEqual(g.nodes_counts(), 1) + self.assertEqual(g.edges_counts(), 0) + + def test_graph_4(self): + g = new_graph() + + n0 = g.add_node() + n1 = g.add_node() + n2 = g.add_node() + + e0 = g.add_edge(n0, n1) + e1 = g.add_edge(n1, n2) + + g.remove_node(n1.name) + + self.assertEqual(g.nodes_counts(), 2) + self.assertEqual(g.edges_counts(), 0) + + def test_graph_upstream_0(self): + g = new_graph() + + n0 = g.add_node() + n1 = g.add_node() + n2 = g.add_node() + + e0 = g.add_edge(n0, n1) + e1 = g.add_edge(n1, n2) + + self.assertEqual(g.is_upstream_node(n0), True) + self.assertEqual(g.is_upstream_node(n1), False) + self.assertEqual(g.is_upstream_node(n2), False) + + def test_graph_upstream_1(self): + g = new_graph() + + n0 = g.add_node() + n1 = g.add_node() + n2 = g.add_node() + + e0 = g.add_edge(n0, n1) + e1 = g.add_edge(n2, n1) + + self.assertEqual(g.is_upstream_node(n0), True) + self.assertEqual(g.is_upstream_node(n1), False) + self.assertEqual(g.is_upstream_node(n2), True) + + def test_graph_upstream_disable(self): + g = new_graph() + + n0 = g.add_node() + n1 = g.add_node() + n2 = g.add_node() + + e0 = g.add_edge(n0, n1) + e1 = g.add_edge(n1, n2) + + e0.disable() + + # self.assertEqual(g.is_upstream_node(n0), False) + self.assertEqual(g.is_upstream_node(n1), True) + self.assertEqual(g.is_upstream_node(n2), False) + + def test_graph_downstream_0(self): + g = new_graph() + + n0 = g.add_node() + n1 = g.add_node() + n2 = g.add_node() + + e0 = g.add_edge(n0, n1) + e1 = g.add_edge(n1, n2) + + self.assertEqual(g.is_downstream_node(n0), False) + self.assertEqual(g.is_downstream_node(n1), False) + self.assertEqual(g.is_downstream_node(n2), True) + + def test_graph_downstream_1(self): + g = new_graph() + + n0 = g.add_node() + n1 = g.add_node() + n2 = g.add_node() + + e0 = g.add_edge(n0, n1) + e1 = g.add_edge(n2, n1) + + self.assertEqual(g.is_downstream_node(n0), False) + self.assertEqual(g.is_downstream_node(n1), True) + self.assertEqual(g.is_downstream_node(n2), False) + + def test_graph_downstream_disable(self): + g = new_graph() + + n0 = g.add_node() + n1 = g.add_node() + n2 = g.add_node() + + e0 = g.add_edge(n0, n1) + e1 = g.add_edge(n1, n2) + + e1.disable() + + self.assertEqual(g.is_downstream_node(n0), False) + self.assertEqual(g.is_downstream_node(n1), True) + # self.assertEqual(g.is_downstream_node(n2), False)