# Forked from HYCAR-Hydro / airGR
# Source project has a limited visibility.
# Window.py 8.63 KiB
# Window.py -- Pamhyr
# Copyright (C) 2023  INRAE
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

# -*- coding: utf-8 -*-

import os
import logging
import tempfile

from queue import Queue
from tools import trace, timer

from View.ASubWindow import ASubWindow, ASubMainWindow
from View.ListedSubWindow import ListedSubWindow

from PyQt5.QtGui import (
    QKeySequence,
)

from PyQt5.QtCore import (
    Qt, QVariant, QAbstractTableModel,
    QCoreApplication, QModelIndex, pyqtSlot,
    QRect, QTimer, QProcess,
)

from PyQt5.QtWidgets import (
    QDialogButtonBox, QPushButton, QLineEdit,
    QFileDialog, QTableView, QAbstractItemView,
    QUndoStack, QShortcut, QAction, QItemDelegate,
    QComboBox, QVBoxLayout, QHeaderView, QTabWidget,
    QTextEdit,
)

from View.RunSolver.Log.Window import SolverLogFileWindow

# SIGSTOP / SIGCONT are not provided by the `signal` module on every
# platform (they are missing on Windows). `_signal` records whether
# they could be imported; it is used below to decide whether the pause
# action can be enabled.
try:
    from signal import SIGTERM, SIGSTOP, SIGCONT
    _signal = True
except ImportError:
    # Only a missing name is expected here; anything else should propagate.
    _signal = False

# Shorthand for Qt's translation lookup function.
_translate = QCoreApplication.translate

# No name is passed to getLogger(), so this is the root logger.
logger = logging.getLogger()

class SelectSolverWindow(ASubWindow, ListedSubWindow):
    """Dialog that lets the user pick one of the configured solvers.

    The combo box named "comboBox" is filled with one entry per solver
    of ``config.solvers``, labelled ``"<name> - (<type>)"``. Once the
    dialog has been accepted, the chosen solver is available through the
    :attr:`solver` property.
    """

    def __init__(self, title="Select solver",
                 study=None, config=None,
                 parent=None):
        """Build the dialog from the "SelectSolver" UI description.

        Args:
            title: Window title, also passed as the sub-window name.
            study: Study the solver is selected for (only stored here).
            config: Configuration object; its ``solvers`` attribute is
                iterated to fill the combo box.
            parent: Parent window forwarded to the base class.
        """
        self._title = title

        self._study = study
        self._config = config
        # Stays None until accept() finds the selected solver.
        self._solver = None

        super().__init__(
            name=self._title, ui="SelectSolver", parent=parent
        )
        self.ui.setWindowTitle(self._title)

        self.setup_combobox()
        self.setup_connections()

    def setup_combobox(self):
        """Fill the combo box with one "<name> - (<type>)" label per solver."""
        solvers_name = [
            f"{s.name} - ({s._type})" for s in self._config.solvers
        ]

        self.combobox_add_items("comboBox", solvers_name)

    def setup_connections(self):
        """Connect the run / cancel buttons to accept() / reject()."""
        self.find(QPushButton, "pushButton_run").clicked.connect(self.accept)
        self.find(QPushButton, "pushButton_cancel").clicked.connect(self.reject)

    @property
    def solver(self):
        """Solver chosen by the user, or None if none was accepted yet."""
        return self._solver

    def accept(self):
        """Store the solver matching the current combo box entry and accept.

        The label is split on its last " - " to recover the solver name
        written by setup_combobox(). If no configured solver has that
        name (for example when the combo box is empty), an error is
        logged and the dialog is left open instead of raising
        StopIteration from inside the Qt slot.
        """
        label = self.get_combobox_text("comboBox")
        # Split from the right so a " - " inside the solver name itself
        # does not truncate it.
        solver_name = label.rsplit(" - ", 1)[0]

        solver = next(
            (s for s in self._config.solvers if s.name == solver_name),
            None
        )
        if solver is None:
            logger.error("No solver named '%s' found", solver_name)
            return

        self._solver = solver
        super().accept()


class SolverLogWindow(ASubMainWindow, ListedSubWindow):
    """Window that exports a study, runs a solver on it and shows its log.

    On construction the study is exported into a working directory, the
    solver is run through a QProcess, and a 500 ms timer starts polling
    the output queue to append new messages to the "textEdit" widget.
    The start / pause / stop / log file / results actions are enabled or
    disabled according to the solver state.
    """

    def __init__(self, title="Solver logs",
                 study=None, config=None,
                 solver=None, parent=None):
        """Build the window, export the study and run the solver.

        Args:
            title: Window title, also passed as the sub-window name.
            study: Study to export and solve; its ``filename`` and
                ``name`` are used to choose the working directory.
            config: Configuration object, forwarded to the log file
                window.
            solver: Solver object driving the export and the run.
            parent: Parent window; also used as parent of the QProcess
                and to open the results window.
        """
        self._title = title
        self._parent = parent

        self._study = study
        self._config = config
        self._solver = solver

        self._results = None

        super().__init__(
            name=self._title, ui="SolverLog", parent=parent
        )
        self.ui.setWindowTitle(self._title)

        self.setup_action()
        self.setup_alarm()
        self.setup_connections()

        # Handle on the temporary directory (if any); it must stay
        # referenced for as long as the working directory is in use.
        self._tmpdir = None
        self._workdir = self._make_workdir()

        # The queue is created before the timer starts because update()
        # reads it on every timeout.
        self._output = Queue()
        self._alarm.start(500)
        self._process = self.new_process(parent)

        self._log(f" *** Export study {self._solver.name}", color="blue")
        self._solver.export(self._study, self._workdir, qlog=self._output)

        # Flush the messages produced by the export before running.
        self.update()

        self._log(f" *** Run solver {self._solver.name}", color="blue")
        self._solver.run(
            process=self._process,
            output_queue=self._output
        )

    def _make_workdir(self):
        """Return the path of the directory the solver works in.

        For a study without a file name a temporary directory is
        created; its handle is kept in ``self._tmpdir`` and its path
        (a string) is returned. Otherwise the directory
        ``<study dir>/_PAMHYR_/<study name>/<solver name>`` is created
        if needed (spaces in both names replaced by underscores).
        """
        if not self._study.filename:
            self._tmpdir = tempfile.TemporaryDirectory()
            # Use the path, not the TemporaryDirectory object itself:
            # QProcess.setWorkingDirectory() and os.path.join() both
            # need a string.
            return self._tmpdir.name

        workdir = os.path.join(
            os.path.dirname(self._study.filename),
            "_PAMHYR_",
            self._study.name.replace(" ", "_"),
            self._solver.name.replace(" ", "_"),
        )
        os.makedirs(workdir, exist_ok=True)
        return workdir

    def new_process(self, parent):
        """Create a QProcess whose working directory is the work dir."""
        new = QProcess(parent)
        new.setWorkingDirectory(self._workdir)
        return new

    def _set_action_enabled(self, name, enabled):
        """Enable or disable the QAction called `name`."""
        self.find(QAction, name).setEnabled(enabled)

    def _set_stopped_actions(self):
        """Set the action states used once the solver is stopped."""
        self._set_action_enabled("action_start", True)
        self._set_action_enabled("action_pause", False)
        self._set_action_enabled("action_stop", False)
        self._set_action_enabled("action_results", True)
        if self._solver.log_file() != "":
            self._set_action_enabled("action_log_file", True)

    def setup_action(self):
        """Set the initial action states (solver about to run)."""
        self._set_action_enabled("action_start", False)
        # Pausing is only offered when the stop/continue signals could
        # be imported on this platform.
        self._set_action_enabled("action_pause", _signal)
        self._set_action_enabled("action_stop", True)
        self._set_action_enabled("action_log_file", False)

    def setup_alarm(self):
        """Create the timer used to poll the solver output."""
        self._alarm = QTimer()

    def setup_connections(self):
        """Connect the actions and the polling timer to their handlers."""
        self.find(QAction, "action_start").triggered.connect(self.start)
        self.find(QAction, "action_pause").triggered.connect(self.pause)
        self.find(QAction, "action_stop").triggered.connect(self.stop)
        self.find(QAction, "action_log_file").triggered.connect(self.log_file)
        self.find(QAction, "action_results").triggered.connect(self.results)

        self._alarm.timeout.connect(self.update)

    def _log(self, msg, color=None):
        """Append a message to the "textEdit" log widget.

        Args:
            msg: Either a string, of which only the text before the
                first newline is shown, or an integer exit code, shown
                as " *** Finished with code N" in blue when 0 and in
                red otherwise. Any other type is ignored.
            color: Optional font colour for string messages.
        """
        text_edit = self.find(QTextEdit, "textEdit")

        if isinstance(msg, str):
            line = msg.split("\n", 1)[0]

            if color is not None:
                line = f'<font color="{color}">{line}</font>'

            text_edit.append(line)

        elif isinstance(msg, int):
            color = "blue" if msg == 0 else "red"
            text_edit.append(
                f'<font color="{color}">'
                f" *** Finished with code {msg}"
                "</font>"
            )

    def update(self):
        """Refresh the action states and drain the output queue.

        Called on every timer timeout and explicitly after an export.
        Strings containing "[ERROR]" are logged in red.
        """
        # NOTE(review): if a base class derives from QWidget this method
        # shadows QWidget.update() -- confirm this is intended.
        if self._solver.is_stoped():
            self._set_stopped_actions()

        while self._output.qsize() != 0:
            s = self._output.get()
            if isinstance(s, str) and "[ERROR]" in s:
                self._log(s, color="red")
            else:
                self._log(s)

    def start(self):
        """Start the solver.

        If the solver is stopped, the study is exported again and a new
        QProcess is created before starting; otherwise the current
        process is passed to the solver as is.
        """
        if self._solver.is_stoped():
            self._log(f" *** Export study {self._solver.name}", color="blue")
            self._solver.export(self._study, self._workdir, qlog=self._output)
            self.update()

            self._process = self.new_process(self._parent)

        self._log(" *** Start", color="blue")
        # Drop any cached results so results() reads them again.
        self._results = None
        self._solver.start(self._process)

        self._set_action_enabled("action_start", False)
        self._set_action_enabled("action_pause", _signal)
        self._set_action_enabled("action_stop", True)
        self._set_action_enabled("action_log_file", False)
        self._set_action_enabled("action_results", False)

    def pause(self):
        """Pause the solver and allow it to be started again or stopped."""
        self._log(" *** Pause", color="blue")
        self._solver.pause()

        self._set_action_enabled("action_start", True)
        self._set_action_enabled("action_pause", False)
        self._set_action_enabled("action_stop", True)
        self._set_action_enabled("action_results", False)

    def stop(self):
        """Kill the solver and switch the actions to the stopped state."""
        self._log(" *** Stop", color="blue")
        self._solver.kill()

        self._set_stopped_actions()

    def results(self):
        """Open the results window, reading the results on first use."""
        if self._results is None:
            self._results = self._solver.results(
                self._study, self._workdir, qlog=self._output
            )
        self._parent.open_solver_results(self._solver, self._results)

    def log_file(self):
        """Open a window showing the solver log file from the work dir."""
        file_name = os.path.join(self._workdir, self._solver.log_file())
        log = SolverLogFileWindow(
            file_name=file_name,
            study=self._study,
            config=self._config,
            solver=self._solver,
            parent=self,
        )
        log.show()