# ASolver.py -- Pamhyr
# Copyright (C) 2023 INRAE
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

# -*- coding: utf-8 -*-

import os
import logging

# SIGSTOP / SIGCONT are not exported by the signal module on every
# platform; when the import fails, pause/resume support is disabled.
try:
    from signal import SIGTERM, SIGSTOP, SIGCONT
    _signal = True
except ImportError:
    _signal = False

from enum import Enum

from Model.Except import NotImplementedMethodeError

logger = logging.getLogger()


class STATUS(Enum):
    """Execution state tracked by a solver object."""

    NOT_LAUNCHED = -1
    STOPED = 0
    RUNNING = 1
    PAUSED = 5


class AbstractSolver(object):
    """Base class driving an external solver as a chain of processes.

    A run is made of three successive steps (input data formatter,
    solver, output data formatter). Each step is a command line
    template started on the process object given to run(); a step whose
    command is empty is skipped. Subclasses must implement export(),
    input_param() and log_file().
    """

    _type = ""

    def __init__(self, name):
        super(AbstractSolver, self).__init__()

        self._current_process = None
        self._status = STATUS.NOT_LAUNCHED

        # Informations
        self._type = ""
        self._name = name
        self._description = ""

        # Path and command line template of each step
        self._path_input = ""
        self._path_solver = ""
        self._path_output = ""

        self._cmd_input = ""
        self._cmd_solver = ""
        self._cmd_output = ""

        # Process object and output queue, both supplied through run()
        self._process = None
        self._output = None

        # Step sequencing state, (re)initialised by run(). Defined here
        # so that _run_next() never hits a missing attribute.
        self._runs = []
        self._step = 0

    def __str__(self):
        return f"{self._name} : {self._type} : {self._description}"

    def __getitem__(self, key):
        """Return the "name", "description" or "type" information.

        Any other key gives None.
        """
        ret = None
        if key == "name":
            ret = self._name
        elif key == "description":
            ret = self._description
        elif key == "type":
            ret = self._type
        return ret

    @classmethod
    def default_parameters(cls):
        """Return the default parameters as a list of (name, value)
        string pairs."""
        lst = [
            ("all_init_time", "000:00:00:00"),
            ("all_final_time", "999:99:00:00"),
            ("all_timestep", "300.0"),
        ]
        return lst

    @classmethod
    def checkers(cls):
        """Return the list of checkers (empty for the abstract solver)."""
        lst = []
        return lst

    @property
    def name(self):
        return self._name

    @name.setter
    def name(self, name):
        self._name = name

    @property
    def description(self):
        return self._description

    @description.setter
    def description(self, description):
        self._description = description

    @property
    def status(self):
        return self._status

    @status.setter
    def status(self, status):
        self._status = status

    @property
    def type(self):
        return self._type

    def is_running(self):
        return self._status == STATUS.RUNNING

    def is_paused(self):
        return self._status == STATUS.PAUSED

    def is_stoped(self):
        return self._status == STATUS.STOPED

    def set_input(self, path, cmd):
        """Set the path and command template of the input formatter."""
        self._path_input = path
        self._cmd_input = cmd

    def set_solver(self, path, cmd):
        """Set the path and command template of the solver."""
        self._path_solver = path
        self._cmd_solver = cmd

    def set_output(self, path, cmd):
        """Set the path and command template of the output formatter."""
        self._path_output = path
        self._cmd_output = cmd

    ##########
    # Export #
    ##########

    def export(self, study, repertory, qlog=None):
        """Not implemented here; subclasses must override.

        Raises:
            NotImplementedMethodeError: always, in this abstract class
        """
        raise NotImplementedMethodeError(self, self.export)

    def input_param(self):
        """Return input command line parameter(s)

        Returns:
            Returns input parameter(s) string
        """
        raise NotImplementedMethodeError(self, self.input_param)

    def log_file(self):
        """Return log file name

        Returns:
            Returns log file name as string
        """
        raise NotImplementedMethodeError(self, self.log_file)

    #######
    # Run #
    #######

    def _install_dir(self):
        """Return the absolute path of the parent of this file's
        directory."""
        return os.path.abspath(
            os.path.join(
                os.path.dirname(__file__),
                ".."
            )
        )

    def _start_command(self, template, path):
        """Expand a command template and start it on the process.

        The placeholders "@install_dir", "@path", "@input" and "@dir"
        are replaced (in this order) by the install directory, `path`,
        the value of input_param() and the working directory of the
        process. The result is split on whitespace: the first word is
        the executable, the others its arguments.

        A blank template, or a command that is blank once expanded,
        skips the step and moves on to the next one.

        Args:
            template: The command line template of the step
            path: The replacement for the "@path" placeholder

        Returns:
            True if a process was started, False if the step was
            skipped
        """
        # Checked before any expansion so that a skipped step does not
        # require input_param() to be implemented.
        if not template.strip():
            self._run_next()
            return False

        cmd = template
        cmd = cmd.replace("@install_dir", self._install_dir())
        cmd = cmd.replace("@path", path)
        cmd = cmd.replace("@input", self.input_param())
        cmd = cmd.replace("@dir", self._process.workingDirectory())

        logger.debug(f"! {cmd}")

        words = cmd.split()
        if not words:
            # Nothing left to execute after substitution: indexing the
            # executable would raise IndexError, so skip the step.
            logger.warning(f"Empty command from template '{template}'")
            self._run_next()
            return False

        exe, args = words[0], words[1:]
        self._process.start(
            exe, args,
        )
        return True

    def run_input_data_fomater(self):
        """Start the input data formatter step, or skip it if its
        command is empty. Always returns True."""
        self._start_command(self._cmd_input, self._path_input)
        return True

    def run_solver(self):
        """Start the solver step, or skip it if its command is empty.

        The status becomes RUNNING only when a process was actually
        started. Always returns True.
        """
        if self._start_command(self._cmd_solver, self._path_solver):
            self._status = STATUS.RUNNING
        return True

    def run_output_data_fomater(self):
        """Start the output data formatter step, or skip it if its
        command is empty. Always returns True."""
        self._start_command(self._cmd_output, self._path_output)
        return True

    def _data_ready(self):
        """Forward the pending process output, line by line, to the
        output queue (the data is read even when there is no queue)."""
        # errors="replace": undecodable bytes must not raise from
        # inside a signal handler.
        s = self._process.readAll().data().decode(errors="replace")
        if self._output is not None:
            for x in s.split('\n'):
                self._output.put(x)

    def _run_next(self):
        """Start the next step, or set the status to STOPED when every
        step has been done."""
        self._step += 1
        if self._step < len(self._runs):
            self._runs[self._step]()
        else:
            self._status = STATUS.STOPED

    def _finished(self, exit_code, exit_status):
        """Push the exit code to the output queue, then chain on the
        next step."""
        # NOTE(review): if the process also emits `finished` after
        # kill() or stop(), the remaining steps are still started from
        # here -- confirm this is intended.
        if self._output is not None:
            self._output.put(exit_code)

        self._run_next()

    def run(self, process=None, output_queue=None):
        """Run the three steps, starting with the input formatter.

        Args:
            process: The process object to use; when None, the one
                given to a previous call is kept
            output_queue: The queue receiving output lines and exit
                codes; when None, the previous one is kept
        """
        if process is not None:
            self._process = process
        if output_queue is not None:
            self._output = output_queue

        # NOTE(review): connections are added on every call; calling
        # run() twice with the same process object presumably invokes
        # the handlers twice -- confirm callers pass a fresh process.
        self._process.readyRead.connect(self._data_ready)
        self._process.finished.connect(self._finished)

        self._runs = [
            self.run_input_data_fomater,
            self.run_solver,
            self.run_output_data_fomater,
        ]
        self._step = 0

        # Run first step
        self._runs[0]()

    def _send_signal(self, sig):
        """Send `sig` to the current process.

        Returns:
            True if the signal was sent, False when there is no process
            or no usable process id
        """
        if self._process is None:
            return False

        # presumably a QProcess, whose pid is 0 when nothing is running
        # -- TODO confirm. os.kill() with pid 0 targets the whole
        # process group of the caller, this application included.
        pid = self._process.pid()
        if not pid:
            return False

        os.kill(pid, sig)
        return True

    def kill(self):
        """Kill the current process, if any. Always returns True."""
        if self._process is None:
            return True

        self._process.kill()
        self._status = STATUS.STOPED
        return True

    def start(self, process=None):
        """Resume a paused run, or start a new one.

        Returns:
            True on success, False if a paused process could not be
            resumed
        """
        if _signal and self._status == STATUS.PAUSED:
            if not self._send_signal(SIGCONT):
                return False
            self._status = STATUS.RUNNING
            return True

        self.run(process)
        return True

    def pause(self):
        """Suspend the current process with SIGSTOP.

        Returns:
            True if the process was paused, False when signals are not
            available or there is no process to pause
        """
        if not _signal:
            return False

        if not self._send_signal(SIGSTOP):
            return False

        self._status = STATUS.PAUSED
        return True

    def stop(self):
        """Terminate the current process.

        Returns:
            True if a termination was requested, False without process
        """
        if self._process is None:
            return False

        self._process.terminate()
        self._status = STATUS.STOPED
        return True