# MainWindow.py -- Pamhyr
# Copyright (C) 2023-2024  INRAE
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

# -*- coding: utf-8 -*-

"""Pamhyr main application window.

Defines ``ApplicationWindow``, the top level Qt window which owns the
current study, wires menu/toolbar actions to their callbacks and opens
the different edition sub windows.
"""

import os
import sys
import logging
import subprocess
from queue import Queue

from PyQt5 import QtGui
from PyQt5.QtGui import (
    QKeySequence, QDesktopServices,
)
from PyQt5.QtCore import (
    Qt, QTranslator, QEvent, QUrl, QTimer,
    QCoreApplication,
)
from PyQt5.QtWidgets import (
    QMainWindow, QApplication, QAction,
    QFileDialog, QShortcut, QMenu, QToolBar,
    QMessageBox, QProgressDialog,
)
from PyQt5.uic import loadUi

from View.Tools.ASubWindow import WindowToolKit
from View.Tools.ListedSubWindow import ListedSubWindow
from View.DummyWindow import DummyWindow

from View.Translate import MainTranslate

from View.Configure.Window import ConfigureWindow
from View.Study.Window import NewStudyWindow
from View.About.Window import AboutWindow
from View.Network.Window import NetworkWindow
from View.Geometry.Window import GeometryWindow
from View.BoundaryCondition.Window import BoundaryConditionWindow
from View.Reservoir.Window import ReservoirWindow
from View.HydraulicStructures.Window import HydraulicStructuresWindow
from View.LateralContribution.Window import LateralContributionWindow
from View.InitialConditions.Window import InitialConditionsWindow
from View.Stricklers.Window import StricklersWindow
from View.Frictions.Window import FrictionsWindow
from View.SedimentLayers.Window import SedimentLayersWindow
from View.SedimentLayers.Reach.Window import ReachSedimentLayersWindow
from View.SolverParameters.Window import SolverParametersWindow
from View.RunSolver.Window import SelectSolverWindow, SolverLogWindow
from View.CheckList.Window import CheckListWindow
from View.Results.Window import ResultsWindow
from View.Results.ReadingResultsDialog import ReadingResultsDialog
from View.Debug.Window import ReplWindow

# Optional internal display of the documentation, to make the
# application package lighter...
try:
    from View.Doc.Window import DocWindow
    _doc = "internal"
except Exception as e:
    # The internal viewer is optional: fall back on the system viewer.
    print(f"Handle exception: {e}")
    _doc = "external"

from Model.Study import Study

logger = logging.getLogger()

# Actions enabled only when NO study is open
no_model_action = [
    "action_menu_new", "action_menu_open", "action_menu_import_mage",
    "action_menu_import_rubarbe", "action_toolBar_open",
]

# Actions enabled only when a study is open
model_action = [
    "action_menu_close", "action_menu_edit", "action_menu_save",
    "action_menu_save_as", "action_toolBar_close", "action_toolBar_save",
    "action_menu_numerical_parameter", "action_open_results_from_file",
]

other_model_action = [
    "action_toolBar_run_solver", "action_toolBar_kill_solver"
]

# Study edition actions, enabled only when a study is open
define_model_action = [
    # Toolbar
    "action_toolBar_network", "action_toolBar_geometry",
    "action_toolBar_mesh", "action_toolBar_run_meshing_tool",
    "action_toolBar_boundary_cond", "action_toolBar_lateral_contrib",
    "action_toolBar_spills", "action_toolBar_frictions",
    "action_toolBar_stricklers", "action_toolBar_building",
    "action_toolBar_initial_cond",
    # Menu
    "action_menu_run_solver", "action_menu_numerical_parameter",
    "action_menu_edit_network", "action_menu_edit_geometry",
    "action_menu_boundary_conditions", "action_menu_initial_conditions",
    "action_menu_edit_friction", "action_menu_edit_lateral_contribution",
    "action_menu_run_solver", "action_menu_sediment_layers",
    "action_menu_edit_reach_sediment_layers", "action_menu_edit_reservoirs",
    "action_menu_edit_hydraulic_structures",
    "action_menu_results_last", "action_open_results_from_file",
    "action_menu_boundary_conditions_sediment",
]

action = (
    no_model_action + model_action + define_model_action
)


class ApplicationWindow(QMainWindow, ListedSubWindow, WindowToolKit):
    """Main window of the application

    Holds the application configuration, the current study (if any)
    and the last solver/results, and opens every edition sub window.
    """

    def __init__(self, conf=None):
        """Build the main window from the "MainWindow.ui" file

        Args:
            conf: The application configuration

        Returns:
            Nothing
        """
        super(ApplicationWindow, self).__init__()

        # True while close() has already asked the user about an
        # unsaved study, so closeEvent() does not ask a second time
        self._close_question = False

        # App Configuration
        self.conf = conf

        # Model
        self._study = None

        # Translate
        self._trad = MainTranslate()

        # Results
        self._last_solver = None
        self._last_results = None

        # UI
        self.ui = loadUi(
            os.path.join(os.path.dirname(__file__), "ui", "MainWindow.ui"),
            self
        )

        self.setup_sc()
        self.setup_connection()
        self.default_style()
        self.setup_debug_mode(init=True)
        self.setup_results()
        self.setup_timer()

        # Previous session did not close correctly: propose to reopen
        # the last study
        if not self.conf.close_correctly:
            if self.conf.last_study != "":
                self.dialog_reopen_study()

        if _doc == "external":
            logger.info("doc: Internal documentation is not available...")

    def set_title(self):
        """Set the window title from debug mode and current study name

        Returns:
            Nothing
        """
        title = "(dbg) " if self.conf.debug else ""

        if self._study is not None:
            title += f"Pamhyr2 - {self._study.name}"
        else:
            title += "Pamhyr2"

        self.setWindowTitle(title)

    def enable_actions(self, action: str, enable: bool):
        """Enable or disable an action component

        Args:
            action: Name of the action to enable/disable
            enable: True to enable, or False to disable

        Returns:
            Nothing
        """
        logger.debug(f"Set {action} to {enable}")
        self.findChild(QAction, action).setEnabled(enable)

    def setup_sc(self):
        """Create the keyboard shortcut (F6) used to run the solver

        Returns:
            Nothing
        """
        self._run_sc = QShortcut(QKeySequence("F6"), self)

    def setup_connection(self):
        """Connect action to callback function

        Returns:
            Nothing
        """
        actions = {
            # Menu action
            "action_menu_config": self.open_configure,
            "action_menu_new": self.open_new_study,
            "action_menu_edit": self.open_edit_study,
            "action_menu_open": self.open_model,
            "action_menu_save": self.save_study,
            "action_menu_save_as": self.save_as_study,
            "action_menu_numerical_parameter": self.open_solver_parameters,
            "action_menu_edit_network": self.open_network,
            "action_menu_edit_geometry": self.open_geometry,
            "action_menu_boundary_conditions": self.open_boundary_cond,
            "action_menu_boundary_conditions_sediment":
                self.open_boundary_cond_sed,
            "action_menu_edit_reservoirs": self.open_reservoir,
            "action_menu_edit_hydraulic_structures":
                self.open_hydraulic_structures,
            "action_menu_initial_conditions": self.open_initial_conditions,
            "action_menu_edit_friction": self.open_frictions,
            "action_menu_edit_lateral_contribution":
                self.open_lateral_contrib,
            "action_menu_run_solver": self.select_and_run_solver,
            "action_menu_sediment_layers": self.open_sediment_layers,
            "action_menu_edit_reach_sediment_layers":
                self.open_reach_sediment_layers,
            "action_menu_close": self.close_model,
            "action_menu_results_last": self.open_last_results,
            "action_open_results_from_file": self.open_results_from_file,
            # Help
            "action_menu_pamhyr_users_wiki": self.open_doc_user,
            "action_menu_pamhyr_developers_pdf":
                lambda: self.open_doc_dev(ext="pdf"),
            "action_menu_pamhyr_developers_html":
                lambda: self.open_doc_dev(ext="html"),
            "action_menu_mage": self.open_doc_mage,
            "action_menu_about": self.open_about,
            # ToolBar action
            "action_toolBar_quit": self.close,
            "action_toolBar_open": self.open_model,
            "action_toolBar_save": self.save_study,
            "action_toolBar_close": self.close_model,
            "action_toolBar_run_solver": self.run_lasest_solver,
            # Current actions
            "action_toolBar_network": self.open_network,
            "action_toolBar_geometry": self.open_geometry,
            "action_toolBar_mesh": lambda: self.open_dummy("Mesh"),
            "action_toolBar_run_meshing_tool": self.open_solver_parameters,
            "action_toolBar_boundary_cond": self.open_boundary_cond,
            "action_toolBar_lateral_contrib": self.open_lateral_contrib,
            "action_toolBar_spills": lambda: self.open_dummy("Deversement"),
            "action_toolBar_stricklers": self.open_stricklers,
            "action_toolBar_frictions": self.open_frictions,
            "action_toolBar_building": lambda: self.open_dummy("Ouvrages"),
            "action_toolBar_initial_cond": self.open_initial_conditions,
        }

        for name, callback in actions.items():
            logger.debug("Setup connection : " + name)

            self.findChild(QAction, name)\
                .triggered.connect(callback)

        self._run_sc.activated.connect(self.run_lasest_solver)

    def changeEvent(self, event):
        """Retranslate the UI on language change, then forward the event

        Args:
            event: The Qt change event

        Returns:
            Nothing
        """
        if event.type() == QEvent.LanguageChange:
            self.retranslateUi()

        super(ApplicationWindow, self).changeEvent(event)

    def close(self):
        """Close the application window

        If the current study is not saved, ask the user what to do
        first; the window stays open if the user cancels.

        Returns:
            Nothing
        """
        if self._study is not None and not self._study.is_saved:
            self._close_question = True

            if not self.dialog_close():
                # Close cancelled by user
                self._close_question = False
                return

        # PAMHYR is close correctly (no crash)
        self.conf.set_close_correctly()
        super(ApplicationWindow, self).close()

    def closeEvent(self, event):
        """Handle the Qt close event

        Ask about an unsaved study when close() has not already done
        it (the dialog has no cancel button here).

        Args:
            event: The Qt close event

        Returns:
            Nothing
        """
        if not self._close_question:
            if self._study is not None and not self._study.is_saved:
                if self.dialog_close(cancel=False):
                    # PAMHYR is close correctly (no crash)
                    self.conf.set_close_correctly()

        # The event is never ignored in this method, so the base
        # handler (which accepts the event) is always called.
        super(ApplicationWindow, self).closeEvent(event)

    def default_style(self):
        """Set default window style

        Returns:
            Nothing
        """
        self.update_enable_action()

    def set_debug_lvl(self, debug=True):
        """Set the logging level

        Args:
            debug: True for DEBUG level, False for INFO level

        Returns:
            Nothing
        """
        if debug:
            logger.setLevel(logging.DEBUG)
            logger.info("Set logging level to DEBUG")
        else:
            logger.setLevel(logging.INFO)
            logger.info("Set logging level to INFO")

    def setup_debug_mode(self, init=False):
        """Add or remove the debug entries of the help menu

        Args:
            init: True at first call, to create the debug actions

        Returns:
            Nothing
        """
        menu = self.findChild(QMenu, "menu_help")
        menu.setToolTipsVisible(True)

        self.set_title()

        if init:
            self.debug_action = QAction("Debug", self)
            self.debug_action.setToolTip(self._trad["open_debug"])
            self.debug_action.triggered.connect(self.open_debug)

            self.debug_sqlite_action = QAction("Debug SQLite", self)
            self.debug_sqlite_action.setToolTip(self._trad["open_debug_sql"])
            self.debug_sqlite_action.triggered.connect(self.open_sqlite)

        if self.conf.debug:
            menu.addAction(self.debug_action)
            menu.addAction(self.debug_sqlite_action)
            self.set_debug_lvl(debug=True)
        elif not init:
            # At init there is nothing to remove and the logging
            # level is left untouched
            menu.removeAction(self.debug_action)
            menu.removeAction(self.debug_sqlite_action)
            self.set_debug_lvl(debug=False)

    def setup_timer(self):
        """Start the timer propagating pending updates every 2 seconds

        Returns:
            Nothing
        """
        self._init_propagation_keys()

        self._propagation_timer = QTimer(self)
        self._propagation_timer.timeout.connect(
            self._do_propagate_update
        )
        self._propagation_timer.start(2000)

    def _init_propagation_keys(self):
        # Reset the set of pending update keys
        self._propagation_keys = set()

    def _propagate_update(self, key=None):
        # Only register the key, the propagation itself is done at
        # next timer timeout
        self._propagation_keys.add(key)
        logger.debug(f"Propagation keys: {self._propagation_keys}")

    def _do_propagate_update(self):
        # Take pending keys and reset the set before propagating, so
        # keys added during propagation wait for next timeout
        keys = self._propagation_keys.copy()
        self._init_propagation_keys()

        for key in keys:
            if key == "window_list":
                logger.debug("Update window list")
                self._do_update_window_list()
                continue

            logger.debug(f"Propagation of {key}")
            for _, window in self.sub_win_list:
                window._propagated_update(key=key)

    #########
    # MODEL #
    #########

    def get_model(self):
        """Return the current study (None if no study is open)"""
        return self._study

    def set_model(self, model):
        """Set the current study

        Args:
            model: The new current study

        Returns:
            Nothing
        """
        self._study = model
        self.update_enable_action()
        self.conf.set_last_study(self._study.filename)
        self.set_title()

    def close_model(self):
        """Close the current study and its sub windows

        Returns:
            Nothing
        """
        self._study = None
        self.update_enable_action()
        self.conf.set_close_correctly()

        self.set_title()
        self._close_sub_window()

    def update_enable_action(self):
        """Update status of action components

        Enable or disable actions depending on whether a study is
        currently open

        Returns:
            Nothing
        """
        no_model = self._study is None

        for name in no_model_action:
            self.enable_actions(name, no_model)

        for name in define_model_action + other_model_action:
            self.enable_actions(name, not no_model)

        for name in model_action:
            self.enable_actions(name, not no_model)

    def setup_results(self):
        """Select the last used solver from the configuration

        Use the solver named as the configuration last solver name if
        it exists, the "default-mage" solver otherwise (or None).

        Returns:
            Nothing
        """
        self._last_solver = None
        self._last_results = None

        default = None

        for solver in self.conf.solvers:
            if solver.name == "default-mage":
                default = solver

            if solver.name == self.conf.last_solver_name:
                self._last_solver = solver
                if self._study is not None:
                    self.enable_actions("action_menu_results_last", True)
                return

        # Last solver not found, use default-mage if exists
        self._last_solver = default

    def set_results(self, solver, results):
        """Set the last solver and results, and enable results action

        Args:
            solver: The solver used
            results: The solver results

        Returns:
            Nothing
        """
        self._last_solver = solver
        self._last_results = results

        self.enable_actions("action_menu_results_last", True)

    ############
    # FEATURES #
    ############

    def open_study(self, filename):
        """Open a study

        Args:
            filename: The study path

        Returns:
            Nothing
        """
        self.set_model(Study.open(filename))
        logger.info(f"Open Study - {self._study.name}")
        self.set_title()

    @staticmethod
    def _pamhyr_filename(file_name):
        # Return file_name, with ".pamhyr" appended if not present
        if file_name.endswith(".pamhyr"):
            return file_name

        return file_name + ".pamhyr"

    def _save_with_progress(self):
        # Save the current study, displaying a modal progress dialog
        # incremented at each call of the save progress callback
        sql_request_count = self._study.sql_save_request_count()
        progress = QProgressDialog(
            "Saving...", None,
            0, sql_request_count,
            parent=self
        )
        progress.setWindowModality(Qt.WindowModal)
        progress.setValue(0)

        logger.info("Save...")
        self._study.save(
            progress=lambda: progress.setValue(progress.value() + 1)
        )
        logger.info("Done")

    def save_study(self):
        """Save current study

        Save current study, if study has no associated file, open a
        file dialog. Nothing is saved if this dialog is cancelled.

        Returns:
            Nothing
        """
        if self._study.filename is None or self._study.filename == "":
            file_name, _ = QFileDialog.getSaveFileName(
                self, "Save File",
                "", "Pamhyr(*.pamhyr)"
            )

            # Dialog cancelled: do not save into a file named ".pamhyr"
            if file_name == "":
                return

            self._study.filename = self._pamhyr_filename(file_name)

        if self._study.is_saved:
            return

        self._save_with_progress()

    def save_as_study(self):
        """Save current study as new file

        Open a file dialog to select the new file, then save the
        current study into it. Nothing is done if the dialog is
        cancelled.

        Returns:
            Nothing
        """
        file_name, _ = QFileDialog.getSaveFileName(
            self, "Save File",
            "", "Pamhyr(*.pamhyr)"
        )

        logger.debug(f"Save study as : {repr(file_name)}")

        if file_name == "":
            return

        filename = self._pamhyr_filename(file_name)
        if filename == file_name:
            logger.debug(
                "Pamhyr extention is present : " +
                f"{repr(file_name)}"
            )
        else:
            logger.debug(
                "Pamhyr extention is not present : " +
                f"{repr(filename)}"
            )
        self._study.filename = filename

        self._save_with_progress()

    ##################
    # MSG AND DIALOG #
    ##################

    def msg_select_reach(self):
        """Display a message asking the user to select a reach

        Returns:
            Nothing
        """
        self.message_box("Please select a reach",
                         "Geometry edition need a reach selected "
                         "into river network window to work on it")

    def dialog_reopen_study(self):
        """Ask the user to reopen the last open study

        Returns:
            True if a study opening has been requested, False
            otherwise
        """
        dlg = QMessageBox(self)

        dlg.setWindowTitle("Last open study")
        dlg.setText("Do you want to open again the last open study?")
        # QMessageBox.Open is not proposed for now, but still handled
        opt = QMessageBox.Cancel | QMessageBox.Ok
        dlg.setStandardButtons(opt)
        dlg.setIcon(QMessageBox.Question)

        res = dlg.exec()

        if res == QMessageBox.Ok:
            self.open_study(self.conf.last_study)
            return True
        elif res == QMessageBox.Open:
            self.open_model()
            return True

        return False

    def dialog_close(self, cancel=True):
        """Ask the user to save the current study before closing

        Args:
            cancel: Add a cancel button to the dialog

        Returns:
            True if the application can be closed (study saved or
            changes ignored), False otherwise
        """
        dlg = QMessageBox(self)

        dlg.setWindowTitle("Close PAMHYR without saving study")
        dlg.setText("Do you want to save current study before PAMHYR close ?")
        opt = QMessageBox.Save | QMessageBox.Ignore
        if cancel:
            opt |= QMessageBox.Cancel

        dlg.setStandardButtons(opt)
        dlg.setIcon(QMessageBox.Warning)

        res = dlg.exec()

        if res == QMessageBox.Save:
            self.save_study()
            return True
        elif res == QMessageBox.Ignore:
            return True

        return False

    #########################
    # SUB WINDOWS MENU LIST #
    #########################

    def _activate_window(self, window_hash):
        self._try_activate_window_for_window(self, window_hash)

    def _try_activate_window_for_window(self, source_window, window_hash):
        # Activate the window identified by window_hash, searching
        # recursively into the sub windows of source_window. Best
        # effort: any error ends the search for this branch.
        try:
            window = source_window.get_sub_win(window_hash)
            if window is not None:
                window.activateWindow()
            else:
                for _, win in source_window.sub_win_list:
                    self._try_activate_window_for_window(
                        win, window_hash
                    )
        except Exception:
            return

    def _update_window_list(self):
        # The menu is rebuilt at next propagation timer timeout
        self._propagation_keys.add("window_list")

    def _do_update_window_list(self):
        menu = self.findChild(QMenu, "menu_windows")
        menu.setToolTipsVisible(True)

        # Remove all actions
        menu.clear()

        self._do_update_window_list_add_action(menu, self.sub_win_list)

    def _do_update_window_list_add_action(self, menu, win_list):
        # Add one menu entry per window, then per window of their own
        # sub window list when they have one
        for _, win in win_list:
            self._do_update_window_list_add_action_for_window(
                menu, win
            )

            try:
                lst = win.sub_win_list
                self._do_update_window_list_add_action(menu, lst)
            except Exception:
                continue

    def _do_update_window_list_add_action_for_window(self, menu, window):
        def lambda_generator(h):
            return lambda: self._activate_window(h)

        action = QAction(window._title, self)
        action.setToolTip(self._trad["active_window"])

        h = window.hash()
        fn = lambda_generator(h)

        action.triggered.connect(fn)
        menu.addAction(action)

    ###############
    # SUB WINDOWS #
    ###############

    def open_configure(self):
        """Open configure window

        Open PamHyr configure window

        Returns:
            Nothing
        """
        if self.sub_window_exists(
            ConfigureWindow,
            data=[None, self.conf]
        ):
            return

        self.config = ConfigureWindow(config=self.conf, parent=self)
        self.config.show()

    def open_about(self):
        """Open about window

        Open a new window with information about PamHyr

        Returns:
            Nothing
        """
        if self.sub_window_exists(
            AboutWindow,
            data=[None, None]
        ):
            return

        self.about = AboutWindow(parent=self)
        self.about.show()

    def open_model(self):
        """Open file dialog to select saved model

        Nothing is done if a study is already open.

        Returns:
            Nothing
        """
        if self._study is None:
            dialog = QFileDialog(self)
            dialog.setFileMode(QFileDialog.FileMode.ExistingFile)
            dialog.setDefaultSuffix(".pamhyr")
            dialog.setNameFilters(['PamHyr (*.pamhyr)'])
            dialog.setDirectory(os.path.dirname(self.conf.last_study))

            if dialog.exec_():
                file_name = dialog.selectedFiles()
                self.open_study(file_name[0])

    def open_new_study(self):
        """Open dialog to set new study

        Nothing is done if a study is already open.

        Returns:
            Nothing
        """
        if self._study is None:
            if self.sub_window_exists(
                NewStudyWindow,
                data=[None, None]
            ):
                return

            self.new_study = NewStudyWindow(parent=self)
            self.new_study.show()

    def open_edit_study(self):
        """Open dialog to edit the current study

        Returns:
            Nothing
        """
        if self._study is not None:
            if self.sub_window_exists(
                NewStudyWindow,
                data=[self._study, None]
            ):
                return

            self.new_study = NewStudyWindow(study=self._study, parent=self)
            self.new_study.show()

    def open_network(self):
        """Open network dialog

        Returns:
            Nothing
        """
        if self._study is not None:
            if self.sub_window_exists(
                NetworkWindow,
                data=[self._study, None]
            ):
                return

            self.network = NetworkWindow(study=self._study, parent=self)
            self.network.show()

    def open_geometry(self):
        """Open geometry window

        Display a message if there is no current reach.

        Returns:
            Nothing
        """
        if (self._study is not None and
                self._study.river.has_current_reach()):
            reach = self._study.river.current_reach().reach

            if self.sub_window_exists(
                GeometryWindow,
                data=[self._study, self.conf, reach]
            ):
                return

            geometry = GeometryWindow(
                study=self._study,
                config=self.conf,
                reach=reach,
                parent=self
            )
            geometry.show()
        else:
            self.msg_select_reach()

    def open_boundary_cond_sed(self):
        """Open boundary condition window on its tab of index 1

        Returns:
            Nothing
        """
        self.open_boundary_cond(tab=1)

    def open_boundary_cond(self, tab=0):
        """Open boundary condition window

        If the window already exists, only its active tab is changed.

        Args:
            tab: Index of the tab to activate

        Returns:
            Nothing
        """
        if self.sub_window_exists(
            BoundaryConditionWindow,
            data=[self._study, None]
        ):
            bound = self.get_sub_window(
                BoundaryConditionWindow,
                data=[self._study, None]
            )
            bound.set_active_tab(tab)
            return

        bound = BoundaryConditionWindow(study=self._study, parent=self)
        bound.show()
        bound.set_active_tab(tab)

    def open_reservoir(self):
        """Open reservoir window

        Returns:
            Nothing
        """
        if self.sub_window_exists(
            ReservoirWindow,
            data=[self._study, None]
        ):
            return

        reservoir = ReservoirWindow(study=self._study, parent=self)
        reservoir.show()

    def open_hydraulic_structures(self):
        """Open hydraulic structures window

        Returns:
            Nothing
        """
        if self.sub_window_exists(
            HydraulicStructuresWindow,
            data=[self._study, None]
        ):
            return

        hydraulic_structures = HydraulicStructuresWindow(
            study=self._study,
            parent=self
        )
        hydraulic_structures.show()

    def open_lateral_contrib(self):
        """Open lateral contribution window

        Returns:
            Nothing
        """
        if self.sub_window_exists(
            LateralContributionWindow,
            data=[self._study, None]
        ):
            return

        lateral = LateralContributionWindow(
            study=self._study,
            parent=self
        )
        lateral.show()

    def open_stricklers(self):
        """Open stricklers window

        Returns:
            Nothing
        """
        if self.sub_window_exists(
            StricklersWindow,
            data=[self._study, self.conf]
        ):
            return

        strick = StricklersWindow(
            study=self._study,
            config=self.conf,
            parent=self
        )
        strick.show()

    def open_frictions(self):
        """Open frictions window

        Display a message if there is no current reach.

        Returns:
            Nothing
        """
        if self._study is not None:
            if self._study.river.has_current_reach():
                reach = self._study.river.current_reach()

                if self.sub_window_exists(
                    FrictionsWindow,
                    data=[self._study, None, reach]
                ):
                    return

                frictions = FrictionsWindow(
                    study=self._study,
                    parent=self
                )
                frictions.show()
            else:
                self.msg_select_reach()

    def open_initial_conditions(self):
        """Open initial conditions window

        Display a message if there is no study or no current reach.

        Returns:
            Nothing
        """
        if (self._study is not None and
                self._study.river.has_current_reach()):
            reach = self._study.river.current_reach()

            if self.sub_window_exists(
                InitialConditionsWindow,
                data=[self._study, self.conf, reach]
            ):
                return

            initial = InitialConditionsWindow(
                study=self._study,
                config=self.conf,
                reach=reach,
                parent=self
            )
            initial.show()
        else:
            self.msg_select_reach()

    def open_solver_parameters(self):
        """Open solver parameters window

        Returns:
            Nothing
        """
        if self.sub_window_exists(
            SolverParametersWindow,
            data=[self._study, None]
        ):
            return

        params = SolverParametersWindow(
            study=self._study,
            parent=self
        )
        params.show()

    def open_sediment_layers(self):
        """Open sediment layers window

        Returns:
            Nothing
        """
        if self.sub_window_exists(
            SedimentLayersWindow,
            data=[self._study, None]
        ):
            return

        sl = SedimentLayersWindow(
            study=self._study,
            parent=self
        )
        sl.show()

    def open_reach_sediment_layers(self):
        """Open sediment layers window of the current reach

        Display a message if there is no study or no current reach.

        Returns:
            Nothing
        """
        if (self._study is None or
                not self._study.river.has_current_reach()):
            self.msg_select_reach()
            return

        reach = self._study.river.current_reach().reach

        if self.sub_window_exists(
            ReachSedimentLayersWindow,
            data=[self._study, None, reach]
        ):
            return

        sl = ReachSedimentLayersWindow(
            study=self._study,
            reach=reach,
            parent=self
        )
        sl.show()

    def run_lasest_solver(self):
        """Run the last used solver, if any

        Returns:
            Nothing
        """
        if self._last_solver is None:
            return

        self.run_solver(self._last_solver)

    def select_and_run_solver(self):
        """Open the solver selection dialog, then run selected solver

        Returns:
            Nothing
        """
        if self._study is None:
            return

        run = SelectSolverWindow(
            study=self._study,
            config=self.conf,
            parent=self
        )
        if run.exec():
            self.run_solver(run.solver)

    def run_solver(self, solver):
        """Open the check list window for a solver

        Args:
            solver: The solver to run

        Returns:
            Nothing
        """
        if self._study is None:
            return

        if self.sub_window_exists(
            CheckListWindow,
            data=[
                self._study,
                self.conf,
                solver
            ]
        ):
            return

        check = CheckListWindow(
            study=self._study,
            config=self.conf,
            solver=solver,
            parent=self
        )
        check.show()

    def solver_log(self, solver):
        """Open the solver log window

        Args:
            solver: The solver

        Returns:
            Nothing
        """
        sol = SolverLogWindow(
            study=self._study,
            config=self.conf,
            solver=solver,
            parent=self
        )
        sol.show()

    def open_solver_results(self, solver, results=None):
        """Open the results window

        Args:
            solver: The solver used to get the results
            results: The results to display, or the path (str) of a
                results file to read, or None to use the last results
                (read from the solver working directory if there are
                no last results)

        Returns:
            Nothing
        """
        if isinstance(results, str):
            # Open from file
            logger.info(f"Open results from {os.path.dirname(results)}")

            name = os.path.basename(results).replace(".BIN", "")

            def reading_fn():
                self._tmp_results = solver.results(
                    self._study, os.path.dirname(results),
                    name=name
                )
        elif results is not None:
            def reading_fn():
                self._tmp_results = results
        elif self._last_results is not None:
            # No specific results, get last results
            def reading_fn():
                self._tmp_results = self._last_results
        else:
            def reading_fn():
                self._tmp_results = solver.results(
                    self._study,
                    self._solver_workdir(solver),
                )

        # Reset, so a failed reading can not display stale results
        self._tmp_results = None

        dlg = ReadingResultsDialog(reading_fn=reading_fn, parent=self)
        dlg.exec_()
        loaded = self._tmp_results

        # No results available
        if loaded is None:
            return

        # Windows already opened
        if self.sub_window_exists(
            ResultsWindow,
            data=[
                self._study,
                None,           # No config
                solver,
                loaded
            ]
        ):
            return

        res = ResultsWindow(
            study=self._study,
            solver=solver,
            results=loaded,
            parent=self
        )
        res.show()

    def _solver_workdir(self, solver):
        # <study directory>/_PAMHYR_/<study name>/<solver name>, with
        # spaces of names replaced by underscores
        workdir = os.path.join(
            os.path.dirname(self._study.filename),
            "_PAMHYR_",
            self._study.name.replace(" ", "_"),
            solver.name.replace(" ", "_"),
        )

        return workdir

    def open_last_results(self):
        """Open the results window for last solver and last results

        Returns:
            Nothing
        """
        if self._last_solver is None:
            return

        self.open_solver_results(self._last_solver, self._last_results)

    def open_results_from_file(self):
        """Open a file dialog to select a results file to display

        Nothing is done if there is no study or if the study has no
        associated file.

        Returns:
            Nothing
        """
        if self._study is None:
            return

        if self._study.filename == "":
            return

        dialog = QFileDialog(self)
        dialog.setFileMode(QFileDialog.FileMode.ExistingFile)
        dialog.setDefaultSuffix(".BIN")
        dialog.setNameFilters(['Mage (*.BIN)'])

        if self._last_solver is None:
            dialog.setDirectory(
                os.path.dirname(self._study.filename)
            )
        else:
            dialog.setDirectory(
                self._solver_workdir(self._last_solver)
            )

        if dialog.exec_():
            file_name = dialog.selectedFiles()
            logger.info(f"Select results: {file_name}")
            # NOTE(review): the last solver may be None here, while
            # the results reading needs a solver -- to be confirmed
            self.open_solver_results(
                self._last_solver,
                results=file_name[0]
            )

    #################
    # DOCUMENTATION #
    #################

    def _doc_path_file(self, filename):
        # Absolute path of a documentation file. The "doc" directory
        # is searched one directory higher when the program name
        # (sys.argv[0]) does not contain ".py"
        if ".py" in sys.argv[0]:
            return os.path.abspath(
                os.path.join(
                    os.path.dirname(__file__),
                    "..", "..", "doc", filename
                )
            )

        return os.path.abspath(
            os.path.join(
                os.path.dirname(__file__),
                "..", "..", "..", "doc", filename
            )
        )

    def open_doc(self, filename):
        """Open a documentation file or web page

        Args:
            filename: Documentation file name, or a "https://" URL

        Returns:
            Nothing
        """
        if "https://" in filename:
            url = QUrl(filename)
            QDesktopServices.openUrl(url)
            return

        if _doc == "external":
            path = self._doc_path_file(filename)
        elif ".odt" in filename:
            path = os.path.abspath(DocWindow._path_file(filename))
        else:
            doc = DocWindow(
                filename=filename,
                parent=self
            )
            doc.show()
            return

        # fromLocalFile builds a valid "file:" URL on every platform
        QDesktopServices.openUrl(QUrl.fromLocalFile(path))

    def open_doc_user(self):
        """Open the users wiki

        Returns:
            Nothing
        """
        self.open_doc(
            "https://gitlab.irstea.fr/theophile.terraz/pamhyr/-/wikis/home"
        )

    def open_doc_dev(self, ext="pdf"):
        """Open the developers documentation

        Args:
            ext: The documentation file extension ("pdf" or "html")

        Returns:
            Nothing
        """
        self.open_doc(f"Pamhyr2-dev.{ext}")

    def open_doc_mage(self):
        """Open the mage documentation

        Returns:
            Nothing
        """
        self.open_doc("mage8.pdf")

    #########
    # DEBUG #
    #########

    def open_debug(self):
        """Open the debug REPL window

        Returns:
            Nothing
        """
        repl = ReplWindow(
            study=self._study,
            config=self.conf,
            parent=self
        )
        repl.show()

    def open_sqlite(self):
        """Open the current study file with "sqlitebrowser"

        Returns:
            Nothing
        """
        if self._study is None:
            logger.debug("No study open for sql debuging...")
            return

        filename = self._study.filename
        try:
            # Argument list (no shell): file names with spaces or
            # shell special characters are passed as is
            _ = subprocess.Popen(["sqlitebrowser", filename])
        except OSError as e:
            logger.error(f"Failed to run sqlitebrowser: {e}")

    # TODO: Delete me !
    ###############
    # DUMMY STUFF #
    ###############

    def open_dummy(self, title="Dummy"):
        """Open a dummy window

        Args:
            title: The window title, "Dummy" is used if title is not
                a string

        Returns:
            Nothing
        """
        self.dummy = DummyWindow(
            title=title if isinstance(title, str) else "Dummy",
            parent=self
        )
        self.dummy.show()