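# ASolver.py 9.02 KB   (file-viewer header residue, not part of the module)
# Newer Older          (file-viewer header residue, not part of the module)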
# ASolver.py -- Pamhyr
# Copyright (C) 2023  INRAE
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

# -*- coding: utf-8 -*-

import os
import logging
try:
    from signal import SIGTERM, SIGSTOP, SIGCONT
    _signal = True
except:
    _signal = False

from enum import Enum

from Model.Except import NotImplementedMethodeError

from Model.Results.Results import Results
from Model.Results.River.River import River, Reach, Profile

logger = logging.getLogger()

class STATUS(Enum):
    """Execution status of a solver"""
    NOT_LAUNCHED = -1
    STOPED = 0
    RUNNING = 1
    # Used by AbstractSolver.is_paused(), pause() and start(). The numeric
    # value only has to be distinct from the other members.
    PAUSED = 5

class AbstractSolver(object):
    def __init__(self, name):
        """Create a solver with empty paths and command lines

        Args:
            name: The solver name
        """
        super().__init__()

        # Informations
        self._name = name
        self._type = ""
        self._description = ""

        # Path and command line of each step (input formater, solver,
        # output formater)
        self._path_input = self._path_solver = self._path_output = ""
        self._cmd_input = self._cmd_solver = self._cmd_output = ""

        # Execution state
        self._status = STATUS.NOT_LAUNCHED
        self._current_process = None
        self._process = None
        self._output = None

    def __str__(self):
        return f"{self._name} : {self._type} : {self._description}"
    def __getitem__(self, key):
        ret = None
        if key == "name":
            ret = self._name
        elif key == "description":
            ret = self._description
        elif key == "type":
            ret = self._type
        return ret

    @classmethod
    def default_parameters(cls):
            ("all_init_time", "000:00:00:00"),
            ("all_final_time", "999:99:00:00"),
            ("all_timestep", "300.0"),
    @property
    def description(self):
    @property
    def status(self):
        return self._status
    @property
    def type(self):
        return self._type
    @status.setter
    def status(self, status):
        self._status = status
    def is_running(self):
        """Tell whether the solver status is STATUS.RUNNING

        Returns:
            True if the status equals STATUS.RUNNING, False otherwise
        """
        running = self._status == STATUS.RUNNING
        return running

    def is_paused(self):
        """Tell whether the solver status is STATUS.PAUSED

        Requires the STATUS enumeration to define a PAUSED member.

        Returns:
            True if the status equals STATUS.PAUSED, False otherwise
        """
        paused = self._status == STATUS.PAUSED
        return paused

    def is_stoped(self):
        """Tell whether the solver status is STATUS.STOPED

        Returns:
            True if the status equals STATUS.STOPED, False otherwise
        """
        stoped = self._status == STATUS.STOPED
        return stoped

    @name.setter
    def name(self, name):
        self._name = name
    @description.setter
    def description(self, description):
        self._description = description

    def set_input(self, path, cmd):
        self._path_input = path
        self._cmd_input = cmd

    def set_solver(self, path, cmd):
        self._path_solver = path
        self._cmd_solver = cmd

    def set_output(self, path, cmd):
        self._path_output = path
        self._cmd_output = cmd
    ##########
    # Export #
    ##########

    def export(self, study, repertory, qlog = None):
        """Export a study for this solver (abstract)

        This base implementation does nothing but raise; it is meant
        to be overridden.

        Args:
            study: The study to export
            repertory: Destination of the export
            qlog: Optional log queue

        Raises:
            NotImplementedMethodeError: Always
        """
        raise NotImplementedMethodeError(self, self.export)

    def input_param(self):
        """Return the input parameter(s) of the command line (abstract)

        The returned string replaces @input when a command line is
        formatted.

        Returns:
            The input parameter(s) as string

        Raises:
            NotImplementedMethodeError: Always, in this base class
        """
        raise NotImplementedMethodeError(self, self.input_param)

    def log_file(self):
        """Return the name of the solver log file (abstract)

        Returns:
            The log file name as string

        Raises:
            NotImplementedMethodeError: Always, in this base class
        """
        raise NotImplementedMethodeError(self, self.log_file)

    ###########
    # RESULTS #
    ###########

    @timer
    def results(self, study, repertory, qlog = None):
        """Create the results object of a study

        This base implementation only creates a Results object from
        the study; repertory and qlog are not used here.

        Args:
            study: The study the results belong to
            repertory: Unused in this base implementation
            qlog: Unused in this base implementation

        Returns:
            A new Results object
        """
        return Results(study = study)

    def _install_dir(self):
        return os.path.abspath(
            os.path.join(
                os.path.dirname(__file__),
                ".."
            )
        )

    def _format_command(self, cmd, path = ""):
        """Format command line

        The place holders @install_dir, @path, @input and @dir are
        replaced (in this order), then the command line is split into
        an executable and its arguments. Arguments are split on space
        char only; there is no quoting support for them.

        Args:
            cmd: The command line
            path: Optional path string (replace @path in cmd)

        Returns:
            The executable and list of arguments
        """
        # HACK: Works in most case... Trust me i'm an engineer

        # A space inside a path is protected as backslash + space.
        # Written with an explicit double backslash: "\ " is an invalid
        # escape sequence and raises a warning on recent Python.
        escaped_space = "\\ "

        cmd = cmd.replace("@install_dir", self._install_dir())
        cmd = cmd.replace("@path", path.replace(" ", escaped_space))
        cmd = cmd.replace("@input", self.input_param())
        cmd = cmd.replace("@dir", self._process.workingDirectory())

        logger.debug(f"! {cmd}")

        # startswith (instead of cmd[0]) so an empty command line gives
        # an empty executable rather than an IndexError
        if cmd.startswith("\""):
            # Command line executable path is between " char
            cmd = cmd.split("\"")
            exe = cmd[1].replace(escaped_space, " ")
            # Rebuild what follows the closing " char and drop the
            # first chunk (text before the first space)
            remainder = "\"".join(cmd[2:]).split(" ")[1:]
            args = [s for s in remainder if s != ""]
        else:
            # We suppose the command line executable path as no space char
            # (protected spaces are hidden behind a marker during split)
            cmd = cmd.replace(escaped_space, "&_&").split(" ")
            exe = cmd[0].replace("&_&", " ")
            args = [
                s for s in
                (a.replace("&_&", escaped_space) for a in cmd[1:])
                if s != ""
            ]

        logger.info(f"! {exe} {args}")
        return exe, args

    def run_input_data_fomater(self):
        if self._cmd_input == "":
            self._run_next()
            return True

        cmd = self._cmd_input
        exe, args = self._format_command(cmd, self._path_input)
        if not os.path.exists(exe):
            error = f"[ERROR] Path {exe} do not exists"
            logger.info(error)
            return error

        self._process.start(
            exe, args,
        )

        return True

    def run_solver(self):
            self._run_next()
        cmd = self._cmd_solver
        exe, args = self._format_command(cmd, self._path_solver)
        if not os.path.exists(exe):
            error = f"[ERROR] Path {exe} do not exists"
            logger.info(error)
            return error

        self._process.start(
            exe, args,
        )
        self._status = STATUS.RUNNING
        return True

    def run_output_data_fomater(self):
            self._run_next()
        exe, args = self._format_command(cmd, self._path_output)
        if not os.path.exists(exe):
            error = f"[ERROR] Path {exe} do not exists"
            logger.info(error)
            return error

        self._process.start(
            exe, args,
        )

        return True
    def _data_ready(self):
        s = self._process.readAll().data().decode()
        if self._output is not None:
            for x in s.split('\n'):
                self._output.put(x)
    def _run_next(self):
        self._step += 1
        if self._step < len(self._runs):
            res = self._runs[self._step]()
            if res is not True:
                self._output.put(res)
        else:
            self._status = STATUS.STOPED
    def _finished(self, exit_code, exit_status):
        if self._output is not None:
            self._output.put(exit_code)

        self._run_next()

    def run(self, process = None, output_queue = None):
        """Run the input formater, the solver and the output formater

        The steps are chained: each next step is started when the
        process of the previous one finishes (or directly if a step is
        skipped).

        Args:
            process: Optional process object to use (replaces the
                current one); presumably a Qt QProcess, it must
                provide readyRead and finished signals
            output_queue: Optional queue receiving the process output,
                exit codes and error messages (replaces the current one)
        """
        if process is not None:
            self._process = process
        if output_queue is not None:
            self._output = output_queue

        self._process.readyRead.connect(self._data_ready)
        self._process.finished.connect(self._finished)

        self._runs = [
            self.run_input_data_fomater,
            self.run_solver,
            self.run_output_data_fomater,
        ]
        self._step = 0

        # Run first step
        res = self._runs[0]()
        # The output queue is optional: forward the error message only
        # if a queue is set, as done for the other writes to it
        if res is not True and self._output is not None:
            self._output.put(res)

    def kill(self):
        if self._process is None:
            return True
        self._process.kill()
        self._status = STATUS.STOPED
    def start(self, process = None):
        """Start the solver, or resume it if it is paused

        Args:
            process: Optional process object given to run()

        Returns:
            True
        """
        if _signal:
            # Solver is paused, so continue the execution of the
            # current process and do NOT launch a new run
            if self._status == STATUS.PAUSED:
                os.kill(self._process.pid(), SIGCONT)
                self._status = STATUS.RUNNING
                return True

        self.run(process)
        return True

    def pause(self):
        """Pause the solver process with the SIGSTOP signal

        Returns:
            True if the signal has been sent, False if signals are not
            available or if there is no process
        """
        if not _signal or self._process is None:
            return False

        os.kill(self._process.pid(), SIGSTOP)
        self._status = STATUS.PAUSED
        return True

    def stop(self):
        if self._process is None:
            return False

        self._process.terminate()
        self._status = STATUS.STOPED
        return True