# Forked from HYCAR-Hydro / airGR
# Source project has a limited visibility.
# Window.py 10.90 KiB
# Window.py -- Pamhyr
# Copyright (C) 2023  INRAE
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

# -*- coding: utf-8 -*-

import os
import logging
import tempfile

from queue import Queue
from tools import trace, timer, logger_exception

from View.Tools.PamhyrWindow import PamhyrDialog, PamhyrWindow

from PyQt5.QtGui import (
    QKeySequence,
)

from PyQt5.QtCore import (
    Qt, QVariant, QAbstractTableModel,
    QCoreApplication, QModelIndex, pyqtSlot,
    QRect, QTimer, QProcess,
)

from PyQt5.QtWidgets import (
    QDialogButtonBox, QPushButton, QLineEdit,
    QFileDialog, QTableView, QAbstractItemView,
    QUndoStack, QShortcut, QAction, QItemDelegate,
    QComboBox, QVBoxLayout, QHeaderView, QTabWidget,
    QTextEdit,
)

from View.RunSolver.Log.Window import SolverLogFileWindow

try:
    from signal import SIGTERM, SIGSTOP, SIGCONT
    _signal = True
except Exception:
    _signal = False

# Shorthand for Qt's translation lookup function.
_translate = QCoreApplication.translate

# No logger name is given, so this is the root logger.
logger = logging.getLogger()


class SelectSolverWindow(PamhyrDialog):
    """Dialog asking the user which configured solver to run.

    The combo box gets one entry per solver of ``self._config.solvers``,
    labelled ``"<name> - (<type>)"``, and the entry of the solver named by
    ``self._config.last_solver_name`` is pre-selected when such a solver
    exists. Once the dialog has been accepted, the chosen solver object
    is available through the :attr:`solver` property.
    """
    _pamhyr_ui = "SelectSolver"
    _pamhyr_name = "Select solver"

    def __init__(self, study=None, config=None,
                 parent=None):
        # Stays None until the dialog is accepted with a valid selection.
        self._solver = None

        super(SelectSolverWindow, self).__init__(
            title=self._pamhyr_name,
            study=study,
            config=config,
            options=[],
            parent=parent
        )

        self.setup_combobox()
        self.setup_connections()
        self.select_last_solver()

    def setup_combobox(self):
        """Fill the combo box with one label per configured solver."""
        labels = [
            self._format_solver_name(solver)
            for solver in self._config.solvers
        ]

        self.combobox_add_items("comboBox", labels)

    def setup_connections(self):
        """Wire the run / cancel buttons to accept / reject."""
        self.find(QPushButton, "pushButton_run").clicked.connect(self.accept)
        self.find(QPushButton, "pushButton_cancel")\
            .clicked.connect(self.reject)

    def select_last_solver(self):
        """Pre-select the last used solver if it is still configured."""
        last = self._config.last_solver_name

        solver = next(
            (s for s in self._config.solvers if s.name == last),
            None
        )

        if solver is not None:
            self.set_combobox_text(
                "comboBox",
                self._format_solver_name(solver)
            )

    def _format_solver_name(self, solver):
        """Return the combo box label of a solver: "<name> - (<type>)"."""
        return f"{solver.name} - ({solver._type})"

    @property
    def solver(self):
        """Solver chosen by the user, or None if nothing was accepted."""
        return self._solver

    def accept(self):
        """Store the selected solver and close the dialog as accepted.

        When the current combo box text does not correspond to any
        configured solver (for instance when no solver is configured and
        the combo box is empty), nothing is stored and the dialog stays
        open instead of raising from inside the Qt slot.
        """
        label = self.get_combobox_text("comboBox")
        # Labels are "<name> - (<type>)": drop the type suffix only, so a
        # solver name that itself contains " - " is kept whole.
        solver_name = label.rsplit(" - ", 1)[0]

        solver = next(
            (s for s in self._config.solvers if s.name == solver_name),
            None
        )

        if solver is None:
            logger.warning(f"No solver found for selection '{label}'")
            return

        # Only remember the name once we know it designates a real solver.
        self._config.update_last_solver_used(solver_name)
        self._solver = solver

        super(SelectSolverWindow, self).accept()


class SolverLogWindow(PamhyrWindow):
    """Window that exports a study, runs a solver on it and shows its log.

    On construction the study is exported to a working directory and the
    solver is run through a QProcess. Messages are exchanged through a
    queue (``self._output``) which a QTimer drains every 100 ms into the
    text widget. Toolbar actions allow to start, pause and stop the
    solver, to open its log file and to open its results.
    """
    _pamhyr_ui = "SolverLog"
    _pamhyr_name = "Solver Log"

    def __init__(self, study=None, config=None,
                 solver=None, parent=None):
        self._solver = solver
        # Results are loaded lazily by `results()` and reset by `start()`.
        self._results = None

        super(SolverLogWindow, self).__init__(
            title=self._pamhyr_name,
            study=study,
            config=config,
            options=[],
            parent=parent
        )

        self.setup_action()
        self.setup_alarm()
        self.setup_connections()
        self.setup_workdir()
        self.setup_process()

        self.export()
        self.run()

    def _set_actions_enabled(self, **states):
        """Enable or disable actions of the window.

        Each keyword is an action name without its "action_" prefix
        (e.g. ``start=True`` targets "action_start"); its value is the
        new enabled state. Actions are updated in keyword order.
        """
        for name, enabled in states.items():
            self.find(QAction, f"action_{name}").setEnabled(enabled)

    def setup_action(self):
        """Set the initial action states (a run starts immediately)."""
        self._set_actions_enabled(
            start=False,
            # Pausing is only offered when the signals were importable
            pause=_signal,
            stop=True,
            log_file=False,
            results=False,
        )

    def setup_alarm(self):
        """Create the timer used to poll the solver output queue."""
        self._alarm = QTimer()

    def setup_connections(self):
        """Connect the actions and the polling timer to their handlers."""
        self.find(QAction, "action_start").triggered.connect(self.start)
        self.find(QAction, "action_pause").triggered.connect(self.pause)
        self.find(QAction, "action_stop").triggered.connect(self.stop)
        self.find(QAction, "action_log_file").triggered.connect(self.log_file)
        self.find(QAction, "action_results").triggered.connect(self.results)

        self._alarm.timeout.connect(self.update)

    def setup_workdir(self):
        """Choose the directory (a path string) the solver works in.

        A study without file name gets a temporary directory. Otherwise
        the directory is ``_PAMHYR_/<study name>/<solver name>`` next to
        the study file (spaces replaced by underscores) and is created
        if needed.
        """
        # Holds the TemporaryDirectory object, when one is used, so that
        # it is not finalised (and removed) while the window is alive.
        self._tmpdir = None

        if not self._study.filename:
            self._tmpdir = tempfile.TemporaryDirectory()
            # `_workdir` must be a path string: it is given to
            # QProcess.setWorkingDirectory() and os.path.join().
            self._workdir = self._tmpdir.name
        else:
            self._workdir = os.path.join(
                os.path.dirname(self._study.filename),
                "_PAMHYR_",
                self._study.name.replace(" ", "_"),
                self._solver.name.replace(" ", "_"),
            )
            os.makedirs(self._workdir, exist_ok=True)

    def setup_process(self):
        """Create the output queue and the process, then start polling."""
        self._output = Queue()
        self._process = self.new_process(self._parent)
        # Started last so `update` never fires before the queue exists.
        self._alarm.start(100)

    def new_process(self, parent):
        """Return a new QProcess set up to run in the working directory.

        Standard error is merged into standard output.
        """
        new = QProcess(parent)
        new.setWorkingDirectory(self._workdir)
        new.setProcessChannelMode(QProcess.MergedChannels)
        return new

    def export(self):
        """Export the study for the solver into the working directory."""
        self._log(f" *** Export study {self._solver.name}", color="blue")
        self._solver.export(self._study, self._workdir, qlog=self._output)
        self.update()

    def closeEvent(self, event):
        """Stop the polling timer before the window is closed."""
        self._alarm.stop()
        super(SolverLogWindow, self).closeEvent(event)

    #######
    # LOG #
    #######

    def _log(self, msg, color=None):
        """Display a message: text as a log line, int as a return code.

        Messages of any other type are ignored.
        """
        if isinstance(msg, str):
            self._log_str(msg, color)
        elif isinstance(msg, int):
            self._log_int(msg, color)

    def _log_str(self, msg, color=None):
        """Append a text line to the log widget, optionally coloured.

        Empty messages are skipped. The full message is also sent to the
        logger.
        """
        if msg == "":
            return

        logger.info(f"solver: {msg}")
        # Drop a single trailing newline only (the widget adds its own
        # line break); text following an inner newline must be kept.
        if msg.endswith('\n'):
            msg = msg[:-1]

        if color is not None:
            msg = f"<font color=\"{color}\">" + msg + "</font>"

        self.find(QTextEdit, "textEdit").append(msg)

    def _log_int(self, int_code, color=None):
        """Display the solver return code and update the status bar.

        The `color` argument is ignored: the line is blue for code 0 and
        red for any other code.
        """
        logger.info(f"solver: Returns {int_code}")
        color = "blue" if int_code == 0 else "red"

        self.find(QTextEdit, "textEdit")\
            .append(
                f"<font color=\"{color}\">" +
                f" *** Finished with code {int_code}" +
                "</font>"
            )

        self.statusbar.showMessage(
            "Done" if int_code == 0 else "Failed",
            3000
        )

    ##########
    # UPDATE #
    ##########

    def update(self):
        """Refresh action states and flush pending solver messages.

        Called by the polling timer and after each export.
        NOTE(review): if PamhyrWindow derives from QWidget this shadows
        QWidget.update() -- confirm this is intended.
        """
        if self._solver.is_stoped():
            self._set_actions_enabled(
                start=True,
                pause=False,
                stop=False,
                results=True,
            )

            if self._solver.log_file() != "":
                self._set_actions_enabled(log_file=True)

        self._update_logs_all()

    def _update_get_results(self):
        """Load the solver results once and hand them to the parent.

        Failures are logged and leave `self._results` unset. This method
        is not called from within this class.
        """
        if self._results is None:
            try:
                self._results = self._solver.results(
                    self._study, self._workdir, qlog=self._output
                )
                self._parent.set_results(self._solver, self._results)
            except Exception as e:
                logger.error(f"Failed to open results")
                logger_exception(e)

    def _update_logs_all(self):
        """Display every message currently waiting in the output queue.

        Text messages containing "[ERROR]" are shown in red.
        """
        while not self._output.empty():
            s = self._output.get()

            if isinstance(s, str) and "[ERROR]" in s:
                self._log(s, color="red")
            else:
                self._log(s)

    ###################
    # Process control #
    ###################

    def run(self):
        """Run the solver on the study with the current process."""
        self._log(f" *** Run solver {self._solver.name}", color="blue")
        self._solver.run(
            self._study,
            process=self._process,
            output_queue=self._output
        )

    def start(self):
        """Start the solver again.

        If the solver is stopped, the study is exported again and a new
        process is created first. Previously loaded results are dropped.
        """
        if self._solver.is_stoped():
            self.export()
            self._process = self.new_process(self._parent)

        self._log(" *** Start", color="blue")
        self._results = None
        self._solver.start(self._study, process=self._process)

        self._set_actions_enabled(
            start=False,
            pause=_signal,
            stop=True,
            log_file=False,
            results=False,
        )

    def pause(self):
        """Pause the solver and allow it to be started again."""
        self._log(" *** Pause", color="blue")
        self._solver.pause()

        self._set_actions_enabled(
            start=True,
            pause=False,
            stop=True,
            results=False,
        )

    def stop(self):
        """Kill the solver and give access to its results and log file."""
        self._log(" *** Stop", color="blue")
        self._solver.kill()

        self._set_actions_enabled(
            start=True,
            pause=False,
            stop=False,
            results=True,
        )
        if self._solver.log_file() != "":
            self._set_actions_enabled(log_file=True)

    ###########
    # Results #
    ###########

    def results(self):
        """Load the results if needed and open them through the parent.

        A failure while loading is logged (and shown in red in the log
        widget) instead of propagating out of the Qt slot; nothing is
        opened in that case.
        """
        if self._results is None:
            try:
                self._results = self._solver.results(
                    self._study, self._workdir, qlog=self._output
                )
            except Exception as e:
                logger.error("Failed to open results")
                logger_exception(e)
                self._log(" *** Failed to open results", color="red")
                return

        self._parent.set_results(self._solver, self._results)
        self._parent.open_solver_results(self._solver, self._results)

        self._solver.has_results_loaded()

    def log_file(self):
        """Open the solver log file in a dedicated window."""
        file_name = os.path.join(self._workdir, self._solver.log_file())
        log = SolverLogFileWindow(
            file_name=file_name,
            study=self._study,
            config=self._config,
            solver=self._solver,
            parent=self,
        )
        log.show()