# Window.py -- Pamhyr
# Copyright (C) 2023-2024 INRAE
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

# -*- coding: utf-8 -*-

import os
import logging
import tempfile

from queue import Queue
from tools import trace, timer, logger_exception

from View.Tools.PamhyrWindow import PamhyrDialog, PamhyrWindow

from PyQt5.QtGui import (
    QKeySequence,
)

from PyQt5.QtCore import (
    Qt, QVariant, QAbstractTableModel,
    QCoreApplication, QModelIndex, pyqtSlot,
    QRect, QTimer, QProcess,
)

from PyQt5.QtWidgets import (
    QDialogButtonBox, QPushButton, QLineEdit,
    QFileDialog, QTableView, QAbstractItemView,
    QUndoStack, QShortcut, QAction, QItemDelegate,
    QComboBox, QVBoxLayout, QHeaderView, QTabWidget,
    QTextEdit,
)

from View.RunSolver.Log.Window import SolverLogFileWindow
from View.Results.ReadingResultsDialog import ReadingResultsDialog

# The pause action is only offered when these signals can be imported.
try:
    from signal import SIGTERM, SIGSTOP, SIGCONT
    _signal = True
except Exception:
    _signal = False

_translate = QCoreApplication.translate

logger = logging.getLogger()


class SelectSolverWindow(PamhyrDialog):
    """Dialog used to choose one of the solvers listed in the config.

    After the dialog has been accepted, the chosen solver object is
    available through the ``solver`` property (``None`` otherwise).
    """

    _pamhyr_ui = "SelectSolver"
    _pamhyr_name = "Select solver"

    def __init__(self, study=None, config=None, parent=None):
        self._solver = None

        name = _translate("Solver", "Select solver")
        super(SelectSolverWindow, self).__init__(
            title=name,
            study=study,
            config=config,
            options=[],
            parent=parent
        )

        self.setup_combobox()
        self.setup_connections()
        self.select_last_solver()

    def setup_combobox(self):
        """Fill the combobox with one formatted entry per solver."""
        solvers_name = [
            self._format_solver_name(solver)
            for solver in self._config.solvers
        ]

        self.combobox_add_items("comboBox", solvers_name)

    def setup_connections(self):
        """Bind the run / cancel buttons to accept / reject."""
        self.find(QPushButton, "pushButton_run")\
            .clicked.connect(self.accept)
        self.find(QPushButton, "pushButton_cancel")\
            .clicked.connect(self.reject)

    def select_last_solver(self):
        """Preselect the solver whose name is the config's last one."""
        last = self._config.last_solver_name
        solver = next(
            (s for s in self._config.solvers if s.name == last),
            None
        )

        if solver is not None:
            self.set_combobox_text(
                "comboBox",
                self._format_solver_name(solver)
            )

    def _format_solver_name(self, solver):
        """Return the combobox label for ``solver``: "name - (type)"."""
        return f"{solver.name} - ({solver._type})"

    @property
    def solver(self):
        """Solver selected by the last successful ``accept``, or None."""
        return self._solver

    def accept(self):
        """Store the selected solver and close the dialog as accepted.

        The solver name is recovered from the combobox label by cutting
        the " - (type)" suffix added by ``_format_solver_name``. When no
        configured solver has that name (e.g. empty combobox) the dialog
        is rejected instead of raising from inside the Qt slot.
        """
        solver_name = self.get_combobox_text("comboBox")
        solver_name = solver_name.rsplit(" - ", 1)[0]

        solver = next(
            (s for s in self._config.solvers if s.name == solver_name),
            None
        )

        if solver is None:
            logger.error(f"solver: No solver named '{solver_name}'")
            self.reject()
            return

        self._config.update_last_solver_used(solver_name)
        self._solver = solver

        super(SelectSolverWindow, self).accept()


class SolverLogWindow(PamhyrWindow):
    """Window that exports the study, runs a solver and shows its log.

    Messages are exchanged through ``self._output`` (a ``Queue``) which
    is drained into the text widget by ``update``, itself triggered by a
    ``QTimer`` every 100 ms.
    """

    _pamhyr_ui = "SolverLog"
    _pamhyr_name = "Solver Log"

    def __init__(self, study=None, config=None,
                 solver=None, parent=None):
        self._solver = solver
        self._results = None

        name = _translate("Solver", "Select log")
        super(SolverLogWindow, self).__init__(
            title=name,
            study=study,
            config=config,
            options=[],
            parent=parent
        )

        self.setup_action()
        self.setup_alarm()
        self.setup_connections()
        self.setup_workdir()
        self.setup_process()

        ok = self.export()
        if ok:
            self.run()
        else:
            self._log(
                f" *** Failed to export study to {self._solver._type}",
                color="red"
            )

    def _set_actions_enabled(self, **states):
        """Enable / disable toolbar actions.

        Each keyword is the action name without its "action_" prefix
        (``start``, ``pause``, ``stop``, ``log_file``, ``results``) and
        each value the enabled state to apply. Actions that are not
        mentioned are left untouched.
        """
        for name, enabled in states.items():
            self.find(QAction, f"action_{name}").setEnabled(enabled)

    def setup_action(self):
        """Set the initial action states (solver about to run)."""
        self._set_actions_enabled(
            start=False,
            pause=_signal,  # Pause is offered only if signals imported
            stop=True,
            log_file=False,
            results=False,
        )

    def setup_alarm(self):
        """Create the timer driving the periodic ``update``."""
        self._alarm = QTimer()

    def setup_connections(self):
        """Connect the actions and the timer to their handlers."""
        self.find(QAction, "action_start").triggered.connect(self.start)
        self.find(QAction, "action_pause").triggered.connect(self.pause)
        self.find(QAction, "action_stop").triggered.connect(self.stop)
        self.find(QAction, "action_log_file")\
            .triggered.connect(self.log_file)
        self.find(QAction, "action_results")\
            .triggered.connect(self.results)

        self._alarm.timeout.connect(self.update)

    def setup_workdir(self):
        """Choose (and create) the solver working directory.

        ``self._workdir`` is always a path string: it is later given to
        ``QProcess.setWorkingDirectory`` and ``os.path.join``. When the
        study has no file name, a temporary directory is used and its
        ``TemporaryDirectory`` object is kept in ``self._tmpdir`` so the
        directory lives as long as this window.
        """
        self._tmpdir = None

        if self._study.filename == "":
            self._tmpdir = tempfile.TemporaryDirectory()
            self._workdir = self._tmpdir.name
        else:
            self._workdir = os.path.join(
                os.path.dirname(self._study.filename),
                "_PAMHYR_",
                self._study.name.replace(" ", "_"),
                self._solver.name.replace(" ", "_"),
            )

        # No-op for the temporary directory, which already exists.
        os.makedirs(self._workdir, exist_ok=True)

    def setup_process(self):
        """Start the update timer, create the queue and the process."""
        self._alarm.start(100)
        self._output = Queue()
        self._process = self.new_process(self._parent)

    def new_process(self, parent):
        """Return a new ``QProcess`` working in ``self._workdir``.

        Standard output and standard error are merged in one channel.
        """
        new = QProcess(parent)
        new.setWorkingDirectory(self._workdir)
        new.setProcessChannelMode(QProcess.MergedChannels)
        return new

    def export(self):
        """Export the study for the solver and return the solver status."""
        self._log(f" *** Export study {self._solver.name}", color="blue")
        ok = self._solver.export(
            self._study, self._workdir, qlog=self._output
        )
        self.update()

        return ok

    def closeEvent(self, event):
        """Stop the update timer before closing the window."""
        self._alarm.stop()
        super(SolverLogWindow, self).closeEvent(event)

    def _copy(self):
        """Copy the current selection of the log text widget."""
        self.find(QTextEdit, "textEdit").copy()

    #######
    # LOG #
    #######

    def _log(self, msg, color=None):
        """Append ``msg`` to the log view.

        ``str`` is displayed as a log line, ``int`` as a return code,
        ``bytes`` is decoded as UTF-8 then displayed as a log line.
        Any other type is ignored.
        """
        if isinstance(msg, bytes):
            msg = msg.decode("utf-8", errors="replace")

        if isinstance(msg, str):
            self._log_str(msg, color)
        elif type(msg) is int:
            # Exact type check: a bool must not be shown as a code
            self._log_int(msg, color)

    def _log_str(self, msg, color=None):
        """Log a text line, optionally wrapped in a colored font tag."""
        if msg == "":
            return

        logger.info(f"solver: {msg}")

        # NOTE(review): this drops everything after the last newline,
        # not only a trailing one -- confirm messages are single lines.
        msg = msg.rsplit('\n', 1)[0]

        if color is not None:
            msg = f"<font color=\"{color}\">" + msg + "</font>"

        self.find(QTextEdit, "textEdit").append(msg)

    def _log_int(self, int_code, color=None):
        """Log a return code: blue when 0, red otherwise.

        The ``color`` argument is ignored; the color only depends on
        the code. The status bar shows "Done" or "Failed" for 3 s.
        """
        logger.info(f"solver: Returns {int_code}")

        color = "blue" if int_code == 0 else "red"

        self.find(QTextEdit, "textEdit")\
            .append(
                f"<font color=\"{color}\">" +
                f" *** Finished with code {int_code}" +
                "</font>"
            )

        self.statusbar.showMessage(
            "Done" if int_code == 0 else "Failed",
            3000
        )

    ##########
    # UPDATE #
    ##########

    def update(self):
        """Refresh the action states and flush pending log messages.

        Called by the timer and after an export.
        """
        if self._solver.is_stoped():
            self._set_actions_enabled(
                start=True,
                pause=False,
                stop=False,
                results=True,
            )

            if self._solver.log_file() != "":
                self._set_actions_enabled(log_file=True)

        self._update_logs_all()

    def _update_get_results(self):
        """Read the results once and hand them to the parent window.

        Not called anywhere in this file. Failures are logged, not
        raised.
        """
        if self._results is None:
            def reading_fn():
                try:
                    self._results = self._solver.results(
                        self._study, self._workdir, qlog=self._output
                    )
                    self._parent.set_results(self._solver, self._results)
                except Exception as e:
                    logger.error("Failed to open results")
                    logger_exception(e)

            dlg = ReadingResultsDialog(reading_fn=reading_fn, parent=self)
            dlg.exec_()

    def _update_logs_all(self):
        """Move every queued message to the log view.

        Text containing "[ERROR]" is displayed in red. It is passed as
        ``str``: an encoded ``bytes`` message used to be ignored by
        ``_log``, so error lines never reached the view.
        """
        while self._output.qsize() != 0:
            s = self._output.get()

            try:
                if isinstance(s, str) and "[ERROR]" in s:
                    self._log(s, color="red")
                else:
                    self._log(s)
            except Exception as e:
                logger_exception(e)

    ###################
    # Process control #
    ###################

    def run(self):
        """Run the solver with the current process and output queue."""
        self._log(f" *** Run solver {self._solver.name}", color="blue")
        self._solver.run(
            self._study,
            process=self._process,
            output_queue=self._output
        )

    def start(self):
        """Start the solver.

        When the solver is stopped the study is exported again and a
        new process is created first; a failed export aborts the start.
        """
        if self._solver.is_stoped():
            self._log(
                f" *** Export study {self._solver.name}",
                color="blue"
            )

            ok = self._solver.export(
                self._study, self._workdir, qlog=self._output
            )
            self.update()

            if not ok:
                self._log(" *** Failed to export", color="red")
                # Flush the failure message right away
                self.update()
                return

            self._process = self.new_process(self._parent)

        self._log(" *** Start", color="blue")
        self._results = None
        self._solver.start(self._study, process=self._process)

        self._set_actions_enabled(
            start=False,
            pause=_signal,
            stop=True,
            log_file=False,
            results=False,
        )

    def pause(self):
        """Pause the solver and allow it to be started again."""
        self._log(" *** Pause", color="blue")
        self._solver.pause()

        self._set_actions_enabled(
            start=True,
            pause=False,
            stop=True,
            results=False,
        )

    def stop(self):
        """Kill the solver and enable the results / log file actions."""
        self._log(" *** Stop", color="blue")
        self._solver.kill()

        self._set_actions_enabled(
            start=True,
            pause=False,
            stop=False,
            results=True,
        )

        if self._solver.log_file() != "":
            self._set_actions_enabled(log_file=True)

    ###########
    # Results #
    ###########

    def results(self):
        """Read the results if needed and open them in the parent."""
        if self._results is None:
            def reading_fn():
                self._results = self._solver.results(
                    self._study, self._workdir, qlog=self._output
                )

            dlg = ReadingResultsDialog(reading_fn=reading_fn, parent=self)
            dlg.exec_()

        self._parent.set_results(self._solver, self._results)
        self._parent.open_solver_results(self._solver, self._results)

        self._solver.has_results_loaded()

    def log_file(self):
        """Open the solver log file in a ``SolverLogFileWindow``."""
        file_name = os.path.join(self._workdir, self._solver.log_file())
        log = SolverLogFileWindow(
            file_name=file_name,
            study=self._study,
            config=self._config,
            solver=self._solver,
            parent=self,
        )
        log.show()