# Study.py -- Pamhyr study checkers
# Copyright (C) 2023-2024 INRAE
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

# -*- coding: utf-8 -*-

import time
import logging
from functools import reduce

from PyQt5.QtCore import QCoreApplication

from Modules import Modules
from Checker.Checker import AbstractModelChecker, STATUS

_translate = QCoreApplication.translate

logger = logging.getLogger()


class StudyNetworkReachChecker(AbstractModelChecker):
    """Checker verifying that the study has at least one enabled reach."""

    def __init__(self):
        super().__init__()

        self._name = _translate("Checker", "Study network reach checker")
        self._description = _translate(
            "Checker", "Check if exists at least one reach for study"
        )
        self._modules = Modules.NETWORK

    def run(self, study) -> bool:
        """Run the check on ``study``.

        Returns False (leaving status/summary as set by ``basic_check``)
        when ``basic_check`` rejects the study, False with an ERROR status
        and the summary "no_reach_defined" when no edge of ``study.river``
        is enabled, and True with an OK status otherwise.
        """
        if not self.basic_check(study):
            return False

        # Only existence matters, so stop at the first enabled edge.
        if not any(edge.is_enable() for edge in study.river.edges()):
            self._status = STATUS.ERROR
            self._summary = "no_reach_defined"
            return False

        self._summary = "ok"
        self._status = STATUS.OK
        return True


class StudyGeometryChecker(AbstractModelChecker):
    """Checker verifying that each enabled reach has a geometry.

    A reach is considered to have no geometry when it holds fewer than
    two profiles. The offending edges of the last run are kept in
    ``self._reachs``.
    """

    def __init__(self):
        super().__init__()

        self._name = _translate("Checker", "Study geometry checker")
        self._description = _translate(
            "Checker", "Check if exists geometry for each reach of study"
        )
        self._modules = Modules.GEOMETRY
        # Edges found without geometry during the last call to run().
        self._reachs = []

    def run(self, study) -> bool:
        """Run the check on ``study``.

        Returns False when ``basic_check`` fails, when the river has no
        enabled edge (summary "no_reach_defined") or when at least one
        enabled edge has fewer than two profiles (summary
        "no_geometry_defined"); returns True with an OK status otherwise.
        """
        # Start from a clean list so that running the checker again does
        # not accumulate duplicated or stale edges from a previous run.
        self._reachs = []

        if not self.basic_check(study):
            return False

        edges = study.river.enable_edges()
        if len(edges) == 0:
            self._status = STATUS.ERROR
            self._summary = "no_reach_defined"
            return False

        summary = "ok"
        status = STATUS.OK
        ok = True

        for edge in edges:
            if len(edge.reach.profiles) < 2:
                summary = "no_geometry_defined"
                status = STATUS.ERROR
                ok = False
                self._reachs.append(edge)

        self._summary = summary
        self._status = status
        return ok


class StudyInitialConditionsChecker(AbstractModelChecker):
    """Checker comparing initial conditions with the profiles of each reach."""

    def __init__(self):
        super().__init__()

        self._name = _translate("Checker", "Study initial conditions checker")
        self._description = _translate(
            "Checker", "Check initial conditions for each node of study"
        )
        self._modules = Modules.INITIAL_CONDITION

    def run(self, study) -> bool:
        """Run the check on every enabled edge of ``study.river``.

        Returns False when ``basic_check`` fails or when at least one
        reach is reported as an error by ``check_reach``; True otherwise
        (possibly with a WARNING status).
        """
        self._summary = "ok"
        self._status = STATUS.OK

        if not self.basic_check(study):
            return False

        ok = True
        for edge in study.river.enable_edges():
            # Every reach is checked, even after a first failure.
            ok &= self.check_reach(study, edge.reach)

        return ok

    def _set_warning(self, summary):
        """Record a warning unless an error has already been recorded.

        Reaches are checked one after the other on the same checker, so a
        warning raised for a later reach must not hide an error raised
        for an earlier one.
        """
        if self._status != STATUS.ERROR:
            self._summary = summary
            self._status = STATUS.WARNING

    def check_reach(self, study, reach) -> bool:
        """Check the initial conditions of a single ``reach``.

        - No entry for ``reach`` in ``study.river.initial_conditions``:
          warning "missing_initial_condition_defined", returns True.
        - Fewer initial conditions than ``len(reach)``: warning
          "initial_condition_missing_profile", returns True.
        - More initial conditions than ``len(reach)``: error
          "more_initial_condition_than_profile", returns False.
        """
        river = study.river

        if reach not in river.initial_conditions:
            self._set_warning("missing_initial_condition_defined")
            return True

        len_ic = len(river.initial_conditions[reach])
        len_reach = len(reach)

        if len_ic < len_reach:
            self._set_warning("initial_condition_missing_profile")
            return True

        if len_ic > len_reach:
            self._summary = "more_initial_condition_than_profile"
            self._status = STATUS.ERROR
            return False

        return True


class StudyBoundaryConditionChecker(AbstractModelChecker):
    """Checker verifying liquid boundary conditions on boundary nodes."""

    def __init__(self):
        super().__init__()

        self._name = _translate("Checker", "Study boundary conditions checker")
        self._description = _translate(
            "Checker", "Check boundary conditions for each node of study"
        )
        self._modules = Modules.BOUNDARY_CONDITION

    def run(self, study) -> bool:
        """Run the check on ``study``.

        Returns False when ``basic_check`` fails, otherwise the result of
        ``check_liquid``.
        """
        self._summary = "ok"
        self._status = STATUS.OK

        if not self.basic_check(study):
            return False

        return self.check_liquid(study)

    def check_liquid(self, study) -> bool:
        """Check that every enabled boundary node has a liquid condition.

        Returns False with an ERROR status when the 'liquid' tab of the
        river boundary conditions is empty (summary
        "no_boundary_condition_defined") or when an enabled upstream or
        downstream node is not referenced by any of these conditions
        (summary "no_boundary_condition_at_boundary_node").
        """
        river = study.river

        bcs = river.boundary_condition.get_tab('liquid')
        if len(bcs) == 0:
            self._status = STATUS.ERROR
            self._summary = "no_boundary_condition_defined"
            return False

        enabled_nodes = [
            node for node in river.nodes() if river.is_enable_node(node)
        ]
        upstream = [n for n in enabled_nodes if river.is_upstream_node(n)]
        downstream = [n for n in enabled_nodes if river.is_downstream_node(n)]

        bcs_nodes = {bc.node for bc in bcs}

        upstream_ok = self.check_liquid_all_node_has_bc(bcs_nodes, upstream)
        downstream_ok = self.check_liquid_all_node_has_bc(bcs_nodes, downstream)

        ok = upstream_ok and downstream_ok
        if not ok:
            self._status = STATUS.ERROR
            self._summary = "no_boundary_condition_at_boundary_node"

        return ok

    def check_liquid_all_node_has_bc(self, bcs_nodes, nodes) -> bool:
        """Return True when every node of ``nodes`` is in ``bcs_nodes``.

        An empty ``nodes`` yields True.
        """
        return all(node in bcs_nodes for node in nodes)


class DummyOK(AbstractModelChecker):
    """Dummy checker which sleeps one second and reports an OK status."""

    def __init__(self):
        super().__init__()

        self._name = _translate("Checker", "Dummy ok")
        self._description = _translate("Checker", "Dummy ok")

    def run(self, study) -> bool:
        """Sleep one second, set an OK status and return True."""
        time.sleep(1)

        self._summary = "ok"
        self._status = STATUS.OK
        return True


class DummyWARNING(AbstractModelChecker):
    """Dummy checker which sleeps one second and reports a WARNING status."""

    def __init__(self):
        super().__init__()

        self._name = _translate("Checker", "Dummy warning")
        self._description = _translate("Checker", "Dummy warning")

    def run(self, study) -> bool:
        """Sleep one second, set a WARNING status and return True."""
        time.sleep(1)

        self._summary = "Warning detected"
        self._status = STATUS.WARNING
        return True


class DummyERROR(AbstractModelChecker):
    """Dummy checker which sleeps one second and reports an ERROR status."""

    def __init__(self):
        super().__init__()

        self._name = _translate("Checker", "Dummy error")
        self._description = _translate("Checker", "Dummy error")

    def run(self, study) -> bool:
        """Sleep one second, set an ERROR status and return True."""
        time.sleep(1)

        self._summary = "Error detected"
        self._status = STATUS.ERROR
        # NOTE(review): returns True although the status is ERROR, unlike
        # the study checkers of this module -- presumably intended for
        # this dummy checker; confirm before changing.
        return True